MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
NdbScanOperation.cpp
1 /*
2  Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include "API.hpp"
19 
20 #include <NdbSqlUtil.hpp>
21 #include <AttributeHeader.hpp>
22 
23 #include <signaldata/ScanTab.hpp>
24 #include <signaldata/KeyInfo.hpp>
25 #include <signaldata/AttrInfo.hpp>
26 #include <signaldata/TcKeyReq.hpp>
27 
28 #define DEBUG_NEXT_RESULT 0
29 
30 NdbScanOperation::NdbScanOperation(Ndb* aNdb, NdbOperation::Type aType) :
31  NdbOperation(aNdb, aType),
32  m_transConnection(NULL)
33 {
34  theParallelism = 0;
35  m_allocated_receivers = 0;
36  m_prepared_receivers = 0;
37  m_api_receivers = 0;
38  m_conf_receivers = 0;
39  m_sent_receivers = 0;
40  m_receivers = 0;
41  m_array = new Uint32[1]; // skip if on delete in fix_receivers
42  theSCAN_TABREQ = 0;
43  m_executed = false;
44  m_scan_buffer= NULL;
45  m_scanUsingOldApi= true;
46  m_readTuplesCalled= false;
47  m_interpretedCodeOldApi= NULL;
48 }
49 
50 NdbScanOperation::~NdbScanOperation()
51 {
52  for(Uint32 i = 0; i<m_allocated_receivers; i++){
53  m_receivers[i]->release();
54  theNdb->releaseNdbScanRec(m_receivers[i]);
55  }
56  delete[] m_array;
57 }
58 
59 void
60 NdbScanOperation::setErrorCode(int aErrorCode) const
61 {
62  NdbScanOperation *pnonConstThis=const_cast<NdbScanOperation *>(this);
63 
64  NdbTransaction* tmp = theNdbCon;
65  pnonConstThis->theNdbCon = m_transConnection;
66  NdbOperation::setErrorCode(aErrorCode);
67  pnonConstThis->theNdbCon = tmp;
68 }
69 
70 void
71 NdbScanOperation::setErrorCodeAbort(int aErrorCode) const
72 {
73  NdbScanOperation *pnonConstThis=const_cast<NdbScanOperation *>(this);
74 
75  NdbTransaction* tmp = theNdbCon;
76  pnonConstThis->theNdbCon = m_transConnection;
77  NdbOperation::setErrorCodeAbort(aErrorCode);
78  pnonConstThis->theNdbCon = tmp;
79 }
80 
81 /*****************************************************************************
82  * int init();
83  *
84  * Return Value: Return 0 : init was successful.
85  * Return -1: In all other case.
86  * Remark: Initiates operation record after allocation.
87  *****************************************************************************/
88 int
89 NdbScanOperation::init(const NdbTableImpl* tab, NdbTransaction* myConnection)
90 {
91  m_transConnection = myConnection;
92 
93  if (NdbOperation::init(tab, myConnection, false) != 0)
94  return -1;
95 
96  theNdb->theRemainingStartTransactions++; // will be checked in hupp...
97  NdbTransaction* aScanConnection = theNdb->hupp(myConnection);
98  if (!aScanConnection){
99  theNdb->theRemainingStartTransactions--;
100  setErrorCodeAbort(theNdb->getNdbError().code);
101  return -1;
102  }
103 
104  // NOTE! The hupped trans becomes the owner of the operation
105  theNdbCon= aScanConnection;
106 
107  initInterpreter();
108 
109  theStatus = GetValue;
110  theOperationType = OpenScanRequest;
111  theNdbCon->theMagicNumber = 0xFE11DF;
112  theNoOfTupKeyLeft = tab->m_noOfDistributionKeys;
113  m_ordered= false;
114  m_descending= false;
115  m_read_range_no = 0;
116  m_executed = false;
117  m_scanUsingOldApi= true;
118  m_readTuplesCalled= false;
119  m_interpretedCodeOldApi= NULL;
120  m_pruneState= SPS_UNKNOWN;
121 
122  m_api_receivers_count = 0;
123  m_current_api_receiver = 0;
124  m_sent_receivers_count = 0;
125  m_conf_receivers_count = 0;
126  return 0;
127 }
128 
129 int
130 NdbScanOperation::handleScanGetValuesOldApi()
131 {
132  /* Handle old API-defined scan getValue(s) */
133  assert(m_scanUsingOldApi);
134 
135  if (theReceiver.theFirstRecAttr != NULL)
136  {
137  /* theReceiver has a list of RecAttrs which the user
138  * wants to read. Traverse it, adding signals to the
139  * request to read them, *similar* to extra GetValue
140  * handling, except that we want to use the RecAttrs we've
141  * already got.
142  * Once these are added to the signal train, all other handling
143  * is exactly the same as for normal NdbRecord 'extra GetValues'
144  */
145  const NdbRecAttr* recAttrToRead = theReceiver.theFirstRecAttr;
146 
147  while(recAttrToRead != NULL)
148  {
149  int res;
150  res= insertATTRINFOHdr_NdbRecord(recAttrToRead->theAttrId, 0);
151  if (unlikely(res == -1))
152  return -1;
153  recAttrToRead= recAttrToRead->next();
154  }
155 
156  theInitialReadSize= theTotalCurrAI_Len - AttrInfo::SectionSizeInfoLength;
157 
158  }
159 
160  return 0;
161 }
162 
163 /* Method for adding interpreted code signals to a
164  * scan operation request.
165  * Both main program words and subroutine words can
166  * be added in one method as scans do not use
167  * the final update or final read sections.
168  */
169 int
170 NdbScanOperation::addInterpretedCode()
171 {
172  Uint32 mainProgramWords= 0;
173  Uint32 subroutineWords= 0;
174  const NdbInterpretedCode* code= m_interpreted_code;
175 
176  /* Any disk access? */
177  if (code->m_flags & NdbInterpretedCode::UsesDisk)
178  {
179  m_flags &= ~Uint8(OF_NO_DISK);
180  }
181 
182 
183  /* Main program size depends on whether there's subroutines */
184  mainProgramWords= code->m_first_sub_instruction_pos ?
185  code->m_first_sub_instruction_pos :
186  code->m_instructions_length;
187 
188  int res = insertATTRINFOData_NdbRecord((const char*)code->m_buffer,
189  mainProgramWords << 2);
190  if (res == 0)
191  {
192  /* Add subroutines, if we have any */
193  if (code->m_number_of_subs > 0)
194  {
195  assert(mainProgramWords > 0);
196  assert(code->m_first_sub_instruction_pos > 0);
197 
198  Uint32 *subroutineStart=
199  &code->m_buffer[ code->m_first_sub_instruction_pos ];
200  subroutineWords=
201  code->m_instructions_length -
202  code->m_first_sub_instruction_pos;
203 
204  res = insertATTRINFOData_NdbRecord((const char*) subroutineStart,
205  subroutineWords << 2);
206  }
207 
208  /* Update signal section lengths */
209  theInterpretedSize= mainProgramWords;
210  theSubroutineSize= subroutineWords;
211  }
212 
213  return res;
214 }
215 
216 /* Method for handling scanoptions passed into
217  * NdbTransaction::scanTable or scanIndex
218  */
219 int
220 NdbScanOperation::handleScanOptions(const ScanOptions *options)
221 {
222  /* Options size has already been checked.
223  * scan_flags, parallel and batch have been handled
224  * already (see NdbTransaction::scanTable and scanIndex)
225  */
226  if ((options->optionsPresent & ScanOptions::SO_GETVALUE) &&
227  (options->numExtraGetValues > 0))
228  {
229  if (options->extraGetValues == NULL)
230  {
231  setErrorCodeAbort(4299);
232  /* Incorrect combination of ScanOption flags,
233  * extraGetValues ptr and numExtraGetValues */
234  return -1;
235  }
236 
237  /* Add extra getValue()s */
238  for (unsigned int i=0; i < options->numExtraGetValues; i++)
239  {
240  NdbOperation::GetValueSpec *pvalSpec = &(options->extraGetValues[i]);
241 
242  pvalSpec->recAttr=NULL;
243 
244  if (pvalSpec->column == NULL)
245  {
246  setErrorCodeAbort(4295);
247  // Column is NULL in Get/SetValueSpec structure
248  return -1;
249  }
250 
251  /* Call internal NdbRecord specific getValue() method
252  * Same method handles table scans and index scans
253  */
254  NdbRecAttr *pra=
255  getValue_NdbRecord_scan(&NdbColumnImpl::getImpl(*pvalSpec->column),
256  (char *) pvalSpec->appStorage);
257 
258  if (pra == NULL)
259  {
260  return -1;
261  }
262 
263  pvalSpec->recAttr = pra;
264  }
265  }
266 
267  if (options->optionsPresent & ScanOptions::SO_PARTITION_ID)
268  {
269  /* Should not have any blobs defined at this stage */
270  assert(theBlobList == NULL);
271  assert(m_pruneState == SPS_UNKNOWN);
272 
273  /* Only allowed to set partition id for PK ops on UserDefined
274  * partitioned tables
275  */
276  if(unlikely(! (m_attribute_record->flags &
277  NdbRecord::RecHasUserDefinedPartitioning)))
278  {
279  /* Explicit partitioning info not allowed for table and operation*/
280  setErrorCodeAbort(4546);
281  return -1;
282  }
283 
284  m_pruneState= SPS_FIXED;
285  m_pruningKey= options->partitionId;
286 
287  /* And set the vars in the operation now too */
288  theDistributionKey = options->partitionId;
289  theDistrKeyIndicator_ = 1;
290  assert((m_attribute_record->flags & NdbRecord::RecHasUserDefinedPartitioning) != 0);
291  DBUG_PRINT("info", ("NdbScanOperation::handleScanOptions(dist key): %u",
292  theDistributionKey));
293  }
294 
295  if (options->optionsPresent & ScanOptions::SO_INTERPRETED)
296  {
297  /* Check the program's for the same table as the
298  * operation, within a major version number
299  * Perhaps NdbInterpretedCode should not contain the table
300  */
301  const NdbDictionary::Table* codeTable=
302  options->interpretedCode->getTable();
303  if (codeTable != NULL)
304  {
305  NdbTableImpl* impl= &NdbTableImpl::getImpl(*codeTable);
306 
307  if ((impl->m_id != (int) m_attribute_record->tableId) ||
308  (table_version_major(impl->m_version) !=
309  table_version_major(m_attribute_record->tableVersion)))
310  return 4524; // NdbInterpretedCode is for different table`
311  }
312 
313  if ((options->interpretedCode->m_flags &
314  NdbInterpretedCode::Finalised) == 0)
315  {
316  setErrorCodeAbort(4519);
317  return -1; // NdbInterpretedCode::finalise() not called.
318  }
319  m_interpreted_code= options->interpretedCode;
320  }
321 
322  /* User's operation 'tag' data. */
323  if (options->optionsPresent & ScanOptions::SO_CUSTOMDATA)
324  {
325  m_customData = options->customData;
326  }
327 
328  /* Preferred form of partitioning information */
329  if (options->optionsPresent & ScanOptions::SO_PART_INFO)
330  {
331  Uint32 partValue;
332  Ndb::PartitionSpec tmpSpec;
333  const Ndb::PartitionSpec* pSpec= options->partitionInfo;
334  if (unlikely(validatePartInfoPtr(pSpec,
335  options->sizeOfPartInfo,
336  tmpSpec) ||
337  getPartValueFromInfo(pSpec,
338  m_currentTable,
339  &partValue)))
340  return -1;
341 
342  assert(m_pruneState == SPS_UNKNOWN);
343  m_pruneState= SPS_FIXED;
344  m_pruningKey= partValue;
345 
346  theDistributionKey= partValue;
347  theDistrKeyIndicator_= 1;
348  DBUG_PRINT("info", ("Set distribution key from partition spec to %u",
349  partValue));
350  }
351 
352  return 0;
353 }
354 
/* Build the ATTRINFO words that request the columns of result_record
 * selected by m_read_mask.
 *
 * NOTE(review): this listing skips its own lines 362 and 366, so the line
 * carrying the function name and first parameter, and the declaration of
 * the local `readMask` used below, are not visible here. Presumably this is
 * NdbScanOperation::generatePackedReadAIs(result_record, haveBlob,
 * m_read_mask), matching the call made with (m_attribute_record, haveBlob,
 * readMask) elsewhere in this file - confirm against the upstream source.
 *
 * haveBlob is set to true if any selected column is a blob (such columns
 * are not added to the read; m_keyInfo is set instead).
 * Returns 0 when no column needs reading, otherwise the result of the
 * insertATTRINFO* calls.
 */
361 int
363 bool& haveBlob,
364 const Uint32 * m_read_mask)
365 {
367 Uint32 columnCount= 0;
368 Uint32 maxAttrId= 0;
369 
370 haveBlob= false;
371 
/* Pass 1: collect the selected, non-blob columns into readMask */
372 for (Uint32 i= 0; i<result_record->noOfColumns; i++)
373 {
374 const NdbRecord::Attr *col= &result_record->columns[i];
375 Uint32 attrId= col->attrId;
376 
377 assert(!(attrId & AttributeHeader::PSEUDO));
378 
379 /* Skip column if result_mask says so and we don't need
380 * to read it
381 */
382 if (!BitmaskImpl::get(MAXNROFATTRIBUTESINWORDS, m_read_mask, attrId))
383 continue;
384 
385 /* Blob reads are handled with a getValue() in NdbBlob.cpp. */
386 if (unlikely(col->flags & NdbRecord::IsBlob))
387 {
388 m_keyInfo= 1; // Need keyinfo for blob scan
389 haveBlob= true;
390 continue;
391 }
392 
/* Reading a disk-stored column clears the no-disk flag */
393 if (col->flags & NdbRecord::IsDisk)
394 m_flags &= ~Uint8(OF_NO_DISK);
395 
/* Track the highest attribute id; it bounds the bitmask length below */
396 if (attrId > maxAttrId)
397 maxAttrId= attrId;
398 
399 readMask.set(attrId);
400 columnCount++;
401 }
402 
403 int result= 0;
404 
405 /* Are there any columns to read via NdbRecord?
406 * Old Api scans, and new Api scans which only read via extra getvalues
407 * may have no 'NdbRecord reads'
408 */
409 if (columnCount > 0)
410 {
/* "all" means as many columns were selected as m_currentTable has */
411 bool all= (columnCount == m_currentTable->m_columns.size());
412 
413 if (all)
414 result= insertATTRINFOHdr_NdbRecord(AttributeHeader::READ_ALL,
415 columnCount);
416 else
417 {
418 /* How many bitmask words are significant? */
419 Uint32 sigBitmaskWords= (maxAttrId>>5) + 1;
420 
/* READ_PACKED header carries the bitmask length in bytes (words << 2),
 * followed by the bitmask words themselves */
421 result= insertATTRINFOHdr_NdbRecord(AttributeHeader::READ_PACKED,
422 sigBitmaskWords << 2);
423 if (result != -1)
424 result= insertATTRINFOData_NdbRecord((const char*) &readMask.rep.data[0],
425 sigBitmaskWords << 2); // Bitmask
426 }
427 }
428 
429 return result;
430 }
431 
/* Common tail of scan definition: adds the packed-read ATTRINFO, old-API
 * getValue()s, scan options, blob handles and interpreted code, then calls
 * prepareSendScan().
 *
 * NOTE(review): this listing skips its own line 439, so the line carrying
 * the function name and first parameter is not visible here. Presumably
 * this is NdbScanOperation::scanImpl(options, readMask), matching the call
 * scanImpl(options, readMask.rep.data) elsewhere in this file - confirm
 * against the upstream source.
 *
 * Returns 0 on success, -1 as soon as any step fails.
 */
