MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DbspjMain.cpp
1 /*
2  Copyright (c) 2004, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #define DBSPJ_C
19 #include "Dbspj.hpp"
20 
21 #include <SectionReader.hpp>
22 #include <signaldata/LqhKey.hpp>
23 #include <signaldata/QueryTree.hpp>
24 #include <signaldata/TcKeyRef.hpp>
25 #include <signaldata/RouteOrd.hpp>
26 #include <signaldata/TransIdAI.hpp>
27 #include <signaldata/DiGetNodes.hpp>
28 #include <signaldata/DihScanTab.hpp>
29 #include <signaldata/AttrInfo.hpp>
30 #include <Interpreter.hpp>
31 #include <AttributeHeader.hpp>
32 #include <AttributeDescriptor.hpp>
33 #include <KeyDescriptor.hpp>
34 #include <md5_hash.hpp>
35 #include <signaldata/TcKeyConf.hpp>
36 
37 #include <signaldata/NodeFailRep.hpp>
38 #include <signaldata/ReadNodesConf.hpp>
39 
// Debug tracing support.
//
// DEBUG(x) streams a "DBSPJ: "-prefixed message to ndbout and is intended
// for messages that should only be seen while debugging the product.  It is
// armed, together with the DEBUG_LQHKEYREQ / DEBUG_SCAN_FRAGREQ dump
// switches, only when VM_TRACE is defined.

#if defined(VM_TRACE)

#define DEBUG(x) ndbout << "DBSPJ: "<< x << endl;
#define DEBUG_LQHKEYREQ
#define DEBUG_SCAN_FRAGREQ

#else

#define DEBUG(x)

#endif

// DEBUG_CRASH() marks spots where a malformed request was detected.
// With the switch below set to 1 it expands to ndbrequire(false);
// set it to 0 to make it expand to nothing.
#if 1
#define DEBUG_CRASH() ndbrequire(false)
#else
#define DEBUG_CRASH()
#endif

// Master switch: while this is '#if 1', DEBUG(x) is redefined to expand to
// nothing and both dump switches are undefined, regardless of VM_TRACE.
#if 1
#undef DEBUG
#define DEBUG(x)
#undef DEBUG_LQHKEYREQ
#undef DEBUG_SCAN_FRAGREQ
#endif
67 
68 const Ptr<Dbspj::TreeNode> Dbspj::NullTreeNodePtr = { 0, RNIL };
69 const Dbspj::RowRef Dbspj::NullRowRef = { RNIL, GLOBAL_PAGE_SIZE_WORDS, { 0 } };
70 
72 void Dbspj::execREAD_CONFIG_REQ(Signal* signal)
73 {
74  jamEntry();
75  const ReadConfigReq req =
76  *reinterpret_cast<const ReadConfigReq*>(signal->getDataPtr());
77 
78  Pool_context pc;
79  pc.m_block = this;
80 
81  DEBUG("execREAD_CONFIG_REQ");
82  DEBUG("sizeof(Request): " << sizeof(Request) <<
83  " sizeof(TreeNode): " << sizeof(TreeNode));
84 
85  m_arenaAllocator.init(1024, RT_SPJ_ARENA_BLOCK, pc);
86  m_request_pool.arena_pool_init(&m_arenaAllocator, RT_SPJ_REQUEST, pc);
87  m_treenode_pool.arena_pool_init(&m_arenaAllocator, RT_SPJ_TREENODE, pc);
88  m_scanfraghandle_pool.arena_pool_init(&m_arenaAllocator, RT_SPJ_SCANFRAG, pc);
89  m_lookup_request_hash.setSize(16);
90  m_scan_request_hash.setSize(16);
91  void* ptr = m_ctx.m_mm.get_memroot();
92  m_page_pool.set((RowPage*)ptr, (Uint32)~0);
93 
94  Record_info ri;
95  Dependency_map::createRecordInfo(ri, RT_SPJ_DATABUFFER);
96  m_dependency_map_pool.init(&m_arenaAllocator, ri, pc);
97 
98  ReadConfigConf* const conf =
99  reinterpret_cast<ReadConfigConf*>(signal->getDataPtrSend());
100  conf->senderRef = reference();
101  conf->senderData = req.senderData;
102 
103  sendSignal(req.senderRef, GSN_READ_CONFIG_CONF, signal,
104  ReadConfigConf::SignalLength, JBB);
105 }//Dbspj::execREAD_CONF_REQ()
106 
107 static Uint32 f_STTOR_REF = 0;
108 
109 void Dbspj::execSTTOR(Signal* signal)
110 {
111 //#define UNIT_TEST_DATABUFFER2
112 
113  jamEntry();
114  /* START CASE */
115  const Uint16 tphase = signal->theData[1];
116  f_STTOR_REF = signal->getSendersBlockRef();
117 
118  ndbout << "Dbspj::execSTTOR() inst:" << instance()
119  << " phase=" << tphase << endl;
120 
121  if (tphase == 1)
122  {
123  jam();
124  signal->theData[0] = 0;
125  sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 1000, 1);
126  }
127 
128  if (tphase == 4)
129  {
130  jam();
131 
132  signal->theData[0] = reference();
133  sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
134  return;
135  }
136 
137  sendSTTORRY(signal);
138 
139 #ifdef UNIT_TEST_DATABUFFER2
140  if (tphase == 120)
141  {
142  ndbout_c("basic test of ArenaPool / DataBuffer2");
143 
144  for (Uint32 i = 0; i<100; i++)
145  {
146  ArenaHead ah;
147  if (!m_arenaAllocator.seize(ah))
148  {
149  ndbout_c("Failed to allocate arena");
150  break;
151  }
152 
153  ndbout_c("*** LOOP %u", i);
154  Uint32 sum = 0;
155  Dependency_map::Head head;
156  LocalArenaPoolImpl pool(ah, m_dependency_map_pool);
157  for (Uint32 j = 0; j<100; j++)
158  {
159  Uint32 sz = rand() % 1000;
160  if (0)
161  ndbout_c("adding %u", sz);
162  Local_dependency_map list(pool, head);
163  for (Uint32 i = 0; i<sz; i++)
164  signal->theData[i] = sum + i;
165  list.append(signal->theData, sz);
166  sum += sz;
167  }
168 
169  {
170  ndbrequire(head.getSize() == sum);
171  Local_dependency_map list(pool, head);
172  Dependency_map::ConstDataBufferIterator it;
173  Uint32 cnt = 0;
174  for (list.first(it); !it.isNull(); list.next(it))
175  {
176  ndbrequire(* it.data == cnt);
177  cnt++;
178  }
179 
180  ndbrequire(cnt == sum);
181  }
182 
183  Resource_limit rl;
184  if (m_ctx.m_mm.get_resource_limit(7, rl))
185  {
186  ndbout_c("Resource %d min: %d max: %d curr: %d",
187  7, rl.m_min, rl.m_max, rl.m_curr);
188  }
189 
190  {
191  ndbout_c("release map");
192  Local_dependency_map list(pool, head);
193  list.release();
194  }
195 
196  ndbout_c("release all");
197  m_arenaAllocator.release(ah);
198  ndbout_c("*** LOOP %u sum: %u", i, sum);
199  }
200  }
201 #endif
202 }//Dbspj::execSTTOR()
203 
204 void
205 Dbspj::sendSTTORRY(Signal* signal)
206 {
207  signal->theData[0] = 0;
208  signal->theData[1] = 0; /* BLOCK CATEGORY */
209  signal->theData[2] = 0; /* SIGNAL VERSION NUMBER */
210  signal->theData[3] = 4;
211 #ifdef UNIT_TEST_DATABUFFER2
212  signal->theData[4] = 120; /* Start phase end*/
213 #else
214  signal->theData[4] = 255;
215 #endif
216  signal->theData[5] = 255;
217  sendSignal(f_STTOR_REF, GSN_STTORRY, signal, 6, JBB);
218 }
219 
220 void
221 Dbspj::execREAD_NODESCONF(Signal* signal)
222 {
223  jamEntry();
224 
225  ReadNodesConf * const conf = (ReadNodesConf *)signal->getDataPtr();
226 
227  if (getNodeState().getNodeRestartInProgress())
228  {
229  jam();
230  c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startedNodes);
231  c_alive_nodes.set(getOwnNodeId());
232  }
233  else
234  {
235  jam();
236  c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startingNodes);
237  NdbNodeBitmask tmp;
238  tmp.assign(NdbNodeBitmask::Size, conf->startedNodes);
239  c_alive_nodes.bitOR(tmp);
240  }
241 
242  sendSTTORRY(signal);
243 }
244 
245 void
246 Dbspj::execINCL_NODEREQ(Signal* signal)
247 {
248  jamEntry();
249  const Uint32 senderRef = signal->theData[0];
250  const Uint32 nodeId = signal->theData[1];
251 
252  ndbrequire(!c_alive_nodes.get(nodeId));
253  c_alive_nodes.set(nodeId);
254 
255  signal->theData[0] = nodeId;
256  signal->theData[1] = reference();
257  sendSignal(senderRef, GSN_INCL_NODECONF, signal, 2, JBB);
258 }
259 
260 void
261 Dbspj::execNODE_FAILREP(Signal* signal)
262 {
263  jamEntry();
264 
265  const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
266  NdbNodeBitmask failed;
267  failed.assign(NdbNodeBitmask::Size, rep->theNodes);
268 
269  c_alive_nodes.bitANDC(failed);
270 
271  signal->theData[0] = 1;
272  signal->theData[1] = 0;
273  failed.copyto(NdbNodeBitmask::Size, signal->theData + 2);
274  sendSignal(reference(), GSN_CONTINUEB, signal, 2 + NdbNodeBitmask::Size,
275  JBB);
276 }
277 
278 void
279 Dbspj::execAPI_FAILREQ(Signal* signal)
280 {
281  jamEntry();
282  Uint32 failedApiNode = signal->theData[0];
283  ndbrequire(signal->theData[1] == QMGR_REF); // As callback hard-codes QMGR
284 
290  signal->theData[0] = failedApiNode;
291  signal->theData[1] = reference();
292  sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
293 }
294 
295 void
296 Dbspj::execCONTINUEB(Signal* signal)
297 {
298  jamEntry();
299  switch(signal->theData[0]) {
300  case 0:
301  releaseGlobal(signal);
302  return;
303  case 1:
304  nodeFail_checkRequests(signal);
305  return;
306  case 2:
307  nodeFail_checkRequests(signal);
308  return;
309  }
310 
311  ndbrequire(false);
312 }
313 
314 void
315 Dbspj::nodeFail_checkRequests(Signal* signal)
316 {
317  jam();
318  const Uint32 type = signal->theData[0];
319  const Uint32 bucket = signal->theData[1];
320 
321  NdbNodeBitmask failed;
322  failed.assign(NdbNodeBitmask::Size, signal->theData+2);
323 
324  Request_iterator iter;
325  Request_hash * hash;
326  switch(type){
327  case 1:
328  hash = &m_lookup_request_hash;
329  break;
330  case 2:
331  hash = &m_scan_request_hash;
332  break;
333  }
334  hash->next(bucket, iter);
335 
336  const Uint32 RT_BREAK = 64;
337  for(Uint32 i = 0; (i<RT_BREAK || iter.bucket == bucket) &&
338  !iter.curr.isNull(); i++)
339  {
340  jam();
341 
342  Ptr<Request> requestPtr = iter.curr;
343  hash->next(iter);
344  i += nodeFail(signal, requestPtr, failed);
345  }
346 
347  if (!iter.curr.isNull())
348  {
349  jam();
350  signal->theData[0] = type;
351  signal->theData[1] = bucket;
352  failed.copyto(NdbNodeBitmask::Size, signal->theData+2);
353  sendSignal(reference(), GSN_CONTINUEB, signal, 2 + NdbNodeBitmask::Size,
354  JBB);
355  }
356  else if (type == 1)
357  {
358  jam();
359  signal->theData[0] = 2;
360  signal->theData[1] = 0;
361  failed.copyto(NdbNodeBitmask::Size, signal->theData+2);
362  sendSignal(reference(), GSN_CONTINUEB, signal, 2 + NdbNodeBitmask::Size,
363  JBB);
364  }
365  else if (type == 2)
366  {
367  jam();
368  ndbout_c("Finished with handling node-failure");
369  }
370 }
371 
375 void Dbspj::execLQHKEYREQ(Signal* signal)
376 {
377  jamEntry();
378  c_Counters.incr_counter(CI_READS_RECEIVED, 1);
379 
380  const LqhKeyReq* req = reinterpret_cast<const LqhKeyReq*>(signal->getDataPtr());
381 
388  SectionHandle handle = SectionHandle(this, signal);
389  SegmentedSectionPtr ssPtr;
390  handle.getSection(ssPtr, LqhKeyReq::AttrInfoSectionNum);
391 
392  Uint32 err;
393  Ptr<Request> requestPtr = { 0, RNIL };
394  do
395  {
396  ArenaHead ah;
397  err = DbspjErr::OutOfQueryMemory;
398  if (unlikely(!m_arenaAllocator.seize(ah)))
399  break;
400 
401 
402  m_request_pool.seize(ah, requestPtr);
403 
404  new (requestPtr.p) Request(ah);
405  do_init(requestPtr.p, req, signal->getSendersBlockRef());
406 
407  Uint32 len_cnt;
408 
409  {
410  SectionReader r0(ssPtr, getSectionSegmentPool());
411 
412  err = DbspjErr::ZeroLengthQueryTree;
413  if (unlikely(!r0.getWord(&len_cnt)))
414  break;
415  }
416 
417  Uint32 len = QueryTree::getLength(len_cnt);
418  Uint32 cnt = QueryTree::getNodeCnt(len_cnt);
419 
420  {
421  SectionReader treeReader(ssPtr, getSectionSegmentPool());
422  SectionReader paramReader(ssPtr, getSectionSegmentPool());
423  paramReader.step(len); // skip over tree to parameters
424 
425  Build_context ctx;
426  ctx.m_resultRef = req->variableData[0];
427  ctx.m_savepointId = req->savePointId;
428  ctx.m_scanPrio = 1;
429  ctx.m_start_signal = signal;
430  ctx.m_keyPtr.i = handle.m_ptr[LqhKeyReq::KeyInfoSectionNum].i;
431  ctx.m_senderRef = signal->getSendersBlockRef();
432 
433  err = build(ctx, requestPtr, treeReader, paramReader);
434  if (unlikely(err != 0))
435  break;
436  }
437 
442  ndbassert(requestPtr.p->isLookup());
443  ndbassert(requestPtr.p->m_node_cnt == cnt);
444  err = DbspjErr::InvalidRequest;
445  if (unlikely(!requestPtr.p->isLookup() || requestPtr.p->m_node_cnt != cnt))
446  break;
447 
451  store_lookup(requestPtr);
452 
453  release(ssPtr);
454  handle.clear();
455 
456  start(signal, requestPtr);
457  return;
458  } while (0);
459 
464  if (!requestPtr.isNull())
465  {
466  jam();
467  m_request_pool.release(requestPtr);
468  }
469  releaseSections(handle);
470  handle_early_lqhkey_ref(signal, req, err);
471 }
472 
473 void
474 Dbspj::do_init(Request* requestP, const LqhKeyReq* req, Uint32 senderRef)
475 {
476  requestP->m_bits = 0;
477  requestP->m_errCode = 0;
478  requestP->m_state = Request::RS_BUILDING;
479  requestP->m_node_cnt = 0;
480  requestP->m_cnt_active = 0;
481  requestP->m_rows = 0;
482  requestP->m_active_nodes.clear();
483  requestP->m_outstanding = 0;
484  requestP->m_transId[0] = req->transId1;
485  requestP->m_transId[1] = req->transId2;
486  bzero(requestP->m_lookup_node_data, sizeof(requestP->m_lookup_node_data));
487 #ifdef SPJ_TRACE_TIME
488  requestP->m_cnt_batches = 0;
489  requestP->m_sum_rows = 0;
490  requestP->m_sum_running = 0;
491  requestP->m_sum_waiting = 0;
492  requestP->m_save_time = spj_now();
493 #endif
494  const Uint32 reqInfo = req->requestInfo;
495  Uint32 tmp = req->clientConnectPtr;
496  if (LqhKeyReq::getDirtyFlag(reqInfo) &&
497  LqhKeyReq::getOperation(reqInfo) == ZREAD)
498  {
499  jam();
500 
501  ndbrequire(LqhKeyReq::getApplicationAddressFlag(reqInfo));
502  //const Uint32 apiRef = lqhKeyReq->variableData[0];
503  //const Uint32 apiOpRec = lqhKeyReq->variableData[1];
504  tmp = req->variableData[1];
505  requestP->m_senderData = tmp;
506  requestP->m_senderRef = senderRef;
507  }
508  else
509  {
510  if (LqhKeyReq::getSameClientAndTcFlag(reqInfo) == 1)
511  {
512  if (LqhKeyReq::getApplicationAddressFlag(reqInfo))
513  tmp = req->variableData[2];
514  else
515  tmp = req->variableData[0];
516  }
517  requestP->m_senderData = tmp;
518  requestP->m_senderRef = senderRef;
519  }
520  requestP->m_rootResultData = tmp;
521 }
522 
523 void
524 Dbspj::store_lookup(Ptr<Request> requestPtr)
525 {
526  ndbassert(requestPtr.p->isLookup());
527  Ptr<Request> tmp;
528  bool found = m_lookup_request_hash.find(tmp, *requestPtr.p);
529  ndbrequire(found == false);
530  m_lookup_request_hash.add(requestPtr);
531 }
532 
533 void
534 Dbspj::handle_early_lqhkey_ref(Signal* signal,
535  const LqhKeyReq * lqhKeyReq,
536  Uint32 err)
537 {
541  ndbrequire(err);
542  const Uint32 reqInfo = lqhKeyReq->requestInfo;
543  const Uint32 transid[2] = { lqhKeyReq->transId1, lqhKeyReq->transId2 };
544 
545  if (LqhKeyReq::getDirtyFlag(reqInfo) &&
546  LqhKeyReq::getOperation(reqInfo) == ZREAD)
547  {
548  jam();
549  /* Dirty read sends TCKEYREF direct to client, and nothing to TC */
550  ndbrequire(LqhKeyReq::getApplicationAddressFlag(reqInfo));
551  const Uint32 apiRef = lqhKeyReq->variableData[0];
552  const Uint32 apiOpRec = lqhKeyReq->variableData[1];
553 
554  TcKeyRef* const tcKeyRef = reinterpret_cast<TcKeyRef*>(signal->getDataPtrSend());
555 
556  tcKeyRef->connectPtr = apiOpRec;
557  tcKeyRef->transId[0] = transid[0];
558  tcKeyRef->transId[1] = transid[1];
559  tcKeyRef->errorCode = err;
560  sendTCKEYREF(signal, apiRef, signal->getSendersBlockRef());
561  }
562  else
563  {
564  jam();
565  const Uint32 returnref = signal->getSendersBlockRef();
566  const Uint32 clientPtr = lqhKeyReq->clientConnectPtr;
567 
568  Uint32 TcOprec = clientPtr;
569  if (LqhKeyReq::getSameClientAndTcFlag(reqInfo) == 1)
570  {
571  if (LqhKeyReq::getApplicationAddressFlag(reqInfo))
572  TcOprec = lqhKeyReq->variableData[2];
573  else
574  TcOprec = lqhKeyReq->variableData[0];
575  }
576 
577  LqhKeyRef* const ref = reinterpret_cast<LqhKeyRef*>(signal->getDataPtrSend());
578  ref->userRef = clientPtr;
579  ref->connectPtr = TcOprec;
580  ref->errorCode = err;
581  ref->transId1 = transid[0];
582  ref->transId2 = transid[1];
583  sendSignal(returnref, GSN_LQHKEYREF, signal,
584  LqhKeyRef::SignalLength, JBB);
585  }
586 }
587 
588 void
589 Dbspj::sendTCKEYREF(Signal* signal, Uint32 ref, Uint32 routeRef)
590 {
591  const Uint32 nodeId = refToNode(ref);
592  const bool connectedToNode = getNodeInfo(nodeId).m_connected;
593 
594  if (likely(connectedToNode))
595  {
596  jam();
597  sendSignal(ref, GSN_TCKEYREF, signal, TcKeyRef::SignalLength, JBB);
598  }
599  else
600  {
601  jam();
602  memmove(signal->theData+25, signal->theData, 4*TcKeyRef::SignalLength);
603  RouteOrd* ord = (RouteOrd*)signal->getDataPtrSend();
604  ord->dstRef = ref;
605  ord->srcRef = reference();
606  ord->gsn = GSN_TCKEYREF;
607  ord->cnt = 0;
608  LinearSectionPtr ptr[3];
609  ptr[0].p = signal->theData+25;
610  ptr[0].sz = TcKeyRef::SignalLength;
611  sendSignal(routeRef, GSN_ROUTE_ORD, signal, RouteOrd::SignalLength, JBB,
612  ptr, 1);
613  }
614 }
615 
616 void
617 Dbspj::sendTCKEYCONF(Signal* signal, Uint32 len, Uint32 ref, Uint32 routeRef)
618 {
619  const Uint32 nodeId = refToNode(ref);
620  const bool connectedToNode = getNodeInfo(nodeId).m_connected;
621 
622  if (likely(connectedToNode))
623  {
624  jam();
625  sendSignal(ref, GSN_TCKEYCONF, signal, len, JBB);
626  }
627  else
628  {
629  jam();
630  memmove(signal->theData+25, signal->theData, 4*len);
631  RouteOrd* ord = (RouteOrd*)signal->getDataPtrSend();
632  ord->dstRef = ref;
633  ord->srcRef = reference();
634  ord->gsn = GSN_TCKEYCONF;
635  ord->cnt = 0;
636  LinearSectionPtr ptr[3];
637  ptr[0].p = signal->theData+25;
638  ptr[0].sz = len;
639  sendSignal(routeRef, GSN_ROUTE_ORD, signal, RouteOrd::SignalLength, JBB,
640  ptr, 1);
641  }
642 }
643 
652 void
653 Dbspj::execSCAN_FRAGREQ(Signal* signal)
654 {
655  jamEntry();
656 
657  /* Reassemble if the request was fragmented */
658  if (!assembleFragments(signal))
659  {
660  jam();
661  return;
662  }
663 
664  const ScanFragReq * req = (ScanFragReq *)&signal->theData[0];
665 
666 #ifdef DEBUG_SCAN_FRAGREQ
667  ndbout_c("Incomming SCAN_FRAGREQ ");
668  printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
669  ScanFragReq::SignalLength + 2,
670  DBLQH);
671 #endif
672 
680  SectionHandle handle = SectionHandle(this, signal);
681  SegmentedSectionPtr ssPtr;
682  handle.getSection(ssPtr, ScanFragReq::AttrInfoSectionNum);
683 
684  Uint32 err;
685  Ptr<Request> requestPtr = { 0, RNIL };
686  do
687  {
688  ArenaHead ah;
689  err = DbspjErr::OutOfQueryMemory;
690  if (unlikely(!m_arenaAllocator.seize(ah)))
691  break;
692 
693  m_request_pool.seize(ah, requestPtr);
694 
695  new (requestPtr.p) Request(ah);
696  do_init(requestPtr.p, req, signal->getSendersBlockRef());
697 
698  Uint32 len_cnt;
699  {
700  SectionReader r0(ssPtr, getSectionSegmentPool());
701  err = DbspjErr::ZeroLengthQueryTree;
702  if (unlikely(!r0.getWord(&len_cnt)))
703  break;
704  }
705 
706  Uint32 len = QueryTree::getLength(len_cnt);
707  Uint32 cnt = QueryTree::getNodeCnt(len_cnt);
708 
709  {
710  SectionReader treeReader(ssPtr, getSectionSegmentPool());
711  SectionReader paramReader(ssPtr, getSectionSegmentPool());
712  paramReader.step(len); // skip over tree to parameters
713 
714  Build_context ctx;
715  ctx.m_resultRef = req->resultRef;
716  ctx.m_scanPrio = ScanFragReq::getScanPrio(req->requestInfo);
717  ctx.m_savepointId = req->savePointId;
718  ctx.m_batch_size_rows = req->batch_size_rows;
719  ctx.m_start_signal = signal;
720  ctx.m_senderRef = signal->getSendersBlockRef();
721 
722  if (handle.m_cnt > 1)
723  {
724  jam();
725  ctx.m_keyPtr.i = handle.m_ptr[ScanFragReq::KeyInfoSectionNum].i;
726  }
727  else
728  {
729  jam();
730  ctx.m_keyPtr.i = RNIL;
731  }
732 
733  err = build(ctx, requestPtr, treeReader, paramReader);
734  if (unlikely(err != 0))
735  break;
736  }
737 
738  ndbassert(requestPtr.p->isScan());
739  ndbassert(requestPtr.p->m_node_cnt == cnt);
740  err = DbspjErr::InvalidRequest;
741  if (unlikely(!requestPtr.p->isScan() || requestPtr.p->m_node_cnt != cnt))
742  break;
743 
747  store_scan(requestPtr);
748 
749  release(ssPtr);
750  handle.clear();
751 
752  start(signal, requestPtr);
753  return;
754  } while (0);
755 
756  if (!requestPtr.isNull())
757  {
758  jam();
759  m_request_pool.release(requestPtr);
760  }
761  releaseSections(handle);
762  handle_early_scanfrag_ref(signal, req, err);
763 }
764 
765 void
766 Dbspj::do_init(Request* requestP, const ScanFragReq* req, Uint32 senderRef)
767 {
768  requestP->m_bits = 0;
769  requestP->m_errCode = 0;
770  requestP->m_state = Request::RS_BUILDING;
771  requestP->m_node_cnt = 0;
772  requestP->m_cnt_active = 0;
773  requestP->m_rows = 0;
774  requestP->m_active_nodes.clear();
775  requestP->m_outstanding = 0;
776  requestP->m_senderRef = senderRef;
777  requestP->m_senderData = req->senderData;
778  requestP->m_transId[0] = req->transId1;
779  requestP->m_transId[1] = req->transId2;
780  requestP->m_rootResultData = req->resultData;
781  bzero(requestP->m_lookup_node_data, sizeof(requestP->m_lookup_node_data));
782 #ifdef SPJ_TRACE_TIME
783  requestP->m_cnt_batches = 0;
784  requestP->m_sum_rows = 0;
785  requestP->m_sum_running = 0;
786  requestP->m_sum_waiting = 0;
787  requestP->m_save_time = spj_now();
788 #endif
789 }
790 
791 void
792 Dbspj::store_scan(Ptr<Request> requestPtr)
793 {
794  ndbassert(requestPtr.p->isScan());
795  Ptr<Request> tmp;
796  bool found = m_scan_request_hash.find(tmp, *requestPtr.p);
797  ndbrequire(found == false);
798  m_scan_request_hash.add(requestPtr);
799 }
800 
801 void
802 Dbspj::handle_early_scanfrag_ref(Signal* signal,
803  const ScanFragReq * _req,
804  Uint32 err)
805 {
806  ScanFragReq req = *_req;
807  Uint32 senderRef = signal->getSendersBlockRef();
808 
809  ScanFragRef * ref = (ScanFragRef*)&signal->theData[0];
810  ref->senderData = req.senderData;
811  ref->transId1 = req.transId1;
812  ref->transId2 = req.transId2;
813  ref->errorCode = err;
814  sendSignal(senderRef, GSN_SCAN_FRAGREF, signal,
815  ScanFragRef::SignalLength, JBB);
816 }
817 
825 Uint32
826 Dbspj::build(Build_context& ctx,
827  Ptr<Request> requestPtr,
828  SectionReader & tree,
829  SectionReader & param)
830 {
831  Uint32 tmp0, tmp1;
832  Uint32 err = DbspjErr::ZeroLengthQueryTree;
833  ctx.m_cnt = 0;
834  ctx.m_scan_cnt = 0;
835 
836  tree.getWord(&tmp0);
837  Uint32 loop = QueryTree::getNodeCnt(tmp0);
838 
839  DEBUG("::build()");
840  err = DbspjErr::InvalidTreeNodeCount;
841  if (loop == 0 || loop > NDB_SPJ_MAX_TREE_NODES)
842  {
843  DEBUG_CRASH();
844  goto error;
845  }
846 
847  while (ctx.m_cnt < loop)
848  {
849  DEBUG(" - loop " << ctx.m_cnt << " pos: " << tree.getPos().currPos);
850  tree.peekWord(&tmp0);
851  param.peekWord(&tmp1);
852  Uint32 node_op = QueryNode::getOpType(tmp0);
853  Uint32 node_len = QueryNode::getLength(tmp0);
854  Uint32 param_op = QueryNodeParameters::getOpType(tmp1);
855  Uint32 param_len = QueryNodeParameters::getLength(tmp1);
856 
857  err = DbspjErr::QueryNodeTooBig;
858  if (unlikely(node_len >= NDB_ARRAY_SIZE(m_buffer0)))
859  {
860  DEBUG_CRASH();
861  goto error;
862  }
863 
864  err = DbspjErr::QueryNodeParametersTooBig;
865  if (unlikely(param_len >= NDB_ARRAY_SIZE(m_buffer1)))
866  {
867  DEBUG_CRASH();
868  goto error;
869  }
870 
871  err = DbspjErr::InvalidTreeNodeSpecification;
872  if (unlikely(tree.getWords(m_buffer0, node_len) == false))
873  {
874  DEBUG_CRASH();
875  goto error;
876  }
877 
878  err = DbspjErr::InvalidTreeParametersSpecification;
879  if (unlikely(param.getWords(m_buffer1, param_len) == false))
880  {
881  DEBUG_CRASH();
882  goto error;
883  }
884 
885 #if defined(DEBUG_LQHKEYREQ) || defined(DEBUG_SCAN_FRAGREQ)
886  printf("node: ");
887  for (Uint32 i = 0; i<node_len; i++)
888  printf("0x%.8x ", m_buffer0[i]);
889  printf("\n");
890 
891  printf("param: ");
892  for (Uint32 i = 0; i<param_len; i++)
893  printf("0x%.8x ", m_buffer1[i]);
894  printf("\n");
895 #endif
896 
897  err = DbspjErr::UnknowQueryOperation;
898  if (unlikely(node_op != param_op))
899  {
900  DEBUG_CRASH();
901  goto error;
902  }
903 
904  const OpInfo* info = getOpInfo(node_op);
905  if (unlikely(info == 0))
906  {
907  DEBUG_CRASH();
908  goto error;
909  }
910 
911  QueryNode* qn = (QueryNode*)m_buffer0;
912  QueryNodeParameters * qp = (QueryNodeParameters*)m_buffer1;
913  qn->len = node_len;
914  qp->len = param_len;
915  err = (this->*(info->m_build))(ctx, requestPtr, qn, qp);
916  if (unlikely(err != 0))
917  {
918  DEBUG_CRASH();
919  goto error;
920  }
921 
925  ctx.m_start_signal = 0;
926 
930  ndbrequire(ctx.m_cnt < NDB_ARRAY_SIZE(ctx.m_node_list));
931  ctx.m_cnt++;
932  }
933  requestPtr.p->m_node_cnt = ctx.m_cnt;
934 
939  if (requestPtr.p->m_bits & Request::RT_ROW_BUFFERS)
940  {
941  Ptr<TreeNode> treeNodePtr;
942  Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
943  for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr))
944  {
945  if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP)
946  {
947  jam();
948  treeNodePtr.p->m_row_map.init();
949  }
950  else if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
951  {
952  jam();
953  treeNodePtr.p->m_row_list.init();
954  }
955  }
956  }
957 
958  if (ctx.m_scan_cnt > 1)
959  {
960  jam();
961  requestPtr.p->m_bits |= Request::RT_MULTI_SCAN;
962 
971  if (requestPtr.p->m_bits & Request::RT_ROW_BUFFERS)
972  {
973  jam();
974  requestPtr.p->m_bits |= Request::RT_VAR_ALLOC;
975  }
976  }
977 
978  return 0;
979 
980 error:
981  jam();
982  return err;
983 }
984 
985 Uint32
986 Dbspj::createNode(Build_context& ctx, Ptr<Request> requestPtr,
987  Ptr<TreeNode> & treeNodePtr)
988 {
994  if (m_treenode_pool.seize(requestPtr.p->m_arena, treeNodePtr))
995  {
996  DEBUG("createNode - seize -> ptrI: " << treeNodePtr.i);
997  new (treeNodePtr.p) TreeNode(requestPtr.i);
998  ctx.m_node_list[ctx.m_cnt] = treeNodePtr;
999  Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1000  list.addLast(treeNodePtr);
1001  treeNodePtr.p->m_node_no = ctx.m_cnt;
1002  return 0;
1003  }
1004  return DbspjErr::OutOfOperations;
1005 }
1006 
1007 void
1008 Dbspj::start(Signal* signal,
1009  Ptr<Request> requestPtr)
1010 {
1011  if (requestPtr.p->m_bits & Request::RT_NEED_PREPARE)
1012  {
1013  jam();
1014  requestPtr.p->m_outstanding = 0;
1015  requestPtr.p->m_state = Request::RS_PREPARING;
1016 
1017  Ptr<TreeNode> nodePtr;
1018  Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1019  for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
1020  {
1021  jam();
1022  ndbrequire(nodePtr.p->m_info != 0);
1023  if (nodePtr.p->m_info->m_prepare != 0)
1024  {
1025  jam();
1026  (this->*(nodePtr.p->m_info->m_prepare))(signal, requestPtr, nodePtr);
1027  }
1028  }
1029 
1034  ndbassert(requestPtr.p->m_outstanding);
1035  }
1036 
1037  checkPrepareComplete(signal, requestPtr, 0);
1038 }
1039 
1040 void
1041 Dbspj::checkPrepareComplete(Signal * signal, Ptr<Request> requestPtr,
1042  Uint32 cnt)
1043 {
1044  ndbrequire(requestPtr.p->m_outstanding >= cnt);
1045  requestPtr.p->m_outstanding -= cnt;
1046 
1047  if (requestPtr.p->m_outstanding == 0)
1048  {
1049  jam();
1050 
1051  if (unlikely((requestPtr.p->m_state & Request::RS_ABORTING) != 0))
1052  {
1053  jam();
1054  batchComplete(signal, requestPtr);
1055  return;
1056  }
1057 
1058  requestPtr.p->m_state = Request::RS_RUNNING;
1059  Ptr<TreeNode> nodePtr;
1060  {
1061  Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1062  ndbrequire(list.first(nodePtr));
1063  }
1064  ndbrequire(nodePtr.p->m_info != 0 && nodePtr.p->m_info->m_start != 0);
1065  (this->*(nodePtr.p->m_info->m_start))(signal, requestPtr, nodePtr);
1066  }
1067 }
1068 
1069 void
1070 Dbspj::checkBatchComplete(Signal * signal, Ptr<Request> requestPtr,
1071  Uint32 cnt)
1072 {
1073  ndbrequire(requestPtr.p->m_outstanding >= cnt);
1074  requestPtr.p->m_outstanding -= cnt;
1075 
1076  if (requestPtr.p->m_outstanding == 0)
1077  {
1078  jam();
1079  batchComplete(signal, requestPtr);
1080  }
1081 }
1082 
1083 void
1084 Dbspj::batchComplete(Signal* signal, Ptr<Request> requestPtr)
1085 {
1086  ndbrequire(requestPtr.p->m_outstanding == 0); // "definition" of batchComplete
1087 
1088  bool is_complete = requestPtr.p->m_cnt_active == 0;
1089  bool need_complete_phase = requestPtr.p->m_bits & Request::RT_NEED_COMPLETE;
1090 
1091  if (requestPtr.p->isLookup())
1092  {
1093  ndbassert(requestPtr.p->m_cnt_active == 0);
1094  }
1095 
1096  if (!is_complete || (is_complete && need_complete_phase == false))
1097  {
1103  jam();
1104 
1105  if ((requestPtr.p->m_state & Request::RS_ABORTING) != 0)
1106  {
1107  ndbassert(is_complete);
1108  }
1109 
1110  prepareNextBatch(signal, requestPtr);
1111  sendConf(signal, requestPtr, is_complete);
1112  }
1113  else if (is_complete && need_complete_phase)
1114  {
1115  jam();
1119  complete(signal, requestPtr);
1120  return;
1121  }
1122 
1123  if (requestPtr.p->m_cnt_active == 0)
1124  {
1125  jam();
1129  cleanup(requestPtr);
1130  }
1131  else if ((requestPtr.p->m_bits & Request::RT_MULTI_SCAN) != 0)
1132  {
1133  jam();
1137  releaseScanBuffers(requestPtr);
1138  }
1139  else if ((requestPtr.p->m_bits & Request::RT_ROW_BUFFERS) != 0)
1140  {
1141  jam();
1146  releaseRequestBuffers(requestPtr, true);
1147  }
1148 }
1149 
1156 void
1157 Dbspj::prepareNextBatch(Signal* signal, Ptr<Request> requestPtr)
1158 {
1159  requestPtr.p->m_cursor_nodes.init();
1160  requestPtr.p->m_active_nodes.clear();
1161 
1162  if (requestPtr.p->m_cnt_active == 0)
1163  {
1164  jam();
1165  return;
1166  }
1167 
1168  if (requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT)
1169  {
1184  jam();
1185  Ptr<TreeNode> nodePtr;
1186  Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1187 
1192  for (list.last(nodePtr); !nodePtr.isNull(); list.prev(nodePtr))
1193  {
1194  if (nodePtr.p->m_state == TreeNode::TN_ACTIVE)
1195  {
1196  jam();
1197  DEBUG("Will fetch more from 'active' m_node_no: " << nodePtr.p->m_node_no);
1201  registerActiveCursor(requestPtr, nodePtr);
1202  break;
1203  }
1204  }
1205 
1213  if (!nodePtr.isNull())
1214  {
1215  jam();
1216  DEBUG("Calculate 'active', w/ cursor on m_node_no: " << nodePtr.p->m_node_no);
1217 
1218  /* Restart any partial index-scans after this 'TN_ACTIVE' TreeNode */
1219  for (list.next(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
1220  {
1221  jam();
1222  if (!nodePtr.p->m_ancestors.overlaps (requestPtr.p->m_active_nodes))
1223  {
1224  jam();
1225  ndbrequire(nodePtr.p->m_state != TreeNode::TN_ACTIVE);
1226  ndbrequire(nodePtr.p->m_info != 0);
1227  if (nodePtr.p->m_info->m_parent_batch_repeat != 0)
1228  {
1229  jam();
1230  (this->*(nodePtr.p->m_info->m_parent_batch_repeat))(signal,
1231  requestPtr,
1232  nodePtr);
1233  }
1234  }
1235  }
1236  } // if (!nodePtr.isNull()
1237  }
1238  else // not 'RT_REPEAT_SCAN_RESULT'
1239  {
1248  jam();
1249  Ptr<TreeNode> nodePtr;
1250  Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1251  TreeNodeBitMask ancestors_of_active;
1252 
1253  for (list.last(nodePtr); !nodePtr.isNull(); list.prev(nodePtr))
1254  {
1262  if (nodePtr.p->m_state == TreeNode::TN_ACTIVE &&
1263  !ancestors_of_active.get (nodePtr.p->m_node_no))
1264  {
1265  jam();
1266  DEBUG("Add 'active' m_node_no: " << nodePtr.p->m_node_no);
1267  registerActiveCursor(requestPtr, nodePtr);
1268  ancestors_of_active.bitOR(nodePtr.p->m_ancestors);
1269  }
1270  }
1271  } // if (RT_REPEAT_SCAN_RESULT)
1272 
1273  DEBUG("Calculated 'm_active_nodes': " << requestPtr.p->m_active_nodes.rep.data[0]);
1274 }
1275 
1276 void
1277 Dbspj::sendConf(Signal* signal, Ptr<Request> requestPtr, bool is_complete)
1278 {
1279  if (requestPtr.p->isScan())
1280  {
1281  if (unlikely((requestPtr.p->m_state & Request::RS_WAITING) != 0))
1282  {
1283  jam();
1288  ndbrequire(is_complete);
1289  ndbrequire((requestPtr.p->m_state & Request::RS_ABORTING) != 0);
1290  return;
1291  }
1292 
1293  if (requestPtr.p->m_errCode == 0)
1294  {
1295  jam();
1296  ScanFragConf * conf=
1297  reinterpret_cast<ScanFragConf*>(signal->getDataPtrSend());
1298  conf->senderData = requestPtr.p->m_senderData;
1299  conf->transId1 = requestPtr.p->m_transId[0];
1300  conf->transId2 = requestPtr.p->m_transId[1];
1301  conf->completedOps = requestPtr.p->m_rows;
1302  conf->fragmentCompleted = is_complete ? 1 : 0;
1303  conf->total_len = requestPtr.p->m_active_nodes.rep.data[0];
1304 
1305  c_Counters.incr_counter(CI_SCAN_BATCHES_RETURNED, 1);
1306  c_Counters.incr_counter(CI_SCAN_ROWS_RETURNED, requestPtr.p->m_rows);
1307 
1308 #ifdef SPJ_TRACE_TIME
1309  Uint64 now = spj_now();
1310  Uint64 then = requestPtr.p->m_save_time;
1311 
1312  requestPtr.p->m_sum_rows += requestPtr.p->m_rows;
1313  requestPtr.p->m_sum_running += Uint32(now - then);
1314  requestPtr.p->m_cnt_batches++;
1315  requestPtr.p->m_save_time = now;
1316 
1317  if (is_complete)
1318  {
1319  Uint32 cnt = requestPtr.p->m_cnt_batches;
1320  ndbout_c("batches: %u avg_rows: %u avg_running: %u avg_wait: %u",
1321  cnt,
1322  (requestPtr.p->m_sum_rows / cnt),
1323  (requestPtr.p->m_sum_running / cnt),
1324  cnt == 1 ? 0 : requestPtr.p->m_sum_waiting / (cnt - 1));
1325  }
1326 #endif
1327 
1331  requestPtr.p->m_rows = 0;
1332  if (!is_complete)
1333  {
1334  jam();
1335  requestPtr.p->m_state |= Request::RS_WAITING;
1336  }
1337 #ifdef DEBUG_SCAN_FRAGREQ
1338  ndbout_c("Dbspj::sendConf() sending SCAN_FRAGCONF ");
1339  printSCAN_FRAGCONF(stdout, signal->getDataPtrSend(),
1340  conf->total_len,
1341  DBLQH);
1342 #endif
1343  sendSignal(requestPtr.p->m_senderRef, GSN_SCAN_FRAGCONF, signal,
1344  ScanFragConf::SignalLength, JBB);
1345  }
1346  else
1347  {
1348  jam();
1349  ndbrequire(is_complete);
1350  ScanFragRef * ref=
1351  reinterpret_cast<ScanFragRef*>(signal->getDataPtrSend());
1352  ref->senderData = requestPtr.p->m_senderData;
1353  ref->transId1 = requestPtr.p->m_transId[0];
1354  ref->transId2 = requestPtr.p->m_transId[1];
1355  ref->errorCode = requestPtr.p->m_errCode;
1356 
1357  sendSignal(requestPtr.p->m_senderRef, GSN_SCAN_FRAGREF, signal,
1358  ScanFragRef::SignalLength, JBB);
1359  }
1360  }
1361  else
1362  {
1363  ndbassert(is_complete);
1364  if (requestPtr.p->m_errCode)
1365  {
1366  jam();
1367  Uint32 resultRef = getResultRef(requestPtr);
1368  TcKeyRef* ref = (TcKeyRef*)signal->getDataPtr();
1369  ref->connectPtr = requestPtr.p->m_senderData;
1370  ref->transId[0] = requestPtr.p->m_transId[0];
1371  ref->transId[1] = requestPtr.p->m_transId[1];
1372  ref->errorCode = requestPtr.p->m_errCode;
1373  ref->errorData = 0;
1374 
1375  sendTCKEYREF(signal, resultRef, requestPtr.p->m_senderRef);
1376  }
1377  }
1378 }
1379 
1380 Uint32
1381 Dbspj::getResultRef(Ptr<Request> requestPtr)
1382 {
1383  Ptr<TreeNode> nodePtr;
1384  Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1385  for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
1386  {
1387  if (nodePtr.p->m_info == &g_LookupOpInfo)
1388  {
1389  jam();
1390  return nodePtr.p->m_lookup_data.m_api_resultRef;
1391  }
1392  }
1393  ndbrequire(false);
1394  return 0;
1395 }
1396 
1397 void
1398 Dbspj::releaseScanBuffers(Ptr<Request> requestPtr)
1399 {
1400  Ptr<TreeNode> treeNodePtr;
1401  Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1402  TreeNodeBitMask ancestors_of_active;
1403 
1404  for (list.last(treeNodePtr); !treeNodePtr.isNull(); list.prev(treeNodePtr))
1405  {
1410  if (!ancestors_of_active.get(treeNodePtr.p->m_node_no))
1411  {
1412  if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
1413  {
1414  jam();
1415  releaseNodeRows(requestPtr, treeNodePtr);
1416  }
1417 
1423  if (treeNodePtr.p->m_state == TreeNode::TN_ACTIVE ||
1424  requestPtr.p->m_active_nodes.get(treeNodePtr.p->m_node_no))
1425  {
1426  jam();
1427  cleanupChildBranch(requestPtr, treeNodePtr);
1428  }
1429  }
1430 
1435  if (treeNodePtr.p->m_state == TreeNode::TN_ACTIVE ||
1436  requestPtr.p->m_active_nodes.get(treeNodePtr.p->m_node_no))
1437  {
1438  ancestors_of_active.bitOR(treeNodePtr.p->m_ancestors);
1439  }
1440  }
1445  ndbrequire(requestPtr.p->m_cnt_active >= 1);
1446 }
1447 
1448 void
1449 Dbspj::registerActiveCursor(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
1450 {
1451  Uint32 bit = treeNodePtr.p->m_node_no;
1452  ndbrequire(!requestPtr.p->m_active_nodes.get(bit));
1453  requestPtr.p->m_active_nodes.set(bit);
1454 
1455  Local_TreeNodeCursor_list list(m_treenode_pool, requestPtr.p->m_cursor_nodes);
1456 #ifdef VM_TRACE
1457  {
1458  Ptr<TreeNode> nodePtr;
1459  for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
1460  {
1461  ndbrequire(nodePtr.i != treeNodePtr.i);
1462  }
1463  }
1464 #endif
1465  list.add(treeNodePtr);
1466 }
1467 
1468 void
1469 Dbspj::cleanupChildBranch(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
1470 {
1471  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
1472  Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
1473  Dependency_map::ConstDataBufferIterator it;
1474  for (list.first(it); !it.isNull(); list.next(it))
1475  {
1476  jam();
1477  Ptr<TreeNode> childPtr;
1478  m_treenode_pool.getPtr(childPtr, *it.data);
1479  if (childPtr.p->m_info->m_parent_batch_cleanup != 0)
1480  {
1481  jam();
1482  (this->*(childPtr.p->m_info->m_parent_batch_cleanup))(requestPtr,
1483  childPtr);
1484  }
1485  cleanupChildBranch(requestPtr,childPtr);
1486  }
1487 }
1488 
1489 void
1490 Dbspj::releaseNodeRows(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
1491 {
1496  // only when var-alloc, or else stack will be popped wo/ consideration
1497  // to individual rows
1498  ndbassert(requestPtr.p->m_bits & Request::RT_VAR_ALLOC);
1499  ndbassert(treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER);
1500 
1504  if ((treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP) == 0)
1505  {
1506  jam();
1507  Uint32 cnt = 0;
1508  SLFifoRowListIterator iter;
1509  for (first(requestPtr, treeNodePtr, iter); !iter.isNull(); )
1510  {
1511  jam();
1512  RowRef pos = iter.m_ref;
1513  next(iter);
1514  releaseRow(requestPtr, pos);
1515  cnt ++;
1516  }
1517  treeNodePtr.p->m_row_list.init();
1518  DEBUG("SLFifoRowListIterator: released " << cnt << " rows!");
1519  }
1520  else
1521  {
1522  jam();
1523  Uint32 cnt = 0;
1524  RowMapIterator iter;
1525  for (first(requestPtr, treeNodePtr, iter); !iter.isNull(); )
1526  {
1527  jam();
1528  RowRef pos = iter.m_ref;
1529  // this could be made more efficient by not actually seting up m_row_ptr
1530  next(iter);
1531  releaseRow(requestPtr, pos);
1532  cnt++;
1533  }
1534  treeNodePtr.p->m_row_map.init();
1535  DEBUG("RowMapIterator: released " << cnt << " rows!");
1536  }
1537 }
1538 
1539 void
1540 Dbspj::releaseRow(Ptr<Request> requestPtr, RowRef pos)
1541 {
1542  ndbassert(requestPtr.p->m_bits & Request::RT_VAR_ALLOC);
1543  ndbassert(pos.m_allocator == 1);
1544  Ptr<RowPage> ptr;
1545  m_page_pool.getPtr(ptr, pos.m_page_id);
1546  ((Var_page*)ptr.p)->free_record(pos.m_page_pos, Var_page::CHAIN);
1547  Uint32 free_space = ((Var_page*)ptr.p)->free_space;
1548  if (free_space == 0)
1549  {
1550  jam();
1551  LocalDLFifoList<RowPage> list(m_page_pool,
1552  requestPtr.p->m_rowBuffer.m_page_list);
1553  list.remove(ptr);
1554  releasePage(ptr);
1555  }
1556  else if (free_space > requestPtr.p->m_rowBuffer.m_var.m_free)
1557  {
1558  LocalDLFifoList<RowPage> list(m_page_pool,
1559  requestPtr.p->m_rowBuffer.m_page_list);
1560  list.remove(ptr);
1561  list.addLast(ptr);
1562  requestPtr.p->m_rowBuffer.m_var.m_free = free_space;
1563  }
1564 }
1565 
1566 void
1567 Dbspj::releaseRequestBuffers(Ptr<Request> requestPtr, bool reset)
1568 {
1572  {
1573  {
1574  LocalDLFifoList<RowPage> list(m_page_pool,
1575  requestPtr.p->m_rowBuffer.m_page_list);
1576  if (!list.isEmpty())
1577  {
1578  jam();
1579  Ptr<RowPage> first, last;
1580  list.first(first);
1581  list.last(last);
1582  releasePages(first.i, last);
1583  list.remove();
1584  }
1585  }
1586  requestPtr.p->m_rowBuffer.stack_init();
1587  }
1588 
1589  if (reset)
1590  {
1591  Ptr<TreeNode> nodePtr;
1592  Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1593  for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
1594  {
1595  jam();
1596  if (nodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
1597  {
1598  jam();
1599  if (nodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP)
1600  {
1601  jam();
1602  nodePtr.p->m_row_map.init();
1603  }
1604  else
1605  {
1606  nodePtr.p->m_row_list.init();
1607  }
1608  }
1609  }
1610  }
1611 }
1612 
1613 void
1614 Dbspj::reportBatchComplete(Signal * signal, Ptr<Request> requestPtr,
1615  Ptr<TreeNode> treeNodePtr)
1616 {
1617  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
1618  Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
1619  Dependency_map::ConstDataBufferIterator it;
1620  for (list.first(it); !it.isNull(); list.next(it))
1621  {
1622  jam();
1623  Ptr<TreeNode> childPtr;
1624  m_treenode_pool.getPtr(childPtr, * it.data);
1625  if (childPtr.p->m_bits & TreeNode::T_NEED_REPORT_BATCH_COMPLETED)
1626  {
1627  jam();
1628  ndbrequire(childPtr.p->m_info != 0 &&
1629  childPtr.p->m_info->m_parent_batch_complete !=0 );
1630  (this->*(childPtr.p->m_info->m_parent_batch_complete))(signal,
1631  requestPtr,
1632  childPtr);
1633  }
1634  }
1635 }
1636 
1637 void
1638 Dbspj::abort(Signal* signal, Ptr<Request> requestPtr, Uint32 errCode)
1639 {
1640  jam();
1641 
1642  if ((requestPtr.p->m_state & Request::RS_ABORTING) != 0)
1643  {
1644  jam();
1645  goto checkcomplete;
1646  }
1647 
1648  requestPtr.p->m_state |= Request::RS_ABORTING;
1649  requestPtr.p->m_errCode = errCode;
1650 
1651  {
1652  Ptr<TreeNode> nodePtr;
1653  Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1654  for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
1655  {
1656  jam();
1661  nodePtr.p->m_bits &= ~Uint32(TreeNode::T_REPORT_BATCH_COMPLETE);
1662 
1663  ndbrequire(nodePtr.p->m_info != 0);
1664  if (nodePtr.p->m_info->m_abort != 0)
1665  {
1666  jam();
1667  (this->*(nodePtr.p->m_info->m_abort))(signal, requestPtr, nodePtr);
1668  }
1669  }
1670  }
1671 
1672 checkcomplete:
1673  checkBatchComplete(signal, requestPtr, 0);
1674 }
1675 
1676 Uint32
1677 Dbspj::nodeFail(Signal* signal, Ptr<Request> requestPtr,
1678  NdbNodeBitmask nodes)
1679 {
1680  Uint32 cnt = 0;
1681  Uint32 iter = 0;
1682 
1683  {
1684  Ptr<TreeNode> nodePtr;
1685  Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1686  for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
1687  {
1688  jam();
1689  ndbrequire(nodePtr.p->m_info != 0);
1690  if (nodePtr.p->m_info->m_execNODE_FAILREP != 0)
1691  {
1692  jam();
1693  iter ++;
1694  cnt += (this->*(nodePtr.p->m_info->m_execNODE_FAILREP))(signal,
1695  requestPtr,
1696  nodePtr, nodes);
1697  }
1698  }
1699  }
1700 
1701  if (cnt == 0)
1702  {
1703  jam();
1709  if (requestPtr.p->isScan() &&
1710  nodes.get(refToNode(requestPtr.p->m_senderRef)))
1711  {
1712  jam();
1713  abort(signal, requestPtr, DbspjErr::NodeFailure);
1714  }
1715  }
1716  else
1717  {
1718  jam();
1719  abort(signal, requestPtr, DbspjErr::NodeFailure);
1720  }
1721 
1722  return cnt + iter;
1723 }
1724 
1725 void
1726 Dbspj::complete(Signal* signal, Ptr<Request> requestPtr)
1727 {
1731  Uint32 flags = requestPtr.p->m_state &
1732  (Request::RS_ABORTING | Request::RS_WAITING);
1733 
1734  requestPtr.p->m_state = Request::RS_COMPLETING | flags;
1735 
1736  // clear bit so that next batchComplete()
1737  // will continue to cleanup
1738  ndbassert((requestPtr.p->m_bits & Request::RT_NEED_COMPLETE) != 0);
1739  requestPtr.p->m_bits &= ~(Uint32)Request::RT_NEED_COMPLETE;
1740  requestPtr.p->m_outstanding = 0;
1741  {
1742  Ptr<TreeNode> nodePtr;
1743  Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1744  for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
1745  {
1746  jam();
1747  ndbrequire(nodePtr.p->m_info != 0);
1748  if (nodePtr.p->m_info->m_complete != 0)
1749  {
1750  jam();
1751  (this->*(nodePtr.p->m_info->m_complete))(signal, requestPtr, nodePtr);
1752  }
1753  }
1754 
1763  //ndbassert(requestPtr.p->m_outstanding);
1764  }
1765  checkBatchComplete(signal, requestPtr, 0);
1766 }
1767 
1768 void
1769 Dbspj::cleanup(Ptr<Request> requestPtr)
1770 {
1771  ndbrequire(requestPtr.p->m_cnt_active == 0);
1772  {
1773  Ptr<TreeNode> nodePtr;
1774  Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1775  for (list.first(nodePtr); !nodePtr.isNull(); )
1776  {
1777  jam();
1778  ndbrequire(nodePtr.p->m_info != 0 && nodePtr.p->m_info->m_cleanup != 0);
1779  (this->*(nodePtr.p->m_info->m_cleanup))(requestPtr, nodePtr);
1780 
1781  Ptr<TreeNode> tmp = nodePtr;
1782  list.next(nodePtr);
1783  m_treenode_pool.release(tmp);
1784  }
1785  list.remove();
1786  }
1787  if (requestPtr.p->isScan())
1788  {
1789  jam();
1790 
1791  if (unlikely((requestPtr.p->m_state & Request::RS_WAITING) != 0))
1792  {
1793  jam();
1794  requestPtr.p->m_state = Request::RS_ABORTED;
1795  return;
1796  }
1797 
1798 #ifdef VM_TRACE
1799  {
1800  Request key;
1801  key.m_transId[0] = requestPtr.p->m_transId[0];
1802  key.m_transId[1] = requestPtr.p->m_transId[1];
1803  key.m_senderData = requestPtr.p->m_senderData;
1804  Ptr<Request> tmp;
1805  ndbrequire(m_scan_request_hash.find(tmp, key));
1806  }
1807 #endif
1808  m_scan_request_hash.remove(requestPtr);
1809  }
1810  else
1811  {
1812  jam();
1813 #ifdef VM_TRACE
1814  {
1815  Request key;
1816  key.m_transId[0] = requestPtr.p->m_transId[0];
1817  key.m_transId[1] = requestPtr.p->m_transId[1];
1818  key.m_senderData = requestPtr.p->m_senderData;
1819  Ptr<Request> tmp;
1820  ndbrequire(m_lookup_request_hash.find(tmp, key));
1821  }
1822 #endif
1823  m_lookup_request_hash.remove(requestPtr);
1824  }
1825  releaseRequestBuffers(requestPtr, false);
1826  ArenaHead ah = requestPtr.p->m_arena;
1827  m_request_pool.release(requestPtr);
1828  m_arenaAllocator.release(ah);
1829 }
1830 
1831 void
1832 Dbspj::cleanup_common(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
1833 {
1834  jam();
1835 
1836  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
1837  {
1838  Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
1839  list.release();
1840  }
1841 
1842  {
1843  Local_pattern_store pattern(pool, treeNodePtr.p->m_keyPattern);
1844  pattern.release();
1845  }
1846 
1847  {
1848  Local_pattern_store pattern(pool, treeNodePtr.p->m_attrParamPattern);
1849  pattern.release();
1850  }
1851 
1852  if (treeNodePtr.p->m_send.m_keyInfoPtrI != RNIL)
1853  {
1854  jam();
1855  releaseSection(treeNodePtr.p->m_send.m_keyInfoPtrI);
1856  }
1857 
1858  if (treeNodePtr.p->m_send.m_attrInfoPtrI != RNIL)
1859  {
1860  jam();
1861  releaseSection(treeNodePtr.p->m_send.m_attrInfoPtrI);
1862  }
1863 }
1864 
1868 void
1869 Dbspj::execLQHKEYREF(Signal* signal)
1870 {
1871  jamEntry();
1872 
1873  const LqhKeyRef* ref = reinterpret_cast<const LqhKeyRef*>(signal->getDataPtr());
1874 
1875  DEBUG("execLQHKEYREF, errorCode:" << ref->errorCode);
1876  Ptr<TreeNode> treeNodePtr;
1877  m_treenode_pool.getPtr(treeNodePtr, ref->connectPtr);
1878 
1879  Ptr<Request> requestPtr;
1880  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
1881 
1882  ndbrequire(treeNodePtr.p->m_info && treeNodePtr.p->m_info->m_execLQHKEYREF);
1883  (this->*(treeNodePtr.p->m_info->m_execLQHKEYREF))(signal,
1884  requestPtr,
1885  treeNodePtr);
1886 }
1887 
1888 void
1889 Dbspj::execLQHKEYCONF(Signal* signal)
1890 {
1891  jamEntry();
1892 
1893  DEBUG("execLQHKEYCONF");
1894 
1895  const LqhKeyConf* conf = reinterpret_cast<const LqhKeyConf*>(signal->getDataPtr());
1896  Ptr<TreeNode> treeNodePtr;
1897  m_treenode_pool.getPtr(treeNodePtr, conf->opPtr);
1898 
1899  Ptr<Request> requestPtr;
1900  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
1901 
1902  ndbrequire(treeNodePtr.p->m_info && treeNodePtr.p->m_info->m_execLQHKEYCONF);
1903  (this->*(treeNodePtr.p->m_info->m_execLQHKEYCONF))(signal,
1904  requestPtr,
1905  treeNodePtr);
1906 }
1907 
1908 void
1909 Dbspj::execSCAN_FRAGREF(Signal* signal)
1910 {
1911  jamEntry();
1912  const ScanFragRef* ref = reinterpret_cast<const ScanFragRef*>(signal->getDataPtr());
1913 
1914  DEBUG("execSCAN_FRAGREF, errorCode:" << ref->errorCode);
1915 
1916  Ptr<ScanFragHandle> scanFragHandlePtr;
1917  m_scanfraghandle_pool.getPtr(scanFragHandlePtr, ref->senderData);
1918  Ptr<TreeNode> treeNodePtr;
1919  m_treenode_pool.getPtr(treeNodePtr, scanFragHandlePtr.p->m_treeNodePtrI);
1920  Ptr<Request> requestPtr;
1921  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
1922 
1923  ndbrequire(treeNodePtr.p->m_info&&treeNodePtr.p->m_info->m_execSCAN_FRAGREF);
1924  (this->*(treeNodePtr.p->m_info->m_execSCAN_FRAGREF))(signal,
1925  requestPtr,
1926  treeNodePtr,
1927  scanFragHandlePtr);
1928 }
1929 
1930 void
1931 Dbspj::execSCAN_HBREP(Signal* signal)
1932 {
1933  jamEntry();
1934 
1935  Uint32 senderData = signal->theData[0];
1936  //Uint32 transId[2] = { signal->theData[1], signal->theData[2] };
1937 
1938  Ptr<ScanFragHandle> scanFragHandlePtr;
1939  m_scanfraghandle_pool.getPtr(scanFragHandlePtr, senderData);
1940  Ptr<TreeNode> treeNodePtr;
1941  m_treenode_pool.getPtr(treeNodePtr, scanFragHandlePtr.p->m_treeNodePtrI);
1942  Ptr<Request> requestPtr;
1943  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
1944 
1945  Uint32 ref = requestPtr.p->m_senderRef;
1946  signal->theData[0] = requestPtr.p->m_senderData;
1947  sendSignal(ref, GSN_SCAN_HBREP, signal, 3, JBB);
1948 }
1949 
1950 void
1951 Dbspj::execSCAN_FRAGCONF(Signal* signal)
1952 {
1953  jamEntry();
1954  DEBUG("execSCAN_FRAGCONF");
1955 
1956  const ScanFragConf* conf = reinterpret_cast<const ScanFragConf*>(signal->getDataPtr());
1957 
1958 #ifdef DEBUG_SCAN_FRAGREQ
1959  ndbout_c("Dbspj::execSCAN_FRAGCONF() receiveing SCAN_FRAGCONF ");
1960  printSCAN_FRAGCONF(stdout, signal->getDataPtrSend(),
1961  conf->total_len,
1962  DBLQH);
1963 #endif
1964 
1965  Ptr<ScanFragHandle> scanFragHandlePtr;
1966  m_scanfraghandle_pool.getPtr(scanFragHandlePtr, conf->senderData);
1967  Ptr<TreeNode> treeNodePtr;
1968  m_treenode_pool.getPtr(treeNodePtr, scanFragHandlePtr.p->m_treeNodePtrI);
1969  Ptr<Request> requestPtr;
1970  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
1971 
1972  ndbrequire(treeNodePtr.p->m_info&&treeNodePtr.p->m_info->m_execSCAN_FRAGCONF);
1973  (this->*(treeNodePtr.p->m_info->m_execSCAN_FRAGCONF))(signal,
1974  requestPtr,
1975  treeNodePtr,
1976  scanFragHandlePtr);
1977 }
1978 
1979 void
1980 Dbspj::execSCAN_NEXTREQ(Signal* signal)
1981 {
1982  jamEntry();
1983  const ScanFragNextReq * req = (ScanFragNextReq*)&signal->theData[0];
1984 
1985  DEBUG("Incomming SCAN_NEXTREQ");
1986 #ifdef DEBUG_SCAN_FRAGREQ
1987  printSCANFRAGNEXTREQ(stdout, &signal->theData[0],
1988  ScanFragNextReq::SignalLength, DBLQH);
1989 #endif
1990 
1991  Request key;
1992  key.m_transId[0] = req->transId1;
1993  key.m_transId[1] = req->transId2;
1994  key.m_senderData = req->senderData;
1995 
1996  Ptr<Request> requestPtr;
1997  if (unlikely(!m_scan_request_hash.find(requestPtr, key)))
1998  {
1999  jam();
2000  ndbrequire(req->requestInfo == ScanFragNextReq::ZCLOSE);
2001  return;
2002  }
2003 
2004 #ifdef SPJ_TRACE_TIME
2005  Uint64 now = spj_now();
2006  Uint64 then = requestPtr.p->m_save_time;
2007  requestPtr.p->m_sum_waiting += Uint32(now - then);
2008  requestPtr.p->m_save_time = now;
2009 #endif
2010 
2011  Uint32 state = requestPtr.p->m_state;
2012  requestPtr.p->m_state = state & ~Uint32(Request::RS_WAITING);
2013 
2014  if (unlikely(state == Request::RS_ABORTED))
2015  {
2016  jam();
2017  batchComplete(signal, requestPtr);
2018  return;
2019  }
2020 
2021  if (unlikely((state & Request::RS_ABORTING) != 0))
2022  {
2023  jam();
2028  return;
2029  }
2030 
2031  if (req->requestInfo == ScanFragNextReq::ZCLOSE) // Requested close scan
2032  {
2033  jam();
2034  abort(signal, requestPtr, 0);
2035  return;
2036  }
2037 
2038  ndbrequire((state & Request::RS_WAITING) != 0);
2039  ndbrequire(requestPtr.p->m_outstanding == 0);
2040 
2041  {
2045  Ptr<TreeNode> treeNodePtr;
2046  Local_TreeNodeCursor_list list(m_treenode_pool,
2047  requestPtr.p->m_cursor_nodes);
2048  Uint32 cnt_active = 0;
2049 
2050  for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr))
2051  {
2052  if (treeNodePtr.p->m_state == TreeNode::TN_ACTIVE)
2053  {
2054  jam();
2055  DEBUG("SCAN_NEXTREQ on TreeNode: " << treeNodePtr.i
2056  << ", m_node_no: " << treeNodePtr.p->m_node_no
2057  << ", w/ m_parentPtrI: " << treeNodePtr.p->m_parentPtrI);
2058 
2059  ndbrequire(treeNodePtr.p->m_info != 0 &&
2060  treeNodePtr.p->m_info->m_execSCAN_NEXTREQ != 0);
2061  (this->*(treeNodePtr.p->m_info->m_execSCAN_NEXTREQ))(signal,
2062  requestPtr,
2063  treeNodePtr);
2064  cnt_active++;
2065  }
2066  else
2067  {
2072  jam();
2073  ndbrequire(requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT);
2074  DEBUG(" Restart TreeNode: " << treeNodePtr.i
2075  << ", m_node_no: " << treeNodePtr.p->m_node_no
2076  << ", w/ m_parentPtrI: " << treeNodePtr.p->m_parentPtrI);
2077 
2078  ndbrequire(treeNodePtr.p->m_info != 0 &&
2079  treeNodePtr.p->m_info->m_parent_batch_complete !=0 );
2080  (this->*(treeNodePtr.p->m_info->m_parent_batch_complete))(signal,
2081  requestPtr,
2082  treeNodePtr);
2083  }
2084  }
2085  /* Expected only a single ACTIVE TreeNode among the cursors */
2086  ndbrequire(cnt_active == 1 ||
2087  !(requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT));
2088  }
2089 }
2090 
2091 void
2092 Dbspj::execTRANSID_AI(Signal* signal)
2093 {
2094  jamEntry();
2095  DEBUG("execTRANSID_AI");
2096  TransIdAI * req = (TransIdAI *)signal->getDataPtr();
2097  Uint32 ptrI = req->connectPtr;
2098  //Uint32 transId[2] = { req->transId[0], req->transId[1] };
2099 
2100  Ptr<TreeNode> treeNodePtr;
2101  m_treenode_pool.getPtr(treeNodePtr, ptrI);
2102  Ptr<Request> requestPtr;
2103  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
2104 
2105  ndbrequire(signal->getNoOfSections() != 0); // TODO check if this can happen
2106 
2107  SegmentedSectionPtr dataPtr;
2108  {
2109  SectionHandle handle(this, signal);
2110  handle.getSection(dataPtr, 0);
2111  handle.clear();
2112  }
2113 
2114 #if defined(DEBUG_LQHKEYREQ) || defined(DEBUG_SCAN_FRAGREQ)
2115  printf("execTRANSID_AI: ");
2116  print(dataPtr, stdout);
2117 #endif
2118 
2122  Uint32 tmp[2+MAX_ATTRIBUTES_IN_TABLE];
2123  RowPtr::Header* header = CAST_PTR(RowPtr::Header, &tmp[0]);
2124 
2125  Uint32 cnt = buildRowHeader(header, dataPtr);
2126  ndbassert(header->m_len < NDB_ARRAY_SIZE(tmp));
2127 
2128  struct RowPtr row;
2129  row.m_type = RowPtr::RT_SECTION;
2130  row.m_src_node_ptrI = treeNodePtr.i;
2131  row.m_row_data.m_section.m_header = header;
2132  row.m_row_data.m_section.m_dataPtr.assign(dataPtr);
2133 
2134  getCorrelationData(row.m_row_data.m_section,
2135  cnt - 1,
2136  row.m_src_correlation);
2137 
2138  if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
2139  {
2140  jam();
2141  Uint32 err = storeRow(requestPtr, treeNodePtr, row);
2142  ndbrequire(err == 0);
2143  }
2144 
2145  ndbrequire(treeNodePtr.p->m_info&&treeNodePtr.p->m_info->m_execTRANSID_AI);
2146 
2147  (this->*(treeNodePtr.p->m_info->m_execTRANSID_AI))(signal,
2148  requestPtr,
2149  treeNodePtr,
2150  row);
2151  release(dataPtr);
2152 }
2153 
2154 Uint32
2155 Dbspj::storeRow(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr, RowPtr &row)
2156 {
2157  ndbassert(row.m_type == RowPtr::RT_SECTION);
2158  SegmentedSectionPtr dataPtr = row.m_row_data.m_section.m_dataPtr;
2159  Uint32 * headptr = (Uint32*)row.m_row_data.m_section.m_header;
2160  Uint32 headlen = 1 + row.m_row_data.m_section.m_header->m_len;
2161 
2165  Uint32 linklen = (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP)?
2166  0 : 2;
2167 
2168  Uint32 totlen = 0;
2169  totlen += dataPtr.sz;
2170  totlen += headlen;
2171  totlen += linklen;
2172 
2173  RowRef ref;
2174  Uint32 * dstptr = 0;
2175  if ((requestPtr.p->m_bits & Request::RT_VAR_ALLOC) == 0)
2176  {
2177  jam();
2178  dstptr = stackAlloc(requestPtr.p->m_rowBuffer, ref, totlen);
2179  }
2180  else
2181  {
2182  jam();
2183  dstptr = varAlloc(requestPtr.p->m_rowBuffer, ref, totlen);
2184  }
2185 
2186  if (unlikely(dstptr == 0))
2187  {
2188  jam();
2189  return DbspjErr::OutOfRowMemory;
2190  }
2191 
2192  row.m_type = RowPtr::RT_LINEAR;
2193  row.m_row_data.m_linear.m_row_ref = ref;
2194  row.m_row_data.m_linear.m_header = (RowPtr::Header*)(dstptr + linklen);
2195  row.m_row_data.m_linear.m_data = dstptr + linklen + headlen;
2196 
2197  memcpy(dstptr + linklen, headptr, 4 * headlen);
2198  copy(dstptr + linklen + headlen, dataPtr);
2199 
2200  if (linklen)
2201  {
2202  jam();
2203  NullRowRef.copyto_link(dstptr); // Null terminate list...
2204  add_to_list(treeNodePtr.p->m_row_list, ref);
2205  }
2206  else
2207  {
2208  jam();
2209  return add_to_map(requestPtr, treeNodePtr, row.m_src_correlation, ref);
2210  }
2211 
2212  return 0;
2213 }
2214 
2215 void
2216 Dbspj::setupRowPtr(Ptr<TreeNode> treeNodePtr,
2217  RowPtr& row, RowRef ref, const Uint32 * src)
2218 {
2219  Uint32 linklen = (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP)?
2220  0 : 2;
2221  const RowPtr::Header * headptr = (RowPtr::Header*)(src + linklen);
2222  Uint32 headlen = 1 + headptr->m_len;
2223 
2224  row.m_type = RowPtr::RT_LINEAR;
2225  row.m_row_data.m_linear.m_row_ref = ref;
2226  row.m_row_data.m_linear.m_header = headptr;
2227  row.m_row_data.m_linear.m_data = (Uint32*)headptr + headlen;
2228 }
2229 
2230 void
2231 Dbspj::add_to_list(SLFifoRowList & list, RowRef rowref)
2232 {
2233  if (list.isNull())
2234  {
2235  jam();
2236  list.m_first_row_page_id = rowref.m_page_id;
2237  list.m_first_row_page_pos = rowref.m_page_pos;
2238  }
2239  else
2240  {
2241  jam();
2245  RowRef last;
2246  last.m_allocator = rowref.m_allocator;
2247  last.m_page_id = list.m_last_row_page_id;
2248  last.m_page_pos = list.m_last_row_page_pos;
2249  Uint32 * rowptr;
2250  if (rowref.m_allocator == 0)
2251  {
2252  jam();
2253  rowptr = get_row_ptr_stack(last);
2254  }
2255  else
2256  {
2257  jam();
2258  rowptr = get_row_ptr_var(last);
2259  }
2260  rowref.copyto_link(rowptr);
2261  }
2262 
2263  list.m_last_row_page_id = rowref.m_page_id;
2264  list.m_last_row_page_pos = rowref.m_page_pos;
2265 }
2266 
2267 Uint32 *
2268 Dbspj::get_row_ptr_stack(RowRef pos)
2269 {
2270  ndbassert(pos.m_allocator == 0);
2271  Ptr<RowPage> ptr;
2272  m_page_pool.getPtr(ptr, pos.m_page_id);
2273  return ptr.p->m_data + pos.m_page_pos;
2274 }
2275 
2276 Uint32 *
2277 Dbspj::get_row_ptr_var(RowRef pos)
2278 {
2279  ndbassert(pos.m_allocator == 1);
2280  Ptr<RowPage> ptr;
2281  m_page_pool.getPtr(ptr, pos.m_page_id);
2282  return ((Var_page*)ptr.p)->get_ptr(pos.m_page_pos);
2283 }
2284 
2285 bool
2286 Dbspj::first(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr,
2287  SLFifoRowListIterator& iter)
2288 {
2289  Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
2290  SLFifoRowList & list = treeNodePtr.p->m_row_list;
2291  if (list.isNull())
2292  {
2293  jam();
2294  iter.setNull();
2295  return false;
2296  }
2297 
2298  iter.m_ref.m_allocator = var;
2299  iter.m_ref.m_page_id = list.m_first_row_page_id;
2300  iter.m_ref.m_page_pos = list.m_first_row_page_pos;
2301  if (var == 0)
2302  {
2303  jam();
2304  iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
2305  }
2306  else
2307  {
2308  jam();
2309  iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
2310  }
2311 
2312  return true;
2313 }
2314 
2315 bool
2316 Dbspj::next(SLFifoRowListIterator& iter)
2317 {
2318  iter.m_ref.assign_from_link(iter.m_row_ptr);
2319  if (iter.m_ref.isNull())
2320  {
2321  jam();
2322  return false;
2323  }
2324 
2325  if (iter.m_ref.m_allocator == 0)
2326  {
2327  jam();
2328  iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
2329  }
2330  else
2331  {
2332  jam();
2333  iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
2334  }
2335  return true;
2336 }
2337 
2338 bool
2339 Dbspj::next(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr,
2340  SLFifoRowListIterator& iter, SLFifoRowListIteratorPtr start)
2341 {
2342  Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
2343  (void)var;
2344  ndbassert(var == iter.m_ref.m_allocator);
2345  if (iter.m_ref.m_allocator == 0)
2346  {
2347  jam();
2348  iter.m_row_ptr = get_row_ptr_stack(start.m_ref);
2349  }
2350  else
2351  {
2352  jam();
2353  iter.m_row_ptr = get_row_ptr_var(start.m_ref);
2354  }
2355  return next(iter);
2356 }
2357 
2358 Uint32
2359 Dbspj::add_to_map(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr,
2360  Uint32 corrVal, RowRef rowref)
2361 {
2362  Uint32 * mapptr;
2363  RowMap& map = treeNodePtr.p->m_row_map;
2364  if (map.isNull())
2365  {
2366  jam();
2367  Uint16 batchsize = treeNodePtr.p->m_batch_size;
2368  Uint32 sz16 = RowMap::MAP_SIZE_PER_REF_16 * batchsize;
2369  Uint32 sz32 = (sz16 + 1) / 2;
2370  RowRef ref;
2371  if ((requestPtr.p->m_bits & Request::RT_VAR_ALLOC) == 0)
2372  {
2373  jam();
2374  mapptr = stackAlloc(requestPtr.p->m_rowBuffer, ref, sz32);
2375  }
2376  else
2377  {
2378  jam();
2379  mapptr = varAlloc(requestPtr.p->m_rowBuffer, ref, sz32);
2380  }
2381  if (unlikely(mapptr == 0))
2382  {
2383  jam();
2384  return DbspjErr::OutOfRowMemory;
2385  }
2386  map.assign(ref);
2387  map.m_elements = 0;
2388  map.m_size = batchsize;
2389  map.clear(mapptr);
2390  }
2391  else
2392  {
2393  jam();
2394  RowRef ref;
2395  map.copyto(ref);
2396  if (ref.m_allocator == 0)
2397  {
2398  jam();
2399  mapptr = get_row_ptr_stack(ref);
2400  }
2401  else
2402  {
2403  jam();
2404  mapptr = get_row_ptr_var(ref);
2405  }
2406  }
2407 
2408  Uint32 pos = corrVal & 0xFFFF;
2409  ndbrequire(pos < map.m_size);
2410  ndbrequire(map.m_elements < map.m_size);
2411 
2412  if (1)
2413  {
2417  RowRef check;
2418  map.load(mapptr, pos, check);
2419  ndbrequire(check.m_page_pos == 0xFFFF);
2420  }
2421 
2422  map.store(mapptr, pos, rowref);
2423 
2424  return 0;
2425 }
2426 
2427 bool
2428 Dbspj::first(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr,
2429  RowMapIterator & iter)
2430 {
2431  Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
2432  RowMap& map = treeNodePtr.p->m_row_map;
2433  if (map.isNull())
2434  {
2435  jam();
2436  iter.setNull();
2437  return false;
2438  }
2439 
2440  if (var == 0)
2441  {
2442  jam();
2443  iter.m_map_ptr = get_row_ptr_stack(map.m_map_ref);
2444  }
2445  else
2446  {
2447  jam();
2448  iter.m_map_ptr = get_row_ptr_var(map.m_map_ref);
2449  }
2450  iter.m_size = map.m_size;
2451  iter.m_ref.m_allocator = var;
2452 
2453  Uint32 pos = 0;
2454  while (RowMap::isNull(iter.m_map_ptr, pos) && pos < iter.m_size)
2455  pos++;
2456 
2457  if (pos == iter.m_size)
2458  {
2459  jam();
2460  iter.setNull();
2461  return false;
2462  }
2463  else
2464  {
2465  jam();
2466  RowMap::load(iter.m_map_ptr, pos, iter.m_ref);
2467  iter.m_element_no = pos;
2468  if (var == 0)
2469  {
2470  jam();
2471  iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
2472  }
2473  else
2474  {
2475  jam();
2476  iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
2477  }
2478  return true;
2479  }
2480 }
2481 
2482 bool
2483 Dbspj::next(RowMapIterator & iter)
2484 {
2485  Uint32 pos = iter.m_element_no + 1;
2486  while (RowMap::isNull(iter.m_map_ptr, pos) && pos < iter.m_size)
2487  pos++;
2488 
2489  if (pos == iter.m_size)
2490  {
2491  jam();
2492  iter.setNull();
2493  return false;
2494  }
2495  else
2496  {
2497  jam();
2498  RowMap::load(iter.m_map_ptr, pos, iter.m_ref);
2499  iter.m_element_no = pos;
2500  if (iter.m_ref.m_allocator == 0)
2501  {
2502  jam();
2503  iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
2504  }
2505  else
2506  {
2507  jam();
2508  iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
2509  }
2510  return true;
2511  }
2512 }
2513 
2514 bool
2515 Dbspj::next(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr,
2516  RowMapIterator & iter, RowMapIteratorPtr start)
2517 {
2518  Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
2519  RowMap& map = treeNodePtr.p->m_row_map;
2520  ndbrequire(!map.isNull());
2521 
2522  if (var == 0)
2523  {
2524  jam();
2525  iter.m_map_ptr = get_row_ptr_stack(map.m_map_ref);
2526  }
2527  else
2528  {
2529  jam();
2530  iter.m_map_ptr = get_row_ptr_var(map.m_map_ref);
2531  }
2532  iter.m_size = map.m_size;
2533 
2534  RowMap::load(iter.m_map_ptr, start.m_element_no, iter.m_ref);
2535  iter.m_element_no = start.m_element_no;
2536  return next(iter);
2537 }
2538 
2539 Uint32 *
2540 Dbspj::stackAlloc(RowBuffer & buffer, RowRef& dst, Uint32 sz)
2541 {
2542  Ptr<RowPage> ptr;
2543  LocalDLFifoList<RowPage> list(m_page_pool, buffer.m_page_list);
2544 
2545  Uint32 pos = buffer.m_stack.m_pos;
2546  const Uint32 SIZE = RowPage::SIZE;
2547  if (list.isEmpty() || (pos + sz) > SIZE)
2548  {
2549  jam();
2550  bool ret = allocPage(ptr);
2551  if (unlikely(ret == false))
2552  {
2553  jam();
2554  return 0;
2555  }
2556 
2557  pos = 0;
2558  list.addLast(ptr);
2559  }
2560  else
2561  {
2562  list.last(ptr);
2563  }
2564 
2565  dst.m_page_id = ptr.i;
2566  dst.m_page_pos = pos;
2567  dst.m_allocator = 0;
2568  buffer.m_stack.m_pos = pos + sz;
2569  return ptr.p->m_data + pos;
2570 }
2571 
2572 Uint32 *
2573 Dbspj::varAlloc(RowBuffer & buffer, RowRef& dst, Uint32 sz)
2574 {
2575  Ptr<RowPage> ptr;
2576  LocalDLFifoList<RowPage> list(m_page_pool, buffer.m_page_list);
2577 
2578  Uint32 free_space = buffer.m_var.m_free;
2579  if (list.isEmpty() || free_space < (sz + 1))
2580  {
2581  jam();
2582  bool ret = allocPage(ptr);
2583  if (unlikely(ret == false))
2584  {
2585  jam();
2586  return 0;
2587  }
2588 
2589  list.addLast(ptr);
2590  ((Var_page*)ptr.p)->init();
2591  }
2592  else
2593  {
2594  jam();
2595  list.last(ptr);
2596  }
2597 
2598  Var_page * vp = (Var_page*)ptr.p;
2599  Uint32 pos = vp->alloc_record(sz, (Var_page*)m_buffer0, Var_page::CHAIN);
2600 
2601  dst.m_page_id = ptr.i;
2602  dst.m_page_pos = pos;
2603  dst.m_allocator = 1;
2604  buffer.m_var.m_free = vp->free_space;
2605  return vp->get_ptr(pos);
2606 }
2607 
2608 bool
2609 Dbspj::allocPage(Ptr<RowPage> & ptr)
2610 {
2611  if (m_free_page_list.firstItem == RNIL)
2612  {
2613  jam();
2614  ptr.p = (RowPage*)m_ctx.m_mm.alloc_page(RT_SPJ_DATABUFFER,
2615  &ptr.i,
2616  Ndbd_mem_manager::NDB_ZONE_ANY);
2617  if (ptr.p == 0)
2618  {
2619  return false;
2620  }
2621  return true;
2622  }
2623  else
2624  {
2625  jam();
2626  LocalSLList<RowPage> list(m_page_pool, m_free_page_list);
2627  bool ret = list.remove_front(ptr);
2628  ndbrequire(ret);
2629  return ret;
2630  }
2631 }
2632 
// Put a single row page on this block's free-page list (m_free_page_list)
// so that allocPage() can reuse it. The page is not returned to the memory
// manager here; releaseGlobal() does that.
2633 void
2634 Dbspj::releasePage(Ptr<RowPage> ptr)
2635 {
2636  LocalSLList<RowPage> list(m_page_pool, m_free_page_list);
2637  list.add(ptr);
2638 }
2639 
// Put several row pages on this block's free-page list (m_free_page_list)
// in one call.
// NOTE(review): presumably 'first' is the index of the first page and 'last'
// the last page of an already linked chain - confirm against
// LocalSLList::add(Uint32, Ptr<T>).
2640 void
2641 Dbspj::releasePages(Uint32 first, Ptr<RowPage> last)
2642 {
2643  LocalSLList<RowPage> list(m_page_pool, m_free_page_list);
2644  list.add(first, last);
2645 }
2646 
2647 void
2648 Dbspj::releaseGlobal(Signal * signal)
2649 {
2650  Uint32 delay = 100;
2651  LocalSLList<RowPage> list(m_page_pool, m_free_page_list);
2652  if (list.empty())
2653  {
2654  jam();
2655  delay = 300;
2656  }
2657  else
2658  {
2659  Ptr<RowPage> ptr;
2660  list.remove_front(ptr);
2661  m_ctx.m_mm.release_page(RT_SPJ_DATABUFFER, ptr.i);
2662  }
2663 
2664  signal->theData[0] = 0;
2665  sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, delay, 1);
2666 }
2667 
// Handler table for lookup tree nodes; lookup_build() installs it as
// TreeNode::m_info. The initializer is positional, so the order of the
// entries must match the member order of OpInfo. A 0 entry means that no
// handler is installed for that slot.
2675 const Dbspj::OpInfo
2676 Dbspj::g_LookupOpInfo =
2677 {
2678  &Dbspj::lookup_build,
2679  0, // prepare
2680  &Dbspj::lookup_start,
2681  &Dbspj::lookup_execTRANSID_AI,
2682  &Dbspj::lookup_execLQHKEYREF,
2683  &Dbspj::lookup_execLQHKEYCONF,
2684  0, // execSCAN_FRAGREF
2685  0, // execSCAN_FRAGCONF
2686  &Dbspj::lookup_parent_row,
2687  &Dbspj::lookup_parent_batch_complete,
2688  0, // Dbspj::lookup_parent_batch_repeat,
2689  0, // Dbspj::lookup_parent_batch_cleanup,
2690  0, // Dbspj::lookup_execSCAN_NEXTREQ
2691  0, // Dbspj::lookup_complete
2692  &Dbspj::lookup_abort,
2693  &Dbspj::lookup_execNODE_FAILREP,
2694  &Dbspj::lookup_cleanup
2695 };
2696 
// Build a lookup TreeNode from one serialized query-tree node.
//
// Allocates the node with createNode(), pre-fills the LQHKEYREQ template
// kept in m_lookup_data.m_lqhKeyReq, validates the advertised lengths of
// `qn` and `qp`, and lets parseDA() consume their optional parts. When
// ctx.m_start_signal is set, hash value, fragment data and attrLen are
// copied from the LQHKEYREQ in that signal and the node is flagged
// T_ONE_SHOT.
//
// Returns 0 on success. On failure returns the error from createNode() or
// parseDA(), DbspjErr::InvalidTreeNodeSpecification (node too short) or
// DbspjErr::InvalidTreeParametersSpecification (parameters too short).
// Every error path in this function runs DEBUG_CRASH() before breaking out.
2697 Uint32
2698 Dbspj::lookup_build(Build_context& ctx,
2699  Ptr<Request> requestPtr,
2700  const QueryNode* qn,
2701  const QueryNodeParameters* qp)
2702 {
2703  Uint32 err = 0;
2704  Ptr<TreeNode> treeNodePtr;
2705  const QN_LookupNode * node = (const QN_LookupNode*)qn;
2706  const QN_LookupParameters * param = (const QN_LookupParameters*)qp;
// do { ... } while (0): 'break' leaves with 'err' set, success returns 0
// from inside the block.
2707  do
2708  {
2709  err = createNode(ctx, requestPtr, treeNodePtr);
2710  if (unlikely(err != 0))
2711  {
2712  DEBUG_CRASH();
2713  break;
2714  }
2715 
2716  treeNodePtr.p->m_info = &g_LookupOpInfo;
2717  Uint32 transId1 = requestPtr.p->m_transId[0];
2718  Uint32 transId2 = requestPtr.p->m_transId[1];
2719  Uint32 savePointId = ctx.m_savepointId;
2720 
2721  Uint32 treeBits = node->requestInfo;
2722  Uint32 paramBits = param->requestInfo;
2723  //ndbout_c("Dbspj::lookup_build() treeBits=%.8x paramBits=%.8x",
2724  // treeBits, paramBits);
// 'dst' is the per-node LQHKEYREQ template; lookup_send() copies it into
// the outgoing signal.
2725  LqhKeyReq* dst = (LqhKeyReq*)treeNodePtr.p->m_lookup_data.m_lqhKeyReq;
2726  {
// Replies are addressed to this block, identified by the tree node index.
2730  dst->tcBlockref = reference();
2731  dst->clientConnectPtr = treeNodePtr.i;
2732 
2739  dst->transId1 = transId1;
2740  dst->transId2 = transId2;
2741  dst->savePointId = savePointId;
2742  dst->scanInfo = 0;
2743  dst->attrLen = 0;
2745  dst->variableData[0] = ctx.m_resultRef;
2746  dst->variableData[1] = param->resultData;
// Request flags: a simple, dirty read with application address and
// correlation factor; the no-disk flag is set only when neither the tree
// bits nor the parameter bits indicate disk attributes.
2747  Uint32 requestInfo = 0;
2748  LqhKeyReq::setOperation(requestInfo, ZREAD);
2749  LqhKeyReq::setApplicationAddressFlag(requestInfo, 1);
2750  LqhKeyReq::setDirtyFlag(requestInfo, 1);
2751  LqhKeyReq::setSimpleFlag(requestInfo, 1);
2752  LqhKeyReq::setNormalProtocolFlag(requestInfo, 0); // Assume T_LEAF
2753  LqhKeyReq::setCorrFactorFlag(requestInfo, 1);
2754  LqhKeyReq::setNoDiskFlag(requestInfo,
2755  (treeBits & DABits::NI_LINKED_DISK) == 0 &&
2756  (paramBits & DABits::PI_DISK_ATTR) == 0);
2757  dst->requestInfo = requestInfo;
2758  }
2759 
// 'err' is pre-loaded before each validation so that a plain 'break'
// reports the right code.
2760  err = DbspjErr::InvalidTreeNodeSpecification;
2761  if (unlikely(node->len < QN_LookupNode::NodeSize))
2762  {
2763  DEBUG_CRASH();
2764  break;
2765  }
2766 
2767  if (treeBits & QN_LookupNode::L_UNIQUE_INDEX)
2768  {
2769  jam();
2770  treeNodePtr.p->m_bits |= TreeNode::T_UNIQUE_INDEX_LOOKUP;
2771  }
2772 
2773  Uint32 tableId = node->tableId;
2774  Uint32 schemaVersion = node->tableVersion;
2775 
// Table id plus the schema version shifted into the upper 16 bits.
2776  Uint32 tableSchemaVersion = tableId + ((schemaVersion << 16) & 0xFFFF0000);
2777  dst->tableSchemaVersion = tableSchemaVersion;
2778 
2779  err = DbspjErr::InvalidTreeParametersSpecification;
2780  DEBUG("param len: " << param->len);
2781  if (unlikely(param->len < QN_LookupParameters::NodeSize))
2782  {
2783  DEBUG_CRASH();
2784  break;
2785  }
2786 
2787  ctx.m_resultData = param->resultData;
2788  treeNodePtr.p->m_lookup_data.m_api_resultRef = ctx.m_resultRef;
2789  treeNodePtr.p->m_lookup_data.m_api_resultData = param->resultData;
2790  treeNodePtr.p->m_lookup_data.m_outstanding = 0;
2791  treeNodePtr.p->m_lookup_data.m_parent_batch_complete = false;
2792 
// Hand the words following the fixed-size parts of node and parameters to
// parseDA().
2796  struct DABuffer nodeDA, paramDA;
2797  nodeDA.ptr = node->optional;
2798  nodeDA.end = nodeDA.ptr + (node->len - QN_LookupNode::NodeSize);
2799  paramDA.ptr = param->optional;
2800  paramDA.end = paramDA.ptr + (param->len - QN_LookupParameters::NodeSize);
2801  err = parseDA(ctx, requestPtr, treeNodePtr,
2802  nodeDA, treeBits, paramDA, paramBits);
2803  if (unlikely(err != 0))
2804  {
2805  DEBUG_CRASH();
2806  break;
2807  }
2808 
2809  if (treeNodePtr.p->m_bits & TreeNode::T_ATTR_INTERPRETED)
2810  {
2811  jam();
2812  LqhKeyReq::setInterpretedFlag(dst->requestInfo, 1);
2813  }
2814 
// Batch size defaults to 1 and is otherwise inherited from the parent node.
2818  treeNodePtr.p->m_batch_size = 1;
2819  if (treeNodePtr.p->m_parentPtrI != RNIL)
2820  {
2821  jam();
2822  Ptr<TreeNode> parentPtr;
2823  m_treenode_pool.getPtr(parentPtr, treeNodePtr.p->m_parentPtrI);
2824  treeNodePtr.p->m_batch_size = parentPtr.p->m_batch_size;
2825  }
2826 
// A start signal is present: take routing data from the LQHKEYREQ it
// carries.
2827  if (ctx.m_start_signal)
2828  {
2829  jam();
2830  Signal * signal = ctx.m_start_signal;
2831  const LqhKeyReq* src = (const LqhKeyReq*)signal->getDataPtr();
// The NOT_YET branch is compiled out; the receiver is derived from the
// instance key of (table id, fragment id) on the own node.
2832 #if NOT_YET
2833  Uint32 instanceNo =
2834  blockToInstance(signal->header.theReceiversBlockNumber);
2835  treeNodePtr.p->m_send.m_ref = numberToRef(DBLQH,
2836  instanceNo, getOwnNodeId());
2837 #else
2838  treeNodePtr.p->m_send.m_ref =
2839  numberToRef(DBLQH, getInstanceKey(src->tableSchemaVersion & 0xFFFF,
2840  src->fragmentData & 0xFFFF),
2841  getOwnNodeId());
2842 #endif
2843 
2844  Uint32 hashValue = src->hashValue;
2845  Uint32 fragId = src->fragmentData;
2846  Uint32 requestInfo = src->requestInfo;
2847  Uint32 attrLen = src->attrLen; // fragdist-key is in here
2848 
// Debug-build sanity checks on the incoming request; ndbassert is used, so
// these are presumably not enforced in release builds.
2852  ndbassert(LqhKeyReq::getAttrLen(attrLen) == 0); // Only long
2853  ndbassert(LqhKeyReq::getScanTakeOverFlag(attrLen) == 0);// Not supported
2854  ndbassert(LqhKeyReq::getReorgFlag(attrLen) == 0); // Not supported
2855  ndbassert(LqhKeyReq::getOperation(requestInfo) == ZREAD);
2856  ndbassert(LqhKeyReq::getKeyLen(requestInfo) == 0); // Only long
2857  ndbassert(LqhKeyReq::getMarkerFlag(requestInfo) == 0); // Only read
2858  ndbassert(LqhKeyReq::getAIInLqhKeyReq(requestInfo) == 0);
2859  ndbassert(LqhKeyReq::getSeqNoReplica(requestInfo) == 0);
2860  ndbassert(LqhKeyReq::getLastReplicaNo(requestInfo) == 0);
2861  ndbassert(LqhKeyReq::getApplicationAddressFlag(requestInfo) != 0);
2862  ndbassert(LqhKeyReq::getSameClientAndTcFlag(requestInfo) == 0);
2863 
// Compiled out: placeholder listing flags that are not yet checked.
2864 #if TODO
2865 
2868  static Uint8 getDirtyFlag(const UintR & requestInfo);
2869  static Uint8 getSimpleFlag(const UintR & requestInfo);
2870 #endif
2871 
// The template built above must agree with the incoming request on the
// interpreted and no-disk flags.
2872  Uint32 dst_requestInfo = dst->requestInfo;
2873  ndbassert(LqhKeyReq::getInterpretedFlag(requestInfo) ==
2874  LqhKeyReq::getInterpretedFlag(dst_requestInfo));
2875  ndbassert(LqhKeyReq::getNoDiskFlag(requestInfo) ==
2876  LqhKeyReq::getNoDiskFlag(dst_requestInfo));
2877 
2878  dst->hashValue = hashValue;
2879  dst->fragmentData = fragId;
2880  dst->attrLen = attrLen; // fragdist is in here
2881 
// The key section of the start request is used as-is; T_ONE_SHOT makes
// lookup_send() hand the stored sections over instead of duplicating them.
2882  treeNodePtr.p->m_send.m_keyInfoPtrI = ctx.m_keyPtr.i;
2883  treeNodePtr.p->m_bits |= TreeNode::T_ONE_SHOT;
2884  }
2885  return 0;
2886  } while (0);
2887 
2888  return err;
2889 }
2890 
// Start handler of a lookup node (installed in g_LookupOpInfo): simply
// sends the prepared LQHKEYREQ via lookup_send().
2891 void
2892 Dbspj::lookup_start(Signal* signal,
2893  Ptr<Request> requestPtr,
2894  Ptr<TreeNode> treeNodePtr)
2895 {
2896  lookup_send(signal, requestPtr, treeNodePtr);
2897 }
2898 
2899 void
2900 Dbspj::lookup_send(Signal* signal,
2901  Ptr<Request> requestPtr,
2902  Ptr<TreeNode> treeNodePtr)
2903 {
2904  jam();
2905 
2906  Uint32 cnt = 2;
2907  if (treeNodePtr.p->isLeaf())
2908  {
2909  jam();
2910  if (requestPtr.p->isLookup())
2911  {
2912  jam();
2913  cnt = 0;
2914  }
2915  else
2916  {
2917  jam();
2918  cnt = 1;
2919  }
2920  }
2921 
2922  LqhKeyReq* req = reinterpret_cast<LqhKeyReq*>(signal->getDataPtrSend());
2923 
2924  memcpy(req, treeNodePtr.p->m_lookup_data.m_lqhKeyReq,
2925  sizeof(treeNodePtr.p->m_lookup_data.m_lqhKeyReq));
2926  req->variableData[2] = treeNodePtr.p->m_send.m_correlation;
2927  req->variableData[3] = requestPtr.p->m_rootResultData;
2928 
2929  if (!(requestPtr.p->isLookup() && treeNodePtr.p->isLeaf()))
2930  {
2931  // Non-LEAF want reply to SPJ instead of ApiClient.
2932  LqhKeyReq::setNormalProtocolFlag(req->requestInfo, 1);
2933  req->variableData[0] = reference();
2934  req->variableData[1] = treeNodePtr.i;
2935  }
2936  else
2937  {
2938  jam();
2943  req->tcBlockref = requestPtr.p->m_senderRef;
2944  }
2945 
2946  SectionHandle handle(this);
2947 
2948  Uint32 ref = treeNodePtr.p->m_send.m_ref;
2949  Uint32 keyInfoPtrI = treeNodePtr.p->m_send.m_keyInfoPtrI;
2950  Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI;
2951 
2952  if (treeNodePtr.p->m_bits & TreeNode::T_ONE_SHOT)
2953  {
2954  jam();
2958  treeNodePtr.p->m_send.m_attrInfoPtrI = RNIL;
2959  treeNodePtr.p->m_send.m_keyInfoPtrI = RNIL;
2960  }
2961  else
2962  {
2963  if ((treeNodePtr.p->m_bits & TreeNode::T_KEYINFO_CONSTRUCTED) == 0)
2964  {
2965  jam();
2966  Uint32 tmp = RNIL;
2967  ndbrequire(dupSection(tmp, keyInfoPtrI)); // TODO handle error
2968  keyInfoPtrI = tmp;
2969  }
2970  else
2971  {
2972  jam();
2973  treeNodePtr.p->m_send.m_keyInfoPtrI = RNIL;
2974  }
2975 
2976  if ((treeNodePtr.p->m_bits & TreeNode::T_ATTRINFO_CONSTRUCTED) == 0)
2977  {
2978  jam();
2979  Uint32 tmp = RNIL;
2980  ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error
2981  attrInfoPtrI = tmp;
2982  }
2983  else
2984  {
2985  jam();
2986  treeNodePtr.p->m_send.m_attrInfoPtrI = RNIL;
2987  }
2988  }
2989 
2990  getSection(handle.m_ptr[0], keyInfoPtrI);
2991  getSection(handle.m_ptr[1], attrInfoPtrI);
2992  handle.m_cnt = 2;
2993 
2994 #if defined DEBUG_LQHKEYREQ
2995  ndbout_c("LQHKEYREQ to %x", ref);
2996  printLQHKEYREQ(stdout, signal->getDataPtrSend(),
2997  NDB_ARRAY_SIZE(treeNodePtr.p->m_lookup_data.m_lqhKeyReq),
2998  DBLQH);
2999  printf("KEYINFO: ");
3000  print(handle.m_ptr[0], stdout);
3001  printf("ATTRINFO: ");
3002  print(handle.m_ptr[1], stdout);
3003 #endif
3004 
3005  Uint32 Tnode = refToNode(ref);
3006  if (Tnode == getOwnNodeId())
3007  {
3008  c_Counters.incr_counter(CI_LOCAL_READS_SENT, 1);
3009  }
3010  else
3011  {
3012  c_Counters.incr_counter(CI_REMOTE_READS_SENT, 1);
3013  }
3014 
3015  if (unlikely(!c_alive_nodes.get(Tnode)))
3016  {
3017  jam();
3018  releaseSections(handle);
3019  abort(signal, requestPtr, DbspjErr::NodeFailure);
3020  return;
3021  }
3022  else if (! (treeNodePtr.p->isLeaf() && requestPtr.p->isLookup()))
3023  {
3024  jam();
3025  ndbassert(Tnode < NDB_ARRAY_SIZE(requestPtr.p->m_lookup_node_data));
3026  requestPtr.p->m_outstanding += cnt;
3027  requestPtr.p->m_lookup_node_data[Tnode] += cnt;
3028  // number wrapped
3029  ndbrequire(! (requestPtr.p->m_lookup_node_data[Tnode] == 0));
3030  }
3031 
3032  sendSignal(ref, GSN_LQHKEYREQ, signal,
3033  NDB_ARRAY_SIZE(treeNodePtr.p->m_lookup_data.m_lqhKeyReq),
3034  JBB, &handle);
3035 
3036  treeNodePtr.p->m_lookup_data.m_outstanding += cnt;
3037  if (requestPtr.p->isLookup() && treeNodePtr.p->isLeaf())
3038  {
3039  jam();
3044  Uint32 resultRef = req->variableData[0];
3045  Uint32 resultData = req->variableData[1];
3046 
3047  TcKeyConf* conf = (TcKeyConf*)signal->getDataPtrSend();
3048  conf->apiConnectPtr = RNIL; // lookup transaction from operations...
3049  conf->confInfo = 0;
3050  TcKeyConf::setNoOfOperations(conf->confInfo, 1);
3051  conf->transId1 = requestPtr.p->m_transId[0];
3052  conf->transId2 = requestPtr.p->m_transId[1];
3053  conf->operations[0].apiOperationPtr = resultData;
3054  conf->operations[0].attrInfoLen = TcKeyConf::DirtyReadBit | Tnode;
3055  Uint32 sigLen = TcKeyConf::StaticLength + TcKeyConf::OperationLength;
3056  sendTCKEYCONF(signal, sigLen, resultRef, requestPtr.p->m_senderRef);
3057  }
3058 }
3059 
3060 void
3061 Dbspj::lookup_execTRANSID_AI(Signal* signal,
3062  Ptr<Request> requestPtr,
3063  Ptr<TreeNode> treeNodePtr,
3064  const RowPtr & rowRef)
3065 {
3066  jam();
3067 
3068  Uint32 Tnode = refToNode(signal->getSendersBlockRef());
3069 
3070  {
3071  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
3072  Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
3073  Dependency_map::ConstDataBufferIterator it;
3074  for (list.first(it); !it.isNull(); list.next(it))
3075  {
3076  jam();
3077  Ptr<TreeNode> childPtr;
3078  m_treenode_pool.getPtr(childPtr, * it.data);
3079  ndbrequire(childPtr.p->m_info != 0&&childPtr.p->m_info->m_parent_row!=0);
3080  (this->*(childPtr.p->m_info->m_parent_row))(signal,
3081  requestPtr, childPtr,rowRef);
3082  }
3083  }
3084  ndbrequire(!(requestPtr.p->isLookup() && treeNodePtr.p->isLeaf()));
3085 
3086  ndbassert(requestPtr.p->m_lookup_node_data[Tnode] >= 1);
3087  requestPtr.p->m_lookup_node_data[Tnode] -= 1;
3088 
3089  treeNodePtr.p->m_lookup_data.m_outstanding--;
3090 
3091  if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
3092  && treeNodePtr.p->m_lookup_data.m_parent_batch_complete
3093  && treeNodePtr.p->m_lookup_data.m_outstanding == 0)
3094  {
3095  jam();
3096  // We have received all rows for this operation in this batch.
3097  reportBatchComplete(signal, requestPtr, treeNodePtr);
3098 
3099  // Prepare for next batch.
3100  treeNodePtr.p->m_lookup_data.m_parent_batch_complete = false;
3101  treeNodePtr.p->m_lookup_data.m_outstanding = 0;
3102  }
3103 
3104  checkBatchComplete(signal, requestPtr, 1);
3105 }
3106 
// Handle an LQHKEYREF received for a lookup node.
//
// For a lookup request the error is forwarded to the API as TCKEYREF; if the
// node is a unique-index lookup whose single child is a leaf, a TCKEYCONF is
// additionally sent for that child. For any other request, error codes 626
// and 899 are tolerated and every other code aborts the request.
// Finally the outstanding counters are reduced by the number of replies this
// REF stands for (2 for a non-leaf, 1 for a leaf) and batch completion is
// evaluated.
3107 void
3108 Dbspj::lookup_execLQHKEYREF(Signal* signal,
3109  Ptr<Request> requestPtr,
3110  Ptr<TreeNode> treeNodePtr)
3111 {
// Read everything needed from the REF first: the signal buffer is reused
// for the outgoing TCKEYREF / TCKEYCONF below.
3112  const LqhKeyRef * rep = (LqhKeyRef*)signal->getDataPtr();
3113  Uint32 errCode = rep->errorCode;
3114  Uint32 Tnode = refToNode(signal->getSendersBlockRef());
3115 
// NOTE(review): the counter is incremented for every error code, not only
// for "row not found" - confirm that this is intended.
3116  c_Counters.incr_counter(CI_READS_NOT_FOUND, 1);
3117 
3118  if (requestPtr.p->isLookup())
3119  {
3120  jam();
3121 
3122  /* CONF/REF not requested for lookup-Leaf: */
3123  ndbrequire(!treeNodePtr.p->isLeaf());
3124 
// Report the failure to the API, addressed with the result reference and
// result data stored for this node by lookup_build().
3133  Uint32 resultRef = treeNodePtr.p->m_lookup_data.m_api_resultRef;
3134  Uint32 resultData = treeNodePtr.p->m_lookup_data.m_api_resultData;
3135  TcKeyRef* ref = (TcKeyRef*)signal->getDataPtr();
3136  ref->connectPtr = resultData;
3137  ref->transId[0] = requestPtr.p->m_transId[0];
3138  ref->transId[1] = requestPtr.p->m_transId[1];
3139  ref->errorCode = errCode;
3140  ref->errorData = 0;
3141 
3142  DEBUG("lookup_execLQHKEYREF, errorCode:" << errCode);
3143 
3144  sendTCKEYREF(signal, resultRef, requestPtr.p->m_senderRef);
3145 
3146  if (treeNodePtr.p->m_bits & TreeNode::T_UNIQUE_INDEX_LOOKUP)
3147  {
// A unique-index lookup is required to have exactly one dependent node. If
// that child is a leaf, it will never be sent, so the TCKEYCONF that
// lookup_send() would have produced for it is sent here instead.
3153  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
3154  Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
3155  Dependency_map::ConstDataBufferIterator it;
3156  ndbrequire(list.first(it));
3157  ndbrequire(list.getSize() == 1); // should only be 1 child
3158  Ptr<TreeNode> childPtr;
3159  m_treenode_pool.getPtr(childPtr, * it.data);
3160  if (childPtr.p->m_bits & TreeNode::T_LEAF)
3161  {
3162  jam();
// These two locals shadow the outer resultRef/resultData and refer to the
// child's result address.
3163  Uint32 resultRef = childPtr.p->m_lookup_data.m_api_resultRef;
3164  Uint32 resultData = childPtr.p->m_lookup_data.m_api_resultData;
3165  TcKeyConf* conf = (TcKeyConf*)signal->getDataPtr();
3166  conf->apiConnectPtr = RNIL;
3167  conf->confInfo = 0;
3168  conf->gci_hi = 0;
3169  TcKeyConf::setNoOfOperations(conf->confInfo, 1);
3170  conf->transId1 = requestPtr.p->m_transId[0];
3171  conf->transId2 = requestPtr.p->m_transId[1];
3172  conf->operations[0].apiOperationPtr = resultData;
3173  conf->operations[0].attrInfoLen =
3174  TcKeyConf::DirtyReadBit |getOwnNodeId();
// NOTE(review): lookup_send() spells this length as
// TcKeyConf::StaticLength + TcKeyConf::OperationLength; the literal 2 here
// is presumably the same value - confirm and unify.
3175  sendTCKEYCONF(signal, TcKeyConf::StaticLength + 2, resultRef, requestPtr.p->m_senderRef);
3176  }
3177  }
3178  }
3179  else
3180  {
3181  jam();
3182  switch(errCode){
3183  case 626: // Row not found
3184  case 899: // Interpreter_exit_nok
3185  jam();
3186  break;
3187  default:
3188  jam();
3189  abort(signal, requestPtr, errCode);
3190  }
3191  }
3192 
// A REF replaces all replies that lookup_send() accounted for.
3193  Uint32 cnt = 2;
3194  if (treeNodePtr.p->isLeaf()) // Can't be a lookup-Leaf, asserted above
3195  cnt = 1;
3196 
3197  ndbassert(requestPtr.p->m_lookup_node_data[Tnode] >= cnt);
3198  requestPtr.p->m_lookup_node_data[Tnode] -= cnt;
3199 
3200  treeNodePtr.p->m_lookup_data.m_outstanding -= cnt;
3201 
3202  if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
3203  && treeNodePtr.p->m_lookup_data.m_parent_batch_complete
3204  && treeNodePtr.p->m_lookup_data.m_outstanding == 0)
3205  {
3206  jam();
3207  // We have received all rows for this operation in this batch.
3208  reportBatchComplete(signal, requestPtr, treeNodePtr);
3209 
3210  // Prepare for next batch.
3211  treeNodePtr.p->m_lookup_data.m_parent_batch_complete = false;
3212  treeNodePtr.p->m_lookup_data.m_outstanding = 0;
3213  }
3214 
3215  checkBatchComplete(signal, requestPtr, cnt);
3216 }
3217 
3218 void
3219 Dbspj::lookup_execLQHKEYCONF(Signal* signal,
3220  Ptr<Request> requestPtr,
3221  Ptr<TreeNode> treeNodePtr)
3222 {
3223  ndbrequire(!(requestPtr.p->isLookup() && treeNodePtr.p->isLeaf()));
3224 
3225  Uint32 Tnode = refToNode(signal->getSendersBlockRef());
3226 
3227  if (treeNodePtr.p->m_bits & TreeNode::T_USER_PROJECTION)
3228  {
3229  jam();
3230  requestPtr.p->m_rows++;
3231  }
3232 
3233  ndbassert(requestPtr.p->m_lookup_node_data[Tnode] >= 1);
3234  requestPtr.p->m_lookup_node_data[Tnode] -= 1;
3235 
3236  treeNodePtr.p->m_lookup_data.m_outstanding--;
3237 
3238  if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
3239  && treeNodePtr.p->m_lookup_data.m_parent_batch_complete
3240  && treeNodePtr.p->m_lookup_data.m_outstanding == 0)
3241  {
3242  jam();
3243  // We have received all rows for this operation in this batch.
3244  reportBatchComplete(signal, requestPtr, treeNodePtr);
3245 
3246  // Prepare for next batch.
3247  treeNodePtr.p->m_lookup_data.m_parent_batch_complete = false;
3248  treeNodePtr.p->m_lookup_data.m_outstanding = 0;
3249  }
3250 
3251  checkBatchComplete(signal, requestPtr, 1);
3252 }
3253 
// Handle a row delivered by the parent of this lookup node: build the key
// (and, if needed, the attr-info parameters) from the parent row, compute
// hash and destination, and send one LQHKEYREQ via lookup_send().
//
// Any error collected in 'err' leaves the do { } while (0) block and ends in
// ndbrequire(false); there is no graceful error path in this function.
3254 void
3255 Dbspj::lookup_parent_row(Signal* signal,
3256  Ptr<Request> requestPtr,
3257  Ptr<TreeNode> treeNodePtr,
3258  const RowPtr & rowRef)
3259 {
3266  Uint32 err;
3267  const LqhKeyReq* src = (LqhKeyReq*)treeNodePtr.p->m_lookup_data.m_lqhKeyReq;
3268  const Uint32 tableId = LqhKeyReq::getTableId(src->tableSchemaVersion);
3269  const Uint32 corrVal = rowRef.m_src_correlation;
3270 
3271  DEBUG("::lookup_parent_row");
3272 
3273  do
3274  {
3275  Uint32 ptrI = RNIL;
3276  if (treeNodePtr.p->m_bits & TreeNode::T_KEYINFO_CONSTRUCTED)
3277  {
3278  jam();
3279  DEBUG("parent_row w/ T_KEYINFO_CONSTRUCTED");
// Expand the key pattern of this node with values from the parent row into
// a new section (ptrI).
3283  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
3284  Local_pattern_store pattern(pool, treeNodePtr.p->m_keyPattern);
3285 
3286  bool keyIsNull;
3287  err = expand(ptrI, pattern, rowRef, keyIsNull);
3288  if (unlikely(err != 0))
3289  break;
3290 
3291  if (keyIsNull)
3292  {
3293  jam();
3294  DEBUG("Key contain NULL values");
3302  if (requestPtr.p->isScan())
3303  {
// Scan request: skip the lookup entirely and free what expand() produced.
3309  jam();
3310  DEBUG("..Ignore impossible KEYREQ");
3311  if (ptrI != RNIL)
3312  {
3313  releaseSection(ptrI);
3314  }
3315  return; // Bailout, KEYREQ would have returned KEYREF(626) anyway
3316  }
3317  else // isLookup()
3318  {
// Lookup request: fall through and send the request anyway.
3328  jam();
3329  }
3330  } // keyIsNull
3331 
3342  if (ptrI == RNIL) // TODO: remove when keyIsNull is completely handled
3343  {
3344  jam();
// expand() produced no section at all; send an empty key section instead.
3354  err = createEmptySection(ptrI);
3355  if (unlikely(err != 0))
3356  break;
3357  }
3358 
3359  treeNodePtr.p->m_send.m_keyInfoPtrI = ptrI;
3360  }
3361 
// Hash the key and ask for the node/fragment that holds it.
3362  BuildKeyReq tmp;
3363  err = computeHash(signal, tmp, tableId, treeNodePtr.p->m_send.m_keyInfoPtrI);
3364  if (unlikely(err != 0))
3365  break;
3366 
3367  err = getNodes(signal, tmp, tableId);
3368  if (unlikely(err != 0))
3369  break;
3370 
// Remember the stored attr-info section; it is restored after the send when
// a per-row copy is constructed below.
3371  Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI;
3372  if (treeNodePtr.p->m_bits & TreeNode::T_ATTRINFO_CONSTRUCTED)
3373  {
3374  jam();
// NOTE: this 'tmp' (a section index) shadows the BuildKeyReq 'tmp' above
// for the rest of this block.
3375  Uint32 tmp = RNIL;
3376  ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error
3377 
3378  Uint32 org_size;
3379  {
3380  SegmentedSectionPtr ptr;
3381  getSection(ptr, tmp);
3382  org_size = ptr.sz;
3383  }
3384 
3385  bool hasNull;
3386  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
3387  Local_pattern_store pattern(pool, treeNodePtr.p->m_attrParamPattern);
3388  err = expand(tmp, pattern, rowRef, hasNull);
3389  if (unlikely(err != 0))
3390  break;
3391 // ndbrequire(!hasNull);
3392 
// Store the number of words that expand() appended in word 4 of the
// section's first segment.
// NOTE(review): presumably word 4 is the parameter-length field of the
// attr-info section header - confirm.
3396  SegmentedSectionPtr ptr;
3397  getSection(ptr, tmp);
3398  Uint32 new_size = ptr.sz;
3399  Uint32 * sectionptrs = ptr.p->theData;
3400  sectionptrs[4] = new_size - org_size;
3401 
3402  treeNodePtr.p->m_send.m_attrInfoPtrI = tmp;
3403  }
3404 
3412  {
3413  /* We set the upper half word of m_correlation to the tuple ID
3414  * of the parent, such that the API can match this tuple with its
3415  * parent.
3416  * Then we re-use the tuple ID of the parent as the
3417  * tuple ID for this tuple also. Since the tuple ID
3418  * is unique within this batch and SPJ block for the parent operation,
3419  * it must also be unique for this operation.
3420  * This ensures that lookup operations with no user projection will
3421  * work, since such operations will have the same tuple ID as their
3422  * parents. The API will then be able to match a tuple with its
3423  * grandparent, even if it gets no tuple for the parent operation.*/
3424  treeNodePtr.p->m_send.m_correlation =
3425  (corrVal << 16) + (corrVal & 0xffff);
3426 
// Fill the routing fields of the request template from the results of
// computeHash()/getNodes(), then send.
3427  treeNodePtr.p->m_send.m_ref = tmp.receiverRef;
3428  LqhKeyReq * dst = (LqhKeyReq*)treeNodePtr.p->m_lookup_data.m_lqhKeyReq;
3429  dst->hashValue = tmp.hashInfo[0];
3430  dst->fragmentData = tmp.fragId;
3431  Uint32 attrLen = 0;
3432  LqhKeyReq::setDistributionKey(attrLen, tmp.fragDistKey);
3433  dst->attrLen = attrLen;
3434  lookup_send(signal, requestPtr, treeNodePtr);
3435 
3436  if (treeNodePtr.p->m_bits & TreeNode::T_ATTRINFO_CONSTRUCTED)
3437  {
3438  jam();
3439  // restore
3440  treeNodePtr.p->m_send.m_attrInfoPtrI = attrInfoPtrI;
3441  }
3442  }
3443  return;
3444  } while (0);
3445 
3446  ndbrequire(false);
3447 }
3448 
3449 void
3450 Dbspj::lookup_parent_batch_complete(Signal* signal,
3451  Ptr<Request> requestPtr,
3452  Ptr<TreeNode> treeNodePtr)
3453 {
3454  jam();
3455 
3464  ndbassert(treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE);
3465 
3466  ndbassert(!treeNodePtr.p->m_lookup_data.m_parent_batch_complete);
3467  treeNodePtr.p->m_lookup_data.m_parent_batch_complete = true;
3468  if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
3469  && treeNodePtr.p->m_lookup_data.m_outstanding == 0)
3470  {
3471  jam();
3472  // We have received all rows for this operation in this batch.
3473  reportBatchComplete(signal, requestPtr, treeNodePtr);
3474 
3475  // Prepare for next batch.
3476  treeNodePtr.p->m_lookup_data.m_parent_batch_complete = false;
3477  treeNodePtr.p->m_lookup_data.m_outstanding = 0;
3478  }
3479 }
3480 
// Abort handler of a lookup node (installed in g_LookupOpInfo). Apart from
// the jam() trace entry it does nothing for this node type.
3481 void
3482 Dbspj::lookup_abort(Signal* signal,
3483  Ptr<Request> requestPtr,
3484  Ptr<TreeNode> treeNodePtr)
3485 {
3486  jam();
3487 }
3488 
3489 Uint32
3490 Dbspj::lookup_execNODE_FAILREP(Signal* signal,
3491  Ptr<Request> requestPtr,
3492  Ptr<TreeNode> treeNodePtr,
3493  NdbNodeBitmask mask)
3494 {
3495  jam();
3496  Uint32 node = 0;
3497  Uint32 sum = 0;
3498  while (requestPtr.p->m_outstanding &&
3499  ((node = mask.find(node + 1)) != NdbNodeBitmask::NotFound))
3500  {
3501  Uint32 cnt = requestPtr.p->m_lookup_node_data[node];
3502  sum += cnt;
3503  requestPtr.p->m_lookup_node_data[node] = 0;
3504  }
3505 
3506  if (sum)
3507  {
3508  jam();
3509  ndbrequire(requestPtr.p->m_outstanding >= sum);
3510  requestPtr.p->m_outstanding -= sum;
3511  }
3512 
3513  return sum;
3514 }
3515 
// Cleanup handler of a lookup node (installed in g_LookupOpInfo): a lookup
// node has no cleanup of its own and just delegates to cleanup_common().
3516 void
3517 Dbspj::lookup_cleanup(Ptr<Request> requestPtr,
3518  Ptr<TreeNode> treeNodePtr)
3519 {
3520  cleanup_common(requestPtr, treeNodePtr);
3521 }
3522 
3523 
3524 Uint32
3525 Dbspj::handle_special_hash(Uint32 tableId, Uint32 dstHash[4],
3526  const Uint64* src,
3527  Uint32 srcLen, // Len in #32bit words
3528  const KeyDescriptor* desc)
3529 {
3530  const Uint32 MAX_KEY_SIZE_IN_LONG_WORDS=
3531  (MAX_KEY_SIZE_IN_WORDS + 1) / 2;
3532  Uint64 alignedWorkspace[MAX_KEY_SIZE_IN_LONG_WORDS * MAX_XFRM_MULTIPLY];
3533  const bool hasVarKeys = desc->noOfVarKeys > 0;
3534  const bool hasCharAttr = desc->hasCharAttr;
3535  const bool compute_distkey = desc->noOfDistrKeys > 0;
3536 
3537  const Uint64 *hashInput = 0;
3538  Uint32 inputLen = 0;
3539  Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX];
3540  Uint32 * keyPartLenPtr;
3541 
3542  /* Normalise KeyInfo into workspace if necessary */
3543  if (hasCharAttr || (compute_distkey && hasVarKeys))
3544  {
3545  hashInput = alignedWorkspace;
3546  keyPartLenPtr = keyPartLen;
3547  inputLen = xfrm_key(tableId,
3548  (Uint32*)src,
3549  (Uint32*)alignedWorkspace,
3550  sizeof(alignedWorkspace) >> 2,
3551  keyPartLenPtr);
3552  if (unlikely(inputLen == 0))
3553  {
3554  return 290; // 'Corrupt key in TC, unable to xfrm'
3555  }
3556  }
3557  else
3558  {
3559  /* Keyinfo already suitable for hash */
3560  hashInput = src;
3561  inputLen = srcLen;
3562  keyPartLenPtr = 0;
3563  }
3564 
3565  /* Calculate primary key hash */
3566  md5_hash(dstHash, hashInput, inputLen);
3567 
3568  /* If the distribution key != primary key then we have to
3569  * form a distribution key from the primary key and calculate
3570  * a separate distribution hash based on this
3571  */
3572  if (compute_distkey)
3573  {
3574  jam();
3575 
3576  Uint32 distrKeyHash[4];
3577  /* Reshuffle primary key columns to get just distribution key */
3578  Uint32 len = create_distr_key(tableId, (Uint32*)hashInput, (Uint32*)alignedWorkspace, keyPartLenPtr);
3579  /* Calculate distribution key hash */
3580  md5_hash(distrKeyHash, alignedWorkspace, len);
3581 
3582  /* Just one word used for distribution */
3583  dstHash[1] = distrKeyHash[1];
3584  }
3585  return 0;
3586 }
3587 
3588 Uint32
3589 Dbspj::computeHash(Signal* signal,
3590  BuildKeyReq& dst, Uint32 tableId, Uint32 ptrI)
3591 {
3596  SegmentedSectionPtr ptr;
3597  getSection(ptr, ptrI);
3598 
3599  /* NOTE: md5_hash below require 64-bit alignment
3600  */
3601  const Uint32 MAX_KEY_SIZE_IN_LONG_WORDS=
3602  (MAX_KEY_SIZE_IN_WORDS + 1) / 2;
3603  Uint64 tmp64[MAX_KEY_SIZE_IN_LONG_WORDS];
3604  Uint32 *tmp32 = (Uint32*)tmp64;
3605  copy(tmp32, ptr);
3606 
3607  const KeyDescriptor* desc = g_key_descriptor_pool.getPtr(tableId);
3608  ndbrequire(desc != NULL);
3609 
3610  bool need_special_hash = desc->hasCharAttr | (desc->noOfDistrKeys > 0);
3611  if (need_special_hash)
3612  {
3613  jam();
3614  return handle_special_hash(tableId, dst.hashInfo, tmp64, ptr.sz, desc);
3615  }
3616  else
3617  {
3618  jam();
3619  md5_hash(dst.hashInfo, tmp64, ptr.sz);
3620  return 0;
3621  }
3622 }
3623 
3628 Uint32
3629 Dbspj::computePartitionHash(Signal* signal,
3630  BuildKeyReq& dst, Uint32 tableId, Uint32 ptrI)
3631 {
3632  SegmentedSectionPtr ptr;
3633  getSection(ptr, ptrI);
3634 
3635  /* NOTE: md5_hash below require 64-bit alignment
3636  */
3637  const Uint32 MAX_KEY_SIZE_IN_LONG_WORDS=
3638  (MAX_KEY_SIZE_IN_WORDS + 1) / 2;
3639  Uint64 _space[MAX_KEY_SIZE_IN_LONG_WORDS];
3640  Uint64 *tmp64 = _space;
3641  Uint32 *tmp32 = (Uint32*)tmp64;
3642  Uint32 sz = ptr.sz;
3643  copy(tmp32, ptr);
3644 
3645  const KeyDescriptor* desc = g_key_descriptor_pool.getPtr(tableId);
3646  ndbrequire(desc != NULL);
3647 
3648  bool need_xfrm = desc->hasCharAttr || desc->noOfVarKeys;
3649  if (need_xfrm)
3650  {
3651  jam();
3655  Uint32 srcPos = 0;
3656  Uint32 dstPos = 0;
3657  Uint32 * src = tmp32;
3658  Uint32 * dst = signal->theData+24;
3659  for (Uint32 i = 0; i < desc->noOfKeyAttr; i++)
3660  {
3661  const KeyDescriptor::KeyAttr& keyAttr = desc->keyAttr[i];
3662  if (AttributeDescriptor::getDKey(keyAttr.attributeDescriptor))
3663  {
3664  xfrm_attr(keyAttr.attributeDescriptor, keyAttr.charsetInfo,
3665  src, srcPos, dst, dstPos,
3666  NDB_ARRAY_SIZE(signal->theData) - 24);
3667  }
3668  }
3669  tmp64 = (Uint64*)dst;
3670  sz = dstPos;
3671  }
3672 
3673  md5_hash(dst.hashInfo, tmp64, sz);
3674  return 0;
3675 }
3676 
3677 Uint32
3678 Dbspj::getNodes(Signal* signal, BuildKeyReq& dst, Uint32 tableId)
3679 {
3680  Uint32 err;
3681  DiGetNodesReq * req = (DiGetNodesReq *)&signal->theData[0];
3682  req->tableId = tableId;
3683  req->hashValue = dst.hashInfo[1];
3684  req->distr_key_indicator = 0; // userDefinedPartitioning not supported!
3685  * (EmulatedJamBuffer**)req->jamBuffer = jamBuffer();
3686 
3687 #if 1
3688  EXECUTE_DIRECT(DBDIH, GSN_DIGETNODESREQ, signal,
3689  DiGetNodesReq::SignalLength, 0);
3690 #else
3691  sendSignal(DBDIH_REF, GSN_DIGETNODESREQ, signal,
3692  DiGetNodesReq::SignalLength, JBB);
3693  jamEntry();
3694 
3695 #endif
3696 
3697  DiGetNodesConf * conf = (DiGetNodesConf *)&signal->theData[0];
3698  err = signal->theData[0];
3699  Uint32 Tdata2 = conf->reqinfo;
3700  Uint32 nodeId = conf->nodes[0];
3701  Uint32 instanceKey = (Tdata2 >> 24) & 127;
3702 
3703  DEBUG("HASH to nodeId:" << nodeId << ", instanceKey:" << instanceKey);
3704 
3705  jamEntry();
3706  if (unlikely(err != 0))
3707  goto error;
3708 
3709  dst.fragId = conf->fragId;
3710  dst.fragDistKey = (Tdata2 >> 16) & 255;
3711  dst.receiverRef = numberToRef(DBLQH, instanceKey, nodeId);
3712 
3713  return 0;
3714 
3715 error:
3719  ndbrequire(false);
3720  return err;
3721 }
3722 
/*
 * Handler table for the scan-fragment operation type; installed on a
 * tree node by scanFrag_build() via m_info.
 *
 * This is a positional initializer, so the entries must stay in the order
 * of the OpInfo members (declaration not visible in this file section).
 * A 0 entry means this operation type supplies no handler for that slot.
 */
