MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
mf_iocache.c
1 /* Copyright (c) 2000, 2011, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15 
16 /*
17  Caching of files that only do (sequential) reads or writes of fixed-
18  length records. A read isn't allowed to go over file-length. A read is ok
19  if it ends at file-length and next read can try to read after file-length
20  (and get a EOF-error).
21  Possible use of asynchronous io.
22  macros for read and writes for faster io.
23  Used instead of FILE when reading or writing whole files.
24  This code makes mf_rec_cache obsolete (currently only used by ISAM)
25  One can change info->pos_in_file to a higher value to skip bytes in file if
26  also info->read_pos is set to info->read_end.
27  If called through open_cached_file(), then the temporary file will
28  only be created if a write exceeds the file buffer or if one calls
29  my_b_flush_io_cache().
30 
31  If one uses SEQ_READ_APPEND, then two buffers are allocated, one for
32  reading and another for writing. Reads are first done from disk and
33  then done from the write buffer. This is an efficient way to read
34  from a log file when one is writing to it at the same time.
35  For this to work, the file has to be opened in append mode!
36  Note that when one uses SEQ_READ_APPEND, one MUST write using
37  my_b_append ! This is needed because we need to lock the mutex
38  every time we access the write buffer.
39 
40 TODO:
41  When one uses SEQ_READ_APPEND and we are reading and writing at the same time,
42  each time the write buffer gets full and it's written to disk, we will
43  always do a disk read to read a part of the buffer from disk to the
44  read buffer.
45  This should be fixed so that when we do a my_b_flush_io_cache() and
46  we have been reading the write buffer, we should transfer the rest of the
47  write buffer to the read buffer before we start to reuse it.
48 */
49 
50 #include "mysys_priv.h"
51 #include <m_string.h>
52 #ifdef HAVE_AIOWAIT
53 #include "mysys_err.h"
54 static void my_aiowait(my_aio_result *result);
55 #endif
56 #include <errno.h>
57 
/*
  Take / release the mutex that guards the append (write) buffer of an
  IO_CACHE.
*/
#define lock_append_buffer(info) \
  mysql_mutex_lock(&(info)->append_buffer_lock)
#define unlock_append_buffer(info) \
  mysql_mutex_unlock(&(info)->append_buffer_lock)

/*
  Round X down / up to a multiple of IO_SIZE.
  The bit masking assumes that IO_SIZE is a power of two.
*/
#define IO_ROUND_DN(X) ((X) & ~(IO_SIZE-1))
#define IO_ROUND_UP(X) IO_ROUND_DN((X)+IO_SIZE-1)
65 
66 /*
67  Setup internal pointers inside IO_CACHE
68 
69  SYNOPSIS
70  setup_io_cache()
71  info IO_CACHE handler
72 
73  NOTES
74  This is called on automaticly on init or reinit of IO_CACHE
75  It must be called externally if one moves or copies an IO_CACHE
76  object.
77 */
78 
79 void setup_io_cache(IO_CACHE* info)
80 {
81  /* Ensure that my_b_tell() and my_b_bytes_in_cache works */
82  if (info->type == WRITE_CACHE)
83  {
84  info->current_pos= &info->write_pos;
85  info->current_end= &info->write_end;
86  }
87  else
88  {
89  info->current_pos= &info->read_pos;
90  info->current_end= &info->read_end;
91  }
92 }
93 
94 
95 static void
96 init_functions(IO_CACHE* info)
97 {
98  enum cache_type type= info->type;
99  switch (type) {
100  case READ_NET:
101  /*
102  Must be initialized by the caller. The problem is that
103  _my_b_net_read has to be defined in sql directory because of
104  the dependency on THD, and therefore cannot be visible to
105  programs that link against mysys but know nothing about THD, such
106  as myisamchk
107  */
108  break;
109  case SEQ_READ_APPEND:
110  info->read_function = _my_b_seq_read;
111  info->write_function = 0; /* Force a core if used */
112  break;
113  default:
114  info->read_function = info->share ? _my_b_read_r : _my_b_read;
115  info->write_function = _my_b_write;
116  }
117 
118  setup_io_cache(info);
119 }
120 
121 
122 /*
123  Initialize an IO_CACHE object
124 
125  SYNOPSOS
126  init_io_cache()
127  info cache handler to initialize
128  file File that should be associated to to the handler
129  If == -1 then real_open_cached_file()
130  will be called when it's time to open file.
131  cachesize Size of buffer to allocate for read/write
132  If == 0 then use my_default_record_cache_size
133  type Type of cache
134  seek_offset Where cache should start reading/writing
135  use_async_io Set to 1 of we should use async_io (if avaiable)
136  cache_myflags Bitmap of differnt flags
137  MY_WME | MY_FAE | MY_NABP | MY_FNABP |
138  MY_DONT_CHECK_FILESIZE
139 
140  RETURN
141  0 ok
142  # error
143 */
144 
145 int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
146  enum cache_type type, my_off_t seek_offset,
147  pbool use_async_io, myf cache_myflags)
148 {
149  size_t min_cache;
150  my_off_t pos;
151  my_off_t end_of_file= ~(my_off_t) 0;
152  DBUG_ENTER("init_io_cache");
153  DBUG_PRINT("enter",("cache: 0x%lx type: %d pos: %ld",
154  (ulong) info, (int) type, (ulong) seek_offset));
155 
156  info->file= file;
157  info->type= TYPE_NOT_SET; /* Don't set it until mutex are created */
158  info->pos_in_file= seek_offset;
159  info->pre_close = info->pre_read = info->post_read = 0;
160  info->arg = 0;
161  info->alloced_buffer = 0;
162  info->buffer=0;
163  info->seek_not_done= 0;
164 
165  if (file >= 0)
166  {
167  pos= mysql_file_tell(file, MYF(0));
168  if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
169  {
170  /*
171  This kind of object doesn't support seek() or tell(). Don't set a
172  flag that will make us again try to seek() later and fail.
173  */
174  info->seek_not_done= 0;
175  /*
176  Additionally, if we're supposed to start somewhere other than the
177  the beginning of whatever this file is, then somebody made a bad
178  assumption.
179  */
180  DBUG_ASSERT(seek_offset == 0);
181  }
182  else
183  info->seek_not_done= test(seek_offset != pos);
184  }
185 
186  info->disk_writes= 0;
187  info->share=0;
188 
189  if (!cachesize && !(cachesize= my_default_record_cache_size))
190  DBUG_RETURN(1); /* No cache requested */
191  min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
192  if (type == READ_CACHE || type == SEQ_READ_APPEND)
193  { /* Assume file isn't growing */
194  if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
195  {
196  /* Calculate end of file to avoid allocating oversized buffers */
197  end_of_file= mysql_file_seek(file, 0L, MY_SEEK_END, MYF(0));
198  /* Need to reset seek_not_done now that we just did a seek. */
199  info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
200  if (end_of_file < seek_offset)
201  end_of_file=seek_offset;
202  /* Trim cache size if the file is very small */
203  if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
204  {
205  cachesize= (size_t) (end_of_file-seek_offset)+IO_SIZE*2-1;
206  use_async_io=0; /* No need to use async */
207  }
208  }
209  }
210  cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
211  if (type != READ_NET && type != WRITE_NET)
212  {
213  /* Retry allocating memory in smaller blocks until we get one */
214  cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
215  for (;;)
216  {
217  size_t buffer_block;
218  /*
219  Unset MY_WAIT_IF_FULL bit if it is set, to prevent conflict with
220  MY_ZEROFILL.
221  */
222  myf flags= (myf) (cache_myflags & ~(MY_WME | MY_WAIT_IF_FULL));
223 
224  if (cachesize < min_cache)
225  cachesize = min_cache;
226  buffer_block= cachesize;
227  if (type == SEQ_READ_APPEND)
228  buffer_block *= 2;
229  if (cachesize == min_cache)
230  flags|= (myf) MY_WME;
231 
232  if ((info->buffer= (uchar*) my_malloc(buffer_block, flags)) != 0)
233  {
234  info->write_buffer=info->buffer;
235  if (type == SEQ_READ_APPEND)
236  info->write_buffer = info->buffer + cachesize;
237  info->alloced_buffer=1;
238  break; /* Enough memory found */
239  }
240  if (cachesize == min_cache)
241  DBUG_RETURN(2); /* Can't alloc cache */
242  /* Try with less memory */
243  cachesize= (cachesize*3/4 & ~(min_cache-1));
244  }
245  }
246 
247  DBUG_PRINT("info",("init_io_cache: cachesize = %lu", (ulong) cachesize));
248  info->read_length=info->buffer_length=cachesize;
249  info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
250  info->request_pos= info->read_pos= info->write_pos = info->buffer;
251  if (type == SEQ_READ_APPEND)
252  {
253  info->append_read_pos = info->write_pos = info->write_buffer;
254  info->write_end = info->write_buffer + info->buffer_length;
255  mysql_mutex_init(key_IO_CACHE_append_buffer_lock,
256  &info->append_buffer_lock, MY_MUTEX_INIT_FAST);
257  }
258 #if defined(SAFE_MUTEX)
259  else
260  {
261  /* Clear mutex so that safe_mutex will notice that it's not initialized */
262  memset(&info->append_buffer_lock, 0, sizeof(info->append_buffer_lock));
263  }
264 #endif
265 
266  if (type == WRITE_CACHE)
267  info->write_end=
268  info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
269  else
270  info->read_end=info->buffer; /* Nothing in cache */
271 
272  /* End_of_file may be changed by user later */
273  info->end_of_file= end_of_file;
274  info->error=0;
275  info->type= type;
276  init_functions(info);
277 #ifdef HAVE_AIOWAIT
278  if (use_async_io && ! my_disable_async_io)
279  {
280  DBUG_PRINT("info",("Using async io"));
281  info->read_length/=2;
282  info->read_function=_my_b_async_read;
283  }
284  info->inited=info->aio_result.pending=0;
285 #endif
286  DBUG_RETURN(0);
287 } /* init_io_cache */
288 
/*
  Wait until the given asynchronous request is ready

  SYNOPSIS
    my_aiowait()
    result              Request to wait for; a no-op if it is not pending

  NOTES
    Every request returned by aiowait() is marked as no longer pending
    until the one asked for shows up.  An interrupted wait (EINTR) is
    retried.  On any other aiowait() failure the request is assumed to
    have completed.

    aiowait() reports failure by returning -1 converted to a pointer.
    The comparison is done on the pointer value itself; casting the
    pointer to int first would drop the upper bits on platforms where
    pointers are wider than int.
*/

