Sun Jan 22 02:55:47 2012 UTC ()
Replace pcq(9) with the implementation from ad@ and minor changes by me.

PR/40516, PR/45631.


(rmind)
diff -r1.5 -r1.6 src/share/man/man9/pcq.9
diff -r1.3 -r1.4 src/sys/kern/subr_pcq.c

cvs diff -r1.5 -r1.6 src/share/man/man9/pcq.9 (expand / switch to context diff)
--- src/share/man/man9/pcq.9 2010/12/02 12:54:13 1.5
+++ src/share/man/man9/pcq.9 2012/01/22 02:55:47 1.6
@@ -1,4 +1,4 @@
-.\"     $NetBSD: pcq.9,v 1.5 2010/12/02 12:54:13 wiz Exp $
+.\"     $NetBSD: pcq.9,v 1.6 2012/01/22 02:55:47 rmind Exp $
 .\"
 .\" Copyright (c) 2010 The NetBSD Foundation, Inc.
 .\" All rights reserved.
@@ -27,7 +27,7 @@
 .\" ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 .\" POSSIBILITY OF SUCH DAMAGE.
 .\"
-.Dd January 8, 2010
+.Dd January 22, 2012
 .Dt PCQ 9
 .Os
 .Sh NAME
@@ -95,11 +95,11 @@
 Free the resources held by
 .Fa pcq .
 .It Fn pcq_get "pcq"
-Remove the next item to be consumed from the queue and return
-it.
+Remove the next item to be consumed from the queue and return it.
 If the queue is empty,
 return
 .Dv NULL .
+The caller must prevent concurrent gets from occurring.
 .It Fn pcq_maxitems "pcq"
 Return the maximum number of items that the queue can store at
 any one time.
@@ -132,8 +132,6 @@
 .Nm
 interface first appeared in
 .Nx 6.0 .
-.Sh AUTHORS
-.An Matt Thomas Aq matt@NetBSD.org
 .\" .Sh CAVEATS
 .\" .Sh BUGS
 .\" .Sh SECURITY CONSIDERATIONS

cvs diff -r1.3 -r1.4 src/sys/kern/subr_pcq.c (expand / switch to context diff)
--- src/sys/kern/subr_pcq.c 2008/11/11 21:45:33 1.3
+++ src/sys/kern/subr_pcq.c 2012/01/22 02:55:47 1.4
@@ -1,11 +1,11 @@
-/*	$NetBSD: subr_pcq.c,v 1.3 2008/11/11 21:45:33 rmind Exp $	*/
+/*	$NetBSD: subr_pcq.c,v 1.4 2012/01/22 02:55:47 rmind Exp $	*/
 
 /*-
- * Copyright (c) 2008 The NetBSD Foundation, Inc.
+ * Copyright (c) 2009 The NetBSD Foundation, Inc.
  * All rights reserved.
  *
  * This code is derived from software contributed to The NetBSD Foundation
- * by Matt Thomas <matt@3am-software.com>
+ * by Andrew Doran.
  *
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions
@@ -29,173 +29,181 @@
  * POSSIBILITY OF SUCH DAMAGE.
  */
 
+/*
+ * Lockless producer/consumer queue.
+ */
+
 #include <sys/cdefs.h>
-__KERNEL_RCSID(0, "$NetBSD: subr_pcq.c,v 1.3 2008/11/11 21:45:33 rmind Exp $");
+__KERNEL_RCSID(0, "$NetBSD: subr_pcq.c,v 1.4 2012/01/22 02:55:47 rmind Exp $");
 
 #include <sys/param.h>
 #include <sys/types.h>
 #include <sys/atomic.h>
-#include <sys/errno.h>
 #include <sys/kmem.h>
 
 #include <sys/pcq.h>
 
-typedef void * volatile pcq_entry_t;
-
+/*
+ * Internal producer-consumer queue structure.  Note: the padding gives
+ * pcq_pc a COHERENCY_UNIT-sized span of its own, apart from pcq_items[].
+ */
 struct pcq {
-	pcq_entry_t *pcq_consumer;
-	pcq_entry_t *pcq_producer;
-	pcq_entry_t *pcq_limit;
-	pcq_entry_t pcq_base[];
+	u_int			pcq_nitems;	/* number of slots in pcq_items[] */
+	uint8_t			pcq_pad1[COHERENCY_UNIT - sizeof(u_int)];
+	volatile uint32_t	pcq_pc;		/* packed producer/consumer indices */
+	uint8_t			pcq_pad2[COHERENCY_UNIT - sizeof(uint32_t)];
+	void * volatile		pcq_items[];	/* slots; NULL when not holding an item */
 };
 
