Sun Jan 22 02:55:47 2012 UTC ()
Replace pcq(9) with the implementation from ad@ and minor changes by me.

PR/40516, PR/45631.


(rmind)
diff -r1.5 -r1.6 src/share/man/man9/pcq.9
diff -r1.3 -r1.4 src/sys/kern/subr_pcq.c

cvs diff -r1.5 -r1.6 src/share/man/man9/pcq.9 (expand / switch to unified diff)

--- src/share/man/man9/pcq.9 2010/12/02 12:54:13 1.5
+++ src/share/man/man9/pcq.9 2012/01/22 02:55:47 1.6
@@ -1,14 +1,14 @@ @@ -1,14 +1,14 @@
1.\" $NetBSD: pcq.9,v 1.5 2010/12/02 12:54:13 wiz Exp $ 1.\" $NetBSD: pcq.9,v 1.6 2012/01/22 02:55:47 rmind Exp $
2.\" 2.\"
3.\" Copyright (c) 2010 The NetBSD Foundation, Inc. 3.\" Copyright (c) 2010 The NetBSD Foundation, Inc.
4.\" All rights reserved. 4.\" All rights reserved.
5.\" 5.\"
6.\" This code is derived from software contributed to The NetBSD Foundation 6.\" This code is derived from software contributed to The NetBSD Foundation
7.\" by Matt Thomas. 7.\" by Matt Thomas.
8.\" 8.\"
9.\" Redistribution and use in source and binary forms, with or without 9.\" Redistribution and use in source and binary forms, with or without
10.\" modification, are permitted provided that the following conditions 10.\" modification, are permitted provided that the following conditions
11.\" are met: 11.\" are met:
12.\" 1. Redistributions of source code must retain the above copyright 12.\" 1. Redistributions of source code must retain the above copyright
13.\" notice, this list of conditions and the following disclaimer. 13.\" notice, this list of conditions and the following disclaimer.
14.\" 2. Redistributions in binary form must reproduce the above copyright 14.\" 2. Redistributions in binary form must reproduce the above copyright
@@ -17,27 +17,27 @@ @@ -17,27 +17,27 @@
17.\" 17.\"
18.\" THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS 18.\" THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
19.\" ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 19.\" ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20.\" TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 20.\" TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21.\" PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS 21.\" PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
22.\" BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22.\" BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23.\" CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23.\" CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24.\" SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24.\" SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25.\" INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25.\" INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26.\" CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26.\" CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27.\" ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27.\" ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28.\" POSSIBILITY OF SUCH DAMAGE. 28.\" POSSIBILITY OF SUCH DAMAGE.
29.\" 29.\"
30.Dd January 8, 2010 30.Dd January 22, 2012
31.Dt PCQ 9 31.Dt PCQ 9
32.Os 32.Os
33.Sh NAME 33.Sh NAME
34.Nm pcq 34.Nm pcq
35.Nd producer/consumer queue 35.Nd producer/consumer queue
36.Sh SYNOPSIS 36.Sh SYNOPSIS
37.In sys/pcq.h 37.In sys/pcq.h
38.Ft pcq_t * 38.Ft pcq_t *
39.Fn pcq_create "size_t maxlen" "km_flags_t kmflags" 39.Fn pcq_create "size_t maxlen" "km_flags_t kmflags"
40.Ft void 40.Ft void
41.Fn pcq_destroy "pcq_t *pcq" 41.Fn pcq_destroy "pcq_t *pcq"
42.Ft void * 42.Ft void *
43.Fn pcq_get "pcq_t *pcq" 43.Fn pcq_get "pcq_t *pcq"
@@ -85,31 +85,31 @@ items at one time. @@ -85,31 +85,31 @@ items at one time.
85should be either 85should be either
86.Dv KM_SLEEP , 86.Dv KM_SLEEP ,
87if 87if
88.Fn pcq_create 88.Fn pcq_create
89is allowed to sleep until resources are available, or 89is allowed to sleep until resources are available, or
90.Dv KM_NOSLEEP 90.Dv KM_NOSLEEP
91if it should return 91if it should return
92.Dv NULL 92.Dv NULL
93immediately, if resources are unavailable. 93immediately, if resources are unavailable.
94.It Fn pcq_destroy "pcq" 94.It Fn pcq_destroy "pcq"
95Free the resources held by 95Free the resources held by
96.Fa pcq . 96.Fa pcq .
97.It Fn pcq_get "pcq" 97.It Fn pcq_get "pcq"
98Remove the next item to be consumed from the queue and return 98Remove the next item to be consumed from the queue and return it.
99it. 
100If the queue is empty, 99If the queue is empty,
101return 100return
102.Dv NULL . 101.Dv NULL .
 102The caller must prevent concurrent gets from occurring.
