Groonga 3.0.9 Source Code Document
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
ngx_eventport_module.c
Go to the documentation of this file.
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Nginx, Inc.
5  */
6 
7 
8 #include <ngx_config.h>
9 #include <ngx_core.h>
10 #include <ngx_event.h>
11 
12 
13 #if (NGX_TEST_BUILD_EVENTPORT)
14 
15 #define ushort_t u_short
16 #define uint_t u_int
17 
18 #ifndef CLOCK_REALTIME
19 #define CLOCK_REALTIME 0
20 typedef int clockid_t;
21 typedef void * timer_t;
22 #endif
23 
24 /* Solaris declarations */
25 
26 #define PORT_SOURCE_AIO 1
27 #define PORT_SOURCE_TIMER 2
28 #define PORT_SOURCE_USER 3
29 #define PORT_SOURCE_FD 4
30 #define PORT_SOURCE_ALERT 5
31 #define PORT_SOURCE_MQ 6
32 
33 #ifndef ETIME
34 #define ETIME 64
35 #endif
36 
37 #define SIGEV_PORT 4
38 
39 typedef struct {
40  int portev_events; /* event data is source specific */
41  ushort_t portev_source; /* event source */
42  ushort_t portev_pad; /* port internal use */
43  uintptr_t portev_object; /* source specific object */
44  void *portev_user; /* user cookie */
45 } port_event_t;
46 
47 typedef struct port_notify {
48  int portnfy_port; /* bind request(s) to port */
49  void *portnfy_user; /* user defined */
50 } port_notify_t;
51 
52 #if (__FreeBSD_version < 700005)
53 
54 typedef struct itimerspec { /* definition per POSIX.4 */
55  struct timespec it_interval;/* timer period */
56  struct timespec it_value; /* timer expiration */
57 } itimerspec_t;
58 
59 #endif
60 
61 int port_create(void);
62 
63 int port_create(void)
64 {
65  return -1;
66 }
67 
68 
69 int port_associate(int port, int source, uintptr_t object, int events,
70  void *user);
71 
72 int port_associate(int port, int source, uintptr_t object, int events,
73  void *user)
74 {
75  return -1;
76 }
77 
78 
79 int port_dissociate(int port, int source, uintptr_t object);
80 
81 int port_dissociate(int port, int source, uintptr_t object)
82 {
83  return -1;
84 }
85 
86 
87 int port_getn(int port, port_event_t list[], uint_t max, uint_t *nget,
88  struct timespec *timeout);
89 
90 int port_getn(int port, port_event_t list[], uint_t max, uint_t *nget,
91  struct timespec *timeout)
92 {
93  return -1;
94 }
95 
96 
97 int timer_create(clockid_t clock_id, struct sigevent *evp, timer_t *timerid);
98 
99 int timer_create(clockid_t clock_id, struct sigevent *evp, timer_t *timerid)
100 {
101  return -1;
102 }
103 
104 
105 int timer_settime(timer_t timerid, int flags, const struct itimerspec *value,
106  struct itimerspec *ovalue);
107 
108 int timer_settime(timer_t timerid, int flags, const struct itimerspec *value,
109  struct itimerspec *ovalue)
110 {
111  return -1;
112 }
113 
114 
115 int timer_delete(timer_t timerid);
116 
117 int timer_delete(timer_t timerid)
118 {
119  return -1;
120 }
121 
122 #endif
123 
124 
125 typedef struct {
128 
129 
130 static ngx_int_t ngx_eventport_init(ngx_cycle_t *cycle, ngx_msec_t timer);
131 static void ngx_eventport_done(ngx_cycle_t *cycle);
132 static ngx_int_t ngx_eventport_add_event(ngx_event_t *ev, ngx_int_t event,
133  ngx_uint_t flags);
134 static ngx_int_t ngx_eventport_del_event(ngx_event_t *ev, ngx_int_t event,
135  ngx_uint_t flags);
136 static ngx_int_t ngx_eventport_process_events(ngx_cycle_t *cycle,
137  ngx_msec_t timer, ngx_uint_t flags);
138 
139 static void *ngx_eventport_create_conf(ngx_cycle_t *cycle);
140 static char *ngx_eventport_init_conf(ngx_cycle_t *cycle, void *conf);
141 
142 static int ep = -1;
143 static port_event_t *event_list;
144 static ngx_uint_t nevents;
145 static timer_t event_timer = (timer_t) -1;
146 
147 static ngx_str_t eventport_name = ngx_string("eventport");
148 
149 
150 static ngx_command_t ngx_eventport_commands[] = {
151 
152  { ngx_string("eventport_events"),
155  0,
156  offsetof(ngx_eventport_conf_t, events),
157  NULL },
158 
160 };
161 
162 
164  &eventport_name,
165  ngx_eventport_create_conf, /* create configuration */
166  ngx_eventport_init_conf, /* init configuration */
167 
168  {
169  ngx_eventport_add_event, /* add an event */
170  ngx_eventport_del_event, /* delete an event */
171  ngx_eventport_add_event, /* enable an event */
172  ngx_eventport_del_event, /* disable an event */
173  NULL, /* add an connection */
174  NULL, /* delete an connection */
175  NULL, /* process the changes */
176  ngx_eventport_process_events, /* process the events */
177  ngx_eventport_init, /* init the events */
178  ngx_eventport_done, /* done the events */
179  }
180 
181 };
182 
185  &ngx_eventport_module_ctx, /* module context */
186  ngx_eventport_commands, /* module directives */
187  NGX_EVENT_MODULE, /* module type */
188  NULL, /* init master */
189  NULL, /* init module */
190  NULL, /* init process */
191  NULL, /* init thread */
192  NULL, /* exit thread */
193  NULL, /* exit process */
194  NULL, /* exit master */
196 };
197 
198 
199 static ngx_int_t
200 ngx_eventport_init(ngx_cycle_t *cycle, ngx_msec_t timer)
201 {
202  port_notify_t pn;
203  struct itimerspec its;
204  struct sigevent sev;
205  ngx_eventport_conf_t *epcf;
206 
207  epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_eventport_module);
208 
209  if (ep == -1) {
210  ep = port_create();
211 
212  if (ep == -1) {
214  "port_create() failed");
215  return NGX_ERROR;
216  }
217  }
218 
219  if (nevents < epcf->events) {
220  if (event_list) {
221  ngx_free(event_list);
222  }
223 
224  event_list = ngx_alloc(sizeof(port_event_t) * epcf->events,
225  cycle->log);
226  if (event_list == NULL) {
227  return NGX_ERROR;
228  }
229  }
230 
232 
233  if (timer) {
234  ngx_memzero(&pn, sizeof(port_notify_t));
235  pn.portnfy_port = ep;
236 
237  ngx_memzero(&sev, sizeof(struct sigevent));
238  sev.sigev_notify = SIGEV_PORT;
239 #if !(NGX_TEST_BUILD_EVENTPORT)
240  sev.sigev_value.sival_ptr = &pn;
241 #endif
242 
243  if (timer_create(CLOCK_REALTIME, &sev, &event_timer) == -1) {
245  "timer_create() failed");
246  return NGX_ERROR;
247  }
248 
249  its.it_interval.tv_sec = timer / 1000;
250  its.it_interval.tv_nsec = (timer % 1000) * 1000000;
251  its.it_value.tv_sec = timer / 1000;
252  its.it_value.tv_nsec = (timer % 1000) * 1000000;
253 
254  if (timer_settime(event_timer, 0, &its, NULL) == -1) {
256  "timer_settime() failed");
257  return NGX_ERROR;
258  }
259 
261  }
262 
263  nevents = epcf->events;
264 
265  ngx_io = ngx_os_io;
266 
267  ngx_event_actions = ngx_eventport_module_ctx.actions;
268 
269  return NGX_OK;
270 }
271 
272 
273 static void
274 ngx_eventport_done(ngx_cycle_t *cycle)
275 {
276  if (event_timer != (timer_t) -1) {
277  if (timer_delete(event_timer) == -1) {
279  "timer_delete() failed");
280  }
281 
282  event_timer = (timer_t) -1;
283  }
284 
285  if (close(ep) == -1) {
287  "close() event port failed");
288  }
289 
290  ep = -1;
291 
292  ngx_free(event_list);
293 
294  event_list = NULL;
295  nevents = 0;
296 }
297 
298 
299 static ngx_int_t
300 ngx_eventport_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
301 {
302  ngx_int_t events, prev;
303  ngx_event_t *e;
304  ngx_connection_t *c;
305 
306  c = ev->data;
307 
308  events = event;
309 
310  if (event == NGX_READ_EVENT) {
311  e = c->write;
312  prev = POLLOUT;
313 #if (NGX_READ_EVENT != POLLIN)
314  events = POLLIN;
315 #endif
316 
317  } else {
318  e = c->read;
319  prev = POLLIN;
320 #if (NGX_WRITE_EVENT != POLLOUT)
321  events = POLLOUT;
322 #endif
323  }
324 
325  if (e->oneshot) {
326  events |= prev;
327  }
328 
330  "eventport add event: fd:%d ev:%04Xi", c->fd, events);
331 
332  if (port_associate(ep, PORT_SOURCE_FD, c->fd, events,
333  (void *) ((uintptr_t) ev | ev->instance))
334  == -1)
335  {
337  "port_associate() failed");
338  return NGX_ERROR;
339  }
340 
341  ev->active = 1;
342  ev->oneshot = 1;
343 
344  return NGX_OK;
345 }
346 
347 
348 static ngx_int_t
349 ngx_eventport_del_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
350 {
351  ngx_event_t *e;
352  ngx_connection_t *c;
353 
354  /*
355  * when the file descriptor is closed, the event port automatically
356  * dissociates it from the port, so we do not need to dissociate explicitly
357  * the event before the closing the file descriptor
358  */
359 
360  if (flags & NGX_CLOSE_EVENT) {
361  ev->active = 0;
362  ev->oneshot = 0;
363  return NGX_OK;
364  }
365 
366  c = ev->data;
367 
368  if (event == NGX_READ_EVENT) {
369  e = c->write;
370  event = POLLOUT;
371 
372  } else {
373  e = c->read;
374  event = POLLIN;
375  }
376 
377  if (e->oneshot) {
379  "eventport change event: fd:%d ev:%04Xi", c->fd, event);
380 
381  if (port_associate(ep, PORT_SOURCE_FD, c->fd, event,
382  (void *) ((uintptr_t) ev | ev->instance))
383  == -1)
384  {
386  "port_associate() failed");
387  return NGX_ERROR;
388  }
389 
390  } else {
392  "eventport del event: fd:%d", c->fd);
393 
394  if (port_dissociate(ep, PORT_SOURCE_FD, c->fd) == -1) {
396  "port_dissociate() failed");
397  return NGX_ERROR;
398  }
399  }
400 
401  ev->active = 0;
402  ev->oneshot = 0;
403 
404  return NGX_OK;
405 }
406 
407 
408 ngx_int_t
409 ngx_eventport_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
410  ngx_uint_t flags)
411 {
412  int n, revents;
413  u_int events;
414  ngx_err_t err;
415  ngx_int_t instance;
416  ngx_uint_t i, level;
417  ngx_event_t *ev, *rev, *wev, **queue;
418  ngx_connection_t *c;
419  struct timespec ts, *tp;
420 
421  if (timer == NGX_TIMER_INFINITE) {
422  tp = NULL;
423 
424  } else {
425  ts.tv_sec = timer / 1000;
426  ts.tv_nsec = (timer % 1000) * 1000000;
427  tp = &ts;
428  }
429 
431  "eventport timer: %M", timer);
432 
433  events = 1;
434 
435  n = port_getn(ep, event_list, (u_int) nevents, &events, tp);
436 
437  err = ngx_errno;
438 
439  if (flags & NGX_UPDATE_TIME) {
440  ngx_time_update();
441  }
442 
443  if (n == -1) {
444  if (err == ETIME) {
445  if (timer != NGX_TIMER_INFINITE) {
446  return NGX_OK;
447  }
448 
449  ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
450  "port_getn() returned no events without timeout");
451  return NGX_ERROR;
452  }
453 
454  level = (err == NGX_EINTR) ? NGX_LOG_INFO : NGX_LOG_ALERT;
455  ngx_log_error(level, cycle->log, err, "port_getn() failed");
456  return NGX_ERROR;
457  }
458 
459  if (events == 0) {
460  if (timer != NGX_TIMER_INFINITE) {
461  return NGX_OK;
462  }
463 
464  ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
465  "port_getn() returned no events without timeout");
466  return NGX_ERROR;
467  }
468 
469  ngx_mutex_lock(ngx_posted_events_mutex);
470 
471  for (i = 0; i < events; i++) {
472 
473  if (event_list[i].portev_source == PORT_SOURCE_TIMER) {
474  ngx_time_update();
475  continue;
476  }
477 
478  ev = event_list[i].portev_user;
479 
480  switch (event_list[i].portev_source) {
481 
482  case PORT_SOURCE_FD:
483 
484  instance = (uintptr_t) ev & 1;
485  ev = (ngx_event_t *) ((uintptr_t) ev & (uintptr_t) ~1);
486 
487  if (ev->closed || ev->instance != instance) {
488 
489  /*
490  * the stale event from a file descriptor
491  * that was just closed in this iteration
492  */
493 
495  "eventport: stale event %p", ev);
496  continue;
497  }
498 
499  revents = event_list[i].portev_events;
500 
502  "eventport: fd:%d, ev:%04Xd",
503  event_list[i].portev_object, revents);
504 
505  if (revents & (POLLERR|POLLHUP|POLLNVAL)) {
507  "port_getn() error fd:%d ev:%04Xd",
508  event_list[i].portev_object, revents);
509  }
510 
511  if (revents & ~(POLLIN|POLLOUT|POLLERR|POLLHUP|POLLNVAL)) {
512  ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
513  "strange port_getn() events fd:%d ev:%04Xd",
514  event_list[i].portev_object, revents);
515  }
516 
517  if ((revents & (POLLERR|POLLHUP|POLLNVAL))
518  && (revents & (POLLIN|POLLOUT)) == 0)
519  {
520  /*
521  * if the error events were returned without POLLIN or POLLOUT,
522  * then add these flags to handle the events at least in one
523  * active handler
524  */
525 
526  revents |= POLLIN|POLLOUT;
527  }
528 
529  c = ev->data;
530  rev = c->read;
531  wev = c->write;
532 
533  rev->active = 0;
534  wev->active = 0;
535 
536  if (revents & POLLIN) {
537 
538  if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) {
539  rev->posted_ready = 1;
540 
541  } else {
542  rev->ready = 1;
543  }
544 
545  if (flags & NGX_POST_EVENTS) {
546  queue = (ngx_event_t **) (rev->accept ?
548 
549  ngx_locked_post_event(rev, queue);
550 
551  } else {
552  rev->handler(rev);
553 
554  if (ev->closed || ev->instance != instance) {
555  continue;
556  }
557  }
558 
559  if (rev->accept) {
560  if (ngx_use_accept_mutex) {
561  ngx_accept_events = 1;
562  continue;
563  }
564 
565  if (port_associate(ep, PORT_SOURCE_FD, c->fd, POLLIN,
566  (void *) ((uintptr_t) ev | ev->instance))
567  == -1)
568  {
570  "port_associate() failed");
571  return NGX_ERROR;
572  }
573  }
574  }
575 
576  if (revents & POLLOUT) {
577 
578  if (flags & NGX_POST_THREAD_EVENTS) {
579  wev->posted_ready = 1;
580 
581  } else {
582  wev->ready = 1;
583  }
584 
585  if (flags & NGX_POST_EVENTS) {
587 
588  } else {
589  wev->handler(wev);
590  }
591  }
592 
593  continue;
594 
595  default:
596  ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
597  "unexpected even_port object %d",
598  event_list[i].portev_object);
599  continue;
600  }
601  }
602 
603  ngx_mutex_unlock(ngx_posted_events_mutex);
604 
605  return NGX_OK;
606 }
607 
608 
609 static void *
610 ngx_eventport_create_conf(ngx_cycle_t *cycle)
611 {
612  ngx_eventport_conf_t *epcf;
613 
614  epcf = ngx_palloc(cycle->pool, sizeof(ngx_eventport_conf_t));
615  if (epcf == NULL) {
616  return NULL;
617  }
618 
619  epcf->events = NGX_CONF_UNSET;
620 
621  return epcf;
622 }
623 
624 
625 static char *
626 ngx_eventport_init_conf(ngx_cycle_t *cycle, void *conf)
627 {
628  ngx_eventport_conf_t *epcf = conf;
629 
630  ngx_conf_init_uint_value(epcf->events, 32);
631 
632  return NGX_CONF_OK;
633 }