Drizzled Public API Documentation

transaction_writer.cc
00001 /* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
00002  *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
00003  *
00004  *  Copyright (C) 2009 Sun Microsystems, Inc.
00005  *
00006  *  Authors:
00007  *
00008  *    Jay Pipes <joinfu@sun.com>
00009  *
00010  *  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; version 2 of the License.
00013  *
00014  *  This program is distributed in the hope that it will be useful,
00015  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00016  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017  *  GNU General Public License for more details.
00018  *
00019  *  You should have received a copy of the GNU General Public License
00020  *  along with this program; if not, write to the Free Software
00021  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00022  */
00023 
00024 #include <config.h>
00025 #include <drizzled/algorithm/crc32.h>
00026 #include <drizzled/gettext.h>
00027 #include <drizzled/replication_services.h>
00028 
00029 #include <sys/types.h>
00030 #include <sys/stat.h>
00031 #include <fcntl.h>
00032 #include <string>
00033 #include <fstream>
00034 #include <unistd.h>
00035 
00036 #if TIME_WITH_SYS_TIME
00037 # include <sys/time.h>
00038 # include <time.h>
00039 #else
00040 # if HAVE_SYS_TIME_H
00041 #  include <sys/time.h>
00042 # else
00043 #  include <time.h>
00044 # endif
00045 #endif
00046 
00047 #include <drizzled/message/transaction.pb.h>
00048 
00049 #include <google/protobuf/io/coded_stream.h>
00050 #include <google/protobuf/io/zero_copy_stream_impl.h>
00051 
00052 #include <drizzled/gettext.h>
00053 
00058 using namespace std;
00059 using namespace drizzled;
00060 using namespace google;
00061 
00062 static uint32_t server_id= 1; /* Server id stamped into every transaction context by initTransactionContext() */
00063 static uint64_t transaction_id= 1; /* Next transaction id to assign; post-incremented by initTransactionContext() */
00064 
/**
 * Returns the current wall-clock time as nanoseconds since the Unix epoch.
 *
 * Uses clock_gettime(CLOCK_REALTIME) when HAVE_CLOCK_GETTIME is defined;
 * otherwise falls back to gettimeofday(), scaling its microsecond part
 * up to nanoseconds.
 *
 * @return Seconds * 10^9 plus the sub-second remainder in nanoseconds
 */
