MySQL 5.6.14 Source Code Document
ha_archive.cc
1 /*
2  Copyright (c) 2004, 2012, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or
5  modify it under the terms of the GNU General Public License
6  as published by the Free Software Foundation; version 2 of
7  the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include "sql_priv.h"
20 #include "probes_mysql.h"
21 #include "sql_class.h" // SSV
22 #include "sql_table.h"
23 #include <myisam.h>
24 
25 #include "ha_archive.h"
26 #include <my_dir.h>
27 
28 #include <mysql/plugin.h>
29 
30 /*
31  First, if you want to understand storage engines you should look at
32  ha_example.cc and ha_example.h.
33 
34  This example was written as a test case for a customer who needed
35  a storage engine without indexes that could compress data very well.
36  So, welcome to a completely compressed storage engine. This storage
37  engine only does inserts. No replaces, deletes, or updates. All reads are
38  complete table scans. Compression is done through a combination of packing
39  and making use of the zlib library.
40 
41  We keep a file pointer open for each instance of ha_archive for each read
42  but for writes we keep one open file handle just for that. We flush it
43  only if we have a read occur. azip handles compressing lots of records
44  at once much better than doing lots of little records between writes.
45  It would be possible not to lock on writes, but that would mean we couldn't
46  handle bulk inserts as well (that is, if someone were trying to read at
47  the same time, since we would want to flush).
48 
49  A "meta" file is kept alongside the data file. This file serves two purpose.
50  The first purpose is to track the number of rows in the table. The second
51  purpose is to determine if the table was closed properly or not. When the
52  meta file is first opened it is marked as dirty. It is opened when the table
53  itself is opened for writing. When the table is closed the new count for rows
54  is written to the meta file and the file is marked as clean. If the meta file
55  is opened and it is marked as dirty, it is assumed that a crash occurred. At
56  this point an error occurs and the user is told to rebuild the file.
57  A rebuild scans the rows and rewrites the meta file. If corruption is found
58  in the data file then the meta file is not repaired.
59 
60  At some point a recovery method for such a drastic case needs to be devised.
61 
62  Locks are row level, and you will get a consistent read.
63 
64  For performance as far as table scans go it is quite fast. I don't have
65  good numbers but locally it has outperformed both InnoDB and MyISAM. For
66  InnoDB the question will be whether the table can fit into the buffer
67  pool. For MyISAM it's a question of how much the file system caches the
68  MyISAM file. With enough free memory MyISAM is faster. It's only when the OS
69  doesn't have enough memory to cache the entire table that archive turns out
70  to be any faster.
71 
72  Example comparisons between MyISAM (packed) and Archive.
73 
74  Table with 76695844 identical rows:
75  29680807 a_archive.ARZ
76  920350317 a.MYD
77 
78 
79  Table with 8991478 rows (all of Slashdot's comments):
80  1922964506 comment_archive.ARZ
81  2944970297 comment_text.MYD
82 
83 
84  TODO:
85  Allow users to set compression level.
86  Allow adjustable block size.
87  Implement versioning, should be easy.
88  Allow for errors, find a way to mark bad rows.
89  Add an optional feature so that rows can be flushed at intervals (which will cause less
90  compression but may speed up ordered searches).
91  Checkpoint the meta file to allow for faster rebuilds.
92  Option to allow for dirty reads; this would lower the sync calls, which would make
93  inserts a lot faster, but would mean highly arbitrary reads.
94 
95  -Brian
96 
97  Archive file format versions:
98  <5.1.5 - v.1
99  5.1.5-5.1.15 - v.2
100  >5.1.15 - v.3
101 */
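/*
  To make the usage model above concrete, here is a minimal illustrative
  SQL session (not taken from this file; table and column names are made
  up for the example):

    CREATE TABLE t1 (a INT, b VARCHAR(32)) ENGINE=ARCHIVE;
    INSERT INTO t1 VALUES (1, 'row one'), (2, 'row two');  -- supported
    SELECT * FROM t1;        -- always a complete table scan
    UPDATE t1 SET b = 'x';   -- refused: the engine only does inserts
    DELETE FROM t1;          -- refused as well
*/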
102 
103 /* The file extensions */
104 #define ARZ ".ARZ" // The data file
105 #define ARN ".ARN" // Files used during an optimize call
106 #define ARM ".ARM" // Meta file (deprecated)
107 
108 /* 5.0 compatibility */
109 #define META_V1_OFFSET_CHECK_HEADER 0
110 #define META_V1_OFFSET_VERSION 1
111 #define META_V1_OFFSET_ROWS_RECORDED 2
112 #define META_V1_OFFSET_CHECK_POINT 10
113 #define META_V1_OFFSET_CRASHED 18
114 #define META_V1_LENGTH 19
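/*
  Putting the offsets above together, the 19-byte v1 meta file can be
  read as roughly the following layout (a sketch derived from the defines
  and from read_v1_metafile()/write_v1_metafile() below):

    byte  0       check header (ARCHIVE_CHECK_HEADER)
    byte  1       format version (1)
    bytes 2..9    number of rows recorded (8 bytes)
    bytes 10..17  check point (written as 0)
    byte  18      "crashed" flag
*/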
115 
116 /*
117  uchar + uchar
118 */
119 #define DATA_BUFFER_SIZE 2 // Size of the data used in the data file
120 #define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption
121 
122 #ifdef HAVE_PSI_INTERFACE
123 extern "C" PSI_file_key arch_key_file_data;
124 #endif
125 
126 /* Static declarations for handlerton */
127 static handler *archive_create_handler(handlerton *hton,
128  TABLE_SHARE *table,
129  MEM_ROOT *mem_root);
130 int archive_discover(handlerton *hton, THD* thd, const char *db,
131  const char *name,
132  uchar **frmblob,
133  size_t *frmlen);
134 
135 /*
136  Number of rows that will force a bulk insert.
137 */
138 #define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2
139 
140 /*
141  Size of header used for row
142 */
143 #define ARCHIVE_ROW_HEADER_SIZE 4
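/*
  A sketch of the on-disk row formats handled by the pack/unpack code below:

    version 3 (pack_row):     [4-byte length header][null bytes][packed non-NULL fields]
    version 1 (pack_row_v1):  [raw record image][blob data appended in field order]

  The 4-byte header stores how many bytes follow it; unpack_row() reads
  that header first and then unpacks the fields.
*/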
144 
145 static handler *archive_create_handler(handlerton *hton,
146  TABLE_SHARE *table,
147  MEM_ROOT *mem_root)
148 {
149  return new (mem_root) ha_archive(hton, table);
150 }
151 
152 
153 #ifdef HAVE_PSI_INTERFACE
154 PSI_mutex_key az_key_mutex_Archive_share_mutex;
155 
156 static PSI_mutex_info all_archive_mutexes[]=
157 {
158  { &az_key_mutex_Archive_share_mutex, "Archive_share::mutex", 0}
159 };
160 
161 PSI_file_key arch_key_file_metadata, arch_key_file_data, arch_key_file_frm;
162 static PSI_file_info all_archive_files[]=
163 {
164  { &arch_key_file_metadata, "metadata", 0},
165  { &arch_key_file_data, "data", 0},
166  { &arch_key_file_frm, "FRM", 0}
167 };
168 
169 static void init_archive_psi_keys(void)
170 {
171  const char* category= "archive";
172  int count;
173 
174  count= array_elements(all_archive_mutexes);
175  mysql_mutex_register(category, all_archive_mutexes, count);
176 
177  count= array_elements(all_archive_files);
178  mysql_file_register(category, all_archive_files, count);
179 }
180 
181 
182 #endif /* HAVE_PSI_INTERFACE */
183 
184 /*
185  Initialize the archive handler.
186 
187  SYNOPSIS
188  archive_db_init()
189  void *
190 
191  RETURN
192  FALSE OK
193  TRUE Error
194 */
195 
196 int archive_db_init(void *p)
197 {
198  DBUG_ENTER("archive_db_init");
199  handlerton *archive_hton;
200 
201 #ifdef HAVE_PSI_INTERFACE
202  init_archive_psi_keys();
203 #endif
204 
205  archive_hton= (handlerton *)p;
206  archive_hton->state= SHOW_OPTION_YES;
207  archive_hton->db_type= DB_TYPE_ARCHIVE_DB;
208  archive_hton->create= archive_create_handler;
209  archive_hton->flags= HTON_NO_FLAGS;
210  archive_hton->discover= archive_discover;
211 
212  DBUG_RETURN(0);
213 }
214 
215 
216 Archive_share::Archive_share()
217 {
218  crashed= false;
219  in_optimize= false;
220  archive_write_open= false;
221  dirty= false;
222  DBUG_PRINT("ha_archive", ("Archive_share: %p",
223  this));
224  thr_lock_init(&lock);
225  /*
226  We will use this lock for rows.
227  */
228  mysql_mutex_init(az_key_mutex_Archive_share_mutex,
229  &mutex, MY_MUTEX_INIT_FAST);
230 }
231 
232 
233 ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
234  :handler(hton, table_arg), share(NULL), delayed_insert(0), bulk_insert(0)
235 {
236  /* Set our original buffer from pre-allocated memory */
237  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
238 
239  /* The size of the offset value we will use for position() */
240  ref_length= sizeof(my_off_t);
241  archive_reader_open= FALSE;
242 }
243 
244 int archive_discover(handlerton *hton, THD* thd, const char *db,
245  const char *name,
246  uchar **frmblob,
247  size_t *frmlen)
248 {
249  DBUG_ENTER("archive_discover");
250  DBUG_PRINT("archive_discover", ("db: %s, name: %s", db, name));
251  azio_stream frm_stream;
252  char az_file[FN_REFLEN];
253  char *frm_ptr;
254  MY_STAT file_stat;
255 
256  build_table_filename(az_file, sizeof(az_file) - 1, db, name, ARZ, 0);
257 
258  if (!(mysql_file_stat(arch_key_file_data, az_file, &file_stat, MYF(0))))
259  goto err;
260 
261  if (!(azopen(&frm_stream, az_file, O_RDONLY|O_BINARY)))
262  {
263  if (errno == EROFS || errno == EACCES)
264  DBUG_RETURN(my_errno= errno);
265  DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
266  }
267 
268  if (frm_stream.frm_length == 0)
269  goto err;
270 
271  frm_ptr= (char *)my_malloc(sizeof(char) * frm_stream.frm_length, MYF(0));
272  azread_frm(&frm_stream, frm_ptr);
273  azclose(&frm_stream);
274 
275  *frmlen= frm_stream.frm_length;
276  *frmblob= (uchar*) frm_ptr;
277 
278  DBUG_RETURN(0);
279 err:
280  my_errno= 0;
281  DBUG_RETURN(1);
282 }
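/*
  A short note on what archive_discover() enables: since the table's .frm
  image is embedded in the .ARZ stream (see frm_load()/frm_copy() below),
  the server can re-discover an archive table from the data file alone,
  for example when only the .ARZ file has been copied into a database
  directory.
*/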
283 
284 static void save_auto_increment(TABLE *table, ulonglong *value)
285 {
286  Field *field= table->found_next_number_field;
287  ulonglong auto_value=
288  (ulonglong) field->val_int(table->record[0] +
289  field->offset(table->record[0]));
290  if (*value <= auto_value)
291  *value= auto_value + 1;
292 }
293 
302 int Archive_share::read_v1_metafile()
303 {
304  char file_name[FN_REFLEN];
305  uchar buf[META_V1_LENGTH];
306  File fd;
307  DBUG_ENTER("Archive_share::read_v1_metafile");
308 
309  fn_format(file_name, data_file_name, "", ARM, MY_REPLACE_EXT);
310  if ((fd= mysql_file_open(arch_key_file_metadata, file_name, O_RDONLY, MYF(0))) == -1)
311  DBUG_RETURN(-1);
312 
313  if (mysql_file_read(fd, buf, sizeof(buf), MYF(0)) != sizeof(buf))
314  {
315  mysql_file_close(fd, MYF(0));
316  DBUG_RETURN(-1);
317  }
318 
319  rows_recorded= uint8korr(buf + META_V1_OFFSET_ROWS_RECORDED);
320  crashed= buf[META_V1_OFFSET_CRASHED];
321  mysql_file_close(fd, MYF(0));
322  DBUG_RETURN(0);
323 }
324 
325 
334 int Archive_share::write_v1_metafile()
335 {
336  char file_name[FN_REFLEN];
337  uchar buf[META_V1_LENGTH];
338  File fd;
339  DBUG_ENTER("Archive_share::write_v1_metafile");
340 
341  buf[META_V1_OFFSET_CHECK_HEADER]= ARCHIVE_CHECK_HEADER;
342  buf[META_V1_OFFSET_VERSION]= 1;
343  int8store(buf + META_V1_OFFSET_ROWS_RECORDED, rows_recorded);
344  int8store(buf + META_V1_OFFSET_CHECK_POINT, (ulonglong) 0);
345  buf[META_V1_OFFSET_CRASHED]= crashed;
346 
347  fn_format(file_name, data_file_name, "", ARM, MY_REPLACE_EXT);
348  if ((fd= mysql_file_open(arch_key_file_metadata, file_name, O_WRONLY, MYF(0))) == -1)
349  DBUG_RETURN(-1);
350 
351  if (mysql_file_write(fd, buf, sizeof(buf), MYF(0)) != sizeof(buf))
352  {
353  mysql_file_close(fd, MYF(0));
354  DBUG_RETURN(-1);
355  }
356 
357  mysql_file_close(fd, MYF(0));
358  DBUG_RETURN(0);
359 }
360 
361 
370 unsigned int ha_archive::pack_row_v1(uchar *record)
371 {
372  uint *blob, *end;
373  uchar *pos;
374  DBUG_ENTER("pack_row_v1");
375  memcpy(record_buffer->buffer, record, table->s->reclength);
376  pos= record_buffer->buffer + table->s->reclength;
377  for (blob= table->s->blob_field, end= blob + table->s->blob_fields;
378  blob != end; blob++)
379  {
380  uint32 length= ((Field_blob *) table->field[*blob])->get_length();
381  if (length)
382  {
383  uchar *data_ptr;
384  ((Field_blob *) table->field[*blob])->get_ptr(&data_ptr);
385  memcpy(pos, data_ptr, length);
386  pos+= length;
387  }
388  }
389  DBUG_RETURN(pos - record_buffer->buffer);
390 }
391 
392 
393 /*
394  This method reads the header of a datafile and returns whether or not it was successful.
395 */
396 int ha_archive::read_data_header(azio_stream *file_to_read)
397 {
398  int error;
399  unsigned long ret;
400  uchar data_buffer[DATA_BUFFER_SIZE];
401  DBUG_ENTER("ha_archive::read_data_header");
402 
403  if (azrewind(file_to_read) == -1)
404  DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
405 
406  if (file_to_read->version >= 3)
407  DBUG_RETURN(0);
408  /* Everything below this is legacy for version 2 and earlier */
409 
410  DBUG_PRINT("ha_archive", ("Reading legacy data header"));
411 
412  ret= azread(file_to_read, data_buffer, DATA_BUFFER_SIZE, &error);
413 
414  if (ret != DATA_BUFFER_SIZE)
415  {
416  DBUG_PRINT("ha_archive", ("Reading, expected %d got %lu",
417  DATA_BUFFER_SIZE, ret));
418  DBUG_RETURN(1);
419  }
420 
421  if (error)
422  {
423  DBUG_PRINT("ha_archive", ("Compression error (%d)", error));
424  DBUG_RETURN(1);
425  }
426 
427  DBUG_PRINT("ha_archive", ("Check %u", data_buffer[0]));
428  DBUG_PRINT("ha_archive", ("Version %u", data_buffer[1]));
429 
430  if ((data_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) &&
431  (data_buffer[1] != (uchar)ARCHIVE_VERSION))
432  DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
433 
434  DBUG_RETURN(0);
435 }
436 
437 
438 /*
439  We create the shared memory space that we will use for the open table.
440  No matter what, we try to get or create a share. This is so that a repair
441  table operation can occur.
442 
443  See ha_example.cc for a longer description.
444 */
445 Archive_share *ha_archive::get_share(const char *table_name, int *rc)
446 {
447  Archive_share *tmp_share;
448 
449  DBUG_ENTER("ha_archive::get_share");
450 
451  lock_shared_ha_data();
452  if (!(tmp_share= static_cast<Archive_share*>(get_ha_share_ptr())))
453  {
454  azio_stream archive_tmp;
455 
456  tmp_share= new Archive_share;
457 
458  if (!tmp_share)
459  {
460  *rc= HA_ERR_OUT_OF_MEM;
461  goto err;
462  }
463  DBUG_PRINT("ha_archive", ("new Archive_share: %p",
464  tmp_share));
465 
466  fn_format(tmp_share->data_file_name, table_name, "",
467  ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
468  strmov(tmp_share->table_name, table_name);
469  DBUG_PRINT("ha_archive", ("Data File %s",
470  tmp_share->data_file_name));
471 
472  /*
473  We read the meta file, but do not mark it dirty. Since we are not
474  doing a write we won't mark it dirty (and we won't open it for
475  anything but reading... open it for write and we will generate null
476  compression writes).
477  */
478  if (!(azopen(&archive_tmp, tmp_share->data_file_name, O_RDONLY|O_BINARY)))
479  {
480  delete tmp_share;
481  *rc= my_errno ? my_errno : HA_ERR_CRASHED;
482  tmp_share= NULL;
483  goto err;
484  }
485  stats.auto_increment_value= archive_tmp.auto_increment + 1;
486  tmp_share->rows_recorded= (ha_rows)archive_tmp.rows;
487  tmp_share->crashed= archive_tmp.dirty;
488  share= tmp_share;
489  if (archive_tmp.version == 1)
490  share->read_v1_metafile();
491  azclose(&archive_tmp);
492 
493  set_ha_share_ptr(static_cast<Handler_share*>(tmp_share));
494  }
495  if (tmp_share->crashed)
496  *rc= HA_ERR_CRASHED_ON_USAGE;
497 err:
498  unlock_shared_ha_data();
499 
500  DBUG_ASSERT(tmp_share || *rc);
501 
502  DBUG_RETURN(tmp_share);
503 }
504 
505 
506 int Archive_share::init_archive_writer()
507 {
508  DBUG_ENTER("Archive_share::init_archive_writer");
509  /*
510  It is expensive to open and close the data files and since you can't have
511  a gzip file that can be both read and written we keep a writer open
512  that is shared among all open tables.
513  */
514  if (!(azopen(&archive_write, data_file_name,
515  O_RDWR|O_BINARY)))
516  {
517  DBUG_PRINT("ha_archive", ("Could not open archive write file"));
518  crashed= true;
519  DBUG_RETURN(1);
520  }
521  archive_write_open= true;
522 
523  DBUG_RETURN(0);
524 }
525 
526 
527 void Archive_share::close_archive_writer()
528 {
529  mysql_mutex_assert_owner(&mutex);
530  if (archive_write_open)
531  {
532  if (archive_write.version == 1)
533  (void) write_v1_metafile();
534  azclose(&archive_write);
535  archive_write_open= false;
536  dirty= false;
537  }
538 }
539 
540 
541 /*
542  No locks are required because it is associated with just one handler instance
543 */
544 int ha_archive::init_archive_reader()
545 {
546  DBUG_ENTER("ha_archive::init_archive_reader");
547  /*
548  It is expensive to open and close the data files and since you can't have
549  a gzip file that can be both read and written we keep a writer open
550  that is shared among all open tables, but have one reader open for
551  each handler instance.
552  */
553  if (!archive_reader_open)
554  {
555  if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY)))
556  {
557  DBUG_PRINT("ha_archive", ("Could not open archive read file"));
558  share->crashed= TRUE;
559  DBUG_RETURN(1);
560  }
561  archive_reader_open= TRUE;
562  }
563 
564  DBUG_RETURN(0);
565 }
566 
567 
568 /*
569  We just implement one additional file extension.
570 */
571 static const char *ha_archive_exts[] = {
572  ARZ,
573  NullS
574 };
575 
576 const char **ha_archive::bas_ext() const
577 {
578  return ha_archive_exts;
579 }
580 
581 
582 /*
583  When opening a file we:
584  Create/get our shared structure.
585  Init our lock.
586  We open the file we will read from.
587 */
588 int ha_archive::open(const char *name, int mode, uint open_options)
589 {
590  int rc= 0;
591  DBUG_ENTER("ha_archive::open");
592 
593  DBUG_PRINT("ha_archive", ("archive table was opened for crash: %s",
594  (open_options & HA_OPEN_FOR_REPAIR) ? "yes" : "no"));
595  share= get_share(name, &rc);
596  if (!share)
597  DBUG_RETURN(rc);
598 
599  /* Allow open on crashed table in repair mode only. */
600  switch (rc)
601  {
602  case 0:
603  break;
604  case HA_ERR_CRASHED_ON_USAGE:
605  if (open_options & HA_OPEN_FOR_REPAIR)
606  break;
607  /* fall through */
608  default:
609  DBUG_RETURN(rc);
610  }
611 
612  record_buffer= create_record_buffer(table->s->reclength +
613  ARCHIVE_ROW_HEADER_SIZE);
614 
615  if (!record_buffer)
616  DBUG_RETURN(HA_ERR_OUT_OF_MEM);
617 
618  thr_lock_data_init(&share->lock, &lock, NULL);
619 
620  DBUG_PRINT("ha_archive", ("archive table was crashed %s",
621  rc == HA_ERR_CRASHED_ON_USAGE ? "yes" : "no"));
622  if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
623  {
624  DBUG_RETURN(0);
625  }
626 
627  DBUG_RETURN(rc);
628 }
629 
630 
631 /*
632  Closes the file.
633 
634  SYNOPSIS
635  close();
636 
637  IMPLEMENTATION:
638 
639  We first close this storage engine's file handle to the archive and
640  then remove our reference count to the table (and possibly free it
641  as well).
642 
643  RETURN
644  0 ok
645  1 Error
646 */
647 
648 int ha_archive::close(void)
649 {
650  int rc= 0;
651  DBUG_ENTER("ha_archive::close");
652 
653  destroy_record_buffer(record_buffer);
654 
655  if (archive_reader_open)
656  {
657  if (azclose(&archive))
658  rc= 1;
659  }
660 
661  DBUG_RETURN(rc);
662 }
663 
664 
665 void ha_archive::frm_load(const char *name, azio_stream *dst)
666 {
667  char name_buff[FN_REFLEN];
668  MY_STAT file_stat;
669  File frm_file;
670  uchar *frm_ptr;
671  DBUG_ENTER("ha_archive::frm_load");
672  fn_format(name_buff, name, "", ".frm", MY_REPLACE_EXT | MY_UNPACK_FILENAME);
673 
674  /* Here is where we open up the frm and pass it to archive to store */
675  if ((frm_file= mysql_file_open(arch_key_file_frm, name_buff, O_RDONLY, MYF(0))) >= 0)
676  {
677  if (!mysql_file_fstat(frm_file, &file_stat, MYF(MY_WME)))
678  {
679  frm_ptr= (uchar *) my_malloc(sizeof(uchar) * (size_t) file_stat.st_size, MYF(0));
680  if (frm_ptr)
681  {
682  if (mysql_file_read(frm_file, frm_ptr, (size_t) file_stat.st_size, MYF(0)) ==
683  (size_t) file_stat.st_size)
684  azwrite_frm(dst, (char *) frm_ptr, (size_t) file_stat.st_size);
685  my_free(frm_ptr);
686  }
687  }
688  mysql_file_close(frm_file, MYF(0));
689  }
690  DBUG_VOID_RETURN;
691 }
692 
693 
703 int ha_archive::frm_copy(azio_stream *src, azio_stream *dst)
704 {
705  int rc= 0;
706  char *frm_ptr;
707 
708  /* If there is no .frm in source stream, try to read .frm from file. */
709  if (!src->frm_length)
710  {
711  frm_load(table->s->normalized_path.str, dst);
712  return 0;
713  }
714 
715  if (!(frm_ptr= (char *) my_malloc(src->frm_length, MYF(0))))
716  return HA_ERR_OUT_OF_MEM;
717 
718  /* Write file offset is set to the end of the file. */
719  if (azread_frm(src, frm_ptr) ||
720  azwrite_frm(dst, frm_ptr, src->frm_length))
721  rc= my_errno ? my_errno : HA_ERR_INTERNAL_ERROR;
722 
723  my_free(frm_ptr);
724 
725  return rc;
726 }
727 
728 
729 /*
730  We create our data file here. The format is pretty simple.
731  You can read about the format of the data file above.
732  Unlike other storage engines we do not "pack" our data. Since we
733  are about to do a general compression, packing would just be a waste of
734  CPU time. If the table has blobs they are written after the row in the order
735  of creation.
736 */
737 
738 int ha_archive::create(const char *name, TABLE *table_arg,
739  HA_CREATE_INFO *create_info)
740 {
741  char name_buff[FN_REFLEN];
742  char linkname[FN_REFLEN];
743  int error;
744  azio_stream create_stream; /* Archive file we are working with */
745  MY_STAT file_stat; // Stat information for the data file
746 
747  DBUG_ENTER("ha_archive::create");
748 
749  stats.auto_increment_value= create_info->auto_increment_value;
750 
751  for (uint key= 0; key < table_arg->s->keys; key++)
752  {
753  KEY *pos= table_arg->key_info+key;
754  KEY_PART_INFO *key_part= pos->key_part;
755  KEY_PART_INFO *key_part_end= key_part + pos->user_defined_key_parts;
756 
757  for (; key_part != key_part_end; key_part++)
758  {
759  Field *field= key_part->field;
760 
761  if (!(field->flags & AUTO_INCREMENT_FLAG))
762  {
763  error= -1;
764  DBUG_PRINT("ha_archive", ("Index error in creating archive table"));
765  goto error;
766  }
767  }
768  }
769 
770  /*
771  We reuse name_buff since it is available.
772  */
773 #ifdef HAVE_READLINK
774  if (my_use_symdir &&
775  create_info->data_file_name &&
776  create_info->data_file_name[0] != '#')
777  {
778  DBUG_PRINT("ha_archive", ("archive will create stream file %s",
779  create_info->data_file_name));
780 
781  fn_format(name_buff, create_info->data_file_name, "", ARZ,
782  MY_REPLACE_EXT | MY_UNPACK_FILENAME);
783  fn_format(linkname, name, "", ARZ,
784  MY_REPLACE_EXT | MY_UNPACK_FILENAME);
785  }
786  else
787 #endif /* HAVE_READLINK */
788  {
789  if (create_info->data_file_name)
790  {
791  push_warning_printf(table_arg->in_use, Sql_condition::WARN_LEVEL_WARN,
792  WARN_OPTION_IGNORED,
793  ER_DEFAULT(WARN_OPTION_IGNORED),
794  "DATA DIRECTORY");
795  }
796  fn_format(name_buff, name, "", ARZ,
797  MY_REPLACE_EXT | MY_UNPACK_FILENAME);
798  linkname[0]= 0;
799  }
800 
801  /* Archive engine never uses INDEX DIRECTORY. */
802  if (create_info->index_file_name)
803  {
804  push_warning_printf(table_arg->in_use, Sql_condition::WARN_LEVEL_WARN,
805  WARN_OPTION_IGNORED,
806  ER_DEFAULT(WARN_OPTION_IGNORED),
807  "INDEX DIRECTORY");
808  }
809 
810  /*
811  There is a chance that the file was "discovered". In this case
812  just use whatever file is there.
813  */
814  if (!(mysql_file_stat(arch_key_file_data, name_buff, &file_stat, MYF(0))))
815  {
816  my_errno= 0;
817  if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR|O_BINARY)))
818  {
819  error= errno;
820  goto error2;
821  }
822 
823  if (linkname[0])
824  my_symlink(name_buff, linkname, MYF(0));
825 
826  frm_load(name, &create_stream);
827 
828  if (create_info->comment.str)
829  azwrite_comment(&create_stream, create_info->comment.str,
830  create_info->comment.length);
831 
832  /*
833  Yes you need to do this, because the starting value
834  for the autoincrement may not be zero.
835  */
836  create_stream.auto_increment= stats.auto_increment_value ?
837  stats.auto_increment_value - 1 : 0;
838  if (azclose(&create_stream))
839  {
840  error= errno;
841  goto error2;
842  }
843  }
844  else
845  my_errno= 0;
846 
847  DBUG_PRINT("ha_archive", ("Creating File %s", name_buff));
848  DBUG_PRINT("ha_archive", ("Creating Link %s", linkname));
849 
850 
851  DBUG_RETURN(0);
852 
853 error2:
854  delete_table(name);
855 error:
856  /* Return error number, if we got one */
857  DBUG_RETURN(error ? error : -1);
858 }
859 
860 /*
861  This is where the actual row is written out.
862 */
863 int ha_archive::real_write_row(uchar *buf, azio_stream *writer)
864 {
865  my_off_t written;
866  unsigned int r_pack_length;
867  DBUG_ENTER("ha_archive::real_write_row");
868 
869  /* We pack the row for writing */
870  r_pack_length= pack_row(buf, writer);
871 
872  written= azwrite(writer, record_buffer->buffer, r_pack_length);
873  if (written != r_pack_length)
874  {
875  DBUG_PRINT("ha_archive", ("Wrote %d bytes expected %d",
876  (uint32) written,
877  (uint32)r_pack_length));
878  DBUG_RETURN(-1);
879  }
880 
881  if (!delayed_insert || !bulk_insert)
882  share->dirty= TRUE;
883 
884  DBUG_RETURN(0);
885 }
886 
887 
888 /*
889  Calculate max length needed for row. This includes
890  the bytes required for the length in the header.
891 */
892 
893 uint32 ha_archive::max_row_length(const uchar *buf)
894 {
895  uint32 length= (uint32)(table->s->reclength + table->s->fields*2);
896  length+= ARCHIVE_ROW_HEADER_SIZE;
897 
898  uint *ptr, *end;
899  for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
900  ptr != end ;
901  ptr++)
902  {
903  if (!table->field[*ptr]->is_null())
904  length += 2 + ((Field_blob*)table->field[*ptr])->get_length();
905  }
906 
907  return length;
908 }
909 
910 
911 unsigned int ha_archive::pack_row(uchar *record, azio_stream *writer)
912 {
913  uchar *ptr;
914 
915  DBUG_ENTER("ha_archive::pack_row");
916 
917 
918  if (fix_rec_buff(max_row_length(record)))
919  DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */
920 
921  if (writer->version == 1)
922  DBUG_RETURN(pack_row_v1(record));
923 
924  /* Copy null bits */
925  memcpy(record_buffer->buffer+ARCHIVE_ROW_HEADER_SIZE,
926  record, table->s->null_bytes);
927  ptr= record_buffer->buffer + table->s->null_bytes + ARCHIVE_ROW_HEADER_SIZE;
928 
929  for (Field **field=table->field ; *field ; field++)
930  {
931  if (!((*field)->is_null()))
932  ptr= (*field)->pack(ptr, record + (*field)->offset(record));
933  }
934 
935  int4store(record_buffer->buffer, (int)(ptr - record_buffer->buffer -
936  ARCHIVE_ROW_HEADER_SIZE));
937  DBUG_PRINT("ha_archive",("Pack row length %u", (unsigned int)
938  (ptr - record_buffer->buffer -
939  ARCHIVE_ROW_HEADER_SIZE)));
940 
941  DBUG_RETURN((unsigned int) (ptr - record_buffer->buffer));
942 }
943 
944 
945 /*
946  Look at ha_archive::open() for an explanation of the row format.
947  Here we just write out the row.
948 
949  Wondering about start_bulk_insert()? Archive already optimizes
950  for lots of writes. The only gain from implementing start_bulk_insert()
951  is that we can skip setting dirty to true each time.
953 */
954 int ha_archive::write_row(uchar *buf)
955 {
956  int rc;
957  uchar *read_buf= NULL;
958  ulonglong temp_auto;
959  uchar *record= table->record[0];
960  DBUG_ENTER("ha_archive::write_row");
961 
962  if (share->crashed)
963  DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
964 
965  ha_statistic_increment(&SSV::ha_write_count);
966  mysql_mutex_lock(&share->mutex);
967 
968  if (!share->archive_write_open && share->init_archive_writer())
969  {
970  rc= HA_ERR_CRASHED_ON_USAGE;
971  goto error;
972  }
973 
974  if (table->next_number_field && record == table->record[0])
975  {
976  KEY *mkey= &table->s->key_info[0]; // We only support one key right now
977  update_auto_increment();
978  temp_auto= (((Field_num*) table->next_number_field)->unsigned_flag ||
979  table->next_number_field->val_int() > 0 ?
980  table->next_number_field->val_int() : 0);
981 
982  /*
983  We don't support decrementing auto_increment. They make the performance
984  just cry.
985  */
986  if (temp_auto <= share->archive_write.auto_increment &&
987  mkey->flags & HA_NOSAME)
988  {
989  rc= HA_ERR_FOUND_DUPP_KEY;
990  goto error;
991  }
992 #ifdef DEAD_CODE
993  /*
994  Bad news, this will cause a search for the unique value which is very
995  expensive since we will have to do a table scan which will lock up
996  all other writers during this period. This could perhaps be optimized
997  in the future.
998  */
999  {
1000  /*
1001  First we create a buffer that we can use for reading rows, and can pass
1002  to get_row().
1003  */
1004  if (!(read_buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
1005  {
1006  rc= HA_ERR_OUT_OF_MEM;
1007  goto error;
1008  }
1009  /*
1010  All of the buffer must be written out or we won't see all of the
1011  data
1012  */
1013  azflush(&(share->archive_write), Z_SYNC_FLUSH);
1014  /*
1015  Set the position of the local read thread to the beginning position.
1016  */
1017  if (read_data_header(&archive))
1018  {
1019  rc= HA_ERR_CRASHED_ON_USAGE;
1020  goto error;
1021  }
1022 
1023  Field *mfield= table->next_number_field;
1024 
1025  while (!(get_row(&archive, read_buf)))
1026  {
1027  if (!memcmp(read_buf + mfield->offset(record),
1028  table->next_number_field->ptr,
1029  mfield->max_display_length()))
1030  {
1031  rc= HA_ERR_FOUND_DUPP_KEY;
1032  goto error;
1033  }
1034  }
1035  }
1036 #endif
1037  else
1038  {
1039  if (temp_auto > share->archive_write.auto_increment)
1040  stats.auto_increment_value=
1041  (share->archive_write.auto_increment= temp_auto) + 1;
1042  }
1043  }
1044 
1045  /*
1046  Notice that the global auto_increment has been increased.
1047  In case of a failed row write, we will never try to reuse the value.
1048  */
1049  share->rows_recorded++;
1050  rc= real_write_row(buf, &(share->archive_write));
1051 error:
1052  mysql_mutex_unlock(&share->mutex);
1053  if (read_buf)
1054  my_free(read_buf);
1055  DBUG_RETURN(rc);
1056 }
1057 
1058 
1059 void ha_archive::get_auto_increment(ulonglong offset, ulonglong increment,
1060  ulonglong nb_desired_values,
1061  ulonglong *first_value,
1062  ulonglong *nb_reserved_values)
1063 {
1064  *nb_reserved_values= ULONGLONG_MAX;
1065  *first_value= share->archive_write.auto_increment + 1;
1066 }
1067 
1068 /* Initialized at each key walk (called multiple times unlike rnd_init()) */
1069 int ha_archive::index_init(uint keynr, bool sorted)
1070 {
1071  DBUG_ENTER("ha_archive::index_init");
1072  active_index= keynr;
1073  DBUG_RETURN(0);
1074 }
1075 
1076 
1077 /*
1078  No indexes, so if we get a request for an index search (since we tell
1079  the optimizer that we have unique indexes), we scan
1080 */
1081 int ha_archive::index_read(uchar *buf, const uchar *key,
1082  uint key_len, enum ha_rkey_function find_flag)
1083 {
1084  int rc;
1085  DBUG_ENTER("ha_archive::index_read");
1086  MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
1087  rc= index_read_idx(buf, active_index, key, key_len, find_flag);
1088  MYSQL_INDEX_READ_ROW_DONE(rc);
1089  DBUG_RETURN(rc);
1090 }
1091 
1092 
1093 int ha_archive::index_read_idx(uchar *buf, uint index, const uchar *key,
1094  uint key_len, enum ha_rkey_function find_flag)
1095 {
1096  int rc;
1097  bool found= 0;
1098  KEY *mkey= &table->s->key_info[index];
1099  current_k_offset= mkey->key_part->offset;
1100  current_key= key;
1101  current_key_len= key_len;
1102 
1103 
1104  DBUG_ENTER("ha_archive::index_read_idx");
1105 
1106  rc= rnd_init(TRUE);
1107 
1108  if (rc)
1109  goto error;
1110 
1111  while (!(get_row(&archive, buf)))
1112  {
1113  if (!memcmp(current_key, buf + current_k_offset, current_key_len))
1114  {
1115  found= 1;
1116  break;
1117  }
1118  }
1119 
1120  if (found)
1121  {
1122  /* notify handler that a record has been found */
1123  table->status= 0;
1124  DBUG_RETURN(0);
1125  }
1126 
1127 error:
1128  DBUG_RETURN(rc ? rc : HA_ERR_END_OF_FILE);
1129 }
1130 
1131 
1132 int ha_archive::index_next(uchar * buf)
1133 {
1134  bool found= 0;
1135  int rc;
1136 
1137  DBUG_ENTER("ha_archive::index_next");
1138  MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
1139 
1140  while (!(get_row(&archive, buf)))
1141  {
1142  if (!memcmp(current_key, buf+current_k_offset, current_key_len))
1143  {
1144  found= 1;
1145  break;
1146  }
1147  }
1148 
1149  rc= found ? 0 : HA_ERR_END_OF_FILE;
1150  MYSQL_INDEX_READ_ROW_DONE(rc);
1151  DBUG_RETURN(rc);
1152 }
1153 
1154 /*
1155  All calls that need to scan the table start with this method. If we are told
1156  that it is a table scan we rewind the file to the beginning, otherwise
1157  we assume the position will be set.
1158 */
1159 
1160 int ha_archive::rnd_init(bool scan)
1161 {
1162  DBUG_ENTER("ha_archive::rnd_init");
1163 
1164  if (share->crashed)
1165  DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1166 
1167  init_archive_reader();
1168 
1169  /* We rewind the file so that we can read from the beginning if scan */
1170  if (scan)
1171  {
1172  scan_rows= stats.records;
1173  DBUG_PRINT("info", ("archive will retrieve %llu rows",
1174  (unsigned long long) scan_rows));
1175 
1176  if (read_data_header(&archive))
1177  DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1178  }
1179 
1180  DBUG_RETURN(0);
1181 }
1182 
1183 
1184 /*
1185  This is the method that is used to read a row. It assumes that the row is
1186  positioned where you want it.
1187 */
1188 int ha_archive::get_row(azio_stream *file_to_read, uchar *buf)
1189 {
1190  int rc;
1191  DBUG_ENTER("ha_archive::get_row");
1192  DBUG_PRINT("ha_archive", ("Picking version for get_row() %d -> %d",
1193  (uchar)file_to_read->version,
1194  ARCHIVE_VERSION));
1195  if (file_to_read->version == ARCHIVE_VERSION)
1196  rc= get_row_version3(file_to_read, buf);
1197  else
1198  rc= get_row_version2(file_to_read, buf);
1199 
1200  DBUG_PRINT("ha_archive", ("Return %d\n", rc));
1201 
1202  DBUG_RETURN(rc);
1203 }
1204 
1205 /* Reallocate buffer if needed */
1206 bool ha_archive::fix_rec_buff(unsigned int length)
1207 {
1208  DBUG_ENTER("ha_archive::fix_rec_buff");
1209  DBUG_PRINT("ha_archive", ("Fixing %u for %u",
1210  length, record_buffer->length));
1211  DBUG_ASSERT(record_buffer->buffer);
1212 
1213  if (length > record_buffer->length)
1214  {
1215  uchar *newptr;
1216  if (!(newptr=(uchar*) my_realloc((uchar*) record_buffer->buffer,
1217  length,
1218  MYF(MY_ALLOW_ZERO_PTR))))
1219  DBUG_RETURN(1);
1220  record_buffer->buffer= newptr;
1221  record_buffer->length= length;
1222  }
1223 
1224  DBUG_ASSERT(length <= record_buffer->length);
1225 
1226  DBUG_RETURN(0);
1227 }
1228 
1229 int ha_archive::unpack_row(azio_stream *file_to_read, uchar *record)
1230 {
1231  DBUG_ENTER("ha_archive::unpack_row");
1232 
1233  unsigned int read;
1234  int error;
1235  uchar size_buffer[ARCHIVE_ROW_HEADER_SIZE], *size_buffer_p= size_buffer;
1236  unsigned int row_len;
1237 
1238  /* First we grab the length stored */
1239  read= azread(file_to_read, size_buffer, ARCHIVE_ROW_HEADER_SIZE, &error);
1240 
1241  if (error == Z_STREAM_ERROR || (read && read < ARCHIVE_ROW_HEADER_SIZE))
1242  DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1243 
1244  /* If we read nothing we are at the end of the file */
1245  if (read == 0 || read != ARCHIVE_ROW_HEADER_SIZE)
1246  DBUG_RETURN(HA_ERR_END_OF_FILE);
1247 
1248  row_len= uint4korr(size_buffer_p);
1249  DBUG_PRINT("ha_archive",("Unpack row length %u -> %u", row_len,
1250  (unsigned int)table->s->reclength));
1251 
1252  if (fix_rec_buff(row_len))
1253  {
1254  DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1255  }
1256  DBUG_ASSERT(row_len <= record_buffer->length);
1257 
1258  read= azread(file_to_read, record_buffer->buffer, row_len, &error);
1259 
1260  if (read != row_len || error)
1261  {
1262  DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1263  }
1264 
1265  /* Copy null bits */
1266  const uchar *ptr= record_buffer->buffer;
1267  /*
1268  Field::unpack() is not called when field is NULL. For VARCHAR
1269  Field::unpack() only unpacks as much bytes as occupied by field
1270  value. In these cases respective memory area on record buffer is
1271  not initialized.
1272 
1273  These uninitialized areas may be accessed by CHECKSUM TABLE or
1274  by optimizer using temporary table (BUG#12997905). We may remove
1275  this memset() when they're fixed.
1276  */
1277  memset(record, 0, table->s->reclength);
1278  memcpy(record, ptr, table->s->null_bytes);
1279  ptr+= table->s->null_bytes;
1280  for (Field **field=table->field ; *field ; field++)
1281  {
1282  if (!((*field)->is_null_in_record(record)))
1283  {
1284  ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
1285  }
1286  }
1287  DBUG_RETURN(0);
1288 }
1289 
1290 
1291 int ha_archive::get_row_version3(azio_stream *file_to_read, uchar *buf)
1292 {
1293  DBUG_ENTER("ha_archive::get_row_version3");
1294 
1295  int returnable= unpack_row(file_to_read, buf);
1296 
1297  DBUG_RETURN(returnable);
1298 }
1299 
1300 
1301 int ha_archive::get_row_version2(azio_stream *file_to_read, uchar *buf)
1302 {
1303  unsigned int read;
1304  int error;
1305  uint *ptr, *end;
1306  char *last;
1307  size_t total_blob_length= 0;
1308  MY_BITMAP *read_set= table->read_set;
1309  DBUG_ENTER("ha_archive::get_row_version2");
1310 
1311  read= azread(file_to_read, (voidp)buf, table->s->reclength, &error);
1312 
1313  /* If we read nothing we are at the end of the file */
1314  if (read == 0)
1315  DBUG_RETURN(HA_ERR_END_OF_FILE);
1316 
1317  if (read != table->s->reclength)
1318  {
1319  DBUG_PRINT("ha_archive::get_row_version2", ("Read %u bytes expected %u",
1320  read,
1321  (unsigned int)table->s->reclength));
1322  DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1323  }
1324 
1325  if (error == Z_STREAM_ERROR || error == Z_DATA_ERROR )
1326  DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1327 
1328  /*
1329  If the record is the wrong size, the file is probably damaged, unless
1330  we are dealing with a delayed insert or a bulk insert.
1331  */
1332  if ((ulong) read != table->s->reclength)
1333  DBUG_RETURN(HA_ERR_END_OF_FILE);
1334 
1335  /* Calculate blob length, we use this for our buffer */
1336  for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
1337  ptr != end ;
1338  ptr++)
1339  {
1340  if (bitmap_is_set(read_set,
1341  (((Field_blob*) table->field[*ptr])->field_index)))
1342  total_blob_length += ((Field_blob*) table->field[*ptr])->get_length();
1343  }
1344 
1345  /* Adjust our row buffer if need be */
1346  buffer.alloc(total_blob_length);
1347  last= (char *)buffer.ptr();
1348 
1349  /* Loop through our blobs and read them */
1350  for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
1351  ptr != end ;
1352  ptr++)
1353  {
1354  size_t size= ((Field_blob*) table->field[*ptr])->get_length();
1355  if (size)
1356  {
1357  if (bitmap_is_set(read_set,
1358  ((Field_blob*) table->field[*ptr])->field_index))
1359  {
1360  read= azread(file_to_read, last, size, &error);
1361 
1362  if (error)
1363  DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1364 
1365  if ((size_t) read != size)
1366  DBUG_RETURN(HA_ERR_END_OF_FILE);
1367  ((Field_blob*) table->field[*ptr])->set_ptr(size, (uchar*) last);
1368  last += size;
1369  }
1370  else
1371  {
1372  (void)azseek(file_to_read, size, SEEK_CUR);
1373  }
1374  }
1375  }
1376  DBUG_RETURN(0);
1377 }
1378 
1379 
1380 /*
1381  Called during ORDER BY. The position is set either by reading sequentially
1382  or by a prior call to ha_archive::rnd_pos().
1383 */
1384 
1385 int ha_archive::rnd_next(uchar *buf)
1386 {
1387  int rc;
1388  DBUG_ENTER("ha_archive::rnd_next");
1389  MYSQL_READ_ROW_START(table_share->db.str,
1390  table_share->table_name.str, TRUE);
1391 
1392  if (share->crashed)
1393  DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1394 
1395  if (!scan_rows)
1396  {
1397  rc= HA_ERR_END_OF_FILE;
1398  goto end;
1399  }
1400  scan_rows--;
1401 
1402  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1403  current_position= aztell(&archive);
1404  rc= get_row(&archive, buf);
1405 
1406  table->status=rc ? STATUS_NOT_FOUND: 0;
1407 
1408 end:
1409  MYSQL_READ_ROW_DONE(rc);
1410  DBUG_RETURN(rc);
1411 }
1412 
1413 
1414 /*
1415  Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
1416  each call to ha_archive::rnd_next() if an ordering of the rows is
1417  needed.
1418 */
1419 
1420 void ha_archive::position(const uchar *record)
1421 {
1422  DBUG_ENTER("ha_archive::position");
1423  my_store_ptr(ref, ref_length, current_position);
1424  DBUG_VOID_RETURN;
1425 }
1426 
1427 
1428 /*
1429  This is called after a table scan for each row if the results of the
1430  scan need to be ordered. It will take *pos and use it to move the
1431  cursor in the file so that the next row that is called is the
1432  correctly ordered row.
1433 */
1434 
1435 int ha_archive::rnd_pos(uchar * buf, uchar *pos)
1436 {
1437  int rc;
1438  DBUG_ENTER("ha_archive::rnd_pos");
1439  MYSQL_READ_ROW_START(table_share->db.str,
1440  table_share->table_name.str, FALSE);
1441  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1442  current_position= (my_off_t)my_get_ptr(pos, ref_length);
1443  if (azseek(&archive, current_position, SEEK_SET) == (my_off_t)(-1L))
1444  {
1445  rc= HA_ERR_CRASHED_ON_USAGE;
1446  goto end;
1447  }
1448  rc= get_row(&archive, buf);
1449 end:
1450  MYSQL_READ_ROW_DONE(rc);
1451  DBUG_RETURN(rc);
1452 }
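/*
  A minimal sketch of how the server drives position() and rnd_pos() for
  ordered retrieval (hypothetical caller code, not part of this file):

    h->rnd_init(true);
    while (h->rnd_next(buf) == 0)
    {
      h->position(buf);           // saves the aztell() offset into h->ref
      remember_ref(h->ref);       // e.g. kept in a sort buffer
    }
    // later, in sorted order:
    h->rnd_pos(buf, saved_ref);   // azseek()s back and re-reads the row
*/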
1453 
1454 /*
1455  This method repairs the meta file. It does this by walking the datafile and
1456  rewriting the meta file. If EXTENDED repair is requested, we attempt to
1457  recover as much data as possible.
1458 */
1459 int ha_archive::repair(THD* thd, HA_CHECK_OPT* check_opt)
1460 {
1461  DBUG_ENTER("ha_archive::repair");
1462  int rc= optimize(thd, check_opt);
1463 
1464  if (rc)
1465  DBUG_RETURN(HA_ADMIN_CORRUPT);
1466 
1467  share->crashed= FALSE;
1468  DBUG_RETURN(0);
1469 }
1470 
1471 /*
1472  The table can become fragmented if data was inserted, read, and then
1473  inserted again. What we do is open up the file and recompress it completely.
1474 */
1475 int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
1476 {
1477  int rc= 0;
1478  azio_stream writer;
1479  ha_rows count;
1480  my_bitmap_map *org_bitmap;
1481  char writer_filename[FN_REFLEN];
1482  DBUG_ENTER("ha_archive::optimize");
1483 
1484  mysql_mutex_lock(&share->mutex);
1485  if (share->in_optimize)
1486  {
1487  mysql_mutex_unlock(&share->mutex);
1488  DBUG_RETURN(HA_ADMIN_FAILED);
1489  }
1490  share->in_optimize= true;
1491  /* remember the number of rows */
1492  count= share->rows_recorded;
1493  if (share->archive_write_open)
1494  azflush(&share->archive_write, Z_SYNC_FLUSH);
1495  mysql_mutex_unlock(&share->mutex);
1496 
1497  init_archive_reader();
1498 
1499  /* Let's create a file to contain the new data */
1500  fn_format(writer_filename, share->table_name, "", ARN,
1501  MY_REPLACE_EXT | MY_UNPACK_FILENAME);
1502 
1503  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR|O_BINARY)))
1504  {
1505  share->in_optimize= false;
1506  DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1507  }
1508 
1509  /*
1510  Transfer the embedded FRM so that the file can be discoverable.
1511  Write file offset is set to the end of the file.
1512  */
1513  if ((rc= frm_copy(&archive, &writer)))
1514  {
1515  share->in_optimize= false;
1516  goto error;
1517  }
1518  /*
1519  An extended rebuild is a lot more effort. We open up each row and re-record it.
1520  Any dead rows are removed (aka rows that may have been partially recorded).
1521 
1522  As of Archive format 3, this is the only type that is performed; before this
1523  version it was just done on T_EXTEND.
1524  */
1525 
1526  DBUG_PRINT("ha_archive", ("archive extended rebuild"));
1527 
1528  /*
1529  Now we will rewind the archive file so that we are positioned at the
1530  start of the file.
1531  */
1532  if ((rc= read_data_header(&archive)))
1533  {
1534  share->in_optimize= false;
1535  goto error;
1536  }
1537 
1538  stats.auto_increment_value= 1;
1539  org_bitmap= tmp_use_all_columns(table, table->read_set);
1540  /* read rows up to the remembered row count */
1541  for (ha_rows cur_count= count; cur_count; cur_count--)
1542  {
1543  if ((rc= get_row(&archive, table->record[0])))
1544  break;
1545  real_write_row(table->record[0], &writer);
1546  if (table->found_next_number_field)
1547  save_auto_increment(table, &stats.auto_increment_value);
1548  }
1549 
1550  mysql_mutex_lock(&share->mutex);
1551 
1552  share->close_archive_writer();
1553  if (!rc)
1554  {
1555  /* read the remaining rows */
1556  for (count= share->rows_recorded - count; count; count--)
1557  {
1558  if ((rc= get_row(&archive, table->record[0])))
1559  break;
1560  real_write_row(table->record[0], &writer);
1561  if (table->found_next_number_field)
1562  save_auto_increment(table, &stats.auto_increment_value);
1563  }
1564  }
1565 
1566  tmp_restore_column_map(table->read_set, org_bitmap);
1567  share->rows_recorded= (ha_rows) writer.rows;
1568  share->archive_write.auto_increment= stats.auto_increment_value - 1;
1569  DBUG_PRINT("info", ("recovered %llu archive rows",
1570  (unsigned long long)share->rows_recorded));
1571 
1572  DBUG_PRINT("ha_archive", ("recovered %llu archive rows",
1573  (unsigned long long)share->rows_recorded));
1574 
1575  /*
1576  If REPAIR ... EXTENDED is requested, try to recover as much data
1577  from data file as possible. In this case if we failed to read a
1578  record, we assume EOF. This allows massive data loss, but we can
1579  hardly do more with broken zlib stream. And this is the only way
1580  to restore at least what is still recoverable.
1581  */
1582  if (rc && rc != HA_ERR_END_OF_FILE && !(check_opt->flags & T_EXTEND))
1583  {
1584  share->in_optimize= false;
1585  mysql_mutex_unlock(&share->mutex);
1586  goto error;
1587  }
1588 
1589  azclose(&writer);
1590  share->dirty= FALSE;
1591  azclose(&archive);
1592  archive_reader_open= FALSE;
1593 
1594  // make the file we just wrote be our data file
1595  rc= my_rename(writer_filename, share->data_file_name, MYF(0));
1596  share->in_optimize= false;
1597  mysql_mutex_unlock(&share->mutex);
1598 
1599  DBUG_RETURN(rc);
1600 error:
1601  DBUG_PRINT("ha_archive", ("Failed to recover, error was %d", rc));
1602  azclose(&writer);
1603 
1604  DBUG_RETURN(rc);
1605 }
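/*
  In practice the rebuild above is what OPTIMIZE TABLE (and REPAIR TABLE,
  via ha_archive::repair()) triggers for an archive table, e.g.:

    OPTIMIZE TABLE t1;   -- rewrites t1.ARZ through a temporary t1.ARN file

  The rows are decompressed, re-written and re-compressed into the new
  file, which is then renamed over the original data file.
*/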
1606 
1607 /*
1608  Below is an example of how to set up row-level locking.
1609 */
1610 THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
1611  THR_LOCK_DATA **to,
1612  enum thr_lock_type lock_type)
1613 {
1614  if (lock_type == TL_WRITE_DELAYED)
1615  delayed_insert= TRUE;
1616  else
1617  delayed_insert= FALSE;
1618 
1619  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
1620  {
1621  /*
1622  Here is where we get into the guts of a row level lock.
1623  If TL_UNLOCK is set
1624  If we are not doing a LOCK TABLE or DISCARD/IMPORT
1625  TABLESPACE, then allow multiple writers
1626  */
1627 
1628  if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
1629  lock_type <= TL_WRITE) && !thd_in_lock_tables(thd)
1630  && !thd_tablespace_op(thd))
1631  lock_type = TL_WRITE_ALLOW_WRITE;
1632 
1633  /*
1634  In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
1635  MySQL would use the lock TL_READ_NO_INSERT on t2, and that
1636  would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
1637  to t2. Convert the lock to a normal read lock to allow
1638  concurrent inserts to t2.
1639  */
1640 
1641  if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd))
1642  lock_type = TL_READ;
1643 
1644  lock.type=lock_type;
1645  }
1646 
1647  *to++= &lock;
1648 
1649  return to;
1650 }
1651 
1652 void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
1653 {
1654  char tmp_real_path[FN_REFLEN];
1655  DBUG_ENTER("ha_archive::update_create_info");
1656 
1657  ha_archive::info(HA_STATUS_AUTO);
1658  if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
1659  {
1660  create_info->auto_increment_value= stats.auto_increment_value;
1661  }
1662 
1663  if (!(my_readlink(tmp_real_path, share->data_file_name, MYF(0))))
1664  create_info->data_file_name= sql_strdup(tmp_real_path);
1665 
1666  DBUG_VOID_RETURN;
1667 }
1668 
1669 
1670 /*
1671  Hints for optimizer, see ha_tina for more information
1672 */
1673 int ha_archive::info(uint flag)
1674 {
1675  DBUG_ENTER("ha_archive::info");
1676 
1677  mysql_mutex_lock(&share->mutex);
1678  if (share->dirty)
1679  {
1680  DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
1681  DBUG_ASSERT(share->archive_write_open);
1682  azflush(&(share->archive_write), Z_SYNC_FLUSH);
1683  share->dirty= FALSE;
1684  }
1685 
1686  /*
1687  This should be an accurate number now, though bulk and delayed inserts can
1688  cause the number to be inaccurate.
1689  */
1690  stats.records= share->rows_recorded;
1691  mysql_mutex_unlock(&share->mutex);
1692 
1693  stats.deleted= 0;
1694 
1695  DBUG_PRINT("ha_archive", ("Stats rows is %d\n", (int)stats.records));
1696  /* Costs quite a bit more to get all information */
1697  if (flag & (HA_STATUS_TIME | HA_STATUS_CONST | HA_STATUS_VARIABLE))
1698  {
1699  MY_STAT file_stat; // Stat information for the data file
1700 
1701  (void) mysql_file_stat(arch_key_file_data, share->data_file_name, &file_stat, MYF(MY_WME));
1702 
1703  if (flag & HA_STATUS_TIME)
1704  stats.update_time= (ulong) file_stat.st_mtime;
1705  if (flag & HA_STATUS_CONST)
1706  {
1707  stats.max_data_file_length= share->rows_recorded * stats.mean_rec_length;
1708  stats.max_data_file_length= MAX_FILE_SIZE;
1709  stats.create_time= (ulong) file_stat.st_ctime;
1710  }
1711  if (flag & HA_STATUS_VARIABLE)
1712  {
1713  stats.delete_length= 0;
1714  stats.data_file_length= file_stat.st_size;
1715  stats.index_file_length=0;
1716  stats.mean_rec_length= stats.records ?
1717  ulong(stats.data_file_length / stats.records) : table->s->reclength;
1718  }
1719  }
1720 
1721  if (flag & HA_STATUS_AUTO)
1722  {
1723  /* TODO: Use the shared writer instead during the lock above. */
1724  init_archive_reader();
1725  mysql_mutex_lock(&share->mutex);
1726  azflush(&archive, Z_SYNC_FLUSH);
1727  mysql_mutex_unlock(&share->mutex);
1728  stats.auto_increment_value= archive.auto_increment + 1;
1729  }
1730 
1731  DBUG_RETURN(0);
1732 }
1733 
1734 
1745 int ha_archive::extra(enum ha_extra_function operation)
1746 {
1747  int ret= 0;
1748  DBUG_ENTER("ha_archive::extra");
1749  /* On windows we need to close all files before rename/delete. */
1750 #ifdef __WIN__
1751  switch (operation)
1752  {
1753  case HA_EXTRA_PREPARE_FOR_RENAME:
1754  case HA_EXTRA_FORCE_REOPEN:
1755  /* Close both reader and writer so we don't have the file open. */
1756  if (archive_reader_open)
1757  {
1758  ret= azclose(&archive);
1759  archive_reader_open= false;
1760  }
1761  mysql_mutex_lock(&share->mutex);
1762  share->close_archive_writer();
1763  mysql_mutex_unlock(&share->mutex);
1764  break;
1765  default:
1766  /* Nothing to do. */
1767  ;
1768  }
1769 #endif
1770  DBUG_RETURN(ret);
1771 }
1772 
1773 
1774 /*
1775  This method tells us that a bulk insert operation is about to occur. We set
1776  a flag which will keep write_row from saying that its data is dirty. This in
1777  turn will keep selects from causing a sync to occur.
1778  Basically, yet another optimization to keep compression working well.
1779 */
1780 void ha_archive::start_bulk_insert(ha_rows rows)
1781 {
1782  DBUG_ENTER("ha_archive::start_bulk_insert");
1783  if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT)
1784  bulk_insert= TRUE;
1785  DBUG_VOID_RETURN;
1786 }
1787 
1788 
1789 /*
1790  The other side of start_bulk_insert() is end_bulk_insert(). Here we turn off the bulk
1791  insert flag and set the share dirty so that the next select will call sync for us.
1792 */
1793 int ha_archive::end_bulk_insert()
1794 {
1795  DBUG_ENTER("ha_archive::end_bulk_insert");
1796  bulk_insert= FALSE;
1797  mysql_mutex_lock(&share->mutex);
1798  if (share->archive_write_open)
1799  share->dirty= true;
1800  mysql_mutex_unlock(&share->mutex);
1801  DBUG_RETURN(0);
1802 }
1803 
1804 /*
1805  We cancel a truncate command. The only way to delete an archive table is to drop it.
1806  This is done for security reasons. In a later version we will enable this by
1807  allowing the user to select a different row format.
1808 */
1809 int ha_archive::truncate()
1810 {
1811  DBUG_ENTER("ha_archive::truncate");
1812  DBUG_RETURN(HA_ERR_WRONG_COMMAND);
1813 }
1814 
1815 /*
1816  We just return state if asked.
1817 */
1818 bool ha_archive::is_crashed() const
1819 {
1820  DBUG_ENTER("ha_archive::is_crashed");
1821  DBUG_RETURN(share->crashed);
1822 }
1823 
1824 
1836 int ha_archive::check_for_upgrade(HA_CHECK_OPT *check_opt)
1837 {
1838  DBUG_ENTER("ha_archive::check_for_upgrade");
1839  if (init_archive_reader())
1840  DBUG_RETURN(HA_ADMIN_CORRUPT);
1841  if (archive.version < ARCHIVE_VERSION)
1842  DBUG_RETURN(HA_ADMIN_NEEDS_UPGRADE);
1843  DBUG_RETURN(HA_ADMIN_OK);
1844 }
1845 
1846 
1847 /*
1848  Simple scan of the tables to make sure everything is ok.
1849 */
1850 
1851 int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt)
1852 {
1853  int rc= 0;
1854  const char *old_proc_info;
1855  ha_rows count;
1856  DBUG_ENTER("ha_archive::check");
1857 
1858  old_proc_info= thd_proc_info(thd, "Checking table");
1859  mysql_mutex_lock(&share->mutex);
1860  count= share->rows_recorded;
1861  /* Flush any waiting data */
1862  if (share->archive_write_open)
1863  azflush(&(share->archive_write), Z_SYNC_FLUSH);
1864  mysql_mutex_unlock(&share->mutex);
1865 
1866  if (init_archive_reader())
1867  DBUG_RETURN(HA_ADMIN_CORRUPT);
1868  /*
1869  Now we will rewind the archive file so that we are positioned at the
1870  start of the file.
1871  */
1872  read_data_header(&archive);
1873  for (ha_rows cur_count= count; cur_count; cur_count--)
1874  {
1875  if ((rc= get_row(&archive, table->record[0])))
1876  goto error;
1877  }
1878  /*
1879  Now read records that may have been inserted concurrently.
1880  Acquire share->mutex so tail of the table is not modified by
1881  concurrent writers.
1882  */
1883  mysql_mutex_lock(&share->mutex);
1884  count= share->rows_recorded - count;
1885  if (share->archive_write_open)
1886  azflush(&(share->archive_write), Z_SYNC_FLUSH);
1887  while (!(rc= get_row(&archive, table->record[0])))
1888  count--;
1889  mysql_mutex_unlock(&share->mutex);
1890 
1891  if ((rc && rc != HA_ERR_END_OF_FILE) || count)
1892  goto error;
1893 
1894  thd_proc_info(thd, old_proc_info);
1895  DBUG_RETURN(HA_ADMIN_OK);
1896 
1897 error:
1898  thd_proc_info(thd, old_proc_info);
1899  share->crashed= FALSE;
1900  DBUG_RETURN(HA_ADMIN_CORRUPT);
1901 }
1902 
1903 /*
1904  Check and repair the table if needed.
1905 */
1906 bool ha_archive::check_and_repair(THD *thd)
1907 {
1908  HA_CHECK_OPT check_opt;
1909  DBUG_ENTER("ha_archive::check_and_repair");
1910 
1911  check_opt.init();
1912 
1913  DBUG_RETURN(repair(thd, &check_opt));
1914 }
1915 
1916 archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
1917 {
1918  DBUG_ENTER("ha_archive::create_record_buffer");
1919  archive_record_buffer *r;
1920  if (!(r=
1921  (archive_record_buffer*) my_malloc(sizeof(archive_record_buffer),
1922  MYF(MY_WME))))
1923  {
1924  DBUG_RETURN(NULL); /* purecov: inspected */
1925  }
1926  r->length= (int)length;
1927 
1928  if (!(r->buffer= (uchar*) my_malloc(r->length,
1929  MYF(MY_WME))))
1930  {
1931  my_free(r);
1932  DBUG_RETURN(NULL); /* purecov: inspected */
1933  }
1934 
1935  DBUG_RETURN(r);
1936 }
1937 
1938 void ha_archive::destroy_record_buffer(archive_record_buffer *r)
1939 {
1940  DBUG_ENTER("ha_archive::destroy_record_buffer");
1941  my_free(r->buffer);
1942  my_free(r);
1943  DBUG_VOID_RETURN;
1944 }
1945 
1946 bool ha_archive::check_if_incompatible_data(HA_CREATE_INFO *info,
1947  uint table_changes)
1948 {
1949  if (info->auto_increment_value != stats.auto_increment_value ||
1950  (info->used_fields & HA_CREATE_USED_DATADIR) ||
1951  info->data_file_name ||
1952  (info->used_fields & HA_CREATE_USED_COMMENT) ||
1953  table_changes != IS_EQUAL_YES)
1954  return COMPATIBLE_DATA_NO;
1955 
1956  return COMPATIBLE_DATA_YES;
1957 }
1958 
1959 
1960 struct st_mysql_storage_engine archive_storage_engine=
1961 { MYSQL_HANDLERTON_INTERFACE_VERSION };
1962 
1963 mysql_declare_plugin(archive)
1964 {
1965  MYSQL_STORAGE_ENGINE_PLUGIN,
1966  &archive_storage_engine,
1967  "ARCHIVE",
1968  "Brian Aker, MySQL AB",
1969  "Archive storage engine",
1970  PLUGIN_LICENSE_GPL,
1971  archive_db_init, /* Plugin Init */
1972  NULL, /* Plugin Deinit */
1973  0x0300 /* 3.0 */,
1974  NULL, /* status variables */
1975  NULL, /* system variables */
1976  NULL, /* config options */
1977  0, /* flags */
1978 }
1979 mysql_declare_plugin_end;
1980