Groonga 3.0.9 Source Code Document
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
ngx_event_pipe.c
Go to the documentation of this file.
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Nginx, Inc.
5  */
6 
7 
8 #include <ngx_config.h>
9 #include <ngx_core.h>
10 #include <ngx_event.h>
11 #include <ngx_event_pipe.h>
12 
13 
14 static ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t *p);
15 static ngx_int_t ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p);
16 
17 static ngx_int_t ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p);
18 static ngx_inline void ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf);
19 static ngx_int_t ngx_event_pipe_drain_chains(ngx_event_pipe_t *p);
20 
21 
24 {
25  u_int flags;
26  ngx_int_t rc;
27  ngx_event_t *rev, *wev;
28 
29  for ( ;; ) {
30  if (do_write) {
31  p->log->action = "sending to client";
32 
33  rc = ngx_event_pipe_write_to_downstream(p);
34 
35  if (rc == NGX_ABORT) {
36  return NGX_ABORT;
37  }
38 
39  if (rc == NGX_BUSY) {
40  return NGX_OK;
41  }
42  }
43 
44  p->read = 0;
45  p->upstream_blocked = 0;
46 
47  p->log->action = "reading upstream";
48 
49  if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
50  return NGX_ABORT;
51  }
52 
53  if (!p->read && !p->upstream_blocked) {
54  break;
55  }
56 
57  do_write = 1;
58  }
59 
60  if (p->upstream->fd != -1) {
61  rev = p->upstream->read;
62 
63  flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0;
64 
65  if (ngx_handle_read_event(rev, flags) != NGX_OK) {
66  return NGX_ABORT;
67  }
68 
69  if (rev->active && !rev->ready) {
70  ngx_add_timer(rev, p->read_timeout);
71 
72  } else if (rev->timer_set) {
73  ngx_del_timer(rev);
74  }
75  }
76 
77  if (p->downstream->fd != -1 && p->downstream->data == p->output_ctx) {
78  wev = p->downstream->write;
79  if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
80  return NGX_ABORT;
81  }
82 
83  if (!wev->delayed) {
84  if (wev->active && !wev->ready) {
85  ngx_add_timer(wev, p->send_timeout);
86 
87  } else if (wev->timer_set) {
88  ngx_del_timer(wev);
89  }
90  }
91  }
92 
93  return NGX_OK;
94 }
95 
96 
97 static ngx_int_t
98 ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
99 {
100  ssize_t n, size;
101  ngx_int_t rc;
102  ngx_buf_t *b;
103  ngx_chain_t *chain, *cl, *ln;
104 
105  if (p->upstream_eof || p->upstream_error || p->upstream_done) {
106  return NGX_OK;
107  }
108 
110  "pipe read upstream: %d", p->upstream->read->ready);
111 
112  for ( ;; ) {
113 
114  if (p->upstream_eof || p->upstream_error || p->upstream_done) {
115  break;
116  }
117 
118  if (p->preread_bufs == NULL && !p->upstream->read->ready) {
119  break;
120  }
121 
122  if (p->preread_bufs) {
123 
124  /* use the pre-read bufs if they exist */
125 
126  chain = p->preread_bufs;
127  p->preread_bufs = NULL;
128  n = p->preread_size;
129 
131  "pipe preread: %z", n);
132 
133  if (n) {
134  p->read = 1;
135  }
136 
137  } else {
138 
139 #if (NGX_HAVE_KQUEUE)
140 
141  /*
142  * kqueue notifies about the end of file or a pending error.
143  * This test allows not to allocate a buf on these conditions
144  * and not to call c->recv_chain().
145  */
146 
147  if (p->upstream->read->available == 0
148  && p->upstream->read->pending_eof)
149  {
150  p->upstream->read->ready = 0;
151  p->upstream->read->eof = 1;
152  p->upstream_eof = 1;
153  p->read = 1;
154 
155  if (p->upstream->read->kq_errno) {
156  p->upstream->read->error = 1;
157  p->upstream_error = 1;
158  p->upstream_eof = 0;
159 
161  p->upstream->read->kq_errno,
162  "kevent() reported that upstream "
163  "closed connection");
164  }
165 
166  break;
167  }
168 #endif
169 
170  if (p->free_raw_bufs) {
171 
172  /* use the free bufs if they exist */
173 
174  chain = p->free_raw_bufs;
175  if (p->single_buf) {
177  chain->next = NULL;
178  } else {
179  p->free_raw_bufs = NULL;
180  }
181 
182  } else if (p->allocated < p->bufs.num) {
183 
184  /* allocate a new buf if it's still allowed */
185 
186  b = ngx_create_temp_buf(p->pool, p->bufs.size);
187  if (b == NULL) {
188  return NGX_ABORT;
189  }
190 
191  p->allocated++;
192 
193  chain = ngx_alloc_chain_link(p->pool);
194  if (chain == NULL) {
195  return NGX_ABORT;
196  }
197 
198  chain->buf = b;
199  chain->next = NULL;
200 
201  } else if (!p->cacheable
202  && p->downstream->data == p->output_ctx
203  && p->downstream->write->ready
204  && !p->downstream->write->delayed)
205  {
206  /*
207  * if the bufs are not needed to be saved in a cache and
208  * a downstream is ready then write the bufs to a downstream
209  */
210 
211  p->upstream_blocked = 1;
212 
214  "pipe downstream ready");
215 
216  break;
217 
218  } else if (p->cacheable
219  || p->temp_file->offset < p->max_temp_file_size)
220  {
221 
222  /*
223  * if it is allowed, then save some bufs from r->in
224  * to a temporary file, and add them to a r->out chain
225  */
226 
227  rc = ngx_event_pipe_write_chain_to_temp_file(p);
228 
230  "pipe temp offset: %O", p->temp_file->offset);
231 
232  if (rc == NGX_BUSY) {
233  break;
234  }
235 
236  if (rc == NGX_AGAIN) {
238  && p->upstream->read->active
239  && p->upstream->read->ready)
240  {
242  == NGX_ERROR)
243  {
244  return NGX_ABORT;
245  }
246  }
247  }
248 
249  if (rc != NGX_OK) {
250  return rc;
251  }
252 
253  chain = p->free_raw_bufs;
254  if (p->single_buf) {
256  chain->next = NULL;
257  } else {
258  p->free_raw_bufs = NULL;
259  }
260 
261  } else {
262 
263  /* there are no bufs to read in */
264 
266  "no pipe bufs to read in");
267 
268  break;
269  }
270 
271  n = p->upstream->recv_chain(p->upstream, chain);
272 
274  "pipe recv chain: %z", n);
275 
276  if (p->free_raw_bufs) {
277  chain->next = p->free_raw_bufs;
278  }
279  p->free_raw_bufs = chain;
280 
281  if (n == NGX_ERROR) {
282  p->upstream_error = 1;
283  return NGX_ERROR;
284  }
285 
286  if (n == NGX_AGAIN) {
287  if (p->single_buf) {
288  ngx_event_pipe_remove_shadow_links(chain->buf);
289  }
290 
291  break;
292  }
293 
294  p->read = 1;
295 
296  if (n == 0) {
297  p->upstream_eof = 1;
298  break;
299  }
300  }
301 
302  p->read_length += n;
303  cl = chain;
304  p->free_raw_bufs = NULL;
305 
306  while (cl && n > 0) {
307 
308  ngx_event_pipe_remove_shadow_links(cl->buf);
309 
310  size = cl->buf->end - cl->buf->last;
311 
312  if (n >= size) {
313  cl->buf->last = cl->buf->end;
314 
315  /* STUB */ cl->buf->num = p->num++;
316 
317  if (p->input_filter(p, cl->buf) == NGX_ERROR) {
318  return NGX_ABORT;
319  }
320 
321  n -= size;
322  ln = cl;
323  cl = cl->next;
324  ngx_free_chain(p->pool, ln);
325 
326  } else {
327  cl->buf->last += n;
328  n = 0;
329  }
330  }
331 
332  if (cl) {
333  for (ln = cl; ln->next; ln = ln->next) { /* void */ }
334 
335  ln->next = p->free_raw_bufs;
336  p->free_raw_bufs = cl;
337  }
338  }
339 
340 #if (NGX_DEBUG)
341 
342  for (cl = p->busy; cl; cl = cl->next) {
344  "pipe buf busy s:%d t:%d f:%d "
345  "%p, pos %p, size: %z "
346  "file: %O, size: %z",
347  (cl->buf->shadow ? 1 : 0),
348  cl->buf->temporary, cl->buf->in_file,
349  cl->buf->start, cl->buf->pos,
350  cl->buf->last - cl->buf->pos,
351  cl->buf->file_pos,
352  cl->buf->file_last - cl->buf->file_pos);
353  }
354 
355  for (cl = p->out; cl; cl = cl->next) {
357  "pipe buf out s:%d t:%d f:%d "
358  "%p, pos %p, size: %z "
359  "file: %O, size: %z",
360  (cl->buf->shadow ? 1 : 0),
361  cl->buf->temporary, cl->buf->in_file,
362  cl->buf->start, cl->buf->pos,
363  cl->buf->last - cl->buf->pos,
364  cl->buf->file_pos,
365  cl->buf->file_last - cl->buf->file_pos);
366  }
367 
368  for (cl = p->in; cl; cl = cl->next) {
370  "pipe buf in s:%d t:%d f:%d "
371  "%p, pos %p, size: %z "
372  "file: %O, size: %z",
373  (cl->buf->shadow ? 1 : 0),
374  cl->buf->temporary, cl->buf->in_file,
375  cl->buf->start, cl->buf->pos,
376  cl->buf->last - cl->buf->pos,
377  cl->buf->file_pos,
378  cl->buf->file_last - cl->buf->file_pos);
379  }
380 
381  for (cl = p->free_raw_bufs; cl; cl = cl->next) {
383  "pipe buf free s:%d t:%d f:%d "
384  "%p, pos %p, size: %z "
385  "file: %O, size: %z",
386  (cl->buf->shadow ? 1 : 0),
387  cl->buf->temporary, cl->buf->in_file,
388  cl->buf->start, cl->buf->pos,
389  cl->buf->last - cl->buf->pos,
390  cl->buf->file_pos,
391  cl->buf->file_last - cl->buf->file_pos);
392  }
393 
395  "pipe length: %O", p->length);
396 
397 #endif
398 
399  if (p->free_raw_bufs && p->length != -1) {
400  cl = p->free_raw_bufs;
401 
402  if (cl->buf->last - cl->buf->pos >= p->length) {
403 
404  p->free_raw_bufs = cl->next;
405 
406  /* STUB */ cl->buf->num = p->num++;
407 
408  if (p->input_filter(p, cl->buf) == NGX_ERROR) {
409  return NGX_ABORT;
410  }
411 
412  ngx_free_chain(p->pool, cl);
413  }
414  }
415 
416  if (p->length == 0) {
417  p->upstream_done = 1;
418  p->read = 1;
419  }
420 
421  if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {
422 
423  /* STUB */ p->free_raw_bufs->buf->num = p->num++;
424 
425  if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) {
426  return NGX_ABORT;
427  }
428 
430 
431  if (p->free_bufs && p->buf_to_file == NULL) {
432  for (cl = p->free_raw_bufs; cl; cl = cl->next) {
433  if (cl->buf->shadow == NULL) {
434  ngx_pfree(p->pool, cl->buf->start);
435  }
436  }
437  }
438  }
439 
440  if (p->cacheable && p->in) {
441  if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) {
442  return NGX_ABORT;
443  }
444  }
445 
446  return NGX_OK;
447 }
448 
449 
450 static ngx_int_t
451 ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
452 {
453  u_char *prev;
454  size_t bsize;
455  ngx_int_t rc;
456  ngx_uint_t flush, flushed, prev_last_shadow;
457  ngx_chain_t *out, **ll, *cl, file;
458  ngx_connection_t *downstream;
459 
460  downstream = p->downstream;
461 
463  "pipe write downstream: %d", downstream->write->ready);
464 
465  flushed = 0;
466 
467  for ( ;; ) {
468  if (p->downstream_error) {
469  return ngx_event_pipe_drain_chains(p);
470  }
471 
472  if (p->upstream_eof || p->upstream_error || p->upstream_done) {
473 
474  /* pass the p->out and p->in chains to the output filter */
475 
476  for (cl = p->busy; cl; cl = cl->next) {
477  cl->buf->recycled = 0;
478  }
479 
480  if (p->out) {
482  "pipe write downstream flush out");
483 
484  for (cl = p->out; cl; cl = cl->next) {
485  cl->buf->recycled = 0;
486  }
487 
488  rc = p->output_filter(p->output_ctx, p->out);
489 
490  if (rc == NGX_ERROR) {
491  p->downstream_error = 1;
492  return ngx_event_pipe_drain_chains(p);
493  }
494 
495  p->out = NULL;
496  }
497 
498  if (p->in) {
500  "pipe write downstream flush in");
501 
502  for (cl = p->in; cl; cl = cl->next) {
503  cl->buf->recycled = 0;
504  }
505 
506  rc = p->output_filter(p->output_ctx, p->in);
507 
508  if (rc == NGX_ERROR) {
509  p->downstream_error = 1;
510  return ngx_event_pipe_drain_chains(p);
511  }
512 
513  p->in = NULL;
514  }
515 
516  if (p->cacheable && p->buf_to_file) {
517 
518  file.buf = p->buf_to_file;
519  file.next = NULL;
520 
522  == NGX_ERROR)
523  {
524  return NGX_ABORT;
525  }
526  }
527 
529  "pipe write downstream done");
530 
531  /* TODO: free unused bufs */
532 
533  p->downstream_done = 1;
534  break;
535  }
536 
537  if (downstream->data != p->output_ctx
538  || !downstream->write->ready
539  || downstream->write->delayed)
540  {
541  break;
542  }
543 
544  /* bsize is the size of the busy recycled bufs */
545 
546  prev = NULL;
547  bsize = 0;
548 
549  for (cl = p->busy; cl; cl = cl->next) {
550 
551  if (cl->buf->recycled) {
552  if (prev == cl->buf->start) {
553  continue;
554  }
555 
556  bsize += cl->buf->end - cl->buf->start;
557  prev = cl->buf->start;
558  }
559  }
560 
562  "pipe write busy: %uz", bsize);
563 
564  out = NULL;
565 
566  if (bsize >= (size_t) p->busy_size) {
567  flush = 1;
568  goto flush;
569  }
570 
571  flush = 0;
572  ll = NULL;
573  prev_last_shadow = 1;
574 
575  for ( ;; ) {
576  if (p->out) {
577  cl = p->out;
578 
579  if (cl->buf->recycled) {
581  "recycled buffer in pipe out chain");
582  }
583 
584  p->out = p->out->next;
585 
586  } else if (!p->cacheable && p->in) {
587  cl = p->in;
588 
590  "pipe write buf ls:%d %p %z",
591  cl->buf->last_shadow,
592  cl->buf->pos,
593  cl->buf->last - cl->buf->pos);
594 
595  if (cl->buf->recycled && prev_last_shadow) {
596  if (bsize + cl->buf->end - cl->buf->start > p->busy_size) {
597  flush = 1;
598  break;
599  }
600 
601  bsize += cl->buf->end - cl->buf->start;
602  }
603 
604  prev_last_shadow = cl->buf->last_shadow;
605 
606  p->in = p->in->next;
607 
608  } else {
609  break;
610  }
611 
612  cl->next = NULL;
613 
614  if (out) {
615  *ll = cl;
616  } else {
617  out = cl;
618  }
619  ll = &cl->next;
620  }
621 
622  flush:
623 
625  "pipe write: out:%p, f:%d", out, flush);
626 
627  if (out == NULL) {
628 
629  if (!flush) {
630  break;
631  }
632 
633  /* a workaround for AIO */
634  if (flushed++ > 10) {
635  return NGX_BUSY;
636  }
637  }
638 
639  rc = p->output_filter(p->output_ctx, out);
640 
641  ngx_chain_update_chains(p->pool, &p->free, &p->busy, &out, p->tag);
642 
643  if (rc == NGX_ERROR) {
644  p->downstream_error = 1;
645  return ngx_event_pipe_drain_chains(p);
646  }
647 
648  for (cl = p->free; cl; cl = cl->next) {
649 
650  if (cl->buf->temp_file) {
651  if (p->cacheable || !p->cyclic_temp_file) {
652  continue;
653  }
654 
655  /* reset p->temp_offset if all bufs had been sent */
656 
657  if (cl->buf->file_last == p->temp_file->offset) {
658  p->temp_file->offset = 0;
659  }
660  }
661 
662  /* TODO: free buf if p->free_bufs && upstream done */
663 
664  /* add the free shadow raw buf to p->free_raw_bufs */
665 
666  if (cl->buf->last_shadow) {
667  if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
668  return NGX_ABORT;
669  }
670 
671  cl->buf->last_shadow = 0;
672  }
673 
674  cl->buf->shadow = NULL;
675  }
676  }
677 
678  return NGX_OK;
679 }
680 
681 
682 static ngx_int_t
683 ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
684 {
685  ssize_t size, bsize, n;
686  ngx_buf_t *b;
687  ngx_uint_t prev_last_shadow;
688  ngx_chain_t *cl, *tl, *next, *out, **ll, **last_out, **last_free, fl;
689 
690  if (p->buf_to_file) {
691  fl.buf = p->buf_to_file;
692  fl.next = p->in;
693  out = &fl;
694 
695  } else {
696  out = p->in;
697  }
698 
699  if (!p->cacheable) {
700 
701  size = 0;
702  cl = out;
703  ll = NULL;
704  prev_last_shadow = 1;
705 
707  "pipe offset: %O", p->temp_file->offset);
708 
709  do {
710  bsize = cl->buf->last - cl->buf->pos;
711 
713  "pipe buf ls:%d %p, pos %p, size: %z",
714  cl->buf->last_shadow, cl->buf->start,
715  cl->buf->pos, bsize);
716 
717  if (prev_last_shadow
718  && ((size + bsize > p->temp_file_write_size)
719  || (p->temp_file->offset + size + bsize
720  > p->max_temp_file_size)))
721  {
722  break;
723  }
724 
725  prev_last_shadow = cl->buf->last_shadow;
726 
727  size += bsize;
728  ll = &cl->next;
729  cl = cl->next;
730 
731  } while (cl);
732 
733  ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "size: %z", size);
734 
735  if (ll == NULL) {
736  return NGX_BUSY;
737  }
738 
739  if (cl) {
740  p->in = cl;
741  *ll = NULL;
742 
743  } else {
744  p->in = NULL;
745  p->last_in = &p->in;
746  }
747 
748  } else {
749  p->in = NULL;
750  p->last_in = &p->in;
751  }
752 
754 
755  if (n == NGX_ERROR) {
756  return NGX_ABORT;
757  }
758 
759  if (p->buf_to_file) {
761  n -= p->buf_to_file->last - p->buf_to_file->pos;
762  p->buf_to_file = NULL;
763  out = out->next;
764  }
765 
766  if (n > 0) {
767  /* update previous buffer or add new buffer */
768 
769  if (p->out) {
770  for (cl = p->out; cl->next; cl = cl->next) { /* void */ }
771 
772  b = cl->buf;
773 
774  if (b->file_last == p->temp_file->offset) {
775  p->temp_file->offset += n;
776  b->file_last = p->temp_file->offset;
777  goto free;
778  }
779 
780  last_out = &cl->next;
781 
782  } else {
783  last_out = &p->out;
784  }
785 
786  cl = ngx_chain_get_free_buf(p->pool, &p->free);
787  if (cl == NULL) {
788  return NGX_ABORT;
789  }
790 
791  b = cl->buf;
792 
793  ngx_memzero(b, sizeof(ngx_buf_t));
794 
795  b->tag = p->tag;
796 
797  b->file = &p->temp_file->file;
798  b->file_pos = p->temp_file->offset;
799  p->temp_file->offset += n;
800  b->file_last = p->temp_file->offset;
801 
802  b->in_file = 1;
803  b->temp_file = 1;
804 
805  *last_out = cl;
806  }
807 
808 free:
809 
810  for (last_free = &p->free_raw_bufs;
811  *last_free != NULL;
812  last_free = &(*last_free)->next)
813  {
814  /* void */
815  }
816 
817  for (cl = out; cl; cl = next) {
818  next = cl->next;
819 
820  cl->next = p->free;
821  p->free = cl;
822 
823  b = cl->buf;
824 
825  if (b->last_shadow) {
826 
827  tl = ngx_alloc_chain_link(p->pool);
828  if (tl == NULL) {
829  return NGX_ABORT;
830  }
831 
832  tl->buf = b->shadow;
833  tl->next = NULL;
834 
835  *last_free = tl;
836  last_free = &tl->next;
837 
838  b->shadow->pos = b->shadow->start;
839  b->shadow->last = b->shadow->start;
840 
841  ngx_event_pipe_remove_shadow_links(b->shadow);
842  }
843  }
844 
845  return NGX_OK;
846 }
847 
848 
849 /* the copy input filter */
850 
851 ngx_int_t
853 {
854  ngx_buf_t *b;
855  ngx_chain_t *cl;
856 
857  if (buf->pos == buf->last) {
858  return NGX_OK;
859  }
860 
861  if (p->free) {
862  cl = p->free;
863  b = cl->buf;
864  p->free = cl->next;
865  ngx_free_chain(p->pool, cl);
866 
867  } else {
868  b = ngx_alloc_buf(p->pool);
869  if (b == NULL) {
870  return NGX_ERROR;
871  }
872  }
873 
874  ngx_memcpy(b, buf, sizeof(ngx_buf_t));
875  b->shadow = buf;
876  b->tag = p->tag;
877  b->last_shadow = 1;
878  b->recycled = 1;
879  buf->shadow = b;
880 
881  cl = ngx_alloc_chain_link(p->pool);
882  if (cl == NULL) {
883  return NGX_ERROR;
884  }
885 
886  cl->buf = b;
887  cl->next = NULL;
888 
889  ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "input buf #%d", b->num);
890 
891  if (p->in) {
892  *p->last_in = cl;
893  } else {
894  p->in = cl;
895  }
896  p->last_in = &cl->next;
897 
898  if (p->length == -1) {
899  return NGX_OK;
900  }
901 
902  p->length -= b->last - b->pos;
903 
904  return NGX_OK;
905 }
906 
907 
908 static ngx_inline void
909 ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf)
910 {
911  ngx_buf_t *b, *next;
912 
913  b = buf->shadow;
914 
915  if (b == NULL) {
916  return;
917  }
918 
919  while (!b->last_shadow) {
920  next = b->shadow;
921 
922  b->temporary = 0;
923  b->recycled = 0;
924 
925  b->shadow = NULL;
926  b = next;
927  }
928 
929  b->temporary = 0;
930  b->recycled = 0;
931  b->last_shadow = 0;
932 
933  b->shadow = NULL;
934 
935  buf->shadow = NULL;
936 }
937 
938 
939 ngx_int_t
941 {
942  ngx_chain_t *cl;
943 
944  cl = ngx_alloc_chain_link(p->pool);
945  if (cl == NULL) {
946  return NGX_ERROR;
947  }
948 
949  if (p->buf_to_file && b->start == p->buf_to_file->start) {
950  b->pos = p->buf_to_file->last;
951  b->last = p->buf_to_file->last;
952 
953  } else {
954  b->pos = b->start;
955  b->last = b->start;
956  }
957 
958  b->shadow = NULL;
959 
960  cl->buf = b;
961 
962  if (p->free_raw_bufs == NULL) {
963  p->free_raw_bufs = cl;
964  cl->next = NULL;
965 
966  return NGX_OK;
967  }
968 
969  if (p->free_raw_bufs->buf->pos == p->free_raw_bufs->buf->last) {
970 
971  /* add the free buf to the list start */
972 
973  cl->next = p->free_raw_bufs;
974  p->free_raw_bufs = cl;
975 
976  return NGX_OK;
977  }
978 
979  /* the first free buf is partially filled, thus add the free buf after it */
980 
981  cl->next = p->free_raw_bufs->next;
982  p->free_raw_bufs->next = cl;
983 
984  return NGX_OK;
985 }
986 
987 
988 static ngx_int_t
989 ngx_event_pipe_drain_chains(ngx_event_pipe_t *p)
990 {
991  ngx_chain_t *cl, *tl;
992 
993  for ( ;; ) {
994  if (p->busy) {
995  cl = p->busy;
996  p->busy = NULL;
997 
998  } else if (p->out) {
999  cl = p->out;
1000  p->out = NULL;
1001 
1002  } else if (p->in) {
1003  cl = p->in;
1004  p->in = NULL;
1005 
1006  } else {
1007  return NGX_OK;
1008  }
1009 
1010  while (cl) {
1011  if (cl->buf->last_shadow) {
1012  if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
1013  return NGX_ABORT;
1014  }
1015 
1016  cl->buf->last_shadow = 0;
1017  }
1018 
1019  cl->buf->shadow = NULL;
1020  tl = cl->next;
1021  cl->next = p->free;
1022  p->free = cl;
1023  cl = tl;
1024  }
1025  }
1026 }