MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
NdbPack.cpp
1 /*
2  Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include <ndb_global.h>
19 #include <NdbPack.hpp>
20 #include <NdbOut.hpp>
21 #include <NdbEnv.h>
22 
23 // NdbPack::Error
24 
25 int
26 NdbPack::Error::get_error_code() const
27 {
28  return m_error_code;
29 }
30 
31 int
32 NdbPack::Error::get_error_line() const
33 {
34  return m_error_line;
35 }
36 
37 void
38 NdbPack::Error::set_error(int code, int line) const
39 {
40  m_error_code = code;
41  m_error_line = line;
42 #ifdef VM_TRACE
43  const char* p = NdbEnv_GetEnv("NDB_PACK_ABORT_ON_ERROR", (char*)0, 0);
44  if (p != 0 && strchr("1Y", p[0]) != 0)
45  require(false);
46 #endif
47 }
48 
49 void
50 NdbPack::Error::set_error(const Error& e2) const
51 {
52  set_error(e2.m_error_code, e2.m_error_line);
53 }
54 
55 // NdbPack::Endian
56 
57 void
58 NdbPack::Endian::convert(void* ptr, Uint32 len)
59 {
60  Uint8* p = (Uint8*)ptr;
61  for (Uint32 i = 0; i < len / 2; i++)
62  {
63  Uint32 j = len - i - 1;
64  Uint8 tmp = p[i];
65  p[i] = p[j];
66  p[j] = tmp;
67  }
68 }
69 
70 // NdbPack::Type
71 
73  bool m_supported;
74  Uint16 m_fixSize; // if non-zero must have this exact size
75  Uint16 m_arrayType; // 0,1,2 length bytes
76  bool m_charType; // type with character set
77  bool m_convert; // convert endian (reverse byte order)
78 };
79 
80 static const Ndb_pack_type_info
81 g_ndb_pack_type_info[] = {
82  { 0, 0, 0, 0, 0 }, // NDB_TYPE_UNDEFINED
83  { 1, 1, 0, 0, 1 }, // NDB_TYPE_TINYINT
84  { 1, 1, 0, 0, 1 }, // NDB_TYPE_TINYUNSIGNED
85  { 1, 2, 0, 0, 1 }, // NDB_TYPE_SMALLINT
86  { 1, 2, 0, 0, 1 }, // NDB_TYPE_SMALLUNSIGNED
87  { 1, 3, 0, 0, 1 }, // NDB_TYPE_MEDIUMINT
88  { 1, 3, 0, 0, 1 }, // NDB_TYPE_MEDIUMUNSIGNED
89  { 1, 4, 0, 0, 1 }, // NDB_TYPE_INT
90  { 1, 4, 0, 0, 1 }, // NDB_TYPE_UNSIGNED
91  { 1, 8, 0, 0, 1 }, // NDB_TYPE_BIGINT
92  { 1, 8, 0, 0, 1 }, // NDB_TYPE_BIGUNSIGNED
93  { 1, 4, 0, 0, 1 }, // NDB_TYPE_FLOAT
94  { 1, 8, 0, 0, 1 }, // NDB_TYPE_DOUBLE
95  { 1, 0, 0, 0, 0 }, // NDB_TYPE_OLDDECIMAL
96  { 1, 0, 0, 1, 0 }, // NDB_TYPE_CHAR
97  { 1, 0, 1, 1, 0 }, // NDB_TYPE_VARCHAR
98  { 1, 0, 0, 0, 0 }, // NDB_TYPE_BINARY
99  { 1, 0, 1, 0, 0 }, // NDB_TYPE_VARBINARY
100  { 1, 8, 0, 0, 0 }, // NDB_TYPE_DATETIME
101  { 1, 3, 0, 0, 0 }, // NDB_TYPE_DATE
102  { 0, 0, 0, 0, 0 }, // NDB_TYPE_BLOB
103  { 0, 0, 0, 1, 0 }, // NDB_TYPE_TEXT
104  { 0, 0, 0, 0, 0 }, // NDB_TYPE_BIT
105  { 1, 0, 2, 1, 0 }, // NDB_TYPE_LONGVARCHAR
106  { 1, 0, 2, 0, 0 }, // NDB_TYPE_LONGVARBINARY
107  { 1, 3, 0, 0, 0 }, // NDB_TYPE_TIME
108  { 1, 1, 0, 0, 0 }, // NDB_TYPE_YEAR
109  { 1, 4, 0, 0, 0 }, // NDB_TYPE_TIMESTAMP
110  { 1, 0, 0, 0, 0 }, // NDB_TYPE_OLDDECIMALUNSIGNED
111  { 1, 0, 0, 0, 0 }, // NDB_TYPE_DECIMAL
112  { 1, 0, 0, 0, 0 } // NDB_TYPE_DECIMALUNSIGNED
113 };
114 
115 static const int g_ndb_pack_type_info_cnt =
116  sizeof(g_ndb_pack_type_info) / sizeof(g_ndb_pack_type_info[0]);
117 
118 int
119 NdbPack::Type::complete()
120 {
121  if (m_typeId == 0)
122  {
123  set_error(TypeNotSet, __LINE__);
124  return -1;
125  }
126  if (m_typeId >= g_ndb_pack_type_info_cnt)
127  {
128  set_error(TypeNotSet, __LINE__);
129  return -1;
130  }
131  const Ndb_pack_type_info& info = g_ndb_pack_type_info[m_typeId];
132  if (!info.m_supported)
133  {
134  set_error(TypeNotSupported, __LINE__);
135  return -1;
136  }
137  if (m_byteSize == 0)
138  {
139  set_error(TypeSizeZero, __LINE__);
140  return -1;
141  }
142  if (info.m_fixSize != 0 && m_byteSize != info.m_fixSize)
143  {
144  set_error(TypeFixSizeInvalid, __LINE__);
145  return -1;
146  }
147  if (!(m_nullable <= 1))
148  {
149  set_error(TypeNullableNotBool, __LINE__);
150  return -1;
151  }
152  if (info.m_charType && m_csNumber == 0)
153  {
154  set_error(CharsetNotSpecified, __LINE__);
155  return -1;
156  }
157  if (info.m_charType && all_charsets[m_csNumber] == 0)
158  {
159  CHARSET_INFO* cs = get_charset(m_csNumber, MYF(0));
160  if (cs == 0)
161  {
162  set_error(CharsetNotFound, __LINE__);
163  return -1;
164  }
165  all_charsets[m_csNumber] = cs; // yes caller must do this
166  }
167  if (!info.m_charType && m_csNumber != 0)
168  {
169  set_error(CharsetNotAllowed, __LINE__);
170  return -1;
171  }
172  m_arrayType = info.m_arrayType;
173  return 0;
174 }
175 
176 // NdbPack::Spec
177 
178 int
179 NdbPack::Spec::add(Type type)
180 {
181  Uint32 cnt = m_cnt;
182  Uint32 nullable_cnt = m_nullableCnt;
183  Uint32 varsize_cnt = m_varsizeCnt;
184  Uint32 max_byte_size = m_maxByteSize;
185  if (type.complete() == -1)
186  {
187  set_error(type);
188  return -1;
189  }
190  type.m_nullbitPos = 0xFFFF;
191  if (type.m_nullable)
192  {
193  type.m_nullbitPos = nullable_cnt;
194  nullable_cnt++;
195  }
196  if (type.m_arrayType != 0)
197  {
198  varsize_cnt++;
199  }
200  max_byte_size += type.m_byteSize;
201  if (cnt >= m_bufMaxCnt)
202  {
203  set_error(SpecBufOverflow, __LINE__);
204  return -1;
205  }
206  m_buf[cnt] = type;
207  cnt++;
208  m_cnt = cnt;
209  m_nullableCnt = nullable_cnt;
210  m_varsizeCnt = varsize_cnt;
211  m_maxByteSize = max_byte_size;
212  return 0;
213 }
214 
215 int
216 NdbPack::Spec::add(Type type, Uint32 cnt)
217 {
218  for (Uint32 i = 0; i < cnt; i++)
219  {
220  if (add(type) == -1)
221  return -1;
222  }
223  return 0;
224 }
225 
226 void
227 NdbPack::Spec::copy(const Spec& s2)
228 {
229  assert(m_bufMaxCnt >= s2.m_cnt);
230  reset();
231  m_cnt = s2.m_cnt;
232  m_nullableCnt = s2.m_nullableCnt;
233  m_varsizeCnt = s2.m_varsizeCnt;
234  m_maxByteSize = s2.m_maxByteSize;
235  for (Uint32 i = 0; i < m_cnt; i++)
236  {
237  m_buf[i] = s2.m_buf[i];
238  }
239 }
240 
241 // NdbPack::Iter
242 
243 int
244 NdbPack::Iter::desc(const Uint8* item)
245 {
246  const Uint32 i = m_cnt; // item index
247  assert(i < m_spec.m_cnt);
248  const Type& type = m_spec.m_buf[i];
249  const Uint32 lenBytes = type.m_arrayType;
250  Uint32 bareLen = 0;
251  switch (lenBytes) {
252  case 0:
253  bareLen = type.m_byteSize;
254  break;
255  case 1:
256  bareLen = item[0];
257  break;
258  case 2:
259  bareLen = item[0] + (item[1] << 8);
260  break;
261  default:
262  assert(false);
263  set_error(InternalError, __LINE__);
264  return -1;
265  }
266  const Uint32 itemLen = lenBytes + bareLen;
267  if (itemLen > type.m_byteSize)
268  {
269  set_error(DataValueOverflow, __LINE__);
270  return -1;
271  }
272  m_itemPos += m_itemLen; // skip previous item
273  m_cnt++;
274  m_lenBytes = lenBytes;
275  m_bareLen = bareLen;
276  m_itemLen = itemLen;
277  return 0;
278 }
279 
280 int
281 NdbPack::Iter::desc_null()
282 {
283  assert(m_cnt < m_spec.m_cnt);
284  // caller checks if null allowed
285  m_itemPos += m_itemLen; // skip previous item
286  m_cnt++;
287  m_nullCnt++;
288  m_lenBytes = 0;
289  m_bareLen = 0;
290  m_itemLen = 0;
291  return 0;
292 }
293 
294 int
295 NdbPack::Iter::cmp(const Iter& r2, const Uint8* buf1, const Uint8* buf2) const
296 {
297  const Iter& r1 = *this;
298  assert(&r1.m_spec == &r2.m_spec);
299  assert(r1.m_cnt == r2.m_cnt && r1.m_cnt > 0);
300  const Uint32 i = r1.m_cnt - 1; // item index
301  int res = 0;
302  const Uint32 n1 = r1.m_itemLen;
303  const Uint32 n2 = r2.m_itemLen;
304  if (n1 != 0)
305  {
306  if (n2 != 0)
307  {
308  const Type& type = r1.m_spec.m_buf[i];
309  const NdbSqlUtil::Type& sqlType = getSqlType(type.m_typeId);
310  const Uint8* p1 = &buf1[r1.m_itemPos];
311  const Uint8* p2 = &buf2[r2.m_itemPos];
312  CHARSET_INFO* cs = all_charsets[type.m_csNumber];
313  res = (*sqlType.m_cmp)(cs, p1, n1, p2, n2);
314  }
315  else
316  {
317  res = +1;
318  }
319  }
320  else
321  {
322  if (n2 != 0)
323  res = -1;
324  }
325  return res;
326 }
327 
328 // NdbPack::DataC
329 
330 int
331 NdbPack::DataC::desc(Iter& r) const
332 {
333  const Uint32 i = r.m_cnt; // item index
334  assert(i < m_cnt);
335  const Type& type = m_spec.m_buf[i];
336  if (type.m_nullable || m_allNullable)
337  {
338  Uint32 nullbitPos = 0;
339  if (!m_allNullable)
340  nullbitPos = type.m_nullbitPos;
341  else
342  nullbitPos = i;
343  const Uint32 byte_pos = nullbitPos / 8;
344  const Uint32 bit_pos = nullbitPos % 8;
345  const Uint8 bit_mask = (1 << bit_pos);
346  const Uint8& the_byte = m_buf[byte_pos];
347  if ((the_byte & bit_mask) != 0)
348  {
349  if (r.desc_null() == -1)
350  {
351  set_error(r);
352  return -1;
353  }
354  return 0;
355  }
356  }
357  const Uint32 pos = r.m_itemPos + r.m_itemLen;
358  const Uint8* item = &m_buf[pos];
359  if (r.desc(item) == -1)
360  {
361  set_error(r);
362  return -1;
363  }
364  return 0;
365 }
366 
367 int
368 NdbPack::DataC::cmp(const DataC& d2, Uint32 cnt, Uint32& num_eq) const
369 {
370  const DataC& d1 = *this;
371  assert(cnt <= d1.m_cnt);
372  assert(cnt <= d2.m_cnt);
373  Iter r1(d1);
374  Iter r2(d2);
375  int res = 0;
376  Uint32 i; // remember last
377  for (i = 0; i < cnt; i++)
378  {
379  d1.desc(r1);
380  d2.desc(r2);
381  res = r1.cmp(r2, d1.m_buf, d2.m_buf);
382  if (res != 0)
383  break;
384  }
385  num_eq = i;
386  return res;
387 }
388 
389 // NdbPack::Data
390 
391 int
392 NdbPack::Data::add(const void* data, Uint32* len_out)
393 {
394  assert(data != 0);
395  const Uint8* item = (const Uint8*)data;
396  const Uint32 i = m_cnt; // item index
397  if (i >= m_spec.m_cnt)
398  {
399  set_error(DataCntOverflow, __LINE__);
400  return -1;
401  }
402  Iter& r = m_iter;
403  assert(r.m_cnt == i);
404  const Uint32 fullLen = m_varBytes + r.m_itemPos + r.m_itemLen;
405  if (r.desc(item) == -1)
406  {
407  set_error(r);
408  return -1;
409  }
410  if (fullLen + r.m_itemLen > m_bufMaxLen)
411  {
412  set_error(DataBufOverflow, __LINE__);
413  return -1;
414  }
415  memcpy(&m_buf[fullLen], item, r.m_itemLen);
416  *len_out = r.m_itemLen;
417  m_cnt++;
418  return 0;
419 }
420 
421 int
422 NdbPack::Data::add(const void* data, Uint32 cnt, Uint32* len_out)
423 {
424  const Uint8* data_ptr = (const Uint8*)data;
425  Uint32 len_tot = 0;
426  for (Uint32 i = 0; i < cnt; i++)
427  {
428  Uint32 len;
429  if (add(data_ptr, &len) == -1)
430  return -1;
431  if (data != 0)
432  data_ptr += len;
433  len_tot += len;
434  }
435  *len_out = len_tot;
436  return 0;
437 }
438 
439 int
440 NdbPack::Data::add_null(Uint32* len_out)
441 {
442  const Uint32 i = m_cnt; // item index
443  if (i >= m_spec.m_cnt)
444  {
445  set_error(DataCntOverflow, __LINE__);
446  return -1;
447  }
448  Iter& r = m_iter;
449  assert(r.m_cnt == i);
450  if (r.desc_null() == -1)
451  {
452  set_error(r);
453  return -1;
454  }
455  Uint32 nullbitPos = 0;
456  if (!m_allNullable)
457  {
458  const Type& type = m_spec.m_buf[i];
459  if (!type.m_nullable)
460  {
461  set_error(DataNotNullable, __LINE__);
462  return -1;
463  }
464  nullbitPos = type.m_nullbitPos;
465  }
466  else
467  {
468  nullbitPos = i;
469  }
470  const Uint32 byte_pos = nullbitPos / 8;
471  const Uint32 bit_pos = nullbitPos % 8;
472  const Uint8 bit_mask = (1 << bit_pos);
473  Uint8& the_byte = m_buf[m_varBytes + byte_pos];
474  assert((the_byte & bit_mask) == 0);
475  the_byte |= bit_mask;
476  *len_out = r.m_itemLen;
477  m_cnt++;
478  return 0;
479 }
480 
481 int
482 NdbPack::Data::add_null(Uint32 cnt, Uint32* len_out)
483 {
484  Uint32 len_tot = 0;
485  for (Uint32 i = 0; i < cnt; i++)
486  {
487  Uint32 len;
488  if (add_null(&len) == -1)
489  return -1;
490  len_tot += len;
491  }
492  *len_out = len_tot;
493  return 0;
494 }
495 
496 int
497 NdbPack::Data::add_poai(const Uint32* poai, Uint32* len_out)
498 {
499  const AttributeHeader ah = *(const AttributeHeader*)&poai[0];
500  if (!ah.isNULL())
501  {
502  if (add(&poai[1], len_out) == -1)
503  return -1;
504  }
505  else
506  {
507  if (add_null(len_out) == -1)
508  return -1;
509  }
510  if (ah.getByteSize() != *len_out)
511  {
512  set_error(InvalidAttrInfo, __LINE__);
513  return -1;
514  }
515  return 0;
516 }
517 
518 int
519 NdbPack::Data::add_poai(const Uint32* poai, Uint32 cnt, Uint32* len_out)
520 {
521  Uint32 len_tot = 0;
522  for (Uint32 i = 0; i < cnt; i++)
523  {
524  Uint32 len;
525  if (add_poai(poai, &len) == -1)
526  return -1;
527  len_tot += len;
528  poai += 1 + (len + 3) / 4;
529  }
530  *len_out = len_tot;
531  return 0;
532 }
533 
534 int
535 NdbPack::Data::finalize_impl()
536 {
537  const Uint32 dataLen = m_iter.m_itemPos + m_iter.m_itemLen;
538  switch (m_varBytes) {
539  // case 0: inlined
540  case 1:
541  if (dataLen <= 0xFF)
542  {
543  m_buf[0] = dataLen;
544  return 0;
545  }
546  break;
547  case 2:
548  if (dataLen <= 0xFFFF)
549  {
550  m_buf[0] = (dataLen & 0xFF);
551  m_buf[1] = (dataLen >> 8);
552  return 0;
553  }
554  break;
555  default:
556  break;
557  }
558  set_error(InternalError, __LINE__);
559  return -1;
560 }
561 
562 int
563 NdbPack::Data::desc_all(Uint32 cnt, Endian::Value from_endian)
564 {
565  if (from_endian == NdbPack::Endian::Native)
566  from_endian = NdbPack::Endian::get_endian();
567  m_endian = from_endian;
568  assert(m_cnt == 0); // reset() would destroy nullmask
569  for (Uint32 i = 0; i < cnt; i++)
570  {
571  m_cnt++;
572  if (desc(m_iter) == -1)
573  return -1;
574  }
575  if (finalize() == -1)
576  return -1;
577  return 0;
578 }
579 
580 int
581 NdbPack::Data::copy(const DataC& d2)
582 {
583  reset();
584  Iter r2(d2);
585  const Uint32 cnt2 = d2.m_cnt;
586  for (Uint32 i = 0; i < cnt2; i++)
587  {
588  if (d2.desc(r2) == -1)
589  return -1;
590  Uint32 len_out = ~(Uint32)0;
591  if (r2.m_itemLen != 0)
592  {
593  if (add(&d2.m_buf[r2.m_itemPos], &len_out) == -1)
594  return -1;
595  assert(len_out == r2.m_itemLen);
596  }
597  else
598  {
599  if (add_null(&len_out) == -1)
600  return -1;
601  assert(len_out ==0);
602  }
603  }
604  if (finalize() == -1)
605  return -1;
606  return 0;
607 }
608 
609 int
610 NdbPack::Data::convert_impl(Endian::Value to_endian)
611 {
612  const Spec& spec = m_spec;
613  Iter r(*this);
614  for (Uint32 i = 0; i < m_cnt; i++)
615  {
616  if (DataC::desc(r) == -1)
617  {
618  set_error(r);
619  return -1;
620  }
621  const Type& type = spec.m_buf[i];
622  const Uint32 typeId = type.m_typeId;
623  const Ndb_pack_type_info& info = g_ndb_pack_type_info[typeId];
624  if (info.m_convert)
625  {
626  Uint8* ptr = &m_buf[m_varBytes + r.m_itemPos];
627  Uint32 len = r.m_itemLen;
628  Endian::convert(ptr, len);
629  }
630  }
631  return 0;
632 }
633 
634 // NdbPack::BoundC
635 
636 int
637 NdbPack::BoundC::finalize(int side)
638 {
639  if (m_data.m_cnt == 0 && side != 0)
640  {
641  set_error(BoundEmptySide, __LINE__);
642  return -1;
643  }
644  if (m_data.m_cnt != 0 && side != -1 && side != +1)
645  {
646  set_error(BoundNonemptySide, __LINE__);
647  return -1;
648  }
649  m_side = side;
650  return 0;
651 }
652 
653 int
654 NdbPack::BoundC::cmp(const BoundC& b2, Uint32 cnt, Uint32& num_eq) const
655 {
656  const BoundC& b1 = *this;
657  const DataC& d1 = b1.m_data;
658  const DataC& d2 = b2.m_data;
659  int res = d1.cmp(d2, cnt, num_eq);
660  if (res == 0)
661  {
662  if (cnt < d1.m_cnt && cnt < d2.m_cnt)
663  ;
664  else if (d1.m_cnt < d2.m_cnt)
665  res = (+1) * b1.m_side;
666  else if (d1.m_cnt > d2.m_cnt)
667  res = (-1) * b2.m_side;
668  else if (b1.m_side < b2.m_side)
669  res = -1;
670  else if (b1.m_side > b2.m_side)
671  res = +1;
672  }
673  return res;
674 }
675 
676 // NdbPack::Bound
677 
678 // print
679 
680 NdbPack::Print::Print(char* buf, Uint32 bufsz) :
681  m_buf(buf), m_bufsz(bufsz), m_sz(0) {}
682 
683 void
684 NdbPack::Print::print(const char* fmt, ...)
685 {
686  va_list ap;
687  va_start(ap, fmt);
688  if (m_bufsz > m_sz)
689  {
690  BaseString::vsnprintf(&m_buf[m_sz], m_bufsz - m_sz, fmt, ap);
691  m_sz += (Uint32)strlen(&m_buf[m_sz]);
692  }
693  va_end(ap);
694 }
695 
696 // print Type
697 
698 NdbOut&
699 operator<<(NdbOut& out, const NdbPack::Type& a)
700 {
701  a.print(out);
702  return out;
703 }
704 
705 void
706 NdbPack::Type::print(NdbOut& out) const
707 {
708  char buf[200];
709  out << print(buf, sizeof(buf));
710 }
711 
712 const char*
713 NdbPack::Type::print(char* buf, Uint32 bufsz) const
714 {
715  Print p(buf, bufsz);
716  p.print("typeId:%u", m_typeId);
717  p.print(" byteSize:%u", m_byteSize);
718  p.print(" nullable:%u", m_nullable);
719  p.print(" csNumber:%u", m_csNumber);
720  return buf;
721 }
722 
723 // print Spec
724 
725 NdbOut&
726 operator<<(NdbOut& out, const NdbPack::Spec& a)
727 {
728  a.print(out);
729  return out;
730 }
731 
732 void
733 NdbPack::Spec::print(NdbOut& out) const
734 {
735  char buf[8000];
736  out << print(buf, sizeof(buf));
737 }
738 
739 const char*
740 NdbPack::Spec::print(char* buf, Uint32 bufsz) const
741 {
742  Print p(buf, bufsz);
743  p.print("cnt:%u", m_cnt);
744  p.print(" nullableCnt:%u", m_nullableCnt);
745  p.print(" varsizeCnt:%u", m_varsizeCnt);
746  p.print(" nullmaskLen:%u", get_nullmask_len(false));
747  p.print(" maxByteSize:%u", m_maxByteSize);
748  for (Uint32 i = 0; i < m_cnt; i++)
749  {
750  const Type& type = m_buf[i];
751  p.print(" [%u", i);
752  p.print(" typeId:%u", type.m_typeId);
753  p.print(" nullable:%u", type.m_nullable);
754  p.print(" byteSize:%u", type.m_byteSize);
755  p.print(" csNumber:%u", type.m_csNumber);
756  p.print("]");
757  }
758  return buf;
759 }
760 
761 // print DataC
762 
763 bool g_ndb_pack_print_hex_always = true;
764 
765 NdbOut&
766 operator<<(NdbOut& out, const NdbPack::DataC& a)
767 {
768  a.print(out);
769  return out;
770 }
771 
772 void
773 NdbPack::DataC::print(NdbOut& out) const
774 {
775  char buf[8000];
776  out << print(buf, sizeof(buf));
777 }
778 
779 const char*
780 NdbPack::DataC::print(char* buf, Uint32 bufsz, bool convert_flag) const
781 {
782  Print p(buf, bufsz);
783  const Spec& spec = m_spec;
784  const Uint32 nullmask_len = spec.get_nullmask_len(m_allNullable);
785  if (nullmask_len != 0)
786  {
787  p.print("nullmask:");
788  for (Uint32 i = 0; i < nullmask_len; i++)
789  {
790  int x = m_buf[i];
791  p.print("%02x", x);
792  }
793  }
794  Iter r(*this);
795  for (Uint32 i = 0; i < m_cnt; i++)
796  {
797  desc(r);
798  const Uint8* value = &m_buf[r.m_itemPos];
799  p.print(" [%u", i);
800  p.print(" pos:%u", r.m_itemPos);
801  p.print(" len:%u", r.m_itemLen);
802  if (r.m_itemLen > 0)
803  {
804  p.print(" value:");
805  // some specific types for debugging
806  const Type& type = spec.m_buf[i];
807  bool ok = true;
808  switch (type.m_typeId) {
809  case NDB_TYPE_TINYINT:
810  {
811  Int8 x;
812  memcpy(&x, value, 1);
813  if (convert_flag)
814  Endian::convert(&x, 1);
815  p.print("%d", (int)x);
816  }
817  break;
818  case NDB_TYPE_TINYUNSIGNED:
819  {
820  Uint8 x;
821  memcpy(&x, value, 1);
822  if (convert_flag)
823  Endian::convert(&x, 1);
824  p.print("%u", (uint)x);
825  }
826  break;
827  case NDB_TYPE_SMALLINT:
828  {
829  Int16 x;
830  memcpy(&x, value, 2);
831  if (convert_flag)
832  Endian::convert(&x, 2);
833  p.print("%d", (int)x);
834  }
835  break;
836  case NDB_TYPE_SMALLUNSIGNED:
837  {
838  Uint16 x;
839  memcpy(&x, value, 2);
840  if (convert_flag)
841  Endian::convert(&x, 2);
842  p.print("%u", (uint)x);
843  }
844  break;
845  case NDB_TYPE_INT:
846  {
847  Int32 x;
848  memcpy(&x, value, 4);
849  if (convert_flag)
850  Endian::convert(&x, 4);
851  p.print("%d", (int)x);
852  }
853  break;
854  case NDB_TYPE_UNSIGNED:
855  {
856  Uint32 x;
857  memcpy(&x, value, 4);
858  if (convert_flag)
859  Endian::convert(&x, 4);
860  p.print("%u", (uint)x);
861  }
862  break;
863  case NDB_TYPE_FLOAT:
864  {
865  float x;
866  memcpy(&x, value, 4);
867  if (convert_flag)
868  Endian::convert(&x, 4);
869  p.print("%g", (double)x);
870  }
871  break;
872  case NDB_TYPE_DOUBLE:
873  {
874  double x;
875  memcpy(&x, value, 8);
876  if (convert_flag)
877  Endian::convert(&x, 8);
878  p.print("%g", x);
879  }
880  break;
881  case NDB_TYPE_CHAR:
882  case NDB_TYPE_VARCHAR:
883  case NDB_TYPE_LONGVARCHAR:
884  {
885  const Uint32 off = type.m_arrayType;
886  for (Uint32 j = 0; j < r.m_bareLen; j++)
887  {
888  Uint8 x = value[off + j];
889  p.print("%c", (int)x);
890  }
891  }
892  break;
893  default:
894  ok = false;
895  break;
896  }
897  if (!ok || g_ndb_pack_print_hex_always)
898  {
899  p.print("<");
900  for (Uint32 j = 0; j < r.m_itemLen; j++)
901  {
902  int x = value[j];
903  p.print("%02x", x);
904  }
905  p.print(">");
906  }
907  }
908  p.print("]");
909  }
910  return buf;
911 }
912 
913 // print Data
914 
915 NdbOut&
916 operator<<(NdbOut& out, const NdbPack::Data& a)
917 {
918  a.print(out);
919  return out;
920 }
921 
922 void
923 NdbPack::Data::print(NdbOut& out) const
924 {
925  char buf[8000];
926  out << print(buf, sizeof(buf));
927 }
928 
929 const char*
930 NdbPack::Data::print(char* buf, Uint32 bufsz) const
931 {
932  Print p(buf, bufsz);
933  char* ptr = buf;
934  if (m_varBytes != 0)
935  {
936  p.print("varBytes:");
937  for (Uint32 i = 0; i < m_varBytes; i++)
938  {
939  int r = m_buf[i];
940  p.print("%02x", r);
941  }
942  p.print(" ");
943  }
944  p.print("dataLen:%u", m_iter.m_itemPos + m_iter.m_itemLen);
945  p.print(" ");
946  const bool convert_flag =
947  m_endian != Endian::Native &&
948  m_endian != Endian::get_endian();
949  DataC::print(&buf[p.m_sz], bufsz - p.m_sz, convert_flag);
950  return buf;
951 }
952 
953 // print BoundC
954 
955 NdbOut&
956 operator<<(NdbOut& out, const NdbPack::BoundC& a)
957 {
958  a.print(out);
959  return out;
960 }
961 
962 void
963 NdbPack::BoundC::print(NdbOut& out) const
964 {
965  char buf[8000];
966  out << print(buf, sizeof(buf));
967 }
968 
969 const char*
970 NdbPack::BoundC::print(char* buf, Uint32 bufsz) const
971 {
972  Print p(buf, bufsz);
973  p.print("side:%s ", m_side < 0 ? "-" : m_side > 0 ? "+" : "0");
974  m_data.print(&buf[p.m_sz], bufsz - p.m_sz);
975  return buf;
976 }
977 
978 // print Bound
979 
980 NdbOut&
981 operator<<(NdbOut& out, const NdbPack::Bound& a)
982 {
983  a.print(out);
984  return out;
985 }
986 
987 void
988 NdbPack::Bound::print(NdbOut& out) const
989 {
990  char buf[8000];
991  out << print(buf, sizeof(buf));
992 }
993 
994 const char*
995 NdbPack::Bound::print(char* buf, Uint32 bufsz) const
996 {
997  BoundC::print(buf, bufsz);
998  return buf;
999 }
1000 
1001 // validate
1002 
1003 int
1004 NdbPack::Type::validate() const
1005 {
1006  Type type2 = *this;
1007  if (type2.complete() == -1)
1008  {
1009  set_error(type2);
1010  return -1;
1011  }
1012  if (memcmp(this, &type2, sizeof(Type)) != 0)
1013  {
1014  set_error(ValidationError, __LINE__);
1015  return -1;
1016  }
1017  return 0;
1018 }
1019 
1020 int
1021 NdbPack::Spec::validate() const
1022 {
1023  Uint32 nullableCnt = 0;
1024  Uint32 varsizeCnt = 0;
1025  for (Uint32 i = 0; i < m_cnt; i++)
1026  {
1027  const Type& type = m_buf[i];
1028  if (type.validate() == -1)
1029  {
1030  set_error(type);
1031  return -1;
1032  }
1033  if (type.m_nullable)
1034  nullableCnt++;
1035  if (type.m_arrayType != 0)
1036  varsizeCnt++;
1037  }
1038  if (m_nullableCnt != nullableCnt)
1039  {
1040  set_error(ValidationError, __LINE__);
1041  return -1;
1042  }
1043  if (m_varsizeCnt != varsizeCnt)
1044  {
1045  set_error(ValidationError, __LINE__);
1046  return -1;
1047  }
1048  return 0;
1049 }
1050 
1051 int
1052 NdbPack::Data::validate() const
1053 {
1054  if (DataC::validate() == -1)
1055  return -1;
1056  const Iter& r = m_iter;
1057  if (r.m_cnt != m_cnt)
1058  {
1059  set_error(ValidationError, __LINE__);
1060  return -1;
1061  }
1062  Iter r2(*this);
1063  for (Uint32 i = 0; i < m_cnt; i++)
1064  {
1065  if (desc(r2) == -1)
1066  return -1;
1067  }
1068  if (r.m_itemPos != r2.m_itemPos)
1069  {
1070  set_error(ValidationError, __LINE__);
1071  return -1;
1072  }
1073  if (r.m_cnt != r2.m_cnt)
1074  {
1075  set_error(ValidationError, __LINE__);
1076  return -1;
1077  }
1078  if (r.m_nullCnt != r2.m_nullCnt)
1079  {
1080  set_error(ValidationError, __LINE__);
1081  return -1;
1082  }
1083  if (r.m_itemLen != r2.m_itemLen)
1084  {
1085  set_error(ValidationError, __LINE__);
1086  return -1;
1087  }
1088  return 0;
1089 }
1090 
1091 int
1092 NdbPack::BoundC::validate() const
1093 {
1094  if (m_data.validate() == -1)
1095  {
1096  set_error(m_data);
1097  return -1;
1098  }
1099  if (m_data.m_cnt == 0 && m_side != 0)
1100  {
1101  set_error(ValidationError, __LINE__);
1102  return -1;
1103  }
1104  if (m_data.m_cnt != 0 && m_side != -1 && m_side != +1)
1105  {
1106  set_error(ValidationError, __LINE__);
1107  return -1;
1108  }
1109  return 0;
1110 }
1111 
1112 int
1113 NdbPack::Bound::validate() const
1114 {
1115  if (BoundC::validate() == -1)
1116  return -1;
1117  if (m_data.validate() == -1)
1118  {
1119  set_error(m_data);
1120  return -1;
1121  }
1122  return 0;
1123 }
1124 
1125 #ifdef TEST_NDB_PACK
1126 #include <util/NdbTap.hpp>
1127 
// Check a condition; on failure print the line and the expression text,
// then abort through require(false).
#define chk1(x) \
  do { \
    if (!(x)) { \
      ndbout << "line " << __LINE__ << ": " << #x << endl; \
      require(false); \
    } \
  } while (0)

// Like chk1 but also prints the NdbPack error code and line held by e.
#define chk2(x, e) \
  do { \
    if (!(x)) { \
      ndbout << "line " << __LINE__ << ": " << #x << endl; \
      ndbout << "NdbPack code: " << (e).get_error_code() << " line: " << (e).get_error_line() << endl; \
      require(false); \
    } \
  } while (0)

// Log x at level N; printed only when verbose >= N.
#define ll0(x) \
  do { if (verbose >= 0) { ndbout << "0- " << x << endl; } } while (0)
#define ll1(x) \
  do { if (verbose >= 1) { ndbout << "1- " << x << endl; } } while (0)
#define ll2(x) \
  do { if (verbose >= 2) { ndbout << "2- " << x << endl; } } while (0)
#define ll3(x) \
  do { if (verbose >= 3) { ndbout << "3- " << x << endl; } } while (0)

// Smaller of a and b; arguments may be evaluated twice.
#define xmin(a, b) (!((a) < (b)) ? (b) : (a))
1138 
1139 #include <ndb_rand.h>
1140 
1141 static uint // random 0..n-1
1142 getrandom(uint n)
1143 {
1144  if (n != 0) {
1145  uint k = ndb_rand();
1146  return k % n;
1147  }
1148  return 0;
1149 }
1150 
1151 static uint // random 0..n-1 biased exponentially to smaller
1152 getrandom(uint n, uint bias)
1153 {
1154  assert(bias != 0);
1155  uint k = getrandom(n);
1156  bias--;
1157  while (bias != 0) {
1158  k = getrandom(k + 1);
1159  bias--;
1160  }
1161  return k;
1162 }
1163 
1164 static bool
1165 getrandompct(uint pct)
1166 {
1167  return getrandom(100) < pct;
1168 }
1169 
1170 // change in TAPTEST
// Test parameters (change in TAPTEST).
static int seed        = -1; // random
static int loops       = 0;
static int spec_cnt    = -1; // random
static int fix_type    = 0;  // all types
static int no_nullable = 0;
static int data_cnt    = -1; // Max
static int bound_cnt   = -1; // Max
static int verbose     = 0;
1179 
1180 struct Tspec {
1181  enum { Max = 100 };
1182  enum { MaxBuf = Max * 4000 };
1183  NdbPack::Spec m_spec;
1184  NdbPack::Type m_type[Max];
1185  Tspec() {
1186  m_spec.set_buf(m_type, Max);
1187  }
1188  void create();
1189 };
1190 
1191 static NdbOut&
1192 operator<<(NdbOut& out, const Tspec& tspec)
1193 {
1194  out << tspec.m_spec;
1195  return out;
1196 }
1197 
1198 void
1199 Tspec::create()
1200 {
1201  m_spec.reset();
1202  int cnt = spec_cnt == -1 ? 1 + getrandom(Tspec::Max, 3) : spec_cnt;
1203  int i = 0;
1204  while (i < cnt) {
1205  int typeId = fix_type;
1206  if (typeId == 0)
1207  typeId = getrandom(g_ndb_pack_type_info_cnt);
1208  const Ndb_pack_type_info& info = g_ndb_pack_type_info[typeId];
1209  switch (typeId) {
1210  case NDB_TYPE_INT:
1211  case NDB_TYPE_UNSIGNED:
1212  case NDB_TYPE_CHAR:
1213  case NDB_TYPE_VARCHAR:
1214  case NDB_TYPE_LONGVARCHAR:
1215  break;
1216  default:
1217  continue;
1218  }
1219  require(info.m_supported);
1220  int byteSize = 0;
1221  if (info.m_fixSize != 0)
1222  byteSize = info.m_fixSize;
1223  else if (info.m_arrayType == 0)
1224  byteSize = 1 + getrandom(128, 1); // char(1-128)
1225  else if (info.m_arrayType == 1)
1226  byteSize = 1 + getrandom(256, 2); // varchar(0-255)
1227  else if (info.m_arrayType == 2)
1228  byteSize = 2 + getrandom(1024, 3); // longvarchar(0-1023)
1229  else
1230  require(false);
1231  bool nullable = no_nullable ? false : getrandompct(50);
1232  int csNumber = 0;
1233  if (info.m_charType) {
1234  csNumber = 8; // should include ascii
1235  }
1236  NdbPack::Type type(typeId, byteSize, nullable, csNumber);
1237  chk2(m_spec.add(type) == 0, m_spec);
1238  i++;
1239  }
1240  chk2(m_spec.validate() == 0, m_spec);
1241 }
1242 
1243 struct Tdata {
1244  const Tspec& m_tspec;
1245  NdbPack::Data m_data;
1246  const bool m_isBound;
1247  int m_cnt;
1248  Uint8* m_xbuf; // unpacked
1249  int m_xsize;
1250  int m_xoff[Tspec::Max];
1251  int m_xlen[Tspec::Max];
1252  bool m_xnull[Tspec::Max];
1253  int m_xnulls;
1254  Uint32* m_poaiBuf; // plain old attr info
1255  int m_poaiSize;
1256  Uint8* m_packBuf; // packed
1257  int m_packLen;
1258  Tdata(Tspec& tspec, bool isBound, uint varBytes) :
1259  m_tspec(tspec),
1260  m_data(tspec.m_spec, isBound, varBytes),
1261  m_isBound(isBound)
1262  {
1263  m_cnt = tspec.m_spec.get_cnt();
1264  m_xbuf = 0;
1265  m_poaiBuf = 0;
1266  m_packBuf = 0;
1267  }
1268  ~Tdata() {
1269  delete [] m_xbuf;
1270  delete [] m_poaiBuf;
1271  delete [] m_packBuf;
1272  }
1273  void create();
1274  void add();
1275  void finalize();
1276  // compare using unpacked data
1277  int xcmp(const Tdata& tdata2, int* num_eq) const;
1278 };
1279 
1280 static NdbOut&
1281 operator<<(NdbOut& out, const Tdata& tdata)
1282 {
1283  out << tdata.m_data;
1284  return out;
1285 }
1286 
1287 void
1288 Tdata::create()
1289 {
1290  union {
1291  Uint8 xbuf[Tspec::MaxBuf];
1292  Uint64 xbuf_align;
1293  };
1294  memset(xbuf, 0x3f, sizeof(xbuf));
1295  m_xsize = 0;
1296  m_xnulls = 0;
1297  Uint32 poaiBuf[Tspec::MaxBuf / 4];
1298  memset(poaiBuf, 0x5f, sizeof(poaiBuf));
1299  m_poaiSize = 0;
1300  m_packLen = m_data.get_var_bytes();
1301  m_packLen += (m_tspec.m_spec.get_nullable_cnt(m_isBound) + 7) / 8;
1302  int i = 0, j;
1303  while (i < m_cnt) {
1304  const NdbPack::Type& type = m_tspec.m_spec.get_type(i);
1305  const int typeId = type.get_type_id();
1306  const Ndb_pack_type_info& info = g_ndb_pack_type_info[typeId];
1307  m_xnull[i] = type.get_nullable() && getrandompct(25);
1308  m_xnull[i] = false;
1309  if (type.get_nullable() || m_isBound)
1310  m_xnull[i] = getrandompct(20);
1311  int pad = 0; // null-char pad not counted in xlen
1312  if (!m_xnull[i]) {
1313  m_xoff[i] = m_xsize;
1314  Uint8* xptr = &xbuf[m_xsize];
1315  switch (typeId) {
1316  case NDB_TYPE_INT:
1317  {
1318  Int32 x = getrandom(10);
1319  if (getrandompct(50))
1320  x = (-1) * x;
1321  memcpy(xptr, &x, 4);
1322  m_xlen[i] = info.m_fixSize;
1323  }
1324  break;
1325  case NDB_TYPE_UNSIGNED:
1326  {
1327  Uint32 x = getrandom(10);
1328  memcpy(xptr, &x, 4);
1329  m_xlen[i] = info.m_fixSize;
1330  }
1331  break;
1332  case NDB_TYPE_CHAR:
1333  {
1334  require(type.get_byte_size() >= 1);
1335  int max_len = type.get_byte_size();
1336  int len = getrandom(max_len + 1, 1);
1337  for (j = 0; j < len; j++)
1338  {
1339  xptr[j] = 'a' + getrandom(3);
1340  }
1341  for (j = len; j < max_len; j++)
1342  {
1343  xptr[j] = 0x20;
1344  }
1345  m_xlen[i] = max_len;
1346  xptr[max_len] = 0;
1347  pad = 1;
1348  }
1349  break;
1350  case NDB_TYPE_VARCHAR:
1351  {
1352  require(type.get_byte_size() >= 1);
1353  int max_len = type.get_byte_size() - 1;
1354  int len = getrandom(max_len, 2);
1355  require(len < 256);
1356  xptr[0] = len;
1357  for (j = 0; j < len; j++)
1358  {
1359  xptr[1 + j] = 'a' + getrandom(3);
1360  }
1361  m_xlen[i] = 1 + len;
1362  xptr[1 + len] = 0;
1363  pad = 1;
1364  }
1365  break;
1366  case NDB_TYPE_LONGVARCHAR:
1367  {
1368  require(type.get_byte_size() >= 2);
1369  int max_len = type.get_byte_size() - 2;
1370  int len = getrandom(max_len, 3);
1371  require(len < 256 * 256);
1372  xptr[0] = (len & 0xFF);
1373  xptr[1] = (len >> 8);
1374  for (j = 0; j < len; j++)
1375  {
1376  xptr[2 + j] = 'a' + getrandom(3);
1377  }
1378  m_xlen[i] = 2 + len;
1379  xptr[2 + len] = 0;
1380  pad = 1;
1381  }
1382  break;
1383  default:
1384  require(false);
1385  break;
1386  }
1387  m_xsize += m_xlen[i] + pad;
1388  while (m_xsize % 8 != 0)
1389  m_xsize++;
1390  m_packLen += m_xlen[i];
1391  } else {
1392  m_xoff[i] = -1;
1393  m_xlen[i] = 0;
1394  m_xnulls++;
1395  }
1396  require(m_xnull[i] == (m_xoff[i] == -1));
1397  require(m_xnull[i] == (m_xlen[i] == 0));
1398  AttributeHeader* ah = (AttributeHeader*)&poaiBuf[m_poaiSize];
1399  ah->setAttributeId(i); // not used
1400  ah->setByteSize(m_xlen[i]);
1401  m_poaiSize++;
1402  if (!m_xnull[i]) {
1403  memcpy(&poaiBuf[m_poaiSize], &xbuf[m_xoff[i]], m_xlen[i]);
1404  m_poaiSize += (m_xlen[i] + 3) / 4;
1405  }
1406  i++;
1407  }
1408  require(m_xsize % 8 == 0);
1409  m_xbuf = (Uint8*) new Uint64 [m_xsize / 8];
1410  memcpy(m_xbuf, xbuf, m_xsize);
1411  m_poaiBuf = (Uint32*) new Uint32 [m_poaiSize];
1412  memcpy(m_poaiBuf, poaiBuf, m_poaiSize << 2);
1413 }
1414 
1415 void
1416 Tdata::add()
1417 {
1418  m_packBuf = new Uint8 [m_packLen];
1419  m_data.set_buf(m_packBuf, m_packLen);
1420  int i, j;
1421  j = 0;
1422  while (j <= 1) {
1423  if (j == 1)
1424  m_data.reset();
1425  i = 0;
1426  while (i < m_cnt) {
1427  Uint32 xlen = ~(Uint32)0;
1428  if (!m_xnull[i]) {
1429  int xoff = m_xoff[i];
1430  const Uint8* xptr = &m_xbuf[xoff];
1431  chk2(m_data.add(xptr, &xlen) == 0, m_data);
1432  chk1((int)xlen == m_xlen[i]);
1433  } else {
1434  chk2(m_data.add_null(&xlen) == 0, m_data);
1435  chk1(xlen == 0);
1436  }
1437  i++;
1438  }
1439  chk2(m_data.validate() == 0, m_data);
1440  chk1((int)m_data.get_null_cnt() == m_xnulls);
1441  j++;
1442  }
1443 }
1444 
1445 void
1446 Tdata::finalize()
1447 {
1448  chk2(m_data.finalize() == 0, m_data);
1449  ll3("create: " << m_data);
1450  chk1((int)m_data.get_full_len() == m_packLen);
1451  {
1452  const Uint8* p = (const Uint8*)m_data.get_full_buf();
1453  chk1(p[0] + (p[1] << 8) == m_packLen - 2);
1454  }
1455 }
1456 
1457 int
1458 Tdata::xcmp(const Tdata& tdata2, int* num_eq) const
1459 {
1460  const Tdata& tdata1 = *this;
1461  require(&tdata1.m_tspec == &tdata2.m_tspec);
1462  const Tspec& tspec = tdata1.m_tspec;
1463  int res = 0;
1464  int cnt = xmin(tdata1.m_cnt, tdata2.m_cnt);
1465  int i;
1466  for (i = 0; i < cnt; i++) {
1467  if (!tdata1.m_xnull[i]) {
1468  if (!tdata2.m_xnull[i]) {
1469  // the pointers are Uint64-aligned
1470  const Uint8* xptr1 = &tdata1.m_xbuf[tdata1.m_xoff[i]];
1471  const Uint8* xptr2 = &tdata2.m_xbuf[tdata2.m_xoff[i]];
1472  const int xlen1 = tdata1.m_xlen[i];
1473  const int xlen2 = tdata2.m_xlen[i];
1474  const NdbPack::Type& type = tspec.m_spec.get_type(i);
1475  const int typeId = type.get_type_id();
1476  const int csNumber = type.get_cs_number();
1477  CHARSET_INFO* cs = all_charsets[csNumber];
1478  switch (typeId) {
1479  case NDB_TYPE_INT:
1480  {
1481  require(cs == 0);
1482  Int32 x1 = *(const Int32*)xptr1;
1483  Int32 x2 = *(const Int32*)xptr2;
1484  if (x1 < x2)
1485  res = -1;
1486  else if (x1 > x2)
1487  res = +1;
1488  ll3("cmp res:" << res <<" x1:" << x1 << " x2:" << x2);
1489  }
1490  break;
1491  case NDB_TYPE_UNSIGNED:
1492  {
1493  require(cs == 0);
1494  Uint32 x1 = *(const Uint32*)xptr1;
1495  Uint32 x2 = *(const Uint32*)xptr2;
1496  if (x1 < x2)
1497  res = -1;
1498  else if (x1 > x2)
1499  res = +1;
1500  ll3("cmp res:" << res <<" x1:" << x1 << " x2:" << x2);
1501  }
1502  break;
1503  case NDB_TYPE_CHAR:
1504  {
1505  require(cs != 0 && cs->coll != 0);
1506  const uint n1 = xlen1;
1507  const uint n2 = xlen2;
1508  const uchar* t1 = &xptr1[0];
1509  const uchar* t2 = &xptr2[0];
1510  const char* s1 = (const char*)t1;
1511  const char* s2 = (const char*)t2;
1512  chk1(n1 == strlen(s1));
1513  chk1(n2 == strlen(s2));
1514  res = (*cs->coll->strnncollsp)(cs, t1, n1, t2, n2, false);
1515  ll3("cmp res:" << res <<" s1:" << s1 << " s2:" << s2);
1516  }
1517  break;
1518  case NDB_TYPE_VARCHAR:
1519  {
1520  require(cs != 0 && cs->coll != 0);
1521  const uint n1 = xptr1[0];
1522  const uint n2 = xptr2[0];
1523  const uchar* t1 = &xptr1[1];
1524  const uchar* t2 = &xptr2[1];
1525  const char* s1 = (const char*)t1;
1526  const char* s2 = (const char*)t2;
1527  chk1(n1 == strlen(s1));
1528  chk1(n2 == strlen(s2));
1529  res = (*cs->coll->strnncollsp)(cs, t1, n1, t2, n2, false);
1530  ll3("cmp res:" << res <<" s1:" << s1 << " s2:" << s2);
1531  }
1532  break;
1533  case NDB_TYPE_LONGVARCHAR:
1534  {
1535  require(cs != 0 && cs->coll != 0);
1536  const uint n1 = xptr1[0] | (xptr1[1] << 8);
1537  const uint n2 = xptr2[0] | (xptr2[1] << 8);
1538  const uchar* t1 = &xptr1[2];
1539  const uchar* t2 = &xptr2[2];
1540  const char* s1 = (const char*)t1;
1541  const char* s2 = (const char*)t2;
1542  chk1(n1 == strlen(s1));
1543  chk1(n2 == strlen(s2));
1544  res = (*cs->coll->strnncollsp)(cs, t1, n1, t2, n2, false);
1545  ll3("cmp res:" << res <<" s1:" << s1 << " s2:" << s2);
1546  }
1547  break;
1548  default:
1549  require(false);
1550  break;
1551  }
1552  } else
1553  res = +1;
1554  } else if (!tdata2.m_xnull[i])
1555  res = -1;
1556  if (res != 0)
1557  break;
1558  }
1559  *num_eq = i;
1560  ll3("xcmp res:" << res << " num_eq:" << *num_eq);
1561  return res;
1562 }
1563 
1564 struct Tbound {
1565  Tdata& m_tdata;
1566  NdbPack::Bound m_bound;
1567  Tbound(Tdata& tdata) :
1568  m_tdata(tdata),
1569  m_bound(tdata.m_data)
1570  {
1571  m_tdata.m_cnt = 1 + getrandom(m_tdata.m_cnt);
1572  }
1573  void create();
1574  void add();
1575  void finalize();
1576  int xcmp(const Tdata& tdata2, int* num_eq) const;
1577  int xcmp(const Tbound& tbound2, int* num_eq) const;
1578 };
1579 
1580 static NdbOut&
1581 operator<<(NdbOut& out, const Tbound& tbound)
1582 {
1583  out << tbound.m_bound;
1584  return out;
1585 }
1586 
1587 void
1588 Tbound::create()
1589 {
1590  m_tdata.create();
1591 }
1592 
1593 void
1594 Tbound::add()
1595 {
1596  m_tdata.add();
1597 }
1598 
1599 void
1600 Tbound::finalize()
1601 {
1602  int side = getrandompct(50) ? -1 : +1;
1603  chk2(m_bound.finalize(side) == 0, m_bound);
1604  chk2(m_bound.validate() == 0, m_bound);
1605  chk1((int)m_tdata.m_data.get_full_len() == m_tdata.m_packLen);
1606 }
1607 
1608 int
1609 Tbound::xcmp(const Tdata& tdata2, int* num_eq) const
1610 {
1611  const Tbound& tbound1 = *this;
1612  const Tdata& tdata1 = tbound1.m_tdata;
1613  require(tdata1.m_cnt <= tdata2.m_cnt);
1614  *num_eq = -1;
1615  int res = tdata1.xcmp(tdata2, num_eq);
1616  if (res == 0) {
1617  chk1(*num_eq == tdata1.m_cnt);
1618  res = m_bound.get_side();
1619  }
1620  return res;
1621 }
1622 
1623 int
1624 Tbound::xcmp(const Tbound& tbound2, int* num_eq) const
1625 {
1626  const Tbound& tbound1 = *this;
1627  const Tdata& tdata1 = tbound1.m_tdata;
1628  const Tdata& tdata2 = tbound2.m_tdata;
1629  *num_eq = -1;
1630  int res = tdata1.xcmp(tdata2, num_eq);
1631  chk1(0 <= *num_eq && *num_eq <= xmin(tdata1.m_cnt, tdata2.m_cnt));
1632  if (res == 0) {
1633  chk1(*num_eq == xmin(tdata1.m_cnt, tdata2.m_cnt));
1634  if (tdata1.m_cnt < tdata2.m_cnt)
1635  res = (+1) * tbound1.m_bound.get_side();
1636  else if (tdata1.m_cnt > tdata2.m_cnt)
1637  res = (-1) * tbound2.m_bound.get_side();
1638  else if (tbound1.m_bound.get_side() < tbound2.m_bound.get_side())
1639  res = -1;
1640  else if (tbound1.m_bound.get_side() > tbound2.m_bound.get_side())
1641  res = +1;
1642  }
1643  return res;
1644 }
1645 
1646 struct Tdatalist {
1647  enum { Max = 1000 };
1648  Tdata* m_tdata[Max];
1649  int m_cnt;
1650  Tdatalist(Tspec& tspec) {
1651  m_cnt = data_cnt == -1 ? Max : data_cnt;
1652  int i;
1653  for (i = 0; i < m_cnt; i++) {
1654  m_tdata[i] = new Tdata(tspec, false, 2);
1655  }
1656  }
1657  ~Tdatalist() {
1658  int i;
1659  for (i = 0; i < m_cnt; i++) {
1660  delete m_tdata[i];
1661  }
1662  }
1663  void create();
1664  void sort();
1665 };
1666 
1667 static NdbOut&
1668 operator<<(NdbOut& out, const Tdatalist& tdatalist)
1669 {
1670  int i;
1671  for (i = 0; i < tdatalist.m_cnt; i++) {
1672  out << "data " << i << ": " << *tdatalist.m_tdata[i];
1673  if (i + 1 < tdatalist.m_cnt)
1674  out << endl;
1675  }
1676  return out;
1677 }
1678 
1679 void
1680 Tdatalist::create()
1681 {
1682  int i;
1683  for (i = 0; i < m_cnt; i++) {
1684  Tdata& tdata = *m_tdata[i];
1685  tdata.create();
1686  tdata.add();
1687  tdata.finalize();
1688  }
1689 }
1690 
1691 static int
1692 data_cmp(const void* a1, const void* a2)
1693 {
1694  const Tdata& tdata1 = **(const Tdata**)a1;
1695  const Tdata& tdata2 = **(const Tdata**)a2;
1696  require(tdata1.m_cnt == tdata2.m_cnt);
1697  const Uint32 cnt = tdata1.m_cnt;
1698  Uint32 num_eq = ~(Uint32)0;
1699  int res = tdata1.m_data.cmp(tdata2.m_data, cnt, num_eq);
1700  require(num_eq <= (Uint32)tdata1.m_cnt);
1701  require(num_eq <= (Uint32)tdata2.m_cnt);
1702  return res;
1703 }
1704 
1705 void
1706 Tdatalist::sort()
1707 {
1708  ll1("data sort: in");
1709  ll3(endl << *this);
1710  qsort(m_tdata, m_cnt, sizeof(Tdata*), data_cmp);
1711  ll1("data sort: out");
1712  ll3(endl << *this);
1713  int i;
1714  for (i = 0; i + 1 < m_cnt; i++) {
1715  const Tdata& tdata1 = *m_tdata[i];
1716  const Tdata& tdata2 = *m_tdata[i + 1];
1717  require(tdata1.m_cnt == tdata2.m_cnt);
1718  const Uint32 cnt = tdata1.m_cnt;
1719  Uint32 num_eq1 = ~(Uint32)0;
1720  int res = tdata1.m_data.cmp(tdata2.m_data, cnt, num_eq1);
1721  chk1(res <= 0);
1722  // also via unpacked data
1723  int num_eq2 = -1;
1724  int res2 = tdata1.xcmp(tdata2, &num_eq2);
1725  if (res < 0)
1726  chk1(res2 < 0);
1727  else if (res == 0)
1728  chk1(res2 == 0);
1729  else
1730  chk1(res2 > 0);
1731  chk1(num_eq1 == (Uint32)num_eq2);
1732  }
1733 }
1734 
1735 struct Tboundlist {
1736  enum { Max = 1000 };
1737  Tbound* m_tbound[Max];
1738  int m_cnt;
1739  Tboundlist(Tspec& tspec) {
1740  m_cnt = bound_cnt == -1 ? Max : bound_cnt;
1741  int i;
1742  for (i = 0; i < m_cnt; i++) {
1743  Tdata* tdata = new Tdata(tspec, true, 0);
1744  m_tbound[i] = new Tbound(*tdata);
1745  }
1746  }
1747  ~Tboundlist() {
1748  int i;
1749  for (i = 0; i < m_cnt; i++) {
1750  Tdata* tdata = &m_tbound[i]->m_tdata;
1751  delete m_tbound[i];
1752  delete tdata;
1753  }
1754  }
1755  void create();
1756  void sort();
1757 };
1758 
1759 static NdbOut&
1760 operator<<(NdbOut& out, const Tboundlist& tboundlist)
1761 {
1762  int i;
1763  for (i = 0; i < tboundlist.m_cnt; i++) {
1764  out << "bound " << i << ": " << *tboundlist.m_tbound[i];
1765  if (i + 1 < tboundlist.m_cnt)
1766  out << endl;
1767  }
1768  return out;
1769 }
1770 
1771 void
1772 Tboundlist::create()
1773 {
1774  int i;
1775  for (i = 0; i < m_cnt; i++) {
1776  Tbound& tbound = *m_tbound[i];
1777  tbound.create();
1778  tbound.add();
1779  tbound.finalize();
1780  }
1781 }
1782 
1783 static int
1784 bound_cmp(const void* a1, const void* a2)
1785 {
1786  const Tbound& tbound1 = **(const Tbound**)a1;
1787  const Tbound& tbound2 = **(const Tbound**)a2;
1788  const Uint32 cnt = xmin(tbound1.m_tdata.m_cnt, tbound2.m_tdata.m_cnt);
1789  Uint32 num_eq = ~(Uint32)0;
1790  int res = tbound1.m_bound.cmp(tbound2.m_bound, cnt, num_eq);
1791  require(num_eq <= cnt);
1792  require(num_eq <= cnt);
1793  return res;
1794 }
1795 
1796 void
1797 Tboundlist::sort()
1798 {
1799  ll1("bound sort: in");
1800  ll3(endl << *this);
1801  qsort(m_tbound, m_cnt, sizeof(Tbound*), bound_cmp);
1802  ll1("bound sort: out");
1803  ll3(endl << *this);
1804  int i;
1805  for (i = 0; i + 1 < m_cnt; i++) {
1806  const Tbound& tbound1 = *m_tbound[i];
1807  const Tbound& tbound2 = *m_tbound[i + 1];
1808  const Uint32 cnt = xmin(tbound1.m_tdata.m_cnt, tbound2.m_tdata.m_cnt);
1809  Uint32 num_eq1 = ~(Uint32)0;
1810  int res = tbound1.m_bound.cmp(tbound2.m_bound, cnt, num_eq1);
1811  chk1(res <= 0);
1812  // also via unpacked data
1813  int num_eq2 = -1;
1814  int res2 = tbound1.xcmp(tbound2, &num_eq2);
1815  if (res < 0)
1816  chk1(res2 < 0);
1817  else if (res == 0)
1818  chk1(res2 == 0);
1819  else
1820  chk1(res2 > 0);
1821  chk1(num_eq1 == (Uint32)num_eq2);
1822  }
1823 }
1824 
1825 static void
1826 testdesc(const Tdata& tdata)
1827 {
1828  ll3("testdesc: " << tdata);
1829  const Tspec& tspec = tdata.m_tspec;
1830  const NdbPack::Data& data = tdata.m_data;
1831  const Uint8* buf_old = (const Uint8*)data.get_full_buf();
1832  const Uint32 varBytes = data.get_var_bytes();
1833  const Uint32 nullMaskLen = tspec.m_spec.get_nullmask_len(false);
1834  const Uint32 dataLen = data.get_data_len();
1835  const Uint32 fullLen = data.get_full_len();
1836  const Uint32 cnt = data.get_cnt();
1837  chk1(fullLen == varBytes + dataLen);
1838  NdbPack::Data data_new(tspec.m_spec, false, varBytes);
1839  Uint8 buf_new[Tspec::MaxBuf];
1840  data_new.set_buf(buf_new, sizeof(buf_new));
1841  memcpy(buf_new, buf_old, fullLen);
1842  chk2(data_new.desc_all(cnt, NdbPack::Endian::Native) == 0, data_new);
1843  chk1(memcmp(buf_new, data.get_full_buf(), data.get_full_len()) == 0);
1844  chk1(data_new.get_data_len() == data.get_data_len());
1845  chk1(data_new.get_cnt() == data.get_cnt());
1846  chk1(data_new.get_null_cnt() == data.get_null_cnt());
1847 }
1848 
1849 static void
1850 testcopy(const Tdata& tdata)
1851 {
1852  ll3("testcopy: " << tdata);
1853  const Tspec& tspec = tdata.m_tspec;
1854  const NdbPack::Data& data = tdata.m_data;
1855  uint n = getrandom(tdata.m_cnt + 1);
1856  do {
1857  ll3("testcopy: cnt:" << tdata.m_cnt << " n:" << n);
1858  NdbPack::DataC data_old(tspec.m_spec, false);
1859  data_old.set_buf(data.get_data_buf(), data.get_data_len(), n);
1860  chk1(data_old.get_cnt() == n);
1861  NdbPack::Data data_new(tspec.m_spec, false, 0);
1862  Uint8 buf_new[Tspec::MaxBuf];
1863  data_new.set_buf(buf_new, sizeof(buf_new));
1864  chk2(data_new.copy(data_old) == 0, data_new);
1865  chk1(data_new.get_cnt() == n);
1866  Uint32 num_eq1 = ~(Uint32)0;
1867  chk1(data_new.cmp(data_old, n, num_eq1) == 0);
1868  chk1(num_eq1 == n);
1869  Uint32 num_eq2 = ~(Uint32)0;
1870  chk1(data_old.cmp(data_new, n, num_eq2) == 0);
1871  chk1(num_eq2 == n);
1872  n = getrandom(n);
1873  } while (n != 0);
1874 }
1875 
1876 static void
1877 testpoai(const Tdata& tdata)
1878 {
1879  ll3("testpoai: " << tdata);
1880  const Tspec& tspec = tdata.m_tspec;
1881  const NdbPack::Data& data = tdata.m_data;
1882  NdbPack::Data data_new(tspec.m_spec, false, data.get_var_bytes());
1883  Uint8 buf_new[Tspec::MaxBuf];
1884  data_new.set_buf(buf_new, sizeof(buf_new));
1885  Uint32 poaiLen = ~(Uint32)0;
1886  chk2(data_new.add_poai(tdata.m_poaiBuf, tdata.m_cnt, &poaiLen) == 0, data);
1887  chk2(data_new.finalize() == 0, data_new);
1888  chk2(data_new.validate() == 0, data_new);
1889  chk1(tspec.m_spec.get_nullmask_len(false) + poaiLen == data.get_data_len());
1890  chk1(data_new.get_full_len() == data.get_full_len());
1891  chk1(memcmp(data_new.get_full_buf(), data.get_full_buf(), data.get_full_len()) == 0);
1892  chk1(data_new.get_null_cnt() == data.get_null_cnt());
1893 }
1894 
1895 static void
1896 testconvert(const Tdata& tdata)
1897 {
1898  ll3("testconvert: " << tdata);
1899  const Tspec& tspec = tdata.m_tspec;
1900  const NdbPack::Data& data = tdata.m_data;
1901  NdbPack::Data data_new(tspec.m_spec, false, 2);
1902  Uint8 buf_new[Tspec::MaxBuf];
1903  data_new.set_buf(buf_new, sizeof(buf_new));
1904  chk2(data_new.copy(data) == 0, data_new);
1905  require(tdata.m_cnt == (int)data.get_cnt());
1906  require(data.get_cnt() == data_new.get_cnt());
1907  const Uint32 cnt = tdata.m_cnt;
1908  Uint32 num_eq;
1909  int i;
1910  for (i = 0; i < 10; i++) {
1911  int k = getrandom(3); // assumes Endian::Value 0,1,2
1912  NdbPack::Endian::Value v = (NdbPack::Endian::Value)k;
1913  chk2(data_new.convert(v) == 0, data_new);
1914  if (v == NdbPack::Endian::Native ||
1915  v == NdbPack::Endian::get_endian()) {
1916  num_eq = ~(Uint32)0;
1917  chk1(data.cmp(data_new, cnt, num_eq) == 0);
1918  require(num_eq == cnt);
1919  }
1920  }
1921 }
1922 
1923 static void
1924 testdata(const Tdatalist& tdatalist)
1925 {
1926  int i;
1927  for (i = 0; i < tdatalist.m_cnt; i++) {
1928  const Tdata& tdata = *tdatalist.m_tdata[i];
1929  testdesc(tdata);
1930  testcopy(tdata);
1931  testpoai(tdata);
1932  testconvert(tdata);
1933  }
1934 }
1935 
1936 static void
1937 testcmp(const Tbound& tbound, const Tdatalist& tdatalist, int* kb)
1938 {
1939  ll3("testcmp: " << tbound);
1940  int oldres = 0;
1941  int n1 = 0;
1942  int n2 = 0;
1943  int i;
1944  for (i = 0; i < tdatalist.m_cnt; i++) {
1945  const Tdata& tdata = *tdatalist.m_tdata[i];
1946  require(tbound.m_tdata.m_cnt == (int)tbound.m_bound.get_data().get_cnt());
1947  const Uint32 cnt = tbound.m_tdata.m_cnt;
1948  Uint32 num_eq1 = ~(Uint32)0;
1949  // reverse result for key vs bound
1950  int res = (-1) * tbound.m_bound.cmp(tdata.m_data, cnt, num_eq1);
1951  chk1(res != 0);
1952  res = (res < 0 ? (n1++, -1) : (n2++, +1));
1953  if (i > 0) {
1954  // at some point flips from -1 to +1
1955  chk1(oldres <= res);
1956  }
1957  oldres = res;
1958  // also via unpacked data
1959  int num_eq2 = -1;
1960  int res2 = (-1) * tbound.xcmp(tdata, &num_eq2);
1961  if (res < 0)
1962  chk1(res2 < 0);
1963  else
1964  chk1(res2 > 0);
1965  chk1(num_eq1 == (Uint32)num_eq2);
1966  }
1967  require(n1 + n2 == tdatalist.m_cnt);
1968  ll2("keys before:" << n1 << " after:" << n2);
1969  *kb = n1;
1970 }
1971 
1972 static void
1973 testcmp(const Tboundlist& tboundlist, const Tdatalist& tdatalist)
1974 {
1975  int i;
1976  int oldkb = 0;
1977  for (i = 0; i < tboundlist.m_cnt; i++) {
1978  const Tbound& tbound = *tboundlist.m_tbound[i];
1979  int kb = 0;
1980  testcmp(tbound, tdatalist, &kb);
1981  if (i > 0) {
1982  chk1(oldkb <= kb);
1983  }
1984  oldkb = kb;
1985  }
1986 }
1987 
1988 static void
1989 testrun()
1990 {
1991  Tspec tspec;
1992  tspec.create();
1993  ll1("spec: " << tspec);
1994  Tdatalist tdatalist(tspec);
1995  tdatalist.create();
1996  tdatalist.sort();
1997  testdata(tdatalist);
1998  if (bound_cnt != 0) {
1999  Tboundlist tboundlist(tspec);
2000  tboundlist.create();
2001  tboundlist.sort();
2002  testcmp(tboundlist, tdatalist);
2003  }
2004 }
2005 
2006 extern void NdbOut_Init();
2007 
2008 static int
2009 testmain()
2010 {
2011  my_init();
2012  NdbOut_Init();
2013  signal(SIGABRT, SIG_DFL);
2014  { const char* p = NdbEnv_GetEnv("TEST_NDB_PACK_VERBOSE", (char*)0, 0);
2015  if (p != 0)
2016  verbose = atoi(p);
2017  }
2018  if (seed == 0)
2019  ll0("random seed: loop number");
2020  else {
2021  if (seed < 0)
2022  seed = getpid();
2023  ll0("random seed: " << seed);
2024  ndb_srand(seed);
2025  }
2026  loops = 100;
2027  int i;
2028  for (i = 0; loops == 0 || i < loops; i++) {
2029  ll0("loop:" << i << "/" << loops);
2030  if (seed == 0)
2031  ndb_srand(i);
2032  testrun();
2033  }
2034  // do not print "ok" in TAPTEST
2035  ndbout << "passed" << endl;
2036  return 0;
2037 }
2038 
2039 TAPTEST(NdbPack)
2040 {
2041  int ret = testmain();
2042  return (ret == 0);
2043 }
2044 
2045 #endif