00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026 #ifdef DRIZZLED
00027 #include <config.h>
00028 #include <drizzled/common.h>
00029 #include <drizzled/session.h>
00030 #endif
00031
00032 #include "cslib/CSConfig.h"
00033 #include <inttypes.h>
00034
00035 #include <sys/types.h>
00036 #include <sys/stat.h>
00037 #include <stdlib.h>
00038 #include <time.h>
00039
00040
00041 #include "cslib/CSGlobal.h"
00042 #include "cslib/CSStrUtil.h"
00043 #include "cslib/CSLog.h"
00044 #include "cslib/CSPath.h"
00045 #include "cslib/CSDirectory.h"
00046
00047 #include "ha_pbms.h"
00048
00049
00050 #include "mysql_ms.h"
00051 #include "database_ms.h"
00052 #include "open_table_ms.h"
00053 #include "discover_ms.h"
00054 #include "systab_util_ms.h"
00055
00056 #include "systab_cloud_ms.h"
00057
00058 DT_FIELD_INFO pbms_cloud_info[]=
00059 {
00060 {"Id", NOVAL, NULL, MYSQL_TYPE_LONG, NULL, NOT_NULL_FLAG, "The Cloud storage reference ID"},
00061 {"Server", 1024, NULL, MYSQL_TYPE_VARCHAR, &UTF8_CHARSET, NOT_NULL_FLAG, "S3 server name"},
00062 {"Bucket", 124, NULL, MYSQL_TYPE_VARCHAR, &UTF8_CHARSET, NOT_NULL_FLAG, "S3 bucket name"},
00063 {"PublicKey", 124, NULL, MYSQL_TYPE_VARCHAR, &UTF8_CHARSET, NOT_NULL_FLAG, "S3 public key"},
00064 {"PrivateKey", 124, NULL, MYSQL_TYPE_VARCHAR, &UTF8_CHARSET, NOT_NULL_FLAG, "S3 private key"},
00065 {NULL,NOVAL, NULL, MYSQL_TYPE_STRING,NULL, 0, NULL}
00066 };
00067
00068 DT_KEY_INFO pbms_cloud_keys[]=
00069 {
00070 {"pbms_cloud_pk", PRI_KEY_FLAG, {"Id", NULL}},
00071 {NULL, 0, {NULL}}
00072 };
00073
00074 #define MIN_CLOUD_TABLE_SIZE 4
00075
00076
00077 void MSCloudTable::startUp()
00078 {
00079 MSCloudInfo::startUp();
00080 }
00081
00082
00083 void MSCloudTable::shutDown()
00084 {
00085 MSCloudInfo::shutDown();
00086 }
00087
00088
00089 void MSCloudTable::loadTable(MSDatabase *db)
00090 {
00091
00092 enter_();
00093
00094 push_(db);
00095 lock_(MSCloudInfo::gCloudInfo);
00096
00097 if (MSCloudInfo::gMaxInfoRef == 0) {
00098 CSPath *path;
00099 path = getSysFile(getPBMSPath(RETAIN(db->myDatabasePath)), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
00100 push_(path);
00101
00102 if (path->exists()) {
00103 CSFile *file;
00104 SysTabRec *cloudData;
00105 const char *server, *bucket, *pubKey, *privKey;
00106 uint32_t info_id;
00107 MSCloudInfo *info;
00108 size_t size;
00109
00110 new_(cloudData, SysTabRec("pbms", CLOUD_TABLE_NAME".dat", CLOUD_TABLE_NAME));
00111 push_(cloudData);
00112
00113 file = path->openFile(CSFile::READONLY);
00114 push_(file);
00115 size = file->getEOF();
00116 cloudData->setLength(size);
00117 file->read(cloudData->getBuffer(0), 0, size, size);
00118 release_(file);
00119
00120 cloudData->firstRecord();
00121 MSCloudInfo::gMaxInfoRef = cloudData->getInt4Field();
00122
00123 if (! cloudData->isValidRecord())
00124 MSCloudInfo::gMaxInfoRef = 1;
00125
00126 while (cloudData->nextRecord()) {
00127 info_id = cloudData->getInt4Field();
00128 server = cloudData->getStringField();
00129 bucket = cloudData->getStringField();
00130 pubKey = cloudData->getStringField();
00131 privKey = cloudData->getStringField();
00132
00133 if (cloudData->isValidRecord()) {
00134 if (info_id > MSCloudInfo::gMaxInfoRef) {
00135 char msg[80];
00136 snprintf(msg, 80, "Cloud info id (%"PRIu32") larger than expected (%"PRIu32")\n", info_id, MSCloudInfo::gMaxInfoRef);
00137 CSL.log(self, CSLog::Warning, "pbms "CLOUD_TABLE_NAME".dat :possible damaged file or record. ");
00138 CSL.log(self, CSLog::Warning, msg);
00139 MSCloudInfo::gMaxInfoRef = info_id +1;
00140 }
00141 if ( MSCloudInfo::gCloudInfo->get(info_id)) {
00142 char msg[80];
00143 snprintf(msg, 80, "Duplicate Cloud info id (%"PRIu32") being ignored\n", info_id);
00144 CSL.log(self, CSLog::Warning, "pbms "CLOUD_TABLE_NAME".dat :possible damaged file or record. ");
00145 CSL.log(self, CSLog::Warning, msg);
00146 } else {
00147 new_(info, MSCloudInfo( info_id, server, bucket, pubKey, privKey));
00148 MSCloudInfo::gCloudInfo->set(info_id, info);
00149 }
00150 }
00151 }
00152 release_(cloudData); cloudData = NULL;
00153
00154 } else
00155 MSCloudInfo::gMaxInfoRef = 1;
00156
00157 release_(path);
00158
00159 }
00160 unlock_(MSCloudInfo::gCloudInfo);
00161
00162 release_(db);
00163
00164 exit_();
00165 }
00166
00167 void MSCloudTable::saveTable(MSDatabase *db)
00168 {
00169 SysTabRec *cloudData;
00170 MSCloudInfo *info;
00171 enter_();
00172
00173 push_(db);
00174
00175 new_(cloudData, SysTabRec("pbms", CLOUD_TABLE_NAME".dat", CLOUD_TABLE_NAME));
00176 push_(cloudData);
00177
00178
00179 cloudData->clear();
00180 lock_(MSCloudInfo::gCloudInfo);
00181
00182 cloudData->beginRecord();
00183 cloudData->setInt4Field(MSCloudInfo::gMaxInfoRef);
00184 cloudData->endRecord();
00185 for (int i = 0;(info = (MSCloudInfo*) MSCloudInfo::gCloudInfo->itemAt(i)); i++) {
00186
00187 cloudData->beginRecord();
00188 cloudData->setInt4Field(info->getCloudRefId());
00189 cloudData->setStringField(info->getServer());
00190 cloudData->setStringField(info->getBucket());
00191 cloudData->setStringField(info->getPublicKey());
00192 cloudData->setStringField(info->getPrivateKey());
00193 cloudData->endRecord();
00194 }
00195 unlock_(MSCloudInfo::gCloudInfo);
00196
00197 restoreTable(RETAIN(db), cloudData->getBuffer(0), cloudData->length(), false);
00198
00199 release_(cloudData);
00200 release_(db);
00201 exit_();
00202 }
00203
00204
00205 MSCloudTable::MSCloudTable(MSSystemTableShare *share, TABLE *table):
00206 MSOpenSystemTable(share, table),
00207 iCloudIndex(0)
00208 {
00209 }
00210
00211 MSCloudTable::~MSCloudTable()
00212 {
00213
00214 }
00215
00216 void MSCloudTable::use()
00217 {
00218 MSCloudInfo::gCloudInfo->lock();
00219 }
00220
00221 void MSCloudTable::unuse()
00222 {
00223 MSCloudInfo::gCloudInfo->unlock();
00224
00225 }
00226
00227
00228 void MSCloudTable::seqScanInit()
00229 {
00230 iCloudIndex = 0;
00231 }
00232
00233 #define MAX_PASSWORD ((int32_t)64)
00234 bool MSCloudTable::seqScanNext(char *buf)
00235 {
00236 char passwd[MAX_PASSWORD +1];
00237 TABLE *table = mySQLTable;
00238 Field *curr_field;
00239 byte *save;
00240 MY_BITMAP *save_write_set;
00241 MSCloudInfo *info;
00242 const char *val;
00243
00244 enter_();
00245
00246 info = (MSCloudInfo *) MSCloudInfo::gCloudInfo->itemAt(iCloudIndex++);
00247 if (!info)
00248 return_(false);
00249
00250 save_write_set = table->write_set;
00251 table->write_set = NULL;
00252
00253 #ifdef DRIZZLED
00254 memset(buf, 0xFF, table->getNullBytes());
00255 #else
00256 memset(buf, 0xFF, table->s->null_bytes);
00257 #endif
00258 for (Field **field=GET_TABLE_FIELDS(table) ; *field ; field++) {
00259 curr_field = *field;
00260 save = curr_field->ptr;
00261 #if MYSQL_VERSION_ID < 50114
00262 curr_field->ptr = (byte *) buf + curr_field->offset();
00263 #else
00264 #ifdef DRIZZLED
00265 curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->getTable()->getInsertRecord());
00266 #else
00267 curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->table->record[0]);
00268 #endif
00269 #endif
00270 switch (curr_field->field_name[0]) {
00271 case 'I':
00272 ASSERT(strcmp(curr_field->field_name, "Id") == 0);
00273 curr_field->store(info->getCloudRefId(), true);
00274 break;
00275
00276 case 'S':
00277 ASSERT(strcmp(curr_field->field_name, "Server") == 0);
00278 val = info->getServer();
00279 curr_field->store(val, strlen(val), &UTF8_CHARSET);
00280 setNotNullInRecord(curr_field, buf);
00281 break;
00282
00283 case 'B':
00284 ASSERT(strcmp(curr_field->field_name, "Bucket") == 0);
00285 val = info->getBucket();
00286 curr_field->store(val, strlen(val), &UTF8_CHARSET);
00287 setNotNullInRecord(curr_field, buf);
00288 break;
00289
00290 case 'P':
00291 if (curr_field->field_name[1] == 'u') {
00292 ASSERT(strcmp(curr_field->field_name, "PublicKey") == 0);
00293 val = info->getPublicKey();
00294 } else if (curr_field->field_name[1] == 'r') {
00295 ASSERT(strcmp(curr_field->field_name, "PrivateKey") == 0);
00296 val = info->getPrivateKey();
00297
00298 int32_t i;
00299 for (i = 0; (i < MAX_PASSWORD) && (i < (int32_t)strlen(val)); i++) passwd[i] = '*';
00300 passwd[i] = 0;
00301 val = passwd;
00302 } else {
00303 ASSERT(false);
00304 break;
00305 }
00306 curr_field->store(val, strlen(val), &UTF8_CHARSET);
00307 setNotNullInRecord(curr_field, buf);
00308 break;
00309
00310 default:
00311 ASSERT(false);
00312 }
00313 curr_field->ptr = save;
00314 }
00315
00316 table->write_set = save_write_set;
00317
00318 return_(true);
00319 }
00320
00321 void MSCloudTable::seqScanPos(unsigned char *pos )
00322 {
00323 int32_t index = iCloudIndex -1;
00324 if (index < 0)
00325 index = 0;
00326
00327 mi_int4store(pos, index);
00328 }
00329
00330 void MSCloudTable::seqScanRead(unsigned char *pos , char *buf)
00331 {
00332 iCloudIndex = mi_uint4korr(pos);
00333 seqScanNext(buf);
00334 }
00335
00336 void MSCloudTable::updateRow(char *old_data, char *new_data)
00337 {
00338 uint32_t n_id, o_id, o_indx, n_indx;
00339 const char *realPrivKey;
00340 String server, bucket, pubKey, privKey;
00341 String o_server, o_bucket, o_pubKey, o_privKey;
00342 MSCloudInfo *info;
00343
00344 enter_();
00345
00346 getFieldValue(new_data, 0, &n_id);
00347 getFieldValue(new_data, 1, &server);
00348 getFieldValue(new_data, 2, &bucket);
00349 getFieldValue(new_data, 3, &pubKey);
00350 getFieldValue(new_data, 4, &privKey);
00351
00352 getFieldValue(old_data, 0, &o_id);
00353 getFieldValue(old_data, 1, &o_server);
00354 getFieldValue(old_data, 2, &o_bucket);
00355 getFieldValue(old_data, 3, &o_pubKey);
00356 getFieldValue(old_data, 4, &o_privKey);
00357
00358
00359 if ((o_id != n_id) && MSCloudInfo::gCloudInfo->get(n_id)) {
00360 CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE, "Attempt to update a row with a duplicate key in the "CLOUD_TABLE_NAME" table.");
00361 }
00362
00363
00364
00365
00366 if (strcmp(privKey.c_ptr(), o_privKey.c_ptr()))
00367 realPrivKey = privKey.c_ptr();
00368 else {
00369 info = (MSCloudInfo*) MSCloudInfo::gCloudInfo->get(o_id);
00370 realPrivKey = info->getPrivateKey();
00371 }
00372
00373 new_(info, MSCloudInfo( n_id, server.c_ptr(), bucket.c_ptr(), pubKey.c_ptr(), realPrivKey));
00374 push_(info);
00375
00376 o_indx = MSCloudInfo::gCloudInfo->getIndex(o_id);
00377
00378 MSCloudInfo::gCloudInfo->remove(o_id);
00379 pop_(info);
00380 MSCloudInfo::gCloudInfo->set(n_id, info);
00381 n_indx = MSCloudInfo::gCloudInfo->getIndex(n_id);
00382
00383
00384 if (o_indx < n_indx )
00385 iCloudIndex--;
00386
00387 saveTable(RETAIN(myShare->mySysDatabase));
00388 exit_();
00389 }
00390
00391 void MSCloudTable::insertRow(char *data)
00392 {
00393 uint32_t ref_id;
00394 String server, bucket, pubKey, privKey;
00395 MSCloudInfo *info;
00396
00397 enter_();
00398
00399 getFieldValue(data, 0, &ref_id);
00400
00401
00402 if (ref_id && MSCloudInfo::gCloudInfo->get(ref_id)) {
00403 CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE, "Attempt to insert a row with a duplicate key in the "CLOUD_TABLE_NAME" table.");
00404 }
00405
00406 getFieldValue(data, 1, &server);
00407 getFieldValue(data, 2, &bucket);
00408 getFieldValue(data, 3, &pubKey);
00409 getFieldValue(data, 4, &privKey);
00410
00411 if (ref_id == 0)
00412 ref_id = MSCloudInfo::gMaxInfoRef++;
00413 else if (ref_id >= MSCloudInfo::gMaxInfoRef)
00414 MSCloudInfo::gMaxInfoRef = ref_id +1;
00415
00416 new_(info, MSCloudInfo( ref_id, server.c_ptr(), bucket.c_ptr(), pubKey.c_ptr(), privKey.c_ptr()));
00417 MSCloudInfo::gCloudInfo->set(ref_id, info);
00418
00419 saveTable(RETAIN(myShare->mySysDatabase));
00420 exit_();
00421 }
00422
00423 void MSCloudTable::deleteRow(char *data)
00424 {
00425 uint32_t ref_id, indx;
00426
00427 enter_();
00428
00429 getFieldValue(data, 0, &ref_id);
00430
00431
00432 indx = MSCloudInfo::gCloudInfo->getIndex(ref_id);
00433 if (indx <= iCloudIndex)
00434 iCloudIndex--;
00435
00436 MSCloudInfo::gCloudInfo->remove(ref_id);
00437 saveTable(RETAIN(myShare->mySysDatabase));
00438 exit_();
00439 }
00440
00441 void MSCloudTable::transferTable(MSDatabase *to_db, MSDatabase *from_db)
00442 {
00443 CSPath *path;
00444 enter_();
00445
00446 push_(from_db);
00447 push_(to_db);
00448
00449 path = CSPath::newPath(getPBMSPath(RETAIN(from_db->myDatabasePath)), CLOUD_TABLE_NAME".dat");
00450 push_(path);
00451 if (path->exists()) {
00452 CSPath *bu_path;
00453 bu_path = CSPath::newPath(getPBMSPath(RETAIN(to_db->myDatabasePath)), CLOUD_TABLE_NAME".dat");
00454 path->copyTo(bu_path, true);
00455 }
00456
00457 release_(path);
00458 release_(to_db);
00459 release_(from_db);
00460
00461 exit_();
00462 }
00463
00464 CSStringBuffer *MSCloudTable::dumpTable(MSDatabase *db)
00465 {
00466
00467 CSPath *path;
00468 CSStringBuffer *dump;
00469
00470 enter_();
00471
00472 push_(db);
00473 path = getSysFile(getPBMSPath(RETAIN(db->myDatabasePath)), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
00474 release_(db);
00475
00476 push_(path);
00477 new_(dump, CSStringBuffer(20));
00478 push_(dump);
00479
00480 if (path->exists()) {
00481 CSFile *file;
00482 size_t size;
00483
00484 file = path->openFile(CSFile::READONLY);
00485 push_(file);
00486
00487 size = file->getEOF();
00488 dump->setLength(size);
00489 file->read(dump->getBuffer(0), 0, size, size);
00490 release_(file);
00491 }
00492
00493 pop_(dump);
00494 release_(path);
00495 return_(dump);
00496 }
00497
00498 void MSCloudTable::restoreTable(MSDatabase *db, const char *data, size_t size, bool reload)
00499 {
00500 CSPath *path;
00501 CSFile *file;
00502
00503 enter_();
00504
00505 push_(db);
00506 path = getSysFile(getPBMSPath(RETAIN(db->myDatabasePath)), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
00507 push_(path);
00508
00509 file = path->openFile(CSFile::CREATE | CSFile::TRUNCATE);
00510 push_(file);
00511
00512 file->write(data, 0, size);
00513 file->close();
00514 release_(file);
00515
00516 release_(path);
00517
00518 pop_(db);
00519 if (reload)
00520 loadTable(db);
00521 else
00522 db->release();
00523
00524 exit_();
00525 }
00526
00527 void MSCloudTable::removeTable(CSString *db_path)
00528 {
00529 CSPath *path;
00530 char pbms_path[PATH_MAX];
00531
00532 enter_();
00533
00534 push_(db_path);
00535 cs_strcpy(PATH_MAX, pbms_path, db_path->getCString());
00536 release_(db_path);
00537
00538 if (strcmp(cs_last_name_of_path(pbms_path), "pbms") != 0)
00539 exit_();
00540
00541 cs_remove_last_name_of_path(pbms_path);
00542
00543 path = getSysFile(CSString::newString(pbms_path), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
00544 push_(path);
00545
00546 if (path->exists())
00547 path->removeFile();
00548 release_(path);
00549
00550 exit_();
00551 }
00552