MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
NdbReceiver.hpp
1 /*
2  Copyright (C) 2003-2008 MySQL AB, 2009 Sun Microsystems, Inc.
3  All rights reserved. Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #ifndef NdbReceiver_H
20 #define NdbReceiver_H
21 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL // Not part of public interface
22 
23 #include <ndb_types.h>
24 
25 
26 class Ndb;
27 class NdbImpl;
28 class NdbTransaction;
29 class NdbRecord;
31 
33 {
34  friend class Ndb;
35  friend class NdbOperation;
36  friend class NdbQueryImpl;
37  friend class NdbQueryOperationImpl;
38  friend class NdbResultStream;
39  friend class NdbScanOperation;
40  friend class NdbIndexOperation;
41  friend class NdbIndexScanOperation;
42  friend class NdbTransaction;
43  friend class NdbRootFragment;
44  friend int compare_ndbrecord(const NdbReceiver *r1,
45  const NdbReceiver *r2,
46  const NdbRecord *key_record,
47  const NdbRecord *result_record,
48  bool descending,
49  bool read_range_no);
50  friend int spjTest(int argc, char** argv);
51 
52 public:
53  enum ReceiverType { NDB_UNINITIALIZED,
54  NDB_OPERATION = 1,
55  NDB_SCANRECEIVER = 2,
56  NDB_INDEX_OPERATION = 3,
57  NDB_QUERY_OPERATION = 4
58  };
59 
60  NdbReceiver(Ndb *aNdb);
61  int init(ReceiverType type, bool useRec, void* owner);
62  void release();
63  ~NdbReceiver();
64 
65  Uint32 getId() const{
66  return m_id;
67  }
68 
69  ReceiverType getType() const {
70  return m_type;
71  }
72 
73  inline NdbTransaction * getTransaction() const;
74  void* getOwner() const {
75  return m_owner;
76  }
77 
78  bool checkMagicNumber() const;
79 
80  inline void next(NdbReceiver* next_arg) { m_next = next_arg;}
81  inline NdbReceiver* next() { return m_next; }
82 
83  void setErrorCode(int);
84 
85  /* Prepare for receiving of rows into specified buffer */
86  void prepareReceive(char *buf);
87 
88  /* Prepare for reading of rows from specified buffer */
89  void prepareRead(char *buf, Uint32 rows);
90 
91 private:
92  Uint32 theMagicNumber;
93  Ndb* m_ndb;
94  Uint32 m_id;
95  Uint32 m_tcPtrI;
96  ReceiverType m_type;
97  void* m_owner;
98  NdbReceiver* m_next;
99 
103  class NdbRecAttr * getValue(const class NdbColumnImpl*, char * user_dst_ptr);
104  void getValues(const NdbRecord*, char*);
105  void prepareSend();
106 
107  static
108  void calculate_batch_size(const NdbImpl&,
109  const NdbRecord *,
110  const NdbRecAttr *first_rec_attr,
111  Uint32, Uint32, Uint32&, Uint32&, Uint32&);
112 
113  void calculate_batch_size(Uint32 key_size,
114  Uint32 parallelism,
115  Uint32& batch_size,
116  Uint32& batch_byte_size,
117  Uint32& first_batch_size,
118  const NdbRecord *rec) const;
119 
120  /*
121  Set up buffers for receiving TRANSID_AI and KEYINFO20 signals
122  during a scan using NdbRecord.
123  */
124  void do_setup_ndbrecord(const NdbRecord *ndb_record, Uint32 batch_size,
125  Uint32 key_size, Uint32 read_range_no,
126  Uint32 rowsize, char *buf);
127 
128  static
129  Uint32 ndbrecord_rowsize(const NdbRecord *ndb_record,
130  const NdbRecAttr *first_rec_attr,
131  Uint32 key_size,
132  bool read_range_no);
133 
134 
135  int execKEYINFO20(Uint32 info, const Uint32* ptr, Uint32 len);
136  int execTRANSID_AI(const Uint32* ptr, Uint32 len);
137  int execTCOPCONF(Uint32 len);
138  int execSCANOPCONF(Uint32 tcPtrI, Uint32 len, Uint32 rows);
139 
140  /*
141  We keep different state for old NdbRecAttr based operation and for
142  new NdbRecord style operation.
143  */
144  bool m_using_ndb_record;
145 
146  /* members used for NdbRecord operation. */
147  struct {
148  const NdbRecord *m_ndb_record;
149  /* Destination to receive next row into. */
150  char *m_row_recv;
151  /* Block of memory used to read all rows in a batch during scan. */
152  char *m_row_buffer;
153  /*
154  Offsets between two rows in m_row_buffer.
155  This can be different from m_ndb_record->m_row_size, as we sometimes
156  store extra information after each row (range_no and keyinfo).
157  For non-scan operations, this is set to zero.
158  */
159  Uint32 m_row_offset;
160  /*
161  m_read_range_no is true if we are storing the range_no at the end of
162  each row during scans.
163  */
164  bool m_read_range_no;
165  } m_record;
166 
167  class NdbRecAttr* theFirstRecAttr;
168  class NdbRecAttr* theCurrentRecAttr;
169 
170  /*
171  m_rows is only used in NdbRecAttr mode, but is kept during NdbRecord mode
172  operation to avoid the need for re-allocation.
173  */
174  class NdbRecAttr** m_rows;
175 
176  /*
177  When an NdbReceiver is sitting in the NdbScanOperation::m_sent_receivers
178  array, waiting to receive TRANSID_AI data from the kernel, its index into
179  m_sent_receivers is stored in m_list_index, so that we can remove it when
180  done without having to search for it.
181  */
182  Uint32 m_list_index;
183  /*
184  m_current_row serves two purposes, both used during scans:
185 
186  1. While rows are being received from the kernel (and the receiver is
187  sitting in the NdbScanOperation::m_sent_receivers array), it holds the
188  row index (into m_rows) for the row to receive the next KEYINFO20 data.
189  This is used to receive keyInfo during scans (for scans that request
190  keyInfo).
191 
192  2. While rows are being delivered to the application (and the receiver is
193  sitting in the NdbScanOperation::m_api_receivers array), it holds the
194  next row to be delivered to the application.
195 
196  For NdbRecord operation, it works similarly, but instead indexes rows in
197  the RdbRecord m_row_buffer.
198  */
199  Uint32 m_current_row;
200  /* m_result_rows: Total number of rows contained in this batch. */
201  Uint32 m_result_rows;
202 
203  Uint32 m__UNUSED;
204 
205  /*
206  m_expected_result_length: Total number of 32-bit words of TRANSID_AI and
207  KEYINFO20 data to receive. This is set to zero until SCAN_TABCONF has
208  been received.
209  */
210  Uint32 m_expected_result_length;
211  Uint32 m_received_result_length;
212 
213  bool hasResults() const { return m_result_rows > 0; }
214  bool nextResult() const { return m_current_row < m_result_rows; }
215  Uint32 receive_packed_recattr(NdbRecAttr**, Uint32 bmlen,
216  const Uint32* aDataPtr, Uint32 aLength);
217  Uint32 receive_packed_ndbrecord(Uint32 bmlen,
218  const Uint32* aDataPtr,
219  char* row);
220  /* get_row() returns the next available row during NdbRecord scans. */
221  const char *get_row();
222  /*
223  peek_row() returns the row pointer that get_row() will return on next call,
224  without advancing the internal pointer.
225  So two successive calls to peek_row() will return the same pointer, whereas
226  two successive calls to get_row would return different pointers.
227  */
228  const char *peek_row() const;
229  /* get_range_no() returns the range_no from the last returned row. */
230  int get_range_no() const;
231  /* get_keyinfo20)_ returns keyinfo from KEYINFO20 signal. */
232  int get_keyinfo20(Uint32 & scaninfo, Uint32 & length,
233  const char * & data_ptr) const;
234  int getScanAttrData(const char * & data, Uint32 & size, Uint32 & pos) const;
236  void setCurrentRow(char* buffer, Uint32 row);
238  Uint32 getCurrentRow() const { return m_current_row; }
239 };
240 
241 #ifdef NDB_NO_DROPPED_SIGNAL
242 #include <stdlib.h>
243 #endif
244 
245 inline
246 bool
247 NdbReceiver::checkMagicNumber() const {
248  bool retVal = (theMagicNumber == 0x11223344);
249 #ifdef NDB_NO_DROPPED_SIGNAL
250  if(!retVal){
251  abort();
252  }
253 #endif
254  return retVal;
255 }
256 
257 inline
258 void
259 NdbReceiver::prepareSend(){
260  /* Set pointers etc. to prepare for receiving the first row of the batch. */
261  theMagicNumber = 0x11223344;
262  m_current_row = 0;
263  m_result_rows = 0;
264  m_received_result_length = 0;
265  m_expected_result_length = 0;
266  if (m_using_ndb_record)
267  {
268  if (m_type==NDB_SCANRECEIVER || m_type==NDB_QUERY_OPERATION)
269  m_record.m_row_recv= m_record.m_row_buffer;
270  }
271  theCurrentRecAttr = theFirstRecAttr;
272 }
273 
274 inline
275 int
276 NdbReceiver::execTCOPCONF(Uint32 len){
277  Uint32 tmp = m_received_result_length;
278  m_expected_result_length = len;
279 #ifdef assert
280  assert(!(tmp && !len));
281 #endif
282  return ((bool)len ^ (bool)tmp ? 0 : 1);
283 }
284 
285 inline
286 int
287 NdbReceiver::execSCANOPCONF(Uint32 tcPtrI, Uint32 len, Uint32 rows){
288  m_tcPtrI = tcPtrI;
289  m_result_rows = rows;
290  Uint32 tmp = m_received_result_length;
291  m_expected_result_length = len;
292  return (tmp == len ? 1 : 0);
293 }
294 
295 inline
296 void
297 NdbReceiver::setCurrentRow(char* buffer, Uint32 row)
298 {
299  m_record.m_row_buffer = buffer;
300  m_current_row = row;
301 #ifdef assert
302  assert(m_current_row < m_result_rows);
303 #endif
304 }
305 
306 inline
307 const char *
308 NdbReceiver::get_row()
309 {
310 #ifdef assert
311  assert(m_current_row < m_result_rows);
312 #endif
313  return m_record.m_row_buffer + (m_current_row++ * m_record.m_row_offset);
314 }
315 
316 inline
317 const char *
318 NdbReceiver::peek_row() const
319 {
320  return m_record.m_row_buffer + m_current_row * m_record.m_row_offset;
321 }
322 
323 
324 #endif // DOXYGEN_SHOULD_SKIP_INTERNAL
325 #endif