#ifdef HAVE_AIOWAIT
static void my_aiowait(my_aio_result *result)
{
  if (result->pending)
  {
    struct aio_result_t *tmp;
    for (;;)
    {
      tmp= aiowait((struct timeval *) 0);
      if (tmp == (struct aio_result_t *) -1)
      {
        if (errno == EINTR)
          continue;
        DBUG_PRINT("error",("No aio request, error: %d",errno));
        result->pending=0;                      /* Assume everythings is ok */
        break;
      }
      ((my_aio_result*) tmp)->pending=0;
      if ((my_aio_result*) tmp == result)
        break;
    }
  }
}
#endif
315 
316 
317 /*
318  Use this to reset cache to re-start reading or to change the type
319  between READ_CACHE <-> WRITE_CACHE
320  If we are doing a reinit of a cache where we have the start of the file
321  in the cache, we are reusing this memory without flushing it to disk.
322 */
323 
324 my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
325  my_off_t seek_offset,
326  pbool use_async_io __attribute__((unused)),
327  pbool clear_cache)
328 {
329  DBUG_ENTER("reinit_io_cache");
330  DBUG_PRINT("enter",("cache: 0x%lx type: %d seek_offset: %lu clear_cache: %d",
331  (ulong) info, type, (ulong) seek_offset,
332  (int) clear_cache));
333 
334  /* One can't do reinit with the following types */
335  DBUG_ASSERT(type != READ_NET && info->type != READ_NET &&
336  type != WRITE_NET && info->type != WRITE_NET &&
337  type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
338 
339  /* If the whole file is in memory, avoid flushing to disk */
340  if (! clear_cache &&
341  seek_offset >= info->pos_in_file &&
342  seek_offset <= my_b_tell(info))
343  {
344  /* Reuse current buffer without flushing it to disk */
345  uchar *pos;
346  if (info->type == WRITE_CACHE && type == READ_CACHE)
347  {
348  info->read_end=info->write_pos;
349  info->end_of_file=my_b_tell(info);
350  /*
351  Trigger a new seek only if we have a valid
352  file handle.
353  */
354  info->seek_not_done= (info->file != -1);
355  }
356  else if (type == WRITE_CACHE)
357  {
358  if (info->type == READ_CACHE)
359  {
360  info->write_end=info->write_buffer+info->buffer_length;
361  info->seek_not_done=1;
362  }
363  info->end_of_file = ~(my_off_t) 0;
364  }
365  pos=info->request_pos+(seek_offset-info->pos_in_file);
366  if (type == WRITE_CACHE)
367  info->write_pos=pos;
368  else
369  info->read_pos= pos;
370 #ifdef HAVE_AIOWAIT
371  my_aiowait(&info->aio_result); /* Wait for outstanding req */
372 #endif
373  }
374  else
375  {
376  /*
377  If we change from WRITE_CACHE to READ_CACHE, assume that everything
378  after the current positions should be ignored
379  */
380  if (info->type == WRITE_CACHE && type == READ_CACHE)
381  info->end_of_file=my_b_tell(info);
382  /* flush cache if we want to reuse it */
383  if (!clear_cache && my_b_flush_io_cache(info,1))
384  DBUG_RETURN(1);
385  info->pos_in_file=seek_offset;
386  /* Better to do always do a seek */
387  info->seek_not_done=1;
388  info->request_pos=info->read_pos=info->write_pos=info->buffer;
389  if (type == READ_CACHE)
390  {
391  info->read_end=info->buffer; /* Nothing in cache */
392  }
393  else
394  {
395  info->write_end=(info->buffer + info->buffer_length -
396  (seek_offset & (IO_SIZE-1)));
397  info->end_of_file= ~(my_off_t) 0;
398  }
399  }
400  info->type=type;
401  info->error=0;
402  init_functions(info);
403 
404 #ifdef HAVE_AIOWAIT
405  if (use_async_io && ! my_disable_async_io &&
406  ((ulong) info->buffer_length <
407  (ulong) (info->end_of_file - seek_offset)))
408  {
409  info->read_length=info->buffer_length/2;
410  info->read_function=_my_b_async_read;
411  }
412  info->inited=0;
413 #endif
414  DBUG_RETURN(0);
415 } /* reinit_io_cache */
416 
417 
418 
419 /*
420  Read buffered.
421 
422  SYNOPSIS
423  _my_b_read()
424  info IO_CACHE pointer
425  Buffer Buffer to retrieve count bytes from file
426  Count Number of bytes to read into Buffer
427 
428  NOTE
429  This function is only called from the my_b_read() macro when there
430  isn't enough characters in the buffer to satisfy the request.
431 
432  WARNING
433 
434  When changing this function, be careful with handling file offsets
435  (end-of_file, pos_in_file). Do not cast them to possibly smaller
436  types than my_off_t unless you can be sure that their value fits.
437  Same applies to differences of file offsets.
438 
439  When changing this function, check _my_b_read_r(). It might need the
440  same change.
441 
442  RETURN
443  0 we succeeded in reading all data
444  1 Error: couldn't read requested characters. In this case:
445  If info->error == -1, we got a read error.
446  Otherwise info->error contains the number of bytes in Buffer.
447 */
448 
449 int _my_b_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
450 {
451  size_t length,diff_length,left_length, max_length;
452  my_off_t pos_in_file;
453  DBUG_ENTER("_my_b_read");
454 
455  /* If the buffer is not empty yet, copy what is available. */
456  if ((left_length= (size_t) (info->read_end-info->read_pos)))
457  {
458  DBUG_ASSERT(Count >= left_length); /* User is not using my_b_read() */
459  memcpy(Buffer,info->read_pos, left_length);
460  Buffer+=left_length;
461  Count-=left_length;
462  }
463 
464  /* pos_in_file always point on where info->buffer was read */
465  pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
466 
467  /*
468  Whenever a function which operates on IO_CACHE flushes/writes
469  some part of the IO_CACHE to disk it will set the property
470  "seek_not_done" to indicate this to other functions operating
471  on the IO_CACHE.
472  */
473  if (info->seek_not_done)
474  {
475  if ((mysql_file_seek(info->file, pos_in_file, MY_SEEK_SET, MYF(0))
476  != MY_FILEPOS_ERROR))
477  {
478  /* No error, reset seek_not_done flag. */
479  info->seek_not_done= 0;
480  }
481  else
482  {
483  /*
484  If the seek failed and the error number is ESPIPE, it is because
485  info->file is a pipe or socket or FIFO. We never should have tried
486  to seek on that. See Bugs#25807 and #22828 for more info.
487  */
488  DBUG_ASSERT(my_errno != ESPIPE);
489  info->error= -1;
490  DBUG_RETURN(1);
491  }
492  }
493 
494  /*
495  Calculate, how much we are within a IO_SIZE block. Ideally this
496  should be zero.
497  */
498  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
499 
500  /*
501  If more than a block plus the rest of the current block is wanted,
502  we do read directly, without filling the buffer.
503  */
504  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
505  { /* Fill first intern buffer */
506  size_t read_length;
507  if (info->end_of_file <= pos_in_file)
508  {
509  /* End of file. Return, what we did copy from the buffer. */
510  info->error= (int) left_length;
511  DBUG_RETURN(1);
512  }
513  /*
514  Crop the wanted count to a multiple of IO_SIZE and subtract,
515  what we did already read from a block. That way, the read will
516  end aligned with a block.
517  */
518  length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
519  if ((read_length= mysql_file_read(info->file,Buffer, length, info->myflags))
520  != length)
521  {
522  /*
523  If we didn't get, what we wanted, we either return -1 for a read
524  error, or (it's end of file), how much we got in total.
525  */
526  info->error= (read_length == (size_t) -1 ? -1 :
527  (int) (read_length+left_length));
528  DBUG_RETURN(1);
529  }
530  Count-=length;
531  Buffer+=length;
532  pos_in_file+=length;
533  left_length+=length;
534  diff_length=0;
535  }
536 
537  /*
538  At this point, we want less than one and a partial block.
539  We will read a full cache, minus the number of bytes, we are
540  within a block already. So we will reach new alignment.
541  */
542  max_length= info->read_length-diff_length;
543  /* We will not read past end of file. */
544  if (info->type != READ_FIFO &&
545  max_length > (info->end_of_file - pos_in_file))
546  max_length= (size_t) (info->end_of_file - pos_in_file);
547  /*
548  If there is nothing left to read,
549  we either are done, or we failed to fulfill the request.
550  Otherwise, we read max_length into the cache.
551  */
552  if (!max_length)
553  {
554  if (Count)
555  {
556  /* We couldn't fulfil the request. Return, how much we got. */
557  info->error= left_length;
558  DBUG_RETURN(1);
559  }
560  length=0; /* Didn't read any chars */
561  }
562  else if ((length= mysql_file_read(info->file,info->buffer, max_length,
563  info->myflags)) < Count ||
564  length == (size_t) -1)
565  {
566  /*
567  We got an read error, or less than requested (end of file).
568  If not a read error, copy, what we got.
569  */
570  if (length != (size_t) -1)
571  memcpy(Buffer, info->buffer, length);
572  info->pos_in_file= pos_in_file;
573  /* For a read error, return -1, otherwise, what we got in total. */
574  info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
575  info->read_pos=info->read_end=info->buffer;
576  DBUG_RETURN(1);
577  }
578  /*
579  Count is the remaining number of bytes requested.
580  length is the amount of data in the cache.
581  Read Count bytes from the cache.
582  */
583  info->read_pos=info->buffer+Count;
584  info->read_end=info->buffer+length;
585  info->pos_in_file=pos_in_file;
586  memcpy(Buffer, info->buffer, Count);
587  DBUG_RETURN(0);
588 }
589 
590 
591 /*
592  Prepare IO_CACHE for shared use.
593 
594  SYNOPSIS
595  init_io_cache_share()
596  read_cache A read cache. This will be copied for
597  every thread after setup.
598  cshare The share.
599  write_cache If non-NULL a write cache that is to be
600  synchronized with the read caches.
601  num_threads Number of threads sharing the cache
602  including the write thread if any.
603 
604  DESCRIPTION
605 
606  The shared cache is used so: One IO_CACHE is initialized with
607  init_io_cache(). This includes the allocation of a buffer. Then a
608  share is allocated and init_io_cache_share() is called with the io
609  cache and the share. Then the io cache is copied for each thread. So
610  every thread has its own copy of IO_CACHE. But the allocated buffer
611  is shared because cache->buffer is the same for all caches.
612 
613  One thread reads data from the file into the buffer. All threads
614  read from the buffer, but every thread maintains its own set of
615  pointers into the buffer. When all threads have used up the buffer
616  contents, one of the threads reads the next block of data into the
617  buffer. To accomplish this, each thread enters the cache lock before
618  accessing the buffer. They wait in lock_io_cache() until all threads
619  joined the lock. The last thread entering the lock is in charge of
620  reading from file to buffer. It wakes all threads when done.
621 
622  Synchronizing a write cache to the read caches works so: Whenever
623  the write buffer needs a flush, the write thread enters the lock and
624  waits for all other threads to enter the lock too. They do this when
625  they have used up the read buffer. When all threads are in the lock,
626  the write thread copies the write buffer to the read buffer and
627  wakes all threads.
628 
629  share->running_threads is the number of threads not being in the
630  cache lock. When entering lock_io_cache() the number is decreased.
631  When the thread that fills the buffer enters unlock_io_cache() the
632  number is reset to the number of threads. The condition
633  running_threads == 0 means that all threads are in the lock. Bumping
634  up the number to the full count is non-intuitive. But increasing the
635  number by one for each thread that leaves the lock could lead to a
636  solo run of one thread. The last thread to join a lock reads from
637  file to buffer, wakes the other threads, processes the data in the
638  cache and enters the lock again. If no other thread left the lock
639  meanwhile, it would think it's the last one again and read the next
640  block...
641 
642  The share has copies of 'error', 'buffer', 'read_end', and
643  'pos_in_file' from the thread that filled the buffer. We may not be
644  able to access this information directly from its cache because the
645  thread may be removed from the share before the variables could be
646  copied by all other threads. Or, if a write buffer is synchronized,
647  it would change its 'pos_in_file' after waking the other threads,
648  possibly before they could copy its value.
649 
650  However, the 'buffer' variable in the share is for a synchronized
651  write cache. It needs to know where to put the data. Otherwise it
652  would need access to the read cache of one of the threads that is
653  not yet removed from the share.
654 
655  RETURN
656  void
657 */
658 
659 void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
660  IO_CACHE *write_cache, uint num_threads)
661 {
662  DBUG_ENTER("init_io_cache_share");
663  DBUG_PRINT("io_cache_share", ("read_cache: 0x%lx share: 0x%lx "
664  "write_cache: 0x%lx threads: %u",
665  (long) read_cache, (long) cshare,
666  (long) write_cache, num_threads));
667 
668  DBUG_ASSERT(num_threads > 1);
669  DBUG_ASSERT(read_cache->type == READ_CACHE);
670  DBUG_ASSERT(!write_cache || (write_cache->type == WRITE_CACHE));
671 
672  mysql_mutex_init(key_IO_CACHE_SHARE_mutex,
673  &cshare->mutex, MY_MUTEX_INIT_FAST);
674  mysql_cond_init(key_IO_CACHE_SHARE_cond, &cshare->cond, 0);
675  mysql_cond_init(key_IO_CACHE_SHARE_cond_writer, &cshare->cond_writer, 0);
676 
677  cshare->running_threads= num_threads;
678  cshare->total_threads= num_threads;
679  cshare->error= 0; /* Initialize. */
680  cshare->buffer= read_cache->buffer;
681  cshare->read_end= NULL; /* See function comment of lock_io_cache(). */
682  cshare->pos_in_file= 0; /* See function comment of lock_io_cache(). */
683  cshare->source_cache= write_cache; /* Can be NULL. */
684 
685  read_cache->share= cshare;
686  read_cache->read_function= _my_b_read_r;
687  read_cache->current_pos= NULL;
688  read_cache->current_end= NULL;
689 
690  if (write_cache)
691  write_cache->share= cshare;
692 
693  DBUG_VOID_RETURN;
694 }
695 
696 
697 /*
698  Remove a thread from shared access to IO_CACHE.
699 
700  SYNOPSIS
701  remove_io_thread()
702  cache The IO_CACHE to be removed from the share.
703 
704  NOTE
705 
706  Every thread must do that on exit for not to deadlock other threads.
707 
708  The last thread destroys the pthread resources.
709 
710  A writer flushes its cache first.
711 
712  RETURN
713  void
714 */
715 
716 void remove_io_thread(IO_CACHE *cache)
717 {
718  IO_CACHE_SHARE *cshare= cache->share;
719  uint total;
720  DBUG_ENTER("remove_io_thread");
721 
722  /* If the writer goes, it needs to flush the write cache. */
723  if (cache == cshare->source_cache)
724  flush_io_cache(cache);
725 
726  mysql_mutex_lock(&cshare->mutex);
727  DBUG_PRINT("io_cache_share", ("%s: 0x%lx",
728  (cache == cshare->source_cache) ?
729  "writer" : "reader", (long) cache));
730 
731  /* Remove from share. */
732  total= --cshare->total_threads;
733  DBUG_PRINT("io_cache_share", ("remaining threads: %u", total));
734 
735  /* Detach from share. */
736  cache->share= NULL;
737 
738  /* If the writer goes, let the readers know. */
739  if (cache == cshare->source_cache)
740  {
741  DBUG_PRINT("io_cache_share", ("writer leaves"));
742  cshare->source_cache= NULL;
743  }
744 
745  /* If all threads are waiting for me to join the lock, wake them. */
746  if (!--cshare->running_threads)
747  {
748  DBUG_PRINT("io_cache_share", ("the last running thread leaves, wake all"));
749  mysql_cond_signal(&cshare->cond_writer);
750  mysql_cond_broadcast(&cshare->cond);
751  }
752 
753  mysql_mutex_unlock(&cshare->mutex);
754 
755  if (!total)
756  {
757  DBUG_PRINT("io_cache_share", ("last thread removed, destroy share"));
758  mysql_cond_destroy (&cshare->cond_writer);
759  mysql_cond_destroy (&cshare->cond);
760  mysql_mutex_destroy(&cshare->mutex);
761  }
762 
763  DBUG_VOID_RETURN;
764 }
765 
766 
767 /*
768  Lock IO cache and wait for all other threads to join.
769 
770  SYNOPSIS
771  lock_io_cache()
772  cache The cache of the thread entering the lock.
773  pos File position of the block to read.
774  Unused for the write thread.
775 
776  DESCRIPTION
777 
778  Wait for all threads to finish with the current buffer. We want
779  all threads to proceed in concert. The last thread to join
780  lock_io_cache() will read the block from file and all threads start
781  to use it. Then they will join again for reading the next block.
782 
783  The waiting threads detect a fresh buffer by comparing
784  cshare->pos_in_file with the position they want to process next.
785  Since the first block may start at position 0, we take
786  cshare->read_end as an additional condition. This variable is
787  initialized to NULL and will be set after a block of data is written
788  to the buffer.
789 
790  RETURN
791  1 OK, lock in place, go ahead and read.
792  0 OK, unlocked, another thread did the read.
793 */
794 
795 static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
796 {
797  IO_CACHE_SHARE *cshare= cache->share;
798  DBUG_ENTER("lock_io_cache");
799 
800  /* Enter the lock. */
801  mysql_mutex_lock(&cshare->mutex);
802  cshare->running_threads--;
803  DBUG_PRINT("io_cache_share", ("%s: 0x%lx pos: %lu running: %u",
804  (cache == cshare->source_cache) ?
805  "writer" : "reader", (long) cache, (ulong) pos,
806  cshare->running_threads));
807 
808  if (cshare->source_cache)
809  {
810  /* A write cache is synchronized to the read caches. */
811 
812  if (cache == cshare->source_cache)
813  {
814  /* The writer waits until all readers are here. */
815  while (cshare->running_threads)
816  {
817  DBUG_PRINT("io_cache_share", ("writer waits in lock"));
818  mysql_cond_wait(&cshare->cond_writer, &cshare->mutex);
819  }
820  DBUG_PRINT("io_cache_share", ("writer awoke, going to copy"));
821 
822  /* Stay locked. Leave the lock later by unlock_io_cache(). */
823  DBUG_RETURN(1);
824  }
825 
826  /* The last thread wakes the writer. */
827  if (!cshare->running_threads)
828  {
829  DBUG_PRINT("io_cache_share", ("waking writer"));
830  mysql_cond_signal(&cshare->cond_writer);
831  }
832 
833  /*
834  Readers wait until the data is copied from the writer. Another
835  reason to stop waiting is the removal of the write thread. If this
836  happens, we leave the lock with old data in the buffer.
837  */
838  while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
839  cshare->source_cache)
840  {
841  DBUG_PRINT("io_cache_share", ("reader waits in lock"));
842  mysql_cond_wait(&cshare->cond, &cshare->mutex);
843  }
844 
845  /*
846  If the writer was removed from the share while this thread was
847  asleep, we need to simulate an EOF condition. The writer cannot
848  reset the share variables as they might still be in use by readers
849  of the last block. When we awake here then because the last
850  joining thread signalled us. If the writer is not the last, it
851  will not signal. So it is safe to clear the buffer here.
852  */
853  if (!cshare->read_end || (cshare->pos_in_file < pos))
854  {
855  DBUG_PRINT("io_cache_share", ("reader found writer removed. EOF"));
856  cshare->read_end= cshare->buffer; /* Empty buffer. */
857  cshare->error= 0; /* EOF is not an error. */
858  }
859  }
860  else
861  {
862  /*
863  There are read caches only. The last thread arriving in
864  lock_io_cache() continues with a locked cache and reads the block.
865  */
866  if (!cshare->running_threads)
867  {
868  DBUG_PRINT("io_cache_share", ("last thread joined, going to read"));
869  /* Stay locked. Leave the lock later by unlock_io_cache(). */
870  DBUG_RETURN(1);
871  }
872 
873  /*
874  All other threads wait until the requested block is read by the
875  last thread arriving. Another reason to stop waiting is the
876  removal of a thread. If this leads to all threads being in the
877  lock, we have to continue also. The first of the awaken threads
878  will then do the read.
879  */
880  while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
881  cshare->running_threads)
882  {
883  DBUG_PRINT("io_cache_share", ("reader waits in lock"));
884  mysql_cond_wait(&cshare->cond, &cshare->mutex);
885  }
886 
887  /* If the block is not yet read, continue with a locked cache and read. */
888  if (!cshare->read_end || (cshare->pos_in_file < pos))
889  {
890  DBUG_PRINT("io_cache_share", ("reader awoke, going to read"));
891  /* Stay locked. Leave the lock later by unlock_io_cache(). */
892  DBUG_RETURN(1);
893  }
894 
895  /* Another thread did read the block already. */
896  }
897  DBUG_PRINT("io_cache_share", ("reader awoke, going to process %u bytes",
898  (uint) (cshare->read_end ? (size_t)
899  (cshare->read_end - cshare->buffer) :
900  0)));
901 
902  /*
903  Leave the lock. Do not call unlock_io_cache() later. The thread that
904  filled the buffer did this and marked all threads as running.
905  */
906  mysql_mutex_unlock(&cshare->mutex);
907  DBUG_RETURN(0);
908 }
909 
910 
911 /*
912  Unlock IO cache.
913 
914  SYNOPSIS
915  unlock_io_cache()
916  cache The cache of the thread leaving the lock.
917 
918  NOTE
919  This is called by the thread that filled the buffer. It marks all
920  threads as running and awakes them. This must not be done by any
921  other thread.
922 
923  Do not signal cond_writer. Either there is no writer or the writer
924  is the only one who can call this function.
925 
926  The reason for resetting running_threads to total_threads before
927  waking all other threads is that it could be possible that this
928  thread is so fast with processing the buffer that it enters the lock
929  before even one other thread has left it. If every awoken thread
930  would increase running_threads by one, this thread could think that
931  he is again the last to join and would not wait for the other
932  threads to process the data.
933 
934  RETURN
935  void
936 */
937 
938 static void unlock_io_cache(IO_CACHE *cache)
939 {
940  IO_CACHE_SHARE *cshare= cache->share;
941  DBUG_ENTER("unlock_io_cache");
942  DBUG_PRINT("io_cache_share", ("%s: 0x%lx pos: %lu running: %u",
943  (cache == cshare->source_cache) ?
944  "writer" : "reader",
945  (long) cache, (ulong) cshare->pos_in_file,
946  cshare->total_threads));
947 
948  cshare->running_threads= cshare->total_threads;
949  mysql_cond_broadcast(&cshare->cond);
950  mysql_mutex_unlock(&cshare->mutex);
951  DBUG_VOID_RETURN;
952 }
953 
954 
955 /*
956  Read from IO_CACHE when it is shared between several threads.
957 
958  SYNOPSIS
959  _my_b_read_r()
960  cache IO_CACHE pointer
961  Buffer Buffer to retrieve count bytes from file
962  Count Number of bytes to read into Buffer
963 
964  NOTE
965  This function is only called from the my_b_read() macro when there
966  aren't enough characters in the buffer to satisfy the request.
967 
968  IMPLEMENTATION
969 
970  It works as follows: when a thread tries to read from a file (that
971  is, after using all the data from the (shared) buffer), it just
972  hangs on lock_io_cache(), waiting for other threads. When the very
973  last thread attempts a read, lock_io_cache() returns 1, the thread
974  does actual IO and unlock_io_cache(), which signals all the waiting
975  threads that data is in the buffer.
976 
977  WARNING
978 
979  When changing this function, be careful with handling file offsets
980  (end_of_file, pos_in_file). Do not cast them to possibly smaller
981  types than my_off_t unless you can be sure that their value fits.
982  Same applies to differences of file offsets. (Bug #11527)
983 
984  When changing this function, check _my_b_read(). It might need the
985  same change.
986 
987  RETURN
988  0 we succeeded in reading all data
989  1 Error: can't read requested characters
990 */
991 
992 int _my_b_read_r(register IO_CACHE *cache, uchar *Buffer, size_t Count)
993 {
994  my_off_t pos_in_file;
995  size_t length, diff_length, left_length;
996  IO_CACHE_SHARE *cshare= cache->share;
997  DBUG_ENTER("_my_b_read_r");
998 
      /*
        Step 1: hand out whatever this thread has not yet consumed from its
        current view of the buffer. left_length keeps counting the bytes
        delivered so far; it is what ends up in cache->error on a short read.
      */
999  if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
1000  {
1001  DBUG_ASSERT(Count >= left_length); /* User is not using my_b_read() */
1002  memcpy(Buffer, cache->read_pos, left_length);
1003  Buffer+= left_length;
1004  Count-= left_length;
1005  }
      /* Step 2: refill the buffer (or wait for a refill) until satisfied. */
1006  while (Count)
1007  {
1008  size_t cnt, len;
1009 
      /*
        File position that follows the data currently in the buffer, and its
        offset within an IO_SIZE block. The request size is then adjusted
        with IO_ROUND_UP/IO_ROUND_DN relative to cache->read_length
        (presumably to keep reads IO_SIZE aligned - the macros are defined
        outside this chunk).
      */
1010  pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
1011  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
1012  length=IO_ROUND_UP(Count+diff_length)-diff_length;
1013  length= ((length <= cache->read_length) ?
1014  length + IO_ROUND_DN(cache->read_length - length) :
1015  length - IO_ROUND_UP(length - cache->read_length));
      /* Except for READ_FIFO caches, never read beyond end_of_file. */
1016  if (cache->type != READ_FIFO &&
1017  (length > (cache->end_of_file - pos_in_file)))
1018  length= (size_t) (cache->end_of_file - pos_in_file);
      /* Nothing left to read: report the bytes delivered so far and fail. */
1019  if (length == 0)
1020  {
1021  cache->error= (int) left_length;
1022  DBUG_RETURN(1);
1023  }
      /*
        lock_io_cache() returning non-zero means this thread must fill the
        buffer itself and call unlock_io_cache() afterwards; zero means the
        share already holds the block and the lock has been left.
      */
1024  if (lock_io_cache(cache, pos_in_file))
1025  {
1026  /* With a synchronized write/read cache we won't come here... */
1027  DBUG_ASSERT(!cshare->source_cache);
1028  /*
1029  ... unless the writer has gone before this thread entered the
1030  lock. Simulate EOF in this case. It can be distinguished by
1031  cache->file.
1032  */
1033  if (cache->file < 0)
1034  len= 0;
1035  else
1036  {
1037  /*
1038  Whenever a function which operates on IO_CACHE flushes/writes
1039  some part of the IO_CACHE to disk it will set the property
1040  "seek_not_done" to indicate this to other functions operating
1041  on the IO_CACHE.
1042  */
1043  if (cache->seek_not_done)
1044  {
1045  if (mysql_file_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
1046  == MY_FILEPOS_ERROR)
1047  {
      /*
        NOTE(review): on this path only cache->error is set; the share's
        error/read_end/pos_in_file are not updated before the other
        threads are woken - confirm this is intended.
      */
1048  cache->error= -1;
1049  unlock_io_cache(cache);
1050  DBUG_RETURN(1);
1051  }
1052  }
1053  len= mysql_file_read(cache->file, cache->buffer, length, cache->myflags);
1054  }
1055  DBUG_PRINT("io_cache_share", ("read %lu bytes", (ulong) len));
1056 
      /*
        A failed read (len == (size_t) -1) leaves an empty buffer. A short
        read is recorded in cache->error as the number of bytes obtained.
      */
1057  cache->read_end= cache->buffer + (len == (size_t) -1 ? 0 : len);
1058  cache->error= (len == length ? 0 : (int) len);
1059  cache->pos_in_file= pos_in_file;
1060 
1061  /* Copy important values to the share. */
1062  cshare->error= cache->error;
1063  cshare->read_end= cache->read_end;
1064  cshare->pos_in_file= pos_in_file;
1065 
1066  /* Mark all threads as running and wake them. */
1067  unlock_io_cache(cache);
1068  }
1069  else
1070  {
1071  /*
1072  With a synchronized write/read cache readers always come here.
1073  Copy important values from the share.
1074  */
1075  cache->error= cshare->error;
1076  cache->read_end= cshare->read_end;
1077  cache->pos_in_file= cshare->pos_in_file;
1078 
      /* Rebuild len from the share: -1 on error, else the bytes buffered. */
1079  len= ((cache->error == -1) ? (size_t) -1 :
1080  (size_t) (cache->read_end - cache->buffer));
1081  }
1082  cache->read_pos= cache->buffer;
1083  cache->seek_not_done= 0;
      /* EOF or error: cache->error becomes the count of bytes delivered. */
1084  if (len == 0 || len == (size_t) -1)
1085  {
1086  DBUG_PRINT("io_cache_share", ("reader error. len %lu left %lu",
1087  (ulong) len, (ulong) left_length));
1088  cache->error= (int) left_length;
1089  DBUG_RETURN(1);
1090  }
      /* Deliver min(len, Count) bytes; loop again if the caller wants more. */
1091  cnt= (len > Count) ? Count : len;
1092  memcpy(Buffer, cache->read_pos, cnt);
1093  Count -= cnt;
1094  Buffer+= cnt;
1095  left_length+= cnt;
1096  cache->read_pos+= cnt;
1097  }
1098  DBUG_RETURN(0);
1099 }
1100 
1101 
1102 /*
1103  Copy data from write cache to read cache.
1104 
1105  SYNOPSIS
1106  copy_to_read_buffer()
1107  write_cache The write cache.
1108  write_buffer The source of data, mostly the cache buffer.
1109  write_length The number of bytes to copy.
1110 
1111  NOTE
1112  The write thread will wait for all read threads to join the cache
1113  lock. Then it copies the data over and wakes the read threads.
1114 
1115  RETURN
1116  void
1117 */
1118 
1119 static void copy_to_read_buffer(IO_CACHE *write_cache,
1120  const uchar *write_buffer, size_t write_length)
1121 {
1122  IO_CACHE_SHARE *cshare= write_cache->share;
1123 
1124  DBUG_ASSERT(cshare->source_cache == write_cache);
1125  /*
1126  write_length is usually less or equal to buffer_length.
1127  It can be bigger if _my_b_write() is called with a big length.
1128  */
1129  while (write_length)
1130  {
1131  size_t copy_length= MY_MIN(write_length, write_cache->buffer_length);
1132  int __attribute__((unused)) rc;
1133 
1134  rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1135  /* The writing thread does always have the lock when it awakes. */
1136  DBUG_ASSERT(rc);
1137 
1138  memcpy(cshare->buffer, write_buffer, copy_length);
1139 
1140  cshare->error= 0;
1141  cshare->read_end= cshare->buffer + copy_length;
1142  cshare->pos_in_file= write_cache->pos_in_file;
1143 
1144  /* Mark all threads as running and wake them. */
1145  unlock_io_cache(write_cache);
1146 
1147  write_buffer+= copy_length;
1148  write_length-= copy_length;
1149  }
1150 }
1151 
1152 
1153 /*
1154  Do sequential read from the SEQ_READ_APPEND cache.
1155 
1156  We do this in three stages:
1157  - first read from info->buffer
1158  - then if there are still data to read, try the file descriptor
1159  - afterwards, if there are still data to read, try append buffer
1160 
1161  RETURNS
1162  0 Success
1163  1 Failed to read
1164 */
1165 
1166 int _my_b_seq_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
1167 {
1168  size_t length, diff_length, left_length, save_count, max_length;
1169  my_off_t pos_in_file;
       /*
         Remember the full request; on a short read info->error is set to
         the number of bytes delivered (save_count - Count).
       */
1170  save_count=Count;
1171 
1172  /* first, read the regular buffer */
1173  if ((left_length=(size_t) (info->read_end-info->read_pos)))
1174  {
1175  DBUG_ASSERT(Count > left_length); /* User is not using my_b_read() */
1176  memcpy(Buffer,info->read_pos, left_length);
1177  Buffer+=left_length;
1178  Count-=left_length;
1179  }
       /*
         From here on the append buffer is locked; every exit path below
         calls unlock_append_buffer() before returning.
       */
1180  lock_append_buffer(info);
1181 
1182  /* pos_in_file always point on where info->buffer was read */
1183  if ((pos_in_file=info->pos_in_file +
1184  (size_t) (info->read_end - info->buffer)) >= info->end_of_file)
1185  goto read_append_buffer;
1186 
1187  /*
1188  With read-append cache we must always do a seek before we read,
1189  because the write could have moved the file pointer astray
1190  */
1191  if (mysql_file_seek(info->file, pos_in_file, MY_SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR)
1192  {
1193  info->error= -1;
1194  unlock_append_buffer(info);
1195  return (1);
1196  }
1197  info->seek_not_done=0;
1198 
       /* Offset of pos_in_file within its IO_SIZE block. */
1199  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
1200 
1201  /* now the second stage begins - read from file descriptor */
       /*
         Large request: read the bulk straight into the caller's Buffer,
         bypassing info->buffer.
       */
1202  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
1203  {
1204  /* Fill first intern buffer */
1205  size_t read_length;
1206 
1207  length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
1208  if ((read_length= mysql_file_read(info->file,Buffer, length,
1209  info->myflags)) == (size_t) -1)
1210  {
1211  info->error= -1;
1212  unlock_append_buffer(info);
1213  return 1;
1214  }
1215  Count-=read_length;
1216  Buffer+=read_length;
1217  pos_in_file+=read_length;
1218 
1219  if (read_length != length)
1220  {
1221  /*
1222  We only got part of data; Read the rest of the data from the
1223  write buffer
1224  */
1225  goto read_append_buffer;
1226  }
1227  left_length+=length;
1228  diff_length=0;
1229  }
1230 
       /*
         Refill info->buffer with at most read_length - diff_length bytes,
         never reading beyond end_of_file.
       */
1231  max_length= info->read_length-diff_length;
1232  if (max_length > (info->end_of_file - pos_in_file))
1233  max_length= (size_t) (info->end_of_file - pos_in_file);
1234  if (!max_length)
1235  {
1236  if (Count)
1237  goto read_append_buffer;
1238  length=0; /* Didn't read any more chars */
1239  }
1240  else
1241  {
1242  length= mysql_file_read(info->file,info->buffer, max_length, info->myflags);
1243  if (length == (size_t) -1)
1244  {
1245  info->error= -1;
1246  unlock_append_buffer(info);
1247  return 1;
1248  }
       /*
         The file could not satisfy the request: give the caller what was
         read and take the remainder from the append buffer.
       */
1249  if (length < Count)
1250  {
1251  memcpy(Buffer, info->buffer, length);
1252  Count -= length;
1253  Buffer += length;
1254 
1255  /*
1256  added the line below to make
1257  DBUG_ASSERT(pos_in_file==info->end_of_file) pass.
1258  otherwise this does not appear to be needed
1259  */
1260  pos_in_file += length;
1261  goto read_append_buffer;
1262  }
1263  }
       /*
         Success from the file: the first Count bytes of info->buffer go to
         the caller, read_pos is left just past them and read_end marks the
         end of the block that was read.
       */
1264  unlock_append_buffer(info);
1265  info->read_pos=info->buffer+Count;
1266  info->read_end=info->buffer+length;
1267  info->pos_in_file=pos_in_file;
1268  memcpy(Buffer,info->buffer,(size_t) Count);
1269  return 0;
1270 
1271 read_append_buffer:
1272 
1273  /*
1274  Read data from the current write buffer.
1275  Count should never be == 0 here (The code will work even if count is 0)
1276  */
1277 
1278  {
1279  /* First copy the data to Count */
       /* Unread part of the write buffer: append_read_pos .. write_pos. */
1280  size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
1281  size_t copy_len;
1282  size_t transfer_len;
1283 
1284  DBUG_ASSERT(info->append_read_pos <= info->write_pos);
1285  /*
1286  TODO: figure out if the assert below is needed or correct.
1287  */
1288  DBUG_ASSERT(pos_in_file == info->end_of_file);
1289  copy_len= MY_MIN(Count, len_in_buff);
1290  memcpy(Buffer, info->append_read_pos, copy_len);
1291  info->append_read_pos += copy_len;
1292  Count -= copy_len;
       /* Still short: record how many bytes the caller actually received. */
1293  if (Count)
1294  info->error = save_count - Count;
1295 
1296  /* Fill read buffer with data from write buffer */
       /*
         Whatever the caller did not take is moved into info->buffer, and
         the write buffer is then marked as fully consumed by the reader.
       */
1297  memcpy(info->buffer, info->append_read_pos,
1298  (size_t) (transfer_len=len_in_buff - copy_len));
1299  info->read_pos= info->buffer;
1300  info->read_end= info->buffer+transfer_len;
1301  info->append_read_pos=info->write_pos;
1302  info->pos_in_file=pos_in_file+copy_len;
1303  info->end_of_file+=len_in_buff;
1304  }
1305  unlock_append_buffer(info);
1306  return Count ? 1 : 0;
1307 }
1308 
1309 
1310 #ifdef HAVE_AIOWAIT
1311 
1312 /*
1313  Read from the IO_CACHE into a buffer and feed asynchronously
1314  from disk when needed.
1315 
1316  SYNOPSIS
1317  _my_b_async_read()
1318  info IO_CACHE pointer
1319  Buffer Buffer to retrieve count bytes from file
1320  Count Number of bytes to read into Buffer
1321 
1322  RETURN VALUE
1323  -1 An error has occurred; my_errno is set.
1324  0 Success
1325  1 An error has occurred; IO_CACHE to error state.
1326 */
1327 
1328 int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
1329 {
1330  size_t length,read_length,diff_length,left_length,use_length,org_Count;
1331  size_t max_length;
1332  my_off_t next_pos_in_file;
1333  uchar *read_buffer;
1334 
1335  memcpy(Buffer,info->read_pos,
1336  (left_length= (size_t) (info->read_end-info->read_pos)));
1337  Buffer+=left_length;
1338  org_Count=Count;
1339  Count-=left_length;
1340 
1341  if (info->inited)
1342  { /* wait for read block */
1343  info->inited=0; /* No more block to read */
1344  my_aiowait(&info->aio_result); /* Wait for outstanding req */
1345  if (info->aio_result.result.aio_errno)
1346  {
1347  if (info->myflags & MY_WME)
1348  {
1349  char errbuf[MYSYS_STRERROR_SIZE];
1350  my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
1351  my_filename(info->file),
1352  info->aio_result.result.aio_errno,
1353  my_strerror(errbuf, sizeof(errbuf),
1354  info->aio_result.result.aio_errno));
1355  my_errno=info->aio_result.result.aio_errno;
1356  info->error= -1;
1357  return(1);
1358  }
1359  if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
1360  read_length == (size_t) -1)
1361  {
1362  my_errno=0; /* For testing */
1363  info->error= (read_length == (size_t) -1 ? -1 :
1364  (int) (read_length+left_length));
1365  return(1);
1366  }
1367  info->pos_in_file+= (size_t) (info->read_end - info->request_pos);
1368 
1369  if (info->request_pos != info->buffer)
1370  info->request_pos=info->buffer;
1371  else
1372  info->request_pos=info->buffer+info->read_length;
1373  info->read_pos=info->request_pos;
1374  next_pos_in_file=info->aio_read_pos+read_length;
1375 
1376  /* Check if pos_in_file is changed
1377  (_ni_read_cache may have skipped some bytes) */
1378 
1379  if (info->aio_read_pos < info->pos_in_file)
1380  { /* Fix if skipped bytes */
1381  if (info->aio_read_pos + read_length < info->pos_in_file)
1382  {
1383  read_length=0; /* Skip block */
1384  next_pos_in_file=info->pos_in_file;
1385  }
1386  else
1387  {
1388  my_off_t offset= (info->pos_in_file - info->aio_read_pos);
1389  info->pos_in_file=info->aio_read_pos; /* Whe are here */
1390  info->read_pos=info->request_pos+offset;
1391  read_length-=offset; /* Bytes left from read_pos */
1392  }
1393  }
1394 #ifndef DBUG_OFF
1395  if (info->aio_read_pos > info->pos_in_file)
1396  {
1397  my_errno=EINVAL;
1398  return(info->read_length= (size_t) -1);
1399  }
1400 #endif
1401  /* Copy found bytes to buffer */
1402  length= MY_MIN(Count, read_length);
1403  memcpy(Buffer,info->read_pos,(size_t) length);
1404  Buffer+=length;
1405  Count-=length;
1406  left_length+=length;
1407  info->read_end=info->rc_pos+read_length;
1408  info->read_pos+=length;
1409  }
1410  else
1411  next_pos_in_file=(info->pos_in_file+ (size_t)
1412  (info->read_end - info->request_pos));
1413 
1414  /* If reading large blocks, or first read or read with skip */
1415  if (Count)
1416  {
1417  if (next_pos_in_file == info->end_of_file)
1418  {
1419  info->error=(int) (read_length+left_length);
1420  return 1;
1421  }
1422 
1423  if (mysql_file_seek(info->file, next_pos_in_file, MY_SEEK_SET, MYF(0))
1424  == MY_FILEPOS_ERROR)
1425  {
1426  info->error= -1;
1427  return (1);
1428  }
1429 
1430  read_length=IO_SIZE*2- (size_t) (next_pos_in_file & (IO_SIZE-1));
1431  if (Count < read_length)
1432  { /* Small block, read to cache */
1433  if ((read_length=mysql_file_read(info->file,info->request_pos,
1434  read_length, info->myflags)) == (size_t) -1)
1435  return info->error= -1;
1436  use_length= MY_MIN(Count, read_length);
1437  memcpy(Buffer,info->request_pos,(size_t) use_length);
1438  info->read_pos=info->request_pos+Count;
1439  info->read_end=info->request_pos+read_length;
1440  info->pos_in_file=next_pos_in_file; /* Start of block in cache */
1441  next_pos_in_file+=read_length;
1442 
1443  if (Count != use_length)
1444  { /* Didn't find hole block */
1445  if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
1446  {
1447  char errbuf[MYSYS_STRERROR_SIZE];
1448  my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG), my_filename(info->file),
1449  my_errno, my_strerror(errbuf, sizeof(errbuf), my_errno));
1450  }
1451  info->error=(int) (read_length+left_length);
1452  return 1;
1453  }
1454  }
1455  else
1456  { /* Big block, don't cache it */
1457  if ((read_length= mysql_file_read(info->file, Buffer, Count,info->myflags))
1458  != Count)
1459  {
1460  info->error= read_length == (size_t) -1 ? -1 : read_length+left_length;
1461  return 1;
1462  }
1463  info->read_pos=info->read_end=info->request_pos;
1464  info->pos_in_file=(next_pos_in_file+=Count);
1465  }
1466  }
1467 
1468  /* Read next block with asyncronic io */
1469  diff_length=(next_pos_in_file & (IO_SIZE-1));
1470  max_length= info->read_length - diff_length;
1471  if (max_length > info->end_of_file - next_pos_in_file)
1472  max_length= (size_t) (info->end_of_file - next_pos_in_file);
1473 
1474  if (info->request_pos != info->buffer)
1475  read_buffer=info->buffer;
1476  else
1477  read_buffer=info->buffer+info->read_length;
1478  info->aio_read_pos=next_pos_in_file;
1479  if (max_length)
1480  {
1481  info->aio_result.result.aio_errno=AIO_INPROGRESS; /* Marker for test */
1482  DBUG_PRINT("aioread",("filepos: %ld length: %lu",
1483  (ulong) next_pos_in_file, (ulong) max_length));
1484  if (aioread(info->file,read_buffer, max_length,
1485  (my_off_t) next_pos_in_file,MY_SEEK_SET,
1486  &info->aio_result.result))
1487  { /* Skip async io */
1488  my_errno=errno;
1489  DBUG_PRINT("error",("got error: %d, aio_result: %d from aioread, async skipped",
1490  errno, info->aio_result.result.aio_errno));
1491  if (info->request_pos != info->buffer)
1492  {
1493  bmove(info->buffer,info->request_pos,
1494  (size_t) (info->read_end - info->read_pos));
1495  info->request_pos=info->buffer;
1496  info->read_pos-=info->read_length;
1497  info->read_end-=info->read_length;
1498  }
1499  info->read_length=info->buffer_length; /* Use hole buffer */
1500  info->read_function=_my_b_read; /* Use normal IO_READ next */
1501  }
1502  else
1503  info->inited=info->aio_result.pending=1;
1504  }
1505  return 0; /* Block read, async in use */
1506 } /* _my_b_async_read */
1507 #endif
1508 
1509 
1510 /* Read one byte when buffer is empty */
1511 
1512 int _my_b_get(IO_CACHE *info)
1513 {
1514  uchar buff;
1515  IO_CACHE_CALLBACK pre_read,post_read;
1516  if ((pre_read = info->pre_read))
1517  (*pre_read)(info);
1518  if ((*(info)->read_function)(info,&buff,1))
1519  return my_b_EOF;
1520  if ((post_read = info->post_read))
1521  (*post_read)(info);
1522  return (int) (uchar) buff;
1523 }
1524 
1525 /*
1526  Write a byte buffer to IO_CACHE and flush to disk
1527  if IO_CACHE is full.
1528 
1529  RETURN VALUE
1530  1 On error on write
1531  0 On success
1532  -1 On error; my_errno contains error code.
1533 */
1534 
1535 int _my_b_write(register IO_CACHE *info, const uchar *Buffer, size_t Count)
1536 {
1537  size_t rest_length,length;
1538  my_off_t pos_in_file= info->pos_in_file;
1539 
1540  DBUG_EXECUTE_IF("simulate_huge_load_data_file",
1541  {
1542  pos_in_file=(my_off_t)(5000000000ULL);
1543  });
1544  if (pos_in_file+info->buffer_length > info->end_of_file)
1545  {
1546  my_errno=errno=EFBIG;
1547  return info->error = -1;
1548  }
1549 
1550  rest_length= (size_t) (info->write_end - info->write_pos);
1551  memcpy(info->write_pos,Buffer,(size_t) rest_length);
1552  Buffer+=rest_length;
1553  Count-=rest_length;
1554  info->write_pos+=rest_length;
1555 
1556  if (my_b_flush_io_cache(info,1))
1557  return 1;
1558  if (Count >= IO_SIZE)
1559  { /* Fill first intern buffer */
1560  length=Count & (size_t) ~(IO_SIZE-1);
1561  if (info->seek_not_done)
1562  {
1563  /*
1564  Whenever a function which operates on IO_CACHE flushes/writes
1565  some part of the IO_CACHE to disk it will set the property
1566  "seek_not_done" to indicate this to other functions operating
1567  on the IO_CACHE.
1568  */
1569  if (mysql_file_seek(info->file, info->pos_in_file, MY_SEEK_SET, MYF(0)))
1570  {
1571  info->error= -1;
1572  return (1);
1573  }
1574  info->seek_not_done=0;
1575  }
1576  if (mysql_file_write(info->file, Buffer, length, info->myflags | MY_NABP))
1577  return info->error= -1;
1578 
1579  /*
1580  In case of a shared I/O cache with a writer we normally do direct
1581  write cache to read cache copy. Simulate this here by direct
1582  caller buffer to read cache copy. Do it after the write so that
1583  the cache readers actions on the flushed part can go in parallel
1584  with the write of the extra stuff. copy_to_read_buffer()
1585  synchronizes writer and readers so that after this call the
1586  readers can act on the extra stuff while the writer can go ahead
1587  and prepare the next output. copy_to_read_buffer() relies on
1588  info->pos_in_file.
1589  */
1590  if (info->share)
1591  copy_to_read_buffer(info, Buffer, length);
1592 
1593  Count-=length;
1594  Buffer+=length;
1595  info->pos_in_file+=length;
1596  }
1597  memcpy(info->write_pos,Buffer,(size_t) Count);
1598  info->write_pos+=Count;
1599  return 0;
1600 }
1601 
1602 
1603 /*
1604  Append a block to the write buffer.
1605  This is done with the buffer locked to ensure that we don't read from
1606  the write buffer before we are ready with it.
1607 */
1608 
1609 int my_b_append(register IO_CACHE *info, const uchar *Buffer, size_t Count)
1610 {
1611  size_t rest_length,length;
1612 
1613  /*
1614  Assert that we cannot come here with a shared cache. If we do one
1615  day, we might need to add a call to copy_to_read_buffer().
1616  */
1617  DBUG_ASSERT(!info->share);
1618 
1619  lock_append_buffer(info);
1620  rest_length= (size_t) (info->write_end - info->write_pos);
1621  if (Count <= rest_length)
1622  goto end;
1623  memcpy(info->write_pos, Buffer, rest_length);
1624  Buffer+=rest_length;
1625  Count-=rest_length;
1626  info->write_pos+=rest_length;
1627  if (my_b_flush_io_cache(info,0))
1628  {
1629  unlock_append_buffer(info);
1630  return 1;
1631  }
1632  if (Count >= IO_SIZE)
1633  { /* Fill first intern buffer */
1634  length=Count & (size_t) ~(IO_SIZE-1);
1635  if (mysql_file_write(info->file,Buffer, length, info->myflags | MY_NABP))
1636  {
1637  unlock_append_buffer(info);
1638  return info->error= -1;
1639  }
1640  Count-=length;
1641  Buffer+=length;
1642  info->end_of_file+=length;
1643  }
1644 
1645 end:
1646  memcpy(info->write_pos,Buffer,(size_t) Count);
1647  info->write_pos+=Count;
1648  unlock_append_buffer(info);
1649  return 0;
1650 }
1651 
1652 
1653 int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
1654 {
1655  /*
1656  Sasha: We are not writing this with the ? operator to avoid hitting
1657  a possible compiler bug. At least gcc 2.95 cannot deal with
1658  several layers of ternary operators that evaluated comma(,) operator
1659  expressions inside - I do have a test case if somebody wants it
1660  */
1661  if (info->type == SEQ_READ_APPEND)
1662  return my_b_append(info, Buffer, Count);
1663  return my_b_write(info, Buffer, Count);
1664 }
1665 
1666 
1667 /*
1668  Write a block to disk where part of the data may be inside the record
1669  buffer. As all write calls to the data go through the cache,
1670  we will never get a seek over the end of the buffer
1671 */
1672 
1673 int my_block_write(register IO_CACHE *info, const uchar *Buffer, size_t Count,
1674  my_off_t pos)
1675 {
1676  size_t length;
1677  int error=0;
1678 
1679  /*
1680  Assert that we cannot come here with a shared cache. If we do one
1681  day, we might need to add a call to copy_to_read_buffer().
1682  */
1683  DBUG_ASSERT(!info->share);
1684 
1685  if (pos < info->pos_in_file)
1686  {
1687  /* Of no overlap, write everything without buffering */
1688  if (pos + Count <= info->pos_in_file)
1689  return mysql_file_pwrite(info->file, Buffer, Count, pos,
1690  info->myflags | MY_NABP);
1691  /* Write the part of the block that is before buffer */
1692  length= (uint) (info->pos_in_file - pos);
1693  if (mysql_file_pwrite(info->file, Buffer, length, pos, info->myflags | MY_NABP))
1694  info->error= error= -1;
1695  Buffer+=length;
1696  pos+= length;
1697  Count-= length;
1698 #ifndef HAVE_PREAD
1699  info->seek_not_done=1;
1700 #endif
1701  }
1702 
1703  /* Check if we want to write inside the used part of the buffer.*/
1704  length= (size_t) (info->write_end - info->buffer);
1705  if (pos < info->pos_in_file + length)
1706  {
1707  size_t offset= (size_t) (pos - info->pos_in_file);
1708  length-=offset;
1709  if (length > Count)
1710  length=Count;
1711  memcpy(info->buffer+offset, Buffer, length);
1712  Buffer+=length;
1713  Count-= length;
1714  /* Fix length of buffer if the new data was larger */
1715  if (info->buffer+length > info->write_pos)
1716  info->write_pos=info->buffer+length;
1717  if (!Count)
1718  return (error);
1719  }
1720  /* Write at the end of the current buffer; This is the normal case */
1721  if (_my_b_write(info, Buffer, Count))
1722  error= -1;
1723  return error;
1724 }
1725 
1726 
1727  /* Flush write cache */
1728 
1729 #define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1730  lock_append_buffer(info);
1731 #define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1732  unlock_append_buffer(info);
1733 
1734 int my_b_flush_io_cache(IO_CACHE *info,
1735  int need_append_buffer_lock __attribute__((unused)))
1736 {
1737  size_t length;
1738  my_off_t pos_in_file;
1739  my_bool append_cache= (info->type == SEQ_READ_APPEND);
1740  DBUG_ENTER("my_b_flush_io_cache");
1741  DBUG_PRINT("enter", ("cache: 0x%lx", (long) info));
1742 
1743  if (!append_cache)
1744  need_append_buffer_lock= 0;
1745 
1746  if (info->type == WRITE_CACHE || append_cache)
1747  {
1748  if (info->file == -1)
1749  {
1750  if (real_open_cached_file(info))
1751  DBUG_RETURN((info->error= -1));
1752  }
1753  LOCK_APPEND_BUFFER;
1754 
1755  if ((length=(size_t) (info->write_pos - info->write_buffer)))
1756  {
1757  /*
1758  In case of a shared I/O cache with a writer we do direct write
1759  cache to read cache copy. Do it before the write here so that
1760  the readers can work in parallel with the write.
1761  copy_to_read_buffer() relies on info->pos_in_file.
1762  */
1763  if (info->share)
1764  copy_to_read_buffer(info, info->write_buffer, length);
1765 
1766  pos_in_file=info->pos_in_file;
1767  /*
1768  If we have append cache, we always open the file with
1769  O_APPEND which moves the pos to EOF automatically on every write
1770  */
1771  if (!append_cache && info->seek_not_done)
1772  { /* File touched, do seek */
1773  if (mysql_file_seek(info->file, pos_in_file, MY_SEEK_SET, MYF(0)) ==
1774  MY_FILEPOS_ERROR)
1775  {
1776  UNLOCK_APPEND_BUFFER;
1777  DBUG_RETURN((info->error= -1));
1778  }
1779  if (!append_cache)
1780  info->seek_not_done=0;
1781  }
1782  if (!append_cache)
1783  info->pos_in_file+=length;
1784  info->write_end= (info->write_buffer+info->buffer_length-
1785  ((pos_in_file+length) & (IO_SIZE-1)));
1786 
1787  if (mysql_file_write(info->file,info->write_buffer,length,
1788  info->myflags | MY_NABP))
1789  info->error= -1;
1790  else
1791  info->error= 0;
1792  if (!append_cache)
1793  {
1794  set_if_bigger(info->end_of_file,(pos_in_file+length));
1795  }
1796  else
1797  {
1798  info->end_of_file+=(info->write_pos-info->append_read_pos);
1799  DBUG_ASSERT(info->end_of_file == mysql_file_tell(info->file, MYF(0)));
1800  }
1801 
1802  info->append_read_pos=info->write_pos=info->write_buffer;
1803  ++info->disk_writes;
1804  UNLOCK_APPEND_BUFFER;
1805  DBUG_RETURN(info->error);
1806  }
1807  }
1808 #ifdef HAVE_AIOWAIT
1809  else if (info->type != READ_NET)
1810  {
1811  my_aiowait(&info->aio_result); /* Wait for outstanding req */
1812  info->inited=0;
1813  }
1814 #endif
1815  UNLOCK_APPEND_BUFFER;
1816  DBUG_RETURN(0);
1817 }
1818 
1819 /*
1820  Free an IO_CACHE object
1821 
1822  SYNOPSIS
1823  end_io_cache()
1824  info IO_CACHE Handle to free
1825 
1826  NOTES
1827  It's currently safe to call this if one has called init_io_cache()
1828  on the 'info' object, even if init_io_cache() failed.
1829  This function is also safe to call twice with the same handle.
1830 
1831  RETURN
1832  0 ok
1833  # Error
1834 */
1835 
1836 int end_io_cache(IO_CACHE *info)
1837 {
1838  int error=0;
1839  IO_CACHE_CALLBACK pre_close;
1840  DBUG_ENTER("end_io_cache");
1841  DBUG_PRINT("enter",("cache: 0x%lx", (ulong) info));
1842 
1843  /*
1844  Every thread must call remove_io_thread(). The last one destroys
1845  the share elements.
1846  */
1847  DBUG_ASSERT(!info->share || !info->share->total_threads);
1848 
1849  if ((pre_close=info->pre_close))
1850  {
1851  (*pre_close)(info);
1852  info->pre_close= 0;
1853  }
1854  if (info->alloced_buffer)
1855  {
1856  info->alloced_buffer=0;
1857  if (info->file != -1) /* File doesn't exist */
1858  error= my_b_flush_io_cache(info,1);
1859  my_free(info->buffer);
1860  info->buffer=info->read_pos=(uchar*) 0;
1861  }
1862  if (info->type == SEQ_READ_APPEND)
1863  {
1864  /* Destroy allocated mutex */
1865  info->type= TYPE_NOT_SET;
1866  mysql_mutex_destroy(&info->append_buffer_lock);
1867  }
1868  DBUG_RETURN(error);
1869 } /* end_io_cache */
1870 
1871 
1872 /**********************************************************************
1873  Testing of MF_IOCACHE
1874 **********************************************************************/
1875 
1876 #ifdef MAIN
1877 
1878 #include <my_dir.h>
1879 
/*
  Print "Error:<formatted message>, errno=<n>" to stderr and terminate the
  program with exit status 1. Never returns.

  fmt is a printf-style format string; the variable arguments must match it.
*/
void die(const char* fmt, ...)
{
  /* Capture errno first: the stdio calls below are allowed to change it. */
  int saved_errno= errno;
  va_list va_args;

  va_start(va_args,fmt);
  fprintf(stderr,"Error:");
  vfprintf(stderr, fmt,va_args);
  va_end(va_args);
  fprintf(stderr,", errno=%d\n", saved_errno);
  exit(1);
}
1889 
1890 int open_file(const char* fname, IO_CACHE* info, int cache_size)
1891 {
1892  int fd;
1893  if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
1894  die("Could not open %s", fname);
1895  if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
1896  die("failed in init_io_cache()");
1897  return fd;
1898 }
1899 
1900 void close_file(IO_CACHE* info)
1901 {
1902  end_io_cache(info);
1903  my_close(info->file, MYF(MY_WME));
1904 }
1905 
1906 int main(int argc, char** argv)
1907 {
1908  IO_CACHE sra_cache; /* SEQ_READ_APPEND */
1909  MY_STAT status;
1910  const char* fname="/tmp/iocache.test";
1911  int cache_size=16384;
1912  char llstr_buf[22];
1913  int max_block,total_bytes=0;
1914  int i,num_loops=100,error=0;
1915  char *p;
1916  char* block, *block_end;
1917  MY_INIT(argv[0]);
1918  max_block = cache_size*3;
1919  if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
1920  die("Not enough memory to allocate test block");
1921  block_end = block + max_block;
1922  for (p = block,i=0; p < block_end;i++)
1923  {
1924  *p++ = (char)i;
1925  }
1926  if (my_stat(fname,&status, MYF(0)) &&
1927  my_delete(fname,MYF(MY_WME)))
1928  {
1929  die("Delete of %s failed, aborting", fname);
1930  }
1931  open_file(fname,&sra_cache, cache_size);
1932  for (i = 0; i < num_loops; i++)
1933  {
1934  char buf[4];
1935  int block_size = abs(rand() % max_block);
1936  int4store(buf, block_size);
1937  if (my_b_append(&sra_cache,buf,4) ||
1938  my_b_append(&sra_cache, block, block_size))
1939  die("write failed");
1940  total_bytes += 4+block_size;
1941  }
1942  close_file(&sra_cache);
1943  my_free(block);
1944  if (!my_stat(fname,&status,MYF(MY_WME)))
1945  die("%s failed to stat, but I had just closed it,\
1946  wonder how that happened");
1947  printf("Final size of %s is %s, wrote %d bytes\n",fname,
1948  llstr(status.st_size,llstr_buf),
1949  total_bytes);
1950  my_delete(fname, MYF(MY_WME));
1951  /* check correctness of tests */
1952  if (total_bytes != status.st_size)
1953  {
1954  fprintf(stderr,"Not the same number of bytes acutally in file as bytes \
1955 supposedly written\n");
1956  error=1;
1957  }
1958  exit(error);
1959  return 0;
1960 }
1961 #endif