MySQL 5.6.14 Source Code Documentation
Suma.hpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #ifndef SUMA_H
19 #define SUMA_H
20 
21 #include <ndb_limits.h>
22 #include <SimulatedBlock.hpp>
23 
24 #include <NodeBitmask.hpp>
25 
26 #include <SLList.hpp>
27 #include <DLList.hpp>
28 #include <DLCFifoList.hpp>
29 #include <KeyTable.hpp>
30 #include <DataBuffer.hpp>
31 #include <SignalCounter.hpp>
32 #include <AttributeHeader.hpp>
33 #include <AttributeList.hpp>
34 
35 #include <signaldata/UtilSequence.hpp>
36 #include <signaldata/SumaImpl.hpp>
37 #include <ndbapi/NdbDictionary.hpp>
38 
// Suma: an NDB data-node block (SimulatedBlock subclass). Its exec* members
// are signal handlers covering:
//   - subscription lifecycle (SUB_CREATE/REMOVE/START/STOP/SYNC);
//   - dictionary replies (GET_TABINFO, GET_TABLEID, DICT_LOCK, *_TAB_*);
//   - fragment scans (SCAN_FRAG*, TRANSID_AI, KEYINFO20);
//   - trigger delivery (TRIG_ATTRINFO, FIRE_TRIG_ORD);
//   - GCP completion reports;
//   - node/API failure and start/stop handover (SUMA_START_ME, SUMA_HANDOVER).
// The private section holds per-bucket page buffers whose pages and
// positions are tagged with 64-bit GCI values.
//
// NOTE(review): this is a Doxygen source listing, not compilable C++. Every
// line carries its original line number, and some original lines are absent.
// Among the absent lines are declarations this listing still relies on:
//   - the opener of the union holding m_fragDesc (FragmentDescriptor is used
//     by SyncRecord::getNextFragment);
//   - the opener of the union holding m_attrDesc;
//   - the SyncRecord constructor signature;
//   - the `struct Subscription` header line;
//   - the SubscriptionPtr typedef used by completeSubRemove;
//   - the last parameter of report_sub_stop_conf and of
//     send_sub_start_stop_event.
// Restore these from the real Suma.hpp before using this text as code.
39 class Suma : public SimulatedBlock {
40  BLOCK_DEFINES(Suma);
41 public:
42  Suma(Block_context& ctx);
43  virtual ~Suma();
44 
 // Subscription lifecycle requests.
48  void execSUB_CREATE_REQ(Signal* signal);
49  void execSUB_REMOVE_REQ(Signal* signal);
50 
51  void execSUB_START_REQ(Signal* signal);
52  void execSUB_STOP_REQ(Signal* signal);
53 
54  void execSUB_SYNC_REQ(Signal* signal);
55  void execSUB_ABORT_SYNC_REQ(Signal* signal);
56 
 // Dictionary replies: table info / table id lookups, table DDL, dict lock.
60  void execGET_TABINFOREF(Signal* signal);
61  void execGET_TABINFO_CONF(Signal* signal);
62 
63  void execGET_TABLEID_CONF(Signal* signal);
64  void execGET_TABLEID_REF(Signal* signal);
65 
66  void execDROP_TAB_CONF(Signal* signal);
67  void execALTER_TAB_REQ(Signal* signal);
68  void execCREATE_TAB_CONF(Signal* signal);
69 
70  void execDICT_LOCK_REF(Signal*);
71  void execDICT_LOCK_CONF(Signal*);
72 
 // Scan-related signals (fragment scan replies, returned row/key data,
 // SUB_SYNC continuation).
76  void execSCAN_HBREP(Signal* signal);
77  void execSCAN_FRAGREF(Signal* signal);
78  void execSCAN_FRAGCONF(Signal* signal);
79  void execTRANSID_AI(Signal* signal);
80  void execKEYINFO20(Signal* signal);
81  void execSUB_SYNC_CONTINUE_REF(Signal* signal);
82  void execSUB_SYNC_CONTINUE_CONF(Signal* signal);
83 
 // Trigger firing and GCP completion.
87  void execTRIG_ATTRINFO(Signal* signal);
88  void execFIRE_TRIG_ORD(Signal* signal);
89  void execFIRE_TRIG_ORD_L(Signal* signal);
90  void execSUB_GCP_COMPLETE_REP(Signal* signal);
91 
 // DIH scan/node lookups, node-group check, GCP prepare.
95  void execDIH_SCAN_TAB_REF(Signal* signal);
96  void execDIH_SCAN_TAB_CONF(Signal* signal);
97  void execDIH_SCAN_GET_NODES_REF(Signal* signal);
98  void execDIH_SCAN_GET_NODES_CONF(Signal* signal);
99  void execCHECKNODEGROUPSCONF(Signal *signal);
100  void execGCP_PREPARE(Signal *signal);
101 
 // Trigger create/drop replies.
105  void execCREATE_TRIG_IMPL_REF(Signal* signal);
106  void execCREATE_TRIG_IMPL_CONF(Signal* signal);
107  void execDROP_TRIG_IMPL_REF(Signal* signal);
108  void execDROP_TRIG_IMPL_CONF(Signal* signal);
109 
113  void execCONTINUEB(Signal* signal);
114 
115  void execCREATE_NODEGROUP_IMPL_REQ(Signal*);
116  void execDROP_NODEGROUP_IMPL_REQ(Signal*);
117 public:
118 
119  void suma_ndbrequire(bool v);
120 
121  // wl4391_todo big enough for now
 // NOTE(review): the opening line of the enclosing union (orig. line 122) is
 // missing from this listing. Given SyncRecord::getNextFragment's parameter
 // type, it is presumably `union FragmentDescriptor {`.
 // The union overlays, on one Uint32 (m_dummy):
 //   - an 8-bit fragment number;
 //   - an 8-bit LQH instance key;
 //   - a 16-bit node id.
 // m_fragmentNo is Uint8, so fragment numbers above 255 do not fit
 // ("big enough for now").
123  struct {
124  Uint8 m_fragmentNo;
125  Uint8 m_lqhInstanceKey;
126  Uint16 m_nodeId;
127  } m_fragDesc;
128  Uint32 m_dummy;
129  };
130 
 // NOTE(review): orig. lines 131-134, including this union's opening line,
 // are missing. The union overlays a 16-bit attribute id plus 16 unused bits
 // on one Uint32 (m_dummy).
