MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ha_federated.cc
1 /* Copyright (c) 2004, 2011, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15 
16 /*
17 
18  MySQL Federated Storage Engine
19 
20  ha_federated.cc - MySQL Federated Storage Engine
21  Patrick Galbraith and Brian Aker, 2004
22 
23  This is a handler which uses a foreign database as the data file, as
24  opposed to a handler like MyISAM, which uses .MYD files locally.
25 
26  How this handler works
27  ----------------------------------
28  Normal database files are local and as such: You create a table called
29  'users', a file such as 'users.MYD' is created. A handler reads, inserts,
30  deletes, updates data in this file. The data is stored in particular format,
31  so to read, that data has to be parsed into fields, to write, fields have to
32  be stored in this format to write to this data file.
33 
34  With MySQL Federated storage engine, there will be no local files
35  for each table's data (such as .MYD). A foreign database will store
36  the data that would normally be in this file. This will necessitate
37  the use of MySQL client API to read, delete, update, insert this
38  data. The data will have to be retrieve via an SQL call "SELECT *
39  FROM users". Then, to read this data, it will have to be retrieved
40  via mysql_fetch_row one row at a time, then converted from the
41  column in this select into the format that the handler expects.
42 
43  The create table will simply create the .frm file, and within the
44  "CREATE TABLE" SQL, there SHALL be any of the following :
45 
46  connection=scheme://username:password@hostname:port/database/tablename
47  connection=scheme://username@hostname/database/tablename
48  connection=scheme://username:password@hostname/database/tablename
49  connection=scheme://username:password@hostname/database/tablename
50 
51  - OR -
52 
53  As of 5.1 (See worklog #3031), federated now allows you to use a non-url
54  format, taking advantage of mysql.servers:
55 
56  connection="connection_one"
57  connection="connection_one/table_foo"
58 
59  An example would be:
60 
61  connection=mysql://username:password@hostname:port/database/tablename
62 
63  or, if we had:
64 
65  create server 'server_one' foreign data wrapper 'mysql' options
66  (HOST '127.0.0.1',
67  DATABASE 'db1',
68  USER 'root',
69  PASSWORD '',
70  PORT 3306,
71  SOCKET '',
72  OWNER 'root');
73 
74  CREATE TABLE federated.t1 (
75  `id` int(20) NOT NULL,
76  `name` varchar(64) NOT NULL default ''
77  )
78  ENGINE="FEDERATED" DEFAULT CHARSET=latin1
79  CONNECTION='server_one';
80 
81  So, this will have been the equivalent of
82 
83  CONNECTION="mysql://root@127.0.0.1:3306/db1/t1"
84 
85  Then, we can also change the server to point to a new schema:
86 
87  ALTER SERVER 'server_one' options(DATABASE 'db2');
88 
89  All subsequent calls will now be against db2.t1! Guess what? You don't
90  have to perform an alter table!
91 
92  This connecton="connection string" is necessary for the handler to be
93  able to connect to the foreign server, either by URL, or by server
94  name.
95 
96 
97  The basic flow is this:
98 
99  SQL calls issues locally ->
100  mysql handler API (data in handler format) ->
101  mysql client API (data converted to SQL calls) ->
102  foreign database -> mysql client API ->
103  convert result sets (if any) to handler format ->
104  handler API -> results or rows affected to local
105 
106  What this handler does and doesn't support
107  ------------------------------------------
108  * Tables MUST be created on the foreign server prior to any action on those
109  tables via the handler, first version. IMPORTANT: IF you MUST use the
110  federated storage engine type on the REMOTE end, MAKE SURE [ :) ] That
111  the table you connect to IS NOT a table pointing BACK to your ORIGNAL
112  table! You know and have heard the screaching of audio feedback? You
113  know putting two mirror in front of each other how the reflection
114  continues for eternity? Well, need I say more?!
115  * There will not be support for transactions.
116  * There is no way for the handler to know if the foreign database or table
117  has changed. The reason for this is that this database has to work like a
118  data file that would never be written to by anything other than the
119  database. The integrity of the data in the local table could be breached
120  if there was any change to the foreign database.
121  * Support for SELECT, INSERT, UPDATE , DELETE, indexes.
122  * No ALTER TABLE, DROP TABLE or any other Data Definition Language calls.
123  * Prepared statements will not be used in the first implementation, it
124  remains to to be seen whether the limited subset of the client API for the
125  server supports this.
126  * This uses SELECT, INSERT, UPDATE, DELETE and not HANDLER for its
127  implementation.
128  * This will not work with the query cache.
129 
130  Method calls
131 
132  A two column table, with one record:
133 
134  (SELECT)
135 
136  "SELECT * FROM foo"
137  ha_federated::info
138  ha_federated::scan_time:
139  ha_federated::rnd_init: share->select_query SELECT * FROM foo
140  ha_federated::extra
141 
142  <for every row of data retrieved>
143  ha_federated::rnd_next
144  ha_federated::convert_row_to_internal_format
145  ha_federated::rnd_next
146  </for every row of data retrieved>
147 
148  ha_federated::rnd_end
149  ha_federated::extra
150  ha_federated::reset
151 
152  (INSERT)
153 
154  "INSERT INTO foo (id, ts) VALUES (2, now());"
155 
156  ha_federated::write_row
157 
158  ha_federated::reset
159 
160  (UPDATE)
161 
162  "UPDATE foo SET ts = now() WHERE id = 1;"
163 
164  ha_federated::index_init
165  ha_federated::index_read
166  ha_federated::index_read_idx
167  ha_federated::rnd_next
168  ha_federated::convert_row_to_internal_format
169  ha_federated::update_row
170 
171  ha_federated::extra
172  ha_federated::extra
173  ha_federated::extra
174  ha_federated::external_lock
175  ha_federated::reset
176 
177 
178  How do I use this handler?
179  --------------------------
180  First of all, you need to build this storage engine:
181 
182  ./configure --with-federated-storage-engine
183  make
184 
185  Next, to use this handler, it's very simple. You must
186  have two databases running, either both on the same host, or
187  on different hosts.
188 
189  One the server that will be connecting to the foreign
190  host (client), you create your table as such:
191 
192  CREATE TABLE test_table (
193  id int(20) NOT NULL auto_increment,
194  name varchar(32) NOT NULL default '',
195  other int(20) NOT NULL default '0',
196  PRIMARY KEY (id),
197  KEY name (name),
198  KEY other_key (other))
199  ENGINE="FEDERATED"
200  DEFAULT CHARSET=latin1
201  CONNECTION='mysql://root@127.0.0.1:9306/federated/test_federated';
202 
203  Notice the "COMMENT" and "ENGINE" field? This is where you
204  respectively set the engine type, "FEDERATED" and foreign
205  host information, this being the database your 'client' database
206  will connect to and use as the "data file". Obviously, the foreign
207  database is running on port 9306, so you want to start up your other
208  database so that it is indeed on port 9306, and your federated
209  database on a port other than that. In my setup, I use port 5554
210  for federated, and port 5555 for the foreign database.
211 
212  Then, on the foreign database:
213 
214  CREATE TABLE test_table (
215  id int(20) NOT NULL auto_increment,
216  name varchar(32) NOT NULL default '',
217  other int(20) NOT NULL default '0',
218  PRIMARY KEY (id),
219  KEY name (name),
220  KEY other_key (other))
221  ENGINE="<NAME>" <-- whatever you want, or not specify
222  DEFAULT CHARSET=latin1 ;
223 
224  This table is exactly the same (and must be exactly the same),
225  except that it is not using the federated handler and does
226  not need the URL.
227 
228 
229  How to see the handler in action
230  --------------------------------
231 
232  When developing this handler, I compiled the federated database with
233  debugging:
234 
235  ./configure --with-federated-storage-engine
236  --prefix=/home/mysql/mysql-build/federated/ --with-debug
237 
238  Once compiled, I did a 'make install' (not for the purpose of installing
239  the binary, but to install all the files the binary expects to see in the
240  diretory I specified in the build with --prefix,
241  "/home/mysql/mysql-build/federated".
242 
243  Then, I started the foreign server:
244 
245  /usr/local/mysql/bin/mysqld_safe
246  --user=mysql --log=/tmp/mysqld.5555.log -P 5555
247 
248  Then, I went back to the directory containing the newly compiled mysqld,
249  <builddir>/sql/, started up gdb:
250 
251  gdb ./mysqld
252 
253  Then, withn the (gdb) prompt:
254  (gdb) run --gdb --port=5554 --socket=/tmp/mysqld.5554 --skip-innodb --debug
255 
256  Next, I open several windows for each:
257 
258  1. Tail the debug trace: tail -f /tmp/mysqld.trace|grep ha_fed
259  2. Tail the SQL calls to the foreign database: tail -f /tmp/mysqld.5555.log
260  3. A window with a client open to the federated server on port 5554
261  4. A window with a client open to the federated server on port 5555
262 
263  I would create a table on the client to the foreign server on port
264  5555, and then to the federated server on port 5554. At this point,
265  I would run whatever queries I wanted to on the federated server,
266  just always remembering that whatever changes I wanted to make on
267  the table, or if I created new tables, that I would have to do that
268  on the foreign server.
269 
270  Another thing to look for is 'show variables' to show you that you have
271  support for federated handler support:
272 
273  show variables like '%federat%'
274 
275  and:
276 
277  show storage engines;
278 
279  Both should display the federated storage handler.
280 
281 
282  Testing
283  -------
284 
285  There is a test for MySQL Federated Storage Handler in ./mysql-test/t,
286  federatedd.test It starts both a slave and master database using
287  the same setup that the replication tests use, with the exception that
288  it turns off replication, and sets replication to ignore the test tables.
289  After ensuring that you actually do have support for the federated storage
290  handler, numerous queries/inserts/updates/deletes are run, many derived
291  from the MyISAM tests, plus som other tests which were meant to reveal
292  any issues that would be most likely to affect this handler. All tests
293  should work! ;)
294 
295  To run these tests, go into ./mysql-test (based in the directory you
296  built the server in)
297 
298  ./mysql-test-run federated
299 
300  To run the test, or if you want to run the test and have debug info:
301 
302  ./mysql-test-run --debug federated
303 
304  This will run the test in debug mode, and you can view the trace and
305  log files in the ./mysql-test/var/log directory
306 
307  ls -l mysql-test/var/log/
308  -rw-r--r-- 1 patg patg 17 4 Dec 12:27 current_test
309  -rw-r--r-- 1 patg patg 692 4 Dec 12:52 manager.log
310  -rw-rw---- 1 patg patg 21246 4 Dec 12:51 master-bin.000001
311  -rw-rw---- 1 patg patg 68 4 Dec 12:28 master-bin.index
312  -rw-r--r-- 1 patg patg 1620 4 Dec 12:51 master.err
313  -rw-rw---- 1 patg patg 23179 4 Dec 12:51 master.log
314  -rw-rw---- 1 patg patg 16696550 4 Dec 12:51 master.trace
315  -rw-r--r-- 1 patg patg 0 4 Dec 12:28 mysqltest-time
316  -rw-r--r-- 1 patg patg 2024051 4 Dec 12:51 mysqltest.trace
317  -rw-rw---- 1 patg patg 94992 4 Dec 12:51 slave-bin.000001
318  -rw-rw---- 1 patg patg 67 4 Dec 12:28 slave-bin.index
319  -rw-rw---- 1 patg patg 249 4 Dec 12:52 slave-relay-bin.000003
320  -rw-rw---- 1 patg patg 73 4 Dec 12:28 slave-relay-bin.index
321  -rw-r--r-- 1 patg patg 1349 4 Dec 12:51 slave.err
322  -rw-rw---- 1 patg patg 96206 4 Dec 12:52 slave.log
323  -rw-rw---- 1 patg patg 15706355 4 Dec 12:51 slave.trace
324  -rw-r--r-- 1 patg patg 0 4 Dec 12:51 warnings
325 
326  Of course, again, you can tail the trace log:
327 
328  tail -f mysql-test/var/log/master.trace |grep ha_fed
329 
330  As well as the slave query log:
331 
332  tail -f mysql-test/var/log/slave.log
333 
334  Files that comprise the test suit
335  ---------------------------------
336  mysql-test/t/federated.test
337  mysql-test/r/federated.result
338  mysql-test/r/have_federated_db.require
339  mysql-test/include/have_federated_db.inc
340 
341 
342  Other tidbits
343  -------------
344 
345  These were the files that were modified or created for this
346  Federated handler to work, in 5.0:
347 
348  ./configure.in
349  ./sql/Makefile.am
350  ./config/ac_macros/ha_federated.m4
351  ./sql/handler.cc
352  ./sql/mysqld.cc
353  ./sql/set_var.cc
354  ./sql/field.h
355  ./sql/sql_string.h
356  ./mysql-test/mysql-test-run(.sh)
357  ./mysql-test/t/federated.test
358  ./mysql-test/r/federated.result
359  ./mysql-test/r/have_federated_db.require
360  ./mysql-test/include/have_federated_db.inc
361  ./sql/ha_federated.cc
362  ./sql/ha_federated.h
363 
364  In 5.1
365 
366  my:~/mysql-build/mysql-5.1-bkbits patg$ ls storage/federated/
367  CMakeLists.txt Makefile.in ha_federated.h plug.in
368  Makefile SCCS libfederated.a
369  Makefile.am ha_federated.cc libfederated_a-ha_federated.o
370 
371 */
372 
373 
374 #define MYSQL_SERVER 1
375 #include "sql_priv.h"
376 #include "sql_servers.h" // FOREIGN_SERVER, get_server_by_name
377 #include "sql_class.h" // SSV
378 #include "sql_analyse.h" // append_escaped
379 #include <mysql/plugin.h>
380 
381 #include "ha_federated.h"
382 #include "probes_mysql.h"
383 
384 #include "m_string.h"
385 #include "key.h" // key_copy
386 
387 #include <mysql/plugin.h>
388 
389 #include <algorithm>
390 
391 using std::min;
392 using std::max;
393 
394 /* Variables for federated share methods */
395 static HASH federated_open_tables; // To track open tables
396 mysql_mutex_t federated_mutex; // To init the hash
397 static char ident_quote_char= '`'; // Character for quoting
398  // identifiers
399 static char value_quote_char= '\''; // Character for quoting
400  // literals
401 static const int bulk_padding= 64; // bytes "overhead" in packet
402 
403 /* Variables used when chopping off trailing characters */
404 static const uint sizeof_trailing_comma= sizeof(", ") - 1;
405 static const uint sizeof_trailing_closeparen= sizeof(") ") - 1;
406 static const uint sizeof_trailing_and= sizeof(" AND ") - 1;
407 static const uint sizeof_trailing_where= sizeof(" WHERE ") - 1;
408 
409 /* Static declaration for handerton */
410 static handler *federated_create_handler(handlerton *hton,
412  MEM_ROOT *mem_root);
413 static int federated_commit(handlerton *hton, THD *thd, bool all);
414 static int federated_rollback(handlerton *hton, THD *thd, bool all);
415 
416 /* Federated storage engine handlerton */
417 
418 static handler *federated_create_handler(handlerton *hton,
420  MEM_ROOT *mem_root)
421 {
422  return new (mem_root) ha_federated(hton, table);
423 }
424 
425 
426 /* Function we use in the creation of our hash to get key */
427 
428 static uchar *federated_get_key(FEDERATED_SHARE *share, size_t *length,
429  my_bool not_used __attribute__ ((unused)))
430 {
431  *length= share->share_key_length;
432  return (uchar*) share->share_key;
433 }
434 
435 #ifdef HAVE_PSI_INTERFACE
436 static PSI_mutex_key fe_key_mutex_federated, fe_key_mutex_FEDERATED_SHARE_mutex;
437 
438 static PSI_mutex_info all_federated_mutexes[]=
439 {
440  { &fe_key_mutex_federated, "federated", PSI_FLAG_GLOBAL},
441  { &fe_key_mutex_FEDERATED_SHARE_mutex, "FEDERATED_SHARE::mutex", 0}
442 };
443 
444 static void init_federated_psi_keys(void)
445 {
446  const char* category= "federated";
447  int count;
448 
449  count= array_elements(all_federated_mutexes);
450  mysql_mutex_register(category, all_federated_mutexes, count);
451 }
452 #endif /* HAVE_PSI_INTERFACE */
453 
454 /*
455  Initialize the federated handler.
456 
457  SYNOPSIS
458  federated_db_init()
459  p Handlerton
460 
461  RETURN
462  FALSE OK
463  TRUE Error
464 */
465 
466 int federated_db_init(void *p)
467 {
468  DBUG_ENTER("federated_db_init");
469 
470 #ifdef HAVE_PSI_INTERFACE
471  init_federated_psi_keys();
472 #endif /* HAVE_PSI_INTERFACE */
473 
474  handlerton *federated_hton= (handlerton *)p;
475  federated_hton->state= SHOW_OPTION_YES;
476  federated_hton->db_type= DB_TYPE_FEDERATED_DB;
477  federated_hton->commit= federated_commit;
478  federated_hton->rollback= federated_rollback;
479  federated_hton->create= federated_create_handler;
480  federated_hton->flags= HTON_ALTER_NOT_SUPPORTED | HTON_NO_PARTITION;
481 
482  /*
483  Support for transactions disabled until WL#2952 fixes it.
484  We do it like this to avoid "defined but not used" compiler warnings.
485  */
486  federated_hton->commit= 0;
487  federated_hton->rollback= 0;
488 
489  if (mysql_mutex_init(fe_key_mutex_federated,
490  &federated_mutex, MY_MUTEX_INIT_FAST))
491  goto error;
492  if (!my_hash_init(&federated_open_tables, &my_charset_bin, 32, 0, 0,
493  (my_hash_get_key) federated_get_key, 0, 0))
494  {
495  DBUG_RETURN(FALSE);
496  }
497 
498  mysql_mutex_destroy(&federated_mutex);
499 error:
500  DBUG_RETURN(TRUE);
501 }
502 
503 
504 /*
505  Release the federated handler.
506 
507  SYNOPSIS
508  federated_db_end()
509 
510  RETURN
511  FALSE OK
512 */
513 
514 int federated_done(void *p)
515 {
516  my_hash_free(&federated_open_tables);
517  mysql_mutex_destroy(&federated_mutex);
518 
519  return 0;
520 }
521 
522 
539 static bool append_ident(String *string, const char *name, size_t length,
540  const char quote_char)
541 {
542  bool result;
543  uint clen;
544  const char *name_end;
545  DBUG_ENTER("append_ident");
546 
547  if (quote_char)
548  {
549  string->reserve((uint) length * 2 + 2);
550  if ((result= string->append(&quote_char, 1, system_charset_info)))
551  goto err;
552 
553  for (name_end= name+length; name < name_end; name+= clen)
554  {
555  uchar c= *(uchar *) name;
556  if (!(clen= my_mbcharlen(system_charset_info, c)))
557  clen= 1;
558  if (clen == 1 && c == (uchar) quote_char &&
559  (result= string->append(&quote_char, 1, system_charset_info)))
560  goto err;
561  if ((result= string->append(name, clen, string->charset())))
562  goto err;
563  }
564  result= string->append(&quote_char, 1, system_charset_info);
565  }
566  else
567  result= string->append(name, (uint) length, system_charset_info);
568 
569 err:
570  DBUG_RETURN(result);
571 }
572 
573 
574 static int parse_url_error(FEDERATED_SHARE *share, TABLE *table, int error_num)
575 {
576  char buf[FEDERATED_QUERY_BUFFER_SIZE];
577  size_t buf_len;
578  DBUG_ENTER("ha_federated parse_url_error");
579 
580  buf_len= min<size_t>(table->s->connect_string.length,
581  FEDERATED_QUERY_BUFFER_SIZE-1);
582  strmake(buf, table->s->connect_string.str, buf_len);
583  my_error(error_num, MYF(0), buf);
584  DBUG_RETURN(error_num);
585 }
586 
587 /*
588  retrieve server object which contains server meta-data
589  from the system table given a server's name, set share
590  connection parameter members
591 */
592 int get_connection(MEM_ROOT *mem_root, FEDERATED_SHARE *share)
593 {
594  int error_num= ER_FOREIGN_SERVER_DOESNT_EXIST;
595  FOREIGN_SERVER *server, server_buffer;
596  DBUG_ENTER("ha_federated::get_connection");
597 
598  /*
599  get_server_by_name() clones the server if exists and allocates
600  copies of strings in the supplied mem_root
601  */
602  if (!(server=
603  get_server_by_name(mem_root, share->connection_string, &server_buffer)))
604  {
605  DBUG_PRINT("info", ("get_server_by_name returned > 0 error condition!"));
606  error_num= ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE;
607  goto error;
608  }
609  DBUG_PRINT("info", ("get_server_by_name returned server at %lx",
610  (long unsigned int) server));
611 
612  /*
613  Most of these should never be empty strings, error handling will
614  need to be implemented. Also, is this the best way to set the share
615  members? Is there some allocation needed? In running this code, it works
616  except there are errors in the trace file of the share being overrun
617  at the address of the share.
618  */
619  share->server_name_length= server->server_name_length;
620  share->server_name= server->server_name;
621  share->username= server->username;
622  share->password= server->password;
623  share->database= server->db;
624 #ifndef I_AM_PARANOID
625  share->port= server->port > 0 && server->port < 65536 ?
626 #else
627  share->port= server->port > 1023 && server->port < 65536 ?
628 #endif
629  (ushort) server->port : MYSQL_PORT;
630  share->hostname= server->host;
631  if (!(share->socket= server->socket) &&
632  !strcmp(share->hostname, my_localhost))
633  share->socket= (char *) MYSQL_UNIX_ADDR;
634  share->scheme= server->scheme;
635 
636  DBUG_PRINT("info", ("share->username %s", share->username));
637  DBUG_PRINT("info", ("share->password %s", share->password));
638  DBUG_PRINT("info", ("share->hostname %s", share->hostname));
639  DBUG_PRINT("info", ("share->database %s", share->database));
640  DBUG_PRINT("info", ("share->port %d", share->port));
641  DBUG_PRINT("info", ("share->socket %s", share->socket));
642  DBUG_RETURN(0);
643 
644 error:
645  my_printf_error(error_num, "server name: '%s' doesn't exist!",
646  MYF(0), share->connection_string);
647  DBUG_RETURN(error_num);
648 }
649 
650 /*
651  Parse connection info from table->s->connect_string
652 
653  SYNOPSIS
654  parse_url()
655  mem_root MEM_ROOT pointer for memory allocation
656  share pointer to FEDERATED share
657  table pointer to current TABLE class
658  table_create_flag determines what error to throw
659 
660  DESCRIPTION
661  Populates the share with information about the connection
662  to the foreign database that will serve as the data source.
663  This string must be specified (currently) in the "CONNECTION" field,
664  listed in the CREATE TABLE statement.
665 
666  This string MUST be in the format of any of these:
667 
668  CONNECTION="scheme://username:password@hostname:port/database/table"
669  CONNECTION="scheme://username@hostname/database/table"
670  CONNECTION="scheme://username@hostname:port/database/table"
671  CONNECTION="scheme://username:password@hostname/database/table"
672 
673  _OR_
674 
675  CONNECTION="connection name"
676 
677 
678 
679  An Example:
680 
681  CREATE TABLE t1 (id int(32))
682  ENGINE="FEDERATED"
683  CONNECTION="mysql://joe:joespass@192.168.1.111:9308/federated/testtable";
684 
685  CREATE TABLE t2 (
686  id int(4) NOT NULL auto_increment,
687  name varchar(32) NOT NULL,
688  PRIMARY KEY(id)
689  ) ENGINE="FEDERATED" CONNECTION="my_conn";
690 
691  ***IMPORTANT***
692  Currently, the Federated Storage Engine only supports connecting to another
693  MySQL Database ("scheme" of "mysql"). Connections using JDBC as well as
694  other connectors are in the planning stage.
695 
696 
697  'password' and 'port' are both optional.
698 
699  RETURN VALUE
700  0 success
701  error_num particular error code
702 
703 */
704 
705 static int parse_url(MEM_ROOT *mem_root, FEDERATED_SHARE *share, TABLE *table,
706  uint table_create_flag)
707 {
708  uint error_num= (table_create_flag ?
709  ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE :
710  ER_FOREIGN_DATA_STRING_INVALID);
711  DBUG_ENTER("ha_federated::parse_url");
712 
713  share->port= 0;
714  share->socket= 0;
715  DBUG_PRINT("info", ("share at %lx", (long unsigned int) share));
716  DBUG_PRINT("info", ("Length: %u", (uint) table->s->connect_string.length));
717  DBUG_PRINT("info", ("String: '%.*s'", (int) table->s->connect_string.length,
718  table->s->connect_string.str));
719  share->connection_string= strmake_root(mem_root, table->s->connect_string.str,
720  table->s->connect_string.length);
721 
722  DBUG_PRINT("info",("parse_url alloced share->connection_string %lx",
723  (long unsigned int) share->connection_string));
724 
725  DBUG_PRINT("info",("share->connection_string %s",share->connection_string));
726  /*
727  No :// or @ in connection string. Must be a straight connection name of
728  either "servername" or "servername/tablename"
729  */
730  if ( (!strstr(share->connection_string, "://") &&
731  (!strchr(share->connection_string, '@'))))
732  {
733 
734  DBUG_PRINT("info",
735  ("share->connection_string %s internal format \
736  share->connection_string %lx",
737  share->connection_string,
738  (long unsigned int) share->connection_string));
739 
740  /* ok, so we do a little parsing, but not completely! */
741  share->parsed= FALSE;
742  /*
743  If there is a single '/' in the connection string, this means the user is
744  specifying a table name
745  */
746 
747  if ((share->table_name= strchr(share->connection_string, '/')))
748  {
749  share->connection_string[share->table_name - share->connection_string]= '\0';
750  share->table_name++;
751  share->table_name_length= (uint) strlen(share->table_name);
752 
753  DBUG_PRINT("info",
754  ("internal format, parsed table_name share->connection_string \
755  %s share->table_name %s",
756  share->connection_string, share->table_name));
757 
758  /*
759  there better not be any more '/'s !
760  */
761  if (strchr(share->table_name, '/'))
762  goto error;
763 
764  }
765  /*
766  otherwise, straight server name, use tablename of federated table
767  as remote table name
768  */
769  else
770  {
771  /*
772  connection specifies everything but, resort to
773  expecting remote and foreign table names to match
774  */
775  share->table_name= strmake_root(mem_root, table->s->table_name.str,
776  (share->table_name_length= table->s->table_name.length));
777  DBUG_PRINT("info",
778  ("internal format, default table_name share->connection_string \
779  %s share->table_name %s",
780  share->connection_string, share->table_name));
781  }
782 
783  if ((error_num= get_connection(mem_root, share)))
784  goto error;
785  }
786  else
787  {
788  share->parsed= TRUE;
789  // Add a null for later termination of table name
790  share->connection_string[table->s->connect_string.length]= 0;
791  share->scheme= share->connection_string;
792  DBUG_PRINT("info",("parse_url alloced share->scheme %lx",
793  (long unsigned int) share->scheme));
794 
795  /*
796  remove addition of null terminator and store length
797  for each string in share
798  */
799  if (!(share->username= strstr(share->scheme, "://")))
800  goto error;
801  share->scheme[share->username - share->scheme]= '\0';
802 
803  if (strcmp(share->scheme, "mysql") != 0)
804  goto error;
805 
806  share->username+= 3;
807 
808  if (!(share->hostname= strchr(share->username, '@')))
809  goto error;
810 
811  share->username[share->hostname - share->username]= '\0';
812  share->hostname++;
813 
814  if ((share->password= strchr(share->username, ':')))
815  {
816  share->username[share->password - share->username]= '\0';
817  share->password++;
818  share->username= share->username;
819  /* make sure there isn't an extra / or @ */
820  if ((strchr(share->password, '/') || strchr(share->hostname, '@')))
821  goto error;
822  /*
823  Found that if the string is:
824  user:@hostname:port/db/table
825  Then password is a null string, so set to NULL
826  */
827  if (share->password[0] == '\0')
828  share->password= NULL;
829  }
830  else
831  share->username= share->username;
832 
833  /* make sure there isn't an extra / or @ */
834  if ((strchr(share->username, '/')) || (strchr(share->hostname, '@')))
835  goto error;
836 
837  if (!(share->database= strchr(share->hostname, '/')))
838  goto error;
839  share->hostname[share->database - share->hostname]= '\0';
840  share->database++;
841 
842  if ((share->sport= strchr(share->hostname, ':')))
843  {
844  share->hostname[share->sport - share->hostname]= '\0';
845  share->sport++;
846  if (share->sport[0] == '\0')
847  share->sport= NULL;
848  else
849  share->port= atoi(share->sport);
850  }
851 
852  if (!(share->table_name= strchr(share->database, '/')))
853  goto error;
854  share->database[share->table_name - share->database]= '\0';
855  share->table_name++;
856 
857  share->table_name_length= strlen(share->table_name);
858 
859  /* make sure there's not an extra / */
860  if ((strchr(share->table_name, '/')))
861  goto error;
862 
863  /*
864  If hostname is omitted, we set it to NULL. According to
865  mysql_real_connect() manual:
866  The value of host may be either a hostname or an IP address.
867  If host is NULL or the string "localhost", a connection to the
868  local host is assumed.
869  */
870  if (share->hostname[0] == '\0')
871  share->hostname= NULL;
872  }
873 
874  if (!share->port)
875  {
876  if (!share->hostname || strcmp(share->hostname, my_localhost) == 0)
877  share->socket= (char*) MYSQL_UNIX_ADDR;
878  else
879  share->port= MYSQL_PORT;
880  }
881 
882  DBUG_PRINT("info",
883  ("scheme: %s username: %s password: %s \
884  hostname: %s port: %d db: %s tablename: %s",
885  share->scheme, share->username, share->password,
886  share->hostname, share->port, share->database,
887  share->table_name));
888 
889  DBUG_RETURN(0);
890 
891 error:
892  DBUG_RETURN(parse_url_error(share, table, error_num));
893 }
894 
895 /*****************************************************************************
896 ** FEDERATED tables
897 *****************************************************************************/
898 
899 ha_federated::ha_federated(handlerton *hton,
900  TABLE_SHARE *table_arg)
901  :handler(hton, table_arg),
902  mysql(0), stored_result(0)
903 {
904  trx_next= 0;
905  memset(&bulk_insert, 0, sizeof(bulk_insert));
906 }
907 
908 
909 /*
910  Convert MySQL result set row to handler internal format
911 
912  SYNOPSIS
913  convert_row_to_internal_format()
914  record Byte pointer to record
915  row MySQL result set row from fetchrow()
916  result Result set to use
917 
918  DESCRIPTION
919  This method simply iterates through a row returned via fetchrow with
920  values from a successful SELECT , and then stores each column's value
921  in the field object via the field object pointer (pointing to the table's
922  array of field object pointers). This is how the handler needs the data
923  to be stored to then return results back to the user
924 
925  RETURN VALUE
926  0 After fields have had field values stored from record
927 */
928 
929 uint ha_federated::convert_row_to_internal_format(uchar *record,
930  MYSQL_ROW row,
931  MYSQL_RES *result)
932 {
933  ulong *lengths;
934  Field **field;
935  my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->write_set);
936  DBUG_ENTER("ha_federated::convert_row_to_internal_format");
937 
938  lengths= mysql_fetch_lengths(result);
939 
940  for (field= table->field; *field; field++, row++, lengths++)
941  {
942  /*
943  index variable to move us through the row at the
944  same iterative step as the field
945  */
946  my_ptrdiff_t old_ptr;
947  old_ptr= (my_ptrdiff_t) (record - table->record[0]);
948  (*field)->move_field_offset(old_ptr);
949  if (!*row)
950  {
951  (*field)->set_null();
952  (*field)->reset();
953  }
954  else
955  {
956  if (bitmap_is_set(table->read_set, (*field)->field_index))
957  {
958  (*field)->set_notnull();
959  (*field)->store(*row, *lengths, &my_charset_bin);
960  }
961  }
962  (*field)->move_field_offset(-old_ptr);
963  }
964  dbug_tmp_restore_column_map(table->write_set, old_map);
965  DBUG_RETURN(0);
966 }
967 
968 static bool emit_key_part_name(String *to, KEY_PART_INFO *part)
969 {
970  DBUG_ENTER("emit_key_part_name");
971  if (append_ident(to, part->field->field_name,
972  strlen(part->field->field_name), ident_quote_char))
973  DBUG_RETURN(1); // Out of memory
974  DBUG_RETURN(0);
975 }
976 
977 static bool emit_key_part_element(String *to, KEY_PART_INFO *part,
978  bool needs_quotes, bool is_like,
979  const uchar *ptr, uint len)
980 {
981  Field *field= part->field;
982  DBUG_ENTER("emit_key_part_element");
983 
984  if (needs_quotes && to->append(STRING_WITH_LEN("'")))
985  DBUG_RETURN(1);
986 
987  if (part->type == HA_KEYTYPE_BIT)
988  {
989  char buff[STRING_BUFFER_USUAL_SIZE], *buf= buff;
990 
991  *buf++= '0';
992  *buf++= 'x';
993  buf= octet2hex(buf, (char*) ptr, len);
994  if (to->append((char*) buff, (uint)(buf - buff)))
995  DBUG_RETURN(1);
996  }
997  else if (part->key_part_flag & HA_BLOB_PART)
998  {
999  String blob;
1000  uint blob_length= uint2korr(ptr);
1001  blob.set_quick((char*) ptr+HA_KEY_BLOB_LENGTH,
1002  blob_length, &my_charset_bin);
1003  if (append_escaped(to, &blob))
1004  DBUG_RETURN(1);
1005  }
1006  else if (part->key_part_flag & HA_VAR_LENGTH_PART)
1007  {
1008  String varchar;
1009  uint var_length= uint2korr(ptr);
1010  varchar.set_quick((char*) ptr+HA_KEY_BLOB_LENGTH,
1011  var_length, &my_charset_bin);
1012  if (append_escaped(to, &varchar))
1013  DBUG_RETURN(1);
1014  }
1015  else
1016  {
1017  char strbuff[MAX_FIELD_WIDTH];
1018  String str(strbuff, sizeof(strbuff), part->field->charset()), *res;
1019 
1020  res= field->val_str(&str, ptr);
1021 
1022  if (field->result_type() == STRING_RESULT)
1023  {
1024  if (append_escaped(to, res))
1025  DBUG_RETURN(1);
1026  }
1027  else if (to->append(res->ptr(), res->length()))
1028  DBUG_RETURN(1);
1029  }
1030 
1031  if (is_like && to->append(STRING_WITH_LEN("%")))
1032  DBUG_RETURN(1);
1033 
1034  if (needs_quotes && to->append(STRING_WITH_LEN("'")))
1035  DBUG_RETURN(1);
1036 
1037  DBUG_RETURN(0);
1038 }
1039 
1040 /*
1041  Create a WHERE clause based off of values in keys
1042  Note: This code was inspired by key_copy from key.cc
1043 
1044  SYNOPSIS
1045  create_where_from_key ()
1046  to String object to store WHERE clause
1047  key_info KEY struct pointer
1048  key byte pointer containing key
1049  key_length length of key
1050  range_type 0 - no range, 1 - min range, 2 - max range
1051  (see enum range_operation)
1052 
1053  DESCRIPTION
1054  Using iteration through all the keys via a KEY_PART_INFO pointer,
1055  This method 'extracts' the value of each key in the byte pointer
1056  *key, and for each key found, constructs an appropriate WHERE clause
1057 
1058  RETURN VALUE
1059  0 After all keys have been accounted for to create the WHERE clause
1060  1 No keys found
1061 
1062  Range flags Table per Timour:
1063 
1064  -----------------
1065  - start_key:
1066  * ">" -> HA_READ_AFTER_KEY
1067  * ">=" -> HA_READ_KEY_OR_NEXT
1068  * "=" -> HA_READ_KEY_EXACT
1069 
1070  - end_key:
1071  * "<" -> HA_READ_BEFORE_KEY
1072  * "<=" -> HA_READ_AFTER_KEY
1073 
1074  records_in_range:
1075  -----------------
1076  - start_key:
1077  * ">" -> HA_READ_AFTER_KEY
1078  * ">=" -> HA_READ_KEY_EXACT
1079  * "=" -> HA_READ_KEY_EXACT
1080 
1081  - end_key:
1082  * "<" -> HA_READ_BEFORE_KEY
1083  * "<=" -> HA_READ_AFTER_KEY
1084  * "=" -> HA_READ_AFTER_KEY
1085 
1086 0 HA_READ_KEY_EXACT, Find first record else error
1087 1 HA_READ_KEY_OR_NEXT, Record or next record
1088 2 HA_READ_KEY_OR_PREV, Record or previous
1089 3 HA_READ_AFTER_KEY, Find next rec. after key-record
1090 4 HA_READ_BEFORE_KEY, Find next rec. before key-record
1091 5 HA_READ_PREFIX, Key which as same prefix
1092 6 HA_READ_PREFIX_LAST, Last key with the same prefix
1093 7 HA_READ_PREFIX_LAST_OR_PREV, Last or prev key with the same prefix
1094 
1095 Flags that I've found:
1096 
1097 id, primary key, varchar
1098 
1099 id = 'ccccc'
1100 records_in_range: start_key 0 end_key 3
1101 read_range_first: start_key 0 end_key NULL
1102 
1103 id > 'ccccc'
1104 records_in_range: start_key 3 end_key NULL
1105 read_range_first: start_key 3 end_key NULL
1106 
1107 id < 'ccccc'
1108 records_in_range: start_key NULL end_key 4
1109 read_range_first: start_key NULL end_key 4
1110 
1111 id <= 'ccccc'
1112 records_in_range: start_key NULL end_key 3
1113 read_range_first: start_key NULL end_key 3
1114 
1115 id >= 'ccccc'
1116 records_in_range: start_key 0 end_key NULL
1117 read_range_first: start_key 1 end_key NULL
1118 
1119 id like 'cc%cc'
1120 records_in_range: start_key 0 end_key 3
1121 read_range_first: start_key 1 end_key 3
1122 
1123 id > 'aaaaa' and id < 'ccccc'
1124 records_in_range: start_key 3 end_key 4
1125 read_range_first: start_key 3 end_key 4
1126 
1127 id >= 'aaaaa' and id < 'ccccc';
1128 records_in_range: start_key 0 end_key 4
1129 read_range_first: start_key 1 end_key 4
1130 
1131 id >= 'aaaaa' and id <= 'ccccc';
1132 records_in_range: start_key 0 end_key 3
1133 read_range_first: start_key 1 end_key 3
1134 
1135 id > 'aaaaa' and id <= 'ccccc';
1136 records_in_range: start_key 3 end_key 3
1137 read_range_first: start_key 3 end_key 3
1138 
1139 numeric keys:
1140 
1141 id = 4
1142 index_read_idx: start_key 0 end_key NULL
1143 
1144 id > 4
1145 records_in_range: start_key 3 end_key NULL
1146 read_range_first: start_key 3 end_key NULL
1147 
1148 id >= 4
1149 records_in_range: start_key 0 end_key NULL
1150 read_range_first: start_key 1 end_key NULL
1151 
1152 id < 4
1153 records_in_range: start_key NULL end_key 4
1154 read_range_first: start_key NULL end_key 4
1155 
1156 id <= 4
1157 records_in_range: start_key NULL end_key 3
1158 read_range_first: start_key NULL end_key 3
1159 
1160 id like 4
1161 full table scan, select * from
1162 
1163 id > 2 and id < 8
1164 records_in_range: start_key 3 end_key 4
1165 read_range_first: start_key 3 end_key 4
1166 
1167 id >= 2 and id < 8
1168 records_in_range: start_key 0 end_key 4
1169 read_range_first: start_key 1 end_key 4
1170 
1171 id >= 2 and id <= 8
1172 records_in_range: start_key 0 end_key 3
1173 read_range_first: start_key 1 end_key 3
1174 
1175 id > 2 and id <= 8
1176 records_in_range: start_key 3 end_key 3
1177 read_range_first: start_key 3 end_key 3
1178 
1179 multi keys (id int, name varchar, other varchar)
1180 
1181 id = 1;
1182 records_in_range: start_key 0 end_key 3
1183 read_range_first: start_key 0 end_key NULL
1184 
1185 id > 4;
1186 id > 2 and name = '333'; remote: id > 2
1187 id > 2 and name > '333'; remote: id > 2
1188 id > 2 and name > '333' and other < 'ddd'; remote: id > 2 no results
1189 id > 2 and name >= '333' and other < 'ddd'; remote: id > 2 1 result
1190 id >= 4 and name = 'eric was here' and other > 'eeee';
1191 records_in_range: start_key 3 end_key NULL
1192 read_range_first: start_key 3 end_key NULL
1193 
1194 id >= 4;
1195 id >= 2 and name = '333' and other < 'ddd';
1196 remote: `id` >= 2 AND `name` >= '333';
1197 records_in_range: start_key 0 end_key NULL
1198 read_range_first: start_key 1 end_key NULL
1199 
1200 id < 4;
1201 id < 3 and name = '222' and other <= 'ccc'; remote: id < 3
1202 records_in_range: start_key NULL end_key 4
1203 read_range_first: start_key NULL end_key 4
1204 
1205 id <= 4;
1206 records_in_range: start_key NULL end_key 3
1207 read_range_first: start_key NULL end_key 3
1208 
1209 id like 4;
1210 full table scan
1211 
1212 id > 2 and id < 4;
1213 records_in_range: start_key 3 end_key 4
1214 read_range_first: start_key 3 end_key 4
1215 
1216 id >= 2 and id < 4;
1217 records_in_range: start_key 0 end_key 4
1218 read_range_first: start_key 1 end_key 4
1219 
1220 id >= 2 and id <= 4;
1221 records_in_range: start_key 0 end_key 3
1222 read_range_first: start_key 1 end_key 3
1223 
1224 id > 2 and id <= 4;
1225 id = 6 and name = 'eric was here' and other > 'eeee';
1226 remote: (`id` > 6 AND `name` > 'eric was here' AND `other` > 'eeee')
1227 AND (`id` <= 6) AND ( AND `name` <= 'eric was here')
1228 no results
1229 records_in_range: start_key 3 end_key 3
1230 read_range_first: start_key 3 end_key 3
1231 
1232 Summary:
1233 
1234 * If the start key flag is 0 the max key flag shouldn't even be set,
1235  and if it is, the query produced would be invalid.
1236 * Multipart keys, even if containing some or all numeric columns,
1237  are treated the same as non-numeric keys
1238 
1239  If the query is " = " (quotes or not):
1240  - records in range start key flag HA_READ_KEY_EXACT,
1241  end key flag HA_READ_AFTER_KEY (incorrect)
1242  - any other: start key flag HA_READ_KEY_OR_NEXT,
1243  end key flag HA_READ_AFTER_KEY (correct)
1244 
1245 * 'like' queries (of key)
1246  - Numeric, full table scan
1247  - Non-numeric
1248  records_in_range: start_key 0 end_key 3
1249  other : start_key 1 end_key 3
1250 
1251 * If the key flag is HA_READ_AFTER_KEY:
1252  if start_key, append >
1253  if end_key, append <=
1254 
1255 * If create_where_key was called by records_in_range:
1256 
1257  - if the key is numeric:
1258  start key flag is 0 when end key is NULL, end key flag is 3 or 4
1259  - if create_where_key was called by any other function:
1260  start key flag is 1 when end key is NULL, end key flag is 3 or 4
1261  - if the key is non-numeric, or multipart
1262  When the query is an exact match, the start key flag is 0,
1263  end key flag is 3 for what should be a no-range condition where
1264  you should have 0 and max key NULL, which it is if called by
1265  read_range_first
1266 
1267 Conclusion:
1268 
1269 1. Need logic to determin if a key is min or max when the flag is
1270 HA_READ_AFTER_KEY, and handle appending correct operator accordingly
1271 
1272 2. Need a boolean flag to pass to create_where_from_key, used in the
1273 switch statement. Add 1 to the flag if:
1274  - start key flag is HA_READ_KEY_EXACT and the end key is NULL
1275 
1276 */
1277 
1278 bool ha_federated::create_where_from_key(String *to,
1279  KEY *key_info,
1280  const key_range *start_key,
1281  const key_range *end_key,
1282  bool from_records_in_range,
1283  bool eq_range_arg)
1284 {
1285  bool both_not_null=
1286  (start_key != NULL && end_key != NULL) ? TRUE : FALSE;
1287  const uchar *ptr;
1288  uint remainder, length;
1289  char tmpbuff[FEDERATED_QUERY_BUFFER_SIZE];
1290  String tmp(tmpbuff, sizeof(tmpbuff), system_charset_info);
1291  const key_range *ranges[2]= { start_key, end_key };
1292  my_bitmap_map *old_map;
1293  DBUG_ENTER("ha_federated::create_where_from_key");
1294 
1295  tmp.length(0);
1296  if (start_key == NULL && end_key == NULL)
1297  DBUG_RETURN(1);
1298 
1299  old_map= dbug_tmp_use_all_columns(table, table->write_set);
1300  for (uint i= 0; i <= 1; i++)
1301  {
1302  bool needs_quotes;
1303  KEY_PART_INFO *key_part;
1304  if (ranges[i] == NULL)
1305  continue;
1306 
1307  if (both_not_null)
1308  {
1309  if (i > 0)
1310  tmp.append(STRING_WITH_LEN(") AND ("));
1311  else
1312  tmp.append(STRING_WITH_LEN(" ("));
1313  }
1314 
1315  for (key_part= key_info->key_part,
1316  remainder= key_info->user_defined_key_parts,
1317  length= ranges[i]->length,
1318  ptr= ranges[i]->key; ;
1319  remainder--,
1320  key_part++)
1321  {
1322  Field *field= key_part->field;
1323  uint store_length= key_part->store_length;
1324  uint part_length= min(store_length, length);
1325  needs_quotes= field->str_needs_quotes();
1326  DBUG_DUMP("key, start of loop", ptr, length);
1327 
1328  if (key_part->null_bit)
1329  {
1330  if (*ptr++)
1331  {
1332  /*
1333  We got "IS [NOT] NULL" condition against nullable column. We
1334  distinguish between "IS NOT NULL" and "IS NULL" by flag. For
1335  "IS NULL", flag is set to HA_READ_KEY_EXACT.
1336  */
1337  if (emit_key_part_name(&tmp, key_part) ||
1338  (ranges[i]->flag == HA_READ_KEY_EXACT ?
1339  tmp.append(STRING_WITH_LEN(" IS NULL ")) :
1340  tmp.append(STRING_WITH_LEN(" IS NOT NULL "))))
1341  goto err;
1342  /*
1343  We need to adjust pointer and length to be prepared for next
1344  key part. As well as check if this was last key part.
1345  */
1346  goto prepare_for_next_key_part;
1347  }
1348  }
1349 
1350  if (tmp.append(STRING_WITH_LEN(" (")))
1351  goto err;
1352 
1353  switch (ranges[i]->flag) {
1354  case HA_READ_KEY_EXACT:
1355  DBUG_PRINT("info", ("federated HA_READ_KEY_EXACT %d", i));
1356  if (store_length >= length ||
1357  !needs_quotes ||
1358  key_part->type == HA_KEYTYPE_BIT ||
1359  field->result_type() != STRING_RESULT)
1360  {
1361  if (emit_key_part_name(&tmp, key_part))
1362  goto err;
1363 
1364  if (from_records_in_range)
1365  {
1366  if (tmp.append(STRING_WITH_LEN(" >= ")))
1367  goto err;
1368  }
1369  else
1370  {
1371  if (tmp.append(STRING_WITH_LEN(" = ")))
1372  goto err;
1373  }
1374 
1375  if (emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1376  part_length))
1377  goto err;
1378  }
1379  else
1380  {
1381  /* LIKE */
1382  if (emit_key_part_name(&tmp, key_part) ||
1383  tmp.append(STRING_WITH_LEN(" LIKE ")) ||
1384  emit_key_part_element(&tmp, key_part, needs_quotes, 1, ptr,
1385  part_length))
1386  goto err;
1387  }
1388  break;
1389  case HA_READ_AFTER_KEY:
1390  if (eq_range_arg)
1391  {
1392  if (tmp.append("1=1")) // Dummy
1393  goto err;
1394  break;
1395  }
1396  DBUG_PRINT("info", ("federated HA_READ_AFTER_KEY %d", i));
1397  if ((store_length >= length) || (i > 0)) /* for all parts of end key*/
1398  {
1399  if (emit_key_part_name(&tmp, key_part))
1400  goto err;
1401 
1402  if (i > 0) /* end key */
1403  {
1404  if (tmp.append(STRING_WITH_LEN(" <= ")))
1405  goto err;
1406  }
1407  else /* start key */
1408  {
1409  if (tmp.append(STRING_WITH_LEN(" > ")))
1410  goto err;
1411  }
1412 
1413  if (emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1414  part_length))
1415  {
1416  goto err;
1417  }
1418  break;
1419  }
1420  case HA_READ_KEY_OR_NEXT:
1421  DBUG_PRINT("info", ("federated HA_READ_KEY_OR_NEXT %d", i));
1422  if (emit_key_part_name(&tmp, key_part) ||
1423  tmp.append(STRING_WITH_LEN(" >= ")) ||
1424  emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1425  part_length))
1426  goto err;
1427  break;
1428  case HA_READ_BEFORE_KEY:
1429  DBUG_PRINT("info", ("federated HA_READ_BEFORE_KEY %d", i));
1430  if (store_length >= length)
1431  {
1432  if (emit_key_part_name(&tmp, key_part) ||
1433  tmp.append(STRING_WITH_LEN(" < ")) ||
1434  emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1435  part_length))
1436  goto err;
1437  break;
1438  }
1439  case HA_READ_KEY_OR_PREV:
1440  DBUG_PRINT("info", ("federated HA_READ_KEY_OR_PREV %d", i));
1441  if (emit_key_part_name(&tmp, key_part) ||
1442  tmp.append(STRING_WITH_LEN(" <= ")) ||
1443  emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1444  part_length))
1445  goto err;
1446  break;
1447  default:
1448  DBUG_PRINT("info",("cannot handle flag %d", ranges[i]->flag));
1449  goto err;
1450  }
1451  if (tmp.append(STRING_WITH_LEN(") ")))
1452  goto err;
1453 
1454 prepare_for_next_key_part:
1455  if (store_length >= length)
1456  break;
1457  DBUG_PRINT("info", ("remainder %d", remainder));
1458  DBUG_ASSERT(remainder > 1);
1459  length-= store_length;
1460  /*
1461  For nullable columns, null-byte is already skipped before, that is
1462  ptr was incremented by 1. Since store_length still counts null-byte,
1463  we need to subtract 1 from store_length.
1464  */
1465  ptr+= store_length - test(key_part->null_bit);
1466  if (tmp.append(STRING_WITH_LEN(" AND ")))
1467  goto err;
1468 
1469  DBUG_PRINT("info",
1470  ("create_where_from_key WHERE clause: %s",
1471  tmp.c_ptr_quick()));
1472  }
1473  }
1474  dbug_tmp_restore_column_map(table->write_set, old_map);
1475 
1476  if (both_not_null)
1477  if (tmp.append(STRING_WITH_LEN(") ")))
1478  DBUG_RETURN(1);
1479 
1480  if (to->append(STRING_WITH_LEN(" WHERE ")))
1481  DBUG_RETURN(1);
1482 
1483  if (to->append(tmp))
1484  DBUG_RETURN(1);
1485 
1486  DBUG_RETURN(0);
1487 
1488 err:
1489  dbug_tmp_restore_column_map(table->write_set, old_map);
1490  DBUG_RETURN(1);
1491 }
1492 
1493 /*
1494  Example of simple lock controls. The "share" it creates is structure we will
1495  pass to each federated handler. Do you have to have one of these? Well, you
1496  have pieces that are used for locking, and they are needed to function.
1497 */
1498 
1499 static FEDERATED_SHARE *get_share(const char *table_name, TABLE *table)
1500 {
1501  char query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
1502  Field **field;
1503  String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
1504  FEDERATED_SHARE *share= NULL, tmp_share;
1505  MEM_ROOT mem_root;
1506  DBUG_ENTER("ha_federated.cc::get_share");
1507 
1508  /*
1509  In order to use this string, we must first zero it's length,
1510  or it will contain garbage
1511  */
1512  query.length(0);
1513 
1514  init_alloc_root(&mem_root, 256, 0);
1515 
1516  mysql_mutex_lock(&federated_mutex);
1517 
1518  tmp_share.share_key= table_name;
1519  tmp_share.share_key_length= (uint) strlen(table_name);
1520  if (parse_url(&mem_root, &tmp_share, table, 0))
1521  goto error;
1522 
1523  /* TODO: change tmp_share.scheme to LEX_STRING object */
1524  if (!(share= (FEDERATED_SHARE *) my_hash_search(&federated_open_tables,
1525  (uchar*) tmp_share.share_key,
1526  tmp_share.
1527  share_key_length)))
1528  {
1529  query.set_charset(system_charset_info);
1530  query.append(STRING_WITH_LEN("SELECT "));
1531  for (field= table->field; *field; field++)
1532  {
1533  append_ident(&query, (*field)->field_name,
1534  strlen((*field)->field_name), ident_quote_char);
1535  query.append(STRING_WITH_LEN(", "));
1536  }
1537  /* chops off trailing comma */
1538  query.length(query.length() - sizeof_trailing_comma);
1539 
1540  query.append(STRING_WITH_LEN(" FROM "));
1541 
1542  append_ident(&query, tmp_share.table_name,
1543  tmp_share.table_name_length, ident_quote_char);
1544 
1545  if (!(share= (FEDERATED_SHARE *) memdup_root(&mem_root, (char*)&tmp_share, sizeof(*share))) ||
1546  !(share->select_query= (char*) strmake_root(&mem_root, query.ptr(), query.length() + 1)))
1547  goto error;
1548 
1549  share->use_count= 0;
1550  share->mem_root= mem_root;
1551 
1552  DBUG_PRINT("info",
1553  ("share->select_query %s", share->select_query));
1554 
1555  if (my_hash_insert(&federated_open_tables, (uchar*) share))
1556  goto error;
1557  thr_lock_init(&share->lock);
1558  mysql_mutex_init(fe_key_mutex_FEDERATED_SHARE_mutex,
1559  &share->mutex, MY_MUTEX_INIT_FAST);
1560  }
1561  else
1562  free_root(&mem_root, MYF(0)); /* prevents memory leak */
1563 
1564  share->use_count++;
1565  mysql_mutex_unlock(&federated_mutex);
1566 
1567  DBUG_RETURN(share);
1568 
1569 error:
1570  mysql_mutex_unlock(&federated_mutex);
1571  free_root(&mem_root, MYF(0));
1572  DBUG_RETURN(NULL);
1573 }
1574 
1575 
1576 /*
1577  Free lock controls. We call this whenever we close a table.
1578  If the table had the last reference to the share then we
1579  free memory associated with it.
1580 */
1581 
1582 static int free_share(FEDERATED_SHARE *share)
1583 {
1584  MEM_ROOT mem_root= share->mem_root;
1585  DBUG_ENTER("free_share");
1586 
1587  mysql_mutex_lock(&federated_mutex);
1588  if (!--share->use_count)
1589  {
1590  my_hash_delete(&federated_open_tables, (uchar*) share);
1591  thr_lock_delete(&share->lock);
1592  mysql_mutex_destroy(&share->mutex);
1593  free_root(&mem_root, MYF(0));
1594  }
1595  mysql_mutex_unlock(&federated_mutex);
1596 
1597  DBUG_RETURN(0);
1598 }
1599 
1600 
1601 ha_rows ha_federated::records_in_range(uint inx, key_range *start_key,
1602  key_range *end_key)
1603 {
1604  /*
1605 
1606  We really want indexes to be used as often as possible, therefore
1607  we just need to hard-code the return value to a very low number to
1608  force the issue
1609 
1610 */
1611  DBUG_ENTER("ha_federated::records_in_range");
1612  DBUG_RETURN(FEDERATED_RECORDS_IN_RANGE);
1613 }
1614 /*
1615  If frm_error() is called then we will use this to to find out
1616  what file extentions exist for the storage engine. This is
1617  also used by the default rename_table and delete_table method
1618  in handler.cc.
1619 */
1620 
1621 const char **ha_federated::bas_ext() const
1622 {
1623  static const char *ext[]=
1624  {
1625  NullS
1626  };
1627  return ext;
1628 }
1629 
1630 
1631 /*
1632  Used for opening tables. The name will be the name of the file.
1633  A table is opened when it needs to be opened. For instance
1634  when a request comes in for a select on the table (tables are not
1635  open and closed for each request, they are cached).
1636 
1637  Called from handler.cc by handler::ha_open(). The server opens
1638  all tables by calling ha_open() which then calls the handler
1639  specific open().
1640 */
1641 
1642 int ha_federated::open(const char *name, int mode, uint test_if_locked)
1643 {
1644  DBUG_ENTER("ha_federated::open");
1645 
1646  if (!(share= get_share(name, table)))
1647  DBUG_RETURN(1);
1648  thr_lock_data_init(&share->lock, &lock, NULL);
1649 
1650  DBUG_ASSERT(mysql == NULL);
1651 
1652  ref_length= sizeof(MYSQL_RES *) + sizeof(MYSQL_ROW_OFFSET);
1653  DBUG_PRINT("info", ("ref_length: %u", ref_length));
1654 
1655  my_init_dynamic_array(&results, sizeof(MYSQL_RES *), 4, 4);
1656  reset();
1657 
1658  DBUG_RETURN(0);
1659 }
1660 
1661 
1662 /*
1663  Closes a table. We call the free_share() function to free any resources
1664  that we have allocated in the "shared" structure.
1665 
1666  Called from sql_base.cc, sql_select.cc, and table.cc.
1667  In sql_select.cc it is only used to close up temporary tables or during
1668  the process where a temporary table is converted over to being a
1669  myisam table.
1670  For sql_base.cc look at close_data_tables().
1671 */
1672 
1673 int ha_federated::close(void)
1674 {
1675  DBUG_ENTER("ha_federated::close");
1676 
1677  free_result();
1678 
1679  delete_dynamic(&results);
1680 
1681  /* Disconnect from mysql */
1682  mysql_close(mysql);
1683  mysql= NULL;
1684 
1685  /*
1686  mysql_close() might return an error if a remote server's gone
1687  for some reason. If that happens while removing a table from
1688  the table cache, the error will be propagated to a client even
1689  if the original query was not issued against the FEDERATED table.
1690  So, don't propagate errors from mysql_close().
1691  */
1692  if (table->in_use)
1693  table->in_use->clear_error();
1694 
1695  DBUG_RETURN(free_share(share));
1696 }
1697 
1698 
1710 bool ha_federated::append_stmt_insert(String *query)
1711 {
1712  char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
1713  Field **field;
1714  uint tmp_length;
1715  bool added_field= FALSE;
1716 
1717  /* The main insert query string */
1718  String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin);
1719  DBUG_ENTER("ha_federated::append_stmt_insert");
1720 
1721  insert_string.length(0);
1722 
1723  if (replace_duplicates)
1724  insert_string.append(STRING_WITH_LEN("REPLACE INTO "));
1725  else if (ignore_duplicates && !insert_dup_update)
1726  insert_string.append(STRING_WITH_LEN("INSERT IGNORE INTO "));
1727  else
1728  insert_string.append(STRING_WITH_LEN("INSERT INTO "));
1729  append_ident(&insert_string, share->table_name, share->table_name_length,
1730  ident_quote_char);
1731  tmp_length= insert_string.length();
1732  insert_string.append(STRING_WITH_LEN(" ("));
1733 
1734  /*
1735  loop through the field pointer array, add any fields to both the values
1736  list and the fields list that match the current query id
1737  */
1738  for (field= table->field; *field; field++)
1739  {
1740  if (bitmap_is_set(table->write_set, (*field)->field_index))
1741  {
1742  /* append the field name */
1743  append_ident(&insert_string, (*field)->field_name,
1744  strlen((*field)->field_name), ident_quote_char);
1745 
1746  /* append commas between both fields and fieldnames */
1747  /*
1748  unfortunately, we can't use the logic if *(fields + 1) to
1749  make the following appends conditional as we don't know if the
1750  next field is in the write set
1751  */
1752  insert_string.append(STRING_WITH_LEN(", "));
1753  added_field= TRUE;
1754  }
1755  }
1756 
1757  if (added_field)
1758  {
1759  /* Remove trailing comma. */
1760  insert_string.length(insert_string.length() - sizeof_trailing_comma);
1761  insert_string.append(STRING_WITH_LEN(") "));
1762  }
1763  else
1764  {
1765  /* If there were no fields, we don't want to add a closing paren. */
1766  insert_string.length(tmp_length);
1767  }
1768 
1769  insert_string.append(STRING_WITH_LEN(" VALUES "));
1770 
1771  DBUG_RETURN(query->append(insert_string));
1772 }
1773 
1774 
1775 /*
1776  write_row() inserts a row. No extra() hint is given currently if a bulk load
1777  is happeneding. buf() is a byte array of data. You can use the field
1778  information to extract the data from the native byte array type.
1779  Example of this would be:
1780  for (Field **field=table->field ; *field ; field++)
1781  {
1782  ...
1783  }
1784 
1785  Called from item_sum.cc, item_sum.cc, sql_acl.cc, sql_insert.cc,
1786  sql_insert.cc, sql_select.cc, sql_table.cc, sql_udf.cc, and sql_update.cc.
1787 */
1788 
1789 int ha_federated::write_row(uchar *buf)
1790 {
1791  char values_buffer[FEDERATED_QUERY_BUFFER_SIZE];
1792  char insert_field_value_buffer[STRING_BUFFER_USUAL_SIZE];
1793  Field **field;
1794  uint tmp_length;
1795  int error= 0;
1796  bool use_bulk_insert;
1797  bool auto_increment_update_required= (table->next_number_field != NULL);
1798 
1799  /* The string containing the values to be added to the insert */
1800  String values_string(values_buffer, sizeof(values_buffer), &my_charset_bin);
1801  /* The actual value of the field, to be added to the values_string */
1802  String insert_field_value_string(insert_field_value_buffer,
1803  sizeof(insert_field_value_buffer),
1804  &my_charset_bin);
1805  my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
1806  DBUG_ENTER("ha_federated::write_row");
1807 
1808  values_string.length(0);
1809  insert_field_value_string.length(0);
1810  ha_statistic_increment(&SSV::ha_write_count);
1811 
1812  /*
1813  start both our field and field values strings
1814  We must disable multi-row insert for "INSERT...ON DUPLICATE KEY UPDATE"
1815  Ignore duplicates is always true when insert_dup_update is true.
1816  When replace_duplicates == TRUE, we can safely enable multi-row insert.
1817  When performing multi-row insert, we only collect the columns values for
1818  the row. The start of the statement is only created when the first
1819  row is copied in to the bulk_insert string.
1820  */
1821  if (!(use_bulk_insert= bulk_insert.str &&
1822  (!insert_dup_update || replace_duplicates)))
1823  append_stmt_insert(&values_string);
1824 
1825  values_string.append(STRING_WITH_LEN(" ("));
1826  tmp_length= values_string.length();
1827 
1828  /*
1829  loop through the field pointer array, add any fields to both the values
1830  list and the fields list that is part of the write set
1831  */
1832  for (field= table->field; *field; field++)
1833  {
1834  if (bitmap_is_set(table->write_set, (*field)->field_index))
1835  {
1836  if ((*field)->is_null())
1837  values_string.append(STRING_WITH_LEN(" NULL "));
1838  else
1839  {
1840  bool needs_quote= (*field)->str_needs_quotes();
1841  (*field)->val_str(&insert_field_value_string);
1842  if (needs_quote)
1843  values_string.append(value_quote_char);
1844  insert_field_value_string.print(&values_string);
1845  if (needs_quote)
1846  values_string.append(value_quote_char);
1847 
1848  insert_field_value_string.length(0);
1849  }
1850 
1851  /* append commas between both fields and fieldnames */
1852  /*
1853  unfortunately, we can't use the logic if *(fields + 1) to
1854  make the following appends conditional as we don't know if the
1855  next field is in the write set
1856  */
1857  values_string.append(STRING_WITH_LEN(", "));
1858  }
1859  }
1860  dbug_tmp_restore_column_map(table->read_set, old_map);
1861 
1862  /*
1863  if there were no fields, we don't want to add a closing paren
1864  AND, we don't want to chop off the last char '('
1865  insert will be "INSERT INTO t1 VALUES ();"
1866  */
1867  if (values_string.length() > tmp_length)
1868  {
1869  /* chops off trailing comma */
1870  values_string.length(values_string.length() - sizeof_trailing_comma);
1871  }
1872  /* we always want to append this, even if there aren't any fields */
1873  values_string.append(STRING_WITH_LEN(") "));
1874 
1875  if (use_bulk_insert)
1876  {
1877  /*
1878  Send the current bulk insert out if appending the current row would
1879  cause the statement to overflow the packet size, otherwise set
1880  auto_increment_update_required to FALSE as no query was executed.
1881  */
1882  if (bulk_insert.length + values_string.length() + bulk_padding >
1883  mysql->net.max_packet_size && bulk_insert.length)
1884  {
1885  error= real_query(bulk_insert.str, bulk_insert.length);
1886  bulk_insert.length= 0;
1887  }
1888  else
1889  auto_increment_update_required= FALSE;
1890 
1891  if (bulk_insert.length == 0)
1892  {
1893  char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
1894  String insert_string(insert_buffer, sizeof(insert_buffer),
1895  &my_charset_bin);
1896  insert_string.length(0);
1897  append_stmt_insert(&insert_string);
1898  dynstr_append_mem(&bulk_insert, insert_string.ptr(),
1899  insert_string.length());
1900  }
1901  else
1902  dynstr_append_mem(&bulk_insert, ",", 1);
1903 
1904  dynstr_append_mem(&bulk_insert, values_string.ptr(),
1905  values_string.length());
1906  }
1907  else
1908  {
1909  error= real_query(values_string.ptr(), values_string.length());
1910  }
1911 
1912  if (error)
1913  {
1914  DBUG_RETURN(stash_remote_error());
1915  }
1916  /*
1917  If the table we've just written a record to contains an auto_increment
1918  field, then store the last_insert_id() value from the foreign server
1919  */
1920  if (auto_increment_update_required)
1921  {
1922  update_auto_increment();
1923 
1924  /* mysql_insert() uses this for protocol return value */
1925  table->next_number_field->store(stats.auto_increment_value, 1);
1926  }
1927 
1928  DBUG_RETURN(0);
1929 }
1930 
1931 
1942 {
1943  uint page_size;
1944  DBUG_ENTER("ha_federated::start_bulk_insert");
1945 
1946  dynstr_free(&bulk_insert);
1947 
1954  if (rows == 1)
1955  DBUG_VOID_RETURN;
1956 
1957  /*
1958  Make sure we have an open connection so that we know the
1959  maximum packet size.
1960  */
1961  if (!mysql && real_connect())
1962  DBUG_VOID_RETURN;
1963 
1964  page_size= (uint) my_getpagesize();
1965 
1966  if (init_dynamic_string(&bulk_insert, NULL, page_size, page_size))
1967  DBUG_VOID_RETURN;
1968 
1969  bulk_insert.length= 0;
1970  DBUG_VOID_RETURN;
1971 }
1972 
1973 
1986 {
1987  int error= 0;
1988  DBUG_ENTER("ha_federated::end_bulk_insert");
1989 
1990  if (bulk_insert.str && bulk_insert.length)
1991  {
1992  if (real_query(bulk_insert.str, bulk_insert.length))
1993  error= stash_remote_error();
1994  else
1995  if (table->next_number_field)
1996  update_auto_increment();
1997  }
1998 
1999  dynstr_free(&bulk_insert);
2000 
2001  DBUG_RETURN(my_errno= error);
2002 }
2003 
2004 
2005 /*
2006  ha_federated::update_auto_increment
2007 
2008  This method ensures that last_insert_id() works properly. What it simply does
2009  is calls last_insert_id() on the foreign database immediately after insert
2010  (if the table has an auto_increment field) and sets the insert id via
2011  thd->insert_id(ID)).
2012 */
2013 void ha_federated::update_auto_increment(void)
2014 {
2015  THD *thd= current_thd;
2016  DBUG_ENTER("ha_federated::update_auto_increment");
2017 
2018  ha_federated::info(HA_STATUS_AUTO);
2019  thd->first_successful_insert_id_in_cur_stmt=
2020  stats.auto_increment_value;
2021  DBUG_PRINT("info",("last_insert_id: %ld", (long) stats.auto_increment_value));
2022 
2023  DBUG_VOID_RETURN;
2024 }
2025 
2026 int ha_federated::optimize(THD* thd, HA_CHECK_OPT* check_opt)
2027 {
2028  char query_buffer[STRING_BUFFER_USUAL_SIZE];
2029  String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
2030  DBUG_ENTER("ha_federated::optimize");
2031 
2032  query.length(0);
2033 
2034  query.set_charset(system_charset_info);
2035  query.append(STRING_WITH_LEN("OPTIMIZE TABLE "));
2036  append_ident(&query, share->table_name, share->table_name_length,
2037  ident_quote_char);
2038 
2039  if (real_query(query.ptr(), query.length()))
2040  {
2041  DBUG_RETURN(stash_remote_error());
2042  }
2043 
2044  DBUG_RETURN(0);
2045 }
2046 
2047 
2048 int ha_federated::repair(THD* thd, HA_CHECK_OPT* check_opt)
2049 {
2050  char query_buffer[STRING_BUFFER_USUAL_SIZE];
2051  String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
2052  DBUG_ENTER("ha_federated::repair");
2053 
2054  query.length(0);
2055 
2056  query.set_charset(system_charset_info);
2057  query.append(STRING_WITH_LEN("REPAIR TABLE "));
2058  append_ident(&query, share->table_name, share->table_name_length,
2059  ident_quote_char);
2060  if (check_opt->flags & T_QUICK)
2061  query.append(STRING_WITH_LEN(" QUICK"));
2062  if (check_opt->flags & T_EXTEND)
2063  query.append(STRING_WITH_LEN(" EXTENDED"));
2064  if (check_opt->sql_flags & TT_USEFRM)
2065  query.append(STRING_WITH_LEN(" USE_FRM"));
2066 
2067  if (real_query(query.ptr(), query.length()))
2068  {
2069  DBUG_RETURN(stash_remote_error());
2070  }
2071 
2072  DBUG_RETURN(0);
2073 }
2074 
2075 
2076 /*
2077  Yes, update_row() does what you expect, it updates a row. old_data will have
2078  the previous row record in it, while new_data will have the newest data in
2079  it.
2080 
2081  Keep in mind that the server can do updates based on ordering if an ORDER BY
2082  clause was used. Consecutive ordering is not guaranteed.
2083 
2084  Currently new_data will not have an updated AUTO_INCREMENT record. You can
2085  do this for federated by doing the following:
2086 
2087  if (table->next_number_field && record == table->record[0])
2088  update_auto_increment();
2089 
2090  Called from sql_select.cc, sql_acl.cc, sql_update.cc, and sql_insert.cc.
2091 */
2092 
2093 int ha_federated::update_row(const uchar *old_data, uchar *new_data)
2094 {
2095  /*
2096  This used to control how the query was built. If there was a
2097  primary key, the query would be built such that there was a where
2098  clause with only that column as the condition. This is flawed,
2099  because if we have a multi-part primary key, it would only use the
2100  first part! We don't need to do this anyway, because
2101  read_range_first will retrieve the correct record, which is what
2102  is used to build the WHERE clause. We can however use this to
2103  append a LIMIT to the end if there is NOT a primary key. Why do
2104  this? Because we only are updating one record, and LIMIT enforces
2105  this.
2106  */
2107  bool has_a_primary_key= test(table->s->primary_key != MAX_KEY);
2108 
2109  /*
2110  buffers for following strings
2111  */
2112  char field_value_buffer[STRING_BUFFER_USUAL_SIZE];
2113  char update_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2114  char where_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2115 
2116  /* Work area for field values */
2117  String field_value(field_value_buffer, sizeof(field_value_buffer),
2118  &my_charset_bin);
2119  /* stores the update query */
2120  String update_string(update_buffer,
2121  sizeof(update_buffer),
2122  &my_charset_bin);
2123  /* stores the WHERE clause */
2124  String where_string(where_buffer,
2125  sizeof(where_buffer),
2126  &my_charset_bin);
2127  uchar *record= table->record[0];
2128  DBUG_ENTER("ha_federated::update_row");
2129  /*
2130  set string lengths to 0 to avoid misc chars in string
2131  */
2132  field_value.length(0);
2133  update_string.length(0);
2134  where_string.length(0);
2135 
2136  if (ignore_duplicates)
2137  update_string.append(STRING_WITH_LEN("UPDATE IGNORE "));
2138  else
2139  update_string.append(STRING_WITH_LEN("UPDATE "));
2140  append_ident(&update_string, share->table_name,
2141  share->table_name_length, ident_quote_char);
2142  update_string.append(STRING_WITH_LEN(" SET "));
2143 
2144  /*
2145  In this loop, we want to match column names to values being inserted
2146  (while building INSERT statement).
2147 
2148  Iterate through table->field (new data) and share->old_field (old_data)
2149  using the same index to create an SQL UPDATE statement. New data is
2150  used to create SET field=value and old data is used to create WHERE
2151  field=oldvalue
2152  */
2153 
2154  for (Field **field= table->field; *field; field++)
2155  {
2156  if (bitmap_is_set(table->write_set, (*field)->field_index))
2157  {
2158  size_t field_name_length= strlen((*field)->field_name);
2159  append_ident(&update_string, (*field)->field_name, field_name_length,
2160  ident_quote_char);
2161  update_string.append(STRING_WITH_LEN(" = "));
2162 
2163  if ((*field)->is_null())
2164  update_string.append(STRING_WITH_LEN(" NULL "));
2165  else
2166  {
2167  /* otherwise = */
2168  my_bitmap_map *old_map= tmp_use_all_columns(table, table->read_set);
2169  bool needs_quote= (*field)->str_needs_quotes();
2170  (*field)->val_str(&field_value);
2171  if (needs_quote)
2172  update_string.append(value_quote_char);
2173  field_value.print(&update_string);
2174  if (needs_quote)
2175  update_string.append(value_quote_char);
2176  field_value.length(0);
2177  tmp_restore_column_map(table->read_set, old_map);
2178  }
2179  update_string.append(STRING_WITH_LEN(", "));
2180  }
2181 
2182  if (bitmap_is_set(table->read_set, (*field)->field_index))
2183  {
2184  size_t field_name_length= strlen((*field)->field_name);
2185  append_ident(&where_string, (*field)->field_name, field_name_length,
2186  ident_quote_char);
2187  if ((*field)->is_null_in_record(old_data))
2188  where_string.append(STRING_WITH_LEN(" IS NULL "));
2189  else
2190  {
2191  bool needs_quote= (*field)->str_needs_quotes();
2192  where_string.append(STRING_WITH_LEN(" = "));
2193  (*field)->val_str(&field_value,
2194  (old_data + (*field)->offset(record)));
2195  if (needs_quote)
2196  where_string.append(value_quote_char);
2197  field_value.print(&where_string);
2198  if (needs_quote)
2199  where_string.append(value_quote_char);
2200  field_value.length(0);
2201  }
2202  where_string.append(STRING_WITH_LEN(" AND "));
2203  }
2204  }
2205 
2206  /* Remove last ', '. This works as there must be at least on updated field */
2207  update_string.length(update_string.length() - sizeof_trailing_comma);
2208 
2209  if (where_string.length())
2210  {
2211  /* chop off trailing AND */
2212  where_string.length(where_string.length() - sizeof_trailing_and);
2213  update_string.append(STRING_WITH_LEN(" WHERE "));
2214  update_string.append(where_string);
2215  }
2216 
2217  /*
2218  If this table has not a primary key, then we could possibly
2219  update multiple rows. We want to make sure to only update one!
2220  */
2221  if (!has_a_primary_key)
2222  update_string.append(STRING_WITH_LEN(" LIMIT 1"));
2223 
2224  if (real_query(update_string.ptr(), update_string.length()))
2225  {
2226  DBUG_RETURN(stash_remote_error());
2227  }
2228  DBUG_RETURN(0);
2229 }
2230 
2231 /*
2232  This will delete a row. 'buf' will contain a copy of the row to be =deleted.
2233  The server will call this right after the current row has been called (from
2234  either a previous rnd_next() or index call).
2235  If you keep a pointer to the last row or can access a primary key it will
2236  make doing the deletion quite a bit easier.
2237  Keep in mind that the server does no guarentee consecutive deletions.
2238  ORDER BY clauses can be used.
2239 
2240  Called in sql_acl.cc and sql_udf.cc to manage internal table information.
2241  Called in sql_delete.cc, sql_insert.cc, and sql_select.cc. In sql_select
2242  it is used for removing duplicates while in insert it is used for REPLACE
2243  calls.
2244 */
2245 
2246 int ha_federated::delete_row(const uchar *buf)
2247 {
2248  char delete_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2249  char data_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2250  String delete_string(delete_buffer, sizeof(delete_buffer), &my_charset_bin);
2251  String data_string(data_buffer, sizeof(data_buffer), &my_charset_bin);
2252  uint found= 0;
2253  DBUG_ENTER("ha_federated::delete_row");
2254 
2255  delete_string.length(0);
2256  delete_string.append(STRING_WITH_LEN("DELETE FROM "));
2257  append_ident(&delete_string, share->table_name,
2258  share->table_name_length, ident_quote_char);
2259  delete_string.append(STRING_WITH_LEN(" WHERE "));
2260 
2261  for (Field **field= table->field; *field; field++)
2262  {
2263  Field *cur_field= *field;
2264  found++;
2265  if (bitmap_is_set(table->read_set, cur_field->field_index))
2266  {
2267  append_ident(&delete_string, (*field)->field_name,
2268  strlen((*field)->field_name), ident_quote_char);
2269  data_string.length(0);
2270  if (cur_field->is_null())
2271  {
2272  delete_string.append(STRING_WITH_LEN(" IS NULL "));
2273  }
2274  else
2275  {
2276  bool needs_quote= cur_field->str_needs_quotes();
2277  delete_string.append(STRING_WITH_LEN(" = "));
2278  cur_field->val_str(&data_string);
2279  if (needs_quote)
2280  delete_string.append(value_quote_char);
2281  data_string.print(&delete_string);
2282  if (needs_quote)
2283  delete_string.append(value_quote_char);
2284  }
2285  delete_string.append(STRING_WITH_LEN(" AND "));
2286  }
2287  }
2288 
2289  // Remove trailing AND
2290  delete_string.length(delete_string.length() - sizeof_trailing_and);
2291  if (!found)
2292  delete_string.length(delete_string.length() - sizeof_trailing_where);
2293 
2294  delete_string.append(STRING_WITH_LEN(" LIMIT 1"));
2295  DBUG_PRINT("info",
2296  ("Delete sql: %s", delete_string.c_ptr_quick()));
2297  if (real_query(delete_string.ptr(), delete_string.length()))
2298  {
2299  DBUG_RETURN(stash_remote_error());
2300  }
2301  stats.deleted+= (ha_rows) mysql->affected_rows;
2302  stats.records-= (ha_rows) mysql->affected_rows;
2303  DBUG_PRINT("info",
2304  ("rows deleted %ld rows deleted for all time %ld",
2305  (long) mysql->affected_rows, (long) stats.deleted));
2306 
2307  DBUG_RETURN(0);
2308 }
2309 
2310 
2311 /*
2312  Positions an index cursor to the index specified in the handle. Fetches the
2313  row if available. If the key value is null, begin at the first key of the
2314  index. This method, which is called in the case of an SQL statement having
2315  a WHERE clause on a non-primary key index, simply calls index_read_idx.
2316 */
2317 
2318 int ha_federated::index_read(uchar *buf, const uchar *key,
2319  uint key_len, ha_rkey_function find_flag)
2320 {
2321  int rc;
2322  DBUG_ENTER("ha_federated::index_read");
2323 
2324  MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
2325  free_result();
2326  rc= index_read_idx_with_result_set(buf, active_index, key,
2327  key_len, find_flag,
2328  &stored_result);
2329  MYSQL_INDEX_READ_ROW_DONE(rc);
2330  DBUG_RETURN(rc);
2331 }
2332 
2333 
2334 /*
2335  Positions an index cursor to the index specified in key. Fetches the
2336  row if any. This is only used to read whole keys.
2337 
2338  This method is called via index_read in the case of a WHERE clause using
2339  a primary key index OR is called DIRECTLY when the WHERE clause
2340  uses a PRIMARY KEY index.
2341 
2342  NOTES
2343  This uses an internal result set that is deleted before function
2344  returns. We need to be able to be calable from ha_rnd_pos()
2345 */
2346 
2347 int ha_federated::index_read_idx(uchar *buf, uint index, const uchar *key,
2348  uint key_len, enum ha_rkey_function find_flag)
2349 {
2350  int retval;
2351  MYSQL_RES *mysql_result;
2352  DBUG_ENTER("ha_federated::index_read_idx");
2353 
2354  if ((retval= index_read_idx_with_result_set(buf, index, key,
2355  key_len, find_flag,
2356  &mysql_result)))
2357  DBUG_RETURN(retval);
2358  mysql_free_result(mysql_result);
2359  results.elements--;
2360  DBUG_RETURN(0);
2361 }
2362 
2363 
2364 /*
2365  Create result set for rows matching query and return first row
2366 
2367  RESULT
2368  0 ok In this case *result will contain the result set
2369  table->status == 0
2370  # error In this case *result will contain 0
2371  table->status == STATUS_NOT_FOUND
2372 */
2373 
2374 int ha_federated::index_read_idx_with_result_set(uchar *buf, uint index,
2375  const uchar *key,
2376  uint key_len,
2377  ha_rkey_function find_flag,
2378  MYSQL_RES **result)
2379 {
2380  int retval;
2381  char error_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2382  char index_value[STRING_BUFFER_USUAL_SIZE];
2383  char sql_query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2384  String index_string(index_value,
2385  sizeof(index_value),
2386  &my_charset_bin);
2387  String sql_query(sql_query_buffer,
2388  sizeof(sql_query_buffer),
2389  &my_charset_bin);
2390  key_range range;
2391  DBUG_ENTER("ha_federated::index_read_idx_with_result_set");
2392 
2393  *result= 0; // In case of errors
2394  index_string.length(0);
2395  sql_query.length(0);
2396  ha_statistic_increment(&SSV::ha_read_key_count);
2397 
2398  sql_query.append(share->select_query);
2399 
2400  range.key= key;
2401  range.length= key_len;
2402  range.flag= find_flag;
2403  create_where_from_key(&index_string,
2404  &table->key_info[index],
2405  &range,
2406  NULL, 0, 0);
2407  sql_query.append(index_string);
2408 
2409  if (real_query(sql_query.ptr(), sql_query.length()))
2410  {
2411  sprintf(error_buffer, "error: %d '%s'",
2412  mysql_errno(mysql), mysql_error(mysql));
2413  retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
2414  goto error;
2415  }
2416  if (!(*result= store_result(mysql)))
2417  {
2418  retval= HA_ERR_END_OF_FILE;
2419  goto error;
2420  }
2421  if ((retval= read_next(buf, *result)))
2422  {
2423  mysql_free_result(*result);
2424  results.elements--;
2425  *result= 0;
2426  table->status= STATUS_NOT_FOUND;
2427  DBUG_RETURN(retval);
2428  }
2429  DBUG_RETURN(0);
2430 
2431 error:
2432  table->status= STATUS_NOT_FOUND;
2433  my_error(retval, MYF(0), error_buffer);
2434  DBUG_RETURN(retval);
2435 }
2436 
2437 
2438 /*
2439  This method is used exlusevely by filesort() to check if we
2440  can create sorting buffers of necessary size.
2441  If the handler returns more records that it declares
2442  here server can just crash on filesort().
2443  We cannot guarantee that's not going to happen with
2444  the FEDERATED engine, as we have records==0 always if the
2445  client is a VIEW, and for the table the number of
2446  records can inpredictably change during execution.
2447  So we return maximum possible value here.
2448 */
2449 
2451 {
2452  return HA_POS_ERROR;
2453 }
2454 
2455 
2456 /* Initialized at each key walk (called multiple times unlike rnd_init()) */
2457 
2458 int ha_federated::index_init(uint keynr, bool sorted)
2459 {
2460  DBUG_ENTER("ha_federated::index_init");
2461  DBUG_PRINT("info", ("table: '%s' key: %u", table->s->table_name.str, keynr));
2462  active_index= keynr;
2463  DBUG_RETURN(0);
2464 }
2465 
2466 
2467 /*
2468  Read first range
2469 */
2470 
2472  const key_range *end_key,
2473  bool eq_range_arg, bool sorted)
2474 {
2475  char sql_query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2476  int retval;
2477  String sql_query(sql_query_buffer,
2478  sizeof(sql_query_buffer),
2479  &my_charset_bin);
2480  DBUG_ENTER("ha_federated::read_range_first");
2481  MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
2482 
2483  DBUG_ASSERT(!(start_key == NULL && end_key == NULL));
2484 
2485  sql_query.length(0);
2486  sql_query.append(share->select_query);
2487  create_where_from_key(&sql_query,
2488  &table->key_info[active_index],
2489  start_key, end_key, 0, eq_range_arg);
2490  if (real_query(sql_query.ptr(), sql_query.length()))
2491  {
2492  retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
2493  goto error;
2494  }
2495  sql_query.length(0);
2496 
2497  if (!(stored_result= store_result(mysql)))
2498  {
2499  retval= HA_ERR_END_OF_FILE;
2500  goto error;
2501  }
2502 
2503  retval= read_next(table->record[0], stored_result);
2504  MYSQL_INDEX_READ_ROW_DONE(retval);
2505  DBUG_RETURN(retval);
2506 
2507 error:
2508  table->status= STATUS_NOT_FOUND;
2509  MYSQL_INDEX_READ_ROW_DONE(retval);
2510  DBUG_RETURN(retval);
2511 }
2512 
2513 
2515 {
2516  int retval;
2517  DBUG_ENTER("ha_federated::read_range_next");
2518  MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
2519  retval= rnd_next_int(table->record[0]);
2520  MYSQL_INDEX_READ_ROW_DONE(retval);
2521  DBUG_RETURN(retval);
2522 }
2523 
2524 
2525 /* Used to read forward through the index. */
2527 {
2528  int retval;
2529  DBUG_ENTER("ha_federated::index_next");
2530  MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
2531  ha_statistic_increment(&SSV::ha_read_next_count);
2532  retval= read_next(buf, stored_result);
2533  MYSQL_INDEX_READ_ROW_DONE(retval);
2534  DBUG_RETURN(retval);
2535 }
2536 
2537 
2538 /*
2539  rnd_init() is called when the system wants the storage engine to do a table
2540  scan.
2541 
2542  This is the method that gets data for the SELECT calls.
2543 
2544  See the federated in the introduction at the top of this file to see when
2545  rnd_init() is called.
2546 
2547  Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc,
2548  sql_table.cc, and sql_update.cc.
2549 */
2550 
2552 {
2553  DBUG_ENTER("ha_federated::rnd_init");
2554  /*
2555  The use of the 'scan' flag is incredibly important for this handler
2556  to work properly, especially with updates containing WHERE clauses
2557  using indexed columns.
2558 
2559  When the initial query contains a WHERE clause of the query using an
2560  indexed column, it's index_read_idx that selects the exact record from
2561  the foreign database.
2562 
2563  When there is NO index in the query, either due to not having a WHERE
2564  clause, or the WHERE clause is using columns that are not indexed, a
2565  'full table scan' done by rnd_init, which in this situation simply means
2566  a 'select * from ...' on the foreign table.
2567 
2568  In other words, this 'scan' flag gives us the means to ensure that if
2569  there is an index involved in the query, we want index_read_idx to
2570  retrieve the exact record (scan flag is 0), and do not want rnd_init
2571  to do a 'full table scan' and wipe out that result set.
2572 
2573  Prior to using this flag, the problem was most apparent with updates.
2574 
2575  An initial query like 'UPDATE tablename SET anything = whatever WHERE
2576  indexedcol = someval', index_read_idx would get called, using a query
2577  constructed with a WHERE clause built from the values of index ('indexcol'
2578  in this case, having a value of 'someval'). mysql_store_result would
2579  then get called (this would be the result set we want to use).
2580 
2581  After this rnd_init (from sql_update.cc) would be called, it would then
2582  unecessarily call "select * from table" on the foreign table, then call
2583  mysql_store_result, which would wipe out the correct previous result set
2584  from the previous call of index_read_idx's that had the result set
2585  containing the correct record, hence update the wrong row!
2586 
2587  */
2588 
2589  if (scan)
2590  {
2591  if (real_query(share->select_query, strlen(share->select_query)) ||
2592  !(stored_result= store_result(mysql)))
2593  DBUG_RETURN(stash_remote_error());
2594  }
2595  DBUG_RETURN(0);
2596 }
2597 
2598 
2599 int ha_federated::rnd_end()
2600 {
2601  DBUG_ENTER("ha_federated::rnd_end");
2602  DBUG_RETURN(index_end());
2603 }
2604 
2605 
2606 int ha_federated::index_end(void)
2607 {
2608  DBUG_ENTER("ha_federated::index_end");
2609  free_result();
2610  active_index= MAX_KEY;
2611  DBUG_RETURN(0);
2612 }
2613 
2614 
2615 /*
2616  This is called for each row of the table scan. When you run out of records
2617  you should return HA_ERR_END_OF_FILE. Fill buff up with the row information.
2618  The Field structure for the table is the key to getting data into buf
2619  in a manner that will allow the server to understand it.
2620 
2621  Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc,
2622  sql_table.cc, and sql_update.cc.
2623 */
2624 
2625 int ha_federated::rnd_next(uchar *buf)
2626 {
2627  int rc;
2628  DBUG_ENTER("ha_federated::rnd_next");
2629  MYSQL_READ_ROW_START(table_share->db.str, table_share->table_name.str,
2630  TRUE);
2631  rc= rnd_next_int(buf);
2632  MYSQL_READ_ROW_DONE(rc);
2633  DBUG_RETURN(rc);
2634 }
2635 
2636 int ha_federated::rnd_next_int(uchar *buf)
2637 {
2638  DBUG_ENTER("ha_federated::rnd_next_int");
2639 
2640  if (stored_result == 0)
2641  {
2642  /*
2643  Return value of rnd_init is not always checked (see records.cc),
2644  so we can get here _even_ if there is _no_ pre-fetched result-set!
2645  TODO: fix it. We can delete this in 5.1 when rnd_init() is checked.
2646  */
2647  DBUG_RETURN(1);
2648  }
2649  DBUG_RETURN(read_next(buf, stored_result));
2650 }
2651 
2652 
2653 /*
2654  ha_federated::read_next
2655 
2656  reads from a result set and converts to mysql internal
2657  format
2658 
2659  SYNOPSIS
2660  ha_federated::read_next()
2661  buf byte pointer to record
2662  result mysql result set
2663 
2664  DESCRIPTION
2665  This method is a wrapper method that reads one record from a result
2666  set and converts it to the internal table format
2667 
2668  RETURN VALUE
2669  1 error
2670  0 no error
2671 */
2672 
2673 int ha_federated::read_next(uchar *buf, MYSQL_RES *result)
2674 {
2675  int retval;
2676  MYSQL_ROW row;
2677  DBUG_ENTER("ha_federated::read_next");
2678 
2679  table->status= STATUS_NOT_FOUND; // For easier return
2680 
2681  /* Save current data cursor position. */
2682  current_position= result->data_cursor;
2683 
2684  /* Fetch a row, insert it back in a row format. */
2685  if (!(row= mysql_fetch_row(result)))
2686  DBUG_RETURN(HA_ERR_END_OF_FILE);
2687 
2688  if (!(retval= convert_row_to_internal_format(buf, row, result)))
2689  table->status= 0;
2690 
2691  DBUG_RETURN(retval);
2692 }
2693 
2694 
2715 void ha_federated::position(const uchar *record __attribute__ ((unused)))
2716 {
2717  DBUG_ENTER("ha_federated::position");
2718 
2719  DBUG_ASSERT(stored_result);
2720 
2721  position_called= TRUE;
2722  /* Store result set address. */
2723  memcpy(ref, &stored_result, sizeof(MYSQL_RES *));
2724  /* Store data cursor position. */
2725  memcpy(ref + sizeof(MYSQL_RES *), &current_position,
2726  sizeof(MYSQL_ROW_OFFSET));
2727  DBUG_VOID_RETURN;
2728 }
2729 
2730 
2731 /*
2732  This is like rnd_next, but you are given a position to use to determine the
2733  row. The position will be of the type that you stored in ref.
2734 
2735  This method is required for an ORDER BY
2736 
2737  Called from filesort.cc records.cc sql_insert.cc sql_select.cc sql_update.cc.
2738 */
2739 
2740 int ha_federated::rnd_pos(uchar *buf, uchar *pos)
2741 {
2742  MYSQL_RES *result;
2743  int ret_val;
2744  DBUG_ENTER("ha_federated::rnd_pos");
2745 
2746  MYSQL_READ_ROW_START(table_share->db.str, table_share->table_name.str,
2747  FALSE);
2748  ha_statistic_increment(&SSV::ha_read_rnd_count);
2749 
2750  /* Get stored result set. */
2751  memcpy(&result, pos, sizeof(MYSQL_RES *));
2752  DBUG_ASSERT(result);
2753  /* Set data cursor position. */
2754  memcpy(&result->data_cursor, pos + sizeof(MYSQL_RES *),
2755  sizeof(MYSQL_ROW_OFFSET));
2756  /* Read a row. */
2757  ret_val= read_next(buf, result);
2758  MYSQL_READ_ROW_DONE(ret_val);
2759  DBUG_RETURN(ret_val);
2760 }
2761 
2762 
2763 /*
2764  ::info() is used to return information to the optimizer.
2765  Currently this table handler doesn't implement most of the fields
2766  really needed. SHOW also makes use of this data
2767  Another note, you will probably want to have the following in your
2768  code:
2769  if (records < 2)
2770  records = 2;
2771  The reason is that the server will optimize for cases of only a single
2772  record. If in a table scan you don't know the number of records
2773  it will probably be better to set records to two so you can return
2774  as many records as you need.
2775  Along with records a few more variables you may wish to set are:
2776  records
2777  deleted
2778  data_file_length
2779  index_file_length
2780  delete_length
2781  check_time
2782  Take a look at the public variables in handler.h for more information.
2783 
2784  Called in:
2785  filesort.cc
2786  ha_heap.cc
2787  item_sum.cc
2788  opt_sum.cc
2789  sql_delete.cc
2790  sql_delete.cc
2791  sql_derived.cc
2792  sql_select.cc
2793  sql_select.cc
2794  sql_select.cc
2795  sql_select.cc
2796  sql_select.cc
2797  sql_show.cc
2798  sql_show.cc
2799  sql_show.cc
2800  sql_show.cc
2801  sql_table.cc
2802  sql_union.cc
2803  sql_update.cc
2804 
2805 */
2806 
2807 int ha_federated::info(uint flag)
2808 {
2809  char status_buf[FEDERATED_QUERY_BUFFER_SIZE];
2810  int error;
2811  uint error_code;
2812  MYSQL_RES *result= 0;
2813  MYSQL_ROW row;
2814  String status_query_string(status_buf, sizeof(status_buf), &my_charset_bin);
2815  DBUG_ENTER("ha_federated::info");
2816 
2817  error_code= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
2818  /* we want not to show table status if not needed to do so */
2819  if (flag & (HA_STATUS_VARIABLE | HA_STATUS_CONST))
2820  {
2821  status_query_string.length(0);
2822  status_query_string.append(STRING_WITH_LEN("SHOW TABLE STATUS LIKE "));
2823  append_ident(&status_query_string, share->table_name,
2824  share->table_name_length, value_quote_char);
2825 
2826  if (real_query(status_query_string.ptr(), status_query_string.length()))
2827  goto error;
2828 
2829  status_query_string.length(0);
2830 
2831  result= mysql_store_result(mysql);
2832 
2833  /*
2834  We're going to use fields num. 4, 12 and 13 of the resultset,
2835  so make sure we have these fields.
2836  */
2837  if (!result || (mysql_num_fields(result) < 14))
2838  goto error;
2839 
2840  if (!mysql_num_rows(result))
2841  goto error;
2842 
2843  if (!(row= mysql_fetch_row(result)))
2844  goto error;
2845 
2846  /*
2847  deleted is set in ha_federated::info
2848  */
2849  /*
2850  need to figure out what this means as far as federated is concerned,
2851  since we don't have a "file"
2852 
2853  data_file_length = ?
2854  index_file_length = ?
2855  delete_length = ?
2856  */
2857  if (row[4] != NULL)
2858  stats.records= (ha_rows) my_strtoll10(row[4], (char**) 0,
2859  &error);
2860  if (row[5] != NULL)
2861  stats.mean_rec_length= (ulong) my_strtoll10(row[5], (char**) 0, &error);
2862 
2863  stats.data_file_length= stats.records * stats.mean_rec_length;
2864 
2865  if (row[12] != NULL)
2866  stats.update_time= (ulong) my_strtoll10(row[12], (char**) 0,
2867  &error);
2868  if (row[13] != NULL)
2869  stats.check_time= (ulong) my_strtoll10(row[13], (char**) 0,
2870  &error);
2871 
2872  /*
2873  size of IO operations (This is based on a good guess, no high science
2874  involved)
2875  */
2876  if (flag & HA_STATUS_CONST)
2877  stats.block_size= 4096;
2878 
2879  }
2880 
2881  if (flag & HA_STATUS_AUTO)
2882  stats.auto_increment_value= mysql->insert_id;
2883 
2884  mysql_free_result(result);
2885 
2886  DBUG_RETURN(0);
2887 
2888 error:
2889  mysql_free_result(result);
2890  if (mysql)
2891  {
2892  my_printf_error(error_code, ": %d : %s", MYF(0),
2893  mysql_errno(mysql), mysql_error(mysql));
2894  }
2895  else
2896  if (remote_error_number != -1 /* error already reported */)
2897  {
2898  error_code= remote_error_number;
2899  my_error(error_code, MYF(0), ER(error_code));
2900  }
2901  DBUG_RETURN(error_code);
2902 }
2903 
2904 
2913 int ha_federated::extra(ha_extra_function operation)
2914 {
2915  DBUG_ENTER("ha_federated::extra");
2916  switch (operation) {
2917  case HA_EXTRA_IGNORE_DUP_KEY:
2918  ignore_duplicates= TRUE;
2919  break;
2920  case HA_EXTRA_NO_IGNORE_DUP_KEY:
2921  insert_dup_update= FALSE;
2922  ignore_duplicates= FALSE;
2923  break;
2924  case HA_EXTRA_WRITE_CAN_REPLACE:
2925  replace_duplicates= TRUE;
2926  break;
2927  case HA_EXTRA_WRITE_CANNOT_REPLACE:
2928  /*
2929  We use this flag to ensure that we do not create an "INSERT IGNORE"
2930  statement when inserting new rows into the remote table.
2931  */
2932  replace_duplicates= FALSE;
2933  break;
2934  case HA_EXTRA_INSERT_WITH_UPDATE:
2935  insert_dup_update= TRUE;
2936  break;
2937  default:
2938  /* do nothing */
2939  DBUG_PRINT("info",("unhandled operation: %d", (uint) operation));
2940  }
2941  DBUG_RETURN(0);
2942 }
2943 
2944 
2956 {
2957  insert_dup_update= FALSE;
2958  ignore_duplicates= FALSE;
2959  replace_duplicates= FALSE;
2960 
2961  /* Free stored result sets. */
2962  for (uint i= 0; i < results.elements; i++)
2963  {
2964  MYSQL_RES *result;
2965  get_dynamic(&results, (uchar *) &result, i);
2966  mysql_free_result(result);
2967  }
2968  reset_dynamic(&results);
2969 
2970  return 0;
2971 }
2972 
2973 
2974 /*
2975  Used to delete all rows in a table. Both for cases of truncate and
2976  for cases where the optimizer realizes that all rows will be
2977  removed as a result of a SQL statement.
2978 
2979  Called from item_sum.cc by Item_func_group_concat::clear(),
2980  Item_sum_count_distinct::clear(), and Item_func_group_concat::clear().
2981  Called from sql_delete.cc by mysql_delete().
2982  Called from sql_select.cc by JOIN::reinit().
2983  Called from sql_union.cc by st_select_lex_unit::exec().
2984 */
2985 
2987 {
2988  char query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2989  String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
2990  DBUG_ENTER("ha_federated::delete_all_rows");
2991 
2992  query.length(0);
2993 
2994  query.set_charset(system_charset_info);
2995  query.append(STRING_WITH_LEN("TRUNCATE "));
2996  append_ident(&query, share->table_name, share->table_name_length,
2997  ident_quote_char);
2998 
2999  /*
3000  TRUNCATE won't return anything in mysql_affected_rows
3001  */
3002  if (real_query(query.ptr(), query.length()))
3003  {
3004  DBUG_RETURN(stash_remote_error());
3005  }
3006  stats.deleted+= stats.records;
3007  stats.records= 0;
3008  DBUG_RETURN(0);
3009 }
3010 
3011 
3012 /*
3013  Used to manually truncate the table via a delete of all rows in a table.
3014 */
3015 
3017 {
3018  return delete_all_rows();
3019 }
3020 
3021 
3022 /*
3023  The idea with handler::store_lock() is the following:
3024 
3025  The statement decided which locks we should need for the table
3026  for updates/deletes/inserts we get WRITE locks, for SELECT... we get
3027  read locks.
3028 
3029  Before adding the lock into the table lock handler (see thr_lock.c)
3030  mysqld calls store lock with the requested locks. Store lock can now
3031  modify a write lock to a read lock (or some other lock), ignore the
3032  lock (if we don't want to use MySQL table locks at all) or add locks
3033  for many tables (like we do when we are using a MERGE handler).
3034 
3035  Berkeley DB for federated changes all WRITE locks to TL_WRITE_ALLOW_WRITE
3036  (which signals that we are doing WRITES, but we are still allowing other
3037  reader's and writer's.
3038 
3039  When releasing locks, store_lock() are also called. In this case one
3040  usually doesn't have to do anything.
3041 
3042  In some exceptional cases MySQL may send a request for a TL_IGNORE;
3043  This means that we are requesting the same lock as last time and this
3044  should also be ignored. (This may happen when someone does a flush
3045  table when we have opened a part of the tables, in which case mysqld
3046  closes and reopens the tables and tries to get the same locks at last
3047  time). In the future we will probably try to remove this.
3048 
3049  Called from lock.cc by get_lock_data().
3050 */
3051 
3053  THR_LOCK_DATA **to,
3054  enum thr_lock_type lock_type)
3055 {
3056  DBUG_ENTER("ha_federated::store_lock");
3057  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
3058  {
3059  /*
3060  Here is where we get into the guts of a row level lock.
3061  If TL_UNLOCK is set
3062  If we are not doing a LOCK TABLE or DISCARD/IMPORT
3063  TABLESPACE, then allow multiple writers
3064  */
3065 
3066  if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
3067  lock_type <= TL_WRITE) && !thd->in_lock_tables)
3068  lock_type= TL_WRITE_ALLOW_WRITE;
3069 
3070  /*
3071  In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
3072  MySQL would use the lock TL_READ_NO_INSERT on t2, and that
3073  would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
3074  to t2. Convert the lock to a normal read lock to allow
3075  concurrent inserts to t2.
3076  */
3077 
3078  if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables)
3079  lock_type= TL_READ;
3080 
3081  lock.type= lock_type;
3082  }
3083 
3084  *to++= &lock;
3085 
3086  DBUG_RETURN(to);
3087 }
3088 
3089 /*
3090  create() does nothing, since we have no local setup of our own.
3091  FUTURE: We should potentially connect to the foreign database and
3092 */
3093 
3094 int ha_federated::create(const char *name, TABLE *table_arg,
3095  HA_CREATE_INFO *create_info)
3096 {
3097  int retval;
3098  THD *thd= current_thd;
3099  FEDERATED_SHARE tmp_share; // Only a temporary share, to test the url
3100  DBUG_ENTER("ha_federated::create");
3101 
3102  retval= parse_url(thd->mem_root, &tmp_share, table_arg, 1);
3103 
3104  DBUG_RETURN(retval);
3105 
3106 }
3107 
3108 
3109 int ha_federated::real_connect()
3110 {
3111  char buffer[FEDERATED_QUERY_BUFFER_SIZE];
3112  String sql_query(buffer, sizeof(buffer), &my_charset_bin);
3113  DBUG_ENTER("ha_federated::real_connect");
3114 
3115  /*
3116  Bug#25679
3117  Ensure that we do not hold the LOCK_open mutex while attempting
3118  to establish Federated connection to guard against a trivial
3119  Denial of Service scenerio.
3120  */
3122 
3123  DBUG_ASSERT(mysql == NULL);
3124 
3125  if (!(mysql= mysql_init(NULL)))
3126  {
3127  remote_error_number= HA_ERR_OUT_OF_MEM;
3128  DBUG_RETURN(-1);
3129  }
3130 
3131  /*
3132  BUG# 17044 Federated Storage Engine is not UTF8 clean
3133  Add set names to whatever charset the table is at open
3134  of table
3135  */
3136  /* this sets the csname like 'set names utf8' */
3137  mysql_options(mysql,MYSQL_SET_CHARSET_NAME,
3138  this->table->s->table_charset->csname);
3139 
3140  sql_query.length(0);
3141 
3142  if (!mysql_real_connect(mysql,
3143  share->hostname,
3144  share->username,
3145  share->password,
3146  share->database,
3147  share->port,
3148  share->socket, 0))
3149  {
3150  stash_remote_error();
3151  mysql_close(mysql);
3152  mysql= NULL;
3153  my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), remote_error_buf);
3154  remote_error_number= -1;
3155  DBUG_RETURN(-1);
3156  }
3157 
3158  /*
3159  We have established a connection, lets try a simple dummy query just
3160  to check that the table and expected columns are present.
3161  */
3162  sql_query.append(share->select_query);
3163  sql_query.append(STRING_WITH_LEN(" WHERE 1=0"));
3164  if (mysql_real_query(mysql, sql_query.ptr(), sql_query.length()))
3165  {
3166  sql_query.length(0);
3167  sql_query.append("error: ");
3168  sql_query.qs_append(mysql_errno(mysql));
3169  sql_query.append(" '");
3170  sql_query.append(mysql_error(mysql));
3171  sql_query.append("'");
3172  mysql_close(mysql);
3173  mysql= NULL;
3174  my_error(ER_FOREIGN_DATA_SOURCE_DOESNT_EXIST, MYF(0), sql_query.ptr());
3175  remote_error_number= -1;
3176  DBUG_RETURN(-1);
3177  }
3178 
3179  /* Just throw away the result, no rows anyways but need to keep in sync */
3180  mysql_free_result(mysql_store_result(mysql));
3181 
3182  /*
3183  Since we do not support transactions at this version, we can let the client
3184  API silently reconnect. For future versions, we will need more logic to
3185  deal with transactions
3186  */
3187 
3188  mysql->reconnect= 1;
3189  DBUG_RETURN(0);
3190 }
3191 
3192 
3193 int ha_federated::real_query(const char *query, size_t length)
3194 {
3195  int rc= 0;
3196  DBUG_ENTER("ha_federated::real_query");
3197 
3198  if (!mysql && (rc= real_connect()))
3199  goto end;
3200 
3201  if (!query || !length)
3202  goto end;
3203 
3204  rc= mysql_real_query(mysql, query, (uint) length);
3205 
3206 end:
3207  DBUG_RETURN(rc);
3208 }
3209 
3210 
3211 int ha_federated::stash_remote_error()
3212 {
3213  DBUG_ENTER("ha_federated::stash_remote_error()");
3214  if (!mysql)
3215  DBUG_RETURN(remote_error_number);
3216  remote_error_number= mysql_errno(mysql);
3217  strmake(remote_error_buf, mysql_error(mysql), sizeof(remote_error_buf)-1);
3218  if (remote_error_number == ER_DUP_ENTRY ||
3219  remote_error_number == ER_DUP_KEY)
3220  DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
3221  DBUG_RETURN(HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM);
3222 }
3223 
3224 
3226 {
3227  DBUG_ENTER("ha_federated::get_error_message");
3228  DBUG_PRINT("enter", ("error: %d", error));
3229  if (error == HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM)
3230  {
3231  buf->append(STRING_WITH_LEN("Error on remote system: "));
3232  buf->qs_append(remote_error_number);
3233  buf->append(STRING_WITH_LEN(": "));
3234  buf->append(remote_error_buf);
3235 
3236  remote_error_number= 0;
3237  remote_error_buf[0]= '\0';
3238  }
3239  DBUG_PRINT("exit", ("message: %s", buf->ptr()));
3240  DBUG_RETURN(FALSE);
3241 }
3242 
3243 
3256 {
3257  MYSQL_RES *result= mysql_store_result(mysql_arg);
3258  DBUG_ENTER("ha_federated::store_result");
3259  if (result)
3260  {
3261  (void) insert_dynamic(&results, &result);
3262  }
3263  position_called= FALSE;
3264  DBUG_RETURN(result);
3265 }
3266 
3267 
3268 void ha_federated::free_result()
3269 {
3270  DBUG_ENTER("ha_federated::free_result");
3271  if (stored_result && !position_called)
3272  {
3273  mysql_free_result(stored_result);
3274  stored_result= 0;
3275  if (results.elements > 0)
3276  results.elements--;
3277  }
3278  DBUG_VOID_RETURN;
3279 }
3280 
3281 
3282 int ha_federated::external_lock(THD *thd, int lock_type)
3283 {
3284  int error= 0;
3285  DBUG_ENTER("ha_federated::external_lock");
3286 
3287  /*
3288  Support for transactions disabled until WL#2952 fixes it.
3289  */
3290 #ifdef XXX_SUPERCEDED_BY_WL2952
3291  if (lock_type != F_UNLCK)
3292  {
3293  ha_federated *trx= (ha_federated *)thd_get_ha_data(thd, ht);
3294 
3295  DBUG_PRINT("info",("federated not lock F_UNLCK"));
3296  if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
3297  {
3298  DBUG_PRINT("info",("federated autocommit"));
3299  /*
3300  This means we are doing an autocommit
3301  */
3302  error= connection_autocommit(TRUE);
3303  if (error)
3304  {
3305  DBUG_PRINT("info", ("error setting autocommit TRUE: %d", error));
3306  DBUG_RETURN(error);
3307  }
3308  trans_register_ha(thd, FALSE, ht);
3309  }
3310  else
3311  {
3312  DBUG_PRINT("info",("not autocommit"));
3313  if (!trx)
3314  {
3315  /*
3316  This is where a transaction gets its start
3317  */
3318  error= connection_autocommit(FALSE);
3319  if (error)
3320  {
3321  DBUG_PRINT("info", ("error setting autocommit FALSE: %d", error));
3322  DBUG_RETURN(error);
3323  }
3324  thd_set_ha_data(thd, ht, this);
3325  trans_register_ha(thd, TRUE, ht);
3326  /*
3327  Send a lock table to the remote end.
3328  We do not support this at the moment
3329  */
3330  if (thd->options & (OPTION_TABLE_LOCK))
3331  {
3332  DBUG_PRINT("info", ("We do not support lock table yet"));
3333  }
3334  }
3335  else
3336  {
3337  ha_federated *ptr;
3338  for (ptr= trx; ptr; ptr= ptr->trx_next)
3339  if (ptr == this)
3340  break;
3341  else if (!ptr->trx_next)
3342  ptr->trx_next= this;
3343  }
3344  }
3345  }
3346 #endif /* XXX_SUPERCEDED_BY_WL2952 */
3347  DBUG_RETURN(error);
3348 }
3349 
3350 
3351 static int federated_commit(handlerton *hton, THD *thd, bool all)
3352 {
3353  int return_val= 0;
3354  ha_federated *trx= (ha_federated *) thd_get_ha_data(thd, hton);
3355  DBUG_ENTER("federated_commit");
3356 
3357  if (all)
3358  {
3359  int error= 0;
3360  ha_federated *ptr, *old= NULL;
3361  for (ptr= trx; ptr; old= ptr, ptr= ptr->trx_next)
3362  {
3363  if (old)
3364  old->trx_next= NULL;
3365  error= ptr->connection_commit();
3366  if (error && !return_val)
3367  return_val= error;
3368  }
3369  thd_set_ha_data(thd, hton, NULL);
3370  }
3371 
3372  DBUG_PRINT("info", ("error val: %d", return_val));
3373  DBUG_RETURN(return_val);
3374 }
3375 
3376 
3377 static int federated_rollback(handlerton *hton, THD *thd, bool all)
3378 {
3379  int return_val= 0;
3380  ha_federated *trx= (ha_federated *)thd_get_ha_data(thd, hton);
3381  DBUG_ENTER("federated_rollback");
3382 
3383  if (all)
3384  {
3385  int error= 0;
3386  ha_federated *ptr, *old= NULL;
3387  for (ptr= trx; ptr; old= ptr, ptr= ptr->trx_next)
3388  {
3389  if (old)
3390  old->trx_next= NULL;
3391  error= ptr->connection_rollback();
3392  if (error && !return_val)
3393  return_val= error;
3394  }
3395  thd_set_ha_data(thd, hton, NULL);
3396  }
3397 
3398  DBUG_PRINT("info", ("error val: %d", return_val));
3399  DBUG_RETURN(return_val);
3400 }
3401 
3402 int ha_federated::connection_commit()
3403 {
3404  DBUG_ENTER("ha_federated::connection_commit");
3405  DBUG_RETURN(execute_simple_query("COMMIT", 6));
3406 }
3407 
3408 
3409 int ha_federated::connection_rollback()
3410 {
3411  DBUG_ENTER("ha_federated::connection_rollback");
3412  DBUG_RETURN(execute_simple_query("ROLLBACK", 8));
3413 }
3414 
3415 
3416 int ha_federated::connection_autocommit(bool state)
3417 {
3418  const char *text;
3419  DBUG_ENTER("ha_federated::connection_autocommit");
3420  text= (state == TRUE) ? "SET AUTOCOMMIT=1" : "SET AUTOCOMMIT=0";
3421  DBUG_RETURN(execute_simple_query(text, 16));
3422 }
3423 
3424 
3425 int ha_federated::execute_simple_query(const char *query, int len)
3426 {
3427  DBUG_ENTER("ha_federated::execute_simple_query");
3428 
3429  if (mysql_real_query(mysql, query, len))
3430  {
3431  DBUG_RETURN(stash_remote_error());
3432  }
3433  DBUG_RETURN(0);
3434 }
3435 
3436 struct st_mysql_storage_engine federated_storage_engine=
3437 { MYSQL_HANDLERTON_INTERFACE_VERSION };
3438 
3439 mysql_declare_plugin(federated)
3440 {
3441  MYSQL_STORAGE_ENGINE_PLUGIN,
3442  &federated_storage_engine,
3443  "FEDERATED",
3444  "Patrick Galbraith and Brian Aker, MySQL AB",
3445  "Federated MySQL storage engine",
3446  PLUGIN_LICENSE_GPL,
3447  federated_db_init, /* Plugin Init */
3448  federated_done, /* Plugin Deinit */
3449  0x0100 /* 1.0 */,
3450  NULL, /* status variables */
3451  NULL, /* system variables */
3452  NULL, /* config options */
3453  0, /* flags */
3454 }
3455 mysql_declare_plugin_end;