-static inline pcq_entry_t *
-pcq_advance(pcq_t *pcq, pcq_entry_t *ptr)
+/*
+ * Producer (p) - stored in the lower 16 bits of pcq_t::pcq_pc.
+ * Consumer (c) - in the higher 16 bits.
+ *
+ * Each index is 16 bits wide, which limits a queue to 0xffff slots.
+ */
+
+static inline void
+pcq_split(uint32_t v, u_int *p, u_int *c)
 {
 
-	if (__predict_false(++ptr == pcq->pcq_limit))
-		return pcq->pcq_base;
+	*p = v & 0xffff;	/* producer index: low half */
+	*c = v >> 16;		/* consumer index: high half */
+}
 
-	return ptr;
+static inline uint32_t
+pcq_combine(u_int p, u_int c)
+{
+
+	return p | (c << 16);	/* packs p and c the way pcq_split() unpacks them */
 }
 
+static inline u_int
+pcq_advance(pcq_t *pcq, u_int pc)
+{
+
+	if (__predict_false(++pc == pcq->pcq_nitems)) {
+		return 0;	/* wrap around to the first slot */
+	}
+	return pc;
+}
+
+/*
+ * pcq_put: place an item at the end of the queue; return false if full.
+ */
 bool
 pcq_put(pcq_t *pcq, void *item)
 {
-	pcq_entry_t *producer;
+	uint32_t v, nv;
+	u_int op, p, c;
 
 	KASSERT(item != NULL);
 
+	do {
+		v = pcq->pcq_pc;
+		pcq_split(v, &op, &c);
+		p = pcq_advance(pcq, op);
+		if (p == c) {
+			/* Queue is full (one slot is always left empty). */
+			return false;
+		}
+		nv = pcq_combine(p, c);
+	} while (atomic_cas_32(&pcq->pcq_pc, v, nv) != v);
+
 	/*
-	 * Get our starting point,  While we are doing this, it is
-	 * imperative that pcq->pcq_base/pcq->pcq_limit not change
-	 * in value.  If you need to resize a pcq, init a new pcq
-	 * with the right size and swap pointers to it.
+	 * Ensure that the update to pcq_pc is globally visible before the
+	 * data item.  See pcq_get().  This also ensures that any changes
+	 * that the caller made to the data item are globally visible
+	 * before we put it onto the list.
 	 */
-	membar_consumer();	/* see updates to pcq_producer */
-	producer = pcq->pcq_producer;
-	for (;;) {
-		/*
-		 * Preadvance so we reduce the window on updates.
-		 */
-		pcq_entry_t * const new_producer = pcq_advance(pcq, producer);
-
-		/*
-		 * Try to fill an empty slot
-		 */
-		if (NULL == atomic_cas_ptr(producer, NULL, item)) {
-			/*
-			 * We need to use atomic_cas_ptr since another thread
-			 * might have inserted between these two cas operations
-			 * and we don't want to overwrite a producer that's
-			 * more up-to-date.
-			 */
-			atomic_cas_ptr(&pcq->pcq_producer,
-			    __UNVOLATILE(producer), 
-			    __UNVOLATILE(new_producer));
-			/*
-			 * Tell them we were able to enqueue it.
-			 */
-#ifndef __HAVE_ATOMIC_AS_MEMBAR
-			membar_producer();
+#ifndef __HAVE_ATOMIC_AS_MEMBAR
+	membar_producer();
 #endif
-			return true;
-		}
+	pcq->pcq_items[op] = item;
 
-		/*
-		 * If we've reached the consumer, we've filled all the
-		 * slots and there's no more room so return false.
-		 */
-#ifndef __HAVE_ATOMIC_AS_MEMBAR
-		membar_consumer();	/* see updates to pcq_consumer */
-#endif
-		if (producer == pcq->pcq_consumer)
-			return false;
+	/*
+	 * Synchronization activity to wake up the consumer will ensure
+	 * that the update to pcq_items[] is visible before the wakeup
+	 * arrives.  So, we do not need an additional memory barrier here.
+	 */
+	return true;
+}
 
-		/*
-		 * Let's see if the next slot is free...
-		 */
-		producer = new_producer;
-	}
+/*
+ * pcq_peek: return the next item without removing it, or NULL if none.
+ */
+void *
+pcq_peek(pcq_t *pcq)
+{
+	const uint32_t v = pcq->pcq_pc;
+	u_int p, c;
+
+	pcq_split(v, &p, &c);
+
+	/* See comment on race below in pcq_get(). */
+	return (p == c) ? NULL : pcq->pcq_items[c];
 }
 
 /*
- * It's assumed that the enclosing structure that contains the pcq will
- * provide appropriate locking to prevent concurrent gets from occurring.
+ * pcq_get: remove and return the next item, or NULL if none is ready.
+ *
+ * => The caller must prevent concurrent gets from occurring.
  */
 void *
 pcq_get(pcq_t *pcq)
 {
-	pcq_entry_t * const consumer = pcq->pcq_consumer;
+	uint32_t v, nv;
+	u_int p, c;
 	void *item;
 
-	/*
-	 * Updates to pcq_consumer doesn't matter since we control it but we
-	 * want to make sure that any stores to what it references have
-	 * completed.
-	 */
-	membar_consumer();
-
-	/*
-	 * If there's nothing to return, just return.
-	 */
-	if ((item = *consumer) == NULL)
+	v = pcq->pcq_pc;
+	pcq_split(v, &p, &c);
+	if (p == c) {
+		/* Queue is empty: nothing to return. */
 		return NULL;
+	}
+	item = pcq->pcq_items[c];
+	if (item == NULL) {
+		/*
+		 * Raced with sender: we rely on a notification (e.g. softint
+		 * or wakeup) being generated after the producer's pcq_put(),
+		 * causing us to retry pcq_get() later.
+		 */
+		return NULL;
+	}
+	pcq->pcq_items[c] = NULL;
+	c = pcq_advance(pcq, c);
+	nv = pcq_combine(p, c);
 
 	/*
-	 * Update the consumer and free the slot.
-	 * Update the consumer pointer first so when producer == consumer
-	 * the right thing happens.
-	 *
-	 * 1) until the slot set to NULL, pcq_put will fail since
-	 *    the slot != NULL && producer == consumer.
-	 * 2) consumer is advanced but the slot is still not NULL,
-	 *    pcq_put will advance by one, see that producer == consumer,
-	 *    and fail.
-	 * 4) Once the slot is set to NULL, the producer can fill the slot
-	 *    and advance the producer.
-	 *    
-	 * and then we are back to 1.
+	 * Ensure that the update to pcq_items[] becomes globally visible
+	 * before the update to pcq_pc.  If it were reordered to occur
+	 * after it, we could in theory wipe out a modification made
+	 * to pcq_items[] by pcq_put().
 	 */
-	pcq->pcq_consumer = pcq_advance(pcq, consumer);
+#ifndef __HAVE_ATOMIC_AS_MEMBAR
 	membar_producer();
-
-	*consumer = NULL;
-	membar_producer();
-
+#endif
+	while (__predict_false(atomic_cas_32(&pcq->pcq_pc, v, nv) != v)) {
+		v = pcq->pcq_pc;
+		pcq_split(v, &p, &c);
+		c = pcq_advance(pcq, c);
+		nv = pcq_combine(p, c);
+	}
 	return item;
 }
 