438 inline int
440 const Uint32 * readMask)
441 {
442 bool haveBlob= false;
443 
444 /* Add AttrInfos for packed read of cols in result_record */
445 if (generatePackedReadAIs(m_attribute_record, haveBlob, readMask) != 0)
446 return -1;
447 
448 theInitialReadSize= theTotalCurrAI_Len - AttrInfo::SectionSizeInfoLength;
449 
450 /* Handle any getValue() calls made against the old API. */
451 if (m_scanUsingOldApi)
452 {
453 if (handleScanGetValuesOldApi() !=0)
454 return -1;
455 }
456 
457 /* Handle scan options - always for old style scan API */
458 if (options != NULL)
459 {
460 if (handleScanOptions(options) != 0)
461 return -1;
462 }
463 
464 /* Get Blob handles unless this is an old Api scan op
465 * For old Api Scan ops, the Blob handles are already
466 * set up by the call to getBlobHandle()
467 */
468 if (unlikely(haveBlob) && !m_scanUsingOldApi)
469 {
/* Blob handles are requested against m_transConnection, the transaction
 * stored by init(), not against theNdbCon */
470 if (getBlobHandlesNdbRecord(m_transConnection, readMask) == -1)
471 return -1;
472 }
473 
474 /* Add interpreted code words to ATTRINFO signal
475 * chain as necessary
476 */
477 if (m_interpreted_code != NULL)
478 {
479 if (addInterpretedCode() == -1)
480 return -1;
481 }
482 
483 /* Scan is now fully defined, so let's start preparing
484 * signals.
485 */
486 if (prepareSendScan(theNdbCon->theTCConPtr,
487 theNdbCon->theTransactionId) == -1)
488 /* Error code should be set */
489 return -1;
490 
491 return 0;
492 }
493 
494 int
495 NdbScanOperation::handleScanOptionsVersion(const ScanOptions*& optionsPtr,
496  Uint32 sizeOfOptions,
497  ScanOptions& currOptions)
498 {
499  /* Handle different sized ScanOptions */
500  if (unlikely((sizeOfOptions !=0) &&
501  (sizeOfOptions != sizeof(ScanOptions))))
502  {
503  /* Different size passed, perhaps it's an old client */
504  if (sizeOfOptions == sizeof(ScanOptions_v1))
505  {
506  const ScanOptions_v1* oldOptions=
507  (const ScanOptions_v1*) optionsPtr;
508 
509  /* v1 of ScanOptions, copy into current version
510  * structure and update options ptr
511  */
512  currOptions.optionsPresent= oldOptions->optionsPresent;
513  currOptions.scan_flags= oldOptions->scan_flags;
514  currOptions.parallel= oldOptions->parallel;
515  currOptions.batch= oldOptions->batch;
516  currOptions.extraGetValues= oldOptions->extraGetValues;
517  currOptions.numExtraGetValues= oldOptions->numExtraGetValues;
518  currOptions.partitionId= oldOptions->partitionId;
519  currOptions.interpretedCode= oldOptions->interpretedCode;
520  currOptions.customData= oldOptions->customData;
521 
522  /* New fields */
523  currOptions.partitionInfo= NULL;
524  currOptions.sizeOfPartInfo= 0;
525 
526  optionsPtr= &currOptions;
527  }
528  else
529  {
530  /* No other versions supported currently */
531  setErrorCodeAbort(4298);
532  /* Invalid or unsupported ScanOptions structure */
533  return -1;
534  }
535  }
536  return 0;
537 }
538 
539 int
540 NdbScanOperation::scanTableImpl(const NdbRecord *result_record,
541  NdbOperation::LockMode lock_mode,
542  const unsigned char *result_mask,
543  const NdbScanOperation::ScanOptions *options,
544  Uint32 sizeOfOptions)
545 {
546  int res;
547  Uint32 scan_flags = 0;
548  Uint32 parallel = 0;
549  Uint32 batch = 0;
550 
551  ScanOptions currentOptions;
552 
553  if (options != NULL)
554  {
555  if (handleScanOptionsVersion(options, sizeOfOptions, currentOptions))
556  return -1;
557 
558  /* Process some initial ScanOptions - most are
559  * handled later
560  */
561  if (options->optionsPresent & ScanOptions::SO_SCANFLAGS)
562  scan_flags = options->scan_flags;
563  if (options->optionsPresent & ScanOptions::SO_PARALLEL)
564  parallel = options->parallel;
565  if (options->optionsPresent & ScanOptions::SO_BATCH)
566  batch = options->batch;
567  }
568 #if 0 // ToDo: this breaks optimize index, but maybe there is a better solution
569  if (result_record->flags & NdbRecord::RecIsIndex)
570  {
571  setErrorCodeAbort(4340);
572  return -1;
573  }
574 #endif
575 
576  m_attribute_record= result_record;
577  AttributeMask readMask;
578  m_attribute_record->copyMask(readMask.rep.data, result_mask);
579 
580  /* Process scan definition info */
581  res= processTableScanDefs(lock_mode, scan_flags, parallel, batch);
582  if (res == -1)
583  return -1;
584 
585  theStatus= NdbOperation::UseNdbRecord;
586  /* Call generic scan code */
587  return scanImpl(options, readMask.rep.data);
588 }
589 
590 
591 int
592 NdbScanOperation::getPartValueFromInfo(const Ndb::PartitionSpec* partInfo,
593  const NdbTableImpl* table,
594  Uint32* partValue)
595 {
596  switch(partInfo->type)
597  {
598  case Ndb::PartitionSpec::PS_USER_DEFINED:
599  {
600  assert(table->m_fragmentType == NdbDictionary::Object::UserDefined);
601  *partValue= partInfo->UserDefined.partitionId;
602  return 0;
603  }
604 
605  case Ndb::PartitionSpec::PS_DISTR_KEY_PART_PTR:
606  {
607  assert(table->m_fragmentType != NdbDictionary::Object::UserDefined);
608  Uint32 hashVal;
609  int ret= Ndb::computeHash(&hashVal, table,
610  partInfo->KeyPartPtr.tableKeyParts,
611  partInfo->KeyPartPtr.xfrmbuf,
612  partInfo->KeyPartPtr.xfrmbuflen);
613  if (ret == 0)
614  {
615  /* We send the hash result here (rather than the partitionId
616  * generated by doing some function on the hash)
617  * Note that KEY and LINEAR KEY native partitioning hash->partitionId
618  * mapping functions are idempotent so that they can be
619  * applied multiple times to their result without changing it.
620  * DIH will apply them, so there's no need to also do it here in API,
621  * unless we want to see which physical partition we *think* will
622  * hold the values.
623  * Only possible advantage is that we could identify some locality
624  * not shown in the hash result. This is only *safe* for schemes
625  * which cannot change the hash->partitionId mapping function
626  * online.
627  * Can add as an optimisation if necessary.
628  */
629  *partValue= hashVal;
630  return 0;
631  }
632  else
633  {
634  setErrorCodeAbort(ret);
635  return -1;
636  }
637  }
638 
639  case Ndb::PartitionSpec::PS_DISTR_KEY_RECORD:
640  {
641  assert(table->m_fragmentType != NdbDictionary::Object::UserDefined);
642  Uint32 hashVal;
643  int ret= Ndb::computeHash(&hashVal,
644  partInfo->KeyRecord.keyRecord,
645  partInfo->KeyRecord.keyRow,
646  partInfo->KeyRecord.xfrmbuf,
647  partInfo->KeyRecord.xfrmbuflen);
648  if (ret == 0)
649  {
650  /* See comments above about sending hashResult rather than
651  * partitionId
652  */
653  *partValue= hashVal;
654  return 0;
655  }
656  else
657  {
658  setErrorCodeAbort(ret);
659  return -1;
660  }
661  }
662  }
663 
664  /* 4542 : Unknown partition information type */
665  setErrorCodeAbort(4542);
666  return -1;
667 }
668 
669 /*
670  Compare two rows on some prefix of the index.
671  This is used to see if we can determine that all rows in an index range scan
672  will come from a single fragment (if the two rows bound a single distribution
673  key).
674  */
675 static int
676 compare_index_row_prefix(const NdbRecord *rec,
677  const char *row1,
678  const char *row2,
679  Uint32 prefix_length)
680 {
681  Uint32 i;
682 
683  if (row1 == row2) // Easy case with same ptrs
684  return 0;
685 
686  for (i= 0; i<prefix_length; i++)
687  {
688  const NdbRecord::Attr *col= &rec->columns[rec->key_indexes[i]];
689 
690  bool is_null1= col->is_null(row1);
691  bool is_null2= col->is_null(row2);
692  if (is_null1)
693  {
694  if (!is_null2)
695  return -1;
696  /* Fall-through to compare next one. */
697  }
698  else
699  {
700  if (is_null2)
701  return 1;
702 
703  Uint32 offset= col->offset;
704  Uint32 maxSize= col->maxSize;
705  const char *ptr1= row1 + offset;
706  const char *ptr2= row2 + offset;
707 
708  /* bug#56853 */
709  char buf1[NdbRecord::Attr::SHRINK_VARCHAR_BUFFSIZE];
710  char buf2[NdbRecord::Attr::SHRINK_VARCHAR_BUFFSIZE];
711  if (col->flags & NdbRecord::IsMysqldShrinkVarchar)
712  {
713  Uint32 len1;
714  bool ok1 = col->shrink_varchar(row1, len1, buf1);
715  assert(ok1);
716  ptr1 = buf1;
717  Uint32 len2;
718  bool ok2 = col->shrink_varchar(row2, len2, buf2);
719  assert(ok2);
720  ptr2 = buf2;
721  }
722 
723  void *info= col->charset_info;
724  int res=
725  (*col->compare_function)(info, ptr1, maxSize, ptr2, maxSize);
726  if (res)
727  {
728  return res;
729  }
730  }
731  }
732 
733  return 0;
734 }
735 
736 int
737 NdbIndexScanOperation::getDistKeyFromRange(const NdbRecord *key_record,
738  const NdbRecord *result_record,
739  const char *row,
740  Uint32* distKey)
741 {
742  const Uint32 MaxKeySizeInLongWords= (NDB_MAX_KEY_SIZE + 7) / 8;
743  Uint64 tmp[ MaxKeySizeInLongWords ];
744  char* tmpshrink = (char*)tmp;
745  Uint32 tmplen = (Uint32)sizeof(tmp);
746 
747  /* This can't work for User Defined partitioning */
748  assert(key_record->table->m_fragmentType !=
749  NdbDictionary::Object::UserDefined);
750 
751  Ndb::Key_part_ptr ptrs[NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY+1];
752  Uint32 i;
753  for (i = 0; i<key_record->distkey_index_length; i++)
754  {
755  const NdbRecord::Attr *col =
756  &key_record->columns[key_record->distkey_indexes[i]];
757  if (col->flags & NdbRecord::IsMysqldShrinkVarchar)
758  {
759  if (tmplen >= 256)
760  {
761  Uint32 len;
762  bool len_ok = col->shrink_varchar(row, len, tmpshrink);
763  if (!len_ok)
764  {
765  /* 4209 : Length parameter in equal/setValue is incorrect */
766  setErrorCodeAbort(4209);
767  return -1;
768  }
769  ptrs[i].ptr = tmpshrink;
770  tmpshrink += len;
771  tmplen -= len;
772  }
773  else
774  {
775  /* 4207 : Key size is limited to 4092 bytes */
776  setErrorCodeAbort(4207);
777  return -1;
778  }
779  }
780  else
781  {
782  ptrs[i].ptr = row + col->offset;
783  }
784  ptrs[i].len = col->maxSize;
785  }
786  ptrs[i].ptr = 0;
787 
788  Uint32 hashValue;
789  int ret = Ndb::computeHash(&hashValue, result_record->table,
790  ptrs, tmpshrink, tmplen);
791  if (ret == 0)
792  {
793  *distKey = hashValue;
794  return 0;
795  }
796  else
797  {
798 #ifdef VM_TRACE
799  ndbout << "err: " << ret << endl;
800 #endif
801  setErrorCodeAbort(ret);
802  return -1;
803  }
804 }
805 
806 int
807 NdbScanOperation::validatePartInfoPtr(const Ndb::PartitionSpec*& partInfo,
808  Uint32 sizeOfPartInfo,
809  Ndb::PartitionSpec& tmpSpec)
810 {
811  if (unlikely(sizeOfPartInfo != sizeof(Ndb::PartitionSpec)))
812  {
813  if (sizeOfPartInfo == sizeof(Ndb::PartitionSpec_v1))
814  {
815  const Ndb::PartitionSpec_v1* oldPSpec=
816  (const Ndb::PartitionSpec_v1*) partInfo;
817 
818  /* Let's upgrade to the latest variant */
819  tmpSpec.type= oldPSpec->type;
820  if (tmpSpec.type == Ndb::PartitionSpec_v1::PS_USER_DEFINED)
821  {
822  tmpSpec.UserDefined.partitionId= oldPSpec->UserDefined.partitionId;
823  }
824  else
825  {
826  tmpSpec.KeyPartPtr.tableKeyParts= oldPSpec->KeyPartPtr.tableKeyParts;
827  tmpSpec.KeyPartPtr.xfrmbuf= oldPSpec->KeyPartPtr.xfrmbuf;
828  tmpSpec.KeyPartPtr.xfrmbuflen= oldPSpec->KeyPartPtr.xfrmbuflen;
829  }
830 
831  partInfo= &tmpSpec;
832  }
833  else
834  {
835  /* 4545 : Invalid or Unsupported PartitionInfo structure */
836  setErrorCodeAbort(4545);
837  return -1;
838  }
839  }
840 
841  if (partInfo->type != Ndb::PartitionSpec::PS_NONE)
842  {
843  if (m_pruneState == SPS_FIXED)
844  {
845  /* 4543 : Duplicate partitioning information supplied */
846  setErrorCodeAbort(4543);
847  return -1;
848  }
849 
850  if ((partInfo->type == Ndb::PartitionSpec::PS_USER_DEFINED) !=
851  ((m_currentTable->m_fragmentType == NdbDictionary::Object::UserDefined)))
852  {
853  /* Mismatch between type of partitioning info supplied, and table's
854  * partitioning type
855  */
856  /* 4544 : Wrong partitionInfo type for table */
857  setErrorCodeAbort(4544);
858  return -1;
859  }
860  }
861  else
862  {
863  /* PartInfo supplied, but set to NONE */
864  partInfo= NULL;
865  }
866 
867  return 0;
868 }
869 
870 
/* Convenience overload: defines an index bound without explicit
 * partitioning information by forwarding to the four-argument setBound()
 * with a NULL partition spec and a size of 0.
 *
 * NOTE(review): this listing skips its own line 872, so the line carrying
 * the function name and the key_record parameter is not visible here.
 * Presumably this is NdbIndexScanOperation::setBound(const NdbRecord
 * *key_record, ...) - confirm against the upstream source.
 */
871 int
873 const IndexBound& bound)
874 {
875 return setBound(key_record, bound, NULL, 0);
876 }
877 
885 int
887  const IndexBound& bound,
888  const Ndb::PartitionSpec* partInfo,
889  Uint32 sizeOfPartInfo)
890 {
891  if (unlikely((theStatus != NdbOperation::UseNdbRecord)))
892  {
893  setErrorCodeAbort(4284);
894  /* Cannot mix NdbRecAttr and NdbRecord methods in one operation */
895  return -1;
896  }
897 
898  if (unlikely(key_record == NULL))
899  {
900  setErrorCodeAbort(4285);
901  /* NULL NdbRecord pointer */
902  return -1;
903  }
904 
905  /* Has the user supplied an open range (no bounds)? */
906  const bool openRange= (((bound.low_key == NULL) &&
907  (bound.high_key == NULL)) ||
908  ((bound.low_key_count == 0) &&
909  (bound.high_key_count == 0)));
910 
911  /* Check the base table's partitioning scheme
912  * (Ordered index itself has 'undefined' fragmentation)
913  */
914  bool tabHasUserDefPartitioning= (m_currentTable->m_fragmentType ==
915  NdbDictionary::Object::UserDefined);
916 
917  /* Validate explicit partitioning info if it's supplied */
918  Ndb::PartitionSpec tmpSpec;
919  if (partInfo)
920  {
921  /* May update the PartInfo ptr */
922  if (validatePartInfoPtr(partInfo,
923  sizeOfPartInfo,
924  tmpSpec))
925  return -1;
926  }
927 
928  m_num_bounds++;
929 
930  if (unlikely((m_num_bounds > 1) &&
931  (m_multi_range == 0)))
932  {
933  /* > 1 IndexBound, but not MRR */
934  setErrorCodeAbort(4509);
935  /* Non SF_MultiRange scan cannot have more than one bound */
936  return -1;
937  }
938 
939  Uint32 j;
940  Uint32 key_count, common_key_count;
941  Uint32 range_no;
942  Uint32 bound_head;
943 
944  range_no= bound.range_no;
945  if (unlikely(range_no > MaxRangeNo))
946  {
947  setErrorCodeAbort(4286);
948  return -1;
949  }
950 
951  /* Check valid ordering of supplied range numbers */
952  if ( m_read_range_no && m_ordered )
953  {
954  if (unlikely((m_num_bounds > 1) &&
955  (range_no <= m_previous_range_num)))
956  {
957  setErrorCodeAbort(4282);
958  /* range_no not strictly increasing in ordered multi-range index scan */
959  return -1;
960  }
961 
962  m_previous_range_num= range_no;
963  }
964 
965  key_count= bound.low_key_count;
966  common_key_count= key_count;
967  if (key_count < bound.high_key_count)
968  key_count= bound.high_key_count;
969  else
970  common_key_count= bound.high_key_count;
971 
972  if (unlikely(key_count > key_record->key_index_length))
973  {
974  /* Too many keys specified for key bound. */
975  setErrorCodeAbort(4281);
976  return -1;
977  }
978 
979  /* We need to get a ptr to the first word of this
980  * range so that we can set the total length of the range
981  * (and range num) at the end of writing out the range.
982  */
983  Uint32* firstRangeWord= NULL;
984  const Uint32 keyLenBeforeRange= theTupKeyLen;
985 
986  if (likely(!openRange))
987  {
988  /* If low and high key pointers are the same and key counts are
989  * the same, we send as an Eq bound to save bandwidth.
990  * This will not send an EQ bound if :
991  * - Different numbers of high and low keys are EQ
992  * - High and low keys are EQ, but use different ptrs
993  * This could be improved in future with another setBound() variant.
994  */
995  const bool isEqRange=
996  (bound.low_key == bound.high_key) &&
997  (bound.low_key_count == bound.high_key_count) &&
998  (bound.low_inclusive && bound.high_inclusive); // Does this matter?
999 
1000  if (isEqRange)
1001  {
1002  /* Using BoundEQ will result in bound being sent only once */
1003  for (j= 0; j<key_count; j++)
1004  {
1005  ndbrecord_insert_bound(key_record, key_record->key_indexes[j],
1006  bound.low_key, BoundEQ, firstRangeWord);
1007  }
1008  }
1009  else
1010  {
1011  /* Distinct upper and lower bounds, must specify them independently */
1012  /* Note : Protocol allows individual columns to be specified as EQ
1013  * or some prefix of columns. This is not currently supported from
1014  * NDBAPI.
1015  */
1016  for (j= 0; j<key_count; j++)
1017  {
1018  Uint32 bound_type;
1019  /* If key is part of lower bound */
1020  if (bound.low_key && j<bound.low_key_count)
1021  {
1022  /* Inclusive if defined, or matching rows can include this value */
1023  bound_type= bound.low_inclusive || j+1 < bound.low_key_count ?
1024  BoundLE : BoundLT;
1025  ndbrecord_insert_bound(key_record, key_record->key_indexes[j],
1026  bound.low_key, bound_type, firstRangeWord);
1027  }
1028  /* If key is part of upper bound */
1029  if (bound.high_key && j<bound.high_key_count)
1030  {
1031  /* Inclusive if defined, or matching rows can include this value */
1032  bound_type= bound.high_inclusive || j+1 < bound.high_key_count ?
1033  BoundGE : BoundGT;
1034  ndbrecord_insert_bound(key_record, key_record->key_indexes[j],
1035  bound.high_key, bound_type, firstRangeWord);
1036  }
1037  }
1038  }
1039  }
1040  else
1041  {
1042  /* Open range - all rows must be returned.
1043  * To encode this, we'll request all rows where the first
1044  * key column value is >= NULL
1045  */
1046  insert_open_bound(key_record, firstRangeWord);
1047  }
1048 
1049  /* Set the length of this range
1050  * Length = TupKeyLen@range end - TupKeyLen@ range start
1051  * Pack into Uint32 with range no and bound type as described
1052  * in KeyInfo.hpp
1053  */
1054  assert(firstRangeWord != NULL);
1055 
1056  bound_head= *firstRangeWord;
1057  bound_head|=
1058  (theTupKeyLen - keyLenBeforeRange) << 16 | (range_no << 4);
1059  *firstRangeWord= bound_head;
1060 
1061 
1062  /* Now determine if the scan can (continue to) be pruned to one
1063  * partition
1064  *
1065  * This can only be the case if
1066  * - There's no overriding partition id/info specified in
1067  * ScanOptions
1068  * AND
1069  * - This range scan can be pruned to 1 partition 'value'
1070  * AND
1071  * - All previous ranges (MRR) were partition pruned
1072  * to the same partition 'value'
1073  *
1074  * Where partition 'value' is either a partition id or a hash
1075  * that maps to one in the kernel.
1076  */
1077  if ((m_pruneState == SPS_UNKNOWN) || // First range
1078  (m_pruneState == SPS_ONE_PARTITION)) // Previous ranges are commonly pruned
1079  {
1080  bool currRangeHasOnePartVal= false;
1081  Uint32 currRangePartValue= 0;
1082 
1083  /* Determine whether this range scan can be pruned */
1084  if (partInfo)
1085  {
1086  /* Explicit partitioning info supplied, use it to get a value */
1087  currRangeHasOnePartVal= true;
1088 
1089  if (getPartValueFromInfo(partInfo,
1090  m_attribute_record->table,
1091  &currRangePartValue))
1092  {
1093  return -1;
1094  }
1095  }
1096  else
1097  {
1098  if (likely(!tabHasUserDefPartitioning))
1099  {
1100  /* Attempt to get implicit partitioning info from range bounds -
1101  * only possible if they are present and bound a single value
1102  * of the table's distribution keys
1103  */
1104  Uint32 index_distkeys = key_record->m_no_of_distribution_keys;
1105  Uint32 table_distkeys = m_attribute_record->m_no_of_distribution_keys;
1106  Uint32 distkey_min= key_record->m_min_distkey_prefix_length;
1107  if (index_distkeys == table_distkeys && // Index has all base table d-keys
1108  common_key_count >= distkey_min && // Bounds have all d-keys
1109  bound.low_key && // Have both bounds
1110  bound.high_key &&
1111  0==compare_index_row_prefix(key_record, // Both bounds are same
1112  bound.low_key,
1113  bound.high_key,
1114  distkey_min))
1115  {
1116  assert(! openRange);
1117  currRangeHasOnePartVal= true;
1118  if (getDistKeyFromRange(key_record, m_attribute_record,
1119  bound.low_key,
1120  &currRangePartValue))
1121  return -1;
1122  }
1123  }
1124  }
1125 
1126 
1127  /* Determine whether this pruned range fits with any existing
1128  * range pruning
1129  * As we can currently only prune a single scan to one partition
1130  * (Not a set of partitions, or a set of partitions per range)
1131  * we can only prune if all ranges happen to be prune-able to the
1132  * same partition.
1133  * In future perhaps Ndb can be enhanced to support partition sets
1134  * and/or per-range partition pruning.
1135  */
1136  const ScanPruningState prevPruneState= m_pruneState;
1137  if (currRangeHasOnePartVal)
1138  {
1139  if (m_pruneState == SPS_UNKNOWN)
1140  {
1141  /* Prune the scan to use this range's partition value */
1142  m_pruneState= SPS_ONE_PARTITION;
1143  m_pruningKey= currRangePartValue;
1144  }
1145  else
1146  {
1147  /* If this range's partition value is the same as the previous
1148  * ranges then we can stay pruned, otherwise we cannot
1149  */
1150  assert(m_pruneState == SPS_ONE_PARTITION);
1151  if (currRangePartValue != m_pruningKey)
1152  {
1153  /* This range is found in a different partition to previous
1154  * range(s). We cannot prune this scan.
1155  */
1156  m_pruneState= SPS_MULTI_PARTITION;
1157  }
1158  }
1159  }
1160  else
1161  {
1162  /* This range cannot be scanned by scanning a single partition
1163  * Therefore the scan must scan all partitions
1164  */
1165  m_pruneState= SPS_MULTI_PARTITION;
1166  }
1167 
1168  /* Now modify the SCANTABREQ */
1169  if (m_pruneState != prevPruneState)
1170  {
1171  theDistrKeyIndicator_= (m_pruneState == SPS_ONE_PARTITION);
1172  theDistributionKey= m_pruningKey;
1173 
1174  ScanTabReq *req= CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
1175  ScanTabReq::setDistributionKeyFlag(req->requestInfo, theDistrKeyIndicator_);
1176  req->distributionKey= theDistributionKey;
1177  theSCAN_TABREQ->setLength(ScanTabReq::StaticLength + theDistrKeyIndicator_);
1178  }
1179  } // if (m_pruneState == UNKNOWN / SPS_ONE_PARTITION)
1180 
1181  return 0;
1182 } // ::setBound();
1183 
1184 
/* Define an ordered-index scan from NdbRecord descriptions.
 *
 * key_record     - record describing the index key; must carry the
 *                  RecHasAllKeys and RecIsIndex flags.
 * result_record  - record describing the rows to return; must not be an
 *                  index record.
 * lock_mode      - passed on to processIndexScanDefs().
 * result_mask    - column mask combined with result_record via copyMask().
 * bound          - optional first bound, applied with setBound() last.
 * options        - optional ScanOptions; may be NULL.
 * sizeOfOptions  - passed to handleScanOptionsVersion() with options.
 *
 * Returns -1 on the validation failures detected here (error codes 4292,
 * 4341, 4283, 4340) or when processIndexScanDefs() fails; otherwise the
 * result of scanImpl(), or of setBound() when scanImpl() returned 0 and a
 * bound was supplied.
 */