135  struct {
136  Uint16 attrId;
137  Uint16 unused;
138  } m_attrDesc;
139  Uint32 m_dummy;
140  };
141 
 // A subscriber endpoint: the reference and opaque data of the requester.
 // nextList/prevList link it into intrusive lists such as
 // Subscription::m_subscribers (DLList<Subscriber>). nextPool shares storage
 // with prevList, presumably used only while the record is free in its pool.
142  struct Subscriber {
143  Subscriber() {}
144  Uint32 m_senderRef;
145  Uint32 m_senderData;
146  Uint32 nextList;
147 
148  union { Uint32 nextPool; Uint32 prevList; };
149  };
151 
152  struct Table;
153  friend struct Table;
154  typedef Ptr<Table> TablePtr;
155 
 // State of one table scan driven by startScan/nextScan/completeScan,
 // stepping through the fragment descriptors in m_fragments.
 // NOTE(review): the constructor signature line (orig. 157) is missing.
 // From the initializer below it takes a Suma& named `s`.
156  struct SyncRecord {
158  : suma(s)
159 #ifdef ERROR_INSERT
160  , cerrorInsert(s.cerrorInsert)
161 #endif
162  {}
163 
164  void release();
165 
166  Uint32 m_senderRef;
167  Uint32 m_senderData;
168 
169  Uint32 m_subscriptionPtrI;
170  Uint32 m_error;
171  Uint32 m_requestInfo;
172 
173  Uint32 m_frag_cnt; // only scan this many fragments...
174  Uint32 m_frag_id; // only scan this specific fragment...
175  Uint32 m_tableId; // redundant...
176 
181  DataBuffer<15>::Head m_fragments; // Fragment descriptors
182 
 // NOTE(review): the comment on m_currentFragment below refers to
 // tabPtr.p->m_fragments, but Table declares no m_fragments in this listing.
 // m_fragments is a member of this SyncRecord, so the comment looks stale.
186  Uint32 m_currentFragment; // Index in tabPtr.p->m_fragments
187  Uint32 m_currentNoOfAttributes; // No of attributes for current table
188  DataBuffer<15>::Head m_attributeList; // Attribute list, if other than default
189  DataBuffer<15>::Head m_boundInfo; // For range scan
190 
191  void startScan(Signal*);
192  void nextScan(Signal*);
193  bool getNextFragment(TablePtr * tab, FragmentDescriptor * fd);
194  void completeScan(Signal*, int error= 0);
195 
 // Back-reference to the owning block. number(), jamBuffer() and progError()
 // forward to it.
196  Suma & suma;
197 #ifdef ERROR_INSERT
198  UintR &cerrorInsert;
199 #endif
200  BlockNumber number() const { return suma.number(); }
201  EmulatedJamBuffer *jamBuffer() const { return suma.jamBuffer(); }
202  void progError(int line, int cause, const char * extra) {
203  suma.progError(line, cause, extra);
204  }
205 
 // Intrusive list/pool links (Subscription::m_syncRecords is a
 // DLList<SyncRecord>).
206  Uint32 prevList; Uint32 ptrI;
207  union { Uint32 nextPool; Uint32 nextList; };
208  };
209  friend struct SyncRecord;
210 
 // A queued operation against a subscription. m_opType stores an OpType
 // value. Records are queued on Subscription::m_create_req / m_start_req
 // (DLFifoList<SubOpRecord>).