3732 const Dbspj::OpInfo
3733 Dbspj::g_ScanFragOpInfo =
3734 {
3735  &Dbspj::scanFrag_build, // build
3736  0, // prepare
3737  &Dbspj::scanFrag_start, // start
3738  &Dbspj::scanFrag_execTRANSID_AI, // execTRANSID_AI
3739  0, // execLQHKEYREF
3740  0, // execLQHKEYCONF
3741  &Dbspj::scanFrag_execSCAN_FRAGREF, // execSCAN_FRAGREF
3742  &Dbspj::scanFrag_execSCAN_FRAGCONF, // execSCAN_FRAGCONF
3743  0, // parent row
3744  0, // parent batch complete
3745  0, // parent batch repeat
3746  0, // parent batch cleanup (Dbspj::scanFrag_parent_batch_cleanup not installed)
3747  &Dbspj::scanFrag_execSCAN_NEXTREQ, // execSCAN_NEXTREQ
3748  0, // complete (Dbspj::scanFrag_complete not installed)
3749  &Dbspj::scanFrag_abort, // abort
3750  0, // execNODE_FAILREP,
3751  &Dbspj::scanFrag_cleanup // cleanup
3752 };
3753 
3754 Uint32
3755 Dbspj::scanFrag_build(Build_context& ctx,
3756  Ptr<Request> requestPtr,
3757  const QueryNode* qn,
3758  const QueryNodeParameters* qp)
3759 {
3760  Uint32 err = 0;
3761  Ptr<TreeNode> treeNodePtr;
3762  const QN_ScanFragNode * node = (const QN_ScanFragNode*)qn;
3763  const QN_ScanFragParameters * param = (const QN_ScanFragParameters*)qp;
3764 
3765  do
3766  {
3767  err = createNode(ctx, requestPtr, treeNodePtr);
3768  if (unlikely(err != 0))
3769  break;
3770 
3771  treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI = RNIL;
3772  Ptr<ScanFragHandle> scanFragHandlePtr;
3773  if (unlikely(m_scanfraghandle_pool.seize(requestPtr.p->m_arena,
3774  scanFragHandlePtr) != true))
3775  {
3776  err = DbspjErr::OutOfQueryMemory;
3777  break;
3778  }
3779 
3780  scanFragHandlePtr.p->m_treeNodePtrI = treeNodePtr.i;
3781  scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_NOT_STARTED;
3782  treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI = scanFragHandlePtr.i;
3783 
3784  requestPtr.p->m_bits |= Request::RT_SCAN;
3785  treeNodePtr.p->m_info = &g_ScanFragOpInfo;
3786  treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;
3787  treeNodePtr.p->m_batch_size = ctx.m_batch_size_rows;
3788 
3789  ScanFragReq*dst=(ScanFragReq*)treeNodePtr.p->m_scanfrag_data.m_scanFragReq;
3790  dst->senderData = scanFragHandlePtr.i;
3791  dst->resultRef = reference();
3792  dst->resultData = treeNodePtr.i;
3793  dst->savePointId = ctx.m_savepointId;
3794 
3795  Uint32 transId1 = requestPtr.p->m_transId[0];
3796  Uint32 transId2 = requestPtr.p->m_transId[1];
3797  dst->transId1 = transId1;
3798  dst->transId2 = transId2;
3799 
3800  Uint32 treeBits = node->requestInfo;
3801  Uint32 paramBits = param->requestInfo;
3802  //ndbout_c("Dbspj::scanFrag_build() treeBits=%.8x paramBits=%.8x",
3803  // treeBits, paramBits);
3804  Uint32 requestInfo = 0;
3805  ScanFragReq::setReadCommittedFlag(requestInfo, 1);
3806  ScanFragReq::setScanPrio(requestInfo, ctx.m_scanPrio);
3807  ScanFragReq::setCorrFactorFlag(requestInfo, 1);
3808  ScanFragReq::setNoDiskFlag(requestInfo,
3809  (treeBits & DABits::NI_LINKED_DISK) == 0 &&
3810  (paramBits & DABits::PI_DISK_ATTR) == 0);
3811  dst->requestInfo = requestInfo;
3812 
3813  err = DbspjErr::InvalidTreeNodeSpecification;
3814  DEBUG("scanFrag_build: len=" << node->len);
3815  if (unlikely(node->len < QN_ScanFragNode::NodeSize))
3816  break;
3817 
3818  dst->tableId = node->tableId;
3819  dst->schemaVersion = node->tableVersion;
3820 
3821  err = DbspjErr::InvalidTreeParametersSpecification;
3822  DEBUG("param len: " << param->len);
3823  if (unlikely(param->len < QN_ScanFragParameters::NodeSize))
3824  {
3825  jam();
3826  DEBUG_CRASH();
3827  break;
3828  }
3829 
3830  ctx.m_resultData = param->resultData;
3831 
3835  struct DABuffer nodeDA, paramDA;
3836  nodeDA.ptr = node->optional;
3837  nodeDA.end = nodeDA.ptr + (node->len - QN_ScanFragNode::NodeSize);
3838  paramDA.ptr = param->optional;
3839  paramDA.end = paramDA.ptr + (param->len - QN_ScanFragParameters::NodeSize);
3840  err = parseDA(ctx, requestPtr, treeNodePtr,
3841  nodeDA, treeBits, paramDA, paramBits);
3842  if (unlikely(err != 0))
3843  {
3844  jam();
3845  DEBUG_CRASH();
3846  break;
3847  }
3848 
3849  ctx.m_scan_cnt++;
3850  ctx.m_scans.set(treeNodePtr.p->m_node_no);
3851 
3852  if (ctx.m_start_signal)
3853  {
3854  jam();
3855  Signal* signal = ctx.m_start_signal;
3856  const ScanFragReq* src = (const ScanFragReq*)(signal->getDataPtr());
3857 
3858 #if NOT_YET
3859  Uint32 instanceNo =
3860  blockToInstance(signal->header.theReceiversBlockNumber);
3861  treeNodePtr.p->m_send.m_ref = numberToRef(DBLQH,
3862  instanceNo, getOwnNodeId());
3863 #else
3864  treeNodePtr.p->m_send.m_ref =
3865  numberToRef(DBLQH, getInstanceKey(src->tableId,
3866  src->fragmentNoKeyLen),
3867  getOwnNodeId());
3868 #endif
3869 
3870  Uint32 fragId = src->fragmentNoKeyLen;
3871  Uint32 requestInfo = src->requestInfo;
3872  Uint32 batch_size_bytes = src->batch_size_bytes;
3873  Uint32 batch_size_rows = src->batch_size_rows;
3874 
3875 #ifdef VM_TRACE
3876  Uint32 savePointId = src->savePointId;
3877  Uint32 tableId = src->tableId;
3878  Uint32 schemaVersion = src->schemaVersion;
3879  Uint32 transId1 = src->transId1;
3880  Uint32 transId2 = src->transId2;
3881 #endif
3882  ndbassert(ScanFragReq::getLockMode(requestInfo) == 0);
3883  ndbassert(ScanFragReq::getHoldLockFlag(requestInfo) == 0);
3884  ndbassert(ScanFragReq::getKeyinfoFlag(requestInfo) == 0);
3885  ndbassert(ScanFragReq::getReadCommittedFlag(requestInfo) == 1);
3886  ndbassert(ScanFragReq::getLcpScanFlag(requestInfo) == 0);
3887  //ScanFragReq::getAttrLen(requestInfo); // ignore
3888  ndbassert(ScanFragReq::getReorgFlag(requestInfo) == 0);
3889 
3890  Uint32 tupScanFlag = ScanFragReq::getTupScanFlag(requestInfo);
3891  Uint32 rangeScanFlag = ScanFragReq::getRangeScanFlag(requestInfo);
3892  Uint32 descendingFlag = ScanFragReq::getDescendingFlag(requestInfo);
3893  Uint32 scanPrio = ScanFragReq::getScanPrio(requestInfo);
3894 
3895  Uint32 dst_requestInfo = dst->requestInfo;
3896 
3897  ScanFragReq::setTupScanFlag(dst_requestInfo,tupScanFlag);
3898  ScanFragReq::setRangeScanFlag(dst_requestInfo,rangeScanFlag);
3899  ScanFragReq::setDescendingFlag(dst_requestInfo,descendingFlag);
3900  ScanFragReq::setScanPrio(dst_requestInfo,scanPrio);
3901 
3905  ndbassert(ScanFragReq::getNoDiskFlag(requestInfo) ==
3906  ScanFragReq::getNoDiskFlag(dst_requestInfo));
3907 
3908  dst->fragmentNoKeyLen = fragId;
3909  dst->requestInfo = dst_requestInfo;
3910  dst->batch_size_bytes = batch_size_bytes;
3911  dst->batch_size_rows = batch_size_rows;
3912 
3913 #ifdef VM_TRACE
3914  ndbassert(dst->savePointId == savePointId);
3915  ndbassert(dst->tableId == tableId);
3916  ndbassert(dst->schemaVersion == schemaVersion);
3917  ndbassert(dst->transId1 == transId1);
3918  ndbassert(dst->transId2 == transId2);
3919 #endif
3920 
3921  treeNodePtr.p->m_send.m_keyInfoPtrI = ctx.m_keyPtr.i;
3922  treeNodePtr.p->m_bits |= TreeNode::T_ONE_SHOT;
3923 
3924  if (rangeScanFlag)
3925  {
3926  c_Counters.incr_counter(CI_RANGE_SCANS_RECEIVED, 1);
3927  }
3928  else
3929  {
3930  c_Counters.incr_counter(CI_TABLE_SCANS_RECEIVED, 1);
3931  }
3932  }
3933  else
3934  {
3935  ndbrequire(false);
3936  }
3937 
3938  return 0;
3939  } while (0);
3940 
3941  return err;
3942 }
3943 
3944 void
3945 Dbspj::scanFrag_start(Signal* signal,
3946  Ptr<Request> requestPtr,
3947  Ptr<TreeNode> treeNodePtr)
3948 {
3949  scanFrag_send(signal, requestPtr, treeNodePtr);
3950 }
3951 
3952 void
3953 Dbspj::scanFrag_send(Signal* signal,
3954  Ptr<Request> requestPtr,
3955  Ptr<TreeNode> treeNodePtr)
3956 {
3957  jam();
3958 
3959  requestPtr.p->m_outstanding++;
3960  requestPtr.p->m_cnt_active++;
3961  treeNodePtr.p->m_state = TreeNode::TN_ACTIVE;
3962  Ptr<ScanFragHandle> scanFragHandlePtr;
3963  m_scanfraghandle_pool.getPtr(scanFragHandlePtr, treeNodePtr.p->
3964  m_scanfrag_data.m_scanFragHandlePtrI);
3965 
3966  ScanFragReq* req = reinterpret_cast<ScanFragReq*>(signal->getDataPtrSend());
3967 
3968  memcpy(req, treeNodePtr.p->m_scanfrag_data.m_scanFragReq,
3969  sizeof(treeNodePtr.p->m_scanfrag_data.m_scanFragReq));
3970  req->variableData[0] = treeNodePtr.p->m_send.m_correlation;
3971  req->variableData[1] = requestPtr.p->m_rootResultData;
3972 
3973  SectionHandle handle(this);
3974 
3975  Uint32 ref = treeNodePtr.p->m_send.m_ref;
3976  Uint32 keyInfoPtrI = treeNodePtr.p->m_send.m_keyInfoPtrI;
3977  Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI;
3978 
3982  ndbrequire(treeNodePtr.p->m_bits & TreeNode::T_ONE_SHOT);
3983 
3987  treeNodePtr.p->m_send.m_attrInfoPtrI = RNIL;
3988  treeNodePtr.p->m_send.m_keyInfoPtrI = RNIL;
3989 
3990  getSection(handle.m_ptr[0], attrInfoPtrI);
3991  handle.m_cnt = 1;
3992 
3993  if (keyInfoPtrI != RNIL)
3994  {
3995  jam();
3996  getSection(handle.m_ptr[1], keyInfoPtrI);
3997  handle.m_cnt = 2;
3998  }
3999 
4000 #ifdef DEBUG_SCAN_FRAGREQ
4001  ndbout_c("SCAN_FRAGREQ to %x", ref);
4002  printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
4003  NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
4004  DBLQH);
4005  printf("ATTRINFO: ");
4006  print(handle.m_ptr[0], stdout);
4007  if (handle.m_cnt > 1)
4008  {
4009  printf("KEYINFO: ");
4010  print(handle.m_ptr[1], stdout);
4011  }
4012 #endif
4013 
4014  if (ScanFragReq::getRangeScanFlag(req->requestInfo))
4015  {
4016  c_Counters.incr_counter(CI_LOCAL_RANGE_SCANS_SENT, 1);
4017  }
4018  else
4019  {
4020  c_Counters.incr_counter(CI_LOCAL_TABLE_SCANS_SENT, 1);
4021  }
4022 
4023  ndbrequire(refToNode(ref) == getOwnNodeId());
4024  sendSignal(ref, GSN_SCAN_FRAGREQ, signal,
4025  NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
4026  JBB, &handle);
4027 
4028  scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_SCANNING;
4029  treeNodePtr.p->m_scanfrag_data.m_rows_received = 0;
4030  treeNodePtr.p->m_scanfrag_data.m_rows_expecting = ~Uint32(0);
4031 }
4032 
4033 void
4034 Dbspj::scanFrag_execTRANSID_AI(Signal* signal,
4035  Ptr<Request> requestPtr,
4036  Ptr<TreeNode> treeNodePtr,
4037  const RowPtr & rowRef)
4038 {
4039  jam();
4040  treeNodePtr.p->m_scanfrag_data.m_rows_received++;
4041 
4042  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
4043  Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
4044  Dependency_map::ConstDataBufferIterator it;
4045 
4046  {
4047  for (list.first(it); !it.isNull(); list.next(it))
4048  {
4049  jam();
4050  Ptr<TreeNode> childPtr;
4051  m_treenode_pool.getPtr(childPtr, * it.data);
4052  ndbrequire(childPtr.p->m_info != 0&&childPtr.p->m_info->m_parent_row!=0);
4053  (this->*(childPtr.p->m_info->m_parent_row))(signal,
4054  requestPtr, childPtr,rowRef);
4055  }
4056  }
4057 
4058  if (treeNodePtr.p->m_scanfrag_data.m_rows_received ==
4059  treeNodePtr.p->m_scanfrag_data.m_rows_expecting)
4060  {
4061  jam();
4062 
4063  if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE)
4064  {
4065  jam();
4066  reportBatchComplete(signal, requestPtr, treeNodePtr);
4067  }
4068 
4069  checkBatchComplete(signal, requestPtr, 1);
4070  return;
4071  }
4072 }
4073 
4074 void
4075 Dbspj::scanFrag_execSCAN_FRAGREF(Signal* signal,
4076  Ptr<Request> requestPtr,
4077  Ptr<TreeNode> treeNodePtr,
4078  Ptr<ScanFragHandle> scanFragHandlePtr)
4079 {
4080  const ScanFragRef* rep =
4081  reinterpret_cast<const ScanFragRef*>(signal->getDataPtr());
4082  Uint32 errCode = rep->errorCode;
4083 
4084  DEBUG("scanFrag_execSCAN_FRAGREF, rep->senderData:" << rep->senderData
4085  << ", requestPtr.p->m_senderData:" << requestPtr.p->m_senderData);
4086  scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
4087  ndbrequire(treeNodePtr.p->m_state == TreeNode::TN_ACTIVE);
4088  ndbrequire(requestPtr.p->m_cnt_active);
4089  requestPtr.p->m_cnt_active--;
4090  ndbrequire(requestPtr.p->m_outstanding);
4091  requestPtr.p->m_outstanding--;
4092  treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
4093 
4094  abort(signal, requestPtr, errCode);
4095 }
4096 
4097 
4098 void
4099 Dbspj::scanFrag_execSCAN_FRAGCONF(Signal* signal,
4100  Ptr<Request> requestPtr,
4101  Ptr<TreeNode> treeNodePtr,
4102  Ptr<ScanFragHandle> scanFragHandlePtr)
4103 {
4104  const ScanFragConf * conf =
4105  reinterpret_cast<const ScanFragConf*>(signal->getDataPtr());
4106  Uint32 rows = conf->completedOps;
4107  Uint32 done = conf->fragmentCompleted;
4108 
4109  Uint32 state = scanFragHandlePtr.p->m_state;
4110  if (state == ScanFragHandle::SFH_WAIT_CLOSE && done == 0)
4111  {
4112  jam();
4116  return;
4117  }
4118 
4119  ndbrequire(done <= 2); // 0, 1, 2 (=ZSCAN_FRAG_CLOSED)
4120 
4121  ndbassert(treeNodePtr.p->m_scanfrag_data.m_rows_expecting == ~Uint32(0));
4122  treeNodePtr.p->m_scanfrag_data.m_rows_expecting = rows;
4123  if (treeNodePtr.p->isLeaf())
4124  {
4129  treeNodePtr.p->m_scanfrag_data.m_rows_received = rows;
4130  }
4131 
4132  requestPtr.p->m_rows += rows;
4133  if (done)
4134  {
4135  jam();
4136 
4137  ndbrequire(requestPtr.p->m_cnt_active);
4138  requestPtr.p->m_cnt_active--;
4139  treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
4140  scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
4141  }
4142  else
4143  {
4144  jam();
4145  scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_WAIT_NEXTREQ;
4146  }
4147 
4148  if (treeNodePtr.p->m_scanfrag_data.m_rows_expecting ==
4149  treeNodePtr.p->m_scanfrag_data.m_rows_received ||
4150  (state == ScanFragHandle::SFH_WAIT_CLOSE))
4151  {
4152  jam();
4153 
4154  if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE)
4155  {
4156  jam();
4157  reportBatchComplete(signal, requestPtr, treeNodePtr);
4158  }
4159 
4160  checkBatchComplete(signal, requestPtr, 1);
4161  return;
4162  }
4163 }
4164 
4165 void
4166 Dbspj::scanFrag_execSCAN_NEXTREQ(Signal* signal,
4167  Ptr<Request> requestPtr,
4168  Ptr<TreeNode> treeNodePtr)
4169 {
4170  jamEntry();
4171 
4172  Ptr<ScanFragHandle> scanFragHandlePtr;
4173  m_scanfraghandle_pool.getPtr(scanFragHandlePtr, treeNodePtr.p->
4174  m_scanfrag_data.m_scanFragHandlePtrI);
4175 
4176  const ScanFragReq * org =
4177  (ScanFragReq*)treeNodePtr.p->m_scanfrag_data.m_scanFragReq;
4178 
4179  ScanFragNextReq* req =
4180  reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
4181  req->senderData = treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI;
4182  req->requestInfo = 0;
4183  req->transId1 = requestPtr.p->m_transId[0];
4184  req->transId2 = requestPtr.p->m_transId[1];
4185  req->batch_size_rows = org->batch_size_rows;
4186  req->batch_size_bytes = org->batch_size_bytes;
4187 
4188  DEBUG("scanFrag_execSCAN_NEXTREQ to: " << hex << treeNodePtr.p->m_send.m_ref
4189  << ", senderData: " << req->senderData);
4190 #ifdef DEBUG_SCAN_FRAGREQ
4191  printSCANFRAGNEXTREQ(stdout, &signal->theData[0],
4192  ScanFragNextReq::SignalLength, DBLQH);
4193 #endif
4194 
4195  sendSignal(treeNodePtr.p->m_send.m_ref,
4196  GSN_SCAN_NEXTREQ,
4197  signal,
4198  ScanFragNextReq::SignalLength,
4199  JBB);
4200 
4201  treeNodePtr.p->m_scanfrag_data.m_rows_received = 0;
4202  treeNodePtr.p->m_scanfrag_data.m_rows_expecting = ~Uint32(0);
4203  requestPtr.p->m_outstanding++;
4204  scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_SCANNING;
4205 }//Dbspj::scanFrag_execSCAN_NEXTREQ()
4206 
4207 void
4208 Dbspj::scanFrag_abort(Signal* signal,
4209  Ptr<Request> requestPtr,
4210  Ptr<TreeNode> treeNodePtr)
4211 {
4212  jam();
4213 
4214  Ptr<ScanFragHandle> scanFragHandlePtr;
4215  m_scanfraghandle_pool.getPtr(scanFragHandlePtr, treeNodePtr.p->
4216  m_scanfrag_data.m_scanFragHandlePtrI);
4217  if (treeNodePtr.p->m_state == TreeNode::TN_ACTIVE)
4218  {
4219  jam();
4220 
4221  switch(scanFragHandlePtr.p->m_state){
4222  case ScanFragHandle::SFH_NOT_STARTED:
4223  case ScanFragHandle::SFH_COMPLETE:
4224  ndbrequire(false); // we shouldnt be TN_ACTIVE then...
4225 
4226  case ScanFragHandle::SFH_WAIT_CLOSE:
4227  jam();
4228  // close already sent
4229  return;
4230  case ScanFragHandle::SFH_WAIT_NEXTREQ:
4231  jam();
4232  // we were idle
4233  requestPtr.p->m_outstanding++;
4234  break;
4235  case ScanFragHandle::SFH_SCANNING:
4236  jam();
4237  break;
4238  }
4239 
4240  treeNodePtr.p->m_scanfrag_data.m_rows_expecting = ~Uint32(0);
4241  scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_WAIT_CLOSE;
4242 
4243  ScanFragNextReq* req =
4244  reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
4245  req->senderData = treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI;
4246  req->requestInfo = ScanFragNextReq::ZCLOSE;
4247  req->transId1 = requestPtr.p->m_transId[0];
4248  req->transId2 = requestPtr.p->m_transId[1];
4249  req->batch_size_rows = 0;
4250  req->batch_size_bytes = 0;
4251 
4252  sendSignal(treeNodePtr.p->m_send.m_ref,
4253  GSN_SCAN_NEXTREQ,
4254  signal,
4255  ScanFragNextReq::SignalLength,
4256  JBB);
4257  }
4258 }
4259 
4260 
4261 void
4262 Dbspj::scanFrag_cleanup(Ptr<Request> requestPtr,
4263  Ptr<TreeNode> treeNodePtr)
4264 {
4265  Uint32 ptrI = treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI;
4266  if (ptrI != RNIL)
4267  {
4268  m_scanfraghandle_pool.release(ptrI);
4269  }
4270  cleanup_common(requestPtr, treeNodePtr);
4271 }
4272 
/*
 * Handler table for the scan-index operation type; installed on a tree
 * node by scanIndex_build() via m_info.
 *
 * This is a positional initializer, so the entries must stay in the order
 * of the OpInfo members (declaration not visible in this file section).
 * A 0 entry means this operation type supplies no handler for that slot.
 */
