MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
hugoPkUpdate.cpp
1 /*
2  Copyright (C) 2003-2008 MySQL AB
3  All rights reserved. Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include <ndb_global.h>
20 
21 #include <NdbOut.hpp>
22 
23 #include <NdbApi.hpp>
24 #include <NdbMain.h>
25 #include <NDBT.hpp>
26 #include <NDBT_Thread.hpp>
27 #include <NDBT_Stats.hpp>
28 #include <NdbSleep.h>
29 #include <getarg.h>
30 
31 #include <HugoTransactions.hpp>
32 
33 static NDBT_ThreadFunc hugoPkUpdate;
34 
35 struct ThrInput {
36  const NdbDictionary::Table* pTab;
37  int records;
38  int batch;
39  int stats;
40 };
41 
42 struct ThrOutput {
43  NDBT_Stats latency;
44 };
45 
46 static int _refresh = 0;
47 
48 int main(int argc, const char** argv){
49  ndb_init();
50 
51  int _records = 0;
52  int _loops = 1;
53  int _threads = 1;
54  int _stats = 0;
55  int _abort = 0;
56  int _batch = 1;
57  const char* _tabname = NULL, *db = 0;
58  int _help = 0;
59 
60  struct getargs args[] = {
61  { "aborts", 'a', arg_integer, &_abort, "percent of transactions that are aborted", "abort%" },
62  { "loops", 'l', arg_integer, &_loops, "number of times to run this program(0=infinite loop)", "loops" },
63  { "threads", 't', arg_integer, &_threads, "number of threads (default 1)", "threads" },
64  { "stats", 's', arg_flag, &_stats, "report latency per batch", "stats" },
65  // { "batch", 'b', arg_integer, &_batch, "batch value", "batch" },
66  { "records", 'r', arg_integer, &_records, "Number of records", "records" },
67  { "usage", '?', arg_flag, &_help, "Print help", "" },
68  { "database", 'd', arg_string, &db, "Database", "" },
69  { "refresh", 0, arg_flag, &_refresh, "refresh record rather than update them", "" }
70 
71  };
72  int num_args = sizeof(args) / sizeof(args[0]);
73  int optind = 0;
74  char desc[] =
75  "tabname\n"\
76  "This program will update all records in a table using PK\n";
77 
78  if(getarg(args, num_args, argc, argv, &optind) ||
79  argv[optind] == NULL || _records == 0 || _help) {
80  arg_printusage(args, num_args, argv[0], desc);
81  return NDBT_ProgramExit(NDBT_WRONGARGS);
82  }
83  _tabname = argv[optind];
84 
85  // Connect to Ndb
87  if(con.connect(12, 5, 1) != 0)
88  {
89  return NDBT_ProgramExit(NDBT_FAILED);
90  }
91 
92  if (con.wait_until_ready(30,0) < 0)
93  {
94  ndbout << "Cluster nodes not ready in 30 seconds." << endl;
95  return NDBT_ProgramExit(NDBT_FAILED);
96  }
97 
98  Ndb MyNdb( &con, db ? db : "TEST_DB" );
99 
100  if(MyNdb.init() != 0){
101  ERR(MyNdb.getNdbError());
102  return NDBT_ProgramExit(NDBT_FAILED);
103  }
104 
105  // Check if table exists in db
106  const NdbDictionary::Table * pTab = NDBT_Table::discoverTableFromDb(&MyNdb, _tabname);
107  if(pTab == NULL){
108  ndbout << " Table " << _tabname << " does not exist!" << endl;
109  return NDBT_ProgramExit(NDBT_WRONGARGS);
110  }
111 
112  // threads
113  NDBT_ThreadSet ths(_threads);
114 
115  // create Ndb object for each thread
116  if (ths.connect(&con, db ? db : "TEST_DB") == -1) {
117  ndbout << "connect failed: err=" << ths.get_err() << endl;
118  return NDBT_ProgramExit(NDBT_FAILED);
119  }
120 
121  // input is options
122  ThrInput input;
123  ths.set_input(&input);
124  input.pTab = pTab;
125  input.records = _records;
126  input.batch = _batch;
127  input.stats = _stats;
128 
129  // output is stats
130  ThrOutput output;
131  ths.set_output<ThrOutput>();
132 
133  int i = 0;
134  while (i < _loops || _loops == 0) {
135  ndbout << i << ": ";
136 
137  ths.set_func(hugoPkUpdate);
138  ths.start();
139  ths.stop();
140 
141  if (ths.get_err())
142  {
143  ths.disconnect();
144  NDBT_ProgramExit(NDBT_FAILED);
145  }
146 
147  if (_stats) {
148  NDBT_Stats latency;
149 
150  // add stats from each thread
151  int n;
152  for (n = 0; n < ths.get_count(); n++) {
153  NDBT_Thread& thr = ths.get_thread(n);
154  ThrOutput* output = (ThrOutput*)thr.get_output();
155  latency += output->latency;
156  }
157 
158  ndbout
159  << "latency per batch (us): "
160  << " samples=" << latency.getCount()
161  << " min=" << (int)latency.getMin()
162  << " max=" << (int)latency.getMax()
163  << " mean=" << (int)latency.getMean()
164  << " stddev=" << (int)latency.getStddev()
165  << endl;
166  }
167  i++;
168  }
169 
170  ths.disconnect();
171 
172  return NDBT_ProgramExit(NDBT_OK);
173 }
174 
175 static void hugoPkUpdate(NDBT_Thread& thr)
176 {
177  const ThrInput* input = (const ThrInput*)thr.get_input();
178  ThrOutput* output = (ThrOutput*)thr.get_output();
179 
180  HugoTransactions hugoTrans(*input->pTab);
181  output->latency.reset();
182  if (input->stats)
183  hugoTrans.setStatsLatency(&output->latency);
184 
185  NDBT_ThreadSet& ths = thr.get_thread_set();
186  hugoTrans.setThrInfo(ths.get_count(), thr.get_thread_no());
187 
188  int ret;
189  if (_refresh == 0)
190  {
191  ret = hugoTrans.pkUpdateRecords(thr.get_ndb(),
192  input->records,
193  input->batch);
194  }
195  else
196  {
197  ret = hugoTrans.pkRefreshRecords(thr.get_ndb(),
198  0,
199  input->records,
200  input->batch);
201  }
202  if (ret != 0)
203  thr.set_err(ret);
204 }