MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ndbapi_scan.cpp
1 
2 /*
3  Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 
20 /*
21  * ndbapi_scan.cpp:
22  * Illustrates how to use the scan api in the NDBAPI.
23  * The example shows how to do scan, scan for update and scan for delete
24  * using NdbScanFilter and NdbScanOperation
25  *
26  * Classes and methods used in this example:
27  *
28  * Ndb_cluster_connection
29  * connect()
30  * wait_until_ready()
31  *
32  * Ndb
33  * init()
34  * getDictionary()
35  * startTransaction()
36  * closeTransaction()
37  *
38  * NdbTransaction
39  * getNdbScanOperation()
40  * execute()
41  *
42  * NdbScanOperation
43  * getValue()
44  * readTuples()
45  * nextResult()
46  * deleteCurrentTuple()
47  * updateCurrentTuple()
48  *
49  * const NdbDictionary::Dictionary
50  * getTable()
51  *
52  * const NdbDictionary::Table
53  * getColumn()
54  *
55  * const NdbDictionary::Column
56  * getLength()
57  *
58  * NdbOperation
59  * insertTuple()
60  * equal()
61  * setValue()
62  *
63  * NdbScanFilter
64  * begin()
65  * cmp()
66  * end()
67  *
68  */
69 
70 
71 #include <mysql.h>
72 #include <mysqld_error.h>
73 #include <NdbApi.hpp>
74 // Used for cout
75 #include <iostream>
76 #include <stdio.h>
77 #include <string.h>
78 #include <stdlib.h>
79 
/**
 * Suspend the caller for approximately the given number of milliseconds.
 *
 * select() is called with no descriptor sets, so only its timeout is used.
 * Non-positive durations return immediately without calling select().
 *
 * @param milliseconds  requested delay in milliseconds
 */
static void
milliSleep(int milliseconds){
  if (milliseconds <= 0)
    return;

  struct timeval sleeptime;
  sleeptime.tv_sec = milliseconds / 1000;
  /* Leftover milliseconds expressed in microseconds (1 ms == 1000 us) */
  sleeptime.tv_usec = (milliseconds % 1000) * 1000;
  select(0, 0, 0, 0, &sleeptime);
}
90 
91 
/**
 * Write an error code and message to std::cout, tagged with the file and
 * line where the macro is expanded.
 */
#define PRINT_ERROR(code,msg) \
  std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
            << ", code: " << (code) \
            << ", msg: " << (msg) << "." << std::endl
/**
 * Report the last error recorded on a MYSQL handle and terminate the program.
 * Wrapped in do/while(0) so it behaves as a single statement, including in
 * an if/else without braces.
 */
#define MYSQLERROR(mysql) do { \
  PRINT_ERROR(mysql_errno(&(mysql)),mysql_error(&(mysql))); \
  exit(-1); } while (0)
/**
 * Report an error object exposing .code and .message and terminate the
 * program. The argument is evaluated twice, so pass a side-effect-free
 * expression.
 */
#define APIERROR(error) do { \
  PRINT_ERROR((error).code,(error).message); \
  exit(-1); } while (0)
105 
/**
 * In-memory image of one api_scan row: a registration number plus two
 * fixed-width 20-byte text fields.
 */
struct Car
{
  /**
   * Value-initialize every member so the number is 0 and both text
   * buffers are completely filled with NUL bytes.
   */
  Car() : reg_no(0), brand(), color() {}

