MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
sort.c
1 /* Copyright (c) 2000, 2011, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15 
16 /*
17  Creates a index for a database by reading keys, sorting them and outputing
18  them in sorted order through SORT_INFO functions.
19 */
20 
21 #include "fulltext.h"
22 #if defined(__WIN__)
23 #include <fcntl.h>
24 #else
25 #include <stddef.h>
26 #endif
27 #include <queues.h>
28 
29 /* static variables */
30 
31 #undef MYF_RW
32 #undef DISK_BUFFER_SIZE
33 
34 #define MERGEBUFF 15
35 #define MERGEBUFF2 31
36 #define MYF_RW MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL)
37 #define DISK_BUFFER_SIZE (IO_SIZE*16)
38 
39 
40 /*
41  Pointers of functions for store and read keys from temp file
42 */
43 
44 extern void print_error(const char *fmt,...);
45 
46 /* Functions defined in this file */
47 
48 static ha_rows find_all_keys(MI_SORT_PARAM *info,uint keys,
49  uchar **sort_keys,
50  DYNAMIC_ARRAY *buffpek,int *maxbuffer,
51  IO_CACHE *tempfile,
52  IO_CACHE *tempfile_for_exceptions);
53 static int write_keys(MI_SORT_PARAM *info,uchar **sort_keys,
54  uint count, BUFFPEK *buffpek,IO_CACHE *tempfile);
55 static int write_key(MI_SORT_PARAM *info, uchar *key,
56  IO_CACHE *tempfile);
57 static int write_index(MI_SORT_PARAM *info,uchar * *sort_keys,
58  uint count);
59 static int merge_many_buff(MI_SORT_PARAM *info,uint keys,
60  uchar * *sort_keys,
61  BUFFPEK *buffpek,int *maxbuffer,
62  IO_CACHE *t_file);
63 static uint read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek,
64  uint sort_length);
65 static int merge_buffers(MI_SORT_PARAM *info,uint keys,
66  IO_CACHE *from_file, IO_CACHE *to_file,
67  uchar * *sort_keys, BUFFPEK *lastbuff,
68  BUFFPEK *Fb, BUFFPEK *Tb);
69 static int merge_index(MI_SORT_PARAM *,uint,uchar **,BUFFPEK *, int,
70  IO_CACHE *);
71 static int flush_ft_buf(MI_SORT_PARAM *info);
72 
73 static int write_keys_varlen(MI_SORT_PARAM *info,uchar **sort_keys,
74  uint count, BUFFPEK *buffpek,
75  IO_CACHE *tempfile);
76 static uint read_to_buffer_varlen(IO_CACHE *fromfile,BUFFPEK *buffpek,
77  uint sort_length);
78 static int write_merge_key(MI_SORT_PARAM *info, IO_CACHE *to_file,
79  uchar *key, uint sort_length, uint count);
80 static int write_merge_key_varlen(MI_SORT_PARAM *info,
81  IO_CACHE *to_file,
82  uchar* key, uint sort_length,
83  uint count);
84 static inline int
85 my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs);
86 
87 /*
88  Creates a index of sorted keys
89 
90  SYNOPSIS
91  _create_index_by_sort()
92  info Sort parameters
93  no_messages Set to 1 if no output
94  sortbuff_size Size if sortbuffer to allocate
95 
96  RESULT
97  0 ok
98  <> 0 Error
99 */
100 
101 int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages,
102  ulonglong sortbuff_size)
103 {
104  int error,maxbuffer,skr;
105  uint sort_length, keys;
106  ulonglong memavl, old_memavl;
107  DYNAMIC_ARRAY buffpek;
108  ha_rows records;
109  uchar **sort_keys;
110  IO_CACHE tempfile, tempfile_for_exceptions;
111  DBUG_ENTER("_create_index_by_sort");
112  DBUG_PRINT("enter",("sort_length: %d", info->key_length));
113 
114  if (info->keyinfo->flag & HA_VAR_LENGTH_KEY)
115  {
116  info->write_keys=write_keys_varlen;
117  info->read_to_buffer=read_to_buffer_varlen;
118  info->write_key= write_merge_key_varlen;
119  }
120  else
121  {
122  info->write_keys=write_keys;
123  info->read_to_buffer=read_to_buffer;
124  info->write_key=write_merge_key;
125  }
126 
127  my_b_clear(&tempfile);
128  my_b_clear(&tempfile_for_exceptions);
129  memset(&buffpek, 0, sizeof(buffpek));
130  sort_keys= (uchar **) NULL; error= 1;
131  maxbuffer=1;
132 
133  memavl= MY_MAX(sortbuff_size, MIN_SORT_BUFFER);
134  records= info->sort_info->max_records;
135  sort_length= info->key_length;
136  LINT_INIT(keys);
137 
138  if ((memavl - sizeof(BUFFPEK)) / (sort_length + sizeof(char *)) > UINT_MAX32)
139  memavl= sizeof(BUFFPEK) + UINT_MAX32 * (sort_length + sizeof(char *));
140 
141  while (memavl >= MIN_SORT_BUFFER)
142  {
143  if ((records < UINT_MAX32) &&
144  ((my_off_t) (records + 1) *
145  (sort_length + sizeof(char*)) <= (my_off_t) memavl))
146  keys= (uint)records+1;
147  else
148  do
149  {
150  skr=maxbuffer;
151  if (memavl < sizeof(BUFFPEK)*(uint) maxbuffer ||
152  (keys=(memavl-sizeof(BUFFPEK)*(uint) maxbuffer)/
153  (sort_length+sizeof(char*))) <= 1 ||
154  keys < (uint) maxbuffer)
155  {
156  mi_check_print_error(info->sort_info->param,
157  "myisam_sort_buffer_size is too small");
158  goto err;
159  }
160  }
161  while ((maxbuffer= (int) (records/(keys-1)+1)) != skr);
162 
163  if ((sort_keys=(uchar **)my_malloc(keys*(sort_length+sizeof(char*))+
164  HA_FT_MAXBYTELEN, MYF(0))))
165  {
166  if (my_init_dynamic_array(&buffpek, sizeof(BUFFPEK), maxbuffer,
167  maxbuffer/2))
168  {
169  my_free(sort_keys);
170  sort_keys= 0;
171  }
172  else
173  break;
174  }
175  old_memavl=memavl;
176  if ((memavl= memavl/4*3) < MIN_SORT_BUFFER && old_memavl > MIN_SORT_BUFFER)
177  memavl= MIN_SORT_BUFFER;
178  }
179  if (memavl < MIN_SORT_BUFFER)
180  {
181  mi_check_print_error(info->sort_info->param,"MyISAM sort buffer too small"); /* purecov: tested */
182  goto err; /* purecov: tested */
183  }
184  (*info->lock_in_memory)(info->sort_info->param);/* Everything is allocated */
185 
186  if (!no_messages)
187  printf(" - Searching for keys, allocating buffer for %d keys\n",keys);
188 
189  if ((records=find_all_keys(info,keys,sort_keys,&buffpek,&maxbuffer,
190  &tempfile,&tempfile_for_exceptions))
191  == HA_POS_ERROR)
192  goto err; /* purecov: tested */
193  if (maxbuffer == 0)
194  {
195  if (!no_messages)
196  printf(" - Dumping %lu keys\n", (ulong) records);
197  if (write_index(info,sort_keys, (uint) records))
198  goto err; /* purecov: inspected */
199  }
200  else
201  {
202  keys=(keys*(sort_length+sizeof(char*)))/sort_length;
203  if (maxbuffer >= MERGEBUFF2)
204  {
205  if (!no_messages)
206  printf(" - Merging %lu keys\n", (ulong) records); /* purecov: tested */
207  if (merge_many_buff(info,keys,sort_keys,
208  dynamic_element(&buffpek,0,BUFFPEK *),&maxbuffer,&tempfile))
209  goto err; /* purecov: inspected */
210  }
211  if (flush_io_cache(&tempfile) ||
212  reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
213  goto err; /* purecov: inspected */
214  if (!no_messages)
215  printf(" - Last merge and dumping keys\n"); /* purecov: tested */
216  if (merge_index(info,keys,sort_keys,dynamic_element(&buffpek,0,BUFFPEK *),
217  maxbuffer,&tempfile))
218  goto err; /* purecov: inspected */
219  }
220 
221  if (flush_ft_buf(info) || flush_pending_blocks(info))
222  goto err;
223 
224  if (my_b_inited(&tempfile_for_exceptions))
225  {
226  MI_INFO *idx=info->sort_info->info;
227  uint keyno=info->key;
228  uint key_length, ref_length=idx->s->rec_reflength;
229 
230  if (!no_messages)
231  printf(" - Adding exceptions\n"); /* purecov: tested */
232  if (flush_io_cache(&tempfile_for_exceptions) ||
233  reinit_io_cache(&tempfile_for_exceptions,READ_CACHE,0L,0,0))
234  goto err;
235 
236  while (!my_b_read(&tempfile_for_exceptions,(uchar*)&key_length,
237  sizeof(key_length))
238  && !my_b_read(&tempfile_for_exceptions,(uchar*)sort_keys,
239  (uint) key_length))
240  {
241  if (_mi_ck_write(idx,keyno,(uchar*) sort_keys,key_length-ref_length))
242  goto err;
243  }
244  }
245 
246  error =0;
247 
248 err:
249  my_free(sort_keys);
250  delete_dynamic(&buffpek);
251  close_cached_file(&tempfile);
252  close_cached_file(&tempfile_for_exceptions);
253 
254  DBUG_RETURN(error ? -1 : 0);
255 } /* _create_index_by_sort */
256 
257 
258 /* Search after all keys and place them in a temp. file */
259 
260 static ha_rows find_all_keys(MI_SORT_PARAM *info, uint keys,
261  uchar **sort_keys, DYNAMIC_ARRAY *buffpek,
262  int *maxbuffer, IO_CACHE *tempfile,
263  IO_CACHE *tempfile_for_exceptions)
264 {
265  int error;
266  uint idx;
267  DBUG_ENTER("find_all_keys");
268 
269  idx=error=0;
270  sort_keys[0]=(uchar*) (sort_keys+keys);
271 
272  while (!(error=(*info->key_read)(info,sort_keys[idx])))
273  {
274  if (info->real_key_length > info->key_length)
275  {
276  if (write_key(info,sort_keys[idx],tempfile_for_exceptions))
277  DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */
278  continue;
279  }
280 
281  if (++idx == keys)
282  {
283  if (info->write_keys(info,sort_keys,idx-1,(BUFFPEK *)alloc_dynamic(buffpek),
284  tempfile))
285  DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */
286 
287  sort_keys[0]=(uchar*) (sort_keys+keys);
288  memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length);
289  idx=1;
290  }
291  sort_keys[idx]=sort_keys[idx-1]+info->key_length;
292  }
293  if (error > 0)
294  DBUG_RETURN(HA_POS_ERROR); /* Aborted by get_key */ /* purecov: inspected */
295  if (buffpek->elements)
296  {
297  if (info->write_keys(info,sort_keys,idx,(BUFFPEK *)alloc_dynamic(buffpek),
298  tempfile))
299  DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */
300  *maxbuffer=buffpek->elements-1;
301  }
302  else
303  *maxbuffer=0;
304 
305  DBUG_RETURN((*maxbuffer)*(keys-1)+idx);
306 } /* find_all_keys */
307 
308 
309 /* Search after all keys and place them in a temp. file */
310 
311 pthread_handler_t thr_find_all_keys(void *arg)
312 {
313  MI_SORT_PARAM *sort_param= (MI_SORT_PARAM*) arg;
314  int error;
315  ulonglong memavl, old_memavl;
316  uint keys, sort_length;
317  uint idx, maxbuffer;
318  uchar **sort_keys=0;
319 
320  LINT_INIT(keys);
321 
322  error=1;
323 
324  if (my_thread_init())
325  goto err;
326 
327  { /* Add extra block since DBUG_ENTER declare variables */
328  DBUG_ENTER("thr_find_all_keys");
329  DBUG_PRINT("enter", ("master: %d", sort_param->master));
330  if (sort_param->sort_info->got_error)
331  goto err;
332 
333  if (sort_param->keyinfo->flag & HA_VAR_LENGTH_KEY)
334  {
335  sort_param->write_keys= write_keys_varlen;
336  sort_param->read_to_buffer= read_to_buffer_varlen;
337  sort_param->write_key= write_merge_key_varlen;
338  }
339  else
340  {
341  sort_param->write_keys= write_keys;
342  sort_param->read_to_buffer= read_to_buffer;
343  sort_param->write_key= write_merge_key;
344  }
345 
346  my_b_clear(&sort_param->tempfile);
347  my_b_clear(&sort_param->tempfile_for_exceptions);
348  memset(&sort_param->buffpek, 0, sizeof(sort_param->buffpek));
349  memset(&sort_param->unique, 0, sizeof(sort_param->unique));
350  sort_keys= (uchar **) NULL;
351 
352  memavl= MY_MAX(sort_param->sortbuff_size, MIN_SORT_BUFFER);
353  idx= (uint)sort_param->sort_info->max_records;
354  sort_length= sort_param->key_length;
355  maxbuffer= 1;
356 
357  if ((memavl - sizeof(BUFFPEK)) / (sort_length +
358  sizeof(char *)) > UINT_MAX32)
359  memavl= sizeof(BUFFPEK) + UINT_MAX32 * (sort_length + sizeof(char *));
360 
361  while (memavl >= MIN_SORT_BUFFER)
362  {
363  if ((my_off_t) (idx+1)*(sort_length+sizeof(char*)) <=
364  (my_off_t) memavl)
365  keys= idx+1;
366  else
367  {
368  uint skr;
369  do
370  {
371  skr= maxbuffer;
372  if (memavl < sizeof(BUFFPEK)*maxbuffer ||
373  (keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/
374  (sort_length+sizeof(char*))) <= 1 ||
375  keys < (uint) maxbuffer)
376  {
377  mi_check_print_error(sort_param->sort_info->param,
378  "myisam_sort_buffer_size is too small");
379  goto err;
380  }
381  }
382  while ((maxbuffer= (int) (idx/(keys-1)+1)) != skr);
383  }
384  if ((sort_keys= (uchar**)
385  my_malloc(keys*(sort_length+sizeof(char*))+
386  ((sort_param->keyinfo->flag & HA_FULLTEXT) ?
387  HA_FT_MAXBYTELEN : 0), MYF(0))))
388  {
389  if (my_init_dynamic_array(&sort_param->buffpek, sizeof(BUFFPEK),
390  maxbuffer, maxbuffer/2))
391  {
392  my_free(sort_keys);
393  sort_keys= (uchar **) NULL; /* for err: label */
394  }
395  else
396  break;
397  }
398  old_memavl= memavl;
399  if ((memavl= memavl / 4 * 3) < MIN_SORT_BUFFER &&
400  old_memavl > MIN_SORT_BUFFER)
401  memavl= MIN_SORT_BUFFER;
402  }
403  if (memavl < MIN_SORT_BUFFER)
404  {
405  mi_check_print_error(sort_param->sort_info->param,
406  "MyISAM sort buffer too small");
407  goto err; /* purecov: tested */
408  }
409 
410  if (sort_param->sort_info->param->testflag & T_VERBOSE)
411  printf("Key %d - Allocating buffer for %d keys\n",
412  sort_param->key + 1, keys);
413  sort_param->sort_keys= sort_keys;
414 
415  idx= error= 0;
416  sort_keys[0]= (uchar*) (sort_keys+keys);
417 
418  DBUG_PRINT("info", ("reading keys"));
419  while (!(error= sort_param->sort_info->got_error) &&
420  !(error= (*sort_param->key_read)(sort_param, sort_keys[idx])))
421  {
422  if (sort_param->real_key_length > sort_param->key_length)
423  {
424  if (write_key(sort_param, sort_keys[idx],
425  &sort_param->tempfile_for_exceptions))
426  goto err;
427  continue;
428  }
429 
430  if (++idx == keys)
431  {
432  if (sort_param->write_keys(sort_param, sort_keys, idx - 1,
433  (BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
434  &sort_param->tempfile))
435  goto err;
436  sort_keys[0]= (uchar*) (sort_keys+keys);
437  memcpy(sort_keys[0], sort_keys[idx - 1], (size_t) sort_param->key_length);
438  idx= 1;
439  }
440  sort_keys[idx]= sort_keys[idx - 1] + sort_param->key_length;
441  }
442  if (error > 0)
443  goto err;
444  if (sort_param->buffpek.elements)
445  {
446  if (sort_param->write_keys(sort_param, sort_keys, idx,
447  (BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
448  &sort_param->tempfile))
449  goto err;
450  sort_param->keys= (sort_param->buffpek.elements - 1) * (keys - 1) + idx;
451  }
452  else
453  sort_param->keys= idx;
454 
455  sort_param->sort_keys_length= keys;
456  goto ok;
457 
458 err:
459  DBUG_PRINT("error", ("got some error"));
460  sort_param->sort_info->got_error= 1; /* no need to protect with a mutex */
461  my_free(sort_keys);
462  sort_param->sort_keys= 0;
463  delete_dynamic(& sort_param->buffpek);
464  close_cached_file(&sort_param->tempfile);
465  close_cached_file(&sort_param->tempfile_for_exceptions);
466 
467 ok:
468  free_root(&sort_param->wordroot, MYF(0));
469  /*
470  Detach from the share if the writer is involved. Avoid others to
471  be blocked. This includes a flush of the write buffer. This will
472  also indicate EOF to the readers.
473  That means that a writer always gets here first and readers -
474  only when they see EOF. But if a reader finishes prematurely
475  because of an error it may reach this earlier - don't allow it
476  to detach the writer thread.
477  */
478  if (sort_param->master && sort_param->sort_info->info->rec_cache.share)
479  remove_io_thread(&sort_param->sort_info->info->rec_cache);
480 
481  /* Readers detach from the share if any. Avoid others to be blocked. */
482  if (sort_param->read_cache.share)
483  remove_io_thread(&sort_param->read_cache);
484 
485  mysql_mutex_lock(&sort_param->sort_info->mutex);
486  if (!--sort_param->sort_info->threads_running)
487  mysql_cond_signal(&sort_param->sort_info->cond);
488  mysql_mutex_unlock(&sort_param->sort_info->mutex);
489  DBUG_PRINT("exit", ("======== ending thread ========"));
490  }
491  my_thread_end();
492  return NULL;
493 }
494 
495 
496 int thr_write_keys(MI_SORT_PARAM *sort_param)
497 {
498  SORT_INFO *sort_info=sort_param->sort_info;
499  MI_CHECK *param=sort_info->param;
500  ulong UNINIT_VAR(length), keys;
501  ulong *rec_per_key_part=param->rec_per_key_part;
502  int got_error=sort_info->got_error;
503  uint i;
504  MI_INFO *info=sort_info->info;
505  MYISAM_SHARE *share=info->s;
506  MI_SORT_PARAM *sinfo;
507  uchar *mergebuf=0;
508  DBUG_ENTER("thr_write_keys");
509  LINT_INIT(length);
510 
511  for (i= 0, sinfo= sort_param ;
512  i < sort_info->total_keys ;
513  i++, sinfo++)
514  {
515  if (!sinfo->sort_keys)
516  {
517  got_error=1;
518  my_free(mi_get_rec_buff_ptr(info, sinfo->rec_buff));
519  continue;
520  }
521  if (!got_error)
522  {
523  mi_set_key_active(share->state.key_map, sinfo->key);
524  if (!sinfo->buffpek.elements)
525  {
526  if (param->testflag & T_VERBOSE)
527  {
528  printf("Key %d - Dumping %u keys\n",sinfo->key+1, sinfo->keys);
529  fflush(stdout);
530  }
531  if (write_index(sinfo, sinfo->sort_keys, sinfo->keys) ||
532  flush_ft_buf(sinfo) || flush_pending_blocks(sinfo))
533  got_error=1;
534  }
535  }
536  my_free(sinfo->sort_keys);
537  my_free(mi_get_rec_buff_ptr(info, sinfo->rec_buff));
538  sinfo->sort_keys=0;
539  }
540 
541  for (i= 0, sinfo= sort_param ;
542  i < sort_info->total_keys ;
543  i++,
544  delete_dynamic(&sinfo->buffpek),
545  close_cached_file(&sinfo->tempfile),
546  close_cached_file(&sinfo->tempfile_for_exceptions),
547  rec_per_key_part+= sinfo->keyinfo->keysegs, sinfo++)
548  {
549  if (got_error)
550  continue;
551  if (sinfo->keyinfo->flag & HA_VAR_LENGTH_KEY)
552  {
553  sinfo->write_keys=write_keys_varlen;
554  sinfo->read_to_buffer=read_to_buffer_varlen;
555  sinfo->write_key=write_merge_key_varlen;
556  }
557  else
558  {
559  sinfo->write_keys=write_keys;
560  sinfo->read_to_buffer=read_to_buffer;
561  sinfo->write_key=write_merge_key;
562  }
563  if (sinfo->buffpek.elements)
564  {
565  uint maxbuffer=sinfo->buffpek.elements-1;
566  if (!mergebuf)
567  {
568  length=param->sort_buffer_length;
569  while (length >= MIN_SORT_BUFFER)
570  {
571  if ((mergebuf= my_malloc(length, MYF(0))))
572  break;
573  length=length*3/4;
574  }
575  if (!mergebuf)
576  {
577  got_error=1;
578  continue;
579  }
580  }
581  keys=length/sinfo->key_length;
582  if (maxbuffer >= MERGEBUFF2)
583  {
584  if (param->testflag & T_VERBOSE)
585  printf("Key %d - Merging %u keys\n",sinfo->key+1, sinfo->keys);
586  if (merge_many_buff(sinfo, keys, (uchar **)mergebuf,
587  dynamic_element(&sinfo->buffpek, 0, BUFFPEK *),
588  (int*) &maxbuffer, &sinfo->tempfile))
589  {
590  got_error=1;
591  continue;
592  }
593  }
594  if (flush_io_cache(&sinfo->tempfile) ||
595  reinit_io_cache(&sinfo->tempfile,READ_CACHE,0L,0,0))
596  {
597  got_error=1;
598  continue;
599  }
600  if (param->testflag & T_VERBOSE)
601  printf("Key %d - Last merge and dumping keys\n", sinfo->key+1);
602  if (merge_index(sinfo, keys, (uchar **)mergebuf,
603  dynamic_element(&sinfo->buffpek,0,BUFFPEK *),
604  maxbuffer,&sinfo->tempfile) ||
605  flush_ft_buf(sinfo) ||
606  flush_pending_blocks(sinfo))
607  {
608  got_error=1;
609  continue;
610  }
611  }
612  if (my_b_inited(&sinfo->tempfile_for_exceptions))
613  {
614  uint key_length;
615 
616  if (param->testflag & T_VERBOSE)
617  printf("Key %d - Dumping 'long' keys\n", sinfo->key+1);
618 
619  if (flush_io_cache(&sinfo->tempfile_for_exceptions) ||
620  reinit_io_cache(&sinfo->tempfile_for_exceptions,READ_CACHE,0L,0,0))
621  {
622  got_error=1;
623  continue;
624  }
625 
626  while (!got_error &&
627  !my_b_read(&sinfo->tempfile_for_exceptions,(uchar*)&key_length,
628  sizeof(key_length)))
629  {
630  uchar ft_buf[HA_FT_MAXBYTELEN + HA_FT_WLEN + 10];
631  if (key_length > sizeof(ft_buf) ||
632  my_b_read(&sinfo->tempfile_for_exceptions, (uchar*)ft_buf,
633  (uint)key_length) ||
634  _mi_ck_write(info, sinfo->key, (uchar*)ft_buf,
635  key_length - info->s->rec_reflength))
636  got_error=1;
637  }
638  }
639  if (!got_error && param->testflag & T_STATISTICS)
640  update_key_parts(sinfo->keyinfo, rec_per_key_part, sinfo->unique,
641  param->stats_method == MI_STATS_METHOD_IGNORE_NULLS ?
642  sinfo->notnull : NULL,
643  (ulonglong) info->state->records);
644  }
645  my_free(mergebuf);
646  DBUG_RETURN(got_error);
647 }
648 
649  /* Write all keys in memory to file for later merge */
650 
651 static int write_keys(MI_SORT_PARAM *info, register uchar **sort_keys,
652  uint count, BUFFPEK *buffpek, IO_CACHE *tempfile)
653 {
654  uchar **end;
655  uint sort_length=info->key_length;
656  DBUG_ENTER("write_keys");
657 
658  my_qsort2((uchar*) sort_keys,count,sizeof(uchar*),(qsort2_cmp) info->key_cmp,
659  info);
660  if (!my_b_inited(tempfile) &&
661  open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
662  DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
663  DBUG_RETURN(1); /* purecov: inspected */
664 
665  buffpek->file_pos=my_b_tell(tempfile);
666  buffpek->count=count;
667 
668  for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
669  {
670  if (my_b_write(tempfile,(uchar*) *sort_keys,(uint) sort_length))
671  DBUG_RETURN(1); /* purecov: inspected */
672  }
673  DBUG_RETURN(0);
674 } /* write_keys */
675 
676 
677 static inline int
678 my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs)
679 {
680  int err;
681  uint16 len = _mi_keylength(info->keyinfo, (uchar*) bufs);
682 
683  /* The following is safe as this is a local file */
684  if ((err= my_b_write(to_file, (uchar*)&len, sizeof(len))))
685  return (err);
686  if ((err= my_b_write(to_file,bufs, (uint) len)))
687  return (err);
688  return (0);
689 }
690 
691 
692 static int write_keys_varlen(MI_SORT_PARAM *info,
693  register uchar **sort_keys,
694  uint count, BUFFPEK *buffpek,
695  IO_CACHE *tempfile)
696 {
697  uchar **end;
698  int err;
699  DBUG_ENTER("write_keys_varlen");
700 
701  my_qsort2((uchar*) sort_keys,count,sizeof(uchar*),(qsort2_cmp) info->key_cmp,
702  info);
703  if (!my_b_inited(tempfile) &&
704  open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
705  DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
706  DBUG_RETURN(1); /* purecov: inspected */
707 
708  buffpek->file_pos=my_b_tell(tempfile);
709  buffpek->count=count;
710  for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
711  {
712  if ((err= my_var_write(info,tempfile, (uchar*) *sort_keys)))
713  DBUG_RETURN(err);
714  }
715  DBUG_RETURN(0);
716 } /* write_keys_varlen */
717 
718 
719 static int write_key(MI_SORT_PARAM *info, uchar *key, IO_CACHE *tempfile)
720 {
721  uint key_length=info->real_key_length;
722  DBUG_ENTER("write_key");
723 
724  if (!my_b_inited(tempfile) &&
725  open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
726  DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
727  DBUG_RETURN(1);
728 
729  if (my_b_write(tempfile,(uchar*)&key_length,sizeof(key_length)) ||
730  my_b_write(tempfile,(uchar*)key,(uint) key_length))
731  DBUG_RETURN(1);
732  DBUG_RETURN(0);
733 } /* write_key */
734 
735 
736 /* Write index */
737 
738 static int write_index(MI_SORT_PARAM *info, register uchar **sort_keys,
739  register uint count)
740 {
741  DBUG_ENTER("write_index");
742 
743  my_qsort2((uchar*) sort_keys,(size_t) count,sizeof(uchar*),
744  (qsort2_cmp) info->key_cmp,info);
745  while (count--)
746  {
747  if ((*info->key_write)(info,*sort_keys++))
748  DBUG_RETURN(-1); /* purecov: inspected */
749  }
750  DBUG_RETURN(0);
751 } /* write_index */
752 
753 
754  /* Merge buffers to make < MERGEBUFF2 buffers */
755 
756 static int merge_many_buff(MI_SORT_PARAM *info, uint keys,
757  uchar **sort_keys, BUFFPEK *buffpek,
758  int *maxbuffer, IO_CACHE *t_file)
759 {
760  register int i;
761  IO_CACHE t_file2, *from_file, *to_file, *temp;
762  BUFFPEK *lastbuff;
763  DBUG_ENTER("merge_many_buff");
764 
765  if (*maxbuffer < MERGEBUFF2)
766  DBUG_RETURN(0); /* purecov: inspected */
767  if (flush_io_cache(t_file) ||
768  open_cached_file(&t_file2,my_tmpdir(info->tmpdir),"ST",
769  DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
770  DBUG_RETURN(1); /* purecov: inspected */
771 
772  from_file= t_file ; to_file= &t_file2;
773  while (*maxbuffer >= MERGEBUFF2)
774  {
775  reinit_io_cache(from_file,READ_CACHE,0L,0,0);
776  reinit_io_cache(to_file,WRITE_CACHE,0L,0,0);
777  lastbuff=buffpek;
778  for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
779  {
780  if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
781  buffpek+i,buffpek+i+MERGEBUFF-1))
782  goto cleanup;
783  }
784  if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
785  buffpek+i,buffpek+ *maxbuffer))
786  break; /* purecov: inspected */
787  if (flush_io_cache(to_file))
788  break; /* purecov: inspected */
789  temp=from_file; from_file=to_file; to_file=temp;
790  *maxbuffer= (int) (lastbuff-buffpek)-1;
791  }
792 cleanup:
793  close_cached_file(to_file); /* This holds old result */
794  if (to_file == t_file)
795  {
796  DBUG_ASSERT(t_file2.type == WRITE_CACHE);
797  *t_file=t_file2; /* Copy result file */
798  t_file->current_pos= &t_file->write_pos;
799  t_file->current_end= &t_file->write_end;
800  }
801 
802  DBUG_RETURN(*maxbuffer >= MERGEBUFF2); /* Return 1 if interrupted */
803 } /* merge_many_buff */
804 
805 
806 /*
807  Read data to buffer
808 
809  SYNOPSIS
810  read_to_buffer()
811  fromfile File to read from
812  buffpek Where to read from
813  sort_length max length to read
814  RESULT
815  > 0 Ammount of bytes read
816  -1 Error
817 */
818 
819 static uint read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
820  uint sort_length)
821 {
822  register uint count;
823  uint length;
824 
825  if ((count=(uint) MY_MIN((ha_rows) buffpek->max_keys,buffpek->count)))
826  {
827  if (mysql_file_pread(fromfile->file, (uchar*) buffpek->base,
828  (length= sort_length*count),
829  buffpek->file_pos, MYF_RW))
830  return((uint) -1); /* purecov: inspected */
831  buffpek->key=buffpek->base;
832  buffpek->file_pos+= length; /* New filepos */
833  buffpek->count-= count;
834  buffpek->mem_count= count;
835  }
836  return (count*sort_length);
837 } /* read_to_buffer */
838 
839 static uint read_to_buffer_varlen(IO_CACHE *fromfile, BUFFPEK *buffpek,
840  uint sort_length)
841 {
842  register uint count;
843  uint16 length_of_key = 0;
844  uint idx;
845  uchar *buffp;
846 
847  if ((count=(uint) MY_MIN((ha_rows) buffpek->max_keys, buffpek->count)))
848  {
849  buffp = buffpek->base;
850 
851  for (idx=1;idx<=count;idx++)
852  {
853  if (mysql_file_pread(fromfile->file, (uchar*)&length_of_key,
854  sizeof(length_of_key), buffpek->file_pos, MYF_RW))
855  return((uint) -1);
856  buffpek->file_pos+=sizeof(length_of_key);
857  if (mysql_file_pread(fromfile->file, (uchar*) buffp,
858  length_of_key, buffpek->file_pos, MYF_RW))
859  return((uint) -1);
860  buffpek->file_pos+=length_of_key;
861  buffp = buffp + sort_length;
862  }
863  buffpek->key=buffpek->base;
864  buffpek->count-= count;
865  buffpek->mem_count= count;
866  }
867  return (count*sort_length);
868 } /* read_to_buffer_varlen */
869 
870 
871 static int write_merge_key_varlen(MI_SORT_PARAM *info,
872  IO_CACHE *to_file, uchar* key,
873  uint sort_length, uint count)
874 {
875  uint idx;
876  uchar *bufs = key;
877 
878  for (idx=1;idx<=count;idx++)
879  {
880  int err;
881  if ((err= my_var_write(info, to_file, bufs)))
882  return (err);
883  bufs=bufs+sort_length;
884  }
885  return(0);
886 }
887 
888 
889 static int write_merge_key(MI_SORT_PARAM *info __attribute__((unused)),
890  IO_CACHE *to_file, uchar *key,
891  uint sort_length, uint count)
892 {
893  return my_b_write(to_file, key, (size_t) sort_length*count);
894 }
895 
896 /*
897  Merge buffers to one buffer
898  If to_file == 0 then use info->key_write
899 */
900 
901 static int
902 merge_buffers(MI_SORT_PARAM *info, uint keys, IO_CACHE *from_file,
903  IO_CACHE *to_file, uchar **sort_keys, BUFFPEK *lastbuff,
904  BUFFPEK *Fb, BUFFPEK *Tb)
905 {
906  int error;
907  uint sort_length,maxcount;
908  ha_rows count;
909  my_off_t UNINIT_VAR(to_start_filepos);
910  uchar *strpos;
911  BUFFPEK *buffpek,**refpek;
912  QUEUE queue;
913  volatile int *killed= killed_ptr(info->sort_info->param);
914  DBUG_ENTER("merge_buffers");
915 
916  count=error=0;
917  maxcount=keys/((uint) (Tb-Fb) +1);
918  DBUG_ASSERT(maxcount > 0);
919  LINT_INIT(to_start_filepos);
920  if (to_file)
921  to_start_filepos=my_b_tell(to_file);
922  strpos=(uchar*) sort_keys;
923  sort_length=info->key_length;
924 
925  if (init_queue(&queue,(uint) (Tb-Fb)+1,offsetof(BUFFPEK,key),0,
926  (int (*)(void*, uchar *,uchar*)) info->key_cmp,
927  (void*) info))
928  DBUG_RETURN(1); /* purecov: inspected */
929 
930  for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
931  {
932  count+= buffpek->count;
933  buffpek->base= strpos;
934  buffpek->max_keys=maxcount;
935  strpos+= (uint) (error=(int) info->read_to_buffer(from_file,buffpek,
936  sort_length));
937  if (error == -1)
938  goto err; /* purecov: inspected */
939  queue_insert(&queue,(uchar*) buffpek);
940  }
941 
942  while (queue.elements > 1)
943  {
944  for (;;)
945  {
946  if (*killed)
947  {
948  error=1; goto err;
949  }
950  buffpek=(BUFFPEK*) queue_top(&queue);
951  if (to_file)
952  {
953  if (info->write_key(info,to_file,(uchar*) buffpek->key,
954  (uint) sort_length,1))
955  {
956  error=1; goto err; /* purecov: inspected */
957  }
958  }
959  else
960  {
961  if ((*info->key_write)(info,(void*) buffpek->key))
962  {
963  error=1; goto err; /* purecov: inspected */
964  }
965  }
966  buffpek->key+=sort_length;
967  if (! --buffpek->mem_count)
968  {
969  if (!(error=(int) info->read_to_buffer(from_file,buffpek,sort_length)))
970  {
971  uchar *base=buffpek->base;
972  uint max_keys=buffpek->max_keys;
973 
974  (void) queue_remove(&queue,0);
975 
976  /* Put room used by buffer to use in other buffer */
977  for (refpek= (BUFFPEK**) &queue_top(&queue);
978  refpek <= (BUFFPEK**) &queue_end(&queue);
979  refpek++)
980  {
981  buffpek= *refpek;
982  if (buffpek->base+buffpek->max_keys*sort_length == base)
983  {
984  buffpek->max_keys+=max_keys;
985  break;
986  }
987  else if (base+max_keys*sort_length == buffpek->base)
988  {
989  buffpek->base=base;
990  buffpek->max_keys+=max_keys;
991  break;
992  }
993  }
994  break; /* One buffer have been removed */
995  }
996  }
997  else if (error == -1)
998  goto err; /* purecov: inspected */
999  queue_replaced(&queue); /* Top element has been replaced */
1000  }
1001  }
1002  buffpek=(BUFFPEK*) queue_top(&queue);
1003  buffpek->base=(uchar *) sort_keys;
1004  buffpek->max_keys=keys;
1005  do
1006  {
1007  if (to_file)
1008  {
1009  if (info->write_key(info,to_file,(uchar*) buffpek->key,
1010  sort_length,buffpek->mem_count))
1011  {
1012  error=1; goto err; /* purecov: inspected */
1013  }
1014  }
1015  else
1016  {
1017  register uchar *end;
1018  strpos= buffpek->key;
1019  for (end=strpos+buffpek->mem_count*sort_length;
1020  strpos != end ;
1021  strpos+=sort_length)
1022  {
1023  if ((*info->key_write)(info,(void*) strpos))
1024  {
1025  error=1; goto err; /* purecov: inspected */
1026  }
1027  }
1028  }
1029  }
1030  while ((error=(int) info->read_to_buffer(from_file,buffpek,sort_length)) != -1 &&
1031  error != 0);
1032 
1033  lastbuff->count=count;
1034  if (to_file)
1035  lastbuff->file_pos=to_start_filepos;
1036 err:
1037  delete_queue(&queue);
1038  DBUG_RETURN(error);
1039 } /* merge_buffers */
1040 
1041 
1042  /* Do a merge to output-file (save only positions) */
1043 
1044 static int
1045 merge_index(MI_SORT_PARAM *info, uint keys, uchar **sort_keys,
1046  BUFFPEK *buffpek, int maxbuffer, IO_CACHE *tempfile)
1047 {
1048  DBUG_ENTER("merge_index");
1049  if (merge_buffers(info,keys,tempfile,(IO_CACHE*) 0,sort_keys,buffpek,buffpek,
1050  buffpek+maxbuffer))
1051  DBUG_RETURN(1); /* purecov: inspected */
1052  DBUG_RETURN(0);
1053 } /* merge_index */
1054 
1055 static int
1056 flush_ft_buf(MI_SORT_PARAM *info)
1057 {
1058  int err=0;
1059  if (info->sort_info->ft_buf)
1060  {
1061  err=sort_ft_buf_flush(info);
1062  my_free(info->sort_info->ft_buf);
1063  info->sort_info->ft_buf=0;
1064  }
1065  return err;
1066 }
1067