103.It Fn pcq_maxitems "pcq" 103.It Fn pcq_maxitems "pcq"
104Return the maximum number of items that the queue can store at 104Return the maximum number of items that the queue can store at
105any one time. 105any one time.
106.It Fn pcq_peek "pcq" 106.It Fn pcq_peek "pcq"
107Return the next item to be consumed from the queue but do not remove 107Return the next item to be consumed from the queue but do not remove
108it from the queue. 108it from the queue.
109If the queue is empty, 109If the queue is empty,
110return 110return
111.Dv NULL . 111.Dv NULL .
112.It Fn pcq_put "pcq" "item" 112.It Fn pcq_put "pcq" "item"
113Place an item at the end of the queue. 113Place an item at the end of the queue.
114If there is no room in the queue for the item, return 114If there is no room in the queue for the item, return
115.Dv false ; 115.Dv false ;
@@ -122,18 +122,16 @@ The item must not have the value of @@ -122,18 +122,16 @@ The item must not have the value of
122The 122The
123.Nm 123.Nm
124interface is implemented within the file 124interface is implemented within the file
125.Pa sys/kern/subr_pcq.c . 125.Pa sys/kern/subr_pcq.c .
126.\" .Sh EXAMPLES 126.\" .Sh EXAMPLES
127.Sh SEE ALSO 127.Sh SEE ALSO
128.Xr atomic_ops 3 , 128.Xr atomic_ops 3 ,
129.Xr queue 9 129.Xr queue 9
130.Sh HISTORY 130.Sh HISTORY
131The 131The
132.Nm 132.Nm
133interface first appeared in 133interface first appeared in
134.Nx 6.0 . 134.Nx 6.0 .
135.Sh AUTHORS 
136.An Matt Thomas Aq matt@NetBSD.org 
137.\" .Sh CAVEATS 135.\" .Sh CAVEATS
138.\" .Sh BUGS 136.\" .Sh BUGS
139.\" .Sh SECURITY CONSIDERATIONS 137.\" .Sh SECURITY CONSIDERATIONS

cvs diff -r1.3 -r1.4 src/sys/kern/subr_pcq.c (expand / switch to unified diff)

--- src/sys/kern/subr_pcq.c 2008/11/11 21:45:33 1.3
+++ src/sys/kern/subr_pcq.c 2012/01/22 02:55:47 1.4
@@ -1,209 +1,222 @@ @@ -1,209 +1,222 @@
1/* $NetBSD: subr_pcq.c,v 1.3 2008/11/11 21:45:33 rmind Exp $ */ 1/* $NetBSD: subr_pcq.c,v 1.4 2012/01/22 02:55:47 rmind Exp $ */
2 2
3/*- 3/*-
4 * Copyright (c) 2008 The NetBSD Foundation, Inc. 4 * Copyright (c) 2009 The NetBSD Foundation, Inc.
5 * All rights reserved. 5 * All rights reserved.
6 * 6 *
7 * This code is derived from software contributed to The NetBSD Foundation 7 * This code is derived from software contributed to The NetBSD Foundation
8 * by Matt Thomas <matt@3am-software.com> 8 * by Andrew Doran.
9 * 9 *
10 * Redistribution and use in source and binary forms, with or without 10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions 11 * modification, are permitted provided that the following conditions
12 * are met: 12 * are met:
13 * 1. Redistributions of source code must retain the above copyright 13 * 1. Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer. 14 * notice, this list of conditions and the following disclaimer.
15 * 2. Redistributions in binary form must reproduce the above copyright 15 * 2. Redistributions in binary form must reproduce the above copyright
16 * notice, this list of conditions and the following disclaimer in the 16 * notice, this list of conditions and the following disclaimer in the
17 * documentation and/or other materials provided with the distribution. 17 * documentation and/or other materials provided with the distribution.
18 * 18 *
19 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS 19 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
20 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 20 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
21 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 21 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS 22 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
23 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE. 29 * POSSIBILITY OF SUCH DAMAGE.
30 */ 30 */
31 31
 32/*
 33 * Lockless producer/consumer queue.
 34 */
 35