  unsigned int reg_no;
  char brand[20];
  char color[20];
};
118 
122 void drop_table(MYSQL &mysql)
123 {
124  if (mysql_query(&mysql, "DROP TABLE IF EXISTS api_scan"))
125  MYSQLERROR(mysql);
126 }
127 
128 
132 void create_table(MYSQL &mysql)
133 {
134  while (mysql_query(&mysql,
135  "CREATE TABLE"
136  " api_scan"
137  " (REG_NO INT UNSIGNED NOT NULL,"
138  " BRAND CHAR(20) NOT NULL,"
139  " COLOR CHAR(20) NOT NULL,"
140  " PRIMARY KEY USING HASH (REG_NO))"
141  " ENGINE=NDB"))
142  {
143  if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR)
144  MYSQLERROR(mysql);
145  std::cout << "MySQL Cluster already has example table: api_scan. "
146  << "Dropping it..." << std::endl;
147  drop_table(mysql);
148  }
149 }
150 
151 int populate(Ndb * myNdb)
152 {
153  int i;
154  Car cars[15];
155 
156  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
157  const NdbDictionary::Table *myTable= myDict->getTable("api_scan");
158 
159  if (myTable == NULL)
160  APIERROR(myDict->getNdbError());
161 
165  for (i = 0; i < 5; i++)
166  {
167  cars[i].reg_no = i;
168  sprintf(cars[i].brand, "Mercedes");
169  sprintf(cars[i].color, "Blue");
170  }
171 
175  for (i = 5; i < 10; i++)
176  {
177  cars[i].reg_no = i;
178  sprintf(cars[i].brand, "BMW");
179  sprintf(cars[i].color, "Black");
180  }
181 
185  for (i = 10; i < 15; i++)
186  {
187  cars[i].reg_no = i;
188  sprintf(cars[i].brand, "Toyota");
189  sprintf(cars[i].color, "Pink");
190  }
191 
192  NdbTransaction* myTrans = myNdb->startTransaction();
193  if (myTrans == NULL)
194  APIERROR(myNdb->getNdbError());
195 
196  for (i = 0; i < 15; i++)
197  {
198  NdbOperation* myNdbOperation = myTrans->getNdbOperation(myTable);
199  if (myNdbOperation == NULL)
200  APIERROR(myTrans->getNdbError());
201  myNdbOperation->insertTuple();
202  myNdbOperation->equal("REG_NO", cars[i].reg_no);
203  myNdbOperation->setValue("BRAND", cars[i].brand);
204  myNdbOperation->setValue("COLOR", cars[i].color);
205  }
206 
207  int check = myTrans->execute(NdbTransaction::Commit);
208 
209  myTrans->close();
210 
211  return check != -1;
212 }
213 
214 int scan_delete(Ndb* myNdb,
215  int column,
216  const char * color)
217 
218 {
219 
220  // Scan all records exclusive and delete
221  // them one by one
222  int retryAttempt = 0;
223  const int retryMax = 10;
224  int deletedRows = 0;
225  int check;
226  NdbError err;
227  NdbTransaction *myTrans;
228  NdbScanOperation *myScanOp;
229 
230  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
231  const NdbDictionary::Table *myTable= myDict->getTable("api_scan");
232 
233  if (myTable == NULL)
234  APIERROR(myDict->getNdbError());
235 
245  while (true)
246  {
247  if (retryAttempt >= retryMax)
248  {
249  std::cout << "ERROR: has retried this operation " << retryAttempt
250  << " times, failing!" << std::endl;
251  return -1;
252  }
253 
254  myTrans = myNdb->startTransaction();
255  if (myTrans == NULL)
256  {
257  const NdbError err = myNdb->getNdbError();
258 
259  if (err.status == NdbError::TemporaryError)
260  {
261  milliSleep(50);
262  retryAttempt++;
263  continue;
264  }
265  std::cout << err.message << std::endl;
266  return -1;
267  }
268 
272  myScanOp = myTrans->getNdbScanOperation(myTable);
273  if (myScanOp == NULL)
274  {
275  std::cout << myTrans->getNdbError().message << std::endl;
276  myNdb->closeTransaction(myTrans);
277  return -1;
278  }
279 
283  if(myScanOp->readTuples(NdbOperation::LM_Exclusive) != 0)
284  {
285  std::cout << myTrans->getNdbError().message << std::endl;
286  myNdb->closeTransaction(myTrans);
287  return -1;
288  }
289 
293  NdbScanFilter filter(myScanOp) ;
294  if(filter.begin(NdbScanFilter::AND) < 0 ||
295  filter.cmp(NdbScanFilter::COND_EQ, column, color, 20) < 0 ||
296  filter.end() < 0)
297  {
298  std::cout << myTrans->getNdbError().message << std::endl;
299  myNdb->closeTransaction(myTrans);
300  return -1;
301  }
302 
306  if(myTrans->execute(NdbTransaction::NoCommit) != 0){
307  err = myTrans->getNdbError();
308  if(err.status == NdbError::TemporaryError){
309  std::cout << myTrans->getNdbError().message << std::endl;
310  myNdb->closeTransaction(myTrans);
311  milliSleep(50);
312  continue;
313  }
314  std::cout << err.code << std::endl;
315  std::cout << myTrans->getNdbError().code << std::endl;
316  myNdb->closeTransaction(myTrans);
317  return -1;
318  }
319 
320 
325  while((check = myScanOp->nextResult(true)) == 0){
326  do
327  {
328  if (myScanOp->deleteCurrentTuple() != 0)
329  {
330  std::cout << myTrans->getNdbError().message << std::endl;
331  myNdb->closeTransaction(myTrans);
332  return -1;
333  }
334  deletedRows++;
335 
341  } while((check = myScanOp->nextResult(false)) == 0);
342 
346  if(check != -1)
347  {
348  check = myTrans->execute(NdbTransaction::NoCommit);
349  }
350 
354  err = myTrans->getNdbError();
355  if(check == -1)
356  {
358  {
359  std::cout << myTrans->getNdbError().message << std::endl;
360  myNdb->closeTransaction(myTrans);
361  milliSleep(50);
362  continue;
363  }
364  }
368  }
372  if(myTrans->execute(NdbTransaction::Commit) == -1)
373  {
374  if(err.status == NdbError::TemporaryError){
375  std::cout << myTrans->getNdbError().message << std::endl;
376  myNdb->closeTransaction(myTrans);
377  milliSleep(50);
378  continue;
379  }
380  }
381 
382  std::cout << myTrans->getNdbError().message << std::endl;
383  myNdb->closeTransaction(myTrans);
384  return 0;
385  }
386 
387  if(myTrans!=0)
388  {
389  std::cout << myTrans->getNdbError().message << std::endl;
390  myNdb->closeTransaction(myTrans);
391  }
392  return -1;
393 }
394 
395 
396 int scan_update(Ndb* myNdb,
397  int update_column,
398  const char * before_color,
399  const char * after_color)
400 
401 {
402 
403  // Scan all records exclusive and update
404  // them one by one
405  int retryAttempt = 0;
406  const int retryMax = 10;
407  int updatedRows = 0;
408  int check;
409  NdbError err;
410  NdbTransaction *myTrans;
411  NdbScanOperation *myScanOp;
412 
413  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
414  const NdbDictionary::Table *myTable= myDict->getTable("api_scan");
415 
416  if (myTable == NULL)
417  APIERROR(myDict->getNdbError());
418 
428  while (true)
429  {
430 
431  if (retryAttempt >= retryMax)
432  {
433  std::cout << "ERROR: has retried this operation " << retryAttempt
434  << " times, failing!" << std::endl;
435  return -1;
436  }
437 
438  myTrans = myNdb->startTransaction();
439  if (myTrans == NULL)
440  {
441  const NdbError err = myNdb->getNdbError();
442 
443  if (err.status == NdbError::TemporaryError)
444  {
445  milliSleep(50);
446  retryAttempt++;
447  continue;
448  }
449  std::cout << err.message << std::endl;
450  return -1;
451  }
452 
456  myScanOp = myTrans->getNdbScanOperation(myTable);
457  if (myScanOp == NULL)
458  {
459  std::cout << myTrans->getNdbError().message << std::endl;
460  myNdb->closeTransaction(myTrans);
461  return -1;
462  }
463 
467  if( myScanOp->readTuples(NdbOperation::LM_Exclusive) )
468  {
469  std::cout << myTrans->getNdbError().message << std::endl;
470  myNdb->closeTransaction(myTrans);
471  return -1;
472  }
473 
477  NdbScanFilter filter(myScanOp) ;
478  if(filter.begin(NdbScanFilter::AND) < 0 ||
479  filter.cmp(NdbScanFilter::COND_EQ, update_column, before_color, 20) <0||
480  filter.end() <0)
481  {
482  std::cout << myTrans->getNdbError().message << std::endl;
483  myNdb->closeTransaction(myTrans);
484  return -1;
485  }
486 
490  if(myTrans->execute(NdbTransaction::NoCommit) != 0)
491  {
492  err = myTrans->getNdbError();
493  if(err.status == NdbError::TemporaryError){
494  std::cout << myTrans->getNdbError().message << std::endl;
495  myNdb->closeTransaction(myTrans);
496  milliSleep(50);
497  continue;
498  }
499  std::cout << myTrans->getNdbError().code << std::endl;
500  myNdb->closeTransaction(myTrans);
501  return -1;
502  }
503 
508  while((check = myScanOp->nextResult(true)) == 0){
509  do {
513  NdbOperation * myUpdateOp = myScanOp->updateCurrentTuple();
514  if (myUpdateOp == 0)
515  {
516  std::cout << myTrans->getNdbError().message << std::endl;
517  myNdb->closeTransaction(myTrans);
518  return -1;
519  }
520  updatedRows++;
521 
525  myUpdateOp->setValue(update_column, after_color);
531  } while((check = myScanOp->nextResult(false)) == 0);
532 
536  if(check != -1)
537  {
538  check = myTrans->execute(NdbTransaction::NoCommit);
539  }
540 
544  err = myTrans->getNdbError();
545  if(check == -1)
546  {
547  if(err.status == NdbError::TemporaryError){
548  std::cout << myTrans->getNdbError().message << std::endl;
549  myNdb->closeTransaction(myTrans);
550  milliSleep(50);
551  continue;
552  }
553  }
557  }
558 
562  if(myTrans->execute(NdbTransaction::Commit) == -1)
563  {
564  if(err.status == NdbError::TemporaryError){
565  std::cout << myTrans->getNdbError().message << std::endl;
566  myNdb->closeTransaction(myTrans);
567  milliSleep(50);
568  continue;
569  }
570  }
571 
572  std::cout << myTrans->getNdbError().message << std::endl;
573  myNdb->closeTransaction(myTrans);
574  return 0;
575  }
576 
577 
578  if(myTrans!=0)
579  {
580  std::cout << myTrans->getNdbError().message << std::endl;
581  myNdb->closeTransaction(myTrans);
582  }
583  return -1;
584 }
585 
586 
587 
588 int scan_print(Ndb * myNdb)
589 {
590 // Scan all records exclusive and update
591  // them one by one
592  int retryAttempt = 0;
593  const int retryMax = 10;
594  int fetchedRows = 0;
595  int check;
596  NdbError err;
597  NdbTransaction *myTrans;
598  NdbScanOperation *myScanOp;
599  /* Result of reading attribute value, three columns:
600  REG_NO, BRAND, and COLOR
601  */
602  NdbRecAttr * myRecAttr[3];
603 
604  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
605  const NdbDictionary::Table *myTable= myDict->getTable("api_scan");
606 
607  if (myTable == NULL)
608  APIERROR(myDict->getNdbError());
609 
619  while (true)
620  {
621 
622  if (retryAttempt >= retryMax)
623  {
624  std::cout << "ERROR: has retried this operation " << retryAttempt
625  << " times, failing!" << std::endl;
626  return -1;
627  }
628 
629  myTrans = myNdb->startTransaction();
630  if (myTrans == NULL)
631  {
632  const NdbError err = myNdb->getNdbError();
633 
634  if (err.status == NdbError::TemporaryError)
635  {
636  milliSleep(50);
637  retryAttempt++;
638  continue;
639  }
640  std::cout << err.message << std::endl;
641  return -1;
642  }
643  /*
644  * Define a scan operation.
645  * NDBAPI.
646  */
647  myScanOp = myTrans->getNdbScanOperation(myTable);
648  if (myScanOp == NULL)
649  {
650  std::cout << myTrans->getNdbError().message << std::endl;
651  myNdb->closeTransaction(myTrans);
652  return -1;
653  }
654 
658  if( myScanOp->readTuples(NdbOperation::LM_CommittedRead) == -1)
659  {
660  std::cout << myTrans->getNdbError().message << std::endl;
661  myNdb->closeTransaction(myTrans);
662  return -1;
663  }
664 
671  myRecAttr[0] = myScanOp->getValue("REG_NO");
672  myRecAttr[1] = myScanOp->getValue("BRAND");
673  myRecAttr[2] = myScanOp->getValue("COLOR");
674  if(myRecAttr[0] ==NULL || myRecAttr[1] == NULL || myRecAttr[2]==NULL)
675  {
676  std::cout << myTrans->getNdbError().message << std::endl;
677  myNdb->closeTransaction(myTrans);
678  return -1;
679  }
683  if(myTrans->execute(NdbTransaction::NoCommit) != 0){
684  err = myTrans->getNdbError();
685  if(err.status == NdbError::TemporaryError){
686  std::cout << myTrans->getNdbError().message << std::endl;
687  myNdb->closeTransaction(myTrans);
688  milliSleep(50);
689  continue;
690  }
691  std::cout << err.code << std::endl;
692  std::cout << myTrans->getNdbError().code << std::endl;
693  myNdb->closeTransaction(myTrans);
694  return -1;
695  }
696 
701  while((check = myScanOp->nextResult(true)) == 0){
702  do {
703 
704  fetchedRows++;
708  std::cout << myRecAttr[0]->u_32_value() << "\t";
709 
713  std::cout << myRecAttr[1]->aRef() << "\t";
714 
718  std::cout << myRecAttr[2]->aRef() << std::endl;
719 
725  } while((check = myScanOp->nextResult(false)) == 0);
726 
727  }
728  myNdb->closeTransaction(myTrans);
729  return 1;
730  }
731  return -1;
732 
733 }
734 
735 
736 int main(int argc, char** argv)
737 {
738  if (argc != 3)
739  {
740  std::cout << "Arguments are <socket mysqld> <connect_string cluster>.\n";
741  exit(-1);
742  }
743  char * mysqld_sock = argv[1];
744  const char *connectstring = argv[2];
745  ndb_init();
746  MYSQL mysql;
747 
748  /**************************************************************
749  * Connect to mysql server and create table *
750  **************************************************************/
751  {
752  if ( !mysql_init(&mysql) ) {
753  std::cout << "mysql_init failed\n";
754  exit(-1);
755  }
756  if ( !mysql_real_connect(&mysql, "localhost", "root", "", "",
757  0, mysqld_sock, 0) )
758  MYSQLERROR(mysql);
759 
760  mysql_query(&mysql, "CREATE DATABASE ndb_examples");
761  if (mysql_query(&mysql, "USE ndb_examples") != 0) MYSQLERROR(mysql);
762 
763  create_table(mysql);
764  }
765 
766  /**************************************************************
767  * Connect to ndb cluster *
768  **************************************************************/
769 
770  Ndb_cluster_connection cluster_connection(connectstring);
771  if (cluster_connection.connect(4, 5, 1))
772  {
773  std::cout << "Unable to connect to cluster within 30 secs." << std::endl;
774  exit(-1);
775  }
776  // Optionally connect and wait for the storage nodes (ndbd's)
777  if (cluster_connection.wait_until_ready(30,0) < 0)
778  {
779  std::cout << "Cluster was not ready within 30 secs.\n";
780  exit(-1);
781  }
782 
783  Ndb myNdb(&cluster_connection,"ndb_examples");
784  if (myNdb.init(1024) == -1) { // Set max 1024 parallel transactions
785  APIERROR(myNdb.getNdbError());
786  exit(-1);
787  }
788 
789  /*******************************************
790  * Check table definition *
791  *******************************************/
792  int column_color;
793  {
794  const NdbDictionary::Dictionary* myDict= myNdb.getDictionary();
795  const NdbDictionary::Table *t= myDict->getTable("api_scan");
796  if(t == NULL)
797  {
798  std::cout << "Dictionary::getTable() failed.";
799  exit(-1);
800  }
801  Car car;
802  if (t->getColumn("COLOR")->getLength() != sizeof(car.color) ||
803  t->getColumn("BRAND")->getLength() != sizeof(car.brand))
804  {
805  std::cout << "Wrong table definition" << std::endl;
806  exit(-1);
807  }
808  column_color= t->getColumn("COLOR")->getColumnNo();
809  }
810 
811  if(populate(&myNdb) > 0)
812  std::cout << "populate: Success!" << std::endl;
813 
814  if(scan_print(&myNdb) > 0)
815  std::cout << "scan_print: Success!" << std::endl << std::endl;
816 
817  std::cout << "Going to delete all pink cars!" << std::endl;
818 
819  {
823  Car tmp;
824  sprintf(tmp.color, "Pink");
825  if(scan_delete(&myNdb, column_color, tmp.color) > 0)
826  std::cout << "scan_delete: Success!" << std::endl << std::endl;
827  }
828 
829  if(scan_print(&myNdb) > 0)
830  std::cout << "scan_print: Success!" << std::endl << std::endl;
831 
832  {
836  Car tmp1, tmp2;
837  sprintf(tmp1.color, "Blue");
838  sprintf(tmp2.color, "Black");
839  std::cout << "Going to update all " << tmp1.color
840  << " cars to " << tmp2.color << " cars!" << std::endl;
841  if(scan_update(&myNdb, column_color, tmp1.color, tmp2.color) > 0)
842  std::cout << "scan_update: Success!" << std::endl << std::endl;
843  }
844  if(scan_print(&myNdb) > 0)
845  std::cout << "scan_print: Success!" << std::endl << std::endl;
846 
850  drop_table(mysql);
851 
852  return 0;
853 }