MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
testDeadlock.cpp
1 /*
2  Copyright (C) 2004-2006 MySQL AB, 2008-2010 Sun Microsystems, Inc.
3  All rights reserved. Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include <ndb_global.h>
20 #include <NdbMain.h>
21 #include <NdbApi.hpp>
22 #include <NdbOut.hpp>
23 #include <NdbMutex.h>
24 #include <NdbCondition.h>
25 #include <NdbThread.h>
26 #include <NdbTest.hpp>
27 
struct Opt {
  bool m_dbg;          // enable DBG() trace output
  const char* m_scan;  // scans to run: contains 't' (table) and/or 'x' (index)
  const char* m_tname; // table name
  const char* m_xname; // ordered index name
  // defaults: debug on, run both scan variants on table T / index X
  Opt() {
    m_dbg = true;
    m_scan = "tx";
    m_tname = "T";
    m_xname = "X";
  }
};
40 
41 static void
42 printusage()
43 {
44  Opt d;
45  ndbout
46  << "usage: testDeadlock" << endl
47  << "-scan tx scan table, index [" << d.m_scan << "]" << endl
48  ;
49 }
50 
static Opt g_opt;

// serializes DBG output so lines from concurrent threads do not interleave
static NdbMutex *ndbout_mutex= NULL;
// shared cluster connection, set up once in main()
static Ndb_cluster_connection *g_cluster_connection= 0;
// debug trace print, enabled via g_opt.m_dbg; locks ndbout_mutex
#define DBG(x) \
  do { \
  if (! g_opt.m_dbg) break; \
  NdbMutex_Lock(ndbout_mutex); \
  ndbout << "line " << __LINE__ << " " << x << endl; \
  NdbMutex_Unlock(ndbout_mutex); \
  } while (0)

// check condition x; on failure print it and return -1 from the caller
#define CHK(x) \
  do { \
  if (x) break; \
  ndbout << "line " << __LINE__ << ": " << #x << " failed" << endl; \
  return -1; \
  } while (0)

// like CHK but also prints the NdbError of object p (Ndb, NdbConnection, ...)
#define CHN(p, x) \
  do { \
  if (x) break; \
  ndbout << "line " << __LINE__ << ": " << #x << " failed" << endl; \
  ndbout << (p)->getNdbError() << endl; \
  return -1; \
  } while (0)
77 
78 // threads
79 
// a test step executed by a worker thread; returns 0 on success, -1 on error
typedef int (*Runstep)(struct Thr& thr);

struct Thr {
  // worker thread driven step-by-step by the main thread
  enum State { Wait, Start, Stop, Stopped, Exit };
  State m_state;        // guarded by m_mutex, changed under signal()
  int m_no;             // thread number (1 = tx1, 2 = tx2), for log output
  Runstep m_runstep;    // step to execute when state becomes Start
  int m_ret;            // result of the last executed step; -1 on error
  NdbMutex* m_mutex;    // protects m_state / m_runstep handshake
  NdbCondition* m_cond; // signaled on every state change
  NdbThread* m_thread;
  void* m_status;       // thread exit status, filled by join()
  Ndb* m_ndb;           // per-thread Ndb object (set by runstep_connect)
  NdbConnection* m_con; // current transaction (set by runstep_starttx)
  NdbScanOperation* m_scanop;           // current scan, if any
  NdbIndexScanOperation* m_indexscanop; // same scan as index scan, if 'x' mode
  //
  Thr(int no);
  ~Thr();
  int run();                    // worker thread main loop
  void start(Runstep runstep);  // main thread: hand a step to the worker
  void stop();
  void stopped();               // main thread: wait until the step finished
  void lock() { NdbMutex_Lock(m_mutex); }
  void unlock() { NdbMutex_Unlock(m_mutex); }
  void wait() { NdbCondition_Wait(m_cond, m_mutex); }
  void signal() { NdbCondition_Signal(m_cond); }
  void exit();                  // main thread: ask the worker loop to end
  void join() { NdbThread_WaitFor(m_thread, &m_status); }
};
110 
111 static NdbOut&
112 operator<<(NdbOut& out, const Thr& thr) {
113  out << "thr " << thr.m_no;
114  return out;
115 }
116 
// thread entry point; C linkage as required by NdbThread_Create
extern "C" { static void* runthread(void* arg); }
118 
119 Thr::Thr(int no)
120 {
121  m_state = Wait;
122  m_no = no;
123  m_runstep = 0;
124  m_ret = 0;
125  m_mutex = NdbMutex_Create();
126  m_cond = NdbCondition_Create();
127  assert(m_mutex != 0 && m_cond != 0);
128  const unsigned stacksize = 256 * 1024;
129  const NDB_THREAD_PRIO prio = NDB_THREAD_PRIO_LOW;
130  m_thread = NdbThread_Create(runthread, (void**)this, stacksize, "me", prio);
131  if (m_thread == 0) {
132  DBG("create thread failed: errno=" << errno);
133  m_ret = -1;
134  }
135  m_status = 0;
136  m_ndb = 0;
137  m_con = 0;
138  m_scanop = 0;
139  m_indexscanop = 0;
140 }
141 
Thr::~Thr()
{
  // release the thread object and synchronization primitives;
  // wl1822_main does exit() + join() before deleting a Thr
  if (m_thread != 0)
    NdbThread_Destroy(&m_thread);
  if (m_cond != 0)
    NdbCondition_Destroy(m_cond);
  if (m_mutex != 0)
    NdbMutex_Destroy(m_mutex);
}
151 
152 static void*
153 runthread(void* arg) {
154  Thr& thr = *(Thr*)arg;
155  thr.run();
156  return 0;
157 }
158 
int
Thr::run()
{
  // Worker thread main loop: wait for Start (or Exit), execute the
  // assigned runstep, publish the result, and go back to waiting.
  DBG(*this << " run");
  while (true) {
    lock();
    while (m_state != Start && m_state != Exit) {
      wait();
    }
    if (m_state == Exit) {
      DBG(*this << " exit");
      unlock();
      break;
    }
    // the step runs with the mutex held; the main thread only waits
    // in stopped() meanwhile, so this cannot deadlock
    m_ret = (*m_runstep)(*this);
    m_state = Stopped;
    signal();
    unlock();
    if (m_ret != 0) {
      // error: leave the loop; the main thread sees m_ret != 0
      DBG(*this << " error exit");
      break;
    }
  }
  // clean up the per-thread Ndb object on the way out
  delete m_ndb;
  m_ndb = 0;
  return 0;
}
186 
187 void
188 Thr::start(Runstep runstep)
189 {
190  lock();
191  m_state = Start;
192  m_runstep = runstep;
193  signal();
194  unlock();
195 }
196 
void
Thr::stopped()
{
  // called from the main thread: block until the worker has finished
  // the current step, then reset the state to Wait for the next step
  lock();
  while (m_state != Stopped) {
    wait();
  }
  m_state = Wait;
  unlock();
}
207 
void
Thr::exit()
{
  // called from the main thread: ask the worker run() loop to terminate
  lock();
  m_state = Exit;
  signal();
  unlock();
}
216 
217 // general
218 
static int
runstep_connect(Thr& thr)
{
  // create this thread's Ndb object on the shared cluster connection
  // and wait until the cluster is ready
  Ndb* ndb = thr.m_ndb = new Ndb(g_cluster_connection, "TEST_DB");
  CHN(ndb, ndb->init() == 0);
  CHN(ndb, ndb->waitUntilReady() == 0);
  DBG(thr << " connected");
  return 0;
}
228 
229 static int
230 runstep_starttx(Thr& thr)
231 {
232  Ndb* ndb = thr.m_ndb;
233  assert(ndb != 0);
234  CHN(ndb, (thr.m_con = ndb->startTransaction()) != 0);
235  DBG("thr " << thr.m_no << " tx started");
236  return 0;
237 }
238 
239 /*
240  * WL1822 flush locks
241  *
242  * Table T with 3 tuples X, Y, Z.
243  * Two transactions (* = lock wait).
244  *
245  * - tx1 reads and locks Z
246  * - tx2 scans X, Y, *Z
247  * - tx2 returns X, Y before lock wait on Z
248  * - tx1 reads and locks *X
249  * - api asks for next tx2 result
250  * - LQH unlocks X via ACC or TUX [*]
251  * - tx1 gets lock on X
252  * - tx1 returns X to api
253  * - api commits tx1
254  * - tx2 gets lock on Z
255  * - tx2 returns Z to api
256  *
257  * The point is deadlock is avoided due to [*].
258  * The test is for 1 db node and 1 fragment table.
259  */
260 
// scan mode for the current run: 't' = table scan, 'x' = index scan
static char wl1822_scantx = 0;