32#include <sys/cdefs.h> 36#include <sys/cdefs.h>
33__KERNEL_RCSID(0, "$NetBSD: subr_pcq.c,v 1.3 2008/11/11 21:45:33 rmind Exp $"); 37__KERNEL_RCSID(0, "$NetBSD: subr_pcq.c,v 1.4 2012/01/22 02:55:47 rmind Exp $");
34 38
35#include <sys/param.h> 39#include <sys/param.h>
36#include <sys/types.h> 40#include <sys/types.h>
37#include <sys/atomic.h> 41#include <sys/atomic.h>
38#include <sys/errno.h> 
39#include <sys/kmem.h> 42#include <sys/kmem.h>
40 43
41#include <sys/pcq.h> 44#include <sys/pcq.h>
42 45
43typedef void * volatile pcq_entry_t; 46/*
44 47 * Internal producer-consumer queue structure. Note: providing a separate
 48 * cache-line both for pcq_t::pcq_pc and pcq_t::pcq_items.
 49 */
45struct pcq { 50struct pcq {
46 pcq_entry_t *pcq_consumer; 51 u_int pcq_nitems;
47 pcq_entry_t *pcq_producer; 52 uint8_t pcq_pad1[COHERENCY_UNIT - sizeof(u_int)];
48 pcq_entry_t *pcq_limit; 53 volatile uint32_t pcq_pc;
49 pcq_entry_t pcq_base[]; 54 uint8_t pcq_pad2[COHERENCY_UNIT - sizeof(uint32_t)];
 55 void * volatile pcq_items[];
50}; 56};
51 57
52static inline pcq_entry_t * 58/*
53pcq_advance(pcq_t *pcq, pcq_entry_t *ptr) 59 * Producer (p) - stored in the lower 16 bits of pcq_t::pcq_pc.
54{ 60 * Consumer (c) - in the higher 16 bits.
 61 *
 62 * We have a limitation of 16 bits i.e. 0xffff items in the queue.
 63 */
