MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
FastScheduler.cpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include "FastScheduler.hpp"
19 #include "RefConvert.hpp"
20 
21 #include "Emulator.hpp"
22 #include "VMSignal.hpp"
23 
24 #include <SignalLoggerManager.hpp>
25 #include <BlockNumbers.h>
26 #include <GlobalSignalNumbers.h>
27 #include <signaldata/EventReport.hpp>
28 #include "LongSignal.hpp"
29 #include <NdbTick.h>
30 
// Tuning constants for the per-call signal budget of FastScheduler::doJob().

// Lower bound applied to the number of loop iterations per doJob() call.
#define MIN_NUMBER_OF_SIG_PER_DO_JOB 64

// NOTE(review): not referenced anywhere in the visible part of this file.
#define MAX_NUMBER_OF_SIG_PER_DO_JOB 2048

// Head-room added on top of the value returned by getBOccupancy().
#define EXTRA_SIGNALS_PER_DO_JOB 32
34 
35 FastScheduler::FastScheduler()
36 {
37  // These constants work for sun only, but they should be initated from
38  // Emulator.C as soon as VMTime has been initiated.
39  theJobBuffers[0].newBuffer(JBASIZE);
40  theJobBuffers[1].newBuffer(JBBSIZE);
41  theJobBuffers[2].newBuffer(JBCSIZE);
42  theJobBuffers[3].newBuffer(JBDSIZE);
43  clear();
44 }
45 
46 FastScheduler::~FastScheduler()
47 {
48 }
49 
50 void
51 FastScheduler::clear()
52 {
53  int i;
54  // Make sure the restart signals are not sent too early
55  // the prio is set back in 'main' using the 'ready' method.
56  globalData.highestAvailablePrio = LEVEL_IDLE;
57  globalData.sendPackedActivated = 0;
58  globalData.activateSendPacked = 0;
59  for (i = 0; i < JB_LEVELS; i++){
60  theJobBuffers[i].clear();
61  }
62  globalData.JobCounter = 0;
63  globalData.JobLap = 0;
64  globalData.loopMax = 32;
65  globalData.VMSignals[0].header.theSignalId = 0;
66 
67  theDoJobTotalCounter = 0;
68  theDoJobCallCounter = 0;
69 }
70 
71 void
72 FastScheduler::activateSendPacked()
73 {
74  globalData.sendPackedActivated = 1;
75  globalData.activateSendPacked = 0;
76  globalData.loopMax = 2048;
77 }//FastScheduler::activateSendPacked()
78 
79 //------------------------------------------------------------------------
80 // sendPacked is executed at the end of the loop.
81 // To ensure that we don't send any messages before executing all local
82 // packed signals we do another turn in the loop (unless we have already
83 // executed too many signals in the loop).
84 //------------------------------------------------------------------------
85 void
86 FastScheduler::doJob()
87 {
88  Uint32 loopCount = 0;
89  Uint32 TminLoops = getBOccupancy() + EXTRA_SIGNALS_PER_DO_JOB;
90  Uint32 TloopMax = (Uint32)globalData.loopMax;
91  if (TminLoops < TloopMax) {
92  TloopMax = TminLoops;
93  }//if
94  if (TloopMax < MIN_NUMBER_OF_SIG_PER_DO_JOB) {
95  TloopMax = MIN_NUMBER_OF_SIG_PER_DO_JOB;
96  }//if
97  register Signal* signal = getVMSignals();
98  register Uint32 tHighPrio= globalData.highestAvailablePrio;
99  do{
100  while ((tHighPrio < LEVEL_IDLE) && (loopCount < TloopMax)) {
101  // signal->garbage_register();
102  // To ensure we find bugs quickly
103  register Uint32 gsnbnr = theJobBuffers[tHighPrio].retrieve(signal);
104  // also strip any instance bits since this is non-MT code
105  register BlockNumber reg_bnr = gsnbnr & NDBMT_BLOCK_MASK;
106  register GlobalSignalNumber reg_gsn = gsnbnr >> 16;
107  globalData.incrementWatchDogCounter(1);
108  if (reg_bnr > 0) {
109  Uint32 tJobCounter = globalData.JobCounter;
110  Uint64 tJobLap = globalData.JobLap;
111  SimulatedBlock* b = globalData.getBlock(reg_bnr);
112  theJobPriority[tJobCounter] = (Uint8)tHighPrio;
113  globalData.JobCounter = (tJobCounter + 1) & 4095;
114  globalData.JobLap = tJobLap + 1;
115 
116 #ifdef VM_TRACE_TIME
117  Uint32 us1, us2;
118  Uint64 ms1, ms2;
119  NdbTick_CurrentMicrosecond(&ms1, &us1);
120  b->m_currentGsn = reg_gsn;
121 #endif
122 
123 #ifdef VM_TRACE
124  {
125  if (globalData.testOn) {
126  signal->header.theVerId_signalNumber = reg_gsn;
127  signal->header.theReceiversBlockNumber = reg_bnr;
128 
129  globalSignalLoggers.executeSignal(signal->header,
130  tHighPrio,
131  &signal->theData[0],
132  globalData.ownId);
133  }//if
134  }
135 #endif
136  b->executeFunction(reg_gsn, signal);
137 #ifdef VM_TRACE_TIME
138  NdbTick_CurrentMicrosecond(&ms2, &us2);
139  Uint64 diff = ms2;
140  diff -= ms1;
141  diff *= 1000000;
142  diff += us2;
143  diff -= us1;
144  b->addTime(reg_gsn, diff);
145 #endif
146  tHighPrio = globalData.highestAvailablePrio;
147  } else {
148  tHighPrio++;
149  globalData.highestAvailablePrio = tHighPrio;
150  }//if
151  loopCount++;
152  }//while
153  sendPacked();
154  tHighPrio = globalData.highestAvailablePrio;
155  if(getBOccupancy() > MAX_OCCUPANCY)
156  {
157  if(loopCount != TloopMax)
158  abort();
159  assert( loopCount == TloopMax );
160  TloopMax += 512;
161  }
162  } while ((getBOccupancy() > MAX_OCCUPANCY) ||
163  ((loopCount < TloopMax) &&
164  (tHighPrio < LEVEL_IDLE)));
165 
166  theDoJobCallCounter ++;
167  theDoJobTotalCounter += loopCount;
168  if (theDoJobCallCounter == 8192) {
169  reportDoJobStatistics(theDoJobTotalCounter >> 13);
170  theDoJobCallCounter = 0;
171  theDoJobTotalCounter = 0;
172  }//if
173 
174 }//FastScheduler::doJob()
175 
176 void
177 FastScheduler::postPoll()
178 {
179  Signal * signal = getVMSignals();
180  SimulatedBlock* b_fs = globalData.getBlock(NDBFS);
181  b_fs->executeFunction(GSN_SEND_PACKED, signal);
182 }
183 
184 void FastScheduler::sendPacked()
185 {
186  if (globalData.sendPackedActivated == 1) {
187  SimulatedBlock* b_lqh = globalData.getBlock(DBLQH);
188  SimulatedBlock* b_tc = globalData.getBlock(DBTC);
189  SimulatedBlock* b_tup = globalData.getBlock(DBTUP);
190  SimulatedBlock* b_fs = globalData.getBlock(NDBFS);
191  Signal * signal = getVMSignals();
192  b_lqh->executeFunction(GSN_SEND_PACKED, signal);
193  b_tc->executeFunction(GSN_SEND_PACKED, signal);
194  b_tup->executeFunction(GSN_SEND_PACKED, signal);
195  b_fs->executeFunction(GSN_SEND_PACKED, signal);
196  return;
197  } else if (globalData.activateSendPacked == 0) {
198  return;
199  } else {
200  activateSendPacked();
201  }//if
202  return;
203 }//FastScheduler::sendPacked()
204 
205 Uint32
206 APZJobBuffer::retrieve(Signal* signal)
207 {
208  Uint32 tOccupancy = theOccupancy;
209  Uint32 myRPtr = rPtr;
210  BufferEntry& buf = buffer[myRPtr];
211  Uint32 gsnbnr;
212  Uint32 cond = (++myRPtr == bufSize) - 1;
213  Uint32 tRecBlockNo = buf.header.theReceiversBlockNumber;
214 
215  if (tOccupancy != 0) {
216  if (tRecBlockNo != 0) {
217  // Transform protocol to signal.
218  rPtr = myRPtr & cond;
219  theOccupancy = tOccupancy - 1;
220  gsnbnr = buf.header.theVerId_signalNumber << 16 | tRecBlockNo;
221 
222  Uint32 tSignalId = globalData.theSignalId;
223  Uint32 tLength = buf.header.theLength;
224  Uint32 tFirstData = buf.theDataRegister[0];
225  signal->header = buf.header;
226 
227  // Recall our signal Id for restart purposes
228  buf.header.theSignalId = tSignalId;
229  globalData.theSignalId = tSignalId + 1;
230 
231  Uint32* tDataRegPtr = &buf.theDataRegister[0];
232  Uint32* tSigDataPtr = signal->getDataPtrSend();
233  *tSigDataPtr = tFirstData;
234  tDataRegPtr++;
235  tSigDataPtr++;
236  Uint32 tLengthCopied = 1;
237  while (tLengthCopied < tLength) {
238  Uint32 tData0 = tDataRegPtr[0];
239  Uint32 tData1 = tDataRegPtr[1];
240  Uint32 tData2 = tDataRegPtr[2];
241  Uint32 tData3 = tDataRegPtr[3];
242 
243  tDataRegPtr += 4;
244  tLengthCopied += 4;
245 
246  tSigDataPtr[0] = tData0;
247  tSigDataPtr[1] = tData1;
248  tSigDataPtr[2] = tData2;
249  tSigDataPtr[3] = tData3;
250  tSigDataPtr += 4;
251  }//while
252 
253  tSigDataPtr = signal->m_sectionPtrI;
254  tDataRegPtr = buf.theDataRegister + buf.header.theLength;
255  Uint32 ptr0 = * tDataRegPtr ++;
256  Uint32 ptr1 = * tDataRegPtr ++;
257  Uint32 ptr2 = * tDataRegPtr ++;
258  * tSigDataPtr ++ = ptr0;
259  * tSigDataPtr ++ = ptr1;
260  * tSigDataPtr ++ = ptr2;
261 
262  //---------------------------------------------------------
263  // Prefetch of buffer[rPtr] is done here. We prefetch for
264  // read both the first cache line and the next 64 byte
265  // entry
266  //---------------------------------------------------------
267  NDB_PREFETCH_READ((void*)&buffer[rPtr]);
268  NDB_PREFETCH_READ((void*)(((char*)&buffer[rPtr]) + 64));
269  return gsnbnr;
270  } else {
271  bnr_error();
272  return 0; // Will never come here, simply to keep GCC happy.
273  }//if
274  } else {
275  //------------------------------------------------------------
276  // The Job Buffer was empty, signal this by return zero.
277  //------------------------------------------------------------
278  return 0;
279  }//if
280 }//APZJobBuffer::retrieve()
281 
282 void
283 APZJobBuffer::signal2buffer(Signal* signal,
284  BlockNumber bnr, GlobalSignalNumber gsn,
285  BufferEntry& buf)
286 {
287  Uint32 tSignalId = globalData.theSignalId;
288  Uint32 tFirstData = signal->theData[0];
289  Uint32 tLength = signal->header.theLength + signal->header.m_noOfSections;
290  Uint32 tSigId = buf.header.theSignalId;
291 
292  buf.header = signal->header;
293  buf.header.theVerId_signalNumber = gsn;
294  buf.header.theReceiversBlockNumber = bnr;
295  buf.header.theSendersSignalId = tSignalId - 1;
296  buf.header.theSignalId = tSigId;
297  buf.theDataRegister[0] = tFirstData;
298 
299  Uint32 tLengthCopied = 1;
300  Uint32* tSigDataPtr = &signal->theData[1];
301  Uint32* tDataRegPtr = &buf.theDataRegister[1];
302  while (tLengthCopied < tLength) {
303  Uint32 tData0 = tSigDataPtr[0];
304  Uint32 tData1 = tSigDataPtr[1];
305  Uint32 tData2 = tSigDataPtr[2];
306  Uint32 tData3 = tSigDataPtr[3];
307 
308  tLengthCopied += 4;
309  tSigDataPtr += 4;
310 
311  tDataRegPtr[0] = tData0;
312  tDataRegPtr[1] = tData1;
313  tDataRegPtr[2] = tData2;
314  tDataRegPtr[3] = tData3;
315  tDataRegPtr += 4;
316  }//while
317 }//APZJobBuffer::signal2buffer()
318 
319 void
320 APZJobBuffer::insert(const SignalHeader * const sh,
321  const Uint32 * const theData, const Uint32 secPtrI[3]){
322  Uint32 tOccupancy = theOccupancy + 1;
323  Uint32 myWPtr = wPtr;
324  register BufferEntry& buf = buffer[myWPtr];
325 
326  if (tOccupancy < bufSize) {
327  Uint32 cond = (++myWPtr == bufSize) - 1;
328  wPtr = myWPtr & cond;
329  theOccupancy = tOccupancy;
330 
331  buf.header = * sh;
332  const Uint32 len = buf.header.theLength;
333  memcpy(buf.theDataRegister, theData, 4 * len);
334  memcpy(&buf.theDataRegister[len], &secPtrI[0], 4 * 3);
335  //---------------------------------------------------------
336  // Prefetch of buffer[wPtr] is done here. We prefetch for
337  // write both the first cache line and the next 64 byte
338  // entry
339  //---------------------------------------------------------
340  NDB_PREFETCH_WRITE((void*)&buffer[wPtr]);
341  NDB_PREFETCH_WRITE((void*)(((char*)&buffer[wPtr]) + 64));
342  } else {
343  jbuf_error();
344  }//if
345 }
346 APZJobBuffer::APZJobBuffer()
347  : bufSize(0), buffer(NULL), memRef(NULL)
348 {
349  clear();
350 }
351 
352 APZJobBuffer::~APZJobBuffer()
353 {
354  delete [] buffer;
355 }
356 
357 void
358 APZJobBuffer::newBuffer(int size)
359 {
360  buffer = new BufferEntry[size + 1]; // +1 to support "overrrun"
361  if(buffer){
362 #ifndef NDB_PURIFY
363  ::memset(buffer, 0, (size * sizeof(BufferEntry)));
364 #endif
365  bufSize = size;
366  } else
367  bufSize = 0;
368 }
369 
370 void
371 APZJobBuffer::clear()
372 {
373  rPtr = 0;
374  wPtr = 0;
375  theOccupancy = 0;
376 }
377 
383 void print_restart(FILE * output, Signal* signal, Uint32 aLevel);
384 
385 void FastScheduler::dumpSignalMemory(Uint32 thr_no, FILE * output)
386 {
387  SignalT<25> signalT;
388  Signal * signal = new (&signalT) Signal(0);
389  Uint32 ReadPtr[5];
390  Uint32 tJob;
391  Uint32 tLastJob;
392 
393  /* Single threaded ndbd scheduler, no threads. */
394  assert(thr_no == 0);
395 
396  fprintf(output, "\n");
397 
398  if (globalData.JobLap > 4095) {
399  if (globalData.JobCounter != 0)
400  tJob = globalData.JobCounter - 1;
401  else
402  tJob = 4095;
403  tLastJob = globalData.JobCounter;
404  } else {
405  if (globalData.JobCounter == 0)
406  return; // No signals sent
407  else {
408  tJob = globalData.JobCounter - 1;
409  tLastJob = 4095;
410  }
411  }
412  ReadPtr[0] = theJobBuffers[0].getReadPtr();
413  ReadPtr[1] = theJobBuffers[1].getReadPtr();
414  ReadPtr[2] = theJobBuffers[2].getReadPtr();
415  ReadPtr[3] = theJobBuffers[3].getReadPtr();
416 
417  do {
418  unsigned char tLevel = theJobPriority[tJob];
419  globalData.incrementWatchDogCounter(4);
420  if (ReadPtr[tLevel] == 0)
421  ReadPtr[tLevel] = theJobBuffers[tLevel].getBufSize() - 1;
422  else
423  ReadPtr[tLevel]--;
424 
425  theJobBuffers[tLevel].retrieveDump(signal, ReadPtr[tLevel]);
426  // strip instance bits since this in non-MT code
427  signal->header.theReceiversBlockNumber &= NDBMT_BLOCK_MASK;
428  print_restart(output, signal, tLevel);
429 
430  if (tJob == 0)
431  tJob = 4095;
432  else
433  tJob--;
434 
435  } while (tJob != tLastJob);
436  fflush(output);
437 }
438 
439 void
440 FastScheduler::prio_level_error()
441 {
442  ERROR_SET(ecError, NDBD_EXIT_WRONG_PRIO_LEVEL,
443  "Wrong Priority Level", "FastScheduler.C");
444 }
445 
446 void
447 jbuf_error()
448 {
449  ERROR_SET(ecError, NDBD_EXIT_BLOCK_JBUFCONGESTION,
450  "Job Buffer Full", "APZJobBuffer.C");
451 }
452 
453 void
454 bnr_error()
455 {
456  ERROR_SET(ecError, NDBD_EXIT_BLOCK_BNR_ZERO,
457  "Block Number Zero", "FastScheduler.C");
458 }
459 
460 void
461 print_restart(FILE * output, Signal* signal, Uint32 aLevel)
462 {
463  fprintf(output, "--------------- Signal ----------------\n");
465  signal->header,
466  aLevel,
467  globalData.ownId,
468  true);
470  signal->header,
471  &signal->theData[0]);
472 }
473 
474 void
475 FastScheduler::traceDumpPrepare(NdbShutdownType&)
476 {
477  /* No-operation in single-threaded ndbd. */
478 }
479 
480 Uint32
481 FastScheduler::traceDumpGetNumThreads()
482 {
483  return 1; // Single-threaded ndbd scheduler
484 }
485 
486 int
487 FastScheduler::traceDumpGetCurrentThread()
488 {
489  return -1; // Single-threaded ndbd scheduler
490 }
491 
492 bool
493 FastScheduler::traceDumpGetJam(Uint32 thr_no, Uint32 & jamBlockNumber,
494  const Uint32 * & thrdTheEmulatedJam,
495  Uint32 & thrdTheEmulatedJamIndex)
496 {
497  /* Single threaded ndbd scheduler, no threads. */
498  assert(thr_no == 0);
499 
500 #ifdef NO_EMULATED_JAM
501  jamBlockNumber = 0;
502  thrdTheEmulatedJam = NULL;
503  thrdTheEmulatedJamIndex = 0;
504 #else
505  const EmulatedJamBuffer *jamBuffer =
506  (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM);
507  thrdTheEmulatedJam = jamBuffer->theEmulatedJam;
508  thrdTheEmulatedJamIndex = jamBuffer->theEmulatedJamIndex;
509  jamBlockNumber = jamBuffer->theEmulatedJamBlockNumber;
510 #endif
511  return true;
512 }
513 
514 
521 void
522 FastScheduler::reportDoJobStatistics(Uint32 tMeanLoopCount) {
523  SignalT<2> signal;
524 
525  memset(&signal.header, 0, sizeof(signal.header));
526  signal.header.theLength = 2;
527  signal.header.theSendersSignalId = 0;
528  signal.header.theSendersBlockRef = numberToRef(0, 0);
529  signal.header.theVerId_signalNumber = GSN_EVENT_REP;
530  signal.header.theReceiversBlockNumber = CMVMI;
531 
532  signal.theData[0] = NDB_LE_JobStatistic;
533  signal.theData[1] = tMeanLoopCount;
534 
535  Uint32 secPtr[3];
536  execute(&signal.header, JBA, signal.theData, secPtr);
537 }
538 
539 void
540 FastScheduler::reportThreadConfigLoop(Uint32 expired_time,
541  Uint32 extra_constant,
542  Uint32 *no_exec_loops,
543  Uint32 *tot_exec_time,
544  Uint32 *no_extra_loops,
545  Uint32 *tot_extra_time)
546 {
547  SignalT<6> signal;
548 
549  memset(&signal.header, 0, sizeof(signal.header));
550  signal.header.theLength = 6;
551  signal.header.theSendersSignalId = 0;
552  signal.header.theSendersBlockRef = numberToRef(0, 0);
553  signal.header.theVerId_signalNumber = GSN_EVENT_REP;
554  signal.header.theReceiversBlockNumber = CMVMI;
555 
556  signal.theData[0] = NDB_LE_ThreadConfigLoop;
557  signal.theData[1] = expired_time;
558  signal.theData[2] = extra_constant;
559  signal.theData[3] = (*tot_exec_time)/(*no_exec_loops);
560  signal.theData[4] = *no_extra_loops;
561  if (*no_extra_loops > 0)
562  signal.theData[5] = (*tot_extra_time)/(*no_extra_loops);
563  else
564  signal.theData[5] = 0;
565 
566  *no_exec_loops = 0;
567  *tot_exec_time = 0;
568  *no_extra_loops = 0;
569  *tot_extra_time = 0;
570 
571  Uint32 secPtr[3];
572  execute(&signal.header, JBA, signal.theData, secPtr);
573 }
574 
575 static NdbMutex g_mm_mutex;
576 
577 void
578 mt_mem_manager_init()
579 {
580  NdbMutex_Init(&g_mm_mutex);
581 }
582 
583 void
584 mt_mem_manager_lock()
585 {
586  NdbMutex_Lock(&g_mm_mutex);
587 }
588 
589 void
590 mt_mem_manager_unlock()
591 {
592  NdbMutex_Unlock(&g_mm_mutex);
593 }