MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Suma.cpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include <my_global.h>
19 #include "Suma.hpp"
20 
21 #include <ndb_version.h>
22 
23 #include <NdbTCP.h>
24 #include <Bitmask.hpp>
25 #include <SimpleProperties.hpp>
26 
27 #include <signaldata/NodeFailRep.hpp>
28 #include <signaldata/ReadNodesConf.hpp>
29 
30 #include <signaldata/ListTables.hpp>
31 #include <signaldata/GetTabInfo.hpp>
32 #include <signaldata/GetTableId.hpp>
33 #include <signaldata/DictTabInfo.hpp>
34 #include <signaldata/SumaImpl.hpp>
35 #include <signaldata/ScanFrag.hpp>
36 #include <signaldata/TransIdAI.hpp>
37 #include <signaldata/CreateTrigImpl.hpp>
38 #include <signaldata/DropTrigImpl.hpp>
39 #include <signaldata/FireTrigOrd.hpp>
40 #include <signaldata/TrigAttrInfo.hpp>
41 #include <signaldata/CheckNodeGroups.hpp>
42 #include <signaldata/CreateTab.hpp>
43 #include <signaldata/DropTab.hpp>
44 #include <signaldata/AlterTable.hpp>
45 #include <signaldata/AlterTab.hpp>
46 #include <signaldata/DihScanTab.hpp>
47 #include <signaldata/SystemError.hpp>
48 #include <signaldata/GCP.hpp>
49 #include <signaldata/StopMe.hpp>
50 
51 #include <signaldata/DictLock.hpp>
52 #include <ndbapi/NdbDictionary.hpp>
53 
54 #include <DebuggerNames.hpp>
55 #include "../dbtup/Dbtup.hpp"
56 #include "../dbdih/Dbdih.hpp"
57 
58 #include <signaldata/CreateNodegroup.hpp>
59 #include <signaldata/CreateNodegroupImpl.hpp>
60 
61 #include <signaldata/DropNodegroup.hpp>
62 #include <signaldata/DropNodegroupImpl.hpp>
63 
64 #include <signaldata/DbinfoScan.hpp>
65 #include <signaldata/TransIdAI.hpp>
66 
67 #include <EventLogger.hpp>
68 extern EventLogger * g_eventLogger;
69 
70 //#define HANDOVER_DEBUG
71 //#define NODEFAIL_DEBUG
72 //#define NODEFAIL_DEBUG2
73 //#define DEBUG_SUMA_SEQUENCE
74 //#define EVENT_DEBUG
75 //#define EVENT_PH3_DEBUG
76 //#define EVENT_DEBUG2
// File-local override of the DBUG tracing macros: whatever definitions were
// in effect are removed and replaced by one of the two variants below.
77 #if 1
78 #undef DBUG_ENTER
79 #undef DBUG_PRINT
80 #undef DBUG_RETURN
81 #undef DBUG_VOID_RETURN
82 
// Change to "#if 1" to print function entry/exit and DBUG_PRINT output
// through ndbout/ndbout_c.
83 #if 0
84 #define DBUG_ENTER(a) {ndbout_c("%s:%d >%s", __FILE__, __LINE__, a);}
85 #define DBUG_PRINT(a,b) {ndbout << __FILE__ << ":" << __LINE__ << " " << a << ": "; ndbout_c b ;}
86 #define DBUG_RETURN(a) { ndbout_c("%s:%d <", __FILE__, __LINE__); return(a); }
87 #define DBUG_VOID_RETURN { ndbout_c("%s:%d <", __FILE__, __LINE__); return; }
88 #else
// Active variant: ENTER/PRINT expand to nothing and the RETURN macros are
// plain return statements.
89 #define DBUG_ENTER(a)
90 #define DBUG_PRINT(a,b)
91 #define DBUG_RETURN(a) return a
92 #define DBUG_VOID_RETURN return
93 #endif
94 
95 #endif
96 
// Guards the "if (DBG_3R)" diagnostic printouts in this file; 0 disables them.
97 #define DBG_3R 0
98 
// NOTE(review): the use of g_subPtrI is not visible in this part of the file.
106 Uint32 g_subPtrI = RNIL;
// Sequence id placed in the UTIL_SEQUENCE_REQ that creates the SUMA sequence.
107 static const Uint32 SUMA_SEQUENCE = 0xBABEBABE;
108 
// NOTE(review): not referenced in the visible part of the file.
109 static const Uint32 MAX_CONCURRENT_GCP = 2;
110 
111 /**************************************************************
112  *
113  * Start of suma
114  *
115  */
116 
117 #define PRINT_ONLY 0
118 
119 void
120 Suma::execREAD_CONFIG_REQ(Signal* signal)
121 {
122  jamEntry();
123 
124  const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
125 
126  Uint32 ref = req->senderRef;
127  Uint32 senderData = req->senderData;
128 
129  const ndb_mgm_configuration_iterator * p =
130  m_ctx.m_config.getOwnConfigIterator();
131  ndbrequire(p != 0);
132 
133  // SumaParticipant
134  Uint32 noTables, noAttrs, maxBufferedEpochs;
135  ndb_mgm_get_int_parameter(p, CFG_DICT_TABLE,
136  &noTables);
137  ndb_mgm_get_int_parameter(p, CFG_DICT_ATTRIBUTE,
138  &noAttrs);
139  ndb_mgm_get_int_parameter(p, CFG_DB_MAX_BUFFERED_EPOCHS,
140  &maxBufferedEpochs);
141 
142  c_tablePool.setSize(noTables);
143  c_tables.setSize(noTables);
144 
145  c_subscriptions.setSize(noTables);
146 
147  Uint32 cnt = 0;
148  cnt = 0;
149  ndb_mgm_get_int_parameter(p, CFG_DB_SUBSCRIPTIONS, &cnt);
150  if (cnt == 0)
151  {
152  jam();
153  cnt = noTables;
154  }
155  c_subscriptionPool.setSize(cnt);
156 
157  cnt *= 2;
158  {
159  Uint32 val = 0;
160  ndb_mgm_get_int_parameter(p, CFG_DB_SUBSCRIBERS, &val);
161  if (val)
162  {
163  jam();
164  cnt = val;
165  }
166  }
168 
169  cnt = 0;
170  ndb_mgm_get_int_parameter(p, CFG_DB_SUB_OPERATIONS, &cnt);
171  if (cnt)
172  c_subOpPool.setSize(cnt);
173  else
174  c_subOpPool.setSize(256);
175 
176  c_syncPool.setSize(2);
177 
178  // Trix: max 5 concurrent index stats ops with max 9 words bounds
179  Uint32 noOfBoundWords = 5 * 9;
180 
181  // XXX multiplies number of words by 15 ???
182  c_dataBufferPool.setSize(noAttrs + noOfBoundWords);
183 
184  c_maxBufferedEpochs = maxBufferedEpochs;
185 
186  // Calculate needed gcp pool as 10 records + the ones needed
187  // during a possible api timeout
188  Uint32 dbApiHbInterval, gcpInterval, microGcpInterval = 0;
189  ndb_mgm_get_int_parameter(p, CFG_DB_API_HEARTBEAT_INTERVAL,
190  &dbApiHbInterval);
191  ndb_mgm_get_int_parameter(p, CFG_DB_GCP_INTERVAL,
192  &gcpInterval);
193  ndb_mgm_get_int_parameter(p, CFG_DB_MICRO_GCP_INTERVAL,
194  &microGcpInterval);
195 
196  if (microGcpInterval)
197  {
198  gcpInterval = microGcpInterval;
199  }
200  c_gcp_pool.setSize(10 + (4*dbApiHbInterval+gcpInterval-1)/gcpInterval);
201 
202  c_page_chunk_pool.setSize(50);
203 
204  {
205  SLList<SyncRecord> tmp(c_syncPool);
206  Ptr<SyncRecord> ptr;
207  while(tmp.seize(ptr))
208  new (ptr.p) SyncRecord(* this, c_dataBufferPool);
209  tmp.release();
210  }
211 
212  // Suma
213  c_masterNodeId = getOwnNodeId();
214 
215  c_nodeGroup = c_noNodesInGroup = 0;
216  for (int i = 0; i < MAX_REPLICAS; i++) {
217  c_nodesInGroup[i] = 0;
218  }
219 
220  m_first_free_page= RNIL;
221 
222  c_no_of_buckets = 0;
223  memset(c_buckets, 0, sizeof(c_buckets));
224  for(Uint32 i = 0; i<NO_OF_BUCKETS; i++)
225  {
226  Bucket* bucket= c_buckets+i;
227  bucket->m_buffer_tail = RNIL;
228  bucket->m_buffer_head.m_page_id = RNIL;
229  bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS;
230  }
231 
232  m_max_seen_gci = 0; // FIRE_TRIG_ORD
233  m_max_sent_gci = 0; // FIRE_TRIG_ORD -> send
234  m_last_complete_gci = 0; // SUB_GCP_COMPLETE_REP
235  m_gcp_complete_rep_count = 0;
236  m_out_of_buffer_gci = 0;
237  m_missing_data = false;
238 
239  c_startup.m_wait_handover= false;
240  c_failedApiNodes.clear();
241 
242  ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
243  conf->senderRef = reference();
244  conf->senderData = senderData;
245  sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
246  ReadConfigConf::SignalLength, JBB);
247 }
248 
249 void
251  jamEntry();
252 
253  DBUG_ENTER("Suma::execSTTOR");
254  m_startphase = signal->theData[1];
255  m_typeOfStart = signal->theData[7];
256 
257  DBUG_PRINT("info",("startphase = %u, typeOfStart = %u",
258  m_startphase, m_typeOfStart));
259 
260  if(m_startphase == 3)
261  {
262  jam();
263  void* ptr = m_ctx.m_mm.get_memroot();
264  c_page_pool.set((Buffer_page*)ptr, (Uint32)~0);
265  }
266 
267  if(m_startphase == 5)
268  {
269  jam();
270 
271  if (ERROR_INSERTED(13029)) /* Hold startphase 5 */
272  {
273  sendSignalWithDelay(SUMA_REF, GSN_STTOR, signal,
274  30, signal->getLength());
275  DBUG_VOID_RETURN;
276  }
277 
278  signal->theData[0] = reference();
279  sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
280  DBUG_VOID_RETURN;
281  }
282 
283  if(m_startphase == 7)
284  {
285  if (m_typeOfStart != NodeState::ST_NODE_RESTART &&
286  m_typeOfStart != NodeState::ST_INITIAL_NODE_RESTART)
287  {
288  for( Uint32 i = 0; i < c_no_of_buckets; i++)
289  {
290  if (get_responsible_node(i) == getOwnNodeId())
291  {
292  // I'm running this bucket
293  DBUG_PRINT("info",("bucket %u set to true", i));
294  m_active_buckets.set(i);
295  ndbout_c("m_active_buckets.set(%d)", i);
296  }
297  }
298  }
299 
300  if(!m_active_buckets.isclear())
301  {
302  NdbNodeBitmask tmp;
303  Uint32 bucket = 0;
304  while ((bucket = m_active_buckets.find(bucket)) != Bucket_mask::NotFound)
305  {
306  tmp.set(get_responsible_node(bucket, c_nodes_in_nodegroup_mask));
307  bucket++;
308  }
309 
310  ndbassert(tmp.get(getOwnNodeId()));
311  m_gcp_complete_rep_count = m_active_buckets.count();
312  }
313  else
314  m_gcp_complete_rep_count = 0; // I contribute 1 gcp complete rep
315 
316  if(m_typeOfStart == NodeState::ST_INITIAL_START &&
317  c_masterNodeId == getOwnNodeId())
318  {
319  jam();
320  createSequence(signal);
321  DBUG_VOID_RETURN;
322  }//if
323 
324  if (ERROR_INSERTED(13030))
325  {
326  ndbout_c("Dont start handover");
327  DBUG_VOID_RETURN;
328  }
329  }//if
330 
331  if(m_startphase == 100)
332  {
336  sendSTTORRY(signal);
337  DBUG_VOID_RETURN;
338  }
339 
340  if(m_startphase == 101)
341  {
342  if (m_typeOfStart == NodeState::ST_NODE_RESTART ||
343  m_typeOfStart == NodeState::ST_INITIAL_NODE_RESTART)
344  {
348  c_startup.m_wait_handover= true;
349  check_start_handover(signal);
350  DBUG_VOID_RETURN;
351  }
352  }
353  sendSTTORRY(signal);
354 
355  DBUG_VOID_RETURN;
356 }
357 
358 #include <ndb_version.h>
359 
360 void
361 Suma::send_dict_lock_req(Signal* signal, Uint32 state)
362 {
363  if (state == DictLockReq::SumaStartMe &&
364  !ndbd_suma_dictlock_startme(getNodeInfo(c_masterNodeId).m_version))
365  {
366  jam();
367  goto notsupported;
368  }
369  else if (state == DictLockReq::SumaHandOver &&
370  !ndbd_suma_dictlock_handover(getNodeInfo(c_masterNodeId).m_version))
371  {
372  jam();
373  goto notsupported;
374  }
375 
376  {
377  jam();
378  DictLockReq* req = (DictLockReq*)signal->getDataPtrSend();
379  req->lockType = state;
380  req->userPtr = state;
381  req->userRef = reference();
382  sendSignal(calcDictBlockRef(c_masterNodeId),
383  GSN_DICT_LOCK_REQ, signal, DictLockReq::SignalLength, JBB);
384  }
385  return;
386 
387 notsupported:
388  DictLockConf* conf = (DictLockConf*)signal->getDataPtrSend();
389  conf->userPtr = state;
390  execDICT_LOCK_CONF(signal);
391 }
392 
393 void
394 Suma::execDICT_LOCK_CONF(Signal* signal)
395 {
396  jamEntry();
397 
398  DictLockConf* conf = (DictLockConf*)signal->getDataPtr();
399  Uint32 state = conf->userPtr;
400 
401  switch(state){
402  case DictLockReq::SumaStartMe:
403  jam();
404  c_startup.m_restart_server_node_id = 0;
405  CRASH_INSERTION(13039);
406  send_start_me_req(signal);
407  return;
408  case DictLockReq::SumaHandOver:
409  jam();
410  send_handover_req(signal, SumaHandoverReq::RT_START_NODE);
411  return;
412  default:
413  jam();
414  jamLine(state);
415  ndbrequire(false);
416  }
417 }
418 
419 void
420 Suma::execDICT_LOCK_REF(Signal* signal)
421 {
422  jamEntry();
423 
424  DictLockRef* ref = (DictLockRef*)signal->getDataPtr();
425  Uint32 state = ref->userPtr;
426 
427  ndbrequire(ref->errorCode == DictLockRef::TooManyRequests);
428  signal->theData[0] = SumaContinueB::RETRY_DICT_LOCK;
429  signal->theData[1] = state;
430  sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 300, 2);
431 }
432 
433 void
434 Suma::send_dict_unlock_ord(Signal* signal, Uint32 state)
435 {
436  if (state == DictLockReq::SumaStartMe &&
437  !ndbd_suma_dictlock_startme(getNodeInfo(c_masterNodeId).m_version))
438  {
439  jam();
440  return;
441  }
442  else if (state == DictLockReq::SumaHandOver &&
443  !ndbd_suma_dictlock_handover(getNodeInfo(c_masterNodeId).m_version))
444  {
445  jam();
446  return;
447  }
448 
449  jam();
450  DictUnlockOrd* ord = (DictUnlockOrd*)signal->getDataPtrSend();
451  ord->lockPtr = 0;
452  ord->lockType = state;
453  ord->senderData = state;
454  ord->senderRef = reference();
455  sendSignal(calcDictBlockRef(c_masterNodeId),
456  GSN_DICT_UNLOCK_ORD, signal, DictUnlockOrd::SignalLength, JBB);
457 }
458 
459 void
460 Suma::send_start_me_req(Signal* signal)
461 {
462  Uint32 nodeId= c_startup.m_restart_server_node_id;
463  do {
464  nodeId = c_alive_nodes.find(nodeId + 1);
465 
466  if(nodeId == getOwnNodeId())
467  continue;
468  if(nodeId == NdbNodeBitmask::NotFound)
469  {
470  nodeId = 0;
471  continue;
472  }
473  break;
474  } while(true);
475 
476 
477  infoEvent("Suma: asking node %d to recreate subscriptions on me", nodeId);
478  c_startup.m_restart_server_node_id= nodeId;
479  sendSignal(calcSumaBlockRef(nodeId),
480  GSN_SUMA_START_ME_REQ, signal, 1, JBB);
481 }
482 
483 void
484 Suma::execSUMA_START_ME_REF(Signal* signal)
485 {
486  const SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtr();
487 
488  Uint32 error = ref->errorCode;
489  if (error != SumaStartMeRef::Busy && error != SumaStartMeRef::NotStarted)
490  {
491  jam();
492  // for some reason we did not manage to create a subscription
493  // on the starting node
494  SystemError * const sysErr = (SystemError*)&signal->theData[0];
495  sysErr->errorCode = SystemError::CopySubscriptionRef;
496  sysErr->errorRef = reference();
497  sysErr->data[0] = error;
498  sysErr->data[1] = 0;
499  sendSignal(NDBCNTR_REF, GSN_SYSTEM_ERROR, signal,
500  SystemError::SignalLength, JBB);
501  return;
502  }
503 
504  infoEvent("Suma: node %d refused %d",
505  c_startup.m_restart_server_node_id, ref->errorCode);
506 
507  send_start_me_req(signal);
508 }
509 
510 void
511 Suma::execSUMA_START_ME_CONF(Signal* signal)
512 {
513  infoEvent("Suma: node %d has completed restoring me",
514  c_startup.m_restart_server_node_id);
515  sendSTTORRY(signal);
516  send_dict_unlock_ord(signal, DictLockReq::SumaStartMe);
517  c_startup.m_restart_server_node_id= 0;
518 }
519 
520 void
522 {
523  jam();
524  DBUG_ENTER("Suma::createSequence");
525 
526  UtilSequenceReq * req = (UtilSequenceReq*)signal->getDataPtrSend();
527 
528  req->senderData = RNIL;
529  req->sequenceId = SUMA_SEQUENCE;
530  req->requestType = UtilSequenceReq::Create;
531  sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ,
532  signal, UtilSequenceReq::SignalLength, JBB);
533  // execUTIL_SEQUENCE_CONF will call createSequenceReply()
534  DBUG_VOID_RETURN;
535 }
536 
537 void
538 Suma::createSequenceReply(Signal* signal,
539  UtilSequenceConf * conf,
540  UtilSequenceRef * ref)
541 {
542  jam();
543 
544  if (ref != NULL)
545  {
546  switch ((UtilSequenceRef::ErrorCode)ref->errorCode)
547  {
548  case UtilSequenceRef::NoSuchSequence:
549  ndbrequire(false);
550  case UtilSequenceRef::TCError:
551  {
552  char buf[128];
553  BaseString::snprintf(buf, sizeof(buf),
554  "Startup failed during sequence creation. TC error %d",
555  ref->TCErrorCode);
556  progError(__LINE__, NDBD_EXIT_RESOURCE_ALLOC_ERROR, buf);
557  }
558  }
559  ndbrequire(false);
560  }
561 
562  sendSTTORRY(signal);
563 }
564 
565 void
566 Suma::execREAD_NODESCONF(Signal* signal){
567  jamEntry();
568  ReadNodesConf * const conf = (ReadNodesConf *)signal->getDataPtr();
569 
570  if(getNodeState().getNodeRestartInProgress())
571  {
572  c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startedNodes);
573  c_alive_nodes.set(getOwnNodeId());
574  }
575  else
576  {
577  c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startingNodes);
578  NdbNodeBitmask tmp;
579  tmp.assign(NdbNodeBitmask::Size, conf->startedNodes);
580  ndbrequire(tmp.isclear()); // No nodes can be started during SR
581  }
582 
583  if (DBG_3R)
584  {
585  for (Uint32 i = 0; i<MAX_NDB_NODES; i++)
586  {
587  if (c_alive_nodes.get(i))
588  ndbout_c("%u c_alive_nodes.set(%u)", __LINE__, i);
589  }
590  }
591 
592  c_masterNodeId = conf->masterNodeId;
593 
594  getNodeGroupMembers(signal);
595 }
596 
597 void
599 {
600  jam();
601  DBUG_ENTER("Suma::getNodeGroupMembers");
605  CheckNodeGroups * sd = (CheckNodeGroups*)signal->getDataPtrSend();
606  sd->blockRef = reference();
607  sd->requestType = CheckNodeGroups::GetNodeGroupMembers;
608  sd->nodeId = getOwnNodeId();
609  sd->senderData = RNIL;
610  sendSignal(DBDIH_REF, GSN_CHECKNODEGROUPSREQ, signal,
611  CheckNodeGroups::SignalLength, JBB);
612  DBUG_VOID_RETURN;
613 }
614 
615 static
616 bool
617 valid_seq(Uint32 n, Uint32 r, Uint16 dst[])
618 {
619  Uint16 tmp[MAX_REPLICAS];
620  for (Uint32 i = 0; i<r; i++)
621  {
622  tmp[i] = n % r;
623  for (Uint32 j = 0; j<i; j++)
624  if (tmp[j] == tmp[i])
625  return false;
626  n /= r;
627  }
628 
632  for (Uint32 i = 0; i<r; i++)
633  dst[i] = tmp[r-i-1];
634 
635  return true;
636 }
637 
638 void
639 Suma::fix_nodegroup()
640 {
641  Uint32 i, pos= 0;
642 
643  for (i = 0; i < MAX_NDB_NODES; i++)
644  {
645  if (c_nodes_in_nodegroup_mask.get(i))
646  {
647  c_nodesInGroup[pos++] = i;
648  }
649  }
650 
651  const Uint32 replicas= c_noNodesInGroup = pos;
652 
653  if (replicas)
654  {
655  Uint32 buckets= 1;
656  for(i = 1; i <= replicas; i++)
657  buckets *= i;
658 
659  Uint32 tot = 0;
660  switch(replicas){
661  case 1:
662  tot = 1;
663  break;
664  case 2:
665  tot = 4; // 2^2
666  break;
667  case 3:
668  tot = 27; // 3^3
669  break;
670  case 4:
671  tot = 256; // 4^4
672  break;
673  ndbrequire(false);
674  }
675  Uint32 cnt = 0;
676  for (i = 0; i<tot; i++)
677  {
678  Bucket* ptr= c_buckets + cnt;
679  if (valid_seq(i, replicas, ptr->m_nodes))
680  {
681  jam();
682  if (DBG_3R) printf("bucket %u : ", cnt);
683  for (Uint32 j = 0; j<replicas; j++)
684  {
685  ptr->m_nodes[j] = c_nodesInGroup[ptr->m_nodes[j]];
686  if (DBG_3R) printf("%u ", ptr->m_nodes[j]);
687  }
688  if (DBG_3R) printf("\n");
689  cnt++;
690  }
691  }
692  ndbrequire(cnt == buckets);
693  c_no_of_buckets= buckets;
694  }
695  else
696  {
697  jam();
698  c_no_of_buckets = 0;
699  }
700 }
701 
702 
703 void
704 Suma::execCHECKNODEGROUPSCONF(Signal *signal)
705 {
706  const CheckNodeGroups *sd = (const CheckNodeGroups *)signal->getDataPtrSend();
707  DBUG_ENTER("Suma::execCHECKNODEGROUPSCONF");
708  jamEntry();
709 
710  c_nodeGroup = sd->output;
711  c_nodes_in_nodegroup_mask.assign(sd->mask);
712  c_noNodesInGroup = c_nodes_in_nodegroup_mask.count();
713 
714  fix_nodegroup();
715 
716 #ifndef DBUG_OFF
717  for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
718  DBUG_PRINT("exit",("Suma: NodeGroup %u, me %u, "
719  "member[%u] %u",
720  c_nodeGroup, getOwnNodeId(),
721  i, c_nodesInGroup[i]));
722  }
723 #endif
724 
725  c_startup.m_restart_server_node_id = 0;
726  if (m_typeOfStart == NodeState::ST_NODE_RESTART ||
727  m_typeOfStart == NodeState::ST_INITIAL_NODE_RESTART)
728  {
729  jam();
730 
731  send_dict_lock_req(signal, DictLockReq::SumaStartMe);
732 
733  return;
734  }
735 
736  c_startup.m_restart_server_node_id = 0;
737  sendSTTORRY(signal);
738 
739  DBUG_VOID_RETURN;
740 }
741 
742 void
743 Suma::execAPI_START_REP(Signal* signal)
744 {
745  Uint32 nodeId = signal->theData[0];
746  c_connected_nodes.set(nodeId);
747 
748  check_start_handover(signal);
749 }
750 
751 void
752 Suma::check_start_handover(Signal* signal)
753 {
754  if(c_startup.m_wait_handover)
755  {
756  NodeBitmask tmp;
757  tmp.assign(c_connected_nodes);
758  tmp.bitAND(c_subscriber_nodes);
759  if(!c_subscriber_nodes.equal(tmp))
760  {
761  return;
762  }
763 
764  c_startup.m_wait_handover= false;
765 
766  if (c_no_of_buckets)
767  {
768  jam();
769  send_dict_lock_req(signal, DictLockReq::SumaHandOver);
770  }
771  else
772  {
773  jam();
774  sendSTTORRY(signal);
775  }
776  }
777 }
778 
779 void
780 Suma::send_handover_req(Signal* signal, Uint32 type)
781 {
782  jam();
783  c_startup.m_handover_nodes.assign(c_alive_nodes);
784  c_startup.m_handover_nodes.bitAND(c_nodes_in_nodegroup_mask);
785  c_startup.m_handover_nodes.clear(getOwnNodeId());
786  Uint32 gci= Uint32(m_last_complete_gci >> 32) + 3;
787 
788  SumaHandoverReq* req= (SumaHandoverReq*)signal->getDataPtrSend();
789  char buf[255];
790  c_startup.m_handover_nodes.getText(buf);
791  infoEvent("Suma: initiate handover for %s with nodes %s GCI: %u",
792  (type == SumaHandoverReq::RT_START_NODE ? "startup" : "shutdown"),
793  buf,
794  gci);
795 
796  req->gci = gci;
797  req->nodeId = getOwnNodeId();
798  req->requestType = type;
799 
800  NodeReceiverGroup rg(SUMA, c_startup.m_handover_nodes);
801  sendSignal(rg, GSN_SUMA_HANDOVER_REQ, signal,
802  SumaHandoverReq::SignalLength, JBB);
803 }
804 
805 void
806 Suma::sendSTTORRY(Signal* signal){
807  signal->theData[0] = 0;
808  signal->theData[3] = 1;
809  signal->theData[4] = 3;
810  signal->theData[5] = 5;
811  signal->theData[6] = 7;
812  signal->theData[7] = 100;
813  signal->theData[8] = 101;
814  signal->theData[9] = 255; // No more start phases from missra
815  sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 10, JBB);
816 }
817 
818 void
819 Suma::execNDB_STTOR(Signal* signal)
820 {
821  jamEntry();
822 }
823 
// CONTINUEB dispatcher: signal->theData[0] selects the continuation and the
// remaining words are that continuation's arguments.
// NOTE(review): the embedded listing numbers jump from 824 to 826, so the
// line carrying the function name/parameters is missing from this copy;
// presumably this is Suma::execCONTINUEB(Signal* signal) - confirm against
// the upstream file.
824 void
826  jamEntry();
827  Uint32 type= signal->theData[0];
828  switch(type){
829  case SumaContinueB::RELEASE_GCI:
830  {
// The 64-bit GCI arrives split: high word in [2], low word in [3].
831  Uint32 gci_hi = signal->theData[2];
832  Uint32 gci_lo = signal->theData[3];
833  Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
834  release_gci(signal, signal->theData[1], gci);
835  return;
836  }
837  case SumaContinueB::RESEND_BUCKET:
838  {
// Word layout: [1] first argument, [2]/[5] min GCI hi/lo, [3] passed through
// unchanged as the fourth argument, [4]/[6] last GCI hi/lo.
839  Uint32 min_gci_hi = signal->theData[2];
840  Uint32 min_gci_lo = signal->theData[5];
841  Uint32 last_gci_hi = signal->theData[4];
842  Uint32 last_gci_lo = signal->theData[6];
843  Uint64 min_gci = min_gci_lo | (Uint64(min_gci_hi) << 32);
844  Uint64 last_gci = last_gci_lo | (Uint64(last_gci_hi) << 32);
845  resend_bucket(signal,
846  signal->theData[1],
847  min_gci,
848  signal->theData[3],
849  last_gci);
850  return;
851  }
852  case SumaContinueB::OUT_OF_BUFFER_RELEASE:
853  out_of_buffer_release(signal, signal->theData[1]);
854  return;
855  case SumaContinueB::API_FAIL_GCI_LIST:
856  api_fail_gci_list(signal, signal->theData[1]);
857  return;
858  case SumaContinueB::API_FAIL_SUBSCRIBER_LIST:
// NOTE(review): listing line 859 (the start of this call, i.e. the callee
// name and its first argument) is missing from this copy; only the trailing
// argument below survived.
860  signal->theData[1]);
861  return;
862  case SumaContinueB::API_FAIL_SUBSCRIPTION:
863  api_fail_subscription(signal);
864  return;
865  case SumaContinueB::SUB_STOP_REQ:
866  sub_stop_req(signal);
867  return;
868  case SumaContinueB::RETRY_DICT_LOCK:
869  jam();
// [1] holds the lock type stored by execDICT_LOCK_REF().
870  send_dict_lock_req(signal, signal->theData[1]);
871  return;
872  }
// Unknown continuation types fall out of the switch and are ignored.
873 }
874 
875 /*****************************************************************************
876  *
877  * Node state handling
878  *
879  *****************************************************************************/
880 
881 void Suma::execAPI_FAILREQ(Signal* signal)
882 {
883  jamEntry();
884  DBUG_ENTER("Suma::execAPI_FAILREQ");
885  Uint32 failedApiNode = signal->theData[0];
886  ndbrequire(signal->theData[1] == QMGR_REF); // As callback hard-codes QMGR
887 
888  c_connected_nodes.clear(failedApiNode);
889 
890  if (c_failedApiNodes.get(failedApiNode))
891  {
892  jam();
893  /* Being handled already, just conf */
894  goto CONF;
895  }
896 
897  if (!c_subscriber_nodes.get(failedApiNode))
898  {
899  jam();
900  /* No Subscribers on that node, no SUMA
901  * specific work to do
902  */
903  goto BLOCK_CLEANUP;
904  }
905 
906  c_failedApiNodes.set(failedApiNode);
907  c_subscriber_nodes.clear(failedApiNode);
908  c_subscriber_per_node[failedApiNode] = 0;
909  c_failedApiNodesState[failedApiNode] = __LINE__;
910 
911  check_start_handover(signal);
912 
913  signal->theData[0] = SumaContinueB::API_FAIL_GCI_LIST;
914  signal->theData[1] = failedApiNode;
915  sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
916  return;
917 
918 BLOCK_CLEANUP:
919  jam();
920  api_fail_block_cleanup(signal, failedApiNode);
921  DBUG_VOID_RETURN;
922 
923 CONF:
924  jam();
925  signal->theData[0] = failedApiNode;
926  signal->theData[1] = reference();
927  sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
928 
929  c_failedApiNodesState[failedApiNode] = 0;
930 
931  DBUG_VOID_RETURN;
932 }//execAPI_FAILREQ()
933 
934 void
935 Suma::api_fail_block_cleanup_callback(Signal* signal,
936  Uint32 failedNodeId,
937  Uint32 elementsCleaned)
938 {
939  jamEntry();
940 
941  /* Suma should not have any block level elements
942  * to be cleaned (Fragmented send/receive structures etc.)
943  * As it only uses Fragmented send/receive locally
944  */
945  ndbassert(elementsCleaned == 0);
946 
947  /* Node failure handling is complete */
948  signal->theData[0] = failedNodeId;
949  signal->theData[1] = reference();
950  sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
951  c_failedApiNodes.clear(failedNodeId);
952  c_failedApiNodesState[failedNodeId] = 0;
953 }
954 
955 void
956 Suma::api_fail_block_cleanup(Signal* signal, Uint32 failedNode)
957 {
958  jam();
959 
960  c_failedApiNodesState[failedNode] = __LINE__;
961 
962  Callback cb = {safe_cast(&Suma::api_fail_block_cleanup_callback),
963  failedNode};
964 
965  simBlockNodeFailure(signal, failedNode, cb);
966 }
967 
968 void
969 Suma::api_fail_gci_list(Signal* signal, Uint32 nodeId)
970 {
971  jam();
972 
973  Ptr<Gcp_record> gcp;
974  if (c_gcp_list.first(gcp))
975  {
976  jam();
977  gcp.p->m_subscribers.bitAND(c_subscriber_nodes);
978 
979  if (gcp.p->m_subscribers.isclear())
980  {
981  jam();
982 
983  SubGcpCompleteAck* ack = (SubGcpCompleteAck*)signal->getDataPtrSend();
984  ack->rep.gci_hi = Uint32(gcp.p->m_gci >> 32);
985  ack->rep.gci_lo = Uint32(gcp.p->m_gci);
986  ack->rep.senderRef = reference();
987  NodeReceiverGroup rg(SUMA, c_nodes_in_nodegroup_mask);
988  sendSignal(rg, GSN_SUB_GCP_COMPLETE_ACK, signal,
989  SubGcpCompleteAck::SignalLength, JBB);
990 
991  c_gcp_list.release(gcp);
992 
993  c_failedApiNodesState[nodeId] = __LINE__;
994  signal->theData[0] = SumaContinueB::API_FAIL_GCI_LIST;
995  signal->theData[1] = nodeId;
996  sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
997  return;
998  }
999  }
1000 
1001  if (ERROR_INSERTED(13023))
1002  {
1003  CLEAR_ERROR_INSERT_VALUE;
1004  }
1005 
1006  signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIBER_LIST;
1007  signal->theData[1] = nodeId;
1008  signal->theData[2] = RNIL; // SubOpPtr
1009  signal->theData[3] = RNIL; // c_subscribers bucket
1010  signal->theData[4] = RNIL; // subscriptionId
1011  signal->theData[5] = RNIL; // SubscriptionKey
1012 
1013  Ptr<SubOpRecord> subOpPtr;
1014  if (c_subOpPool.seize(subOpPtr))
1015  {
1016  c_failedApiNodesState[nodeId] = __LINE__;
1017  signal->theData[2] = subOpPtr.i;
1018  sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 6, JBB);
1019  }
1020  else
1021  {
1022  c_failedApiNodesState[nodeId] = __LINE__;
1023  sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
1024  }
1025 
1026  return;
1027 }
1028 
// API failure step 2: walk the subscription hash table and queue an
// R_API_FAIL_REQ operation on the next subscription to be cleaned.
// Input words: [2] SubOpRecord index or RNIL, [3] hash bucket or RNIL,
// [4]/[5] id and key of the subscription to resume from.
// NOTE(review): the embedded listing numbers jump from 1029 to 1031, so the
// line with the function name and parameters is missing from this copy. The
// body uses `signal` and `nodeId`; presumably this is the handler of the
// API_FAIL_SUBSCRIBER_LIST continuation - confirm against the upstream file.
1029 void
1031 {
1032  jam();
1033  Ptr<SubOpRecord> subOpPtr;
1034 
// Throttle: with more than 9 drop-trigger requests outstanding, re-send the
// same signal to ourselves with a delay and try again later.
1035  if (c_outstanding_drop_trig_req > 9)
1036  {
1037  jam();
1042  sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100,
1043  signal->getLength());
1044  return;
1045  }
1046 
1047  subOpPtr.i = signal->theData[2];
1048  if (subOpPtr.i == RNIL)
1049  {
// The sender could not seize an operation record; try again here.
1050  if (c_subOpPool.seize(subOpPtr))
1051  {
// Force the scan below to start from the first subscription.
1052  signal->theData[3] = RNIL;
1053  }
1054  else
1055  {
1056  jam();
// Still no record: re-send the short (3 word) form and retry.
1057  sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
1058  c_failedApiNodesState[nodeId] = __LINE__;
1059  return;
1060  }
1061  }
1062  else
1063  {
1064  jam();
1065  c_subOpPool.getPtr(subOpPtr);
1066  }
1067 
1068  Uint32 bucket = signal->theData[3];
1069  Uint32 subscriptionId = signal->theData[4];
1070  Uint32 subscriptionKey = signal->theData[5];
1071 
// NOTE(review): listing line 1072 is missing from this copy; it presumably
// declared the hash table iterator `iter` used below.
1073  if (bucket == RNIL)
1074  {
1075  jam();
1076  c_subscriptions.first(iter);
1077  c_failedApiNodesState[nodeId] = __LINE__;
1078  }
1079  else
1080  {
1081  jam();
1082 
// Resume from the subscription identified by (id, key).
1083  Subscription key;
1084  key.m_subscriptionId = subscriptionId;
1085  key.m_subscriptionKey = subscriptionKey;
1086  if (c_subscriptions.find(iter.curr, key) == false)
1087  {
1088  jam();
// That subscription is gone; continue from the remembered bucket instead.
1092  c_subscriptions.next(bucket, iter);
1093  c_failedApiNodesState[nodeId] = __LINE__;
1094  }
1095  else
1096  {
1097  iter.bucket = bucket;
1098  }
1099  }
1100 
// No subscription left to visit: finish with the block level clean-up.
1101  if (iter.curr.isNull())
1102  {
1103  jam();
1104  api_fail_block_cleanup(signal, nodeId);
1105  c_failedApiNodesState[nodeId] = __LINE__;
1106  return;
1107  }
1108 
// The operation record carries the iteration state: m_senderRef holds the
// failed node id and m_senderData the hash bucket.
1109  subOpPtr.p->m_opType = SubOpRecord::R_API_FAIL_REQ;
1110  subOpPtr.p->m_subPtrI = iter.curr.i;
1111  subOpPtr.p->m_senderRef = nodeId;
1112  subOpPtr.p->m_senderData = iter.bucket;
1113 
// Queue the operation on the subscription's m_stop_req list.
1114  LocalDLFifoList<SubOpRecord> list(c_subOpPool, iter.curr.p->m_stop_req);
1115  bool empty = list.isEmpty();
1116  list.add(subOpPtr);
1117 
1118  if (empty)
1119  {
1120  jam();
// The queue was empty, so this operation can start right away.
1121  c_failedApiNodesState[nodeId] = __LINE__;
1122  signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIPTION;
1123  signal->theData[1] = subOpPtr.i;
1124  signal->theData[2] = RNIL;
1125  sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
1126  }
1127  else
1128  {
1129  jam();
// Other operations are queued ahead of this one; nothing is sent here.
1130  c_failedApiNodesState[nodeId] = __LINE__;
1131  }
1132 }
1133 
// API failure step 3: remove the failed node's subscribers from one
// subscription, at most 32 list entries per invocation, then move on to the
// next subscription or finish with the block level clean-up.
// Input words: [1] SubOpRecord index, [2] subscriber to resume from or RNIL.
// NOTE(review): the embedded listing numbers jump from 1134 to 1136, so the
// line with the function name and parameters is missing from this copy;
// the CONTINUEB dispatcher calls api_fail_subscription(signal) for the
// API_FAIL_SUBSCRIPTION type, which matches this body - confirm upstream.
1134 void
1136 {
1137  jam();
1138  Ptr<SubOpRecord> subOpPtr;
1139  c_subOpPool.getPtr(subOpPtr, signal->theData[1]);
1140 
// m_senderRef of the operation record holds the failed node id.
1141  Uint32 nodeId = subOpPtr.p->m_senderRef;
1142 
1143  Ptr<Subscription> subPtr;
1144  c_subscriptionPool.getPtr(subPtr, subOpPtr.p->m_subPtrI);
1145 
1146  Ptr<Subscriber> ptr;
1147  {
1148  LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
1149  if (signal->theData[2] == RNIL)
1150  {
1151  jam();
1152  list.first(ptr);
1153  }
1154  else
1155  {
1156  jam();
1157  list.getPtr(ptr, signal->theData[2]);
1158  }
1159 
// Visit at most 32 subscribers per signal.
1160  for (Uint32 i = 0; i<32 && !ptr.isNull(); i++)
1161  {
1162  jam();
1163  if (refToNode(ptr.p->m_senderRef) == nodeId)
1164  {
1165  jam();
1166 
// Advance before unlinking so the iteration survives the removal.
1167  Ptr<Subscriber> tmp = ptr;
1168  list.next(ptr);
1169  list.remove(tmp);
1170 
1174  bool report = subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE;
1175 
1176  send_sub_start_stop_event(signal, tmp, NdbDictionary::Event::_TE_STOP,
1177  report, list);
1178 
// NOTE(review): listing line 1179 is missing from this copy.
1180  }
1181  else
1182  {
1183  jam();
1184  list.next(ptr);
1185  }
1186  }
1187  }
1188 
// More subscribers left in this subscription: continue from `ptr` later.
1189  if (!ptr.isNull())
1190  {
1191  jam();
1192  c_failedApiNodesState[nodeId] = __LINE__;
1193  signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIPTION;
1194  signal->theData[1] = subOpPtr.i;
1195  signal->theData[2] = ptr.i;
1196  sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
1197  return;
1198  }
1199 
1200  // Start potential waiter(s)
1201  check_remove_queue(signal, subPtr, subOpPtr, true, false);
1202  check_release_subscription(signal, subPtr);
1203 
1204  // Continue iterating through subscriptions
// NOTE(review): listing line 1205 is missing from this copy; it presumably
// declared the hash table iterator `iter` used below.
1206  iter.bucket = subOpPtr.p->m_senderData;
1207  iter.curr = subPtr;
1208 
1209  if (c_subscriptions.next(iter))
1210  {
1211  jam();
1212  c_failedApiNodesState[nodeId] = __LINE__;
1213  signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIBER_LIST;
1214  signal->theData[1] = nodeId;
1215  signal->theData[2] = subOpPtr.i;
1216  signal->theData[3] = iter.bucket;
1217  signal->theData[4] = iter.curr.p->m_subscriptionId; // subscriptionId
1218  signal->theData[5] = iter.curr.p->m_subscriptionKey; // SubscriptionKey
1219  sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 6, JBB);
1220  return;
1221  }
1222 
// Last subscription done: the operation record is no longer needed.
1223  c_subOpPool.release(subOpPtr);
1224 
1225  /* Now do block level cleanup */
1226  api_fail_block_cleanup(signal, nodeId);
1227 }
1228 
// Handler for NODE_FAILREP.
//
// Reads the bitmask of failed nodes from the signal and then, in order:
//  1. aborts an ongoing restart hand-shake if the node referenced by
//     c_restart.m_ref is among the failed nodes,
//  2. (error insert 13032 only) injects an API_FAILREQ for the first node
//     found in c_subscriber_nodes,
//  3. if a node of c_nodes_in_nodegroup_mask failed, walks all buckets and
//     either aborts a handover, crashes, or starts resending,
//  4. calls simBlockNodeFailure() for every failed node,
//  5. stores the surviving node set in c_alive_nodes.
1229 void
1230 Suma::execNODE_FAILREP(Signal* signal){
1231  jamEntry();
1232  DBUG_ENTER("Suma::execNODE_FAILREP");
1233  ndbassert(signal->getNoOfSections() == 0);
1234 
1235  const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
1236  NdbNodeBitmask failed; failed.assign(NdbNodeBitmask::Size, rep->theNodes);
1237 
// A restart is in progress (c_restart.m_ref set) and its peer node failed.
1238  if(c_restart.m_ref && failed.get(refToNode(c_restart.m_ref)))
1239  {
1240  jam();
1241 
1242  if (c_restart.m_waiting_on_self)
1243  {
1244  jam();
// Only flag the abort here; it is acted upon elsewhere.
1245  c_restart.m_abort = 1;
1246  }
1247  else
1248  {
1249  jam();
1250  Ptr<Subscription> subPtr;
1251  c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
1252  abort_start_me(signal, subPtr, false);
1253  }
1254  }
1255 
// Test hook: with error insert 13032 active, ask QMGR to fail the first
// subscriber node found.
1256  if (ERROR_INSERTED(13032))
1257  {
1258  Uint32 node = c_subscriber_nodes.find(0);
1259  if (node != NodeBitmask::NotFound)
1260  {
1261  ndbout_c("Inserting API_FAILREQ node: %u", node);
1262  signal->theData[0] = node;
1263  sendSignal(QMGR_REF, GSN_API_FAILREQ, signal, 1, JBA);
1264  }
1265  }
1266 
// tmp = nodes that were alive and are not in the failed set.
1267  NdbNodeBitmask tmp;
1268  tmp.assign(c_alive_nodes);
1269  tmp.bitANDC(failed);
1270 
// NOTE(review): takeover_nodes is declared but not used in this function.
1271  NdbNodeBitmask takeover_nodes;
1272 
1273  if(c_nodes_in_nodegroup_mask.overlaps(failed))
1274  {
1275  for( Uint32 i = 0; i < c_no_of_buckets; i++)
1276  {
// Buckets already active on this node need no action.
1277  if(m_active_buckets.get(i))
1278  continue;
1279  else if(m_switchover_buckets.get(i))
1280  {
1281  Uint32 state= c_buckets[i].m_state;
// Handover in progress and the currently responsible node failed:
// make the bucket active here and drop the switchover.
1282  if((state & Bucket::BUCKET_HANDOVER) &&
1283  failed.get(get_responsible_node(i)))
1284  {
1285  m_active_buckets.set(i);
1286  m_switchover_buckets.clear(i);
1287  ndbout_c("aborting handover");
1288  }
// A node failure while the bucket is in BUCKET_STARTING is reported through
// progError() as a system error.
1289  else if(state & Bucket::BUCKET_STARTING)
1290  {
1291  progError(__LINE__, NDBD_EXIT_SYSTEM_ERROR,
1292  "Nodefailure during SUMA takeover");
1293  }
// Shutdown take-over pending: clear the flag and the switchover bit, then
// resend; this node must be the responsible one among the survivors.
1294  else if (state & Bucket::BUCKET_SHUTDOWN_TO)
1295  {
1296  jam();
1297  c_buckets[i].m_state &= ~Uint32(Bucket::BUCKET_SHUTDOWN_TO);
1298  m_switchover_buckets.clear(i);
1299  ndbrequire(get_responsible_node(i, tmp) == getOwnNodeId());
1300  start_resend(signal, i);
1301  }
1302  }
// Neither active nor switching over: resend if this node is the responsible
// node when computed against the surviving set.
1303  else if(get_responsible_node(i, tmp) == getOwnNodeId())
1304  {
1305  start_resend(signal, i);
1306  }
1307  }
1308  }
1309 
1310  /* Block level cleanup */
1311  for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
1312  jam();
1313  if(failed.get(i)) {
1314  jam();
1315  Uint32 elementsCleaned = simBlockNodeFailure(signal, i); // No callback
1316  ndbassert(elementsCleaned == 0); // As Suma has no remote fragmented signals
1317  (void) elementsCleaned; // Avoid compiler error
1318  }//if
1319  }//for
1320 
// Publish the new alive set only after all of the above used the old one.
1321  c_alive_nodes.assign(tmp);
1322 
1323  DBUG_VOID_RETURN;
1324 }
1325 
1326 void
1328  jamEntry();
1329 
1330  const Uint32 senderRef = signal->theData[0];
1331  const Uint32 nodeId = signal->theData[1];
1332 
1333  ndbrequire(!c_alive_nodes.get(nodeId));
1334  if (c_nodes_in_nodegroup_mask.get(nodeId))
1335  {
1342  c_alive_nodes.set(nodeId);
1343 
1349  }
1350  else
1351  {
1352  jam();
1353  c_alive_nodes.set(nodeId);
1354  }
1355 
1356  signal->theData[0] = nodeId;
1357  signal->theData[1] = reference();
1358  sendSignal(senderRef, GSN_INCL_NODECONF, signal, 2, JBB);
1359 }
1360 
1361 void
1362 Suma::execSIGNAL_DROPPED_REP(Signal* signal){
1363  jamEntry();
1364  ndbrequire(false);
1365 }
1366 
1367 /********************************************************************
1368  *
1369  * Dump state
1370  *
1371  */
1372 static
1373 const char*
1374 cstr(Suma::Subscription::State s)
1375 {
1376  switch(s){
1377  case Suma::Subscription::UNDEFINED:
1378  return "undefined";
1379  case Suma::Subscription::DEFINED:
1380  return "defined";
1381  case Suma::Subscription::DEFINING:
1382  return "defining";
1383  }
1384  return "<unknown>";
1385 }
1386 
1387 static
1388 const char*
1389 cstr(Suma::Subscription::TriggerState s)
1390 {
1391  switch(s){
1392  case Suma::Subscription::T_UNDEFINED:
1393  return "undefined";
1394  case Suma::Subscription::T_CREATING:
1395  return "creating";
1396  case Suma::Subscription::T_DEFINED:
1397  return "defined";
1398  case Suma::Subscription::T_DROPPING:
1399  return "dropping";
1400  case Suma::Subscription::T_ERROR:
1401  return "error";
1402  }
1403  return "<uknown>";
1404 }
1405 
1406 static
1407 const char*
1408 cstr(Suma::Subscription::Options s)
1409 {
1410  static char buf[256];
1411  buf[0] = 0;
1412  strcat(buf, "[");
1413  if (s & Suma::Subscription::REPORT_ALL)
1414  strcat(buf, " reportall");
1415  if (s & Suma::Subscription::REPORT_SUBSCRIBE)
1416  strcat(buf, " reportsubscribe");
1417  if (s & Suma::Subscription::MARKED_DROPPED)
1418  strcat(buf, " dropped");
1419  if (s & Suma::Subscription::NO_REPORT_DDL)
1420  strcat(buf, " noreportddl");
1421  strcat(buf, " ]");
1422  return buf;
1423 }
1424 
1425 static
1426 const char*
1427 cstr(Suma::Table::State s)
1428 {
1429  switch(s){
1430  case Suma::Table::UNDEFINED:
1431  return "undefined";
1432  case Suma::Table::DEFINING:
1433  return "defining";
1434  case Suma::Table::DEFINED:
1435  return "defined";
1436  case Suma::Table::DROPPED:
1437  return "dropped";
1438  }
1439  return "<unknown>";
1440 }
1441 
// Handler for DUMP_STATE_ORD.
//
// theData[0] selects the action; a value matching none of the tests below
// has no effect. Actions visible here:
//   8004  print size/free counts of the record pools
//   8005  print per-bucket state
//   8006  set error insert 13029
//   8007  set c_startup.m_restart_server_node_id out of range + insert 13029
//   8008  clear the error insert value
//   8009  toggle error insert 13030 (clearing it also calls sendSTTORRY)
//   8010  print the subscriber / connected node bitmasks
//   8011  dump tables with their subscriptions, subscribers and queued ops
//   8012  dump subscriptions with subscribers and queued ops
//   7019  (signal length 2) print API-failure bookkeeping for theData[1]
1442 void
1443 Suma::execDUMP_STATE_ORD(Signal* signal){
1444  jamEntry();
1445 
1446  Uint32 tCase = signal->theData[0];
// Cases 8000-8003 are compiled out.
1447 #if 0
1448  if(tCase >= 8000 && tCase <= 8003){
1449  SubscriptionPtr subPtr;
1450  c_subscriptions.getPtr(subPtr, g_subPtrI);
1451 
1452  Ptr<SyncRecord> syncPtr;
1453  c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
1454 
1455  if(tCase == 8000){
1456  syncPtr.p->startMeta(signal);
1457  }
1458 
1459  if(tCase == 8001){
1460  syncPtr.p->startScan(signal);
1461  }
1462 
1463  if(tCase == 8002){
1464  syncPtr.p->startTrigger(signal);
1465  }
1466 
1467  if(tCase == 8003){
1468  subPtr.p->m_subscriptionType = SubCreateReq::SingleTableScan;
1469  LocalDataBuffer<15> attrs(c_dataBufferPool, syncPtr.p->m_attributeList);
1470  Uint32 tab = 0;
1471  Uint32 att[] = { 0, 1, 1 };
1472  syncPtr.p->m_tableList.append(&tab, 1);
1473  attrs.append(att, 3);
1474  }
1475  }
1476 #endif
// 8004: one info event per pool with its total size and free count.
1477  if(tCase == 8004){
1478  infoEvent("Suma: c_subscriberPool size: %d free: %d",
1479  c_subscriberPool.getSize(),
1480  c_subscriberPool.getNoOfFree());
1481 
1482  infoEvent("Suma: c_tablePool size: %d free: %d",
1483  c_tablePool.getSize(),
1484  c_tablePool.getNoOfFree());
1485 
1486  infoEvent("Suma: c_subscriptionPool size: %d free: %d",
1487  c_subscriptionPool.getSize(),
1488  c_subscriptionPool.getNoOfFree());
1489 
1490  infoEvent("Suma: c_syncPool size: %d free: %d",
1491  c_syncPool.getSize(),
1492  c_syncPool.getNoOfFree());
1493 
1494  infoEvent("Suma: c_dataBufferPool size: %d free: %d",
1495  c_dataBufferPool.getSize(),
1496  c_dataBufferPool.getNoOfFree());
1497 
1498  infoEvent("Suma: c_subOpPool size: %d free: %d",
1499  c_subOpPool.getSize(),
1500  c_subOpPool.getNoOfFree());
1501 
1502 #if 0
1503  infoEvent("Suma: c_dataSubscribers count: %d",
1504  count_subscribers(c_dataSubscribers));
1505  infoEvent("Suma: c_prepDataSubscribers count: %d",
1506  count_subscribers(c_prepDataSubscribers));
1507 #endif
1508  }
1509 
// 8005: one info event per bucket: active bit, switchover bit, state word,
// GCI bookkeeping and buffer tail/head page.
1510  if(tCase == 8005)
1511  {
1512  for(Uint32 i = 0; i<c_no_of_buckets; i++)
1513  {
1514  Bucket* ptr= c_buckets + i;
1515  infoEvent("Bucket %d %d%d-%x switch gci: %llu max_acked_gci: %llu max_gci: %llu tail: %d head: %d",
1516  i,
1517  m_active_buckets.get(i),
1518  m_switchover_buckets.get(i),
1519  ptr->m_state,
1520  ptr->m_switchover_gci,
1521  ptr->m_max_acked_gci,
1522  ptr->m_buffer_head.m_max_gci,
1523  ptr->m_buffer_tail,
1524  ptr->m_buffer_head.m_page_id);
1525  }
1526  }
1527 
1528  if (tCase == 8006)
1529  {
1530  SET_ERROR_INSERT_VALUE(13029);
1531  }
1532 
1533  if (tCase == 8007)
1534  {
1535  c_startup.m_restart_server_node_id = MAX_NDB_NODES + 1;
1536  SET_ERROR_INSERT_VALUE(13029);
1537  }
1538 
1539  if (tCase == 8008)
1540  {
1541  CLEAR_ERROR_INSERT_VALUE;
1542  }
1543 
1544  if (tCase == 8010)
1545  {
1546  char buf1[255], buf2[255];
1547  c_subscriber_nodes.getText(buf1);
1548  c_connected_nodes.getText(buf2);
1549  infoEvent("c_subscriber_nodes: %s", buf1);
1550  infoEvent("c_connected_nodes: %s", buf2);
1551  }
1552 
// 8009: first use arms error insert 13030; the next use clears it and
// calls sendSTTORRY().
1553  if (tCase == 8009)
1554  {
1555  if (ERROR_INSERTED(13030))
1556  {
1557  CLEAR_ERROR_INSERT_VALUE;
1558  sendSTTORRY(signal);
1559  }
1560  else
1561  {
1562  SET_ERROR_INSERT_VALUE(13030);
1563  }
1564  return;
1565  }
1566 
// 8011: walk c_tables. The dump is done in slices: theData[1] carries the
// hash bucket to continue from, and a signal of length 1 means "start from
// bucket 0".
1567  if (tCase == 8011)
1568  {
1569  jam();
1570  Uint32 bucket = signal->theData[1];
// NOTE(review): the declaration of the iterator `it` used below is not
// present in this listing (line 1571 is missing).
1572  if (signal->getLength() == 1)
1573  {
1574  jam();
1575  bucket = 0;
1576  infoEvent("-- Starting dump of subscribers --");
1577  }
1578 
1579  c_tables.next(bucket, it);
1580  const Uint32 RT_BREAK = 16;
// Keep going while fewer than RT_BREAK steps were counted or the iterator
// is still inside the starting bucket. `i` is also advanced by the inner
// list loops, so long lists use up the budget faster.
1581  for(Uint32 i = 0; i<RT_BREAK || it.bucket == bucket; i++)
1582  {
1583  jam();
1584  if(it.curr.i == RNIL)
1585  {
1586  jam();
1587  infoEvent("-- Ending dump of subscribers --");
1588  return;
1589  }
1590 
1591  infoEvent("Table %u ver %u",
1592  it.curr.p->m_tableId,
1593  it.curr.p->m_schemaVersion);
1594 
// cnt = number of subscribers over all subscriptions of this table.
1595  Uint32 cnt = 0;
1596  Ptr<Subscription> subPtr;
1597  LocalDLList<Subscription> subList(c_subscriptionPool,
1598  it.curr.p->m_subscriptions);
1599  for(subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr))
1600  {
1601  infoEvent(" Subcription %u", subPtr.i);
1602  {
1603  Ptr<Subscriber> ptr;
// NOTE(review): the first line of the `list` declaration (line 1604) is
// not present in this listing.
1605  subPtr.p->m_subscribers);
1606  for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1607  {
1608  jam();
1609  cnt++;
1610  infoEvent(" Subscriber [ %x %u %u ]",
1611  ptr.p->m_senderRef,
1612  ptr.p->m_senderData,
1613  subPtr.i);
1614  }
1615  }
1616 
// Queued create requests of this subscription.
1617  {
1618  Ptr<SubOpRecord> ptr;
1619  LocalDLFifoList<SubOpRecord> list(c_subOpPool,
1620  subPtr.p->m_create_req);
1621 
1622  for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1623  {
1624  jam();
1625  infoEvent(" create [ %x %u ]",
1626  ptr.p->m_senderRef,
1627  ptr.p->m_senderData);
1628  }
1629  }
1630 
// Queued start requests of this subscription.
1631  {
1632  Ptr<SubOpRecord> ptr;
1633  LocalDLFifoList<SubOpRecord> list(c_subOpPool,
1634  subPtr.p->m_start_req);
1635 
1636  for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1637  {
1638  jam();
1639  infoEvent(" start [ %x %u ]",
1640  ptr.p->m_senderRef,
1641  ptr.p->m_senderData);
1642  }
1643  }
1644 
// Queued stop requests of this subscription (also prints the op type).
1645  {
1646  Ptr<SubOpRecord> ptr;
1647  LocalDLFifoList<SubOpRecord> list(c_subOpPool,
1648  subPtr.p->m_stop_req);
1649 
1650  for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1651  {
1652  jam();
1653  infoEvent(" stop [ %u %x %u ]",
1654  ptr.p->m_opType,
1655  ptr.p->m_senderRef,
1656  ptr.p->m_senderData);
1657  }
1658  }
1659  }
1660  infoEvent("Table %u #subscribers %u", it.curr.p->m_tableId, cnt);
1661  c_tables.next(it);
1662  }
1663 
// Slice finished: send the same dump order to ourselves, delayed, with the
// bucket to continue from.
1664  signal->theData[0] = tCase;
1665  signal->theData[1] = it.bucket;
1666  sendSignalWithDelay(reference(), GSN_DUMP_STATE_ORD, signal, 100, 2);
1667  return;
1668  }
1669 
// 8012: same slicing scheme as 8011, but walking c_subscriptions directly.
1670  if (tCase == 8012)
1671  {
1672  jam();
1673  Uint32 bucket = signal->theData[1];
// NOTE(review): the declaration of the iterator `it` used below is not
// present in this listing (line 1674 is missing).
1675  if (signal->getLength() == 1)
1676  {
1677  jam();
1678  bucket = 0;
1679  infoEvent("-- Starting dump of subscribers --");
1680  }
1681 
1682  c_subscriptions.next(bucket, it);
1683  const Uint32 RT_BREAK = 16;
1684  for(Uint32 i = 0; i<RT_BREAK || it.bucket == bucket; i++)
1685  {
1686  jam();
1687  if(it.curr.i == RNIL)
1688  {
1689  jam();
1690  infoEvent("-- Ending dump of subscribers --");
1691  return;
1692  }
1693 
// Subscription header: id/key/state, trigger state and options, and the
// table record it points to.
1694  Ptr<Subscription> subPtr = it.curr;
1695  Ptr<Table> tabPtr;
1696  c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
1697  infoEvent("Subcription %u id: 0x%.8x key: 0x%.8x state: %s",
1698  subPtr.i,
1699  subPtr.p->m_subscriptionId,
1700  subPtr.p->m_subscriptionKey,
1701  cstr(subPtr.p->m_state));
1702  infoEvent(" trigger state: %s options: %s",
1703  cstr(subPtr.p->m_trigger_state),
1704  cstr((Suma::Subscription::Options)subPtr.p->m_options));
1705  infoEvent(" tablePtr: %u tableId: %u schemaVersion: 0x%.8x state: %s",
1706  tabPtr.i,
1707  subPtr.p->m_tableId,
1708  tabPtr.p->m_schemaVersion,
1709  cstr(tabPtr.p->m_state));
1710  {
1711  Ptr<Subscriber> ptr;
// NOTE(review): the first line of the `list` declaration (line 1712) is
// not present in this listing.
1713  subPtr.p->m_subscribers);
1714  for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1715  {
1716  jam();
1717  infoEvent(" Subscriber [ %x %u %u ]",
1718  ptr.p->m_senderRef,
1719  ptr.p->m_senderData,
1720  subPtr.i);
1721  }
1722  }
1723 
1724  {
1725  Ptr<SubOpRecord> ptr;
1726  LocalDLFifoList<SubOpRecord> list(c_subOpPool,
1727  subPtr.p->m_create_req);
1728 
1729  for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1730  {
1731  jam();
1732  infoEvent(" create [ %x %u ]",
1733  ptr.p->m_senderRef,
1734  ptr.p->m_senderData);
1735  }
1736  }
1737 
1738  {
1739  Ptr<SubOpRecord> ptr;
1740  LocalDLFifoList<SubOpRecord> list(c_subOpPool,
1741  subPtr.p->m_start_req);
1742 
1743  for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1744  {
1745  jam();
1746  infoEvent(" start [ %x %u ]",
1747  ptr.p->m_senderRef,
1748  ptr.p->m_senderData);
1749  }
1750  }
1751 
1752  {
1753  Ptr<SubOpRecord> ptr;
1754  LocalDLFifoList<SubOpRecord> list(c_subOpPool,
1755  subPtr.p->m_stop_req);
1756 
1757  for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1758  {
1759  jam();
1760  infoEvent(" stop [ %u %x %u ]",
1761  ptr.p->m_opType,
1762  ptr.p->m_senderRef,
1763  ptr.p->m_senderData);
1764  }
1765  }
1766  c_subscriptions.next(it);
1767  }
1768 
1769  signal->theData[0] = tCase;
1770  signal->theData[1] = it.bucket;
1771  sendSignalWithDelay(reference(), GSN_DUMP_STATE_ORD, signal, 100, 2);
1772  return;
1773  }
1774 
// 7019 <nodeId>: print what this block recorded about API-failure handling
// for that node. c_failedApiNodesState[] holds a source line number
// (it is assigned __LINE__ elsewhere in this file).
1775  if (tCase == 7019 && signal->getLength() == 2)
1776  {
1777  jam();
1778  Uint32 nodeId = signal->theData[1];
1779  if (nodeId < MAX_NODES)
1780  {
1781  warningEvent(" Suma 7019 %u line: %u", nodeId,
1782  c_failedApiNodesState[nodeId]);
1783  warningEvent(" c_connected_nodes.get(): %u",
1784  c_connected_nodes.get(nodeId));
1785  warningEvent(" c_failedApiNodes.get(): %u",
1786  c_failedApiNodes.get(nodeId));
1787  warningEvent(" c_subscriber_nodes.get(): %u",
1788  c_subscriber_nodes.get(nodeId));
1789  warningEvent(" c_subscriber_per_node[%u]: %u",
1790  nodeId, c_subscriber_per_node[nodeId]);
1791  }
1792  else
1793  {
1794  warningEvent(" SUMP: dump-7019 to unknown node: %u", nodeId);
1795  }
1796  }
1797 }
1798 
1799 void Suma::execDBINFO_SCANREQ(Signal *signal)
1800 {
1801  DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
1802  const Ndbinfo::ScanCursor* cursor =
1803  CAST_CONSTPTR(Ndbinfo::ScanCursor, DbinfoScan::getCursorPtr(&req));
1804  Ndbinfo::Ratelimit rl;
1805 
1806  jamEntry();
1807 
1808  switch(req.tableId){
1809  case Ndbinfo::POOLS_TABLEID:
1810  {
1811  Ndbinfo::pool_entry pools[] =
1812  {
1813  { "Subscriber",
1814  c_subscriberPool.getUsed(),
1815  c_subscriberPool.getSize(),
1816  c_subscriberPool.getEntrySize(),
1817  c_subscriberPool.getUsedHi(),
1818  { CFG_DB_SUBSCRIBERS,
1819  CFG_DB_SUBSCRIPTIONS,
1820  CFG_DB_NO_TABLES,0 }},
1821  { "Table",
1822  c_tablePool.getUsed(),
1823  c_tablePool.getSize(),
1824  c_tablePool.getEntrySize(),
1825  c_tablePool.getUsedHi(),
1826  { CFG_DB_NO_TABLES,0,0,0 }},
1827  { "Subscription",
1828  c_subscriptionPool.getUsed(),
1829  c_subscriptionPool.getSize(),
1830  c_subscriptionPool.getEntrySize(),
1831  c_subscriptionPool.getUsedHi(),
1832  { CFG_DB_SUBSCRIPTIONS,
1833  CFG_DB_NO_TABLES,0,0 }},
1834  { "Sync",
1835  c_syncPool.getUsed(),
1836  c_syncPool.getSize(),
1837  c_syncPool.getEntrySize(),
1838  c_syncPool.getUsedHi(),
1839  { 0,0,0,0 }},
1840  { "Data Buffer",
1841  c_dataBufferPool.getUsed(),
1842  c_dataBufferPool.getSize(),
1843  c_dataBufferPool.getEntrySize(),
1844  c_dataBufferPool.getUsedHi(),
1845  { CFG_DB_NO_ATTRIBUTES,0,0,0 }},
1846  { "SubOp",
1847  c_subOpPool.getUsed(),
1848  c_subOpPool.getSize(),
1849  c_subOpPool.getEntrySize(),
1850  c_subOpPool.getUsedHi(),
1851  { CFG_DB_SUB_OPERATIONS,0,0,0 }},
1852  { "Page Chunk",
1853  c_page_chunk_pool.getUsed(),
1854  c_page_chunk_pool.getSize(),
1855  c_page_chunk_pool.getEntrySize(),
1856  c_page_chunk_pool.getUsedHi(),
1857  { 0,0,0,0 }},
1858  { "GCP",
1859  c_gcp_pool.getUsed(),
1860  c_gcp_pool.getSize(),
1861  c_gcp_pool.getEntrySize(),
1862  c_gcp_pool.getUsedHi(),
1863  { CFG_DB_API_HEARTBEAT_INTERVAL,
1864  CFG_DB_GCP_INTERVAL,0,0 }},
1865  { NULL, 0,0,0,0, { 0,0,0,0 }}
1866  };
1867 
1868  const size_t num_config_params =
1869  sizeof(pools[0].config_params) / sizeof(pools[0].config_params[0]);
1870  Uint32 pool = cursor->data[0];
1871  BlockNumber bn = blockToMain(number());
1872  while(pools[pool].poolname)
1873  {
1874  jam();
1875  Ndbinfo::Row row(signal, req);
1876  row.write_uint32(getOwnNodeId());
1877  row.write_uint32(bn); // block number
1878  row.write_uint32(instance()); // block instance
1879  row.write_string(pools[pool].poolname);
1880  row.write_uint64(pools[pool].used);
1881  row.write_uint64(pools[pool].total);
1882  row.write_uint64(pools[pool].used_hi);
1883  row.write_uint64(pools[pool].entry_size);
1884  for (size_t i = 0; i < num_config_params; i++)
1885  row.write_uint32(pools[pool].config_params[i]);
1886  ndbinfo_send_row(signal, req, row, rl);
1887  pool++;
1888  if (rl.need_break(req))
1889  {
1890  jam();
1891  ndbinfo_send_scan_break(signal, req, rl, pool);
1892  return;
1893  }
1894  }
1895  break;
1896  }
1897  default:
1898  break;
1899  }
1900 
1901  ndbinfo_send_scan_conf(signal, req, rl);
1902 }
1903 
1904 /*************************************************************
1905  *
1906  * Creation of subscription id's
1907  *
1908  ************************************************************/
1909 
1910 void
1911 Suma::execCREATE_SUBID_REQ(Signal* signal)
1912 {
1913  jamEntry();
1914  DBUG_ENTER("Suma::execCREATE_SUBID_REQ");
1915  ndbassert(signal->getNoOfSections() == 0);
1916  CRASH_INSERTION(13001);
1917 
1918  CreateSubscriptionIdReq const * req =
1919  (CreateSubscriptionIdReq*)signal->getDataPtr();
1920  SubscriberPtr subbPtr;
1921  if(!c_subscriberPool.seize(subbPtr)){
1922  jam();
1923  sendSubIdRef(signal, req->senderRef, req->senderData, 1412);
1924  DBUG_VOID_RETURN;
1925  }
1926  DBUG_PRINT("info",("c_subscriberPool size: %d free: %d",
1927  c_subscriberPool.getSize(),
1928  c_subscriberPool.getNoOfFree()));
1929 
1930  subbPtr.p->m_senderRef = req->senderRef;
1931  subbPtr.p->m_senderData = req->senderData;
1932 
1933  UtilSequenceReq * utilReq = (UtilSequenceReq*)signal->getDataPtrSend();
1934  utilReq->senderData = subbPtr.i;
1935  utilReq->sequenceId = SUMA_SEQUENCE;
1936  utilReq->requestType = UtilSequenceReq::NextVal;
1937  sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ,
1938  signal, UtilSequenceReq::SignalLength, JBB);
1939 
1940  DBUG_VOID_RETURN;
1941 }
1942 
1943 void
1944 Suma::execUTIL_SEQUENCE_CONF(Signal* signal)
1945 {
1946  jamEntry();
1947  DBUG_ENTER("Suma::execUTIL_SEQUENCE_CONF");
1948  ndbassert(signal->getNoOfSections() == 0);
1949  CRASH_INSERTION(13002);
1950 
1951  UtilSequenceConf * conf = (UtilSequenceConf*)signal->getDataPtr();
1952  if(conf->requestType == UtilSequenceReq::Create) {
1953  jam();
1954  createSequenceReply(signal, conf, NULL);
1955  DBUG_VOID_RETURN;
1956  }
1957 
1958  Uint64 subId;
1959  memcpy(&subId,conf->sequenceValue,8);
1960  SubscriberPtr subbPtr;
1961  c_subscriberPool.getPtr(subbPtr,conf->senderData);
1962 
1964  subconf->senderRef = reference();
1965  subconf->senderData = subbPtr.p->m_senderData;
1966  subconf->subscriptionId = (Uint32)subId;
1967  subconf->subscriptionKey =(getOwnNodeId() << 16) | (Uint32)(subId & 0xFFFF);
1968 
1969  sendSignal(subbPtr.p->m_senderRef, GSN_CREATE_SUBID_CONF, signal,
1970  CreateSubscriptionIdConf::SignalLength, JBB);
1971 
1972  c_subscriberPool.release(subbPtr);
1973  DBUG_PRINT("info",("c_subscriberPool size: %d free: %d",
1974  c_subscriberPool.getSize(),
1975  c_subscriberPool.getNoOfFree()));
1976  DBUG_VOID_RETURN;
1977 }
1978 
1979 void
1980 Suma::execUTIL_SEQUENCE_REF(Signal* signal)
1981 {
1982  jamEntry();
1983  DBUG_ENTER("Suma::execUTIL_SEQUENCE_REF");
1984  ndbassert(signal->getNoOfSections() == 0);
1985  UtilSequenceRef * ref = (UtilSequenceRef*)signal->getDataPtr();
1986  Uint32 err= ref->errorCode;
1987 
1988  if(ref->requestType == UtilSequenceReq::Create) {
1989  jam();
1990  createSequenceReply(signal, NULL, ref);
1991  DBUG_VOID_RETURN;
1992  }
1993 
1994  Uint32 subData = ref->senderData;
1995 
1996  SubscriberPtr subbPtr;
1997  c_subscriberPool.getPtr(subbPtr,subData);
1998  if (err == UtilSequenceRef::TCError)
1999  {
2000  jam();
2001  err = ref->TCErrorCode;
2002  }
2003  sendSubIdRef(signal, subbPtr.p->m_senderRef, subbPtr.p->m_senderData, err);
2004  c_subscriberPool.release(subbPtr);
2005  DBUG_PRINT("info",("c_subscriberPool size: %d free: %d",
2006  c_subscriberPool.getSize(),
2007  c_subscriberPool.getNoOfFree()));
2008  DBUG_VOID_RETURN;
2009 }//execUTIL_SEQUENCE_REF()
2010 
2011 
2012 void
2013 Suma::sendSubIdRef(Signal* signal,
2014  Uint32 senderRef, Uint32 senderData, Uint32 errCode)
2015 {
2016  jam();
2017  DBUG_ENTER("Suma::sendSubIdRef");
2018  CreateSubscriptionIdRef * ref =
2019  (CreateSubscriptionIdRef *)signal->getDataPtrSend();
2020 
2021  ref->senderRef = reference();
2022  ref->senderData = senderData;
2023  ref->errorCode = errCode;
2024  sendSignal(senderRef,
2025  GSN_CREATE_SUBID_REF,
2026  signal,
2027  CreateSubscriptionIdRef::SignalLength,
2028  JBB);
2029 
2030  DBUG_VOID_RETURN;
2031 }
2032 
2033 /**********************************************************
2034  * Suma participant interface
2035  *
2036  * Creation of subscriptions
2037  */
// Handler for SUB_CREATE_REQ (name taken from the DBUG_ENTER tag below).
// NOTE(review): the declarator line of this function is not present in
// this listing (line 2039 is missing).
//
// Looks up the subscription identified by (subscriptionId,
// subscriptionKey) or creates it, queues a SubOpRecord for the request,
// attaches the subscription to its Table record (creating that too if
// needed) and then acts on the table state:
//   DEFINED    reply SUB_CREATE_CONF immediately
//   UNDEFINED  ask DBDICT for the table definition
//   DEFINING   a definition request is already running; just wait
//   DROPPED    undo the subscription and reply SUB_CREATE_REF
// All failures are reported with sendSubCreateRef().
2038 void
2040 {
2041  jamEntry();
2042  DBUG_ENTER("Suma::execSUB_CREATE_REQ");
2043  ndbassert(signal->getNoOfSections() == 0);
2044  CRASH_INSERTION(13003);
2045 
// Work on a copy of the request: the signal buffer is written again below.
2046  const SubCreateReq req = *(SubCreateReq*)signal->getDataPtr();
2047 
2048  const Uint32 senderRef = req.senderRef;
2049  const Uint32 senderData = req.senderData;
2050  const Uint32 subId = req.subscriptionId;
2051  const Uint32 subKey = req.subscriptionKey;
// subscriptionType carries both the type proper and option flags.
2052  const Uint32 type = req.subscriptionType & SubCreateReq::RemoveFlags;
2053  const Uint32 flags = req.subscriptionType & SubCreateReq::GetFlags;
2054  const Uint32 reportAll = (flags & SubCreateReq::ReportAll) ?
2055  Subscription::REPORT_ALL : 0;
2056  const Uint32 reportSubscribe = (flags & SubCreateReq::ReportSubscribe) ?
2057  Subscription::REPORT_SUBSCRIBE : 0;
2058  const Uint32 noReportDDL = (flags & SubCreateReq::NoReportDDL) ?
2059  Subscription::NO_REPORT_DDL : 0;
2060  const Uint32 tableId = req.tableId;
2061  const Uint32 schemaTransId = req.schemaTransId;
2062 
2063  bool subDropped = req.subscriptionType & SubCreateReq::NR_Sub_Dropped;
2064 
// The NR_Sub_Dropped flag is only accepted from the node recorded as
// restart server.
2068  if (subDropped)
2069  {
2070  ndbrequire(refToNode(senderRef) == c_startup.m_restart_server_node_id);
2071  }
2072 
// Stack record used only as lookup key for the subscription hash.
2073  Subscription key;
2074  key.m_subscriptionId = subId;
2075  key.m_subscriptionKey = subKey;
2076 
2077  DBUG_PRINT("enter",("key.m_subscriptionId: %u, key.m_subscriptionKey: %u",
2078  key.m_subscriptionId, key.m_subscriptionKey));
2079 
2080  SubscriptionPtr subPtr;
2081 
2082  bool found = c_subscriptions.find(subPtr, key);
2083 
// No restart server node recorded yet: refuse with NotStarted.
2084  if (c_startup.m_restart_server_node_id == RNIL)
2085  {
2086  jam();
2087 
2091  sendSubCreateRef(signal, senderRef, senderData,
2092  SubCreateRef::NotStarted);
2093  return;
2094  }
2095 
2096  CRASH_INSERTION2(13040, c_startup.m_restart_server_node_id != RNIL);
2097  CRASH_INSERTION(13041);
2098 
// allowDup is hard-wired to true, so the "already exists" refusal below is
// never taken in this build of the code.
2099  bool allowDup = true; //c_startup.m_restart_server_node_id;
2100 
2101  if (found && !allowDup)
2102  {
2103  jam();
2104  sendSubCreateRef(signal, senderRef, senderData,
2105  SubCreateRef::SubscriptionAlreadyExist);
2106  return;
2107  }
2108 
// New subscription: seize a record (not yet inserted in the hash) and
// initialise every field.
2109  if (found == false)
2110  {
2111  jam();
2112  if(!c_subscriptions.seize(subPtr))
2113  {
2114  jam();
2115  sendSubCreateRef(signal, senderRef, senderData,
2116  SubCreateRef::OutOfSubscriptionRecords);
2117  return;
2118  }
2119 
2120  new (subPtr.p) Subscription();
2121  subPtr.p->m_seq_no = c_current_seq;
2122  subPtr.p->m_subscriptionId = subId;
2123  subPtr.p->m_subscriptionKey = subKey;
2124  subPtr.p->m_subscriptionType = type;
2125  subPtr.p->m_tableId = tableId;
2126  subPtr.p->m_table_ptrI = RNIL;
2127  subPtr.p->m_state = Subscription::UNDEFINED;
2128  subPtr.p->m_trigger_state = Subscription::T_UNDEFINED;
2129  subPtr.p->m_triggers[0] = ILLEGAL_TRIGGER_ID;
2130  subPtr.p->m_triggers[1] = ILLEGAL_TRIGGER_ID;
2131  subPtr.p->m_triggers[2] = ILLEGAL_TRIGGER_ID;
2132  subPtr.p->m_errorCode = 0;
2133  subPtr.p->m_options = reportSubscribe | reportAll | noReportDDL;
2134  subPtr.p->m_schemaTransId = schemaTransId;
2135  }
2136 
// Queue an operation record for this request on the subscription's create
// list. Error insert 13044 simulates a seize failure for new subscriptions.
2137  Ptr<SubOpRecord> subOpPtr;
2138  LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_create_req);
2139  if ((ERROR_INSERTED(13044) && found == false) ||
2140  subOpList.seize(subOpPtr) == false)
2141  {
2142  jam();
2143  if (found == false)
2144  {
2145  jam();
2146  if (ERROR_INSERTED(13044))
2147  {
2148  CLEAR_ERROR_INSERT_VALUE;
2149  }
2150  c_subscriptionPool.release(subPtr); // not yet in hash
2151  }
// NOTE(review): this path reports OutOfTableRecords although the resource
// that ran out is the SubOpRecord pool — confirm this is intended.
2152  sendSubCreateRef(signal, senderRef, senderData,
2153  SubCreateRef::OutOfTableRecords);
2154  return;
2155  }
2156 
2157  subOpPtr.p->m_senderRef = senderRef;
2158  subOpPtr.p->m_senderData = senderData;
2159 
2160  if (subDropped)
2161  {
2162  jam();
2163  subPtr.p->m_options |= Subscription::MARKED_DROPPED;
2164  }
2165 
// Resolve the Table record: an existing subscription already points to
// one; otherwise look the table up by id, and create it if absent.
2166  TablePtr tabPtr;
2167  if (found)
2168  {
2169  jam();
2170  c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
2171  }
2172  else if (c_tables.find(tabPtr, tableId))
2173  {
2174  jam();
2175  }
2176  else
2177  {
2178  jam();
2179  if (ERROR_INSERTED(13045) || c_tablePool.seize(tabPtr) == false)
2180  {
2181  jam();
2182  if (ERROR_INSERTED(13045))
2183  {
2184  CLEAR_ERROR_INSERT_VALUE;
2185  }
2186 
// This branch is only reached with found == false, so the subscription
// record was seized above and can be released directly.
2187  subOpList.release(subOpPtr);
2188  c_subscriptionPool.release(subPtr); // not yet in hash
2189  sendSubCreateRef(signal, senderRef, senderData,
2190  SubCreateRef::OutOfTableRecords);
2191  return;
2192  }
2193 
2194  new (tabPtr.p) Table;
2195  tabPtr.p->m_tableId= tableId;
2196  tabPtr.p->m_ptrI= tabPtr.i;
2197  tabPtr.p->m_error = 0;
2198  tabPtr.p->m_schemaVersion = RNIL;
2199  tabPtr.p->m_state = Table::UNDEFINED;
2200  tabPtr.p->m_schemaTransId = schemaTransId;
2201  c_tables.add(tabPtr);
2202  }
2203 
// All resources are in place: publish the new subscription in the hash and
// link it into the table's subscription list.
2204  if (found == false)
2205  {
2206  jam();
2207  c_subscriptions.add(subPtr);
2208  LocalDLList<Subscription> list(c_subscriptionPool,
2209  tabPtr.p->m_subscriptions);
2210  list.add(subPtr);
2211  subPtr.p->m_table_ptrI = tabPtr.i;
2212  }
2213 
2214  switch(tabPtr.p->m_state){
2215  case Table::DEFINED:{
2216  jam();
2217  // Send conf
2218  subOpList.release(subOpPtr);
2219  subPtr.p->m_state = Subscription::DEFINED;
2220  SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend();
2221  conf->senderRef = reference();
2222  conf->senderData = senderData;
2223  sendSignal(senderRef, GSN_SUB_CREATE_CONF, signal,
2224  SubCreateConf::SignalLength, JBB);
2225  return;
2226  }
2227  case Table::UNDEFINED:{
2228  jam();
2229  tabPtr.p->m_state = Table::DEFINING;
2230  subPtr.p->m_state = Subscription::DEFINING;
2231 
// Test hook: instead of asking DBDICT, send ourselves a GET_TABINFOREF
// with TableNotDefined.
2232  if (ERROR_INSERTED(13031))
2233  {
2234  jam();
2235  CLEAR_ERROR_INSERT_VALUE;
2236  GetTabInfoRef* ref = (GetTabInfoRef*)signal->getDataPtrSend();
2237  ref->tableId = tableId;
2238  ref->senderData = tabPtr.i;
2239  ref->errorCode = GetTabInfoRef::TableNotDefined;
2240  sendSignal(reference(), GSN_GET_TABINFOREF, signal,
2241  GetTabInfoRef::SignalLength, JBB);
2242  return;
2243  }
2244 
// Request the table definition; senderData is the Table record index.
2245  GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();
2246  req->senderRef = reference();
2247  req->senderData = tabPtr.i;
2248  req->requestType =
2249  GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
2250  req->tableId = tableId;
2251  req->schemaTransId = schemaTransId;
2252 
2253  sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
2254  GetTabInfoReq::SignalLength, JBB);
2255  return;
2256  }
2257  case Table::DEFINING:
2258  {
2259  jam();
// No reply is sent here and the SubOpRecord stays queued on m_create_req.
2263  subPtr.p->m_state = Subscription::DEFINING;
2264  return;
2265  }
2266  case Table::DROPPED:
2267  {
// Undo: drop the queued op, unlink the subscription from the table and
// release it, then refuse the request.
2268  subOpList.release(subOpPtr);
2269 
2270  {
2271  LocalDLList<Subscription> list(c_subscriptionPool,
2272  tabPtr.p->m_subscriptions);
2273  list.remove(subPtr);
2274  }
2275  c_subscriptions.release(subPtr);
2276 
2277  sendSubCreateRef(signal, senderRef, senderData,
2278  SubCreateRef::TableDropped);
2279  return;
2280  }
2281  }
2282 
// Every case above returns; reaching this point means an unexpected table
// state.
2283  ndbrequire(false);
2284 }
2285 
2286 void
2287 Suma::sendSubCreateRef(Signal* signal, Uint32 retRef, Uint32 data,
2288  Uint32 errCode)
2289 {
2290  jam();
2291  SubCreateRef * ref = (SubCreateRef *)signal->getDataPtrSend();
2292  ref->errorCode = errCode;
2293  ref->senderData = data;
2294  sendSignal(retRef, GSN_SUB_CREATE_REF, signal,
2295  SubCreateRef::SignalLength, JBB);
2296  return;
2297 }
2298 
2299 /**********************************************************
2300  *
2301  * Setting upp trigger for subscription
2302  *
2303  */
2304 
// Handler for a SubSyncReq signal.
// NOTE(review): the declarator line of this function is not present in
// this listing (line 2306 is missing); the request type and
// sendSubSyncRef() suggest it is the SUB_SYNC_REQ handler.
//
// Finds the subscription named in the request, seizes a SyncRecord on its
// m_syncRecords list, copies the request fields and the attached sections
// into it, and asks DBDIH for scan information about the table
// (DIH_SCAN_TAB_REQ with the SyncRecord index as senderData).
// Failures are answered with sendSubSyncRef():
//   1407  subscription not found
//   1416  no SyncRecord could be seized
2305 void
2307 {
2308  jamEntry();
2309 
2310  CRASH_INSERTION(13004);
2311 
2312  SubSyncReq * const req = (SubSyncReq*)signal->getDataPtr();
2313 
2314  SubscriptionPtr subPtr;
2315  Subscription key;
2316  key.m_subscriptionId = req->subscriptionId;
2317  key.m_subscriptionKey = req->subscriptionKey;
2318 
// Sections attached to the signal must be released on every exit path.
2319  SectionHandle handle(this, signal);
2320  if(!c_subscriptions.find(subPtr, key))
2321  {
2322  jam();
2323  releaseSections(handle);
2324  sendSubSyncRef(signal, 1407);
2325  return;
2326  }
2327 
2328  Ptr<SyncRecord> syncPtr;
2329  LocalDLList<SyncRecord> list(c_syncPool, subPtr.p->m_syncRecords);
2330  if(!list.seize(syncPtr))
2331  {
2332  jam();
2333  releaseSections(handle);
2334  sendSubSyncRef(signal, 1416);
2335  return;
2336  }
2337 
// NOTE(review): this placement-new constructs a Ptr<SyncRecord>, not a
// SyncRecord, in the seized record — confirm whether that is intended.
2338  new (syncPtr.p) Ptr<SyncRecord>;
2339  syncPtr.p->m_senderRef = req->senderRef;
2340  syncPtr.p->m_senderData = req->senderData;
2341  syncPtr.p->m_subscriptionPtrI = subPtr.i;
2342  syncPtr.p->ptrI = syncPtr.i;
2343  syncPtr.p->m_error = 0;
2344  syncPtr.p->m_requestInfo = req->requestInfo;
2345  syncPtr.p->m_frag_cnt = req->fragCount;
2346  syncPtr.p->m_frag_id = req->fragId;
2347  syncPtr.p->m_tableId = subPtr.p->m_tableId;
2348 
2349  {
2350  jam();
// Optional first section: attribute list, appended to m_attributeList.
2351  if(handle.m_cnt > 0)
2352  {
2353  SegmentedSectionPtr ptr;
2354  handle.getSection(ptr, SubSyncReq::ATTRIBUTE_LIST);
2355  LocalDataBuffer<15> attrBuf(c_dataBufferPool, syncPtr.p->m_attributeList);
2356  append(attrBuf, ptr, getSectionSegmentPool());
2357  }
// A range scan additionally requires a bound-info section.
2358  if (req->requestInfo & SubSyncReq::RangeScan)
2359  {
2360  jam();
// NOTE(review): no ';' after ndbrequire(...) here; presumably the macro
// expands to a complete statement — verify.
2361  ndbrequire(handle.m_cnt > 1)
2362  SegmentedSectionPtr ptr;
2363  handle.getSection(ptr, SubSyncReq::TUX_BOUND_INFO);
2364  LocalDataBuffer<15> boundBuf(c_dataBufferPool, syncPtr.p->m_boundInfo);
2365  append(boundBuf, ptr, getSectionSegmentPool());
2366  }
2367  releaseSections(handle);
2368  }
2369 
// Ask DBDIH for the table's scan information; the answer is matched back
// to this SyncRecord through senderData.
2373  {
2374  jam();
2375  DihScanTabReq* req = (DihScanTabReq*)signal->getDataPtrSend();
2376  req->senderRef = reference();
2377  req->senderData = syncPtr.i;
2378  req->tableId = subPtr.p->m_tableId;
2379  req->schemaTransId = subPtr.p->m_schemaTransId;
2380  sendSignal(DBDIH_REF, GSN_DIH_SCAN_TAB_REQ, signal,
2381  DihScanTabReq::SignalLength, JBB);
2382  }
2383 }
2384 
2385 void
2386 Suma::sendSubSyncRef(Signal* signal, Uint32 errCode){
2387  jam();
2388  SubSyncRef * ref= (SubSyncRef *)signal->getDataPtrSend();
2389  ref->errorCode = errCode;
2390  sendSignal(signal->getSendersBlockRef(),
2391  GSN_SUB_SYNC_REF,
2392  signal,
2393  SubSyncRef::SignalLength,
2394  JBB);
2395  return;
2396 }
2397 
/**
 * Handler for a DihScanTabRef reply (the line naming this definition is
 * absent from this listing).
 *
 * ErroneousTableState with table status TS_CREATING is treated as
 * temporary: the same DIH_SCAN_TAB_REQ is rebuilt and sent again after
 * DihScanTabReq::RetryInterval.  Every other outcome stops the node via
 * ndbrequire(false).
 */
