MySQL 5.6.14 Source Code Document
LocalProxy.hpp
1 /* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
15 
16 #ifndef NDB_LOCAL_PROXY_HPP
17 #define NDB_LOCAL_PROXY_HPP
18 
19 #include <pc.hpp>
20 #include <SimulatedBlock.hpp>
21 #include <Bitmask.hpp>
22 #include <DLFifoList.hpp>
23 #include <signaldata/ReadConfig.hpp>
24 #include <signaldata/NdbSttor.hpp>
25 #include <signaldata/ReadNodesConf.hpp>
26 #include <signaldata/NodeFailRep.hpp>
27 #include <signaldata/NodeStateSignalData.hpp>
28 #include <signaldata/NFCompleteRep.hpp>
29 #include <signaldata/CreateTrigImpl.hpp>
30 #include <signaldata/DropTrigImpl.hpp>
31 #include <signaldata/DbinfoScan.hpp>
32 #include <signaldata/Sync.hpp>
33 
34 /*
35  * Proxy blocks for MT LQH.
36  *
37  * The LQH proxy is the LQH block seen by other nodes and blocks,
38  * unless by-passed for efficiency. Real LQH instances (workers)
39  * run behind it. The instance number is 1 + worker index.
40  *
41  * There are also proxies and workers for ACC, TUP, TUX, BACKUP,
42  * RESTORE, and PGMAN. Proxy classes are subclasses of LocalProxy.
43  * Workers with the same instance number (one from each class) run in
44  * the same thread.
45  *
46  * After LQH workers there is an optional extra worker. It runs
47  * in the thread of the main block (i.e. the proxy). Its instance
48  * number is fixed as 1 + MaxLqhWorkers (currently 5) i.e. it skips
49  * over any unused LQH instance numbers.
50  */
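/*
 * For illustration (a hypothetical configuration, not taken from this
 * file): with 2 LQH workers configured and MaxLqhWorkers = 4, worker
 * indexes 0 and 1 map to instances 1 and 2, while the extra worker has
 * index 2 (c_lqhWorkers) but instance 5 (1 + MaxLqhWorkers), so
 * instances 3 and 4 are simply unused.  See workerInstance() and
 * workerIndex() below.
 */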
51 
52 class LocalProxy : public SimulatedBlock {
53 public:
54  LocalProxy(BlockNumber blockNumber, Block_context& ctx);
55  virtual ~LocalProxy();
56  BLOCK_DEFINES(LocalProxy);
57 
58 protected:
59  enum { MaxLqhWorkers = MAX_NDBMT_LQH_WORKERS };
60  enum { MaxExtraWorkers = 1 };
61  enum { MaxWorkers = MaxLqhWorkers + MaxExtraWorkers };
62  typedef Bitmask<(MaxWorkers+31)/32> WorkerMask;
63  Uint32 c_lqhWorkers;
64  Uint32 c_extraWorkers;
65  Uint32 c_workers;
66  // no gaps - extra worker has index c_lqhWorkers (not MaxLqhWorkers)
67  SimulatedBlock* c_worker[MaxWorkers];
68 
69  virtual SimulatedBlock* newWorker(Uint32 instanceNo) = 0;
70  virtual void loadWorkers();
71 
72  // get worker block by index (not by instance)
73 
74  SimulatedBlock* workerBlock(Uint32 i) {
75  ndbrequire(i < c_workers);
76  ndbrequire(c_worker[i] != 0);
77  return c_worker[i];
78  }
79 
80  SimulatedBlock* extraWorkerBlock() {
81  return workerBlock(c_lqhWorkers);
82  }
83 
84  // get worker block reference by index (not by instance)
85 
86  BlockReference workerRef(Uint32 i) {
87  return numberToRef(number(), workerInstance(i), getOwnNodeId());
88  }
89 
90  BlockReference extraWorkerRef() {
91  ndbrequire(c_workers == c_lqhWorkers + 1);
92  Uint32 i = c_lqhWorkers;
93  return workerRef(i);
94  }
95 
96  // convert between worker index and worker instance
97 
98  Uint32 workerInstance(Uint32 i) const {
99  ndbrequire(i < c_workers);
100  Uint32 ino;
101  if (i < c_lqhWorkers)
102  ino = 1 + i;
103  else
104  ino = 1 + MaxLqhWorkers;
105  return ino;
106  }
107 
108  Uint32 workerIndex(Uint32 ino) const {
109  ndbrequire(ino != 0);
110  Uint32 i;
111  if (ino != 1 + MaxLqhWorkers)
112  i = ino - 1;
113  else
114  i = c_lqhWorkers;
115  ndbrequire(i < c_workers);
116  return i;
117  }
118 
119  // support routines and classes ("Ss" = signal state)
120 
121  typedef void (LocalProxy::*SsFUNCREQ)(Signal*, Uint32 ssId, SectionHandle*);
122  typedef void (LocalProxy::*SsFUNCREP)(Signal*, Uint32 ssId);
123 
124  struct SsCommon {
125  Uint32 m_ssId; // unique id in SsPool (below)
126  SsFUNCREQ m_sendREQ; // from proxy to worker
127  SsFUNCREP m_sendCONF; // from proxy to caller
128  Uint32 m_worker; // current worker
129  Uint32 m_error;
130  Uint32 m_sec_cnt;
131  Uint32 m_sec_ptr[3];
132  static const char* name() { return "UNDEF"; }
133  SsCommon() {
134  m_ssId = 0;
135  m_sendREQ = 0;
136  m_sendCONF = 0;
137  m_worker = 0;
138  m_error = 0;
139  m_sec_cnt = 0;
140  }
141  };
142 
143  // run workers sequentially
144  struct SsSequential : SsCommon {
145  SsSequential() {}
146  };
147  void sendREQ(Signal*, SsSequential& ss);
148  void recvCONF(Signal*, SsSequential& ss);
149  void recvREF(Signal*, SsSequential& ss, Uint32 error);
150  // for use in sendREQ
151  void skipReq(SsSequential& ss);
152  void skipConf(SsSequential& ss);
153  // for use in sendCONF
154  bool firstReply(const SsSequential& ss);
155  bool lastReply(const SsSequential& ss);
156 
157  void saveSections(SsCommon&ss, SectionHandle&);
158  void restoreHandle(SectionHandle&, SsCommon&);
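 // saveSections() is presumably used to stash the sections of an incoming
 // long signal in SsCommon::m_sec_cnt / m_sec_ptr so that restoreHandle()
 // can re-attach them to a SectionHandle when the request is forwarded.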
159 
160  // run workers in parallel
161  struct SsParallel : SsCommon {
162  WorkerMask m_workerMask;
163  bool m_extraLast; // run extra after LQH workers
164  Uint32 m_extraSent;
165  SsParallel() {
166  m_extraLast = false;
167  m_extraSent = 0;
168  }
169  };
170  void sendREQ(Signal*, SsParallel& ss);
171  void recvCONF(Signal*, SsParallel& ss);
172  void recvREF(Signal*, SsParallel& ss, Uint32 error);
173  // for use in sendREQ
174  void skipReq(SsParallel& ss);
175  void skipConf(SsParallel& ss);
176  // for use in sendCONF
177  bool firstReply(const SsParallel& ss);
178  bool lastReply(const SsParallel& ss);
179  bool lastExtra(Signal* signal, SsParallel& ss);
180  // set all or given bits in worker mask
181  void setMask(SsParallel& ss);
182  void setMask(SsParallel& ss, const WorkerMask& mask);
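To show how these pieces fit together, here is a rough sketch of proxy-side handlers for a hypothetical parallel fan-out signal pair FOO_REQ / FOO_CONF (FooReq, FooConf, GSN_FOO_REQ and Ss_FOO_REQ are illustrative names, not defined in this file; the generic sendREQ / recvCONF helpers are implemented in LocalProxy.cpp):

 void LocalProxy::execFOO_REQ(Signal* signal)
 {
   // one outstanding request at a time: fixed ssId 1, poolSize = 1
   Ss_FOO_REQ& ss = ssSeize<Ss_FOO_REQ>(1);
   ss.m_req = *(const FooReq*)signal->getDataPtr();
   sendREQ(signal, ss);          // expected to call ss.m_sendREQ per worker
 }

 void LocalProxy::sendFOO_REQ(Signal* signal, Uint32 ssId, SectionHandle* handle)
 {
   Ss_FOO_REQ& ss = ssFind<Ss_FOO_REQ>(ssId);
   FooReq* req = (FooReq*)signal->getDataPtrSend();
   *req = ss.m_req;
   req->senderData = ssId;       // worker echoes this back in its CONF
   sendSignalNoRelease(workerRef(ss.m_worker), GSN_FOO_REQ,
                       signal, FooReq::SignalLength, JBB, handle);
 }

 void LocalProxy::execFOO_CONF(Signal* signal)
 {
   const FooConf* conf = (const FooConf*)signal->getDataPtr();
   Ss_FOO_REQ& ss = ssFind<Ss_FOO_REQ>(conf->senderData);
   recvCONF(signal, ss);         // expected to record the reply and call ss.m_sendCONF
 }

 void LocalProxy::sendFOO_CONF(Signal* signal, Uint32 ssId)
 {
   Ss_FOO_REQ& ss = ssFind<Ss_FOO_REQ>(ssId);
   if (!lastReply(ss))
     return;                     // still waiting for other workers
   // ... send FOO_CONF (or FOO_REF if ss.m_error != 0) to the original sender ...
   ssRelease<Ss_FOO_REQ>(ssId);
 }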
183 
184  /*
185  * Ss instances are seized from a pool. Each pool is simply an array
186  * of Ss instances. Usually poolSize is 1. Some signals need a few
187  * more but the heavy stuff (query/DML) by-passes the proxy.
188  *
189  * Each Ss instance has a unique Uint32 ssId. If there are multiple
190  * instances then ssId must be computable from signal data. One option
191  * often is to use a generated ssId and set it as senderData.
192  */
193 
194  template <class Ss>
195  struct SsPool {
196  Ss m_pool[Ss::poolSize];
197  Uint32 m_usage;
198  SsPool() {
199  m_usage = 0;
200  }
201  };
202 
203  Uint32 c_ssIdSeq;
204 
205  // convenient for adding non-zero high bit
206  enum { SsIdBase = (1u << 31) };
207 
208  template <class Ss>
209  Ss* ssSearch(Uint32 ssId)
210  {
211  SsPool<Ss>& sp = Ss::pool(this);
212  Ss* ssptr = 0;
213  for (Uint32 i = 0; i < Ss::poolSize; i++) {
214  if (sp.m_pool[i].m_ssId == ssId) {
215  ssptr = &sp.m_pool[i];
216  break;
217  }
218  }
219  return ssptr;
220  }
221 
222  template <class Ss>
223  Ss& ssSeize() {
224  const Uint32 base = SsIdBase;
225  const Uint32 mask = ~base;
226  const Uint32 ssId = base | c_ssIdSeq;
227  c_ssIdSeq = (c_ssIdSeq + 1) & mask;
228  return ssSeize<Ss>(ssId);
229  }
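 // e.g. starting from c_ssIdSeq == 0 the generated ids are 0x80000000,
 // 0x80000001, ...; OR-ing in SsIdBase keeps a generated ssId non-zero,
 // since ssId 0 marks a free slot in the pool (see ssSearch / ssRelease).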
230 
231  template <class Ss>
232  Ss& ssSeize(Uint32 ssId) {
233  SsPool<Ss>& sp = Ss::pool(this);
234  ndbrequire(sp.m_usage < Ss::poolSize);
235  ndbrequire(ssId != 0);
236  Ss* ssptr;
237  // check for duplicate
238  ssptr = ssSearch<Ss>(ssId);
239  ndbrequire(ssptr == 0);
240  // search for free
241  ssptr = ssSearch<Ss>(0);
242  ndbrequire(ssptr != 0);
243  // set methods, clear bitmasks, etc
244  new (ssptr) Ss;
245  ssptr->m_ssId = ssId;
246  sp.m_usage++;
247  D("ssSeize" << V(sp.m_usage) << hex << V(ssId) << " " << Ss::name());
248  return *ssptr;
249  }
250 
251  template <class Ss>
252  Ss& ssFind(Uint32 ssId) {
253  ndbrequire(ssId != 0);
254  Ss* ssptr = ssSearch<Ss>(ssId);
255  ndbrequire(ssptr != 0);
256  return *ssptr;
257  }
258 
259  /*
260  * In some cases it may not be known if this is the first request.
261  * This situation should be avoided by adding signal data or
262  * by keeping state in the proxy instance.
263  */
264  template <class Ss>
265  Ss& ssFindSeize(Uint32 ssId, bool* found) {
266  ndbrequire(ssId != 0);
267  Ss* ssptr = ssSearch<Ss>(ssId);
268  if (ssptr != 0) {
269  if (found)
270  *found = true;
271  return *ssptr;
272  }
273  if (found)
274  *found = false;
275  return ssSeize<Ss>(ssId);
276  }
277 
278  template <class Ss>
279  void ssRelease(Uint32 ssId) {
280  SsPool<Ss>& sp = Ss::pool(this);
281  ndbrequire(sp.m_usage != 0);
282  ndbrequire(ssId != 0);
283  D("ssRelease" << V(sp.m_usage) << hex << V(ssId) << " " << Ss::name());
284  Ss* ssptr = ssSearch<Ss>(ssId);
285  ndbrequire(ssptr != 0);
286  ssptr->m_ssId = 0;
287  ndbrequire(sp.m_usage > 0);
288  sp.m_usage--;
289  }
290 
291  template <class Ss>
292  void ssRelease(Ss& ss) {
293  ssRelease<Ss>(ss.m_ssId);
294  }
295 
296  /*
297  * In some cases a full pool is handled via a delayed signal.
298  * wl4391_todo maybe use CONTINUEB and guard against infinite loop.
299  */
300  template <class Ss>
301  bool ssQueue(Signal* signal) {
302  SsPool<Ss>& sp = Ss::pool(this);
303  if (sp.m_usage < Ss::poolSize)
304  return false;
305 
306  SectionHandle handle(this, signal);
307  GlobalSignalNumber gsn = signal->header.theVerId_signalNumber & 0xFFFF;
308  sendSignalWithDelay(reference(), gsn,
309  signal, 10, signal->length(), &handle);
310  return true;
311  }
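 // A typical exec routine would presumably start with
 //   if (ssQueue<Ss_FOO_REQ>(signal))   // hypothetical Ss class
 //     return;
 // so that a request arriving while the pool is full is retried after the
 // delay instead of hitting the ndbrequire in ssSeize().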
312 
313  // system info
314 
315  Uint32 c_typeOfStart;
316  Uint32 c_masterNodeId;
317 
318  // GSN_READ_CONFIG_REQ
319  struct Ss_READ_CONFIG_REQ : SsSequential {
320  ReadConfigReq m_req;
321  Ss_READ_CONFIG_REQ() {
322  m_sendREQ = &LocalProxy::sendREAD_CONFIG_REQ;
323  m_sendCONF = &LocalProxy::sendREAD_CONFIG_CONF;
324  }
325  enum { poolSize = 1 };
326  static SsPool<Ss_READ_CONFIG_REQ>& pool(LocalProxy* proxy) {
327  return proxy->c_ss_READ_CONFIG_REQ;
328  }
329  };
330  SsPool<Ss_READ_CONFIG_REQ> c_ss_READ_CONFIG_REQ;
331  void execREAD_CONFIG_REQ(Signal*);
332  virtual void callREAD_CONFIG_REQ(Signal*);
333  void backREAD_CONFIG_REQ(Signal*);
334  void sendREAD_CONFIG_REQ(Signal*, Uint32 ssId, SectionHandle*);
335  void execREAD_CONFIG_CONF(Signal*);
336  void sendREAD_CONFIG_CONF(Signal*, Uint32 ssId);
337 
338  // GSN_STTOR
339  struct Ss_STTOR : SsParallel {
340  Uint32 m_reqlength;
341  Uint32 m_reqdata[25];
342  Uint32 m_conflength;
343  Uint32 m_confdata[25];
344  Ss_STTOR() {
345  m_sendREQ = &LocalProxy::sendSTTOR;
346  m_sendCONF = &LocalProxy::sendSTTORRY;
347  }
348  enum { poolSize = 1 };
349  static SsPool<Ss_STTOR>& pool(LocalProxy* proxy) {
350  return proxy->c_ss_STTOR;
351  }
352  };
353  SsPool<Ss_STTOR> c_ss_STTOR;
354  void execSTTOR(Signal*);
355  virtual void callSTTOR(Signal*);
356  void backSTTOR(Signal*);
357  void sendSTTOR(Signal*, Uint32 ssId, SectionHandle*);
358  void execSTTORRY(Signal*);
359  void sendSTTORRY(Signal*, Uint32 ssId);
360 
361  // GSN_NDB_STTOR
362  struct Ss_NDB_STTOR : SsParallel {
363  NdbSttor m_req;
364  enum { m_reqlength = sizeof(NdbSttor) >> 2 };
365  Ss_NDB_STTOR() {
366  m_sendREQ = &LocalProxy::sendNDB_STTOR;
367  m_sendCONF = &LocalProxy::sendNDB_STTORRY;
368  }
369  enum { poolSize = 1 };
370  static SsPool<Ss_NDB_STTOR>& pool(LocalProxy* proxy) {
371  return proxy->c_ss_NDB_STTOR;
372  }
373  };
374  SsPool<Ss_NDB_STTOR> c_ss_NDB_STTOR;
375  void execNDB_STTOR(Signal*);
376  virtual void callNDB_STTOR(Signal*);
377  void backNDB_STTOR(Signal*);
378  void sendNDB_STTOR(Signal*, Uint32 ssId, SectionHandle*);
379  void execNDB_STTORRY(Signal*);
380  void sendNDB_STTORRY(Signal*, Uint32 ssId);
381 
382  // GSN_READ_NODESREQ
383  struct Ss_READ_NODES_REQ {
384  GlobalSignalNumber m_gsn; // STTOR or NDB_STTOR
385  Ss_READ_NODES_REQ() {
386  m_gsn = 0;
387  }
388  };
389  Ss_READ_NODES_REQ c_ss_READ_NODESREQ;
390  void sendREAD_NODESREQ(Signal*);
391  void execREAD_NODESCONF(Signal*);
392  void execREAD_NODESREF(Signal*);
393 
394  // GSN_NODE_FAILREP
395  struct Ss_NODE_FAILREP : SsParallel {
396  NodeFailRep m_req;
397  // REQ sends an NdbNodeBitmask but CONF sends one nodeId at a time
398  NdbNodeBitmask m_waitFor[MaxWorkers];
399  Ss_NODE_FAILREP() {
400  m_sendREQ = &LocalProxy::sendNODE_FAILREP;
401  m_sendCONF = &LocalProxy::sendNF_COMPLETEREP;
402  }
403  // some blocks do not reply
404  static bool noReply(BlockNumber blockNo) {
405  return
406  blockNo == BACKUP;
407  }
408  enum { poolSize = 1 };
409  static SsPool<Ss_NODE_FAILREP>& pool(LocalProxy* proxy) {
410  return proxy->c_ss_NODE_FAILREP;
411  }
412  };
413  SsPool<Ss_NODE_FAILREP> c_ss_NODE_FAILREP;
414  void execNODE_FAILREP(Signal*);
415  void sendNODE_FAILREP(Signal*, Uint32 ssId, SectionHandle*);
416  void execNF_COMPLETEREP(Signal*);
417  void sendNF_COMPLETEREP(Signal*, Uint32 ssId);
418 
419  // GSN_INCL_NODEREQ
420  struct Ss_INCL_NODEREQ : SsParallel {
421  // future-proof by allocating max length
422  struct Req {
423  Uint32 senderRef;
424  Uint32 inclNodeId;
425  Uint32 word[23];
426  };
427  struct Conf {
428  Uint32 inclNodeId;
429  Uint32 senderRef;
430  };
431  Uint32 m_reqlength;
432  Req m_req;
433  Ss_INCL_NODEREQ() {
434  m_sendREQ = &LocalProxy::sendINCL_NODEREQ;
435  m_sendCONF = &LocalProxy::sendINCL_NODECONF;
436  }
437  enum { poolSize = 1 };
438  static SsPool<Ss_INCL_NODEREQ>& pool(LocalProxy* proxy) {
439  return proxy->c_ss_INCL_NODEREQ;
440  }
441  };
442  SsPool<Ss_INCL_NODEREQ> c_ss_INCL_NODEREQ;
443  void execINCL_NODEREQ(Signal*);
444  void sendINCL_NODEREQ(Signal*, Uint32 ssId, SectionHandle*);
445  void execINCL_NODECONF(Signal*);
446  void sendINCL_NODECONF(Signal*, Uint32 ssId);
447 
448  // GSN_NODE_STATE_REP
449  struct Ss_NODE_STATE_REP : SsParallel {
450  Ss_NODE_STATE_REP() {
451  m_sendREQ = &LocalProxy::sendNODE_STATE_REP;
452  m_sendCONF = 0;
453  }
454  enum { poolSize = 1 };
455  static SsPool<Ss_NODE_STATE_REP>& pool(LocalProxy* proxy) {
456  return proxy->c_ss_NODE_STATE_REP;
457  }
458  };
459  SsPool<Ss_NODE_STATE_REP> c_ss_NODE_STATE_REP;
460  void execNODE_STATE_REP(Signal*);
461  void sendNODE_STATE_REP(Signal*, Uint32 ssId, SectionHandle*);
462 
463  // GSN_CHANGE_NODE_STATE_REQ
464  struct Ss_CHANGE_NODE_STATE_REQ : SsParallel {
465  ChangeNodeStateReq m_req;
466  Ss_CHANGE_NODE_STATE_REQ() {
467  m_sendREQ = &LocalProxy::sendCHANGE_NODE_STATE_REQ;
468  m_sendCONF = &LocalProxy::sendCHANGE_NODE_STATE_CONF;
469  }
470  enum { poolSize = 1 };
471  static SsPool<Ss_CHANGE_NODE_STATE_REQ>& pool(LocalProxy* proxy) {
472  return proxy->c_ss_CHANGE_NODE_STATE_REQ;
473  }
474  };
475  SsPool<Ss_CHANGE_NODE_STATE_REQ> c_ss_CHANGE_NODE_STATE_REQ;
476  void execCHANGE_NODE_STATE_REQ(Signal*);
477  void sendCHANGE_NODE_STATE_REQ(Signal*, Uint32 ssId, SectionHandle*);
478  void execCHANGE_NODE_STATE_CONF(Signal*);
479  void sendCHANGE_NODE_STATE_CONF(Signal*, Uint32 ssId);
480 
481  // GSN_DUMP_STATE_ORD
482  struct Ss_DUMP_STATE_ORD : SsParallel {
483  Uint32 m_reqlength;
484  Uint32 m_reqdata[25];
485  Ss_DUMP_STATE_ORD() {
486  m_sendREQ = &LocalProxy::sendDUMP_STATE_ORD;
487  m_sendCONF = 0;
488  }
489  enum { poolSize = 1 };
490  static SsPool<Ss_DUMP_STATE_ORD>& pool(LocalProxy* proxy) {
491  return proxy->c_ss_DUMP_STATE_ORD;
492  }
493  };
494  SsPool<Ss_DUMP_STATE_ORD> c_ss_DUMP_STATE_ORD;
495  void execDUMP_STATE_ORD(Signal*);
496  void sendDUMP_STATE_ORD(Signal*, Uint32 ssId, SectionHandle*);
497 
498  // GSN_NDB_TAMPER
499  struct Ss_NDB_TAMPER : SsParallel {
500  Uint32 m_errorInsert;
501  Ss_NDB_TAMPER() {
502  m_sendREQ = &LocalProxy::sendNDB_TAMPER;
503  m_sendCONF = 0;
504  }
505  enum { poolSize = 1 };
506  static SsPool<Ss_NDB_TAMPER>& pool(LocalProxy* proxy) {
507  return proxy->c_ss_NDB_TAMPER;
508  }
509  };
510  SsPool<Ss_NDB_TAMPER> c_ss_NDB_TAMPER;
511  void execNDB_TAMPER(Signal*);
512  void sendNDB_TAMPER(Signal*, Uint32 ssId, SectionHandle*);
513 
514  // GSN_TIME_SIGNAL
515  struct Ss_TIME_SIGNAL : SsParallel {
516  Ss_TIME_SIGNAL() {
517  m_sendREQ = &LocalProxy::sendTIME_SIGNAL;
518  m_sendCONF = 0;
519  }
520  enum { poolSize = 1 };
521  static SsPool<Ss_TIME_SIGNAL>& pool(LocalProxy* proxy) {
522  return proxy->c_ss_TIME_SIGNAL;
523  }
524  };
525  SsPool<Ss_TIME_SIGNAL> c_ss_TIME_SIGNAL;
526  void execTIME_SIGNAL(Signal*);
527  void sendTIME_SIGNAL(Signal*, Uint32 ssId, SectionHandle*);
528 
529  // GSN_CREATE_TRIG_IMPL_REQ
530  struct Ss_CREATE_TRIG_IMPL_REQ : SsParallel {
531  CreateTrigImplReq m_req;
532  Ss_CREATE_TRIG_IMPL_REQ() {
533  m_sendREQ = &LocalProxy::sendCREATE_TRIG_IMPL_REQ;
534  m_sendCONF = &LocalProxy::sendCREATE_TRIG_IMPL_CONF;
535  }
536  enum { poolSize = 3 };
537  static SsPool<Ss_CREATE_TRIG_IMPL_REQ>& pool(LocalProxy* proxy) {
538  return proxy->c_ss_CREATE_TRIG_IMPL_REQ;
539  }
540  };
541  SsPool<Ss_CREATE_TRIG_IMPL_REQ> c_ss_CREATE_TRIG_IMPL_REQ;
542  void execCREATE_TRIG_IMPL_REQ(Signal*);
543  void sendCREATE_TRIG_IMPL_REQ(Signal*, Uint32 ssId, SectionHandle*);
544  void execCREATE_TRIG_IMPL_CONF(Signal*);
545  void execCREATE_TRIG_IMPL_REF(Signal*);
546  void sendCREATE_TRIG_IMPL_CONF(Signal*, Uint32 ssId);
547 
548  // GSN_DROP_TRIG_IMPL_REQ
549  struct Ss_DROP_TRIG_IMPL_REQ : SsParallel {
550  DropTrigImplReq m_req;
551  Ss_DROP_TRIG_IMPL_REQ() {
552  m_sendREQ = &LocalProxy::sendDROP_TRIG_IMPL_REQ;
553  m_sendCONF = &LocalProxy::sendDROP_TRIG_IMPL_CONF;
554  }
555  enum { poolSize = 21 };
556  static SsPool<Ss_DROP_TRIG_IMPL_REQ>& pool(LocalProxy* proxy) {
557  return proxy->c_ss_DROP_TRIG_IMPL_REQ;
558  }
559  };
560  SsPool<Ss_DROP_TRIG_IMPL_REQ> c_ss_DROP_TRIG_IMPL_REQ;
561  void execDROP_TRIG_IMPL_REQ(Signal*);
562  void sendDROP_TRIG_IMPL_REQ(Signal*, Uint32 ssId, SectionHandle*);
563  void execDROP_TRIG_IMPL_CONF(Signal*);
564  void execDROP_TRIG_IMPL_REF(Signal*);
565  void sendDROP_TRIG_IMPL_CONF(Signal*, Uint32 ssId);
566 
567  // GSN_DBINFO_SCANREQ
568  bool find_next(Ndbinfo::ScanCursor* cursor) const;
569  void execDBINFO_SCANREQ(Signal*);
570  void execDBINFO_SCANCONF(Signal*);
571 
572  // GSN_SYNC_REQ
573  void execSYNC_REQ(Signal*);
574  void execSYNC_REF(Signal*);
575  void execSYNC_CONF(Signal*);
576  void sendSYNC_REQ(Signal*, Uint32 ssId, SectionHandle*);
577  void sendSYNC_CONF(Signal*, Uint32 ssId);
578  struct Ss_SYNC_REQ : SsParallel {
579  SyncReq m_req;
580  Ss_SYNC_REQ() {
581  m_sendREQ = &LocalProxy::sendSYNC_REQ;
582  m_sendCONF = &LocalProxy::sendSYNC_CONF;
583  }
584  enum { poolSize = 4 };
585  static SsPool<Ss_SYNC_REQ>& pool(LocalProxy* proxy) {
586  return proxy->c_ss_SYNC_REQ;
587  }
588  };
589  SsPool<Ss_SYNC_REQ> c_ss_SYNC_REQ;
590 
591  void execSYNC_PATH_REQ(Signal*);
592 };
593 
594 #endif
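
A concrete proxy block specializes this class mainly by creating its worker instances. A minimal sketch, assuming a hypothetical block Foo with block number constant FOO (the real subclasses, e.g. DblqhProxy or BackupProxy, live in their own source files):

 class FooProxy : public LocalProxy {
 public:
   FooProxy(Block_context& ctx)
     : LocalProxy(FOO, ctx)      // FOO: hypothetical block number constant
   {}
   virtual ~FooProxy() {}
 protected:
   // create the real block instance for the given worker instance number;
   // m_ctx is assumed to be the Block_context kept by SimulatedBlock
   virtual SimulatedBlock* newWorker(Uint32 instanceNo) {
     return new Foo(m_ctx, instanceNo);
   }
 };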