26 #include "rabbitmq_log.h"
27 #include <drizzled/message/transaction.pb.h>
28 #include <google/protobuf/io/coded_stream.h>
30 #include <drizzled/module/registry.h>
31 #include <drizzled/plugin.h>
33 #include "rabbitmq_handler.h"
34 #include <boost/program_options.hpp>
37 namespace po= boost::program_options;
40 using namespace drizzled;
41 using namespace google;
43 namespace drizzle_plugin
// Constructor: forwards `name` to the plugin::TransactionApplier base and
// stores `mqHandler` in _rabbitMQHandler. The destructor deletes that member,
// so this object ends up responsible for the handler it is given.
// NOTE(review): the declaration of the `mqHandler` parameter and the
// constructor body are not visible in this excerpt.
52 RabbitMQLog::RabbitMQLog(
const string &name,
54 plugin::TransactionApplier(name),
55 _rabbitMQHandler(mqHandler)
// Destructor: disconnects the handler, then deletes it.
// NOTE(review): disconnect() is called from a destructor; confirm it cannot
// throw, since an exception escaping here would terminate the program.
58 RabbitMQLog::~RabbitMQLog()
60 _rabbitMQHandler->disconnect();
61 delete _rabbitMQHandler;
// Serializes `to_apply` into a heap-allocated byte buffer and publishes it
// through _rabbitMQHandler, retrying while `!sent && tries > 0`.
// Returns plugin::SUCCESS once `sent` is true; otherwise logs an error and
// returns plugin::UNKNOWN_ERROR.
// NOTE(review): the function name and parameter list, the declarations and
// updates of `sent` and `tries`, and the surrounding braces/try headers are
// not visible in this excerpt.
64 plugin::ReplicationReturnCode
// ByteSize() yields the serialized length; protobuf also caches the sizes
// that the SerializeWithCachedSizesToArray() call below depends on.
67 size_t message_byte_length= to_apply.ByteSize();
68 uint8_t* buffer=
new uint8_t[message_byte_length];
// Allocation-failure path.
// NOTE(review): the guarding condition is not visible. Plain `new[]` reports
// failure by throwing std::bad_alloc rather than returning null, so if the
// guard is a null check this branch can never run — confirm.
71 errmsg_printf(error::ERROR, _(
"Failed to allocate enough memory to transaction message\n"));
73 return plugin::UNKNOWN_ERROR;
// Writes the message into `buffer` using the sizes cached by ByteSize().
76 to_apply.SerializeWithCachedSizesToArray(buffer);
// Retry loop: keep attempting to publish until it succeeds or the attempts
// run out.
79 while (!sent && tries > 0) {
// The length is narrowed from size_t to int for the handler's publish() call.
83 _rabbitMQHandler->
publish(buffer,
int(message_byte_length));
// Publish failed: log the exception text, then try to re-establish the
// connection before the next attempt.
88 errmsg_printf(error::ERROR,
"%s", e.what());
90 _rabbitMQHandler->reconnect();
91 }
// Presumably reached when reconnect() itself throws.
// NOTE(review): the message announces a 10 second wait; the wait itself is
// not visible in this excerpt.
catch(exception &e) {
92 errmsg_printf(error::ERROR, _(
"Could not reconnect, trying again.. - waiting 10 seconds for server to come back"));
// NOTE(review): no `delete[] buffer` is visible in this excerpt; confirm the
// buffer is released on every return path.
99 if(sent)
return plugin::SUCCESS;
// All attempts failed: report the failure to the caller.
100 errmsg_printf(error::ERROR, _(
"RabbitMQ server has disappeared, failing transaction."));
102 return plugin::UNKNOWN_ERROR;
// Plugin initialization (referenced as drizzle_plugin::init in the plugin
// declaration). The start of the function is not visible in this excerpt.
// The visible part passes the port and the string options read from `vm` as
// arguments to a call whose head is not shown — presumably the
// RabbitMQHandler constructor; confirm.
121 sysvar_rabbitmq_port,
122 vm[
"username"].as<string>(),
123 vm[
"password"].as<string>(),
124 vm[
"virtualhost"].as<string>(),
125 vm[
"exchange"].as<string>(),
126 vm[
"routingkey"].as<string>());
// Error path for handler creation; the %s argument and the statements that
// follow the log call are not visible here.
130 errmsg_printf(error::ERROR, _(
"Failed to allocate the RabbitMQHandler. Got error: %s\n"),
// Creates the applier named "rabbitmq_applier" around the handler.
136 rabbitmqLogger=
new RabbitMQLog(
"rabbitmq_applier", rabbitmqHandler);
// Error path for applier creation; the %s argument is not visible here.
140 errmsg_printf(error::ERROR, _(
"Failed to allocate the RabbitMQLog instance. Got error: %s\n"),
// Hands the applier to the plugin context.
145 context.add(rabbitmqLogger);
// Option registration (referenced as drizzle_plugin::init_options in the
// plugin declaration). Each entry supplies a value type, a default and a help
// string. The start of the function and several option-name lines are not
// visible in this excerpt; those entries are identified below by their help
// text only.
// Host entry: defaults to "localhost".
163 po::value<string>()->default_value(
"localhost"),
164 _(
"Host name to connect to"));
// Port entry: stored into sysvar_rabbitmq_port, defaults to 5672.
166 po::value<port_constraint>(&sysvar_rabbitmq_port)->default_value(5672),
167 _(
"Port to connect to"));
// "virtualhost": defaults to "/".
168 context(
"virtualhost",
169 po::value<string>()->default_value(
"/"),
170 _(
"RabbitMQ virtualhost"));
// Username entry: defaults to "guest".
172 po::value<string>()->default_value(
"guest"),
173 _(
"RabbitMQ username"));
// Password entry: defaults to "guest".
175 po::value<string>()->default_value(
"guest"),
176 _(
"RabbitMQ password"));
// "use-replicator": defaults to "default_replicator".
177 context(
"use-replicator",
178 po::value<string>()->default_value(
"default_replicator"),
179 _(
"Name of the replicator plugin to use (default='default_replicator')"));
// Exchange entry: defaults to "ReplicationExchange".
181 po::value<string>()->default_value(
"ReplicationExchange"),
182 _(
"Name of RabbitMQ exchange to publish to"));
// "routingkey": defaults to "ReplicationRoutingKey".
183 context(
"routingkey",
184 po::value<string>()->default_value(
"ReplicationRoutingKey"),
185 _(
"Name of RabbitMQ routing key to use"));
// Plugin declaration: supplies the plugin's description string and names
// drizzle_plugin::init and drizzle_plugin::init_options as its entries.
// NOTE(review): the remaining fields of the declaration are not visible in
// this excerpt.
190 DRIZZLE_DECLARE_PLUGIN
196 N_(
"Publishes transactions to RabbitMQ"),
198 drizzle_plugin::init,
200 drizzle_plugin::init_options
202 DRIZZLE_DECLARE_PLUGIN_END;