2398 void
2400 {
2401  jamEntry();
2402  DBUG_ENTER("Suma::execDI_FCOUNTREF");
2403  DihScanTabRef * ref = (DihScanTabRef*)signal->getDataPtr();
2404  switch ((DihScanTabRef::ErrorCode) ref->error)
2405  {
2406  case DihScanTabRef::ErroneousTableState:
2407  jam();
2408  if (ref->tableStatus == Dbdih::TabRecord::TS_CREATING)
2409  {
      // Copy the REF fields before the request is written through
      // getDataPtrSend().
2410  const Uint32 tableId = ref->tableId;
2411  const Uint32 synPtrI = ref->senderData;
2412  const Uint32 schemaTransId = ref->schemaTransId;
2413  DihScanTabReq * req = (DihScanTabReq*)signal->getDataPtrSend();
2414 
2415  req->senderData = synPtrI;
2416  req->senderRef = reference();
2417  req->tableId = tableId;
2418  req->schemaTransId = schemaTransId;
      // Delay first, then signal length - the same order as the
      // GET_TABINFOREQ resend in this file (..., 30, SignalLength).
2419  sendSignalWithDelay(DBDIH_REF, GSN_DIH_SCAN_TAB_REQ, signal,
2420  DihScanTabReq::RetryInterval,
2421  DihScanTabReq::SignalLength);
2422  DBUG_VOID_RETURN;
2423  }
      // Any other table status is not handled; control then also reaches
      // the default label below.
2424  ndbrequire(false);
2425  default:
2426  ndbrequire(false);
2427  }
2428 
2429  DBUG_VOID_RETURN;
2430 }
2431 
2432 void
2433 Suma::execDIH_SCAN_TAB_CONF(Signal* signal)
2434 {
2435  jamEntry();
2436  DBUG_ENTER("Suma::execDI_FCOUNTCONF");
2437  ndbassert(signal->getNoOfSections() == 0);
2438  DihScanTabConf * conf = (DihScanTabConf*)signal->getDataPtr();
2439  const Uint32 tableId = conf->tableId;
2440  const Uint32 fragCount = conf->fragmentCount;
2441  const Uint32 scanCookie = conf->scanCookie;
2442 
2443  Ptr<SyncRecord> ptr;
2444  c_syncPool.getPtr(ptr, conf->senderData);
2445 
2446  LocalDataBuffer<15> fragBuf(c_dataBufferPool, ptr.p->m_fragments);
2447  ndbrequire(fragBuf.getSize() == 0);
2448 
2449  ndbassert(fragCount >= ptr.p->m_frag_cnt);
2450  if (ptr.p->m_frag_cnt == 0)
2451  {
2452  jam();
2453  ptr.p->m_frag_cnt = fragCount;
2454  }
2455  ptr.p->m_scan_cookie = scanCookie;
2456 
2457  DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend();
2458  req->senderRef = reference();
2459  req->senderData = ptr.i;
2460  req->tableId = tableId;
2461  req->fragId = 0;
2462  req->scanCookie = scanCookie;
2463  sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal,
2464  DihScanGetNodesReq::SignalLength, JBB);
2465 
2466  DBUG_VOID_RETURN;
2467 }
2468 
2469 void
2471 {
2472  jamEntry();
2473  DBUG_ENTER("Suma::execDIGETPRIMCONF");
2474  ndbassert(signal->getNoOfSections() == 0);
2475 
2476  DihScanGetNodesConf* conf = (DihScanGetNodesConf*)signal->getDataPtr();
2477  const Uint32 nodeCount = conf->count;
2478  const Uint32 tableId = conf->tableId;
2479  const Uint32 fragNo = conf->fragId;
2480 
2481  ndbrequire(nodeCount > 0 && nodeCount <= MAX_REPLICAS);
2482 
2483  Ptr<SyncRecord> ptr;
2484  c_syncPool.getPtr(ptr, conf->senderData);
2485 
2486  {
2487  LocalDataBuffer<15> fragBuf(c_dataBufferPool, ptr.p->m_fragments);
2488 
2492  FragmentDescriptor fd;
2493  fd.m_fragDesc.m_nodeId = conf->nodes[0];
2494  fd.m_fragDesc.m_fragmentNo = fragNo;
2495  fd.m_fragDesc.m_lqhInstanceKey = conf->instanceKey;
2496  if (ptr.p->m_frag_id == ZNIL)
2497  {
2498  signal->theData[2] = fd.m_dummy;
2499  fragBuf.append(&signal->theData[2], 1);
2500  }
2501  else if (ptr.p->m_frag_id == fragNo)
2502  {
2503  /*
2504  * Given fragment must have a replica on this node.
2505  */
2506  const Uint32 ownNodeId = getOwnNodeId();
2507  Uint32 i = 0;
2508  for (i = 0; i < nodeCount; i++)
2509  if (conf->nodes[i] == ownNodeId)
2510  break;
2511  if (i == nodeCount)
2512  {
2513  sendSubSyncRef(signal, 1428);
2514  return;
2515  }
2516  fd.m_fragDesc.m_nodeId = ownNodeId;
2517  signal->theData[2] = fd.m_dummy;
2518  fragBuf.append(&signal->theData[2], 1);
2519  }
2520  }
2521 
2522  const Uint32 nextFrag = fragNo + 1;
2523  if(nextFrag == ptr.p->m_frag_cnt)
2524  {
2525  jam();
2526 
2527  ptr.p->startScan(signal);
2528  return;
2529  }
2530 
2531  DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend();
2532  req->senderRef = reference();
2533  req->senderData = ptr.i;
2534  req->tableId = tableId;
2535  req->fragId = nextFrag;
2536  req->scanCookie = ptr.p->m_scan_cookie;
2537  sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal,
2538  DihScanGetNodesReq::SignalLength, JBB);
2539 
2540  DBUG_VOID_RETURN;
2541 }
2542 
2543 /**********************************************************
2544  * Dict interface
2545  */
2546 
2547 /*************************************************************************
2548  *
2549  *
2550  */
/**
 * Handler for a GetTabInfoRef (the line naming this definition, including
 * its opening brace, is absent from this listing).
 *
 * A Busy error causes the GET_TABINFOREQ to be sent again with a delay,
 * unless the table record is in state DROPPED.  TableNotDefined and
 * InvalidTableId (and Busy on a dropped table) end in
 * get_tabinfo_ref_release(); NoFetchByName and TableNameTooLong end in
 * ndbrequire(false).
 */
