Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00031 #include <config.h>
00032 #include <drizzled/plugin/function.h>
00033 #include <drizzled/item/func.h>
00034 #include <drizzled/function/str/strfunc.h>
00035 #include <drizzled/error.h>
00036 #include <drizzled/internal/my_sys.h>
00037 #include <drizzled/charset.h>
00038 #include <drizzled/gettext.h>
00039 #include <drizzled/errmsg_print.h>
00040
00041 #include <fcntl.h>
00042 #include <errno.h>
00043
00044 #include "transaction_log.h"
00045 #include "print_transaction_message.h"
00046
00047 #include <drizzled/message/transaction.pb.h>
00048 #include <drizzled/replication_services.h>
00049 #include <google/protobuf/io/zero_copy_stream.h>
00050 #include <google/protobuf/io/zero_copy_stream_impl.h>
00051 #include <google/protobuf/io/coded_stream.h>
00052 #include <google/protobuf/text_format.h>
00053
00054 using namespace std;
00055 using namespace drizzled;
00056 using namespace google;
00057
00059 extern TransactionLog *transaction_log;
00060
00061 plugin::Create_function<PrintTransactionMessageFunction> *print_transaction_message_func_factory= NULL;
00062
00063 void PrintTransactionMessageFunction::fix_length_and_dec()
00064 {
00065 max_length= 2 * 1024 * 1024;
00066 args[0]->collation.set(
00067 get_charset_by_csname(args[0]->collation.collation->csname,
00068 MY_CS_BINSORT), DERIVATION_COERCIBLE);
00069 }
00070
00071 String *PrintTransactionMessageFunction::val_str(String *str)
00072 {
00073 assert(fixed == true);
00074
00075 String *filename_arg= args[0]->val_str(str);
00076 off_t offset_arg= static_cast<int64_t>(args[1]->val_int());
00077
00078 if (filename_arg == NULL || args[1]->null_value == true)
00079 {
00080 my_error(ER_INVALID_NULL_ARGUMENT, MYF(0), func_name());
00081 null_value= true;
00082 return NULL;
00083 }
00084
00085 if (transaction_log == NULL)
00086 {
00087 my_error(ER_INVALID_NULL_ARGUMENT, MYF(0), func_name());
00088 null_value= true;
00089 return NULL;
00090 }
00091
00092 null_value= false;
00093
00094 message::Transaction transaction_message;
00095
00102 const string &filename= transaction_log->getLogFilename();
00103 int log_file= open(filename.c_str(), O_RDONLY);
00104 if (log_file == -1)
00105 {
00106 sql_perror(_("Failed to open transaction log file"), filename);
00107 null_value= true;
00108
00109 return NULL;
00110 }
00111
00112 (void) lseek(log_file, offset_arg, SEEK_SET);
00113
00114 protobuf::io::FileInputStream *file_input= new protobuf::io::FileInputStream(log_file);
00115 file_input->SetCloseOnDelete(true);
00116
00117 protobuf::io::CodedInputStream *coded_input= new protobuf::io::CodedInputStream(file_input);
00118
00119
00120 uint32_t message_type;
00121 if (! coded_input->ReadLittleEndian32(&message_type))
00122 {
00123 delete coded_input;
00124 delete file_input;
00125
00127 null_value= true;
00128 return NULL;
00129 }
00130
00131
00132 if (message_type != ReplicationServices::TRANSACTION)
00133 {
00134 fprintf(stderr, _("GPB message is not a valid type.\n"));
00135 delete coded_input;
00136 delete file_input;
00137 null_value= true;
00138 return NULL;
00139 }
00140
00141 uint32_t message_size;
00142 if (! coded_input->ReadLittleEndian32(&message_size))
00143 {
00144 delete coded_input;
00145 delete file_input;
00146
00148 null_value= true;
00149 return NULL;
00150 }
00151
00152 if (message_size > INT_MAX)
00153 {
00154 fprintf(stderr, _("GPB message is not a valid size.\n"));
00155 delete coded_input;
00156 delete file_input;
00157 null_value= true;
00158 return NULL;
00159 }
00160
00161 uint8_t *buffer= (uint8_t *) malloc(message_size);
00162
00163 bool result= coded_input->ReadRaw(buffer, message_size);
00164 if (result == false)
00165 {
00166
00167 sql_perror(_("Could not read transaction message. Raw buffer read "), std::string((const char *)buffer, std::min(message_size, 120U)));
00168 }
00169
00170 result= transaction_message.ParseFromArray(buffer, static_cast<int32_t>(message_size));
00171 if (result == false)
00172 {
00173 fprintf(stderr, _("Unable to parse transaction. Got error: %s.\n"), transaction_message.InitializationErrorString().c_str());
00174 if (buffer != NULL)
00175 fprintf(stderr, _("BUFFER: %s\n"), buffer);
00176 }
00177
00178 free(buffer);
00179
00180 string transaction_text;
00181 protobuf::TextFormat::PrintToString(transaction_message, &transaction_text);
00182
00183 if (str->alloc(transaction_text.length()))
00184 {
00185 delete coded_input;
00186 delete file_input;
00187 null_value= true;
00188 return NULL;
00189 }
00190
00191 str->length(transaction_text.length());
00192
00193 strncpy(str->ptr(), transaction_text.c_str(), transaction_text.length());
00194
00195 delete coded_input;
00196 delete file_input;
00197
00198 return str;
00199 }