-void *
-pcq_peek(pcq_t *pcq)
-{
-
-	membar_consumer();	/* see updates to *pcq_consumer */ 
-	return *pcq->pcq_consumer;
-}
-
-size_t
-pcq_maxitems(pcq_t *pcq)
-{
-
-	return pcq->pcq_limit - pcq->pcq_base;
-}
-
 pcq_t *
-pcq_create(size_t maxitems, km_flag_t kmflags)
+pcq_create(size_t nitems, km_flag_t kmflags)
 {
 	pcq_t *pcq;
 
-	KASSERT(maxitems > 0);
+	KASSERT(nitems > 0 && nitems <= 0xffff);	/* indices are 16 bits */
 
-	pcq = kmem_zalloc(offsetof(pcq_t, pcq_base[maxitems]), kmflags);
-	if (__predict_false(pcq == NULL))
+	pcq = kmem_zalloc(offsetof(pcq_t, pcq_items[nitems]), kmflags);
+	if (pcq == NULL) {
 		return NULL;
-
-	pcq->pcq_limit = pcq->pcq_base + maxitems;
-	pcq->pcq_producer = pcq->pcq_base;
-	pcq->pcq_consumer = pcq->pcq_producer;
-
+	}
+	pcq->pcq_nitems = nitems;	/* zeroed pcq_pc: p == c, queue empty */
 	return pcq;
 }
 
@@ -203,7 +211,12 @@
 pcq_destroy(pcq_t *pcq)
 {
 
-	KASSERT(*pcq->pcq_consumer == NULL);
+	kmem_free(pcq, offsetof(pcq_t, pcq_items[pcq->pcq_nitems]));	/* same size pcq_create() allocated */
+}
 
-	kmem_free(pcq, (uintptr_t)pcq->pcq_limit - (uintptr_t)pcq);
+size_t
+pcq_maxitems(pcq_t *pcq)
+{
+
+	return pcq->pcq_nitems;	/* NB: pcq_put() leaves one slot unused */
 }