MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
NdbPoolImpl.cpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include "NdbPoolImpl.hpp"
19 
20 NdbMutex *NdbPool::pool_mutex = NULL;
21 NdbPool *the_pool = NULL;
22 
23 NdbPool*
24 NdbPool::create_instance(Ndb_cluster_connection* cc,
25  Uint32 max_ndb_obj,
26  Uint32 no_conn_obj,
27  Uint32 init_no_ndb_objects)
28 {
29  if (!initPoolMutex()) {
30  return NULL;
31  }
32  NdbMutex_Lock(pool_mutex);
33  NdbPool* a_pool;
34  if (the_pool != NULL) {
35  a_pool = NULL;
36  } else {
37  the_pool = new NdbPool(cc, max_ndb_obj, no_conn_obj);
38  if (!the_pool->init(init_no_ndb_objects)) {
39  delete the_pool;
40  the_pool = NULL;
41  }
42  a_pool = the_pool;
43  }
44  NdbMutex* temp = pool_mutex;
45  if (a_pool == NULL) {
46  pool_mutex = NULL;
47  }
48  NdbMutex_Unlock(pool_mutex);
49  if (a_pool == NULL) {
50  NdbMutex_Destroy(temp);
51  }
52  return a_pool;
53 }
54 
55 void
56 NdbPool::drop_instance()
57 {
58  if (pool_mutex == NULL) {
59  return;
60  }
61  NdbMutex_Lock(pool_mutex);
62  the_pool->release_all();
63  delete the_pool;
64  the_pool = NULL;
65  NdbMutex* temp = pool_mutex;
66  NdbMutex_Unlock(temp);
67  NdbMutex_Destroy(temp);
68 }
69 
70 bool
71 NdbPool::initPoolMutex()
72 {
73  bool ret_result = false;
74  if (pool_mutex == NULL) {
75  pool_mutex = NdbMutex_Create();
76  ret_result = ((pool_mutex == NULL) ? false : true);
77  }
78  return ret_result;
79 }
80 
81 NdbPool::NdbPool(Ndb_cluster_connection* cc,
82  Uint32 max_no_objects,
83  Uint32 no_conn_objects)
84 {
85  if (no_conn_objects > 1024) {
86  no_conn_objects = 1024;
87  }
88  if (max_no_objects > MAX_NDB_OBJECTS) {
89  max_no_objects = MAX_NDB_OBJECTS;
90  } else if (max_no_objects == 0) {
91  max_no_objects = 1;
92  }
93  m_max_ndb_objects = max_no_objects;
94  m_no_of_conn_objects = no_conn_objects;
95  m_no_of_objects = 0;
96  m_waiting = 0;
97  m_pool_reference = NULL;
98  m_hash_entry = NULL;
99  m_first_free = NULL_POOL;
100  m_first_not_in_use = NULL_POOL;
101  m_last_free = NULL_POOL;
102  input_pool_cond = NULL;
103  output_pool_cond = NULL;
104  m_output_queue = 0;
105  m_input_queue = 0;
106  m_signal_count = 0;
107  m_cluster_connection = cc;
108 }
109 
110 NdbPool::~NdbPool()
111 {
112  NdbCondition_Destroy(input_pool_cond);
113  NdbCondition_Destroy(output_pool_cond);
114 }
115 
116 void
117 NdbPool::release_all()
118 {
119  int i;
120  for (i = 0; i < m_max_ndb_objects + 1; i++) {
121  if (m_pool_reference[i].ndb_reference != NULL) {
122  assert(m_pool_reference[i].in_use);
123  assert(m_pool_reference[i].free_entry);
124  delete m_pool_reference[i].ndb_reference;
125  }
126  }
127  delete [] m_pool_reference;
128  delete [] m_hash_entry;
129  m_pool_reference = NULL;
130  m_hash_entry = NULL;
131 }
132 
133 bool
134 NdbPool::init(Uint32 init_no_objects)
135 {
136  bool ret_result = false;
137  int i;
138  do {
139  input_pool_cond = NdbCondition_Create();
140  output_pool_cond = NdbCondition_Create();
141  if (input_pool_cond == NULL || output_pool_cond == NULL) {
142  break;
143  }
144  if (init_no_objects > m_max_ndb_objects) {
145  init_no_objects = m_max_ndb_objects;
146  }
147  if (init_no_objects == 0) {
148  init_no_objects = 1;
149  }
150  m_pool_reference = new NdbPool::POOL_STRUCT[m_max_ndb_objects + 1];
151  m_hash_entry = new Uint8[POOL_HASH_TABLE_SIZE];
152 
153  if ((m_pool_reference == NULL) || (m_hash_entry == NULL)) {
154  delete [] m_pool_reference;
155  delete [] m_hash_entry;
156  break;
157  }
158  for (i = 0; i < m_max_ndb_objects + 1; i++) {
159  m_pool_reference[i].ndb_reference = NULL;
160  m_pool_reference[i].in_use = false;
161  m_pool_reference[i].next_free_object = i+1;
162  m_pool_reference[i].prev_free_object = i-1;
163  m_pool_reference[i].next_db_object = NULL_POOL;
164  m_pool_reference[i].prev_db_object = NULL_POOL;
165  }
166  for (i = 0; i < POOL_HASH_TABLE_SIZE; i++) {
167  m_hash_entry[i] = NULL_HASH;
168  }
169  m_pool_reference[m_max_ndb_objects].next_free_object = NULL_POOL;
170  m_pool_reference[1].prev_free_object = NULL_POOL;
171  m_first_not_in_use = 1;
172  m_no_of_objects = init_no_objects;
173  for (i = init_no_objects; i > 0 ; i--) {
174  Uint32 fake_id;
175  if (!allocate_ndb(fake_id, (const char*)NULL, (const char*)NULL)) {
176  release_all();
177  break;
178  }
179  }
180  ret_result = true;
181  break;
182  } while (1);
183  return ret_result;
184 }
185 
186 /*
187 Get an Ndb object.
188 Input:
189 hint_id: 0 = no hint, otherwise a hint of which Ndb object the thread
190  used the last time.
191 a_db_name: NULL = don't check for database specific Ndb object, otherwise
192  a hint of which database is preferred.
193 Output:
194 hint_id: Returns id of Ndb object returned
195 Return value: Ndb object pointer
196 */
197 Ndb*
198 NdbPool::get_ndb_object(Uint32 &hint_id,
199  const char* a_catalog_name,
200  const char* a_schema_name)
201 {
202  Ndb* ret_ndb = NULL;
203  Uint32 hash_entry = compute_hash(a_schema_name);
204  NdbMutex_Lock(pool_mutex);
205  while (1) {
206  /*
207  We start by checking if we can use the hinted Ndb object.
208  */
209  if ((ret_ndb = get_hint_ndb(hint_id, hash_entry)) != NULL) {
210  break;
211  }
212  /*
213  The hinted Ndb object was not free. We need to allocate another object.
214  We start by checking for a free Ndb object connected to the same database.
215  */
216  if (a_schema_name && (ret_ndb = get_db_hash(hint_id,
217  hash_entry,
218  a_catalog_name,
219  a_schema_name))) {
220  break;
221  }
222  /*
223  No Ndb object connected to the preferred database was found.
224  We look for a free Ndb object in general.
225  */
226  if ((ret_ndb = get_free_list(hint_id, hash_entry)) != NULL) {
227  break;
228  }
229  /*
230  No free Ndb object was found. If we haven't allocated objects up until the
231  maximum number yet then we can allocate a new Ndb object here.
232  */
233  if (m_no_of_objects < m_max_ndb_objects) {
234  if (allocate_ndb(hint_id, a_catalog_name, a_schema_name)) {
235  assert((ret_ndb = get_hint_ndb(hint_id, hash_entry)) != NULL);
236  break;
237  }
238  }
239  /*
240  We need to wait until an Ndb object becomes
241  available.
242  */
243  if ((ret_ndb = wait_free_ndb(hint_id)) != NULL) {
244  break;
245  }
246  /*
247  Not even after waiting were we able to get hold of an Ndb object. We
248  return NULL to indicate this problem.
249  */
250  ret_ndb = NULL;
251  break;
252  }
253  NdbMutex_Unlock(pool_mutex);
254  if (ret_ndb != NULL) {
255  /*
256  We need to set the catalog and schema name of the Ndb object before
257  returning it to the caller.
258  */
259  ret_ndb->setCatalogName(a_catalog_name);
260  ret_ndb->setSchemaName(a_schema_name);
261  }
262  return ret_ndb;
263 }
264 
265 void
266 NdbPool::return_ndb_object(Ndb* returned_ndb, Uint32 id)
267 {
268  NdbMutex_Lock(pool_mutex);
269  assert(id <= m_max_ndb_objects);
270  assert(id != 0);
271  assert(returned_ndb == m_pool_reference[id].ndb_reference);
272  bool wait_cond = m_waiting;
273  if (wait_cond) {
274  NdbCondition* pool_cond;
275  if (m_signal_count > 0) {
276  pool_cond = output_pool_cond;
277  m_signal_count--;
278  } else {
279  pool_cond = input_pool_cond;
280  }
281  add_wait_list(id);
282  NdbMutex_Unlock(pool_mutex);
283  NdbCondition_Signal(pool_cond);
284  } else {
285  add_free_list(id);
286  add_db_hash(id);
287  NdbMutex_Unlock(pool_mutex);
288  }
289 }
290 
291 bool
292 NdbPool::allocate_ndb(Uint32 &id,
293  const char* a_catalog_name,
294  const char* a_schema_name)
295 {
296  Ndb* a_ndb;
297  if (m_first_not_in_use == NULL_POOL) {
298  return false;
299  }
300  if (a_schema_name) {
301  a_ndb = new Ndb(m_cluster_connection, a_schema_name, a_catalog_name);
302  } else {
303  a_ndb = new Ndb(m_cluster_connection, "");
304  }
305  if (a_ndb == NULL) {
306  return false;
307  }
308  a_ndb->init(m_no_of_conn_objects);
309  m_no_of_objects++;
310 
311  id = m_first_not_in_use;
312  Uint32 allocated_id = m_first_not_in_use;
313  m_first_not_in_use = m_pool_reference[allocated_id].next_free_object;
314 
315  m_pool_reference[allocated_id].ndb_reference = a_ndb;
316  m_pool_reference[allocated_id].in_use = true;
317  m_pool_reference[allocated_id].free_entry = false;
318 
319  add_free_list(allocated_id);
320  add_db_hash(allocated_id);
321  return true;
322 }
323 
324 void
325 NdbPool::add_free_list(Uint32 id)
326 {
327  assert(!m_pool_reference[id].free_entry);
328  assert(m_pool_reference[id].in_use);
329  m_pool_reference[id].free_entry = true;
330  m_pool_reference[id].next_free_object = m_first_free;
331  m_pool_reference[id].prev_free_object = (Uint8)NULL_POOL;
332  m_first_free = (Uint8)id;
333  if (m_last_free == (Uint8)NULL_POOL) {
334  m_last_free = (Uint8)id;
335  }
336 }
337 
338 void
339 NdbPool::add_db_hash(Uint32 id)
340 {
341  Ndb* t_ndb = m_pool_reference[id].ndb_reference;
342  const char* schema_name = t_ndb->getSchemaName();
343  Uint32 hash_entry = compute_hash(schema_name);
344  Uint8 next_db_entry = m_hash_entry[hash_entry];
345  m_pool_reference[id].next_db_object = next_db_entry;
346  m_pool_reference[id].prev_db_object = (Uint8)NULL_HASH;
347  m_hash_entry[hash_entry] = (Uint8)id;
348 }
349 
350 Ndb*
351 NdbPool::get_free_list(Uint32 &id, Uint32 hash_entry)
352 {
353  if (m_first_free == NULL_POOL) {
354  return NULL;
355  }
356  id = m_first_free;
357  Ndb* ret_ndb = get_hint_ndb(m_first_free, hash_entry);
358  assert(ret_ndb != NULL);
359  return ret_ndb;
360 }
361 
362 Ndb*
363 NdbPool::get_db_hash(Uint32 &id,
364  Uint32 hash_entry,
365  const char *a_catalog_name,
366  const char *a_schema_name)
367 {
368  Uint32 entry_id = m_hash_entry[hash_entry];
369  bool found = false;
370  while (entry_id != NULL_HASH) {
371  Ndb* t_ndb = m_pool_reference[entry_id].ndb_reference;
372  const char *a_ndb_catalog_name = t_ndb->getCatalogName();
373  if (strcmp(a_catalog_name, a_ndb_catalog_name) == 0) {
374  const char *a_ndb_schema_name = t_ndb->getSchemaName();
375  if (strcmp(a_schema_name, a_ndb_schema_name) == 0) {
376  found = true;
377  break;
378  }
379  }
380  entry_id = m_pool_reference[entry_id].next_db_object;
381  }
382  if (found) {
383  id = entry_id;
384  Ndb* ret_ndb = get_hint_ndb(entry_id, hash_entry);
385  assert(ret_ndb != NULL);
386  return ret_ndb;
387  }
388  return NULL;
389 }
390 
391 Ndb*
392 NdbPool::get_hint_ndb(Uint32 hint_id, Uint32 hash_entry)
393 {
394  Ndb* ret_ndb = NULL;
395  do {
396  if ((hint_id != 0) &&
397  (hint_id <= m_max_ndb_objects) &&
398  (m_pool_reference[hint_id].in_use) &&
399  (m_pool_reference[hint_id].free_entry)) {
400  ret_ndb = m_pool_reference[hint_id].ndb_reference;
401  if (ret_ndb != NULL) {
402  break;
403  } else {
404  assert(false);
405  }
406  }
407  return NULL;
408  } while (1);
409  /*
410  This is where we remove the entry from the free list and from the db hash
411  table.
412  */
413  remove_free_list(hint_id);
414  remove_db_hash(hint_id, hash_entry);
415  return ret_ndb;
416 }
417 
418 void
419 NdbPool::remove_free_list(Uint32 id)
420 {
421  Uint16 next_free_entry = m_pool_reference[id].next_free_object;
422  Uint16 prev_free_entry = m_pool_reference[id].prev_free_object;
423  if (prev_free_entry == (Uint8)NULL_POOL) {
424  m_first_free = next_free_entry;
425  } else {
426  m_pool_reference[prev_free_entry].next_free_object = next_free_entry;
427  }
428  if (next_free_entry == (Uint8)NULL_POOL) {
429  m_last_free = prev_free_entry;
430  } else {
431  m_pool_reference[next_free_entry].prev_free_object = prev_free_entry;
432  }
433  m_pool_reference[id].next_free_object = NULL_POOL;
434  m_pool_reference[id].prev_free_object = NULL_POOL;
435  m_pool_reference[id].free_entry = false;
436 }
437 
438 void
439 NdbPool::remove_db_hash(Uint32 id, Uint32 hash_entry)
440 {
441  Uint16 next_free_entry = m_pool_reference[id].next_db_object;
442  Uint16 prev_free_entry = m_pool_reference[id].prev_db_object;
443  if (prev_free_entry == (Uint8)NULL_HASH) {
444  m_hash_entry[hash_entry] = (Uint8)next_free_entry;
445  } else {
446  m_pool_reference[prev_free_entry].next_db_object = next_free_entry;
447  }
448  if (next_free_entry == (Uint8)NULL_HASH) {
449  ;
450  } else {
451  m_pool_reference[next_free_entry].prev_db_object = prev_free_entry;
452  }
453  m_pool_reference[id].next_db_object = NULL_HASH;
454  m_pool_reference[id].prev_db_object = NULL_HASH;
455 }
456 
457 Uint32
458 NdbPool::compute_hash(const char *a_schema_name)
459 {
460  Uint32 len = Uint32(strlen(a_schema_name));
461  Uint32 h = 147;
462  for (Uint32 i = 0; i < len; i++) {
463  Uint32 c = a_schema_name[i];
464  h = (h << 5) + h + c;
465  }
466  h &= (POOL_HASH_TABLE_SIZE - 1);
467  return h;
468 }
469 
470 Ndb*
471 NdbPool::wait_free_ndb(Uint32 &id)
472 {
473  int res;
474  int time_out = 3500;
475  do {
476  NdbCondition* tmp = input_pool_cond;
477  m_waiting++;
478  m_input_queue++;
479  time_out -= 500;
480  res = NdbCondition_WaitTimeout(input_pool_cond, pool_mutex, time_out);
481  if (tmp == input_pool_cond) {
482  m_input_queue--;
483  } else {
484  m_output_queue--;
485  if (m_output_queue == 0) {
486  switch_condition_queue();
487  }
488  }
489  m_waiting--;
490  } while (res == 0 && m_first_wait == NULL_POOL);
491  if (res != 0 && m_first_wait == NULL_POOL) {
492  return NULL;
493  }
494  id = m_first_wait;
495  remove_wait_list();
496  assert(m_waiting != 0 || m_first_wait == NULL_POOL);
497  return m_pool_reference[id].ndb_reference;
498 }
499 
500 void
501 NdbPool::remove_wait_list()
502 {
503  Uint32 id = m_first_wait;
504  m_first_wait = m_pool_reference[id].next_free_object;
505  m_pool_reference[id].next_free_object = NULL_POOL;
506  m_pool_reference[id].prev_free_object = NULL_POOL;
507  m_pool_reference[id].free_entry = false;
508 }
509 
510 void
511 NdbPool::add_wait_list(Uint32 id)
512 {
513  m_pool_reference[id].next_free_object = m_first_wait;
514  m_first_wait = id;
515 }
516 
517 void
518 NdbPool::switch_condition_queue()
519 {
520  m_signal_count = m_input_queue;
521  Uint16 move_queue = m_input_queue;
522  m_input_queue = m_output_queue;
523  m_output_queue = move_queue;
524 
525  NdbCondition* move_cond = input_pool_cond;
526  input_pool_cond = output_pool_cond;
527  output_pool_cond = move_cond;
528 }
529