MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
NdbjLoad.java
1 /* -*- mode: java; c-basic-offset: 4; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=4:tabstop=4:smarttab:
3  *
4  * Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; version 2 of the License.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18  */
19 
20 package com.mysql.cluster.crund;
21 
22 import java.util.Arrays;
23 
24 //import com.mysql.cluster.ndbj.*;
25 import com.mysql.cluster.ndbj.NdbClusterConnection;
26 import com.mysql.cluster.ndbj.Ndb;
27 import com.mysql.cluster.ndbj.NdbDictionary;
28 import com.mysql.cluster.ndbj.NdbTable;
29 import com.mysql.cluster.ndbj.NdbColumn;
30 import com.mysql.cluster.ndbj.NdbIndex;
31 import com.mysql.cluster.ndbj.NdbError;
32 import com.mysql.cluster.ndbj.NdbApiException;
33 import com.mysql.cluster.ndbj.NdbTransaction;
34 //import com.mysql.cluster.ndbj.ExecType; // ndbj-0.7.0
35 import com.mysql.cluster.ndbj.NdbTransaction.ExecType; // ndbj-0.7.1
36 import com.mysql.cluster.ndbj.NdbOperation;
37 import com.mysql.cluster.ndbj.NdbOperation.LockMode;
38 //import com.mysql.cluster.ndbj.AbortOption; // ndbj-0.7.0
39 import com.mysql.cluster.ndbj.NdbOperation.AbortOption; // ndbj-0.7.1
40 import com.mysql.cluster.ndbj.NdbScanOperation;
41 import com.mysql.cluster.ndbj.NdbIndexScanOperation;
42 import com.mysql.cluster.ndbj.NdbIndexScanOperation.BoundType;
43 import com.mysql.cluster.ndbj.NdbResultSet;
44 
45 
49 public class NdbjLoad extends NdbBase {
50 
51  // ----------------------------------------------------------------------
52  // NDBJ resources
53  // ----------------------------------------------------------------------
54 
55  // singleton object representing the NDB cluster (one per process); created by initLoad(), closed and reset to null by closeLoad()
56  protected NdbClusterConnection mgmd;
57 
58  // object representing a connection to an NDB database; NOTE(review): not assigned in the part of the file visible here -- confirm where it is initialized
59  protected Ndb ndb;
60 
61  // the benchmark's metadata shortcuts (tables, columns, attribute ids and names); NOTE(review): not assigned in the part of the file visible here -- confirm where it is initialized
62  protected Model model;
63 
64  // object representing an NDB database transaction; set by beginTransaction(), closed and reset to null by closeTransaction()
65  protected NdbTransaction tx;
66 
67  // ----------------------------------------------------------------------
68  // NDBJ initializers/finalizers
69  // ----------------------------------------------------------------------
70 
71  protected void initProperties() {
72  super.initProperties();
73  descr = "->ndbj->ndbapi(" + mgmdConnect + ")";
74  }
75 
76  protected void initLoad() throws Exception {
77  // XXX support generic load class
78  //super.init();
79 
80  // load native library (better diagnostics doing it explicitely)
81  out.println();
82  loadSystemLibrary("ndbj");
83 
84  // instantiate NDB cluster singleton
85  out.println();
86  out.print("creating cluster conn...");
87  out.flush();
88  mgmd = NdbClusterConnection.create(mgmdConnect);
89  assert mgmd != null;
90  out.println(" [ok]");
91 
92  // connect to cluster management node (ndb_mgmd)
93  out.print("connecting to mgmd ...");
94  out.flush();
95  final int retries = 0; // retries (< 0 = indefinitely)
96  final int delay = 0; // seconds to wait after retry
97  final boolean verbose = true; // print report of progess
98  // 0 = success, 1 = recoverable error, -1 = non-recoverable error
99  if (mgmd.connect(retries, delay, verbose) != 0) {
100  final String msg = ("mgmd@" + mgmdConnect
101  + " was not ready within "
102  + (retries * delay) + "s.");
103  out.println(msg);
104  throw new RuntimeException("!!! " + msg);
105  }
106  out.println(" [ok: " + mgmdConnect + "]");
107  }
108 
109  protected void closeLoad() throws Exception {
110  out.println();
111  out.print("closing mgmd connection ...");
112  out.flush();
113  if (mgmd != null)
114  mgmd.close();
115  mgmd = null;
116  out.println(" [ok]");
117 
118  // XXX support generic load class
119  //super.close();
120  }
121 
122  // ----------------------------------------------------------------------
123  // NDBJ operations
124  // ----------------------------------------------------------------------
125 
126  // returns a string representation of an NdbError
127  static protected String toStr(NdbError e) {
128  return "NdbError[" + e.getCode() + "]: " + e.getMessage();
129  }
130 
131  // holds shortcuts to the benchmark's schema information
132  static protected class Model {
133  public final NdbTable table_A;
134  public final NdbTable table_B0;
135  public final NdbColumn column_A_id;
136  public final NdbColumn column_A_cint;
137  public final NdbColumn column_A_clong;
138  public final NdbColumn column_A_cfloat;
139  public final NdbColumn column_A_cdouble;
140  public final NdbColumn column_B0_id;
141  public final NdbColumn column_B0_cint;
142  public final NdbColumn column_B0_clong;
143  public final NdbColumn column_B0_cfloat;
144  public final NdbColumn column_B0_cdouble;
145  public final NdbColumn column_B0_a_id;
146  public final NdbColumn column_B0_cvarbinary_def;
147  public final NdbColumn column_B0_cvarchar_def;
148  public final NdbIndex idx_B0_a_id;
149  public final int attr_id;
150  public final int attr_cint;
151  public final int attr_clong;
152  public final int attr_cfloat;
153  public final int attr_cdouble;
154  public final int attr_B0_a_id;
155  public final int attr_B0_cvarbinary_def;
156  public final int attr_B0_cvarchar_def;
157 
158  // XXX need names due to broken
159  // NdbOperation.getValue(int), NdbResultSet.getXXX(int)
160  public final String name_id;
161  public final String name_cint;
162  public final String name_clong;
163  public final String name_cfloat;
164  public final String name_cdouble;
165  public final String name_B0_a_id;
166  public final String name_B0_cvarbinary_def;
167  public final String name_B0_cvarchar_def;
168 
169  // initialize this instance from the dictionary
170  public Model(Ndb ndb) throws NdbApiException {
171  final NdbDictionary dict = ndb.getDictionary();
172 
173  // get columns of table A
174  if ((table_A = dict.getTable("a")) == null)
175  throw new RuntimeException(toStr(dict.getNdbError()));
176  if ((column_A_id = table_A.getColumn("id")) == null)
177  throw new RuntimeException(toStr(dict.getNdbError()));
178  if ((column_A_cint = table_A.getColumn("cint")) == null)
179  throw new RuntimeException(toStr(dict.getNdbError()));
180  if ((column_A_clong = table_A.getColumn("clong")) == null)
181  throw new RuntimeException(toStr(dict.getNdbError()));
182  if ((column_A_cfloat = table_A.getColumn("cfloat")) == null)
183  throw new RuntimeException(toStr(dict.getNdbError()));
184  if ((column_A_cdouble = table_A.getColumn("cdouble")) == null)
185  throw new RuntimeException(toStr(dict.getNdbError()));
186 
187  // get columns of table B0
188  if ((table_B0 = dict.getTable("b0")) == null)
189  throw new RuntimeException(toStr(dict.getNdbError()));
190  if ((column_B0_id = table_B0.getColumn("id")) == null)
191  throw new RuntimeException(toStr(dict.getNdbError()));
192  if ((column_B0_cint = table_B0.getColumn("cint")) == null)
193  throw new RuntimeException(toStr(dict.getNdbError()));
194  if ((column_B0_clong = table_B0.getColumn("clong")) == null)
195  throw new RuntimeException(toStr(dict.getNdbError()));
196  if ((column_B0_cfloat = table_B0.getColumn("cfloat")) == null)
197  throw new RuntimeException(toStr(dict.getNdbError()));
198  if ((column_B0_cdouble = table_B0.getColumn("cdouble")) == null)
199  throw new RuntimeException(toStr(dict.getNdbError()));
200  if ((column_B0_a_id = table_B0.getColumn("a_id")) == null)
201  throw new RuntimeException(toStr(dict.getNdbError()));
202  if ((column_B0_cvarbinary_def = table_B0.getColumn("cvarbinary_def")) == null)
203  throw new RuntimeException(toStr(dict.getNdbError()));
204  if ((column_B0_cvarchar_def = table_B0.getColumn("cvarchar_def")) == null)
205  throw new RuntimeException(toStr(dict.getNdbError()));
206 
207  // get indexes of table B0
208  if ((idx_B0_a_id = dict.getIndex("I_B0_FK", "b0")) == null)
209  throw new RuntimeException(toStr(dict.getNdbError()));
210 
211  // get common attribute ids for tables A, B0
212  attr_id = column_A_id.getColumnNo();
213  if (attr_id != column_B0_id.getColumnNo())
214  throw new RuntimeException("attribute id mismatch");
215  attr_cint = column_A_cint.getColumnNo();
216  if (attr_cint != column_B0_cint.getColumnNo())
217  throw new RuntimeException("attribute id mismatch");
218  attr_clong = column_A_clong.getColumnNo();
219  if (attr_clong != column_B0_clong.getColumnNo())
220  throw new RuntimeException("attribute id mismatch");
221  attr_cfloat = column_A_cfloat.getColumnNo();
222  if (attr_cfloat != column_B0_cfloat.getColumnNo())
223  throw new RuntimeException("attribute id mismatch");
224  attr_cdouble = column_A_cdouble.getColumnNo();
225  if (attr_cdouble != column_B0_cdouble.getColumnNo())
226  throw new RuntimeException("attribute id mismatch");
227 
228  // get attribute ids for table B0
229  attr_B0_a_id = column_B0_a_id.getColumnNo();
230  attr_B0_cvarbinary_def = column_B0_cvarbinary_def.getColumnNo();
231  attr_B0_cvarchar_def = column_B0_cvarchar_def.getColumnNo();
232 
233  // XXX need names due to broken
234  // NdbOperation.getValue(int), NdbResultSet.getXXX(int)
235  name_id = column_A_id.getName();
236  if (!name_id.equals(column_B0_id.getName()))
237  throw new RuntimeException("attribute name mismatch");
238  name_cint = column_A_cint.getName();
239  if (!name_cint.equals(column_B0_cint.getName()))
240  throw new RuntimeException("attribute name mismatch");
241  name_clong = column_A_clong.getName();
242  if (!name_clong.equals(column_B0_clong.getName()))
243  throw new RuntimeException("attribute name mismatch");
244  name_cfloat = column_A_cfloat.getName();
245  if (!name_cfloat.equals(column_B0_cfloat.getName()))
246  throw new RuntimeException("attribute name mismatch");
247  name_cdouble = column_A_cdouble.getName();
248  if (!name_cdouble.equals(column_B0_cdouble.getName()))
249  throw new RuntimeException("attribute name mismatch");
250  name_B0_a_id = column_B0_a_id.getName();
251  name_B0_cvarbinary_def = column_B0_cvarbinary_def.getName();
252  name_B0_cvarchar_def = column_B0_cvarchar_def.getName();
253  }
254  };
255 
256  // some string and byte literals; the numeric suffix of each name is the literal's length in characters
257  final String string1 = "i";
258  final String string10 = "xxxxxxxxxx";
259  final String string100 = "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc";
260  final String string1000 = "mmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmm";
261  final byte[] bytes1 = string1.getBytes(); // getBytes() without argument encodes with the platform's default charset
262  final byte[] bytes10 = string10.getBytes();
263  final byte[] bytes100 = string100.getBytes();
264  final byte[] bytes1000 = string1000.getBytes();
265  final String[] strings = { string1, string10, string100 }; // element i has length 10^i (asserted in initOperations()); string1000 is not included
266  final byte[][] bytes = { bytes1, bytes10, bytes100 }; // element i has length 10^i (asserted in initOperations()); bytes1000 is not included
267 
268  protected void initOperations() throws NdbApiException {
269  out.print("initializing operations ...");
270  out.flush();
271 
272  //out.println("default charset: "
273  // + java.nio.charset.Charset.defaultCharset().displayName());
274 
275  for (boolean f = false, done = false; !done; done = f, f = true) {
276  // inner classes can only refer to a constant
277  final boolean batch = f;
278  final boolean forceSend = f;
279  final boolean setAttrs = true;
280 
281  ops.add(
282  new Op("insA" + (batch ? "_batch" : "")) {
283  public void run(int nOps)
284  throws NdbApiException {
285  ins(model.table_A, 1, nOps, !setAttrs, batch);
286  }
287  });
288 
289  ops.add(
290  new Op("insB0" + (batch ? "_batch" : "")) {
291  public void run(int nOps)
292  throws NdbApiException {
293  ins(model.table_B0, 1, nOps, !setAttrs, batch);
294  }
295  });
296 
297  ops.add(
298  new Op("setAByPK" + (batch ? "_batch" : "")) {
299  public void run(int nOps)
300  throws NdbApiException {
301  setByPK(model.table_A, 1, nOps, batch);
302  }
303  });
304 
305  ops.add(
306  new Op("setB0ByPK" + (batch ? "_batch" : "")) {
307  public void run(int nOps)
308  throws NdbApiException {
309  setByPK(model.table_B0, 1, nOps, batch);
310  }
311  });
312 
313  ops.add(
314  new Op("getAByPK" + (batch ? "_batch" : "")) {
315  public void run(int nOps)
316  throws NdbApiException {
317  getByPK(model.table_A, 1, nOps, batch);
318  }
319  });
320 
321  ops.add(
322  new Op("getB0ByPK" + (batch ? "_batch" : "")) {
323  public void run(int nOps)
324  throws NdbApiException {
325  getByPK(model.table_B0, 1, nOps, batch);
326  }
327  });
328 
329  for (int i = 0, l = 1; l <= maxVarbinaryBytes; l *= 10, i++) {
330  final byte[] b = bytes[i];
331  assert l == b.length;
332 
333  ops.add(
334  new Op("setVarbinary" + l + (batch ? "_batch" : "")) {
335  public void run(int nOps)
336  throws NdbApiException {
337  setVarbinary(model.table_B0, 1, nOps, batch, b);
338  }
339  });
340 
341  ops.add(
342  new Op("getVarbinary" + l + (batch ? "_batch" : "")) {
343  public void run(int nOps)
344  throws NdbApiException {
345  getVarbinary(model.table_B0, 1, nOps, batch, b);
346  }
347  });
348  }
349 
350  for (int i = 0, l = 1; l <= maxVarcharChars; l *= 10, i++) {
351  final String s = strings[i];
352  assert l == s.length();
353 
354  ops.add(
355  new Op("setVarchar" + l + (batch ? "_batch" : "")) {
356  public void run(int nOps)
357  throws NdbApiException {
358  setVarchar(model.table_B0, 1, nOps, batch, s);
359  }
360  });
361 
362  ops.add(
363  new Op("getVarchar" + l + (batch ? "_batch" : "")) {
364  public void run(int nOps)
365  throws NdbApiException {
366  getVarchar(model.table_B0, 1, nOps, batch, s);
367  }
368  });
369  }
370 
371  ops.add(
372  new Op("setB0->A" + (batch ? "_batch" : "")) {
373  public void run(int nOps)
374  throws NdbApiException {
375  setB0ToA(nOps, batch);
376  }
377  });
378 
379  ops.add(
380  new Op("navB0->A" + (batch ? "_batch" : "")) {
381  public void run(int nOps)
382  throws NdbApiException {
383  navB0ToA(nOps, batch);
384  }
385  });
386 
387  ops.add(
388  new Op("navB0->A_alt" + (batch ? "_batch" : "")) {
389  public void run(int nOps)
390  throws NdbApiException {
391  navB0ToAalt(nOps, batch);
392  }
393  });
394 
395  // XXX exclude, NDB/J exceptions
396  ops.add(
397  new Op("navA->B0" + (forceSend ? "_forceSend" : "")) {
398  public void run(int nOps)
399  throws NdbApiException {
400  navAToB0(nOps, forceSend);
401  }
402  });
403 
404  // XXX exclude, not implemented yet
405  ops.add(
406  new Op("navA->B0_alt" + (forceSend ? "_forceSend" : "")) {
407  public void run(int nOps)
408  throws NdbApiException {
409  navAToB0alt(nOps, forceSend);
410  }
411  });
412 
413  ops.add(
414  new Op("nullB0->A" + (batch ? "_batch" : "")) {
415  public void run(int nOps)
416  throws NdbApiException {
417  nullB0ToA(nOps, batch);
418  }
419  });
420 
421  ops.add(
422  new Op("delB0ByPK" + (batch ? "_batch" : "")) {
423  public void run(int nOps)
424  throws NdbApiException {
425  delByPK(model.table_B0, 1, nOps, batch);
426  }
427  });
428 
429  ops.add(
430  new Op("delAByPK" + (batch ? "_batch" : "")) {
431  public void run(int nOps)
432  throws NdbApiException {
433  delByPK(model.table_A, 1, nOps, batch);
434  }
435  });
436 
437  ops.add(
438  new Op("insA_attr" + (batch ? "_batch" : "")) {
439  public void run(int nOps)
440  throws NdbApiException {
441  ins(model.table_A, 1, nOps, setAttrs, batch);
442  }
443  });
444 
445  ops.add(
446  new Op("insB0_attr" + (batch ? "_batch" : "")) {
447  public void run(int nOps)
448  throws NdbApiException {
449  ins(model.table_B0, 1, nOps, setAttrs, batch);
450  }
451  });
452 
453  ops.add(
454  new Op("delAllB0") {
455  public void run(int nOps)
456  throws NdbApiException {
457  final int count = delByScan(model.table_B0);
458  assert count == nOps;
459  }
460  });
461 
462  ops.add(
463  new Op("delAllA") {
464  public void run(int nOps)
465  throws NdbApiException {
466  final int count = delByScan(model.table_A);
467  assert count == nOps;
468  }
469  });
470  }
471 
472  out.println(" [Op: " + ops.size() + "]");
473  }
474 
475  protected void closeOperations() {
476  out.print("closing operations ...");
477  out.flush();
478  ops.clear();
479  out.println(" [ok]");
480  }
481 
482  protected void beginTransaction() throws NdbApiException {
483  // start a transaction
484  // must be closed with NdbTransaction.close
485  tx = ndb.startTransaction();
486  assert tx != null;
487  }
488 
489  protected void executeOperations() throws NdbApiException {
490  // execute but don't commit the current transaction
491  // XXX not documented: return value != 0 v throwing exception
492  // YYY Monty: should always throw exception -> void method
493  int stat = tx.execute(ExecType.NoCommit, AbortOption.AbortOnError);
494  if (stat != 0)
495  throw new RuntimeException("stat == " + stat);
496  }
497 
498  protected void commitTransaction() throws NdbApiException {
499  // commit the current transaction
500  // XXX not documented: return value != 0 v throwing exception
501  // YYY Monty: should always throw exception -> void method
502  assert tx != null;
503  int stat = tx.execute(ExecType.Commit, AbortOption.AbortOnError);
504  if (stat != 0)
505  throw new RuntimeException("stat == " + stat);
506  }
507 
508  protected void rollbackTransaction() throws NdbApiException {
509  // abort the current transaction
510  // XXX not documented: return value != 0 v throwing exception
511  // YYY Monty: should always throw exception -> void method
512  int stat = tx.execute(ExecType.Rollback);
513  if (stat != 0)
514  throw new RuntimeException("stat == " + stat);
515  }
516 
517  protected void closeTransaction() {
518  // close the current transaction
519  // to be called irrespectively of success or failure
520  tx.close();
521  tx = null;
522  }
523 
524  // ----------------------------------------------------------------------
525 
526  protected void fetchCommonAttributes(NdbOperation op)
527  throws NdbApiException {
528  op.getValue(model.name_cint);
529  op.getValue(model.name_clong);
530  op.getValue(model.name_cfloat);
531  op.getValue(model.name_cdouble);
532  }
533 
534  protected int getCommonAttributes(NdbResultSet rs)
535  throws NdbApiException {
536  final int cint = rs.getInt(model.name_cint);
537  final long clong = rs.getLong(model.name_clong);
538  verify(clong == cint);
539  final float cfloat = rs.getFloat(model.name_cfloat);
540  verify(cfloat == cint);
541  final double cdouble = rs.getDouble(model.name_cdouble);
542  verify(cdouble == cint);
543  return cint;
544  }
545 
546  protected void ins(NdbTable table, int from, int to,
547  boolean setAttrs, boolean batch)
548  throws NdbApiException {
549  beginTransaction();
550  for (int i = from; i <= to; i++) {
551  // get an insert operation for the table
552  final NdbOperation op = tx.getInsertOperation(table);
553  assert op != null;
554 
555  // set key attribute
556  op.equalInt(model.name_id, i);
557 
558  // set other attributes
559  if (setAttrs) {
560  op.setInt(model.name_cint, -i);
561  op.setLong(model.name_clong, -i);
562  op.setFloat(model.name_cfloat, -i);
563  op.setDouble(model.name_cdouble, -i);
564  }
565 
566  // execute the operation now if in non-batching mode
567  if (!batch)
568  executeOperations();
569  }
570  commitTransaction();
571  closeTransaction();
572  }
573 
574  protected void delByPK(NdbTable table, int from, int to,
575  boolean batch)
576  throws NdbApiException {
577  beginTransaction();
578  for (int i = from; i <= to; i++) {
579  // get a delete operation for the table
580  final NdbOperation op = tx.getDeleteOperation(table);
581  assert op != null;
582 
583  // set key attribute
584  op.equalInt(model.name_id, i);
585 
586  // execute the operation now if in non-batching mode
587  if (!batch)
588  executeOperations();
589  }
590  commitTransaction();
591  closeTransaction();
592  }
593 
594  protected int delByScan(NdbTable table) throws NdbApiException {
595  beginTransaction();
596 
597  // get a full table scan operation with exclusive locks
598  final NdbScanOperation op
599  = tx.getSelectScanOperation(table, LockMode.LM_Exclusive);
600  assert op != null;
601 
602  // start the scan; don't commit yet
603  executeOperations();
604 
605  // delete all rows in a given scan
606  int count = 0;
607  int stat;
608  final boolean allowFetch = true; // request new batches when exhausted
609  while ((stat = op.nextResult(allowFetch)) == 0) {
610  // delete all tuples within a batch
611  do {
612  op.deleteCurrentTuple();
613  count++;
614 
615  // XXX execute the operation now if in non-batching mode
616  //if (!batch)
617  // executeOperations();
618  } while ((stat = op.nextResult(!allowFetch)) == 0);
619 
620  if (stat == 1) {
621  // no more batches
622  break;
623  }
624  if (stat == 2) {
625  // end of current batch, fetch next
626  // XXX not documented: return value != 0 v throwing exception
627  // YYY Monty: should always throw exception -> void method
628  int s = tx.execute(ExecType.NoCommit, AbortOption.AbortOnError);
629  if (s != 0)
630  throw new RuntimeException("s == " + s);
631  continue;
632  }
633  throw new RuntimeException("stat == " + stat);
634  }
635  if (stat != 1)
636  throw new RuntimeException("stat == " + stat);
637 
638  // close the scan
639  op.close();
640 
641  commitTransaction();
642  closeTransaction();
643  return count;
644  }
645 
646  protected void setByPK(NdbTable table, int from, int to,
647  boolean batch)
648  throws NdbApiException {
649  beginTransaction();
650  for (int i = from; i <= to; i++) {
651  // get an insert operation for the table
652  final NdbOperation op = tx.getUpdateOperation(table);
653  assert op != null;
654 
655  // set key attribute
656  op.equalInt(model.name_id, i);
657 
658  // set other attributes
659  op.setInt(model.name_cint, i);
660  op.setLong(model.name_clong, i);
661  op.setFloat(model.name_cfloat, i);
662  op.setDouble(model.name_cdouble, i);
663 
664  // execute the operation now if in non-batching mode
665  if (!batch)
666  executeOperations();
667  }
668  commitTransaction();
669  closeTransaction();
670  }
671 
672  // XXX need to use names instead of ids due to broken
673  // NdbOperation.getValue(int), NdbResultSet.getXXX(int)
674  protected void testBrokenGetValueByIndex()
675  throws NdbApiException {
676  assert tx == null;
677  tx = ndb.startTransaction();
678  assert tx != null;
679  NdbOperation op = tx.getSelectOperation(model.table_A);
680  assert op != null;
681  final int int_val = 1;
682  op.equalInt("id", int_val);
683  op.getValue("id"); // XXX error with index instead of "id"
684  final NdbResultSet rs = op.resultData();
685  tx.execute(ExecType.Commit, AbortOption.AbortOnError, true);
686  while (rs.next()) {
687  int id = rs.getInt("id"); // XXX error with index instead of "id"
688  assert id == int_val;
689  }
690  tx.close();
691  tx = null;
692  }
693 
694  protected void getByPK(NdbTable table, int from, int to,
695  boolean batch)
696  throws NdbApiException {
697  // operation results
698  final int count = (to - from) + 1;
699  final NdbResultSet[] rss = new NdbResultSet[count];
700 
701  beginTransaction();
702  for (int i = 0, j = from; i < count; i++, j++) {
703  // get a read operation for the table
704  NdbOperation op = tx.getSelectOperation(table);
705  assert op != null;
706 
707  // set key attribute
708  op.equalInt(model.name_id, j);
709 
710  // define fetched attributes
711  op.getValue(model.name_id);
712  fetchCommonAttributes(op);
713 
714  // get attributes (not readable until after commit)
715  rss[i] = op.resultData();
716 
717  // execute the operation now if in non-batching mode
718  if (!batch)
719  executeOperations();
720  }
721  commitTransaction();
722 
723  // check fetched values
724  for (int i = 0, j = from; i < count; i++, j++) {
725  final NdbResultSet rs = rss[i];
726  final boolean hasNext = rs.next();
727  assert hasNext;
728 
729  // check key attribute
730  final int id = rs.getInt(model.name_id);
731  verify(id == j);
732 
733  // check other attributes
734  final int id1 = getCommonAttributes(rs);
735  verify(id1 == j);
736 
737  assert !rs.next();
738  }
739  closeTransaction();
740  }
741 
742  protected void setVarbinary(NdbTable table, int from, int to,
743  boolean batch, byte[] bytes)
744  throws NdbApiException {
745  beginTransaction();
746  for (int i = from; i <= to; i++) {
747  // get an update operation for the table
748  final NdbOperation op = tx.getUpdateOperation(table);
749  assert op != null;
750 
751  // set key attribute
752  op.equalInt(model.name_id, i);
753 
754  // set varbinary
755  op.setBytes(model.name_B0_cvarbinary_def, bytes);
756 
757  // execute the operation now if in non-batching mode
758  if (!batch)
759  executeOperations();
760  }
761  commitTransaction();
762  closeTransaction();
763  }
764 
765  protected void setVarchar(NdbTable table, int from, int to,
766  boolean batch, String string)
767  throws NdbApiException {
768  beginTransaction();
769  for (int i = from; i <= to; i++) {
770  // get an update operation for the table
771  final NdbOperation op = tx.getUpdateOperation(table);
772  assert op != null;
773 
774  // set key attribute
775  op.equalInt(model.name_id, i);
776 
777  // set varchar
778  op.setString(model.name_B0_cvarchar_def, string);
779 
780  // execute the operation now if in non-batching mode
781  if (!batch)
782  executeOperations();
783  }
784  commitTransaction();
785  closeTransaction();
786  }
787 
788  protected void getVarbinary(NdbTable table, int from, int to,
789  boolean batch, byte[] bytes)
790  throws NdbApiException {
791  // operation results
792  final int count = (to - from) + 1;
793  final NdbResultSet[] rss = new NdbResultSet[count];
794 
795  beginTransaction();
796  for (int i = 0, j = from; i < count; i++, j++) {
797  // get a read operation for the table
798  NdbOperation op = tx.getSelectOperation(table);
799  assert op != null;
800 
801  // set key attribute
802  op.equalInt(model.name_id, j);
803 
804  // define fetched attributes
805  op.getValue(model.name_B0_cvarbinary_def);
806 
807  // get attributes (not readable until after commit)
808  rss[i] = op.resultData();
809 
810  // execute the operation now if in non-batching mode
811  if (!batch)
812  executeOperations();
813  }
814  //executeOperations();
815  commitTransaction();
816 
817  // check fetched values
818  for (int i = 0, j = from; i < count; i++, j++) {
819  final NdbResultSet rs = rss[i];
820  final boolean hasNext = rs.next();
821  assert hasNext;
822 
823  // check varbinary
824  final byte[] cvarbinary_def
825  = rs.getBytes(model.name_B0_cvarbinary_def);
826  verify(Arrays.equals(bytes, cvarbinary_def));
827 
828  assert !rs.next();
829  }
830  closeTransaction();
831  }
832 
833  protected void getVarchar(NdbTable table, int from, int to,
834  boolean batch, String string)
835  throws NdbApiException {
836  // operation results
837  final int count = (to - from) + 1;
838  final NdbResultSet[] rss = new NdbResultSet[count];
839 
840  beginTransaction();
841  for (int i = 0, j = from; i < count; i++, j++) {
842  // get a read operation for the table
843  NdbOperation op = tx.getSelectOperation(table);
844  assert op != null;
845 
846  // set key attribute
847  op.equalInt(model.name_id, j);
848 
849  // define fetched attributes
850  op.getValue(model.name_B0_cvarchar_def);
851 
852  // get attributes (not readable until after commit)
853  rss[i] = op.resultData();
854 
855  // execute the operation now if in non-batching mode
856  if (!batch)
857  executeOperations();
858  }
859  //executeOperations();
860  commitTransaction();
861 
862  // check fetched values
863  for (int i = 0, j = from; i < count; i++, j++) {
864  final NdbResultSet rs = rss[i];
865  final boolean hasNext = rs.next();
866  assert hasNext;
867 
868  // check varchar
869  if (true) {
870  final String cvarchar_def
871  = rs.getString(model.name_B0_cvarchar_def);
872  verify(string.equals(cvarchar_def));
873  } else {
874  // verification imposes a string->bytes conversion penalty
875  final byte[] cvarchar_def
876  = rs.getStringBytes(model.name_B0_cvarchar_def);
877  verify(Arrays.equals(string.getBytes(), cvarchar_def));
878  }
879 
880  assert !rs.next();
881  }
882  closeTransaction();
883  }
884 
885  protected void setB0ToA(int nOps,
886  boolean batch)
887  throws NdbApiException {
888  beginTransaction();
889  for (int i = 1; i <= nOps; i++) {
890  // get an update operation for the table
891  final NdbOperation op = tx.getUpdateOperation(model.table_B0);
892  assert op != null;
893 
894  // set key attribute
895  op.equalInt(model.name_id, i);
896 
897  // set a_id attribute
898  int a_id = ((i - 1) % nOps) + 1;
899  op.setInt(model.name_B0_a_id, a_id);
900 
901  // execute the operation now if in non-batching mode
902  if (!batch)
903  executeOperations();
904  }
905  commitTransaction();
906  closeTransaction();
907  }
908 
909  protected void nullB0ToA(int nOps,
910  boolean batch)
911  throws NdbApiException {
912  beginTransaction();
913  for (int i = 1; i <= nOps; i++) {
914  // get an update operation for the table
915  final NdbOperation op = tx.getUpdateOperation(model.table_B0);
916  assert op != null;
917 
918  // set key attribute
919  op.equalInt(model.name_id, i);
920 
921  // set a_id attribute
922  int a_id = ((i - 1) % nOps) + 1;
923  op.setNull(model.name_B0_a_id);
924 
925  // execute the operation now if in non-batching mode
926  if (!batch)
927  executeOperations();
928  }
929  commitTransaction();
930  closeTransaction();
931  }
932 
933  protected void navB0ToA(int nOps,
934  boolean batch)
935  throws NdbApiException {
936  beginTransaction();
937 
938  // fetch the foreign keys from B0 and read attributes from A
939  final NdbResultSet[] abs = new NdbResultSet[nOps];
940  for (int i = 1, j = 0; i <= nOps; i++, j++) {
941  // fetch the foreign key value from B0
942  NdbResultSet rs;
943  {
944  // get a read operation for the table
945  NdbOperation op = tx.getSelectOperation(model.table_B0);
946  assert op != null;
947 
948  // set key attribute
949  op.equalInt(model.name_id, i);
950 
951  // define fetched attributes
952  op.getValue(model.name_B0_a_id);
953 
954  // get attributes (not readable until after commit)
955  rs = op.resultData();
956  }
957  executeOperations(); // start the scan; don't commit yet
958 
959  // fetch the attributes from A
960  {
961  // get a read operation for the table
962  NdbOperation op = tx.getSelectOperation(model.table_A);
963  assert op != null;
964 
965  // set key attribute
966  final int a_id = rs.getInt(model.name_B0_a_id);
967  assert a_id == ((i - 1) % nOps) + 1;
968  op.equalInt(model.name_id, a_id);
969 
970  // define fetched attributes
971  op.getValue(model.name_id);
972  fetchCommonAttributes(op);
973 
974  // get attributes (not readable until after commit)
975  abs[j] = op.resultData();
976  }
977 
978  // execute the operation now if in non-batching mode
979  if (!batch)
980  executeOperations();
981  }
982  commitTransaction();
983 
984  // check fetched values
985  for (int i = 1, j = 0; i <= nOps; i++, j++) {
986  final NdbResultSet ab = abs[j];
987  final boolean hasNext = ab.next();
988  assert hasNext;
989 
990  // check key attribute
991  final int id = ab.getInt(model.name_id);
992  //out.println("id = " + id + ", i = " + i);
993  verify(id == ((i - 1) % nOps) + 1);
994 
995  // check other attributes
996  final int k = getCommonAttributes(ab);
997  verify(k == id);
998 
999  assert !ab.next();
1000  }
1001  closeTransaction();
1002  }
1003 
1004  protected void navB0ToAalt(int nOps,
1005  boolean batch)
1006  throws NdbApiException {
1007  beginTransaction();
1008 
1009  // fetch the foreign key value from B0
1010  final NdbResultSet[] a_ids = new NdbResultSet[nOps];
1011  for (int i = 1, j = 0; i <= nOps; i++, j++) {
1012  // get a read operation for the table
1013  NdbOperation op = tx.getSelectOperation(model.table_B0);
1014  assert op != null;
1015 
1016  // set key attribute
1017  op.equalInt(model.name_id, i);
1018 
1019  // define fetched attributes
1020  op.getValue(model.name_B0_a_id);
1021 
1022  // get attributes (not readable until after commit)
1023  a_ids[j] = op.resultData();
1024  }
1025  executeOperations(); // start the scan; don't commit yet
1026 
1027  // fetch the attributes from A
1028  final NdbResultSet[] abs = new NdbResultSet[nOps];
1029  for (int i = 1, j = 0; i <= nOps; i++, j++) {
1030  // get a read operation for the table
1031  NdbOperation op = tx.getSelectOperation(model.table_A);
1032  assert op != null;
1033 
1034  // set key attribute
1035  final int a_id = a_ids[j].getInt(model.name_B0_a_id);
1036  assert a_id == ((i - 1) % nOps) + 1;
1037  op.equalInt(model.name_id, a_id);
1038 
1039  // define fetched attributes
1040  op.getValue(model.name_id);
1041  fetchCommonAttributes(op);
1042 
1043  // get attributes (not readable until after commit)
1044  abs[j] = op.resultData();
1045 
1046  // execute the operation now if in non-batching mode
1047  if (!batch)
1048  executeOperations();
1049  }
1050  commitTransaction();
1051 
1052  // check fetched values
1053  for (int i = 1, j = 0; i <= nOps; i++, j++) {
1054  final NdbResultSet ab = abs[j];
1055  final boolean hasNext = ab.next();
1056  assert hasNext;
1057 
1058  // check key attribute
1059  final int id = ab.getInt(model.name_id);
1060  //out.println("id = " + id + ", i = " + i);
1061  verify(id == ((i - 1) % nOps) + 1);
1062 
1063  // check other attributes
1064  final int k = getCommonAttributes(ab);
1065  verify(k == id);
1066 
1067  assert !ab.next();
1068  }
1069  closeTransaction();
1070  }
1071 
1072  protected void navAToB0(int nOps,
1073  boolean forceSend)
1074  throws NdbApiException {
1075 // throws exceptions, see below:
1076 /*
1077  beginTransaction();
1078 
1079  // fetch attributes from B0 by foreign key scan
1080  final NdbResultSet[] abs = new NdbResultSet[nOps];
1081  int j = 0;
1082  for (int i = 1; i <= nOps; i++) {
1083  // get an index scan operation for the table
1084  // XXX ? no locks (LM_CommittedRead) or shared locks (LM_Read)
1085  final NdbIndexScanOperation op
1086  = tx.getSelectIndexScanOperation(model.idx_B0_a_id,
1087  model.table_B0,
1088  LockMode.LM_CommittedRead);
1089  assert op != null;
1090 
1091  // define the scan's bounds (faster than using a scan filter)
1092  // XXX this hardwired column name isn't right
1093  //op.setBoundInt("a_id", BoundType.BoundEQ, i);
1094  // compare with Operations.cpp:
1095  // if (op->setBound(idx_B0_a_id->getColumn(0)->getAttrId()...
1096  //
1097  // which translates into
1098  //out.println("idx_B0_a_id.getNoOfColumns() = "
1099  // + model.idx_B0_a_id.getNoOfColumns());
1100  //out.println("idx_B0_a_id.getColumn(0).getColumnNo() = "
1101  // + model.idx_B0_a_id.getColumn(0).getColumnNo());
1102  //op.setBoundInt(model.idx_B0_a_id.getColumn(0).getColumnNo(),
1103  // BoundType.BoundEQ, i);
1104  // except that we get the usual error with NDBJ:
1105  //[java] idx_B0_a_id.getColumn(0).getColumnNo() = 0
1106  // [java] caught com.mysql.cluster.ndbj.NdbApiException:
1107  // Invalid attribute name or number
1108  //
1109  // so we go by column name
1110  //out.println("idx_B0_a_id.getColumn(0).getName() = "
1111  // + model.idx_B0_a_id.getColumn(0).getName());
1112  //op.setBoundInt(model.idx_B0_a_id.getColumn(0).getName(),
1113  // BoundType.BoundEQ, i);
1114  // which is actually "a_id", so, for now, we call
1115  op.setBoundInt("a_id", BoundType.BoundEQ, i);
1116 
1117  // define fetched attributes
1118  op.getValue(model.name_id);
1119  fetchCommonAttributes(op);
1120 
1121  // start the scan; don't commit yet
1122  executeOperations();
1123 
1124  int stat;
1125  final boolean allowFetch = true; // request new batches when exhausted
1126  while ((stat = op.nextResult(allowFetch, forceSend)) == 0) {
1127  // get attributes (not readable until after commit)
1128  abs[j++] = op.resultData();
1129  }
1130  if (stat != 1)
1131  throw new RuntimeException("stat == " + stat);
1132  }
1133  commitTransaction();
1134  assert (j++ == nOps);
1135 
1136  // check fetched values
1137  j = 0;
1138  for (int i = 1; i <= nOps; i++) {
1139  final NdbResultSet ab = abs[j++];
1140  //out.println("j = " + j + ", ab = " + ab);
1141  //final boolean hasNext = ab.next();
1142  // throws
1143  //[java] j = 1, ab = com.mysql.cluster.ndbj.NdbResultSetImpl@6f144c
1144  // [java] caught com.mysql.cluster.ndbj.NdbApiException: Unknown error code
1145  // [java] com.mysql.cluster.ndbj.NdbApiException: Unknown error code
1146  // [java] at com.mysql.cluster.ndbj.NdbjJNI.NdbScanOperationImpl_nextResult__SWIG_(Native Method)
1147  // [java] at com.mysql.cluster.ndbj.NdbScanOperationImpl.nextResult(NdbScanOperationImpl.java:93)
1148  // [java] at com.mysql.cluster.ndbj.NdbResultSetImpl.next(NdbResultSetImpl.java:362)
1149  // [java] at com.mysql.cluster.crund.NdbjLoad.navAToB0(NdbjLoad.java:1205)
1150  //
1151  // YYY Frazer: check tx object for error (could be node failure)
1152  // Martin: doesn't help much; after ab.next():
1153  //out.println("tx.getNdbError() = " + tx.getNdbError().getCode());
1154  // returns -1 and
1155  //out.println("tx.getNdbError() = " + tx.getNdbError().getMessage());
1156  // says "Unknown error code"
1157  //
1158  // apparently,
1159  //final boolean hasNext = ab.next();
1160  // is the same as
1161  //final boolean hasNext = ab.next(true);
1162  // this returns false, but throws no exception:
1163  //final boolean hasNext = ab.next(false);
1164  out.println("tx.getNdbError() = " + tx.getNdbError().getCode());
1165  final boolean hasNext = ab.next();
1166  assert hasNext;
1167 
1168  // check key attribute
1169  final int id = ab.getInt(model.name_id);
1170  verify(id == i);
1171 
1172  // check other attributes
1173  final int id1 = getCommonAttributes(ab);
1174  verify(id1 == i);
1175 
1176  assert !ab.next();
1177  }
1178  closeTransaction();
1179 */
1180  }
1181 
1182  protected void navAToB0alt(int nOps,
1183  boolean forceSend)
1184  throws NdbApiException {
1185 // XXX not implemented yet, fix exception in navAToB0() first
1186 /*
1187  assert false;
1188 */
1189  }
1190 
1191  // ----------------------------------------------------------------------
1192  // NDBJ datastore operations
1193  // ----------------------------------------------------------------------
1194 
1195  protected void initConnection() throws NdbApiException {
1196  out.println();
1197 
1198  // optionally, connect and wait for reaching the data nodes (ndbds)
1199  out.print("waiting for ndbd ...");
1200  out.flush();
1201  final int initial_wait = 10; // secs to wait until first node detected
1202  final int final_wait = 0; // secs to wait after first node detected
1203 
1204  // XXX return: 0 all nodes live, > 0 at least one node live, < 0 error
1205  try {
1206  mgmd.waitUntilReady(initial_wait, final_wait);
1207  } catch (NdbApiException e) {
1208  out.println();
1209  out.println("!!! data nodes were not ready within "
1210  + (initial_wait + final_wait) + "s.");
1211  throw e;
1212  }
1213  out.println(" [ok]");
1214 
1215  // connect to database
1216  out.print("connecting to ndbd ...");
1217  out.flush();
1218  try {
1219  // XXX where to set schema?
1220  // YYY Frazer: schema not too useful in NDB at the moment
1221  // XXX unclear if maxThreads ^= maxNumberOfTransactions
1222  // since ndb.init(maxNumberOfTransactions) is deprecated
1223  //final int maxThreads = 4;
1224  //ndb = mgmd.createNdb(catalog, maxThreads);
1225  // YYY Frazer: yes, maxThreads == maxNumber(concurrent)OfTransactions
1226  ndb = mgmd.createNdb(catalog);
1227  } catch (NdbApiException e) {
1228  out.println();
1229  out.println("!!! failed to connect: " + e);
1230  throw e;
1231  }
1232  out.println(" [ok]");
1233 
1234  // initialize the schema shortcuts
1235  model = new Model(ndb);
1236  }
1237 
1238  protected void closeConnection() {
1239  out.println();
1240  out.print("closing ndbd connection ...");
1241  out.flush();
1242  model = null;
1243  ndb.close();
1244  ndb = null;
1245  out.println(" [ok]");
1246  }
1247 
1248  protected void clearData() throws NdbApiException {
1249  out.print("deleting all rows ...");
1250  out.flush();
1251  final int delB0 = delByScan(model.table_B0);
1252  out.print(" [B0: " + delB0);
1253  out.flush();
1254  final int delA = delByScan(model.table_A);
1255  out.print(", A: " + delA);
1256  out.flush();
1257  out.println("]");
1258  }
1259 
1260  // ----------------------------------------------------------------------
1261 
1262  static public void main(String[] args) {
1263  System.out.println("NdbjLoad.main()");
1264  parseArguments(args);
1265  new NdbjLoad().run();
1266  System.out.println();
1267  System.out.println("NdbjLoad.main(): done.");
1268  }
1269 }