// key (A) and data (B) column values for the three rows X, Y, Z
static const Uint32 wl1822_valA[3] = { 0, 1, 2 };
static const Uint32 wl1822_valB[3] = { 3, 4, 5 };

// receive buffers for getValue; reset to ~0 before each fetch
static Uint32 wl1822_bufA = ~0;
static Uint32 wl1822_bufB = ~0;

// map scan row to key (A) and reverse
static unsigned wl1822_r2k[3] = { 0, 0, 0 };
static unsigned wl1822_k2r[3] = { 0, 0, 0 };
272 
273 static int
274 wl1822_createtable(Thr& thr)
275 {
276  Ndb* ndb = thr.m_ndb;
277  assert(ndb != 0);
279  // drop T
280  if (dic->getTable(g_opt.m_tname) != 0)
281  CHN(dic, dic->dropTable(g_opt.m_tname) == 0);
282  // create T
283  NdbDictionary::Table tab(g_opt.m_tname);
284  tab.setFragmentType(NdbDictionary::Object::FragAllSmall);
285  { NdbDictionary::Column col("A");
286  col.setType(NdbDictionary::Column::Unsigned);
287  col.setPrimaryKey(true);
288  tab.addColumn(col);
289  }
290  { NdbDictionary::Column col("B");
291  col.setType(NdbDictionary::Column::Unsigned);
292  col.setPrimaryKey(false);
293  tab.addColumn(col);
294  }
295  CHN(dic, dic->createTable(tab) == 0);
296  // create X
297  NdbDictionary::Index ind(g_opt.m_xname);
298  ind.setTable(g_opt.m_tname);
300  ind.setLogging(false);
301  ind.addColumn("B");
302  CHN(dic, dic->createIndex(ind) == 0);
303  DBG("created " << g_opt.m_tname << ", " << g_opt.m_xname);
304  return 0;
305 }
306 
static int
wl1822_insertrows(Thr& thr)
{
  // insert X, Y, Z
  // one row per transaction: A = 0,1,2 with B = 3,4,5
  Ndb* ndb = thr.m_ndb;
  assert(ndb != 0);
  NdbConnection* con;
  NdbOperation* op;
  for (unsigned k = 0; k < 3; k++) {
    CHN(ndb, (con = ndb->startTransaction()) != 0);
    CHN(con, (op = con->getNdbOperation(g_opt.m_tname)) != 0);
    CHN(op, op->insertTuple() == 0);
    CHN(op, op->equal("A", (char*)&wl1822_valA[k]) == 0);
    CHN(op, op->setValue("B", (char*)&wl1822_valB[k]) == 0);
    CHN(con, con->execute(Commit) == 0);
    ndb->closeTransaction(con);
  }
  DBG("inserted X, Y, Z");
  return 0;
}
327 
328 static int
329 wl1822_getscanorder(Thr& thr)
330 {
331  // cheat, table order happens to be key order in my test
332  wl1822_r2k[0] = 0;
333  wl1822_r2k[1] = 1;
334  wl1822_r2k[2] = 2;
335  wl1822_k2r[0] = 0;
336  wl1822_k2r[1] = 1;
337  wl1822_k2r[2] = 2;
338  DBG("scan order determined");
339  return 0;
340 }
341 
static int
wl1822_tx1_readZ(Thr& thr)
{
  // tx1 read Z with exclusive lock
  // Z is the last row in scan order, key wl1822_r2k[2]
  NdbConnection* con = thr.m_con;
  assert(con != 0);
  NdbOperation* op;
  CHN(con, (op = con->getNdbOperation(g_opt.m_tname)) != 0);
  CHN(op, op->readTupleExclusive() == 0);
  CHN(op, op->equal("A", wl1822_valA[wl1822_r2k[2]]) == 0);
  wl1822_bufB = ~0;
  CHN(op, op->getValue("B", (char*)&wl1822_bufB) != 0);
  // NoCommit: keep the transaction open so the lock on Z is held
  CHN(con, con->execute(NoCommit) == 0);
  CHK(wl1822_bufB == wl1822_valB[wl1822_r2k[2]]);
  DBG("tx1 locked Z");
  return 0;
}
359 
360 static int
361 wl1822_tx2_scanXY(Thr& thr)
362 {
363  // tx2 scan X, Y with exclusive lock
364  NdbConnection* con = thr.m_con;
365  assert(con != 0);
366  NdbScanOperation* scanop;
367  NdbIndexScanOperation* indexscanop;
368 
369  if (wl1822_scantx == 't') {
370  CHN(con, (scanop = thr.m_scanop = con->getNdbScanOperation(g_opt.m_tname)) != 0);
371  DBG("tx2 scan exclusive " << g_opt.m_tname);
372  }
373  if (wl1822_scantx == 'x') {
374  CHN(con, (scanop = thr.m_scanop = indexscanop = thr.m_indexscanop = con->getNdbIndexScanOperation(g_opt.m_xname, g_opt.m_tname)) != 0);
375  DBG("tx2 scan exclusive " << g_opt.m_xname);
376  }
377  CHN(scanop, scanop->readTuplesExclusive(16) == 0);
378  CHN(scanop, scanop->getValue("A", (char*)&wl1822_bufA) != 0);
379  CHN(scanop, scanop->getValue("B", (char*)&wl1822_bufB) != 0);
380  CHN(con, con->execute(NoCommit) == 0);
381  unsigned row = 0;
382  while (row < 2) {
383  DBG("before row " << row);
384  int ret;
385  wl1822_bufA = wl1822_bufB = ~0;
386  CHN(con, (ret = scanop->nextResult(true)) == 0);
387  DBG("got row " << row << " a=" << wl1822_bufA << " b=" << wl1822_bufB);
388  CHK(wl1822_bufA == wl1822_valA[wl1822_r2k[row]]);
389  CHK(wl1822_bufB == wl1822_valB[wl1822_r2k[row]]);
390  row++;
391  }
392  return 0;
393 }
394 
395 static int
396 wl1822_tx1_readX_commit(Thr& thr)
397 {
398  // tx1 read X with exclusive lock and commit
399  NdbConnection* con = thr.m_con;
400  assert(con != 0);
401  NdbOperation* op;
402  CHN(con, (op = con->getNdbOperation(g_opt.m_tname)) != 0);
403  CHN(op, op->readTupleExclusive() == 0);
404  CHN(op, op->equal("A", wl1822_valA[wl1822_r2k[2]]) == 0);
405  wl1822_bufB = ~0;
406  CHN(op, op->getValue("B", (char*)&wl1822_bufB) != 0);
407  CHN(con, con->execute(NoCommit) == 0);
408  CHK(wl1822_bufB == wl1822_valB[wl1822_r2k[2]]);
409  DBG("tx1 locked X");
410  CHN(con, con->execute(Commit) == 0);
411  DBG("tx1 commit");
412  return 0;
413 }
414 
static int
wl1822_tx2_scanZ_close(Thr& thr)
{
  // tx2 scan Z with exclusive lock and close scan
  // asking for the next result flushes tx2's locks on X/Y (the WL1822
  // mechanism) and then blocks on Z until tx1 commits
  Ndb* ndb = thr.m_ndb;
  NdbConnection* con = thr.m_con;
  NdbScanOperation* scanop = thr.m_scanop;
  assert(ndb != 0 && con != 0 && scanop != 0);
  unsigned row = 2;
  while (true) {
    DBG("before row " << row);
    int ret;
    wl1822_bufA = wl1822_bufB = ~0;
    // nextResult: 0 = row available, 1 = end of scan
    CHN(con, (ret = scanop->nextResult(true)) == 0 || ret == 1);
    if (ret == 1)
      break;
    DBG("got row " << row << " a=" << wl1822_bufA << " b=" << wl1822_bufB);
    CHK(wl1822_bufA == wl1822_valA[wl1822_r2k[row]]);
    CHK(wl1822_bufB == wl1822_valB[wl1822_r2k[row]]);
    row++;
  }
  ndb->closeTransaction(con);
  // exactly one more row (Z) must have been returned
  CHK(row == 3);
  return 0;
}
440 
// threads are synced between each step
// column 0 = tx1's step, column 1 = tx2's step; 0 = thread idle that step
static Runstep wl1822_step[][2] = {
  { runstep_connect, runstep_connect },
  { wl1822_createtable, 0 },
  { wl1822_insertrows, 0 },
  { wl1822_getscanorder, 0 },
  { runstep_starttx, runstep_starttx },
  { wl1822_tx1_readZ, 0 },
  { 0, wl1822_tx2_scanXY },
  { wl1822_tx1_readX_commit, wl1822_tx2_scanZ_close }
};
// number of synced steps in the scenario
const unsigned wl1822_stepcount = sizeof(wl1822_step)/sizeof(wl1822_step[0]);
453 
454 static int
455 wl1822_main(char scantx)
456 {
457  wl1822_scantx = scantx;
458  static const unsigned thrcount = 2;
459  // create threads for tx1 and tx2
460  Thr* thrlist[2];
461  unsigned n;
462  for (n = 0; n < thrcount; n++) {
463  Thr& thr = *(thrlist[n] = new Thr(1 + n));
464  CHK(thr.m_ret == 0);
465  }
466  // run the steps
467  for (unsigned i = 0; i < wl1822_stepcount; i++) {
468  DBG("step " << i << " start");
469  for (n = 0; n < thrcount; n++) {
470  Thr& thr = *thrlist[n];
471  Runstep runstep = wl1822_step[i][n];
472  if (runstep != 0)
473  thr.start(runstep);
474  }
475  for (n = 0; n < thrcount; n++) {
476  Thr& thr = *thrlist[n];
477  Runstep runstep = wl1822_step[i][n];
478  if (runstep != 0)
479  thr.stopped();
480  }
481  }
482  // delete threads
483  for (n = 0; n < thrcount; n++) {
484  Thr& thr = *thrlist[n];
485  thr.exit();
486  thr.join();
487  delete &thr;
488  }
489  return 0;
490 }
491 
492 NDB_COMMAND(testOdbcDriver, "testDeadlock", "testDeadlock", "testDeadlock", 65535)
493 {
494  ndb_init();
495  if (ndbout_mutex == NULL)
496  ndbout_mutex= NdbMutex_Create();
497  while (++argv, --argc > 0) {
498  const char* arg = argv[0];
499  if (strcmp(arg, "-scan") == 0) {
500  if (++argv, --argc > 0) {
501  g_opt.m_scan = strdup(argv[0]);
502  continue;
503  }
504  }
505  printusage();
506  return NDBT_ProgramExit(NDBT_WRONGARGS);
507  }
508 
510  if(con.connect(12, 5, 1) != 0)
511  {
512  return NDBT_ProgramExit(NDBT_FAILED);
513  }
514  g_cluster_connection= &con;
515 
516  if ((strchr(g_opt.m_scan, 't') != 0 && wl1822_main('t') == -1) ||
517  (strchr(g_opt.m_scan, 'x') != 0 && wl1822_main('x') == -1))
518  {
519  return NDBT_ProgramExit(NDBT_FAILED);
520  }
521  return NDBT_ProgramExit(NDBT_OK);
522 }
523 
524 // vim: set sw=2 et: