MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
spj_performance_test.cpp
1 /*
2  Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include <assert.h>
19 #include <mysql.h>
20 #include <mysqld_error.h>
21 
22 //#include <iostream>
23 //#include <stdio.h>
24 
25 #include <ndb_global.h>
26 #include <ndb_opts.h>
27 #include <NDBT.hpp>
28 #include <NdbApi.hpp>
29 #include "../../src/ndbapi/NdbQueryBuilder.hpp"
30 #include "../../src/ndbapi/NdbQueryOperation.hpp"
31 #include <pthread.h>
32 #include <NdbTick.h>
33 
34 
35 
36 #ifdef NDEBUG
37 // Some asserts have side effects, and there is no other error handling anyway.
38 #define ASSERT_ALWAYS(cond) if(unlikely(!(cond))){abort();}
39 #else
40 #define ASSERT_ALWAYS assert
41 #endif
42 
43 #if 0
44 
47 #define PRINT_ERROR(code,msg) \
48  std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
49  << ", code: " << code \
50  << ", msg: " << msg << "." << std::endl
51 
52 #define APIERROR(error) { \
53  PRINT_ERROR((error).code,(error).message); \
54  exit(-1); }
55 #endif
56 
57 
58 //const char* databaseName = "TEST_DB";
59 //const char* tableName = "T";
60 const char* databaseName = "PTDB";
61 const char* tableName = "TT";
62 
64 public:
65  int m_iterations;
67  int m_depth;
68  int m_scanLength; // m_scanLength==0 means root should be lookup.
73  bool m_useLinkedOperations;
75  bool m_useSQL;
76 
77  explicit TestParameters(){bzero(this, sizeof *this);}
78 };
79 
81 static void *callback(void* thread);
82 
83 class TestThread{
84  friend void *callback(void* thread);
85 public:
86  explicit TestThread(Ndb_cluster_connection& con, const char* host, int port);
87  ~TestThread();
89  void start(const TestParameters& params);
91  void wait();
92 private:
93  struct Row{
94  Uint32 a;
95  Uint32 b;
96  };
97 
98  struct KeyRow{
99  Uint32 a;
100  };
101 
102  const TestParameters* m_params;
103  Ndb m_ndb;
104  enum {State_Active, State_Stopping, State_Stopped} m_state;
105  pthread_t m_posixThread;
106  pthread_mutex_t m_mutex;
107  pthread_cond_t m_condition;
108  const NdbDictionary::Table* m_tab;
109  const NdbDictionary::Index* m_index;
110  const NdbRecord* m_resultRec;
111  const NdbRecord* m_keyRec;
112  const NdbRecord* m_indexRec;
113  MYSQL m_mysql;
114 
116  void run();
117  void doLinkedAPITest();
118  void doNonLinkedAPITest();
119  void doSQLTest();
120 };
121 
122 static void *callback(void* thread){
123  reinterpret_cast<TestThread*>(thread)->run();
124  return NULL;
125 }
126 
127 static void printMySQLError(MYSQL& mysql, const char* before=NULL){
128  if(before!=NULL){
129  ndbout << before;
130  }
131  ndbout << mysql_error(&mysql) << endl;
132  exit(-1);
133 }
134 
135 static void mySQLExec(MYSQL& mysql, const char* stmt){
136  //ndbout << stmt << endl;
137  if(mysql_query(&mysql, stmt) != 0){
138  ndbout << "Error executing '" << stmt << "' : ";
139  printMySQLError(mysql);
140  }
141  mysql_free_result(mysql_use_result(&mysql));
142 }
143 
144 // TestThread methods.
145 TestThread::TestThread(Ndb_cluster_connection& con,
146  const char* host,
147  int port):
148  m_params(NULL),
149  m_ndb(&con, databaseName),
150  m_state(State_Active){
151  ASSERT_ALWAYS(m_ndb.init()==0);
152  ASSERT_ALWAYS(pthread_mutex_init(&m_mutex, NULL)==0);
153  ASSERT_ALWAYS(pthread_cond_init(&m_condition, NULL)==0);
154  ASSERT_ALWAYS(pthread_create(&m_posixThread, NULL, callback, this)
155  ==0);
156  NdbDictionary::Dictionary* const dict = m_ndb.getDictionary();
157  m_tab = dict->getTable(tableName);
158 
159  m_index = dict->getIndex("PRIMARY", tableName);
160  ASSERT_ALWAYS(m_index != NULL);
161 
162  /* Create NdbRecord for row. */
163  m_resultRec = m_tab->getDefaultRecord();
164  ASSERT_ALWAYS(m_resultRec!=NULL);
165 
166  /* Create NdbRecord for primary key. */
167  const NdbDictionary::Column *col1= m_tab->getColumn("a");
168  ASSERT_ALWAYS(col1 != NULL);
170  col1, 0, 0, 0
171  };
172 
173  m_keyRec = dict->createRecord(m_tab, &spec, 1, sizeof spec);
174  ASSERT_ALWAYS(m_keyRec != NULL);
175 
176  m_indexRec = m_index->getDefaultRecord();
177  ASSERT_ALWAYS(m_indexRec != NULL);
178 
179  // Make SQL connection.
180  ASSERT_ALWAYS(mysql_init(&m_mysql));
181  if(!mysql_real_connect(&m_mysql, host, "root", "", "",
182  port, NULL, 0)){
183  printMySQLError(m_mysql, "mysql_real_connect() failed:");
184  ASSERT_ALWAYS(false);
185  }
186  char text[50];
187  sprintf(text, "use %s", databaseName);
188  mySQLExec(m_mysql, text);
189 }
190 
191 TestThread::~TestThread(){
192  ASSERT_ALWAYS(pthread_mutex_lock(&m_mutex)==0);
193  // Tell thread to stop.
194  m_state = State_Stopping;
195  ASSERT_ALWAYS(pthread_cond_signal(&m_condition)==0);
196  // Wait for thread to stop.
197  while(m_state != State_Stopped){
198  ASSERT_ALWAYS(pthread_cond_wait(&m_condition, &m_mutex)==0);
199  }
200  ASSERT_ALWAYS(m_params == NULL);
201  ASSERT_ALWAYS(pthread_mutex_unlock(&m_mutex)==0);
202 
203  ASSERT_ALWAYS(pthread_cond_destroy(&m_condition)==0);
204  ASSERT_ALWAYS(pthread_mutex_destroy(&m_mutex)==0);
205 }
206 
207 void TestThread::start(const TestParameters& params){
208  ASSERT_ALWAYS(pthread_mutex_lock(&m_mutex)==0);
209  ASSERT_ALWAYS(m_params == NULL);
210  m_params = &params;
211  ASSERT_ALWAYS(pthread_cond_signal(&m_condition)==0);
212  ASSERT_ALWAYS(pthread_mutex_unlock(&m_mutex)==0);
213 }
214 
215 void TestThread::run(){
216 
217  ASSERT_ALWAYS(pthread_mutex_lock(&m_mutex)==0);
218  while(true){
219  while(m_params==NULL && m_state==State_Active){
220  // Wait for a new command from master thread.
221  ASSERT_ALWAYS(pthread_cond_wait(&m_condition, &m_mutex)==0);
222  }
223  if(m_state != State_Active){
224  // We have been told to stop.
225  ASSERT_ALWAYS(m_state == State_Stopping);
226  m_state = State_Stopped;
227  // Wake up master thread and release lock.
228  ASSERT_ALWAYS(pthread_cond_signal(&m_condition)==0);
229  ASSERT_ALWAYS(pthread_mutex_unlock(&m_mutex)==0);
230  // Exit thread.
231  return;
232  }
233 
234  if(m_params->m_useSQL){
235  doSQLTest();
236  }else{
237  if(m_params->m_useLinkedOperations){
238  doLinkedAPITest();
239  }else{
240  doNonLinkedAPITest();
241  }
242  }
243 
244  ASSERT_ALWAYS(m_params != NULL);
245  m_params = NULL;
246  ASSERT_ALWAYS(pthread_cond_signal(&m_condition)==0);
247  }
248 }
249 
250 void TestThread::doLinkedAPITest(){
251  NdbQueryBuilder* const builder = NdbQueryBuilder::create();
252 
253  const NdbQueryDef* queryDef = NULL;
254  const Row** resultPtrs = new const Row*[m_params->m_depth+1];
255 
256  NdbTransaction* trans = NULL;
257 
258  for(int iterNo = 0; iterNo<m_params->m_iterations; iterNo++){
259  //ndbout << "Starting next iteration " << endl;
260  // Build query definition if needed.
261  if(iterNo==0 || (m_params->m_queryDefReuse>0 &&
262  iterNo%m_params->m_queryDefReuse==0)){
263  if(queryDef != NULL){
264  queryDef->destroy();
265  }
266  const NdbQueryOperationDef* parentOpDef = NULL;
267  if(m_params->m_scanLength==0){
268  // Root is lookup
269  const NdbQueryOperand* rootKey[] = {
270  builder->constValue(0), //a
271  NULL
272  };
273  parentOpDef = builder->readTuple(m_tab, rootKey);
274  }else if(m_params->m_scanLength==1){ //Pruned scan
275  const NdbQueryOperand* const key[] = {
276  builder->constValue(m_params->m_scanLength),
277  NULL
278  };
279 
280  const NdbQueryIndexBound eqBound(key);
281  parentOpDef = builder->scanIndex(m_index, m_tab, &eqBound);
282  }else{
283  // Root is index scan with single bound.
284  const NdbQueryOperand* const highKey[] = {
285  builder->constValue(m_params->m_scanLength),
286  NULL
287  };
288 
289  const NdbQueryIndexBound bound(NULL, false, highKey, false);
290  parentOpDef = builder->scanIndex(m_index, m_tab, &bound);
291  }
292 
293  // Add child lookup operations.
294  for(int i = 0; i<m_params->m_depth; i++){
295  const NdbQueryOperand* key[] = {
296  builder->linkedValue(parentOpDef, "b"),
297  NULL
298  };
299  parentOpDef = builder->readTuple(m_tab, key);
300  }
301  queryDef = builder->prepare();
302  }
303 
304  if (!trans) {
305  trans = m_ndb.startTransaction();
306  }
307  // Execute query.
308  NdbQuery* const query = trans->createQuery(queryDef);
309  for(int i = 0; i<m_params->m_depth+1; i++){
310  query->getQueryOperation(i)
311  ->setResultRowRef(m_resultRec,
312  reinterpret_cast<const char*&>(resultPtrs[i]),
313  NULL);
314  }
315  int res = trans->execute(NoCommit);
316  // if (res != 0)
317  // APIERROR(trans->getNdbError());
318  ASSERT_ALWAYS(res == 0);
319  int cnt=0;
320  while(true){
321  const NdbQuery::NextResultOutcome outcome
322  = query->nextResult(true, false);
323  if(outcome == NdbQuery::NextResult_scanComplete){
324  break;
325  }
326  ASSERT_ALWAYS(outcome== NdbQuery::NextResult_gotRow);
327  cnt++;
328  // if (m_params->m_scanLength==0)
329  // break;
330  }
331  ASSERT_ALWAYS(cnt== MAX(1,m_params->m_scanLength));
332  // query->close();
333  if ((iterNo % 5) == 0) {
334  m_ndb.closeTransaction(trans);
335  trans = NULL;
336  }
337  }
338  if (trans) {
339  m_ndb.closeTransaction(trans);
340  trans = NULL;
341  }
342  builder->destroy();
343 }
344 
345 void TestThread::doNonLinkedAPITest(){
346  Row row = {0, 0};
347  NdbTransaction* const trans = m_ndb.startTransaction();
348  for(int iterNo = 0; iterNo<m_params->m_iterations; iterNo++){
349  // NdbTransaction* const trans = m_ndb.startTransaction();
350  if(m_params->m_scanLength>0){
351  const KeyRow highKey = { m_params->m_scanLength };
352  NdbIndexScanOperation* scanOp = NULL;
353  if(m_params->m_scanLength==1){ // Pruned scan
354  const NdbIndexScanOperation::IndexBound bound = {
355  reinterpret_cast<const char*>(&highKey),
356  1, // Low key count.
357  true, // Low key inclusive
358  reinterpret_cast<const char*>(&highKey),
359  1, // High key count.
360  true, // High key inclusive.
361  0
362  };
363 
364  scanOp =
365  trans->scanIndex(m_indexRec,
366  m_resultRec,
367  NdbOperation::LM_Dirty,
368  NULL, // Result mask
369  &bound);
370  }else{
371  // Scan with upper bound only.
372  const NdbIndexScanOperation::IndexBound bound = {
373  NULL, // Low key
374  0, // Low key count.
375  false, // Low key inclusive
376  reinterpret_cast<const char*>(&highKey),
377  1, // High key count.
378  false, // High key inclusive.
379  0
380  };
381 
382  scanOp =
383  trans->scanIndex(m_indexRec,
384  m_resultRec,
385  NdbOperation::LM_Dirty,
386  NULL, // Result mask
387  &bound);
388  }
389  ASSERT_ALWAYS(scanOp != NULL);
390 
391  ASSERT_ALWAYS(trans->execute(NoCommit) == 0);
392 
393  // Iterate over scan result
394  int cnt = 0;
395  while(true){
396  const Row* scanRow = NULL;
397  const int retVal =
398  scanOp->nextResult(reinterpret_cast<const char**>(&scanRow),
399  true,
400  false);
401  if(retVal==1){
402  break;
403  }
404  ASSERT_ALWAYS(retVal== 0);
405  //ndbout << "ScanRow: " << scanRow->a << " " << scanRow->b << endl;
406  row = *scanRow;
407 
408  // Do a chain of lookups for each scan row.
409  for(int i = 0; i < m_params->m_depth; i++){
410  const KeyRow key = {row.b};
411  const NdbOperation* const lookupOp =
412  trans->readTuple(m_keyRec,
413  reinterpret_cast<const char*>(&key),
414  m_resultRec,
415  reinterpret_cast<char*>(&row),
416  NdbOperation::LM_Dirty);
417  ASSERT_ALWAYS(lookupOp != NULL);
418  ASSERT_ALWAYS(trans->execute(NoCommit) == 0);
419  //ndbout << "LookupRow: " << row.a << " " << row.b << endl;
420  }
421  cnt++;
422  // if (m_params->m_scanLength==0)
423  // break;
424  }
425  ASSERT_ALWAYS(cnt== m_params->m_scanLength);
426  scanOp->close(false,true);
427  }else{
428  // Root is lookup.
429  for(int i = 0; i < m_params->m_depth+1; i++){
430  const KeyRow key = {row.b};
431  const NdbOperation* const lookupOp =
432  trans->readTuple(m_keyRec,
433  reinterpret_cast<const char*>(&key),
434  m_resultRec,
435  reinterpret_cast<char*>(&row),
436  NdbOperation::LM_Dirty);
437  ASSERT_ALWAYS(lookupOp != NULL);
438  ASSERT_ALWAYS(trans->execute(NoCommit) == 0);
439  }
440  }//if(m_params->m_isScan)
441  // m_ndb.closeTransaction(trans);
442  }//for(int iterNo = 0; iterNo<m_params->m_iterations; iterNo++)
443  m_ndb.closeTransaction(trans);
444 }
445 
446 static bool printQuery = false;
447 
448 void TestThread::doSQLTest(){
449  if(m_params->m_useLinkedOperations){
450  mySQLExec(m_mysql, "set ndb_join_pushdown = on;");
451  }else{
452  mySQLExec(m_mysql, "set ndb_join_pushdown = off;");
453  }
454  mySQLExec(m_mysql, "SET SESSION query_cache_type = OFF");
455 
456  class TextBuf{
457  public:
458  char m_buffer[1000];
459 
460  explicit TextBuf(){m_buffer[0] = '\0';}
461 
462  // For appending to the string.
463  char* tail(){ return m_buffer + strlen(m_buffer);}
464  };
465 
466  TextBuf text;
467 
468  sprintf(text.tail(), "select * from ");
469  for(int i = 0; i<m_params->m_depth+1; i++){
470  sprintf(text.tail(), "%s t%d", tableName, i);
471  if(i < m_params->m_depth){
472  sprintf(text.tail(), ", ");
473  }else{
474  sprintf(text.tail(), " where ");
475  }
476  }
477 
478  if(m_params->m_scanLength==0){
479  // Root is lookup
480  sprintf(text.tail(), "t0.a=0 ");
481  }else{
482  // Root is scan.
483  sprintf(text.tail(), "t0.a<%d ", m_params->m_scanLength);
484  }
485 
486  for(int i = 1; i<m_params->m_depth+1; i++){
487  // Compare primary key of Tn to attribute of Tn-1.
488  sprintf(text.tail(), "and t%d.b=t%d.a ", i-1, i);
489  }
490  if(printQuery){
491  ndbout << text.m_buffer << endl;
492  }
493 
494  for(int i = 0; i < m_params->m_iterations; i++){
495  mySQLExec(m_mysql, text.m_buffer);
496  }
497 }
498 
500  ASSERT_ALWAYS(pthread_mutex_lock(&m_mutex)==0);
501  while(m_params!=NULL){
502  ASSERT_ALWAYS(pthread_cond_wait(&m_condition, &m_mutex)==0);
503  }
504  ASSERT_ALWAYS(pthread_mutex_unlock(&m_mutex)==0);
505 }
506 
507 
508 
509 
510 static void makeDatabase(const char* host, int port, int rowCount){
511  MYSQL mysql;
512  ASSERT_ALWAYS(mysql_init(&mysql));
513  if(!mysql_real_connect(&mysql, host, "root", "", "",
514  port, NULL, 0)){
515  printMySQLError(mysql, "mysql_real_connect() failed:");
516  ASSERT_ALWAYS(false);
517  }
518  char text[200];
519  sprintf(text, "create database if not exists %s", databaseName);
520  mySQLExec(mysql, text);
521  sprintf(text, "use %s", databaseName);
522  mySQLExec(mysql, text);
523  sprintf(text, "drop table if exists %s", tableName);
524  mySQLExec(mysql, text);
525  sprintf(text, "create table %s(a int not null,"
526  "b int not null,"
527  "primary key(a)) ENGINE=NDB", tableName);
528  mySQLExec(mysql, text);
529  for(int i = 0; i<rowCount; i++){
530  sprintf(text, "insert into %s values(%d, %d)", tableName,
531  i, (i+1)%rowCount);
532  mySQLExec(mysql, text);
533  }
534 }
535 
536 static void printHeading(){
537  ndbout << endl << "Use SQL; Use linked; Thread count; Iterations; "
538  "Scan length; Depth; Def re-use; Duration (ms); Tuples per sec;" << endl;
539 }
540 
541 
542 void runTest(TestThread** threads, int threadCount,
543  TestParameters& param){
544  //ndbout << "Doing test " << name << endl;
545  const NDB_TICKS start = NdbTick_CurrentMillisecond();
546  for(int i = 0; i<threadCount; i++){
547  threads[i]->start(param);
548  }
549  for(int i = 0; i<threadCount; i++){
550  threads[i]->wait();
551  }
552  const NDB_TICKS duration = NdbTick_CurrentMillisecond() - start;
553  ndbout << param.m_useSQL << "; ";
554  ndbout << param.m_useLinkedOperations << "; ";
555  ndbout << threadCount << "; ";
556  ndbout << param.m_iterations << "; ";
557  ndbout << param.m_scanLength << "; ";
558  ndbout << param.m_depth <<"; ";
559  ndbout << param.m_queryDefReuse << "; ";
560  ndbout << duration << "; ";
561  int tupPerSec;
562  if(duration==0){
563  tupPerSec = -1;
564  }else{
565  if(param.m_scanLength==0){
566  tupPerSec = threadCount *
567  param.m_iterations *
568  (param.m_depth+1) * 1000 / duration;
569  }else{
570  tupPerSec = threadCount *
571  param.m_iterations *
572  param.m_scanLength *
573  (param.m_depth+1) * 1000 / duration;
574  }
575  }
576  ndbout << tupPerSec << "; ";
577  ndbout << endl;
578  //ndbout << "Test " << name << " done in " << duration << "ms"<< endl;
579 }
580 
581 const int threadCount = 1;
582 TestThread** threads = NULL;
583 
584 void warmUp(){
585  ndbout << endl << "warmUp()" << endl;
586  TestParameters param;
587  param.m_useSQL = true;
588  param.m_iterations = 10;
589  param.m_useLinkedOperations = false;
590  param.m_scanLength = 0;
591  param.m_queryDefReuse = 0;
592 
593  printHeading();
594  for(int i = 0; i<20; i++){
595  param.m_depth = i;
596  runTest(threads, threadCount, param);
597  }
598  printHeading();
599  param.m_useLinkedOperations = true;
600  for(int i = 0; i<20; i++){
601  param.m_depth = i;
602  runTest(threads, threadCount, param);
603  }
604 }
605 
606 void testLookupDepth(bool useSQL){
607  ndbout << endl << "testLookupDepth()" << endl;
608  TestParameters param;
609  param.m_useSQL = useSQL;
610  param.m_iterations = 100;
611  param.m_useLinkedOperations = false;
612  param.m_scanLength = 0;
613  param.m_queryDefReuse = 0;
614 
615  printHeading();
616  for(int i = 0; i<20; i++){
617  param.m_depth = i;
618  runTest(threads, threadCount, param);
619  }
620  printHeading();
621  param.m_useLinkedOperations = true;
622  for(int i = 0; i<20; i++){
623  param.m_depth = i;
624  runTest(threads, threadCount, param);
625  }
626 }
627 
628 void testScanDepth(int scanLength, bool useSQL){
629  ndbout << endl << "testScanDepth()" << endl;
630  TestParameters param;
631  param.m_useSQL = useSQL;
632  param.m_iterations = 20;
633  param.m_useLinkedOperations = false;
634  param.m_scanLength = scanLength;
635  param.m_queryDefReuse = 0;
636  printHeading();
637  for(int i = 0; i<10; i++){
638  param.m_depth = i;
639  runTest(threads, threadCount, param);
640  }
641  printHeading();
642  param.m_useLinkedOperations = true;
643  for(int i = 0; i<10; i++){
644  param.m_depth = i;
645  runTest(threads, threadCount, param);
646  }
647 }
648 
649 int main(int argc, char* argv[]){
650  NDB_INIT(argv[0]);
651  if(argc!=4 && argc!=5){
652  ndbout << "Usage: " << argv[0] << " [--print-query]"
653  << " <mysql IP address> <mysql port> <cluster connect string>"
654  << endl;
655  return -1;
656  }
657  int argno = 1;
658  if(strcmp(argv[argno],"--print-query")==0){
659  printQuery = true;
660  argno++;
661  }
662  const char* const host=argv[argno++];
663  const int port = atoi(argv[argno++]);
664  const char* const connectString = argv[argno];
665 
666  makeDatabase(host, port, 200);
667  {
668  Ndb_cluster_connection con(connectString);
669  ASSERT_ALWAYS(con.connect(12, 5, 1) == 0);
670  ASSERT_ALWAYS(con.wait_until_ready(30,30) == 0);
671 
672  const int threadCount = 1;
673  threads = new TestThread*[threadCount];
674  for(int i = 0; i<threadCount; i++){
675  threads[i] = new TestThread(con, host, port);
676  }
677  sleep(1);
678 
679  //testScanDepth(1);
680  //testScanDepth(2);
681  //testScanDepth(5);
682  warmUp();
683  testScanDepth(50, true);
684  testLookupDepth(true);
685 
686  for(int i = 0; i<threadCount; i++){
687  delete threads[i];
688  }
689  delete[] threads;
690  } // Must call ~Ndb_cluster_connection() before ndb_end().
691  ndb_end(0);
692  return 0;
693 }
694