MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
row0ftsort.cc
Go to the documentation of this file.
1 /*****************************************************************************
2 
3 Copyright (c) 2010, 2012, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License as published by the Free Software
7 Foundation; version 2 of the License.
8 
9 This program is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12 
13 You should have received a copy of the GNU General Public License along with
14 this program; if not, write to the Free Software Foundation, Inc.,
15 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
16 
17 *****************************************************************************/
18 
19 /**************************************************/
26 #include "dict0dict.h" /* dict_table_stats_lock() */
27 #include "row0merge.h"
28 #include "pars0pars.h"
29 #include "row0ftsort.h"
30 #include "row0merge.h"
31 #include "row0row.h"
32 #include "btr0cur.h"
33 
/** Read the next sorted record for partition N from its merge block,
jumping to the shared "exit" label on a read error (b[N] == NULL while a
record was expected). Relies on b/block/buf/fd/foffs/mrec/offsets arrays
and an "exit" label being in scope at the expansion site. */
#define ROW_MERGE_READ_GET_NEXT(N)					\
	do {								\
		b[N] = row_merge_read_rec(				\
			block[N], buf[N], b[N], index,			\
			fd[N], &foffs[N], &mrec[N], offsets[N]);	\
		if (UNIV_UNLIKELY(!b[N])) {				\
			if (mrec[N]) {					\
				goto exit;				\
			}						\
		}							\
	} while (0)
