MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
LocalProxy.cpp
1 /* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
15 
16 #include <mt.hpp>
17 #include "LocalProxy.hpp"
18 
19 LocalProxy::LocalProxy(BlockNumber blockNumber, Block_context& ctx) :
20  SimulatedBlock(blockNumber, ctx)
21 {
22  BLOCK_CONSTRUCTOR(LocalProxy);
23 
24  ndbrequire(instance() == 0); // this is main block
25  c_lqhWorkers = 0;
26  c_extraWorkers = 0; // sub-class constructor can set
27  c_workers = 0;
28  Uint32 i;
29  for (i = 0; i < MaxWorkers; i++)
30  c_worker[i] = 0;
31 
32  c_ssIdSeq = 0;
33 
34  c_typeOfStart = NodeState::ST_ILLEGAL_TYPE;
35  c_masterNodeId = ZNIL;
36 
37  // GSN_READ_CONFIG_REQ
38  addRecSignal(GSN_READ_CONFIG_REQ, &LocalProxy::execREAD_CONFIG_REQ, true);
39  addRecSignal(GSN_READ_CONFIG_CONF, &LocalProxy::execREAD_CONFIG_CONF, true);
40 
41  // GSN_STTOR
42  addRecSignal(GSN_STTOR, &LocalProxy::execSTTOR);
43  addRecSignal(GSN_STTORRY, &LocalProxy::execSTTORRY);
44 
45  // GSN_NDB_STTOR
46  addRecSignal(GSN_NDB_STTOR, &LocalProxy::execNDB_STTOR);
47  addRecSignal(GSN_NDB_STTORRY, &LocalProxy::execNDB_STTORRY);
48 
49  // GSN_READ_NODESREQ
50  addRecSignal(GSN_READ_NODESCONF, &LocalProxy::execREAD_NODESCONF);
51  addRecSignal(GSN_READ_NODESREF, &LocalProxy::execREAD_NODESREF);
52 
53  // GSN_NODE_FAILREP
54  addRecSignal(GSN_NODE_FAILREP, &LocalProxy::execNODE_FAILREP);
55  addRecSignal(GSN_NF_COMPLETEREP, &LocalProxy::execNF_COMPLETEREP);
56 
57  // GSN_INCL_NODEREQ
58  addRecSignal(GSN_INCL_NODEREQ, &LocalProxy::execINCL_NODEREQ);
59  addRecSignal(GSN_INCL_NODECONF, &LocalProxy::execINCL_NODECONF);
60 
61  // GSN_NODE_STATE_REP
62  addRecSignal(GSN_NODE_STATE_REP, &LocalProxy::execNODE_STATE_REP, true);
63 
64  // GSN_CHANGE_NODE_STATE_REQ
65  addRecSignal(GSN_CHANGE_NODE_STATE_REQ, &LocalProxy::execCHANGE_NODE_STATE_REQ, true);
66  addRecSignal(GSN_CHANGE_NODE_STATE_CONF, &LocalProxy::execCHANGE_NODE_STATE_CONF);
67 
68  // GSN_DUMP_STATE_ORD
69  addRecSignal(GSN_DUMP_STATE_ORD, &LocalProxy::execDUMP_STATE_ORD);
70 
71  // GSN_NDB_TAMPER
72  addRecSignal(GSN_NDB_TAMPER, &LocalProxy::execNDB_TAMPER, true);
73 
74  // GSN_TIME_SIGNAL
75  addRecSignal(GSN_TIME_SIGNAL, &LocalProxy::execTIME_SIGNAL);
76 
77  // GSN_CREATE_TRIG_IMPL_REQ
78  addRecSignal(GSN_CREATE_TRIG_IMPL_REQ, &LocalProxy::execCREATE_TRIG_IMPL_REQ);
79  addRecSignal(GSN_CREATE_TRIG_IMPL_CONF, &LocalProxy::execCREATE_TRIG_IMPL_CONF);
80  addRecSignal(GSN_CREATE_TRIG_IMPL_REF, &LocalProxy::execCREATE_TRIG_IMPL_REF);
81 
82  // GSN_DROP_TRIG_IMPL_REQ
83  addRecSignal(GSN_DROP_TRIG_IMPL_REQ, &LocalProxy::execDROP_TRIG_IMPL_REQ);
84  addRecSignal(GSN_DROP_TRIG_IMPL_CONF, &LocalProxy::execDROP_TRIG_IMPL_CONF);
85  addRecSignal(GSN_DROP_TRIG_IMPL_REF, &LocalProxy::execDROP_TRIG_IMPL_REF);
86 
87  // GSN_DBINFO_SCANREQ
88  addRecSignal(GSN_DBINFO_SCANREQ, &LocalProxy::execDBINFO_SCANREQ);
89  addRecSignal(GSN_DBINFO_SCANCONF, &LocalProxy::execDBINFO_SCANCONF);
90 
91  // GSN_SYNC_REQ
92  addRecSignal(GSN_SYNC_REQ, &LocalProxy::execSYNC_REQ, true);
93  addRecSignal(GSN_SYNC_REF, &LocalProxy::execSYNC_REF);
94  addRecSignal(GSN_SYNC_CONF, &LocalProxy::execSYNC_CONF);
95 
96  // GSN_SYNC_PATH_REQ
97  addRecSignal(GSN_SYNC_PATH_REQ, &LocalProxy::execSYNC_PATH_REQ, true);
98 }
99 
100 LocalProxy::~LocalProxy()
101 {
102  // dtor of main block deletes workers
103 }
104 
105 // support routines
106 
107 void
108 LocalProxy::sendREQ(Signal* signal, SsSequential& ss)
109 {
110  ss.m_worker = 0;
111  ndbrequire(ss.m_sendREQ != 0);
112  SectionHandle handle(this);
113  restoreHandle(handle, ss);
114  (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
115  saveSections(ss, handle);
116 }
117 
118 void
119 LocalProxy::recvCONF(Signal* signal, SsSequential& ss)
120 {
121  ndbrequire(ss.m_sendCONF != 0);
122  (this->*ss.m_sendCONF)(signal, ss.m_ssId);
123 
124  ss.m_worker++;
125  if (ss.m_worker < c_workers) {
126  jam();
127  ndbrequire(ss.m_sendREQ != 0);
128  SectionHandle handle(this);
129  (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
130  return;
131  }
132 }
133 
134 void
135 LocalProxy::recvREF(Signal* signal, SsSequential& ss, Uint32 error)
136 {
137  ndbrequire(error != 0);
138  if (ss.m_error == 0)
139  ss.m_error = error;
140  recvCONF(signal, ss);
141 }
142 
143 void
144 LocalProxy::skipReq(SsSequential& ss)
145 {
146 }
147 
148 void
149 LocalProxy::skipConf(SsSequential& ss)
150 {
151 }
152 
153 void
154 LocalProxy::saveSections(SsCommon& ss, SectionHandle & handle)
155 {
156  ss.m_sec_cnt = handle.m_cnt;
157  for (Uint32 i = 0; i<ss.m_sec_cnt; i++)
158  ss.m_sec_ptr[i] = handle.m_ptr[i].i;
159  handle.clear();
160 }
161 
162 void
163 LocalProxy::restoreHandle(SectionHandle & handle, SsCommon& ss)
164 {
165  handle.m_cnt = ss.m_sec_cnt;
166  for (Uint32 i = 0; i<ss.m_sec_cnt; i++)
167  handle.m_ptr[i].i = ss.m_sec_ptr[i];
168 
169  getSections(handle.m_cnt, handle.m_ptr);
170  ss.m_sec_cnt = 0;
171 }
172 
173 bool
174 LocalProxy::firstReply(const SsSequential& ss)
175 {
176  return ss.m_worker == 0;
177 }
178 
179 bool
180 LocalProxy::lastReply(const SsSequential& ss)
181 {
182  return ss.m_worker + 1 == c_workers;
183 }
184 
185 void
186 LocalProxy::sendREQ(Signal* signal, SsParallel& ss)
187 {
188  ndbrequire(ss.m_sendREQ != 0);
189 
190  ss.m_workerMask.clear();
191  ss.m_worker = 0;
192  const Uint32 count = ss.m_extraLast ? c_lqhWorkers : c_workers;
193  SectionHandle handle(this);
194  restoreHandle(handle, ss);
195  while (ss.m_worker < count) {
196  jam();
197  ss.m_workerMask.set(ss.m_worker);
198  (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
199  ss.m_worker++;
200  }
201  releaseSections(handle);
202 }
203 
204 void
205 LocalProxy::recvCONF(Signal* signal, SsParallel& ss)
206 {
207  ndbrequire(ss.m_sendCONF != 0);
208 
209  BlockReference ref = signal->getSendersBlockRef();
210  ndbrequire(refToMain(ref) == number());
211 
212  Uint32 ino = refToInstance(ref);
213  ss.m_worker = workerIndex(ino);
214  ndbrequire(ref == workerRef(ss.m_worker));
215  ndbrequire(ss.m_worker < c_workers);
216  ndbrequire(ss.m_workerMask.get(ss.m_worker));
217  ss.m_workerMask.clear(ss.m_worker);
218 
219  (this->*ss.m_sendCONF)(signal, ss.m_ssId);
220 }
221 
222 void
223 LocalProxy::recvREF(Signal* signal, SsParallel& ss, Uint32 error)
224 {
225  ndbrequire(error != 0);
226  if (ss.m_error == 0)
227  ss.m_error = error;
228  recvCONF(signal, ss);
229 }
230 
231 void
232 LocalProxy::skipReq(SsParallel& ss)
233 {
234  ndbrequire(ss.m_workerMask.get(ss.m_worker));
235  ss.m_workerMask.clear(ss.m_worker);
236 }
237 
238 // more replies expected from this worker
239 void
240 LocalProxy::skipConf(SsParallel& ss)
241 {
242  ndbrequire(!ss.m_workerMask.get(ss.m_worker));
243  ss.m_workerMask.set(ss.m_worker);
244 }
245 
246 bool
247 LocalProxy::firstReply(const SsParallel& ss)
248 {
249  const WorkerMask& mask = ss.m_workerMask;
250  const Uint32 count = mask.count();
251 
252  // recvCONF has cleared current worker
253  ndbrequire(ss.m_worker < c_workers);
254  ndbrequire(!mask.get(ss.m_worker));
255  ndbrequire(count < c_workers);
256  return count + 1 == c_workers;
257 }
258 
259 bool
260 LocalProxy::lastReply(const SsParallel& ss)
261 {
262  return ss.m_workerMask.isclear();
263 }
264 
265 bool
266 LocalProxy::lastExtra(Signal* signal, SsParallel& ss)
267 {
268  SectionHandle handle(this);
269  if (c_lqhWorkers + ss.m_extraSent < c_workers) {
270  jam();
271  ss.m_worker = c_lqhWorkers + ss.m_extraSent;
272  ss.m_workerMask.set(ss.m_worker);
273  (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
274  ss.m_extraSent++;
275  return false;
276  }
277  return true;
278 }
279 
280 // used in "reverse" proxying (start with worker REQs)
281 void
282 LocalProxy::setMask(SsParallel& ss)
283 {
284  Uint32 i;
285  for (i = 0; i < c_workers; i++)
286  ss.m_workerMask.set(i);
287 }
288 
289 void
290 LocalProxy::setMask(SsParallel& ss, const WorkerMask& mask)
291 {
292  ss.m_workerMask.assign(mask);
293 }
294 
295 // load workers (before first signal)
296 
297 void
298 LocalProxy::loadWorkers()
299 {
300  c_lqhWorkers = getLqhWorkers();
301  c_workers = c_lqhWorkers + c_extraWorkers;
302 
303  Uint32 i;
304  for (i = 0; i < c_workers; i++) {
305  jam();
306  Uint32 instanceNo = workerInstance(i);
307 
308  SimulatedBlock* worker = newWorker(instanceNo);
309  ndbrequire(worker->instance() == instanceNo);
310  ndbrequire(this->getInstance(instanceNo) == worker);
311  c_worker[i] = worker;
312 
313  if (i < c_lqhWorkers) {
314  add_lqh_worker_thr_map(number(), instanceNo);
315  } else {
316  add_extra_worker_thr_map(number(), instanceNo);
317  }
318  }
319 }
320 
321 // GSN_READ_CONFIG_REQ
322 
323 void
324 LocalProxy::execREAD_CONFIG_REQ(Signal* signal)
325 {
326  Ss_READ_CONFIG_REQ& ss = ssSeize<Ss_READ_CONFIG_REQ>(1);
327 
328  const ReadConfigReq* req = (const ReadConfigReq*)signal->getDataPtr();
329  ss.m_req = *req;
330  ndbrequire(ss.m_req.noOfParameters == 0);
331  callREAD_CONFIG_REQ(signal);
332 }
333 
334 void
335 LocalProxy::callREAD_CONFIG_REQ(Signal* signal)
336 {
337  backREAD_CONFIG_REQ(signal);
338 }
339 
340 void
341 LocalProxy::backREAD_CONFIG_REQ(Signal* signal)
342 {
343  Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(1);
344 
345  // run sequentially due to big mallocs and initializations
346  sendREQ(signal, ss);
347 }
348 
349 void
350 LocalProxy::sendREAD_CONFIG_REQ(Signal* signal, Uint32 ssId,
351  SectionHandle* handle)
352 {
353  Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(ssId);
354 
355  ReadConfigReq* req = (ReadConfigReq*)signal->getDataPtrSend();
356  req->senderRef = reference();
357  req->senderData = ssId;
358  req->noOfParameters = 0;
359  sendSignalNoRelease(workerRef(ss.m_worker), GSN_READ_CONFIG_REQ,
360  signal, ReadConfigReq::SignalLength, JBB, handle);
361 }
362 
363 void
364 LocalProxy::execREAD_CONFIG_CONF(Signal* signal)
365 {
366  const ReadConfigConf* conf = (const ReadConfigConf*)signal->getDataPtr();
367  Uint32 ssId = conf->senderData;
368  Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(ssId);
369  recvCONF(signal, ss);
370 }
371 
372 void
373 LocalProxy::sendREAD_CONFIG_CONF(Signal* signal, Uint32 ssId)
374 {
375  Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(ssId);
376 
377  if (!lastReply(ss))
378  return;
379 
380  SectionHandle handle(this);
381  restoreHandle(handle, ss);
382  releaseSections(handle);
383 
384  ReadConfigConf* conf = (ReadConfigConf*)signal->getDataPtrSend();
385  conf->senderRef = reference();
386  conf->senderData = ss.m_req.senderData;
387  sendSignal(ss.m_req.senderRef, GSN_READ_CONFIG_CONF,
388  signal, ReadConfigConf::SignalLength, JBB);
389 
390  ssRelease<Ss_READ_CONFIG_REQ>(ssId);
391 }
392 
393 // GSN_STTOR
394 
395 void
396 LocalProxy::execSTTOR(Signal* signal)
397 {
398  Ss_STTOR& ss = ssSeize<Ss_STTOR>(1);
399 
400  const Uint32 startphase = signal->theData[1];
401  const Uint32 typeOfStart = signal->theData[7];
402 
403  if (startphase == 3) {
404  jam();
405  c_typeOfStart = typeOfStart;
406  }
407 
408  ss.m_reqlength = signal->getLength();
409  memcpy(ss.m_reqdata, signal->getDataPtr(), ss.m_reqlength << 2);
410 
411  callSTTOR(signal);
412 }
413 
414 void
415 LocalProxy::callSTTOR(Signal* signal)
416 {
417  backSTTOR(signal);
418 }
419 
420 void
421 LocalProxy::backSTTOR(Signal* signal)
422 {
423  Ss_STTOR& ss = ssFind<Ss_STTOR>(1);
424  sendREQ(signal, ss);
425 }
426 
427 void
428 LocalProxy::sendSTTOR(Signal* signal, Uint32 ssId, SectionHandle* handle)
429 {
430  Ss_STTOR& ss = ssFind<Ss_STTOR>(ssId);
431 
432  memcpy(signal->getDataPtrSend(), ss.m_reqdata, ss.m_reqlength << 2);
433  sendSignalNoRelease(workerRef(ss.m_worker), GSN_STTOR,
434  signal, ss.m_reqlength, JBB, handle);
435 }
436 
437 void
438 LocalProxy::execSTTORRY(Signal* signal)
439 {
440  Ss_STTOR& ss = ssFind<Ss_STTOR>(1);
441  recvCONF(signal, ss);
442 }
443 
444 void
445 LocalProxy::sendSTTORRY(Signal* signal, Uint32 ssId)
446 {
447  Ss_STTOR& ss = ssFind<Ss_STTOR>(ssId);
448 
449  const Uint32 conflength = signal->getLength();
450  const Uint32* confdata = signal->getDataPtr();
451 
452  // the reply is identical from all
453  if (firstReply(ss)) {
454  ss.m_conflength = conflength;
455  memcpy(ss.m_confdata, confdata, conflength << 2);
456  } else {
457  ndbrequire(ss.m_conflength == conflength);
458  ndbrequire(memcmp(ss.m_confdata, confdata, conflength << 2) == 0);
459  }
460 
461  if (!lastReply(ss))
462  return;
463 
464  memcpy(signal->getDataPtrSend(), ss.m_confdata, ss.m_conflength << 2);
465  sendSignal(NDBCNTR_REF, GSN_STTORRY,
466  signal, ss.m_conflength, JBB);
467 
468  ssRelease<Ss_STTOR>(ssId);
469 }
470 
471 // GSN_NDB_STTOR
472 
473 void
474 LocalProxy::execNDB_STTOR(Signal* signal)
475 {
476  Ss_NDB_STTOR& ss = ssSeize<Ss_NDB_STTOR>(1);
477 
478  const NdbSttor* req = (const NdbSttor*)signal->getDataPtr();
479  ss.m_req = *req;
480 
481  callNDB_STTOR(signal);
482 }
483 
484 void
485 LocalProxy::callNDB_STTOR(Signal* signal)
486 {
487  backNDB_STTOR(signal);
488 }
489 
490 void
491 LocalProxy::backNDB_STTOR(Signal* signal)
492 {
493  Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(1);
494  sendREQ(signal, ss);
495 }
496 
497 void
498 LocalProxy::sendNDB_STTOR(Signal* signal, Uint32 ssId, SectionHandle* handle)
499 {
500  Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(ssId);
501 
502  NdbSttor* req = (NdbSttor*)signal->getDataPtrSend();
503  *req = ss.m_req;
504  req->senderRef = reference();
505  sendSignalNoRelease(workerRef(ss.m_worker), GSN_NDB_STTOR,
506  signal, ss.m_reqlength, JBB, handle);
507 }
508 
509 void
510 LocalProxy::execNDB_STTORRY(Signal* signal)
511 {
512  Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(1);
513 
514  // the reply contains only senderRef
515  const NdbSttorry* conf = (const NdbSttorry*)signal->getDataPtr();
516  ndbrequire(conf->senderRef == signal->getSendersBlockRef());
517  recvCONF(signal, ss);
518 }
519 
520 void
521 LocalProxy::sendNDB_STTORRY(Signal* signal, Uint32 ssId)
522 {
523  Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(ssId);
524 
525  if (!lastReply(ss))
526  return;
527 
528  NdbSttorry* conf = (NdbSttorry*)signal->getDataPtrSend();
529  conf->senderRef = reference();
530  sendSignal(NDBCNTR_REF, GSN_NDB_STTORRY,
531  signal, NdbSttorry::SignalLength, JBB);
532 
533  ssRelease<Ss_NDB_STTOR>(ssId);
534 }
535 
536 // GSN_READ_NODESREQ
537 
538 void
539 LocalProxy::sendREAD_NODESREQ(Signal* signal)
540 {
541  signal->theData[0] = reference();
542  sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
543 }
544 
545 void
546 LocalProxy::execREAD_NODESCONF(Signal* signal)
547 {
548  Ss_READ_NODES_REQ& ss = c_ss_READ_NODESREQ;
549 
550  const ReadNodesConf* conf = (const ReadNodesConf*)signal->getDataPtr();
551 
552  c_masterNodeId = conf->masterNodeId;
553 
554  switch (ss.m_gsn) {
555  case GSN_STTOR:
556  backSTTOR(signal);
557  break;
558  case GSN_NDB_STTOR:
559  backNDB_STTOR(signal);
560  break;
561  default:
562  ndbrequire(false);
563  break;
564  }
565 
566  ss.m_gsn = 0;
567 }
568 
569 void
570 LocalProxy::execREAD_NODESREF(Signal* signal)
571 {
572  Ss_READ_NODES_REQ& ss = c_ss_READ_NODESREQ;
573  ndbrequire(ss.m_gsn != 0);
574  ndbrequire(false);
575 }
576 
577 // GSN_NODE_FAILREP
578 
579 void
580 LocalProxy::execNODE_FAILREP(Signal* signal)
581 {
582  Ss_NODE_FAILREP& ss = ssFindSeize<Ss_NODE_FAILREP>(1, 0);
583  const NodeFailRep* req = (const NodeFailRep*)signal->getDataPtr();
584  ss.m_req = *req;
585  ndbrequire(signal->getLength() == NodeFailRep::SignalLength);
586 
587  NdbNodeBitmask mask;
588  mask.assign(NdbNodeBitmask::Size, req->theNodes);
589 
590  // from each worker wait for ack for each failed node
591  for (Uint32 i = 0; i < c_workers; i++)
592  {
593  jam();
594  NdbNodeBitmask& waitFor = ss.m_waitFor[i];
595  waitFor.bitOR(mask);
596  }
597 
598  sendREQ(signal, ss);
599  if (ss.noReply(number()))
600  {
601  jam();
602  ssRelease<Ss_NODE_FAILREP>(ss);
603  }
604 }
605 
606 void
607 LocalProxy::sendNODE_FAILREP(Signal* signal, Uint32 ssId, SectionHandle* handle)
608 {
609  Ss_NODE_FAILREP& ss = ssFind<Ss_NODE_FAILREP>(ssId);
610 
611  NodeFailRep* req = (NodeFailRep*)signal->getDataPtrSend();
612  *req = ss.m_req;
613  sendSignalNoRelease(workerRef(ss.m_worker), GSN_NODE_FAILREP,
614  signal, NodeFailRep::SignalLength, JBB, handle);
615 }
616 
617 void
618 LocalProxy::execNF_COMPLETEREP(Signal* signal)
619 {
620  Ss_NODE_FAILREP& ss = ssFind<Ss_NODE_FAILREP>(1);
621  ndbrequire(!ss.noReply(number()));
622  ss.m_workerMask.set(ss.m_worker); // Avoid require in recvCONF
623  recvCONF(signal, ss);
624 }
625 
// NOTE(review): this listing is missing the line that carries the
// function signature (listing line 627) as well as listing lines 647-649
// and 667-672.  The body uses 'signal' and 'ssId'; presumably this is the
// routine run for each worker's NF_COMPLETEREP - confirm against the
// original source before relying on this text.
//
// Visible behavior: the failed node named in the incoming NF_COMPLETEREP
// is removed from the wait-set of the current worker.  While any worker
// still waits for that node nothing more happens.  Otherwise a single
// NF_COMPLETEREP for the node is sent to DBDIH, and also to QMGR when
// this block is DBTC.
626 void
628 {
629  Ss_NODE_FAILREP& ss = ssFind<Ss_NODE_FAILREP>(ssId);
630 
631  const NFCompleteRep* conf = (const NFCompleteRep*)signal->getDataPtr();
632  Uint32 node = conf->failedNodeId;
633 
// the current worker must have been waiting for this node; mark it done
634  {
635  NdbNodeBitmask& waitFor = ss.m_waitFor[ss.m_worker];
636  ndbrequire(waitFor.get(node));
637  waitFor.clear(node);
638  }
639 
// stop here if some worker has not yet reported this node
640  for (Uint32 i = 0; i < c_workers; i++)
641  {
642  jam();
643  NdbNodeBitmask& waitFor = ss.m_waitFor[i];
644  if (waitFor.get(node))
645  {
646  jam();
650  return;
651  }
652  }
653 
// no worker waits for the node any more: report on behalf of the block
654  {
655  NFCompleteRep* conf = (NFCompleteRep*)signal->getDataPtrSend();
656  conf->blockNo = number();
657  conf->nodeId = getOwnNodeId();
658  conf->failedNodeId = node;
659  conf->unused = 0;
660  conf->from = __LINE__;
661 
662  sendSignal(DBDIH_REF, GSN_NF_COMPLETEREP,
663  signal, NFCompleteRep::SignalLength, JBB);
664 
// DBTC additionally sends the same report to QMGR
665  if (number() == DBTC)
666  {
673  jam();
674  sendSignal(QMGR_REF, GSN_NF_COMPLETEREP, signal,
675  NFCompleteRep::SignalLength, JBB);
676  }
677  }
678 }
679 
680 // GSN_INCL_NODEREQ
681 
682 void
683 LocalProxy::execINCL_NODEREQ(Signal* signal)
684 {
685  Ss_INCL_NODEREQ& ss = ssSeize<Ss_INCL_NODEREQ>(1);
686 
687  ss.m_reqlength = signal->getLength();
688  ndbrequire(sizeof(ss.m_req) >= (ss.m_reqlength << 2));
689  memcpy(&ss.m_req, signal->getDataPtr(), ss.m_reqlength << 2);
690 
691  sendREQ(signal, ss);
692 }
693 
694 void
695 LocalProxy::sendINCL_NODEREQ(Signal* signal, Uint32 ssId, SectionHandle* handle)
696 {
697  Ss_INCL_NODEREQ& ss = ssFind<Ss_INCL_NODEREQ>(ssId);
698 
699  Ss_INCL_NODEREQ::Req* req =
700  (Ss_INCL_NODEREQ::Req*)signal->getDataPtrSend();
701 
702  memcpy(req, &ss.m_req, ss.m_reqlength << 2);
703  req->senderRef = reference();
704  sendSignalNoRelease(workerRef(ss.m_worker), GSN_INCL_NODEREQ,
705  signal, ss.m_reqlength, JBB, handle);
706 }
707 
708 void
709 LocalProxy::execINCL_NODECONF(Signal* signal)
710 {
711  Ss_INCL_NODEREQ& ss = ssFind<Ss_INCL_NODEREQ>(1);
712  recvCONF(signal, ss);
713 }
714 
715 void
716 LocalProxy::sendINCL_NODECONF(Signal* signal, Uint32 ssId)
717 {
718  Ss_INCL_NODEREQ& ss = ssFind<Ss_INCL_NODEREQ>(ssId);
719 
720  if (!lastReply(ss))
721  return;
722 
723  Ss_INCL_NODEREQ::Conf* conf =
724  (Ss_INCL_NODEREQ::Conf*)signal->getDataPtrSend();
725 
726  conf->inclNodeId = ss.m_req.inclNodeId;
727  conf->senderRef = reference();
728  sendSignal(ss.m_req.senderRef, GSN_INCL_NODECONF,
729  signal, 2, JBB);
730 
731  ssRelease<Ss_INCL_NODEREQ>(ssId);
732 }
733 
734 // GSN_NODE_STATE_REP
735 
736 void
737 LocalProxy::execNODE_STATE_REP(Signal* signal)
738 {
739  Ss_NODE_STATE_REP& ss = ssSeize<Ss_NODE_STATE_REP>();
740  sendREQ(signal, ss);
741  SimulatedBlock::execNODE_STATE_REP(signal);
742  ssRelease<Ss_NODE_STATE_REP>(ss);
743 }
744 
745 void
746 LocalProxy::sendNODE_STATE_REP(Signal* signal, Uint32 ssId,
747  SectionHandle* handle)
748 {
749  Ss_NODE_STATE_REP& ss = ssFind<Ss_NODE_STATE_REP>(ssId);
750 
751  sendSignalNoRelease(workerRef(ss.m_worker), GSN_NODE_STATE_REP,
752  signal,NodeStateRep::SignalLength, JBB, handle);
753 }
754 
755 // GSN_CHANGE_NODE_STATE_REQ
756 
757 void
758 LocalProxy::execCHANGE_NODE_STATE_REQ(Signal* signal)
759 {
760  Ss_CHANGE_NODE_STATE_REQ& ss = ssSeize<Ss_CHANGE_NODE_STATE_REQ>(1);
761 
762  ChangeNodeStateReq * req = (ChangeNodeStateReq*)signal->getDataPtrSend();
763  ss.m_req = *req;
764 
765  sendREQ(signal, ss);
766 }
767 
768 void
769 LocalProxy::sendCHANGE_NODE_STATE_REQ(Signal* signal, Uint32 ssId,
770  SectionHandle* handle)
771 {
772  Ss_CHANGE_NODE_STATE_REQ& ss = ssFind<Ss_CHANGE_NODE_STATE_REQ>(ssId);
773 
774  ChangeNodeStateReq * req = (ChangeNodeStateReq*)signal->getDataPtrSend();
775  req->senderRef = reference();
776 
777  sendSignalNoRelease(workerRef(ss.m_worker), GSN_CHANGE_NODE_STATE_REQ,
778  signal, ChangeNodeStateReq::SignalLength, JBB, handle);
779 }
780 
781 void
782 LocalProxy::execCHANGE_NODE_STATE_CONF(Signal* signal)
783 {
784  Ss_CHANGE_NODE_STATE_REQ& ss = ssFind<Ss_CHANGE_NODE_STATE_REQ>(1);
785 
786  ChangeNodeStateConf * conf = (ChangeNodeStateConf*)signal->getDataPtrSend();
787  ndbrequire(conf->senderData == ss.m_req.senderData);
788  recvCONF(signal, ss);
789 }
790 
// NOTE(review): this listing is missing listing lines 792 (the line that
// carries the function signature), 799-801 and 804.  The body uses
// 'signal' and 'ssId'; presumably this is the routine run for each
// worker's CHANGE_NODE_STATE_CONF - confirm against the original source.
//
// Visible behavior: nothing happens until the last worker reply.  Then
// the stored request is copied back into the signal send buffer and the
// record is released.  No statement that consumes the restored request
// is visible here (listing line 804 is absent) - TODO confirm.
791 void
793 {
794  Ss_CHANGE_NODE_STATE_REQ& ss = ssFind<Ss_CHANGE_NODE_STATE_REQ>(ssId);
795 
796  if (!lastReply(ss))
797  return;
798 
// restore the original request into the signal buffer
802  ChangeNodeStateReq * req = (ChangeNodeStateReq*)signal->getDataPtrSend();
803  * req = ss.m_req;
805  ssRelease<Ss_CHANGE_NODE_STATE_REQ>(ssId);
806 }
807 
808 // GSN_DUMP_STATE_ORD
809 
810 void
811 LocalProxy::execDUMP_STATE_ORD(Signal* signal)
812 {
813  Ss_DUMP_STATE_ORD& ss = ssSeize<Ss_DUMP_STATE_ORD>();
814 
815  ss.m_reqlength = signal->getLength();
816  memcpy(ss.m_reqdata, signal->getDataPtr(), ss.m_reqlength << 2);
817  sendREQ(signal, ss);
818  ssRelease<Ss_DUMP_STATE_ORD>(ss);
819 }
820 
821 void
822 LocalProxy::sendDUMP_STATE_ORD(Signal* signal, Uint32 ssId,
823  SectionHandle* handle)
824 {
825  Ss_DUMP_STATE_ORD& ss = ssFind<Ss_DUMP_STATE_ORD>(ssId);
826 
827  memcpy(signal->getDataPtrSend(), ss.m_reqdata, ss.m_reqlength << 2);
828  sendSignalNoRelease(workerRef(ss.m_worker), GSN_DUMP_STATE_ORD,
829  signal, ss.m_reqlength, JBB, handle);
830 }
831 
832 // GSN_NDB_TAMPER
833 
834 void
835 LocalProxy::execNDB_TAMPER(Signal* signal)
836 {
837  Ss_NDB_TAMPER& ss = ssSeize<Ss_NDB_TAMPER>();
838 
839  ndbrequire(signal->getLength() == 1);
840  ss.m_errorInsert = signal->theData[0];
841 
842  SimulatedBlock::execNDB_TAMPER(signal);
843  sendREQ(signal, ss);
844  ssRelease<Ss_NDB_TAMPER>(ss);
845 }
846 
847 void
848 LocalProxy::sendNDB_TAMPER(Signal* signal, Uint32 ssId, SectionHandle* handle)
849 {
850  Ss_NDB_TAMPER& ss = ssFind<Ss_NDB_TAMPER>(ssId);
851 
852  signal->theData[0] = ss.m_errorInsert;
853  sendSignalNoRelease(workerRef(ss.m_worker), GSN_NDB_TAMPER,
854  signal, 1, JBB, handle);
855 }
856 
857 // GSN_TIME_SIGNAL
858 
859 void
860 LocalProxy::execTIME_SIGNAL(Signal* signal)
861 {
862  Ss_TIME_SIGNAL& ss = ssSeize<Ss_TIME_SIGNAL>();
863 
864  sendREQ(signal, ss);
865  ssRelease<Ss_TIME_SIGNAL>(ss);
866 }
867 
868 void
869 LocalProxy::sendTIME_SIGNAL(Signal* signal, Uint32 ssId, SectionHandle* handle)
870 {
871  Ss_TIME_SIGNAL& ss = ssFind<Ss_TIME_SIGNAL>(ssId);
872  signal->theData[0] = 0;
873  sendSignalNoRelease(workerRef(ss.m_worker), GSN_TIME_SIGNAL,
874  signal, 1, JBB, handle);
875 }
876 
877 // GSN_CREATE_TRIG_IMPL_REQ
878 
879 void
880 LocalProxy::execCREATE_TRIG_IMPL_REQ(Signal* signal)
881 {
882  if (!assembleFragments(signal))
883  return;
884 
885  if (ssQueue<Ss_CREATE_TRIG_IMPL_REQ>(signal))
886  return;
887  const CreateTrigImplReq* req = (const CreateTrigImplReq*)signal->getDataPtr();
888  Ss_CREATE_TRIG_IMPL_REQ& ss = ssSeize<Ss_CREATE_TRIG_IMPL_REQ>();
889  ss.m_req = *req;
890  ndbrequire(signal->getLength() <= CreateTrigImplReq::SignalLength);
891 
892  SectionHandle handle(this, signal);
893  saveSections(ss, handle);
894 
895  sendREQ(signal, ss);
896 }
897 
898 void
899 LocalProxy::sendCREATE_TRIG_IMPL_REQ(Signal* signal, Uint32 ssId,
900  SectionHandle * handle)
901 {
902  Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
903 
904  CreateTrigImplReq* req = (CreateTrigImplReq*)signal->getDataPtrSend();
905  *req = ss.m_req;
906  req->senderRef = reference();
907  req->senderData = ssId;
908  sendSignalNoRelease(workerRef(ss.m_worker), GSN_CREATE_TRIG_IMPL_REQ,
909  signal, CreateTrigImplReq::SignalLength, JBB,
910  handle);
911 }
912 
913 void
914 LocalProxy::execCREATE_TRIG_IMPL_CONF(Signal* signal)
915 {
916  const CreateTrigImplConf* conf = (const CreateTrigImplConf*)signal->getDataPtr();
917  Uint32 ssId = conf->senderData;
918  Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
919  recvCONF(signal, ss);
920 }
921 
922 void
923 LocalProxy::execCREATE_TRIG_IMPL_REF(Signal* signal)
924 {
925  const CreateTrigImplRef* ref = (const CreateTrigImplRef*)signal->getDataPtr();
926  Uint32 ssId = ref->senderData;
927  Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
928  recvREF(signal, ss, ref->errorCode);
929 }
930 
931 void
932 LocalProxy::sendCREATE_TRIG_IMPL_CONF(Signal* signal, Uint32 ssId)
933 {
934  Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
935  BlockReference dictRef = ss.m_req.senderRef;
936 
937  if (!lastReply(ss))
938  return;
939 
940  if (ss.m_error == 0) {
941  jam();
942  CreateTrigImplConf* conf = (CreateTrigImplConf*)signal->getDataPtrSend();
943  conf->senderRef = reference();
944  conf->senderData = ss.m_req.senderData;
945  conf->tableId = ss.m_req.tableId;
946  conf->triggerId = ss.m_req.triggerId;
947  conf->triggerInfo = ss.m_req.triggerInfo;
948  sendSignal(dictRef, GSN_CREATE_TRIG_IMPL_CONF,
949  signal, CreateTrigImplConf::SignalLength, JBB);
950  } else {
951  CreateTrigImplRef* ref = (CreateTrigImplRef*)signal->getDataPtrSend();
952  ref->senderRef = reference();
953  ref->senderData = ss.m_req.senderData;
954  ref->tableId = ss.m_req.tableId;
955  ref->triggerId = ss.m_req.triggerId;
956  ref->triggerInfo = ss.m_req.triggerInfo;
957  ref->errorCode = ss.m_error;
958  sendSignal(dictRef, GSN_CREATE_TRIG_IMPL_REF,
959  signal, CreateTrigImplRef::SignalLength, JBB);
960  }
961 
962  ssRelease<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
963 }
964 
965 // GSN_DROP_TRIG_IMPL_REQ
966 
967 void
968 LocalProxy::execDROP_TRIG_IMPL_REQ(Signal* signal)
969 {
970  if (ssQueue<Ss_DROP_TRIG_IMPL_REQ>(signal))
971  return;
972  const DropTrigImplReq* req = (const DropTrigImplReq*)signal->getDataPtr();
973  Ss_DROP_TRIG_IMPL_REQ& ss = ssSeize<Ss_DROP_TRIG_IMPL_REQ>();
974  ss.m_req = *req;
975  ndbrequire(signal->getLength() == DropTrigImplReq::SignalLength);
976  sendREQ(signal, ss);
977 }
978 
979 void
980 LocalProxy::sendDROP_TRIG_IMPL_REQ(Signal* signal, Uint32 ssId,
981  SectionHandle * handle)
982 {
983  Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
984 
985  DropTrigImplReq* req = (DropTrigImplReq*)signal->getDataPtrSend();
986  *req = ss.m_req;
987  req->senderRef = reference();
988  req->senderData = ssId;
989  sendSignalNoRelease(workerRef(ss.m_worker), GSN_DROP_TRIG_IMPL_REQ,
990  signal, DropTrigImplReq::SignalLength, JBB, handle);
991 }
992 
993 void
994 LocalProxy::execDROP_TRIG_IMPL_CONF(Signal* signal)
995 {
996  const DropTrigImplConf* conf = (const DropTrigImplConf*)signal->getDataPtr();
997  Uint32 ssId = conf->senderData;
998  Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
999  recvCONF(signal, ss);
1000 }
1001 
1002 void
1003 LocalProxy::execDROP_TRIG_IMPL_REF(Signal* signal)
1004 {
1005  const DropTrigImplRef* ref = (const DropTrigImplRef*)signal->getDataPtr();
1006  Uint32 ssId = ref->senderData;
1007  Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
1008  recvREF(signal, ss, ref->errorCode);
1009 }
1010 
1011 void
1012 LocalProxy::sendDROP_TRIG_IMPL_CONF(Signal* signal, Uint32 ssId)
1013 {
1014  Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
1015  BlockReference dictRef = ss.m_req.senderRef;
1016 
1017  if (!lastReply(ss))
1018  return;
1019 
1020  if (ss.m_error == 0) {
1021  jam();
1022  DropTrigImplConf* conf = (DropTrigImplConf*)signal->getDataPtrSend();
1023  conf->senderRef = reference();
1024  conf->senderData = ss.m_req.senderData;
1025  conf->tableId = ss.m_req.tableId;
1026  conf->triggerId = ss.m_req.triggerId;
1027  sendSignal(dictRef, GSN_DROP_TRIG_IMPL_CONF,
1028  signal, DropTrigImplConf::SignalLength, JBB);
1029  } else {
1030  DropTrigImplRef* ref = (DropTrigImplRef*)signal->getDataPtrSend();
1031  ref->senderRef = reference();
1032  ref->senderData = ss.m_req.senderData;
1033  ref->tableId = ss.m_req.tableId;
1034  ref->triggerId = ss.m_req.triggerId;
1035  ref->errorCode = ss.m_error;
1036  sendSignal(dictRef, GSN_DROP_TRIG_IMPL_REF,
1037  signal, DropTrigImplRef::SignalLength, JBB);
1038  }
1039 
1040  ssRelease<Ss_DROP_TRIG_IMPL_REQ>(ssId);
1041 }
1042 
1043 // GSN_DBINFO_SCANREQ
1044 
1045 //#define DBINFO_SCAN_TRACE
1046 #ifdef DBINFO_SCAN_TRACE
1047 #include <debugger/DebuggerNames.hpp>
1048 #endif
1049 
1050 static Uint32
1051 switchRef(Uint32 block, Uint32 instance, Uint32 node)
1052 {
1053  const Uint32 ref = numberToRef(block, instance, node);
1054 #ifdef DBINFO_SCAN_TRACE
1055  ndbout_c("Dbinfo::LocalProxy: switching to %s(%d) in node %d, ref: 0x%.8x",
1056  getBlockName(block, "<unknown>"), instance, node, ref);
1057 #endif
1058  return ref;
1059 }
1060 
1061 
1062 bool
1063 LocalProxy::find_next(Ndbinfo::ScanCursor* cursor) const
1064 {
1065  const Uint32 node = refToNode(cursor->currRef);
1066  const Uint32 block = refToMain(cursor->currRef);
1067  Uint32 instance = refToInstance(cursor->currRef);
1068 
1069  ndbrequire(node == getOwnNodeId());
1070  ndbrequire(block == number());
1071 
1072 
1073  Uint32 worker = (instance > 0) ? workerIndex(instance) + 1 : 0;
1074 
1075  if (worker < c_workers)
1076  {
1077  jam();
1078  cursor->currRef = switchRef(block, workerInstance(worker), node);
1079  return true;
1080  }
1081 
1082  cursor->currRef = numberToRef(block, node);
1083  return false;
1084 }
1085 
1086 
1087 
1088 void
1089 LocalProxy::execDBINFO_SCANREQ(Signal* signal)
1090 {
1091  jamEntry();
1092  const DbinfoScanReq* req = (const DbinfoScanReq*) signal->getDataPtr();
1093  Uint32 signal_length = signal->getLength();
1094  ndbrequire(signal_length == DbinfoScanReq::SignalLength+req->cursor_sz);
1095 
1096  Ndbinfo::ScanCursor* cursor =
1097  (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(req);
1098 
1099  if (Ndbinfo::ScanCursor::getHasMoreData(cursor->flags) &&
1100  cursor->saveCurrRef)
1101  {
1102  jam();
1103  /* Continue in the saved block ref */
1104  cursor->currRef = cursor->saveCurrRef;
1105  cursor->saveCurrRef = 0;
1106 
1107  // Set this block as sender and remember original sender
1108  cursor->saveSenderRef = cursor->senderRef;
1109  cursor->senderRef = reference();
1110 
1111  sendSignal(cursor->currRef, GSN_DBINFO_SCANREQ,
1112  signal, signal_length, JBB);
1113  return;
1114  }
1115 
1116  Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, false);
1117 
1118  if (find_next(cursor))
1119  {
1120  jam();
1121  ndbrequire(cursor->currRef);
1122  ndbrequire(cursor->saveCurrRef == 0);
1123 
1124  // Set this block as sender and remember original sender
1125  cursor->saveSenderRef = cursor->senderRef;
1126  cursor->senderRef = reference();
1127 
1128  sendSignal(cursor->currRef, GSN_DBINFO_SCANREQ,
1129  signal, signal_length, JBB);
1130  return;
1131  }
1132 
1133  /* Scan is done, send SCANCONF back to caller */
1134  ndbrequire(cursor->saveSenderRef == 0);
1135 
1136  ndbrequire(cursor->currRef);
1137  ndbrequire(cursor->saveCurrRef == 0);
1138 
1139  ndbrequire(refToInstance(cursor->currRef) == 0);
1140  sendSignal(cursor->senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
1141  return;
1142 }
1143 
1144 void
1145 LocalProxy::execDBINFO_SCANCONF(Signal* signal)
1146 {
1147  jamEntry();
1148  const DbinfoScanConf* conf = (const DbinfoScanConf*)signal->getDataPtr();
1149  Uint32 signal_length = signal->getLength();
1150  ndbrequire(signal_length == DbinfoScanConf::SignalLength+conf->cursor_sz);
1151 
1152  Ndbinfo::ScanCursor* cursor =
1153  (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(conf);
1154 
1155  if (Ndbinfo::ScanCursor::getHasMoreData(cursor->flags))
1156  {
1157  /* The instance has more data and want to continue */
1158  jam();
1159 
1160  /* Swap back saved senderRef */
1161  const Uint32 senderRef = cursor->senderRef = cursor->saveSenderRef;
1162  cursor->saveSenderRef = 0;
1163 
1164  /* Save currRef to continue with same instance again */
1165  cursor->saveCurrRef = cursor->currRef;
1166  cursor->currRef = reference();
1167 
1168  sendSignal(senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
1169  return;
1170  }
1171 
1172  if (conf->returnedRows)
1173  {
1174  jam();
1175  /*
1176  The instance has no more data, but it has sent rows
1177  to the API which need to be CONFed
1178  */
1179 
1180  /* Swap back saved senderRef */
1181  const Uint32 senderRef = cursor->senderRef = cursor->saveSenderRef;
1182  cursor->saveSenderRef = 0;
1183 
1184  if (find_next(cursor))
1185  {
1186  /*
1187  There is another instance to continue in - signal 'more data'
1188  and setup saveCurrRef to continue in that instance
1189  */
1190  jam();
1191  Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, true);
1192 
1193  cursor->saveCurrRef = cursor->currRef;
1194  cursor->currRef = reference();
1195  }
1196  else
1197  {
1198  /* There was no more instances to continue in */
1199  ndbrequire(Ndbinfo::ScanCursor::getHasMoreData(cursor->flags) == false);
1200 
1201  ndbrequire(cursor->currRef == reference());
1202  cursor->saveCurrRef = 0;
1203  }
1204 
1205  sendSignal(senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
1206  return;
1207  }
1208 
1209 
1210  /* The underlying block reported completed, find next if any */
1211  if (find_next(cursor))
1212  {
1213  jam();
1214 
1215  ndbrequire(cursor->senderRef == reference());
1216  ndbrequire(cursor->saveSenderRef); // Should already be set
1217 
1218  ndbrequire(cursor->saveCurrRef == 0);
1219 
1220  sendSignal(cursor->currRef, GSN_DBINFO_SCANREQ,
1221  signal, signal_length, JBB);
1222  return;
1223  }
1224 
1225  /* Scan in this block and its instances are completed */
1226 
1227  /* Swap back saved senderRef */
1228  const Uint32 senderRef = cursor->senderRef = cursor->saveSenderRef;
1229  cursor->saveSenderRef = 0;
1230 
1231  ndbrequire(cursor->currRef);
1232  ndbrequire(cursor->saveCurrRef == 0);
1233 
1234  sendSignal(senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
1235  return;
1236 }
1237 
1238 // GSN_SYNC_REQ
1239 
1240 void
1241 LocalProxy::execSYNC_REQ(Signal* signal)
1242 {
1243  Ss_SYNC_REQ& ss = ssSeize<Ss_SYNC_REQ>();
1244 
1245  ss.m_req = * CAST_CONSTPTR(SyncReq, signal->getDataPtr());
1246 
1247  sendREQ(signal, ss);
1248 }
1249 
1250 void
1251 LocalProxy::sendSYNC_REQ(Signal* signal, Uint32 ssId,
1252  SectionHandle* handle)
1253 {
1254  Ss_SYNC_REQ& ss = ssFind<Ss_SYNC_REQ>(ssId);
1255 
1256  SyncReq * req = CAST_PTR(SyncReq, signal->getDataPtrSend());
1257  req->senderRef = reference();
1258  req->senderData = ssId;
1259  req->prio = ss.m_req.prio;
1260 
1261  sendSignalNoRelease(workerRef(ss.m_worker), GSN_SYNC_REQ,
1262  signal, SyncReq::SignalLength,
1263  JobBufferLevel(ss.m_req.prio), handle);
1264 }
1265 
1266 void
1267 LocalProxy::execSYNC_REF(Signal* signal)
1268 {
1269  SyncRef ref = * CAST_CONSTPTR(SyncRef, signal->getDataPtr());
1270  Ss_SYNC_REQ& ss = ssFind<Ss_SYNC_REQ>(ref.senderData);
1271 
1272  recvREF(signal, ss, ref.errorCode);
1273 }
1274 
1275 void
1276 LocalProxy::execSYNC_CONF(Signal* signal)
1277 {
1278  SyncConf conf = * CAST_CONSTPTR(SyncConf, signal->getDataPtr());
1279  Ss_SYNC_REQ& ss = ssFind<Ss_SYNC_REQ>(conf.senderData);
1280 
1281  recvCONF(signal, ss);
1282 }
1283 
1284 void
1285 LocalProxy::sendSYNC_CONF(Signal* signal, Uint32 ssId)
1286 {
1287  Ss_SYNC_REQ& ss = ssFind<Ss_SYNC_REQ>(ssId);
1288 
1289  if (!lastReply(ss))
1290  return;
1291 
1295  if (ss.m_error == 0)
1296  {
1297  jam();
1298  SyncConf * conf = CAST_PTR(SyncConf, signal->getDataPtrSend());
1299  conf->senderRef = reference();
1300  conf->senderData = ss.m_req.senderData;
1301 
1302  Uint32 prio = ss.m_req.prio;
1303  sendSignal(ss.m_req.senderRef, GSN_SYNC_CONF, signal,
1304  SyncConf::SignalLength,
1305  JobBufferLevel(prio));
1306  }
1307  else
1308  {
1309  jam();
1310  SyncRef * ref = CAST_PTR(SyncRef, signal->getDataPtrSend());
1311  ref->senderRef = reference();
1312  ref->senderData = ss.m_req.senderData;
1313  ref->errorCode = ss.m_error;
1314 
1315  Uint32 prio = ss.m_req.prio;
1316  sendSignal(ss.m_req.senderRef, GSN_SYNC_REF, signal,
1317  SyncRef::SignalLength,
1318  JobBufferLevel(prio));
1319  }
1320  ssRelease<Ss_SYNC_REQ>(ssId);
1321 }
1322 
1323 void
1324 LocalProxy::execSYNC_PATH_REQ(Signal* signal)
1325 {
1326  SyncPathReq* req = CAST_PTR(SyncPathReq, signal->getDataPtrSend());
1327  req->count *= c_workers;
1328 
1329  for (Uint32 i = 0; i < c_workers; i++)
1330  {
1331  jam();
1332  Uint32 ref = numberToRef(number(), workerInstance(i), getOwnNodeId());
1333  sendSignal(ref, GSN_SYNC_PATH_REQ, signal,
1334  signal->getLength(),
1335  JobBufferLevel(req->prio));
1336  }
1337 }
1338 
1339 BLOCK_FUNCTIONS(LocalProxy)