Wed Dec 26 20:08:22 2018 UTC ()
Use uint64_t for the unbound and per-cpu thread pool ref counts; they're
always manipulated under a lock.  Rather than bother returning EBUSY,
just assert that the ref count never overflows (if it ever does, you have
bigger problems).


(thorpej)
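
For illustration, a minimal userland sketch of the pattern this change
adopts: a 64-bit reference count that is only ever modified with its
lock held, so overflow is asserted rather than reported as EBUSY.  The
names below (refpool, refpool_hold, refpool_rele) are invented for the
example and are not part of the kernel source.

#include <assert.h>
#include <pthread.h>
#include <stdint.h>

struct refpool {
	pthread_mutex_t	rp_lock;
	uint64_t	rp_refcnt;	/* protected by rp_lock */
};

void
refpool_hold(struct refpool *rp)
{

	pthread_mutex_lock(&rp->rp_lock);
	rp->rp_refcnt++;
	/* A 64-bit count bumped only under a lock cannot realistically wrap. */
	assert(rp->rp_refcnt != 0);
	pthread_mutex_unlock(&rp->rp_lock);
}

void
refpool_rele(struct refpool *rp)
{

	pthread_mutex_lock(&rp->rp_lock);
	assert(rp->rp_refcnt > 0);
	rp->rp_refcnt--;
	pthread_mutex_unlock(&rp->rp_lock);
}

In the committed code this shows up as "tpu->tpu_refcnt++;
KASSERT(tpu->tpu_refcnt != 0);" under threadpools_lock in
threadpool_get(), and likewise for tpp_refcnt in
threadpool_percpu_get().
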
diff -r1.4 -r1.5 src/sys/kern/kern_threadpool.c

cvs diff -r1.4 -r1.5 src/sys/kern/kern_threadpool.c

--- src/sys/kern/kern_threadpool.c 2018/12/26 18:54:19 1.4
+++ src/sys/kern/kern_threadpool.c 2018/12/26 20:08:22 1.5
@@ -1,1062 +1,1054 @@
1/* $NetBSD: kern_threadpool.c,v 1.4 2018/12/26 18:54:19 thorpej Exp $ */ 1/* $NetBSD: kern_threadpool.c,v 1.5 2018/12/26 20:08:22 thorpej Exp $ */
2 2
3/*- 3/*-
4 * Copyright (c) 2014, 2018 The NetBSD Foundation, Inc. 4 * Copyright (c) 2014, 2018 The NetBSD Foundation, Inc.
5 * All rights reserved. 5 * All rights reserved.
6 * 6 *
7 * This code is derived from software contributed to The NetBSD Foundation 7 * This code is derived from software contributed to The NetBSD Foundation
8 * by Taylor R. Campbell and Jason R. Thorpe. 8 * by Taylor R. Campbell and Jason R. Thorpe.
9 * 9 *
10 * Redistribution and use in source and binary forms, with or without 10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions 11 * modification, are permitted provided that the following conditions
12 * are met: 12 * are met:
13 * 1. Redistributions of source code must retain the above copyright 13 * 1. Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer. 14 * notice, this list of conditions and the following disclaimer.
15 * 2. Redistributions in binary form must reproduce the above copyright 15 * 2. Redistributions in binary form must reproduce the above copyright
16 * notice, this list of conditions and the following disclaimer in the 16 * notice, this list of conditions and the following disclaimer in the
17 * documentation and/or other materials provided with the distribution. 17 * documentation and/or other materials provided with the distribution.
18 * 18 *
19 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS 19 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
20 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 20 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
21 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 21 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS 22 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
23 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE. 29 * POSSIBILITY OF SUCH DAMAGE.
30 */ 30 */
31 31
32/* 32/*
33 * Thread pools. 33 * Thread pools.
34 * 34 *
35 * A thread pool is a collection of worker threads idle or running 35 * A thread pool is a collection of worker threads idle or running
36 * jobs, together with an overseer thread that does not run jobs but 36 * jobs, together with an overseer thread that does not run jobs but
37 * can be given jobs to assign to a worker thread. Scheduling a job in 37 * can be given jobs to assign to a worker thread. Scheduling a job in
38 * a thread pool does not allocate or even sleep at all, except perhaps 38 * a thread pool does not allocate or even sleep at all, except perhaps
39 * on an adaptive lock, unlike kthread_create. Jobs reuse threads, so 39 * on an adaptive lock, unlike kthread_create. Jobs reuse threads, so
40 * they do not incur the expense of creating and destroying kthreads 40 * they do not incur the expense of creating and destroying kthreads
41 * unless there is not much work to be done. 41 * unless there is not much work to be done.
42 * 42 *
43 * A per-CPU thread pool (threadpool_percpu) is a collection of thread 43 * A per-CPU thread pool (threadpool_percpu) is a collection of thread
44 * pools, one per CPU bound to that CPU. For each priority level in 44 * pools, one per CPU bound to that CPU. For each priority level in
45 * use, there is one shared unbound thread pool (i.e., pool of threads 45 * use, there is one shared unbound thread pool (i.e., pool of threads
46 * not bound to any CPU) and one shared per-CPU thread pool. 46 * not bound to any CPU) and one shared per-CPU thread pool.
47 * 47 *
48 * To use the unbound thread pool at priority pri, call 48 * To use the unbound thread pool at priority pri, call
49 * threadpool_get(&pool, pri). When you're done, call 49 * threadpool_get(&pool, pri). When you're done, call
50 * threadpool_put(pool, pri). 50 * threadpool_put(pool, pri).
51 * 51 *
52 * To use the per-CPU thread pools at priority pri, call 52 * To use the per-CPU thread pools at priority pri, call
53 * threadpool_percpu_get(&pool_percpu, pri), and then use the thread 53 * threadpool_percpu_get(&pool_percpu, pri), and then use the thread
54 * pool returned by threadpool_percpu_ref(pool_percpu) for the current 54 * pool returned by threadpool_percpu_ref(pool_percpu) for the current
55 * CPU, or by threadpool_percpu_ref_remote(pool_percpu, ci) for another 55 * CPU, or by threadpool_percpu_ref_remote(pool_percpu, ci) for another
56 * CPU. When you're done, call threadpool_percpu_put(pool_percpu, 56 * CPU. When you're done, call threadpool_percpu_put(pool_percpu,
57 * pri). 57 * pri).
58 * 58 *
59 * +--MACHINE-----------------------------------------------+ 59 * +--MACHINE-----------------------------------------------+
60 * | +--CPU 0-------+ +--CPU 1-------+ +--CPU n-------+ | 60 * | +--CPU 0-------+ +--CPU 1-------+ +--CPU n-------+ |
61 * | | <overseer 0> | | <overseer 1> | ... | <overseer n> | | 61 * | | <overseer 0> | | <overseer 1> | ... | <overseer n> | |
62 * | | <idle 0a> | | <running 1a> | ... | <idle na> | | 62 * | | <idle 0a> | | <running 1a> | ... | <idle na> | |
63 * | | <running 0b> | | <running 1b> | ... | <idle nb> | | 63 * | | <running 0b> | | <running 1b> | ... | <idle nb> | |
64 * | | . | | . | ... | . | | 64 * | | . | | . | ... | . | |
65 * | | . | | . | ... | . | | 65 * | | . | | . | ... | . | |
66 * | | . | | . | ... | . | | 66 * | | . | | . | ... | . | |
67 * | +--------------+ +--------------+ +--------------+ | 67 * | +--------------+ +--------------+ +--------------+ |
68 * | +--unbound---------+ | 68 * | +--unbound---------+ |
69 * | | <overseer n+1> | | 69 * | | <overseer n+1> | |
70 * | | <idle (n+1)a> | | 70 * | | <idle (n+1)a> | |
71 * | | <running (n+1)b> | | 71 * | | <running (n+1)b> | |
72 * | +------------------+ | 72 * | +------------------+ |
73 * +--------------------------------------------------------+ 73 * +--------------------------------------------------------+
74 * 74 *
75 * XXX Why one overseer per CPU? I did that originally to avoid 75 * XXX Why one overseer per CPU? I did that originally to avoid
76 * touching remote CPUs' memory when scheduling a job, but that still 76 * touching remote CPUs' memory when scheduling a job, but that still
77 * requires interprocessor synchronization. Perhaps we could get by 77 * requires interprocessor synchronization. Perhaps we could get by
78 * with a single overseer thread, at the expense of another pointer in 78 * with a single overseer thread, at the expense of another pointer in
79 * struct threadpool_job to identify the CPU on which it must run 79 * struct threadpool_job to identify the CPU on which it must run
80 * in order for the overseer to schedule it correctly. 80 * in order for the overseer to schedule it correctly.
81 */ 81 */
82 82
83#include <sys/cdefs.h> 83#include <sys/cdefs.h>
84__KERNEL_RCSID(0, "$NetBSD: kern_threadpool.c,v 1.4 2018/12/26 18:54:19 thorpej Exp $"); 84__KERNEL_RCSID(0, "$NetBSD: kern_threadpool.c,v 1.5 2018/12/26 20:08:22 thorpej Exp $");
85 85
86#include <sys/types.h> 86#include <sys/types.h>
87#include <sys/param.h> 87#include <sys/param.h>
88#include <sys/atomic.h> 88#include <sys/atomic.h>
89#include <sys/condvar.h> 89#include <sys/condvar.h>
90#include <sys/cpu.h> 90#include <sys/cpu.h>
91#include <sys/kernel.h> 91#include <sys/kernel.h>
92#include <sys/kmem.h> 92#include <sys/kmem.h>
93#include <sys/kthread.h> 93#include <sys/kthread.h>
94#include <sys/mutex.h> 94#include <sys/mutex.h>
95#include <sys/once.h> 95#include <sys/once.h>
96#include <sys/percpu.h> 96#include <sys/percpu.h>
97#include <sys/pool.h> 97#include <sys/pool.h>
98#include <sys/proc.h> 98#include <sys/proc.h>
99#include <sys/queue.h> 99#include <sys/queue.h>
100#include <sys/systm.h> 100#include <sys/systm.h>
101#include <sys/threadpool.h> 101#include <sys/threadpool.h>
102 102
103static ONCE_DECL(threadpool_init_once) 103static ONCE_DECL(threadpool_init_once)
104 104
105#define THREADPOOL_INIT() \ 105#define THREADPOOL_INIT() \
106do { \ 106do { \
107 int threadpool_init_error __diagused = \ 107 int threadpool_init_error __diagused = \
108 RUN_ONCE(&threadpool_init_once, threadpools_init); \ 108 RUN_ONCE(&threadpool_init_once, threadpools_init); \
109 KASSERT(threadpool_init_error == 0); \ 109 KASSERT(threadpool_init_error == 0); \
110} while (/*CONSTCOND*/0) 110} while (/*CONSTCOND*/0)
111 111
112/* Data structures */ 112/* Data structures */
113 113
114TAILQ_HEAD(job_head, threadpool_job); 114TAILQ_HEAD(job_head, threadpool_job);
115TAILQ_HEAD(thread_head, threadpool_thread); 115TAILQ_HEAD(thread_head, threadpool_thread);
116 116
117struct threadpool_thread { 117struct threadpool_thread {
118 struct lwp *tpt_lwp; 118 struct lwp *tpt_lwp;
119 struct threadpool *tpt_pool; 119 struct threadpool *tpt_pool;
120 struct threadpool_job *tpt_job; 120 struct threadpool_job *tpt_job;
121 kcondvar_t tpt_cv; 121 kcondvar_t tpt_cv;
122 TAILQ_ENTRY(threadpool_thread) tpt_entry; 122 TAILQ_ENTRY(threadpool_thread) tpt_entry;
123}; 123};
124 124
125struct threadpool { 125struct threadpool {
126 kmutex_t tp_lock; 126 kmutex_t tp_lock;
127 struct threadpool_thread tp_overseer; 127 struct threadpool_thread tp_overseer;
128 struct job_head tp_jobs; 128 struct job_head tp_jobs;
129 struct thread_head tp_idle_threads; 129 struct thread_head tp_idle_threads;
130 unsigned int tp_refcnt; 130 unsigned int tp_refcnt;
131 int tp_flags; 131 int tp_flags;
132#define THREADPOOL_DYING 0x01 132#define THREADPOOL_DYING 0x01
133 struct cpu_info *tp_cpu; 133 struct cpu_info *tp_cpu;
134 pri_t tp_pri; 134 pri_t tp_pri;
135}; 135};
136 136
137static int threadpool_hold(struct threadpool *); 137static int threadpool_hold(struct threadpool *);
138static void threadpool_rele(struct threadpool *); 138static void threadpool_rele(struct threadpool *);
139 139
140static int threadpool_percpu_create(struct threadpool_percpu **, pri_t); 140static int threadpool_percpu_create(struct threadpool_percpu **, pri_t);
141static void threadpool_percpu_destroy(struct threadpool_percpu *); 141static void threadpool_percpu_destroy(struct threadpool_percpu *);
142 142
143static void threadpool_job_dead(struct threadpool_job *); 143static void threadpool_job_dead(struct threadpool_job *);
144 144
145static int threadpool_job_hold(struct threadpool_job *); 145static int threadpool_job_hold(struct threadpool_job *);
146static void threadpool_job_rele(struct threadpool_job *); 146static void threadpool_job_rele(struct threadpool_job *);
147 147
148static void threadpool_overseer_thread(void *) __dead; 148static void threadpool_overseer_thread(void *) __dead;
149static void threadpool_thread(void *) __dead; 149static void threadpool_thread(void *) __dead;
150 150
151static pool_cache_t threadpool_thread_pc __read_mostly; 151static pool_cache_t threadpool_thread_pc __read_mostly;
152 152
153static kmutex_t threadpools_lock __cacheline_aligned; 153static kmutex_t threadpools_lock __cacheline_aligned;
154 154
155 /* Idle out threads after 30 seconds */ 155 /* Idle out threads after 30 seconds */
156#define THREADPOOL_IDLE_TICKS mstohz(30 * 1000) 156#define THREADPOOL_IDLE_TICKS mstohz(30 * 1000)
157 157
158struct threadpool_unbound { 158struct threadpool_unbound {
159 /* must be first; see threadpool_create() */ 159 /* must be first; see threadpool_create() */
160 struct threadpool tpu_pool; 160 struct threadpool tpu_pool;
161 161
162 /* protected by threadpools_lock */ 162 /* protected by threadpools_lock */
163 LIST_ENTRY(threadpool_unbound) tpu_link; 163 LIST_ENTRY(threadpool_unbound) tpu_link;
164 unsigned int tpu_refcnt; 164 uint64_t tpu_refcnt;
165}; 165};
166 166
167static LIST_HEAD(, threadpool_unbound) unbound_threadpools; 167static LIST_HEAD(, threadpool_unbound) unbound_threadpools;
168 168
169static struct threadpool_unbound * 169static struct threadpool_unbound *
170threadpool_lookup_unbound(pri_t pri) 170threadpool_lookup_unbound(pri_t pri)
171{ 171{
172 struct threadpool_unbound *tpu; 172 struct threadpool_unbound *tpu;
173 173
174 LIST_FOREACH(tpu, &unbound_threadpools, tpu_link) { 174 LIST_FOREACH(tpu, &unbound_threadpools, tpu_link) {
175 if (tpu->tpu_pool.tp_pri == pri) 175 if (tpu->tpu_pool.tp_pri == pri)
176 return tpu; 176 return tpu;
177 } 177 }
178 return NULL; 178 return NULL;
179} 179}
180 180
181static void 181static void
182threadpool_insert_unbound(struct threadpool_unbound *tpu) 182threadpool_insert_unbound(struct threadpool_unbound *tpu)
183{ 183{
184 KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == NULL); 184 KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == NULL);
185 LIST_INSERT_HEAD(&unbound_threadpools, tpu, tpu_link); 185 LIST_INSERT_HEAD(&unbound_threadpools, tpu, tpu_link);
186} 186}
187 187
188static void 188static void
189threadpool_remove_unbound(struct threadpool_unbound *tpu) 189threadpool_remove_unbound(struct threadpool_unbound *tpu)
190{ 190{
191 KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == tpu); 191 KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == tpu);
192 LIST_REMOVE(tpu, tpu_link); 192 LIST_REMOVE(tpu, tpu_link);
193} 193}
194 194
195struct threadpool_percpu { 195struct threadpool_percpu {
196 percpu_t * tpp_percpu; 196 percpu_t * tpp_percpu;
197 pri_t tpp_pri; 197 pri_t tpp_pri;
198 198
199 /* protected by threadpools_lock */ 199 /* protected by threadpools_lock */
200 LIST_ENTRY(threadpool_percpu) tpp_link; 200 LIST_ENTRY(threadpool_percpu) tpp_link;
201 unsigned int tpp_refcnt; 201 uint64_t tpp_refcnt;
202}; 202};
203 203
204static LIST_HEAD(, threadpool_percpu) percpu_threadpools; 204static LIST_HEAD(, threadpool_percpu) percpu_threadpools;
205 205
206static struct threadpool_percpu * 206static struct threadpool_percpu *
207threadpool_lookup_percpu(pri_t pri) 207threadpool_lookup_percpu(pri_t pri)
208{ 208{
209 struct threadpool_percpu *tpp; 209 struct threadpool_percpu *tpp;
210 210
211 LIST_FOREACH(tpp, &percpu_threadpools, tpp_link) { 211 LIST_FOREACH(tpp, &percpu_threadpools, tpp_link) {
212 if (tpp->tpp_pri == pri) 212 if (tpp->tpp_pri == pri)
213 return tpp; 213 return tpp;
214 } 214 }
215 return NULL; 215 return NULL;
216} 216}
217 217
218static void 218static void
219threadpool_insert_percpu(struct threadpool_percpu *tpp) 219threadpool_insert_percpu(struct threadpool_percpu *tpp)
220{ 220{
221 KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == NULL); 221 KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == NULL);
222 LIST_INSERT_HEAD(&percpu_threadpools, tpp, tpp_link); 222 LIST_INSERT_HEAD(&percpu_threadpools, tpp, tpp_link);
223} 223}
224 224
225static void 225static void
226threadpool_remove_percpu(struct threadpool_percpu *tpp) 226threadpool_remove_percpu(struct threadpool_percpu *tpp)
227{ 227{
228 KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == tpp); 228 KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == tpp);
229 LIST_REMOVE(tpp, tpp_link); 229 LIST_REMOVE(tpp, tpp_link);
230} 230}
231 231
232#ifdef THREADPOOL_VERBOSE 232#ifdef THREADPOOL_VERBOSE
233#define TP_LOG(x) printf x 233#define TP_LOG(x) printf x
234#else 234#else
235#define TP_LOG(x) /* nothing */ 235#define TP_LOG(x) /* nothing */
236#endif /* THREADPOOL_VERBOSE */ 236#endif /* THREADPOOL_VERBOSE */
237 237
238static int 238static int
239threadpools_init(void) 239threadpools_init(void)
240{ 240{
241 241
242 threadpool_thread_pc = 242 threadpool_thread_pc =
243 pool_cache_init(sizeof(struct threadpool_thread), 0, 0, 0, 243 pool_cache_init(sizeof(struct threadpool_thread), 0, 0, 0,
244 "thplthrd", NULL, IPL_NONE, NULL, NULL, NULL); 244 "thplthrd", NULL, IPL_NONE, NULL, NULL, NULL);
245 245
246 LIST_INIT(&unbound_threadpools); 246 LIST_INIT(&unbound_threadpools);
247 LIST_INIT(&percpu_threadpools); 247 LIST_INIT(&percpu_threadpools);
248 mutex_init(&threadpools_lock, MUTEX_DEFAULT, IPL_NONE); 248 mutex_init(&threadpools_lock, MUTEX_DEFAULT, IPL_NONE);
249 249
250 TP_LOG(("%s: sizeof(threadpool_job) = %zu\n", 250 TP_LOG(("%s: sizeof(threadpool_job) = %zu\n",
251 __func__, sizeof(struct threadpool_job))); 251 __func__, sizeof(struct threadpool_job)));
252 252
253 return 0; 253 return 0;
254} 254}
255 255
256/* Thread pool creation */ 256/* Thread pool creation */
257 257
258static bool 258static bool
259threadpool_pri_is_valid(pri_t pri) 259threadpool_pri_is_valid(pri_t pri)
260{ 260{
261 return (pri == PRI_NONE || (pri >= PRI_USER && pri < PRI_COUNT)); 261 return (pri == PRI_NONE || (pri >= PRI_USER && pri < PRI_COUNT));
262} 262}
263 263
264static int 264static int
265threadpool_create(struct threadpool **poolp, struct cpu_info *ci, pri_t pri, 265threadpool_create(struct threadpool **poolp, struct cpu_info *ci, pri_t pri,
266 size_t size) 266 size_t size)
267{ 267{
268 struct threadpool *const pool = kmem_zalloc(size, KM_SLEEP); 268 struct threadpool *const pool = kmem_zalloc(size, KM_SLEEP);
269 struct lwp *lwp; 269 struct lwp *lwp;
270 int ktflags; 270 int ktflags;
271 int error; 271 int error;
272 272
273 KASSERT(threadpool_pri_is_valid(pri)); 273 KASSERT(threadpool_pri_is_valid(pri));
274 274
275 mutex_init(&pool->tp_lock, MUTEX_DEFAULT, IPL_VM); 275 mutex_init(&pool->tp_lock, MUTEX_DEFAULT, IPL_VM);
276 /* XXX overseer */ 276 /* XXX overseer */
277 TAILQ_INIT(&pool->tp_jobs); 277 TAILQ_INIT(&pool->tp_jobs);
278 TAILQ_INIT(&pool->tp_idle_threads); 278 TAILQ_INIT(&pool->tp_idle_threads);
279 pool->tp_refcnt = 0; 279 pool->tp_refcnt = 0;
280 pool->tp_flags = 0; 280 pool->tp_flags = 0;
281 pool->tp_cpu = ci; 281 pool->tp_cpu = ci;
282 pool->tp_pri = pri; 282 pool->tp_pri = pri;
283 283
284 error = threadpool_hold(pool); 284 error = threadpool_hold(pool);
285 KASSERT(error == 0); 285 KASSERT(error == 0);
286 pool->tp_overseer.tpt_lwp = NULL; 286 pool->tp_overseer.tpt_lwp = NULL;
287 pool->tp_overseer.tpt_pool = pool; 287 pool->tp_overseer.tpt_pool = pool;
288 pool->tp_overseer.tpt_job = NULL; 288 pool->tp_overseer.tpt_job = NULL;
289 cv_init(&pool->tp_overseer.tpt_cv, "poolover"); 289 cv_init(&pool->tp_overseer.tpt_cv, "poolover");
290 290
291 ktflags = 0; 291 ktflags = 0;
292 ktflags |= KTHREAD_MPSAFE; 292 ktflags |= KTHREAD_MPSAFE;
293 if (pri < PRI_KERNEL) 293 if (pri < PRI_KERNEL)
294 ktflags |= KTHREAD_TS; 294 ktflags |= KTHREAD_TS;
295 error = kthread_create(pri, ktflags, ci, &threadpool_overseer_thread, 295 error = kthread_create(pri, ktflags, ci, &threadpool_overseer_thread,
296 &pool->tp_overseer, &lwp, 296 &pool->tp_overseer, &lwp,
297 "pooloverseer/%d@%d", (ci ? cpu_index(ci) : -1), (int)pri); 297 "pooloverseer/%d@%d", (ci ? cpu_index(ci) : -1), (int)pri);
298 if (error) 298 if (error)
299 goto fail0; 299 goto fail0;
300 300
301 mutex_spin_enter(&pool->tp_lock); 301 mutex_spin_enter(&pool->tp_lock);
302 pool->tp_overseer.tpt_lwp = lwp; 302 pool->tp_overseer.tpt_lwp = lwp;
303 cv_broadcast(&pool->tp_overseer.tpt_cv); 303 cv_broadcast(&pool->tp_overseer.tpt_cv);
304 mutex_spin_exit(&pool->tp_lock); 304 mutex_spin_exit(&pool->tp_lock);
305 305
306 *poolp = pool; 306 *poolp = pool;
307 return 0; 307 return 0;
308 308
309fail0: KASSERT(error); 309fail0: KASSERT(error);
310 KASSERT(pool->tp_overseer.tpt_job == NULL); 310 KASSERT(pool->tp_overseer.tpt_job == NULL);
311 KASSERT(pool->tp_overseer.tpt_pool == pool); 311 KASSERT(pool->tp_overseer.tpt_pool == pool);
312 KASSERT(pool->tp_flags == 0); 312 KASSERT(pool->tp_flags == 0);
313 KASSERT(pool->tp_refcnt == 0); 313 KASSERT(pool->tp_refcnt == 0);
314 KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads)); 314 KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
315 KASSERT(TAILQ_EMPTY(&pool->tp_jobs)); 315 KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
316 KASSERT(!cv_has_waiters(&pool->tp_overseer.tpt_cv)); 316 KASSERT(!cv_has_waiters(&pool->tp_overseer.tpt_cv));
317 cv_destroy(&pool->tp_overseer.tpt_cv); 317 cv_destroy(&pool->tp_overseer.tpt_cv);
318 mutex_destroy(&pool->tp_lock); 318 mutex_destroy(&pool->tp_lock);
319 kmem_free(pool, size); 319 kmem_free(pool, size);
320 return error; 320 return error;
321} 321}
322 322
323/* Thread pool destruction */ 323/* Thread pool destruction */
324 324
325static void 325static void
326threadpool_destroy(struct threadpool *pool, size_t size) 326threadpool_destroy(struct threadpool *pool, size_t size)
327{ 327{
328 struct threadpool_thread *thread; 328 struct threadpool_thread *thread;
329 329
330 /* Mark the pool dying and wait for threads to commit suicide. */ 330 /* Mark the pool dying and wait for threads to commit suicide. */
331 mutex_spin_enter(&pool->tp_lock); 331 mutex_spin_enter(&pool->tp_lock);
332 KASSERT(TAILQ_EMPTY(&pool->tp_jobs)); 332 KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
333 pool->tp_flags |= THREADPOOL_DYING; 333 pool->tp_flags |= THREADPOOL_DYING;
334 cv_broadcast(&pool->tp_overseer.tpt_cv); 334 cv_broadcast(&pool->tp_overseer.tpt_cv);
335 TAILQ_FOREACH(thread, &pool->tp_idle_threads, tpt_entry) 335 TAILQ_FOREACH(thread, &pool->tp_idle_threads, tpt_entry)
336 cv_broadcast(&thread->tpt_cv); 336 cv_broadcast(&thread->tpt_cv);
337 while (0 < pool->tp_refcnt) { 337 while (0 < pool->tp_refcnt) {
338 TP_LOG(("%s: draining %u references...\n", __func__, 338 TP_LOG(("%s: draining %u references...\n", __func__,
339 pool->tp_refcnt)); 339 pool->tp_refcnt));
340 cv_wait(&pool->tp_overseer.tpt_cv, &pool->tp_lock); 340 cv_wait(&pool->tp_overseer.tpt_cv, &pool->tp_lock);
341 } 341 }
342 mutex_spin_exit(&pool->tp_lock); 342 mutex_spin_exit(&pool->tp_lock);
343 343
344 KASSERT(pool->tp_overseer.tpt_job == NULL); 344 KASSERT(pool->tp_overseer.tpt_job == NULL);
345 KASSERT(pool->tp_overseer.tpt_pool == pool); 345 KASSERT(pool->tp_overseer.tpt_pool == pool);
346 KASSERT(pool->tp_flags == THREADPOOL_DYING); 346 KASSERT(pool->tp_flags == THREADPOOL_DYING);
347 KASSERT(pool->tp_refcnt == 0); 347 KASSERT(pool->tp_refcnt == 0);
348 KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads)); 348 KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
349 KASSERT(TAILQ_EMPTY(&pool->tp_jobs)); 349 KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
350 KASSERT(!cv_has_waiters(&pool->tp_overseer.tpt_cv)); 350 KASSERT(!cv_has_waiters(&pool->tp_overseer.tpt_cv));
351 cv_destroy(&pool->tp_overseer.tpt_cv); 351 cv_destroy(&pool->tp_overseer.tpt_cv);
352 mutex_destroy(&pool->tp_lock); 352 mutex_destroy(&pool->tp_lock);
353 kmem_free(pool, size); 353 kmem_free(pool, size);
354} 354}
355 355
356static int 356static int
357threadpool_hold(struct threadpool *pool) 357threadpool_hold(struct threadpool *pool)
358{ 358{
359 unsigned int refcnt; 359 unsigned int refcnt;
360 360
361 do { 361 do {
362 refcnt = pool->tp_refcnt; 362 refcnt = pool->tp_refcnt;
363 if (refcnt == UINT_MAX) 363 if (refcnt == UINT_MAX)
364 return EBUSY; 364 return EBUSY;
365 } while (atomic_cas_uint(&pool->tp_refcnt, refcnt, (refcnt + 1)) 365 } while (atomic_cas_uint(&pool->tp_refcnt, refcnt, (refcnt + 1))
366 != refcnt); 366 != refcnt);
367 367
368 return 0; 368 return 0;
369} 369}
370 370
371static void 371static void
372threadpool_rele(struct threadpool *pool) 372threadpool_rele(struct threadpool *pool)
373{ 373{
374 unsigned int refcnt; 374 unsigned int refcnt;
375 375
376 do { 376 do {
377 refcnt = pool->tp_refcnt; 377 refcnt = pool->tp_refcnt;
378 KASSERT(0 < refcnt); 378 KASSERT(0 < refcnt);
379 if (refcnt == 1) { 379 if (refcnt == 1) {
380 mutex_spin_enter(&pool->tp_lock); 380 mutex_spin_enter(&pool->tp_lock);
381 refcnt = atomic_dec_uint_nv(&pool->tp_refcnt); 381 refcnt = atomic_dec_uint_nv(&pool->tp_refcnt);
382 KASSERT(refcnt != UINT_MAX); 382 KASSERT(refcnt != UINT_MAX);
383 if (refcnt == 0) 383 if (refcnt == 0)
384 cv_broadcast(&pool->tp_overseer.tpt_cv); 384 cv_broadcast(&pool->tp_overseer.tpt_cv);
385 mutex_spin_exit(&pool->tp_lock); 385 mutex_spin_exit(&pool->tp_lock);
386 return; 386 return;
387 } 387 }
388 } while (atomic_cas_uint(&pool->tp_refcnt, refcnt, (refcnt - 1)) 388 } while (atomic_cas_uint(&pool->tp_refcnt, refcnt, (refcnt - 1))
389 != refcnt); 389 != refcnt);
390} 390}
391 391
392/* Unbound thread pools */ 392/* Unbound thread pools */
393 393
394int 394int
395threadpool_get(struct threadpool **poolp, pri_t pri) 395threadpool_get(struct threadpool **poolp, pri_t pri)
396{ 396{
397 struct threadpool_unbound *tpu, *tmp = NULL; 397 struct threadpool_unbound *tpu, *tmp = NULL;
398 int error; 398 int error;
399 399
400 THREADPOOL_INIT(); 400 THREADPOOL_INIT();
401 401
402 ASSERT_SLEEPABLE(); 402 ASSERT_SLEEPABLE();
403 403
404 if (! threadpool_pri_is_valid(pri)) 404 if (! threadpool_pri_is_valid(pri))
405 return EINVAL; 405 return EINVAL;
406 406
407 mutex_enter(&threadpools_lock); 407 mutex_enter(&threadpools_lock);
408 tpu = threadpool_lookup_unbound(pri); 408 tpu = threadpool_lookup_unbound(pri);
409 if (tpu == NULL) { 409 if (tpu == NULL) {
410 struct threadpool *new_pool; 410 struct threadpool *new_pool;
411 mutex_exit(&threadpools_lock); 411 mutex_exit(&threadpools_lock);
412 TP_LOG(("%s: No pool for pri=%d, creating one.\n", 412 TP_LOG(("%s: No pool for pri=%d, creating one.\n",
413 __func__, (int)pri)); 413 __func__, (int)pri));
414 error = threadpool_create(&new_pool, NULL, pri, sizeof(*tpu)); 414 error = threadpool_create(&new_pool, NULL, pri, sizeof(*tpu));
415 if (error) 415 if (error)
416 return error; 416 return error;
417 KASSERT(new_pool != NULL); 417 KASSERT(new_pool != NULL);
418 tmp = container_of(new_pool, struct threadpool_unbound, 418 tmp = container_of(new_pool, struct threadpool_unbound,
419 tpu_pool); 419 tpu_pool);
420 mutex_enter(&threadpools_lock); 420 mutex_enter(&threadpools_lock);
421 tpu = threadpool_lookup_unbound(pri); 421 tpu = threadpool_lookup_unbound(pri);
422 if (tpu == NULL) { 422 if (tpu == NULL) {
423 TP_LOG(("%s: Won the creation race for pri=%d.\n", 423 TP_LOG(("%s: Won the creation race for pri=%d.\n",
424 __func__, (int)pri)); 424 __func__, (int)pri));
425 tpu = tmp; 425 tpu = tmp;
426 tmp = NULL; 426 tmp = NULL;
427 threadpool_insert_unbound(tpu); 427 threadpool_insert_unbound(tpu);
428 } 428 }
429 } 429 }
430 KASSERT(tpu != NULL); 430 KASSERT(tpu != NULL);
431 if (tpu->tpu_refcnt == UINT_MAX) { 
432 mutex_exit(&threadpools_lock); 
433 if (tmp != NULL) 
434 threadpool_destroy(&tmp->tpu_pool, sizeof(*tpu)); 
435 return EBUSY; 
436 } 
437 tpu->tpu_refcnt++; 431 tpu->tpu_refcnt++;
 432 KASSERT(tpu->tpu_refcnt != 0);
438 mutex_exit(&threadpools_lock); 433 mutex_exit(&threadpools_lock);
439 434
440 if (tmp != NULL) 435 if (tmp != NULL)
441 threadpool_destroy((struct threadpool *)tmp, sizeof(*tpu)); 436 threadpool_destroy((struct threadpool *)tmp, sizeof(*tpu));
442 KASSERT(tpu != NULL); 437 KASSERT(tpu != NULL);
443 *poolp = &tpu->tpu_pool; 438 *poolp = &tpu->tpu_pool;
444 return 0; 439 return 0;
445} 440}
446 441
447void 442void
448threadpool_put(struct threadpool *pool, pri_t pri) 443threadpool_put(struct threadpool *pool, pri_t pri)
449{ 444{
450 struct threadpool_unbound *tpu = 445 struct threadpool_unbound *tpu =
451 container_of(pool, struct threadpool_unbound, tpu_pool); 446 container_of(pool, struct threadpool_unbound, tpu_pool);
452 447
453 THREADPOOL_INIT(); 448 THREADPOOL_INIT();
454 449
455 ASSERT_SLEEPABLE(); 450 ASSERT_SLEEPABLE();
456 451
457 KASSERT(threadpool_pri_is_valid(pri)); 452 KASSERT(threadpool_pri_is_valid(pri));
458 453
459 mutex_enter(&threadpools_lock); 454 mutex_enter(&threadpools_lock);
460 KASSERT(tpu == threadpool_lookup_unbound(pri)); 455 KASSERT(tpu == threadpool_lookup_unbound(pri));
461 KASSERT(0 < tpu->tpu_refcnt); 456 KASSERT(0 < tpu->tpu_refcnt);
462 if (--tpu->tpu_refcnt == 0) { 457 if (--tpu->tpu_refcnt == 0) {
463 TP_LOG(("%s: Last reference for pri=%d, destroying pool.\n", 458 TP_LOG(("%s: Last reference for pri=%d, destroying pool.\n",
464 __func__, (int)pri)); 459 __func__, (int)pri));
465 threadpool_remove_unbound(tpu); 460 threadpool_remove_unbound(tpu);
466 } else 461 } else {
467 tpu = NULL; 462 tpu = NULL;
 463 }
468 mutex_exit(&threadpools_lock); 464 mutex_exit(&threadpools_lock);
469 465
470 if (tpu) 466 if (tpu)
471 threadpool_destroy(pool, sizeof(*tpu)); 467 threadpool_destroy(pool, sizeof(*tpu));
472} 468}
473 469
474/* Per-CPU thread pools */ 470/* Per-CPU thread pools */
475 471
476int 472int
477threadpool_percpu_get(struct threadpool_percpu **pool_percpup, pri_t pri) 473threadpool_percpu_get(struct threadpool_percpu **pool_percpup, pri_t pri)
478{ 474{
479 struct threadpool_percpu *pool_percpu, *tmp = NULL; 475 struct threadpool_percpu *pool_percpu, *tmp = NULL;
480 int error; 476 int error;
481 477
482 THREADPOOL_INIT(); 478 THREADPOOL_INIT();
483 479
484 ASSERT_SLEEPABLE(); 480 ASSERT_SLEEPABLE();
485 481
486 if (! threadpool_pri_is_valid(pri)) 482 if (! threadpool_pri_is_valid(pri))
487 return EINVAL; 483 return EINVAL;
488 484
489 mutex_enter(&threadpools_lock); 485 mutex_enter(&threadpools_lock);
490 pool_percpu = threadpool_lookup_percpu(pri); 486 pool_percpu = threadpool_lookup_percpu(pri);
491 if (pool_percpu == NULL) { 487 if (pool_percpu == NULL) {
492 mutex_exit(&threadpools_lock); 488 mutex_exit(&threadpools_lock);
493 TP_LOG(("%s: No pool for pri=%d, creating one.\n", 489 TP_LOG(("%s: No pool for pri=%d, creating one.\n",
494 __func__, (int)pri)); 490 __func__, (int)pri));
495 error = threadpool_percpu_create(&tmp, pri); 491 error = threadpool_percpu_create(&tmp, pri);
496 if (error) 492 if (error)
497 return error; 493 return error;
498 KASSERT(tmp != NULL); 494 KASSERT(tmp != NULL);
499 mutex_enter(&threadpools_lock); 495 mutex_enter(&threadpools_lock);
500 pool_percpu = threadpool_lookup_percpu(pri); 496 pool_percpu = threadpool_lookup_percpu(pri);
501 if (pool_percpu == NULL) { 497 if (pool_percpu == NULL) {
502 TP_LOG(("%s: Won the creation race for pri=%d.\n", 498 TP_LOG(("%s: Won the creation race for pri=%d.\n",
503 __func__, (int)pri)); 499 __func__, (int)pri));
504 pool_percpu = tmp; 500 pool_percpu = tmp;
505 tmp = NULL; 501 tmp = NULL;
506 threadpool_insert_percpu(pool_percpu); 502 threadpool_insert_percpu(pool_percpu);
507 } 503 }
508 } 504 }
509 KASSERT(pool_percpu != NULL); 505 KASSERT(pool_percpu != NULL);
510 if (pool_percpu->tpp_refcnt == UINT_MAX) { 
511 mutex_exit(&threadpools_lock); 
512 if (tmp != NULL) 
513 threadpool_percpu_destroy(tmp); 
514 return EBUSY; 
515 } 
516 pool_percpu->tpp_refcnt++; 506 pool_percpu->tpp_refcnt++;
 507 KASSERT(pool_percpu->tpp_refcnt != 0);
517 mutex_exit(&threadpools_lock); 508 mutex_exit(&threadpools_lock);
518 509
519 if (tmp != NULL) 510 if (tmp != NULL)
520 threadpool_percpu_destroy(tmp); 511 threadpool_percpu_destroy(tmp);
521 KASSERT(pool_percpu != NULL); 512 KASSERT(pool_percpu != NULL);
522 *pool_percpup = pool_percpu; 513 *pool_percpup = pool_percpu;
523 return 0; 514 return 0;
524} 515}
525 516
526void 517void
527threadpool_percpu_put(struct threadpool_percpu *pool_percpu, pri_t pri) 518threadpool_percpu_put(struct threadpool_percpu *pool_percpu, pri_t pri)
528{ 519{
529 520
530 THREADPOOL_INIT(); 521 THREADPOOL_INIT();
531 522
532 ASSERT_SLEEPABLE(); 523 ASSERT_SLEEPABLE();
533 524
534 KASSERT(threadpool_pri_is_valid(pri)); 525 KASSERT(threadpool_pri_is_valid(pri));
535 526
536 mutex_enter(&threadpools_lock); 527 mutex_enter(&threadpools_lock);
537 KASSERT(pool_percpu == threadpool_lookup_percpu(pri)); 528 KASSERT(pool_percpu == threadpool_lookup_percpu(pri));
538 KASSERT(0 < pool_percpu->tpp_refcnt); 529 KASSERT(0 < pool_percpu->tpp_refcnt);
539 if (--pool_percpu->tpp_refcnt == 0) { 530 if (--pool_percpu->tpp_refcnt == 0) {
540 TP_LOG(("%s: Last reference for pri=%d, destroying pool.\n", 531 TP_LOG(("%s: Last reference for pri=%d, destroying pool.\n",
541 __func__, (int)pri)); 532 __func__, (int)pri));
542 threadpool_remove_percpu(pool_percpu); 533 threadpool_remove_percpu(pool_percpu);
543 } else 534 } else {
544 pool_percpu = NULL; 535 pool_percpu = NULL;
 536 }
545 mutex_exit(&threadpools_lock); 537 mutex_exit(&threadpools_lock);
546 538
547 if (pool_percpu) 539 if (pool_percpu)
548 threadpool_percpu_destroy(pool_percpu); 540 threadpool_percpu_destroy(pool_percpu);
549} 541}
550 542
551struct threadpool * 543struct threadpool *
552threadpool_percpu_ref(struct threadpool_percpu *pool_percpu) 544threadpool_percpu_ref(struct threadpool_percpu *pool_percpu)
553{ 545{
554 struct threadpool **poolp, *pool; 546 struct threadpool **poolp, *pool;
555 547
556 poolp = percpu_getref(pool_percpu->tpp_percpu); 548 poolp = percpu_getref(pool_percpu->tpp_percpu);
557 pool = *poolp; 549 pool = *poolp;
558 percpu_putref(pool_percpu->tpp_percpu); 550 percpu_putref(pool_percpu->tpp_percpu);
559 551
560 return pool; 552 return pool;
561} 553}
562 554
563struct threadpool * 555struct threadpool *
564threadpool_percpu_ref_remote(struct threadpool_percpu *pool_percpu, 556threadpool_percpu_ref_remote(struct threadpool_percpu *pool_percpu,
565 struct cpu_info *ci) 557 struct cpu_info *ci)
566{ 558{
567 struct threadpool **poolp, *pool; 559 struct threadpool **poolp, *pool;
568 560
569 percpu_traverse_enter(); 561 percpu_traverse_enter();
570 poolp = percpu_getptr_remote(pool_percpu->tpp_percpu, ci); 562 poolp = percpu_getptr_remote(pool_percpu->tpp_percpu, ci);
571 pool = *poolp; 563 pool = *poolp;
572 percpu_traverse_exit(); 564 percpu_traverse_exit();
573 565
574 return pool; 566 return pool;
575} 567}
576 568
577static int 569static int
578threadpool_percpu_create(struct threadpool_percpu **pool_percpup, pri_t pri) 570threadpool_percpu_create(struct threadpool_percpu **pool_percpup, pri_t pri)
579{ 571{
580 struct threadpool_percpu *pool_percpu; 572 struct threadpool_percpu *pool_percpu;
581 struct cpu_info *ci; 573 struct cpu_info *ci;
582 CPU_INFO_ITERATOR cii; 574 CPU_INFO_ITERATOR cii;
583 unsigned int i, j; 575 unsigned int i, j;
584 int error; 576 int error;
585 577
586 pool_percpu = kmem_zalloc(sizeof(*pool_percpu), KM_SLEEP); 578 pool_percpu = kmem_zalloc(sizeof(*pool_percpu), KM_SLEEP);
587 if (pool_percpu == NULL) { 579 if (pool_percpu == NULL) {
588 error = ENOMEM; 580 error = ENOMEM;
589 goto fail0; 581 goto fail0;
590 } 582 }
591 pool_percpu->tpp_pri = pri; 583 pool_percpu->tpp_pri = pri;
592 584
593 pool_percpu->tpp_percpu = percpu_alloc(sizeof(struct threadpool *)); 585 pool_percpu->tpp_percpu = percpu_alloc(sizeof(struct threadpool *));
594 if (pool_percpu->tpp_percpu == NULL) { 586 if (pool_percpu->tpp_percpu == NULL) {
595 error = ENOMEM; 587 error = ENOMEM;
596 goto fail1; 588 goto fail1;
597 } 589 }
598 590
599 for (i = 0, CPU_INFO_FOREACH(cii, ci), i++) { 591 for (i = 0, CPU_INFO_FOREACH(cii, ci), i++) {
600 struct threadpool *pool; 592 struct threadpool *pool;
601 593
602 error = threadpool_create(&pool, ci, pri, sizeof(*pool)); 594 error = threadpool_create(&pool, ci, pri, sizeof(*pool));
603 if (error) 595 if (error)
604 goto fail2; 596 goto fail2;
605 percpu_traverse_enter(); 597 percpu_traverse_enter();
606 struct threadpool **const poolp = 598 struct threadpool **const poolp =
607 percpu_getptr_remote(pool_percpu->tpp_percpu, ci); 599 percpu_getptr_remote(pool_percpu->tpp_percpu, ci);
608 *poolp = pool; 600 *poolp = pool;
609 percpu_traverse_exit(); 601 percpu_traverse_exit();
610 } 602 }
611 603
612 /* Success! */ 604 /* Success! */
613 *pool_percpup = (struct threadpool_percpu *)pool_percpu; 605 *pool_percpup = (struct threadpool_percpu *)pool_percpu;
614 return 0; 606 return 0;
615 607
616fail2: for (j = 0, CPU_INFO_FOREACH(cii, ci), j++) { 608fail2: for (j = 0, CPU_INFO_FOREACH(cii, ci), j++) {
617 if (i <= j) 609 if (i <= j)
618 break; 610 break;
619 percpu_traverse_enter(); 611 percpu_traverse_enter();
620 struct threadpool **const poolp = 612 struct threadpool **const poolp =
621 percpu_getptr_remote(pool_percpu->tpp_percpu, ci); 613 percpu_getptr_remote(pool_percpu->tpp_percpu, ci);
622 struct threadpool *const pool = *poolp; 614 struct threadpool *const pool = *poolp;
623 percpu_traverse_exit(); 615 percpu_traverse_exit();
624 threadpool_destroy(pool, sizeof(*pool)); 616 threadpool_destroy(pool, sizeof(*pool));
625 } 617 }
626 percpu_free(pool_percpu->tpp_percpu, sizeof(struct taskthread_pool *)); 618 percpu_free(pool_percpu->tpp_percpu, sizeof(struct taskthread_pool *));
627fail1: kmem_free(pool_percpu, sizeof(*pool_percpu)); 619fail1: kmem_free(pool_percpu, sizeof(*pool_percpu));
628fail0: return error; 620fail0: return error;
629} 621}
630 622
631static void 623static void
632threadpool_percpu_destroy(struct threadpool_percpu *pool_percpu) 624threadpool_percpu_destroy(struct threadpool_percpu *pool_percpu)
633{ 625{
634 struct cpu_info *ci; 626 struct cpu_info *ci;
635 CPU_INFO_ITERATOR cii; 627 CPU_INFO_ITERATOR cii;
636 628
637 for (CPU_INFO_FOREACH(cii, ci)) { 629 for (CPU_INFO_FOREACH(cii, ci)) {
638 percpu_traverse_enter(); 630 percpu_traverse_enter();
639 struct threadpool **const poolp = 631 struct threadpool **const poolp =
640 percpu_getptr_remote(pool_percpu->tpp_percpu, ci); 632 percpu_getptr_remote(pool_percpu->tpp_percpu, ci);
641 struct threadpool *const pool = *poolp; 633 struct threadpool *const pool = *poolp;
642 percpu_traverse_exit(); 634 percpu_traverse_exit();
643 threadpool_destroy(pool, sizeof(*pool)); 635 threadpool_destroy(pool, sizeof(*pool));
644 } 636 }
645 637
646 percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *)); 638 percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *));
647 kmem_free(pool_percpu, sizeof(*pool_percpu)); 639 kmem_free(pool_percpu, sizeof(*pool_percpu));
648} 640}
649 641
650/* Thread pool jobs */ 642/* Thread pool jobs */
651 643
652void __printflike(4,5) 644void __printflike(4,5)
653threadpool_job_init(struct threadpool_job *job, threadpool_job_fn_t fn, 645threadpool_job_init(struct threadpool_job *job, threadpool_job_fn_t fn,
654 kmutex_t *lock, const char *fmt, ...) 646 kmutex_t *lock, const char *fmt, ...)
655{ 647{
656 va_list ap; 648 va_list ap;
657 649
658 va_start(ap, fmt); 650 va_start(ap, fmt);
659 (void)vsnprintf(job->job_name, sizeof(job->job_name), fmt, ap); 651 (void)vsnprintf(job->job_name, sizeof(job->job_name), fmt, ap);
660 va_end(ap); 652 va_end(ap);
661 653
662 job->job_lock = lock; 654 job->job_lock = lock;
663 job->job_thread = NULL; 655 job->job_thread = NULL;
664 job->job_refcnt = 0; 656 job->job_refcnt = 0;
665 cv_init(&job->job_cv, job->job_name); 657 cv_init(&job->job_cv, job->job_name);
666 job->job_fn = fn; 658 job->job_fn = fn;
667} 659}
668 660
669static void 661static void
670threadpool_job_dead(struct threadpool_job *job) 662threadpool_job_dead(struct threadpool_job *job)
671{ 663{
672 664
673 panic("threadpool job %p ran after destruction", job); 665 panic("threadpool job %p ran after destruction", job);
674} 666}
675 667
676void 668void
677threadpool_job_destroy(struct threadpool_job *job) 669threadpool_job_destroy(struct threadpool_job *job)
678{ 670{
679 671
680 ASSERT_SLEEPABLE(); 672 ASSERT_SLEEPABLE();
681 673
682 KASSERTMSG((job->job_thread == NULL), "job %p still running", job); 674 KASSERTMSG((job->job_thread == NULL), "job %p still running", job);
683 675
684 mutex_enter(job->job_lock); 676 mutex_enter(job->job_lock);
685 while (0 < job->job_refcnt) 677 while (0 < job->job_refcnt)
686 cv_wait(&job->job_cv, job->job_lock); 678 cv_wait(&job->job_cv, job->job_lock);
687 mutex_exit(job->job_lock); 679 mutex_exit(job->job_lock);
688 680
689 job->job_lock = NULL; 681 job->job_lock = NULL;
690 KASSERT(job->job_thread == NULL); 682 KASSERT(job->job_thread == NULL);
691 KASSERT(job->job_refcnt == 0); 683 KASSERT(job->job_refcnt == 0);
692 KASSERT(!cv_has_waiters(&job->job_cv)); 684 KASSERT(!cv_has_waiters(&job->job_cv));
693 cv_destroy(&job->job_cv); 685 cv_destroy(&job->job_cv);
694 job->job_fn = threadpool_job_dead; 686 job->job_fn = threadpool_job_dead;
695 (void)strlcpy(job->job_name, "deadjob", sizeof(job->job_name)); 687 (void)strlcpy(job->job_name, "deadjob", sizeof(job->job_name));
696} 688}
697 689
698static int 690static int
699threadpool_job_hold(struct threadpool_job *job) 691threadpool_job_hold(struct threadpool_job *job)
700{ 692{
701 unsigned int refcnt; 693 unsigned int refcnt;
702 do { 694 do {
703 refcnt = job->job_refcnt; 695 refcnt = job->job_refcnt;
704 if (refcnt == UINT_MAX) 696 if (refcnt == UINT_MAX)
705 return EBUSY; 697 return EBUSY;
706 } while (atomic_cas_uint(&job->job_refcnt, refcnt, (refcnt + 1)) 698 } while (atomic_cas_uint(&job->job_refcnt, refcnt, (refcnt + 1))
707 != refcnt); 699 != refcnt);
708  700
709 return 0; 701 return 0;
710} 702}
711 703
712static void 704static void
713threadpool_job_rele(struct threadpool_job *job) 705threadpool_job_rele(struct threadpool_job *job)
714{ 706{
715 unsigned int refcnt; 707 unsigned int refcnt;
716 708
717 do { 709 do {
718 refcnt = job->job_refcnt; 710 refcnt = job->job_refcnt;
719 KASSERT(0 < refcnt); 711 KASSERT(0 < refcnt);
720 if (refcnt == 1) { 712 if (refcnt == 1) {
721 mutex_enter(job->job_lock); 713 mutex_enter(job->job_lock);
722 refcnt = atomic_dec_uint_nv(&job->job_refcnt); 714 refcnt = atomic_dec_uint_nv(&job->job_refcnt);
723 KASSERT(refcnt != UINT_MAX); 715 KASSERT(refcnt != UINT_MAX);
724 if (refcnt == 0) 716 if (refcnt == 0)
725 cv_broadcast(&job->job_cv); 717 cv_broadcast(&job->job_cv);
726 mutex_exit(job->job_lock); 718 mutex_exit(job->job_lock);
727 return; 719 return;
728 } 720 }
729 } while (atomic_cas_uint(&job->job_refcnt, refcnt, (refcnt - 1)) 721 } while (atomic_cas_uint(&job->job_refcnt, refcnt, (refcnt - 1))
730 != refcnt); 722 != refcnt);
731} 723}
732 724
733void 725void
734threadpool_job_done(struct threadpool_job *job) 726threadpool_job_done(struct threadpool_job *job)
735{ 727{
736 728
737 KASSERT(mutex_owned(job->job_lock)); 729 KASSERT(mutex_owned(job->job_lock));
738 KASSERT(job->job_thread != NULL); 730 KASSERT(job->job_thread != NULL);
739 KASSERT(job->job_thread->tpt_lwp == curlwp); 731 KASSERT(job->job_thread->tpt_lwp == curlwp);
740 732
741 cv_broadcast(&job->job_cv); 733 cv_broadcast(&job->job_cv);
742 job->job_thread = NULL; 734 job->job_thread = NULL;
743} 735}
744 736
745void 737void
746threadpool_schedule_job(struct threadpool *pool, struct threadpool_job *job) 738threadpool_schedule_job(struct threadpool *pool, struct threadpool_job *job)
747{ 739{
748 740
749 KASSERT(mutex_owned(job->job_lock)); 741 KASSERT(mutex_owned(job->job_lock));
750 742
751 /* 743 /*
752 * If the job's already running, let it keep running. The job 744 * If the job's already running, let it keep running. The job
753 * is guaranteed by the interlock not to end early -- if it had 745 * is guaranteed by the interlock not to end early -- if it had
754 * ended early, threadpool_job_done would have set job_thread 746 * ended early, threadpool_job_done would have set job_thread
755 * to NULL under the interlock. 747 * to NULL under the interlock.
756 */ 748 */
757 if (__predict_true(job->job_thread != NULL)) { 749 if (__predict_true(job->job_thread != NULL)) {
758 TP_LOG(("%s: job '%s' already runnining.\n", 750 TP_LOG(("%s: job '%s' already runnining.\n",
759 __func__, job->job_name)); 751 __func__, job->job_name));
760 return; 752 return;
761 } 753 }
762 754
763 /* Otherwise, try to assign a thread to the job. */ 755 /* Otherwise, try to assign a thread to the job. */
764 mutex_spin_enter(&pool->tp_lock); 756 mutex_spin_enter(&pool->tp_lock);
765 if (__predict_false(TAILQ_EMPTY(&pool->tp_idle_threads))) { 757 if (__predict_false(TAILQ_EMPTY(&pool->tp_idle_threads))) {
766 /* Nobody's idle. Give it to the overseer. */ 758 /* Nobody's idle. Give it to the overseer. */
767 TP_LOG(("%s: giving job '%s' to overseer.\n", 759 TP_LOG(("%s: giving job '%s' to overseer.\n",
768 __func__, job->job_name)); 760 __func__, job->job_name));
769 job->job_thread = &pool->tp_overseer; 761 job->job_thread = &pool->tp_overseer;
770 TAILQ_INSERT_TAIL(&pool->tp_jobs, job, job_entry); 762 TAILQ_INSERT_TAIL(&pool->tp_jobs, job, job_entry);
771 } else { 763 } else {
772 /* Assign it to the first idle thread. */ 764 /* Assign it to the first idle thread. */
773 job->job_thread = TAILQ_FIRST(&pool->tp_idle_threads); 765 job->job_thread = TAILQ_FIRST(&pool->tp_idle_threads);
774 TP_LOG(("%s: giving job '%s' to idle thread %p.\n", 766 TP_LOG(("%s: giving job '%s' to idle thread %p.\n",
775 __func__, job->job_name, job->job_thread)); 767 __func__, job->job_name, job->job_thread));
776 TAILQ_REMOVE(&pool->tp_idle_threads, job->job_thread, 768 TAILQ_REMOVE(&pool->tp_idle_threads, job->job_thread,
777 tpt_entry); 769 tpt_entry);
778 threadpool_job_hold(job); 770 threadpool_job_hold(job);
779 job->job_thread->tpt_job = job; 771 job->job_thread->tpt_job = job;
780 } 772 }
781 773
782 /* Notify whomever we gave it to, overseer or idle thread. */ 774 /* Notify whomever we gave it to, overseer or idle thread. */
783 KASSERT(job->job_thread != NULL); 775 KASSERT(job->job_thread != NULL);
784 cv_broadcast(&job->job_thread->tpt_cv); 776 cv_broadcast(&job->job_thread->tpt_cv);
785 mutex_spin_exit(&pool->tp_lock); 777 mutex_spin_exit(&pool->tp_lock);
786} 778}
787 779
788bool 780bool
789threadpool_cancel_job_async(struct threadpool *pool, struct threadpool_job *job) 781threadpool_cancel_job_async(struct threadpool *pool, struct threadpool_job *job)
790{ 782{
791 783
792 KASSERT(mutex_owned(job->job_lock)); 784 KASSERT(mutex_owned(job->job_lock));
793 785
794 /* 786 /*
795 * XXXJRT This fails (albeit safely) when all of the following 787 * XXXJRT This fails (albeit safely) when all of the following
796 * are true: 788 * are true:
797 * 789 *
798 * => "pool" is something other than what the job was 790 * => "pool" is something other than what the job was
799 * scheduled on. This can legitimately occur if, 791 * scheduled on. This can legitimately occur if,
800 * for example, a job is percpu-scheduled on CPU0 792 * for example, a job is percpu-scheduled on CPU0
801 * and then CPU1 attempts to cancel it without taking 793 * and then CPU1 attempts to cancel it without taking
802 * a remote pool reference. (this might happen by 794 * a remote pool reference. (this might happen by
803 * "luck of the draw"). 795 * "luck of the draw").
804 * 796 *
805 * => "job" is not yet running, but is assigned to the 797 * => "job" is not yet running, but is assigned to the
806 * overseer. 798 * overseer.
807 * 799 *
808 * When this happens, this code makes the determination that 800 * When this happens, this code makes the determination that
809 * the job is already running. The failure mode is that the 801 * the job is already running. The failure mode is that the
810 * caller is told the job is running, and thus has to wait. 802 * caller is told the job is running, and thus has to wait.
811 * The overseer will eventually get to it and the job will 803 * The overseer will eventually get to it and the job will
812 * proceed as if it had been already running. 804 * proceed as if it had been already running.
813 */ 805 */
814 806
815 if (job->job_thread == NULL) { 807 if (job->job_thread == NULL) {
816 /* Nothing to do. Guaranteed not running. */ 808 /* Nothing to do. Guaranteed not running. */
817 return true; 809 return true;
818 } else if (job->job_thread == &pool->tp_overseer) { 810 } else if (job->job_thread == &pool->tp_overseer) {
819 /* Take it off the list to guarantee it won't run. */ 811 /* Take it off the list to guarantee it won't run. */
820 job->job_thread = NULL; 812 job->job_thread = NULL;
821 mutex_spin_enter(&pool->tp_lock); 813 mutex_spin_enter(&pool->tp_lock);
822 TAILQ_REMOVE(&pool->tp_jobs, job, job_entry); 814 TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
823 mutex_spin_exit(&pool->tp_lock); 815 mutex_spin_exit(&pool->tp_lock);
824 return true; 816 return true;
825 } else { 817 } else {
826 /* Too late -- already running. */ 818 /* Too late -- already running. */
827 return false; 819 return false;
828 } 820 }
829} 821}
830 822
831void 823void
832threadpool_cancel_job(struct threadpool *pool, struct threadpool_job *job) 824threadpool_cancel_job(struct threadpool *pool, struct threadpool_job *job)
833{ 825{
834 826
835 ASSERT_SLEEPABLE(); 827 ASSERT_SLEEPABLE();
836 828
837 KASSERT(mutex_owned(job->job_lock)); 829 KASSERT(mutex_owned(job->job_lock));
838 830
839 if (threadpool_cancel_job_async(pool, job)) 831 if (threadpool_cancel_job_async(pool, job))
840 return; 832 return;
841 833
842 /* Already running. Wait for it to complete. */ 834 /* Already running. Wait for it to complete. */
843 while (job->job_thread != NULL) 835 while (job->job_thread != NULL)
844 cv_wait(&job->job_cv, job->job_lock); 836 cv_wait(&job->job_cv, job->job_lock);
845} 837}
846 838
847/* Thread pool overseer thread */ 839/* Thread pool overseer thread */
848 840
849static void __dead 841static void __dead
850threadpool_overseer_thread(void *arg) 842threadpool_overseer_thread(void *arg)
851{ 843{
852 struct threadpool_thread *const overseer = arg; 844 struct threadpool_thread *const overseer = arg;
853 struct threadpool *const pool = overseer->tpt_pool; 845 struct threadpool *const pool = overseer->tpt_pool;
854 struct lwp *lwp = NULL; 846 struct lwp *lwp = NULL;
855 int ktflags; 847 int ktflags;
856 int error; 848 int error;
857 849
858 KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu())); 850 KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));
859 851
860 /* Wait until we're initialized. */ 852 /* Wait until we're initialized. */
861 mutex_spin_enter(&pool->tp_lock); 853 mutex_spin_enter(&pool->tp_lock);
862 while (overseer->tpt_lwp == NULL) 854 while (overseer->tpt_lwp == NULL)
863 cv_wait(&overseer->tpt_cv, &pool->tp_lock); 855 cv_wait(&overseer->tpt_cv, &pool->tp_lock);
864 856
865 TP_LOG(("%s: starting.\n", __func__)); 857 TP_LOG(("%s: starting.\n", __func__));
866 858
867 for (;;) { 859 for (;;) {
868 /* Wait until there's a job. */ 860 /* Wait until there's a job. */
869 while (TAILQ_EMPTY(&pool->tp_jobs)) { 861 while (TAILQ_EMPTY(&pool->tp_jobs)) {
870 if (ISSET(pool->tp_flags, THREADPOOL_DYING)) { 862 if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
871 TP_LOG(("%s: THREADPOOL_DYING\n", 863 TP_LOG(("%s: THREADPOOL_DYING\n",
872 __func__)); 864 __func__));
873 break; 865 break;
874 } 866 }
875 cv_wait(&overseer->tpt_cv, &pool->tp_lock); 867 cv_wait(&overseer->tpt_cv, &pool->tp_lock);
876 } 868 }
877 if (__predict_false(TAILQ_EMPTY(&pool->tp_jobs))) 869 if (__predict_false(TAILQ_EMPTY(&pool->tp_jobs)))
878 break; 870 break;
879 871
880 /* If there are no threads, we'll have to try to start one. */ 872 /* If there are no threads, we'll have to try to start one. */
881 if (TAILQ_EMPTY(&pool->tp_idle_threads)) { 873 if (TAILQ_EMPTY(&pool->tp_idle_threads)) {
882 TP_LOG(("%s: Got a job, need to create a thread.\n", 874 TP_LOG(("%s: Got a job, need to create a thread.\n",
883 __func__)); 875 __func__));
884 error = threadpool_hold(pool); 876 error = threadpool_hold(pool);
885 if (error) { 877 if (error) {
886 (void)kpause("thrdplrf", false, hz, 878 (void)kpause("thrdplrf", false, hz,
887 &pool->tp_lock); 879 &pool->tp_lock);
888 continue; 880 continue;
889 } 881 }
890 mutex_spin_exit(&pool->tp_lock); 882 mutex_spin_exit(&pool->tp_lock);
891 883
892 struct threadpool_thread *const thread = 884 struct threadpool_thread *const thread =
893 pool_cache_get(threadpool_thread_pc, PR_WAITOK); 885 pool_cache_get(threadpool_thread_pc, PR_WAITOK);
894 thread->tpt_lwp = NULL; 886 thread->tpt_lwp = NULL;
895 thread->tpt_pool = pool; 887 thread->tpt_pool = pool;
896 thread->tpt_job = NULL; 888 thread->tpt_job = NULL;
897 cv_init(&thread->tpt_cv, "poolthrd"); 889 cv_init(&thread->tpt_cv, "poolthrd");
898 890
899 ktflags = 0; 891 ktflags = 0;
900 ktflags |= KTHREAD_MPSAFE; 892 ktflags |= KTHREAD_MPSAFE;
901 if (pool->tp_pri < PRI_KERNEL) 893 if (pool->tp_pri < PRI_KERNEL)
902 ktflags |= KTHREAD_TS; 894 ktflags |= KTHREAD_TS;
903 error = kthread_create(pool->tp_pri, ktflags, 895 error = kthread_create(pool->tp_pri, ktflags,
904 pool->tp_cpu, &threadpool_thread, thread, &lwp, 896 pool->tp_cpu, &threadpool_thread, thread, &lwp,
905 "poolthread/%d@%d", 897 "poolthread/%d@%d",
906 (pool->tp_cpu ? cpu_index(pool->tp_cpu) : -1), 898 (pool->tp_cpu ? cpu_index(pool->tp_cpu) : -1),
907 (int)pool->tp_pri); 899 (int)pool->tp_pri);
908 900
909 mutex_spin_enter(&pool->tp_lock); 901 mutex_spin_enter(&pool->tp_lock);
910 if (error) { 902 if (error) {
911 pool_cache_put(threadpool_thread_pc, thread); 903 pool_cache_put(threadpool_thread_pc, thread);
912 threadpool_rele(pool); 904 threadpool_rele(pool);
913 /* XXX What to do to wait for memory? */ 905 /* XXX What to do to wait for memory? */
914 (void)kpause("thrdplcr", false, hz, 906 (void)kpause("thrdplcr", false, hz,
915 &pool->tp_lock); 907 &pool->tp_lock);
916 continue; 908 continue;
917 } 909 }
918 KASSERT(lwp != NULL); 910 KASSERT(lwp != NULL);
919 TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread, 911 TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread,
920 tpt_entry); 912 tpt_entry);
921 thread->tpt_lwp = lwp; 913 thread->tpt_lwp = lwp;
922 lwp = NULL; 914 lwp = NULL;
923 cv_broadcast(&thread->tpt_cv); 915 cv_broadcast(&thread->tpt_cv);
924 continue; 916 continue;
925 } 917 }
926 918
927 /* There are idle threads, so try giving one a job. */ 919 /* There are idle threads, so try giving one a job. */
928 bool rele_job = true; 920 bool rele_job = true;
929 struct threadpool_job *const job = TAILQ_FIRST(&pool->tp_jobs); 921 struct threadpool_job *const job = TAILQ_FIRST(&pool->tp_jobs);
930 TAILQ_REMOVE(&pool->tp_jobs, job, job_entry); 922 TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
931 error = threadpool_job_hold(job); 923 error = threadpool_job_hold(job);
932 if (error) { 924 if (error) {
933 TAILQ_INSERT_HEAD(&pool->tp_jobs, job, job_entry); 925 TAILQ_INSERT_HEAD(&pool->tp_jobs, job, job_entry);
934 (void)kpause("pooljob", false, hz, &pool->tp_lock); 926 (void)kpause("pooljob", false, hz, &pool->tp_lock);
935 continue; 927 continue;
936 } 928 }
937 mutex_spin_exit(&pool->tp_lock); 929 mutex_spin_exit(&pool->tp_lock);
938 930
939 mutex_enter(job->job_lock); 931 mutex_enter(job->job_lock);
940 /* If the job was cancelled, we'll no longer be its thread. */ 932 /* If the job was cancelled, we'll no longer be its thread. */
941 if (__predict_true(job->job_thread == overseer)) { 933 if (__predict_true(job->job_thread == overseer)) {
942 mutex_spin_enter(&pool->tp_lock); 934 mutex_spin_enter(&pool->tp_lock);
943 if (__predict_false( 935 if (__predict_false(
944 TAILQ_EMPTY(&pool->tp_idle_threads))) { 936 TAILQ_EMPTY(&pool->tp_idle_threads))) {
945 /* 937 /*
946 * Someone else snagged the thread 938 * Someone else snagged the thread
947 * first. We'll have to try again. 939 * first. We'll have to try again.
948 */ 940 */
949 TP_LOG(("%s: '%s' lost race to use idle thread.\n", 941 TP_LOG(("%s: '%s' lost race to use idle thread.\n",
950 __func__, job->job_name)); 942 __func__, job->job_name));
951 TAILQ_INSERT_HEAD(&pool->tp_jobs, job, 943 TAILQ_INSERT_HEAD(&pool->tp_jobs, job,
952 job_entry); 944 job_entry);
953 } else { 945 } else {
954 /* 946 /*
955 * Assign the job to the thread and 947 * Assign the job to the thread and
956 * wake the thread so it starts work. 948 * wake the thread so it starts work.
957 */ 949 */
958 struct threadpool_thread *const thread = 950 struct threadpool_thread *const thread =
959 TAILQ_FIRST(&pool->tp_idle_threads); 951 TAILQ_FIRST(&pool->tp_idle_threads);
960 952
961 TP_LOG(("%s: '%s' gets thread %p\n", 953 TP_LOG(("%s: '%s' gets thread %p\n",
962 __func__, job->job_name, thread)); 954 __func__, job->job_name, thread));
963 KASSERT(thread->tpt_job == NULL); 955 KASSERT(thread->tpt_job == NULL);
964 TAILQ_REMOVE(&pool->tp_idle_threads, thread, 956 TAILQ_REMOVE(&pool->tp_idle_threads, thread,
965 tpt_entry); 957 tpt_entry);
966 thread->tpt_job = job; 958 thread->tpt_job = job;
967 job->job_thread = thread; 959 job->job_thread = thread;
968 cv_broadcast(&thread->tpt_cv); 960 cv_broadcast(&thread->tpt_cv);
969 /* Gave the thread our job reference. */ 961 /* Gave the thread our job reference. */
970 rele_job = false; 962 rele_job = false;
971 } 963 }
972 mutex_spin_exit(&pool->tp_lock); 964 mutex_spin_exit(&pool->tp_lock);
973 } 965 }
974 mutex_exit(job->job_lock); 966 mutex_exit(job->job_lock);
975 if (__predict_false(rele_job)) 967 if (__predict_false(rele_job))
976 threadpool_job_rele(job); 968 threadpool_job_rele(job);
977 969
978 mutex_spin_enter(&pool->tp_lock); 970 mutex_spin_enter(&pool->tp_lock);
979 } 971 }
980 mutex_spin_exit(&pool->tp_lock); 972 mutex_spin_exit(&pool->tp_lock);
981 973
982 TP_LOG(("%s: exiting.\n", __func__)); 974 TP_LOG(("%s: exiting.\n", __func__));
983 975
984 threadpool_rele(pool); 976 threadpool_rele(pool);
985 kthread_exit(0); 977 kthread_exit(0);
986} 978}
987 979
988/* Thread pool thread */ 980/* Thread pool thread */
989 981
990static void __dead 982static void __dead
991threadpool_thread(void *arg) 983threadpool_thread(void *arg)
992{ 984{
993 struct threadpool_thread *const thread = arg; 985 struct threadpool_thread *const thread = arg;
994 struct threadpool *const pool = thread->tpt_pool; 986 struct threadpool *const pool = thread->tpt_pool;
995 987
996 KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu())); 988 KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));
997 989
998 /* Wait until we're initialized and on the queue. */ 990 /* Wait until we're initialized and on the queue. */
999 mutex_spin_enter(&pool->tp_lock); 991 mutex_spin_enter(&pool->tp_lock);
1000 while (thread->tpt_lwp == NULL) 992 while (thread->tpt_lwp == NULL)
1001 cv_wait(&thread->tpt_cv, &pool->tp_lock); 993 cv_wait(&thread->tpt_cv, &pool->tp_lock);
1002 994
1003 TP_LOG(("%s: starting.\n", __func__)); 995 TP_LOG(("%s: starting.\n", __func__));
1004 996
1005 KASSERT(thread->tpt_lwp == curlwp); 997 KASSERT(thread->tpt_lwp == curlwp);
1006 for (;;) { 998 for (;;) {
1007 /* Wait until we are assigned a job. */ 999 /* Wait until we are assigned a job. */
1008 while (thread->tpt_job == NULL) { 1000 while (thread->tpt_job == NULL) {
1009 if (ISSET(pool->tp_flags, THREADPOOL_DYING)) { 1001 if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
1010 TP_LOG(("%s: THREADPOOL_DYING\n", 1002 TP_LOG(("%s: THREADPOOL_DYING\n",
1011 __func__)); 1003 __func__));
1012 break; 1004 break;
1013 } 1005 }
1014 if (cv_timedwait(&thread->tpt_cv, &pool->tp_lock, 1006 if (cv_timedwait(&thread->tpt_cv, &pool->tp_lock,
1015 THREADPOOL_IDLE_TICKS)) 1007 THREADPOOL_IDLE_TICKS))
1016 break; 1008 break;
1017 } 1009 }
1018 if (__predict_false(thread->tpt_job == NULL)) { 1010 if (__predict_false(thread->tpt_job == NULL)) {
1019 TAILQ_REMOVE(&pool->tp_idle_threads, thread, 1011 TAILQ_REMOVE(&pool->tp_idle_threads, thread,
1020 tpt_entry); 1012 tpt_entry);
1021 break; 1013 break;
1022 } 1014 }
1023 1015
1024 struct threadpool_job *const job = thread->tpt_job; 1016 struct threadpool_job *const job = thread->tpt_job;
1025 KASSERT(job != NULL); 1017 KASSERT(job != NULL);
1026 mutex_spin_exit(&pool->tp_lock); 1018 mutex_spin_exit(&pool->tp_lock);
1027 1019
1028 TP_LOG(("%s: running job '%s' on thread %p.\n", 1020 TP_LOG(("%s: running job '%s' on thread %p.\n",
1029 __func__, job->job_name, thread)); 1021 __func__, job->job_name, thread));
1030 1022
1031 /* Set our lwp name to reflect what job we're doing. */ 1023 /* Set our lwp name to reflect what job we're doing. */
1032 lwp_lock(curlwp); 1024 lwp_lock(curlwp);
1033 char *const lwp_name = curlwp->l_name; 1025 char *const lwp_name = curlwp->l_name;
1034 curlwp->l_name = job->job_name; 1026 curlwp->l_name = job->job_name;
1035 lwp_unlock(curlwp); 1027 lwp_unlock(curlwp);
1036 1028
1037 /* Run the job. */ 1029 /* Run the job. */
1038 (*job->job_fn)(job); 1030 (*job->job_fn)(job);
1039 1031
1040 /* Restore our lwp name. */ 1032 /* Restore our lwp name. */
1041 lwp_lock(curlwp); 1033 lwp_lock(curlwp);
1042 curlwp->l_name = lwp_name; 1034 curlwp->l_name = lwp_name;
1043 lwp_unlock(curlwp); 1035 lwp_unlock(curlwp);
1044 1036
1045 /* Job is done and its name is unreferenced. Release it. */ 1037 /* Job is done and its name is unreferenced. Release it. */
1046 threadpool_job_rele(job); 1038 threadpool_job_rele(job);
1047 1039
1048 mutex_spin_enter(&pool->tp_lock); 1040 mutex_spin_enter(&pool->tp_lock);
1049 KASSERT(thread->tpt_job == job); 1041 KASSERT(thread->tpt_job == job);
1050 thread->tpt_job = NULL; 1042 thread->tpt_job = NULL;
1051 TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread, tpt_entry); 1043 TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread, tpt_entry);
1052 } 1044 }
1053 mutex_spin_exit(&pool->tp_lock); 1045 mutex_spin_exit(&pool->tp_lock);
1054 1046
1055 TP_LOG(("%s: thread %p exiting.\n", __func__, thread)); 1047 TP_LOG(("%s: thread %p exiting.\n", __func__, thread));
1056 1048
1057 KASSERT(!cv_has_waiters(&thread->tpt_cv)); 1049 KASSERT(!cv_has_waiters(&thread->tpt_cv));
1058 cv_destroy(&thread->tpt_cv); 1050 cv_destroy(&thread->tpt_cv);
1059 pool_cache_put(threadpool_thread_pc, thread); 1051 pool_cache_put(threadpool_thread_pc, thread);
1060 threadpool_rele(pool); 1052 threadpool_rele(pool);
1061 kthread_exit(0); 1053 kthread_exit(0);
1062} 1054}