MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
test_event_multi_table.cpp
1 /*
2  Copyright (C) 2005, 2006, 2008 MySQL AB
3  All rights reserved. Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include <ndb_global.h>
20 #include <ndb_opts.h>
21 #include <NDBT_Test.hpp>
22 #include <NDBT_ReturnCodes.h>
23 #include <HugoTransactions.hpp>
24 #include <UtilTransactions.hpp>
25 #include <TestNdbEventOperation.hpp>
26 #include <NdbRestarter.hpp>
27 #include <NdbRestarts.hpp>
28 
29 static void usage()
30 {
31  ndb_std_print_version();
32 }
33 
34 static int start_transaction(Ndb *ndb, Vector<HugoOperations*> &ops)
35 {
36  if (ops[0]->startTransaction(ndb) != NDBT_OK)
37  return -1;
38  NdbTransaction * t= ops[0]->getTransaction();
39  for (int i= ops.size()-1; i > 0; i--)
40  {
41  ops[i]->setTransaction(t);
42  }
43  return 0;
44 }
45 
46 static int close_transaction(Ndb *ndb, Vector<HugoOperations*> &ops)
47 {
48  if (ops[0]->closeTransaction(ndb) != NDBT_OK)
49  return -1;
50  for (int i= ops.size()-1; i > 0; i--)
51  {
52  ops[i]->setTransaction(NULL);
53  }
54  return 0;
55 }
56 
57 static int execute_commit(Ndb *ndb, Vector<HugoOperations*> &ops)
58 {
59  if (ops[0]->execute_Commit(ndb) != NDBT_OK)
60  return -1;
61  return 0;
62 }
63 
64 static int copy_events(Ndb *ndb)
65 {
66  DBUG_ENTER("copy_events");
67  int r= 0;
69  while (1)
70  {
71  int res= ndb->pollEvents(1000); // wait for event or 1000 ms
72  DBUG_PRINT("info", ("pollEvents res=%d", res));
73  if (res <= 0)
74  {
75  break;
76  }
77  int error= 0;
78  NdbEventOperation *pOp;
79  while ((pOp= ndb->nextEvent()))
80  {
81  char buf[1024];
82  sprintf(buf, "%s_SHADOW", pOp->getTable()->getName());
83  const NdbDictionary::Table *table= dict->getTable(buf);
84 
85  if (table == 0)
86  {
87  g_err << "unable to find table " << buf << endl;
88  DBUG_RETURN(-1);
89  }
90 
91  if (pOp->isOverrun())
92  {
93  g_err << "buffer overrun\n";
94  DBUG_RETURN(-1);
95  }
96  r++;
97 
98  Uint32 gci= pOp->getGCI();
99 
100  if (!pOp->isConsistent()) {
101  g_err << "A node failure has occured and events might be missing\n";
102  DBUG_RETURN(-1);
103  }
104 
105  int noRetries= 0;
106  do
107  {
108  NdbTransaction *trans= ndb->startTransaction();
109  if (trans == 0)
110  {
111  g_err << "startTransaction failed "
112  << ndb->getNdbError().code << " "
113  << ndb->getNdbError().message << endl;
114  DBUG_RETURN(-1);
115  }
116 
117  NdbOperation *op= trans->getNdbOperation(table);
118  if (op == 0)
119  {
120  g_err << "getNdbOperation failed "
121  << trans->getNdbError().code << " "
122  << trans->getNdbError().message << endl;
123  DBUG_RETURN(-1);
124  }
125 
126  switch (pOp->getEventType()) {
128  if (op->insertTuple())
129  {
130  g_err << "insertTuple "
131  << op->getNdbError().code << " "
132  << op->getNdbError().message << endl;
133  DBUG_RETURN(-1);
134  }
135  break;
137  if (op->deleteTuple())
138  {
139  g_err << "deleteTuple "
140  << op->getNdbError().code << " "
141  << op->getNdbError().message << endl;
142  DBUG_RETURN(-1);
143  }
144  break;
146  if (op->updateTuple())
147  {
148  g_err << "updateTuple "
149  << op->getNdbError().code << " "
150  << op->getNdbError().message << endl;
151  DBUG_RETURN(-1);
152  }
153  break;
154  default:
155  abort();
156  }
157 
158  {
159  for (const NdbRecAttr *pk= pOp->getFirstPkAttr(); pk; pk= pk->next())
160  {
161  if (pk->isNULL())
162  {
163  g_err << "internal error: primary key isNull()="
164  << pk->isNULL() << endl;
165  DBUG_RETURN(NDBT_FAILED);
166  }
167  if (op->equal(pk->getColumn()->getColumnNo(),pk->aRef()))
168  {
169  g_err << "equal " << pk->getColumn()->getColumnNo() << " "
170  << op->getNdbError().code << " "
171  << op->getNdbError().message << endl;
172  DBUG_RETURN(NDBT_FAILED);
173  }
174  }
175  }
176 
177  switch (pOp->getEventType()) {
179  {
180  for (const NdbRecAttr *data= pOp->getFirstDataAttr(); data; data= data->next())
181  {
182  if (data->isNULL() < 0 ||
183  op->setValue(data->getColumn()->getColumnNo(),
184  data->isNULL() ? 0:data->aRef()))
185  {
186  g_err << "setValue(insert) " << data->getColumn()->getColumnNo() << " "
187  << op->getNdbError().code << " "
188  << op->getNdbError().message << endl;
189  DBUG_RETURN(-1);
190  }
191  }
192  break;
193  }
195  break;
197  {
198  for (const NdbRecAttr *data= pOp->getFirstDataAttr(); data; data= data->next())
199  {
200  if (data->isNULL() >= 0 &&
201  op->setValue(data->getColumn()->getColumnNo(),
202  data->isNULL() ? 0:data->aRef()))
203  {
204  g_err << "setValue(update) " << data->getColumn()->getColumnNo() << " "
205  << op->getNdbError().code << " "
206  << op->getNdbError().message << endl;
207  DBUG_RETURN(NDBT_FAILED);
208  }
209  }
210  break;
211  }
213  abort();
214  }
215  if (trans->execute(Commit) == 0)
216  {
217  trans->close();
218  // everything ok
219  break;
220  }
221  if (noRetries++ == 10 ||
223  {
224  g_err << "execute " << r << " failed "
225  << trans->getNdbError().code << " "
226  << trans->getNdbError().message << endl;
227  trans->close();
228  DBUG_RETURN(-1);
229  }
230  trans->close();
231  NdbSleep_MilliSleep(100); // sleep before retying
232  } while(1);
233  } // for
234  if (error)
235  {
236  g_err << "nextEvent()\n";
237  DBUG_RETURN(-1);
238  }
239  } // while(1)
240  DBUG_RETURN(r);
241 }
242 
243 static int verify_copy(Ndb *ndb,
246 {
247  for (unsigned i= 0; i < tabs1.size(); i++)
248  if (tabs1[i])
249  {
250  HugoTransactions hugoTrans(*tabs1[i]);
251  if (hugoTrans.compare(ndb, tabs2[i]->getName(), 0))
252  return -1;
253  }
254  return 0;
255 }
256 
257 static const char* _dbname = "TEST_DB";
258 struct my_option my_long_options[] =
259 {
260  NDB_STD_OPTS(""),
261  { "database", 'd', "Name of database table is in",
262  (uchar**) &_dbname, (uchar**) &_dbname, 0,
263  GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
264  { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
265 };
266 
267 int
268 main(int argc, char** argv)
269 {
270  NDB_INIT(argv[0]);
271  const char *load_default_groups[]= { "mysql_cluster",0 };
272  load_defaults("my",load_default_groups,&argc,&argv);
273 
274  int ho_error;
275 #ifndef DBUG_OFF
276  opt_debug= "d:t:F:L";
277 #endif
278  if ((ho_error=handle_options(&argc, &argv, my_long_options,
279  ndb_std_get_one_option)))
280  return NDBT_ProgramExit(NDBT_WRONGARGS);
281 
282  DBUG_ENTER("main");
283  Ndb_cluster_connection con(opt_connect_str);
284  if(con.connect(12, 5, 1))
285  {
286  DBUG_RETURN(NDBT_ProgramExit(NDBT_FAILED));
287  }
288 
289 
290  Ndb ndb(&con,_dbname);
291  ndb.init();
292  while (ndb.waitUntilReady() != 0);
293 
295  int no_error= 1;
296  int i;
297 
298  // create all tables
300  if (argc == 0)
301  {
302  NDBT_Tables::dropAllTables(&ndb);
303  NDBT_Tables::createAllTables(&ndb);
304  for (i= 0; no_error && i < NDBT_Tables::getNumTables(); i++)
305  {
306  const NdbDictionary::Table *pTab= dict->getTable(NDBT_Tables::getTable(i)->getName());
307  if (pTab == 0)
308  {
309  ndbout << "Failed to create table" << endl;
310  ndbout << dict->getNdbError() << endl;
311  no_error= 0;
312  break;
313  }
314  pTabs.push_back(pTab);
315  }
316  }
317  else
318  {
319  for (i= 0; no_error && argc; argc--, i++)
320  {
321  dict->dropTable(argv[i]);
322  NDBT_Tables::createTable(&ndb, argv[i]);
323  const NdbDictionary::Table *pTab= dict->getTable(argv[i]);
324  if (pTab == 0)
325  {
326  ndbout << "Failed to create table" << endl;
327  ndbout << dict->getNdbError() << endl;
328  no_error= 0;
329  break;
330  }
331  pTabs.push_back(pTab);
332  }
333  }
334  pTabs.push_back(NULL);
335 
336  // create an event for each table
337  for (i= 0; no_error && pTabs[i]; i++)
338  {
339  HugoTransactions ht(*pTabs[i]);
340  if (ht.createEvent(&ndb)){
341  no_error= 0;
342  break;
343  }
344  }
345 
346  // create an event operation for each event
348  for (i= 0; no_error && pTabs[i]; i++)
349  {
350  char buf[1024];
351  sprintf(buf, "%s_EVENT", pTabs[i]->getName());
352  NdbEventOperation *pOp= ndb.createEventOperation(buf, 1000);
353  if ( pOp == NULL )
354  {
355  no_error= 0;
356  break;
357  }
358  pOps.push_back(pOp);
359  }
360 
361  // get storage for each event operation
362  for (i= 0; no_error && pTabs[i]; i++)
363  {
364  int n_columns= pTabs[i]->getNoOfColumns();
365  for (int j = 0; j < n_columns; j++) {
366  pOps[i]->getValue(pTabs[i]->getColumn(j)->getName());
367  pOps[i]->getPreValue(pTabs[i]->getColumn(j)->getName());
368  }
369  }
370 
371  // start receiving events
372  for (i= 0; no_error && pTabs[i]; i++)
373  {
374  if ( pOps[i]->execute() )
375  {
376  no_error= 0;
377  break;
378  }
379  }
380 
381  // create a "shadow" table for each table
383  for (i= 0; no_error && pTabs[i]; i++)
384  {
385  char buf[1024];
386  sprintf(buf, "%s_SHADOW", pTabs[i]->getName());
387 
388  dict->dropTable(buf);
389  if (dict->getTable(buf))
390  {
391  no_error= 0;
392  break;
393  }
394 
395  NdbDictionary::Table table_shadow(*pTabs[i]);
396  table_shadow.setName(buf);
397  dict->createTable(table_shadow);
398  pShadowTabs.push_back(dict->getTable(buf));
399  if (!pShadowTabs[i])
400  {
401  no_error= 0;
402  break;
403  }
404  }
405 
406  // create a hugo operation per table
407  Vector<HugoOperations *> hugo_ops;
408  for (i= 0; no_error && pTabs[i]; i++)
409  {
410  hugo_ops.push_back(new HugoOperations(*pTabs[i]));
411  }
412 
413  int n_records= 3;
414  // insert n_records records per table
415  do {
416  if (start_transaction(&ndb, hugo_ops))
417  {
418  no_error= 0;
419  break;
420  }
421  for (i= 0; no_error && pTabs[i]; i++)
422  {
423  hugo_ops[i]->pkInsertRecord(&ndb, 0, n_records);
424  }
425  if (execute_commit(&ndb, hugo_ops))
426  {
427  no_error= 0;
428  break;
429  }
430  if(close_transaction(&ndb, hugo_ops))
431  {
432  no_error= 0;
433  break;
434  }
435  } while(0);
436 
437  // copy events and verify
438  do {
439  if (copy_events(&ndb) < 0)
440  {
441  no_error= 0;
442  break;
443  }
444  if (verify_copy(&ndb, pTabs, pShadowTabs))
445  {
446  no_error= 0;
447  break;
448  }
449  } while (0);
450 
451  // update n_records-1 records in first table
452  do {
453  if (start_transaction(&ndb, hugo_ops))
454  {
455  no_error= 0;
456  break;
457  }
458 
459  hugo_ops[0]->pkUpdateRecord(&ndb, n_records-1);
460 
461  if (execute_commit(&ndb, hugo_ops))
462  {
463  no_error= 0;
464  break;
465  }
466  if(close_transaction(&ndb, hugo_ops))
467  {
468  no_error= 0;
469  break;
470  }
471  } while(0);
472 
473  // copy events and verify
474  do {
475  if (copy_events(&ndb) < 0)
476  {
477  no_error= 0;
478  break;
479  }
480  if (verify_copy(&ndb, pTabs, pShadowTabs))
481  {
482  no_error= 0;
483  break;
484  }
485  } while (0);
486 
487 
488  {
489  NdbRestarts restarts;
490  for (int j= 0; j < 10; j++)
491  {
492  // restart a node
493  if (no_error)
494  {
495  int timeout = 240;
496  if (restarts.executeRestart("RestartRandomNodeAbort", timeout))
497  {
498  no_error= 0;
499  break;
500  }
501  }
502 
503  // update all n_records records on all tables
504  if (start_transaction(&ndb, hugo_ops))
505  {
506  no_error= 0;
507  break;
508  }
509 
510  for (int r= 0; r < n_records; r++)
511  {
512  for (i= 0; pTabs[i]; i++)
513  {
514  hugo_ops[i]->pkUpdateRecord(&ndb, r);
515  }
516  }
517  if (execute_commit(&ndb, hugo_ops))
518  {
519  no_error= 0;
520  break;
521  }
522  if(close_transaction(&ndb, hugo_ops))
523  {
524  no_error= 0;
525  break;
526  }
527 
528  // copy events and verify
529  if (copy_events(&ndb) < 0)
530  {
531  no_error= 0;
532  break;
533  }
534  if (verify_copy(&ndb, pTabs, pShadowTabs))
535  {
536  no_error= 0;
537  break;
538  }
539  }
540  }
541 
542  // drop the event operations
543  for (i= 0; i < (int)pOps.size(); i++)
544  {
545  if (ndb.dropEventOperation(pOps[i]))
546  {
547  no_error= 0;
548  }
549  }
550 
551  if (no_error)
552  DBUG_RETURN(NDBT_ProgramExit(NDBT_OK));
553  DBUG_RETURN(NDBT_ProgramExit(NDBT_FAILED));
554 }
555 
556 template class Vector<HugoOperations *>;
557 template class Vector<NdbEventOperation *>;
558 template class Vector<NdbRecAttr*>;
559 template class Vector<Vector<NdbRecAttr*> >;