MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
lgman.cpp
1 /*
2  Copyright (c) 2005, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include "lgman.hpp"
19 #include "diskpage.hpp"
20 #include <signaldata/FsRef.hpp>
21 #include <signaldata/FsConf.hpp>
22 #include <signaldata/FsOpenReq.hpp>
23 #include <signaldata/FsCloseReq.hpp>
24 #include <signaldata/CreateFilegroupImpl.hpp>
25 #include <signaldata/DropFilegroupImpl.hpp>
26 #include <signaldata/FsReadWriteReq.hpp>
27 #include <signaldata/LCP.hpp>
28 #include <signaldata/SumaImpl.hpp>
29 #include <signaldata/LgmanContinueB.hpp>
30 #include <signaldata/GetTabInfo.hpp>
31 #include <signaldata/NodeFailRep.hpp>
32 #include <signaldata/DbinfoScan.hpp>
33 #include "dbtup/Dbtup.hpp"
34 
35 #include <EventLogger.hpp>
36 extern EventLogger * g_eventLogger;
37 
38 #include <record_types.hpp>
39 
54 #define DEBUG_UNDO_EXECUTION 0
55 #define DEBUG_SEARCH_LOG_HEAD 0
56 
57 #define FREE_BUFFER_MARGIN (2 * File_formats::UNDO_PAGE_WORDS)
58 
59 Lgman::Lgman(Block_context & ctx) :
60  SimulatedBlock(LGMAN, ctx),
61  m_tup(0),
62  m_logfile_group_list(m_logfile_group_pool),
63  m_logfile_group_hash(m_logfile_group_pool),
64  m_client_mutex("lgman-client", 2, true)
65 {
66  BLOCK_CONSTRUCTOR(Lgman);
67 
68  // Add received signals
69  addRecSignal(GSN_STTOR, &Lgman::execSTTOR);
70  addRecSignal(GSN_READ_CONFIG_REQ, &Lgman::execREAD_CONFIG_REQ);
71  addRecSignal(GSN_DUMP_STATE_ORD, &Lgman::execDUMP_STATE_ORD);
72  addRecSignal(GSN_DBINFO_SCANREQ, &Lgman::execDBINFO_SCANREQ);
73  addRecSignal(GSN_CONTINUEB, &Lgman::execCONTINUEB);
74  addRecSignal(GSN_NODE_FAILREP, &Lgman::execNODE_FAILREP);
75 
76  addRecSignal(GSN_CREATE_FILE_IMPL_REQ, &Lgman::execCREATE_FILE_IMPL_REQ);
77  addRecSignal(GSN_CREATE_FILEGROUP_IMPL_REQ,
78  &Lgman::execCREATE_FILEGROUP_IMPL_REQ);
79 
80  addRecSignal(GSN_DROP_FILE_IMPL_REQ, &Lgman::execDROP_FILE_IMPL_REQ);
81  addRecSignal(GSN_DROP_FILEGROUP_IMPL_REQ,
82  &Lgman::execDROP_FILEGROUP_IMPL_REQ);
83 
84  addRecSignal(GSN_FSWRITEREQ, &Lgman::execFSWRITEREQ);
85  addRecSignal(GSN_FSWRITEREF, &Lgman::execFSWRITEREF, true);
86  addRecSignal(GSN_FSWRITECONF, &Lgman::execFSWRITECONF);
87 
88  addRecSignal(GSN_FSOPENREF, &Lgman::execFSOPENREF, true);
89  addRecSignal(GSN_FSOPENCONF, &Lgman::execFSOPENCONF);
90 
91  addRecSignal(GSN_FSCLOSECONF, &Lgman::execFSCLOSECONF);
92 
93  addRecSignal(GSN_FSREADREF, &Lgman::execFSREADREF, true);
94  addRecSignal(GSN_FSREADCONF, &Lgman::execFSREADCONF);
95 
96  addRecSignal(GSN_LCP_FRAG_ORD, &Lgman::execLCP_FRAG_ORD);
97  addRecSignal(GSN_END_LCP_REQ, &Lgman::execEND_LCP_REQ);
98  addRecSignal(GSN_SUB_GCP_COMPLETE_REP, &Lgman::execSUB_GCP_COMPLETE_REP);
99  addRecSignal(GSN_START_RECREQ, &Lgman::execSTART_RECREQ);
100 
101  addRecSignal(GSN_END_LCP_CONF, &Lgman::execEND_LCP_CONF);
102 
103  addRecSignal(GSN_GET_TABINFOREQ, &Lgman::execGET_TABINFOREQ);
104 
105  m_last_lsn = 1;
106  m_logfile_group_hash.setSize(10);
107 
108  if (isNdbMtLqh()) {
109  jam();
110  int ret = m_client_mutex.create();
111  ndbrequire(ret == 0);
112  }
113 
114  {
115  CallbackEntry& ce = m_callbackEntry[THE_NULL_CALLBACK];
116  ce.m_function = TheNULLCallback.m_callbackFunction;
117  ce.m_flags = 0;
118  }
119  {
120  CallbackEntry& ce = m_callbackEntry[ENDLCP_CALLBACK];
121  ce.m_function = safe_cast(&Lgman::endlcp_callback);
122  ce.m_flags = 0;
123  }
124  {
125  CallbackTable& ct = m_callbackTable;
126  ct.m_count = COUNT_CALLBACKS;
127  ct.m_entry = m_callbackEntry;
128  m_callbackTableAddr = &ct;
129  }
130 }
131 
132 Lgman::~Lgman()
133 {
134  if (isNdbMtLqh()) {
135  (void)m_client_mutex.destroy();
136  }
137 }
138 
139 void
140 Lgman::client_lock(BlockNumber block, int line)
141 {
142  if (isNdbMtLqh()) {
143 #ifdef VM_TRACE
144  Uint32 bno = blockToMain(block);
145  Uint32 ino = blockToInstance(block);
146 #endif
147  D("try lock " << bno << "/" << ino << V(line));
148  int ret = m_client_mutex.lock();
149  ndbrequire(ret == 0);
150  D("got lock " << bno << "/" << ino << V(line));
151  }
152 }
153 
154 void
155 Lgman::client_unlock(BlockNumber block, int line)
156 {
157  if (isNdbMtLqh()) {
158 #ifdef VM_TRACE
159  Uint32 bno = blockToMain(block);
160  Uint32 ino = blockToInstance(block);
161 #endif
162  D("unlock " << bno << "/" << ino << V(line));
163  int ret = m_client_mutex.unlock();
164  ndbrequire(ret == 0);
165  }
166 }
167 
168 BLOCK_FUNCTIONS(Lgman)
169 
170 void
171 Lgman::execREAD_CONFIG_REQ(Signal* signal)
172 {
173  jamEntry();
174 
175  const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
176 
177  Uint32 ref = req->senderRef;
178  Uint32 senderData = req->senderData;
179 
180  const ndb_mgm_configuration_iterator * p =
181  m_ctx.m_config.getOwnConfigIterator();
182  ndbrequire(p != 0);
183 
184  Pool_context pc;
185  pc.m_block = this;
186  m_log_waiter_pool.wo_pool_init(RT_LGMAN_LOG_WAITER, pc);
187  m_file_pool.init(RT_LGMAN_FILE, pc);
188  m_logfile_group_pool.init(RT_LGMAN_FILEGROUP, pc);
189  // 10 -> 150M
190  m_data_buffer_pool.setSize(40);
191 
192  ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
193  conf->senderRef = reference();
194  conf->senderData = senderData;
195  sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
196  ReadConfigConf::SignalLength, JBB);
197 }
198 
199 void
200 Lgman::execSTTOR(Signal* signal)
201 {
202  jamEntry();
203  Uint32 startPhase = signal->theData[1];
204  switch (startPhase) {
205  case 1:
206  m_tup = globalData.getBlock(DBTUP);
207  ndbrequire(m_tup != 0);
208  break;
209  }
210  sendSTTORRY(signal);
211 }
212 
213 void
214 Lgman::sendSTTORRY(Signal* signal)
215 {
216  signal->theData[0] = 0;
217  signal->theData[3] = 1;
218  signal->theData[4] = 2;
219  signal->theData[5] = 3;
220  signal->theData[6] = 4;
221  signal->theData[7] = 5;
222  signal->theData[8] = 6;
223  signal->theData[9] = 255; // No more start phases from missra
224  sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 10, JBB);
225 }
226 
227 void
228 Lgman::execCONTINUEB(Signal* signal){
229  jamEntry();
230 
231  Uint32 type= signal->theData[0];
232  Uint32 ptrI = signal->theData[1];
233  client_lock(number(), __LINE__);
234  switch(type){
235  case LgmanContinueB::FILTER_LOG:
236  jam();
237  break;
238  case LgmanContinueB::CUT_LOG_TAIL:
239  {
240  jam();
241  Ptr<Logfile_group> ptr;
242  m_logfile_group_pool.getPtr(ptr, ptrI);
243  cut_log_tail(signal, ptr);
244  break;
245  }
246  case LgmanContinueB::FLUSH_LOG:
247  {
248  jam();
249  Ptr<Logfile_group> ptr;
250  m_logfile_group_pool.getPtr(ptr, ptrI);
251  flush_log(signal, ptr, signal->theData[2]);
252  break;
253  }
254  case LgmanContinueB::PROCESS_LOG_BUFFER_WAITERS:
255  {
256  jam();
257  Ptr<Logfile_group> ptr;
258  m_logfile_group_pool.getPtr(ptr, ptrI);
259  process_log_buffer_waiters(signal, ptr);
260  break;
261  }
262  case LgmanContinueB::FIND_LOG_HEAD:
263  jam();
264  Ptr<Logfile_group> ptr;
265  if(ptrI != RNIL)
266  {
267  jam();
268  m_logfile_group_pool.getPtr(ptr, ptrI);
269  find_log_head(signal, ptr);
270  }
271  else
272  {
273  jam();
274  init_run_undo_log(signal);
275  }
276  break;
277  case LgmanContinueB::EXECUTE_UNDO_RECORD:
278  jam();
279  execute_undo_record(signal);
280  break;
281  case LgmanContinueB::STOP_UNDO_LOG:
282  jam();
283  stop_run_undo_log(signal);
284  break;
285  case LgmanContinueB::READ_UNDO_LOG:
286  {
287  jam();
288  Ptr<Logfile_group> ptr;
289  m_logfile_group_pool.getPtr(ptr, ptrI);
290  read_undo_log(signal, ptr);
291  break;
292  }
293  case LgmanContinueB::PROCESS_LOG_SYNC_WAITERS:
294  {
295  jam();
296  Ptr<Logfile_group> ptr;
297  m_logfile_group_pool.getPtr(ptr, ptrI);
298  process_log_sync_waiters(signal, ptr);
299  break;
300  }
301  case LgmanContinueB::FORCE_LOG_SYNC:
302  {
303  jam();
304  Ptr<Logfile_group> ptr;
305  m_logfile_group_pool.getPtr(ptr, ptrI);
306  force_log_sync(signal, ptr, signal->theData[2], signal->theData[3]);
307  break;
308  }
309  case LgmanContinueB::DROP_FILEGROUP:
310  {
311  jam();
312  Ptr<Logfile_group> ptr;
313  m_logfile_group_pool.getPtr(ptr, ptrI);
314  if ((ptr.p->m_state & Logfile_group::LG_THREAD_MASK) ||
315  ptr.p->m_outstanding_fs > 0)
316  {
317  jam();
318  sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100,
319  signal->length());
320  break;
321  }
322  Uint32 ref = signal->theData[2];
323  Uint32 data = signal->theData[3];
324  drop_filegroup_drop_files(signal, ptr, ref, data);
325  break;
326  }
327  }
328  client_unlock(number(), __LINE__);
329 }
330 
331 void
332 Lgman::execNODE_FAILREP(Signal* signal)
333 {
334  jamEntry();
335  const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
336  NdbNodeBitmask failed;
337  failed.assign(NdbNodeBitmask::Size, rep->theNodes);
338 
339  /* Block level cleanup */
340  for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
341  jam();
342  if(failed.get(i)) {
343  jam();
344  Uint32 elementsCleaned = simBlockNodeFailure(signal, i); // No callback
345  ndbassert(elementsCleaned == 0); // No distributed fragmented signals
346  (void) elementsCleaned; // Remove compiler warning
347  }//if
348  }//for
349 }
350 
351 void
352 Lgman::execDUMP_STATE_ORD(Signal* signal){
353  jamEntry();
354  if (signal->theData[0] == 12001 || signal->theData[0] == 12002)
355  {
356  char tmp[1024];
357  Ptr<Logfile_group> ptr;
358  m_logfile_group_list.first(ptr);
359  while(!ptr.isNull())
360  {
361  BaseString::snprintf(tmp, sizeof(tmp),
362  "lfg %u state: %x fs: %u lsn "
363  " [ last: %llu s(req): %llu s:ed: %llu lcp: %llu ] "
364  " waiters: %d %d",
365  ptr.p->m_logfile_group_id, ptr.p->m_state,
366  ptr.p->m_outstanding_fs,
367  ptr.p->m_last_lsn, ptr.p->m_last_sync_req_lsn,
368  ptr.p->m_last_synced_lsn, ptr.p->m_last_lcp_lsn,
369  !ptr.p->m_log_buffer_waiters.isEmpty(),
370  !ptr.p->m_log_sync_waiters.isEmpty());
371  if (signal->theData[0] == 12001)
372  infoEvent("%s", tmp);
373  ndbout_c("%s", tmp);
374 
375  BaseString::snprintf(tmp, sizeof(tmp),
376  " callback_buffer_words: %u"
377  " free_buffer_words: %u free_file_words: %llu",
378  ptr.p->m_callback_buffer_words,
379  ptr.p->m_free_buffer_words,
380  ptr.p->m_free_file_words);
381  if (signal->theData[0] == 12001)
382  infoEvent("%s", tmp);
383  ndbout_c("%s", tmp);
384  if (!ptr.p->m_log_buffer_waiters.isEmpty())
385  {
386  Ptr<Log_waiter> waiter;
387  Local_log_waiter_list
388  list(m_log_waiter_pool, ptr.p->m_log_buffer_waiters);
389  list.first(waiter);
390  BaseString::snprintf(tmp, sizeof(tmp),
391  " head(waiters).sz: %u %u",
392  waiter.p->m_size,
393  FREE_BUFFER_MARGIN);
394  if (signal->theData[0] == 12001)
395  infoEvent("%s", tmp);
396  ndbout_c("%s", tmp);
397  }
398  if (!ptr.p->m_log_sync_waiters.isEmpty())
399  {
400  Ptr<Log_waiter> waiter;
401  Local_log_waiter_list
402  list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
403  list.first(waiter);
404  BaseString::snprintf(tmp, sizeof(tmp),
405  " m_last_synced_lsn: %llu head(waiters %x).m_sync_lsn: %llu",
406  ptr.p->m_last_synced_lsn,
407  waiter.i,
408  waiter.p->m_sync_lsn);
409  if (signal->theData[0] == 12001)
410  infoEvent("%s", tmp);
411  ndbout_c("%s", tmp);
412 
413  while(!waiter.isNull())
414  {
415  ndbout_c("ptr: %x %p lsn: %llu next: %x",
416  waiter.i, waiter.p, waiter.p->m_sync_lsn, waiter.p->nextList);
417  list.next(waiter);
418  }
419  }
420  m_logfile_group_list.next(ptr);
421  }
422  }
423  if (signal->theData[0] == 12003)
424  {
425  bool crash = false;
426  Ptr<Logfile_group> ptr;
427  for (m_logfile_group_list.first(ptr); !ptr.isNull();
428  m_logfile_group_list.next(ptr))
429  {
430  if (ptr.p->m_callback_buffer_words != 0)
431  {
432  crash = true;
433  break;
434  }
435  }
436 
437  if (crash)
438  {
439  ndbout_c("Detected logfile-group with non zero m_callback_buffer_words");
440  signal->theData[0] = 12002;
441  execDUMP_STATE_ORD(signal);
442  ndbrequire(false);
443  }
444 #ifdef VM_TRACE
445  else
446  {
447  ndbout_c("Check for non zero m_callback_buffer_words OK!");
448  }
449 #endif
450  }
451 }
452 
453 void
454 Lgman::execDBINFO_SCANREQ(Signal *signal)
455 {
456  DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
457  const Ndbinfo::ScanCursor* cursor =
458  CAST_CONSTPTR(Ndbinfo::ScanCursor, DbinfoScan::getCursorPtr(&req));
460 
461  jamEntry();
462 
463  switch(req.tableId) {
464  case Ndbinfo::LOGSPACES_TABLEID:
465  {
466  jam();
467  Uint32 startBucket = cursor->data[0];
468  Logfile_group_hash_iterator iter;
469  m_logfile_group_hash.next(startBucket, iter);
470 
471  while (!iter.curr.isNull())
472  {
473  jam();
474 
475  Uint32 currentBucket = iter.bucket;
476  Ptr<Logfile_group> ptr = iter.curr;
477 
478  Uint64 free = ptr.p->m_free_file_words*4;
479 
480  Uint64 total = 0;
481  Local_undofile_list list(m_file_pool, ptr.p->m_files);
482  Ptr<Undofile> filePtr;
483  for (list.first(filePtr); !filePtr.isNull(); list.next(filePtr))
484  {
485  jam();
486  total += (Uint64)filePtr.p->m_file_size *
487  (Uint64)File_formats::NDB_PAGE_SIZE;
488  }
489 
490  Uint64 high = 0; // TODO
491 
492  Ndbinfo::Row row(signal, req);
493  row.write_uint32(getOwnNodeId());
494  row.write_uint32(1); // log type, 1 = DD-UNDO
495  row.write_uint32(ptr.p->m_logfile_group_id); // log id
496  row.write_uint32(0); // log part
497 
498  row.write_uint64(total); // total allocated
499  row.write_uint64((total-free)); // currently in use
500  row.write_uint64(high); // in use high water mark
501  ndbinfo_send_row(signal, req, row, rl);
502 
503  // move to next
504  if (m_logfile_group_hash.next(iter) == false)
505  {
506  jam(); // no more...
507  break;
508  }
509  else if (iter.bucket == currentBucket)
510  {
511  jam();
512  continue; // we need to iterate an entire bucket
513  }
514  else if (rl.need_break(req))
515  {
516  jam();
517  ndbinfo_send_scan_break(signal, req, rl, iter.bucket);
518  return;
519  }
520  }
521  break;
522  }
523 
524  case Ndbinfo::LOGBUFFERS_TABLEID:
525  {
526  jam();
527  Uint32 startBucket = cursor->data[0];
528  Logfile_group_hash_iterator iter;
529  m_logfile_group_hash.next(startBucket, iter);
530 
531  while (!iter.curr.isNull())
532  {
533  jam();
534 
535  Uint32 currentBucket = iter.bucket;
536  Ptr<Logfile_group> ptr = iter.curr;
537 
538  Uint64 free = ptr.p->m_free_buffer_words*4;
539  Uint64 total = ptr.p->m_total_buffer_words*4;
540  Uint64 high = 0; // TODO
541 
542  Ndbinfo::Row row(signal, req);
543  row.write_uint32(getOwnNodeId());
544  row.write_uint32(1); // log type, 1 = DD-UNDO
545  row.write_uint32(ptr.p->m_logfile_group_id); // log id
546  row.write_uint32(0); // log part
547 
548  row.write_uint64(total); // total allocated
549  row.write_uint64((total-free)); // currently in use
550  row.write_uint64(high); // in use high water mark
551  ndbinfo_send_row(signal, req, row, rl);
552 
553  // move to next
554  if (m_logfile_group_hash.next(iter) == false)
555  {
556  jam(); // no more...
557  break;
558  }
559  else if (iter.bucket == currentBucket)
560  {
561  jam();
562  continue; // we need to iterate an entire bucket
563  }
564  else if (rl.need_break(req))
565  {
566  jam();
567  ndbinfo_send_scan_break(signal, req, rl, iter.bucket);
568  return;
569  }
570  }
571  break;
572  }
573 
574  default:
575  break;
576  }
577 
578  ndbinfo_send_scan_conf(signal, req, rl);
579 }
580 
581 void
582 Lgman::execCREATE_FILEGROUP_IMPL_REQ(Signal* signal){
583  jamEntry();
584  CreateFilegroupImplReq* req= (CreateFilegroupImplReq*)signal->getDataPtr();
585 
586  Uint32 senderRef = req->senderRef;
587  Uint32 senderData = req->senderData;
588 
589  Ptr<Logfile_group> ptr;
590  CreateFilegroupImplRef::ErrorCode err = CreateFilegroupImplRef::NoError;
591  do {
592  if (m_logfile_group_hash.find(ptr, req->filegroup_id))
593  {
594  jam();
595  err = CreateFilegroupImplRef::FilegroupAlreadyExists;
596  break;
597  }
598 
599  if (!m_logfile_group_list.isEmpty())
600  {
601  jam();
602  err = CreateFilegroupImplRef::OneLogfileGroupLimit;
603  break;
604  }
605 
606  if (!m_logfile_group_pool.seize(ptr))
607  {
608  jam();
609  err = CreateFilegroupImplRef::OutOfFilegroupRecords;
610  break;
611  }
612 
613  new (ptr.p) Logfile_group(req);
614 
615  if (!alloc_logbuffer_memory(ptr, req->logfile_group.buffer_size))
616  {
617  jam();
618  err= CreateFilegroupImplRef::OutOfLogBufferMemory;
619  m_logfile_group_pool.release(ptr);
620  break;
621  }
622 
623  m_logfile_group_hash.add(ptr);
624  m_logfile_group_list.add(ptr);
625 
626  if ((getNodeState().getNodeRestartInProgress() &&
627  getNodeState().starting.restartType !=
628  NodeState::ST_INITIAL_NODE_RESTART)||
629  getNodeState().getSystemRestartInProgress())
630  {
631  ptr.p->m_state = Logfile_group::LG_STARTING;
632  }
633 
635  (CreateFilegroupImplConf*)signal->getDataPtr();
636  conf->senderData = senderData;
637  conf->senderRef = reference();
638  sendSignal(senderRef, GSN_CREATE_FILEGROUP_IMPL_CONF, signal,
639  CreateFilegroupImplConf::SignalLength, JBB);
640 
641  return;
642  } while(0);
643 
644  CreateFilegroupImplRef* ref= (CreateFilegroupImplRef*)signal->getDataPtr();
645  ref->senderData = senderData;
646  ref->senderRef = reference();
647  ref->errorCode = err;
648  sendSignal(senderRef, GSN_CREATE_FILEGROUP_IMPL_REF, signal,
649  CreateFilegroupImplRef::SignalLength, JBB);
650 }
651 
652 void
653 Lgman::execDROP_FILEGROUP_IMPL_REQ(Signal* signal)
654 {
655  jamEntry();
656 
657  Uint32 errorCode = 0;
658  DropFilegroupImplReq req = *(DropFilegroupImplReq*)signal->getDataPtr();
659  do
660  {
661  Ptr<Logfile_group> ptr;
662  if (!m_logfile_group_hash.find(ptr, req.filegroup_id))
663  {
664  errorCode = DropFilegroupImplRef::NoSuchFilegroup;
665  break;
666  }
667 
668  if (ptr.p->m_version != req.filegroup_version)
669  {
670  errorCode = DropFilegroupImplRef::InvalidFilegroupVersion;
671  break;
672  }
673 
674  switch(req.requestInfo){
675  case DropFilegroupImplReq::Prepare:
676  break;
677  case DropFilegroupImplReq::Commit:
678  m_logfile_group_list.remove(ptr);
679  ptr.p->m_state |= Logfile_group::LG_DROPPING;
680  signal->theData[0] = LgmanContinueB::DROP_FILEGROUP;
681  signal->theData[1] = ptr.i;
682  signal->theData[2] = req.senderRef;
683  signal->theData[3] = req.senderData;
684  sendSignal(reference(), GSN_CONTINUEB, signal, 4, JBB);
685  return;
686  case DropFilegroupImplReq::Abort:
687  break;
688  default:
689  ndbrequire(false);
690  }
691  } while(0);
692 
693  if (errorCode)
694  {
695  DropFilegroupImplRef* ref =
696  (DropFilegroupImplRef*)signal->getDataPtrSend();
697  ref->senderRef = reference();
698  ref->senderData = req.senderData;
699  ref->errorCode = errorCode;
700  sendSignal(req.senderRef, GSN_DROP_FILEGROUP_IMPL_REF, signal,
701  DropFilegroupImplRef::SignalLength, JBB);
702  }
703  else
704  {
705  DropFilegroupImplConf* conf =
706  (DropFilegroupImplConf*)signal->getDataPtrSend();
707  conf->senderRef = reference();
708  conf->senderData = req.senderData;
709  sendSignal(req.senderRef, GSN_DROP_FILEGROUP_IMPL_CONF, signal,
710  DropFilegroupImplConf::SignalLength, JBB);
711  }
712 }
713 
714 void
715 Lgman::drop_filegroup_drop_files(Signal* signal,
716  Ptr<Logfile_group> ptr,
717  Uint32 ref, Uint32 data)
718 {
719  jam();
720  ndbrequire(! (ptr.p->m_state & Logfile_group::LG_THREAD_MASK));
721  ndbrequire(ptr.p->m_outstanding_fs == 0);
722 
723  Local_undofile_list list(m_file_pool, ptr.p->m_files);
724  Ptr<Undofile> file_ptr;
725 
726  if (list.first(file_ptr))
727  {
728  jam();
729  ndbrequire(! (file_ptr.p->m_state & Undofile::FS_OUTSTANDING));
730  file_ptr.p->m_create.m_senderRef = ref;
731  file_ptr.p->m_create.m_senderData = data;
732  create_file_abort(signal, ptr, file_ptr);
733  return;
734  }
735 
736  Local_undofile_list metalist(m_file_pool, ptr.p->m_meta_files);
737  if (metalist.first(file_ptr))
738  {
739  jam();
740  metalist.remove(file_ptr);
741  list.add(file_ptr);
742  file_ptr.p->m_create.m_senderRef = ref;
743  file_ptr.p->m_create.m_senderData = data;
744  create_file_abort(signal, ptr, file_ptr);
745  return;
746  }
747 
748  free_logbuffer_memory(ptr);
749  m_logfile_group_hash.release(ptr);
750  DropFilegroupImplConf *conf = (DropFilegroupImplConf*)signal->getDataPtr();
751  conf->senderData = data;
752  conf->senderRef = reference();
753  sendSignal(ref, GSN_DROP_FILEGROUP_IMPL_CONF, signal,
754  DropFilegroupImplConf::SignalLength, JBB);
755 }
756 
757 void
758 Lgman::execCREATE_FILE_IMPL_REQ(Signal* signal)
759 {
760  jamEntry();
761  CreateFileImplReq* req= (CreateFileImplReq*)signal->getDataPtr();
762 
763  Uint32 senderRef = req->senderRef;
764  Uint32 senderData = req->senderData;
765  Uint32 requestInfo = req->requestInfo;
766 
767  Ptr<Logfile_group> ptr;
768  CreateFileImplRef::ErrorCode err = CreateFileImplRef::NoError;
769  SectionHandle handle(this, signal);
770  do {
771  if (!m_logfile_group_hash.find(ptr, req->filegroup_id))
772  {
773  jam();
774  err = CreateFileImplRef::InvalidFilegroup;
775  break;
776  }
777 
778  if (ptr.p->m_version != req->filegroup_version)
779  {
780  jam();
781  err = CreateFileImplRef::InvalidFilegroupVersion;
782  break;
783  }
784 
785  Ptr<Undofile> file_ptr;
786  switch(requestInfo){
787  case CreateFileImplReq::Commit:
788  {
789  jam();
790  ndbrequire(find_file_by_id(file_ptr, ptr.p->m_meta_files, req->file_id));
791  file_ptr.p->m_create.m_senderRef = req->senderRef;
792  file_ptr.p->m_create.m_senderData = req->senderData;
793  create_file_commit(signal, ptr, file_ptr);
794  return;
795  }
796  case CreateFileImplReq::Abort:
797  {
798  Uint32 senderRef = req->senderRef;
799  Uint32 senderData = req->senderData;
800  if (find_file_by_id(file_ptr, ptr.p->m_meta_files, req->file_id))
801  {
802  jam();
803  file_ptr.p->m_create.m_senderRef = senderRef;
804  file_ptr.p->m_create.m_senderData = senderData;
805  create_file_abort(signal, ptr, file_ptr);
806  }
807  else
808  {
809  CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr();
810  jam();
811  conf->senderData = senderData;
812  conf->senderRef = reference();
813  sendSignal(senderRef, GSN_CREATE_FILE_IMPL_CONF, signal,
814  CreateFileImplConf::SignalLength, JBB);
815  }
816  return;
817  }
818  default: // prepare
819  break;
820  }
821 
822  if (!m_file_pool.seize(file_ptr))
823  {
824  jam();
825  err = CreateFileImplRef::OutOfFileRecords;
826  break;
827  }
828 
829  if (!handle.m_cnt == 1)
830  {
831  ndbrequire(false);
832  }
833 
834  if (ERROR_INSERTED(15000) ||
835  (sizeof(void*) == 4 && req->file_size_hi & 0xFFFFFFFF))
836  {
837  jam();
838  err = CreateFileImplRef::FileSizeTooLarge;
839  break;
840  }
841 
842  Uint64 sz = (Uint64(req->file_size_hi) << 32) + req->file_size_lo;
843  if (sz < 1024*1024)
844  {
845  jam();
846  err = CreateFileImplRef::FileSizeTooSmall;
847  break;
848  }
849 
850  new (file_ptr.p) Undofile(req, ptr.i);
851 
852  Local_undofile_list tmp(m_file_pool, ptr.p->m_meta_files);
853  tmp.add(file_ptr);
854 
855  open_file(signal, file_ptr, req->requestInfo, &handle);
856  return;
857  } while(0);
858 
859  releaseSections(handle);
860  CreateFileImplRef* ref= (CreateFileImplRef*)signal->getDataPtr();
861  ref->senderData = senderData;
862  ref->senderRef = reference();
863  ref->errorCode = err;
864  sendSignal(senderRef, GSN_CREATE_FILE_IMPL_REF, signal,
865  CreateFileImplRef::SignalLength, JBB);
866 }
867 
868 void
869 Lgman::open_file(Signal* signal, Ptr<Undofile> ptr,
870  Uint32 requestInfo,
871  SectionHandle * handle)
872 {
873  FsOpenReq* req = (FsOpenReq*)signal->getDataPtrSend();
874  req->userReference = reference();
875  req->userPointer = ptr.i;
876 
877  memset(req->fileNumber, 0, sizeof(req->fileNumber));
878  FsOpenReq::setVersion(req->fileNumber, 4); // Version 4 = specified filename
879  FsOpenReq::v4_setBasePath(req->fileNumber, FsOpenReq::BP_DD_UF);
880 
881  req->fileFlags = 0;
882  req->fileFlags |= FsOpenReq::OM_READWRITE;
883  req->fileFlags |= FsOpenReq::OM_DIRECT;
884  req->fileFlags |= FsOpenReq::OM_SYNC;
885  switch(requestInfo){
886  case CreateFileImplReq::Create:
887  req->fileFlags |= FsOpenReq::OM_CREATE_IF_NONE;
888  req->fileFlags |= FsOpenReq::OM_INIT;
889  ptr.p->m_state = Undofile::FS_CREATING;
890  break;
891  case CreateFileImplReq::CreateForce:
892  req->fileFlags |= FsOpenReq::OM_CREATE;
893  req->fileFlags |= FsOpenReq::OM_INIT;
894  ptr.p->m_state = Undofile::FS_CREATING;
895  break;
896  case CreateFileImplReq::Open:
897  req->fileFlags |= FsOpenReq::OM_CHECK_SIZE;
898  ptr.p->m_state = Undofile::FS_OPENING;
899  break;
900  default:
901  ndbrequire(false);
902  }
903 
904  req->page_size = File_formats::NDB_PAGE_SIZE;
905  Uint64 size = (Uint64)ptr.p->m_file_size * (Uint64)File_formats::NDB_PAGE_SIZE;
906  req->file_size_hi = (Uint32)(size >> 32);
907  req->file_size_lo = (Uint32)(size & 0xFFFFFFFF);
908 
909  sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, FsOpenReq::SignalLength, JBB,
910  handle);
911 }
912 
913 void
914 Lgman::execFSWRITEREQ(Signal* signal)
915 {
916  jamEntry();
917  Ptr<Undofile> ptr;
918  Ptr<GlobalPage> page_ptr;
919  FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtr();
920 
921  m_file_pool.getPtr(ptr, req->userPointer);
922  m_shared_page_pool.getPtr(page_ptr, req->data.pageData[0]);
923 
924  if (req->varIndex == 0)
925  {
928  page->m_page_header.init(File_formats::FT_Undofile,
929  getOwnNodeId(),
930  ndbGetOwnVersion(),
931  (Uint32)time(0));
932  page->m_file_id = ptr.p->m_file_id;
933  page->m_logfile_group_id = ptr.p->m_create.m_logfile_group_id;
934  page->m_logfile_group_version = ptr.p->m_create.m_logfile_group_version;
935  page->m_undo_pages = ptr.p->m_file_size - 1; // minus zero page
936  }
937  else
938  {
941  page->m_page_header.m_page_lsn_hi = 0;
942  page->m_page_header.m_page_lsn_lo = 0;
943  page->m_page_header.m_page_type = File_formats::PT_Undopage;
944  page->m_words_used = 0;
945  }
946 }
947 
948 void
949 Lgman::execFSOPENREF(Signal* signal)
950 {
951  jamEntry();
952 
953  Ptr<Undofile> ptr;
954  Ptr<Logfile_group> lg_ptr;
955  FsRef* ref = (FsRef*)signal->getDataPtr();
956 
957  Uint32 errCode = ref->errorCode;
958  Uint32 osErrCode = ref->osErrorCode;
959 
960  m_file_pool.getPtr(ptr, ref->userPointer);
961  m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
962 
963  {
964  CreateFileImplRef* ref= (CreateFileImplRef*)signal->getDataPtr();
965  ref->senderData = ptr.p->m_create.m_senderData;
966  ref->senderRef = reference();
967  ref->errorCode = CreateFileImplRef::FileError;
968  ref->fsErrCode = errCode;
969  ref->osErrCode = osErrCode;
970 
971  sendSignal(ptr.p->m_create.m_senderRef, GSN_CREATE_FILE_IMPL_REF, signal,
972  CreateFileImplRef::SignalLength, JBB);
973  }
974 
975  Local_undofile_list meta(m_file_pool, lg_ptr.p->m_meta_files);
976  meta.release(ptr);
977 }
978 
979 #define HEAD 0
980 #define TAIL 1
981 
982 void
983 Lgman::execFSOPENCONF(Signal* signal)
984 {
985  jamEntry();
986  Ptr<Undofile> ptr;
987 
988  FsConf* conf = (FsConf*)signal->getDataPtr();
989 
990  Uint32 fd = conf->filePointer;
991  m_file_pool.getPtr(ptr, conf->userPointer);
992 
993  ptr.p->m_fd = fd;
994 
995  {
996  Uint32 senderRef = ptr.p->m_create.m_senderRef;
997  Uint32 senderData = ptr.p->m_create.m_senderData;
998 
999  CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr();
1000  conf->senderData = senderData;
1001  conf->senderRef = reference();
1002  sendSignal(senderRef, GSN_CREATE_FILE_IMPL_CONF, signal,
1003  CreateFileImplConf::SignalLength, JBB);
1004  }
1005 }
1006 
1007 bool
1008 Lgman::find_file_by_id(Ptr<Undofile>& ptr,
1009  Local_undofile_list::Head& head, Uint32 id)
1010 {
1011  Local_undofile_list list(m_file_pool, head);
1012  for(list.first(ptr); !ptr.isNull(); list.next(ptr))
1013  if(ptr.p->m_file_id == id)
1014  return true;
1015  return false;
1016 }
1017 
1018 void
1019 Lgman::create_file_commit(Signal* signal,
1020  Ptr<Logfile_group> lg_ptr,
1021  Ptr<Undofile> ptr)
1022 {
1023  Uint32 senderRef = ptr.p->m_create.m_senderRef;
1024  Uint32 senderData = ptr.p->m_create.m_senderData;
1025 
1026  bool first= false;
1027  if(ptr.p->m_state == Undofile::FS_CREATING &&
1028  (lg_ptr.p->m_state & Logfile_group::LG_ONLINE))
1029  {
1030  jam();
1031  Local_undofile_list free(m_file_pool, lg_ptr.p->m_files);
1032  Local_undofile_list meta(m_file_pool, lg_ptr.p->m_meta_files);
1033  first= free.isEmpty();
1034  meta.remove(ptr);
1035  if(!first)
1036  {
1040  Ptr<Undofile> curr;
1041  m_file_pool.getPtr(curr, lg_ptr.p->m_file_pos[HEAD].m_ptr_i);
1042  if(free.next(curr))
1043  free.insert(ptr, curr); // inserts before (that's why the extra next)
1044  else
1045  free.add(ptr);
1046 
1047  ptr.p->m_state = Undofile::FS_ONLINE | Undofile::FS_EMPTY;
1048  }
1049  else
1050  {
1054  free.add(ptr);
1055  ptr.p->m_state = Undofile::FS_ONLINE;
1056  lg_ptr.p->m_state |= Logfile_group::LG_FLUSH_THREAD;
1057  signal->theData[0] = LgmanContinueB::FLUSH_LOG;
1058  signal->theData[1] = lg_ptr.i;
1059  signal->theData[2] = 0;
1060  sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
1061  }
1062  }
1063  else
1064  {
1065  ptr.p->m_state = Undofile::FS_SORTING;
1066  }
1067 
1068  ptr.p->m_online.m_lsn = 0;
1069  ptr.p->m_online.m_outstanding = 0;
1070 
1071  Uint64 add= ptr.p->m_file_size - 1;
1072  lg_ptr.p->m_free_file_words += add * File_formats::UNDO_PAGE_WORDS;
1073 
1074  if(first)
1075  {
1076  jam();
1077 
1078  Buffer_idx tmp= { ptr.i, 0 };
1079  lg_ptr.p->m_file_pos[HEAD] = lg_ptr.p->m_file_pos[TAIL] = tmp;
1080 
1084  lg_ptr.p->m_tail_pos[0] = tmp;
1085  lg_ptr.p->m_tail_pos[1] = tmp;
1086  lg_ptr.p->m_tail_pos[2] = tmp;
1087  lg_ptr.p->m_next_reply_ptr_i = ptr.i;
1088  }
1089 
1090  validate_logfile_group(lg_ptr, "create_file_commit");
1091 
1092  CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr();
1093  conf->senderData = senderData;
1094  conf->senderRef = reference();
1095  sendSignal(senderRef, GSN_CREATE_FILE_IMPL_CONF, signal,
1096  CreateFileImplConf::SignalLength, JBB);
1097 }
1098 
1099 void
1100 Lgman::create_file_abort(Signal* signal,
1101  Ptr<Logfile_group> lg_ptr,
1102  Ptr<Undofile> ptr)
1103 {
1104  if (ptr.p->m_fd == RNIL)
1105  {
1106  ((FsConf*)signal->getDataPtr())->userPointer = ptr.i;
1107  execFSCLOSECONF(signal);
1108  return;
1109  }
1110 
1111  FsCloseReq *req= (FsCloseReq*)signal->getDataPtrSend();
1112  req->filePointer = ptr.p->m_fd;
1113  req->userReference = reference();
1114  req->userPointer = ptr.i;
1115  req->fileFlag = 0;
1116  FsCloseReq::setRemoveFileFlag(req->fileFlag, true);
1117 
1118  sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal,
1119  FsCloseReq::SignalLength, JBB);
1120 }
1121 
1122 void
1123 Lgman::execFSCLOSECONF(Signal* signal)
1124 {
1125  Ptr<Undofile> ptr;
1126  Ptr<Logfile_group> lg_ptr;
1127  Uint32 ptrI = ((FsConf*)signal->getDataPtr())->userPointer;
1128  m_file_pool.getPtr(ptr, ptrI);
1129 
1130  Uint32 senderRef = ptr.p->m_create.m_senderRef;
1131  Uint32 senderData = ptr.p->m_create.m_senderData;
1132 
1133  m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
1134 
1135  if (lg_ptr.p->m_state & Logfile_group::LG_DROPPING)
1136  {
1137  jam();
1138  {
1139  Local_undofile_list list(m_file_pool, lg_ptr.p->m_files);
1140  list.release(ptr);
1141  }
1142  drop_filegroup_drop_files(signal, lg_ptr, senderRef, senderData);
1143  }
1144  else
1145  {
1146  jam();
1147  Local_undofile_list list(m_file_pool, lg_ptr.p->m_meta_files);
1148  list.release(ptr);
1149 
1150  CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr();
1151  conf->senderData = senderData;
1152  conf->senderRef = reference();
1153  sendSignal(senderRef, GSN_CREATE_FILE_IMPL_CONF, signal,
1154  CreateFileImplConf::SignalLength, JBB);
1155  }
1156 }
1157 
1158 void
1159 Lgman::execDROP_FILE_IMPL_REQ(Signal* signal)
1160 {
1161  jamEntry();
1162  ndbrequire(false);
1163 }
1164 
1165 #define CONSUMER 0
1166 #define PRODUCER 1
1167 
1168 Lgman::Logfile_group::Logfile_group(const CreateFilegroupImplReq* req)
1169 {
1170  m_logfile_group_id = req->filegroup_id;
1171  m_version = req->filegroup_version;
1172  m_state = LG_ONLINE;
1173  m_outstanding_fs = 0;
1174  m_next_reply_ptr_i = RNIL;
1175 
1176  m_last_lsn = 0;
1177  m_last_synced_lsn = 0;
1178  m_last_sync_req_lsn = 0;
1179  m_max_sync_req_lsn = 0;
1180  m_last_read_lsn = 0;
1181  m_file_pos[0].m_ptr_i= m_file_pos[1].m_ptr_i = RNIL;
1182 
1183  m_free_file_words = 0;
1184  m_total_buffer_words = 0;
1185  m_free_buffer_words = 0;
1186  m_callback_buffer_words = 0;
1187 
1188  m_pos[CONSUMER].m_current_page.m_ptr_i = RNIL;// { m_buffer_pages, idx }
1189  m_pos[CONSUMER].m_current_pos.m_ptr_i = RNIL; // { page ptr.i, m_words_used}
1190  m_pos[PRODUCER].m_current_page.m_ptr_i = RNIL;// { m_buffer_pages, idx }
1191  m_pos[PRODUCER].m_current_pos.m_ptr_i = RNIL; // { page ptr.i, m_words_used}
1192 
1193  m_tail_pos[2].m_ptr_i= RNIL;
1194  m_tail_pos[2].m_idx= ~0;
1195 
1196  m_tail_pos[0] = m_tail_pos[1] = m_tail_pos[2];
1197 }
1198 
1199 bool
1200 Lgman::alloc_logbuffer_memory(Ptr<Logfile_group> ptr, Uint32 bytes)
1201 {
1202  Uint32 pages= (((bytes + 3) >> 2) + File_formats::NDB_PAGE_SIZE_WORDS - 1)
1203  / File_formats::NDB_PAGE_SIZE_WORDS;
1204  Uint32 requested= pages;
1205  {
1206  Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
1207  while(pages)
1208  {
1209  Uint32 ptrI;
1210  Uint32 cnt = pages > 64 ? 64 : pages;
1211  m_ctx.m_mm.alloc_pages(RG_DISK_OPERATIONS, &ptrI, &cnt, 1);
1212  if (cnt)
1213  {
1214  Buffer_idx range;
1215  range.m_ptr_i= ptrI;
1216  range.m_idx = cnt;
1217 
1218  if (map.append((Uint32*)&range, 2) == false)
1219  {
1224  jam();
1225  m_ctx.m_mm.release_pages(RG_DISK_OPERATIONS,
1226  range.m_ptr_i, range.m_idx);
1227  break;
1228  }
1229  pages -= range.m_idx;
1230  }
1231  else
1232  {
1233  break;
1234  }
1235  }
1236  }
1237 
1238  if(pages)
1239  {
1240  /* Could not allocate all of the requested memory.
1241  * So release that already allocated.
1242  */
1243  free_logbuffer_memory(ptr);
1244  return false;
1245  }
1246 
1247 #if defined VM_TRACE || defined ERROR_INSERT
1248  ndbout << "DD lgman: fg id:" << ptr.p->m_logfile_group_id << " undo buffer pages/bytes:" << (requested-pages) << "/" << (requested-pages)*File_formats::NDB_PAGE_SIZE << endl;
1249 #endif
1250 
1251  init_logbuffer_pointers(ptr);
1252  return true;
1253 }
1254 
1255 void
1256 Lgman::init_logbuffer_pointers(Ptr<Logfile_group> ptr)
1257 {
1258  Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
1259  Page_map::Iterator it;
1260  union {
1261  Uint32 tmp[2];
1262  Buffer_idx range;
1263  };
1264 
1265  map.first(it);
1266  tmp[0] = *it.data;
1267  ndbrequire(map.next(it));
1268  tmp[1] = *it.data;
1269 
1270  ptr.p->m_pos[CONSUMER].m_current_page.m_ptr_i = 0; // Index in page map
1271  ptr.p->m_pos[CONSUMER].m_current_page.m_idx = range.m_idx - 1;// left range
1272  ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i = range.m_ptr_i; // Which page
1273  ptr.p->m_pos[CONSUMER].m_current_pos.m_idx = 0; // Page pos
1274 
1275  ptr.p->m_pos[PRODUCER].m_current_page.m_ptr_i = 0; // Index in page map
1276  ptr.p->m_pos[PRODUCER].m_current_page.m_idx = range.m_idx - 1;// left range
1277  ptr.p->m_pos[PRODUCER].m_current_pos.m_ptr_i = range.m_ptr_i; // Which page
1278  ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 0; // Page pos
1279 
1280  Uint32 pages= range.m_idx;
1281  while(map.next(it))
1282  {
1283  tmp[0] = *it.data;
1284  ndbrequire(map.next(it));
1285  tmp[1] = *it.data;
1286  pages += range.m_idx;
1287  }
1288 
1289  ptr.p->m_total_buffer_words =
1290  ptr.p->m_free_buffer_words = pages * File_formats::UNDO_PAGE_WORDS;
1291 }
1292 
1293 Uint32
1294 Lgman::compute_free_file_pages(Ptr<Logfile_group> ptr)
1295 {
1296  Buffer_idx head= ptr.p->m_file_pos[HEAD];
1297  Buffer_idx tail= ptr.p->m_file_pos[TAIL];
1298  Uint32 pages = 0;
1299  if (head.m_ptr_i == tail.m_ptr_i && head.m_idx < tail.m_idx)
1300  {
1301  pages += tail.m_idx - head.m_idx;
1302  }
1303  else
1304  {
1306  m_file_pool.getPtr(file, head.m_ptr_i);
1307  Local_undofile_list list(m_file_pool, ptr.p->m_files);
1308 
1309  do
1310  {
1311  pages += (file.p->m_file_size - head.m_idx - 1);
1312  if(!list.next(file))
1313  list.first(file);
1314  head.m_idx = 0;
1315  } while(file.i != tail.m_ptr_i);
1316 
1317  pages += tail.m_idx - head.m_idx;
1318  }
1319  return pages;
1320 }
1321 
1322 void
1323 Lgman::free_logbuffer_memory(Ptr<Logfile_group> ptr)
1324 {
1325  union {
1326  Uint32 tmp[2];
1327  Buffer_idx range;
1328  };
1329 
1330  Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
1331 
1332  Page_map::Iterator it;
1333  map.first(it);
1334  while(!it.isNull())
1335  {
1336  tmp[0] = *it.data;
1337  ndbrequire(map.next(it));
1338  tmp[1] = *it.data;
1339 
1340  m_ctx.m_mm.release_pages(RG_DISK_OPERATIONS, range.m_ptr_i, range.m_idx);
1341  map.next(it);
1342  }
1343  map.release();
1344 }
1345 
1346 Lgman::Undofile::Undofile(const struct CreateFileImplReq* req, Uint32 ptrI)
1347 {
1348  m_fd = RNIL;
1349  m_file_id = req->file_id;
1350  m_logfile_group_ptr_i= ptrI;
1351 
1352  Uint64 pages = req->file_size_hi;
1353  pages = (pages << 32) | req->file_size_lo;
1354  pages /= GLOBAL_PAGE_SIZE;
1355  m_file_size = Uint32(pages);
1356 #if defined VM_TRACE || defined ERROR_INSERT
1357  ndbout << "DD lgman: file id:" << m_file_id << " undofile pages/bytes:" << m_file_size << "/" << m_file_size*GLOBAL_PAGE_SIZE << endl;
1358 #endif
1359 
1360  m_create.m_senderRef = req->senderRef; // During META
1361  m_create.m_senderData = req->senderData; // During META
1362  m_create.m_logfile_group_id = req->filegroup_id;
1363 }
1364 
1365 Logfile_client::Logfile_client(SimulatedBlock* block,
1366  Lgman* lgman, Uint32 logfile_group_id,
1367  bool lock)
1368 {
1369  Uint32 bno = block->number();
1370  Uint32 ino = block->instance();
1371  m_client_block= block;
1372  m_block= numberToBlock(bno, ino);
1373  m_lgman= lgman;
1374  m_lock = lock;
1375  m_logfile_group_id= logfile_group_id;
1376  D("client ctor " << bno << "/" << ino);
1377  if (m_lock)
1378  m_lgman->client_lock(m_block, 0);
1379 }
1380 
1381 Logfile_client::~Logfile_client()
1382 {
1383 #ifdef VM_TRACE
1384  Uint32 bno = blockToMain(m_block);
1385  Uint32 ino = blockToInstance(m_block);
1386 #endif
1387  D("client dtor " << bno << "/" << ino);
1388  if (m_lock)
1389  m_lgman->client_unlock(m_block, 0);
1390 }
1391 
1392 int
1394  Uint64 lsn, Request* req, Uint32 flags)
1395 {
1397  if(m_lgman->m_logfile_group_list.first(ptr))
1398  {
1399  if(ptr.p->m_last_synced_lsn >= lsn)
1400  {
1401  return 1;
1402  }
1403 
1404  bool empty= false;
1406  {
1408  list(m_lgman->m_log_waiter_pool, ptr.p->m_log_sync_waiters);
1409 
1410  empty= list.isEmpty();
1411  if(!list.seize(wait))
1412  return -1;
1413 
1414  wait.p->m_block= m_block;
1415  wait.p->m_sync_lsn= lsn;
1416  memcpy(&wait.p->m_callback, &req->m_callback,
1417  sizeof(SimulatedBlock::CallbackPtr));
1418 
1419  ptr.p->m_max_sync_req_lsn = lsn > ptr.p->m_max_sync_req_lsn ?
1420  lsn : ptr.p->m_max_sync_req_lsn;
1421  }
1422 
1423  if(ptr.p->m_last_sync_req_lsn < lsn &&
1424  ! (ptr.p->m_state & Lgman::Logfile_group::LG_FORCE_SYNC_THREAD))
1425  {
1426  ptr.p->m_state |= Lgman::Logfile_group::LG_FORCE_SYNC_THREAD;
1427  signal->theData[0] = LgmanContinueB::FORCE_LOG_SYNC;
1428  signal->theData[1] = ptr.i;
1429  signal->theData[2] = (Uint32)(lsn >> 32);
1430  signal->theData[3] = (Uint32)(lsn & 0xFFFFFFFF);
1431  m_client_block->sendSignalWithDelay(m_lgman->reference(),
1432  GSN_CONTINUEB, signal, 10, 4);
1433  }
1434  return 0;
1435  }
1436  return -1;
1437 }
1438 
1439 void
1440 Lgman::force_log_sync(Signal* signal,
1441  Ptr<Logfile_group> ptr,
1442  Uint32 lsn_hi, Uint32 lsn_lo)
1443 {
1444  Local_log_waiter_list list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
1445  Uint64 force_lsn = lsn_hi; force_lsn <<= 32; force_lsn += lsn_lo;
1446 
1447  if(ptr.p->m_last_sync_req_lsn < force_lsn)
1448  {
1452  Buffer_idx pos= ptr.p->m_pos[PRODUCER].m_current_pos;
1453  GlobalPage *page = m_shared_page_pool.getPtr(pos.m_ptr_i);
1454 
1455  Uint32 free= File_formats::UNDO_PAGE_WORDS - pos.m_idx;
1456  if(pos.m_idx) // don't flush empty page...
1457  {
1458  Uint64 lsn= ptr.p->m_last_lsn - 1;
1459 
1462  undo->m_page_header.m_page_lsn_lo = (Uint32)(lsn & 0xFFFFFFFF);
1463  undo->m_page_header.m_page_lsn_hi = (Uint32)(lsn >> 32);
1464  undo->m_words_used= File_formats::UNDO_PAGE_WORDS - free;
1465 
1469  ndbrequire(ptr.p->m_free_file_words >= free);
1470  ndbrequire(ptr.p->m_free_buffer_words > free);
1471  ptr.p->m_free_file_words -= free;
1472  ptr.p->m_free_buffer_words -= free;
1473 
1474  validate_logfile_group(ptr, "force_log_sync");
1475 
1476  next_page(ptr.p, PRODUCER);
1477  ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 0;
1478  }
1479  }
1480 
1481 
1482 
1483  Uint64 max_req_lsn = ptr.p->m_max_sync_req_lsn;
1484  if(max_req_lsn > force_lsn &&
1485  max_req_lsn > ptr.p->m_last_sync_req_lsn)
1486  {
1487  ndbrequire(ptr.p->m_state & Lgman::Logfile_group::LG_FORCE_SYNC_THREAD);
1488  signal->theData[0] = LgmanContinueB::FORCE_LOG_SYNC;
1489  signal->theData[1] = ptr.i;
1490  signal->theData[2] = (Uint32)(max_req_lsn >> 32);
1491  signal->theData[3] = (Uint32)(max_req_lsn & 0xFFFFFFFF);
1492  sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 4);
1493  }
1494  else
1495  {
1496  ptr.p->m_state &= ~(Uint32)Lgman::Logfile_group::LG_FORCE_SYNC_THREAD;
1497  }
1498 }
1499 
1500 void
1501 Lgman::process_log_sync_waiters(Signal* signal, Ptr<Logfile_group> ptr)
1502 {
1503  Local_log_waiter_list
1504  list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
1505 
1506  if(list.isEmpty())
1507  {
1508  return;
1509  }
1510 
1511  bool removed= false;
1512  Ptr<Log_waiter> waiter;
1513  list.first(waiter);
1514  Uint32 logfile_group_id = ptr.p->m_logfile_group_id;
1515 
1516  if(waiter.p->m_sync_lsn <= ptr.p->m_last_synced_lsn)
1517  {
1518  removed= true;
1519  Uint32 block = waiter.p->m_block;
1520  CallbackPtr & callback = waiter.p->m_callback;
1521  sendCallbackConf(signal, block, callback, logfile_group_id);
1522 
1523  list.releaseFirst(waiter);
1524  }
1525 
1526  if(removed && !list.isEmpty())
1527  {
1528  ptr.p->m_state |= Logfile_group::LG_SYNC_WAITERS_THREAD;
1529  signal->theData[0] = LgmanContinueB::PROCESS_LOG_SYNC_WAITERS;
1530  signal->theData[1] = ptr.i;
1531  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
1532  }
1533  else
1534  {
1535  ptr.p->m_state &= ~(Uint32)Logfile_group::LG_SYNC_WAITERS_THREAD;
1536  }
1537 }
1538 
1539 
1540 Uint32*
1541 Lgman::get_log_buffer(Ptr<Logfile_group> ptr, Uint32 sz)
1542 {
1543  GlobalPage *page;
1544  page=m_shared_page_pool.getPtr(ptr.p->m_pos[PRODUCER].m_current_pos.m_ptr_i);
1545 
1546  Uint32 total_free= ptr.p->m_free_buffer_words;
1547  assert(total_free >= sz);
1548  Uint32 pos= ptr.p->m_pos[PRODUCER].m_current_pos.m_idx;
1549  Uint32 free= File_formats::UNDO_PAGE_WORDS - pos;
1550 
1551  if(sz <= free)
1552  {
1553 next:
1554  // fits this page wo/ problem
1555  ndbrequire(total_free >= sz);
1556  ptr.p->m_free_buffer_words = total_free - sz;
1557  ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = pos + sz;
1558  return ((File_formats::Undofile::Undo_page*)page)->m_data + pos;
1559  }
1560 
1564  Uint64 lsn= ptr.p->m_last_lsn - 1;
1567  undo->m_page_header.m_page_lsn_lo = (Uint32)(lsn & 0xFFFFFFFF);
1568  undo->m_page_header.m_page_lsn_hi = (Uint32)(lsn >> 32);
1569  undo->m_words_used= File_formats::UNDO_PAGE_WORDS - free;
1570 
1574  ndbrequire(ptr.p->m_free_file_words >= free);
1575  ptr.p->m_free_file_words -= free;
1576 
1577  validate_logfile_group(ptr, "get_log_buffer");
1578 
1579  pos= 0;
1580  assert(total_free >= free);
1581  total_free -= free;
1582  page= m_shared_page_pool.getPtr(next_page(ptr.p, PRODUCER));
1583  goto next;
1584 }
1585 
1586 Uint32
1587 Lgman::next_page(Logfile_group* ptrP, Uint32 i)
1588 {
1589  Uint32 page_ptr_i= ptrP->m_pos[i].m_current_pos.m_ptr_i;
1590  Uint32 left_in_range= ptrP->m_pos[i].m_current_page.m_idx;
1591  if(left_in_range > 0)
1592  {
1593  ptrP->m_pos[i].m_current_page.m_idx = left_in_range - 1;
1594  ptrP->m_pos[i].m_current_pos.m_ptr_i = page_ptr_i + 1;
1595  return page_ptr_i + 1;
1596  }
1597  else
1598  {
1599  Lgman::Page_map map(m_data_buffer_pool, ptrP->m_buffer_pages);
1600  Uint32 pos= (ptrP->m_pos[i].m_current_page.m_ptr_i + 2) % map.getSize();
1602  map.position(it, pos);
1603 
1604  union {
1605  Uint32 tmp[2];
1606  Lgman::Buffer_idx range;
1607  };
1608 
1609  tmp[0] = *it.data; map.next(it);
1610  tmp[1] = *it.data;
1611 
1612  ptrP->m_pos[i].m_current_page.m_ptr_i = pos; // New index in map
1613  ptrP->m_pos[i].m_current_page.m_idx = range.m_idx - 1; // Free pages
1614  ptrP->m_pos[i].m_current_pos.m_ptr_i = range.m_ptr_i; // Current page
1615  // No need to set ptrP->m_current_pos.m_idx, that is set "in higher"-func
1616  return range.m_ptr_i;
1617  }
1618 }
1619 
1620 int
1622  SimulatedBlock::CallbackPtr* callback)
1623 {
1624  sz += 2; // lsn
1626  key.m_logfile_group_id= m_logfile_group_id;
1628  if(m_lgman->m_logfile_group_hash.find(ptr, key))
1629  {
1630  Uint32 callback_buffer = ptr.p->m_callback_buffer_words;
1631  Uint32 free_buffer = ptr.p->m_free_buffer_words;
1632  if (free_buffer >= (sz + callback_buffer + FREE_BUFFER_MARGIN) &&
1633  ptr.p->m_log_buffer_waiters.isEmpty())
1634  {
1635  ptr.p->m_callback_buffer_words = callback_buffer + sz;
1636  return 1;
1637  }
1638 
1639  bool empty= false;
1640  {
1643  list(m_lgman->m_log_waiter_pool, ptr.p->m_log_buffer_waiters);
1644 
1645  empty= list.isEmpty();
1646  if(!list.seize(wait))
1647  {
1648  return -1;
1649  }
1650 
1651  wait.p->m_size= sz;
1652  wait.p->m_block= m_block;
1653  memcpy(&wait.p->m_callback, callback,sizeof(SimulatedBlock::CallbackPtr));
1654  }
1655 
1656  return 0;
1657  }
1658  return -1;
1659 }
1660 
1661 NdbOut&
1662 operator<<(NdbOut& out, const Lgman::Buffer_idx& pos)
1663 {
1664  out << "[ "
1665  << pos.m_ptr_i << " "
1666  << pos.m_idx << " ]";
1667  return out;
1668 }
1669 
1670 NdbOut&
1671 operator<<(NdbOut& out, const Lgman::Logfile_group::Position& pos)
1672 {
1673  out << "[ ("
1674  << pos.m_current_page.m_ptr_i << " "
1675  << pos.m_current_page.m_idx << ") ("
1676  << pos.m_current_pos.m_ptr_i << " "
1677  << pos.m_current_pos.m_idx << ") ]";
1678  return out;
1679 }
1680 
1681 void
1682 Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr, Uint32 force)
1683 {
1684  Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER];
1685  Logfile_group::Position producer= ptr.p->m_pos[PRODUCER];
1686 
1687  jamEntry();
1688 
1689  if (consumer.m_current_page == producer.m_current_page)
1690  {
1691  jam();
1692  Buffer_idx pos = producer.m_current_pos;
1693 
1694 #if 0
1695  if (force)
1696  {
1697  ndbout_c("force: %d ptr.p->m_file_pos[HEAD].m_ptr_i= %x",
1698  force, ptr.p->m_file_pos[HEAD].m_ptr_i);
1699  ndbout_c("consumer.m_current_page: %d %d producer.m_current_page: %d %d",
1700  consumer.m_current_page.m_ptr_i, consumer.m_current_page.m_idx,
1701  producer.m_current_page.m_ptr_i, producer.m_current_page.m_idx);
1702  }
1703 #endif
1704  if (! (ptr.p->m_state & Logfile_group::LG_DROPPING))
1705  {
1706  jam();
1707 
1708  if (ptr.p->m_log_buffer_waiters.isEmpty() || pos.m_idx == 0)
1709  {
1710  jam();
1711  force = 0;
1712  }
1713  else if (ptr.p->m_free_buffer_words < FREE_BUFFER_MARGIN)
1714  {
1715  jam();
1716  force = 2;
1717  }
1718 
1719  if (force < 2 || ptr.p->m_outstanding_fs)
1720  {
1721  jam();
1722  signal->theData[0] = LgmanContinueB::FLUSH_LOG;
1723  signal->theData[1] = ptr.i;
1724  signal->theData[2] = force + 1;
1725  sendSignalWithDelay(reference(), GSN_CONTINUEB, signal,
1726  force ? 10 : 100, 3);
1727  return;
1728  }
1729  else
1730  {
1731  jam();
1732  GlobalPage *page = m_shared_page_pool.getPtr(pos.m_ptr_i);
1733 
1734  Uint32 free= File_formats::UNDO_PAGE_WORDS - pos.m_idx;
1735 
1736  ndbout_c("force flush %d %d outstanding: %u isEmpty(): %u",
1737  pos.m_idx, ptr.p->m_free_buffer_words,
1738  ptr.p->m_outstanding_fs,
1739  ptr.p->m_log_buffer_waiters.isEmpty());
1740 
1741  ndbrequire(pos.m_idx); // don't flush empty page...
1742  Uint64 lsn= ptr.p->m_last_lsn - 1;
1743 
1746  undo->m_page_header.m_page_lsn_lo = (Uint32)(lsn & 0xFFFFFFFF);
1747  undo->m_page_header.m_page_lsn_hi = (Uint32)(lsn >> 32);
1748  undo->m_words_used= File_formats::UNDO_PAGE_WORDS - free;
1749 
1753  ndbrequire(ptr.p->m_free_file_words >= free);
1754  ndbrequire(ptr.p->m_free_buffer_words > free);
1755  ptr.p->m_free_file_words -= free;
1756  ptr.p->m_free_buffer_words -= free;
1757 
1758  validate_logfile_group(ptr, "force_log_flush");
1759 
1760  next_page(ptr.p, PRODUCER);
1761  ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 0;
1762  producer = ptr.p->m_pos[PRODUCER];
1763  // break through
1764  }
1765  }
1766  else
1767  {
1768  jam();
1769  ptr.p->m_state &= ~(Uint32)Logfile_group::LG_FLUSH_THREAD;
1770  return;
1771  }
1772  }
1773 
1774  bool full= false;
1775  Uint32 tot= 0;
1776  while(!(consumer.m_current_page == producer.m_current_page) && !full)
1777  {
1778  jam();
1779  validate_logfile_group(ptr, "before flush log");
1780 
1781  Uint32 cnt; // pages written
1782  Uint32 page= consumer.m_current_pos.m_ptr_i;
1783  if(consumer.m_current_page.m_ptr_i == producer.m_current_page.m_ptr_i)
1784  {
1788  jam();
1789 
1790  if(producer.m_current_pos.m_ptr_i > page)
1791  {
1795  jam();
1796  Uint32 tmp= producer.m_current_pos.m_ptr_i - page;
1797  cnt= write_log_pages(signal, ptr, page, tmp);
1798  assert(cnt <= tmp);
1799 
1800  consumer.m_current_pos.m_ptr_i += cnt;
1801  consumer.m_current_page.m_idx -= cnt;
1802  full= (tmp > cnt);
1803  }
1804  else
1805  {
1809  Uint32 tmp= consumer.m_current_page.m_idx + 1;
1810  cnt= write_log_pages(signal, ptr, page, tmp);
1811  assert(cnt <= tmp);
1812 
1813  if(cnt == tmp)
1814  {
1815  jam();
1820  ptr.p->m_pos[CONSUMER].m_current_page.m_idx= 0;
1821  next_page(ptr.p, CONSUMER);
1822  consumer = ptr.p->m_pos[CONSUMER];
1823  }
1824  else
1825  {
1826  jam();
1830  full= true;
1831  consumer.m_current_page.m_idx -= cnt;
1832  consumer.m_current_pos.m_ptr_i += cnt;
1833  }
1834  }
1835  }
1836  else
1837  {
1838  Uint32 tmp= consumer.m_current_page.m_idx + 1;
1839  cnt= write_log_pages(signal, ptr, page, tmp);
1840  assert(cnt <= tmp);
1841 
1842  if(cnt == tmp)
1843  {
1844  jam();
1849  ptr.p->m_pos[CONSUMER].m_current_page.m_idx= 0;
1850  next_page(ptr.p, CONSUMER);
1851  consumer = ptr.p->m_pos[CONSUMER];
1852  }
1853  else
1854  {
1855  jam();
1859  full= true;
1860  consumer.m_current_page.m_idx -= cnt;
1861  consumer.m_current_pos.m_ptr_i += cnt;
1862  }
1863  }
1864 
1865  tot += cnt;
1866  if(cnt)
1867  validate_logfile_group(ptr, " after flush_log");
1868  }
1869 
1870  ptr.p->m_pos[CONSUMER]= consumer;
1871 
1872  if (! (ptr.p->m_state & Logfile_group::LG_DROPPING))
1873  {
1874  signal->theData[0] = LgmanContinueB::FLUSH_LOG;
1875  signal->theData[1] = ptr.i;
1876  signal->theData[2] = 0;
1877  sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
1878  }
1879  else
1880  {
1881  ptr.p->m_state &= ~(Uint32)Logfile_group::LG_FLUSH_THREAD;
1882  }
1883 }
1884 
1885 void
1886 Lgman::process_log_buffer_waiters(Signal* signal, Ptr<Logfile_group> ptr)
1887 {
1888  Uint32 free_buffer= ptr.p->m_free_buffer_words;
1889  Uint32 callback_buffer = ptr.p->m_callback_buffer_words;
1890  Local_log_waiter_list
1891  list(m_log_waiter_pool, ptr.p->m_log_buffer_waiters);
1892 
1893  if (list.isEmpty())
1894  {
1895  jam();
1896  ptr.p->m_state &= ~(Uint32)Logfile_group::LG_WAITERS_THREAD;
1897  return;
1898  }
1899 
1900  bool removed= false;
1901  Ptr<Log_waiter> waiter;
1902  list.first(waiter);
1903  Uint32 sz = waiter.p->m_size;
1904  Uint32 logfile_group_id = ptr.p->m_logfile_group_id;
1905  if (sz + callback_buffer + FREE_BUFFER_MARGIN < free_buffer)
1906  {
1907  jam();
1908  removed= true;
1909  Uint32 block = waiter.p->m_block;
1910  CallbackPtr & callback = waiter.p->m_callback;
1911  ptr.p->m_callback_buffer_words += sz;
1912  sendCallbackConf(signal, block, callback, logfile_group_id);
1913 
1914  list.releaseFirst(waiter);
1915  }
1916 
1917  if (removed && !list.isEmpty())
1918  {
1919  jam();
1920  ptr.p->m_state |= Logfile_group::LG_WAITERS_THREAD;
1921  signal->theData[0] = LgmanContinueB::PROCESS_LOG_BUFFER_WAITERS;
1922  signal->theData[1] = ptr.i;
1923  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
1924  }
1925  else
1926  {
1927  jam();
1928  ptr.p->m_state &= ~(Uint32)Logfile_group::LG_WAITERS_THREAD;
1929  }
1930 }
1931 
1932 #define REALLY_SLOW_FS 0
1933 
1934 Uint32
1935 Lgman::write_log_pages(Signal* signal, Ptr<Logfile_group> ptr,
1936  Uint32 pageId, Uint32 in_pages)
1937 {
1938  assert(in_pages);
1939  Ptr<Undofile> filePtr;
1940  Buffer_idx head= ptr.p->m_file_pos[HEAD];
1941  Buffer_idx tail= ptr.p->m_file_pos[TAIL];
1942  m_file_pool.getPtr(filePtr, head.m_ptr_i);
1943 
1944  if(filePtr.p->m_online.m_outstanding > 0)
1945  {
1946  jam();
1947  return 0;
1948  }
1949 
1950  Uint32 sz= filePtr.p->m_file_size - 1; // skip zero
1951  Uint32 max, pages= in_pages;
1952 
1953  if(!(head.m_ptr_i == tail.m_ptr_i && head.m_idx < tail.m_idx))
1954  {
1955  max= sz - head.m_idx;
1956  }
1957  else
1958  {
1959  max= tail.m_idx - head.m_idx;
1960  }
1961 
1962  FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
1963  req->filePointer = filePtr.p->m_fd;
1964  req->userReference = reference();
1965  req->userPointer = filePtr.i;
1966  req->varIndex = 1+head.m_idx; // skip zero page
1967  req->numberOfPages = pages;
1968  req->data.pageData[0] = pageId;
1969  req->operationFlag = 0;
1970  FsReadWriteReq::setFormatFlag(req->operationFlag,
1971  FsReadWriteReq::fsFormatSharedPage);
1972 
1973  if(max > pages)
1974  {
1975  jam();
1976  max= pages;
1977  head.m_idx += max;
1978  ptr.p->m_file_pos[HEAD] = head;
1979 
1980  if (REALLY_SLOW_FS)
1981  sendSignalWithDelay(NDBFS_REF, GSN_FSWRITEREQ, signal, REALLY_SLOW_FS,
1982  FsReadWriteReq::FixedLength + 1);
1983  else
1984  sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal,
1985  FsReadWriteReq::FixedLength + 1, JBA);
1986 
1987  ptr.p->m_outstanding_fs++;
1988  filePtr.p->m_online.m_outstanding = max;
1989  filePtr.p->m_state |= Undofile::FS_OUTSTANDING;
1990 
1992  m_shared_page_pool.getPtr(pageId + max - 1);
1993  Uint64 lsn = 0;
1994  lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
1995  lsn += page->m_page_header.m_page_lsn_lo;
1996 
1997  filePtr.p->m_online.m_lsn = lsn; // Store last writereq lsn on file
1998  ptr.p->m_last_sync_req_lsn = lsn; // And logfile_group
1999  }
2000  else
2001  {
2002  jam();
2003  req->numberOfPages = max;
2004  FsReadWriteReq::setSyncFlag(req->operationFlag, 1);
2005 
2006  if (REALLY_SLOW_FS)
2007  sendSignalWithDelay(NDBFS_REF, GSN_FSWRITEREQ, signal, REALLY_SLOW_FS,
2008  FsReadWriteReq::FixedLength + 1);
2009  else
2010  sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal,
2011  FsReadWriteReq::FixedLength + 1, JBA);
2012 
2013  ptr.p->m_outstanding_fs++;
2014  filePtr.p->m_online.m_outstanding = max;
2015  filePtr.p->m_state |= Undofile::FS_OUTSTANDING;
2016 
2018  m_shared_page_pool.getPtr(pageId + max - 1);
2019  Uint64 lsn = 0;
2020  lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
2021  lsn += page->m_page_header.m_page_lsn_lo;
2022 
2023  filePtr.p->m_online.m_lsn = lsn; // Store last writereq lsn on file
2024  ptr.p->m_last_sync_req_lsn = lsn; // And logfile_group
2025 
2026  Ptr<Undofile> next = filePtr;
2027  Local_undofile_list files(m_file_pool, ptr.p->m_files);
2028  if(!files.next(next))
2029  {
2030  jam();
2031  files.first(next);
2032  }
2033  ndbout_c("changing file from %d to %d", filePtr.i, next.i);
2034  filePtr.p->m_state |= Undofile::FS_MOVE_NEXT;
2035  next.p->m_state &= ~(Uint32)Undofile::FS_EMPTY;
2036 
2037  head.m_idx= 0;
2038  head.m_ptr_i= next.i;
2039  ptr.p->m_file_pos[HEAD] = head;
2040  if(max < pages)
2041  max += write_log_pages(signal, ptr, pageId + max, pages - max);
2042  }
2043 
2044  assert(max);
2045  return max;
2046 }
2047 
2048 void
2049 Lgman::execFSWRITEREF(Signal* signal)
2050 {
2051  jamEntry();
2052  SimulatedBlock::execFSWRITEREF(signal);
2053  ndbrequire(false);
2054 }
2055 
2056 void
2057 Lgman::execFSWRITECONF(Signal* signal)
2058 {
2059  jamEntry();
2060  client_lock(number(), __LINE__);
2061  FsConf * conf = (FsConf*)signal->getDataPtr();
2062  Ptr<Undofile> ptr;
2063  m_file_pool.getPtr(ptr, conf->userPointer);
2064 
2065  ndbrequire(ptr.p->m_state & Undofile::FS_OUTSTANDING);
2066  ptr.p->m_state &= ~(Uint32)Undofile::FS_OUTSTANDING;
2067 
2068  Ptr<Logfile_group> lg_ptr;
2069  m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
2070 
2071  Uint32 cnt= lg_ptr.p->m_outstanding_fs;
2072  ndbrequire(cnt);
2073 
2074  if(lg_ptr.p->m_next_reply_ptr_i == ptr.i)
2075  {
2076  Uint32 tot= 0;
2077  Uint64 lsn = 0;
2078  {
2079  Local_undofile_list files(m_file_pool, lg_ptr.p->m_files);
2080  while(cnt && ! (ptr.p->m_state & Undofile::FS_OUTSTANDING))
2081  {
2082  Uint32 state= ptr.p->m_state;
2083  Uint32 pages= ptr.p->m_online.m_outstanding;
2084  ndbrequire(pages);
2085  ptr.p->m_online.m_outstanding= 0;
2086  ptr.p->m_state &= ~(Uint32)Undofile::FS_MOVE_NEXT;
2087  tot += pages;
2088  cnt--;
2089 
2090  lsn = ptr.p->m_online.m_lsn;
2091 
2092  if((state & Undofile::FS_MOVE_NEXT) && !files.next(ptr))
2093  files.first(ptr);
2094  }
2095  }
2096 
2097  ndbassert(tot);
2098  lg_ptr.p->m_outstanding_fs = cnt;
2099  lg_ptr.p->m_free_buffer_words += (tot * File_formats::UNDO_PAGE_WORDS);
2100  lg_ptr.p->m_next_reply_ptr_i = ptr.i;
2101  lg_ptr.p->m_last_synced_lsn = lsn;
2102 
2103  if(! (lg_ptr.p->m_state & Logfile_group::LG_SYNC_WAITERS_THREAD))
2104  {
2105  process_log_sync_waiters(signal, lg_ptr);
2106  }
2107 
2108  if(! (lg_ptr.p->m_state & Logfile_group::LG_WAITERS_THREAD))
2109  {
2110  process_log_buffer_waiters(signal, lg_ptr);
2111  }
2112  }
2113  else
2114  {
2115  ndbout_c("miss matched writes");
2116  }
2117  client_unlock(number(), __LINE__);
2118 
2119  return;
2120 }
2121 
2122 void
2123 Lgman::execLCP_FRAG_ORD(Signal* signal)
2124 {
2125  jamEntry();
2126  client_lock(number(), __LINE__);
2127  exec_lcp_frag_ord(signal, this);
2128  client_unlock(number(), __LINE__);
2129 }
2130 
2131 void
2133 {
2134  jamEntry();
2135 
2136  LcpFragOrd * ord = (LcpFragOrd *)signal->getDataPtr();
2137  Uint32 lcp_id= ord->lcpId;
2138  Uint32 frag_id = ord->fragmentId;
2139  Uint32 table_id = ord->tableId;
2140 
2141  Ptr<Logfile_group> ptr;
2142  m_logfile_group_list.first(ptr);
2143 
2144  Uint32 entry= lcp_id == m_latest_lcp ?
2145  File_formats::Undofile::UNDO_LCP : File_formats::Undofile::UNDO_LCP_FIRST;
2146  if(!ptr.isNull() && ! (ptr.p->m_state & Logfile_group::LG_CUT_LOG_THREAD))
2147  {
2148  jam();
2149  ptr.p->m_state |= Logfile_group::LG_CUT_LOG_THREAD;
2150  signal->theData[0] = LgmanContinueB::CUT_LOG_TAIL;
2151  signal->theData[1] = ptr.i;
2152  client_block->sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2153  }
2154 
2155  if(!ptr.isNull() && ptr.p->m_last_lsn)
2156  {
2157  Uint32 undo[3];
2158  undo[0] = lcp_id;
2159  undo[1] = (table_id << 16) | frag_id;
2160  undo[2] = (entry << 16 ) | (sizeof(undo) >> 2);
2161 
2162  Uint64 last_lsn= m_last_lsn;
2163 
2164  if(ptr.p->m_last_lsn == last_lsn
2165 #ifdef VM_TRACE
2166  && ((rand() % 100) > 50)
2167 #endif
2168  )
2169  {
2170  undo[2] |= File_formats::Undofile::UNDO_NEXT_LSN << 16;
2171  Uint32 *dst= get_log_buffer(ptr, sizeof(undo) >> 2);
2172  memcpy(dst, undo, sizeof(undo));
2173  ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
2174  ptr.p->m_free_file_words -= (sizeof(undo) >> 2);
2175  }
2176  else
2177  {
2178  Uint32 *dst= get_log_buffer(ptr, (sizeof(undo) >> 2) + 2);
2179  * dst++ = (Uint32)(last_lsn >> 32);
2180  * dst++ = (Uint32)(last_lsn & 0xFFFFFFFF);
2181  memcpy(dst, undo, sizeof(undo));
2182  ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
2183  ptr.p->m_free_file_words -= ((sizeof(undo) >> 2) + 2);
2184  }
2185  ptr.p->m_last_lcp_lsn = last_lsn;
2186  m_last_lsn = ptr.p->m_last_lsn = last_lsn + 1;
2187 
2188  validate_logfile_group(ptr, "execLCP_FRAG_ORD");
2189  }
2190 
2191  while(!ptr.isNull())
2192  {
2193  if (ptr.p->m_last_lsn)
2194  {
2198  if(m_latest_lcp != lcp_id)
2199  {
2200  ptr.p->m_tail_pos[0] = ptr.p->m_tail_pos[1];
2201  ptr.p->m_tail_pos[1] = ptr.p->m_tail_pos[2];
2202  ptr.p->m_tail_pos[2] = ptr.p->m_file_pos[HEAD];
2203  }
2204 
2205  if(0)
2206  ndbout_c
2207  ("execLCP_FRAG_ORD (%d %d) (%d %d) (%d %d) free pages: %ld",
2208  ptr.p->m_tail_pos[0].m_ptr_i, ptr.p->m_tail_pos[0].m_idx,
2209  ptr.p->m_tail_pos[1].m_ptr_i, ptr.p->m_tail_pos[1].m_idx,
2210  ptr.p->m_tail_pos[2].m_ptr_i, ptr.p->m_tail_pos[2].m_idx,
2211  (long) (ptr.p->m_free_file_words / File_formats::UNDO_PAGE_WORDS));
2212  }
2213  m_logfile_group_list.next(ptr);
2214  }
2215 
2216  m_latest_lcp = lcp_id;
2217 }
2218 
2219 void
2220 Lgman::execEND_LCP_REQ(Signal* signal)
2221 {
2222  EndLcpReq* req= (EndLcpReq*)signal->getDataPtr();
2223  ndbrequire(m_latest_lcp == req->backupId);
2224  m_end_lcp_senderdata = req->senderData;
2225 
2226  Ptr<Logfile_group> ptr;
2227  m_logfile_group_list.first(ptr);
2228  bool wait= false;
2229  while(!ptr.isNull())
2230  {
2231  Uint64 lcp_lsn = ptr.p->m_last_lcp_lsn;
2232  if(ptr.p->m_last_synced_lsn < lcp_lsn)
2233  {
2234  wait= true;
2235  if(signal->getSendersBlockRef() != reference())
2236  {
2237  D("Logfile_client - execEND_LCP_REQ");
2238  Logfile_client tmp(this, this, ptr.p->m_logfile_group_id);
2240  req.m_callback.m_callbackData = ptr.i;
2241  req.m_callback.m_callbackIndex = ENDLCP_CALLBACK;
2242  ndbrequire(tmp.sync_lsn(signal, lcp_lsn, &req, 0) == 0);
2243  }
2244  }
2245  else
2246  {
2247  ptr.p->m_last_lcp_lsn = 0;
2248  }
2249  m_logfile_group_list.next(ptr);
2250  }
2251 
2252  if(wait)
2253  {
2254  return;
2255  }
2256 
2257  EndLcpConf* conf = (EndLcpConf*)signal->getDataPtrSend();
2258  conf->senderData = m_end_lcp_senderdata;
2259  conf->senderRef = reference();
2260  sendSignal(DBLQH_REF, GSN_END_LCP_CONF,
2261  signal, EndLcpConf::SignalLength, JBB);
2262 }
2263 
2264 void
2265 Lgman::endlcp_callback(Signal* signal, Uint32 ptr, Uint32 res)
2266 {
2267  EndLcpReq* req= (EndLcpReq*)signal->getDataPtr();
2268  req->backupId = m_latest_lcp;
2269  req->senderData = m_end_lcp_senderdata;
2270  execEND_LCP_REQ(signal);
2271 }
2272 
2273 void
2274 Lgman::cut_log_tail(Signal* signal, Ptr<Logfile_group> ptr)
2275 {
2276  bool done= true;
2277  if (likely(ptr.p->m_last_lsn))
2278  {
2279  Buffer_idx tmp= ptr.p->m_tail_pos[0];
2280  Buffer_idx tail= ptr.p->m_file_pos[TAIL];
2281 
2282  Ptr<Undofile> filePtr;
2283  m_file_pool.getPtr(filePtr, tail.m_ptr_i);
2284 
2285  if(!(tmp == tail))
2286  {
2287  Uint32 free;
2288  if(tmp.m_ptr_i == tail.m_ptr_i && tail.m_idx < tmp.m_idx)
2289  {
2290  free= tmp.m_idx - tail.m_idx;
2291  ptr.p->m_free_file_words += free * File_formats::UNDO_PAGE_WORDS;
2292  ptr.p->m_file_pos[TAIL] = tmp;
2293  }
2294  else
2295  {
2296  free= filePtr.p->m_file_size - tail.m_idx - 1;
2297  ptr.p->m_free_file_words += free * File_formats::UNDO_PAGE_WORDS;
2298 
2299  Ptr<Undofile> next = filePtr;
2300  Local_undofile_list files(m_file_pool, ptr.p->m_files);
2301  while(files.next(next) && (next.p->m_state & Undofile::FS_EMPTY))
2302  ndbrequire(next.i != filePtr.i);
2303  if(next.isNull())
2304  {
2305  jam();
2306  files.first(next);
2307  while((next.p->m_state & Undofile::FS_EMPTY) && files.next(next))
2308  ndbrequire(next.i != filePtr.i);
2309  }
2310 
2311  tmp.m_idx= 0;
2312  tmp.m_ptr_i= next.i;
2313  ptr.p->m_file_pos[TAIL] = tmp;
2314  done= false;
2315  }
2316  }
2317 
2318  validate_logfile_group(ptr, "cut log");
2319  }
2320 
2321  if (done)
2322  {
2323  ptr.p->m_state &= ~(Uint32)Logfile_group::LG_CUT_LOG_THREAD;
2324  m_logfile_group_list.next(ptr);
2325  }
2326 
2327  if(!done || !ptr.isNull())
2328  {
2329  ptr.p->m_state |= Logfile_group::LG_CUT_LOG_THREAD;
2330  signal->theData[0] = LgmanContinueB::CUT_LOG_TAIL;
2331  signal->theData[1] = ptr.i;
2332  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2333  }
2334 }
2335 
2336 void
2338 {
2339  jamEntry();
2340 
2341  Ptr<Logfile_group> ptr;
2342  m_logfile_group_list.first(ptr);
2343 
2347  return; // NOT IMPLETMENT YET
2348 
2349  signal->theData[0] = LgmanContinueB::FILTER_LOG;
2350  while(!ptr.isNull())
2351  {
2352  signal->theData[1] = ptr.i;
2353  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2354  m_logfile_group_list.next(ptr);
2355  }
2356 }
2357 
2358 int
2359 Lgman::alloc_log_space(Uint32 ref, Uint32 words)
2360 {
2361  ndbrequire(words);
2362  words += 2; // lsn
2363  Logfile_group key;
2364  key.m_logfile_group_id= ref;
2365  Ptr<Logfile_group> ptr;
2366  if(m_logfile_group_hash.find(ptr, key) &&
2367  ptr.p->m_free_file_words >= (words + (4 * File_formats::UNDO_PAGE_WORDS)))
2368  {
2369  ptr.p->m_free_file_words -= words;
2370  validate_logfile_group(ptr, "alloc_log_space");
2371  return 0;
2372  }
2373 
2374  if(ptr.isNull())
2375  {
2376  return -1;
2377  }
2378 
2379  return 1501;
2380 }
2381 
2382 int
2383 Lgman::free_log_space(Uint32 ref, Uint32 words)
2384 {
2385  ndbrequire(words);
2386  Logfile_group key;
2387  key.m_logfile_group_id= ref;
2388  Ptr<Logfile_group> ptr;
2389  if(m_logfile_group_hash.find(ptr, key))
2390  {
2391  ptr.p->m_free_file_words += (words + 2);
2392  validate_logfile_group(ptr, "free_log_space");
2393  return 0;
2394  }
2395  ndbrequire(false);
2396  return -1;
2397 }
2398 
2399 Uint64
2400 Logfile_client::add_entry(const Change* src, Uint32 cnt)
2401 {
2402  Uint32 i, tot= 0;
2403  for(i= 0; i<cnt; i++)
2404  {
2405  tot += src[i].len;
2406  }
2407 
2408  Uint32 *dst;
2409  Uint64 last_lsn= m_lgman->m_last_lsn;
2410  {
2412  key.m_logfile_group_id= m_logfile_group_id;
2414  if(m_lgman->m_logfile_group_hash.find(ptr, key))
2415  {
2416  Uint32 callback_buffer = ptr.p->m_callback_buffer_words;
2417  Uint64 last_lsn_filegroup= ptr.p->m_last_lsn;
2418  if(last_lsn_filegroup == last_lsn
2419 #ifdef VM_TRACE
2420  && ((rand() % 100) > 50)
2421 #endif
2422  )
2423  {
2424  dst= m_lgman->get_log_buffer(ptr, tot);
2425  for(i= 0; i<cnt; i++)
2426  {
2427  memcpy(dst, src[i].ptr, 4*src[i].len);
2428  dst += src[i].len;
2429  }
2430  * (dst - 1) |= File_formats::Undofile::UNDO_NEXT_LSN << 16;
2431  ptr.p->m_free_file_words += 2;
2432  m_lgman->validate_logfile_group(ptr);
2433  }
2434  else
2435  {
2436  dst= m_lgman->get_log_buffer(ptr, tot + 2);
2437  * dst++ = (Uint32)(last_lsn >> 32);
2438  * dst++ = (Uint32)(last_lsn & 0xFFFFFFFF);
2439  for(i= 0; i<cnt; i++)
2440  {
2441  memcpy(dst, src[i].ptr, 4*src[i].len);
2442  dst += src[i].len;
2443  }
2444  }
2449  tot += 2;
2450 
2451  if (unlikely(! (tot <= callback_buffer)))
2452  {
2453  abort();
2454  }
2455  ptr.p->m_callback_buffer_words = callback_buffer - tot;
2456  }
2457 
2458  m_lgman->m_last_lsn = ptr.p->m_last_lsn = last_lsn + 1;
2459 
2460  return last_lsn;
2461  }
2462 }
2463 
2464 void
2465 Lgman::execSTART_RECREQ(Signal* signal)
2466 {
2467  m_latest_lcp = signal->theData[0];
2468 
2469  Ptr<Logfile_group> ptr;
2470  m_logfile_group_list.first(ptr);
2471 
2472  if(ptr.i != RNIL)
2473  {
2474  infoEvent("Applying undo to LCP: %d", m_latest_lcp);
2475  ndbout_c("Applying undo to LCP: %d", m_latest_lcp);
2476  find_log_head(signal, ptr);
2477  return;
2478  }
2479 
2480  signal->theData[0] = reference();
2481  sendSignal(DBLQH_REF, GSN_START_RECCONF, signal, 1, JBB);
2482 }
2483 
2484 void
2485 Lgman::find_log_head(Signal* signal, Ptr<Logfile_group> ptr)
2486 {
2487  ndbrequire(ptr.p->m_state &
2488  (Logfile_group::LG_STARTING | Logfile_group::LG_SORTING));
2489 
2490  if(ptr.p->m_meta_files.isEmpty() && ptr.p->m_files.isEmpty())
2491  {
2492  jam();
2496  ptr.p->m_state &= ~(Uint32)Logfile_group::LG_STARTING;
2497  ptr.p->m_state |= Logfile_group::LG_ONLINE;
2498  m_logfile_group_list.next(ptr);
2499  signal->theData[0] = LgmanContinueB::FIND_LOG_HEAD;
2500  signal->theData[1] = ptr.i;
2501  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2502  return;
2503  }
2504 
2505  ptr.p->m_state = Logfile_group::LG_SORTING;
2506 
2510  Local_undofile_list files(m_file_pool, ptr.p->m_meta_files);
2511  Ptr<Undofile> file_ptr;
2512  files.first(file_ptr);
2513 
2514  if(!file_ptr.isNull())
2515  {
2519  Uint32 page_id = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
2520  file_ptr.p->m_online.m_outstanding= page_id;
2521 
2522  FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
2523  req->filePointer = file_ptr.p->m_fd;
2524  req->userReference = reference();
2525  req->userPointer = file_ptr.i;
2526  req->varIndex = 1; // skip zero page
2527  req->numberOfPages = 1;
2528  req->data.pageData[0] = page_id;
2529  req->operationFlag = 0;
2530  FsReadWriteReq::setFormatFlag(req->operationFlag,
2531  FsReadWriteReq::fsFormatSharedPage);
2532 
2533  sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
2534  FsReadWriteReq::FixedLength + 1, JBA);
2535 
2536  ptr.p->m_outstanding_fs++;
2537  file_ptr.p->m_state |= Undofile::FS_OUTSTANDING;
2538  return;
2539  }
2540  else
2541  {
2546  ndbrequire(!ptr.p->m_files.isEmpty());
2547  Local_undofile_list read_files(m_file_pool, ptr.p->m_files);
2548  read_files.last(file_ptr);
2549 
2550 
2554  ptr.p->m_state = Logfile_group::LG_SEARCHING;
2555  file_ptr.p->m_state = Undofile::FS_SEARCHING;
2556  ptr.p->m_file_pos[TAIL].m_idx = 1; // left page
2557  ptr.p->m_file_pos[HEAD].m_idx = file_ptr.p->m_file_size;
2558  ptr.p->m_file_pos[HEAD].m_ptr_i = ((file_ptr.p->m_file_size - 1) >> 1) + 1;
2559 
2560  Uint32 page_id = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
2561  file_ptr.p->m_online.m_outstanding= page_id;
2562 
2563  FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
2564  req->filePointer = file_ptr.p->m_fd;
2565  req->userReference = reference();
2566  req->userPointer = file_ptr.i;
2567  req->varIndex = ptr.p->m_file_pos[HEAD].m_ptr_i;
2568  req->numberOfPages = 1;
2569  req->data.pageData[0] = page_id;
2570  req->operationFlag = 0;
2571  FsReadWriteReq::setFormatFlag(req->operationFlag,
2572  FsReadWriteReq::fsFormatSharedPage);
2573 
2574  sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
2575  FsReadWriteReq::FixedLength + 1, JBA);
2576 
2577  ptr.p->m_outstanding_fs++;
2578  file_ptr.p->m_state |= Undofile::FS_OUTSTANDING;
2579  return;
2580  }
2581 }
2582 
2583 void
2585 {
2586  jamEntry();
2587  client_lock(number(), __LINE__);
2588 
2589  Ptr<Undofile> ptr;
2590  Ptr<Logfile_group> lg_ptr;
2591  FsConf* conf = (FsConf*)signal->getDataPtr();
2592 
2593  m_file_pool.getPtr(ptr, conf->userPointer);
2594  m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
2595 
2596  ndbrequire(ptr.p->m_state & Undofile::FS_OUTSTANDING);
2597  ptr.p->m_state &= ~(Uint32)Undofile::FS_OUTSTANDING;
2598 
2599  Uint32 cnt= lg_ptr.p->m_outstanding_fs;
2600  ndbrequire(cnt);
2601 
2602  if((ptr.p->m_state & Undofile::FS_EXECUTING)== Undofile::FS_EXECUTING)
2603  {
2604  jam();
2605 
2606  if(lg_ptr.p->m_next_reply_ptr_i == ptr.i)
2607  {
2608  Uint32 tot= 0;
2609  Local_undofile_list files(m_file_pool, lg_ptr.p->m_files);
2610  while(cnt && ! (ptr.p->m_state & Undofile::FS_OUTSTANDING))
2611  {
2612  Uint32 state= ptr.p->m_state;
2613  Uint32 pages= ptr.p->m_online.m_outstanding;
2614  ndbrequire(pages);
2615  ptr.p->m_online.m_outstanding= 0;
2616  ptr.p->m_state &= ~(Uint32)Undofile::FS_MOVE_NEXT;
2617  tot += pages;
2618  cnt--;
2619 
2620  if((state & Undofile::FS_MOVE_NEXT) && !files.prev(ptr))
2621  files.last(ptr);
2622  }
2623 
2624  lg_ptr.p->m_outstanding_fs = cnt;
2625  lg_ptr.p->m_pos[PRODUCER].m_current_pos.m_idx += tot;
2626  lg_ptr.p->m_next_reply_ptr_i = ptr.i;
2627  }
2628  client_unlock(number(), __LINE__);
2629  return;
2630  }
2631 
2632  lg_ptr.p->m_outstanding_fs = cnt - 1;
2633 
2634  Ptr<GlobalPage> page_ptr;
2635  m_shared_page_pool.getPtr(page_ptr, ptr.p->m_online.m_outstanding);
2636  ptr.p->m_online.m_outstanding= 0;
2637 
2639  (File_formats::Undofile::Undo_page*)page_ptr.p;
2640 
2641  Uint64 lsn = 0;
2642  lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
2643  lsn += page->m_page_header.m_page_lsn_lo;
2644 
2645  switch(ptr.p->m_state){
2646  case Undofile::FS_SORTING:
2647  jam();
2648  break;
2649  case Undofile::FS_SEARCHING:
2650  jam();
2651  find_log_head_in_file(signal, lg_ptr, ptr, lsn);
2652  client_unlock(number(), __LINE__);
2653  return;
2654  default:
2655  case Undofile::FS_EXECUTING:
2656  case Undofile::FS_CREATING:
2657  case Undofile::FS_DROPPING:
2658  case Undofile::FS_ONLINE:
2659  case Undofile::FS_OPENING:
2660  case Undofile::FS_EMPTY:
2661  jam();
2662  ndbrequire(false);
2663  }
2664 
2668  ptr.p->m_state = Undofile::FS_EXECUTING;
2669  ptr.p->m_online.m_lsn = lsn;
2670 
2674  {
2675  Local_undofile_list meta(m_file_pool, lg_ptr.p->m_meta_files);
2676  Local_undofile_list files(m_file_pool, lg_ptr.p->m_files);
2677  meta.remove(ptr);
2678 
2679  Ptr<Undofile> loop;
2680  files.first(loop);
2681  while(!loop.isNull() && loop.p->m_online.m_lsn <= lsn)
2682  files.next(loop);
2683 
2684  if(loop.isNull())
2685  {
2689  jam();
2690  files.add(ptr);
2691  }
2692  else
2693  {
2697  files.insert(ptr, loop);
2698  }
2699  }
2700  find_log_head(signal, lg_ptr);
2701  client_unlock(number(), __LINE__);
2702 }
2703 
2704 void
2705 Lgman::execFSREADREF(Signal* signal)
2706 {
2707  jamEntry();
2708  SimulatedBlock::execFSREADREF(signal);
2709  ndbrequire(false);
2710 }
2711 
2712 void
2713 Lgman::find_log_head_in_file(Signal* signal,
2714  Ptr<Logfile_group> ptr,
2715  Ptr<Undofile> file_ptr,
2716  Uint64 last_lsn)
2717 {
2718  // a b
2719  // 3 4 5 0 1
2720  Uint32 curr= ptr.p->m_file_pos[HEAD].m_ptr_i;
2721  Uint32 head= ptr.p->m_file_pos[HEAD].m_idx;
2722  Uint32 tail= ptr.p->m_file_pos[TAIL].m_idx;
2723 
2724  ndbrequire(head > tail);
2725  Uint32 diff = head - tail;
2726 
2727  if(DEBUG_SEARCH_LOG_HEAD)
2728  printf("tail: %d(%lld) head: %d last: %d(%lld) -> ",
2729  tail, file_ptr.p->m_online.m_lsn,
2730  head, curr, last_lsn);
2731  if(last_lsn > file_ptr.p->m_online.m_lsn)
2732  {
2733  if(DEBUG_SEARCH_LOG_HEAD)
2734  printf("moving tail ");
2735 
2736  file_ptr.p->m_online.m_lsn = last_lsn;
2737  ptr.p->m_file_pos[TAIL].m_idx = tail = curr;
2738  }
2739  else
2740  {
2741  if(DEBUG_SEARCH_LOG_HEAD)
2742  printf("moving head ");
2743 
2744  ptr.p->m_file_pos[HEAD].m_idx = head = curr;
2745  }
2746 
2747  if(diff > 1)
2748  {
2749  // We need to find more pages to be sure...
2750  ptr.p->m_file_pos[HEAD].m_ptr_i = curr = ((head + tail) >> 1);
2751 
2752  if(DEBUG_SEARCH_LOG_HEAD)
2753  ndbout_c("-> new search tail: %d(%lld) head: %d -> %d",
2754  tail, file_ptr.p->m_online.m_lsn,
2755  head, curr);
2756 
2757  Uint32 page_id = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
2758  file_ptr.p->m_online.m_outstanding= page_id;
2759 
2760  FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
2761  req->filePointer = file_ptr.p->m_fd;
2762  req->userReference = reference();
2763  req->userPointer = file_ptr.i;
2764  req->varIndex = curr;
2765  req->numberOfPages = 1;
2766  req->data.pageData[0] = page_id;
2767  req->operationFlag = 0;
2768  FsReadWriteReq::setFormatFlag(req->operationFlag,
2769  FsReadWriteReq::fsFormatSharedPage);
2770 
2771  sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
2772  FsReadWriteReq::FixedLength + 1, JBA);
2773 
2774  ptr.p->m_outstanding_fs++;
2775  file_ptr.p->m_state |= Undofile::FS_OUTSTANDING;
2776  return;
2777  }
2778 
2779  ndbrequire(diff == 1);
2780  if(DEBUG_SEARCH_LOG_HEAD)
2781  ndbout_c("-> found last page: %d", tail);
2782 
2783  ptr.p->m_state = 0;
2784  file_ptr.p->m_state = Undofile::FS_EXECUTING;
2785  ptr.p->m_last_lsn = file_ptr.p->m_online.m_lsn;
2786  ptr.p->m_last_read_lsn = file_ptr.p->m_online.m_lsn;
2787  ptr.p->m_last_synced_lsn = file_ptr.p->m_online.m_lsn;
2788  m_last_lsn = file_ptr.p->m_online.m_lsn;
2789 
2793  ptr.p->m_file_pos[HEAD].m_ptr_i = file_ptr.i;
2794  ptr.p->m_file_pos[HEAD].m_idx = tail;
2795 
2796  ptr.p->m_file_pos[TAIL].m_ptr_i = file_ptr.i;
2797  ptr.p->m_file_pos[TAIL].m_idx = tail - 1;
2798  ptr.p->m_next_reply_ptr_i = file_ptr.i;
2799 
2800  {
2801  Local_undofile_list files(m_file_pool, ptr.p->m_files);
2802  if(tail == 1)
2803  {
2808  Ptr<Undofile> prev = file_ptr;
2809  if(!files.prev(prev))
2810  {
2811  files.last(prev);
2812  }
2813  ptr.p->m_file_pos[TAIL].m_ptr_i = prev.i;
2814  ptr.p->m_file_pos[TAIL].m_idx = prev.p->m_file_size - 1;
2815  ptr.p->m_next_reply_ptr_i = prev.i;
2816  }
2817 
2818  SimulatedBlock* fs = globalData.getBlock(NDBFS);
2819  infoEvent("Undo head - %s page: %d lsn: %lld",
2820  fs->get_filename(file_ptr.p->m_fd),
2821  tail, file_ptr.p->m_online.m_lsn);
2822  g_eventLogger->info("Undo head - %s page: %d lsn: %lld",
2823  fs->get_filename(file_ptr.p->m_fd),
2824  tail, file_ptr.p->m_online.m_lsn);
2825 
2826  for(files.prev(file_ptr); !file_ptr.isNull(); files.prev(file_ptr))
2827  {
2828  infoEvent(" - next - %s(%lld)",
2829  fs->get_filename(file_ptr.p->m_fd),
2830  file_ptr.p->m_online.m_lsn);
2831 
2832  g_eventLogger->info(" - next - %s(%lld)",
2833  fs->get_filename(file_ptr.p->m_fd),
2834  file_ptr.p->m_online.m_lsn);
2835  }
2836  }
2837 
2841  m_logfile_group_list.next(ptr);
2842  signal->theData[0] = LgmanContinueB::FIND_LOG_HEAD;
2843  signal->theData[1] = ptr.i;
2844  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2845 }
2846 
2847 void
2848 Lgman::init_run_undo_log(Signal* signal)
2849 {
2854  Logfile_group_list& list= m_logfile_group_list;
2855  Logfile_group_list tmp(m_logfile_group_pool);
2856 
2857  bool found_any = false;
2858 
2859  list.first(group);
2860  while(!group.isNull())
2861  {
2863  list.next(group);
2864  list.remove(ptr);
2865 
2866  if (ptr.p->m_state & Logfile_group::LG_ONLINE)
2867  {
2871  jam();
2872  tmp.addLast(ptr);
2873  continue;
2874  }
2875 
2876  found_any = true;
2877 
2878  {
2882  ptr.p->m_free_buffer_words -= File_formats::UNDO_PAGE_WORDS;
2883  ptr.p->m_pos[CONSUMER].m_current_page.m_idx = 0; // 0 more pages read
2884  ptr.p->m_pos[PRODUCER].m_current_page.m_idx = 0; // 0 more pages read
2885 
2886  Uint32 page = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
2888  (File_formats::Undofile::Undo_page*)m_shared_page_pool.getPtr(page);
2889 
2890  ptr.p->m_pos[CONSUMER].m_current_pos.m_idx = pageP->m_words_used;
2891  ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 1;
2892  ptr.p->m_last_read_lsn++;
2893  }
2894 
2898  signal->theData[0] = LgmanContinueB::READ_UNDO_LOG;
2899  signal->theData[1] = ptr.i;
2900  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2901 
2905  Ptr<Logfile_group> pos;
2906  for(tmp.first(pos); !pos.isNull(); tmp.next(pos))
2907  if(ptr.p->m_last_read_lsn >= pos.p->m_last_read_lsn)
2908  break;
2909 
2910  if(pos.isNull())
2911  tmp.add(ptr);
2912  else
2913  tmp.insert(ptr, pos);
2914 
2915  ptr.p->m_state =
2916  Logfile_group::LG_EXEC_THREAD | Logfile_group::LG_READ_THREAD;
2917  }
2918  list = tmp;
2919 
2920  if (found_any == false)
2921  {
2925  jam();
2926  signal->theData[0] = reference();
2927  sendSignal(DBLQH_REF, GSN_START_RECCONF, signal, 1, JBB);
2928  return;
2929  }
2930 
2931  execute_undo_record(signal);
2932 }
2933 
2934 void
2935 Lgman::read_undo_log(Signal* signal, Ptr<Logfile_group> ptr)
2936 {
2937  Uint32 cnt, free= ptr.p->m_free_buffer_words;
2938 
2939  if(! (ptr.p->m_state & Logfile_group::LG_EXEC_THREAD))
2940  {
2941  jam();
2945  ptr.p->m_state &= ~(Uint32)Logfile_group::LG_READ_THREAD;
2946  stop_run_undo_log(signal);
2947  return;
2948  }
2949 
2950  if(free <= File_formats::UNDO_PAGE_WORDS)
2951  {
2952  signal->theData[0] = LgmanContinueB::READ_UNDO_LOG;
2953  signal->theData[1] = ptr.i;
2954  sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 2);
2955  return;
2956  }
2957 
2958  Logfile_group::Position producer= ptr.p->m_pos[PRODUCER];
2959  Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER];
2960 
2961  if(producer.m_current_page.m_idx == 0)
2962  {
2967  Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
2968  Uint32 sz = map.getSize();
2969  Uint32 pos= (producer.m_current_page.m_ptr_i + sz - 2) % sz;
2970  map.position(it, pos);
2971  union {
2972  Uint32 _tmp[2];
2973  Lgman::Buffer_idx range;
2974  };
2975  _tmp[0] = *it.data; map.next(it); _tmp[1] = *it.data;
2976  producer.m_current_page.m_ptr_i = pos;
2977  producer.m_current_page.m_idx = range.m_idx;
2978  producer.m_current_pos.m_ptr_i = range.m_ptr_i + range.m_idx;
2979  }
2980 
2981  if(producer.m_current_page.m_ptr_i == consumer.m_current_page.m_ptr_i &&
2982  producer.m_current_pos.m_ptr_i > consumer.m_current_pos.m_ptr_i)
2983  {
2984  Uint32 max=
2985  producer.m_current_pos.m_ptr_i - consumer.m_current_pos.m_ptr_i - 1;
2986  ndbrequire(free >= max * File_formats::UNDO_PAGE_WORDS);
2987  cnt= read_undo_pages(signal, ptr, producer.m_current_pos.m_ptr_i, max);
2988  ndbrequire(cnt <= max);
2989  producer.m_current_pos.m_ptr_i -= cnt;
2990  producer.m_current_page.m_idx -= cnt;
2991  }
2992  else
2993  {
2994  Uint32 max= producer.m_current_page.m_idx;
2995  ndbrequire(free >= max * File_formats::UNDO_PAGE_WORDS);
2996  cnt= read_undo_pages(signal, ptr, producer.m_current_pos.m_ptr_i, max);
2997  ndbrequire(cnt <= max);
2998  producer.m_current_pos.m_ptr_i -= cnt;
2999  producer.m_current_page.m_idx -= cnt;
3000  }
3001 
3002  ndbrequire(free >= cnt * File_formats::UNDO_PAGE_WORDS);
3003  free -= (cnt * File_formats::UNDO_PAGE_WORDS);
3004  ptr.p->m_free_buffer_words = free;
3005  ptr.p->m_pos[PRODUCER] = producer;
3006 
3007  signal->theData[0] = LgmanContinueB::READ_UNDO_LOG;
3008  signal->theData[1] = ptr.i;
3009 
3010  if(free > File_formats::UNDO_PAGE_WORDS)
3011  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
3012  else
3013  sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 2);
3014 }
3015 
3016 Uint32
3017 Lgman::read_undo_pages(Signal* signal, Ptr<Logfile_group> ptr,
3018  Uint32 pageId, Uint32 pages)
3019 {
3020  ndbrequire(pages);
3021  Ptr<Undofile> filePtr;
3022  Buffer_idx tail= ptr.p->m_file_pos[TAIL];
3023  m_file_pool.getPtr(filePtr, tail.m_ptr_i);
3024 
3025  if(filePtr.p->m_online.m_outstanding > 0)
3026  {
3027  jam();
3028  return 0;
3029  }
3030 
3031  Uint32 max= tail.m_idx;
3032 
3033  FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
3034  req->filePointer = filePtr.p->m_fd;
3035  req->userReference = reference();
3036  req->userPointer = filePtr.i;
3037  req->operationFlag = 0;
3038  FsReadWriteReq::setFormatFlag(req->operationFlag,
3039  FsReadWriteReq::fsFormatSharedPage);
3040 
3041 
3042  if(max > pages)
3043  {
3044  jam();
3045  tail.m_idx -= pages;
3046 
3047  req->varIndex = 1 + tail.m_idx;
3048  req->numberOfPages = pages;
3049  req->data.pageData[0] = pageId - pages;
3050  ptr.p->m_file_pos[TAIL] = tail;
3051 
3052  if(DEBUG_UNDO_EXECUTION)
3053  ndbout_c("a reading from file: %d page(%d-%d) into (%d-%d)",
3054  ptr.i, 1 + tail.m_idx, 1+tail.m_idx+pages-1,
3055  pageId - pages, pageId - 1);
3056 
3057  sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
3058  FsReadWriteReq::FixedLength + 1, JBA);
3059 
3060  ptr.p->m_outstanding_fs++;
3061  filePtr.p->m_state |= Undofile::FS_OUTSTANDING;
3062  filePtr.p->m_online.m_outstanding = pages;
3063  max = pages;
3064  }
3065  else
3066  {
3067  jam();
3068 
3069  ndbrequire(tail.m_idx - max == 0);
3070  req->varIndex = 1;
3071  req->numberOfPages = max;
3072  req->data.pageData[0] = pageId - max;
3073 
3074  if(DEBUG_UNDO_EXECUTION)
3075  ndbout_c("b reading from file: %d page(%d-%d) into (%d-%d)",
3076  ptr.i, 1 , 1+max-1,
3077  pageId - max, pageId - 1);
3078 
3079  sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
3080  FsReadWriteReq::FixedLength + 1, JBA);
3081 
3082  ptr.p->m_outstanding_fs++;
3083  filePtr.p->m_online.m_outstanding = max;
3084  filePtr.p->m_state |= Undofile::FS_OUTSTANDING | Undofile::FS_MOVE_NEXT;
3085 
3086  Ptr<Undofile> prev = filePtr;
3087  {
3088  Local_undofile_list files(m_file_pool, ptr.p->m_files);
3089  if(!files.prev(prev))
3090  {
3091  jam();
3092  files.last(prev);
3093  }
3094  }
3095  if(DEBUG_UNDO_EXECUTION)
3096  ndbout_c("changing file from %d to %d", filePtr.i, prev.i);
3097 
3098  tail.m_idx= prev.p->m_file_size - 1;
3099  tail.m_ptr_i= prev.i;
3100  ptr.p->m_file_pos[TAIL] = tail;
3101  if(max < pages && filePtr.i != prev.i)
3102  max += read_undo_pages(signal, ptr, pageId - max, pages - max);
3103  }
3104 
3105  return max;
3106 
3107 }
3108 
3109 void
3110 Lgman::execute_undo_record(Signal* signal)
3111 {
3112  Uint64 lsn;
3113  const Uint32* ptr;
3114  if((ptr = get_next_undo_record(&lsn)))
3115  {
3116  Uint32 len= (* ptr) & 0xFFFF;
3117  Uint32 type= (* ptr) >> 16;
3118  Uint32 mask= type & ~(Uint32)File_formats::Undofile::UNDO_NEXT_LSN;
3119  switch(mask){
3120  case File_formats::Undofile::UNDO_END:
3121  stop_run_undo_log(signal);
3122  return;
3123  case File_formats::Undofile::UNDO_LCP:
3124  case File_formats::Undofile::UNDO_LCP_FIRST:
3125  {
3126  Uint32 lcp = * (ptr - len + 1);
3127  if(m_latest_lcp && lcp > m_latest_lcp)
3128  {
3129  if (0)
3130  {
3131  const Uint32 * base = ptr - len + 1;
3132  Uint32 lcp = base[0];
3133  Uint32 tableId = base[1] >> 16;
3134  Uint32 fragId = base[1] & 0xFFFF;
3135 
3136  ndbout_c("NOT! ignoring lcp: %u tab: %u frag: %u",
3137  lcp, tableId, fragId);
3138  }
3139  }
3140 
3141  if(m_latest_lcp == 0 ||
3142  lcp < m_latest_lcp ||
3143  (lcp == m_latest_lcp &&
3144  mask == File_formats::Undofile::UNDO_LCP_FIRST))
3145  {
3146  stop_run_undo_log(signal);
3147  return;
3148  }
3149  // Fallthrough
3150  }
3151  case File_formats::Undofile::UNDO_TUP_ALLOC:
3152  case File_formats::Undofile::UNDO_TUP_UPDATE:
3153  case File_formats::Undofile::UNDO_TUP_FREE:
3154  case File_formats::Undofile::UNDO_TUP_CREATE:
3155  case File_formats::Undofile::UNDO_TUP_DROP:
3156  case File_formats::Undofile::UNDO_TUP_ALLOC_EXTENT:
3157  case File_formats::Undofile::UNDO_TUP_FREE_EXTENT:
3158  {
3159  Dbtup_client tup(this, m_tup);
3160  tup.disk_restart_undo(signal, lsn, mask, ptr - len + 1, len);
3161  jamEntry();
3162  }
3163  return;
3164  default:
3165  ndbrequire(false);
3166  }
3167  }
3168  signal->theData[0] = LgmanContinueB::EXECUTE_UNDO_RECORD;
3169  sendSignal(LGMAN_REF, GSN_CONTINUEB, signal, 1, JBB);
3170 
3171  return;
3172 }
3173 
3174 const Uint32*
3175 Lgman::get_next_undo_record(Uint64 * this_lsn)
3176 {
3177  Ptr<Logfile_group> ptr;
3178  m_logfile_group_list.first(ptr);
3179 
3180  Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER];
3181  Logfile_group::Position producer= ptr.p->m_pos[PRODUCER];
3182  if(producer.m_current_pos.m_idx < 2)
3183  {
3184  jam();
3188  return 0;
3189  }
3190 
3191  Uint32 pos = consumer.m_current_pos.m_idx;
3192  Uint32 page = consumer.m_current_pos.m_ptr_i;
3193 
3195  m_shared_page_pool.getPtr(page);
3196 
3197  if(pos == 0)
3198  {
3202  pageP->m_data[0] = (File_formats::Undofile::UNDO_END << 16) | 1 ;
3203  pageP->m_page_header.m_page_lsn_hi = 0;
3204  pageP->m_page_header.m_page_lsn_lo = 0;
3205  pos= consumer.m_current_pos.m_idx= pageP->m_words_used = 1;
3206  this_lsn = 0;
3207  return pageP->m_data;
3208  }
3209 
3210  Uint32 *record= pageP->m_data + pos - 1;
3211  Uint32 len= (* record) & 0xFFFF;
3212  ndbrequire(len);
3213  Uint32 *prev= record - len;
3214  Uint64 lsn = 0;
3215 
3216  // Same page
3217  if(((* record) >> 16) & File_formats::Undofile::UNDO_NEXT_LSN)
3218  {
3219  lsn = ptr.p->m_last_read_lsn - 1;
3220  ndbrequire((Int64)lsn >= 0);
3221  }
3222  else
3223  {
3224  ndbrequire(pos >= 3);
3225  lsn += * (prev - 1); lsn <<= 32;
3226  lsn += * (prev - 0);
3227  len += 2;
3228  ndbrequire((Int64)lsn >= 0);
3229  }
3230 
3231 
3232  ndbrequire(pos >= len);
3233 
3234  if(pos == len)
3235  {
3239  ndbrequire(producer.m_current_pos.m_idx);
3240  ptr.p->m_pos[PRODUCER].m_current_pos.m_idx --;
3241 
3242  if(consumer.m_current_page.m_idx)
3243  {
3244  consumer.m_current_page.m_idx--; // left in range
3245  consumer.m_current_pos.m_ptr_i --; // page
3246  }
3247  else
3248  {
3249  // 0 pages left in range...switch range
3251  Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
3252  Uint32 sz = map.getSize();
3253  Uint32 tmp = (consumer.m_current_page.m_ptr_i + sz - 2) % sz;
3254 
3255  map.position(it, tmp);
3256  union {
3257  Uint32 _tmp[2];
3258  Lgman::Buffer_idx range;
3259  };
3260 
3261  _tmp[0] = *it.data; map.next(it); _tmp[1] = *it.data;
3262 
3263  consumer.m_current_page.m_idx = range.m_idx - 1; // left in range
3264  consumer.m_current_page.m_ptr_i = tmp; // pos in map
3265 
3266  consumer.m_current_pos.m_ptr_i = range.m_ptr_i + range.m_idx - 1; // page
3267  }
3268 
3269  if(DEBUG_UNDO_EXECUTION)
3270  ndbout_c("reading from %d", consumer.m_current_pos.m_ptr_i);
3271 
3273  m_shared_page_pool.getPtr(consumer.m_current_pos.m_ptr_i);
3274 
3275  pos= consumer.m_current_pos.m_idx= pageP->m_words_used;
3276 
3277  Uint64 tmp = 0;
3278  tmp += pageP->m_page_header.m_page_lsn_hi; tmp <<= 32;
3279  tmp += pageP->m_page_header.m_page_lsn_lo;
3280 
3281  prev = pageP->m_data + pos - 1;
3282 
3283  if(((* prev) >> 16) & File_formats::Undofile::UNDO_NEXT_LSN)
3284  {
3285  ndbrequire(lsn + 1 == ptr.p->m_last_read_lsn);
3286  }
3287 
3288  ptr.p->m_pos[CONSUMER] = consumer;
3289  ptr.p->m_free_buffer_words += File_formats::UNDO_PAGE_WORDS;
3290  }
3291  else
3292  {
3293  ptr.p->m_pos[CONSUMER].m_current_pos.m_idx -= len;
3294  }
3295 
3296  * this_lsn = ptr.p->m_last_read_lsn = lsn;
3297 
3301  Ptr<Logfile_group> sort = ptr;
3302  if(m_logfile_group_list.next(sort))
3303  {
3304  while(!sort.isNull() && sort.p->m_last_read_lsn > lsn)
3305  m_logfile_group_list.next(sort);
3306 
3307  if(sort.i != ptr.p->nextList)
3308  {
3309  m_logfile_group_list.remove(ptr);
3310  if(sort.isNull())
3311  m_logfile_group_list.add(ptr);
3312  else
3313  m_logfile_group_list.insert(ptr, sort);
3314  }
3315  }
3316  return record;
3317 }
3318 
3319 void
3320 Lgman::stop_run_undo_log(Signal* signal)
3321 {
3322  bool running = false, outstanding = false;
3323  Ptr<Logfile_group> ptr;
3324  m_logfile_group_list.first(ptr);
3325  while(!ptr.isNull())
3326  {
3330  ptr.p->m_state &= ~(Uint32)Logfile_group::LG_EXEC_THREAD;
3331 
3332  if(ptr.p->m_state & Logfile_group::LG_READ_THREAD)
3333  {
3337  running = true;
3338  }
3339  else if(ptr.p->m_outstanding_fs)
3340  {
3341  outstanding = true; // a FSREADREQ is outstanding...wait for it
3342  }
3343  else if(ptr.p->m_state != Logfile_group::LG_ONLINE)
3344  {
3348  ndbrequire(ptr.p->m_state == 0);
3349  ptr.p->m_state = Logfile_group::LG_ONLINE;
3350  Buffer_idx tail= ptr.p->m_file_pos[TAIL];
3351  Uint32 pages= ptr.p->m_pos[PRODUCER].m_current_pos.m_idx;
3352 
3353  while(pages)
3354  {
3356  m_file_pool.getPtr(file, tail.m_ptr_i);
3357  Uint32 page= tail.m_idx;
3358  Uint32 size= file.p->m_file_size;
3359  ndbrequire(size >= page);
3360  Uint32 diff= size - page;
3361 
3362  if(pages >= diff)
3363  {
3364  pages -= diff;
3365  Local_undofile_list files(m_file_pool, ptr.p->m_files);
3366  if(!files.next(file))
3367  files.first(file);
3368  tail.m_idx = 1;
3369  tail.m_ptr_i= file.i;
3370  }
3371  else
3372  {
3373  tail.m_idx += pages;
3374  pages= 0;
3375  }
3376  }
3377  ptr.p->m_tail_pos[0] = tail;
3378  ptr.p->m_tail_pos[1] = tail;
3379  ptr.p->m_tail_pos[2] = tail;
3380  ptr.p->m_file_pos[TAIL] = tail;
3381 
3382  init_logbuffer_pointers(ptr);
3383 
3384  {
3385  Buffer_idx head= ptr.p->m_file_pos[HEAD];
3387  m_file_pool.getPtr(file, head.m_ptr_i);
3388  if (head.m_idx == file.p->m_file_size - 1)
3389  {
3390  Local_undofile_list files(m_file_pool, ptr.p->m_files);
3391  if(!files.next(file))
3392  {
3393  jam();
3394  files.first(file);
3395  }
3396  head.m_idx = 0;
3397  head.m_ptr_i = file.i;
3398  ptr.p->m_file_pos[HEAD] = head;
3399  }
3400  }
3401 
3402  client_lock(number(), __LINE__);
3403  ptr.p->m_free_file_words = (Uint64)File_formats::UNDO_PAGE_WORDS *
3404  (Uint64)compute_free_file_pages(ptr);
3405  client_unlock(number(), __LINE__);
3406  ptr.p->m_next_reply_ptr_i = ptr.p->m_file_pos[HEAD].m_ptr_i;
3407 
3408  ptr.p->m_state |= Logfile_group::LG_FLUSH_THREAD;
3409  signal->theData[0] = LgmanContinueB::FLUSH_LOG;
3410  signal->theData[1] = ptr.i;
3411  signal->theData[2] = 0;
3412  sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
3413 
3414  if(1)
3415  {
3416  SimulatedBlock* fs = globalData.getBlock(NDBFS);
3417  Ptr<Undofile> hf, tf;
3418  m_file_pool.getPtr(tf, tail.m_ptr_i);
3419  m_file_pool.getPtr(hf, ptr.p->m_file_pos[HEAD].m_ptr_i);
3420  infoEvent("Logfile group: %d ", ptr.p->m_logfile_group_id);
3421  g_eventLogger->info("Logfile group: %d ", ptr.p->m_logfile_group_id);
3422  infoEvent(" head: %s page: %d",
3423  fs->get_filename(hf.p->m_fd),
3424  ptr.p->m_file_pos[HEAD].m_idx);
3425  g_eventLogger->info(" head: %s page: %d",
3426  fs->get_filename(hf.p->m_fd),
3427  ptr.p->m_file_pos[HEAD].m_idx);
3428  infoEvent(" tail: %s page: %d",
3429  fs->get_filename(tf.p->m_fd), tail.m_idx);
3430  g_eventLogger->info(" tail: %s page: %d",
3431  fs->get_filename(tf.p->m_fd), tail.m_idx);
3432  }
3433  }
3434 
3435  m_logfile_group_list.next(ptr);
3436  }
3437 
3438  if(running)
3439  {
3440  jam();
3441  return;
3442  }
3443 
3444  if(outstanding)
3445  {
3446  jam();
3447  signal->theData[0] = LgmanContinueB::STOP_UNDO_LOG;
3448  sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 1);
3449  return;
3450  }
3451 
3452  infoEvent("Flushing page cache after undo completion");
3453  g_eventLogger->info("Flushing page cache after undo completion");
3454 
3458  LcpFragOrd * ord = (LcpFragOrd *)signal->getDataPtr();
3459  ord->lcpId = m_latest_lcp;
3460  sendSignal(PGMAN_REF, GSN_LCP_FRAG_ORD, signal,
3461  LcpFragOrd::SignalLength, JBB);
3462 
3463  EndLcpReq* req= (EndLcpReq*)signal->getDataPtr();
3464  req->senderData = 0;
3465  req->senderRef = reference();
3466  req->backupId = m_latest_lcp;
3467  sendSignal(PGMAN_REF, GSN_END_LCP_REQ, signal,
3468  EndLcpReq::SignalLength, JBB);
3469 }
3470 
3471 void
3473 {
3474  {
3475  Dbtup_client tup(this, m_tup);
3476  tup.disk_restart_undo(signal, 0, File_formats::Undofile::UNDO_END, 0, 0);
3477  jamEntry();
3478  }
3479 
3485  Uint32 undo[3];
3486  undo[0] = m_latest_lcp;
3487  undo[1] = (0 << 16) | 0;
3488  undo[2] = (File_formats::Undofile::UNDO_LCP_FIRST << 16 )
3489  | (sizeof(undo) >> 2);
3490 
3491  Ptr<Logfile_group> ptr;
3492  ndbrequire(m_logfile_group_list.first(ptr));
3493 
3494  Uint64 last_lsn= m_last_lsn;
3495  if(ptr.p->m_last_lsn == last_lsn
3496 #ifdef VM_TRACE
3497  && ((rand() % 100) > 50)
3498 #endif
3499  )
3500  {
3501  undo[2] |= File_formats::Undofile::UNDO_NEXT_LSN << 16;
3502  Uint32 *dst= get_log_buffer(ptr, sizeof(undo) >> 2);
3503  memcpy(dst, undo, sizeof(undo));
3504  ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
3505  ptr.p->m_free_file_words -= (sizeof(undo) >> 2);
3506  }
3507  else
3508  {
3509  Uint32 *dst= get_log_buffer(ptr, (sizeof(undo) >> 2) + 2);
3510  * dst++ = (Uint32)(last_lsn >> 32);
3511  * dst++ = (Uint32)(last_lsn & 0xFFFFFFFF);
3512  memcpy(dst, undo, sizeof(undo));
3513  ndbrequire(ptr.p->m_free_file_words >= ((sizeof(undo) >> 2) + 2));
3514  ptr.p->m_free_file_words -= ((sizeof(undo) >> 2) + 2);
3515  }
3516  m_last_lsn = ptr.p->m_last_lsn = last_lsn + 1;
3517 
3518  ptr.p->m_last_synced_lsn = last_lsn;
3519  while(m_logfile_group_list.next(ptr))
3520  ptr.p->m_last_synced_lsn = last_lsn;
3521 
3522  infoEvent("Flushing complete");
3523  g_eventLogger->info("Flushing complete");
3524 
3525  signal->theData[0] = reference();
3526  sendSignal(DBLQH_REF, GSN_START_RECCONF, signal, 1, JBB);
3527 }
3528 
3529 #ifdef VM_TRACE
3530 void
3531 Lgman::validate_logfile_group(Ptr<Logfile_group> ptr, const char * heading)
3532 {
3533  do
3534  {
3535  if (ptr.p->m_file_pos[HEAD].m_ptr_i == RNIL)
3536  break;
3537 
3538  Uint32 pages = compute_free_file_pages(ptr);
3539 
3540  Uint32 group_pages =
3541  ((ptr.p->m_free_file_words + File_formats::UNDO_PAGE_WORDS - 1)/ File_formats::UNDO_PAGE_WORDS) ;
3542  Uint32 last = ptr.p->m_free_file_words % File_formats::UNDO_PAGE_WORDS;
3543 
3544  if(! (pages >= group_pages))
3545  {
3546  ndbout << heading << " Tail: " << ptr.p->m_file_pos[TAIL]
3547  << " Head: " << ptr.p->m_file_pos[HEAD]
3548  << " free: " << group_pages << "(" << last << ")"
3549  << " found: " << pages;
3550  for(Uint32 i = 0; i<3; i++)
3551  {
3552  ndbout << " - " << ptr.p->m_tail_pos[i];
3553  }
3554  ndbout << endl;
3555 
3556  ndbrequire(pages >= group_pages);
3557  }
3558  } while(0);
3559 }
3560 #endif
3561 
3562 void Lgman::execGET_TABINFOREQ(Signal* signal)
3563 {
3564  jamEntry();
3565 
3566  if(!assembleFragments(signal))
3567  {
3568  return;
3569  }
3570 
3571  GetTabInfoReq * const req = (GetTabInfoReq *)&signal->theData[0];
3572 
3573  const Uint32 reqType = req->requestType & (~GetTabInfoReq::LongSignalConf);
3574  BlockReference retRef= req->senderRef;
3575  Uint32 senderData= req->senderData;
3576  Uint32 tableId= req->tableId;
3577 
3578  if(reqType == GetTabInfoReq::RequestByName)
3579  {
3580  jam();
3581  SectionHandle handle(this, signal);
3582  releaseSections(handle);
3583 
3584  sendGET_TABINFOREF(signal, req, GetTabInfoRef::NoFetchByName);
3585  return;
3586  }
3587 
3588  Logfile_group key;
3589  key.m_logfile_group_id= tableId;
3590  Ptr<Logfile_group> ptr;
3591  m_logfile_group_hash.find(ptr, key);
3592 
3593  if(ptr.p->m_logfile_group_id != tableId)
3594  {
3595  jam();
3596 
3597  sendGET_TABINFOREF(signal, req, GetTabInfoRef::InvalidTableId);
3598  return;
3599  }
3600 
3601 
3602  GetTabInfoConf *conf = (GetTabInfoConf *)&signal->theData[0];
3603 
3604  conf->senderData= senderData;
3605  conf->tableId= tableId;
3606  conf->freeWordsHi= (Uint32)(ptr.p->m_free_file_words >> 32);
3607  conf->freeWordsLo= (Uint32)(ptr.p->m_free_file_words & 0xFFFFFFFF);
3608  conf->tableType= DictTabInfo::LogfileGroup;
3609  conf->senderRef= reference();
3610  sendSignal(retRef, GSN_GET_TABINFO_CONF, signal,
3611  GetTabInfoConf::SignalLength, JBB);
3612 }
3613 
3615  GetTabInfoReq * req,
3616  GetTabInfoRef::ErrorCode errorCode)
3617 {
3618  jamEntry();
3619  GetTabInfoRef * const ref = (GetTabInfoRef *)&signal->theData[0];
3623  BlockReference retRef = req->senderRef;
3624  ref->errorCode = errorCode;
3625 
3626  sendSignal(retRef, GSN_GET_TABINFOREF, signal, signal->length(), JBB);
3627 }