Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00044 #include <config.h>
00045
00046 #include <fcntl.h>
00047
00048 #include <climits>
00049 #include <cerrno>
00050 #include <cstdio>
00051
00052 #include "transaction_log_reader.h"
00053 #include "transaction_log.h"
00054
00055 #include <drizzled/gettext.h>
00056 #include <drizzled/message/transaction.pb.h>
00057
00058 #include <google/protobuf/io/zero_copy_stream_impl.h>
00059 #include <google/protobuf/io/coded_stream.h>
00060 #include <drizzled/algorithm/crc32.h>
00061 #include <drizzled/errmsg_print.h>
00062 #include <drizzled/definitions.h>
00063
00064 using namespace std;
00065 using namespace drizzled;
00066 using namespace google;
00067
00068 bool TransactionLogReader::read(const ReplicationServices::GlobalTransactionId &to_read_trx_id,
00069 message::Transaction *to_fill)
00070 {
00071
00072
00073
00074
00075
00076
00077
00078 string log_filename_to_read;
00079 bool log_file_found= log.findLogFilenameContainingTransactionId(to_read_trx_id, log_filename_to_read);
00080 bool result= true;
00081 bool do_checksum= false;
00082
00083 if (unlikely(! log_file_found))
00084 {
00085 return false;
00086 }
00087 else
00088 {
00089
00090 int log_file= open(log_filename_to_read.c_str(), O_RDONLY | O_NONBLOCK);
00091
00092 if (log_file == -1)
00093 {
00094 sql_perror(_("Failed to open transaction log file"), log_filename_to_read);
00095 return false;
00096 }
00097
00098 protobuf::io::ZeroCopyInputStream *raw_input= new protobuf::io::FileInputStream(log_file);
00099 protobuf::io::CodedInputStream *coded_input= new protobuf::io::CodedInputStream(raw_input);
00100
00101 char *buffer= NULL;
00102 char *temp_buffer= NULL;
00103 uint32_t length= 0;
00104 uint32_t previous_length= 0;
00105 uint32_t checksum= 0;
00106
00107 message::Transaction transaction;
00108
00109
00110 while (result == true && coded_input->ReadLittleEndian32(&length) == true)
00111 {
00112 if (length > INT_MAX)
00113 {
00114 fprintf(stderr, _("Attempted to read record bigger than INT_MAX\n"));
00115 exit(1);
00116 }
00117
00118 if (buffer == NULL)
00119 {
00120
00121
00122
00123
00124 temp_buffer= (char *) malloc(static_cast<size_t>(length));
00125 }
00126
00127 else if (length > previous_length)
00128 {
00129 temp_buffer= (char *) realloc(buffer, static_cast<size_t>(length));
00130 }
00131
00132 if (temp_buffer == NULL)
00133 {
00134 fprintf(stderr, _("Memory allocation failure trying to allocate %" PRIu64 " bytes.\n"),
00135 static_cast<uint64_t>(length));
00136 break;
00137 }
00138 else
00139 buffer= temp_buffer;
00140
00141
00142 result= coded_input->ReadRaw(buffer, length);
00143 if (result == false)
00144 {
00145 char errmsg[STRERROR_MAX];
00146 strerror_r(errno, errmsg, sizeof(errmsg));
00147 fprintf(stderr, _("Could not read transaction message.\n"));
00148 fprintf(stderr, _("GPB ERROR: %s.\n"), errmsg);
00149 fprintf(stderr, _("Raw buffer read: %s.\n"), buffer);
00150 break;
00151 }
00152
00153 result= transaction.ParseFromArray(buffer, static_cast<int32_t>(length));
00154 if (result == false)
00155 {
00156 fprintf(stderr, _("Unable to parse transaction. Got error: %s.\n"), transaction.InitializationErrorString().c_str());
00157 if (buffer != NULL)
00158 fprintf(stderr, _("BUFFER: %s\n"), buffer);
00159 break;
00160 }
00161
00162
00163 coded_input->ReadLittleEndian32(&checksum);
00164
00165 if (do_checksum)
00166 {
00167 if (checksum != drizzled::algorithm::crc32(buffer, static_cast<size_t>(length)))
00168 {
00169 fprintf(stderr, _("Checksum failed. Wanted %" PRIu32 " got %" PRIu32 "\n"), checksum, drizzled::algorithm::crc32(buffer, static_cast<size_t>(length)));
00170 }
00171 }
00172
00173
00174 if (transaction.transaction_context().transaction_id() == to_read_trx_id)
00175 {
00176
00177 to_fill->CopyFrom(transaction);
00178 break;
00179 }
00180
00181 previous_length= length;
00182 }
00183 if (buffer)
00184 free(buffer);
00185
00186 delete coded_input;
00187 delete raw_input;
00188
00189 return result;
00190 }
00191 }