00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include <config.h>
00022 #include <plugin/slave/replication_slave.h>
00023 #include <drizzled/program_options/config_file.h>
00024 #include <drizzled/errmsg_print.h>
00025 #include <boost/program_options.hpp>
00026 #include <fstream>
00027
00028 using namespace std;
00029 using namespace drizzled;
00030
00031 namespace po= boost::program_options;
00032
00033 namespace slave
00034 {
00035
00036
00037 void ReplicationSlave::startup(Session &session)
00038 {
00039 (void)session;
00040 if (not initWithConfig())
00041 {
00042 errmsg_printf(error::ERROR,
00043 _("Could not start slave services: %s\n"),
00044 getError().c_str());
00045 }
00046 else
00047 {
00048 _consumer_thread= boost::thread(&QueueConsumer::run, &_consumer);
00049 _producer_thread= boost::thread(&QueueProducer::run, &_producer);
00050 }
00051 }
00052
00053 bool ReplicationSlave::initWithConfig()
00054 {
00055 po::variables_map vm;
00056 po::options_description slave_options("Options for the slave plugin");
00057
00058 slave_options.add_options()
00059 ("master-host", po::value<string>()->default_value(""))
00060 ("master-port", po::value<uint16_t>()->default_value(3306))
00061 ("master-user", po::value<string>()->default_value(""))
00062 ("master-pass", po::value<string>()->default_value(""))
00063 ("max-reconnects", po::value<uint32_t>()->default_value(10))
00064 ("seconds-between-reconnects", po::value<uint32_t>()->default_value(30))
00065 ("io-thread-sleep", po::value<uint32_t>()->default_value(5))
00066 ("applier-thread-sleep", po::value<uint32_t>()->default_value(5));
00067
00068 ifstream cf_stream(_config_file.c_str());
00069
00070 if (not cf_stream.is_open())
00071 {
00072 _error= "Unable to open file ";
00073 _error.append(_config_file);
00074 return false;
00075 }
00076
00077 po::store(drizzled::program_options::parse_config_file(cf_stream, slave_options), vm);
00078
00079 po::notify(vm);
00080
00081 if (vm.count("master-host"))
00082 _producer.setMasterHost(vm["master-host"].as<string>());
00083
00084 if (vm.count("master-port"))
00085 _producer.setMasterPort(vm["master-port"].as<uint16_t>());
00086
00087 if (vm.count("master-user"))
00088 _producer.setMasterUser(vm["master-user"].as<string>());
00089
00090 if (vm.count("master-pass"))
00091 _producer.setMasterPassword(vm["master-pass"].as<string>());
00092
00093 if (vm.count("max-reconnects"))
00094 _producer.setMaxReconnectAttempts(vm["max-reconnects"].as<uint32_t>());
00095
00096 if (vm.count("seconds-between-reconnects"))
00097 _producer.setSecondsBetweenReconnects(vm["seconds-between-reconnects"].as<uint32_t>());
00098
00099 if (vm.count("io-thread-sleep"))
00100 _producer.setSleepInterval(vm["io-thread-sleep"].as<uint32_t>());
00101
00102 if (vm.count("applier-thread-sleep"))
00103 _consumer.setSleepInterval(vm["applier-thread-sleep"].as<uint32_t>());
00104
00105
00106 ReplicationSchema rs;
00107 if (not rs.create())
00108 {
00109 _error= rs.getErrorMessage();
00110 return false;
00111 }
00112
00113 if (_initial_max_commit_id)
00114 {
00115 if (not rs.setInitialMaxCommitId(_initial_max_commit_id))
00116 {
00117 _error= rs.getErrorMessage();
00118 return false;
00119 }
00120 }
00121
00122 return true;
00123 }
00124
00125 }