1185 int
1186 NdbIndexScanOperation::scanIndexImpl(const NdbRecord *key_record,
1187  const NdbRecord *result_record,
1188  NdbOperation::LockMode lock_mode,
1189  const unsigned char *result_mask,
1190  const NdbIndexScanOperation::IndexBound *bound,
1191  const NdbScanOperation::ScanOptions *options,
1192  Uint32 sizeOfOptions)
1193 {
1194  int res;
1195  Uint32 i;
1196  Uint32 scan_flags = 0;
1197  Uint32 parallel = 0;
1198  Uint32 batch = 0;
1199
1200  ScanOptions currentOptions;
1201
1202  if (options != NULL)
1203  {
/* NOTE(review): the fields below are read through `options`, not
 * `currentOptions`; presumably handleScanOptionsVersion() redirects the
 * pointer when it has to convert the structure - confirm.
 */
1204  if (handleScanOptionsVersion(options, sizeOfOptions, currentOptions))
1205  return -1;
1206
1207  /* Process some initial ScanOptions here
1208  * The rest will be handled later
1209  */
1210  if (options->optionsPresent & ScanOptions::SO_SCANFLAGS)
1211  scan_flags = options->scan_flags;
1212  if (options->optionsPresent & ScanOptions::SO_PARALLEL)
1213  parallel = options->parallel;
1214  if (options->optionsPresent & ScanOptions::SO_BATCH)
1215  batch = options->batch;
1216  }
1217
/* The key record must describe every key column, else error 4292. */
1218  if (!(key_record->flags & NdbRecord::RecHasAllKeys))
1219  {
1220  setErrorCodeAbort(4292);
1221  return -1;
1222  }
1223
/* Start from the caller's result mask as applied to result_record. */
1224  AttributeMask readMask;
1225  result_record->copyMask(readMask.rep.data, result_mask);
1226
1227  if (scan_flags & (NdbScanOperation::SF_OrderBy |
/* NOTE(review): listing line 1228 is absent from this copy, so the flag
 * expression opened above is not closed here; it presumably names
 * NdbScanOperation::SF_OrderByFull - confirm against the original file.
 */
1229  {
/* Ordered scan: every index key column must exist in result_record
 * (else 4292). With SF_OrderByFull the key columns are OR-ed into the
 * read mask; otherwise the read mask must already contain them (else
 * 4341).
 */
1236  Uint32 keymask[MAXNROFATTRIBUTESINWORDS];
1237  BitmaskImpl::clear(MAXNROFATTRIBUTESINWORDS, keymask);
1238
1239  for (i = 0; i < key_record->key_index_length; i++)
1240  {
1241  Uint32 attrId = key_record->columns[key_record->key_indexes[i]].attrId;
1242  if (attrId >= result_record->m_attrId_indexes_length ||
1243  result_record->m_attrId_indexes[attrId] < 0)
1244  {
1245  setErrorCodeAbort(4292);
1246  return -1;
1247  }
1248
1249  BitmaskImpl::set(MAXNROFATTRIBUTESINWORDS, keymask, attrId);
1250  }
1251
1252  if (scan_flags & NdbScanOperation::SF_OrderByFull)
1253  {
1254  BitmaskImpl::bitOR(MAXNROFATTRIBUTESINWORDS, readMask.rep.data, keymask);
1255  }
1256  else if (!BitmaskImpl::contains(MAXNROFATTRIBUTESINWORDS,
1257  readMask.rep.data, keymask))
1258  {
1259  setErrorCodeAbort(4341);
1260  return -1;
1261  }
1262  }
1263
/* key_record must be an index record (4283) and result_record must not
 * be one (4340).
 */
1264  if (!(key_record->flags & NdbRecord::RecIsIndex))
1265  {
1266  setErrorCodeAbort(4283);
1267  return -1;
1268  }
1269  if (result_record->flags & NdbRecord::RecIsIndex)
1270  {
1271  setErrorCodeAbort(4340);
1272  return -1;
1273  }
1274
1275  /* Modify NdbScanOperation vars to indicate that we're an
1276  * IndexScan
1277  */
/* NOTE(review): listing line 1278 is absent from this copy; a statement
 * may be missing at this point - confirm against the original file.
 */
1279  m_currentTable= result_record->table;
1280
1281  m_key_record = key_record;
1282  m_attribute_record= result_record;
1283
1284  res= processIndexScanDefs(lock_mode, scan_flags, parallel, batch);
1285  if (res==-1)
1286  return -1;
1287
1288  /* Fix theStatus as set in processIndexScanDefs(). */
1289  theStatus= NdbOperation::UseNdbRecord;
1290
1291  /* Call generic scan code */
1292  res= scanImpl(options, readMask.rep.data);
1293
1294  if (!res)
1295  {
1296  /*
1297  * Set up first key bound, if present
1298  * Extra bounds (MRR) can be added later
1299  */
1300  if (bound != NULL)
1301  {
1302  res= setBound(key_record, *bound);
1303  }
1304  }
1305
1306  return res;
1307 } // ::scanIndexImpl();
1308 
1309 
1310 /* readTuples() method for table scans
1311  * This method performs minimal validation and initialisation,
1312  * deferring most of the work to a later call to processTableScanDefs
1313  * below.
1314  */
/* The lock mode, scan flags, parallelism and batch size are only stored in
 * the m_saved...OldApi members here.
 *
 * Returns 0 on success; -1 with error 4605 if it has already been called
 * on this operation (m_readTuplesCalled is set).
 */
1315 int
/* NOTE(review): listing line 1316 is absent from this copy. It should hold
 * the function name and the first parameter; the body reads that parameter
 * as `lm` - confirm against the original file.
 */
1317  Uint32 scan_flags,
1318  Uint32 parallel,
1319  Uint32 batch)
1320 {
1321  // It is only possible to call readTuples if readTuples hasn't
1322  // already been called
1323  if (m_readTuplesCalled)
1324  {
1325  setErrorCode(4605);
1326  return -1;
1327  }
1328
1329  /* Save parameters for later */
1330  m_readTuplesCalled= true;
1331  m_savedLockModeOldApi= lm;
1332  m_savedScanFlagsOldApi= scan_flags;
1333  m_savedParallelOldApi= parallel;
1334  m_savedBatchOldApi= batch;
1335
/* A request for SF_OrderBy is stored with SF_OrderByFull added as well. */
1339  if (scan_flags & SF_OrderBy)
1340  m_savedScanFlagsOldApi |= SF_OrderByFull;
1341
1342  return 0;
1343 }
1344 
1345 /* Most of the scan definition work for old + NdbRecord API scans is done here */
/* Resets ordering/pruning state, chooses the parallelism, sets up the
 * receivers and fills in the fixed part of the SCAN_TABREQ signal.
 *
 * lm         - lock mode, applied through setReadLockMode().
 * scan_flags - SF_* flags; SF_TupScan, SF_DiskScan, SF_KeyInfo, SF_OrderBy
 *              and SF_OrderByFull are examined here.
 * parallel   - requested parallelism; 0 or anything above the table's
 *              fragment count is replaced by the fragment count.
 * batch      - user batch size, parked in req->first_batch_size.
 *
 * Returns 0 on success, -1 with error 4000 if fix_receivers() fails or
 * theSCAN_TABREQ is NULL.
 */
1346 int
1347 NdbScanOperation::processTableScanDefs(NdbScanOperation::LockMode lm,
1348  Uint32 scan_flags,
1349  Uint32 parallel,
1350  Uint32 batch)
1351 {
1352  m_ordered = m_descending = false;
1353  m_pruneState= SPS_UNKNOWN;
1354  Uint32 fragCount = m_currentTable->m_fragmentCount;
1355
1356  assert(fragCount > 0);
1357
1358  if (parallel > fragCount || parallel == 0) {
1359  parallel = fragCount;
1360  }
1361
/* Register this operation as the transaction's scanning operation. */
1362  theNdbCon->theScanningOp = this;
1363  bool tupScan = (scan_flags & SF_TupScan);
1364
1365 #if 0 // XXX temp for testing
1366  { char* p = getenv("NDB_USE_TUPSCAN");
1367  if (p != 0) {
1368  unsigned n = atoi(p); // 0-10
1369  if ((unsigned int) (::time(0) % 10) < n) tupScan = true;
1370  }
1371  }
1372 #endif
/* A disk scan implies a tup scan and clears the OF_NO_DISK flag. */
1373  if (scan_flags & SF_DiskScan)
1374  {
1375  tupScan = true;
1376  m_flags &= ~Uint8(OF_NO_DISK);
1377  }
1378
1379  bool rangeScan= false;
1380
1381  /* NdbRecord defined scan, handle IndexScan specifics */
1382  if ( (int) m_accessTable->m_indexType ==
/* NOTE(review): listing line 1383 is absent from this copy, so the right
 * hand side of this comparison is not visible; from the body it presumably
 * tests for an ordered index - confirm against the original file.
 */
1384  {
1385  if (m_currentTable == m_accessTable){
1386  // Old way of scanning indexes, should not be allowed
/* NOTE(review): the lookup result is only checked by assert(); with
 * asserts compiled out a NULL table would be stored and used later.
 */
1387  m_currentTable = theNdb->theDictionary->
1388  getTable(m_currentTable->m_primaryTable.c_str());
1389  assert(m_currentTable != NULL);
1390  }
1391  assert (m_currentTable != m_accessTable);
1392  // Modify operation state
1393  theStatus = GetValue;
1394  theOperationType = OpenRangeScanRequest;
1395  rangeScan = true;
1396  tupScan = false;
1397  }
1398
1399  if (rangeScan && (scan_flags & (SF_OrderBy | SF_OrderByFull)))
1400  parallel = fragCount; /* Frag count of ordered index ==
1401  * Frag count of base table
1402  */
1403
1404  theParallelism = parallel;
1405
1406  if(fix_receivers(parallel) == -1){
1407  setErrorCodeAbort(4000);
1408  return -1;
1409  }
1410
1411  if (theSCAN_TABREQ == NULL) {
1412  setErrorCodeAbort(4000);
1413  return -1;
1414  }//if
1415
/* Fill in the fixed part of the SCAN_TABREQ signal. */
1416  theSCAN_TABREQ->setSignal(GSN_SCAN_TABREQ, refToBlock(theNdbCon->m_tcRef));
1417  ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
1418  req->apiConnectPtr = theNdbCon->theTCConPtr;
1419  req->tableId = m_accessTable->m_id;
1420  req->tableSchemaVersion = m_accessTable->m_version;
1421  req->storedProcId = 0xFFFF;
1422  req->buddyConPtr = theNdbCon->theBuddyConPtr;
1423  req->spare= 0;
1424  req->first_batch_size = batch; // Save user specified batch size
1425
1426  Uint32 reqInfo = 0;
1427  ScanTabReq::setParallelism(reqInfo, parallel);
1428  ScanTabReq::setScanBatch(reqInfo, 0);
1429  ScanTabReq::setRangeScanFlag(reqInfo, rangeScan);
1430  ScanTabReq::setTupScanFlag(reqInfo, tupScan);
1431  req->requestInfo = reqInfo;
1432
/* setReadLockMode() may force m_keyInfo to 1 (it does for LM_Exclusive),
 * so it has to run after the assignment below.
 */
1433  m_keyInfo = (scan_flags & SF_KeyInfo) ? 1 : 0;
1434  setReadLockMode(lm);
1435
1436  Uint64 transId = theNdbCon->getTransactionId();
1437  req->transId1 = (Uint32) transId;
1438  req->transId2 = (Uint32) (transId >> 32);
1439
/* Chain a first KEYINFO signal behind SCAN_TABREQ.
 * NOTE(review): the getSignal() result is dereferenced without a NULL
 * check here, although getFirstATTRINFOScan() in this file checks the same
 * call for NULL.
 */
1440  assert(theSCAN_TABREQ->next() == NULL);
1441  NdbApiSignal* tSignal= theNdb->getSignal();
1442  theSCAN_TABREQ->next(tSignal);
1443  theLastKEYINFO = tSignal;
1444
1445  theKEYINFOptr= tSignal->getDataPtrSend();
1446  keyInfoRemain= NdbApiSignal::MaxSignalWords;
1447  theTotalNrOfKeyWordInSignal= 0;
1448
/* NOTE(review): the return value of getFirstATTRINFOScan() is ignored, so
 * its -1 (allocation failure) does not change this function's result.
 */
1449  getFirstATTRINFOScan();
1450  return 0;
1451 }
1452 
/* Attach an interpreted program to this scan by storing the pointer in
 * m_interpreted_code.
 *
 * Returns 0 on success; -1 with error 4284 when the operation is already in
 * the UseNdbRecord state, or 4519 when the program does not have its
 * Finalised flag set.
 */
1453 int
/* NOTE(review): listing line 1454 is absent from this copy. It should hold
 * the function name and its parameter list; the body reads a pointer
 * parameter named `code` - confirm against the original file.
 */
1455 {
1456  if (theStatus == NdbOperation::UseNdbRecord)
1457  {
1458  setErrorCodeAbort(4284); // Cannot mix NdbRecAttr and NdbRecord methods...
1459  return -1;
1460  }
1461
1462  if ((code->m_flags & NdbInterpretedCode::Finalised) == 0)
1463  {
1464  setErrorCodeAbort(4519); // NdbInterpretedCode::finalise() not called.
1465  return -1;
1466  }
1467
/* Only the pointer is kept; no copy of the program is made here. */
1468  m_interpreted_code= code;
1469
1470  return 0;
1471 }
1472 
/* Allocate the NdbInterpretedCode object kept in m_interpretedCodeOldApi
 * for an old-API scan and return it. The object is constructed for
 * m_currentTable->m_facade and is deleted by freeInterpretedCodeOldApi().
 *
 * Returns the new object, or NULL with error 4536 when the scan is not
 * using the old API, or NULL with error 4000 when the allocation yields
 * NULL.
 *
 * NOTE(review): listing line 1473 (the line before the function name,
 * which should carry the return type) is absent from this copy.
 */
1474 NdbScanOperation::allocInterpretedCodeOldApi()
1475 {
1476  /* Should only be called once */
1477  assert (m_interpretedCodeOldApi == NULL);
1478
1479  /* Old Api scans only */
1480  if (! m_scanUsingOldApi)
1481  {
1482  /* NdbScanFilter constructor taking NdbOperation is not
1483  * supported for NdbRecord
1484  */
1485  setErrorCodeAbort(4536);
1486  return NULL;
1487  }
1488
1489  m_interpretedCodeOldApi = new NdbInterpretedCode(m_currentTable->m_facade);
1490
/* NOTE(review): this check only fires if operator new can return NULL in
 * this build; a standard throwing new never does - confirm.
 */
1491  if (m_interpretedCodeOldApi == NULL)
1492  setErrorCodeAbort(4000); // Memory allocation error
1493
1494  return m_interpretedCodeOldApi;
1495 }
1496 
1497 void
1498 NdbScanOperation::freeInterpretedCodeOldApi()
1499 {
1500  if (m_interpretedCodeOldApi != NULL)
1501  {
1502  delete m_interpretedCodeOldApi;
1503  m_interpretedCodeOldApi= NULL;
1504  }
1505 }
1506 
1507 
1508 void
1509 NdbScanOperation::setReadLockMode(LockMode lockMode)
1510 {
1511  bool lockExcl, lockHoldMode, readCommitted;
1512  switch (lockMode)
1513  {
1514  case LM_CommittedRead:
1515  lockExcl= false;
1516  lockHoldMode= false;
1517  readCommitted= true;
1518  break;
1519  case LM_SimpleRead:
1520  case LM_Read:
1521  lockExcl= false;
1522  lockHoldMode= true;
1523  readCommitted= false;
1524  break;
1525  case LM_Exclusive:
1526  lockExcl= true;
1527  lockHoldMode= true;
1528  readCommitted= false;
1529  m_keyInfo= 1;
1530  break;
1531  default:
1532  /* Not supported / invalid. */
1533  assert(false);
1534  }
1535  theLockMode= lockMode;
1536  ScanTabReq *req= CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
1537  Uint32 reqInfo= req->requestInfo;
1538  ScanTabReq::setLockMode(reqInfo, lockExcl);
1539  ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode);
1540  ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted);
1541  req->requestInfo= reqInfo;
1542 }
1543 
1544 int
1545 NdbScanOperation::fix_receivers(Uint32 parallel){
1546  assert(parallel > 0);
1547  if(parallel > m_allocated_receivers){
1548  const Uint32 sz = parallel * (4*sizeof(char*)+sizeof(Uint32));
1549 
1550  /* Allocate as Uint64 to ensure proper alignment for pointers. */
1551  Uint64 * tmp = new Uint64[(sz+7)/8];
1552  if (tmp == NULL)
1553  {
1554  setErrorCodeAbort(4000);
1555  return -1;
1556  }
1557  // Save old receivers
1558  memcpy(tmp, m_receivers, m_allocated_receivers*sizeof(char*));
1559  delete[] m_array;
1560  m_array = (Uint32*)tmp;
1561 
1562  m_receivers = (NdbReceiver**)tmp;
1563  m_api_receivers = m_receivers + parallel;
1564  m_conf_receivers = m_api_receivers + parallel;
1565  m_sent_receivers = m_conf_receivers + parallel;
1566  m_prepared_receivers = (Uint32*)(m_sent_receivers + parallel);
1567 
1568  // Only get/init "new" receivers
1569  NdbReceiver* tScanRec;
1570  for (Uint32 i = m_allocated_receivers; i < parallel; i ++) {
1571  tScanRec = theNdb->getNdbScanRec();
1572  if (tScanRec == NULL) {
1573  setErrorCodeAbort(4000);
1574  return -1;
1575  }//if
1576  m_receivers[i] = tScanRec;
1577  tScanRec->init(NdbReceiver::NDB_SCANRECEIVER, false, this);
1578  }
1579  m_allocated_receivers = parallel;
1580  }
1581 
1582  reset_receivers(parallel, 0);
1583  return 0;
1584 }
1585 
/* Move receiver tRec from the "sent" list to the end of the "conf" list
 * and rewind its current row to 0. Does nothing when an error code is
 * already set on the operation.
 */