2551 void
2553  jamEntry();
2554  GetTabInfoRef* ref = (GetTabInfoRef*)signal->getDataPtr();
      // Copy the REF fields before a resend request is written through
      // getDataPtrSend().
2555  Uint32 tableId = ref->tableId;
2556  Uint32 senderData = ref->senderData;
2557  Uint32 schemaTransId = ref->schemaTransId;
2558  GetTabInfoRef::ErrorCode errorCode =
2559  (GetTabInfoRef::ErrorCode) ref->errorCode;
2560  int do_resend_request = 0;
2561  TablePtr tabPtr;
      // senderData is used as the index of the table record.
2562  c_tablePool.getPtr(tabPtr, senderData);
2563  switch (errorCode)
2564  {
2565  case GetTabInfoRef::TableNotDefined:
2566  // wrong state
2567  break;
2568  case GetTabInfoRef::InvalidTableId:
2569  // no such table
2570  break;
2571  case GetTabInfoRef::Busy:
2572  do_resend_request = 1;
2573  break;
      // The next two cases share the ndbrequire(false) below.
2574  case GetTabInfoRef::NoFetchByName:
2575  jam();
2576  case GetTabInfoRef::TableNameTooLong:
2577  jam();
2578  ndbrequire(false);
2579  }
      // Never retry for a table record that is already DROPPED.
