MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
testLcp.cpp
1 /*
2  Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include <NDBT.hpp>
19 #include <NdbApi.hpp>
20 #include <NdbRestarter.hpp>
21 #include <HugoOperations.hpp>
22 #include <HugoTransactions.hpp>
23 #include <UtilTransactions.hpp>
24 #include <signaldata/DumpStateOrd.hpp>
25 
26 #include <getarg.h>
27 #include <InputStream.hpp>
28 
/**
 * One row's test scenario: which operations are applied to the row and
 * what state the row is expected to be in before and after them.
 *
 * The field order is significant: g_op_types is filled in with positional
 * aggregate initializers.
 */
struct CASE
{
  bool start_row;    // row is inserted by load_table() before the ops run
  bool end_row;      // row is expected to exist once all ops are done
  bool curr_row;     // bookkeeping: does the row exist right now
  const char * op1;  // first operation: "INS", "UPD" or "DEL"
  const char * op2;  // second operation, or 0 when there is none
  const char * op3;  // third operation, or 0 when there is none
  int val;           // value most recently written to the row
};
39 
40 static CASE g_op_types[] =
41 {
42  { false, true, false, "INS", 0, 0, 0 }, // 0x001 a
43  { true, true, false, "UPD", 0, 0, 0 }, // 0x002 d
44  { true, false, false, "DEL", 0, 0, 0 }, // 0x004 g
45 
46  { false, true, false, "INS", "UPD", 0, 0 }, // 0x008 b
47  { false, false, false, "INS", "DEL", 0, 0 }, // 0x010 c
48  { true, true, false, "UPD", "UPD", 0, 0 }, // 0x020 e
49  { true, false, false, "UPD", "DEL", 0, 0 }, // 0x040 f
50  { true, true, false, "DEL", "INS", 0, 0 }, // 0x080 h
51 
52  { false, true, false, "INS", "DEL", "INS", 0 }, // 0x100 i
53  { true, false, false, "DEL", "INS", "DEL", 0 } // 0x200 j
54 };
55 const size_t OP_COUNT = (sizeof(g_op_types)/sizeof(g_op_types[0]));
56 
57 static Ndb* g_ndb = 0;
58 static CASE* g_ops;
59 static Ndb_cluster_connection *g_cluster_connection= 0;
60 static HugoOperations* g_hugo_ops;
61 static int g_use_ops = 1 | 2 | 4;
62 static int g_cases = 0x1;
63 static int g_case_loop = 2;
64 static int g_rows = 10;
65 static int g_setup_tables = 1;
66 static int g_one_op_at_a_time = 0;
67 static const char * g_tablename = "T1";
68 static const NdbDictionary::Table* g_table = 0;
69 static NdbRestarter g_restarter;
70 
71 static int init_ndb(int argc, char** argv);
72 static int parse_args(int argc, char** argv);
73 static int connect_ndb();
74 static int drop_all_tables();
75 static int load_table();
76 static int pause_lcp(int error);
77 static int do_op(int row);
78 static int continue_lcp(int error = 0);
79 static int commit();
80 static int restart();
81 static int validate();
82 
83 int
84 main(int argc, char ** argv){
85  ndb_init();
86  require(!init_ndb(argc, argv));
87  if(parse_args(argc, argv))
88  return -1;
89  require(!connect_ndb());
90 
91  if(g_setup_tables){
92  require(!drop_all_tables());
93 
94  if(NDBT_Tables::createTable(g_ndb, g_tablename) != 0){
95  exit(-1);
96  }
97  }
98 
99  g_table = g_ndb->getDictionary()->getTable(g_tablename);
100  if(g_table == 0){
101  g_err << "Failed to retreive table: " << g_tablename << endl;
102  exit(-1);
103  }
104  require((g_hugo_ops = new HugoOperations(* g_table)) != 0);
105  require(!g_hugo_ops->startTransaction(g_ndb));
106 
107  g_ops= new CASE[g_rows];
108 
109  const int use_ops = g_use_ops;
110  for(size_t i = 0; i<OP_COUNT; i++)
111  {
112  if(g_one_op_at_a_time){
113  while(i < OP_COUNT && (use_ops & (1 << i)) == 0) i++;
114  if(i == OP_COUNT)
115  break;
116  ndbout_c("-- loop\noperation: %c use_ops: %x", int('a'+i), use_ops);
117  g_use_ops = (1 << i);
118  } else {
119  i = OP_COUNT - 1;
120  }
121 
122  size_t test_case = 0;
123  if((1 << test_case++) & g_cases)
124  {
125  for(size_t tl = 0; tl<(size_t)g_case_loop; tl++){
126  g_info << "Performing all ops wo/ inteference of LCP" << endl;
127 
128  g_info << "Testing pre LCP operations, ZLCP_OP_WRITE_RT_BREAK" << endl;
129  g_info << " where ZLCP_OP_WRITE_RT_BREAK is "
130  " finished before SAVE_PAGES" << endl;
131  require(!load_table());
132  require(!pause_lcp(5900));
133  for(size_t j = 0; j<(size_t)g_rows; j++){
134  require(!do_op(j));
135  }
136  require(!continue_lcp(5900));
137  require(!commit());
138  require(!pause_lcp(5900));
139  require(!restart());
140  require(!validate());
141  }
142  }
143 
144  if((1 << test_case++) & g_cases)
145  {
146  for(size_t tl = 0; tl<(size_t)g_case_loop; tl++){
147  g_info << "Testing pre LCP operations, ZLCP_OP_WRITE_RT_BREAK" << endl;
148  g_info << " where ZLCP_OP_WRITE_RT_BREAK is finished after SAVE_PAGES"
149  << endl;
150  require(!load_table());
151  require(!pause_lcp(5901));
152  for(size_t j = 0; j<(size_t)g_rows; j++){
153  require(!do_op(j));
154  }
155  require(!continue_lcp(5901));
156  require(!commit());
157  require(!pause_lcp(5900));
158  require(!restart());
159  require(!validate());
160  }
161  }
162 
163  if((1 << test_case++) & g_cases)
164  {
165  for(size_t tl = 0; tl<(size_t)g_case_loop; tl++){
166  g_info << "Testing pre LCP operations, undo-ed at commit" << endl;
167  require(!load_table());
168  require(!pause_lcp(5902));
169  for(size_t j = 0; j<(size_t)g_rows; j++){
170  require(!do_op(j));
171  }
172  require(!continue_lcp(5902));
173  require(!commit());
174  require(!continue_lcp(5903));
175  require(!pause_lcp(5900));
176  require(!restart());
177  require(!validate());
178  }
179  }
180 
181  if((1 << test_case++) & g_cases)
182  {
183  for(size_t tl = 0; tl<(size_t)g_case_loop; tl++){
184  g_info << "Testing prepared during LCP and committed after" << endl;
185  require(!load_table());
186  require(!pause_lcp(5904)); // Start LCP, but don't save pages
187  for(size_t j = 0; j<(size_t)g_rows; j++){
188  require(!do_op(j));
189  }
190  require(!continue_lcp(5904)); // Start ACC save pages
191  require(!pause_lcp(5900)); // Next LCP
192  require(!commit());
193  require(!restart());
194  require(!validate());
195  }
196  }
197  }
198 }
199 
200 static int init_ndb(int argc, char** argv)
201 {
202  ndb_init();
203  return 0;
204 }
205 
206 static int parse_args(int argc, char** argv)
207 {
208  size_t i;
209  char * ops= 0, *cases=0;
210  struct getargs args[] = {
211  { "records", 0, arg_integer, &g_rows, "Number of records", "records" },
212  { "operations", 'o', arg_string, &ops, "Operations [a-h]", 0 },
213  { "1", '1', arg_flag, &g_one_op_at_a_time, "One op at a time", 0 },
214  { "0", '0', arg_negative_flag, &g_one_op_at_a_time, "All ops at once", 0 },
215  { "cases", 'c', arg_string, &cases, "Cases [a-c]", 0 },
216  { 0, 't', arg_flag, &g_setup_tables, "Create table", 0 },
217  { 0, 'u', arg_negative_flag, &g_setup_tables, "Dont create table", 0 }
218  };
219 
220  int optind= 0;
221  const int num_args = sizeof(args)/sizeof(args[0]);
222  if(getarg(args, num_args, argc, (const char**)argv, &optind)) {
223  arg_printusage(args, num_args, argv[0], " tabname1\n");
224  ndbout_c("\n -- Operations [a-%c] = ", int('a'+OP_COUNT-1));
225  for(i = 0; i<OP_COUNT; i++){
226  ndbout_c("\t%c = %s %s",
227  int('a'+i), g_op_types[i].op1,
228  g_op_types[i].op2 ? g_op_types[i].op2 : "");
229  }
230  return -1;
231  }
232 
233  if(ops != 0){
234  g_use_ops = 0;
235  char * s = ops;
236  while(* s)
237  g_use_ops |= (1 << ((* s++) - 'a'));
238  }
239 
240  if(cases != 0){
241  g_cases = 0;
242  char * s = cases;
243  while(* s)
244  g_cases |= (1 << ((* s++) - 'a'));
245  }
246 
247  ndbout_c("table: %s", g_tablename);
248  printf("operations: ");
249  for(i = 0; i<OP_COUNT; i++)
250  if(g_use_ops & (1 << i))
251  printf("%c", int('a'+i));
252  printf("\n");
253 
254  printf("test cases: ");
255  for(i = 0; i<3; i++)
256  if(g_cases & (1 << i))
257  printf("%c", int('1'+i));
258  printf("\n");
259  printf("-------------\n");
260  return 0;
261 }
262 
263 static int connect_ndb()
264 {
265  g_cluster_connection = new Ndb_cluster_connection();
266  if(g_cluster_connection->connect(12, 5, 1) != 0)
267  {
268  return 1;
269  }
270 
271  g_ndb = new Ndb(g_cluster_connection, "TEST_DB");
272  g_ndb->init(256);
273  if(g_ndb->waitUntilReady(30) == 0){
274  return 0;
275 // int args[] = { DumpStateOrd::DihMaxTimeBetweenLCP };
276 // return g_restarter.dumpStateAllNodes(args, 1);
277  }
278  return -1;
279 }
280 
281 static int disconnect_ndb()
282 {
283  delete g_ndb;
284  delete g_cluster_connection;
285  g_ndb = 0;
286  g_table = 0;
287  g_cluster_connection= 0;
288  return 0;
289 }
290 
// Lists the dictionary objects and drops the ones selected by the switch
// below, then restores the database and schema names that were active on
// entry.  Returns 0 on success, -1 if listing or dropping fails.
//
// NOTE(review): this listing is incomplete.  The embedded source line
// numbers skip 299, 306, 308-309 and 319-324, so the declarations of `list`
// and `elt` and the `case` labels of the switch are not visible here.
// Restore them from the original testLcp.cpp before building.
291 static int drop_all_tables()
292 {
293  NdbDictionary::Dictionary * dict = g_ndb->getDictionary();
294  require(dict);
295 
// Remember the current database/schema; the loop below changes them per object.
296  BaseString db = g_ndb->getDatabaseName();
297  BaseString schema = g_ndb->getSchemaName();
298 
// NOTE(review): source line 299 (presumably the declaration of `list`) is missing here.
300  if (dict->listObjects(list, NdbDictionary::Object::TypeUndefined) == -1){
301  g_err << "Failed to list tables: " << endl
302  << dict->getNdbError() << endl;
303  return -1;
304  }
305  for (unsigned i = 0; i < list.count; i++) {
// NOTE(review): source line 306 (presumably binding `elt` to the i-th list element) is missing here.
307  switch (elt.type) {
// NOTE(review): source lines 308-309 (the case labels selecting which object
// types are dropped) are missing here.
// Switch to the object's own database and schema before dropping it by name.
310  g_ndb->setDatabaseName(elt.database);
311  g_ndb->setSchemaName(elt.schema);
312  if(dict->dropTable(elt.name) != 0){
313  g_err << "Failed to drop table: "
314  << elt.database << "/" << elt.schema << "/" << elt.name <<endl;
315  g_err << dict->getNdbError() << endl;
316  return -1;
317  }
318  break;
// NOTE(review): source lines 319-324 (presumably further case labels that
// share the default branch) are missing here.
325  default:
326  break;
327  }
328  }
329 
// Restore the database and schema names saved on entry.
330  g_ndb->setDatabaseName(db.c_str());
331  g_ndb->setSchemaName(schema.c_str());
332 
333  return 0;
334 }
335 
336 static int load_table()
337 {
338  UtilTransactions clear(* g_table);
339  require(!clear.clearTable(g_ndb));
340 
341  HugoOperations ops(* g_table);
342  require(!ops.startTransaction(g_ndb));
343  size_t op = 0;
344  size_t rows = 0;
345  size_t uncommitted = 0;
346  //bool prepared = false;
347  for(size_t i = 0; i<(size_t)g_rows; i++){
348  for(op %= OP_COUNT; !((1 << op) & g_use_ops); op = (op + 1) % OP_COUNT);
349  g_ops[i] = g_op_types[op++];
350  if(g_ops[i].start_row){
351  g_ops[i].curr_row = true;
352  g_ops[i].val = rand();
353  require(!ops.pkInsertRecord(g_ndb, i, 1, g_ops[i].val));
354  uncommitted++;
355  } else {
356  g_ops[i].curr_row = false;
357  }
358  if(uncommitted >= 100){
359  require(!ops.execute_Commit(g_ndb));
360  require(!ops.getTransaction()->restart());
361  rows += uncommitted;
362  uncommitted = 0;
363  }
364  }
365  if(uncommitted)
366  require(!ops.execute_Commit(g_ndb));
367 
368  require(!ops.closeTransaction(g_ndb));
369  rows += uncommitted;
370  g_info << "Inserted " << rows << " rows" << endl;
371  return 0;
372 }
373 
374 static int pause_lcp(int error)
375 {
376  int nodes = g_restarter.getNumDbNodes();
377 
378  int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_INFO, 0 };
379 
380  NDB_SOCKET_TYPE my_fd;
381 #ifdef NDB_WIN
382  SOCKET fd= ndb_mgm_listen_event(g_restarter.handle, filter);
383  my_fd.s= fd;
384 #else
385  int fd = ndb_mgm_listen_event(g_restarter.handle, filter);
386  my_fd.fd= fd;
387 #endif
388 
389  require(my_socket_valid(my_fd));
390  require(!g_restarter.insertErrorInAllNodes(error));
391  int dump[] = { DumpStateOrd::DihStartLcpImmediately };
392  require(!g_restarter.dumpStateAllNodes(dump, 1));
393 
394  char *tmp;
395  char buf[1024];
396  SocketInputStream in(my_fd, 1000);
397  int count = 0;
398  do {
399  tmp = in.gets(buf, 1024);
400  if(tmp)
401  {
402  int id;
403  if(sscanf(tmp, "%*[^:]: LCP: %d ", &id) == 1 && id == error &&
404  --nodes == 0){
405  my_socket_close(my_fd);
406  return 0;
407  }
408  }
409  } while(count++ < 30);
410 
411  my_socket_close(my_fd);
412  return -1;
413 }
414 
415 static int do_op(int row)
416 {
417  HugoOperations & ops = * g_hugo_ops;
418  if(strcmp(g_ops[row].op1, "INS") == 0){
419  require(!g_ops[row].curr_row);
420  g_ops[row].curr_row = true;
421  g_ops[row].val = rand();
422  require(!ops.pkInsertRecord(g_ndb, row, 1, g_ops[row].val));
423  } else if(strcmp(g_ops[row].op1, "UPD") == 0){
424  require(g_ops[row].curr_row);
425  g_ops[row].val = rand();
426  require(!ops.pkUpdateRecord(g_ndb, row, 1, g_ops[row].val));
427  } else if(strcmp(g_ops[row].op1, "DEL") == 0){
428  require(g_ops[row].curr_row);
429  g_ops[row].curr_row = false;
430  require(!ops.pkDeleteRecord(g_ndb, row, 1));
431  }
432 
433  require(!ops.execute_NoCommit(g_ndb));
434 
435  if(g_ops[row].op2 == 0){
436  } else if(strcmp(g_ops[row].op2, "INS") == 0){
437  require(!g_ops[row].curr_row);
438  g_ops[row].curr_row = true;
439  g_ops[row].val = rand();
440  require(!ops.pkInsertRecord(g_ndb, row, 1, g_ops[row].val));
441  } else if(strcmp(g_ops[row].op2, "UPD") == 0){
442  require(g_ops[row].curr_row);
443  g_ops[row].val = rand();
444  require(!ops.pkUpdateRecord(g_ndb, row, 1, g_ops[row].val));
445  } else if(strcmp(g_ops[row].op2, "DEL") == 0){
446  require(g_ops[row].curr_row);
447  g_ops[row].curr_row = false;
448  require(!ops.pkDeleteRecord(g_ndb, row, 1));
449  }
450 
451  if(g_ops[row].op2 != 0)
452  require(!ops.execute_NoCommit(g_ndb));
453 
454  if(g_ops[row].op3 == 0){
455  } else if(strcmp(g_ops[row].op3, "INS") == 0){
456  require(!g_ops[row].curr_row);
457  g_ops[row].curr_row = true;
458  g_ops[row].val = rand();
459  require(!ops.pkInsertRecord(g_ndb, row, 1, g_ops[row].val));
460  } else if(strcmp(g_ops[row].op3, "UPD") == 0){
461  require(g_ops[row].curr_row);
462  g_ops[row].val = rand();
463  require(!ops.pkUpdateRecord(g_ndb, row, 1, g_ops[row].val));
464  } else if(strcmp(g_ops[row].op3, "DEL") == 0){
465  require(g_ops[row].curr_row);
466  g_ops[row].curr_row = false;
467  require(!ops.pkDeleteRecord(g_ndb, row, 1));
468  }
469 
470  if(g_ops[row].op3 != 0)
471  require(!ops.execute_NoCommit(g_ndb));
472 
473  return 0;
474 }
475 
476 static int continue_lcp(int error)
477 {
478  int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_INFO, 0 };
479  NDB_SOCKET_TYPE my_fd;
480  my_socket_invalidate(&my_fd);
481 #ifdef NDB_WIN
482  SOCKET fd;
483 #else
484  int fd;
485 #endif
486 
487  if(error){
488  fd = ndb_mgm_listen_event(g_restarter.handle, filter);
489 #ifdef NDB_WIN
490  my_fd.s= fd;
491 #else
492  my_fd.fd= fd;
493 #endif
494  require(my_socket_valid(my_fd));
495  }
496 
497  int args[] = { DumpStateOrd::LCPContinue };
498  if(g_restarter.dumpStateAllNodes(args, 1) != 0)
499  return -1;
500 
501  if(error){
502  char *tmp;
503  char buf[1024];
504  SocketInputStream in(my_fd, 1000);
505  int count = 0;
506  int nodes = g_restarter.getNumDbNodes();
507  do {
508  tmp = in.gets(buf, 1024);
509  if(tmp)
510  {
511  int id;
512  if(sscanf(tmp, "%*[^:]: LCP: %d ", &id) == 1 && id == error &&
513  --nodes == 0){
514  my_socket_close(my_fd);
515  return 0;
516  }
517  }
518  } while(count++ < 30);
519 
520  my_socket_close(my_fd);
521  }
522  return 0;
523 }
524 
525 static int commit()
526 {
527  HugoOperations & ops = * g_hugo_ops;
528  int res = ops.execute_Commit(g_ndb);
529  if(res == 0){
530  return ops.getTransaction()->restart();
531  }
532  return res;
533 }
534 
535 static int restart()
536 {
537  g_info << "Restarting cluster" << endl;
538  g_hugo_ops->closeTransaction(g_ndb);
539  disconnect_ndb();
540  delete g_hugo_ops;
541 
542  require(!g_restarter.restartAll());
543  require(!g_restarter.waitClusterStarted(30));
544  require(!connect_ndb());
545 
546  g_table = g_ndb->getDictionary()->getTable(g_tablename);
547  require(g_table);
548  require((g_hugo_ops = new HugoOperations(* g_table)) != 0);
549  require(!g_hugo_ops->startTransaction(g_ndb));
550  return 0;
551 }
552 
553 static int validate()
554 {
555  HugoOperations ops(* g_table);
556  for(size_t i = 0; i<(size_t)g_rows; i++){
557  require(g_ops[i].curr_row == g_ops[i].end_row);
558  require(!ops.startTransaction(g_ndb));
559  ops.pkReadRecord(g_ndb, i, 1);
560  int res = ops.execute_Commit(g_ndb);
561  if(g_ops[i].curr_row){
562  require(res == 0 && ops.verifyUpdatesValue(g_ops[i].val) == 0);
563  } else {
564  require(res == 626);
565  }
566  ops.closeTransaction(g_ndb);
567  }
568 
569  for(size_t j = 0; j<10; j++){
570  UtilTransactions clear(* g_table);
571  require(!clear.clearTable(g_ndb));
572 
573  HugoTransactions trans(* g_table);
574  require(trans.loadTable(g_ndb, 1024) == 0);
575  }
576  return 0;
577 }
578