55 64
56 if (__predict_false(++ptr == pcq->pcq_limit)) 65static inline void
57 return pcq->pcq_base; 66pcq_split(uint32_t v, u_int *p, u_int *c)
 67{
58 68
59 return ptr; 69 *p = v & 0xffff;
 70 *c = v >> 16;
60} 71}
61 72
62bool 73static inline uint32_t
63pcq_put(pcq_t *pcq, void *item) 74pcq_combine(u_int p, u_int c)
64{ 75{
65 pcq_entry_t *producer; 
66 
67 KASSERT(item != NULL); 
68 
69 /* 
70 * Get our starting point, While we are doing this, it is 
71 * imperative that pcq->pcq_base/pcq->pcq_limit not change 
72 * in value. If you need to resize a pcq, init a new pcq 
73 * with the right size and swap pointers to it. 
74 */ 
75 membar_consumer(); /* see updates to pcq_producer */ 
76 producer = pcq->pcq_producer; 
77 for (;;) { 
78 /* 
79 * Preadvance so we reduce the window on updates. 
80 */ 
81 pcq_entry_t * const new_producer = pcq_advance(pcq, producer); 
82 76
83 /* 77 return p | (c << 16);
84 * Try to fill an empty slot 78}
85 */ 
86 if (NULL == atomic_cas_ptr(producer, NULL, item)) { 
87 /* 
88 * We need to use atomic_cas_ptr since another thread 
89 * might have inserted between these two cas operations 
90 * and we don't want to overwrite a producer that's 
91 * more up-to-date. 
92 */ 
93 atomic_cas_ptr(&pcq->pcq_producer, 
94 __UNVOLATILE(producer),  
95 __UNVOLATILE(new_producer)); 
96 /* 
97 * Tell them we were able to enqueue it. 
98 */ 
99#ifndef __HAVE_ATOMIC_AS_MEMBAR 
100 membar_producer(); 
101#endif 
102 return true; 
103 } 
104 79
105 /* 80static inline u_int
106 * If we've reached the consumer, we've filled all the 81pcq_advance(pcq_t *pcq, u_int pc)
107 * slots and there's no more room so return false. 82{
108 */ 
109#ifndef __HAVE_ATOMIC_AS_MEMBAR 
110 membar_consumer(); /* see updates to pcq_consumer */ 
111#endif 
112 if (producer == pcq->pcq_consumer) 
113 return false; 
114 83
115 /* 84 if (__predict_false(++pc == pcq->pcq_nitems)) {
116 * Let's see if the next slot is free... 85 return 0;
117 */ 
118 producer = new_producer; 
119 } 86 }
 87 return pc;
120} 88}
121 89
122/* 90/*
123 * It's assumed that the enclosing structure that contains the pcq will 91 * pcq_put: place an item at the end of the queue.
124 * provide appropriate locking to prevent concurrent gets from occurring. 
125 */ 92 */
126void * 93bool
127pcq_get(pcq_t *pcq) 94pcq_put(pcq_t *pcq, void *item)
128{ 95{
129 pcq_entry_t * const consumer = pcq->pcq_consumer; 96 uint32_t v, nv;
130 void *item; 97 u_int op, p, c;
131 98
132 /* 99 KASSERT(item != NULL);
133 * Updates to pcq_consumer doesn't matter since we control it but we 
134 * want to make sure that any stores to what it references have 
135 * completed. 
136 */ 
137 membar_consumer(); 
138 100
139 /* 101 do {
140 * If there's nothing to return, just return. 102 v = pcq->pcq_pc;
141 */ 103 pcq_split(v, &op, &c);
142 if ((item = *consumer) == NULL) 104 p = pcq_advance(pcq, op);
143 return NULL; 105 if (p == c) {
 106 /* Queue is full. */
 107 return false;
 108 }
 109 nv = pcq_combine(p, c);
 110 } while (atomic_cas_32(&pcq->pcq_pc, v, nv) != v);
144 111
145 /* 112 /*
146 * Update the consumer and free the slot. 113 * Ensure that the update to pcq_pc is globally visible before the
147 * Update the consumer pointer first so when producer == consumer 114 * data item. See pcq_get(). This also ensures that any changes
148 * the right thing happens. 115 * that the caller made to the data item are globally visible
149 * 116 * before we put it onto the list.
150 * 1) until the slot set to NULL, pcq_put will fail since 
151 * the slot != NULL && producer == consumer. 
152 * 2) consumer is advanced but the slot is still not NULL, 
153 * pcq_put will advance by one, see that producer == consumer, 
154 * and fail. 
155 * 4) Once the slot is set to NULL, the producer can fill the slot 
156 * and advance the producer. 
157 *  
158 * and then we are back to 1. 
159 */ 117 */
160 pcq->pcq_consumer = pcq_advance(pcq, consumer); 118#ifndef _HAVE_ATOMIC_AS_MEMBAR
161 membar_producer(); 
162 
163 *consumer = NULL; 
164 membar_producer(); 119 membar_producer();
 120#endif
 121 pcq->pcq_items[op] = item;
165 122
166 return item; 123 /*
 124 * Synchronization activity to wake up the consumer will ensure
 125 * that the update to pcq_items[] is visible before the wakeup
 126 * arrives. So, we do not need an additional memory barrier here.
 127 */
 128 return true;
167} 129}
168 130
 131/*
 132 * pcq_peek: return the next item from the queue without removal.
 133 */