2580  if (tabPtr.p->m_state == Table::DROPPED)
2581  {
2582  jam();
2583  do_resend_request = 0;
2584  }
2585 
2586  if (do_resend_request)
2587  {
2588  GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();
2589  req->senderRef = reference();
2590  req->senderData = senderData;
2591  req->requestType =
2592  GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
2593  req->tableId = tableId;
2594  req->schemaTransId = schemaTransId;
2595  sendSignalWithDelay(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
2596  30, GetTabInfoReq::SignalLength);
2597  return;
2598  }
2599  get_tabinfo_ref_release(signal, tabPtr);
2600 }
2601 
2602 void
2603 Suma::get_tabinfo_ref_release(Signal* signal, Ptr<Table> tabPtr)
2604 {
2605  LocalDLList<Subscription> subList(c_subscriptionPool,
2606  tabPtr.p->m_subscriptions);
2607  Ptr<Subscription> subPtr;
2608  bool empty = subList.isEmpty();
2609  for(subList.first(subPtr); !subPtr.isNull();)
2610  {
2611  jam();
2612  Ptr<SubOpRecord> ptr;
2613  ndbassert(subPtr.p->m_start_req.isEmpty());
2614  ndbassert(subPtr.p->m_stop_req.isEmpty());
2615  LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_create_req);
2616  for (list.first(ptr); !ptr.isNull(); )
2617  {
2618  jam();
2619  sendSubCreateRef(signal,
2620  ptr.p->m_senderRef,
2621  ptr.p->m_senderData,
2622  SubCreateRef::TableDropped);
2623 
2624  Ptr<SubOpRecord> tmp0 = ptr;
2625  list.next(ptr);
2626  list.release(tmp0);
2627  }
2628  Ptr<Subscription> tmp1 = subPtr;
2629  subList.next(subPtr);
2630  c_subscriptions.remove(tmp1);
2631  subList.release(tmp1);
2632  }
2633 
2634  c_tables.release(tabPtr);
2635  ndbassert(!empty);
2636 }
2637 
2638 void
2639 Suma::execGET_TABINFO_CONF(Signal* signal){
2640  jamEntry();
2641 
2642  CRASH_INSERTION(13006);
2643 
2644  if(!assembleFragments(signal)){
2645  return;
2646  }
2647 
2648  SectionHandle handle(this, signal);
2649  GetTabInfoConf* conf = (GetTabInfoConf*)signal->getDataPtr();
2650  TablePtr tabPtr;
2651  c_tablePool.getPtr(tabPtr, conf->senderData);
2652  SegmentedSectionPtr ptr;
2653  handle.getSection(ptr, GetTabInfoConf::DICT_TAB_INFO);
2654  ndbrequire(tabPtr.p->parseTable(ptr, *this));
2655  releaseSections(handle);
2656 
2657  if (tabPtr.p->m_state == Table::DROPPED)
2658  {
2659  jam();
2660  get_tabinfo_ref_release(signal, tabPtr);
2661  return;
2662  }
2663 
2664  tabPtr.p->m_state = Table::DEFINED;
2665 
2666  LocalDLList<Subscription> subList(c_subscriptionPool,
2667  tabPtr.p->m_subscriptions);
2668  Ptr<Subscription> subPtr;
2669  bool empty = subList.isEmpty();
2670  for(subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr))
2671  {
2672  jam();
2673  subPtr.p->m_state = Subscription::DEFINED;
2674 
2675  Ptr<SubOpRecord> ptr;
2676  LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_create_req);
2677  for (list.first(ptr); !ptr.isNull();)
2678  {
2679  jam();
2680  SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend();
2681  conf->senderRef = reference();
2682  conf->senderData = ptr.p->m_senderData;
2683  sendSignal(ptr.p->m_senderRef, GSN_SUB_CREATE_CONF, signal,
2684  SubCreateConf::SignalLength, JBB);
2685 
2686  Ptr<SubOpRecord> tmp = ptr;
2687  list.next(ptr);
2688  list.release(tmp);
2689  }
2690  }
2691 
2692  ndbassert(!empty);
2693 }
2694 
/**
 * Suma::Table::parseTable (per the DBUG tag; the first signature line and
 * the declaration of `s` are absent from this listing).
 *
 * Unpacks a DictTabInfo::Table description from the given section and
 * stores its attribute count and table version in this table record.
 * The unpack status is required to be SimpleProperties::Break.
 *
 * @return always true when it returns.
 */
2695 bool
2697  Suma &suma)
2698 {
2699  DBUG_ENTER("Suma::Table::parseTable");
2700 
2701  SimplePropertiesSectionReader it(ptr, suma.getSectionSegmentPool());
2702 
2704  DictTabInfo::Table tableDesc; tableDesc.init();
2705  s = SimpleProperties::unpack(it, &tableDesc,
2706  DictTabInfo::TableMapping,
2707  DictTabInfo::TableMappingSize,
2708  true, true);
2709 
2710  jamBlock(&suma);
2711  suma.suma_ndbrequire(s == SimpleProperties::Break);
2712 
2716  m_noOfAttributes = tableDesc.NoOfAttributes;
2717  m_schemaVersion = tableDesc.TableVersion;
2718 
2719  DBUG_RETURN(true);
2720 }
2721 
2722 /**********************************************************
2723  *
2724  * Scan interface
2725  *
2726  */
2727 
/**
 * Suma::SyncRecord::startScan (per the DBUG tag; the signature line is
 * absent from this listing).
 *
 * Resets the fragment cursor to 0 and issues the first scan request via
 * nextScan().
 */
2728 void
2730 {
2731  jam();
2732  DBUG_ENTER("Suma::SyncRecord::startScan");
2733 
2737  m_currentFragment = 0;
2738  nextScan(signal);
2739  DBUG_VOID_RETURN;
2740 }
2741 
/**
 * Find the next fragment to scan, starting at index m_currentFragment of
 * the m_fragments buffer (the declaration of the iterator `fragIt` is
 * absent from this listing).
 *
 * Descriptors whose node id is not this node are skipped, advancing
 * m_currentFragment past them.
 *
 * @param tab  receives the table of the subscription on success
 * @param fd   receives the matching fragment descriptor on success
 * @return true if a fragment was found; false when the buffer is
 *         exhausted, in which case m_currentFragment is reset to 0.
 */
2742 bool
2743 Suma::SyncRecord::getNextFragment(TablePtr * tab,
2744  FragmentDescriptor * fd)
2745 {
2746  jam();
2747  SubscriptionPtr subPtr;
2748  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
2750 
2751  TablePtr tabPtr;
2752  suma.c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
2753  LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, m_fragments);
2754 
2755  fragBuf.position(fragIt, m_currentFragment);
2756  for(; !fragIt.curr.isNull(); fragBuf.next(fragIt), m_currentFragment++)
2757  {
      // Each buffer word is one packed FragmentDescriptor.
2758  FragmentDescriptor tmp;
2759  tmp.m_dummy = * fragIt.data;
2760  if(tmp.m_fragDesc.m_nodeId == suma.getOwnNodeId()){
2761  * fd = tmp;
2762  * tab = tabPtr;
2763  return true;
2764  }
2765  }
2766  m_currentFragment = 0;
2767  return false;
2768 }
2769 
/**
 * Start scanning the next fragment of this sync record, or finish the
 * sync via completeScan() when getNextFragment() finds none.
 *
 * Builds a ScanFragReq from the request flags in m_requestInfo and sends
 * it to the DBLQH instance named by the fragment descriptor.  The
 * attribute list travels as section 0 and, for range scans, the bound
 * info as section 1.  (The declaration of the iterator `it` is absent
 * from this listing.)
 */
2770 void
2771 Suma::SyncRecord::nextScan(Signal* signal)
2772 {
2773  jam();
2774  DBUG_ENTER("Suma::SyncRecord::nextScan");
2775  TablePtr tabPtr;
2776  FragmentDescriptor fd;
2777  SubscriptionPtr subPtr;
2778  if(!getNextFragment(&tabPtr, &fd)){
2779  jam();
2780  completeScan(signal);
2781  DBUG_VOID_RETURN;
2782  }
2783 
2784  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
2785 
      // The buffer is opened on a copy of the list head, not on
      // m_attributeList itself.
2786  DataBuffer<15>::Head head = m_attributeList;
2787  LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, head);
2788 
2789  Uint32 instanceKey = fd.m_fragDesc.m_lqhInstanceKey;
2790  BlockReference lqhRef = numberToRef(DBLQH, instanceKey, suma.getOwnNodeId());
2791 
2792  ScanFragReq * req = (ScanFragReq *)signal->getDataPtrSend();
      // Used as batch_size_rows below.
2793  const Uint32 parallelism = 16;
2794  //const Uint32 attrLen = 5 + attrBuf.getSize();
2795 
2796  req->senderData = ptrI;
2797  req->resultRef = suma.reference();
2798  req->tableId = tabPtr.p->m_tableId;
2799  req->requestInfo = 0;
2800  req->savePointId = 0;
      // Defaults: lock mode 0, hold-lock set, no keyinfo.  The flag
      // checks below override or extend these.
2801  ScanFragReq::setLockMode(req->requestInfo, 0);
2802  ScanFragReq::setHoldLockFlag(req->requestInfo, 1);
2803  ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
2804  if (m_requestInfo & SubSyncReq::NoDisk)
2805  {
2806  ScanFragReq::setNoDiskFlag(req->requestInfo, 1);
2807  }
2808 
2809  if (m_requestInfo & SubSyncReq::LM_Exclusive)
2810  {
2811  ScanFragReq::setLockMode(req->requestInfo, 1);
2812  ScanFragReq::setHoldLockFlag(req->requestInfo, 1);
2813  ScanFragReq::setKeyinfoFlag(req->requestInfo, 1);
2814  }
2815 
2816  if (m_requestInfo & SubSyncReq::Reorg)
2817  {
2818  ScanFragReq::setReorgFlag(req->requestInfo, ScanFragReq::REORG_MOVED);
2819  }
2820 
2821  if (m_requestInfo & SubSyncReq::TupOrder)
2822  {
2823  ScanFragReq::setTupScanFlag(req->requestInfo, 1);
2824  }
2825 
2826  if (m_requestInfo & SubSyncReq::LM_CommittedRead)
2827  {
2828  ScanFragReq::setReadCommittedFlag(req->requestInfo, 1);
2829  }
2830 
2831  if (m_requestInfo & SubSyncReq::RangeScan)
2832  {
2833  ScanFragReq::setRangeScanFlag(req->requestInfo, 1);
2834  }
2835 
2836  if (m_requestInfo & SubSyncReq::StatScan)
2837  {
2838  ScanFragReq::setStatScanFlag(req->requestInfo, 1);
2839  }
2840 
2841  req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo;
2842  req->schemaVersion = tabPtr.p->m_schemaVersion;
2843  req->transId1 = 0;
2844  req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8);
2845  req->clientOpPtr = (ptrI << 16);
2846  req->batch_size_rows= parallelism;
2847 
2848  req->batch_size_bytes= 0;
2849 
      // The section data is assembled in the signal buffer from word 25
      // on: five header words (attribute count, then four zeros) followed
      // by one AttributeHeader per attribute.
2850  Uint32 * attrInfo = signal->theData + 25;
2851  attrInfo[0] = attrBuf.getSize();
2852  attrInfo[1] = 0;
2853  attrInfo[2] = 0;
2854  attrInfo[3] = 0;
2855  attrInfo[4] = 0;
2856 
2857  Uint32 pos = 5;
2859  for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it))
2860  {
2861  AttributeHeader::init(&attrInfo[pos++], * it.data, 0);
2862  }
2863  LinearSectionPtr ptr[3];
2864  Uint32 noOfSections;
2865  ptr[0].p = attrInfo;
2866  ptr[0].sz = pos;
2867  noOfSections = 1;
2868  if (m_requestInfo & SubSyncReq::RangeScan)
2869  {
2870  jam();
      // Bound words are copied directly behind the attribute info and
      // sent as a second section.
2871  Uint32 oldpos = pos; // after attrInfo
2872  LocalDataBuffer<15> boundBuf(suma.c_dataBufferPool, m_boundInfo);
2873  for (boundBuf.first(it); !it.curr.isNull(); boundBuf.next(it))
2874  {
2875  attrInfo[pos++] = *it.data;
2876  }
2877  ptr[1].p = &attrInfo[oldpos];
2878  ptr[1].sz = pos - oldpos;
2879  noOfSections = 2;
2880  }
2881  suma.sendSignal(lqhRef, GSN_SCAN_FRAGREQ, signal,
2882  ScanFragReq::SignalLength, JBB, ptr, noOfSections);
2883 
2884  m_currentNoOfAttributes = attrBuf.getSize();
2885 
2886  DBUG_VOID_RETURN;
2887 }
2888 
2889 
2890 void
2891 Suma::execSCAN_FRAGREF(Signal* signal){
2892  jamEntry();
2893 
2894 // ScanFragRef * const ref = (ScanFragRef*)signal->getDataPtr();
2895  ndbrequire(false);
2896 }
2897 
2898 void
2899 Suma::execSCAN_FRAGCONF(Signal* signal){
2900  jamEntry();
2901  DBUG_ENTER("Suma::execSCAN_FRAGCONF");
2902  ndbassert(signal->getNoOfSections() == 0);
2903  CRASH_INSERTION(13011);
2904 
2905  ScanFragConf * const conf = (ScanFragConf*)signal->getDataPtr();
2906 
2907  const Uint32 completed = conf->fragmentCompleted;
2908  const Uint32 senderData = conf->senderData;
2909  const Uint32 completedOps = conf->completedOps;
2910 
2911  Ptr<SyncRecord> syncPtr;
2912  c_syncPool.getPtr(syncPtr, senderData);
2913 
2914  if(completed != 2){ // 2==ZSCAN_FRAG_CLOSED
2915  jam();
2916 
2917 #if PRINT_ONLY
2918  SubSyncContinueConf * const conf =
2919  (SubSyncContinueConf*)signal->getDataPtrSend();
2920  conf->subscriptionId = subPtr.p->m_subscriptionId;
2921  conf->subscriptionKey = subPtr.p->m_subscriptionKey;
2922  execSUB_SYNC_CONTINUE_CONF(signal);
2923 #else
2924  SubSyncContinueReq * const req = (SubSyncContinueReq*)signal->getDataPtrSend();
2925  req->subscriberData = syncPtr.p->m_senderData;
2926  req->noOfRowsSent = completedOps;
2927  req->senderData = senderData;
2928  sendSignal(syncPtr.p->m_senderRef, GSN_SUB_SYNC_CONTINUE_REQ, signal,
2929  SubSyncContinueReq::SignalLength, JBB);
2930 #endif
2931  DBUG_VOID_RETURN;
2932  }
2933 
2934  ndbrequire(completedOps == 0);
2935 
2936  syncPtr.p->m_currentFragment++;
2937  syncPtr.p->nextScan(signal);
2938  DBUG_VOID_RETURN;
2939 }
2940 
/**
 * SUB_SYNC_CONTINUE_CONF handler.
 *
 * Looks up the DBLQH instance of the fragment currently being scanned by
 * the sync record named in conf->senderData and asks it for the next
 * batch with SCAN_NEXTREQ.  The subscription named in the signal must
 * exist.  (The declaration of the iterator `fragIt` is absent from this
 * listing.)
 */
2941 void
2942 Suma::execSUB_SYNC_CONTINUE_CONF(Signal* signal){
2943  jamEntry();
2944  ndbassert(signal->getNoOfSections() == 0);
2945 
2946  CRASH_INSERTION(13012);
2947 
2948  SubSyncContinueConf * const conf =
2949  (SubSyncContinueConf*)signal->getDataPtr();
2950 
2951  SubscriptionPtr subPtr;
      // `key` only carries the id/key pair for the lookup below.
2952  Subscription key;
2953  key.m_subscriptionId = conf->subscriptionId;
2954  key.m_subscriptionKey = conf->subscriptionKey;
2955  Uint32 syncPtrI = conf->senderData;
2956 
2957  ndbrequire(c_subscriptions.find(subPtr, key));
2958 
2959  Uint32 instanceKey;
2960  {
2961  Ptr<SyncRecord> syncPtr;
2962  c_syncPool.getPtr(syncPtr, syncPtrI);
2963  LocalDataBuffer<15> fragBuf(c_dataBufferPool, syncPtr.p->m_fragments);
2965  bool ok = fragBuf.position(fragIt, syncPtr.p->m_currentFragment);
2966  ndbrequire(ok);
2967  FragmentDescriptor tmp;
2968  tmp.m_dummy = * fragIt.data;
2969  instanceKey = tmp.m_fragDesc.m_lqhInstanceKey;
2970  }
2971  BlockReference lqhRef = numberToRef(DBLQH, instanceKey, getOwnNodeId());
2972 
2973  ScanFragNextReq * req = (ScanFragNextReq *)signal->getDataPtrSend();
2974  req->senderData = syncPtrI;
2975  req->requestInfo = 0;
2976  req->transId1 = 0;
      // Same transId2 expression and batch size (16 rows) as the
      // SCAN_FRAGREQ built in SyncRecord::nextScan().
2977  req->transId2 = (SUMA << 20) + (getOwnNodeId() << 8);
2978  req->batch_size_rows = 16;
2979  req->batch_size_bytes = 0;
2980  sendSignal(lqhRef, GSN_SCAN_NEXTREQ, signal,
2981  ScanFragNextReq::SignalLength, JBB);
2982 }
2983 
2984 void
2985 Suma::SyncRecord::completeScan(Signal* signal, int error)
2986 {
2987  jam();
2988  DBUG_ENTER("Suma::SyncRecord::completeScan");
2989 
2990  SubscriptionPtr subPtr;
2991  suma.c_subscriptionPool.getPtr(subPtr, m_subscriptionPtrI);
2992 
2993  DihScanTabCompleteRep* rep = (DihScanTabCompleteRep*)signal->getDataPtr();
2994  rep->tableId = subPtr.p->m_tableId;
2995  rep->scanCookie = m_scan_cookie;
2996  suma.sendSignal(DBDIH_REF, GSN_DIH_SCAN_TAB_COMPLETE_REP, signal,
2997  DihScanTabCompleteRep::SignalLength, JBB);
2998 
2999 #if PRINT_ONLY
3000  ndbout_c("GSN_SUB_SYNC_CONF (data)");
3001 #else
3002  if (error == 0)
3003  {
3004  SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend();
3005  conf->senderRef = suma.reference();
3006  conf->senderData = m_senderData;
3007  suma.sendSignal(m_senderRef, GSN_SUB_SYNC_CONF, signal,
3008  SubSyncConf::SignalLength, JBB);
3009  }
3010  else
3011  {
3012  SubSyncRef * const ref = (SubSyncRef*)signal->getDataPtrSend();
3013  ref->senderRef = suma.reference();
3014  ref->senderData = m_senderData;
3015  suma.sendSignal(m_senderRef, GSN_SUB_SYNC_REF, signal,
3016  SubSyncRef::SignalLength, JBB);
3017  }
3018 #endif
3019 
3020  release();
3021  LocalDLList<SyncRecord> list(suma.c_syncPool, subPtr.p->m_syncRecords);
3022  Ptr<SyncRecord> tmp;
3023  tmp.i = ptrI;
3024  tmp.p = this;
3025  list.release(tmp);
3026 
3027  DBUG_VOID_RETURN;
3028 }
3029 
/**
 * Handler that does nothing beyond jamEntry(); the hex dump of the signal
 * (which prints "execSCAN_HBREP") is compiled out by `#if 0`.  The line
 * naming this definition, including its opening brace, is absent from
 * this listing.
 */
3030 void
3032  jamEntry();
3033 #if 0
3034  ndbout << "execSCAN_HBREP" << endl << hex;
3035  for(int i = 0; i<signal->length(); i++){
3036  ndbout << signal->theData[i] << " ";
3037  if(((i + 1) % 8) == 0)
3038  ndbout << endl << hex;
3039  }
3040  ndbout << endl;
3041 #endif
3042 }
3043 
3044 /**********************************************************
3045  *
3046  * Suma participant interface
3047  *
3048  * Creation of subscriber
3049  *
3050  */
3051 
3052 void
3054  jamEntry();
3055  ndbassert(signal->getNoOfSections() == 0);
3056  DBUG_ENTER("Suma::execSUB_START_REQ");
3057  SubStartReq * const req = (SubStartReq*)signal->getDataPtr();
3058 
3059  CRASH_INSERTION(13013);
3060  Uint32 senderRef = req->senderRef;
3061  Uint32 senderData = req->senderData;
3062  Uint32 subscriberData = req->subscriberData;
3063  Uint32 subscriberRef = req->subscriberRef;
3064  SubscriptionData::Part part = (SubscriptionData::Part)req->part;
3065  (void)part; // TODO validate part
3066 
3067  Subscription key;
3068  key.m_subscriptionId = req->subscriptionId;
3069  key.m_subscriptionKey = req->subscriptionKey;
3070 
3071  SubscriptionPtr subPtr;
3072 
3073  CRASH_INSERTION2(13042, getNodeState().startLevel == NodeState::SL_STARTING);
3074 
3075  if (c_startup.m_restart_server_node_id == RNIL)
3076  {
3077  jam();
3078 
3082  sendSubStartRef(signal,
3083  senderRef, senderData, SubStartRef::NotStarted);
3084  return;
3085  }
3086 
3087  bool found = c_subscriptions.find(subPtr, key);
3088  if (!found)
3089  {
3090  jam();
3091  sendSubStartRef(signal,
3092  senderRef, senderData, SubStartRef::NoSuchSubscription);
3093  return;
3094  }
3095 
3096  if (ERROR_INSERTED(13046))
3097  {
3098  jam();
3099  CLEAR_ERROR_INSERT_VALUE;
3100  sendSubStartRef(signal,
3101  senderRef, senderData, SubStartRef::NoSuchSubscription);
3102  return;
3103  }
3104 
3105  switch(subPtr.p->m_state){
3106  case Subscription::UNDEFINED:
3107  jam();
3108  ndbrequire(false);
3109  case Subscription::DEFINING:
3110  jam();
3111  sendSubStartRef(signal,
3112  senderRef, senderData, SubStartRef::Defining);
3113  return;
3114  case Subscription::DEFINED:
3115  break;
3116  }
3117 
3118  if (subPtr.p->m_options & Subscription::MARKED_DROPPED)
3119  {
3120  jam();
3121  if (c_startup.m_restart_server_node_id == 0)
3122  {
3123  sendSubStartRef(signal,
3124  senderRef, senderData, SubStartRef::Dropped);
3125  return;
3126  }
3127  else
3128  {
3132  }
3133  }
3134 
3135  if (subPtr.p->m_trigger_state == Subscription::T_ERROR)
3136  {
3137  jam();
3138  sendSubStartRef(signal,
3139  senderRef, senderData, subPtr.p->m_errorCode);
3140  return;
3141  }
3142 
3143  SubscriberPtr subbPtr;
3144  if(!c_subscriberPool.seize(subbPtr))
3145  {
3146  jam();
3147  sendSubStartRef(signal,
3148  senderRef, senderData, SubStartRef::OutOfSubscriberRecords);
3149  return;
3150  }
3151 
3152  Ptr<SubOpRecord> subOpPtr;
3153  if (!c_subOpPool.seize(subOpPtr))
3154  {
3155  jam();
3156  c_subscriberPool.release(subbPtr);
3157  sendSubStartRef(signal,
3158  senderRef, senderData, SubStartRef::OutOfSubOpRecords);
3159  return;
3160  }
3161 
3162  if (! check_sub_start(subscriberRef))
3163  {
3164  jam();
3165  c_subscriberPool.release(subbPtr);
3166  c_subOpPool.release(subOpPtr);
3167  sendSubStartRef(signal,
3168  senderRef, senderData, SubStartRef::NodeDied);
3169  return;
3170  }
3171 
3172  // setup subscriber record
3173  subbPtr.p->m_senderRef = subscriberRef;
3174  subbPtr.p->m_senderData = subscriberData;
3175 
3176  subOpPtr.p->m_opType = SubOpRecord::R_SUB_START_REQ;
3177  subOpPtr.p->m_subPtrI = subPtr.i;
3178  subOpPtr.p->m_senderRef = senderRef;
3179  subOpPtr.p->m_senderData = senderData;
3180  subOpPtr.p->m_subscriberRef = subbPtr.i;
3181 
3182  {
3183  LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_start_req);
3184  subOpList.add(subOpPtr);
3185  }
3186 
3190  switch(subPtr.p->m_trigger_state){
3191  case Subscription::T_UNDEFINED:
3192  jam();
3196  create_triggers(signal, subPtr);
3197  break;
3198  case Subscription::T_CREATING:
3199  jam();
3203  return;
3204  case Subscription::T_DROPPING:
3205  jam();
3210  break;
3211  case Subscription::T_DEFINED:{
3212  jam();
3213  report_sub_start_conf(signal, subPtr);
3214  return;
3215  }
3216  case Subscription::T_ERROR:
3217  jam();
3218  ndbrequire(false); // Checked above
3219  break;
3220  }
3221 }
3222 
3223 void
3224 Suma::sendSubStartRef(Signal* signal, Uint32 dstref, Uint32 data, Uint32 err)
3225 {
3226  jam();
3227  SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend();
3228  ref->senderRef = reference();
3229  ref->senderData = data;
3230  ref->errorCode = err;
3231  sendSignal(dstref, GSN_SUB_START_REF, signal,
3232  SubStartRef::SignalLength, JBB);
3233 }
3234 
3235 void
3236 Suma::create_triggers(Signal* signal, SubscriptionPtr subPtr)
3237 {
3238  jam();
3239 
3240  ndbrequire(subPtr.p->m_trigger_state == Subscription::T_UNDEFINED);
3241  subPtr.p->m_trigger_state = Subscription::T_CREATING;
3242 
3243  TablePtr tabPtr;
3244  c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
3245 
3246  AttributeMask attrMask;
3247  tabPtr.p->createAttributeMask(attrMask, *this);
3248 
3249  subPtr.p->m_outstanding_trigger = 3;
3250  for(Uint32 j = 0; j<3; j++)
3251  {
3252  Uint32 triggerId = (tabPtr.p->m_schemaVersion << 18) | (j << 16) | subPtr.i;
3253  ndbrequire(subPtr.p->m_triggers[j] == ILLEGAL_TRIGGER_ID);
3254 
3255  CreateTrigImplReq * const req =
3256  (CreateTrigImplReq*)signal->getDataPtrSend();
3257  req->senderRef = SUMA_REF;
3258  req->senderData = subPtr.i;
3259  req->requestType = 0;
3260 
3261  Uint32 ti = 0;
3262  TriggerInfo::setTriggerType(ti, TriggerType::SUBSCRIPTION_BEFORE);
3263  TriggerInfo::setTriggerActionTime(ti, TriggerActionTime::TA_DETACHED);
3264  TriggerInfo::setTriggerEvent(ti, (TriggerEvent::Value)j);
3265  TriggerInfo::setMonitorReplicas(ti, true);
3266  //TriggerInfo::setMonitorAllAttributes(ti, j == TriggerEvent::TE_DELETE);
3267  TriggerInfo::setMonitorAllAttributes(ti, true);
3268  TriggerInfo::setReportAllMonitoredAttributes(ti,
3269  subPtr.p->m_options & Subscription::REPORT_ALL);
3270  req->triggerInfo = ti;
3271 
3272  req->receiverRef = SUMA_REF;
3273  req->triggerId = triggerId;
3274  req->tableId = subPtr.p->m_tableId;
3275  req->tableVersion = 0; // not used
3276  req->indexId = ~(Uint32)0;
3277  req->indexVersion = 0;
3278 
3279  LinearSectionPtr ptr[3];
3280  ptr[0].p = attrMask.rep.data;
3281  ptr[0].sz = attrMask.getSizeInWords();
3282  sendSignal(DBTUP_REF, GSN_CREATE_TRIG_IMPL_REQ,
3283  signal, CreateTrigImplReq::SignalLength, JBB, ptr, 1);
3284  }
3285 }
3286 
/**
 * Handler for a CreateTrigImplConf (the line naming this definition is
 * absent from this listing).
 *
 * Records the created trigger id in the subscription slot selected by
 * bits 16-17 of the trigger id and counts down m_outstanding_trigger.
 * When the last reply has arrived, the subscription becomes T_DEFINED
 * and start requests are confirmed if no error was recorded; otherwise
 * it becomes T_ERROR and the triggers are dropped again.
 */
3287 void
3289 {
3290  jamEntry();
3291 
3292  CreateTrigImplConf * conf = (CreateTrigImplConf*)signal->getDataPtr();
3293  const Uint32 triggerId = conf->triggerId;
      // Slot index as packed into the trigger id by create_triggers().
3294  Uint32 type = (triggerId >> 16) & 0x3;
3295  Uint32 tableId = conf->tableId;
3296 
3297  TablePtr tabPtr;
3298  SubscriptionPtr subPtr;
3299  c_subscriptions.getPtr(subPtr, conf->senderData);
3300  c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
3301 
3302  ndbrequire(tabPtr.p->m_tableId == tableId);
3303  ndbrequire(subPtr.p->m_trigger_state == Subscription::T_CREATING);
3304 
3305  ndbrequire(type < 3);
3306  ndbrequire(subPtr.p->m_triggers[type] == ILLEGAL_TRIGGER_ID);
3307  subPtr.p->m_triggers[type] = triggerId;
3308 
3309  ndbrequire(subPtr.p->m_outstanding_trigger);
3310  subPtr.p->m_outstanding_trigger--;
3311 
3312  if (subPtr.p->m_outstanding_trigger)
3313  {
3314  jam();
      // More replies are still outstanding.
3318  return;
3319  }
3320 
3321  if (subPtr.p->m_errorCode == 0)
3322  {
3323  jam();
3324  subPtr.p->m_trigger_state = Subscription::T_DEFINED;
3325  report_sub_start_conf(signal, subPtr);
3326  }
3327  else
3328  {
3329  jam();
3330  subPtr.p->m_trigger_state = Subscription::T_ERROR;
3331  drop_triggers(signal, subPtr);
3332  }
3333 }
3334 
/**
 * Handler for a CreateTrigImplRef (the line naming this definition is
 * absent from this listing).
 *
 * Stores the reported error code on the subscription and counts down
 * m_outstanding_trigger.  When the last reply has arrived the
 * subscription becomes T_ERROR and drop_triggers() is called.
 */
3335 void
3337 {
3338  jamEntry();
3339 
3340  CreateTrigImplRef * const ref = (CreateTrigImplRef*)signal->getDataPtr();
3341  const Uint32 triggerId = ref->triggerId;
      // Slot index as packed into the trigger id by create_triggers().
3342  Uint32 type = (triggerId >> 16) & 0x3;
3343  Uint32 tableId = ref->tableId;
3344 
3345  TablePtr tabPtr;
3346  SubscriptionPtr subPtr;
3347  c_subscriptions.getPtr(subPtr, ref->senderData);
3348  c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
3349 
3350  ndbrequire(tabPtr.p->m_tableId == tableId);
3351  ndbrequire(subPtr.p->m_trigger_state == Subscription::T_CREATING);
3352 
3353  ndbrequire(type < 3);
      // The slot of a refused trigger stays ILLEGAL_TRIGGER_ID.
3354  ndbrequire(subPtr.p->m_triggers[type] == ILLEGAL_TRIGGER_ID);
3355 
3356  subPtr.p->m_errorCode = ref->errorCode;
3357 
3358  ndbrequire(subPtr.p->m_outstanding_trigger);
3359  subPtr.p->m_outstanding_trigger--;
3360 
3361  if (subPtr.p->m_outstanding_trigger)
3362  {
3363  jam();
      // More replies are still outstanding.
3367  return;
3368  }
3369 
3370  subPtr.p->m_trigger_state = Subscription::T_ERROR;
3371  drop_triggers(signal, subPtr);
3372 }
3373 
3374 bool
3375 Suma::check_sub_start(Uint32 subscriberRef)
3376 {
3377  Uint32 nodeId = refToNode(subscriberRef);
3378  bool startme = c_startup.m_restart_server_node_id;
3379  bool handover = c_startup.m_wait_handover;
3380  bool connected =
3381  c_failedApiNodes.get(nodeId) == false &&
3382  c_connected_nodes.get(nodeId);
3383 
3384  return (startme || handover || connected);
3385 }
3386 
/**
 * Answer every queued start request of a subscription (the signature
 * line and the first line of the declaration of `list` are absent from
 * this listing).
 *
 * For each request whose subscriber passes check_sub_start(), a
 * SUB_START_CONF is sent, the start event is reported through
 * send_sub_start_stop_event(), and the subscriber is added to `list` and
 * to the per-node subscriber bookkeeping.  Requests whose subscriber
 * fails the check are answered with SUB_START_REF(NodeDied).  Each
 * request record is released after it has been answered.  Finally
 * check_release_subscription() is called.
 */
