MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
NdbQueryOperation.cpp
1 /*
2  Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 
19 #include <ndb_global.h>
20 #include <NdbDictionary.hpp>
21 #include <NdbIndexScanOperation.hpp>
22 #include "NdbQueryBuilder.hpp"
23 #include "NdbQueryOperation.hpp"
24 #include "API.hpp"
25 #include "NdbQueryBuilderImpl.hpp"
26 #include "NdbQueryOperationImpl.hpp"
27 #include "NdbInterpretedCode.hpp"
28 
29 #include <signaldata/TcKeyReq.hpp>
30 #include <signaldata/TcKeyRef.hpp>
31 #include <signaldata/ScanTab.hpp>
32 #include <signaldata/QueryTree.hpp>
33 #include <signaldata/DbspjErr.hpp>
34 
35 #include "AttributeHeader.hpp"
36 
37 #include <Bitmask.hpp>
38 
39 #if 0
40 #define DEBUG_CRASH() assert(false)
41 #else
42 #define DEBUG_CRASH()
43 #endif
44 
48 #define UNUSED(x) ((void)(x))
49 
// Force usage of SCAN_NEXTREQ even for scans with small result sets.
51 static const bool testNextReq = false;
52 
53 /* Various error codes that are not specific to NdbQuery. */
54 static const int Err_TupleNotFound = 626;
55 static const int Err_MemoryAlloc = 4000;
56 static const int Err_SendFailed = 4002;
57 static const int Err_FunctionNotImplemented = 4003;
58 static const int Err_UnknownColumn = 4004;
59 static const int Err_ReceiveTimedOut = 4008;
60 static const int Err_NodeFailCausedAbort = 4028;
61 static const int Err_ParameterError = 4118;
62 static const int Err_SimpleDirtyReadFailed = 4119;
63 static const int Err_WrongFieldLength = 4209;
64 static const int Err_ReadTooMuch = 4257;
65 static const int Err_InvalidRangeNo = 4286;
66 static const int Err_DifferentTabForKeyRecAndAttrRec = 4287;
67 static const int Err_KeyIsNULL = 4316;
68 static const int Err_FinaliseNotCalled = 4519;
69 static const int Err_InterpretedCodeWrongTab = 4524;
70 
71 /* A 'void' index for a tuple in internal parent / child correlation structs .*/
72 static const Uint16 tupleNotFound = 0xffff;
73 
75 const bool traceSignals = false;
76 
77 enum
78 {
83  Parallelism_adaptive = 0xffff0000,
84 
89  Parallelism_max = 0xffff0001
90 };
91 
108 {
109 public:
110  static const Uint32 wordCount = 1;
111 
112  explicit TupleCorrelation()
113  : m_correlation((tupleNotFound<<16) | tupleNotFound)
114  {}
115 
117  explicit TupleCorrelation(Uint32 val)
118  : m_correlation(val)
119  {}
120  Uint32 toUint32() const
121  { return m_correlation; }
122 
123  Uint16 getTupleId() const
124  { return m_correlation & 0xffff;}
125 
126  Uint16 getParentTupleId() const
127  { return m_correlation >> 16;}
128 
129 private:
130  Uint32 m_correlation;
131 }; // class TupleCorrelation
132 
134 {
135 public:
136  static const Uint32 wordCount = 3;
137 
138  explicit CorrelationData(const Uint32* tupleData, Uint32 tupleLength):
139  m_corrPart(tupleData + tupleLength - wordCount)
140  {
141  assert(tupleLength >= wordCount);
142  assert(AttributeHeader(m_corrPart[0]).getAttributeId()
143  == AttributeHeader::CORR_FACTOR64);
144  assert(AttributeHeader(m_corrPart[0]).getByteSize() == 2*sizeof(Uint32));
145  assert(getTupleCorrelation().getTupleId()<tupleNotFound);
146  assert(getTupleCorrelation().getParentTupleId()<tupleNotFound);
147  }
148 
149  Uint32 getRootReceiverId() const
150  { return m_corrPart[2];}
151 
152  const TupleCorrelation getTupleCorrelation() const
153  { return TupleCorrelation(m_corrPart[1]); }
154 
155 private:
156  const Uint32* const m_corrPart;
157 }; // class CorrelationData
158 
178 public:
181  static void buildReciverIdMap(NdbRootFragment* frags,
182  Uint32 noOfFrags);
183 
186  Uint32 noOfFrags,
187  Uint32 receiverId);
188 
189  explicit NdbRootFragment();
190 
191  ~NdbRootFragment();
192 
199  void init(NdbQueryImpl& query, Uint32 fragNo);
200 
201  static void clear(NdbRootFragment* frags, Uint32 noOfFrags);
202 
203  Uint32 getFragNo() const
204  { return m_fragNo; }
205 
209  void prepareNextReceiveSet();
210 
214  void grabNextResultSet(); // Need mutex lock
215 
216  bool hasReceivedMore() const; // Need mutex lock
217 
218  void setReceivedMore(); // Need mutex lock
219 
220  void incrOutstandingResults(Int32 delta)
221  {
222  m_outstandingResults += delta;
223  }
224 
225  void clearOutstandingResults()
226  {
227  m_outstandingResults = 0;
228  }
229 
230  void setConfReceived(Uint32 tcPtrI);
231 
242  bool isFragBatchComplete() const
243  {
244  assert(m_fragNo!=voidFragNo);
245  return m_confReceived && m_outstandingResults==0;
246  }
247 
254  NdbResultStream& getResultStream(Uint32 operationNo) const;
255 
257  { return getResultStream(op.getQueryOperationDef().getQueryOperationIx()); }
258 
259  Uint32 getReceiverId() const;
260  Uint32 getReceiverTcPtrI() const;
261 
265  bool finalBatchReceived() const;
266 
271  bool isEmpty() const;
272 
278  void setRemainingSubScans(Uint32 nodeMask)
279  {
280  m_remainingScans = nodeMask;
281  }
282 
284  void postFetchRelease();
285 
286 private:
289  NdbRootFragment& operator=(const NdbRootFragment&);
290 
291  STATIC_CONST( voidFragNo = 0xffffffff);
292 
294  NdbQueryImpl* m_query;
295 
297  Uint32 m_fragNo;
298 
300  NdbResultStream* m_resultStreams;
301 
307  Uint32 m_availResultSets; // Need mutex
308 
317  Int32 m_outstandingResults;
318 
326  bool m_confReceived;
327 
332  Uint32 m_remainingScans;
333 
339  int m_idMapHead;
340 
346  int m_idMapNext;
347 }; //NdbRootFragment
348 
355 {
356  friend class NdbResultStream;
357 
358 public:
359  explicit NdbResultSet();
360 
361  void init(NdbQueryImpl& query,
362  Uint32 maxRows, Uint32 rowSize);
363 
364  void prepareReceive(NdbReceiver& receiver)
365  {
366  m_rowCount = 0;
367  receiver.prepareReceive(m_buffer);
368  }
369 
370  void prepareRead(NdbReceiver& receiver)
371  {
372  receiver.prepareRead(m_buffer,m_rowCount);
373  }
374 
375  Uint32 getRowCount() const
376  { return m_rowCount; }
377 
378 private:
380  NdbResultSet(const NdbResultSet&);
381  NdbResultSet& operator=(const NdbResultSet&);
382 
384  char* m_buffer;
385 
387  Uint32* m_batchOverflowCheck;
388 
390  TupleCorrelation* m_correlations;
391 
392  Uint32 m_rowSize;
393 
395  Uint32 m_rowCount;
396 
397 }; // class NdbResultSet
398 
410 public:
411 
416  explicit NdbResultStream(NdbQueryOperationImpl& operation,
417  NdbRootFragment& rootFrag);
418 
419  ~NdbResultStream();
420 
424  void prepare();
425 
427  void prepareNextReceiveSet();
428 
429  NdbReceiver& getReceiver()
430  { return m_receiver; }
431 
432  const NdbReceiver& getReceiver() const
433  { return m_receiver; }
434 
435  const char* getCurrentRow()
436  { return m_receiver.get_row(); }
437 
445  void execTRANSID_AI(const Uint32 *ptr, Uint32 len,
446  TupleCorrelation correlation);
447 
453  bool prepareResultSet(Uint32 remainingScans);
454 
462  Uint16 firstResult();
463  Uint16 nextResult();
464 
469  bool isEmpty() const
470  { return m_iterState == Iter_finished; }
471 
478  bool isSubScanComplete(Uint32 remainingScans) const
479  {
485  const Uint32 internalOpNo = m_operation.getQueryOperationDef().getQueryOperationId();
486 
487  const bool complete = !((remainingScans >> internalOpNo) & 1);
488  assert(complete || isScanResult()); // Lookups should always be 'complete'
489  return complete;
490  }
491 
492  bool isScanQuery() const
493  { return (m_properties & Is_Scan_Query); }
494 
495  bool isScanResult() const
496  { return (m_properties & Is_Scan_Result); }
497 
498  bool isInnerJoin() const
499  { return (m_properties & Is_Inner_Join); }
500 
502  friend NdbOut& operator<<(NdbOut& out, const NdbResultStream&);
503 
526  class TupleSet {
527  public:
528  // Tuple ids are unique within this batch and stream
529  Uint16 m_parentId; // Id of parent tuple which this tuple is correlated with
530  Uint16 m_tupleId; // Id of this tuple
531 
532  Uint16 m_hash_head; // Index of first item in TupleSet[] matching a hashed parentId.
533  Uint16 m_hash_next; // 'next' index matching
534 
535  bool m_skip; // Skip this tuple in result processing for now
536 
540  Bitmask<(NDB_SPJ_MAX_TREE_NODES+31)/32> m_hasMatchingChild;
541 
542  explicit TupleSet() : m_hash_head(tupleNotFound)
543  {}
544 
545  private:
547  TupleSet(const TupleSet&);
548  TupleSet& operator=(const TupleSet&);
549  };
550 
551 private:
556  const NdbRootFragment& m_rootFrag;
557 
559  NdbQueryOperationImpl& m_operation;
560 
562  NdbResultStream* const m_parent;
563 
564  const enum properties
565  {
566  Is_Scan_Query = 0x01,
567  Is_Scan_Result = 0x02,
568  Is_Inner_Join = 0x10
569  } m_properties;
570 
572  NdbReceiver m_receiver;
573 
578  NdbResultSet m_resultSets[1];
579  Uint32 m_read; // We read from m_resultSets[m_read]
580  Uint32 m_recv; // We receive into m_resultSets[m_recv]
581 
583  enum
584  {
586  Iter_notStarted,
588  Iter_started,
590  Iter_finished
591  } m_iterState;
592 
597  Uint16 m_currentRow;
598 
600  Uint32 m_maxRows;
601 
603  TupleSet* m_tupleSet;
604 
605  void buildResultCorrelations();
606 
607  Uint16 getTupleId(Uint16 tupleNo) const
608  { return (m_tupleSet) ? m_tupleSet[tupleNo].m_tupleId : 0; }
609 
610  Uint16 getCurrentTupleId() const
611  { return (m_currentRow==tupleNotFound) ? tupleNotFound : getTupleId(m_currentRow); }
612 
613  Uint16 findTupleWithParentId(Uint16 parentId) const;
614 
615  Uint16 findNextTuple(Uint16 tupleNo) const;
616 
619  NdbResultStream& operator=(const NdbResultStream&);
620 }; //class NdbResultStream
621 
625 
627  :m_objSize(objSize),
628  m_maxObjs(0),
629  m_buffer(NULL),
630  m_nextObjNo(0)
631 {}
632 
633 int NdbBulkAllocator::init(Uint32 maxObjs)
634 {
635  assert(m_buffer == NULL);
636  m_maxObjs = maxObjs;
637  // Add check for buffer overrun.
638  m_buffer = new char[m_objSize*m_maxObjs+1];
639  if (unlikely(m_buffer == NULL))
640  {
641  return Err_MemoryAlloc;
642  }
643  m_buffer[m_maxObjs * m_objSize] = endMarker;
644  return 0;
645 }
646 
648  // Overrun check.
649  assert(m_buffer == NULL || m_buffer[m_maxObjs * m_objSize] == endMarker);
650  // Overwrite with 0xff bytes to detect accidental use of released memory.
651  assert(m_buffer == NULL ||
652  memset(m_buffer, 0xff, m_maxObjs * m_objSize) != NULL);
653  delete [] m_buffer;
654  m_buffer = NULL;
655  m_nextObjNo = 0;
656  m_maxObjs = 0;
657 }
658 
659 void* NdbBulkAllocator::allocObjMem(Uint32 noOfObjs)
660 {
661  assert(m_nextObjNo + noOfObjs <= m_maxObjs);
662  void * const result = m_buffer+m_objSize*m_nextObjNo;
663  m_nextObjNo += noOfObjs;
664  return m_nextObjNo > m_maxObjs ? NULL : result;
665 }
666 
670 NdbResultSet::NdbResultSet() :
671  m_buffer(NULL),
672  m_batchOverflowCheck(NULL),
673  m_correlations(NULL),
674  m_rowSize(0),
675  m_rowCount(0)
676 {}
677 
678 void
679 NdbResultSet::init(NdbQueryImpl& query,
680  Uint32 maxRows,
681  Uint32 rowSize)
682 {
683  m_rowSize = rowSize;
684  {
685  const int bufferSize = rowSize * maxRows;
686  NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc();
687  m_buffer = reinterpret_cast<char*>(bufferAlloc.allocObjMem(bufferSize));
688 
689  // So that we can test for buffer overrun.
690  m_batchOverflowCheck =
691  reinterpret_cast<Uint32*>(bufferAlloc.allocObjMem(sizeof(Uint32)));
692  *m_batchOverflowCheck = 0xacbd1234;
693 
694  if (query.getQueryDef().isScanQuery())
695  {
696  m_correlations = reinterpret_cast<TupleCorrelation*>
697  (bufferAlloc.allocObjMem(maxRows*sizeof(TupleCorrelation)));
698  }
699  }
700 }
701 
705 
707  NdbRootFragment& rootFrag)
708 :
709  m_rootFrag(rootFrag),
710  m_operation(operation),
711  m_parent(operation.getParentOperation()
712  ? &rootFrag.getResultStream(*operation.getParentOperation())
713  : NULL),
714  m_properties(
715  (enum properties)
716  ((operation.getQueryDef().isScanQuery()
717  ? Is_Scan_Query : 0)
718  | (operation.getQueryOperationDef().isScanOperation()
719  ? Is_Scan_Result : 0)
720  | (operation.getQueryOperationDef().getMatchType() != NdbQueryOptions::MatchAll
721  ? Is_Inner_Join : 0))),
722  m_receiver(operation.getQuery().getNdbTransaction().getNdb()),
723  m_resultSets(), m_read(0xffffffff), m_recv(0),
724  m_iterState(Iter_notStarted),
725  m_currentRow(tupleNotFound),
726  m_maxRows(0),
727  m_tupleSet(NULL)
728 {};
729 
730 NdbResultStream::~NdbResultStream()
731 {
732  for (int i = static_cast<int>(m_maxRows)-1; i >= 0; i--)
733  {
734  m_tupleSet[i].~TupleSet();
735  }
736 }
737 
738 void
740 {
741  const Uint32 rowSize = m_operation.getRowSize();
742  NdbQueryImpl &query = m_operation.getQuery();
743 
744  /* Parent / child correlation is only relevant for scan type queries
745  * Don't create a m_tupleSet with these correlation id's for lookups!
746  */
747  if (isScanQuery())
748  {
749  m_maxRows = m_operation.getMaxBatchRows();
750  m_tupleSet =
751  new (query.getTupleSetAlloc().allocObjMem(m_maxRows))
752  TupleSet[m_maxRows];
753  }
754  else
755  m_maxRows = 1;
756 
757  m_resultSets[0].init(query, m_maxRows, rowSize);
758 
759  m_receiver.init(NdbReceiver::NDB_QUERY_OPERATION, false, &m_operation);
760  m_receiver.do_setup_ndbrecord(
761  m_operation.getNdbRecord(),
762  m_maxRows,
763  0 /*key_size*/,
764  0 /*read_range_no*/,
765  rowSize,
766  m_resultSets[m_recv].m_buffer);
767 } //NdbResultStream::prepare
768 
777 Uint16
778 NdbResultStream::findTupleWithParentId(Uint16 parentId) const
779 {
780  assert ((parentId==tupleNotFound) == (m_parent==NULL));
781 
782  if (likely(m_resultSets[m_read].m_rowCount>0))
783  {
784  if (m_tupleSet==NULL)
785  {
786  assert (m_resultSets[m_read].m_rowCount <= 1);
787  return 0;
788  }
789 
790  const Uint16 hash = (parentId % m_maxRows);
791  Uint16 currentRow = m_tupleSet[hash].m_hash_head;
792  while (currentRow != tupleNotFound)
793  {
794  assert(currentRow < m_maxRows);
795  if (m_tupleSet[currentRow].m_skip == false &&
796  m_tupleSet[currentRow].m_parentId == parentId)
797  {
798  return currentRow;
799  }
800  currentRow = m_tupleSet[currentRow].m_hash_next;
801  }
802  }
803  return tupleNotFound;
804 } //NdbResultStream::findTupleWithParentId()
805 
806 
811 Uint16
812 NdbResultStream::findNextTuple(Uint16 tupleNo) const
813 {
814  if (tupleNo!=tupleNotFound && m_tupleSet!=NULL)
815  {
816  assert(tupleNo < m_maxRows);
817  Uint16 parentId = m_tupleSet[tupleNo].m_parentId;
818  Uint16 nextRow = m_tupleSet[tupleNo].m_hash_next;
819 
820  while (nextRow != tupleNotFound)
821  {
822  assert(nextRow < m_maxRows);
823  if (m_tupleSet[nextRow].m_skip == false &&
824  m_tupleSet[nextRow].m_parentId == parentId)
825  {
826  return nextRow;
827  }
828  nextRow = m_tupleSet[nextRow].m_hash_next;
829  }
830  }
831  return tupleNotFound;
832 } //NdbResultStream::findNextTuple()
833 
834 
835 Uint16
837 {
838  Uint16 parentId = tupleNotFound;
839  if (m_parent!=NULL)
840  {
841  parentId = m_parent->getCurrentTupleId();
842  if (parentId == tupleNotFound)
843  {
844  m_currentRow = tupleNotFound;
845  m_iterState = Iter_finished;
846  return tupleNotFound;
847  }
848  }
849 
850  if ((m_currentRow=findTupleWithParentId(parentId)) != tupleNotFound)
851  {
852  m_iterState = Iter_started;
853  m_receiver.setCurrentRow(m_resultSets[m_read].m_buffer, m_currentRow);
854  return m_currentRow;
855  }
856 
857  m_iterState = Iter_finished;
858  return tupleNotFound;
859 } //NdbResultStream::firstResult()
860 
861 Uint16
862 NdbResultStream::nextResult()
863 {
864  // Fetch next row for this stream
865  if (m_currentRow != tupleNotFound &&
866  (m_currentRow=findNextTuple(m_currentRow)) != tupleNotFound)
867  {
868  m_iterState = Iter_started;
869  m_receiver.setCurrentRow(m_resultSets[m_read].m_buffer, m_currentRow);
870  return m_currentRow;
871  }
872  m_iterState = Iter_finished;
873  return tupleNotFound;
874 } //NdbResultStream::nextResult()
875 
879 void
880 NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len,
881  TupleCorrelation correlation)
882 {
883  NdbResultSet& receiveSet = m_resultSets[m_recv];
884  if (isScanQuery())
885  {
889  receiveSet.m_correlations[receiveSet.m_rowCount] = correlation;
890  }
891 
892  m_receiver.execTRANSID_AI(ptr, len);
893  receiveSet.m_rowCount++;
894 } // NdbResultStream::execTRANSID_AI()
895 
901 void
903 {
904  assert (isScanQuery());
905 
906  m_iterState = Iter_notStarted;
907  m_currentRow = tupleNotFound;
908  m_resultSets[m_recv].prepareReceive(m_receiver);
909 
914  for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations();
915  childNo++)
916  {
917  NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
918  m_rootFrag.getResultStream(child).prepareNextReceiveSet();
919  }
920 } //NdbResultStream::prepareNextReceiveSet
921 
929 bool
930 NdbResultStream::prepareResultSet(Uint32 remainingScans)
931 {
932  bool isComplete = isSubScanComplete(remainingScans); //Childs with more rows
933  assert(isComplete || isScanResult()); //Lookups always 'complete'
934 
935  m_read = m_recv;
936  NdbResultSet& readResult = m_resultSets[m_read];
937 
938  // Set correct buffer and #rows received by this ResultSet.
939  readResult.prepareRead(m_receiver);
940 
946  if (m_tupleSet!=NULL)
947  {
948  const bool newResults = (m_iterState!=Iter_finished);
949  if (newResults)
950  {
951  buildResultCorrelations();
952  }
953  else
954  {
955  // Makes all rows in 'TupleSet' available (clear 'm_skip' flag)
956  for (Uint32 tupleNo=0; tupleNo<readResult.getRowCount(); tupleNo++)
957  {
958  m_tupleSet[tupleNo].m_skip = false;
959  }
960  }
961  }
962 
969  for (Uint32 childNo=0; childNo < m_operation.getNoOfChildOperations(); childNo++)
970  {
971  const NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
972  NdbResultStream& childStream = m_rootFrag.getResultStream(child);
973  const bool allSubScansComplete = childStream.prepareResultSet(remainingScans);
974 
975  Uint32 childId = child.getQueryOperationDef().getQueryOperationIx();
976 
977  /* Condition 1) & 2) calc'ed outside loop, see comments further below: */
978  const bool skipNonMatches = !allSubScansComplete || // 1)
979  childStream.isInnerJoin(); // 2)
980 
981  if (m_tupleSet!=NULL)
982  {
983  for (Uint32 tupleNo=0; tupleNo<readResult.getRowCount(); tupleNo++)
984  {
985  if (!m_tupleSet[tupleNo].m_skip)
986  {
987  Uint16 tupleId = getTupleId(tupleNo);
988  if (childStream.findTupleWithParentId(tupleId)!=tupleNotFound)
989  m_tupleSet[tupleNo].m_hasMatchingChild.set(childId);
990 
992  // No child matched for this row. Making parent row visible
993  // will cause a NULL (outer join) row to be produced.
994  // Skip NULL row production when:
995  // 1) Some child batches are not complete; they may contain later matches.
996  // 2) Join type is 'inner join', skip as no child are matching.
997  // 3) A match was found in a previous batch.
998  // Condition 1) & 2) above is precalculated in 'bool skipNonMatches'
999  //
1000  else if (skipNonMatches // 1 & 2)
1001  || m_tupleSet[tupleNo].m_hasMatchingChild.get(childId)) // 3)
1002  m_tupleSet[tupleNo].m_skip = true;
1003  }
1004  }
1005  }
1006  isComplete &= allSubScansComplete;
1007  }
1008 
1009  // Set current position 'before first'
1010  m_iterState = Iter_notStarted;
1011  m_currentRow = tupleNotFound;
1012 
1013  return isComplete;
1014 } // NdbResultStream::prepareResultSet()
1015 
1016 
1029 void
1030 NdbResultStream::buildResultCorrelations()
1031 {
1032  const NdbResultSet& readResult = m_resultSets[m_read];
1033 
1034  // Buffer overrun check.
1035  assert(readResult.m_batchOverflowCheck==NULL ||
1036  *readResult.m_batchOverflowCheck==0xacbd1234);
1037 
1038 //if (m_tupleSet!=NULL)
1039  {
1040  /* Clear the hashmap structures */
1041  for (Uint32 i=0; i<m_maxRows; i++)
1042  {
1043  m_tupleSet[i].m_hash_head = tupleNotFound;
1044  }
1045 
1046  /* Rebuild correlation & hashmap from 'readResult' */
1047  for (Uint32 tupleNo=0; tupleNo<readResult.m_rowCount; tupleNo++)
1048  {
1049  const Uint16 tupleId = readResult.m_correlations[tupleNo].getTupleId();
1050  const Uint16 parentId = (m_parent!=NULL)
1051  ? readResult.m_correlations[tupleNo].getParentTupleId()
1052  : tupleNotFound;
1053 
1054  m_tupleSet[tupleNo].m_skip = false;
1055  m_tupleSet[tupleNo].m_parentId = parentId;
1056  m_tupleSet[tupleNo].m_tupleId = tupleId;
1057  m_tupleSet[tupleNo].m_hasMatchingChild.clear();
1058 
1059  /* Insert into parentId-hashmap */
1060  const Uint16 hash = (parentId % m_maxRows);
1061  if (m_parent==NULL)
1062  {
1063  /* Root stream: Insert sequentially in hash_next to make it
1064  * possible to use ::findTupleWithParentId() and ::findNextTuple()
1065  * to navigate even the root operation.
1066  */
1067  /* Link into m_hash_next in order to let ::findNextTuple() navigate correctly */
1068  if (tupleNo==0)
1069  m_tupleSet[hash].m_hash_head = tupleNo;
1070  else
1071  m_tupleSet[tupleNo-1].m_hash_next = tupleNo;
1072  m_tupleSet[tupleNo].m_hash_next = tupleNotFound;
1073  }
1074  else
1075  {
1076  /* Insert parentId in HashMap */
1077  m_tupleSet[tupleNo].m_hash_next = m_tupleSet[hash].m_hash_head;
1078  m_tupleSet[hash].m_hash_head = tupleNo;
1079  }
1080  }
1081  }
1082 } // NdbResultStream::buildResultCorrelations
1083 
1084 
1089  Uint32 noOfFrags)
1090 {
1091  for(Uint32 fragNo = 0; fragNo < noOfFrags; fragNo++)
1092  {
1093  const Uint32 receiverId = frags[fragNo].getReceiverId();
1098  assert((receiverId & 0x3) == 0);
1099  const int hash =
1100  (receiverId >> 2) % noOfFrags;
1101  frags[fragNo].m_idMapNext = frags[hash].m_idMapHead;
1102  frags[hash].m_idMapHead = fragNo;
1103  }
1104 }
1105 
1106 //static
1109  Uint32 noOfFrags,
1110  Uint32 receiverId)
1111 {
1116  assert((receiverId & 0x3) == 0);
1117  const int hash = (receiverId >> 2) % noOfFrags;
1118  int current = frags[hash].m_idMapHead;
1119  assert(current < static_cast<int>(noOfFrags));
1120  while (current >= 0 && frags[current].getReceiverId() != receiverId)
1121  {
1122  current = frags[current].m_idMapNext;
1123  assert(current < static_cast<int>(noOfFrags));
1124  }
1125  if (unlikely (current < 0))
1126  {
1127  return NULL;
1128  }
1129  else
1130  {
1131  return frags+current;
1132  }
1133 }
1134 
1135 
1136 NdbRootFragment::NdbRootFragment():
1137  m_query(NULL),
1138  m_fragNo(voidFragNo),
1139  m_resultStreams(NULL),
1140  m_availResultSets(0),
1141  m_outstandingResults(0),
1142  m_confReceived(false),
1143  m_remainingScans(0),
1144  m_idMapHead(-1),
1145  m_idMapNext(-1)
1146 {
1147 }
1148 
1149 NdbRootFragment::~NdbRootFragment()
1150 {
1151  assert(m_resultStreams==NULL);
1152 }
1153 
1154 void NdbRootFragment::init(NdbQueryImpl& query, Uint32 fragNo)
1155 {
1156  assert(m_fragNo==voidFragNo);
1157  m_query = &query;
1158  m_fragNo = fragNo;
1159 
1160  m_resultStreams = reinterpret_cast<NdbResultStream*>
1161  (query.getResultStreamAlloc().allocObjMem(query.getNoOfOperations()));
1162  assert(m_resultStreams!=NULL);
1163 
1164  for (unsigned opNo=0; opNo<query.getNoOfOperations(); opNo++)
1165  {
1166  NdbQueryOperationImpl& op = query.getQueryOperation(opNo);
1167  new (&m_resultStreams[opNo]) NdbResultStream(op,*this);
1168  m_resultStreams[opNo].prepare();
1169  }
1170 }
1171 
1176 void
1178 {
1179  if (m_resultStreams != NULL)
1180  {
1181  for (unsigned opNo=0; opNo<m_query->getNoOfOperations(); opNo++)
1182  {
1183  m_resultStreams[opNo].~NdbResultStream();
1184  }
1185  }
1191  m_resultStreams = NULL;
1192 }
1193 
1195 NdbRootFragment::getResultStream(Uint32 operationNo) const
1196 {
1197  assert(m_resultStreams);
1198  return m_resultStreams[operationNo];
1199 }
1200 
1204 //static
1205 void NdbRootFragment::clear(NdbRootFragment* rootFrags, Uint32 noOfFrags)
1206 {
1207  if (rootFrags != NULL)
1208  {
1209  for (Uint32 fragNo = 0; fragNo < noOfFrags; fragNo++)
1210  {
1211  rootFrags[fragNo].m_availResultSets = 0;
1212  }
1213  }
1214 }
1215 
1223 {
1224  assert(m_availResultSets==0);
1225  m_availResultSets++;
1226 }
1227 
1236 {
1237  return (m_availResultSets > 0);
1238 }
1239 
1241 {
1242  assert(m_fragNo!=voidFragNo);
1243  assert(m_outstandingResults == 0);
1244  assert(m_confReceived);
1245 
1246  for (unsigned opNo=0; opNo<m_query->getNoOfOperations(); opNo++)
1247  {
1248  NdbResultStream& resultStream = getResultStream(opNo);
1249  if (!resultStream.isSubScanComplete(m_remainingScans))
1250  {
1255  resultStream.prepareNextReceiveSet();
1256  }
1257  }
1258  m_confReceived = false;
1259 }
1260 
1268 {
1269  assert(m_availResultSets>0);
1270  m_availResultSets--;
1271 
1272  NdbResultStream& rootStream = getResultStream(0);
1273  rootStream.prepareResultSet(m_remainingScans);
1274 
1275  /* Position at the first (sorted?) row available from this fragments.
1276  */
1277  rootStream.firstResult();
1278 }
1279 
1280 void NdbRootFragment::setConfReceived(Uint32 tcPtrI)
1281 {
1282  /* For a query with a lookup root, there may be more than one TCKEYCONF
1283  message. For a scan, there should only be one SCAN_TABCONF per root
1284  fragment.
1285  */
1286  assert(!getResultStream(0).isScanQuery() || !m_confReceived);
1287  getResultStream(0).getReceiver().m_tcPtrI = tcPtrI;
1288  m_confReceived = true;
1289 }
1290 
1292 {
1293  return m_confReceived && getReceiverTcPtrI()==RNIL;
1294 }
1295 
1297 {
1298  return getResultStream(0).isEmpty();
1299 }
1300 
1310 {
1311  return getResultStream(0).getReceiver().getId();
1312 }
1313 
1314 Uint32 NdbRootFragment::getReceiverTcPtrI() const
1315 {
1316  return getResultStream(0).getReceiver().m_tcPtrI;
1317 }
1318 
1322 
1323 NdbQuery::NdbQuery(NdbQueryImpl& impl):
1324  m_impl(impl)
1325 {}
1326 
1327 NdbQuery::~NdbQuery()
1328 {}
1329 
1330 Uint32
1331 NdbQuery::getNoOfOperations() const
1332 {
1333  return m_impl.getNoOfOperations();
1334 }
1335 
1337 NdbQuery::getQueryOperation(Uint32 index) const
1338 {
1339  return &m_impl.getQueryOperation(index).getInterface();
1340 }
1341 
1343 NdbQuery::getQueryOperation(const char* ident) const
1344 {
1345  NdbQueryOperationImpl* op = m_impl.getQueryOperation(ident);
1346  return (op) ? &op->getInterface() : NULL;
1347 }
1348 
1349 Uint32
1350 NdbQuery::getNoOfParameters() const
1351 {
1352  return m_impl.getNoOfParameters();
1353 }
1354 
1355 const NdbParamOperand*
1356 NdbQuery::getParameter(const char* name) const
1357 {
1358  return m_impl.getParameter(name);
1359 }
1360 
1361 const NdbParamOperand*
1362 NdbQuery::getParameter(Uint32 num) const
1363 {
1364  return m_impl.getParameter(num);
1365 }
1366 
1367 int
1368 NdbQuery::setBound(const NdbRecord *keyRecord,
1369  const NdbIndexScanOperation::IndexBound *bound)
1370 {
1371  const int error = m_impl.setBound(keyRecord,bound);
1372  if (unlikely(error)) {
1373  m_impl.setErrorCode(error);
1374  return -1;
1375  } else {
1376  return 0;
1377  }
1378 }
1379 
1381 NdbQuery::nextResult(bool fetchAllowed, bool forceSend)
1382 {
1383  return m_impl.nextResult(fetchAllowed, forceSend);
1384 }
1385 
1386 void
1387 NdbQuery::close(bool forceSend)
1388 {
1389  m_impl.close(forceSend);
1390 }
1391 
1394 {
1395  return &m_impl.getNdbTransaction();
1396 }
1397 
1398 const NdbError&
1400  return m_impl.getNdbError();
1401 };
1402 
1403 int NdbQuery::isPrunable(bool& prunable) const
1404 {
1405  return m_impl.isPrunable(prunable);
1406 }
1407 
1408 NdbQueryOperation::NdbQueryOperation(NdbQueryOperationImpl& impl)
1409  :m_impl(impl)
1410 {}
1411 NdbQueryOperation::~NdbQueryOperation()
1412 {}
1413 
1414 Uint32
1415 NdbQueryOperation::getNoOfParentOperations() const
1416 {
1417  return m_impl.getNoOfParentOperations();
1418 }
1419 
1421 NdbQueryOperation::getParentOperation(Uint32 i) const
1422 {
1423  return &m_impl.getParentOperation(i).getInterface();
1424 }
1425 
1426 Uint32
1427 NdbQueryOperation::getNoOfChildOperations() const
1428 {
1429  return m_impl.getNoOfChildOperations();
1430 }
1431 
1433 NdbQueryOperation::getChildOperation(Uint32 i) const
1434 {
1435  return &m_impl.getChildOperation(i).getInterface();
1436 }
1437 
1438 const NdbQueryOperationDef&
1439 NdbQueryOperation::getQueryOperationDef() const
1440 {
1441  return m_impl.getQueryOperationDef().getInterface();
1442 }
1443 
1444 NdbQuery&
1445 NdbQueryOperation::getQuery() const {
1446  return m_impl.getQuery().getInterface();
1447 };
1448 
1449 NdbRecAttr*
1450 NdbQueryOperation::getValue(const char* anAttrName,
1451  char* resultBuffer)
1452 {
1453  return m_impl.getValue(anAttrName, resultBuffer);
1454 }
1455 
1456 NdbRecAttr*
1457 NdbQueryOperation::getValue(Uint32 anAttrId,
1458  char* resultBuffer)
1459 {
1460  return m_impl.getValue(anAttrId, resultBuffer);
1461 }
1462 
1463 NdbRecAttr*
1465  char* resultBuffer)
1466 {
1467  if (unlikely(column==NULL)) {
1468  m_impl.getQuery().setErrorCode(QRY_REQ_ARG_IS_NULL);
1469  return NULL;
1470  }
1471  return m_impl.getValue(NdbColumnImpl::getImpl(*column), resultBuffer);
1472 }
1473 
1474 int
1476  const NdbRecord *rec,
1477  char* resBuffer,
1478  const unsigned char* result_mask)
1479 {
1480  if (unlikely(rec==0 || resBuffer==0)) {
1481  m_impl.getQuery().setErrorCode(QRY_REQ_ARG_IS_NULL);
1482  return -1;
1483  }
1484  return m_impl.setResultRowBuf(rec, resBuffer, result_mask);
1485 }
1486 
1487 int
1488 NdbQueryOperation::setResultRowRef (
1489  const NdbRecord* rec,
1490  const char* & bufRef,
1491  const unsigned char* result_mask)
1492 {
1493  return m_impl.setResultRowRef(rec, bufRef, result_mask);
1494 }
1495 
1496 int
1498 {
1499  return m_impl.setOrdering(ordering);
1500 }
1501 
1504 {
1505  return m_impl.getOrdering();
1506 }
1507 
1508 int NdbQueryOperation::setParallelism(Uint32 parallelism){
1509  return m_impl.setParallelism(parallelism);
1510 }
1511 
1513  return m_impl.setMaxParallelism();
1514 }
1515 
1517  return m_impl.setAdaptiveParallelism();
1518 }
1519 
1520 int NdbQueryOperation::setBatchSize(Uint32 batchSize){
1521  return m_impl.setBatchSize(batchSize);
1522 }
1523 
1525 {
1526  return m_impl.setInterpretedCode(code);
1527 }
1528 
1531 {
1532  return m_impl.firstResult();
1533 }
1534 
1536 NdbQueryOperation::nextResult(bool fetchAllowed, bool forceSend)
1537 {
1538  return m_impl.nextResult(fetchAllowed, forceSend);
1539 }
1540 
1541 bool
1542 NdbQueryOperation::isRowNULL() const
1543 {
1544  return m_impl.isRowNULL();
1545 }
1546 
1547 bool
1548 NdbQueryOperation::isRowChanged() const
1549 {
1550  return m_impl.isRowChanged();
1551 }
1552 
1556 
1557 enum Type
1558 {
1559  Type_NULL,
1560  Type_raw, // Raw data formated according to bound Column format.
1561  Type_raw_shrink, // As Type_raw, except short VarChar has to be shrinked.
1562  Type_string, // '\0' terminated C-type string, char/varchar data only
1563  Type_Uint16,
1564  Type_Uint32,
1565  Type_Uint64,
1566  Type_Double
1567 };
1568 
1569 NdbQueryParamValue::NdbQueryParamValue(Uint16 val) : m_type(Type_Uint16)
1570 { m_value.uint16 = val; }
1571 
1572 NdbQueryParamValue::NdbQueryParamValue(Uint32 val) : m_type(Type_Uint32)
1573 { m_value.uint32 = val; }
1574 
1575 NdbQueryParamValue::NdbQueryParamValue(Uint64 val) : m_type(Type_Uint64)
1576 { m_value.uint64 = val; }
1577 
1578 NdbQueryParamValue::NdbQueryParamValue(double val) : m_type(Type_Double)
1579 { m_value.dbl = val; }
1580 
1581 // C-type string, terminated by '\0'
1582 NdbQueryParamValue::NdbQueryParamValue(const char* val) : m_type(Type_string)
1583 { m_value.string = val; }
1584 
1585 // Raw data
1586 NdbQueryParamValue::NdbQueryParamValue(const void* val, bool shrinkVarChar)
1587  : m_type(shrinkVarChar ? Type_raw_shrink : Type_raw)
1588 { m_value.raw = val; }
1589 
1590 // NULL-value, also used as optional end marker
1591 NdbQueryParamValue::NdbQueryParamValue() : m_type(Type_NULL)
1592 {}
1593 
1594 
// Serializes this parameter value into 'dst' in the representation expected
// for 'column'.
//   dst    - buffer receiving the value; writing starts at a word boundary.
//   len    - receives the byte length of the serialized value in the handled
//            cases (0 for a NULL value).
//   isNull - set to true only when the value is Type_NULL.
// Returns 0 on success, QRY_PARAMETER_HAS_WRONG_TYPE or
// QRY_CHAR_PARAMETER_TRUNCATED when the value does not match the column, or
// Err_MemoryAlloc when 'dst' reports memory exhaustion.
// NOTE(review): listing lines 1596, 1623, 1633, 1643, 1662-1663, 1686, 1697
// and 1714 are absent from this extract, so the signature and several
// conditions below are incomplete; restore them from the original file.
1595 int
1597  Uint32Buffer& dst,
1598  Uint32& len,
1599  bool& isNull) const
1600 {
1601  const Uint32 maxSize = column.getSizeInBytes();
1602  isNull = false;
1603  // Start at (32-bit) word boundary.
1604  dst.skipRestOfWord();
1605 
1606  // Fetch parameter value and length.
1607  // Rudimentary typecheck of paramvalue: At least length should be as expected:
1608  // - Extend with more types if required
1609  // - Consider adding simple type conversion, ex: Int64 -> Int32
1610  // - Or
1611  // -- Represent all exact numeric as Int64 and convert to 'smaller' int
1612  // -- Represent all floats as Double and convert to smaller floats
1613  //
1614  switch(m_type)
1615  {
1616  case Type_NULL:
1617  isNull = true;
1618  len = 0;
1619  break;
1620 
1621  case Type_Uint16:
1622  if (unlikely(column.getType() != NdbDictionary::Column::Smallint &&
1624  return QRY_PARAMETER_HAS_WRONG_TYPE;
1625 
1626  len = static_cast<Uint32>(sizeof(m_value.uint16));
1627  DBUG_ASSERT(len == maxSize);
1628  dst.appendBytes(&m_value.uint16, len);
1629  break;
1630 
1631  case Type_Uint32:
1632  if (unlikely(column.getType() != NdbDictionary::Column::Int &&
1634  return QRY_PARAMETER_HAS_WRONG_TYPE;
1635 
1636  len = static_cast<Uint32>(sizeof(m_value.uint32));
1637  DBUG_ASSERT(len == maxSize);
1638  dst.appendBytes(&m_value.uint32, len);
1639  break;
1640 
1641  case Type_Uint64:
1642  if (unlikely(column.getType() != NdbDictionary::Column::Bigint &&
1644  return QRY_PARAMETER_HAS_WRONG_TYPE;
1645 
1646  len = static_cast<Uint32>(sizeof(m_value.uint64));
1647  DBUG_ASSERT(len == maxSize);
1648  dst.appendBytes(&m_value.uint64, len);
1649  break;
1650 
1651  case Type_Double:
1652  if (unlikely(column.getType() != NdbDictionary::Column::Double))
1653  return QRY_PARAMETER_HAS_WRONG_TYPE;
1654 
1655  len = static_cast<Uint32>(sizeof(m_value.dbl));
1656  DBUG_ASSERT(len == maxSize);
1657  dst.appendBytes(&m_value.dbl, len);
1658  break;
1659 
1660  case Type_string:
1661  if (unlikely(column.getType() != NdbDictionary::Column::Char &&
1664  return QRY_PARAMETER_HAS_WRONG_TYPE;
1665  {
// The terminating '\0' is not counted and not appended.
1666  len = static_cast<Uint32>(strlen(m_value.string));
1667  if (unlikely(len > maxSize))
1668  return QRY_CHAR_PARAMETER_TRUNCATED;
1669 
1670  dst.appendBytes(m_value.string, len);
1671  }
1672  break;
1673 
1674  case Type_raw:
1675  // 'Raw' data is readily formatted according to the bound column
1676  if (likely(column.m_arrayType == NDB_ARRAYTYPE_FIXED))
1677  {
1678  len = maxSize;
1679  dst.appendBytes(m_value.raw, maxSize);
1680  }
1681  else if (column.m_arrayType == NDB_ARRAYTYPE_SHORT_VAR)
1682  {
// One length byte followed by the data.
1683  len = 1+*((Uint8*)(m_value.raw));
1684 
1685  DBUG_ASSERT(column.getType() == NdbDictionary::Column::Varchar ||
1687  if (unlikely(len > 1+static_cast<Uint32>(column.getLength())))
1688  return QRY_CHAR_PARAMETER_TRUNCATED;
1689 
1690  dst.appendBytes(m_value.raw, len);
1691  }
1692  else if (column.m_arrayType == NDB_ARRAYTYPE_MEDIUM_VAR)
1693  {
// Two length bytes followed by the data.
1694  len = 2+uint2korr((Uint8*)m_value.raw);
1695 
1696  DBUG_ASSERT(column.getType() == NdbDictionary::Column::Longvarchar ||
1698  if (unlikely(len > 2+static_cast<Uint32>(column.getLength())))
1699  return QRY_CHAR_PARAMETER_TRUNCATED;
1700  dst.appendBytes(m_value.raw, len);
1701  }
1702  else
1703  {
// Unknown array type: nothing is appended and 'len' is not assigned here.
1704  DBUG_ASSERT(0);
1705  }
1706  break;
1707 
1708  case Type_raw_shrink:
1709  // Only short VarChar can be shrunk
1710  if (unlikely(column.m_arrayType != NDB_ARRAYTYPE_SHORT_VAR))
1711  return QRY_PARAMETER_HAS_WRONG_TYPE;
1712 
1713  DBUG_ASSERT(column.getType() == NdbDictionary::Column::Varchar ||
1715 
1716  {
1717  // Convert from two-byte to one-byte length field.
1718  len = 1+uint2korr((Uint8*)m_value.raw);
1719  assert(len <= 0x100);
1720 
1721  if (unlikely(len > 1+static_cast<Uint32>(column.getLength())))
1722  return QRY_CHAR_PARAMETER_TRUNCATED;
1723 
// Emit the one-byte length, then the data that follows the original
// two-byte length field.
1724  const Uint8 shortLen = static_cast<Uint8>(len-1);
1725  dst.appendBytes(&shortLen, 1);
1726  dst.appendBytes(((Uint8*)m_value.raw)+2, shortLen);
1727  }
1728  break;
1729 
1730  default:
1731  assert(false);
1732  }
1733  if (unlikely(dst.isMemoryExhausted())) {
1734  return Err_MemoryAlloc;
1735  }
1736  return 0;
1737 } // NdbQueryParamValue::serializeValue
1738 
1742 
// Constructs a query instance for 'queryDef' inside transaction 'trans'.
// Storage for all operations is set up as one chunk, each
// NdbQueryOperationImpl is constructed in place, and the serialized query
// definition is appended to m_attrInfo. If something fails, an error code
// has been set (m_error.code != 0) and the constructor returns early;
// callers are expected to check for that.
1743 NdbQueryImpl::NdbQueryImpl(NdbTransaction& trans,
1744  const NdbQueryDefImpl& queryDef):
1745  m_interface(*this),
1746  m_state(Initial),
1747  m_tcState(Inactive),
1748  m_next(NULL),
1749  m_queryDef(&queryDef),
1750  m_error(),
1751  m_errorReceived(0),
1752  m_transaction(trans),
1753  m_scanTransaction(NULL),
1754  m_operations(0),
1755  m_countOperations(0),
1756  m_globalCursor(0),
1757  m_pendingFrags(0),
1758  m_rootFragCount(0),
1759  m_rootFrags(NULL),
1760  m_applFrags(),
1761  m_finalBatchFrags(0),
1762  m_num_bounds(0),
1763  m_shortestBound(0xffffffff),
1764  m_attrInfo(),
1765  m_keyInfo(),
1766  m_startIndicator(false),
1767  m_commitIndicator(false),
1768  m_prunability(Prune_No),
1769  m_pruneHashVal(0),
1770  m_operationAlloc(sizeof(NdbQueryOperationImpl)),
1771  m_tupleSetAlloc(sizeof(NdbResultStream::TupleSet)),
1772  m_resultStreamAlloc(sizeof(NdbResultStream)),
1773  m_pointerAlloc(sizeof(void*)),
1774  m_rowBufferAlloc(sizeof(char))
1775 {
1776  // Allocate memory for all m_operations[] in a single chunk
1777  m_countOperations = queryDef.getNoOfOperations();
1778  const int error = m_operationAlloc.init(m_countOperations);
1779  if (unlikely(error != 0))
1780  {
1781  setErrorCode(error);
1782  return;
1783  }
1784  m_operations = reinterpret_cast<NdbQueryOperationImpl*>
1785  (m_operationAlloc.allocObjMem(m_countOperations));
1786 
1787  // Then; use placement new to construct each individual
1788  // NdbQueryOperationImpl object in m_operations
1789  for (Uint32 i=0; i<m_countOperations; ++i)
1790  {
1791  const NdbQueryOperationDefImpl& def = queryDef.getQueryOperation(i);
1792  new(&m_operations[i]) NdbQueryOperationImpl(*this, def);
1793  // Failed to create NdbQueryOperationImpl object.
1794  if (m_error.code != 0)
1795  {
1796  // Destroy those objects that we have already constructed.
// Only indexes below 'i' are destroyed here, in reverse order of
// construction; the element at index 'i' is not destroyed by this loop.
1797  for (int j = static_cast<int>(i)-1; j>= 0; j--)
1798  {
1799  m_operations[j].~NdbQueryOperationImpl();
1800  }
// Clearing the pointer makes the destructor skip its own destroy loop.
1801  m_operations = NULL;
1802  return;
1803  }
1804  }
1805 
1806  // Serialized QueryTree definition is first part of ATTRINFO.
1807  m_attrInfo.append(queryDef.getSerialized());
1808 }
1809 
1810 NdbQueryImpl::~NdbQueryImpl()
1811 {
1817  assert(m_state==Closed);
1818  assert(m_rootFrags==NULL);
1819 
1820  // NOTE: m_operations[] was allocated as a single memory chunk with
1821  // placement new construction of each operation.
1822  // Requires explicit call to d'tor of each operation before memory is free'ed.
1823  if (m_operations != NULL) {
1824  for (int i=m_countOperations-1; i>=0; --i)
1825  { m_operations[i].~NdbQueryOperationImpl();
1826  }
1827  m_operations = NULL;
1828  }
1829  m_state = Destructed;
1830 }
1831 
1832 void
1833 NdbQueryImpl::postFetchRelease()
1834 {
1835  if (m_rootFrags != NULL)
1836  {
1837  for (unsigned i=0; i<m_rootFragCount; i++)
1838  { m_rootFrags[i].postFetchRelease();
1839  }
1840  }
1841  if (m_operations != NULL)
1842  {
1843  for (unsigned i=0; i<m_countOperations; i++)
1844  { m_operations[i].postFetchRelease();
1845  }
1846  }
1847  delete[] m_rootFrags;
1848  m_rootFrags = NULL;
1849 
1850  m_rowBufferAlloc.reset();
1851  m_tupleSetAlloc.reset();
1852  m_resultStreamAlloc.reset();
1853 }
1854 
1855 
1856 //static
// Factory: creates an NdbQueryImpl for 'queryDef' in transaction 'trans'.
// Returns the new query (in state Initial), or NULL when the definition has
// no operations, when allocation fails, or when the constructor reported an
// error (the partially built query is deleted in that case).
// NOTE(review): listing line 1858 (qualified name and first parameter) is
// absent from this extract; restore it from the original file.
1857 NdbQueryImpl*
1859  const NdbQueryDefImpl& queryDef)
1860 {
1861  if (queryDef.getNoOfOperations()==0) {
1862  trans.setErrorCode(QRY_HAS_ZERO_OPERATIONS);
1863  return NULL;
1864  }
1865 
1866  NdbQueryImpl* const query = new NdbQueryImpl(trans, queryDef);
1867  if (unlikely(query==NULL)) {
1868  trans.setOperationErrorCodeAbort(Err_MemoryAlloc);
1869  return NULL;
1870  }
1871  if (unlikely(query->m_error.code != 0))
1872  {
1873  // Transaction error code set already.
1874  delete query;
1875  return NULL;
1876  }
1877  assert(query->m_state==Initial);
1878  return query;
1879 }
1880 
1881 
// Binds 'paramValues' to the query: builds the key info for the root
// operation and serializes parameters for every non-root operation that has
// any. On success the state becomes Defined and 0 is returned; on failure
// the error code is set via setErrorCode() and -1 is returned.
// NOTE(review): listing line 1889 (qualified name and parameter list) and
// the comment lines 1891-1895 are absent from this extract.
1888 int
1890 {
1896  // Build explicit key/filter/bounds for root operation, possibly referring to paramValues
1897  const int error = getRoot().prepareKeyInfo(m_keyInfo, paramValues);
1898  if (unlikely(error != 0))
1899  {
1900  setErrorCode(error);
1901  return -1;
1902  }
1903 
1904  // Serialize parameter values for the other (non-root) operations
1905  // (No need to serialize for root (i==0) as root key is part of keyInfo above)
1906  for (Uint32 i=1; i<getNoOfOperations(); ++i)
1907  {
1908  if (getQueryDef().getQueryOperation(i).getNoOfParameters() > 0)
1909  {
1910  const int error = getQueryOperation(i).serializeParams(paramValues);
1911  if (unlikely(error != 0))
1912  {
1913  setErrorCode(error);
1914  return -1;
1915  }
1916  }
1917  }
1918  assert(m_state<Defined);
1919  m_state = Defined;
1920  return 0;
1921 } // NdbQueryImpl::assignParameters
1922 
1923 
1924 static int
1925 insert_bound(Uint32Buffer& keyInfo, const NdbRecord *key_record,
1926  Uint32 column_index,
1927  const char *row,
1928  Uint32 bound_type)
1929 {
1930  char buf[NdbRecord::Attr::SHRINK_VARCHAR_BUFFSIZE];
1931  const NdbRecord::Attr *column= &key_record->columns[column_index];
1932 
1933  bool is_null= column->is_null(row);
1934  Uint32 len= 0;
1935  const void *aValue= row+column->offset;
1936 
1937  if (!is_null)
1938  {
1939  bool len_ok;
1940  /* Support for special mysqld varchar format in keys. */
1941  if (column->flags & NdbRecord::IsMysqldShrinkVarchar)
1942  {
1943  len_ok= column->shrink_varchar(row, len, buf);
1944  aValue= buf;
1945  }
1946  else
1947  {
1948  len_ok= column->get_var_length(row, len);
1949  }
1950  if (!len_ok) {
1951  return Err_WrongFieldLength;
1952  }
1953  }
1954 
1955  AttributeHeader ah(column->index_attrId, len);
1956  keyInfo.append(bound_type);
1957  keyInfo.append(ah.m_value);
1958  keyInfo.appendBytes(aValue,len);
1959 
1960  return 0;
1961 }
1962 
1963 
// Appends one index bound (range) for the root operation to m_keyInfo.
//   key_record - record describing the key columns of the bound rows.
//   bound      - the range to add; must not be NULL.
// Returns 0 on success. On failure an error code is returned directly
// (QRY_REQ_ARG_IS_NULL, Err_InvalidRangeNo, an error from insert_bound(),
// Err_MemoryAlloc or QRY_DEFINITION_TOO_LARGE); this function does not call
// setErrorCode() itself.
// NOTE(review): listing lines 1973, 2042 and 2054 are absent from this
// extract, so the first assert and the two 'bound_type' expressions are
// incomplete here; restore them from the original file.
1964 int
1965 NdbQueryImpl::setBound(const NdbRecord *key_record,
1966  const NdbIndexScanOperation::IndexBound *bound)
1967 {
1968  m_prunability = Prune_Unknown;
1969  if (unlikely(bound==NULL))
1970  return QRY_REQ_ARG_IS_NULL;
1971 
1972  assert (getRoot().getQueryOperationDef().getType()
// Remember where this bound starts; its first word is patched at the end.
1974  int startPos = m_keyInfo.getSize();
1975 
1976  // We don't handle both NdbQueryIndexBound defined in ::scanIndex()
1977  // in combination with a later ::setBound(NdbIndexScanOperation::IndexBound)
1978 //assert (m_bound.lowKeys==0 && m_bound.highKeys==0);
1979 
1980  if (unlikely(bound->range_no > NdbIndexScanOperation::MaxRangeNo))
1981  {
1982  // setErrorCodeAbort(4286);
1983  return Err_InvalidRangeNo;
1984  }
1985  assert (bound->range_no == m_num_bounds);
1986  m_num_bounds++;
1987 
// key_count becomes the larger and common_key_count the smaller of the low
// and high key counts.
1988  Uint32 key_count= bound->low_key_count;
1989  Uint32 common_key_count= key_count;
1990  if (key_count < bound->high_key_count)
1991  key_count= bound->high_key_count;
1992  else
1993  common_key_count= bound->high_key_count;
1994 
1995  if (m_shortestBound > common_key_count)
1996  {
1997  m_shortestBound = common_key_count;
1998  }
1999  /* Has the user supplied an open range (no bounds)? */
2000  const bool openRange= ((bound->low_key == NULL || bound->low_key_count == 0) &&
2001  (bound->high_key == NULL || bound->high_key_count == 0));
2002  if (likely(!openRange))
2003  {
2004  /* If low and high key pointers are the same and key counts are
2005  * the same, we send as an Eq bound to save bandwidth.
2006  * This will not send an EQ bound if :
2007  * - Different numbers of high and low keys are EQ
2008  * - High and low keys are EQ, but use different ptrs
2009  */
2010  const bool isEqRange=
2011  (bound->low_key == bound->high_key) &&
2012  (bound->low_key_count == bound->high_key_count) &&
2013  (bound->low_inclusive && bound->high_inclusive); // Does this matter?
2014 
2015  if (isEqRange)
2016  {
2017  /* Using BoundEQ will result in bound being sent only once */
2018  for (unsigned j= 0; j<key_count; j++)
2019  {
2020  const int error=
2021  insert_bound(m_keyInfo, key_record, key_record->key_indexes[j],
2022  bound->low_key, NdbIndexScanOperation::BoundEQ);
2023  if (unlikely(error))
2024  return error;
2025  }
2026  }
2027  else
2028  {
2029  /* Distinct upper and lower bounds, must specify them independently */
2030  /* Note : Protocol allows individual columns to be specified as EQ
2031  * or some prefix of columns. This is not currently supported from
2032  * NDBAPI.
2033  */
2034  for (unsigned j= 0; j<key_count; j++)
2035  {
2036  Uint32 bound_type;
2037  /* If key is part of lower bound */
2038  if (bound->low_key && j<bound->low_key_count)
2039  {
2040  /* Inclusive if defined, or matching rows can include this value */
2041  bound_type= bound->low_inclusive || j+1 < bound->low_key_count ?
2043  const int error=
2044  insert_bound(m_keyInfo, key_record, key_record->key_indexes[j],
2045  bound->low_key, bound_type);
2046  if (unlikely(error))
2047  return error;
2048  }
2049  /* If key is part of upper bound */
2050  if (bound->high_key && j<bound->high_key_count)
2051  {
2052  /* Inclusive if defined, or matching rows can include this value */
2053  bound_type= bound->high_inclusive || j+1 < bound->high_key_count ?
2055  const int error=
2056  insert_bound(m_keyInfo, key_record, key_record->key_indexes[j],
2057  bound->high_key, bound_type);
2058  if (unlikely(error))
2059  return error;
2060  }
2061  }
2062  }
2063  }
2064  else
2065  {
2066  /* Open range - all rows must be returned.
2067  * To encode this, we'll request all rows where the first
2068  * key column value is >= NULL
2069  */
2070  AttributeHeader ah(0, 0);
2071  m_keyInfo.append(NdbIndexScanOperation::BoundLE);
2072  m_keyInfo.append(ah.m_value);
2073  }
2074 
// 'length' is the number of buffer entries added for this bound. It is
// OR-ed into the upper 16 bits of the bound's first word, with the range
// number shifted left by 4.
2075  Uint32 length = m_keyInfo.getSize()-startPos;
2076  if (unlikely(m_keyInfo.isMemoryExhausted())) {
2077  return Err_MemoryAlloc;
2078  } else if (unlikely(length > 0xFFFF)) {
2079  return QRY_DEFINITION_TOO_LARGE; // Query definition too large.
2080  } else if (likely(length > 0)) {
2081  m_keyInfo.put(startPos, m_keyInfo.get(startPos) | (length << 16) | (bound->range_no << 4));
2082  }
2083 
2084 #ifdef TRACE_SERIALIZATION
2085  ndbout << "Serialized KEYINFO w/ bounds for indexScan root : ";
2086  for (Uint32 i = startPos; i < m_keyInfo.getSize(); i++) {
2087  char buf[12];
2088  sprintf(buf, "%.8x", m_keyInfo.get(i));
2089  ndbout << buf << " ";
2090  }
2091  ndbout << endl;
2092 #endif
2093 
2094  assert(m_state<=Defined);
2095  m_state = Defined;
2096  return 0;
2097 } // NdbQueryImpl::setBound()
2098 
2099 
// Returns the number of operations in this query (m_countOperations).
// NOTE(review): listing line 2101 (qualified name and parameter list) is
// absent from this extract; restore it from the original file.
2100 Uint32
2102 {
2103  return m_countOperations;
2104 }
2105 
// Returns the leaf operation count as reported by the operation at index 0.
// NOTE(review): listing line 2107 (qualified name and parameter list) is
// absent from this extract; restore it from the original file.
2106 Uint32
2108 {
2109  return getQueryOperation(Uint32(0)).getNoOfLeafOperations();
2110 }
2111 
// Returns the operation stored at 'index' in m_operations. 'index' must be
// below m_countOperations; this is only checked by an assert.
// NOTE(review): listing line 2112 (the return type) is absent from this
// extract; restore it from the original file.
2113 NdbQueryImpl::getQueryOperation(Uint32 index) const
2114 {
2115  assert(index<m_countOperations);
2116  return m_operations[index];
2117 }
2118 
// Looks up an operation by the name of its definition. Operations are
// compared in index order using strcmp(); the address of the first match is
// returned, or NULL when no operation has that name.
// NOTE(review): 'ident' is passed to strcmp() unchecked, so it presumably
// must not be NULL; confirm against callers.
// NOTE(review): listing line 2119 (the return type) is absent from this
// extract; restore it from the original file.
2120 NdbQueryImpl::getQueryOperation(const char* ident) const
2121 {
2122  for(Uint32 i = 0; i<m_countOperations; i++){
2123  if(strcmp(m_operations[i].getQueryOperationDef().getName(), ident) == 0){
2124  return &m_operations[i];
2125  }
2126  }
2127  return NULL;
2128 }
2129 
// Not implemented yet: always returns 0 (see FIXME).
// NOTE(review): listing line 2131 (qualified name and parameter list) is
// absent from this extract; restore it from the original file.
2130 Uint32
2132 {
2133  return 0; // FIXME
2134 }
2135 
2136 const NdbParamOperand*
2137 NdbQueryImpl::getParameter(const char* name) const
2138 {
2139  return NULL; // FIXME
2140 }
2141 
2142 const NdbParamOperand*
2143 NdbQueryImpl::getParameter(Uint32 num) const
2144 {
2145  return NULL; // FIXME
2146 }
2147 
// Advances the query to its next result row, walking the operations with
// m_globalCursor. Returns NextResult_gotRow, NextResult_bufferEmpty,
// NextResult_scanComplete or NextResult_error. Calling it in a state outside
// [Executing, Closed) sets QRY_IN_ERROR_STATE or QRY_ILLEGAL_STATE and
// returns NextResult_error.
// NOTE(review): the listing lines holding the return type (before 2158) and
// the declaration of 'res' (2174) are absent from this extract; restore them
// from the original file.
2158 NdbQueryImpl::nextResult(bool fetchAllowed, bool forceSend)
2159 {
2160  if (unlikely(m_state < Executing || m_state >= Closed)) {
2161  assert (m_state >= Initial && m_state < Destructed);
2162  if (m_state == Failed)
2163  setErrorCode(QRY_IN_ERROR_STATE);
2164  else
2165  setErrorCode(QRY_ILLEGAL_STATE);
2166  DEBUG_CRASH();
2167  return NdbQuery::NextResult_error;
2168  }
2169 
2170  assert (m_globalCursor < getNoOfOperations());
2171 
2172  while (m_state != EndOfData) // Or likely: return when 'gotRow'
2173  {
2175  getQueryOperation(m_globalCursor).nextResult(fetchAllowed,forceSend);
2176 
2177  if (unlikely(res == NdbQuery::NextResult_error))
2178  return res;
2179 
2180  else if (res == NdbQuery::NextResult_scanComplete)
2181  {
2182  if (m_globalCursor == 0) // Completed reading all results from root
2183  break;
2184  m_globalCursor--; // Get 'next' from ancestor
2185  }
2186 
2187  else if (res == NdbQuery::NextResult_gotRow)
2188  {
2189  // Position to 'firstResult()' for all children.
2190  // Update m_globalCursor to iterate from last operation with results next time
2191  //
2192  for (uint child=m_globalCursor+1; child<getNoOfOperations(); child++)
2193  {
2194  res = getQueryOperation(child).firstResult();
2195  if (unlikely(res == NdbQuery::NextResult_error))
2196  return res;
2197  else if (res == NdbQuery::NextResult_gotRow)
2198  m_globalCursor = child;
2199  }
2200  return NdbQuery::NextResult_gotRow;
2201  }
2202  else
2203  {
2204  assert (res == NdbQuery::NextResult_bufferEmpty);
2205  return res;
2206  }
2207  }
2208 
2209  assert (m_state == EndOfData);
2210  return NdbQuery::NextResult_scanComplete;
2211 
2212 } //NdbQueryImpl::nextResult()
2213 
2214 
// Fetches the next row for the root operation.
//   fetchAllowed - when true, new batches may be requested for fragments
//                  that have been emptied.
//   forceSend    - passed on to awaitMoreResults() and sendFetchMore().
// Returns NextResult_gotRow, NextResult_bufferEmpty, NextResult_scanComplete
// or NextResult_error.
// NOTE(review): the listing lines holding the return type (before 2222) and
// the comment lines 2283-2289 are absent from this extract.
2222 NdbQueryImpl::nextRootResult(bool fetchAllowed, bool forceSend)
2223 {
2224  /* To minimize lock contention, each query has the separate root fragment
2225  * container 'm_applFrags'. m_applFrags is only accessed by the application
2226  * thread, so it is safe to use it without locks.
2227  */
2228  while (m_state != EndOfData) // Or likely: return when 'gotRow' or error
2229  {
2230  const NdbRootFragment* rootFrag = m_applFrags.getCurrent();
2231  if (unlikely(rootFrag==NULL))
2232  {
2233  /* m_applFrags is empty, so we cannot get more results without
2234  * possibly blocking.
2235  *
2236  * ::awaitMoreResults() will either copy fragments that are already
2237  * complete (under mutex protection), or block until data
2238  * previously requested arrives.
2239  */
2240  const FetchResult fetchResult = awaitMoreResults(forceSend);
2241  switch (fetchResult) {
2242 
2243  case FetchResult_ok: // OK - got data wo/ error
2244  assert(m_state != Failed);
2245  rootFrag = m_applFrags.getCurrent();
2246  assert (rootFrag!=NULL);
2247  break;
2248 
2249  case FetchResult_noMoreData: // No data, no error
2250  assert(m_state != Failed);
2251  assert (m_applFrags.getCurrent()==NULL);
2252  getRoot().nullifyResult();
2253  m_state = EndOfData;
2254  postFetchRelease();
2255  return NdbQuery::NextResult_scanComplete;
2256 
2257  case FetchResult_noMoreCache: // No cached data, no error
2258  assert(m_state != Failed);
2259  assert (m_applFrags.getCurrent()==NULL);
2260  getRoot().nullifyResult();
2261  if (fetchAllowed)
2262  {
2263  break; // ::sendFetchMore() may request more results
2264  }
2265  return NdbQuery::NextResult_bufferEmpty;
2266 
2267  case FetchResult_gotError: // Error in 'm_error.code'
2268  assert (m_error.code != 0);
2269  return NdbQuery::NextResult_error;
2270 
2271  default:
2272  assert(false);
2273  }
2274  }
2275  else
2276  {
2277  assert(rootFrag->isFragBatchComplete());
2278  rootFrag->getResultStream(0).nextResult(); // Consume current
2279  m_applFrags.reorganize(); // Calculate new current
2280  // Reorg. may update 'current' RootFragment
2281  rootFrag = m_applFrags.getCurrent();
2282  }
2283 
2290  if (fetchAllowed)
2291  {
2292  // Ask for a new batch if we emptied some.
2293  NdbRootFragment** frags;
2294  const Uint32 cnt = m_applFrags.getFetchMore(frags);
2295  if (cnt > 0 && sendFetchMore(frags, cnt, forceSend) != 0)
2296  {
2297  return NdbQuery::NextResult_error;
2298  }
2299  }
2300 
// With a current fragment a row is available now; otherwise loop again and
// wait for more results.
2301  if (rootFrag!=NULL)
2302  {
2303  assert(rootFrag->isFragBatchComplete());
2304  getRoot().fetchRow(rootFrag->getResultStream(0));
2305  return NdbQuery::NextResult_gotRow;
2306  }
2307  } // m_state != EndOfData
2308 
2309  assert (m_state == EndOfData);
2310  return NdbQuery::NextResult_scanComplete;
2311 } //NdbQueryImpl::nextRootResult()
2312 
2313 
// Makes more root fragment results available in m_applFrags, waiting for
// data from the data nodes when needed (scan queries only). Must be called
// when m_applFrags has no current fragment (asserted).
// Returns:
//   FetchResult_ok          - m_applFrags now has a current fragment.
//   FetchResult_noMoreCache - nothing pending, but not all fragments have
//                             delivered their final batch.
//   FetchResult_noMoreData  - nothing pending and all final batches received.
//   FetchResult_gotError    - an error was received; it is in m_error.code.
2319 NdbQueryImpl::FetchResult
2320 NdbQueryImpl::awaitMoreResults(bool forceSend)
2321 {
2322  assert(m_applFrags.getCurrent() == NULL);
2323 
2324  /* Check if there are any more completed fragments available.*/
2325  if (getQueryDef().isScanQuery())
2326  {
2327  assert (m_scanTransaction);
2328  assert (m_state==Executing);
2329 
2330  NdbImpl* const ndb = m_transaction.getNdb()->theImpl;
2331  {
2332  /* This part needs to be done under mutex due to synchronization with
2333  * receiver thread.
2334  */
2335  PollGuard poll_guard(*ndb);
2336 
2337  /* There may be pending (asynchronous received, mutex protected) errors
2338  * from TC / datanodes. Propagate these into m_error.code in 'API space'.
2339  */
2340  while (likely(!hasReceivedError()))
2341  {
2342  /* Scan m_rootFrags (under mutex protection) for fragments
2343  * which has received a complete batch. Add these to m_applFrags.
2344  */
2345  m_applFrags.prepareMoreResults(m_rootFrags,m_rootFragCount);
2346  if (m_applFrags.getCurrent() != NULL)
2347  {
2348  return FetchResult_ok;
2349  }
2350 
2351  /* There are no more available fragment results available without
2352  * first waiting for more to be received from the datanodes
2353  */
2354  if (m_pendingFrags == 0)
2355  {
2356  // 'No more *pending* results', ::sendFetchMore() may make more available
2357  return (m_finalBatchFrags < getRootFragCount()) ? FetchResult_noMoreCache
2358  : FetchResult_noMoreData;
2359  }
2360 
2361  const Uint32 timeout = ndb->get_waitfor_timeout();
2362  const Uint32 nodeId = m_transaction.getConnectedNodeId();
2363  const Uint32 seq = m_transaction.theNodeSequence;
2364 
2365  /* More results are on the way, so we wait for them.*/
// The value returned by wait_scan() is cast directly to FetchResult.
2366  const FetchResult waitResult = static_cast<FetchResult>
2367  (poll_guard.wait_scan(3*timeout,
2368  nodeId,
2369  forceSend));
2370 
// Each failure branch below records an error through setFetchTerminated();
// the loop condition then picks it up and ends the loop.
2371  if (ndb->getNodeSequence(nodeId) != seq)
2372  setFetchTerminated(Err_NodeFailCausedAbort,false);
2373  else if (likely(waitResult == FetchResult_ok))
2374  continue;
2375  else if (waitResult == FetchResult_timeOut)
2376  setFetchTerminated(Err_ReceiveTimedOut,false);
2377  else
2378  setFetchTerminated(Err_NodeFailCausedAbort,false);
2379 
2380  assert (m_state != Failed);
2381  } // while(!hasReceivedError())
2382  } // Terminates scope of 'PollGuard'
2383 
2384  // Fall through only if ::hasReceivedError()
2385  assert (m_error.code);
2386  return FetchResult_gotError;
2387  }
2388  else // is a Lookup query
2389  {
2390  /* The root operation is a lookup. Lookups are guaranteed to be complete
2391  * before NdbTransaction::execute() returns. Therefore we do not set
2392  * the lock, because we know that the signal receiver thread will not
2393  * be accessing m_rootFrags at this time.
2394  */
2395  m_applFrags.prepareMoreResults(m_rootFrags,m_rootFragCount);
2396  if (m_applFrags.getCurrent() != NULL)
2397  {
2398  return FetchResult_ok;
2399  }
2400 
2401  /* Getting here means that either:
2402  * - No results were returned (TCKEYREF)
2403  * - There was no matching row for an inner join.
2404  * - or, the application called nextResult() twice for a lookup query.
2405  */
2406  assert(m_pendingFrags == 0);
2407  assert(m_finalBatchFrags == getRootFragCount());
2408  return FetchResult_noMoreData;
2409  } // if(getQueryDef().isScanQuery())
2410 
2411 } //NdbQueryImpl::awaitMoreResults
2412 
2413 
2414 /*
2415  ::handleBatchComplete() is intended to be called when receiving signals only.
2416  The PollGuard mutex is then set and the shared 'm_pendingFrags' and
2417  'm_finalBatchFrags' can safely be updated and ::setReceivedMore() signaled.
2418 
2419  returns: 'true' when application thread should be resumed.
2420 */
2421 bool
2422 NdbQueryImpl::handleBatchComplete(NdbRootFragment& rootFrag)
2423 {
2424  if (traceSignals) {
2425  ndbout << "NdbQueryImpl::handleBatchComplete"
2426  << ", fragNo=" << rootFrag.getFragNo()
2427  << ", pendingFrags=" << (m_pendingFrags-1)
2428  << ", finalBatchFrags=" << m_finalBatchFrags
2429  << endl;
2430  }
2431 
2432  /* May received fragment data after a SCANREF() (timeout?)
2433  * terminated the scan. We are about to close this query,
2434  * and didn't expect any more data - ignore it!
2435  */
2436  if (likely(m_errorReceived == 0))
2437  {
2438  assert(rootFrag.isFragBatchComplete());
2439 
2440  assert(m_pendingFrags > 0); // Check against underflow.
2441  assert(m_pendingFrags <= m_rootFragCount); // .... and overflow
2442  m_pendingFrags--;
2443 
2444  if (rootFrag.finalBatchReceived())
2445  {
2446  m_finalBatchFrags++;
2447  assert(m_finalBatchFrags <= m_rootFragCount);
2448  }
2449 
2450  /* When application thread ::awaitMoreResults() it will later be
2451  * added to m_applFrags under mutex protection.
2452  */
2453  rootFrag.setReceivedMore();
2454  return true;
2455  }
2456 
2457  return false;
2458 } // NdbQueryImpl::handleBatchComplete
2459 
// Closes the query: closes the TC cursor if one is active, discards pending
// results, closes the internal scan transaction (if any), releases fetch
// resources and sets the state to Closed. Calling it on an already closed
// query skips all of that. In every case m_queryDef is reset to NULL.
// Returns the result of closeTcCursor() when it was called, otherwise 0.
// NOTE(review): the comment lines 2495-2498 of the listing are absent from
// this extract.
2460 int
2461 NdbQueryImpl::close(bool forceSend)
2462 {
2463  int res = 0;
2464 
2465  assert (m_state >= Initial && m_state < Destructed);
2466  if (m_state != Closed)
2467  {
2468  if (m_tcState != Inactive)
2469  {
2470  /* We have started a scan, but we have not yet received the last batch
2471  * for all root fragments. We must therefore close the scan to release
2472  * the scan context at TC.*/
2473  res = closeTcCursor(forceSend);
2474  }
2475 
2476  // Throw any pending results
2477  NdbRootFragment::clear(m_rootFrags,m_rootFragCount);
2478  m_applFrags.clear();
2479 
2480  Ndb* const ndb = m_transaction.getNdb();
2481  if (m_scanTransaction != NULL)
2482  {
2483  assert (m_state != Closed);
2484  assert (m_scanTransaction->m_scanningQuery == this);
// Detach this query from the scan transaction before closing it.
2485  m_scanTransaction->m_scanningQuery = NULL;
2486  ndb->closeTransaction(m_scanTransaction);
2487  ndb->theRemainingStartTransactions--; // Compensate; m_scanTransaction was not a real Txn
2488  m_scanTransaction = NULL;
2489  }
2490 
2491  postFetchRelease();
2492  m_state = Closed; // Even if it was previously 'Failed' it is closed now!
2493  }
2494 
2499  m_queryDef= NULL;
2500 
2501  return res;
2502 } //NdbQueryImpl::close
2503 
2504 
// Closes the query if it is not closed yet (errors from close() are
// ignored) and then deletes this object. The object must not be used after
// this call returns.
// NOTE(review): listing line 2506 (qualified name and parameter list) is
// absent from this extract; restore it from the original file.
2505 void
2507 {
2508  assert (m_state >= Initial && m_state < Destructed);
2509  if (m_state != Closed) {
2510  close(true); // Ignore any errors, explicit ::close() first if errors are of interest
2511  }
2512 
2513  delete this;
2514 }
2515 
// Records 'aErrorCode' (must be non-zero) as the error of this query and
// reports it on the owning transaction. For the error codes listed in the
// switch the code is set on the transaction with setOperationErrorCode();
// for any other code the query state becomes Failed and the code is set with
// setOperationErrorCodeAbort().
// NOTE(review): listing line 2517 (qualified name and parameter list) and
// lines 2530-2533 inside the switch are absent from this extract; restore
// them from the original file.
2516 void
2518 {
2519  assert (aErrorCode!=0);
2520  m_error.code = aErrorCode;
2521  m_transaction.theErrorLine = 0;
2522  m_transaction.theErrorOperation = NULL;
2523 
2524  switch(aErrorCode)
2525  {
2526  // Not really an error. A root lookup found no match.
2527  case Err_TupleNotFound:
2528  // Simple or dirty read failed due to node failure. Transaction will be aborted.
2529  case Err_SimpleDirtyReadFailed:
2534  case Err_FunctionNotImplemented:
2535  case Err_UnknownColumn:
2536  case Err_WrongFieldLength:
2537  case Err_InvalidRangeNo:
2538  case Err_DifferentTabForKeyRecAndAttrRec:
2539  case Err_KeyIsNULL:
2540  case QRY_REQ_ARG_IS_NULL:
2541  case QRY_PARAMETER_HAS_WRONG_TYPE:
2542  case QRY_RESULT_ROW_ALREADY_DEFINED:
2543  case QRY_CHAR_OPERAND_TRUNCATED:
2544  case QRY_WRONG_OPERATION_TYPE:
2545  case QRY_SEQUENTIAL_SCAN_SORTED:
2546  case QRY_SCAN_ORDER_ALREADY_SET:
2547  case QRY_MULTIPLE_SCAN_SORTED:
2548  m_transaction.setOperationErrorCode(aErrorCode);
2549  break;
2550 
2551  // For any other error, abort the transaction.
2552  default:
2553  m_state = Failed;
2554  m_transaction.setOperationErrorCodeAbort(aErrorCode);
2555  break;
2556  }
2557 }
2558 
2559 /*
2560  * ::setFetchTerminated() Should only be called with mutex locked.
2561  * Register result fetching as completed (possibly prematurely, w/ errorCode).
2562  */
2563 void
2564 NdbQueryImpl::setFetchTerminated(int errorCode, bool needClose)
2565 {
2566  assert(m_finalBatchFrags < getRootFragCount());
2567  if (!needClose)
2568  {
2569  m_finalBatchFrags = getRootFragCount();
2570  }
2571  if (errorCode!=0)
2572  {
2573  m_errorReceived = errorCode;
2574  }
2575  m_pendingFrags = 0;
2576 } // NdbQueryImpl::setFetchTerminated()
2577 
2578 
2579 /* There may be pending (asynchronous received, mutex protected) errors
2580  * from TC / datanodes. Propogate these into 'API space'.
2581  * ::hasReceivedError() Should only be called with mutex locked
2582  */
2583 bool
2584 NdbQueryImpl::hasReceivedError()
2585 {
2586  if (unlikely(m_errorReceived))
2587  {
2588  setErrorCode(m_errorReceived);
2589  return true;
2590  }
2591  return false;
2592 } // NdbQueryImpl::hasReceivedError
2593 
2594 
// Handles a TCKEYCONF signal for a lookup query (asserted not to be a scan
// query). The single root fragment is updated and, once its batch is
// complete, handleBatchComplete() decides the return value; otherwise false
// is returned.
// NOTE(review): listing line 2596 (qualified name and parameter list) is
// absent from this extract; restore it from the original file.
2595 bool
2597 {
2598  if (traceSignals) {
2599  ndbout << "NdbQueryImpl::execTCKEYCONF()" << endl;
2600  }
2601  assert(!getQueryDef().isScanQuery());
2602  NdbRootFragment& rootFrag = m_rootFrags[0];
2603 
2604  // We will get 1 + #leaf-nodes TCKEYCONF for a lookup...
2605  rootFrag.setConfReceived(RNIL);
2606  rootFrag.incrOutstandingResults(-1);
2607 
2608  bool ret = false;
2609  if (rootFrag.isFragBatchComplete())
2610  {
2611  ret = handleBatchComplete(rootFrag);
2612  }
2613 
2614  if (traceSignals) {
2615  ndbout << "NdbQueryImpl::execTCKEYCONF(): returns:" << ret
2616  << ", m_pendingFrags=" << m_pendingFrags
2617  << ", rootStream= {" << rootFrag.getResultStream(0) << "}"
2618  << endl;
2619  }
2620  return ret;
2621 } // NdbQueryImpl::execTCKEYCONF
2622 
2623 void
2624 NdbQueryImpl::execCLOSE_SCAN_REP(int errorCode, bool needClose)
2625 {
2626  if (traceSignals)
2627  {
2628  ndbout << "NdbQueryImpl::execCLOSE_SCAN_REP()" << endl;
2629  }
2630  setFetchTerminated(errorCode,needClose);
2631 }
2632 
2633 int
2635 {
2636  if (unlikely(m_state != Defined)) {
2637  assert (m_state >= Initial && m_state < Destructed);
2638  if (m_state == Failed)
2639  setErrorCode(QRY_IN_ERROR_STATE);
2640  else
2641  setErrorCode(QRY_ILLEGAL_STATE);
2642  DEBUG_CRASH();
2643  return -1;
2644  }
2645 
2646  // Determine execution parameters 'batch size'.
2647  // May be user specified (TODO), and/or, limited/specified by config values
2648  //
2649  if (getQueryDef().isScanQuery())
2650  {
2651  /* For the first batch, we read from all fragments for both ordered
2652  * and unordered scans.*/
2653  if (getQueryOperation(0U).m_parallelism == Parallelism_max)
2654  {
2655  m_rootFragCount
2656  = getRoot().getQueryOperationDef().getTable().getFragmentCount();
2657  }
2658  else
2659  {
2660  assert(getQueryOperation(0U).m_parallelism != Parallelism_adaptive);
2661  m_rootFragCount
2662  = MIN(getRoot().getQueryOperationDef().getTable().getFragmentCount(),
2663  getQueryOperation(0U).m_parallelism);
2664  }
2665  Ndb* const ndb = m_transaction.getNdb();
2666 
2670  ndb->theRemainingStartTransactions++; // Compensate; does not start a real Txn
2671  NdbTransaction *scanTxn = ndb->hupp(&m_transaction);
2672  if (scanTxn==NULL) {
2673  ndb->theRemainingStartTransactions--;
2674  m_transaction.setOperationErrorCodeAbort(ndb->getNdbError().code);
2675  return -1;
2676  }
2677  scanTxn->theMagicNumber = 0x37412619;
2678  scanTxn->m_scanningQuery = this;
2679  this->m_scanTransaction = scanTxn;
2680  }
2681  else // Lookup query
2682  {
2683  m_rootFragCount = 1;
2684  }
2685 
2686  int error = m_resultStreamAlloc.init(m_rootFragCount * getNoOfOperations());
2687  if (error != 0)
2688  {
2689  setErrorCode(error);
2690  return -1;
2691  }
2692  // Allocate space for ptrs to NdbResultStream and NdbRootFragment objects.
2693  error = m_pointerAlloc.init(m_rootFragCount *
2694  (OrderedFragSet::pointersPerFragment));
2695  if (error != 0)
2696  {
2697  setErrorCode(error);
2698  return -1;
2699  }
2700 
2701  // Some preparation for later batchsize calculations pr. (sub) scan
2702  getRoot().calculateBatchedRows(NULL);
2703  getRoot().setBatchedRows(1);
2704 
2709  Uint32 totalBuffSize = 0;
2710  for (Uint32 opNo = 0; opNo < getNoOfOperations(); opNo++)
2711  {
2712  const NdbQueryOperationImpl& op = getQueryOperation(opNo);
2713  // Add space for m_correlations, m_buffer & m_batchOverflowCheck
2714  totalBuffSize += (sizeof(TupleCorrelation) * op.getMaxBatchRows());
2715  totalBuffSize += (op.getRowSize() * op.getMaxBatchRows());
2716  totalBuffSize += sizeof(Uint32); // Overflow check
2717  }
2718 
2719  m_rowBufferAlloc.init(m_rootFragCount * totalBuffSize);
2720  if (getQueryDef().isScanQuery())
2721  {
2722  Uint32 totalRows = 0;
2723  for (Uint32 i = 0; i<getNoOfOperations(); i++)
2724  {
2725  totalRows += getQueryOperation(i).getMaxBatchRows();
2726  }
2727  error = m_tupleSetAlloc.init(m_rootFragCount * totalRows);
2728  if (unlikely(error != 0))
2729  {
2730  setErrorCode(error);
2731  return -1;
2732  }
2733  }
2734 
2740  m_rootFrags = new NdbRootFragment[m_rootFragCount];
2741  if (m_rootFrags == NULL)
2742  {
2743  setErrorCode(Err_MemoryAlloc);
2744  return -1;
2745  }
2746  for (Uint32 i = 0; i<m_rootFragCount; i++)
2747  {
2748  m_rootFrags[i].init(*this, i); // Set fragment number.
2749  }
2750 
2751  // Fill in parameters (into ATTRINFO) for QueryTree.
2752  for (Uint32 i = 0; i < m_countOperations; i++) {
2753  const int error = m_operations[i].prepareAttrInfo(m_attrInfo);
2754  if (unlikely(error))
2755  {
2756  setErrorCode(error);
2757  return -1;
2758  }
2759  }
2760 
2761  if (unlikely(m_attrInfo.isMemoryExhausted() || m_keyInfo.isMemoryExhausted())) {
2762  setErrorCode(Err_MemoryAlloc);
2763  return -1;
2764  }
2765 
2766  if (unlikely(m_attrInfo.getSize() > ScanTabReq::MaxTotalAttrInfo ||
2767  m_keyInfo.getSize() > ScanTabReq::MaxTotalAttrInfo)) {
2768  setErrorCode(Err_ReadTooMuch); // TODO: find a more suitable errorcode,
2769  return -1;
2770  }
2771 
2772  // Setup m_applStreams and m_fullStreams for receiving results
2773  const NdbRecord* keyRec = NULL;
2774  if(getRoot().getQueryOperationDef().getIndex()!=NULL)
2775  {
2776  /* keyRec is needed for comparing records when doing ordered index scans.*/
2777  keyRec = getRoot().getQueryOperationDef().getIndex()->getDefaultRecord();
2778  assert(keyRec!=NULL);
2779  }
2780  m_applFrags.prepare(m_pointerAlloc,
2781  getRoot().getOrdering(),
2782  m_rootFragCount,
2783  keyRec,
2784  getRoot().m_ndbRecord);
2785 
2786  if (getQueryDef().isScanQuery())
2787  {
2788  NdbRootFragment::buildReciverIdMap(m_rootFrags, m_rootFragCount);
2789  }
2790 
2791 #ifdef TRACE_SERIALIZATION
2792  ndbout << "Serialized ATTRINFO : ";
2793  for(Uint32 i = 0; i < m_attrInfo.getSize(); i++){
2794  char buf[12];
2795  sprintf(buf, "%.8x", m_attrInfo.get(i));
2796  ndbout << buf << " ";
2797  }
2798  ndbout << endl;
2799 #endif
2800 
2801  assert (m_pendingFrags==0);
2802  m_state = Prepared;
2803  return 0;
2804 } // NdbQueryImpl::prepareSend
2805 
2806 
2807 
// Class InitialReceiverIdIterator (name taken from its destructor below).
// Iterates over an array of NdbRootFragment objects and hands out their
// receiver ids in chunks of at most 'bufSize' words; NdbQueryImpl::doSend()
// uses it as the receiver-id section of SCAN_TABREQ.
// NOTE: the class-head line and the first line of the constructor's
// parameter list are not present in this listing.
2811 {
2812 public:
2813 
// Constructor: iterate over the 'cnt' root fragments in 'rootFrags',
// starting at the first one.
2815  Uint32 cnt)
2816  :m_rootFrags(rootFrags),
2817  m_fragCount(cnt),
2818  m_currFragNo(0)
2819  {}
2820 
2821  virtual ~InitialReceiverIdIterator() {};
2822 
// Get the next chunk of receiver ids. 'sz' is set to the number of words
// returned; it is 0 (and NULL is returned) when all fragments are consumed.
2829  virtual const Uint32* getNextWords(Uint32& sz);
2830 
// Restart the iteration from the first fragment.
2831  virtual void reset()
2832  { m_currFragNo = 0;};
2833 
2834 private:
// Max number of receiver ids handed out by a single getNextWords() call.
2840  static const Uint32 bufSize = 16;
2841 
// Array of root fragments to iterate over, and its length.
2843  NdbRootFragment* m_rootFrags;
2844  const Uint32 m_fragCount;
2845 
// Index of the next fragment to be handed out.
2848  Uint32 m_currFragNo;
// Buffer holding the chunk most recently returned by getNextWords().
2850  Uint32 m_receiverIds[bufSize];
2851 };
2852 
// Body of a getNextWords() implementation; the signature line is not present
// in this listing. It matches InitialReceiverIdIterator (it indexes
// m_rootFrags[] as an array of objects and collects getReceiverId()).
//
// Fills m_receiverIds[] with the receiver ids of the next (at most bufSize)
// fragments and returns that buffer with 'sz' set to the number of ids.
// Returns NULL with sz == 0 when every fragment has been handed out.
2854 {
2859  if (m_currFragNo >= m_fragCount)
2860  {
2861  sz = 0;
2862  return NULL;
2863  }
2864  else
2865  {
2866  Uint32 cnt = 0;
// Stop when either the chunk buffer is full or the fragments are exhausted.
2867  while (cnt < bufSize && m_currFragNo < m_fragCount)
2868  {
2869  m_receiverIds[cnt] = m_rootFrags[m_currFragNo].getReceiverId();
2870  cnt++;
2871  m_currFragNo++;
2872  }
2873  sz = cnt;
2874  return m_receiverIds;
2875  }
2876 }
2877 
// Class FetchMoreTcIdIterator (name taken from its destructor below).
// Iterates over an array of NdbRootFragment pointers and hands out the value
// of getReceiverTcPtrI() for each, in chunks of at most 'bufSize' words;
// NdbQueryImpl::sendFetchMore() uses it as the receiver-id section of
// SCAN_NEXTREQ.
// NOTE: the class-head line and the first line of the constructor's
// parameter list are not present in this listing.
2881 {
2882 public:
// Constructor: iterate over the 'cnt' fragment pointers in 'rootFrags',
// starting at the first one.
2884  Uint32 cnt)
2885  :m_rootFrags(rootFrags),
2886  m_fragCount(cnt),
2887  m_currFragNo(0)
2888  {}
2889 
2890  virtual ~FetchMoreTcIdIterator() {};
2891 
// Get the next chunk of ids. 'sz' is set to the number of words returned;
// it is 0 (and NULL is returned) when all fragments are consumed.
2898  virtual const Uint32* getNextWords(Uint32& sz);
2899 
// Restart the iteration from the first fragment.
2900  virtual void reset()
2901  { m_currFragNo = 0;};
2902 
2903 private:
// Max number of ids handed out by a single getNextWords() call.
2909  static const Uint32 bufSize = 16;
2910 
// Array of pointers to the root fragments to iterate over, and its length.
2912  NdbRootFragment** m_rootFrags;
2913  const Uint32 m_fragCount;
2914 
// Index of the next fragment to be handed out.
2917  Uint32 m_currFragNo;
// Buffer holding the chunk most recently returned by getNextWords().
2919  Uint32 m_receiverIds[bufSize];
2920 };
2921 
2922 const Uint32* FetchMoreTcIdIterator::getNextWords(Uint32& sz)
2923 {
2928  if (m_currFragNo >= m_fragCount)
2929  {
2930  sz = 0;
2931  return NULL;
2932  }
2933  else
2934  {
2935  Uint32 cnt = 0;
2936  while (cnt < bufSize && m_currFragNo < m_fragCount)
2937  {
2938  m_receiverIds[cnt] = m_rootFrags[m_currFragNo]->getReceiverTcPtrI();
2939  cnt++;
2940  m_currFragNo++;
2941  }
2942  sz = cnt;
2943  return m_receiverIds;
2944  }
2945 }
2946 
2947 /******************************************************************************
2948 int doSend() Send serialized queryTree and parameters encapsulated in
2949  either a SCAN_TABREQ or TCKEYREQ to TC.
2950 
2951 NOTE: The TransporterFacade mutex is already set by caller.
2952 
2953 Return Value: Return >0 : send was successful (this implementation returns 1)
2954  Return <0 : In all other cases: -1, or FetchResult_sendFail
 if the signal could not be sent.
2955 Parameters: nodeId: Receiving processor node
 lastFlag: Used as 'execute' flag in TCKEYREQ (lookup queries only)
2956 Remark: Send a TCKEYREQ or SCAN_TABREQ (long) signal depending of
2957  the query being either a lookup or scan type.
2958  KEYINFO and ATTRINFO are included as part of the long signal
2959 ******************************************************************************/
2960 int
2961 NdbQueryImpl::doSend(int nodeId, bool lastFlag)
2962 {
// The query must have been through prepareSend() (state 'Prepared').
2963  if (unlikely(m_state != Prepared)) {
2964  assert (m_state >= Initial && m_state < Destructed);
2965  if (m_state == Failed)
2966  setErrorCode(QRY_IN_ERROR_STATE);
2967  else
2968  setErrorCode(QRY_ILLEGAL_STATE);
2969  DEBUG_CRASH();
2970  return -1;
2971  }
2972 
2973  Ndb& ndb = *m_transaction.getNdb();
2974  NdbImpl * impl = ndb.theImpl;
2975 
// When the root operation uses an index, the request is addressed to the
// index table; otherwise to the base table.
2976  const NdbQueryOperationImpl& root = getRoot();
2977  const NdbQueryOperationDefImpl& rootDef = root.getQueryOperationDef();
2978  const NdbTableImpl* const rootTable = rootDef.getIndex()
2979  ? rootDef.getIndex()->getIndexTable()
2980  : &rootDef.getTable();
2981 
2982  Uint32 tTableId = rootTable->m_id;
2983  Uint32 tSchemaVersion = rootTable->m_version;
2984 
2985  if (rootDef.isScanOperation())
2986  {
2987  Uint32 scan_flags = 0; // TODO: Specify with ScanOptions::SO_SCANFLAGS
2988 
2989  bool tupScan = (scan_flags & NdbScanOperation::SF_TupScan);
2990  bool rangeScan = false;
2991 
// Called for its side effect of settling m_prunability / m_pruneHashVal;
// the boolean result itself is not used here.
2992  bool dummy;
2993  const int error = isPrunable(dummy);
2994  if (unlikely(error != 0))
2995  return error;
2996 
2997  /* Handle IndexScan specifics */
// NOTE: the right-hand side of this comparison is not present in this listing.
2998  if ( (int) rootTable->m_indexType ==
3000  {
3001  rangeScan = true;
3002  tupScan = false;
3003  }
3004  const Uint32 descending =
3005  root.getOrdering()==NdbQueryOptions::ScanOrdering_descending ? 1 : 0;
3006  assert(descending==0 || (int) rootTable->m_indexType ==
3007  (int) NdbDictionary::Index::OrderedIndex);
3008 
3009  assert (root.getMaxBatchRows() > 0);
3010 
// Scans are sent on the separate scan transaction set up by prepareSend().
3011  NdbApiSignal tSignal(&ndb);
3012  tSignal.setSignal(GSN_SCAN_TABREQ, refToBlock(m_scanTransaction->m_tcRef));
3013 
3014  ScanTabReq * const scanTabReq = CAST_PTR(ScanTabReq, tSignal.getDataPtrSend());
3015  Uint32 reqInfo = 0;
3016 
3017  const Uint64 transId = m_scanTransaction->getTransactionId();
3018 
3019  scanTabReq->apiConnectPtr = m_scanTransaction->theTCConPtr;
3020  scanTabReq->buddyConPtr = m_scanTransaction->theBuddyConPtr; // 'buddy' refers 'real-transaction'->theTCConPtr
3021  scanTabReq->spare = 0; // Unused in later protocoll versions
3022  scanTabReq->tableId = tTableId;
3023  scanTabReq->tableSchemaVersion = tSchemaVersion;
3024  scanTabReq->storedProcId = 0xFFFF;
3025  scanTabReq->transId1 = (Uint32) transId;
3026  scanTabReq->transId2 = (Uint32) (transId >> 32);
3027 
// Compute the batch byte size; the asserts below expect the row counts to
// come back unchanged.
3028  Uint32 batchRows = root.getMaxBatchRows();
3029  Uint32 batchByteSize, firstBatchRows;
3030  NdbReceiver::calculate_batch_size(* ndb.theImpl,
3031  root.m_ndbRecord,
3032  root.m_firstRecAttr,
3033  0, // Key size.
3034  getRootFragCount(),
3035  batchRows,
3036  batchByteSize,
3037  firstBatchRows);
3038  assert(batchRows==root.getMaxBatchRows());
3039  assert(batchRows==firstBatchRows);
3040  ScanTabReq::setScanBatch(reqInfo, batchRows);
3041  scanTabReq->batch_byte_size = batchByteSize;
3042  scanTabReq->first_batch_size = firstBatchRows;
3043 
3044  ScanTabReq::setViaSPJFlag(reqInfo, 1);
3045  ScanTabReq::setPassAllConfsFlag(reqInfo, 1);
3046  ScanTabReq::setParallelism(reqInfo, getRootFragCount());
3047  ScanTabReq::setRangeScanFlag(reqInfo, rangeScan);
3048  ScanTabReq::setDescendingFlag(reqInfo, descending);
3049  ScanTabReq::setTupScanFlag(reqInfo, tupScan);
3050  ScanTabReq::setNoDiskFlag(reqInfo, !root.diskInUserProjection());
3051  ScanTabReq::set4WordConf(reqInfo, 1);
3052 
3053  // Assume LockMode LM_ReadCommited, set related lock flags
3054  ScanTabReq::setLockMode(reqInfo, false); // not exclusive
3055  ScanTabReq::setHoldLockFlag(reqInfo, false);
3056  ScanTabReq::setReadCommittedFlag(reqInfo, true);
3057 
3058 // m_keyInfo = (scan_flags & NdbScanOperation::SF_KeyInfo) ? 1 : 0;
3059 
3060  // If scan is pruned, use optional 'distributionKey' to hold hashvalue
3061  if (m_prunability == Prune_Yes)
3062  {
3063 // printf("Build pruned SCANREQ, w/ hashValue:%d\n", hashValue);
3064  ScanTabReq::setDistributionKeyFlag(reqInfo, 1);
3065  scanTabReq->distributionKey= m_pruneHashVal;
3066  tSignal.setLength(ScanTabReq::StaticLength + 1);
3067  } else {
3068  tSignal.setLength(ScanTabReq::StaticLength);
3069  }
3070  scanTabReq->requestInfo = reqInfo;
3071 
// Sections of the long signal: [0] receiver ids (one per root fragment),
// [1] ATTRINFO, and [2] KEYINFO, the latter only if there is any.
3082  GenericSectionPtr secs[3];
3083  InitialReceiverIdIterator receiverIdIter(m_rootFrags, m_rootFragCount);
3084  LinearSectionIterator attrInfoIter(m_attrInfo.addr(), m_attrInfo.getSize());
3085  LinearSectionIterator keyInfoIter(m_keyInfo.addr(), m_keyInfo.getSize());
3086 
3087  secs[0].sectionIter= &receiverIdIter;
3088  secs[0].sz= getRootFragCount();
3089 
3090  secs[1].sectionIter= &attrInfoIter;
3091  secs[1].sz= m_attrInfo.getSize();
3092 
3093  Uint32 numSections= 2;
3094  if (m_keyInfo.getSize() > 0)
3095  {
3096  secs[2].sectionIter= &keyInfoIter;
3097  secs[2].sz= m_keyInfo.getSize();
3098  numSections= 3;
3099  }
3100 
3101  /* Send Fragmented as SCAN_TABREQ can be large */
3102  const int res = impl->sendFragmentedSignal(&tSignal, nodeId, secs, numSections);
3103  if (unlikely(res == -1))
3104  {
3105  setErrorCode(Err_SendFailed); // Error: 'Send to NDB failed'
3106  return FetchResult_sendFail;
3107  }
3108  m_tcState = Active;
3109 
3110  } else { // Lookup query
3111 
// Lookups are sent as TCKEYREQ on the user's own transaction.
3112  NdbApiSignal tSignal(&ndb);
3113  tSignal.setSignal(GSN_TCKEYREQ, refToBlock(m_transaction.m_tcRef));
3114 
3115  TcKeyReq * const tcKeyReq = CAST_PTR(TcKeyReq, tSignal.getDataPtrSend());
3116 
3117  const Uint64 transId = m_transaction.getTransactionId();
3118  tcKeyReq->apiConnectPtr = m_transaction.theTCConPtr;
3119  tcKeyReq->apiOperationPtr = root.getIdOfReceiver();
3120  tcKeyReq->tableId = tTableId;
3121  tcKeyReq->tableSchemaVersion = tSchemaVersion;
3122  tcKeyReq->transId1 = (Uint32) transId;
3123  tcKeyReq->transId2 = (Uint32) (transId >> 32);
3124 
3125  Uint32 attrLen = 0;
3126  tcKeyReq->setAttrinfoLen(attrLen, 0); // Not required for long signals.
3127  tcKeyReq->setAPIVersion(attrLen, NDB_VERSION);
3128  tcKeyReq->attrLen = attrLen;
3129 
3130  Uint32 reqInfo = 0;
// The interpreted flag is only set for primary key access on the root.
3131  Uint32 interpretedFlag= root.hasInterpretedCode() &&
3132  rootDef.getType() == NdbQueryOperationDef::PrimaryKeyAccess;
3133 
3134  TcKeyReq::setOperationType(reqInfo, NdbOperation::ReadRequest);
3135  TcKeyReq::setViaSPJFlag(reqInfo, true);
3136  TcKeyReq::setKeyLength(reqInfo, 0); // This is a long signal
3137  TcKeyReq::setAIInTcKeyReq(reqInfo, 0); // Not needed
3138  TcKeyReq::setInterpretedFlag(reqInfo, interpretedFlag);
3139  TcKeyReq::setStartFlag(reqInfo, m_startIndicator);
3140  TcKeyReq::setExecuteFlag(reqInfo, lastFlag);
3141  TcKeyReq::setNoDiskFlag(reqInfo, !root.diskInUserProjection());
3142  TcKeyReq::setAbortOption(reqInfo, NdbOperation::AO_IgnoreError);
3143 
3144  TcKeyReq::setDirtyFlag(reqInfo, true);
3145  TcKeyReq::setSimpleFlag(reqInfo, true);
3146  TcKeyReq::setCommitFlag(reqInfo, m_commitIndicator);
3147  tcKeyReq->requestInfo = reqInfo;
3148 
3149  tSignal.setLength(TcKeyReq::StaticLength);
3150 
3151 /****
3152  // Unused optional part located after TcKeyReq::StaticLength
3153  tcKeyReq->scanInfo = 0;
3154  tcKeyReq->distrGroupHashValue = 0;
3155  tcKeyReq->distributionKeySize = 0;
3156  tcKeyReq->storedProcId = 0xFFFF;
3157 ***/
3158 
3159 /**** TODO ... maybe - from NdbOperation::prepareSendNdbRecord(AbortOption ao)
3160  Uint8 abortOption= (ao == DefaultAbortOption) ?
3161  (Uint8) m_abortOption : (Uint8) ao;
3162 
3163  m_abortOption= theSimpleIndicator && theOperationType==ReadRequest ?
3164  (Uint8) AO_IgnoreError : (Uint8) abortOption;
3165 
3166  TcKeyReq::setAbortOption(reqInfo, m_abortOption);
3167  TcKeyReq::setCommitFlag(tcKeyReq->requestInfo, theCommitIndicator);
3168 *****/
3169 
// KEYINFO is always sent; ATTRINFO is added as a second section only when
// it is non-empty.
3170  LinearSectionPtr secs[2];
3171  secs[TcKeyReq::KeyInfoSectionNum].p= m_keyInfo.addr();
3172  secs[TcKeyReq::KeyInfoSectionNum].sz= m_keyInfo.getSize();
3173  Uint32 numSections= 1;
3174 
3175  if (m_attrInfo.getSize() > 0)
3176  {
3177  secs[TcKeyReq::AttrInfoSectionNum].p= m_attrInfo.addr();
3178  secs[TcKeyReq::AttrInfoSectionNum].sz= m_attrInfo.getSize();
3179  numSections= 2;
3180  }
3181 
3182  const int res = impl->sendSignal(&tSignal, nodeId, secs, numSections);
3183  if (unlikely(res == -1))
3184  {
3185  setErrorCode(Err_SendFailed); // Error: 'Send to NDB failed'
3186  return FetchResult_sendFail;
3187  }
3188  m_transaction.OpSent();
// NOTE: the continuation line of this call is not present in this listing.
3189  m_rootFrags[0].incrOutstandingResults(1 + getNoOfOperations() +
3191  } // if
3192 
// Every root fragment now has a batch outstanding.
3193  assert (m_pendingFrags==0);
3194  m_pendingFrags = m_rootFragCount;
3195 
3196  // Shrink memory footprint by removing structures not required after ::execute()
3197  m_keyInfo.releaseExtend();
3198  m_attrInfo.releaseExtend();
3199 
3200  // TODO: Release m_interpretedCode now?
3201 
3202  /* Todo : Consider calling NdbOperation::postExecuteRelease()
3203  * Ideally it should be called outside TP mutex, so not added
3204  * here yet
3205  */
3206 
3207  m_state = Executing;
3208  return 1;
3209 } // NdbQueryImpl::doSend()
3210 
3211 
3212 /******************************************************************************
3213 int sendFetchMore() - Fetch another scan batch, optionaly closing the scan
3214 
3215  Request another batch of rows to be retrieved from the scan.
3216 
3217 Return Value: 0 if send succeeded, -1 otherwise.
3218 Parameters: emptyFrag: Root frgament for which to ask for another batch.
3219 Remark:
3220 ******************************************************************************/
3221 int
3222 NdbQueryImpl::sendFetchMore(NdbRootFragment* rootFrags[],
3223  Uint32 cnt,
3224  bool forceSend)
3225 {
3226  assert(getQueryDef().isScanQuery());
3227 
3228  for (Uint32 i=0; i<cnt; i++)
3229  {
3230  NdbRootFragment* rootFrag = rootFrags[i];
3231  assert(rootFrag->isFragBatchComplete());
3232  assert(!rootFrag->finalBatchReceived());
3233  rootFrag->prepareNextReceiveSet();
3234  }
3235 
3236  Ndb& ndb = *getNdbTransaction().getNdb();
3237  NdbApiSignal tSignal(&ndb);
3238  tSignal.setSignal(GSN_SCAN_NEXTREQ, refToBlock(m_scanTransaction->m_tcRef));
3239  ScanNextReq * const scanNextReq =
3240  CAST_PTR(ScanNextReq, tSignal.getDataPtrSend());
3241 
3242  assert (m_scanTransaction);
3243  const Uint64 transId = m_scanTransaction->getTransactionId();
3244 
3245  scanNextReq->apiConnectPtr = m_scanTransaction->theTCConPtr;
3246  scanNextReq->stopScan = 0;
3247  scanNextReq->transId1 = (Uint32) transId;
3248  scanNextReq->transId2 = (Uint32) (transId >> 32);
3249  tSignal.setLength(ScanNextReq::SignalLength);
3250 
3251  FetchMoreTcIdIterator receiverIdIter(rootFrags, cnt);
3252 
3253  GenericSectionPtr secs[1];
3254  secs[ScanNextReq::ReceiverIdsSectionNum].sectionIter = &receiverIdIter;
3255  secs[ScanNextReq::ReceiverIdsSectionNum].sz = cnt;
3256 
3257  NdbImpl * impl = ndb.theImpl;
3258  Uint32 nodeId = m_transaction.getConnectedNodeId();
3259  Uint32 seq = m_transaction.theNodeSequence;
3260 
3261  /* This part needs to be done under mutex due to synchronization with
3262  * receiver thread.
3263  */
3264  PollGuard poll_guard(* impl);
3265 
3266  if (unlikely(hasReceivedError()))
3267  {
3268  // Errors arrived inbetween ::await released mutex, and sendFetchMore grabbed it
3269  return -1;
3270  }
3271  if (impl->getNodeSequence(nodeId) != seq ||
3272  impl->sendSignal(&tSignal, nodeId, secs, 1) != 0)
3273  {
3274  setErrorCode(Err_NodeFailCausedAbort);
3275  return -1;
3276  }
3277  impl->do_forceSend(forceSend);
3278 
3279  m_pendingFrags += cnt;
3280  assert(m_pendingFrags <= getRootFragCount());
3281 
3282  return 0;
3283 } // NdbQueryImpl::sendFetchMore()
3284 
// Close the scan cursor of this (scan) query: wait for any outstanding
// batches, and if not every root fragment has delivered its final batch,
// send a close request and wait for it to be confirmed.
// 'forceSend' is passed on to PollGuard::wait_scan().
// Returns -1 if the node sequence changed before the wait started, the
// result of sendClose() if that is non-zero, and 0 otherwise.
3285 int
3286 NdbQueryImpl::closeTcCursor(bool forceSend)
3287 {
3288  assert (getQueryDef().isScanQuery());
3289 
3290  NdbImpl* const ndb = m_transaction.getNdb()->theImpl;
3291  const Uint32 timeout = ndb->get_waitfor_timeout();
3292  const Uint32 nodeId = m_transaction.getConnectedNodeId();
3293  const Uint32 seq = m_transaction.theNodeSequence;
3294 
3295  /* This part needs to be done under mutex due to synchronization with
3296  * receiver thread.
3297  */
3298  PollGuard poll_guard(*ndb);
3299 
3300  if (unlikely(ndb->getNodeSequence(nodeId) != seq))
3301  {
3302  setErrorCode(Err_NodeFailCausedAbort);
3303  return -1; // Transporter disconnected and reconnected, no need to close
3304  }
3305 
3306  /* Wait for outstanding scan results from current batch fetch */
3307  while (m_pendingFrags > 0)
3308  {
3309  const FetchResult result = static_cast<FetchResult>
3310  (poll_guard.wait_scan(3*timeout, nodeId, forceSend));
3311 
3312  if (unlikely(ndb->getNodeSequence(nodeId) != seq))
3313  setFetchTerminated(Err_NodeFailCausedAbort,false);
3314  else if (unlikely(result != FetchResult_ok))
3315  {
3316  if (result == FetchResult_timeOut)
3317  setFetchTerminated(Err_ReceiveTimedOut,false);
3318  else
3319  setFetchTerminated(Err_NodeFailCausedAbort,false);
3320  }
3321  if (hasReceivedError())
3322  {
3323  break;
3324  }
3325  } // while
3326 
// NOTE(review): the loop above can exit through 'break' on error; this
// assert then relies on the error path having reset m_pendingFrags
// (presumably done by setFetchTerminated()) - confirm.
3327  assert(m_pendingFrags==0);
3328  NdbRootFragment::clear(m_rootFrags,m_rootFragCount);
3329  m_errorReceived = 0; // Clear errors caused by previous fetching
3330  m_error.code = 0;
3331 
3332  if (m_finalBatchFrags < getRootFragCount()) // TC has an open scan cursor.
3333  {
3334  /* Send SCAN_NEXTREQ(close) */
3335  const int error = sendClose(m_transaction.getConnectedNodeId());
3336  if (unlikely(error))
3337  return error;
3338 
3339  assert(m_finalBatchFrags+m_pendingFrags==getRootFragCount());
3340 
3341  /* Wait for close to be confirmed: */
3342  while (m_pendingFrags > 0)
3343  {
3344  const FetchResult result = static_cast<FetchResult>
3345  (poll_guard.wait_scan(3*timeout, nodeId, forceSend));
3346 
3347  if (unlikely(ndb->getNodeSequence(nodeId) != seq))
3348  setFetchTerminated(Err_NodeFailCausedAbort,false);
3349  else if (unlikely(result != FetchResult_ok))
3350  {
3351  if (result == FetchResult_timeOut)
3352  setFetchTerminated(Err_ReceiveTimedOut,false);
3353  else
3354  setFetchTerminated(Err_NodeFailCausedAbort,false);
3355  }
3356  if (hasReceivedError())
3357  {
3358  break;
3359  }
3360  } // while
3361  } // if
3362 
// Errors detected while waiting for the close confirmation do not change
// the return value.
3363  return 0;
3364 } //NdbQueryImpl::closeTcCursor
3365 
3366 
3367 /*
3368  * This method is called with the PollGuard mutex held on the transporter.
3369  */
3370 int
3371 NdbQueryImpl::sendClose(int nodeId)
3372 {
3373  assert(m_finalBatchFrags < getRootFragCount());
3374  m_pendingFrags = getRootFragCount() - m_finalBatchFrags;
3375 
3376  Ndb& ndb = *m_transaction.getNdb();
3377  NdbApiSignal tSignal(&ndb);
3378  tSignal.setSignal(GSN_SCAN_NEXTREQ, refToBlock(m_scanTransaction->m_tcRef));
3379  ScanNextReq * const scanNextReq = CAST_PTR(ScanNextReq, tSignal.getDataPtrSend());
3380 
3381  assert (m_scanTransaction);
3382  const Uint64 transId = m_scanTransaction->getTransactionId();
3383 
3384  scanNextReq->apiConnectPtr = m_scanTransaction->theTCConPtr;
3385  scanNextReq->stopScan = true;
3386  scanNextReq->transId1 = (Uint32) transId;
3387  scanNextReq->transId2 = (Uint32) (transId >> 32);
3388  tSignal.setLength(ScanNextReq::SignalLength);
3389 
3390  NdbImpl * impl = ndb.theImpl;
3391  return impl->sendSignal(&tSignal, nodeId);
3392 
3393 } // NdbQueryImpl::sendClose()
3394 
3395 
3396 int NdbQueryImpl::isPrunable(bool& prunable)
3397 {
3398  if (m_prunability == Prune_Unknown)
3399  {
3400  const int error = getRoot().getQueryOperationDef()
3401  .checkPrunable(m_keyInfo, m_shortestBound, prunable, m_pruneHashVal);
3402  if (unlikely(error != 0))
3403  {
3404  prunable = false;
3405  setErrorCode(error);
3406  return -1;
3407  }
3408  m_prunability = prunable ? Prune_Yes : Prune_No;
3409  }
3410  prunable = (m_prunability == Prune_Yes);
3411  return 0;
3412 }
3413 
3414 
3415 /****************
3416  * NdbQueryImpl::OrderedFragSet methods.
3417  ***************/
3418 
// Construct an empty fragment set: no capacity, all counters zero, ordering
// 'void' and no records or fragment arrays. prepare() must be called before
// the set can hold any fragments.
3419 NdbQueryImpl::OrderedFragSet::OrderedFragSet():
3420  m_capacity(0),
3421  m_activeFragCount(0),
3422  m_fetchMoreFragCount(0),
3423  m_finalFragCount(0),
3424  m_ordering(NdbQueryOptions::ScanOrdering_void),
3425  m_keyRecord(NULL),
3426  m_resultRecord(NULL),
3427  m_activeFrags(NULL),
3428  m_fetchMoreFrags(NULL)
3429 {
3430 }
3431 
3432 NdbQueryImpl::OrderedFragSet::~OrderedFragSet()
3433 {
3434  m_activeFrags = NULL;
3435  m_fetchMoreFrags = NULL;
3436 }
3437 
3438 void NdbQueryImpl::OrderedFragSet::clear()
3439 {
3440  m_activeFragCount = 0;
3441  m_fetchMoreFragCount = 0;
3442 }
3443 
// Prepare the set for holding up to 'capacity' fragments.
// Allocates (from 'allocator') and zero-fills the m_activeFrags[] and
// m_fetchMoreFrags[] pointer arrays, then records the ordering and the two
// records for later use by this set.
// Must only be called once, on a set that has not been prepared.
// NOTE: the declaration line of the 'ordering' parameter (between
// 'allocator' and 'capacity') is not present in this listing.
3444 void
3445 NdbQueryImpl::OrderedFragSet::prepare(NdbBulkAllocator& allocator,
3447  int capacity,
3448  const NdbRecord* keyRecord,
3449  const NdbRecord* resultRecord)
3450 {
3451  assert(m_activeFrags==NULL);
3452  assert(m_capacity==0);
3453  assert(ordering!=NdbQueryOptions::ScanOrdering_void);
3454 
// With capacity <= 0 both arrays stay NULL and m_capacity stays 0.
3455  if (capacity > 0)
3456  {
3457  m_capacity = capacity;
3458 
3459  m_activeFrags =
3460  reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity));
3461  bzero(m_activeFrags, capacity * sizeof(NdbRootFragment*));
3462 
3463  m_fetchMoreFrags =
3464  reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity));
3465  bzero(m_fetchMoreFrags, capacity * sizeof(NdbRootFragment*));
3466  }
3467  m_ordering = ordering;
3468  m_keyRecord = keyRecord;
3469  m_resultRecord = resultRecord;
3470 } // OrderedFragSet::prepare()
3471 
3472 
// Get the fragment from which the next row should be taken: the last entry
// of m_activeFrags[], or NULL if no fragment is currently available.
// NOTE: the return type line and the condition guarding the first block are
// not present in this listing. Presumably the block applies to ordered
// scans only - TODO confirm.
3481 NdbQueryImpl::OrderedFragSet::getCurrent() const
3482 {
3484  {
// Return NULL as long as some fragment is neither active nor finished.
3489  if (unlikely(m_activeFragCount+m_finalFragCount < m_capacity))
3490  {
3491  return NULL;
3492  }
3493  }
3494 
3495  if (unlikely(m_activeFragCount==0))
3496  {
3497  return NULL;
3498  }
3499  else
3500  {
3501  assert(!m_activeFrags[m_activeFragCount-1]->isEmpty());
3502  return m_activeFrags[m_activeFragCount-1];
3503  }
3504 } // OrderedFragSet::getCurrent()
3505 
// Restore the invariants of the set after the last entry of m_activeFrags[]
// has changed (a row was consumed from it, or it was just added):
//  - If that fragment is empty it is removed from the active set, and is
//    either counted as finished or queued in m_fetchMoreFrags[].
//  - Otherwise it is moved to its sorted position among the other active
//    fragments.
// NOTE: the condition guarding the reordering block is not present in this
// listing; per the comment preceding it, it selects sorted scans.
3513 void
3514 NdbQueryImpl::OrderedFragSet::reorganize()
3515 {
3516  assert(m_activeFragCount > 0);
3517  NdbRootFragment* const frag = m_activeFrags[m_activeFragCount-1];
3518 
3519  // Remove the current fragment if the batch has been emptied.
3520  if (frag->isEmpty())
3521  {
3522  if (frag->finalBatchReceived())
3523  {
3524  m_finalFragCount++;
3525  }
3526  else
3527  {
3528  m_fetchMoreFrags[m_fetchMoreFragCount++] = frag;
3529  }
3530  m_activeFragCount--;
3531  assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount
3532  <= m_capacity);
3533 
3534  return; // Remaining m_activeFrags[] are sorted
3535  }
3536 
3537  // Reorder fragments if this is a sorted scan.
3539  {
// Binary search for the position where 'frag' belongs. The array is kept so
// that compare(m_activeFrags[i], m_activeFrags[i+1]) >= 0 for all i (see
// verifySortOrder()).
3547  int first = 0;
3548  int last = m_activeFragCount-1;
3549  int middle = (first+last)/2;
3550 
3551  while (first<last)
3552  {
3553  assert(middle<m_activeFragCount);
3554  const int cmpRes = compare(*frag, *m_activeFrags[middle]);
3555  if (cmpRes < 0)
3556  {
// 'frag' sorts after 'middle': continue in the upper half.
3557  first = middle + 1;
3558  }
3559  else if (cmpRes == 0)
3560  {
// Equal: 'middle' is an acceptable position, terminate the search.
3561  last = first = middle;
3562  }
3563  else
3564  {
3565  last = middle;
3566  }
3567  middle = (first+last)/2;
3568  }
3569 
3570  // Move into correct sorted position
3571  if (middle < m_activeFragCount-1)
3572  {
3573  assert(compare(*frag, *m_activeFrags[middle]) >= 0);
// Shift entries [middle .. count-2] one step up, overwriting the old slot
// of 'frag' at the end, then insert 'frag' at 'middle'.
3574  memmove(m_activeFrags+middle+1,
3575  m_activeFrags+middle,
3576  (m_activeFragCount - middle - 1) * sizeof(NdbRootFragment*));
3577  m_activeFrags[middle] = frag;
3578  }
3579  assert(verifySortOrder());
3580  }
3581  assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount
3582  <= m_capacity);
3583 } // OrderedFragSet::reorganize()
3584 
3585 void
3586 NdbQueryImpl::OrderedFragSet::add(NdbRootFragment& frag)
3587 {
3588  assert(m_activeFragCount+m_finalFragCount < m_capacity);
3589 
3590  m_activeFrags[m_activeFragCount++] = &frag; // Add avail fragment
3591  reorganize(); // Move into position
3592 } // OrderedFragSet::add()
3593 
3594 void
3595 NdbQueryImpl::OrderedFragSet::prepareMoreResults(NdbRootFragment rootFrags[], Uint32 cnt)
3596 {
3597  for (Uint32 fragNo = 0; fragNo < cnt; fragNo++)
3598  {
3599  NdbRootFragment& rootFrag = rootFrags[fragNo];
3600  if (rootFrag.hasReceivedMore()) // Another ResultSet is available
3601  {
3602  rootFrag.grabNextResultSet(); // Get new ResultSet.
3603  add(rootFrag); // Make avail. to appl. thread
3604  }
3605  } // for all 'rootFrags[]'
3606 
3607  assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount
3608  <= m_capacity);
3609 } // OrderedFragSet::prepareMoreResults()
3610 
3611 Uint32
3612 NdbQueryImpl::OrderedFragSet::getFetchMore(NdbRootFragment** &frags)
3613 {
3614  const int cnt = m_fetchMoreFragCount;
3615  frags = m_fetchMoreFrags;
3616  m_fetchMoreFragCount = 0;
3617  return cnt;
3618 }
3619 
3620 bool
3621 NdbQueryImpl::OrderedFragSet::verifySortOrder() const
3622 {
3623  for (int i = 0; i<m_activeFragCount-1; i++)
3624  {
3625  if (compare(*m_activeFrags[i], *m_activeFrags[i+1]) < 0)
3626  {
3627  assert(false);
3628  return false;
3629  }
3630  }
3631  return true;
3632 }
3633 
// Compare two root fragments, 'frag1' and 'frag2', for use in an ordered
// scan.
// Returns -1 if frag1 is empty and frag2 is not, 0 if both are empty, else
// the result of compare_ndbrecord() on the receivers of result stream 0 of
// each fragment.
// NOTE: the first line of the signature (function name and the 'frag1'
// parameter) is not present in this listing.
3639 int
3641  const NdbRootFragment& frag2) const
3642 {
3643  assert(m_ordering!=NdbQueryOptions::ScanOrdering_unordered);
3644 
3645  /* f1<f2 if f1 is empty but f2 is not.*/
3646  if(frag1.isEmpty())
3647  {
3648  if(!frag2.isEmpty())
3649  {
3650  return -1;
3651  }
3652  else
3653  {
3654  return 0;
3655  }
3656  }
3657 
// NOTE(review): frag2 is not checked for being empty on this path -
// presumably callers only pass a non-empty frag2 here; confirm.
3658  /* Neither stream is empty so we must compare records.*/
3659  return compare_ndbrecord(&frag1.getResultStream(0).getReceiver(),
3660  &frag2.getResultStream(0).getReceiver(),
3661  m_keyRecord,
3662  m_resultRecord,
3663  m_ordering
3664  == NdbQueryOptions::ScanOrdering_descending,
3665  false);
3666 }
3667 
3668 
3669 
3673 
// Construct the operation object for operation definition 'def' as part of
// the query 'queryImpl'.
// Links the operation to its parent (if any) and takes over the ordering
// from the definition when that is an ordered index scan with an ordering
// set.
// On detected memory exhaustion Err_MemoryAlloc is set on the query and the
// remaining setup is skipped.
3674 NdbQueryOperationImpl::NdbQueryOperationImpl(
3675  NdbQueryImpl& queryImpl,
3676  const NdbQueryOperationDefImpl& def):
3677  m_interface(*this),
3678  m_magic(MAGIC),
3679  m_queryImpl(queryImpl),
3680  m_operationDef(def),
3681  m_parent(NULL),
3682  m_children(def.getNoOfChildOperations()),
3683  m_maxBatchRows(0), // >0: User specified prefered value, ==0: Use default CFG values
3684  m_params(),
3685  m_resultBuffer(NULL),
3686  m_resultRef(NULL),
3687  m_isRowNull(true),
3688  m_ndbRecord(NULL),
3689  m_read_mask(NULL),
3690  m_firstRecAttr(NULL),
3691  m_lastRecAttr(NULL),
3692  m_ordering(NdbQueryOptions::ScanOrdering_unordered),
3693  m_interpretedCode(NULL),
3694  m_diskInUserProjection(false),
// Only the root operation (index 0) defaults to max parallelism.
3695  m_parallelism(def.getQueryOperationIx() == 0
3696  ? Parallelism_max : Parallelism_adaptive),
3697  m_rowSize(0xffffffff)
3698 {
// NOTE(review): errno is not reset before this check, so an ENOMEM left
// over from an earlier call would presumably be taken for an allocation
// failure here - confirm.
3699  if (errno == ENOMEM)
3700  {
3701  // Memory allocation in Vector() (for m_children) assumed to have failed.
3702  queryImpl.setErrorCode(Err_MemoryAlloc);
3703  return;
3704  }
3705  // Fill in operations parent refs, and append it as child of its parent
3706  const NdbQueryOperationDefImpl* parent = def.getParentOperation();
3707  if (parent != NULL)
3708  {
3709  const Uint32 ix = parent->getQueryOperationIx();
3710  assert (ix < m_queryImpl.getNoOfOperations());
3711  m_parent = &m_queryImpl.getQueryOperation(ix);
3712  const int res = m_parent->m_children.push_back(this);
// 'res' is only inspected by the assert below; UNUSED() silences the
// unused-variable warning when asserts are compiled out.
3713  UNUSED(res);
3718  assert(res == 0);
3719  }
3720  if (def.getType()==NdbQueryOperationDef::OrderedIndexScan)
3721  {
3722  const NdbQueryOptions::ScanOrdering defOrdering =
3723  static_cast<const NdbQueryIndexScanOperationDefImpl&>(def).getOrdering();
3724  if (defOrdering != NdbQueryOptions::ScanOrdering_void)
3725  {
3726  // Use value from definition, if one was set.
3727  m_ordering = defOrdering;
3728  }
3729  }
3730 }
3731 
// Destructor. Releases nothing itself; it only asserts that the RecAttr
// list and the interpreted code have already been released (both are
// cleared by postFetchRelease()).
3732 NdbQueryOperationImpl::~NdbQueryOperationImpl()
3733 {
3738  assert (m_firstRecAttr == NULL);
3739  assert (m_interpretedCode == NULL);
3740 } //NdbQueryOperationImpl::~NdbQueryOperationImpl()
3741 
3746 void
3747 NdbQueryOperationImpl::postFetchRelease()
3748 {
3749  Ndb* const ndb = m_queryImpl.getNdbTransaction().getNdb();
3750  NdbRecAttr* recAttr = m_firstRecAttr;
3751  while (recAttr != NULL) {
3752  NdbRecAttr* saveRecAttr = recAttr;
3753  recAttr = recAttr->next();
3754  ndb->releaseRecAttr(saveRecAttr);
3755  }
3756  m_firstRecAttr = NULL;
3757 
3758  // Set API exposed info to indicate NULL-row
3759  m_isRowNull = true;
3760  if (m_resultRef!=NULL) {
3761  *m_resultRef = NULL;
3762  }
3763 
3764  // TODO: Consider if interpretedCode can be deleted imm. after ::doSend
3765  delete m_interpretedCode;
3766  m_interpretedCode = NULL;
3767 } //NdbQueryOperationImpl::postFetchRelease()
3768 
3769 
3770 Uint32
3771 NdbQueryOperationImpl::getNoOfParentOperations() const
3772 {
3773  return (m_parent) ? 1 : 0;
3774 }
3775 
// Get the parent operation by index. As an operation has at most one
// parent, 'i' must be 0 and a parent must exist (asserted).
// NOTE: the return type line is not present in this listing; the body
// returns '*m_parent'.
3777 NdbQueryOperationImpl::getParentOperation(Uint32 i) const
3778 {
3779  assert(i==0 && m_parent!=NULL);
3780  return *m_parent;
3781 }
// Get the parent operation; the returned m_parent is NULL when this
// operation has no parent.
// NOTE: the return type line is not present in this listing.
3783 NdbQueryOperationImpl::getParentOperation() const
3784 {
3785  return m_parent;
3786 }
3787 
3788 Uint32
3789 NdbQueryOperationImpl::getNoOfChildOperations() const
3790 {
3791  return m_children.size();
3792 }
3793 
// Get child operation number 'i'. No range check is done here on 'i'.
// NOTE: the return type line is not present in this listing; the body
// returns '*m_children[i]'.
3795 NdbQueryOperationImpl::getChildOperation(Uint32 i) const
3796 {
3797  return *m_children[i];
3798 }
3799 
3800 Int32 NdbQueryOperationImpl::getNoOfDescendantOperations() const
3801 {
3802  Int32 children = 0;
3803 
3804  for (unsigned i = 0; i < getNoOfChildOperations(); i++)
3805  children += 1 + getChildOperation(i).getNoOfDescendantOperations();
3806 
3807  return children;
3808 }
3809 
3810 Uint32
3811 NdbQueryOperationImpl::getNoOfLeafOperations() const
3812 {
3813  if (getNoOfChildOperations() == 0)
3814  {
3815  return 1;
3816  }
3817  else
3818  {
3819  Uint32 sum = 0;
3820  for (unsigned i = 0; i < getNoOfChildOperations(); i++)
3821  sum += getChildOperation(i).getNoOfLeafOperations();
3822 
3823  return sum;
3824  }
3825 }
3826 
3827 NdbRecAttr*
3828 NdbQueryOperationImpl::getValue(
3829  const char* anAttrName,
3830  char* resultBuffer)
3831 {
3832  const NdbColumnImpl* const column
3833  = m_operationDef.getTable().getColumn(anAttrName);
3834  if(unlikely(column==NULL)){
3835  getQuery().setErrorCode(Err_UnknownColumn);
3836  return NULL;
3837  } else {
3838  return getValue(*column, resultBuffer);
3839  }
3840 }
3841 
3842 NdbRecAttr*
3843 NdbQueryOperationImpl::getValue(
3844  Uint32 anAttrId,
3845  char* resultBuffer)
3846 {
3847  const NdbColumnImpl* const column
3848  = m_operationDef.getTable().getColumn(anAttrId);
3849  if(unlikely(column==NULL)){
3850  getQuery().setErrorCode(Err_UnknownColumn);
3851  return NULL;
3852  } else {
3853  return getValue(*column, resultBuffer);
3854  }
3855 }
3856 
3857 NdbRecAttr*
3858 NdbQueryOperationImpl::getValue(
3859  const NdbColumnImpl& column,
3860  char* resultBuffer)
3861 {
3862  if (unlikely(getQuery().m_state != NdbQueryImpl::Defined)) {
3863  int state = getQuery().m_state;
3864  assert (state >= NdbQueryImpl::Initial && state < NdbQueryImpl::Destructed);
3865 
3866  if (state == NdbQueryImpl::Failed)
3867  getQuery().setErrorCode(QRY_IN_ERROR_STATE);
3868  else
3869  getQuery().setErrorCode(QRY_ILLEGAL_STATE);
3870  DEBUG_CRASH();
3871  return NULL;
3872  }
3873  Ndb* const ndb = getQuery().getNdbTransaction().getNdb();
3874  NdbRecAttr* const recAttr = ndb->getRecAttr();
3875  if(unlikely(recAttr == NULL)) {
3876  getQuery().setErrorCode(Err_MemoryAlloc);
3877  return NULL;
3878  }
3879  if(unlikely(recAttr->setup(&column, resultBuffer))) {
3880  ndb->releaseRecAttr(recAttr);
3881  getQuery().setErrorCode(Err_MemoryAlloc);
3882  return NULL;
3883  }
3884  // Append to tail of list.
3885  if(m_firstRecAttr == NULL){
3886  m_firstRecAttr = recAttr;
3887  }else{
3888  m_lastRecAttr->next(recAttr);
3889  }
3890  m_lastRecAttr = recAttr;
3891  assert(recAttr->next()==NULL);
3892  return recAttr;
3893 }
3894 
3895 int
3896 NdbQueryOperationImpl::setResultRowBuf (
3897  const NdbRecord *rec,
3898  char* resBuffer,
3899  const unsigned char* result_mask)
3900 {
3901  if (unlikely(rec==0)) {
3902  getQuery().setErrorCode(QRY_REQ_ARG_IS_NULL);
3903  return -1;
3904  }
3905  if (unlikely(getQuery().m_state != NdbQueryImpl::Defined)) {
3906  int state = getQuery().m_state;
3907  assert (state >= NdbQueryImpl::Initial && state < NdbQueryImpl::Destructed);
3908 
3909  if (state == NdbQueryImpl::Failed)
3910  getQuery().setErrorCode(QRY_IN_ERROR_STATE);
3911  else
3912  getQuery().setErrorCode(QRY_ILLEGAL_STATE);
3913  DEBUG_CRASH();
3914  return -1;
3915  }
3916  if (rec->tableId !=
3917  static_cast<Uint32>(m_operationDef.getTable().getTableId())){
3918  /* The key_record and attribute_record in primary key operation do not
3919  belong to the same table.*/
3920  getQuery().setErrorCode(Err_DifferentTabForKeyRecAndAttrRec);
3921  return -1;
3922  }
3923  if (unlikely(m_ndbRecord != NULL)) {
3924  getQuery().setErrorCode(QRY_RESULT_ROW_ALREADY_DEFINED);
3925  return -1;
3926  }
3927  m_ndbRecord = rec;
3928  m_read_mask = result_mask;
3929  m_resultBuffer = resBuffer;
3930  return 0;
3931 }
3932 
3933 int
3934 NdbQueryOperationImpl::setResultRowRef (
3935  const NdbRecord* rec,
3936  const char* & bufRef,
3937  const unsigned char* result_mask)
3938 {
3939  m_resultRef = &bufRef;
3940  *m_resultRef = NULL; // No result row yet
3941  return setResultRowBuf(rec, NULL, result_mask);
3942 }
3943 
// Position this operation on its first result row within the current root
// fragment.
//
// Returns NdbQuery::NextResult_error (with QRY_IN_ERROR_STATE or
// QRY_ILLEGAL_STATE set on the query) unless the query state is at least
// 'Executing' and below 'Closed'. Returns NextResult_gotRow after fetchRow()
// when the result stream has a first row. Otherwise the result is nullified
// and NextResult_scanComplete is returned.
//
// The root-operation variant is compiled out (#if 0), so only the branch that
// asserts a parent exists is active.
// NOTE(review): the listing numbers jump from 3942 to 3945, so the line that
// would hold the return type appears to be missing from this text - confirm
// against the original file.
3945 NdbQueryOperationImpl::firstResult()
3946 {
3947  if (unlikely(getQuery().m_state < NdbQueryImpl::Executing ||
3948  getQuery().m_state >= NdbQueryImpl::Closed)) {
3949  int state = getQuery().m_state;
3950  assert (state >= NdbQueryImpl::Initial && state < NdbQueryImpl::Destructed);
3951  if (state == NdbQueryImpl::Failed)
3952  getQuery().setErrorCode(QRY_IN_ERROR_STATE);
3953  else
3954  getQuery().setErrorCode(QRY_ILLEGAL_STATE);
3955  DEBUG_CRASH();
3956  return NdbQuery::NextResult_error;
3957  }
3958 
3959  const NdbRootFragment* rootFrag;
3960 
3961 #if 0 // TODO ::firstResult() on root operation is unused, incomplete & untested
3962  if (unlikely(getParentOperation()==NULL))
3963  {
3964  // Reset *all* ResultStreams, optionaly order them, and find new current among them
3965  for( Uint32 i = 0; i<m_queryImpl.getRootFragCount(); i++)
3966  {
3967  m_resultStreams[i]->firstResult();
3968  }
3969  rootFrag = m_queryImpl.m_applFrags.reorganize();
3970  assert(rootFrag==NULL || rootFrag==m_queryImpl.m_applFrags.getCurrent());
3971  }
3972  else
3973 #endif
3974 
3975  {
3976  assert(getParentOperation()!=NULL); // TODO, See above
3977  rootFrag = m_queryImpl.m_applFrags.getCurrent();
3978  }
3979 
 // A NULL rootFrag is treated like a stream without rows: fall through to
 // nullifyResult() below.
3980  if (rootFrag != NULL)
3981  {
3982  NdbResultStream& resultStream = rootFrag->getResultStream(*this);
3983  if (resultStream.firstResult() != tupleNotFound)
3984  {
3985  fetchRow(resultStream);
3986  return NdbQuery::NextResult_gotRow;
3987  }
3988  }
3989  nullifyResult();
3990  return NdbQuery::NextResult_scanComplete;
3991 } //NdbQueryOperationImpl::firstResult()
3992 
3993 
// Advance this operation to its next result row.
//
// Returns NdbQuery::NextResult_error (with QRY_IN_ERROR_STATE or
// QRY_ILLEGAL_STATE set on the query) unless the query state is at least
// 'Executing' and below 'Closed'.
//  - On the root operation the call is forwarded to
//    m_queryImpl.nextRootResult(fetchAllowed, forceSend).
//  - On a non-root scan operation the result stream of the current root
//    fragment is advanced; a found row is fetched and NextResult_gotRow is
//    returned.
//  - In every other case (non-root non-scan operation, no current root
//    fragment, or no further row) the result is nullified and
//    NextResult_scanComplete is returned.
// NOTE(review): the listing numbers jump from 3993 to 3995, so the line that
// would hold the return type appears to be missing from this text - confirm
// against the original file.
3995 NdbQueryOperationImpl::nextResult(bool fetchAllowed, bool forceSend)
3996 {
3997  if (unlikely(getQuery().m_state < NdbQueryImpl::Executing ||
3998  getQuery().m_state >= NdbQueryImpl::Closed)) {
3999  int state = getQuery().m_state;
4000  assert (state >= NdbQueryImpl::Initial && state < NdbQueryImpl::Destructed);
4001  if (state == NdbQueryImpl::Failed)
4002  getQuery().setErrorCode(QRY_IN_ERROR_STATE);
4003  else
4004  getQuery().setErrorCode(QRY_ILLEGAL_STATE);
4005  DEBUG_CRASH();
4006  return NdbQuery::NextResult_error;
4007  }
4008 
4009  if (this == &getRoot())
4010  {
4011  return m_queryImpl.nextRootResult(fetchAllowed,forceSend);
4012  }
4017  else if (m_operationDef.isScanOperation())
4018  {
4019  const NdbRootFragment* rootFrag = m_queryImpl.m_applFrags.getCurrent();
4020  if (rootFrag!=NULL)
4021  {
4022  NdbResultStream& resultStream = rootFrag->getResultStream(*this);
4023  if (resultStream.nextResult() != tupleNotFound)
4024  {
4025  fetchRow(resultStream);
4026  return NdbQuery::NextResult_gotRow;
4027  }
4028  }
4029  }
4030  nullifyResult();
4031  return NdbQuery::NextResult_scanComplete;
4032 } //NdbQueryOperationImpl::nextResult()
4033 
4034 
4035 void
4036 NdbQueryOperationImpl::fetchRow(NdbResultStream& resultStream)
4037 {
4038  const char* buff = resultStream.getCurrentRow();
4039  assert(buff!=NULL || (m_firstRecAttr==NULL && m_ndbRecord==NULL));
4040 
4041  m_isRowNull = false;
4042  if (m_firstRecAttr != NULL)
4043  {
4044  NdbRecAttr* recAttr = m_firstRecAttr;
4045  Uint32 posInRow = 0;
4046  while (recAttr != NULL)
4047  {
4048  const char *attrData = NULL;
4049  Uint32 attrSize = 0;
4050  const int retVal1 = resultStream.getReceiver()
4051  .getScanAttrData(attrData, attrSize, posInRow);
4052  UNUSED(retVal1);
4053  assert(retVal1==0);
4054  assert(attrData!=NULL);
4055  const bool retVal2 = recAttr
4056  ->receive_data(reinterpret_cast<const Uint32*>(attrData), attrSize);
4057  UNUSED(retVal2);
4058  assert(retVal2);
4059  recAttr = recAttr->next();
4060  }
4061  }
4062  if (m_ndbRecord != NULL)
4063  {
4064  if (m_resultRef!=NULL)
4065  {
4066  // Set application pointer to point into internal buffer.
4067  *m_resultRef = buff;
4068  }
4069  else
4070  {
4071  assert(m_resultBuffer!=NULL);
4072  // Copy result to buffer supplied by application.
4073  memcpy(m_resultBuffer, buff,
4074  resultStream.getReceiver().m_record.m_ndb_record->m_row_size);
4075  }
4076  }
4077 } // NdbQueryOperationImpl::fetchRow
4078 
4079 
4080 void
4081 NdbQueryOperationImpl::nullifyResult()
4082 {
4083  if (!m_isRowNull)
4084  {
4085  /* This operation gave no result for the current row.*/
4086  m_isRowNull = true;
4087  if (m_resultRef!=NULL)
4088  {
4089  // Set the pointer supplied by the application to NULL.
4090  *m_resultRef = NULL;
4091  }
4092  /* We should not give any results for the descendants either.*/
4093  for (Uint32 i = 0; i<getNoOfChildOperations(); i++)
4094  {
4095  getChildOperation(i).nullifyResult();
4096  }
4097  }
4098 } // NdbQueryOperationImpl::nullifyResult
4099 
4100 bool
4101 NdbQueryOperationImpl::isRowNULL() const
4102 {
4103  return m_isRowNull;
4104 }
4105 
4106 bool
4107 NdbQueryOperationImpl::isRowChanged() const
4108 {
4109  // FIXME: Need to be implemented as scan linked with scan is now implemented.
4110  return true;
4111 }
4112 
/**
 * Test bit 'bitNo' of the byte array 'mask'.
 * Bit 0 is the least significant bit of mask[0]; bit 8 is the least
 * significant bit of mask[1], and so on.
 */
static bool isSetInMask(const unsigned char* mask, int bitNo)
{
  const unsigned char maskByte = mask[bitNo >> 3];
  const int bitInByte = 1 << (bitNo & 7);
  return (maskByte & bitInByte) != 0;
}
4117 
// Serialize the projection (the set of attributes to read) of this operation
// into 'attrInfo'.
//
// Layout produced: one length word (back patched at the end), followed by
//  - for a NdbRecord result: either a single READ_ALL header (when as many
//    columns are requested as the table has) or a READ_PACKED header plus a
//    bitmap of the requested attribute ids,
//  - one attribute header per RecAttr registered with getValue(),
//  - a CORR_FACTOR64 header when the query is a scan query.
// m_diskInUserProjection is set as a side effect if any requested column has
// disk storage type. Always returns 0.
//
// NOTE(review): the listing numbers skip the single lines 4131 and 4178.
// 'readMask' and the start of the statement ending in "recAttr->attrId(), 0);"
// are used below without being visible, so those two code lines appear to be
// missing from this text - confirm against the original file.
4118 int
4119 NdbQueryOperationImpl::serializeProject(Uint32Buffer& attrInfo)
4120 {
4121  Uint32 startPos = attrInfo.getSize();
4122  attrInfo.append(0U); // Temp write first 'length' word, update later
4123 
4130  if (m_ndbRecord != NULL) {
4132  Uint32 requestedCols= 0;
4133  Uint32 maxAttrId= 0;
4134 
4135  for (Uint32 i= 0; i<m_ndbRecord->noOfColumns; i++)
4136  {
4137  const NdbRecord::Attr* const col= &m_ndbRecord->columns[i];
4138  Uint32 attrId= col->attrId;
4139 
 // The read mask is indexed by position in the record ('i'), while
 // readMask is indexed by attribute id.
4140  if (m_read_mask == NULL || isSetInMask(m_read_mask, i))
4141  { if (attrId > maxAttrId)
4142  maxAttrId= attrId;
4143 
4144  readMask.set(attrId);
4145  requestedCols++;
4146 
4147  const NdbColumnImpl* const column = getQueryOperationDef().getTable()
4148  .getColumn(col->column_no);
4149  if (column->getStorageType() == NDB_STORAGETYPE_DISK)
4150  {
4151  m_diskInUserProjection = true;
4152  }
4153  }
4154  }
4155 
4156  // Test for special case, get all columns:
4157  if (requestedCols == (unsigned)m_operationDef.getTable().getNoOfColumns()) {
4158  Uint32 ah;
4159  AttributeHeader::init(&ah, AttributeHeader::READ_ALL, requestedCols);
4160  attrInfo.append(ah);
4161  } else if (requestedCols > 0) {
4162  /* Serialize projection as a bitmap.*/
4163  const Uint32 wordCount = 1+maxAttrId/32; // Size of mask.
 // NOTE(review): the result of alloc() is used without a NULL check here,
 // whereas prepareAttrInfo() checks attrInfo.addr() for NULL - confirm that
 // alloc() cannot return NULL.
4164  Uint32* dst = attrInfo.alloc(wordCount+1);
4165  AttributeHeader::init(dst,
4166  AttributeHeader::READ_PACKED, 4*wordCount);
4167  memcpy(dst+1, &readMask, 4*wordCount);
4168  }
4169  } // if (m_ndbRecord...)
4170 
4174  const NdbRecAttr* recAttr = m_firstRecAttr;
4175  /* Serialize projection as a list of Attribute id's.*/
4176  while (recAttr) {
4177  Uint32 ah;
4179  recAttr->attrId(),
4180  0);
4181  attrInfo.append(ah);
4182  if (recAttr->getColumn()->getStorageType() == NDB_STORAGETYPE_DISK)
4183  {
4184  m_diskInUserProjection = true;
4185  }
4186  recAttr = recAttr->next();
4187  }
4188 
4189  bool withCorrelation = getRoot().getQueryDef().isScanQuery();
4190  if (withCorrelation) {
4191  Uint32 ah;
4192  AttributeHeader::init(&ah, AttributeHeader::CORR_FACTOR64, 0);
4193  attrInfo.append(ah);
4194  }
4195 
4196  // Size of projection in words.
 // The length word itself is excluded, hence the "- 1".
4197  Uint32 length = attrInfo.getSize() - startPos - 1 ;
4198  attrInfo.put(startPos, length);
4199  return 0;
4200 } // NdbQueryOperationImpl::serializeProject
4201 
4202 int NdbQueryOperationImpl::serializeParams(const NdbQueryParamValue* paramValues)
4203 {
4204  if (unlikely(paramValues == NULL))
4205  {
4206  return QRY_REQ_ARG_IS_NULL;
4207  }
4208 
4209  const NdbQueryOperationDefImpl& def = getQueryOperationDef();
4210  for (Uint32 i=0; i<def.getNoOfParameters(); i++)
4211  {
4212  const NdbParamOperandImpl& paramDef = def.getParameter(i);
4213  const NdbQueryParamValue& paramValue = paramValues[paramDef.getParamIx()];
4214 
4221  const Uint32 oldSize = m_params.getSize();
4222  m_params.append(0); // Place holder for length.
4223  bool null;
4224  Uint32 len;
4225  const int error =
4226  paramValue.serializeValue(*paramDef.getColumn(), m_params, len, null);
4227  if (unlikely(error))
4228  return error;
4229  if (unlikely(null))
4230  return Err_KeyIsNULL;
4231 
4232  if(unlikely(m_params.isMemoryExhausted())){
4233  return Err_MemoryAlloc;
4234  }
4235  // Back patch length field.
4236  m_params.put(oldSize, len);
4237  }
4238  return 0;
4239 } // NdbQueryOperationImpl::serializeParams
4240 
4241 Uint32
4242 NdbQueryOperationImpl
4243 ::calculateBatchedRows(const NdbQueryOperationImpl* closestScan)
4244 {
4245  const NdbQueryOperationImpl* myClosestScan;
4246  if (m_operationDef.isScanOperation())
4247  {
4248  myClosestScan = this;
4249  }
4250  else
4251  {
4252  myClosestScan = closestScan;
4253  }
4254 
4255  Uint32 maxBatchRows = 0;
4256  if (myClosestScan != NULL)
4257  {
4258 
4259  // To force usage of SCAN_NEXTREQ even for small scans resultsets
4260  if (testNextReq)
4261  {
4262  m_maxBatchRows = 1;
4263  }
4264 
4265  const Ndb& ndb = *getQuery().getNdbTransaction().getNdb();
4266 
4277  Uint32 batchByteSize, firstBatchRows;
4283  maxBatchRows = myClosestScan->m_maxBatchRows;
4284  NdbReceiver::calculate_batch_size(* ndb.theImpl,
4285  m_ndbRecord,
4286  m_firstRecAttr,
4287  0, // Key size.
4288  getRoot().m_parallelism
4289  == Parallelism_max ?
4290  m_queryImpl.getRootFragCount() :
4291  getRoot().m_parallelism,
4292  maxBatchRows,
4293  batchByteSize,
4294  firstBatchRows);
4295  assert(maxBatchRows > 0);
4296  assert(firstBatchRows == maxBatchRows);
4297  }
4298 
4299  // Find the largest value that is acceptable to all lookup descendants.
4300  for (Uint32 i = 0; i < m_children.size(); i++)
4301  {
4302  const Uint32 childMaxBatchRows =
4303  m_children[i]->calculateBatchedRows(myClosestScan);
4304  maxBatchRows = MIN(maxBatchRows, childMaxBatchRows);
4305  }
4306 
4307  if (m_operationDef.isScanOperation())
4308  {
4309  // Use this value for current op and all lookup descendants.
4310  m_maxBatchRows = maxBatchRows;
4311  // Return max(Unit32) to avoid interfering with batch size calculation
4312  // for parent.
4313  return 0xffffffff;
4314  }
4315  else
4316  {
4317  return maxBatchRows;
4318  }
4319 } // NdbQueryOperationImpl::calculateBatchedRows
4320 
4321 
4322 void
4323 NdbQueryOperationImpl::setBatchedRows(Uint32 batchedRows)
4324 {
4325  if (!m_operationDef.isScanOperation())
4326  {
4330  m_maxBatchRows = batchedRows;
4331  }
4332 
4333  for (Uint32 i = 0; i < m_children.size(); i++)
4334  {
4335  m_children[i]->setBatchedRows(m_maxBatchRows);
4336  }
4337 }
4338 
4339 int
4340 NdbQueryOperationImpl::prepareAttrInfo(Uint32Buffer& attrInfo)
4341 {
4342  const NdbQueryOperationDefImpl& def = getQueryOperationDef();
4343 
4350  if (def.getType() == NdbQueryOperationDef::UniqueIndexAccess)
4351  {
4352  // Reserve memory for LookupParameters, fill in contents later when
4353  // 'length' and 'requestInfo' has been calculated.
4354  Uint32 startPos = attrInfo.getSize();
4355  attrInfo.alloc(QN_LookupParameters::NodeSize);
4356  Uint32 requestInfo = 0;
4357 
4358  if (m_params.getSize() > 0)
4359  {
4360  // parameter values has been serialized as part of NdbTransaction::createQuery()
4361  // Only need to append it to rest of the serialized arguments
4362  requestInfo |= DABits::PI_KEY_PARAMS;
4363  attrInfo.append(m_params);
4364  }
4365 
4366  QN_LookupParameters* param = reinterpret_cast<QN_LookupParameters*>(attrInfo.addr(startPos));
4367  if (unlikely(param==NULL))
4368  return Err_MemoryAlloc;
4369 
4370  param->requestInfo = requestInfo;
4371  param->resultData = getIdOfReceiver();
4372  Uint32 length = attrInfo.getSize() - startPos;
4373  if (unlikely(length > 0xFFFF)) {
4374  return QRY_DEFINITION_TOO_LARGE; //Query definition too large.
4375  }
4376  QueryNodeParameters::setOpLen(param->len,
4377  QueryNodeParameters::QN_LOOKUP,
4378  length);
4379 
4380 #ifdef __TRACE_SERIALIZATION
4381  ndbout << "Serialized params for index node "
4382  << m_operationDef.getQueryOperationId()-1 << " : ";
4383  for(Uint32 i = startPos; i < attrInfo.getSize(); i++){
4384  char buf[12];
4385  sprintf(buf, "%.8x", attrInfo.get(i));
4386  ndbout << buf << " ";
4387  }
4388  ndbout << endl;
4389 #endif
4390  } // if (UniqueIndexAccess ...
4391 
4392  // Reserve memory for LookupParameters, fill in contents later when
4393  // 'length' and 'requestInfo' has been calculated.
4394  Uint32 startPos = attrInfo.getSize();
4395  Uint32 requestInfo = 0;
4396  bool isRoot = (def.getQueryOperationIx()==0);
4397 
4398  QueryNodeParameters::OpType paramType =
4399  !def.isScanOperation() ? QueryNodeParameters::QN_LOOKUP
4400  : (isRoot) ? QueryNodeParameters::QN_SCAN_FRAG
4401  : QueryNodeParameters::QN_SCAN_INDEX;
4402 
4403  if (paramType == QueryNodeParameters::QN_SCAN_INDEX)
4404  attrInfo.alloc(QN_ScanIndexParameters::NodeSize);
4405  else if (paramType == QueryNodeParameters::QN_SCAN_FRAG)
4406  attrInfo.alloc(QN_ScanFragParameters::NodeSize);
4407  else
4408  attrInfo.alloc(QN_LookupParameters::NodeSize);
4409 
4410  // SPJ block assume PARAMS to be supplied before ATTR_LIST
4411  if (m_params.getSize() > 0 &&
4412  def.getType() != NdbQueryOperationDef::UniqueIndexAccess)
4413  {
4414  // parameter values has been serialized as part of NdbTransaction::createQuery()
4415  // Only need to append it to rest of the serialized arguments
4416  requestInfo |= DABits::PI_KEY_PARAMS;
4417  attrInfo.append(m_params);
4418  }
4419 
4420  if (hasInterpretedCode())
4421  {
4422  requestInfo |= DABits::PI_ATTR_INTERPRET;
4423  const int error= prepareInterpretedCode(attrInfo);
4424  if (unlikely(error))
4425  {
4426  return error;
4427  }
4428  }
4429 
4430  if (m_ndbRecord!=NULL || m_firstRecAttr!=NULL)
4431  {
4432  requestInfo |= DABits::PI_ATTR_LIST;
4433  const int error = serializeProject(attrInfo);
4434  if (unlikely(error)) {
4435  return error;
4436  }
4437  }
4438 
4439  if (diskInUserProjection())
4440  {
4441  requestInfo |= DABits::PI_DISK_ATTR;
4442  }
4443 
4444  Uint32 length = attrInfo.getSize() - startPos;
4445  if (unlikely(length > 0xFFFF)) {
4446  return QRY_DEFINITION_TOO_LARGE; //Query definition too large.
4447  }
4448 
4449  if (paramType == QueryNodeParameters::QN_SCAN_INDEX)
4450  {
4451  QN_ScanIndexParameters* param = reinterpret_cast<QN_ScanIndexParameters*>(attrInfo.addr(startPos));
4452  if (unlikely(param==NULL))
4453  return Err_MemoryAlloc;
4454 
4455  Ndb& ndb = *m_queryImpl.getNdbTransaction().getNdb();
4456 
4457  Uint32 batchRows = getMaxBatchRows();
4458  Uint32 batchByteSize, firstBatchRows;
4459  NdbReceiver::calculate_batch_size(* ndb.theImpl,
4460  m_ndbRecord,
4461  m_firstRecAttr,
4462  0, // Key size.
4463  m_queryImpl.getRootFragCount(),
4464  batchRows,
4465  batchByteSize,
4466  firstBatchRows);
4467  assert(batchRows == firstBatchRows);
4468  assert(batchRows == getMaxBatchRows());
4469  assert(m_parallelism == Parallelism_max ||
4470  m_parallelism == Parallelism_adaptive);
4471  if (m_parallelism == Parallelism_max)
4472  {
4473  requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL;
4474  }
4475  param->requestInfo = requestInfo;
4476  // Check that both values fit in param->batchSize.
4477  assert(getMaxBatchRows() < (1<<QN_ScanIndexParameters::BatchRowBits));
4478  assert(batchByteSize < (1 << (sizeof param->batchSize * 8
4479  - QN_ScanIndexParameters::BatchRowBits)));
4480  param->batchSize = (batchByteSize << 11) | getMaxBatchRows();
4481  param->resultData = getIdOfReceiver();
4482  QueryNodeParameters::setOpLen(param->len, paramType, length);
4483  }
4484  else if (paramType == QueryNodeParameters::QN_SCAN_FRAG)
4485  {
4486  QN_ScanFragParameters* param = reinterpret_cast<QN_ScanFragParameters*>(attrInfo.addr(startPos));
4487  if (unlikely(param==NULL))
4488  return Err_MemoryAlloc;
4489 
4490  param->requestInfo = requestInfo;
4491  param->resultData = getIdOfReceiver();
4492  QueryNodeParameters::setOpLen(param->len, paramType, length);
4493  }
4494  else
4495  {
4496  assert(paramType == QueryNodeParameters::QN_LOOKUP);
4497  QN_LookupParameters* param = reinterpret_cast<QN_LookupParameters*>(attrInfo.addr(startPos));
4498  if (unlikely(param==NULL))
4499  return Err_MemoryAlloc;
4500 
4501  param->requestInfo = requestInfo;
4502  param->resultData = getIdOfReceiver();
4503  QueryNodeParameters::setOpLen(param->len, paramType, length);
4504  }
4505 
4506 #ifdef __TRACE_SERIALIZATION
4507  ndbout << "Serialized params for node "
4508  << m_operationDef.getQueryOperationId() << " : ";
4509  for(Uint32 i = startPos; i < attrInfo.getSize(); i++){
4510  char buf[12];
4511  sprintf(buf, "%.8x", attrInfo.get(i));
4512  ndbout << buf << " ";
4513  }
4514  ndbout << endl;
4515 #endif
4516 
4517  // Parameter values was appended to AttrInfo, shrink param buffer
4518  // to reduce memory footprint.
4519  m_params.releaseExtend();
4520 
4521  return 0;
4522 } // NdbQueryOperationImpl::prepareAttrInfo
4523 
4524 
4525 int
4526 NdbQueryOperationImpl::prepareKeyInfo(
4527  Uint32Buffer& keyInfo,
4528  const NdbQueryParamValue* actualParam)
4529 {
4530  assert(this == &getRoot()); // Should only be called for root operation.
4531 #ifdef TRACE_SERIALIZATION
4532  int startPos = keyInfo.getSize();
4533 #endif
4534 
4535  const NdbQueryOperationDefImpl::IndexBound* bounds = m_operationDef.getBounds();
4536  if (bounds)
4537  {
4538  const int error = prepareIndexKeyInfo(keyInfo, bounds, actualParam);
4539  if (unlikely(error))
4540  return error;
4541  }
4542 
4543  const NdbQueryOperandImpl* const* keys = m_operationDef.getKeyOperands();
4544  if (keys)
4545  {
4546  const int error = prepareLookupKeyInfo(keyInfo, keys, actualParam);
4547  if (unlikely(error))
4548  return error;
4549  }
4550 
4551  if (unlikely(keyInfo.isMemoryExhausted())) {
4552  return Err_MemoryAlloc;
4553  }
4554 
4555 #ifdef TRACE_SERIALIZATION
4556  ndbout << "Serialized KEYINFO for NdbQuery root : ";
4557  for (Uint32 i = startPos; i < keyInfo.getSize(); i++) {
4558  char buf[12];
4559  sprintf(buf, "%.8x", keyInfo.get(i));
4560  ndbout << buf << " ";
4561  }
4562  ndbout << endl;
4563 #endif
4564 
4565  return 0;
4566 } // NdbQueryOperationImpl::prepareKeyInfo
4567 
4568 
4577 static int
4578 serializeConstOp(const NdbConstOperandImpl& constOp,
4579  Uint32Buffer& buffer,
4580  Uint32& len)
4581 {
4582  // Check that column->shrink_varchar() not specified, only used by mySQL
4583  // assert (!(column->flags & NdbDictionary::RecMysqldShrinkVarchar));
4584  buffer.skipRestOfWord();
4585  len = constOp.getSizeInBytes();
4586  Uint8 shortLen[2];
4587  switch (constOp.getColumn()->getArrayType()) {
4588  case NdbDictionary::Column::ArrayTypeFixed:
4589  buffer.appendBytes(constOp.getAddr(), len);
4590  break;
4591 
4592  case NdbDictionary::Column::ArrayTypeShortVar:
4593  if (unlikely(len > 0xFF))
4594  return QRY_CHAR_OPERAND_TRUNCATED;
4595  shortLen[0] = (unsigned char)len;
4596  buffer.appendBytes(shortLen, 1);
4597  buffer.appendBytes(constOp.getAddr(), len);
4598  len+=1;
4599  break;
4600 
4601  case NdbDictionary::Column::ArrayTypeMediumVar:
4602  if (unlikely(len > 0xFFFF))
4603  return QRY_CHAR_OPERAND_TRUNCATED;
4604  shortLen[0] = (unsigned char)(len & 0xFF);
4605  shortLen[1] = (unsigned char)(len >> 8);
4606  buffer.appendBytes(shortLen, 2);
4607  buffer.appendBytes(constOp.getAddr(), len);
4608  len+=2;
4609  break;
4610 
4611  default:
4612  assert(false);
4613  }
4614  if (unlikely(buffer.isMemoryExhausted())) {
4615  return Err_MemoryAlloc;
4616  }
4617  return 0;
4618 } // static serializeConstOp
4619 
4620 static int
4621 appendBound(Uint32Buffer& keyInfo,
4622  NdbIndexScanOperation::BoundType type, const NdbQueryOperandImpl* bound,
4623  const NdbQueryParamValue* actualParam)
4624 {
4625  Uint32 len = 0;
4626 
4627  keyInfo.append(type);
4628  const Uint32 oldSize = keyInfo.getSize();
4629  keyInfo.append(0); // Place holder for AttributeHeader
4630 
4631  switch(bound->getKind()){
4632  case NdbQueryOperandImpl::Const:
4633  {
4634  const NdbConstOperandImpl& constOp =
4635  static_cast<const NdbConstOperandImpl&>(*bound);
4636 
4637  const int error = serializeConstOp(constOp, keyInfo, len);
4638  if (unlikely(error))
4639  return error;
4640 
4641  break;
4642  }
4643  case NdbQueryOperandImpl::Param:
4644  {
4645  const NdbParamOperandImpl* const paramOp
4646  = static_cast<const NdbParamOperandImpl*>(bound);
4647  const int paramNo = paramOp->getParamIx();
4648  assert(actualParam != NULL);
4649 
4650  bool null;
4651  const int error =
4652  actualParam[paramNo].serializeValue(*paramOp->getColumn(), keyInfo,
4653  len, null);
4654  if (unlikely(error))
4655  return error;
4656  if (unlikely(null))
4657  return Err_KeyIsNULL;
4658  break;
4659  }
4660  case NdbQueryOperandImpl::Linked: // Root operation cannot have linked operands.
4661  default:
4662  assert(false);
4663  }
4664 
4665  // Back patch attribute header.
4666  keyInfo.put(oldSize,
4667  AttributeHeader(bound->getColumn()->m_attrId, len).m_value);
4668 
4669  return 0;
4670 } // static appendBound()
4671 
4672 
// Serialize the index bounds of this (root) operation into 'keyInfo'.
//
// Returns 0 without writing anything when neither low nor high keys are
// defined. Otherwise each key position up to the larger of lowKeys/highKeys
// is visited: a single BoundEQ is appended when the low and high operand of
// that position are the same object, else a lower and/or an upper bound is
// appended. Afterwards the total length is OR'ed into the upper 16 bits of
// the first word written, and m_queryImpl.m_shortestBound is set to the
// smaller of lowKeys/highKeys.
//
// Returns 0 on success, else the error from appendBound(), Err_MemoryAlloc,
// or QRY_DEFINITION_TOO_LARGE when the length does not fit in 16 bits.
//
// NOTE(review): the listing numbers skip the single lines 4688, 4708 and
// 4720. 'bound_type' is used without a visible declaration and both
// "bound_type= ... ?" expressions lack their ':' part, so those code lines
// appear to be missing from this text - confirm against the original file.
4673 int
4674 NdbQueryOperationImpl::prepareIndexKeyInfo(
4675  Uint32Buffer& keyInfo,
4676  const NdbQueryOperationDefImpl::IndexBound* bounds,
4677  const NdbQueryParamValue* actualParam)
4678 {
4679  int startPos = keyInfo.getSize();
4680  if (bounds->lowKeys==0 && bounds->highKeys==0) // No Bounds defined
4681  return 0;
4682 
4683  const unsigned key_count =
4684  (bounds->lowKeys >= bounds->highKeys) ? bounds->lowKeys : bounds->highKeys;
4685 
4686  for (unsigned keyNo = 0; keyNo < key_count; keyNo++)
4687  {
4689 
4690  /* If upper and lower limit is equal, a single BoundEQ is sufficient */
 // "Equal" is a comparison of the operand pointers, not of their values.
4691  if (keyNo < bounds->lowKeys &&
4692  keyNo < bounds->highKeys &&
4693  bounds->low[keyNo] == bounds->high[keyNo])
4694  {
4695  /* Inclusive if defined, or matching rows can include this value */
4696  bound_type= NdbIndexScanOperation::BoundEQ;
4697  int error = appendBound(keyInfo, bound_type, bounds->low[keyNo], actualParam);
4698  if (unlikely(error))
4699  return error;
4700 
4701  } else {
4702 
4703  /* If key is part of lower bound */
4704  if (keyNo < bounds->lowKeys)
4705  {
4706  /* Inclusive if defined, or matching rows can include this value */
4707  bound_type= bounds->lowIncl || keyNo+1 < bounds->lowKeys ?
4709 
4710  int error = appendBound(keyInfo, bound_type, bounds->low[keyNo], actualParam);
4711  if (unlikely(error))
4712  return error;
4713  }
4714 
4715  /* If key is part of upper bound */
4716  if (keyNo < bounds->highKeys)
4717  {
4718  /* Inclusive if defined, or matching rows can include this value */
4719  bound_type= bounds->highIncl || keyNo+1 < bounds->highKeys ?
4721 
4722  int error = appendBound(keyInfo, bound_type, bounds->high[keyNo], actualParam);
4723  if (unlikely(error))
4724  return error;
4725  }
4726  }
4727  }
4728 
4729  Uint32 length = keyInfo.getSize()-startPos;
4730  if (unlikely(keyInfo.isMemoryExhausted())) {
4731  return Err_MemoryAlloc;
4732  } else if (unlikely(length > 0xFFFF)) {
4733  return QRY_DEFINITION_TOO_LARGE; // Query definition too large.
4734  } else if (likely(length > 0)) {
 // Store the total length in the upper half of the first word written.
4735  keyInfo.put(startPos, keyInfo.get(startPos) | (length << 16));
4736  }
4737 
4738  m_queryImpl.m_shortestBound =(bounds->lowKeys <= bounds->highKeys) ? bounds->lowKeys : bounds->highKeys;
4739  return 0;
4740 } // NdbQueryOperationImpl::prepareIndexKeyInfo
4741 
4742 
4743 int
4744 NdbQueryOperationImpl::prepareLookupKeyInfo(
4745  Uint32Buffer& keyInfo,
4746  const NdbQueryOperandImpl* const keys[],
4747  const NdbQueryParamValue* actualParam)
4748 {
4749  const int keyCount = m_operationDef.getIndex()!=NULL ?
4750  static_cast<int>(m_operationDef.getIndex()->getNoOfColumns()) :
4751  m_operationDef.getTable().getNoOfPrimaryKeys();
4752 
4753  for (int keyNo = 0; keyNo<keyCount; keyNo++)
4754  {
4755  Uint32 dummy;
4756 
4757  switch(keys[keyNo]->getKind()){
4758  case NdbQueryOperandImpl::Const:
4759  {
4760  const NdbConstOperandImpl* const constOp
4761  = static_cast<const NdbConstOperandImpl*>(keys[keyNo]);
4762  const int error =
4763  serializeConstOp(*constOp, keyInfo, dummy);
4764  if (unlikely(error))
4765  return error;
4766 
4767  break;
4768  }
4769  case NdbQueryOperandImpl::Param:
4770  {
4771  const NdbParamOperandImpl* const paramOp
4772  = static_cast<const NdbParamOperandImpl*>(keys[keyNo]);
4773  int paramNo = paramOp->getParamIx();
4774  assert(actualParam != NULL);
4775 
4776  bool null;
4777  const int error =
4778  actualParam[paramNo].serializeValue(*paramOp->getColumn(), keyInfo,
4779  dummy, null);
4780 
4781  if (unlikely(error))
4782  return error;
4783  if (unlikely(null))
4784  return Err_KeyIsNULL;
4785  break;
4786  }
4787  case NdbQueryOperandImpl::Linked: // Root operation cannot have linked operands.
4788  default:
4789  assert(false);
4790  }
4791  }
4792 
4793  if (unlikely(keyInfo.isMemoryExhausted())) {
4794  return Err_MemoryAlloc;
4795  }
4796 
4797  return 0;
4798 } // NdbQueryOperationImpl::prepareLookupKeyInfo
4799 
4800 
4801 bool
4802 NdbQueryOperationImpl::execTRANSID_AI(const Uint32* ptr, Uint32 len)
4803 {
4804  TupleCorrelation tupleCorrelation;
4805  NdbRootFragment* rootFrag = m_queryImpl.m_rootFrags;
4806 
4807  if (getQueryDef().isScanQuery())
4808  {
4809  const CorrelationData correlData(ptr, len);
4810  const Uint32 receiverId = correlData.getRootReceiverId();
4811 
4816  rootFrag =
4817  NdbRootFragment::receiverIdLookup(m_queryImpl.m_rootFrags,
4818  m_queryImpl.getRootFragCount(),
4819  receiverId);
4820  if (unlikely(rootFrag == NULL))
4821  {
4822  assert(false);
4823  return false;
4824  }
4825 
4826  // Extract tuple correlation.
4827  tupleCorrelation = correlData.getTupleCorrelation();
4828  len -= CorrelationData::wordCount;
4829  }
4830 
4831  if (traceSignals) {
4832  ndbout << "NdbQueryOperationImpl::execTRANSID_AI()"
4833  << ", operation no: " << getQueryOperationDef().getQueryOperationIx()
4834  << ", fragment no: " << rootFrag->getFragNo()
4835  << endl;
4836  }
4837 
4838  // Process result values.
4839  rootFrag->getResultStream(*this).execTRANSID_AI(ptr, len, tupleCorrelation);
4840  rootFrag->incrOutstandingResults(-1);
4841 
4842  bool ret = false;
4843  if (rootFrag->isFragBatchComplete())
4844  {
4845  ret = m_queryImpl.handleBatchComplete(*rootFrag);
4846  }
4847 
4848  if (traceSignals) {
4849  ndbout << "NdbQueryOperationImpl::execTRANSID_AI(): returns:" << ret
4850  << ", *this=" << *this << endl;
4851  }
4852  return ret;
4853 } //NdbQueryOperationImpl::execTRANSID_AI
4854 
4855 
// Handle a TCKEYREF signal for this operation.
//
// Asserts that the query is not a scan query. Returns false if the
// transaction id in the signal does not pass checkState_TransId(). The error
// code is set on the query unless this is a non-root operation and the code
// is Err_TupleNotFound. The outstanding-result count of m_rootFrags[0] is
// then adjusted, and if that makes the fragment batch complete,
// handleBatchComplete() is called and its result returned; else false.
//
// NOTE(review): the listing numbers jump from 4856 to 4858. 'aSignal' is
// used below without a visible declaration, so the line holding the function
// name and parameter list appears to be missing from this text - confirm
// against the original file.
4856 bool
4858 {
4859  if (traceSignals) {
4860  ndbout << "NdbQueryOperationImpl::execTCKEYREF()" << endl;
4861  }
4862 
4863  /* The SPJ block does not forward TCKEYREFs for trees with scan roots.*/
4864  assert(!getQueryDef().isScanQuery());
4865 
4866  const TcKeyRef* ref = CAST_CONSTPTR(TcKeyRef, aSignal->getDataPtr());
4867  if (!getQuery().m_transaction.checkState_TransId(ref->transId))
4868  {
4869 #ifdef NDB_NO_DROPPED_SIGNAL
4870  abort();
4871 #endif
4872  return false;
4873  }
4874 
4875  // Suppress 'TupleNotFound' status for child operations.
4876  if (&getRoot() == this ||
4877  ref->errorCode != static_cast<Uint32>(Err_TupleNotFound))
4878  {
4879  getQuery().setErrorCode(ref->errorCode);
4880  if (aSignal->getLength() == TcKeyRef::SignalLength)
4881  {
4882  // Signal may contain additional error data
4883  getQuery().m_error.details = (char *)ref->errorData;
4884  }
4885  }
4886 
4887  NdbRootFragment& rootFrag = getQuery().m_rootFrags[0];
4888 
4889  if (ref->errorCode != DbspjErr::NodeFailure)
4890  {
4891  // Compensate for children results not produced.
4892  // (doSend() assumed all child results to be materialized)
 // Counted are: this operation, all its descendants and, when it has
 // children, additionally the number of leaf operations below it.
4893  Uint32 cnt = 0;
4894  cnt += 1; // self
4895  cnt += getNoOfDescendantOperations();
4896  if (getNoOfChildOperations() > 0)
4897  {
4898  cnt += getNoOfLeafOperations();
4899  }
4900  rootFrag.incrOutstandingResults(- Int32(cnt));
4901  }
4902  else
4903  {
4904  // consider frag-batch complete
4905  rootFrag.clearOutstandingResults();
4906  }
4907 
4908  bool ret = false;
4909  if (rootFrag.isFragBatchComplete())
4910  {
4911  ret = m_queryImpl.handleBatchComplete(rootFrag);
4912  }
4913 
4914  if (traceSignals) {
4915  ndbout << "NdbQueryOperationImpl::execTCKEYREF(): returns:" << ret
4916  << ", resultStream= {" << rootFrag.getResultStream(*this) << "}"
4917  << ", *this=" << *this << endl;
4918  }
4919  return ret;
4920 } //NdbQueryOperationImpl::execTCKEYREF
4921 
// Handle a SCAN_TABCONF for the root scan operation.
//
// The root fragment is looked up from the id of 'receiver'; false is
// returned if it is not found. The fragment is then told that the CONF has
// arrived (setConfReceived(tcPtrI)), which sub scans remain ('nodeMask'),
// and 'rowCount' is added to its outstanding results. If that makes the
// fragment batch complete, handleBatchComplete() is called and its result
// returned; else false.
//
// The asserts require tcPtrI==RNIL and nodeMask==0 to occur together, and
// that this is the root operation and a scan.
//
// NOTE(review): the listing numbers jump from 4922 to 4924. 'tcPtrI' is used
// below without a visible declaration, so the line holding the function name
// and first parameter appears to be missing from this text - confirm against
// the original file.
4922 bool
4924  Uint32 rowCount,
4925  Uint32 nodeMask,
4926  NdbReceiver* receiver)
4927 {
4928  if (traceSignals) {
4929  ndbout << "NdbQueryOperationImpl::execSCAN_TABCONF(rows: " << rowCount
4930  << " nodeMask: H'" << hex << nodeMask << ")" << endl;
4931  }
4932  assert((tcPtrI==RNIL && nodeMask==0) ||
4933  (tcPtrI!=RNIL && nodeMask!=0));
4934  assert(checkMagicNumber());
4935  // For now, only the root operation may be a scan.
4936  assert(&getRoot() == this);
4937  assert(m_operationDef.isScanOperation());
4938 
4939  NdbRootFragment* rootFrag =
4940  NdbRootFragment::receiverIdLookup(m_queryImpl.m_rootFrags,
4941  m_queryImpl.getRootFragCount(),
4942  receiver->getId());
4943  if (unlikely(rootFrag == NULL))
4944  {
4945  assert(false);
4946  return false;
4947  }
4948  // Prepare for SCAN_NEXTREQ, tcPtrI==RNIL, nodeMask==0 -> EOF
4949  rootFrag->setConfReceived(tcPtrI);
4950  rootFrag->setRemainingSubScans(nodeMask);
4951  rootFrag->incrOutstandingResults(rowCount);
4952 
4953  if(traceSignals){
4954  ndbout << " resultStream {" << rootFrag->getResultStream(*this)
4955  << "} fragNo" << rootFrag->getFragNo()
4956  << endl;
4957  }
4958 
4959  bool ret = false;
4960  if (rootFrag->isFragBatchComplete())
4961  {
4962  /* This fragment is now complete */
4963  ret = m_queryImpl.handleBatchComplete(*rootFrag);
4964  }
4965  if (traceSignals) {
4966  ndbout << "NdbQueryOperationImpl::execSCAN_TABCONF():, returns:" << ret
4967  << ", tcPtrI=" << tcPtrI << " rowCount=" << rowCount
4968  << " *this=" << *this << endl;
4969  }
4970  return ret;
4971 } //NdbQueryOperationImpl::execSCAN_TABCONF
4972 
// Stores the requested scan ordering in m_ordering after validating it.
// Returns 0 on success; on any failed check an error code is set on the query
// and -1 is returned.
// NOTE(review): listing line 4974 (the function name and parameter list) is
// missing from this extract. The closing comment names the function
// NdbQueryOperationImpl::setOrdering() and the body reads an 'ordering'
// argument - confirm the exact signature against the original file.
4973 int
4975 {
  // Ordering is only accepted on operations defined as ordered index scans.
4976  if (getQueryOperationDef().getType() != NdbQueryOperationDef::OrderedIndexScan)
4977  {
4978  getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
4979  return -1;
4980  }
4981 
  // Rejected unless m_parallelism is still Parallelism_max.
4982  if (m_parallelism != Parallelism_max)
4983  {
4984  getQuery().setErrorCode(QRY_SEQUENTIAL_SCAN_SORTED);
4985  return -1;
4986  }
4987 
  // Rejected if the operation definition already specifies an ordering
  // (anything other than ScanOrdering_void).
4988  if(static_cast<const NdbQueryIndexScanOperationDefImpl&>
4989  (getQueryOperationDef())
4990  .getOrdering() != NdbQueryOptions::ScanOrdering_void)
4991  {
4992  getQuery().setErrorCode(QRY_SCAN_ORDER_ALREADY_SET);
4993  return -1;
4994  }
4995 
4996  /* Check if query is sorted and has multiple scan operations. This
4997  * combination is not implemented.
4998  */
4999  if (ordering != NdbQueryOptions::ScanOrdering_unordered &&
5000  getQueryDef().getQueryType() == NdbQueryDef::MultiScanQuery)
5001  {
5002  getQuery().setErrorCode(QRY_MULTIPLE_SCAN_SORTED);
5003  return -1;
5004  }
5005 
5006  m_ordering = ordering;
5007  return 0;
5008 } // NdbQueryOperationImpl::setOrdering()
5009 
// Attaches a deep copy of the interpreted program 'code' to this operation.
// Returns 0 on success and -1 after setting an error code on the query.
// NOTE(review): listing line 5010 (the function signature) is missing from
// this extract. The closing comment names the function
// NdbQueryOperationImpl::setInterpretedCode() and the body reads a 'code'
// argument - confirm the exact signature against the original file.
5011 {
  // An empty program is accepted as a no-op; nothing is stored or changed.
5012  if (code.m_instructions_length == 0)
5013  {
5014  return 0;
5015  }
5016 
5017  const NdbTableImpl& table = getQueryOperationDef().getTable();
5018  // Check if operation and interpreter code use the same table
  // (same table id and same major part of the object version).
5019  if (unlikely(table.getTableId() != code.getTable()->getTableId()
5020  || table_version_major(table.getObjectVersion()) !=
5021  table_version_major(code.getTable()->getObjectVersion())))
5022  {
5023  getQuery().setErrorCode(Err_InterpretedCodeWrongTab);
5024  return -1;
5025  }
5026 
  // The program must carry the Finalised flag before it can be attached.
5027  if (unlikely((code.m_flags & NdbInterpretedCode::Finalised)
5028  == 0))
5029  {
5030  // NdbInterpretedCode::finalise() not called.
5031  getQuery().setErrorCode(Err_FinaliseNotCalled);
5032  return -1;
5033  }
5034 
5035  // Allocate an interpreted code object if we do not have one already.
5036  if (likely(m_interpretedCode == NULL))
5037  {
5038  m_interpretedCode = new NdbInterpretedCode();
5039 
5040  if (unlikely(m_interpretedCode==NULL))
5041  {
5042  getQuery().setErrorCode(Err_MemoryAlloc);
5043  return -1;
5044  }
5045  }
5046 
5047  /*
5048  * Make a deep copy, such that 'code' can be destroyed when this method
5049  * returns.
5050  */
5051  const int error = m_interpretedCode->copy(code);
  // A non-zero result from copy() is forwarded as the query's error code.
5052  if (unlikely(error))
5053  {
5054  getQuery().setErrorCode(error);
5055  return -1;
5056  }
5057  return 0;
5058 } // NdbQueryOperationImpl::setInterpretedCode()
5059 
// Validates the 'parallelism' argument and stores it in m_parallelism.
// Returns 0 on success; on any failed check an error code is set on the query
// and -1 is returned.
// NOTE(review): listing line 5060 (signature and opening brace) is missing
// from this extract; presumably this is the NdbQueryOperationImpl setter for
// a fixed scan parallelism - confirm name and signature against the original.
  // Only scan operations accept a parallelism setting.
5061  if (!getQueryOperationDef().isScanOperation())
5062  {
5063  getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
5064  return -1;
5065  }
  // An explicit parallelism is refused once ascending or descending ordering
  // has been selected.
5066  else if (getOrdering() == NdbQueryOptions::ScanOrdering_ascending ||
5067  getOrdering() == NdbQueryOptions::ScanOrdering_descending)
5068  {
5069  getQuery().setErrorCode(QRY_SEQUENTIAL_SCAN_SORTED);
5070  return -1;
5071  }
  // Not implemented for operations whose index in the query is above 0
  // (presumably index 0 is the root operation - confirm).
5072  else if (getQueryOperationDef().getQueryOperationIx() > 0)
5073  {
5074  getQuery().setErrorCode(Err_FunctionNotImplemented);
5075  return -1;
5076  }
  // Accepted range is 1..MAX_NDB_PARTITIONS inclusive.
5077  else if (parallelism < 1 || parallelism > MAX_NDB_PARTITIONS)
5078  {
5079  getQuery().setErrorCode(Err_ParameterError);
5080  return -1;
5081  }
5082  m_parallelism = parallelism;
5083  return 0;
5084 }
5085 
// Sets m_parallelism to Parallelism_max. Returns 0 on success, or -1 with
// QRY_WRONG_OPERATION_TYPE set on the query when this is not a scan operation.
// NOTE(review): listing line 5086 (signature and opening brace) is missing
// from this extract; presumably this is the NdbQueryOperationImpl setter that
// selects maximal parallelism - confirm name and signature against the original.
5087  if (!getQueryOperationDef().isScanOperation())
5088  {
5089  getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
5090  return -1;
5091  }
5092  m_parallelism = Parallelism_max;
5093  return 0;
5094 }
5095 
// Sets m_parallelism to Parallelism_adaptive. Returns 0 on success; on a
// failed check an error code is set on the query and -1 is returned.
// NOTE(review): listing line 5096 (signature and opening brace) is missing
// from this extract; presumably this is the NdbQueryOperationImpl setter that
// selects adaptive parallelism - confirm name and signature against the original.
  // Only scan operations accept a parallelism setting.
