MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
testScanFilter.cpp
1 /*
2  Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include <NDBT.hpp>
19 #include <NDBT_Test.hpp>
20 
21 #define ERR_EXIT(obj, msg) \
22 do \
23 { \
24 fprintf(stderr, "%s: %s (%d) in %s:%d\n", \
25 msg, obj->getNdbError().message, obj->getNdbError().code, __FILE__, __LINE__); \
26 exit(-1); \
27 } \
28 while (0);
29 
30 #define PRINT_ERROR(code,msg) \
31 do \
32 { \
33 fprintf(stderr, "Error in %s, line: %d, code: %d, msg: %s.\n", __FILE__, __LINE__, code, msg); \
34 } \
35 while (0);
36 
37 #define MYSQLERROR(mysql) { \
38  PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \
39  exit(-1); }
40 #define APIERROR(error) { \
41  PRINT_ERROR(error.code,error.message); \
42  exit(-1); }
43 
44 #define TEST_NAME "TestScanFilter"
45 #define TABLE_NAME "TABLE_SCAN"
46 
47 const char *COL_NAME[] = {"id", "i", "j", "k", "l", "m", "n"};
48 const char COL_LEN = 7;
49 /*
50 * Not to change TUPLE_NUM, because the column in TABLE_NAME is fixed,
51 * there are six columns, 'i', 'j', 'k', 'l', 'm', 'n', and each on is equal to 1 or 1,
52 * Since each tuple should be unique in this case, then TUPLE_NUM = 2 power 6 = 64
53 */
54 const int TUPLE_NUM = 1 << (COL_LEN - 1);
55 
56 /*
57 * the recursive level of random scan filter, can
58 * modify this parameter more or less, range from
59 * 1 to 100, larger num consumes more scan time
60 */
61 const int RECURSIVE_LEVEL = 10;
62 
63 const int MAX_STR_LEN = (RECURSIVE_LEVEL * (COL_LEN+1) * 4);
64 
65 /*
66 * Each time stands for one test, it will produce a random
67 * filter string, and scan through ndb api and through
68 * calculation with tuples' data, then compare the result,
69 * if they are equal, this test passed, or failed.
70 * Only all TEST_NUM times tests passed, we can believe
71 * the suite of test cases are okay.
72 * Change TEST_NUM to larger will need more time to test
73 */
74 const int TEST_NUM = 5000;
75 
76 
77 /* Table definition*/
78 static
79 const
80 NDBT_Attribute MYTAB1Attribs[] = {
88 };
89 static
90 const
91 NDBT_Table MYTAB1(TABLE_NAME, sizeof(MYTAB1Attribs)/sizeof(NDBT_Attribute), MYTAB1Attribs);
92 
93 static
94 const
95 NDBT_Attribute MYTAB2Attribs[] = {
97  // _pk _nullable
98  NDBT_Attribute("1bitnn", NdbDictionary::Column::Bit, 1, false, false),
99  NDBT_Attribute("1bitnu", NdbDictionary::Column::Bit, 1, false, true),
100  NDBT_Attribute("2bitnn", NdbDictionary::Column::Bit, 2, false, false),
101  NDBT_Attribute("2bitnu", NdbDictionary::Column::Bit, 2, false, true),
102  NDBT_Attribute("7bitnn", NdbDictionary::Column::Bit, 7, false, false),
103  NDBT_Attribute("7bitnu", NdbDictionary::Column::Bit, 7, false, true),
104  NDBT_Attribute("8bitnn", NdbDictionary::Column::Bit, 8, false, false),
105  NDBT_Attribute("8bitnu", NdbDictionary::Column::Bit, 8, false, true),
106  NDBT_Attribute("15bitnn", NdbDictionary::Column::Bit, 15, false, false),
107  NDBT_Attribute("15bitnu", NdbDictionary::Column::Bit, 15, false, true),
108  NDBT_Attribute("31bitnn", NdbDictionary::Column::Bit, 31, false, false),
109  NDBT_Attribute("31bitnu", NdbDictionary::Column::Bit, 31, false, true),
110  NDBT_Attribute("32bitnn", NdbDictionary::Column::Bit, 32, false, false),
111  NDBT_Attribute("32bitnu", NdbDictionary::Column::Bit, 32, false, true),
112  NDBT_Attribute("33bitnn", NdbDictionary::Column::Bit, 33, false, false),
113  NDBT_Attribute("33bitnu", NdbDictionary::Column::Bit, 33, false, true),
114  NDBT_Attribute("63bitnn", NdbDictionary::Column::Bit, 63, false, false),
115  NDBT_Attribute("63bitnu", NdbDictionary::Column::Bit, 63, false, true),
116  NDBT_Attribute("64bitnn", NdbDictionary::Column::Bit, 64, false, false),
117  NDBT_Attribute("64bitnu", NdbDictionary::Column::Bit, 64, false, true),
118  NDBT_Attribute("65bitnn", NdbDictionary::Column::Bit, 65, false, false),
119  NDBT_Attribute("65bitnu", NdbDictionary::Column::Bit, 65, false, true),
120  NDBT_Attribute("127bitnn", NdbDictionary::Column::Bit, 127, false, false),
121  NDBT_Attribute("127bitnu", NdbDictionary::Column::Bit, 127, false, true),
122  NDBT_Attribute("513bitnn", NdbDictionary::Column::Bit, 513, false, false),
123  NDBT_Attribute("513bitnu", NdbDictionary::Column::Bit, 513, false, true)
124 };
125 
126 static const char* TABLE2_NAME= "MyTab2";
127 
128 static
129 const
130 NDBT_Table MYTAB2(TABLE2_NAME, sizeof(MYTAB2Attribs)/sizeof(NDBT_Attribute), MYTAB2Attribs);
131 
132 static const int NUM_COLS= sizeof(MYTAB2Attribs)/sizeof(NDBT_Attribute);
133 static const int MAX_BIT_WIDTH= 513;
134 /* One extra row for all bits == 0 */
135 static const int TOTAL_ROWS= MAX_BIT_WIDTH + 1;
136 
137 int createTable(Ndb* pNdb, const NdbDictionary::Table* tab, bool _temp,
138  bool existsOk, NDBT_CreateTableHook f)
139 {
140  int r = 0;
141  do{
142  NdbDictionary::Table tmpTab(* tab);
143  tmpTab.setStoredTable(_temp ? 0 : 1);
144  if(f != 0 && f(pNdb, tmpTab, 0, NULL))
145  {
146  ndbout << "Failed to create table" << endl;
147  return NDBT_FAILED;
148  }
149  r = pNdb->getDictionary()->createTable(tmpTab);
150  if(r == -1){
151  if(!existsOk){
152  ndbout << "Error: " << pNdb->getDictionary()->getNdbError() << endl;
153  break;
154  }
155  if(pNdb->getDictionary()->getNdbError().code != 721){
156  ndbout << "Error: " << pNdb->getDictionary()->getNdbError() << endl;
157  break;
158  }
159  r = 0;
160  }
161  }while(false);
162 
163  return r;
164 }
165 
166 /*
167 * Function to produce the tuples' data
168 */
169 int runPopulate(NDBT_Context* ctx, NDBT_Step* step)
170 {
171  Ndb *myNdb = GETNDB(step);
172  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
173  const NdbDictionary::Table *myTable= myDict->getTable(TABLE_NAME);
174  if(myTable == NULL)
175  APIERROR(myDict->getNdbError());
176 
177  NdbTransaction* myTrans = myNdb->startTransaction();
178  if (myTrans == NULL)
179  APIERROR(myNdb->getNdbError());
180 
181  for(int num = 0; num < TUPLE_NUM; num++)
182  {
183  NdbOperation* myNdbOperation = myTrans->getNdbOperation(myTable);
184  if(myNdbOperation == NULL)
185  {
186  APIERROR(myTrans->getNdbError());
187  }
188 
189 /* the tuples' data in TABLE_NAME
190 +----+---+---+---+---+---+---+
191 | id | i | j | k | l | m | n |
192 +----+---+---+---+---+---+---+
193 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
194 | 1 | 0 | 0 | 0 | 0 | 0 | 1 |
195 | 2 | 0 | 0 | 0 | 0 | 1 | 0 |
196 | 3 | 0 | 0 | 0 | 0 | 1 | 1 |
197 | 4 | 0 | 0 | 0 | 1 | 0 | 0 |
198 | 5 | 0 | 0 | 0 | 1 | 0 | 1 |
199 | 6 | 0 | 0 | 0 | 1 | 1 | 0 |
200 | 7 | 0 | 0 | 0 | 1 | 1 | 1 |
201 | 8 | 0 | 0 | 1 | 0 | 0 | 0 |
202 | 9 | 0 | 0 | 1 | 0 | 0 | 1 |
203 | 10 | 0 | 0 | 1 | 0 | 1 | 0 |
204 | 11 | 0 | 0 | 1 | 0 | 1 | 1 |
205 | 12 | 0 | 0 | 1 | 1 | 0 | 0 |
206 | 13 | 0 | 0 | 1 | 1 | 0 | 1 |
207 | 14 | 0 | 0 | 1 | 1 | 1 | 0 |
208 | 15 | 0 | 0 | 1 | 1 | 1 | 1 |
209 | 16 | 0 | 1 | 0 | 0 | 0 | 0 |
210 | 17 | 0 | 1 | 0 | 0 | 0 | 1 |
211 | 18 | 0 | 1 | 0 | 0 | 1 | 0 |
212 | 19 | 0 | 1 | 0 | 0 | 1 | 1 |
213 | 20 | 0 | 1 | 0 | 1 | 0 | 0 |
214 | 21 | 0 | 1 | 0 | 1 | 0 | 1 |
215 | 22 | 0 | 1 | 0 | 1 | 1 | 0 |
216 | 23 | 0 | 1 | 0 | 1 | 1 | 1 |
217 | 24 | 0 | 1 | 1 | 0 | 0 | 0 |
218 | 25 | 0 | 1 | 1 | 0 | 0 | 1 |
219 | 26 | 0 | 1 | 1 | 0 | 1 | 0 |
220 | 27 | 0 | 1 | 1 | 0 | 1 | 1 |
221 | 28 | 0 | 1 | 1 | 1 | 0 | 0 |
222 | 29 | 0 | 1 | 1 | 1 | 0 | 1 |
223 | 30 | 0 | 1 | 1 | 1 | 1 | 0 |
224 | 31 | 0 | 1 | 1 | 1 | 1 | 1 |
225 | 32 | 1 | 0 | 0 | 0 | 0 | 0 |
226 | 33 | 1 | 0 | 0 | 0 | 0 | 1 |
227 | 34 | 1 | 0 | 0 | 0 | 1 | 0 |
228 | 35 | 1 | 0 | 0 | 0 | 1 | 1 |
229 | 36 | 1 | 0 | 0 | 1 | 0 | 0 |
230 | 37 | 1 | 0 | 0 | 1 | 0 | 1 |
231 | 38 | 1 | 0 | 0 | 1 | 1 | 0 |
232 | 39 | 1 | 0 | 0 | 1 | 1 | 1 |
233 | 40 | 1 | 0 | 1 | 0 | 0 | 0 |
234 | 41 | 1 | 0 | 1 | 0 | 0 | 1 |
235 | 42 | 1 | 0 | 1 | 0 | 1 | 0 |
236 | 43 | 1 | 0 | 1 | 0 | 1 | 1 |
237 | 44 | 1 | 0 | 1 | 1 | 0 | 0 |
238 | 45 | 1 | 0 | 1 | 1 | 0 | 1 |
239 | 46 | 1 | 0 | 1 | 1 | 1 | 0 |
240 | 47 | 1 | 0 | 1 | 1 | 1 | 1 |
241 | 48 | 1 | 1 | 0 | 0 | 0 | 0 |
242 | 49 | 1 | 1 | 0 | 0 | 0 | 1 |
243 | 50 | 1 | 1 | 0 | 0 | 1 | 0 |
244 | 51 | 1 | 1 | 0 | 0 | 1 | 1 |
245 | 52 | 1 | 1 | 0 | 1 | 0 | 0 |
246 | 53 | 1 | 1 | 0 | 1 | 0 | 1 |
247 | 54 | 1 | 1 | 0 | 1 | 1 | 0 |
248 | 55 | 1 | 1 | 0 | 1 | 1 | 1 |
249 | 56 | 1 | 1 | 1 | 0 | 0 | 0 |
250 | 57 | 1 | 1 | 1 | 0 | 0 | 1 |
251 | 58 | 1 | 1 | 1 | 0 | 1 | 0 |
252 | 59 | 1 | 1 | 1 | 0 | 1 | 1 |
253 | 60 | 1 | 1 | 1 | 1 | 0 | 0 |
254 | 61 | 1 | 1 | 1 | 1 | 0 | 1 |
255 | 62 | 1 | 1 | 1 | 1 | 1 | 0 |
256 | 63 | 1 | 1 | 1 | 1 | 1 | 1 |
257 +----+---+---+---+---+---+---+
258 */
259  myNdbOperation->insertTuple();
260  myNdbOperation->equal(COL_NAME[0], num);
261  for(int col = 1; col < COL_LEN; col++)
262  {
263  myNdbOperation->setValue(COL_NAME[col], (num>>(COL_LEN-1-col))&1);
264  }
265  }
266 
267  int check = myTrans->execute(NdbTransaction::Commit);
268 
269  myTrans->close();
270 
271  if (check == -1)
272  return NDBT_FAILED;
273  else
274  return NDBT_OK;
275 
276 }
277 
278 
279 
280 /*
281 * a=AND, o=OR, A=NAND, O=NOR
282 */
283 char op_string[] = "aoAO";
284 /*
285 * the six columns' name of test table
286 */
287 char col_string[] = "ijklmn";
288 const int op_len = strlen(op_string);
289 const int col_len = strlen(col_string);
290 
291 /*
292 * get a random op from "aoAO"
293 */
294 int get_rand_op_ch(char *ch)
295 {
296  static unsigned int num = 0;
297  if(++num == 0)
298  num = 1;
299  srand(num*(unsigned int)time(NULL));
300  *ch = op_string[rand() % op_len];
301  return 1;
302 }
303 
304 /*
305 * get a random order form of "ijklmn" trough exchanging letter
306 */
307 void change_col_order()
308 {
309  int pos1,pos2;
310  char temp;
311  for (int i = 0; i < 10; i++) //exchange for 10 times
312  {
313  srand((unsigned int)time(NULL)/(i+1));
314  pos1 = rand() % col_len;
315  srand((i+1)*(unsigned int)time(NULL));
316  pos2 = rand() % col_len;
317  if (pos1 == pos2)
318  continue;
319  temp = col_string[pos1];
320  col_string[pos1] = col_string[pos2];
321  col_string[pos2] = temp;
322  }
323 }
324 
325 /*
326 * get a random sub string of "ijklmn"
327 */
328 int get_rand_col_str(char *str)
329 {
330  int len;
331  static unsigned int num = 0;
332  if(++num == 0)
333  num = 1;
334  srand(num*(unsigned int)time(NULL));
335  len = rand() % col_len + 1;
336  change_col_order();
337  BaseString::snprintf(str, len+1, "%s", col_string); //len+1, including '\0'
338  return len;
339 }
340 
341 /*
342 * get a random string including operation and column
343 * eg, Alnikx
344 */
345 int get_rand_op_str(char *str)
346 {
347  char temp[256];
348  int len1, len2, len;
349  len1 = get_rand_op_ch(temp);
350  len2 = get_rand_col_str(temp+len1);
351  len = len1 + len2;
352  temp[len] = 'x';
353  temp[len+1] = '\0'; // Add ending \0
354  BaseString::snprintf(str, len+1+1, "%s", temp); //len+1, including '\0'
355  return len+1;
356 }
357 
358 /*
359 * replace a letter of source string with a new string
360 * e.g., source string: 'Aijkx', replace i with new string 'olmx'
361 * then source string is changed to 'Aolmxjkx'
362 * source: its format should be produced from get_rand_op_str()
363 * pos: range from 1 to strlen(source)-2
364 */
365 int replace_a_to_str(char *source, int pos, char *newstr)
366 {
367  char temp[MAX_STR_LEN];
368  BaseString::snprintf(temp, pos+1, "%s", source);
369  BaseString::snprintf(temp+pos, strlen(newstr)+1, "%s", newstr);
370  BaseString::snprintf(temp+pos+strlen(newstr), strlen(source)-pos, "%s", source+pos+1);
371  BaseString::snprintf(source, strlen(temp)+1, "%s", temp);
372  return strlen(source);
373 }
374 
375 /*
376 * check whether the inputed char is an operation
377 */
378 bool check_op(char ch)
379 {
380  if( ch == 'a' || ch == 'A' || ch == 'o' || ch == 'O')
381  return true;
382  else
383  return false;
384 }
385 
386 /*
387 * check whether the inputed char is end flag
388 */
389 bool check_end(char ch)
390 {
391  return (ch == 'x');
392 }
393 
394 /*
395 * check whether the inputed char is end flag
396 */
397 bool check_col(char ch)
398 {
399  if( ch == 'i' || ch == 'j' || ch == 'k'
400  || ch == 'l' || ch == 'm' || ch == 'n' )
401  return true;
402  else
403  return false;
404 }
405 
406 /*
407 * To ensure we can get a random string with RECURSIVE_LEVEL,
408 * we need a position where can replace a letter with a new string.
409 */
410 int get_rand_replace_pos(char *str, int len)
411 {
412  int pos_op = 0;
413  int pos_x = 0;
414  int pos_col = 0;
415  int span = 0;
416  static int num = 0;
417  char temp;
418 
419  for(int i = 0; i < len; i++)
420  {
421  temp = str[i];
422  if(! check_end(temp))
423  {
424  if(check_op(temp))
425  pos_op = i;
426  }
427  else
428  {
429  pos_x = i;
430  break;
431  }
432  }
433 
434  if(++num == 0)
435  num = 1;
436 
437  span = pos_x - pos_op - 1;
438  if(span <= 1)
439  {
440  pos_col = pos_op + 1;
441  }
442  else
443  {
444  srand(num*(unsigned int)time(NULL));
445  pos_col = pos_op + rand() % span + 1;
446  }
447  return pos_col;
448 }
449 
450 /*
451 * Check whether the given random string is valid
452 * and applicable for this test case
453 */
454 bool check_random_str(char *str)
455 {
456  char *p;
457  int op_num = 0;
458  int end_num = 0;
459 
460  for(p = str; *p; p++)
461  {
462  bool tmp1 = false, tmp2 = false;
463  if ((tmp1 = check_op(*p)))
464  op_num++;
465  if ((tmp2 = check_end(*p)))
466  end_num++;
467  if (!(tmp1 || tmp2 || check_col(*p))) //there are illegal letters
468  return false;
469  }
470 
471  if(op_num != end_num) //begins are not equal to ends
472  return false;
473 
474  return true;
475 }
476 
477 /*
478 * Get a random string with RECURSIVE_LEVEL
479 */
480 void get_rand_op_str_compound(char *str)
481 {
482  char small_str[256];
483  int pos;
484  int tmp;
485  int level;
486  static int num = 0;
487 
488  if(++num == 0)
489  num = 1;
490 
491  srand(num*(unsigned int)time(NULL));
492  level = 1 + rand() % RECURSIVE_LEVEL;
493 
494  get_rand_op_str(str);
495 
496  for(int i = 0; i < level; i++)
497  {
498  get_rand_op_str(small_str);
499  tmp = strlen(small_str);
500  get_rand_op_str(small_str + tmp); //get two operations
501  pos = get_rand_replace_pos(str, strlen(str));
502  replace_a_to_str(str, pos, small_str);
503  }
504 
505  //check the random string
506  if(!check_random_str(str))
507  {
508  fprintf(stderr, "Error random string! \n");
509  exit(-1);
510  }
511 }
512 
513 /*
514 * get column id of i,j,k,l,m,n
515 */
516 int get_column_id(char ch)
517 {
518  return (ch - 'i' + 1); //from 1 to 6
519 }
520 
521 /*
522 * check whether column value of the NO. tuple is equal to 1
523 * col_id: column id, range from 1 to 6
524 * tuple_no: record NO., range from 0 to 63
525 */
526 bool check_col_equal_one(int tuple_no, int col_id)
527 {
528  int i = 1 << (6 - col_id);
529  int j = tuple_no / i;
530  if(j % 2)
531  return true;
532  else
533  return false;
534 }
535 
536 /*
537 * get a result after all elements in the array with AND
538 * value: pointer to a bool array
539 * len: length of the bool array
540 */
541 bool AND_op(bool *value, int len)
542 {
543  for(int i = 0; i < len; i++)
544  {
545  if(! value[i])
546  return false;
547  }
548  return true;
549 }
550 
551 /*
552 * get a result after all elements in the array with OR
553 * value: pointer to a bool array
554 * len: length of the bool array
555 */
556 bool OR_op(bool *value, int len)
557 {
558  for(int i = 0; i < len; i++)
559  {
560  if(value[i])
561  return true;
562  }
563  return false;
564 }
565 
566 /*
567 * get a result after all elements in the array with NAND
568 * value: pointer to a bool array
569 * len: length of the bool array
570 */
571 bool NAND_op(bool *value, int len)
572 {
573  return (! AND_op(value, len));
574 }
575 
576 /*
577 * get a result after all elements in the array with NOR
578 * value: pointer to a bool array
579 * len: length of the bool array
580 */
581 bool NOR_op(bool *value, int len)
582 {
583  return (! OR_op(value, len));
584 }
585 
586 /*
587 * AND/NAND/OR/NOR operation for a bool array
588 */
589 bool calculate_one_op(char op_type, bool *value, int len)
590 {
591  switch(op_type)
592  {
593  case 'a':
594  return AND_op(value, len);
595  break;
596  case 'o':
597  return OR_op(value, len);
598  break;
599  case 'A':
600  return NAND_op(value, len);
601  break;
602  case 'O':
603  return NOR_op(value, len);
604  break;
605  }
606  return false; //make gcc happy
607 }
608 
609 typedef struct _stack_element
610 {
611  char type;
612  int num;
614 
615 /*
616 * stack_op, store info for AND,OR,NAND,NOR
617 * stack_col, store value of column(i,j,k,l,m,n) and temporary result for an operation
618 */
619 stack_element stack_op[RECURSIVE_LEVEL * COL_LEN];
620 bool stack_col[RECURSIVE_LEVEL * COL_LEN * 2];
621 
622 /*
623 * check whether the given tuple is chosen by judgement condition
624 * tuple_no, the NO of tuple in TABLE_NAME, range from 0 to TUPLE_NUM
625 * str: a random string of scan opearation and condition
626 * len: length of str
627 */
628 bool check_one_tuple(int tuple_no, char *str, int len)
629 {
630  int pop_op = 0;
631  int pop_col = 0;
632  for(int i = 0; i < len; i++)
633  {
634  char letter = *(str + i);
635  if(check_op(letter)) //push
636  {
637  stack_op[pop_op].type = letter;
638  stack_op[pop_op].num = 0;
639  pop_op++;
640  }
641  if(check_col(letter)) //push
642  {
643  stack_col[pop_col] = check_col_equal_one(tuple_no, get_column_id(letter));
644  pop_col++;
645  stack_op[pop_op-1].num += 1;
646  }
647  if(check_end(letter))
648  {
649  if(pop_op <= 1)
650  {
651  return calculate_one_op(stack_op[pop_op-1].type,
652  stack_col,
653  stack_op[pop_op-1].num);
654  }
655  else
656  {
657  bool tmp1 = calculate_one_op(stack_op[pop_op-1].type,
658  stack_col + pop_col - stack_op[pop_op-1].num,
659  stack_op[pop_op-1].num);
660  pop_col -= stack_op[pop_op-1].num; //pop
661  pop_op--;
662  stack_col[pop_col] = tmp1; //push
663  pop_col++;
664  stack_op[pop_op-1].num += 1;
665  }
666  }
667  }
668  return false; //make gcc happy
669 }
670 
671 /*
672 * get lists of tuples which match the scan condiction through calculating
673 * str: a random string of scan opearation and condition
674 */
675 void check_all_tuples(char *str, bool *res)
676 {
677  for (int i = 0; i < TUPLE_NUM; i++)
678  {
679  if(check_one_tuple(i, str, strlen(str)))
680  res[i] = true;
681  }
682 }
683 
684 /*
685 * convert a letter to group number what ndbapi need
686 */
687 NdbScanFilter::Group get_api_group(char op_name)
688 {
689  switch (op_name) {
690  case 'a': return NdbScanFilter::AND;
691  case 'o': return NdbScanFilter::OR;
692  case 'A': return NdbScanFilter::NAND;
693  case 'O': return NdbScanFilter::NOR;
694  default:
695  fprintf(stderr, "Invalid group name %c !\n", op_name);
696  exit(3);
697  }
698 }
699 
700 /*
701 * with ndbapi, call begin, eq/ne/lt/gt/le/ge..., end
702 */
703 NdbScanFilter * call_ndbapi(char *str, NdbTransaction *transaction,
704  NdbScanOperation *scan, NdbDictionary::Column const *col[])
705 {
706  NdbScanFilter *scanfilter = new NdbScanFilter(scan);
707  char *p;
708 
709  for (p = str; *p; p++)
710  {
711  if(check_op(*p))
712  {
713  if(scanfilter->begin(get_api_group(*p)))
714  ERR_EXIT(transaction, "filter begin() failed");
715  }
716  if(check_col(*p))
717  {
718  if(scanfilter->eq(col[*p-'i'+1]->getColumnNo(), (Uint32)1))
719  ERR_EXIT(transaction, "filter eq() failed");
720  }
721  if(check_end(*p))
722  {
723  if(scanfilter->end())
724  {
725  NdbError err= scanfilter->getNdbError();
726  printf("Problem closing ScanFilter= %d\n", err.code);
727  ERR_EXIT(transaction, "filter end() failed");
728  }
729  }
730  }
731 
732  return scanfilter;
733 }
734 
735 /*
736 * get the tuples through ndbapi, and save the tuples NO.
737 * str: a random string of scan opearation and condition
738 */
739 void ndbapi_tuples(Ndb *ndb, char *str, bool *res)
740 {
741  const NdbDictionary::Dictionary *dict = ndb->getDictionary();
742  if (!dict)
743  ERR_EXIT(ndb, "Can't get dict");
744 
745  const NdbDictionary::Table *table = dict->getTable(TABLE_NAME);
746  if (!table)
747  ERR_EXIT(dict, "Can't get table"TABLE_NAME);
748 
749  const NdbDictionary::Column *col[COL_LEN];
750 
751  for(int ii = 0; ii < COL_LEN; ii++)
752  {
753  char tmp[128];
754  col[ii] = table->getColumn(COL_NAME[ii]);
755  if(!col[ii])
756  {
757  BaseString::snprintf(tmp, 128, "Can't get column %s", COL_NAME[ii]);
758  ERR_EXIT(dict, tmp);
759  }
760  }
761 
762  NdbTransaction *transaction;
763  NdbScanOperation *scan;
764  NdbScanFilter *filter;
765 
766  transaction = ndb->startTransaction();
767  if (!transaction)
768  ERR_EXIT(ndb, "Can't start transaction");
769 
770  scan = transaction->getNdbScanOperation(table);
771  if (!scan)
772  ERR_EXIT(transaction, "Can't get scan op");
773 
775  ERR_EXIT(scan, "Can't set up read");
776 
777  NdbRecAttr *rec[COL_LEN];
778  for(int ii = 0; ii < COL_LEN; ii++)
779  {
780  char tmp[128];
781  rec[ii] = scan->getValue(COL_NAME[ii]);
782  if(!rec[ii])
783  {
784  BaseString::snprintf(tmp, 128, "Can't get rec of %s", COL_NAME[ii]);
785  ERR_EXIT(scan, tmp);
786  }
787  }
788 
789  filter = call_ndbapi(str, transaction, scan, col);
790 
791  if (transaction->execute(NdbTransaction::NoCommit))
792  ERR_EXIT(transaction, "Can't execute");
793 
794  int i,j,k,l,m,n;
795  while (scan->nextResult(true) == 0)
796  {
797  do
798  {
799  i = rec[1]->u_32_value();
800  j = rec[2]->u_32_value();
801  k = rec[3]->u_32_value();
802  l = rec[4]->u_32_value();
803  m = rec[5]->u_32_value();
804  n = rec[6]->u_32_value();
805  res[32*i+16*j+8*k+4*l+2*m+n] = true;
806  } while (scan->nextResult(false) == 0);
807  }
808 
809  delete filter;
810  transaction->close();
811 }
812 
813 /*
814 * compare the result between calculation and NDBAPI
815 * str: a random string of scan opearation and condition
816 * return: true stands for ndbapi ok, false stands for ndbapi failed
817 */
818 template class Vector<bool>;
819 bool compare_cal_ndb(char *str, Ndb *ndb)
820 {
821  Vector<bool> res_cal;
822  Vector<bool> res_ndb;
823 
824  for(int i = 0; i < TUPLE_NUM; i++)
825  {
826  res_cal.push_back(false);
827  res_ndb.push_back(false);
828  }
829 
830  check_all_tuples(str, res_cal.getBase());
831  ndbapi_tuples(ndb, str, res_ndb.getBase());
832 
833  for(int i = 0; i < TUPLE_NUM; i++)
834  {
835  if(res_cal[i] != res_ndb[i])
836  return false;
837  }
838  return true;
839 }
840 
841 
842 int runCreateTables(NDBT_Context* ctx, NDBT_Step* step)
843 {
844  Ndb *pNdb = GETNDB(step);
845  pNdb->getDictionary()->dropTable(MYTAB1.getName());
846  int ret = createTable(pNdb, &MYTAB1, false, true, 0);
847  if(ret)
848  return ret;
849  return NDBT_OK;
850 }
851 
852 
853 int runDropTables(NDBT_Context* ctx, NDBT_Step* step)
854 {
855  int ret = GETNDB(step)->getDictionary()->dropTable(MYTAB1.getName());
856  if(ret == -1)
857  return NDBT_FAILED;
858 
859  return NDBT_OK;
860 }
861 
862 int runScanRandomFilterTest(NDBT_Context* ctx, NDBT_Step* step)
863 {
864  char random_str[MAX_STR_LEN];
865  Ndb *myNdb = GETNDB(step);
866 
867  for(int i = 0; i < TEST_NUM; i++)
868  {
869  get_rand_op_str_compound(random_str);
870  if( !compare_cal_ndb(random_str, myNdb))
871  return NDBT_FAILED;
872  }
873 
874  return NDBT_OK;
875 }
876 
877 int runMaxScanFilterSize(NDBT_Context* ctx, NDBT_Step* step)
878 {
879  /* This testcase uses the ScanFilter methods to build a large
880  * scanFilter, checking that ScanFilter building fails
881  * at the expected point, with the correct error message
882  */
883  const Uint32 MaxLength= NDB_MAX_SCANFILTER_SIZE_IN_WORDS;
884 
885  const Uint32 InstructionWordsPerEq= 3;
886 
887  const Uint32 MaxEqsInScanFilter= MaxLength/InstructionWordsPerEq;
888 
889  Ndb *myNdb = GETNDB(step);
890  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
891  const NdbDictionary::Table *myTable= myDict->getTable(TABLE_NAME);
892  if(myTable == NULL)
893  APIERROR(myDict->getNdbError());
894 
895  NdbInterpretedCode ic(myTable);
896 
897  NdbScanFilter sf(&ic);
898 
899  if (sf.begin()) // And group
900  {
901  ndbout << "Bad rc from begin\n";
902  ndbout << sf.getNdbError() << "\n";
903  return NDBT_FAILED;
904  }
905 
906  Uint32 loop=0;
907 
908  for (;loop < MaxEqsInScanFilter; loop++)
909  {
910  if (sf.eq(0u, 10u))
911  {
912  ndbout << "Bad rc from eq at loop " << loop << "\n";
913  ndbout << sf.getNdbError() << "\n";
914  return NDBT_FAILED;
915  }
916  }
917 
918  if (! sf.eq(0u, 10u))
919  {
920  ndbout << "Expected ScanFilter instruction addition to fail after"
921  << MaxEqsInScanFilter << "iterations, but it didn't\n";
922  return NDBT_FAILED;
923  }
924 
925  NdbError err=sf.getNdbError();
926 
927  if (err.code != 4294)
928  {
929  ndbout << "Expected to get error code 4294, but instead got " << err.code << "\n";
930  return NDBT_FAILED;
931  }
932 
933  return NDBT_OK;
934 }
935 
936 
937 int runScanFilterConstructorFail(NDBT_Context* ctx, NDBT_Step* step)
938 {
939  /* We test that failures in the ScanFilter constructor can be
940  * detected by the various ScanFilter methods without
941  * issues
942  */
943  Ndb *myNdb = GETNDB(step);
944  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
945  const NdbDictionary::Table *myTable= myDict->getTable(TABLE_NAME);
946  if(myTable == NULL)
947  APIERROR(myDict->getNdbError());
948 
949  NdbTransaction* trans=myNdb->startTransaction();
950 
951  if (trans == NULL)
952  {
953  APIERROR(trans->getNdbError());
954  return NDBT_FAILED;
955  }
956 
957  /* Create an NdbRecord scan operation */
958  const NdbScanOperation* tabScan=
959  trans->scanTable(myTable->getDefaultRecord());
960 
961  if (tabScan==NULL)
962  {
963  APIERROR(trans->getNdbError());
964  return NDBT_FAILED;
965  }
966 
967  /* Now we hackily try to add a ScanFilter after the operation
968  * is defined. This will cause a failure within the
969  * constructor
970  */
971  NdbScanFilter brokenSf((NdbScanOperation*) tabScan);
972 
973  /* Scan operation should have an error */
974  if (tabScan->getNdbError().code != 4536)
975  {
976  ndbout << "Expected error 4536, had error " <<
977  tabScan->getNdbError().code << " instead" << endl;
978  return NDBT_FAILED;
979  }
980 
981  /* ScanFilter should have an error */
982  if (brokenSf.getNdbError().code != 4539)
983  {
984  ndbout << "Expected error 4539, had error " <<
985  brokenSf.getNdbError().code << " instead" << endl;
986  return NDBT_FAILED;
987  }
988 
989  if (brokenSf.begin() != -1)
990  { ndbout << "Bad rc from begin" << endl; return NDBT_FAILED; }
991 
992  if (brokenSf.istrue() != -1)
993  { ndbout << "Bad rc from istrue" << endl; return NDBT_FAILED; }
994 
995  if (brokenSf.isfalse() != -1)
996  { ndbout << "Bad rc from isfalse" << endl; return NDBT_FAILED; }
997 
998  if (brokenSf.isnull(0) != -1)
999  { ndbout << "Bad rc from isnull" << endl; return NDBT_FAILED; }
1000 
1001  if (brokenSf.isnotnull(0) != -1)
1002  { ndbout << "Bad rc from isnotnull" << endl; return NDBT_FAILED; }
1003 
1004  if (brokenSf.cmp(NdbScanFilter::COND_EQ, 0, NULL, 0) != -1)
1005  { ndbout << "Bad rc from cmp" << endl; return NDBT_FAILED; }
1006 
1007  if (brokenSf.end() != -1)
1008  { ndbout << "Bad rc from begin" << endl; return NDBT_FAILED; }
1009 
1010  trans->close();
1011 
1012  /* Now we check that we can define a ScanFilter before
1013  * calling readTuples() for a scan operation
1014  */
1015  trans= myNdb->startTransaction();
1016 
1017  if (trans == NULL)
1018  {
1019  APIERROR(trans->getNdbError());
1020  return NDBT_FAILED;
1021  }
1022 
1023  /* Get an old Api table scan operation */
1024  NdbScanOperation* tabScanOp=
1025  trans->getNdbScanOperation(myTable);
1026 
1027  if (tabScanOp==NULL)
1028  {
1029  APIERROR(trans->getNdbError());
1030  return NDBT_FAILED;
1031  }
1032 
1033  /* Attempt to define a ScanFilter before calling readTuples() */
1034  NdbScanFilter sf(tabScanOp);
1035 
1036  /* Should be no problem ... */
1037  if (sf.getNdbError().code != 0)
1038  { APIERROR(sf.getNdbError()); return NDBT_FAILED; };
1039 
1040 
1041  /* Ok, now attempt to define a ScanFilter against a primary key op */
1042  NdbOperation* pkOp= trans->getNdbOperation(myTable);
1043 
1044  if (pkOp == NULL)
1045  {
1046  APIERROR(trans->getNdbError());
1047  return NDBT_FAILED;
1048  }
1049 
1050  NdbScanFilter sf2(pkOp);
1051 
1052  if (sf2.getNdbError().code != 4539)
1053  {
1054  ndbout << "Error, expected 4539" << endl;
1055  APIERROR(sf2.getNdbError());
1056  return NDBT_FAILED;
1057  }
1058 
1059  return NDBT_OK;
1060 }
1061 
1062 bool getBit(const Uint32* bitMap,
1063  int bitNum)
1064 {
1065  return ((bitMap[ bitNum >> 5 ] & (1 << (bitNum & 31))) != 0);
1066 }
1067 
1068 void setBit(Uint32* bitMap, int bitMapByteSize,
1069  int bitNum)
1070 {
1071  assert(bitNum >= 0 && bitNum < (bitMapByteSize * 8));
1072  bitMap[ bitNum >> 5 ] |= (1 << (bitNum & 31));
1073 }
1074 
1075 enum TestConditions {
1076  COND_LE = 0,
1077  COND_LT = 1,
1078  COND_GE = 2,
1079  COND_GT = 3,
1080  COND_EQ = 4,
1081  COND_NE = 5,
1082  COND_NULL = 6,
1083  COND_NOTNULL = 7,
1084  COND_AND_EQ_MASK = 8,
1085  COND_AND_NE_MASK = 9,
1086  COND_AND_EQ_ZERO = 10,
1087  COND_AND_NE_ZERO = 11
1088 };
1089 
1090 int getExpectedBitsSet(int rowId,
1091  int colBitWidth)
1092 {
1093  /* returns value from -1 to colBitWidth -1
1094  * -1 == no bits set
1095  * 0 == bit 0 set
1096  * 1 == bit 0 + bit 1 set
1097  * ...
1098  */
1099  return (rowId % (colBitWidth + 1)) -1;
1100 }
1101 
1102 bool isNullValue(int rowId,
1103  int colBitWidth)
1104 {
1105  /* Occasionally we'll have a Null column */
1106  return (((rowId + colBitWidth) % 13) == 0);
1107 }
1108 
1109 void getBitfieldVariants(int bitNum, int& offset, bool& invert)
1110 {
1111  offset= 0;
1112  invert= false;
1113  if ((bitNum % 5) == 3)
1114  {
1115  /* Invert the mask */
1116  invert= true;
1117  }
1118  if ((bitNum % 7) == 6)
1119  {
1120  /* Shift the mask */
1121  offset= (bitNum / 2);
1122  }
1123 };
1124 
1125 
1126 bool isRowExpected(int rowId,
1127  TestConditions cond,
1128  int colBitWidth,
1129  int bitsSetInScanFilter,
1130  bool isNullable,
1131  const Uint32* maskBuff)
1132 {
1133  if (isNullable && isNullValue(rowId, colBitWidth))
1134  {
1135  switch (cond) {
1136  case COND_LE:
1137  return true; // null < any value
1138  case COND_LT:
1139  return true; // null < any value
1140  case COND_GE:
1141  return false; // null < any value
1142  case COND_GT:
1143  return false; // null < any value
1144  case COND_EQ:
1145  return false; // null != any value
1146  case COND_NE:
1147  return true; // null != any value
1148  case COND_NULL:
1149  return true; // null is null
1150  case COND_NOTNULL:
1151  return false;
1152  case COND_AND_EQ_MASK:
1153  return false; // NULL AND MASK != MASK
1154  case COND_AND_NE_MASK:
1155  return true; // NULL AND MASK != MASK
1156  case COND_AND_EQ_ZERO:
1157  return false; // NULL AND MASK != 0
1158  case COND_AND_NE_ZERO:
1159  return true; // NULL AND MASK != 0
1160  default:
1161  printf("isRowExpected given bad condition : %u\n",
1162  cond);
1163  return false;
1164  }
1165  }
1166  else
1167  {
1168  /* Not a null value */
1169  int expectedBitsSet= getExpectedBitsSet(rowId, colBitWidth);
1170 
1171  int offset= 0;
1172  bool invert= false;
1173 
1174  getBitfieldVariants(bitsSetInScanFilter + 1, offset, invert);
1175 
1176  switch (cond) {
1177  case COND_LE:
1178  return expectedBitsSet <= bitsSetInScanFilter;
1179  case COND_LT:
1180  return expectedBitsSet < bitsSetInScanFilter;
1181  case COND_GE:
1182  return expectedBitsSet >= bitsSetInScanFilter;
1183  case COND_GT:
1184  return expectedBitsSet > bitsSetInScanFilter;
1185  case COND_EQ:
1186  return expectedBitsSet == bitsSetInScanFilter;
1187  case COND_NE:
1188  return expectedBitsSet != bitsSetInScanFilter;
1189  case COND_NULL:
1190  return false;
1191  case COND_NOTNULL:
1192  return true;
1193  case COND_AND_EQ_MASK:
1194  case COND_AND_NE_MASK:
1195  {
1196  bool result= true;
1197  /* Compare data AND mask to the mask buff */
1198  for (int idx=0; idx < colBitWidth; idx++)
1199  {
1200  bool bitVal= (expectedBitsSet >= 0)?
1201  (expectedBitsSet >= idx): false;
1202  bool maskVal= getBit(maskBuff, idx);
1203  if ((bitVal & maskVal) != maskVal)
1204  {
1205  /* Difference to mask, know result */
1206  result= false;
1207  break;
1208  }
1209  }
1210 
1211  /* Invert result for NE condition */
1212  return (result ^ (cond == COND_AND_NE_MASK));
1213  }
1214  case COND_AND_EQ_ZERO:
1215  case COND_AND_NE_ZERO:
1216  {
1217  bool result= true;
1218  /* Compare data AND mask to zero */
1219  for (int idx=0; idx < colBitWidth; idx++)
1220  {
1221  bool bitVal= (expectedBitsSet >= 0)?
1222  (expectedBitsSet >= idx): false;
1223  bool maskVal= getBit(maskBuff, idx);
1224  if ((bitVal & maskVal) != 0)
1225  {
1226  /* Difference to 0, know result */
1227  result= false;
1228  break;
1229  }
1230  }
1231  /* Invert result for NE condition */
1232  return (result ^ (cond == COND_AND_NE_ZERO));
1233  }
1234  default:
1235  printf("isRowExpected given bad condition : %u\n",
1236  cond);
1237  return false;
1238  }
1239  }
1240 }
1241 
1242 int insertBitRows(Ndb* pNdb)
1243 {
1244  const NdbDictionary::Dictionary* myDict= pNdb->getDictionary();
1245  const NdbDictionary::Table *myTable= myDict->getTable(TABLE2_NAME);
1246  if(myTable == NULL)
1247  APIERROR(myDict->getNdbError());
1248 
1249  for (int i=0; i< TOTAL_ROWS; i++)
1250  {
1251  NdbTransaction* myTrans = pNdb->startTransaction();
1252  if (myTrans == NULL)
1253  APIERROR(pNdb->getNdbError());
1254 
1255  NdbOperation* insertOp= myTrans->getNdbOperation(myTable);
1256  if (insertOp == NULL)
1257  APIERROR(pNdb->getNdbError());
1258 
1259  const int buffSize= (MAX_BIT_WIDTH + 31) / 32;
1260  Uint32 buff[ buffSize ];
1261 
1262  if (insertOp->insertTuple() != 0)
1263  APIERROR(insertOp->getNdbError());
1264 
1265  if (insertOp->equal((Uint32)0, i) != 0) // Set id column
1266  APIERROR(insertOp->getNdbError());
1267 
1268  for (int col=1; col < myTable->getNoOfColumns(); col++)
1269  {
1270  const NdbDictionary::Column* c= myTable->getColumn(col);
1271  int colBitWidth= c->getLength();
1272  bool isNullable= c->getNullable();
1273 
1274  if (isNullable &&
1275  isNullValue(i, colBitWidth))
1276  {
1277  /* Set column value to NULL */
1278  if (insertOp->setValue(col, (char*) NULL) != 0)
1279  APIERROR(insertOp->getNdbError());
1280  }
1281  else
1282  {
1283  /* Set lowest bits in this column */
1284  memset(buff, 0, (4 * buffSize));
1285 
1286  int bitsToSet= getExpectedBitsSet(i, colBitWidth);
1287 
1288  if (bitsToSet >= 0)
1289  {
1290  for (int idx=0; idx <= bitsToSet; idx++)
1291  setBit(buff, sizeof(buff), idx);
1292  }
1293 
1294  if (insertOp->setValue(col, (char *)buff) != 0)
1295  APIERROR(insertOp->getNdbError());
1296  }
1297  }
1298 
1299  if (myTrans->execute(NdbTransaction::Commit) != 0)
1300  {
1301  APIERROR(myTrans->getNdbError());
1302  }
1303  myTrans->close();
1304  }
1305 
1306  printf("Inserted %u rows\n", TOTAL_ROWS);
1307 
1308  return NDBT_OK;
1309 }
1310 
1311 int verifyBitData(Ndb* pNdb)
1312 {
1313  const NdbDictionary::Dictionary* myDict= pNdb->getDictionary();
1314  const NdbDictionary::Table *myTable= myDict->getTable(TABLE2_NAME);
1315  if(myTable == NULL)
1316  APIERROR(myDict->getNdbError());
1317 
1318  NdbTransaction* myTrans= pNdb->startTransaction();
1319  if (myTrans == NULL)
1320  APIERROR(pNdb->getNdbError());
1321 
1322  NdbScanOperation* scanOp= myTrans->getNdbScanOperation(myTable);
1323  if (scanOp == NULL)
1324  APIERROR(pNdb->getNdbError());
1325 
1326  if (scanOp->readTuples() != 0)
1327  APIERROR(scanOp->getNdbError());
1328 
1329  NdbRecAttr* results[ NUM_COLS ];
1330 
1331  for (int col=0; col< NUM_COLS; col++)
1332  {
1333  if ((results[col]= scanOp->getValue(col)) == NULL)
1334  APIERROR(scanOp->getNdbError());
1335  }
1336 
1337  if (myTrans->execute(NdbTransaction::NoCommit) != 0)
1338  APIERROR(myTrans->getNdbError());
1339 
1340  for (int row=0; row < TOTAL_ROWS; row++)
1341  {
1342  if (scanOp->nextResult() != 0)
1343  APIERROR(scanOp->getNdbError());
1344 
1345  int rowId= results[0]->int32_value();
1346 
1347  for (int col=1; col < NUM_COLS; col++)
1348  {
1349  const NdbDictionary::Column* c= myTable->getColumn(col);
1350  bool isNullable= c->getNullable();
1351  int colBitWidth= c->getLength();
1352 
1353  if (isNullable &&
1354  isNullValue(rowId, colBitWidth))
1355  {
1356  if (!results[ col ] ->isNULL())
1357  {
1358  printf("Mismatch at result %d row %d, column %d, expected NULL\n",
1359  row, rowId, col);
1360  myTrans->close();
1361  return NDBT_FAILED;
1362  }
1363  }
1364  else
1365  {
1366  /* Non null value, check it */
1367  int expectedSetBits= getExpectedBitsSet(rowId, colBitWidth);
1368 
1369  const Uint32* val= (const Uint32 *) results[ col ]->aRef();
1370 
1371  for (int bitNum=0; bitNum < colBitWidth; bitNum++)
1372  {
1373  bool expectClear= (bitNum > expectedSetBits);
1374  bool isClear= ! getBit(val, bitNum);
1375  if (expectClear != isClear)
1376  {
1377  printf("Mismatch at result %d row %d, column %d, bit %d"
1378  " expected %d \n",
1379  row, rowId, col, bitNum, (expectClear)?0:1);
1380  myTrans->close();
1381  return NDBT_FAILED;
1382  }
1383  }
1384  }
1385  }
1386  }
1387 
1388  if (scanOp->nextResult() != 1)
1389  {
1390  printf("Too many rows returned\n");
1391  return NDBT_FAILED;
1392  }
1393 
1394  if (myTrans->execute(NdbTransaction::Commit) != 0)
1395  APIERROR(myTrans->getNdbError());
1396 
1397  myTrans->close();
1398 
1399  printf("Verified data for %u rows\n",
1400  TOTAL_ROWS);
1401 
1402  return NDBT_OK;
1403 }
1404 
1405 int verifyBitScanFilter(Ndb* pNdb)
1406 {
1407  const NdbDictionary::Dictionary* myDict= pNdb->getDictionary();
1408  const NdbDictionary::Table *myTable= myDict->getTable(TABLE2_NAME);
1409  if(myTable == NULL)
1410  APIERROR(myDict->getNdbError());
1411 
1412  /* Perform scan with scanfilter for :
1413  * - each column in the table
1414  * - each supported comparison type
1415  * - each potentially set bit in the column
1416  */
1417  int scanCount= 0;
1418  for (int col=1; col < NUM_COLS; col++)
1419  {
1420  const NdbDictionary::Column* c= myTable->getColumn(col);
1421  const int bitWidth= c->getLength();
1422  printf("Testing %s column %u (width=%u bits) with %u scan filter variants\n",
1423  (c->getNullable())?"Nullable":"Non-null",
1424  col, bitWidth, ((bitWidth+1) * (COND_AND_NE_ZERO + 1)));
1425  for (int comp=0; comp <= COND_AND_NE_ZERO; comp++)
1426  {
1427  for (int bitNum=0; bitNum <= bitWidth; bitNum++)
1428  {
1429  /* Define scan */
1430  NdbTransaction* myTrans= pNdb->startTransaction();
1431  if (myTrans == NULL)
1432  APIERROR(pNdb->getNdbError());
1433 
1434  NdbScanOperation* scanOp= myTrans->getNdbScanOperation(myTable);
1435  if (scanOp == NULL)
1436  APIERROR(pNdb->getNdbError());
1437 
1438  if (scanOp->readTuples() != 0)
1439  APIERROR(scanOp->getNdbError());
1440 
1441  NdbRecAttr* ra;
1442  if ((ra= scanOp->getValue((Uint32)0)) == NULL)
1443  APIERROR(scanOp->getNdbError());
1444 
1445  /* Define ScanFilter */
1446  const int buffSize= (MAX_BIT_WIDTH + 31)/32;
1447  Uint32 buff[ buffSize ];
1448  memset(buff, 0, (4 * buffSize));
1449 
1450  /* Define constant value, with some variants for bitwise operators */
1451  bool invert= false;
1452  int offset= 0;
1453 
1454  switch (comp) {
1455  case COND_AND_EQ_MASK:
1456  case COND_AND_NE_MASK:
1457  case COND_AND_EQ_ZERO:
1458  case COND_AND_NE_ZERO:
1459  getBitfieldVariants(bitNum, offset, invert);
1460  default:
1461  ;
1462  }
1463 
1464  /* Set lower bitNum -1 bits
1465  * If bitNum == 0, set none
1466  */
1467  int bitsSetInFilter= bitNum-1;
1468 
1469  if ((bitsSetInFilter >= 0) ||
1470  invert)
1471  {
1472  for (int idx=0; idx < (32 * buffSize); idx++)
1473  {
1474  if ((idx >= offset) &&
1475  (idx <= (offset + bitsSetInFilter)))
1476  {
1477  if (!invert)
1478  setBit(buff, sizeof(buff), idx);
1479  }
1480  else
1481  {
1482  if (invert)
1483  setBit(buff, sizeof(buff), idx);
1484  }
1485  }
1486  }
1487 
1488 
1489 
1490  NdbScanFilter sf(scanOp);
1491 
1492  if (sf.begin(NdbScanFilter::AND) != 0)
1493  APIERROR(sf.getNdbError());
1494 
1495  if ((comp != COND_NULL) &&
1496  (comp != COND_NOTNULL))
1497  {
1498  /* Operator with a constant */
1499  if (sf.cmp((NdbScanFilter::BinaryCondition)comp,
1500  col,
1501  (char*)buff) != 0)
1502  APIERROR(sf.getNdbError());
1503  }
1504  else
1505  {
1506  switch (comp) {
1507  case COND_NULL:
1508  if (sf.isnull(col) != 0)
1509  APIERROR(sf.getNdbError());
1510  break;
1511  case COND_NOTNULL:
1512  if (sf.isnotnull(col) != 0)
1513  APIERROR(sf.getNdbError());
1514  break;
1515  default:
1516  printf("Condition %u not supported\n", comp);
1517  return NDBT_FAILED;
1518  }
1519  }
1520 
1521  if (sf.end() != 0)
1522  APIERROR(sf.getNdbError());
1523 
1524  /* Calculate expected number of rows in result */
1525  const NdbDictionary::Column* c= myTable->getColumn(col);
1526  int colBitWidth= c->getLength();
1527  bool isNullable= c->getNullable();
1528 
1529  // printf("Determining expected rows\n");
1530  int expectedResultCount= 0;
1531  for (int i=0; i< TOTAL_ROWS; i++)
1532  {
1533  if (isRowExpected(i,
1534  (TestConditions)comp,
1535  colBitWidth,
1536  bitsSetInFilter,
1537  isNullable,
1538  buff))
1539  expectedResultCount++;
1540  }
1541 
1542 
1543  /* Execute */
1544  if (myTrans->execute(NdbTransaction::NoCommit) != 0)
1545  APIERROR(myTrans->getNdbError());
1546 
1547 
1548  /* Process results to ensure we got the expected rows back */
1549  int rc= 0;
1550  int count= 0;
1551  int matchCount= 0;
1552  // printf("Checking rows returned\n");
1553  while ((rc= scanOp->nextResult()) == 0)
1554  {
1555  int rowId= ra->int32_value();
1556  count++;
1557  /*
1558  * Check that this row was expected
1559  */
1560  if (isRowExpected(rowId,
1561  (TestConditions)comp,
1562  colBitWidth,
1563  bitsSetInFilter,
1564  isNullable,
1565  buff))
1566  {
1567  matchCount++;
1568  }
1569  else
1570  {
1571  printf("Col=%u Comp=%u BitNum=%u row=%u : "
1572  "Got row %u back which I did not expect\n",
1573  col, comp, bitNum, count, rowId);
1574  myTrans->close();
1575  return NDBT_FAILED;
1576  }
1577  }
1578 
1579  if (rc != 1)
1580  {
1581  printf("Col=%u Comp=%u BitNum=%u :"
1582  "nextResult failure %d\n", col, comp, bitNum, rc);
1583  APIERROR(myTrans->getNdbError());
1584  }
1585 
1586  //printf("Column %u, Comp=%u bitNum=%u num expectedResults=%u matchCount=%u\n",
1587  // col, comp, bitNum, expectedResultCount, matchCount);
1588 
1589  /* Check that we didn't miss any expected rows */
1590  if (matchCount != expectedResultCount)
1591  {
1592  printf("Col=%u Comp=%u BitNum=%u :"
1593  "Mismatch between expected(%u) and received(%u) result counts\n",
1594  col, comp, bitNum, expectedResultCount, matchCount);
1595  myTrans->close();
1596  return NDBT_FAILED;
1597  }
1598 
1599  if (myTrans->execute(NdbTransaction::Commit) != 0)
1600  APIERROR(myTrans->getNdbError());
1601 
1602  myTrans->close();
1603 
1604  scanCount++;
1605 
1606  } // for bitNum
1607  } // for comparison
1608  } // for column
1609 
1610  printf("Verified %u scans with bitfield ScanFilter conditions\n",
1611  scanCount);
1612 
1613  return NDBT_OK;
1614 }
1615 
1616 
1617 int runTestScanFilterBit(NDBT_Context* ctx, NDBT_Step* step)
1618 {
1619  /* Create table */
1620  Ndb *pNdb = GETNDB(step);
1621  pNdb->getDictionary()->dropTable(MYTAB2.getName());
1622  int ret = createTable(pNdb, &MYTAB2, false, true, 0);
1623  if(ret)
1624  return ret;
1625 
1626  /* Populate with data */
1627  if (insertBitRows(pNdb) != NDBT_OK)
1628  return NDBT_FAILED;
1629 
1630  /* Initial data check via scan */
1631  if (verifyBitData(pNdb) != NDBT_OK)
1632  return NDBT_FAILED;
1633 
1634  /* Verify Bit ScanFilter correctness */
1635  if (verifyBitScanFilter(pNdb) != NDBT_OK)
1636  return NDBT_FAILED;
1637 
1638  /* Drop table */
1639  pNdb->getDictionary()->dropTable(MYTAB2.getName());
1640 
1641  return NDBT_OK;
1642 }
1643 
1644 
1645 NDBT_TESTSUITE(testScanFilter);
1646 TESTCASE(TEST_NAME,
1647  "Scan table TABLE_NAME for the records which accord with \
1648  conditions of logical scan operations: AND/OR/NAND/NOR")
1649 {
1650  INITIALIZER(runCreateTables);
1651  INITIALIZER(runPopulate);
1652  INITIALIZER(runScanRandomFilterTest);
1653  INITIALIZER(runMaxScanFilterSize);
1654  INITIALIZER(runScanFilterConstructorFail);
1655  FINALIZER(runDropTables);
1656 }
1657 
1658 TESTCASE("TestScanFilterBit",
1659  "Test ScanFilter with bitfield columns")
1660 {
1661  INITIALIZER(runTestScanFilterBit);
1662 }
1663 
1664 NDBT_TESTSUITE_END(testScanFilter);
1665 
1666 
1667 int main(int argc, const char** argv)
1668 {
1669  ndb_init();
1670 
1672  if(con.connect(12, 5, 1))
1673  {
1674  return NDBT_ProgramExit(NDBT_FAILED);
1675  }
1676 
1677  NDBT_TESTSUITE_INSTANCE(testScanFilter);
1678  return testScanFilter.execute(argc, argv);
1679 }