Groonga 3.0.9 Source Code Document
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
ngx_kqueue_module.c
Go to the documentation of this file.
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Nginx, Inc.
5  */
6 
7 
8 #include <ngx_config.h>
9 #include <ngx_core.h>
10 #include <ngx_event.h>
11 
12 
13 typedef struct {
17 
18 
19 static ngx_int_t ngx_kqueue_init(ngx_cycle_t *cycle, ngx_msec_t timer);
20 static void ngx_kqueue_done(ngx_cycle_t *cycle);
21 static ngx_int_t ngx_kqueue_add_event(ngx_event_t *ev, ngx_int_t event,
22  ngx_uint_t flags);
23 static ngx_int_t ngx_kqueue_del_event(ngx_event_t *ev, ngx_int_t event,
24  ngx_uint_t flags);
25 static ngx_int_t ngx_kqueue_set_event(ngx_event_t *ev, ngx_int_t filter,
26  ngx_uint_t flags);
27 static ngx_int_t ngx_kqueue_process_changes(ngx_cycle_t *cycle, ngx_uint_t try);
28 static ngx_int_t ngx_kqueue_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
29  ngx_uint_t flags);
30 static ngx_inline void ngx_kqueue_dump_event(ngx_log_t *log,
31  struct kevent *kev);
32 
33 static void *ngx_kqueue_create_conf(ngx_cycle_t *cycle);
34 static char *ngx_kqueue_init_conf(ngx_cycle_t *cycle, void *conf);
35 
36 
37 int ngx_kqueue = -1;
38 
39 /*
40  * The "change_list" should be declared as ngx_thread_volatile.
41  * However, the use of the change_list is localized in kqueue functions and
42  * is protected by the mutex so even the "icc -ipo" should not build the code
43  * with the race condition. Thus we avoid the declaration to make a more
44  * readable code.
45  */
46 
47 static struct kevent *change_list, *change_list0, *change_list1;
48 static struct kevent *event_list;
49 static ngx_uint_t max_changes, nchanges, nevents;
50 
51 #if (NGX_THREADS)
52 static ngx_mutex_t *list_mutex;
53 static ngx_mutex_t *kevent_mutex;
54 #endif
55 
56 
57 
58 static ngx_str_t kqueue_name = ngx_string("kqueue");
59 
60 static ngx_command_t ngx_kqueue_commands[] = {
61 
62  { ngx_string("kqueue_changes"),
65  0,
66  offsetof(ngx_kqueue_conf_t, changes),
67  NULL },
68 
69  { ngx_string("kqueue_events"),
72  0,
73  offsetof(ngx_kqueue_conf_t, events),
74  NULL },
75 
77 };
78 
79 
81  &kqueue_name,
82  ngx_kqueue_create_conf, /* create configuration */
83  ngx_kqueue_init_conf, /* init configuration */
84 
85  {
86  ngx_kqueue_add_event, /* add an event */
87  ngx_kqueue_del_event, /* delete an event */
88  ngx_kqueue_add_event, /* enable an event */
89  ngx_kqueue_del_event, /* disable an event */
90  NULL, /* add an connection */
91  NULL, /* delete an connection */
92  ngx_kqueue_process_changes, /* process the changes */
93  ngx_kqueue_process_events, /* process the events */
94  ngx_kqueue_init, /* init the events */
95  ngx_kqueue_done /* done the events */
96  }
97 
98 };
99 
102  &ngx_kqueue_module_ctx, /* module context */
103  ngx_kqueue_commands, /* module directives */
104  NGX_EVENT_MODULE, /* module type */
105  NULL, /* init master */
106  NULL, /* init module */
107  NULL, /* init process */
108  NULL, /* init thread */
109  NULL, /* exit thread */
110  NULL, /* exit process */
111  NULL, /* exit master */
113 };
114 
115 
116 static ngx_int_t
117 ngx_kqueue_init(ngx_cycle_t *cycle, ngx_msec_t timer)
118 {
119  ngx_kqueue_conf_t *kcf;
120  struct timespec ts;
121 #if (NGX_HAVE_TIMER_EVENT)
122  struct kevent kev;
123 #endif
124 
125  kcf = ngx_event_get_conf(cycle->conf_ctx, ngx_kqueue_module);
126 
127  if (ngx_kqueue == -1) {
128  ngx_kqueue = kqueue();
129 
130  if (ngx_kqueue == -1) {
132  "kqueue() failed");
133  return NGX_ERROR;
134  }
135 
136 #if (NGX_THREADS)
137 
138  list_mutex = ngx_mutex_init(cycle->log, 0);
139  if (list_mutex == NULL) {
140  return NGX_ERROR;
141  }
142 
143  kevent_mutex = ngx_mutex_init(cycle->log, 0);
144  if (kevent_mutex == NULL) {
145  return NGX_ERROR;
146  }
147 
148 #endif
149  }
150 
151  if (max_changes < kcf->changes) {
152  if (nchanges) {
153  ts.tv_sec = 0;
154  ts.tv_nsec = 0;
155 
156  if (kevent(ngx_kqueue, change_list, (int) nchanges, NULL, 0, &ts)
157  == -1)
158  {
160  "kevent() failed");
161  return NGX_ERROR;
162  }
163  nchanges = 0;
164  }
165 
166  if (change_list0) {
167  ngx_free(change_list0);
168  }
169 
170  change_list0 = ngx_alloc(kcf->changes * sizeof(struct kevent),
171  cycle->log);
172  if (change_list0 == NULL) {
173  return NGX_ERROR;
174  }
175 
176  if (change_list1) {
177  ngx_free(change_list1);
178  }
179 
180  change_list1 = ngx_alloc(kcf->changes * sizeof(struct kevent),
181  cycle->log);
182  if (change_list1 == NULL) {
183  return NGX_ERROR;
184  }
185 
186  change_list = change_list0;
187  }
188 
189  max_changes = kcf->changes;
190 
191  if (nevents < kcf->events) {
192  if (event_list) {
193  ngx_free(event_list);
194  }
195 
196  event_list = ngx_alloc(kcf->events * sizeof(struct kevent), cycle->log);
197  if (event_list == NULL) {
198  return NGX_ERROR;
199  }
200  }
201 
205 
206 #if (NGX_HAVE_TIMER_EVENT)
207 
208  if (timer) {
209  kev.ident = 0;
210  kev.filter = EVFILT_TIMER;
211  kev.flags = EV_ADD|EV_ENABLE;
212  kev.fflags = 0;
213  kev.data = timer;
214  kev.udata = 0;
215 
216  ts.tv_sec = 0;
217  ts.tv_nsec = 0;
218 
219  if (kevent(ngx_kqueue, &kev, 1, NULL, 0, &ts) == -1) {
221  "kevent(EVFILT_TIMER) failed");
222  return NGX_ERROR;
223  }
224 
226  }
227 
228 #endif
229 
230 #if (NGX_HAVE_CLEAR_EVENT)
232 #else
234 #endif
235 
236 #if (NGX_HAVE_LOWAT_EVENT)
238 #endif
239 
240  nevents = kcf->events;
241 
242  ngx_io = ngx_os_io;
243 
244  ngx_event_actions = ngx_kqueue_module_ctx.actions;
245 
246  return NGX_OK;
247 }
248 
249 
250 static void
251 ngx_kqueue_done(ngx_cycle_t *cycle)
252 {
253  if (close(ngx_kqueue) == -1) {
255  "kqueue close() failed");
256  }
257 
258  ngx_kqueue = -1;
259 
260 #if (NGX_THREADS)
261  ngx_mutex_destroy(kevent_mutex);
262  ngx_mutex_destroy(list_mutex);
263 #endif
264 
265  ngx_free(change_list1);
266  ngx_free(change_list0);
267  ngx_free(event_list);
268 
269  change_list1 = NULL;
270  change_list0 = NULL;
271  change_list = NULL;
272  event_list = NULL;
273  max_changes = 0;
274  nchanges = 0;
275  nevents = 0;
276 }
277 
278 
279 static ngx_int_t
280 ngx_kqueue_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
281 {
282  ngx_int_t rc;
283 #if 0
284  ngx_event_t *e;
285  ngx_connection_t *c;
286 #endif
287 
288  ev->active = 1;
289  ev->disabled = 0;
290  ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0;
291 
292  ngx_mutex_lock(list_mutex);
293 
294 #if 0
295 
296  if (ev->index < nchanges
297  && ((uintptr_t) change_list[ev->index].udata & (uintptr_t) ~1)
298  == (uintptr_t) ev)
299  {
300  if (change_list[ev->index].flags == EV_DISABLE) {
301 
302  /*
303  * if the EV_DISABLE is still not passed to a kernel
304  * we will not pass it
305  */
306 
308  "kevent activated: %d: ft:%i",
309  ngx_event_ident(ev->data), event);
310 
311  if (ev->index < --nchanges) {
312  e = (ngx_event_t *)
313  ((uintptr_t) change_list[nchanges].udata & (uintptr_t) ~1);
314  change_list[ev->index] = change_list[nchanges];
315  e->index = ev->index;
316  }
317 
318  ngx_mutex_unlock(list_mutex);
319 
320  return NGX_OK;
321  }
322 
323  c = ev->data;
324 
325  ngx_log_error(NGX_LOG_ALERT, ev->log, 0,
326  "previous event on #%d were not passed in kernel", c->fd);
327 
328  ngx_mutex_unlock(list_mutex);
329 
330  return NGX_ERROR;
331  }
332 
333 #endif
334 
335  rc = ngx_kqueue_set_event(ev, event, EV_ADD|EV_ENABLE|flags);
336 
337  ngx_mutex_unlock(list_mutex);
338 
339  return rc;
340 }
341 
342 
343 static ngx_int_t
344 ngx_kqueue_del_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
345 {
346  ngx_int_t rc;
347  ngx_event_t *e;
348 
349  ev->active = 0;
350  ev->disabled = 0;
351 
352  ngx_mutex_lock(list_mutex);
353 
354  if (ev->index < nchanges
355  && ((uintptr_t) change_list[ev->index].udata & (uintptr_t) ~1)
356  == (uintptr_t) ev)
357  {
359  "kevent deleted: %d: ft:%i",
360  ngx_event_ident(ev->data), event);
361 
362  /* if the event is still not passed to a kernel we will not pass it */
363 
364  nchanges--;
365 
366  if (ev->index < nchanges) {
367  e = (ngx_event_t *)
368  ((uintptr_t) change_list[nchanges].udata & (uintptr_t) ~1);
369  change_list[ev->index] = change_list[nchanges];
370  e->index = ev->index;
371  }
372 
373  ngx_mutex_unlock(list_mutex);
374 
375  return NGX_OK;
376  }
377 
378  /*
379  * when the file descriptor is closed the kqueue automatically deletes
380  * its filters so we do not need to delete explicitly the event
381  * before the closing the file descriptor.
382  */
383 
384  if (flags & NGX_CLOSE_EVENT) {
385  ngx_mutex_unlock(list_mutex);
386  return NGX_OK;
387  }
388 
389  if (flags & NGX_DISABLE_EVENT) {
390  ev->disabled = 1;
391 
392  } else {
393  flags |= EV_DELETE;
394  }
395 
396  rc = ngx_kqueue_set_event(ev, event, flags);
397 
398  ngx_mutex_unlock(list_mutex);
399 
400  return rc;
401 }
402 
403 
404 static ngx_int_t
405 ngx_kqueue_set_event(ngx_event_t *ev, ngx_int_t filter, ngx_uint_t flags)
406 {
407  struct kevent *kev;
408  struct timespec ts;
409  ngx_connection_t *c;
410 
411  c = ev->data;
412 
414  "kevent set event: %d: ft:%i fl:%04Xi",
415  c->fd, filter, flags);
416 
417  if (nchanges >= max_changes) {
419  "kqueue change list is filled up");
420 
421  ts.tv_sec = 0;
422  ts.tv_nsec = 0;
423 
424  if (kevent(ngx_kqueue, change_list, (int) nchanges, NULL, 0, &ts)
425  == -1)
426  {
427  ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno, "kevent() failed");
428  return NGX_ERROR;
429  }
430 
431  nchanges = 0;
432  }
433 
434  kev = &change_list[nchanges];
435 
436  kev->ident = c->fd;
437  kev->filter = (short) filter;
438  kev->flags = (u_short) flags;
439  kev->udata = NGX_KQUEUE_UDATA_T ((uintptr_t) ev | ev->instance);
440 
441  if (filter == EVFILT_VNODE) {
442  kev->fflags = NOTE_DELETE|NOTE_WRITE|NOTE_EXTEND
443  |NOTE_ATTRIB|NOTE_RENAME
444 #if (__FreeBSD__ == 4 && __FreeBSD_version >= 430000) \
445  || __FreeBSD_version >= 500018
446  |NOTE_REVOKE
447 #endif
448  ;
449  kev->data = 0;
450 
451  } else {
452 #if (NGX_HAVE_LOWAT_EVENT)
453  if (flags & NGX_LOWAT_EVENT) {
454  kev->fflags = NOTE_LOWAT;
455  kev->data = ev->available;
456 
457  } else {
458  kev->fflags = 0;
459  kev->data = 0;
460  }
461 #else
462  kev->fflags = 0;
463  kev->data = 0;
464 #endif
465  }
466 
467  ev->index = nchanges;
468  nchanges++;
469 
470  if (flags & NGX_FLUSH_EVENT) {
471  ts.tv_sec = 0;
472  ts.tv_nsec = 0;
473 
474  ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, "kevent flush");
475 
476  if (kevent(ngx_kqueue, change_list, (int) nchanges, NULL, 0, &ts)
477  == -1)
478  {
479  ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno, "kevent() failed");
480  return NGX_ERROR;
481  }
482 
483  nchanges = 0;
484  }
485 
486  return NGX_OK;
487 }
488 
489 
490 static ngx_int_t
491 ngx_kqueue_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
492  ngx_uint_t flags)
493 {
494  int events, n;
495  ngx_int_t i, instance;
496  ngx_uint_t level;
497  ngx_err_t err;
498  ngx_event_t *ev, **queue;
499  struct timespec ts, *tp;
500 
501  if (ngx_threaded) {
502  if (ngx_kqueue_process_changes(cycle, 0) == NGX_ERROR) {
503  return NGX_ERROR;
504  }
505 
506  n = 0;
507 
508  } else {
509  n = (int) nchanges;
510  nchanges = 0;
511  }
512 
513  if (timer == NGX_TIMER_INFINITE) {
514  tp = NULL;
515 
516  } else {
517 
518  ts.tv_sec = timer / 1000;
519  ts.tv_nsec = (timer % 1000) * 1000000;
520 
521  /*
522  * 64-bit Darwin kernel has the bug: kernel level ts.tv_nsec is
523  * the int32_t while user level ts.tv_nsec is the long (64-bit),
524  * so on the big endian PowerPC all nanoseconds are lost.
525  */
526 
527 #if (NGX_DARWIN_KEVENT_BUG)
528  ts.tv_nsec <<= 32;
529 #endif
530 
531  tp = &ts;
532  }
533 
535  "kevent timer: %M, changes: %d", timer, n);
536 
537  events = kevent(ngx_kqueue, change_list, n, event_list, (int) nevents, tp);
538 
539  err = (events == -1) ? ngx_errno : 0;
540 
541  if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
542  ngx_time_update();
543  }
544 
546  "kevent events: %d", events);
547 
548  if (err) {
549  if (err == NGX_EINTR) {
550 
551  if (ngx_event_timer_alarm) {
553  return NGX_OK;
554  }
555 
556  level = NGX_LOG_INFO;
557 
558  } else {
559  level = NGX_LOG_ALERT;
560  }
561 
562  ngx_log_error(level, cycle->log, err, "kevent() failed");
563  return NGX_ERROR;
564  }
565 
566  if (events == 0) {
567  if (timer != NGX_TIMER_INFINITE) {
568  return NGX_OK;
569  }
570 
571  ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
572  "kevent() returned no events without timeout");
573  return NGX_ERROR;
574  }
575 
576  ngx_mutex_lock(ngx_posted_events_mutex);
577 
578  for (i = 0; i < events; i++) {
579 
580  ngx_kqueue_dump_event(cycle->log, &event_list[i]);
581 
582  if (event_list[i].flags & EV_ERROR) {
583  ngx_log_error(NGX_LOG_ALERT, cycle->log, event_list[i].data,
584  "kevent() error on %d filter:%d flags:%04Xd",
585  event_list[i].ident, event_list[i].filter,
586  event_list[i].flags);
587  continue;
588  }
589 
590 #if (NGX_HAVE_TIMER_EVENT)
591 
592  if (event_list[i].filter == EVFILT_TIMER) {
593  ngx_time_update();
594  continue;
595  }
596 
597 #endif
598 
599  ev = (ngx_event_t *) event_list[i].udata;
600 
601  switch (event_list[i].filter) {
602 
603  case EVFILT_READ:
604  case EVFILT_WRITE:
605 
606  instance = (uintptr_t) ev & 1;
607  ev = (ngx_event_t *) ((uintptr_t) ev & (uintptr_t) ~1);
608 
609  if (ev->closed || ev->instance != instance) {
610 
611  /*
612  * the stale event from a file descriptor
613  * that was just closed in this iteration
614  */
615 
617  "kevent: stale event %p", ev);
618  continue;
619  }
620 
621  if (ev->log && (ev->log->log_level & NGX_LOG_DEBUG_CONNECTION)) {
622  ngx_kqueue_dump_event(ev->log, &event_list[i]);
623  }
624 
625  if (ev->oneshot) {
626  ev->active = 0;
627  }
628 
629 #if (NGX_THREADS)
630 
631  if ((flags & NGX_POST_THREAD_EVENTS) && !ev->accept) {
632  ev->posted_ready = 1;
633  ev->posted_available = event_list[i].data;
634 
635  if (event_list[i].flags & EV_EOF) {
636  ev->posted_eof = 1;
637  ev->posted_errno = event_list[i].fflags;
638  }
639 
641 
642  continue;
643  }
644 
645 #endif
646 
647  ev->available = event_list[i].data;
648 
649  if (event_list[i].flags & EV_EOF) {
650  ev->pending_eof = 1;
651  ev->kq_errno = event_list[i].fflags;
652  }
653 
654  ev->ready = 1;
655 
656  break;
657 
658  case EVFILT_VNODE:
659  ev->kq_vnode = 1;
660 
661  break;
662 
663  case EVFILT_AIO:
664  ev->complete = 1;
665  ev->ready = 1;
666 
667  break;
668 
669  default:
670  ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
671  "unexpected kevent() filter %d",
672  event_list[i].filter);
673  continue;
674  }
675 
676  if (flags & NGX_POST_EVENTS) {
677  queue = (ngx_event_t **) (ev->accept ? &ngx_posted_accept_events:
679  ngx_locked_post_event(ev, queue);
680 
681  continue;
682  }
683 
684  ev->handler(ev);
685  }
686 
687  ngx_mutex_unlock(ngx_posted_events_mutex);
688 
689  return NGX_OK;
690 }
691 
692 
693 static ngx_int_t
694 ngx_kqueue_process_changes(ngx_cycle_t *cycle, ngx_uint_t try)
695 {
696  int n;
697  ngx_int_t rc;
698  ngx_err_t err;
699  struct timespec ts;
700  struct kevent *changes;
701 
702  ngx_mutex_lock(kevent_mutex);
703 
704  ngx_mutex_lock(list_mutex);
705 
706  if (nchanges == 0) {
707  ngx_mutex_unlock(list_mutex);
708  ngx_mutex_unlock(kevent_mutex);
709  return NGX_OK;
710  }
711 
712  changes = change_list;
713  if (change_list == change_list0) {
714  change_list = change_list1;
715  } else {
716  change_list = change_list0;
717  }
718 
719  n = (int) nchanges;
720  nchanges = 0;
721 
722  ngx_mutex_unlock(list_mutex);
723 
724  ts.tv_sec = 0;
725  ts.tv_nsec = 0;
726 
728  "kevent changes: %d", n);
729 
730  if (kevent(ngx_kqueue, changes, n, NULL, 0, &ts) == -1) {
731  err = ngx_errno;
733  cycle->log, err, "kevent() failed");
734  rc = NGX_ERROR;
735 
736  } else {
737  rc = NGX_OK;
738  }
739 
740  ngx_mutex_unlock(kevent_mutex);
741 
742  return rc;
743 }
744 
745 
746 static ngx_inline void
747 ngx_kqueue_dump_event(ngx_log_t *log, struct kevent *kev)
748 {
750  (kev->ident > 0x8000000 && kev->ident != (unsigned) -1) ?
751  "kevent: %p: ft:%d fl:%04Xd ff:%08Xd d:%d ud:%p":
752  "kevent: %d: ft:%d fl:%04Xd ff:%08Xd d:%d ud:%p",
753  kev->ident, kev->filter,
754  kev->flags, kev->fflags,
755  kev->data, kev->udata);
756 }
757 
758 
759 static void *
760 ngx_kqueue_create_conf(ngx_cycle_t *cycle)
761 {
762  ngx_kqueue_conf_t *kcf;
763 
764  kcf = ngx_palloc(cycle->pool, sizeof(ngx_kqueue_conf_t));
765  if (kcf == NULL) {
766  return NULL;
767  }
768 
769  kcf->changes = NGX_CONF_UNSET;
770  kcf->events = NGX_CONF_UNSET;
771 
772  return kcf;
773 }
774 
775 
776 static char *
777 ngx_kqueue_init_conf(ngx_cycle_t *cycle, void *conf)
778 {
779  ngx_kqueue_conf_t *kcf = conf;
780 
782  ngx_conf_init_uint_value(kcf->events, 512);
783 
784  return NGX_CONF_OK;
785 }