4282 const Dbspj::OpInfo
4283 Dbspj::g_ScanIndexOpInfo =
4284 {
4285  &Dbspj::scanIndex_build, // build
4286  &Dbspj::scanIndex_prepare, // prepare
4287  0, // start
4288  &Dbspj::scanIndex_execTRANSID_AI, // execTRANSID_AI
4289  0, // execLQHKEYREF
4290  0, // execLQHKEYCONF
4291  &Dbspj::scanIndex_execSCAN_FRAGREF, // execSCAN_FRAGREF
4292  &Dbspj::scanIndex_execSCAN_FRAGCONF, // execSCAN_FRAGCONF
4293  &Dbspj::scanIndex_parent_row, // parent row
4294  &Dbspj::scanIndex_parent_batch_complete, // parent batch complete
4295  &Dbspj::scanIndex_parent_batch_repeat, // parent batch repeat
4296  &Dbspj::scanIndex_parent_batch_cleanup, // parent batch cleanup
4297  &Dbspj::scanIndex_execSCAN_NEXTREQ, // execSCAN_NEXTREQ
4298  &Dbspj::scanIndex_complete, // complete
4299  &Dbspj::scanIndex_abort, // abort
4300  &Dbspj::scanIndex_execNODE_FAILREP, // execNODE_FAILREP
4301  &Dbspj::scanIndex_cleanup // cleanup
4302 };
4303 
4304 Uint32
4305 Dbspj::scanIndex_build(Build_context& ctx,
4306  Ptr<Request> requestPtr,
4307  const QueryNode* qn,
4308  const QueryNodeParameters* qp)
4309 {
4310  Uint32 err = 0;
4311  Ptr<TreeNode> treeNodePtr;
4312  const QN_ScanIndexNode * node = (const QN_ScanIndexNode*)qn;
4313  const QN_ScanIndexParameters * param = (const QN_ScanIndexParameters*)qp;
4314 
4315  do
4316  {
4317  err = createNode(ctx, requestPtr, treeNodePtr);
4318  if (unlikely(err != 0))
4319  break;
4320 
4321  Uint32 batchSize = param->batchSize;
4322 
4323  requestPtr.p->m_bits |= Request::RT_SCAN;
4324  requestPtr.p->m_bits |= Request::RT_NEED_PREPARE;
4325  requestPtr.p->m_bits |= Request::RT_NEED_COMPLETE;
4326  treeNodePtr.p->m_info = &g_ScanIndexOpInfo;
4327  treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;
4328  treeNodePtr.p->m_bits |= TreeNode::T_NEED_REPORT_BATCH_COMPLETED;
4329  treeNodePtr.p->m_batch_size =
4330  batchSize & ~(0xFFFFFFFF << QN_ScanIndexParameters::BatchRowBits);
4331 
4332  ScanFragReq*dst=(ScanFragReq*)treeNodePtr.p->m_scanindex_data.m_scanFragReq;
4333  dst->senderData = treeNodePtr.i;
4334  dst->resultRef = reference();
4335  dst->resultData = treeNodePtr.i;
4336  dst->savePointId = ctx.m_savepointId;
4337  dst->batch_size_rows =
4338  batchSize & ~(0xFFFFFFFF << QN_ScanIndexParameters::BatchRowBits);
4339  dst->batch_size_bytes = batchSize >> QN_ScanIndexParameters::BatchRowBits;
4340 
4341  Uint32 transId1 = requestPtr.p->m_transId[0];
4342  Uint32 transId2 = requestPtr.p->m_transId[1];
4343  dst->transId1 = transId1;
4344  dst->transId2 = transId2;
4345 
4346  Uint32 treeBits = node->requestInfo;
4347  Uint32 paramBits = param->requestInfo;
4348  Uint32 requestInfo = 0;
4349  ScanFragReq::setRangeScanFlag(requestInfo, 1);
4350  ScanFragReq::setReadCommittedFlag(requestInfo, 1);
4351  ScanFragReq::setScanPrio(requestInfo, ctx.m_scanPrio);
4352  ScanFragReq::setNoDiskFlag(requestInfo,
4353  (treeBits & DABits::NI_LINKED_DISK) == 0 &&
4354  (paramBits & DABits::PI_DISK_ATTR) == 0);
4355  ScanFragReq::setCorrFactorFlag(requestInfo, 1);
4356  dst->requestInfo = requestInfo;
4357 
4358  err = DbspjErr::InvalidTreeNodeSpecification;
4359  DEBUG("scanIndex_build: len=" << node->len);
4360  if (unlikely(node->len < QN_ScanIndexNode::NodeSize))
4361  break;
4362 
4363  dst->tableId = node->tableId;
4364  dst->schemaVersion = node->tableVersion;
4365 
4366  err = DbspjErr::InvalidTreeParametersSpecification;
4367  DEBUG("param len: " << param->len);
4368  if (unlikely(param->len < QN_ScanIndexParameters::NodeSize))
4369  {
4370  jam();
4371  DEBUG_CRASH();
4372  break;
4373  }
4374 
4375  ctx.m_resultData = param->resultData;
4376 
4380  struct DABuffer nodeDA, paramDA;
4381  nodeDA.ptr = node->optional;
4382  nodeDA.end = nodeDA.ptr + (node->len - QN_ScanIndexNode::NodeSize);
4383  paramDA.ptr = param->optional;
4384  paramDA.end = paramDA.ptr + (param->len - QN_ScanIndexParameters::NodeSize);
4385 
4386  err = parseScanIndex(ctx, requestPtr, treeNodePtr,
4387  nodeDA, treeBits, paramDA, paramBits);
4388 
4389  if (unlikely(err != 0))
4390  {
4391  jam();
4392  DEBUG_CRASH();
4393  break;
4394  }
4395 
4400  Ptr<TreeNode> nodePtr;
4401  nodePtr.i = treeNodePtr.p->m_parentPtrI;
4402  while (nodePtr.i != RNIL)
4403  {
4404  jam();
4405  m_treenode_pool.getPtr(nodePtr);
4406  nodePtr.p->m_bits |= TreeNode::T_REPORT_BATCH_COMPLETE;
4407  nodePtr.p->m_bits |= TreeNode::T_NEED_REPORT_BATCH_COMPLETED;
4408  nodePtr.i = nodePtr.p->m_parentPtrI;
4409  }
4410 
4420  if (requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT &&
4421  !treeNodePtr.p->m_ancestors.contains(ctx.m_scans))
4422  {
4423  treeNodePtr.p->m_bits |= TreeNode::T_SCAN_REPEATABLE;
4424  }
4425 
4426  ctx.m_scan_cnt++;
4427  ctx.m_scans.set(treeNodePtr.p->m_node_no);
4428 
4429  return 0;
4430  } while (0);
4431 
4432  return err;
4433 }
4434 
4435 Uint32
4436 Dbspj::parseScanIndex(Build_context& ctx,
4437  Ptr<Request> requestPtr,
4438  Ptr<TreeNode> treeNodePtr,
4439  DABuffer tree, Uint32 treeBits,
4440  DABuffer param, Uint32 paramBits)
4441 {
4442  Uint32 err = 0;
4443 
4444  typedef QN_ScanIndexNode Node;
4445  typedef QN_ScanIndexParameters Params;
4446 
4447  do
4448  {
4449  jam();
4450 
4451  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
4452  data.m_fragments.init();
4453  data.m_frags_outstanding = 0;
4454  data.m_frags_complete = 0;
4455  data.m_frags_not_started = 0;
4456  data.m_parallelismStat.init();
4457  data.m_firstExecution = true;
4458  data.m_batch_chunks = 0;
4459 
4460  err = parseDA(ctx, requestPtr, treeNodePtr,
4461  tree, treeBits, param, paramBits);
4462  if (unlikely(err != 0))
4463  break;
4464 
4465  if (treeBits & Node::SI_PRUNE_PATTERN)
4466  {
4467  Uint32 len_cnt = * tree.ptr ++;
4468  Uint32 len = len_cnt & 0xFFFF; // length of pattern in words
4469  Uint32 cnt = len_cnt >> 16; // no of parameters
4470 
4471  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
4472  ndbrequire((cnt==0) == ((treeBits & Node::SI_PRUNE_PARAMS) ==0));
4473  ndbrequire((cnt==0) == ((paramBits & Params::SIP_PRUNE_PARAMS)==0));
4474 
4475  if (treeBits & Node::SI_PRUNE_LINKED)
4476  {
4477  jam();
4478  DEBUG("LINKED-PRUNE PATTERN w/ " << cnt << " PARAM values");
4479 
4480  data.m_prunePattern.init();
4481  Local_pattern_store pattern(pool, data.m_prunePattern);
4482 
4486  err = expand(pattern, treeNodePtr, tree, len, param, cnt);
4487  if (unlikely(err != 0))
4488  break;
4489 
4490  treeNodePtr.p->m_bits |= TreeNode::T_PRUNE_PATTERN;
4491  c_Counters.incr_counter(CI_PRUNED_RANGE_SCANS_RECEIVED, 1);
4492  }
4493  else
4494  {
4495  jam();
4496  DEBUG("FIXED-PRUNE w/ " << cnt << " PARAM values");
4497 
4503  Uint32 prunePtrI = RNIL;
4504  bool hasNull;
4505  err = expand(prunePtrI, tree, len, param, cnt, hasNull);
4506  if (unlikely(err != 0))
4507  break;
4508 
4509  if (unlikely(hasNull))
4510  {
4511  /* API should have elliminated requests w/ const-NULL keys */
4512  jam();
4513  DEBUG("BEWARE: T_CONST_PRUNE-key contain NULL values");
4514 // treeNodePtr.p->m_bits |= TreeNode::T_NULL_PRUNE;
4515 // break;
4516  ndbrequire(false);
4517  }
4518  ndbrequire(prunePtrI != RNIL); /* todo: can we allow / take advantage of NULLs in range scan? */
4519  data.m_constPrunePtrI = prunePtrI;
4520 
4525  treeNodePtr.p->m_bits |= TreeNode::T_CONST_PRUNE;
4526  c_Counters.incr_counter(CI_CONST_PRUNED_RANGE_SCANS_RECEIVED, 1);
4527  }
4528  } //SI_PRUNE_PATTERN
4529 
4530  if ((treeNodePtr.p->m_bits & TreeNode::T_CONST_PRUNE) == 0 &&
4531  ((treeBits & Node::SI_PARALLEL) ||
4532  ((paramBits & Params::SIP_PARALLEL))))
4533  {
4534  jam();
4535  treeNodePtr.p->m_bits |= TreeNode::T_SCAN_PARALLEL;
4536  }
4537 
4538  return 0;
4539  } while(0);
4540 
4541  DEBUG_CRASH();
4542  return err;
4543 }
4544 
4545 void
4546 Dbspj::scanIndex_prepare(Signal * signal,
4547  Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
4548 {
4549  jam();
4550 
4551  treeNodePtr.p->m_state = TreeNode::TN_PREPARING;
4552  ScanFragReq*dst=(ScanFragReq*)treeNodePtr.p->m_scanindex_data.m_scanFragReq;
4553 
4554  DihScanTabReq * req = (DihScanTabReq*)signal->getDataPtrSend();
4555  req->senderRef = reference();
4556  req->senderData = treeNodePtr.i;
4557  req->tableId = dst->tableId;
4558  req->schemaTransId = 0;
4559  sendSignal(DBDIH_REF, GSN_DIH_SCAN_TAB_REQ, signal,
4560  DihScanTabReq::SignalLength, JBB);
4561 
4562  requestPtr.p->m_outstanding++;
4563 }
4564 
4565 void
4566 Dbspj::execDIH_SCAN_TAB_REF(Signal* signal)
4567 {
4568  jamEntry();
4569  ndbrequire(false);
4570 }
4571 
4572 void
4573 Dbspj::execDIH_SCAN_TAB_CONF(Signal* signal)
4574 {
4575  jamEntry();
4576  DihScanTabConf * conf = (DihScanTabConf*)signal->getDataPtr();
4577 
4578  Ptr<TreeNode> treeNodePtr;
4579  m_treenode_pool.getPtr(treeNodePtr, conf->senderData);
4580  ndbrequire(treeNodePtr.p->m_info == &g_ScanIndexOpInfo);
4581 
4582  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
4583 
4584  Uint32 cookie = conf->scanCookie;
4585  Uint32 fragCount = conf->fragmentCount;
4586  ScanFragReq * dst = (ScanFragReq*)data.m_scanFragReq;
4587 
4588  if (conf->reorgFlag)
4589  {
4590  jam();
4591  ScanFragReq::setReorgFlag(dst->requestInfo, 1);
4592  }
4593 
4594  data.m_fragCount = fragCount;
4595  data.m_scanCookie = cookie;
4596 
4597  const Uint32 prunemask = TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE;
4598  bool pruned = (treeNodePtr.p->m_bits & prunemask) != 0;
4599 
4600  Ptr<Request> requestPtr;
4601  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
4602 
4603  Ptr<ScanFragHandle> fragPtr;
4604  Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
4605  if (likely(m_scanfraghandle_pool.seize(requestPtr.p->m_arena, fragPtr)))
4606  {
4607  jam();
4608  fragPtr.p->init(0);
4609  fragPtr.p->m_treeNodePtrI = treeNodePtr.i;
4610  list.addLast(fragPtr);
4611  }
4612  else
4613  {
4614  jam();
4615  goto error1;
4616  }
4617 
4618  if (treeNodePtr.p->m_bits & TreeNode::T_CONST_PRUNE)
4619  {
4620  jam();
4621 
4622  // TODO we need a different variant of computeHash here,
4623  // since m_constPrunePtrI does not contain full primary key
4624  // but only parts in distribution key
4625 
4626  BuildKeyReq tmp;
4627  Uint32 indexId = dst->tableId;
4628  Uint32 tableId = g_key_descriptor_pool.getPtr(indexId)->primaryTableId;
4629  Uint32 err = computePartitionHash(signal, tmp, tableId, data.m_constPrunePtrI);
4630  if (unlikely(err != 0))
4631  goto error;
4632 
4633  releaseSection(data.m_constPrunePtrI);
4634  data.m_constPrunePtrI = RNIL;
4635 
4636  err = getNodes(signal, tmp, tableId);
4637  if (unlikely(err != 0))
4638  goto error;
4639 
4640  fragPtr.p->m_fragId = tmp.fragId;
4641  fragPtr.p->m_ref = tmp.receiverRef;
4642  data.m_fragCount = 1;
4643  }
4644  else if (fragCount == 1)
4645  {
4646  jam();
4651  if (treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN)
4652  {
4653  jam();
4654  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
4655  Local_pattern_store pattern(pool, data.m_prunePattern);
4656  pattern.release();
4657  }
4658  data.m_constPrunePtrI = RNIL;
4660  treeNodePtr.p->m_bits &= ~clear;
4661  treeNodePtr.p->m_bits |= TreeNode::T_CONST_PRUNE;
4662 
4666  pruned = false;
4667  }
4668  else
4669  {
4670  for (Uint32 i = 1; i<fragCount; i++)
4671  {
4672  jam();
4673  Ptr<ScanFragHandle> fragPtr;
4674  if (likely(m_scanfraghandle_pool.seize(requestPtr.p->m_arena, fragPtr)))
4675  {
4676  jam();
4677  fragPtr.p->init(i);
4678  fragPtr.p->m_treeNodePtrI = treeNodePtr.i;
4679  list.addLast(fragPtr);
4680  }
4681  else
4682  {
4683  goto error1;
4684  }
4685  }
4686  }
4687  data.m_frags_complete = data.m_fragCount;
4688 
4689  if (!pruned)
4690  {
4691  jam();
4692  Uint32 tableId = ((ScanFragReq*)data.m_scanFragReq)->tableId;
4693  DihScanGetNodesReq * req = (DihScanGetNodesReq*)signal->getDataPtrSend();
4694  req->senderRef = reference();
4695  req->tableId = tableId;
4696  req->scanCookie = cookie;
4697 
4698  Uint32 cnt = 0;
4699  for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr))
4700  {
4701  jam();
4702  req->senderData = fragPtr.i;
4703  req->fragId = fragPtr.p->m_fragId;
4704  sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal,
4705  DihScanGetNodesReq::SignalLength, JBB);
4706  cnt++;
4707  }
4708  data.m_frags_outstanding = cnt;
4709  requestPtr.p->m_outstanding++;
4710  }
4711  else
4712  {
4713  jam();
4714  treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
4715  }
4716  checkPrepareComplete(signal, requestPtr, 1);
4717 
4718  return;
4719 
4720 error1:
4721 error:
4722  ndbrequire(false);
4723 }
4724 
4725 void
4726 Dbspj::execDIH_SCAN_GET_NODES_REF(Signal* signal)
4727 {
4728  jamEntry();
4729  ndbrequire(false);
4730 }
4731 
4732 void
4733 Dbspj::execDIH_SCAN_GET_NODES_CONF(Signal* signal)
4734 {
4735  jamEntry();
4736 
4737  DihScanGetNodesConf * conf = (DihScanGetNodesConf*)signal->getDataPtr();
4738 
4739  Uint32 senderData = conf->senderData;
4740  Uint32 node = conf->nodes[0];
4741  Uint32 instanceKey = conf->instanceKey;
4742 
4743  Ptr<ScanFragHandle> fragPtr;
4744  m_scanfraghandle_pool.getPtr(fragPtr, senderData);
4745  Ptr<TreeNode> treeNodePtr;
4746  m_treenode_pool.getPtr(treeNodePtr, fragPtr.p->m_treeNodePtrI);
4747  ndbrequire(treeNodePtr.p->m_info == &g_ScanIndexOpInfo);
4748  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
4749  ndbrequire(data.m_frags_outstanding > 0);
4750  data.m_frags_outstanding--;
4751 
4752  fragPtr.p->m_ref = numberToRef(DBLQH, instanceKey, node);
4753 
4754  if (data.m_frags_outstanding == 0)
4755  {
4756  jam();
4757 
4758  treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
4759 
4760  Ptr<Request> requestPtr;
4761  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
4762  checkPrepareComplete(signal, requestPtr, 1);
4763  }
4764 }
4765 
4766 Uint32
4767 Dbspj::scanIndex_findFrag(Local_ScanFragHandle_list & list,
4768  Ptr<ScanFragHandle> & fragPtr, Uint32 fragId)
4769 {
4770  for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr))
4771  {
4772  jam();
4773  if (fragPtr.p->m_fragId == fragId)
4774  {
4775  jam();
4776  return 0;
4777  }
4778  }
4779 
4780  return 99; // TODO
4781 }
4782 
4783 void
4784 Dbspj::scanIndex_parent_row(Signal* signal,
4785  Ptr<Request> requestPtr,
4786  Ptr<TreeNode> treeNodePtr,
4787  const RowPtr & rowRef)
4788 {
4789  jam();
4790 
4791  Uint32 err;
4792  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
4793 
4799  do
4800  {
4801  Ptr<ScanFragHandle> fragPtr;
4802  Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
4803  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
4804  if (treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN)
4805  {
4806  jam();
4807 
4813  Local_pattern_store pattern(pool, data.m_prunePattern);
4814  Uint32 pruneKeyPtrI = RNIL;
4815  bool hasNull;
4816  err = expand(pruneKeyPtrI, pattern, rowRef, hasNull);
4817  if (unlikely(err != 0))
4818  {
4819  DEBUG_CRASH();
4820  break;
4821  }
4822 
4823  if (unlikely(hasNull))
4824  {
4825  jam();
4826  DEBUG("T_PRUNE_PATTERN-key contain NULL values");
4827 
4828  // Ignore this request as 'NULL == <column>' will never give a match
4829  if (pruneKeyPtrI != RNIL)
4830  {
4831  releaseSection(pruneKeyPtrI);
4832  }
4833  return; // Bailout, SCANREQ would have returned 0 rows anyway
4834  }
4835 
4836  // TODO we need a different variant of computeHash here,
4837  // since pruneKeyPtrI does not contain full primary key
4838  // but only parts in distribution key
4839 
4840  BuildKeyReq tmp;
4841  ScanFragReq * dst = (ScanFragReq*)data.m_scanFragReq;
4842  Uint32 indexId = dst->tableId;
4843  Uint32 tableId = g_key_descriptor_pool.getPtr(indexId)->primaryTableId;
4844  err = computePartitionHash(signal, tmp, tableId, pruneKeyPtrI);
4845  releaseSection(pruneKeyPtrI); // see ^ TODO
4846  if (unlikely(err != 0))
4847  {
4848  DEBUG_CRASH();
4849  break;
4850  }
4851 
4852  err = getNodes(signal, tmp, tableId);
4853  if (unlikely(err != 0))
4854  {
4855  DEBUG_CRASH();
4856  break;
4857  }
4858 
4859  err = scanIndex_findFrag(list, fragPtr, tmp.fragId);
4860  if (unlikely(err != 0))
4861  {
4862  DEBUG_CRASH();
4863  break;
4864  }
4865 
4875  fragPtr.p->m_ref = tmp.receiverRef;
4876  }
4877  else
4878  {
4879  jam();
4884  list.first(fragPtr);
4885  }
4886 
4887  Uint32 ptrI = fragPtr.p->m_rangePtrI;
4888  bool hasNull;
4889  if (treeNodePtr.p->m_bits & TreeNode::T_KEYINFO_CONSTRUCTED)
4890  {
4891  jam();
4892  Local_pattern_store pattern(pool, treeNodePtr.p->m_keyPattern);
4893  err = expand(ptrI, pattern, rowRef, hasNull);
4894  if (unlikely(err != 0))
4895  {
4896  DEBUG_CRASH();
4897  break;
4898  }
4899  }
4900  else
4901  {
4902  jam();
4903  // Fixed key...fix later...
4904  ndbrequire(false);
4905  }
4906 // ndbrequire(!hasNull); // FIXME, can't ignore request as we already added it to keyPattern
4907  fragPtr.p->m_rangePtrI = ptrI;
4908  scanIndex_fixupBound(fragPtr, ptrI, rowRef.m_src_correlation);
4909 
4910  if (treeNodePtr.p->m_bits & TreeNode::T_ONE_SHOT)
4911  {
4912  jam();
4917  scanIndex_parent_batch_complete(signal, requestPtr, treeNodePtr);
4918  }
4919 
4920  return;
4921  } while (0);
4922 
4923  ndbrequire(false);
4924 }
4925 
4926 
4927 void
4928 Dbspj::scanIndex_fixupBound(Ptr<ScanFragHandle> fragPtr,
4929  Uint32 ptrI, Uint32 corrVal)
4930 {
4936  SectionReader r0(ptrI, getSectionSegmentPool());
4937  ndbrequire(r0.step(fragPtr.p->m_range_builder.m_range_size));
4938  Uint32 boundsz = r0.getSize() - fragPtr.p->m_range_builder.m_range_size;
4939  Uint32 boundno = fragPtr.p->m_range_builder.m_range_cnt + 1;
4940 
4941  Uint32 tmp;
4942  ndbrequire(r0.peekWord(&tmp));
4943  tmp |= (boundsz << 16) | ((corrVal & 0xFFF) << 4);
4944  ndbrequire(r0.updateWord(tmp));
4945  ndbrequire(r0.step(1)); // Skip first BoundType
4946 
4947  // TODO: Renumbering below assume there are only EQ-bounds !!
4948  Uint32 id = 0;
4949  Uint32 len32;
4950  do
4951  {
4952  ndbrequire(r0.peekWord(&tmp));
4953  AttributeHeader ah(tmp);
4954  Uint32 len = ah.getByteSize();
4955  AttributeHeader::init(&tmp, id++, len);
4956  ndbrequire(r0.updateWord(tmp));
4957  len32 = (len + 3) >> 2;
4958  } while (r0.step(2 + len32)); // Skip AttributeHeader(1) + Attribute(len32) + next BoundType(1)
4959 
4960  fragPtr.p->m_range_builder.m_range_cnt = boundno;
4961  fragPtr.p->m_range_builder.m_range_size = r0.getSize();
4962 }
4963 
4964 void
4965 Dbspj::scanIndex_parent_batch_complete(Signal* signal,
4966  Ptr<Request> requestPtr,
4967  Ptr<TreeNode> treeNodePtr)
4968 {
4969  jam();
4970 
4971  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
4972  data.m_rows_received = 0;
4973  data.m_rows_expecting = 0;
4974  ndbassert(data.m_frags_outstanding == 0);
4975  ndbassert(data.m_frags_complete == data.m_fragCount);
4976  data.m_frags_complete = 0;
4977 
4978  Ptr<ScanFragHandle> fragPtr;
4979  {
4980  Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
4981  list.first(fragPtr);
4982 
4983  if ((treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN) == 0)
4984  {
4985  if (fragPtr.p->m_rangePtrI == RNIL)
4986  {
4987  // No keys found
4988  jam();
4989  data.m_frags_complete = data.m_fragCount;
4990  }
4991  }
4992  else
4993  {
4994  while(!fragPtr.isNull())
4995  {
4996  if (fragPtr.p->m_rangePtrI == RNIL)
4997  {
4998  jam();
5003  fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
5004  data.m_frags_complete++;
5005  }
5006  list.next(fragPtr);
5007  }
5008  }
5009  }
5010  data.m_frags_not_started = data.m_fragCount - data.m_frags_complete;
5011 
5012  if (data.m_frags_complete == data.m_fragCount)
5013  {
5014  jam();
5018  return;
5019  }
5020 
5024  const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
5025  ndbrequire(org->batch_size_rows > 0);
5026 
5027  if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
5028  {
5029  jam();
5030  data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
5031  org->batch_size_rows);
5032  }
5033  else if (data.m_firstExecution)
5034  {
5044  jam();
5045  data.m_parallelism = 1;
5046  }
5047  else
5048  {
5049  jam();
5056  Int32 parallelism =
5057  static_cast<Int32>(MIN(data.m_parallelismStat.getMean()
5058  - 2 * data.m_parallelismStat.getStdDev(),
5059  org->batch_size_rows));
5060 
5061  if (parallelism < 1)
5062  {
5063  jam();
5064  parallelism = 1;
5065  }
5066  else if ((data.m_fragCount - data.m_frags_complete) % parallelism != 0)
5067  {
5068  jam();
5075  const Int32 roundTrips =
5076  1 + (data.m_fragCount - data.m_frags_complete) / parallelism;
5077  parallelism = (data.m_fragCount - data.m_frags_complete) / roundTrips;
5078  }
5079 
5080  data.m_parallelism = static_cast<Uint32>(parallelism);
5081 
5082 #ifdef DEBUG_SCAN_FRAGREQ
5083  DEBUG("::scanIndex_send() starting index scan with parallelism="
5084  << data.m_parallelism);
5085 #endif
5086  }
5087  ndbrequire(data.m_parallelism > 0);
5088 
5089  const Uint32 bs_rows = org->batch_size_rows/ data.m_parallelism;
5090  const Uint32 bs_bytes = org->batch_size_bytes / data.m_parallelism;
5091  ndbassert(bs_rows > 0);
5092  ndbassert(bs_bytes > 0);
5093 
5094  data.m_largestBatchRows = 0;
5095  data.m_largestBatchBytes = 0;
5096  data.m_totalRows = 0;
5097  data.m_totalBytes = 0;
5098 
5099  {
5100  Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
5101  Ptr<ScanFragHandle> fragPtr;
5102  list.first(fragPtr);
5103 
5104  while(!fragPtr.isNull())
5105  {
5106  ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED ||
5107  fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE);
5108  fragPtr.p->m_state = ScanFragHandle::SFH_NOT_STARTED;
5109  list.next(fragPtr);
5110  }
5111  }
5112 
5113  Uint32 batchRange = 0;
5114  scanIndex_send(signal,
5115  requestPtr,
5116  treeNodePtr,
5117  data.m_parallelism,
5118  bs_bytes,
5119  bs_rows,
5120  batchRange);
5121 
5122  data.m_firstExecution = false;
5123 
5124  ndbrequire(static_cast<Uint32>(data.m_frags_outstanding +
5125  data.m_frags_complete) <=
5126  data.m_fragCount);
5127 
5128  data.m_batch_chunks = 1;
5129  requestPtr.p->m_cnt_active++;
5130  requestPtr.p->m_outstanding++;
5131  treeNodePtr.p->m_state = TreeNode::TN_ACTIVE;
5132 }
5133 
5134 void
5135 Dbspj::scanIndex_parent_batch_repeat(Signal* signal,
5136  Ptr<Request> requestPtr,
5137  Ptr<TreeNode> treeNodePtr)
5138 {
5139  jam();
5140  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5141 
5142  DEBUG("scanIndex_parent_batch_repeat(), m_node_no: " << treeNodePtr.p->m_node_no
5143  << ", m_batch_chunks: " << data.m_batch_chunks);
5144 
5145  ndbassert(treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE);
5146 
5151  if (data.m_batch_chunks > 1)
5152  {
5153  jam();
5154  DEBUG("Register TreeNode for restart, m_node_no: " << treeNodePtr.p->m_node_no);
5155  ndbrequire(treeNodePtr.p->m_state != TreeNode::TN_ACTIVE);
5156  registerActiveCursor(requestPtr, treeNodePtr);
5157  data.m_batch_chunks = 0;
5158  }
5159 }
5160 
5164 void
5165 Dbspj::scanIndex_send(Signal* signal,
5166  Ptr<Request> requestPtr,
5167  Ptr<TreeNode> treeNodePtr,
5168  Uint32 noOfFrags,
5169  Uint32 bs_bytes,
5170  Uint32 bs_rows,
5171  Uint32& batchRange)
5172 {
5178  const bool prune = treeNodePtr.p->m_bits &
5180 
5185  const bool repeatable =
5186  (treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE) != 0;
5187 
5188  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5189  ndbassert(noOfFrags > 0);
5190  ndbassert(data.m_frags_not_started >= noOfFrags);
5191  ScanFragReq* const req =
5192  reinterpret_cast<ScanFragReq*>(signal->getDataPtrSend());
5193  const ScanFragReq * const org
5194  = reinterpret_cast<ScanFragReq*>(data.m_scanFragReq);
5195  memcpy(req, org, sizeof(data.m_scanFragReq));
5196  // req->variableData[0] // set below
5197  req->variableData[1] = requestPtr.p->m_rootResultData;
5198  req->batch_size_bytes = bs_bytes;
5199  req->batch_size_rows = bs_rows;
5200 
5201  Uint32 requestsSent = 0;
5202  Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
5203  Ptr<ScanFragHandle> fragPtr;
5204  list.first(fragPtr);
5205  Uint32 keyInfoPtrI = fragPtr.p->m_rangePtrI;
5206  ndbrequire(prune || keyInfoPtrI != RNIL);
5211  while (requestsSent < noOfFrags)
5212  {
5213  jam();
5214  ndbassert(!fragPtr.isNull());
5215 
5216  if (fragPtr.p->m_state != ScanFragHandle::SFH_NOT_STARTED)
5217  {
5218  // Skip forward to the frags that we should send.
5219  jam();
5220  list.next(fragPtr);
5221  continue;
5222  }
5223 
5224  const Uint32 ref = fragPtr.p->m_ref;
5225 
5226  if (noOfFrags==1 && !prune &&
5227  data.m_frags_not_started == data.m_fragCount &&
5228  refToNode(ref) != getOwnNodeId() &&
5229  list.hasNext(fragPtr))
5230  {
5239  jam();
5240  list.next(fragPtr);
5241  continue;
5242  }
5243 
5244  SectionHandle handle(this);
5245 
5246  Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI;
5247 
5251  req->senderData = fragPtr.i;
5252  req->fragmentNoKeyLen = fragPtr.p->m_fragId;
5253 
5254  if (prune)
5255  {
5256  jam();
5257  keyInfoPtrI = fragPtr.p->m_rangePtrI;
5258  if (keyInfoPtrI == RNIL)
5259  {
5264  jam();
5265  fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
5266  list.next(fragPtr);
5267  continue;
5268  }
5269 
5270  if (!repeatable)
5271  {
5277  jam();
5278  Uint32 tmp = RNIL;
5279  ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error
5280  attrInfoPtrI = tmp;
5281  }
5282  }
5283 
5284  req->variableData[0] = batchRange;
5285  getSection(handle.m_ptr[0], attrInfoPtrI);
5286  getSection(handle.m_ptr[1], keyInfoPtrI);
5287  handle.m_cnt = 2;
5288 
5289 #if defined DEBUG_SCAN_FRAGREQ
5290  ndbout_c("SCAN_FRAGREQ to %x", ref);
5291  printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
5292  NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
5293  DBLQH);
5294  printf("ATTRINFO: ");
5295  print(handle.m_ptr[0], stdout);
5296  printf("KEYINFO: ");
5297  print(handle.m_ptr[1], stdout);
5298 #endif
5299 
5300  if (refToNode(ref) == getOwnNodeId())
5301  {
5302  c_Counters.incr_counter(CI_LOCAL_RANGE_SCANS_SENT, 1);
5303  }
5304  else
5305  {
5306  c_Counters.incr_counter(CI_REMOTE_RANGE_SCANS_SENT, 1);
5307  }
5308 
5309  if (prune && !repeatable)
5310  {
5316  jam();
5317  sendSignal(ref, GSN_SCAN_FRAGREQ, signal,
5318  NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle);
5319  fragPtr.p->m_rangePtrI = RNIL;
5320  fragPtr.p->reset_ranges();
5321  }
5322  else
5323  {
5328  jam();
5329  sendSignalNoRelease(ref, GSN_SCAN_FRAGREQ, signal,
5330  NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle);
5331  }
5332  handle.clear();
5333 
5334  fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING; // running
5335  data.m_frags_outstanding++;
5336  batchRange += bs_rows;
5337  requestsSent++;
5338  list.next(fragPtr);
5339  } // while (requestsSent < noOfFrags)
5340 
5341  data.m_frags_not_started -= requestsSent;
5342 }
5343 
5344 void
5345 Dbspj::scanIndex_execTRANSID_AI(Signal* signal,
5346  Ptr<Request> requestPtr,
5347  Ptr<TreeNode> treeNodePtr,
5348  const RowPtr & rowRef)
5349 {
5350  jam();
5351 
5352  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
5353  Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
5354  Dependency_map::ConstDataBufferIterator it;
5355 
5356  {
5357  for (list.first(it); !it.isNull(); list.next(it))
5358  {
5359  jam();
5360  Ptr<TreeNode> childPtr;
5361  m_treenode_pool.getPtr(childPtr, * it.data);
5362  ndbrequire(childPtr.p->m_info != 0&&childPtr.p->m_info->m_parent_row!=0);
5363  (this->*(childPtr.p->m_info->m_parent_row))(signal,
5364  requestPtr, childPtr,rowRef);
5365  }
5366  }
5367 
5368  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5369  data.m_rows_received++;
5370 
5371  if (data.m_frags_outstanding == 0 &&
5372  data.m_rows_received == data.m_rows_expecting)
5373  {
5374  jam();
5378  if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE)
5379  {
5380  jam();
5381  reportBatchComplete(signal, requestPtr, treeNodePtr);
5382  }
5383 
5384  checkBatchComplete(signal, requestPtr, 1);
5385  return;
5386  }
5387 }
5388 
5389 void
5390 Dbspj::scanIndex_execSCAN_FRAGCONF(Signal* signal,
5391  Ptr<Request> requestPtr,
5392  Ptr<TreeNode> treeNodePtr,
5393  Ptr<ScanFragHandle> fragPtr)
5394 {
5395  jam();
5396 
5397  const ScanFragConf * conf = (const ScanFragConf*)(signal->getDataPtr());
5398 
5399  Uint32 rows = conf->completedOps;
5400  Uint32 done = conf->fragmentCompleted;
5401 
5402  Uint32 state = fragPtr.p->m_state;
5403  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5404 
5405  if (state == ScanFragHandle::SFH_WAIT_CLOSE && done == 0)
5406  {
5407  jam();
5411  return;
5412  }
5413 
5414  requestPtr.p->m_rows += rows;
5415  data.m_totalRows += rows;
5416  data.m_totalBytes += conf->total_len;
5417  data.m_largestBatchRows = MAX(data.m_largestBatchRows, rows);
5418  data.m_largestBatchBytes = MAX(data.m_largestBatchBytes, conf->total_len);
5419 
5420  if (!treeNodePtr.p->isLeaf())
5421  {
5422  jam();
5423  data.m_rows_expecting += rows;
5424  }
5425  ndbrequire(data.m_frags_outstanding);
5426  ndbrequire(state == ScanFragHandle::SFH_SCANNING ||
5427  state == ScanFragHandle::SFH_WAIT_CLOSE);
5428 
5429  data.m_frags_outstanding--;
5430  fragPtr.p->m_state = ScanFragHandle::SFH_WAIT_NEXTREQ;
5431 
5432  if (done)
5433  {
5434  jam();
5435  fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
5436  ndbrequire(data.m_frags_complete < data.m_fragCount);
5437  data.m_frags_complete++;
5438 
5439  if (data.m_frags_complete == data.m_fragCount ||
5440  ((requestPtr.p->m_state & Request::RS_ABORTING) != 0 &&
5441  data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started)))
5442  {
5443  jam();
5444  ndbrequire(requestPtr.p->m_cnt_active);
5445  requestPtr.p->m_cnt_active--;
5446  treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
5447  }
5448  }
5449 
5450 
5451  if (data.m_frags_outstanding == 0)
5452  {
5453  const ScanFragReq * const org
5454  = reinterpret_cast<const ScanFragReq*>(data.m_scanFragReq);
5455 
5456  if (data.m_frags_complete == data.m_fragCount)
5457  {
5458  jam();
5465  double parallelism = data.m_fragCount;
5466  if (data.m_totalRows > 0)
5467  {
5468  parallelism = MIN(parallelism,
5469  double(org->batch_size_rows) / data.m_totalRows);
5470  }
5471  if (data.m_totalBytes > 0)
5472  {
5473  parallelism = MIN(parallelism,
5474  double(org->batch_size_bytes) / data.m_totalBytes);
5475  }
5476  data.m_parallelismStat.update(parallelism);
5477  }
5478 
5482  if (state == ScanFragHandle::SFH_WAIT_CLOSE)
5483  {
5484  jam();
5485  ndbrequire((requestPtr.p->m_state & Request::RS_ABORTING) != 0);
5486  }
5487  else if (! (data.m_rows_received == data.m_rows_expecting))
5488  {
5489  jam();
5490  return;
5491  }
5492  else
5493  {
5494  if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE)
5495  {
5496  jam();
5497  reportBatchComplete(signal, requestPtr, treeNodePtr);
5498  }
5499  }
5500 
5501  checkBatchComplete(signal, requestPtr, 1);
5502  return;
5503  }
5504 }
5505 
5506 void
5507 Dbspj::scanIndex_execSCAN_FRAGREF(Signal* signal,
5508  Ptr<Request> requestPtr,
5509  Ptr<TreeNode> treeNodePtr,
5510  Ptr<ScanFragHandle> fragPtr)
5511 {
5512  jam();
5513 
5514  const ScanFragRef * rep = CAST_CONSTPTR(ScanFragRef, signal->getDataPtr());
5515  const Uint32 errCode = rep->errorCode;
5516 
5517  Uint32 state = fragPtr.p->m_state;
5518  ndbrequire(state == ScanFragHandle::SFH_SCANNING ||
5519  state == ScanFragHandle::SFH_WAIT_CLOSE);
5520 
5521  fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
5522 
5523  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5524  ndbrequire(data.m_frags_complete < data.m_fragCount);
5525  data.m_frags_complete++;
5526  ndbrequire(data.m_frags_outstanding > 0);
5527  data.m_frags_outstanding--;
5528 
5529  if (data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))
5530  {
5531  jam();
5532  ndbrequire(requestPtr.p->m_cnt_active);
5533  requestPtr.p->m_cnt_active--;
5534  treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
5535  }
5536 
5537  if (data.m_frags_outstanding == 0)
5538  {
5539  jam();
5540  ndbrequire(requestPtr.p->m_outstanding);
5541  requestPtr.p->m_outstanding--;
5542  }
5543 
5544  abort(signal, requestPtr, errCode);
5545 }
5546 
5547 void
5548 Dbspj::scanIndex_execSCAN_NEXTREQ(Signal* signal,
5549  Ptr<Request> requestPtr,
5550  Ptr<TreeNode> treeNodePtr)
5551 {
5552  jam();
5553 
5554  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5555  const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
5556 
5557  data.m_rows_received = 0;
5558  data.m_rows_expecting = 0;
5559  ndbassert(data.m_frags_outstanding == 0);
5560 
5561  ndbrequire(data.m_frags_complete < data.m_fragCount);
5562  if ((treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) == 0)
5563  {
5564  jam();
5570  if (data.m_largestBatchRows < org->batch_size_rows/data.m_parallelism &&
5571  data.m_largestBatchBytes < org->batch_size_bytes/data.m_parallelism)
5572  {
5573  jam();
5574  data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
5575  org->batch_size_rows);
5576  if (data.m_largestBatchRows > 0)
5577  {
5578  jam();
5579  data.m_parallelism =
5580  MIN(org->batch_size_rows / data.m_largestBatchRows,
5581  data.m_parallelism);
5582  }
5583  if (data.m_largestBatchBytes > 0)
5584  {
5585  jam();
5586  data.m_parallelism =
5587  MIN(data.m_parallelism,
5588  org->batch_size_bytes/data.m_largestBatchBytes);
5589  }
5590  if (data.m_frags_complete == 0 &&
5591  data.m_frags_not_started % data.m_parallelism != 0)
5592  {
5593  jam();
5600  const Uint32 roundTrips =
5601  1 + data.m_frags_not_started / data.m_parallelism;
5602  data.m_parallelism = data.m_frags_not_started / roundTrips;
5603  }
5604  }
5605  else
5606  {
5607  jam();
5608  // We get full batches, so we should lower parallelism.
5609  data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
5610  MAX(1, data.m_parallelism/2));
5611  }
5612  ndbassert(data.m_parallelism > 0);
5613 #ifdef DEBUG_SCAN_FRAGREQ
5614  DEBUG("::scanIndex_execSCAN_NEXTREQ() Asking for new batches from " <<
5615  data.m_parallelism <<
5616  " fragments with " << org->batch_size_rows/data.m_parallelism <<
5617  " rows and " << org->batch_size_bytes/data.m_parallelism <<
5618  " bytes.");
5619 #endif
5620  }
5621  else
5622  {
5623  jam();
5624  data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
5625  org->batch_size_rows);
5626  }
5627 
5628  const Uint32 bs_rows = org->batch_size_rows/data.m_parallelism;
5629  ndbassert(bs_rows > 0);
5630  ScanFragNextReq* req =
5631  reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
5632  req->requestInfo = 0;
5633  ScanFragNextReq::setCorrFactorFlag(req->requestInfo);
5634  req->transId1 = requestPtr.p->m_transId[0];
5635  req->transId2 = requestPtr.p->m_transId[1];
5636  req->batch_size_rows = bs_rows;
5637  req->batch_size_bytes = org->batch_size_bytes/data.m_parallelism;
5638 
5639  Uint32 batchRange = 0;
5640  Ptr<ScanFragHandle> fragPtr;
5641  Uint32 sentFragCount = 0;
5642  {
5646  Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
5647  list.first(fragPtr);
5648  while (sentFragCount < data.m_parallelism && !fragPtr.isNull())
5649  {
5650  jam();
5651  ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ ||
5652  fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE ||
5653  fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED);
5654  if (fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ)
5655  {
5656  jam();
5657 
5658  data.m_frags_outstanding++;
5659  req->variableData[0] = batchRange;
5660  fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING;
5661  batchRange += bs_rows;
5662 
5663  DEBUG("scanIndex_execSCAN_NEXTREQ to: " << hex
5664  << treeNodePtr.p->m_send.m_ref
5665  << ", m_node_no=" << treeNodePtr.p->m_node_no
5666  << ", senderData: " << req->senderData);
5667 
5668 #ifdef DEBUG_SCAN_FRAGREQ
5669  printSCANFRAGNEXTREQ(stdout, &signal->theData[0],
5670  ScanFragNextReq:: SignalLength + 1, DBLQH);
5671 #endif
5672 
5673  req->senderData = fragPtr.i;
5674  sendSignal(fragPtr.p->m_ref, GSN_SCAN_NEXTREQ, signal,
5675  ScanFragNextReq::SignalLength + 1,
5676  JBB);
5677  sentFragCount++;
5678  }
5679  list.next(fragPtr);
5680  }
5681  }
5682 
5683  if (sentFragCount < data.m_parallelism)
5684  {
5688  jam();
5689  ndbassert(data.m_frags_not_started != 0);
5690  scanIndex_send(signal,
5691  requestPtr,
5692  treeNodePtr,
5693  data.m_parallelism - sentFragCount,
5694  org->batch_size_bytes/data.m_parallelism,
5695  bs_rows,
5696  batchRange);
5697  }
5703  ndbrequire(data.m_frags_outstanding > 0);
5704  ndbrequire(data.m_batch_chunks > 0);
5705  data.m_batch_chunks++;
5706 
5707  requestPtr.p->m_outstanding++;
5708  ndbassert(treeNodePtr.p->m_state == TreeNode::TN_ACTIVE);
5709 }
5710 
5711 void
5712 Dbspj::scanIndex_complete(Signal* signal,
5713  Ptr<Request> requestPtr,
5714  Ptr<TreeNode> treeNodePtr)
5715 {
5716  jam();
5717  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5718  ScanFragReq*dst=(ScanFragReq*)treeNodePtr.p->m_scanindex_data.m_scanFragReq;
5719  if (!data.m_fragments.isEmpty())
5720  {
5721  jam();
5722  DihScanTabCompleteRep* rep=(DihScanTabCompleteRep*)signal->getDataPtrSend();
5723  rep->tableId = dst->tableId;
5724  rep->scanCookie = data.m_scanCookie;
5725  sendSignal(DBDIH_REF, GSN_DIH_SCAN_TAB_COMPLETE_REP,
5726  signal, DihScanTabCompleteRep::SignalLength, JBB);
5727  }
5728 }
5729 
5730 void
5731 Dbspj::scanIndex_abort(Signal* signal,
5732  Ptr<Request> requestPtr,
5733  Ptr<TreeNode> treeNodePtr)
5734 {
5735  jam();
5736 
5737  switch(treeNodePtr.p->m_state){
5738  case TreeNode::TN_BUILDING:
5740  case TreeNode::TN_INACTIVE:
5742  case TreeNode::TN_END:
5743  ndbout_c("H'%.8x H'%.8x scanIndex_abort state: %u",
5744  requestPtr.p->m_transId[0],
5745  requestPtr.p->m_transId[1],
5746  treeNodePtr.p->m_state);
5747  return;
5748 
5749  case TreeNode::TN_ACTIVE:
5750  jam();
5751  break;
5752  }
5753 
5754  ScanFragNextReq* req = CAST_PTR(ScanFragNextReq, signal->getDataPtrSend());
5755  req->requestInfo = ScanFragNextReq::ZCLOSE;
5756  req->transId1 = requestPtr.p->m_transId[0];
5757  req->transId2 = requestPtr.p->m_transId[1];
5758  req->batch_size_rows = 0;
5759  req->batch_size_bytes = 0;
5760 
5761  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5762  Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
5763  Ptr<ScanFragHandle> fragPtr;
5764 
5765  Uint32 cnt_waiting = 0;
5766  Uint32 cnt_scanning = 0;
5767  for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr))
5768  {
5769  switch(fragPtr.p->m_state){
5770  case ScanFragHandle::SFH_NOT_STARTED:
5771  case ScanFragHandle::SFH_COMPLETE:
5772  case ScanFragHandle::SFH_WAIT_CLOSE:
5773  jam();
5774  break;
5775  case ScanFragHandle::SFH_WAIT_NEXTREQ:
5776  jam();
5777  cnt_waiting++; // was idle...
5778  data.m_frags_outstanding++; // is closing
5779  goto do_abort;
5780  case ScanFragHandle::SFH_SCANNING:
5781  jam();
5782  cnt_scanning++;
5783  goto do_abort;
5784  do_abort:
5785  req->senderData = fragPtr.i;
5786  sendSignal(fragPtr.p->m_ref, GSN_SCAN_NEXTREQ, signal,
5787  ScanFragNextReq::SignalLength, JBB);
5788 
5789  fragPtr.p->m_state = ScanFragHandle::SFH_WAIT_CLOSE;
5790  break;
5791  }
5792  }
5793 
5794  if (cnt_scanning == 0)
5795  {
5796  if (cnt_waiting > 0)
5797  {
5801  jam();
5802  requestPtr.p->m_outstanding++;
5803  }
5804  else
5805  {
5810  jam();
5811  ndbassert(data.m_frags_not_started > 0);
5812  ndbrequire(requestPtr.p->m_cnt_active);
5813  requestPtr.p->m_cnt_active--;
5814  treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
5815  }
5816  }
5817 }
5818 
5819 Uint32
5820 Dbspj::scanIndex_execNODE_FAILREP(Signal* signal,
5821  Ptr<Request> requestPtr,
5822  Ptr<TreeNode> treeNodePtr,
5823  NdbNodeBitmask nodes)
5824 {
5825  jam();
5826 
5827  switch(treeNodePtr.p->m_state){
5829  case TreeNode::TN_INACTIVE:
5830  return 1;
5831 
5832  case TreeNode::TN_BUILDING:
5834  case TreeNode::TN_END:
5835  return 0;
5836 
5837  case TreeNode::TN_ACTIVE:
5838  jam();
5839  break;
5840  }
5841 
5842 
5843  Uint32 sum = 0;
5844  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5845  Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
5846  Ptr<ScanFragHandle> fragPtr;
5847 
5848  Uint32 save0 = data.m_frags_outstanding;
5849  Uint32 save1 = data.m_frags_complete;
5850 
5851  for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr))
5852  {
5853  if (nodes.get(refToNode(fragPtr.p->m_ref)) == false)
5854  {
5855  jam();
5859  continue;
5860  }
5861 
5862  switch(fragPtr.p->m_state){
5863  case ScanFragHandle::SFH_NOT_STARTED:
5864  jam();
5865  ndbrequire(data.m_frags_complete < data.m_fragCount);
5866  data.m_frags_complete++;
5867  ndbrequire(data.m_frags_not_started > 0);
5868  data.m_frags_not_started--;
5869  // fall through
5870  case ScanFragHandle::SFH_COMPLETE:
5871  jam();
5872  sum++; // indicate that we should abort
5878  break;
5879  case ScanFragHandle::SFH_WAIT_CLOSE:
5880  case ScanFragHandle::SFH_SCANNING:
5881  jam();
5882  ndbrequire(data.m_frags_outstanding > 0);
5883  data.m_frags_outstanding--;
5884  // fall through
5885  case ScanFragHandle::SFH_WAIT_NEXTREQ:
5886  jam();
5887  sum++;
5888  ndbrequire(data.m_frags_complete < data.m_fragCount);
5889  data.m_frags_complete++;
5890  break;
5891  }
5892  fragPtr.p->m_ref = 0;
5893  fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
5894  }
5895 
5896  if (save0 != 0 && data.m_frags_outstanding == 0)
5897  {
5898  jam();
5899  ndbrequire(requestPtr.p->m_outstanding);
5900  requestPtr.p->m_outstanding--;
5901  }
5902 
5903  if (save1 != 0 &&
5904  data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))
5905  {
5906  jam();
5907  ndbrequire(requestPtr.p->m_cnt_active);
5908  requestPtr.p->m_cnt_active--;
5909  treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
5910  }
5911 
5912  return sum;
5913 }
5914 
5915 void
5916 Dbspj::scanIndex_release_rangekeys(Ptr<Request> requestPtr,
5917  Ptr<TreeNode> treeNodePtr)
5918 {
5919  jam();
5920  DEBUG("scanIndex_release_rangekeys(), tree node " << treeNodePtr.i
5921  << " m_node_no: " << treeNodePtr.p->m_node_no);
5922 
5923  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5924  Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
5925  Ptr<ScanFragHandle> fragPtr;
5926 
5927  if (treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN)
5928  {
5929  jam();
5930  for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr))
5931  {
5932  if (fragPtr.p->m_rangePtrI != RNIL)
5933  {
5934  releaseSection(fragPtr.p->m_rangePtrI);
5935  fragPtr.p->m_rangePtrI = RNIL;
5936  }
5937  fragPtr.p->reset_ranges();
5938  }
5939  }
5940  else
5941  {
5942  jam();
5943  list.first(fragPtr);
5944  if (fragPtr.p->m_rangePtrI != RNIL)
5945  {
5946  releaseSection(fragPtr.p->m_rangePtrI);
5947  fragPtr.p->m_rangePtrI = RNIL;
5948  }
5949  fragPtr.p->reset_ranges();
5950  }
5951 }
5952 
5958 void
5959 Dbspj::scanIndex_parent_batch_cleanup(Ptr<Request> requestPtr,
5960  Ptr<TreeNode> treeNodePtr)
5961 {
5962  DEBUG("scanIndex_parent_batch_cleanup");
5963  scanIndex_release_rangekeys(requestPtr,treeNodePtr);
5964 }
5965 
5966 void
5967 Dbspj::scanIndex_cleanup(Ptr<Request> requestPtr,
5968  Ptr<TreeNode> treeNodePtr)
5969 {
5970  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5971  DEBUG("scanIndex_cleanup");
5972 
5977  scanIndex_release_rangekeys(requestPtr,treeNodePtr);
5978 
5979  {
5980  Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
5981  list.remove();
5982  }
5983  if (treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN)
5984  {
5985  jam();
5986  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
5987  Local_pattern_store pattern(pool, data.m_prunePattern);
5988  pattern.release();
5989  }
5990  else if (treeNodePtr.p->m_bits & TreeNode::T_CONST_PRUNE)
5991  {
5992  jam();
5993  if (data.m_constPrunePtrI != RNIL)
5994  {
5995  jam();
5996  releaseSection(data.m_constPrunePtrI);
5997  data.m_constPrunePtrI = RNIL;
5998  }
5999  }
6000 
6001  cleanup_common(requestPtr, treeNodePtr);
6002 }
6003 
6011 const Dbspj::OpInfo*
6012 Dbspj::getOpInfo(Uint32 op)
6013 {
6014  DEBUG("getOpInfo(" << op << ")");
6015  switch(op){
6016  case QueryNode::QN_LOOKUP:
6017  return &Dbspj::g_LookupOpInfo;
6018  case QueryNode::QN_SCAN_FRAG:
6019  return &Dbspj::g_ScanFragOpInfo;
6020  case QueryNode::QN_SCAN_INDEX:
6021  return &Dbspj::g_ScanIndexOpInfo;
6022  default:
6023  return 0;
6024  }
6025 }
6026 
6034 static
6035 Uint32
6036 unpackList(Uint32 dstLen, Uint32 * dst, Dbspj::DABuffer & buffer)
6037 {
6038  const Uint32 * ptr = buffer.ptr;
6039  if (likely(ptr != buffer.end))
6040  {
6041  Uint32 tmp = * ptr++;
6042  Uint32 cnt = tmp & 0xFFFF;
6043 
6044  * dst ++ = (tmp >> 16); // Store first
6045  DEBUG("cnt: " << cnt << " first: " << (tmp >> 16));
6046 
6047  if (cnt > 1)
6048  {
6049  Uint32 len = cnt / 2;
6050  if (unlikely(cnt >= dstLen || (ptr + len > buffer.end)))
6051  goto error;
6052 
6053  cnt --; // subtract item stored in header
6054 
6055  for (Uint32 i = 0; i < cnt/2; i++)
6056  {
6057  * dst++ = (* ptr) & 0xFFFF;
6058  * dst++ = (* ptr) >> 16;
6059  ptr++;
6060  }
6061 
6062  if (cnt & 1)
6063  {
6064  * dst ++ = * ptr & 0xFFFF;
6065  ptr++;
6066  }
6067 
6068  cnt ++; // readd item stored in header
6069  }
6070  buffer.ptr = ptr;
6071  return cnt;
6072  }
6073  return 0;
6074 
6075 error:
6076  return dstLen + 1;
6077 }
6078 
6083 Uint32
6084 Dbspj::buildRowHeader(RowPtr::Header * header, SegmentedSectionPtr ptr)
6085 {
6086  Uint32 tmp, len;
6087  Uint32 * dst = header->m_offset;
6088  const Uint32 * const save = dst;
6089  SectionReader r0(ptr, getSectionSegmentPool());
6090  Uint32 offset = 0;
6091  do
6092  {
6093  * dst++ = offset;
6094  r0.getWord(&tmp);
6095  len = AttributeHeader::getDataSize(tmp);
6096  offset += 1 + len;
6097  } while (r0.step(len));
6098 
6099  return header->m_len = static_cast<Uint32>(dst - save);
6100 }
6101 
6106 Uint32
6107 Dbspj::buildRowHeader(RowPtr::Header * header, const Uint32 *& src, Uint32 len)
6108 {
6109  Uint32 * dst = header->m_offset;
6110  const Uint32 * save = dst;
6111  Uint32 offset = 0;
6112  for (Uint32 i = 0; i<len; i++)
6113  {
6114  * dst ++ = offset;
6115  Uint32 tmp = * src++;
6116  Uint32 tmp_len = AttributeHeader::getDataSize(tmp);
6117  offset += 1 + tmp_len;
6118  src += tmp_len;
6119  }
6120 
6121  return header->m_len = static_cast<Uint32>(dst - save);
6122 }
6123 
6124 Uint32
6125 Dbspj::appendToPattern(Local_pattern_store & pattern,
6126  DABuffer & tree, Uint32 len)
6127 {
6128  if (unlikely(tree.ptr + len > tree.end))
6129  return DbspjErr::InvalidTreeNodeSpecification;
6130 
6131  if (unlikely(pattern.append(tree.ptr, len)==0))
6132  return DbspjErr::OutOfQueryMemory;
6133 
6134  tree.ptr += len;
6135  return 0;
6136 }
6137 
6138 Uint32
6139 Dbspj::appendParamToPattern(Local_pattern_store& dst,
6140  const RowPtr::Linear & row, Uint32 col)
6141 {
6145  Uint32 offset = row.m_header->m_offset[col];
6146  const Uint32 * ptr = row.m_data + offset;
6147  Uint32 len = AttributeHeader::getDataSize(* ptr ++);
6148  /* Param COL's converted to DATA when appended to pattern */
6149  Uint32 info = QueryPattern::data(len);
6150  return dst.append(&info,1) && dst.append(ptr,len) ? 0 : DbspjErr::OutOfQueryMemory;
6151 }
6152 
6153 Uint32
6154 Dbspj::appendParamHeadToPattern(Local_pattern_store& dst,
6155  const RowPtr::Linear & row, Uint32 col)
6156 {
6160  Uint32 offset = row.m_header->m_offset[col];
6161  const Uint32 * ptr = row.m_data + offset;
6162  Uint32 len = AttributeHeader::getDataSize(*ptr);
6163  /* Param COL's converted to DATA when appended to pattern */
6164  Uint32 info = QueryPattern::data(len+1);
6165  return dst.append(&info,1) && dst.append(ptr,len+1) ? 0 : DbspjErr::OutOfQueryMemory;
6166 }
6167 
6168 Uint32
6169 Dbspj::appendTreeToSection(Uint32 & ptrI, SectionReader & tree, Uint32 len)
6170 {
6174  Uint32 SZ = 16;
6175  Uint32 tmp[16];
6176  while (len > SZ)
6177  {
6178  jam();
6179  tree.getWords(tmp, SZ);
6180  ndbrequire(appendToSection(ptrI, tmp, SZ));
6181  len -= SZ;
6182  }
6183 
6184  tree.getWords(tmp, len);
6185  return appendToSection(ptrI, tmp, len) ? 0 : 1;
6186 #if TODO
6187 err:
6188  return 1;
6189 #endif
6190 }
6191 
6192 void
6193 Dbspj::getCorrelationData(const RowPtr::Section & row,
6194  Uint32 col,
6195  Uint32& correlationNumber)
6196 {
6200  SegmentedSectionPtr ptr(row.m_dataPtr);
6201  SectionReader reader(ptr, getSectionSegmentPool());
6202  Uint32 offset = row.m_header->m_offset[col];
6203  ndbrequire(reader.step(offset));
6204  Uint32 tmp;
6205  ndbrequire(reader.getWord(&tmp));
6206  Uint32 len = AttributeHeader::getDataSize(tmp);
6207  ndbrequire(len == 1);
6208  ndbrequire(AttributeHeader::getAttributeId(tmp) == AttributeHeader::CORR_FACTOR32);
6209  ndbrequire(reader.getWord(&correlationNumber));
6210 }
6211 
6212 void
6213 Dbspj::getCorrelationData(const RowPtr::Linear & row,
6214  Uint32 col,
6215  Uint32& correlationNumber)
6216 {
6220  Uint32 offset = row.m_header->m_offset[col];
6221  Uint32 tmp = row.m_data[offset];
6222  Uint32 len = AttributeHeader::getDataSize(tmp);
6223  ndbrequire(len == 1);
6224  ndbrequire(AttributeHeader::getAttributeId(tmp) == AttributeHeader::CORR_FACTOR32);
6225  correlationNumber = row.m_data[offset+1];
6226 }
6227 
6228 Uint32
6229 Dbspj::appendColToSection(Uint32 & dst, const RowPtr::Section & row,
6230  Uint32 col, bool& hasNull)
6231 {
6235  SegmentedSectionPtr ptr(row.m_dataPtr);
6236  SectionReader reader(ptr, getSectionSegmentPool());
6237  Uint32 offset = row.m_header->m_offset[col];
6238  ndbrequire(reader.step(offset));
6239  Uint32 tmp;
6240  ndbrequire(reader.getWord(&tmp));
6241  Uint32 len = AttributeHeader::getDataSize(tmp);
6242  if (unlikely(len==0))
6243  {
6244  jam();
6245  hasNull = true; // NULL-value in key
6246  return 0;
6247  }
6248  return appendTreeToSection(dst, reader, len);
6249 }
6250 
6251 Uint32
6252 Dbspj::appendColToSection(Uint32 & dst, const RowPtr::Linear & row,
6253  Uint32 col, bool& hasNull)
6254 {
6258  Uint32 offset = row.m_header->m_offset[col];
6259  const Uint32 * ptr = row.m_data + offset;
6260  Uint32 len = AttributeHeader::getDataSize(* ptr ++);
6261  if (unlikely(len==0))
6262  {
6263  jam();
6264  hasNull = true; // NULL-value in key
6265  return 0;
6266  }
6267  return appendToSection(dst, ptr, len) ? 0 : DbspjErr::InvalidPattern;
6268 }
6269 
6270 Uint32
6271 Dbspj::appendAttrinfoToSection(Uint32 & dst, const RowPtr::Linear & row,
6272  Uint32 col, bool& hasNull)
6273 {
6277  Uint32 offset = row.m_header->m_offset[col];
6278  const Uint32 * ptr = row.m_data + offset;
6279  Uint32 len = AttributeHeader::getDataSize(* ptr);
6280  if (unlikely(len==0))
6281  {
6282  jam();
6283  hasNull = true; // NULL-value in key
6284  }
6285  return appendToSection(dst, ptr, 1 + len) ? 0 : DbspjErr::InvalidPattern;
6286 }
6287 
6288 Uint32
6289 Dbspj::appendAttrinfoToSection(Uint32 & dst, const RowPtr::Section & row,
6290  Uint32 col, bool& hasNull)
6291 {
6295  SegmentedSectionPtr ptr(row.m_dataPtr);
6296  SectionReader reader(ptr, getSectionSegmentPool());
6297  Uint32 offset = row.m_header->m_offset[col];
6298  ndbrequire(reader.step(offset));
6299  Uint32 tmp;
6300  ndbrequire(reader.peekWord(&tmp));
6301  Uint32 len = AttributeHeader::getDataSize(tmp);
6302  if (unlikely(len==0))
6303  {
6304  jam();
6305  hasNull = true; // NULL-value in key
6306  }
6307  return appendTreeToSection(dst, reader, 1 + len);
6308 }
6309 
6314 Uint32
6315 Dbspj::appendPkColToSection(Uint32 & dst, const RowPtr::Section & row, Uint32 col)
6316 {
6320  SegmentedSectionPtr ptr(row.m_dataPtr);
6321  SectionReader reader(ptr, getSectionSegmentPool());
6322  Uint32 offset = row.m_header->m_offset[col];
6323  ndbrequire(reader.step(offset));
6324  Uint32 tmp;
6325  ndbrequire(reader.getWord(&tmp));
6326  Uint32 len = AttributeHeader::getDataSize(tmp);
6327  ndbrequire(len>1); // NULL-value in PkKey is an error
6328  ndbrequire(reader.step(1)); // Skip fragid
6329  return appendTreeToSection(dst, reader, len-1);
6330 }
6331 
6336 Uint32
6337 Dbspj::appendPkColToSection(Uint32 & dst, const RowPtr::Linear & row, Uint32 col)
6338 {
6339  Uint32 offset = row.m_header->m_offset[col];
6340  Uint32 tmp = row.m_data[offset];
6341  Uint32 len = AttributeHeader::getDataSize(tmp);
6342  ndbrequire(len>1); // NULL-value in PkKey is an error
6343  return appendToSection(dst, row.m_data+offset+2, len - 1) ? 0 : 1;
6344 }
6345 
6346 Uint32
6347 Dbspj::appendFromParent(Uint32 & dst, Local_pattern_store& pattern,
6348  Local_pattern_store::ConstDataBufferIterator& it,
6349  Uint32 levels, const RowPtr & rowptr,
6350  bool& hasNull)
6351 {
6352  Ptr<TreeNode> treeNodePtr;
6353  m_treenode_pool.getPtr(treeNodePtr, rowptr.m_src_node_ptrI);
6354  Uint32 corrVal = rowptr.m_src_correlation;
6355  RowPtr targetRow;
6356  while (levels--)
6357  {
6358  jam();
6359  if (unlikely(treeNodePtr.p->m_parentPtrI == RNIL))
6360  {
6361  DEBUG_CRASH();
6362  return DbspjErr::InvalidPattern;
6363  }
6364  m_treenode_pool.getPtr(treeNodePtr, treeNodePtr.p->m_parentPtrI);
6365  if (unlikely((treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP) == 0))
6366  {
6367  DEBUG_CRASH();
6368  return DbspjErr::InvalidPattern;
6369  }
6370 
6371  RowRef ref;
6372  treeNodePtr.p->m_row_map.copyto(ref);
6373  Uint32 allocator = ref.m_allocator;
6374  const Uint32 * mapptr;
6375  if (allocator == 0)
6376  {
6377  jam();
6378  mapptr = get_row_ptr_stack(ref);
6379  }
6380  else
6381  {
6382  jam();
6383  mapptr = get_row_ptr_var(ref);
6384  }
6385 
6386  Uint32 pos = corrVal >> 16; // parent corr-val
6387  if (unlikely(! (pos < treeNodePtr.p->m_row_map.m_size)))
6388  {
6389  DEBUG_CRASH();
6390  return DbspjErr::InvalidPattern;
6391  }
6392 
6393  // load ref to parent row
6394  treeNodePtr.p->m_row_map.load(mapptr, pos, ref);
6395 
6396  const Uint32 * rowptr;
6397  if (allocator == 0)
6398  {
6399  jam();
6400  rowptr = get_row_ptr_stack(ref);
6401  }
6402  else
6403  {
6404  jam();
6405  rowptr = get_row_ptr_var(ref);
6406  }
6407  setupRowPtr(treeNodePtr, targetRow, ref, rowptr);
6408 
6409  if (levels)
6410  {
6411  jam();
6412  getCorrelationData(targetRow.m_row_data.m_linear,
6413  targetRow.m_row_data.m_linear.m_header->m_len - 1,
6414  corrVal);
6415  }
6416  }
6417 
6418  if (unlikely(it.isNull()))
6419  {
6420  DEBUG_CRASH();
6421  return DbspjErr::InvalidPattern;
6422  }
6423 
6424  Uint32 info = *it.data;
6425  Uint32 type = QueryPattern::getType(info);
6426  Uint32 val = QueryPattern::getLength(info);
6427  pattern.next(it);
6428  switch(type){
6429  case QueryPattern::P_COL:
6430  jam();
6431  return appendColToSection(dst, targetRow.m_row_data.m_linear, val, hasNull);
6432  break;
6433  case QueryPattern::P_UNQ_PK:
6434  jam();
6435  return appendPkColToSection(dst, targetRow.m_row_data.m_linear, val);
6436  break;
6437  case QueryPattern::P_ATTRINFO:
6438  jam();
6439  return appendAttrinfoToSection(dst, targetRow.m_row_data.m_linear, val, hasNull);
6440  break;
6441  case QueryPattern::P_DATA:
6442  jam();
6443  // retreiving DATA from parent...is...an error
6444  break;
6445  case QueryPattern::P_PARENT:
6446  jam();
6447  // no point in nesting P_PARENT...an error
6448  break;
6449  case QueryPattern::P_PARAM:
6450  case QueryPattern::P_PARAM_HEADER:
6451  jam();
6452  // should have been expanded during build
6453  break;
6454  }
6455 
6456  DEBUG_CRASH();
6457  return DbspjErr::InvalidPattern;
6458 }
6459 
6460 Uint32
6461 Dbspj::appendDataToSection(Uint32 & ptrI,
6462  Local_pattern_store& pattern,
6463  Local_pattern_store::ConstDataBufferIterator& it,
6464  Uint32 len, bool& hasNull)
6465 {
6466  if (unlikely(len==0))
6467  {
6468  jam();
6469  hasNull = true;
6470  return 0;
6471  }
6472 
6473 #if 0
6474 
6477  Uint32 tmp[NDB_SECTION_SEGMENT_SZ];
6478  while (len > NDB_SECTION_SEGMENT_SZ)
6479  {
6480  pattern.copyout(tmp, NDB_SECTION_SEGMENT_SZ, it);
6481  appendToSection(ptrI, tmp, NDB_SECTION_SEGMENT_SZ);
6482  len -= NDB_SECTION_SEGMENT_SZ;
6483  }
6484 
6485  pattern.copyout(tmp, len, it);
6486  appendToSection(ptrI, tmp, len);
6487  return 0;
6488 #else
6489  Uint32 remaining = len;
6490  Uint32 dstIdx = 0;
6491  Uint32 tmp[NDB_SECTION_SEGMENT_SZ];
6492 
6493  while (remaining > 0 && !it.isNull())
6494  {
6495  tmp[dstIdx] = *it.data;
6496  remaining--;
6497  dstIdx++;
6498  pattern.next(it);
6499  if (dstIdx == NDB_SECTION_SEGMENT_SZ || remaining == 0)
6500  {
6501  if (!appendToSection(ptrI, tmp, dstIdx))
6502  {
6503  DEBUG_CRASH();
6504  return DbspjErr::InvalidPattern;
6505  }
6506  dstIdx = 0;
6507  }
6508  }
6509  if (remaining > 0)
6510  {
6511  DEBUG_CRASH();
6512  return DbspjErr::InvalidPattern;
6513  }
6514  else
6515  {
6516  return 0;
6517  }
6518 #endif
6519 }
6520 
6521 Uint32
6522 Dbspj::createEmptySection(Uint32 & dst)
6523 {
6524  Uint32 tmp;
6525  SegmentedSectionPtr ptr;
6526  if (likely(import(ptr, &tmp, 0)))
6527  {
6528  jam();
6529  dst = ptr.i;
6530  return 0;
6531  }
6532 
6533  jam();
6534  return DbspjErr::OutOfSectionMemory;
6535 }
6536 
6540 Uint32
6541 Dbspj::expandS(Uint32 & _dst, Local_pattern_store& pattern,
6542  const RowPtr & row, bool& hasNull)
6543 {
6544  Uint32 err;
6545  Uint32 dst = _dst;
6546  hasNull = false;
6547  Local_pattern_store::ConstDataBufferIterator it;
6548  pattern.first(it);
6549  while (!it.isNull())
6550  {
6551  Uint32 info = *it.data;
6552  Uint32 type = QueryPattern::getType(info);
6553  Uint32 val = QueryPattern::getLength(info);
6554  pattern.next(it);
6555  switch(type){
6556  case QueryPattern::P_COL:
6557  jam();
6558  err = appendColToSection(dst, row.m_row_data.m_section, val, hasNull);
6559  break;
6560  case QueryPattern::P_UNQ_PK:
6561  jam();
6562  err = appendPkColToSection(dst, row.m_row_data.m_section, val);
6563  break;
6564  case QueryPattern::P_ATTRINFO:
6565  jam();
6566  err = appendAttrinfoToSection(dst, row.m_row_data.m_section, val, hasNull);
6567  break;
6568  case QueryPattern::P_DATA:
6569  jam();
6570  err = appendDataToSection(dst, pattern, it, val, hasNull);
6571  break;
6572  case QueryPattern::P_PARENT:
6573  jam();
6574  // P_PARENT is a prefix to another pattern token
6575  // that permits code to access rows from earlier than immediate parent.
6576  // val is no of levels to move up the tree
6577  err = appendFromParent(dst, pattern, it, val, row, hasNull);
6578  break;
6579  // PARAM's was converted to DATA by ::expand(pattern...)
6580  case QueryPattern::P_PARAM:
6581  case QueryPattern::P_PARAM_HEADER:
6582  default:
6583  jam();
6584  err = DbspjErr::InvalidPattern;
6585  DEBUG_CRASH();
6586  }
6587  if (unlikely(err != 0))
6588  {
6589  jam();
6590  DEBUG_CRASH();
6591  goto error;
6592  }
6593  }
6594 
6595  _dst = dst;
6596  return 0;
6597 error:
6598  jam();
6599  return err;
6600 }
6601 
6605 Uint32
6606 Dbspj::expandL(Uint32 & _dst, Local_pattern_store& pattern,
6607  const RowPtr & row, bool& hasNull)
6608 {
6609  Uint32 err;
6610  Uint32 dst = _dst;
6611  hasNull = false;
6612  Local_pattern_store::ConstDataBufferIterator it;
6613  pattern.first(it);
6614  while (!it.isNull())
6615  {
6616  Uint32 info = *it.data;
6617  Uint32 type = QueryPattern::getType(info);
6618  Uint32 val = QueryPattern::getLength(info);
6619  pattern.next(it);
6620  switch(type){
6621  case QueryPattern::P_COL:
6622  jam();
6623  err = appendColToSection(dst, row.m_row_data.m_linear, val, hasNull);
6624  break;
6625  case QueryPattern::P_UNQ_PK:
6626  jam();
6627  err = appendPkColToSection(dst, row.m_row_data.m_linear, val);
6628  break;
6629  case QueryPattern::P_ATTRINFO:
6630  jam();
6631  err = appendAttrinfoToSection(dst, row.m_row_data.m_linear, val, hasNull);
6632  break;
6633  case QueryPattern::P_DATA:
6634  jam();
6635  err = appendDataToSection(dst, pattern, it, val, hasNull);
6636  break;
6637  case QueryPattern::P_PARENT:
6638  jam();
6639  // P_PARENT is a prefix to another pattern token
6640  // that permits code to access rows from earlier than immediate parent
6641  // val is no of levels to move up the tree
6642  err = appendFromParent(dst, pattern, it, val, row, hasNull);
6643  break;
6644  // PARAM's was converted to DATA by ::expand(pattern...)
6645  case QueryPattern::P_PARAM:
6646  case QueryPattern::P_PARAM_HEADER:
6647  default:
6648  jam();
6649  err = DbspjErr::InvalidPattern;
6650  DEBUG_CRASH();
6651  }
6652  if (unlikely(err != 0))
6653  {
6654  jam();
6655  DEBUG_CRASH();
6656  goto error;
6657  }
6658  }
6659 
6660  _dst = dst;
6661  return 0;
6662 error:
6663  jam();
6664  return err;
6665 }
6666 
6667 Uint32
6668 Dbspj::expand(Uint32 & ptrI, DABuffer& pattern, Uint32 len,
6669  DABuffer& param, Uint32 paramCnt, bool& hasNull)
6670 {
6674  Uint32 err;
6675  Uint32 tmp[1+MAX_ATTRIBUTES_IN_TABLE];
6676  struct RowPtr::Linear row;
6677  row.m_data = param.ptr;
6678  row.m_header = CAST_PTR(RowPtr::Header, &tmp[0]);
6679  buildRowHeader(CAST_PTR(RowPtr::Header, &tmp[0]), param.ptr, paramCnt);
6680 
6681  Uint32 dst = ptrI;
6682  const Uint32 * ptr = pattern.ptr;
6683  const Uint32 * end = ptr + len;
6684  hasNull = false;
6685 
6686  for (; ptr < end; )
6687  {
6688  Uint32 info = * ptr++;
6689  Uint32 type = QueryPattern::getType(info);
6690  Uint32 val = QueryPattern::getLength(info);
6691  switch(type){
6692  case QueryPattern::P_PARAM:
6693  jam();
6694  ndbassert(val < paramCnt);
6695  err = appendColToSection(dst, row, val, hasNull);
6696  break;
6697  case QueryPattern::P_PARAM_HEADER:
6698  jam();
6699  ndbassert(val < paramCnt);
6700  err = appendAttrinfoToSection(dst, row, val, hasNull);
6701  break;
6702  case QueryPattern::P_DATA:
6703  if (unlikely(val==0))
6704  {
6705  jam();
6706  hasNull = true;
6707  err = 0;
6708  }
6709  else if (likely(appendToSection(dst, ptr, val)))
6710  {
6711  jam();
6712  err = 0;
6713  }
6714  else
6715  {
6716  jam();
6717  err = DbspjErr::InvalidPattern;
6718  }
6719  ptr += val;
6720  break;
6721  case QueryPattern::P_COL: // (linked) COL's not expected here
6722  case QueryPattern::P_PARENT: // Prefix to P_COL
6723  case QueryPattern::P_ATTRINFO:
6724  case QueryPattern::P_UNQ_PK:
6725  default:
6726  jam();
6727  jamLine(type);
6728  err = DbspjErr::InvalidPattern;
6729  DEBUG_CRASH();
6730  }
6731  if (unlikely(err != 0))
6732  {
6733  jam();
6734  DEBUG_CRASH();
6735  goto error;
6736  }
6737  }
6738 
6742  pattern.ptr = end;
6743 
6744 error:
6745  jam();
6746  ptrI = dst;
6747  return err;
6748 }
6749 
6750 Uint32
6751 Dbspj::expand(Local_pattern_store& dst, Ptr<TreeNode> treeNodePtr,
6752  DABuffer& pattern, Uint32 len,
6753  DABuffer& param, Uint32 paramCnt)
6754 {
6758  Uint32 err;
6759  Uint32 tmp[1+MAX_ATTRIBUTES_IN_TABLE];
6760  struct RowPtr::Linear row;
6761  row.m_header = CAST_PTR(RowPtr::Header, &tmp[0]);
6762  row.m_data = param.ptr;
6763  buildRowHeader(CAST_PTR(RowPtr::Header, &tmp[0]), param.ptr, paramCnt);
6764 
6765  const Uint32 * end = pattern.ptr + len;
6766  for (; pattern.ptr < end; )
6767  {
6768  Uint32 info = *pattern.ptr;
6769  Uint32 type = QueryPattern::getType(info);
6770  Uint32 val = QueryPattern::getLength(info);
6771  switch(type){
6772  case QueryPattern::P_COL:
6773  case QueryPattern::P_UNQ_PK:
6774  case QueryPattern::P_ATTRINFO:
6775  jam();
6776  err = appendToPattern(dst, pattern, 1);
6777  break;
6778  case QueryPattern::P_DATA:
6779  jam();
6780  err = appendToPattern(dst, pattern, val+1);
6781  break;
6782  case QueryPattern::P_PARAM:
6783  jam();
6784  // NOTE: Converted to P_DATA by appendParamToPattern
6785  ndbassert(val < paramCnt);
6786  err = appendParamToPattern(dst, row, val);
6787  pattern.ptr++;
6788  break;
6789  case QueryPattern::P_PARAM_HEADER:
6790  jam();
6791  // NOTE: Converted to P_DATA by appendParamHeadToPattern
6792  ndbassert(val < paramCnt);
6793  err = appendParamHeadToPattern(dst, row, val);
6794  pattern.ptr++;
6795  break;
6796  case QueryPattern::P_PARENT: // Prefix to P_COL
6797  {
6798  jam();
6799  err = appendToPattern(dst, pattern, 1);
6800 
6801  // Locate requested grandparent and request it to
6802  // T_ROW_BUFFER its result rows
6803  Ptr<TreeNode> parentPtr;
6804  m_treenode_pool.getPtr(parentPtr, treeNodePtr.p->m_parentPtrI);
6805  while (val--)
6806  {
6807  jam();
6808  ndbassert(parentPtr.p->m_parentPtrI != RNIL);
6809  m_treenode_pool.getPtr(parentPtr, parentPtr.p->m_parentPtrI);
6810  parentPtr.p->m_bits |= TreeNode::T_ROW_BUFFER;
6811  parentPtr.p->m_bits |= TreeNode::T_ROW_BUFFER_MAP;
6812  }
6813  Ptr<Request> requestPtr;
6814  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
6815  requestPtr.p->m_bits |= Request::RT_ROW_BUFFERS;
6816  break;
6817  }
6818  default:
6819  jam();
6820  err = DbspjErr::InvalidPattern;
6821  DEBUG_CRASH();
6822  }
6823 
6824  if (unlikely(err != 0))
6825  {
6826  DEBUG_CRASH();
6827  goto error;
6828  }
6829  }
6830  return 0;
6831 
6832 error:
6833  jam();
6834  return err;
6835 }
6836 
/**
 * Parse the part of a tree node specification selected by treeBits and
 * paramBits, reading from the 'tree' and 'param' buffers:
 *  - NI_REPEAT_SCAN_RESULT: flags the request.
 *  - NI_HAS_PARENT: reads the parent list and links treeNode to it.
 *  - NI_KEY_PARAMS / NI_KEY_LINKED / NI_KEY_CONSTS: builds either a key
 *    pattern (linked) or a complete key-info section (fixed).
 *  - attribute related bits: builds the attr-info section, stored in
 *    treeNodePtr.p->m_send.m_attrInfoPtrI.
 *
 * The body is wrapped in do { } while (0) so that 'break' leaves the
 * parsing and returns the current value of 'err'. For that reason 'err'
 * is set to the relevant error code ahead of each check.
 * Note that DEBUG_CRASH() is defined as ndbrequire(false) at the top of
 * this file.
 *
 * @return 0 on success, else a DbspjErr code.
 */
