Drizzled Public API Documentation

rabbitmq_handler.cc
00001 /* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
00002  *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
00003  *
00004  *  Copyright (C) 2010 Marcus Eriksson
00005  *
00006  *  Authors:
00007  *
00008  *  Marcus Eriksson <krummas@gmail.com>
00009  *
00010  *  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version.
00014  *
00015  *  This program is distributed in the hope that it will be useful,
00016  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018  *  GNU General Public License for more details.
00019  *
00020  *  You should have received a copy of the GNU General Public License
00021  *  along with this program; if not, write to the Free Software
00022  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00023  */
00024 
00025 #include <config.h>
00026 
00027 #include <drizzled/gettext.h>
00028 
00029 #include "rabbitmq_handler.h"
00030 
00031 using namespace std;
00032 
00033 namespace drizzle_plugin
00034 {
00035 
00036 RabbitMQHandler::RabbitMQHandler(const std::string &rabbitMQHost, 
00037                                  const in_port_t rabbitMQPort, 
00038                                  const std::string &rabbitMQUsername, 
00039                                  const std::string &rabbitMQPassword, 
00040                                  const std::string &rabbitMQVirtualhost) 
00041   throw(rabbitmq_handler_exception) :
00042     rabbitmqConnection(amqp_new_connection()),
00043     sockfd(amqp_open_socket(rabbitMQHost.c_str(), rabbitMQPort)),
00044     hostname(rabbitMQHost),
00045     port(rabbitMQPort),
00046     username(rabbitMQUsername),
00047     password(rabbitMQPassword),
00048     virtualhost(rabbitMQVirtualhost)
00049 {
00050   /* open the socket to the rabbitmq server */
00051   if(sockfd < 0) 
00052   {
00053     throw rabbitmq_handler_exception(_("Could not open socket, is rabbitmq running?"));
00054   }
00055   amqp_set_sockfd(rabbitmqConnection, sockfd);
00056   /* login to rabbitmq, handleAMQPError throws exception if there is a problem */
00057   handleAMQPError(amqp_login(rabbitmqConnection, 
00058                              virtualhost.c_str(), 
00059                              0, 
00060                              131072, 
00061                              0, 
00062                              AMQP_SASL_METHOD_PLAIN, 
00063                              username.c_str(), 
00064                              password.c_str()), 
00065                   "rabbitmq login");
00066   /* open the channel */
00067   amqp_channel_open(rabbitmqConnection, 1);
00068 }
00069 
00070 RabbitMQHandler::~RabbitMQHandler()
00071 {
00072   try
00073   {
00074     handleAMQPError(amqp_channel_close(rabbitmqConnection, 
00075                1, 
00076                AMQP_REPLY_SUCCESS),
00077         "close channel");
00078     handleAMQPError(amqp_connection_close(rabbitmqConnection, 
00079             AMQP_REPLY_SUCCESS),
00080         "close connection");
00081     amqp_destroy_connection(rabbitmqConnection);
00082   }
00083   catch(exception& e) {} // do not throw in destructorn 
00084   
00085   close(sockfd);
00086 }
00087 
00088 void RabbitMQHandler::publish(void *message, 
00089                               const int length, 
00090                               const std::string &exchangeName, 
00091                               const std::string &routingKey)
00092 throw(rabbitmq_handler_exception)
00093 {
00094   amqp_bytes_t b;
00095   b.bytes= message;
00096   b.len= length;
00097   
00098   if (amqp_basic_publish(rabbitmqConnection,
00099                          1,
00100                          amqp_cstring_bytes(exchangeName.c_str()),
00101                          amqp_cstring_bytes(routingKey.c_str()),
00102                          0,
00103                          0,
00104                          NULL,
00105                          b) < 0)
00106   {
00107     throw rabbitmq_handler_exception("Could not publish message");
00108   }
00109 
00110 }
00111 
00112 void RabbitMQHandler::handleAMQPError(amqp_rpc_reply_t x, string context) throw(rabbitmq_handler_exception)
00113 {
00114   string errorMessage("");
00115   switch (x.reply_type) {
00116   case AMQP_RESPONSE_NORMAL:
00117     break;
00118   case AMQP_RESPONSE_NONE:
00119     errorMessage.assign("No response in ");
00120     errorMessage.append(context);
00121     throw rabbitmq_handler_exception(errorMessage);
00122   case AMQP_RESPONSE_LIBRARY_EXCEPTION:
00123   case AMQP_RESPONSE_SERVER_EXCEPTION:
00124     switch (x.reply.id) {      
00125     case AMQP_CONNECTION_CLOSE_METHOD:
00126       errorMessage.assign("Connection closed in ");
00127       errorMessage.append(context);
00128       throw rabbitmq_handler_exception(errorMessage);
00129     case AMQP_CHANNEL_CLOSE_METHOD:
00130       errorMessage.assign("Channel closed in ");
00131       errorMessage.append(context);
00132       throw rabbitmq_handler_exception(errorMessage);
00133     default:
00134       errorMessage.assign("Unknown error in ");
00135       errorMessage.append(context);
00136       throw rabbitmq_handler_exception(errorMessage);
00137     }
00138   }
00139 }
00140 
00141 } /* namespace drizzle_plugin */