static uint64_t getNanoTimestamp()
{
  /* One second is 10^9 nanoseconds; the seconds part must use this factor
     so that it lines up with the nanosecond remainder added below. */
  static const uint64_t nanos_per_second= 1000000000ULL;
#ifdef HAVE_CLOCK_GETTIME
  struct timespec tp;
  clock_gettime(CLOCK_REALTIME, &tp);
  return static_cast<uint64_t>(tp.tv_sec) * nanos_per_second
       + static_cast<uint64_t>(tp.tv_nsec);
#else
  struct timeval tv;
  gettimeofday(&tv, NULL);
  return static_cast<uint64_t>(tv.tv_sec) * nanos_per_second
       + static_cast<uint64_t>(tv.tv_usec) * 1000; /* microseconds -> nanoseconds */
#endif
}
00079 
00080 static void initTransactionContext(message::Transaction &transaction)
00081 {
00082   message::TransactionContext *ctx= transaction.mutable_transaction_context();
00083   ctx->set_transaction_id(transaction_id++);
00084   ctx->set_start_timestamp(getNanoTimestamp());
00085   ctx->set_server_id(server_id);
00086 }
00087 
00088 static void finalizeTransactionContext(message::Transaction &transaction)
00089 {
00090   message::TransactionContext *ctx= transaction.mutable_transaction_context();
00091   ctx->set_end_timestamp(getNanoTimestamp());
00092 }
00093 
00094 static void doCreateTable1(message::Transaction &transaction)
00095 {
00096   message::Statement *statement= transaction.add_statement();
00097 
00098   statement->set_type(message::Statement::RAW_SQL);
00099   statement->set_sql("CREATE TABLE t1 (a VARCHAR(32) NOT NULL, PRIMARY KEY a) ENGINE=InnoDB");
00100   statement->set_start_timestamp(getNanoTimestamp());
00101   statement->set_end_timestamp(getNanoTimestamp());
00102 }
00103 
00104 static void doCreateTable2(message::Transaction &transaction)
00105 {
00106   message::Statement *statement= transaction.add_statement();
00107 
00108   statement->set_type(message::Statement::RAW_SQL);
00109   statement->set_sql("CREATE TABLE t2 (a INTEGER NOT NULL, PRIMARY KEY a) ENGINE=InnoDB");
00110   statement->set_start_timestamp(getNanoTimestamp());
00111   statement->set_end_timestamp(getNanoTimestamp());
00112 }
00113 
00114 static void doCreateTable3(message::Transaction &transaction)
00115 {
00116   message::Statement *statement= transaction.add_statement();
00117 
00118   statement->set_type(message::Statement::RAW_SQL);
00119   statement->set_sql("CREATE TABLE t3 (a INTEGER NOT NULL, b BLOB NOT NULL, PRIMARY KEY a) ENGINE=InnoDB");
00120   statement->set_start_timestamp(getNanoTimestamp());
00121   statement->set_end_timestamp(getNanoTimestamp());
00122 }
00123 
00124 static void doSimpleInsert(message::Transaction &transaction)
00125 {
00126   message::Statement *statement= transaction.add_statement();
00127 
00128   /* Do generic Statement setup */
00129   statement->set_type(message::Statement::INSERT);
00130   statement->set_sql("INSERT INTO t1 (a) VALUES (\"1\"), (\"2\")");
00131   statement->set_start_timestamp(getNanoTimestamp());
00132 
00133   /* Do INSERT-specific header and setup */
00134   message::InsertHeader *header= statement->mutable_insert_header();
00135 
00136   /* Add table and field metadata for the statement */
00137   message::TableMetadata *t_meta= header->mutable_table_metadata();
00138   t_meta->set_schema_name("test");
00139   t_meta->set_table_name("t1");
00140 
00141   message::FieldMetadata *f_meta= header->add_field_metadata();
00142   f_meta->set_name("a");
00143   f_meta->set_type(message::Table::Field::VARCHAR);
00144 
00145   /* Add new values... */
00146   message::InsertData *data= statement->mutable_insert_data();
00147   data->set_segment_id(1);
00148   data->set_end_segment(true);
00149 
00150   message::InsertRecord *record1= data->add_record();
00151   message::InsertRecord *record2= data->add_record();
00152 
00153   record1->add_insert_value("1");
00154   record2->add_insert_value("2");
00155 
00156   statement->set_end_timestamp(getNanoTimestamp());
00157 }
00158 
00159 static void doNonVarcharInsert(message::Transaction &transaction)
00160 {
00161   message::Statement *statement= transaction.add_statement();
00162 
00163   /* Do generic Statement setup */
00164   statement->set_type(message::Statement::INSERT);
00165   statement->set_sql("INSERT INTO t2 (a) VALUES (1), (2)");
00166   statement->set_start_timestamp(getNanoTimestamp());
00167 
00168   /* Do INSERT-specific header and setup */
00169   message::InsertHeader *header= statement->mutable_insert_header();
00170 
00171   /* Add table and field metadata for the statement */
00172   message::TableMetadata *t_meta= header->mutable_table_metadata();
00173   t_meta->set_schema_name("test");
00174   t_meta->set_table_name("t2");
00175 
00176   message::FieldMetadata *f_meta= header->add_field_metadata();
00177   f_meta->set_name("a");
00178   f_meta->set_type(message::Table::Field::INTEGER);
00179 
00180   /* Add new values... */
00181   message::InsertData *data= statement->mutable_insert_data();
00182   data->set_segment_id(1);
00183   data->set_end_segment(true);
00184 
00185   message::InsertRecord *record1= data->add_record();
00186   message::InsertRecord *record2= data->add_record();
00187 
00188   record1->add_insert_value("1");
00189   record2->add_insert_value("2");
00190 
00191   statement->set_end_timestamp(getNanoTimestamp());
00192 }
00193 
00194 static void doBlobInsert(message::Transaction &transaction)
00195 {
00196   message::Statement *statement= transaction.add_statement();
00197 
00198   /* Do generic Statement setup */
00199   statement->set_type(message::Statement::INSERT);
00200   statement->set_sql("INSERT INTO t3 (a, b) VALUES (1, 'test\0me')", 43); /* 43 == length including \0 */
00201   statement->set_start_timestamp(getNanoTimestamp());
00202 
00203   /* Do INSERT-specific header and setup */
00204   message::InsertHeader *header= statement->mutable_insert_header();
00205 
00206   /* Add table and field metadata for the statement */
00207   message::TableMetadata *t_meta= header->mutable_table_metadata();
00208   t_meta->set_schema_name("test");
00209   t_meta->set_table_name("t3");
00210 
00211   message::FieldMetadata *f_meta= header->add_field_metadata();
00212   f_meta->set_name("a");
00213   f_meta->set_type(message::Table::Field::INTEGER);
00214 
00215   f_meta= header->add_field_metadata();
00216   f_meta->set_name("b");
00217   f_meta->set_type(message::Table::Field::BLOB);
00218 
00219   /* Add new values... */
00220   message::InsertData *data= statement->mutable_insert_data();
00221   data->set_segment_id(1);
00222   data->set_end_segment(true);
00223 
00224   message::InsertRecord *record1= data->add_record();
00225 
00226   record1->add_insert_value("1");
00227   record1->add_insert_value("test\0me", 7); /* 7 == length including \0 */
00228 
00229   statement->set_end_timestamp(getNanoTimestamp());
00230 }
00231 
00232 static void doSimpleDelete(message::Transaction &transaction)
00233 {
00234   message::Statement *statement= transaction.add_statement();
00235 
00236   /* Do generic Statement setup */
00237   statement->set_type(message::Statement::DELETE);
00238   statement->set_sql("DELETE FROM t1 WHERE a = \"1\"");
00239   statement->set_start_timestamp(getNanoTimestamp());
00240 
00241   /* Do DELETE-specific header and setup */
00242   message::DeleteHeader *header= statement->mutable_delete_header();
00243 
00244   /* Add table and field metadata for the statement */
00245   message::TableMetadata *t_meta= header->mutable_table_metadata();
00246   t_meta->set_schema_name("test");
00247   t_meta->set_table_name("t1");
00248 
00249   message::FieldMetadata *f_meta= header->add_key_field_metadata();
00250   f_meta->set_name("a");
00251   f_meta->set_type(message::Table::Field::VARCHAR);
00252 
00253   /* Add new values... */
00254   message::DeleteData *data= statement->mutable_delete_data();
00255   data->set_segment_id(1);
00256   data->set_end_segment(true);
00257 
00258   message::DeleteRecord *record1= data->add_record();
00259 
00260   record1->add_key_value("1");
00261 
00262   statement->set_end_timestamp(getNanoTimestamp());
00263 }
00264 
00265 static void doSimpleUpdate(message::Transaction &transaction)
00266 {
00267   message::Statement *statement= transaction.add_statement();
00268 
00269   /* Do generic Statement setup */
00270   statement->set_type(message::Statement::UPDATE);
00271   statement->set_sql("UPDATE t1 SET a = \"5\" WHERE a = \"1\"");
00272   statement->set_start_timestamp(getNanoTimestamp());
00273 
00274   /* Do UPDATE-specific header and setup */
00275   message::UpdateHeader *header= statement->mutable_update_header();
00276 
00277   /* Add table and field metadata for the statement */
00278   message::TableMetadata *t_meta= header->mutable_table_metadata();
00279   t_meta->set_schema_name("test");
00280   t_meta->set_table_name("t1");
00281 
00282   message::FieldMetadata *kf_meta= header->add_key_field_metadata();
00283   kf_meta->set_name("a");
00284   kf_meta->set_type(message::Table::Field::VARCHAR);
00285 
00286   message::FieldMetadata *sf_meta= header->add_set_field_metadata();
00287   sf_meta->set_name("a");
00288   sf_meta->set_type(message::Table::Field::VARCHAR);
00289 
00290   /* Add new values... */
00291   message::UpdateData *data= statement->mutable_update_data();
00292   data->set_segment_id(1);
00293   data->set_end_segment(true);
00294 
00295   message::UpdateRecord *record1= data->add_record();
00296 
00297   record1->add_after_value("5");
00298   record1->add_key_value("1");
00299 
00300   statement->set_end_timestamp(getNanoTimestamp());
00301 }
00302 
00303 static void doMultiKeyUpdate(message::Transaction &transaction)
00304 {
00305   message::Statement *statement= transaction.add_statement();
00306 
00307   /* Do generic Statement setup */
00308   statement->set_type(message::Statement::UPDATE);
00309   statement->set_sql("UPDATE t1 SET a = \"5\"");
00310   statement->set_start_timestamp(getNanoTimestamp());
00311 
00312   /* Do UPDATE-specific header and setup */
00313   message::UpdateHeader *header= statement->mutable_update_header();
00314 
00315   /* Add table and field metadata for the statement */
00316   message::TableMetadata *t_meta= header->mutable_table_metadata();
00317   t_meta->set_schema_name("test");
00318   t_meta->set_table_name("t1");
00319 
00320   message::FieldMetadata *kf_meta= header->add_key_field_metadata();
00321   kf_meta->set_name("a");
00322   kf_meta->set_type(message::Table::Field::VARCHAR);
00323 
00324   message::FieldMetadata *sf_meta= header->add_set_field_metadata();
00325   sf_meta->set_name("a");
00326   sf_meta->set_type(message::Table::Field::VARCHAR);
00327 
00328   /* Add new values... */
00329   message::UpdateData *data= statement->mutable_update_data();
00330   data->set_segment_id(1);
00331   data->set_end_segment(true);
00332 
00333   message::UpdateRecord *record1= data->add_record();
00334   message::UpdateRecord *record2= data->add_record();
00335 
00336   record1->add_after_value("5");
00337   record1->add_key_value("1");
00338   record2->add_after_value("5");
00339   record2->add_key_value("2");
00340 
00341   statement->set_end_timestamp(getNanoTimestamp());
00342 }
00343 
00344 static void writeTransaction(protobuf::io::CodedOutputStream *output, message::Transaction &transaction)
00345 {
00346   std::string buffer("");
00347   finalizeTransactionContext(transaction);
00348   transaction.SerializeToString(&buffer);
00349 
00350   size_t length= buffer.length();
00351 
00352   output->WriteLittleEndian32(static_cast<uint32_t>(ReplicationServices::TRANSACTION));
00353   output->WriteLittleEndian32(static_cast<uint32_t>(length));
00354   output->WriteString(buffer);
00355   output->WriteLittleEndian32(drizzled::algorithm::crc32(buffer.c_str(), length)); /* checksum */
00356 }
00357 
00358 int main(int argc, char* argv[])
00359 {
00360   GOOGLE_PROTOBUF_VERIFY_VERSION;
00361   int file;
00362 
00363   if (argc != 2) 
00364   {
00365     fprintf(stderr, _("Usage: %s TRANSACTION_LOG\n"), argv[0]);
00366     return -1;
00367   }
00368 
00369   if ((file= open(argv[1], O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU)) == -1)
00370   {
00371     fprintf(stderr, _("Cannot open file: %s\n"), argv[1]);
00372     return -1;
00373   }
00374 
00375   protobuf::io::ZeroCopyOutputStream *raw_output= new protobuf::io::FileOutputStream(file);
00376   protobuf::io::CodedOutputStream *coded_output= new protobuf::io::CodedOutputStream(raw_output);
00377 
00378   /* Write a series of statements which test each type of Statement */
00379   message::Transaction transaction;
00380 
00381   /* Simple CREATE TABLE statements as raw sql */
00382   initTransactionContext(transaction);
00383   doCreateTable1(transaction);
00384   writeTransaction(coded_output, transaction);
00385   transaction.Clear();
00386 
00387   initTransactionContext(transaction);
00388   doCreateTable2(transaction);
00389   writeTransaction(coded_output, transaction);
00390   transaction.Clear();
00391 
00392   /* Simple INSERT statement */
00393   initTransactionContext(transaction);
00394   doSimpleInsert(transaction);
00395   writeTransaction(coded_output, transaction);
00396   transaction.Clear();
00397 
00398   /* Write a DELETE and an UPDATE in one transaction */
00399   initTransactionContext(transaction);
00400   doSimpleDelete(transaction);
00401   doSimpleUpdate(transaction);
00402   writeTransaction(coded_output, transaction);
00403   transaction.Clear();
00404 
00405   /* Test an INSERT into non-varchar columns */
00406   initTransactionContext(transaction);
00407   doNonVarcharInsert(transaction);
00408   writeTransaction(coded_output, transaction);
00409   transaction.Clear();
00410 
00411   /* Write an UPDATE which affects >1 row */
00412   initTransactionContext(transaction);
00413   doMultiKeyUpdate(transaction);
00414   writeTransaction(coded_output, transaction);
00415   transaction.Clear();
00416 
00417   /* Write an INSERT which writes BLOB data */
00418   initTransactionContext(transaction);
00419   doCreateTable3(transaction);
00420   doBlobInsert(transaction);
00421   writeTransaction(coded_output, transaction);
00422   transaction.Clear();
00423 
00424   delete coded_output;
00425   delete raw_output;
00426 
00427   return 0;
00428 }