MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
NdbIndexStatImpl.cpp
1 /* Copyright (C) 2003 MySQL AB
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
15 
16 #include <ndb_global.h>
17 #include <Ndb.hpp>
18 #include <NdbTransaction.hpp>
19 #include <NdbRecAttr.hpp>
20 #include <NdbOut.hpp>
21 #include <NdbEnv.h>
22 #include <Bitmask.hpp>
23 #include <NdbSqlUtil.hpp>
24 #include <NdbRecord.hpp>
25 #include <NdbEventOperation.hpp>
26 #include "NdbIndexStatImpl.hpp"
27 
28 #undef min
29 #undef max
30 #define min(a, b) ((a) <= (b) ? (a) : (b))
31 #define max(a, b) ((a) >= (b) ? (a) : (b))
32 
33 static const char* const g_headtable_name = NDB_INDEX_STAT_HEAD_TABLE;
34 static const char* const g_sampletable_name = NDB_INDEX_STAT_SAMPLE_TABLE;
35 static const char* const g_sampleindex1_name = NDB_INDEX_STAT_SAMPLE_INDEX1;
36 
37 static const int ERR_NoSuchObject[] = { 709, 723, 4243, 0 };
38 static const int ERR_TupleNotFound[] = { 626, 0 };
39 
40 NdbIndexStatImpl::NdbIndexStatImpl(NdbIndexStat& facade) :
41  NdbIndexStat(*this),
42  m_facade(&facade),
43  m_keyData(m_keySpec, false, 2),
44  m_valueData(m_valueSpec, false, 2)
45 {
46  init();
47  m_query_mutex = NdbMutex_Create();
48  assert(m_query_mutex != 0);
49  m_eventOp = 0;
50  m_mem_handler = &c_mem_default_handler;
51 }
52 
53 void
54 NdbIndexStatImpl::init()
55 {
56  m_indexSet = false;
57  m_indexId = 0;
58  m_indexVersion = 0;
59  m_tableId = 0;
60  m_keyAttrs = 0;
61  m_valueAttrs = 0;
62  // buffers
63  m_keySpecBuf = 0;
64  m_valueSpecBuf = 0;
65  m_keyDataBuf = 0;
66  m_valueDataBuf = 0;
67  // cache
68  m_cacheBuild = 0;
69  m_cacheQuery = 0;
70  m_cacheClean = 0;
71  // head
72  init_head(m_facadeHead);
73 }
74 
75 NdbIndexStatImpl::~NdbIndexStatImpl()
76 {
77  reset_index();
78  if (m_query_mutex != 0)
79  {
80  NdbMutex_Destroy(m_query_mutex);
81  m_query_mutex = 0;
82  }
83 }
84 
85 // sys tables meta
86 
87 NdbIndexStatImpl::Sys::Sys(NdbIndexStatImpl* impl, Ndb* ndb) :
88  m_impl(impl),
89  m_ndb(ndb)
90 {
91  m_dic = m_ndb->getDictionary();
92  m_headtable = 0;
93  m_sampletable = 0;
94  m_sampleindex1 = 0;
95  m_obj_cnt = 0;
96 }
97 
98 NdbIndexStatImpl::Sys::~Sys()
99 {
100  m_impl->sys_release(*this);
101 }
102 
103 void
104 NdbIndexStatImpl::sys_release(Sys& sys)
105 {
106  // close schema trans if any exists
107  NdbDictionary::Dictionary* const dic = sys.m_dic;
108  (void)dic->endSchemaTrans(NdbDictionary::Dictionary::SchemaTransAbort);
109 
110  if (sys.m_headtable != 0)
111  {
112  sys.m_dic->removeTableGlobal(*sys.m_headtable, false);
113  sys.m_headtable = 0;
114  }
115  if (sys.m_sampletable != 0)
116  {
117  sys.m_dic->removeTableGlobal(*sys.m_sampletable, false);
118  sys.m_sampletable = 0;
119  }
120  if (sys.m_sampleindex1 != 0)
121  {
122  sys.m_dic->removeIndexGlobal(*sys.m_sampleindex1, false);
123  sys.m_sampleindex1 = 0;
124  }
125 }
126 
127 int
128 NdbIndexStatImpl::make_headtable(NdbDictionary::Table& tab)
129 {
130  tab.setName(g_headtable_name);
131  tab.setLogging(true);
132  int ret;
133  ret = tab.setFrm(g_ndb_index_stat_head_frm_data,
134  g_ndb_index_stat_head_frm_len);
135  if (ret != 0)
136  {
137  setError(ret, __LINE__);
138  return -1;
139  }
140  // key must be first
141  {
142  NdbDictionary::Column col("index_id");
143  col.setType(NdbDictionary::Column::Unsigned);
144  col.setPrimaryKey(true);
145  tab.addColumn(col);
146  }
147  {
148  NdbDictionary::Column col("index_version");
149  col.setType(NdbDictionary::Column::Unsigned);
150  col.setPrimaryKey(true);
151  tab.addColumn(col);
152  }
153  // table
154  {
155  NdbDictionary::Column col("table_id");
156  col.setType(NdbDictionary::Column::Unsigned);
157  col.setNullable(false);
158  tab.addColumn(col);
159  }
160  {
161  NdbDictionary::Column col("frag_count");
162  col.setType(NdbDictionary::Column::Unsigned);
163  col.setNullable(false);
164  tab.addColumn(col);
165  }
166  // current sample
167  {
168  NdbDictionary::Column col("value_format");
169  col.setType(NdbDictionary::Column::Unsigned);
170  col.setNullable(false);
171  tab.addColumn(col);
172  }
173  {
174  NdbDictionary::Column col("sample_version");
175  col.setType(NdbDictionary::Column::Unsigned);
176  col.setNullable(false);
177  tab.addColumn(col);
178  }
179  {
180  NdbDictionary::Column col("load_time");
181  col.setType(NdbDictionary::Column::Unsigned);
182  col.setNullable(false);
183  tab.addColumn(col);
184  }
185  {
186  NdbDictionary::Column col("sample_count");
187  col.setType(NdbDictionary::Column::Unsigned);
188  col.setNullable(false);
189  tab.addColumn(col);
190  }
191  {
192  NdbDictionary::Column col("key_bytes");
193  col.setType(NdbDictionary::Column::Unsigned);
194  col.setNullable(false);
195  tab.addColumn(col);
196  }
197  NdbError error;
198  if (tab.validate(error) == -1) {
199  setError(error.code, __LINE__);
200  return -1;
201  }
202  return 0;
203 }
204 
205 int
206 NdbIndexStatImpl::make_sampletable(NdbDictionary::Table& tab)
207 {
208  tab.setName(g_sampletable_name);
209  tab.setLogging(true);
210  int ret;
211  ret = tab.setFrm(g_ndb_index_stat_sample_frm_data,
212  g_ndb_index_stat_sample_frm_len);
213  if (ret != 0)
214  {
215  setError(ret, __LINE__);
216  return -1;
217  }
218  // key must be first
219  {
220  NdbDictionary::Column col("index_id");
221  col.setType(NdbDictionary::Column::Unsigned);
222  col.setPrimaryKey(true);
223  tab.addColumn(col);
224  }
225  {
226  NdbDictionary::Column col("index_version");
227  col.setType(NdbDictionary::Column::Unsigned);
228  col.setPrimaryKey(true);
229  tab.addColumn(col);
230  }
231  {
232  NdbDictionary::Column col("sample_version");
233  col.setType(NdbDictionary::Column::Unsigned);
234  col.setPrimaryKey(true);
235  tab.addColumn(col);
236  }
237  {
238  NdbDictionary::Column col("stat_key");
240  col.setPrimaryKey(true);
241  col.setLength(MaxKeyBytes);
242  tab.addColumn(col);
243  }
244  // value
245  {
246  NdbDictionary::Column col("stat_value");
248  col.setNullable(false);
249  col.setLength(MaxValueCBytes);
250  tab.addColumn(col);
251  }
252  NdbError error;
253  if (tab.validate(error) == -1) {
254  setError(error.code, __LINE__);
255  return -1;
256  }
257  return 0;
258 }
259 
260 int
261 NdbIndexStatImpl::make_sampleindex1(NdbDictionary::Index& ind)
262 {
263  ind.setTable(g_sampletable_name);
264  ind.setName(g_sampleindex1_name);
266  ind.setLogging(false);
267  ind.addColumnName("index_id");
268  ind.addColumnName("index_version");
269  ind.addColumnName("sample_version");
270  return 0;
271 }
272 
273 int
274 NdbIndexStatImpl::check_table(const NdbDictionary::Table& tab1,
275  const NdbDictionary::Table& tab2)
276 {
277  if (tab1.getNoOfColumns() != tab2.getNoOfColumns())
278  return -1;
279  const uint n = tab1.getNoOfColumns();
280  for (uint i = 0; i < n; i++)
281  {
282  const NdbDictionary::Column* col1 = tab1.getColumn(i);
283  const NdbDictionary::Column* col2 = tab2.getColumn(i);
284  require(col1 != 0 && col2 != 0);
285  if (!col1->equal(*col2))
286  return -1;
287  }
288  return 0;
289 }
290 
291 int
292 NdbIndexStatImpl::check_index(const NdbDictionary::Index& ind1,
293  const NdbDictionary::Index& ind2)
294 {
295  if (ind1.getNoOfColumns() != ind2.getNoOfColumns())
296  return -1;
297  const uint n = ind1.getNoOfColumns();
298  for (uint i = 0; i < n; i++)
299  {
300  const NdbDictionary::Column* col1 = ind1.getColumn(i);
301  const NdbDictionary::Column* col2 = ind2.getColumn(i);
302  require(col1 != 0 && col2 != 0);
303  // getColumnNo() does not work on non-retrieved
304  if (!col1->equal(*col2))
305  return -1;
306  }
307  return 0;
308 }
309 
310 int
311 NdbIndexStatImpl::get_systables(Sys& sys)
312 {
313  Ndb* ndb = sys.m_ndb;
314  NdbDictionary::Dictionary* const dic = ndb->getDictionary();
315  const int NoSuchTable = 723;
316  const int NoSuchIndex = 4243;
317 
318  sys.m_headtable = dic->getTableGlobal(g_headtable_name);
319  if (sys.m_headtable == 0)
320  {
321  int code = dic->getNdbError().code;
322  if (code != NoSuchTable) {
323  setError(code, __LINE__);
324  return -1;
325  }
326  }
327  else
328  {
330  make_headtable(tab);
331  if (check_table(*sys.m_headtable, tab) == -1)
332  {
333  setError(BadSysTables, __LINE__);
334  return -1;
335  }
336  sys.m_obj_cnt++;
337  }
338 
339  sys.m_sampletable = dic->getTableGlobal(g_sampletable_name);
340  if (sys.m_sampletable == 0)
341  {
342  int code = dic->getNdbError().code;
343  if (code != NoSuchTable) {
344  setError(code, __LINE__);
345  return -1;
346  }
347  }
348  else
349  {
351  make_sampletable(tab);
352  if (check_table(*sys.m_sampletable, tab) == -1)
353  {
354  setError(BadSysTables, __LINE__);
355  return -1;
356  }
357  sys.m_obj_cnt++;
358  }
359 
360  if (sys.m_sampletable != 0)
361  {
362  sys.m_sampleindex1 = dic->getIndexGlobal(g_sampleindex1_name, *sys.m_sampletable);
363  if (sys.m_sampleindex1 == 0)
364  {
365  int code = dic->getNdbError().code;
366  if (code != NoSuchIndex) {
367  setError(code, __LINE__);
368  return -1;
369  }
370  }
371  else
372  {
374  make_sampleindex1(ind);
375  if (check_index(*sys.m_sampleindex1, ind) == -1)
376  {
377  setError(BadSysTables, __LINE__);
378  return -1;
379  }
380  sys.m_obj_cnt++;
381  }
382  }
383 
384  return 0;
385 }
386 
387 int
388 NdbIndexStatImpl::create_systables(Ndb* ndb)
389 {
390  Sys sys(this, ndb);
391 
392  if (get_systables(sys) == -1)
393  return -1;
394 
395  if (sys.m_obj_cnt == Sys::ObjCnt)
396  {
397  setError(HaveSysTables, __LINE__);
398  return -1;
399  }
400 
401  if (sys.m_obj_cnt != 0)
402  {
403  setError(BadSysTables, __LINE__);
404  return -1;
405  }
406 
407  NdbDictionary::Dictionary* const dic = sys.m_dic;
408 
409  if (dic->beginSchemaTrans() == -1)
410  {
411  setError(dic->getNdbError().code, __LINE__);
412  return -1;
413  }
414 
415  {
417  if (make_headtable(tab) == -1)
418  return -1;
419  if (dic->createTable(tab) == -1)
420  {
421  setError(dic->getNdbError().code, __LINE__);
422  return -1;
423  }
424 
425  sys.m_headtable = dic->getTableGlobal(tab.getName());
426  if (sys.m_headtable == 0)
427  {
428  setError(dic->getNdbError().code, __LINE__);
429  return -1;
430  }
431  }
432 
433  {
435  if (make_sampletable(tab) == -1)
436  return -1;
437 
438 #ifdef VM_TRACE
439  // test of schema trans
440  {
441  const char* p = NdbEnv_GetEnv("NDB_INDEX_STAT_ABORT_SYS_CREATE", (char*)0, 0);
442  if (p != 0 && strchr("1Y", p[0]) != 0)
443  {
444  setError(9999, __LINE__);
445  return -1;
446  }
447  }
448 #endif
449 
450  if (dic->createTable(tab) == -1)
451  {
452  setError(dic->getNdbError().code, __LINE__);
453  return -1;
454  }
455 
456  sys.m_sampletable = dic->getTableGlobal(tab.getName());
457  if (sys.m_sampletable == 0)
458  {
459  setError(dic->getNdbError().code, __LINE__);
460  return -1;
461  }
462  }
463 
464  {
466  if (make_sampleindex1(ind) == -1)
467  return -1;
468  if (dic->createIndex(ind, *sys.m_sampletable) == -1)
469  {
470  setError(dic->getNdbError().code, __LINE__);
471  return -1;
472  }
473 
474  sys.m_sampleindex1 = dic->getIndexGlobal(ind.getName(), sys.m_sampletable->getName());
475  if (sys.m_sampleindex1 == 0)
476  {
477  setError(dic->getNdbError().code, __LINE__);
478  return -1;
479  }
480  }
481 
482  if (dic->endSchemaTrans() == -1)
483  {
484  setError(dic->getNdbError().code, __LINE__);
485  return -1;
486  }
487 
488  return 0;
489 }
490 
491 int
492 NdbIndexStatImpl::drop_systables(Ndb* ndb)
493 {
494  Sys sys(this, ndb);
495 
496  if (get_systables(sys) == -1 &&
497  m_error.code != BadSysTables)
498  return -1;
499 
500  NdbDictionary::Dictionary* const dic = sys.m_dic;
501 
502  if (dic->beginSchemaTrans() == -1)
503  {
504  setError(dic->getNdbError().code, __LINE__);
505  return -1;
506  }
507 
508  if (sys.m_headtable != 0)
509  {
510  if (dic->dropTableGlobal(*sys.m_headtable) == -1)
511  {
512  setError(dic->getNdbError().code, __LINE__);
513  return -1;
514  }
515  }
516 
517  if (sys.m_sampletable != 0)
518  {
519 
520 #ifdef VM_TRACE
521  // test of schema trans
522  {
523  const char* p = NdbEnv_GetEnv("NDB_INDEX_STAT_ABORT_SYS_DROP", (char*)0, 0);
524  if (p != 0 && strchr("1Y", p[0]) != 0)
525  {
526  setError(9999, __LINE__);
527  return -1;
528  }
529  }
530 #endif
531 
532  if (dic->dropTableGlobal(*sys.m_sampletable) == -1)
533  {
534  setError(dic->getNdbError().code, __LINE__);
535  return -1;
536  }
537  }
538 
539  if (dic->endSchemaTrans() == -1)
540  {
541  setError(dic->getNdbError().code, __LINE__);
542  return -1;
543  }
544 
545  return 0;
546 }
547 
548 int
549 NdbIndexStatImpl::check_systables(Sys& sys)
550 {
551  if (get_systables(sys) == -1)
552  return -1;
553 
554  if (sys.m_obj_cnt == 0)
555  {
556  setError(NoSysTables, __LINE__);
557  return -1;
558  }
559 
560  if (sys.m_obj_cnt != Sys::ObjCnt)
561  {
562  setError(BadSysTables, __LINE__);
563  return -1;
564  }
565 
566  return 0;
567 }
568 
569 int
570 NdbIndexStatImpl::check_systables(Ndb* ndb)
571 {
572  Sys sys(this, ndb);
573 
574  if (check_systables(sys) == -1)
575  return -1;
576 
577  return 0;
578 }
579 
580 // operation context
581 
582 NdbIndexStatImpl::Con::Con(NdbIndexStatImpl* impl, Head& head, Ndb* ndb) :
583  m_impl(impl),
584  m_head(head),
585  m_ndb(ndb)
586 {
587  head.m_indexId = m_impl->m_indexId;
588  head.m_indexVersion = m_impl->m_indexVersion;
589  m_dic = m_ndb->getDictionary();
590  m_headtable = 0;
591  m_sampletable = 0;
592  m_sampleindex1 = 0;
593  m_tx = 0;
594  m_op = 0;
595  m_scanop = 0;
596  m_cacheBuild = 0;
597  m_cachePos = 0;
598  m_cacheKeyOffset = 0;
599  m_cacheValueOffset = 0;
600  m_start.seconds = 0;
601  m_start.micro_seconds = 0;
602 }
603 
604 NdbIndexStatImpl::Con::~Con()
605 {
606  if (m_cacheBuild != 0)
607  {
608  m_impl->free_cache(m_cacheBuild);
609  m_cacheBuild = 0;
610  }
611  if (m_tx != 0)
612  {
613  m_ndb->closeTransaction(m_tx);
614  m_tx = 0;
615  }
616  m_impl->sys_release(*this);
617 }
618 
619 int
620 NdbIndexStatImpl::Con::startTransaction()
621 {
622  assert(m_headtable != 0 && m_ndb != 0 && m_tx == 0);
623  Uint32 key[2] = {
624  m_head.m_indexId,
625  m_head.m_indexVersion
626  };
627  m_tx = m_ndb->startTransaction(m_headtable, (const char*)key, sizeof(key));
628  if (m_tx == 0)
629  return -1;
630  return 0;
631 }
632 
633 int
634 NdbIndexStatImpl::Con::execute(bool commit)
635 {
636  assert(m_tx != 0);
637  if (commit)
638  {
639  if (m_tx->execute(NdbTransaction::Commit) == -1)
640  return -1;
641  m_ndb->closeTransaction(m_tx);
642  m_tx = 0;
643  }
644  else
645  {
646  if (m_tx->execute(NdbTransaction::NoCommit) == -1)
647  return -1;
648  }
649  return 0;
650 }
651 
652 int
653 NdbIndexStatImpl::Con::getNdbOperation()
654 {
655  assert(m_headtable != 0);
656  assert(m_tx != 0 && m_op == 0);
657  m_op = m_tx->getNdbOperation(m_headtable);
658  if (m_op == 0)
659  return -1;
660  return 0;
661 }
662 
663 int
664 NdbIndexStatImpl::Con::getNdbIndexScanOperation()
665 {
666  assert(m_sampletable != 0 && m_sampleindex1 != 0);
667  assert( m_tx != 0 && m_scanop == 0);
668  m_scanop = m_tx->getNdbIndexScanOperation(m_sampleindex1, m_sampletable);
669  if (m_scanop == 0)
670  return -1;
671  return 0;
672 }
673 
674 void
675 NdbIndexStatImpl::Con::set_time()
676 {
677  NdbTick_getMicroTimer(&m_start);
678 }
679 
680 NDB_TICKS
681 NdbIndexStatImpl::Con::get_time()
682 {
683  MicroSecondTimer stop;
684  NdbTick_getMicroTimer(&stop);
685  NDB_TICKS us = NdbTick_getMicrosPassed(m_start, stop);
686  return us;
687 }
688 
689 // index
690 
691 int
692 NdbIndexStatImpl::set_index(const NdbDictionary::Index& index,
694 {
695  if (m_indexSet)
696  {
697  setError(UsageError, __LINE__);
698  return -1;
699  }
700  m_indexId = index.getObjectId();
701  m_indexVersion = index.getObjectVersion();
702  m_tableId = table.getObjectId();
703  m_keyAttrs = index.getNoOfColumns();
704  m_valueAttrs = 1 + m_keyAttrs;
705  if (m_keyAttrs == 0)
706  {
707  setError(InternalError, __LINE__);
708  return -1;
709  }
710  if (m_keyAttrs > MaxKeyCount)
711  {
712  setError(InternalError, __LINE__);
713  return -1;
714  }
715 
716  // spec buffers
717  m_keySpecBuf = new NdbPack::Type [m_keyAttrs];
718  m_valueSpecBuf = new NdbPack::Type [m_valueAttrs];
719  if (m_keySpecBuf == 0 || m_valueSpecBuf == 0)
720  {
721  setError(NoMemError, __LINE__);
722  return -1;
723  }
724  m_keySpec.set_buf(m_keySpecBuf, m_keyAttrs);
725  m_valueSpec.set_buf(m_valueSpecBuf, m_valueAttrs);
726 
727  // index key spec
728  {
729  for (uint i = 0; i < m_keyAttrs; i++)
730  {
731  const NdbDictionary::Column* icol = index.getColumn(i);
732  if (icol == 0)
733  {
734  setError(UsageError, __LINE__);
735  return -1;
736  }
738  icol->getType(),
739  icol->getSizeInBytes(),
740  icol->getNullable(),
741  icol->getCharset() != 0 ? icol->getCharset()->number : 0
742  );
743  if (m_keySpec.add(type) == -1)
744  {
745  setError(UsageError, __LINE__, m_keySpec.get_error_code());
746  return -1;
747  }
748  }
749  }
750  // stat values spec
751  {
752  NdbPack::Type type(NDB_TYPE_UNSIGNED, 4, false, 0);
753  // rir + rpk
754  if (m_valueSpec.add(type, m_valueAttrs) == -1)
755  {
756  setError(InternalError, __LINE__, m_valueSpec.get_error_code());
757  return -1;
758  }
759  }
760 
761  // data buffers (rounded to word)
762  m_keyDataBuf = new Uint8 [m_keyData.get_max_len4()];
763  m_valueDataBuf = new Uint8 [m_valueData.get_max_len4()];
764  if (m_keyDataBuf == 0 || m_valueDataBuf == 0)
765  {
766  setError(NoMemError, __LINE__);
767  return -1;
768  }
769  m_keyData.set_buf(m_keyDataBuf, m_keyData.get_max_len());
770  m_valueData.set_buf(m_valueDataBuf, m_valueData.get_max_len());
771 
772  m_indexSet = true;
773  return 0;
774 }
775 
776 void
777 NdbIndexStatImpl::reset_index()
778 {
779  free_cache();
780  m_keySpec.reset();
781  m_valueSpec.reset();
782  delete [] m_keySpecBuf;
783  delete [] m_valueSpecBuf;
784  delete [] m_keyDataBuf;
785  delete [] m_valueDataBuf;
786  init();
787 }
788 
789 // head
790 
791 void
792 NdbIndexStatImpl::init_head(Head& head)
793 {
794  head.m_found = -1;
795  head.m_eventType = -1;
796  head.m_indexId = 0;
797  head.m_indexVersion = 0;
798  head.m_tableId = 0;
799  head.m_fragCount = 0;
800  head.m_valueFormat = 0;
801  head.m_sampleVersion = 0;
802  head.m_loadTime = 0;
803  head.m_sampleCount = 0;
804  head.m_keyBytes = 0;
805 }
806 
807 // sys tables data
808 
809 int
810 NdbIndexStatImpl::sys_init(Con& con)
811 {
812  Ndb* ndb = con.m_ndb;
813  NdbDictionary::Dictionary* const dic = ndb->getDictionary();
814  sys_release(con);
815 
816  con.m_headtable = dic->getTableGlobal(g_headtable_name);
817  if (con.m_headtable == 0)
818  {
819  setError(con, __LINE__);
820  mapError(ERR_NoSuchObject, NoSysTables);
821  return -1;
822  }
823  con.m_sampletable = dic->getTableGlobal(g_sampletable_name);
824  if (con.m_sampletable == 0)
825  {
826  setError(con, __LINE__);
827  mapError(ERR_NoSuchObject, NoSysTables);
828  return -1;
829  }
830  con.m_sampleindex1 = dic->getIndexGlobal(g_sampleindex1_name, *con.m_sampletable);
831  if (con.m_sampleindex1 == 0)
832  {
833  setError(con, __LINE__);
834  mapError(ERR_NoSuchObject, NoSysTables);
835  return -1;
836  }
837  return 0;
838 }
839 
840 void
841 NdbIndexStatImpl::sys_release(Con& con)
842 {
843  if (con.m_headtable != 0)
844  {
845  con.m_dic->removeTableGlobal(*con.m_headtable, false);
846  con.m_headtable = 0;
847  }
848  if (con.m_sampletable != 0)
849  {
850  con.m_dic->removeTableGlobal(*con.m_sampletable, false);
851  con.m_sampletable = 0;
852  }
853  if (con.m_sampleindex1 != 0)
854  {
855  con.m_dic->removeIndexGlobal(*con.m_sampleindex1, false);
856  con.m_sampleindex1 = 0;
857  }
858 }
859 
860 int
861 NdbIndexStatImpl::sys_read_head(Con& con, bool commit)
862 {
863  Head& head = con.m_head;
864  head.m_sampleVersion = 0;
865  head.m_found = false;
866 
867  if (con.getNdbOperation() == -1)
868  {
869  setError(con, __LINE__);
870  return -1;
871  }
872  if (con.m_op->readTuple(NdbOperation::LM_Read) == -1)
873  {
874  setError(con, __LINE__);
875  return -1;
876  }
877  if (sys_head_setkey(con) == -1)
878  return -1;
879  if (sys_head_getvalue(con) == -1)
880  return -1;
881  if (con.execute(commit) == -1)
882  {
883  setError(con, __LINE__);
884  mapError(ERR_TupleNotFound, NoIndexStats);
885  return -1;
886  }
887  head.m_found = true;
888  if (head.m_sampleVersion == 0)
889  {
890  setError(NoIndexStats, __LINE__);
891  return -1;
892  }
893  return 0;
894 }
895 
896 int
897 NdbIndexStatImpl::sys_head_setkey(Con& con)
898 {
899  Head& head = con.m_head;
900  NdbOperation* op = con.m_op;
901  if (op->equal("index_id", (char*)&head.m_indexId) == -1)
902  {
903  setError(con, __LINE__);
904  return -1;
905  }
906  if (op->equal("index_version", (char*)&head.m_indexVersion) == -1)
907  {
908  setError(con, __LINE__);
909  return -1;
910  }
911  return 0;
912 }
913 
914 int
915 NdbIndexStatImpl::sys_head_getvalue(Con& con)
916 {
917  Head& head = con.m_head;
918  NdbOperation* op = con.m_op;
919  if (op->getValue("table_id", (char*)&head.m_tableId) == 0)
920  {
921  setError(con, __LINE__);
922  return -1;
923  }
924  if (op->getValue("frag_count", (char*)&head.m_fragCount) == 0)
925  {
926  setError(con, __LINE__);
927  return -1;
928  }
929  if (op->getValue("value_format", (char*)&head.m_valueFormat) == 0)
930  {
931  setError(con, __LINE__);
932  return -1;
933  }
934  if (op->getValue("sample_version", (char*)&head.m_sampleVersion) == 0)
935  {
936  setError(con, __LINE__);
937  return -1;
938  }
939  if (op->getValue("load_time", (char*)&head.m_loadTime) == 0)
940  {
941  setError(con, __LINE__);
942  return -1;
943  }
944  if (op->getValue("sample_count", (char*)&head.m_sampleCount) == 0)
945  {
946  setError(con, __LINE__);
947  return -1;
948  }
949  if (op->getValue("key_bytes", (char*)&head.m_keyBytes) == 0)
950  {
951  setError(con, __LINE__);
952  return -1;
953  }
954  return 0;
955 }
956 
957 int
958 NdbIndexStatImpl::sys_sample_setkey(Con& con)
959 {
960  Head& head = con.m_head;
961  NdbIndexScanOperation* op = con.m_scanop;
962  if (op->equal("index_id", (char*)&head.m_indexId) == -1)
963  {
964  setError(con, __LINE__);
965  return -1;
966  }
967  if (op->equal("index_version", (char*)&head.m_indexVersion) == -1)
968  {
969  setError(con, __LINE__);
970  return -1;
971  }
972  if (op->equal("sample_version", (char*)&head.m_sampleVersion) == -1)
973  {
974  setError(con, __LINE__);
975  return -1;
976  }
977  if (op->equal("stat_key", (char*)m_keyData.get_full_buf()) == -1)
978  {
979  setError(con, __LINE__);
980  return -1;
981  }
982  return 0;
983 }
984 
985 int
986 NdbIndexStatImpl::sys_sample_getvalue(Con& con)
987 {
988  NdbIndexScanOperation* op = con.m_scanop;
989  if (op->getValue("stat_key", (char*)m_keyData.get_full_buf()) == 0)
990  {
991  setError(con, __LINE__);
992  return -1;
993  }
994  if (op->getValue("stat_value", (char*)m_valueData.get_full_buf()) == 0)
995  {
996  setError(con, __LINE__);
997  return -1;
998  }
999  return 0;
1000 }
1001 
1002 int
1003 NdbIndexStatImpl::sys_sample_setbound(Con& con, int sv_bound)
1004 {
1005  Head& head = con.m_head;
1006  NdbIndexScanOperation* op = con.m_scanop;
1007  const NdbIndexScanOperation::BoundType eq_bound =
1009 
1010  if (op->setBound("index_id", eq_bound, &head.m_indexId) == -1)
1011  {
1012  setError(con, __LINE__);
1013  return -1;
1014  }
1015  if (op->setBound("index_version", eq_bound, &head.m_indexVersion) == -1)
1016  {
1017  setError(con, __LINE__);
1018  return -1;
1019  }
1020  if (sv_bound != -1)
1021  {
1022  if (op->setBound("sample_version", sv_bound, &head.m_sampleVersion) == -1)
1023  {
1024  setError(con, __LINE__);
1025  return -1;
1026  }
1027  }
1028  return 0;
1029 }
1030 
1031 // update, delete
1032 
1033 int
1034 NdbIndexStatImpl::update_stat(Ndb* ndb, Head& head)
1035 {
1036  Con con(this, head, ndb);
1037  if (con.m_dic->updateIndexStat(m_indexId, m_indexVersion, m_tableId) == -1)
1038  {
1039  setError(con, __LINE__);
1040  return -1;
1041  }
1042  return 0;
1043 }
1044 
1045 int
1046 NdbIndexStatImpl::delete_stat(Ndb* ndb, Head& head)
1047 {
1048  Con con(this, head, ndb);
1049  if (con.m_dic->deleteIndexStat(m_indexId, m_indexVersion, m_tableId) == -1)
1050  {
1051  setError(con, __LINE__);
1052  return -1;
1053  }
1054  return 0;
1055 }
1056 
1057 // read
1058 
1059 int
1060 NdbIndexStatImpl::read_head(Ndb* ndb, Head& head)
1061 {
1062  Con con(this, head, ndb);
1063  if (!m_indexSet)
1064  {
1065  setError(UsageError, __LINE__);
1066  return -1;
1067  }
1068  if (sys_init(con) == -1)
1069  return -1;
1070  if (con.startTransaction() == -1)
1071  {
1072  setError(con, __LINE__);
1073  return -1;
1074  }
1075  if (sys_read_head(con, true) == -1)
1076  return -1;
1077  return 0;
1078 }
1079 
1080 int
1081 NdbIndexStatImpl::read_stat(Ndb* ndb, Head& head)
1082 {
1083  Con con(this, head, ndb);
1084  con.set_time();
1085 
1086  if (read_start(con) == -1)
1087  return -1;
1088  if (save_start(con) == -1)
1089  return -1;
1090  while (1)
1091  {
1092  int ret = read_next(con);
1093  if (ret == -1)
1094  return -1;
1095  if (ret != 0)
1096  break;
1097  if (save_next(con) == -1)
1098  return -1;
1099  }
1100  if (read_commit(con) == -1)
1101  return -1;
1102 
1103  NDB_TICKS save_time = con.get_time();
1104  con.set_time();
1105 
1106  if (save_commit(con) == -1)
1107  return -1;
1108  NDB_TICKS sort_time = con.get_time();
1109 
1110  const Cache& c = *m_cacheBuild;
1111  c.m_save_time = save_time;
1112  c.m_sort_time = sort_time;
1113  return 0;
1114 }
1115 
1116 int
1117 NdbIndexStatImpl::read_start(Con& con)
1118 {
1119  //UNUSED Head& head = con.m_head;
1120  if (!m_indexSet)
1121  {
1122  setError(UsageError, __LINE__);
1123  return -1;
1124  }
1125  if (sys_init(con) == -1)
1126  return -1;
1127  if (con.startTransaction() == -1)
1128  {
1129  setError(con, __LINE__);
1130  return -1;
1131  }
1132  if (sys_read_head(con, false) == -1)
1133  return -1;
1134  if (con.getNdbIndexScanOperation() == -1)
1135  {
1136  setError(con, __LINE__);
1137  return -1;
1138  }
1139  if (con.m_scanop->readTuples(NdbOperation::LM_CommittedRead, 0) == -1)
1140  {
1141  setError(con, __LINE__);
1142  return -1;
1143  }
1144  if (sys_sample_setbound(con, NdbIndexScanOperation::BoundEQ) == -1)
1145  return -1;
1146  if (sys_sample_getvalue(con) == -1)
1147  return -1;
1148  if (con.execute(false) == -1)
1149  {
1150  setError(con, __LINE__);
1151  return -1;
1152  }
1153  return 0;
1154 }
1155 
1156 int
1157 NdbIndexStatImpl::read_next(Con& con)
1158 {
1159  m_keyData.reset();
1160  m_valueData.reset();
1161  int ret = con.m_scanop->nextResult();
1162  if (ret != 0)
1163  {
1164  if (ret == -1)
1165  setError(con, __LINE__);
1166  return ret;
1167  }
1168 
1169  /*
1170  * Key and value are raw data and little-endian. Create the complete
1171  * NdbPack::Data instance and convert it to native-endian.
1172  */
1173  const NdbPack::Endian::Value from_endian = NdbPack::Endian::Little;
1174  const NdbPack::Endian::Value to_endian = NdbPack::Endian::Native;
1175 
1176  if (m_keyData.desc_all(m_keyAttrs, from_endian) == -1)
1177  {
1178  setError(InternalError, __LINE__, m_keyData.get_error_code());
1179  return -1;
1180  }
1181  if (m_keyData.convert(to_endian) == -1)
1182  {
1183  setError(InternalError, __LINE__, m_keyData.get_error_code());
1184  return -1;
1185  }
1186  if (m_valueData.desc_all(m_valueAttrs, from_endian) == -1)
1187  {
1188  setError(InternalError, __LINE__, m_valueData.get_error_code());
1189  return -1;
1190  }
1191  if (m_valueData.convert(to_endian) == -1)
1192  {
1193  setError(InternalError, __LINE__, m_valueData.get_error_code());
1194  return -1;
1195  }
1196  return 0;
1197 }
1198 
1199 int
1200 NdbIndexStatImpl::read_commit(Con& con)
1201 {
1202  if (con.execute(true) == -1)
1203  {
1204  setError(con, __LINE__);
1205  return -1;
1206  }
1207  return 0;
1208 }
1209 
1210 // save
1211 
1212 int
1213 NdbIndexStatImpl::save_start(Con& con)
1214 {
1215  if (m_cacheBuild != 0)
1216  {
1217  free_cache(m_cacheBuild);
1218  m_cacheBuild = 0;
1219  }
1220  con.m_cacheBuild = new Cache;
1221  if (con.m_cacheBuild == 0)
1222  {
1223  setError(NoMemError, __LINE__);
1224  return -1;
1225  }
1226  new (con.m_cacheBuild) Cache;
1227  if (cache_init(con) == -1)
1228  return -1;
1229  return 0;
1230 }
1231 
1232 int
1233 NdbIndexStatImpl::save_next(Con& con)
1234 {
1235  if (cache_insert(con) == -1)
1236  return -1;
1237  return 0;
1238 }
1239 
1240 int
1241 NdbIndexStatImpl::save_commit(Con& con)
1242 {
1243  if (cache_commit(con) == -1)
1244  return -1;
1245  m_cacheBuild = con.m_cacheBuild;
1246  con.m_cacheBuild = 0;
1247  return 0;
1248 }
1249 
1250 // cache inline
1251 
1252 inline uint
1253 NdbIndexStatImpl::Cache::get_keyaddr(uint pos) const
1254 {
1255  assert(pos < m_sampleCount);
1256  const uint offset = pos * m_addrLen;
1257  assert(offset + m_addrLen <= m_addrBytes);
1258  const Uint8* src = &m_addrArray[offset];
1259  uint addr = 0;
1260  switch (m_addrLen) {
1261  case 4:
1262  addr += src[3] << 24;
1263  case 3:
1264  addr += src[2] << 16;
1265  case 2:
1266  addr += src[1] << 8;
1267  case 1:
1268  addr += src[0] << 0;
1269  break;
1270  default:
1271  assert(false);
1272  }
1273  return addr;
1274 }
1275 
1276 inline void
1277 NdbIndexStatImpl::Cache::set_keyaddr(uint pos, uint addr)
1278 {
1279  assert(pos < m_sampleCount);
1280  const uint offset = pos * m_addrLen;
1281  assert(offset + m_addrLen <= m_addrBytes);
1282  Uint8* dst = &m_addrArray[offset];
1283  switch (m_addrLen) {
1284  case 4:
1285  dst[3] = (addr >> 24) & 0xFF;
1286  case 3:
1287  dst[2] = (addr >> 16) & 0xFF;
1288  case 2:
1289  dst[1] = (addr >> 8) & 0xFF;
1290  case 1:
1291  dst[0] = (addr >> 0) & 0xFF;
1292  break;
1293  default:
1294  assert(false);
1295  }
1296  assert(get_keyaddr(pos) == addr);
1297 }
1298 
1299 inline const Uint8*
1300 NdbIndexStatImpl::Cache::get_keyptr(uint addr) const
1301 {
1302  assert(addr < m_keyBytes);
1303  return &m_keyArray[addr];
1304 }
1305 
1306 inline Uint8*
1307 NdbIndexStatImpl::Cache::get_keyptr(uint addr)
1308 {
1309  assert(addr < m_keyBytes);
1310  return &m_keyArray[addr];
1311 }
1312 
1313 inline const Uint8*
1314 NdbIndexStatImpl::Cache::get_valueptr(uint pos) const
1315 {
1316  assert(pos < m_sampleCount);
1317  return &m_valueArray[pos * m_valueLen];
1318 }
1319 
1320 inline Uint8*
1321 NdbIndexStatImpl::Cache::get_valueptr(uint pos)
1322 {
1323  assert(pos < m_sampleCount);
1324  return &m_valueArray[pos * m_valueLen];
1325 }
1326 
1327 inline void
1328 NdbIndexStatImpl::Cache::swap_entry(uint pos1, uint pos2)
1329 {
1330  uint hold_addr;
1331  Uint8 hold_value[MaxValueBytes];
1332 
1333  hold_addr = get_keyaddr(pos1);
1334  memcpy(hold_value, get_valueptr(pos1), m_valueLen);
1335  set_keyaddr(pos1, get_keyaddr(pos2));
1336  memcpy(get_valueptr(pos1), get_valueptr(pos2), m_valueLen);
1337  set_keyaddr(pos2, hold_addr);
1338  memcpy(get_valueptr(pos2), hold_value, m_valueLen);
1339 }
1340 
1341 inline double
1342 NdbIndexStatImpl::Cache::get_rir(uint pos) const
1343 {
1344  const Uint8* ptr = get_valueptr(pos);
1345  Uint32 n;
1346  memcpy(&n, &ptr[0], 4);
1347  double x = (double)m_fragCount * (double)n;
1348  return x;
1349 }
1350 
1351 inline double
1352 NdbIndexStatImpl::Cache::get_rir(uint pos1, uint pos2) const
1353 {
1354  assert(pos2 > pos1);
1355  return get_rir(pos2) - get_rir(pos1);
1356 }
1357 
1358 inline double
1359 NdbIndexStatImpl::Cache::get_unq(uint pos, uint k) const
1360 {
1361  assert(k < m_keyAttrs);
1362  const Uint8* ptr = get_valueptr(pos);
1363  Uint32 n;
1364  memcpy(&n, &ptr[4 + k * 4], 4);
1365  double x = (double)m_fragCount * (double)n;
1366  return x;
1367 }
1368 
1369 inline double
1370 NdbIndexStatImpl::Cache::get_unq(uint pos1, uint pos2, uint k) const
1371 {
1372  assert(pos2 > pos1);
1373  return get_unq(pos2, k) - get_unq(pos1, k);
1374 }
1375 
1376 inline double
1377 NdbIndexStatImpl::Cache::get_rpk(uint pos, uint k) const
1378 {
1379  return get_rir(pos) / get_unq(pos, k);
1380 }
1381 
1382 inline double
1383 NdbIndexStatImpl::Cache::get_rpk(uint pos1, uint pos2, uint k) const
1384 {
1385  assert(pos2 > pos1);
1386  return get_rir(pos1, pos2) / get_unq(pos1, pos2, k);
1387 }
1388 
1389 // cache
1390 
1391 NdbIndexStatImpl::Cache::Cache()
1392 {
1393  m_valid = false;
1394  m_keyAttrs = 0;
1395  m_valueAttrs = 0;
1396  m_fragCount = 0;
1397  m_sampleVersion = 0;
1398  m_sampleCount = 0;
1399  m_keyBytes = 0;
1400  m_valueLen = 0;
1401  m_valueBytes = 0;
1402  m_addrLen = 0;
1403  m_addrBytes = 0;
1404  m_addrArray = 0;
1405  m_keyArray = 0;
1406  m_valueArray = 0;
1407  m_nextClean = 0;
1408  // performance
1409  m_save_time = 0;
1410  m_sort_time = 0;
1411 }
1412 
1413 int
1414 NdbIndexStatImpl::cache_init(Con& con)
1415 {
1416  Cache& c = *con.m_cacheBuild;
1417  Head& head = con.m_head;
1418  Mem* mem = m_mem_handler;
1419 
1420  if (m_keyAttrs == 0)
1421  {
1422  setError(InternalError, __LINE__);
1423  return -1;
1424  }
1425  c.m_keyAttrs = m_keyAttrs;
1426  c.m_valueAttrs = m_valueAttrs;
1427  c.m_fragCount = head.m_fragCount;
1428  c.m_sampleCount = head.m_sampleCount;
1429  c.m_keyBytes = head.m_keyBytes;
1430  c.m_valueLen = 4 + c.m_keyAttrs * 4;
1431  c.m_valueBytes = c.m_sampleCount * c.m_valueLen;
1432  c.m_addrLen =
1433  c.m_keyBytes < (1 << 8) ? 1 :
1434  c.m_keyBytes < (1 << 16) ? 2 :
1435  c.m_keyBytes < (1 << 24) ? 3 : 4;
1436  c.m_addrBytes = c.m_sampleCount * c.m_addrLen;
1437 
1438  // wl4124_todo omit addrArray if keys have fixed size
1439  c.m_addrArray = (Uint8*)mem->mem_alloc(c.m_addrBytes);
1440  if (c.m_addrArray == 0)
1441  {
1442  setError(NoMemError, __LINE__);
1443  return -1;
1444  }
1445  c.m_keyArray = (Uint8*)mem->mem_alloc(c.m_keyBytes);
1446  if (c.m_keyArray == 0)
1447  {
1448  setError(NoMemError, __LINE__);
1449  return -1;
1450  }
1451  c.m_valueArray = (Uint8*)mem->mem_alloc(c.m_valueBytes);
1452  if (c.m_valueArray == 0)
1453  {
1454  setError(NoMemError, __LINE__);
1455  return -1;
1456  }
1457  return 0;
1458 }
1459 
1460 int
1461 NdbIndexStatImpl::cache_insert(Con& con)
1462 {
1463  Cache& c = *con.m_cacheBuild;
1464 
1465  const uint nextPos = con.m_cachePos + 1;
1466  if (nextPos > c.m_sampleCount)
1467  {
1468  setError(InternalError, __LINE__);
1469  return -1;
1470  }
1471  assert(m_keyData.is_full());
1472  const uint keyLen = m_keyData.get_data_len();
1473  const uint nextKeyOffset = con.m_cacheKeyOffset + keyLen;
1474  if (nextKeyOffset > c.m_keyBytes)
1475  {
1476  setError(InternalError, __LINE__);
1477  return -1;
1478  }
1479  if (m_valueData.get_data_len() != c.m_valueLen)
1480  {
1481  setError(InternalError, __LINE__);
1482  return -1;
1483  }
1484  const uint nextValueOffset = con.m_cacheValueOffset + c.m_valueLen;
1485  if (nextValueOffset > c.m_valueBytes)
1486  {
1487  setError(InternalError, __LINE__);
1488  return -1;
1489  }
1490 
1491  c.set_keyaddr(con.m_cachePos, con.m_cacheKeyOffset);
1492  con.m_cachePos = nextPos;
1493 
1494  Uint8* cacheKeyPtr = &c.m_keyArray[con.m_cacheKeyOffset];
1495  const Uint8* keyPtr = (const Uint8*)m_keyData.get_data_buf();
1496  memcpy(cacheKeyPtr, keyPtr, keyLen);
1497  con.m_cacheKeyOffset = nextKeyOffset;
1498 
1499  Uint8* cacheValuePtr = &c.m_valueArray[con.m_cacheValueOffset];
1500  const Uint8* valuePtr = (const Uint8*)m_valueData.get_data_buf();
1501  memcpy(cacheValuePtr, valuePtr, c.m_valueLen);
1502  con.m_cacheValueOffset = nextValueOffset;
1503 
1504  // verify sanity
1505  {
1506  const Uint8* rir_ptr = &cacheValuePtr[0];
1507  Uint32 rir;
1508  memcpy(&rir, rir_ptr, 4);
1509  if (!(rir != 0))
1510  {
1511  setError(InvalidCache, __LINE__);
1512  return -1;
1513  }
1514  Uint32 unq_prev = 0;
1515  for (uint k = 0; k < c.m_keyAttrs; k++)
1516  {
1517  Uint8* unq_ptr = &cacheValuePtr[4 + k * 4];
1518  Uint32 unq;
1519  memcpy(&unq, unq_ptr, 4);
1520  if (!(unq != 0))
1521  {
1522  setError(InvalidCache, __LINE__);
1523  return -1;
1524  }
1525  if (!(rir >= unq))
1526  {
1527  setError(InvalidCache, __LINE__);
1528  return -1;
1529  }
1530  if (!(unq >= unq_prev))
1531  {
1532  setError(InvalidCache, __LINE__);
1533  return -1;
1534  }
1535  unq_prev = unq;
1536  }
1537  }
1538  return 0;
1539 }
1540 
1541 int
1542 NdbIndexStatImpl::cache_commit(Con& con)
1543 {
1544  Cache& c = *con.m_cacheBuild;
1545  Head& head = con.m_head;
1546  if (con.m_cachePos != c.m_sampleCount)
1547  {
1548  setError(InternalError, __LINE__);
1549  return -1;
1550  }
1551  if (con.m_cacheKeyOffset != c.m_keyBytes)
1552  {
1553  setError(InternalError, __LINE__);
1554  return -1;
1555  }
1556  if (con.m_cacheValueOffset != c.m_valueBytes)
1557  {
1558  setError(InternalError, __LINE__);
1559  return -1;
1560  }
1561  c.m_sampleVersion = head.m_sampleVersion;
1562  if (cache_sort(c) == -1)
1563  return -1;
1564  if (cache_verify(c) == -1)
1565  return -1;
1566  c.m_valid = true;
1567  return 0;
1568 }
1569 
1570 int
1571 NdbIndexStatImpl::cache_cmpaddr(const Cache& c, uint addr1, uint addr2) const
1572 {
1573  const Uint8* key1 = c.get_keyptr(addr1);
1574  const Uint8* key2 = c.get_keyptr(addr2);
1575 
1576  NdbPack::DataC keyData1(m_keySpec, false);
1577  NdbPack::DataC keyData2(m_keySpec, false);
1578  keyData1.set_buf(key1, c.m_keyBytes - addr1, c.m_keyAttrs);
1579  keyData2.set_buf(key2, c.m_keyBytes - addr2, c.m_keyAttrs);
1580 
1581  Uint32 num_eq;
1582  int res = keyData1.cmp(keyData2, c.m_keyAttrs, num_eq);
1583  assert(addr1 == addr2 || res != 0);
1584  return res;
1585 }
1586 
1587 int
1588 NdbIndexStatImpl::cache_cmppos(const Cache& c, uint pos1, uint pos2) const
1589 {
1590  uint addr1 = c.get_keyaddr(pos1);
1591  uint addr2 = c.get_keyaddr(pos2);
1592  return cache_cmpaddr(c, addr1, addr2);
1593 }
1594 
1595 /*
1596  * Sort addr and value arrays via key values. The samples were inserted
1597  * in key order and were read back via index scan so they may be nearly
1598  * ordered at first. This is quicksort worst case so we do not use it.
1599  */
1600 int
1601 NdbIndexStatImpl::cache_sort(Cache& c)
1602 {
1603  if (c.m_sampleCount > 1)
1604  cache_hsort(c);
1605  return 0;
1606 }
1607 
1608 // insertion sort - expensive
1609 void
1610 NdbIndexStatImpl::cache_isort(Cache& c)
1611 {
1612  int n = c.m_sampleCount;
1613  for (int i = 1; i < n; i++)
1614  {
1615  for (int j = i - 1; j >= 0; j--)
1616  {
1617  int res = cache_cmppos(c, j, j + 1);
1618  if (res < 0)
1619  break;
1620  c.swap_entry(j, j + 1);
1621  }
1622  }
1623 }
1624 
1625 // heapsort
1626 void
1627 NdbIndexStatImpl::cache_hsort(Cache& c)
1628 {
1629  int count = c.m_sampleCount;
1630  int i;
1631 
1632  // highest entry which can have children
1633  i = count / 2;
1634 
1635  // make into heap (binary tree where child < parent)
1636  while (i >= 0)
1637  {
1638  cache_hsort_sift(c, i, count);
1639  i--;
1640  }
1641 
1642  // verify is too expensive to enable under VM_TRACE
1643 
1644 #ifdef ndb_index_stat_hsort_verify
1645  cache_hsort_verify(c, count);
1646 #endif
1647 
1648  // sort
1649  i = count - 1;
1650  while (i > 0)
1651  {
1652  // move current max to proper position
1653  c.swap_entry(0, i);
1654 
1655  // restore heap property for the rest
1656  cache_hsort_sift(c, 0, i);
1657 #ifdef ndb_index_stat_hsort_verify
1658  cache_hsort_verify(c, i);
1659 #endif
1660  i--;
1661  }
1662 }
1663 
1664 void
1665 NdbIndexStatImpl::cache_hsort_sift(Cache& c, int i, int count)
1666 {
1667  int parent = i;
1668 
1669  while (1)
1670  {
1671  // left child if any
1672  int child = parent * 2 + 1;
1673  if (! (child < count))
1674  break;
1675 
1676  // replace by right child if bigger
1677  if (child + 1 < count && cache_cmppos(c, child, child + 1) < 0)
1678  child = child + 1;
1679 
1680  // done if both children are less than parent
1681  if (cache_cmppos(c, child, parent) < 0)
1682  break;
1683 
1684  c.swap_entry(parent, child);
1685  parent = child;
1686  }
1687 }
1688 
1689 // verify heap property
1690 void
1691 NdbIndexStatImpl::cache_hsort_verify(Cache& c, int count)
1692 {
1693  for (int i = 0; i < count; i++)
1694  {
1695  int parent = i;
1696  int child1 = 2 * i + 1;
1697  int child2 = 2 * i + 2;
1698  if (child1 < count)
1699  {
1700  assert(cache_cmppos(c, child1, parent) < 0);
1701  }
1702  if (child2 < count)
1703  {
1704  assert(cache_cmppos(c, child2, parent) < 0);
1705  }
1706  }
1707 }
1708 
1709 int
1710 NdbIndexStatImpl::cache_verify(const Cache& c)
1711 {
1712  for (uint pos1 = 0; pos1 < c.m_sampleCount; pos1++)
1713  {
1714  const uint addr1 = c.get_keyaddr(pos1);
1715  const Uint8* key1 = c.get_keyptr(addr1);
1716  NdbPack::DataC keyData1(m_keySpec, false);
1717  keyData1.set_buf(key1, c.m_keyBytes - addr1, c.m_keyAttrs);
1718  uint pos2 = pos1 + 1;
1719  if (pos2 < c.m_sampleCount)
1720  {
1721  const uint addr2 = c.get_keyaddr(pos2);
1722  const Uint8* key2 = c.get_keyptr(addr2);
1723  NdbPack::DataC keyData2(m_keySpec, false);
1724  keyData2.set_buf(key2, c.m_keyBytes - addr2, c.m_keyAttrs);
1725  Uint32 num_eq;
1726  int res = keyData1.cmp(keyData2, c.m_keyAttrs, num_eq);
1727  if (!(res < 0))
1728  {
1729  setError(InvalidCache, __LINE__);
1730  return -1;
1731  }
1732  const Uint8* ptr1 = c.get_valueptr(pos1);
1733  const Uint8* ptr2 = c.get_valueptr(pos2);
1734  Uint32 rir1;
1735  Uint32 rir2;
1736  memcpy(&rir1, &ptr1[0], 4);
1737  memcpy(&rir2, &ptr2[0], 4);
1738  if (!(rir1 < rir2))
1739  {
1740  setError(InvalidCache, __LINE__);
1741  return -1;
1742  }
1743  for (uint k = 0; k < c.m_keyAttrs; k++)
1744  {
1745  Uint32 unq1;
1746  Uint32 unq2;
1747  memcpy(&unq1, &ptr1[4 + k * 4], 4);
1748  memcpy(&unq2, &ptr2[4 + k * 4], 4);
1749  if (!(unq1 <= unq2))
1750  {
1751  setError(InvalidCache, __LINE__);
1752  return -1;
1753  }
1754  if (k == c.m_keyAttrs - 1 && !(unq1 < unq2))
1755  {
1756  setError(InvalidCache, __LINE__);
1757  return -1;
1758  }
1759  }
1760  }
1761  }
1762  return 0;
1763 }
1764 
1765 void
1766 NdbIndexStatImpl::move_cache()
1767 {
1768  Cache* cacheTmp = m_cacheQuery;
1769 
1770  NdbMutex_Lock(m_query_mutex);
1771  m_cacheQuery = m_cacheBuild;
1772  NdbMutex_Unlock(m_query_mutex);
1773  m_cacheBuild = 0;
1774 
1775  if (cacheTmp != 0)
1776  {
1777  cacheTmp->m_nextClean = m_cacheClean;
1778  m_cacheClean = cacheTmp;
1779  }
1780 }
1781 
1782 void
1783 NdbIndexStatImpl::clean_cache()
1784 {
1785  while (m_cacheClean != 0)
1786  {
1787  NdbIndexStatImpl::Cache* tmp = m_cacheClean;
1788  m_cacheClean = tmp->m_nextClean;
1789  free_cache(tmp);
1790  }
1791 }
1792 
1793 void
1794 NdbIndexStatImpl::free_cache(Cache* c)
1795 {
1796  Mem* mem = m_mem_handler;
1797  mem->mem_free(c->m_addrArray);
1798  mem->mem_free(c->m_keyArray);
1799  mem->mem_free(c->m_valueArray);
1800  delete c;
1801 }
1802 
1803 void
1804 NdbIndexStatImpl::free_cache()
1805 {
1806  // twice to move all to clean list
1807  move_cache();
1808  move_cache();
1809  clean_cache();
1810 }
1811 
1812 // cache dump
1813 
1814 NdbIndexStatImpl::CacheIter::CacheIter(const NdbIndexStatImpl& impl) :
1815  m_keyData(impl.m_keySpec, false),
1816  m_valueData(impl.m_valueSpec, false)
1817 {
1818  m_keyCount = impl.m_keyAttrs;
1819  m_sampleCount = 0;
1820  m_sampleIndex = 0;
1821 }
1822 
1823 int
1824 NdbIndexStatImpl::dump_cache_start(CacheIter& iter)
1825 {
1826  if (m_cacheQuery == 0)
1827  {
1828  setError(UsageError, __LINE__);
1829  return -1;
1830  }
1831  const Cache& c = *m_cacheQuery;
1832  new (&iter) CacheIter(*this);
1833  iter.m_sampleCount = c.m_sampleCount;
1834  iter.m_sampleIndex = ~(Uint32)0;
1835  return 0;
1836 }
1837 
1838 bool
1839 NdbIndexStatImpl::dump_cache_next(CacheIter& iter)
1840 {
1841  if (iter.m_sampleIndex == ~(Uint32)0)
1842  iter.m_sampleIndex = 0;
1843  else
1844  iter.m_sampleIndex++;
1845  if (iter.m_sampleIndex >= iter.m_sampleCount)
1846  return false;
1847  const Cache& c = *m_cacheQuery;
1848  const uint pos = iter.m_sampleIndex;
1849  const uint addr = c.get_keyaddr(pos);
1850  const Uint8* key = c.get_keyptr(addr);
1851  const Uint8* value = c.get_valueptr(pos);
1852  iter.m_keyData.set_buf(key, c.m_keyBytes - addr, c.m_keyAttrs);
1853  iter.m_valueData.set_buf(value, c.m_valueLen, c.m_valueAttrs);
1854  return true;
1855 }
1856 
1857 // bound
1858 
1859 int
1860 NdbIndexStatImpl::finalize_bound(Bound& bound)
1861 {
1862  assert(bound.m_type == 0 || bound.m_type == 1);
1863  int side = 0;
1864  if (bound.m_data.get_cnt() == 0)
1865  {
1866  if (bound.m_strict != -1)
1867  {
1868  setError(UsageError, __LINE__);
1869  return -1;
1870  }
1871  }
1872  else
1873  {
1874  if (bound.m_strict == -1)
1875  {
1876  setError(UsageError, __LINE__);
1877  return -1;
1878  }
1879  if (bound.m_type == 0)
1880  side = bound.m_strict ? +1 : -1;
1881  else
1882  side = bound.m_strict ? -1 : +1;
1883  }
1884  if (bound.m_bound.finalize(side) == -1)
1885  {
1886  setError(UsageError, __LINE__);
1887  return -1;
1888  }
1889  return 0;
1890 }
1891 
1892 // range
1893 
1894 int
1895 NdbIndexStatImpl::convert_range(Range& range,
1896  const NdbRecord* key_record,
1898 {
1899  if (ib == 0)
1900  return 0;
1901  if (ib->low_key_count == 0 && ib->high_key_count == 0)
1902  return 0;
1903  for (uint j = 0; j <= 1; j++)
1904  {
1905  Bound& bound = j == 0 ? range.m_bound1 : range.m_bound2;
1906  bound.m_bound.reset();
1907  const char* key = j == 0 ? ib->low_key : ib->high_key;
1908  const uint key_count = j == 0 ? ib->low_key_count : ib->high_key_count;
1909  const bool inclusive = j == 0 ? ib->low_inclusive : ib->high_inclusive;
1910  Uint32 len_out;
1911  for (uint i = 0; i < key_count; i++)
1912  {
1913  const uint i2 = key_record->key_indexes[i];
1914  require(i2 < key_record->noOfColumns);
1915  const NdbRecord::Attr& attr = key_record->columns[i2];
1916  if (!attr.is_null(key))
1917  {
1918  const char* data = key + attr.offset;
1919  char buf[256];
1920  if (attr.flags & NdbRecord::IsMysqldShrinkVarchar)
1921  {
1922  Uint32 len;
1923  if (!attr.shrink_varchar(key, len, buf))
1924  {
1925  setError(InternalError, __LINE__);
1926  return -1;
1927  }
1928  data = buf;
1929  }
1930  if (bound.m_data.add(data, &len_out) == -1)
1931  {
1932  setError(InternalError, __LINE__, bound.m_data.get_error_code());
1933  return -1;
1934  }
1935  }
1936  else
1937  {
1938  if (bound.m_data.add_null(&len_out) == -1)
1939  {
1940  setError(InternalError, __LINE__, bound.m_data.get_error_code());
1941  return -1;
1942  }
1943  }
1944  }
1945  if (key_count > 0)
1946  bound.m_strict = !inclusive;
1947  if (finalize_bound(bound) == -1)
1948  {
1949  setError(InternalError, __LINE__);
1950  return -1;
1951  }
1952  }
1953  return 0;
1954 }
1955 
1956 // query
1957 
1958 // normalize values to >= 1.0
1959 void
1960 NdbIndexStatImpl::query_normalize(const Cache& c, StatValue& value)
1961 {
1962  if (!value.m_empty)
1963  {
1964  if (value.m_rir < 1.0)
1965  value.m_rir = 1.0;
1966  for (uint k = 0; k < c.m_keyAttrs; k++)
1967  {
1968  if (value.m_unq[k] < 1.0)
1969  value.m_unq[k] = 1.0;
1970  }
1971  }
1972  else
1973  {
1974  value.m_rir = 1.0;
1975  for (uint k = 0; k < c.m_keyAttrs; k++)
1976  value.m_unq[k] = 1.0;
1977  }
1978 }
1979 
1980 int
1981 NdbIndexStatImpl::query_stat(const Range& range, Stat& stat)
1982 {
1983  NdbMutex_Lock(m_query_mutex);
1984  const Cache* cacheTmp = m_cacheQuery;
1985  NdbMutex_Unlock(m_query_mutex);
1986 
1987  if (unlikely(cacheTmp == 0))
1988  {
1989  setError(UsageError, __LINE__);
1990  return -1;
1991  }
1992  const Cache& c = *cacheTmp;
1993  if (unlikely(!c.m_valid))
1994  {
1995  setError(InvalidCache, __LINE__);
1996  return -1;
1997  }
1998 
1999  query_interpolate(c, range, stat);
2000  query_normalize(c, stat.m_value);
2001  return 0;
2002 }
2003 
2004 void
2005 NdbIndexStatImpl::query_interpolate(const Cache& c,
2006  const Range& range,
2007  Stat& stat)
2008 {
2009  const uint keyAttrs = c.m_keyAttrs;
2010  StatValue& value = stat.m_value;
2011  value.m_empty = false;
2012  stat.m_rule[0] = "-";
2013  stat.m_rule[1] = "-";
2014  stat.m_rule[2] = "-";
2015 
2016  if (c.m_sampleCount == 0)
2017  {
2018  stat.m_rule[0] = "r1.1";
2019  value.m_empty = true;
2020  return;
2021  }
2022  const uint posMIN = 0;
2023  const uint posMAX = c.m_sampleCount - 1;
2024 
2025  const Bound& bound1 = range.m_bound1;
2026  const Bound& bound2 = range.m_bound2;
2027  if (bound1.m_data.is_empty() && bound2.m_data.is_empty())
2028  {
2029  stat.m_rule[0] = "r1.2";
2030  value.m_rir = c.get_rir(posMAX);
2031  for (uint k = 0; k < keyAttrs; k++)
2032  value.m_unq[k] = c.get_unq(posMAX, k);
2033  return;
2034  }
2035 
2036  StatBound& stat1 = stat.m_stat1;
2037  StatBound& stat2 = stat.m_stat2;
2038  if (!bound1.m_data.is_empty())
2039  {
2040  query_interpolate(c, bound1, stat1);
2041  query_normalize(c, stat1.m_value);
2042  stat.m_rule[1] = stat1.m_rule;
2043  }
2044  if (!bound2.m_data.is_empty())
2045  {
2046  query_interpolate(c, bound2, stat2);
2047  query_normalize(c, stat2.m_value);
2048  stat.m_rule[2] = stat2.m_rule;
2049  }
2050 
2051  const StatValue& value1 = stat1.m_value;
2052  const StatValue& value2 = stat2.m_value;
2053  const uint posL1 = stat1.m_pos - 1; // invalid if posH1 == posMIN
2054  const uint posH1 = stat1.m_pos;
2055  const uint posL2 = stat2.m_pos - 1; // invalid if posH2 == posMIN
2056  const uint posH2 = stat2.m_pos;
2057  const uint cnt1 = bound1.m_data.get_cnt();
2058  const uint cnt2 = bound2.m_data.get_cnt();
2059  const int side1 = bound1.m_bound.get_side();
2060  const int side2 = bound2.m_bound.get_side();
2061  const uint mincnt = min(cnt1, cnt2);
2062  Uint32 numEq = 0; // of bound1,bound2
2063 
2064  if (bound1.m_data.is_empty())
2065  {
2066  stat.m_rule[0] = "r1.3";
2067  value.m_rir = value2.m_rir;
2068  for (uint k = 0; k < keyAttrs; k++)
2069  value.m_unq[k] = value2.m_unq[k];
2070  return;
2071  }
2072  if (bound2.m_data.is_empty())
2073  {
2074  stat.m_rule[0] = "r1.4";
2075  value.m_rir = c.get_rir(posMAX) - value1.m_rir;
2076  for (uint k = 0; k < keyAttrs; k++)
2077  value.m_unq[k] = c.get_unq(posMAX, k) - value1.m_unq[k];
2078  return;
2079  }
2080  if (posH1 > posH2)
2081  {
2082  stat.m_rule[0] = "r1.5";
2083  value.m_empty = true;
2084  return;
2085  }
2086  // also returns number of equal initial components
2087  if (bound1.m_bound.cmp(bound2.m_bound, mincnt, numEq) >= 0)
2088  {
2089  stat.m_rule[0] = "r1.6";
2090  value.m_empty = true;
2091  return;
2092  }
2093  if (posH1 == posMIN)
2094  {
2095  stat.m_rule[0] = "r1.7";
2096  value.m_rir = value2.m_rir - value1.m_rir;
2097  for (uint k = 0; k < keyAttrs; k++)
2098  value.m_unq[k] = value2.m_unq[k] - value1.m_unq[k];
2099  return;
2100  }
2101  if (posH2 == posMAX + 1)
2102  {
2103  stat.m_rule[0] = "r1.8";
2104  value.m_rir = value2.m_rir - value1.m_rir;
2105  for (uint k = 0; k <= keyAttrs; k++)
2106  value.m_unq[k] = value2.m_unq[k] - value1.m_unq[k];
2107  return;
2108  }
2109  if (posL1 == posL2)
2110  {
2111  assert(posH1 == posH2);
2112  if (cnt1 == keyAttrs &&
2113  cnt2 == keyAttrs &&
2114  numEq == keyAttrs) {
2115  stat.m_rule[0] = "r2.1";
2116  assert(side1 == -1 && side2 == +1);
2117  assert(stat1.m_numEqL < keyAttrs && stat2.m_numEqH < keyAttrs);
2118  value.m_rir = c.get_rpk(posL1, posH1, keyAttrs - 1);
2119  for (uint k = 0; k < keyAttrs; k++)
2120  value.m_unq[k] = value.m_rir / c.get_rpk(posL1, posH1, k);
2121  return;
2122  }
2123  if (numEq != 0)
2124  {
2125  stat.m_rule[0] = "r2.2";
2126  // skip for now
2127  }
2128  if (true)
2129  {
2130  stat.m_rule[0] = "r2.3";
2131  const double w = 0.5;
2132  value.m_rir = w * c.get_rir(posL1, posH1);
2133  for (uint k = 0; k < keyAttrs; k++)
2134  value.m_unq[k] = w * c.get_unq(posL1, posH1, k);
2135  return;
2136  }
2137  }
2138  if (posH1 == posL2)
2139  {
2140  if (cnt1 == keyAttrs &&
2141  cnt2 == keyAttrs &&
2142  numEq == keyAttrs) {
2143  stat.m_rule[0] = "r3.1";
2144  assert(side1 == -1 && side2 == +1);
2145  assert(stat1.m_numEqH == keyAttrs && stat2.m_numEqL == keyAttrs);
2146  value.m_rir = value2.m_rir - value1.m_rir;
2147  for (uint k = 0; k < keyAttrs; k++)
2148  value.m_unq[k] = value2.m_unq[k] - value1.m_unq[k];
2149  return;
2150  }
2151  if (numEq != 0)
2152  {
2153  stat.m_rule[0] = "r3.2";
2154  // skip for now
2155  }
2156  if (true)
2157  {
2158  stat.m_rule[0] = "r3.3";
2159  const double w = 0.5;
2160  value.m_rir = w * c.get_rir(posL1, posH1);
2161  for (uint k = 0; k < keyAttrs; k++)
2162  value.m_unq[k] = w * c.get_unq(posL1, posH1, k);
2163  return;
2164  }
2165  }
2166  if (true)
2167  {
2168  stat.m_rule[0] = "r4";
2169  value.m_rir = value2.m_rir - value1.m_rir;
2170  for (uint k = 0; k < keyAttrs; k++)
2171  value.m_unq[k] = value2.m_unq[k] - value1.m_unq[k];
2172  return;
2173  }
2174 }
2175 
2176 void
2177 NdbIndexStatImpl::query_interpolate(const Cache& c,
2178  const Bound& bound,
2179  StatBound& stat)
2180 {
2181  const uint keyAttrs = c.m_keyAttrs;
2182  StatValue& value = stat.m_value;
2183  value.m_empty = false;
2184  stat.m_rule = "-";
2185 
2186  query_search(c, bound, stat);
2187 
2188  const uint posMIN = 0;
2189  const uint posMAX = c.m_sampleCount - 1;
2190  const uint posL = stat.m_pos - 1; // invalid if posH == posMIN
2191  const uint posH = stat.m_pos;
2192  const uint cnt = bound.m_data.get_cnt();
2193  const int side = bound.m_bound.get_side();
2194 
2195  if (posH == posMIN)
2196  {
2197  if (cnt == keyAttrs &&
2198  cnt == stat.m_numEqH) {
2199  stat.m_rule = "b1.1";
2200  assert(side == -1);
2201  value.m_rir = c.get_rir(posMIN) - c.get_rpk(posMIN, keyAttrs - 1);
2202  for (uint k = 0; k < keyAttrs; k++)
2203  value.m_unq[k] = c.get_unq(posMIN, k) - 1;
2204  return;
2205  }
2206  if (true)
2207  {
2208  stat.m_rule = "b1.2";
2209  value.m_empty = true;
2210  return;
2211  }
2212  }
2213  if (posH == posMAX + 1)
2214  {
2215  stat.m_rule = "b2";
2216  value.m_rir = c.get_rir(posMAX);
2217  for (uint k = 0; k < keyAttrs; k++)
2218  value.m_unq[k] = c.get_unq(posMAX, k);
2219  return;
2220  }
2221  if (cnt == keyAttrs &&
2222  cnt == stat.m_numEqL) {
2223  stat.m_rule = "b3.1";
2224  assert(side == +1);
2225  value.m_rir = c.get_rir(posL);
2226  for (uint k = 0; k < keyAttrs; k++)
2227  value.m_unq[k] = c.get_unq(posL, k);
2228  return;
2229  }
2230  if (cnt == keyAttrs &&
2231  cnt == stat.m_numEqH &&
2232  side == +1) {
2233  stat.m_rule = "b3.2";
2234  value.m_rir = c.get_rir(posH);
2235  for (uint k = 0; k < keyAttrs; k++)
2236  value.m_unq[k] = c.get_unq(posH, k);
2237  return;
2238  }
2239  if (cnt == keyAttrs &&
2240  cnt == stat.m_numEqH &&
2241  side == -1) {
2242  stat.m_rule = "b3.3";
2243  const double u = c.get_unq(posL, posH, keyAttrs - 1);
2244  const double wL = 1.0 / u;
2245  const double wH = 1.0 - wL;
2246  value.m_rir = wL * c.get_rir(posL) + wH * c.get_rir(posH);
2247  for (uint k = 0; k < keyAttrs; k++)
2248  value.m_unq[k] = wL * c.get_unq(posL, k) + wH * c.get_unq(posH, k);
2249  return;
2250  }
2251  if (true)
2252  {
2253  stat.m_rule = "b4";
2254  const double wL = 0.5;
2255  const double wH = 0.5;
2256  value.m_rir = wL * c.get_rir(posL) + wH * c.get_rir(posH);
2257  for (uint k = 0; k < keyAttrs; k++)
2258  value.m_unq[k] = wL * c.get_unq(posL, k) + wH * c.get_unq(posH, k);
2259  return;
2260  }
2261 }
2262 
2263 void
2264 NdbIndexStatImpl::query_search(const Cache& c,
2265  const Bound& bound,
2266  StatBound& stat)
2267 {
2268  assert(c.m_sampleCount > 0);
2269  assert(!bound.m_data.is_empty());
2270  Uint32 numEq;
2271 
2272  int lo = -1;
2273  int hi = c.m_sampleCount;
2274  while (hi - lo > 1)
2275  {
2276  int j = (hi + lo) / 2;
2277  assert(lo < j && j < hi);
2278  int res = query_keycmp(c, bound, j, numEq);
2279  if (res < 0)
2280  lo = j;
2281  else if (res > 0)
2282  hi = j;
2283  else
2284  {
2285  assert(false);
2286  return;
2287  }
2288  }
2289  assert(hi - lo == 1);
2290  stat.m_pos = hi;
2291 
2292  if (stat.m_pos > 0)
2293  {
2294  (void)query_keycmp(c, bound, stat.m_pos - 1, stat.m_numEqL);
2295  }
2296  if (stat.m_pos < c.m_sampleCount)
2297  {
2298  (void)query_keycmp(c, bound, stat.m_pos, stat.m_numEqH);
2299  }
2300 }
2301 
2302 // return <0/>0 for key before/after bound
2303 int
2304 NdbIndexStatImpl::query_keycmp(const Cache& c,
2305  const Bound& bound,
2306  uint pos, Uint32& numEq)
2307 {
2308  const uint addr = c.get_keyaddr(pos);
2309  const Uint8* key = c.get_keyptr(addr);
2310  NdbPack::DataC keyData(m_keySpec, false);
2311  keyData.set_buf(key, c.m_keyBytes - addr, c.m_keyAttrs);
2312  // reverse result for key vs bound
2313  Uint32 cnt = bound.m_bound.get_data().get_cnt();
2314  int res = (-1) * bound.m_bound.cmp(keyData, cnt, numEq);
2315  return res;
2316 }
2317 
2318 // events and polling
2319 
2320 int
2321 NdbIndexStatImpl::create_sysevents(Ndb* ndb)
2322 {
2323  Sys sys(this, ndb);
2324  NdbDictionary::Dictionary* const dic = ndb->getDictionary();
2325 
2326  if (check_systables(sys) == -1)
2327  return -1;
2328  const NdbDictionary::Table* tab = sys.m_headtable;
2329  require(tab != 0);
2330 
2331  const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
2332  NdbDictionary::Event ev(evname, *tab);
2333  ev.addTableEvent(NdbDictionary::Event::TE_INSERT);
2334  ev.addTableEvent(NdbDictionary::Event::TE_DELETE);
2335  ev.addTableEvent(NdbDictionary::Event::TE_UPDATE);
2336  for (int i = 0; i < tab->getNoOfColumns(); i++)
2337  ev.addEventColumn(i);
2338  ev.setReport(NdbDictionary::Event::ER_UPDATED);
2339 
2340  if (dic->createEvent(ev) == -1)
2341  {
2342  setError(dic->getNdbError().code, __LINE__);
2343  return -1;
2344  }
2345  return 0;
2346 }
2347 
2348 int
2349 NdbIndexStatImpl::drop_sysevents(Ndb* ndb)
2350 {
2351  Sys sys(this, ndb);
2352  NdbDictionary::Dictionary* const dic = ndb->getDictionary();
2353 
2354  if (check_systables(sys) == -1)
2355  return -1;
2356 
2357  const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
2358  if (dic->dropEvent(evname) == -1)
2359  {
2360  int code = dic->getNdbError().code;
2361  if (code != 4710)
2362  {
2363  setError(dic->getNdbError().code, __LINE__);
2364  return -1;
2365  }
2366  }
2367  return 0;
2368 }
2369 
2370 int
2371 NdbIndexStatImpl::check_sysevents(Ndb* ndb)
2372 {
2373  Sys sys(this, ndb);
2374  NdbDictionary::Dictionary* const dic = ndb->getDictionary();
2375 
2376  if (check_systables(sys) == -1)
2377  return -1;
2378 
2379  const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
2380  const NdbDictionary::Event* ev = dic->getEvent(evname);
2381  if (ev == 0)
2382  {
2383  setError(dic->getNdbError().code, __LINE__);
2384  return -1;
2385  }
2386  delete ev; // getEvent() creates new instance
2387  return 0;
2388 }
2389 
2390 int
2391 NdbIndexStatImpl::create_listener(Ndb* ndb)
2392 {
2393  if (m_eventOp != 0)
2394  {
2395  setError(UsageError, __LINE__);
2396  return -1;
2397  }
2398  const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
2399  m_eventOp = ndb->createEventOperation(evname);
2400  if (m_eventOp == 0)
2401  {
2402  setError(ndb->getNdbError().code, __LINE__);
2403  return -1;
2404  }
2405 
2406  // all columns are non-nullable
2407  Head& head = m_facadeHead;
2408  if (m_eventOp->getValue("index_id", (char*)&head.m_indexId) == 0 ||
2409  m_eventOp->getValue("index_version", (char*)&head.m_indexVersion) == 0 ||
2410  m_eventOp->getValue("table_id", (char*)&head.m_tableId) == 0 ||
2411  m_eventOp->getValue("frag_count", (char*)&head.m_fragCount) == 0 ||
2412  m_eventOp->getValue("value_format", (char*)&head.m_valueFormat) == 0 ||
2413  m_eventOp->getValue("sample_version", (char*)&head.m_sampleVersion) == 0 ||
2414  m_eventOp->getValue("load_time", (char*)&head.m_loadTime) == 0 ||
2415  m_eventOp->getValue("sample_count", (char*)&head.m_sampleCount) == 0 ||
2416  m_eventOp->getValue("key_bytes", (char*)&head.m_keyBytes) == 0)
2417  {
2418  setError(m_eventOp->getNdbError().code, __LINE__);
2419  return -1;
2420  }
2421  // wl4124_todo why this
2422  static Head xxx;
2423  if (m_eventOp->getPreValue("index_id", (char*)&xxx.m_indexId) == 0 ||
2424  m_eventOp->getPreValue("index_version", (char*)&xxx.m_indexVersion) == 0 ||
2425  m_eventOp->getPreValue("table_id", (char*)&xxx.m_tableId) == 0 ||
2426  m_eventOp->getPreValue("frag_count", (char*)&xxx.m_fragCount) == 0 ||
2427  m_eventOp->getPreValue("value_format", (char*)&xxx.m_valueFormat) == 0 ||
2428  m_eventOp->getPreValue("sample_version", (char*)&xxx.m_sampleVersion) == 0 ||
2429  m_eventOp->getPreValue("load_time", (char*)&xxx.m_loadTime) == 0 ||
2430  m_eventOp->getPreValue("sample_count", (char*)&xxx.m_sampleCount) == 0 ||
2431  m_eventOp->getPreValue("key_bytes", (char*)&xxx.m_keyBytes) == 0)
2432  {
2433  setError(m_eventOp->getNdbError().code, __LINE__);
2434  return -1;
2435  }
2436  return 0;
2437 }
2438 
2439 int
2440 NdbIndexStatImpl::execute_listener(Ndb* ndb)
2441 {
2442  if (m_eventOp == 0)
2443  {
2444  setError(UsageError, __LINE__);
2445  return -1;
2446  }
2447  if (m_eventOp->execute() == -1)
2448  {
2449  setError(m_eventOp->getNdbError().code, __LINE__);
2450  return -1;
2451  }
2452  return 0;
2453 }
2454 
2455 int
2456 NdbIndexStatImpl::poll_listener(Ndb* ndb, int max_wait_ms)
2457 {
2458  int ret;
2459  if ((ret = ndb->pollEvents(max_wait_ms)) < 0)
2460  {
2461  setError(ndb->getNdbError().code, __LINE__);
2462  return -1;
2463  }
2464  return (ret == 0 ? 0 : 1);
2465 }
2466 
2467 int
2468 NdbIndexStatImpl::next_listener(Ndb* ndb)
2469 {
2470  NdbEventOperation* op = ndb->nextEvent();
2471  if (op == 0)
2472  return 0;
2473 
2474  Head& head = m_facadeHead;
2475  head.m_eventType = (int)op->getEventType();
2476  return 1;
2477 }
2478 
2479 int
2480 NdbIndexStatImpl::drop_listener(Ndb* ndb)
2481 {
2482  if (m_eventOp == 0)
2483  {
2484  setError(UsageError, __LINE__);
2485  return -1;
2486  }
2487  if (ndb->dropEventOperation(m_eventOp) != 0)
2488  {
2489  setError(ndb->getNdbError().code, __LINE__);
2490  return -1;
2491  }
2492  m_eventOp = 0;
2493  return 0;
2494 }
2495 
2496 // mem alloc - default impl
2497 
2498 NdbIndexStatImpl::MemDefault::MemDefault()
2499 {
2500 }
2501 
2502 NdbIndexStatImpl::MemDefault::~MemDefault()
2503 {
2504 }
2505 
2506 void*
2507 NdbIndexStatImpl::MemDefault::mem_alloc(UintPtr size)
2508 {
2509  void* ptr = malloc(size);
2510  return ptr;
2511 }
2512 
2513 void
2515 {
2516  if (ptr != 0)
2517  free(ptr);
2518 }
2519 
2520 // error
2521 
2522 void
2523 NdbIndexStatImpl::setError(int code, int line, int extra)
2524 {
2525  if (code == 0)
2526  code = InternalError;
2527  m_error.code = code;
2528  m_error.line = line;
2529  m_error.extra = extra;
2530 #ifdef VM_TRACE
2531  const char* p = NdbEnv_GetEnv("NDB_INDEX_STAT_ABORT_ON_ERROR", (char*)0, 0);
2532  if (p != 0 && strchr("1Y", p[0]) != 0)
2533  abort();
2534 #endif
2535 }
2536 
2537 void
2538 NdbIndexStatImpl::setError(const Con& con, int line)
2539 {
2540  int code = 0;
2541  if (code == 0 && con.m_op != 0)
2542  {
2543  code = con.m_op->getNdbError().code;
2544  }
2545  if (code == 0 && con.m_scanop != 0)
2546  {
2547  code = con.m_scanop->getNdbError().code;
2548  }
2549  if (code == 0 && con.m_tx != 0)
2550  {
2551  code = con.m_tx->getNdbError().code;
2552  }
2553  if (code == 0 && con.m_dic != 0)
2554  {
2555  code = con.m_dic->getNdbError().code;
2556  }
2557  if (code == 0 && con.m_ndb != 0)
2558  {
2559  code = con.m_ndb->getNdbError().code;
2560  }
2561  setError(code, line);
2562 }
2563 
2564 void
2565 NdbIndexStatImpl::mapError(const int* map, int code)
2566 {
2567  while (*map != 0)
2568  {
2569  if (m_error.code == *map) {
2570  m_error.code = code;
2571  break;
2572  }
2573  map++;
2574  }
2575 }