MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
HugoAsynchTransactions.cpp
1 /*
2  Copyright (C) 2003-2008 MySQL AB, 2008 Sun Microsystems, Inc.
3  All rights reserved. Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include <ndb_global.h>
20 #include <NdbSleep.h>
21 #include <HugoAsynchTransactions.hpp>
22 #include <random.h>
23 
24 
25 HugoAsynchTransactions::HugoAsynchTransactions(const NdbDictionary::Table& _t)
26  : HugoTransactions(_t),
27  transactionsCompleted(0),
28  transInfo(NULL),
29  theNdb(NULL),
30  totalLoops(0),
31  recordsPerLoop(0),
32  operationType(NO_READ),
33  execType(Commit),
34  nextUnProcessedRecord(0),
35  loopNum(0),
36  totalCompletedRecords(0),
37  maxUsedRetries(0),
38  finished(false),
39  testResult(NDBT_OK)
40 {
41 }
42 
43 HugoAsynchTransactions::~HugoAsynchTransactions(){
44  deallocTransactions();
45 }
46 
47 int
48 HugoAsynchTransactions::loadTableAsynch(Ndb* pNdb,
49  int records,
50  int batch,
51  int trans,
52  int operations){
53 
54  int result = executeAsynchOperation(pNdb, records, batch, trans, operations,
55  NO_INSERT);
56  g_info << (unsigned int)transactionsCompleted * operations
57  << "|- inserted..." << endl;
58 
59  return result;
60 }
61 
62 int
63 HugoAsynchTransactions::pkDelRecordsAsynch(Ndb* pNdb,
64  int records,
65  int batch,
66  int trans,
67  int operations) {
68 
69  g_info << "|- Deleting records asynchronous..." << endl;
70 
71  int result = executeAsynchOperation(pNdb, records, batch, trans,
72  operations,
73  NO_DELETE);
74  g_info << "|- " << (unsigned int)transactionsCompleted * operations
75  << " deleted..." << endl;
76 
77  return result;
78 }
79 
80 int
81 HugoAsynchTransactions::pkReadRecordsAsynch(Ndb* pNdb,
82  int records,
83  int batch,
84  int trans,
85  int operations) {
86 
87  g_info << "|- Reading records asynchronous..." << endl;
88 
89  allocRows(trans*operations);
90  int result = executeAsynchOperation(pNdb, records, batch, trans, operations,
91  NO_READ);
92 
93  g_info << "|- " << (unsigned int)transactionsCompleted * operations
94  << " read..."
95  << endl;
96 
97  deallocRows();
98 
99  return result;
100 }
101 
102 int
103 HugoAsynchTransactions::pkUpdateRecordsAsynch(Ndb* pNdb,
104  int records,
105  int batch,
106  int trans,
107  int operations) {
108 
109  g_info << "|- Updating records asynchronous..." << endl;
110 
111  allocRows(trans*operations);
112  int result = executeAsynchOperation(pNdb, records, batch, trans, operations,
113  NO_UPDATE);
114 
115  g_info << "|- " << (unsigned int)transactionsCompleted * operations
116  << " read..."
117  << endl;
118 
119  deallocRows();
120 
121  return result;
122 };
123 
124 
125 void
126 HugoAsynchTransactions::allocTransactions(int trans, int maxOpsPerTrans) {
127  if (transInfo != NULL) {
128  deallocTransactions();
129  }
130  transInfo = new TransactionInfo[trans];
131 
132  /* Initialise transaction info array */
133  TransactionInfo init;
134  init.hugoP= this;
135  init.transaction= NULL;
136  init.startRecordId= 0;
137  init.numRecords= 0;
138  init.resultRowStartIndex= 0;
139  init.retries= 0;
140  init.opType= NO_READ;
141 
142  for (int i=0; i < trans; i++)
143  {
144  transInfo[i]= init;
145  transInfo[i].resultRowStartIndex= (i * maxOpsPerTrans);
146  };
147 }
148 
149 void
150 HugoAsynchTransactions::deallocTransactions() {
151  if (transInfo != NULL){
152  delete[] transInfo;
153  }
154  transInfo = NULL;
155 }
156 
157 int
158 HugoAsynchTransactions::getNextWorkTask(int* startRecordId, int* numRecords)
159 {
160  /* Get a start record id and # of records for the next work task
161  * We return a range of up to maxOpsPerTrans records
162  * If there are no unprocessed records remaining, we return -1
163  */
164  if (nextUnProcessedRecord == recordsPerLoop)
165  {
166  /* If we've completed all loops then stop. Otherwise, loop around */
167  if ((loopNum + 1) == totalLoops)
168  return -1; // All work has been dispatched
169  else
170  {
171  loopNum++;
172  nextUnProcessedRecord= 0;
173  }
174  }
175 
176  int availableRecords= recordsPerLoop- nextUnProcessedRecord;
177  int recordsInTask= (availableRecords < maxOpsPerTrans)?
178  availableRecords : maxOpsPerTrans;
179 
180  *startRecordId= nextUnProcessedRecord;
181  *numRecords= recordsInTask;
182 
183  nextUnProcessedRecord+= recordsInTask;
184 
185  return 0;
186 }
187 
188 int
189 HugoAsynchTransactions::defineUpdateOpsForTask(TransactionInfo* tInfo)
190 {
191  int check= 0;
192  int a= 0;
193 
194  NdbTransaction* trans= tInfo->transaction;
195 
196  if (trans == NULL) {
197  return -1;
198  }
199 
200  for (int recordId= tInfo->startRecordId;
201  recordId < (tInfo->startRecordId + tInfo->numRecords);
202  recordId++)
203  {
204  NdbOperation* pOp= trans->getNdbOperation(tab.getName());
205  if (pOp == NULL) {
206  ERR(trans->getNdbError());
207  trans->close();
208  return -1;
209  }
210 
211  /* We assume that row values have already been read. */
212  int updateVal= calc.getUpdatesValue(rows[recordId]) + 1;
213 
214  check= pOp->updateTuple();
215  if (equalForRow(pOp, recordId) != 0)
216  {
217  ERR(trans->getNdbError());
218  trans->close();
219  return -1;
220  }
221  // Update the record
222  for (a = 0; a < tab.getNoOfColumns(); a++) {
223  if (tab.getColumn(a)->getPrimaryKey() == false) {
224  if (setValueForAttr(pOp, a, recordId, updateVal) != 0) {
225  ERR(trans->getNdbError());
226  trans->close();
227  return -1;
228  }
229  }
230  }
231  } // For recordId
232 
233  return 0;
234 }
235 
236 int
237 HugoAsynchTransactions::defineTransactionForTask(TransactionInfo* tInfo,
238  ExecType taskExecType)
239 {
240  int check= 0;
241  int a= 0;
242  NdbTransaction* trans= theNdb->startTransaction();
243 
244  if (trans == NULL) {
245  ERR(theNdb->getNdbError());
246  return -1;
247  }
248 
249  for (int recordId= tInfo->startRecordId;
250  recordId < (tInfo->startRecordId + tInfo->numRecords);
251  recordId++)
252  {
253  NdbOperation* pOp= trans->getNdbOperation(tab.getName());
254  if (pOp == NULL) {
255  ERR(trans->getNdbError());
256  theNdb->closeTransaction(trans);
257  return -1;
258  }
259 
260  switch (tInfo->opType) {
261  case NO_INSERT:
262  // Insert
263  check = pOp->insertTuple();
264  if (check == -1) {
265  ERR(trans->getNdbError());
266  theNdb->closeTransaction(trans);
267  return -1;
268  }
269 
270  // Set a calculated value for each attribute in this table
271  for (a = 0; a < tab.getNoOfColumns(); a++) {
272  if (setValueForAttr(pOp, a, recordId, 0 ) != 0) {
273  ERR(trans->getNdbError());
274  theNdb->closeTransaction(trans);
275  return -1;
276  }
277  } // For each attribute
278  break;
279  case NO_UPDATE:
280  {
281  g_err << "Attempt to define update transaction" << endl;
282  return -1;
283  }
284  case NO_READ:
285  // Define primary keys
286  check = pOp->readTuple();
287  if (equalForRow(pOp, recordId) != 0)
288  {
289  ERR(trans->getNdbError());
290  theNdb->closeTransaction(trans);
291  return -1;
292  }
293  // Define attributes to read
294  for (a = 0; a < tab.getNoOfColumns(); a++) {
295  if ((rows[recordId]->attributeStore(a) =
296  pOp->getValue(tab.getColumn(a)->getName())) == 0) {
297  ERR(trans->getNdbError());
298  theNdb->closeTransaction(trans);
299  return -1;
300  }
301  }
302  break;
303  case NO_DELETE:
304  // Delete
305  check = pOp->deleteTuple();
306  if (check == -1) {
307  ERR(trans->getNdbError());
308  theNdb->closeTransaction(trans);
309  return -1;
310  }
311 
312  // Define primary keys
313  if (equalForRow(pOp, recordId) != 0)
314  {
315  ERR(trans->getNdbError());
316  theNdb->closeTransaction(trans);
317  return -1;
318  }
319  break;
320  default:
321  // Should not happen...
322  theNdb->closeTransaction(trans);
323  return -1;
324  }
325  } // For recordId
326 
327  tInfo->transaction= trans;
328 
329  /* Now send it */
330  tInfo->transaction->executeAsynch(taskExecType,
331  &callbackFunc,
332  tInfo);
333 
334  return 0;
335 }
336 
337 int
338 HugoAsynchTransactions::beginNewTask(TransactionInfo* tInfo)
339 {
340  tInfo->transaction= NULL;
341  tInfo->startRecordId= 0;
342  tInfo->numRecords= 0;
343  tInfo->retries= 0;
344 
345  /* Adjust for update special case */
346  NDB_OPERATION realOpType= operationType;
347  ExecType realExecType= execType;
348  if (operationType == NO_UPDATE)
349  {
350  realOpType= NO_READ;
351  realExecType= NoCommit;
352  }
353  tInfo->opType= realOpType;
354 
355  if (getNextWorkTask(&tInfo->startRecordId,
356  &tInfo->numRecords) == 0)
357  {
358  /* Have a task to do */
359  if (defineTransactionForTask(tInfo, realExecType) != 0)
360  {
361  g_err << "Error defining new transaction" << endl;
362  return -1;
363  }
364 
365  return 0;
366  }
367  else
368  {
369  /* No more work to do */
370  return 1;
371  }
372 }
373 
374 void
375 HugoAsynchTransactions::callbackFunc(int result,
376  NdbConnection* pTrans,
377  void* anObject) {
378  /* Execute callback method on passed object */
379  HugoAsynchTransactions::TransactionInfo* tranInfo=
380  (HugoAsynchTransactions::TransactionInfo*) anObject;
381 
382  tranInfo->hugoP->callback(result, pTrans, tranInfo);
383 };
384 
385 
386 void
387 HugoAsynchTransactions::callback(int result,
388  NdbConnection* pTrans,
389  TransactionInfo* tInfo)
390 {
391  if (finished)
392  return; // No point continuing here
393 
394  // Paranoia
395  if (pTrans != tInfo->transaction)
396  {
397  g_err << "Transactions not same in callback!" << endl;
398  finished= true;
399  testResult= NDBT_FAILED;
400  return;
401  }
402 
403  NdbError transErr= pTrans->getNdbError();
404 
405  if (transErr.code == 0)
406  {
407  /* This transaction executed successfully, perform post-execution
408  * steps
409  */
410  switch (tInfo->opType)
411  {
412  case NO_READ:
413  // Verify the data!
414  for (int recordId = tInfo->startRecordId;
415  recordId < (tInfo->startRecordId + tInfo->numRecords);
416  recordId++)
417  {
418  if (calc.verifyRowValues(rows[recordId]) != 0) {
419  g_info << "|- Verify failed..." << endl;
420  // Close all transactions
421  finished= true;
422  testResult= NDBT_FAILED;
423  return;
424  }
425  }
426 
427  if (operationType == NO_UPDATE)
428  {
429  /* Read part of update completed, now define the update...*/
430  if (defineUpdateOpsForTask(tInfo) == 0)
431  {
432  tInfo->opType= NO_UPDATE;
433  tInfo->transaction->executeAsynch(Commit,
434  &callbackFunc,
435  tInfo);
436  }
437  else
438  {
439  g_err << "Error defining update operations in callback" << endl;
440  finished= true;
441  testResult= NDBT_FAILED;
442  }
443 
444  /* return to polling loop awaiting completion of updates...*/
445  return;
446  }
447 
448  break;
449  case NO_UPDATE:
450  case NO_INSERT:
451  case NO_DELETE:
452  break;
453  }
454 
455  /* Task completed successfully
456  * Now close the transaction, and start next task, if there is one
457  */
458  pTrans->close();
459  transactionsCompleted ++;
460  totalCompletedRecords+= tInfo->numRecords;
461 
462  if (beginNewTask(tInfo) < 0)
463  {
464  finished= true;
465  testResult= NDBT_FAILED;
466  }
467  }
468  else
469  {
470  /* We have had some sort of issue with this transaction ... */
471  g_err << "Callback got error on task : "
472  << tInfo->startRecordId << " to "
473  << tInfo->startRecordId + tInfo->numRecords << " "
474  << transErr.code << ":"
475  << transErr.message
476  << ". Task type : " << tInfo->opType << endl;
477 
478  switch(transErr.status) {
480 
481  if (tInfo->retries < 10) // Support up to 10 retries
482  {
483  /* Retry original request */
484  tInfo->retries++;
485  tInfo->transaction->close();
486 
487  if (tInfo->retries > maxUsedRetries)
488  maxUsedRetries= tInfo->retries;
489 
490  /* Exponential backoff - note that this also delays callback
491  * handling for other outstanding transactions so in effect
492  * serialises processing
493  */
494  int multiplier= 1 << tInfo->retries;
495  int base= 200; // millis
496  int backoffMillis= multiplier*base + myRandom48(base);
497 
498  g_err << " Error is temporary, retrying in "
499  << backoffMillis << " millis. Retry number "
500  << tInfo->retries << endl;
501  NdbSleep_MilliSleep(backoffMillis);
502 
503  /* If we failed somewhere in an update operation, redo from the start
504  * (including reads)
505  */
506  tInfo->opType= operationType;
507  ExecType taskExecType= execType;
508  if (operationType == NO_UPDATE)
509  {
510  tInfo->opType= NO_READ;
511  taskExecType= NoCommit;
512  }
513 
514  /* Define a new transction to perform the original task */
515  if (defineTransactionForTask(tInfo, taskExecType) != 0)
516  {
517  g_err << "Error defining retry transaction in callback" << endl;
518  finished= true;
519  testResult= NDBT_FAILED;
520  }
521 
522  break;
523  }
524 
525  g_err << "Too many retries (" << tInfo->retries
526  << ") failing." << endl;
527  // Fall through
528 
529  default:
530  /* Non temporary error */
531  ERR(transErr);
532  g_err << "Status= " << transErr.status << " Failing test" << endl;
533  testResult= NDBT_FAILED;
534  finished= true;
535  break;
536  };
537  } // Successful execution
538 } // callbackFunc
539 
540 int
541 HugoAsynchTransactions::executeAsynchOperation(Ndb* pNdb,
542  int records,
543  int batch,
544  int trans,
545  int operations,
546  NDB_OPERATION theOperation,
547  ExecType theType) {
548 
549  /* We want to process 'records' records using at most 'trans' transactions,
550  * each with at most 'operations' operations.
551  * This is done 'batch' times.
552  * This procedure sets up the control state, and starts the first 'trans'
553  * transactions
554  * After that the execution completion callback code handles operation
555  * results, and initiating new transactions or retrying failed transactions
556  * as necessary.
557  * If there is a failure, the finished bool is set, which is detected in the
558  * polling loop below.
559  * If all of the requested records have been read, this is detected in the
560  * loop below
561  * Note that Update operations are a special case, comprising a read, executed
562  * with NoCommit, followed by an Update executed with Commit.
563  */
564 
565  theNdb= pNdb;
566  totalLoops= batch;
567  loopNum= 0;
568  recordsPerLoop= records;
569  maxOpsPerTrans= operations;
570  operationType= theOperation;
571  execType= theType;
572  nextUnProcessedRecord= 0;
573  totalCompletedRecords= 0;
574  maxUsedRetries= 0;
575  finished= false;
576  testResult= NDBT_OK;
577 
578  allocTransactions(trans, maxOpsPerTrans);
579 
580  /* Start by defining all transactions */
581  int nextUndefinedTrans= 0;
582  while ((nextUndefinedTrans < trans) &&
583  (beginNewTask(&transInfo[nextUndefinedTrans++]) == 0))
584  { /* Empty */ };
585 
586  /* Poll for results, the transaction callback will handle results
587  * and initiate new operations as necessary, setting finished to
588  * true if there's a problem.
589  */
590  while (!finished)
591  {
592  pNdb->pollNdb(3000,0);
593 
594  if (totalCompletedRecords == (records * totalLoops))
595  finished = true;
596  };
597 
598  deallocTransactions();
599  theNdb= NULL;
600 
601  return testResult;
602 }