MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
mi_write.c
1 /*
2  Copyright (c) 2000, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
16 
17 /* Write a row to a MyISAM table */
18 
19 #include "fulltext.h"
20 #include "rt_index.h"
21 
22 #define MAX_POINTER_LENGTH 8
23 
24  /* Functions declared in this file */
25 
26 static int w_search(MI_INFO *info,MI_KEYDEF *keyinfo,
27  uint comp_flag, uchar *key,
28  uint key_length, my_off_t pos, uchar *father_buff,
29  uchar *father_keypos, my_off_t father_page,
30  my_bool insert_last);
31 static int _mi_balance_page(MI_INFO *info,MI_KEYDEF *keyinfo,uchar *key,
32  uchar *curr_buff,uchar *father_buff,
33  uchar *father_keypos,my_off_t father_page);
34 static uchar *_mi_find_last_pos(MI_KEYDEF *keyinfo, uchar *page,
35  uchar *key, uint *return_key_length,
36  uchar **after_key);
37 int _mi_ck_write_tree(register MI_INFO *info, uint keynr,uchar *key,
38  uint key_length);
39 int _mi_ck_write_btree(register MI_INFO *info, uint keynr,uchar *key,
40  uint key_length);
41 
42  /* Write new record to database */
43 
44 int mi_write(MI_INFO *info, uchar *record)
45 {
46  MYISAM_SHARE *share=info->s;
47  uint i;
48  int save_errno;
49  my_off_t filepos;
50  uchar *buff;
51  my_bool lock_tree= share->concurrent_insert;
52  DBUG_ENTER("mi_write");
53  DBUG_PRINT("enter",("isam: %d data: %d",info->s->kfile,info->dfile));
54 
55  DBUG_EXECUTE_IF("myisam_pretend_crashed_table_on_usage",
56  mi_print_error(info->s, HA_ERR_CRASHED);
57  DBUG_RETURN(my_errno= HA_ERR_CRASHED););
58  if (share->options & HA_OPTION_READ_ONLY_DATA)
59  {
60  DBUG_RETURN(my_errno=EACCES);
61  }
62  if (_mi_readinfo(info,F_WRLCK,1))
63  DBUG_RETURN(my_errno);
64 
65  filepos= ((share->state.dellink != HA_OFFSET_ERROR &&
66  !info->append_insert_at_end) ?
67  share->state.dellink :
68  info->state->data_file_length);
69 
70  if (share->base.reloc == (ha_rows) 1 &&
71  share->base.records == (ha_rows) 1 &&
72  info->state->records == (ha_rows) 1)
73  { /* System file */
74  my_errno=HA_ERR_RECORD_FILE_FULL;
75  goto err2;
76  }
77  if (info->state->key_file_length >= share->base.margin_key_file_length)
78  {
79  my_errno=HA_ERR_INDEX_FILE_FULL;
80  goto err2;
81  }
82  if (_mi_mark_file_changed(info))
83  goto err2;
84 
85  /* Calculate and check all unique constraints */
86  if (mi_is_any_key_active(share->state.key_map))
87  {
88  for (i= 0 ; i < share->state.header.uniques ; i++)
89  {
90  if (mi_check_unique(info, share->uniqueinfo + i, record,
91  mi_unique_hash(share->uniqueinfo + i, record),
92  HA_OFFSET_ERROR))
93  goto err2;
94  }
95  }
96 
97  /* Write all keys to indextree */
98 
99  buff=info->lastkey2;
100  for (i=0 ; i < share->base.keys ; i++)
101  {
102  if (mi_is_key_active(share->state.key_map, i))
103  {
104  my_bool local_lock_tree= (lock_tree &&
105  !(info->bulk_insert &&
106  is_tree_inited(&info->bulk_insert[i])));
107  if (local_lock_tree)
108  {
109  mysql_rwlock_wrlock(&share->key_root_lock[i]);
110  share->keyinfo[i].version++;
111  }
112  if (share->keyinfo[i].flag & HA_FULLTEXT )
113  {
114  if (_mi_ft_add(info,i, buff, record, filepos))
115  {
116  if (local_lock_tree)
117  mysql_rwlock_unlock(&share->key_root_lock[i]);
118  DBUG_PRINT("error",("Got error: %d on write",my_errno));
119  goto err;
120  }
121  }
122  else
123  {
124  if (share->keyinfo[i].ck_insert(info,i,buff,
125  _mi_make_key(info,i,buff,record,filepos)))
126  {
127  if (local_lock_tree)
128  mysql_rwlock_unlock(&share->key_root_lock[i]);
129  DBUG_PRINT("error",("Got error: %d on write",my_errno));
130  goto err;
131  }
132  }
133 
134  /* The above changed info->lastkey2. Inform mi_rnext_same(). */
135  info->update&= ~HA_STATE_RNEXT_SAME;
136 
137  if (local_lock_tree)
138  mysql_rwlock_unlock(&share->key_root_lock[i]);
139  }
140  }
141  if (share->calc_checksum)
142  info->checksum=(*share->calc_checksum)(info,record);
143  if (!(info->opt_flag & OPT_NO_ROWS))
144  {
145  if ((*share->write_record)(info,record))
146  goto err;
147  info->state->checksum+=info->checksum;
148  }
149  if (share->base.auto_key)
150  set_if_bigger(info->s->state.auto_increment,
151  retrieve_auto_increment(info, record));
152  info->update= (HA_STATE_CHANGED | HA_STATE_AKTIV | HA_STATE_WRITTEN |
153  HA_STATE_ROW_CHANGED);
154  info->state->records++;
155  info->lastpos=filepos;
156  myisam_log_record(MI_LOG_WRITE,info,record,filepos,0);
157  (void) _mi_writeinfo(info, WRITEINFO_UPDATE_KEYFILE);
158  if (info->invalidator != 0)
159  {
160  DBUG_PRINT("info", ("invalidator... '%s' (update)", info->filename));
161  (*info->invalidator)(info->filename);
162  info->invalidator=0;
163  }
164 
165  /*
166  Update status of the table. We need to do so after each row write
167  for the log tables, as we want the new row to become visible to
168  other threads as soon as possible. We don't lock mutex here
169  (as it is required by pthread memory visibility rules) as (1) it's
170  not critical to use outdated share->is_log_table value (2) locking
171  mutex here for every write is too expensive.
172  */
173  if (share->is_log_table)
174  mi_update_status((void*) info);
175 
176  DBUG_RETURN(0);
177 
178 err:
179  save_errno=my_errno;
180  if (my_errno == HA_ERR_FOUND_DUPP_KEY || my_errno == HA_ERR_RECORD_FILE_FULL ||
181  my_errno == HA_ERR_NULL_IN_SPATIAL || my_errno == HA_ERR_OUT_OF_MEM)
182  {
183  if (info->bulk_insert)
184  {
185  uint j;
186  for (j=0 ; j < share->base.keys ; j++)
187  mi_flush_bulk_insert(info, j);
188  }
189  info->errkey= (int) i;
190  while ( i-- > 0)
191  {
192  if (mi_is_key_active(share->state.key_map, i))
193  {
194  my_bool local_lock_tree= (lock_tree &&
195  !(info->bulk_insert &&
196  is_tree_inited(&info->bulk_insert[i])));
197  if (local_lock_tree)
198  mysql_rwlock_wrlock(&share->key_root_lock[i]);
199  if (share->keyinfo[i].flag & HA_FULLTEXT)
200  {
201  if (_mi_ft_del(info,i, buff,record,filepos))
202  {
203  if (local_lock_tree)
204  mysql_rwlock_unlock(&share->key_root_lock[i]);
205  break;
206  }
207  }
208  else
209  {
210  uint key_length=_mi_make_key(info,i,buff,record,filepos);
211  if (share->keyinfo[i].ck_delete(info, i, buff, key_length))
212  {
213  if (local_lock_tree)
214  mysql_rwlock_unlock(&share->key_root_lock[i]);
215  break;
216  }
217  }
218  if (local_lock_tree)
219  mysql_rwlock_unlock(&share->key_root_lock[i]);
220  }
221  }
222  }
223  else
224  {
225  mi_print_error(info->s, HA_ERR_CRASHED);
226  mi_mark_crashed(info);
227  }
228  info->update= (HA_STATE_CHANGED | HA_STATE_WRITTEN | HA_STATE_ROW_CHANGED);
229  my_errno=save_errno;
230 err2:
231  save_errno=my_errno;
232  myisam_log_record(MI_LOG_WRITE,info,record,filepos,my_errno);
233  (void) _mi_writeinfo(info,WRITEINFO_UPDATE_KEYFILE);
234  DBUG_RETURN(my_errno=save_errno);
235 } /* mi_write */
236 
237 
238  /* Write one key to btree */
239 
240 int _mi_ck_write(MI_INFO *info, uint keynr, uchar *key, uint key_length)
241 {
242  DBUG_ENTER("_mi_ck_write");
243 
244  if (info->bulk_insert && is_tree_inited(&info->bulk_insert[keynr]))
245  {
246  DBUG_RETURN(_mi_ck_write_tree(info, keynr, key, key_length));
247  }
248  else
249  {
250  DBUG_RETURN(_mi_ck_write_btree(info, keynr, key, key_length));
251  }
252 } /* _mi_ck_write */
253 
254 
255 /**********************************************************************
256  * Normal insert code *
257  **********************************************************************/
258 
259 int _mi_ck_write_btree(register MI_INFO *info, uint keynr, uchar *key,
260  uint key_length)
261 {
262  int error;
263  uint comp_flag;
264  MI_KEYDEF *keyinfo=info->s->keyinfo+keynr;
265  my_off_t *root=&info->s->state.key_root[keynr];
266  DBUG_ENTER("_mi_ck_write_btree");
267 
268  if (keyinfo->flag & HA_SORT_ALLOWS_SAME)
269  comp_flag=SEARCH_BIGGER; /* Put after same key */
270  else if (keyinfo->flag & (HA_NOSAME|HA_FULLTEXT))
271  {
272  comp_flag=SEARCH_FIND | SEARCH_UPDATE; /* No duplicates */
273  if (keyinfo->flag & HA_NULL_ARE_EQUAL)
274  comp_flag|= SEARCH_NULL_ARE_EQUAL;
275  }
276  else
277  comp_flag=SEARCH_SAME; /* Keys in rec-pos order */
278 
279  error=_mi_ck_real_write_btree(info, keyinfo, key, key_length,
280  root, comp_flag);
281  if (info->ft1_to_ft2)
282  {
283  if (!error)
284  error= _mi_ft_convert_to_ft2(info, keynr, key);
285  delete_dynamic(info->ft1_to_ft2);
286  my_free(info->ft1_to_ft2);
287  info->ft1_to_ft2=0;
288  }
289  DBUG_RETURN(error);
290 } /* _mi_ck_write_btree */
291 
292 int _mi_ck_real_write_btree(MI_INFO *info, MI_KEYDEF *keyinfo,
293  uchar *key, uint key_length, my_off_t *root, uint comp_flag)
294 {
295  int error;
296  DBUG_ENTER("_mi_ck_real_write_btree");
297  /* key_length parameter is used only if comp_flag is SEARCH_FIND */
298  if (*root == HA_OFFSET_ERROR ||
299  (error=w_search(info, keyinfo, comp_flag, key, key_length,
300  *root, (uchar *) 0, (uchar*) 0,
301  (my_off_t) 0, 1)) > 0)
302  error=_mi_enlarge_root(info,keyinfo,key,root);
303  DBUG_RETURN(error);
304 } /* _mi_ck_real_write_btree */
305 
306 
307  /* Make a new root with key as only pointer */
308 
309 int _mi_enlarge_root(MI_INFO *info, MI_KEYDEF *keyinfo, uchar *key,
310  my_off_t *root)
311 {
312  uint t_length,nod_flag;
313  MI_KEY_PARAM s_temp;
314  MYISAM_SHARE *share=info->s;
315  DBUG_ENTER("_mi_enlarge_root");
316 
317  nod_flag= (*root != HA_OFFSET_ERROR) ? share->base.key_reflength : 0;
318  _mi_kpointer(info,info->buff+2,*root); /* if nod */
319  t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,(uchar*) 0,
320  (uchar*) 0, (uchar*) 0, key,&s_temp);
321  mi_putint(info->buff,t_length+2+nod_flag,nod_flag);
322  (*keyinfo->store_key)(keyinfo,info->buff+2+nod_flag,&s_temp);
323  info->buff_used=info->page_changed=1; /* info->buff is used */
324  if ((*root= _mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR ||
325  _mi_write_keypage(info,keyinfo,*root,DFLT_INIT_HITS,info->buff))
326  DBUG_RETURN(-1);
327  DBUG_RETURN(0);
328 } /* _mi_enlarge_root */
329 
330 
331  /*
332  Search after a position for a key and store it there
333  Returns -1 = error
334  0 = ok
335  1 = key should be stored in higher tree
336  */
337 
338 static int w_search(register MI_INFO *info, register MI_KEYDEF *keyinfo,
339  uint comp_flag, uchar *key, uint key_length, my_off_t page,
340  uchar *father_buff, uchar *father_keypos,
341  my_off_t father_page, my_bool insert_last)
342 {
343  int error,flag;
344  uint nod_flag, search_key_length;
345  uchar *temp_buff,*keypos;
346  uchar keybuff[MI_MAX_KEY_BUFF];
347  my_bool was_last_key;
348  my_off_t next_page, dupp_key_pos;
349  DBUG_ENTER("w_search");
350  DBUG_PRINT("enter",("page: %ld", (long) page));
351 
352  search_key_length= (comp_flag & SEARCH_FIND) ? key_length : USE_WHOLE_KEY;
353  if (!(temp_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
354  MI_MAX_KEY_BUFF*2)))
355  DBUG_RETURN(-1);
356  if (!_mi_fetch_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff,0))
357  goto err;
358 
359  flag=(*keyinfo->bin_search)(info,keyinfo,temp_buff,key,search_key_length,
360  comp_flag, &keypos, keybuff, &was_last_key);
361  nod_flag=mi_test_if_nod(temp_buff);
362  if (flag == 0)
363  {
364  uint tmp_key_length;
365  /* get position to record with duplicated key */
366  tmp_key_length=(*keyinfo->get_key)(keyinfo,nod_flag,&keypos,keybuff);
367  if (tmp_key_length)
368  dupp_key_pos=_mi_dpos(info,0,keybuff+tmp_key_length);
369  else
370  dupp_key_pos= HA_OFFSET_ERROR;
371 
372  if (keyinfo->flag & HA_FULLTEXT)
373  {
374  uint off;
375  int subkeys;
376 
377  get_key_full_length_rdonly(off, keybuff);
378  subkeys=ft_sintXkorr(keybuff+off);
379  comp_flag=SEARCH_SAME;
380  if (subkeys >= 0)
381  {
382  /* normal word, one-level tree structure */
383  flag=(*keyinfo->bin_search)(info, keyinfo, temp_buff, key,
384  USE_WHOLE_KEY, comp_flag,
385  &keypos, keybuff, &was_last_key);
386  }
387  else
388  {
389  /* popular word. two-level tree. going down */
390  my_off_t root=dupp_key_pos;
391  keyinfo=&info->s->ft2_keyinfo;
392  get_key_full_length_rdonly(off, key);
393  key+=off;
394  keypos-=keyinfo->keylength+nod_flag; /* we'll modify key entry 'in vivo' */
395  error=_mi_ck_real_write_btree(info, keyinfo, key, 0,
396  &root, comp_flag);
397  _mi_dpointer(info, keypos+HA_FT_WLEN, root);
398  subkeys--; /* should there be underflow protection ? */
399  DBUG_ASSERT(subkeys < 0);
400  ft_intXstore(keypos, subkeys);
401  if (!error)
402  error=_mi_write_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff);
403  my_afree((uchar*) temp_buff);
404  DBUG_RETURN(error);
405  }
406  }
407  else /* not HA_FULLTEXT, normal HA_NOSAME key */
408  {
409  info->dupp_key_pos= dupp_key_pos;
410  my_afree((uchar*) temp_buff);
411  my_errno=HA_ERR_FOUND_DUPP_KEY;
412  DBUG_RETURN(-1);
413  }
414  }
415  if (flag == MI_FOUND_WRONG_KEY)
416  DBUG_RETURN(-1);
417  if (!was_last_key)
418  insert_last=0;
419  next_page=_mi_kpos(nod_flag,keypos);
420  if (next_page == HA_OFFSET_ERROR ||
421  (error=w_search(info, keyinfo, comp_flag, key, key_length, next_page,
422  temp_buff, keypos, page, insert_last)) >0)
423  {
424  error=_mi_insert(info,keyinfo,key,temp_buff,keypos,keybuff,father_buff,
425  father_keypos,father_page, insert_last);
426  if (_mi_write_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff))
427  goto err;
428  }
429  my_afree((uchar*) temp_buff);
430  DBUG_RETURN(error);
431 err:
432  my_afree((uchar*) temp_buff);
433  DBUG_PRINT("exit",("Error: %d",my_errno));
434  DBUG_RETURN (-1);
435 } /* w_search */
436 
437 
438 /*
439  Insert new key.
440 
441  SYNOPSIS
442  _mi_insert()
443  info Open table information.
444  keyinfo Key definition information.
445  key New key.
446  anc_buff Key page (beginning).
447  key_pos Position in key page where to insert.
448  key_buff Copy of previous key.
449  father_buff parent key page for balancing.
450  father_key_pos position in parent key page for balancing.
451  father_page position of parent key page in file.
452  insert_last If to append at end of page.
453 
454  DESCRIPTION
455  Insert new key at right of key_pos.
456 
457  RETURN
458  2 if key contains key to upper level.
459  0 OK.
460  < 0 Error.
461 */
462 
463 int _mi_insert(register MI_INFO *info, register MI_KEYDEF *keyinfo,
464  uchar *key, uchar *anc_buff, uchar *key_pos, uchar *key_buff,
465  uchar *father_buff, uchar *father_key_pos, my_off_t father_page,
466  my_bool insert_last)
467 {
468  uint a_length,nod_flag;
469  int t_length;
470  uchar *endpos, *prev_key;
471  MI_KEY_PARAM s_temp;
472  DBUG_ENTER("_mi_insert");
473  DBUG_PRINT("enter",("key_pos: 0x%lx", (long) key_pos));
474  DBUG_EXECUTE("key",_mi_print_key(DBUG_FILE,keyinfo->seg,key,USE_WHOLE_KEY););
475 
476  nod_flag=mi_test_if_nod(anc_buff);
477  a_length=mi_getint(anc_buff);
478  endpos= anc_buff+ a_length;
479  prev_key=(key_pos == anc_buff+2+nod_flag ? (uchar*) 0 : key_buff);
480  t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,
481  (key_pos == endpos ? (uchar*) 0 : key_pos),
482  prev_key, prev_key,
483  key,&s_temp);
484 #ifndef DBUG_OFF
485  if (key_pos != anc_buff+2+nod_flag && (keyinfo->flag &
486  (HA_BINARY_PACK_KEY | HA_PACK_KEY)))
487  {
488  DBUG_DUMP("prev_key",(uchar*) key_buff,_mi_keylength(keyinfo,key_buff));
489  }
490  if (keyinfo->flag & HA_PACK_KEY)
491  {
492  DBUG_PRINT("test",("t_length: %d ref_len: %d",
493  t_length,s_temp.ref_length));
494  DBUG_PRINT("test",("n_ref_len: %d n_length: %d key_pos: 0x%lx",
495  s_temp.n_ref_length,s_temp.n_length, (long) s_temp.key));
496  }
497 #endif
498  if (t_length > 0)
499  {
500  if (t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
501  {
502  mi_print_error(info->s, HA_ERR_CRASHED);
503  my_errno=HA_ERR_CRASHED;
504  DBUG_RETURN(-1);
505  }
506  bmove_upp((uchar*) endpos+t_length,(uchar*) endpos,(uint) (endpos-key_pos));
507  }
508  else
509  {
510  if (-t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
511  {
512  mi_print_error(info->s, HA_ERR_CRASHED);
513  my_errno=HA_ERR_CRASHED;
514  DBUG_RETURN(-1);
515  }
516  bmove(key_pos,key_pos-t_length,(uint) (endpos-key_pos)+t_length);
517  }
518  (*keyinfo->store_key)(keyinfo,key_pos,&s_temp);
519  a_length+=t_length;
520  mi_putint(anc_buff,a_length,nod_flag);
521  if (a_length <= keyinfo->block_length)
522  {
523  if (keyinfo->block_length - a_length < 32 &&
524  keyinfo->flag & HA_FULLTEXT && key_pos == endpos &&
525  info->s->base.key_reflength <= info->s->rec_reflength &&
526  info->s->options & (HA_OPTION_PACK_RECORD | HA_OPTION_COMPRESS_RECORD))
527  {
528  /*
529  Normal word. One-level tree. Page is almost full.
530  Let's consider converting.
531  We'll compare 'key' and the first key at anc_buff
532  */
533  uchar *a=key, *b=anc_buff+2+nod_flag;
534  uint alen, blen, ft2len=info->s->ft2_keyinfo.keylength;
535  /* the very first key on the page is always unpacked */
536  DBUG_ASSERT((*b & 128) == 0);
537 #if HA_FT_MAXLEN >= 127
538  blen= mi_uint2korr(b); b+=2;
539 #else
540  blen= *b++;
541 #endif
542  get_key_length(alen,a);
543  DBUG_ASSERT(info->ft1_to_ft2==0);
544  if (alen == blen &&
545  ha_compare_text(keyinfo->seg->charset, a, alen, b, blen, 0, 0)==0)
546  {
547  /* yup. converting */
548  info->ft1_to_ft2=(DYNAMIC_ARRAY *)
549  my_malloc(sizeof(DYNAMIC_ARRAY), MYF(MY_WME));
550  my_init_dynamic_array(info->ft1_to_ft2, ft2len, 300, 50);
551 
552  /*
553  now, adding all keys from the page to dynarray
554  if the page is a leaf (if not keys will be deleted later)
555  */
556  if (!nod_flag)
557  {
558  /* let's leave the first key on the page, though, because
559  we cannot easily dispatch an empty page here */
560  b+=blen+ft2len+2;
561  for (a=anc_buff+a_length ; b < a ; b+=ft2len+2)
562  {
563  if (insert_dynamic(info->ft1_to_ft2, b))
564  {
565  mi_print_error(info->s, HA_ERR_OUT_OF_MEM);
566  my_errno= HA_ERR_OUT_OF_MEM;
567  DBUG_RETURN(-1);
568  }
569  }
570 
571  /* fixing the page's length - it contains only one key now */
572  mi_putint(anc_buff,2+blen+ft2len+2,0);
573  }
574  /* the rest will be done when we're back from recursion */
575  }
576  }
577  DBUG_RETURN(0); /* There is room on page */
578  }
579  /* Page is full */
580  if (nod_flag)
581  insert_last=0;
582  if (!(keyinfo->flag & (HA_VAR_LENGTH_KEY | HA_BINARY_PACK_KEY)) &&
583  father_buff && !insert_last)
584  DBUG_RETURN(_mi_balance_page(info,keyinfo,key,anc_buff,father_buff,
585  father_key_pos,father_page));
586  DBUG_RETURN(_mi_split_page(info,keyinfo,key,anc_buff,key_buff, insert_last));
587 } /* _mi_insert */
588 
589 
590  /* split a full page in two and assign emerging item to key */
591 
592 int _mi_split_page(register MI_INFO *info, register MI_KEYDEF *keyinfo,
593  uchar *key, uchar *buff, uchar *key_buff,
594  my_bool insert_last_key)
595 {
596  uint length,a_length,key_ref_length,t_length,nod_flag,key_length;
597  uchar *key_pos,*pos, *after_key;
598  my_off_t new_pos;
599  MI_KEY_PARAM s_temp;
600  DBUG_ENTER("mi_split_page");
601  LINT_INIT(after_key);
602  DBUG_DUMP("buff",(uchar*) buff,mi_getint(buff));
603 
604  if (info->s->keyinfo+info->lastinx == keyinfo)
605  info->page_changed=1; /* Info->buff is used */
606  info->buff_used=1;
607  nod_flag=mi_test_if_nod(buff);
608  key_ref_length=2+nod_flag;
609  if (insert_last_key)
610  key_pos=_mi_find_last_pos(keyinfo,buff,key_buff, &key_length, &after_key);
611  else
612  key_pos=_mi_find_half_pos(nod_flag,keyinfo,buff,key_buff, &key_length,
613  &after_key);
614  if (!key_pos)
615  DBUG_RETURN(-1);
616 
617  length=(uint) (key_pos-buff);
618  a_length=mi_getint(buff);
619  mi_putint(buff,length,nod_flag);
620 
621  key_pos=after_key;
622  if (nod_flag)
623  {
624  DBUG_PRINT("test",("Splitting nod"));
625  pos=key_pos-nod_flag;
626  memcpy((uchar*) info->buff+2,(uchar*) pos,(size_t) nod_flag);
627  }
628 
629  /* Move middle item to key and pointer to new page */
630  if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
631  DBUG_RETURN(-1);
632  _mi_kpointer(info,_mi_move_key(keyinfo,key,key_buff),new_pos);
633 
634  /* Store new page */
635  if (!(*keyinfo->get_key)(keyinfo,nod_flag,&key_pos,key_buff))
636  DBUG_RETURN(-1);
637 
638  t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,(uchar *) 0,
639  (uchar*) 0, (uchar*) 0,
640  key_buff, &s_temp);
641  length=(uint) ((buff+a_length)-key_pos);
642  memcpy((uchar*) info->buff+key_ref_length+t_length,(uchar*) key_pos,
643  (size_t) length);
644  (*keyinfo->store_key)(keyinfo,info->buff+key_ref_length,&s_temp);
645  mi_putint(info->buff,length+t_length+key_ref_length,nod_flag);
646 
647  if (_mi_write_keypage(info,keyinfo,new_pos,DFLT_INIT_HITS,info->buff))
648  DBUG_RETURN(-1);
649  DBUG_DUMP("key",(uchar*) key,_mi_keylength(keyinfo,key));
650  DBUG_RETURN(2); /* Middle key up */
651 } /* _mi_split_page */
652 
653 
654  /*
655  Calculate how to much to move to split a page in two
656  Returns pointer to start of key.
657  key will contain the key.
658  return_key_length will contain the length of key
659  after_key will contain the position to where the next key starts
660  */
661 
662 uchar *_mi_find_half_pos(uint nod_flag, MI_KEYDEF *keyinfo, uchar *page,
663  uchar *key, uint *return_key_length,
664  uchar **after_key)
665 {
666  uint keys,length,key_ref_length;
667  uchar *end,*lastpos;
668  DBUG_ENTER("_mi_find_half_pos");
669 
670  key_ref_length=2+nod_flag;
671  length=mi_getint(page)-key_ref_length;
672  page+=key_ref_length;
673  if (!(keyinfo->flag &
674  (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
675  HA_BINARY_PACK_KEY)))
676  {
677  key_ref_length=keyinfo->keylength+nod_flag;
678  keys=length/(key_ref_length*2);
679  *return_key_length=keyinfo->keylength;
680  end=page+keys*key_ref_length;
681  *after_key=end+key_ref_length;
682  memcpy(key,end,key_ref_length);
683  DBUG_RETURN(end);
684  }
685 
686  end=page+length/2-key_ref_length; /* This is aprox. half */
687  *key='\0';
688  do
689  {
690  lastpos=page;
691  if (!(length=(*keyinfo->get_key)(keyinfo,nod_flag,&page,key)))
692  DBUG_RETURN(0);
693  } while (page < end);
694  *return_key_length=length;
695  *after_key=page;
696  DBUG_PRINT("exit",("returns: 0x%lx page: 0x%lx half: 0x%lx",
697  (long) lastpos, (long) page, (long) end));
698  DBUG_RETURN(lastpos);
699 } /* _mi_find_half_pos */
700 
701 
702  /*
703  Split buffer at last key
704  Returns pointer to the start of the key before the last key
705  key will contain the last key
706  */
707 
708 static uchar *_mi_find_last_pos(MI_KEYDEF *keyinfo, uchar *page,
709  uchar *key, uint *return_key_length,
710  uchar **after_key)
711 {
712  uint keys,length,UNINIT_VAR(last_length),key_ref_length;
713  uchar *end,*lastpos,*UNINIT_VAR(prevpos);
714  uchar key_buff[MI_MAX_KEY_BUFF];
715  DBUG_ENTER("_mi_find_last_pos");
716 
717  key_ref_length=2;
718  length=mi_getint(page)-key_ref_length;
719  page+=key_ref_length;
720  if (!(keyinfo->flag &
721  (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
722  HA_BINARY_PACK_KEY)))
723  {
724  keys=length/keyinfo->keylength-2;
725  *return_key_length=length=keyinfo->keylength;
726  end=page+keys*length;
727  *after_key=end+length;
728  memcpy(key,end,length);
729  DBUG_RETURN(end);
730  }
731 
732  end=page+length-key_ref_length;
733  *key='\0';
734  length=0;
735  lastpos=page;
736  while (page < end)
737  {
738  prevpos=lastpos; lastpos=page;
739  last_length=length;
740  memcpy(key, key_buff, length); /* previous key */
741  if (!(length=(*keyinfo->get_key)(keyinfo,0,&page,key_buff)))
742  {
743  mi_print_error(keyinfo->share, HA_ERR_CRASHED);
744  my_errno=HA_ERR_CRASHED;
745  DBUG_RETURN(0);
746  }
747  }
748  *return_key_length=last_length;
749  *after_key=lastpos;
750  DBUG_PRINT("exit",("returns: 0x%lx page: 0x%lx end: 0x%lx",
751  (long) prevpos,(long) page,(long) end));
752  DBUG_RETURN(prevpos);
753 } /* _mi_find_last_pos */
754 
755 
756  /* Balance page with not packed keys with page on right/left */
757  /* returns 0 if balance was done */
758 
759 static int _mi_balance_page(register MI_INFO *info, MI_KEYDEF *keyinfo,
760  uchar *key, uchar *curr_buff, uchar *father_buff,
761  uchar *father_key_pos, my_off_t father_page)
762 {
763  my_bool right;
764  uint k_length,father_length,father_keylength,nod_flag,curr_keylength,
765  right_length,left_length,new_right_length,new_left_length,extra_length,
766  length,keys;
767  uchar *pos,*buff,*extra_buff;
768  my_off_t next_page,new_pos;
769  uchar tmp_part_key[MI_MAX_KEY_BUFF];
770  DBUG_ENTER("_mi_balance_page");
771 
772  k_length=keyinfo->keylength;
773  father_length=mi_getint(father_buff);
774  father_keylength=k_length+info->s->base.key_reflength;
775  nod_flag=mi_test_if_nod(curr_buff);
776  curr_keylength=k_length+nod_flag;
777  info->page_changed=1;
778 
779  if ((father_key_pos != father_buff+father_length &&
780  (info->state->records & 1)) ||
781  father_key_pos == father_buff+2+info->s->base.key_reflength)
782  {
783  right=1;
784  next_page= _mi_kpos(info->s->base.key_reflength,
785  father_key_pos+father_keylength);
786  buff=info->buff;
787  DBUG_PRINT("test",("use right page: %lu", (ulong) next_page));
788  }
789  else
790  {
791  right=0;
792  father_key_pos-=father_keylength;
793  next_page= _mi_kpos(info->s->base.key_reflength,father_key_pos);
794  /* Fix that curr_buff is to left */
795  buff=curr_buff; curr_buff=info->buff;
796  DBUG_PRINT("test",("use left page: %lu", (ulong) next_page));
797  } /* father_key_pos ptr to parting key */
798 
799  if (!_mi_fetch_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff,0))
800  goto err;
801  DBUG_DUMP("next",(uchar*) info->buff,mi_getint(info->buff));
802 
803  /* Test if there is room to share keys */
804 
805  left_length=mi_getint(curr_buff);
806  right_length=mi_getint(buff);
807  keys=(left_length+right_length-4-nod_flag*2)/curr_keylength;
808 
809  if ((right ? right_length : left_length) + curr_keylength <=
810  keyinfo->block_length)
811  { /* Merge buffs */
812  new_left_length=2+nod_flag+(keys/2)*curr_keylength;
813  new_right_length=2+nod_flag+((keys+1)/2)*curr_keylength;
814  mi_putint(curr_buff,new_left_length,nod_flag);
815  mi_putint(buff,new_right_length,nod_flag);
816 
817  if (left_length < new_left_length)
818  { /* Move keys buff -> leaf */
819  pos=curr_buff+left_length;
820  memcpy((uchar*) pos,(uchar*) father_key_pos, (size_t) k_length);
821  memcpy((uchar*) pos+k_length, (uchar*) buff+2,
822  (size_t) (length=new_left_length - left_length - k_length));
823  pos=buff+2+length;
824  memcpy((uchar*) father_key_pos,(uchar*) pos,(size_t) k_length);
825  bmove((uchar*) buff + 2, (uchar*) pos + k_length, new_right_length - 2);
826  }
827  else
828  { /* Move keys -> buff */
829 
830  bmove_upp((uchar*) buff+new_right_length,(uchar*) buff+right_length,
831  right_length-2);
832  length=new_right_length-right_length-k_length;
833  memcpy((uchar*) buff+2+length,father_key_pos,(size_t) k_length);
834  pos=curr_buff+new_left_length;
835  memcpy((uchar*) father_key_pos,(uchar*) pos,(size_t) k_length);
836  memcpy((uchar*) buff+2,(uchar*) pos+k_length,(size_t) length);
837  }
838 
839  if (_mi_write_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff) ||
840  _mi_write_keypage(info,keyinfo,father_page,DFLT_INIT_HITS,father_buff))
841  goto err;
842  DBUG_RETURN(0);
843  }
844 
845  /* curr_buff[] and buff[] are full, lets split and make new nod */
846 
847  extra_buff=info->buff+info->s->base.max_key_block_length;
848  new_left_length=new_right_length=2+nod_flag+(keys+1)/3*curr_keylength;
849  if (keys == 5) /* Too few keys to balance */
850  new_left_length-=curr_keylength;
851  extra_length=nod_flag+left_length+right_length-
852  new_left_length-new_right_length-curr_keylength;
853  DBUG_PRINT("info",("left_length: %d right_length: %d new_left_length: %d new_right_length: %d extra_length: %d",
854  left_length, right_length,
855  new_left_length, new_right_length,
856  extra_length));
857  mi_putint(curr_buff,new_left_length,nod_flag);
858  mi_putint(buff,new_right_length,nod_flag);
859  mi_putint(extra_buff,extra_length+2,nod_flag);
860 
861  /* move first largest keys to new page */
862  pos=buff+right_length-extra_length;
863  memcpy((uchar*) extra_buff+2,pos,(size_t) extra_length);
864  /* Save new parting key */
865  memcpy(tmp_part_key, pos-k_length,k_length);
866  /* Make place for new keys */
867  bmove_upp((uchar*) buff+new_right_length,(uchar*) pos-k_length,
868  right_length-extra_length-k_length-2);
869  /* Copy keys from left page */
870  pos= curr_buff+new_left_length;
871  memcpy((uchar*) buff+2,(uchar*) pos+k_length,
872  (size_t) (length=left_length-new_left_length-k_length));
873  /* Copy old parting key */
874  memcpy((uchar*) buff+2+length,father_key_pos,(size_t) k_length);
875 
876  /* Move new parting keys up to caller */
877  memcpy((uchar*) (right ? key : father_key_pos),pos,(size_t) k_length);
878  memcpy((uchar*) (right ? father_key_pos : key),tmp_part_key, k_length);
879 
880  if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
881  goto err;
882  _mi_kpointer(info,key+k_length,new_pos);
883  if (_mi_write_keypage(info,keyinfo,(right ? new_pos : next_page),
884  DFLT_INIT_HITS,info->buff) ||
885  _mi_write_keypage(info,keyinfo,(right ? next_page : new_pos),
886  DFLT_INIT_HITS,extra_buff))
887  goto err;
888 
889  DBUG_RETURN(1); /* Middle key up */
890 
891 err:
892  DBUG_RETURN(-1);
893 } /* _mi_balance_page */
894 
895 /**********************************************************************
896  * Bulk insert code *
897  **********************************************************************/
898 
899 typedef struct {
900  MI_INFO *info;
901  uint keynr;
903 
904 int _mi_ck_write_tree(register MI_INFO *info, uint keynr, uchar *key,
905  uint key_length)
906 {
907  int error;
908  DBUG_ENTER("_mi_ck_write_tree");
909 
910  error= tree_insert(&info->bulk_insert[keynr], key,
911  key_length + info->s->rec_reflength,
912  info->bulk_insert[keynr].custom_arg) ? 0 : HA_ERR_OUT_OF_MEM ;
913 
914  DBUG_RETURN(error);
915 } /* _mi_ck_write_tree */
916 
917 
918 /* typeof(_mi_keys_compare)=qsort_cmp2 */
919 
920 static int keys_compare(bulk_insert_param *param, uchar *key1, uchar *key2)
921 {
922  uint not_used[2];
923  return ha_key_cmp(param->info->s->keyinfo[param->keynr].seg,
924  key1, key2, USE_WHOLE_KEY, SEARCH_SAME,
925  not_used);
926 }
927 
928 
929 static int keys_free(uchar *key, TREE_FREE mode, bulk_insert_param *param)
930 {
931  /*
932  Probably I can use info->lastkey here, but I'm not sure,
933  and to be safe I'd better use local lastkey.
934  */
935  uchar lastkey[MI_MAX_KEY_BUFF];
936  uint keylen;
937  MI_KEYDEF *keyinfo;
938 
939  switch (mode) {
940  case free_init:
941  if (param->info->s->concurrent_insert)
942  {
943  mysql_rwlock_wrlock(&param->info->s->key_root_lock[param->keynr]);
944  param->info->s->keyinfo[param->keynr].version++;
945  }
946  return 0;
947  case free_free:
948  keyinfo=param->info->s->keyinfo+param->keynr;
949  keylen=_mi_keylength(keyinfo, key);
950  memcpy(lastkey, key, keylen);
951  return _mi_ck_write_btree(param->info,param->keynr,lastkey,
952  keylen - param->info->s->rec_reflength);
953  case free_end:
954  if (param->info->s->concurrent_insert)
955  mysql_rwlock_unlock(&param->info->s->key_root_lock[param->keynr]);
956  return 0;
957  }
958  return -1;
959 }
960 
961 
962 int mi_init_bulk_insert(MI_INFO *info, ulong cache_size, ha_rows rows)
963 {
964  MYISAM_SHARE *share=info->s;
965  MI_KEYDEF *key=share->keyinfo;
966  bulk_insert_param *params;
967  uint i, num_keys, total_keylength;
968  ulonglong key_map;
969  DBUG_ENTER("_mi_init_bulk_insert");
970  DBUG_PRINT("enter",("cache_size: %lu", cache_size));
971 
972  DBUG_ASSERT(!info->bulk_insert &&
973  (!rows || rows >= MI_MIN_ROWS_TO_USE_BULK_INSERT));
974 
975  mi_clear_all_keys_active(key_map);
976  for (i=total_keylength=num_keys=0 ; i < share->base.keys ; i++)
977  {
978  if (! (key[i].flag & HA_NOSAME) && (share->base.auto_key != i + 1) &&
979  mi_is_key_active(share->state.key_map, i))
980  {
981  num_keys++;
982  mi_set_key_active(key_map, i);
983  total_keylength+=key[i].maxlength+TREE_ELEMENT_EXTRA_SIZE;
984  }
985  }
986 
987  if (num_keys==0 ||
988  num_keys * MI_MIN_SIZE_BULK_INSERT_TREE > cache_size)
989  DBUG_RETURN(0);
990 
991  if (rows && rows*total_keylength < cache_size)
992  cache_size= (ulong)rows;
993  else
994  cache_size/=total_keylength*16;
995 
996  info->bulk_insert=(TREE *)
997  my_malloc((sizeof(TREE)*share->base.keys+
998  sizeof(bulk_insert_param)*num_keys),MYF(0));
999 
1000  if (!info->bulk_insert)
1001  DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1002 
1003  params=(bulk_insert_param *)(info->bulk_insert+share->base.keys);
1004  for (i=0 ; i < share->base.keys ; i++)
1005  {
1006  if (mi_is_key_active(key_map, i))
1007  {
1008  params->info=info;
1009  params->keynr=i;
1010  /* Only allocate a 16'th of the buffer at a time */
1011  init_tree(&info->bulk_insert[i],
1012  cache_size * key[i].maxlength,
1013  cache_size * key[i].maxlength, 0,
1014  (qsort_cmp2)keys_compare, 0,
1015  (tree_element_free) keys_free, (void *)params++);
1016  }
1017  else
1018  info->bulk_insert[i].root=0;
1019  }
1020 
1021  DBUG_RETURN(0);
1022 }
1023 
1024 void mi_flush_bulk_insert(MI_INFO *info, uint inx)
1025 {
1026  if (info->bulk_insert)
1027  {
1028  if (is_tree_inited(&info->bulk_insert[inx]))
1029  reset_tree(&info->bulk_insert[inx]);
1030  }
1031 }
1032 
1033 void mi_end_bulk_insert(MI_INFO *info)
1034 {
1035  if (info->bulk_insert)
1036  {
1037  uint i;
1038  for (i=0 ; i < info->s->base.keys ; i++)
1039  {
1040  if (is_tree_inited(& info->bulk_insert[i]))
1041  {
1042  delete_tree(& info->bulk_insert[i]);
1043  }
1044  }
1045  my_free(info->bulk_insert);
1046  info->bulk_insert=0;
1047  }
1048 }