47 
49 UNIV_INTERN ulong fts_sort_pll_degree = 2;
50 
51 /*********************************************************************/
61 UNIV_INTERN
64 /*============================*/
68  const dict_table_t* table,
70  ibool* opt_doc_id_size)
74 {
75  dict_index_t* new_index;
76  dict_field_t* field;
77  dict_field_t* idx_field;
78  CHARSET_INFO* charset;
79 
80  // FIXME: This name shouldn't be hard coded here.
81  new_index = dict_mem_index_create(
82  index->table->name, "tmp_fts_idx", 0, DICT_FTS, 3);
83 
84  new_index->id = index->id;
85  new_index->table = (dict_table_t*) table;
86  new_index->n_uniq = FTS_NUM_FIELDS_SORT;
87  new_index->n_def = FTS_NUM_FIELDS_SORT;
88  new_index->cached = TRUE;
89 
90  idx_field = dict_index_get_nth_field(index, 0);
91  charset = fts_index_get_charset(index);
92 
93  /* The first field is on the Tokenized Word */
94  field = dict_index_get_nth_field(new_index, 0);
95  field->name = NULL;
96  field->prefix_len = 0;
97  field->col = static_cast<dict_col_t*>(
98  mem_heap_alloc(new_index->heap, sizeof(dict_col_t)));
99  field->col->len = FTS_MAX_WORD_LEN;
100 
101  if (strcmp(charset->name, "latin1_swedish_ci") == 0) {
102  field->col->mtype = DATA_VARCHAR;
103  } else {
104  field->col->mtype = DATA_VARMYSQL;
105  }
106 
107  field->col->prtype = idx_field->col->prtype | DATA_NOT_NULL;
108  field->col->mbminmaxlen = idx_field->col->mbminmaxlen;
109  field->fixed_len = 0;
110 
111  /* Doc ID */
112  field = dict_index_get_nth_field(new_index, 1);
113  field->name = NULL;
114  field->prefix_len = 0;
115  field->col = static_cast<dict_col_t*>(
116  mem_heap_alloc(new_index->heap, sizeof(dict_col_t)));
117  field->col->mtype = DATA_INT;
118  *opt_doc_id_size = FALSE;
119 
120  /* Check whether we can use 4 bytes instead of 8 bytes integer
121  field to hold the Doc ID, thus reduce the overall sort size */
122  if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID)) {
123  /* If Doc ID column is being added by this create
124  index, then just check the number of rows in the table */
126  *opt_doc_id_size = TRUE;
127  }
128  } else {
129  doc_id_t max_doc_id;
130 
131  /* If the Doc ID column is supplied by user, then
132  check the maximum Doc ID in the table */
133  max_doc_id = fts_get_max_doc_id((dict_table_t*) table);
134 
135  if (max_doc_id && max_doc_id < MAX_DOC_ID_OPT_VAL) {
136  *opt_doc_id_size = TRUE;
137  }
138  }
139 
140  if (*opt_doc_id_size) {
141  field->col->len = sizeof(ib_uint32_t);
142  field->fixed_len = sizeof(ib_uint32_t);
143  } else {
144  field->col->len = FTS_DOC_ID_LEN;
145  field->fixed_len = FTS_DOC_ID_LEN;
146  }
147 
148  field->col->prtype = DATA_NOT_NULL | DATA_BINARY_TYPE;
149 
150  field->col->mbminmaxlen = 0;
151 
152  /* The third field is on the word's position in the original doc */
153  field = dict_index_get_nth_field(new_index, 2);
154  field->name = NULL;
155  field->prefix_len = 0;
156  field->col = static_cast<dict_col_t*>(
157  mem_heap_alloc(new_index->heap, sizeof(dict_col_t)));
158  field->col->mtype = DATA_INT;
159  field->col->len = 4 ;
160  field->fixed_len = 4;
161  field->col->prtype = DATA_NOT_NULL;
162  field->col->mbminmaxlen = 0;
163 
164  return(new_index);
165 }
166 /*********************************************************************/
169 UNIV_INTERN
170 ibool
172 /*====================*/
173  trx_t* trx,
174  row_merge_dup_t* dup,
176  const dict_table_t* new_table,
178  ibool opt_doc_id_size,
182  fts_psort_t** psort,
184  fts_psort_t** merge)
186 {
187  ulint i;
188  ulint j;
189  fts_psort_common_t* common_info = NULL;
190  fts_psort_t* psort_info = NULL;
191  fts_psort_t* merge_info = NULL;
192  ulint block_size;
193  ibool ret = TRUE;
194 
195  block_size = 3 * srv_sort_buf_size;
196 
197  *psort = psort_info = static_cast<fts_psort_t*>(mem_zalloc(
198  fts_sort_pll_degree * sizeof *psort_info));
199 
200  if (!psort_info) {
201  ut_free(dup);
202  return(FALSE);
203  }
204 
205  /* Common Info for all sort threads */
206  common_info = static_cast<fts_psort_common_t*>(
207  mem_alloc(sizeof *common_info));
208 
209  if (!common_info) {
210  ut_free(dup);
211  mem_free(psort_info);
212  return(FALSE);
213  }
214 
215  common_info->dup = dup;
216  common_info->new_table = (dict_table_t*) new_table;
217  common_info->trx = trx;
218  common_info->all_info = psort_info;
219  common_info->sort_event = os_event_create();
220  common_info->merge_event = os_event_create();
221  common_info->opt_doc_id_size = opt_doc_id_size;
222 
223  /* There will be FTS_NUM_AUX_INDEX number of "sort buckets" for
224  each parallel sort thread. Each "sort bucket" holds records for
225  a particular "FTS index partition" */
226  for (j = 0; j < fts_sort_pll_degree; j++) {
227 
228  UT_LIST_INIT(psort_info[j].fts_doc_list);
229 
230  for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
231 
232  psort_info[j].merge_file[i] =
233  static_cast<merge_file_t*>(
234  mem_zalloc(sizeof(merge_file_t)));
235 
236  if (!psort_info[j].merge_file[i]) {
237  ret = FALSE;
238  goto func_exit;
239  }
240 
241  psort_info[j].merge_buf[i] = row_merge_buf_create(
242  dup->index);
243 
244  if (row_merge_file_create(psort_info[j].merge_file[i])
245  < 0) {
246  goto func_exit;
247  }
248 
249  /* Need to align memory for O_DIRECT write */
250  psort_info[j].block_alloc[i] =
251  static_cast<row_merge_block_t*>(ut_malloc(
252  block_size + 1024));
253 
254  psort_info[j].merge_block[i] =
255  static_cast<row_merge_block_t*>(
256  ut_align(
257  psort_info[j].block_alloc[i], 1024));
258 
259  if (!psort_info[j].merge_block[i]) {
260  ret = FALSE;
261  goto func_exit;
262  }
263  }
264 
265  psort_info[j].child_status = 0;
266  psort_info[j].state = 0;
267  psort_info[j].psort_common = common_info;
268  }
269 
270  /* Initialize merge_info structures parallel merge and insert
271  into auxiliary FTS tables (FTS_INDEX_TABLE) */
272  *merge = merge_info = static_cast<fts_psort_t*>(
273  mem_alloc(FTS_NUM_AUX_INDEX * sizeof *merge_info));
274 
275  for (j = 0; j < FTS_NUM_AUX_INDEX; j++) {
276 
277  merge_info[j].child_status = 0;
278  merge_info[j].state = 0;
279  merge_info[j].psort_common = common_info;
280  }
281 
282 func_exit:
283  if (!ret) {
284  row_fts_psort_info_destroy(psort_info, merge_info);
285  }
286 
287  return(ret);
288 }
289 /*********************************************************************/
292 UNIV_INTERN
293 void
295 /*=======================*/
296  fts_psort_t* psort_info,
297  fts_psort_t* merge_info)
298 {
299  ulint i;
300  ulint j;
301 
302  if (psort_info) {
303  for (j = 0; j < fts_sort_pll_degree; j++) {
304  for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
305  if (psort_info[j].merge_file[i]) {
307  psort_info[j].merge_file[i]);
308  }
309 
310  if (psort_info[j].block_alloc[i]) {
311  ut_free(psort_info[j].block_alloc[i]);
312  }
313  mem_free(psort_info[j].merge_file[i]);
314  }
315  }
316 
317  os_event_free(merge_info[0].psort_common->sort_event);
318  os_event_free(merge_info[0].psort_common->merge_event);
319  ut_free(merge_info[0].psort_common->dup);
320  mem_free(merge_info[0].psort_common);
321  mem_free(psort_info);
322  }
323 
324  if (merge_info) {
325  mem_free(merge_info);
326  }
327 }
328 /*********************************************************************/
330 UNIV_INTERN
331 void
333 /*=======================*/
334  fts_psort_t* psort_info)
335 {
336  ulint j;
337  ulint i;
338 
339  if (!psort_info) {
340  return;
341  }
342 
343  for (j = 0; j < fts_sort_pll_degree; j++) {
344  for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
345  row_merge_buf_free(psort_info[j].merge_buf[i]);
346  }
347  }
348 
349  return;
350 }
351 
352 /*********************************************************************/
355 static
356 ibool
357 row_merge_fts_doc_tokenize(
358 /*=======================*/
359  row_merge_buf_t** sort_buf,
360  doc_id_t doc_id,
361  fts_doc_t* doc,
362  dtype_t* word_dtype,
364  merge_file_t** merge_file,
365  ibool opt_doc_id_size,
368  fts_tokenize_ctx_t* t_ctx)
369 {
370  ulint i;
371  ulint inc;
372  fts_string_t str;
373  ulint len;
375  dfield_t* field;
376  fts_string_t t_str;
377  ibool buf_full = FALSE;
378  byte str_buf[FTS_MAX_WORD_LEN + 1];
379  ulint data_size[FTS_NUM_AUX_INDEX];
380  ulint n_tuple[FTS_NUM_AUX_INDEX];
381 
382  t_str.f_n_char = 0;
383  t_ctx->buf_used = 0;
384 
385  memset(n_tuple, 0, FTS_NUM_AUX_INDEX * sizeof(ulint));
386  memset(data_size, 0, FTS_NUM_AUX_INDEX * sizeof(ulint));
387 
388  /* Tokenize the data and add each word string, its corresponding
389  doc id and position to sort buffer */
390  for (i = t_ctx->processed_len; i < doc->text.f_len; i += inc) {
391  ib_rbt_bound_t parent;
392  ulint idx = 0;
393  ib_uint32_t position;
394  ulint offset = 0;
395  ulint cur_len = 0;
396  doc_id_t write_doc_id;
397 
399  doc->charset, doc->text.f_str + i,
400  doc->text.f_str + doc->text.f_len, &str, &offset);
401 
402  ut_a(inc > 0);
403 
404  /* Ignore string whose character number is less than
405  "fts_min_token_size" or more than "fts_max_token_size" */
406  if (str.f_n_char < fts_min_token_size
407  || str.f_n_char > fts_max_token_size) {
408 
409  t_ctx->processed_len += inc;
410  continue;
411  }
412 
414  doc->charset, (char*) str.f_str, str.f_len,
415  (char*) &str_buf, FTS_MAX_WORD_LEN + 1);
416 
417  t_str.f_str = (byte*) &str_buf;
418 
419  /* if "cached_stopword" is defined, ingore words in the
420  stopword list */
421  if (t_ctx->cached_stopword
422  && rbt_search(t_ctx->cached_stopword,
423  &parent, &t_str) == 0) {
424 
425  t_ctx->processed_len += inc;
426  continue;
427  }
428 
429  /* There are FTS_NUM_AUX_INDEX auxiliary tables, find
430  out which sort buffer to put this word record in */
431  t_ctx->buf_used = fts_select_index(
432  doc->charset, t_str.f_str, t_str.f_len);
433 
434  buf = sort_buf[t_ctx->buf_used];
435 
436  ut_a(t_ctx->buf_used < FTS_NUM_AUX_INDEX);
437  idx = t_ctx->buf_used;
438 
439  mtuple_t* mtuple = &buf->tuples[buf->n_tuples + n_tuple[idx]];
440 
441  field = mtuple->fields = static_cast<dfield_t*>(
442  mem_heap_alloc(buf->heap,
443  FTS_NUM_FIELDS_SORT * sizeof *field));
444 
445  /* The first field is the tokenized word */
446  dfield_set_data(field, t_str.f_str, t_str.f_len);
447  len = dfield_get_len(field);
448 
449  field->type.mtype = word_dtype->mtype;
450  field->type.prtype = word_dtype->prtype | DATA_NOT_NULL;
451 
452  /* Variable length field, set to max size. */
453  field->type.len = FTS_MAX_WORD_LEN;
454  field->type.mbminmaxlen = word_dtype->mbminmaxlen;
455 
456  cur_len += len;
457  dfield_dup(field, buf->heap);
458  field++;
459 
460  /* The second field is the Doc ID */
461 
462  ib_uint32_t doc_id_32_bit;
463 
464  if (!opt_doc_id_size) {
465  fts_write_doc_id((byte*) &write_doc_id, doc_id);
466 
468  field, &write_doc_id, sizeof(write_doc_id));
469  } else {
471  (byte*) &doc_id_32_bit, (ib_uint32_t) doc_id);
472 
474  field, &doc_id_32_bit, sizeof(doc_id_32_bit));
475  }
476 
477  len = field->len;
478  ut_ad(len == FTS_DOC_ID_LEN || len == sizeof(ib_uint32_t));
479 
480  field->type.mtype = DATA_INT;
481  field->type.prtype = DATA_NOT_NULL | DATA_BINARY_TYPE;
482  field->type.len = len;
483  field->type.mbminmaxlen = 0;
484 
485  cur_len += len;
486  dfield_dup(field, buf->heap);
487 
488  ++field;
489 
490  /* The third field is the position */
492  (byte*) &position,
493  (i + offset + inc - str.f_len + t_ctx->init_pos));
494 
495  dfield_set_data(field, &position, sizeof(position));
496  len = dfield_get_len(field);
497  ut_ad(len == sizeof(ib_uint32_t));
498 
499  field->type.mtype = DATA_INT;
500  field->type.prtype = DATA_NOT_NULL;
501  field->type.len = len;
502  field->type.mbminmaxlen = 0;
503  cur_len += len;
504  dfield_dup(field, buf->heap);
505 
506  /* One variable length column, word with its lenght less than
507  fts_max_token_size, add one extra size and one extra byte */
508  cur_len += 2;
509 
510  /* Reserve one byte for the end marker of row_merge_block_t. */
511  if (buf->total_size + data_size[idx] + cur_len
512  >= srv_sort_buf_size - 1) {
513 
514  buf_full = TRUE;
515  break;
516  }
517 
518  /* Increment the number of tuples */
519  n_tuple[idx]++;
520  t_ctx->processed_len += inc;
521  data_size[idx] += cur_len;
522  }
523 
524  /* Update the data length and the number of new word tuples
525  added in this round of tokenization */
526  for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
527  /* The computation of total_size below assumes that no
528  delete-mark flags will be stored and that all fields
529  are NOT NULL and fixed-length. */
530 
531  sort_buf[i]->total_size += data_size[i];
532 
533  sort_buf[i]->n_tuples += n_tuple[i];
534 
535  merge_file[i]->n_rec += n_tuple[i];
536  t_ctx->rows_added[i] += n_tuple[i];
537  }
538 
539  if (!buf_full) {
540  /* we pad one byte between text accross two fields */
541  t_ctx->init_pos += doc->text.f_len + 1;
542  }
543 
544  return(!buf_full);
545 }
546 
547 /*********************************************************************/
551 UNIV_INTERN
552 os_thread_ret_t
554 /*======================*/
555  void* arg)
556 {
557  fts_psort_t* psort_info = (fts_psort_t*) arg;
558  ulint i;
559  fts_doc_item_t* doc_item = NULL;
560  fts_doc_item_t* prev_doc_item = NULL;
562  ibool processed = FALSE;
563  merge_file_t** merge_file;
565  int tmpfd[FTS_NUM_AUX_INDEX];
566  ulint mycount[FTS_NUM_AUX_INDEX];
567  ib_uint64_t total_rec = 0;
568  ulint num_doc_processed = 0;
569  doc_id_t last_doc_id = 0;
570  ulint zip_size;
571  mem_heap_t* blob_heap = NULL;
572  fts_doc_t doc;
573  dict_table_t* table = psort_info->psort_common->new_table;
574  dtype_t word_dtype;
575  dict_field_t* idx_field;
576  fts_tokenize_ctx_t t_ctx;
577  ulint retried = 0;
578  ut_ad(psort_info);
579 
580  ut_ad(psort_info);
581 
582  buf = psort_info->merge_buf;
583  merge_file = psort_info->merge_file;
584  blob_heap = mem_heap_create(512);
585  memset(&doc, 0, sizeof(doc));
586  memset(&t_ctx, 0, sizeof(t_ctx));
587  memset(mycount, 0, FTS_NUM_AUX_INDEX * sizeof(int));
588 
590  psort_info->psort_common->dup->index);
591 
592  idx_field = dict_index_get_nth_field(
593  psort_info->psort_common->dup->index, 0);
594  word_dtype.prtype = idx_field->col->prtype;
595  word_dtype.mbminmaxlen = idx_field->col->mbminmaxlen;
596  word_dtype.mtype = (strcmp(doc.charset->name, "latin1_swedish_ci") == 0)
597  ? DATA_VARCHAR : DATA_VARMYSQL;
598 
599  block = psort_info->merge_block;
600  zip_size = dict_table_zip_size(table);
601 
602  doc_item = UT_LIST_GET_FIRST(psort_info->fts_doc_list);
603 
604  if (doc_item) {
605  prev_doc_item = doc_item;
606  }
607 
608  t_ctx.cached_stopword = table->fts->cache->stopword_info.cached_stopword;
609  processed = TRUE;
610 loop:
611  while (doc_item) {
612  dfield_t* dfield = doc_item->field;
613 
614  last_doc_id = doc_item->doc_id;
615 
616  if (!(dfield->data)
617  || dfield_get_len(dfield) == UNIV_SQL_NULL) {
618  num_doc_processed++;
619  doc_item = UT_LIST_GET_NEXT(doc_list, doc_item);
620 
621  /* Always remember the last doc_item we processed */
622  if (doc_item) {
623  prev_doc_item = doc_item;
624  }
625  continue;
626  }
627 
628  /* If finish processing the last item, update "doc" with
629  strings in the doc_item, otherwise continue processing last
630  item */
631  if (processed) {
632  byte* data;
633  ulint data_len;
634 
635  dfield = doc_item->field;
636  data = static_cast<byte*>(dfield_get_data(dfield));
637  data_len = dfield_get_len(dfield);
638 
639  if (dfield_is_ext(dfield)) {
640  doc.text.f_str =
642  &doc.text.f_len, data,
643  zip_size, data_len, blob_heap);
644  } else {
645  doc.text.f_str = data;
646  doc.text.f_len = data_len;
647  }
648 
649  doc.tokens = 0;
650  t_ctx.processed_len = 0;
651  } else {
652  /* Not yet finish processing the "doc" on hand,
653  continue processing it */
654  ut_ad(doc.text.f_str);
655  ut_ad(t_ctx.processed_len < doc.text.f_len);
656  }
657 
658  processed = row_merge_fts_doc_tokenize(
659  buf, doc_item->doc_id, &doc,
660  &word_dtype,
661  merge_file, psort_info->psort_common->opt_doc_id_size,
662  &t_ctx);
663 
664  /* Current sort buffer full, need to recycle */
665  if (!processed) {
666  ut_ad(t_ctx.processed_len < doc.text.f_len);
667  ut_ad(t_ctx.rows_added[t_ctx.buf_used]);
668  break;
669  }
670 
671  num_doc_processed++;
672 
673  if (fts_enable_diag_print && num_doc_processed % 10000 == 1) {
674  fprintf(stderr, "number of doc processed %d\n",
675  (int) num_doc_processed);
676 #ifdef FTS_INTERNAL_DIAG_PRINT
677  for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
678  fprintf(stderr, "ID %d, partition %d, word "
679  "%d\n",(int) psort_info->psort_id,
680  (int) i, (int) mycount[i]);
681  }
682 #endif
683  }
684 
685  mem_heap_empty(blob_heap);
686 
687  if (doc_item->field->data) {
688  ut_free(doc_item->field->data);
689  doc_item->field->data = NULL;
690  }
691 
692  doc_item = UT_LIST_GET_NEXT(doc_list, doc_item);
693 
694  /* Always remember the last doc_item we processed */
695  if (doc_item) {
696  prev_doc_item = doc_item;
697  if (last_doc_id != doc_item->doc_id) {
698  t_ctx.init_pos = 0;
699  }
700  }
701  }
702 
703  /* If we run out of current sort buffer, need to sort
704  and flush the sort buffer to disk */
705  if (t_ctx.rows_added[t_ctx.buf_used] && !processed) {
706  row_merge_buf_sort(buf[t_ctx.buf_used], NULL);
707  row_merge_buf_write(buf[t_ctx.buf_used],
708  merge_file[t_ctx.buf_used],
709  block[t_ctx.buf_used]);
710  row_merge_write(merge_file[t_ctx.buf_used]->fd,
711  merge_file[t_ctx.buf_used]->offset++,
712  block[t_ctx.buf_used]);
713  UNIV_MEM_INVALID(block[t_ctx.buf_used][0], srv_sort_buf_size);
714  buf[t_ctx.buf_used] = row_merge_buf_empty(buf[t_ctx.buf_used]);
715  mycount[t_ctx.buf_used] += t_ctx.rows_added[t_ctx.buf_used];
716  t_ctx.rows_added[t_ctx.buf_used] = 0;
717 
718  ut_a(doc_item);
719  goto loop;
720  }
721 
722  /* Parent done scanning, and if finish processing all the docs, exit */
723  if (psort_info->state == FTS_PARENT_COMPLETE) {
724  if (num_doc_processed >= UT_LIST_GET_LEN(
725  psort_info->fts_doc_list)) {
726  goto exit;
727  } else if (retried > 10000) {
728  ut_ad(!doc_item);
729  /* retied too many times and cannot get new record */
730  fprintf(stderr, "InnoDB: FTS parallel sort processed "
731  "%lu records, the sort queue has "
732  "%lu records. But sort cannot get "
733  "the next records", num_doc_processed,
735  psort_info->fts_doc_list));
736  goto exit;
737  }
738  }
739 
740  if (doc_item) {
741  doc_item = UT_LIST_GET_NEXT(doc_list, doc_item);
742  } else if (prev_doc_item) {
743  os_thread_yield();
744  doc_item = UT_LIST_GET_NEXT(doc_list, prev_doc_item);
745  } else {
746  os_thread_yield();
747  doc_item = UT_LIST_GET_FIRST(psort_info->fts_doc_list);
748  }
749 
750  if (doc_item) {
751  prev_doc_item = doc_item;
752 
753  if (last_doc_id != doc_item->doc_id) {
754  t_ctx.init_pos = 0;
755  }
756 
757  retried = 0;
758  } else if (psort_info->state == FTS_PARENT_COMPLETE) {
759  retried++;
760  }
761 
762  goto loop;
763 
764 exit:
765  /* Do a final sort of the last (or latest) batch of records
766  in block memory. Flush them to temp file if records cannot
767  be hold in one block memory */
768  for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
769  if (t_ctx.rows_added[i]) {
770  row_merge_buf_sort(buf[i], NULL);
772  buf[i], merge_file[i], block[i]);
773 
774  /* Write to temp file, only if records have
775  been flushed to temp file before (offset > 0):
776  The pseudo code for sort is following:
777 
778  while (there are rows) {
779  tokenize rows, put result in block[]
780  if (block[] runs out) {
781  sort rows;
782  write to temp file with
783  row_merge_write();
784  offset++;
785  }
786  }
787 
788  # write out the last batch
789  if (offset > 0) {
790  row_merge_write();
791  offset++;
792  } else {
793  # no need to write anything
794  offset stay as 0
795  }
796 
797  so if merge_file[i]->offset is 0 when we come to
798  here as the last batch, this means rows have
799  never flush to temp file, it can be held all in
800  memory */
801  if (merge_file[i]->offset != 0) {
802  row_merge_write(merge_file[i]->fd,
803  merge_file[i]->offset++,
804  block[i]);
805 
806  UNIV_MEM_INVALID(block[i][0],
808  }
809 
810  buf[i] = row_merge_buf_empty(buf[i]);
811  t_ctx.rows_added[i] = 0;
812  }
813  }
814 
815  if (fts_enable_diag_print) {
816  DEBUG_FTS_SORT_PRINT(" InnoDB_FTS: start merge sort\n");
817  }
818 
819  for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
820 
821  if (!merge_file[i]->offset) {
822  continue;
823  }
824 
825  tmpfd[i] = row_merge_file_create_low();
826  if (tmpfd[i] < 0) {
827  goto func_exit;
828  }
829 
830  row_merge_sort(psort_info->psort_common->trx,
831  psort_info->psort_common->dup,
832  merge_file[i], block[i], &tmpfd[i]);
833  total_rec += merge_file[i]->n_rec;
834  close(tmpfd[i]);
835  }
836 
837 func_exit:
838  if (fts_enable_diag_print) {
839  DEBUG_FTS_SORT_PRINT(" InnoDB_FTS: complete merge sort\n");
840  }
841 
842  mem_heap_free(blob_heap);
843 
844  psort_info->child_status = FTS_CHILD_COMPLETE;
845  os_event_set(psort_info->psort_common->sort_event);
846  psort_info->child_status = FTS_CHILD_EXITING;
847 
848 #ifdef __WIN__
849  CloseHandle(psort_info->thread_hdl);
850 #endif /*__WIN__ */
851 
852  os_thread_exit(NULL);
853 
854  OS_THREAD_DUMMY_RETURN;
855 }
856 
857 /*********************************************************************/
859 UNIV_INTERN
860 void
862 /*================*/
863  fts_psort_t* psort_info)
864 {
865  ulint i = 0;
866  os_thread_id_t thd_id;
867 
868  for (i = 0; i < fts_sort_pll_degree; i++) {
869  psort_info[i].psort_id = i;
870  psort_info[i].thread_hdl = os_thread_create(
872  (void*) &psort_info[i], &thd_id);
873  }
874 }
875 
876 /*********************************************************************/
879 UNIV_INTERN
880 os_thread_ret_t
882 /*===============*/
883  void* arg)
884 {
885  fts_psort_t* psort_info = (fts_psort_t*) arg;
886  ulint id;
887 
888  ut_ad(psort_info);
889 
890  id = psort_info->psort_id;
891 
893  psort_info->psort_common->new_table,
894  psort_info->psort_common->all_info, id);
895 
896  psort_info->child_status = FTS_CHILD_COMPLETE;
897  os_event_set(psort_info->psort_common->merge_event);
898  psort_info->child_status = FTS_CHILD_EXITING;
899 
900 #ifdef __WIN__
901  CloseHandle(psort_info->thread_hdl);
902 #endif /*__WIN__ */
903 
904  os_thread_exit(NULL);
905 
906  OS_THREAD_DUMMY_RETURN;
907 }
908 
909 /*********************************************************************/
911 UNIV_INTERN
912 void
914 /*=========================*/
915  fts_psort_t* merge_info)
916 {
917  int i = 0;
918  os_thread_id_t thd_id;
919 
920  /* Kick off merge/insert threads */
921  for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
922  merge_info[i].psort_id = i;
923  merge_info[i].child_status = 0;
924 
925  merge_info[i].thread_hdl = os_thread_create(
926  fts_parallel_merge, (void*) &merge_info[i], &thd_id);
927  }
928 }
929 
930 /********************************************************************/
933 static __attribute__((nonnull))
934 dberr_t
935 row_merge_write_fts_word(
936 /*=====================*/
937  trx_t* trx,
942  CHARSET_INFO* charset)
943 {
944  ulint selected;
945  dberr_t ret = DB_SUCCESS;
946 
947  selected = fts_select_index(
948  charset, word->text.f_str, word->text.f_len);
949  fts_table->suffix = fts_get_suffix(selected);
950 
951  /* Pop out each fts_node in word->nodes write them to auxiliary table */
952  while (ib_vector_size(word->nodes) > 0) {
953  dberr_t error;
954  fts_node_t* fts_node;
955 
956  fts_node = static_cast<fts_node_t*>(ib_vector_pop(word->nodes));
957 
958  error = fts_write_node(
959  trx, &ins_graph[selected], fts_table, &word->text,
960  fts_node);
961 
962  if (error != DB_SUCCESS) {
963  fprintf(stderr, "InnoDB: failed to write"
964  " word %s to FTS auxiliary index"
965  " table, error (%s) \n",
966  word->text.f_str, ut_strerr(error));
967  ret = error;
968  }
969 
970  ut_free(fts_node->ilist);
971  fts_node->ilist = NULL;
972  }
973 
974  return(ret);
975 }
976 
977 /*********************************************************************/
980 UNIV_INTERN
981 void
983 /*=================*/
985  ins_ctx,
988  ib_vector_t* positions,
989  doc_id_t* in_doc_id,
990  dtuple_t* dtuple)
991 {
992  fts_node_t* fts_node = NULL;
993  dfield_t* dfield;
994  doc_id_t doc_id;
995  ulint position;
996  fts_string_t token_word;
997  ulint i;
998 
999  /* Get fts_node for the FTS auxillary INDEX table */
1000  if (ib_vector_size(word->nodes) > 0) {
1001  fts_node = static_cast<fts_node_t*>(
1002  ib_vector_last(word->nodes));
1003  }
1004 
1005  if (fts_node == NULL
1006  || fts_node->ilist_size > FTS_ILIST_MAX_SIZE) {
1007 
1008  fts_node = static_cast<fts_node_t*>(
1009  ib_vector_push(word->nodes, NULL));
1010 
1011  memset(fts_node, 0x0, sizeof(*fts_node));
1012  }
1013 
1014  /* If dtuple == NULL, this is the last word to be processed */
1015  if (!dtuple) {
1016  if (fts_node && ib_vector_size(positions) > 0) {
1018  NULL, fts_node, *in_doc_id,
1019  positions);
1020 
1021  /* Write out the current word */
1022  row_merge_write_fts_word(ins_ctx->trx,
1023  ins_ctx->ins_graph, word,
1024  &ins_ctx->fts_table,
1025  ins_ctx->charset);
1026 
1027  }
1028 
1029  return;
1030  }
1031 
1032  /* Get the first field for the tokenized word */
1033  dfield = dtuple_get_nth_field(dtuple, 0);
1034 
1035  token_word.f_n_char = 0;
1036  token_word.f_len = dfield->len;
1037  token_word.f_str = static_cast<byte*>(dfield_get_data(dfield));
1038 
1039  if (!word->text.f_str) {
1040  fts_utf8_string_dup(&word->text, &token_word, ins_ctx->heap);
1041  }
1042 
1043  /* compare to the last word, to see if they are the same
1044  word */
1045  if (innobase_fts_text_cmp(ins_ctx->charset,
1046  &word->text, &token_word) != 0) {
1047  ulint num_item;
1048 
1049  /* Getting a new word, flush the last position info
1050  for the currnt word in fts_node */
1051  if (ib_vector_size(positions) > 0) {
1053  NULL, fts_node, *in_doc_id, positions);
1054  }
1055 
1056  /* Write out the current word */
1057  row_merge_write_fts_word(ins_ctx->trx, ins_ctx->ins_graph,
1058  word, &ins_ctx->fts_table,
1059  ins_ctx->charset);
1060 
1061  /* Copy the new word */
1062  fts_utf8_string_dup(&word->text, &token_word, ins_ctx->heap);
1063 
1064  num_item = ib_vector_size(positions);
1065 
1066  /* Clean up position queue */
1067  for (i = 0; i < num_item; i++) {
1068  ib_vector_pop(positions);
1069  }
1070 
1071  /* Reset Doc ID */
1072  *in_doc_id = 0;
1073  memset(fts_node, 0x0, sizeof(*fts_node));
1074  }
1075 
1076  /* Get the word's Doc ID */
1077  dfield = dtuple_get_nth_field(dtuple, 1);
1078 
1079  if (!ins_ctx->opt_doc_id_size) {
1080  doc_id = fts_read_doc_id(
1081  static_cast<byte*>(dfield_get_data(dfield)));
1082  } else {
1083  doc_id = (doc_id_t) mach_read_from_4(
1084  static_cast<byte*>(dfield_get_data(dfield)));
1085  }
1086 
1087  /* Get the word's position info */
1088  dfield = dtuple_get_nth_field(dtuple, 2);
1089  position = mach_read_from_4(static_cast<byte*>(dfield_get_data(dfield)));
1090 
1091  /* If this is the same word as the last word, and they
1092  have the same Doc ID, we just need to add its position
1093  info. Otherwise, we will flush position info to the
1094  fts_node and initiate a new position vector */
1095  if (!(*in_doc_id) || *in_doc_id == doc_id) {
1096  ib_vector_push(positions, &position);
1097  } else {
1098  ulint num_pos = ib_vector_size(positions);
1099 
1100  fts_cache_node_add_positions(NULL, fts_node,
1101  *in_doc_id, positions);
1102  for (i = 0; i < num_pos; i++) {
1103  ib_vector_pop(positions);
1104  }
1105  ib_vector_push(positions, &position);
1106  }
1107 
1108  /* record the current Doc ID */
1109  *in_doc_id = doc_id;
1110 }
1111 
1112 /*********************************************************************/
1115 static
1116 int
1117 row_fts_sel_tree_propagate(
1118 /*=======================*/
1119  int propogated, /*<! in: tree node propagated */
1120  int* sel_tree, /*<! in: selection tree */
1121  const mrec_t** mrec, /*<! in: sort record */
1122  ulint** offsets, /*<! in: record offsets */
1123  dict_index_t* index) /*<! in/out: FTS index */
1124 {
1125  ulint parent;
1126  int child_left;
1127  int child_right;
1128  int selected;
1129 
1130  /* Find which parent this value will be propagated to */
1131  parent = (propogated - 1) / 2;
1132 
1133  /* Find out which value is smaller, and to propagate */
1134  child_left = sel_tree[parent * 2 + 1];
1135  child_right = sel_tree[parent * 2 + 2];
1136 
1137  if (child_left == -1 || mrec[child_left] == NULL) {
1138  if (child_right == -1
1139  || mrec[child_right] == NULL) {
1140  selected = -1;
1141  } else {
1142  selected = child_right ;
1143  }
1144  } else if (child_right == -1
1145  || mrec[child_right] == NULL) {
1146  selected = child_left;
1147  } else if (cmp_rec_rec_simple(mrec[child_left], mrec[child_right],
1148  offsets[child_left],
1149  offsets[child_right],
1150  index, NULL) < 0) {
1151  selected = child_left;
1152  } else {
1153  selected = child_right;
1154  }
1155 
1156  sel_tree[parent] = selected;
1157 
1158  return(parent);
1159 }
1160 
1161 /*********************************************************************/
1164 static
1165 int
1166 row_fts_sel_tree_update(
1167 /*====================*/
1168  int* sel_tree, /*<! in/out: selection tree */
1169  ulint propagated, /*<! in: node to propagate up */
1170  ulint height, /*<! in: tree height */
1171  const mrec_t** mrec, /*<! in: sort record */
1172  ulint** offsets, /*<! in: record offsets */
1173  dict_index_t* index) /*<! in: index dictionary */
1174 {
1175  ulint i;
1176 
1177  for (i = 1; i <= height; i++) {
1178  propagated = row_fts_sel_tree_propagate(
1179  propagated, sel_tree, mrec, offsets, index);
1180  }
1181 
1182  return(sel_tree[0]);
1183 }
1184 
1185 /*********************************************************************/
1187 static
1188 void
1189 row_fts_build_sel_tree_level(
1190 /*=========================*/
1191  int* sel_tree, /*<! in/out: selection tree */
1192  ulint level, /*<! in: selection tree level */
1193  const mrec_t** mrec, /*<! in: sort record */
1194  ulint** offsets, /*<! in: record offsets */
1195  dict_index_t* index) /*<! in: index dictionary */
1196 {
1197  ulint start;
1198  int child_left;
1199  int child_right;
1200  ulint i;
1201  ulint num_item;
1202 
1203  start = (1 << level) - 1;
1204  num_item = (1 << level);
1205 
1206  for (i = 0; i < num_item; i++) {
1207  child_left = sel_tree[(start + i) * 2 + 1];
1208  child_right = sel_tree[(start + i) * 2 + 2];
1209 
1210  if (child_left == -1) {
1211  if (child_right == -1) {
1212  sel_tree[start + i] = -1;
1213  } else {
1214  sel_tree[start + i] = child_right;
1215  }
1216  continue;
1217  } else if (child_right == -1) {
1218  sel_tree[start + i] = child_left;
1219  continue;
1220  }
1221 
1222  /* Deal with NULL child conditions */
1223  if (!mrec[child_left]) {
1224  if (!mrec[child_right]) {
1225  sel_tree[start + i] = -1;
1226  } else {
1227  sel_tree[start + i] = child_right;
1228  }
1229  continue;
1230  } else if (!mrec[child_right]) {
1231  sel_tree[start + i] = child_left;
1232  continue;
1233  }
1234 
1235  /* Select the smaller one to set parent pointer */
1236  int cmp = cmp_rec_rec_simple(
1237  mrec[child_left], mrec[child_right],
1238  offsets[child_left], offsets[child_right],
1239  index, NULL);
1240 
1241  sel_tree[start + i] = cmp < 0 ? child_left : child_right;
1242  }
1243 }
1244 
1245 /*********************************************************************/
1249 static
1250 ulint
1251 row_fts_build_sel_tree(
1252 /*===================*/
1253  int* sel_tree, /*<! in/out: selection tree */
1254  const mrec_t** mrec, /*<! in: sort record */
1255  ulint** offsets, /*<! in: record offsets */
1256  dict_index_t* index) /*<! in: index dictionary */
1257 {
1258  ulint treelevel = 1;
1259  ulint num = 2;
1260  int i = 0;
1261  ulint start;
1262 
1263  /* No need to build selection tree if we only have two merge threads */
1264  if (fts_sort_pll_degree <= 2) {
1265  return(0);
1266  }
1267 
1268  while (num < fts_sort_pll_degree) {
1269  num = num << 1;
1270  treelevel++;
1271  }
1272 
1273  start = (1 << treelevel) - 1;
1274 
1275  for (i = 0; i < (int) fts_sort_pll_degree; i++) {
1276  sel_tree[i + start] = i;
1277  }
1278 
1279  for (i = treelevel - 1; i >=0; i--) {
1280  row_fts_build_sel_tree_level(sel_tree, i, mrec, offsets, index);
1281  }
1282 
1283  return(treelevel);
1284 }
1285 
1286 /*********************************************************************/
1290 UNIV_INTERN
1291 dberr_t
1293 /*=================*/
1294  dict_index_t* index,
1295  dict_table_t* table,
1296  fts_psort_t* psort_info,
1297  ulint id) /* !< in: which auxiliary table's data
1298  to insert to */
1299 {
1300  const byte** b;
1301  mem_heap_t* tuple_heap;
1302  mem_heap_t* heap;
1303  dberr_t error = DB_SUCCESS;
1304  ulint* foffs;
1305  ulint** offsets;
1306  fts_tokenizer_word_t new_word;
1307  ib_vector_t* positions;
1308  doc_id_t last_doc_id;
1309  ib_alloc_t* heap_alloc;
1310  ulint n_bytes;
1311  ulint i;
1312  mrec_buf_t** buf;
1313  int* fd;
1314  byte** block;
1315  const mrec_t** mrec;
1316  ulint count = 0;
1317  int* sel_tree;
1318  ulint height;
1319  ulint start;
1320  fts_psort_insert_t ins_ctx;
1321  ulint count_diag = 0;
1322 
1323  ut_ad(index);
1324  ut_ad(table);
1325 
1326  /* We use the insert query graph as the dummy graph
1327  needed in the row module call */
1328 
1329  ins_ctx.trx = trx_allocate_for_background();
1330 
1331  ins_ctx.trx->op_info = "inserting index entries";
1332 
1333  ins_ctx.opt_doc_id_size = psort_info[0].psort_common->opt_doc_id_size;
1334 
1335  heap = mem_heap_create(500 + sizeof(mrec_buf_t));
1336 
1337  b = (const byte**) mem_heap_alloc(
1338  heap, sizeof (*b) * fts_sort_pll_degree);
1339  foffs = (ulint*) mem_heap_alloc(
1340  heap, sizeof(*foffs) * fts_sort_pll_degree);
1341  offsets = (ulint**) mem_heap_alloc(
1342  heap, sizeof(*offsets) * fts_sort_pll_degree);
1343  buf = (mrec_buf_t**) mem_heap_alloc(
1344  heap, sizeof(*buf) * fts_sort_pll_degree);
1345  fd = (int*) mem_heap_alloc(heap, sizeof(*fd) * fts_sort_pll_degree);
1346  block = (byte**) mem_heap_alloc(
1347  heap, sizeof(*block) * fts_sort_pll_degree);
1348  mrec = (const mrec_t**) mem_heap_alloc(
1349  heap, sizeof(*mrec) * fts_sort_pll_degree);
1350  sel_tree = (int*) mem_heap_alloc(
1351  heap, sizeof(*sel_tree) * (fts_sort_pll_degree * 2));
1352 
1353  tuple_heap = mem_heap_create(1000);
1354 
1355  ins_ctx.charset = fts_index_get_charset(index);
1356  ins_ctx.heap = heap;
1357 
1358  for (i = 0; i < fts_sort_pll_degree; i++) {
1359  ulint num;
1360 
1361  num = 1 + REC_OFFS_HEADER_SIZE
1362  + dict_index_get_n_fields(index);
1363  offsets[i] = static_cast<ulint*>(mem_heap_zalloc(
1364  heap, num * sizeof *offsets[i]));
1365  offsets[i][0] = num;
1366  offsets[i][1] = dict_index_get_n_fields(index);
1367  block[i] = psort_info[i].merge_block[id];
1368  b[i] = psort_info[i].merge_block[id];
1369  fd[i] = psort_info[i].merge_file[id]->fd;
1370  foffs[i] = 0;
1371 
1372  buf[i] = static_cast<unsigned char (*)[16384]>(
1373  mem_heap_alloc(heap, sizeof *buf[i]));
1374  count_diag += (int) psort_info[i].merge_file[id]->n_rec;
1375  }
1376 
1377  if (fts_enable_diag_print) {
1378  ut_print_timestamp(stderr);
1379  fprintf(stderr, " InnoDB_FTS: to inserted %lu records\n",
1380  (ulong) count_diag);
1381  }
1382 
1383  /* Initialize related variables if creating FTS indexes */
1384  heap_alloc = ib_heap_allocator_create(heap);
1385 
1386  memset(&new_word, 0, sizeof(new_word));
1387 
1388  new_word.nodes = ib_vector_create(heap_alloc, sizeof(fts_node_t), 4);
1389  positions = ib_vector_create(heap_alloc, sizeof(ulint), 32);
1390  last_doc_id = 0;
1391 
1392  /* Allocate insert query graphs for FTS auxillary
1393  Index Table, note we have FTS_NUM_AUX_INDEX such index tables */
1394  n_bytes = sizeof(que_t*) * (FTS_NUM_AUX_INDEX + 1);
1395  ins_ctx.ins_graph = static_cast<que_t**>(mem_heap_alloc(heap, n_bytes));
1396  memset(ins_ctx.ins_graph, 0x0, n_bytes);
1397 
1398  ins_ctx.fts_table.type = FTS_INDEX_TABLE;
1399  ins_ctx.fts_table.index_id = index->id;
1400  ins_ctx.fts_table.table_id = table->id;
1401  ins_ctx.fts_table.parent = index->table->name;
1402  ins_ctx.fts_table.table = NULL;
1403 
1404  for (i = 0; i < fts_sort_pll_degree; i++) {
1405  if (psort_info[i].merge_file[id]->n_rec == 0) {
1406  /* No Rows to read */
1407  mrec[i] = b[i] = NULL;
1408  } else {
1409  /* Read from temp file only if it has been
1410  written to. Otherwise, block memory holds
1411  all the sorted records */
1412  if (psort_info[i].merge_file[id]->offset > 0
1413  && (!row_merge_read(
1414  fd[i], foffs[i],
1415  (row_merge_block_t*) block[i]))) {
1416  error = DB_CORRUPTION;
1417  goto exit;
1418  }
1419 
1421  }
1422  }
1423 
1424  height = row_fts_build_sel_tree(sel_tree, (const mrec_t **) mrec,
1425  offsets, index);
1426 
1427  start = (1 << height) - 1;
1428 
1429  /* Fetch sorted records from sort buffer and insert them into
1430  corresponding FTS index auxiliary tables */
1431  for (;;) {
1432  dtuple_t* dtuple;
1433  ulint n_ext;
1434  int min_rec = 0;
1435 
1436  if (fts_sort_pll_degree <= 2) {
1437  while (!mrec[min_rec]) {
1438  min_rec++;
1439 
1440  if (min_rec >= (int) fts_sort_pll_degree) {
1442  &ins_ctx, &new_word,
1443  positions, &last_doc_id,
1444  NULL);
1445 
1446  goto exit;
1447  }
1448  }
1449 
1450  for (i = min_rec + 1; i < fts_sort_pll_degree; i++) {
1451  if (!mrec[i]) {
1452  continue;
1453  }
1454 
1455  if (cmp_rec_rec_simple(
1456  mrec[i], mrec[min_rec],
1457  offsets[i], offsets[min_rec],
1458  index, NULL) < 0) {
1459  min_rec = i;
1460  }
1461  }
1462  } else {
1463  min_rec = sel_tree[0];
1464 
1465  if (min_rec == -1) {
1467  &ins_ctx, &new_word,
1468  positions, &last_doc_id,
1469  NULL);
1470 
1471  goto exit;
1472  }
1473  }
1474 
1475  dtuple = row_rec_to_index_entry_low(
1476  mrec[min_rec], index, offsets[min_rec], &n_ext,
1477  tuple_heap);
1478 
1480  &ins_ctx, &new_word, positions,
1481  &last_doc_id, dtuple);
1482 
1483 
1484  ROW_MERGE_READ_GET_NEXT(min_rec);
1485 
1486  if (fts_sort_pll_degree > 2) {
1487  if (!mrec[min_rec]) {
1488  sel_tree[start + min_rec] = -1;
1489  }
1490 
1491  row_fts_sel_tree_update(sel_tree, start + min_rec,
1492  height, mrec,
1493  offsets, index);
1494  }
1495 
1496  count++;
1497 
1498  mem_heap_empty(tuple_heap);
1499  }
1500 
1501 exit:
1502  fts_sql_commit(ins_ctx.trx);
1503 
1504  ins_ctx.trx->op_info = "";
1505 
1506  mem_heap_free(tuple_heap);
1507 
1508  for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
1509  if (ins_ctx.ins_graph[i]) {
1510  fts_que_graph_free(ins_ctx.ins_graph[i]);
1511  }
1512  }
1513 
1514  trx_free_for_background(ins_ctx.trx);
1515 
1516  mem_heap_free(heap);
1517 
1518  if (fts_enable_diag_print) {
1519  ut_print_timestamp(stderr);
1520  fprintf(stderr, " InnoDB_FTS: inserted %lu records\n",
1521  (ulong) count);
1522  }
1523 
1524  return(error);
1525 }