MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
AsyncIoThread.cpp
1 /* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
15 
16 #include <ndb_global.h>
17 
18 #include "AsyncIoThread.hpp"
19 #include "AsyncFile.hpp"
20 #include <ErrorHandlingMacros.hpp>
21 #include <kernel_types.h>
22 #include <NdbThread.h>
23 #include <signaldata/FsRef.hpp>
24 #include <signaldata/FsOpenReq.hpp>
25 #include <signaldata/FsReadWriteReq.hpp>
26 #include <signaldata/AllocMem.hpp>
27 #include "Ndbfs.hpp"
28 #include <NdbSleep.h>
29 
30 #include <EventLogger.hpp>
31 extern EventLogger * g_eventLogger;
32 
33 AsyncIoThread::AsyncIoThread(class Ndbfs& fs, bool bound)
34  : m_fs(fs)
35 {
36  m_current_file = 0;
37  if (bound)
38  {
39  theMemoryChannelPtr = &m_fs.theToBoundThreads;
40  }
41  else
42  {
43  theMemoryChannelPtr = &m_fs.theToUnboundThreads;
44  }
45  theReportTo = &m_fs.theFromThreads;
46 }
47 
48 static int numAsyncFiles = 0;
49 
50 extern "C"
51 void *
52 runAsyncIoThread(void* arg)
53 {
54  ((AsyncIoThread*)arg)->run();
55  return (NULL);
56 }
57 
58 
59 struct NdbThread*
60 AsyncIoThread::doStart()
61 {
62  // Stacksize for filesystem threads
63 #if !defined(DBUG_OFF) && defined (__hpux)
64  // Empirical evidence indicates at least 32k
65  const NDB_THREAD_STACKSIZE stackSize = 32768;
66 #else
67  // Otherwise an 8k stack should be enough
68  const NDB_THREAD_STACKSIZE stackSize = 8192;
69 #endif
70 
71  char buf[16];
72  numAsyncFiles++;
73  BaseString::snprintf(buf, sizeof(buf), "AsyncIoThread%d", numAsyncFiles);
74 
75  theStartMutexPtr = NdbMutex_Create();
76  theStartConditionPtr = NdbCondition_Create();
77  NdbMutex_Lock(theStartMutexPtr);
78  theStartFlag = false;
79 
80  theThreadPtr = NdbThread_Create(runAsyncIoThread,
81  (void**)this,
82  stackSize,
83  buf,
84  NDB_THREAD_PRIO_MEAN);
85 
86  if (theThreadPtr == 0)
87  {
88  ERROR_SET(fatal, NDBD_EXIT_MEMALLOC,
89  "","Could not allocate file system thread");
90  }
91 
92  do
93  {
94  NdbCondition_Wait(theStartConditionPtr,
95  theStartMutexPtr);
96  }
97  while (theStartFlag == false);
98 
99  NdbMutex_Unlock(theStartMutexPtr);
100  NdbMutex_Destroy(theStartMutexPtr);
101  NdbCondition_Destroy(theStartConditionPtr);
102 
103  return theThreadPtr;
104 }
105 
106 void
107 AsyncIoThread::shutdown()
108 {
109  void *status;
111  request.action = Request::end;
112  this->theMemoryChannelPtr->writeChannel( &request );
113  NdbThread_WaitFor(theThreadPtr, &status);
114  NdbThread_Destroy(&theThreadPtr);
115 }
116 
117 void
119 {
120  assert(m_current_file);
121  assert(m_current_file->getThread() == this);
122  assert(theMemoryChannelPtr == &theMemoryChannel);
123  theMemoryChannelPtr->writeChannel(request);
124 }
125 
126 void
127 AsyncIoThread::run()
128 {
129  Request *request;
130 
131  // Create theMemoryChannel in the thread that will wait for it
132  NdbMutex_Lock(theStartMutexPtr);
133  theStartFlag = true;
134  NdbMutex_Unlock(theStartMutexPtr);
135  NdbCondition_Signal(theStartConditionPtr);
136 
137  while (1)
138  {
139  request = theMemoryChannelPtr->readChannel();
140  if (!request || request->action == Request::end)
141  {
142  DEBUG(ndbout_c("Nothing read from Memory Channel in AsyncFile"));
143  theStartFlag = false;
144  return;
145  }//if
146 
147  AsyncFile * file = request->file;
148  m_current_request= request;
149  switch (request->action) {
150  case Request::open:
151  file->openReq(request);
152  if (request->error == 0 && request->m_do_bind)
153  attach(file);
154  break;
155  case Request::close:
156  file->closeReq(request);
157  detach(file);
158  break;
159  case Request::closeRemove:
160  file->closeReq(request);
161  file->removeReq(request);
162  detach(file);
163  break;
164  case Request::readPartial:
165  case Request::read:
166  file->readReq(request);
167  break;
168  case Request::readv:
169  file->readvReq(request);
170  break;
171  case Request::write:
172  file->writeReq(request);
173  break;
174  case Request::writev:
175  file->writevReq(request);
176  break;
177  case Request::writeSync:
178  file->writeReq(request);
179  file->syncReq(request);
180  break;
181  case Request::writevSync:
182  file->writevReq(request);
183  file->syncReq(request);
184  break;
185  case Request::sync:
186  file->syncReq(request);
187  break;
188  case Request::append:
189  file->appendReq(request);
190  break;
191  case Request::append_synch:
192  file->appendReq(request);
193  file->syncReq(request);
194  break;
195  case Request::rmrf:
196  file->rmrfReq(request, file->theFileName.c_str(),
197  request->par.rmrf.own_directory);
198  break;
199  case Request::end:
200  theStartFlag = false;
201  return;
202  case Request::allocmem:
203  {
204  allocMemReq(request);
205  break;
206  }
207  case Request::buildindx:
208  buildIndxReq(request);
209  break;
210  case Request::suspend:
211  if (request->par.suspend.milliseconds)
212  {
213  g_eventLogger->debug("Suspend %s %u ms",
214  file->theFileName.c_str(),
215  request->par.suspend.milliseconds);
216  NdbSleep_MilliSleep(request->par.suspend.milliseconds);
217  continue;
218  }
219  else
220  {
221  g_eventLogger->debug("Suspend %s",
222  file->theFileName.c_str());
223  theStartFlag = false;
224  return;
225  }
226  default:
227  DEBUG(ndbout_c("Invalid Request"));
228  abort();
229  break;
230  }//switch
231  m_last_request = request;
232  m_current_request = 0;
233 
234  // No need to signal as ndbfs only uses tryRead
235  theReportTo->writeChannelNoSignal(request);
236  m_fs.wakeup();
237  }
238 }
239 
240 void
241 AsyncIoThread::allocMemReq(Request* request)
242 {
243  Uint32 watchDog = 0;
244  switch((request->par.alloc.requestInfo & 255)){
245  case AllocMemReq::RT_MAP:{
246  bool memlock = !!(request->par.alloc.requestInfo & AllocMemReq::RT_MEMLOCK);
247  request->par.alloc.ctx->m_mm.map(&watchDog, memlock);
248  request->par.alloc.bytes = 0;
249  request->error = 0;
250  break;
251  }
252  case AllocMemReq::RT_EXTEND:
256  assert(false);
257  request->par.alloc.bytes = 0;
258  request->error = 1;
259  break;
260  }
261 }
262 
263 void
264 AsyncIoThread::buildIndxReq(Request* request)
265 {
266  mt_BuildIndxReq req;
267  memcpy(&req, &request->par.build.m_req, sizeof(req));
268  req.mem_buffer = request->file->m_page_ptr.p;
269  req.buffer_size = request->file->m_page_cnt * sizeof(GlobalPage);
270  request->error = (* req.func_ptr)(&req);
271 }
272 
273 void
274 AsyncIoThread::attach(AsyncFile* file)
275 {
276  assert(m_current_file == 0);
277  assert(theMemoryChannelPtr == &m_fs.theToBoundThreads);
278  m_current_file = file;
279  theMemoryChannelPtr = &theMemoryChannel;
280  file->attach(this);
281  m_fs.cnt_active_bound(1);
282 }
283 
284 void
285 AsyncIoThread::detach(AsyncFile* file)
286 {
287  if (m_current_file == 0)
288  {
289  assert(file->getThread() == 0);
290  }
291  else
292  {
293  assert(m_current_file == file);
294  assert(theMemoryChannelPtr = &theMemoryChannel);
295  m_current_file = 0;
296  theMemoryChannelPtr = &m_fs.theToBoundThreads;
297  file->detach(this);
298  m_fs.cnt_active_bound(-1);
299  }
300 }