MySQL 5.6.14 Source Code Document
thread.c
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 * Thread management for memcached.
 */
#include "config.h"
#include "memcached.h"
#include <assert.h>
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <signal.h>
#include <pthread.h>
#include <fcntl.h>

#define ITEMS_PER_ALLOC 64

static char devnull[8192];
extern volatile sig_atomic_t memcached_shutdown;

/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    SOCKET sfd;
    STATE_FUNC init_state;
    int event_flags;
    int read_buffer_size;
    enum network_transport transport;
    CQ_ITEM *next;
};

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
    pthread_cond_t cond;
};

/* Connection lock around accepting new connections */
pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;

/* Lock for global stats */
static pthread_mutex_t stats_lock;

/* Free list of CQ_ITEM structs */
static CQ_ITEM *cqi_freelist;
static pthread_mutex_t cqi_freelist_lock;

static LIBEVENT_THREAD dispatcher_thread;

/*
 * Each libevent instance has a wakeup pipe, which other threads
 * can use to signal that they've put a new connection on its queue.
 */
static int nthreads;
static LIBEVENT_THREAD *threads;
static pthread_t *thread_ids;
LIBEVENT_THREAD *tap_thread;

/*
 * Number of worker threads that have finished setting themselves up.
 */
static int init_count = 0;
static pthread_mutex_t init_lock;
static pthread_cond_t init_cond;

static void thread_libevent_process(int fd, short which, void *arg);
static void libevent_tap_process(int fd, short which, void *arg);

/*
 * Initializes a connection queue.
 */
static void cq_init(CQ *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    pthread_cond_init(&cq->cond, NULL);
    cq->head = NULL;
    cq->tail = NULL;
}

/*
 * Looks for an item on a connection queue, but doesn't block if there isn't
 * one.
 * Returns the item, or NULL if no item is available.
 */
static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *item;

    pthread_mutex_lock(&cq->lock);
    item = cq->head;
    if (NULL != item) {
        cq->head = item->next;
        if (NULL == cq->head)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);

    return item;
}

/*
 * Adds an item to a connection queue.
 */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    item->next = NULL;

    pthread_mutex_lock(&cq->lock);
    if (NULL == cq->tail)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_cond_signal(&cq->cond);
    pthread_mutex_unlock(&cq->lock);
}
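
/*
 * Note: nothing in this file ever waits on cq->cond; cq_pop() is
 * non-blocking. The signal on every push appears to be a leftover from a
 * blocking-pop variant and is benign overhead with the current callers.
 */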

/*
 * Returns a fresh connection queue item.
 */
static CQ_ITEM *cqi_new(void) {
    CQ_ITEM *item = NULL;
    pthread_mutex_lock(&cqi_freelist_lock);
    if (cqi_freelist) {
        item = cqi_freelist;
        cqi_freelist = item->next;
    }
    pthread_mutex_unlock(&cqi_freelist_lock);

    if (NULL == item) {
        int i;

        /* Allocate a bunch of items at once to reduce fragmentation */
        item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
        if (NULL == item)
            return NULL;

        /*
         * Link together all the new items except the first one
         * (which we'll return to the caller) for placement on
         * the freelist.
         */
        for (i = 2; i < ITEMS_PER_ALLOC; i++)
            item[i - 1].next = &item[i];

        pthread_mutex_lock(&cqi_freelist_lock);
        item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
        cqi_freelist = &item[1];
        pthread_mutex_unlock(&cqi_freelist_lock);
    }

    return item;
}
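
/*
 * After a batch allocation above, item[0] goes to the caller while
 * item[1] .. item[ITEMS_PER_ALLOC - 1] form a chain that is spliced onto
 * the head of cqi_freelist. Nothing in this file ever returns items to
 * the allocator; the freelist only grows.
 */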

/*
 * Frees a connection queue item (adds it to the freelist.)
 */
static void cqi_free(CQ_ITEM *item) {
    pthread_mutex_lock(&cqi_freelist_lock);
    item->next = cqi_freelist;
    cqi_freelist = item;
    pthread_mutex_unlock(&cqi_freelist_lock);
}

/*
 * Creates a worker thread.
 */
static void create_worker(void *(*func)(void *), void *arg, pthread_t *id) {
    pthread_attr_t attr;
    int ret;

    pthread_attr_init(&attr);

    if ((ret = pthread_create(id, &attr, func, arg)) != 0) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Can't create thread: %s\n",
                                        strerror(ret));
        exit(1);
    }
}

/****************************** LIBEVENT THREADS *****************************/

bool create_notification_pipe(LIBEVENT_THREAD *me)
{
    if (evutil_socketpair(SOCKETPAIR_AF, SOCK_STREAM, 0,
                          (void*)me->notify) == SOCKET_ERROR) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Can't create notify pipe: %s",
                                        strerror(errno));
        return false;
    }

    for (int j = 0; j < 2; ++j) {
        int flags = 1;
        setsockopt(me->notify[j], IPPROTO_TCP,
                   TCP_NODELAY, (void *)&flags, sizeof(flags));
        setsockopt(me->notify[j], SOL_SOCKET,
                   SO_REUSEADDR, (void *)&flags, sizeof(flags));

        if (evutil_make_socket_nonblocking(me->notify[j]) == -1) {
            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                            "Failed to enable non-blocking: %s",
                                            strerror(errno));
            return false;
        }
    }
    return true;
}
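
/*
 * The notification "pipe" is really a socket pair: notify_thread() sends
 * a single byte to notify[1], and the owning thread's libevent callback
 * drains notify[0] into the devnull buffer before processing its queues.
 * The TCP_NODELAY and SO_REUSEADDR options only have an effect on
 * platforms where SOCKETPAIR_AF is an inet family (such as Windows);
 * their return values are ignored.
 */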

static void setup_dispatcher(struct event_base *main_base,
                             void (*dispatcher_callback)(int, short, void *))
{
    memset(&dispatcher_thread, 0, sizeof(dispatcher_thread));
    dispatcher_thread.type = DISPATCHER;
    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();
    if (!create_notification_pipe(&dispatcher_thread)) {
        exit(1);
    }
    /* Listen for notifications from other threads */
    event_set(&dispatcher_thread.notify_event, dispatcher_thread.notify[0],
              EV_READ | EV_PERSIST, dispatcher_callback, &dispatcher_callback);
    event_base_set(dispatcher_thread.base, &dispatcher_thread.notify_event);

    if (event_add(&dispatcher_thread.notify_event, 0) == -1) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Can't monitor libevent notify pipe\n");
        exit(1);
    }
}
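
/*
 * Note: the argument bound to the notify event above is the address of
 * the dispatcher_callback parameter itself, so the callback presumably
 * ignores its void* argument. The worker and TAP threads, by contrast,
 * receive their LIBEVENT_THREAD pointer (see setup_thread below).
 */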

/*
 * Set up a thread's information.
 */
static void setup_thread(LIBEVENT_THREAD *me, bool tap) {
    me->type = tap ? TAP : GENERAL;
    me->base = event_init();
    if (! me->base) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Can't allocate event base\n");
        exit(1);
    }

    /* Listen for notifications from other threads */
    event_set(&me->notify_event, me->notify[0],
              EV_READ | EV_PERSIST,
              tap ? libevent_tap_process : thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    if (!tap) {
        me->new_conn_queue = malloc(sizeof(struct conn_queue));
        if (me->new_conn_queue == NULL) {
            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                            "Failed to allocate memory for connection queue");
            exit(EXIT_FAILURE);
        }
        cq_init(me->new_conn_queue);
    }

    if ((pthread_mutex_init(&me->mutex, NULL) != 0)) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Failed to initialize mutex: %s\n",
                                        strerror(errno));
        exit(EXIT_FAILURE);
    }

    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
                                    NULL, NULL);
    if (me->suffix_cache == NULL) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Failed to create suffix cache\n");
        exit(EXIT_FAILURE);
    }
}

/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; thread_init() will block until
     * all threads have finished initializing.
     */

    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);

    event_base_loop(me->base, 0);
    return NULL;
}
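
/*
 * Counts the number of times the connection c occurs on the given
 * singly-linked list. Callers only ever compare the result with zero,
 * so this doubles as a membership test.
 */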
int number_of_pending(conn *c, conn *list) {
    int rv = 0;
    for (; list; list = list->next) {
        if (list == c) {
            rv++;
        }
    }
    return rv;
}

/*
 * Processes an incoming "handle a new connection" item. This is called when
 * input arrives on the libevent wakeup pipe.
 */
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    assert(me->type == GENERAL);
    CQ_ITEM *item;

    if (recv(fd, devnull, sizeof(devnull), 0) == -1) {
        if (settings.verbose > 0) {
            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                            "Can't read from libevent pipe: %s\n",
                                            strerror(errno));
        }
    }

    if (memcached_shutdown) {
        event_base_loopbreak(me->base);
        return;
    }

    while ((item = cq_pop(me->new_conn_queue)) != NULL) {
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base,
                           NULL);
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                                "Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    settings.extensions.logger->log(EXTENSION_LOG_INFO, NULL,
                                                    "Can't listen for events on fd %d\n",
                                                    item->sfd);
                }
                closesocket(item->sfd);
            }
        } else {
            assert(c->thread == NULL);
            c->thread = me;
        }
        cqi_free(item);
    }

    pthread_mutex_lock(&me->mutex);
    conn* pending = me->pending_io;
    me->pending_io = NULL;
    pthread_mutex_unlock(&me->mutex);
    while (pending != NULL) {
        conn *c = pending;
        assert(me == c->thread);
        pending = pending->next;
        c->next = NULL;
        register_event(c, 0);
        /*
         * We don't want the thread to keep on serving all of the data
         * from the context of the notification pipe, so just let it
         * run one time to set up the correct mask in libevent
         */
        c->nevents = 1;
        /* c->nevents = settings.reqs_per_event; */
        while (c->state(c)) {
            /* do task */
        }
    }
}

extern volatile rel_time_t current_time;
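
/*
 * Floyd's tortoise-and-hare cycle detection: the slow pointer advances
 * one node per iteration while the fast pointer advances two; if the
 * list is circular the fast pointer eventually lands on the slow one.
 * Used only in assertions to validate the pending lists.
 */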
bool has_cycle(conn *c) {
    if (!c) {
        return false;
    }
    conn *slowNode, *fastNode1, *fastNode2;
    slowNode = fastNode1 = fastNode2 = c;
    while (slowNode && (fastNode1 = fastNode2->next) && (fastNode2 = fastNode1->next)) {
        if (slowNode == fastNode1 || slowNode == fastNode2) {
            return true;
        }
        slowNode = slowNode->next;
    }
    return false;
}

bool list_contains(conn *haystack, conn *needle) {
    for (; haystack; haystack = haystack->next) {
        if (needle == haystack) {
            return true;
        }
    }
    return false;
}
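
/*
 * Removes the first occurrence of needle from the haystack list
 * (recursively) and returns the new list head. The needle's next pointer
 * is cleared when it is found.
 */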
conn* list_remove(conn *haystack, conn *needle) {
    if (!haystack) {
        return NULL;
    }

    if (haystack == needle) {
        conn *rv = needle->next;
        needle->next = NULL;
        return rv;
    }

    haystack->next = list_remove(haystack->next, needle);

    return haystack;
}
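
/*
 * Detaches up to max_items - 1 connections from the list *l into the
 * dest array, clearing each next pointer and marking each connection
 * LIST_STATE_PROCESSING so that concurrent enlist_conn() calls record a
 * request flag instead of re-linking it. Returns the number moved.
 */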
size_t list_to_array(conn **dest, size_t max_items, conn **l) {
    size_t n_items = 0;
    for (; *l && n_items < max_items - 1; ++n_items) {
        dest[n_items] = *l;
        *l = dest[n_items]->next;
        dest[n_items]->next = NULL;
        dest[n_items]->list_state |= LIST_STATE_PROCESSING;
    }
    return n_items;
}
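
/*
 * Adds a connection to one of the owning thread's pending lists. If the
 * connection is currently being processed out of an array snapshot (see
 * list_to_array above), only a request flag is recorded; finalize_list()
 * performs the actual enlisting afterwards.
 */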
void enlist_conn(conn *c, conn **list) {
    LIBEVENT_THREAD *thr = c->thread;
    assert(list == &thr->pending_io || list == &thr->pending_close);
    if ((c->list_state & LIST_STATE_PROCESSING) == 0) {
        assert(!list_contains(thr->pending_close, c));
        assert(!list_contains(thr->pending_io, c));
        assert(c->next == NULL);
        c->next = *list;
        *list = c;
        assert(list_contains(*list, c));
        assert(!has_cycle(*list));
    } else {
        c->list_state |= (list == &thr->pending_io ?
                          LIST_STATE_REQ_PENDING_IO :
                          LIST_STATE_REQ_PENDING_CLOSE);
    }
}
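
/*
 * Clears the PROCESSING mark on every connection in the snapshot array
 * and re-enlists those that requested it while they were being
 * processed; connections whose socket has been closed are simply not
 * re-enlisted.
 */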
void finalize_list(conn **list, size_t items) {
    for (size_t i = 0; i < items; i++) {
        list[i]->list_state &= ~LIST_STATE_PROCESSING;
        if (list[i]->sfd != INVALID_SOCKET) {
            if (list[i]->list_state & LIST_STATE_REQ_PENDING_IO) {
                enlist_conn(list[i], &list[i]->thread->pending_io);
            } else if (list[i]->list_state & LIST_STATE_REQ_PENDING_CLOSE) {
                enlist_conn(list[i], &list[i]->thread->pending_close);
            }
        }
        list[i]->list_state = 0;
    }
}

static void libevent_tap_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    assert(me->type == TAP);

    if (recv(fd, devnull, sizeof(devnull), 0) == -1) {
        if (settings.verbose > 0) {
            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                            "Can't read from libevent pipe: %s\n",
                                            strerror(errno));
        }
    }

    if (memcached_shutdown) {
        event_base_loopbreak(me->base);
        return;
    }

    // Do we have pending closes?
    const size_t max_items = 256;
    LOCK_THREAD(me);
    conn *pending_close[max_items];
    size_t n_pending_close = 0;

    if (me->pending_close && me->last_checked != current_time) {
        assert(!has_cycle(me->pending_close));
        me->last_checked = current_time;

        n_pending_close = list_to_array(pending_close, max_items,
                                        &me->pending_close);
    }

    // Now copy the pending IO buffer and run them...
    conn *pending_io[max_items];
    size_t n_items = list_to_array(pending_io, max_items, &me->pending_io);

    UNLOCK_THREAD(me);
    for (size_t i = 0; i < n_items; ++i) {
        conn *c = pending_io[i];

        assert(c->thread == me);

        LOCK_THREAD(c->thread);
        assert(me == c->thread);
        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, NULL,
                                        "Processing tap pending_io for %d\n", c->sfd);

        UNLOCK_THREAD(me);
        register_event(c, NULL);
        /*
         * We don't want the thread to keep on serving all of the data
         * from the context of the notification pipe, so just let it
         * run one time to set up the correct mask in libevent
         */
        c->nevents = 1;
        c->which = EV_WRITE;
        while (c->state(c)) {
            /* do task */
        }
    }

    /* Close any connections pending close */
    if (n_pending_close > 0) {
        for (size_t i = 0; i < n_pending_close; ++i) {
            conn *ce = pending_close[i];
            if (ce->refcount == 1) {
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                                "OK, time to nuke: %p\n",
                                                (void*)ce);
                assert(ce->next == NULL);
                conn_close(ce);
            } else {
                LOCK_THREAD(me);
                enlist_conn(ce, &me->pending_close);
                UNLOCK_THREAD(me);
            }
        }
    }

    LOCK_THREAD(me);
    finalize_list(pending_io, n_items);
    finalize_list(pending_close, n_pending_close);
    UNLOCK_THREAD(me);
}

static bool is_thread_me(LIBEVENT_THREAD *thr) {
#ifdef __WIN32__
    pthread_t tid = pthread_self();
    return (tid.p == thr->thread_id.p && tid.x == thr->thread_id.x);
#else
    return pthread_self() == thr->thread_id;
#endif
}

void notify_io_complete(const void *cookie, ENGINE_ERROR_CODE status)
{
    if (cookie == NULL) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "notify_io_complete called without a valid cookie (status %x)\n",
                                        status);
        return;
    }

    struct conn *conn = (struct conn *)cookie;

    settings.extensions.logger->log(EXTENSION_LOG_DEBUG, NULL,
                                    "Got notify from %d, status %x\n",
                                    conn->sfd, status);

    /*
    ** TROND:
    ** I changed the logic for the tap connections so that the core
    ** issues the ON_DISCONNECT call to the engine instead of trying
    ** to close the connection. The core then gives the engine a grace
    ** period to call notify_io_complete; if the engine doesn't, the
    ** core goes ahead and kills the connection.
    */
    if (status == ENGINE_DISCONNECT && conn->thread == tap_thread) {
        LOCK_THREAD(conn->thread);
        if (conn->sfd != INVALID_SOCKET) {
            unregister_event(conn);
            safe_close(conn->sfd);
            conn->sfd = INVALID_SOCKET;
        }

        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, NULL,
                                        "Immediate close of %p\n",
                                        conn);
        conn_set_state(conn, conn_immediate_close);

        if (!is_thread_me(conn->thread)) {
            /* kick the thread in the butt */
            notify_thread(conn->thread);
        }

        UNLOCK_THREAD(conn->thread);
        return;
    }

    /*
    ** There may be a race condition between the engine calling this
    ** function and the core closing the connection.
    ** Let's lock the connection structure (this might not be the
    ** correct one) and re-evaluate.
    */
    LIBEVENT_THREAD *thr = conn->thread;
    if (thr == NULL || (conn->state == conn_closing ||
                        conn->state == conn_pending_close ||
                        conn->state == conn_immediate_close)) {
        return;
    }

    int notify = 0;

    LOCK_THREAD(thr);
    if (thr != conn->thread || !conn->ewouldblock) {
        // Ignore
        UNLOCK_THREAD(thr);
        return;
    }

    conn->aiostat = status;

    /* Move the connection to the closing state if the engine
     * wants it to be disconnected
     */
    if (status == ENGINE_DISCONNECT) {
        conn->state = conn_closing;
        notify = 1;
        thr->pending_io = list_remove(thr->pending_io, conn);
        if (number_of_pending(conn, thr->pending_close) == 0) {
            enlist_conn(conn, &thr->pending_close);
        }
    } else {
        if (number_of_pending(conn, thr->pending_io) +
            number_of_pending(conn, thr->pending_close) == 0) {
            if (thr->pending_io == NULL) {
                notify = 1;
            }
            enlist_conn(conn, &thr->pending_io);
        }
    }
    UNLOCK_THREAD(thr);

    /* kick the thread in the butt */
    if (notify) {
        notify_thread(thr);
    }
}

/* Which thread we assigned a connection to most recently. */
static int last_thread = -1;

/*
 * Dispatches a new connection to another thread. This is only ever called
 * from the main thread, either during initialization (for UDP) or because
 * of an incoming connection.
 */
void dispatch_conn_new(SOCKET sfd, STATE_FUNC init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();
    int tid = (last_thread + 1) % settings.num_threads;

    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, (uintptr_t)thread->thread_id);
    notify_thread(thread);
}
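
/*
 * Note: cqi_new() can return NULL on allocation failure, but the result
 * is not checked above. The round-robin tid cycles over the first
 * settings.num_threads entries, which leaves out the TAP thread stored
 * at the end of the array (assuming thread_init() was called with
 * settings.num_threads workers).
 */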

/*
 * Returns true if this is the thread that listens for new TCP connections.
 */
int is_listen_thread() {
#ifdef __WIN32__
    pthread_t tid = pthread_self();
    return (tid.p == dispatcher_thread.thread_id.p &&
            tid.x == dispatcher_thread.thread_id.x);
#else
    return pthread_self() == dispatcher_thread.thread_id;
#endif
}

void notify_dispatcher(void) {
    notify_thread(&dispatcher_thread);
}

/******************************* GLOBAL STATS ******************************/

void STATS_LOCK() {
    pthread_mutex_lock(&stats_lock);
}

void STATS_UNLOCK() {
    pthread_mutex_unlock(&stats_lock);
}

void threadlocal_stats_clear(struct thread_stats *stats) {
    stats->cmd_get = 0;
    stats->get_misses = 0;
    stats->delete_misses = 0;
    stats->incr_misses = 0;
    stats->decr_misses = 0;
    stats->incr_hits = 0;
    stats->decr_hits = 0;
    stats->cas_misses = 0;
    stats->bytes_written = 0;
    stats->bytes_read = 0;
    stats->cmd_flush = 0;
    stats->conn_yields = 0;
    stats->auth_cmds = 0;
    stats->auth_errors = 0;

    memset(stats->slab_stats, 0,
           sizeof(struct slab_stats) * MAX_NUMBER_OF_SLAB_CLASSES);
}

void threadlocal_stats_reset(struct thread_stats *thread_stats) {
    int ii;
    for (ii = 0; ii < settings.num_threads; ++ii) {
        pthread_mutex_lock(&thread_stats[ii].mutex);
        threadlocal_stats_clear(&thread_stats[ii]);
        pthread_mutex_unlock(&thread_stats[ii].mutex);
    }
}

void threadlocal_stats_aggregate(struct thread_stats *thread_stats, struct thread_stats *stats) {
    int ii, sid;
    for (ii = 0; ii < settings.num_threads; ++ii) {
        pthread_mutex_lock(&thread_stats[ii].mutex);

        stats->cmd_get += thread_stats[ii].cmd_get;
        stats->get_misses += thread_stats[ii].get_misses;
        stats->delete_misses += thread_stats[ii].delete_misses;
        stats->decr_misses += thread_stats[ii].decr_misses;
        stats->incr_misses += thread_stats[ii].incr_misses;
        stats->decr_hits += thread_stats[ii].decr_hits;
        stats->incr_hits += thread_stats[ii].incr_hits;
        stats->cas_misses += thread_stats[ii].cas_misses;
        stats->bytes_read += thread_stats[ii].bytes_read;
        stats->bytes_written += thread_stats[ii].bytes_written;
        stats->cmd_flush += thread_stats[ii].cmd_flush;
        stats->conn_yields += thread_stats[ii].conn_yields;
        stats->auth_cmds += thread_stats[ii].auth_cmds;
        stats->auth_errors += thread_stats[ii].auth_errors;

        for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
            stats->slab_stats[sid].cmd_set +=
                thread_stats[ii].slab_stats[sid].cmd_set;
            stats->slab_stats[sid].get_hits +=
                thread_stats[ii].slab_stats[sid].get_hits;
            stats->slab_stats[sid].delete_hits +=
                thread_stats[ii].slab_stats[sid].delete_hits;
            stats->slab_stats[sid].cas_hits +=
                thread_stats[ii].slab_stats[sid].cas_hits;
            stats->slab_stats[sid].cas_badval +=
                thread_stats[ii].slab_stats[sid].cas_badval;
        }

        pthread_mutex_unlock(&thread_stats[ii].mutex);
    }
}

void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
    int sid;

    out->cmd_set = 0;
    out->get_hits = 0;
    out->delete_hits = 0;
    out->cas_hits = 0;
    out->cas_badval = 0;

    for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
        out->cmd_set += stats->slab_stats[sid].cmd_set;
        out->get_hits += stats->slab_stats[sid].get_hits;
        out->delete_hits += stats->slab_stats[sid].delete_hits;
        out->cas_hits += stats->slab_stats[sid].cas_hits;
        out->cas_badval += stats->slab_stats[sid].cas_badval;
    }
}

/*
 * Initializes the thread subsystem, creating various worker threads.
 *
 * nthr      Number of worker event handler threads to spawn; one extra
 *           thread is created for TAP processing
 * main_base Event base for main thread
 */
void thread_init(int nthr, struct event_base *main_base,
                 void (*dispatcher_callback)(int, short, void *)) {
    int i;
    nthreads = nthr + 1;

    pthread_mutex_init(&stats_lock, NULL);
    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;

    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Can't allocate thread descriptors: %s",
                                        strerror(errno));
        exit(1);
    }
    thread_ids = calloc(nthreads, sizeof(pthread_t));
    if (! thread_ids) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    setup_dispatcher(main_base, dispatcher_callback);

    for (i = 0; i < nthreads; i++) {
        if (!create_notification_pipe(&threads[i])) {
            exit(1);
        }
        threads[i].index = i;

        setup_thread(&threads[i], i == (nthreads - 1));
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i], &thread_ids[i]);
        threads[i].thread_id = thread_ids[i];
    }

    tap_thread = &threads[nthreads - 1];

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
    pthread_mutex_unlock(&init_lock);
}
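
/*
 * Shuts down the worker threads: a notify byte wakes each thread, whose
 * callback sees memcached_shutdown set and calls event_base_loopbreak(),
 * letting worker_libevent() return so pthread_join() completes. Only
 * after every thread has been joined are the notification sockets closed.
 */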
void threads_shutdown(void)
{
    for (int ii = 0; ii < nthreads; ++ii) {
        notify_thread(&threads[ii]);
        pthread_join(thread_ids[ii], NULL);
    }
    for (int ii = 0; ii < nthreads; ++ii) {
        safe_close(threads[ii].notify[0]);
        safe_close(threads[ii].notify[1]);
    }
}

void notify_thread(LIBEVENT_THREAD *thread) {
    if (send(thread->notify[1], "", 1, 0) != 1) {
        if (thread == tap_thread) {
            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                            "Failed to notify TAP thread: %s",
                                            strerror(errno));
        } else {
            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                            "Failed to notify thread: %s",
                                            strerror(errno));
        }
    }
}