MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
NdbJTieLoad.java
1 /* -*- mode: java; c-basic-offset: 4; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=4:tabstop=4:smarttab:
3  *
4  * Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; version 2 of the License.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18  */
19 
20 package com.mysql.cluster.crund;
21 
22 import java.util.Arrays;
23 import java.nio.ByteBuffer;
24 import java.nio.ByteOrder;
25 import java.nio.IntBuffer;
26 
27 import com.mysql.ndbjtie.ndbapi.Ndb_cluster_connection;
28 import com.mysql.ndbjtie.ndbapi.Ndb;
29 import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;
30 import com.mysql.ndbjtie.ndbapi.NdbDictionary.TableConst;
31 import com.mysql.ndbjtie.ndbapi.NdbDictionary.Table;
32 import com.mysql.ndbjtie.ndbapi.NdbDictionary.ColumnConst;
33 import com.mysql.ndbjtie.ndbapi.NdbDictionary.IndexConst;
34 import com.mysql.ndbjtie.ndbapi.NdbErrorConst;
35 import com.mysql.ndbjtie.ndbapi.NdbError;
36 import com.mysql.ndbjtie.ndbapi.NdbTransaction;
37 import com.mysql.ndbjtie.ndbapi.NdbOperation;
38 import com.mysql.ndbjtie.ndbapi.NdbScanOperation;
39 import com.mysql.ndbjtie.ndbapi.NdbRecAttr;
40 /*
41 //import com.mysql.ndbjtie.ndbapi.ExecType; // ndbj-0.7.0
42 import com.mysql.ndbjtie.ndbapi.NdbTransaction.ExecType; // ndbj-0.7.1
43 import com.mysql.ndbjtie.ndbapi.NdbOperation;
44 import com.mysql.ndbjtie.ndbapi.NdbOperation.LockMode;
45 //import com.mysql.ndbjtie.ndbapi.AbortOption; // ndbj-0.7.0
46 import com.mysql.ndbjtie.ndbapi.NdbOperation.AbortOption; // ndbj-0.7.1
47 import com.mysql.ndbjtie.ndbapi.NdbIndexScanOperation;
48 import com.mysql.ndbjtie.ndbapi.NdbIndexScanOperation.BoundType;
49 */
50 
54 public class NdbJTieLoad extends NdbBase {
55 
56  // ----------------------------------------------------------------------
57  // NDB JTie resources
58  // ----------------------------------------------------------------------
59 
60  // singleton object representing the NDB cluster (one per process)
61  protected Ndb_cluster_connection mgmd;
62 
63  // object representing a connection to an NDB database
64  protected Ndb ndb;
65 
66  // the benchmark's metadata shortcuts
67  protected Model model;
68 
69  // object representing an NDB database transaction
70  protected NdbTransaction tx;
71 
72  // ----------------------------------------------------------------------
73  // NDB JTie intializers/finalizers
74  // ----------------------------------------------------------------------
75 
76  protected void initProperties() {
77  super.initProperties();
78  descr = "->ndbjtie(" + mgmdConnect + ")";
79  }
80 
81  protected void initLoad() throws Exception {
82  // XXX support generic load class
83  //super.init();
84 
85  // load native library (better diagnostics doing it explicitely)
86  out.println();
87  loadSystemLibrary("ndbclient");
88 
89  // instantiate NDB cluster singleton
90  out.print("creating cluster connection ...");
91  out.flush();
92  mgmd = Ndb_cluster_connection.create(mgmdConnect);
93  assert mgmd != null;
94  out.println(" [ok]");
95 
96  // connect to cluster management node (ndb_mgmd)
97  out.print("connecting to mgmd ...");
98  out.flush();
99  final int retries = 0; // retries (< 0 = indefinitely)
100  final int delay = 0; // seconds to wait after retry
101  final int verbose = 1; // print report of progess
102  // 0 = success, 1 = recoverable error, -1 = non-recoverable error
103  if (mgmd.connect(retries, delay, verbose) != 0) {
104  final String msg = ("mgmd@" + mgmdConnect
105  + " was not ready within "
106  + (retries * delay) + "s.");
107  out.println(msg);
108  throw new RuntimeException("!!! " + msg);
109  }
110  out.println(" [ok: " + mgmdConnect + "]");
111  }
112 
113  protected void closeLoad() throws Exception {
114  out.println();
115  out.print("closing mgmd connection ...");
116  out.flush();
117  if (mgmd != null)
118  Ndb_cluster_connection.delete(mgmd);
119  mgmd = null;
120  out.println(" [ok]");
121 
122  // XXX support generic load class
123  //super.close();
124  }
125 
126  // ----------------------------------------------------------------------
127  // NDB JTie operations
128  // ----------------------------------------------------------------------
129 
130  // returns a string representation of an NdbError
131  static protected String toStr(NdbErrorConst e) {
132  return "NdbError[" + e.code() + "]: " + e.message();
133  }
134 
135  // holds shortcuts to the benchmark's schema information
136  static protected class Model {
137  public final TableConst table_A;
138  public final TableConst table_B0;
139  public final ColumnConst column_A_id;
140  public final ColumnConst column_A_cint;
141  public final ColumnConst column_A_clong;
142  public final ColumnConst column_A_cfloat;
143  public final ColumnConst column_A_cdouble;
144  public final ColumnConst column_B0_id;
145  public final ColumnConst column_B0_cint;
146  public final ColumnConst column_B0_clong;
147  public final ColumnConst column_B0_cfloat;
148  public final ColumnConst column_B0_cdouble;
149  public final ColumnConst column_B0_a_id;
150  public final ColumnConst column_B0_cvarbinary_def;
151  public final ColumnConst column_B0_cvarchar_def;
152  public final IndexConst idx_B0_a_id;
153  public final int attr_id;
154  public final int attr_cint;
155  public final int attr_clong;
156  public final int attr_cfloat;
157  public final int attr_cdouble;
158  public final int attr_B0_a_id;
159  public final int attr_B0_cvarbinary_def;
160  public final int attr_B0_cvarchar_def;
161 
162  // initialize this instance from the dictionary
163  public Model(Ndb ndb) {
164  final Dictionary dict = ndb.getDictionary();
165 
166  // get columns of table A
167  if ((table_A = dict.getTable("a")) == null)
168  throw new RuntimeException(toStr(dict.getNdbError()));
169  if ((column_A_id = table_A.getColumn("id")) == null)
170  throw new RuntimeException(toStr(dict.getNdbError()));
171  if ((column_A_cint = table_A.getColumn("cint")) == null)
172  throw new RuntimeException(toStr(dict.getNdbError()));
173  if ((column_A_clong = table_A.getColumn("clong")) == null)
174  throw new RuntimeException(toStr(dict.getNdbError()));
175  if ((column_A_cfloat = table_A.getColumn("cfloat")) == null)
176  throw new RuntimeException(toStr(dict.getNdbError()));
177  if ((column_A_cdouble = table_A.getColumn("cdouble")) == null)
178  throw new RuntimeException(toStr(dict.getNdbError()));
179 
180  // get columns of table B0
181  if ((table_B0 = dict.getTable("b0")) == null)
182  throw new RuntimeException(toStr(dict.getNdbError()));
183  if ((column_B0_id = table_B0.getColumn("id")) == null)
184  throw new RuntimeException(toStr(dict.getNdbError()));
185  if ((column_B0_cint = table_B0.getColumn("cint")) == null)
186  throw new RuntimeException(toStr(dict.getNdbError()));
187  if ((column_B0_clong = table_B0.getColumn("clong")) == null)
188  throw new RuntimeException(toStr(dict.getNdbError()));
189  if ((column_B0_cfloat = table_B0.getColumn("cfloat")) == null)
190  throw new RuntimeException(toStr(dict.getNdbError()));
191  if ((column_B0_cdouble = table_B0.getColumn("cdouble")) == null)
192  throw new RuntimeException(toStr(dict.getNdbError()));
193  if ((column_B0_a_id = table_B0.getColumn("a_id")) == null)
194  throw new RuntimeException(toStr(dict.getNdbError()));
195  if ((column_B0_cvarbinary_def = table_B0.getColumn("cvarbinary_def")) == null)
196  throw new RuntimeException(toStr(dict.getNdbError()));
197  if ((column_B0_cvarchar_def = table_B0.getColumn("cvarchar_def")) == null)
198  throw new RuntimeException(toStr(dict.getNdbError()));
199 
200  // get indexes of table B0
201  if ((idx_B0_a_id = dict.getIndex("I_B0_FK", "b0")) == null)
202  throw new RuntimeException(toStr(dict.getNdbError()));
203 
204  // get common attribute ids for tables A, B0
205  attr_id = column_A_id.getColumnNo();
206  if (attr_id != column_B0_id.getColumnNo())
207  throw new RuntimeException("attribute id mismatch");
208  attr_cint = column_A_cint.getColumnNo();
209  if (attr_cint != column_B0_cint.getColumnNo())
210  throw new RuntimeException("attribute id mismatch");
211  attr_clong = column_A_clong.getColumnNo();
212  if (attr_clong != column_B0_clong.getColumnNo())
213  throw new RuntimeException("attribute id mismatch");
214  attr_cfloat = column_A_cfloat.getColumnNo();
215  if (attr_cfloat != column_B0_cfloat.getColumnNo())
216  throw new RuntimeException("attribute id mismatch");
217  attr_cdouble = column_A_cdouble.getColumnNo();
218  if (attr_cdouble != column_B0_cdouble.getColumnNo())
219  throw new RuntimeException("attribute id mismatch");
220 
221  // get attribute ids for table B0
222  attr_B0_a_id = column_B0_a_id.getColumnNo();
223  attr_B0_cvarbinary_def = column_B0_cvarbinary_def.getColumnNo();
224  attr_B0_cvarchar_def = column_B0_cvarchar_def.getColumnNo();
225  }
226  };
227 
228  protected void initOperations() {
229  out.print("initializing operations ...");
230  out.flush();
231 
232  //out.println("default charset: "
233  // + java.nio.charset.Charset.defaultCharset().displayName());
234 
235  for (boolean f = false, done = false; !done; done = f, f = true) {
236  // inner classes can only refer to a constant
237  final boolean batch = f;
238  final boolean forceSend = f;
239  final boolean setAttrs = true;
240 
241  ops.add(
242  new Op("insA" + (batch ? "_batch" : "")) {
243  public void run(int nOps) {
244  ins(model.table_A, 1, nOps, !setAttrs, batch);
245  }
246  });
247 
248  ops.add(
249  new Op("insB0" + (batch ? "_batch" : "")) {
250  public void run(int nOps) {
251  ins(model.table_B0, 1, nOps, !setAttrs, batch);
252  }
253  });
254 
255  ops.add(
256  new Op("setAByPK" + (batch ? "_batch" : "")) {
257  public void run(int nOps) {
258  setByPK(model.table_A, 1, nOps, batch);
259  }
260  });
261 
262  ops.add(
263  new Op("setB0ByPK" + (batch ? "_batch" : "")) {
264  public void run(int nOps) {
265  setByPK(model.table_B0, 1, nOps, batch);
266  }
267  });
268 
269  ops.add(
270  new Op("getAByPK_bb" + (batch ? "_batch" : "")) {
271  public void run(int nOps) {
272  getByPK_bb(model.table_A, 1, nOps, batch);
273  }
274  });
275 
276  ops.add(
277  new Op("getAByPK_ar" + (batch ? "_batch" : "")) {
278  public void run(int nOps) {
279  getByPK_ar(model.table_A, 1, nOps, batch);
280  }
281  });
282 
283  ops.add(
284  new Op("getB0ByPK_bb" + (batch ? "_batch" : "")) {
285  public void run(int nOps) {
286  getByPK_bb(model.table_B0, 1, nOps, batch);
287  }
288  });
289 
290  ops.add(
291  new Op("getB0ByPK_ar" + (batch ? "_batch" : "")) {
292  public void run(int nOps) {
293  getByPK_ar(model.table_B0, 1, nOps, batch);
294  }
295  });
296 
297  for (int i = 0, l = 1; l <= maxVarbinaryBytes; l *= 10, i++) {
298  final byte[] b = bytes[i];
299  assert l == b.length;
300 
301  ops.add(
302  new Op("setVarbinary" + l + (batch ? "_batch" : "")) {
303  public void run(int nOps) {
304  setVarbinary(model.table_B0, 1, nOps, batch, b);
305  }
306  });
307 
308  ops.add(
309  new Op("getVarbinary" + l + (batch ? "_batch" : "")) {
310  public void run(int nOps) {
311  getVarbinary(model.table_B0, 1, nOps, batch, b);
312  }
313  });
314 
315  ops.add(
316  new Op("clearVarbinary" + l + (batch ? "_batch" : "")) {
317  public void run(int nOps) {
318  setVarbinary(model.table_B0, 1, nOps, batch, null);
319  }
320  });
321  }
322 
323  for (int i = 0, l = 1; l <= maxVarcharChars; l *= 10, i++) {
324  final String s = strings[i];
325  assert l == s.length();
326 
327  ops.add(
328  new Op("setVarchar" + l + (batch ? "_batch" : "")) {
329  public void run(int nOps) {
330  setVarchar(model.table_B0, 1, nOps, batch, s);
331  }
332  });
333 
334  ops.add(
335  new Op("getVarchar" + l + (batch ? "_batch" : "")) {
336  public void run(int nOps) {
337  getVarchar(model.table_B0, 1, nOps, batch, s);
338  }
339  });
340 
341  ops.add(
342  new Op("clearVarchar" + l + (batch ? "_batch" : "")) {
343  public void run(int nOps) {
344  setVarchar(model.table_B0, 1, nOps, batch, null);
345  }
346  });
347  }
348 
349  ops.add(
350  new Op("setB0->A" + (batch ? "_batch" : "")) {
351  public void run(int nOps) {
352  setB0ToA(nOps, batch);
353  }
354  });
355 
356  ops.add(
357  new Op("navB0->A" + (batch ? "_batch" : "")) {
358  public void run(int nOps) {
359  navB0ToA(nOps, batch);
360  }
361  });
362 
363  ops.add(
364  new Op("navB0->A_alt" + (batch ? "_batch" : "")) {
365  public void run(int nOps) {
366  navB0ToAalt(nOps, batch);
367  }
368  });
369 
370  ops.add(
371  new Op("navA->B0" + (forceSend ? "_forceSend" : "")) {
372  public void run(int nOps) {
373  navAToB0(nOps, forceSend);
374  }
375  });
376 
377  ops.add(
378  new Op("navA->B0_alt" + (forceSend ? "_forceSend" : "")) {
379  public void run(int nOps) {
380  navAToB0alt(nOps, forceSend);
381  }
382  });
383 
384  ops.add(
385  new Op("nullB0->A" + (batch ? "_batch" : "")) {
386  public void run(int nOps) {
387  nullB0ToA(nOps, batch);
388  }
389  });
390 
391  ops.add(
392  new Op("delB0ByPK" + (batch ? "_batch" : "")) {
393  public void run(int nOps) {
394  delByPK(model.table_B0, 1, nOps, batch);
395  }
396  });
397 
398  ops.add(
399  new Op("delAByPK" + (batch ? "_batch" : "")) {
400  public void run(int nOps) {
401  delByPK(model.table_A, 1, nOps, batch);
402  }
403  });
404 
405  ops.add(
406  new Op("insA_attr" + (batch ? "_batch" : "")) {
407  public void run(int nOps) {
408  ins(model.table_A, 1, nOps, setAttrs, batch);
409  }
410  });
411 
412  ops.add(
413  new Op("insB0_attr" + (batch ? "_batch" : "")) {
414  public void run(int nOps) {
415  ins(model.table_B0, 1, nOps, setAttrs, batch);
416  }
417  });
418 
419  ops.add(
420  new Op("delAllB0" + (batch ? "_batch" : "")) {
421  public void run(int nOps) {
422  final int count = delByScan(model.table_B0, batch);
423  assert count == nOps;
424  }
425  });
426 
427  ops.add(
428  new Op("delAllA" + (batch ? "_batch" : "")) {
429  public void run(int nOps) {
430  final int count = delByScan(model.table_A, batch);
431  assert count == nOps;
432  }
433  });
434  }
435 
436  out.println(" [Op: " + ops.size() + "]");
437  }
438 
439  protected void closeOperations() {
440  out.print("closing operations ...");
441  out.flush();
442  ops.clear();
443  out.println(" [ok]");
444  }
445 
446  protected void beginTransaction() {
447  // start a transaction
448  // must be closed with NdbTransaction.close
449  final TableConst table = null;
450  final ByteBuffer keyData = null;
451  final int keyLen = 0;
452  if ((tx = ndb.startTransaction(table, keyData, keyLen)) == null)
453  throw new RuntimeException(toStr(ndb.getNdbError()));
454  }
455 
456  protected void executeOperations() {
457  // execute but don't commit the current transaction
458  final int execType = NdbTransaction.ExecType.NoCommit;
459  final int abortOption = NdbOperation.AbortOption.AbortOnError;
460  final int force = 0;
461  if (tx.execute(execType, abortOption, force) != 0
462  || tx.getNdbError().status() != NdbError.Status.Success)
463  throw new RuntimeException(toStr(tx.getNdbError()));
464  }
465 
466  protected void commitTransaction() {
467  // commit the current transaction
468  final int execType = NdbTransaction.ExecType.Commit;
469  final int abortOption = NdbOperation.AbortOption.AbortOnError;
470  final int force = 0;
471  if (tx.execute(execType, abortOption, force) != 0
472  || tx.getNdbError().status() != NdbError.Status.Success)
473  throw new RuntimeException(toStr(tx.getNdbError()));
474  }
475 
476  protected void rollbackTransaction() {
477  // abort the current transaction
478  final int execType = NdbTransaction.ExecType.Rollback;
479  final int abortOption = NdbOperation.AbortOption.DefaultAbortOption;
480  final int force = 0;
481  if (tx.execute(execType, abortOption, force) != 0
482  || tx.getNdbError().status() != NdbError.Status.Success)
483  throw new RuntimeException(toStr(tx.getNdbError()));
484  }
485 
486  protected void closeTransaction() {
487  // close the current transaction
488  // to be called irrespectively of success or failure
489  // equivalent to tx.close()
490  ndb.closeTransaction(tx);
491  tx = null;
492  }
493 
494  // ----------------------------------------------------------------------
495 
496  static protected class CommonAB_RA {
497  public NdbRecAttr id;
498  public NdbRecAttr cint;
499  public NdbRecAttr clong;
500  public NdbRecAttr cfloat;
501  public NdbRecAttr cdouble;
502  };
503 
504  protected void fetchCommonAttributes(CommonAB_RA cab, NdbOperation op) {
505  final ByteBuffer val = null;
506  if ((cab.id = op.getValue(model.attr_id, val)) == null)
507  throw new RuntimeException(toStr(tx.getNdbError()));
508  if ((cab.cint = op.getValue(model.attr_cint, val)) == null)
509  throw new RuntimeException(toStr(tx.getNdbError()));
510  if ((cab.clong = op.getValue(model.attr_clong, val)) == null)
511  throw new RuntimeException(toStr(tx.getNdbError()));
512  if ((cab.cfloat = op.getValue(model.attr_cfloat, val)) == null)
513  throw new RuntimeException(toStr(tx.getNdbError()));
514  if ((cab.cdouble = op.getValue(model.attr_cdouble, val)) == null)
515  throw new RuntimeException(toStr(tx.getNdbError()));
516  }
517 
518  protected int verifyCommonAttributes(CommonAB_RA cab) {
519  final int id = cab.id.int32_value();
520  final int cint = cab.cint.int32_value();
521  verify(id == cint);
522  final long clong = cab.clong.int64_value();
523  verify(clong == cint);
524  final float cfloat = cab.cfloat.float_value();
525  verify(cfloat == cint);
526  final double cdouble = cab.cdouble.double_value();
527  verify(cdouble == cint);
528  return cint;
529  }
530 
531  protected void ins(TableConst table, int from, int to,
532  boolean setAttrs, boolean batch) {
533  beginTransaction();
534  for (int i = from; i <= to; i++) {
535  // get an insert operation for the table
536  NdbOperation op = tx.getNdbOperation(table);
537  if (op == null)
538  throw new RuntimeException(toStr(tx.getNdbError()));
539  if (op.insertTuple() != 0)
540  throw new RuntimeException(toStr(tx.getNdbError()));
541 
542  // set values; key attribute needs to be set first
543  if (op.equal(model.attr_id, i) != 0)
544  throw new RuntimeException(toStr(tx.getNdbError()));
545  if (setAttrs) {
546  if (op.setValue(model.attr_cint, -i) != 0)
547  throw new RuntimeException(toStr(tx.getNdbError()));
548  if (op.setValue(model.attr_clong, (long)-i) != 0)
549  throw new RuntimeException(toStr(tx.getNdbError()));
550  if (op.setValue(model.attr_cfloat, (float)-i) != 0)
551  throw new RuntimeException(toStr(tx.getNdbError()));
552  if (op.setValue(model.attr_cdouble, (double)-i) != 0)
553  throw new RuntimeException(toStr(tx.getNdbError()));
554  }
555 
556  // execute the operation now if in non-batching mode
557  if (!batch)
558  executeOperations();
559  }
560  commitTransaction();
561  closeTransaction();
562  }
563 
564  protected void delByPK(TableConst table, int from, int to,
565  boolean batch) {
566  beginTransaction();
567  for (int i = from; i <= to; i++) {
568  // get a delete operation for the table
569  NdbOperation op = tx.getNdbOperation(table);
570  if (op == null)
571  throw new RuntimeException(toStr(tx.getNdbError()));
572  if (op.deleteTuple() != 0)
573  throw new RuntimeException(toStr(tx.getNdbError()));
574 
575  // set key attribute
576  if (op.equal(model.attr_id, i) != 0)
577  throw new RuntimeException(toStr(tx.getNdbError()));
578 
579  // execute the operation now if in non-batching mode
580  if (!batch)
581  executeOperations();
582  }
583  commitTransaction();
584  closeTransaction();
585  }
586 
587  protected int delByScan(TableConst table, boolean batch) {
588  beginTransaction();
589 
590  // get a full table scan operation (no scan filter defined)
591  final NdbScanOperation op = tx.getNdbScanOperation(table);
592  if (op == null)
593  throw new RuntimeException(toStr(tx.getNdbError()));
594 
595  // define a read scan with exclusive locks
596  final int lock_mode = NdbOperation.LockMode.LM_Exclusive;
597  final int scan_flags = 0;
598  final int parallel = 0;
599  final int batch_ = 0;
600  if (op.readTuples(lock_mode, scan_flags, parallel, batch_) != 0)
601  throw new RuntimeException(toStr(tx.getNdbError()));
602 
603  // start the scan; don't commit yet
604  executeOperations();
605 
606  // delete all rows in a given scan
607  int count = 0;
608  int stat;
609  final boolean allowFetch = true; // request new batches when exhausted
610  final boolean forceSend = false; // send may be delayed
611  while ((stat = op.nextResult(allowFetch, forceSend)) == 0) {
612  // delete all tuples within a batch
613  do {
614  if (op.deleteCurrentTuple() != 0)
615  throw new RuntimeException(toStr(tx.getNdbError()));
616  count++;
617 
618  // execute the operation now if in non-batching mode
619  if (!batch)
620  executeOperations();
621  } while ((stat = op.nextResult(!allowFetch, forceSend)) == 0);
622 
623  if (stat == 1) {
624  // no more batches
625  break;
626  }
627  if (stat == 2) {
628  // end of current batch, fetch next
629  final int execType = NdbTransaction.ExecType.NoCommit;
630  final int abortOption = NdbOperation.AbortOption.AbortOnError;
631  final int force = 0;
632  if (tx.execute(execType, abortOption, force) != 0
633  || tx.getNdbError().status() != NdbError.Status.Success)
634  throw new RuntimeException(toStr(tx.getNdbError()));
635  continue;
636  }
637  throw new RuntimeException("stat == " + stat);
638  }
639  if (stat != 1)
640  throw new RuntimeException("stat == " + stat);
641 
642  // close the scan
643  final boolean forceSend_ = false;
644  final boolean releaseOp = false;
645  op.close(forceSend_, releaseOp);
646 
647  commitTransaction();
648  closeTransaction();
649  return count;
650  }
651 
652  protected void setByPK(TableConst table, int from, int to,
653  boolean batch) {
654  beginTransaction();
655  for (int i = from; i <= to; i++) {
656  // get an update operation for the table
657  NdbOperation op = tx.getNdbOperation(table);
658  if (op == null)
659  throw new RuntimeException(toStr(tx.getNdbError()));
660  if (op.updateTuple() != 0)
661  throw new RuntimeException(toStr(tx.getNdbError()));
662 
663  // set key attribute
664  if (op.equal(model.attr_id, i) != 0)
665  throw new RuntimeException(toStr(tx.getNdbError()));
666 
667  // set values
668  if (op.setValue(model.attr_cint, i) != 0)
669  throw new RuntimeException(toStr(tx.getNdbError()));
670  if (op.setValue(model.attr_clong, (long)i) != 0)
671  throw new RuntimeException(toStr(tx.getNdbError()));
672  if (op.setValue(model.attr_cfloat, (float)i) != 0)
673  throw new RuntimeException(toStr(tx.getNdbError()));
674  if (op.setValue(model.attr_cdouble, (double)i) != 0)
675  throw new RuntimeException(toStr(tx.getNdbError()));
676 
677  // execute the operation now if in non-batching mode
678  if (!batch)
679  executeOperations();
680  }
681  commitTransaction();
682  closeTransaction();
683  }
684 
685  protected void fetchCommonAttributes(ByteBuffer cab, NdbOperation op) {
686  if (op.getValue(model.attr_id, cab) == null)
687  throw new RuntimeException(toStr(tx.getNdbError()));
688  int p = cab.position();
689  //out.println("cab.position() == " + p);
690  cab.position(p += 4);
691  if (op.getValue(model.attr_cint, cab) == null)
692  throw new RuntimeException(toStr(tx.getNdbError()));
693  cab.position(p += 4);
694  if (op.getValue(model.attr_clong, cab) == null)
695  throw new RuntimeException(toStr(tx.getNdbError()));
696  cab.position(p += 8);
697  if (op.getValue(model.attr_cfloat, cab) == null)
698  throw new RuntimeException(toStr(tx.getNdbError()));
699  cab.position(p += 4);
700  if (op.getValue(model.attr_cdouble, cab) == null)
701  throw new RuntimeException(toStr(tx.getNdbError()));
702  cab.position(p += 8);
703  }
704 
705  protected int verifyCommonAttributes(ByteBuffer cab) {
706  final int id = cab.getInt();
707  final int cint = cab.getInt();
708  final long clong = cab.getLong();
709  final float cfloat = cab.getFloat();
710  final double cdouble = cab.getDouble();
711 
712  if (false) {
713  out.println("id == " + id);
714  out.println("cint == " + cint);
715  out.println("clong == " + clong);
716  out.println("cfloat == " + cfloat);
717  out.println("cdouble == " + cdouble);
718  }
719  if (false) {
720  verify(cint == id);
721  verify(clong == cint);
722  verify(cfloat == cint);
723  verify(cdouble == cint);
724  }
725  return cint;
726  }
727 
728  protected void getByPK_bb(TableConst table, int from, int to,
729  boolean batch) {
730  // operation results
731  final int count = (to - from) + 1;
732  final ByteBuffer cab = ByteBuffer.allocateDirect(count * 28);
733  cab.order(ByteOrder.nativeOrder());
734 
735  beginTransaction();
736  for (int i = 0, j = from; i < count; i++, j++) {
737  // get a read operation for the table
738  NdbOperation op = tx.getNdbOperation(table);
739  if (op == null)
740  throw new RuntimeException(toStr(tx.getNdbError()));
741  if (op.readTuple(NdbOperation.LockMode.LM_CommittedRead) != 0)
742  throw new RuntimeException(toStr(tx.getNdbError()));
743 
744  // set key attribute
745  if (op.equal(model.attr_id, j) != 0)
746  throw new RuntimeException(toStr(tx.getNdbError()));
747 
748  // get attributes (not readable until after commit)
749  fetchCommonAttributes(cab, op);
750 
751  // execute the operation now if in non-batching mode
752  if (!batch)
753  executeOperations();
754  }
755  commitTransaction();
756 
757  // check fetched values
758  cab.rewind();
759 
760  for (int i = 0, j = from; i < count; i++, j++) {
761  // check other attributes
762  final int id1 = verifyCommonAttributes(cab);
763  verify(id1 == j);
764  }
765  closeTransaction();
766  }
767 
768  protected void getByPK_ar(TableConst table, int from, int to,
769  boolean batch) {
770  // operation results
771  final int count = (to - from) + 1;
772  final CommonAB_RA[] cab_ra = new CommonAB_RA[count];
773 
774  beginTransaction();
775  for (int i = 0, j = from; i < count; i++, j++) {
776  // get a read operation for the table
777  NdbOperation op = tx.getNdbOperation(table);
778  if (op == null)
779  throw new RuntimeException(toStr(tx.getNdbError()));
780  if (op.readTuple(NdbOperation.LockMode.LM_CommittedRead) != 0)
781  throw new RuntimeException(toStr(tx.getNdbError()));
782 
783  // set key attribute
784  if (op.equal(model.attr_id, j) != 0)
785  throw new RuntimeException(toStr(tx.getNdbError()));
786 
787  // get attributes (not readable until after commit)
788  final CommonAB_RA c = new CommonAB_RA();
789  //if ((c.id = op.getValue(model.attr_id, null)) == null)
790  // throw new RuntimeException(toStr(tx.getNdbError()));
791  fetchCommonAttributes(c, op);
792  cab_ra[i] = c;
793 
794  // execute the operation now if in non-batching mode
795  if (!batch)
796  executeOperations();
797  }
798  commitTransaction();
799 
800  // check fetched values
801  for (int i = 0, j = from; i < count; i++, j++) {
802  //check key attribute
803  verify(cab_ra[i].id.int32_value() == j);
804 
805  // check other attributes
806  final int id1 = verifyCommonAttributes(cab_ra[i]);
807  verify(id1 == j);
808  }
809  closeTransaction();
810  }
811 
812  protected void setVarbinary(TableConst table, int from, int to,
813  boolean batch, byte[] bytes) {
814 // XXX Buffer overflow, need to debug
815 /*
816  final ByteBuffer buf;
817  if (bytes == null) {
818  buf = null;
819  } else {
820  final int slen = bytes.length;
821  // XXX assumes column declared as VARBINARY/CHAR(<255)
822  final int sbuf = 1 + slen;
823  // XXX buffer overflow if slen >255!!!
824  assert (slen < 255);
825  buf = ByteBuffer.allocateDirect(slen);
826  //buf.order(ByteOrder.nativeOrder());
827  buf.put((byte)slen);
828  buf.put(bytes, 0, slen);
829  buf.flip();
830  }
831 
832  beginTransaction();
833  for (int i = from; i <= to; i++) {
834  // get an insert operation for the table
835  NdbOperation op = tx.getNdbOperation(table);
836  if (op == null)
837  throw new RuntimeException(toStr(tx.getNdbError()));
838  if (op.updateTuple() != 0)
839  throw new RuntimeException(toStr(tx.getNdbError()));
840 
841  // set key attribute
842  if (op.equal(model.attr_id, i) != 0)
843  throw new RuntimeException(toStr(tx.getNdbError()));
844 
845  // set values
846  if (op.setValue(model.attr_B0_cvarbinary_def, buf) != 0)
847  throw new RuntimeException(toStr(tx.getNdbError()));
848 
849  // execute the operation now if in non-batching mode
850  if (!batch)
851  executeOperations();
852  }
853  commitTransaction();
854  closeTransaction();
855 */
856  }
857 
858  protected void setVarchar(TableConst table, int from, int to,
859  boolean batch, String string) {
860 // XXX not implemented yet
861  }
862 
863  protected void getVarbinary(TableConst table, int from, int to,
864  boolean batch, byte[] bytes) {
865 // XXX not implemented yet
866  }
867 
868  protected void getVarchar(TableConst table, int from, int to,
869  boolean batch, String string) {
870 // XXX not implemented yet
871  }
872 
873  protected void setB0ToA(int nOps, boolean batch) {
874 // XXX not implemented yet
875  }
876 
877  protected void nullB0ToA(int nOps, boolean batch) {
878 // XXX not implemented yet
879  }
880 
881  protected void navB0ToA(int nOps, boolean batch) {
882 // XXX not implemented yet
883  }
884 
885  protected void navB0ToAalt(int nOps, boolean batch) {
886 // XXX not implemented yet
887  }
888 
889  protected void navAToB0(int nOps, boolean forceSend) {
890 // XXX not implemented yet
891  }
892 
893  protected void navAToB0alt(int nOps, boolean forceSend) {
894 // XXX not implemented yet
895  }
896 
897  // ----------------------------------------------------------------------
898  // NDB JTie datastore operations
899  // ----------------------------------------------------------------------
900 
901  protected void initConnection() {
902  out.println();
903 
904  // optionally, connect and wait for reaching the data nodes (ndbds)
905  out.print("waiting for ndbd ...");
906  out.flush();
907  final int initial_wait = 10; // secs to wait until first node detected
908  final int final_wait = 0; // secs to wait after first node detected
909  // returns: 0 all nodes live, > 0 at least one node live, < 0 error
910  if (mgmd.wait_until_ready(initial_wait, final_wait) < 0) {
911  final String msg = ("data nodes were not ready within "
912  + (initial_wait + final_wait) + "s.");
913  out.println(msg);
914  throw new RuntimeException(msg);
915  }
916  out.println(" [ok]");
917 
918  // connect to database
919  out.print("connecting to ndbd ...");
920  out.flush();
921  ndb = Ndb.create(mgmd, catalog, schema);
922  final int max_no_tx = 10; // maximum number of parallel tx (<=1024)
923  // note each scan or index scan operation uses one extra transaction
924  if (ndb.init(max_no_tx) != 0) {
925  String msg = "Error caught: " + ndb.getNdbError().message();
926  throw new RuntimeException(msg);
927  }
928  out.println(" [ok]");
929 
930  // initialize the schema shortcuts
931  model = new Model(ndb);
932  }
933 
934  protected void closeConnection() {
935  out.println();
936  out.print("closing ndbd connection ...");
937  out.flush();
938  model = null;
939  Ndb.delete(ndb);
940  ndb = null;
941  out.println(" [ok]");
942  }
943 
944  protected void clearData() {
945  out.print("deleting all rows ...");
946  out.flush();
947  final int delB0 = delByScan(model.table_B0, true);
948  out.print(" [B0: " + delB0);
949  out.flush();
950  final int delA = delByScan(model.table_A, true);
951  out.print(", A: " + delA);
952  out.flush();
953  out.println("]");
954  }
955 
956  // ----------------------------------------------------------------------
957 
958  static public void main(String[] args) {
959  System.out.println("NdbJTieLoad.main()");
960  parseArguments(args);
961  new NdbJTieLoad().run();
962  System.out.println();
963  System.out.println("NdbJTieLoad.main(): done.");
964  }
965 }