MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
evbuffer.c
1 /*
2  * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * 3. The name of the author may not be used to endorse or promote products
14  * derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include <sys/types.h>
29 
30 #ifdef HAVE_CONFIG_H
31 #include "config.h"
32 #endif
33 
34 #ifdef HAVE_SYS_TIME_H
35 #include <sys/time.h>
36 #endif
37 
38 #include <errno.h>
39 #include <stdio.h>
40 #include <stdlib.h>
41 #include <string.h>
42 #ifdef HAVE_STDARG_H
43 #include <stdarg.h>
44 #endif
45 
46 #ifdef WIN32
47 #include <winsock2.h>
48 #endif
49 
50 #include "evutil.h"
51 #include "event.h"
52 
53 /* prototypes */
54 
55 void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *);
56 
/*
 * Add `ev` to the event loop, optionally with a timeout.
 *
 * ev:      the event to schedule.
 * timeout: timeout in whole seconds; 0 means the event is added without
 *          any timeout.
 *
 * Returns the result of event_add().
 */
static int
bufferevent_add(struct event *ev, int timeout)
{
	struct timeval expiry;

	/* No timeout requested: schedule the event without one. */
	if (timeout == 0)
		return (event_add(ev, NULL));

	evutil_timerclear(&expiry);
	expiry.tv_sec = timeout;

	return (event_add(ev, &expiry));
}
70 
71 /*
72  * This callback is executed when the size of the input buffer changes.
73  * We use it to apply back pressure on the reading side.
74  */
75 
76 void
77 bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
78  void *arg) {
79  struct bufferevent *bufev = arg;
80  /*
81  * If we are below the watermark then reschedule reading if it's
82  * still enabled.
83  */
84  if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
85  evbuffer_setcb(buf, NULL, NULL);
86 
87  if (bufev->enabled & EV_READ)
88  bufferevent_add(&bufev->ev_read, bufev->timeout_read);
89  }
90 }
91 
92 static void
93 bufferevent_readcb(int fd, short event, void *arg)
94 {
95  struct bufferevent *bufev = arg;
96  int res = 0;
97  short what = EVBUFFER_READ;
98  size_t len;
99  int howmuch = -1;
100 
101  if (event == EV_TIMEOUT) {
102  what |= EVBUFFER_TIMEOUT;
103  goto error;
104  }
105 
106  /*
107  * If we have a high watermark configured then we don't want to
108  * read more data than would make us reach the watermark.
109  */
110  if (bufev->wm_read.high != 0) {
111  howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input);
112  /* we might have lowered the watermark, stop reading */
113  if (howmuch <= 0) {
114  struct evbuffer *buf = bufev->input;
115  event_del(&bufev->ev_read);
116  evbuffer_setcb(buf,
117  bufferevent_read_pressure_cb, bufev);
118  return;
119  }
120  }
121 
122  res = evbuffer_read(bufev->input, fd, howmuch);
123  if (res == -1) {
124  if (errno == EAGAIN || errno == EINTR)
125  goto reschedule;
126  /* error case */
127  what |= EVBUFFER_ERROR;
128  } else if (res == 0) {
129  /* eof case */
130  what |= EVBUFFER_EOF;
131  }
132 
133  if (res <= 0)
134  goto error;
135 
136  bufferevent_add(&bufev->ev_read, bufev->timeout_read);
137 
138  /* See if this callbacks meets the water marks */
139  len = EVBUFFER_LENGTH(bufev->input);
140  if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
141  return;
142  if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
143  struct evbuffer *buf = bufev->input;
144  event_del(&bufev->ev_read);
145 
146  /* Now schedule a callback for us when the buffer changes */
147  evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
148  }
149 
150  /* Invoke the user callback - must always be called last */
151  if (bufev->readcb != NULL)
152  (*bufev->readcb)(bufev, bufev->cbarg);
153  return;
154 
155  reschedule:
156  bufferevent_add(&bufev->ev_read, bufev->timeout_read);
157  return;
158 
159  error:
160  (*bufev->errorcb)(bufev, what, bufev->cbarg);
161 }
162 
163 static void
164 bufferevent_writecb(int fd, short event, void *arg)
165 {
166  struct bufferevent *bufev = arg;
167  int res = 0;
168  short what = EVBUFFER_WRITE;
169 
170  if (event == EV_TIMEOUT) {
171  what |= EVBUFFER_TIMEOUT;
172  goto error;
173  }
174 
175  if (EVBUFFER_LENGTH(bufev->output)) {
176  res = evbuffer_write(bufev->output, fd);
177  if (res == -1) {
178 #ifndef WIN32
179 /*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
180  *set errno. thus this error checking is not portable*/
181  if (errno == EAGAIN ||
182  errno == EINTR ||
183  errno == EINPROGRESS)
184  goto reschedule;
185  /* error case */
186  what |= EVBUFFER_ERROR;
187 
188 #else
189  goto reschedule;
190 #endif
191 
192  } else if (res == 0) {
193  /* eof case */
194  what |= EVBUFFER_EOF;
195  }
196  if (res <= 0)
197  goto error;
198  }
199 
200  if (EVBUFFER_LENGTH(bufev->output) != 0)
201  bufferevent_add(&bufev->ev_write, bufev->timeout_write);
202 
203  /*
204  * Invoke the user callback if our buffer is drained or below the
205  * low watermark.
206  */
207  if (bufev->writecb != NULL &&
208  EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
209  (*bufev->writecb)(bufev, bufev->cbarg);
210 
211  return;
212 
213  reschedule:
214  if (EVBUFFER_LENGTH(bufev->output) != 0)
215  bufferevent_add(&bufev->ev_write, bufev->timeout_write);
216  return;
217 
218  error:
219  (*bufev->errorcb)(bufev, what, bufev->cbarg);
220 }
221 
222 /*
223  * Create a new buffered event object.
224  *
225  * The read callback is invoked whenever we read new data.
226  * The write callback is invoked whenever the output buffer is drained.
227  * The error callback is invoked on a write/read error or on EOF.
228  *
229  * Both read and write callbacks maybe NULL. The error callback is not
230  * allowed to be NULL and have to be provided always.
231  */
232 
233 struct bufferevent *
234 bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
235  everrorcb errorcb, void *cbarg)
236 {
237  struct bufferevent *bufev;
238 
239  if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
240  return (NULL);
241 
242  if ((bufev->input = evbuffer_new()) == NULL) {
243  free(bufev);
244  return (NULL);
245  }
246 
247  if ((bufev->output = evbuffer_new()) == NULL) {
248  evbuffer_free(bufev->input);
249  free(bufev);
250  return (NULL);
251  }
252 
253  event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
254  event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
255 
256  bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg);
257 
258  /*
259  * Set to EV_WRITE so that using bufferevent_write is going to
260  * trigger a callback. Reading needs to be explicitly enabled
261  * because otherwise no data will be available.
262  */
263  bufev->enabled = EV_WRITE;
264 
265  return (bufev);
266 }
267 
268 void
269 bufferevent_setcb(struct bufferevent *bufev,
270  evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg)
271 {
272  bufev->readcb = readcb;
273  bufev->writecb = writecb;
274  bufev->errorcb = errorcb;
275 
276  bufev->cbarg = cbarg;
277 }
278 
279 void
280 bufferevent_setfd(struct bufferevent *bufev, int fd)
281 {
282  event_del(&bufev->ev_read);
283  event_del(&bufev->ev_write);
284 
285  event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
286  event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
287  if (bufev->ev_base != NULL) {
288  event_base_set(bufev->ev_base, &bufev->ev_read);
289  event_base_set(bufev->ev_base, &bufev->ev_write);
290  }
291 
292  /* might have to manually trigger event registration */
293 }
294 
295 int
296 bufferevent_priority_set(struct bufferevent *bufev, int priority)
297 {
298  if (event_priority_set(&bufev->ev_read, priority) == -1)
299  return (-1);
300  if (event_priority_set(&bufev->ev_write, priority) == -1)
301  return (-1);
302 
303  return (0);
304 }
305 
306 /* Closing the file descriptor is the responsibility of the caller */
307 
308 void
309 bufferevent_free(struct bufferevent *bufev)
310 {
311  event_del(&bufev->ev_read);
312  event_del(&bufev->ev_write);
313 
314  evbuffer_free(bufev->input);
315  evbuffer_free(bufev->output);
316 
317  free(bufev);
318 }
319 
320 /*
321  * Returns 0 on success;
322  * -1 on failure.
323  */
324 
325 int
326 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
327 {
328  int res;
329 
330  res = evbuffer_add(bufev->output, data, size);
331 
332  if (res == -1)
333  return (res);
334 
335  /* If everything is okay, we need to schedule a write */
336  if (size > 0 && (bufev->enabled & EV_WRITE))
337  bufferevent_add(&bufev->ev_write, bufev->timeout_write);
338 
339  return (res);
340 }
341 
342 int
343 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
344 {
345  int res;
346 
347  res = bufferevent_write(bufev, buf->buffer, buf->off);
348  if (res != -1)
349  evbuffer_drain(buf, buf->off);
350 
351  return (res);
352 }
353 
354 size_t
355 bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
356 {
357  struct evbuffer *buf = bufev->input;
358 
359  if (buf->off < size)
360  size = buf->off;
361 
362  /* Copy the available data to the user buffer */
363  memcpy(data, buf->buffer, size);
364 
365  if (size)
366  evbuffer_drain(buf, size);
367 
368  return (size);
369 }
370 
371 int
372 bufferevent_enable(struct bufferevent *bufev, short event)
373 {
374  if (event & EV_READ) {
375  if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
376  return (-1);
377  }
378  if (event & EV_WRITE) {
379  if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
380  return (-1);
381  }
382 
383  bufev->enabled |= event;
384  return (0);
385 }
386 
387 int
388 bufferevent_disable(struct bufferevent *bufev, short event)
389 {
390  if (event & EV_READ) {
391  if (event_del(&bufev->ev_read) == -1)
392  return (-1);
393  }
394  if (event & EV_WRITE) {
395  if (event_del(&bufev->ev_write) == -1)
396  return (-1);
397  }
398 
399  bufev->enabled &= ~event;
400  return (0);
401 }
402 
403 /*
404  * Sets the read and write timeout for a buffered event.
405  */
406 
407 void
408 bufferevent_settimeout(struct bufferevent *bufev,
409  int timeout_read, int timeout_write) {
410  bufev->timeout_read = timeout_read;
411  bufev->timeout_write = timeout_write;
412 
413  if (event_pending(&bufev->ev_read, EV_READ, NULL))
414  bufferevent_add(&bufev->ev_read, timeout_read);
415  if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
416  bufferevent_add(&bufev->ev_write, timeout_write);
417 }
418 
419 /*
420  * Sets the water marks
421  */
422 
423 void
424 bufferevent_setwatermark(struct bufferevent *bufev, short events,
425  size_t lowmark, size_t highmark)
426 {
427  if (events & EV_READ) {
428  bufev->wm_read.low = lowmark;
429  bufev->wm_read.high = highmark;
430  }
431 
432  if (events & EV_WRITE) {
433  bufev->wm_write.low = lowmark;
434  bufev->wm_write.high = highmark;
435  }
436 
437  /* If the watermarks changed then see if we should call read again */
438  bufferevent_read_pressure_cb(bufev->input,
439  0, EVBUFFER_LENGTH(bufev->input), bufev);
440 }
441 
442 int
443 bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
444 {
445  int res;
446 
447  bufev->ev_base = base;
448 
449  res = event_base_set(base, &bufev->ev_read);
450  if (res == -1)
451  return (res);
452 
453  res = event_base_set(base, &bufev->ev_write);
454  return (res);
455 }