MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ha_ndbinfo.cc
1 /*
2  Copyright (C) 2009 Sun Microsystems Inc.
3  All rights reserved. Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include "ha_ndbcluster_glue.h"
20 
21 #ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
22 #include "ha_ndbinfo.h"
23 #include "../storage/ndb/src/ndbapi/NdbInfo.hpp"
24 
25 
26 static MYSQL_THDVAR_UINT(
27  max_rows, /* name */
28  PLUGIN_VAR_RQCMDARG,
29  "Specify max number of rows to fetch per roundtrip to cluster",
30  NULL, /* check func. */
31  NULL, /* update func. */
32  10, /* default */
33  1, /* min */
34  256, /* max */
35  0 /* block */
36 );
37 
38 static MYSQL_THDVAR_UINT(
39  max_bytes, /* name */
40  PLUGIN_VAR_RQCMDARG,
41  "Specify approx. max number of bytes to fetch per roundtrip to cluster",
42  NULL, /* check func. */
43  NULL, /* update func. */
44  0, /* default */
45  0, /* min */
46  65535, /* max */
47  0 /* block */
48 );
49 
50 static MYSQL_THDVAR_BOOL(
51  show_hidden, /* name */
52  PLUGIN_VAR_RQCMDARG,
53  "Control if tables should be visible or not",
54  NULL, /* check func. */
55  NULL, /* update func. */
56  FALSE /* default */
57 );
58 
59 static char* opt_ndbinfo_dbname = (char*)"ndbinfo";
60 static MYSQL_SYSVAR_STR(
61  database, /* name */
62  opt_ndbinfo_dbname, /* var */
63  PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
64  "Name of the database used by ndbinfo",
65  NULL, /* check func. */
66  NULL, /* update func. */
67  NULL /* default */
68 );
69 
70 static char* opt_ndbinfo_table_prefix = (char*)"ndb$";
71 static MYSQL_SYSVAR_STR(
72  table_prefix, /* name */
73  opt_ndbinfo_table_prefix, /* var */
74  PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
75  "Prefix to use for all virtual tables loaded from NDB",
76  NULL, /* check func. */
77  NULL, /* update func. */
78  NULL /* default */
79 );
80 
81 static Uint32 opt_ndbinfo_version = NDB_VERSION_D;
82 static MYSQL_SYSVAR_UINT(
83  version, /* name */
84  opt_ndbinfo_version, /* var */
85  PLUGIN_VAR_NOCMDOPT | PLUGIN_VAR_READONLY,
86  "Compile version for ndbinfo",
87  NULL, /* check func. */
88  NULL, /* update func. */
89  0, /* default */
90  0, /* min */
91  0, /* max */
92  0 /* block */
93 );
94 
95 static my_bool opt_ndbinfo_offline;
96 
97 static
98 void
99 offline_update(THD* thd, struct st_mysql_sys_var* var,
100  void* var_ptr, const void* save)
101 {
102  DBUG_ENTER("offline_update");
103 
104  const my_bool new_offline =
105  (*(static_cast<const my_bool*>(save)) != 0);
106  if (new_offline == opt_ndbinfo_offline)
107  {
108  // No change
109  DBUG_VOID_RETURN;
110  }
111 
112  // Set offline mode, any tables opened from here on will
113  // be opened in the new mode
114  opt_ndbinfo_offline = new_offline;
115 
116  // Close any open tables which may be in the old mode
117  (void)close_cached_tables(thd, NULL, false, true, false);
118 
119  DBUG_VOID_RETURN;
120 }
121 
122 static MYSQL_SYSVAR_BOOL(
123  offline, /* name */
124  opt_ndbinfo_offline, /* var */
125  PLUGIN_VAR_NOCMDOPT,
126  "Set ndbinfo in offline mode, tables and views can "
127  "be opened even if they don't exist or have different "
128  "definition in NDB. No rows will be returned.",
129  NULL, /* check func. */
130  offline_update, /* update func. */
131  0 /* default */
132 );
133 
134 
135 static NdbInfo* g_ndbinfo;
136 
137 extern Ndb_cluster_connection* g_ndb_cluster_connection;
138 
139 static bool
140 ndbcluster_is_disabled(void)
141 {
142  /*
143  ndbinfo uses the same connection as ndbcluster
144  to avoid using up another nodeid, this also means that
145  if ndbcluster is not enabled, ndbinfo won't start
146  */
147  if (g_ndb_cluster_connection)
148  return false;
149  assert(g_ndbinfo == NULL);
150  return true;
151 }
152 
153 static handler*
154 create_handler(handlerton *hton, TABLE_SHARE *table, MEM_ROOT *mem_root)
155 {
156  return new (mem_root) ha_ndbinfo(hton, table);
157 }
158 
159 struct ha_ndbinfo_impl
160 {
161  const NdbInfo::Table* m_table;
162  NdbInfoScanOperation* m_scan_op;
164  bool m_first_use;
165 
166  // Indicates if table has been opened in offline mode
167  // can only be reset by closing the table
168  bool m_offline;
169 
170  ha_ndbinfo_impl() :
171  m_table(NULL),
172  m_scan_op(NULL),
173  m_first_use(true),
174  m_offline(false)
175  {
176  }
177 };
178 
179 ha_ndbinfo::ha_ndbinfo(handlerton *hton, TABLE_SHARE *table_arg)
180 : handler(hton, table_arg), m_impl(*new ha_ndbinfo_impl)
181 {
182 }
183 
184 ha_ndbinfo::~ha_ndbinfo()
185 {
186  delete &m_impl;
187 }
188 
189 enum ndbinfo_error_codes {
190  ERR_INCOMPAT_TABLE_DEF = 40001
191 };
192 
193 struct error_message {
194  int error;
195  const char* message;
196 } error_messages[] = {
197  { ERR_INCOMPAT_TABLE_DEF, "Incompatible table definitions" },
198  { HA_ERR_NO_CONNECTION, "Connection to NDB failed" },
199 
200  { 0, 0 }
201 };
202 
203 static
204 const char* find_error_message(int error)
205 {
206  struct error_message* err = error_messages;
207  while (err->error && err->message)
208  {
209  if (err->error == error)
210  {
211  assert(err->message);
212  return err->message;
213  }
214  err++;
215  }
216  return NULL;
217 }
218 
219 static int err2mysql(int error)
220 {
221  DBUG_ENTER("err2mysql");
222  DBUG_PRINT("enter", ("error: %d", error));
223  assert(error != 0);
224  switch(error)
225  {
226  case NdbInfo::ERR_ClusterFailure:
227  DBUG_RETURN(HA_ERR_NO_CONNECTION);
228  break;
229  case NdbInfo::ERR_OutOfMemory:
230  DBUG_RETURN(HA_ERR_OUT_OF_MEM);
231  break;
232  default:
233  break;
234  }
235  push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
236  ER_GET_ERRNO, ER(ER_GET_ERRNO), error);
237  DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
238 }
239 
240 bool ha_ndbinfo::get_error_message(int error, String *buf)
241 {
242  DBUG_ENTER("ha_ndbinfo::get_error_message");
243  DBUG_PRINT("enter", ("error: %d", error));
244 
245  const char* message = find_error_message(error);
246  if (!message)
247  DBUG_RETURN(false);
248 
249  buf->set(message, strlen(message), &my_charset_bin);
250  DBUG_PRINT("exit", ("message: %s", buf->ptr()));
251  DBUG_RETURN(false);
252 }
253 
254 static void
255 generate_sql(const NdbInfo::Table* ndb_tab, BaseString& sql)
256 {
257  sql.appfmt("'CREATE TABLE `%s`.`%s%s` (",
258  opt_ndbinfo_dbname, opt_ndbinfo_table_prefix, ndb_tab->getName());
259 
260  const char* separator = "";
261  for (unsigned i = 0; i < ndb_tab->columns(); i++)
262  {
263  const NdbInfo::Column* col = ndb_tab->getColumn(i);
264 
265  sql.appfmt("%s", separator);
266  separator = ", ";
267 
268  sql.appfmt("`%s` ", col->m_name.c_str());
269 
270  switch(col->m_type)
271  {
272  case NdbInfo::Column::Number:
273  sql.appfmt("INT UNSIGNED");
274  break;
275  case NdbInfo::Column::Number64:
276  sql.appfmt("BIGINT UNSIGNED");
277  break;
278  case NdbInfo::Column::String:
279  sql.appfmt("VARCHAR(512)");
280  break;
281  default:
282  sql.appfmt("UNKNOWN");
283  assert(false);
284  break;
285  }
286  }
287  sql.appfmt(") ENGINE=NDBINFO'");
288 }
289 
290 /*
291  Push a warning with explanation of the problem as well as the
292  proper SQL so the user can regenerate the table definition
293 */
294 
295 static void
296 warn_incompatible(const NdbInfo::Table* ndb_tab, bool fatal,
297  const char* format, ...)
298 {
299  BaseString msg;
300  DBUG_ENTER("warn_incompatible");
301  DBUG_PRINT("enter",("table_name: %s, fatal: %d", ndb_tab->getName(), fatal));
302  DBUG_ASSERT(format != NULL);
303 
304  va_list args;
305  char explanation[128];
306  va_start(args,format);
307  my_vsnprintf(explanation, sizeof(explanation), format, args);
308  va_end(args);
309 
310  msg.assfmt("Table '%s%s' is defined differently in NDB, %s. The "
311  "SQL to regenerate is: ",
312  opt_ndbinfo_table_prefix, ndb_tab->getName(), explanation);
313  generate_sql(ndb_tab, msg);
314 
315  const Sql_condition::enum_warning_level level =
316  (fatal ? Sql_condition::WARN_LEVEL_WARN : Sql_condition::WARN_LEVEL_NOTE);
317  push_warning(current_thd, level, ERR_INCOMPAT_TABLE_DEF, msg.c_str());
318 
319  DBUG_VOID_RETURN;
320 }
321 
322 int ha_ndbinfo::create(const char *name, TABLE *form,
323  HA_CREATE_INFO *create_info)
324 {
325  DBUG_ENTER("ha_ndbinfo::create");
326  DBUG_PRINT("enter", ("name: %s", name));
327 
328  DBUG_RETURN(0);
329 }
330 
331 bool ha_ndbinfo::is_open(void) const
332 {
333  return m_impl.m_table != NULL;
334 }
335 
336 bool ha_ndbinfo::is_offline(void) const
337 {
338  return m_impl.m_offline;
339 }
340 
341 int ha_ndbinfo::open(const char *name, int mode, uint test_if_locked)
342 {
343  DBUG_ENTER("ha_ndbinfo::open");
344  DBUG_PRINT("enter", ("name: %s, mode: %d", name, mode));
345 
346  assert(is_closed());
347  assert(!is_offline()); // Closed table can not be offline
348 
349  if (mode == O_RDWR)
350  {
351  if (table->db_stat & HA_TRY_READ_ONLY)
352  {
353  DBUG_PRINT("info", ("Telling server to use readonly mode"));
354  DBUG_RETURN(EROFS); // Read only fs
355  }
356  // Find any commands that does not allow open readonly
357  DBUG_ASSERT(false);
358  }
359 
360  if (opt_ndbinfo_offline ||
361  ndbcluster_is_disabled())
362  {
363  // Mark table as being offline and allow it to be opened
364  m_impl.m_offline = true;
365  DBUG_RETURN(0);
366  }
367 
368  int err = g_ndbinfo->openTable(name, &m_impl.m_table);
369  if (err)
370  {
371  assert(m_impl.m_table == 0);
372  if (err == NdbInfo::ERR_NoSuchTable)
373  DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
374  DBUG_RETURN(err2mysql(err));
375  }
376 
377  /*
378  Check table def. to detect incompatible differences which should
379  return an error. Differences which only generate a warning
380  is checked on first use
381  */
382  DBUG_PRINT("info", ("Comparing MySQL's table def against NDB"));
383  const NdbInfo::Table* ndb_tab = m_impl.m_table;
384  for (uint i = 0; i < table->s->fields; i++)
385  {
386  const Field* field = table->field[i];
387 
388  // Check if field is NULLable
389  if (const_cast<Field*>(field)->real_maybe_null() == false)
390  {
391  // Only NULLable fields supported
392  warn_incompatible(ndb_tab, true,
393  "column '%s' is NOT NULL",
394  field->field_name);
395  delete m_impl.m_table; m_impl.m_table= 0;
396  DBUG_RETURN(ERR_INCOMPAT_TABLE_DEF);
397  }
398 
399  // Check if column exist in NDB
400  const NdbInfo::Column* col = ndb_tab->getColumn(field->field_name);
401  if (!col)
402  {
403  // The column didn't exist
404  continue;
405  }
406 
407  // Check compatible field and column type
408  bool compatible = false;
409  switch(col->m_type)
410  {
411  case NdbInfo::Column::Number:
412  if (field->type() == MYSQL_TYPE_LONG)
413  compatible = true;
414  break;
415  case NdbInfo::Column::Number64:
416  if (field->type() == MYSQL_TYPE_LONGLONG)
417  compatible = true;
418  break;
419  case NdbInfo::Column::String:
420  if (field->type() == MYSQL_TYPE_VARCHAR)
421  compatible = true;
422  break;
423  default:
424  assert(false);
425  break;
426  }
427  if (!compatible)
428  {
429  // The column type is not compatible
430  warn_incompatible(ndb_tab, true,
431  "column '%s' is not compatible",
432  field->field_name);
433  delete m_impl.m_table; m_impl.m_table= 0;
434  DBUG_RETURN(ERR_INCOMPAT_TABLE_DEF);
435  }
436  }
437 
438  /* Increase "ref_length" to allow a whole row to be stored in "ref" */
439  ref_length = 0;
440  for (uint i = 0; i < table->s->fields; i++)
441  ref_length += table->field[i]->pack_length();
442  DBUG_PRINT("info", ("ref_length: %u", ref_length));
443 
444  DBUG_RETURN(0);
445 }
446 
447 int ha_ndbinfo::close(void)
448 {
449  DBUG_ENTER("ha_ndbinfo::close");
450 
451  if (is_offline())
452  DBUG_RETURN(0);
453 
454  assert(is_open());
455  if (m_impl.m_table)
456  {
457  g_ndbinfo->closeTable(m_impl.m_table);
458  m_impl.m_table = NULL;
459  }
460  DBUG_RETURN(0);
461 }
462 
463 int ha_ndbinfo::rnd_init(bool scan)
464 {
465  DBUG_ENTER("ha_ndbinfo::rnd_init");
466  DBUG_PRINT("info", ("scan: %d", scan));
467 
468  if (is_offline())
469  {
470  push_warning(current_thd, Sql_condition::WARN_LEVEL_NOTE, 1,
471  "'NDBINFO' has been started in offline mode "
472  "since the 'NDBCLUSTER' engine is disabled "
473  "or @@global.ndbinfo_offline is turned on "
474  "- no rows can be returned");
475  DBUG_RETURN(0);
476  }
477 
478  assert(is_open());
479  assert(m_impl.m_scan_op == NULL); // No scan already ongoing
480 
481  if (m_impl.m_first_use)
482  {
483  m_impl.m_first_use = false;
484 
485  /*
486  Check table def. and generate warnings for incompatibilites
487  which is allowed but should generate a warning.
488  (Done this late due to different code paths in MySQL Server for
489  prepared statement protocol, where warnings from 'handler::open'
490  are lost).
491  */
492  uint fields_found_in_ndb = 0;
493  const NdbInfo::Table* ndb_tab = m_impl.m_table;
494  for (uint i = 0; i < table->s->fields; i++)
495  {
496  const Field* field = table->field[i];
497  const NdbInfo::Column* col = ndb_tab->getColumn(field->field_name);
498  if (!col)
499  {
500  // The column didn't exist
501  warn_incompatible(ndb_tab, true,
502  "column '%s' does not exist",
503  field->field_name);
504  continue;
505  }
506  fields_found_in_ndb++;
507  }
508 
509  if (fields_found_in_ndb < ndb_tab->columns())
510  {
511  // There are more columns available in NDB
512  warn_incompatible(ndb_tab, false,
513  "there are more columns available");
514  }
515  }
516 
517  if (!scan)
518  {
519  // Just an init to read using 'rnd_pos'
520  DBUG_PRINT("info", ("not scan"));
521  DBUG_RETURN(0);
522  }
523 
524  THD* thd = current_thd;
525  int err;
526  NdbInfoScanOperation* scan_op = NULL;
527  if ((err = g_ndbinfo->createScanOperation(m_impl.m_table,
528  &scan_op,
529  THDVAR(thd, max_rows),
530  THDVAR(thd, max_bytes))) != 0)
531  DBUG_RETURN(err2mysql(err));
532 
533  if ((err = scan_op->readTuples()) != 0)
534  DBUG_RETURN(err2mysql(err));
535 
536  /* Read all columns specified in read_set */
537  for (uint i = 0; i < table->s->fields; i++)
538  {
539  Field *field = table->field[i];
540  if (bitmap_is_set(table->read_set, i))
541  m_impl.m_columns.push_back(scan_op->getValue(field->field_name));
542  else
543  m_impl.m_columns.push_back(NULL);
544  }
545 
546  if ((err = scan_op->execute()) != 0)
547  DBUG_RETURN(err2mysql(err));
548 
549  m_impl.m_scan_op = scan_op;
550  DBUG_RETURN(0);
551 }
552 
553 int ha_ndbinfo::rnd_end()
554 {
555  DBUG_ENTER("ha_ndbinfo::rnd_end");
556 
557  if (is_offline())
558  DBUG_RETURN(0);
559 
560  assert(is_open());
561 
562  if (m_impl.m_scan_op)
563  {
564  g_ndbinfo->releaseScanOperation(m_impl.m_scan_op);
565  m_impl.m_scan_op = NULL;
566  }
567  m_impl.m_columns.clear();
568 
569  DBUG_RETURN(0);
570 }
571 
572 int ha_ndbinfo::rnd_next(uchar *buf)
573 {
574  int err;
575  DBUG_ENTER("ha_ndbinfo::rnd_next");
576 
577  if (is_offline())
578  DBUG_RETURN(HA_ERR_END_OF_FILE);
579 
580  assert(is_open());
581  assert(m_impl.m_scan_op);
582 
583  if ((err = m_impl.m_scan_op->nextResult()) == 0)
584  DBUG_RETURN(HA_ERR_END_OF_FILE);
585 
586  if (err != 1)
587  DBUG_RETURN(err2mysql(err));
588 
589  unpack_record(buf);
590 
591  DBUG_RETURN(0);
592 }
593 
594 int ha_ndbinfo::rnd_pos(uchar *buf, uchar *pos)
595 {
596  DBUG_ENTER("ha_ndbinfo::rnd_pos");
597  assert(is_open());
598  assert(m_impl.m_scan_op == NULL); // No scan started
599 
600  /* Copy the saved row into "buf" and set all fields to not null */
601  memcpy(buf, pos, ref_length);
602  for (uint i = 0; i < table->s->fields; i++)
603  table->field[i]->set_notnull();
604 
605  DBUG_RETURN(0);
606 }
607 
608 void ha_ndbinfo::position(const uchar *record)
609 {
610  DBUG_ENTER("ha_ndbinfo::position");
611  assert(is_open());
612  assert(m_impl.m_scan_op);
613 
614  /* Save away the whole row in "ref" */
615  memcpy(ref, record, ref_length);
616 
617  DBUG_VOID_RETURN;
618 }
619 
620 int ha_ndbinfo::info(uint flag)
621 {
622  DBUG_ENTER("ha_ndbinfo::info");
623  DBUG_PRINT("enter", ("flag: %d", flag));
624  DBUG_RETURN(0);
625 }
626 
627 void
628 ha_ndbinfo::unpack_record(uchar *dst_row)
629 {
630  DBUG_ENTER("ha_ndbinfo::unpack_record");
631  my_ptrdiff_t dst_offset = dst_row - table->record[0];
632 
633  for (uint i = 0; i < table->s->fields; i++)
634  {
635  Field *field = table->field[i];
636  const NdbInfoRecAttr* record = m_impl.m_columns[i];
637  if (record && !record->isNULL())
638  {
639  field->set_notnull();
640  field->move_field_offset(dst_offset);
641  switch (field->type()) {
642 
643  case (MYSQL_TYPE_VARCHAR):
644  {
645  DBUG_PRINT("info", ("str: %s", record->c_str()));
646  Field_varstring* vfield = (Field_varstring *) field;
647  /* Field_bit in DBUG requires the bit set in write_set for store(). */
648  my_bitmap_map *old_map =
649  dbug_tmp_use_all_columns(table, table->write_set);
650  (void)vfield->store(record->c_str(),
651  MIN(record->length(), field->field_length)-1,
652  field->charset());
653  dbug_tmp_restore_column_map(table->write_set, old_map);
654  break;
655  }
656 
657  case (MYSQL_TYPE_LONG):
658  {
659  memcpy(field->ptr, record->ptr(), sizeof(Uint32));
660  break;
661  }
662 
663  case (MYSQL_TYPE_LONGLONG):
664  {
665  memcpy(field->ptr, record->ptr(), sizeof(Uint64));
666  break;
667  }
668 
669  default:
670  sql_print_error("Found unexpected field type %u", field->type());
671  break;
672  }
673 
674  field->move_field_offset(-dst_offset);
675  }
676  else
677  {
678  field->set_null();
679  }
680  }
681  DBUG_VOID_RETURN;
682 }
683 
684 
685 static int
686 ndbinfo_find_files(handlerton *hton, THD *thd,
687  const char *db, const char *path,
688  const char *wild, bool dir, List<LEX_STRING> *files)
689 {
690  DBUG_ENTER("ndbinfo_find_files");
691  DBUG_PRINT("enter", ("db: '%s', dir: %d, path: '%s'", db, dir, path));
692 
693  const bool show_hidden = THDVAR(thd, show_hidden);
694 
695  if(show_hidden)
696  DBUG_RETURN(0); // Don't filter out anything
697 
698  if (dir)
699  {
700  if (!ndbcluster_is_disabled())
701  DBUG_RETURN(0);
702 
703  // Hide our database when ndbcluster is disabled
704  LEX_STRING *dir_name;
705  List_iterator<LEX_STRING> it(*files);
706  while ((dir_name=it++))
707  {
708  if (strcmp(dir_name->str, opt_ndbinfo_dbname))
709  continue;
710 
711  DBUG_PRINT("info", ("Hiding own databse '%s'", dir_name->str));
712  it.remove();
713  }
714 
715  DBUG_RETURN(0);
716  }
717 
718  DBUG_ASSERT(db);
719  if (strcmp(db, opt_ndbinfo_dbname))
720  DBUG_RETURN(0); // Only hide files in "our" db
721 
722  /* Hide all files that start with "our" prefix */
723  LEX_STRING *file_name;
724  List_iterator<LEX_STRING> it(*files);
725  while ((file_name=it++))
726  {
727  if (is_prefix(file_name->str, opt_ndbinfo_table_prefix))
728  {
729  DBUG_PRINT("info", ("Hiding '%s'", file_name->str));
730  it.remove();
731  }
732  }
733 
734  DBUG_RETURN(0);
735 }
736 
737 
738 handlerton* ndbinfo_hton;
739 
740 int ndbinfo_init(void *plugin)
741 {
742  DBUG_ENTER("ndbinfo_init");
743 
744  handlerton *hton = (handlerton *) plugin;
745  hton->create = create_handler;
746  hton->flags =
747  HTON_TEMPORARY_NOT_SUPPORTED |
748  HTON_ALTER_NOT_SUPPORTED;
749  hton->find_files = ndbinfo_find_files;
750 
751  ndbinfo_hton = hton;
752 
753  if (ndbcluster_is_disabled())
754  {
755  // Starting in limited mode since ndbcluster is disabled
756  DBUG_RETURN(0);
757  }
758 
759  char prefix[FN_REFLEN];
760  build_table_filename(prefix, sizeof(prefix) - 1,
761  opt_ndbinfo_dbname, opt_ndbinfo_table_prefix, "", 0);
762  DBUG_PRINT("info", ("prefix: '%s'", prefix));
763  assert(g_ndb_cluster_connection);
764  g_ndbinfo = new NdbInfo(g_ndb_cluster_connection, prefix,
765  opt_ndbinfo_dbname, opt_ndbinfo_table_prefix);
766  if (!g_ndbinfo)
767  {
768  sql_print_error("Failed to create NdbInfo");
769  DBUG_RETURN(1);
770  }
771 
772  if (!g_ndbinfo->init())
773  {
774  sql_print_error("Failed to init NdbInfo");
775 
776  delete g_ndbinfo;
777  g_ndbinfo = NULL;
778 
779  DBUG_RETURN(1);
780  }
781 
782  DBUG_RETURN(0);
783 }
784 
785 int ndbinfo_deinit(void *plugin)
786 {
787  DBUG_ENTER("ndbinfo_deinit");
788 
789  if (g_ndbinfo)
790  {
791  delete g_ndbinfo;
792  g_ndbinfo = NULL;
793  }
794 
795  DBUG_RETURN(0);
796 }
797 
798 struct st_mysql_sys_var* ndbinfo_system_variables[]= {
799  MYSQL_SYSVAR(max_rows),
800  MYSQL_SYSVAR(max_bytes),
801  MYSQL_SYSVAR(show_hidden),
802  MYSQL_SYSVAR(database),
803  MYSQL_SYSVAR(table_prefix),
804  MYSQL_SYSVAR(version),
805  MYSQL_SYSVAR(offline),
806 
807  NULL
808 };
809 
810 template class Vector<const NdbInfoRecAttr*>;
811 
812 #endif