MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
consumer_restorem.cpp
1 /*
2  Copyright (C) 2004-2006 MySQL AB, 2009 Sun Microsystems, Inc.
3  All rights reserved. Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include "consumer_restore.hpp"
20 #include <NdbSleep.h>
21 
22 extern FilteredNdbOut err;
23 extern FilteredNdbOut info;
24 extern FilteredNdbOut debug;
25 
26 static bool asynchErrorHandler(NdbTransaction * trans, Ndb * ndb);
27 static void callback(int result, NdbTransaction* trans, void* aObject);
28 
29 bool
30 BackupRestore::init()
31 {
32 
33  if (!m_restore && !m_restore_meta)
34  return true;
35 
36  m_ndb = new Ndb();
37 
38  if (m_ndb == NULL)
39  return false;
40 
41  // Turn off table name completion
42  m_ndb->useFullyQualifiedNames(false);
43 
44  m_ndb->init(1024);
45  if (m_ndb->waitUntilReady(30) != 0)
46  {
47  ndbout << "Failed to connect to ndb!!" << endl;
48  return false;
49  }
50  ndbout << "Connected to ndb!!" << endl;
51 
52 #if USE_MYSQL
53  if(use_mysql)
54  {
55  if ( mysql_thread_safe() == 0 )
56  {
57  ndbout << "Not thread safe mysql library..." << endl;
58  exit(-1);
59  }
60 
61  ndbout << "Connecting to MySQL..." <<endl;
62 
69  bool returnValue = true;
70  mysql_init(&mysql);
71  {
72  int portNo = 3306;
73  if ( mysql_real_connect(&mysql,
74  ga_host,
75  ga_user,
76  ga_password,
77  ga_database,
78  ga_port,
79 :: ga_socket,
80  0) == NULL )
81  {
82  ndbout_c("Connect failed: %s", mysql_error(&mysql));
83  returnValue = false;
84  }
85  mysql.reconnect= 1;
86  ndbout << "Connected to MySQL!!!" <<endl;
87  }
88 
89  /* if(returnValue){
90  mysql_set_server_option(&mysql, MYSQL_OPTION_MULTI_STATEMENTS_ON);
91  }
92  */
93  return returnValue;
94  }
95 #endif
96 
97  if (m_callback) {
98  delete [] m_callback;
99  m_callback = 0;
100  }
101 
102  m_callback = new restore_callback_t[m_parallelism];
103 
104  if (m_callback == 0)
105  {
106  ndbout << "Failed to allocate callback structs" << endl;
107  return false;
108  }
109 
110  m_free_callback = m_callback;
111  for (int i= 0; i < m_parallelism; i++) {
112  m_callback[i].restore = this;
113  m_callback[i].connection = 0;
114  m_callback[i].retries = 0;
115  if (i > 0)
116  m_callback[i-1].next = &(m_callback[i]);
117  }
118  m_callback[m_parallelism-1].next = 0;
119 
120  return true;
121 
122 }
123 
124 BackupRestore::~BackupRestore()
125 {
126  if (m_ndb != 0)
127  delete m_ndb;
128 
129  if (m_callback)
130  delete [] m_callback;
131 }
132 
133 #ifdef USE_MYSQL
134 bool
135 BackupRestore::table(const TableS & table, MYSQL * mysqlp){
136  if (!m_restore_meta)
137  {
138  return true;
139  }
140 
141  char tmpTabName[MAX_TAB_NAME_SIZE*2];
142  sprintf(tmpTabName, "%s", table.getTableName());
143  char * database = strtok(tmpTabName, "/");
144  char * schema = strtok( NULL , "/");
145  char * tableName = strtok( NULL , "/");
146 
151  if(database == NULL)
152  return false;
153  if(schema == NULL)
154  return false;
155  if(tableName==NULL)
156  tableName = schema;
157 
158  char stmtCreateDB[255];
159  sprintf(stmtCreateDB,"CREATE DATABASE %s", database);
160 
161  /*ignore return value. mysql_select_db will trap errors anyways*/
162  if (mysql_query(mysqlp,stmtCreateDB) == 0)
163  {
164  //ndbout_c("%s", stmtCreateDB);
165  }
166 
167  if (mysql_select_db(&mysql, database) != 0)
168  {
169  ndbout_c("Error: %s", mysql_error(&mysql));
170  return false;
171  }
172 
173  char buf [2048];
177  if (create_table_string(table, tableName, buf))
178  {
179  ndbout_c("Unable to create a table definition since the "
180  "backup contains undefined types");
181  return false;
182  }
183 
184  //ndbout_c("%s", buf);
185 
186  if (mysql_query(mysqlp,buf) != 0)
187  {
188  ndbout_c("Error: %s", mysql_error(&mysql));
189  return false;
190  } else
191  {
192  ndbout_c("Successfully restored table %s into database %s", tableName, database);
193  }
194 
195  return true;
196 }
197 #endif
198 
199 bool
200 BackupRestore::table(const TableS & table){
201  if (!m_restore_meta)
202  {
203  return true;
204  }
205  NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
206  if (dict->createTable(*table.m_dictTable) == -1)
207  {
208  err << "Create table " << table.getTableName() << " failed: "
209  << dict->getNdbError() << endl;
210  return false;
211  }
212  info << "Successfully restored table " << table.getTableName()<< endl ;
213  return true;
214 }
215 
216 void BackupRestore::tuple(const TupleS & tup, Uint32 fragId)
217 {
218  if (!m_restore)
219  {
220  delete &tup;
221  return;
222  }
223 
224  restore_callback_t * cb = m_free_callback;
225 
226  if (cb)
227  {
228  m_free_callback = cb->next;
229  cb->retries = 0;
230  cb->fragId = fragId;
231  cb->tup = &tup;
232  tuple_a(cb);
233  }
234 
235  if (m_free_callback == 0)
236  {
237  // send-poll all transactions
238  // close transaction is done in callback
239  m_ndb->sendPollNdb(3000, 1);
240  }
241 }
242 
244 {
245  while (cb->retries < 10)
246  {
250  cb->connection = m_ndb->startTransaction();
251  if (cb->connection == NULL)
252  {
253  /*
254  if (asynchErrorHandler(cb->connection, m_ndb))
255  {
256  cb->retries++;
257  continue;
258  }
259  */
260  asynchExitHandler();
261  } // if
262 
263  const TupleS &tup = *(cb->tup);
264  const TableS * table = tup.getTable();
265  NdbOperation * op = cb->connection->getNdbOperation(table->getTableName());
266 
267  if (op == NULL)
268  {
269  if (asynchErrorHandler(cb->connection, m_ndb))
270  {
271  cb->retries++;
272  continue;
273  }
274  asynchExitHandler();
275  } // if
276 
277  if (op->writeTuple() == -1)
278  {
279  if (asynchErrorHandler(cb->connection, m_ndb))
280  {
281  cb->retries++;
282  continue;
283  }
284  asynchExitHandler();
285  } // if
286 
287  Uint32 ret = 0;
288  for (int i = 0; i < tup.getNoOfAttributes(); i++)
289  {
290  const AttributeS * attr = tup[i];
291  int size = attr->Desc->size;
292  int arraySize = attr->Desc->arraySize;
293  char * dataPtr = attr->Data.string_value;
294  Uint32 length = (size * arraySize) / 8;
295  if (attr->Desc->m_column->getPrimaryKey())
296  {
297  ret = op->equal(i, dataPtr, length);
298  }
299  else
300  {
301  if (attr->Data.null)
302  ret = op->setValue(i, NULL, 0);
303  else
304  ret = op->setValue(i, dataPtr, length);
305  }
306 
307  if (ret<0)
308  {
309  ndbout_c("Column: %d type %d",i,
310  tup.getTable()->m_dictTable->getColumn(i)->getType());
311  if (asynchErrorHandler(cb->connection, m_ndb))
312  {
313  cb->retries++;
314  break;
315  }
316  asynchExitHandler();
317  }
318  }
319  if (ret < 0)
320  continue;
321 
322  // Prepare transaction (the transaction is NOT yet sent to NDB)
323  cb->connection->executeAsynchPrepare(Commit, &callback, cb);
324  m_transactions++;
325  }
326  ndbout_c("Unable to recover from errors. Exiting...");
327  asynchExitHandler();
328 }
329 
330 void BackupRestore::cback(int result, restore_callback_t *cb)
331 {
332  if (result<0)
333  {
337  if (asynchErrorHandler(cb->connection, m_ndb))
338  {
339  cb->retries++;
340  tuple_a(cb);
341  }
342  else
343  {
344  ndbout_c("Restore: Failed to restore data "
345  "due to a unrecoverable error. Exiting...");
346  delete m_ndb;
347  delete cb->tup;
348  exit(-1);
349  }
350  }
351  else
352  {
356  m_ndb->closeTransaction(cb->connection);
357  delete cb->tup;
358  m_transactions--;
359  }
360 }
361 
362 void BackupRestore::asynchExitHandler()
363 {
364  if (m_ndb != NULL)
365  delete m_ndb;
366  exit(-1);
367 }
368 
369 #if 0 // old tuple impl
370 void
371 BackupRestore::tuple(const TupleS & tup)
372 {
373  if (!m_restore)
374  return;
375  while (1)
376  {
377  NdbTransaction * trans = m_ndb->startTransaction();
378  if (trans == NULL)
379  {
380  // TODO: handle the error
381  ndbout << "Cannot start transaction" << endl;
382  exit(-1);
383  } // if
384 
385  const TableS * table = tup.getTable();
386  NdbOperation * op = trans->getNdbOperation(table->getTableName());
387  if (op == NULL)
388  {
389  ndbout << "Cannot get operation: ";
390  ndbout << trans->getNdbError() << endl;
391  exit(-1);
392  } // if
393 
394  // TODO: check return value and handle error
395  if (op->writeTuple() == -1)
396  {
397  ndbout << "writeTuple call failed: ";
398  ndbout << trans->getNdbError() << endl;
399  exit(-1);
400  } // if
401 
402  for (int i = 0; i < tup.getNoOfAttributes(); i++)
403  {
404  const AttributeS * attr = tup[i];
405  int size = attr->Desc->size;
406  int arraySize = attr->Desc->arraySize;
407  const char * dataPtr = attr->Data.string_value;
408 
409  const Uint32 length = (size * arraySize) / 8;
410  if (attr->Desc->m_column->getPrimaryKey())
411  op->equal(i, dataPtr, length);
412  }
413 
414  for (int i = 0; i < tup.getNoOfAttributes(); i++)
415  {
416  const AttributeS * attr = tup[i];
417  int size = attr->Desc->size;
418  int arraySize = attr->Desc->arraySize;
419  const char * dataPtr = attr->Data.string_value;
420 
421  const Uint32 length = (size * arraySize) / 8;
422  if (!attr->Desc->m_column->getPrimaryKey())
423  if (attr->Data.null)
424  op->setValue(i, NULL, 0);
425  else
426  op->setValue(i, dataPtr, length);
427  }
428  int ret = trans->execute(Commit);
429  if (ret != 0)
430  {
431  ndbout << "execute failed: ";
432  ndbout << trans->getNdbError() << endl;
433  exit(-1);
434  }
435  m_ndb->closeTransaction(trans);
436  if (ret == 0)
437  break;
438  }
439  m_dataCount++;
440 }
441 #endif
442 
443 void
444 BackupRestore::endOfTuples()
445 {
446  if (!m_restore)
447  return;
448 
449  // Send all transactions to NDB
450  m_ndb->sendPreparedTransactions(0);
451 
452  // Poll all transactions
453  m_ndb->pollNdb(3000, m_transactions);
454 
455  // Close all transactions
456  // for (int i = 0; i < nPreparedTransactions; i++)
457  // m_ndb->closeTransaction(asynchTrans[i]);
458 }
459 
460 void
461 BackupRestore::logEntry(const LogEntry & tup)
462 {
463  if (!m_restore)
464  return;
465 
466  NdbTransaction * trans = m_ndb->startTransaction();
467  if (trans == NULL)
468  {
469  // TODO: handle the error
470  ndbout << "Cannot start transaction" << endl;
471  exit(-1);
472  } // if
473 
474  const TableS * table = tup.m_table;
475  NdbOperation * op = trans->getNdbOperation(table->getTableName());
476  if (op == NULL)
477  {
478  ndbout << "Cannot get operation: ";
479  ndbout << trans->getNdbError() << endl;
480  exit(-1);
481  } // if
482 
483  int check = 0;
484  switch(tup.m_type)
485  {
486  case LogEntry::LE_INSERT:
487  check = op->insertTuple();
488  break;
489  case LogEntry::LE_UPDATE:
490  check = op->updateTuple();
491  break;
492  case LogEntry::LE_DELETE:
493  check = op->deleteTuple();
494  break;
495  default:
496  ndbout << "Log entry has wrong operation type."
497  << " Exiting...";
498  exit(-1);
499  }
500 
501  for (int i = 0; i < tup.m_values.size(); i++)
502  {
503  const AttributeS * attr = tup.m_values[i];
504  int size = attr->Desc->size;
505  int arraySize = attr->Desc->arraySize;
506  const char * dataPtr = attr->Data.string_value;
507 
508  const Uint32 length = (size / 8) * arraySize;
509  if (attr->Desc->m_column->getPrimaryKey())
510  op->equal(attr->Desc->attrId, dataPtr, length);
511  else
512  op->setValue(attr->Desc->attrId, dataPtr, length);
513  }
514 
515 #if 1
516  trans->execute(Commit);
517 #else
518  const int ret = trans->execute(Commit);
519  // Both insert update and delete can fail during log running
520  // and it's ok
521 
522  if (ret != 0)
523  {
524  ndbout << "execute failed: ";
525  ndbout << trans->getNdbError() << endl;
526  exit(-1);
527  }
528 #endif
529 
530  m_ndb->closeTransaction(trans);
531  m_logCount++;
532 }
533 
534 void
535 BackupRestore::endOfLogEntrys()
536 {
537  if (m_restore)
538  {
539  ndbout << "Restored " << m_dataCount << " tuples and "
540  << m_logCount << " log entries" << endl;
541  }
542 }
543 #if 0
544 /*****************************************
545  *
546  * Callback function for asynchronous transactions
547  *
548  * Idea for error handling: Transaction objects have to be stored globally when
549  * they are prepared.
550  * In the callback function if the transaction:
551  * succeeded: delete the object from global storage
552  * failed but can be retried: execute the object that is in global storage
553  * failed but fatal: delete the object from global storage
554  *
555  ******************************************/
556 static void restoreCallback(int result, // Result for transaction
557  NdbTransaction *object, // Transaction object
558  void *anything) // Not used
559 {
560  static Uint32 counter = 0;
561 
562 
563  debug << "restoreCallback function called " << counter << " time(s)" << endl;
564 
565  ++counter;
566 
567  if (result == -1)
568  {
569  ndbout << " restoreCallback (" << counter;
570  if ((counter % 10) == 1)
571  {
572  ndbout << "st";
573  } // if
574  else if ((counter % 10) == 2)
575  {
576  ndbout << "nd";
577  } // else if
578  else if ((counter % 10 ) ==3)
579  {
580  ndbout << "rd";
581  } // else if
582  else
583  {
584  ndbout << "th";
585  } // else
586  err << " time: error detected " << object->getNdbError() << endl;
587  } // if
588 
589 } // restoreCallback
590 #endif
591 
592 
593 
594 /*
595  * callback : This is called when the transaction is polled
596  *
597  * (This function must have three arguments:
598  * - The result of the transaction,
599  * - The NdbTransaction object, and
600  * - A pointer to an arbitrary object.)
601  */
602 
603 static void
604 callback(int result, NdbTransaction* trans, void* aObject)
605 {
606  restore_callback_t *cb = (restore_callback_t *)aObject;
607  (cb->restore)->cback(result, cb);
608 }
609 
615 static
616 bool asynchErrorHandler(NdbTransaction * trans, Ndb* ndb)
617 {
618  NdbError error = trans->getNdbError();
619  ndb->closeTransaction(trans);
620  switch(error.status)
621  {
622  case NdbError::Success:
623  return false;
624  // ERROR!
625  break;
626 
628  NdbSleep_MilliSleep(10);
629  return true;
630  // RETRY
631  break;
632 
634  ndbout << error << endl;
635  return false;
636  // ERROR!
637  break;
638 
639  default:
641  switch (error.code)
642  {
643  case 499:
644  case 250:
645  NdbSleep_MilliSleep(10);
646  return true; //temp errors?
647  default:
648  break;
649  }
650  //ERROR
651  ndbout << error << endl;
652  return false;
653  break;
654  }
655  return false;
656 }