MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
NDBT_Thread.hpp
1 /*
2  Copyright (C) 2007 MySQL AB, 2009 Sun Microsystems, Inc.
3  All rights reserved. Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #ifndef NDB_THREAD_HPP
20 #define NDB_THREAD_HPP
21 
22 #include <NdbMutex.h>
23 #include <NdbCondition.h>
24 #include <NdbThread.h>
25 
// Thread entry point, declared with C linkage.
// Call chain: NDBT_Thread ctor -> NDBT_Thread_run -> thr.run()
// NOTE(review): arg is presumably the NDBT_Thread* being started -
// confirm against the definition.
extern "C" void* NDBT_Thread_run(void* arg);
30 
// Type of the function a thread runs: takes a reference to an
// NDBT_Thread and returns nothing.
class NDBT_Thread;

typedef void NDBT_ThreadFunc(NDBT_Thread&);
34 
35 /*
36  * NDBT_Thread
37  *
38  * Represents a thread. The thread pauses at startup.
39  * Main process sets a function to run. When the function
40  * returns, the thread pauses again to wait for a command.
41  * This allows main process to sync with the thread and
42  * exchange data with it.
43  *
44  * Input to thread is typically options. The input area
45  * is read-only in the thread. Output from thread is
46  * results such as statistics. Error code is handled
47  * separately.
48  *
49  * Pointer to Ndb object and method to create it are
50  * provided for convenience.
51  */
52 
53 class NDBT_ThreadSet;
54 
55 class NDBT_Thread {
56 public:
57  NDBT_Thread();
58  NDBT_Thread(NDBT_ThreadSet* thread_set, int thread_no);
59  void create(NDBT_ThreadSet* thread_set, int thread_no);
60  ~NDBT_Thread();
61 
62  // if part of a set
63  inline NDBT_ThreadSet& get_thread_set() const {
64  assert(m_thread_set != 0);
65  return *m_thread_set;
66  }
67  inline int get_thread_no() const {
68  return m_thread_no;
69  }
70 
71  // { Wait -> Start -> Stop }+ -> Exit
72  enum State {
73  Wait = 1, // wait for command
74  Start, // run current function
75  Stop, // stopped (paused) when current function done
76  Exit // exit thread
77  };
78 
79  // tell thread to start running current function
80  void start();
81  // wait for thread to stop when function is done
82  void stop();
83  // tell thread to exit
84  void exit();
85  // collect thread after exit
86  void join();
87 
88  // set function to run
89  inline void set_func(NDBT_ThreadFunc* func) {
90  m_func = func;
91  }
92 
93  // input area
94  inline void set_input(const void* input) {
95  m_input = input;
96  }
97  inline const void* get_input() const {
98  return m_input;
99  }
100 
101  // output area
102  inline void set_output(void* output) {
103  m_output = output;
104  }
105  inline void* get_output() const {
106  return m_output;
107  }
108  template <class T> inline void set_output() {
109  set_output(new T);
110  }
111 #if 0
112  inline void delete_output() {
113  delete m_output;
114  m_output = 0;
115  }
116 #endif
117 
118  // thread-specific Ndb object
119  inline class Ndb* get_ndb() const {
120  return m_ndb;
121  }
122  int connect(class Ndb_cluster_connection*, const char* db = "TEST_DB");
123  void disconnect();
124 
125  // error code (OS, Ndb, other)
126  void clear_err() {
127  m_err = 0;
128  }
129  void set_err(int err) {
130  m_err = err;
131  }
132  int get_err() const {
133  return m_err;
134  }
135 
136 private:
137  friend class NDBT_ThreadSet;
138  friend void* NDBT_Thread_run(void* arg);
139 
140  enum { Magic = 0xabacadae };
141  Uint32 m_magic;
142 
143  State m_state;
144  NDBT_ThreadSet* m_thread_set;
145  int m_thread_no;
146 
147  NDBT_ThreadFunc* m_func;
148  const void* m_input;
149  void* m_output;
150  class Ndb* m_ndb;
151  int m_err;
152 
153  // run the thread
154  void run();
155 
156  void lock() {
157  NdbMutex_Lock(m_mutex);
158  }
159  void unlock() {
160  NdbMutex_Unlock(m_mutex);
161  }
162 
163  void wait() {
164  NdbCondition_Wait(m_cond, m_mutex);
165  }
166  void signal() {
167  NdbCondition_Signal(m_cond);
168  }
169 
170  NdbMutex* m_mutex;
171  NdbCondition* m_cond;
172  NdbThread* m_thread;
173  void* m_status;
174 };
175 
176 /*
177  * A set of threads, indexed from 0 to count-1. Methods
178  * are applied to each thread (serially). Input area is
179  * common to all threads. Output areas are allocated
180  * separately according to a template class.
181  */
182 
184 public:
185  NDBT_ThreadSet(int count);
186  ~NDBT_ThreadSet();
187 
188  inline int get_count() const {
189  return m_count;
190  }
191  inline NDBT_Thread& get_thread(int n) {
192  assert(n < m_count && m_thread[n] != 0);
193  return *m_thread[n];
194  }
195 
196  // tell each thread to start running
197  void start();
198  // wait for each thread to stop
199  void stop();
200  // tell each thread to exit
201  void exit();
202  // collect each thread after exit
203  void join();
204 
205  // set function to run in each thread
206  void set_func(NDBT_ThreadFunc* func);
207 
208  // set input area (same instance in each thread)
209  void set_input(const void* input);
210 
211  // set output areas
212  template <class T> inline void set_output() {
213  for (int n = 0; n < m_count; n++) {
214  NDBT_Thread& thr = *m_thread[n];
215  thr.set_output<T>();
216  }
217  }
218  void delete_output();
219 
220  // thread-specific Ndb objects
221  int connect(class Ndb_cluster_connection*, const char* db = "TEST_DB");
222  void disconnect();
223 
224  int get_err() const;
225 
226 private:
227  int m_count;
228  NDBT_Thread** m_thread;
229 };
230 
231 #endif