MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
NdbInfoScanOperation.cpp
1 /*
2  Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include "NdbInfo.hpp"
19 #include "SignalSender.hpp"
20 #include <kernel/GlobalSignalNumbers.h>
21 #include <AttributeHeader.hpp>
22 #include <signaldata/DbinfoScan.hpp>
23 #include <signaldata/TransIdAI.hpp>
24 #include <signaldata/NodeFailRep.hpp>
25 
27 {
28  NodeBitmask m_nodes_to_scan;
29 };
30 
31 NdbInfoScanOperation::NdbInfoScanOperation(const NdbInfo& info,
32  Ndb_cluster_connection* connection,
33  const NdbInfo::Table* table,
34  Uint32 max_rows, Uint32 max_bytes) :
35  m_impl(*new NdbInfoScanOperationImpl),
36  m_info(info),
37  m_state(Undefined),
38  m_connection(connection),
39  m_signal_sender(NULL),
40  m_table(table),
41  m_node_id(0),
42  m_max_rows(max_rows),
43  m_max_bytes(max_bytes),
44  m_result_data(0x37),
45  m_rows_received(0),
46  m_rows_confirmed(0),
47  m_nodes(0),
48  m_max_nodes(0)
49 {
50 }
51 
52 bool
53 NdbInfoScanOperation::init(Uint32 id)
54 {
55  DBUG_ENTER("NdbInfoScanoperation::init");
56  if (m_state != Undefined)
57  DBUG_RETURN(false);
58 
59  m_signal_sender = new SignalSender(m_connection);
60  if (!m_signal_sender)
61  DBUG_RETURN(false);
62 
63  m_transid0 = id;
64  m_transid1 = m_table->getTableId();
65  m_result_ref = m_signal_sender->getOwnRef();
66 
67  for (unsigned i = 0; i < m_table->columns(); i++)
68  m_recAttrs.push_back(NULL);
69 
70  /*
71  Build a bitmask of nodes that will be scanned if
72  connected and have been API_REGCONFed. Don't include
73  own node since it will always be "connected"
74  */
75  for (Uint32 i = 1; i < MAX_NDB_NODES; i++)
76  m_impl.m_nodes_to_scan.set(i);
77  m_impl.m_nodes_to_scan.clear(refToNode(m_result_ref));
78 
79  m_state = Initial;
80  DBUG_RETURN(true);
81 
82 }
83 
84 NdbInfoScanOperation::~NdbInfoScanOperation()
85 {
86  close();
87  delete m_signal_sender;
88  delete &m_impl;
89 }
90 
91 int
92 NdbInfoScanOperation::readTuples()
93 {
94  if (m_state != Initial)
95  return NdbInfo::ERR_WrongState;
96 
97  m_state = Prepared;
98  return 0;
99 }
100 
101 const NdbInfoRecAttr *
102 NdbInfoScanOperation::getValue(const char * anAttrName)
103 {
104  if (m_state != Prepared)
105  return NULL;
106 
107  const NdbInfo::Column* column = m_table->getColumn(anAttrName);
108  if (!column)
109  return NULL;
110  return getValue(column->m_column_id);
111 }
112 
113 const NdbInfoRecAttr *
114 NdbInfoScanOperation::getValue(Uint32 anAttrId)
115 {
116  if (m_state != Prepared)
117  return NULL;
118 
119  if (anAttrId >= m_recAttrs.size())
120  return NULL;
121 
122  NdbInfoRecAttr *recAttr = new NdbInfoRecAttr;
123  m_recAttrs[anAttrId] = recAttr;
124  return recAttr;
125 }
126 
127 
128 bool
129 NdbInfoScanOperation::find_next_node()
130 {
131  DBUG_ENTER("NdbInfoScanOperation::find_next_node");
132 
133  const NodeId next =
134  m_signal_sender->find_confirmed_node(m_impl.m_nodes_to_scan);
135  if (next == 0)
136  {
137  DBUG_PRINT("info", ("no more alive nodes"));
138  DBUG_RETURN(false);
139  }
140  assert(m_node_id != next);
141  m_impl.m_nodes_to_scan.clear(next);
142  m_node_id = next;
143  m_nodes++;
144 
145  // Check if number of nodes to scan is limited
146  DBUG_PRINT("info", ("nodes: %d, max_nodes: %d", m_nodes, m_max_nodes));
147  if (m_max_nodes && m_nodes > m_max_nodes)
148  {
149  DBUG_PRINT("info", ("Reached max nodes to scan"));
150  DBUG_RETURN(false);
151  }
152 
153  DBUG_PRINT("info", ("switched to node %d", m_node_id));
154  DBUG_RETURN(true);
155 }
156 
157 
158 int NdbInfoScanOperation::execute()
159 {
160  DBUG_ENTER("NdbInfoScanOperation::execute");
161  DBUG_PRINT("info", ("name: '%s', id: %d",
162  m_table->getName(), m_table->getTableId()));
163 
164  if (m_state != Prepared)
165  DBUG_RETURN(NdbInfo::ERR_WrongState);
166 
167  assert(m_cursor.size() == 0);
168  m_state = MoreData;
169 
170  m_signal_sender->lock();
171 
172  if (!find_next_node())
173  {
174  m_signal_sender->unlock();
175  DBUG_RETURN(NdbInfo::ERR_ClusterFailure);
176  }
177 
178  int ret = sendDBINFO_SCANREQ();
179  m_signal_sender->unlock();
180 
181  DBUG_RETURN(ret);
182 }
183 
184 int
185 NdbInfoScanOperation::sendDBINFO_SCANREQ(void)
186 {
187  DBUG_ENTER("NdbInfoScanOperation::sendDBINFO_SCANREQ");
188 
189  SimpleSignal ss;
190  DbinfoScanReq * req = CAST_PTR(DbinfoScanReq, ss.getDataPtrSend());
191 
192  // API Identifiers
193  req->resultData = m_result_data;
194  req->transId[0] = m_transid0;
195  req->transId[1] = m_transid1;
196  req->resultRef = m_result_ref;
197 
198  // Scan parameters
199  req->tableId = m_table->getTableId();
200  req->colBitmap[0] = ~0;
201  req->colBitmap[1] = ~0;
202  req->requestInfo = 0;
203  req->maxRows = m_max_rows;
204  req->maxBytes = m_max_bytes;
205  DBUG_PRINT("info", ("max rows: %d, max bytes: %d", m_max_rows, m_max_bytes));
206 
207  // Scan result
208  req->returnedRows = 0;
209 
210  // Cursor data
211  Uint32* cursor_ptr = DbinfoScan::getCursorPtrSend(req);
212  for (unsigned i = 0; i < m_cursor.size(); i++)
213  {
214  *cursor_ptr = m_cursor[i];
215  DBUG_PRINT("info", ("cursor[%u]: 0x%x", i, m_cursor[i]));
216  cursor_ptr++;
217  }
218  req->cursor_sz = m_cursor.size();
219  m_cursor.clear();
220 
221  assert((m_rows_received == 0 && m_rows_confirmed == (Uint32)~0) || // first
222  m_rows_received == m_rows_confirmed); // subsequent
223 
224  // No rows recieved in this batch yet
225  m_rows_received = 0;
226 
227  // Number of rows returned by batch is not yet known
228  m_rows_confirmed = ~0;
229 
230  assert(m_node_id);
231  Uint32 len = DbinfoScanReq::SignalLength + req->cursor_sz;
232  if (m_signal_sender->sendSignal(m_node_id, ss, DBINFO,
233  GSN_DBINFO_SCANREQ, len) != SEND_OK)
234  {
235  m_state = Error;
236  DBUG_RETURN(NdbInfo::ERR_ClusterFailure);
237  }
238 
239  DBUG_RETURN(0);
240 }
241 
242 int NdbInfoScanOperation::receive(void)
243 {
244  DBUG_ENTER("NdbInfoScanOperation::receive");
245  while (true)
246  {
247  const SimpleSignal* sig = m_signal_sender->waitFor();
248  if (!sig)
249  DBUG_RETURN(-1);
250  //sig->print();
251 
252  int sig_number = sig->readSignalNumber();
253  switch (sig_number) {
254 
255  case GSN_DBINFO_TRANSID_AI:
256  {
257  if (execDBINFO_TRANSID_AI(sig))
258  continue; // Wait for next signal
259 
260  if (m_rows_received < m_rows_confirmed)
261  DBUG_RETURN(1); // Row available
262 
263  // All rows in this batch recieved
264  assert(m_rows_received == m_rows_confirmed);
265 
266  if (m_cursor.size() == 0 && !find_next_node())
267  {
268  DBUG_PRINT("info", ("No cursor -> EOF"));
269  m_state = End;
270  DBUG_RETURN(1); // Row available(will get End on next 'nextResult')
271  }
272 
273  // Cursor is still set, fetch more rows
274  assert(m_state == MoreData);
275  int err = sendDBINFO_SCANREQ();
276  if (err != 0)
277  {
278  DBUG_PRINT("error", ("Failed to request more data"));
279  assert(m_state == Error);
280  // Return error immediately
281  DBUG_RETURN(err);
282  }
283 
284  DBUG_RETURN(1); // Row available
285  break;
286  }
287 
288  case GSN_DBINFO_SCANCONF:
289  {
290  if (execDBINFO_SCANCONF(sig))
291  continue; // Wait for next signal
292 
293  if (m_rows_received < m_rows_confirmed)
294  continue; // Continue waiting(for late TRANSID_AI signals)
295 
296  // All rows in this batch recieved
297  assert(m_rows_received == m_rows_confirmed);
298 
299  if (m_cursor.size() == 0 && !find_next_node())
300  {
301  DBUG_PRINT("info", ("No cursor -> EOF"));
302  m_state = End;
303  DBUG_RETURN(0); // No more rows
304  }
305 
306  // Cursor is still set, fetch more rows
307  assert(m_state == MoreData);
308  int err = sendDBINFO_SCANREQ();
309  if (err != 0)
310  {
311  DBUG_PRINT("error", ("Failed to request more data"));
312  assert(m_state == Error);
313  DBUG_RETURN(err);
314  }
315 
316  continue;
317  }
318 
319  case GSN_DBINFO_SCANREF:
320  {
321  int error;
322  if (execDBINFO_SCANREF(sig, error))
323  continue; // Wait for next signal
324  assert(m_state == Error);
325  DBUG_RETURN(error);
326  break;
327  }
328 
329  case GSN_NODE_FAILREP:
330  {
331  const NodeFailRep * const rep =
332  CAST_CONSTPTR(NodeFailRep, sig->getDataPtr());
333  if (NdbNodeBitmask::get(rep->theNodes, m_node_id))
334  {
335  DBUG_PRINT("info", ("Node %d where scan was runnig failed", m_node_id));
336  m_state = Error;
337  DBUG_RETURN(NdbInfo::ERR_ClusterFailure);
338  }
339  break;
340  }
341 
342  case GSN_NF_COMPLETEREP:
343  // Already handled in NODE_FAILREP
344  break;
345 
346  case GSN_SUB_GCP_COMPLETE_REP:
347  case GSN_API_REGCONF:
348  case GSN_TAKE_OVERTCCONF:
349  case GSN_CONNECT_REP:
350  // ignore
351  break;
352 
353  default:
354  DBUG_PRINT("error", ("Got unexpected signal: %d", sig_number));
355  assert(false);
356  break;
357  }
358  }
359  assert(false); // Should never come here
360  DBUG_RETURN(-1);
361 }
362 
363 int
364 NdbInfoScanOperation::nextResult()
365 {
366  DBUG_ENTER("NdbInfoScanOperation::nextResult");
367 
368  switch(m_state)
369  {
370  case MoreData:
371  {
372  m_signal_sender->lock();
373  int ret = receive();
374  m_signal_sender->unlock();
375  DBUG_RETURN(ret);
376  break;
377  }
378  case End:
379  DBUG_RETURN(0); // EOF
380  break;
381  default:
382  break;
383  }
384  DBUG_RETURN(-1);
385 }
386 
387 void
388 NdbInfoScanOperation::close()
389 {
390  DBUG_ENTER("NdbInfoScanOperation::close");
391 
392  for (unsigned i = 0; i < m_recAttrs.size(); i++)
393  {
394  if (m_recAttrs[i])
395  {
396  delete m_recAttrs[i];
397  m_recAttrs[i] = NULL;
398  }
399  }
400 
401  DBUG_VOID_RETURN;
402 }
403 
404 bool
405 NdbInfoScanOperation::execDBINFO_TRANSID_AI(const SimpleSignal * signal)
406 {
407  DBUG_ENTER("NdbInfoScanOperation::execDBINFO_TRANSID_AI");
408  const TransIdAI* transid =
409  CAST_CONSTPTR(TransIdAI, signal->getDataPtr());
410  if (transid->connectPtr != m_result_data ||
411  transid->transId[0] != m_transid0 ||
412  transid->transId[1] != m_transid1)
413  {
414  // Drop signal that belongs to previous scan
415  DBUG_RETURN(true); // Continue waiting
416  }
417 
418  m_rows_received++;
419  DBUG_PRINT("info", ("rows received: %d", m_rows_received));
420 
421  // Reset all recattr values before reading the new row
422  for (unsigned i = 0; i < m_recAttrs.size(); i++)
423  {
424  if (m_recAttrs[i])
425  m_recAttrs[i]->m_defined = false;
426  }
427 
428  // Read attributes from long signal section
429  AttributeHeader* attr = (AttributeHeader*)signal->ptr[0].p;
430  AttributeHeader* last = (AttributeHeader*)(signal->ptr[0].p +
431  signal->ptr[0].sz);
432  while (attr < last)
433  {
434  const Uint32 col = attr->getAttributeId();
435  const Uint32 len = attr->getByteSize();
436  DBUG_PRINT("info", ("col: %u, len: %u", col, len));
437  if (col < m_recAttrs.size())
438  {
439  NdbInfoRecAttr* rec_attr = m_recAttrs[col];
440  if (rec_attr)
441  {
442  // Update NdbInfoRecAttr pointer, length and defined flag
443  rec_attr->m_data = (const char*)attr->getDataPtr();
444  rec_attr->m_len = len;
445  rec_attr->m_defined = true;
446  }
447  }
448 
449  attr = attr->getNext();
450  }
451 
452  DBUG_RETURN(false); // Don't wait more, process this row
453 }
454 
455 bool
456 NdbInfoScanOperation::execDBINFO_SCANCONF(const SimpleSignal * sig)
457 {
458  DBUG_ENTER("NdbInfoScanOperation::execDBINFO_SCANCONF");
459  const DbinfoScanConf* conf =
460  CAST_CONSTPTR(DbinfoScanConf, sig->getDataPtr());
461  if (conf->resultData != m_result_data ||
462  conf->transId[0] != m_transid0 ||
463  conf->transId[1] != m_transid1 ||
464  conf->resultRef != m_result_ref)
465  {
466  // Drop signal that belongs to previous scan
467  DBUG_RETURN(true); // Continue waiting
468  }
469  const Uint32 tableId = conf->tableId;
470  assert(tableId == m_table->getTableId());
471 
472  // Assert all scan settings is unchanged
473  assert(conf->colBitmap[0] == (Uint32)~0);
474  assert(conf->colBitmap[1] == (Uint32)~0);
475  assert(conf->requestInfo == 0);
476  assert(conf->maxRows == m_max_rows);
477  assert(conf->maxBytes == m_max_bytes);
478 
479  DBUG_PRINT("info", ("returnedRows : %d", conf->returnedRows));
480 
481  // Save cursor data
482  DBUG_PRINT("info", ("cursor size: %d", conf->cursor_sz));
483  assert(m_cursor.size() == 0);
484  const Uint32* cursor_ptr = DbinfoScan::getCursorPtr(conf);
485  for (unsigned i = 0; i < conf->cursor_sz; i++)
486  {
487  m_cursor.push_back(*cursor_ptr);
488  //DBUG_PRINT("info", ("cursor[%u]: 0x%x", i, m_cursor[i]));
489  cursor_ptr++;
490  }
491  assert(conf->cursor_sz == m_cursor.size());
492 
493  assert(m_rows_confirmed == (Uint32)~0); // Should've been unknown until now
494  m_rows_confirmed = conf->returnedRows;
495 
496  // Don't allow confirmation of less rows than already been received
497  DBUG_PRINT("info", ("received: %d, confirmed: %d", m_rows_received, m_rows_confirmed));
498  assert(m_rows_received <= m_rows_confirmed);
499 
500  DBUG_RETURN(false);
501 }
502 
503 bool
504 NdbInfoScanOperation::execDBINFO_SCANREF(const SimpleSignal * signal,
505  int& error_code)
506 {
507  DBUG_ENTER("NdbInfoScanOperation::execDBINFO_SCANREF");
508  const DbinfoScanRef* ref =
509  CAST_CONSTPTR(DbinfoScanRef, signal->getDataPtr());
510 
511  if (ref->resultData != m_result_data ||
512  ref->transId[0] != m_transid0 ||
513  ref->transId[1] != m_transid1 ||
514  ref->resultRef != m_result_ref)
515  {
516  // Drop signal that belongs to previous scan
517  DBUG_RETURN(true); // Continue waiting
518  }
519 
520  error_code = ref->errorCode;
521 
522  m_state = Error;
523  DBUG_RETURN(false);
524 }
525 
526 
527 template class Vector<NdbInfoRecAttr*>;