MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ndbapi_async.cpp
1 
2 
3 /*
4  Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
5 
6  This program is free software; you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation; version 2 of the License.
9 
10  This program is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  GNU General Public License for more details.
14 
15  You should have received a copy of the GNU General Public License
16  along with this program; if not, write to the Free Software
17  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
19 
20 
52 #include <mysql.h>
53 #include <mysqld_error.h>
54 #include <NdbApi.hpp>
55 
56 #include <iostream> // Used for cout
57 
61 static void
62 milliSleep(int milliseconds){
63  struct timeval sleeptime;
64  sleeptime.tv_sec = milliseconds / 1000;
65  sleeptime.tv_usec = (milliseconds - (sleeptime.tv_sec * 1000)) * 1000000;
66  select(0, 0, 0, 0, &sleeptime);
67 }
68 
69 
73 #define PRINT_ERROR(code,msg) \
74  std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
75  << ", code: " << code \
76  << ", msg: " << msg << "." << std::endl
77 #define MYSQLERROR(mysql) { \
78  PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \
79  exit(-1); }
80 #define APIERROR(error) { \
81  PRINT_ERROR(error.code,error.message); \
82  exit(-1); }
83 
84 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
85 
91 typedef struct {
92  Ndb * ndb;
93  int transaction;
94  int data;
95  int retries;
97 
101 typedef struct {
103  int used;
104 } transaction_t;
105 
109 transaction_t transaction[1024]; //1024 - max number of outstanding
110  //transaction in one Ndb object
111 
112 #endif
113 
120 int populate(Ndb * myNdb, int data, async_callback_t * cbData);
121 
125 bool asynchErrorHandler(NdbTransaction * trans, Ndb* ndb);
126 
130 void asynchExitHandler(Ndb * m_ndb) ;
131 
135 void closeTransaction(Ndb * ndb , async_callback_t * cb);
136 
140 void create_table(MYSQL &mysql);
141 
145 void drop_table(MYSQL &mysql);
146 
150 int tempErrors = 0;
151 int permErrors = 0;
152 
153 void
154 closeTransaction(Ndb * ndb , async_callback_t * cb)
155 {
156  ndb->closeTransaction(transaction[cb->transaction].conn);
157  transaction[cb->transaction].conn = 0;
158  transaction[cb->transaction].used = 0;
159  cb->retries++;
160 }
161 
165 static void
166 callback(int result, NdbTransaction* trans, void* aObject)
167 {
168  async_callback_t * cbData = (async_callback_t *)aObject;
169  if (result<0)
170  {
174  if (asynchErrorHandler(trans, (Ndb*)cbData->ndb))
175  {
176  closeTransaction((Ndb*)cbData->ndb, cbData);
177  while(populate((Ndb*)cbData->ndb, cbData->data, cbData) < 0)
178  milliSleep(10);
179  }
180  else
181  {
182  std::cout << "Restore: Failed to restore data "
183  << "due to a unrecoverable error. Exiting..." << std::endl;
184  delete cbData;
185  asynchExitHandler((Ndb*)cbData->ndb);
186  }
187  }
188  else
189  {
193  closeTransaction((Ndb*)cbData->ndb, cbData);
194  delete cbData;
195  }
196 }
197 
198 
202 void create_table(MYSQL &mysql)
203 {
204  while (mysql_query(&mysql,
205  "CREATE TABLE"
206  " api_async"
207  " (REG_NO INT UNSIGNED NOT NULL,"
208  " BRAND CHAR(20) NOT NULL,"
209  " COLOR CHAR(20) NOT NULL,"
210  " PRIMARY KEY USING HASH (REG_NO))"
211  " ENGINE=NDB"))
212  {
213  if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR)
214  MYSQLERROR(mysql);
215  std::cout << "MySQL Cluster already has example table: api_async. "
216  << "Dropping it..." << std::endl;
217  drop_table(mysql);
218  }
219 }
220 
224 void drop_table(MYSQL &mysql)
225 {
226  if (mysql_query(&mysql, "DROP TABLE api_async"))
227  MYSQLERROR(mysql);
228 }
229 
230 
231 void asynchExitHandler(Ndb * m_ndb)
232 {
233  if (m_ndb != NULL)
234  delete m_ndb;
235  exit(-1);
236 }
237 
238 /* returns true if is recoverable (temporary),
239  * false if it is an error that is permanent.
240  */
241 bool asynchErrorHandler(NdbTransaction * trans, Ndb* ndb)
242 {
243  NdbError error = trans->getNdbError();
244  switch(error.status)
245  {
246  case NdbError::Success:
247  return false;
248  break;
249 
262  return false;
263  milliSleep(10);
264  tempErrors++;
265  return true;
266  break;
268  std::cout << error.message << std::endl;
269  return false;
270  break;
271  default:
273  switch (error.code)
274  {
275  case 499:
276  case 250:
277  milliSleep(10);
278  return true; // SCAN errors that can be retried. Requires restart of scan.
279  default:
280  break;
281  }
282  //ERROR
283  std::cout << error.message << std::endl;
284  return false;
285  break;
286  }
287  return false;
288 }
289 
290 static int nPreparedTransactions = 0;
291 static int MAX_RETRIES = 10;
292 static int parallelism = 100;
293 
294 
295 /************************************************************************
296  * populate()
297  * 1. Prepare 'parallelism' number of insert transactions.
298  * 2. Send transactions to NDB and wait for callbacks to execute
299  */
300 int populate(Ndb * myNdb, int data, async_callback_t * cbData)
301 {
302 
303  NdbOperation* myNdbOperation; // For operations
304  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
305  const NdbDictionary::Table *myTable= myDict->getTable("api_async");
306  if (myTable == NULL)
307  APIERROR(myDict->getNdbError());
308 
309  async_callback_t * cb;
310  int retries = 0;
311  int current = 0;
312  for(int i=0; i<1024; i++)
313  {
314  if(transaction[i].used == 0)
315  {
316  current = i;
317  if (cbData == 0)
318  {
323  cb = new async_callback_t;
324  cb->retries = 0;
325  }
326  else
327  {
331  cb =cbData;
332  retries = cbData->retries;
333  }
337  cb->ndb = myNdb; //handle to Ndb object so that we can close transaction
338  // in the callback (alt. make myNdb global).
339 
340  cb->data = data; //this is the data we want to insert
341  cb->transaction = current; //This is the number (id) of this transaction
342  transaction[current].used = 1 ; //Mark the transaction as used
343  break;
344  }
345  }
346  if(!current)
347  return -1;
348 
349  while(retries < MAX_RETRIES)
350  {
351  transaction[current].conn = myNdb->startTransaction();
352  if (transaction[current].conn == NULL) {
356  milliSleep(10);
357  retries++;
358  continue;
359  }
360  myNdbOperation = transaction[current].conn->getNdbOperation(myTable);
361  if (myNdbOperation == NULL)
362  {
363  if (asynchErrorHandler(transaction[current].conn, myNdb))
364  {
365  myNdb->closeTransaction(transaction[current].conn);
366  transaction[current].conn = 0;
367  milliSleep(10);
368  retries++;
369  continue;
370  }
371  asynchExitHandler(myNdb);
372  } // if
373  if(myNdbOperation->insertTuple() < 0 ||
374  myNdbOperation->equal("REG_NO", data) < 0 ||
375  myNdbOperation->setValue("BRAND", "Mercedes") <0 ||
376  myNdbOperation->setValue("COLOR", "Blue") < 0)
377  {
378  if (asynchErrorHandler(transaction[current].conn, myNdb))
379  {
380  myNdb->closeTransaction(transaction[current].conn);
381  transaction[current].conn = 0;
382  retries++;
383  milliSleep(10);
384  continue;
385  }
386  asynchExitHandler(myNdb);
387  }
388 
389  /*Prepare transaction (the transaction is NOT yet sent to NDB)*/
390  transaction[current].conn->executeAsynchPrepare(NdbTransaction::Commit,
391  &callback,
392  cb);
400  if (nPreparedTransactions == parallelism-1)
401  {
402  // send-poll all transactions
403  // close transaction is done in callback
404  myNdb->sendPollNdb(3000, parallelism );
405  nPreparedTransactions=0;
406  }
407  else
408  nPreparedTransactions++;
409  return 1;
410  }
411  std::cout << "Unable to recover from errors. Exiting..." << std::endl;
412  asynchExitHandler(myNdb);
413  return -1;
414 }
415 
416 int main(int argc, char** argv)
417 {
418  if (argc != 3)
419  {
420  std::cout << "Arguments are <socket mysqld> <connect_string cluster>.\n";
421  exit(-1);
422  }
423  char * mysqld_sock = argv[1];
424  const char *connectstring = argv[2];
425  ndb_init();
426  MYSQL mysql;
427 
428  /**************************************************************
429  * Connect to mysql server and create table *
430  **************************************************************/
431  {
432  if ( !mysql_init(&mysql) ) {
433  std::cout << "mysql_init failed\n";
434  exit(-1);
435  }
436  if ( !mysql_real_connect(&mysql, "localhost", "root", "", "",
437  0, mysqld_sock, 0) )
438  MYSQLERROR(mysql);
439 
440  mysql_query(&mysql, "CREATE DATABASE ndb_examples");
441  if (mysql_query(&mysql, "USE ndb_examples") != 0) MYSQLERROR(mysql);
442 
443  create_table(mysql);
444  }
445 
446  /**************************************************************
447  * Connect to ndb cluster *
448  **************************************************************/
449  Ndb_cluster_connection cluster_connection(connectstring);
450  if (cluster_connection.connect(4, 5, 1))
451  {
452  std::cout << "Unable to connect to cluster within 30 secs." << std::endl;
453  exit(-1);
454  }
455  // Optionally connect and wait for the storage nodes (ndbd's)
456  if (cluster_connection.wait_until_ready(30,0) < 0)
457  {
458  std::cout << "Cluster was not ready within 30 secs.\n";
459  exit(-1);
460  }
461 
462  Ndb* myNdb = new Ndb( &cluster_connection,
463  "ndb_examples" ); // Object representing the database
464  if (myNdb->init(1024) == -1) { // Set max 1024 parallel transactions
465  APIERROR(myNdb->getNdbError());
466  }
467 
471  for(int i = 0 ; i < 10 ; i++)
472  {
473  transaction[i].used = 0;
474  transaction[i].conn = 0;
475 
476  }
477  int i=0;
481  while(i < 10)
482  {
483  while(populate(myNdb,i,0)<0) // <0, no space on free list. Sleep and try again.
484  milliSleep(10);
485 
486  i++;
487  }
488  std::cout << "Number of temporary errors: " << tempErrors << std::endl;
489  delete myNdb;
490 }