211  struct SubOpRecord
212  {
213  SubOpRecord() {}
214 
215  enum OpType
216  {
217  R_SUB_START_REQ,
218  R_SUB_STOP_REQ,
219  R_START_ME_REQ,
220  R_API_FAIL_REQ,
221  R_SUB_ABORT_START_REQ
222  };
223 
224  Uint32 m_opType;
225  Uint32 m_subPtrI;
226  Uint32 m_senderRef;
227  Uint32 m_senderData;
228  Uint32 m_subscriberRef;
229  Uint32 m_subscriberData;
230 
231  Uint32 nextList;
232  union {
233  Uint32 prevList;
234  Uint32 nextPool;
235  };
236  };
237  friend struct SubOpRecord;
238 
 // NOTE(review): the struct header line (orig. 239) is missing. Given
 // equal(const Subscription&) below, this is the Subscription struct.
 // Subscriptions are stored in c_subscriptions (DLHashTable<Subscription>)
 // and keyed by (m_subscriptionId, m_subscriptionKey) via hashValue()/equal().
240  {
241  Uint32 m_seq_no;
242  Uint32 m_subscriptionId;
243  Uint32 m_subscriptionKey;
244  Uint32 m_subscriptionType;
245  Uint32 m_schemaTransId;
 // Bitwise OR of Options flags.
246  Uint16 m_options;
247 
248  enum Options {
249  REPORT_ALL = 0x1,
250  REPORT_SUBSCRIBE = 0x2,
251  MARKED_DROPPED = 0x4,
252  NO_REPORT_DDL = 0x8
253  };
254 
255  enum State {
256  UNDEFINED,
257  DEFINED,
258  DEFINING
259  };
260 
261  enum TriggerState {
262  T_UNDEFINED,
263  T_CREATING,
264  T_DEFINED,
265  T_DROPPING,
266  T_ERROR
267  };
268 
269  State m_state;
270  TriggerState m_trigger_state;
271 
272  DLList<Subscriber>::Head m_subscribers;
273  DLFifoList<SubOpRecord>::Head m_create_req;
274  DLFifoList<SubOpRecord>::Head m_start_req;
276  DLList<SyncRecord>::Head m_syncRecords;
277 
278  Uint32 m_errorCode;
279  Uint32 m_outstanding_trigger;
280  Uint32 m_triggers[3];
281 
 // Intrusive list and hash-chain links. nextPool shares storage with
 // prevHash.
282  Uint32 nextList, prevList;
283  Uint32 nextHash;
284  union { Uint32 prevHash; Uint32 nextPool; };
285 
286  Uint32 hashValue() const {
287  return m_subscriptionId + m_subscriptionKey;
288  }
289 
290  bool equal(const Subscription & s) const {
291  return
292  m_subscriptionId == s.m_subscriptionId &&
293  m_subscriptionKey == s.m_subscriptionKey;
294  }
299  Uint32 m_tableId;
300  Uint32 m_table_ptrI;
301  };
303 
 // Per-table state, hashed by m_tableId (hashValue()/equal()). The
 // constructor sets m_tableId to ~0, presumably as an "unset" marker.
 // `key` is an alias of m_tableId through the anonymous union.