1589 void
/* NOTE(review): listing line 1590 is absent from this copy. It should hold
 * the function name and parameter list; the body reads a receiver pointer
 * named `tRec` - confirm against the original file.
 */
1591  if(theError.code == 0){
1592  if(DEBUG_NEXT_RESULT)
1593  ndbout_c("receiver_delivered");
1594
/* Remove tRec from m_sent_receivers by moving the last entry into its
 * slot and shrinking the count; the moved entry's m_list_index is updated
 * to match.
 */
1595  Uint32 idx = tRec->m_list_index;
1596  Uint32 last = m_sent_receivers_count - 1;
1597  if(idx != last){
1598  NdbReceiver * move = m_sent_receivers[last];
1599  m_sent_receivers[idx] = move;
1600  move->m_list_index = idx;
1601  }
1602  m_sent_receivers_count = last;
1603
/* Append tRec to m_conf_receivers. */
1604  last = m_conf_receivers_count;
1605  m_conf_receivers[last] = tRec;
1606  m_conf_receivers_count = last + 1;
1607  tRec->m_current_row = 0;
1608  }
1609 }
1610 
/* Remove receiver tRec from the "sent" list without adding it to any other
 * list. Does nothing when an error code is already set on the operation.
 */
1614 void
/* NOTE(review): listing line 1615 is absent from this copy. It should hold
 * the function name and parameter list; the body reads a receiver pointer
 * named `tRec` - confirm against the original file.
 */
1616  if(theError.code == 0){
1617  if(DEBUG_NEXT_RESULT)
1618  ndbout_c("receiver_completed");
1619
/* Remove by moving the last entry into tRec's slot and shrinking the
 * count; the moved entry's m_list_index is updated to match.
 */
1620  Uint32 idx = tRec->m_list_index;
1621  Uint32 last = m_sent_receivers_count - 1;
1622  if(idx != last){
1623  NdbReceiver * move = m_sent_receivers[last];
1624  m_sent_receivers[idx] = move;
1625  move->m_list_index = idx;
1626  }
1627  m_sent_receivers_count = last;
1628  }
1629 }
1630 
1631 /*****************************************************************************
1632  * int getFirstATTRINFOScan()
1633  *
1634  * Return Value: Return 0: Successful
1635  * Return -1: All other cases
1636  * Parameters: None: Only allocate the first signal.
1637  * Remark: When a scan is defined we need to use this method instead
1638  * of insertATTRINFO for the first signal.
1639  * This is because we need not to mess up the code in
1640  * insertATTRINFO with if statements since we are not
1641  * interested in the TCKEYREQ signal.
1642  *****************************************************************************/
1643 int
1644 NdbScanOperation::getFirstATTRINFOScan()
1645 {
1646  NdbApiSignal* tSignal;
1647 
1648  tSignal = theNdb->getSignal();
1649  if (tSignal == NULL){
1650  setErrorCodeAbort(4000);
1651  return -1;
1652  }
1653 
1654  theAI_LenInCurrAI = AttrInfo::SectionSizeInfoLength;
1655  theATTRINFOptr = &tSignal->getDataPtrSend()[AttrInfo::SectionSizeInfoLength];
1656  attrInfoRemain= NdbApiSignal::MaxSignalWords - AttrInfo::SectionSizeInfoLength;
1657  tSignal->setLength(AttrInfo::SectionSizeInfoLength);
1658  theFirstATTRINFO = tSignal;
1659  theCurrentATTRINFO = tSignal;
1660  theCurrentATTRINFO->next(NULL);
1661 
1662  return 0;
1663 }
1664 
/* Start execution of the scan towards node `nodeId`.
 *
 * Old-API scans are first converted with finaliseScanOldApi(). Then, under
 * the NdbImpl lock, the scan is sent with doSendScan() if the node is alive
 * and its sequence number still matches the transaction's; otherwise error
 * 4029 (node dead) or 4030 (node stopping) is set and the transaction is
 * marked Aborted.
 *
 * Returns 0 on success, -1 on failure.
 */
1665 int
/* NOTE(review): listing line 1666 is absent from this copy. It should hold
 * the function name and parameter list; the body reads a parameter named
 * `nodeId` - confirm against the original file.
 */
1667 {
1668  /*
1669  * Call finaliseScanOldApi() for old style scans before
1670  * proceeding
1671  */
1672  bool locked = false;
1673  NdbImpl* theImpl = theNdb->theImpl;
1674
1675  int res = 0;
/* This failure path jumps to `done` with locked == false, so no unlock is
 * attempted there.
 */
1676  if (m_scanUsingOldApi && finaliseScanOldApi() == -1)
1677  {
1678  res = -1;
1679  goto done;
1680  }
1681
1682  {
1683  locked = true;
1684  NdbTransaction * tCon = theNdbCon;
1685  theImpl->lock();
1686
1687  Uint32 seq = tCon->theNodeSequence;
1688
1689  if (theImpl->get_node_alive(nodeId) &&
1690  (theImpl->getNodeSequence(nodeId) == seq)) {
1691
1692  tCon->theMagicNumber = 0x37412619;
1693
1694  if (doSendScan(nodeId) == -1)
1695  {
1696  res = -1;
1697  goto done;
1698  }
1699
1700  m_executed= true; // Mark operation as executed
1701  }
1702  else
1703  {
1704  if (!(theImpl->get_node_stopping(nodeId) &&
1705  (theImpl->getNodeSequence(nodeId) == seq)))
1706  {
1707  TRACE_DEBUG("The node is hard dead when attempting to start a scan");
1708  setErrorCode(4029);
1709  tCon->theReleaseOnClose = true;
1710  }
1711  else
1712  {
1713  TRACE_DEBUG("The node is stopping when attempting to start a scan");
1714  setErrorCode(4030);
1715  }//if
1716  res = -1;
1717  tCon->theCommitStatus = NdbTransaction::Aborted;
1718  }//if
1719  }
1720
1721 done:
/* Runs on success and on failure alike: reset the row cursor and mark all
 * theParallelism receivers as sent. For an ordered scan the api-receiver
 * cursor and count are also set to theParallelism.
 */
1728  m_curr_row = 0;
1729  m_sent_receivers_count = theParallelism;
1730  if(m_ordered)
1731  {
1732  m_current_api_receiver = theParallelism;
1733  m_api_receivers_count = theParallelism;
1734  }
1735
1736  if (locked)
1737  theImpl->unlock();
1738
1739  return res;
1740 }
1741 
1742 
1743 int
1744 NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend)
1745 {
1746  /* Defer to NdbRecord implementation, which will copy values
1747  * out into the user's RecAttr objects.
1748  */
1749  const char * dummyOutRowPtr;
1750 
1751  if (unlikely(! m_scanUsingOldApi))
1752  {
1753  /* Cannot mix NdbRecAttr and NdbRecord methods in one operation */
1754  setErrorCode(4284);
1755  return -1;
1756  }
1757 
1758 
1759  return nextResult(&dummyOutRowPtr,
1760  fetchAllowed,
1761  forceSend);
1762 }
1763 
1764 /* nextResult() for NdbRecord operation. */
1765 int
1766 NdbScanOperation::nextResult(const char ** out_row_ptr,
1767  bool fetchAllowed, bool forceSend)
1768 {
1769  int res;
1770 
1771  if ((res = nextResultNdbRecord(*out_row_ptr, fetchAllowed, forceSend)) == 0) {
1772  NdbBlob* tBlob= theBlobList;
1773  NdbRecAttr *getvalue_recattr= theReceiver.theFirstRecAttr;
1774  if (((UintPtr)tBlob | (UintPtr)getvalue_recattr) != 0)
1775  {
1776  const Uint32 idx= m_current_api_receiver;
1777  assert(idx < m_api_receivers_count);
1778  const NdbReceiver *receiver= m_api_receivers[idx];
1779  Uint32 pos= 0;
1780 
1781  /* First take care of any getValue(). */
1782  while (getvalue_recattr != NULL)
1783  {
1784  const char *attr_data;
1785  Uint32 attr_size;
1786  if (receiver->getScanAttrData(attr_data, attr_size, pos) == -1)
1787  return -1;
1788  if (!getvalue_recattr->receive_data((const Uint32 *)attr_data,
1789  attr_size))
1790  return -1; // purecov: deadcode
1791  getvalue_recattr= getvalue_recattr->next();
1792  }
1793 
1794  /* Handle blobs. */
1795  if (tBlob)
1796  {
1797  Uint32 infoword; // Not used for blobs
1798  Uint32 key_length;
1799  const char *key_data;
1800  res= receiver->get_keyinfo20(infoword, key_length, key_data);
1801  if (res == -1)
1802  return -1;
1803 
1804  do
1805  {
1806  if (tBlob->atNextResultNdbRecord(key_data, key_length*4) == -1)
1807  return -1;
1808  tBlob= tBlob->theNext;
1809  } while (tBlob != 0);
1810  /* Flush blob part ops on behalf of user. */
1811  if (m_transConnection->executePendingBlobOps() == -1)
1812  return -1;
1813  }
1814  }
1815  return 0;
1816  }
1817  return res;
1818 }
1819 
1820 int
1821 NdbScanOperation::nextResultCopyOut(char * buffer,
1822  bool fetchAllowed, bool forceSend)
1823 {
1824  const char * data;
1825  int result;
1826  if ((result = nextResult(&data, fetchAllowed, forceSend)) == 0)
1827  {
1828  memcpy(buffer, data, m_attribute_record->m_row_size);
1829  }
1830  return result;
1831 }
1832 
/* Deliver the next row of the scan through out_row.
 *
 * Ordered scans are delegated to next_result_ordered_ndbrecord(). For
 * unordered scans a row already buffered in one of the api receivers is
 * returned at once; otherwise, if fetchAllowed, the consumed receivers are
 * handed back with send_next_scan() and the call waits for new batches.
 *
 * Return values produced by the visible code:
 *    0  a row is available in out_row
 *    1  no receivers are confirmed or outstanding (end of data)
 *    2  no buffered row and fetchAllowed is false
 *   -1  error; an error code is set
 */
1833 int
/* NOTE(review): listing line 1834 is absent from this copy. It should hold
 * the function name and first parameter; the body assigns to `out_row` and
 * a caller in this file passes `*out_row_ptr` for it - confirm against the
 * original file.
 */
1835  bool fetchAllowed, bool forceSend)
1836 {
1837  if (m_ordered)
1838  return ((NdbIndexScanOperation*)this)->next_result_ordered_ndbrecord
1839  (out_row, fetchAllowed, forceSend);
1840
1841  /* Return a row immediately if any is available. */
1842  while (m_current_api_receiver < m_api_receivers_count)
1843  {
1844  NdbReceiver *tRec= m_api_receivers[m_current_api_receiver];
1845  if (tRec->nextResult())
1846  {
1847  out_row= tRec->get_row();
1848  return 0;
1849  }
1850  m_current_api_receiver++;
1851  }
1852
1853  if (!fetchAllowed)
1854  {
1855  /*
1856  Application wants to be informed that no more rows are available
1857  immediately.
1858  */
1859  return 2;
1860  }
1861
1862  /* Now we have to wait for more rows (or end-of-file on all receivers). */
1863  Uint32 nodeId = theNdbCon->theDBnode;
1864  NdbImpl* theImpl = theNdb->theImpl;
1865  Uint32 timeout= theImpl->get_waitfor_timeout();
1866  int retVal= 2;
1867  Uint32 idx, last;
1868  /*
1869  The rest needs to be done under mutex due to synchronization with receiver
1870  thread.
1871  */
1872  PollGuard poll_guard(* theImpl);
1873
1874  const Uint32 seq= theNdbCon->theNodeSequence;
1875
1876  if(theError.code)
1877  {
1878  goto err4;
1879  }
1880
/* Hand all api receivers consumed so far back for another batch, provided
 * the node sequence is unchanged.
 */
1881  if(seq == theImpl->getNodeSequence(nodeId) &&
1882  send_next_scan(m_current_api_receiver, false) == 0)
1883  {
1884  idx= m_current_api_receiver;
1885  last= m_api_receivers_count;
1886
1887  do {
/* NOTE(review): this return leaves the function without reaching the
 * statements after the switch below, unlike the other error paths.
 */
1888  if (theError.code){
1889  setErrorCode(theError.code);
1890  return -1;
1891  }
1892
1893  Uint32 cnt= m_conf_receivers_count;
1894  Uint32 sent= m_sent_receivers_count;
1895
1896  if (cnt > 0)
1897  {
1898  /* New receivers with completed batches available. */
1899  memcpy(m_api_receivers+last, m_conf_receivers, cnt * sizeof(char*));
1900  last+= cnt;
1901  theImpl->incClientStat(Ndb::ScanBatchCount, cnt);
1902  m_conf_receivers_count= 0;
1903  }
1904  else if (retVal == 2 && sent > 0)
1905  {
1906  /* No completed... */
1907  theImpl->incClientStat(Ndb::WaitScanResultCount, 1);
1908
/* Wait result handling: 0 with an unchanged node sequence loops again
 * (`continue` re-tests retVal == 2); -1 becomes retVal -1 (reported as
 * 4008); anything else, including 0 with a changed sequence, becomes
 * retVal -2 (reported as 4028).
 */
1909  int ret_code= poll_guard.wait_scan(3*timeout, nodeId, forceSend);
1910  if (ret_code == 0 && seq == theImpl->getNodeSequence(nodeId)) {
1911  continue;
1912  } else if(ret_code == -1){
1913  retVal= -1;
1914  } else {
1915  idx= last;
1916  retVal= -2; //return_code;
1917  }
1918  }
1919  else if (retVal == 2)
1920  {
/* Nothing confirmed and nothing outstanding: report end of data. */
1924  theError.code= -1; // make sure user gets error if he tries again
1925  return 1;
1926  }
1927
1928  if (retVal == 0)
1929  break;
1930
/* Look for a row in the receivers between idx and last. */
1931  while (idx < last)
1932  {
1933  NdbReceiver* tRec= m_api_receivers[idx];
1934  if (tRec->nextResult())
1935  {
1936  out_row= tRec->get_row();
1937  retVal= 0;
1938  break;
1939  }
1940  idx++;
1941  }
1942  } while(retVal == 2);
1943
1944  m_api_receivers_count= last;
1945  m_current_api_receiver= idx;
1946  } else {
1947  retVal = -3;
1948  }
1949
/* Map retVal to a return value or error code. In the visible code retVal
 * is never assigned 1 or -4; the -4 handling is reached through the err4
 * label.
 */
1950  switch(retVal)
1951  {
1952  case 0:
1953  case 1:
1954  case 2:
1955  return retVal;
1956  case -1:
1957  setErrorCode(4008); // Timeout
1958  break;
1959  case -2:
1960  setErrorCode(4028); // Node fail
1961  break;
1962  case -3: // send_next_scan -> return fail (set error-code self)
1963  if(theError.code == 0)
1964  setErrorCode(4028); // seq changed = Node fail
1965  break;
1966  case -4:
1967 err4:
1968  setErrorCode(theError.code);
1969  break;
1970  }
1971
/* Error exit: flag the transaction as not started and to be released on
 * close.
 */
1972  theNdbCon->theTransactionIsStarted= false;
1973  theNdbCon->theReleaseOnClose= true;
1974  return -1;
1975 }
1976 
1977 int
1978 NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag)
1979 {
1980  if(cnt > 0){
1981  NdbApiSignal tSignal(theNdb->theMyRef);
1982  tSignal.setSignal(GSN_SCAN_NEXTREQ, refToBlock(theNdbCon->m_tcRef));
1983 
1984  Uint32* theData = tSignal.getDataPtrSend();
1985  theData[0] = theNdbCon->theTCConPtr;
1986  theData[1] = stopScanFlag == true ? 1 : 0;
1987  Uint64 transId = theNdbCon->theTransactionId;
1988  theData[2] = (Uint32) transId;
1989  theData[3] = (Uint32) (transId >> 32);
1990 
1994  Uint32 last = m_sent_receivers_count;
1995  Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4);
1996  Uint32 sent = 0;
1997  for(Uint32 i = 0; i<cnt; i++){
1998  NdbReceiver * tRec = m_api_receivers[i];
1999  if((prep_array[sent] = tRec->m_tcPtrI) != RNIL)
2000  {
2001  m_sent_receivers[last+sent] = tRec;
2002  tRec->m_list_index = last+sent;
2003  tRec->prepareSend();
2004  sent++;
2005  }
2006  }
2007  memmove(m_api_receivers, m_api_receivers+cnt,
2008  (theParallelism-cnt) * sizeof(char*));
2009 
2010  int ret = 0;
2011  if(sent)
2012  {
2013  Uint32 nodeId = theNdbCon->theDBnode;
2014  NdbImpl* impl = theNdb->theImpl;
2015  if(cnt > 21){
2016  tSignal.setLength(4);
2017  LinearSectionPtr ptr[3];
2018  ptr[0].p = prep_array;
2019  ptr[0].sz = sent;
2020  ret = impl->sendSignal(&tSignal, nodeId, ptr, 1);
2021  } else {
2022  tSignal.setLength(4+sent);
2023  ret = impl->sendSignal(&tSignal, nodeId);
2024  }
2025  }
2026  m_sent_receivers_count = last + sent;
2027  m_api_receivers_count -= cnt;
2028  m_current_api_receiver = 0;
2029 
2030  return ret;
2031  }
2032  return 0;
2033 }
2034 
/* This function is not meant to be called on a scan operation: it calls
 * abort() unconditionally, and the return statement after it only
 * satisfies the int return type.
 */
