MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
AsyncFileTest.cpp
1 /*
2  Copyright (C) 2003-2006 MySQL AB
3  All rights reserved. Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 //#define TESTDEBUG 1
20 
21 #include <ndb_global.h>
22 
23 #include <kernel_types.h>
24 #include <Pool.hpp>
25 #include "AsyncFile.hpp"
26 #include "NdbOut.hpp"
27 #include "NdbTick.h"
28 #include "NdbThread.h"
29 #include "NdbMain.h"
30 
31 // Test and benchmark functionality of AsyncFile
32 // -n Number of files
33 // -r Number of simultaneous requests
34 // -s Filesize, number of pages
35 // -l Number of iterations
36 // -remove, remove files after close
37 // -reverse, write files in reverse order, start with the last page
38 
39 #define MAXFILES 255
40 #define DEFAULT_NUM_FILES 1
41 #define MAXREQUESTS 256
42 #define DEFAULT_NUM_REQUESTS 1
43 #define MAXFILESIZE 4096
44 #define DEFAULT_FILESIZE 2048
45 #define FVERSION 0x01000000
46 #define PAGESIZE 8192
47 
// Timing helpers used in pairs inside one enclosing block.
//
// TIMER_START opens a new scope and records the current time in milliseconds.
// TIMER_PRINT(str, ops) reads the time again, prints the elapsed milliseconds
// and the resulting operations per second, and closes the scope opened by
// TIMER_START.
//
// A phase that completes within the same millisecond tick yields an elapsed
// time of 0; the rate is then computed as if 1 ms had elapsed so that the
// division cannot fault. Macro arguments are parenthesized so expressions can
// be passed safely.
#define TIMER_START { Uint64 starttick = NdbTick_CurrentMillisecond()
#define TIMER_PRINT(str, ops) Uint64 stoptick = NdbTick_CurrentMillisecond();\
  Uint64 totaltime = (stoptick-starttick); \
  Uint64 ratetime = (totaltime == 0) ? 1 : totaltime; \
  ndbout << (ops) << " " << (str) << \
  " total time " << (int)totaltime << "ms" << endl;\
  char buf[255];\
  sprintf(buf, "%d %s/sec\n",(int)(((Uint64)(ops)*1000)/ratetime), (str));\
  ndbout <<buf << endl;}
56 
57 static int numberOfFiles = DEFAULT_NUM_FILES;
58 static int numberOfRequests = DEFAULT_NUM_REQUESTS;
59 static int fileSize = DEFAULT_FILESIZE;
60 static int removeFiles = 0;
61 static int writeFilesReverse = 0;
62 static int numberOfIterations = 1;
63 Uint32 FileNameArray[4];
64 
65 Pool<AsyncFile>* files;
66 AsyncFile* openFiles[MAXFILES];
67 Pool<Request>* theRequestPool;
68 MemoryChannelMultipleWriter<Request>* theReportChannel;
69 
70 char WritePages[MAXFILES][PAGESIZE];
71 char ReadPages[MAXFILES][PAGESIZE];
72 
73 int readArguments(int argc, const char** argv);
74 int openFile(int fileNum);
75 int openFileWait();
76 int closeFile(int fileNum);
77 int closeFileWait();
78 int writeFile( int fileNum, int pagenum);
79 int writeFileWait();
80 int writeSyncFile( int fileNum, int pagenum);
81 int writeSyncFileWait();
82 int readFile( int fileNum, int pagenum);
83 int readFileWait();
84 
85 
86 NDB_COMMAND(aftest, "aftest", "aftest [-n <Number of files>] [-r <Number of simultaneous requests>] [-s <Filesize, number of pages>] [-l <Number of iterations>] [-remove, remove files after close] [-reverse, write files in reverse order, start with the last page]", "Test the AsyncFile class of Ndb", 8192)
87 {
88  int s, numReq, numOps;
89 
90  readArguments(argc, argv);
91 
92  files = new Pool<AsyncFile>(numberOfFiles, 2);
93  theRequestPool = new Pool<Request>;
94  theReportChannel = new MemoryChannelMultipleWriter<Request>;
95 
96  ndbout << "AsyncFileTest starting" << endl;
97  ndbout << " " << numberOfFiles << " files" << endl;
98  ndbout << " " << numberOfRequests << " requests" << endl;
99  ndbout << " " << fileSize << " * 8k files" << endl << endl;
100  ndbout << " " << numberOfIterations << " iterations" << endl << endl;
101 
102  NdbThread_SetConcurrencyLevel(numberOfFiles+2);
103 
104  // initialize data to write to files
105  for (int i = 0; i < MAXFILES; i++) {
106  for (int j = 0; j < PAGESIZE; j++){
107  WritePages[i][j] = (64+i+j)%256;
108  }
109  // memset(&WritePages[i][0], i+64, PAGESIZE);
110  }
111 
112  // Set file directory and name
113  // /T27/F27/NDBFS/S27Pnn.data
114  FileNameArray[0] = 27; // T27
115  FileNameArray[1] = 27; // F27
116  FileNameArray[2] = 27; // S27
117  FileNameArray[3] = FVERSION; // Version
118 
119  for (int l = 0; l < numberOfIterations; l++)
120  {
121 
122  ndbout << "Opening files" << endl;
123  // Open files
124  for (int f = 0; f < numberOfFiles; f++)
125  {
126  openFile(f);
127 
128  }
129 
130  // Wait for answer
131  openFileWait();
132 
133  ndbout << "Files opened!" << endl<< endl;
134 
135  // Write to files
136  ndbout << "Started writing" << endl;
137  TIMER_START;
138  s = 0;
139  numReq = 0;
140  numOps = 0;
141  while ( s < fileSize)
142  {
143  for (int r = 0; r < numberOfRequests; r++)
144  {
145  for (int f = 0; f < numberOfFiles; f++)
146  {
147  writeFile(f, s);
148  numReq++;
149  numOps++;
150  }
151 
152  s++;
153  }
154 
155  while (numReq > 0)
156  {
157  writeFileWait();
158  numReq--;
159  }
160 
161  }
162 
163  TIMER_PRINT("writes", numOps);
164 
165 
166  ndbout << "Started reading" << endl;
167  TIMER_START;
168 
169  // Read from files
170  s = 0;
171  numReq = 0;
172  numOps = 0;
173  while ( s < fileSize)
174  {
175  for (int r = 0; r < numberOfRequests; r++)
176  {
177  for (int f = 0; f < numberOfFiles; f++)
178  {
179  readFile(f, s);
180  numReq++;
181  numOps++;
182  }
183 
184  s++;
185 
186  }
187 
188  while (numReq > 0)
189  {
190  readFileWait();
191  numReq--;
192  }
193 
194  }
195  TIMER_PRINT("reads", numOps);
196 
197  ndbout << "Started writing with sync" << endl;
198  TIMER_START;
199 
200  // Write to files
201  s = 0;
202  numReq = 0;
203  numOps = 0;
204  while ( s < fileSize)
205  {
206  for (int r = 0; r < numberOfRequests; r++)
207  {
208  for (int f = 0; f < numberOfFiles; f++)
209  {
210  writeSyncFile(f, s);
211  numReq++;
212  numOps++;
213  }
214 
215  s++;
216  }
217 
218  while (numReq > 0)
219  {
220  writeSyncFileWait();
221  numReq--;
222  }
223 
224  }
225 
226  TIMER_PRINT("writeSync", numOps);
227 
228  // Close files
229  ndbout << "Closing files" << endl;
230  for (int f = 0; f < numberOfFiles; f++)
231  {
232  closeFile(f);
233 
234  }
235 
236  // Wait for answer
237  closeFileWait();
238 
239  ndbout << "Files closed!" << endl<< endl;
240  }
241 
242  // Deallocate memory
243  delete files;
244  delete theReportChannel;
245  delete theRequestPool;
246 
247  return 0;
248 
249 }
250 
251 
252 
253 int forward( AsyncFile * file, Request* request )
254 {
255  file->execute(request);
256  ERROR_CHECK 0;
257  return 1;
258 }
259 
260 int openFile( int fileNum)
261 {
262  AsyncFile* file = (AsyncFile *)files->get();
263 
264  FileNameArray[3] = fileNum | FVERSION;
265  file->fileName().set( NDBFS_REF, &FileNameArray[0] );
266  ndbout << "openFile: " << file->fileName().c_str() << endl;
267 
268  if( ERROR_STATE ) {
269  ERROR_RESET;
270  files->put( file );
271  ndbout << "Failed to set filename" << endl;
272  return 1;
273  }
274  file->reportTo(theReportChannel);
275 
276  Request* request = theRequestPool->get();
277  request->action= Request::open;
278  request->error= 0;
279  request->par.open.flags = 0x302; //O_RDWR | O_CREAT | O_TRUNC ; // 770
280  request->set(NDBFS_REF, 0x23456789, fileNum );
281  request->file = file;
282 
283  if (!forward(file,request)) {
284  // Something went wrong
285  ndbout << "Could not forward open request" << endl;
286  theRequestPool->put(request);
287  return 1;
288  }
289  return 0;
290 }
291 
292 int closeFile( int fileNum)
293 {
294 
295  AsyncFile* file = openFiles[fileNum];
296 
297  Request* request = theRequestPool->get();
298  if (removeFiles == 1)
299  request->action = Request::closeRemove;
300  else
301  request->action= Request::close;
302 
303  request->error= 0;
304  request->set(NDBFS_REF, 0x23456789, fileNum );
305  request->file = file;
306 
307  if (!forward(file,request)) {
308  // Something went wrong
309  ndbout << "Could not forward close request" << endl;
310  theRequestPool->put(request);
311  return 1;
312  }
313  return 0;
314 }
315 
316 int writeFile( int fileNum, int pagenum)
317 {
318  AsyncFile* file = openFiles[fileNum];
319 #ifdef TESTDEBUG
320  ndbout << "writeFile" << fileNum <<": "<<pagenum<<", " << file->fileName().c_str()<< endl;
321 #endif
322  Request *request = theRequestPool->get();
323  request->action = Request::write;
324  request->error = 0;
325  request->set(NDBFS_REF, pagenum, fileNum);
326  request->file = openFiles[fileNum];
327 
328  // Write only one page, choose the correct page for each file using fileNum
329  request->par.readWrite.pages[0].buf = &WritePages[fileNum][0];
330  request->par.readWrite.pages[0].size = PAGESIZE;
331  if (writeFilesReverse == 1)
332  {
333  // write the last page in the files first
334  // This is a normal way for the Blocks in Ndb to write to a file
335  request->par.readWrite.pages[0].offset = (fileSize - pagenum - 1) * PAGESIZE;
336  }
337  else
338  {
339  request->par.readWrite.pages[0].offset = pagenum * PAGESIZE;
340  }
341  request->par.readWrite.numberOfPages = 1;
342 
343  if (!forward(file,request)) {
344  // Something went wrong
345  ndbout << "Could not forward write request" << endl;
346  theRequestPool->put(request);
347  return 1;
348  }
349  return 0;
350 
351 }
352 
353 int writeSyncFile( int fileNum, int pagenum)
354 {
355  AsyncFile* file = openFiles[fileNum];
356 #ifdef TESTDEBUG
357  ndbout << "writeFile" << fileNum <<": "<<pagenum<<", " << file->fileName().c_str() << endl;
358 #endif
359  Request *request = theRequestPool->get();
360  request->action = Request::writeSync;
361  request->error = 0;
362  request->set(NDBFS_REF, pagenum, fileNum);
363  request->file = openFiles[fileNum];
364 
365  // Write only one page, choose the correct page for each file using fileNum
366  request->par.readWrite.pages[0].buf = &WritePages[fileNum][0];
367  request->par.readWrite.pages[0].size = PAGESIZE;
368  request->par.readWrite.pages[0].offset = pagenum * PAGESIZE;
369  request->par.readWrite.numberOfPages = 1;
370 
371  if (!forward(file,request)) {
372  // Something went wrong
373  ndbout << "Could not forward write request" << endl;
374  theRequestPool->put(request);
375  return 1;
376  }
377  return 0;
378 
379 }
380 
381 int readFile( int fileNum, int pagenum)
382 {
383  AsyncFile* file = openFiles[fileNum];
384 #ifdef TESTDEBUG
385  ndbout << "readFile" << fileNum <<": "<<pagenum<<", " << file->fileName().c_str() << endl;
386 #endif
387  Request *request = theRequestPool->get();
388  request->action = Request::read;
389  request->error = 0;
390  request->set(NDBFS_REF, pagenum, fileNum);
391  request->file = openFiles[fileNum];
392 
393  // Read only one page, choose the correct page for each file using fileNum
394  request->par.readWrite.pages[0].buf = &ReadPages[fileNum][0];
395  request->par.readWrite.pages[0].size = PAGESIZE;
396  request->par.readWrite.pages[0].offset = pagenum * PAGESIZE;
397  request->par.readWrite.numberOfPages = 1;
398 
399  if (!forward(file,request)) {
400  // Something went wrong
401  ndbout << "Could not forward read request" << endl;
402  theRequestPool->put(request);
403  return 1;
404  }
405  return 0;
406 
407 }
408 
409 int openFileWait()
410 {
411  int openedFiles = 0;
412  while (openedFiles < numberOfFiles)
413  {
414  Request* request = theReportChannel->readChannel();
415  if (request)
416  {
417  if (request->action == Request::open)
418  {
419  if (request->error ==0)
420  {
421 #ifdef TESTDEBUG
422  ndbout << "Opened file " << request->file->fileName().c_str() << endl;
423 #endif
424  openFiles[request->theFilePointer] = request->file;
425  }
426  else
427  {
428  ndbout << "error while opening file" << endl;
429  exit(1);
430  }
431  theRequestPool->put(request);
432  openedFiles++;
433  }
434  else
435  {
436  ndbout << "Unexpected request received" << endl;
437  }
438  }
439  else
440  {
441  ndbout << "Nothing read from theReportChannel" << endl;
442  }
443  }
444  return 0;
445 }
446 
447 int closeFileWait()
448 {
449  int closedFiles = 0;
450  while (closedFiles < numberOfFiles)
451  {
452  Request* request = theReportChannel->readChannel();
453  if (request)
454  {
455  if (request->action == Request::close || request->action == Request::closeRemove)
456  {
457  if (request->error ==0)
458  {
459 #ifdef TESTDEBUG
460  ndbout << "Closed file " << request->file->fileName().c_str() << endl;
461 #endif
462  openFiles[request->theFilePointer] = NULL;
463  files->put(request->file);
464  }
465  else
466  {
467  ndbout << "error while closing file" << endl;
468  exit(1);
469  }
470  theRequestPool->put(request);
471  closedFiles++;
472  }
473  else
474  {
475  ndbout << "Unexpected request received" << endl;
476  }
477  }
478  else
479  {
480  ndbout << "Nothing read from theReportChannel" << endl;
481  }
482  }
483  return 0;
484 }
485 
486 int writeFileWait()
487 {
488  Request* request = theReportChannel->readChannel();
489  if (request)
490  {
491  if (request->action == Request::write)
492  {
493  if (request->error == 0)
494  {
495 #ifdef TESTDEBUG
496  ndbout << "writeFileWait"<<request->theFilePointer<<", " << request->theUserPointer<<" "<< request->file->fileName().c_str() << endl;
497 #endif
498 
499  }
500  else
501  {
502  ndbout << "error while writing file, error=" << request->error << endl;
503  exit(1);
504  }
505  theRequestPool->put(request);
506  }
507  else
508  {
509  ndbout << "Unexpected request received" << endl;
510  }
511  }
512  else
513  {
514  ndbout << "Nothing read from theReportChannel" << endl;
515  }
516  return 0;
517 }
518 
519 int writeSyncFileWait()
520 {
521  Request* request = theReportChannel->readChannel();
522  if (request)
523  {
524  if (request->action == Request::writeSync)
525  {
526  if (request->error == 0)
527  {
528 #ifdef TESTDEBUG
529  ndbout << "writeFileWait"<<request->theFilePointer<<", " << request->theUserPointer<<" "<< request->file->fileName().c_str() << endl;
530 #endif
531 
532  }
533  else
534  {
535  ndbout << "error while writing file" << endl;
536  exit(1);
537  }
538  theRequestPool->put(request);
539  }
540  else
541  {
542  ndbout << "Unexpected request received" << endl;
543  }
544  }
545  else
546  {
547  ndbout << "Nothing read from theReportChannel" << endl;
548  }
549  return 0;
550 }
551 
552 int readFileWait()
553 {
554  Request* request = theReportChannel->readChannel();
555  if (request)
556  {
557  if (request->action == Request::read)
558  {
559  if (request->error == 0)
560  {
561 #ifdef TESTDEBUG
562  ndbout << "readFileWait"<<request->theFilePointer<<", " << request->theUserPointer<<" "<< request->file->fileName().c_str() << endl;
563 #endif
564  if (memcmp(&(ReadPages[request->theFilePointer][0]), &(WritePages[request->theFilePointer][0]), PAGESIZE)!=0)
565  {
566  ndbout <<"Verification error!" << endl;
567  for (int i = 0; i < PAGESIZE; i++ ){
568  ndbout <<" Compare Page " << i << " : " << ReadPages[request->theFilePointer][i] <<", " <<WritePages[request->theFilePointer][i] << endl;;
569  if( ReadPages[request->theFilePointer][i] !=WritePages[request->theFilePointer][i])
570 
571  exit(1);
572  }
573  }
574 
575  }
576  else
577  {
578  ndbout << "error while reading file" << endl;
579  exit(1);
580  }
581  theRequestPool->put(request);
582  }
583  else
584  {
585  ndbout << "Unexpected request received" << endl;
586  }
587  }
588  else
589  {
590  ndbout << "Nothing read from theReportChannel" << endl;
591  }
592  return 0;
593 }
594 
595 int readArguments(int argc, const char** argv)
596 {
597 
598  int i = 1;
599  while (argc > 1)
600  {
601  if (strcmp(argv[i], "-n") == 0)
602  {
603  numberOfFiles = atoi(argv[i+1]);
604  if ((numberOfFiles < 1) || (numberOfFiles > MAXFILES))
605  {
606  ndbout << "Wrong number of files, default = "<<DEFAULT_NUM_FILES << endl;
607  numberOfFiles = DEFAULT_NUM_FILES;
608  }
609  }
610  else if (strcmp(argv[i], "-r") == 0)
611  {
612  numberOfRequests = atoi(argv[i+1]);
613  if ((numberOfRequests < 1) || (numberOfRequests > MAXREQUESTS))
614  {
615  ndbout << "Wrong number of requests, default = "<<DEFAULT_NUM_REQUESTS << endl;
616  numberOfRequests = DEFAULT_NUM_REQUESTS;
617  }
618  }
619  else if (strcmp(argv[i], "-s") == 0)
620  {
621  fileSize = atoi(argv[i+1]);
622  if ((fileSize < 1) || (fileSize > MAXFILESIZE))
623  {
624  ndbout << "Wrong number of 8k pages, default = "<<DEFAULT_FILESIZE << endl;
625  fileSize = DEFAULT_FILESIZE;
626  }
627  }
628  else if (strcmp(argv[i], "-l") == 0)
629  {
630  numberOfIterations = atoi(argv[i+1]);
631  if ((numberOfIterations < 1))
632  {
633  ndbout << "Wrong number of iterations, default = 1" << endl;
634  numberOfIterations = 1;
635  }
636  }
637  else if (strcmp(argv[i], "-remove") == 0)
638  {
639  removeFiles = 1;
640  argc++;
641  i--;
642  }
643  else if (strcmp(argv[i], "-reverse") == 0)
644  {
645  ndbout << "Writing files reversed" << endl;
646  writeFilesReverse = 1;
647  argc++;
648  i--;
649  }
650 
651  argc -= 2;
652  i = i + 2;
653  }
654 
655  if ((fileSize % numberOfRequests)!= 0)
656  {
657  numberOfRequests = numberOfRequests - (fileSize % numberOfRequests);
658  ndbout <<"numberOfRequest must be modulo of filesize" << endl;
659  ndbout << "New numberOfRequest="<<numberOfRequests<<endl;
660  }
661  return 0;
662 }
663 
664 
665 // Needed for linking...
666 
667 void ErrorReporter::handleError(ErrorCategory type, int messageID,
668  const char* problemData, const char* objRef, NdbShutdownType stype)
669 {
670 
671  ndbout << "ErrorReporter::handleError activated" << endl;
672  ndbout << "type= " << type << endl;
673  ndbout << "messageID= " << messageID << endl;
674  ndbout << "problemData= " << problemData << endl;
675  ndbout << "objRef= " << objRef << endl;
676 
677  exit(1);
678 }
679 
680 void ErrorReporter::handleAssert(const char* message, const char* file, int line)
681 {
682  ndbout << "ErrorReporter::handleAssert activated" << endl;
683  ndbout << "message= " << message << endl;
684  ndbout << "file= " << file << endl;
685  ndbout << "line= " << line << endl;
686  exit(1);
687 }
688 
689 
690 GlobalData globalData;
691 
692 
693 Signal::Signal()
694 {
695 
696 }
697