3387 void
3389 {
3390  const Uint64 gci = get_current_gci(signal);
3391  {
3393  subPtr.p->m_subscribers);
3394  LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_start_req);
3395 
3396  Ptr<Subscriber> ptr;
3397  Ptr<SubOpRecord> subOpPtr;
3398  for (subOpList.first(subOpPtr); !subOpPtr.isNull(); )
3399  {
3400  jam();
3401 
3402  Uint32 senderRef = subOpPtr.p->m_senderRef;
3403  Uint32 senderData = subOpPtr.p->m_senderData;
      // m_subscriberRef holds the index of the subscriber record.
3404  c_subscriberPool.getPtr(ptr, subOpPtr.p->m_subscriberRef);
3405 
3406  if (check_sub_start(ptr.p->m_senderRef))
3407  {
3408  SubStartConf* conf = (SubStartConf*)signal->getDataPtrSend();
3409  conf->senderRef = reference();
3410  conf->senderData = senderData;
3411  conf->subscriptionId = subPtr.p->m_subscriptionId;
3412  conf->subscriptionKey = subPtr.p->m_subscriptionKey;
      // Upper 32 bits of the 64-bit value from get_current_gci().
3413  conf->firstGCI = Uint32(gci >> 32);
3414  conf->part = SubscriptionData::TableData;
3415  conf->bucketCount = c_no_of_buckets;
3416  conf->nodegroup = c_nodeGroup;
3417  sendSignal(senderRef, GSN_SUB_START_CONF, signal,
3418  SubStartConf::SignalLength, JBB);
3419 
3424  bool report = subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE;
3425  send_sub_start_stop_event(signal, ptr,NdbDictionary::Event::_TE_ACTIVE,
3426  report, list);
3427 
3428  list.add(ptr);
3429  c_subscriber_nodes.set(refToNode(ptr.p->m_senderRef));
3430  c_subscriber_per_node[refToNode(ptr.p->m_senderRef)]++;
3431  }
3432  else
3433  {
3434  jam();
3435 
3436  sendSubStartRef(signal,
3437  senderRef, senderData, SubStartRef::NodeDied);
3438 
3440  }
3441 
      // Advance before releasing the request that was just answered.
3442  Ptr<SubOpRecord> tmp = subOpPtr;
3443  subOpList.next(subOpPtr);
3444  subOpList.release(tmp);
3445  }
3446  }
3447 
3448  check_release_subscription(signal, subPtr);
3449 }
3450 
/**
 * Answer every queued start request of a subscription with
 * SUB_START_REF carrying errCode, releasing each request record after it
 * has been answered.  (The first line of the declaration whose
 * continuation appears below, and one line near the end of the loop, are
 * absent from this listing.)
 *
 * @param signal   signal object used to build and send the replies
 * @param subPtr   subscription whose m_start_req queue is drained
 * @param errCode  error code placed in every reply
 */
3451 void
3452 Suma::report_sub_start_ref(Signal* signal,
3453  Ptr<Subscription> subPtr,
3454  Uint32 errCode)
3455 {
3457  subPtr.p->m_subscribers);
3458  LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_start_req);
3459 
3460  Ptr<Subscriber> ptr;
3461  Ptr<SubOpRecord> subOpPtr;
3462  for (subOpList.first(subOpPtr); !subOpPtr.isNull(); )
3463  {
3464  jam();
3465 
3466  Uint32 senderRef = subOpPtr.p->m_senderRef;
3467  Uint32 senderData = subOpPtr.p->m_senderData;
3468  c_subscriberPool.getPtr(ptr, subOpPtr.p->m_subscriberRef);
3469 
3470  SubStartRef* ref = (SubStartRef*)signal->getDataPtrSend();
3471  ref->senderRef = reference();
3472  ref->senderData = senderData;
3473  ref->errorCode = errCode;
3474 
      // A REF is sent with the REF's own length, as sendSubStartRef()
      // does (this used to pass SubStartConf::SignalLength).
3475  sendSignal(senderRef, GSN_SUB_START_REF, signal,
3476  SubStartRef::SignalLength, JBB);
3477 
3478 
      // Advance before releasing the request that was just answered.
3479  Ptr<SubOpRecord> tmp = subOpPtr;
3480  subOpList.next(subOpPtr);
3481  subOpList.release(tmp);
3483  }
3484 }
3485 
3486 void
3487 Suma::drop_triggers(Signal* signal, SubscriptionPtr subPtr)
3488 {
3489  jam();
3490 
3491  subPtr.p->m_outstanding_trigger = 0;
3492 
3493  Ptr<Table> tabPtr;
3494  c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
3495  if (tabPtr.p->m_state == Table::DROPPED)
3496  {
3497  jam();
3498  subPtr.p->m_triggers[0] = ILLEGAL_TRIGGER_ID;
3499  subPtr.p->m_triggers[1] = ILLEGAL_TRIGGER_ID;
3500  subPtr.p->m_triggers[2] = ILLEGAL_TRIGGER_ID;
3501  }
3502  else
3503  {
3504  for(Uint32 j = 0; j<3; j++)
3505  {
3506  jam();
3507  Uint32 triggerId = subPtr.p->m_triggers[j];
3508  if (triggerId != ILLEGAL_TRIGGER_ID)
3509  {
3510  subPtr.p->m_outstanding_trigger++;
3511 
3512  DropTrigImplReq * const req =
3513  (DropTrigImplReq*)signal->getDataPtrSend();
3514  req->senderRef = SUMA_REF; // Sending to myself
3515  req->senderData = subPtr.i;
3516  req->requestType = 0;
3517 
3518  // TUP needs some triggerInfo to find right list
3519  Uint32 ti = 0;
3520  TriggerInfo::setTriggerType(ti, TriggerType::SUBSCRIPTION_BEFORE);
3521  TriggerInfo::setTriggerActionTime(ti, TriggerActionTime::TA_DETACHED);
3522  TriggerInfo::setTriggerEvent(ti, (TriggerEvent::Value)j);
3523  TriggerInfo::setMonitorReplicas(ti, true);
3524  //TriggerInfo::setMonitorAllAttributes(ti, j ==TriggerEvent::TE_DELETE);
3525  TriggerInfo::setMonitorAllAttributes(ti, true);
3526  TriggerInfo::setReportAllMonitoredAttributes(ti,
3527  subPtr.p->m_options & Subscription::REPORT_ALL);
3528  req->triggerInfo = ti;
3529 
3530  req->tableId = subPtr.p->m_tableId;
3531  req->tableVersion = 0; // not used
3532  req->indexId = RNIL;
3533  req->indexVersion = 0;
3534  req->triggerId = triggerId;
3535  req->receiverRef = SUMA_REF;
3536 
3537  c_outstanding_drop_trig_req++;
3538  sendSignal(DBTUP_REF, GSN_DROP_TRIG_IMPL_REQ,
3539  signal, DropTrigImplReq::SignalLength, JBB);
3540  }
3541  }
3542  }
3543 
3544  if (subPtr.p->m_outstanding_trigger == 0)
3545  {
3546  jam();
3547  drop_triggers_complete(signal, subPtr);
3548  }
3549 }
3550 
/**
 * Handler for a DropTrigImplRef (the line naming this definition is
 * absent from this listing).
 *
 * The trigger slot selected by bits 16-17 of the trigger id is cleared
 * and both outstanding-request counters are decremented, exactly as for
 * a successful drop; the REF's error information is not examined.  When
 * the last reply has arrived drop_triggers_complete() is called.
 */
