MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
SimulatedBlock.cpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 /*
19  This file is used to build both the multithreaded and the singlethreaded
20  ndbd. It is built twice, included from either SimulatedBlock_mt.cpp (with
21  the macro NDBD_MULTITHREADED defined) or SimulatedBlock_nonmt.cpp (with the
22  macro not defined).
23 */
24 
25 #include <ndb_global.h>
26 
27 #include "SimulatedBlock.hpp"
28 #include <NdbOut.hpp>
29 #include <OutputStream.hpp>
30 #include <GlobalData.hpp>
31 #include <Emulator.hpp>
32 #include <WatchDog.hpp>
33 #include <ErrorHandlingMacros.hpp>
34 #include <TimeQueue.hpp>
35 #include <TransporterRegistry.hpp>
36 #include <SignalLoggerManager.hpp>
37 #include <FastScheduler.hpp>
38 #include "ndbd_malloc.hpp"
39 #include <signaldata/EventReport.hpp>
40 #include <signaldata/ContinueFragmented.hpp>
41 #include <signaldata/NodeStateSignalData.hpp>
42 #include <signaldata/FsRef.hpp>
43 #include <signaldata/SignalDroppedRep.hpp>
44 #include <signaldata/LocalRouteOrd.hpp>
45 #include <signaldata/TransIdAI.hpp>
46 #include <signaldata/Sync.hpp>
47 #include <DebuggerNames.hpp>
48 #include "LongSignal.hpp"
49 
50 #include <Properties.hpp>
51 #include "Configuration.hpp"
52 #include <AttributeDescriptor.hpp>
53 #include <NdbSqlUtil.hpp>
54 
55 #include "../blocks/dbdih/Dbdih.hpp"
56 #include <signaldata/CallbackSignal.hpp>
57 #include "LongSignalImpl.hpp"
58 
59 #include <EventLogger.hpp>
60 extern EventLogger * g_eventLogger;
61 
62 #define ljamEntry() jamEntryLine(30000 + __LINE__)
63 #define ljam() jamLine(30000 + __LINE__)
64 
65 //
66 // Constructor, Destructor
67 //
68 SimulatedBlock::SimulatedBlock(BlockNumber blockNumber,
69  struct Block_context & ctx,
70  Uint32 instanceNumber)
71  : theNodeId(globalData.ownId),
72  theNumber(blockNumber),
73  theInstance(instanceNumber),
74  theReference(numberToRef(blockNumber, instanceNumber, globalData.ownId)),
75  theInstanceList(0),
76  theMainInstance(0),
77  m_ctx(ctx),
78  m_global_page_pool(globalData.m_global_page_pool),
79  m_shared_page_pool(globalData.m_shared_page_pool),
80  c_fragmentInfoHash(c_fragmentInfoPool),
81  c_linearFragmentSendList(c_fragmentSendPool),
82  c_segmentedFragmentSendList(c_fragmentSendPool),
83  c_mutexMgr(* this),
84  c_counterMgr(* this)
85 #ifdef VM_TRACE
86  ,debugOut(*new NdbOut(*new FileOutputStream(globalSignalLoggers.getOutputStream())))
87 #endif
88 #ifdef VM_TRACE_TIME
89  ,m_currentGsn(0)
90 #endif
91 {
92  m_threadId = 0;
93  m_watchDogCounter = NULL;
94  m_jamBuffer = (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM);
95  NewVarRef = 0;
96 
97  SimulatedBlock* mainBlock = globalData.getBlock(blockNumber);
98 
99  if (theInstance == 0) {
100  ndbrequire(mainBlock == 0);
101  mainBlock = this;
102  globalData.setBlock(blockNumber, mainBlock);
103  } else {
104  ndbrequire(mainBlock != 0);
105  mainBlock->addInstance(this, theInstance);
106  }
107  theMainInstance = mainBlock;
108 
109  c_fragmentIdCounter = 1;
110  c_fragSenderRunning = false;
111 
112 #ifdef VM_TRACE_TIME
113  clearTimes();
114 #endif
115 
116  for(GlobalSignalNumber i = 0; i<=MAX_GSN; i++)
117  theExecArray[i] = 0;
118 
119  installSimulatedBlockFunctions();
120 
121  m_callbackTableAddr = 0;
122 
123  CLEAR_ERROR_INSERT_VALUE;
124 
125 #ifdef VM_TRACE
126  m_global_variables = new Ptr<void> * [1];
127  m_global_variables[0] = 0;
128  m_global_variables_save = 0;
129 #endif
130 }
131 
132 void
133 SimulatedBlock::addInstance(SimulatedBlock* b, Uint32 theInstance)
134 {
135  ndbrequire(theMainInstance == this);
136  ndbrequire(number() == b->number());
137  if (theInstanceList == 0)
138  {
139  theInstanceList = new SimulatedBlock* [MaxInstances];
140  ndbrequire(theInstanceList != 0);
141  for (Uint32 i = 0; i < MaxInstances; i++)
142  theInstanceList[i] = 0;
143  }
144  ndbrequire(theInstance < MaxInstances);
145  ndbrequire(theInstanceList[theInstance] == 0);
146  theInstanceList[theInstance] = b;
147 }
148 
149 void
150 SimulatedBlock::initCommon()
151 {
152  Uint32 count = 10;
153  this->getParam("FragmentSendPool", &count);
154  c_fragmentSendPool.setSize(count);
155 
156  count = 10;
157  this->getParam("FragmentInfoPool", &count);
158  c_fragmentInfoPool.setSize(count);
159 
160  count = 10;
161  this->getParam("FragmentInfoHash", &count);
162  c_fragmentInfoHash.setSize(count);
163 
164  count = 5;
165  this->getParam("ActiveMutexes", &count);
166  c_mutexMgr.setSize(count);
167 
168  count = 5;
169  this->getParam("ActiveCounters", &count);
170  c_counterMgr.setSize(count);
171 
172  count = 5;
173  this->getParam("ActiveThreadSync", &count);
174  c_syncThreadPool.setSize(count);
175 }
176 
177 SimulatedBlock::~SimulatedBlock()
178 {
179  freeBat();
180 #ifdef VM_TRACE_TIME
181  printTimes(stdout);
182 #endif
183 
184 #ifdef VM_TRACE
185  delete [] m_global_variables;
186 #endif
187 
188  if (theInstanceList != 0) {
189  Uint32 i;
190  for (i = 0; i < MaxInstances; i++)
191  delete theInstanceList[i];
192  delete [] theInstanceList;
193  }
194  theInstanceList = 0;
195 }
196 
197 void
198 SimulatedBlock::installSimulatedBlockFunctions(){
199  ExecFunction * a = theExecArray;
200  a[GSN_NODE_STATE_REP] = &SimulatedBlock::execNODE_STATE_REP;
201  a[GSN_CHANGE_NODE_STATE_REQ] = &SimulatedBlock::execCHANGE_NODE_STATE_REQ;
202  a[GSN_NDB_TAMPER] = &SimulatedBlock::execNDB_TAMPER;
203  a[GSN_SIGNAL_DROPPED_REP] = &SimulatedBlock::execSIGNAL_DROPPED_REP;
204  a[GSN_CONTINUE_FRAGMENTED]= &SimulatedBlock::execCONTINUE_FRAGMENTED;
205  a[GSN_STOP_FOR_CRASH]= &SimulatedBlock::execSTOP_FOR_CRASH;
206  a[GSN_UTIL_CREATE_LOCK_REF] = &SimulatedBlock::execUTIL_CREATE_LOCK_REF;
207  a[GSN_UTIL_CREATE_LOCK_CONF] = &SimulatedBlock::execUTIL_CREATE_LOCK_CONF;
208  a[GSN_UTIL_DESTROY_LOCK_REF] = &SimulatedBlock::execUTIL_DESTORY_LOCK_REF;
209  a[GSN_UTIL_DESTROY_LOCK_CONF] = &SimulatedBlock::execUTIL_DESTORY_LOCK_CONF;
210  a[GSN_UTIL_LOCK_REF] = &SimulatedBlock::execUTIL_LOCK_REF;
211  a[GSN_UTIL_LOCK_CONF] = &SimulatedBlock::execUTIL_LOCK_CONF;
212  a[GSN_UTIL_UNLOCK_REF] = &SimulatedBlock::execUTIL_UNLOCK_REF;
213  a[GSN_UTIL_UNLOCK_CONF] = &SimulatedBlock::execUTIL_UNLOCK_CONF;
214  a[GSN_FSOPENREF] = &SimulatedBlock::execFSOPENREF;
215  a[GSN_FSCLOSEREF] = &SimulatedBlock::execFSCLOSEREF;
216  a[GSN_FSWRITEREF] = &SimulatedBlock::execFSWRITEREF;
217  a[GSN_FSREADREF] = &SimulatedBlock::execFSREADREF;
218  a[GSN_FSREMOVEREF] = &SimulatedBlock::execFSREMOVEREF;
219  a[GSN_FSSYNCREF] = &SimulatedBlock::execFSSYNCREF;
220  a[GSN_FSAPPENDREF] = &SimulatedBlock::execFSAPPENDREF;
221  a[GSN_NODE_START_REP] = &SimulatedBlock::execNODE_START_REP;
222  a[GSN_API_START_REP] = &SimulatedBlock::execAPI_START_REP;
223  a[GSN_SEND_PACKED] = &SimulatedBlock::execSEND_PACKED;
224  a[GSN_CALLBACK_CONF] = &SimulatedBlock::execCALLBACK_CONF;
225  a[GSN_SYNC_THREAD_REQ] = &SimulatedBlock::execSYNC_THREAD_REQ;
226  a[GSN_SYNC_THREAD_CONF] = &SimulatedBlock::execSYNC_THREAD_CONF;
227  a[GSN_LOCAL_ROUTE_ORD] = &SimulatedBlock::execLOCAL_ROUTE_ORD;
228  a[GSN_SYNC_REQ] = &SimulatedBlock::execSYNC_REQ;
229  a[GSN_SYNC_PATH_REQ] = &SimulatedBlock::execSYNC_PATH_REQ;
230  a[GSN_SYNC_PATH_CONF] = &SimulatedBlock::execSYNC_PATH_CONF;
231 }
232 
233 void
234 SimulatedBlock::addRecSignalImpl(GlobalSignalNumber gsn,
235  ExecFunction f, bool force){
236  if(gsn > MAX_GSN || (!force && theExecArray[gsn] != 0)){
237  char errorMsg[255];
238  BaseString::snprintf(errorMsg, 255,
239  "GSN %d(%d))", gsn, MAX_GSN);
240  ERROR_SET(fatal, NDBD_EXIT_ILLEGAL_SIGNAL, errorMsg, errorMsg);
241  }
242  theExecArray[gsn] = f;
243 }
244 
245 void
246 SimulatedBlock::assignToThread(ThreadContext ctx)
247 {
248  m_threadId = ctx.threadId;
249  m_jamBuffer = ctx.jamBuffer;
250  m_watchDogCounter = ctx.watchDogCounter;
251  m_sectionPoolCache = ctx.sectionPoolCache;
252 }
253 
254 Uint32
255 SimulatedBlock::getInstanceKey(Uint32 tabId, Uint32 fragId)
256 {
257  Dbdih* dbdih = (Dbdih*)globalData.getBlock(DBDIH);
258  Uint32 instanceKey = dbdih->dihGetInstanceKey(tabId, fragId);
259  return instanceKey;
260 }
261 
262 Uint32
263 SimulatedBlock::getInstanceFromKey(Uint32 instanceKey)
264 {
265  Uint32 lqhWorkers = globalData.ndbMtLqhWorkers;
266  Uint32 instanceNo;
267  if (lqhWorkers == 0) {
268  instanceNo = 0;
269  } else {
270  assert(instanceKey != 0);
271  instanceNo = 1 + (instanceKey - 1) % lqhWorkers;
272  }
273  return instanceNo;
274 }
275 
276 void
277 SimulatedBlock::signal_error(Uint32 gsn, Uint32 len, Uint32 recBlockNo,
278  const char* filename, int lineno) const
279 {
280  char objRef[255];
281  BaseString::snprintf(objRef, 255, "%s:%d", filename, lineno);
282  char probData[255];
283  BaseString::snprintf(probData, 255,
284  "Signal (GSN: %d, Length: %d, Rec Block No: %d)",
285  gsn, len, recBlockNo);
286 
287  ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
288  probData,
289  objRef);
290 }
291 
292 
293 extern class SectionSegmentPool g_sectionSegmentPool;
294 
295 #define check_sections(signal, cnt, cnt2) do { if (unlikely(cnt)) { handle_invalid_sections_in_send_signal(signal); } else if (unlikely(cnt2 == 0 && (signal->header.m_fragmentInfo != 0 && signal->header.m_fragmentInfo != 3))) { handle_invalid_fragmentInfo(signal); } } while(0)
296 
297 void
298 SimulatedBlock::handle_invalid_sections_in_send_signal(Signal* signal) const
299 {
300  //Uint32 cnt = signal->header.m_noOfSections;
301 #if defined VM_TRACE || defined ERROR_INSERT
302  ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
303  "Unhandled sections in sendSignal",
304  "");
305 #else
306  infoEvent("Unhandled sections in sendSignal!!");
307 #endif
308 }
309 
310 void
311 SimulatedBlock::handle_lingering_sections_after_execute(Signal* signal) const
312 {
313  //Uint32 cnt = signal->header.m_noOfSections;
314 #if defined VM_TRACE || defined ERROR_INSERT
315  ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
316  "Unhandled sections after execute",
317  "");
318 #else
319  infoEvent("Unhandled sections after execute");
320 #endif
321 }
322 
323 void
324 SimulatedBlock::handle_lingering_sections_after_execute(SectionHandle* handle) const
325 {
326  //Uint32 cnt = signal->header.m_noOfSections;
327 #if defined VM_TRACE || defined ERROR_INSERT
328  ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
329  "Unhandled sections(handle) after execute",
330  "");
331 #else
332  infoEvent("Unhandled sections(handle) after execute");
333 #endif
334 }
335 
336 void
337 SimulatedBlock::handle_invalid_fragmentInfo(Signal* signal) const
338 {
339 #if defined VM_TRACE || defined ERROR_INSERT
340  ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
341  "Incorrect header->m_fragmentInfo in sendSignal()",
342  "");
343 #else
344  signal->header.m_fragmentInfo = 0;
345  infoEvent("Incorrect header->m_fragmentInfo in sendSignal");
346 #endif
347 }
348 
349 void
350 SimulatedBlock::handle_out_of_longsignal_memory(Signal * signal) const
351 {
352  ErrorReporter::handleError(NDBD_EXIT_OUT_OF_LONG_SIGNAL_MEMORY,
353  "Out of LongMessageBuffer in sendSignal",
354  "");
355 }
356 
357 void
358 SimulatedBlock::handle_send_failed(SendStatus ss, Signal * signal) const
359 {
360  switch(ss){
361  case SEND_BUFFER_FULL:
362  ErrorReporter::handleError(NDBD_EXIT_GENERIC,
363  "Out of SendBufferMemory in sendSignal", "");
364  break;
365  case SEND_MESSAGE_TOO_BIG:
366  ErrorReporter::handleError(NDBD_EXIT_NDBREQUIRE,
367  "Message to big in sendSignal", "");
368  break;
369  case SEND_UNKNOWN_NODE:
370  ErrorReporter::handleError(NDBD_EXIT_NDBREQUIRE,
371  "Unknown node in sendSignal", "");
372  break;
373  case SEND_OK:
374  case SEND_BLOCKED:
375  case SEND_DISCONNECTED:
376  break;
377  }
378  ndbrequire(false);
379 }
380 
381 static void
382 linkSegments(Uint32 head, Uint32 tail){
383 
384  Ptr<SectionSegment> headPtr;
385  g_sectionSegmentPool.getPtr(headPtr, head);
386 
387  Ptr<SectionSegment> tailPtr;
388  g_sectionSegmentPool.getPtr(tailPtr, tail);
389 
390  Ptr<SectionSegment> oldTailPtr;
391  g_sectionSegmentPool.getPtr(oldTailPtr, headPtr.p->m_lastSegment);
392 
393  /* Can only efficiently link segments if linking to the end of a
394  * multiple-of-segment-size sized chunk
395  */
396  if ((headPtr.p->m_sz % NDB_SECTION_SEGMENT_SZ) != 0)
397  {
398 #if defined VM_TRACE || defined ERROR_INSERT
399  ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
400  "Bad head segment size",
401  "");
402 #else
403  ndbout_c("linkSegments : Bad head segment size");
404 #endif
405  }
406 
407  headPtr.p->m_lastSegment = tailPtr.p->m_lastSegment;
408  headPtr.p->m_sz += tailPtr.p->m_sz;
409 
410  oldTailPtr.p->m_nextSegment = tailPtr.i;
411 }
412 
413 void
414 getSections(Uint32 secCount, SegmentedSectionPtr ptr[3]){
415  Uint32 tSec0 = ptr[0].i;
416  Uint32 tSec1 = ptr[1].i;
417  Uint32 tSec2 = ptr[2].i;
418  SectionSegment * p;
419  switch(secCount){
420  case 3:
421  p = g_sectionSegmentPool.getPtr(tSec2);
422  ptr[2].p = p;
423  ptr[2].sz = p->m_sz;
424  case 2:
425  p = g_sectionSegmentPool.getPtr(tSec1);
426  ptr[1].p = p;
427  ptr[1].sz = p->m_sz;
428  case 1:
429  p = g_sectionSegmentPool.getPtr(tSec0);
430  ptr[0].p = p;
431  ptr[0].sz = p->m_sz;
432  case 0:
433  return;
434  }
435  char msg[40];
436  sprintf(msg, "secCount=%d", secCount);
437  ErrorReporter::handleAssert(msg, __FILE__, __LINE__);
438 }
439 
440 void
441 getSection(SegmentedSectionPtr & ptr, Uint32 i){
442  ptr.i = i;
443  SectionSegment * p = g_sectionSegmentPool.getPtr(i);
444  ptr.p = p;
445  ptr.sz = p->m_sz;
446 }
447 
448 Uint32 getSectionSz(Uint32 id)
449 {
450  return g_sectionSegmentPool.getPtr(id)->m_sz;
451 }
452 
453 Uint32* getLastWordPtr(Uint32 id)
454 {
455  SectionSegment* first= g_sectionSegmentPool.getPtr(id);
456  SectionSegment* last= g_sectionSegmentPool.getPtr(first->m_lastSegment);
457  Uint32 offset= (first->m_sz -1) % SectionSegment::DataLength;
458  return &last->theData[offset];
459 }
460 
461 #ifdef NDBD_MULTITHREADED
462 #define SB_SP_ARG *m_sectionPoolCache,
463 #define SB_SP_REL_ARG f_section_lock, *m_sectionPoolCache,
464 #else
465 #define SB_SP_ARG
466 #define SB_SP_REL_ARG
467 #endif
468 
469 static
470 void
471 releaseSections(SPC_ARG Uint32 secCount, SegmentedSectionPtr ptr[3]){
472  Uint32 tSec0 = ptr[0].i;
473  Uint32 tSz0 = ptr[0].sz;
474  Uint32 tSec1 = ptr[1].i;
475  Uint32 tSz1 = ptr[1].sz;
476  Uint32 tSec2 = ptr[2].i;
477  Uint32 tSz2 = ptr[2].sz;
478  switch(secCount){
479  case 3:
480  g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
481  relSz(tSz2), tSec2,
482  ptr[2].p->m_lastSegment);
483  case 2:
484  g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
485  relSz(tSz1), tSec1,
486  ptr[1].p->m_lastSegment);
487  case 1:
488  g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
489  relSz(tSz0), tSec0,
490  ptr[0].p->m_lastSegment);
491  case 0:
492  return;
493  }
494  char msg[40];
495  sprintf(msg, "secCount=%d", secCount);
496  ErrorReporter::handleAssert(msg, __FILE__, __LINE__);
497 }
498 
499 void
500 SimulatedBlock::sendSignal(BlockReference ref,
501  GlobalSignalNumber gsn,
502  Signal* signal,
503  Uint32 length,
504  JobBufferLevel jobBuffer) const {
505 
506  BlockReference sendBRef = reference();
507 
508  Uint32 noOfSections = signal->header.m_noOfSections;
509  Uint32 recBlock = refToBlock(ref);
510  Uint32 recNode = refToNode(ref);
511  Uint32 ourProcessor = globalData.ownId;
512 
513  signal->header.theLength = length;
514  signal->header.theVerId_signalNumber = gsn;
515  signal->header.theReceiversBlockNumber = recBlock;
516  signal->header.m_noOfSections = 0;
517 
518  check_sections(signal, noOfSections, 0);
519 
520  Uint32 tSignalId = signal->header.theSignalId;
521 
522  if ((length == 0) || length > 25 || (recBlock == 0)) {
523  signal_error(gsn, length, recBlock, __FILE__, __LINE__);
524  return;
525  }//if
526 #ifdef VM_TRACE
527  if(globalData.testOn){
528  Uint16 proc =
529  (recNode == 0 ? globalData.ownId : recNode);
530  signal->header.theSendersBlockRef = sendBRef;
531  globalSignalLoggers.sendSignal(signal->header,
532  jobBuffer,
533  &signal->theData[0],
534  proc);
535  }
536 #endif
537 
538  if(recNode == ourProcessor || recNode == 0) {
539  signal->header.theSendersSignalId = tSignalId;
540  signal->header.theSendersBlockRef = sendBRef;
541 #ifdef NDBD_MULTITHREADED
542  if (jobBuffer == JBB)
543  sendlocal(m_threadId, &signal->header, signal->theData, NULL);
544  else
545  sendprioa(m_threadId, &signal->header, signal->theData, NULL);
546 #else
547  globalScheduler.execute(signal, jobBuffer, recBlock,
548  gsn);
549 #endif
550  return;
551  } else {
552  // send distributed Signal
553  SignalHeader sh;
554 
555  Uint32 tTrace = signal->getTrace();
556 
557  sh.theVerId_signalNumber = gsn;
558  sh.theReceiversBlockNumber = recBlock;
559  sh.theSendersBlockRef = refToBlock(sendBRef);
560  sh.theLength = length;
561  sh.theTrace = tTrace;
562  sh.theSignalId = tSignalId;
563  sh.m_noOfSections = 0;
564  sh.m_fragmentInfo = 0;
565 
566 #ifdef TRACE_DISTRIBUTED
567  ndbout_c("send: %s(%d) to (%s, %d)",
568  getSignalName(gsn), gsn, getBlockName(recBlock),
569  recNode);
570 #endif
571 
572  SendStatus ss;
573 #ifdef NDBD_MULTITHREADED
574  ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
575  recNode, 0);
576 #else
577  ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
578  &signal->theData[0], recNode,
579  (LinearSectionPtr*)0);
580 #endif
581 
582  if (unlikely(! (ss == SEND_OK ||
583  ss == SEND_BLOCKED ||
584  ss == SEND_DISCONNECTED)))
585  {
586  handle_send_failed(ss, signal);
587  }
588  }
589  return;
590 }
591 
592 void
593 SimulatedBlock::sendSignal(NodeReceiverGroup rg,
594  GlobalSignalNumber gsn,
595  Signal* signal,
596  Uint32 length,
597  JobBufferLevel jobBuffer) const {
598 
599  Uint32 noOfSections = signal->header.m_noOfSections;
600  Uint32 tSignalId = signal->header.theSignalId;
601  Uint32 tTrace = signal->getTrace();
602 
603  Uint32 ourProcessor = globalData.ownId;
604  Uint32 recBlock = rg.m_block;
605 
606  signal->header.theLength = length;
607  signal->header.theVerId_signalNumber = gsn;
608  signal->header.theReceiversBlockNumber = recBlock;
609  signal->header.theSendersSignalId = tSignalId;
610  signal->header.theSendersBlockRef = reference();
611  signal->header.m_noOfSections = 0;
612 
613  check_sections(signal, noOfSections, 0);
614 
615  if ((length == 0) || (length > 25) || (recBlock == 0)) {
616  signal_error(gsn, length, recBlock, __FILE__, __LINE__);
617  return;
618  }//if
619 
620  SignalHeader sh;
621 
622  sh.theVerId_signalNumber = gsn;
623  sh.theReceiversBlockNumber = recBlock;
624  sh.theSendersBlockRef = refToBlock(reference());
625  sh.theLength = length;
626  sh.theTrace = tTrace;
627  sh.theSignalId = tSignalId;
628  sh.m_noOfSections = 0;
629  sh.m_fragmentInfo = 0;
630 
634  if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)){
635 #ifdef VM_TRACE
636  if(globalData.testOn){
637  globalSignalLoggers.sendSignal(signal->header,
638  jobBuffer,
639  &signal->theData[0],
640  ourProcessor);
641  }
642 #endif
643 
644 #ifdef NDBD_MULTITHREADED
645  if (jobBuffer == JBB)
646  sendlocal(m_threadId, &signal->header, signal->theData, NULL);
647  else
648  sendprioa(m_threadId, &signal->header, signal->theData, NULL);
649 #else
650  globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
651 #endif
652 
653  rg.m_nodes.clear((Uint32)0);
654  rg.m_nodes.clear(ourProcessor);
655  }
656 
660  Uint32 recNode = 0;
661  while(!rg.m_nodes.isclear()){
662  recNode = rg.m_nodes.find(recNode + 1);
663  rg.m_nodes.clear(recNode);
664 #ifdef VM_TRACE
665  if(globalData.testOn){
666  globalSignalLoggers.sendSignal(signal->header,
667  jobBuffer,
668  &signal->theData[0],
669  recNode);
670  }
671 #endif
672 
673 #ifdef TRACE_DISTRIBUTED
674  ndbout_c("send: %s(%d) to (%s, %d)",
675  getSignalName(gsn), gsn, getBlockName(recBlock),
676  recNode);
677 #endif
678 
679  SendStatus ss;
680 #ifdef NDBD_MULTITHREADED
681  ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
682  recNode, 0);
683 #else
684  ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
685  &signal->theData[0], recNode,
686  (LinearSectionPtr*)0);
687 #endif
688 
689  if (unlikely(! (ss == SEND_OK ||
690  ss == SEND_BLOCKED ||
691  ss == SEND_DISCONNECTED)))
692  {
693  handle_send_failed(ss, signal);
694  }
695  }
696 
697  return;
698 }
699 
700 bool import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len);
701 
702 void
703 SimulatedBlock::sendSignal(BlockReference ref,
704  GlobalSignalNumber gsn,
705  Signal* signal,
706  Uint32 length,
707  JobBufferLevel jobBuffer,
708  LinearSectionPtr ptr[3],
709  Uint32 noOfSections) const {
710 
711  BlockReference sendBRef = reference();
712 
713  Uint32 recBlock = refToBlock(ref);
714  Uint32 recNode = refToNode(ref);
715  Uint32 ourProcessor = globalData.ownId;
716 
717  check_sections(signal, signal->header.m_noOfSections, noOfSections);
718 
719  signal->header.theLength = length;
720  signal->header.theVerId_signalNumber = gsn;
721  signal->header.theReceiversBlockNumber = recBlock;
722  signal->header.m_noOfSections = noOfSections;
723 
724  Uint32 tSignalId = signal->header.theSignalId;
725  Uint32 tFragInfo = signal->header.m_fragmentInfo;
726 
727  if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
728  signal_error(gsn, length, recBlock, __FILE__, __LINE__);
729  return;
730  }//if
731 #ifdef VM_TRACE
732  if(globalData.testOn){
733  Uint16 proc =
734  (recNode == 0 ? globalData.ownId : recNode);
735  signal->header.theSendersBlockRef = sendBRef;
736  globalSignalLoggers.sendSignal(signal->header,
737  jobBuffer,
738  &signal->theData[0],
739  proc,
740  ptr, noOfSections);
741  }
742 #endif
743 
744  if(recNode == ourProcessor || recNode == 0) {
745  signal->header.theSendersSignalId = tSignalId;
746  signal->header.theSendersBlockRef = sendBRef;
747 
751  bool ok = true;
752  Ptr<SectionSegment> segptr[3];
753  for(Uint32 i = 0; i<noOfSections; i++){
754  ok &= ::import(SB_SP_ARG segptr[i], ptr[i].p, ptr[i].sz);
755  signal->theData[length+i] = segptr[i].i;
756  }
757 
758  if (unlikely(! ok))
759  {
760  handle_out_of_longsignal_memory(signal);
761  }
762 
763 #ifdef NDBD_MULTITHREADED
764  if (jobBuffer == JBB)
765  sendlocal(m_threadId, &signal->header, signal->theData,
766  signal->theData+length);
767  else
768  sendprioa(m_threadId, &signal->header, signal->theData,
769  signal->theData+length);
770 #else
771  globalScheduler.execute(signal, jobBuffer, recBlock,
772  gsn);
773 #endif
774  signal->header.m_noOfSections = 0;
775  return;
776  } else {
777  // send distributed Signal
778  SignalHeader sh;
779 
780  Uint32 tTrace = signal->getTrace();
781  Uint32 noOfSections = signal->header.m_noOfSections;
782 
783  sh.theVerId_signalNumber = gsn;
784  sh.theReceiversBlockNumber = recBlock;
785  sh.theSendersBlockRef = refToBlock(sendBRef);
786  sh.theLength = length;
787  sh.theTrace = tTrace;
788  sh.theSignalId = tSignalId;
789  sh.m_noOfSections = noOfSections;
790  sh.m_fragmentInfo = tFragInfo;
791 
792 #ifdef TRACE_DISTRIBUTED
793  ndbout_c("send: %s(%d) to (%s, %d)",
794  getSignalName(gsn), gsn, getBlockName(recBlock),
795  recNode);
796 #endif
797 
798  SendStatus ss;
799 #ifdef NDBD_MULTITHREADED
800  ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
801  recNode, ptr);
802 #else
803  ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
804  &signal->theData[0], recNode,
805  ptr);
806 #endif
807 
808  if (unlikely(! (ss == SEND_OK ||
809  ss == SEND_BLOCKED ||
810  ss == SEND_DISCONNECTED)))
811  {
812  handle_send_failed(ss, signal);
813  }
814  }
815 
816  signal->header.m_noOfSections = 0;
817  signal->header.m_fragmentInfo = 0;
818  return;
819 }
820 
821 void
822 SimulatedBlock::sendSignal(NodeReceiverGroup rg,
823  GlobalSignalNumber gsn,
824  Signal* signal,
825  Uint32 length,
826  JobBufferLevel jobBuffer,
827  LinearSectionPtr ptr[3],
828  Uint32 noOfSections) const {
829 
830  Uint32 tSignalId = signal->header.theSignalId;
831  Uint32 tTrace = signal->getTrace();
832  Uint32 tFragInfo = signal->header.m_fragmentInfo;
833 
834  Uint32 ourProcessor = globalData.ownId;
835  Uint32 recBlock = rg.m_block;
836 
837  check_sections(signal, signal->header.m_noOfSections, noOfSections);
838 
839  signal->header.theLength = length;
840  signal->header.theVerId_signalNumber = gsn;
841  signal->header.theReceiversBlockNumber = recBlock;
842  signal->header.theSendersSignalId = tSignalId;
843  signal->header.theSendersBlockRef = reference();
844  signal->header.m_noOfSections = noOfSections;
845 
846  if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
847  signal_error(gsn, length, recBlock, __FILE__, __LINE__);
848  return;
849  }//if
850 
851  SignalHeader sh;
852  sh.theVerId_signalNumber = gsn;
853  sh.theReceiversBlockNumber = recBlock;
854  sh.theSendersBlockRef = refToBlock(reference());
855  sh.theLength = length;
856  sh.theTrace = tTrace;
857  sh.theSignalId = tSignalId;
858  sh.m_noOfSections = noOfSections;
859  sh.m_fragmentInfo = tFragInfo;
860 
864  if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)){
865 #ifdef VM_TRACE
866  if(globalData.testOn){
867  globalSignalLoggers.sendSignal(signal->header,
868  jobBuffer,
869  &signal->theData[0],
870  ourProcessor,
871  ptr, noOfSections);
872  }
873 #endif
874 
877  bool ok = true;
878  Ptr<SectionSegment> segptr[3];
879  for(Uint32 i = 0; i<noOfSections; i++){
880  ok &= ::import(SB_SP_ARG segptr[i], ptr[i].p, ptr[i].sz);
881  signal->theData[length+i] = segptr[i].i;
882  }
883 
884  if (unlikely(! ok))
885  {
886  handle_out_of_longsignal_memory(signal);
887  }
888 
889 #ifdef NDBD_MULTITHREADED
890  if (jobBuffer == JBB)
891  sendlocal(m_threadId, &signal->header, signal->theData,
892  signal->theData + length);
893  else
894  sendprioa(m_threadId, &signal->header, signal->theData,
895  signal->theData + length);
896 #else
897  globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
898 #endif
899 
900  rg.m_nodes.clear((Uint32)0);
901  rg.m_nodes.clear(ourProcessor);
902  }
903 
907  Uint32 recNode = 0;
908  while(!rg.m_nodes.isclear()){
909  recNode = rg.m_nodes.find(recNode + 1);
910  rg.m_nodes.clear(recNode);
911 
912 #ifdef VM_TRACE
913  if(globalData.testOn){
914  globalSignalLoggers.sendSignal(signal->header,
915  jobBuffer,
916  &signal->theData[0],
917  recNode,
918  ptr, noOfSections);
919  }
920 #endif
921 
922 #ifdef TRACE_DISTRIBUTED
923  ndbout_c("send: %s(%d) to (%s, %d)",
924  getSignalName(gsn), gsn, getBlockName(recBlock),
925  recNode);
926 #endif
927 
928  SendStatus ss;
929 #ifdef NDBD_MULTITHREADED
930  ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
931  recNode, ptr);
932 #else
933  ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
934  &signal->theData[0], recNode,
935  ptr);
936 #endif
937 
938  if (unlikely(! (ss == SEND_OK ||
939  ss == SEND_BLOCKED ||
940  ss == SEND_DISCONNECTED)))
941  {
942  handle_send_failed(ss, signal);
943  }
944  }
945 
946  signal->header.m_noOfSections = 0;
947  signal->header.m_fragmentInfo = 0;
948 
949  return;
950 }
951 
952 void
953 SimulatedBlock::sendSignal(BlockReference ref,
954  GlobalSignalNumber gsn,
955  Signal* signal,
956  Uint32 length,
957  JobBufferLevel jobBuffer,
958  SectionHandle* sections) const {
959 
960  Uint32 noOfSections = sections->m_cnt;
961  BlockReference sendBRef = reference();
962 
963  Uint32 recBlock = refToBlock(ref);
964  Uint32 recNode = refToNode(ref);
965  Uint32 ourProcessor = globalData.ownId;
966 
967  check_sections(signal, signal->header.m_noOfSections, noOfSections);
968 
969  signal->header.theLength = length;
970  signal->header.theVerId_signalNumber = gsn;
971  signal->header.theReceiversBlockNumber = recBlock;
972  signal->header.m_noOfSections = noOfSections;
973 
974  Uint32 tSignalId = signal->header.theSignalId;
975  Uint32 tFragInfo = signal->header.m_fragmentInfo;
976 
977  if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
978  signal_error(gsn, length, recBlock, __FILE__, __LINE__);
979  return;
980  }//if
981 #ifdef VM_TRACE
982  if(globalData.testOn){
983  Uint16 proc =
984  (recNode == 0 ? globalData.ownId : recNode);
985  signal->header.theSendersBlockRef = sendBRef;
986  globalSignalLoggers.sendSignal(signal->header,
987  jobBuffer,
988  &signal->theData[0],
989  proc,
990  sections->m_ptr, noOfSections);
991  }
992 #endif
993 
994  if(recNode == ourProcessor || recNode == 0) {
995  signal->header.theSendersSignalId = tSignalId;
996  signal->header.theSendersBlockRef = sendBRef;
997 
1001  Uint32 * dst = signal->theData + length;
1002  * dst ++ = sections->m_ptr[0].i;
1003  * dst ++ = sections->m_ptr[1].i;
1004  * dst ++ = sections->m_ptr[2].i;
1005 
1006 #ifdef NDBD_MULTITHREADED
1007  if (jobBuffer == JBB)
1008  sendlocal(m_threadId, &signal->header, signal->theData,
1009  signal->theData + length);
1010  else
1011  sendprioa(m_threadId, &signal->header, signal->theData,
1012  signal->theData + length);
1013 #else
1014  globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1015 #endif
1016  } else {
1017  // send distributed Signal
1018  SignalHeader sh;
1019 
1020  Uint32 tTrace = signal->getTrace();
1021 
1022  sh.theVerId_signalNumber = gsn;
1023  sh.theReceiversBlockNumber = recBlock;
1024  sh.theSendersBlockRef = refToBlock(sendBRef);
1025  sh.theLength = length;
1026  sh.theTrace = tTrace;
1027  sh.theSignalId = tSignalId;
1028  sh.m_noOfSections = noOfSections;
1029  sh.m_fragmentInfo = tFragInfo;
1030 
1031 #ifdef TRACE_DISTRIBUTED
1032  ndbout_c("send: %s(%d) to (%s, %d)",
1033  getSignalName(gsn), gsn, getBlockName(recBlock),
1034  recNode);
1035 #endif
1036 
1037  SendStatus ss;
1038 #ifdef NDBD_MULTITHREADED
1039  ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1040  recNode, &g_sectionSegmentPool, sections->m_ptr);
1041 #else
1042  ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
1043  &signal->theData[0], recNode,
1044  g_sectionSegmentPool,
1045  sections->m_ptr);
1046 #endif
1047 
1048  if (unlikely(! (ss == SEND_OK ||
1049  ss == SEND_BLOCKED ||
1050  ss == SEND_DISCONNECTED)))
1051  {
1052  handle_send_failed(ss, signal);
1053  }
1054 
1055  ::releaseSections(SB_SP_ARG noOfSections, sections->m_ptr);
1056  }
1057 
1058  signal->header.m_noOfSections = 0;
1059  signal->header.m_fragmentInfo = 0;
1060  sections->m_cnt = 0;
1061  return;
1062 }
1063 
1064 void
1065 SimulatedBlock::sendSignal(NodeReceiverGroup rg,
1066  GlobalSignalNumber gsn,
1067  Signal* signal,
1068  Uint32 length,
1069  JobBufferLevel jobBuffer,
1070  SectionHandle * sections) const {
1071 
1072  Uint32 noOfSections = sections->m_cnt;
1073  Uint32 tSignalId = signal->header.theSignalId;
1074  Uint32 tTrace = signal->getTrace();
1075  Uint32 tFragInfo = signal->header.m_fragmentInfo;
1076 
1077  Uint32 ourProcessor = globalData.ownId;
1078  Uint32 recBlock = rg.m_block;
1079 
1080  check_sections(signal, signal->header.m_noOfSections, noOfSections);
1081 
1082  signal->header.theLength = length;
1083  signal->header.theVerId_signalNumber = gsn;
1084  signal->header.theReceiversBlockNumber = recBlock;
1085  signal->header.theSendersSignalId = tSignalId;
1086  signal->header.theSendersBlockRef = reference();
1087  signal->header.m_noOfSections = noOfSections;
1088 
1089  if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1090  signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1091  return;
1092  }//if
1093 
1094  SignalHeader sh;
1095  sh.theVerId_signalNumber = gsn;
1096  sh.theReceiversBlockNumber = recBlock;
1097  sh.theSendersBlockRef = refToBlock(reference());
1098  sh.theLength = length;
1099  sh.theTrace = tTrace;
1100  sh.theSignalId = tSignalId;
1101  sh.m_noOfSections = noOfSections;
1102  sh.m_fragmentInfo = tFragInfo;
1103 
1107  bool release = true;
1108  if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor))
1109  {
1110  release = false;
1111 #ifdef VM_TRACE
1112  if(globalData.testOn){
1113  globalSignalLoggers.sendSignal(signal->header,
1114  jobBuffer,
1115  &signal->theData[0],
1116  ourProcessor,
1117  sections->m_ptr, noOfSections);
1118  }
1119 #endif
1120 
1123  Uint32 * dst = signal->theData + length;
1124  * dst ++ = sections->m_ptr[0].i;
1125  * dst ++ = sections->m_ptr[1].i;
1126  * dst ++ = sections->m_ptr[2].i;
1127 #ifdef NDBD_MULTITHREADED
1128  if (jobBuffer == JBB)
1129  sendlocal(m_threadId, &signal->header, signal->theData,
1130  signal->theData + length);
1131  else
1132  sendprioa(m_threadId, &signal->header, signal->theData,
1133  signal->theData + length);
1134 #else
1135  globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1136 #endif
1137 
1138  rg.m_nodes.clear((Uint32)0);
1139  rg.m_nodes.clear(ourProcessor);
1140  }
1141 
1145  Uint32 recNode = 0;
1146  while(!rg.m_nodes.isclear()){
1147  recNode = rg.m_nodes.find(recNode + 1);
1148  rg.m_nodes.clear(recNode);
1149 
1150 #ifdef VM_TRACE
1151  if(globalData.testOn){
1152  globalSignalLoggers.sendSignal(signal->header,
1153  jobBuffer,
1154  &signal->theData[0],
1155  recNode,
1156  sections->m_ptr, noOfSections);
1157  }
1158 #endif
1159 
1160 #ifdef TRACE_DISTRIBUTED
1161  ndbout_c("send: %s(%d) to (%s, %d)",
1162  getSignalName(gsn), gsn, getBlockName(recBlock),
1163  recNode);
1164 #endif
1165 
1166  SendStatus ss;
1167 #ifdef NDBD_MULTITHREADED
1168  ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1169  recNode, &g_sectionSegmentPool, sections->m_ptr);
1170 #else
1171  ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
1172  &signal->theData[0], recNode,
1173  g_sectionSegmentPool,
1174  sections->m_ptr);
1175 #endif
1176 
1177  if (unlikely(! (ss == SEND_OK ||
1178  ss == SEND_BLOCKED ||
1179  ss == SEND_DISCONNECTED)))
1180  {
1181  handle_send_failed(ss, signal);
1182  }
1183  }
1184 
1185  if (release)
1186  {
1187  ::releaseSections(SB_SP_ARG noOfSections, sections->m_ptr);
1188  }
1189 
1190  sections->m_cnt = 0;
1191  signal->header.m_noOfSections = 0;
1192  signal->header.m_fragmentInfo = 0;
1193 
1194  return;
1195 }
1196 
1197 void
1199  GlobalSignalNumber gsn,
1200  Signal* signal,
1201  Uint32 length,
1202  JobBufferLevel jobBuffer,
1203  SectionHandle* sections) const {
1204 
1211  Uint32 noOfSections = sections->m_cnt;
1212  BlockReference sendBRef = reference();
1213 
1214  Uint32 recBlock = refToBlock(ref);
1215  Uint32 recNode = refToNode(ref);
1216  Uint32 ourProcessor = globalData.ownId;
1217 
1218  check_sections(signal, signal->header.m_noOfSections, noOfSections);
1219 
1220  signal->header.theLength = length;
1221  signal->header.theVerId_signalNumber = gsn;
1222  signal->header.theReceiversBlockNumber = recBlock;
1223  signal->header.m_noOfSections = noOfSections;
1224 
1225  Uint32 tSignalId = signal->header.theSignalId;
1226  Uint32 tFragInfo = signal->header.m_fragmentInfo;
1227 
1228  if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1229  signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1230  return;
1231  }//if
1232 #ifdef VM_TRACE
1233  if(globalData.testOn){
1234  Uint16 proc =
1235  (recNode == 0 ? globalData.ownId : recNode);
1236  signal->header.theSendersBlockRef = sendBRef;
1237  globalSignalLoggers.sendSignal(signal->header,
1238  jobBuffer,
1239  &signal->theData[0],
1240  proc,
1241  sections->m_ptr, noOfSections);
1242  }
1243 #endif
1244 
1245  if(recNode == ourProcessor || recNode == 0) {
1246  signal->header.theSendersSignalId = tSignalId;
1247  signal->header.theSendersBlockRef = sendBRef;
1248 
1249  Uint32 * dst = signal->theData + length;
1250 
1251  /* We need to copy the segmented section data into separate
1252  * sections when sending locally and keeping a copy ourselves
1253  */
1254  for (Uint32 sec=0; sec < noOfSections; sec++)
1255  {
1256  Uint32 secCopy;
1257  bool ok= ::dupSection(SB_SP_ARG secCopy, sections->m_ptr[sec].i);
1258  ndbrequire (ok);
1259  * dst ++ = secCopy;
1260  }
1261 
1262 #ifdef NDBD_MULTITHREADED
1263  if (jobBuffer == JBB)
1264  sendlocal(m_threadId, &signal->header, signal->theData,
1265  signal->theData + length);
1266  else
1267  sendprioa(m_threadId, &signal->header, signal->theData,
1268  signal->theData + length);
1269 #else
1270  globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1271 #endif
1272  } else {
1273  // send distributed Signal
1274  SignalHeader sh;
1275 
1276  Uint32 tTrace = signal->getTrace();
1277 
1278  sh.theVerId_signalNumber = gsn;
1279  sh.theReceiversBlockNumber = recBlock;
1280  sh.theSendersBlockRef = refToBlock(sendBRef);
1281  sh.theLength = length;
1282  sh.theTrace = tTrace;
1283  sh.theSignalId = tSignalId;
1284  sh.m_noOfSections = noOfSections;
1285  sh.m_fragmentInfo = tFragInfo;
1286 
1287 #ifdef TRACE_DISTRIBUTED
1288  ndbout_c("send: %s(%d) to (%s, %d)",
1289  getSignalName(gsn), gsn, getBlockName(recBlock),
1290  recNode);
1291 #endif
1292 
1293  SendStatus ss;
1294 #ifdef NDBD_MULTITHREADED
1295  ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1296  recNode, &g_sectionSegmentPool, sections->m_ptr);
1297 #else
1298  ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
1299  &signal->theData[0], recNode,
1300  g_sectionSegmentPool,
1301  sections->m_ptr);
1302 #endif
1303 
1304  ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
1305  }
1306 
1307  signal->header.m_noOfSections = 0;
1308  signal->header.m_fragmentInfo = 0;
1309  return;
1310 }
1311 
1312 void
1314  GlobalSignalNumber gsn,
1315  Signal* signal,
1316  Uint32 length,
1317  JobBufferLevel jobBuffer,
1318  SectionHandle * sections) const {
1325  Uint32 noOfSections = sections->m_cnt;
1326  Uint32 tSignalId = signal->header.theSignalId;
1327  Uint32 tTrace = signal->getTrace();
1328  Uint32 tFragInfo = signal->header.m_fragmentInfo;
1329 
1330  Uint32 ourProcessor = globalData.ownId;
1331  Uint32 recBlock = rg.m_block;
1332 
1333  check_sections(signal, signal->header.m_noOfSections, noOfSections);
1334 
1335  signal->header.theLength = length;
1336  signal->header.theVerId_signalNumber = gsn;
1337  signal->header.theReceiversBlockNumber = recBlock;
1338  signal->header.theSendersSignalId = tSignalId;
1339  signal->header.theSendersBlockRef = reference();
1340  signal->header.m_noOfSections = noOfSections;
1341 
1342  if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1343  signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1344  return;
1345  }//if
1346 
1347  SignalHeader sh;
1348  sh.theVerId_signalNumber = gsn;
1349  sh.theReceiversBlockNumber = recBlock;
1350  sh.theSendersBlockRef = refToBlock(reference());
1351  sh.theLength = length;
1352  sh.theTrace = tTrace;
1353  sh.theSignalId = tSignalId;
1354  sh.m_noOfSections = noOfSections;
1355  sh.m_fragmentInfo = tFragInfo;
1356 
1360  if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor))
1361  {
1362 #ifdef VM_TRACE
1363  if(globalData.testOn){
1364  globalSignalLoggers.sendSignal(signal->header,
1365  jobBuffer,
1366  &signal->theData[0],
1367  ourProcessor,
1368  sections->m_ptr, noOfSections);
1369  }
1370 #endif
1371 
1372  Uint32 * dst = signal->theData + length;
1373 
1374  /* We need to copy the segmented section data into separate
1375  * sections when sending locally and keeping a copy ourselves
1376  */
1377  for (Uint32 sec=0; sec < noOfSections; sec++)
1378  {
1379  Uint32 secCopy;
1380  bool ok= ::dupSection(SB_SP_ARG secCopy, sections->m_ptr[sec].i);
1381  ndbrequire (ok);
1382  * dst ++ = secCopy;
1383  }
1384 
1385 #ifdef NDBD_MULTITHREADED
1386  if (jobBuffer == JBB)
1387  sendlocal(m_threadId, &signal->header, signal->theData,
1388  signal->theData + length);
1389  else
1390  sendprioa(m_threadId, &signal->header, signal->theData,
1391  signal->theData + length);
1392 #else
1393  globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1394 #endif
1395 
1396  rg.m_nodes.clear((Uint32)0);
1397  rg.m_nodes.clear(ourProcessor);
1398  }
1399 
1403  Uint32 recNode = 0;
1404  while(!rg.m_nodes.isclear()){
1405  recNode = rg.m_nodes.find(recNode + 1);
1406  rg.m_nodes.clear(recNode);
1407 
1408 #ifdef VM_TRACE
1409  if(globalData.testOn){
1410  globalSignalLoggers.sendSignal(signal->header,
1411  jobBuffer,
1412  &signal->theData[0],
1413  recNode,
1414  sections->m_ptr, noOfSections);
1415  }
1416 #endif
1417 
1418 #ifdef TRACE_DISTRIBUTED
1419  ndbout_c("send: %s(%d) to (%s, %d)",
1420  getSignalName(gsn), gsn, getBlockName(recBlock),
1421  recNode);
1422 #endif
1423 
1424  SendStatus ss;
1425 #ifdef NDBD_MULTITHREADED
1426  ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1427  recNode, &g_sectionSegmentPool, sections->m_ptr);
1428 #else
1429  ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
1430  &signal->theData[0], recNode,
1431  g_sectionSegmentPool,
1432  sections->m_ptr);
1433 #endif
1434 
1435  ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
1436  }
1437 
1438  signal->header.m_noOfSections = 0;
1439  signal->header.m_fragmentInfo = 0;
1440 
1441  return;
1442 }
1443 
1444 
1445 void
1446 SimulatedBlock::sendSignalWithDelay(BlockReference ref,
1447  GlobalSignalNumber gsn,
1448  Signal* signal,
1449  Uint32 delayInMilliSeconds,
1450  Uint32 length) const {
1451 
1452  BlockNumber bnr = refToBlock(ref);
1453 
1454  check_sections(signal, signal->header.m_noOfSections, 0);
1455 
1456  signal->header.theLength = length;
1457  signal->header.theSendersSignalId = signal->header.theSignalId;
1458  signal->header.theVerId_signalNumber = gsn;
1459  signal->header.theReceiversBlockNumber = bnr;
1460  signal->header.theSendersBlockRef = reference();
1461 
1462 #ifdef VM_TRACE
1463  {
1464  if(globalData.testOn){
1465  globalSignalLoggers.sendSignalWithDelay(delayInMilliSeconds,
1466  signal->header,
1467  0,
1468  &signal->theData[0],
1469  globalData.ownId);
1470  }
1471  }
1472 #endif
1473 
1474 #ifdef NDBD_MULTITHREADED
1475  senddelay(m_threadId, &signal->header, delayInMilliSeconds);
1476 #else
1477  globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds);
1478 #endif
1479 
1480  // befor 2nd parameter to globalTimeQueue.insert
1481  // (Priority)theSendSig[sigIndex].jobBuffer
1482 }
1483 
1484 void
1485 SimulatedBlock::sendSignalWithDelay(BlockReference ref,
1486  GlobalSignalNumber gsn,
1487  Signal* signal,
1488  Uint32 delayInMilliSeconds,
1489  Uint32 length,
1490  SectionHandle * sections) const {
1491 
1492  Uint32 noOfSections = sections->m_cnt;
1493  BlockNumber bnr = refToBlock(ref);
1494 
1495  BlockReference sendBRef = reference();
1496 
1497  if (bnr == 0) {
1498  bnr_error();
1499  }//if
1500 
1501  check_sections(signal, signal->header.m_noOfSections, noOfSections);
1502 
1503  signal->header.theLength = length;
1504  signal->header.theSendersSignalId = signal->header.theSignalId;
1505  signal->header.theSendersBlockRef = sendBRef;
1506  signal->header.theVerId_signalNumber = gsn;
1507  signal->header.theReceiversBlockNumber = bnr;
1508  signal->header.m_noOfSections = noOfSections;
1509 
1510  Uint32 * dst = signal->theData + length;
1511  * dst ++ = sections->m_ptr[0].i;
1512  * dst ++ = sections->m_ptr[1].i;
1513  * dst ++ = sections->m_ptr[2].i;
1514 
1515 #ifdef VM_TRACE
1516  {
1517  if(globalData.testOn){
1518  globalSignalLoggers.sendSignalWithDelay(delayInMilliSeconds,
1519  signal->header,
1520  0,
1521  &signal->theData[0],
1522  globalData.ownId);
1523  }
1524  }
1525 #endif
1526 
1527 #ifdef NDBD_MULTITHREADED
1528  senddelay(m_threadId, &signal->header, delayInMilliSeconds);
1529 #else
1530  globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds);
1531 #endif
1532 
1533  signal->header.m_noOfSections = 0;
1534  signal->header.m_fragmentInfo = 0;
1535  sections->m_cnt = 0;
1536 }
1537 
1538 void
1540 {
1541  ::release(SB_SP_ARG ptr);
1542 }
1543 
1544 void
1545 SimulatedBlock::releaseSection(Uint32 firstSegmentIVal)
1546 {
1547  ::releaseSection(SB_SP_ARG firstSegmentIVal);
1548 }
1549 
1550 void
1551 SimulatedBlock::releaseSections(SectionHandle& handle)
1552 {
1553  ::releaseSections(SB_SP_ARG handle.m_cnt, handle.m_ptr);
1554  handle.m_cnt = 0;
1555 }
1556 
1557 bool
1558 SimulatedBlock::appendToSection(Uint32& firstSegmentIVal, const Uint32* src, Uint32 len)
1559 {
1560  return ::appendToSection(SB_SP_ARG firstSegmentIVal, src, len);
1561 }
1562 
1563 bool
1564 SimulatedBlock::import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len)
1565 {
1566  return ::import(SB_SP_ARG first, src, len);
1567 }
1568 
1569 bool
1570 SimulatedBlock::import(SegmentedSectionPtr& ptr, const Uint32* src, Uint32 len)
1571 {
1572  Ptr<SectionSegment> tmp;
1573  if (::import(SB_SP_ARG tmp, src, len))
1574  {
1575  ptr.i = tmp.i;
1576  ptr.p = tmp.p;
1577  ptr.sz = len;
1578  return true;
1579  }
1580  return false;
1581 }
1582 
1583 bool
1584 SimulatedBlock::dupSection(Uint32& copyFirstIVal, Uint32 srcFirstIVal)
1585 {
1586  return ::dupSection(SB_SP_ARG copyFirstIVal, srcFirstIVal);
1587 }
1588 
1589 bool
1590 SimulatedBlock::writeToSection(Uint32 firstSegmentIVal, Uint32 offset,
1591  const Uint32* src, Uint32 len)
1592 {
1593  return ::writeToSection(firstSegmentIVal, offset, src, len);
1594 }
1595 
1596 class SectionSegmentPool&
1597 SimulatedBlock::getSectionSegmentPool(){
1598  return g_sectionSegmentPool;
1599 }
1600 
1601 NewVARIABLE *
1602 SimulatedBlock::allocateBat(int batSize){
1603  NewVARIABLE* bat = NewVarRef;
1604  bat = (NewVARIABLE*)realloc(bat, batSize * sizeof(NewVARIABLE));
1605  NewVarRef = bat;
1606  theBATSize = batSize;
1607  return bat;
1608 }
1609 
1610 void
1611 SimulatedBlock::freeBat(){
1612  if(NewVarRef != 0){
1613  free(NewVarRef);
1614  NewVarRef = 0;
1615  }
1616 }
1617 
1618 const NewVARIABLE *
1619 SimulatedBlock::getBat(Uint16 blockNo, Uint32 instanceNo){
1620  assert(blockNo == blockToMain(blockNo));
1621  SimulatedBlock * sb = globalData.getBlock(blockNo);
1622  if (sb != 0 && instanceNo != 0)
1623  sb = sb->getInstance(instanceNo);
1624  if(sb == 0)
1625  return 0;
1626  return sb->NewVarRef;
1627 }
1628 
1629 Uint16
1630 SimulatedBlock::getBatSize(Uint16 blockNo, Uint32 instanceNo){
1631  assert(blockNo == blockToMain(blockNo));
1632  SimulatedBlock * sb = globalData.getBlock(blockNo);
1633  if (sb != 0 && instanceNo != 0)
1634  sb = sb->getInstance(instanceNo);
1635  if(sb == 0)
1636  return 0;
1637  return sb->theBATSize;
1638 }
1639 
1640 void* SimulatedBlock::allocRecord(const char * type, size_t s, size_t n, bool clear, Uint32 paramId)
1641 {
1642  return allocRecordAligned(type, s, n, 0, 0, clear, paramId);
1643 }
1644 
1645 void*
1646 SimulatedBlock::allocRecordAligned(const char * type, size_t s, size_t n, void **unaligned_buffer, Uint32 align, bool clear, Uint32 paramId)
1647 {
1648 
1649  void * p = NULL;
1650  Uint32 over_alloc = unaligned_buffer ? (align - 1) : 0;
1651  size_t size = n*s + over_alloc;
1652  Uint64 real_size = (Uint64)((Uint64)n)*((Uint64)s) + over_alloc;
1653  refresh_watch_dog(9);
1654  if (real_size > 0){
1655 #ifdef VM_TRACE_MEM
1656  ndbout_c("%s::allocRecord(%s, %u, %u) = %llu bytes",
1657  getBlockName(number()),
1658  type,
1659  s,
1660  n,
1661  real_size);
1662 #endif
1663  if( real_size == (Uint64)size )
1664  p = ndbd_malloc(size);
1665  if (p == NULL){
1666  char buf1[255];
1667  char buf2[255];
1668  struct ndb_mgm_param_info param_info;
1669  size_t size = sizeof(ndb_mgm_param_info);
1670 
1671  if(0 != paramId && 0 == ndb_mgm_get_db_parameter_info(paramId, &param_info, &size)) {
1672  BaseString::snprintf(buf1, sizeof(buf1), "%s could not allocate memory for parameter %s",
1673  getBlockName(number()), param_info.m_name);
1674  } else {
1675  BaseString::snprintf(buf1, sizeof(buf1), "%s could not allocate memory for %s",
1676  getBlockName(number()), type);
1677  }
1678  BaseString::snprintf(buf2, sizeof(buf2), "Requested: %ux%u = %llu bytes",
1679  (Uint32)s, (Uint32)n, (Uint64)real_size);
1680  ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, buf1, buf2);
1681  }
1682 
1683  if(clear){
1684  char * ptr = (char*)p;
1685  const Uint32 chunk = 128 * 1024;
1686  while(size > chunk){
1687  refresh_watch_dog(9);
1688  memset(ptr, 0, chunk);
1689  ptr += chunk;
1690  size -= chunk;
1691  }
1692  refresh_watch_dog(9);
1693  memset(ptr, 0, size);
1694  }
1695  if (unaligned_buffer)
1696  {
1697  *unaligned_buffer = p;
1698  p = (void *)(((UintPtr)p + over_alloc) & ~(UintPtr)(over_alloc));
1699 #ifdef VM_TRACE
1700  g_eventLogger->info("'%s' (%u) %llu %llu, alignment correction %u bytes",
1701  type, align, (Uint64)p, (Uint64)p+n*s,
1702  (Uint32)((UintPtr)p - (UintPtr)*unaligned_buffer));
1703 #endif
1704  }
1705  }
1706  return p;
1707 }
1708 
1709 void
1711  const char * type, size_t s, size_t n){
1712  (void)type;
1713 
1714  if(* ptr != 0){
1715  ndbd_free(* ptr, n*s);
1716  * ptr = 0;
1717  }
1718 }
1719 
1720 int
1721 SimulatedBlock::sortchunks(const void * _e0, const void * _e1)
1722 {
1723  const AllocChunk *p0 = (const AllocChunk*)_e0;
1724  const AllocChunk *p1 = (const AllocChunk*)_e1;
1725 
1726  if (p0->ptrI > p1->ptrI)
1727  return 1;
1728  if (p0->ptrI < p1->ptrI)
1729  return -1;
1730  return 0;
1731 }
1732 
1733 Uint32
1734 SimulatedBlock::allocChunks(AllocChunk dst[],
1735  Uint32 arraysize,
1736  Uint32 rg,
1737  Uint32 pages,
1738  Uint32 paramId)
1739 {
1740  const Uint32 save = pages; // For fail
1741  Uint32 i = 0;
1742  for (; i<arraysize && pages > 0; i++)
1743  {
1744  Uint32 cnt = pages;
1745  m_ctx.m_mm.alloc_pages(rg, &dst[i].ptrI, &cnt, 1);
1746  if (unlikely(cnt == 0))
1747  goto fail;
1748  pages -= cnt;
1749  dst[i].cnt = cnt;
1750  }
1751  if (unlikely(pages != 0))
1752  goto fail;
1753 
1754  qsort(dst, i, sizeof(dst[0]), sortchunks);
1755  return i;
1756 
1757 fail:
1758  char buf1[255];
1759  char buf2[255];
1760  struct ndb_mgm_param_info param_info;
1761  size_t size = sizeof(ndb_mgm_param_info);
1762 
1763  if (ndb_mgm_get_db_parameter_info(paramId, &param_info, &size) != 0)
1764  {
1765  ndbassert(false);
1766  param_info.m_name = "<unknown>";
1767  }
1768 
1769  BaseString::snprintf(buf1, sizeof(buf1),
1770  "%s could not allocate memory for parameter %s",
1771  getBlockName(number()), param_info.m_name);
1772  BaseString::snprintf(buf2, sizeof(buf2), "Requested: %llu bytes",
1773  Uint64(save) * sizeof(GlobalPage));
1774  ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, buf1, buf2);
1775  return 0;
1776 }
1777 
1778 void
1780 {
1781 #ifdef NDBD_MULTITHREADED
1782  (*m_watchDogCounter) = place;
1783 #else
1784  globalData.incrementWatchDogCounter(place);
1785 #endif
1786 }
1787 
1788 void
1789 SimulatedBlock::update_watch_dog_timer(Uint32 interval)
1790 {
1791  extern EmulatorData globalEmulatorData;
1792  globalEmulatorData.theWatchDog->setCheckInterval(interval);
1793 }
1794 
1795 void
1796 SimulatedBlock::progError(int line, int err_code, const char* extra) const {
1797  jamLine(line);
1798 
1799  const char *aBlockName = getBlockName(number(), "VM Kernel");
1800 
1801  // Pack status of interesting config variables
1802  // so that we can print them in error.log
1803  int magicStatus =
1804  (m_ctx.m_config.stopOnError()<<1) +
1805  (m_ctx.m_config.getInitialStart()<<2);
1806 
1807  /* Add line number to block name */
1808  char buf[100];
1809  BaseString::snprintf(&buf[0], 100, "%s (Line: %d) 0x%.8x",
1810  aBlockName, line, magicStatus);
1811 
1812  ErrorReporter::handleError(err_code, extra, buf);
1813 
1814 }
1815 
1816 void
1817 SimulatedBlock::infoEvent(const char * msg, ...) const {
1818  if(msg == 0)
1819  return;
1820 
1821  SignalT<25> signalT;
1822  signalT.theData[0] = NDB_LE_InfoEvent;
1823  char * buf = (char *)(signalT.theData+1);
1824 
1825  va_list ap;
1826  va_start(ap, msg);
1827  BaseString::vsnprintf(buf, 96, msg, ap); // 96 = 100 - 4
1828  va_end(ap);
1829 
1830  int len = strlen(buf) + 1;
1831  if(len > 96){
1832  len = 96;
1833  buf[95] = 0;
1834  }
1835 
1839  memset(&signalT.header, 0, sizeof(SignalHeader));
1840 
1841  const Signal * signal = globalScheduler.getVMSignals();
1842  Uint32 tTrace = signal->header.theTrace;
1843  Uint32 tSignalId = signal->header.theSignalId;
1844 
1845  signalT.header.theVerId_signalNumber = GSN_EVENT_REP;
1846  signalT.header.theReceiversBlockNumber = CMVMI;
1847  signalT.header.theSendersBlockRef = reference();
1848  signalT.header.theTrace = tTrace;
1849  signalT.header.theSignalId = tSignalId;
1850  signalT.header.theLength = ((len+3)/4)+1;
1851 
1852 #ifdef NDBD_MULTITHREADED
1853  sendlocal(m_threadId,
1854  &signalT.header, signalT.theData, signalT.m_sectionPtrI);
1855 #else
1856  globalScheduler.execute(&signalT.header, JBB, signalT.theData,
1857  signalT.m_sectionPtrI);
1858 #endif
1859 }
1860 
1861 void
1862 SimulatedBlock::warningEvent(const char * msg, ...) const {
1863  if(msg == 0)
1864  return;
1865 
1866  SignalT<25> signalT;
1867  signalT.theData[0] = NDB_LE_WarningEvent;
1868  char * buf = (char *)(signalT.theData+1);
1869 
1870  va_list ap;
1871  va_start(ap, msg);
1872  BaseString::vsnprintf(buf, 96, msg, ap); // 96 = 100 - 4
1873  va_end(ap);
1874 
1875  int len = strlen(buf) + 1;
1876  if(len > 96){
1877  len = 96;
1878  buf[95] = 0;
1879  }
1880 
1884  memset(&signalT.header, 0, sizeof(SignalHeader));
1885 
1886  const Signal * signal = globalScheduler.getVMSignals();
1887  Uint32 tTrace = signal->header.theTrace;
1888  Uint32 tSignalId = signal->header.theSignalId;
1889 
1890  signalT.header.theVerId_signalNumber = GSN_EVENT_REP;
1891  signalT.header.theReceiversBlockNumber = CMVMI;
1892  signalT.header.theSendersBlockRef = reference();
1893  signalT.header.theTrace = tTrace;
1894  signalT.header.theSignalId = tSignalId;
1895  signalT.header.theLength = ((len+3)/4)+1;
1896 
1897 #ifdef NDBD_MULTITHREADED
1898  sendlocal(m_threadId,
1899  &signalT.header, signalT.theData, signalT.m_sectionPtrI);
1900 #else
1901  globalScheduler.execute(&signalT.header, JBB, signalT.theData,
1902  signalT.m_sectionPtrI);
1903 #endif
1904 }
1905 
1906 void
1907 SimulatedBlock::execNODE_STATE_REP(Signal* signal){
1908  const NodeStateRep * const rep = (NodeStateRep *)&signal->theData[0];
1909 
1910  this->theNodeState = rep->nodeState;
1911 }
1912 
1913 void
1915  const ChangeNodeStateReq * const req =
1916  (ChangeNodeStateReq *)&signal->theData[0];
1917 
1918  this->theNodeState = req->nodeState;
1919  const Uint32 senderData = req->senderData;
1920  const BlockReference senderRef = req->senderRef;
1921 
1925  ChangeNodeStateConf * const conf =
1926  (ChangeNodeStateConf *)&signal->theData[0];
1927 
1928  conf->senderData = senderData;
1929 
1930  sendSignal(senderRef, GSN_CHANGE_NODE_STATE_CONF, signal,
1931  ChangeNodeStateConf::SignalLength, JBB);
1932 }
1933 
1934 void
1935 SimulatedBlock::execNDB_TAMPER(Signal * signal){
1936  if (signal->getLength() == 1)
1937  {
1938  SET_ERROR_INSERT_VALUE(signal->theData[0]);
1939  }
1940  else
1941  {
1942  SET_ERROR_INSERT_VALUE2(signal->theData[0], signal->theData[1]);
1943  }
1944 }
1945 
1946 void
1947 SimulatedBlock::execSIGNAL_DROPPED_REP(Signal * signal){
1948  /* Note no need for fragmented signal handling as we are
1949  * going to crash this node
1950  */
1951  char msg[64];
1952  const SignalDroppedRep * const rep = (SignalDroppedRep *)&signal->theData[0];
1953  BaseString::snprintf(msg, sizeof(msg), "%s GSN: %u (%u,%u)", getBlockName(number()),
1954  rep->originalGsn, rep->originalLength,rep->originalSectionCount);
1955  ErrorReporter::handleError(NDBD_EXIT_OUT_OF_LONG_SIGNAL_MEMORY,
1956  msg,
1957  __FILE__,
1958  NST_ErrorHandler);
1959 }
1960 
1961 void
1962 SimulatedBlock::execCONTINUE_FRAGMENTED(Signal * signal){
1963  ljamEntry();
1964 
1965  ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
1966  ndbrequire(signal->getSendersBlockRef() == reference()); /* Paranoia */
1967 
1968  switch (sig->type)
1969  {
1970  case ContinueFragmented::CONTINUE_SENDING :
1971  {
1972  ljam();
1973  Ptr<FragmentSendInfo> fragPtr;
1974 
1975  c_segmentedFragmentSendList.first(fragPtr);
1976  for(; !fragPtr.isNull();){
1977  ljam();
1978  Ptr<FragmentSendInfo> copyPtr = fragPtr;
1979  c_segmentedFragmentSendList.next(fragPtr);
1980 
1981  sendNextSegmentedFragment(signal, * copyPtr.p);
1982  if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
1983  ljam();
1984  if(copyPtr.p->m_callback.m_callbackFunction != 0) {
1985  ljam();
1986  execute(signal, copyPtr.p->m_callback, 0);
1987  }//if
1988  c_segmentedFragmentSendList.release(copyPtr);
1989  }
1990  }
1991 
1992  c_linearFragmentSendList.first(fragPtr);
1993  for(; !fragPtr.isNull();){
1994  ljam();
1995  Ptr<FragmentSendInfo> copyPtr = fragPtr;
1996  c_linearFragmentSendList.next(fragPtr);
1997 
1998  sendNextLinearFragment(signal, * copyPtr.p);
1999  if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
2000  ljam();
2001  if(copyPtr.p->m_callback.m_callbackFunction != 0) {
2002  ljam();
2003  execute(signal, copyPtr.p->m_callback, 0);
2004  }//if
2005  c_linearFragmentSendList.release(copyPtr);
2006  }
2007  }
2008 
2009  if(c_segmentedFragmentSendList.isEmpty() &&
2010  c_linearFragmentSendList.isEmpty()){
2011  ljam();
2012  c_fragSenderRunning = false;
2013  return;
2014  }
2015 
2016  sig->type = ContinueFragmented::CONTINUE_SENDING;
2017  sig->line = __LINE__;
2018  sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
2019  break;
2020  }
2021  case ContinueFragmented::CONTINUE_CLEANUP:
2022  {
2023  ljam();
2024 
2025  const Uint32 callbackWords = (sizeof(Callback) + 3) >> 2;
2026  /* Check length of signal */
2027  ndbassert(signal->getLength() ==
2028  ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS +
2029  callbackWords);
2030 
2031  Callback cb;
2032  memcpy(&cb, &sig->cleanup.callbackStart, callbackWords << 2);
2033 
2034  doNodeFailureCleanup(signal,
2035  sig->cleanup.failedNodeId,
2036  sig->cleanup.resource,
2037  sig->cleanup.cursor,
2038  sig->cleanup.elementsCleaned,
2039  cb);
2040  break;
2041  }
2042  default:
2043  ndbrequire(false);
2044  }
2045 }
2046 
2047 void
2048 SimulatedBlock::execSTOP_FOR_CRASH(Signal* signal)
2049 {
2050 #ifdef NDBD_MULTITHREADED
2051  mt_execSTOP_FOR_CRASH();
2052 #endif
2053 }
2054 
2055 void
2056 SimulatedBlock::execNODE_START_REP(Signal* signal)
2057 {
2058 }
2059 
2060 void
2061 SimulatedBlock::execAPI_START_REP(Signal* signal)
2062 {
2063 }
2064 
2065 void
2066 SimulatedBlock::execSEND_PACKED(Signal* signal)
2067 {
2068 }
2069 
2070 // MT LQH callback CONF via signal
2071 
2073 SimulatedBlock::getCallbackEntry(Uint32 ci)
2074 {
2075  ndbrequire(m_callbackTableAddr != 0);
2076  const CallbackTable& ct = *m_callbackTableAddr;
2077  ndbrequire(ci < ct.m_count);
2078  return ct.m_entry[ci];
2079 }
2080 
2081 void
2082 SimulatedBlock::sendCallbackConf(Signal* signal, Uint32 fullBlockNo,
2083  CallbackPtr& cptr, Uint32 returnCode)
2084 {
2085  Uint32 blockNo = blockToMain(fullBlockNo);
2086  Uint32 instanceNo = blockToInstance(fullBlockNo);
2087  SimulatedBlock* b = globalData.getBlock(blockNo, instanceNo);
2088  ndbrequire(b != 0);
2089 
2090  const CallbackEntry& ce = b->getCallbackEntry(cptr.m_callbackIndex);
2091 
2092  // wl4391_todo add as arg if this is not enough
2093  Uint32 senderData = returnCode;
2094 
2095  if (!isNdbMtLqh()) {
2096  Callback c;
2097  c.m_callbackFunction = ce.m_function;
2098  c.m_callbackData = cptr.m_callbackData;
2099  b->execute(signal, c, returnCode);
2100 
2101  if (ce.m_flags & CALLBACK_ACK) {
2102  jam();
2103  CallbackAck* ack = (CallbackAck*)signal->getDataPtrSend();
2104  ack->senderData = senderData;
2105  EXECUTE_DIRECT(number(), GSN_CALLBACK_ACK,
2106  signal, CallbackAck::SignalLength);
2107  }
2108  } else {
2109  CallbackConf* conf = (CallbackConf*)signal->getDataPtrSend();
2110  conf->senderData = senderData;
2111  conf->senderRef = reference();
2112  conf->callbackIndex = cptr.m_callbackIndex;
2113  conf->callbackData = cptr.m_callbackData;
2114  conf->returnCode = returnCode;
2115 
2116  if (ce.m_flags & CALLBACK_DIRECT) {
2117  jam();
2118  EXECUTE_DIRECT(blockNo, GSN_CALLBACK_CONF,
2119  signal, CallbackConf::SignalLength, instanceNo);
2120  } else {
2121  jam();
2122  BlockReference ref = numberToRef(fullBlockNo, getOwnNodeId());
2123  sendSignal(ref, GSN_CALLBACK_CONF,
2124  signal, CallbackConf::SignalLength, JBB);
2125  }
2126  }
2127  cptr.m_callbackIndex = ZNIL;
2128 }
2129 
2130 void
2131 SimulatedBlock::execCALLBACK_CONF(Signal* signal)
2132 {
2133  const CallbackConf* conf = (const CallbackConf*)signal->getDataPtr();
2134 
2135  Uint32 senderData = conf->senderData;
2136  Uint32 senderRef = conf->senderRef;
2137 
2138  ndbrequire(m_callbackTableAddr != 0);
2139  const CallbackEntry& ce = getCallbackEntry(conf->callbackIndex);
2140  CallbackFunction function = ce.m_function;
2141 
2142  Callback callback;
2143  callback.m_callbackFunction = function;
2144  callback.m_callbackData = conf->callbackData;
2145  execute(signal, callback, conf->returnCode);
2146 
2147  if (ce.m_flags & CALLBACK_ACK) {
2148  jam();
2149  CallbackAck* ack = (CallbackAck*)signal->getDataPtrSend();
2150  ack->senderData = senderData;
2151  sendSignal(senderRef, GSN_CALLBACK_ACK,
2152  signal, CallbackAck::SignalLength, JBB);
2153  }
2154 }
2155 
2156 #ifdef VM_TRACE_TIME
2157 void
2158 SimulatedBlock::clearTimes() {
2159  for(Uint32 i = 0; i <= MAX_GSN; i++){
2160  m_timeTrace[i].cnt = 0;
2161  m_timeTrace[i].sum = 0;
2162  m_timeTrace[i].sub = 0;
2163  }
2164 }
2165 
2166 void
2167 SimulatedBlock::printTimes(FILE * output){
2168  fprintf(output, "-- %s --\n", getBlockName(number()));
2169  Uint64 sum = 0;
2170  for(Uint32 i = 0; i <= MAX_GSN; i++){
2171  Uint32 n = m_timeTrace[i].cnt;
2172  if(n != 0){
2173  double dn = n;
2174 
2175  double avg = m_timeTrace[i].sum;
2176  double avg2 = avg - m_timeTrace[i].sub;
2177 
2178  avg /= dn;
2179  avg2 /= dn;
2180 
2181  fprintf(output,
2182  //name ; cnt ; loc ; acc
2183  "%s ; #%d ; %dus ; %dus ; %dms\n",
2184  getSignalName(i), n, (Uint32)avg, (Uint32)avg2,
2185  (Uint32)((m_timeTrace[i].sum - m_timeTrace[i].sub + 500)/ 1000));
2186 
2187  sum += (m_timeTrace[i].sum - m_timeTrace[i].sub);
2188  }
2189  }
2190  sum = (sum + 500)/ 1000;
2191  fprintf(output, "-- %s : %u --\n", getBlockName(number()), (Uint32)sum);
2192  fprintf(output, "\n");
2193  fflush(output);
2194 }
2195 
2196 #endif
2197 
2198 SimulatedBlock::FragmentInfo::FragmentInfo(Uint32 fragId, Uint32 sender){
2199  m_fragmentId = fragId;
2200  m_senderRef = sender;
2201  m_sectionPtrI[0] = RNIL;
2202  m_sectionPtrI[1] = RNIL;
2203  m_sectionPtrI[2] = RNIL;
2204 }
2205 
2206 SimulatedBlock::FragmentSendInfo::FragmentSendInfo()
2207 {
2208 }
2209 
2210 bool
2212  Uint32 sigLen = signal->length() - 1;
2213  Uint32 fragId = signal->theData[sigLen];
2214  Uint32 fragInfo = signal->header.m_fragmentInfo;
2215  Uint32 senderRef = signal->getSendersBlockRef();
2216 
2217  Uint32 *sectionPtr = signal->m_sectionPtrI;
2218 
2219  if(fragInfo == 0){
2220  return true;
2221  }
2222 
2223  const Uint32 secs = signal->header.m_noOfSections;
2224  const Uint32 * const secNos = &signal->theData[sigLen - secs];
2225 
2226  if(fragInfo == 1){
2230  Ptr<FragmentInfo> fragPtr;
2231  if(!c_fragmentInfoHash.seize(fragPtr)){
2232  ndbrequire(false);
2233  return false;
2234  }
2235 
2236  new (fragPtr.p)FragmentInfo(fragId, senderRef);
2237  c_fragmentInfoHash.add(fragPtr);
2238 
2239  for(Uint32 i = 0; i<secs; i++){
2240  Uint32 sectionNo = secNos[i];
2241  ndbassert(sectionNo < 3);
2242  fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtr[i];
2243  }
2244 
2245  ndbassert(! fragPtr.p->isDropped() );
2246 
2250  signal->header.m_fragmentInfo = 0;
2251  signal->header.m_noOfSections = 0;
2252  return false;
2253  }
2254 
2255  FragmentInfo key(fragId, senderRef);
2256  Ptr<FragmentInfo> fragPtr;
2257  if(c_fragmentInfoHash.find(fragPtr, key)){
2258 
2262  if ( likely(! fragPtr.p->isDropped()) )
2263  {
2264  Uint32 i;
2265  for(i = 0; i<secs; i++){
2266  Uint32 sectionNo = secNos[i];
2267  ndbassert(sectionNo < 3);
2268  Uint32 sectionPtrI = sectionPtr[i];
2269  if(fragPtr.p->m_sectionPtrI[sectionNo] != RNIL){
2270  linkSegments(fragPtr.p->m_sectionPtrI[sectionNo], sectionPtrI);
2271  } else {
2272  fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtrI;
2273  }
2274  }
2275 
2279  if(fragInfo == 2){
2280  signal->header.m_fragmentInfo = 0;
2281  signal->header.m_noOfSections = 0;
2282  return false;
2283  }
2284 
2288  for(i = 0; i<3; i++){
2289  Uint32 ptrI = fragPtr.p->m_sectionPtrI[i];
2290  if(ptrI != RNIL){
2291  signal->m_sectionPtrI[i] = ptrI;
2292  } else {
2293  break;
2294  }
2295  }
2296 
2297  signal->setLength(sigLen - secs);
2298  signal->header.m_noOfSections = i;
2299  signal->header.m_fragmentInfo = 0;
2300 
2301  c_fragmentInfoHash.release(fragPtr);
2302  return true;
2303  }
2304  else
2305  {
2306  /* This fragmented signal has already had at least 1 fragment
2307  * dropped. We must release the received segments.
2308  */
2309  for (Uint32 i=0; i < secs; i++)
2310  releaseSection( sectionPtr[i] );
2311 
2312  signal->header.m_fragmentInfo = 0;
2313  signal->header.m_noOfSections = 0;
2314 
2315  /* FragInfo == 2
2316  * More fragments to come, keep waiting
2317  */
2318  if (fragInfo == 2)
2319  return false;
2320 
2321  /* FragInfo == 3
2322  * That was the last fragment.
2323  * We're now ready for handling the dropped signal.
2324  */
2325  SignalDroppedRep * rep = (SignalDroppedRep*)signal->theData;
2326  Uint32 gsn = signal->header.theVerId_signalNumber;
2327  Uint32 len = signal->header.theLength;
2328  Uint32 newLen= (len > 22 ? 22 : len);
2329  memmove(rep->originalData, signal->theData, (4 * newLen));
2330  rep->originalGsn = gsn;
2331  rep->originalLength = len;
2332  rep->originalSectionCount = 0;
2333  signal->header.theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
2334  signal->header.theLength = newLen + 3;
2335  signal->header.m_noOfSections = 0;
2336  signal->header.m_fragmentInfo = 3;
2337 
2338 
2343  /* Perform dropped signal handling, in this thread, now */
2344  executeFunction(GSN_SIGNAL_DROPPED_REP, signal);
2345 
2346  /* return false to caller - they should not process the signal */
2347  return false;
2348  } // else (isDropped())
2349  }
2350 
2354  ndbrequire(false);
2355  return false;
2356 }
2357 
2358 bool
2360 {
2361  /* This method is called at the start of a SIGNAL_DROPPED_REP
2362  * handler when there is a chance that the dropped signal could
2363  * be part of a fragmented signal.
2364  * If the dropped signal was a fragmented signal, this
2365  * needs to be handled specially to ensure that fragments
2366  * of the signal are correctly dropped to avoid segment
2367  * leaks etc.
2368  * There are a number of cases :
2369  * 1) First fragment dropped (FragInfo=1)
2370  * All remaining fragments must be dropped when they
2371  * arrive. The Signal dropped report handler must be
2372  * executed when the last fragment has arrived.
2373  * 2) Middle fragment dropped (FragInfo=2)
2374  * Any existing stored segments must be released.
2375  * All remaining fragments must be dropped when they
2376  * arrive.
2377  * 3) Last fragment dropped (FragInfo=3)
2378  * Any existing stored segments must be released.
2379  * Signal Dropped handling can occur, so return true.
2380  *
2381  * To indicate that a fragment has been dropped for a signal,
2382  * all the section I Values in the fragment's hash entry are
2383  * set to RNIL.
2384  * Signal Dropped Report handling is performed when the last
2385  * fragment arrives. If the last fragment is not dropped
2386  * by the transporter layer then normal fragment assembly
2387  * arranges for dropped signal handling to occur.
2388  */
2389  Uint32 sigLen = signal->length() - 1;
2390  Uint32 fragId = signal->theData[sigLen];
2391  Uint32 fragInfo = signal->header.m_fragmentInfo;
2392  Uint32 senderRef = signal->getSendersBlockRef();
2393 
2394  if(fragInfo == 0){
2395  return true;
2396  }
2397 
2398  /* This method is for handling SIGNAL_DROPPED_REP only */
2399  ndbrequire(signal->header.theVerId_signalNumber == GSN_SIGNAL_DROPPED_REP);
2400  ndbrequire(signal->header.m_noOfSections == 0);
2401 
2402  if(fragInfo == 1){
2406  Ptr<FragmentInfo> fragPtr;
2407  if(!c_fragmentInfoHash.seize(fragPtr)){
2408  ndbrequire(false);
2409  return false;
2410  }
2411 
2412  new (fragPtr.p)FragmentInfo(fragId, senderRef);
2413  c_fragmentInfoHash.add(fragPtr);
2414 
2415  /* Mark entry in hash as belonging to dropped signal so subsequent
2416  * fragments can also be dropped
2417  */
2418  fragPtr.p->m_sectionPtrI[0]= RNIL;
2419  fragPtr.p->m_sectionPtrI[1]= RNIL;
2420  fragPtr.p->m_sectionPtrI[2]= RNIL;
2421 
2422  /* Wait for last fragment before SignalDroppedRep handling */
2423  signal->header.m_fragmentInfo = 0;
2424  return false;
2425  }
2426 
2427  FragmentInfo key(fragId, senderRef);
2428  Ptr<FragmentInfo> fragPtr;
2429  if(c_fragmentInfoHash.find(fragPtr, key)){
2430 
2434  if (! fragPtr.p->isDropped() )
2435  {
2436  /* Fragmented Signal not already marked as dropped
2437  * Need to free stored segments
2438  */
2439  releaseSection(fragPtr.p->m_sectionPtrI[0]);
2440  releaseSection(fragPtr.p->m_sectionPtrI[1]);
2441  releaseSection(fragPtr.p->m_sectionPtrI[2]);
2442 
2443  /* Mark as dropped now */
2444  fragPtr.p->m_sectionPtrI[0]= RNIL;
2445  fragPtr.p->m_sectionPtrI[1]= RNIL;
2446  fragPtr.p->m_sectionPtrI[2]= RNIL;
2447 
2448  ndbassert( fragPtr.p->isDropped() );
2449  }
2450 
2456  if(fragInfo == 2){
2457  signal->header.m_fragmentInfo = 0;
2458  return false;
2459  }
2460 
2467  signal->header.m_fragmentInfo = 0;
2468 
2469  c_fragmentInfoHash.release(fragPtr);
2470  return true;
2471  }
2472 
2476  ndbrequire(false);
2477  return false;
2478 }
2479 
2503 bool
2504 SimulatedBlock::doCleanupFragInfo(Uint32 failedNodeId,
2505  Uint32& cursor,
2506  Uint32& rtUnitsUsed,
2507  Uint32& elementsCleaned)
2508 {
2509  ljam();
2511 
2512  c_fragmentInfoHash.next(cursor, iter);
2513 
2514  const Uint32 startBucket = iter.bucket;
2515 
2516  while (!iter.isNull() &&
2517  (iter.bucket == startBucket))
2518  {
2519  ljam();
2520 
2521  Ptr<FragmentInfo> curr = iter.curr;
2522  c_fragmentInfoHash.next(iter);
2523 
2524  FragmentInfo* fragInfo = curr.p;
2525 
2526  if (refToNode(fragInfo->m_senderRef) == failedNodeId)
2527  {
2528  ljam();
2529  /* We were assembling a fragmented signal from the
2530  * failed node, discard the partially assembled
2531  * sections and free the FragmentInfo hash entry
2532  */
2533  for(Uint32 s = 0; s<3; s++)
2534  {
2535  if (fragInfo->m_sectionPtrI[s] != RNIL)
2536  {
2537  ljam();
2538  SegmentedSectionPtr ssptr;
2539  getSection(ssptr, fragInfo->m_sectionPtrI[s]);
2540  release(ssptr);
2541  }
2542  }
2543 
2544  /* Release FragmentInfo hash element */
2545  c_fragmentInfoHash.release(curr);
2546 
2547  elementsCleaned++;
2548  rtUnitsUsed+=3;
2549  }
2550 
2551  rtUnitsUsed++;
2552  } // while
2553 
2554  cursor = iter.bucket;
2555  return iter.isNull();
2556 }
2557 
2558 bool
2559 SimulatedBlock::doCleanupFragSend(Uint32 failedNodeId,
2560  Uint32& cursor,
2561  Uint32& rtUnitsUsed,
2562  Uint32& elementsCleaned)
2563 {
2564  ljam();
2565 
2566  Ptr<FragmentSendInfo> fragPtr;
2567  const Uint32 NumSendLists = 2;
2568  ndbrequire(cursor < NumSendLists);
2569 
2570  DLList<FragmentSendInfo>* fragSendLists[ NumSendLists ] =
2571  { &c_segmentedFragmentSendList,
2572  &c_linearFragmentSendList };
2573 
2574  DLList<FragmentSendInfo>* list = fragSendLists[ cursor ];
2575 
2576  list->first(fragPtr);
2577  for(; !fragPtr.isNull();){
2578  ljam();
2579  Ptr<FragmentSendInfo> copyPtr = fragPtr;
2580  list->next(fragPtr);
2581  rtUnitsUsed++;
2582 
2583  NodeReceiverGroup& rg = copyPtr.p->m_nodeReceiverGroup;
2584 
2585  if (rg.m_nodes.get(failedNodeId))
2586  {
2587  ljam();
2588  /* Fragmented signal is being sent to node */
2589  rg.m_nodes.clear(failedNodeId);
2590 
2591  if (rg.m_nodes.isclear())
2592  {
2593  ljam();
2594  /* No other nodes in receiver group - send
2595  * is cancelled
2596  * Will be cleaned up in the usual CONTINUE_FRAGMENTED
2597  * handling code.
2598  */
2599  copyPtr.p->m_status = FragmentSendInfo::SendCancelled;
2600  }
2601  elementsCleaned++;
2602  }
2603  }
2604 
2605  /* Next time we'll do the next list */
2606  cursor++;
2607 
2608  return (cursor == NumSendLists);
2609 }
2610 
2611 
2612 Uint32
2613 SimulatedBlock::doNodeFailureCleanup(Signal* signal,
2614  Uint32 failedNodeId,
2615  Uint32 resource,
2616  Uint32 cursor,
2617  Uint32 elementsCleaned,
2618  Callback& cb)
2619 {
2620  ljam();
2621  const bool userCallback = (cb.m_callbackFunction != 0);
2622  const Uint32 maxRtUnits = userCallback ?
2623 #ifdef VM_TRACE
2624  2 :
2625 #else
2626  16 :
2627 #endif
2628  ~0; /* Must complete all processing in this call */
2629 
2630  Uint32 rtUnitsUsed = 0;
2631 
2632  /* Loop over resources, cleaning them up */
2633  do
2634  {
2635  bool resourceDone= false;
2636  switch(resource) {
2637  case ContinueFragmented::RES_FRAGSEND:
2638  {
2639  ljam();
2640  resourceDone = doCleanupFragSend(failedNodeId, cursor,
2641  rtUnitsUsed, elementsCleaned);
2642  break;
2643  }
2644  case ContinueFragmented::RES_FRAGINFO:
2645  {
2646  ljam();
2647  resourceDone = doCleanupFragInfo(failedNodeId, cursor,
2648  rtUnitsUsed, elementsCleaned);
2649  break;
2650  }
2651  case ContinueFragmented::RES_LAST:
2652  {
2653  ljam();
2654  /* Node failure processing complete, execute user callback if provided */
2655  if (userCallback)
2656  execute(signal, cb, elementsCleaned);
2657 
2658  return elementsCleaned;
2659  }
2660  default:
2661  ndbrequire(false);
2662  }
2663 
2664  /* Did we complete cleaning up this resource? */
2665  if (resourceDone)
2666  {
2667  resource++;
2668  cursor= 0;
2669  }
2670 
2671  } while (rtUnitsUsed <= maxRtUnits);
2672 
2673  ljam();
2674 
2675  /* Not yet completed failure handling.
2676  * Must have exhausted RT units.
2677  * Update cursor and re-invoke
2678  */
2679  ndbassert(userCallback);
2680 
2681  /* Send signal to continue processing */
2682 
2683  ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
2684  sig->type = ContinueFragmented::CONTINUE_CLEANUP;
2685  sig->cleanup.failedNodeId = failedNodeId;
2686  sig->cleanup.resource = resource;
2687  sig->cleanup.cursor = cursor;
2688  sig->cleanup.elementsCleaned= elementsCleaned;
2689  Uint32 callbackWords = (sizeof(Callback) + 3) >> 2;
2690  Uint32 sigLen = ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS +
2691  callbackWords;
2692  ndbassert(sigLen <= 25); // Should be STATIC_ASSERT
2693  memcpy(&sig->cleanup.callbackStart, &cb, callbackWords << 2);
2694 
2695  sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, sigLen, JBB);
2696 
2697  return elementsCleaned;
2698 }
2699 
2700 Uint32
2702  Uint32 failedNodeId,
2703  Callback& cb)
2704 {
2705  ljam();
2706  return doNodeFailureCleanup(signal, failedNodeId, 0, 0, 0, cb);
2707 }
2708 
2709 Uint32
2710 SimulatedBlock::debugPrintFragmentCounts()
2711 {
2712  const char* blockName = getBlockName(theNumber);
2714  Uint32 fragmentInfoCount = 0;
2715  c_fragmentInfoHash.first(iter);
2716 
2717  while(!iter.isNull())
2718  {
2719  fragmentInfoCount++;
2720  c_fragmentInfoHash.next(iter);
2721  }
2722 
2724  Uint32 linSendInfoCount = 0;
2725 
2726  c_linearFragmentSendList.first(ptr);
2727 
2728  while (!ptr.isNull())
2729  {
2730  linSendInfoCount++;
2731  c_linearFragmentSendList.next(ptr);
2732  }
2733 
2734  Uint32 segSendInfoCount = 0;
2735  c_segmentedFragmentSendList.first(ptr);
2736 
2737  while (!ptr.isNull())
2738  {
2739  segSendInfoCount++;
2740  c_segmentedFragmentSendList.next(ptr);
2741  }
2742 
2743  ndbout_c("%s : Fragment assembly hash entry count : %d",
2744  blockName,
2745  fragmentInfoCount);
2746 
2747  ndbout_c("%s : Linear fragment send list size : %d",
2748  blockName,
2749  linSendInfoCount);
2750 
2751  ndbout_c("%s : Segmented fragment send list size : %d",
2752  blockName,
2753  segSendInfoCount);
2754 
2755  return fragmentInfoCount +
2756  linSendInfoCount +
2757  segSendInfoCount;
2758 }
2759 
2760 
2761 bool
2763  NodeReceiverGroup rg,
2764  GlobalSignalNumber gsn,
2765  Signal* signal,
2766  Uint32 length,
2767  JobBufferLevel jbuf,
2768  SectionHandle* sections,
2769  bool noRelease,
2770  Uint32 messageSize) {
2771 
2772  Uint32 noSections = sections->m_cnt;
2773  SegmentedSectionPtr * ptr = sections->m_ptr;
2774 
2775  info.m_sectionPtr[0].m_segmented.i = RNIL;
2776  info.m_sectionPtr[1].m_segmented.i = RNIL;
2777  info.m_sectionPtr[2].m_segmented.i = RNIL;
2778 
2779  Uint32 totalSize = 0;
2780  switch(noSections){
2781  case 3:
2782  info.m_sectionPtr[2].m_segmented.i = ptr[2].i;
2783  info.m_sectionPtr[2].m_segmented.p = ptr[2].p;
2784  totalSize += ptr[2].sz;
2785  case 2:
2786  info.m_sectionPtr[1].m_segmented.i = ptr[1].i;
2787  info.m_sectionPtr[1].m_segmented.p = ptr[1].p;
2788  totalSize += ptr[1].sz;
2789  case 1:
2790  info.m_sectionPtr[0].m_segmented.i = ptr[0].i;
2791  info.m_sectionPtr[0].m_segmented.p = ptr[0].p;
2792  totalSize += ptr[0].sz;
2793  }
2794 
2795  if(totalSize <= messageSize + SectionSegment::DataLength){
2799  if (noRelease)
2800  sendSignalNoRelease(rg, gsn, signal, length, jbuf, sections);
2801  else
2802  sendSignal(rg, gsn, signal, length, jbuf, sections);
2803 
2804  info.m_status = FragmentSendInfo::SendComplete;
2805  return true;
2806  }
2807 
2811  info.m_status = FragmentSendInfo::SendNotComplete;
2812  info.m_prio = (Uint8)jbuf;
2813  info.m_gsn = gsn;
2814  info.m_fragInfo = 1;
2815  info.m_flags = 0;
2816  info.m_messageSize = messageSize;
2817  info.m_fragmentId = c_fragmentIdCounter++;
2818  info.m_nodeReceiverGroup = rg;
2819  info.m_callback.m_callbackFunction = 0;
2820 
2821  if (noRelease)
2822  {
2823  /* Record info that we are not releasing segments */
2824  info.m_flags|= FragmentSendInfo::SendNoReleaseSeg;
2825  }
2826  else
2827  {
2832  sections->m_cnt = 0;
2833  }
2834 
2835  /* Store main signal data in a segment for sending later */
2836  Ptr<SectionSegment> tmp;
2837  if(!import(tmp, &signal->theData[0], length))
2838  {
2839  handle_out_of_longsignal_memory(0);
2840  return false;
2841  }
2842  info.m_theDataSection.p = &tmp.p->theData[0];
2843  info.m_theDataSection.sz = length;
2844  tmp.p->theData[length] = tmp.i;
2845 
2846  sendNextSegmentedFragment(signal, info);
2847 
2848  if(c_fragmentIdCounter == 0){
2852  c_fragmentIdCounter = 1;
2853  }
2854 
2855  return true;
2856 }
2857 
2858 #if 0
2859 #define lsout(x) x
2860 #else
2861 #define lsout(x)
2862 #endif
2863 
2864 void
2866  FragmentSendInfo & info){
2867 
2868  if (unlikely(info.m_status == FragmentSendInfo::SendCancelled))
2869  {
2870  /* Send was cancelled - all dest. nodes have failed
2871  * since send was started
2872  */
2873  if (0 == (info.m_flags & FragmentSendInfo::SendNoReleaseSeg))
2874  {
2875  /*
2876  * Free any sections still to be sent
2877  */
2878  SectionHandle handle(this);
2879  for (Uint32 s = 0; s < 3; s++)
2880  {
2881  Uint32 sectionI = info.m_sectionPtr[s].m_segmented.i;
2882  if (sectionI != RNIL)
2883  {
2884  getSection(handle.m_ptr[handle.m_cnt], sectionI);
2885  info.m_sectionPtr[s].m_segmented.i = RNIL;
2886  info.m_sectionPtr[s].m_segmented.p = NULL;
2887  handle.m_cnt++;
2888  }
2889  }
2890 
2891  releaseSections(handle);
2892  }
2893 
2894  /* Free inline signal data storage section */
2895  Uint32 inlineDataI = info.m_theDataSection.p[info.m_theDataSection.sz];
2896  g_sectionSegmentPool.release(SB_SP_REL_ARG inlineDataI);
2897 
2898  info.m_status = FragmentSendInfo::SendComplete;
2899  return;
2900  }
2901 
2905  const Uint32 sigLen = info.m_theDataSection.sz;
2906  memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen);
2907 
2908  Uint32 sz = 0;
2909  Uint32 maxSz = info.m_messageSize;
2910 
2911  Int32 secNo = 2;
2912  Uint32 secCount = 0;
2913  Uint32 * secNos = &signal->theData[sigLen];
2914 
2915  SectionHandle sections(this);
2916  SegmentedSectionPtr *ptr = sections.m_ptr;
2917 
2918  bool split= false;
2919  Uint32 splitSectionStartI= RNIL;
2920  SectionSegment* splitSectionStartP= NULL;
2921  Uint32 splitSectionLastSegment= RNIL;
2922  Uint32 splitSectionSz= 0;
2923 
2924  enum { Unknown = 0, Full = 1 } loop = Unknown;
2925  for(; secNo >= 0 && secCount < 3; secNo--){
2926  Uint32 ptrI = info.m_sectionPtr[secNo].m_segmented.i;
2927  if(ptrI == RNIL)
2928  continue;
2929 
2930  info.m_sectionPtr[secNo].m_segmented.i = RNIL;
2931 
2932  SectionSegment * ptrP = info.m_sectionPtr[secNo].m_segmented.p;
2933  const Uint32 size = ptrP->m_sz;
2934 
2935  ptr[secCount].i = ptrI;
2936  ptr[secCount].p = ptrP;
2937  ptr[secCount].sz = size;
2938  secNos[secCount] = secNo;
2939  secCount++;
2940 
2941  const Uint32 sizeLeft = maxSz - sz;
2942  if(size <= sizeLeft){
2946  sz += size;
2947  lsout(ndbout_c("section %d saved as %d", secNo, secCount-1));
2948  continue;
2949  }
2950 
2951  const Uint32 overflow = size - sizeLeft; // > 0
2952  if(overflow <= SectionSegment::DataLength){
2957  lsout(ndbout_c("section %d saved as %d but full over: %d",
2958  secNo, secCount-1, overflow));
2959  secNo--;
2960  break;
2961  }
2962 
2963  // size >= 61
2964  if(sizeLeft < SectionSegment::DataLength){
2969  secCount--;
2970  info.m_sectionPtr[secNo].m_segmented.i = ptrI;
2971  loop = Full;
2972  lsout(ndbout_c("section %d not saved", secNo));
2973  break;
2974  }
2975 
2983  // size >= 61 && sizeLeft >= 60
2984  Uint32 sum = SectionSegment::DataLength;
2985  Uint32 prevPtrI = ptrI;
2986  ptrI = ptrP->m_nextSegment;
2987  const Uint32 fill = sizeLeft - SectionSegment::DataLength;
2988  while(sum < fill){
2989  prevPtrI = ptrI;
2990  ptrP = g_sectionSegmentPool.getPtr(ptrI);
2991  ptrI = ptrP->m_nextSegment;
2992  sum += SectionSegment::DataLength;
2993  }
2994 
2995  Uint32 prev = secCount - 1;
3001  split= true;
3002  splitSectionStartI= ptr[prev].i;
3003  splitSectionStartP= ptr[prev].p;
3004  splitSectionLastSegment= splitSectionStartP->m_lastSegment;
3005  splitSectionSz= splitSectionStartP->m_sz;
3006 
3011  splitSectionStartP->m_lastSegment = prevPtrI;
3012  splitSectionStartP->m_sz = sum;
3013  ptr[prev].sz = sum;
3014 
3019  ptrP = g_sectionSegmentPool.getPtr(ptrI);
3020  ptrP->m_lastSegment = splitSectionLastSegment;
3021  ptrP->m_sz = size - sum;
3022 
3026  info.m_sectionPtr[secNo].m_segmented.i = ptrI;
3027  info.m_sectionPtr[secNo].m_segmented.p = ptrP;
3028 
3029  loop = Full;
3030  lsout(ndbout_c("section %d split into %d", secNo, prev));
3031  break;
3032  }
3033 
3034  lsout(ndbout_c("loop: %d secNo: %d secCount: %d sz: %d",
3035  loop, secNo, secCount, sz));
3036 
3040  secNos[secCount] = info.m_fragmentId;
3041 
3042  Uint32 fragInfo = info.m_fragInfo;
3043  info.m_fragInfo = 2;
3044  switch(loop){
3045  case Unknown:
3046  if(secNo >= 0){
3047  lsout(ndbout_c("Unknown - Full"));
3051  break;
3052  }
3053  // Fall through
3054  lsout(ndbout_c("Unknown - Done"));
3055  info.m_status = FragmentSendInfo::SendComplete;
3056  ndbassert(fragInfo == 2);
3057  fragInfo = 3;
3058  case Full:
3059  break;
3060  }
3061 
3062  signal->header.m_fragmentInfo = fragInfo;
3063  signal->header.m_noOfSections = 0;
3064  sections.m_cnt = secCount;
3065 
3066  if (info.m_flags & FragmentSendInfo::SendNoReleaseSeg)
3067  {
3068  sendSignalNoRelease(info.m_nodeReceiverGroup,
3069  info.m_gsn,
3070  signal,
3071  sigLen + secCount + 1,
3072  (JobBufferLevel)info.m_prio,
3073  &sections);
3074  /* NoRelease leaves SectionHandle populated, we'll
3075  * clear it here. The actual sections themselves
3076  * remain allocated.
3077  */
3078  sections.m_cnt = 0;
3079 
3080  if (split)
3081  {
3082  /* There was a split section, which required us to modify the
3083  * segment list.
3084  * Now restore the split section's segment list back to
3085  * its previous state
3086  * (Only really required for first segment, but we do
3087  * it for all of them, to be a good citizen)
3088  */
3089  ndbrequire( splitSectionStartI != RNIL );
3090  ndbrequire( splitSectionStartP != NULL );
3091  ndbrequire( splitSectionLastSegment != RNIL );
3092 
3093  splitSectionStartP->m_lastSegment= splitSectionLastSegment;
3094  splitSectionStartP->m_sz= splitSectionSz;
3095 
3096  /* Check our handiwork */
3097  assert(verifySection(splitSectionStartI));
3098  }
3099  }
3100  else
3101  {
3102  /* Normal, release sections case */
3103  sendSignal(info.m_nodeReceiverGroup,
3104  info.m_gsn,
3105  signal,
3106  sigLen + secCount + 1,
3107  (JobBufferLevel)info.m_prio,
3108  &sections);
3109  }
3110 
3111  if(fragInfo == 3){
3116  g_sectionSegmentPool.release(SB_SP_REL_ARG info.m_theDataSection.p[sigLen]);
3117  }
3118 }
3119 
3120 bool
3122  NodeReceiverGroup rg,
3123  GlobalSignalNumber gsn,
3124  Signal* signal,
3125  Uint32 length,
3126  JobBufferLevel jbuf,
3127  LinearSectionPtr ptr[3],
3128  Uint32 noOfSections,
3129  Uint32 messageSize){
3130 
3131  check_sections(signal, signal->header.m_noOfSections, noOfSections);
3132 
3133  info.m_sectionPtr[0].m_linear.p = NULL;
3134  info.m_sectionPtr[1].m_linear.p = NULL;
3135  info.m_sectionPtr[2].m_linear.p = NULL;
3136 
3137  Uint32 totalSize = 0;
3138  switch(noOfSections){
3139  case 3:
3140  info.m_sectionPtr[2].m_linear = ptr[2];
3141  totalSize += ptr[2].sz;
3142  case 2:
3143  info.m_sectionPtr[1].m_linear = ptr[1];
3144  totalSize += ptr[1].sz;
3145  case 1:
3146  info.m_sectionPtr[0].m_linear = ptr[0];
3147  totalSize += ptr[0].sz;
3148  }
3149 
3150  if(totalSize <= messageSize + SectionSegment::DataLength){
3154  sendSignal(rg, gsn, signal, length, jbuf, ptr, noOfSections);
3155  info.m_status = FragmentSendInfo::SendComplete;
3156 
3161  return true;
3162  }
3163 
3167  info.m_status = FragmentSendInfo::SendNotComplete;
3168  info.m_prio = (Uint8)jbuf;
3169  info.m_gsn = gsn;
3170  info.m_messageSize = messageSize;
3171  info.m_fragInfo = 1;
3172  info.m_flags = 0;
3173  info.m_fragmentId = c_fragmentIdCounter++;
3174  info.m_nodeReceiverGroup = rg;
3175  info.m_callback.m_callbackFunction = 0;
3176 
3177  Ptr<SectionSegment> tmp;
3178  if(unlikely(!import(tmp, &signal->theData[0], length)))
3179  {
3180  handle_out_of_longsignal_memory(0);
3181  return false;
3182  }
3183 
3184  info.m_theDataSection.p = &tmp.p->theData[0];
3185  info.m_theDataSection.sz = length;
3186  tmp.p->theData[length] = tmp.i;
3187 
3188  sendNextLinearFragment(signal, info);
3189 
3190  if(c_fragmentIdCounter == 0){
3194  c_fragmentIdCounter = 1;
3195  }
3196 
3197  return true;
3198 }
3199 
3200 void
3202  FragmentSendInfo & info){
3203 
3204  if (unlikely(info.m_status == FragmentSendInfo::SendCancelled))
3205  {
3206  /* Send was cancelled - all dest. nodes have failed
3207  * since send was started
3208  */
3209  /* Free inline signal data storage section */
3210  Uint32 inlineDataI = info.m_theDataSection.p[info.m_theDataSection.sz];
3211  g_sectionSegmentPool.release(SB_SP_REL_ARG inlineDataI);
3212 
3213  info.m_status = FragmentSendInfo::SendComplete;
3214  return;
3215  }
3216 
3220  const Uint32 sigLen = info.m_theDataSection.sz;
3221  memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen);
3222 
3223  Uint32 sz = 0;
3224  Uint32 maxSz = info.m_messageSize;
3225 
3226  Int32 secNo = 2;
3227  Uint32 secCount = 0;
3228  Uint32 * secNos = &signal->theData[sigLen];
3229  LinearSectionPtr signalPtr[3];
3230 
3231  enum { Unknown = 0, Full = 2 } loop = Unknown;
3232  for(; secNo >= 0 && secCount < 3; secNo--){
3233  Uint32 * ptrP = info.m_sectionPtr[secNo].m_linear.p;
3234  if(ptrP == NULL)
3235  continue;
3236 
3237  info.m_sectionPtr[secNo].m_linear.p = NULL;
3238  const Uint32 size = info.m_sectionPtr[secNo].m_linear.sz;
3239 
3240  signalPtr[secCount].p = ptrP;
3241  signalPtr[secCount].sz = size;
3242  secNos[secCount] = secNo;
3243  secCount++;
3244 
3245  const Uint32 sizeLeft = maxSz - sz;
3246  if(size <= sizeLeft){
3250  sz += size;
3251  lsout(ndbout_c("section %d saved as %d", secNo, secCount-1));
3252  continue;
3253  }
3254 
3255  const Uint32 overflow = size - sizeLeft; // > 0
3256  if(overflow <= SectionSegment::DataLength){
3261  lsout(ndbout_c("section %d saved as %d but full over: %d",
3262  secNo, secCount-1, overflow));
3263  secNo--;
3264  break;
3265  }
3266 
3267  // size >= 61
3268  if(sizeLeft < SectionSegment::DataLength){
3273  secCount--;
3274  info.m_sectionPtr[secNo].m_linear.p = ptrP;
3275  loop = Full;
3276  lsout(ndbout_c("section %d not saved", secNo));
3277  break;
3278  }
3279 
3287  Uint32 sum = sizeLeft;
3288  sum /= SectionSegment::DataLength;
3289  sum *= SectionSegment::DataLength;
3290 
3294  Uint32 prev = secCount - 1;
3295  signalPtr[prev].sz = sum;
3296 
3300  info.m_sectionPtr[secNo].m_linear.p = ptrP + sum;
3301  info.m_sectionPtr[secNo].m_linear.sz = size - sum;
3302 
3303  loop = Full;
3304  lsout(ndbout_c("section %d split into %d", secNo, prev));
3305  break;
3306  }
3307 
3308  lsout(ndbout_c("loop: %d secNo: %d secCount: %d sz: %d",
3309  loop, secNo, secCount, sz));
3310 
3314  secNos[secCount] = info.m_fragmentId;
3315 
3316  Uint32 fragInfo = info.m_fragInfo;
3317  info.m_fragInfo = 2;
3318  switch(loop){
3319  case Unknown:
3320  if(secNo >= 0){
3321  lsout(ndbout_c("Unknown - Full"));
3325  break;
3326  }
3327  // Fall through
3328  lsout(ndbout_c("Unknown - Done"));
3329  info.m_status = FragmentSendInfo::SendComplete;
3330  ndbassert(fragInfo == 2);
3331  fragInfo = 3;
3332  case Full:
3333  break;
3334  }
3335 
3336  signal->header.m_noOfSections = 0;
3337  signal->header.m_fragmentInfo = fragInfo;
3338 
3339  sendSignal(info.m_nodeReceiverGroup,
3340  info.m_gsn,
3341  signal,
3342  sigLen + secCount + 1,
3343  (JobBufferLevel)info.m_prio,
3344  signalPtr,
3345  secCount);
3346 
3347  if(fragInfo == 3){
3351  g_sectionSegmentPool.release(SB_SP_REL_ARG info.m_theDataSection.p[sigLen]);
3352  }
3353 }
3354 
3355 void
3356 SimulatedBlock::sendFragmentedSignal(BlockReference ref,
3357  GlobalSignalNumber gsn,
3358  Signal* signal,
3359  Uint32 length,
3360  JobBufferLevel jbuf,
3361  SectionHandle* sections,
3362  Callback & c,
3363  Uint32 messageSize){
3364  bool res = true;
3366  res = c_segmentedFragmentSendList.seize(tmp);
3367  ndbrequire(res);
3368 
3369  res = sendFirstFragment(* tmp.p,
3370  NodeReceiverGroup(ref),
3371  gsn,
3372  signal,
3373  length,
3374  jbuf,
3375  sections,
3376  false, // Release sections on send
3377  messageSize);
3378  ndbrequire(res);
3379 
3380  if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3381  c_segmentedFragmentSendList.release(tmp);
3382  if(c.m_callbackFunction != 0)
3383  execute(signal, c, 0);
3384  return;
3385  }
3386  tmp.p->m_callback = c;
3387 
3388  if(!c_fragSenderRunning)
3389  {
3390  SaveSignal<2> save(signal);
3391  c_fragSenderRunning = true;
3392  ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3393  sig->type = ContinueFragmented::CONTINUE_SENDING;
3394  sig->line = __LINE__;
3395  sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3396  }
3397 }
3398 
3399 void
3400 SimulatedBlock::sendFragmentedSignal(NodeReceiverGroup rg,
3401  GlobalSignalNumber gsn,
3402  Signal* signal,
3403  Uint32 length,
3404  JobBufferLevel jbuf,
3405  SectionHandle * sections,
3406  Callback & c,
3407  Uint32 messageSize){
3408  bool res = true;
3410  res = c_segmentedFragmentSendList.seize(tmp);
3411  ndbrequire(res);
3412 
3413  res = sendFirstFragment(* tmp.p,
3414  rg,
3415  gsn,
3416  signal,
3417  length,
3418  jbuf,
3419  sections,
3420  false, // Release sections on send
3421  messageSize);
3422  ndbrequire(res);
3423 
3424  if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3425  c_segmentedFragmentSendList.release(tmp);
3426  if(c.m_callbackFunction != 0)
3427  execute(signal, c, 0);
3428  return;
3429  }
3430  tmp.p->m_callback = c;
3431 
3432  if(!c_fragSenderRunning)
3433  {
3434  SaveSignal<2> save(signal);
3435  c_fragSenderRunning = true;
3436  ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3437  sig->type = ContinueFragmented::CONTINUE_SENDING;
3438  sig->line = __LINE__;
3439  sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3440  }
3441 }
3442 
3443 SimulatedBlock::Callback SimulatedBlock::TheEmptyCallback = {0, 0};
3444 void
3445 SimulatedBlock::TheNULLCallbackFunction(class Signal*, Uint32, Uint32)
3446 { abort(); /* should never be called */ }
3447 SimulatedBlock::Callback SimulatedBlock::TheNULLCallback =
3448 { &SimulatedBlock::TheNULLCallbackFunction, 0 };
3449 
3450 void
3451 SimulatedBlock::sendFragmentedSignal(BlockReference ref,
3452  GlobalSignalNumber gsn,
3453  Signal* signal,
3454  Uint32 length,
3455  JobBufferLevel jbuf,
3456  LinearSectionPtr ptr[3],
3457  Uint32 noOfSections,
3458  Callback & c,
3459  Uint32 messageSize){
3460  bool res = true;
3462  res = c_linearFragmentSendList.seize(tmp);
3463  ndbrequire(res);
3464 
3465  res = sendFirstFragment(* tmp.p,
3466  NodeReceiverGroup(ref),
3467  gsn,
3468  signal,
3469  length,
3470  jbuf,
3471  ptr,
3472  noOfSections,
3473  messageSize);
3474  ndbrequire(res);
3475 
3476  if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3477  c_linearFragmentSendList.release(tmp);
3478  if(c.m_callbackFunction != 0)
3479  execute(signal, c, 0);
3480  return;
3481  }
3482  tmp.p->m_callback = c;
3483 
3484  if(!c_fragSenderRunning)
3485  {
3486  SaveSignal<2> save(signal);
3487  c_fragSenderRunning = true;
3488  ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3489  sig->type = ContinueFragmented::CONTINUE_SENDING;
3490  sig->line = __LINE__;
3491  sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3492  }
3493 }
3494 
3495 void
3496 SimulatedBlock::sendFragmentedSignal(NodeReceiverGroup rg,
3497  GlobalSignalNumber gsn,
3498  Signal* signal,
3499  Uint32 length,
3500  JobBufferLevel jbuf,
3501  LinearSectionPtr ptr[3],
3502  Uint32 noOfSections,
3503  Callback & c,
3504  Uint32 messageSize){
3505  bool res = true;
3507  res = c_linearFragmentSendList.seize(tmp);
3508  ndbrequire(res);
3509 
3510  res = sendFirstFragment(* tmp.p,
3511  rg,
3512  gsn,
3513  signal,
3514  length,
3515  jbuf,
3516  ptr,
3517  noOfSections,
3518  messageSize);
3519  ndbrequire(res);
3520 
3521  if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3522  c_linearFragmentSendList.release(tmp);
3523  if(c.m_callbackFunction != 0)
3524  execute(signal, c, 0);
3525  return;
3526  }
3527  tmp.p->m_callback = c;
3528 
3529  if(!c_fragSenderRunning)
3530  {
3531  SaveSignal<2> save(signal);
3532  c_fragSenderRunning = true;
3533  ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3534  sig->type = ContinueFragmented::CONTINUE_SENDING;
3535  sig->line = __LINE__;
3536  sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3537  }
3538 }
3539 
3540 NodeInfo &
3541 SimulatedBlock::setNodeInfo(NodeId nodeId) {
3542  ndbrequire(nodeId > 0 && nodeId < MAX_NODES);
3543  return globalData.m_nodeInfo[nodeId];
3544 }
3545 
3546 bool
3547 SimulatedBlock::isMultiThreaded()
3548 {
3549 #ifdef NDBD_MULTITHREADED
3550  return true;
3551 #else
3552  return false;
3553 #endif
3554 }
3555 
3556 
3557 void
3558 SimulatedBlock::execUTIL_CREATE_LOCK_REF(Signal* signal){
3559  ljamEntry();
3560  c_mutexMgr.execUTIL_CREATE_LOCK_REF(signal);
3561 }
3562 
3563 void SimulatedBlock::execUTIL_CREATE_LOCK_CONF(Signal* signal){
3564  ljamEntry();
3565  c_mutexMgr.execUTIL_CREATE_LOCK_CONF(signal);
3566 }
3567 
3568 void SimulatedBlock::execUTIL_DESTORY_LOCK_REF(Signal* signal){
3569  ljamEntry();
3570  c_mutexMgr.execUTIL_DESTORY_LOCK_REF(signal);
3571 }
3572 
3573 void SimulatedBlock::execUTIL_DESTORY_LOCK_CONF(Signal* signal){
3574  ljamEntry();
3575  c_mutexMgr.execUTIL_DESTORY_LOCK_CONF(signal);
3576 }
3577 
3578 void SimulatedBlock::execUTIL_LOCK_REF(Signal* signal){
3579  ljamEntry();
3580  c_mutexMgr.execUTIL_LOCK_REF(signal);
3581 }
3582 
3583 void SimulatedBlock::execUTIL_LOCK_CONF(Signal* signal){
3584  ljamEntry();
3585  c_mutexMgr.execUTIL_LOCK_CONF(signal);
3586 }
3587 
3588 void SimulatedBlock::execUTIL_UNLOCK_REF(Signal* signal){
3589  ljamEntry();
3590  c_mutexMgr.execUTIL_UNLOCK_REF(signal);
3591 }
3592 
3593 void SimulatedBlock::execUTIL_UNLOCK_CONF(Signal* signal){
3594  ljamEntry();
3595  c_mutexMgr.execUTIL_UNLOCK_CONF(signal);
3596 }
3597 
3598 void
3599 SimulatedBlock::ignoreMutexUnlockCallback(Signal* signal,
3600  Uint32 ptrI, Uint32 retVal){
3601  c_mutexMgr.release(ptrI);
3602 }
3603 
3604 void
3605 SimulatedBlock::fsRefError(Signal* signal, Uint32 line, const char *msg)
3606 {
3607  const FsRef *fsRef = (FsRef*)signal->getDataPtr();
3608  Uint32 errorCode = fsRef->errorCode;
3609  Uint32 osErrorCode = fsRef->osErrorCode;
3610  char msg2[100];
3611 
3612  sprintf(msg2, "%s: %s. OS errno: %u", getBlockName(number()), msg, osErrorCode);
3613 
3614  progError(line, errorCode, msg2);
3615 }
3616 
3617 void
3618 SimulatedBlock::execFSWRITEREF(Signal* signal)
3619 {
3620  fsRefError(signal, __LINE__, "File system write failed");
3621 }
3622 
3623 void
3624 SimulatedBlock::execFSREADREF(Signal* signal)
3625 {
3626  fsRefError(signal, __LINE__, "File system read failed");
3627 }
3628 
3629 void
3630 SimulatedBlock::execFSCLOSEREF(Signal* signal)
3631 {
3632  fsRefError(signal, __LINE__, "File system close failed");
3633 }
3634 
3635 void
3636 SimulatedBlock::execFSOPENREF(Signal* signal)
3637 {
3638  fsRefError(signal, __LINE__, "File system open failed");
3639 }
3640 
3641 void
3642 SimulatedBlock::execFSREMOVEREF(Signal* signal)
3643 {
3644  fsRefError(signal, __LINE__, "File system remove failed");
3645 }
3646 
3647 void
3648 SimulatedBlock::execFSSYNCREF(Signal* signal)
3649 {
3650  fsRefError(signal, __LINE__, "File system sync failed");
3651 }
3652 
3653 void
3654 SimulatedBlock::execFSAPPENDREF(Signal* signal)
3655 {
3656  fsRefError(signal, __LINE__, "File system append failed");
3657 }
3658 
3659 #ifdef VM_TRACE
3660 static Ptr<void> * m_empty_global_variables[] = { 0 };
3661 void
3662 SimulatedBlock::disable_global_variables()
3663 {
3664  m_global_variables_save = m_global_variables;
3665  m_global_variables = m_empty_global_variables;
3666 }
3667 
3668 void
3669 SimulatedBlock::enable_global_variables()
3670 {
3671  if (m_global_variables == m_empty_global_variables)
3672  {
3673  m_global_variables = m_global_variables_save;
3674  }
3675 }
3676 
3677 void
3678 SimulatedBlock::clear_global_variables(){
3679  Ptr<void> ** tmp = m_global_variables;
3680  while(* tmp != 0){
3681  (* tmp)->i = RNIL;
3682  (* tmp)->p = 0;
3683  tmp++;
3684  }
3685 }
3686 
3687 void
3688 SimulatedBlock::init_globals_list(void ** tmp, size_t cnt){
3689  m_global_variables = new Ptr<void> * [cnt+1];
3690  for(size_t i = 0; i<cnt; i++){
3691  m_global_variables[i] = (Ptr<void>*)tmp[i];
3692  }
3693  m_global_variables[cnt] = 0;
3694 }
3695 
3696 #endif
3697 
3698 #include "KeyDescriptor.hpp"
3699 
3700 Uint32
3701 SimulatedBlock::xfrm_key(Uint32 tab, const Uint32* src,
3702  Uint32 *dst, Uint32 dstSize,
3703  Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const
3704 {
3705  const KeyDescriptor * desc = g_key_descriptor_pool.getPtr(tab);
3706  const Uint32 noOfKeyAttr = desc->noOfKeyAttr;
3707 
3708  Uint32 i = 0;
3709  Uint32 srcPos = 0;
3710  Uint32 dstPos = 0;
3711  while (i < noOfKeyAttr)
3712  {
3713  const KeyDescriptor::KeyAttr& keyAttr = desc->keyAttr[i];
3714  Uint32 dstWords =
3715  xfrm_attr(keyAttr.attributeDescriptor, keyAttr.charsetInfo,
3716  src, srcPos, dst, dstPos, dstSize);
3717  keyPartLen[i++] = dstWords;
3718  if (unlikely(dstWords == 0))
3719  return 0;
3720  }
3721 
3722  if (0)
3723  {
3724  for(Uint32 i = 0; i<dstPos; i++)
3725  {
3726  printf("%.8x ", dst[i]);
3727  }
3728  printf("\n");
3729  }
3730  return dstPos;
3731 }
3732 
3733 Uint32
3734 SimulatedBlock::xfrm_attr(Uint32 attrDesc, CHARSET_INFO* cs,
3735  const Uint32* src, Uint32 & srcPos,
3736  Uint32* dst, Uint32 & dstPos, Uint32 dstSize) const
3737 {
3738  Uint32 array =
3739  AttributeDescriptor::getArrayType(attrDesc);
3740  Uint32 srcBytes =
3741  AttributeDescriptor::getSizeInBytes(attrDesc);
3742 
3743  Uint32 srcWords = ~0;
3744  Uint32 dstWords = ~0;
3745  uchar* dstPtr = (uchar*)&dst[dstPos];
3746  const uchar* srcPtr = (const uchar*)&src[srcPos];
3747 
3748  if (cs == NULL)
3749  {
3750  jam();
3751  Uint32 len;
3752  LINT_INIT(len);
3753  switch(array){
3754  case NDB_ARRAYTYPE_SHORT_VAR:
3755  len = 1 + srcPtr[0];
3756  break;
3757  case NDB_ARRAYTYPE_MEDIUM_VAR:
3758  len = 2 + srcPtr[0] + (srcPtr[1] << 8);
3759  break;
3760 #ifndef VM_TRACE
3761  default:
3762  abort();
3763 #endif
3764  case NDB_ARRAYTYPE_FIXED:
3765  len = srcBytes;
3766  }
3767  srcWords = (len + 3) >> 2;
3768  dstWords = srcWords;
3769  memcpy(dstPtr, srcPtr, dstWords << 2);
3770 
3771  if (0)
3772  {
3773  ndbout_c("srcPos: %d dstPos: %d len: %d srcWords: %d dstWords: %d",
3774  srcPos, dstPos, len, srcWords, dstWords);
3775 
3776  for(Uint32 i = 0; i<srcWords; i++)
3777  printf("%.8x ", src[srcPos + i]);
3778  printf("\n");
3779  }
3780  }
3781  else
3782  {
3783  jam();
3784  Uint32 typeId =
3785  AttributeDescriptor::getType(attrDesc);
3786  Uint32 lb, len;
3787  bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len);
3788  if (unlikely(!ok))
3789  return 0;
3790  Uint32 xmul = cs->strxfrm_multiply;
3791  if (xmul == 0)
3792  xmul = 1;
3793  /*
3794  * Varchar end-spaces are ignored in comparisons. To get same hash
3795  * we blank-pad to maximum length via strnxfrm.
3796  */
3797  Uint32 dstLen = xmul * (srcBytes - lb);
3798  ndbrequire(dstLen <= ((dstSize - dstPos) << 2));
3799  int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
3800  if (unlikely(n == -1))
3801  return 0;
3802  while ((n & 3) != 0)
3803  {
3804  dstPtr[n++] = 0;
3805  }
3806  dstWords = (n >> 2);
3807  srcWords = (lb + len + 3) >> 2;
3808  }
3809 
3810  dstPos += dstWords;
3811  srcPos += srcWords;
3812  return dstWords;
3813 }
3814 
3815 Uint32
3816 SimulatedBlock::create_distr_key(Uint32 tableId,
3817  const Uint32 *src,
3818  Uint32* dst,
3819  const Uint32
3820  keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const
3821 {
3822  const KeyDescriptor* desc = g_key_descriptor_pool.getPtr(tableId);
3823  const Uint32 noOfKeyAttr = desc->noOfKeyAttr;
3824  Uint32 noOfDistrKeys = desc->noOfDistrKeys;
3825 
3826  Uint32 i = 0;
3827  Uint32 dstPos = 0;
3828 
3829  /* --Note that src and dst may be the same location-- */
3830 
3831  if(keyPartLen)
3832  {
3833  while (i < noOfKeyAttr && noOfDistrKeys)
3834  {
3835  Uint32 attr = desc->keyAttr[i].attributeDescriptor;
3836  Uint32 len = keyPartLen[i];
3837  if(AttributeDescriptor::getDKey(attr))
3838  {
3839  noOfDistrKeys--;
3840  memmove(dst+dstPos, src, len << 2);
3841  dstPos += len;
3842  }
3843  src += len;
3844  i++;
3845  }
3846  }
3847  else
3848  {
3849  while (i < noOfKeyAttr && noOfDistrKeys)
3850  {
3851  Uint32 attr = desc->keyAttr[i].attributeDescriptor;
3852  Uint32 len = AttributeDescriptor::getSizeInWords(attr);
3853  ndbrequire(AttributeDescriptor::getArrayType(attr) == NDB_ARRAYTYPE_FIXED);
3854  if(AttributeDescriptor::getDKey(attr))
3855  {
3856  noOfDistrKeys--;
3857  memmove(dst+dstPos, src, len << 2);
3858  dstPos += len;
3859  }
3860  src += len;
3861  i++;
3862  }
3863  }
3864  return dstPos;
3865 }
3866 
3867 CArray<KeyDescriptor> g_key_descriptor_pool;
3868 
3869 void
3871  Uint32 dst[],
3872  Uint32 dstcnt,
3873  Uint32 gsn,
3874  Signal * signal,
3875  Uint32 sigLen,
3876  JobBufferLevel prio,
3877  SectionHandle * userhandle)
3878 {
3879  ndbrequire(pathcnt > 0); // don't support (now) directly multi-cast
3880  pathcnt--; // first hop is made from here
3881 
3882 
3883  Uint32 len = LocalRouteOrd::StaticLen + (2 * pathcnt) + dstcnt;
3884  ndbrequire(len <= 25);
3885 
3886  SectionHandle handle(this, signal);
3887  if (userhandle)
3888  {
3889  ljam();
3890  handle.m_cnt = userhandle->m_cnt;
3891  for (Uint32 i = 0; i<handle.m_cnt; i++)
3892  handle.m_ptr[i] = userhandle->m_ptr[i];
3893  userhandle->m_cnt = 0;
3894  }
3895 
3896  if (len + sigLen > 25)
3897  {
3898  ljam();
3899 
3903  ndbrequire(handle.m_cnt < 3);
3904  handle.m_ptr[2] = handle.m_ptr[1];
3905  handle.m_ptr[1] = handle.m_ptr[0];
3906  Ptr<SectionSegment> tmp;
3907  if (unlikely(! import(tmp, signal->theData, sigLen)))
3908  {
3909  handle_out_of_longsignal_memory(0);
3910  }
3911  handle.m_ptr[0].p = tmp.p;
3912  handle.m_ptr[0].i = tmp.i;
3913  handle.m_ptr[0].sz = sigLen;
3914  handle.m_cnt ++;
3915  }
3916  else
3917  {
3918  ljam();
3919  memmove(signal->theData + len, signal->theData, 4 * sigLen);
3920  len += sigLen;
3921  }
3922 
3923  LocalRouteOrd * ord = (LocalRouteOrd*)signal->getDataPtrSend();
3924  ord->cnt = (pathcnt << 16) | (dstcnt);
3925  ord->gsn = gsn;
3926  ord->prio = Uint32(prio);
3927 
3928  Uint32 * dstptr = ord->path;
3929  for (Uint32 i = 1; i <= pathcnt; i++)
3930  {
3931  ndbrequire(refToNode(path[i].ref) == 0 ||
3932  refToNode(path[i].ref) == getOwnNodeId());
3933 
3934  * dstptr++ = path[i].ref;
3935  * dstptr++ = Uint32(path[i].prio);
3936  }
3937 
3938  for (Uint32 i = 0; i<dstcnt; i++)
3939  {
3940  ndbrequire(refToNode(dst[i]) == 0 ||
3941  refToNode(dst[i]) == getOwnNodeId());
3942 
3943  * dstptr++ = dst[i];
3944  }
3945 
3946  sendSignal(path[0].ref, GSN_LOCAL_ROUTE_ORD, signal, len,
3947  path[0].prio, &handle);
3948 }
3949 
3950 void
3952 {
3953  ljamEntry();
3954 
3955  if (!assembleFragments(signal))
3956  {
3957  ljam();
3958  return;
3959  }
3960 
3961  if (ERROR_INSERTED(1001))
3962  {
3966  ljam();
3967  SectionHandle handle(this, signal);
3968  sendSignalWithDelay(reference(), GSN_LOCAL_ROUTE_ORD, signal, 200,
3969  signal->getLength(), &handle);
3970  return;
3971  }
3972 
3973  LocalRouteOrd* ord = (LocalRouteOrd*)signal->getDataPtr();
3974  Uint32 pathcnt = ord->cnt >> 16;
3975  Uint32 dstcnt = ord->cnt & 0xFFFF;
3976  Uint32 sigLen = signal->getLength();
3977 
3978  if (pathcnt == 0)
3979  {
3983  ljam();
3984  Uint32 gsn = ord->gsn;
3985  Uint32 prio = ord->prio;
3986  memcpy(signal->theData+25, ord->path, 4*dstcnt);
3987  SectionHandle handle(this, signal);
3988  if (sigLen > LocalRouteOrd::StaticLen + dstcnt)
3989  {
3990  ljam();
3994  memmove(signal->theData,
3995  signal->theData + LocalRouteOrd::StaticLen + dstcnt,
3996  4 * (sigLen - (LocalRouteOrd::StaticLen + dstcnt)));
3997  sigLen = sigLen - (LocalRouteOrd::StaticLen + dstcnt);
3998  }
3999  else
4000  {
4001  ljam();
4005  sigLen = handle.m_ptr[0].sz;
4006  ndbrequire(sigLen <= 25);
4007  copy(signal->theData, handle.m_ptr[0]);
4008  release(handle.m_ptr[0]);
4009 
4010  for (Uint32 i = 0; i < handle.m_cnt - 1; i++)
4011  handle.m_ptr[i] = handle.m_ptr[i+1];
4012  handle.m_cnt--;
4013  }
4014 
4015  /*
4016  * The extra if-statement is as sendSignalNoRelease will copy sections
4017  * which is not necessary is only sending to one destination
4018  */
4019  if (dstcnt > 1)
4020  {
4021  jam();
4022  for (Uint32 i = 0; i<dstcnt; i++)
4023  {
4024  ljam();
4025  sendSignalNoRelease(signal->theData[25+i], gsn, signal, sigLen,
4026  JobBufferLevel(prio), &handle);
4027  }
4028  releaseSections(handle);
4029  }
4030  else
4031  {
4032  jam();
4033  sendSignal(signal->theData[25+0], gsn, signal, sigLen,
4034  JobBufferLevel(prio), &handle);
4035  }
4036  }
4037  else
4038  {
4042  ljam();
4043  SectionHandle handle(this, signal);
4044  Uint32 ref = ord->path[0];
4045  Uint32 prio = ord->path[1];
4046  Uint32 len = sigLen - 2;
4047  ord->cnt = ((pathcnt - 1) << 16) | dstcnt;
4048  memmove(ord->path, ord->path+2, 4 * (len - LocalRouteOrd::StaticLen));
4049  sendSignal(ref, GSN_LOCAL_ROUTE_ORD, signal, len,
4050  JobBufferLevel(prio), &handle);
4051  }
4052 }
4053 
4054 
4055 #ifdef VM_TRACE
4056 bool
4057 SimulatedBlock::debugOutOn()
4058 {
4059  SignalLoggerManager::LogMode mask = SignalLoggerManager::LogInOut;
4060  return
4061  globalData.testOn &&
4062  globalSignalLoggers.logMatch(number(), mask);
4063 }
4064 
4065 const char*
4066 SimulatedBlock::debugOutTag(char *buf, int line)
4067 {
4068  char blockbuf[40];
4069  char instancebuf[40];
4070  char linebuf[40];
4071  char timebuf[40];
4072  sprintf(blockbuf, "%s", getBlockName(number(), "UNKNOWN"));
4073  instancebuf[0] = 0;
4074  if (instance() != 0)
4075  sprintf(instancebuf, "/%u", instance());
4076  sprintf(linebuf, " %d", line);
4077  timebuf[0] = 0;
4078 #ifdef VM_TRACE_TIME
4079  {
4080  NDB_TICKS t = NdbTick_CurrentMillisecond();
4081  uint s = (t / 1000) % 3600;
4082  uint ms = t % 1000;
4083  sprintf(timebuf, " - %u.%03u -", s, ms);
4084  }
4085 #endif
4086  sprintf(buf, "%s%s%s%s ", blockbuf, instancebuf, linebuf, timebuf);
4087  return buf;
4088 }
4089 #endif
4090 
4091 void
4093  const Uint32 blocks[],
4094  const Callback & cb,
4095  JobBufferLevel prio)
4096 {
4097 #ifndef NDBD_MULTITHREADED
4098  Callback copy = cb;
4099  execute(signal, copy, 0);
4100 #else
4101  ljam();
4102  Uint32 ref[32]; // max threads
4103  Uint32 cnt = mt_get_thread_references_for_blocks(blocks, getThreadId(),
4104  ref, NDB_ARRAY_SIZE(ref));
4105  if (cnt == 0)
4106  {
4107  ljam();
4108  Callback copy = cb;
4109  execute(signal, copy, 0);
4110  return;
4111  }
4112 
4114  ndbrequire(c_syncThreadPool.seize(ptr));
4115  ptr.p->m_cnt = cnt;
4116  ptr.p->m_callback = cb;
4117 
4118  signal->theData[0] = reference();
4119  signal->theData[1] = ptr.i;
4120  signal->theData[2] = Uint32(prio);
4121  for (Uint32 i = 0; i<cnt; i++)
4122  {
4123  sendSignal(ref[i], GSN_SYNC_THREAD_REQ, signal, 3, prio);
4124  }
4125 #endif
4126 }
4127 
4128 void
4129 SimulatedBlock::execSYNC_THREAD_REQ(Signal* signal)
4130 {
4131  ljamEntry();
4132  Uint32 ref = signal->theData[0];
4133  Uint32 prio = signal->theData[2];
4134  sendSignal(ref, GSN_SYNC_THREAD_CONF, signal, signal->getLength(),
4135  JobBufferLevel(prio));
4136 }
4137 
4138 void
4139 SimulatedBlock::execSYNC_THREAD_CONF(Signal* signal)
4140 {
4141  ljamEntry();
4143  c_syncThreadPool.getPtr(ptr, signal->theData[1]);
4144  if (ptr.p->m_cnt == 1)
4145  {
4146  ljam();
4147  Callback copy = ptr.p->m_callback;
4148  c_syncThreadPool.release(ptr);
4149  execute(signal, copy, 0);
4150  return;
4151  }
4152  ptr.p->m_cnt --;
4153 }
4154 
4155 void
4156 SimulatedBlock::execSYNC_REQ(Signal* signal)
4157 {
4158  ljamEntry();
4159  Uint32 ref = signal->theData[0];
4160  Uint32 prio = signal->theData[2];
4161  sendSignal(ref, GSN_SYNC_CONF, signal, signal->getLength(),
4162  JobBufferLevel(prio));
4163 }
4164 
4165 void
4167  const Uint32 blocks[],
4168  const Callback & cb,
4169  JobBufferLevel prio)
4170 {
4171  ljam();
4172 
4173  // reuse SyncThreadRecord
4175  ndbrequire(c_syncThreadPool.seize(ptr));
4176  ptr.p->m_cnt = 0; // with count of 0
4177  ptr.p->m_callback = cb;
4178 
4179  SyncPathReq* req = CAST_PTR(SyncPathReq, signal->getDataPtrSend());
4180  req->senderData = ptr.i;
4181  req->prio = Uint32(prio);
4182  req->count = 1;
4183  if (blocks[0] == 0)
4184  {
4185  ljam();
4186  ndbrequire(false); // TODO
4187  }
4188  else
4189  {
4190  ljam();
4191  Uint32 len = 0;
4192  for (; blocks[len+1] != 0; len++)
4193  {
4194  req->path[len] = blocks[len+1];
4195  }
4196  req->pathlen = 1 + len;
4197  req->path[len] = reference();
4198  sendSignal(numberToRef(blocks[0], getOwnNodeId()),
4199  GSN_SYNC_PATH_REQ, signal,
4200  SyncPathReq::SignalLength + (1 + len), prio);
4201  }
4202 }
4203 
4204 void
4205 SimulatedBlock::execSYNC_PATH_REQ(Signal* signal)
4206 {
4207  ljamEntry();
4208  SyncPathReq * req = CAST_PTR(SyncPathReq, signal->getDataPtrSend());
4209  if (req->pathlen == 1)
4210  {
4211  ljam();
4212  SyncPathReq copy = *req;
4213  SyncPathConf* conf = CAST_PTR(SyncPathConf, signal->getDataPtrSend());
4214  conf->senderData = copy.senderData;
4215  conf->count = copy.count;
4216  sendSignal(copy.path[0], GSN_SYNC_PATH_CONF, signal,
4217  SyncPathConf::SignalLength, JobBufferLevel(copy.prio));
4218  }
4219  else
4220  {
4221  ljam();
4222  Uint32 ref = numberToRef(req->path[0], getOwnNodeId());
4223  req->pathlen--;
4224  memmove(req->path, req->path + 1, 4 * req->pathlen);
4225  sendSignal(ref, GSN_SYNC_PATH_REQ, signal,
4226  SyncPathReq::SignalLength + (1 + req->pathlen),
4227  JobBufferLevel(req->prio));
4228  }
4229 }
4230 
4231 void
4232 SimulatedBlock::execSYNC_PATH_CONF(Signal* signal)
4233 {
4234  ljamEntry();
4235  SyncPathConf conf = * CAST_CONSTPTR(SyncPathConf, signal->getDataPtr());
4237 
4238  c_syncThreadPool.getPtr(ptr, conf.senderData);
4239 
4240  if (ptr.p->m_cnt == 0)
4241  {
4242  ljam();
4243  ptr.p->m_cnt = conf.count;
4244  }
4245 
4246  if (ptr.p->m_cnt == 1)
4247  {
4248  ljam();
4249  Callback copy = ptr.p->m_callback;
4250  c_syncThreadPool.release(ptr);
4251  execute(signal, copy, 0);
4252  return;
4253  }
4254 
4255  ptr.p->m_cnt --;
4256 }
4257 
4258 
4259 bool
4261 {
4262  Uint32 ref = signal->getSendersBlockRef();
4263 
4277  if (ref == reference() ||
4278  (refToNode(ref) == getOwnNodeId() &&
4279  refToMain(ref) == NDBCNTR))
4280  {
4281  ljam();
4282  return true;
4283  }
4284 
4285  RoutePath path[2];
4286  path[0].ref = QMGR_REF;
4287  path[0].prio = JBB;
4288  path[1].ref = NDBCNTR_REF;
4289  path[1].prio = JBB;
4290 
4291  Uint32 dst[1];
4292  dst[0] = reference();
4293 
4294  SectionHandle handle(this, signal);
4295  Uint32 gsn = signal->header.theVerId_signalNumber;
4296  Uint32 len = signal->getLength();
4297 
4298  sendRoutedSignal(path, 2, dst, 1, gsn, signal, len, JBB, &handle);
4299  return false;
4300 }
4301 
4302 void
4304 {
4305 #ifdef NDBD_MULTITHREADED
4306 #else
4307  globalTransporterRegistry.setup_wakeup_socket();
4308 #endif
4309 }
4310 
4311 void
4313 {
4314 #ifdef NDBD_MULTITHREADED
4315  mt_wakeup(this);
4316 #else
4317  globalTransporterRegistry.wakeup();
4318 #endif
4319 }
4320 
4321 
4322 void
4323 SimulatedBlock::ndbinfo_send_row(Signal* signal,
4324  const DbinfoScanReq& req,
4325  const Ndbinfo::Row& row,
4326  Ndbinfo::Ratelimit& rl) const
4327 {
4328  // Check correct number of columns against table
4329  assert(row.columns() == Ndbinfo::getTable(req.tableId).columns());
4330 
4331  TransIdAI *tidai= (TransIdAI*)signal->getDataPtrSend();
4332  tidai->connectPtr= req.resultData;
4333  tidai->transId[0]= req.transId[0];
4334  tidai->transId[1]= req.transId[1];
4335 
4336  LinearSectionPtr ptr[3];
4337  ptr[0].p = row.getDataPtr();
4338  ptr[0].sz = row.getLength();
4339 
4340  rl.rows++;
4341  rl.bytes += row.getLength();
4342 
4343  sendSignal(req.resultRef, GSN_DBINFO_TRANSID_AI,
4344  signal, TransIdAI::HeaderLength, JBB, ptr, 1);
4345 }
4346 
4347 
4348 void
4349 SimulatedBlock::ndbinfo_send_scan_break(Signal* signal,
4350  DbinfoScanReq& req,
4351  const Ndbinfo::Ratelimit& rl,
4352  Uint32 data1, Uint32 data2,
4353  Uint32 data3, Uint32 data4) const
4354 {
4355  DbinfoScanConf* conf= (DbinfoScanConf*)signal->getDataPtrSend();
4356  const Uint32 signal_length = DbinfoScanReq::SignalLength + req.cursor_sz;
4357  MEMCOPY_NO_WORDS(conf, &req, signal_length);
4358 
4359  conf->returnedRows = rl.rows;
4360 
4361  // Update the cursor with current item number
4362  Ndbinfo::ScanCursor* cursor =
4363  (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtrSend(conf);
4364 
4365  cursor->data[0] = data1;
4366  cursor->data[1] = data2;
4367  cursor->data[2] = data3;
4368  cursor->data[3] = data4;
4369 
4370  // Increase number of rows and bytes sent to far
4371  cursor->totalRows += rl.rows;
4372  cursor->totalBytes += rl.bytes;
4373 
4374  Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, true);
4375 
4376  sendSignal(cursor->senderRef, GSN_DBINFO_SCANCONF, signal,
4377  signal_length, JBB);
4378 }
4379 
4380 void
4381 SimulatedBlock::ndbinfo_send_scan_conf(Signal* signal,
4382  DbinfoScanReq& req,
4383  const Ndbinfo::Ratelimit& rl) const
4384 {
4385  DbinfoScanConf* conf= (DbinfoScanConf*)signal->getDataPtrSend();
4386  const Uint32 signal_length = DbinfoScanReq::SignalLength + req.cursor_sz;
4387  Uint32 sender_ref = req.resultRef;
4388  MEMCOPY_NO_WORDS(conf, &req, signal_length);
4389 
4390  conf->returnedRows = rl.rows;
4391 
4392  if (req.cursor_sz)
4393  {
4394  jam();
4395  // Update the cursor with current item number
4396  Ndbinfo::ScanCursor* cursor =
4397  (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtrSend(conf);
4398 
4399  // Reset all data holders
4400  memset(cursor->data, 0, sizeof(cursor->data));
4401 
4402  // Increase number of rows and bytes sent to far
4403  cursor->totalRows += rl.rows;
4404  cursor->totalBytes += rl.bytes;
4405 
4406  Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, false);
4407 
4408  sender_ref = cursor->senderRef;
4409 
4410  }
4411  sendSignal(sender_ref, GSN_DBINFO_SCANCONF, signal,
4412  signal_length, JBB);
4413 }
4414 
4415 #ifdef VM_TRACE
4416 void
4418 {
4419 #ifdef NDBD_MULTITHREADED
4420  mt_assert_own_thread(this);
4421 #endif
4422 }
4423 
4424 #endif