MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
NdbThread.c
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 
19 #include <ndb_global.h>
20 #include <NdbThread.h>
21 #include <my_pthread.h>
22 #include <NdbMem.h>
23 #include <NdbMutex.h>
24 #include <NdbCondition.h>
25 
26 #ifdef HAVE_LINUX_SCHEDULING
27 #ifndef _GNU_SOURCE
28 #define _GNU_SOURCE
29 #endif
30 #include <sys/types.h>
31 #include <unistd.h>
32 #include <sched.h>
33 #include <sys/syscall.h>
34 #elif defined HAVE_SOLARIS_AFFINITY
35 #include <sys/types.h>
36 #include <sys/lwp.h>
37 #include <sys/processor.h>
38 #include <sys/procset.h>
39 #endif
40 
41 static int g_min_prio = 0;
42 static int g_max_prio = 0;
43 static int g_prio = 0;
44 
45 static NdbMutex *g_ndb_thread_mutex = 0;
46 static struct NdbCondition * g_ndb_thread_condition = 0;
47 
48 #ifdef NDB_SHM_TRANSPORTER
49 int g_ndb_shm_signum= 0;
50 #endif
51 
52 static int f_high_prio_set = 0;
53 static int f_high_prio_policy;
54 static int f_high_prio_prio;
55 
56 struct NdbThread
57 {
58  volatile int inited;
59  pthread_t thread;
60 #if defined HAVE_SOLARIS_AFFINITY
61  id_t tid;
62 #elif defined HAVE_LINUX_SCHEDULING
63  pid_t tid;
64 #endif
65  char thread_name[16];
66  NDB_THREAD_FUNC * func;
67  void * object;
68 };
69 
#ifdef NDB_SHM_TRANSPORTER
/*
  Block or unblock the signal numbered g_ndb_shm_signum for the calling
  thread. Does nothing while g_ndb_shm_signum is 0.

  @param block  non-zero: add the signal to the thread's mask (SIG_BLOCK);
                zero: remove it from the mask (SIG_UNBLOCK)
*/
void NdbThread_set_shm_sigmask(my_bool block)
{
  sigset_t shm_signals;

  if (!g_ndb_shm_signum)
    return;

  sigemptyset(&shm_signals);
  sigaddset(&shm_signals, g_ndb_shm_signum);
  pthread_sigmask(block ? SIG_BLOCK : SIG_UNBLOCK, &shm_signals, 0);
}
#endif
86 
/*
  Store the OS-level id of the calling thread in thr->tid.

  Solaris builds use _lwp_self(); Linux builds use the gettid system call.
  On platforms with neither macro defined the record is left untouched.
*/
static
void
settid(struct NdbThread * thr)
{
#if defined HAVE_SOLARIS_AFFINITY
  thr->tid = _lwp_self();
#elif defined HAVE_LINUX_SCHEDULING
  pid_t kernel_tid = syscall(SYS_gettid);
  if (kernel_tid == (pid_t)-1)
  {
    /*
      Fallback suggested by Kristian Nielsen: covers binaries built
      against NPTL threads that end up running on LinuxThreads.
    */
    kernel_tid = getpid();
  }
  thr->tid = kernel_tid;
#endif
}
106 
/*
  Return the OS-level thread id recorded in thr.

  @param thr  thread record; dereferenced only on platforms that store a tid
  @return the stored tid converted to int, or -1 on platforms where no tid
          is recorded
*/
int
NdbThread_GetTid(struct NdbThread* thr)
{
#if defined HAVE_SOLARIS_AFFINITY || defined HAVE_LINUX_SCHEDULING
  return (int)thr->tid;
#else
  (void)thr;
  return -1;
#endif
}
117 
118 static
119 void*
120 ndb_thread_wrapper(void* _ss){
121  my_thread_init();
122  {
123  DBUG_ENTER("ndb_thread_wrapper");
124 #ifdef NDB_SHM_TRANSPORTER
125  NdbThread_set_shm_sigmask(TRUE);
126 #endif
127 
128 #ifdef HAVE_PTHREAD_SIGMASK
129  {
134  sigset_t mask;
135  sigfillset(&mask);
136  pthread_sigmask(SIG_BLOCK, &mask, 0);
137  }
138 #endif
139 
140  {
141  void *ret;
142  struct NdbThread * ss = (struct NdbThread *)_ss;
143  settid(ss);
144  NdbMutex_Lock(g_ndb_thread_mutex);
145  ss->inited = 1;
146  NdbCondition_Signal(g_ndb_thread_condition);
147  NdbMutex_Unlock(g_ndb_thread_mutex);
148  ret= (* ss->func)(ss->object);
149  DBUG_POP();
150  NdbThread_Exit(ret);
151  }
152  /* will never be reached */
153  DBUG_RETURN(0);
154  }
155 }
156 
157 
158 struct NdbThread*
159 NdbThread_CreateObject(const char * name)
160 {
161  struct NdbThread* tmpThread;
162  DBUG_ENTER("NdbThread_Create");
163 
164  tmpThread = (struct NdbThread*)NdbMem_Allocate(sizeof(struct NdbThread));
165  if (tmpThread == NULL)
166  DBUG_RETURN(NULL);
167 
168  bzero(tmpThread, sizeof(* tmpThread));
169  if (name)
170  {
171  strnmov(tmpThread->thread_name, name, sizeof(tmpThread->thread_name));
172  }
173  else
174  {
175  strnmov(tmpThread->thread_name, "main", sizeof(tmpThread->thread_name));
176  }
177 
178 #ifdef HAVE_PTHREAD_SELF
179  tmpThread->thread = pthread_self();
180 #elif defined HAVE_GETPID
181  tmpThread->thread = getpid();
182 #endif
183  settid(tmpThread);
184  tmpThread->inited = 1;
185 
186  return tmpThread;
187 }
188 
189 struct NdbThread*
190 NdbThread_Create(NDB_THREAD_FUNC *p_thread_func,
191  NDB_THREAD_ARG *p_thread_arg,
192  const NDB_THREAD_STACKSIZE _stack_size,
193  const char* p_thread_name,
194  NDB_THREAD_PRIO thread_prio)
195 {
196  struct NdbThread* tmpThread;
197  int result;
198  pthread_attr_t thread_attr;
199  NDB_THREAD_STACKSIZE thread_stack_size;
200 
201  DBUG_ENTER("NdbThread_Create");
202 
203  /* Use default stack size if 0 specified */
204  if (_stack_size == 0)
205  thread_stack_size = 64 * 1024 * SIZEOF_CHARP/4;
206  else
207  thread_stack_size = _stack_size * SIZEOF_CHARP/4;
208 
209  (void)thread_prio; /* remove warning for unused parameter */
210 
211  if (p_thread_func == NULL)
212  DBUG_RETURN(NULL);
213 
214  tmpThread = (struct NdbThread*)NdbMem_Allocate(sizeof(struct NdbThread));
215  if (tmpThread == NULL)
216  DBUG_RETURN(NULL);
217 
218  DBUG_PRINT("info",("thread_name: %s", p_thread_name));
219 
220  strnmov(tmpThread->thread_name,p_thread_name,sizeof(tmpThread->thread_name));
221 
222  pthread_attr_init(&thread_attr);
223 #ifdef PTHREAD_STACK_MIN
224  if (thread_stack_size < PTHREAD_STACK_MIN)
225  thread_stack_size = PTHREAD_STACK_MIN;
226 #endif
227  DBUG_PRINT("info", ("stack_size: %llu", (ulonglong)thread_stack_size));
228  pthread_attr_setstacksize(&thread_attr, thread_stack_size);
229 #ifdef USE_PTHREAD_EXTRAS
230  /* Guard stack overflow with a 2k databuffer */
231  pthread_attr_setguardsize(&thread_attr, 2048);
232 #endif
233 
234 #ifdef PTHREAD_CREATE_JOINABLE /* needed on SCO */
235  pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE);
236 #endif
237  tmpThread->inited = 0;
238  tmpThread->func= p_thread_func;
239  tmpThread->object= p_thread_arg;
240 
241  NdbMutex_Lock(g_ndb_thread_mutex);
242  result = pthread_create(&tmpThread->thread,
243  &thread_attr,
244  ndb_thread_wrapper,
245  tmpThread);
246 
247  pthread_attr_destroy(&thread_attr);
248 
249  if (result != 0)
250  {
251  NdbMem_Free((char *)tmpThread);
252  NdbMutex_Unlock(g_ndb_thread_mutex);
253  return 0;
254  }
255 
256  if (thread_prio == NDB_THREAD_PRIO_HIGH && f_high_prio_set)
257  {
258 #ifdef HAVE_PTHREAD_SETSCHEDPARAM
259  struct sched_param param;
260  bzero(&param, sizeof(param));
261  param.sched_priority = f_high_prio_prio;
262  if (pthread_setschedparam(tmpThread->thread, f_high_prio_policy, &param))
263  perror("pthread_setschedparam failed");
264 #endif
265  }
266 
267  do
268  {
269  NdbCondition_WaitTimeout(g_ndb_thread_condition, g_ndb_thread_mutex, 100);
270  } while (tmpThread->inited == 0);
271 
272  NdbMutex_Unlock(g_ndb_thread_mutex);
273 
274  DBUG_PRINT("exit",("ret: 0x%lx", (long) tmpThread));
275  DBUG_RETURN(tmpThread);
276 }
277 
278 
279 void NdbThread_Destroy(struct NdbThread** p_thread)
280 {
281  DBUG_ENTER("NdbThread_Destroy");
282  if (*p_thread != NULL){
283  DBUG_PRINT("enter",("*p_thread: 0x%lx", (long) *p_thread));
284  free(* p_thread);
285  * p_thread = 0;
286  }
287  DBUG_VOID_RETURN;
288 }
289 
290 
291 int NdbThread_WaitFor(struct NdbThread* p_wait_thread, void** status)
292 {
293  int result;
294 
295  if (p_wait_thread == NULL)
296  return 0;
297 
298  if (p_wait_thread->thread == 0)
299  return 0;
300 
301  result = pthread_join(p_wait_thread->thread, status);
302 
303  return result;
304 }
305 
306 
/*
  Terminate the calling thread.

  Calls my_thread_end() first and then pthread_exit().

  @param status  exit value handed to pthread_exit()
*/
void NdbThread_Exit(void *status)
{
  my_thread_end();
  pthread_exit(status);
}
312 
313 
/*
  Forward a concurrency hint to pthread_setconcurrency() on builds with
  USE_PTHREAD_EXTRAS; a no-op elsewhere.

  @param level  requested concurrency level
  @return the pthread_setconcurrency() result, or 0 when the call is not
          compiled in
*/
int NdbThread_SetConcurrencyLevel(int level)
{
  int rc = 0;
#ifdef USE_PTHREAD_EXTRAS
  rc = pthread_setconcurrency(level);
#else
  (void)level; /* parameter unused in this configuration */
#endif
  return rc;
}
323 
/*
  Highest priority usable with the given scheduling policy.

  @param policy  scheduling policy to query
  @return sched_get_priority_max(policy) where available, otherwise 90
*/
static int
get_max_prio(int policy)
{
#ifdef HAVE_SCHED_GET_PRIORITY_MAX
  return sched_get_priority_max(policy);
#else
  /*
    Fallback that should rarely be needed. Linux real-time priorities
    span 1..100, so 90 is picked mostly with Linux in mind.
  */
  (void)policy;
  return 90;
#endif
}
339 
/*
  Lowest priority usable with the given scheduling policy.

  @param policy  scheduling policy to query
  @return sched_get_priority_min(policy) where available, otherwise 1
*/
static int
get_min_prio(int policy)
{
#ifdef HAVE_SCHED_GET_PRIORITY_MIN
  return sched_get_priority_min(policy);
#else
  /* 1 seems like a natural minimum priority level */
  (void)policy;
  return 1;
#endif
}
352 
353 static int
354 get_prio(my_bool rt_prio, my_bool high_prio, int policy)
355 {
356  if (!rt_prio)
357  return 0;
358  if (g_prio != 0)
359  return g_prio;
360  g_max_prio = get_max_prio(policy);
361  g_min_prio = get_min_prio(policy);
362  /*
363  We need to distinguish between high and low priority threads. High
364  priority threads are the threads that don't execute the main thread.
365  It's important that these threads are at a higher priority than the
366  main thread since the main thread can execute for a very long time.
367  There are no reasons to put these priorities higher than the lowest
368  priority and the distance between them being two to enable for future
369  extensions where a new priority level is required.
370  */
371  if (high_prio)
372  g_prio = g_min_prio + 3;
373  else
374  g_prio = g_min_prio + 1;
375  if (g_prio < g_min_prio)
376  g_prio = g_min_prio;
377  return g_prio;
378 }
379 
380 int
381 NdbThread_SetScheduler(struct NdbThread* pThread,
382  my_bool rt_prio,
383  my_bool high_prio)
384 {
385  int error_no= 0;
386 #if defined HAVE_LINUX_SCHEDULING
387  int ret, policy, prio;
388  struct sched_param loc_sched_param;
389  if (rt_prio)
390  {
391  policy = SCHED_RR;
392  prio = get_prio(rt_prio, high_prio, policy);
393  }
394  else
395  {
396  policy = SCHED_OTHER;
397  prio = 0;
398  }
399  bzero(&loc_sched_param, sizeof(loc_sched_param));
400  loc_sched_param.sched_priority = prio;
401  ret= sched_setscheduler(pThread->tid, policy, &loc_sched_param);
402  if (ret)
403  error_no= errno;
404 #elif defined HAVE_PTHREAD_SET_SCHEDPARAM
405  /*
406  This variant is POSIX compliant so should be useful on most
407  Operating Systems supporting real-time scheduling.
408  */
409  int ret, policy, prio;
410  struct sched_param loc_sched_param;
411  if (rt_prio)
412  {
413  policy = SCHED_RR;
414  prio = get_prio(rt_prio, high_prio, policy);
415  }
416  else
417  {
418  policy = SCHED_OTHER;
419  prio = 0;
420  }
421  bzero(&loc_sched_param, sizeof(loc_sched_param));
422  loc_sched_param.sched_priority = prio;
423  ret= pthread_setschedparam(pThread->thread, policy, &loc_sched_param);
424  if (ret)
425  error_no= errno;
426 #else
427  error_no = ENOSYS;
428 #endif
429 
430  return error_no;
431 }
432 
433 int
434 NdbThread_LockCPU(struct NdbThread* pThread, Uint32 cpu_id)
435 {
436  int error_no = 0;
437 #if defined HAVE_LINUX_SCHEDULING
438 
439  /*
440  On recent Linux versions the ability to set processor
441  affinity is available through the sched_setaffinity call.
442  In Linux this is possible to do on thread level so we can
443  lock execution thread to one CPU and the rest of the threads
444  to another CPU.
445 
446  By combining Real-time Scheduling and Locking to CPU we can
447  achieve more or less a realtime system for NDB Cluster.
448  */
449  int ret;
450  cpu_set_t cpu_set;
451  CPU_ZERO(&cpu_set);
452  CPU_SET(cpu_id, &cpu_set);
453  ret= sched_setaffinity(pThread->tid, sizeof(cpu_set), &cpu_set);
454  if (ret)
455  error_no = errno;
456 #elif defined HAVE_SOLARIS_AFFINITY
457  /*
458  Solaris have a number of versions to lock threads to CPU's.
459  We'll use the processor_bind interface since we only work
460  with single threads and bind those to CPU's.
461  A bit unclear as whether the id returned by pthread_self
462  is the LWP id.
463  */
464  int ret;
465  ret= processor_bind(P_LWPID, pThread->tid, cpu_id, NULL);
466  if (ret)
467  error_no= errno;
468 #else
469  error_no = ENOSYS;
470 #endif
471  return error_no;
472 }
473 
474 static pthread_key(void*, tls_keys[NDB_THREAD_TLS_MAX]);
475 
476 void *NdbThread_GetTlsKey(NDB_THREAD_TLS key)
477 {
478  return pthread_getspecific(tls_keys[key]);
479 }
480 
481 void NdbThread_SetTlsKey(NDB_THREAD_TLS key, void *value)
482 {
483  pthread_setspecific(tls_keys[key], value);
484 }
485 
486 int
487 NdbThread_Init()
488 {
489  g_ndb_thread_mutex = NdbMutex_Create();
490  g_ndb_thread_condition = NdbCondition_Create();
491  pthread_key_create(&(tls_keys[NDB_THREAD_TLS_JAM]), NULL);
492  pthread_key_create(&(tls_keys[NDB_THREAD_TLS_THREAD]), NULL);
493  return 0;
494 }
495 
496 void
497 NdbThread_End()
498 {
499  if (g_ndb_thread_mutex)
500  {
501  NdbMutex_Destroy(g_ndb_thread_mutex);
502  }
503 
504  if (g_ndb_thread_condition)
505  {
506  NdbCondition_Destroy(g_ndb_thread_condition);
507  }
508 }
509 
510 int
511 NdbThread_SetHighPrioProperties(const char * spec)
512 {
513  char * copy = 0;
514  char * prio = 0;
515  int found = 0;
516 
517  if (spec == 0)
518  {
519  f_high_prio_set = 0;
520  return 0;
521  }
522 
526  while ((* spec == ' ') || (*spec == '\t'))
527  spec++;
528 
529  copy = strdup(spec);
530  if (copy == 0)
531  return -1;
532 
536  prio = strchr(copy, ',');
537  if (prio)
538  {
539  * prio = 0;
540  prio++;
541  }
542 
543  if (prio && strchr(prio, ','))
544  {
548  free(copy);
549  return -1;
550  }
551 
552 #ifdef HAVE_PTHREAD_SETSCHEDPARAM
553  found = 0;
554 #ifdef SCHED_FIFO
555  if (strcmp("fifo", copy) == 0)
556  {
557  found = 1;
558  f_high_prio_policy = SCHED_FIFO;
559  }
560 #endif
561 #ifdef SCHED_RR
562  if (strcmp("rr", copy) == 0)
563  {
564  found = 1;
565  f_high_prio_policy = SCHED_RR;
566  }
567 #endif
568  if (!found)
569  {
570  free(copy);
571  return -1;
572  }
573 
574  f_high_prio_prio = 50;
575  if (prio)
576  {
577  char * endptr = 0;
578  long p = strtol(prio, &endptr, 10);
579  if (prio == endptr)
580  {
581  free(copy);
582  return -1;
583  }
584  f_high_prio_prio = (int)p;
585  }
586  f_high_prio_set = 1;
587  free(copy);
588  return 0;
589 #else
590  return 0;
591 #endif
592 }