20 #include <mysqld_error.h> 
   25 #include <ndb_global.h> 
   29 #include "../../src/ndbapi/NdbQueryBuilder.hpp" 
   30 #include "../../src/ndbapi/NdbQueryOperation.hpp" 
   38 #define ASSERT_ALWAYS(cond) if(unlikely(!(cond))){abort();} 
   40 #define ASSERT_ALWAYS assert 
   47 #define PRINT_ERROR(code,msg) \ 
   48   std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \ 
   49             << ", code: " << code \ 
   50             << ", msg: " << msg << "." << std::endl 
   52 #define APIERROR(error) { \ 
   53   PRINT_ERROR((error).code,(error).message); \ 
   60 const char* databaseName = 
"PTDB";
 
   61 const char* tableName = 
"TT";
 
   73   bool m_useLinkedOperations;
 
   81 static void *callback(
void* thread);
 
  104   enum {State_Active, State_Stopping, State_Stopped} m_state;
 
  105   pthread_t m_posixThread;
 
  106   pthread_mutex_t m_mutex;  
 
  107   pthread_cond_t m_condition;
 
  117   void doLinkedAPITest();
 
  118   void doNonLinkedAPITest();
 
  122 static void *callback(
void* thread){
 
  127 static void printMySQLError(
MYSQL& mysql, 
const char* before=NULL){
 
  131   ndbout << mysql_error(&mysql) << endl;
 
  135 static void mySQLExec(
MYSQL& mysql, 
const char* stmt){
 
  137   if(mysql_query(&mysql, stmt) != 0){
 
  138     ndbout << 
"Error executing '" << stmt << 
"' : ";
 
  139     printMySQLError(mysql);
 
  141   mysql_free_result(mysql_use_result(&mysql));
 
  149   m_ndb(&con, databaseName),
 
  150   m_state(State_Active){
 
  151   ASSERT_ALWAYS(m_ndb.
init()==0);
 
  152   ASSERT_ALWAYS(pthread_mutex_init(&m_mutex, NULL)==0);
 
  153   ASSERT_ALWAYS(pthread_cond_init(&m_condition, NULL)==0);
 
  154   ASSERT_ALWAYS(pthread_create(&m_posixThread, NULL, 
callback, 
this)
 
  159   m_index = dict->
getIndex(
"PRIMARY", tableName);
 
  160   ASSERT_ALWAYS(m_index != NULL);
 
  164   ASSERT_ALWAYS(m_resultRec!=NULL);
 
  168   ASSERT_ALWAYS(col1 != NULL);
 
  173   m_keyRec = dict->createRecord(m_tab, &spec, 1, 
sizeof spec);
 
  174   ASSERT_ALWAYS(m_keyRec != NULL);
 
  177   ASSERT_ALWAYS(m_indexRec != NULL);
 
  180   ASSERT_ALWAYS(mysql_init(&m_mysql));
 
  181   if(!mysql_real_connect(&m_mysql, host, 
"root", 
"", 
"",
 
  183     printMySQLError(m_mysql, 
"mysql_real_connect() failed:");
 
  184     ASSERT_ALWAYS(
false);
 
  187   sprintf(text, 
"use %s", databaseName);
 
  188   mySQLExec(m_mysql, text);
 
  191 TestThread::~TestThread(){
 
  192   ASSERT_ALWAYS(pthread_mutex_lock(&m_mutex)==0);
 
  194   m_state = State_Stopping;
 
  195   ASSERT_ALWAYS(pthread_cond_signal(&m_condition)==0);
 
  197   while(m_state != State_Stopped){
 
  198     ASSERT_ALWAYS(pthread_cond_wait(&m_condition, &m_mutex)==0);
 
  200   ASSERT_ALWAYS(m_params == NULL);
 
  201   ASSERT_ALWAYS(pthread_mutex_unlock(&m_mutex)==0);
 
  203   ASSERT_ALWAYS(pthread_cond_destroy(&m_condition)==0);
 
  204   ASSERT_ALWAYS(pthread_mutex_destroy(&m_mutex)==0);
 
  208   ASSERT_ALWAYS(pthread_mutex_lock(&m_mutex)==0);
 
  209   ASSERT_ALWAYS(m_params == NULL);
 
  211   ASSERT_ALWAYS(pthread_cond_signal(&m_condition)==0);
 
  212   ASSERT_ALWAYS(pthread_mutex_unlock(&m_mutex)==0);
 
  215 void TestThread::run(){
 
  217   ASSERT_ALWAYS(pthread_mutex_lock(&m_mutex)==0);
 
  219     while(m_params==NULL && m_state==State_Active){
 
  221       ASSERT_ALWAYS(pthread_cond_wait(&m_condition, &m_mutex)==0);
 
  223     if(m_state != State_Active){
 
  225       ASSERT_ALWAYS(m_state == State_Stopping);
 
  226       m_state = State_Stopped;
 
  228       ASSERT_ALWAYS(pthread_cond_signal(&m_condition)==0);
 
  229       ASSERT_ALWAYS(pthread_mutex_unlock(&m_mutex)==0);
 
  237       if(m_params->m_useLinkedOperations){
 
  240         doNonLinkedAPITest();
 
  244     ASSERT_ALWAYS(m_params != NULL);
 
  246     ASSERT_ALWAYS(pthread_cond_signal(&m_condition)==0);
 
  250 void TestThread::doLinkedAPITest(){
 
  254   const Row** resultPtrs = 
new const Row*[m_params->
m_depth+1];
 
  258   for(
int iterNo = 0; iterNo<m_params->m_iterations; iterNo++){
 
  263       if(queryDef != NULL){
 
  267       if(m_params->m_scanLength==0){
 
  270           builder->constValue(0), 
 
  273         parentOpDef = builder->readTuple(m_tab, rootKey);
 
  274       }
else if(m_params->m_scanLength==1){ 
 
  276           builder->constValue(m_params->m_scanLength),
 
  281         parentOpDef = builder->scanIndex(m_index, m_tab, &eqBound);
 
  285           builder->constValue(m_params->m_scanLength),
 
  290         parentOpDef = builder->scanIndex(m_index, m_tab, &bound);
 
  296           builder->linkedValue(parentOpDef, 
"b"),
 
  299         parentOpDef = builder->readTuple(m_tab, key);
 
  301       queryDef = builder->prepare();
 
  310       query->getQueryOperation(
i)
 
  311         ->setResultRowRef(m_resultRec,
 
  312                           reinterpret_cast<const char*&>(resultPtrs[
i]),
 
  315     int res = trans->
execute(NoCommit);
 
  318     ASSERT_ALWAYS(res == 0);
 
  323       if(outcome ==  NdbQuery::NextResult_scanComplete){
 
  326       ASSERT_ALWAYS(outcome== NdbQuery::NextResult_gotRow);
 
  331     ASSERT_ALWAYS(cnt== MAX(1,m_params->m_scanLength));
 
  333     if ((iterNo % 5) == 0) {
 
  345 void TestThread::doNonLinkedAPITest(){
 
  348   for(
int iterNo = 0; iterNo<m_params->m_iterations; iterNo++){
 
  350     if(m_params->m_scanLength>0){
 
  351       const KeyRow highKey = { m_params->m_scanLength };
 
  353       if(m_params->m_scanLength==1){ 
 
  355           reinterpret_cast<const char*
>(&highKey),
 
  358           reinterpret_cast<const char*>(&highKey),
 
  367                            NdbOperation::LM_Dirty,
 
  376           reinterpret_cast<const char*
>(&highKey),
 
  385                            NdbOperation::LM_Dirty,
 
  389       ASSERT_ALWAYS(scanOp != NULL);
 
  391       ASSERT_ALWAYS(trans->
execute(NoCommit) == 0);
 
  396         const Row* scanRow = NULL;
 
  398           scanOp->nextResult(reinterpret_cast<const char**>(&scanRow), 
 
  404         ASSERT_ALWAYS(retVal== 0);
 
  410           const KeyRow key = {row.b};
 
  412             trans->readTuple(m_keyRec, 
 
  413                              reinterpret_cast<const char*>(&key),
 
  415                              reinterpret_cast<char*>(&row),
 
  416                              NdbOperation::LM_Dirty);
 
  417           ASSERT_ALWAYS(lookupOp != NULL);
 
  418           ASSERT_ALWAYS(trans->
execute(NoCommit) == 0);
 
  425       ASSERT_ALWAYS(cnt== m_params->m_scanLength);
 
  426       scanOp->close(
false,
true);
 
  430         const KeyRow key = {row.b};
 
  432           trans->readTuple(m_keyRec, 
 
  433                            reinterpret_cast<const char*>(&key),
 
  435                            reinterpret_cast<char*>(&row),
 
  436                            NdbOperation::LM_Dirty);
 
  437         ASSERT_ALWAYS(lookupOp != NULL);
 
  438         ASSERT_ALWAYS(trans->
execute(NoCommit) == 0);
 
  446 static bool printQuery = 
false;
 
  448 void TestThread::doSQLTest(){
 
  449   if(m_params->m_useLinkedOperations){
 
  450     mySQLExec(m_mysql, 
"set ndb_join_pushdown = on;");
 
  452     mySQLExec(m_mysql, 
"set ndb_join_pushdown = off;");
 
  454   mySQLExec(m_mysql, 
"SET SESSION query_cache_type = OFF");
 
  460     explicit TextBuf(){m_buffer[0] = 
'\0';}
 
  463     char* tail(){ 
return m_buffer + strlen(m_buffer);}
 
  468   sprintf(text.tail(), 
"select * from ");
 
  470     sprintf(text.tail(), 
"%s t%d", tableName, 
i);
 
  471     if(i < m_params->m_depth){
 
  472       sprintf(text.tail(), 
", ");
 
  474       sprintf(text.tail(), 
" where ");
 
  478   if(m_params->m_scanLength==0){
 
  480     sprintf(text.tail(), 
"t0.a=0 ");
 
  483     sprintf(text.tail(), 
"t0.a<%d ", m_params->m_scanLength);
 
  488     sprintf(text.tail(), 
"and t%d.b=t%d.a ", 
i-1, 
i);
 
  491     ndbout << text.m_buffer << endl;
 
  494   for(
int i = 0; 
i < m_params->m_iterations; 
i++){
 
  495     mySQLExec(m_mysql, text.m_buffer);
 
  500   ASSERT_ALWAYS(pthread_mutex_lock(&m_mutex)==0);
 
  501   while(m_params!=NULL){
 
  502     ASSERT_ALWAYS(pthread_cond_wait(&m_condition, &m_mutex)==0);
 
  504   ASSERT_ALWAYS(pthread_mutex_unlock(&m_mutex)==0);
 
  510 static void makeDatabase(
const char* host, 
int port, 
int rowCount){
 
  512   ASSERT_ALWAYS(mysql_init(&mysql));
 
  513   if(!mysql_real_connect(&mysql, host, 
"root", 
"", 
"",
 
  515     printMySQLError(mysql, 
"mysql_real_connect() failed:");
 
  516     ASSERT_ALWAYS(
false);
 
  519   sprintf(text, 
"create database if not exists %s", databaseName);
 
  520   mySQLExec(mysql, text);
 
  521   sprintf(text, 
"use %s", databaseName);
 
  522   mySQLExec(mysql, text);
 
  523   sprintf(text, 
"drop table if exists %s", tableName);
 
  524   mySQLExec(mysql, text);
 
  525   sprintf(text, 
"create table %s(a int not null,"  
  527           "primary key(a)) ENGINE=NDB", tableName);
 
  528   mySQLExec(mysql, text);
 
  529   for(
int i = 0; 
i<rowCount; 
i++){
 
  530     sprintf(text, 
"insert into %s values(%d, %d)", tableName, 
 
  532     mySQLExec(mysql, text);
 
  536 static void printHeading(){
 
  537   ndbout << endl << 
"Use SQL; Use linked; Thread count; Iterations; " 
  538     "Scan length; Depth; Def re-use; Duration (ms); Tuples per sec;" << endl;
 
  542 void runTest(
TestThread** threads, 
int threadCount, 
 
  545   const NDB_TICKS 
start = NdbTick_CurrentMillisecond();
 
  546   for(
int i = 0; 
i<threadCount; 
i++){
 
  549   for(
int i = 0; 
i<threadCount; 
i++){
 
  552   const NDB_TICKS duration = NdbTick_CurrentMillisecond() - 
start;
 
  554   ndbout << param.m_useLinkedOperations << 
"; ";
 
  555   ndbout << threadCount << 
"; ";
 
  556   ndbout << param.m_iterations << 
"; ";
 
  557   ndbout << param.m_scanLength << 
"; ";
 
  558   ndbout << param.
m_depth <<
"; ";
 
  560   ndbout << duration << 
"; ";
 
  565     if(param.m_scanLength==0){
 
  566       tupPerSec = threadCount * 
 
  568         (param.
m_depth+1) * 1000 / duration;
 
  570       tupPerSec = threadCount * 
 
  573         (param.
m_depth+1) * 1000 / duration;
 
  576   ndbout << tupPerSec << 
"; ";
 
  581 const int threadCount = 1;
 
  585   ndbout << endl << 
"warmUp()" << endl;
 
  588   param.m_iterations = 10;
 
  589   param.m_useLinkedOperations = 
false;
 
  590   param.m_scanLength = 0;
 
  594   for(
int i = 0; 
i<20; 
i++){
 
  596     runTest(threads, threadCount, param);
 
  599   param.m_useLinkedOperations = 
true;
 
  600   for(
int i = 0; 
i<20; 
i++){
 
  602     runTest(threads, threadCount, param);
 
  606 void testLookupDepth(
bool useSQL){
 
  607   ndbout << endl << 
"testLookupDepth()" << endl;
 
  610   param.m_iterations = 100;
 
  611   param.m_useLinkedOperations = 
false;
 
  612   param.m_scanLength = 0;
 
  616   for(
int i = 0; 
i<20; 
i++){
 
  618     runTest(threads, threadCount, param);
 
  621   param.m_useLinkedOperations = 
true;
 
  622   for(
int i = 0; 
i<20; 
i++){
 
  624     runTest(threads, threadCount, param);
 
  628 void testScanDepth(
int scanLength, 
bool useSQL){
 
  629   ndbout  << endl << 
"testScanDepth()" << endl;
 
  632   param.m_iterations = 20;
 
  633   param.m_useLinkedOperations = 
false;
 
  634   param.m_scanLength = scanLength;
 
  637   for(
int i = 0; 
i<10; 
i++){
 
  639     runTest(threads, threadCount, param);
 
  642   param.m_useLinkedOperations = 
true;
 
  643   for(
int i = 0; 
i<10; 
i++){
 
  645     runTest(threads, threadCount, param);
 
  649 int main(
int argc, 
char* argv[]){
 
  651   if(argc!=4 && argc!=5){
 
  652     ndbout << 
"Usage: " << argv[0] << 
" [--print-query]"  
  653            << 
" <mysql IP address> <mysql port> <cluster connect string>"  
  658   if(strcmp(argv[argno],
"--print-query")==0){
 
  662   const char* 
const host=argv[argno++];
 
  663   const int port = atoi(argv[argno++]);
 
  664   const char* 
const connectString = argv[argno];
 
  666   makeDatabase(host, port, 200);
 
  669     ASSERT_ALWAYS(con.
connect(12, 5, 1) == 0);
 
  672     const int threadCount = 1;
 
  674     for(
int i = 0; 
i<threadCount; 
i++){
 
  683     testScanDepth(50, 
true);
 
  684     testLookupDepth(
true);
 
  686     for(
int i = 0; 
i<threadCount; 
i++){