23 #include <HugoTransactions.hpp> 
   27 #define BATCH_SIZE 128 
   44 static char* event_name(uint etype, 
char * 
buf)
 
   48     strcpy(buf, 
"TE_INSERT");
 
   51     strcpy(buf, 
"TE_DELETE");
 
   54     strcpy(buf, 
"TE_UPDATE");
 
   57     strcpy(buf, 
"TE_CLUSTER_FAILURE");
 
   60     strcpy(buf, 
"TE_ALTER");
 
   63     strcpy(buf, 
"TE_DROP");
 
   66     strcpy(buf, 
"TE_NODE_FAILURE");
 
   69     strcpy(buf, 
"TE_SUBSCRIBE");
 
   72     strcpy(buf, 
"TE_UNSUBSCRIBE");
 
   75     strcpy(buf, 
"unknown");
 
   80 static void do_begin(
Ndb *ndb, 
struct Trans_arg &trans_arg)
 
   84   trans_arg.bytes_batched = 0;
 
   94   for (i= 0; i < n_columns; i++)
 
   97         op->
equal(i, ev[i]->aRef()))
 
  111   for (i= 0; i < n_columns; i++)
 
  123   if (!trans_arg.trans)
 
  131   do_set_value(op, pOp);
 
  133   trans_arg.bytes_batched++;
 
  134   if (trans_arg.bytes_batched > BATCH_SIZE)
 
  137     trans_arg.bytes_batched = 0; 
 
  142   if (!trans_arg.trans)
 
  150   do_set_value(op, pOp);
 
  152   trans_arg.bytes_batched++;
 
  153   if (trans_arg.bytes_batched > BATCH_SIZE)
 
  156     trans_arg.bytes_batched = 0; 
 
  161   if (!trans_arg.trans)
 
  170   trans_arg.bytes_batched++;
 
  171   if (trans_arg.bytes_batched > BATCH_SIZE)
 
  174     trans_arg.bytes_batched = 0; 
 
  177 static void do_commit(
struct Trans_arg &trans_arg)
 
  179   if (!trans_arg.trans)
 
  186 main(
int argc, 
const char** argv){
 
  192   const char* connectstring1 = 0;
 
  193   const char* connectstring2 = 0;
 
  196     { 
"connectstring1", 
'c',
 
  197       arg_string, &connectstring1, 
"connectstring1", 
"" },
 
  198     { 
"connectstring2", 
'C',
 
  199       arg_string, &connectstring2, 
"connectstring2", 
"" },
 
  200     { 
"database", 
'd', arg_string, &db, 
"Database", 
"" },
 
  201     { 
"usage", 
'?', arg_flag, &_help, 
"Print help", 
"" }
 
  203   int num_args = 
sizeof(args) / 
sizeof(args[0]);
 
  206     "<tabname>+ \nThis program listen to events on specified tables\n";
 
  208   if(getarg(args, num_args, argc, argv, &optind) ||
 
  209      argv[optind] == NULL || _help) {
 
  210     arg_printusage(args, num_args, argv[0], desc);
 
  211     return NDBT_ProgramExit(NDBT_WRONGARGS);
 
  218     return NDBT_ProgramExit(NDBT_FAILED);
 
  220   Ndb MyNdb( &con, db ? db : 
"TEST_DB" );
 
  222   if(MyNdb.init() != 0){
 
  223     ERR(MyNdb.getNdbError());
 
  224     return NDBT_ProgramExit(NDBT_FAILED);
 
  228   while(MyNdb.waitUntilReady() != 0)
 
  229     ndbout << 
"Waiting for ndb to become ready..." << endl;
 
  237     if(con2->
connect(12, 5, 1) != 0)
 
  239       return NDBT_ProgramExit(NDBT_FAILED);
 
  241     ndb2 = 
new Ndb( con2, db ? db : 
"TEST_DB" );
 
  243     if(ndb2->
init() != 0){
 
  245       return NDBT_ProgramExit(NDBT_FAILED);
 
  250       ndbout << 
"Waiting for ndb to become ready..." << endl;
 
  259   for(i= optind; i<argc; i++)
 
  264       ndbout_c(
"Could not find table: %s, skipping", argv[i]);
 
  269     name.
appfmt(
"EV-%s", argv[i]);
 
  277                        (NdbDictionary::Event::ER_UPDATED |
 
  278                         NdbDictionary::Event::ER_DDL));
 
  284         g_info << 
"Event creation failed event exists. Removing...\n";
 
  287           g_err << 
"Failed to drop event: " << myDict->
getNdbError() << endl;
 
  294           g_err << 
"Failed to create event: " << myDict->
getNdbError() << endl;
 
  301         g_err << 
"Failed to create event: " << myDict->
getNdbError() << endl;
 
  307     events.push_back(myEvent);
 
  311       g_err << 
"Event operation creation failed" << endl;
 
  322       event_pre_values[sz].
 
  325     event_ops.push_back(pOp);
 
  329       table_infos.push_back(ti);
 
  331     pOp->setCustomData((
void *)&table_infos[sz]);
 
  335   for(i= 0; i<(int)event_ops.size(); i++)
 
  337     if (event_ops[i]->execute())
 
  339       g_err << 
"operation execution failed: " << event_ops[
i]->getNdbError()
 
  351     while(MyNdb.pollEvents(100) == 0);
 
  356       Uint64 gci= pOp->
getGCI();
 
  357       Uint64 cnt_i= 0, cnt_u= 0, cnt_d= 0;
 
  359         do_begin(ndb2, trans_arg);
 
  367             do_insert(trans_arg, pOp);
 
  372             do_delete(trans_arg, pOp);
 
  377             do_update(trans_arg, pOp);
 
  397           ndbout_c(
"Error: unknown event type: %u", 
 
  401       } 
while ((pOp= MyNdb.nextEvent()) && gci == pOp->
getGCI());
 
  403         do_commit(trans_arg);
 
  404       ndbout_c(
"GCI: %u/%u events: %lld(I) %lld(U) %lld(D)",
 
  405                Uint32(gci >> 32), Uint32(gci), cnt_i, cnt_u, cnt_d);
 
  409   for(i= 0; i<(int)event_ops.size(); i++)
 
  410     MyNdb.dropEventOperation(event_ops[i]);
 
  416   return NDBT_ProgramExit(NDBT_OK);