@@ -1,4 +1,4 @@
-.\" $NetBSD: pcq.9,v 1.5 2010/12/02 12:54:13 wiz Exp $
+.\" $NetBSD: pcq.9,v 1.6 2012/01/22 02:55:47 rmind Exp $
.\"
.\" Copyright (c) 2010 The NetBSD Foundation, Inc.
.\" All rights reserved.
@@ -27,7 +27,7 @@
.\" ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
.\" POSSIBILITY OF SUCH DAMAGE.
.\"
-.Dd January 8, 2010
+.Dd January 22, 2012
.Dt PCQ 9
.Os
.Sh NAME
@@ -95,11 +95,11 @@
Free the resources held by
.Fa pcq .
.It Fn pcq_get "pcq"
-Remove the next item to be consumed from the queue and return
-it.
+Remove the next item to be consumed from the queue and return it.
If the queue is empty,
return
.Dv NULL .
+The caller must prevent concurrent gets from occurring.
.It Fn pcq_maxitems "pcq"
Return the maximum number of items that the queue can store at
any one time.
@@ -132,8 +132,6 @@
.Nm
interface first appeared in
.Nx 6.0 .
-.Sh AUTHORS
-.An Matt Thomas Aq matt@NetBSD.org
.\" .Sh CAVEATS
.\" .Sh BUGS
.\" .Sh SECURITY CONSIDERATIONS
@@ -1,11 +1,11 @@
-/* $NetBSD: subr_pcq.c,v 1.3 2008/11/11 21:45:33 rmind Exp $ */
+/* $NetBSD: subr_pcq.c,v 1.4 2012/01/22 02:55:47 rmind Exp $ */
/*-
- * Copyright (c) 2008 The NetBSD Foundation, Inc.
+ * Copyright (c) 2009 The NetBSD Foundation, Inc.
* All rights reserved.
*
* This code is derived from software contributed to The NetBSD Foundation
- * by Matt Thomas <matt@3am-software.com>
+ * by Andrew Doran.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@@ -29,173 +29,181 @@
* POSSIBILITY OF SUCH DAMAGE.
*/
+/*
+ * Lockless producer/consumer queue.
+ */
+
#include <sys/cdefs.h>
-__KERNEL_RCSID(0, "$NetBSD: subr_pcq.c,v 1.3 2008/11/11 21:45:33 rmind Exp $");
+__KERNEL_RCSID(0, "$NetBSD: subr_pcq.c,v 1.4 2012/01/22 02:55:47 rmind Exp $");
#include <sys/param.h>
#include <sys/types.h>
#include <sys/atomic.h>
-#include <sys/errno.h>
#include <sys/kmem.h>
#include <sys/pcq.h>
-typedef void * volatile pcq_entry_t;
-
+/*
+ * Internal producer-consumer queue structure.  Note: the pcq_pad1 and
+ * pcq_pad2 members pad pcq_nitems and pcq_pc out to COHERENCY_UNIT bytes
+ * each; this is intended to provide a separate cache-line both for
+ * pcq_t::pcq_pc and pcq_t::pcq_items (assuming COHERENCY_UNIT is the
+ * cache-line size and the structure itself is suitably aligned).
+ */
struct pcq {
- pcq_entry_t *pcq_consumer;
- pcq_entry_t *pcq_producer;
- pcq_entry_t *pcq_limit;
- pcq_entry_t pcq_base[];
+ u_int pcq_nitems;
+ uint8_t pcq_pad1[COHERENCY_UNIT - sizeof(u_int)];
+ volatile uint32_t pcq_pc;
+ uint8_t pcq_pad2[COHERENCY_UNIT - sizeof(uint32_t)];
+ void * volatile pcq_items[];
};
-static inline pcq_entry_t *
-pcq_advance(pcq_t *pcq, pcq_entry_t *ptr)
+/*
+ * Producer (p) - stored in the lower 16 bits of pcq_t::pcq_pc.
+ * Consumer (c) - in the higher 16 bits.
+ *
+ * We have a limitation of 16 bits i.e. 0xffff items in the queue.
+ *
+ * pcq_split: unpack the combined value v into the producer index (*p)
+ * and the consumer index (*c).
+ */
+
+static inline void
+pcq_split(uint32_t v, u_int *p, u_int *c)
{
- if (__predict_false(++ptr == pcq->pcq_limit))
- return pcq->pcq_base;
+ *p = v & 0xffff;
+ *c = v >> 16;
+}
- return ptr;
+static inline uint32_t
+pcq_combine(u_int p, u_int c)
+{
+ /* Inverse of pcq_split(): p stays in the low bits, c is shifted up by 16. */
+ return p | (c << 16);
}
+static inline u_int
+pcq_advance(pcq_t *pcq, u_int pc)
+{
+ /* Step the index forward by one slot, wrapping to 0 at pcq_nitems. */
+ if (__predict_false(++pc == pcq->pcq_nitems)) {
+ return 0;
+ }
+ return pc;
+}
+
+/*
+ * pcq_put: place an item at the end of the queue.
+ *
+ * => item must not be NULL.
+ * => Returns true if the item was enqueued, false if the queue is full.
+ */
bool
pcq_put(pcq_t *pcq, void *item)
{
- pcq_entry_t *producer;
+ uint32_t v, nv;
+ u_int op, p, c;
KASSERT(item != NULL);
+ do {
+ v = pcq->pcq_pc;
+ pcq_split(v, &op, &c);
+ p = pcq_advance(pcq, op);
+ if (p == c) {
+ /* Queue is full. */
+ return false;
+ }
+ nv = pcq_combine(p, c);
+ } while (atomic_cas_32(&pcq->pcq_pc, v, nv) != v);
+
/*
- * Get our starting point, While we are doing this, it is
- * imperative that pcq->pcq_base/pcq->pcq_limit not change
- * in value. If you need to resize a pcq, init a new pcq
- * with the right size and swap pointers to it.
+ * Ensure that the update to pcq_pc is globally visible before the
+ * data item. See pcq_get(). This also ensures that any changes
+ * that the caller made to the data item are globally visible
+ * before we put it onto the list.
*/
- membar_consumer(); /* see updates to pcq_producer */
- producer = pcq->pcq_producer;
- for (;;) {
- /*
- * Preadvance so we reduce the window on updates.
- */
- pcq_entry_t * const new_producer = pcq_advance(pcq, producer);
-
- /*
- * Try to fill an empty slot
- */
- if (NULL == atomic_cas_ptr(producer, NULL, item)) {
- /*
- * We need to use atomic_cas_ptr since another thread
- * might have inserted between these two cas operations
- * and we don't want to overwrite a producer that's
- * more up-to-date.
- */
- atomic_cas_ptr(&pcq->pcq_producer,
- __UNVOLATILE(producer),
- __UNVOLATILE(new_producer));
- /*
- * Tell them we were able to enqueue it.
- */
-#ifndef __HAVE_ATOMIC_AS_MEMBAR
- membar_producer();
+#ifndef __HAVE_ATOMIC_AS_MEMBAR
+ membar_producer();
#endif
- return true;
- }
+ pcq->pcq_items[op] = item;
- /*
- * If we've reached the consumer, we've filled all the
- * slots and there's no more room so return false.
- */
-#ifndef __HAVE_ATOMIC_AS_MEMBAR
- membar_consumer(); /* see updates to pcq_consumer */
-#endif
- if (producer == pcq->pcq_consumer)
- return false;
+ /*
+ * Synchronization activity to wake up the consumer will ensure
+ * that the update to pcq_items[] is visible before the wakeup
+ * arrives. So, we do not need an additional memory barrier here.
+ */
+ return true;
+}
- /*
- * Let's see if the next slot is free...
- */
- producer = new_producer;
- }
+/*
+ * pcq_peek: return the next item from the queue without removal.
+ *
+ * => Returns NULL if the queue is empty (producer index == consumer index).
+ * => May also return NULL when the slot at the consumer index has not
+ *    been filled yet: pcq_put() advances pcq_pc before it stores the item.
+ */
+void *
+pcq_peek(pcq_t *pcq)
+{
+ const uint32_t v = pcq->pcq_pc;
+ u_int p, c;
+
+ pcq_split(v, &p, &c);
+
+ /* See comment on race below in pcq_get(). */
+ return (p == c) ? NULL : pcq->pcq_items[c];
}
/*
- * It's assumed that the enclosing structure that contains the pcq will
- * provide appropriate locking to prevent concurrent gets from occurring.
+ * pcq_get: remove and return the next item for consumption or NULL if empty.
+ *
+ * => The caller must prevent concurrent gets from occurring.
*/
void *
pcq_get(pcq_t *pcq)
{
- pcq_entry_t * const consumer = pcq->pcq_consumer;
+ uint32_t v, nv;
+ u_int p, c;
void *item;
- /*
- * Updates to pcq_consumer doesn't matter since we control it but we
- * want to make sure that any stores to what it references have
- * completed.
- */
- membar_consumer();
-
- /*
- * If there's nothing to return, just return.
- */
- if ((item = *consumer) == NULL)
+ v = pcq->pcq_pc;
+ pcq_split(v, &p, &c);
+ if (p == c) {
+ /* Queue is empty: nothing to return. */
return NULL;
+ }
+ item = pcq->pcq_items[c];
+ if (item == NULL) {
+ /*
+ * Raced with sender: we rely on a notification (e.g. softint
+ * or wakeup) being generated after the producer's pcq_put(),
+ * causing us to retry pcq_get() later.
+ */
+ return NULL;
+ }
+ pcq->pcq_items[c] = NULL;
+ c = pcq_advance(pcq, c);
+ nv = pcq_combine(p, c);
/*
- * Update the consumer and free the slot.
- * Update the consumer pointer first so when producer == consumer
- * the right thing happens.
- *
- * 1) until the slot set to NULL, pcq_put will fail since
- * the slot != NULL && producer == consumer.
- * 2) consumer is advanced but the slot is still not NULL,
- * pcq_put will advance by one, see that producer == consumer,
- * and fail.
- * 4) Once the slot is set to NULL, the producer can fill the slot
- * and advance the producer.
- *
- * and then we are back to 1.
+ * Ensure that update to pcq_items[] becomes globally visible
+ * before the update to pcq_pc. If it were reordered to occur
+ * after it, we could in theory wipe out a modification made
+ * to pcq_items[] by pcq_put().
*/
- pcq->pcq_consumer = pcq_advance(pcq, consumer);
+#ifndef __HAVE_ATOMIC_AS_MEMBAR
membar_producer();
-
- *consumer = NULL;
- membar_producer();
-
+#endif
+ while (__predict_false(atomic_cas_32(&pcq->pcq_pc, v, nv) != v)) {
+ v = pcq->pcq_pc;
+ pcq_split(v, &p, &c);
+ c = pcq_advance(pcq, c);
+ nv = pcq_combine(p, c);
+ }
return item;
}
-void *
-pcq_peek(pcq_t *pcq)
-{
-
- membar_consumer(); /* see updates to *pcq_consumer */
- return *pcq->pcq_consumer;
-}
-
-size_t
-pcq_maxitems(pcq_t *pcq)
-{
-
- return pcq->pcq_limit - pcq->pcq_base;
-}
-
+/*
+ * pcq_create: allocate and initialise a queue with nitems slots.
+ *
+ * => nitems must be in the range 1..0xffff, since the producer and
+ *    consumer indices are each packed into 16 bits of pcq_pc.
+ * => Returns NULL if the allocation fails.
+ */
pcq_t *
-pcq_create(size_t maxitems, km_flag_t kmflags)
+pcq_create(size_t nitems, km_flag_t kmflags)
{
pcq_t *pcq;
- KASSERT(maxitems > 0);
+ KASSERT(nitems > 0 && nitems <= 0xffff);
- pcq = kmem_zalloc(offsetof(pcq_t, pcq_base[maxitems]), kmflags);
- if (__predict_false(pcq == NULL))
+ pcq = kmem_zalloc(offsetof(pcq_t, pcq_items[nitems]), kmflags);
+ if (pcq == NULL) {
return NULL;
-
- pcq->pcq_limit = pcq->pcq_base + maxitems;
- pcq->pcq_producer = pcq->pcq_base;
- pcq->pcq_consumer = pcq->pcq_producer;
-
+ }
+ pcq->pcq_nitems = nitems;
return pcq;
}
@@ -203,7 +211,12 @@
pcq_destroy(pcq_t *pcq)
{
- KASSERT(*pcq->pcq_consumer == NULL);
+ kmem_free(pcq, offsetof(pcq_t, pcq_items[pcq->pcq_nitems]));
+}
- kmem_free(pcq, (uintptr_t)pcq->pcq_limit - (uintptr_t)pcq);
+size_t
+pcq_maxitems(pcq_t *pcq)
+{
+ /*
+ * Report the slot count recorded by pcq_create().
+ * NOTE(review): pcq_put() reports "full" when advancing the producer
+ * index would reach the consumer index, so it appears that at most
+ * pcq_nitems - 1 items are held at once - confirm against pcq(9).
+ */
+ return pcq->pcq_nitems;
}