Sat Apr 11 16:32:07 2015 UTC
Fix ASSERT(x == y | z).  Interpret nthreads as pct when requested.
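(Editorial note: a minimal illustration of the precedence pitfall behind the first fix, using placeholder names; the actual expression and its corrected form live in a hunk that is truncated from the excerpt below, so the "intended" variant here is only a guess.)

    /* '==' binds tighter than '|', so this parses as ((x == y) | z) and
     * passes whenever z is nonzero -- the assertion checks almost nothing. */
    ASSERT(x == y | z);
    /* One plausible intended form, with explicit parentheses: */
    ASSERT(x == (y | z));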

Reduces the number of threads created by zfs to a slightly less
insane value.
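(Editorial note: a sketch of the usual "nthreads as a percentage of CPUs" interpretation, e.g. the TASKQ_THREADS_CPU_PCT convention used by other taskq implementations; whether this revision uses exactly this flag and formula is an assumption, since the relevant hunk is not visible in the excerpt below.)

    /* Hypothetical: when percentage mode is requested, scale the caller's
     * nthreads by the CPU count instead of using it verbatim. */
    if (flags & TASKQ_THREADS_CPU_PCT)
        nthreads = MAX((max_ncpus * nthreads) / 100, 1);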

XXX This whole taskq business is questionable, and we really really
should not have copies of external code outside dist/ and without
vendor branches to record provenance and local changes.


(riastradh)
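(Editorial note: for readers unfamiliar with this code, a short usage sketch of the interfaces documented in the file's header comment below -- taskq_create(name, nthreads, pri, minalloc, maxalloc, flags), taskq_dispatch(), taskq_wait(), taskq_destroy(). The callback, queue name and numeric arguments are placeholders, not code from this commit.)

    static void
    my_task(void *arg)
    {
        /* work performed asynchronously on a taskq worker thread */
    }

    /* ... in some kernel function: */
    taskq_t *tq = taskq_create("example_tq", 4, minclsyspri, 4, 64,
        TASKQ_PREPOPULATE);
    (void) taskq_dispatch(tq, my_task, NULL, TQ_SLEEP);
    taskq_wait(tq);        /* wait for all dispatched tasks to finish */
    taskq_destroy(tq);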
diff -r1.4 -r1.5 src/external/cddl/osnet/sys/kern/taskq.c

cvs diff -r1.4 -r1.5 src/external/cddl/osnet/sys/kern/taskq.c

--- src/external/cddl/osnet/sys/kern/taskq.c 2015/04/11 00:13:04 1.4
+++ src/external/cddl/osnet/sys/kern/taskq.c 2015/04/11 16:32:07 1.5
@@ -1,1050 +1,1054 @@
1/* $NetBSD: taskq.c,v 1.4 2015/04/11 00:13:04 riastradh Exp $ */ 1/* $NetBSD: taskq.c,v 1.5 2015/04/11 16:32:07 riastradh Exp $ */
2 2
3/* 3/*
4 * CDDL HEADER START 4 * CDDL HEADER START
5 * 5 *
6 * The contents of this file are subject to the terms of the 6 * The contents of this file are subject to the terms of the
7 * Common Development and Distribution License, Version 1.0 only 7 * Common Development and Distribution License, Version 1.0 only
8 * (the "License"). You may not use this file except in compliance 8 * (the "License"). You may not use this file except in compliance
9 * with the License. 9 * with the License.
10 * 10 *
11 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 11 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
12 * or http://www.opensolaris.org/os/licensing. 12 * or http://www.opensolaris.org/os/licensing.
13 * See the License for the specific language governing permissions 13 * See the License for the specific language governing permissions
14 * and limitations under the License. 14 * and limitations under the License.
15 * 15 *
16 * When distributing Covered Code, include this CDDL HEADER in each 16 * When distributing Covered Code, include this CDDL HEADER in each
17 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 17 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
18 * If applicable, add the following below this CDDL HEADER, with the 18 * If applicable, add the following below this CDDL HEADER, with the
19 * fields enclosed by brackets "[]" replaced with your own identifying 19 * fields enclosed by brackets "[]" replaced with your own identifying
20 * information: Portions Copyright [yyyy] [name of copyright owner] 20 * information: Portions Copyright [yyyy] [name of copyright owner]
21 * 21 *
22 * CDDL HEADER END 22 * CDDL HEADER END
23 */ 23 */
24/* 24/*
25 * Copyright 2005 Sun Microsystems, Inc. All rights reserved. 25 * Copyright 2005 Sun Microsystems, Inc. All rights reserved.
26 * Use is subject to license terms. 26 * Use is subject to license terms.
27 */ 27 */
28 28
29#pragma ident "%Z%%M% %I% %E% SMI" 29#pragma ident "%Z%%M% %I% %E% SMI"
30 30
31/* 31/*
32 * Kernel task queues: general-purpose asynchronous task scheduling. 32 * Kernel task queues: general-purpose asynchronous task scheduling.
33 * 33 *
34 * A common problem in kernel programming is the need to schedule tasks 34 * A common problem in kernel programming is the need to schedule tasks
35 * to be performed later, by another thread. There are several reasons 35 * to be performed later, by another thread. There are several reasons
36 * you may want or need to do this: 36 * you may want or need to do this:
37 * 37 *
38 * (1) The task isn't time-critical, but your current code path is. 38 * (1) The task isn't time-critical, but your current code path is.
39 * 39 *
40 * (2) The task may require grabbing locks that you already hold. 40 * (2) The task may require grabbing locks that you already hold.
41 * 41 *
42 * (3) The task may need to block (e.g. to wait for memory), but you 42 * (3) The task may need to block (e.g. to wait for memory), but you
43 * cannot block in your current context. 43 * cannot block in your current context.
44 * 44 *
45 * (4) Your code path can't complete because of some condition, but you can't 45 * (4) Your code path can't complete because of some condition, but you can't
46 * sleep or fail, so you queue the task for later execution when condition 46 * sleep or fail, so you queue the task for later execution when condition
47 * disappears. 47 * disappears.
48 * 48 *
49 * (5) You just want a simple way to launch multiple tasks in parallel. 49 * (5) You just want a simple way to launch multiple tasks in parallel.
50 * 50 *
51 * Task queues provide such a facility. In its simplest form (used when 51 * Task queues provide such a facility. In its simplest form (used when
52 * performance is not a critical consideration) a task queue consists of a 52 * performance is not a critical consideration) a task queue consists of a
53 * single list of tasks, together with one or more threads to service the 53 * single list of tasks, together with one or more threads to service the
54 * list. There are some cases when this simple queue is not sufficient: 54 * list. There are some cases when this simple queue is not sufficient:
55 * 55 *
56 * (1) The task queues are very hot and there is a need to avoid data and lock 56 * (1) The task queues are very hot and there is a need to avoid data and lock
57 * contention over global resources. 57 * contention over global resources.
58 * 58 *
59 * (2) Some tasks may depend on other tasks to complete, so they can't be put in 59 * (2) Some tasks may depend on other tasks to complete, so they can't be put in
60 * the same list managed by the same thread. 60 * the same list managed by the same thread.
61 * 61 *
62 * (3) Some tasks may block for a long time, and this should not block other 62 * (3) Some tasks may block for a long time, and this should not block other
63 * tasks in the queue. 63 * tasks in the queue.
64 * 64 *
65 * To provide useful service in such cases we define a "dynamic task queue" 65 * To provide useful service in such cases we define a "dynamic task queue"
66 * which has an individual thread for each of the tasks. These threads are 66 * which has an individual thread for each of the tasks. These threads are
67 * dynamically created as they are needed and destroyed when they are not in 67 * dynamically created as they are needed and destroyed when they are not in
68 * use. The API for managing task pools is the same as for managing task queues 68 * use. The API for managing task pools is the same as for managing task queues
69 * with the exception of a taskq creation flag TASKQ_DYNAMIC which tells that 69 * with the exception of a taskq creation flag TASKQ_DYNAMIC which tells that
70 * dynamic task pool behavior is desired. 70 * dynamic task pool behavior is desired.
71 * 71 *
72 * Dynamic task queues may also place tasks in the normal queue (called "backing 72 * Dynamic task queues may also place tasks in the normal queue (called "backing
73 * queue") when task pool runs out of resources. Users of task queues may 73 * queue") when task pool runs out of resources. Users of task queues may
74 * disallow such queued scheduling by specifying TQ_NOQUEUE in the dispatch 74 * disallow such queued scheduling by specifying TQ_NOQUEUE in the dispatch
75 * flags. 75 * flags.
76 * 76 *
77 * The backing task queue is also used for scheduling internal tasks needed for 77 * The backing task queue is also used for scheduling internal tasks needed for
78 * dynamic task queue maintenance. 78 * dynamic task queue maintenance.
79 * 79 *
80 * INTERFACES: 80 * INTERFACES:
81 * 81 *
82 * taskq_t *taskq_create(name, nthreads, pri_t pri, minalloc, maxall, flags); 82 * taskq_t *taskq_create(name, nthreads, pri_t pri, minalloc, maxall, flags);
83 * 83 *
84 * Create a taskq with specified properties. 84 * Create a taskq with specified properties.
85 * Possible 'flags': 85 * Possible 'flags':
86 * 86 *
87 * TASKQ_DYNAMIC: Create task pool for task management. If this flag is 87 * TASKQ_DYNAMIC: Create task pool for task management. If this flag is
88 * specified, 'nthreads' specifies the maximum number of threads in 88 * specified, 'nthreads' specifies the maximum number of threads in
89 * the task queue. Task execution order for dynamic task queues is 89 * the task queue. Task execution order for dynamic task queues is
90 * not predictable. 90 * not predictable.
91 * 91 *
92 * If this flag is not specified (default case) a 92 * If this flag is not specified (default case) a
93 * single-list task queue is created with 'nthreads' threads 93 * single-list task queue is created with 'nthreads' threads
94 * servicing it. Entries in this queue are managed by 94 * servicing it. Entries in this queue are managed by
95 * taskq_ent_alloc() and taskq_ent_free() which try to keep the 95 * taskq_ent_alloc() and taskq_ent_free() which try to keep the
96 * task population between 'minalloc' and 'maxalloc', but the 96 * task population between 'minalloc' and 'maxalloc', but the
97 * latter limit is only advisory for TQ_SLEEP dispatches and the 97 * latter limit is only advisory for TQ_SLEEP dispatches and the
98 * former limit is only advisory for TQ_NOALLOC dispatches. If 98 * former limit is only advisory for TQ_NOALLOC dispatches. If
99 * TASKQ_PREPOPULATE is set in 'flags', the taskq will be 99 * TASKQ_PREPOPULATE is set in 'flags', the taskq will be
100 * prepopulated with 'minalloc' task structures. 100 * prepopulated with 'minalloc' task structures.
101 * 101 *
102 * Since non-DYNAMIC taskqs are queues, tasks are guaranteed to be 102 * Since non-DYNAMIC taskqs are queues, tasks are guaranteed to be
103 * executed in the order they are scheduled if nthreads == 1. 103 * executed in the order they are scheduled if nthreads == 1.
104 * If nthreads > 1, task execution order is not predictable. 104 * If nthreads > 1, task execution order is not predictable.
105 * 105 *
106 * TASKQ_PREPOPULATE: Prepopulate task queue with threads. 106 * TASKQ_PREPOPULATE: Prepopulate task queue with threads.
107 * Also prepopulate the task queue with 'minalloc' task structures. 107 * Also prepopulate the task queue with 'minalloc' task structures.
108 * 108 *
109 * TASKQ_CPR_SAFE: This flag specifies that users of the task queue will 109 * TASKQ_CPR_SAFE: This flag specifies that users of the task queue will
110 * use their own protocol for handling CPR issues. This flag is not 110 * use their own protocol for handling CPR issues. This flag is not
111 * supported for DYNAMIC task queues. 111 * supported for DYNAMIC task queues.
112 * 112 *
113 * The 'pri' field specifies the default priority for the threads that 113 * The 'pri' field specifies the default priority for the threads that
114 * service all scheduled tasks. 114 * service all scheduled tasks.
115 * 115 *
116 * void taskq_destroy(tap): 116 * void taskq_destroy(tap):
117 * 117 *
118 * Waits for any scheduled tasks to complete, then destroys the taskq. 118 * Waits for any scheduled tasks to complete, then destroys the taskq.
119 * Caller should guarantee that no new tasks are scheduled in the closing 119 * Caller should guarantee that no new tasks are scheduled in the closing
120 * taskq. 120 * taskq.
121 * 121 *
122 * taskqid_t taskq_dispatch(tq, func, arg, flags): 122 * taskqid_t taskq_dispatch(tq, func, arg, flags):
123 * 123 *
124 * Dispatches the task "func(arg)" to taskq. The 'flags' indicates whether 124 * Dispatches the task "func(arg)" to taskq. The 'flags' indicates whether
125 * the caller is willing to block for memory. The function returns an 125 * the caller is willing to block for memory. The function returns an
126 * opaque value which is zero iff dispatch fails. If flags is TQ_NOSLEEP 126 * opaque value which is zero iff dispatch fails. If flags is TQ_NOSLEEP
127 * or TQ_NOALLOC and the task can't be dispatched, taskq_dispatch() fails 127 * or TQ_NOALLOC and the task can't be dispatched, taskq_dispatch() fails
128 * and returns (taskqid_t)0. 128 * and returns (taskqid_t)0.
129 * 129 *
130 * ASSUMES: func != NULL. 130 * ASSUMES: func != NULL.
131 * 131 *
132 * Possible flags: 132 * Possible flags:
133 * TQ_NOSLEEP: Do not wait for resources; may fail. 133 * TQ_NOSLEEP: Do not wait for resources; may fail.
134 * 134 *
135 * TQ_NOALLOC: Do not allocate memory; may fail. May only be used with 135 * TQ_NOALLOC: Do not allocate memory; may fail. May only be used with
136 * non-dynamic task queues. 136 * non-dynamic task queues.
137 * 137 *
138 * TQ_NOQUEUE: Do not enqueue a task if it can't dispatch it due to 138 * TQ_NOQUEUE: Do not enqueue a task if it can't dispatch it due to
139 * lack of available resources and fail. If this flag is not 139 * lack of available resources and fail. If this flag is not
140 * set, and the task pool is exhausted, the task may be scheduled 140 * set, and the task pool is exhausted, the task may be scheduled
141 * in the backing queue. This flag may ONLY be used with dynamic 141 * in the backing queue. This flag may ONLY be used with dynamic
142 * task queues. 142 * task queues.
143 * 143 *
144 * NOTE: This flag should always be used when a task queue is used 144 * NOTE: This flag should always be used when a task queue is used
145 * for tasks that may depend on each other for completion. 145 * for tasks that may depend on each other for completion.
146 * Enqueueing dependent tasks may create deadlocks. 146 * Enqueueing dependent tasks may create deadlocks.
147 * 147 *
148 * TQ_SLEEP: May block waiting for resources. May still fail for 148 * TQ_SLEEP: May block waiting for resources. May still fail for
149 * dynamic task queues if TQ_NOQUEUE is also specified, otherwise 149 * dynamic task queues if TQ_NOQUEUE is also specified, otherwise
150 * always succeed. 150 * always succeed.
151 * 151 *
152 * NOTE: Dynamic task queues are much more likely to fail in 152 * NOTE: Dynamic task queues are much more likely to fail in
153 * taskq_dispatch() (especially if TQ_NOQUEUE was specified), so it 153 * taskq_dispatch() (especially if TQ_NOQUEUE was specified), so it
154 * is important to have backup strategies handling such failures. 154 * is important to have backup strategies handling such failures.
155 * 155 *
156 * void taskq_wait(tq): 156 * void taskq_wait(tq):
157 * 157 *
158 * Waits for all previously scheduled tasks to complete. 158 * Waits for all previously scheduled tasks to complete.
159 * 159 *
160 * NOTE: It does not stop any new task dispatches. 160 * NOTE: It does not stop any new task dispatches.
161 * Do NOT call taskq_wait() from a task: it will cause deadlock. 161 * Do NOT call taskq_wait() from a task: it will cause deadlock.
162 * 162 *
163 * void taskq_suspend(tq) 163 * void taskq_suspend(tq)
164 * 164 *
165 * Suspend all task execution. Tasks already scheduled for a dynamic task 165 * Suspend all task execution. Tasks already scheduled for a dynamic task
166 * queue will still be executed, but all new scheduled tasks will be 166 * queue will still be executed, but all new scheduled tasks will be
167 * suspended until taskq_resume() is called. 167 * suspended until taskq_resume() is called.
168 * 168 *
169 * int taskq_suspended(tq) 169 * int taskq_suspended(tq)
170 * 170 *
171 * Returns 1 if taskq is suspended and 0 otherwise. It is intended to 171 * Returns 1 if taskq is suspended and 0 otherwise. It is intended to
172 * ASSERT that the task queue is suspended. 172 * ASSERT that the task queue is suspended.
173 * 173 *
174 * void taskq_resume(tq) 174 * void taskq_resume(tq)
175 * 175 *
176 * Resume task queue execution. 176 * Resume task queue execution.
177 * 177 *
178 * int taskq_member(tq, thread) 178 * int taskq_member(tq, thread)
179 * 179 *
180 * Returns 1 if 'thread' belongs to taskq 'tq' and 0 otherwise. The 180 * Returns 1 if 'thread' belongs to taskq 'tq' and 0 otherwise. The
181 * intended use is to ASSERT that a given function is called in taskq 181 * intended use is to ASSERT that a given function is called in taskq
182 * context only. 182 * context only.
183 * 183 *
184 * system_taskq 184 * system_taskq
185 * 185 *
186 * Global system-wide dynamic task queue for common uses. It may be used by 186 * Global system-wide dynamic task queue for common uses. It may be used by
187 * any subsystem that needs to schedule tasks and does not need to manage 187 * any subsystem that needs to schedule tasks and does not need to manage
188 * its own task queues. It is initialized quite early during system boot. 188 * its own task queues. It is initialized quite early during system boot.
189 * 189 *
190 * IMPLEMENTATION. 190 * IMPLEMENTATION.
191 * 191 *
192 * This is schematic representation of the task queue structures. 192 * This is schematic representation of the task queue structures.
193 * 193 *
194 * taskq: 194 * taskq:
195 * +-------------+ 195 * +-------------+
196 * |tq_lock | +---< taskq_ent_free() 196 * |tq_lock | +---< taskq_ent_free()
197 * +-------------+ | 197 * +-------------+ |
198 * |... | | tqent: tqent: 198 * |... | | tqent: tqent:
199 * +-------------+ | +------------+ +------------+ 199 * +-------------+ | +------------+ +------------+
200 * | tq_freelist |-->| tqent_next |--> ... ->| tqent_next | 200 * | tq_freelist |-->| tqent_next |--> ... ->| tqent_next |
201 * +-------------+ +------------+ +------------+ 201 * +-------------+ +------------+ +------------+
202 * |... | | ... | | ... | 202 * |... | | ... | | ... |
203 * +-------------+ +------------+ +------------+ 203 * +-------------+ +------------+ +------------+
204 * | tq_task | | 204 * | tq_task | |
205 * | | +-------------->taskq_ent_alloc() 205 * | | +-------------->taskq_ent_alloc()
206 * +--------------------------------------------------------------------------+ 206 * +--------------------------------------------------------------------------+
207 * | | | tqent tqent | 207 * | | | tqent tqent |
208 * | +---------------------+ +--> +------------+ +--> +------------+ | 208 * | +---------------------+ +--> +------------+ +--> +------------+ |
209 * | | ... | | | func, arg | | | func, arg | | 209 * | | ... | | | func, arg | | | func, arg | |
210 * +>+---------------------+ <---|-+ +------------+ <---|-+ +------------+ | 210 * +>+---------------------+ <---|-+ +------------+ <---|-+ +------------+ |
211 * | tq_taskq.tqent_next | ----+ | | tqent_next | --->+ | | tqent_next |--+ 211 * | tq_taskq.tqent_next | ----+ | | tqent_next | --->+ | | tqent_next |--+
212 * +---------------------+ | +------------+ ^ | +------------+ 212 * +---------------------+ | +------------+ ^ | +------------+
213 * +-| tq_task.tqent_prev | +--| tqent_prev | | +--| tqent_prev | ^ 213 * +-| tq_task.tqent_prev | +--| tqent_prev | | +--| tqent_prev | ^
214 * | +---------------------+ +------------+ | +------------+ | 214 * | +---------------------+ +------------+ | +------------+ |
215 * | |... | | ... | | | ... | | 215 * | |... | | ... | | | ... | |
216 * | +---------------------+ +------------+ | +------------+ | 216 * | +---------------------+ +------------+ | +------------+ |
217 * | ^ | | 217 * | ^ | |
218 * | | | | 218 * | | | |
219 * +--------------------------------------+--------------+ TQ_APPEND() -+ 219 * +--------------------------------------+--------------+ TQ_APPEND() -+
220 * | | | 220 * | | |
221 * |... | taskq_thread()-----+ 221 * |... | taskq_thread()-----+
222 * +-------------+ 222 * +-------------+
223 * | tq_buckets |--+-------> [ NULL ] (for regular task queues) 223 * | tq_buckets |--+-------> [ NULL ] (for regular task queues)
224 * +-------------+ | 224 * +-------------+ |
225 * | DYNAMIC TASK QUEUES: 225 * | DYNAMIC TASK QUEUES:
226 * | 226 * |
227 * +-> taskq_bucket[nCPU] taskq_bucket_dispatch() 227 * +-> taskq_bucket[nCPU] taskq_bucket_dispatch()
228 * +-------------------+ ^ 228 * +-------------------+ ^
229 * +--->| tqbucket_lock | | 229 * +--->| tqbucket_lock | |
230 * | +-------------------+ +--------+ +--------+ 230 * | +-------------------+ +--------+ +--------+
231 * | | tqbucket_freelist |-->| tqent |-->...| tqent | ^ 231 * | | tqbucket_freelist |-->| tqent |-->...| tqent | ^
232 * | +-------------------+<--+--------+<--...+--------+ | 232 * | +-------------------+<--+--------+<--...+--------+ |
233 * | | ... | | thread | | thread | | 233 * | | ... | | thread | | thread | |
234 * | +-------------------+ +--------+ +--------+ | 234 * | +-------------------+ +--------+ +--------+ |
235 * | +-------------------+ | 235 * | +-------------------+ |
236 * taskq_dispatch()--+--->| tqbucket_lock | TQ_APPEND()------+ 236 * taskq_dispatch()--+--->| tqbucket_lock | TQ_APPEND()------+
237 * TQ_HASH() | +-------------------+ +--------+ +--------+ 237 * TQ_HASH() | +-------------------+ +--------+ +--------+
238 * | | tqbucket_freelist |-->| tqent |-->...| tqent | 238 * | | tqbucket_freelist |-->| tqent |-->...| tqent |
239 * | +-------------------+<--+--------+<--...+--------+ 239 * | +-------------------+<--+--------+<--...+--------+
240 * | | ... | | thread | | thread | 240 * | | ... | | thread | | thread |
241 * | +-------------------+ +--------+ +--------+ 241 * | +-------------------+ +--------+ +--------+
242 * +---> ... 242 * +---> ...
243 * 243 *
244 * 244 *
245 * Task queues use tq_task field to link new entry in the queue. The queue is a 245 * Task queues use tq_task field to link new entry in the queue. The queue is a
246 * circular doubly-linked list. Entries are put in the end of the list with 246 * circular doubly-linked list. Entries are put in the end of the list with
247 * TQ_APPEND() and processed from the front of the list by taskq_thread() in 247 * TQ_APPEND() and processed from the front of the list by taskq_thread() in
248 * FIFO order. Task queue entries are cached in the free list managed by 248 * FIFO order. Task queue entries are cached in the free list managed by
249 * taskq_ent_alloc() and taskq_ent_free() functions. 249 * taskq_ent_alloc() and taskq_ent_free() functions.
250 * 250 *
251 * All threads used by task queues mark t_taskq field of the thread to 251 * All threads used by task queues mark t_taskq field of the thread to
252 * point to the task queue. 252 * point to the task queue.
253 * 253 *
254 * Dynamic Task Queues Implementation. 254 * Dynamic Task Queues Implementation.
255 * 255 *
256 * For a dynamic task queues there is a 1-to-1 mapping between a thread and 256 * For a dynamic task queues there is a 1-to-1 mapping between a thread and
257 * taskq_ent_structure. Each entry is serviced by its own thread and each thread 257 * taskq_ent_structure. Each entry is serviced by its own thread and each thread
258 * is controlled by a single entry. 258 * is controlled by a single entry.
259 * 259 *
260 * Entries are distributed over a set of buckets. To avoid using modulo 260 * Entries are distributed over a set of buckets. To avoid using modulo
261 * arithmetics the number of buckets is 2^n and is determined as the nearest 261 * arithmetics the number of buckets is 2^n and is determined as the nearest
262 * power of two roundown of the number of CPUs in the system. Tunable 262 * power of two roundown of the number of CPUs in the system. Tunable
263 * variable 'taskq_maxbuckets' limits the maximum number of buckets. Each entry 263 * variable 'taskq_maxbuckets' limits the maximum number of buckets. Each entry
264 * is attached to a bucket for its lifetime and can't migrate to other buckets. 264 * is attached to a bucket for its lifetime and can't migrate to other buckets.
265 * 265 *
266 * Entries that have scheduled tasks are not placed in any list. The dispatch 266 * Entries that have scheduled tasks are not placed in any list. The dispatch
267 * function sets their "func" and "arg" fields and signals the corresponding 267 * function sets their "func" and "arg" fields and signals the corresponding
268 * thread to execute the task. Once the thread executes the task it clears the 268 * thread to execute the task. Once the thread executes the task it clears the
269 * "func" field and places an entry on the bucket cache of free entries pointed 269 * "func" field and places an entry on the bucket cache of free entries pointed
270 * by "tqbucket_freelist" field. ALL entries on the free list should have "func" 270 * by "tqbucket_freelist" field. ALL entries on the free list should have "func"
271 * field equal to NULL. The free list is a circular doubly-linked list identical 271 * field equal to NULL. The free list is a circular doubly-linked list identical
272 * in structure to the tq_task list above, but entries are taken from it in LIFO 272 * in structure to the tq_task list above, but entries are taken from it in LIFO
273 * order - the last freed entry is the first to be allocated. The 273 * order - the last freed entry is the first to be allocated. The
274 * taskq_bucket_dispatch() function gets the most recently used entry from the 274 * taskq_bucket_dispatch() function gets the most recently used entry from the
275 * free list, sets its "func" and "arg" fields and signals a worker thread. 275 * free list, sets its "func" and "arg" fields and signals a worker thread.
276 * 276 *
277 * After executing each task a per-entry thread taskq_d_thread() places its 277 * After executing each task a per-entry thread taskq_d_thread() places its
278 * entry on the bucket free list and goes to a timed sleep. If it wakes up 278 * entry on the bucket free list and goes to a timed sleep. If it wakes up
279 * without getting new task it removes the entry from the free list and destroys 279 * without getting new task it removes the entry from the free list and destroys
280 * itself. The thread sleep time is controlled by a tunable variable 280 * itself. The thread sleep time is controlled by a tunable variable
281 * `taskq_thread_timeout'. 281 * `taskq_thread_timeout'.
282 * 282 *
283 * There is various statistics kept in the bucket which allows for later 283 * There is various statistics kept in the bucket which allows for later
284 * analysis of taskq usage patterns. Also, a global copy of taskq creation and 284 * analysis of taskq usage patterns. Also, a global copy of taskq creation and
285 * death statistics is kept in the global taskq data structure. Since thread 285 * death statistics is kept in the global taskq data structure. Since thread
286 * creation and death happen rarely, updating such global data does not present 286 * creation and death happen rarely, updating such global data does not present
287 * a performance problem. 287 * a performance problem.
288 * 288 *
289 * NOTE: Threads are not bound to any CPU and there is absolutely no association 289 * NOTE: Threads are not bound to any CPU and there is absolutely no association
290 * between the bucket and actual thread CPU, so buckets are used only to 290 * between the bucket and actual thread CPU, so buckets are used only to
291 * split resources and reduce resource contention. Having threads attached 291 * split resources and reduce resource contention. Having threads attached
292 * to the CPU denoted by a bucket may reduce number of times the job 292 * to the CPU denoted by a bucket may reduce number of times the job
293 * switches between CPUs. 293 * switches between CPUs.
294 * 294 *
295 * Current algorithm creates a thread whenever a bucket has no free 295 * Current algorithm creates a thread whenever a bucket has no free
296 * entries. It would be nice to know how many threads are in the running 296 * entries. It would be nice to know how many threads are in the running
297 * state and don't create threads if all CPUs are busy with existing 297 * state and don't create threads if all CPUs are busy with existing
298 * tasks, but it is unclear how such strategy can be implemented. 298 * tasks, but it is unclear how such strategy can be implemented.
299 * 299 *
300 * Currently buckets are created statically as an array attached to task 300 * Currently buckets are created statically as an array attached to task
301 * queue. On some system with nCPUs < max_ncpus it may waste system 301 * queue. On some system with nCPUs < max_ncpus it may waste system
302 * memory. One solution may be allocation of buckets when they are first 302 * memory. One solution may be allocation of buckets when they are first
303 * touched, but it is not clear how useful it is. 303 * touched, but it is not clear how useful it is.
304 * 304 *
305 * SUSPEND/RESUME implementation. 305 * SUSPEND/RESUME implementation.
306 * 306 *
307 * Before executing a task taskq_thread() (executing non-dynamic task 307 * Before executing a task taskq_thread() (executing non-dynamic task
308 * queues) obtains taskq's thread lock as a reader. The taskq_suspend() 308 * queues) obtains taskq's thread lock as a reader. The taskq_suspend()
309 * function gets the same lock as a writer blocking all non-dynamic task 309 * function gets the same lock as a writer blocking all non-dynamic task
310 * execution. The taskq_resume() function releases the lock allowing 310 * execution. The taskq_resume() function releases the lock allowing
311 * taskq_thread to continue execution. 311 * taskq_thread to continue execution.
312 * 312 *
313 * For dynamic task queues, each bucket is marked as TQBUCKET_SUSPEND by 313 * For dynamic task queues, each bucket is marked as TQBUCKET_SUSPEND by
314 * taskq_suspend() function. After that taskq_bucket_dispatch() always 314 * taskq_suspend() function. After that taskq_bucket_dispatch() always
315 * fails, so that taskq_dispatch() will either enqueue tasks for a 315 * fails, so that taskq_dispatch() will either enqueue tasks for a
316 * suspended backing queue or fail if TQ_NOQUEUE is specified in dispatch 316 * suspended backing queue or fail if TQ_NOQUEUE is specified in dispatch
317 * flags. 317 * flags.
318 * 318 *
319 * NOTE: taskq_suspend() does not immediately block any tasks already 319 * NOTE: taskq_suspend() does not immediately block any tasks already
320 * scheduled for dynamic task queues. It only suspends new tasks 320 * scheduled for dynamic task queues. It only suspends new tasks
321 * scheduled after taskq_suspend() was called. 321 * scheduled after taskq_suspend() was called.
322 * 322 *
323 * taskq_member() function works by comparing a thread t_taskq pointer with 323 * taskq_member() function works by comparing a thread t_taskq pointer with
324 * the passed thread pointer. 324 * the passed thread pointer.
325 * 325 *
326 * LOCKS and LOCK Hierarchy: 326 * LOCKS and LOCK Hierarchy:
327 * 327 *
328 * There are two locks used in task queues. 328 * There are two locks used in task queues.
329 * 329 *
330 * 1) Task queue structure has a lock, protecting global task queue state. 330 * 1) Task queue structure has a lock, protecting global task queue state.
331 * 331 *
332 * 2) Each per-CPU bucket has a lock for bucket management. 332 * 2) Each per-CPU bucket has a lock for bucket management.
333 * 333 *
334 * If both locks are needed, task queue lock should be taken only after bucket 334 * If both locks are needed, task queue lock should be taken only after bucket
335 * lock. 335 * lock.
336 * 336 *
337 * DEBUG FACILITIES. 337 * DEBUG FACILITIES.
338 * 338 *
339 * For DEBUG kernels it is possible to induce random failures to 339 * For DEBUG kernels it is possible to induce random failures to
340 * taskq_dispatch() function when it is given TQ_NOSLEEP argument. The value of 340 * taskq_dispatch() function when it is given TQ_NOSLEEP argument. The value of
341 * taskq_dmtbf and taskq_smtbf tunables control the mean time between induced 341 * taskq_dmtbf and taskq_smtbf tunables control the mean time between induced
342 * failures for dynamic and static task queues respectively. 342 * failures for dynamic and static task queues respectively.
343 * 343 *
344 * Setting TASKQ_STATISTIC to 0 will disable per-bucket statistics. 344 * Setting TASKQ_STATISTIC to 0 will disable per-bucket statistics.
345 * 345 *
346 * TUNABLES 346 * TUNABLES
347 * 347 *
348 * system_taskq_size - Size of the global system_taskq. 348 * system_taskq_size - Size of the global system_taskq.
349 * This value is multiplied by nCPUs to determine 349 * This value is multiplied by nCPUs to determine
350 * actual size. 350 * actual size.
351 * Default value: 64 351 * Default value: 64
352 * 352 *
353 * taskq_thread_timeout - Maximum idle time for taskq_d_thread() 353 * taskq_thread_timeout - Maximum idle time for taskq_d_thread()
354 * Default value: 5 minutes 354 * Default value: 5 minutes
355 * 355 *
356 * taskq_maxbuckets - Maximum number of buckets in any task queue 356 * taskq_maxbuckets - Maximum number of buckets in any task queue
357 * Default value: 128 357 * Default value: 128
358 * 358 *
359 * taskq_search_depth - Maximum # of buckets searched for a free entry 359 * taskq_search_depth - Maximum # of buckets searched for a free entry
360 * Default value: 4 360 * Default value: 4
361 * 361 *
362 * taskq_dmtbf - Mean time between induced dispatch failures 362 * taskq_dmtbf - Mean time between induced dispatch failures
363 * for dynamic task queues. 363 * for dynamic task queues.
364 * Default value: UINT_MAX (no induced failures) 364 * Default value: UINT_MAX (no induced failures)
365 * 365 *
366 * taskq_smtbf - Mean time between induced dispatch failures 366 * taskq_smtbf - Mean time between induced dispatch failures
367 * for static task queues. 367 * for static task queues.
368 * Default value: UINT_MAX (no induced failures) 368 * Default value: UINT_MAX (no induced failures)
369 * 369 *
370 * CONDITIONAL compilation. 370 * CONDITIONAL compilation.
371 * 371 *
372 * TASKQ_STATISTIC - If set will enable bucket statistic (default). 372 * TASKQ_STATISTIC - If set will enable bucket statistic (default).
373 * 373 *
374 */ 374 */
375 375
376#include <sys/kthread.h> 376#include <sys/kthread.h>
377#include <sys/taskq_impl.h> 377#include <sys/taskq_impl.h>
378#include <sys/proc.h> 378#include <sys/proc.h>
379#include <sys/kmem.h> 379#include <sys/kmem.h>
380#include <sys/callb.h> 380#include <sys/callb.h>
381#include <sys/systm.h> 381#include <sys/systm.h>
382#include <sys/cmn_err.h> 382#include <sys/cmn_err.h>
383#include <sys/debug.h> 383#include <sys/debug.h>
384#include <sys/sysmacros.h> 384#include <sys/sysmacros.h>
385#include <sys/sdt.h> 385#include <sys/sdt.h>
386#include <sys/mutex.h> 386#include <sys/mutex.h>
387#include <sys/kernel.h> 387#include <sys/kernel.h>
388#include <sys/limits.h> 388#include <sys/limits.h>
389 389
390static kmem_cache_t *taskq_ent_cache, *taskq_cache; 390static kmem_cache_t *taskq_ent_cache, *taskq_cache;
391 391
392/* Global system task queue for common use */ 392/* Global system task queue for common use */
393taskq_t *system_taskq; 393taskq_t *system_taskq;
394 394
395/* 395/*
396 * Maxmimum number of entries in global system taskq is 396 * Maxmimum number of entries in global system taskq is
397 * system_taskq_size * max_ncpus 397 * system_taskq_size * max_ncpus
398 */ 398 */
399#define SYSTEM_TASKQ_SIZE 1 399#define SYSTEM_TASKQ_SIZE 1
400int system_taskq_size = SYSTEM_TASKQ_SIZE; 400int system_taskq_size = SYSTEM_TASKQ_SIZE;
401 401
402#define TASKQ_ACTIVE 0x00010000 402#define TASKQ_ACTIVE 0x00010000
403 403
404/* 404/*
405 * Dynamic task queue threads that don't get any work within 405 * Dynamic task queue threads that don't get any work within
406 * taskq_thread_timeout destroy themselves 406 * taskq_thread_timeout destroy themselves
407 */ 407 */
408#define TASKQ_THREAD_TIMEOUT (60 * 5) 408#define TASKQ_THREAD_TIMEOUT (60 * 5)
409int taskq_thread_timeout = TASKQ_THREAD_TIMEOUT; 409int taskq_thread_timeout = TASKQ_THREAD_TIMEOUT;
410 410
411#define TASKQ_MAXBUCKETS 128 411#define TASKQ_MAXBUCKETS 128
412int taskq_maxbuckets = TASKQ_MAXBUCKETS; 412int taskq_maxbuckets = TASKQ_MAXBUCKETS;
413 413
414/* 414/*
415 * When a bucket has no available entries another buckets are tried. 415 * When a bucket has no available entries another buckets are tried.
416 * taskq_search_depth parameter limits the amount of buckets that we search 416 * taskq_search_depth parameter limits the amount of buckets that we search
417 * before failing. This is mostly useful in systems with many CPUs where we may 417 * before failing. This is mostly useful in systems with many CPUs where we may
418 * spend too much time scanning busy buckets. 418 * spend too much time scanning busy buckets.
419 */ 419 */
420#define TASKQ_SEARCH_DEPTH 4 420#define TASKQ_SEARCH_DEPTH 4
421int taskq_search_depth = TASKQ_SEARCH_DEPTH; 421int taskq_search_depth = TASKQ_SEARCH_DEPTH;
422 422
423/* 423/*
424 * Hashing function: mix various bits of x. May be pretty much anything. 424 * Hashing function: mix various bits of x. May be pretty much anything.
425 */ 425 */
426#define TQ_HASH(x) ((x) ^ ((x) >> 11) ^ ((x) >> 17) ^ ((x) ^ 27)) 426#define TQ_HASH(x) ((x) ^ ((x) >> 11) ^ ((x) >> 17) ^ ((x) ^ 27))
427 427
428/* 428/*
429 * We do not create any new threads when the system is low on memory and start 429 * We do not create any new threads when the system is low on memory and start
430 * throttling memory allocations. The following macro tries to estimate such 430 * throttling memory allocations. The following macro tries to estimate such
431 * condition. 431 * condition.
432 */ 432 */
433#define ENOUGH_MEMORY() (freemem > throttlefree) 433#define ENOUGH_MEMORY() (freemem > throttlefree)
434 434
435/* 435/*
436 * Static functions. 436 * Static functions.
437 */ 437 */
438static taskq_t *taskq_create_common(const char *, int, int, pri_t, int, 438static taskq_t *taskq_create_common(const char *, int, int, pri_t, int,
439 int, uint_t); 439 int, uint_t);
440static void taskq_thread(void *); 440static void taskq_thread(void *);
441static int taskq_constructor(void *, void *, int); 441static int taskq_constructor(void *, void *, int);
442static void taskq_destructor(void *, void *); 442static void taskq_destructor(void *, void *);
443static int taskq_ent_constructor(void *, void *, int); 443static int taskq_ent_constructor(void *, void *, int);
444static void taskq_ent_destructor(void *, void *); 444static void taskq_ent_destructor(void *, void *);
445static taskq_ent_t *taskq_ent_alloc(taskq_t *, int); 445static taskq_ent_t *taskq_ent_alloc(taskq_t *, int);
446static void taskq_ent_free(taskq_t *, taskq_ent_t *); 446static void taskq_ent_free(taskq_t *, taskq_ent_t *);
447 447
448/* 448/*
449 * Collect per-bucket statistic when TASKQ_STATISTIC is defined. 449 * Collect per-bucket statistic when TASKQ_STATISTIC is defined.
450 */ 450 */
451#define TASKQ_STATISTIC 1 451#define TASKQ_STATISTIC 1
452 452
453#if TASKQ_STATISTIC 453#if TASKQ_STATISTIC
454#define TQ_STAT(b, x) b->tqbucket_stat.x++ 454#define TQ_STAT(b, x) b->tqbucket_stat.x++
455#else 455#else
456#define TQ_STAT(b, x) 456#define TQ_STAT(b, x)
457#endif 457#endif
458 458
459/* 459/*
460 * Random fault injection. 460 * Random fault injection.
461 */ 461 */
462uint_t taskq_random; 462uint_t taskq_random;
463uint_t taskq_dmtbf = UINT_MAX; /* mean time between injected failures */ 463uint_t taskq_dmtbf = UINT_MAX; /* mean time between injected failures */
464uint_t taskq_smtbf = UINT_MAX; /* mean time between injected failures */ 464uint_t taskq_smtbf = UINT_MAX; /* mean time between injected failures */
465 465
466/* 466/*
467 * TQ_NOSLEEP dispatches on dynamic task queues are always allowed to fail. 467 * TQ_NOSLEEP dispatches on dynamic task queues are always allowed to fail.
468 * 468 *
469 * TQ_NOSLEEP dispatches on static task queues can't arbitrarily fail because 469 * TQ_NOSLEEP dispatches on static task queues can't arbitrarily fail because
470 * they could prepopulate the cache and make sure that they do not use more 470 * they could prepopulate the cache and make sure that they do not use more
471 * then minalloc entries. So, fault injection in this case insures that 471 * then minalloc entries. So, fault injection in this case insures that
472 * either TASKQ_PREPOPULATE is not set or there are more entries allocated 472 * either TASKQ_PREPOPULATE is not set or there are more entries allocated
473 * than is specified by minalloc. TQ_NOALLOC dispatches are always allowed 473 * than is specified by minalloc. TQ_NOALLOC dispatches are always allowed
474 * to fail, but for simplicity we treat them identically to TQ_NOSLEEP 474 * to fail, but for simplicity we treat them identically to TQ_NOSLEEP
475 * dispatches. 475 * dispatches.
476 */ 476 */
477#ifdef DEBUG 477#ifdef DEBUG
478#define TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flag) \ 478#define TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flag) \
479 taskq_random = (taskq_random * 2416 + 374441) % 1771875;\ 479 taskq_random = (taskq_random * 2416 + 374441) % 1771875;\
480 if ((flag & TQ_NOSLEEP) && \ 480 if ((flag & TQ_NOSLEEP) && \
481 taskq_random < 1771875 / taskq_dmtbf) { \ 481 taskq_random < 1771875 / taskq_dmtbf) { \
482 return (NULL); \ 482 return (NULL); \
483 } 483 }
484 484
485#define TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flag) \ 485#define TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flag) \
486 taskq_random = (taskq_random * 2416 + 374441) % 1771875;\ 486 taskq_random = (taskq_random * 2416 + 374441) % 1771875;\
487 if ((flag & (TQ_NOSLEEP | TQ_NOALLOC)) && \ 487 if ((flag & (TQ_NOSLEEP | TQ_NOALLOC)) && \
488 (!(tq->tq_flags & TASKQ_PREPOPULATE) || \ 488 (!(tq->tq_flags & TASKQ_PREPOPULATE) || \
489 (tq->tq_nalloc > tq->tq_minalloc)) && \ 489 (tq->tq_nalloc > tq->tq_minalloc)) && \
490 (taskq_random < (1771875 / taskq_smtbf))) { \ 490 (taskq_random < (1771875 / taskq_smtbf))) { \
491 mutex_exit(&tq->tq_lock); \ 491 mutex_exit(&tq->tq_lock); \
492 return ((taskqid_t)0); \ 492 return ((taskqid_t)0); \
493 } 493 }
494#else 494#else
495#define TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flag) 495#define TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flag)
496#define TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flag) 496#define TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flag)
497#endif 497#endif
498 498
499#define IS_EMPTY(l) (((l).tqent_prev == (l).tqent_next) && \ 499#define IS_EMPTY(l) (((l).tqent_prev == (l).tqent_next) && \
500 ((l).tqent_prev == &(l))) 500 ((l).tqent_prev == &(l)))
501 501
502/* 502/*
503 * Append `tqe' in the end of the doubly-linked list denoted by l. 503 * Append `tqe' in the end of the doubly-linked list denoted by l.
504 */ 504 */
505#define TQ_APPEND(l, tqe) { \ 505#define TQ_APPEND(l, tqe) { \
506 tqe->tqent_next = &l; \ 506 tqe->tqent_next = &l; \
507 tqe->tqent_prev = l.tqent_prev; \ 507 tqe->tqent_prev = l.tqent_prev; \
508 tqe->tqent_next->tqent_prev = tqe; \ 508 tqe->tqent_next->tqent_prev = tqe; \
509 tqe->tqent_prev->tqent_next = tqe; \ 509 tqe->tqent_prev->tqent_next = tqe; \
510} 510}
511 511
512/* 512/*
513 * Schedule a task specified by func and arg into the task queue entry tqe. 513 * Schedule a task specified by func and arg into the task queue entry tqe.
514 */ 514 */
515#define TQ_ENQUEUE(tq, tqe, func, arg) { \ 515#define TQ_ENQUEUE(tq, tqe, func, arg) { \
516 ASSERT(MUTEX_HELD(&tq->tq_lock)); \ 516 ASSERT(MUTEX_HELD(&tq->tq_lock)); \
517 TQ_APPEND(tq->tq_task, tqe); \ 517 TQ_APPEND(tq->tq_task, tqe); \
518 tqe->tqent_func = (func); \ 518 tqe->tqent_func = (func); \
519 tqe->tqent_arg = (arg); \ 519 tqe->tqent_arg = (arg); \
520 tq->tq_tasks++; \ 520 tq->tq_tasks++; \
521 if (tq->tq_tasks - tq->tq_executed > tq->tq_maxtasks) \ 521 if (tq->tq_tasks - tq->tq_executed > tq->tq_maxtasks) \
522 tq->tq_maxtasks = tq->tq_tasks - tq->tq_executed; \ 522 tq->tq_maxtasks = tq->tq_tasks - tq->tq_executed; \
523 cv_signal(&tq->tq_dispatch_cv); \ 523 cv_signal(&tq->tq_dispatch_cv); \
524 DTRACE_PROBE2(taskq__enqueue, taskq_t *, tq, taskq_ent_t *, tqe); \ 524 DTRACE_PROBE2(taskq__enqueue, taskq_t *, tq, taskq_ent_t *, tqe); \
525} 525}
526 526
527/* 527/*
528 * Do-nothing task which may be used to prepopulate thread caches. 528 * Do-nothing task which may be used to prepopulate thread caches.
529 */ 529 */
530/*ARGSUSED*/ 530/*ARGSUSED*/
531void 531void
532nulltask(void *unused) 532nulltask(void *unused)
533{ 533{
534} 534}
535 535
536 536
537/*ARGSUSED*/ 537/*ARGSUSED*/
538static int 538static int
539taskq_constructor(void *arg, void *obj, int kmflags) 539taskq_constructor(void *arg, void *obj, int kmflags)
540{ 540{
541 taskq_t *tq = obj; 541 taskq_t *tq = obj;
542 542
543 memset(tq, 0, sizeof (taskq_t)); 543 memset(tq, 0, sizeof (taskq_t));
544 544
545 mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL); 545 mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
546 rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL); 546 rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);
547 cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL); 547 cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
548 cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL); 548 cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);
549 549
550 tq->tq_task.tqent_next = &tq->tq_task; 550 tq->tq_task.tqent_next = &tq->tq_task;
551 tq->tq_task.tqent_prev = &tq->tq_task; 551 tq->tq_task.tqent_prev = &tq->tq_task;
552 552
553 return (0); 553 return (0);
554} 554}
555 555
556/*ARGSUSED*/ 556/*ARGSUSED*/
557static void 557static void
558taskq_destructor(void *arg, void *obj) 558taskq_destructor(void *arg, void *obj)
559{ 559{
560 taskq_t *tq = obj; 560 taskq_t *tq = obj;
561 561
562 mutex_destroy(&tq->tq_lock); 562 mutex_destroy(&tq->tq_lock);
563 rw_destroy(&tq->tq_threadlock); 563 rw_destroy(&tq->tq_threadlock);
564 cv_destroy(&tq->tq_dispatch_cv); 564 cv_destroy(&tq->tq_dispatch_cv);
565 cv_destroy(&tq->tq_wait_cv); 565 cv_destroy(&tq->tq_wait_cv);
566} 566}
567 567
568/*ARGSUSED*/ 568/*ARGSUSED*/
569static int 569static int
570taskq_ent_constructor(void *arg, void *obj, int kmflags) 570taskq_ent_constructor(void *arg, void *obj, int kmflags)
571{ 571{
572 taskq_ent_t *tqe = obj; 572 taskq_ent_t *tqe = obj;
573 573
574 tqe->tqent_thread = NULL; 574 tqe->tqent_thread = NULL;
575 cv_init(&tqe->tqent_cv, NULL, CV_DEFAULT, NULL); 575 cv_init(&tqe->tqent_cv, NULL, CV_DEFAULT, NULL);
576 576
577 return (0); 577 return (0);
578} 578}
579 579
580/*ARGSUSED*/ 580/*ARGSUSED*/
581static void 581static void
582taskq_ent_destructor(void *arg, void *obj) 582taskq_ent_destructor(void *arg, void *obj)
583{ 583{
584 taskq_ent_t *tqe = obj; 584 taskq_ent_t *tqe = obj;
585 585
586 ASSERT(tqe->tqent_thread == NULL); 586 ASSERT(tqe->tqent_thread == NULL);
587 cv_destroy(&tqe->tqent_cv); 587 cv_destroy(&tqe->tqent_cv);
588} 588}
589 589
590/* 590/*
591 * Create global system dynamic task queue. 591 * Create global system dynamic task queue.
592 */ 592 */
593void 593void
594system_taskq_init(void) 594system_taskq_init(void)
595{ 595{
596 system_taskq = taskq_create_common("system_taskq", 0, 596 system_taskq = taskq_create_common("system_taskq", 0,
597 system_taskq_size * max_ncpus, minclsyspri, 4, 512, 597 system_taskq_size * max_ncpus, minclsyspri, 4, 512,
598 TASKQ_PREPOPULATE); 598 TASKQ_PREPOPULATE);
599} 599}
600 600
601void 601void
602system_taskq_fini(void) 602system_taskq_fini(void)
603{ 603{
604 taskq_destroy(system_taskq); 604 taskq_destroy(system_taskq);
605} 605}
606 606
607void 607void
608taskq_init(void) 608taskq_init(void)
609{ 609{
610 taskq_ent_cache = kmem_cache_create("taskq_ent_cache", 610 taskq_ent_cache = kmem_cache_create("taskq_ent_cache",
611 sizeof (taskq_ent_t), 0, taskq_ent_constructor, 611 sizeof (taskq_ent_t), 0, taskq_ent_constructor,
612 taskq_ent_destructor, NULL, NULL, NULL, 0); 612 taskq_ent_destructor, NULL, NULL, NULL, 0);
613 taskq_cache = kmem_cache_create("taskq_cache", sizeof (taskq_t), 613 taskq_cache = kmem_cache_create("taskq_cache", sizeof (taskq_t),
614 0, taskq_constructor, taskq_destructor, NULL, NULL, NULL, 0); 614 0, taskq_constructor, taskq_destructor, NULL, NULL, NULL, 0);
615 system_taskq_init(); 615 system_taskq_init();
616} 616}
617 617
618void 618void
619taskq_fini(void) 619taskq_fini(void)
620{ 620{
621 system_taskq_fini(); 621 system_taskq_fini();
622 kmem_cache_destroy(taskq_cache); 622 kmem_cache_destroy(taskq_cache);
623 kmem_cache_destroy(taskq_ent_cache); 623 kmem_cache_destroy(taskq_ent_cache);
624} 624}
625 625
626/* 626/*
627 * taskq_ent_alloc() 627 * taskq_ent_alloc()
628 * 628 *
629 * Allocates a new taskq_ent_t structure either from the free list or from the 629 * Allocates a new taskq_ent_t structure either from the free list or from the
630 * cache. Returns NULL if it can't be allocated. 630 * cache. Returns NULL if it can't be allocated.
631 * 631 *
632 * Assumes: tq->tq_lock is held. 632 * Assumes: tq->tq_lock is held.
633 */ 633 */
634static taskq_ent_t * 634static taskq_ent_t *
635taskq_ent_alloc(taskq_t *tq, int flags) 635taskq_ent_alloc(taskq_t *tq, int flags)
636{ 636{
637 int kmflags = KM_NOSLEEP; 637 int kmflags = KM_NOSLEEP;
638  638
639 taskq_ent_t *tqe; 639 taskq_ent_t *tqe;
640 640
641 ASSERT(MUTEX_HELD(&tq->tq_lock)); 641 ASSERT(MUTEX_HELD(&tq->tq_lock));
642 642
643 /* 643 /*
644 * TQ_NOALLOC allocations are allowed to use the freelist, even if 644 * TQ_NOALLOC allocations are allowed to use the freelist, even if
645 * we are below tq_minalloc. 645 * we are below tq_minalloc.
646 */ 646 */
647 if ((tqe = tq->tq_freelist) != NULL && 647 if ((tqe = tq->tq_freelist) != NULL &&
648 ((flags & TQ_NOALLOC) || tq->tq_nalloc >= tq->tq_minalloc)) { 648 ((flags & TQ_NOALLOC) || tq->tq_nalloc >= tq->tq_minalloc)) {
649 tq->tq_freelist = tqe->tqent_next; 649 tq->tq_freelist = tqe->tqent_next;
650 } else { 650 } else {
651 if (flags & TQ_NOALLOC) 651 if (flags & TQ_NOALLOC)
652 return (NULL); 652 return (NULL);
653 653
654 mutex_exit(&tq->tq_lock); 654 mutex_exit(&tq->tq_lock);
655 if (tq->tq_nalloc >= tq->tq_maxalloc) { 655 if (tq->tq_nalloc >= tq->tq_maxalloc) {
656 if (kmflags & KM_NOSLEEP) { 656 if (kmflags & KM_NOSLEEP) {
657 mutex_enter(&tq->tq_lock); 657 mutex_enter(&tq->tq_lock);
658 return (NULL); 658 return (NULL);
659 } 659 }
660 /* 660 /*
661 * We don't want to exceed tq_maxalloc, but we can't 661 * We don't want to exceed tq_maxalloc, but we can't
662 * wait for other tasks to complete (and thus free up 662 * wait for other tasks to complete (and thus free up
663 * task structures) without risking deadlock with 663 * task structures) without risking deadlock with
664 * the caller. So, we just delay for one second 664 * the caller. So, we just delay for one second
665 * to throttle the allocation rate. 665 * to throttle the allocation rate.
666 */ 666 */
667 xdelay(hz); 667 xdelay(hz);
668 } 668 }
669 tqe = kmem_cache_alloc(taskq_ent_cache, kmflags); 669 tqe = kmem_cache_alloc(taskq_ent_cache, kmflags);
670 mutex_enter(&tq->tq_lock); 670 mutex_enter(&tq->tq_lock);
671 if (tqe != NULL) 671 if (tqe != NULL)
672 tq->tq_nalloc++; 672 tq->tq_nalloc++;
673 } 673 }
674 return (tqe); 674 return (tqe);
675} 675}
676 676
677/* 677/*
678 * taskq_ent_free() 678 * taskq_ent_free()
679 * 679 *
680 * Free taskq_ent_t structure by either putting it on the free list or freeing 680 * Free taskq_ent_t structure by either putting it on the free list or freeing
681 * it to the cache. 681 * it to the cache.
682 * 682 *
683 * Assumes: tq->tq_lock is held. 683 * Assumes: tq->tq_lock is held.
684 */ 684 */
685static void 685static void
686taskq_ent_free(taskq_t *tq, taskq_ent_t *tqe) 686taskq_ent_free(taskq_t *tq, taskq_ent_t *tqe)
687{ 687{
688 ASSERT(MUTEX_HELD(&tq->tq_lock)); 688 ASSERT(MUTEX_HELD(&tq->tq_lock));
689 689
690 if (tq->tq_nalloc <= tq->tq_minalloc) { 690 if (tq->tq_nalloc <= tq->tq_minalloc) {
691 tqe->tqent_next = tq->tq_freelist; 691 tqe->tqent_next = tq->tq_freelist;
692 tq->tq_freelist = tqe; 692 tq->tq_freelist = tqe;
693 } else { 693 } else {
694 tq->tq_nalloc--; 694 tq->tq_nalloc--;
695 mutex_exit(&tq->tq_lock); 695 mutex_exit(&tq->tq_lock);
696 kmem_cache_free(taskq_ent_cache, tqe); 696 kmem_cache_free(taskq_ent_cache, tqe);
697 mutex_enter(&tq->tq_lock); 697 mutex_enter(&tq->tq_lock);
698 } 698 }
699} 699}
700 700
701/* 701/*
702 * Dispatch a task. 702 * Dispatch a task.
703 * 703 *
704 * Assumes: func != NULL 704 * Assumes: func != NULL
705 * 705 *
706 * Returns: NULL if dispatch failed. 706 * Returns: NULL if dispatch failed.
707 * non-NULL if task dispatched successfully. 707 * non-NULL if task dispatched successfully.
708 * Actual return value is the pointer to taskq entry that was used to 708 * Actual return value is the pointer to taskq entry that was used to
709 * dispatch a task. This is useful for debugging. 709 * dispatch a task. This is useful for debugging.
710 */ 710 */
711/* ARGSUSED */ 711/* ARGSUSED */
712taskqid_t 712taskqid_t
713taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) 713taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
714{ 714{
715 taskq_ent_t *tqe = NULL; 715 taskq_ent_t *tqe = NULL;
716 716
717 ASSERT(tq != NULL); 717 ASSERT(tq != NULL);
718 ASSERT(func != NULL); 718 ASSERT(func != NULL);
719 ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC)); 719 ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
720 720
721 /* 721 /*
722 * TQ_NOQUEUE flag can't be used with non-dynamic task queues. 722 * TQ_NOQUEUE flag can't be used with non-dynamic task queues.
723 */ 723 */
724#ifdef __NetBSD__ 724#ifdef __NetBSD__
725 /* 725 /*
726 * Dynamic task queues didn't seem to get imported. Caller 726 * Dynamic task queues didn't seem to get imported. Caller
727 * must be prepared to handle failure anyway, so just fail. 727 * must be prepared to handle failure anyway, so just fail.
728 */ 728 */
729 if (flags & TQ_NOQUEUE) 729 if (flags & TQ_NOQUEUE)
730 return ((taskqid_t)NULL); 730 return ((taskqid_t)NULL);
731#endif 731#endif
732 ASSERT(! (flags & TQ_NOQUEUE)); 732 ASSERT(! (flags & TQ_NOQUEUE));
733 733
734 /* 734 /*
735 * Enqueue the task to the underlying queue. 735 * Enqueue the task to the underlying queue.
736 */ 736 */
737 mutex_enter(&tq->tq_lock); 737 mutex_enter(&tq->tq_lock);
738 738
739 TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flags); 739 TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flags);
740 740
741 if ((tqe = taskq_ent_alloc(tq, flags)) == NULL) { 741 if ((tqe = taskq_ent_alloc(tq, flags)) == NULL) {
742 mutex_exit(&tq->tq_lock); 742 mutex_exit(&tq->tq_lock);
743 return ((taskqid_t)NULL); 743 return ((taskqid_t)NULL);
744 } 744 }
745 TQ_ENQUEUE(tq, tqe, func, arg); 745 TQ_ENQUEUE(tq, tqe, func, arg);
746 mutex_exit(&tq->tq_lock); 746 mutex_exit(&tq->tq_lock);
747 return ((taskqid_t)tqe); 747 return ((taskqid_t)tqe);
748} 748}
749 749
750/* 750/*
751 * Wait for all pending tasks to complete. 751 * Wait for all pending tasks to complete.
752 * Calling taskq_wait from a task will cause deadlock. 752 * Calling taskq_wait from a task will cause deadlock.
753 */ 753 */
754void 754void
755taskq_wait(taskq_t *tq) 755taskq_wait(taskq_t *tq)
756{ 756{
757 757
758 mutex_enter(&tq->tq_lock); 758 mutex_enter(&tq->tq_lock);
759 while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0) 759 while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
760 cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 760 cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
761 mutex_exit(&tq->tq_lock); 761 mutex_exit(&tq->tq_lock);
762} 762}
763 763
764/* 764/*
765 * Suspend execution of tasks. 765 * Suspend execution of tasks.
766 * 766 *
767 * Tasks in the queue part will be suspended immediately upon return from this 767 * Tasks in the queue part will be suspended immediately upon return from this
768 * function. Pending tasks in the dynamic part will continue to execute, but all 768 * function. Pending tasks in the dynamic part will continue to execute, but all
769 * new tasks will be suspended. 769 * new tasks will be suspended.
770 */ 770 */
771void 771void
772taskq_suspend(taskq_t *tq) 772taskq_suspend(taskq_t *tq)
773{ 773{
774 rw_enter(&tq->tq_threadlock, RW_WRITER); 774 rw_enter(&tq->tq_threadlock, RW_WRITER);
775 775
776 /* 776 /*
777 * Mark task queue as being suspended. Needed for taskq_suspended(). 777 * Mark task queue as being suspended. Needed for taskq_suspended().
778 */ 778 */
779 mutex_enter(&tq->tq_lock); 779 mutex_enter(&tq->tq_lock);
780 ASSERT(!(tq->tq_flags & TASKQ_SUSPENDED)); 780 ASSERT(!(tq->tq_flags & TASKQ_SUSPENDED));
781 tq->tq_flags |= TASKQ_SUSPENDED; 781 tq->tq_flags |= TASKQ_SUSPENDED;
782 mutex_exit(&tq->tq_lock); 782 mutex_exit(&tq->tq_lock);
783} 783}
784 784
785/* 785/*
786 * returns: 1 if tq is suspended, 0 otherwise. 786 * returns: 1 if tq is suspended, 0 otherwise.
787 */ 787 */
788int 788int
789taskq_suspended(taskq_t *tq) 789taskq_suspended(taskq_t *tq)
790{ 790{
791 return ((tq->tq_flags & TASKQ_SUSPENDED) != 0); 791 return ((tq->tq_flags & TASKQ_SUSPENDED) != 0);
792} 792}
793 793
794/* 794/*
795 * Resume taskq execution. 795 * Resume taskq execution.
796 */ 796 */
797void 797void
798taskq_resume(taskq_t *tq) 798taskq_resume(taskq_t *tq)
799{ 799{
800 ASSERT(RW_WRITE_HELD(&tq->tq_threadlock)); 800 ASSERT(RW_WRITE_HELD(&tq->tq_threadlock));
801 801
802 mutex_enter(&tq->tq_lock); 802 mutex_enter(&tq->tq_lock);
803 ASSERT(tq->tq_flags & TASKQ_SUSPENDED); 803 ASSERT(tq->tq_flags & TASKQ_SUSPENDED);
804 tq->tq_flags &= ~TASKQ_SUSPENDED; 804 tq->tq_flags &= ~TASKQ_SUSPENDED;
805 mutex_exit(&tq->tq_lock); 805 mutex_exit(&tq->tq_lock);
806 806
807 rw_exit(&tq->tq_threadlock); 807 rw_exit(&tq->tq_threadlock);
808} 808}
809 809
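/*
 * Editorial sketch, not part of the diff: taskq_suspend() takes
 * tq_threadlock as writer and taskq_resume() releases it, so the calls
 * must be paired by the same thread.  While the write lock is held, the
 * workers cannot take the reader lock around task execution, so no
 * queued task starts running in between.  example_quiesce() is a
 * hypothetical helper.
 */
static void
example_quiesce(taskq_t *tq)
{
	taskq_suspend(tq);
	ASSERT(taskq_suspended(tq));

	/* ... examine or adjust state that tasks must not race with ... */

	taskq_resume(tq);
}
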
810int 810int
811taskq_member(taskq_t *tq, kthread_t *thread) 811taskq_member(taskq_t *tq, kthread_t *thread)
812{ 812{
813 if (tq->tq_nthreads == 1) 813 if (tq->tq_nthreads == 1)
814 return (tq->tq_thread == thread); 814 return (tq->tq_thread == thread);
815 else { 815 else {
816 int i, found = 0; 816 int i, found = 0;
817 817
818 mutex_enter(&tq->tq_lock); 818 mutex_enter(&tq->tq_lock);
819 for (i = 0; i < tq->tq_nthreads; i++) { 819 for (i = 0; i < tq->tq_nthreads; i++) {
820 if (tq->tq_threadlist[i] == thread) { 820 if (tq->tq_threadlist[i] == thread) {
821 found = 1; 821 found = 1;
822 break; 822 break;
823 } 823 }
824 } 824 }
825 mutex_exit(&tq->tq_lock); 825 mutex_exit(&tq->tq_lock);
826 return (found); 826 return (found);
827 } 827 }
828} 828}
829 829
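/*
 * Editorial sketch, not part of the diff: taskq_member() is the standard
 * guard against the taskq_wait() deadlock noted earlier.  It assumes the
 * compat layer's curthread alias for the current kthread_t.
 */
static void
example_safe_wait(taskq_t *tq)
{
	/* Never wait on a queue from one of its own worker threads. */
	if (!taskq_member(tq, curthread))
		taskq_wait(tq);
}
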
830/* 830/*
831 * Worker thread for processing task queue. 831 * Worker thread for processing task queue.
832 */ 832 */
833static void 833static void
834taskq_thread(void *arg) 834taskq_thread(void *arg)
835{ 835{
836 taskq_t *tq = arg; 836 taskq_t *tq = arg;
837 taskq_ent_t *tqe; 837 taskq_ent_t *tqe;
838 callb_cpr_t cprinfo; 838 callb_cpr_t cprinfo;
839 hrtime_t start, end; 839 hrtime_t start, end;
840 840
841 CALLB_CPR_INIT(&cprinfo, &tq->tq_lock, callb_generic_cpr, tq->tq_name); 841 CALLB_CPR_INIT(&cprinfo, &tq->tq_lock, callb_generic_cpr, tq->tq_name);
842 842
843 mutex_enter(&tq->tq_lock); 843 mutex_enter(&tq->tq_lock);
844 while (tq->tq_flags & TASKQ_ACTIVE) { 844 while (tq->tq_flags & TASKQ_ACTIVE) {
845 if ((tqe = tq->tq_task.tqent_next) == &tq->tq_task) { 845 if ((tqe = tq->tq_task.tqent_next) == &tq->tq_task) {
846 if (--tq->tq_active == 0) 846 if (--tq->tq_active == 0)
847 cv_broadcast(&tq->tq_wait_cv); 847 cv_broadcast(&tq->tq_wait_cv);
848 if (tq->tq_flags & TASKQ_CPR_SAFE) { 848 if (tq->tq_flags & TASKQ_CPR_SAFE) {
849 cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock); 849 cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
850 } else { 850 } else {
851 CALLB_CPR_SAFE_BEGIN(&cprinfo); 851 CALLB_CPR_SAFE_BEGIN(&cprinfo);
852 cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock); 852 cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
853 CALLB_CPR_SAFE_END(&cprinfo, &tq->tq_lock); 853 CALLB_CPR_SAFE_END(&cprinfo, &tq->tq_lock);
854 } 854 }
855 tq->tq_active++; 855 tq->tq_active++;
856 continue; 856 continue;
857 } 857 }
858 tqe->tqent_prev->tqent_next = tqe->tqent_next; 858 tqe->tqent_prev->tqent_next = tqe->tqent_next;
859 tqe->tqent_next->tqent_prev = tqe->tqent_prev; 859 tqe->tqent_next->tqent_prev = tqe->tqent_prev;
860 mutex_exit(&tq->tq_lock); 860 mutex_exit(&tq->tq_lock);
861 861
862 rw_enter(&tq->tq_threadlock, RW_READER); 862 rw_enter(&tq->tq_threadlock, RW_READER);
863 start = gethrtime(); 863 start = gethrtime();
864 DTRACE_PROBE2(taskq__exec__start, taskq_t *, tq, 864 DTRACE_PROBE2(taskq__exec__start, taskq_t *, tq,
865 taskq_ent_t *, tqe); 865 taskq_ent_t *, tqe);
866 tqe->tqent_func(tqe->tqent_arg); 866 tqe->tqent_func(tqe->tqent_arg);
867 DTRACE_PROBE2(taskq__exec__end, taskq_t *, tq, 867 DTRACE_PROBE2(taskq__exec__end, taskq_t *, tq,
868 taskq_ent_t *, tqe); 868 taskq_ent_t *, tqe);
869 end = gethrtime(); 869 end = gethrtime();
870 rw_exit(&tq->tq_threadlock); 870 rw_exit(&tq->tq_threadlock);
871 871
872 mutex_enter(&tq->tq_lock); 872 mutex_enter(&tq->tq_lock);
873 tq->tq_totaltime += end - start; 873 tq->tq_totaltime += end - start;
874 tq->tq_executed++; 874 tq->tq_executed++;
875 875
876 taskq_ent_free(tq, tqe); 876 taskq_ent_free(tq, tqe);
877 } 877 }
878 tq->tq_nthreads--; 878 tq->tq_nthreads--;
879 cv_broadcast(&tq->tq_wait_cv); 879 cv_broadcast(&tq->tq_wait_cv);
880 ASSERT(!(tq->tq_flags & TASKQ_CPR_SAFE)); 880 ASSERT(!(tq->tq_flags & TASKQ_CPR_SAFE));
881 CALLB_CPR_EXIT(&cprinfo); 881 CALLB_CPR_EXIT(&cprinfo);
882 thread_exit(); 882 thread_exit();
883} 883}
884 884
885/* 885/*
886 * Taskq creation. May sleep for memory. 886 * Taskq creation. May sleep for memory.
887 * Always use automatically generated instances to avoid kstat name space 887 * Always use automatically generated instances to avoid kstat name space
888 * collisions. 888 * collisions.
889 */ 889 */
890 890
891taskq_t * 891taskq_t *
892taskq_create(const char *name, int nthreads, pri_t pri, int minalloc, 892taskq_create(const char *name, int nthreads, pri_t pri, int minalloc,
893 int maxalloc, uint_t flags) 893 int maxalloc, uint_t flags)
894{ 894{
895 return taskq_create_common(name, 0, nthreads, pri, minalloc, 895 return taskq_create_common(name, 0, nthreads, pri, minalloc,
896 maxalloc, flags | TASKQ_NOINSTANCE); 896 maxalloc, flags | TASKQ_NOINSTANCE);
897} 897}
898 898
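/*
 * Editorial sketch, not part of the diff: with the change in this
 * revision, TASKQ_THREADS_CPU_PCT makes the nthreads argument a
 * percentage of the available CPUs rather than an absolute thread count.
 * The queue name and the example_create_pct() wrapper are hypothetical;
 * the priority is passed through unchanged.
 */
static taskq_t *
example_create_pct(pri_t pri)
{
	/* Worker threads for 75% of the CPUs; cache 4-16 task entries. */
	return (taskq_create("example_tq", 75, pri, 4, 16,
	    TASKQ_PREPOPULATE | TASKQ_THREADS_CPU_PCT));
}
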
899static taskq_t * 899static taskq_t *
900taskq_create_common(const char *name, int instance, int nthreads, pri_t pri, 900taskq_create_common(const char *name, int instance, int nthreads, pri_t pri,
901 int minalloc, int maxalloc, uint_t flags) 901 int minalloc, int maxalloc, uint_t flags)
902{ 902{
903 taskq_t *tq = kmem_cache_alloc(taskq_cache, KM_NOSLEEP); 903 taskq_t *tq = kmem_cache_alloc(taskq_cache, KM_NOSLEEP);
904 uint_t ncpus = ((boot_max_ncpus == -1) ? max_ncpus : boot_max_ncpus); 904 uint_t ncpus = ((boot_max_ncpus == -1) ? max_ncpus : boot_max_ncpus);
905 uint_t bsize; /* # of buckets - always power of 2 */ 905 uint_t bsize; /* # of buckets - always power of 2 */
906 906
907 ASSERT(instance == 0); 907 ASSERT(instance == 0);
908 ASSERT(flags == TASKQ_PREPOPULATE | TASKQ_NOINSTANCE); 908 ASSERT(!ISSET(flags, TASKQ_CPR_SAFE));
 909 ASSERT(!ISSET(flags, TASKQ_DYNAMIC));
909 910
910 /* 911 /*
911 * TASKQ_CPR_SAFE and TASKQ_DYNAMIC flags are mutually exclusive. 912 * TASKQ_CPR_SAFE and TASKQ_DYNAMIC flags are mutually exclusive.
912 */ 913 */
913 ASSERT((flags & (TASKQ_DYNAMIC | TASKQ_CPR_SAFE)) != 914 ASSERT((flags & (TASKQ_DYNAMIC | TASKQ_CPR_SAFE)) !=
914 ((TASKQ_DYNAMIC | TASKQ_CPR_SAFE))); 915 ((TASKQ_DYNAMIC | TASKQ_CPR_SAFE)));
915 916
916 ASSERT(tq->tq_buckets == NULL); 917 ASSERT(tq->tq_buckets == NULL);
917 918
918 bsize = 1 << (highbit(ncpus) - 1); 919 bsize = 1 << (highbit(ncpus) - 1);
919 ASSERT(bsize >= 1); 920 ASSERT(bsize >= 1);
920 bsize = MIN(bsize, taskq_maxbuckets); 921 bsize = MIN(bsize, taskq_maxbuckets);
921 922
922 tq->tq_maxsize = nthreads; 923 ASSERT(!(flags & TASKQ_DYNAMIC));
 924 if (flags & TASKQ_THREADS_CPU_PCT)
 925 /* nthreads is % of CPUs we want to use. */
 926 nthreads = (ncpus*nthreads)/100;
923 927
924 (void) strncpy(tq->tq_name, name, TASKQ_NAMELEN + 1); 928 (void) strncpy(tq->tq_name, name, TASKQ_NAMELEN + 1);
925 tq->tq_name[TASKQ_NAMELEN] = '\0'; 929 tq->tq_name[TASKQ_NAMELEN] = '\0';
926 /* Make sure the name conforms to the rules for C identifiers */ 930 /* Make sure the name conforms to the rules for C identifiers */
927 strident_canon(tq->tq_name, TASKQ_NAMELEN); 931 strident_canon(tq->tq_name, TASKQ_NAMELEN);
928 932
929 tq->tq_flags = flags | TASKQ_ACTIVE; 933 tq->tq_flags = flags | TASKQ_ACTIVE;
930 tq->tq_active = nthreads; 934 tq->tq_active = nthreads;
931 tq->tq_nthreads = nthreads; 935 tq->tq_nthreads = nthreads;
932 tq->tq_minalloc = minalloc; 936 tq->tq_minalloc = minalloc;
933 tq->tq_maxalloc = maxalloc; 937 tq->tq_maxalloc = maxalloc;
934 tq->tq_nbuckets = bsize; 938 tq->tq_nbuckets = bsize;
935 tq->tq_pri = pri; 939 tq->tq_pri = pri;
936 940
937 if (flags & TASKQ_PREPOPULATE) { 941 if (flags & TASKQ_PREPOPULATE) {
938 mutex_enter(&tq->tq_lock); 942 mutex_enter(&tq->tq_lock);
939 while (minalloc-- > 0) 943 while (minalloc-- > 0)
940 taskq_ent_free(tq, taskq_ent_alloc(tq, TQ_SLEEP)); 944 taskq_ent_free(tq, taskq_ent_alloc(tq, TQ_SLEEP));
941 mutex_exit(&tq->tq_lock); 945 mutex_exit(&tq->tq_lock);
942 } 946 }
943 947
944 if (nthreads == 1) { 948 if (nthreads == 1) {
945 tq->tq_thread = thread_create(NULL, 0, taskq_thread, tq, 949 tq->tq_thread = thread_create(NULL, 0, taskq_thread, tq,
946 0, NULL, TS_RUN, pri); 950 0, NULL, TS_RUN, pri);
947 } else { 951 } else {
948 kthread_t **tpp = kmem_alloc(sizeof (kthread_t *) * nthreads, 952 kthread_t **tpp = kmem_alloc(sizeof (kthread_t *) * nthreads,
949 KM_SLEEP); 953 KM_SLEEP);
950 954
951 tq->tq_threadlist = tpp; 955 tq->tq_threadlist = tpp;
952 956
953 mutex_enter(&tq->tq_lock); 957 mutex_enter(&tq->tq_lock);
954 while (nthreads-- > 0) { 958 while (nthreads-- > 0) {
955 *tpp = thread_create(NULL, 0, taskq_thread, tq, 959 *tpp = thread_create(NULL, 0, taskq_thread, tq,
956 0, NULL, TS_RUN, pri); 960 0, NULL, TS_RUN, pri);
957 tpp++; 961 tpp++;
958 } 962 }
959 mutex_exit(&tq->tq_lock); 963 mutex_exit(&tq->tq_lock);
960 } 964 }
961 965
962 return (tq); 966 return (tq);
963} 967}
964 968
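/*
 * Editorial sketch, not part of the diff: the percentage computation in
 * taskq_create_common() uses integer division and therefore rounds down.
 * For example, ncpus = 8 with nthreads = 75 gives (8 * 75) / 100 = 6
 * worker threads, while ncpus = 2 with nthreads = 30 gives
 * (2 * 30) / 100 = 0, so small percentages on small machines can yield
 * no threads at all.  example_pct_to_nthreads() is a hypothetical helper
 * that mirrors that arithmetic.
 */
static int
example_pct_to_nthreads(int ncpus, int pct)
{
	return ((ncpus * pct) / 100);
}
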
965/* 969/*
966 * taskq_destroy(). 970 * taskq_destroy().
967 * 971 *
968 * Assumes: by the time taskq_destroy is called no one will use this task queue 972 * Assumes: by the time taskq_destroy is called no one will use this task queue
969 * in any way and no one will try to dispatch entries in it. 973 * in any way and no one will try to dispatch entries in it.
970 */ 974 */
971void 975void
972taskq_destroy(taskq_t *tq) 976taskq_destroy(taskq_t *tq)
973{ 977{
974 taskq_bucket_t *b = tq->tq_buckets; 978 taskq_bucket_t *b = tq->tq_buckets;
975 int bid = 0; 979 int bid = 0;
976 980
977 ASSERT(! (tq->tq_flags & TASKQ_CPR_SAFE)); 981 ASSERT(! (tq->tq_flags & TASKQ_CPR_SAFE));
978 982
979 /* 983 /*
980 * Wait for any pending entries to complete. 984 * Wait for any pending entries to complete.
981 */ 985 */
982 taskq_wait(tq); 986 taskq_wait(tq);
983 987
984 mutex_enter(&tq->tq_lock); 988 mutex_enter(&tq->tq_lock);
985 ASSERT((tq->tq_task.tqent_next == &tq->tq_task) && 989 ASSERT((tq->tq_task.tqent_next == &tq->tq_task) &&
986 (tq->tq_active == 0)); 990 (tq->tq_active == 0));
987 991
988 if ((tq->tq_nthreads > 1) && (tq->tq_threadlist != NULL)) 992 if ((tq->tq_nthreads > 1) && (tq->tq_threadlist != NULL))
989 kmem_free(tq->tq_threadlist, sizeof (kthread_t *) * 993 kmem_free(tq->tq_threadlist, sizeof (kthread_t *) *
990 tq->tq_nthreads); 994 tq->tq_nthreads);
991 995
992 tq->tq_flags &= ~TASKQ_ACTIVE; 996 tq->tq_flags &= ~TASKQ_ACTIVE;
993 cv_broadcast(&tq->tq_dispatch_cv); 997 cv_broadcast(&tq->tq_dispatch_cv);
994 while (tq->tq_nthreads != 0) 998 while (tq->tq_nthreads != 0)
995 cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 999 cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
996 1000
997 tq->tq_minalloc = 0; 1001 tq->tq_minalloc = 0;
998 while (tq->tq_nalloc != 0) 1002 while (tq->tq_nalloc != 0)
999 taskq_ent_free(tq, taskq_ent_alloc(tq, TQ_SLEEP)); 1003 taskq_ent_free(tq, taskq_ent_alloc(tq, TQ_SLEEP));
1000 1004
1001 mutex_exit(&tq->tq_lock); 1005 mutex_exit(&tq->tq_lock);
1002 1006
1003 /* 1007 /*
1004 * Mark each bucket as closing and wake up all sleeping threads. 1008 * Mark each bucket as closing and wake up all sleeping threads.
1005 */ 1009 */
1006 for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) { 1010 for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) {
1007 taskq_ent_t *tqe; 1011 taskq_ent_t *tqe;
1008 1012
1009 mutex_enter(&b->tqbucket_lock); 1013 mutex_enter(&b->tqbucket_lock);
1010 1014
1011 b->tqbucket_flags |= TQBUCKET_CLOSE; 1015 b->tqbucket_flags |= TQBUCKET_CLOSE;
1012 /* Wake up all sleeping threads */ 1016 /* Wake up all sleeping threads */
1013 1017
1014 for (tqe = b->tqbucket_freelist.tqent_next; 1018 for (tqe = b->tqbucket_freelist.tqent_next;
1015 tqe != &b->tqbucket_freelist; tqe = tqe->tqent_next) 1019 tqe != &b->tqbucket_freelist; tqe = tqe->tqent_next)
1016 cv_signal(&tqe->tqent_cv); 1020 cv_signal(&tqe->tqent_cv);
1017 1021
1018 ASSERT(b->tqbucket_nalloc == 0); 1022 ASSERT(b->tqbucket_nalloc == 0);
1019 1023
1020 /* 1024 /*
1021 * At this point we waited for all pending jobs to complete (in 1025 * At this point we waited for all pending jobs to complete (in
1022 * both the task queue and the bucket) and no new jobs should 1026 * both the task queue and the bucket) and no new jobs should
1023 * arrive. Wait for all threads to die. 1027 * arrive. Wait for all threads to die.
1024 */ 1028 */
1025 while (b->tqbucket_nfree > 0) 1029 while (b->tqbucket_nfree > 0)
1026 cv_wait(&b->tqbucket_cv, &b->tqbucket_lock); 1030 cv_wait(&b->tqbucket_cv, &b->tqbucket_lock);
1027 mutex_exit(&b->tqbucket_lock); 1031 mutex_exit(&b->tqbucket_lock);
1028 mutex_destroy(&b->tqbucket_lock); 1032 mutex_destroy(&b->tqbucket_lock);
1029 cv_destroy(&b->tqbucket_cv); 1033 cv_destroy(&b->tqbucket_cv);
1030 } 1034 }
1031 1035
1032 if (tq->tq_buckets != NULL) { 1036 if (tq->tq_buckets != NULL) {
1033 ASSERT(tq->tq_flags & TASKQ_DYNAMIC); 1037 ASSERT(tq->tq_flags & TASKQ_DYNAMIC);
1034 kmem_free(tq->tq_buckets, 1038 kmem_free(tq->tq_buckets,
1035 sizeof (taskq_bucket_t) * tq->tq_nbuckets); 1039 sizeof (taskq_bucket_t) * tq->tq_nbuckets);
1036 1040
1037 /* Cleanup fields before returning tq to the cache */ 1041 /* Cleanup fields before returning tq to the cache */
1038 tq->tq_buckets = NULL; 1042 tq->tq_buckets = NULL;
1039 tq->tq_tcreates = 0; 1043 tq->tq_tcreates = 0;
1040 tq->tq_tdeaths = 0; 1044 tq->tq_tdeaths = 0;
1041 } else { 1045 } else {
1042 ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC)); 1046 ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
1043 } 1047 }
1044 1048
1045 tq->tq_totaltime = 0; 1049 tq->tq_totaltime = 0;
1046 tq->tq_tasks = 0; 1050 tq->tq_tasks = 0;
1047 tq->tq_maxtasks = 0; 1051 tq->tq_maxtasks = 0;
1048 tq->tq_executed = 0; 1052 tq->tq_executed = 0;
1049 kmem_cache_free(taskq_cache, tq); 1053 kmem_cache_free(taskq_cache, tq);
1050} 1054}
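
/*
 * Editorial sketch, not part of the diff: the full lifecycle as the
 * functions above fit together.  taskq_destroy() itself calls
 * taskq_wait(), so outstanding tasks finish before the worker threads
 * are asked to exit; the explicit wait here is therefore optional.
 * do_work() is the hypothetical task function from the earlier sketch,
 * and the queue parameters are illustrative.
 */
static void
example_lifecycle(void *arg, pri_t pri)
{
	taskq_t *tq;

	tq = taskq_create("example_tq", 4, pri, 4, 16, TASKQ_PREPOPULATE);
	(void) taskq_dispatch(tq, do_work, arg, TQ_SLEEP);
	taskq_wait(tq);
	taskq_destroy(tq);
}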