5097  if (!getQueryOperationDef().isScanOperation())
5098  {
5099  getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
5100  return -1;
5101  }
  // Not implemented for the operation with index 0 in the query
  // (presumably the root operation - confirm).
5102  else if (getQueryOperationDef().getQueryOperationIx() == 0)
5103  {
5104  getQuery().setErrorCode(Err_FunctionNotImplemented);
5105  return -1;
5106  }
5107  m_parallelism = Parallelism_adaptive;
5108  return 0;
5109 }
5110 
// Validates the 'batchSize' argument and stores it in m_maxBatchRows.
// Returns 0 on success; on a failed check an error code is set on the query
// and -1 is returned.
// NOTE(review): listing line 5111 (signature and opening brace) is missing
// from this extract; presumably this is the NdbQueryOperationImpl batch size
// setter - confirm name and signature against the original file.
  // Only scan operations accept a batch size.
5112  if (!getQueryOperationDef().isScanOperation())
5113  {
5114  getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
5115  return -1;
5116  }
  // For any operation other than the root, the batch size must be at least
  // the fragment count of the operation's table.
5117  if (this != &getRoot() &&
5118  batchSize < getQueryOperationDef().getTable().getFragmentCount())
5119  {
5122  getQuery().setErrorCode(QRY_BATCH_SIZE_TOO_SMALL);
5123  return -1;
5124  }
5125  m_maxBatchRows = batchSize;
5126  return 0;
5127 }
5128 
5129 bool
5130 NdbQueryOperationImpl::hasInterpretedCode() const
5131 {
5132  return (m_interpretedCode && m_interpretedCode->m_instructions_length > 0) ||
5133  (getQueryOperationDef().getInterpretedCode() != NULL);
5134 } // NdbQueryOperationImpl::hasInterpretedCode
5135 
5136 int
5137 NdbQueryOperationImpl::prepareInterpretedCode(Uint32Buffer& attrInfo) const
5138 {
5139  const NdbInterpretedCode* interpretedCode =
5140  (m_interpretedCode && m_interpretedCode->m_instructions_length > 0)
5141  ? m_interpretedCode
5142  : getQueryOperationDef().getInterpretedCode();
5143 
5144  // There should be no subroutines in a filter.
5145  assert(interpretedCode->m_first_sub_instruction_pos==0);
5146  assert(interpretedCode->m_instructions_length > 0);
5147  assert(interpretedCode->m_instructions_length <= 0xffff);
5148 
5149  // Allocate space for program and length field.
5150  Uint32* const buffer =
5151  attrInfo.alloc(1+interpretedCode->m_instructions_length);
5152  if(unlikely(buffer==NULL))
5153  {
5154  return Err_MemoryAlloc;
5155  }
5156 
5157  buffer[0] = interpretedCode->m_instructions_length;
5158  memcpy(buffer+1,
5159  interpretedCode->m_buffer,
5160  interpretedCode->m_instructions_length * sizeof(Uint32));
5161  return 0;
5162 } // NdbQueryOperationImpl::prepareInterpretedCode
5163 
5164 
5165 Uint32
5166 NdbQueryOperationImpl::getIdOfReceiver() const {
5167  NdbRootFragment& rootFrag = m_queryImpl.m_rootFrags[0];
5168  return rootFrag.getResultStream(*this).getReceiver().getId();
5169 }
5170 
// Returns m_rowSize, computing and caching it on the first call.
// NOTE(review): listing line 5171 (the function signature) is missing from
// this extract; presumably this is the NdbQueryOperationImpl row size getter
// - confirm name and signature against the original file.
5172 {
5173  // Check if row size has been computed yet.
  // 0xffffffff is the "not yet computed" marker tested here.
5174  if (m_rowSize == 0xffffffff)
5175  {
5176  m_rowSize =
5177  NdbReceiver::ndbrecord_rowsize(m_ndbRecord, m_firstRecAttr, 0, false);
5178  }
5179  return m_rowSize;
5180 }
5181 
5183 NdbOut& operator<<(NdbOut& out, const NdbQueryOperationImpl& op){
5184  out << "[ this: " << &op
5185  << " m_magic: " << op.m_magic;
5186  out << " op.operationDef.getQueryOperationIx()"
5187  << op.m_operationDef.getQueryOperationIx();
5188  if (op.getParentOperation()){
5189  out << " m_parent: " << op.getParentOperation();
5190  }
5191  for(unsigned int i = 0; i<op.getNoOfChildOperations(); i++){
5192  out << " m_children[" << i << "]: " << &op.getChildOperation(i);
5193  }
5194  out << " m_queryImpl: " << &op.m_queryImpl;
5195  out << " m_operationDef: " << &op.m_operationDef;
5196  for(Uint32 i = 0; i<op.m_queryImpl.getRootFragCount(); i++){
5197  NdbRootFragment& rootFrag = op.m_queryImpl.m_rootFrags[i];
5198  out << " m_resultStream[" << i << "]{" << rootFrag.getResultStream(op) << "}";
5199  }
5200  out << " m_isRowNull " << op.m_isRowNull;
5201  out << " ]";
5202  return out;
5203 }
5204 
5205 NdbOut& operator<<(NdbOut& out, const NdbResultStream& stream){
5206  out << " received rows: " << stream.m_resultSets[stream.m_recv].getRowCount();
5207  return out;
5208 }
5209 
5210 
5211 // Compiler settings require explicit instantiation.
// Explicitly instantiates the Vector template for pointers to
// NdbQueryOperationImpl.
5212 template class Vector<NdbQueryOperationImpl*>;