MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
HugoTransactions.cpp
1 /*
2  Copyright (C) 2003-2008 MySQL AB, 2008-2010 Sun Microsystems, Inc.
3  All rights reserved. Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include "HugoTransactions.hpp"
20 #include <NDBT_Stats.hpp>
21 #include <NdbSleep.h>
22 #include <NdbTick.h>
23 
24 HugoTransactions::HugoTransactions(const NdbDictionary::Table& _tab,
25  const NdbDictionary::Index* idx):
26  HugoOperations(_tab, idx),
27  row(_tab){
28 
29  m_defaultScanUpdateMethod = 3;
30  setRetryMax();
31  m_stats_latency = 0;
32 
33  m_thr_count = 0;
34  m_thr_no = -1;
35 }
36 
37 HugoTransactions::~HugoTransactions(){
38  deallocRows();
39 }
40 
// Scan-reads the table and checks every returned row with
// calc.verifyRowValues(); optionally abandons the scan part-way through.
//
// NOTE(review): this listing has lost original lines 42, 46, 160 and 165-167.
// The line carrying the function name and first parameter is among them, and
// so is the declaration of `lm`, so the block is not compilable as shown.
// Restore them from the upstream file before building.
//
// Visible parameters:
//   records       expected row count; 0 disables the final count check
//   abortPercent  compared with a random draw to decide whether to abort
//   parallelism   passed to readTuples(); forced to 1 after repeated retries
//   scan_flags    passed to readTuples()
// Returns NDBT_OK after a complete (or deliberately aborted) scan and
// NDBT_FAILED on a verification failure, a non-temporary error, a row-count
// mismatch, or once m_retryMax attempts have been used.
41 int
43  int records,
44  int abortPercent,
45  int parallelism,
47  int scan_flags)
48 {
49 
50  int retryAttempt = 0;
51  int check, a;
52  NdbScanOperation *pOp;
53 
// Each pass of this loop is one complete attempt in a fresh transaction.
54  while (true){
55 
56  if (retryAttempt >= m_retryMax){
57  g_err << __LINE__ << " ERROR: has retried this operation "
58  << retryAttempt << " times, failing!" << endl;
59  return NDBT_FAILED;
60  }
61 
62  pTrans = pNdb->startTransaction();
63  if (pTrans == NULL) {
64  const NdbError err = pNdb->getNdbError();
65 
66  if (err.status == NdbError::TemporaryError){
67  ERR(err);
68  NdbSleep_MilliSleep(50);
69  retryAttempt++;
70  continue;
71  }
72  ERR(err);
73  return NDBT_FAILED;
74  }
75 
76  pOp = getScanOperation(pTrans);
77  if (pOp == NULL) {
78  ERR(pTrans->getNdbError());
79  closeTransaction(pNdb);
80  return NDBT_FAILED;
81  }
82 
83  if( pOp ->readTuples(lm, scan_flags, parallelism) ) {
84  ERR(pTrans->getNdbError());
85  closeTransaction(pNdb);
86  return NDBT_FAILED;
87  }
88 
// Request every column of the table into the member `row`.
89  for(a = 0; a<tab.getNoOfColumns(); a++){
90  if((row.attributeStore(a) =
91  pOp->getValue(tab.getColumn(a)->getName())) == 0) {
92  ERR(pTrans->getNdbError());
93  closeTransaction(pNdb);
94  return NDBT_FAILED;
95  }
96  }
97 
98  check = pTrans->execute(NoCommit, AbortOnError);
99  if( check == -1 ) {
100  const NdbError err = pTrans->getNdbError();
101  if (err.status == NdbError::TemporaryError){
102  ERR(err);
103  closeTransaction(pNdb);
104  NdbSleep_MilliSleep(50);
105  retryAttempt++;
106  continue;
107  }
108  ERR(err);
109  closeTransaction(pNdb);
110  return NDBT_FAILED;
111  }
112 
113  // Abort after 1-100 or 1-records rows
114  int ranVal = rand();
115  int abortCount = ranVal % (records == 0 ? 100 : records);
116  bool abortTrans = false;
// NOTE(review): `abort` is neither a parameter nor a local of this function;
// it presumably resolves to the library function, making this test always
// true. abortPercent was probably intended - confirm.
117  if (abort > 0){
118  // Abort if abortCount is less then abortPercent
119  if (abortCount < abortPercent)
120  abortTrans = true;
121  }
122 
123  int eof;
124  int rows = 0;
125  while((eof = pOp->nextResult(true)) == 0){
126  rows++;
127  if (calc.verifyRowValues(&row) != 0){
128  closeTransaction(pNdb);
129  return NDBT_FAILED;
130  }
131 
132  if (abortCount == rows && abortTrans == true){
133  ndbout << "Scan is aborted" << endl;
134  g_info << "Scan is aborted" << endl;
135  pOp->close();
// NOTE(review): `check` still holds the execute() result that was already
// tested above, so this branch looks unreachable.
136  if( check == -1 ) {
137  ERR(pTrans->getNdbError());
138  closeTransaction(pNdb);
139  return NDBT_FAILED;
140  }
141 
142  closeTransaction(pNdb);
143  return NDBT_OK;
144  }
145  }
146  if (eof == -1) {
147  const NdbError err = pTrans->getNdbError();
148 
149  if (err.status == NdbError::TemporaryError){
150  ERR_INFO(err);
151  closeTransaction(pNdb);
152  NdbSleep_MilliSleep(50);
153  switch (err.code){
154  case 488:
155  case 245:
156  case 490:
157  // Too many active scans, no limit on number of retry attempts
158  break;
159  default:
// NOTE(review): original line 160 is missing here; the brace block below
// presumably belonged to a condition on the error - confirm upstream.
161  {
162  if (retryAttempt >= (m_retryMax / 10) &&
163  (parallelism == 0 || parallelism > 1))
164  {
168  parallelism = 1;
169  ndbout_c("decrease parallelism");
170  }
171  }
172  retryAttempt++;
173  }
// Temporary error: start the whole scan again.
174  continue;
175  }
176  ERR(err);
177  closeTransaction(pNdb);
178  return NDBT_FAILED;
179  }
180 
181  closeTransaction(pNdb);
182 
183  g_info << rows << " rows have been read" << endl;
184  if (records != 0 && rows != records){
185  g_err << "Check expected number of records failed" << endl
186  << " expected=" << records <<", " << endl
187  << " read=" << rows << endl;
188  return NDBT_FAILED;
189  }
190 
191  return NDBT_OK;
192  }
// Not reached: the loop above only exits through return statements.
193  return NDBT_FAILED;
194 }
195 
// Index-scan variant of the scan-read check: scans through pIdx, verifies
// each row with calc.verifyRowValues(), and optionally abandons the scan
// part-way through.
//
// NOTE(review): this listing has lost original lines 197, 202, 208, 318,
// 323-325, 330 and 332. The function-name line, the declaration of `lm` and
// the declaration of `pOp` are among them, so the block is not compilable as
// shown. Restore them from the upstream file before building.
//
// Visible parameters:
//   pIdx          index to scan; set to 0 by the last retry fallback
//   records       expected row count; 0 disables the final count check
//   abortPercent  compared with a random draw to decide whether to abort
//   parallelism   passed to readTuples(); forced to 1 after repeated retries
//   scan_flags    passed to readTuples(); SF_TupScan is OR-ed in by the last
//                 retry fallback
// Returns NDBT_OK after a complete (or deliberately aborted) scan and
// NDBT_FAILED on a verification failure, a non-temporary error, a row-count
// mismatch, or once m_retryMax attempts have been used.
196 int
198  const NdbDictionary::Index * pIdx,
199  int records,
200  int abortPercent,
201  int parallelism,
203  int scan_flags)
204 {
205 
206  int retryAttempt = 0;
207  int check, a;
209 
// Each pass of this loop is one complete attempt in a fresh transaction.
210  while (true){
211 
212  if (retryAttempt >= m_retryMax){
213  g_err << __LINE__ << " ERROR: has retried this operation "
214  << retryAttempt << " times, failing!" << endl;
215  g_err << "lm: " << Uint32(lm) << " flags: H'" << hex << scan_flags
216  << endl;
217  return NDBT_FAILED;
218  }
219 
220  pTrans = pNdb->startTransaction();
221  if (pTrans == NULL) {
222  const NdbError err = pNdb->getNdbError();
223 
224  if (err.status == NdbError::TemporaryError){
225  ERR(err);
226  NdbSleep_MilliSleep(50);
227  retryAttempt++;
228  continue;
229  }
230  ERR(err);
231  return NDBT_FAILED;
232  }
233 
// NOTE(review): the retry fallback further down sets pIdx = 0 and then
// continues; the next pass dereferences pIdx here. Looks like a null
// dereference on that path - confirm and guard.
234  pOp = pTrans->getNdbIndexScanOperation(pIdx->getName(), tab.getName());
235  if (pOp == NULL) {
236  ERR(pTrans->getNdbError());
237  closeTransaction(pNdb);
238  return NDBT_FAILED;
239  }
240 
241  if( pOp ->readTuples(lm, scan_flags, parallelism) ) {
242  ERR(pTrans->getNdbError());
243  closeTransaction(pNdb);
244  return NDBT_FAILED;
245  }
246 
// Request every column of the table into the member `row`.
247  for(a = 0; a<tab.getNoOfColumns(); a++){
248  if((row.attributeStore(a) =
249  pOp->getValue(tab.getColumn(a)->getName())) == 0) {
250  ERR(pTrans->getNdbError());
251  closeTransaction(pNdb);
252  return NDBT_FAILED;
253  }
254  }
255 
256  check = pTrans->execute(NoCommit, AbortOnError);
257  if( check == -1 ) {
258  const NdbError err = pTrans->getNdbError();
259  if (err.status == NdbError::TemporaryError){
260  ERR(err);
261  closeTransaction(pNdb);
262  NdbSleep_MilliSleep(50);
263  retryAttempt++;
264  continue;
265  }
266  ERR(err);
267  closeTransaction(pNdb);
268  return NDBT_FAILED;
269  }
270 
271  // Abort after 1-100 or 1-records rows
272  int ranVal = rand();
273  int abortCount = ranVal % (records == 0 ? 100 : records);
274  bool abortTrans = false;
// NOTE(review): `abort` is neither a parameter nor a local of this function;
// it presumably resolves to the library function, making this test always
// true. abortPercent was probably intended - confirm.
275  if (abort > 0){
276  // Abort if abortCount is less then abortPercent
277  if (abortCount < abortPercent)
278  abortTrans = true;
279  }
280 
281  int eof;
282  int rows = 0;
283  while((eof = pOp->nextResult(true)) == 0){
284  rows++;
285  if (calc.verifyRowValues(&row) != 0){
286  closeTransaction(pNdb);
287  return NDBT_FAILED;
288  }
289 
290  if (abortCount == rows && abortTrans == true){
291  ndbout << "Scan is aborted" << endl;
292  g_info << "Scan is aborted" << endl;
293  pOp->close();
// NOTE(review): `check` still holds the execute() result that was already
// tested above, so this branch looks unreachable.
294  if( check == -1 ) {
295  ERR(pTrans->getNdbError());
296  closeTransaction(pNdb);
297  return NDBT_FAILED;
298  }
299 
300  closeTransaction(pNdb);
301  return NDBT_OK;
302  }
303  }
304  if (eof == -1) {
305  const NdbError err = pTrans->getNdbError();
306 
307  if (err.status == NdbError::TemporaryError){
308  ERR_INFO(err);
309  closeTransaction(pNdb);
310  NdbSleep_MilliSleep(50);
311  switch (err.code){
312  case 488:
313  case 245:
314  case 490:
315  // Too many active scans, no limit on number of retry attempts
316  break;
317  default:
// NOTE(review): original line 318 is missing here; the brace block below
// presumably belonged to a condition on the error - confirm upstream.
// What remains is an escalation ladder keyed on retryAttempt.
319  {
320  if (retryAttempt >= (m_retryMax / 10) &&
321  (parallelism == 0 || parallelism > 1))
322  {
326  parallelism = 1;
327  ndbout_c("decrease parallelism");
328  }
// NOTE(review): the second half of this condition (line 330) and the
// statement that performs the switch (line 332) are missing; only the
// log message survives.
329  else if (retryAttempt >= (m_retryMax / 5) &&
331  {
333  ndbout_c("switch to LM_CommittedRead");
334  }
335  else if (retryAttempt >= (m_retryMax / 4) &&
336  (pIdx != 0))
337  {
338  pIdx = 0;
339  scan_flags |= NdbScanOperation::SF_TupScan;
340  ndbout_c("switch to table-scan (SF_TupScan) form index-scan");
341  }
342  }
343  retryAttempt++;
344  }
// Temporary error: start the whole scan again.
345  continue;
346  }
347  ERR(err);
348  closeTransaction(pNdb);
349  return NDBT_FAILED;
350  }
351 
352  closeTransaction(pNdb);
353 
354  g_info << rows << " rows have been read" << endl;
355  if (records != 0 && rows != records){
356  g_err << "Check expected number of records failed" << endl
357  << " expected=" << records <<", " << endl
358  << " read=" << rows << endl;
359  return NDBT_FAILED;
360  }
361 
362  return NDBT_OK;
363  }
// Not reached: the loop above only exits through return statements.
364  return NDBT_FAILED;
365 }
366 
367 
// NOTE(review): RESTART_SCAN is not referenced anywhere in the visible part
// of this file - confirm it is still needed.
368 #define RESTART_SCAN 99
369 
370 int
371 HugoTransactions::scanUpdateRecords(Ndb* pNdb,
373  int records,
374  int abortPercent,
375  int parallelism){
376  int retryAttempt = 0;
377  int check, a;
378  NdbScanOperation *pOp;
379 
380  while (true){
381 restart:
382  if (retryAttempt++ >= m_retryMax){
383  g_info << "ERROR: has retried this operation " << retryAttempt
384  << " times, failing!" << endl;
385  return NDBT_FAILED;
386  }
387 
388  pTrans = pNdb->startTransaction();
389  if (pTrans == NULL) {
390  const NdbError err = pNdb->getNdbError();
391  ERR(err);
392  if (err.status == NdbError::TemporaryError){
393  NdbSleep_MilliSleep(50);
394  continue;
395  }
396  return NDBT_FAILED;
397  }
398 
399  pOp = getScanOperation(pTrans);
400  if (pOp == NULL) {
401  ERR(pTrans->getNdbError());
402  closeTransaction(pNdb);
403  return NDBT_FAILED;
404  }
405 
406  if( pOp->readTuples(NdbOperation::LM_Exclusive, flags,
407  parallelism))
408  {
409  closeTransaction(pNdb);
410  return NDBT_FAILED;
411  }
412 
413  // Read all attributes from this table
414  for(a=0; a<tab.getNoOfColumns(); a++){
415  if((row.attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == NULL){
416  ERR(pTrans->getNdbError());
417  closeTransaction(pNdb);
418  return NDBT_FAILED;
419  }
420  }
421 
422  check = pTrans->execute(NoCommit, AbortOnError);
423  if( check == -1 ) {
424  const NdbError err = pTrans->getNdbError();
425  ERR(err);
426  closeTransaction(pNdb);
427  if (err.status == NdbError::TemporaryError){
428  NdbSleep_MilliSleep(50);
429  continue;
430  }
431  return NDBT_FAILED;
432  }
433 
434  // Abort after 1-100 or 1-records rows
435  int ranVal = rand();
436  int abortCount = ranVal % (records == 0 ? 100 : records);
437  bool abortTrans = false;
438  if (abort > 0){
439  // Abort if abortCount is less then abortPercent
440  if (abortCount < abortPercent)
441  abortTrans = true;
442  }
443 
444  int rows = 0;
445  while((check = pOp->nextResult(true)) == 0){
446  do {
447  rows++;
448  NdbOperation* pUp = pOp->updateCurrentTuple();
449  if(pUp == 0){
450  ERR(pTrans->getNdbError());
451  closeTransaction(pNdb);
452  return NDBT_FAILED;
453  }
454  const int updates = calc.getUpdatesValue(&row) + 1;
455  const int r = calc.getIdValue(&row);
456 
457  for(a = 0; a<tab.getNoOfColumns(); a++){
458  if (tab.getColumn(a)->getPrimaryKey() == false){
459  if(setValueForAttr(pUp, a, r, updates ) != 0){
460  ERR(pTrans->getNdbError());
461  closeTransaction(pNdb);
462  return NDBT_FAILED;
463  }
464  }
465  }
466 
467  if (rows == abortCount && abortTrans == true){
468  g_info << "Scan is aborted" << endl;
469  // This scan should be aborted
470  closeTransaction(pNdb);
471  return NDBT_OK;
472  }
473  } while((check = pOp->nextResult(false)) == 0);
474 
475  if(check != -1){
476  check = pTrans->execute(Commit, AbortOnError);
477  if(check != -1)
478  m_latest_gci = pTrans->getGCI();
479  pTrans->restart();
480  }
481 
482  const NdbError err = pTrans->getNdbError();
483  if( check == -1 ) {
484  closeTransaction(pNdb);
485  ERR(err);
486  if (err.status == NdbError::TemporaryError){
487  NdbSleep_MilliSleep(50);
488  goto restart;
489  }
490  return NDBT_FAILED;
491  }
492  }
493 
494  const NdbError err = pTrans->getNdbError();
495  if( check == -1 ) {
496  closeTransaction(pNdb);
497  ERR(err);
498  if (err.status == NdbError::TemporaryError){
499  NdbSleep_MilliSleep(50);
500  goto restart;
501  }
502  return NDBT_FAILED;
503  }
504 
505  closeTransaction(pNdb);
506 
507  g_info << rows << " rows have been updated" << endl;
508  return NDBT_OK;
509  }
510  return NDBT_FAILED;
511 }
512 
513 int
514 HugoTransactions::scanUpdateRecords(Ndb* pNdb,
515  int records,
516  int abortPercent,
517  int parallelism){
518 
519  return scanUpdateRecords(pNdb,
521  records, abortPercent, parallelism);
522 }
523 
524 // Scan all records exclusive and update
525 // them one by one
526 int
527 HugoTransactions::scanUpdateRecords1(Ndb* pNdb,
528  int records,
529  int abortPercent,
530  int parallelism){
531  return scanUpdateRecords(pNdb,
533  records, abortPercent, 1);
534 }
535 
536 // Scan all records exclusive and update
537 // them batched by asking nextScanResult to
538 // give us all cached records before fetching new
539 // records from db
540 int
541 HugoTransactions::scanUpdateRecords2(Ndb* pNdb,
542  int records,
543  int abortPercent,
544  int parallelism){
545  return scanUpdateRecords(pNdb, (NdbScanOperation::ScanFlag)0,
546  records, abortPercent, parallelism);
547 }
548 
549 int
550 HugoTransactions::scanUpdateRecords3(Ndb* pNdb,
551  int records,
552  int abortPercent,
553  int parallelism)
554 {
555  return scanUpdateRecords(pNdb, (NdbScanOperation::ScanFlag)0,
556  records, abortPercent, parallelism);
557 }
558 
559 int
560 HugoTransactions::loadTable(Ndb* pNdb,
561  int records,
562  int batch,
563  bool allowConstraintViolation,
564  int doSleep,
565  bool oneTrans,
566  int value,
567  bool abort)
568 {
569  return loadTableStartFrom(pNdb, 0, records, batch, allowConstraintViolation,
570  doSleep, oneTrans, value, abort);
571 }
572 
573 int
574 HugoTransactions::loadTableStartFrom(Ndb* pNdb,
575  int startFrom,
576  int records,
577  int batch,
578  bool allowConstraintViolation,
579  int doSleep,
580  bool oneTrans,
581  int value,
582  bool abort){
583  int check;
584  int retryAttempt = 0;
585  int retryMax = 5;
586  bool first_batch = true;
587 
588  const int org = batch;
589  const int cols = tab.getNoOfColumns();
590  const int brow = tab.getRowSizeInBytes();
591  const int bytes = 12 + brow + 4 * cols;
592  batch = (batch * 256); // -> 512 -> 65536k per commit
593  batch = batch/bytes; //
594  batch = batch == 0 ? 1 : batch;
595 
596  if(batch != org){
597  g_info << "batch = " << org << " rowsize = " << bytes
598  << " -> rows/commit = " << batch << endl;
599  }
600 
601  //Uint32 orgbatch = batch;
602  g_info << "|- Inserting records..." << endl;
603  for (int c=0 ; c<records; ){
604  bool closeTrans = true;
605 
606  if(c + batch > records)
607  batch = records - c;
608 
609  if (retryAttempt >= retryMax){
610  g_info << "Record " << c << " could not be inserted, has retried "
611  << retryAttempt << " times " << endl;
612  // Reset retry counters and continue with next record
613  retryAttempt = 0;
614  c++;
615  }
616  if (doSleep > 0)
617  NdbSleep_MilliSleep(doSleep);
618 
619  // if (first_batch || !oneTrans) {
620  if (first_batch || !pTrans) {
621  first_batch = false;
622  pTrans = pNdb->startTransaction();
623  if (pTrans == NULL) {
624  const NdbError err = pNdb->getNdbError();
625 
626  if (err.status == NdbError::TemporaryError){
627  ERR(err);
628  NdbSleep_MilliSleep(50);
629  retryAttempt++;
630  continue;
631  }
632  ERR(err);
633  return NDBT_FAILED;
634  }
635  }
636 
637  if(pkInsertRecord(pNdb, c + startFrom, batch, value) != NDBT_OK)
638  {
639  ERR(pTrans->getNdbError());
640  closeTransaction(pNdb);
641  return NDBT_FAILED;
642  }
643 
644  // Execute the transaction and insert the record
645  if (!oneTrans || (c + batch) >= records) {
646  // closeTrans = true;
647  closeTrans = false;
648  if (!abort)
649  {
650  check = pTrans->execute(Commit, AbortOnError);
651  if(check != -1)
652  pTrans->getGCI(&m_latest_gci);
653  pTrans->restart();
654  }
655  else
656  {
657  check = pTrans->execute(NoCommit, AbortOnError);
658  if (check != -1)
659  {
660  check = pTrans->execute( Rollback );
661  closeTransaction(pNdb);
662  }
663  }
664  } else {
665  closeTrans = false;
666  check = pTrans->execute(NoCommit, AbortOnError);
667  }
668  if(check == -1 ) {
669  const NdbError err = pTrans->getNdbError();
670  closeTransaction(pNdb);
671  pTrans= 0;
672  switch(err.status){
673  case NdbError::Success:
674  ERR(err);
675  g_info << "ERROR: NdbError reports success when transcaction failed"
676  << endl;
677  return NDBT_FAILED;
678  break;
679 
681  ERR(err);
682  NdbSleep_MilliSleep(50);
683  retryAttempt++;
684  batch = 1;
685  continue;
686  break;
687 
689  ERR(err);
690  return NDBT_FAILED;
691  break;
692 
694  if (allowConstraintViolation == true){
695  switch (err.classification){
697  // Tuple already existed, OK but should be reported
698  g_info << c << ": " << err.code << " " << err.message << endl;
699  c++;
700  continue;
701  break;
702  default:
703  break;
704  }
705  }
706  ERR(err);
707  return err.code;
708  break;
709  }
710  }
711  else{
712  if (closeTrans) {
713  closeTransaction(pNdb);
714  pTrans= 0;
715  }
716  }
717 
718  // Step to next record
719  c = c+batch;
720  retryAttempt = 0;
721  }
722 
723  if(pTrans)
724  closeTransaction(pNdb);
725  return NDBT_OK;
726 }
727 
728 int
729 HugoTransactions::fillTable(Ndb* pNdb,
730  int batch){
731  return fillTableStartFrom(pNdb, 0, batch);
732 }
733 
734 int
735 HugoTransactions::fillTableStartFrom(Ndb* pNdb,
736  int startFrom,
737  int batch){
738  int check;
739  int retryAttempt = 0;
740  int retryMax = 5;
741 
742  const int org = batch;
743  const int cols = tab.getNoOfColumns();
744  const int brow = tab.getRowSizeInBytes();
745  const int bytes = 12 + brow + 4 * cols;
746  batch = (batch * 256); // -> 512 -> 65536k per commit
747  batch = batch/bytes; //
748  batch = batch == 0 ? 1 : batch;
749 
750  if(batch != org){
751  g_info << "batch = " << org << " rowsize = " << bytes
752  << " -> rows/commit = " << batch << endl;
753  }
754 
755  for (int c=startFrom ; ; ){
756 
757  if (retryAttempt >= retryMax){
758  g_info << "Record " << c << " could not be inserted, has retried "
759  << retryAttempt << " times " << endl;
760  // Reset retry counters and continue with next record
761  retryAttempt = 0;
762  c++;
763  }
764 
765  pTrans = pNdb->startTransaction();
766  if (pTrans == NULL) {
767  const NdbError err = pNdb->getNdbError();
768 
769  if (err.status == NdbError::TemporaryError){
770  ERR(err);
771  NdbSleep_MilliSleep(50);
772  retryAttempt++;
773  continue;
774  }
775  ERR(err);
776  return NDBT_FAILED;
777  }
778 
779  if(pkInsertRecord(pNdb, c, batch) != NDBT_OK)
780  {
781  ERR(pTrans->getNdbError());
782  closeTransaction(pNdb);
783  return NDBT_FAILED;
784  }
785 
786  // Execute the transaction and insert the record
787  check = pTrans->execute(Commit, CommitAsMuchAsPossible);
788  const NdbError err = pTrans->getNdbError();
789  if(check == -1 || err.code != 0) {
790  closeTransaction(pNdb);
791 
792  switch(err.status){
793  case NdbError::Success:
794  ERR(err);
795  g_info << "ERROR: NdbError reports success when transcaction failed"
796  << endl;
797  return NDBT_FAILED;
798  break;
799 
801  ERR(err);
802  NdbSleep_MilliSleep(50);
803  retryAttempt++;
804  continue;
805  break;
806 
808  ERR(err);
809  return NDBT_FAILED;
810  break;
811 
813  // if (allowConstraintViolation == true){
814  // switch (err.classification){
815  // case NdbError::ConstraintViolation:
816  // // Tuple already existed, OK but should be reported
817  // g_info << c << ": " << err.code << " " << err.message << endl;
818  // c++;
819  // continue;
820  // break;
821  // default:
822  // break;es
823  // }
824  // }
825 
826  // Check if this is the "db full" error
828  ERR(err);
829  return NDBT_OK;
830  }
831 
833  ERR(err);
834  break;
835  }
836  ERR(err);
837  return NDBT_FAILED;
838  break;
839  }
840  }
841  else{
842  pTrans->getGCI(&m_latest_gci);
843  closeTransaction(pNdb);
844  }
845 
846  // Step to next record
847  c = c+batch;
848  retryAttempt = 0;
849  }
850  return NDBT_OK;
851 }
852 
853 int
855  int records,
856  int batch,
858  int _rand){
859  int reads = 0;
860  int r = 0;
861  int retryAttempt = 0;
862  int check;
863 
864  if (batch == 0) {
865  g_info << "ERROR: Argument batch == 0 in pkReadRecords(). Not allowed." << endl;
866  return NDBT_FAILED;
867  }
868 
869  while (r < records){
870  if(r + batch > records)
871  batch = records - r;
872 
873  if (retryAttempt >= m_retryMax){
874  g_info << "ERROR: has retried this operation " << retryAttempt
875  << " times, failing!" << endl;
876  return NDBT_FAILED;
877  }
878 
879  pTrans = pNdb->startTransaction();
880  if (pTrans == NULL) {
881  const NdbError err = pNdb->getNdbError();
882 
883  if (err.status == NdbError::TemporaryError){
884  ERR(err);
885  NdbSleep_MilliSleep(50);
886  retryAttempt++;
887  continue;
888  }
889  ERR(err);
890  return NDBT_FAILED;
891  }
892 
893  MicroSecondTimer timer_start;
894  MicroSecondTimer timer_stop;
895  bool timer_active =
896  m_stats_latency != 0 &&
897  r >= batch && // first batch is "warmup"
898  r + batch != records; // last batch is usually partial
899 
900  if (timer_active)
901  NdbTick_getMicroTimer(&timer_start);
902 
903  NdbOperation::LockMode lmused;
904  if (_rand == 0)
905  {
906  if(pkReadRecord(pNdb, r, batch, lm, &lmused) != NDBT_OK)
907  {
908  ERR(pTrans->getNdbError());
909  closeTransaction(pNdb);
910  return NDBT_FAILED;
911  }
912  }
913  else
914  {
915  if(pkReadRandRecord(pNdb, records, batch, lm, &lmused) != NDBT_OK)
916  {
917  ERR(pTrans->getNdbError());
918  closeTransaction(pNdb);
919  return NDBT_FAILED;
920  }
921  }
922 
923  check = pTrans->execute(Commit, AbortOnError);
924 
925  if (check != -1 && lmused == NdbOperation::LM_CommittedRead)
926  {
932  if (pTrans->getNdbError().status != NdbError::Success)
933  {
934  check = -1;
935  }
936  }
937 
938  if( check == -1 ) {
939  const NdbError err = pTrans->getNdbError();
940 
941  if (err.status == NdbError::TemporaryError){
942  ERR(err);
943  closeTransaction(pNdb);
944  NdbSleep_MilliSleep(50);
945  retryAttempt++;
946  continue;
947  }
948  switch(err.code){
949  case 626: // Tuple did not exist
950  g_info << r << ": " << err.code << " " << err.message << endl;
951  r++;
952  break;
953 
954  default:
955  ERR(err);
956  closeTransaction(pNdb);
957  return NDBT_FAILED;
958  }
959  } else {
960 
961  if(indexScans.size() > 0)
962  {
963  /* Index scan used to read records....*/
964  int rows_found = 0;
965  for (Uint32 scanOp=0; scanOp < indexScans.size(); scanOp++)
966  {
967  while((check = indexScans[scanOp]->nextResult()) == 0)
968  {
969  rows_found++;
970  if (calc.verifyRowValues(rows[0]) != 0){
971  closeTransaction(pNdb);
972  return NDBT_FAILED;
973  }
974  }
975  }
976  if(check != 1 || rows_found > batch)
977  {
978  closeTransaction(pNdb);
979  return NDBT_FAILED;
980  }
981  else if(rows_found < batch)
982  {
983  if(batch == 1){
984  g_info << r << ": not found" << endl; abort(); }
985  else
986  g_info << "Found " << rows_found << " of "
987  << batch << " rows" << endl;
988  }
989  r += batch;
990  reads += rows_found;
991  }
992  else
993  {
994  for (int b=0; (b<batch) && (r+b<records); b++){
995  if (calc.verifyRowValues(rows[b]) != 0){
996  closeTransaction(pNdb);
997  return NDBT_FAILED;
998  }
999  reads++;
1000  r++;
1001  }
1002  }
1003  }
1004 
1005  closeTransaction(pNdb);
1006 
1007  if (timer_active) {
1008  NdbTick_getMicroTimer(&timer_stop);
1009  NDB_TICKS ticks = NdbTick_getMicrosPassed(timer_start, timer_stop);
1010  m_stats_latency->addObservation((double)ticks);
1011  }
1012  }
1013  deallocRows();
1014  indexScans.clear();
1015  g_info << reads << " records read" << endl;
1016  return NDBT_OK;
1017 }
1018 
1019 
1020 
1021 int
1022 HugoTransactions::pkUpdateRecords(Ndb* pNdb,
1023  int records,
1024  int batch,
1025  int doSleep){
1026  int updated = 0;
1027  int r = 0;
1028  int retryAttempt = 0;
1029  int check, b;
1030 
1031  allocRows(batch);
1032 
1033  g_info << "|- Updating records (batch=" << batch << ")..." << endl;
1034  int batch_no = 0;
1035  while (r < records){
1036  if(r + batch > records)
1037  batch = records - r;
1038 
1039  if (m_thr_count != 0 && m_thr_no != batch_no % m_thr_count)
1040  {
1041  r += batch;
1042  batch_no++;
1043  continue;
1044  }
1045 
1046  if (retryAttempt >= m_retryMax){
1047  g_info << "ERROR: has retried this operation " << retryAttempt
1048  << " times, failing!" << endl;
1049  return NDBT_FAILED;
1050  }
1051 
1052  if (doSleep > 0)
1053  NdbSleep_MilliSleep(doSleep);
1054 
1055  pTrans = pNdb->startTransaction();
1056  if (pTrans == NULL) {
1057  const NdbError err = pNdb->getNdbError();
1058 
1059  if (err.status == NdbError::TemporaryError){
1060  ERR(err);
1061  NdbSleep_MilliSleep(50);
1062  retryAttempt++;
1063  continue;
1064  }
1065  ERR(err);
1066  return NDBT_FAILED;
1067  }
1068 
1069  if(pkReadRecord(pNdb, r, batch, NdbOperation::LM_Exclusive) != NDBT_OK)
1070  {
1071  ERR(pTrans->getNdbError());
1072  closeTransaction(pNdb);
1073  return NDBT_FAILED;
1074  }
1075 
1076  check = pTrans->execute(NoCommit, AbortOnError);
1077  if( check == -1 ) {
1078  const NdbError err = pTrans->getNdbError();
1079 
1080  if (err.status == NdbError::TemporaryError){
1081  ERR(err);
1082  closeTransaction(pNdb);
1083  NdbSleep_MilliSleep(50);
1084  retryAttempt++;
1085  continue;
1086  }
1087  ERR(err);
1088  closeTransaction(pNdb);
1089  return NDBT_FAILED;
1090  }
1091 
1092  MicroSecondTimer timer_start;
1093  MicroSecondTimer timer_stop;
1094  bool timer_active =
1095  m_stats_latency != 0 &&
1096  r >= batch && // first batch is "warmup"
1097  r + batch != records; // last batch is usually partial
1098 
1099  if (timer_active)
1100  NdbTick_getMicroTimer(&timer_start);
1101 
1102  int rows_found = 0;
1103 
1104  if(indexScans.size() > 0)
1105  {
1106  /* Index scans used to read records */
1107  for (Uint32 scanOp=0; scanOp < indexScans.size(); scanOp++)
1108  {
1109  while((check = indexScans[scanOp]->nextResult(true)) == 0)
1110  {
1111  do {
1112 
1113  if (calc.verifyRowValues(rows[0]) != 0){
1114  g_info << "Row validation failure" << endl;
1115  closeTransaction(pNdb);
1116  return NDBT_FAILED;
1117  }
1118 
1119  int updates = calc.getUpdatesValue(rows[0]) + 1;
1120 
1121  /* Rows may not arrive in the order they were requested
1122  * (When multiple partitions scanned without ORDERBY)
1123  * therefore we use the id from the row to update it
1124  */
1125  const Uint32 rowId= calc.getIdValue(rows[0]);
1126  if(pkUpdateRecord(pNdb, rowId, 1, updates) != NDBT_OK)
1127  {
1128  ERR(pTrans->getNdbError());
1129  closeTransaction(pNdb);
1130  return NDBT_FAILED;
1131  }
1132  rows_found++;
1133  } while((check = indexScans[scanOp]->nextResult(false)) == 0);
1134 
1135  if(check != 2)
1136  break;
1137  if((check = pTrans->execute(NoCommit, AbortOnError)) != 0)
1138  break;
1139  } // Next fetch on this scan op...
1140 
1141  if(check != 1)
1142  {
1143  g_info << "Check failed" << endl;
1144  closeTransaction(pNdb);
1145  return NDBT_FAILED;
1146  }
1147  } // Next scan op...
1148 
1149  if (rows_found != batch)
1150  {
1151  g_info << "Incorrect num of rows found. Expected "
1152  << batch << ". Found " << rows_found << endl;
1153  closeTransaction(pNdb);
1154  return NDBT_FAILED;
1155  }
1156  }
1157  else
1158  {
1159  for(b = 0; b<batch && (b+r)<records; b++)
1160  {
1161  if (calc.verifyRowValues(rows[b]) != 0)
1162  {
1163  closeTransaction(pNdb);
1164  return NDBT_FAILED;
1165  }
1166 
1167  int updates = calc.getUpdatesValue(rows[b]) + 1;
1168 
1169  if(pkUpdateRecord(pNdb, r+b, 1, updates) != NDBT_OK)
1170  {
1171  ERR(pTrans->getNdbError());
1172  closeTransaction(pNdb);
1173  return NDBT_FAILED;
1174  }
1175  }
1176  check = pTrans->execute(Commit, AbortOnError);
1177  }
1178  if( check == -1 ) {
1179  const NdbError err = pTrans->getNdbError();
1180 
1181  if (err.status == NdbError::TemporaryError){
1182  ERR(err);
1183  closeTransaction(pNdb);
1184  NdbSleep_MilliSleep(50);
1185  retryAttempt++;
1186  continue;
1187  }
1188  ERR(err);
1189  ndbout << "r = " << r << endl;
1190  closeTransaction(pNdb);
1191  return NDBT_FAILED;
1192  }
1193  else{
1194  updated += batch;
1195  pTrans->getGCI(&m_latest_gci);
1196  }
1197 
1198  closeTransaction(pNdb);
1199 
1200  if (timer_active) {
1201  NdbTick_getMicroTimer(&timer_stop);
1202  NDB_TICKS ticks = NdbTick_getMicrosPassed(timer_start, timer_stop);
1203  m_stats_latency->addObservation((double)ticks);
1204  }
1205 
1206  r += batch; // Read next record
1207  batch_no++;
1208  }
1209 
1210  deallocRows();
1211  indexScans.clear();
1212  g_info << "|- " << updated << " records updated" << endl;
1213  return NDBT_OK;
1214 }
1215 
1216 int
1217 HugoTransactions::pkInterpretedUpdateRecords(Ndb* pNdb,
1218  int records,
1219  int batch){
1220  int updated = 0;
1221  int r = 0;
1222  int retryAttempt = 0;
1223  int check, a;
1224 
1225  while (r < records){
1226 
1227  if (retryAttempt >= m_retryMax){
1228  g_info << "ERROR: has retried this operation " << retryAttempt
1229  << " times, failing!" << endl;
1230  return NDBT_FAILED;
1231  }
1232 
1233  pTrans = pNdb->startTransaction();
1234  if (pTrans == NULL) {
1235  const NdbError err = pNdb->getNdbError();
1236 
1237  if (err.status == NdbError::TemporaryError){
1238  ERR(err);
1239  NdbSleep_MilliSleep(50);
1240  retryAttempt++;
1241  continue;
1242  }
1243  ERR(err);
1244  return NDBT_FAILED;
1245  }
1246 
1247  NdbOperation* pOp = pTrans->getNdbOperation(tab.getName());
1248  if (pOp == NULL) {
1249  ERR(pTrans->getNdbError());
1250  closeTransaction(pNdb);
1251  return NDBT_FAILED;
1252  }
1253 
1254  check = pOp->readTupleExclusive();
1255  if( check == -1 ) {
1256  ERR(pTrans->getNdbError());
1257  closeTransaction(pNdb);
1258  return NDBT_FAILED;
1259  }
1260 
1261  // Define primary keys
1262  if (equalForRow(pOp, r) != 0)
1263  {
1264  closeTransaction(pNdb);
1265  return NDBT_FAILED;
1266  }
1267 
1268  // Read update value
1269  for(a = 0; a<tab.getNoOfColumns(); a++){
1270  if (calc.isUpdateCol(a) == true){
1271  if((row.attributeStore(a) =
1272  pOp->getValue(tab.getColumn(a)->getName())) == 0) {
1273  ERR(pTrans->getNdbError());
1274  closeTransaction(pNdb);
1275  return NDBT_FAILED;
1276  }
1277  }
1278  }
1279 
1280  check = pTrans->execute(NoCommit, AbortOnError);
1281  if( check == -1 ) {
1282  const NdbError err = pTrans->getNdbError();
1283 
1284  if (err.status == NdbError::TemporaryError){
1285  ERR(err);
1286  closeTransaction(pNdb);
1287  NdbSleep_MilliSleep(50);
1288  retryAttempt++;
1289  continue;
1290  }
1291  ERR(err);
1292  closeTransaction(pNdb);
1293  return NDBT_FAILED;
1294  }
1295 
1296  int updates = calc.getUpdatesValue(&row) + 1;
1297 
1298  NdbOperation* pUpdOp;
1299  pUpdOp = pTrans->getNdbOperation(tab.getName());
1300  if (pUpdOp == NULL) {
1301  ERR(pTrans->getNdbError());
1302  closeTransaction(pNdb);
1303  return NDBT_FAILED;
1304  }
1305 
1306  check = pUpdOp->interpretedUpdateTuple();
1307  if( check == -1 ) {
1308  ERR(pTrans->getNdbError());
1309  closeTransaction(pNdb);
1310  return NDBT_FAILED;
1311  }
1312 
1313  // PKs
1314  if (equalForRow(pUpdOp, r) != 0)
1315  {
1316  closeTransaction(pNdb);
1317  return NDBT_FAILED;
1318  }
1319 
1320  // Update col
1321  for(a = 0; a<tab.getNoOfColumns(); a++){
1322  if ((tab.getColumn(a)->getPrimaryKey() == false) &&
1323  (calc.isUpdateCol(a) == true)){
1324 
1325  // TODO switch for 32/64 bit
1326  const NdbDictionary::Column* attr = tab.getColumn(a);
1327  Uint32 valToIncWith = 1;
1328  check = pUpdOp->incValue(attr->getName(), valToIncWith);
1329  if( check == -1 ) {
1330  ERR(pTrans->getNdbError());
1331  closeTransaction(pNdb);
1332  return NDBT_FAILED;
1333  }
1334  }
1335  }
1336 
1337  // Remaining attributes
1338  for(a = 0; a<tab.getNoOfColumns(); a++){
1339  if ((tab.getColumn(a)->getPrimaryKey() == false) &&
1340  (calc.isUpdateCol(a) == false)){
1341  if(setValueForAttr(pUpdOp, a, r, updates ) != 0){
1342  ERR(pTrans->getNdbError());
1343  closeTransaction(pNdb);
1344  return NDBT_FAILED;
1345  }
1346  }
1347  }
1348 
1349 
1350 
1351  check = pTrans->execute(Commit, AbortOnError);
1352  if( check == -1 ) {
1353  const NdbError err = pTrans->getNdbError();
1354 
1355  if (err.status == NdbError::TemporaryError){
1356  ERR(err);
1357  closeTransaction(pNdb);
1358  NdbSleep_MilliSleep(50);
1359  retryAttempt++;
1360  continue;
1361  }
1362  ERR(err);
1363  ndbout << "r = " << r << endl;
1364  closeTransaction(pNdb);
1365  return NDBT_FAILED;
1366  }
1367  else{
1368  updated++;
1369  pTrans->getGCI(&m_latest_gci);
1370  }
1371 
1372 
1373  closeTransaction(pNdb);
1374 
1375  r++; // Read next record
1376 
1377  }
1378 
1379  g_info << "|- " << updated << " records updated" << endl;
1380  return NDBT_OK;
1381 }
1382 
1383 int
1384 HugoTransactions::pkDelRecords(Ndb* pNdb,
1385  int records,
1386  int batch,
1387  bool allowConstraintViolation,
1388  int doSleep){
1389  // TODO Batch is not implemented
1390  int deleted = 0;
1391  int r = 0;
1392  int retryAttempt = 0;
1393  int check;
1394 
1395  g_info << "|- Deleting records..." << endl;
1396  int batch_no = 0;
1397  while (r < records){
1398  if(r + batch > records)
1399  batch = records - r;
1400 
1401  if (m_thr_count != 0 && m_thr_no != batch_no % m_thr_count)
1402  {
1403  r += batch;
1404  batch_no++;
1405  continue;
1406  }
1407 
1408  if (retryAttempt >= m_retryMax){
1409  g_info << "ERROR: has retried this operation " << retryAttempt
1410  << " times, failing!" << endl;
1411  return NDBT_FAILED;
1412  }
1413 
1414  if (doSleep > 0)
1415  NdbSleep_MilliSleep(doSleep);
1416 
1417  pTrans = pNdb->startTransaction();
1418  if (pTrans == NULL) {
1419  const NdbError err = pNdb->getNdbError();
1420 
1421  if (err.status == NdbError::TemporaryError){
1422  ERR(err);
1423  NdbSleep_MilliSleep(50);
1424  retryAttempt++;
1425  continue;
1426  }
1427  ERR(err);
1428  return NDBT_FAILED;
1429  }
1430 
1431  MicroSecondTimer timer_start;
1432  MicroSecondTimer timer_stop;
1433  bool timer_active =
1434  m_stats_latency != 0 &&
1435  r >= batch && // first batch is "warmup"
1436  r + batch != records; // last batch is usually partial
1437 
1438  if (timer_active)
1439  NdbTick_getMicroTimer(&timer_start);
1440 
1441  if(pkDeleteRecord(pNdb, r, batch) != NDBT_OK)
1442  {
1443  ERR(pTrans->getNdbError());
1444  closeTransaction(pNdb);
1445  return NDBT_FAILED;
1446  }
1447 
1448  check = pTrans->execute(Commit, AbortOnError);
1449  if( check == -1) {
1450  const NdbError err = pTrans->getNdbError();
1451 
1452  switch(err.status){
1454  ERR(err);
1455  closeTransaction(pNdb);
1456  NdbSleep_MilliSleep(50);
1457  retryAttempt++;
1458  continue;
1459  break;
1460 
1462  if (allowConstraintViolation == true){
1463  switch (err.classification){
1465  // Tuple did not exist, OK but should be reported
1466  g_info << r << ": " << err.code << " " << err.message << endl;
1467  continue;
1468  break;
1469  default:
1470  break;
1471  }
1472  }
1473  ERR(err);
1474  closeTransaction(pNdb);
1475  return NDBT_FAILED;
1476  break;
1477 
1478  default:
1479  ERR(err);
1480  closeTransaction(pNdb);
1481  return NDBT_FAILED;
1482  }
1483  }
1484  else {
1485  deleted += batch;
1486  pTrans->getGCI(&m_latest_gci);
1487  }
1488  closeTransaction(pNdb);
1489 
1490  if (timer_active) {
1491  NdbTick_getMicroTimer(&timer_stop);
1492  NDB_TICKS ticks = NdbTick_getMicrosPassed(timer_start, timer_stop);
1493  m_stats_latency->addObservation((double)ticks);
1494  }
1495 
1496  r += batch; // Read next record
1497  batch_no++;
1498  }
1499 
1500  g_info << "|- " << deleted << " records deleted" << endl;
1501  return NDBT_OK;
1502 }
1503 
1504 int
1505 HugoTransactions::pkRefreshRecords(Ndb* pNdb,
1506  int startFrom,
1507  int count,
1508  int batch)
1509 {
1510  int r = 0;
1511  int retryAttempt = 0;
1512 
1513  g_info << "|- Refreshing records..." << startFrom << "-" << (startFrom+count)
1514  << " (batch=" << batch << ")" << endl;
1515 
1516  while (r < count)
1517  {
1518  if(r + batch > count)
1519  batch = count - r;
1520 
1521  if (retryAttempt >= m_retryMax)
1522  {
1523  g_info << "ERROR: has retried this operation " << retryAttempt
1524  << " times, failing!" << endl;
1525  return NDBT_FAILED;
1526  }
1527 
1528  pTrans = pNdb->startTransaction();
1529  if (pTrans == NULL)
1530  {
1531  const NdbError err = pNdb->getNdbError();
1532 
1533  if (err.status == NdbError::TemporaryError){
1534  ERR(err);
1535  NdbSleep_MilliSleep(50);
1536  retryAttempt++;
1537  continue;
1538  }
1539  ERR(err);
1540  return NDBT_FAILED;
1541  }
1542 
1543  if (pkRefreshRecord(pNdb, r, batch) != NDBT_OK)
1544  {
1545  ERR(pTrans->getNdbError());
1546  closeTransaction(pNdb);
1547  return NDBT_FAILED;
1548  }
1549 
1550  if (pTrans->execute(Commit, AbortOnError) == -1)
1551  {
1552  const NdbError err = pTrans->getNdbError();
1553 
1554  switch(err.status){
1556  ERR(err);
1557  closeTransaction(pNdb);
1558  NdbSleep_MilliSleep(50);
1559  retryAttempt++;
1560  continue;
1561  break;
1562 
1563  default:
1564  ERR(err);
1565  closeTransaction(pNdb);
1566  return NDBT_FAILED;
1567  }
1568  }
1569 
1570  closeTransaction(pNdb);
1571  r += batch; // Read next record
1572  }
1573 
1574  return NDBT_OK;
1575 }
1576 
1577 int
1578 HugoTransactions::pkReadUnlockRecords(Ndb* pNdb,
1579  int records,
1580  int batch,
1582 {
1583  int reads = 0;
1584  int r = 0;
1585  int retryAttempt = 0;
1586  int check;
1587 
1588  if (batch == 0) {
1589  g_info << "ERROR: Argument batch == 0 in pkReadRecords(). Not allowed." << endl;
1590  return NDBT_FAILED;
1591  }
1592 
1593  if (idx != NULL) {
1594  g_info << "ERROR: Cannot call pkReadUnlockRecords for index" << endl;
1595  return NDBT_FAILED;
1596  }
1597 
1598  while (r < records){
1599  if(r + batch > records)
1600  batch = records - r;
1601 
1602  if (retryAttempt >= m_retryMax){
1603  g_info << "ERROR: has retried this operation " << retryAttempt
1604  << " times, failing!" << endl;
1605  return NDBT_FAILED;
1606  }
1607 
1608  pTrans = pNdb->startTransaction();
1609  if (pTrans == NULL) {
1610  const NdbError err = pNdb->getNdbError();
1611 
1612  if (err.status == NdbError::TemporaryError){
1613  ERR(err);
1614  NdbSleep_MilliSleep(50);
1615  retryAttempt++;
1616  continue;
1617  }
1618  ERR(err);
1619  return NDBT_FAILED;
1620  }
1621 
1622  MicroSecondTimer timer_start;
1623  MicroSecondTimer timer_stop;
1624  bool timer_active =
1625  m_stats_latency != 0 &&
1626  r >= batch && // first batch is "warmup"
1627  r + batch != records; // last batch is usually partial
1628 
1629  if (timer_active)
1630  NdbTick_getMicroTimer(&timer_start);
1631 
1632  Vector<const NdbLockHandle*> lockHandles;
1633 
1634  NdbOperation::LockMode lmused;
1635  if(pkReadRecordLockHandle(pNdb, lockHandles, r, batch, lm, &lmused) != NDBT_OK)
1636  {
1637  ERR(pTrans->getNdbError());
1638  closeTransaction(pNdb);
1639  return NDBT_FAILED;
1640  }
1641 
1642  check = pTrans->execute(NoCommit, AbortOnError);
1643 
1644  if( check == -1 ) {
1645  const NdbError err = pTrans->getNdbError();
1646 
1647  if (err.status == NdbError::TemporaryError){
1648  ERR(err);
1649  closeTransaction(pNdb);
1650  NdbSleep_MilliSleep(50);
1651  retryAttempt++;
1652  continue;
1653  }
1654  switch(err.code){
1655  case 626: // Tuple did not exist
1656  g_info << r << ": " << err.code << " " << err.message << endl;
1657  r++;
1658  break;
1659 
1660  default:
1661  ERR(err);
1662  closeTransaction(pNdb);
1663  return NDBT_FAILED;
1664  }
1665  } else {
1666  /* Execute succeeded ok */
1667  for (int b=0; (b<batch) && (r+b<records); b++){
1668  if (calc.verifyRowValues(rows[b]) != 0){
1669  closeTransaction(pNdb);
1670  return NDBT_FAILED;
1671  }
1672  reads++;
1673  r++;
1674  }
1675 
1676  if (pkUnlockRecord(pNdb,
1677  lockHandles) != NDBT_OK)
1678  {
1679  closeTransaction(pNdb);
1680  return NDBT_FAILED;
1681  }
1682 
1683  check = pTrans->execute(Commit, AbortOnError);
1684 
1685  if (check == -1 )
1686  {
1687  const NdbError err = pTrans->getNdbError();
1688 
1689  if (err.status == NdbError::TemporaryError){
1690  ERR(err);
1691  closeTransaction(pNdb);
1692  NdbSleep_MilliSleep(50);
1693  retryAttempt++;
1694  continue;
1695  }
1696  ERR(err);
1697  closeTransaction(pNdb);
1698  return NDBT_FAILED;
1699  }
1700  }
1701 
1702  closeTransaction(pNdb);
1703 
1704  if (timer_active) {
1705  NdbTick_getMicroTimer(&timer_stop);
1706  NDB_TICKS ticks = NdbTick_getMicrosPassed(timer_start, timer_stop);
1707  m_stats_latency->addObservation((double)ticks);
1708  }
1709  }
1710  deallocRows();
1711  g_info << reads << " records read" << endl;
1712  return NDBT_OK;
1713 }
1714 
1715 
1716 int
1717 HugoTransactions::lockRecords(Ndb* pNdb,
1718  int records,
1719  int percentToLock,
1720  int lockTime){
1721  // Place a lock on percentToLock% of the records in the Db
1722  // Keep the locks for lockTime ms, commit operation
1723  // and lock som other records
1724  int r = 0;
1725  int retryAttempt = 0;
1726  int check;
1728 
1729  // Calculate how many records to lock in each batch
1730  if (percentToLock <= 0)
1731  percentToLock = 1;
1732  double percentVal = (double)percentToLock / 100;
1733  int lockBatch = (int)(records * percentVal);
1734  if (lockBatch <= 0)
1735  lockBatch = 1;
1736 
1737  allocRows(lockBatch);
1738 
1739  while (r < records){
1740  if(r + lockBatch > records)
1741  lockBatch = records - r;
1742 
1743  g_info << "|- Locking " << lockBatch << " records..." << endl;
1744 
1745  if (retryAttempt >= m_retryMax){
1746  g_info << "ERROR: has retried this operation " << retryAttempt
1747  << " times, failing!" << endl;
1748  return NDBT_FAILED;
1749  }
1750 
1751  pTrans = pNdb->startTransaction();
1752  if (pTrans == NULL) {
1753  const NdbError err = pNdb->getNdbError();
1754 
1755  if (err.status == NdbError::TemporaryError){
1756  ERR(err);
1757  NdbSleep_MilliSleep(50);
1758  retryAttempt++;
1759  continue;
1760  }
1761  ERR(err);
1762  return NDBT_FAILED;
1763  }
1764 
1765  if(pkReadRecord(pNdb, r, lockBatch, lm) != NDBT_OK)
1766  {
1767  ERR(pTrans->getNdbError());
1768  closeTransaction(pNdb);
1769  return NDBT_FAILED;
1770  }
1771 
1772  // NoCommit lockTime times with 100 millis interval
1773  int sleepInterval = 50;
1774  int lockCount = lockTime / sleepInterval;
1775  int commitCount = 0;
1776  bool tempErr = false;
1777  do {
1778  check = pTrans->execute(NoCommit, AbortOnError);
1779  if( check == -1) {
1780  const NdbError err = pTrans->getNdbError();
1781 
1782  if (err.status == NdbError::TemporaryError){
1783  ERR(err);
1784  closeTransaction(pNdb);
1785  NdbSleep_MilliSleep(50);
1786  tempErr = true;
1787  retryAttempt++;
1788  break;
1789  }
1790  ERR(err);
1791  closeTransaction(pNdb);
1792  return NDBT_FAILED;
1793  }
1794  for (int b=0; (b<lockBatch) && (r+b<records); b++){
1795  if (calc.verifyRowValues(rows[b]) != 0){
1796  closeTransaction(pNdb);
1797  return NDBT_FAILED;
1798  }
1799  }
1800  commitCount++;
1801  NdbSleep_MilliSleep(sleepInterval);
1802  } while (commitCount < lockCount);
1803 
1804  if (tempErr)
1805  continue; /* Retry lock attempt */
1806 
1807  // Really commit the trans, puuh!
1808  check = pTrans->execute(Commit, AbortOnError);
1809  if( check == -1) {
1810  const NdbError err = pTrans->getNdbError();
1811 
1812  if (err.status == NdbError::TemporaryError){
1813  ERR(err);
1814  closeTransaction(pNdb);
1815  NdbSleep_MilliSleep(50);
1816  retryAttempt++;
1817  continue;
1818  }
1819  ERR(err);
1820  closeTransaction(pNdb);
1821  return NDBT_FAILED;
1822  }
1823  else{
1824  for (int b=0; (b<lockBatch) && (r<records); b++){
1825  if (calc.verifyRowValues(rows[b]) != 0){
1826  closeTransaction(pNdb);
1827  return NDBT_FAILED;
1828  }
1829  r++; // Read next record
1830  }
1831  }
1832 
1833  closeTransaction(pNdb);
1834 
1835  }
1836  deallocRows();
1837  g_info << "|- Record locking completed" << endl;
1838  return NDBT_OK;
1839 }
1840 
1841 int
1843  const char * idxName,
1844  int records,
1845  int batch){
1846  int reads = 0;
1847  int r = 0;
1848  int retryAttempt = 0;
1849  int check, a;
1850  NdbOperation *pOp;
1851  NdbIndexScanOperation *sOp;
1852 
1853  const NdbDictionary::Index* pIndex
1854  = pNdb->getDictionary()->getIndex(idxName, tab.getName());
1855 
1856  const bool ordered = (pIndex->getType()==NdbDictionary::Index::OrderedIndex);
1857 
1858  if (batch == 0) {
1859  g_info << "ERROR: Argument batch == 0 in indexReadRecords(). "
1860  << "Not allowed." << endl;
1861  return NDBT_FAILED;
1862  }
1863 
1864  if (ordered) {
1865  batch = 1;
1866  }
1867 
1868  allocRows(batch);
1869 
1870  while (r < records){
1871  if (retryAttempt >= m_retryMax){
1872  g_info << "ERROR: has retried this operation " << retryAttempt
1873  << " times, failing!" << endl;
1874  return NDBT_FAILED;
1875  }
1876 
1877  pTrans = pNdb->startTransaction();
1878  if (pTrans == NULL) {
1879  const NdbError err = pNdb->getNdbError();
1880 
1881  if (err.status == NdbError::TemporaryError){
1882  ERR(err);
1883  NdbSleep_MilliSleep(50);
1884  retryAttempt++;
1885  continue;
1886  }
1887  ERR(err);
1888  return NDBT_FAILED;
1889  }
1890 
1891  for(int b=0; (b<batch) && (r+b < records); b++){
1892  if(!ordered){
1893  pOp = pTrans->getNdbIndexOperation(idxName, tab.getName());
1894  if (pOp == NULL) {
1895  ERR(pTrans->getNdbError());
1896  closeTransaction(pNdb);
1897  return NDBT_FAILED;
1898  }
1899  check = pOp->readTuple();
1900  } else {
1901  pOp = sOp = pTrans->getNdbIndexScanOperation(idxName, tab.getName());
1902  if (sOp == NULL) {
1903  ERR(pTrans->getNdbError());
1904  closeTransaction(pNdb);
1905  return NDBT_FAILED;
1906  }
1907  check = sOp->readTuples();
1908  }
1909 
1910  if( check == -1 ) {
1911  ERR(pTrans->getNdbError());
1912  closeTransaction(pNdb);
1913  return NDBT_FAILED;
1914  }
1915 
1916  // Define primary keys
1917  if (equalForRow(pOp, r+b) != 0)
1918  {
1919  closeTransaction(pNdb);
1920  return NDBT_FAILED;
1921  }
1922 
1923  // Define attributes to read
1924  for(a = 0; a<tab.getNoOfColumns(); a++){
1925  if((rows[b]->attributeStore(a) =
1926  pOp->getValue(tab.getColumn(a)->getName())) == 0) {
1927  ERR(pTrans->getNdbError());
1928  closeTransaction(pNdb);
1929  return NDBT_FAILED;
1930  }
1931  }
1932  }
1933 
1934  check = pTrans->execute(Commit, AbortOnError);
1935  check = (check == -1 ? -1 : !ordered ? check : sOp->nextResult(true));
1936  if( check == -1 ) {
1937  const NdbError err = pTrans->getNdbError();
1938 
1939  if (err.status == NdbError::TemporaryError){
1940  ERR(err);
1941  closeTransaction(pNdb);
1942  NdbSleep_MilliSleep(50);
1943  retryAttempt++;
1944  continue;
1945  }
1946  switch(err.code){
1947  case 626: // Tuple did not exist
1948  g_info << r << ": " << err.code << " " << err.message << endl;
1949  r++;
1950  break;
1951 
1952  default:
1953  ERR(err);
1954  closeTransaction(pNdb);
1955  return NDBT_FAILED;
1956  }
1957  } else{
1958  for (int b=0; (b<batch) && (r+b<records); b++){
1959  if (calc.verifyRowValues(rows[b]) != 0){
1960  closeTransaction(pNdb);
1961  return NDBT_FAILED;
1962  }
1963  reads++;
1964  r++;
1965  }
1966  if(ordered && sOp->nextResult(true) == 0){
1967  ndbout << "Error when comparing records "
1968  << " - index op next_result to many" << endl;
1969  closeTransaction(pNdb);
1970  return NDBT_FAILED;
1971  }
1972  }
1973  closeTransaction(pNdb);
1974  }
1975  deallocRows();
1976  g_info << reads << " records read" << endl;
1977  return NDBT_OK;
1978 }
1979 
1980 
1981 
1982 int
1983 HugoTransactions::indexUpdateRecords(Ndb* pNdb,
1984  const char * idxName,
1985  int records,
1986  int batch){
1987 
1988  int updated = 0;
1989  int r = 0;
1990  int retryAttempt = 0;
1991  int check, a, b;
1992  NdbOperation *pOp;
1993  NdbScanOperation * sOp;
1994 
1995  const NdbDictionary::Index* pIndex
1996  = pNdb->getDictionary()->getIndex(idxName, tab.getName());
1997 
1998  const bool ordered = (pIndex->getType()==NdbDictionary::Index::OrderedIndex);
1999  if (ordered){
2000  batch = 1;
2001  }
2002 
2003  allocRows(batch);
2004 
2005  while (r < records){
2006  if (retryAttempt >= m_retryMax){
2007  g_info << "ERROR: has retried this operation " << retryAttempt
2008  << " times, failing!" << endl;
2009  return NDBT_FAILED;
2010  }
2011 
2012  pTrans = pNdb->startTransaction();
2013  if (pTrans == NULL) {
2014  const NdbError err = pNdb->getNdbError();
2015 
2016  if (err.status == NdbError::TemporaryError){
2017  ERR(err);
2018  NdbSleep_MilliSleep(50);
2019  retryAttempt++;
2020  continue;
2021  }
2022  ERR(err);
2023  return NDBT_FAILED;
2024  }
2025 
2026  for(b = 0; b<batch && (b+r)<records; b++){
2027  if(!ordered){
2028  pOp = pTrans->getNdbIndexOperation(idxName, tab.getName());
2029  if (pOp == NULL) {
2030  ERR(pTrans->getNdbError());
2031  closeTransaction(pNdb);
2032  return NDBT_FAILED;
2033  }
2034 
2035  check = pOp->readTupleExclusive();
2036  if( check == -1 ) {
2037  ERR(pTrans->getNdbError());
2038  closeTransaction(pNdb);
2039  return NDBT_FAILED;
2040  }
2041  } else {
2042  pOp = sOp = pTrans->getNdbIndexScanOperation(idxName, tab.getName());
2043  if (pOp == NULL) {
2044  ERR(pTrans->getNdbError());
2045  closeTransaction(pNdb);
2046  return NDBT_FAILED;
2047  }
2048 
2049  check = 0;
2050  sOp->readTuplesExclusive();
2051  }
2052 
2053  // Define primary keys
2054  if (equalForRow(pOp, r+b) != 0)
2055  {
2056  closeTransaction(pNdb);
2057  return NDBT_FAILED;
2058  }
2059 
2060  // Define attributes to read
2061  for(a = 0; a<tab.getNoOfColumns(); a++){
2062  if((rows[b]->attributeStore(a) =
2063  pOp->getValue(tab.getColumn(a)->getName())) == 0) {
2064  ERR(pTrans->getNdbError());
2065  closeTransaction(pNdb);
2066  return NDBT_FAILED;
2067  }
2068  }
2069  }
2070 
2071  check = pTrans->execute(NoCommit, AbortOnError);
2072  check = (check == -1 ? -1 : !ordered ? check : sOp->nextResult(true));
2073  if( check == -1 ) {
2074  const NdbError err = pTrans->getNdbError();
2075  ERR(err);
2076  closeTransaction(pNdb);
2077 
2078  if (err.status == NdbError::TemporaryError){
2079  NdbSleep_MilliSleep(50);
2080  retryAttempt++;
2081  continue;
2082  }
2083  return NDBT_FAILED;
2084  }
2085 
2086  if(ordered && check != 0){
2087  g_err << check << " - Row: " << r << " not found!!" << endl;
2088  closeTransaction(pNdb);
2089  return NDBT_FAILED;
2090  }
2091 
2092  for(b = 0; b<batch && (b+r)<records; b++){
2093  if (calc.verifyRowValues(rows[b]) != 0){
2094  closeTransaction(pNdb);
2095  return NDBT_FAILED;
2096  }
2097 
2098  int updates = calc.getUpdatesValue(rows[b]) + 1;
2099 
2100  NdbOperation* pUpdOp;
2101  if(!ordered){
2102  pUpdOp = pTrans->getNdbIndexOperation(idxName, tab.getName());
2103  check = (pUpdOp == 0 ? -1 : pUpdOp->updateTuple());
2104  } else {
2105  pUpdOp = sOp->updateCurrentTuple();
2106  }
2107 
2108  if (pUpdOp == NULL) {
2109  ERR(pTrans->getNdbError());
2110  closeTransaction(pNdb);
2111  return NDBT_FAILED;
2112  }
2113 
2114  if( check == -1 ) {
2115  ERR(pTrans->getNdbError());
2116  closeTransaction(pNdb);
2117  return NDBT_FAILED;
2118  }
2119 
2120  if(!ordered)
2121  {
2122  if (equalForRow(pUpdOp, r+b) != 0)
2123  {
2124  closeTransaction(pNdb);
2125  return NDBT_FAILED;
2126  }
2127  }
2128 
2129  for(a = 0; a<tab.getNoOfColumns(); a++){
2130  if (tab.getColumn(a)->getPrimaryKey() == false){
2131  if(setValueForAttr(pUpdOp, a, r+b, updates ) != 0){
2132  ERR(pTrans->getNdbError());
2133  closeTransaction(pNdb);
2134  return NDBT_FAILED;
2135  }
2136  }
2137  }
2138  }
2139 
2140  check = pTrans->execute(Commit, AbortOnError);
2141  if( check == -1 ) {
2142  const NdbError err = pTrans->getNdbError();
2143  ERR(err);
2144  closeTransaction(pNdb);
2145 
2146  if (err.status == NdbError::TemporaryError){
2147  NdbSleep_MilliSleep(50);
2148  retryAttempt++;
2149  continue;
2150  }
2151  ndbout << "r = " << r << endl;
2152  return NDBT_FAILED;
2153  } else {
2154  updated += batch;
2155  pTrans->getGCI(&m_latest_gci);
2156  }
2157 
2158  closeTransaction(pNdb);
2159 
2160  r+= batch; // Read next record
2161  }
2162 
2163  g_info << "|- " << updated << " records updated" << endl;
2164  return NDBT_OK;
2165 }
2166 
// Explicit instantiations of the Vector class template for the two pointer
// element types below, so that the code for these specializations is
// generated in this translation unit.
2167 template class Vector<NDBT_ResultRow*>;
2168 template class Vector<NdbIndexScanOperation*>;