2035 int
2036 NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr,
2037  Uint64 TransactionId,
/* NOTE(review): listing line 2038 is absent from this copy. It should hold
 * the remaining parameter(s) and the closing parenthesis - confirm against
 * the original file.
 */
2039 {
2040  abort();
2041  return 0;
2042 }
2043 
2044 int
2045 NdbScanOperation::doSend(int ProcessorId)
2046 {
2047  return 0;
2048 }
2049 
/* Close the scan.
 *
 * If the scan still has a transaction connection (m_transConnection),
 * close_impl() is run under a PollGuard. The operation is then detached
 * from theNdbCon and m_transConnection and, if releaseOp is set, released
 * from the owning transaction's scan-operation list. Finally the scan's
 * own connection (theNdbCon) is closed through Ndb::closeTransaction().
 *
 * forceSend - passed on to close_impl().
 * releaseOp - when true, also release this operation from
 *             m_transConnection's list.
 */
2050 void NdbScanOperation::close(bool forceSend, bool releaseOp)
2051 {
2052  DBUG_ENTER("NdbScanOperation::close");
2053  DBUG_PRINT("enter", ("this: 0x%lx tcon: 0x%lx con: 0x%lx force: %d release: %d",
2054  (long) this,
2055  (long) m_transConnection, (long) theNdbCon,
2056  forceSend, releaseOp));
2057
2058  if(m_transConnection){
2059  if(DEBUG_NEXT_RESULT)
2060  ndbout_c("close() theError.code = %d "
2061  "m_api_receivers_count = %d "
2062  "m_conf_receivers_count = %d "
2063  "m_sent_receivers_count = %d",
2064  theError.code,
2065  m_api_receivers_count,
2066  m_conf_receivers_count,
2067  m_sent_receivers_count);
2068
2069  /*
2070  The PollGuard has an implicit call of unlock_and_signal through the
2071  ~PollGuard method. This method is called implicitly by the compiler
2072  in all places where the object is out of context due to a return,
2073  break, continue or simply end of statement block
2074  */
2075  PollGuard poll_guard(* theNdb->theImpl);
2076  close_impl(forceSend, &poll_guard);
2077  }
2078
2079  // Keep in local variables, as "this" might be destructed below
2080  NdbConnection* tCon = theNdbCon;
2081  NdbConnection* tTransCon = m_transConnection;
2082  Ndb* tNdb = theNdb;
2083
2084  theNdbCon = NULL;
2085  m_transConnection = NULL;
2086
2087  if (tTransCon && releaseOp)
2088  {
/* NOTE(review): listing line 2089 is absent from this copy. `tOp`, used
 * below, is not declared in the visible lines; it presumably refers to
 * this operation - confirm against the original file.
 */
2090
2091  bool ret = true;
/* An operation that is not in WaitResponse state is released from the
 * first/last scan-operation list; otherwise from the executed-scan list.
 */
2092  if (theStatus != WaitResponse)
2093  {
2097  ret =
2098  tTransCon->releaseScanOperation(&tTransCon->m_theFirstScanOperation,
2099  &tTransCon->m_theLastScanOperation,
2100  tOp);
2101  }
2102  else
2103  {
2104  ret = tTransCon->releaseScanOperation(&tTransCon->m_firstExecutedScanOp,
2105  0, tOp);
2106  }
2107  assert(ret);
2108  }
2109
/* NOTE(review): tCon is dereferenced without a NULL check, while
 * release() in this file calls close() when either theNdbCon or
 * m_transConnection is non-null - confirm theNdbCon cannot be NULL here.
 */
2110  tCon->theScanningOp = 0;
2111  tNdb->closeTransaction(tCon);
2112  tNdb->theImpl->decClientStat(Ndb::TransCloseCount, 1); /* Correct stats */
2113  tNdb->theRemainingStartTransactions--;
2114  DBUG_VOID_RETURN;
2115 }
2116 
2117 void
2118 NdbScanOperation::execCLOSE_SCAN_REP(){
2119  m_conf_receivers_count = 0;
2120  m_sent_receivers_count = 0;
2121 }
2122 
2123 void NdbScanOperation::release()
2124 {
2125  if(theNdbCon != 0 || m_transConnection != 0){
2126  close();
2127  }
2128  for(Uint32 i = 0; i<m_allocated_receivers; i++){
2129  m_receivers[i]->release();
2130  }
2131  if (m_scan_buffer)
2132  {
2133  delete[] m_scan_buffer;
2134  m_scan_buffer= NULL;
2135  }
2136 
2137  NdbOperation::release();
2138 
2139  if(theSCAN_TABREQ)
2140  {
2141  theNdb->releaseSignal(theSCAN_TABREQ);
2142  theSCAN_TABREQ = 0;
2143  }
2144 }
2145 
2146 /*
2147  * This method finalises an Old API defined scan
2148  * This is done just prior to scan execution
2149  * The parameters provided via the RecAttr scan interface are
2150  * used to create an NdbRecord based scan
2151  */
2152 int NdbScanOperation::finaliseScanOldApi()
2153 {
2154  /* For a scan we use an NdbRecord structure for this
2155  * table, and add the user-requested values in a similar
2156  * way to the extra GetValues mechanism
2157  */
2158  assert(theOperationType == OpenScanRequest ||
2159  theOperationType == OpenRangeScanRequest);
2160 
2161  /* Prepare ScanOptions structure using saved parameters */
2162  ScanOptions options;
2163  options.optionsPresent=(ScanOptions::SO_SCANFLAGS |
2164  ScanOptions::SO_PARALLEL |
2165  ScanOptions::SO_BATCH);
2166 
2167  options.scan_flags= m_savedScanFlagsOldApi;
2168  options.parallel= m_savedParallelOldApi;
2169  options.batch= m_savedBatchOldApi;
2170 
2171  if (theDistrKeyIndicator_ == 1)
2172  {
2173  /* User has defined a partition id specifically */
2174  options.optionsPresent |= ScanOptions::SO_PARTITION_ID;
2175  options.partitionId= theDistributionKey;
2176  }
2177 
2178  /* customData or interpretedCode should
2179  * already be set in the operation members - no need
2180  * to pass in as ScanOptions
2181  */
2182 
2183  /* Next, call scanTable, passing in some of the
2184  * parameters we saved
2185  * It will look after building the correct signals
2186  */
2187  int result= -1;
2188 
2189  const unsigned char* emptyMask=
2190  (const unsigned char*) NdbDictionaryImpl::m_emptyMask;
2191 
2192  if (theOperationType == OpenScanRequest)
2193  /* Create table scan operation with an empty
2194  * mask for NdbRecord values
2195  */
2196  result= scanTableImpl(m_currentTable->m_ndbrecord,
2197  m_savedLockModeOldApi,
2198  emptyMask,
2199  &options,
2200  sizeof(ScanOptions));
2201  else
2202  {
2203  assert(theOperationType == OpenRangeScanRequest);
2204  NdbIndexScanOperation *isop =
2205  static_cast<NdbIndexScanOperation*>(this);
2206 
2207  if (isop->currentRangeOldApi != NULL)
2208  {
2209  /* Add current bound to bound list */
2210  if (isop->buildIndexBoundOldApi(0) != 0)
2211  return -1;
2212  }
2213 
2214  /* If this is an ordered scan, then we need
2215  * the pk columns in the mask, otherwise we
2216  * don't
2217  */
2218  const unsigned char * resultMask=
2219  ((m_savedScanFlagsOldApi & (SF_OrderBy | SF_OrderByFull)) !=0) ?
2220  m_accessTable->m_pkMask :
2221  emptyMask;
2222 
2223  result= isop->scanIndexImpl(m_accessTable->m_ndbrecord,
2224  m_currentTable->m_ndbrecord,
2225  m_savedLockModeOldApi,
2226  resultMask,
2227  NULL, // All bounds added below
2228  &options,
2229  sizeof(ScanOptions));
2230 
2231  /* Add any bounds that were specified */
2232  if (isop->firstRangeOldApi != NULL)
2233  {
2234  NdbRecAttr* bound= isop->firstRangeOldApi;
2235  while (bound != NULL)
2236  {
2237  if (isop->setBound( m_accessTable->m_ndbrecord,
2238  *isop->getIndexBoundFromRecAttr(bound) ) != 0)
2239  return -1;
2240 
2241  bound= bound->next();
2242  }
2243  }
2244 
2245  isop->releaseIndexBoundsOldApi();
2246  }
2247 
2248  /* Free any scan-owned ScanFilter generated InterpretedCode
2249  * object
2250  */
2251  freeInterpretedCodeOldApi();
2252 
2253  return result;
2254 }
2255 
2256 /***************************************************************************
2257 int prepareSendScan(Uint32 aTC_ConnectPtr,
2258  Uint64 aTransactionId)
2259 
2260 Return Value: Return 0 : preparation of send was succesful.
2261  Return -1: In all other case.
2262 Parameters: aTC_ConnectPtr: the Connect pointer to TC.
2263  aTransactionId: the Transaction identity of the transaction.
2264 Remark: Puts the the final data into ATTRINFO signal(s) after this
2265  we know the how many signal to send and their sizes
2266 ***************************************************************************/
2267 int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
2268  Uint64 aTransactionId){
2269  if (theInterpretIndicator != 1 ||
2270  (theOperationType != OpenScanRequest &&
2271  theOperationType != OpenRangeScanRequest)) {
2272  setErrorCodeAbort(4005);
2273  return -1;
2274  }
2275 
2276  theErrorLine = 0;
2277 
2278  /* All scans use NdbRecord at this stage */
2279  assert(m_attribute_record);
2280 
2284  theReceiver.prepareSend();
2285  bool keyInfo = m_keyInfo;
2286  Uint32 key_size= keyInfo ? m_attribute_record->m_keyLenInWords : 0;
2287 
2292  ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
2293  Uint32 batch_size = req->first_batch_size; // User specified
2294  Uint32 batch_byte_size, first_batch_size;
2295  theReceiver.calculate_batch_size(key_size,
2296  theParallelism,
2297  batch_size,
2298  batch_byte_size,
2299  first_batch_size,
2300  m_attribute_record);
2301  ScanTabReq::setScanBatch(req->requestInfo, batch_size);
2302  req->batch_byte_size= batch_byte_size;
2303  req->first_batch_size= first_batch_size;
2304 
2310  Uint32 reqInfo = req->requestInfo;
2311  ScanTabReq::setKeyinfoFlag(reqInfo, keyInfo);
2312  ScanTabReq::setNoDiskFlag(reqInfo, (m_flags & OF_NO_DISK) != 0);
2313 
2314  /* Set distribution key info if required */
2315  ScanTabReq::setDistributionKeyFlag(reqInfo, theDistrKeyIndicator_);
2316  req->requestInfo = reqInfo;
2317  req->distributionKey= theDistributionKey;
2318  theSCAN_TABREQ->setLength(ScanTabReq::StaticLength + theDistrKeyIndicator_);
2319 
2320  /* All scans use NdbRecord internally */
2321  assert(theStatus == UseNdbRecord);
2322 
2323  assert(theParallelism > 0);
2324  Uint32 rowsize= NdbReceiver::ndbrecord_rowsize(m_attribute_record,
2325  theReceiver.theFirstRecAttr,
2326  key_size,
2327  m_read_range_no);
2328  Uint32 bufsize= batch_size*rowsize;
2329  char *buf= new char[bufsize*theParallelism];
2330  if (!buf)
2331  {
2332  setErrorCodeAbort(4000); // "Memory allocation error"
2333  return -1;
2334  }
2335  assert(!m_scan_buffer);
2336  m_scan_buffer= buf;
2337 
2338  for (Uint32 i = 0; i<theParallelism; i++)
2339  {
2340  m_receivers[i]->do_setup_ndbrecord(m_attribute_record, batch_size,
2341  key_size, m_read_range_no,
2342  rowsize, buf);
2343  buf+= bufsize;
2344  }
2345 
2346  /* Update ATTRINFO section sizes info */
2347  if (doSendSetAISectionSizes() == -1)
2348  return -1;
2349 
2350  return 0;
2351 }
2352 
2353 int
2354 NdbScanOperation::doSendSetAISectionSizes()
2355 {
2356  // Set the scan AI section sizes.
2357  Uint32* sectionSizesPtr= theFirstATTRINFO->getDataPtrSend();
2358  *sectionSizesPtr++ = theInitialReadSize;
2359  *sectionSizesPtr++ = theInterpretedSize;
2360  *sectionSizesPtr++ = 0; // Update size
2361  *sectionSizesPtr++ = 0; // Final read size
2362  *sectionSizesPtr = theSubroutineSize;
2363 
2364  return 0;
2365 }
2366 
2367 
2368 /*****************************************************************************
2369 int doSendScan()
2370 
2371 Return Value: Return >0 : send was succesful, returns number of signals sent
2372  Return -1: In all other case.
2373 Parameters: aProcessorId: Receiving processor node
2374 Remark: Sends the ATTRINFO signal(s)
2375 *****************************************************************************/
2376 int
2377 NdbScanOperation::doSendScan(int aProcessorId)
2378 {
2379  if (theInterpretIndicator != 1 ||
2380  (theOperationType != OpenScanRequest &&
2381  theOperationType != OpenRangeScanRequest)) {
2382  setErrorCodeAbort(4005);
2383  return -1;
2384  }
2385 
2386  assert(theSCAN_TABREQ != NULL);
2387 
2388  /* Check that we don't have too much AttrInfo */
2389  if (unlikely(theTotalCurrAI_Len > ScanTabReq::MaxTotalAttrInfo)) {
2390  setErrorCode(4257);
2391  return -1;
2392  }
2393 
2394  /* SCANTABREQ always has 2 mandatory sections and an optional
2395  * third section
2396  * Section 0 : List of receiver Ids NDBAPI has allocated
2397  * for the scan
2398  * Section 1 : ATTRINFO section
2399  * Section 2 : Optional KEYINFO section
2400  */
2401  GenericSectionPtr secs[3];
2402  LinearSectionIterator receiverIdIterator(m_prepared_receivers,
2403  theParallelism);
2404  SignalSectionIterator attrInfoIter(theFirstATTRINFO);
2405  SignalSectionIterator keyInfoIter(theSCAN_TABREQ->next());
2406 
2407  secs[0].sectionIter= &receiverIdIterator;
2408  secs[0].sz= theParallelism;
2409 
2410  secs[1].sectionIter= &attrInfoIter;
2411  secs[1].sz= theTotalCurrAI_Len;
2412 
2413  Uint32 numSections= 2;
2414 
2415  if (theTupKeyLen)
2416  {
2417  secs[2].sectionIter= &keyInfoIter;
2418  secs[2].sz= theTupKeyLen;
2419  numSections= 3;
2420  }
2421 
2422  NdbImpl* impl = theNdb->theImpl;
2423  {
2424  const Ndb::ClientStatistics counterIndex = (numSections == 3)?
2425  Ndb::RangeScanCount :
2426  Ndb::TableScanCount;
2427  impl->incClientStat(counterIndex, 1);
2428  if (getPruned())
2429  impl->incClientStat(Ndb::PrunedScanCount, 1);
2430  }
2431  Uint32 tcNodeVersion = impl->getNodeNdbVersion(aProcessorId);
2432  bool forceShort = impl->forceShortRequests;
2433  bool sendLong = ( tcNodeVersion >= NDBD_LONG_SCANTABREQ) &&
2434  ! forceShort;
2435 
2436  if (sendLong)
2437  {
2438  /* Send Fragmented as SCAN_TABREQ can be large */
2439  if (impl->sendFragmentedSignal(theSCAN_TABREQ,
2440  aProcessorId,
2441  &secs[0],
2442  numSections) == -1)
2443  {
2444  setErrorCode(4002);
2445  return -1;
2446  }
2447  }
2448  else
2449  {
2450  /* Send a 'short' SCANTABREQ - e.g. long SCANTABREQ
2451  * with signalIds as first section, followed by
2452  * AttrInfo and KeyInfo trains
2453  */
2454  Uint32 attrInfoLen = secs[1].sz;
2455  Uint32 keyInfoLen = (numSections == 3)? secs[2].sz : 0;
2456 
2457  ScanTabReq* scanTabReq = (ScanTabReq*) theSCAN_TABREQ->getDataPtrSend();
2458  Uint32 connectPtr = scanTabReq->apiConnectPtr;
2459  Uint32 transId1 = scanTabReq->transId1;
2460  Uint32 transId2 = scanTabReq->transId2;
2461 
2462  /* Modify ScanTabReq to carry length of keyinfo and attrinfo */
2463  scanTabReq->attrLenKeyLen = (keyInfoLen << 16) | attrInfoLen;
2464 
2465  /* Send with receiver Ids as first and only section */
2466  if (impl->sendSignal(theSCAN_TABREQ, aProcessorId, &secs[0], 1) == -1)
2467  {
2468  setErrorCode(4002);
2469  return -1;
2470  }
2471 
2472  if (keyInfoLen)
2473  {
2474  GSIReader keyInfoReader(secs[2].sectionIter);
2475  theSCAN_TABREQ->theVerId_signalNumber = GSN_KEYINFO;
2476  KeyInfo* keyInfo = (KeyInfo*) theSCAN_TABREQ->getDataPtrSend();
2477  keyInfo->connectPtr = connectPtr;
2478  keyInfo->transId[0] = transId1;
2479  keyInfo->transId[1] = transId2;
2480 
2481  while(keyInfoLen)
2482  {
2483  Uint32 dataWords = MIN(keyInfoLen, KeyInfo::DataLength);
2484  keyInfoReader.copyNWords(&keyInfo->keyData[0], dataWords);
2485  theSCAN_TABREQ->setLength(KeyInfo::HeaderLength + dataWords);
2486 
2487  if (impl->sendSignal(theSCAN_TABREQ, aProcessorId) == -1)
2488  {
2489  setErrorCode(4002);
2490  return -1;
2491  }
2492  keyInfoLen -= dataWords;
2493  }
2494  }
2495 
2496  GSIReader attrInfoReader(secs[1].sectionIter);
2497  theSCAN_TABREQ->theVerId_signalNumber = GSN_ATTRINFO;
2498  AttrInfo* attrInfo = (AttrInfo*) theSCAN_TABREQ->getDataPtrSend();
2499  attrInfo->connectPtr = connectPtr;
2500  attrInfo->transId[0] = transId1;
2501  attrInfo->transId[1] = transId2;
2502 
2503  while(attrInfoLen)
2504  {
2505  Uint32 dataWords = MIN(attrInfoLen, AttrInfo::DataLength);
2506  attrInfoReader.copyNWords(&attrInfo->attrData[0], dataWords);
2507  theSCAN_TABREQ->setLength(AttrInfo::HeaderLength + dataWords);
2508 
2509  if (impl->sendSignal(theSCAN_TABREQ, aProcessorId) == -1)
2510  {
2511  setErrorCode(4002);
2512  return -1;
2513  }
2514  attrInfoLen -= dataWords;
2515  }
2516  }
2517 
2518  theStatus = WaitResponse;
2519  return 1; // 1 signal sent
2520 }//NdbOperation::doSendScan()
2521 
2522 
2523 /* This method retrieves a pointer to the keyinfo for the current
2524  * row - it is used when creating a scan takeover operation
2525  */
2526 int
2527 NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, Uint32 & size)
2528 {
2529  NdbRecAttr * tRecAttr = m_curr_row;
2530  if(tRecAttr)
2531  {
2532  const Uint32 * src = (Uint32*)tRecAttr->aRef();
2533 
2534  assert(tRecAttr->get_size_in_bytes() > 0);
2535  assert(tRecAttr->get_size_in_bytes() < 65536);
2536  const Uint32 len = (tRecAttr->get_size_in_bytes() + 3)/4-1;
2537 
2538  assert(size >= len);
2539  memcpy(data, src, 4*len);
2540  size = len;
2541  return 0;
2542  }
2543  return -1;
2544 }
2545 
2546 /*****************************************************************************
2547  * NdbOperation* takeOverScanOp(NdbTransaction* updateTrans);
2548  *
2549  * Parameters: The update transactions NdbTransaction pointer.
2550  * Return Value: A reference to the transferred operation object
2551  * or NULL if no success.
2552  * Remark: Take over the scanning transactions NdbOperation
2553  * object for a tuple to an update transaction,
2554  * which is the last operation read in nextScanResult()
2555  * (theNdbCon->thePreviousScanRec)
2556  *
2557  * FUTURE IMPLEMENTATION: (This note was moved from header file.)
2558  * In the future, it will even be possible to transfer
2559  * to a NdbTransaction on another Ndb-object.
2560  * In this case the receiving NdbTransaction-object must call
2561  * a method receiveOpFromScan to actually receive the information.
2562  * This means that the updating transactions can be placed
2563  * in separate threads and thus increasing the parallelism during
2564  * the scan process.
2565  ****************************************************************************/
2566 NdbOperation*
2567 NdbScanOperation::takeOverScanOp(OperationType opType, NdbTransaction* pTrans)
2568 {
2569  if (!m_scanUsingOldApi)
2570  {
2571  setErrorCodeAbort(4284);
2572  return NULL;
2573  }
2574 
2575  if (!m_keyInfo)
2576  {
2577  // Cannot take over lock if no keyinfo was requested
2578  setErrorCodeAbort(4604);
2579  return NULL;
2580  }
2581 
2582  /*
2583  * Get the Keyinfo from the NdbRecord result row
2584  */
2585  Uint32 infoword= 0;
2586  Uint32 len= 0;
2587  const char *src= NULL;
2588 
2589  Uint32 idx= m_current_api_receiver;
2590  if (idx >= m_api_receivers_count)
2591  return NULL;
2592  const NdbReceiver *receiver= m_api_receivers[m_current_api_receiver];
2593 
2594  /* Get this row's KeyInfo data */
2595  int res= receiver->get_keyinfo20(infoword, len, src);
2596  if (res == -1)
2597  return NULL;
2598 
2599  NdbOperation * newOp = pTrans->getNdbOperation(m_currentTable);
2600  if (newOp == NULL){
2601  return NULL;
2602  }
2603  pTrans->theSimpleState = 0;
2604 
2605  assert(len > 0);
2606  assert(len < 16384);
2607 
2608  newOp->theTupKeyLen = len;
2609  newOp->theOperationType = opType;
2610  newOp->m_abortOption = AbortOnError;
2611  switch (opType) {
2612  case (ReadRequest):
2613  newOp->theLockMode = theLockMode;
2614  // Fall through
2615  case (DeleteRequest):
2616  newOp->theStatus = GetValue;
2617  break;
2618  default:
2619  newOp->theStatus = SetValue;
2620  }
2621  const Uint32 tScanInfo = infoword & 0x3FFFF;
2622  const Uint32 tTakeOverFragment = infoword >> 20;
2623  {
2624  UintR scanInfo = 0;
2625  TcKeyReq::setTakeOverScanFlag(scanInfo, 1);
2626  TcKeyReq::setTakeOverScanFragment(scanInfo, tTakeOverFragment);
2627  TcKeyReq::setTakeOverScanInfo(scanInfo, tScanInfo);
2628  newOp->theScanInfo = scanInfo;
2629  newOp->theDistrKeyIndicator_ = 1;
2630  newOp->theDistributionKey = tTakeOverFragment;
2631  }
2632 
2633  // Copy the first 8 words of key info from KEYINF20 into TCKEYREQ
2634  TcKeyReq * tcKeyReq = CAST_PTR(TcKeyReq,newOp->theTCREQ->getDataPtrSend());
2635  Uint32 i = MIN(TcKeyReq::MaxKeyInfo, len);
2636  memcpy(tcKeyReq->keyInfo, src, 4*i);
2637  src += i * 4;
2638 
2639  if(i < len){
2640  NdbApiSignal* tSignal = theNdb->getSignal();
2641  newOp->theTCREQ->next(tSignal);
2642 
2643  Uint32 left = len - i;
2644  while(tSignal && left > KeyInfo::DataLength){
2645  tSignal->setSignal(GSN_KEYINFO, refToBlock(pTrans->m_tcRef));
2646  tSignal->setLength(KeyInfo::MaxSignalLength);
2647  KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
2648  memcpy(keyInfo->keyData, src, 4 * KeyInfo::DataLength);
2649  src += 4 * KeyInfo::DataLength;
2650  left -= KeyInfo::DataLength;
2651 
2652  tSignal->next(theNdb->getSignal());
2653  tSignal = tSignal->next();
2654  newOp->theLastKEYINFO = tSignal;
2655  }
2656 
2657  if(tSignal && left > 0){
2658  tSignal->setSignal(GSN_KEYINFO, refToBlock(pTrans->m_tcRef));
2659  tSignal->setLength(KeyInfo::HeaderLength + left);
2660  newOp->theLastKEYINFO = tSignal;
2661  KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
2662  memcpy(keyInfo->keyData, src, 4 * left);
2663  }
2664  }
2665  /* create blob handles automatically for a delete - other ops must
2666  * create manually
2667  */
2668  if (opType == DeleteRequest && m_currentTable->m_noOfBlobs != 0) {
2669  for (unsigned i = 0; i < m_currentTable->m_columns.size(); i++) {
2670  NdbColumnImpl* c = m_currentTable->m_columns[i];
2671  assert(c != 0);
2672  if (c->getBlobType()) {
2673  if (newOp->getBlobHandle(pTrans, c) == NULL)
2674  return NULL;
2675  }
2676  }
2677  }
2678 
2679  return newOp;
2680 }
2681 
2682 NdbOperation*
2683 NdbScanOperation::takeOverScanOpNdbRecord(OperationType opType,
2684  NdbTransaction* pTrans,
2685  const NdbRecord *record,
2686  char *row,
2687  const unsigned char *mask,
2688  const NdbOperation::OperationOptions *opts,
2689  Uint32 sizeOfOptions)
2690 {
2691  int res;
2692 
2693  if (!m_attribute_record)
2694  {
2695  setErrorCodeAbort(4284);
2696  return NULL;
2697  }
2698  if (!record)
2699  {
2700  setErrorCodeAbort(4285);
2701  return NULL;
2702  }
2703  if (!m_keyInfo)
2704  {
2705  // Cannot take over lock if no keyinfo was requested
2706  setErrorCodeAbort(4604);
2707  return NULL;
2708  }
2709  if (record->flags & NdbRecord::RecIsIndex)
2710  {
2711  /* result_record must be a base table ndbrecord, not an index ndbrecord */
2712  setErrorCodeAbort(4340);
2713  return NULL;
2714  }
2715  if (m_blob_lock_upgraded)
2716  {
2717  /* This was really a CommittedRead scan, which does not support
2718  * lock takeover
2719  */
2720  /* takeOverScanOp, to take over a scanned row one must explicitly
2721  * request keyinfo on readTuples call
2722  */
2723  setErrorCodeAbort(4604);
2724  return NULL;
2725  }
2726 
2727  NdbOperation *op= pTrans->getNdbOperation(record->table, NULL, true);
2728  if (!op)
2729  return NULL;
2730 
2731  pTrans->theSimpleState= 0;
2732  op->theStatus= NdbOperation::UseNdbRecord;
2733  op->theOperationType= opType;
2734  op->m_abortOption= AbortOnError;
2735  op->m_key_record= NULL; // This means m_key_row has KEYINFO20 data
2736  op->m_attribute_record= record;
2737  /*
2738  The m_key_row pointer is only valid until next call of
2739  nextResult(fetchAllowed=true). But that is ok, since the lock is also
2740  only valid until that time, so the application must execute() the new
2741  operation before then.
2742  */
2743 
2744  /* Now find the current row, and extract keyinfo. */
2745  Uint32 idx= m_current_api_receiver;
2746  if (idx >= m_api_receivers_count)
2747  return NULL;
2748  const NdbReceiver *receiver= m_api_receivers[m_current_api_receiver];
2749  Uint32 infoword;
2750  res= receiver->get_keyinfo20(infoword, op->m_keyinfo_length, op->m_key_row);
2751  if (res==-1)
2752  return NULL;
2753  Uint32 scanInfo= 0;
2754  TcKeyReq::setTakeOverScanFlag(scanInfo, 1);
2755  Uint32 fragment= infoword >> 20;
2756  TcKeyReq::setTakeOverScanFragment(scanInfo, fragment);
2757  TcKeyReq::setTakeOverScanInfo(scanInfo, infoword & 0x3FFFF);
2758  op->theScanInfo= scanInfo;
2759  op->theDistrKeyIndicator_= 1;
2760  op->theDistributionKey= fragment;
2761 
2762  op->m_attribute_row= row;
2763  AttributeMask readMask;
2764  record->copyMask(readMask.rep.data, mask);
2765 
2766  if (opType == ReadRequest)
2767  {
2768  op->theLockMode= theLockMode;
2769  /*
2770  * Apart from taking over the row lock, we also support reading again,
2771  * though typical usage will probably use an empty mask to read nothing.
2772  */
2773  op->theReceiver.getValues(record, row);
2774  }
2775  else if (opType == DeleteRequest && row != NULL)
2776  {
2777  /* Delete with a 'pre-read' - prepare the Receiver */
2778  op->theReceiver.getValues(record, row);
2779  }
2780 
2781 
2782  /* Handle any OperationOptions */
2783  if (opts != NULL)
2784  {
2785  /* Delegate to static method in NdbOperation */
2786  Uint32 result = NdbOperation::handleOperationOptions (opType,
2787  opts,
2788  sizeOfOptions,
2789  op);
2790  if (result != 0)
2791  {
2792  setErrorCodeAbort(result);
2793  return NULL;
2794  }
2795  }
2796 
2797 
2798  /* Setup Blob handles... */
2799  switch (opType)
2800  {
2801  case ReadRequest:
2802  case UpdateRequest:
2803  if (unlikely(record->flags & NdbRecord::RecHasBlob))
2804  {
2805  if (op->getBlobHandlesNdbRecord(pTrans, readMask.rep.data) == -1)
2806  return NULL;
2807  }
2808 
2809  break;
2810 
2811  case DeleteRequest:
2812  /* Create blob handles if required, to properly delete all blob parts
2813  * If a pre-delete-read was requested, check that it does not ask for
2814  * Blob columns to be read.
2815  */
2816  if (unlikely(record->flags & NdbRecord::RecTableHasBlob))
2817  {
2818  if (op->getBlobHandlesNdbRecordDelete(pTrans,
2819  row != NULL,
2820  readMask.rep.data) == -1)
2821  return NULL;
2822  }
2823  break;
2824  default:
2825  assert(false);
2826  return NULL;
2827  }
2828 
2829  /* Now prepare the signals to be sent...
2830  */
2831  int returnCode=op->buildSignalsNdbRecord(pTrans->theTCConPtr,
2832  pTrans->theTransactionId,
2833  readMask.rep.data);
2834 
2835  if (returnCode)
2836  {
2837  // buildSignalsNdbRecord should have set the error status
2838  // So we can return NULL
2839  return NULL;
2840  }
2841 
2842  return op;
2843 }
2844 
2845 NdbBlob*
2846 NdbScanOperation::getBlobHandle(const char* anAttrName)
2847 {
2848  const NdbColumnImpl* col= m_currentTable->getColumn(anAttrName);
2849 
2850  if (col != NULL)
2851  {
2852  /* We need the row KeyInfo for Blobs
2853  * Old Api scans have saved flags at this point
2854  */
2855  if (m_scanUsingOldApi)
2856  m_savedScanFlagsOldApi|= SF_KeyInfo;
2857  else
2858  m_keyInfo= 1;
2859 
2860  return NdbOperation::getBlobHandle(m_transConnection, col);
2861  }
2862  else
2863  {
2864  setErrorCode(4004);
2865  return NULL;
2866  }
2867 }
2868 
2869 NdbBlob*
2870 NdbScanOperation::getBlobHandle(Uint32 anAttrId)
2871 {
2872  const NdbColumnImpl* col= m_currentTable->getColumn(anAttrId);
2873 
2874  if (col != NULL)
2875  {
2876  /* We need the row KeyInfo for Blobs
2877  * Old Api scans have saved flags at this point
2878  */
2879  if (m_scanUsingOldApi)
2880  m_savedScanFlagsOldApi|= SF_KeyInfo;
2881  else
2882  m_keyInfo= 1;
2883 
2884  return NdbOperation::getBlobHandle(m_transConnection, col);
2885  }
2886  else
2887  {
2888  setErrorCode(4004);
2889  return NULL;
2890  }
2891 }
2892 
2899 NdbRecAttr*
2901  char* aValue)
2902 {
2903  DBUG_ENTER("NdbScanOperation::getValue_NdbRecord_scan");
2904  int res;
2905  NdbRecAttr *ra;
2906  DBUG_PRINT("info", ("Column: %u", attrInfo->m_attrId));
2907 
2908  if (attrInfo->m_storageType == NDB_STORAGETYPE_DISK)
2909  {
2910  m_flags &= ~Uint8(OF_NO_DISK);
2911  }
2912 
2913  res= insertATTRINFOHdr_NdbRecord(attrInfo->m_attrId, 0);
2914  if (res==-1)
2915  DBUG_RETURN(NULL);
2916 
2917  theInitialReadSize= theTotalCurrAI_Len - AttrInfo::SectionSizeInfoLength;
2918  ra= theReceiver.getValue(attrInfo, aValue);
2919  if (!ra)
2920  {
2921  setErrorCodeAbort(4000);
2922  DBUG_RETURN(NULL);
2923  }
2924  theErrorLine++;
2925  DBUG_RETURN(ra);
2926 }
2927 
2936 NdbRecAttr*
2938  char* aValue)
2939 {
2940  NdbRecAttr *recAttr= NULL;
2941 
2942  /* Get a RecAttr object, which is linked in to the Receiver's
2943  * RecAttr linked list, and return to caller
2944  */
2945  if (attrInfo != NULL)
2946  {
2947  if (attrInfo->m_storageType == NDB_STORAGETYPE_DISK)
2948  {
2949  m_flags &= ~Uint8(OF_NO_DISK);
2950  }
2951 
2952  recAttr = theReceiver.getValue(attrInfo, aValue);
2953 
2954  if (recAttr != NULL)
2955  theErrorLine++;
2956  else {
2957  /* MEMORY ALLOCATION ERROR */
2958  setErrorCodeAbort(4000);
2959  }
2960  }
2961  else {
2962  /* Attribute name or id not found in the table */
2963  setErrorCodeAbort(4004);
2964  }
2965 
2966  return recAttr;
2967 }
2968 
2969 NdbRecAttr*
2970 NdbScanOperation::getValue_impl(const NdbColumnImpl *attrInfo, char *aValue)
2971 {
2972  if (theStatus == UseNdbRecord)
2973  return getValue_NdbRecord_scan(attrInfo, aValue);
2974  else
2975  return getValue_NdbRecAttr_scan(attrInfo, aValue);
2976 }
2977 
2978 NdbIndexScanOperation::NdbIndexScanOperation(Ndb* aNdb)
2979  : NdbScanOperation(aNdb, NdbOperation::OrderedIndexScan)
2980 {
2981  firstRangeOldApi= NULL;
2982  lastRangeOldApi= NULL;
2983  currentRangeOldApi= NULL;
2984 
2985 }
2986 
2987 NdbIndexScanOperation::~NdbIndexScanOperation(){
2988 }
2989 
2990 int
2991 NdbIndexScanOperation::setBound(const char* anAttrName, int type,
2992  const void* aValue)
2993 {
2994  return setBound(m_accessTable->getColumn(anAttrName), type, aValue);
2995 }
2996 
2997 int
2998 NdbIndexScanOperation::setBound(Uint32 anAttrId, int type,
2999  const void* aValue)
3000 {
3001  return setBound(m_accessTable->getColumn(anAttrId), type, aValue);
3002 }
3003 
3004 int
3005 NdbIndexScanOperation::equal_impl(const NdbColumnImpl* anAttrObject,
3006  const char* aValue)
3007 {
3008  return setBound(anAttrObject, BoundEQ, aValue);
3009 }
3010 
3011 NdbRecAttr*
3012 NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo,
3013  char* aValue){
3014  /* Defer to ScanOperation implementation */
3015  // TODO : IndexScans always fetch PK columns via their key NdbRecord
3016  // If the user also requests them, we should avoid fetching them
3017  // twice.
3018  return NdbScanOperation::getValue_impl(attrInfo, aValue);
3019 }
3020 
3021 
3022 /* Helper for setBound called via the old Api.
3023  * Key bound information is stored in the operation for later
3024  * processing using the normal NdbRecord setBound interface.
3025  */
3026 int
3027 NdbIndexScanOperation::setBoundHelperOldApi(OldApiBoundInfo& boundInfo,
3028  Uint32 maxKeyRecordBytes,
3029  Uint32 index_attrId,
3030  Uint32 valueLen,
3031  bool inclusive,
3032  Uint32 byteOffset,
3033  Uint32 nullbit_byte_offset,
3034  Uint32 nullbit_bit_in_byte,
3035  const void *aValue)
3036 {
3037  Uint32 presentBitMask= (1 << (index_attrId & 0x1f));
3038 
3039  if ((boundInfo.keysPresentBitmap & presentBitMask) != 0)
3040  {
3041  /* setBound() called twice for same key */
3042  setErrorCodeAbort(4522);
3043  return -1;
3044  }
3045 
3046  /* Set bit in mask for key column presence */
3047  boundInfo.keysPresentBitmap |= presentBitMask;
3048 
3049  if ((index_attrId + 1) > boundInfo.highestKey)
3050  {
3051  // New highest key, check previous keys
3052  // are non-strict
3053  if (boundInfo.highestSoFarIsStrict)
3054  {
3055  /* Invalid set of range scan bounds */
3056  setErrorCodeAbort(4259);
3057  return -1;
3058  }
3059  boundInfo.highestKey= (index_attrId + 1);
3060  boundInfo.highestSoFarIsStrict= !inclusive;
3061  }
3062  else
3063  {
3064  /* Not highest, key, better not be strict */
3065  if (!inclusive)
3066  {
3067  /* Invalid set of range scan bounds */
3068  setErrorCodeAbort(4259);
3069  return -1;
3070  }
3071  }
3072 
3073  /* Copy data into correct part of RecAttr */
3074  assert(byteOffset + valueLen <= maxKeyRecordBytes);
3075 
3076  memcpy(boundInfo.key + byteOffset,
3077  aValue,
3078  valueLen);
3079 
3080  /* Set Null bit */
3081  bool nullBit=(aValue == NULL);
3082 
3083  boundInfo.key[nullbit_byte_offset]|=
3084  (nullBit) << nullbit_bit_in_byte;
3085 
3086  return 0;
3087 }
3088 
3089 /*
3090  * Define bound on index column in range scan.
3091  */
3092 int
3094  int type, const void* aValue)
3095 {
3096  if (!tAttrInfo)
3097  {
3098  setErrorCodeAbort(4318); // Invalid attribute
3099  return -1;
3100  }
3101  if (theOperationType == OpenRangeScanRequest &&
3102  (0 <= type && type <= 4))
3103  {
3104  const NdbRecord *key_record= m_accessTable->m_ndbrecord;
3105  const Uint32 maxKeyRecordBytes= key_record->m_row_size;
3106 
3107  Uint32 valueLen = 0;
3108  if (aValue != NULL)
3109  if (! tAttrInfo->get_var_length(aValue, valueLen)) {
3110  /* Length parameter in equal/setValue is incorrect */
3111  setErrorCodeAbort(4209);
3112  return -1;
3113  }
3114 
3115  /* Get details of column from NdbRecord */
3116  Uint32 byteOffset= 0;
3117 
3118  /* Get the Attr struct from the key NdbRecord for this index Attr */
3119  Uint32 attrId= tAttrInfo->m_attrId;
3120 
3121  if (attrId >= key_record->key_index_length)
3122  {
3123  /* Attempt to set bound on non key column */
3124  setErrorCodeAbort(4535);
3125  return -1;
3126  }
3127  Uint32 columnNum= key_record->key_indexes[ attrId ];
3128 
3129  if (columnNum >= key_record->noOfColumns)
3130  {
3131  /* Internal error in NdbApi */
3132  setErrorCodeAbort(4005);
3133  return -1;
3134  }
3135 
3136  NdbRecord::Attr attr= key_record->columns[ columnNum ];
3137 
3138  byteOffset= attr.offset;
3139 
3140  bool inclusive= ! ((type == BoundLT) || (type == BoundGT));
3141 
3142  if (currentRangeOldApi == NULL)
3143  {
3144  /* Current bound is undefined, allocate space for definition */
3145  NdbRecAttr* boundSpace= theNdb->getRecAttr();
3146  if (boundSpace == NULL)
3147  {
3148  /* Memory allocation error */
3149  setErrorCodeAbort(4000);
3150  return -1;
3151  }
3152  if (boundSpace->setup(sizeof(OldApiScanRangeDefinition) +
3153  (2 * maxKeyRecordBytes) - 1, NULL) != 0)
3154  {
3155  theNdb->releaseRecAttr(boundSpace);
3156  /* Memory allocation error */
3157  setErrorCodeAbort(4000);
3158  return -1;
3159  }
3160 
3161  /* Initialise bounds definition info */
3162  OldApiScanRangeDefinition* boundsDef=
3163  (OldApiScanRangeDefinition*) boundSpace->aRef();
3164 
3165  boundsDef->oldBound.lowBound.highestKey = 0;
3166  boundsDef->oldBound.lowBound.highestSoFarIsStrict = false;
3167  /* Should be STATIC_ASSERT */
3168  assert(NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY == 32);
3169  boundsDef->oldBound.lowBound.keysPresentBitmap = 0;
3170 
3171  boundsDef->oldBound.highBound= boundsDef->oldBound.lowBound;
3172  boundsDef->oldBound.lowBound.key= &boundsDef->space[ 0 ];
3173  boundsDef->oldBound.highBound.key= &boundsDef->space[ maxKeyRecordBytes ];
3174 
3175  currentRangeOldApi= boundSpace;
3176  }
3177 
3178  OldApiScanRangeDefinition* bounds=
3179  (OldApiScanRangeDefinition*) currentRangeOldApi->aRef();
3180 
3181 
3182  /* Add to lower bound if required */
3183  if (type == BoundEQ ||
3184  type == BoundLE ||
3185  type == BoundLT )
3186  {
3187  if (setBoundHelperOldApi(bounds->oldBound.lowBound,
3188  maxKeyRecordBytes,
3189  tAttrInfo->m_attrId,
3190  valueLen,
3191  inclusive,
3192  byteOffset,
3193  attr.nullbit_byte_offset,
3194  attr.nullbit_bit_in_byte,
3195  aValue) != 0)
3196  return -1;
3197  }
3198 
3199  /* Add to upper bound if required */
3200  if (type == BoundEQ ||
3201  type == BoundGE ||
3202  type == BoundGT)
3203  {
3204  if (setBoundHelperOldApi(bounds->oldBound.highBound,
3205  maxKeyRecordBytes,
3206  tAttrInfo->m_attrId,
3207  valueLen,
3208  inclusive,
3209  byteOffset,
3210  attr.nullbit_byte_offset,
3211  attr.nullbit_bit_in_byte,
3212  aValue) != 0)
3213  return -1;
3214  }
3215  return 0;
3216  }
3217  else {
3218  /* Can only call setBound/equal() for an NdbIndexScanOperation */
3219  setErrorCodeAbort(4514);
3220  return -1;
3221  }
3222 }
3223 
3224 
3225 /* Method called just prior to scan execution to initialise
3226  * the passed in IndexBound for the scan using the information
3227  * stored by the old API's setBound() call.
3228  * Return codes
3229  * 0 == bound present and built
3230  * 1 == bound not present
3231  * -1 == error
3232  */
3233 int
3234 NdbIndexScanOperation::buildIndexBoundOldApi(int range_no)
3235 {
3236  IndexBound ib;
3237  OldApiScanRangeDefinition* boundDef=
3238  (OldApiScanRangeDefinition*) currentRangeOldApi->aRef();
3239 
3240  int result = 1;
3241 
3242  if (boundDef->oldBound.lowBound.highestKey != 0)
3243  {
3244  /* Have a low bound
3245  * Check that a contiguous set of keys are supplied.
3246  * Setup low part of IndexBound
3247  */
3248  Uint32 expectedValue= (~(Uint32) 0) >> (32 - boundDef->oldBound.lowBound.highestKey);
3249 
3250  if (boundDef->oldBound.lowBound.keysPresentBitmap != expectedValue)
3251  {
3252  /* Invalid set of range scan bounds */
3253  setErrorCodeAbort(4259);
3254  return -1;
3255  }
3256 
3257  ib.low_key= boundDef->oldBound.lowBound.key;
3258  ib.low_key_count= boundDef->oldBound.lowBound.highestKey;
3259  ib.low_inclusive= !boundDef->oldBound.lowBound.highestSoFarIsStrict;
3260  result= 0;
3261  }
3262  else
3263  {
3264  ib.low_key= NULL;
3265  ib.low_key_count= 0;
3266  ib.low_inclusive= false;
3267  }
3268 
3269  if (boundDef->oldBound.highBound.highestKey != 0)
3270  {
3271  /* Have a high bound
3272  * Check that a contiguous set of keys are supplied.
3273  */
3274  Uint32 expectedValue= (~(Uint32) 0) >> (32 - boundDef->oldBound.highBound.highestKey);
3275 
3276  if (boundDef->oldBound.highBound.keysPresentBitmap != expectedValue)
3277  {
3278  /* Invalid set of range scan bounds */
3279  setErrorCodeAbort(4259);
3280  return -1;
3281  }
3282 
3283  ib.high_key= boundDef->oldBound.highBound.key;
3284  ib.high_key_count= boundDef->oldBound.highBound.highestKey;
3285  ib.high_inclusive= !boundDef->oldBound.highBound.highestSoFarIsStrict;
3286  result= 0;
3287  }
3288  else
3289  {
3290  ib.high_key= NULL;
3291  ib.high_key_count= 0;
3292  ib.high_inclusive= false;
3293  }
3294 
3295  ib.range_no= range_no;
3296 
3297  boundDef->ib= ib;
3298 
3299  assert( currentRangeOldApi->next() == NULL );
3300 
3301  if (lastRangeOldApi == NULL)
3302  {
3303  /* First bound */
3304  assert( firstRangeOldApi == NULL );
3305  firstRangeOldApi= lastRangeOldApi= currentRangeOldApi;
3306  }
3307  else
3308  {
3309  /* Other bounds exist, add this to the end of the bounds list */
3310  assert( firstRangeOldApi != NULL );
3311  assert( lastRangeOldApi->next() == NULL );
3312  lastRangeOldApi->next(currentRangeOldApi);
3313  lastRangeOldApi= currentRangeOldApi;
3314  }
3315 
3316  currentRangeOldApi= NULL;
3317 
3318  return result;
3319 }
3320 
3322 NdbIndexScanOperation::getIndexBoundFromRecAttr(NdbRecAttr* recAttr)
3323 {
3324  return &((OldApiScanRangeDefinition*)recAttr->aRef())->ib;
3325 }
3326 /* Method called to release any resources allocated by the old
3327  * Index Scan bound API
3328  */
3329 void
3330 NdbIndexScanOperation::releaseIndexBoundsOldApi()
3331 {
3332  NdbRecAttr* bound= firstRangeOldApi;
3333  while (bound != NULL)
3334  {
3335  NdbRecAttr* release= bound;
3336  bound= bound->next();
3337  theNdb->releaseRecAttr(release);
3338  }
3339 
3340  if (currentRangeOldApi != NULL)
3341  theNdb->releaseRecAttr(currentRangeOldApi);
3342 
3343  firstRangeOldApi= lastRangeOldApi= currentRangeOldApi= NULL;
3344 }
3345 
3346 
3347 int
3348 NdbIndexScanOperation::ndbrecord_insert_bound(const NdbRecord *key_record,
3349  Uint32 column_index,
3350  const char *row,
3351  Uint32 bound_type,
3352  Uint32*& firstWordOfBound)
3353 {
3354  char buf[NdbRecord::Attr::SHRINK_VARCHAR_BUFFSIZE];
3355  const NdbRecord::Attr *column= &key_record->columns[column_index];
3356 
3357  bool is_null= column->is_null(row);
3358  Uint32 len= 0;
3359  const void *aValue= row+column->offset;
3360 
3361  if (!is_null)
3362  {
3363  bool len_ok;
3364  /* Support for special mysqld varchar format in keys. */
3365  if (column->flags & NdbRecord::IsMysqldShrinkVarchar)
3366  {
3367  len_ok= column->shrink_varchar(row, len, buf);
3368  aValue= buf;
3369  }
3370  else
3371  {
3372  len_ok= column->get_var_length(row, len);
3373  }
3374  if (!len_ok) {
3375  setErrorCodeAbort(4209);
3376  return -1;
3377  }
3378  }
3379 
3380  /* Add bound type */
3381  if (unlikely(insertKEYINFO_NdbRecord((const char*) &bound_type,
3382  sizeof(Uint32))))
3383  {
3384  /* Some sort of allocation error */
3385  setErrorCodeAbort(4000);
3386  return -1;
3387  }
3388 
3389  assert( theKEYINFOptr != NULL );
3390  /* Grab ptr to first word of this bound if caller wants it */
3391  if (firstWordOfBound == NULL)
3392  firstWordOfBound= theKEYINFOptr - 1;
3393 
3394  AttributeHeader ah(column->index_attrId, len);
3395 
3396  /* Add AttrInfo header + data for bound */
3397  if (unlikely(insertKEYINFO_NdbRecord((const char*) &ah.m_value,
3398  sizeof(Uint32)) ||
3399  insertKEYINFO_NdbRecord((const char*) aValue, len) ))
3400  {
3401  /* Some sort of allocation error */
3402  setErrorCodeAbort(4000);
3403  return -1;
3404  }
3405 
3406  return 0;
3407 }
3408 
3409 int
3410 NdbIndexScanOperation::insert_open_bound(const NdbRecord *key_record,
3411  Uint32*& firstWordOfBound)
3412 {
3413  /* We want to insert an open bound into a scan
3414  * This is done by requesting all rows with first key column
3415  * >= NULL (so, confusingly, bound is <= NULL)
3416  * Sending this as bound info for an open bound allows us to
3417  * also send the range number etc so that MRR scans can include
3418  * open ranges.
3419  * Note that MRR scans with open ranges are an inefficient use of
3420  * MRR. Really the application should realise that all rows are
3421  * being processed and only fetch them once.
3422  */
3423  const Uint32 bound_type= NdbIndexScanOperation::BoundLE;
3424 
3425  if (unlikely(insertKEYINFO_NdbRecord((const char*) &bound_type,
3426  sizeof(Uint32))))
3427  {
3428  /* Some sort of allocation error */
3429  setErrorCodeAbort(4000);
3430  return -1;
3431  }
3432 
3433  /* Grab ptr to first word of this bound if caller wants it */
3434  if (firstWordOfBound == NULL)
3435  firstWordOfBound= theKEYINFOptr - 1;
3436 
3437  /*
3438  * bug#57396 wrong attr id inserted.
3439  * First index attr id is 0, key_record not used.
3440  * Create NULL attribute header.
3441  */
3442  AttributeHeader ah(0, 0);
3443 
3444  if (unlikely(insertKEYINFO_NdbRecord((const char*) &ah.m_value,
3445  sizeof(Uint32))))
3446  {
3447  /* Some sort of allocation error */
3448  setErrorCodeAbort(4000);
3449  return -1;
3450  };
3451 
3452  return 0;
3453 }
3454 
3455 /* IndexScan readTuples - part of old scan API
3456  * This call does the minimum amount of validation and state
3457  * storage possible. Most of the scan initialisation is done
3458  * later as part of processIndexScanDefs
3459  */
/* Returns whatever NdbScanOperation::readTuples returns. When that is 0
 * and the access table is an ordered index, the operation is switched to
 * status GetValue and type OpenRangeScanRequest.
 * NOTE(review): the line carrying the function name and its first
 * parameter appears to be missing from this listing (the numbering jumps
 * from 3460 to 3462); the body uses an 'lm' argument that is not declared
 * in the visible lines - confirm against the original source file.
 */
