00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include <config.h>
00022
00023 #include "read_replication.h"
00024 #include "create_replication.h"
00025
00026 #ifdef UNIV_NONINL
00027 #include "dict0crea.ic"
00028 #endif
00029
00030 #include "btr0pcur.h"
00031 #include "btr0btr.h"
00032 #include "page0page.h"
00033 #include "mach0data.h"
00034 #include "dict0boot.h"
00035 #include "dict0dict.h"
00036 #include "que0que.h"
00037 #include "row0ins.h"
00038 #include "row0mysql.h"
00039 #include "pars0pars.h"
00040 #include "trx0roll.h"
00041 #include "usr0sess.h"
00042 #include "ut0vec.h"
00043 #include "row0merge.h"
00044 #include "row0mysql.h"
00045
00046 UNIV_INTERN ulint dict_create_sys_replication_log(void)
00047 {
00048 dict_table_t* table1;
00049 ulint error;
00050 trx_t *trx;
00051
00052 mutex_enter(&(dict_sys->mutex));
00053
00054 table1 = dict_table_get_low("SYS_REPLICATION_LOG");
00055
00056 trx_sys_read_commit_id();
00057
00058 if (table1)
00059 {
00060 mutex_exit(&(dict_sys->mutex));
00061
00062 return(DB_SUCCESS);
00063 }
00064
00065 mutex_exit(&(dict_sys->mutex));
00066
00067 trx= trx_allocate_for_mysql();
00068
00069 trx->op_info= "creating replication sys table";
00070
00071 row_mysql_lock_data_dictionary(trx);
00072
00073 pars_info_t *info= pars_info_create();
00074
00075
00076 error = que_eval_sql(info,
00077 "PROCEDURE CREATE_SYS_REPLICATION_LOG_PROC () IS\n"
00078 "BEGIN\n"
00079 "CREATE TABLE SYS_REPLICATION_LOG(ID INT(8), SEGID INT, COMMIT_ID INT(8), END_TIMESTAMP INT(8), MESSAGE_LEN INT, MESSAGE BLOB);\n"
00080 "CREATE UNIQUE CLUSTERED INDEX PRIMARY ON SYS_REPLICATION_LOG (ID, SEGID);\n"
00081 "CREATE INDEX COMMIT_IDX ON SYS_REPLICATION_LOG (COMMIT_ID, ID);\n"
00082 "END;\n"
00083 , FALSE, trx);
00084
00085
00086
00087 if (error != DB_SUCCESS)
00088 {
00089 fprintf(stderr, "InnoDB: error %lu in creation.\n", (ulong) error);
00090
00091 ut_a(error == DB_OUT_OF_FILE_SPACE || error == DB_TOO_MANY_CONCURRENT_TRXS);
00092
00093 fprintf(stderr,
00094 "InnoDB: creation failed\n"
00095 "InnoDB: tablespace is full\n"
00096 "InnoDB: dropping incompletely created SYS_REPLICATION_LOG table.\n");
00097
00098 row_drop_table_for_mysql("SYS_REPLICATION_LOG", trx, TRUE);
00099
00100 error = DB_MUST_GET_MORE_FILE_SPACE;
00101 }
00102
00103 trx_commit_for_mysql(trx);
00104
00105 row_mysql_unlock_data_dictionary(trx);
00106
00107 trx_free_for_mysql(trx);
00108
00109 return(error);
00110 }
00111
00112 UNIV_INTERN int read_replication_log_table_message(const char* table_name, drizzled::message::Table *table_message)
00113 {
00114 std::string search_string(table_name);
00115 boost::algorithm::to_lower(search_string);
00116
00117 if (search_string.compare("sys_replication_log") != 0)
00118 return -1;
00119
00120 drizzled::message::Engine *engine= table_message->mutable_engine();
00121 engine->set_name("InnoDB");
00122 table_message->set_name("SYS_REPLICATION_LOG");
00123 table_message->set_schema("DATA_DICTIONARY");
00124 table_message->set_type(drizzled::message::Table::STANDARD);
00125 table_message->set_creation_timestamp(0);
00126 table_message->set_update_timestamp(0);
00127
00128 drizzled::message::Table::TableOptions *options= table_message->mutable_options();
00129 options->set_collation_id(drizzled::my_charset_bin.number);
00130 options->set_collation(drizzled::my_charset_bin.name);
00131 options->set_dont_replicate(true);
00132
00133 drizzled::message::Table::Field *field= table_message->add_field();
00134 field->set_name("ID");
00135 field->set_type(drizzled::message::Table::Field::BIGINT);
00136
00137 field= table_message->add_field();
00138 field->set_name("SEGID");
00139 field->set_type(drizzled::message::Table::Field::INTEGER);
00140
00141 field= table_message->add_field();
00142 field->set_name("COMMIT_ID");
00143 field->set_type(drizzled::message::Table::Field::BIGINT);
00144
00145 field= table_message->add_field();
00146 field->set_name("END_TIMESTAMP");
00147 field->set_type(drizzled::message::Table::Field::BIGINT);
00148
00149 field= table_message->add_field();
00150 field->set_name("MESSAGE_LEN");
00151 field->set_type(drizzled::message::Table::Field::INTEGER);
00152
00153 field= table_message->add_field();
00154 field->set_name("MESSAGE");
00155 field->set_type(drizzled::message::Table::Field::BLOB);
00156 drizzled::message::Table::Field::StringFieldOptions *stropt= field->mutable_string_options();
00157 stropt->set_collation_id(drizzled::my_charset_bin.number);
00158 stropt->set_collation(drizzled::my_charset_bin.name);
00159
00160 drizzled::message::Table::Index *index= table_message->add_indexes();
00161 index->set_name("PRIMARY");
00162 index->set_is_primary(true);
00163 index->set_is_unique(true);
00164 index->set_type(drizzled::message::Table::Index::BTREE);
00165 index->set_key_length(12);
00166 drizzled::message::Table::Index::IndexPart *part= index->add_index_part();
00167 part->set_fieldnr(0);
00168 part->set_compare_length(8);
00169 part= index->add_index_part();
00170 part->set_fieldnr(1);
00171 part->set_compare_length(4);
00172
00173 index= table_message->add_indexes();
00174 index->set_name("COMMIT_IDX");
00175 index->set_is_primary(false);
00176 index->set_is_unique(false);
00177 index->set_type(drizzled::message::Table::Index::BTREE);
00178 index->set_key_length(16);
00179 part= index->add_index_part();
00180 part->set_fieldnr(2);
00181 part->set_compare_length(8);
00182 part= index->add_index_part();
00183 part->set_fieldnr(0);
00184 part->set_compare_length(8);
00185
00186 return 0;
00187 }
00188
00189 extern dtuple_t* row_get_prebuilt_insert_row(row_prebuilt_t* prebuilt);
00190
00191 ulint insert_replication_message(const char *message, size_t size,
00192 trx_t *trx, uint64_t trx_id,
00193 uint64_t end_timestamp, bool is_end_segment,
00194 uint32_t seg_id)
00195 {
00196 ulint error;
00197 row_prebuilt_t* prebuilt;
00198 dict_table_t *table;
00199 que_thr_t* thr;
00200 byte* data;
00201
00202 table = dict_table_get("SYS_REPLICATION_LOG",TRUE);
00203
00204 prebuilt = row_create_prebuilt(table);
00205
00206 if (prebuilt->trx != trx)
00207 {
00208 row_update_prebuilt_trx(prebuilt, trx);
00209 }
00210
00211
00212
00213
00214
00215
00216
00217
00218 bool is_started= true;
00219 if (trx->conc_state == TRX_NOT_STARTED)
00220 {
00221 is_started= false;
00222 }
00223
00224 dtuple_t* dtuple= row_get_prebuilt_insert_row(prebuilt);
00225 dfield_t *dfield;
00226
00227 dfield = dtuple_get_nth_field(dtuple, 0);
00228 data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
00229 row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&trx_id, 8, dict_table_is_comp(prebuilt->table));
00230 dfield_set_data(dfield, data, 8);
00231
00232 dfield = dtuple_get_nth_field(dtuple, 1);
00233
00234 data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
00235 row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&seg_id, 4, dict_table_is_comp(prebuilt->table));
00236 dfield_set_data(dfield, data, 4);
00237
00238 uint64_t commit_id= 0;
00239 if (is_end_segment)
00240 {
00241 commit_id= trx_sys_commit_id.increment();
00242 }
00243
00244 dfield = dtuple_get_nth_field(dtuple, 2);
00245 data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
00246 row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&commit_id, 8, dict_table_is_comp(prebuilt->table));
00247 dfield_set_data(dfield, data, 8);
00248
00249 dfield = dtuple_get_nth_field(dtuple, 3);
00250 data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
00251 row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&end_timestamp, 8, dict_table_is_comp(prebuilt->table));
00252 dfield_set_data(dfield, data, 8);
00253
00254 dfield = dtuple_get_nth_field(dtuple, 4);
00255 data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
00256 row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&size, 4, dict_table_is_comp(prebuilt->table));
00257 dfield_set_data(dfield, data, 4);
00258
00259 dfield = dtuple_get_nth_field(dtuple, 5);
00260 dfield_set_data(dfield, message, size);
00261
00262 ins_node_t* node = prebuilt->ins_node;
00263
00264 thr = que_fork_get_first_thr(prebuilt->ins_graph);
00265
00266 if (prebuilt->sql_stat_start) {
00267 node->state = INS_NODE_SET_IX_LOCK;
00268 prebuilt->sql_stat_start = FALSE;
00269 } else {
00270 node->state = INS_NODE_ALLOC_ROW_ID;
00271 }
00272
00273 que_thr_move_to_run_state_for_mysql(thr, trx);
00274
00275
00276 thr->run_node = node;
00277 thr->prev_node = node;
00278
00279 row_ins_step(thr);
00280
00281 error = trx->error_state;
00282
00283 que_thr_stop_for_mysql_no_error(thr, trx);
00284 row_prebuilt_free(prebuilt, FALSE);
00285
00286 if (! is_started)
00287 {
00288 trx_commit_for_mysql(trx);
00289 }
00290
00291 return error;
00292 }
00293
00294 UNIV_INTERN read_replication_state_st *replication_read_init(void)
00295 {
00296 read_replication_state_st *state= new read_replication_state_st;
00297
00298 mutex_enter(&(dict_sys->mutex));
00299
00300 mtr_start(&state->mtr);
00301 state->sys_tables= dict_table_get_low("SYS_REPLICATION_LOG");
00302 state->sys_index= UT_LIST_GET_FIRST(state->sys_tables->indexes);
00303
00304 mutex_exit(&(dict_sys->mutex));
00305
00306 btr_pcur_open_at_index_side(TRUE, state->sys_index, BTR_SEARCH_LEAF, &state->pcur, TRUE, &state->mtr);
00307
00308 return state;
00309 }
00310
00311 UNIV_INTERN void replication_read_deinit(struct read_replication_state_st *state)
00312 {
00313 btr_pcur_close(&state->pcur);
00314 mtr_commit(&state->mtr);
00315 delete state;
00316 }
00317
00318 UNIV_INTERN struct read_replication_return_st replication_read_next(struct read_replication_state_st *state)
00319 {
00320 struct read_replication_return_st ret;
00321 const rec_t *rec;
00322
00323 btr_pcur_move_to_next_user_rec(&state->pcur, &state->mtr);
00324
00325 rec= btr_pcur_get_rec(&state->pcur);
00326
00327 while (btr_pcur_is_on_user_rec(&state->pcur))
00328 {
00329 const byte* field;
00330 ulint len;
00331
00332
00333 if (rec_get_deleted_flag(rec, 0))
00334 continue;
00335
00336
00337 field = rec_get_nth_field_old(rec, 0, &len);
00338 byte idbyte[8];
00339 convert_to_mysql_format(idbyte, field, 8);
00340 ret.id= *(uint64_t *)idbyte;
00341
00342
00343 field = rec_get_nth_field_old(rec, 1, &len);
00344 byte segbyte[4];
00345 convert_to_mysql_format(segbyte, field, 4);
00346 ret.seg_id= *(uint32_t *)segbyte;
00347
00348 field = rec_get_nth_field_old(rec, 4, &len);
00349 byte commitbyte[8];
00350 convert_to_mysql_format(commitbyte, field, 8);
00351 ret.commit_id= *(uint64_t *)commitbyte;
00352
00353 field = rec_get_nth_field_old(rec, 5, &len);
00354 byte timestampbyte[8];
00355 convert_to_mysql_format(timestampbyte, field, 8);
00356 ret.end_timestamp= *(uint64_t *)timestampbyte;
00357
00358
00359 field = rec_get_nth_field_old(rec, 7, &len);
00360 ret.message= (char *)field;
00361 ret.message_length= len;
00362
00363
00364
00365 btr_pcur_store_position(&state->pcur, &state->mtr);
00366 mtr_commit(&state->mtr);
00367
00368 mtr_start(&state->mtr);
00369
00370 btr_pcur_restore_position(BTR_SEARCH_LEAF, &state->pcur, &state->mtr);
00371
00372 return ret;
00373 }
00374
00375
00376 memset(&ret, 0, sizeof(ret));
00377
00378 return ret;
00379 }
00380
00381 UNIV_INTERN void convert_to_mysql_format(byte* out, const byte* in, int len)
00382 {
00383 byte *ptr;
00384 ptr = out + len;
00385
00386 for (;;) {
00387 ptr--;
00388 *ptr = *in;
00389 if (ptr == out) {
00390 break;
00391 }
00392 in++;
00393 }
00394
00395 out[len - 1] = (byte) (out[len - 1] ^ 128);
00396
00397 }