MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
uniques.cc
1 /* Copyright (c) 2001, 2011, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15 
16 /*
17  Function to handle quick removal of duplicates
18  This code is used when doing multi-table deletes to find the rows in
19  reference tables that needs to be deleted.
20 
21  The basic idea is as follows:
22 
23  Store first all strings in a binary tree, ignoring duplicates.
24  When the tree uses more memory than 'max_heap_table_size',
25  write the tree (in sorted order) out to disk and start with a new tree.
26  When all data has been generated, merge the trees (removing any found
27  duplicates).
28 
29  The unique entries will be returned in sort order, to ensure that we do the
30  deletes in disk order.
31 */
32 
33 #include "sql_priv.h"
34 #include "unireg.h"
35 #include "sql_sort.h"
36 #include "queues.h" // QUEUE
37 #include "my_tree.h" // element_count
38 #include "sql_class.h" // Unique
39 
40 int unique_write_to_file(uchar* key, element_count count, Unique *unique)
41 {
42  /*
43  Use unique->size (size of element stored in the tree) and not
44  unique->tree.size_of_element. The latter is different from unique->size
45  when tree implementation chooses to store pointer to key in TREE_ELEMENT
46  (instead of storing the element itself there)
47  */
48  return my_b_write(&unique->file, key, unique->size) ? 1 : 0;
49 }
50 
51 int unique_write_to_ptrs(uchar* key, element_count count, Unique *unique)
52 {
53  memcpy(unique->record_pointers, key, unique->size);
54  unique->record_pointers+=unique->size;
55  return 0;
56 }
57 
/*
  Construct a Unique that accepts elements of 'size_arg' bytes, compared
  with comp_func(comp_func_fixed_arg, a, b), and that is allowed to use
  at most max_in_memory_size_arg bytes of memory before spilling sorted
  trees to a temporary file.
*/
Unique::Unique(qsort_cmp2 comp_func, void * comp_func_fixed_arg,
               uint size_arg, ulonglong max_in_memory_size_arg)
  :max_in_memory_size(max_in_memory_size_arg),
  record_pointers(NULL),
  size(size_arg),
  elements(0)
{
  /* Mark the IO_CACHE as unused; it is opened at the end of the ctor. */
  my_b_clear(&file);
  /* In-memory tree of unique elements; comp_func decides equality. */
  init_tree(&tree, (ulong) (max_in_memory_size / 16), 0, size, comp_func, 0,
            NULL, comp_func_fixed_arg);
  /* If the following fails, the next add will also fail */
  my_init_dynamic_array(&file_ptrs, sizeof(BUFFPEK), 16, 16);
  /*
    If you change the following, change it in get_max_elements function, too.
  */
  max_elements= (ulong) (max_in_memory_size /
                         ALIGN_SIZE(sizeof(TREE_ELEMENT)+size));
  /* Temporary file used when the tree exceeds max_in_memory_size. */
  (void) open_cached_file(&file, mysql_tmpdir,TEMP_PREFIX, DISK_BUFFER_SIZE,
                          MYF(MY_WME));
}
78 
79 
80 /*
81  Calculate log2(n!)
82 
83  NOTES
84  Stirling's approximate formula is used:
85 
86  n! ~= sqrt(2*M_PI*n) * (n/M_E)^n
87 
88  Derivation of formula used for calculations is as follows:
89 
90  log2(n!) = log(n!)/log(2) = log(sqrt(2*M_PI*n)*(n/M_E)^n) / log(2) =
91 
92  = (log(2*M_PI*n)/2 + n*log(n/M_E)) / log(2).
93 */
94 
/*
  Approximate log2(x!) via Stirling's formula:

    x! ~= sqrt(2*M_PI*x) * (x/M_E)^x

  so

    log2(x!) = (log(2*M_PI*x)/2 + x*log(x/M_E)) / log(2).
*/
inline double log2_n_fact(double x)
{
  const double ln_fact= log(2*M_PI*x)/2 + x*log(x/M_E);
  return ln_fact / M_LN2;
}
99 
100 
101 /*
102  Calculate cost of merge_buffers function call for given sequence of
103  input stream lengths and store the number of rows in result stream in *last.
104 
105  SYNOPSIS
106  get_merge_buffers_cost()
107  buff_elems Array of #s of elements in buffers
108  elem_size Size of element stored in buffer
109  first Pointer to first merged element size
110  last Pointer to last merged element size
111 
112  RETURN
113  Cost of merge_buffers operation in disk seeks.
114 
115  NOTES
116  It is assumed that no rows are eliminated during merge.
117  The cost is calculated as
118 
119  cost(read_and_write) + cost(merge_comparisons).
120 
121  All bytes in the sequences is read and written back during merge so cost
122  of disk io is 2*elem_size*total_buf_elems/IO_SIZE (2 is for read + write)
123 
124  For comparisons cost calculations we assume that all merged sequences have
125  the same length, so each of total_buf_size elements will be added to a sort
126  heap with (n_buffers-1) elements. This gives the comparison cost:
127 
128  total_buf_elems * log2(n_buffers) * ROWID_COMPARE_COST;
129 */
130 
131 static double get_merge_buffers_cost(uint *buff_elems, uint elem_size,
132  uint *first, uint *last)
133 {
134  uint total_buf_elems= 0;
135  for (uint *pbuf= first; pbuf <= last; pbuf++)
136  total_buf_elems+= *pbuf;
137  *last= total_buf_elems;
138 
139  size_t n_buffers= last - first + 1;
140 
141  /* Using log2(n)=log(n)/log(2) formula */
142  return 2*((double)total_buf_elems*elem_size) / IO_SIZE +
143  total_buf_elems*log((double) n_buffers) * ROWID_COMPARE_COST / M_LN2;
144 }
145 
146 
147 /*
148  Calculate cost of merging buffers into one in Unique::get, i.e. calculate
149  how long (in terms of disk seeks) the two calls
150  merge_many_buffs(...);
151  merge_buffers(...);
152  will take.
153 
154  SYNOPSIS
155  get_merge_many_buffs_cost()
156  buffer buffer space for temporary data, at least
157  Unique::get_cost_calc_buff_size bytes
158  maxbuffer # of full buffers
159  max_n_elems # of elements in first maxbuffer buffers
160  last_n_elems # of elements in last buffer
161  elem_size size of buffer element
162 
163  NOTES
164  maxbuffer+1 buffers are merged, where first maxbuffer buffers contain
165  max_n_elems elements each and last buffer contains last_n_elems elements.
166 
167  The current implementation does a dumb simulation of merge_many_buffs
168  function actions.
169 
170  RETURN
171  Cost of merge in disk seeks.
172 */
173 
174 static double get_merge_many_buffs_cost(uint *buffer,
175  uint maxbuffer, uint max_n_elems,
176  uint last_n_elems, int elem_size)
177 {
178  register int i;
179  double total_cost= 0.0;
180  uint *buff_elems= buffer; /* #s of elements in each of merged sequences */
181 
182  /*
183  Set initial state: first maxbuffer sequences contain max_n_elems elements
184  each, last sequence contains last_n_elems elements.
185  */
186  for (i = 0; i < (int)maxbuffer; i++)
187  buff_elems[i]= max_n_elems;
188  buff_elems[maxbuffer]= last_n_elems;
189 
190  /*
191  Do it exactly as merge_many_buff function does, calling
192  get_merge_buffers_cost to get cost of merge_buffers.
193  */
194  if (maxbuffer >= MERGEBUFF2)
195  {
196  while (maxbuffer >= MERGEBUFF2)
197  {
198  uint lastbuff= 0;
199  for (i = 0; i <= (int) maxbuffer - MERGEBUFF*3/2; i += MERGEBUFF)
200  {
201  total_cost+=get_merge_buffers_cost(buff_elems, elem_size,
202  buff_elems + i,
203  buff_elems + i + MERGEBUFF-1);
204  lastbuff++;
205  }
206  total_cost+=get_merge_buffers_cost(buff_elems, elem_size,
207  buff_elems + i,
208  buff_elems + maxbuffer);
209  maxbuffer= lastbuff;
210  }
211  }
212 
213  /* Simulate final merge_buff call. */
214  total_cost += get_merge_buffers_cost(buff_elems, elem_size,
215  buff_elems, buff_elems + maxbuffer);
216  return total_cost;
217 }
218 
219 
220 /*
221  Calculate cost of using Unique for processing nkeys elements of size
222  key_size using max_in_memory_size memory.
223 
224  SYNOPSIS
225  Unique::get_use_cost()
226  buffer space for temporary data, use Unique::get_cost_calc_buff_size
227  to get # bytes needed.
228  nkeys #of elements in Unique
229  key_size size of each elements in bytes
230  max_in_memory_size amount of memory Unique will be allowed to use
231 
232  RETURN
233  Cost in disk seeks.
234 
235  NOTES
236  cost(using_unique) =
237  cost(create_trees) + (see #1)
238  cost(merge) + (see #2)
239  cost(read_result) (see #3)
240 
241  1. Cost of trees creation
242  For each Unique::put operation there will be 2*log2(n+1) elements
243  comparisons, where n runs from 1 to tree_size (we assume that all added
244  elements are different). Together this gives:
245 
246  n_compares = 2*(log2(2) + log2(3) + ... + log2(N+1)) = 2*log2((N+1)!)
247 
248  then cost(tree_creation) = n_compares*ROWID_COMPARE_COST;
249 
250  Total cost of creating trees:
251  (n_trees - 1)*max_size_tree_cost + non_max_size_tree_cost.
252 
253  Approximate value of log2(N!) is calculated by log2_n_fact function.
254 
255  2. Cost of merging.
256  If only one tree is created by Unique no merging will be necessary.
257  Otherwise, we model execution of merge_many_buff function and count
258  #of merges. (The reason behind this is that number of buffers is small,
259  while size of buffers is big and we don't want to lose precision with
260  O(x)-style formula)
261 
262  3. If only one tree is created by Unique no disk io will happen.
263  Otherwise, ceil(key_len*n_keys) disk seeks are necessary. We assume
264  these will be random seeks.
265 */
266 
/*
  Estimate (in disk seeks) the cost of pushing nkeys elements of
  key_size bytes through a Unique limited to max_in_memory_size bytes.
  See the NOTES block above for the model (#1 tree creation,
  #2 merge, #3 reading the result).
*/
double Unique::get_use_cost(uint *buffer, uint nkeys, uint key_size,
                            ulonglong max_in_memory_size)
{
  ulong max_elements_in_tree;   /* elements that fit in one in-memory tree */
  ulong last_tree_elems;        /* elements in the last (partial) tree */
  int n_full_trees; /* number of trees in unique - 1 */

  max_elements_in_tree= ((ulong) max_in_memory_size /
                         ALIGN_SIZE(sizeof(TREE_ELEMENT)+key_size));

  n_full_trees=    nkeys / max_elements_in_tree;
  last_tree_elems= nkeys % max_elements_in_tree;

  /* Calculate cost of creating trees: 2*log2(N!) comparisons per tree (#1) */
  double result= 2*log2_n_fact(last_tree_elems + 1.0);
  if (n_full_trees)
    result+= n_full_trees * log2_n_fact(max_elements_in_tree + 1.0);
  result*= ROWID_COMPARE_COST;

  DBUG_PRINT("info",("unique trees sizes: %u=%u*%lu + %lu", nkeys,
                     n_full_trees, n_full_trees?max_elements_in_tree:0,
                     last_tree_elems));

  /* Everything fits in one in-memory tree: no disk access at all. */
  if (!n_full_trees)
    return result;

  /*
    There is more than one tree and merging is necessary.
    First, add cost of writing all trees to disk, assuming that all disk
    writes are sequential.
  */
  result += DISK_SEEK_BASE_COST * n_full_trees *
            ceil(((double) key_size)*max_elements_in_tree / IO_SIZE);
  result += DISK_SEEK_BASE_COST * ceil(((double) key_size)*last_tree_elems / IO_SIZE);

  /* Cost of merge (#2): simulated by get_merge_many_buffs_cost */
  double merge_cost= get_merge_many_buffs_cost(buffer, n_full_trees,
                                               max_elements_in_tree,
                                               last_tree_elems, key_size);
  if (merge_cost < 0.0)
    return merge_cost;

  result += merge_cost;
  /*
    Add cost of reading the resulting sequence (#3), assuming there were no
    duplicate elements.
  */
  result += ceil((double)key_size*nkeys/IO_SIZE);

  return result;
}
318 
/* Release the temporary file, the in-memory tree and the BUFFPEK array. */
Unique::~Unique()
{
  close_cached_file(&file);
  delete_tree(&tree);
  delete_dynamic(&file_ptrs);
}
325 
326 
327  /* Write tree to disk; clear tree */
/*
  Write the current in-memory tree to 'file' in sorted order and clear
  the tree.  A BUFFPEK (row count + file offset of this run) is appended
  to file_ptrs so the runs can be merged later.

  RETURN  0 ok, 1 on write/alloc error.
*/
bool Unique::flush()
{
  BUFFPEK file_ptr;
  /* Account for the rows about to move from the tree to disk. */
  elements+= tree.elements_in_tree;
  file_ptr.count=tree.elements_in_tree;
  file_ptr.file_pos=my_b_tell(&file);

  if (tree_walk(&tree, (tree_walk_action) unique_write_to_file,
                (void*) this, left_root_right) ||
      insert_dynamic(&file_ptrs, &file_ptr))
    return 1;
  delete_tree(&tree);
  return 0;
}
342 
343 
344 /*
345  Clear the tree and the file.
346  You must call reset() if you want to reuse Unique after walk().
347 */
348 
/*
  Clear the tree and the file so the Unique can be reused.
  Must be called before reusing a Unique after walk().
*/
void
Unique::reset()
{
  reset_tree(&tree);
  /*
    If elements != 0, some trees were stored in the file (see how
    flush() works). Note, that we can not count on my_b_tell(&file) == 0
    here, because it can return 0 right after walk(), and walk() does not
    reset any Unique member.
  */
  if (elements)
  {
    reset_dynamic(&file_ptrs);
    /* Reopen the cache for writing, truncating previous contents. */
    reinit_io_cache(&file, WRITE_CACHE, 0L, 0, 1);
  }
  elements= 0;
}
366 
367 /*
368  The comparison function, passed to queue_init() in merge_walk() and in
369  merge_buffers() when the latter is called from Unique::get() must
370  use comparison function of Unique::tree, but compare members of struct
371  BUFFPEK.
372 */
373 
374 C_MODE_START
375 
376 static int buffpek_compare(void *arg, uchar *key_ptr1, uchar *key_ptr2)
377 {
379  return ctx->key_compare(ctx->key_compare_arg,
380  *((uchar **) key_ptr1), *((uchar **)key_ptr2));
381 }
382 
383 C_MODE_END
384 
385 
386 /*
387  DESCRIPTION
388 
389  Function is very similar to merge_buffers, but instead of writing sorted
390  unique keys to the output file, it invokes walk_action for each key.
391  This saves I/O if you need to pass through all unique keys only once.
392 
393  SYNOPSIS
394  merge_walk()
395  All params are 'IN' (but see comment for begin, end):
396  merge_buffer buffer to perform cached piece-by-piece loading
397  of trees; initially the buffer is empty
398  merge_buffer_size size of merge_buffer. Must be aligned with
399  key_length
400  key_length size of tree element; key_length * (end - begin)
401  must be less or equal than merge_buffer_size.
402  begin pointer to BUFFPEK struct for the first tree.
403  end pointer to BUFFPEK struct for the last tree;
404  end > begin and [begin, end) form a consecutive
405  range. BUFFPEKs structs in that range are used and
406  overwritten in merge_walk().
407  walk_action element visitor. Action is called for each unique
408  key.
409  walk_action_arg argument to walk action. Passed to it on each call.
410  compare elements comparison function
411  compare_arg comparison function argument
412  file file with all trees dumped. Trees in the file
413  must contain sorted unique values. Cache must be
414  initialized in read mode.
415  RETURN VALUE
416  0 ok
417  <> 0 error
418 */
419 
/*
  Merge the sorted runs described by BUFFPEKs in [begin, end), loading
  them piece-by-piece into merge_buffer, and invoke walk_action once for
  each unique key (see the DESCRIPTION/SYNOPSIS block above).

  RETURN  0 ok, non-zero on read error or if walk_action requested stop.
*/
static bool merge_walk(uchar *merge_buffer, ulong merge_buffer_size,
                       uint key_length, BUFFPEK *begin, BUFFPEK *end,
                       tree_walk_action walk_action, void *walk_action_arg,
                       qsort_cmp2 compare, const void *compare_arg,
                       IO_CACHE *file)
{
  BUFFPEK_COMPARE_CONTEXT compare_context = { compare, compare_arg };
  QUEUE queue;
  /* Reject degenerate input and buffers too small for one key per tree. */
  if (end <= begin ||
      merge_buffer_size < (ulong) (key_length * (end - begin + 1)) ||
      init_queue(&queue, (uint) (end - begin), offsetof(BUFFPEK, key), 0,
                 buffpek_compare, &compare_context))
    return 1;
  /* we need space for one key when a piece of merge buffer is re-read */
  merge_buffer_size-= key_length;
  uchar *save_key_buff= merge_buffer + merge_buffer_size;
  /* Split the remaining buffer evenly between the trees being merged. */
  uint max_key_count_per_piece= (uint) (merge_buffer_size/(end-begin) /
                                        key_length);
  /* if piece_size is aligned reuse_freed_buffer will always hit */
  uint piece_size= max_key_count_per_piece * key_length;
  uint bytes_read;               /* to hold return value of read_to_buffer */
  BUFFPEK *top;
  int res= 1;
  /*
    Invariant: queue must contain top element from each tree, until a tree
    is not completely walked through.
    Here we're forcing the invariant, inserting one element from each tree
    to the queue.
  */
  for (top= begin; top != end; ++top)
  {
    top->base= merge_buffer + (top - begin) * piece_size;
    top->max_keys= max_key_count_per_piece;
    bytes_read= read_to_buffer(file, top, key_length);
    if (bytes_read == (uint) (-1))
      goto end;
    DBUG_ASSERT(bytes_read);
    queue_insert(&queue, (uchar *) top);
  }
  top= (BUFFPEK *) queue_top(&queue);
  while (queue.elements > 1)
  {
    /*
      Every iteration one element is removed from the queue, and one is
      inserted by the rules of the invariant. If two adjacent elements on
      the top of the queue are not equal, biggest one is unique, because all
      elements in each tree are unique. Action is applied only to unique
      elements.
    */
    void *old_key= top->key;
    /*
      read next key from the cache or from the file and push it to the
      queue; this gives new top.
    */
    top->key+= key_length;
    if (--top->mem_count)
      queue_replaced(&queue);
    else /* next piece should be read */
    {
      /* save old_key not to overwrite it in read_to_buffer */
      memcpy(save_key_buff, old_key, key_length);
      old_key= save_key_buff;
      bytes_read= read_to_buffer(file, top, key_length);
      if (bytes_read == (uint) (-1))
        goto end;
      else if (bytes_read > 0)      /* top->key, top->mem_count are reset */
        queue_replaced(&queue);     /* in read_to_buffer */
      else
      {
        /*
          Tree for old 'top' element is empty: remove it from the queue and
          give all its memory to the nearest tree.
        */
        queue_remove(&queue, 0);
        reuse_freed_buff(&queue, top, key_length);
      }
    }
    top= (BUFFPEK *) queue_top(&queue);
    /* new top has been obtained; if old top is unique, apply the action */
    if (compare(compare_arg, old_key, top->key))
    {
      if (walk_action(old_key, 1, walk_action_arg))
        goto end;
    }
  }
  /*
    Applying walk_action to the tail of the last tree: this is safe because
    either we had only one tree in the beginning, either we work with the
    last tree in the queue.
  */
  do
  {
    do
    {
      if (walk_action(top->key, 1, walk_action_arg))
        goto end;
      top->key+= key_length;
    }
    while (--top->mem_count);
    bytes_read= read_to_buffer(file, top, key_length);
    if (bytes_read == (uint) (-1))
      goto end;
  }
  while (bytes_read);
  res= 0;
end:
  /* Single cleanup point for all exit paths above. */
  delete_queue(&queue);
  return res;
}
529 
530 
531 /*
532  DESCRIPTION
533  Walks consecutively through all unique elements:
534  if all elements are in memory, then it simply invokes 'tree_walk', else
535  all flushed trees are loaded to memory piece-by-piece, pieces are
536  sorted, and action is called for each unique value.
537  Note: so as merging resets file_ptrs state, this method can change
538  internal Unique state to undefined: if you want to reuse Unique after
539  walk() you must call reset() first!
540  SYNOPSIS
541  Unique:walk()
542  All params are 'IN':
543  action function-visitor, typed in include/my_tree.h
544  function is called for each unique element
545  arg argument for visitor, which is passed to it on each call
546  RETURN VALUE
547  0 OK
548  <> 0 error
549  */
550 
/*
  Visit every unique element once (see DESCRIPTION above): directly via
  tree_walk when nothing was flushed, otherwise by merging the on-disk
  runs with merge_walk().  Leaves the Unique in an undefined state —
  call reset() before reusing it.

  RETURN  0 ok, non-zero on error.
*/
bool Unique::walk(tree_walk_action action, void *walk_action_arg)
{
  int res;
  uchar *merge_buffer;

  if (elements == 0)                       /* the whole tree is in memory */
    return tree_walk(&tree, action, walk_action_arg, left_root_right);

  /* flush current tree to the file to have some memory for merge buffer */
  if (flush())
    return 1;
  /* Switch the temporary file from write to read mode for merging. */
  if (flush_io_cache(&file) || reinit_io_cache(&file, READ_CACHE, 0L, 0, 0))
    return 1;
  if (!(merge_buffer= (uchar *) my_malloc((ulong) max_in_memory_size, MYF(0))))
    return 1;
  res= merge_walk(merge_buffer, (ulong) max_in_memory_size, size,
                  (BUFFPEK *) file_ptrs.buffer,
                  (BUFFPEK *) file_ptrs.buffer + file_ptrs.elements,
                  action, walk_action_arg,
                  tree.compare, tree.custom_arg, &file);
  my_free(merge_buffer);
  return res;
}
574 
575 /*
576  Modify the TABLE element so that when one calls init_records()
577  the rows will be read in priority order.
578 */
579 
/*
  Materialize all unique elements for 'table' in sorted order: either as
  an in-memory array (table->sort.record_pointers) when nothing was ever
  flushed, or by merging all on-disk runs into table->sort.io_cache.

  RETURN  0 ok, 1 on error.
*/
bool Unique::get(TABLE *table)
{
  /* Total unique rows: already flushed to disk plus still in the tree. */
  table->sort.found_records=elements+tree.elements_in_tree;

  if (my_b_tell(&file) == 0)
  {
    /* Whole tree is in memory;  Don't use disk if you don't need to */
    DBUG_ASSERT(table->sort.record_pointers == NULL);
    if ((record_pointers=table->sort.record_pointers= (uchar*)
         my_malloc(size * tree.elements_in_tree, MYF(0))))
    {
      (void) tree_walk(&tree, (tree_walk_action) unique_write_to_ptrs,
                       this, left_root_right);
      return 0;
    }
  }
  /* Not enough memory; Save the result to file && free memory used by tree */
  if (flush())
    return 1;

  IO_CACHE *outfile=table->sort.io_cache;
  BUFFPEK *file_ptr= (BUFFPEK*) file_ptrs.buffer;
  uint maxbuffer= file_ptrs.elements - 1;
  uchar *sort_buffer;
  my_off_t save_pos;
  bool error=1;

  /* Open cached file if it isn't open */
  DBUG_ASSERT(table->sort.io_cache == NULL);
  outfile=table->sort.io_cache=(IO_CACHE*) my_malloc(sizeof(IO_CACHE),
                                                     MYF(MY_ZEROFILL));

  if (!outfile || (! my_b_inited(outfile) &&
      open_cached_file(outfile,mysql_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER,
                       MYF(MY_WME))))
    return 1;
  reinit_io_cache(outfile,WRITE_CACHE,0L,0,0);

  /* Merge parameters: fixed-size rows of 'size' bytes, no kill checks. */
  Sort_param sort_param;
  sort_param.max_rows= elements;
  sort_param.sort_form=table;
  sort_param.rec_length= sort_param.sort_length= sort_param.ref_length= size;
  sort_param.max_keys_per_buffer=
    (uint) (max_in_memory_size / sort_param.sort_length);
  sort_param.not_killable=1;

  /* One extra key slot at the end is used for duplicate elimination. */
  if (!(sort_buffer=(uchar*) my_malloc((sort_param.max_keys_per_buffer + 1) *
                                       sort_param.sort_length,
                                       MYF(0))))
    return 1;
  sort_param.unique_buff= sort_buffer+(sort_param.max_keys_per_buffer *
                                       sort_param.sort_length);

  sort_param.compare= (qsort2_cmp) buffpek_compare;
  sort_param.cmp_context.key_compare= tree.compare;
  sort_param.cmp_context.key_compare_arg= tree.custom_arg;

  /* Merge the buffers to one file, removing duplicates */
  if (merge_many_buff(&sort_param,sort_buffer,file_ptr,&maxbuffer,&file))
    goto err;
  if (flush_io_cache(&file) ||
      reinit_io_cache(&file,READ_CACHE,0L,0,0))
    goto err;
  if (merge_buffers(&sort_param, &file, outfile, sort_buffer, file_ptr,
                    file_ptr, file_ptr+maxbuffer,0))
    goto err;
  error=0;
err:
  my_free(sort_buffer);
  if (flush_io_cache(outfile))
    error=1;

  /* Setup io_cache for reading */
  save_pos=outfile->pos_in_file;
  if (reinit_io_cache(outfile,READ_CACHE,0L,0,0))
    error=1;
  outfile->end_of_file=save_pos;
  return error;
}
658 }