3460 int
3462  Uint32 scan_flags,
3463  Uint32 parallel,
3464  Uint32 batch)
3465 {
3466  /* Defer to Scan Operation's readTuples */
3467  int res= NdbScanOperation::readTuples(lm, scan_flags, parallel, batch);
3468 
3469  /* Set up IndexScan specific members */
3470  if (res == 0 &&
3471  ( (int) m_accessTable->m_indexType ==
3472  (int) NdbDictionary::Index::OrderedIndex))
3473  {
3474  if (m_currentTable == m_accessTable){
3475  // Old way of scanning indexes, should not be allowed
      /* Replace m_currentTable with the table looked up in the
       * dictionary by the index's m_primaryTable name */
3476  m_currentTable = theNdb->theDictionary->
3477  getTable(m_currentTable->m_primaryTable.c_str());
3478  assert(m_currentTable != NULL);
3479  }
3480  assert (m_currentTable != m_accessTable);
3481  // Modify operation state
3482  theStatus = GetValue;
3483  theOperationType = OpenRangeScanRequest;
3484  }
3485 
3486  return res;
3487 }
3488 
3489 /* Most of the work of Index Scan definition for old and NdbRecord
3490  * Index scans is done in this method
3491  */
3492 int
3493 NdbIndexScanOperation::processIndexScanDefs(LockMode lm,
3494  Uint32 scan_flags,
3495  Uint32 parallel,
3496  Uint32 batch)
3497 {
3498  const bool order_by = scan_flags & (SF_OrderBy | SF_OrderByFull);
3499  const bool order_desc = scan_flags & SF_Descending;
3500  const bool read_range_no = scan_flags & SF_ReadRangeNo;
3501  m_multi_range = scan_flags & SF_MultiRange;
3502 
3503  /* Defer to table scan method */
3504  int res = NdbScanOperation::processTableScanDefs(lm,
3505  scan_flags,
3506  parallel,
3507  batch);
3508  if(!res && read_range_no)
3509  {
3510  m_read_range_no = 1;
3511  if (insertATTRINFOHdr_NdbRecord(AttributeHeader::RANGE_NO,
3512  0) == -1)
3513  res = -1;
3514  }
3515  if (!res)
3516  {
3526  if (order_desc) {
3527  m_descending = true;
3528  ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
3529  ScanTabReq::setDescendingFlag(req->requestInfo, true);
3530  }
3531  if (order_by) {
3532  m_ordered = true;
3533  Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
3534  m_sort_columns = cnt; // -1 for NDB$NODE
3535  m_current_api_receiver = m_sent_receivers_count;
3536  m_api_receivers_count = m_sent_receivers_count;
3537  }
3538 
3539  /* Should always have NdbRecord at this point */
3540  assert (m_attribute_record);
3541  }
3542 
3543  m_num_bounds = 0;
3544  m_previous_range_num = 0;
3545 
3546  return res;
3547 }
3548 
3549 int compare_ndbrecord(const NdbReceiver *r1,
3550  const NdbReceiver *r2,
3551  const NdbRecord *key_record,
3552  const NdbRecord *result_record,
3553  bool descending,
3554  bool read_range_no)
3555 {
3556  Uint32 i;
3557  int jdir= 1 - 2 * (int)descending;
3558 
3559  assert(jdir == 1 || jdir == -1);
3560 
3561  const char *a_row= r1->peek_row();
3562  const char *b_row= r2->peek_row();
3563 
3564  /* First compare range_no if needed. */
3565  if (read_range_no)
3566  {
3567  Uint32 a_range_no= uint4korr(a_row+result_record->m_row_size);
3568  Uint32 b_range_no= uint4korr(b_row+result_record->m_row_size);
3569  if (a_range_no != b_range_no)
3570  return (a_range_no < b_range_no ? -1 : 1);
3571  }
3572 
3573  for (i= 0; i<key_record->key_index_length; i++)
3574  {
3575  const NdbRecord::Attr *key_col =
3576  &key_record->columns[key_record->key_indexes[i]];
3577  assert(key_col->attrId < result_record->m_attrId_indexes_length);
3578  int col_idx = result_record->m_attrId_indexes[key_col->attrId];
3579  assert(col_idx >= 0);
3580  assert((Uint32)col_idx < result_record->noOfColumns);
3581  const NdbRecord::Attr *result_col = &result_record->columns[col_idx];
3582 
3583  bool a_is_null= result_col->is_null(a_row);
3584  bool b_is_null= result_col->is_null(b_row);
3585  if (a_is_null)
3586  {
3587  if (!b_is_null)
3588  return -1 * jdir;
3589  }
3590  else
3591  {
3592  if (b_is_null)
3593  return 1 * jdir;
3594 
3595  Uint32 offset= result_col->offset;
3596  Uint32 maxSize= result_col->maxSize;
3597  const char *a_ptr= a_row + offset;
3598  const char *b_ptr= b_row + offset;
3599  void *info= result_col->charset_info;
3600  int res=
3601  (*result_col->compare_function)
3602  (info, a_ptr, maxSize, b_ptr, maxSize);
3603  if (res)
3604  {
3605  return res * jdir;
3606  }
3607  }
3608  }
3609 
3610  return 0;
3611 }
3612 
3613 /* This function performs the merge sort of the parallel ordered index scans
3614  * to produce a single sorted stream of rows to the application.
3615  *
3616  * To ensure the correct ordering, before a row can be returned, the function
3617  * must ensure that all fragments have either returned at least one row, or
3618  * indicated that they have no more rows to return.
3619  *
3620  * The function maintains an array of receivers, one per fragment, sorted by
3621  * the relative ordering of their next rows. Each time a row is taken from
3622  * the 'top' receiver, it is re-inserted in the ordered list of receivers
3623  * which requires O(log2(NumReceivers)) comparisons.
3624  */
3625 int
3626 NdbIndexScanOperation::next_result_ordered_ndbrecord(const char * & out_row,
3627  bool fetchAllowed,
3628  bool forceSend)
3629 {
3630  Uint32 current;
3631 
3632  /*
3633  Retrieve more rows if necessary, then sort the array of receivers.
3634 
3635  The special case m_current_api_receiver==theParallelism is for the
3636  initial call, where we need to wait for and sort all receviers.
3637  */
3638  if (m_current_api_receiver==theParallelism ||
3639  !m_api_receivers[m_current_api_receiver]->nextResult())
3640  {
3641  if (!fetchAllowed)
3642  return 2; // No more data available now
3643 
3644  /* Wait for all receivers to be retrieved. */
3645  int count= ordered_send_scan_wait_for_all(forceSend);
3646  if (count == -1)
3647  return -1;
3648 
3649  /*
3650  Insert all newly retrieved receivers in sorted array.
3651  The receivers are left in m_conf_receivers for us to move into place.
3652  */
3653  current= m_current_api_receiver;
3654  for (int i= 0; i < count; i++)
3655  ordered_insert_receiver(current--, m_conf_receivers[i]);
3656  m_current_api_receiver= current;
3657  theNdb->theImpl->incClientStat(Ndb::ScanBatchCount, count);
3658  }
3659  else
3660  {
3661  /*
3662  Just make sure the first receiver (from which we just returned a row, so
3663  it may no longer be in the correct sort position) is placed correctly.
3664  */
3665  current= m_current_api_receiver;
3666  ordered_insert_receiver(current + 1, m_api_receivers[current]);
3667  }
3668 
3669  /* Now just return the next row (if any). */
3670  if (current < theParallelism && m_api_receivers[current]->nextResult())
3671  {
3672  out_row= m_api_receivers[current]->get_row();
3673  return 0;
3674  }
3675  else
3676  {
3677  theError.code= -1;
3678  return 1; // End-of-file
3679  }
3680 }
3681 
3682 /* Insert a newly fully-retrieved receiver in the correct sorted place. */
3683 void
3684 NdbIndexScanOperation::ordered_insert_receiver(Uint32 start,
3685  NdbReceiver *receiver)
3686 {
3687  /*
3688  Binary search to find the position of the first receiver with no rows
3689  smaller than the first row for this receiver. We need to insert this
3690  receiver just before that position.
3691  */
3692  Uint32 first= start;
3693  Uint32 last= theParallelism;
3694  while (first < last)
3695  {
3696  Uint32 idx= (first+last)/2;
3697  int res= compare_ndbrecord(receiver,
3698  m_api_receivers[idx],
3699  m_key_record,
3700  m_attribute_record,
3701  m_descending,
3702  m_read_range_no);
3703  if (res <= 0)
3704  last= idx;
3705  else
3706  first= idx+1;
3707  }
3708 
3709  /* Move down any receivers that go before this one, then insert it. */
3710  if (last > start)
3711  memmove(&m_api_receivers[start-1],
3712  &m_api_receivers[start],
3713  (last - start) * sizeof(m_api_receivers[0]));
3714  m_api_receivers[last-1]= receiver;
3715 }
3716 
3717 /*
3718  This method is called during (NdbRecord) ordered index scans when all rows
3719  from one batch of one fragment scan are exhausted (identified by
3720  m_current_api_receiver).
3721 
3722  It sends a SCAN_NEXTREQ signal for the fragment and waits for the batch to
3723  be fully received.
3724 
3725  As a special case, it is also called at the start of the scan. In this case,
3726  no signal is sent, it just waits for the initial batch to be fully received
3727  from all fragments.
3728 
3729  The method returns -1 for error, and otherwise the number of fragments that
3730  were received (this will be 0 or 1, except for the initial call where it
3731  will be equal to theParallelism).
3732 
3733  The NdbReceiver object(s) are left in the m_conf_receivers array. Note that
3734  it is safe to read from m_conf_receivers without mutex protection immediately
3735  after return from this method; as all fragments are fully received no new
3736  receivers can enter that array until the next call to this method.
3737 */
3738 int
3739 NdbIndexScanOperation::ordered_send_scan_wait_for_all(bool forceSend)
3740 {
3741  NdbImpl* impl = theNdb->theImpl;
3742  Uint32 timeout= impl->get_waitfor_timeout();
3743 
3744  PollGuard poll_guard(* impl);
3745  if(theError.code)
3746  return -1;
3747 
3748  Uint32 seq= theNdbCon->theNodeSequence;
3749  Uint32 nodeId= theNdbCon->theDBnode;
3750  if (seq == impl->getNodeSequence(nodeId) &&
3751  !send_next_scan_ordered(m_current_api_receiver))
3752  {
3753  impl->incClientStat(Ndb::WaitScanResultCount, 1);
3754  while (m_sent_receivers_count > 0 && !theError.code)
3755  {
3756  int ret_code= poll_guard.wait_scan(3*timeout, nodeId, forceSend);
3757  if (ret_code == 0 && seq == impl->getNodeSequence(nodeId))
3758  continue;
3759  if(ret_code == -1){
3760  setErrorCode(4008);
3761  } else {
3762  setErrorCode(4028);
3763  }
3764  return -1;
3765  }
3766 
3767  if(theError.code){
3768  setErrorCode(theError.code);
3769  return -1;
3770  }
3771 
3772  Uint32 new_receivers= m_conf_receivers_count;
3773  m_conf_receivers_count= 0;
3774  return new_receivers;
3775  } else {
3776  setErrorCode(4028);
3777  return -1;
3778  }
3779 }
3780 
3781 /*
3782  This method is used in ordered index scan to acknowledge the reception of
3783  one batch of fragment scan rows and request the sending of another batch (it
3784  sends a SCAN_NEXTREQ signal with one scan fragment record pointer).
3785 
3786  It is called with the argument IDX set to the value of
3787  m_current_api_receiver, the receiver for the fragment scan to acknowledge.
3788  This receiver is moved from the m_api_receivers array to the
3789  m_sent_receivers array.
3790 
3791  This method is called with the PollGuard mutex held on the transporter.
3792 */
3793 int
3794 NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx)
3795 {
3796  if(idx == theParallelism)
3797  return 0;
3798 
3799  NdbReceiver* tRec = m_api_receivers[idx];
3800  NdbApiSignal tSignal(theNdb->theMyRef);
3801  tSignal.setSignal(GSN_SCAN_NEXTREQ, refToBlock(theNdbCon->m_tcRef));
3802 
3803  Uint32 last = m_sent_receivers_count;
3804  Uint32* theData = tSignal.getDataPtrSend();
3805  Uint32* prep_array = theData + 4;
3806 
3807  m_current_api_receiver = idx + 1;
3808  if((prep_array[0] = tRec->m_tcPtrI) == RNIL)
3809  {
3810  if(DEBUG_NEXT_RESULT)
3811  ndbout_c("receiver completed, don't send");
3812  return 0;
3813  }
3814 
3815  theData[0] = theNdbCon->theTCConPtr;
3816  theData[1] = 0;
3817  Uint64 transId = theNdbCon->theTransactionId;
3818  theData[2] = (Uint32) transId;
3819  theData[3] = (Uint32) (transId >> 32);
3820 
3824  m_sent_receivers[last] = tRec;
3825  tRec->m_list_index = last;
3826  tRec->prepareSend();
3827  m_sent_receivers_count = last + 1;
3828 
3829  Uint32 nodeId = theNdbCon->theDBnode;
3830  NdbImpl * impl = theNdb->theImpl;
3831  tSignal.setLength(4+1);
3832  int ret= impl->sendSignal(&tSignal, nodeId);
3833  return ret;
3834 }
3835 
/* Closes the scan.
 *
 * Steps visible in the body:
 *  1. Give up at once if the node sequence of the DB node has changed.
 *  2. Wait until no receivers remain in the 'sent' state (or an error
 *     code is set).
 *  3. Collect the receivers currently counted as api and conf into
 *     m_api_receivers and call send_next_scan(api+conf, true).
 *  4. Wait until the sent, api and conf receiver counts are all zero.
 *  5. For OpenRangeScanRequest operations release the old-API index
 *     bounds, then free the old-API interpreted code.
 *
 * Returns 0 on success. Returns -1 on failure; on every failure path
 * theNdbCon->theReleaseOnClose is set to true.
 */