304  struct Table {
305  Table() { m_tableId = ~0; }
306  void release(Suma&);
307 
308  DLList<Subscription>::Head m_subscriptions;
309 
310  enum State {
311  UNDEFINED,
312  DEFINING,
313  DEFINED,
314  DROPPED
315  };
316  State m_state;
317 
318  Uint32 m_ptrI;
319 
320  bool parseTable(SegmentedSectionPtr ptr, Suma &suma);
324  void createAttributeMask(AttributeMask&, Suma &suma);
325 
326  union { Uint32 m_tableId; Uint32 key; };
327  Uint32 m_schemaVersion;
328 
329  Uint32 m_error;
330 
331  Uint32 m_noOfAttributes;
332 
336  Uint32 nextHash;
337  union { Uint32 prevHash; Uint32 nextPool; };
338  Uint32 hashValue() const {
339  return m_tableId;
340  }
341  bool equal(const Table& rec) const {
342  return m_tableId == rec.m_tableId;
343  }
344 
345  // copy from Subscription
346  Uint32 m_schemaTransId;
347  };
348 
357  DLHashTable<Subscription> c_subscriptions;
358 
 // Record pools backing the structs above.
363  ArrayPool<Table> c_tablePool;
364  ArrayPool<Subscription> c_subscriptionPool;
365  ArrayPool<SyncRecord> c_syncPool;
366  DataBuffer<15>::DataBufferPool c_dataBufferPool;
367  ArrayPool<SubOpRecord> c_subOpPool;
368 
369  Uint32 c_maxBufferedEpochs;
370 
371  NodeBitmask c_failedApiNodes;
372  Uint32 c_failedApiNodesState[MAX_NODES];
373 
377  bool removeSubscribersOnNode(Signal *signal, Uint32 nodeId);
378 
379  void sendSubIdRef(Signal* signal,Uint32 senderRef,Uint32 senderData,Uint32 errorCode);
380 
381  void sendSubCreateRef(Signal* signal, Uint32 ref, Uint32 data, Uint32 error);
382  void sendSubStartRef(Signal* signal, Uint32 ref, Uint32 data, Uint32 error);
383  void sendSubStopRef(Signal* signal, Uint32 ref, Uint32 data, Uint32 error);
 // NOTE(review): the final parameter line of this declaration (orig. 388) is
 // missing from the listing. The declaration is not closed here.