3551 void
3553 {
3554  jamEntry();
3555  DropTrigImplRef * const ref = (DropTrigImplRef*)signal->getDataPtr();
3556  Ptr<Table> tabPtr;
3557  Ptr<Subscription> subPtr;
3558  const Uint32 triggerId = ref->triggerId;
3559  const Uint32 type = (triggerId >> 16) & 0x3;
3560 
3561  c_subscriptionPool.getPtr(subPtr, ref->senderData);
3562  c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
3563  ndbrequire(tabPtr.p->m_tableId == ref->tableId);
3564 
3565  ndbrequire(type < 3);
3566  ndbrequire(subPtr.p->m_triggers[type] != ILLEGAL_TRIGGER_ID);
3567  subPtr.p->m_triggers[type] = ILLEGAL_TRIGGER_ID;
3568 
3569  ndbrequire(subPtr.p->m_outstanding_trigger);
3570  subPtr.p->m_outstanding_trigger--;
3571 
3572  ndbrequire(c_outstanding_drop_trig_req);
3573  c_outstanding_drop_trig_req--;
3574 
3575  if (subPtr.p->m_outstanding_trigger)
3576  {
3577  jam();
      // More replies are still outstanding.
3581  return;
3582  }
3583 
3584  drop_triggers_complete(signal, subPtr);
3585 }
3586 
// Handler for a DropTrigImplConf signal.
// NOTE(review): the line holding this function's name and parameter list is
// missing from this listing -- presumably
// Suma::execDROP_TRIG_IMPL_CONF(Signal* signal); confirm against the original.
//
// Frees the subscription's trigger slot named by the signal, decrements the
// per-subscription and block-wide counts of outstanding drops, and calls
// drop_triggers_complete() once the subscription has no drop left in flight.
3587 void
3589 {
3590  jamEntry();
3591 
3592  DropTrigImplConf * const conf = (DropTrigImplConf*)signal->getDataPtr();
3593 
3594  Ptr<Table> tabPtr;
3595  Ptr<Subscription> subPtr;
3596  const Uint32 triggerId = conf->triggerId;
// Bits 16-17 of the trigger id select the slot in m_triggers.
3597  const Uint32 type = (triggerId >> 16) & 0x3;
3598 
// senderData is used as the subscription record index; the table is reached
// through the subscription and cross-checked against the signal's table id.
3599  c_subscriptionPool.getPtr(subPtr, conf->senderData);
3600  c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
3601  ndbrequire(tabPtr.p->m_tableId == conf->tableId);
3602 
// The slot must currently hold a trigger id; mark it as unused.
3603  ndbrequire(type < 3);
3604  ndbrequire(subPtr.p->m_triggers[type] != ILLEGAL_TRIGGER_ID);
3605  subPtr.p->m_triggers[type] = ILLEGAL_TRIGGER_ID;
3606 
// One drop fewer pending for this subscription ...
3607  ndbrequire(subPtr.p->m_outstanding_trigger);
3608  subPtr.p->m_outstanding_trigger--;
3609 
// ... and one fewer for the block as a whole.
3610  ndbrequire(c_outstanding_drop_trig_req);
3611  c_outstanding_drop_trig_req--;
3612 
// More replies are still expected for this subscription: wait for them.
3613  if (subPtr.p->m_outstanding_trigger)
3614  {
3615  jam();
3619  return;
3620  }
3621 
3622  drop_triggers_complete(signal, subPtr);
3623 }
3624 
// Continuation run when a subscription has no trigger drop outstanding any
// more; acts on the subscription's m_trigger_state.
// NOTE(review): the line holding this function's name and parameter list is
// missing from this listing -- presumably
// Suma::drop_triggers_complete(Signal* signal, Ptr<Subscription> subPtr),
// matching the drop_triggers_complete(signal, subPtr) calls in this file;
// confirm against the original.
3625 void
3627 {
3628  switch(subPtr.p->m_trigger_state){
// A drop is only expected to complete from T_DROPPING or T_ERROR; any other
// state is treated as a fatal inconsistency.
3629  case Subscription::T_UNDEFINED:
3630  case Subscription::T_CREATING:
3631  case Subscription::T_DEFINED:
3632  jam();
3633  ndbrequire(false);
3634  break;
3635  case Subscription::T_DROPPING:
3636  jam();
// Back to "no triggers". If start requests are queued on the subscription,
// create the triggers again right away and skip the release check below.
3639  subPtr.p->m_trigger_state = Subscription::T_UNDEFINED;
3640  if (!subPtr.p->m_start_req.isEmpty())
3641  {
3642  jam();
3643  create_triggers(signal, subPtr);
3644  return;
3645  }
3646  break;
3647  case Subscription::T_ERROR:
3648  jam();
// Reset the error state first, then report the saved error code through
// report_sub_start_ref().
3649  Uint32 err = subPtr.p->m_errorCode;
3650  subPtr.p->m_trigger_state = Subscription::T_UNDEFINED;
3651  subPtr.p->m_errorCode = 0;
3652  report_sub_start_ref(signal, subPtr, err);
3653  break;
3654  }
3655 
3656  check_release_subscription(signal, subPtr);
3657 }
3658 
3659 /**********************************************************
3660  * Suma participant interface
3661  *
3662  * Stopping and removing of subscriber
3663  *
3664  */
3665 
// Handler for SUB_STOP_REQ (name taken from the DBUG_ENTER string below).
// NOTE(review): the line holding the function name, parameter list and the
// opening brace is missing from this listing; confirm against the original.
//
// Validates the request, queues a SubOpRecord on the subscription's stop
// queue (m_stop_req) and, if that queue was empty, starts processing with a
// CONTINUEB(SUB_STOP_REQ) signal to this block. Every rejection is answered
// with SUB_STOP_REF through sendSubStopRef().
// NOTE(review): DBUG_ENTER is used but all exits are plain returns; confirm
// whether a DBUG return macro is required on these paths.
3666 void
3668  jamEntry();
3669  ndbassert(signal->getNoOfSections() == 0);
3670  DBUG_ENTER("Suma::execSUB_STOP_REQ");
3671 
3672  CRASH_INSERTION(13019);
3673 
// Copy everything needed out of the request before the signal buffer is
// reused for a reply.
3674  SubStopReq * const req = (SubStopReq*)signal->getDataPtr();
3675  Uint32 senderRef = req->senderRef;
3676  Uint32 senderData = req->senderData;
3677  Uint32 subscriberRef = req->subscriberRef;
3678  Uint32 subscriberData = req->subscriberData;
3679  SubscriptionPtr subPtr;
// Lookup key for c_subscriptions: subscription id + subscription key.
3680  Subscription key;
3681  key.m_subscriptionId = req->subscriptionId;
3682  key.m_subscriptionKey = req->subscriptionKey;
3683  bool abortStart = (req->requestInfo & SubStopReq::RI_ABORT_START);
3684 
// m_restart_server_node_id still RNIL: refuse with NotStarted.
3685  if (c_startup.m_restart_server_node_id == RNIL)
3686  {
3687  jam();
3688 
3692  sendSubStopRef(signal,
3693  senderRef, senderData, SubStopRef::NotStarted);
3694  return;
3695  }
3696 
3697  bool found = c_subscriptions.find(subPtr, key);
3698  if (!found)
3699  {
3700  jam();
3701  sendSubStopRef(signal,
3702  senderRef, senderData, SubStopRef::NoSuchSubscription);
3703  return;
3704  }
3705 
// Only a DEFINED subscription can be stopped. No break follows the UNDEFINED
// case; presumably ndbrequire(false) does not return -- confirm.
3706  switch(subPtr.p->m_state){
3707  case Subscription::UNDEFINED:
3708  jam();
3709  ndbrequire(false);
3710  case Subscription::DEFINING:
3711  jam();
3712  sendSubStopRef(signal,
3713  senderRef, senderData, SubStopRef::Defining);
3714  return;
3715  case Subscription::DEFINED:
3716  jam();
3717  break;
3718  }
3719 
// Remember whether the stop queue was empty BEFORE seizing the new record:
// only then is processing started here (see the CONTINUEB at the end).
3720  Ptr<SubOpRecord> subOpPtr;
3721  LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_stop_req);
3722  bool empty = list.isEmpty();
3723  if (list.seize(subOpPtr) == false)
3724  {
3725  jam();
3726  sendSubStopRef(signal,
3727  senderRef, senderData, SubStopRef::OutOfSubOpRecords);
3728  return;
3729  }
3730 
// Record whether this is an "abort start" or a regular stop.
3731  if (abortStart)
3732  {
3733  jam();
3734  subOpPtr.p->m_opType = SubOpRecord::R_SUB_ABORT_START_REQ;
3735  }
3736  else
3737  {
3738  jam();
3739  subOpPtr.p->m_opType = SubOpRecord::R_SUB_STOP_REQ;
3740  }
// Save requester and subscriber identity in the queued record.
3741  subOpPtr.p->m_subPtrI = subPtr.i;
3742  subOpPtr.p->m_senderRef = senderRef;
3743  subOpPtr.p->m_senderData = senderData;
3744  subOpPtr.p->m_subscriberRef = subscriberRef;
3745  subOpPtr.p->m_subscriberData = subscriberData;
3746 
3747 
// Queue was empty: kick off processing. theData[1] = record index,
// theData[2] = RNIL, i.e. start the subscriber scan from the beginning.
3748  if (empty)
3749  {
3750  jam();
3751  signal->theData[0] = SumaContinueB::SUB_STOP_REQ;
3752  signal->theData[1] = subOpPtr.i;
3753  signal->theData[2] = RNIL;
3754  sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
3755  }
3756 }
3757 
// Processes one queued stop request; the inputs match what the
// CONTINUEB(SUB_STOP_REQ) senders in this file put into the signal:
//   theData[1] = index of the SubOpRecord being processed
//   theData[2] = RNIL to start at the first subscriber, otherwise the index
//                of the subscriber to resume the search from
// NOTE(review): the line holding this function's name and parameter list is
// missing from this listing -- presumably Suma::sub_stop_req(Signal* signal);
// confirm against the original. Some lines of the body are absent from the
// listing as well.
//
// Searches the subscription's subscriber list for the subscriber identified
// by (m_subscriberRef, m_subscriberData), looking at no more than 32 entries
// per invocation and re-sending CONTINUEB to itself to continue.
3758 void
3760 {
3761  jam();
3762 
3763  Ptr<SubOpRecord> subOpPtr;
3764  c_subOpPool.getPtr(subOpPtr, signal->theData[1]);
3765 
3766  Ptr<Subscription> subPtr;
3767  c_subscriptionPool.getPtr(subPtr, subOpPtr.p->m_subPtrI);
3768 
3769  Ptr<Subscriber> ptr;
3770  {
3771  LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
3772  if (signal->theData[2] == RNIL)
3773  {
3774  jam();
3775  list.first(ptr);
3776  }
3777  else
3778  {
3779  jam();
3780  list.getPtr(ptr, signal->theData[2]);
3781  }
3782 
// Bounded scan: at most 32 subscribers are examined in this invocation.
3783  for (Uint32 i = 0; i<32 && !ptr.isNull(); i++, list.next(ptr))
3784  {
3785  if (ptr.p->m_senderRef == subOpPtr.p->m_subscriberRef &&
3786  ptr.p->m_senderData == subOpPtr.p->m_subscriberData)
3787  {
3788  jam();
3789  goto found;
3790  }
3791  }
3792  }
3793 
// End of list reached without a match: reply NoSuchSubscriber and take the
// request off the stop queue.
3794  if (ptr.isNull())
3795  {
3796  jam();
3797  sendSubStopRef(signal,
3798  subOpPtr.p->m_senderRef,
3799  subOpPtr.p->m_senderData,
3800  SubStopRef::NoSuchSubscriber);
3801  check_remove_queue(signal, subPtr, subOpPtr, true, true);
3802  return;
3803  }
3804 
// Scan budget used up: continue from the current subscriber in a new signal.
3805  signal->theData[0] = SumaContinueB::SUB_STOP_REQ;
3806  signal->theData[1] = subOpPtr.i;
3807  signal->theData[2] = ptr.i;
3808  sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
3809  return;
3810 
3811 found:
3812  {
// The subscriber is unlinked from the list BEFORE the report call, so the
// list handed to report_sub_stop_conf() no longer contains it.
3813  LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
3814  list.remove(ptr);
3818  bool report = subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE;
3819  report_sub_stop_conf(signal, subOpPtr, ptr, report, list);
3821  }
// Request handled: dequeue it (it is the head) and let the subscription be
// checked for release.
3822  check_remove_queue(signal, subPtr, subOpPtr, true, true);
3823  check_release_subscription(signal, subPtr);
3824 }
3825 
3826 void
3827 Suma::check_remove_queue(Signal* signal,
3828  Ptr<Subscription> subPtr,
3829  Ptr<SubOpRecord> subOpPtr,
3830  bool ishead,
3831  bool dorelease)
3832 {
3833  LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_stop_req);
3834 
3835  {
3836  Ptr<SubOpRecord> tmp;
3837  list.first(tmp);
3838  if (ishead)
3839  {
3840  jam();
3841  ndbrequire(tmp.i == subOpPtr.i);
3842  }
3843  else
3844  {
3845  jam();
3846  ishead = (tmp.i == subOpPtr.i);
3847  }
3848  }
3849 
3850  if (dorelease)
3851  {
3852  jam();
3853  list.release(subOpPtr);
3854  }
3855  else
3856  {
3857  jam();
3858  list.remove(subOpPtr);
3859  }
3860 
3861  if (ishead)
3862  {
3863  jam();
3864  if (list.first(subOpPtr) == false)
3865  {
3866  jam();
3867  c_restart.m_waiting_on_self = 1;
3868  return;
3869  }
3870  // Fall through
3871  }
3872  else
3873  {
3874  jam();
3875  return;
3876  }
3877 
3878  switch(subOpPtr.p->m_opType){
3879  case SubOpRecord::R_SUB_ABORT_START_REQ:
3880  case SubOpRecord::R_SUB_STOP_REQ:
3881  jam();
3882  signal->theData[0] = SumaContinueB::SUB_STOP_REQ;
3883  signal->theData[1] = subOpPtr.i;
3884  signal->theData[2] = RNIL;
3885  sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
3886  return;
3887  case SubOpRecord::R_API_FAIL_REQ:
3888  jam();
3889  signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIPTION;
3890  signal->theData[1] = subOpPtr.i;
3891  signal->theData[2] = RNIL;
3892  sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
3893  return;
3894  case SubOpRecord::R_START_ME_REQ:
3895  jam();
3896  sendSubCreateReq(signal, subPtr);
3897  return;
3898  }
3899 }
3900 
// Completes a stop request for one subscriber: optionally notifies
// subscribers via send_sub_start_stop_event(), answers the requester with
// SUB_STOP_CONF, and updates the per-node subscriber bookkeeping.
//
// signal   - signal object reused for everything sent from here
// subOpPtr - the queued request (supplies requester ref/data and op type)
// ptr      - the subscriber being stopped
// report   - passed on to send_sub_start_stop_event()
// NOTE(review): the line declaring the last parameter (used below as `list`)
// is missing from this listing; confirm its type against the original.
3901 void
3902 Suma::report_sub_stop_conf(Signal* signal,
3903  Ptr<SubOpRecord> subOpPtr,
3904  Ptr<Subscriber> ptr,
3905  bool report,
3907 {
3908  jam();
3909  CRASH_INSERTION(13020);
3910 
3911  Uint32 senderRef = subOpPtr.p->m_senderRef;
3912  Uint32 senderData = subOpPtr.p->m_senderData;
3913  bool abortStart = subOpPtr.p->m_opType == SubOpRecord::R_SUB_ABORT_START_REQ;
3914 
3915  // let subscriber know that subscriber is stopped (skipped for abort-start)
3916  if (!abortStart)
3917  {
3918  jam();
3919  send_sub_start_stop_event(signal, ptr, NdbDictionary::Event::_TE_STOP,
3920  report, list);
3921  }
3922 
// Confirm to the requester; the confirmation carries m_max_seen_gci split
// into its high and low 32-bit halves.
3923  SubStopConf * const conf = (SubStopConf*)signal->getDataPtrSend();
3924  const Uint64 gci = m_max_seen_gci;
3925  conf->senderRef= reference();
3926  conf->senderData= senderData;
3927  conf->gci_hi= Uint32(gci>>32);
3928  conf->gci_lo= Uint32(gci);
3929  sendSignal(senderRef, GSN_SUB_STOP_CONF, signal,
3930  SubStopConf::SignalLength, JBB);
3931 
// One subscriber fewer on that node; when the node's count drops to zero
// the node is removed from c_subscriber_nodes.
3932  Uint32 nodeId = refToNode(ptr.p->m_senderRef);
3933  if (c_subscriber_per_node[nodeId])
3934  {
3935  c_subscriber_per_node[nodeId]--;
3936  if (c_subscriber_per_node[nodeId] == 0)
3937  {
3938  jam();
3939  c_subscriber_nodes.clear(nodeId);
3940  }
3941  }
3942 }
3943 
3944 void
3945 Suma::sendSubStopRef(Signal* signal,
3946  Uint32 retref,
3947  Uint32 data,
3948  Uint32 errCode)
3949 {
3950  jam();
3951  SubStopRef * ref = (SubStopRef *)signal->getDataPtrSend();
3952  ref->senderRef = reference();
3953  ref->errorCode = errCode;
3954  ref->senderData = data;
3955  sendSignal(retref, GSN_SUB_STOP_REF, signal, SubStopRef::SignalLength, JBB);
3956 }
3957 
// Sends start/stop notifications as SUB_TABLE_DATA signals.
//
// 1. `event` (_TE_STOP or _TE_ACTIVE; anything else is fatal) is sent to the
//    subscriber `ptr` itself.
// 2. If `report` is true, every subscriber in `list` is additionally sent
//    the companion event (_TE_UNSUBSCRIBE for _TE_STOP, _TE_SUBSCRIBE for
//    _TE_ACTIVE) carrying ptr's node id; for _TE_SUBSCRIBE, `ptr` is also
//    sent one such event per list entry, carrying that entry's node id.
//
// NOTE(review): the line declaring the last parameter (used below as `list`)
// is missing from this listing; confirm its type against the original.
3958 // report new started subscriber to all other subscribers
3959 void
3960 Suma::send_sub_start_stop_event(Signal *signal,
3961  Ptr<Subscriber> ptr,
3962  NdbDictionary::Event::_TableEvent event,
3963  bool report,
3965 {
3966  const Uint64 gci = get_current_gci(signal);
3967  SubTableData * data = (SubTableData*)signal->getDataPtrSend();
3968  Uint32 nodeId = refToNode(ptr.p->m_senderRef);
3969 
// Pick the companion event that the other subscribers get.
3970  NdbDictionary::Event::_TableEvent other;
3971  if (event == NdbDictionary::Event::_TE_STOP)
3972  {
3973  other = NdbDictionary::Event::_TE_UNSUBSCRIBE;
3974  }
3975  else if (event == NdbDictionary::Event::_TE_ACTIVE)
3976  {
3977  other = NdbDictionary::Event::_TE_SUBSCRIBE;
3978  }
3979  else
3980  {
3981  jamLine(event);
3982  ndbrequire(false);
3983  }
3984 
// Step 1: the event for the subscriber itself.
3985  data->gci_hi = Uint32(gci >> 32);
3986  data->gci_lo = Uint32(gci);
3987  data->tableId = 0;
3988  data->requestInfo = 0;
3989  SubTableData::setOperation(data->requestInfo, event);
3990  SubTableData::setNdbdNodeId(data->requestInfo, getOwnNodeId());
3991  SubTableData::setReqNodeId(data->requestInfo, nodeId);
3992  data->changeMask = 0;
3993  data->totalLen = 0;
3994  data->senderData = ptr.p->m_senderData;
3995  sendSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
3996  SubTableData::SignalLength, JBB);
3997 
3998  if (report == false)
3999  {
4000  return;
4001  }
4002 
// Step 2: the signal buffer is reused; only requestInfo and senderData are
// rewritten, the remaining fields keep the values set above.
4003  data->requestInfo = 0;
4004  SubTableData::setOperation(data->requestInfo, other);
4005  SubTableData::setNdbdNodeId(data->requestInfo, getOwnNodeId());
4006 
4007  Ptr<Subscriber> tmp;
4008  for(list.first(tmp); !tmp.isNull(); list.next(tmp))
4009  {
4010  jam();
// The request node id is set on every iteration because the _TE_SUBSCRIBE
// branch below overwrites it with the list entry's node id.
4011  SubTableData::setReqNodeId(data->requestInfo, nodeId);
4012  data->senderData = tmp.p->m_senderData;
4013  sendSignal(tmp.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
4014  SubTableData::SignalLength, JBB);
4015 
4016  ndbassert(tmp.i != ptr.i); // ptr should *NOT* be in list now
4017  if (other != NdbDictionary::Event::_TE_UNSUBSCRIBE)
4018  {
4019  jam();
// Tell `ptr` about this list entry as well.
4020  SubTableData::setReqNodeId(data->requestInfo,
4021  refToNode(tmp.p->m_senderRef));
4022 
4023  data->senderData = ptr.p->m_senderData;
4024  sendSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
4025  SubTableData::SignalLength, JBB);
4026  }
4027  }
4028 }
4029 
// Resets `mask` and then sets bits 0 .. m_noOfAttributes-1 in it.
// NOTE(review): the line holding the function name and the first parameter
// (used below as `mask`) is missing from this listing -- presumably
// Suma::Table::createAttributeMask; confirm against the original. The `suma`
// parameter is not used in the visible body.
4030 void
4032  Suma &suma)
4033 {
4034  mask.clear();
4035  for(Uint32 i = 0; i<m_noOfAttributes; i++)
4036  mask.set(i);
4037 }
4038 
4039 void Suma::suma_ndbrequire(bool v) { ndbrequire(v); }
4040 
4041 
4042 /**********************************************************
4043  * Scan data interface
4044  *
4045  * Assumption: one execTRANSID_AI contains all attr info
4046  *
4047  */
4048 
// Sizes (in 32-bit words) of the static row buffers declared below.
// The bodies are parenthesised so the macros keep their value when used
// inside a larger expression (e.g. `2 * SUMA_BUF_SZ`).
#define SUMA_BUF_SZ1 (MAX_KEY_SIZE_IN_WORDS + MAX_TUPLE_SIZE_IN_WORDS)
#define SUMA_BUF_SZ (MAX_ATTRIBUTES_IN_TABLE + SUMA_BUF_SZ1)
4051 
// File-local staging buffers shared by the scan and trigger handlers below.
// f_*: buffer filled by execTRANSID_AI (scan rows) and by the trigger
//      handlers (key words followed by after-value words).
// b_*: buffer filled by the trigger handlers with before-value words.
// *_bufferLock holds the id of the operation/trigger currently using the
// buffer, 0 when it is free; *_trigBufferSize is the number of words stored.
4052 static Uint32 f_bufferLock = 0;
4053 static Uint32 f_buffer[SUMA_BUF_SZ];
4054 static Uint32 f_trigBufferSize = 0;
4055 static Uint32 b_bufferLock = 0;
4056 static Uint32 b_buffer[SUMA_BUF_SZ];
4057 static Uint32 b_trigBufferSize = 0;
4058 
4059 void
4060 Suma::execTRANSID_AI(Signal* signal)
4061 {
4062  jamEntry();
4063  DBUG_ENTER("Suma::execTRANSID_AI");
4064 
4065  CRASH_INSERTION(13015);
4066  TransIdAI * const data = (TransIdAI*)signal->getDataPtr();
4067  const Uint32 opPtrI = data->connectPtr;
4068  Uint32 length = signal->length() - 3;
4069 
4070  if(f_bufferLock == 0){
4071  f_bufferLock = opPtrI;
4072  } else {
4073  ndbrequire(f_bufferLock == opPtrI);
4074  }
4075 
4076  if (signal->getNoOfSections())
4077  {
4078  SectionHandle handle(this, signal);
4079  SegmentedSectionPtr dataPtr;
4080  handle.getSection(dataPtr, 0);
4081  length = dataPtr.sz;
4082  copy(data->attrData, dataPtr);
4083  releaseSections(handle);
4084  }
4085 
4086  Ptr<SyncRecord> syncPtr;
4087  c_syncPool.getPtr(syncPtr, (opPtrI >> 16));
4088 
4089  Uint32 sum = 0;
4090  Uint32 * dst = f_buffer + MAX_ATTRIBUTES_IN_TABLE;
4091  Uint32 * headers = f_buffer;
4092  const Uint32 * src = &data->attrData[0];
4093  const Uint32 * const end = &src[length];
4094 
4095  const Uint32 attribs = syncPtr.p->m_currentNoOfAttributes;
4096  for(Uint32 i = 0; i<attribs; i++){
4097  Uint32 tmp = * src++;
4098  * headers++ = tmp;
4099  Uint32 len = AttributeHeader::getDataSize(tmp);
4100 
4101  memcpy(dst, src, 4 * len);
4102  dst += len;
4103  src += len;
4104  sum += len;
4105  }
4106  f_trigBufferSize = sum;
4107 
4108  ndbrequire(src == end);
4109 
4110  if ((syncPtr.p->m_requestInfo & SubSyncReq::LM_Exclusive) == 0)
4111  {
4112  sendScanSubTableData(signal, syncPtr, 0);
4113  }
4114 
4115  DBUG_VOID_RETURN;
4116 }
4117 
4118 void
4119 Suma::execKEYINFO20(Signal* signal)
4120 {
4121  jamEntry();
4122  KeyInfo20* data = (KeyInfo20*)signal->getDataPtr();
4123 
4124  const Uint32 opPtrI = data->clientOpPtr;
4125  const Uint32 takeOver = data->scanInfo_Node;
4126 
4127  ndbrequire(f_bufferLock == opPtrI);
4128 
4129  Ptr<SyncRecord> syncPtr;
4130  c_syncPool.getPtr(syncPtr, (opPtrI >> 16));
4131  sendScanSubTableData(signal, syncPtr, takeOver);
4132 }
4133 
// Sends the scanned row currently held in f_buffer to the requester of the
// sync as a SUB_TABLE_DATA signal with operation _TE_SCAN, then frees the
// buffer lock.
//
// signal   - signal object reused for the outgoing signal
// syncPtr  - sync record supplying attribute count, table id and the
//            requester's reference/data
// takeOver - copied into SubTableData::takeOver
4134 void
4135 Suma::sendScanSubTableData(Signal* signal,
4136  Ptr<SyncRecord> syncPtr, Uint32 takeOver)
4137 {
4138  const Uint32 attribs = syncPtr.p->m_currentNoOfAttributes;
4139  const Uint32 sum = f_trigBufferSize;
4140 
// Section 0: attribute headers (start of f_buffer).
// Section 1: attribute data (from offset MAX_ATTRIBUTES_IN_TABLE).
4144  LinearSectionPtr ptr[3];
4145  ptr[0].p = f_buffer;
4146  ptr[0].sz = attribs;
4147 
4148  ptr[1].p = f_buffer + MAX_ATTRIBUTES_IN_TABLE;
4149  ptr[1].sz = sum;
4150 
// subPtr is looked up but not used further in the visible code.
4151  SubscriptionPtr subPtr;
4152  c_subscriptions.getPtr(subPtr, syncPtr.p->m_subscriptionPtrI);
4153 
4154 
4158  SubTableData * sdata = (SubTableData*)signal->getDataPtrSend();
4159  Uint32 ref = syncPtr.p->m_senderRef;
4160  sdata->tableId = syncPtr.p->m_tableId;
4161  sdata->senderData = syncPtr.p->m_senderData;
4162  sdata->requestInfo = 0;
4163  SubTableData::setOperation(sdata->requestInfo,
4164  NdbDictionary::Event::_TE_SCAN); // Scan
4165  sdata->gci_hi = 0; // Undefined
4166  sdata->gci_lo = 0;
4167  sdata->takeOver = takeOver;
// With PRINT_ONLY set the row is only logged, not sent.
4168 #if PRINT_ONLY
4169  ndbout_c("GSN_SUB_TABLE_DATA (scan) #attr: %d len: %d", attribs, sum);
4170 #else
4171  sendSignal(ref,
4172  GSN_SUB_TABLE_DATA,
4173  signal,
4174  SubTableData::SignalLength, JBB,
4175  ptr, 2);
4176 #endif
4177 
// Buffer is free for the next row.
4181  f_bufferLock = 0;
4182 }
4183 
4184 /**********************************************************
4185  *
4186  * Trigger data interface
4187  *
4188  */
4189 
// Handler for TRIG_ATTRINFO (name taken from the DBUG_ENTER string below):
// appends one chunk of trigger attribute data to the static buffers.
// NOTE(review): the line holding this function's name and parameter list is
// missing from this listing; confirm against the original.
//
// BEFORE_VALUES chunks are appended to b_buffer, every other chunk to
// f_buffer. Within this function the locks are only claimed by a
// non-BEFORE_VALUES chunk arriving while f_bufferLock is 0, so a
// BEFORE_VALUES chunk is accepted only when b_bufferLock already equals its
// trigger id.
4190 void
4192 {
4193  jamEntry();
4194  DBUG_ENTER("Suma::execTRIG_ATTRINFO");
4195 
4196  CRASH_INSERTION(13016);
4197  TrigAttrInfo* const trg = (TrigAttrInfo*)signal->getDataPtr();
4198  const Uint32 trigId = trg->getTriggerId();
4199 
// Number of payload words after the fixed part of the signal.
4200  const Uint32 dataLen = signal->length() - TrigAttrInfo::StaticLength;
4201 
4202  if(trg->getAttrInfoType() == TrigAttrInfo::BEFORE_VALUES){
4203  jam();
4204 
4205  ndbrequire(b_bufferLock == trigId);
4206 
4207  memcpy(b_buffer + b_trigBufferSize, trg->getData(), 4 * dataLen);
4208  b_trigBufferSize += dataLen;
4209 
4210  // printf("before values %u %u %u\n",trigId, dataLen, b_trigBufferSize);
4211  } else {
4212  jam();
4213 
// First chunk for this trigger: claim both buffers and reset their sizes.
// Otherwise the chunk must belong to the trigger holding the lock.
4214  if(f_bufferLock == 0){
4215  f_bufferLock = trigId;
4216  f_trigBufferSize = 0;
4217  b_bufferLock = trigId;
4218  b_trigBufferSize = 0;
4219  } else {
4220  ndbrequire(f_bufferLock == trigId);
4221  }
4222 
4223  memcpy(f_buffer + f_trigBufferSize, trg->getData(), 4 * dataLen);
4224  f_trigBufferSize += dataLen;
4225  }
4226 
4227 
4228  DBUG_VOID_RETURN;
4229 }
4230 
4231 #ifdef NODEFAIL_DEBUG2
4232 static int theCounts[64] = {0};
4233 #endif
4234 
4235 Uint32
4236 Suma::get_responsible_node(Uint32 bucket) const
4237 {
4238  // id will contain id to responsible suma or
4239  // RNIL if we don't have nodegroup info yet
4240 
4241  jam();
4242  Uint32 node;
4243  const Bucket* ptr= c_buckets + bucket;
4244  for(Uint32 i = 0; i<MAX_REPLICAS; i++)
4245  {
4246  node= ptr->m_nodes[i];
4247  if(c_alive_nodes.get(node))
4248  {
4249 #ifdef NODEFAIL_DEBUG2
4250  theCounts[node]++;
4251  ndbout_c("Suma:responsible n=%u, D=%u, id = %u, count=%u",
4252  n,D, id, theCounts[node]);
4253 #endif
4254  return node;
4255  }
4256  }
4257 
4258  return 0;
4259 }
4260 
4261 Uint32
4262 Suma::get_responsible_node(Uint32 bucket, const NdbNodeBitmask& mask) const
4263 {
4264  jam();
4265  Uint32 node;
4266  const Bucket* ptr= c_buckets + bucket;
4267  for(Uint32 i = 0; i<MAX_REPLICAS; i++)
4268  {
4269  node= ptr->m_nodes[i];
4270  if(mask.get(node))
4271  {
4272  return node;
4273  }
4274  }
4275 
4276  return 0;
4277 }
4278 
4279 bool
4280 Suma::check_switchover(Uint32 bucket, Uint64 gci)
4281 {
4282  const Uint32 send_mask =
4283  Bucket::BUCKET_STARTING |
4284  Bucket::BUCKET_TAKEOVER |
4285  Bucket::BUCKET_SHUTDOWN_TO;
4286 
4287  bool send = c_buckets[bucket].m_state & send_mask;
4288  ndbassert(m_switchover_buckets.get(bucket));
4289  if(unlikely(gci > c_buckets[bucket].m_switchover_gci))
4290  {
4291  return send;
4292  }
4293  return !send;
4294 }
4295 
4296 static
4297 Uint32
4298 reformat(Signal* signal, LinearSectionPtr ptr[3],
4299  Uint32 * src_1, Uint32 sz_1,
4300  Uint32 * src_2, Uint32 sz_2)
4301 {
4302  Uint32 noOfAttrs = 0, dataLen = 0;
4303  Uint32 * headers = signal->theData + 25;
4304  Uint32 * dst = signal->theData + 25 + MAX_ATTRIBUTES_IN_TABLE;
4305 
4306  ptr[0].p = headers;
4307  ptr[1].p = dst;
4308 
4309  while(sz_1 > 0){
4310  Uint32 tmp = * src_1 ++;
4311  * headers ++ = tmp;
4312  Uint32 len = AttributeHeader::getDataSize(tmp);
4313  memcpy(dst, src_1, 4 * len);
4314  dst += len;
4315  src_1 += len;
4316 
4317  noOfAttrs++;
4318  dataLen += len;
4319  sz_1 -= (1 + len);
4320  }
4321  assert(sz_1 == 0);
4322 
4323  ptr[0].sz = noOfAttrs;
4324  ptr[1].sz = dataLen;
4325 
4326  ptr[2].p = src_2;
4327  ptr[2].sz = sz_2;
4328 
4329  return sz_2 > 0 ? 3 : 2;
4330 }
4331 
// Handler for a signal that references a page of packed trigger messages:
//   theData[0] = page id, theData[1] = total number of words on the page.
// NOTE(review): the line holding this function's name and parameter list is
// missing from this listing -- presumably
// Suma::execFIRE_TRIG_ORD_L(Signal* signal); confirm against the original.
//
// Every message on the page is unpacked into the signal data area and the
// static f_/b_ buffers and then processed through execFIRE_TRIG_ORD().
// The page is released at the end.
4336 void
4338 {
4339  jamEntry();
4340 
4341  ndbassert(signal->getNoOfSections() == 0);
4342  Uint32 pageId = signal->theData[0];
4343  Uint32 len = signal->theData[1];
4344 
// No page and no data: handled through out_of_buffer().
4345  if (pageId == RNIL && len == 0)
4346  {
4347  jam();
4351  out_of_buffer(signal);
4352  return;
4353  }
4354 
4355  Uint32 * ptr = reinterpret_cast<Uint32*>(c_page_pool.getPtr(pageId));
4356  while (len)
4357  {
// Message layout: 5 header words (total message length, signal length and
// the three section lengths), then the signal words, then the sections.
4358  Uint32 * save = ptr;
4359  Uint32 msglen = * ptr++;
4360  Uint32 siglen = * ptr++;
4361  Uint32 sec0len = * ptr++;
4362  Uint32 sec1len = * ptr++;
4363  Uint32 sec2len = * ptr++;
4364 
4368  Uint32 trigId = ((FireTrigOrd*)ptr)->getTriggerId();
4369  memcpy(signal->theData, ptr, 4 * siglen); // signal
4370  ptr += siglen;
// Section 0 and section 2 end up back to back in f_buffer, section 1 goes
// to b_buffer.
4371  memcpy(f_buffer, ptr, 4*sec0len);
4372  ptr += sec0len;
4373  memcpy(b_buffer, ptr, 4*sec1len);
4374  ptr += sec1len;
4375  memcpy(f_buffer + sec0len, ptr, 4*sec2len);
4376  ptr += sec2len;
4377 
// Set sizes and locks so that execFIRE_TRIG_ORD() finds the buffers
// already filled for this trigger.
4378  f_trigBufferSize = sec0len + sec2len;
4379  b_trigBufferSize = sec1len;
4380  f_bufferLock = trigId;
4381  b_bufferLock = trigId;
4382 
4383  execFIRE_TRIG_ORD(signal);
4384 
// Consistency checks: the words consumed must match the message length and
// the message must fit in what is left of the page data.
4385  ndbrequire(ptr == save + msglen);
4386  ndbrequire(len >= msglen);
4387  len -= msglen;
4388  }
4389 
4390  m_ctx.m_mm.release_page(RT_DBTUP_PAGE, pageId);
4391 }
4392 
// Handler for FIRE_TRIG_ORD (name taken from the DBUG_ENTER string below):
// a subscription trigger has fired for a row change.
// NOTE(review): the line holding this function's name and parameter list is
// missing from this listing; confirm against the original.
//
// The row data is expected in the static f_buffer (keys followed by after
// values) and b_buffer (before values): either already filled by earlier
// handlers, or copied here from the signal's sections. Depending on the
// state of the row's bucket the change is then either sent to every
// subscriber of the subscription as SUB_TABLE_DATA, or written to memory
// obtained from get_buffer_ptr().
4393 void
4395 {
4396  jamEntry();
4397  DBUG_ENTER("Suma::execFIRE_TRIG_ORD");
4398 
4399  CRASH_INSERTION(13016);
// Everything needed from the incoming signal is copied out first, because
// the signal data area is overwritten further down.
4400  FireTrigOrd* const trg = (FireTrigOrd*)signal->getDataPtr();
4401  const Uint32 trigId = trg->getTriggerId();
4402  const Uint32 hashValue = trg->getHashValue();
4403  const Uint32 gci_hi = trg->getGCI();
4404  const Uint32 gci_lo = trg->m_gci_lo;
4405  const Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
4406  const Uint32 event = trg->getTriggerEvent();
4407  const Uint32 any_value = trg->getAnyValue();
4408  const Uint32 transId1 = trg->m_transId1;
4409  const Uint32 transId2 = trg->m_transId2;
4410 
// The low 16 bits of the trigger id index the subscription pool.
4411  Ptr<Subscription> subPtr;
4412  c_subscriptionPool.getPtr(subPtr, trigId & 0xFFFF);
4413 
4414  ndbassert(gci > m_last_complete_gci);
4415 
// Sectioned signal: the row data travels with the signal, so both buffers
// must be free and are filled right here.
4416  if (signal->getNoOfSections())
4417  {
4418  jam();
4419  ndbassert(isNdbMtLqh());
4420  SectionHandle handle(this, signal);
4421 
4422  ndbrequire(b_bufferLock == 0);
4423  ndbrequire(f_bufferLock == 0);
4424  f_bufferLock = trigId;
4425  b_bufferLock = trigId;
4426 
4427  SegmentedSectionPtr ptr;
4428  handle.getSection(ptr, 0); // Keys
4429  Uint32 sz = ptr.sz;
4430  copy(f_buffer, ptr);
4431 
4432  handle.getSection(ptr, 2); // After values
4433  copy(f_buffer + sz, ptr);
4434  f_trigBufferSize = sz + ptr.sz;
4435 
4436  handle.getSection(ptr, 1); // Before values
4437  copy(b_buffer, ptr);
4438  b_trigBufferSize = ptr.sz;
4439  releaseSections(handle);
4440  }
4441 
// The buffers must belong to this trigger; the locks are dropped here while
// the buffer contents are still used below.
4442  jam();
4443  ndbrequire(f_bufferLock == trigId);
4447  f_bufferLock = 0;
4448  b_bufferLock = 0;
4449 
4450  Uint32 tableId = subPtr.p->m_tableId;
4451  Uint32 schemaVersion =
4452  c_tablePool.getPtr(subPtr.p->m_table_ptrI)->m_schemaVersion;
4453 
// The bucket is derived from the row's hash value.
4454  Uint32 bucket= hashValue % c_no_of_buckets;
4455  m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);
// Send now if the bucket is active here, or if it is switching over and
// check_switchover() returns true for this gci.
4456  if(m_active_buckets.get(bucket) ||
4457  (m_switchover_buckets.get(bucket) && (check_switchover(bucket, gci))))
4458  {
4459  m_max_sent_gci = (gci > m_max_sent_gci ? gci : m_max_sent_gci);
4460  Uint32 sz = trg->getNoOfPrimaryKeyWords()+trg->getNoOfAfterValueWords();
4461  ndbrequire(sz == f_trigBufferSize);
4462 
// reformat() splits f_buffer into header and data sections inside the signal
// data area and adds b_buffer as a third section when it is not empty.
4463  LinearSectionPtr ptr[3];
4464  const Uint32 nptr= reformat(signal, ptr,
4465  f_buffer, f_trigBufferSize,
4466  b_buffer, b_trigBufferSize);
4467  Uint32 ptrLen= 0;
4468  for(Uint32 i =0; i < nptr; i++)
4469  ptrLen+= ptr[i].sz;
4473  SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg;
4474  data->gci_hi = gci_hi;
4475  data->gci_lo = gci_lo;
4476  data->tableId = tableId;
4477  data->requestInfo = 0;
4478  SubTableData::setOperation(data->requestInfo, event);
4479  data->flags = 0;
4480  data->anyValue = any_value;
4481  data->totalLen = ptrLen;
4482  data->transId1 = transId1;
4483  data->transId2 = transId2;
4484 
// Same signal to every subscriber; only senderData differs per subscriber.
4485  {
4486  LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
4487  SubscriberPtr subbPtr;
4488  for(list.first(subbPtr); !subbPtr.isNull(); list.next(subbPtr))
4489  {
4490  data->senderData = subbPtr.p->m_senderData;
4491  sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
4492  SubTableData::SignalLengthWithTransId, JBB, ptr, nptr);
4493  }
4494  }
4495  }
4496  else
4497  {
// Not sending: store the change as a record of 6 header words (subscription
// index, schema version, event and f_buffer size, any-value, transaction
// id), followed by the f_buffer words and then the b_buffer words.
// If get_buffer_ptr() returns no memory, nothing is stored here.
4498  const uint buffer_header_sz = 6;
4499  Uint32* dst;
4500  Uint32 sz = f_trigBufferSize + b_trigBufferSize + buffer_header_sz;
4501  if((dst = get_buffer_ptr(signal, bucket, gci, sz)))
4502  {
4503  * dst++ = subPtr.i;
4504  * dst++ = schemaVersion;
4505  * dst++ = (event << 16) | f_trigBufferSize;
4506  * dst++ = any_value;
4507  * dst++ = transId1;
4508  * dst++ = transId2;
4509  memcpy(dst, f_buffer, f_trigBufferSize << 2);
4510  dst += f_trigBufferSize;
4511  memcpy(dst, b_buffer, b_trigBufferSize << 2);
4512  }
4513  }
4514 
4515  DBUG_VOID_RETURN;
4516 }
4517 
// Looks at the oldest entry of c_gcp_list and, when the list has grown to
// c_maxBufferedEpochs entries or more (or error insert 13037 is active),
// reports every subscriber node still recorded on that entry as
// disconnected and sends an API_FAILREQ for it to QMGR.
4518 void
4519 Suma::checkMaxBufferedEpochs(Signal *signal)
4520 {
4521  /*
4522  * Check if any subscribers are exceeding the MaxBufferedEpochs
4523  */
4524  Ptr<Gcp_record> gcp;
4525  jamEntry();
4526  if (c_gcp_list.isEmpty())
4527  {
4528  jam();
4529  return;
4530  }
4531  c_gcp_list.first(gcp);
// Error insert 13037 forces the "exceeded" path once, whatever the list
// length is.
4532  if (ERROR_INSERTED(13037))
4533  {
4534  jam();
4535  CLEAR_ERROR_INSERT_VALUE;
4536  ndbout_c("Simulating exceeding the MaxBufferedEpochs %u(%llu,%llu,%llu)",
4537  c_maxBufferedEpochs, m_max_seen_gci,
4538  m_last_complete_gci, gcp.p->m_gci);
4539  }
4540  else if (c_gcp_list.count() < c_maxBufferedEpochs)
4541  {
4542  return;
4543  }
// subs is a local copy; clearing bits in it below does not modify
// gcp.p->m_subscribers.
4544  NodeBitmask subs = gcp.p->m_subscribers;
4545  jam();
4546  // Disconnect lagging subscribers waiting for oldest epoch
4547  ndbout_c("Found lagging epoch %llu", gcp.p->m_gci);
4548  for(Uint32 nodeId = 0; nodeId < MAX_NODES; nodeId++)
4549  {
4550  if (subs.get(nodeId))
4551  {
4552  jam();
4553  subs.clear(nodeId);
4554  // Disconnecting node
// Event report: node id, the lagging gci (low word, then high word), the
// current list length and the configured limit.
// NOTE(review): the signal is sent with length 8 although only theData[0..6]
// are assigned here -- confirm what the receiver expects in word 7.
4555  signal->theData[0] = NDB_LE_SubscriptionStatus;
4556  signal->theData[1] = 1; // DISCONNECTED;
4557  signal->theData[2] = nodeId;
4558  signal->theData[3] = (Uint32) gcp.p->m_gci;
4559  signal->theData[4] = (Uint32) (gcp.p->m_gci >> 32);
4560  signal->theData[5] = (Uint32) c_gcp_list.count();
4561  signal->theData[6] = c_maxBufferedEpochs;
4562  sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 8, JBB);
4563 
// Send API_FAILREQ for the node to QMGR.
4567  signal->theData[0] = nodeId;
4568  sendSignal(QMGR_REF, GSN_API_FAILREQ, signal, 1, JBA);
4569  }
4570  }
4571 }
4572 
4573 void
4575 {
4576  jamEntry();
4577  ndbassert(signal->getNoOfSections() == 0);
4578 
4579  SubGcpCompleteRep * rep = (SubGcpCompleteRep*)signal->getDataPtrSend();
4580  Uint32 gci_hi = rep->gci_hi;
4581  Uint32 gci_lo = rep->gci_lo;
4582  Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
4583 
4584  if (isNdbMtLqh() && m_gcp_rep_cnt > 1)
4585  {
4586 
4587 #define SSPP 0
4588 
4589  if (SSPP)
4590  printf("execSUB_GCP_COMPLETE_REP(%u/%u)", gci_hi, gci_lo);
4591  jam();
4592  Uint32 min = m_min_gcp_rep_counter_index;
4593  Uint32 sz = NDB_ARRAY_SIZE(m_gcp_rep_counter);
4594  for (Uint32 i = min; i != m_max_gcp_rep_counter_index; i = (i + 1) % sz)
4595  {
4596  jam();
4597  if (m_gcp_rep_counter[i].m_gci == gci)
4598  {
4599  jam();
4600  m_gcp_rep_counter[i].m_cnt ++;
4601  if (m_gcp_rep_counter[i].m_cnt == m_gcp_rep_cnt)
4602  {
4603  jam();
4607  if (i != min)
4608  {
4609  jam();
4610  m_gcp_rep_counter[i] = m_gcp_rep_counter[min];
4611  }
4612  m_min_gcp_rep_counter_index = (min + 1) % sz;
4613  if (SSPP)
4614  ndbout_c(" found - complete after: (min: %u max: %u)",
4615  m_min_gcp_rep_counter_index,
4616  m_max_gcp_rep_counter_index);
4617  goto found;
4618  }
4619  else
4620  {
4621  jam();
4622  if (SSPP)
4623  ndbout_c(" found - wait unchanged: (min: %u max: %u)",
4624  m_min_gcp_rep_counter_index,
4625  m_max_gcp_rep_counter_index);
4626  return; // Wait for more...
4627  }
4628  }
4629  }
4633  Uint32 next = (m_max_gcp_rep_counter_index + 1) % sz;
4634  ndbrequire(next != min); // ring buffer full
4635  m_gcp_rep_counter[m_max_gcp_rep_counter_index].m_gci = gci;
4636  m_gcp_rep_counter[m_max_gcp_rep_counter_index].m_cnt = 1;
4637  m_max_gcp_rep_counter_index = next;
4638  if (SSPP)
4639  ndbout_c(" new - after: (min: %u max: %u)",
4640  m_min_gcp_rep_counter_index,
4641  m_max_gcp_rep_counter_index);
4642  return;
4643  }
4644 found:
4645  bool drop = false;
4646  Uint32 flags = (m_missing_data)
4647  ? rep->flags | SubGcpCompleteRep::MISSING_DATA
4648  : rep->flags;
4649 
4650  if (ERROR_INSERTED(13036))
4651  {
4652  jam();
4653  CLEAR_ERROR_INSERT_VALUE;
4654  ndbout_c("Simulating out of event buffer at node failure");
4655  flags |= SubGcpCompleteRep::MISSING_DATA;
4656  }
4657 
4658 #ifdef VM_TRACE
4659  if (m_gcp_monitor == 0)
4660  {
4661  }
4662  else if (gci_hi == Uint32(m_gcp_monitor >> 32))
4663  {
4664  ndbrequire(gci_lo == Uint32(m_gcp_monitor) + 1);
4665  }
4666  else
4667  {
4668  ndbrequire(gci_hi == Uint32(m_gcp_monitor >> 32) + 1);
4669  ndbrequire(gci_lo == 0);
4670  }
4671  m_gcp_monitor = gci;
4672 #endif
4673 
4674  m_last_complete_gci = gci;
4675  checkMaxBufferedEpochs(signal);
4676  m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);
4677 
4681  if(!m_switchover_buckets.isclear())
4682  {
4683  bool unlock = false;
4684  Uint32 i = m_switchover_buckets.find(0);
4685  for(; i != Bucket_mask::NotFound; i = m_switchover_buckets.find(i + 1))
4686  {
4687  if(gci > c_buckets[i].m_switchover_gci)
4688  {
4689  Uint32 state = c_buckets[i].m_state;
4690  m_switchover_buckets.clear(i);
4691  printf("%u/%u (%u/%u) switchover complete bucket %d state: %x",
4692  Uint32(gci >> 32),
4693  Uint32(gci),
4694  Uint32(c_buckets[i].m_switchover_gci >> 32),
4695  Uint32(c_buckets[i].m_switchover_gci),
4696  i, state);
4697 
4698  if(state & Bucket::BUCKET_STARTING)
4699  {
4703  jam();
4704  m_active_buckets.set(i);
4705  c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_STARTING;
4706  ndbout_c("starting");
4707  m_gcp_complete_rep_count++;
4708  unlock = true;
4709  }
4710  else if(state & Bucket::BUCKET_TAKEOVER)
4711  {
4715  jam();
4716  Bucket* bucket= c_buckets + i;
4717  Page_pos pos= bucket->m_buffer_head;
4718  ndbrequire(pos.m_max_gci < gci);
4719 
4720  Buffer_page* page= c_page_pool.getPtr(pos.m_page_id);
4721  ndbout_c("takeover %d", pos.m_page_id);
4722  page->m_max_gci_hi = (Uint32)(pos.m_max_gci >> 32);
4723  page->m_max_gci_lo = (Uint32)(pos.m_max_gci & 0xFFFFFFFF);
4724  ndbassert(pos.m_max_gci != 0);
4725  page->m_words_used = pos.m_page_pos;
4726  page->m_next_page = RNIL;
4727  memset(&bucket->m_buffer_head, 0, sizeof(bucket->m_buffer_head));
4728  bucket->m_buffer_head.m_page_id = RNIL;
4729  bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS + 1;
4730 
4731  m_active_buckets.set(i);
4732  m_gcp_complete_rep_count++;
4733  c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_TAKEOVER;
4734  }
4735  else if (state & Bucket::BUCKET_HANDOVER)
4736  {
4740  jam();
4741  c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_HANDOVER;
4742  m_gcp_complete_rep_count--;
4743  ndbout_c("handover");
4744  }
4745  else if (state & Bucket::BUCKET_CREATED_MASK)
4746  {
4747  jam();
4748  Uint32 cnt = state >> 8;
4749  Uint32 mask = Uint32(Bucket::BUCKET_CREATED_MASK) | (cnt << 8);
4750  c_buckets[i].m_state &= ~mask;
4751  flags |= SubGcpCompleteRep::ADD_CNT;
4752  flags |= (cnt << 16);
4753  ndbout_c("add %u %s", cnt,
4754  state & Bucket::BUCKET_CREATED_SELF ? "self" : "other");
4755  if (state & Bucket::BUCKET_CREATED_SELF &&
4756  get_responsible_node(i) == getOwnNodeId())
4757  {
4758  jam();
4759  m_active_buckets.set(i);
4760  m_gcp_complete_rep_count++;
4761  }
4762  }
4763  else if (state & Bucket::BUCKET_DROPPED_MASK)
4764  {
4765  jam();
4766  Uint32 cnt = state >> 8;
4767  Uint32 mask = Uint32(Bucket::BUCKET_DROPPED_MASK) | (cnt << 8);
4768  c_buckets[i].m_state &= ~mask;
4769  flags |= SubGcpCompleteRep::SUB_CNT;
4770  flags |= (cnt << 16);
4771  ndbout_c("sub %u %s", cnt,
4772  state & Bucket::BUCKET_DROPPED_SELF ? "self" : "other");
4773  if (state & Bucket::BUCKET_DROPPED_SELF)
4774  {
4775  m_active_buckets.clear(i);
4776  drop = true;
4777  }
4778  }
4779  else if (state & Bucket::BUCKET_SHUTDOWN)
4780  {
4781  jam();
4782  Uint32 nodeId = c_buckets[i].m_switchover_node;
4783  ndbrequire(nodeId == getOwnNodeId());
4784  m_active_buckets.clear(i);
4785  m_gcp_complete_rep_count--;
4786  ndbout_c("shutdown handover");
4787  c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_SHUTDOWN;
4788  }
4789  else if (state & Bucket::BUCKET_SHUTDOWN_TO)
4790  {
4791  jam();
4792  Uint32 nodeId = c_buckets[i].m_switchover_node;
4793  NdbNodeBitmask nodegroup = c_nodes_in_nodegroup_mask;
4794  nodegroup.clear(nodeId);
4795  ndbrequire(get_responsible_node(i) == nodeId &&
4796  get_responsible_node(i, nodegroup) == getOwnNodeId());
4797  m_active_buckets.set(i);
4798  m_gcp_complete_rep_count++;
4799  ndbout_c("shutdown takover");
4800  c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_SHUTDOWN_TO;
4801  }
4802  }
4803  }
4804 
4805  if (m_switchover_buckets.isclear())
4806  {
4807  jam();
4808  if(getNodeState().startLevel == NodeState::SL_STARTING &&
4809  c_startup.m_handover_nodes.isclear())
4810  {
4811  jam();
4812  sendSTTORRY(signal);
4813  }
4814  else if (getNodeState().startLevel >= NodeState::SL_STOPPING_1)
4815  {
4816  jam();
4817  ndbrequire(c_shutdown.m_wait_handover);
4818  StopMeConf * conf = CAST_PTR(StopMeConf, signal->getDataPtrSend());
4819  conf->senderData = c_shutdown.m_senderData;
4820  conf->senderRef = reference();
4821  sendSignal(c_shutdown.m_senderRef, GSN_STOP_ME_CONF, signal,
4822  StopMeConf::SignalLength, JBB);
4823  c_shutdown.m_wait_handover = false;
4824  infoEvent("Suma: handover complete");
4825  }
4826  }
4827 
4828  if (unlock)
4829  {
4830  jam();
4831  send_dict_unlock_ord(signal, DictLockReq::SumaHandOver);
4832  }
4833  }
4834 
4835  if(ERROR_INSERTED(13010))
4836  {
4837  CLEAR_ERROR_INSERT_VALUE;
4838  ndbout_c("Don't send GCP_COMPLETE_REP(%llu)", gci);
4839  return;
4840  }
4841 
4845  rep->gci_hi = gci_hi;
4846  rep->gci_lo = gci_lo;
4847  rep->flags = flags;
4848  rep->senderRef = reference();
4849  rep->gcp_complete_rep_count = m_gcp_complete_rep_count;
4850 
4851  if(m_gcp_complete_rep_count && !c_subscriber_nodes.isclear())
4852  {
4853  CRASH_INSERTION(13033);
4854 
4855  NodeReceiverGroup rg(API_CLUSTERMGR, c_subscriber_nodes);
4856  sendSignal(rg, GSN_SUB_GCP_COMPLETE_REP, signal,
4857  SubGcpCompleteRep::SignalLength, JBB);
4858 
4859  Ptr<Gcp_record> gcp;
4860  if(c_gcp_list.seize(gcp))
4861  {
4862  gcp.p->m_gci = gci;
4863  gcp.p->m_subscribers = c_subscriber_nodes;
4864  }
4865  else
4866  {
4867  char buf[100];
4868  c_subscriber_nodes.getText(buf);
4869  g_eventLogger->error("c_gcp_list.seize() failed: gci: %llu nodes: %s",
4870  gci, buf);
4871  }
4872  }
4873 
4877  bool subscribers = !c_subscriber_nodes.isclear();
4878  for(Uint32 i = 0; i<c_no_of_buckets; i++)
4879  {
4880  if(m_active_buckets.get(i))
4881  continue;
4882 
4883  if (subscribers || (c_buckets[i].m_state & Bucket::BUCKET_RESEND))
4884  {
4885  //Uint32* dst;
4886  get_buffer_ptr(signal, i, gci, 0);
4887  }
4888  }
4889 
4890  if(m_out_of_buffer_gci && gci > m_out_of_buffer_gci)
4891  {
4892  jam();
4893  infoEvent("Reenable event buffer");
4894  m_out_of_buffer_gci = 0;
4895  m_missing_data = false;
4896  }
4897 
4898  if (unlikely(drop))
4899  {
4900  jam();
4901  m_gcp_complete_rep_count = 0;
4902  c_nodeGroup = RNIL;
4903  c_nodes_in_nodegroup_mask.clear();
4904  fix_nodegroup();
4905  }
4906 }
4907 
4908 void
4909 Suma::execCREATE_TAB_CONF(Signal *signal)
4910 {
4911  jamEntry();
4912  DBUG_ENTER("Suma::execCREATE_TAB_CONF");
4913 
4914  DBUG_VOID_RETURN;
4915 }
4916 
4917 void
4919 {
4920  jamEntry();
4921  ndbassert(signal->getNoOfSections() == 0);
4922 
4923  DropTabConf * const conf = (DropTabConf*)signal->getDataPtr();
4924  Uint32 senderRef= conf->senderRef;
4925  Uint32 tableId= conf->tableId;
4926 
4927  TablePtr tabPtr;
4928  if (!c_tables.find(tabPtr, tableId))
4929  {
4930  jam();
4931  return;
4932  }
4933 
4934  DBUG_PRINT("info",("drop table id: %d[i=%u]", tableId, tabPtr.i));
4935  const Table::State old_state = tabPtr.p->m_state;
4936  tabPtr.p->m_state = Table::DROPPED;
4937  c_tables.remove(tabPtr);
4938 
4939  if (senderRef != 0)
4940  {
4941  jam();
4942 
4943  // dict coordinator sends info to API
4944 
4945  const Uint64 gci = get_current_gci(signal);
4946  SubTableData * data = (SubTableData*)signal->getDataPtrSend();
4947  data->gci_hi = Uint32(gci >> 32);
4948  data->gci_lo = Uint32(gci);
4949  data->tableId = tableId;
4950  data->requestInfo = 0;
4951  SubTableData::setOperation(data->requestInfo,
4952  NdbDictionary::Event::_TE_DROP);
4953  SubTableData::setReqNodeId(data->requestInfo, refToNode(senderRef));
4954 
4955  Ptr<Subscription> subPtr;
4956  LocalDLList<Subscription> subList(c_subscriptionPool,
4957  tabPtr.p->m_subscriptions);
4958 
4959  for (subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr))
4960  {
4961  jam();
4962  if(subPtr.p->m_subscriptionType != SubCreateReq::TableEvent)
4963  {
4964  jam();
4965  continue;
4966  //continue in for-loop if the table is not part of
4967  //the subscription. Otherwise, send data to subscriber.
4968  }
4969 
4970  if (subPtr.p->m_options & Subscription::NO_REPORT_DDL)
4971  {
4972  jam();
4973  continue;
4974  }
4975 
4976  Ptr<Subscriber> ptr;
4977  LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
4978  for(list.first(ptr); !ptr.isNull(); list.next(ptr))
4979  {
4980  jam();
4981  data->senderData= ptr.p->m_senderData;
4982  sendSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
4983  SubTableData::SignalLength, JBB);
4984  }
4985  }
4986  }
4987 
4988  if (old_state == Table::DEFINING)
4989  {
4990  jam();
4991  return;
4992  }
4993 
4994  if (tabPtr.p->m_subscriptions.isEmpty())
4995  {
4996  jam();
4997  tabPtr.p->release(* this);
4998  c_tablePool.release(tabPtr);
4999  return;
5000  }
5001  else
5002  {
5007  Ptr<Subscription> subPtr;
5008  {
5009  LocalDLList<Subscription> subList(c_subscriptionPool,
5010  tabPtr.p->m_subscriptions);
5011  subList.first(subPtr);
5012  }
5013  while (!subPtr.isNull())
5014  {
5015  Ptr<Subscription> tmp = subPtr;
5016  {
5017  LocalDLList<Subscription> subList(c_subscriptionPool,
5018  tabPtr.p->m_subscriptions);
5019  subList.next(subPtr);
5020  }
5021  check_release_subscription(signal, tmp);
5022  }
5023  }
5024 }
5025 
5030 void
5032 {
5033  jamEntry();
5034 
5035  AlterTabReq * const req = (AlterTabReq*)signal->getDataPtr();
5036  Uint32 senderRef= req->senderRef;
5037  Uint32 tableId= req->tableId;
5038  Uint32 changeMask= req->changeMask;
5039  TablePtr tabPtr;
5040 
5041  // Copy DICT_TAB_INFO to local linear buffer
5042  SectionHandle handle(this, signal);
5043  SegmentedSectionPtr tabInfoPtr;
5044  handle.getSection(tabInfoPtr, 0);
5045 
5046  if (!c_tables.find(tabPtr, tableId))
5047  {
5048  jam();
5049  releaseSections(handle);
5050  return;
5051  }
5052 
5053  if (senderRef == 0)
5054  {
5055  jam();
5056  releaseSections(handle);
5057  return;
5058  }
5059  // dict coordinator sends info to API
5060 
5061 #ifndef DBUG_OFF
5062  ndbout_c("DICT_TAB_INFO in SUMA, tabInfoPtr.sz = %d", tabInfoPtr.sz);
5063  SimplePropertiesSectionReader reader(handle.m_ptr[0],
5064  getSectionSegmentPool());
5065  reader.printAll(ndbout);
5066 #endif
5067  copy(b_dti_buf, tabInfoPtr);
5068  releaseSections(handle);
5069 
5070  LinearSectionPtr lptr[3];
5071  lptr[0].p = b_dti_buf;
5072  lptr[0].sz = tabInfoPtr.sz;
5073 
5074  const Uint64 gci = get_current_gci(signal);
5075  SubTableData * data = (SubTableData*)signal->getDataPtrSend();
5076  data->gci_hi = Uint32(gci >> 32);
5077  data->gci_lo = Uint32(gci);
5078  data->tableId = tableId;
5079  data->requestInfo = 0;
5080  SubTableData::setOperation(data->requestInfo,
5081  NdbDictionary::Event::_TE_ALTER);
5082  SubTableData::setReqNodeId(data->requestInfo, refToNode(senderRef));
5083  data->flags = 0;
5084  data->changeMask = changeMask;
5085  data->totalLen = tabInfoPtr.sz;
5086  Ptr<Subscription> subPtr;
5087  LocalDLList<Subscription> subList(c_subscriptionPool,
5088  tabPtr.p->m_subscriptions);
5089 
5090  for (subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr))
5091  {
5092  if(subPtr.p->m_subscriptionType != SubCreateReq::TableEvent)
5093  {
5094  jam();
5095  continue;
5096  //continue in for-loop if the table is not part of
5097  //the subscription. Otherwise, send data to subscriber.
5098  }
5099 
5100  if (subPtr.p->m_options & Subscription::NO_REPORT_DDL)
5101  {
5102  jam();
5103  continue;
5104  }
5105 
5106  Ptr<Subscriber> ptr;
5107  LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
5108  for(list.first(ptr); !ptr.isNull(); list.next(ptr))
5109  {
5110  jam();
5111  data->senderData= ptr.p->m_senderData;
5112  Callback c = { 0, 0 };
5113  sendFragmentedSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
5114  SubTableData::SignalLength, JBB, lptr, 1, c);
5115  }
5116  }
5117 }
5118 
5119 void
5120 Suma::execSUB_GCP_COMPLETE_ACK(Signal* signal)
5121 {
5122  jamEntry();
5123  ndbassert(signal->getNoOfSections() == 0);
5124 
5125  SubGcpCompleteAck * const ack = (SubGcpCompleteAck*)signal->getDataPtr();
5126  Uint32 gci_hi = ack->rep.gci_hi;
5127  Uint32 gci_lo = ack->rep.gci_lo;
5128  Uint32 senderRef = ack->rep.senderRef;
5129  if (unlikely(signal->getLength() < SubGcpCompleteAck::SignalLength))
5130  {
5131  jam();
5132  ndbassert(!ndb_check_micro_gcp(getNodeInfo(refToNode(senderRef)).m_version));
5133  gci_lo = 0;
5134  }
5135 
5136  Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
5137  m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);
5138 
5139  if (ERROR_INSERTED(13037))
5140  {
5141  jam();
5142  ndbout_c("Simulating exceeding the MaxBufferedEpochs, ignoring ack");
5143  return;
5144  }
5145 
5146  if (refToBlock(senderRef) == SUMA)
5147  {
5148  jam();
5149 
5150  // Ack from other SUMA
5151  Uint32 nodeId= refToNode(senderRef);
5152  for(Uint32 i = 0; i<c_no_of_buckets; i++)
5153  {
5154  if(m_active_buckets.get(i) ||
5155  (m_switchover_buckets.get(i) && (check_switchover(i, gci))) ||
5156  (!m_switchover_buckets.get(i) && get_responsible_node(i) == nodeId))
5157  {
5158  release_gci(signal, i, gci);
5159  }
5160  }
5161  return;
5162  }
5163 
5164  // Ack from User and not an ack from other SUMA, redistribute in nodegroup
5165 
5166  Uint32 nodeId = refToNode(senderRef);
5167  if (ERROR_INSERTED(13023))
5168  {
5169  ndbout_c("Throwing SUB_GCP_COMPLETE_ACK gci: %u/%u from %u",
5170  Uint32(gci>>32), Uint32(gci), nodeId);
5171  return;
5172  }
5173 
5174 
5175  jam();
5176  Ptr<Gcp_record> gcp;
5177  for(c_gcp_list.first(gcp); !gcp.isNull(); c_gcp_list.next(gcp))
5178  {
5179  if(gcp.p->m_gci == gci)
5180  {
5181  gcp.p->m_subscribers.clear(nodeId);
5182  gcp.p->m_subscribers.bitAND(c_subscriber_nodes);
5183  if(!gcp.p->m_subscribers.isclear())
5184  {
5185  jam();
5186  return;
5187  }
5188  break;
5189  }
5190  }
5191 
5192  if(gcp.isNull())
5193  {
5194  g_eventLogger->warning("ACK wo/ gcp record (gci: %u/%u) ref: %.8x from: %.8x",
5195  Uint32(gci >> 32), Uint32(gci),
5196  senderRef, signal->getSendersBlockRef());
5197  }
5198  else
5199  {
5200  c_gcp_list.release(gcp);
5201  }
5202 
5203  CRASH_INSERTION(13011);
5204  if(ERROR_INSERTED(13012))
5205  {
5206  CLEAR_ERROR_INSERT_VALUE;
5207  ndbout_c("Don't redistribute SUB_GCP_COMPLETE_ACK");
5208  return;
5209  }
5210 
5211  ack->rep.senderRef = reference();
5212  NodeReceiverGroup rg(SUMA, c_nodes_in_nodegroup_mask);
5213  sendSignal(rg, GSN_SUB_GCP_COMPLETE_ACK, signal,
5214  SubGcpCompleteAck::SignalLength, JBB);
5215 }
5216 
5217 /**************************************************************
5218  *
5219  * Removing subscription
5220  *
5221  */
5222 
5223 void
5225 {
5226  jamEntry();
5227  DBUG_ENTER("Suma::execSUB_REMOVE_REQ");
5228 
5229  CRASH_INSERTION(13021);
5230 
5231  const SubRemoveReq req = *(SubRemoveReq*)signal->getDataPtr();
5232  SubscriptionPtr subPtr;
5233  Subscription key;
5234  key.m_subscriptionId = req.subscriptionId;
5235  key.m_subscriptionKey = req.subscriptionKey;
5236 
5237  if (c_startup.m_restart_server_node_id == RNIL)
5238  {
5239  jam();
5240 
5244  sendSubRemoveRef(signal, req, SubRemoveRef::NotStarted);
5245  return;
5246  }
5247 
5248  bool found = c_subscriptions.find(subPtr, key);
5249 
5250  if(!found)
5251  {
5252  jam();
5253  sendSubRemoveRef(signal, req, SubRemoveRef::NoSuchSubscription);
5254  return;
5255  }
5256 
5257  switch(subPtr.p->m_state){
5258  case Subscription::UNDEFINED:
5259  jam();
5260  ndbrequire(false);
5261  case Subscription::DEFINING:
5262  jam();
5263  sendSubRemoveRef(signal, req, SubRemoveRef::Defining);
5264  return;
5265  case Subscription::DEFINED:
5266  if (subPtr.p->m_options & Subscription::MARKED_DROPPED)
5267  {
5271  jam();
5272  sendSubRemoveRef(signal, req, SubRemoveRef::AlreadyDropped);
5273  return;
5274  }
5275  break;
5276  }
5277 
5278  subPtr.p->m_options |= Subscription::MARKED_DROPPED;
5279  check_release_subscription(signal, subPtr);
5280 
5281  SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend();
5282  conf->senderRef = reference();
5283  conf->senderData = req.senderData;
5284  conf->subscriptionId = req.subscriptionId;
5285  conf->subscriptionKey = req.subscriptionKey;
5286 
5287  sendSignal(req.senderRef, GSN_SUB_REMOVE_CONF, signal,
5288  SubRemoveConf::SignalLength, JBB);
5289  return;
5290 }
5291 
5292 void
5294 {
5295  if (!subPtr.p->m_subscribers.isEmpty())
5296  {
5297  jam();
5298  return;
5299  }
5300 
5301  if (!subPtr.p->m_start_req.isEmpty())
5302  {
5303  jam();
5304  return;
5305  }
5306 
5307  if (!subPtr.p->m_stop_req.isEmpty())
5308  {
5309  jam();
5310  return;
5311  }
5312 
5313  switch(subPtr.p->m_trigger_state){
5314  case Subscription::T_UNDEFINED:
5315  jam();
5316  goto do_release;
5317  case Subscription::T_CREATING:
5318  jam();
5322  return;
5323  case Subscription::T_DEFINED:
5324  jam();
5325  subPtr.p->m_trigger_state = Subscription::T_DROPPING;
5326  drop_triggers(signal, subPtr);
5327  return;
5328  case Subscription::T_DROPPING:
5329  jam();
5333  return;
5334  case Subscription::T_ERROR:
5335  jam();
5339  return;
5340  }
5341  ndbrequire(false);
5342 
5343 do_release:
5344  TablePtr tabPtr;
5345  c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
5346 
5347  if (tabPtr.p->m_state == Table::DROPPED)
5348  {
5349  jam();
5350  subPtr.p->m_options |= Subscription::MARKED_DROPPED;
5351  }
5352 
5353  if ((subPtr.p->m_options & Subscription::MARKED_DROPPED) == 0)
5354  {
5355  jam();
5356  return;
5357  }
5358 
5359  {
5360  LocalDLList<Subscription> list(c_subscriptionPool,
5361  tabPtr.p->m_subscriptions);
5362  list.remove(subPtr);
5363  }
5364 
5365  if (tabPtr.p->m_subscriptions.isEmpty())
5366  {
5367  jam();
5368  switch(tabPtr.p->m_state){
5369  case Table::UNDEFINED:
5370  ndbrequire(false);
5371  case Table::DEFINING:
5372  break;
5373  case Table::DEFINED:
5374  jam();
5375  c_tables.remove(tabPtr);
5376  // Fall through
5377  case Table::DROPPED:
5378  jam();
5379  tabPtr.p->release(* this);
5380  c_tablePool.release(tabPtr);
5381  };
5382  }
5383 
5384  c_subscriptions.release(subPtr);
5385 }
5386 
5387 void
5388 Suma::sendSubRemoveRef(Signal* signal,
5389  const SubRemoveReq& req,
5390  Uint32 errCode)
5391 {
5392  jam();
5393  DBUG_ENTER("Suma::sendSubRemoveRef");
5394  SubRemoveRef * ref = (SubRemoveRef *)signal->getDataPtrSend();
5395  ref->senderRef = reference();
5396  ref->senderData = req.senderData;
5397  ref->subscriptionId = req.subscriptionId;
5398  ref->subscriptionKey = req.subscriptionKey;
5399  ref->errorCode = errCode;
5400  sendSignal(signal->getSendersBlockRef(), GSN_SUB_REMOVE_REF,
5401  signal, SubRemoveRef::SignalLength, JBB);
5402  DBUG_VOID_RETURN;
5403 }
5404 
5405 void
5406 Suma::Table::release(Suma & suma){
5407  jamBlock(&suma);
5408 
5409  m_state = UNDEFINED;
5410 }
5411 
5412 void
5413 Suma::SyncRecord::release(){
5414  jam();
5415 
5416  LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, m_fragments);
5417  fragBuf.release();
5418 
5419  LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributeList);
5420  attrBuf.release();
5421 
5422  LocalDataBuffer<15> boundBuf(suma.c_dataBufferPool, m_boundInfo);
5423  boundBuf.release();
5424 }
5425 
5426 
5427 /**************************************************************
5428  *
5429  * Restarting remote node functions, master functionality
5430  * (slave does nothing special)
5431  * - triggered on INCL_NODEREQ calling startNode
5432  * - included node will issue START_ME when it's ready to start
5433  * the subscribers
5434  *
5435  */
5436 
5437 void
5439  jamEntry();
5440 
5441  Uint32 retref = signal->getSendersBlockRef();
5442  if (c_restart.m_ref)
5443  {
5444  jam();
5445  SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
5446  ref->errorCode = SumaStartMeRef::Busy;
5447  sendSignal(retref, GSN_SUMA_START_ME_REF, signal,
5448  SumaStartMeRef::SignalLength, JBB);
5449  return;
5450  }
5451 
5452  if (getNodeState().getStarted() == false)
5453  {
5454  jam();
5455  SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
5456  ref->errorCode = SumaStartMeRef::NotStarted;
5457  sendSignal(retref, GSN_SUMA_START_ME_REF, signal,
5458  SumaStartMeRef::SignalLength, JBB);
5459  return;
5460  }
5461 
5462  Ptr<SubOpRecord> subOpPtr;
5463  if (c_subOpPool.seize(subOpPtr) == false)
5464  {
5465  jam();
5466  SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
5467  ref->errorCode = SumaStartMeRef::Busy;
5468  sendSignal(retref, GSN_SUMA_START_ME_REF, signal,
5469  SumaStartMeRef::SignalLength, JBB);
5470  return;
5471  }
5472 
5473  subOpPtr.p->m_opType = SubOpRecord::R_START_ME_REQ;
5474 
5475  c_restart.m_abort = 0;
5476  c_restart.m_waiting_on_self = 0;
5477  c_restart.m_ref = retref;
5478  c_restart.m_max_seq = c_current_seq;
5479  c_restart.m_subOpPtrI = subOpPtr.i;
5480 
5482  if (c_subscriptions.first(it))
5483  {
5484  jam();
5485 
5491  c_current_seq++;
5492  }
5493 
5494  copySubscription(signal, it);
5495 }
5496 
5497 void
5499 {
5500  jam();
5501 
5502  Ptr<SubOpRecord> subOpPtr;
5503  c_subOpPool.getPtr(subOpPtr, c_restart.m_subOpPtrI);
5504 
5505  Ptr<Subscription> subPtr = it.curr;
5506  if (!subPtr.isNull())
5507  {
5508  jam();
5509  c_restart.m_subPtrI = subPtr.i;
5510  c_restart.m_bucket = it.bucket;
5511 
5512  LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_stop_req);
5513  bool empty = list.isEmpty();
5514  list.add(subOpPtr);
5515 
5516  if (!empty)
5517  {
5521  jam();
5522  c_restart.m_waiting_on_self = 1;
5523  return;
5524  }
5525 
5526  sendSubCreateReq(signal, subPtr);
5527  }
5528  else
5529  {
5530  jam();
5531  SumaStartMeConf* conf = (SumaStartMeConf*)signal->getDataPtrSend();
5532  conf->unused = 0;
5533  sendSignal(c_restart.m_ref, GSN_SUMA_START_ME_CONF, signal,
5534  SumaStartMeConf::SignalLength, JBB);
5535 
5536  c_subOpPool.release(subOpPtr);
5537  c_restart.m_ref = 0;
5538  return;
5539  }
5540 }
5541 
5542 void
5544 {
5545  jam();
5546 
5547  if (c_restart.m_abort)
5548  {
5549  jam();
5550  abort_start_me(signal, subPtr, true);
5551  return;
5552  }
5553 
5554  c_restart.m_waiting_on_self = 0;
5555  SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend();
5556  req->senderRef = reference();
5557  req->senderData = subPtr.i;
5558  req->subscriptionId = subPtr.p->m_subscriptionId;
5559  req->subscriptionKey = subPtr.p->m_subscriptionKey;
5560  req->subscriptionType = subPtr.p->m_subscriptionType;
5561  req->tableId = subPtr.p->m_tableId;
5562  req->schemaTransId = 0;
5563 
5564  if (subPtr.p->m_options & Subscription::REPORT_ALL)
5565  {
5566  req->subscriptionType |= SubCreateReq::ReportAll;
5567  }
5568 
5569  if (subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE)
5570  {
5571  req->subscriptionType |= SubCreateReq::ReportSubscribe;
5572  }
5573 
5574  if (subPtr.p->m_options & Subscription::NO_REPORT_DDL)
5575  {
5576  req->subscriptionType |= SubCreateReq::NoReportDDL;
5577  }
5578 
5579  if (subPtr.p->m_options & Subscription::MARKED_DROPPED)
5580  {
5581  req->subscriptionType |= SubCreateReq::NR_Sub_Dropped;
5582  ndbout_c("copying dropped sub: %u", subPtr.i);
5583  }
5584 
5585  Ptr<Table> tabPtr;
5586  c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
5587  if (tabPtr.p->m_state != Table::DROPPED)
5588  {
5589  jam();
5590  c_restart.m_waiting_on_self = 0;
5591  if (!ndbd_suma_dictlock_startme(getNodeInfo(refToNode(c_restart.m_ref)).m_version))
5592  {
5593  jam();
5602  }
5603 
5604  sendSignal(c_restart.m_ref, GSN_SUB_CREATE_REQ, signal,
5605  SubCreateReq::SignalLength, JBB);
5606  }
5607  else
5608  {
5609  jam();
5610  ndbout_c("not copying sub %u with dropped table: %u/%u",
5611  subPtr.i,
5612  tabPtr.p->m_tableId, tabPtr.i);
5613 
5614  c_restart.m_waiting_on_self = 1;
5615  SubCreateConf * conf = (SubCreateConf *)signal->getDataPtrSend();
5616  conf->senderRef = reference();
5617  conf->senderData = subPtr.i;
5618  sendSignal(reference(), GSN_SUB_CREATE_CONF, signal,
5619  SubCreateConf::SignalLength, JBB);
5620  }
5621 }
5622 
5623 void
5625 {
5626  jamEntry();
5627 
5628  SubCreateRef *const ref= (SubCreateRef *)signal->getDataPtr();
5629  Uint32 error= ref->errorCode;
5630 
5631  {
5632  SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
5633  ref->errorCode = error;
5634  sendSignal(c_restart.m_ref, GSN_SUMA_START_ME_REF, signal,
5635  SumaStartMeRef::SignalLength, JBB);
5636  }
5637 
5638  Ptr<Subscription> subPtr;
5639  c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
5640  abort_start_me(signal, subPtr, true);
5641 }
5642 
5643 void
5645 {
5646  jamEntry();
5647 
5651  Ptr<Subscription> subPtr;
5652  c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
5653 
5654  c_restart.m_waiting_on_self = 0;
5655 
5660  if (c_restart.m_abort)
5661  {
5662  jam();
5663  abort_start_me(signal, subPtr, true);
5664  return;
5665  }
5666 
5667  Ptr<Table> tabPtr;
5668  c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
5669 
5670  Ptr<Subscriber> ptr;
5671  if (tabPtr.p->m_state != Table::DROPPED)
5672  {
5673  jam();
5674  LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
5675  list.first(ptr);
5676  }
5677  else
5678  {
5679  jam();
5680  ptr.setNull();
5681  ndbout_c("not copying subscribers on sub: %u with dropped table %u/%u",
5682  subPtr.i, tabPtr.p->m_tableId, tabPtr.i);
5683  }
5684 
5685  copySubscriber(signal, subPtr, ptr);
5686 }
5687 
5688 void
5689 Suma::copySubscriber(Signal* signal,
5690  Ptr<Subscription> subPtr,
5691  Ptr<Subscriber> ptr)
5692 {
5693  if (!ptr.isNull())
5694  {
5695  jam();
5696 
5697  SubStartReq* req = (SubStartReq*)signal->getDataPtrSend();
5698  req->senderRef = reference();
5699  req->senderData = ptr.i;
5700  req->subscriptionId = subPtr.p->m_subscriptionId;
5701  req->subscriptionKey = subPtr.p->m_subscriptionKey;
5702  req->part = SubscriptionData::TableData;
5703  req->subscriberData = ptr.p->m_senderData;
5704  req->subscriberRef = ptr.p->m_senderRef;
5705 
5706  sendSignal(c_restart.m_ref, GSN_SUB_START_REQ,
5707  signal, SubStartReq::SignalLength, JBB);
5708  return;
5709  }
5710  else
5711  {
5712  // remove lock from this subscription
5713  Ptr<SubOpRecord> subOpPtr;
5714  c_subOpPool.getPtr(subOpPtr, c_restart.m_subOpPtrI);
5715  check_remove_queue(signal, subPtr, subOpPtr, true, false);
5716  check_release_subscription(signal, subPtr);
5717 
5719  it.curr = subPtr;
5720  it.bucket = c_restart.m_bucket;
5721  c_subscriptions.next(it);
5722  copySubscription(signal, it);
5723  }
5724 }
5725 
5726 void
5727 Suma::execSUB_START_CONF(Signal* signal)
5728 {
5729  jamEntry();
5730 
5731  SubStartConf * const conf = (SubStartConf*)signal->getDataPtr();
5732 
5733  Ptr<Subscription> subPtr;
5734  c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
5735 
5736  Ptr<Subscriber> ptr;
5737  c_subscriberPool.getPtr(ptr, conf->senderData);
5738 
5739  LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
5740  list.next(ptr);
5741  copySubscriber(signal, subPtr, ptr);
5742 }
5743 
5744 void
5745 Suma::execSUB_START_REF(Signal* signal)
5746 {
5747  jamEntry();
5748 
5749  SubStartRef * sig = (SubStartRef*)signal->getDataPtr();
5750  Uint32 errorCode = sig->errorCode;
5751 
5752  {
5753  SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
5754  ref->errorCode = errorCode;
5755  sendSignal(c_restart.m_ref, GSN_SUMA_START_ME_REF, signal,
5756  SumaStartMeRef::SignalLength, JBB);
5757  }
5758 
5759  Ptr<Subscription> subPtr;
5760  c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
5761 
5762  abort_start_me(signal, subPtr, true);
5763 }
5764 
5765 void
5766 Suma::abort_start_me(Signal* signal, Ptr<Subscription> subPtr,
5767  bool lockowner)
5768 {
5769  Ptr<SubOpRecord> subOpPtr;
5770  c_subOpPool.getPtr(subOpPtr, c_restart.m_subOpPtrI);
5771  check_remove_queue(signal, subPtr, subOpPtr, lockowner, true);
5772  check_release_subscription(signal, subPtr);
5773 
5774  c_restart.m_ref = 0;
5775 }
5776 
5777 void
5778 Suma::execSUMA_HANDOVER_REQ(Signal* signal)
5779 {
5780  jamEntry();
5781  DBUG_ENTER("Suma::execSUMA_HANDOVER_REQ");
5782  // Uint32 sumaRef = signal->getSendersBlockRef();
5783  const SumaHandoverReq * req = CAST_CONSTPTR(SumaHandoverReq,
5784  signal->getDataPtr());
5785 
5786  Uint32 gci = req->gci;
5787  Uint32 nodeId = req->nodeId;
5788  Uint32 new_gci = Uint32(m_last_complete_gci >> 32) + MAX_CONCURRENT_GCP + 1;
5789  Uint32 requestType = req->requestType;
5790  if (!ndbd_suma_stop_me(getNodeInfo(nodeId).m_version))
5791  {
5792  jam();
5793  requestType = SumaHandoverReq::RT_START_NODE;
5794  }
5795 
5796  Uint32 start_gci = (gci > new_gci ? gci : new_gci);
5797  // mark all active buckets really belonging to restarting SUMA
5798 
5799  Bucket_mask tmp;
5800  if (requestType == SumaHandoverReq::RT_START_NODE)
5801  {
5802  jam();
5803  c_alive_nodes.set(nodeId);
5804  if (DBG_3R)
5805  ndbout_c("%u c_alive_nodes.set(%u)", __LINE__, nodeId);
5806 
5807  for( Uint32 i = 0; i < c_no_of_buckets; i++)
5808  {
5809  if(get_responsible_node(i) == nodeId)
5810  {
5811  if (m_active_buckets.get(i))
5812  {
5813  // I'm running this bucket but it should really be the restarted node
5814  tmp.set(i);
5815  m_active_buckets.clear(i);
5816  m_switchover_buckets.set(i);
5817  c_buckets[i].m_switchover_gci = (Uint64(start_gci) << 32) - 1;
5818  c_buckets[i].m_state |= Bucket::BUCKET_HANDOVER;
5819  c_buckets[i].m_switchover_node = nodeId;
5820  ndbout_c("prepare to handover bucket: %d", i);
5821  }
5822  else if(m_switchover_buckets.get(i))
5823  {
5824  ndbout_c("dont handover bucket: %d %d", i, nodeId);
5825  }
5826  }
5827  }
5828  }
5829  else if (requestType == SumaHandoverReq::RT_STOP_NODE)
5830  {
5831  jam();
5832 
5833  for( Uint32 i = 0; i < c_no_of_buckets; i++)
5834  {
5835  NdbNodeBitmask nodegroup = c_nodes_in_nodegroup_mask;
5836  nodegroup.clear(nodeId);
5837  if(get_responsible_node(i) == nodeId &&
5838  get_responsible_node(i, nodegroup) == getOwnNodeId())
5839  {
5840  // I'm will be running this bucket when nodeId shutdown
5841  jam();
5842  tmp.set(i);
5843  m_switchover_buckets.set(i);
5844  c_buckets[i].m_switchover_gci = (Uint64(start_gci) << 32) - 1;
5845  c_buckets[i].m_state |= Bucket::BUCKET_SHUTDOWN_TO;
5846  c_buckets[i].m_switchover_node = nodeId;
5847  ndbout_c("prepare to takeover bucket: %d", i);
5848  }
5849  }
5850  }
5851  else
5852  {
5853  jam();
5854  goto ref;
5855  }
5856 
5857  {
5858  SumaHandoverConf *conf= CAST_PTR(SumaHandoverConf,signal->getDataPtrSend());
5859  tmp.copyto(BUCKET_MASK_SIZE, conf->theBucketMask);
5860  conf->gci = start_gci;
5861  conf->nodeId = getOwnNodeId();
5862  conf->requestType = requestType;
5863  sendSignal(calcSumaBlockRef(nodeId), GSN_SUMA_HANDOVER_CONF, signal,
5864  SumaHandoverConf::SignalLength, JBB);
5865  }
5866 
5867  DBUG_VOID_RETURN;
5868 
5869 ref:
5870  signal->theData[0] = 111;
5871  signal->theData[1] = getOwnNodeId();
5872  signal->theData[2] = nodeId;
5873  sendSignal(calcSumaBlockRef(nodeId), GSN_SUMA_HANDOVER_REF, signal, 3, JBB);
5874  DBUG_VOID_RETURN;
5875 }
5876 
5877 // only run on all but restarting suma
5878 void
5879 Suma::execSUMA_HANDOVER_REF(Signal* signal)
5880 {
5881  ndbrequire(false);
5882 }
5883 
5884 void
5885 Suma::execSUMA_HANDOVER_CONF(Signal* signal) {
5886  jamEntry();
5887  DBUG_ENTER("Suma::execSUMA_HANDOVER_CONF");
5888 
5889  const SumaHandoverConf * conf = CAST_CONSTPTR(SumaHandoverConf,
5890  signal->getDataPtr());
5891 
5892  CRASH_INSERTION(13043);
5893 
5894  Uint32 gci = conf->gci;
5895  Uint32 nodeId = conf->nodeId;
5896  Uint32 requestType = conf->requestType;
5897  Bucket_mask tmp;
5898  tmp.assign(BUCKET_MASK_SIZE, conf->theBucketMask);
5899 #ifdef HANDOVER_DEBUG
5900  ndbout_c("Suma::execSUMA_HANDOVER_CONF, gci = %u", gci);
5901 #endif
5902 
5903  if (!ndbd_suma_stop_me(getNodeInfo(nodeId).m_version))
5904  {
5905  jam();
5906  requestType = SumaHandoverReq::RT_START_NODE;
5907  }
5908 
5909  if (requestType == SumaHandoverReq::RT_START_NODE)
5910  {
5911  jam();
5912  for (Uint32 i = 0; i < c_no_of_buckets; i++)
5913  {
5914  if (tmp.get(i))
5915  {
5916  if (DBG_3R)
5917  ndbout_c("%u : %u %u", i, get_responsible_node(i), getOwnNodeId());
5918  ndbrequire(get_responsible_node(i) == getOwnNodeId());
5919  // We should run this bucket, but _nodeId_ is
5920  c_buckets[i].m_switchover_gci = (Uint64(gci) << 32) - 1;
5921  c_buckets[i].m_state |= Bucket::BUCKET_STARTING;
5922  }
5923  }
5924 
5925  char buf[255];
5926  tmp.getText(buf);
5927  infoEvent("Suma: handover from node %u gci: %u buckets: %s (%u)",
5928  nodeId, gci, buf, c_no_of_buckets);
5929  g_eventLogger->info("Suma: handover from node %u gci: %u buckets: %s (%u)",
5930  nodeId, gci, buf, c_no_of_buckets);
5931  m_switchover_buckets.bitOR(tmp);
5932  c_startup.m_handover_nodes.clear(nodeId);
5933  DBUG_VOID_RETURN;
5934  }
5935  else if (requestType == SumaHandoverReq::RT_STOP_NODE)
5936  {
5937  jam();
5938  for (Uint32 i = 0; i < c_no_of_buckets; i++)
5939  {
5940  if (tmp.get(i))
5941  {
5942  ndbrequire(get_responsible_node(i) == getOwnNodeId());
5943  // We should run this bucket, but _nodeId_ is
5944  c_buckets[i].m_switchover_node = getOwnNodeId();
5945  c_buckets[i].m_switchover_gci = (Uint64(gci) << 32) - 1;
5946  c_buckets[i].m_state |= Bucket::BUCKET_SHUTDOWN;
5947  }
5948  }
5949 
5950  char buf[255];
5951  tmp.getText(buf);
5952  infoEvent("Suma: handover to node %u gci: %u buckets: %s (%u)",
5953  nodeId, gci, buf, c_no_of_buckets);
5954  g_eventLogger->info("Suma: handover to node %u gci: %u buckets: %s (%u)",
5955  nodeId, gci, buf, c_no_of_buckets);
5956  m_switchover_buckets.bitOR(tmp);
5957  c_startup.m_handover_nodes.clear(nodeId);
5958  DBUG_VOID_RETURN;
5959  }
5960 }
5961 
5962 void
5964 {
5965  jam();
5966  StopMeReq req = * CAST_CONSTPTR(StopMeReq, signal->getDataPtr());
5967 
5968  ndbrequire(refToNode(req.senderRef) == getOwnNodeId());
5969  ndbrequire(c_shutdown.m_wait_handover == false);
5970  c_shutdown.m_wait_handover = true;
5971  c_shutdown.m_senderRef = req.senderRef;
5972  c_shutdown.m_senderData = req.senderData;
5973 
5974  for (Uint32 i = c_nodes_in_nodegroup_mask.find(0);
5975  i != c_nodes_in_nodegroup_mask.NotFound ;
5976  i = c_nodes_in_nodegroup_mask.find(i + 1))
5977  {
5983  if (!ndbd_suma_stop_me(getNodeInfo(i).m_version))
5984  {
5985  jam();
5986  char buf[255];
5987  BaseString::snprintf(buf, sizeof(buf),
5988  "Not all versions support graceful shutdown (suma)."
5989  " Shutdown directly instead");
5990  progError(__LINE__,
5991  NDBD_EXIT_GRACEFUL_SHUTDOWN_ERROR,
5992  buf);
5993  ndbrequire(false);
5994  }
5995  }
5996  send_handover_req(signal, SumaHandoverReq::RT_STOP_NODE);
5997 }
5998 
#ifdef NOT_USED
/**
 * Debug printer for a Suma::Page_pos: writes the page id, the position
 * within the page and the max gci, in that order.
 * Only compiled when NOT_USED is defined.
 */
