MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
listen.cpp
1 /*
2  Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 
19 #include <NdbOut.hpp>
20 #include <NdbApi.hpp>
21 #include <NdbSleep.h>
22 #include <NDBT.hpp>
23 #include <HugoTransactions.hpp>
24 #include <getarg.h>
25 
26 
27 #define BATCH_SIZE 128
28 struct Table_info
29 {
30  Uint32 id;
31 };
32 
33 struct Trans_arg
34 {
35  Ndb *ndb;
36  NdbTransaction *trans;
37  Uint32 bytes_batched;
38 };
39 
40 Vector< Vector<NdbRecAttr*> > event_values;
41 Vector< Vector<NdbRecAttr*> > event_pre_values;
42 Vector<struct Table_info> table_infos;
43 
44 static char* event_name(uint etype, char * buf)
45 {
46  switch(etype){
48  strcpy(buf, "TE_INSERT");
49  break;
51  strcpy(buf, "TE_DELETE");
52  break;
54  strcpy(buf, "TE_UPDATE");
55  break;
57  strcpy(buf, "TE_CLUSTER_FAILURE");
58  break;
60  strcpy(buf, "TE_ALTER");
61  break;
63  strcpy(buf, "TE_DROP");
64  break;
66  strcpy(buf, "TE_NODE_FAILURE");
67  break;
69  strcpy(buf, "TE_SUBSCRIBE");
70  break;
72  strcpy(buf, "TE_UNSUBSCRIBE");
73  break;
74  default:
75  strcpy(buf, "unknown");
76  }
77  return buf;
78 }
79 
80 static void do_begin(Ndb *ndb, struct Trans_arg &trans_arg)
81 {
82  trans_arg.ndb = ndb;
83  trans_arg.trans = ndb->startTransaction();
84  trans_arg.bytes_batched = 0;
85 }
86 
87 static void do_equal(NdbOperation *op,
88  NdbEventOperation *pOp)
89 {
90  struct Table_info *ti = (struct Table_info *)pOp->getCustomData();
91  Vector<NdbRecAttr*> &ev = event_values[ti->id];
92  const NdbDictionary::Table *tab= pOp->getTable();
93  unsigned i, n_columns = tab->getNoOfColumns();
94  for (i= 0; i < n_columns; i++)
95  {
96  if (tab->getColumn(i)->getPrimaryKey() &&
97  op->equal(i, ev[i]->aRef()))
98  {
99  abort();
100  }
101  }
102 }
103 
104 static void do_set_value(NdbOperation *op,
105  NdbEventOperation *pOp)
106 {
107  struct Table_info *ti = (struct Table_info *)pOp->getCustomData();
108  Vector<NdbRecAttr*> &ev = event_values[ti->id];
109  const NdbDictionary::Table *tab= pOp->getTable();
110  unsigned i, n_columns = tab->getNoOfColumns();
111  for (i= 0; i < n_columns; i++)
112  {
113  if (!tab->getColumn(i)->getPrimaryKey() &&
114  op->setValue(i, ev[i]->aRef()))
115  {
116  abort();
117  }
118  }
119 }
120 
121 static void do_insert(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
122 {
123  if (!trans_arg.trans)
124  return;
125 
126  NdbOperation *op =
127  trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
128  op->writeTuple();
129 
130  do_equal(op, pOp);
131  do_set_value(op, pOp);
132 
133  trans_arg.bytes_batched++;
134  if (trans_arg.bytes_batched > BATCH_SIZE)
135  {
136  trans_arg.trans->execute(NdbTransaction::NoCommit);
137  trans_arg.bytes_batched = 0;
138  }
139 }
140 static void do_update(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
141 {
142  if (!trans_arg.trans)
143  return;
144 
145  NdbOperation *op =
146  trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
147  op->writeTuple();
148 
149  do_equal(op, pOp);
150  do_set_value(op, pOp);
151 
152  trans_arg.bytes_batched++;
153  if (trans_arg.bytes_batched > BATCH_SIZE)
154  {
155  trans_arg.trans->execute(NdbTransaction::NoCommit);
156  trans_arg.bytes_batched = 0;
157  }
158 }
159 static void do_delete(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
160 {
161  if (!trans_arg.trans)
162  return;
163 
164  NdbOperation *op =
165  trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
166  op->deleteTuple();
167 
168  do_equal(op, pOp);
169 
170  trans_arg.bytes_batched++;
171  if (trans_arg.bytes_batched > BATCH_SIZE)
172  {
173  trans_arg.trans->execute(NdbTransaction::NoCommit);
174  trans_arg.bytes_batched = 0;
175  }
176 }
177 static void do_commit(struct Trans_arg &trans_arg)
178 {
179  if (!trans_arg.trans)
180  return;
181  trans_arg.trans->execute(NdbTransaction::Commit);
182  trans_arg.ndb->closeTransaction(trans_arg.trans);
183 }
184 
185 int
186 main(int argc, const char** argv){
187  ndb_init();
188 
189 
190  int _help = 0;
191  const char* db = 0;
192  const char* connectstring1 = 0;
193  const char* connectstring2 = 0;
194 
195  struct getargs args[] = {
196  { "connectstring1", 'c',
197  arg_string, &connectstring1, "connectstring1", "" },
198  { "connectstring2", 'C',
199  arg_string, &connectstring2, "connectstring2", "" },
200  { "database", 'd', arg_string, &db, "Database", "" },
201  { "usage", '?', arg_flag, &_help, "Print help", "" }
202  };
203  int num_args = sizeof(args) / sizeof(args[0]);
204  int optind = 0, i;
205  char desc[] =
206  "<tabname>+ \nThis program listen to events on specified tables\n";
207 
208  if(getarg(args, num_args, argc, argv, &optind) ||
209  argv[optind] == NULL || _help) {
210  arg_printusage(args, num_args, argv[0], desc);
211  return NDBT_ProgramExit(NDBT_WRONGARGS);
212  }
213 
214  // Connect to Ndb
215  Ndb_cluster_connection con(connectstring1);
216  if(con.connect(12, 5, 1) != 0)
217  {
218  return NDBT_ProgramExit(NDBT_FAILED);
219  }
220  Ndb MyNdb( &con, db ? db : "TEST_DB" );
221 
222  if(MyNdb.init() != 0){
223  ERR(MyNdb.getNdbError());
224  return NDBT_ProgramExit(NDBT_FAILED);
225  }
226 
227  // Connect to Ndb and wait for it to become ready
228  while(MyNdb.waitUntilReady() != 0)
229  ndbout << "Waiting for ndb to become ready..." << endl;
230 
231  Ndb_cluster_connection *con2 = NULL;
232  Ndb *ndb2 = NULL;
233  if (connectstring2)
234  {
235  con2 = new Ndb_cluster_connection(connectstring2);
236 
237  if(con2->connect(12, 5, 1) != 0)
238  {
239  return NDBT_ProgramExit(NDBT_FAILED);
240  }
241  ndb2 = new Ndb( con2, db ? db : "TEST_DB" );
242 
243  if(ndb2->init() != 0){
244  ERR(ndb2->getNdbError());
245  return NDBT_ProgramExit(NDBT_FAILED);
246  }
247 
248  // Connect to Ndb and wait for it to become ready
249  while(ndb2->waitUntilReady() != 0)
250  ndbout << "Waiting for ndb to become ready..." << endl;
251  }
252 
253  int result = 0;
254 
255  NdbDictionary::Dictionary *myDict = MyNdb.getDictionary();
257  Vector<NdbEventOperation*> event_ops;
258  int sz = 0;
259  for(i= optind; i<argc; i++)
260  {
261  const NdbDictionary::Table* table= myDict->getTable(argv[i]);
262  if(!table)
263  {
264  ndbout_c("Could not find table: %s, skipping", argv[i]);
265  continue;
266  }
267 
269  name.appfmt("EV-%s", argv[i]);
270  NdbDictionary::Event *myEvent= new NdbDictionary::Event(name.c_str());
271  myEvent->setTable(table->getName());
273  for(int a = 0; a < table->getNoOfColumns(); a++){
274  myEvent->addEventColumn(a);
275  }
277  (NdbDictionary::Event::ER_UPDATED |
278  NdbDictionary::Event::ER_DDL));
279 
280  if (myDict->createEvent(* myEvent))
281  {
283  {
284  g_info << "Event creation failed event exists. Removing...\n";
285  if (myDict->dropEvent(name.c_str()))
286  {
287  g_err << "Failed to drop event: " << myDict->getNdbError() << endl;
288  result = 1;
289  goto end;
290  }
291  // try again
292  if (myDict->createEvent(* myEvent))
293  {
294  g_err << "Failed to create event: " << myDict->getNdbError() << endl;
295  result = 1;
296  goto end;
297  }
298  }
299  else
300  {
301  g_err << "Failed to create event: " << myDict->getNdbError() << endl;
302  result = 1;
303  goto end;
304  }
305  }
306 
307  events.push_back(myEvent);
308 
309  NdbEventOperation* pOp = MyNdb.createEventOperation(name.c_str());
310  if ( pOp == NULL ) {
311  g_err << "Event operation creation failed" << endl;
312  result = 1;
313  goto end;
314  }
315 
316  event_values.push_back(Vector<NdbRecAttr *>());
317  event_pre_values.push_back(Vector<NdbRecAttr *>());
318  for (int a = 0; a < table->getNoOfColumns(); a++)
319  {
320  event_values[sz].
321  push_back(pOp->getValue(table->getColumn(a)->getName()));
322  event_pre_values[sz].
323  push_back(pOp->getPreValue(table->getColumn(a)->getName()));
324  }
325  event_ops.push_back(pOp);
326  {
327  struct Table_info ti;
328  ti.id = sz;
329  table_infos.push_back(ti);
330  }
331  pOp->setCustomData((void *)&table_infos[sz]);
332  sz++;
333  }
334 
335  for(i= 0; i<(int)event_ops.size(); i++)
336  {
337  if (event_ops[i]->execute())
338  {
339  g_err << "operation execution failed: " << event_ops[i]->getNdbError()
340  << endl;
341  result = 1;
342  goto end;
343  }
344  }
345 
346  struct Trans_arg trans_arg;
347  char buf[64];
348 
349  while(true)
350  {
351  while(MyNdb.pollEvents(100) == 0);
352 
353  NdbEventOperation* pOp= MyNdb.nextEvent();
354  while(pOp)
355  {
356  Uint64 gci= pOp->getGCI();
357  Uint64 cnt_i= 0, cnt_u= 0, cnt_d= 0;
358  if (ndb2)
359  do_begin(ndb2, trans_arg);
360  do
361  {
362  switch(pOp->getEventType())
363  {
365  cnt_i++;
366  if (ndb2)
367  do_insert(trans_arg, pOp);
368  break;
370  cnt_d++;
371  if (ndb2)
372  do_delete(trans_arg, pOp);
373  break;
375  cnt_u++;
376  if (ndb2)
377  do_update(trans_arg, pOp);
378  break;
380  ndbout_c("Received event: %s", event_name(pOp->getEventType(), buf));
381  break;
383  ndbout_c("Received event: %s", event_name(pOp->getEventType(), buf));
384  break;
386  ndbout_c("Received event: %s", event_name(pOp->getEventType(), buf));
387  break;
389  ndbout_c("Received event: %s", event_name(pOp->getEventType(), buf));
390  break;
393  ndbout_c("Received event: %s", event_name(pOp->getEventType(), buf));
394  break;
395  default:
396  /* We should REALLY never get here. */
397  ndbout_c("Error: unknown event type: %u",
398  (Uint32)pOp->getEventType());
399  abort();
400  }
401  } while ((pOp= MyNdb.nextEvent()) && gci == pOp->getGCI());
402  if (ndb2)
403  do_commit(trans_arg);
404  ndbout_c("GCI: %u/%u events: %lld(I) %lld(U) %lld(D)",
405  Uint32(gci >> 32), Uint32(gci), cnt_i, cnt_u, cnt_d);
406  }
407  }
408 end:
409  for(i= 0; i<(int)event_ops.size(); i++)
410  MyNdb.dropEventOperation(event_ops[i]);
411 
412  if (ndb2)
413  delete ndb2;
414  if (con2)
415  delete con2;
416  return NDBT_ProgramExit(NDBT_OK);
417 }
418 
419 template class Vector<struct Table_info>;
420 template class Vector<NdbRecAttr*>;
421 template class Vector< Vector<NdbRecAttr*> >;
422 template class Vector<NdbDictionary::Event*>;
423 template class Vector<NdbEventOperation*>;