MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
CrundNdbApiOperations.cpp
1 /* -*- mode: java; c-basic-offset: 4; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=4:tabstop=4:smarttab:
3  *
4  * Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; version 2 of the License.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18  */
19 
20 #include <iostream>
21 #include <vector>
22 #include <algorithm>
23 #include <string>
24 #include <cstring>
25 #include <cassert>
26 
27 #include <NdbApi.hpp>
28 #include <NdbError.hpp>
29 
30 #include "helpers.hpp"
31 #include "string_helpers.hpp"
32 
33 #include "CrundNdbApiOperations.hpp"
34 
35 //using namespace std;
36 using std::cout;
37 using std::flush;
38 using std::endl;
39 using std::string;
40 using std::vector;
41 
42 // JNI crashes with gcc & operator<<(ostream &, long/int)
43 using utils::toString;
44 
45 /************************************************************
46  * Helper Macros & Functions
47  ************************************************************/
48 
49 // This benchmark's error handling of NDBAPI calls is rigorous but crude:
50 // - all calls' return value is checked for errors
51 // - all errors are reported and then followed by a process exit
52 
53 /*
54 // JNI crashes with gcc & operator<<(ostream &, long/int)
55 #define ABORT_NDB_ERROR0(error) \
56  do { cout << "!!! error in " << __FILE__ << ", line: " << __LINE__ \
57  << ", code: " << (int)error.code \
58  << ", msg: " << error.message << "." << endl; \
59  exit(-1); \
60  } while (0)
61 */
62 #define ABORT_NDB_ERROR(error) \
63  do { \
64  char l[1024]; \
65  sprintf(l, "%d", __LINE__); \
66  char c[1024]; \
67  sprintf(c, "%d", error.code); \
68  cout << endl << "!!! error in " << __FILE__ \
69  << ", line: " << l << "," << endl; \
70  cout << " error code: " << c \
71  << ", error msg: " << error.message << "." << endl; \
72  exit(-1); \
73  } while (0)
74 
75 #define VERIFY(cond) \
76  if (cond); else ABORT_ERROR("wrong data; verification failed")
77 
78 /************************************************************
79  * Member Functions of Class CrundModel
80  ************************************************************/
81 
82 void
83 CrundModel::init(Ndb* ndb)
84 {
85  const NdbDictionary::Dictionary* dict = ndb->getDictionary();
86 
87  // get columns of table A
88  if ((table_A = dict->getTable("a")) == NULL)
89  ABORT_NDB_ERROR(dict->getNdbError());
90  if ((column_A_id = table_A->getColumn("id")) == NULL)
91  ABORT_NDB_ERROR(dict->getNdbError());
92  if ((column_A_cint = table_A->getColumn("cint")) == NULL)
93  ABORT_NDB_ERROR(dict->getNdbError());
94  if ((column_A_clong = table_A->getColumn("clong")) == NULL)
95  ABORT_NDB_ERROR(dict->getNdbError());
96  if ((column_A_cfloat = table_A->getColumn("cfloat")) == NULL)
97  ABORT_NDB_ERROR(dict->getNdbError());
98  if ((column_A_cdouble = table_A->getColumn("cdouble")) == NULL)
99  ABORT_NDB_ERROR(dict->getNdbError());
100 
101  // get columns of table B0
102  if ((table_B0 = dict->getTable("b0")) == NULL)
103  ABORT_NDB_ERROR(dict->getNdbError());
104  if ((column_B0_id = table_B0->getColumn("id")) == NULL)
105  ABORT_NDB_ERROR(dict->getNdbError());
106  if ((column_B0_cint = table_B0->getColumn("cint")) == NULL)
107  ABORT_NDB_ERROR(dict->getNdbError());
108  if ((column_B0_clong = table_B0->getColumn("clong")) == NULL)
109  ABORT_NDB_ERROR(dict->getNdbError());
110  if ((column_B0_cfloat = table_B0->getColumn("cfloat")) == NULL)
111  ABORT_NDB_ERROR(dict->getNdbError());
112  if ((column_B0_cdouble = table_B0->getColumn("cdouble")) == NULL)
113  ABORT_NDB_ERROR(dict->getNdbError());
114  if ((column_B0_a_id = table_B0->getColumn("a_id")) == NULL)
115  ABORT_NDB_ERROR(dict->getNdbError());
116  if ((column_B0_cvarbinary_def = table_B0->getColumn("cvarbinary_def")) == NULL)
117  ABORT_NDB_ERROR(dict->getNdbError());
118  if ((column_B0_cvarchar_def = table_B0->getColumn("cvarchar_def")) == NULL)
119  ABORT_NDB_ERROR(dict->getNdbError());
120 
121  // get indexes of table B0
122  if ((idx_B0_a_id = dict->getIndex("I_B0_FK", "b0")) == NULL)
123  ABORT_NDB_ERROR(dict->getNdbError());
124 
125  // get common attribute ids for tables A, B0
126  attr_id = column_A_id->getAttrId();
127  if (attr_id != column_B0_id->getAttrId())
128  ABORT_ERROR("attribute id mismatch");
129  attr_cint = column_A_cint->getAttrId();
130  if (attr_cint != column_B0_cint->getAttrId())
131  ABORT_ERROR("attribute id mismatch");
132  attr_clong = column_A_clong->getAttrId();
133  if (attr_clong != column_B0_clong->getAttrId())
134  ABORT_ERROR("attribute id mismatch");
135  attr_cfloat = column_A_cfloat->getAttrId();
136  if (attr_cfloat != column_B0_cfloat->getAttrId())
137  ABORT_ERROR("attribute id mismatch");
138  attr_cdouble = column_A_cdouble->getAttrId();
139  if (attr_cdouble != column_B0_cdouble->getAttrId())
140  ABORT_ERROR("attribute id mismatch");
141 
142  // get attribute ids for table B0
143  attr_B0_a_id = column_B0_a_id->getAttrId();
144  attr_B0_cvarbinary_def = column_B0_cvarbinary_def->getAttrId();
145  attr_B0_cvarchar_def = column_B0_cvarchar_def->getAttrId();
146 
147  // get attribute ids for columns in index B0_a_id
148  attr_idx_B0_a_id = idx_B0_a_id->getColumn(0)->getAttrId();
149 }
150 
151 /************************************************************
152  * Member Functions of Class Operations
153  ************************************************************/
154 
155 /* XXX 5.1 Reference Manual, 16.14.3
156 
157  Transaction isolation level. The NDBCLUSTER storage engine supports only
158  the READ COMMITTED transaction isolation level.
159 
160  If a SELECT from a Cluster table includes a BLOB or TEXT column, the
161  READ COMMITTED transaction isolation level is converted to a read with
162  read lock. This is done to guarantee consistency, due to the fact that
163  parts of the values stored in columns of these types are actually read
164  from a separate table.
165 
166  DELETE FROM (even with no WHERE clause) is transactional. For tables
167  containing a great many rows, you may find that performance is improved
168  by using several DELETE FROM ... LIMIT ... statements to “chunk” the
169  delete operation. If your objective is to empty the table, then you may
170  wish to use TRUNCATE instead.
171 */
172 
173 void
174 CrundNdbApiOperations::init(const char* mgmd_conn_str)
175 {
176  assert(mgmd == NULL);
177  assert(mgmd_conn_str);
178 
179  // ndb_init must be called first
180  cout << endl
181  << "initializing NDBAPI ..." << flush;
182  int stat = ndb_init();
183  if (stat != 0)
184  ABORT_ERROR("ndb_init() returned: " << stat);
185  cout << " [ok]" << endl;
186 
187  // instantiate NDB cluster singleton
188  cout << "creating cluster connection ..." << flush;
189  assert(mgmd_conn_str);
190  mgmd = new Ndb_cluster_connection(mgmd_conn_str);
191  cout << " [ok]" << endl; // no useful mgmd->string conversion
192 
193  // connect to cluster management node (ndb_mgmd)
194  cout << "connecting to mgmd ..." << flush;
195  const int retries = 0; // number of retries (< 0 = indefinitely)
196  const int delay = 0; // seconds to wait after retry
197  const int verbose = 1; // print report of progess
198  // returns: 0 = success, 1 = recoverable error, -1 = non-recoverable error
199  if (mgmd->connect(retries, delay, verbose) != 0)
200  ABORT_ERROR("mgmd@" << mgmd_conn_str << " was not ready within "
201  << (retries * delay) << "s.");
202  cout << " [ok: " << mgmd_conn_str << "]" << endl;
203 }
204 
205 void
206 CrundNdbApiOperations::close()
207 {
208  assert(mgmd != NULL);
209 
210  cout << "closing cluster connection ..." << flush;
211  delete mgmd;
212  mgmd = NULL;
213  cout << " [ok]" << endl;
214 
215  // ndb_close must be called last
216  cout << "closing NDBAPI ... " << flush;
217  ndb_end(0);
218  cout << " [ok]" << endl;
219 }
220 
221 void
222 CrundNdbApiOperations::initConnection(const char* catalog, const char* schema,
223  NdbOperation::LockMode defaultLockMode)
224 {
225  assert(mgmd != NULL);
226  assert(ndb == NULL);
227  assert(tx == NULL);
228  assert(model == NULL);
229 
230  // optionally, connect and wait for reaching the data nodes (ndbds)
231  cout << "waiting for data nodes ..." << flush;
232  const int initial_wait = 10; // seconds to wait until first node detected
233  const int final_wait = 0; // seconds to wait after first node detected
234  // returns: 0 all nodes live, > 0 at least one node live, < 0 error
235  if (mgmd->wait_until_ready(initial_wait, final_wait) < 0)
236  ABORT_ERROR("data nodes were not ready within "
237  << (initial_wait + final_wait) << "s.");
238  cout << " [ok]" << endl;
239 
240  // connect to database
241  cout << "connecting to database ..." << flush;
242  ndb = new Ndb(mgmd, catalog, schema);
243  const int max_no_tx = 10; // maximum number of parallel tx (<=1024)
244  // note each scan or index scan operation uses one extra transaction
245  //if (ndb->init() != 0)
246  if (ndb->init(max_no_tx) != 0)
247  ABORT_NDB_ERROR(ndb->getNdbError());
248  cout << " [ok: " << catalog << "." << schema << "]" << endl;
249 
250  cout << "caching metadata ..." << flush;
251  CrundModel* m = new CrundModel();
252  m->init(ndb);
253  model = m;
254  cout << " [ok]" << endl;
255 
256  cout << "using lock mode for reads ..." << flush;
257  ndbOpLockMode = defaultLockMode;
258  string lm;
259  switch (defaultLockMode) {
261  lm = "LM_CommittedRead";
262  break;
264  lm = "LM_Read";
265  break;
267  lm = "LM_Exclusive";
268  break;
269  default:
270  ndbOpLockMode = NdbOperation::LM_CommittedRead;
271  lm = "LM_CommittedRead";
272  assert(false);
273  }
274  cout << " [ok: " + lm + "]" << endl;
275 }
276 
277 void
278 CrundNdbApiOperations::closeConnection()
279 {
280  assert(mgmd != NULL);
281  assert(ndb != NULL);
282  assert(tx == NULL);
283  assert(model != NULL);
284 
285  cout << "clearing metadata cache ..." << flush;
286  delete model;
287  model = NULL;
288  cout << " [ok]" << endl;
289 
290  cout << "closing database connection ..." << flush;
291  // no ndb->close();
292  delete ndb;
293  ndb = NULL;
294  cout << " [ok]" << endl;
295 }
296 
297 void
298 CrundNdbApiOperations::beginTransaction()
299 {
300  assert(tx == NULL);
301 
302  // start a transaction
303  // must be closed with Ndb::closeTransaction or NdbTransaction::close
304  if ((tx = ndb->startTransaction()) == NULL)
305  ABORT_NDB_ERROR(ndb->getNdbError());
306 }
307 
308 void
309 CrundNdbApiOperations::executeOperations()
310 {
311  assert(tx != NULL);
312 
313  // execute but don't commit the current transaction
314  if (tx->execute(NdbTransaction::NoCommit) != 0
316  ABORT_NDB_ERROR(tx->getNdbError());
317 }
318 
319 void
320 CrundNdbApiOperations::commitTransaction()
321 {
322  assert(tx != NULL);
323 
324  // commit the current transaction
325  if (tx->execute(NdbTransaction::Commit) != 0
327  ABORT_NDB_ERROR(tx->getNdbError());
328 }
329 
330 void
331 CrundNdbApiOperations::closeTransaction()
332 {
333  assert(tx != NULL);
334 
335  // close the current transaction
336  // to be called irrespectively of success or failure
337  ndb->closeTransaction(tx);
338  tx = NULL;
339 }
340 
341 // ----------------------------------------------------------------------
342 
343 void
344 CrundNdbApiOperations::clearData()
345 {
346  cout << "deleting all rows ..." << flush;
347  const bool batch = true;
348  int delB0 = -1;
349  delByScan(model->table_B0, delB0, batch);
350  cout << " [B0: " << toString(delB0) << flush;
351  int delA = -1;
352  delByScan(model->table_A, delA, batch);
353  cout << ", A: " << toString(delA) << "]" << endl;
354 }
355 
356 struct CommonAB {
357  Int32 id;
358  Int32 cint;
359  Int64 clong;
360  float cfloat;
361  double cdouble;
362 };
363 
364 // for sorting
365 static inline bool
367  return (i.id < j.id);
368 }
369 
370 static inline Int32
371 getCommonAB(const CommonAB* const ab)
372 {
373  Int32 cint = ab->cint;
374  Int64 clong = ab->clong;
375  VERIFY(clong == cint);
376  float cfloat = ab->cfloat;
377  VERIFY(cfloat == cint);
378  double cdouble = ab->cdouble;
379  VERIFY(cdouble == cint);
380  return cint;
381 }
382 
383 // some string literals
384 static const char* const astring1 = "i";
385 static const char* const astring10 = "xxxxxxxxxx";
386 static const char* const astring100 = "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc";
387 static const char* const astring1000 = "mmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmm";
388 
389 static inline const char*
390 selectString(int length)
391 {
392  switch (length) {
393  case 0: return NULL;
394  case 1: return astring1;
395  case 10: return astring10;
396  case 100: return astring100;
397  case 1000: return astring1000;
398  default:
399  assert(false);
400  return "";
401  }
402 }
403 
404 void
405 CrundNdbApiOperations::ins(const NdbDictionary::Table* table,
406  int from, int to,
407  bool setAttrs, bool batch)
408 {
409  beginTransaction();
410  for (int i = from; i <= to; i++) {
411  // get an insert operation for the table
412  NdbOperation* op = tx->getNdbOperation(table);
413  if (op == NULL)
414  ABORT_NDB_ERROR(tx->getNdbError());
415  if (op->insertTuple() != 0)
416  ABORT_NDB_ERROR(tx->getNdbError());
417 
418  // set values; key attribute needs to be set first
419  if (op->equal(model->attr_id, (Int32)i) != 0)
420  ABORT_NDB_ERROR(tx->getNdbError());
421  if (setAttrs) {
422  if (op->setValue(model->attr_cint, (Int32)-i) != 0)
423  ABORT_NDB_ERROR(tx->getNdbError());
424  if (op->setValue(model->attr_clong, (Int64)-i) != 0)
425  ABORT_NDB_ERROR(tx->getNdbError());
426  if (op->setValue(model->attr_cfloat, (float)-i) != 0)
427  ABORT_NDB_ERROR(tx->getNdbError());
428  if (op->setValue(model->attr_cdouble, (double)-i) != 0)
429  ABORT_NDB_ERROR(tx->getNdbError());
430  }
431 
432  // execute the operation now if in non-batching mode
433  if (!batch)
434  executeOperations();
435  }
436  commitTransaction();
437  closeTransaction();
438 }
439 
440 void
441 CrundNdbApiOperations::delByScan(const NdbDictionary::Table* table, int& count,
442  bool batch)
443 {
444  beginTransaction();
445 
446  // get a full table scan operation (no scan filter defined)
447  NdbScanOperation* op = tx->getNdbScanOperation(table);
448  if (op == NULL)
449  ABORT_NDB_ERROR(tx->getNdbError());
450 
451  // define a read scan with exclusive locks
453  const int scan_flags = 0;
454  const int parallel = 0;
455  const int batch_ = 0;
456  if (op->readTuples(lock_mode, scan_flags, parallel, batch_) != 0)
457  ABORT_NDB_ERROR(tx->getNdbError());
458 
459  // start the scan; don't commit yet
460  executeOperations();
461 
462  // delete all rows in a given scan
463  count = 0;
464  int stat;
465  const bool allowFetch = true; // request new batches when exhausted
466  const bool forceSend = false; // send may be delayed
467  while ((stat = op->nextResult(allowFetch, forceSend)) == 0) {
468  // delete all tuples within a batch
469  do {
470  if (op->deleteCurrentTuple() != 0)
471  ABORT_NDB_ERROR(tx->getNdbError());
472  count++;
473 
474  // execute the operation now if in non-batching mode
475  if (!batch)
476  executeOperations();
477  } while ((stat = op->nextResult(!allowFetch, forceSend)) == 0);
478 
479  if (stat == 1) {
480  // no more batches
481  break;
482  }
483  if (stat == 2) {
484  // end of current batch, fetch next
485  if (tx->execute(NdbTransaction::NoCommit) != 0
487  ABORT_NDB_ERROR(tx->getNdbError());
488  continue;
489  }
490  ABORT_ERROR("stat == " + stat);
491  }
492  if (stat != 1)
493  ABORT_ERROR("stat == " + stat);
494 
495  // close the scan
496  const bool forceSend_ = false;
497  const bool releaseOp = false;
498  op->close(forceSend_, releaseOp);
499  //CDBG << "!!! deleted " << toString(count) << " rows" << endl;
500 
501  commitTransaction();
502  closeTransaction();
503 }
504 
505 void
506 CrundNdbApiOperations::delByPK(const NdbDictionary::Table* table,
507  int from, int to,
508  bool batch)
509 {
510  beginTransaction();
511  for (int i = from; i <= to; i++) {
512  // get a delete operation for the table
513  NdbOperation* op = tx->getNdbOperation(table);
514  if (op == NULL)
515  ABORT_NDB_ERROR(tx->getNdbError());
516  if (op->deleteTuple() != 0)
517  ABORT_NDB_ERROR(tx->getNdbError());
518 
519  // set key attribute
520  if (op->equal(model->attr_id, (Int32)i) != 0)
521  ABORT_NDB_ERROR(tx->getNdbError());
522 
523  // execute the operation now if in non-batching mode
524  if (!batch)
525  executeOperations();
526  }
527  commitTransaction();
528  closeTransaction();
529 }
530 
531 void
532 CrundNdbApiOperations::setByPK(const NdbDictionary::Table* table,
533  int from, int to,
534  bool batch)
535 {
536  beginTransaction();
537  for (int i = from; i <= to; i++) {
538  // get an update operation for the table
539  NdbOperation* op = tx->getNdbOperation(table);
540  if (op == NULL)
541  ABORT_NDB_ERROR(tx->getNdbError());
542  if (op->updateTuple() != 0)
543  ABORT_NDB_ERROR(tx->getNdbError());
544 
545  // set values; key attribute needs to be set first
546  if (op->equal(model->attr_id, (Int32)i) != 0)
547  ABORT_NDB_ERROR(tx->getNdbError());
548  if (op->setValue(model->attr_cint, (Int32)i) != 0)
549  ABORT_NDB_ERROR(tx->getNdbError());
550  if (op->setValue(model->attr_clong, (Int64)i) != 0)
551  ABORT_NDB_ERROR(tx->getNdbError());
552  if (op->setValue(model->attr_cfloat, (float)i) != 0)
553  ABORT_NDB_ERROR(tx->getNdbError());
554  if (op->setValue(model->attr_cdouble, (double)i) != 0)
555  ABORT_NDB_ERROR(tx->getNdbError());
556 
557  // execute the operation now if in non-batching mode
558  if (!batch)
559  executeOperations();
560  }
561  commitTransaction();
562  closeTransaction();
563 }
564 
565 void
566 CrundNdbApiOperations::getByPK_bb(const NdbDictionary::Table* table,
567  int from, int to,
568  bool batch)
569 {
570  // allocate attributes holder
571  const int count = (to - from) + 1;
572  CommonAB* const ab = new CommonAB[count];
573 
574  // fetch attributes by key
575  beginTransaction();
576  CommonAB* pab = ab;
577  for (int i = from; i <= to; i++, pab++) {
578  // get a read operation for the table
579  NdbOperation* op = tx->getNdbOperation(table);
580  if (op == NULL)
581  ABORT_NDB_ERROR(tx->getNdbError());
582  if (op->readTuple(ndbOpLockMode) != 0)
583  ABORT_NDB_ERROR(tx->getNdbError());
584 
585  // set key attribute
586  if (op->equal(model->attr_id, (Int32)i) != 0)
587  ABORT_NDB_ERROR(tx->getNdbError());
588 
589  // get attributes (not readable until after commit)
590  if (op->getValue(model->attr_id, (char*)&pab->id) == NULL)
591  ABORT_NDB_ERROR(tx->getNdbError());
592  if (op->getValue(model->attr_cint, (char*)&pab->cint) == NULL)
593  ABORT_NDB_ERROR(tx->getNdbError());
594  if (op->getValue(model->attr_clong, (char*)&pab->clong) == NULL)
595  ABORT_NDB_ERROR(tx->getNdbError());
596  if (op->getValue(model->attr_cfloat, (char*)&pab->cfloat) == NULL)
597  ABORT_NDB_ERROR(tx->getNdbError());
598  if (op->getValue(model->attr_cdouble, (char*)&pab->cdouble) == NULL)
599  ABORT_NDB_ERROR(tx->getNdbError());
600 
601  // execute the operation now if in non-batching mode
602  if (!batch)
603  executeOperations();
604  }
605  commitTransaction();
606  closeTransaction();
607 
608  // check fetched values
609  pab = ab;
610  for (int i = from; i <= to; i++, pab++) {
611  // check fetched values
612  Int32 id = pab->id;
613  VERIFY(id == i);
614 
615  Int32 j = getCommonAB(pab);
616  //CDBG << "!!! id=" << toString(id) << ", i=" << toString(i) << endl;
617  VERIFY(j == id);
618  }
619 
620  // release attributes holder
621  delete[] ab;
622 }
623 
624 struct CommonAB_AR {
625  NdbRecAttr* id;
626  NdbRecAttr* cint;
627  NdbRecAttr* clong;
628  NdbRecAttr* cfloat;
629  NdbRecAttr* cdouble;
630 };
631 
632 static inline Int32
633 getCommonAB(const CommonAB_AR* const ab)
634 {
635  Int32 cint = ab->cint->int32_value();
636  Int64 clong = ab->clong->int64_value();
637  VERIFY(clong == cint);
638  float cfloat = ab->cfloat->float_value();
639  VERIFY(cfloat == cint);
640  double cdouble = ab->cdouble->double_value();
641  VERIFY(cdouble == cint);
642  return cint;
643 }
644 
645 void
646 CrundNdbApiOperations::getByPK_ar(const NdbDictionary::Table* table,
647  int from, int to,
648  bool batch)
649 {
650  // allocate attributes holder
651  const int count = (to - from) + 1;
652  CommonAB_AR* const ab = new CommonAB_AR[count];
653 
654  // fetch attributes by key
655  beginTransaction();
656  CommonAB_AR* pab = ab;
657  for (int i = from; i <= to; i++, pab++) {
658  // get a read operation for the table
659  NdbOperation* op = tx->getNdbOperation(table);
660  if (op == NULL)
661  ABORT_NDB_ERROR(tx->getNdbError());
662  if (op->readTuple(ndbOpLockMode) != 0)
663  ABORT_NDB_ERROR(tx->getNdbError());
664 
665  // set key attribute
666  if (op->equal(model->attr_id, (Int32)i) != 0)
667  ABORT_NDB_ERROR(tx->getNdbError());
668 
669  // get attributes (not readable until after commit)
670  if ((pab->id = op->getValue(model->attr_id, NULL)) == NULL)
671  ABORT_NDB_ERROR(tx->getNdbError());
672  if ((pab->cint = op->getValue(model->attr_cint, NULL)) == NULL)
673  ABORT_NDB_ERROR(tx->getNdbError());
674  if ((pab->clong = op->getValue(model->attr_clong, NULL)) == NULL)
675  ABORT_NDB_ERROR(tx->getNdbError());
676  if ((pab->cfloat = op->getValue(model->attr_cfloat, NULL)) == NULL)
677  ABORT_NDB_ERROR(tx->getNdbError());
678  if ((pab->cdouble = op->getValue(model->attr_cdouble, NULL)) == NULL)
679  ABORT_NDB_ERROR(tx->getNdbError());
680 
681  // execute the operation now if in non-batching mode
682  if (!batch)
683  executeOperations();
684  }
685  commitTransaction();
686  closeTransaction();
687 
688  // check fetched values
689  pab = ab;
690  for (int i = from; i <= to; i++, pab++) {
691  // check fetched values
692  Int32 id = pab->id->int32_value();
693  VERIFY(id == i);
694 
695  Int32 j = getCommonAB(pab);
696  //CDBG << "!!! id=" << toString(id) << ", i=" << toString(i) << endl;
697  VERIFY(j == id);
698  }
699 
700  // release attributes holder
701  delete[] ab;
702 }
703 
704 void
705 CrundNdbApiOperations::setVarbinary(const NdbDictionary::Table* table,
706  int from, int to, bool batch, int length)
707 {
708  setVar(table, model->attr_B0_cvarbinary_def,
709  from, to, batch, selectString(length));
710 }
711 
712 void
713 CrundNdbApiOperations::setVarchar(const NdbDictionary::Table* table,
714  int from, int to, bool batch, int length)
715 {
716  setVar(table, model->attr_B0_cvarchar_def,
717  from, to, batch, selectString(length));
718 }
719 
720 void
721 CrundNdbApiOperations::getVarbinary(const NdbDictionary::Table* table,
722  int from, int to, bool batch, int length)
723 {
724  getVar(table, model->attr_B0_cvarbinary_def,
725  from, to, batch, selectString(length));
726 }
727 
728 void
729 CrundNdbApiOperations::getVarchar(const NdbDictionary::Table* table,
730  int from, int to, bool batch, int length)
731 {
732  getVar(table, model->attr_B0_cvarchar_def,
733  from, to, batch, selectString(length));
734 }
735 
736 void
737 CrundNdbApiOperations::setVar(const NdbDictionary::Table* table, int attr_cvar,
738  int from, int to,
739  bool batch, const char* str)
740 {
741  char* buf = NULL;
742  if (str != NULL) {
743  // allocate attributes holder
744  size_t slen = strlen(str);
745  // XXX assumes column declared as VARBINARY/CHAR(<255)
746  size_t sbuf = 1 + slen;
747  // XXX buffer overflow if slen >255!!!
748  assert(slen < 255);
749  buf = new char[sbuf];
750  buf[0] = (char)slen;
751  memcpy(buf + 1, str, slen);
752  //CDBG << "!!! buf[0]=" << toString(buf[0]) << endl;
753  //CDBG << "!!! buf[1]=" << toString(buf[1]) << endl;
754  //CDBG << "!!! buf[" << toString(slen) << "]=" << toString(buf[slen]) << endl;
755  }
756 
757  beginTransaction();
758  for (int i = from; i <= to; i++) {
759  // get an update operation for the table
760  NdbOperation* op = tx->getNdbOperation(table);
761  if (op == NULL)
762  ABORT_NDB_ERROR(tx->getNdbError());
763  if (op->updateTuple() != 0)
764  ABORT_NDB_ERROR(tx->getNdbError());
765 
766  // set values; key attribute needs to be set first
767  if (op->equal(model->attr_id, (Int32)i) != 0)
768  ABORT_NDB_ERROR(tx->getNdbError());
769  if (op->setValue(attr_cvar, buf) != 0)
770  ABORT_NDB_ERROR(tx->getNdbError());
771 
772  // execute the operation now if in non-batching mode
773  if (!batch)
774  executeOperations();
775  }
776  commitTransaction();
777  closeTransaction();
778 
779  // release attributes holder
780  if (buf != NULL) {
781  delete[] buf;
782  }
783 }
784 
785 void
786 CrundNdbApiOperations::getVar(const NdbDictionary::Table* table, int attr_cvar,
787  int from, int to,
788  bool batch, const char* str)
789 {
790  assert(str);
791 
792  // allocate attributes holder
793  const int count = (to - from) + 1;
794  size_t slen = strlen(str);
795  const size_t sline = (1 + slen);
796  const size_t sbuf = count * sline;
797  char* const buf = new char[sbuf];
798  //memset(buf, 1, sbuf);
799  //CDBG << "!!! buf[0]=" << toString(buf[0]) << endl;
800  //CDBG << "!!! buf[1]=" << toString(buf[1]) << endl;
801  //CDBG << "!!! buf[" << toString(slen) << "]=" << toString(buf[slen]) << endl;
802  //CDBG << "!!! buf[" << toString(slen+1) << "]=" << toString(buf[slen+1]) << endl;
803 
804  // fetch string attribute by key
805  char* s = buf;
806  beginTransaction();
807  for (int i = from; i <= to; i++, s += sline) {
808  // get a read operation for the table
809  NdbOperation* op = tx->getNdbOperation(table);
810  if (op == NULL)
811  ABORT_NDB_ERROR(tx->getNdbError());
812  if (op->readTuple(ndbOpLockMode) != 0)
813  ABORT_NDB_ERROR(tx->getNdbError());
814 
815  // set key attribute
816  if (op->equal(model->attr_id, (Int32)1) != 0)
817  ABORT_NDB_ERROR(tx->getNdbError());
818 
819  // get attributes (not readable until after commit)
820  if (op->getValue(attr_cvar, (char*)s) == NULL)
821  ABORT_NDB_ERROR(tx->getNdbError());
822 
823  // execute the operation now if in non-batching mode
824  if (!batch)
825  executeOperations();
826  }
827  commitTransaction();
828  closeTransaction();
829  assert(s == buf + sbuf);
830 
831  // copy (move) the strings to make them aligned and 0-terminated
832  s = buf;
833  for (int i = from; i <= to; i++, s += sline) {
834  //CDBG << "!!! s[0]=" << toString(s[0]) << endl;
835  //CDBG << "!!! s[1]=" << toString(s[1]) << endl;
836  //CDBG << "!!! s[" << toString(slen) << "]=" << toString(s[slen]) << endl;
837  //CDBG << "!!! s[" << toString(slen+1) << "]=" << toString(s[slen + 1]) << endl;
838 
839  const size_t n = s[0];
840  VERIFY(n < sline);
841 
842  // move and 0-terminated string
843  memmove(s, s + 1, n);
844  s[n] = 0;
845 
846  // check fetched values
847  VERIFY(strcmp(s, str) == 0);
848  }
849  assert(s == buf + sbuf);
850 
851  // release attributes holder
852  delete[] buf;
853 }
854 
855 void
856 CrundNdbApiOperations::setB0ToA(int nOps, bool batch)
857 {
858  beginTransaction();
859  for (int i = 1; i <= nOps; i++) {
860  // get an update operation for the table
861  NdbOperation* op = tx->getNdbOperation(model->table_B0);
862  if (op == NULL)
863  ABORT_NDB_ERROR(tx->getNdbError());
864  if (op->updateTuple() != 0)
865  ABORT_NDB_ERROR(tx->getNdbError());
866 
867  // set key attribute
868  if (op->equal(model->attr_id, (Int32)i) != 0)
869  ABORT_NDB_ERROR(tx->getNdbError());
870 
871  // set a_id attribute
872  int a_id = ((i - 1) % nOps) + 1;
873  if (op->setValue(model->attr_B0_a_id, (Int32)a_id) != 0)
874  ABORT_NDB_ERROR(tx->getNdbError());
875 
876  // execute the operation now if in non-batching mode
877  if (!batch)
878  executeOperations();
879  }
880  commitTransaction();
881  closeTransaction();
882 }
883 
884 void
885 CrundNdbApiOperations::nullB0ToA(int nOps, bool batch)
886 {
887  beginTransaction();
888  for (int i = 1; i <= nOps; i++) {
889  // get an update operation for the table
890  NdbOperation* op = tx->getNdbOperation(model->table_B0);
891  if (op == NULL)
892  ABORT_NDB_ERROR(tx->getNdbError());
893  if (op->updateTuple() != 0)
894  ABORT_NDB_ERROR(tx->getNdbError());
895 
896  // set key attribute
897  if (op->equal(model->attr_id, (Int32)i) != 0)
898  ABORT_NDB_ERROR(tx->getNdbError());
899 
900  // clear a_id attribute
901  if (op->setValue(model->attr_B0_a_id, (char*)NULL) != 0)
902  ABORT_NDB_ERROR(tx->getNdbError());
903 
904  // execute the operation now if in non-batching mode
905  if (!batch)
906  executeOperations();
907  }
908  commitTransaction();
909  closeTransaction();
910 }
911 
912 void
913 CrundNdbApiOperations::navB0ToA(int nOps, bool batch)
914 {
915  // allocate attributes holder
916  CommonAB* const ab = new CommonAB[nOps];
917 
918  // fetch the foreign keys from B0 and read attributes from A
919  beginTransaction();
920  CommonAB* pab = ab;
921  for (int i = 1; i <= nOps; i++, pab++) {
922  // fetch the foreign key value from B0
923  Int32 a_id;
924  {
925  // get a read operation for the table
926  NdbOperation* op = tx->getNdbOperation(model->table_B0);
927  if (op == NULL)
928  ABORT_NDB_ERROR(tx->getNdbError());
929  if (op->readTuple(ndbOpLockMode) != 0)
930  ABORT_NDB_ERROR(tx->getNdbError());
931 
932  // set key attribute
933  if (op->equal(model->attr_id, (Int32)i) != 0)
934  ABORT_NDB_ERROR(tx->getNdbError());
935 
936  // get attribute (not readable until after commit)
937  if (op->getValue(model->attr_B0_a_id, (char*)&a_id) == NULL)
938  ABORT_NDB_ERROR(tx->getNdbError());
939  }
940  executeOperations(); // execute the operation; don't commit yet
941 
942  // fetch the attributes from A
943  {
944  // get a read operation for the table
945  NdbOperation* op = tx->getNdbOperation(model->table_A);
946  if (op == NULL)
947  ABORT_NDB_ERROR(tx->getNdbError());
948  if (op->readTuple(ndbOpLockMode) != 0)
949  ABORT_NDB_ERROR(tx->getNdbError());
950 
951  // set key attribute
952  assert(a_id == ((i - 1) % nOps) + 1);
953  if (op->equal(model->attr_id, a_id) != 0)
954  ABORT_NDB_ERROR(tx->getNdbError());
955 
956  // get attributes (not readable until after commit)
957  if (op->getValue(model->attr_id, (char*)&pab->id) == NULL)
958  ABORT_NDB_ERROR(tx->getNdbError());
959  if (op->getValue(model->attr_cint, (char*)&pab->cint) == NULL)
960  ABORT_NDB_ERROR(tx->getNdbError());
961  if (op->getValue(model->attr_clong, (char*)&pab->clong) == NULL)
962  ABORT_NDB_ERROR(tx->getNdbError());
963  if (op->getValue(model->attr_cfloat, (char*)&pab->cfloat) == NULL)
964  ABORT_NDB_ERROR(tx->getNdbError());
965  if (op->getValue(model->attr_cdouble, (char*)&pab->cdouble) == NULL)
966  ABORT_NDB_ERROR(tx->getNdbError());
967  }
968 
969  // execute the operation now if in non-batching mode
970  if (!batch)
971  executeOperations();
972  }
973  commitTransaction();
974  closeTransaction();
975 
976  // check fetched values
977  pab = ab;
978  for (int i = 1; i <= nOps; i++, pab++) {
979  // check fetched values
980  Int32 id = pab->id;
981  VERIFY(id == ((i - 1) % nOps) + 1);
982 
983  Int32 j = getCommonAB(pab);
984  //CDBG << "!!! id=" << toString(id) << ", i=" << toString(i) << endl;
985  VERIFY(j == id);
986  }
987 
988  // release attributes holder
989  delete[] ab;
990 }
991 
992 void
993 CrundNdbApiOperations::navB0ToAalt(int nOps, bool batch)
994 {
995  // allocate foreign key values holder
996  Int32* const a_id = new Int32[nOps];
997 
998  // fetch the foreign key values from B0
999  beginTransaction();
1000  Int32* pa_id = a_id;
1001  for (int i = 1; i <= nOps; i++) {
1002  // get a read operation for the table
1003  NdbOperation* op = tx->getNdbOperation(model->table_B0);
1004  if (op == NULL)
1005  ABORT_NDB_ERROR(tx->getNdbError());
1006  if (op->readTuple(ndbOpLockMode) != 0)
1007  ABORT_NDB_ERROR(tx->getNdbError());
1008 
1009  // set key attribute
1010  if (op->equal(model->attr_id, (Int32)i) != 0)
1011  ABORT_NDB_ERROR(tx->getNdbError());
1012 
1013  // get attribute (not readable until after commit)
1014  if (op->getValue(model->attr_B0_a_id, (char*)pa_id++) == NULL)
1015  ABORT_NDB_ERROR(tx->getNdbError());
1016 
1017  // execute the operation now if in non-batching mode
1018  if (!batch)
1019  executeOperations();
1020  }
1021  executeOperations(); // execute the operation; don't commit yet
1022 
1023  // allocate attributes holder
1024  CommonAB* const ab = new CommonAB[nOps];
1025 
1026  // fetch rows from A
1027  pa_id = a_id;
1028  CommonAB* pab = ab;
1029  for (int i = 1; i <= nOps; i++, pa_id++, pab++) {
1030  // get a read operation for the table
1031  NdbOperation* op = tx->getNdbOperation(model->table_A);
1032  if (op == NULL)
1033  ABORT_NDB_ERROR(tx->getNdbError());
1034  if (op->readTuple(ndbOpLockMode) != 0)
1035  ABORT_NDB_ERROR(tx->getNdbError());
1036 
1037  // set key attribute
1038  assert(*pa_id == ((i - 1) % nOps) + 1);
1039  if (op->equal(model->attr_id, (Int32)*pa_id) != 0)
1040  ABORT_NDB_ERROR(tx->getNdbError());
1041 
1042  // get attributes (not readable until after commit)
1043  if (op->getValue(model->attr_id, (char*)&pab->id) == NULL)
1044  ABORT_NDB_ERROR(tx->getNdbError());
1045  if (op->getValue(model->attr_cint, (char*)&pab->cint) == NULL)
1046  ABORT_NDB_ERROR(tx->getNdbError());
1047  if (op->getValue(model->attr_clong, (char*)&pab->clong) == NULL)
1048  ABORT_NDB_ERROR(tx->getNdbError());
1049  if (op->getValue(model->attr_cfloat, (char*)&pab->cfloat) == NULL)
1050  ABORT_NDB_ERROR(tx->getNdbError());
1051  if (op->getValue(model->attr_cdouble, (char*)&pab->cdouble) == NULL)
1052  ABORT_NDB_ERROR(tx->getNdbError());
1053 
1054  // execute the operation now if in non-batching mode
1055  if (!batch)
1056  executeOperations();
1057  }
1058  commitTransaction();
1059  closeTransaction();
1060 
1061  // release foreign key values holder
1062  delete[] a_id;
1063 
1064  // check fetched values
1065  pab = ab;
1066  for (int i = 1; i <= nOps; i++, pab++) {
1067  // check fetched values
1068  Int32 id = pab->id;
1069  VERIFY(id == ((i - 1) % nOps) + 1);
1070 
1071  Int32 j = getCommonAB(pab);
1072  //CDBG << "!!! id=" << toString(id) << ", i=" << toString(i) << endl;
1073  VERIFY(j == id);
1074  }
1075 
1076  // release attributes holder
1077  delete[] ab;
1078 }
1079 
1080 void
1081 CrundNdbApiOperations::navAToB0(int nOps, bool forceSend)
1082 {
1083  // attributes holder
1084  CommonAB h;
1085 
1086  // allocate attributes holder
1087  CommonAB* const ab = new CommonAB[nOps];
1088 
1089  // fetch attributes from B0 by foreign key scan
1090  beginTransaction();
1091  CommonAB* pab = ab;
1092  for (int i = 1; i <= nOps; i++) {
1093  // get an index scan operation for the table
1095  = tx->getNdbIndexScanOperation(model->idx_B0_a_id);
1096  if (op == NULL)
1097  ABORT_NDB_ERROR(tx->getNdbError());
1098 
1099  if (op->readTuples(ndbOpLockMode) != 0)
1100  ABORT_NDB_ERROR(tx->getNdbError());
1101 
1102  // define the scan's bounds (more efficient than using a scan filter)
1103  // the argument to setBound() is not the column's attribute id
1104  // if (op->setBound(model->attr_B0_a_id, ...
1105  // or column name
1106  // if (op->setBound("a_id", ...
1107  // but the attribute id of the column *in the index*.
1108  // if (op->setBound(idx_B0_a_id->getColumn(0)->getAttrId()...
1109  // for which we introduced a shortcut.
1110  if (op->setBound(model->attr_idx_B0_a_id,
1112  ABORT_NDB_ERROR(tx->getNdbError());
1113 
1114  // get attributes (not readable until after commit)
1115  if (op->getValue(model->attr_id, (char*)&h.id) == NULL)
1116  ABORT_NDB_ERROR(tx->getNdbError());
1117  if (op->getValue(model->attr_cint, (char*)&h.cint) == NULL)
1118  ABORT_NDB_ERROR(tx->getNdbError());
1119  if (op->getValue(model->attr_clong, (char*)&h.clong) == NULL)
1120  ABORT_NDB_ERROR(tx->getNdbError());
1121  if (op->getValue(model->attr_cfloat, (char*)&h.cfloat) == NULL)
1122  ABORT_NDB_ERROR(tx->getNdbError());
1123  if (op->getValue(model->attr_cdouble, (char*)&h.cdouble) == NULL)
1124  ABORT_NDB_ERROR(tx->getNdbError());
1125 
1126  // start the scan; don't commit yet
1127  executeOperations();
1128 
1129  // read the result set executing the defined read operations
1130  int stat;
1131  const bool allowFetch = true; // request new batches when exhausted
1132  while ((stat = op->nextResult(allowFetch, forceSend)) == 0) {
1133  assert(ab <= pab && pab < ab + nOps);
1134  *pab++ = h;
1135  }
1136  if (stat != 1)
1137  ABORT_NDB_ERROR(tx->getNdbError());
1138 
1139  op->close();
1140  }
1141  commitTransaction();
1142  closeTransaction();
1143  //CDBG << "!!! pab - ab =" << toString(pab-ab) << endl;
1144  assert(pab == ab + nOps);
1145 
1146  // check fetched values
1147  // XXX this is not the most efficient way of testing...
1148  vector<CommonAB> b(ab, ab + nOps);
1149  sort(b.begin(), b.end(), compare);
1150  vector<CommonAB>::const_iterator it = b.begin();
1151  for (int i = 1; i <= nOps; i++, it++) {
1152  Int32 id = getCommonAB(&it[0]);
1153  //CDBG << "!!! id=" << toString(id) << ", i=" << toString(i) << endl;
1154  VERIFY(id == i);
1155  }
1156 
1157  // release attributes holder
1158  delete[] ab;
1159 }
1160 
1161 void
1162 CrundNdbApiOperations::navAToB0alt(int nOps, bool forceSend)
1163 {
1164  // number of operations in a multi-scan batch
1165  const int nmscans = (nOps < 256 ? nOps : 256);
1166 
1167  // attributes holder
1168  CommonAB h;
1169 
1170  // allocate attributes holder
1171  CommonAB* const ab = new CommonAB[nOps];
1172  CommonAB* pab = ab;
1173 
1174  // fetch attributes from B0 by foreign key scan
1175  beginTransaction();
1176  int a_id = 1;
1177  while (a_id <= nOps) {
1178  // allocate scan operations array
1179  NdbIndexScanOperation** const op = new NdbIndexScanOperation*[nmscans];
1180 
1181  for (int i = 0; i < nmscans; i++) {
1182  // get an index scan operation for the table
1183  op[i] = tx->getNdbIndexScanOperation(model->idx_B0_a_id);
1184  if (op[i] == NULL)
1185  ABORT_NDB_ERROR(tx->getNdbError());
1186 
1187  // XXX ? no locks (LM_CommittedRead) or shared locks (LM_Read)
1188  if (op[i]->readTuples(ndbOpLockMode) != 0)
1189  ABORT_NDB_ERROR(tx->getNdbError());
1190 
1191  // define the scan's bounds (more efficient than using a scan filter)
1192  // the argument to setBound() is not the column's attribute id
1193  // if (op[i]->setBound(model->attr_B0_a_id, ...
1194  // or column name
1195  // if (op[i]->setBound("a_id", ...
1196  // but the attribute id of the column *in the index*.
1197  // if (op[i]->setBound(idx_B0_a_id->getColumn(0)->getAttrId()...
1198  // for which we introduced a shortcut.
1199  if (op[i]->setBound(model->attr_idx_B0_a_id,
1200  NdbIndexScanOperation::BoundEQ, &a_id) != 0)
1201  ABORT_NDB_ERROR(tx->getNdbError());
1202 
1203  // get attributes (not readable until after commit)
1204  if (op[i]->getValue(model->attr_id, (char*)&h.id) == NULL)
1205  ABORT_NDB_ERROR(tx->getNdbError());
1206  if (op[i]->getValue(model->attr_cint, (char*)&h.cint) == NULL)
1207  ABORT_NDB_ERROR(tx->getNdbError());
1208  if (op[i]->getValue(model->attr_clong, (char*)&h.clong) == NULL)
1209  ABORT_NDB_ERROR(tx->getNdbError());
1210  if (op[i]->getValue(model->attr_cfloat, (char*)&h.cfloat) == NULL)
1211  ABORT_NDB_ERROR(tx->getNdbError());
1212  if (op[i]->getValue(model->attr_cdouble, (char*)&h.cdouble) == NULL)
1213  ABORT_NDB_ERROR(tx->getNdbError());
1214 
1215  // next a
1216  a_id++;
1217  }
1218  executeOperations(); // start the scans; don't commit yet
1219 
1220  // fetch attributes from B0 by foreign key scan
1221  for (int i = 0; i < nmscans; i++) {
1222  // read the result set executing the defined read operations
1223  int stat;
1224  const bool allowFetch = true; // request new batches when exhausted
1225  while ((stat = op[i]->nextResult(allowFetch, forceSend)) == 0) {
1226  assert(ab <= pab && pab < ab + nOps);
1227  *pab++ = h;
1228  }
1229  if (stat != 1)
1230  ABORT_NDB_ERROR(tx->getNdbError());
1231 
1232  op[i]->close();
1233  }
1234 
1235  // release scan operations array
1236  delete[] op;
1237  }
1238  commitTransaction();
1239  closeTransaction();
1240  //CDBG << "!!! pab - ab =" << toString(pab-ab) << endl;
1241  assert(a_id == nOps + 1);
1242  assert(pab == ab + nOps);
1243 
1244  // check fetched values
1245  // XXX this is not the most efficient way of testing...
1246  vector<CommonAB> b(ab, ab + nOps);
1247  sort(b.begin(), b.end(), compare);
1248  vector<CommonAB>::const_iterator it = b.begin();
1249  for (int i = 1; i <= nOps; i++, it++) {
1250  Int32 id = getCommonAB(&it[0]);
1251  //CDBG << "!!! id=" << toString(id) << ", i=" << toString(i) << endl;
1252  VERIFY(id == i);
1253  }
1254 
1255  // release attributes holder
1256  delete[] ab;
1257 }
1258 
1259 // ----------------------------------------------------------------------
1260