static
NdbOut&
operator<<(NdbOut & out, const Suma::Page_pos & pos)
{
  out << "[ Page_pos:";
  out << " m_page_id: " << pos.m_page_id;
  out << " m_page_pos: " << pos.m_page_pos;
  out << " m_max_gci: " << pos.m_max_gci;
  out << " ]";
  return out;
}
#endif
6012 
6013 Uint32*
6014 Suma::get_buffer_ptr(Signal* signal, Uint32 buck, Uint64 gci, Uint32 sz)
6015 {
6016  sz += 1; // len
6017  Bucket* bucket= c_buckets+buck;
6018  Page_pos pos= bucket->m_buffer_head;
6019 
6020  Buffer_page* page = 0;
6021  Uint32 *ptr = 0;
6022 
6023  if (likely(pos.m_page_id != RNIL))
6024  {
6025  page= c_page_pool.getPtr(pos.m_page_id);
6026  ptr= page->m_data + pos.m_page_pos;
6027  }
6028 
6029  const bool same_gci = (gci == pos.m_last_gci) && (!ERROR_INSERTED(13022));
6030 
6031  pos.m_page_pos += sz;
6032  pos.m_last_gci = gci;
6033  Uint64 max = pos.m_max_gci > gci ? pos.m_max_gci : gci;
6034 
6035  if(likely(same_gci && pos.m_page_pos <= Buffer_page::DATA_WORDS))
6036  {
6037  pos.m_max_gci = max;
6038  bucket->m_buffer_head = pos;
6039  * ptr++ = (0x8000 << 16) | sz; // Same gci
6040  return ptr;
6041  }
6042  else if(pos.m_page_pos + Buffer_page::GCI_SZ32 <= Buffer_page::DATA_WORDS)
6043  {
6044 loop:
6045  pos.m_max_gci = max;
6046  pos.m_page_pos += Buffer_page::GCI_SZ32;
6047  bucket->m_buffer_head = pos;
6048  * ptr++ = (sz + Buffer_page::GCI_SZ32);
6049  * ptr++ = (Uint32)(gci >> 32);
6050  * ptr++ = (Uint32)(gci & 0xFFFFFFFF);
6051  return ptr;
6052  }
6053  else
6054  {
6060  Uint32 next;
6061  if(unlikely((next= seize_page()) == RNIL))
6062  {
6066  out_of_buffer(signal);
6067  return 0;
6068  }
6069 
6070  if(likely(pos.m_page_id != RNIL))
6071  {
6072  page->m_max_gci_hi = (Uint32)(pos.m_max_gci >> 32);
6073  page->m_max_gci_lo = (Uint32)(pos.m_max_gci & 0xFFFFFFFF);
6074  page->m_words_used = pos.m_page_pos - sz;
6075  page->m_next_page= next;
6076  ndbassert(pos.m_max_gci != 0);
6077  }
6078  else
6079  {
6080  bucket->m_buffer_tail = next;
6081  }
6082 
6083  memset(&pos, 0, sizeof(pos));
6084  pos.m_page_id = next;
6085  pos.m_page_pos = sz;
6086  pos.m_last_gci = gci;
6087 
6088  page= c_page_pool.getPtr(pos.m_page_id);
6089  page->m_next_page= RNIL;
6090  ptr= page->m_data;
6091  goto loop; //
6092  }
6093 }
6094 
6095 void
6096 Suma::out_of_buffer(Signal* signal)
6097 {
6098  if(m_out_of_buffer_gci)
6099  {
6100  return;
6101  }
6102 
6103  m_out_of_buffer_gci = m_last_complete_gci - 1;
6104  infoEvent("Out of event buffer: nodefailure will cause event failures");
6105  m_missing_data = false;
6106  out_of_buffer_release(signal, 0);
6107 }
6108 
6109 void
6110 Suma::out_of_buffer_release(Signal* signal, Uint32 buck)
6111 {
6112  Bucket* bucket= c_buckets+buck;
6113  Uint32 tail= bucket->m_buffer_tail;
6114 
6115  if(tail != RNIL)
6116  {
6117  Buffer_page* page= c_page_pool.getPtr(tail);
6118  bucket->m_buffer_tail = page->m_next_page;
6119  free_page(tail, page);
6120  signal->theData[0] = SumaContinueB::OUT_OF_BUFFER_RELEASE;
6121  signal->theData[1] = buck;
6122  sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
6123  return;
6124  }
6125 
6129  bucket->m_buffer_head.m_page_id = RNIL;
6130  bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS + 1;
6131 
6132  buck++;
6133  if(buck != c_no_of_buckets)
6134  {
6135  signal->theData[0] = SumaContinueB::OUT_OF_BUFFER_RELEASE;
6136  signal->theData[1] = buck;
6137  sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
6138  return;
6139  }
6140 
6145  m_out_of_buffer_gci = m_max_seen_gci > m_last_complete_gci
6146  ? m_max_seen_gci : m_last_complete_gci;
6147  m_missing_data = false;
6148 }
6149 
6150 Uint32
6151 Suma::seize_page()
6152 {
6153  if (ERROR_INSERTED(13038))
6154  {
6155  jam();
6156  CLEAR_ERROR_INSERT_VALUE;
6157  ndbout_c("Simulating out of event buffer");
6158  m_out_of_buffer_gci = m_max_seen_gci;
6159  }
6160  if(unlikely(m_out_of_buffer_gci))
6161  {
6162  return RNIL;
6163  }
6164 loop:
6165  Ptr<Page_chunk> ptr;
6166  Uint32 ref= m_first_free_page;
6167  if(likely(ref != RNIL))
6168  {
6169  m_first_free_page = (c_page_pool.getPtr(ref))->m_next_page;
6170  Uint32 chunk = (c_page_pool.getPtr(ref))->m_page_chunk_ptr_i;
6171  c_page_chunk_pool.getPtr(ptr, chunk);
6172  ndbassert(ptr.p->m_free);
6173  ptr.p->m_free--;
6174  return ref;
6175  }
6176 
6177  if(!c_page_chunk_pool.seize(ptr))
6178  return RNIL;
6179 
6180  Uint32 count = 16;
6181  m_ctx.m_mm.alloc_pages(RT_DBTUP_PAGE, &ref, &count, 1);
6182  if (count == 0)
6183  return RNIL;
6184 
6185  ndbout_c("alloc_chunk(%d %d) - ", ref, count);
6186 
6187  m_first_free_page = ptr.p->m_page_id = ref;
6188  ptr.p->m_size = count;
6189  ptr.p->m_free = count;
6190 
6191  Buffer_page* page;
6192  LINT_INIT(page);
6193  for(Uint32 i = 0; i<count; i++)
6194  {
6195  page = c_page_pool.getPtr(ref);
6196  page->m_page_state= SUMA_SEQUENCE;
6197  page->m_page_chunk_ptr_i = ptr.i;
6198  page->m_next_page = ++ref;
6199  }
6200  page->m_next_page = RNIL;
6201 
6202  goto loop;
6203 }
6204 
6205 void
6206 Suma::free_page(Uint32 page_id, Buffer_page* page)
6207 {
6208  Ptr<Page_chunk> ptr;
6209  ndbrequire(page->m_page_state == SUMA_SEQUENCE);
6210 
6211  Uint32 chunk= page->m_page_chunk_ptr_i;
6212 
6213  c_page_chunk_pool.getPtr(ptr, chunk);
6214 
6215  ptr.p->m_free ++;
6216  page->m_next_page = m_first_free_page;
6217  ndbrequire(ptr.p->m_free <= ptr.p->m_size);
6218 
6219  m_first_free_page = page_id;
6220 }
6221 
6222 void
6223 Suma::release_gci(Signal* signal, Uint32 buck, Uint64 gci)
6224 {
6225  Bucket* bucket= c_buckets+buck;
6226  Uint32 tail= bucket->m_buffer_tail;
6227  Page_pos head= bucket->m_buffer_head;
6228  Uint64 max_acked = bucket->m_max_acked_gci;
6229 
6230  const Uint32 mask = Bucket::BUCKET_TAKEOVER | Bucket::BUCKET_RESEND;
6231  if(unlikely(bucket->m_state & mask))
6232  {
6233  jam();
6234  ndbout_c("release_gci(%d, %u/%u) 0x%x-> node failure -> abort",
6235  buck, Uint32(gci >> 32), Uint32(gci), bucket->m_state);
6236  return;
6237  }
6238 
6239  bucket->m_max_acked_gci = (max_acked > gci ? max_acked : gci);
6240  if(unlikely(tail == RNIL))
6241  {
6242  return;
6243  }
6244 
6245  if(tail == head.m_page_id)
6246  {
6247  if(gci >= head.m_max_gci)
6248  {
6249  jam();
6250  if (ERROR_INSERTED(13034))
6251  {
6252  jam();
6253  SET_ERROR_INSERT_VALUE(13035);
6254  return;
6255  }
6256  if (ERROR_INSERTED(13035))
6257  {
6258  CLEAR_ERROR_INSERT_VALUE;
6259  NodeReceiverGroup rg(CMVMI, c_nodes_in_nodegroup_mask);
6260  rg.m_nodes.clear(getOwnNodeId());
6261  signal->theData[0] = 9999;
6262  sendSignal(rg, GSN_NDB_TAMPER, signal, 1, JBA);
6263  return;
6264  }
6265  head.m_page_pos = 0;
6266  head.m_max_gci = gci;
6267  head.m_last_gci = 0;
6268  bucket->m_buffer_head = head;
6269  }
6270  return;
6271  }
6272  else
6273  {
6274  jam();
6275  Buffer_page* page= c_page_pool.getPtr(tail);
6276  Uint64 max_gci = page->m_max_gci_lo | (Uint64(page->m_max_gci_hi) << 32);
6277  Uint32 next_page = page->m_next_page;
6278 
6279  ndbassert(max_gci != 0);
6280 
6281  if(gci >= max_gci)
6282  {
6283  jam();
6284  free_page(tail, page);
6285 
6286  bucket->m_buffer_tail = next_page;
6287  signal->theData[0] = SumaContinueB::RELEASE_GCI;
6288  signal->theData[1] = buck;
6289  signal->theData[2] = (Uint32)(gci >> 32);
6290  signal->theData[3] = (Uint32)(gci & 0xFFFFFFFF);
6291  sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 4, JBB);
6292  return;
6293  }
6294  else
6295  {
6296  //ndbout_c("do nothing...");
6297  }
6298  }
6299 }
6300 
6301 static Uint32 g_cnt = 0;
6302 
6303 void
6304 Suma::start_resend(Signal* signal, Uint32 buck)
6305 {
6306  printf("start_resend(%d, ", buck);
6307 
6311  Bucket* bucket= c_buckets + buck;
6312  Page_pos pos= bucket->m_buffer_head;
6313 
6314  if(m_out_of_buffer_gci)
6315  {
6316  Ptr<Gcp_record> gcp;
6317  c_gcp_list.last(gcp);
6318  signal->theData[0] = NDB_LE_SubscriptionStatus;
6319  signal->theData[1] = 2; // INCONSISTENT;
6320  signal->theData[2] = 0; // Not used
6321  signal->theData[3] = (Uint32) pos.m_max_gci;
6322  signal->theData[4] = (Uint32) (gcp.p->m_gci >> 32);
6323  sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 5, JBB);
6324  m_missing_data = true;
6325  return;
6326  }
6327 
6328  if(pos.m_page_id == RNIL)
6329  {
6330  jam();
6331  m_active_buckets.set(buck);
6332  m_gcp_complete_rep_count ++;
6333  ndbout_c("empty bucket(RNIL) -> active max_acked: %u/%u max_gci: %u/%u",
6334  Uint32(bucket->m_max_acked_gci >> 32),
6335  Uint32(bucket->m_max_acked_gci),
6336  Uint32(pos.m_max_gci >> 32),
6337  Uint32(pos.m_max_gci));
6338  return;
6339  }
6340 
6341  Uint64 min= bucket->m_max_acked_gci + 1;
6342  Uint64 max = m_max_seen_gci;
6343 
6344  ndbrequire(max <= m_max_seen_gci);
6345 
6346  if(min > max)
6347  {
6348  ndbrequire(pos.m_page_id == bucket->m_buffer_tail);
6349  m_active_buckets.set(buck);
6350  m_gcp_complete_rep_count ++;
6351  ndbout_c("empty bucket (%u/%u %u/%u) -> active",
6352  Uint32(min >> 32), Uint32(min),
6353  Uint32(max >> 32), Uint32(max));
6354  return;
6355  }
6356 
6357  g_cnt = 0;
6358  bucket->m_state |= (Bucket::BUCKET_TAKEOVER | Bucket::BUCKET_RESEND);
6359  bucket->m_switchover_node = get_responsible_node(buck);
6360  bucket->m_switchover_gci = max;
6361 
6362  m_switchover_buckets.set(buck);
6363 
6364  signal->theData[0] = SumaContinueB::RESEND_BUCKET;
6365  signal->theData[1] = buck;
6366  signal->theData[2] = (Uint32)(min >> 32);
6367  signal->theData[3] = 0;
6368  signal->theData[4] = 0;
6369  signal->theData[5] = (Uint32)(min & 0xFFFFFFFF);
6370  signal->theData[6] = 0;
6371  sendSignal(reference(), GSN_CONTINUEB, signal, 7, JBB);
6372 
6373  ndbout_c("min: %u/%u - max: %u/%u) page: %d",
6374  Uint32(min >> 32), Uint32(min), Uint32(max >> 32), Uint32(max),
6375  bucket->m_buffer_tail);
6376  ndbrequire(max >= min);
6377 }
6378 
6379 void
6380 Suma::resend_bucket(Signal* signal, Uint32 buck, Uint64 min_gci,
6381  Uint32 pos, Uint64 last_gci)
6382 {
6383  Bucket* bucket= c_buckets+buck;
6384  Uint32 tail= bucket->m_buffer_tail;
6385 
6386  Buffer_page* page= c_page_pool.getPtr(tail);
6387  Uint64 max_gci = page->m_max_gci_lo | (Uint64(page->m_max_gci_hi) << 32);
6388  Uint32 next_page = page->m_next_page;
6389  Uint32 *ptr = page->m_data + pos;
6390  Uint32 *end = page->m_data + page->m_words_used;
6391  bool delay = false;
6392 
6393  ndbrequire(tail != RNIL);
6394 
6395  if(tail == bucket->m_buffer_head.m_page_id)
6396  {
6397  max_gci= bucket->m_buffer_head.m_max_gci;
6398  end= page->m_data + bucket->m_buffer_head.m_page_pos;
6399  next_page= RNIL;
6400 
6401  if(ptr == end)
6402  {
6403  delay = true;
6404  goto next;
6405  }
6406  }
6407  else if(pos == 0 && min_gci > max_gci)
6408  {
6409  free_page(tail, page);
6410  tail = bucket->m_buffer_tail = next_page;
6411  goto next;
6412  }
6413 
6414 #if 0
6415  for(Uint32 i = 0; i<page->m_words_used; i++)
6416  {
6417  printf("%.8x ", page->m_data[i]);
6418  if(((i + 1) % 8) == 0)
6419  printf("\n");
6420  }
6421  printf("\n");
6422 #endif
6423 
6424  while(ptr < end)
6425  {
6426  Uint32 *src = ptr;
6427  Uint32 tmp = * src++;
6428  Uint32 sz = tmp & 0xFFFF;
6429 
6430  ptr += sz;
6431 
6432  if(! (tmp & (0x8000 << 16)))
6433  {
6434  ndbrequire(sz >= Buffer_page::GCI_SZ32);
6435  sz -= Buffer_page::GCI_SZ32;
6436  Uint32 last_gci_hi = * src++;
6437  Uint32 last_gci_lo = * src++;
6438  last_gci = last_gci_lo | (Uint64(last_gci_hi) << 32);
6439  }
6440  else
6441  {
6442  ndbrequire(ptr - sz > page->m_data);
6443  }
6444 
6445  if(last_gci < min_gci)
6446  {
6447  continue;
6448  }
6449 
6450  ndbrequire(sz);
6451  sz --; // remove *len* part of sz
6452 
6453  if(sz == 0)
6454  {
6455  SubGcpCompleteRep * rep = (SubGcpCompleteRep*)signal->getDataPtrSend();
6456  rep->gci_hi = (Uint32)(last_gci >> 32);
6457  rep->gci_lo = (Uint32)(last_gci & 0xFFFFFFFF);
6458  rep->flags = (m_missing_data)
6459  ? SubGcpCompleteRep::MISSING_DATA
6460  : 0;
6461  rep->senderRef = reference();
6462  rep->gcp_complete_rep_count = 1;
6463 
6464  if (ERROR_INSERTED(13036))
6465  {
6466  jam();
6467  CLEAR_ERROR_INSERT_VALUE;
6468  ndbout_c("Simulating out of event buffer at node failure");
6469  rep->flags |= SubGcpCompleteRep::MISSING_DATA;
6470  }
6471 
6472  char buf[255];
6473  c_subscriber_nodes.getText(buf);
6474  if (g_cnt)
6475  {
6476  ndbout_c("resending GCI: %u/%u rows: %d -> %s",
6477  Uint32(last_gci >> 32), Uint32(last_gci), g_cnt, buf);
6478  }
6479  g_cnt = 0;
6480 
6481  NodeReceiverGroup rg(API_CLUSTERMGR, c_subscriber_nodes);
6482  sendSignal(rg, GSN_SUB_GCP_COMPLETE_REP, signal,
6483  SubGcpCompleteRep::SignalLength, JBB);
6484  }
6485  else
6486  {
6487  const uint buffer_header_sz = 6;
6488  g_cnt++;
6489  Uint32 subPtrI = * src++ ;
6490  Uint32 schemaVersion = * src++;
6491  Uint32 event = * src >> 16;
6492  Uint32 sz_1 = (* src ++) & 0xFFFF;
6493  Uint32 any_value = * src++;
6494  Uint32 transId1 = * src++;
6495  Uint32 transId2 = * src++;
6496 
6497  ndbassert(sz - buffer_header_sz >= sz_1);
6498 
6499  LinearSectionPtr ptr[3];
6500  const Uint32 nptr= reformat(signal, ptr,
6501  src, sz_1,
6502  src + sz_1, sz - buffer_header_sz - sz_1);
6503  Uint32 ptrLen= 0;
6504  for(Uint32 i =0; i < nptr; i++)
6505  ptrLen+= ptr[i].sz;
6506 
6510  Ptr<Subscription> subPtr;
6511  c_subscriptionPool.getPtr(subPtr, subPtrI);
6512  Ptr<Table> tabPtr;
6513  c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
6514  Uint32 table = subPtr.p->m_tableId;
6515  if (table_version_major(tabPtr.p->m_schemaVersion) ==
6516  table_version_major(schemaVersion))
6517  {
6518  SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg;
6519  data->gci_hi = (Uint32)(last_gci >> 32);
6520  data->gci_lo = (Uint32)(last_gci & 0xFFFFFFFF);
6521  data->tableId = table;
6522  data->requestInfo = 0;
6523  SubTableData::setOperation(data->requestInfo, event);
6524  data->flags = 0;
6525  data->anyValue = any_value;
6526  data->totalLen = ptrLen;
6527  data->transId1 = transId1;
6528  data->transId2 = transId2;
6529 
6530  {
6532  subPtr.p->m_subscribers);
6533  SubscriberPtr subbPtr;
6534  for(list.first(subbPtr); !subbPtr.isNull(); list.next(subbPtr))
6535  {
6536  data->senderData = subbPtr.p->m_senderData;
6537  sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
6538  SubTableData::SignalLengthWithTransId, JBB, ptr, nptr);
6539  }
6540  }
6541  }
6542  }
6543 
6544  break;
6545  }
6546 
6547  if(ptr == end && (tail != bucket->m_buffer_head.m_page_id))
6548  {
6552  free_page(tail, page);
6553  tail = bucket->m_buffer_tail = next_page;
6554  pos = 0;
6555  last_gci = 0;
6556  }
6557  else
6558  {
6559  pos = Uint32(ptr - page->m_data);
6560  }
6561 
6562 next:
6563  if(tail == RNIL)
6564  {
6565  bucket->m_state &= ~(Uint32)Bucket::BUCKET_RESEND;
6566  ndbassert(! (bucket->m_state & Bucket::BUCKET_TAKEOVER));
6567  ndbout_c("resend done...");
6568  return;
6569  }
6570 
6571  signal->theData[0] = SumaContinueB::RESEND_BUCKET;
6572  signal->theData[1] = buck;
6573  signal->theData[2] = (Uint32)(min_gci >> 32);
6574  signal->theData[3] = pos;
6575  signal->theData[4] = (Uint32)(last_gci >> 32);
6576  signal->theData[5] = (Uint32)(min_gci & 0xFFFFFFFF);
6577  signal->theData[6] = (Uint32)(last_gci & 0xFFFFFFFF);
6578  if(!delay)
6579  sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 7, JBB);
6580  else
6581  sendSignalWithDelay(SUMA_REF, GSN_CONTINUEB, signal, 10, 7);
6582 }
6583 
6584 void
6585 Suma::execGCP_PREPARE(Signal *signal)
6586 {
6587  jamEntry();
6588  const GCPPrepare *prep = (const GCPPrepare *)signal->getDataPtr();
6589  m_current_gci = prep->gci_lo | (Uint64(prep->gci_hi) << 32);
6590 }
6591 
6592 Uint64
6593 Suma::get_current_gci(Signal*)
6594 {
6595  return m_current_gci;
6596 }
6597 
6598 void
6599 Suma::execCREATE_NODEGROUP_IMPL_REQ(Signal* signal)
6600 {
6602  signal->getDataPtr();
6603  CreateNodegroupImplReq *req = &reqCopy;
6604 
6605  Uint32 err = 0;
6606  Uint32 rt = req->requestType;
6607 
6608  NdbNodeBitmask tmp;
6609  for (Uint32 i = 0; i<NDB_ARRAY_SIZE(req->nodes) && req->nodes[i]; i++)
6610  {
6611  tmp.set(req->nodes[i]);
6612  }
6613  Uint32 cnt = tmp.count();
6614  Uint32 group = req->nodegroupId;
6615 
6616  switch(rt){
6617  case CreateNodegroupImplReq::RT_ABORT:
6618  jam();
6619  break;
6620  case CreateNodegroupImplReq::RT_PARSE:
6621  jam();
6622  break;
6623  case CreateNodegroupImplReq::RT_PREPARE:
6624  jam();
6625  break;
6626  case CreateNodegroupImplReq::RT_COMMIT:
6627  jam();
6628  break;
6629  case CreateNodegroupImplReq::RT_COMPLETE:
6630  jam();
6631  CRASH_INSERTION(13043);
6632 
6633  Uint64 gci = (Uint64(req->gci_hi) << 32) | req->gci_lo;
6634  ndbrequire(gci > m_last_complete_gci);
6635 
6636  Uint32 state = 0;
6637  if (c_nodeGroup != RNIL)
6638  {
6639  jam();
6640  NdbNodeBitmask check = tmp;
6641  check.bitAND(c_nodes_in_nodegroup_mask);
6642  ndbrequire(check.isclear());
6643  ndbrequire(c_nodeGroup != group);
6644  ndbrequire(cnt == c_nodes_in_nodegroup_mask.count());
6645  state = Bucket::BUCKET_CREATED_OTHER;
6646  }
6647  else if (tmp.get(getOwnNodeId()))
6648  {
6649  jam();
6650  c_nodeGroup = group;
6651  c_nodes_in_nodegroup_mask.assign(tmp);
6652  fix_nodegroup();
6653  state = Bucket::BUCKET_CREATED_SELF;
6654  }
6655  if (state != 0)
6656  {
6657  for (Uint32 i = 0; i<c_no_of_buckets; i++)
6658  {
6659  jam();
6660  m_switchover_buckets.set(i);
6661  c_buckets[i].m_switchover_gci = gci - 1; // start from gci
6662  c_buckets[i].m_state = state | (c_no_of_buckets << 8);
6663  }
6664  }
6665  }
6666 
6667  {
6668  CreateNodegroupImplConf* conf =
6669  (CreateNodegroupImplConf*)signal->getDataPtrSend();
6670  conf->senderRef = reference();
6671  conf->senderData = req->senderData;
6672  sendSignal(req->senderRef, GSN_CREATE_NODEGROUP_IMPL_CONF, signal,
6673  CreateNodegroupImplConf::SignalLength, JBB);
6674  }
6675  return;
6676 
6677 //error:
6678  CreateNodegroupImplRef *ref =
6679  (CreateNodegroupImplRef*)signal->getDataPtrSend();
6680  ref->senderRef = reference();
6681  ref->senderData = req->senderData;
6682  ref->errorCode = err;
6683  sendSignal(req->senderRef, GSN_CREATE_NODEGROUP_IMPL_REF, signal,
6684  CreateNodegroupImplRef::SignalLength, JBB);
6685  return;
6686 }
6687 
6688 void
6689 Suma::execDROP_NODEGROUP_IMPL_REQ(Signal* signal)
6690 {
6692  signal->getDataPtr();
6693  DropNodegroupImplReq *req = &reqCopy;
6694 
6695  Uint32 err = 0;
6696  Uint32 rt = req->requestType;
6697  Uint32 group = req->nodegroupId;
6698 
6699  switch(rt){
6700  case DropNodegroupImplReq::RT_ABORT:
6701  jam();
6702  break;
6703  case DropNodegroupImplReq::RT_PARSE:
6704  jam();
6705  break;
6706  case DropNodegroupImplReq::RT_PREPARE:
6707  jam();
6708  break;
6709  case DropNodegroupImplReq::RT_COMMIT:
6710  jam();
6711  break;
6712  case DropNodegroupImplReq::RT_COMPLETE:
6713  jam();
6714  CRASH_INSERTION(13043);
6715 
6716  Uint64 gci = (Uint64(req->gci_hi) << 32) | req->gci_lo;
6717  ndbrequire(gci > m_last_complete_gci);
6718 
6719  Uint32 state;
6720  if (c_nodeGroup != group)
6721  {
6722  jam();
6723  state = Bucket::BUCKET_DROPPED_OTHER;
6724  break;
6725  }
6726  else
6727  {
6728  jam();
6729  state = Bucket::BUCKET_DROPPED_SELF;
6730  }
6731 
6732  for (Uint32 i = 0; i<c_no_of_buckets; i++)
6733  {
6734  jam();
6735  m_switchover_buckets.set(i);
6736  if (c_buckets[i].m_state != 0)
6737  {
6738  jamLine(c_buckets[i].m_state);
6739  ndbout_c("c_buckets[%u].m_state: %u", i, c_buckets[i].m_state);
6740  }
6741  ndbrequire(c_buckets[i].m_state == 0); // XXX todo
6742  c_buckets[i].m_switchover_gci = gci - 1; // start from gci
6743  c_buckets[i].m_state = state | (c_no_of_buckets << 8);
6744  }
6745  break;
6746  }
6747 
6748  {
6749  DropNodegroupImplConf* conf =
6750  (DropNodegroupImplConf*)signal->getDataPtrSend();
6751  conf->senderRef = reference();
6752  conf->senderData = req->senderData;
6753  sendSignal(req->senderRef, GSN_DROP_NODEGROUP_IMPL_CONF, signal,
6754  DropNodegroupImplConf::SignalLength, JBB);
6755  }
6756  return;
6757 
6758 //error:
6759  DropNodegroupImplRef *ref =
6760  (DropNodegroupImplRef*)signal->getDataPtrSend();
6761  ref->senderRef = reference();
6762  ref->senderData = req->senderData;
6763  ref->errorCode = err;
6764  sendSignal(req->senderRef, GSN_DROP_NODEGROUP_IMPL_REF, signal,
6765  DropNodegroupImplRef::SignalLength, JBB);
6766  return;
6767 }
6768 
6769 template void append(DataBuffer<11>&,SegmentedSectionPtr,SectionSegmentPool&);
6770