MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ndbapi_event.cpp
1 /*
2  Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
52 #include <NdbApi.hpp>
53 
54 // Used for cout
55 #include <stdio.h>
56 #include <iostream>
57 #include <unistd.h>
58 #ifdef VM_TRACE
59 #include <my_global.h>
60 #endif
61 #ifndef assert
62 #include <assert.h>
63 #endif
64 
65 
/**
 * Report an NDB API error on stdout and terminate the process.
 *
 * Prints the source file and line of the call site together with the
 * `code` and `message` members of `error`, then calls exit(-1).
 *
 * The body is wrapped in do { } while (0) so the macro expands to exactly
 * one statement and can be used safely as `if (cond) APIERROR(e); else ...`.
 * Note that `error` is evaluated twice.
 */
#define APIERROR(error) \
  do { \
    std::cout << "Error in " << __FILE__ << ", line:" << __LINE__ \
              << ", code:" << (error).code \
              << ", msg: " << (error).message << "." << std::endl; \
    exit(-1); \
  } while (0)
111 
112 int myCreateEvent(Ndb* myNdb,
113  const char *eventName,
114  const char *eventTableName,
115  const char **eventColumnName,
116  const int noEventColumnName,
117  bool merge_events);
118 
119 int main(int argc, char** argv)
120 {
121  if (argc < 3)
122  {
123  std::cout << "Arguments are <connect_string cluster> <timeout> [m(merge events)|d(debug)].\n";
124  exit(-1);
125  }
126  const char *connectstring = argv[1];
127  int timeout = atoi(argv[2]);
128  ndb_init();
129  bool merge_events = argc > 3 && strchr(argv[3], 'm') != 0;
130 #ifdef VM_TRACE
131  bool dbug = argc > 3 && strchr(argv[3], 'd') != 0;
132  if (dbug) DBUG_PUSH("d:t:");
133  if (dbug) putenv("API_SIGNAL_LOG=-");
134 #endif
135 
136  Ndb_cluster_connection *cluster_connection=
137  new Ndb_cluster_connection(connectstring); // Object representing the cluster
138 
139  int r= cluster_connection->connect(5 /* retries */,
140  3 /* delay between retries */,
141  1 /* verbose */);
142  if (r > 0)
143  {
144  std::cout
145  << "Cluster connect failed, possibly resolved with more retries.\n";
146  exit(-1);
147  }
148  else if (r < 0)
149  {
150  std::cout
151  << "Cluster connect failed.\n";
152  exit(-1);
153  }
154 
155  if (cluster_connection->wait_until_ready(30,30))
156  {
157  std::cout << "Cluster was not ready within 30 secs." << std::endl;
158  exit(-1);
159  }
160 
161  Ndb* myNdb= new Ndb(cluster_connection,
162  "ndb_examples"); // Object representing the database
163 
164  if (myNdb->init() == -1) APIERROR(myNdb->getNdbError());
165 
166  const char *eventName= "CHNG_IN_t0";
167  const char *eventTableName= "t0";
168  const int noEventColumnName= 5;
169  const char *eventColumnName[noEventColumnName]=
170  {"c0",
171  "c1",
172  "c2",
173  "c3",
174  "c4"
175  };
176 
177  // Create events
178  myCreateEvent(myNdb,
179  eventName,
180  eventTableName,
181  eventColumnName,
182  noEventColumnName,
183  merge_events);
184 
185  // Normal values and blobs are unfortunately handled differently..
186  typedef union { NdbRecAttr* ra; NdbBlob* bh; } RA_BH;
187 
188  int i, j, k, l;
189  j = 0;
190  while (j < timeout) {
191 
192  // Start "transaction" for handling events
193  NdbEventOperation* op;
194  printf("create EventOperation\n");
195  if ((op = myNdb->createEventOperation(eventName)) == NULL)
196  APIERROR(myNdb->getNdbError());
197  op->mergeEvents(merge_events);
198 
199  printf("get values\n");
200  RA_BH recAttr[noEventColumnName];
201  RA_BH recAttrPre[noEventColumnName];
202  // primary keys should always be a part of the result
203  for (i = 0; i < noEventColumnName; i++) {
204  if (i < 4) {
205  recAttr[i].ra = op->getValue(eventColumnName[i]);
206  recAttrPre[i].ra = op->getPreValue(eventColumnName[i]);
207  } else if (merge_events) {
208  recAttr[i].bh = op->getBlobHandle(eventColumnName[i]);
209  recAttrPre[i].bh = op->getPreBlobHandle(eventColumnName[i]);
210  }
211  }
212 
213  // set up the callbacks
214  printf("execute\n");
215  // This starts changes to "start flowing"
216  if (op->execute())
217  APIERROR(op->getNdbError());
218 
219  NdbEventOperation* the_op = op;
220 
221  i= 0;
222  while (i < timeout) {
223  // printf("now waiting for event...\n");
224  int r = myNdb->pollEvents(1000); // wait for event or 1000 ms
225  if (r > 0) {
226  // printf("got data! %d\n", r);
227  while ((op= myNdb->nextEvent())) {
228  assert(the_op == op);
229  i++;
230  switch (op->getEventType()) {
232  printf("%u INSERT", i);
233  break;
235  printf("%u DELETE", i);
236  break;
238  printf("%u UPDATE", i);
239  break;
240  default:
241  abort(); // should not happen
242  }
243  printf(" gci=%d\n", (int)op->getGCI());
244  for (k = 0; k <= 1; k++) {
245  printf(k == 0 ? "post: " : "pre : ");
246  for (l = 0; l < noEventColumnName; l++) {
247  if (l < 4) {
248  NdbRecAttr* ra = k == 0 ? recAttr[l].ra : recAttrPre[l].ra;
249  if (ra->isNULL() >= 0) { // we have a value
250  if (ra->isNULL() == 0) { // we have a non-null value
251  if (l < 2)
252  printf("%-5u", ra->u_32_value());
253  else
254  printf("%-5.4s", ra->aRef());
255  } else
256  printf("%-5s", "NULL");
257  } else
258  printf("%-5s", "-"); // no value
259  } else if (merge_events) {
260  int isNull;
261  NdbBlob* bh = k == 0 ? recAttr[l].bh : recAttrPre[l].bh;
262  bh->getDefined(isNull);
263  if (isNull >= 0) { // we have a value
264  if (! isNull) { // we have a non-null value
265  Uint64 length = 0;
266  bh->getLength(length);
267  // read into buffer
268  unsigned char* buf = new unsigned char [length];
269  memset(buf, 'X', length);
270  Uint32 n = length;
271  bh->readData(buf, n); // n is in/out
272  assert(n == length);
273  // pretty-print
274  bool first = true;
275  Uint32 i = 0;
276  while (i < n) {
277  unsigned char c = buf[i++];
278  Uint32 m = 1;
279  while (i < n && buf[i] == c)
280  i++, m++;
281  if (! first)
282  printf("+");
283  printf("%u%c", m, c);
284  first = false;
285  }
286  printf("[%u]", n);
287  delete [] buf;
288  } else
289  printf("%-5s", "NULL");
290  } else
291  printf("%-5s", "-"); // no value
292  }
293  }
294  printf("\n");
295  }
296  }
297  } // else printf("timed out (%i)\n", timeout);
298  }
299  // don't want to listen to events anymore
300  if (myNdb->dropEventOperation(the_op)) APIERROR(myNdb->getNdbError());
301  the_op = 0;
302 
303  j++;
304  }
305 
306  {
307  NdbDictionary::Dictionary *myDict = myNdb->getDictionary();
308  if (!myDict) APIERROR(myNdb->getNdbError());
309  // remove event from database
310  if (myDict->dropEvent(eventName)) APIERROR(myDict->getNdbError());
311  }
312 
313  delete myNdb;
314  delete cluster_connection;
315  ndb_end(0);
316  return 0;
317 }
318 
319 int myCreateEvent(Ndb* myNdb,
320  const char *eventName,
321  const char *eventTableName,
322  const char **eventColumnNames,
323  const int noEventColumnNames,
324  bool merge_events)
325 {
326  NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
327  if (!myDict) APIERROR(myNdb->getNdbError());
328 
329  const NdbDictionary::Table *table= myDict->getTable(eventTableName);
330  if (!table) APIERROR(myDict->getNdbError());
331 
332  NdbDictionary::Event myEvent(eventName, *table);
333  myEvent.addTableEvent(NdbDictionary::Event::TE_ALL);
334  // myEvent.addTableEvent(NdbDictionary::Event::TE_INSERT);
335  // myEvent.addTableEvent(NdbDictionary::Event::TE_UPDATE);
336  // myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE);
337 
338  myEvent.addEventColumns(noEventColumnNames, eventColumnNames);
339  myEvent.mergeEvents(merge_events);
340 
341  // Add event to database
342  if (myDict->createEvent(myEvent) == 0)
343  myEvent.print();
344  else if (myDict->getNdbError().classification ==
346  printf("Event creation failed, event exists\n");
347  printf("dropping Event...\n");
348  if (myDict->dropEvent(eventName)) APIERROR(myDict->getNdbError());
349  // try again
350  // Add event to database
351  if ( myDict->createEvent(myEvent)) APIERROR(myDict->getNdbError());
352  } else
353  APIERROR(myDict->getNdbError());
354 
355  return 0;
356 }