6837 Uint32
6838 Dbspj::parseDA(Build_context& ctx,
6839  Ptr<Request> requestPtr,
6840  Ptr<TreeNode> treeNodePtr,
6841  DABuffer& tree, Uint32 treeBits,
6842  DABuffer& param, Uint32 paramBits)
6843 {
6844  Uint32 err;
  // Sections built while parsing the attribute specification; RNIL until
  // the first append to them
6845  Uint32 attrInfoPtrI = RNIL;
6846  Uint32 attrParamPtrI = RNIL;
6847 
6848  do
6849  {
6850  if (treeBits & DABits::NI_REPEAT_SCAN_RESULT)
6851  {
6852  jam();
6853  DEBUG("use REPEAT_SCAN_RESULT when returning results");
6854  requestPtr.p->m_bits |= Request::RT_REPEAT_SCAN_RESULT;
6855  } // DABits::NI_REPEAT_SCAN_RESULT
6856 
6857  if (treeBits & DABits::NI_HAS_PARENT)
6858  {
6859  jam();
6860  DEBUG("NI_HAS_PARENT");
  // Read the list of parent node numbers from the tree buffer. A count
  // above the capacity of dst[] is treated as a malformed specification.
6869  err = DbspjErr::InvalidTreeNodeSpecification;
6870  Uint32 dst[63];
6871  Uint32 cnt = unpackList(NDB_ARRAY_SIZE(dst), dst, tree);
6872  if (unlikely(cnt > NDB_ARRAY_SIZE(dst)))
6873  {
6874  DEBUG_CRASH();
6875  break;
6876  }
6877 
6878  err = 0;
6879 
  // NOTE(review): anything but exactly one parent crashes here;
  // presumably multiple parents are not supported - confirm.
6880  if (unlikely(cnt!=1))
6881  {
6885  DEBUG_CRASH();
6886  }
6887 
6888  for (Uint32 i = 0; i<cnt; i++)
6889  {
6890  DEBUG("adding " << dst[i] << " as parent");
6891  Ptr<TreeNode> parentPtr = ctx.m_node_list[dst[i]];
6892  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
6893  Local_dependency_map map(pool, parentPtr.p->m_dependent_nodes);
  // Register this node in the parent's list of dependent nodes
6894  if (unlikely(!map.append(&treeNodePtr.i, 1)))
6895  {
6896  err = DbspjErr::OutOfQueryMemory;
6897  DEBUG_CRASH();
6898  break;
6899  }
  // A node that has a child is no longer a leaf
6900  parentPtr.p->m_bits &= ~(Uint32)TreeNode::T_LEAF;
6901  treeNodePtr.p->m_parentPtrI = parentPtr.i;
6902 
6903  // Build Bitmask of all ancestors to treeNode
6904  treeNodePtr.p->m_ancestors = parentPtr.p->m_ancestors;
6905  treeNodePtr.p->m_ancestors.set(parentPtr.p->m_node_no);
6906  }
6907 
  // The 'break' inside the for loop only leaves that loop; leave the
  // enclosing do/while as well if it reported an error
6908  if (unlikely(err != 0))
6909  break;
6910  } // DABits::NI_HAS_PARENT
6911 
  // NI_KEY_PARAMS in the tree and PI_KEY_PARAMS in the parameters must
  // either both be set or both be clear
6912  err = DbspjErr::InvalidTreeParametersSpecificationKeyParamBitsMissmatch;
6913  if (unlikely( ((treeBits & DABits::NI_KEY_PARAMS)==0) !=
6914  ((paramBits & DABits::PI_KEY_PARAMS)==0)))
6915  {
6916  DEBUG_CRASH();
6917  break;
6918  }
6919 
6920  if (treeBits & (DABits::NI_KEY_PARAMS
6921  | DABits::NI_KEY_LINKED
6922  | DABits::NI_KEY_CONSTS))
6923  {
6924  jam();
6925  DEBUG("NI_KEY_PARAMS | NI_KEY_LINKED | NI_KEY_CONSTS");
6926 
  // The next tree word packs the pattern length (low 16 bits) and the
  // number of parameters (high 16 bits)
6933  Uint32 len_cnt = * tree.ptr ++;
6934  Uint32 len = len_cnt & 0xFFFF; // length of pattern in words
6935  Uint32 cnt = len_cnt >> 16; // no of parameters
6936 
6937  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
6938  Local_pattern_store pattern(pool, treeNodePtr.p->m_keyPattern);
6939 
  // A non-zero parameter count is required exactly when the key-params
  // bits are set
6940  err = DbspjErr::InvalidTreeParametersSpecificationIncorrectKeyParamCount;
6941  if (unlikely( ((cnt==0) != ((treeBits & DABits::NI_KEY_PARAMS) == 0)) ||
6942  ((cnt==0) != ((paramBits & DABits::PI_KEY_PARAMS) == 0))))
6943  {
6944  DEBUG_CRASH();
6945  break;
6946  }
6947 
6948  if (treeBits & DABits::NI_KEY_LINKED)
6949  {
6950  jam();
6951  DEBUG("LINKED-KEY PATTERN w/ " << cnt << " PARAM values");
  // Linked key: store the pattern (with parameters substituted) in
  // m_keyPattern
6955  err = expand(pattern, treeNodePtr, tree, len, param, cnt);
6956 
6960  treeNodePtr.p->m_bits |= TreeNode::T_KEYINFO_CONSTRUCTED;
6961  }
6962  else
6963  {
6964  jam();
6965  DEBUG("FIXED-KEY w/ " << cnt << " PARAM values");
  // Fixed key: expand straight into a key-info section
6970  bool hasNull;
6971  Uint32 keyInfoPtrI = RNIL;
6972  err = expand(keyInfoPtrI, tree, len, param, cnt, hasNull);
6973  if (unlikely(hasNull))
6974  {
6975  /* API should have eliminated requests w/ const-NULL keys */
6976  jam();
6977  DEBUG("BEWARE: FIXED-key contain NULL values");
6978 // treeNodePtr.p->m_bits |= TreeNode::T_NULL_PRUNE;
6979 // break;
6980  ndbrequire(false);
6981  }
6982  treeNodePtr.p->m_send.m_keyInfoPtrI = keyInfoPtrI;
6983  }
6984 
6985  if (unlikely(err != 0))
6986  {
6987  DEBUG_CRASH();
6988  break;
6989  }
6990  } // DABits::NI_KEY_...
6991 
6992  const Uint32 mask =
6993  DABits::NI_LINKED_ATTR | DABits::NI_ATTR_INTERPRET |
6994  DABits::NI_ATTR_LINKED | DABits::NI_ATTR_PARAMS;
6995 
6996  if (((treeBits & mask) | (paramBits & DABits::PI_ATTR_LIST)) != 0)
6997  {
6998  jam();
  // For an interpreted request the attr-info section starts with these
  // five words. Of them this function fills in [1] (program size),
  // [3] (size of the read part) and [4] (size of the parameter or
  // subroutine part) through 'sectionptrs'.
7026  Uint32 sections[5] = { 0, 0, 0, 0, 0 };
7027  Uint32 * sectionptrs = 0;
7028 
7029  bool interpreted =
7030  (treeBits & DABits::NI_ATTR_INTERPRET) ||
7031  (paramBits & DABits::PI_ATTR_INTERPRET) ||
7032  (treeNodePtr.p->m_bits & TreeNode::T_ATTR_INTERPRETED);
7033 
7034  if (interpreted)
7035  {
7040  jam();
7041  err = DbspjErr::OutOfSectionMemory;
7042  if (unlikely(!appendToSection(attrInfoPtrI, sections, 5)))
7043  {
7044  DEBUG_CRASH();
7045  break;
7046  }
7047 
  // Point at the five words just appended so they can be updated in
  // place further down
7048  SegmentedSectionPtr ptr;
7049  getSection(ptr, attrInfoPtrI);
7050  sectionptrs = ptr.p->theData;
7051 
7052  if (treeBits & DABits::NI_ATTR_INTERPRET)
7053  {
7054  jam();
7055 
  // The program comes from the tree; the parameters must not supply
  // one as well
7059  err = DbspjErr::BothTreeAndParametersContainInterpretedProgram;
7060  if (unlikely(paramBits & DABits::PI_ATTR_INTERPRET))
7061  {
7062  DEBUG_CRASH();
7063  break;
7064  }
7065 
7066  treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;
7067  Uint32 len2 = * tree.ptr++;
7068  Uint32 len_prg = len2 & 0xFFFF; // Length of interpret program
7069  Uint32 len_pattern = len2 >> 16;// Length of attr param pattern
7070  err = DbspjErr::OutOfSectionMemory;
7071  if (unlikely(!appendToSection(attrInfoPtrI, tree.ptr, len_prg)))
7072  {
7073  DEBUG_CRASH();
7074  break;
7075  }
7076 
7077  tree.ptr += len_prg;
7078  sectionptrs[1] = len_prg; // size of interpret program
7079 
7080  Uint32 tmp = * tree.ptr ++; // attr-pattern header
7081  Uint32 cnt = tmp & 0xFFFF;
7082 
7083  if (treeBits & DABits::NI_ATTR_LINKED)
7084  {
7085  jam();
  // Linked attr params: keep the pattern in m_attrParamPattern
7089  LocalArenaPoolImpl pool(requestPtr.p->m_arena,
7090  m_dependency_map_pool);
7091  Local_pattern_store pattern(pool,treeNodePtr.p->m_attrParamPattern);
7092  err = expand(pattern, treeNodePtr, tree, len_pattern, param, cnt);
7093  if (unlikely(err))
7094  {
7095  DEBUG_CRASH();
7096  break;
7097  }
7101  treeNodePtr.p->m_bits |= TreeNode::T_ATTRINFO_CONSTRUCTED;
7102  }
7103  else
7104  {
7105  jam();
  // Fixed attr params: expand into a separate section, which is
  // appended to the attr-info section at the end of this function
7110  bool hasNull;
7111  err = expand(attrParamPtrI, tree, len_pattern, param, cnt, hasNull);
7112  if (unlikely(err))
7113  {
7114  DEBUG_CRASH();
7115  break;
7116  }
7117 // ndbrequire(!hasNull);
7118  }
7119  }
7120  else // if (treeBits & DABits::NI_ATTR_INTERPRET)
7121  {
7122  jam();
  // No program in the tree: none of the attr-param bits may be set
7126  ndbrequire((treeBits & DABits::NI_ATTR_PARAMS) == 0);
7127  ndbrequire((paramBits & DABits::PI_ATTR_PARAMS) == 0);
7128  ndbrequire((treeBits & DABits::NI_ATTR_LINKED) == 0);
7129 
7130  treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;
7131 
7132  if (! (paramBits & DABits::PI_ATTR_INTERPRET))
7133  {
7134  jam();
7135 
  // No program from the parameters either: use a one-word program
  // consisting of Interpreter::ExitOK()
7141  Uint32 tmp = Interpreter::ExitOK();
7142  err = DbspjErr::OutOfSectionMemory;
7143  if (unlikely(!appendToSection(attrInfoPtrI, &tmp, 1)))
7144  {
7145  DEBUG_CRASH();
7146  break;
7147  }
7148  sectionptrs[1] = 1;
7149  }
7150  } // if (treeBits & DABits::NI_ATTR_INTERPRET)
7151  } // if (interpreted)
7152 
7153  if (paramBits & DABits::PI_ATTR_INTERPRET)
7154  {
7155  jam();
7156 
  // Program supplied by the parameters: the next word packs the program
  // length (low 16 bits) and the subroutine length (high 16 bits)
7160  const Uint32 len2 = * param.ptr++;
7161  Uint32 program_len = len2 & 0xFFFF;
7162  Uint32 subroutine_len = len2 >> 16;
7163  err = DbspjErr::OutOfSectionMemory;
7164  if (unlikely(!appendToSection(attrInfoPtrI, param.ptr, program_len)))
7165  {
7166  DEBUG_CRASH();
7167  break;
7168  }
7175  sectionptrs[1] = program_len;
7176  param.ptr += program_len;
7177 
7178  if (subroutine_len)
7179  {
  // Subroutines go to the attr-param section, which is appended to the
  // attr-info section at the end of this function
7180  if (unlikely(!appendToSection(attrParamPtrI,
7181  param.ptr, subroutine_len)))
7182  {
7183  DEBUG_CRASH();
7184  break;
7185  }
7186  sectionptrs[4] = subroutine_len;
7187  param.ptr += subroutine_len;
7188  }
7189  treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;
7190  }
7191 
  // Number of words appended to the read part of the attr-info
7192  Uint32 sum_read = 0;
7193  Uint32 dst[MAX_ATTRIBUTES_IN_TABLE + 2];
7194 
7195  if (paramBits & DABits::PI_ATTR_LIST)
7196  {
7197  jam();
7198  Uint32 len = * param.ptr++;
7199  DEBUG("PI_ATTR_LIST");
7200 
7201  treeNodePtr.p->m_bits |= TreeNode::T_USER_PROJECTION;
7202  err = DbspjErr::OutOfSectionMemory;
7203  if (!appendToSection(attrInfoPtrI, param.ptr, len))
7204  {
7205  DEBUG_CRASH();
7206  break;
7207  }
7208 
7209  param.ptr += len;
7210 
  // Follow the attribute list with a four-word FLUSH_AI entry carrying
  // the result reference, result data and sender reference from ctx
7214  Uint32 flush[4];
7215  flush[0] = AttributeHeader::FLUSH_AI << 16;
7216  flush[1] = ctx.m_resultRef;
7217  flush[2] = ctx.m_resultData;
7218  flush[3] = ctx.m_senderRef; // RouteRef
7219  if (!appendToSection(attrInfoPtrI, flush, 4))
7220  {
7221  DEBUG_CRASH();
7222  break;
7223  }
7224 
7225  sum_read += len + 4;
7226  }
7227 
7228  if (treeBits & DABits::NI_LINKED_ATTR)
7229  {
7230  jam();
7231  DEBUG("NI_LINKED_ATTR");
7232  err = DbspjErr::InvalidTreeNodeSpecification;
7233  Uint32 cnt = unpackList(MAX_ATTRIBUTES_IN_TABLE, dst, tree);
7234  if (unlikely(cnt > MAX_ATTRIBUTES_IN_TABLE))
7235  {
7236  DEBUG_CRASH();
7237  break;
7238  }
7239 
  // Move each unpacked value to the upper 16 bits of its word
7243  for (Uint32 i = 0; i<cnt; i++)
7244  dst[i] <<= 16;
7245 
  // Add a CORR_FACTOR32 entry after the listed attributes
7249  dst[cnt++] = AttributeHeader::CORR_FACTOR32 << 16;
7250 
7251  err = DbspjErr::OutOfSectionMemory;
7252  if (!appendToSection(attrInfoPtrI, dst, cnt))
7253  {
7254  DEBUG_CRASH();
7255  break;
7256  }
7257 
7258  sum_read += cnt;
7259  }
7260 
7261  if (interpreted)
7262  {
7263  jam();
7268  sectionptrs[3] = sum_read;
7269 
7270  if (attrParamPtrI != RNIL)
7271  {
7272  jam();
7273  ndbrequire(!(treeNodePtr.p->m_bits&TreeNode::T_ATTRINFO_CONSTRUCTED));
7274 
  // Move the separately built parameter/subroutine section to the end
  // of the attr-info section, then release it
7275  SegmentedSectionPtr ptr;
7276  getSection(ptr, attrParamPtrI);
7277  {
7278  SectionReader r0(ptr, getSectionSegmentPool());
7279  err = appendTreeToSection(attrInfoPtrI, r0, ptr.sz);
7280  sectionptrs[4] = ptr.sz;
7281  if (unlikely(err != 0))
7282  {
7283  DEBUG_CRASH();
7284  break;
7285  }
7286  }
7287  releaseSection(attrParamPtrI);
7288  }
7289  }
7290 
7291  treeNodePtr.p->m_send.m_attrInfoPtrI = attrInfoPtrI;
7292  } // if (((treeBits & mask) | (paramBits & DABits::PI_ATTR_LIST)) != 0)
7293 
7294  return 0;
7295  } while (0);
7296 
  // Reached through 'break' only: 'err' holds the code set before the
  // failing check
7297  return err;
7298 }
7299 
7308 void Dbspj::execDBINFO_SCANREQ(Signal *signal)
7309 {
7310  DbinfoScanReq req= * CAST_PTR(DbinfoScanReq, &signal->theData[0]);
7311  const Ndbinfo::ScanCursor* cursor =
7312  CAST_CONSTPTR(Ndbinfo::ScanCursor, DbinfoScan::getCursorPtr(&req));
7313  Ndbinfo::Ratelimit rl;
7314 
7315  jamEntry();
7316 
7317  switch(req.tableId){
7318 
7319  // The SPJ block only implements the ndbinfo.counters table.
7320  case Ndbinfo::COUNTERS_TABLEID:
7321  {
7322  Ndbinfo::counter_entry counters[] = {
7323  { Ndbinfo::SPJ_READS_RECEIVED_COUNTER,
7324  c_Counters.get_counter(CI_READS_RECEIVED) },
7325  { Ndbinfo::SPJ_LOCAL_READS_SENT_COUNTER,
7326  c_Counters.get_counter(CI_LOCAL_READS_SENT) },
7327  { Ndbinfo::SPJ_REMOTE_READS_SENT_COUNTER,
7328  c_Counters.get_counter(CI_REMOTE_READS_SENT) },
7329  { Ndbinfo::SPJ_READS_NOT_FOUND_COUNTER,
7330  c_Counters.get_counter(CI_READS_NOT_FOUND) },
7331  { Ndbinfo::SPJ_TABLE_SCANS_RECEIVED_COUNTER,
7332  c_Counters.get_counter(CI_TABLE_SCANS_RECEIVED) },
7333  { Ndbinfo::SPJ_LOCAL_TABLE_SCANS_SENT_COUNTER,
7334  c_Counters.get_counter(CI_LOCAL_TABLE_SCANS_SENT) },
7335  { Ndbinfo::SPJ_RANGE_SCANS_RECEIVED_COUNTER,
7336  c_Counters.get_counter(CI_RANGE_SCANS_RECEIVED) },
7337  { Ndbinfo::SPJ_LOCAL_RANGE_SCANS_SENT_COUNTER,
7338  c_Counters.get_counter(CI_LOCAL_RANGE_SCANS_SENT) },
7339  { Ndbinfo::SPJ_REMOTE_RANGE_SCANS_SENT_COUNTER,
7340  c_Counters.get_counter(CI_REMOTE_RANGE_SCANS_SENT) },
7341  { Ndbinfo::SPJ_SCAN_BATCHES_RETURNED_COUNTER,
7342  c_Counters.get_counter(CI_SCAN_BATCHES_RETURNED) },
7343  { Ndbinfo::SPJ_SCAN_ROWS_RETURNED_COUNTER,
7344  c_Counters.get_counter(CI_SCAN_ROWS_RETURNED) },
7345  { Ndbinfo::SPJ_PRUNED_RANGE_SCANS_RECEIVED_COUNTER,
7346  c_Counters.get_counter(CI_PRUNED_RANGE_SCANS_RECEIVED) },
7347  { Ndbinfo::SPJ_CONST_PRUNED_RANGE_SCANS_RECEIVED_COUNTER,
7348  c_Counters.get_counter(CI_CONST_PRUNED_RANGE_SCANS_RECEIVED) }
7349  };
7350  const size_t num_counters = sizeof(counters) / sizeof(counters[0]);
7351 
7352  Uint32 i = cursor->data[0];
7353  const BlockNumber bn = blockToMain(number());
7354  while(i < num_counters)
7355  {
7356  jam();
7357  Ndbinfo::Row row(signal, req);
7358  row.write_uint32(getOwnNodeId());
7359  row.write_uint32(bn); // block number
7360  row.write_uint32(instance()); // block instance
7361  row.write_uint32(counters[i].id);
7362 
7363  row.write_uint64(counters[i].val);
7364  ndbinfo_send_row(signal, req, row, rl);
7365  i++;
7366  if (rl.need_break(req))
7367  {
7368  jam();
7369  ndbinfo_send_scan_break(signal, req, rl, i);
7370  return;
7371  }
7372  }
7373  break;
7374  }
7375 
7376  default:
7377  break;
7378  }
7379 
7380  ndbinfo_send_scan_conf(signal, req, rl);
7381 } // Dbspj::execDBINFO_SCANREQ(Signal *signal)
7382 
7383 void Dbspj::IncrementalStatistics::update(double sample)
7384 {
7385  // Prevent wrap-around
7386  if(m_noOfSamples < 0xffffffff)
7387  {
7388  m_noOfSamples++;
7389  const double delta = sample - m_mean;
7390  m_mean += delta/m_noOfSamples;
7391  m_sumSquare += delta * (sample - m_mean);
7392  }
7393 }