384  void report_sub_stop_conf(Signal* signal,
385  Ptr<SubOpRecord> subOpPtr,
386  Ptr<Subscriber> ptr,
387  bool report,
389 
390  void sendSubSyncRef(Signal* signal, Uint32 errorCode);
391  void sendSubRemoveRef(Signal* signal, const SubRemoveReq& ref,
392  Uint32 errorCode);
393  void sendSubStopReq(Signal* signal, bool unlock= false);
394 
 // NOTE(review): the SubscriptionPtr typedef is not visible in this listing,
 // presumably on the missing orig. line 302.
395  void completeSubRemove(SubscriptionPtr subPtr);
396 
 // NOTE(review): the final parameter line (orig. 401) is missing, as above.
397  void send_sub_start_stop_event(Signal *signal,
398  Ptr<Subscriber> ptr,
399  NdbDictionary::Event::_TableEvent event,
400  bool report,
402 
403  Uint32 getFirstGCI(Signal* signal);
404 
405  void create_triggers(Signal*, Ptr<Subscription>);
406  void drop_triggers(Signal*, Ptr<Subscription>);
408 
409  bool check_sub_start(Uint32 subscriberRef);
410  void report_sub_start_conf(Signal* signal, Ptr<Subscription> subPtr);
411  void report_sub_start_ref(Signal* signal, Ptr<Subscription> subPtr, Uint32);
412 
413  void sub_stop_req(Signal*);
414  void check_remove_queue(Signal*, Ptr<Subscription>,
415  Ptr<SubOpRecord>,bool,bool);
417  void get_tabinfo_ref_release(Signal*, Ptr<Table>);
418 
422  void execCREATE_SUBSCRIPTION_REQ(Signal* signal);
423  void execDROP_SUBSCRIPTION_REQ(Signal* signal);
424 
425  void execSTART_SUBSCRIPTION_REQ(Signal* signal);
426  void execSTOP_SUBSCRIPTION_REQ(Signal* signal);
427 
428  void execSYNC_SUBSCRIPTION_REQ(Signal* signal);
429  void execABORT_SYNC_REQ(Signal* signal);
430 
 // Configuration, start phases and node/API membership signals.
435  void getNodeGroupMembers(Signal* signal);
436  void execREAD_CONFIG_REQ(Signal* signal);
437 
438  void execSTTOR(Signal* signal);
439  void sendSTTORRY(Signal*);
440  void execNDB_STTOR(Signal* signal);
441  void execDUMP_STATE_ORD(Signal* signal);
442  void execDBINFO_SCANREQ(Signal* signal);
443  void execREAD_NODESCONF(Signal* signal);
444  void execNODE_FAILREP(Signal* signal);
445  void execINCL_NODEREQ(Signal* signal);
446  void execSIGNAL_DROPPED_REP(Signal* signal);
447  void execAPI_START_REP(Signal* signal);
448  void execAPI_FAILREQ(Signal* signal) ;
449 
450  void api_fail_gci_list(Signal*, Uint32 node);
451  void api_fail_subscriber_list(Signal*, Uint32 node);
453  void api_fail_block_cleanup(Signal* signal, Uint32 failedNode);
454  void api_fail_block_cleanup_callback(Signal* signal,
455  Uint32 failedNodeId,
456  Uint32 elementsCleaned);
457 
458  void execSUB_GCP_COMPLETE_ACK(Signal* signal);
459 
463  void execSUB_CREATE_REF(Signal* signal);
464  void execSUB_CREATE_CONF(Signal* signal);
465 
466  void execSUB_DROP_REF(Signal* signal);
467  void execSUB_DROP_CONF(Signal* signal);
468 
469  void execSUB_START_REF(Signal* signal);
470  void execSUB_START_CONF(Signal* signal);
471 
472  void execSUB_ABORT_SYNC_REF(Signal* signal);
473  void execSUB_ABORT_SYNC_CONF(Signal* signal);
474 
475  void execSUMA_START_ME_REQ(Signal* signal);
476  void execSUMA_START_ME_REF(Signal* signal);
477  void execSUMA_START_ME_CONF(Signal* signal);
478 
479  void execSTOP_ME_REQ(Signal*);
480 
483  void copySubscriber(Signal*, Ptr<Subscription>, Ptr<Subscriber>);
484  void abort_start_me(Signal*, Ptr<Subscription>, bool lockowner);
485 
486  void execSUMA_HANDOVER_REQ(Signal* signal);
487  void execSUMA_HANDOVER_REF(Signal* signal);
488  void execSUMA_HANDOVER_CONF(Signal* signal);
489 
493  void createSequence(Signal* signal);
494  void createSequenceReply(Signal* signal,
495  UtilSequenceConf* conf,
496  UtilSequenceRef* ref);
497  void execUTIL_SEQUENCE_CONF(Signal* signal);
498  void execUTIL_SEQUENCE_REF(Signal* signal);
499  void execCREATE_SUBID_REQ(Signal* signal);
500 
505  // for LQH transporter overload check
506  const NodeBitmask& getSubscriberNodes() const { return c_subscriber_nodes; }
507 
508 private:
512  NodeId c_masterNodeId;
513  NdbNodeBitmask c_alive_nodes;
514 
518  struct Startup
519  {
520  bool m_wait_handover;
521  Uint32 m_restart_server_node_id;
522  NdbNodeBitmask m_handover_nodes;
523  } c_startup;
524 
528  struct Shutdown
529  {
530  bool m_wait_handover;
531  Uint32 m_senderRef;
532  Uint32 m_senderData;
533  } c_shutdown;
534 
535  struct Restart
536  {
537  Uint16 m_abort;
538  Uint16 m_waiting_on_self;
539  Uint32 m_ref;
540  Uint32 m_max_seq;
541  Uint32 m_subPtrI;
542  Uint32 m_subOpPtrI;
 // NOTE(review): no c_subscribers member is declared in this listing. The
 // visible hash table is c_subscriptions. Confirm which one m_bucket indexes.
543  Uint32 m_bucket; // In c_subscribers hashtable
544  } c_restart;
545 
546  Uint32 c_current_seq; // Sequence no on subscription(s)
547  Uint32 c_outstanding_drop_trig_req;
548 
549  NodeBitmask c_connected_nodes; // (NODE/API) START REP / (API/NODE) FAIL REQ
550  NodeBitmask c_subscriber_nodes; // returned by getSubscriberNodes()
551 
555  Uint32 c_nodeGroup;
556  Uint32 c_noNodesInGroup;
557  Uint32 c_nodesInGroup[MAX_REPLICAS];
558  NdbNodeBitmask c_nodes_in_nodegroup_mask; // NodeId's of nodes in nodegroup
559 
560  void send_dict_lock_req(Signal* signal, Uint32 state);
561  void send_dict_unlock_ord(Signal* signal, Uint32 state);
562  void send_start_me_req(Signal* signal);
563  void check_start_handover(Signal* signal);
564  void send_handover_req(Signal* signal, Uint32 type);
565 
566  Uint32 get_responsible_node(Uint32 B) const;
567  Uint32 get_responsible_node(Uint32 B, const NdbNodeBitmask& mask) const;
568  bool check_switchover(Uint32 bucket, Uint64 gci);
569 
570  void fix_nodegroup();
571 
572 public:
 // Write position inside a bucket's buffer pages.
573  struct Page_pos
574  {
575  Uint32 m_page_id;
576  Uint32 m_page_pos;
577  Uint64 m_max_gci; // max gci on page
578  Uint64 m_last_gci; // last gci on page
579  };
580 private:
581 
582  struct Bucket
583  {
 // Bit flags stored in m_state.
 // NOTE(review): BUCKET_DROPPED_SELF says a count lives in the "hi 8 bit".
 // m_state is only 16 bits wide, so bits 8-15 also hold BUCKET_SHUTDOWN
 // (0x100) and BUCKET_SHUTDOWN_TO (0x200). Confirm in Suma.cpp that the
 // count and the shutdown flags are never in use at the same time.
 // The DROPPED_* comments repeat the CREATED_* wording ("New nodegroup");
 // they presumably mean a dropped nodegroup.
584  enum {
585  BUCKET_STARTING = 0x1 // On starting node
586  ,BUCKET_HANDOVER = 0x2 // On running node
587  ,BUCKET_TAKEOVER = 0x4 // On taking over node
588  ,BUCKET_RESEND = 0x8 // On taking over node
589  ,BUCKET_CREATED_SELF = 0x10 // New nodegroup (me)
590  ,BUCKET_CREATED_OTHER = 0x20 // New nodegroup (not me)
591  ,BUCKET_CREATED_MASK = (BUCKET_CREATED_SELF | BUCKET_CREATED_OTHER)
592  ,BUCKET_DROPPED_SELF = 0x40 // New nodegroup (me) uses hi 8 bit for cnt
593  ,BUCKET_DROPPED_OTHER = 0x80 // New nodegroup (not me)
594  ,BUCKET_DROPPED_MASK = (BUCKET_DROPPED_SELF | BUCKET_DROPPED_OTHER)
595  ,BUCKET_SHUTDOWN = 0x100 // Graceful shutdown
596  ,BUCKET_SHUTDOWN_TO = 0x200 // Graceful shutdown
597  };
598  Uint16 m_state;
599  Uint16 m_switchover_node;
600  Uint16 m_nodes[MAX_REPLICAS];
601  Uint32 m_buffer_tail; // Page
602  Uint64 m_switchover_gci;
603  Uint64 m_max_acked_gci;
604  Page_pos m_buffer_head;
605  };
606 
 // One buffer page: the 10 Uint32 header words below plus DATA_WORDS of data
 // make 8192 Uint32 words in total. GCI_SZ32 is presumably the number of
 // 32-bit words used to store a 64-bit GCI (compare m_max_gci_hi/_lo).
607  struct Buffer_page
608  {
609  STATIC_CONST( DATA_WORDS = 8192 - 10);
610  STATIC_CONST( GCI_SZ32 = 2 );
611 
612  Uint32 _tupdata1;
613  Uint32 _tupdata2;
614  Uint32 _tupdata3;
615  Uint32 _tupdata4;
616  Uint32 m_page_state; // Used by TUP buddy algorithm
617  Uint32 m_page_chunk_ptr_i;
618  Uint32 m_next_page;
619  Uint32 m_words_used; //
620  Uint32 m_max_gci_hi; // high 32 bits of the page's max GCI
621  Uint32 m_max_gci_lo; // low 32 bits of the page's max GCI
622  Uint32 m_data[DATA_WORDS];
623  };
624 
 // 24 is divisible by 1, 2, 3 and 4, presumably so that buckets split
 // evenly across 1-4 nodes.
625  STATIC_CONST( NO_OF_BUCKETS = 24 ); // 24 = 4! = 4*3*2*1
626  Uint32 c_no_of_buckets;
627  struct Bucket c_buckets[NO_OF_BUCKETS];
628  Uint32 c_subscriber_per_node[MAX_NODES];
629 
 // Number of 32-bit words needed for NO_OF_BUCKETS bits, rounded up.
630  STATIC_CONST( BUCKET_MASK_SIZE = (((NO_OF_BUCKETS+31)>> 5)) );
631  typedef Bitmask<BUCKET_MASK_SIZE> Bucket_mask;
632  Bucket_mask m_active_buckets;
633  Bucket_mask m_switchover_buckets;
634 
635  void init_buffers();
636  Uint32* get_buffer_ptr(Signal*, Uint32 buck, Uint64 gci, Uint32 sz);
637  Uint32 seize_page();
638  void free_page(Uint32 page_id, Buffer_page* page);
639  void out_of_buffer(Signal*);
640  void out_of_buffer_release(Signal* signal, Uint32 buck);
641 
642  void start_resend(Signal*, Uint32 bucket);
643  void resend_bucket(Signal*, Uint32 bucket, Uint64 gci,
644  Uint32 page_pos, Uint64 last_gci);
645  void release_gci(Signal*, Uint32 bucket, Uint64 gci);
646 
647  Uint64 get_current_gci(Signal*);
648 
649  void checkMaxBufferedEpochs(Signal *signal);
650 
651  Uint64 m_max_seen_gci; // FIRE_TRIG_ORD
652  Uint64 m_max_sent_gci; // FIRE_TRIG_ORD -> send
653  Uint64 m_last_complete_gci; // SUB_GCP_COMPLETE_REP
654  Uint64 m_out_of_buffer_gci;
655  Uint32 m_gcp_complete_rep_count;
656  bool m_missing_data;
657 
 // One entry per GCI, with the set of subscriber nodes. Linked in c_gcp_list.
658  struct Gcp_record
659  {
660  Uint64 m_gci;
661  NodeBitmask m_subscribers;
662  union {
663  Uint32 nextPool;
664  Uint32 nextList;
665  };
666  Uint32 prevList;
667  };
668  ArrayPool<Gcp_record> c_gcp_pool;
669  DLCFifoList<Gcp_record> c_gcp_list;
670 
 // A run of m_size buffer pages starting at m_page_id, of which m_free are
 // free. This reading is inferred from field names; confirm in Suma.cpp.
671  struct Page_chunk
672  {
673  Uint32 m_page_id;
674  Uint32 m_size;
675  Uint32 m_free;
676  union {
677  Uint32 nextPool;
678  Uint32 nextList;
679  };
680  Uint32 prevList;
681  };
682 
683  Uint32 m_first_free_page;
684  ArrayPool<Page_chunk> c_page_chunk_pool;
685  ArrayPool<Buffer_page> c_page_pool;
686 
687 #ifdef VM_TRACE
688  Uint64 m_gcp_monitor;
689 #endif
690 
691  struct SubGcpCompleteCounter
692  {
693  Uint64 m_gci;
694  Uint32 m_cnt;
695  };
696 
 // Fixed table of 10 (gci, count) entries, delimited by the min/max indexes
 // below. It presumably counts SUB_GCP_COMPLETE_REPs per GCI. Confirm in
 // Suma.cpp how index wrap-around and overflow of the 10 entries are handled.
697  Uint32 m_gcp_rep_cnt;
698  Uint32 m_min_gcp_rep_counter_index;
699  Uint32 m_max_gcp_rep_counter_index;
700  struct SubGcpCompleteCounter m_gcp_rep_counter[10];
701 
702  /* Buffer used in Suma::execALTER_TAB_REQ(). */
703  Uint32 b_dti_buf[MAX_WORDS_META_FILE];
704  Uint64 m_current_gci;
705 
706  Uint32 m_startphase;
707  Uint32 m_typeOfStart;
708 
709  void sendScanSubTableData(Signal* signal, Ptr<SyncRecord>, Uint32);
710 };
711 
712 #endif