MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
NdbReceiver.cpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include "API.hpp"
19 #include <AttributeHeader.hpp>
20 #include <signaldata/TcKeyConf.hpp>
21 #include <signaldata/DictTabInfo.hpp>
22 
23 NdbReceiver::NdbReceiver(Ndb *aNdb) :
24  theMagicNumber(0),
25  m_ndb(aNdb),
26  m_id(NdbObjectIdMap::InvalidId),
27  m_tcPtrI(RNIL),
28  m_type(NDB_UNINITIALIZED),
29  m_owner(0),
30  m_using_ndb_record(false),
31  theFirstRecAttr(NULL),
32  theCurrentRecAttr(NULL),
33  m_rows(NULL),
34  m_current_row(0xffffffff),
35  m_result_rows(0)
36 {}
37 
38 NdbReceiver::~NdbReceiver()
39 {
40  DBUG_ENTER("NdbReceiver::~NdbReceiver");
41  if (m_id != NdbObjectIdMap::InvalidId) {
42  m_ndb->theImpl->theNdbObjectIdMap.unmap(m_id, this);
43  }
44  delete[] m_rows;
45  DBUG_VOID_RETURN;
46 }
47 
48 int
49 NdbReceiver::init(ReceiverType type, bool useRec, void* owner)
50 {
51  theMagicNumber = 0x11223344;
52  m_type = type;
53  m_using_ndb_record= useRec;
54  m_owner = owner;
55 
56  if (useRec)
57  {
58  m_record.m_ndb_record= NULL;
59  m_record.m_row_recv= NULL;
60  m_record.m_row_buffer= NULL;
61  m_record.m_row_offset= 0;
62  m_record.m_read_range_no= false;
63  }
64  theFirstRecAttr = NULL;
65  theCurrentRecAttr = NULL;
66 
67  if (m_id == NdbObjectIdMap::InvalidId) {
68  if (m_ndb)
69  {
70  m_id = m_ndb->theImpl->theNdbObjectIdMap.map(this);
71  if (m_id == NdbObjectIdMap::InvalidId)
72  {
73  setErrorCode(4000);
74  return -1;
75  }
76  }
77  }
78 
79  return 0;
80 }
81 
82 void
84  theMagicNumber = 0;
85  NdbRecAttr* tRecAttr = theFirstRecAttr;
86  while (tRecAttr != NULL)
87  {
88  NdbRecAttr* tSaveRecAttr = tRecAttr;
89  tRecAttr = tRecAttr->next();
90  m_ndb->releaseRecAttr(tSaveRecAttr);
91  }
92  m_using_ndb_record= false;
93  theFirstRecAttr = NULL;
94  theCurrentRecAttr = NULL;
95 }
96 
97 NdbRecAttr *
98 NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){
99  NdbRecAttr* tRecAttr = m_ndb->getRecAttr();
100  if(tRecAttr && !tRecAttr->setup(tAttrInfo, user_dst_ptr)){
101  if (theFirstRecAttr == NULL)
102  theFirstRecAttr = tRecAttr;
103  else
104  theCurrentRecAttr->next(tRecAttr);
105  theCurrentRecAttr = tRecAttr;
106  tRecAttr->next(NULL);
107  return tRecAttr;
108  }
109  if(tRecAttr){
110  m_ndb->releaseRecAttr(tRecAttr);
111  }
112  return 0;
113 }
114 
115 void
116 NdbReceiver::getValues(const NdbRecord* rec, char *row_ptr)
117 {
118  assert(m_using_ndb_record);
119 
120  m_record.m_ndb_record= rec;
121  m_record.m_row_recv= row_ptr;
122  m_record.m_row_offset= rec->m_row_size;
123 }
124 
125 void
126 NdbReceiver::prepareReceive(char *buf)
127 {
128  /* Set pointers etc. to prepare for receiving the first row of the batch. */
129  assert(theMagicNumber == 0x11223344);
130  m_received_result_length = 0;
131  m_expected_result_length = 0;
132  if (m_using_ndb_record)
133  {
134  m_record.m_row_recv= buf;
135  }
136  theCurrentRecAttr = theFirstRecAttr;
137 }
138 
139 void
140 NdbReceiver::prepareRead(char *buf, Uint32 rows)
141 {
142  /* Set pointers etc. to prepare for reading the first row of the batch. */
143  assert(theMagicNumber == 0x11223344);
144  m_current_row = 0;
145  m_result_rows = rows;
146  if (m_using_ndb_record)
147  {
148  m_record.m_row_buffer = buf;
149  }
150 }
151 
152  #define KEY_ATTR_ID (~(Uint32)0)
153 
154 /*
155  Compute the batch size (rows between each NEXT_TABREQ / SCAN_TABCONF) to
156  use, taking into account limits in the transporter, user preference, etc.
157 
158  Hm, there are some magic overhead numbers (4 bytes/attr, 32 bytes/row) here,
159  would be nice with some explanation on how these numbers were derived.
160 
161  TODO : Check whether these numbers need to be revised w.r.t. read packed
162 */
163 //static
164 void
165 NdbReceiver::calculate_batch_size(const NdbImpl& theImpl,
166  const NdbRecord *record,
167  const NdbRecAttr *first_rec_attr,
168  Uint32 key_size,
169  Uint32 parallelism,
170  Uint32& batch_size,
171  Uint32& batch_byte_size,
172  Uint32& first_batch_size)
173 {
174  const NdbApiConfig & cfg = theImpl.get_ndbapi_config_parameters();
175  const Uint32 max_scan_batch_size= cfg.m_scan_batch_size;
176  const Uint32 max_batch_byte_size= cfg.m_batch_byte_size;
177  const Uint32 max_batch_size= cfg.m_batch_size;
178 
179  Uint32 tot_size= (key_size ? (key_size + 32) : 0); //key + signal overhead
180  if (record)
181  {
182  tot_size+= record->m_max_transid_ai_bytes;
183  }
184 
185  const NdbRecAttr *rec_attr= first_rec_attr;
186  while (rec_attr != NULL) {
187  Uint32 attr_size= rec_attr->getColumn()->getSizeInBytes();
188  attr_size= ((attr_size + 4 + 3) >> 2) << 2; //Even to word + overhead
189  tot_size+= attr_size;
190  rec_attr= rec_attr->next();
191  }
192 
193  tot_size+= 32; //include signal overhead
194 
201  if (batch_size == 0)
202  {
203  batch_byte_size= max_batch_byte_size;
204  }
205  else
206  {
207  batch_byte_size= batch_size * tot_size;
208  }
209 
210  if (batch_byte_size * parallelism > max_scan_batch_size) {
211  batch_byte_size= max_scan_batch_size / parallelism;
212  }
213  batch_size= batch_byte_size / tot_size;
214  if (batch_size == 0) {
215  batch_size= 1;
216  } else {
217  if (batch_size > max_batch_size) {
218  batch_size= max_batch_size;
219  } else if (batch_size > MAX_PARALLEL_OP_PER_SCAN) {
220  batch_size= MAX_PARALLEL_OP_PER_SCAN;
221  }
222  }
223  first_batch_size= batch_size;
224  return;
225 }
226 
227 void
228 NdbReceiver::calculate_batch_size(Uint32 key_size,
229  Uint32 parallelism,
230  Uint32& batch_size,
231  Uint32& batch_byte_size,
232  Uint32& first_batch_size,
233  const NdbRecord *record) const
234 {
235  calculate_batch_size(* m_ndb->theImpl,
236  record,
237  theFirstRecAttr,
238  key_size, parallelism, batch_size, batch_byte_size,
239  first_batch_size);
240 }
241 
242 void
243 NdbReceiver::do_setup_ndbrecord(const NdbRecord *ndb_record, Uint32 batch_size,
244  Uint32 key_size, Uint32 read_range_no,
245  Uint32 rowsize, char *row_buffer)
246 {
247  m_using_ndb_record= true;
248  m_record.m_ndb_record= ndb_record;
249  m_record.m_row_recv= row_buffer;
250  m_record.m_row_buffer= row_buffer;
251  m_record.m_row_offset= rowsize;
252  m_record.m_read_range_no= read_range_no;
253 }
254 
255 //static
256 Uint32
257 NdbReceiver::ndbrecord_rowsize(const NdbRecord *ndb_record,
258  const NdbRecAttr *first_rec_attr,
259  Uint32 key_size,
260  bool read_range_no)
261 {
262  Uint32 rowsize= (ndb_record) ? ndb_record->m_row_size : 0;
263 
264  /* Room for range_no. */
265  if (read_range_no)
266  rowsize+= 4;
267  /*
268  If keyinfo, need room for max. key + 4 bytes of actual key length + 4
269  bytes of scan info (all from KEYINFO20 signal).
270  */
271  if (key_size)
272  rowsize+= 8 + key_size*4;
273  /*
274  Compute extra space needed to buffer getValue() results in NdbRecord
275  scans.
276  */
277  const NdbRecAttr *ra= first_rec_attr;
278  while (ra != NULL)
279  {
280  rowsize+= sizeof(Uint32) + ra->getColumn()->getSizeInBytes();
281  ra= ra->next();
282  }
283  /* Ensure 4-byte alignment. */
284  rowsize= (rowsize+3) & 0xfffffffc;
285  return rowsize;
286 }
287 
303 static
304 inline
305 const Uint8*
306 pad(const Uint8* src, Uint32 align, Uint32 bitPos)
307 {
308  UintPtr ptr = UintPtr(src);
309  switch(align){
310  case DictTabInfo::aBit:
311  case DictTabInfo::a32Bit:
312  case DictTabInfo::a64Bit:
313  case DictTabInfo::a128Bit:
314  return (Uint8*)(((ptr + 3) & ~(UintPtr)3) + 4 * ((bitPos + 31) >> 5));
315 charpad:
316  case DictTabInfo::an8Bit:
317  case DictTabInfo::a16Bit:
318  return src + 4 * ((bitPos + 31) >> 5);
319  default:
320 #ifdef VM_TRACE
321  abort();
322 #endif
323  goto charpad;
324  }
325 }
326 
332 static
333 void
334 handle_packed_bit(const char* _src, Uint32 pos, Uint32 len, char* _dst)
335 {
336  Uint32 * src = (Uint32*)_src;
337  assert((UintPtr(src) & 3) == 0);
338 
339  /* Convert char* to aligned Uint32* and some byte offset */
340  UintPtr uiPtr= UintPtr((Uint32*)_dst);
341  Uint32 dstByteOffset= Uint32(uiPtr) & 3;
342  Uint32* dst= (Uint32*) (uiPtr - dstByteOffset);
343 
344  BitmaskImpl::copyField(dst, dstByteOffset << 3,
345  src, pos, len);
346 }
347 
348 
355 Uint32
356 NdbReceiver::receive_packed_recattr(NdbRecAttr** recAttr,
357  Uint32 bmlen,
358  const Uint32* aDataPtr,
359  Uint32 aLength)
360 {
361  NdbRecAttr* currRecAttr = *recAttr;
362  const Uint8 *src = (Uint8*)(aDataPtr + bmlen);
363  Uint32 bitPos = 0;
364  for (Uint32 i = 0, attrId = 0; i<32*bmlen; i++, attrId++)
365  {
366  if (BitmaskImpl::get(bmlen, aDataPtr, i))
367  {
368  const NdbColumnImpl & col =
369  NdbColumnImpl::getImpl(* currRecAttr->getColumn());
370  if (unlikely(attrId != (Uint32)col.m_attrId))
371  goto err;
372  if (col.m_nullable)
373  {
374  if (BitmaskImpl::get(bmlen, aDataPtr, ++i))
375  {
376  currRecAttr->setNULL();
377  currRecAttr = currRecAttr->next();
378  continue;
379  }
380  }
381  Uint32 align = col.m_orgAttrSize;
382  Uint32 attrSize = col.m_attrSize;
383  Uint32 array = col.m_arraySize;
384  Uint32 len = col.m_length;
385  Uint32 sz = attrSize * array;
386  Uint32 arrayType = col.m_arrayType;
387 
388  switch(align){
389  case DictTabInfo::aBit: // Bit
390  src = pad(src, 0, 0);
391  handle_packed_bit((const char*)src, bitPos, len,
392  currRecAttr->aRef());
393  src += 4 * ((bitPos + len) >> 5);
394  bitPos = (bitPos + len) & 31;
395  goto next;
396  default:
397  src = pad(src, align, bitPos);
398  }
399  switch(arrayType){
400  case NDB_ARRAYTYPE_FIXED:
401  break;
402  case NDB_ARRAYTYPE_SHORT_VAR:
403  sz = 1 + src[0];
404  break;
405  case NDB_ARRAYTYPE_MEDIUM_VAR:
406  sz = 2 + src[0] + 256 * src[1];
407  break;
408  default:
409  goto err;
410  }
411 
412  bitPos = 0;
413  currRecAttr->receive_data((Uint32*)src, sz);
414  src += sz;
415  next:
416  currRecAttr = currRecAttr->next();
417  }
418  }
419  * recAttr = currRecAttr;
420  return (Uint32)(((Uint32*)pad(src, 0, bitPos)) - aDataPtr);
421 
422 err:
423  abort();
424  return 0;
425 }
426 
427 
428 /* Set NdbRecord field to non-NULL value. */
429 static void assignToRec(const NdbRecord::Attr *col,
430  char *row,
431  const Uint8 *src,
432  Uint32 byteSize)
433 {
434  /* Set NULLable attribute to "not NULL". */
435  if (col->flags & NdbRecord::IsNullable)
436  row[col->nullbit_byte_offset]&= ~(1 << col->nullbit_bit_in_byte);
437 
438  memcpy(&row[col->offset], src, byteSize);
439 }
440 
441 /* Set NdbRecord field to NULL. */
442 static void setRecToNULL(const NdbRecord::Attr *col,
443  char *row)
444 {
445  assert(col->flags & NdbRecord::IsNullable);
446  row[col->nullbit_byte_offset]|= 1 << col->nullbit_bit_in_byte;
447 }
448 
449 int
450 NdbReceiver::get_range_no() const
451 {
452  int range_no;
453  assert(m_using_ndb_record);
454  Uint32 idx= m_current_row;
455  if (idx == 0 || !m_record.m_read_range_no)
456  return -1;
457  memcpy(&range_no,
458  m_record.m_row_buffer +
459  (idx-1)*m_record.m_row_offset +
460  m_record.m_ndb_record->m_row_size,
461  4);
462  return range_no;
463 }
464 
470 static void
471 handle_bitfield_ndbrecord(const NdbRecord::Attr* col,
472  const Uint8*& src,
473  Uint32& bitPos,
474  Uint32& len,
475  char* row)
476 {
477  if (col->flags & NdbRecord::IsNullable)
478  {
479  /* Clear nullbit in row */
480  row[col->nullbit_byte_offset] &=
481  ~(1 << col->nullbit_bit_in_byte);
482  }
483 
484  char* dest;
485  Uint64 mysqldSpace;
486 
487  /* For MySqldBitField, we read it as normal into a local on the
488  * stack and then use the put_mysqld_bitfield function to rearrange
489  * and write it to the row
490  */
491  bool isMDBitfield= (col->flags & NdbRecord::IsMysqldBitfield) != 0;
492 
493  if (isMDBitfield)
494  {
495  assert(len <= 64);
496  dest= (char*) &mysqldSpace;
497  }
498  else
499  dest= row + col->offset;
500 
501  /* Copy bitfield to memory starting at dest */
502  src = pad(src, 0, 0);
503  handle_packed_bit((const char*)src, bitPos, len, dest);
504  src += 4 * ((bitPos + len) >> 5);
505  bitPos = (bitPos + len) & 31;
506 
507  if (isMDBitfield)
508  /* Rearrange bitfield from stack to row storage */
509  col->put_mysqld_bitfield(row, dest);
510 }
511 
512 
519 Uint32
520 NdbReceiver::receive_packed_ndbrecord(Uint32 bmlen,
521  const Uint32* aDataPtr,
522  char* row)
523 {
524  const Uint8 *src = (Uint8*)(aDataPtr + bmlen);
525  Uint32 bitPos = 0;
526  const NdbRecord* rec= m_record.m_ndb_record;
527  const Uint32 maxAttrId= rec->columns[rec->noOfColumns -1].attrId;
528  const Uint32 bmSize= bmlen << 5;
529 
530  /* Use bitmap to determine which columns have been sent */
531  for (Uint32 i = 0, attrId = 0;
532  (i < bmSize) && (attrId <= maxAttrId);
533  i++, attrId++)
534  {
535  if (BitmaskImpl::get(bmlen, aDataPtr, i))
536  {
537  /* Found bit in column presence bitmask, get corresponding
538  * Attr struct from NdbRecord
539  */
540  assert(attrId < rec->m_attrId_indexes_length);
541  assert((Uint32) rec->m_attrId_indexes[attrId]
542  < rec->noOfColumns);
543  const NdbRecord::Attr* col= &rec->columns[rec->m_attrId_indexes[attrId]];
544 
545  assert((col->flags & NdbRecord::IsBlob) == 0);
546 
547  /* If col is nullable, check for null and
548  * set bit
549  */
550  if (col->flags & NdbRecord::IsNullable)
551  {
552  if (BitmaskImpl::get(bmlen, aDataPtr, ++i))
553  {
554  setRecToNULL(col, m_record.m_row_recv);
555 
556  // Next column...
557  continue;
558  }
559  }
560 
561  Uint32 align = col->orgAttrSize;
562  Uint32 sz = col->maxSize;
563  Uint32 len = col->bitCount;
564  Uint32 arrayType =
565  (col->flags & NdbRecord::IsVar1ByteLen)?
566  NDB_ARRAYTYPE_SHORT_VAR :
567  (
568  (col->flags & NdbRecord::IsVar2ByteLen)?
569  NDB_ARRAYTYPE_MEDIUM_VAR :
570  NDB_ARRAYTYPE_FIXED);
571 
572  switch(align){
573  case DictTabInfo::aBit: // Bit
574  handle_bitfield_ndbrecord(col,
575  src,
576  bitPos,
577  len,
578  row);
579  continue; // Next column
580  default:
581  src = pad(src, align, bitPos);
582  }
583  switch(arrayType){
584  case NDB_ARRAYTYPE_FIXED:
585  break;
586  case NDB_ARRAYTYPE_SHORT_VAR:
587  sz = 1 + src[0];
588  break;
589  case NDB_ARRAYTYPE_MEDIUM_VAR:
590  sz = 2 + src[0] + 256 * src[1];
591  break;
592  default:
593  abort();
594  }
595 
596  bitPos = 0;
597  assignToRec(col,
598  row,
599  src,
600  sz);
601 
602  src += sz;
603  }
604  }
605 
606  return (Uint32)(((Uint32*)pad(src, 0, bitPos)) - aDataPtr);
607 }
608 
609 
610 int
611 NdbReceiver::get_keyinfo20(Uint32 & scaninfo, Uint32 & length,
612  const char * & data_ptr) const
613 {
614  assert(m_using_ndb_record);
615  Uint32 idx= m_current_row;
616  if (idx == 0)
617  return -1; // No rows fetched yet
618  const char *p= m_record.m_row_buffer +
619  (idx-1)*m_record.m_row_offset +
620  m_record.m_ndb_record->m_row_size;
621  if (m_record.m_read_range_no)
622  p+= 4;
623  scaninfo= uint4korr(p);
624  p+= 4;
625  length= uint4korr(p);
626  p+= 4;
627  data_ptr= p;
628  return 0;
629 }
630 
631 
632 int
633 NdbReceiver::getScanAttrData(const char * & data, Uint32 & size, Uint32 & pos) const
634 {
635  assert(m_using_ndb_record);
636  Uint32 idx= m_current_row;
637  if (idx == 0)
638  return -1; // No rows fetched yet
639  const char *row_end= m_record.m_row_buffer + idx*m_record.m_row_offset;
640 
641  pos+= sizeof(Uint32);
642  memcpy(&size, row_end - pos, sizeof(Uint32));
643  pos+= size;
644  data= row_end - pos;
645 
646  assert (pos <= m_record.m_row_offset);
647  return 0;
648 }
649 
650 int
651 NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength)
652 {
653  /*
654  * NdbRecord and NdbRecAttr row result handling are merged here
655  * First any NdbRecord attributes are extracted
656  * Then any NdbRecAttr attributes are extracted
657  * NdbRecord scans with extra NdbRecAttr getValue() attrs
658  * are handled separately in the NdbRecord code
659  * Scenarios :
660  * NdbRecord only PK read result
661  * NdbRecAttr only PK read result
662  * Mixed PK read results
663  * NdbRecord only scan read result
664  * NdbRecAttr only scan read result
665  * Mixed scan read results
666  */
667  Uint32 exp= m_expected_result_length;
668  Uint32 tmp= m_received_result_length + aLength;
669  Uint32 origLength=aLength;
670  NdbRecAttr* currRecAttr = theCurrentRecAttr;
671  Uint32 save_pos= 0;
672 
673  bool ndbrecord_part_done= !m_using_ndb_record;
674  const bool isScan= (m_type == NDB_SCANRECEIVER) ||
675  (m_type == NDB_QUERY_OPERATION);
676 
677  /* Read words from the incoming signal train.
678  * The length passed in is enough for one row, either as an individual
679  * read op, or part of a scan. When there are no more words, we're at
680  * the end of the row
681  */
682  while (aLength > 0)
683  {
684  AttributeHeader ah(* aDataPtr++);
685  const Uint32 attrId= ah.getAttributeId();
686  Uint32 attrSize= ah.getByteSize();
687  aLength--;
688 
689  if (!ndbrecord_part_done)
690  {
691  /* Special case for RANGE_NO, which is received first and is
692  * stored just after the row. */
693  if (attrId == AttributeHeader::RANGE_NO)
694  {
695  assert(m_record.m_read_range_no);
696  assert(attrSize==4);
697  assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size+attrSize);
698  memcpy(m_record.m_row_recv+m_record.m_ndb_record->m_row_size,
699  aDataPtr++, 4);
700  aLength--;
701  continue; // Next
702  }
703 
704  /* Normal case for all NdbRecord primary key, index key, table scan
705  * and index scan reads. Extract all requested columns from packed
706  * format into the row.
707  */
708  if (attrId == AttributeHeader::READ_PACKED)
709  {
710  assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size);
711  Uint32 len= receive_packed_ndbrecord(attrSize >> 2, // Bitmap length
712  aDataPtr,
713  m_record.m_row_recv);
714  aDataPtr+= len;
715  aLength-= len;
716  continue; // Next
717  }
718 
719  /* If we get here then we must have 'extra getValues' - columns
720  * requested outwith the normal NdbRecord + bitmask mechanism.
721  * This could be : pseudo columns, columns read via an old-Api
722  * scan, or just some extra columns added by the user to an
723  * NdbRecord operation.
724  * If the extra values are part of a scan then they get copied
725  * to a special area after the end of the normal row data.
726  * When the user calls NdbScanOperation.nextResult() they will
727  * be copied into the correct NdbRecAttr objects.
728  * If the extra values are not part of a scan then they are
729  * put into their NdbRecAttr objects now.
730  */
731  if (isScan)
732  {
733  /* For scans, we save the extra information at the end of the
734  * row buffer, in reverse order. When nextResult() is called,
735  * this data is copied into the correct NdbRecAttr objects.
736  */
737 
738  /* Save this extra getValue */
739  save_pos+= sizeof(Uint32);
740  memcpy(m_record.m_row_recv + m_record.m_row_offset - save_pos,
741  &attrSize, sizeof(Uint32));
742  if (attrSize > 0)
743  {
744  save_pos+= attrSize;
745  assert (save_pos<=m_record.m_row_offset);
746  memcpy(m_record.m_row_recv + m_record.m_row_offset - save_pos,
747  aDataPtr, attrSize);
748  }
749 
750  Uint32 sizeInWords= (attrSize+3)>>2;
751  aDataPtr+= sizeInWords;
752  aLength-= sizeInWords;
753  continue; // Next
754  }
755  else
756  {
757  /* Not a scan, so extra information is added to RecAttrs in
758  * the 'normal' way.
759  */
760  assert(theCurrentRecAttr != NULL);
761  assert(theCurrentRecAttr->attrId() == attrId);
762  /* Handle extra attributes requested with getValue(). */
763  /* This implies that we've finished with the NdbRecord part
764  of the read, so move onto NdbRecAttr */
765  ndbrecord_part_done=true;
766  // Fall through to RecAttr handling
767  }
768  } // / if (!ndbrecord_part_done)
769 
770  /* If we get here then there are some attribute values to be
771  * read into the attached list of NdbRecAttrs.
772  * This occurs for old-Api primary and unique index keyed operations
773  * and for NdbRecord primary and unique index keyed operations
774  * using 'extra GetValues'.
775  */
776  if (ndbrecord_part_done)
777  {
778  // We've processed the NdbRecord part of the TRANSID_AI, if
779  // any. There are signal words left, so they must be
780  // RecAttr data
781  //
782  if (attrId == AttributeHeader::READ_PACKED)
783  {
784  assert(!m_using_ndb_record);
785  NdbRecAttr* tmp = currRecAttr;
786  Uint32 len = receive_packed_recattr(&tmp, attrSize>>2, aDataPtr, origLength);
787  aDataPtr += len;
788  aLength -= len;
789  currRecAttr = tmp;
790  continue;
791  }
796  while(currRecAttr && currRecAttr->attrId() != attrId){
797  currRecAttr = currRecAttr->next();
798  }
799 
800  if(currRecAttr && currRecAttr->receive_data(aDataPtr, attrSize))
801  {
802  Uint32 add= (attrSize + 3) >> 2;
803  aLength -= add;
804  aDataPtr += add;
805  currRecAttr = currRecAttr->next();
806  } else {
807  /*
808  This should not happen: we got back an attribute for which we have no
809  stored NdbRecAttr recording that we requested said attribute (or we got
810  back attributes in the wrong order).
811  So dump some info for debugging, and abort.
812  */
813  ndbout_c("this=%p: attrId: %d currRecAttr: %p theCurrentRecAttr: %p "
814  "attrSize: %d %d", this,
815  attrId, currRecAttr, theCurrentRecAttr, attrSize,
816  currRecAttr ? currRecAttr->get_size_in_bytes() : 0);
817  currRecAttr = theCurrentRecAttr;
818  while(currRecAttr != 0){
819  ndbout_c("%d ", currRecAttr->attrId());
820  currRecAttr = currRecAttr->next();
821  }
822  abort();
823  return -1;
824  } // if (currRecAttr...)
825  } // /if (ndbrecord_part_done)
826  } // / while (aLength > 0)
827 
828  theCurrentRecAttr = currRecAttr;
829 
830  m_received_result_length = tmp;
831 
832  if (m_using_ndb_record) {
833  /* Move onto next row in scan buffer */
834  m_record.m_row_recv+= m_record.m_row_offset;
835  }
836  return (tmp == exp || (exp > TcKeyConf::DirtyReadBit) ? 1 : 0);
837 }
838 
839 int
840 NdbReceiver::execKEYINFO20(Uint32 info, const Uint32* aDataPtr, Uint32 aLength)
841 {
842  if (m_using_ndb_record)
843  {
844  /* Copy in the keyinfo after the user row and any range_no value. */
845 
846  char *keyinfo_ptr= m_record.m_row_buffer +
847  m_current_row++ * m_record.m_row_offset +
848  m_record.m_ndb_record->m_row_size;
849  if (m_record.m_read_range_no)
850  keyinfo_ptr+= 4;
851 
852  int4store(keyinfo_ptr, info);
853  keyinfo_ptr+= 4;
854  int4store(keyinfo_ptr, aLength);
855  keyinfo_ptr+= 4;
856  memcpy(keyinfo_ptr, aDataPtr, 4*aLength);
857 
858  Uint32 tmp= m_received_result_length + aLength;
859  m_received_result_length = tmp;
860 
861  return (tmp == m_expected_result_length ? 1 : 0);
862  }
863 
864  /* The old method, using NdbRecAttr. */
865  NdbRecAttr* currRecAttr = m_rows[m_current_row++];
866  assert(currRecAttr->attrId() == KEY_ATTR_ID);
867  /*
868  This is actually reading data one word off the end of the received
869  signal (or off the end of the long signal data section 0, for a
870  long signal), due to the aLength+1. This is to ensure the correct length
871  being set for the NdbRecAttr (one extra word for the scanInfo word placed
872  at the end), overwritten immediately below.
873  But it's a bit ugly that we rely on being able to read one word over the
874  end of the signal without crashing...
875  */
876  currRecAttr->receive_data(aDataPtr, 4*(aLength + 1));
877 
881  ((Uint32*)currRecAttr->aRef())[aLength] = info;
882 
883  Uint32 tmp = m_received_result_length + aLength;
884  m_received_result_length = tmp;
885 
886  return (tmp == m_expected_result_length ? 1 : 0);
887 }
888 
889 void
891 {
892  theMagicNumber = 0;
893  if (getType()==NDB_QUERY_OPERATION)
894  {
895  NdbQueryOperationImpl* op = (NdbQueryOperationImpl*)getOwner();
896  op->getQuery().setErrorCode(code);
897  }
898  else
899  {
900  NdbOperation* const op = (NdbOperation*)getOwner();
901  assert(op->checkMagicNumber()==0);
902  op->setErrorCode(code);
903  }
904 }