MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
rep_latency.cpp
1 /*
2  Copyright (C) 2007 MySQL AB, 2008 Sun Microsystems, Inc.
3  All rights reserved. Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 /*
20  * Update on master wait for update on slave
21  *
22  */
23 
24 #include <ndb_global.h>
25 #include <NdbApi.hpp>
26 #include <NdbSleep.h>
27 #include <sys/time.h>
28 #include <NdbOut.hpp>
29 #include <NDBT.hpp>
30 
31 struct Xxx
32 {
33  Ndb *ndb;
34  const NdbDictionary::Table *table;
35  Uint32 pk_col;
36  Uint32 col;
37 };
38 
39 struct XxxR
40 {
41  Uint32 pk_val;
42  Uint32 val;
43  struct timeval start_time;
44  Uint32 latency;
45 };
46 
47 static int
48 prepare_master_or_slave(Ndb &myNdb,
49  const char* table,
50  const char* pk,
51  Uint32 pk_val,
52  const char* col,
53  struct Xxx &xxx,
54  struct XxxR &xxxr);
55 static void
56 run_master_update(struct Xxx &xxx, struct XxxR &xxxr);
57 static void
58 run_slave_wait(struct Xxx &xxx, struct XxxR &xxxr);
59 
60 #define PRINT_ERROR(code,msg) \
61  g_err << "Error in " << __FILE__ << ", line: " << __LINE__ \
62  << ", code: " << code \
63  << ", msg: " << msg << ".\n"
64 #define APIERROR(error) { \
65  PRINT_ERROR((error).code, (error).message); \
66  exit(-1); }
67 
68 int main(int argc, char** argv)
69 {
70  if (argc != 8)
71  {
72  ndbout << "Arguments are <connect_string cluster 1> <connect_string cluster 2> <database> <table name> <primary key> <value of primary key> <attribute to update>.\n";
73  exit(-1);
74  }
75  // ndb_init must be called first
76  ndb_init();
77  {
78  const char *opt_connectstring1 = argv[1];
79  const char *opt_connectstring2 = argv[2];
80  const char *opt_db = argv[3];
81  const char *opt_table = argv[4];
82  const char *opt_pk = argv[5];
83  const Uint32 opt_pk_val = atoi(argv[6]);
84  const char *opt_col = argv[7];
85 
86  // Object representing the cluster 1
87  Ndb_cluster_connection cluster1_connection(opt_connectstring1);
88  // Object representing the cluster 2
89  Ndb_cluster_connection cluster2_connection(opt_connectstring2);
90 
91  // connect cluster 1 and run application
92  // Connect to cluster 1 management server (ndb_mgmd)
93  if (cluster1_connection.connect(4 /* retries */,
94  5 /* delay between retries */,
95  1 /* verbose */))
96  {
97  g_err << "Cluster 1 management server was not ready within 30 secs.\n";
98  exit(-1);
99  }
100  // Optionally connect and wait for the storage nodes (ndbd's)
101  if (cluster1_connection.wait_until_ready(30,0) < 0)
102  {
103  g_err << "Cluster 1 was not ready within 30 secs.\n";
104  exit(-1);
105  }
106  // connect cluster 2 and run application
107  // Connect to cluster management server (ndb_mgmd)
108  if (cluster2_connection.connect(4 /* retries */,
109  5 /* delay between retries */,
110  1 /* verbose */))
111  {
112  g_err << "Cluster 2 management server was not ready within 30 secs.\n";
113  exit(-1);
114  }
115  // Optionally connect and wait for the storage nodes (ndbd's)
116  if (cluster2_connection.wait_until_ready(30,0) < 0)
117  {
118  g_err << "Cluster 2 was not ready within 30 secs.\n";
119  exit(-1);
120  }
121  // Object representing the database
122  Ndb myNdb1(&cluster1_connection, opt_db);
123  Ndb myNdb2(&cluster2_connection, opt_db);
124  //
125  struct Xxx xxx1;
126  struct Xxx xxx2;
127  struct XxxR xxxr;
128  prepare_master_or_slave(myNdb1, opt_table, opt_pk, opt_pk_val, opt_col,
129  xxx1, xxxr);
130  prepare_master_or_slave(myNdb2, opt_table, opt_pk, opt_pk_val, opt_col,
131  xxx2, xxxr);
132  while (1)
133  {
134  // run the application code
135  run_master_update(xxx1, xxxr);
136  run_slave_wait(xxx2, xxxr);
137  ndbout << "latency: " << xxxr.latency << endl;
138  }
139  }
140  // Note: all connections must have been destroyed before calling ndb_end()
141  ndb_end(0);
142 
143  return 0;
144 }
145 
146 static int
147 prepare_master_or_slave(Ndb &myNdb,
148  const char* table,
149  const char* pk,
150  Uint32 pk_val,
151  const char* col,
152  struct Xxx &xxx,
153  struct XxxR &xxxr)
154 {
155  if (myNdb.init())
156  APIERROR(myNdb.getNdbError());
157  const NdbDictionary::Dictionary* myDict = myNdb.getDictionary();
158  const NdbDictionary::Table *myTable = myDict->getTable(table);
159  if (myTable == NULL)
160  APIERROR(myDict->getNdbError());
161  const NdbDictionary::Column *myPkCol = myTable->getColumn(pk);
162  if (myPkCol == NULL)
163  APIERROR(myDict->getNdbError());
164  if (myPkCol->getType() != NdbDictionary::Column::Unsigned)
165  {
166  PRINT_ERROR(0, "Primary key column not of type unsigned");
167  exit(-1);
168  }
169  const NdbDictionary::Column *myCol = myTable->getColumn(col);
170  if (myCol == NULL)
171  APIERROR(myDict->getNdbError());
173  {
174  PRINT_ERROR(0, "Update column not of type unsigned");
175  exit(-1);
176  }
177 
178  xxx.ndb = &myNdb;
179  xxx.table = myTable;
180  xxx.pk_col = myPkCol->getColumnNo();
181  xxx.col = myCol->getColumnNo();
182 
183  xxxr.pk_val = pk_val;
184 
185  return 0;
186 }
187 
188 static void run_master_update(struct Xxx &xxx, struct XxxR &xxxr)
189 {
190  Ndb *ndb = xxx.ndb;
191  const NdbDictionary::Table *myTable = xxx.table;
192  int retry_sleep= 10; /* 10 milliseconds */
193  int retries= 100;
194  while (1)
195  {
196  Uint32 val;
197  NdbTransaction *trans = ndb->startTransaction();
198  if (trans == NULL)
199  goto err;
200  {
201  NdbOperation *op = trans->getNdbOperation(myTable);
202  if (op == NULL)
203  APIERROR(trans->getNdbError());
204  op->readTupleExclusive();
205  op->equal(xxx.pk_col, xxxr.pk_val);
206  op->getValue(xxx.col, (char *)&val);
207  }
208  if (trans->execute(NdbTransaction::NoCommit))
209  goto err;
210  //fprintf(stderr, "read %u\n", val);
211  xxxr.val = val + 1;
212  {
213  NdbOperation *op = trans->getNdbOperation(myTable);
214  if (op == NULL)
215  APIERROR(trans->getNdbError());
216  op->updateTuple();
217  op->equal(xxx.pk_col, xxxr.pk_val);
218  op->setValue(xxx.col, xxxr.val);
219  }
220  if (trans->execute(NdbTransaction::Commit))
221  goto err;
222  ndb->closeTransaction(trans);
223  //fprintf(stderr, "updated to %u\n", xxxr.val);
224  break;
225 err:
226  const NdbError this_error= trans ?
227  trans->getNdbError() : ndb->getNdbError();
228  if (this_error.status == NdbError::TemporaryError)
229  {
230  if (retries--)
231  {
232  if (trans)
233  ndb->closeTransaction(trans);
234  NdbSleep_MilliSleep(retry_sleep);
235  continue; // retry
236  }
237  }
238  if (trans)
239  ndb->closeTransaction(trans);
240  APIERROR(this_error);
241  }
242  /* update done start timer */
243  gettimeofday(&xxxr.start_time, 0);
244 }
245 
246 static void run_slave_wait(struct Xxx &xxx, struct XxxR &xxxr)
247 {
248  struct timeval old_end_time = xxxr.start_time, end_time;
249  Ndb *ndb = xxx.ndb;
250  const NdbDictionary::Table *myTable = xxx.table;
251  int retry_sleep= 10; /* 10 milliseconds */
252  int retries= 100;
253  while (1)
254  {
255  Uint32 val;
256  NdbTransaction *trans = ndb->startTransaction();
257  if (trans == NULL)
258  goto err;
259  {
260  NdbOperation *op = trans->getNdbOperation(myTable);
261  if (op == NULL)
262  APIERROR(trans->getNdbError());
263  op->readTuple();
264  op->equal(xxx.pk_col, xxxr.pk_val);
265  op->getValue(xxx.col, (char *)&val);
266  if (trans->execute(NdbTransaction::Commit))
267  goto err;
268  }
269  /* read done, check time of read */
270  gettimeofday(&end_time, 0);
271  ndb->closeTransaction(trans);
272  //fprintf(stderr, "read %u waiting for %u\n", val, xxxr.val);
273  if (xxxr.val != val)
274  {
275  /* expected value not received yet */
276  retries = 100;
277  NdbSleep_MilliSleep(retry_sleep);
278  old_end_time = end_time;
279  continue;
280  }
281  break;
282 err:
283  const NdbError this_error= trans ?
284  trans->getNdbError() : ndb->getNdbError();
285  if (this_error.status == NdbError::TemporaryError)
286  {
287  if (retries--)
288  {
289  if (trans)
290  ndb->closeTransaction(trans);
291  NdbSleep_MilliSleep(retry_sleep);
292  continue; // retry
293  }
294  }
295  if (trans)
296  ndb->closeTransaction(trans);
297  APIERROR(this_error);
298  }
299 
300  Int64 elapsed_usec1 =
301  ((Int64)end_time.tv_sec - (Int64)xxxr.start_time.tv_sec)*1000*1000 +
302  ((Int64)end_time.tv_usec - (Int64)xxxr.start_time.tv_usec);
303  Int64 elapsed_usec2 =
304  ((Int64)end_time.tv_sec - (Int64)old_end_time.tv_sec)*1000*1000 +
305  ((Int64)end_time.tv_usec - (Int64)old_end_time.tv_usec);
306  xxxr.latency =
307  ((elapsed_usec1 - elapsed_usec2/2)+999)/1000;
308 }