3836 int
3837 NdbScanOperation::close_impl(bool forceSend, PollGuard *poll_guard)
3838 {
3839  NdbImpl* impl = theNdb->theImpl;
3840  Uint32 timeout= impl->get_waitfor_timeout();
3841  Uint32 seq = theNdbCon->theNodeSequence;
3842  Uint32 nodeId = theNdbCon->theDBnode;
3843 
3844  if (seq != impl->getNodeSequence(nodeId))
3845  {
3846  theNdbCon->theReleaseOnClose = true;
3847  return -1;
3848  }
3849 
    /* Wait for the receivers that are still in the 'sent' state */
3853  impl->incClientStat(Ndb::WaitScanResultCount, 1);
3854  while(theError.code == 0 && m_sent_receivers_count)
3855  {
3856  int return_code= poll_guard->wait_scan(3*timeout, nodeId, forceSend);
3857  switch(return_code){
3858  case 0:
3859  break;
3860  case -1:
3861  setErrorCode(4008);
      /* No break here: case -1 continues into the case -2 handling */
3862  case -2:
3863  m_api_receivers_count = 0;
3864  m_conf_receivers_count = 0;
3865  m_sent_receivers_count = 0;
3866  theNdbCon->theReleaseOnClose = true;
3867  return -1;
3868  }
3869  }
3870 
3871  if(theError.code)
3872  {
    /* With m_current_api_receiver == theParallelism the ordered-scan
     * memmove below moves zero elements */
3873  m_api_receivers_count = 0;
3874  m_current_api_receiver = m_ordered ? theParallelism : 0;
3875  }
3876 
3877 
3882  Uint32 api = m_api_receivers_count;
3883  Uint32 conf = m_conf_receivers_count;
3884 
3885  if(m_ordered)
3886  {
    /* Ordered scan: the entries from m_current_api_receiver up to
     * theParallelism are moved to the front of m_api_receivers.
     * NOTE(review): the element size is spelled sizeof(char*) here,
     * whereas ordered_insert_receiver uses sizeof(m_api_receivers[0]) */
3890  memmove(m_api_receivers, m_api_receivers+m_current_api_receiver,
3891  (theParallelism - m_current_api_receiver) * sizeof(char*));
3892  api = (theParallelism - m_current_api_receiver);
3893  m_api_receivers_count = api;
3894  }
3895 
3896  if(DEBUG_NEXT_RESULT)
3897  ndbout_c("close_impl: [order api conf sent curr parr] %d %d %d %d %d %d",
3898  m_ordered, api, conf,
3899  m_sent_receivers_count, m_current_api_receiver, theParallelism);
3900 
3901  if(api+conf)
3902  {
    /* Append the conf receivers after the api receivers */
3907  memcpy(m_api_receivers+api, m_conf_receivers, conf * sizeof(char*));
3908  m_api_receivers_count = api + conf;
3909  m_conf_receivers_count = 0;
3910  }
3911 
3912  // Send close scan
3913  if(send_next_scan(api+conf, true) == -1)
3914  {
3915  theNdbCon->theReleaseOnClose = true;
3916  return -1;
3917  }
3918 
    /* Wait until no receiver is counted in any of the three states */
3922  impl->incClientStat(Ndb::WaitScanResultCount, 1);
3923  while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
3924  {
3925  int return_code= poll_guard->wait_scan(3*timeout, nodeId, forceSend);
3926  switch(return_code){
3927  case 0:
3928  break;
3929  case -1:
3930  setErrorCode(4008);
      /* No break here: case -1 continues into the case -2 handling */
3931  case -2:
3932  m_api_receivers_count = 0;
3933  m_conf_receivers_count = 0;
3934  m_sent_receivers_count = 0;
3935  theNdbCon->theReleaseOnClose = true;
3936  return -1;
3937  }
3938  }
3939 
3940  /* Rather nasty way to clean up IndexScan resources if
3941  * any
3942  */
3943  if (theOperationType == OpenRangeScanRequest)
3944  {
3945  NdbIndexScanOperation *isop=
3946  reinterpret_cast<NdbIndexScanOperation*> (this);
3947 
3948  /* Release any Index Bound resources */
3949  isop->releaseIndexBoundsOldApi();
3950  }
3951 
3952  /* Free any scan-owned ScanFilter generated InterpretedCode
3953  * object (old Api only)
3954  */
3955  freeInterpretedCodeOldApi();
3956 
3957  return 0;
3958 }
3959 
3960 void
3961 NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){
3962  for(Uint32 i = 0; i<parallell; i++){
3963  m_receivers[i]->m_list_index = i;
3964  m_prepared_receivers[i] = m_receivers[i]->getId();
3965  m_sent_receivers[i] = m_receivers[i];
3966  m_conf_receivers[i] = 0;
3967  m_api_receivers[i] = 0;
3968  m_receivers[i]->prepareSend();
3969  }
3970 
3971  m_api_receivers_count = 0;
3972  m_current_api_receiver = 0;
3973  m_sent_receivers_count = 0;
3974  m_conf_receivers_count = 0;
3975 }
3976 
/* Validates range number 'no' against the saved old-API scan flags and
 * the current old-API range state, then calls buildIndexBoundOldApi(no).
 * Returns 0 on success, -1 on failure (error 4509, 4259 or 4282 is set
 * by the checks below, or buildIndexBoundOldApi reported a failure).
 * NOTE(review): the line carrying the function name and parameter list
 * appears to be missing from this listing (the numbering jumps from 3977
 * to 3979); the body uses a 'no' argument that is not declared in the
 * visible lines - confirm against the original source file.
 */
