26 #include "zeromq_log.h"
27 #include <drizzled/message/transaction.pb.h>
28 #include <google/protobuf/io/coded_stream.h>
30 #include <drizzled/module/registry.h>
31 #include <drizzled/plugin.h>
33 #include <boost/program_options.hpp>
37 namespace po= boost::program_options;
40 using namespace drizzled;
41 using namespace google;
43 namespace drizzle_plugin
// Constructs the applier.
//
// Forwards `name` to the plugin::TransactionApplier base, creates a ZeroMQ
// context and a ZMQ_PUB socket on it, binds that socket to `endpoint`, and
// initializes publishLock with default attributes.
//
// @param name      name passed to the plugin::TransactionApplier base
// @param endpoint  address string handed to zmq_bind()
46 ZeroMQLog::ZeroMQLog(
const string &name,
const string &endpoint) :
47 plugin::TransactionApplier(name)
// NOTE(review): `context` is held only in this local in the visible lines;
// confirm whether it is ever terminated.
49 void *context= zmq_init(1);
50 _socket= zmq_socket (context, ZMQ_PUB);
// NOTE(review): no check of `rc` is visible in this excerpt (the line after
// the bind is not shown) - confirm a failed bind is detected.
52 int rc= zmq_bind (_socket, endpoint.c_str());
54 pthread_mutex_init(&publishLock, NULL);
// Destructor: destroys publishLock.
// NOTE(review): the excerpt skips a line between the signature and the mutex
// destroy; no release of _socket is visible here - confirm it is closed.
57 ZeroMQLog::~ZeroMQLog()
60 pthread_mutex_destroy(&publishLock);
// Publishes one transaction message on _socket as two ZeroMQ frames: the
// schema name from getSchemaName(to_apply), followed by the serialized
// bytes of to_apply.
//
// Returns plugin::SUCCESS at the end of the visible path, or
// plugin::UNKNOWN_ERROR after logging an allocation-failure message.
// NOTE(review): the function name/parameter line and the declarations of
// `schemamsg` and `msg` are not shown in this excerpt.
63 plugin::ReplicationReturnCode
// Size of the serialized message; also the size of the scratch buffer below.
66 size_t message_byte_length= to_apply.ByteSize();
67 uint8_t* buffer=
new uint8_t[message_byte_length];
// NOTE(review): the condition guarding this error path is on a line not
// shown; if it is a NULL check, note that a plain `new[]` reports failure by
// throwing rather than returning NULL - confirm.
70 errmsg_printf(error::ERROR, _(
"Failed to allocate enough memory to transaction message\n"));
72 return plugin::UNKNOWN_ERROR;
// First frame: the schema name bytes, copied without a terminating NUL
// (schema.length() bytes).
75 string schema= getSchemaName(to_apply);
77 int rc= zmq_msg_init_size(&schemamsg, schema.length());
78 memcpy(zmq_msg_data(&schemamsg), schema.c_str(), schema.length());
// Second frame: serialize into the scratch buffer, then copy into a ZeroMQ
// message of the same size.
80 to_apply.SerializeWithCachedSizesToArray(buffer);
82 rc= zmq_msg_init_size(&msg, message_byte_length);
84 memcpy(zmq_msg_data(&msg), buffer, message_byte_length);
// Both sends happen while holding publishLock; the first is flagged
// ZMQ_SNDMORE and the second is not - presumably so the two frames go out as
// one multipart message without interleaving from other callers.
// NOTE(review): `rc` is overwritten by each call and no check of it is
// visible in this excerpt.
87 pthread_mutex_lock(&publishLock);
88 rc= zmq_send(_socket, &schemamsg, ZMQ_SNDMORE);
89 rc= zmq_send(_socket, &msg, 0);
90 pthread_mutex_unlock(&publishLock);
// NOTE(review): the lines immediately before and after this close are not
// shown; no zmq_msg_close(&msg) and no delete[] of `buffer` are visible -
// confirm both are released before returning.
93 zmq_msg_close(&schemamsg);
95 return plugin::SUCCESS;
// Body of the schema-name lookup for a transaction `txn` (the signature is on
// lines not shown in this excerpt).
//
// Returns "" when the transaction has no statements; otherwise returns the
// schema name taken from the statement's type-specific header/metadata.
99 if(txn.statement_size() == 0)
return "";
// NOTE(review): `statement` is defined on lines not shown here; confirm which
// statement of `txn` it refers to. Handling of statement types other than the
// eight cases below is also not visible in this excerpt.
103 switch(statement.type())
// Row-level statements: schema name comes from the header's table metadata.
105 case message::Statement::INSERT:
106 return statement.insert_header().table_metadata().schema_name();
107 case message::Statement::UPDATE:
108 return statement.update_header().table_metadata().schema_name();
109 case message::Statement::DELETE:
110 return statement.delete_header().table_metadata().schema_name();
// Table-level DDL: CREATE_TABLE reads table().schema(); TRUNCATE/DROP read
// table_metadata().schema_name().
111 case message::Statement::CREATE_TABLE:
112 return statement.create_table_statement().table().schema();
113 case message::Statement::TRUNCATE_TABLE:
114 return statement.truncate_table_statement().table_metadata().schema_name();
115 case message::Statement::DROP_TABLE:
116 return statement.drop_table_statement().table_metadata().schema_name();
// Schema-level DDL: the schema being created or dropped.
117 case message::Statement::CREATE_SCHEMA:
118 return statement.create_schema_statement().schema().name();
119 case message::Statement::DROP_SCHEMA:
120 return statement.drop_schema_statement().schema_name();
// File-local pointer to the ZeroMQLog instance; assigned from
// `new ZeroMQLog(...)` in the plugin initialization code in this file.
126 static ZeroMQLog *zeromqLogger;
// Plugin initialization body (the enclosing signature is not shown in this
// excerpt): creates the applier named "zeromq_applier", bound to the string
// value of the "endpoint" option read from `vm`, stores it in zeromqLogger,
// and adds it to `context`.
134 zeromqLogger=
new ZeroMQLog(
"zeromq_applier", vm[
"endpoint"].as<string>());
135 context.add(zeromqLogger);
// Option registration body (the enclosing signature and the start of the
// first registration call are not shown in this excerpt).
//
// String option defaulting to "tcp://*:9999" with help text "End point to
// bind to". NOTE(review): its option name is on a line not shown; presumably
// "endpoint", since the initialization code reads vm["endpoint"] - confirm.
145 po::value<string>()->default_value(
"tcp://*:9999"),
146 _(
"End point to bind to"));
// String option "use-replicator", defaulting to "default_replicator".
147 context(
"use-replicator",
148 po::value<string>()->default_value(
"default_replicator"),
149 _(
"Name of the replicator plugin to use (default='default_replicator')"));
// Plugin declaration. The visible fields are the description string, the
// initialization function drizzle_plugin::init, and the option-registration
// function drizzle_plugin::init_options; the remaining fields are on lines
// not shown in this excerpt.
155 DRIZZLE_DECLARE_PLUGIN
161 N_(
"Publishes transactions to ZeroMQ"),
163 drizzle_plugin::init,
165 drizzle_plugin::init_options,
167 DRIZZLE_DECLARE_PLUGIN_END;