MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
FastScheduler.hpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #ifndef FastScheduler_H
19 #define FastScheduler_H
20 
21 #include <VMSignal.hpp>
22 #include <kernel_types.h>
23 #include <Prio.hpp>
24 #include <SignalLoggerManager.hpp>
25 #include <SimulatedBlock.hpp>
26 #include <ErrorHandlingMacros.hpp>
27 #include <GlobalData.hpp>
28 #include <TransporterDefinitions.hpp>
29 #include <portlib/ndb_prefetch.h>
30 
/* Kernel types that are only referred to by pointer/reference here. */
class Block;
class Signal;

/* Error handlers; only declared in this header. */
void jbuf_error();
void bnr_error();

/*
 * JBB occupancy limit: FastScheduler::checkDoJob() calls doJob() once the
 * occupancy of job buffer B is no longer below this value.
 */
#define MAX_OCCUPANCY 1024

/* Job buffer sizes, one per level. */
#define JBASIZE 1280   // Level used by jobs that have deadlines to meet
#define JBBSIZE 4096   // Level used by most jobs
#define JBCSIZE 64     // Currently only used by STTOR and STTORRY
#define JBDSIZE 4096   // Storage for the time queue; not supported as a
                       // priority level
42 
44 {
45 public:
46  SignalHeader header;
47  Uint32 theDataRegister[25];
48 };
49 
51 {
52 public:
53  APZJobBuffer();
54  ~APZJobBuffer();
55 
56  void newBuffer(int size);
57 
58  void insert(Signal* signal, BlockNumber bnr, GlobalSignalNumber gsn);
59  void insert(const SignalHeader * const sh, const Uint32 * const theData, const Uint32 secPtrI[3]);
60  void insert(Signal* signal, BlockNumber bnr, GlobalSignalNumber gsn,
61  Uint32 myWPtr);
62 
63  Uint32 retrieve(Signal *signal);
64  void retrieve(Signal *signal, Uint32 myRptr);
65 
69  void retrieveDump(Signal *signal, Uint32 myRptr);
70 
71  void clear();
72  Uint32 getOccupancy() const;
73 
74  Uint32 getReadPtr() const;
75  Uint32 getWritePtr() const;
76  Uint32 getBufSize() const;
77 
78 private:
79  void signal2buffer(Signal* signal, BlockNumber bnr,
80  GlobalSignalNumber gsn, BufferEntry& buf);
81  Uint32 rPtr;
82  Uint32 wPtr;
83  Uint32 theOccupancy;
84  Uint32 bufSize;
85  BufferEntry* buffer;
86  BufferEntry* memRef;
87 };
88 
89 
91 {
92 public:
93  FastScheduler();
94  ~FastScheduler();
95 
96  void doJob();
97  void postPoll();
98  int checkDoJob();
99 
100  void activateSendPacked();
101 
102  void execute(Signal* signal,
103  Priority prio,
104  BlockNumber bnr,
105  GlobalSignalNumber gsn);
106 
107  void execute(const SignalHeader * const sh,
108  Uint8 prio, const Uint32 * const theData, const Uint32 secPtr[3]);
109 
110  void clear();
111  Signal* getVMSignals();
112 
113  Priority highestAvailablePrio() const;
114  Uint32 getBOccupancy() const;
115  void sendPacked();
116 
117  void insertTimeQueue(Signal* aSignal, BlockNumber bnr,
118  GlobalSignalNumber gsn, Uint32 aIndex);
119  void scheduleTimeQueue(Uint32 aIndex);
120 
121  /*
122  The following implement aspects of ErrorReporter that differs between
123  singlethreaded and multithread ndbd.
124  */
125 
126  /* Called before dumping, intended to stop any still running processing. */
127  void traceDumpPrepare(NdbShutdownType&);
128  /* Number of threads to create trace files for (thread id 0 .. N-1). */
129  Uint32 traceDumpGetNumThreads();
130 
131  int traceDumpGetCurrentThread(); // returns -1 if not found
132 
133  /* Get jam() buffers etc. for specific thread. */
134  bool traceDumpGetJam(Uint32 thr_no, Uint32 & jamBlockNumber,
135  const Uint32 * & thrdTheEmulatedJam,
136  Uint32 & thrdTheEmulatedJamIndex);
137  /* Produce a signal dump. */
138  void dumpSignalMemory(Uint32 thr_no, FILE * output);
139 
140  void reportThreadConfigLoop(Uint32 expired_time, Uint32 extra_constant,
141  Uint32 *no_exec_loops, Uint32 *tot_exec_time,
142  Uint32 *no_extra_loops, Uint32 *tot_extra_time);
143 
144 private:
145  void highestAvailablePrio(Priority prio);
146  void reportJob(Priority aPriority);
147  void prio_level_error();
148 
149  Uint32 theDoJobTotalCounter;
150  Uint32 theDoJobCallCounter;
151  Uint8 theJobPriority[4096];
152  APZJobBuffer theJobBuffers[JB_LEVELS];
153 
154  void reportDoJobStatistics(Uint32 meanLoopCount);
155 };
156 
157 inline
158 Uint32
159 FastScheduler::getBOccupancy() const {
160  return theJobBuffers[JBB].getOccupancy();
161 }//FastScheduler::getBOccupancy()
162 
163 inline
164 int
165 FastScheduler::checkDoJob()
166 {
167  /*
168  * Job buffer overload protetction
169  * If the job buffer B is filled over a certain limit start
170  * to execute the signals in the job buffer's
171  */
172  if (getBOccupancy() < MAX_OCCUPANCY) {
173  return 0;
174  } else {
175  doJob();
176  return 1;
177  }//if
178 }//FastScheduler::checkDoJob()
179 
180 inline
181 void
182 FastScheduler::reportJob(Priority aPriority)
183 {
184  Uint32 tJobCounter = globalData.JobCounter;
185  Uint64 tJobLap = globalData.JobLap;
186  theJobPriority[tJobCounter] = (Uint8)aPriority;
187  globalData.JobCounter = (tJobCounter + 1) & 4095;
188  globalData.JobLap = tJobLap + 1;
189 }
190 
191 inline
192 Priority
193 FastScheduler::highestAvailablePrio() const
194 {
195  return (Priority)globalData.highestAvailablePrio;
196 }
197 
198 inline
199 void
200 FastScheduler::highestAvailablePrio(Priority prio)
201 {
202  globalData.highestAvailablePrio = (Uint32)prio;
203 }
204 
205 inline
206 Signal*
207 FastScheduler::getVMSignals()
208 {
209  return &globalData.VMSignals[0];
210 }
211 
212 
213 // Inserts of a protocol object into the Job Buffer.
214 inline
215 void
216 FastScheduler::execute(const SignalHeader * const sh, Uint8 prio,
217  const Uint32 * const theData, const Uint32 secPtrI[3]){
218 #ifdef VM_TRACE
219  if (prio >= LEVEL_IDLE)
220  prio_level_error();
221 #endif
222 
223  theJobBuffers[prio].insert(sh, theData, secPtrI);
224  if (prio < (Uint8)highestAvailablePrio())
225  highestAvailablePrio((Priority)prio);
226 }
227 
228 inline
229 void
230 FastScheduler::execute(Signal* signal, Priority prio,
231  BlockNumber bnr, GlobalSignalNumber gsn)
232 {
233 #ifdef VM_TRACE
234  if (prio >= LEVEL_IDLE)
235  prio_level_error();
236 #endif
237  theJobBuffers[prio].insert(signal, bnr, gsn);
238  if (prio < highestAvailablePrio())
239  highestAvailablePrio(prio);
240 }
241 
242 inline
243 void
244 FastScheduler::insertTimeQueue(Signal* signal, BlockNumber bnr,
245  GlobalSignalNumber gsn, Uint32 aIndex)
246 {
247  theJobBuffers[3].insert(signal, bnr, gsn, aIndex);
248 }
249 
250 inline
251 void
252 FastScheduler::scheduleTimeQueue(Uint32 aIndex)
253 {
254  Signal* signal = getVMSignals();
255  theJobBuffers[3].retrieve(signal, aIndex);
256  theJobBuffers[0].insert
257  (signal,
258  (BlockNumber)signal->header.theReceiversBlockNumber,
259  (GlobalSignalNumber)signal->header.theVerId_signalNumber);
260  if (highestAvailablePrio() > JBA)
261  highestAvailablePrio(JBA);
262 
263  signal->header.m_noOfSections = 0; // Or else sendPacked might pick it up
264 }
265 
266 inline
267 Uint32
268 APZJobBuffer::getWritePtr() const
269 {
270  return wPtr;
271 }
272 
273 inline
274 Uint32
275 APZJobBuffer::getReadPtr() const
276 {
277  return rPtr;
278 }
279 
280 inline
281 Uint32
282 APZJobBuffer::getOccupancy() const
283 {
284  return theOccupancy;
285 }
286 
287 inline
288 Uint32
289 APZJobBuffer::getBufSize() const
290 {
291  return bufSize;
292 }
293 
294 inline
295 void
296 APZJobBuffer::retrieve(Signal* signal, Uint32 myRptr)
297 {
298  register BufferEntry& buf = buffer[myRptr];
299 
300  buf.header.theSignalId = globalData.theSignalId++;
301 
302  signal->header = buf.header;
303 
304  Uint32 *from = (Uint32*) &buf.theDataRegister[0];
305  Uint32 *to = (Uint32*) &signal->theData[0];
306  Uint32 noOfWords = buf.header.theLength + buf.header.m_noOfSections;
307  for(; noOfWords; noOfWords--)
308  *to++ = *from++;
309  // Copy sections references (copy all without if-statements)
310  return;
311 }
312 
313 inline
314 void
315 APZJobBuffer::retrieveDump(Signal* signal, Uint32 myRptr)
316 {
321  register BufferEntry& buf = buffer[myRptr];
322  signal->header = buf.header;
323 
324  Uint32 *from = (Uint32*) &buf.theDataRegister[0];
325  Uint32 *to = (Uint32*) &signal->theData[0];
326  Uint32 noOfWords = buf.header.theLength;
327  for(; noOfWords; noOfWords--)
328  *to++ = *from++;
329  return;
330 }
331 
332 inline
333 void
334 APZJobBuffer::insert(Signal* signal,
335  BlockNumber bnr, GlobalSignalNumber gsn)
336 {
337  Uint32 tOccupancy = theOccupancy + 1;
338  Uint32 myWPtr = wPtr;
339  if (tOccupancy < bufSize) {
340  register BufferEntry& buf = buffer[myWPtr];
341  Uint32 cond = (++myWPtr == bufSize) - 1;
342  wPtr = myWPtr & cond;
343  theOccupancy = tOccupancy;
344  signal2buffer(signal, bnr, gsn, buf);
345  //---------------------------------------------------------
346  // Prefetch of buffer[wPtr] is done here. We prefetch for
347  // write both the first cache line and the next 64 byte
348  // entry
349  //---------------------------------------------------------
350  NDB_PREFETCH_WRITE((void*)&buffer[wPtr]);
351  NDB_PREFETCH_WRITE((void*)(((char*)&buffer[wPtr]) + 64));
352  } else {
353  jbuf_error();
354  }//if
355 }
356 
357 
358 inline
359 void
360 APZJobBuffer::insert(Signal* signal, BlockNumber bnr,
361  GlobalSignalNumber gsn, Uint32 myWPtr)
362 {
363  register BufferEntry& buf = buffer[myWPtr];
364  signal2buffer(signal, bnr, gsn, buf);
365 }
366 
367 #endif