169void * 134void *
170pcq_peek(pcq_t *pcq) 135pcq_peek(pcq_t *pcq)
171{ 136{
 137 const uint32_t v = pcq->pcq_pc;
 138 u_int p, c;
172 139
173 membar_consumer(); /* see updates to *pcq_consumer */  140 pcq_split(v, &p, &c);
174 return *pcq->pcq_consumer; 141
 142 /* See comment on race below in pcq_get(). */
 143 return (p == c) ? NULL : pcq->pcq_items[c];
175} 144}
176 145
177size_t 146/*
178pcq_maxitems(pcq_t *pcq) 147 * pcq_get: remove and return the next item for consumption or NULL if empty.
 148 *
 149 * => The caller must prevent concurrent gets from occurring.
 150 */
 151void *
 152pcq_get(pcq_t *pcq)
179{ 153{
 154 uint32_t v, nv;
 155 u_int p, c;
 156 void *item;
180 157
181 return pcq->pcq_limit - pcq->pcq_base; 158 v = pcq->pcq_pc;
 159 pcq_split(v, &p, &c);
 160 if (p == c) {
 161 /* Queue is empty: nothing to return. */
 162 return NULL;
 163 }
 164 item = pcq->pcq_items[c];
 165 if (item == NULL) {
 166 /*
 167 * Raced with sender: we rely on a notification (e.g. softint
 168 * or wakeup) being generated after the producer's pcq_put(),
 169 * causing us to retry pcq_get() later.
 170 */
 171 return NULL;
 172 }
 173 pcq->pcq_items[c] = NULL;
 174 c = pcq_advance(pcq, c);
 175 nv = pcq_combine(p, c);
 176
 177 /*
 178 * Ensure that update to pcq_items[] becomes globally visible
 179 * before the update to pcq_pc. If it were reordered to occur
 180 * after it, we could in theory wipe out a modification made
 181 * to pcq_items[] by pcq_put().
 182 */
 183#ifndef _HAVE_ATOMIC_AS_MEMBAR
 184 membar_producer();
 185#endif
 186 while (__predict_false(atomic_cas_32(&pcq->pcq_pc, v, nv) != v)) {
 187 v = pcq->pcq_pc;
 188 pcq_split(v, &p, &c);
 189 c = pcq_advance(pcq, c);
 190 nv = pcq_combine(p, c);
 191 }
 192 return item;
182} 193}
183 194
184pcq_t * 195pcq_t *
185pcq_create(size_t maxitems, km_flag_t kmflags) 196pcq_create(size_t nitems, km_flag_t kmflags)
186{ 197{
187 pcq_t *pcq; 198 pcq_t *pcq;
188 199
189 KASSERT(maxitems > 0); 200 KASSERT(nitems > 0 || nitems <= 0xffff);
190 201
191 pcq = kmem_zalloc(offsetof(pcq_t, pcq_base[maxitems]), kmflags); 202 pcq = kmem_zalloc(offsetof(pcq_t, pcq_items[nitems]), kmflags);
192 if (__predict_false(pcq == NULL)) 203 if (pcq == NULL) {
193 return NULL; 204 return NULL;
194 205 }
195 pcq->pcq_limit = pcq->pcq_base + maxitems; 206 pcq->pcq_nitems = nitems;
196 pcq->pcq_producer = pcq->pcq_base; 
197 pcq->pcq_consumer = pcq->pcq_producer; 
198 
199 return pcq; 207 return pcq;
200} 208}
201 209
202void 210void
203pcq_destroy(pcq_t *pcq) 211pcq_destroy(pcq_t *pcq)
204{ 212{
205 213
206 KASSERT(*pcq->pcq_consumer == NULL); 214 kmem_free(pcq, offsetof(pcq_t, pcq_items[pcq->pcq_nitems]));
 215}
 216
 217size_t
 218pcq_maxitems(pcq_t *pcq)
 219{
207 220
208 kmem_free(pcq, (uintptr_t)pcq->pcq_limit - (uintptr_t)pcq); 221 return pcq->pcq_nitems;
209} 222}