3977 int
3979 {
3980  DBUG_ENTER("end_of_bound");
3981  DBUG_PRINT("info", ("Range number %u", no));
3982 
    /* Only range number 0 is accepted unless SF_MultiRange was given */
3983  if (! (m_savedScanFlagsOldApi & SF_MultiRange || no == 0))
3984  {
3985  setErrorCodeAbort(4509);
3986  /* Non SF_MultiRange scan cannot have more than one bound */
3987  DBUG_RETURN(-1);
3988  }
3989 
3990  if (currentRangeOldApi == NULL)
3991  {
3992  setErrorCodeAbort(4259);
3993  /* Invalid set of range scan bounds */
3994  DBUG_RETURN(-1);
3995  }
3996 
3997  /* If it's an ordered scan and we're reading range numbers
3998  * back then check that range numbers are strictly
3999  * increasing
4000  */
4001  if ((m_savedScanFlagsOldApi & (SF_OrderBy | SF_OrderByFull)) &&
4002  (m_savedScanFlagsOldApi & SF_ReadRangeNo))
4003  {
    /* Expected number is 0 for the first range, otherwise one more
     * than the range_no of the last range */
4004  Uint32 expectedNum= 0;
4005 
4006  if (lastRangeOldApi != NULL)
4007  {
4008  assert( firstRangeOldApi != NULL );
4009  expectedNum =
4010  getIndexBoundFromRecAttr(lastRangeOldApi)->range_no + 1;
4011  }
4012 
4013  if (no != expectedNum)
4014  {
4015  setErrorCodeAbort(4282);
4016  /* range_no not strictly increasing in ordered multi-range index scan */
4017  DBUG_RETURN(-1);
4018  }
4019  }
4020 
4021  if (buildIndexBoundOldApi(no) != 0)
4022  DBUG_RETURN(-1);
4023 
4024  DBUG_RETURN(0);
4025 }
4026 
/* Returns get_range_no() of the receiver at m_current_api_receiver.
 * Returns -1 when m_read_range_no is not set or when
 * m_current_api_receiver is not below m_api_receivers_count.
 * NOTE(review): the line carrying the function name appears to be
 * missing from this listing (the numbering jumps from 4027 to 4029) -
 * confirm against the original source file.
 */
4027 int
4029 {
4030  assert(m_attribute_record);
4031 
4032  if (m_read_range_no)
4033  {
4034  Uint32 idx= m_current_api_receiver;
4035  if (idx >= m_api_receivers_count)
4036  return -1;
4037 
4038  const NdbReceiver *tRec= m_api_receivers[m_current_api_receiver];
4039  return tRec->get_range_no();
4040  }
4041  return -1;
4042 }
4043 
/* Forwards to takeOverScanOpNdbRecord with NdbOperation::ReadRequest and
 * returns its result. When result_row is NULL the caller's result_mask
 * is replaced by an all-zero mask.
 * NOTE(review): the line carrying the function name and its first
 * parameter appears to be missing from this listing (the numbering jumps
 * from 4044 to 4046); the body uses a 'takeOverTrans' argument that is
 * not declared in the visible lines - confirm against the original
 * source file.
 */
4044 const NdbOperation *
4046  const NdbRecord *result_rec,
4047  char *result_row,
4048  const unsigned char *result_mask,
4049  const NdbOperation::OperationOptions *opts,
4050  Uint32 sizeOfOptions)
4051 {
    /* One bit per attribute: NDB_MAX_ATTRIBUTES_IN_TABLE / 8 bytes */
4052  unsigned char empty_mask[NDB_MAX_ATTRIBUTES_IN_TABLE>>3];
4053  /* Default is to not read any attributes, just take over the lock. */
4054  if (!result_row)
4055  {
4056  bzero(empty_mask, sizeof(empty_mask));
4057  result_mask= &empty_mask[0];
4058  }
4059  return takeOverScanOpNdbRecord(NdbOperation::ReadRequest, takeOverTrans,
4060  result_rec, result_row,
4061  result_mask, opts, sizeOfOptions);
4062 }
4063 
/* Returns true when m_pruneState is SPS_ONE_PARTITION or SPS_FIXED,
 * false otherwise.
 * NOTE(review): the line carrying the function name appears to be
 * missing from this listing (the numbering jumps from 4064 to 4066) -
 * confirm against the original source file.
 */
4064 bool
4066 {
4067  /* Note that for old Api scans, the bounds are not added until
4068  * execute() time, so this will return false until after execute
4069  */
4070  return ((m_pruneState == SPS_ONE_PARTITION) ||
4071  (m_pruneState == SPS_FIXED));
4072 }
4073 
4074 NdbBlob*
4075 NdbScanOperation::getBlobHandle(const char* anAttrName) const
4076 {
4077  return NdbOperation::getBlobHandle(anAttrName);
4078 }
4079 
4080 NdbBlob*
4081 NdbScanOperation::getBlobHandle(Uint32 anAttrId) const
4082 {
4083  return NdbOperation::getBlobHandle(anAttrId);
4084 }