Annotation of sys/kern/sys_pipe.c, Revision 1.1.1.1
1.1 nbrk 1: /* $OpenBSD: sys_pipe.c,v 1.52 2007/08/07 11:30:53 millert Exp $ */
2:
3: /*
4: * Copyright (c) 1996 John S. Dyson
5: * All rights reserved.
6: *
7: * Redistribution and use in source and binary forms, with or without
8: * modification, are permitted provided that the following conditions
9: * are met:
10: * 1. Redistributions of source code must retain the above copyright
11: * notice immediately at the beginning of the file, without modification,
12: * this list of conditions, and the following disclaimer.
13: * 2. Redistributions in binary form must reproduce the above copyright
14: * notice, this list of conditions and the following disclaimer in the
15: * documentation and/or other materials provided with the distribution.
16: * 3. Absolutely no warranty of function or purpose is made by the author
17: * John S. Dyson.
18: * 4. Modifications may be freely made to this file if the above conditions
19: * are met.
20: */
21:
22: /*
23: * This file contains a high-performance replacement for the socket-based
24: * pipes scheme originally used in FreeBSD/4.4Lite. It does not support
25: * all features of sockets, but does do everything that pipes normally
26: * do.
27: */
28:
29: #include <sys/param.h>
30: #include <sys/systm.h>
31: #include <sys/proc.h>
32: #include <sys/file.h>
33: #include <sys/filedesc.h>
34: #include <sys/pool.h>
35: #include <sys/ioctl.h>
36: #include <sys/stat.h>
37: #include <sys/signalvar.h>
38: #include <sys/mount.h>
39: #include <sys/syscallargs.h>
40: #include <sys/event.h>
41: #include <sys/lock.h>
42: #include <sys/poll.h>
43:
44: #include <uvm/uvm_extern.h>
45:
46: #include <sys/pipe.h>
47:
48: /*
49: * interfaces to the outside world
50: */
51: int pipe_read(struct file *, off_t *, struct uio *, struct ucred *);
52: int pipe_write(struct file *, off_t *, struct uio *, struct ucred *);
53: int pipe_close(struct file *, struct proc *);
54: int pipe_poll(struct file *, int events, struct proc *);
55: int pipe_kqfilter(struct file *fp, struct knote *kn);
56: int pipe_ioctl(struct file *, u_long, caddr_t, struct proc *);
57: int pipe_stat(struct file *fp, struct stat *ub, struct proc *p);
58:
59: static struct fileops pipeops = {
60: pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter,
61: pipe_stat, pipe_close
62: };
63:
64: void filt_pipedetach(struct knote *kn);
65: int filt_piperead(struct knote *kn, long hint);
66: int filt_pipewrite(struct knote *kn, long hint);
67:
68: struct filterops pipe_rfiltops =
69: { 1, NULL, filt_pipedetach, filt_piperead };
70: struct filterops pipe_wfiltops =
71: { 1, NULL, filt_pipedetach, filt_pipewrite };
72:
73: /*
74: * Default pipe buffer size(s), this can be kind-of large now because pipe
75: * space is pageable. The pipe code will try to maintain locality of
76: * reference for performance reasons, so small amounts of outstanding I/O
77: * will not wipe the cache.
78: */
79: #define MINPIPESIZE (PIPE_SIZE/3)
80:
81: /*
82: * Limit the number of "big" pipes
83: */
84: #define LIMITBIGPIPES 32
85: int nbigpipe;
86: static int amountpipekva;
87:
88: struct pool pipe_pool;
89:
90: void pipeclose(struct pipe *);
91: void pipe_free_kmem(struct pipe *);
92: int pipe_create(struct pipe *);
93: static __inline int pipelock(struct pipe *);
94: static __inline void pipeunlock(struct pipe *);
95: static __inline void pipeselwakeup(struct pipe *);
96: int pipespace(struct pipe *, u_int);
97:
98: /*
99: * The pipe system call for the DTYPE_PIPE type of pipes
100: */
101:
102: /* ARGSUSED */
103: int
104: sys_opipe(struct proc *p, void *v, register_t *retval)
105: {
106: struct filedesc *fdp = p->p_fd;
107: struct file *rf, *wf;
108: struct pipe *rpipe, *wpipe;
109: int fd, error;
110:
111: fdplock(fdp);
112:
113: rpipe = pool_get(&pipe_pool, PR_WAITOK);
114: error = pipe_create(rpipe);
115: if (error != 0)
116: goto free1;
117: wpipe = pool_get(&pipe_pool, PR_WAITOK);
118: error = pipe_create(wpipe);
119: if (error != 0)
120: goto free2;
121:
122: error = falloc(p, &rf, &fd);
123: if (error != 0)
124: goto free2;
125: rf->f_flag = FREAD | FWRITE;
126: rf->f_type = DTYPE_PIPE;
127: rf->f_data = rpipe;
128: rf->f_ops = &pipeops;
129: retval[0] = fd;
130:
131: error = falloc(p, &wf, &fd);
132: if (error != 0)
133: goto free3;
134: wf->f_flag = FREAD | FWRITE;
135: wf->f_type = DTYPE_PIPE;
136: wf->f_data = wpipe;
137: wf->f_ops = &pipeops;
138: retval[1] = fd;
139:
140: rpipe->pipe_peer = wpipe;
141: wpipe->pipe_peer = rpipe;
142:
143: FILE_SET_MATURE(rf);
144: FILE_SET_MATURE(wf);
145:
146: fdpunlock(fdp);
147: return (0);
148:
149: free3:
150: fdremove(fdp, retval[0]);
151: closef(rf, p);
152: rpipe = NULL;
153: free2:
154: (void)pipeclose(wpipe);
155: free1:
156: if (rpipe != NULL)
157: (void)pipeclose(rpipe);
158: fdpunlock(fdp);
159: return (error);
160: }
161:
162: /*
163: * Allocate kva for pipe circular buffer, the space is pageable.
164: * This routine will 'realloc' the size of a pipe safely, if it fails
165: * it will retain the old buffer.
166: * If it fails it will return ENOMEM.
167: */
168: int
169: pipespace(struct pipe *cpipe, u_int size)
170: {
171: caddr_t buffer;
172:
173: buffer = (caddr_t)uvm_km_valloc(kernel_map, size);
174: if (buffer == NULL) {
175: return (ENOMEM);
176: }
177:
178: /* free old resources if we are resizing */
179: pipe_free_kmem(cpipe);
180: cpipe->pipe_buffer.buffer = buffer;
181: cpipe->pipe_buffer.size = size;
182: cpipe->pipe_buffer.in = 0;
183: cpipe->pipe_buffer.out = 0;
184: cpipe->pipe_buffer.cnt = 0;
185:
186: amountpipekva += cpipe->pipe_buffer.size;
187:
188: return (0);
189: }
190:
191: /*
192: * initialize and allocate VM and memory for pipe
193: */
194: int
195: pipe_create(struct pipe *cpipe)
196: {
197: int error;
198:
199: /* so pipe_free_kmem() doesn't follow junk pointer */
200: cpipe->pipe_buffer.buffer = NULL;
201: /*
202: * protect so pipeclose() doesn't follow a junk pointer
203: * if pipespace() fails.
204: */
205: bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);
206: cpipe->pipe_state = 0;
207: cpipe->pipe_peer = NULL;
208: cpipe->pipe_busy = 0;
209:
210: error = pipespace(cpipe, PIPE_SIZE);
211: if (error != 0)
212: return (error);
213:
214: nanotime(&cpipe->pipe_ctime);
215: cpipe->pipe_atime = cpipe->pipe_ctime;
216: cpipe->pipe_mtime = cpipe->pipe_ctime;
217: cpipe->pipe_pgid = NO_PID;
218:
219: return (0);
220: }
221:
222:
223: /*
224: * lock a pipe for I/O, blocking other access
225: */
226: static __inline int
227: pipelock(struct pipe *cpipe)
228: {
229: int error;
230: while (cpipe->pipe_state & PIPE_LOCK) {
231: cpipe->pipe_state |= PIPE_LWANT;
232: if ((error = tsleep(cpipe, PRIBIO|PCATCH, "pipelk", 0)))
233: return error;
234: }
235: cpipe->pipe_state |= PIPE_LOCK;
236: return 0;
237: }
238:
239: /*
240: * unlock a pipe I/O lock
241: */
242: static __inline void
243: pipeunlock(struct pipe *cpipe)
244: {
245: cpipe->pipe_state &= ~PIPE_LOCK;
246: if (cpipe->pipe_state & PIPE_LWANT) {
247: cpipe->pipe_state &= ~PIPE_LWANT;
248: wakeup(cpipe);
249: }
250: }
251:
252: static __inline void
253: pipeselwakeup(struct pipe *cpipe)
254: {
255: if (cpipe->pipe_state & PIPE_SEL) {
256: cpipe->pipe_state &= ~PIPE_SEL;
257: selwakeup(&cpipe->pipe_sel);
258: }
259: if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_pgid != NO_PID)
260: gsignal(cpipe->pipe_pgid, SIGIO);
261: KNOTE(&cpipe->pipe_sel.si_note, 0);
262: }
263:
264: /* ARGSUSED */
265: int
266: pipe_read(struct file *fp, off_t *poff, struct uio *uio, struct ucred *cred)
267: {
268: struct pipe *rpipe = (struct pipe *) fp->f_data;
269: int error;
270: int nread = 0;
271: int size;
272:
273: error = pipelock(rpipe);
274: if (error)
275: return (error);
276:
277: ++rpipe->pipe_busy;
278:
279: while (uio->uio_resid) {
280: /*
281: * normal pipe buffer receive
282: */
283: if (rpipe->pipe_buffer.cnt > 0) {
284: size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
285: if (size > rpipe->pipe_buffer.cnt)
286: size = rpipe->pipe_buffer.cnt;
287: if (size > uio->uio_resid)
288: size = uio->uio_resid;
289: error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
290: size, uio);
291: if (error) {
292: break;
293: }
294: rpipe->pipe_buffer.out += size;
295: if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
296: rpipe->pipe_buffer.out = 0;
297:
298: rpipe->pipe_buffer.cnt -= size;
299: /*
300: * If there is no more to read in the pipe, reset
301: * its pointers to the beginning. This improves
302: * cache hit stats.
303: */
304: if (rpipe->pipe_buffer.cnt == 0) {
305: rpipe->pipe_buffer.in = 0;
306: rpipe->pipe_buffer.out = 0;
307: }
308: nread += size;
309: } else {
310: /*
311: * detect EOF condition
312: * read returns 0 on EOF, no need to set error
313: */
314: if (rpipe->pipe_state & PIPE_EOF)
315: break;
316:
317: /*
318: * If the "write-side" has been blocked, wake it up now.
319: */
320: if (rpipe->pipe_state & PIPE_WANTW) {
321: rpipe->pipe_state &= ~PIPE_WANTW;
322: wakeup(rpipe);
323: }
324:
325: /*
326: * Break if some data was read.
327: */
328: if (nread > 0)
329: break;
330:
331: /*
332: * Unlock the pipe buffer for our remaining processing.
333: * We will either break out with an error or we will
334: * sleep and relock to loop.
335: */
336: pipeunlock(rpipe);
337:
338: /*
339: * Handle non-blocking mode operation or
340: * wait for more data.
341: */
342: if (fp->f_flag & FNONBLOCK) {
343: error = EAGAIN;
344: } else {
345: rpipe->pipe_state |= PIPE_WANTR;
346: if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) == 0)
347: error = pipelock(rpipe);
348: }
349: if (error)
350: goto unlocked_error;
351: }
352: }
353: pipeunlock(rpipe);
354:
355: if (error == 0)
356: nanotime(&rpipe->pipe_atime);
357: unlocked_error:
358: --rpipe->pipe_busy;
359:
360: /*
361: * PIPE_WANT processing only makes sense if pipe_busy is 0.
362: */
363: if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
364: rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
365: wakeup(rpipe);
366: } else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
367: /*
368: * Handle write blocking hysteresis.
369: */
370: if (rpipe->pipe_state & PIPE_WANTW) {
371: rpipe->pipe_state &= ~PIPE_WANTW;
372: wakeup(rpipe);
373: }
374: }
375:
376: if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
377: pipeselwakeup(rpipe);
378:
379: return (error);
380: }
381:
382: int
383: pipe_write(struct file *fp, off_t *poff, struct uio *uio, struct ucred *cred)
384: {
385: int error = 0;
386: int orig_resid;
387:
388: struct pipe *wpipe, *rpipe;
389:
390: rpipe = (struct pipe *) fp->f_data;
391: wpipe = rpipe->pipe_peer;
392:
393: /*
394: * detect loss of pipe read side, issue SIGPIPE if lost.
395: */
396: if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
397: return (EPIPE);
398: }
399: ++wpipe->pipe_busy;
400:
401: /*
402: * If it is advantageous to resize the pipe buffer, do
403: * so.
404: */
405: if ((uio->uio_resid > PIPE_SIZE) &&
406: (nbigpipe < LIMITBIGPIPES) &&
407: (wpipe->pipe_buffer.size <= PIPE_SIZE) &&
408: (wpipe->pipe_buffer.cnt == 0)) {
409:
410: if ((error = pipelock(wpipe)) == 0) {
411: if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
412: nbigpipe++;
413: pipeunlock(wpipe);
414: }
415: }
416:
417: /*
418: * If an early error occured unbusy and return, waking up any pending
419: * readers.
420: */
421: if (error) {
422: --wpipe->pipe_busy;
423: if ((wpipe->pipe_busy == 0) &&
424: (wpipe->pipe_state & PIPE_WANT)) {
425: wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
426: wakeup(wpipe);
427: }
428: return (error);
429: }
430:
431: orig_resid = uio->uio_resid;
432:
433: while (uio->uio_resid) {
434: int space;
435:
436: retrywrite:
437: if (wpipe->pipe_state & PIPE_EOF) {
438: error = EPIPE;
439: break;
440: }
441:
442: space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
443:
444: /* Writes of size <= PIPE_BUF must be atomic. */
445: if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
446: space = 0;
447:
448: if (space > 0) {
449: if ((error = pipelock(wpipe)) == 0) {
450: int size; /* Transfer size */
451: int segsize; /* first segment to transfer */
452:
453: /*
454: * If a process blocked in uiomove, our
455: * value for space might be bad.
456: *
457: * XXX will we be ok if the reader has gone
458: * away here?
459: */
460: if (space > wpipe->pipe_buffer.size -
461: wpipe->pipe_buffer.cnt) {
462: pipeunlock(wpipe);
463: goto retrywrite;
464: }
465:
466: /*
467: * Transfer size is minimum of uio transfer
468: * and free space in pipe buffer.
469: */
470: if (space > uio->uio_resid)
471: size = uio->uio_resid;
472: else
473: size = space;
474: /*
475: * First segment to transfer is minimum of
476: * transfer size and contiguous space in
477: * pipe buffer. If first segment to transfer
478: * is less than the transfer size, we've got
479: * a wraparound in the buffer.
480: */
481: segsize = wpipe->pipe_buffer.size -
482: wpipe->pipe_buffer.in;
483: if (segsize > size)
484: segsize = size;
485:
486: /* Transfer first segment */
487:
488: error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
489: segsize, uio);
490:
491: if (error == 0 && segsize < size) {
492: /*
493: * Transfer remaining part now, to
494: * support atomic writes. Wraparound
495: * happened.
496: */
497: #ifdef DIAGNOSTIC
498: if (wpipe->pipe_buffer.in + segsize !=
499: wpipe->pipe_buffer.size)
500: panic("Expected pipe buffer wraparound disappeared");
501: #endif
502:
503: error = uiomove(&wpipe->pipe_buffer.buffer[0],
504: size - segsize, uio);
505: }
506: if (error == 0) {
507: wpipe->pipe_buffer.in += size;
508: if (wpipe->pipe_buffer.in >=
509: wpipe->pipe_buffer.size) {
510: #ifdef DIAGNOSTIC
511: if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
512: panic("Expected wraparound bad");
513: #endif
514: wpipe->pipe_buffer.in = size - segsize;
515: }
516:
517: wpipe->pipe_buffer.cnt += size;
518: #ifdef DIAGNOSTIC
519: if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
520: panic("Pipe buffer overflow");
521: #endif
522: }
523: pipeunlock(wpipe);
524: }
525: if (error)
526: break;
527: } else {
528: /*
529: * If the "read-side" has been blocked, wake it up now.
530: */
531: if (wpipe->pipe_state & PIPE_WANTR) {
532: wpipe->pipe_state &= ~PIPE_WANTR;
533: wakeup(wpipe);
534: }
535:
536: /*
537: * don't block on non-blocking I/O
538: */
539: if (fp->f_flag & FNONBLOCK) {
540: error = EAGAIN;
541: break;
542: }
543:
544: /*
545: * We have no more space and have something to offer,
546: * wake up select/poll.
547: */
548: pipeselwakeup(wpipe);
549:
550: wpipe->pipe_state |= PIPE_WANTW;
551: error = tsleep(wpipe, (PRIBIO + 1)|PCATCH,
552: "pipewr", 0);
553: if (error)
554: break;
555: /*
556: * If read side wants to go away, we just issue a
557: * signal to ourselves.
558: */
559: if (wpipe->pipe_state & PIPE_EOF) {
560: error = EPIPE;
561: break;
562: }
563: }
564: }
565:
566: --wpipe->pipe_busy;
567:
568: if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
569: wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
570: wakeup(wpipe);
571: } else if (wpipe->pipe_buffer.cnt > 0) {
572: /*
573: * If we have put any characters in the buffer, we wake up
574: * the reader.
575: */
576: if (wpipe->pipe_state & PIPE_WANTR) {
577: wpipe->pipe_state &= ~PIPE_WANTR;
578: wakeup(wpipe);
579: }
580: }
581:
582: /*
583: * Don't return EPIPE if I/O was successful
584: */
585: if ((wpipe->pipe_buffer.cnt == 0) &&
586: (uio->uio_resid == 0) &&
587: (error == EPIPE)) {
588: error = 0;
589: }
590:
591: if (error == 0)
592: nanotime(&wpipe->pipe_mtime);
593: /*
594: * We have something to offer, wake up select/poll.
595: */
596: if (wpipe->pipe_buffer.cnt)
597: pipeselwakeup(wpipe);
598:
599: return (error);
600: }
601:
602: /*
603: * we implement a very minimal set of ioctls for compatibility with sockets.
604: */
605: int
606: pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct proc *p)
607: {
608: struct pipe *mpipe = (struct pipe *)fp->f_data;
609:
610: switch (cmd) {
611:
612: case FIONBIO:
613: return (0);
614:
615: case FIOASYNC:
616: if (*(int *)data) {
617: mpipe->pipe_state |= PIPE_ASYNC;
618: } else {
619: mpipe->pipe_state &= ~PIPE_ASYNC;
620: }
621: return (0);
622:
623: case FIONREAD:
624: *(int *)data = mpipe->pipe_buffer.cnt;
625: return (0);
626:
627: case SIOCSPGRP:
628: mpipe->pipe_pgid = *(int *)data;
629: return (0);
630:
631: case SIOCGPGRP:
632: *(int *)data = mpipe->pipe_pgid;
633: return (0);
634:
635: }
636: return (ENOTTY);
637: }
638:
639: int
640: pipe_poll(struct file *fp, int events, struct proc *p)
641: {
642: struct pipe *rpipe = (struct pipe *)fp->f_data;
643: struct pipe *wpipe;
644: int revents = 0;
645:
646: wpipe = rpipe->pipe_peer;
647: if (events & (POLLIN | POLLRDNORM)) {
648: if ((rpipe->pipe_buffer.cnt > 0) ||
649: (rpipe->pipe_state & PIPE_EOF))
650: revents |= events & (POLLIN | POLLRDNORM);
651: }
652:
653: /* NOTE: POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */
654: if ((rpipe->pipe_state & PIPE_EOF) ||
655: (wpipe == NULL) ||
656: (wpipe->pipe_state & PIPE_EOF))
657: revents |= POLLHUP;
658: else if (events & (POLLOUT | POLLWRNORM)) {
659: if ((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)
660: revents |= events & (POLLOUT | POLLWRNORM);
661: }
662:
663: if (revents == 0) {
664: if (events & (POLLIN | POLLRDNORM)) {
665: selrecord(p, &rpipe->pipe_sel);
666: rpipe->pipe_state |= PIPE_SEL;
667: }
668: if (events & (POLLOUT | POLLWRNORM)) {
669: selrecord(p, &wpipe->pipe_sel);
670: wpipe->pipe_state |= PIPE_SEL;
671: }
672: }
673: return (revents);
674: }
675:
676: int
677: pipe_stat(struct file *fp, struct stat *ub, struct proc *p)
678: {
679: struct pipe *pipe = (struct pipe *)fp->f_data;
680:
681: bzero(ub, sizeof(*ub));
682: ub->st_mode = S_IFIFO;
683: ub->st_blksize = pipe->pipe_buffer.size;
684: ub->st_size = pipe->pipe_buffer.cnt;
685: ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
686: ub->st_atimespec = pipe->pipe_atime;
687: ub->st_mtimespec = pipe->pipe_mtime;
688: ub->st_ctimespec = pipe->pipe_ctime;
689: ub->st_uid = fp->f_cred->cr_uid;
690: ub->st_gid = fp->f_cred->cr_gid;
691: /*
692: * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
693: * XXX (st_dev, st_ino) should be unique.
694: */
695: return (0);
696: }
697:
698: /* ARGSUSED */
699: int
700: pipe_close(struct file *fp, struct proc *p)
701: {
702: struct pipe *cpipe = (struct pipe *)fp->f_data;
703:
704: fp->f_ops = NULL;
705: fp->f_data = NULL;
706: pipeclose(cpipe);
707: return (0);
708: }
709:
710: void
711: pipe_free_kmem(struct pipe *cpipe)
712: {
713: if (cpipe->pipe_buffer.buffer != NULL) {
714: if (cpipe->pipe_buffer.size > PIPE_SIZE)
715: --nbigpipe;
716: amountpipekva -= cpipe->pipe_buffer.size;
717: uvm_km_free(kernel_map, (vaddr_t)cpipe->pipe_buffer.buffer,
718: cpipe->pipe_buffer.size);
719: cpipe->pipe_buffer.buffer = NULL;
720: }
721: }
722:
723: /*
724: * shutdown the pipe
725: */
726: void
727: pipeclose(struct pipe *cpipe)
728: {
729: struct pipe *ppipe;
730: if (cpipe) {
731:
732: pipeselwakeup(cpipe);
733:
734: /*
735: * If the other side is blocked, wake it up saying that
736: * we want to close it down.
737: */
738: cpipe->pipe_state |= PIPE_EOF;
739: while (cpipe->pipe_busy) {
740: wakeup(cpipe);
741: cpipe->pipe_state |= PIPE_WANT;
742: tsleep(cpipe, PRIBIO, "pipecl", 0);
743: }
744:
745: /*
746: * Disconnect from peer
747: */
748: if ((ppipe = cpipe->pipe_peer) != NULL) {
749: pipeselwakeup(ppipe);
750:
751: ppipe->pipe_state |= PIPE_EOF;
752: wakeup(ppipe);
753: KNOTE(&ppipe->pipe_sel.si_note, 0);
754: ppipe->pipe_peer = NULL;
755: }
756:
757: /*
758: * free resources
759: */
760: pipe_free_kmem(cpipe);
761: pool_put(&pipe_pool, cpipe);
762: }
763: }
764:
765: int
766: pipe_kqfilter(struct file *fp, struct knote *kn)
767: {
768: struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
769: struct pipe *wpipe = rpipe->pipe_peer;
770:
771: switch (kn->kn_filter) {
772: case EVFILT_READ:
773: kn->kn_fop = &pipe_rfiltops;
774: SLIST_INSERT_HEAD(&rpipe->pipe_sel.si_note, kn, kn_selnext);
775: break;
776: case EVFILT_WRITE:
777: if (wpipe == NULL)
778: /* other end of pipe has been closed */
779: return (1);
780: kn->kn_fop = &pipe_wfiltops;
781: SLIST_INSERT_HEAD(&wpipe->pipe_sel.si_note, kn, kn_selnext);
782: break;
783: default:
784: return (1);
785: }
786:
787: return (0);
788: }
789:
790: void
791: filt_pipedetach(struct knote *kn)
792: {
793: struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
794: struct pipe *wpipe = rpipe->pipe_peer;
795:
796: switch (kn->kn_filter) {
797: case EVFILT_READ:
798: SLIST_REMOVE(&rpipe->pipe_sel.si_note, kn, knote, kn_selnext);
799: break;
800: case EVFILT_WRITE:
801: if (wpipe == NULL)
802: return;
803: SLIST_REMOVE(&wpipe->pipe_sel.si_note, kn, knote, kn_selnext);
804: break;
805: }
806: }
807:
808: /*ARGSUSED*/
809: int
810: filt_piperead(struct knote *kn, long hint)
811: {
812: struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
813: struct pipe *wpipe = rpipe->pipe_peer;
814:
815: kn->kn_data = rpipe->pipe_buffer.cnt;
816:
817: if ((rpipe->pipe_state & PIPE_EOF) ||
818: (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
819: kn->kn_flags |= EV_EOF;
820: return (1);
821: }
822: return (kn->kn_data > 0);
823: }
824:
825: /*ARGSUSED*/
826: int
827: filt_pipewrite(struct knote *kn, long hint)
828: {
829: struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
830: struct pipe *wpipe = rpipe->pipe_peer;
831:
832: if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
833: kn->kn_data = 0;
834: kn->kn_flags |= EV_EOF;
835: return (1);
836: }
837: kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
838:
839: return (kn->kn_data >= PIPE_BUF);
840: }
841:
842: void
843: pipe_init(void)
844: {
845: pool_init(&pipe_pool, sizeof(struct pipe), 0, 0, 0, "pipepl",
846: &pool_allocator_nointr);
847: }
848:
CVSweb