MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DbtupProxy.cpp
1 /* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
15 
16 // can be removed if DBTUP continueB codes are moved to signaldata
17 #define DBTUP_C
18 
19 #include "DbtupProxy.hpp"
20 #include "Dbtup.hpp"
21 #include <pgman.hpp>
22 #include <signaldata/LgmanContinueB.hpp>
23 
24 #include <EventLogger.hpp>
25 extern EventLogger * g_eventLogger;
26 
27 DbtupProxy::DbtupProxy(Block_context& ctx) :
28  LocalProxy(DBTUP, ctx),
29  c_pgman(0),
30  c_tableRecSize(0),
31  c_tableRec(0)
32 {
33  // GSN_CREATE_TAB_REQ
34  addRecSignal(GSN_CREATE_TAB_REQ, &DbtupProxy::execCREATE_TAB_REQ);
35  addRecSignal(GSN_DROP_TAB_REQ, &DbtupProxy::execDROP_TAB_REQ);
36 
37  // GSN_BUILD_INDX_IMPL_REQ
38  addRecSignal(GSN_BUILD_INDX_IMPL_REQ, &DbtupProxy::execBUILD_INDX_IMPL_REQ);
39  addRecSignal(GSN_BUILD_INDX_IMPL_CONF, &DbtupProxy::execBUILD_INDX_IMPL_CONF);
40  addRecSignal(GSN_BUILD_INDX_IMPL_REF, &DbtupProxy::execBUILD_INDX_IMPL_REF);
41 }
42 
43 DbtupProxy::~DbtupProxy()
44 {
45 }
46 
48 DbtupProxy::newWorker(Uint32 instanceNo)
49 {
50  return new Dbtup(m_ctx, instanceNo);
51 }
52 
53 // GSN_READ_CONFIG_REQ
54 void
55 DbtupProxy::callREAD_CONFIG_REQ(Signal* signal)
56 {
57  const ReadConfigReq* req = (const ReadConfigReq*)signal->getDataPtr();
58  ndbrequire(req->noOfParameters == 0);
59 
61  m_ctx.m_config.getOwnConfigIterator();
62  ndbrequire(p != 0);
63 
64  ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUP_TABLE, &c_tableRecSize));
65  c_tableRec = (Uint8*)allocRecord("TableRec", sizeof(Uint8), c_tableRecSize);
66  D("proxy:" << V(c_tableRecSize));
67  Uint32 i;
68  for (i = 0; i < c_tableRecSize; i++)
69  c_tableRec[i] = 0;
70  backREAD_CONFIG_REQ(signal);
71 }
72 
73 // GSN_STTOR
74 
75 void
76 DbtupProxy::callSTTOR(Signal* signal)
77 {
78  Uint32 startPhase = signal->theData[1];
79  switch (startPhase) {
80  case 1:
81  c_pgman = (Pgman*)globalData.getBlock(PGMAN);
82  ndbrequire(c_pgman != 0);
83  break;
84  }
85  backSTTOR(signal);
86 }
87 
88 // GSN_CREATE_TAB_REQ
89 
90 void
91 DbtupProxy::execCREATE_TAB_REQ(Signal* signal)
92 {
93  const CreateTabReq* req = (const CreateTabReq*)signal->getDataPtr();
94  const Uint32 tableId = req->tableId;
95  ndbrequire(tableId < c_tableRecSize);
96  ndbrequire(c_tableRec[tableId] == 0);
97  c_tableRec[tableId] = 1;
98  D("proxy: created table" << V(tableId));
99 }
100 
101 void
102 DbtupProxy::execDROP_TAB_REQ(Signal* signal)
103 {
104  const DropTabReq* req = (const DropTabReq*)signal->getDataPtr();
105  const Uint32 tableId = req->tableId;
106  ndbrequire(tableId < c_tableRecSize);
107  c_tableRec[tableId] = 0;
108  D("proxy: dropped table" << V(tableId));
109 }
110 
111 // GSN_BUILD_INDX_IMPL_REQ
112 
113 void
114 DbtupProxy::execBUILD_INDX_IMPL_REQ(Signal* signal)
115 {
116  const BuildIndxImplReq* req = (const BuildIndxImplReq*)signal->getDataPtr();
117  Ss_BUILD_INDX_IMPL_REQ& ss = ssSeize<Ss_BUILD_INDX_IMPL_REQ>();
118  ss.m_req = *req;
119  ndbrequire(signal->getLength() == BuildIndxImplReq::SignalLength);
120  sendREQ(signal, ss);
121 }
122 
123 void
124 DbtupProxy::sendBUILD_INDX_IMPL_REQ(Signal* signal, Uint32 ssId,
125  SectionHandle* handle)
126 {
127  Ss_BUILD_INDX_IMPL_REQ& ss = ssFind<Ss_BUILD_INDX_IMPL_REQ>(ssId);
128 
129  BuildIndxImplReq* req = (BuildIndxImplReq*)signal->getDataPtrSend();
130  *req = ss.m_req;
131  req->senderRef = reference();
132  req->senderData = ssId;
133  sendSignalNoRelease(workerRef(ss.m_worker), GSN_BUILD_INDX_IMPL_REQ,
134  signal, BuildIndxImplReq::SignalLength, JBB, handle);
135 }
136 
137 void
138 DbtupProxy::execBUILD_INDX_IMPL_CONF(Signal* signal)
139 {
140  const BuildIndxImplConf* conf = (const BuildIndxImplConf*)signal->getDataPtr();
141  Uint32 ssId = conf->senderData;
142  Ss_BUILD_INDX_IMPL_REQ& ss = ssFind<Ss_BUILD_INDX_IMPL_REQ>(ssId);
143  recvCONF(signal, ss);
144 }
145 
146 void
147 DbtupProxy::execBUILD_INDX_IMPL_REF(Signal* signal)
148 {
149  const BuildIndxImplRef* ref = (const BuildIndxImplRef*)signal->getDataPtr();
150  Uint32 ssId = ref->senderData;
151  Ss_BUILD_INDX_IMPL_REQ& ss = ssFind<Ss_BUILD_INDX_IMPL_REQ>(ssId);
152  recvREF(signal, ss, ref->errorCode);
153 }
154 
155 void
156 DbtupProxy::sendBUILD_INDX_IMPL_CONF(Signal* signal, Uint32 ssId)
157 {
158  Ss_BUILD_INDX_IMPL_REQ& ss = ssFind<Ss_BUILD_INDX_IMPL_REQ>(ssId);
159  BlockReference dictRef = ss.m_req.senderRef;
160 
161  if (!lastReply(ss))
162  return;
163 
164  if (ss.m_error == 0) {
165  jam();
166  BuildIndxImplConf* conf = (BuildIndxImplConf*)signal->getDataPtrSend();
167  conf->senderRef = reference();
168  conf->senderData = ss.m_req.senderData;
169  sendSignal(dictRef, GSN_BUILD_INDX_IMPL_CONF,
170  signal, BuildIndxImplConf::SignalLength, JBB);
171  } else {
172  BuildIndxImplRef* ref = (BuildIndxImplRef*)signal->getDataPtrSend();
173  ref->senderRef = reference();
174  ref->senderData = ss.m_req.senderData;
175  ref->errorCode = ss.m_error;
176  sendSignal(dictRef, GSN_BUILD_INDX_IMPL_REF,
177  signal, BuildIndxImplRef::SignalLength, JBB);
178  }
179 
180  ssRelease<Ss_BUILD_INDX_IMPL_REQ>(ssId);
181 }
182 
183 // client methods
184 
185 // LGMAN
186 
187 DbtupProxy::Proxy_undo::Proxy_undo()
188 {
189  m_type = 0;
190  m_len = 0;
191  m_ptr = 0;
192  m_lsn = (Uint64)0;
193  m_key.setNull();
194  m_page_id = ~(Uint32)0;
195  m_table_id = ~(Uint32)0;
196  m_fragment_id = ~(Uint32)0;
197  m_instance_no = ~(Uint32)0;
198  m_actions = 0;
199  m_in_use = false;
200 }
201 
202 void
203 DbtupProxy::disk_restart_undo(Signal* signal, Uint64 lsn,
204  Uint32 type, const Uint32 * ptr, Uint32 len)
205 {
206  Proxy_undo& undo = c_proxy_undo;
207  ndbrequire(!undo.m_in_use);
208  new (&undo) Proxy_undo;
209  undo.m_in_use = true;
210 
211  D("proxy: disk_restart_undo" << V(type) << hex << V(ptr) << dec << V(len) << V(lsn));
212  undo.m_type = type;
213  undo.m_len = len;
214  undo.m_ptr = ptr;
215  undo.m_lsn = lsn;
216 
217  /*
218  * The timeslice via PGMAN/5 gives LGMAN a chance to overwrite the
219  * data pointed to by ptr. So save it now and do not use ptr.
220  */
221  ndbrequire(undo.m_len <= Proxy_undo::MaxData);
222  memcpy(undo.m_data, undo.m_ptr, undo.m_len << 2);
223 
224  switch (undo.m_type) {
225  case File_formats::Undofile::UNDO_LCP_FIRST:
226  case File_formats::Undofile::UNDO_LCP:
227  {
228  undo.m_table_id = ptr[1] >> 16;
229  undo.m_fragment_id = ptr[1] & 0xFFFF;
230  undo.m_actions |= Proxy_undo::SendToAll;
231  undo.m_actions |= Proxy_undo::SendUndoNext;
232  }
233  break;
234  case File_formats::Undofile::UNDO_TUP_ALLOC:
235  {
237  (const Dbtup::Disk_undo::Alloc*)ptr;
238  undo.m_key.m_file_no = rec->m_file_no_page_idx >> 16;
239  undo.m_key.m_page_no = rec->m_page_no;
240  undo.m_key.m_page_idx = rec->m_file_no_page_idx & 0xFFFF;
241  undo.m_actions |= Proxy_undo::ReadTupPage;
242  undo.m_actions |= Proxy_undo::GetInstance;
243  }
244  break;
245  case File_formats::Undofile::UNDO_TUP_UPDATE:
246  {
248  (const Dbtup::Disk_undo::Update*)ptr;
249  undo.m_key.m_file_no = rec->m_file_no_page_idx >> 16;
250  undo.m_key.m_page_no = rec->m_page_no;
251  undo.m_key.m_page_idx = rec->m_file_no_page_idx & 0xFFFF;
252  undo.m_actions |= Proxy_undo::ReadTupPage;
253  undo.m_actions |= Proxy_undo::GetInstance;
254  }
255  break;
256  case File_formats::Undofile::UNDO_TUP_FREE:
257  {
258  const Dbtup::Disk_undo::Free* rec =
259  (const Dbtup::Disk_undo::Free*)ptr;
260  undo.m_key.m_file_no = rec->m_file_no_page_idx >> 16;
261  undo.m_key.m_page_no = rec->m_page_no;
262  undo.m_key.m_page_idx = rec->m_file_no_page_idx & 0xFFFF;
263  undo.m_actions |= Proxy_undo::ReadTupPage;
264  undo.m_actions |= Proxy_undo::GetInstance;
265  }
266  break;
267 
268  case File_formats::Undofile::UNDO_TUP_CREATE:
269  {
270  jam();
272  Uint32 tableId = rec->m_table;
273  if (tableId < c_tableRecSize)
274  {
275  jam();
276  c_tableRec[tableId] = 0;
277  }
278 
279  undo.m_actions |= Proxy_undo::SendToAll;
280  undo.m_actions |= Proxy_undo::SendUndoNext;
281  break;
282  }
283  case File_formats::Undofile::UNDO_TUP_DROP:
284  {
285  jam();
287  Uint32 tableId = rec->m_table;
288  if (tableId < c_tableRecSize)
289  {
290  jam();
291  c_tableRec[tableId] = 0;
292  }
293 
294  undo.m_actions |= Proxy_undo::SendToAll;
295  undo.m_actions |= Proxy_undo::SendUndoNext;
296  break;
297  }
298 #if NOT_YET_UNDO_ALLOC_EXTENT
299  case File_formats::Undofile::UNDO_TUP_ALLOC_EXTENT:
300  ndbrequire(false);
301  break;
302 #endif
303 #if NOT_YET_UNDO_FREE_EXTENT
304  case File_formats::Undofile::UNDO_TUP_FREE_EXTENT:
305  ndbrequire(false);
306  break;
307 #endif
308  case File_formats::Undofile::UNDO_END:
309  {
310  undo.m_actions |= Proxy_undo::SendToAll;
311  }
312  break;
313  default:
314  ndbrequire(false);
315  break;
316  }
317 
318  if (undo.m_actions & Proxy_undo::ReadTupPage) {
319  jam();
320  /*
321  * Page request goes to the extra PGMAN worker (our thread).
322  * TUP worker reads same page again via another PGMAN worker.
323  * MT-LGMAN is planned, do not optimize (pass page) now
324  */
325  Page_cache_client pgman(this, c_pgman);
327 
328  req.m_page = undo.m_key;
329  req.m_callback.m_callbackData = 0;
330  req.m_callback.m_callbackFunction =
331  safe_cast(&DbtupProxy::disk_restart_undo_callback);
332 
333  int ret = pgman.get_page(signal, req, 0);
334  ndbrequire(ret >= 0);
335  if (ret > 0) {
336  jam();
337  execute(signal, req.m_callback, (Uint32)ret);
338  }
339  return;
340  }
341 
342  disk_restart_undo_finish(signal);
343 }
344 
345 void
346 DbtupProxy::disk_restart_undo_callback(Signal* signal, Uint32, Uint32 page_id)
347 {
348  Proxy_undo& undo = c_proxy_undo;
349  undo.m_page_id = page_id;
350 
351  Ptr<GlobalPage> gptr;
352  m_global_page_pool.getPtr(gptr, undo.m_page_id);
353 
354  ndbrequire(undo.m_actions & Proxy_undo::ReadTupPage);
355  {
356  jam();
357  const Tup_page* page = (const Tup_page*)gptr.p;
358  const File_formats::Page_header& header = page->m_page_header;
359  const Uint32 page_type = header.m_page_type;
360 
361  if (page_type == 0) { // wl4391_todo ?
362  jam();
363  ndbrequire(header.m_page_lsn_hi == 0 && header.m_page_lsn_lo == 0);
364  undo.m_actions |= Proxy_undo::NoExecute;
365  undo.m_actions |= Proxy_undo::SendUndoNext;
366  D("proxy: callback" << V(page_type));
367  } else {
368  ndbrequire(page_type == File_formats::PT_Tup_fixsize_page ||
369  page_type == File_formats::PT_Tup_varsize_page);
370 
371  Uint64 page_lsn = (Uint64(header.m_page_lsn_hi) << 32) + header.m_page_lsn_lo;
372  if (! (undo.m_lsn <= page_lsn))
373  {
374  jam();
375  undo.m_actions |= Proxy_undo::NoExecute;
376  undo.m_actions |= Proxy_undo::SendUndoNext;
377  }
378 
379  undo.m_table_id = page->m_table_id;
380  undo.m_fragment_id = page->m_fragment_id;
381  D("proxy: callback" << V(undo.m_table_id) << V(undo.m_fragment_id));
382  const Uint32 tableId = undo.m_table_id;
383  if (tableId >= c_tableRecSize || c_tableRec[tableId] == 0) {
384  D("proxy: table dropped" << V(tableId));
385  undo.m_actions |= Proxy_undo::NoExecute;
386  undo.m_actions |= Proxy_undo::SendUndoNext;
387  }
388  }
389  }
390 
391  disk_restart_undo_finish(signal);
392 }
393 
394 void
395 DbtupProxy::disk_restart_undo_finish(Signal* signal)
396 {
397  Proxy_undo& undo = c_proxy_undo;
398 
399  if (undo.m_actions & Proxy_undo::SendUndoNext) {
400  jam();
401  signal->theData[0] = LgmanContinueB::EXECUTE_UNDO_RECORD;
402  sendSignal(LGMAN_REF, GSN_CONTINUEB, signal, 1, JBB);
403  }
404 
405  if (undo.m_actions & Proxy_undo::NoExecute) {
406  jam();
407  goto finish;
408  }
409 
410  if (undo.m_actions & Proxy_undo::GetInstance) {
411  jam();
412  Uint32 instanceKey = getInstanceKey(undo.m_table_id, undo.m_fragment_id);
413  Uint32 instanceNo = getInstanceFromKey(instanceKey);
414  undo.m_instance_no = instanceNo;
415  }
416 
417  if (!(undo.m_actions & Proxy_undo::SendToAll)) {
418  jam();
419  ndbrequire(undo.m_instance_no != 0);
420  Uint32 i = undo.m_instance_no - 1;
421  disk_restart_undo_send(signal, i);
422  } else {
423  jam();
424  Uint32 i;
425  for (i = 0; i < c_workers; i++) {
426  disk_restart_undo_send(signal, i);
427  }
428  }
429 
430 finish:
431  ndbrequire(undo.m_in_use);
432  undo.m_in_use = false;
433 }
434 
435 void
436 DbtupProxy::disk_restart_undo_send(Signal* signal, Uint32 i)
437 {
438  /*
439  * Send undo entry via long signal because:
440  * 1) a method call would execute in another non-mutexed Pgman
441  * 2) MT-LGMAN is planned, do not optimize (pass pointer) now
442  */
443  Proxy_undo& undo = c_proxy_undo;
444 
445  LinearSectionPtr ptr[3];
446  ptr[0].p = undo.m_data;
447  ptr[0].sz = undo.m_len;
448 
449  signal->theData[0] = ZDISK_RESTART_UNDO;
450  signal->theData[1] = undo.m_type;
451  signal->theData[2] = undo.m_len;
452  signal->theData[3] = (Uint32)(undo.m_lsn >> 32);
453  signal->theData[4] = (Uint32)(undo.m_lsn & 0xFFFFFFFF);
454  sendSignal(workerRef(i), GSN_CONTINUEB, signal, 5, JBB, ptr, 1);
455 }
456 
457 // TSMAN
458 
459 int
460 DbtupProxy::disk_restart_alloc_extent(Uint32 tableId, Uint32 fragId,
461  const Local_key* key, Uint32 pages)
462 {
463  if (tableId >= c_tableRecSize || c_tableRec[tableId] == 0) {
464  jam();
465  D("proxy: table dropped" << V(tableId));
466  return -1;
467  }
468 
469  // local call so mapping instance key to number is ok
470  Uint32 instanceKey = getInstanceKey(tableId, fragId);
471  Uint32 instanceNo = getInstanceFromKey(instanceKey);
472 
473  Uint32 i = workerIndex(instanceNo);
474  Dbtup* dbtup = (Dbtup*)workerBlock(i);
475  return dbtup->disk_restart_alloc_extent(tableId, fragId, key, pages);
476 }
477 
478 void
479 DbtupProxy::disk_restart_page_bits(Uint32 tableId, Uint32 fragId,
480  const Local_key* key, Uint32 bits)
481 {
482  ndbrequire(tableId < c_tableRecSize && c_tableRec[tableId] == 1);
483 
484  // local call so mapping instance key to number is ok
485  Uint32 instanceKey = getInstanceKey(tableId, fragId);
486  Uint32 instanceNo = getInstanceFromKey(instanceKey);
487 
488  Uint32 i = workerIndex(instanceNo);
489  Dbtup* dbtup = (Dbtup*)workerBlock(i);
490  dbtup->disk_restart_page_bits(tableId, fragId, key, bits);
491 }
492 
493 BLOCK_FUNCTIONS(DbtupProxy)