// pgman.cpp
/*
   Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
*/

#include "pgman.hpp"
#include <signaldata/FsRef.hpp>
#include <signaldata/FsConf.hpp>
#include <signaldata/FsReadWriteReq.hpp>
#include <signaldata/PgmanContinueB.hpp>
#include <signaldata/LCP.hpp>
#include <signaldata/DataFileOrd.hpp>
#include <signaldata/ReleasePages.hpp>

#include <dbtup/Dbtup.hpp>

#include <DebuggerNames.hpp>
#include <md5_hash.hpp>

#define DIRTY_FLAGS (Page_request::COMMIT_REQ | \
                     Page_request::DIRTY_REQ | \
                     Page_request::ALLOC_REQ)

static bool g_dbg_lcp = false;
#if 1
#define DBG_LCP(x)
#else
#define DBG_LCP(x) if(g_dbg_lcp) ndbout << x
#endif
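/*
 * Note: with "#if 1" above, DBG_LCP() compiles to nothing and the
 * g_dbg_lcp flag is never consulted; flipping the condition to 0
 * selects the #else branch, where LCP tracing is emitted to ndbout
 * whenever g_dbg_lcp is set at runtime.
 */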

Pgman::Pgman(Block_context& ctx, Uint32 instanceNumber) :
  SimulatedBlock(PGMAN, ctx, instanceNumber),
  m_file_map(m_data_buffer_pool),
  m_page_hashlist(m_page_entry_pool),
  m_page_stack(m_page_entry_pool),
  m_page_queue(m_page_entry_pool)
#ifdef VM_TRACE
  ,debugFlag(false)
  ,debugSummaryFlag(false)
#endif
{
  BLOCK_CONSTRUCTOR(Pgman);

  // Add received signals
  addRecSignal(GSN_STTOR, &Pgman::execSTTOR);
  addRecSignal(GSN_READ_CONFIG_REQ, &Pgman::execREAD_CONFIG_REQ);
  addRecSignal(GSN_DUMP_STATE_ORD, &Pgman::execDUMP_STATE_ORD);
  addRecSignal(GSN_CONTINUEB, &Pgman::execCONTINUEB);
  addRecSignal(GSN_FSREADREF, &Pgman::execFSREADREF, true);
  addRecSignal(GSN_FSREADCONF, &Pgman::execFSREADCONF);
  addRecSignal(GSN_FSWRITEREF, &Pgman::execFSWRITEREF, true);
  addRecSignal(GSN_FSWRITECONF, &Pgman::execFSWRITECONF);

  addRecSignal(GSN_LCP_FRAG_ORD, &Pgman::execLCP_FRAG_ORD);
  addRecSignal(GSN_END_LCP_REQ, &Pgman::execEND_LCP_REQ);

  addRecSignal(GSN_DATA_FILE_ORD, &Pgman::execDATA_FILE_ORD);
  addRecSignal(GSN_RELEASE_PAGES_REQ, &Pgman::execRELEASE_PAGES_REQ);
  addRecSignal(GSN_DBINFO_SCANREQ, &Pgman::execDBINFO_SCANREQ);

  // loop status
  m_stats_loop_on = false;
  m_busy_loop_on = false;
  m_cleanup_loop_on = false;

  // LCP variables
  m_lcp_state = LS_LCP_OFF;
  m_last_lcp = 0;
  m_last_lcp_complete = 0;
  m_lcp_curr_bucket = ~(Uint32)0;
  m_lcp_outstanding = 0;

  // clean-up variables
  m_cleanup_ptr.i = RNIL;

  // should be a factor larger than number of pool pages
  m_data_buffer_pool.setSize(16);
  m_page_hashlist.setSize(512);

  for (Uint32 k = 0; k < Page_entry::SUBLIST_COUNT; k++)
    m_page_sublist[k] = new Page_sublist(m_page_entry_pool);

  {
    CallbackEntry& ce = m_callbackEntry[THE_NULL_CALLBACK];
    ce.m_function = TheNULLCallback.m_callbackFunction;
    ce.m_flags = 0;
  }
  {
    CallbackEntry& ce = m_callbackEntry[LOGSYNC_CALLBACK];
    ce.m_function = safe_cast(&Pgman::logsync_callback);
    ce.m_flags = 0;
  }
  {
    CallbackTable& ct = m_callbackTable;
    ct.m_count = COUNT_CALLBACKS;
    ct.m_entry = m_callbackEntry;
    m_callbackTableAddr = &ct;
  }
}
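
/*
 * The callback table registered above lets PGMAN hand a stable callback
 * index to other blocks instead of a raw member-function pointer.  For
 * example, pageout() below files LOGSYNC_CALLBACK with the log manager;
 * when the log sync completes, the index is resolved through
 * m_callbackTable and logsync_callback() runs.  A minimal sketch of the
 * pattern (names as used later in this file):
 *
 *   Callback cb;
 *   cb.m_callbackData = ptr.i;              // page entry to resume
 *   cb.m_callbackIndex = LOGSYNC_CALLBACK;  // index into m_callbackEntry
 *   // ... later, the owning block runs it, cf. b->execute(...) in
 *   // process_callback():
 *   // execute(signal, cb, returnCode);     // invokes logsync_callback()
 */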

Pgman::~Pgman()
{
  for (Uint32 k = 0; k < Page_entry::SUBLIST_COUNT; k++)
    delete m_page_sublist[k];
}

BLOCK_FUNCTIONS(Pgman)

void
Pgman::execREAD_CONFIG_REQ(Signal* signal)
{
  jamEntry();

  const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();

  Uint32 ref = req->senderRef;
  Uint32 senderData = req->senderData;

  const ndb_mgm_configuration_iterator * p =
    m_ctx.m_config.getOwnConfigIterator();
  ndbrequire(p != 0);

  Uint64 page_buffer = 64*1024*1024;
  ndb_mgm_get_int64_parameter(p, CFG_DB_DISK_PAGE_BUFFER_MEMORY, &page_buffer);

  if (page_buffer > 0)
  {
    if (isNdbMtLqh())
    {
      // divide between workers - wl4391_todo give extra worker less
      Uint32 workers = getLqhWorkers() + 1;
      page_buffer = page_buffer / workers;
      Uint32 min_buffer = 4*1024*1024;
      if (page_buffer < min_buffer)
        page_buffer = min_buffer;
    }
    // convert to pages
    Uint32 page_cnt = Uint32((page_buffer + GLOBAL_PAGE_SIZE - 1) / GLOBAL_PAGE_SIZE);

    if (ERROR_INSERTED(11009))
    {
      page_cnt = 25;
      ndbout_c("Setting page_cnt = %u", page_cnt);
    }

    m_param.m_max_pages = page_cnt;
    m_page_entry_pool.setSize(m_param.m_lirs_stack_mult * page_cnt);
    m_param.m_max_hot_pages = (page_cnt * 9) / 10;
    ndbrequire(m_param.m_max_hot_pages >= 1);
  }

  Pool_context pc;
  pc.m_block = this;
  m_page_request_pool.wo_pool_init(RT_PGMAN_PAGE_REQUEST, pc);

  ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
  conf->senderRef = reference();
  conf->senderData = senderData;
  sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
             ReadConfigConf::SignalLength, JBB);
}
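
/*
 * Sizing example (a sketch; actual values come from the configuration,
 * and GLOBAL_PAGE_SIZE of 32KB is assumed here): with
 * DiskPageBufferMemory = 64MB and 3 LQH workers, the ndbmtd branch
 * divides by workers + 1 = 4, giving 16MB per PGMAN worker instance,
 * i.e. 16MB / 32KB = 512 cache pages (m_max_pages), of which
 * 512 * 9 / 10 = 460 may be hot (m_max_hot_pages), with a page entry
 * pool of m_lirs_stack_mult * 512 = 5120 entries.
 */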

Pgman::Param::Param() :
  m_max_pages(64),      // smallish for testing
  m_lirs_stack_mult(10),
  m_max_hot_pages(56),
  m_max_loop_count(256),
  m_max_io_waits(256),
  m_stats_loop_delay(1000),
  m_cleanup_loop_delay(200),
  m_lcp_loop_delay(0)
{
}

void
Pgman::execSTTOR(Signal* signal)
{
  jamEntry();

  const Uint32 startPhase = signal->theData[1];

  switch (startPhase) {
  case 1:
    {
      if (!isNdbMtLqh()) {
        c_tup = (Dbtup*)globalData.getBlock(DBTUP);
      } else if (instance() <= getLqhWorkers()) {
        c_tup = (Dbtup*)globalData.getBlock(DBTUP, instance());
        ndbrequire(c_tup != 0);
      } else {
        // extra worker
        c_tup = 0;
      }
      c_lgman = (Lgman*)globalData.getBlock(LGMAN);
      c_tsman = (Tsman*)globalData.getBlock(TSMAN);
    }
    break;
  case 3:
    {
      // start forever loops
      do_stats_loop(signal);
      do_cleanup_loop(signal);
      m_stats_loop_on = true;
      m_cleanup_loop_on = true;
    }
    break;
  case 7:
    break;
  default:
    break;
  }

  sendSTTORRY(signal);
}

void
Pgman::sendSTTORRY(Signal* signal)
{
  signal->theData[0] = 0;
  signal->theData[3] = 1;
  signal->theData[4] = 3;
  signal->theData[5] = 7;
  signal->theData[6] = 255; // No more start phases from missra
  BlockReference cntrRef = !isNdbMtLqh() ? NDBCNTR_REF : PGMAN_REF;
  sendSignal(cntrRef, GSN_STTORRY, signal, 7, JBB);
}

void
Pgman::execCONTINUEB(Signal* signal)
{
  jamEntry();
  Uint32 data1 = signal->theData[1];

  switch (signal->theData[0]) {
  case PgmanContinueB::STATS_LOOP:
    jam();
    do_stats_loop(signal);
    break;
  case PgmanContinueB::BUSY_LOOP:
    jam();
    do_busy_loop(signal);
    break;
  case PgmanContinueB::CLEANUP_LOOP:
    jam();
    do_cleanup_loop(signal);
    break;
  case PgmanContinueB::LCP_LOOP:
    jam();
    do_lcp_loop(signal);
    break;
  case PgmanContinueB::LCP_LOCKED:
  {
    jam();
    Ptr<Page_entry> ptr;
    Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
    if (data1 != RNIL)
    {
      jam();
      pl.getPtr(ptr, data1);
      process_lcp_locked(signal, ptr);
    }
    else
    {
      jam();
      if (ERROR_INSERTED(11007))
      {
        ndbout << "No more writes..." << endl;
        SET_ERROR_INSERT_VALUE(11008);
        signal->theData[0] = 9999;
        sendSignalWithDelay(CMVMI_REF, GSN_NDB_TAMPER, signal, 10000, 1);
      }
      EndLcpConf* conf = (EndLcpConf*)signal->getDataPtrSend();
      conf->senderData = m_end_lcp_req.senderData;
      conf->senderRef = reference();
      sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF,
                 signal, EndLcpConf::SignalLength, JBB);
      m_lcp_state = LS_LCP_OFF;
    }
    return;
  }
  default:
    ndbrequire(false);
    break;
  }
}

// page entry

Pgman::Page_entry::Page_entry(Uint32 file_no, Uint32 page_no) :
  m_file_no(file_no),
  m_state(0),
  m_page_no(page_no),
  m_real_page_i(RNIL),
  m_lsn(0),
  m_last_lcp(0),
  m_dirty_count(0),
  m_copy_page_i(RNIL),
  m_busy_count(0),
  m_requests()
{
}

// page lists

Uint32
Pgman::get_sublist_no(Page_state state)
{
  if (state & Page_entry::REQUEST)
  {
    if (! (state & Page_entry::BOUND))
    {
      return Page_entry::SL_BIND;
    }
    if (! (state & Page_entry::MAPPED))
    {
      if (! (state & Page_entry::PAGEIN))
      {
        return Page_entry::SL_MAP;
      }
      return Page_entry::SL_MAP_IO;
    }
    if (! (state & Page_entry::PAGEOUT))
    {
      return Page_entry::SL_CALLBACK;
    }
    return Page_entry::SL_CALLBACK_IO;
  }
  if (state & Page_entry::BUSY)
  {
    return Page_entry::SL_BUSY;
  }
  if (state & Page_entry::LOCKED)
  {
    return Page_entry::SL_LOCKED;
  }
  if (state == Page_entry::ONSTACK) {
    return Page_entry::SL_IDLE;
  }
  if (state != 0)
  {
    return Page_entry::SL_OTHER;
  }
  return ZNIL;
}
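
/*
 * Example classification: a page with state REQUEST|BOUND|MAPPED and no
 * PAGEOUT in progress lands in SL_CALLBACK (its requests can be served),
 * while REQUEST alone (not yet BOUND) lands in SL_BIND, REQUEST|BOUND
 * with a read in flight lands in SL_MAP_IO, and a state of exactly
 * ONSTACK marks an idle entry eligible for re-use (SL_IDLE).
 */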

void
Pgman::set_page_state(Ptr<Page_entry> ptr, Page_state new_state)
{
  D(">set_page_state: state=" << hex << new_state);
  D(ptr << ": before");

  Page_state old_state = ptr.p->m_state;
  if (old_state != new_state)
  {
    Uint32 old_list_no = get_sublist_no(old_state);
    Uint32 new_list_no = get_sublist_no(new_state);
    if (old_state != 0)
    {
      ndbrequire(old_list_no != ZNIL);
      if (old_list_no != new_list_no)
      {
        Page_sublist& old_list = *m_page_sublist[old_list_no];
        old_list.remove(ptr);
      }
    }
    if (new_state != 0)
    {
      ndbrequire(new_list_no != ZNIL);
      if (old_list_no != new_list_no)
      {
        Page_sublist& new_list = *m_page_sublist[new_list_no];
        new_list.add(ptr);
      }
    }
    ptr.p->m_state = new_state;

    bool old_hot = (old_state & Page_entry::HOT);
    bool new_hot = (new_state & Page_entry::HOT);
    if (! old_hot && new_hot)
    {
      jam();
      m_stats.m_num_hot_pages++;
    }
    if (old_hot && ! new_hot)
    {
      jam();
      ndbrequire(m_stats.m_num_hot_pages != 0);
      m_stats.m_num_hot_pages--;
    }
  }

  D(ptr << ": after");
#ifdef VM_TRACE
  verify_page_entry(ptr);
#endif
  D("<set_page_state");
}

// seize/release pages and entries

bool
Pgman::seize_cache_page(Ptr<GlobalPage>& gptr)
{
  // page cache has no own pool yet
  bool ok = m_global_page_pool.seize(gptr);

  // zero is reserved as return value for queued request
  if (ok && gptr.i == 0)
    ok = m_global_page_pool.seize(gptr);

  if (ok)
  {
    ndbrequire(m_stats.m_num_pages < m_param.m_max_pages);
    m_stats.m_num_pages++;
  }
  return ok;
}

void
Pgman::release_cache_page(Uint32 i)
{
  m_global_page_pool.release(i);

  ndbrequire(m_stats.m_num_pages != 0);
  m_stats.m_num_pages--;
}

bool
Pgman::find_page_entry(Ptr<Page_entry>& ptr, Uint32 file_no, Uint32 page_no)
{
  Page_entry key;
  key.m_file_no = file_no;
  key.m_page_no = page_no;

  if (m_page_hashlist.find(ptr, key))
  {
    D("find_page_entry");
    D(ptr);
    return true;
  }
  return false;
}

Uint32
Pgman::seize_page_entry(Ptr<Page_entry>& ptr, Uint32 file_no, Uint32 page_no)
{
  if (m_page_entry_pool.seize(ptr))
  {
    new (ptr.p) Page_entry(file_no, page_no);
    m_page_hashlist.add(ptr);
#ifdef VM_TRACE
    ptr.p->m_this = this;
#endif
    D("seize_page_entry");
    D(ptr);

    return true;
  }
  return false;
}

bool
Pgman::get_page_entry(Ptr<Page_entry>& ptr, Uint32 file_no, Uint32 page_no)
{
  if (find_page_entry(ptr, file_no, page_no))
  {
    jam();
    ndbrequire(ptr.p->m_state != 0);
    m_stats.m_page_hits++;

    D("get_page_entry: found");
    D(ptr);
    return true;
  }

  if (m_page_entry_pool.getNoOfFree() == 0)
  {
    jam();
    Page_sublist& pl_idle = *m_page_sublist[Page_entry::SL_IDLE];
    Ptr<Page_entry> idle_ptr;
    if (pl_idle.first(idle_ptr))
    {
      jam();

      D("get_page_entry: re-use idle entry");
      D(idle_ptr);

      Page_state state = idle_ptr.p->m_state;
      ndbrequire(state == Page_entry::ONSTACK);

      Page_stack& pl_stack = m_page_stack;
      ndbrequire(pl_stack.hasPrev(idle_ptr));
      pl_stack.remove(idle_ptr);
      state &= ~ Page_entry::ONSTACK;
      set_page_state(idle_ptr, state);
      ndbrequire(idle_ptr.p->m_state == 0);

      release_page_entry(idle_ptr);
    }
  }

  if (seize_page_entry(ptr, file_no, page_no))
  {
    jam();
    ndbrequire(ptr.p->m_state == 0);
    m_stats.m_page_faults++;

    D("get_page_entry: seize");
    D(ptr);
    return true;
  }

  ndbrequire(false);

  return false;
}

void
Pgman::release_page_entry(Ptr<Page_entry>& ptr)
{
  D("release_page_entry");
  D(ptr);
  Page_state state = ptr.p->m_state;

  ndbrequire(ptr.p->m_requests.isEmpty());

  ndbrequire(! (state & Page_entry::ONSTACK));
  ndbrequire(! (state & Page_entry::ONQUEUE));
  ndbrequire(ptr.p->m_real_page_i == RNIL);

  if (! (state & Page_entry::LOCKED))
  {
    ndbrequire(! (state & Page_entry::REQUEST));
  }

  if (ptr.p->m_copy_page_i != RNIL)
  {
    m_global_page_pool.release(ptr.p->m_copy_page_i);
  }

  set_page_state(ptr, 0);
  m_page_hashlist.remove(ptr);
  m_page_entry_pool.release(ptr);
}

// LIRS

/*
 * After the hot entry at stack bottom is removed, additional entries
 * are removed until next hot entry is found.  There are 3 cases for the
 * removed entry: 1) a bound entry is already on queue 2) an unbound
 * entry with open requests enters queue at bind time 3) an unbound
 * entry without requests is returned to entry pool.
 */
void
Pgman::lirs_stack_prune()
{
  D(">lirs_stack_prune");
  Page_stack& pl_stack = m_page_stack;
  Ptr<Page_entry> ptr;

  while (pl_stack.first(ptr))      // first is stack bottom
  {
    Page_state state = ptr.p->m_state;
    if (state & Page_entry::HOT)
    {
      jam();
      break;
    }

    D(ptr << ": prune from stack");

    pl_stack.remove(ptr);
    state &= ~ Page_entry::ONSTACK;
    set_page_state(ptr, state);

    if (state & Page_entry::BOUND)
    {
      jam();
      ndbrequire(state & Page_entry::ONQUEUE);
    }
    else if (state & Page_entry::REQUEST)
    {
      // enters queue at bind
      jam();
      ndbrequire(! (state & Page_entry::ONQUEUE));
    }
    else
    {
      jam();
      release_page_entry(ptr);
    }
  }
  D("<lirs_stack_prune");
}
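
/*
 * Worked example (stack bottom on the left, H = hot, C = cold):
 * stack = [C1 C2 H1 C3 H2].  Once the bottom hot entry has been removed
 * elsewhere, lirs_stack_prune() pops C1 and C2 (one of cases 1-3 above
 * applies to each) and stops at H1, restoring the invariant that the
 * stack bottom is always hot.
 */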

/*
 * Remove the hot entry at stack bottom and make it cold and do stack
 * pruning.  There are 2 cases for the removed entry: 1) a bound entry
 * is moved to queue 2) an unbound entry must have requests and enters
 * queue at bind time.
 */
void
Pgman::lirs_stack_pop()
{
  D("lirs_stack_pop");
  Page_stack& pl_stack = m_page_stack;
  Page_queue& pl_queue = m_page_queue;

  Ptr<Page_entry> ptr;
  bool ok = pl_stack.first(ptr);
  ndbrequire(ok);
  Page_state state = ptr.p->m_state;

  D(ptr << ": pop from stack");

  ndbrequire(state & Page_entry::HOT);
  ndbrequire(state & Page_entry::ONSTACK);
  pl_stack.remove(ptr);
  state &= ~ Page_entry::HOT;
  state &= ~ Page_entry::ONSTACK;
  ndbrequire(! (state & Page_entry::ONQUEUE));

  if (state & Page_entry::BOUND)
  {
    jam();
    pl_queue.add(ptr);
    state |= Page_entry::ONQUEUE;
  }
  else
  {
    // enters queue at bind
    jam();
    ndbrequire(state & Page_entry::REQUEST);
  }

  set_page_state(ptr, state);
  lirs_stack_prune();
}

/*
 * Update LIRS lists when page is referenced.
 */
void
Pgman::lirs_reference(Ptr<Page_entry> ptr)
{
  D(">lirs_reference");
  D(ptr);
  Page_stack& pl_stack = m_page_stack;
  Page_queue& pl_queue = m_page_queue;

  Page_state state = ptr.p->m_state;
  ndbrequire(! (state & Page_entry::LOCKED));

  ndbrequire(m_stats.m_num_hot_pages <= m_param.m_max_hot_pages);

  // LIRS kicks in when we have max hot pages
  if (m_stats.m_num_hot_pages == m_param.m_max_hot_pages)
  {
    if (state & Page_entry::HOT)
    {
      // case 1
      jam();
      ndbrequire(state & Page_entry::ONSTACK);
      bool at_bottom = ! pl_stack.hasPrev(ptr);
      pl_stack.remove(ptr);
      pl_stack.add(ptr);
      if (at_bottom)
      {
        jam();
        lirs_stack_prune();
      }
    }
    else if (state & Page_entry::ONSTACK)
    {
      // case 2a 3a
      jam();
      pl_stack.remove(ptr);
      if (! pl_stack.isEmpty())
      {
        jam();
        lirs_stack_pop();
      }
      pl_stack.add(ptr);
      state |= Page_entry::HOT;
      if (state & Page_entry::ONQUEUE)
      {
        jam();
        move_cleanup_ptr(ptr);
        pl_queue.remove(ptr);
        state &= ~ Page_entry::ONQUEUE;
      }
    }
    else
    {
      // case 2b 3b
      jam();
      pl_stack.add(ptr);
      state |= Page_entry::ONSTACK;
      /*
       * bug#48910.  Using hot page count (not total page count)
       * guarantees that stack is not empty here.  Therefore the new
       * entry (added to top) is not at bottom and need not be hot.
       */
      ndbrequire(pl_stack.hasPrev(ptr));
      if (state & Page_entry::ONQUEUE)
      {
        jam();
        move_cleanup_ptr(ptr);
        pl_queue.remove(ptr);
        state &= ~ Page_entry::ONQUEUE;
      }
      if (state & Page_entry::BOUND)
      {
        jam();
        pl_queue.add(ptr);
        state |= Page_entry::ONQUEUE;
      }
      else
      {
        // enters queue at bind
        jam();
      }
    }
  }
  else
  {
    D("filling up hot pages: " << m_stats.m_num_hot_pages << "/"
      << m_param.m_max_hot_pages);
    jam();
    if (state & Page_entry::ONSTACK)
    {
      jam();
      bool at_bottom = ! pl_stack.hasPrev(ptr);
      pl_stack.remove(ptr);
      if (at_bottom)
      {
        jam();
        ndbassert(state & Page_entry::HOT);
        lirs_stack_prune();
      }
    }
    pl_stack.add(ptr);
    state |= Page_entry::ONSTACK;
    state |= Page_entry::HOT;
    // it could be on queue already
    if (state & Page_entry::ONQUEUE) {
      jam();
      pl_queue.remove(ptr);
      state &= ~Page_entry::ONQUEUE;
    }
  }

  set_page_state(ptr, state);
  D("<lirs_reference");
}
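
/*
 * Illustration of the reference cases once the hot set is full:
 * 1) a hot page on the stack moves to the stack top (with pruning if it
 *    was at the bottom); 2a/3a) a cold page still on the stack becomes
 *    hot, and the old bottom hot page is demoted via lirs_stack_pop();
 * 2b/3b) a cold page that fell off the stack re-enters at the top still
 *    cold and (if bound) goes on the queue, so only a second reference
 *    while it remains on the stack will promote it to hot.
 */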

// continueB loops

void
Pgman::do_stats_loop(Signal* signal)
{
  D("do_stats_loop");
#ifdef VM_TRACE
  verify_all();
#endif
  Uint32 delay = m_param.m_stats_loop_delay;
  signal->theData[0] = PgmanContinueB::STATS_LOOP;
  sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, delay, 1);
}

void
Pgman::do_busy_loop(Signal* signal, bool direct)
{
  D(">do_busy_loop on=" << m_busy_loop_on << " direct=" << direct);
  Uint32 restart = false;
  if (direct)
  {
    // may not cover the calling entry
    (void)process_bind(signal);
    (void)process_map(signal);
    // callback must be queued
    if (! m_busy_loop_on)
    {
      restart = true;
      m_busy_loop_on = true;
    }
  }
  else
  {
    ndbrequire(m_busy_loop_on);
    restart += process_bind(signal);
    restart += process_map(signal);
    restart += process_callback(signal);
    if (! restart)
    {
      m_busy_loop_on = false;
    }
  }
  if (restart)
  {
    signal->theData[0] = PgmanContinueB::BUSY_LOOP;
    sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB);
  }
  D("<do_busy_loop on=" << m_busy_loop_on << " restart=" << restart);
}
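
/*
 * Call pattern sketch: get_page() and the fs*conf() handlers invoke
 * do_busy_loop(signal, true) for one immediate bind/map round and, if
 * the loop was not already running, schedule a BUSY_LOOP CONTINUEB.
 * Subsequent rounds then run off CONTINUEB (direct == false), which is
 * also the only path that executes callbacks, until process_bind/map/
 * callback all report no more work and the loop switches itself off.
 */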

void
Pgman::do_cleanup_loop(Signal* signal)
{
  D("do_cleanup_loop");
  process_cleanup(signal);

  Uint32 delay = m_param.m_cleanup_loop_delay;
  signal->theData[0] = PgmanContinueB::CLEANUP_LOOP;
  sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, delay, 1);
}

void
Pgman::do_lcp_loop(Signal* signal)
{
  D(">do_lcp_loop m_lcp_state=" << Uint32(m_lcp_state));
  ndbrequire(m_lcp_state != LS_LCP_OFF);
  LCP_STATE newstate = process_lcp(signal);

  switch(newstate) {
  case LS_LCP_OFF:
    jam();
    break;
  case LS_LCP_ON:
    jam();
    signal->theData[0] = PgmanContinueB::LCP_LOOP;
    sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB);
    break;
  case LS_LCP_MAX_LCP_OUTSTANDING: // wait until io is completed
    jam();
    break;
  case LS_LCP_LOCKED:
    jam();
    break;
  }
  m_lcp_state = newstate;
  D("<do_lcp_loop m_lcp_state=" << Uint32(m_lcp_state));
}

// busy loop

bool
Pgman::process_bind(Signal* signal)
{
  D(">process_bind");
  int max_count = 32;
  Page_sublist& pl_bind = *m_page_sublist[Page_entry::SL_BIND];

  while (! pl_bind.isEmpty() && --max_count >= 0)
  {
    jam();
    Ptr<Page_entry> ptr;
    pl_bind.first(ptr);
    if (! process_bind(signal, ptr))
    {
      jam();
      break;
    }
  }
  D("<process_bind");
  return ! pl_bind.isEmpty();
}

bool
Pgman::process_bind(Signal* signal, Ptr<Page_entry> ptr)
{
  D(ptr << " : process_bind");
  Page_queue& pl_queue = m_page_queue;
  Ptr<GlobalPage> gptr;

  if (m_stats.m_num_pages < m_param.m_max_pages)
  {
    jam();
    bool ok = seize_cache_page(gptr);
    // to handle failure requires some changes in LIRS
    ndbrequire(ok);
  }
  else
  {
    jam();
    Ptr<Page_entry> clean_ptr;
    if (! pl_queue.first(clean_ptr))
    {
      jam();
      D("bind failed: queue empty");
      // XXX busy loop
      return false;
    }
    Page_state clean_state = clean_ptr.p->m_state;
    // under unusual circumstances it could still be paging in
    if (! (clean_state & Page_entry::MAPPED) ||
        clean_state & Page_entry::DIRTY ||
        clean_state & Page_entry::REQUEST)
    {
      jam();
      D("bind failed: queue front not evictable");
      D(clean_ptr);
      // XXX busy loop
      return false;
    }

    D(clean_ptr << " : evict");

    ndbassert(clean_ptr.p->m_dirty_count == 0);
    ndbrequire(clean_state & Page_entry::ONQUEUE);
    ndbrequire(clean_state & Page_entry::BOUND);
    ndbrequire(clean_state & Page_entry::MAPPED);

    move_cleanup_ptr(clean_ptr);
    pl_queue.remove(clean_ptr);
    clean_state &= ~ Page_entry::ONQUEUE;

    gptr.i = clean_ptr.p->m_real_page_i;

    clean_ptr.p->m_real_page_i = RNIL;
    clean_state &= ~ Page_entry::BOUND;
    clean_state &= ~ Page_entry::MAPPED;

    set_page_state(clean_ptr, clean_state);

    if (! (clean_state & Page_entry::ONSTACK))
      release_page_entry(clean_ptr);

    m_global_page_pool.getPtr(gptr);
  }

  Page_state state = ptr.p->m_state;

  ptr.p->m_real_page_i = gptr.i;
  state |= Page_entry::BOUND;
  if (state & Page_entry::EMPTY)
  {
    jam();
    state |= Page_entry::MAPPED;
  }

  if (! (state & Page_entry::LOCKED) &&
      ! (state & Page_entry::ONQUEUE) &&
      ! (state & Page_entry::HOT))
  {
    jam();

    D(ptr << " : add to queue at bind");
    pl_queue.add(ptr);
    state |= Page_entry::ONQUEUE;
  }

  set_page_state(ptr, state);
  return true;
}

bool
Pgman::process_map(Signal* signal)
{
  D(">process_map");
  int max_count = 0;
  if (m_param.m_max_io_waits > m_stats.m_current_io_waits) {
    max_count = m_param.m_max_io_waits - m_stats.m_current_io_waits;
    max_count = max_count / 2 + 1;
  }
  Page_sublist& pl_map = *m_page_sublist[Page_entry::SL_MAP];

  while (! pl_map.isEmpty() && --max_count >= 0)
  {
    jam();
    Ptr<Page_entry> ptr;
    pl_map.first(ptr);
    if (! process_map(signal, ptr))
    {
      jam();
      break;
    }
  }
  D("<process_map");
  return ! pl_map.isEmpty();
}

bool
Pgman::process_map(Signal* signal, Ptr<Page_entry> ptr)
{
  D(ptr << " : process_map");
  pagein(signal, ptr);
  return true;
}

bool
Pgman::process_callback(Signal* signal)
{
  D(">process_callback");
  int max_count = 1;
  Page_sublist& pl_callback = *m_page_sublist[Page_entry::SL_CALLBACK];

  Ptr<Page_entry> ptr;
  pl_callback.first(ptr);

  while (! ptr.isNull() && --max_count >= 0)
  {
    jam();
    Ptr<Page_entry> curr = ptr;
    pl_callback.next(ptr);

    if (! process_callback(signal, curr))
    {
      jam();
      break;
    }
  }
  D("<process_callback");
  return ! pl_callback.isEmpty();
}

bool
Pgman::process_callback(Signal* signal, Ptr<Page_entry> ptr)
{
  D(ptr << " : process_callback");
  int max_count = 1;

  while (! ptr.p->m_requests.isEmpty() && --max_count >= 0)
  {
    jam();
    Page_state state = ptr.p->m_state;
    SimulatedBlock* b;
    Callback callback;
    {
      Local_page_request_list req_list(m_page_request_pool, ptr.p->m_requests);
      Ptr<Page_request> req_ptr;

      req_list.first(req_ptr);
      D(req_ptr << " : process_callback");

#ifdef ERROR_INSERT
      if (req_ptr.p->m_flags & Page_request::DELAY_REQ)
      {
        Uint64 now = NdbTick_CurrentMillisecond();
        if (now < req_ptr.p->m_delay_until_time)
        {
          break;
        }
      }
#endif

      Uint32 blockNo = blockToMain(req_ptr.p->m_block);
      Uint32 instanceNo = blockToInstance(req_ptr.p->m_block);
      b = globalData.getBlock(blockNo, instanceNo);
      callback = req_ptr.p->m_callback;

      if (req_ptr.p->m_flags & DIRTY_FLAGS)
      {
        jam();
        state |= Page_entry::DIRTY;
        ndbassert(ptr.p->m_dirty_count);
        ptr.p->m_dirty_count --;
      }

      req_list.releaseFirst(req_ptr);
    }
    ndbrequire(state & Page_entry::BOUND);
    ndbrequire(state & Page_entry::MAPPED);

    // make REQUEST state consistent before set_page_state()
    if (ptr.p->m_requests.isEmpty())
    {
      jam();
      state &= ~ Page_entry::REQUEST;
    }

    // callback may re-enter PGMAN and change page state
    set_page_state(ptr, state);
    b->execute(signal, callback, ptr.p->m_real_page_i);
  }
  return true;
}

// cleanup loop

bool
Pgman::process_cleanup(Signal* signal)
{
  D(">process_cleanup");
  Page_queue& pl_queue = m_page_queue;

  // XXX for now start always from beginning
  m_cleanup_ptr.i = RNIL;

  if (m_cleanup_ptr.i == RNIL && ! pl_queue.first(m_cleanup_ptr))
  {
    jam();
    D("<process_cleanup: empty queue");
    return false;
  }

  int max_loop_count = m_param.m_max_loop_count;
  int max_count = 0;
  if (m_param.m_max_io_waits > m_stats.m_current_io_waits) {
    max_count = m_param.m_max_io_waits - m_stats.m_current_io_waits;
    max_count = max_count / 2 + 1;
  }

  Ptr<Page_entry> ptr = m_cleanup_ptr;
  while (max_loop_count != 0 && max_count != 0)
  {
    Page_state state = ptr.p->m_state;
    ndbrequire(! (state & Page_entry::LOCKED));
    if (state & Page_entry::BUSY)
    {
      D("process_cleanup: break on busy page");
      D(ptr);
      break;
    }
    if (state & Page_entry::DIRTY &&
        ! (state & Page_entry::PAGEIN) &&
        ! (state & Page_entry::PAGEOUT))
    {
      D(ptr << " : process_cleanup");
      if (c_tup != 0)
        c_tup->disk_page_unmap_callback(0,
                                        ptr.p->m_real_page_i,
                                        ptr.p->m_dirty_count);
      pageout(signal, ptr);
      max_count--;
    }
    if (! pl_queue.hasNext(ptr))
      break;
    pl_queue.next(ptr);
    max_loop_count--;
  }
  m_cleanup_ptr = ptr;
  D("<process_cleanup");
  return true;
}

/*
 * Call this before queue.remove(ptr).  If the removed entry is the
 * clean-up pointer, move it towards front.
 */
void
Pgman::move_cleanup_ptr(Ptr<Page_entry> ptr)
{
  Page_queue& pl_queue = m_page_queue;
  if (ptr.i == m_cleanup_ptr.i)
  {
    jam();
    pl_queue.prev(m_cleanup_ptr);
  }
}

// LCP

void
Pgman::execLCP_FRAG_ORD(Signal* signal)
{
  if (ERROR_INSERTED(11008))
  {
    ndbout_c("Ignore LCP_FRAG_ORD");
    return;
  }
  LcpFragOrd* ord = (LcpFragOrd*)signal->getDataPtr();
  ndbrequire(ord->lcpId >= m_last_lcp_complete + 1 || m_last_lcp_complete == 0);
  m_last_lcp = ord->lcpId;
  DBG_LCP("Pgman::execLCP_FRAG_ORD lcp: " << m_last_lcp << endl);

  D("execLCP_FRAG_ORD"
    << " this=" << m_last_lcp
    << " last_complete=" << m_last_lcp_complete
    << " bucket=" << m_lcp_curr_bucket);
}

void
Pgman::execEND_LCP_REQ(Signal* signal)
{
  if (ERROR_INSERTED(11008))
  {
    ndbout_c("Ignore END_LCP");
    return;
  }

  EndLcpReq* req = (EndLcpReq*)signal->getDataPtr();
  m_end_lcp_req = *req;

  DBG_LCP("execEND_LCP_REQ" << endl);

  ndbrequire(!m_lcp_outstanding);
  m_lcp_curr_bucket = 0;

  D("execEND_LCP_REQ"
    << " this=" << m_last_lcp
    << " last_complete=" << m_last_lcp_complete
    << " bucket=" << m_lcp_curr_bucket
    << " outstanding=" << m_lcp_outstanding);

  m_last_lcp_complete = m_last_lcp;
  ndbrequire(m_lcp_state == LS_LCP_OFF);
  m_lcp_state = LS_LCP_ON;
  do_lcp_loop(signal);
}

Pgman::LCP_STATE
Pgman::process_lcp(Signal* signal)
{
  Page_hashlist& pl_hash = m_page_hashlist;

  int max_count = 0;
  if (m_param.m_max_io_waits > m_stats.m_current_io_waits)
  {
    jam();
    max_count = m_param.m_max_io_waits - m_stats.m_current_io_waits;
    max_count = max_count / 2 + 1;
  }

  D("process_lcp"
    << " this=" << m_last_lcp
    << " last_complete=" << m_last_lcp_complete
    << " bucket=" << m_lcp_curr_bucket
    << " outstanding=" << m_lcp_outstanding);

  // start or re-start from beginning of current hash bucket
  if (m_lcp_curr_bucket != ~(Uint32)0)
  {
    jam();
    Page_hashlist::Iterator iter;
    pl_hash.next(m_lcp_curr_bucket, iter);
    Uint32 loop = 0;
    while (iter.curr.i != RNIL &&
           m_lcp_outstanding < (Uint32) max_count &&
           (loop ++ < 32 || iter.bucket == m_lcp_curr_bucket))
    {
      jam();
      Ptr<Page_entry>& ptr = iter.curr;
      Page_state state = ptr.p->m_state;

      DBG_LCP("LCP " << ptr << " - ");

      if (ptr.p->m_last_lcp < m_last_lcp &&
          (state & Page_entry::DIRTY) &&
          (! (state & Page_entry::LOCKED)))
      {
        jam();
        if(! (state & Page_entry::BOUND))
        {
          ndbout << ptr << endl;
          ndbrequire(false);
        }
        if (state & Page_entry::BUSY)
        {
          jam();
          DBG_LCP(" BUSY" << endl);
          break;  // wait for it
        }
        else if (state & Page_entry::PAGEOUT)
        {
          jam();
          DBG_LCP(" PAGEOUT -> state |= LCP" << endl);
          set_page_state(ptr, state | Page_entry::LCP);
        }
        else
        {
          jam();
          DBG_LCP(" pageout()" << endl);
          ptr.p->m_state |= Page_entry::LCP;
          if (c_tup != 0)
            c_tup->disk_page_unmap_callback(0,
                                            ptr.p->m_real_page_i,
                                            ptr.p->m_dirty_count);
          pageout(signal, ptr);
        }
        ptr.p->m_last_lcp = m_last_lcp;
        m_lcp_outstanding++;
      }
      else
      {
        jam();
        DBG_LCP(" NOT DIRTY" << endl);
      }
      pl_hash.next(iter);
    }

    m_lcp_curr_bucket = (iter.curr.i != RNIL ? iter.bucket : ~(Uint32)0);
  }

  if (m_lcp_curr_bucket == ~(Uint32)0 && !m_lcp_outstanding)
  {
    jam();
    Ptr<Page_entry> ptr;
    Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
    if (pl.first(ptr))
    {
      jam();
      process_lcp_locked(signal, ptr);
      return LS_LCP_LOCKED;
    }
    else
    {
      jam();
      if (ERROR_INSERTED(11007))
      {
        ndbout << "No more writes..." << endl;
        signal->theData[0] = 9999;
        sendSignalWithDelay(CMVMI_REF, GSN_NDB_TAMPER, signal, 10000, 1);
        SET_ERROR_INSERT_VALUE(11008);
      }
      EndLcpConf* conf = (EndLcpConf*)signal->getDataPtrSend();
      conf->senderData = m_end_lcp_req.senderData;
      conf->senderRef = reference();
      sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF,
                 signal, EndLcpConf::SignalLength, JBB);
      return LS_LCP_OFF;
    }
  }

  if (m_lcp_outstanding >= (Uint32) max_count)
  {
    jam();
    return LS_LCP_MAX_LCP_OUTSTANDING;
  }

  return LS_LCP_ON;
}

void
Pgman::process_lcp_locked(Signal* signal, Ptr<Page_entry> ptr)
{
  CRASH_INSERTION(11006);

  // protect from tsman parallel access
  Tablespace_client tsman(signal, this, c_tsman, 0, 0, 0);
  ptr.p->m_last_lcp = m_last_lcp;
  if (ptr.p->m_state & Page_entry::DIRTY)
  {
    Ptr<GlobalPage> org, copy;
    ndbrequire(m_global_page_pool.seize(copy));
    m_global_page_pool.getPtr(org, ptr.p->m_real_page_i);
    memcpy(copy.p, org.p, sizeof(GlobalPage));
    ptr.p->m_copy_page_i = copy.i;

    m_lcp_outstanding++;
    ptr.p->m_state |= Page_entry::LCP;
    pageout(signal, ptr);
    return;
  }

  Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
  pl.next(ptr);

  signal->theData[0] = PgmanContinueB::LCP_LOCKED;
  signal->theData[1] = ptr.i;
  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
}

void
Pgman::process_lcp_locked_fswriteconf(Signal* signal, Ptr<Page_entry> ptr)
{
  Ptr<GlobalPage> org, copy;
  m_global_page_pool.getPtr(copy, ptr.p->m_copy_page_i);
  m_global_page_pool.getPtr(org, ptr.p->m_real_page_i);
  memcpy(org.p, copy.p, sizeof(GlobalPage));
  m_global_page_pool.release(copy);
  ptr.p->m_copy_page_i = RNIL;

  Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
  pl.next(ptr);

  signal->theData[0] = PgmanContinueB::LCP_LOCKED;
  signal->theData[1] = ptr.i;
  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
}

// page read and write

void
Pgman::pagein(Signal* signal, Ptr<Page_entry> ptr)
{
  D("pagein");
  D(ptr);

  ndbrequire(! (ptr.p->m_state & Page_entry::PAGEIN));
  set_page_state(ptr, ptr.p->m_state | Page_entry::PAGEIN);

  fsreadreq(signal, ptr);
  m_stats.m_current_io_waits++;
}

void
Pgman::fsreadconf(Signal* signal, Ptr<Page_entry> ptr)
{
  D("fsreadconf");
  D(ptr);

  ndbrequire(ptr.p->m_state & Page_entry::PAGEIN);
  Page_state state = ptr.p->m_state;

  state &= ~ Page_entry::PAGEIN;
  state &= ~ Page_entry::EMPTY;
  state |= Page_entry::MAPPED;
  set_page_state(ptr, state);

  {
    Ptr<GlobalPage> pagePtr;
    m_global_page_pool.getPtr(pagePtr, ptr.p->m_real_page_i);
    File_formats::Datafile::Data_page* page =
      (File_formats::Datafile::Data_page*)pagePtr.p;

    // restore the entry's LSN from the page header just read from disk
    Uint64 lsn = 0;
    lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
    lsn += page->m_page_header.m_page_lsn_lo;
    ptr.p->m_lsn = lsn;
  }

  ndbrequire(m_stats.m_current_io_waits > 0);
  m_stats.m_current_io_waits--;
  m_stats.m_pages_read++;

  ptr.p->m_last_lcp = m_last_lcp_complete;
  do_busy_loop(signal, true);
}

void
Pgman::pageout(Signal* signal, Ptr<Page_entry> ptr)
{
  D("pageout");
  D(ptr);

  Page_state state = ptr.p->m_state;
  ndbrequire(state & Page_entry::BOUND);
  ndbrequire(state & Page_entry::MAPPED);
  ndbrequire(! (state & Page_entry::BUSY));
  ndbrequire(! (state & Page_entry::PAGEOUT));

  state |= Page_entry::PAGEOUT;

  // update lsn on page prior to write
  Ptr<GlobalPage> pagePtr;
  m_global_page_pool.getPtr(pagePtr, ptr.p->m_real_page_i);
  File_formats::Datafile::Data_page* page =
    (File_formats::Datafile::Data_page*)pagePtr.p;
  page->m_page_header.m_page_lsn_hi = (Uint32)(ptr.p->m_lsn >> 32);
  page->m_page_header.m_page_lsn_lo = (Uint32)(ptr.p->m_lsn & 0xFFFFFFFF);

  // undo WAL
  Logfile_client::Request req;
  req.m_callback.m_callbackData = ptr.i;
  req.m_callback.m_callbackIndex = LOGSYNC_CALLBACK;
1474  D("Logfile_client - pageout");
1475  Logfile_client lgman(this, c_lgman, RNIL);
1476  int ret = lgman.sync_lsn(signal, ptr.p->m_lsn, &req, 0);
1477  if (ret > 0)
1478  {
1479  fswritereq(signal, ptr);
1480  m_stats.m_current_io_waits++;
1481  }
1482  else
1483  {
1484  ndbrequire(ret == 0);
1485  m_stats.m_log_waits++;
1486  state |= Page_entry::LOGSYNC;
1487  }
1488  set_page_state(ptr, state);
1489 }
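
/*
 * Write-ahead rule illustrated: if the undo log is already durable past
 * the page LSN, sync_lsn() returns > 0 and the page write is issued at
 * once; otherwise (ret == 0) the entry waits in LOGSYNC state and
 * fswritereq() is issued from logsync_callback() once LGMAN confirms
 * the sync.  Either way, a page is never written to disk with an LSN
 * ahead of the durable log.
 */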

void
Pgman::logsync_callback(Signal* signal, Uint32 ptrI, Uint32 res)
{
  Ptr<Page_entry> ptr;
  m_page_entry_pool.getPtr(ptr, ptrI);

  D("logsync_callback");
  D(ptr);

  // it is OK to be "busy" at this point (the commit is queued)
  Page_state state = ptr.p->m_state;
  ndbrequire(state & Page_entry::PAGEOUT);
  ndbrequire(state & Page_entry::LOGSYNC);
  state &= ~ Page_entry::LOGSYNC;
  set_page_state(ptr, state);

  fswritereq(signal, ptr);
  m_stats.m_current_io_waits++;
}

void
Pgman::fswriteconf(Signal* signal, Ptr<Page_entry> ptr)
{
  D("fswriteconf");
  D(ptr);

  Page_state state = ptr.p->m_state;
  ndbrequire(state & Page_entry::PAGEOUT);

  if (c_tup != 0)
    c_tup->disk_page_unmap_callback(1,
                                    ptr.p->m_real_page_i,
                                    ptr.p->m_dirty_count);

  state &= ~ Page_entry::PAGEOUT;
  state &= ~ Page_entry::EMPTY;
  state &= ~ Page_entry::DIRTY;

  ndbrequire(m_stats.m_current_io_waits > 0);
  m_stats.m_current_io_waits--;

  if (state & Page_entry::LCP)
  {
    jam();
    state &= ~ Page_entry::LCP;
    ndbrequire(m_lcp_outstanding);
    m_lcp_outstanding--;
    m_stats.m_pages_written_lcp++;
    if (ptr.p->m_copy_page_i != RNIL)
    {
      jam();
      Tablespace_client tsman(signal, this, c_tsman, 0, 0, 0);
      process_lcp_locked_fswriteconf(signal, ptr);
      set_page_state(ptr, state);
      do_busy_loop(signal, true);
      return;
    }
  }
  else
  {
    m_stats.m_pages_written++;
  }

  set_page_state(ptr, state);
  do_busy_loop(signal, true);

  if (m_lcp_state == LS_LCP_MAX_LCP_OUTSTANDING)
  {
    jam();
    do_lcp_loop(signal);
  }
}

// file system interface

void
Pgman::fsreadreq(Signal* signal, Ptr<Page_entry> ptr)
{
  File_map::ConstDataBufferIterator it;
  bool ret = m_file_map.first(it) && m_file_map.next(it, ptr.p->m_file_no);
  ndbrequire(ret);
  Uint32 fd = * it.data;

  ndbrequire(ptr.p->m_page_no > 0);

  FsReadWriteReq* req = (FsReadWriteReq*)signal->getDataPtrSend();
  req->filePointer = fd;
  req->userReference = reference();
  req->userPointer = ptr.i;
  req->varIndex = ptr.p->m_page_no;
  req->numberOfPages = 1;
  req->operationFlag = 0;
  FsReadWriteReq::setFormatFlag(req->operationFlag,
                                FsReadWriteReq::fsFormatGlobalPage);
  req->data.pageData[0] = ptr.p->m_real_page_i;
  sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
             FsReadWriteReq::FixedLength + 1, JBA);
}

void
Pgman::execFSREADCONF(Signal* signal)
{
  jamEntry();
  FsConf* conf = (FsConf*)signal->getDataPtr();
  Ptr<Page_entry> ptr;
  m_page_entry_pool.getPtr(ptr, conf->userPointer);

  fsreadconf(signal, ptr);
}

void
Pgman::execFSREADREF(Signal* signal)
{
  jamEntry();
  SimulatedBlock::execFSREADREF(signal);
  ndbrequire(false);
}

void
Pgman::fswritereq(Signal* signal, Ptr<Page_entry> ptr)
{
  File_map::ConstDataBufferIterator it;
  m_file_map.first(it);
  m_file_map.next(it, ptr.p->m_file_no);
  Uint32 fd = * it.data;

  ndbrequire(ptr.p->m_page_no > 0);

  FsReadWriteReq* req = (FsReadWriteReq*)signal->getDataPtrSend();
  req->filePointer = fd;
  req->userReference = reference();
  req->userPointer = ptr.i;
  req->varIndex = ptr.p->m_page_no;
  req->numberOfPages = 1;
  req->operationFlag = 0;
  FsReadWriteReq::setFormatFlag(req->operationFlag,
                                FsReadWriteReq::fsFormatGlobalPage);
  req->data.pageData[0] = ptr.p->m_real_page_i;

#if ERROR_INSERT_CODE
  if (ptr.p->m_state & Page_entry::LOCKED)
  {
    sendSignalWithDelay(NDBFS_REF, GSN_FSWRITEREQ, signal,
                        3000, FsReadWriteReq::FixedLength + 1);
    ndbout_c("pageout locked (3s)");
    return;
  }
#endif

  if (!ERROR_INSERTED(11008))
  {
    sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal,
               FsReadWriteReq::FixedLength + 1, JBA);
  }
}

void
Pgman::execFSWRITECONF(Signal* signal)
{
  jamEntry();
  FsConf* conf = (FsConf*)signal->getDataPtr();
  Ptr<Page_entry> ptr;
  m_page_entry_pool.getPtr(ptr, conf->userPointer);

  fswriteconf(signal, ptr);
}


void
Pgman::execFSWRITEREF(Signal* signal)
{
  jamEntry();
  SimulatedBlock::execFSWRITEREF(signal);
  ndbrequire(false);
}

// client methods

int
Pgman::get_page_no_lirs(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req)
{
  jamEntry();

#ifdef VM_TRACE
  Ptr<Page_request> tmp = { &page_req, RNIL};

  D(">get_page");
  D(ptr);
  D(tmp);
#endif

  Uint32 req_flags = page_req.m_flags;

  if (req_flags & Page_request::EMPTY_PAGE)
  {
    // Only one can "init" a page at a time
    //ndbrequire(ptr.p->m_requests.isEmpty());
  }

  Page_state state = ptr.p->m_state;
  bool is_new = (state == 0);
  bool busy_count = false;

  if (req_flags & Page_request::LOCK_PAGE)
  {
    jam();
    state |= Page_entry::LOCKED;
  }

  if (req_flags & Page_request::ALLOC_REQ)
  {
    jam();
  }
  else if (req_flags & Page_request::COMMIT_REQ)
  {
    busy_count = true;
    state |= Page_entry::BUSY;
  }
  else if ((req_flags & Page_request::OP_MASK) != ZREAD)
  {
    jam();
  }

  const Page_state LOCKED = Page_entry::LOCKED | Page_entry::MAPPED;
  if ((state & LOCKED) == LOCKED &&
      ! (req_flags & Page_request::UNLOCK_PAGE))
  {
    ptr.p->m_state |= (req_flags & DIRTY_FLAGS ? Page_entry::DIRTY : 0);
    m_stats.m_page_requests_direct_return++;
    if (ptr.p->m_copy_page_i != RNIL)
    {
      D("<get_page: immediate copy_page");
      return ptr.p->m_copy_page_i;
    }

    D("<get_page: immediate locked");
    return ptr.p->m_real_page_i;
  }

  bool only_request = ptr.p->m_requests.isEmpty();
#ifdef ERROR_INSERT
  if (req_flags & Page_request::DELAY_REQ)
  {
    jam();
    only_request = false;
  }
#endif
  if (only_request &&
      state & Page_entry::MAPPED)
  {
    if (! (state & Page_entry::PAGEOUT))
    {
      if (req_flags & DIRTY_FLAGS)
        state |= Page_entry::DIRTY;

      ptr.p->m_busy_count += busy_count;
      set_page_state(ptr, state);

      D("<get_page: immediate");

      ndbrequire(ptr.p->m_real_page_i != RNIL);
      m_stats.m_page_requests_direct_return++;
      return ptr.p->m_real_page_i;
    }
  }

  if (! (req_flags & (Page_request::LOCK_PAGE | Page_request::UNLOCK_PAGE)))
  {
    ndbrequire(! (state & Page_entry::LOCKED));
  }

  // queue the request

  if ((state & Page_entry::MAPPED) && ! (state & Page_entry::PAGEOUT))
    m_stats.m_page_requests_wait_q++;
  else
    m_stats.m_page_requests_wait_io++;

  Ptr<Pgman::Page_request> req_ptr;
  {
    Local_page_request_list req_list(m_page_request_pool, ptr.p->m_requests);
    if (! (req_flags & Page_request::ALLOC_REQ))
      req_list.seizeLast(req_ptr);
    else
      req_list.seizeFirst(req_ptr);
  }

  if (req_ptr.i == RNIL)
  {
    if (is_new)
    {
      release_page_entry(ptr);
    }
    D("<get_page: error out of requests");
    return -1;
  }

  req_ptr.p->m_block = page_req.m_block;
  req_ptr.p->m_flags = page_req.m_flags;
  req_ptr.p->m_callback = page_req.m_callback;
#ifdef ERROR_INSERT
  req_ptr.p->m_delay_until_time = page_req.m_delay_until_time;
#endif

  state |= Page_entry::REQUEST;
  if (only_request && (req_flags & Page_request::EMPTY_PAGE))
  {
    state |= Page_entry::EMPTY;
  }

  if (req_flags & Page_request::UNLOCK_PAGE)
  {
    // keep it locked
  }

  ptr.p->m_busy_count += busy_count;
  ptr.p->m_dirty_count += !!(req_flags & DIRTY_FLAGS);
  set_page_state(ptr, state);

  D(req_ptr);
  D("<get_page: queued");
  return 0;
}

int
Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req)
{
  int i = get_page_no_lirs(signal, ptr, page_req);
  if (unlikely(i == -1))
  {
    jam();
    return -1;
  }

  Uint32 req_flags = page_req.m_flags;
  Page_state state = ptr.p->m_state;

  // update LIRS
  if (! (state & Page_entry::LOCKED) &&
      ! (req_flags & Page_request::CORR_REQ))
  {
    jam();
    lirs_reference(ptr);
  }

  // start processing if request was queued
  if (i == 0)
  {
    jam();
    do_busy_loop(signal, true);
  }

  return i;
}

void
Pgman::update_lsn(Ptr<Page_entry> ptr, Uint32 block, Uint64 lsn)
{
  jamEntry();
  D(">update_lsn: block=" << hex << block << dec << " lsn=" << lsn);
  D(ptr);

  Page_state state = ptr.p->m_state;
  ptr.p->m_lsn = lsn;

  if (state & Page_entry::BUSY)
  {
    ndbrequire(ptr.p->m_busy_count != 0);
    if (--ptr.p->m_busy_count == 0)
    {
      state &= ~ Page_entry::BUSY;
    }
  }

  state |= Page_entry::DIRTY;
  set_page_state(ptr, state);

  D(ptr);
  D("<update_lsn");
}

Uint32
Pgman::create_data_file()
{
  File_map::DataBufferIterator it;
  if(m_file_map.first(it))
  {
    do
    {
      if(*it.data == RNIL)
      {
        *it.data = (1u << 31) | it.pos;
        D("create_data_file:" << V(it.pos));
        return it.pos;
      }
    } while(m_file_map.next(it));
  }

  Uint32 file_no = m_file_map.getSize();
  Uint32 fd = (1u << 31) | file_no;

  if (m_file_map.append(&fd, 1))
  {
    D("create_data_file:" << V(file_no));
    return file_no;
  }
  D("create_data_file: RNIL");
  return RNIL;
}

Uint32
Pgman::alloc_data_file(Uint32 file_no)
{
  Uint32 sz = m_file_map.getSize();
  if (file_no >= sz)
  {
    Uint32 len = file_no - sz + 1;
    Uint32 fd = RNIL;
    while (len--)
    {
      if (! m_file_map.append(&fd, 1))
      {
        D("alloc_data_file: RNIL");
        return RNIL;
      }
    }
  }

  File_map::DataBufferIterator it;
  m_file_map.first(it);
  m_file_map.next(it, file_no);
  if (* it.data != RNIL)
  {
    D("alloc_data_file: RNIL");
    return RNIL;
  }

  *it.data = (1u << 31) | file_no;
  D("alloc_data_file:" << V(file_no));
  return file_no;
}

void
Pgman::map_file_no(Uint32 file_no, Uint32 fd)
{
  File_map::DataBufferIterator it;
  m_file_map.first(it);
  m_file_map.next(it, file_no);

  assert(*it.data == ((1u << 31) | file_no));
  *it.data = fd;
  D("map_file_no:" << V(file_no) << V(fd));
}

void
Pgman::free_data_file(Uint32 file_no, Uint32 fd)
{
  File_map::DataBufferIterator it;
  m_file_map.first(it);
  m_file_map.next(it, file_no);

  if (fd == RNIL)
  {
    ndbrequire(*it.data == ((1u << 31) | file_no));
  }
  else
  {
    ndbrequire(*it.data == fd);
  }
  *it.data = RNIL;
  D("free_data_file:" << V(file_no) << V(fd));
}
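
/*
 * File-map lifecycle example: create_data_file() reserves the next free
 * slot and stores the sentinel (1u << 31) | file_no; map_file_no()
 * replaces the sentinel with the real NDBFS file pointer once the file
 * is open; free_data_file() resets the slot to RNIL.  E.g. for
 * file_no = 2 the slot goes 0x80000002 -> fd -> RNIL.
 */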

void
Pgman::execDATA_FILE_ORD(Signal* signal)
{
  const DataFileOrd* ord = (const DataFileOrd*)signal->getDataPtr();
  Uint32 ret;
  switch (ord->cmd) {
  case DataFileOrd::CreateDataFile:
    ret = create_data_file();
    ndbrequire(ret == ord->ret);
    break;
  case DataFileOrd::AllocDataFile:
    ret = alloc_data_file(ord->file_no);
    ndbrequire(ret == ord->ret);
    break;
  case DataFileOrd::MapFileNo:
    map_file_no(ord->file_no, ord->fd);
    break;
  case DataFileOrd::FreeDataFile:
    free_data_file(ord->file_no, ord->fd);
    break;
  default:
    ndbrequire(false);
    break;
  }
}

int
Pgman::drop_page(Ptr<Page_entry> ptr)
{
  D("drop_page");
  D(ptr);

  Page_stack& pl_stack = m_page_stack;
  Page_queue& pl_queue = m_page_queue;

  Page_state state = ptr.p->m_state;
  if (! (state & (Page_entry::PAGEIN | Page_entry::PAGEOUT)))
  {
    if (state & Page_entry::ONSTACK)
    {
      jam();
      bool at_bottom = ! pl_stack.hasPrev(ptr);
      pl_stack.remove(ptr);
      state &= ~ Page_entry::ONSTACK;
      if (at_bottom)
      {
        jam();
        lirs_stack_prune();
      }
      if (state & Page_entry::HOT)
      {
        jam();
        state &= ~ Page_entry::HOT;
      }
    }

    if (state & Page_entry::ONQUEUE)
    {
      jam();
      pl_queue.remove(ptr);
      state &= ~ Page_entry::ONQUEUE;
    }

    if (state & Page_entry::BUSY)
    {
      jam();
      state &= ~ Page_entry::BUSY;
    }

    if (state & Page_entry::DIRTY)
    {
      jam();
      state &= ~ Page_entry::DIRTY;
    }

    if (state & Page_entry::EMPTY)
    {
      jam();
      state &= ~ Page_entry::EMPTY;
    }

    if (state & Page_entry::MAPPED)
    {
      jam();
      state &= ~ Page_entry::MAPPED;
    }

    if (state & Page_entry::BOUND)
    {
      jam();
      ndbrequire(ptr.p->m_real_page_i != RNIL);
      release_cache_page(ptr.p->m_real_page_i);
      ptr.p->m_real_page_i = RNIL;
      state &= ~ Page_entry::BOUND;
    }

    set_page_state(ptr, state);
    release_page_entry(ptr);
    return 1;
  }

  ndbrequire(false);
  return -1;
}

void
Pgman::execRELEASE_PAGES_REQ(Signal* signal)
{
  const ReleasePagesReq* req = (const ReleasePagesReq*)signal->getDataPtr();
  const Uint32 senderData = req->senderData;
  const Uint32 senderRef = req->senderRef;
  const Uint32 requestType = req->requestType;
  const Uint32 bucket = req->requestData;
  ndbrequire(req->requestType == ReleasePagesReq::RT_RELEASE_UNLOCKED);

  Page_hashlist& pl_hash = m_page_hashlist;
  Page_hashlist::Iterator iter;
  pl_hash.next(bucket, iter);

  Uint32 loop = 0;
  while (iter.curr.i != RNIL && (loop++ < 8 || iter.bucket == bucket))
  {
    jam();
    Ptr<Page_entry> ptr = iter.curr;
    if (!(ptr.p->m_state & Page_entry::LOCKED) &&
        (ptr.p->m_state & Page_entry::BOUND) &&
        (ptr.p->m_state & Page_entry::MAPPED)) // should be
    {
      jam();
      D(ptr << ": release");
      ndbrequire(!(ptr.p->m_state & Page_entry::REQUEST));
      ndbrequire(!(ptr.p->m_state & Page_entry::EMPTY));
      ndbrequire(!(ptr.p->m_state & Page_entry::DIRTY));
      ndbrequire(!(ptr.p->m_state & Page_entry::BUSY));
      ndbrequire(!(ptr.p->m_state & Page_entry::PAGEIN));
      ndbrequire(!(ptr.p->m_state & Page_entry::PAGEOUT));
      ndbrequire(!(ptr.p->m_state & Page_entry::LOGSYNC));
      drop_page(ptr);
    }
    pl_hash.next(iter);
  }

  if (iter.curr.i != RNIL) {
    jam();
    ndbassert(iter.bucket > bucket);
    ReleasePagesReq* req = (ReleasePagesReq*)signal->getDataPtrSend();
    req->senderData = senderData;
    req->senderRef = senderRef;
    req->requestType = requestType;
    req->requestData = iter.bucket;
    sendSignal(reference(), GSN_RELEASE_PAGES_REQ,
               signal, ReleasePagesReq::SignalLength, JBB);
    return;
  }

  ReleasePagesConf* conf = (ReleasePagesConf*)signal->getDataPtrSend();
  conf->senderData = senderData;
  conf->senderRef = reference();
  sendSignal(senderRef, GSN_RELEASE_PAGES_CONF,
             signal, ReleasePagesConf::SignalLength, JBB);
}
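
/*
 * The bucket-wise scan above is the usual PGMAN pattern for long hash
 * walks: handle at least 8 entries, always finish the bucket currently
 * under the iterator, then re-send RELEASE_PAGES_REQ to self with the
 * next bucket in requestData, so the work is sliced across signals
 * instead of blocking the thread; the CONF goes back only when the
 * iterator runs off the end of the table.
 */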

// page cache client

#include <PgmanProxy.hpp>

Page_cache_client::Page_cache_client(SimulatedBlock* block,
                                     SimulatedBlock* pgman)
{
  m_block = numberToBlock(block->number(), block->instance());

  if (pgman->isNdbMtLqh() && pgman->instance() == 0) {
    m_pgman_proxy = (PgmanProxy*)pgman;
    m_pgman = 0;
  } else {
    m_pgman_proxy = 0;
    m_pgman = (Pgman*)pgman;
  }
}

int
Page_cache_client::get_page(Signal* signal, Request& req, Uint32 flags)
{
  if (m_pgman_proxy != 0) {
    return m_pgman_proxy->get_page(*this, signal, req, flags);
  }

  Ptr<Pgman::Page_entry> entry_ptr;
  Uint32 file_no = req.m_page.m_file_no;
  Uint32 page_no = req.m_page.m_page_no;

  D("get_page" << V(file_no) << V(page_no) << hex << V(flags));

  // make sure TUP does not peek at obsolete data
  m_ptr.i = RNIL;
  m_ptr.p = 0;

  // find or seize
  bool ok = m_pgman->get_page_entry(entry_ptr, file_no, page_no);
  if (! ok)
  {
    return -1;
  }

  Pgman::Page_request page_req;
  page_req.m_block = m_block;
  page_req.m_flags = flags;
  page_req.m_callback = req.m_callback;
#ifdef ERROR_INSERT
  page_req.m_delay_until_time = req.m_delay_until_time;
#endif

  int i = m_pgman->get_page(signal, entry_ptr, page_req);
  if (i > 0)
  {
    // TODO remove
    m_pgman->m_global_page_pool.getPtr(m_ptr, (Uint32)i);
  }
  return i;
}
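
/*
 * Client usage sketch (hypothetical caller; the real callers live in
 * DBTUP/DBLQH, and the exact client-side flag enum names are assumed
 * to mirror Pgman::Page_request): request a page for commit and handle
 * the three possible outcomes.
 *
 *   Page_cache_client pgman(this, c_pgman);       // c_pgman: hypothetical
 *   Page_cache_client::Request preq;
 *   preq.m_page = key;                            // Local_key of the page
 *   preq.m_callback.m_callbackData = opPtrI;      // hypothetical op record
 *   preq.m_callback.m_callbackFunction =
 *     safe_cast(&MyBlock::disk_page_callback);    // hypothetical callback
 *   int res = pgman.get_page(signal, preq, Page_cache_client::COMMIT_REQ);
 *   // res > 0  : page available now, pgman.m_ptr points at it
 *   // res == 0 : request queued, callback runs once the page is mapped
 *   // res < 0  : out of Page_request records, caller must back off
 */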
2184 
2185 void
2186 Page_cache_client::update_lsn(Local_key key, Uint64 lsn)
2187 {
2188  if (m_pgman_proxy != 0) {
2189  m_pgman_proxy->update_lsn(*this, key, lsn);
2190  return;
2191  }
2192 
2193  Ptr<Pgman::Page_entry> entry_ptr;
2194  Uint32 file_no = key.m_file_no;
2195  Uint32 page_no = key.m_page_no;
2196 
2197  D("update_lsn" << V(file_no) << V(page_no) << V(lsn));
2198 
2199  bool found = m_pgman->find_page_entry(entry_ptr, file_no, page_no);
2200  assert(found);
2201 
2202  m_pgman->update_lsn(entry_ptr, m_block, lsn);
2203 }
2204 
2205 int
2207 {
2208  if (m_pgman_proxy != 0) {
2209  return m_pgman_proxy->drop_page(*this, key, page_id);
2210  }
2211 
2212  Ptr<Pgman::Page_entry> entry_ptr;
2213  Uint32 file_no = key.m_file_no;
2214  Uint32 page_no = key.m_page_no;
2215 
2216  D("drop_page" << V(file_no) << V(page_no));
2217 
2218  bool found = m_pgman->find_page_entry(entry_ptr, file_no, page_no);
2219  assert(found);
2220  assert(entry_ptr.p->m_real_page_i == page_id);
2221 
2222  return m_pgman->drop_page(entry_ptr);
2223 }
2224 
2225 Uint32
2227 {
2228  if (m_pgman_proxy != 0) {
2229  return m_pgman_proxy->create_data_file(signal);
2230  }
2231  return m_pgman->create_data_file();
2232 }
2233 
2234 Uint32
2236 {
2237  if (m_pgman_proxy != 0) {
2238  return m_pgman_proxy->alloc_data_file(signal, file_no);
2239  }
2240  return m_pgman->alloc_data_file(file_no);
2241 }
2242 
2243 void
2244 Page_cache_client::map_file_no(Signal* signal, Uint32 file_no, Uint32 fd)
2245 {
2246  if (m_pgman_proxy != 0) {
2247  m_pgman_proxy->map_file_no(signal, file_no, fd);
2248  return;
2249  }
2250  m_pgman->map_file_no(file_no, fd);
2251 }
2252 
2253 void
2254 Page_cache_client::free_data_file(Signal* signal, Uint32 file_no, Uint32 fd)
2255 {
2256  if (m_pgman_proxy != 0) {
2257  m_pgman_proxy->free_data_file(signal, file_no, fd);
2258  return;
2259  }
2260  m_pgman->free_data_file(file_no, fd);
2261 }
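
Taken together, these wrappers expose PGMAN's data-file bookkeeping to its clients. Below is a hedged sketch of the intended call order during file creation and drop; the client context, stored_file_no and fd are assumptions, and the open/close signalling around it is omitted.

 Page_cache_client pgman(this, m_pgman_block);

 // creating a new data file: pick a fresh file number...
 Uint32 file_no = pgman.create_data_file(signal);
 // ...or claim a known one when re-creating the file during restart:
 // Uint32 file_no = pgman.alloc_data_file(signal, stored_file_no);

 // once the file is open, bind the NDBFS descriptor to the file number
 pgman.map_file_no(signal, file_no, fd);

 // on dropping the data file, release the mapping and the number
 pgman.free_data_file(signal, file_no, fd);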
2262 
2263 // debug
2264 
2265 #ifdef VM_TRACE
2266 
2267 void
2268 Pgman::verify_page_entry(Ptr<Page_entry> ptr)
2269 {
2270  Page_stack& pl_stack = m_page_stack;
2271 
2272  Uint32 ptrI = ptr.i;
2273  Page_state state = ptr.p->m_state;
2274 
2275  bool has_req = state & Page_entry::REQUEST;
2276  bool has_req2 = ! ptr.p->m_requests.isEmpty();
2277  ndbrequire(has_req == has_req2 || dump_page_lists(ptrI));
2278 
2279  bool is_bound = state & Page_entry::BOUND;
2280  bool is_bound2 = ptr.p->m_real_page_i != RNIL;
2281  ndbrequire(is_bound == is_bound2 || dump_page_lists(ptrI));
2282 
2283  bool is_mapped = state & Page_entry::MAPPED;
2284  // mapped implies bound
2285  ndbrequire(! is_mapped || is_bound || dump_page_lists(ptrI));
2286  // bound is mapped or has open requests
2287  ndbrequire(! is_bound || is_mapped || has_req || dump_page_lists(ptrI));
2288 
2289  bool on_stack = state & Page_entry::ONSTACK;
2290  bool is_hot = state & Page_entry::HOT;
2291  // hot entry must be on stack
2292  ndbrequire(! is_hot || on_stack || dump_page_lists(ptrI));
2293 
2294  // stack bottom is hot
2295  bool at_bottom = on_stack && ! pl_stack.hasPrev(ptr);
2296  ndbrequire(! at_bottom || is_hot || dump_page_lists(ptrI));
2297 
2298  bool on_queue = state & Page_entry::ONQUEUE;
2299  // hot entry is not on queue
2300  ndbrequire(! is_hot || ! on_queue || dump_page_lists(ptrI));
2301 
2302  bool is_locked = state & Page_entry::LOCKED;
2303  bool on_queue2 = ! is_locked && ! is_hot && is_bound;
2304  ndbrequire(on_queue == on_queue2 || dump_page_lists(ptrI));
2305 
2306  // entries waiting to enter queue
2307  bool to_queue = ! is_locked && ! is_hot && ! is_bound && has_req;
2308 
2309  // page is about to be released
2310  bool to_release = (state == 0);
2311 
2312  // page is either LOCKED or under LIRS or about to be released
2313  bool is_lirs = on_stack || to_queue || on_queue;
2314  ndbrequire(to_release || is_locked == ! is_lirs || dump_page_lists(ptrI));
2315 
2316  bool pagein = state & Page_entry::PAGEIN;
2317  bool pageout = state & Page_entry::PAGEOUT;
2318  // cannot read and write at same time
2319  ndbrequire(! pagein || ! pageout || dump_page_lists(ptrI));
2320 
2321  Uint32 no = get_sublist_no(state);
2322  switch (no) {
2323  case Page_entry::SL_BIND:
2324  ndbrequire((! pagein && ! pageout) || dump_page_lists(ptrI));
2325  break;
2326  case Page_entry::SL_MAP:
2327  ndbrequire((! pagein && ! pageout) || dump_page_lists(ptrI));
2328  break;
2329  case Page_entry::SL_MAP_IO:
2330  ndbrequire((pagein && ! pageout) || dump_page_lists(ptrI));
2331  break;
2332  case Page_entry::SL_CALLBACK:
2333  ndbrequire((! pagein && ! pageout) || dump_page_lists(ptrI));
2334  break;
2335  case Page_entry::SL_CALLBACK_IO:
2336  ndbrequire((! pagein && pageout) || dump_page_lists(ptrI));
2337  break;
2338  case Page_entry::SL_BUSY:
2339  break;
2340  case Page_entry::SL_LOCKED:
2341  break;
2342  case Page_entry::SL_IDLE:
2343  break;
2344  case Page_entry::SL_OTHER:
2345  break;
2346  case ZNIL:
2347  ndbrequire(to_release || dump_page_lists(ptrI));
2348  break;
2349  default:
2350  ndbrequire(false || dump_page_lists(ptrI));
2351  break;
2352  }
2353 }
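
Each ndbrequire above encodes an implication over the state bits: "A implies B" is written (!A || B), and dump_page_lists() is OR-ed on last so that, by short-circuit evaluation, the lists are dumped exactly when an invariant has failed; since it returns false, the ndbrequire still aborts afterwards. A compilable miniature of the idiom, with assert standing in for ndbrequire and made-up flag bits:

 #include <cassert>
 #include <cstdio>

 static bool dump_state()
 {
   // diagnostics go here; returning false keeps the failed check failing
   std::printf("invariant violated, dumping state\n");
   return false;
 }

 static void check_entry(unsigned state)
 {
   const bool is_bound  = state & 0x1;   // illustrative flag bits
   const bool is_mapped = state & 0x2;
   // "mapped implies bound"; dump_state() runs only on violation
   assert(!is_mapped || is_bound || dump_state());
 }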
2354 
2355 void
2356 Pgman::verify_page_lists()
2357 {
2358  const Stats& stats = m_stats;
2359  const Param& param = m_param;
2360  Page_hashlist& pl_hash = m_page_hashlist;
2361  Page_stack& pl_stack = m_page_stack;
2362  Page_queue& pl_queue = m_page_queue;
2363  Ptr<Page_entry> ptr;
2364 
2365  Uint32 is_locked = 0;
2366  Uint32 is_bound = 0;
2367  Uint32 is_mapped = 0;
2368  Uint32 is_hot = 0;
2369  Uint32 on_stack = 0;
2370  Uint32 on_queue = 0;
2371  Uint32 to_queue = 0;
2372 
2373  Page_hashlist::Iterator iter;
2374  pl_hash.next(0, iter);
2375  while (iter.curr.i != RNIL)
2376  {
2377  ptr = iter.curr;
2378  Page_state state = ptr.p->m_state;
2379  // (state == 0) occurs only within a time-slice
2380  ndbrequire(state != 0);
2381  verify_page_entry(ptr);
2382 
2383  if (state & Page_entry::LOCKED)
2384  is_locked++;
2385  if (state & Page_entry::BOUND)
2386  is_bound++;
2387  if (state & Page_entry::MAPPED)
2388  is_mapped++;
2389  if (state & Page_entry::HOT)
2390  is_hot++;
2391  if (state & Page_entry::ONSTACK)
2392  on_stack++;
2393  if (state & Page_entry::ONQUEUE)
2394  on_queue++;
2395  if (! (state & Page_entry::LOCKED) &&
2396  ! (state & Page_entry::HOT) &&
2397  (state & Page_entry::REQUEST) &&
2398  ! (state & Page_entry::BOUND))
2399  to_queue++;
2400  pl_hash.next(iter);
2401  }
2402 
2403  for (pl_stack.first(ptr); ptr.i != RNIL; pl_stack.next(ptr))
2404  {
2405  Page_state state = ptr.p->m_state;
2406  ndbrequire(state & Page_entry::ONSTACK || dump_page_lists(ptr.i));
2407  if (! pl_stack.hasPrev(ptr))
2408  {
2409  ndbrequire(state & Page_entry::HOT || dump_page_lists(ptr.i));
2410  }
2411  }
2412 
2413  for (pl_queue.first(ptr); ptr.i != RNIL; pl_queue.next(ptr))
2414  {
2415  Page_state state = ptr.p->m_state;
2416  ndbrequire(state & Page_entry::ONQUEUE || dump_page_lists(ptr.i));
2417  ndbrequire(state & Page_entry::BOUND || dump_page_lists(ptr.i));
2418  ndbrequire(! (state & Page_entry::HOT) || dump_page_lists(ptr.i));
2419  }
2420 
2421  ndbrequire(is_bound == stats.m_num_pages || dump_page_lists());
2422  ndbrequire(is_hot == stats.m_num_hot_pages || dump_page_lists());
2423  ndbrequire(on_stack == pl_stack.count() || dump_page_lists());
2424  ndbrequire(on_queue == pl_queue.count() || dump_page_lists());
2425 
2426  Uint32 k;
2427  Uint32 entry_count = 0;
2428  char sublist_info[200] = "";
2429  for (k = 0; k < Page_entry::SUBLIST_COUNT; k++)
2430  {
2431  const Page_sublist& pl = *m_page_sublist[k];
2432  for (pl.first(ptr); ptr.i != RNIL; pl.next(ptr))
2433  ndbrequire(get_sublist_no(ptr.p->m_state) == k || dump_page_lists(ptr.i));
2434  entry_count += pl.count();
2435  sprintf(sublist_info + strlen(sublist_info),
2436  " %s:%u", get_sublist_name(k), pl.count());
2437  }
2438  ndbrequire(entry_count == pl_hash.count() || dump_page_lists());
2439 
2440  Uint32 hit_pct = 0;
2441  char hit_pct_str[20];
2442  if (stats.m_page_hits + stats.m_page_faults != 0)
2443  hit_pct = 10000 * stats.m_page_hits /
2444  (stats.m_page_hits + stats.m_page_faults);
2445  sprintf(hit_pct_str, "%u.%02u", hit_pct / 100, hit_pct % 100);
2446 
2447  D("loop"
2448  << " stats:" << m_stats_loop_on
2449  << " busy:" << m_busy_loop_on
2450  << " cleanup:" << m_cleanup_loop_on
2451  << " lcp:" << Uint32(m_lcp_state));
2452 
2453  D("page"
2454  << " entries:" << pl_hash.count()
2455  << " pages:" << stats.m_num_pages << "/" << param.m_max_pages
2456  << " mapped:" << is_mapped
2457  << " hot:" << is_hot
2458  << " io:" << stats.m_current_io_waits << "/" << param.m_max_io_waits
2459  << " hit pct:" << hit_pct_str);
2460 
2461  D("list"
2462  << " locked:" << is_locked
2463  << " stack:" << pl_stack.count()
2464  << " queue:" << pl_queue.count()
2465  << " to queue:" << to_queue);
2466 
2467  D(sublist_info);
2468 }
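
The hit rate above is computed in integer fixed point: scaling by 10000 before the division preserves two decimal places, so hit_pct holds hundredths of a percent and is split back apart for printing. The same arithmetic as a standalone program:

 #include <cstdio>

 int main()
 {
   unsigned hits = 9990, faults = 10;
   unsigned pct = 0;
   if (hits + faults != 0)
     pct = 10000u * hits / (hits + faults);   // hundredths of a percent
   std::printf("hit pct: %u.%02u\n", pct / 100, pct % 100);  // "99.90"
   return 0;
 }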
2469 
2470 void
2471 Pgman::verify_all()
2472 {
2473  Page_sublist& pl_bind = *m_page_sublist[Page_entry::SL_BIND];
2474  Page_sublist& pl_map = *m_page_sublist[Page_entry::SL_MAP];
2475  Page_sublist& pl_callback = *m_page_sublist[Page_entry::SL_CALLBACK];
2476 
2477  if (! pl_bind.isEmpty() || ! pl_map.isEmpty() || ! pl_callback.isEmpty())
2478  {
2479  ndbrequire(m_busy_loop_on || dump_page_lists());
2480  }
2481  verify_page_lists();
2482 }
2483 
2484 bool
2485 Pgman::dump_page_lists(Uint32 ptrI)
2486 {
2487  // use debugOut directly
2488  debugOut << "PGMAN: page list dump" << endl;
2489  if (ptrI != RNIL)
2490  debugOut << "PGMAN: error on PE [" << ptrI << "]" << "\n";
2491 
2492  Page_stack& pl_stack = m_page_stack;
2493  Page_queue& pl_queue = m_page_queue;
2494  Ptr<Page_entry> ptr;
2495  Uint32 n;
2496 
2497  debugOut << "stack:" << "\n";
2498  n = 0;
2499  for (pl_stack.first(ptr); ptr.i != RNIL; pl_stack.next(ptr))
2500  debugOut << n++ << " " << ptr << "\n";
2501 
2502  debugOut << "queue:" << "\n";
2503  n = 0;
2504  for (pl_queue.first(ptr); ptr.i != RNIL; pl_queue.next(ptr))
2505  debugOut << n++ << " " << ptr << "\n";
2506 
2507  Uint32 k;
2508  for (k = 0; k < Page_entry::SUBLIST_COUNT; k++)
2509  {
2510  debugOut << get_sublist_name(k) << ":" << "\n";
2511  const Page_sublist& pl = *m_page_sublist[k];
2512  n = 0;
2513  for (pl.first(ptr); ptr.i != RNIL; pl.next(ptr))
2514  debugOut << n++ << " " << ptr << "\n";
2515  }
2516 
2517  debugOut.flushline();
2518  return false;
2519 }
2520 
2521 #endif
2522 
2523 const char*
2524 Pgman::get_sublist_name(Uint32 list_no)
2525 {
2526  switch (list_no) {
2527  case Page_entry::SL_BIND:
2528  return "bind";
2529  case Page_entry::SL_MAP:
2530  return "map";
2531  case Page_entry::SL_MAP_IO:
2532  return "map_io";
2533  case Page_entry::SL_CALLBACK:
2534  return "cb";
2535  case Page_entry::SL_CALLBACK_IO:
2536  return "cb_io";
2537  case Page_entry::SL_BUSY:
2538  return "busy";
2539  case Page_entry::SL_LOCKED:
2540  return "locked";
2541  case Page_entry::SL_IDLE:
2542  return "idle";
2543  case Page_entry::SL_OTHER:
2544  return "other";
2545  }
2546  return "?";
2547 }
2548 
2549 NdbOut&
2550 operator<<(NdbOut& out, Ptr<Pgman::Page_request> ptr)
2551 {
2552  const Pgman::Page_request& pr = *ptr.p;
2553  out << "PR";
2554  if (ptr.i != RNIL)
2555  out << " [" << dec << ptr.i << "]";
2556  out << " block=" << hex << pr.m_block;
2557  out << " flags=" << hex << pr.m_flags;
2558  out << "," << dec << (pr.m_flags & Pgman::Page_request::OP_MASK);
2559  {
2560  if (pr.m_flags & Pgman::Page_request::LOCK_PAGE)
2561  out << ",lock_page";
2562  if (pr.m_flags & Pgman::Page_request::EMPTY_PAGE)
2563  out << ",empty_page";
2564  if (pr.m_flags & Pgman::Page_request::ALLOC_REQ)
2565  out << ",alloc_req";
2566  if (pr.m_flags & Pgman::Page_request::COMMIT_REQ)
2567  out << ",commit_req";
2568  if (pr.m_flags & Pgman::Page_request::DIRTY_REQ)
2569  out << ",dirty_req";
2570  if (pr.m_flags & Pgman::Page_request::CORR_REQ)
2571  out << ",corr_req";
2572  }
2573  return out;
2574 }
2575 
2576 NdbOut&
2577 operator<<(NdbOut& out, Ptr<Pgman::Page_entry> ptr)
2578 {
2579  const Pgman::Page_entry pe = *ptr.p;
2580  Uint32 list_no = Pgman::get_sublist_no(pe.m_state);
2581  out << "PE [" << dec << ptr.i << "]";
2582  out << " state=" << hex << pe.m_state;
2583  {
2584  if (pe.m_state & Pgman::Page_entry::REQUEST)
2585  out << ",request";
2586  if (pe.m_state & Pgman::Page_entry::EMPTY)
2587  out << ",empty";
2588  if (pe.m_state & Pgman::Page_entry::BOUND)
2589  out << ",bound";
2590  if (pe.m_state & Pgman::Page_entry::MAPPED)
2591  out << ",mapped";
2592  if (pe.m_state & Pgman::Page_entry::DIRTY)
2593  out << ",dirty";
2594  if (pe.m_state & Pgman::Page_entry::USED)
2595  out << ",used";
2596  if (pe.m_state & Pgman::Page_entry::BUSY)
2597  out << ",busy";
2598  if (pe.m_state & Pgman::Page_entry::LOCKED)
2599  out << ",locked";
2600  if (pe.m_state & Pgman::Page_entry::PAGEIN)
2601  out << ",pagein";
2602  if (pe.m_state & Pgman::Page_entry::PAGEOUT)
2603  out << ",pageout";
2604  if (pe.m_state & Pgman::Page_entry::LOGSYNC)
2605  out << ",logsync";
2606  if (pe.m_state & Pgman::Page_entry::LCP)
2607  out << ",lcp";
2608  if (pe.m_state & Pgman::Page_entry::HOT)
2609  out << ",hot";
2610  if (pe.m_state & Pgman::Page_entry::ONSTACK)
2611  out << ",onstack";
2612  if (pe.m_state & Pgman::Page_entry::ONQUEUE)
2613  out << ",onqueue";
2614  }
2615  out << " list=";
2616  if (list_no == ZNIL)
2617  out << "NONE";
2618  else
2619  {
2620  out << dec << list_no;
2621  out << "," << Pgman::get_sublist_name(list_no);
2622  }
2623  out << " diskpage=" << dec << pe.m_file_no << "," << pe.m_page_no;
2624  if (pe.m_real_page_i == RNIL)
2625  out << " realpage=RNIL";
2626  else {
2627  out << " realpage=" << dec << pe.m_real_page_i;
2628 #ifdef VM_TRACE
2629  if (pe.m_state & Pgman::Page_entry::MAPPED) {
2630  Ptr<GlobalPage> gptr;
2631  pe.m_this->m_global_page_pool.getPtr(gptr, pe.m_real_page_i);
2632  Uint32 hash_result[4];
2633  /* NOTE: Assuming "data" is 64 bit aligned as required by 'md5_hash' */
2634  md5_hash(hash_result,
2635  (Uint64*)gptr.p->data, sizeof(gptr.p->data)/sizeof(Uint32));
2636  out.print(" md5=%08x%08x%08x%08x",
2637  hash_result[0], hash_result[1],
2638  hash_result[2], hash_result[3]);
2639  }
2640 #endif
2641  }
2642  out << " lsn=" << dec << pe.m_lsn;
2643  out << " busy_count=" << dec << pe.m_busy_count;
2644 #ifdef VM_TRACE
2645  {
2646  Pgman::Page_stack& pl_stack = pe.m_this->m_page_stack;
2647  if (! pl_stack.hasNext(ptr))
2648  out << " top";
2649  if (! pl_stack.hasPrev(ptr))
2650  out << " bottom";
2651  }
2652  {
2653  Pgman::Local_page_request_list
2654  req_list(ptr.p->m_this->m_page_request_pool, ptr.p->m_requests);
2655  if (! req_list.isEmpty())
2656  {
2657  Ptr<Pgman::Page_request> req_ptr;
2658  out << " req:";
2659  for (req_list.first(req_ptr); req_ptr.i != RNIL; req_list.next(req_ptr))
2660  {
2661  out << " " << req_ptr;
2662  }
2663  }
2664  }
2665 #endif
2666  return out;
2667 }
2668 
2669 void
2670 Pgman::execDUMP_STATE_ORD(Signal* signal)
2671 {
2672  jamEntry();
2673  Page_hashlist& pl_hash = m_page_hashlist;
2674 #ifdef VM_TRACE
2675  if (signal->theData[0] == 11000 && signal->getLength() == 2)
2676  {
2677  // has no effect currently
2678  Uint32 flag = signal->theData[1];
2679  debugFlag = flag & 1;
2680  debugSummaryFlag = flag & 2;
2681  }
2682 #endif
2683 
2684  if (signal->theData[0] == 11001)
2685  {
2686  // XXX print hash list if no sublist
2687  Uint32 list = 0;
2688  if (signal->getLength() > 1)
2689  list = signal->theData[1];
2690 
2691  Page_sublist& pl = *m_page_sublist[list];
2692  Ptr<Page_entry> ptr;
2693 
2694  for (pl.first(ptr); ptr.i != RNIL; pl.next(ptr))
2695  {
2696  ndbout << ptr << endl;
2697  infoEvent(" PE [ file: %d page: %d ] state: %x lsn: %lld lcp: %d busy: %d req-list: %d",
2698  ptr.p->m_file_no, ptr.p->m_page_no,
2699  ptr.p->m_state, ptr.p->m_lsn, ptr.p->m_last_lcp,
2700  ptr.p->m_busy_count,
2701  !ptr.p->m_requests.isEmpty());
2702  }
2703  }
2704 
2705  if (signal->theData[0] == 11002 && signal->getLength() == 3)
2706  {
2707  Page_entry key;
2708  key.m_file_no = signal->theData[1];
2709  key.m_page_no = signal->theData[2];
2710 
2711  Ptr<Page_entry> ptr;
2712  if (pl_hash.find(ptr, key))
2713  {
2714  ndbout << "pageout " << ptr << endl;
2715  if (c_tup != 0)
2716  c_tup->disk_page_unmap_callback(0,
2717  ptr.p->m_real_page_i,
2718  ptr.p->m_dirty_count);
2719  pageout(signal, ptr);
2720  }
2721  }
2722 
2723 
2724  if (signal->theData[0] == 11003)
2725  {
2726 #ifdef VM_TRACE
2727  verify_page_lists();
2728  dump_page_lists();
2729 #else
2730  ndbout << "Only in VM_TRACE builds" << endl;
2731 #endif
2732  }
2733 
2734  if (signal->theData[0] == 11004)
2735  {
2736  ndbout << "Dump LCP bucket m_lcp_outstanding: " << m_lcp_outstanding;
2737  if (m_lcp_curr_bucket != ~(Uint32)0)
2738  {
2739  Page_hashlist::Iterator iter;
2740  pl_hash.next(m_lcp_curr_bucket, iter);
2741 
2742  ndbout_c(" %d", m_lcp_curr_bucket);
2743 
2744  while (iter.curr.i != RNIL && iter.bucket == m_lcp_curr_bucket)
2745  {
2746  Ptr<Page_entry>& ptr = iter.curr;
2747  ndbout << ptr << endl;
2748  pl_hash.next(iter);
2749  }
2750 
2751  ndbout_c("-- done");
2752  }
2753  else
2754  {
2755  ndbout_c(" == ~0");
2756  }
2757  }
2758 
2759  if (signal->theData[0] == 11005)
2760  {
2761  g_dbg_lcp = !g_dbg_lcp;
2762  }
2763 
2764  if (signal->theData[0] == 11006)
2765  {
2766  SET_ERROR_INSERT_VALUE(11006);
2767  }
2768 
2769  if (signal->theData[0] == 11007)
2770  {
2771  SET_ERROR_INSERT_VALUE(11007);
2772  }
2773 
2774  if (signal->theData[0] == 11008)
2775  {
2776  SET_ERROR_INSERT_VALUE(11008);
2777  }
2778 
2779  if (signal->theData[0] == 11009)
2780  {
2781  SET_ERROR_INSERT_VALUE(11009);
2782  }
2783 }
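
All of the branches above are driven by DUMP_STATE_ORD, whose arguments arrive in signal->theData[]. Assuming the standard NDB DUMP plumbing (defined outside this file), they can be triggered from the ndb_mgm management client, e.g. ALL DUMP 11001 3 to print page sublist 3, ALL DUMP 11003 to verify and dump all page lists (VM_TRACE builds only), or ALL DUMP 11005 to toggle the g_dbg_lcp flag.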
2784 
2785 void
2786 Pgman::execDBINFO_SCANREQ(Signal *signal)
2787 {
2788  DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
2789  Ndbinfo::Ratelimit rl;
2790 
2791  jamEntry();
2792  switch(req.tableId) {
2793  case Ndbinfo::DISKPAGEBUFFER_TABLEID:
2794  {
2795  jam();
2796  Ndbinfo::Row row(signal, req);
2797  row.write_uint32(getOwnNodeId());
2798  row.write_uint32(instance()); // block instance
2799  row.write_uint64(m_stats.m_pages_written);
2800  row.write_uint64(m_stats.m_pages_written_lcp);
2801  row.write_uint64(m_stats.m_pages_read);
2802  row.write_uint64(m_stats.m_log_waits);
2803  row.write_uint64(m_stats.m_page_requests_direct_return);
2804  row.write_uint64(m_stats.m_page_requests_wait_q);
2805  row.write_uint64(m_stats.m_page_requests_wait_io);
2806 
2807  ndbinfo_send_row(signal, req, row, rl);
2808  }
2809  default:
2810  break;
2811  }
2812  ndbinfo_send_scan_conf(signal, req, rl);
2813 }
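
The row written for DISKPAGEBUFFER_TABLEID backs the ndbinfo.diskpagebuffer table, its columns following the write_uint32/write_uint64 calls above (node id and block instance, then the page-write, page-read, log-wait and page-request counters); in Cluster releases that ship ndbinfo, these statistics are readable with an ordinary SELECT from that table.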