MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
NDBT_Thread.cpp
1 /*
2  Copyright (C) 2007 MySQL AB, 2009 Sun Microsystems, Inc.
3  All rights reserved. Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include <ndb_global.h>
20 #include <NDBT_Thread.hpp>
21 #include <NdbApi.hpp>
22 
23 NDBT_Thread::NDBT_Thread()
24 {
25  create(0, -1);
26 }
27 
28 NDBT_Thread::NDBT_Thread(NDBT_ThreadSet* thread_set, int thread_no)
29 {
30  create(thread_set, thread_no);
31 }
32 
33 void
34 NDBT_Thread::create(NDBT_ThreadSet* thread_set, int thread_no)
35 {
36  m_magic = NDBT_Thread::Magic;
37 
38  m_state = Wait;
39  m_thread_set = thread_set;
40  m_thread_no = thread_no;
41  m_func = 0;
42  m_input = 0;
43  m_output = 0;
44  m_ndb = 0;
45  m_err = 0;
46 
47  m_mutex = NdbMutex_Create();
48  assert(m_mutex != 0);
49  m_cond = NdbCondition_Create();
50  assert(m_cond != 0);
51 
52  char buf[20];
53  sprintf(buf, "NDBT_%04u", (unsigned)thread_no);
54  const char* name = strdup(buf);
55  assert(name != 0);
56 
57  unsigned stacksize = 512 * 1024;
58  NDB_THREAD_PRIO prio = NDB_THREAD_PRIO_LOW;
59  m_thread = NdbThread_Create(NDBT_Thread_run,
60  (void**)this, stacksize, name, prio);
61  assert(m_thread != 0);
62 }
63 
64 NDBT_Thread::~NDBT_Thread()
65 {
66  if (m_thread != 0) {
67  NdbThread_Destroy(&m_thread);
68  m_thread = 0;
69  }
70  if (m_cond != 0) {
71  NdbCondition_Destroy(m_cond);
72  m_cond = 0;
73  }
74  if (m_mutex != 0) {
75  NdbMutex_Destroy(m_mutex);
76  m_mutex = 0;
77  }
78 }
79 
80 void*
81 NDBT_Thread_run(void* arg)
82 {
83  assert(arg != 0);
84  NDBT_Thread& thr = *(NDBT_Thread*)arg;
85  assert(thr.m_magic == NDBT_Thread::Magic);
86  thr.run();
87  return 0;
88 }
89 
90 void
91 NDBT_Thread::run()
92 {
93  while (1) {
94  lock();
95  while (m_state != Start && m_state != Exit) {
96  wait();
97  }
98  if (m_state == Exit) {
99  unlock();
100  break;
101  }
102  (*m_func)(*this);
103  m_state = Stop;
104  signal();
105  unlock();
106  }
107 }
108 
109 // methods for main process
110 
111 void
112 NDBT_Thread::start()
113 {
114  lock();
115  m_state = Start;
116  signal();
117  unlock();
118 }
119 
120 void
121 NDBT_Thread::stop()
122 {
123  lock();
124  while (m_state != Stop)
125  wait();
126  m_state = Wait;
127  unlock();
128 }
129 
130 void
131 NDBT_Thread::exit()
132 {
133  lock();
134  m_state = Exit;
135  signal();
136  unlock();
137 }
138 
139 void
140 NDBT_Thread::join()
141 {
142  NdbThread_WaitFor(m_thread, &m_status);
143  m_thread = 0;
144 }
145 
146 int
147 NDBT_Thread::connect(class Ndb_cluster_connection* ncc, const char* db)
148 {
149  m_ndb = new Ndb(ncc, db);
150  if (m_ndb->init() == -1 ||
151  m_ndb->waitUntilReady() == -1) {
152  m_err = m_ndb->getNdbError().code;
153  return -1;
154  }
155  return 0;
156 }
157 
158 void
159 NDBT_Thread::disconnect()
160 {
161  delete m_ndb;
162  m_ndb = 0;
163 }
164 
165 // set of threads
166 
167 NDBT_ThreadSet::NDBT_ThreadSet(int count)
168 {
169  m_count = count;
170  m_thread = new NDBT_Thread* [count];
171  for (int n = 0; n < count; n++) {
172  m_thread[n] = new NDBT_Thread(this, n);
173  }
174 }
175 
176 NDBT_ThreadSet::~NDBT_ThreadSet()
177 {
178  for (int n = 0; n < m_count; n++) {
179  delete m_thread[n];
180  m_thread[n] = 0;
181  }
182  delete [] m_thread;
183 }
184 
185 void
186 NDBT_ThreadSet::start()
187 {
188  for (int n = 0; n < m_count; n++) {
189  NDBT_Thread& thr = *m_thread[n];
190  thr.start();
191  }
192 }
193 
194 void
195 NDBT_ThreadSet::stop()
196 {
197  for (int n = 0; n < m_count; n++) {
198  NDBT_Thread& thr = *m_thread[n];
199  thr.stop();
200  }
201 }
202 
203 void
204 NDBT_ThreadSet::exit()
205 {
206  for (int n = 0; n < m_count; n++) {
207  NDBT_Thread& thr = *m_thread[n];
208  thr.exit();
209  }
210 }
211 
212 void
213 NDBT_ThreadSet::join()
214 {
215  for (int n = 0; n < m_count; n++) {
216  NDBT_Thread& thr = *m_thread[n];
217  thr.join();
218  }
219 }
220 
221 void
222 NDBT_ThreadSet::set_func(NDBT_ThreadFunc* func)
223 {
224  for (int n = 0; n < m_count; n++) {
225  NDBT_Thread& thr = *m_thread[n];
226  thr.set_func(func);
227  }
228 }
229 
230 void
231 NDBT_ThreadSet::set_input(const void* input)
232 {
233  for (int n = 0; n < m_count; n++) {
234  NDBT_Thread& thr = *m_thread[n];
235  thr.set_input(input);
236  }
237 }
238 
239 void
240 NDBT_ThreadSet::delete_output()
241 {
242  for (int n = 0; n < m_count; n++) {
243  if (m_thread[n] != 0) {
244  //NDBT_Thread& thr = *m_thread[n];
245  //thr.delete_output();
246  }
247  }
248 }
249 
250 int
251 NDBT_ThreadSet::connect(class Ndb_cluster_connection* ncc, const char* db)
252 {
253  for (int n = 0; n < m_count; n++) {
254  assert(m_thread[n] != 0);
255  NDBT_Thread& thr = *m_thread[n];
256  if (thr.connect(ncc, db) == -1)
257  return -1;
258  }
259  return 0;
260 }
261 
262 void
263 NDBT_ThreadSet::disconnect()
264 {
265  for (int n = 0; n < m_count; n++) {
266  if (m_thread[n] != 0) {
267  NDBT_Thread& thr = *m_thread[n];
268  thr.disconnect();
269  }
270  }
271 }
272 
273 int
274 NDBT_ThreadSet::get_err() const
275 {
276  for (int n = 0; n < m_count; n++) {
277  if (m_thread[n] != 0) {
278  NDBT_Thread& thr = *m_thread[n];
279  int err = thr.get_err();
280  if (err != 0)
281  return err;
282  }
283  }
284  return 0;
285 }