00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #pragma once
00022
00023 #include <plugin/slave/queue_thread.h>
00024 #include <plugin/slave/sql_executor.h>
00025 #include <drizzled/session.h>
00026
00027 namespace drizzled
00028 {
00029 namespace message
00030 {
00031 class Transaction;
00032 }
00033 }
00034
00035 namespace slave
00036 {
00037
00038 class QueueConsumer : public QueueThread, public SQLExecutor
00039 {
00040 public:
00041 QueueConsumer() :
00042 QueueThread(),
00043 SQLExecutor("slave", "replication"),
00044 _check_interval(5)
00045 { }
00046
00047 bool init();
00048 bool process();
00049 void shutdown();
00050
00051 void setSleepInterval(uint32_t seconds)
00052 {
00053 _check_interval= seconds;
00054 }
00055
00056 uint32_t getSleepInterval()
00057 {
00058 return _check_interval;
00059 }
00060
00067 void setApplierState(const std::string &err_msg, bool status);
00068
00069 private:
00070 typedef std::vector<uint64_t> TrxIdList;
00071
00073 uint32_t _check_interval;
00074
00075 bool getListOfCompletedTransactions(TrxIdList &list);
00076
00077 bool getMessage(drizzled::message::Transaction &transaction,
00078 std::string &commit_id,
00079 uint64_t trx_id,
00080 uint32_t segment_id);
00081
00092 bool convertToSQL(const drizzled::message::Transaction &transaction,
00093 std::vector<std::string> &aggregate_sql,
00094 std::vector<std::string> &segmented_sql);
00095
00105 bool executeSQLWithCommitId(std::vector<std::string> &sql,
00106 const std::string &commit_id);
00107
00116 bool deleteFromQueue(uint64_t trx_id);
00117
00124 bool isEndStatement(const drizzled::message::Statement &statement);
00125 };
00126
00127 }
00128