Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00045 #include <config.h>
00046 #include "write_buffer.h"
00047 #include "transaction_log.h"
00048 #include "transaction_log_applier.h"
00049 #include "transaction_log_index.h"
00050
00051 #include <vector>
00052
00053 #include <drizzled/message/transaction.pb.h>
00054 #include <drizzled/util/functors.h>
00055 #include <drizzled/session.h>
00056
00057 using namespace std;
00058 using namespace drizzled;
00059
/*
 * File-scope pointer to a TransactionLogApplier, initialized to NULL.
 * It is not assigned anywhere in the code visible here; presumably the
 * plugin's initialization code elsewhere sets it -- TODO confirm.
 */
00060 TransactionLogApplier *transaction_log_applier= NULL;
00061
00062 TransactionLogApplier::TransactionLogApplier(const string name_arg,
00063 TransactionLog *in_transaction_log,
00064 TransactionLogIndex *in_transaction_log_index,
00065 uint32_t in_num_write_buffers) :
00066 plugin::TransactionApplier(name_arg),
00067 transaction_log(in_transaction_log),
00068 transaction_log_index(in_transaction_log_index),
00069 num_write_buffers(in_num_write_buffers),
00070 write_buffers()
00071 {
00072
00073
00074
00075 write_buffers.reserve(num_write_buffers);
00076 for (size_t x= 0; x < num_write_buffers; ++x)
00077 {
00078 write_buffers.push_back(new WriteBuffer());
00079 }
00080 }
00081
00082 TransactionLogApplier::~TransactionLogApplier()
00083 {
00084 for_each(write_buffers.begin(),
00085 write_buffers.end(),
00086 DeletePtr());
00087 write_buffers.clear();
00088 delete transaction_log;
00089 delete transaction_log_index;
00090 }
00091
00092 WriteBuffer *TransactionLogApplier::getWriteBuffer(const Session &session)
00093 {
00094 return write_buffers[session.getSessionId() % num_write_buffers];
00095 }
00096
00097 plugin::ReplicationReturnCode
00098 TransactionLogApplier::apply(Session &in_session,
00099 const message::Transaction &to_apply)
00100 {
00101 size_t entry_size= TransactionLog::getLogEntrySize(to_apply);
00102 WriteBuffer *write_buffer= getWriteBuffer(in_session);
00103
00104 uint32_t checksum;
00105
00106 write_buffer->lock();
00107 write_buffer->resize(entry_size);
00108 uint8_t *bytes= write_buffer->getRawBytes();
00109 bytes= transaction_log->packTransactionIntoLogEntry(to_apply,
00110 bytes,
00111 &checksum);
00112
00113 off_t written_to= transaction_log->writeEntry(bytes, entry_size);
00114 write_buffer->unlock();
00115
00116
00117 transaction_log_index->addEntry(TransactionLogEntry(ReplicationServices::TRANSACTION,
00118 written_to,
00119 entry_size),
00120 to_apply,
00121 checksum);
00122 return plugin::SUCCESS;
00123 }