Wed Dec 26 21:25:52 2018 UTC (thorpej)
Whitespace tweaks.

diff -r1.8 -r1.9 src/sys/kern/kern_threadpool.c

cvs diff -r1.8 -r1.9 src/sys/kern/kern_threadpool.c

--- src/sys/kern/kern_threadpool.c 2018/12/26 21:18:51 1.8
+++ src/sys/kern/kern_threadpool.c 2018/12/26 21:25:51 1.9
@@ -1,14 +1,14 @@
1/* $NetBSD: kern_threadpool.c,v 1.8 2018/12/26 21:18:51 thorpej Exp $ */ 1/* $NetBSD: kern_threadpool.c,v 1.9 2018/12/26 21:25:51 thorpej Exp $ */
2 2
3/*- 3/*-
4 * Copyright (c) 2014, 2018 The NetBSD Foundation, Inc. 4 * Copyright (c) 2014, 2018 The NetBSD Foundation, Inc.
5 * All rights reserved. 5 * All rights reserved.
6 * 6 *
7 * This code is derived from software contributed to The NetBSD Foundation 7 * This code is derived from software contributed to The NetBSD Foundation
8 * by Taylor R. Campbell and Jason R. Thorpe. 8 * by Taylor R. Campbell and Jason R. Thorpe.
9 * 9 *
10 * Redistribution and use in source and binary forms, with or without 10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions 11 * modification, are permitted provided that the following conditions
12 * are met: 12 * are met:
13 * 1. Redistributions of source code must retain the above copyright 13 * 1. Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer. 14 * notice, this list of conditions and the following disclaimer.
@@ -71,27 +71,27 @@
71 * | | <running (n+1)b> | | 71 * | | <running (n+1)b> | |
72 * | +------------------+ | 72 * | +------------------+ |
73 * +--------------------------------------------------------+ 73 * +--------------------------------------------------------+
74 * 74 *
75 * XXX Why one overseer per CPU? I did that originally to avoid 75 * XXX Why one overseer per CPU? I did that originally to avoid
76 * touching remote CPUs' memory when scheduling a job, but that still 76 * touching remote CPUs' memory when scheduling a job, but that still
77 * requires interprocessor synchronization. Perhaps we could get by 77 * requires interprocessor synchronization. Perhaps we could get by
78 * with a single overseer thread, at the expense of another pointer in 78 * with a single overseer thread, at the expense of another pointer in
79 * struct threadpool_job to identify the CPU on which it must run 79 * struct threadpool_job to identify the CPU on which it must run
80 * in order for the overseer to schedule it correctly. 80 * in order for the overseer to schedule it correctly.
81 */ 81 */
82 82
83#include <sys/cdefs.h> 83#include <sys/cdefs.h>
84__KERNEL_RCSID(0, "$NetBSD: kern_threadpool.c,v 1.8 2018/12/26 21:18:51 thorpej Exp $"); 84__KERNEL_RCSID(0, "$NetBSD: kern_threadpool.c,v 1.9 2018/12/26 21:25:51 thorpej Exp $");
85 85
86#include <sys/types.h> 86#include <sys/types.h>
87#include <sys/param.h> 87#include <sys/param.h>
88#include <sys/atomic.h> 88#include <sys/atomic.h>
89#include <sys/condvar.h> 89#include <sys/condvar.h>
90#include <sys/cpu.h> 90#include <sys/cpu.h>
91#include <sys/kernel.h> 91#include <sys/kernel.h>
92#include <sys/kmem.h> 92#include <sys/kmem.h>
93#include <sys/kthread.h> 93#include <sys/kthread.h>
94#include <sys/mutex.h> 94#include <sys/mutex.h>
95#include <sys/once.h> 95#include <sys/once.h>
96#include <sys/percpu.h> 96#include <sys/percpu.h>
97#include <sys/pool.h> 97#include <sys/pool.h>
@@ -375,38 +375,38 @@ threadpool_get(struct threadpool **poolp
375 375
376 THREADPOOL_INIT(); 376 THREADPOOL_INIT();
377 377
378 ASSERT_SLEEPABLE(); 378 ASSERT_SLEEPABLE();
379 379
380 if (! threadpool_pri_is_valid(pri)) 380 if (! threadpool_pri_is_valid(pri))
381 return EINVAL; 381 return EINVAL;
382 382
383 mutex_enter(&threadpools_lock); 383 mutex_enter(&threadpools_lock);
384 tpu = threadpool_lookup_unbound(pri); 384 tpu = threadpool_lookup_unbound(pri);
385 if (tpu == NULL) { 385 if (tpu == NULL) {
386 mutex_exit(&threadpools_lock); 386 mutex_exit(&threadpools_lock);
387 TP_LOG(("%s: No pool for pri=%d, creating one.\n", 387 TP_LOG(("%s: No pool for pri=%d, creating one.\n",
388 __func__, (int)pri)); 388 __func__, (int)pri));
389 tmp = kmem_zalloc(sizeof(*tmp), KM_SLEEP); 389 tmp = kmem_zalloc(sizeof(*tmp), KM_SLEEP);
390 error = threadpool_create(&tmp->tpu_pool, NULL, pri); 390 error = threadpool_create(&tmp->tpu_pool, NULL, pri);
391 if (error) { 391 if (error) {
392 kmem_free(tmp, sizeof(*tmp)); 392 kmem_free(tmp, sizeof(*tmp));
393 return error; 393 return error;
394 } 394 }
395 mutex_enter(&threadpools_lock); 395 mutex_enter(&threadpools_lock);
396 tpu = threadpool_lookup_unbound(pri); 396 tpu = threadpool_lookup_unbound(pri);
397 if (tpu == NULL) { 397 if (tpu == NULL) {
398 TP_LOG(("%s: Won the creation race for pri=%d.\n", 398 TP_LOG(("%s: Won the creation race for pri=%d.\n",
399 __func__, (int)pri)); 399 __func__, (int)pri));
400 tpu = tmp; 400 tpu = tmp;
401 tmp = NULL; 401 tmp = NULL;
402 threadpool_insert_unbound(tpu); 402 threadpool_insert_unbound(tpu);
403 } 403 }
404 } 404 }
405 KASSERT(tpu != NULL); 405 KASSERT(tpu != NULL);
406 tpu->tpu_refcnt++; 406 tpu->tpu_refcnt++;
407 KASSERT(tpu->tpu_refcnt != 0); 407 KASSERT(tpu->tpu_refcnt != 0);
408 mutex_exit(&threadpools_lock); 408 mutex_exit(&threadpools_lock);
409 409
410 if (tmp != NULL) { 410 if (tmp != NULL) {
411 threadpool_destroy(&tmp->tpu_pool); 411 threadpool_destroy(&tmp->tpu_pool);
412 kmem_free(tmp, sizeof(*tmp)); 412 kmem_free(tmp, sizeof(*tmp));
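
Note on the locking in threadpool_get() above: it cannot allocate while holding threadpools_lock (kmem_zalloc with KM_SLEEP may sleep), so it drops the lock, allocates, reacquires the lock and looks the priority up again; whichever caller still finds nothing wins and inserts its pool, and the loser frees its copy only after the lock has been released. A condensed, hedged sketch of that pattern, with struct my_obj and the cache_*()/obj_*() helpers as hypothetical stand-ins for the real routines:

	/*
	 * Condensed sketch of the optimistic-creation pattern; struct my_obj
	 * and the cache_*()/obj_*() helpers are hypothetical stand-ins.
	 */
	struct my_obj *obj, *tmp = NULL;

	mutex_enter(&cache_lock);
	obj = cache_lookup(key);
	if (obj == NULL) {
		mutex_exit(&cache_lock);	/* the allocation may sleep */
		tmp = obj_alloc(key);
		mutex_enter(&cache_lock);
		obj = cache_lookup(key);	/* re-check: a racer may have inserted one */
		if (obj == NULL) {
			obj = tmp;		/* we won the creation race */
			tmp = NULL;
			cache_insert(obj);
		}
	}
	obj->refcnt++;				/* take our reference under the lock */
	mutex_exit(&cache_lock);
	if (tmp != NULL)
		obj_free(tmp);			/* lost the race; free outside the lock */
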
@@ -423,27 +423,27 @@ threadpool_put(struct threadpool *pool,
423 container_of(pool, struct threadpool_unbound, tpu_pool); 423 container_of(pool, struct threadpool_unbound, tpu_pool);
424 424
425 THREADPOOL_INIT(); 425 THREADPOOL_INIT();
426 426
427 ASSERT_SLEEPABLE(); 427 ASSERT_SLEEPABLE();
428 428
429 KASSERT(threadpool_pri_is_valid(pri)); 429 KASSERT(threadpool_pri_is_valid(pri));
430 430
431 mutex_enter(&threadpools_lock); 431 mutex_enter(&threadpools_lock);
432 KASSERT(tpu == threadpool_lookup_unbound(pri)); 432 KASSERT(tpu == threadpool_lookup_unbound(pri));
433 KASSERT(0 < tpu->tpu_refcnt); 433 KASSERT(0 < tpu->tpu_refcnt);
434 if (--tpu->tpu_refcnt == 0) { 434 if (--tpu->tpu_refcnt == 0) {
435 TP_LOG(("%s: Last reference for pri=%d, destroying pool.\n", 435 TP_LOG(("%s: Last reference for pri=%d, destroying pool.\n",
436 __func__, (int)pri)); 436 __func__, (int)pri));
437 threadpool_remove_unbound(tpu); 437 threadpool_remove_unbound(tpu);
438 } else { 438 } else {
439 tpu = NULL; 439 tpu = NULL;
440 } 440 }
441 mutex_exit(&threadpools_lock); 441 mutex_exit(&threadpools_lock);
442 442
443 if (tpu) { 443 if (tpu) {
444 threadpool_destroy(&tpu->tpu_pool); 444 threadpool_destroy(&tpu->tpu_pool);
445 kmem_free(tpu, sizeof(*tpu)); 445 kmem_free(tpu, sizeof(*tpu));
446 } 446 }
447} 447}
448 448
449/* Per-CPU thread pools */ 449/* Per-CPU thread pools */
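
threadpool_put() above is the mirror image: the reference is dropped under threadpools_lock, and only the caller that drops the last reference performs the teardown, after the lock has been released. A hedged usage sketch of the pair; the my_pool variable and the PRI_NONE priority are illustrative choices, not requirements:

	/* Hedged usage sketch: acquiring and releasing an unbound pool. */
	struct threadpool *my_pool;
	int error;

	error = threadpool_get(&my_pool, PRI_NONE);	/* may sleep; creates the pool on first use */
	if (error)
		return error;
	/* ... schedule threadpool_jobs on my_pool ... */
	threadpool_put(my_pool, PRI_NONE);		/* last reference destroys the pool */
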
@@ -456,36 +456,36 @@ threadpool_percpu_get(struct threadpool_
456 456
457 THREADPOOL_INIT(); 457 THREADPOOL_INIT();
458 458
459 ASSERT_SLEEPABLE(); 459 ASSERT_SLEEPABLE();
460 460
461 if (! threadpool_pri_is_valid(pri)) 461 if (! threadpool_pri_is_valid(pri))
462 return EINVAL; 462 return EINVAL;
463 463
464 mutex_enter(&threadpools_lock); 464 mutex_enter(&threadpools_lock);
465 pool_percpu = threadpool_lookup_percpu(pri); 465 pool_percpu = threadpool_lookup_percpu(pri);
466 if (pool_percpu == NULL) { 466 if (pool_percpu == NULL) {
467 mutex_exit(&threadpools_lock); 467 mutex_exit(&threadpools_lock);
468 TP_LOG(("%s: No pool for pri=%d, creating one.\n", 468 TP_LOG(("%s: No pool for pri=%d, creating one.\n",
469 __func__, (int)pri)); 469 __func__, (int)pri));
470 error = threadpool_percpu_create(&tmp, pri); 470 error = threadpool_percpu_create(&tmp, pri);
471 if (error) 471 if (error)
472 return error; 472 return error;
473 KASSERT(tmp != NULL); 473 KASSERT(tmp != NULL);
474 mutex_enter(&threadpools_lock); 474 mutex_enter(&threadpools_lock);
475 pool_percpu = threadpool_lookup_percpu(pri); 475 pool_percpu = threadpool_lookup_percpu(pri);
476 if (pool_percpu == NULL) { 476 if (pool_percpu == NULL) {
477 TP_LOG(("%s: Won the creation race for pri=%d.\n", 477 TP_LOG(("%s: Won the creation race for pri=%d.\n",
478 __func__, (int)pri)); 478 __func__, (int)pri));
479 pool_percpu = tmp; 479 pool_percpu = tmp;
480 tmp = NULL; 480 tmp = NULL;
481 threadpool_insert_percpu(pool_percpu); 481 threadpool_insert_percpu(pool_percpu);
482 } 482 }
483 } 483 }
484 KASSERT(pool_percpu != NULL); 484 KASSERT(pool_percpu != NULL);
485 pool_percpu->tpp_refcnt++; 485 pool_percpu->tpp_refcnt++;
486 KASSERT(pool_percpu->tpp_refcnt != 0); 486 KASSERT(pool_percpu->tpp_refcnt != 0);
487 mutex_exit(&threadpools_lock); 487 mutex_exit(&threadpools_lock);
488 488
489 if (tmp != NULL) 489 if (tmp != NULL)
490 threadpool_percpu_destroy(tmp); 490 threadpool_percpu_destroy(tmp);
491 KASSERT(pool_percpu != NULL); 491 KASSERT(pool_percpu != NULL);
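
The per-CPU variant follows the same creation-race discipline, but the caller gets back a struct threadpool_percpu and then picks the pool for the current CPU with threadpool_percpu_ref(), whose definition begins further down. A hedged sketch, with error handling abbreviated and the my_* names purely illustrative:

	/* Hedged sketch: per-CPU pool acquisition. */
	struct threadpool_percpu *my_percpu;
	struct threadpool *my_pool;

	if (threadpool_percpu_get(&my_percpu, PRI_NONE) != 0)
		return;					/* creation failed */
	my_pool = threadpool_percpu_ref(my_percpu);	/* pool for the current CPU */
	/* ... schedule jobs on my_pool ... */
	threadpool_percpu_put(my_percpu, PRI_NONE);
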
@@ -498,27 +498,27 @@ threadpool_percpu_put(struct threadpool_
498{ 498{
499 499
500 THREADPOOL_INIT(); 500 THREADPOOL_INIT();
501 501
502 ASSERT_SLEEPABLE(); 502 ASSERT_SLEEPABLE();
503 503
504 KASSERT(threadpool_pri_is_valid(pri)); 504 KASSERT(threadpool_pri_is_valid(pri));
505 505
506 mutex_enter(&threadpools_lock); 506 mutex_enter(&threadpools_lock);
507 KASSERT(pool_percpu == threadpool_lookup_percpu(pri)); 507 KASSERT(pool_percpu == threadpool_lookup_percpu(pri));
508 KASSERT(0 < pool_percpu->tpp_refcnt); 508 KASSERT(0 < pool_percpu->tpp_refcnt);
509 if (--pool_percpu->tpp_refcnt == 0) { 509 if (--pool_percpu->tpp_refcnt == 0) {
510 TP_LOG(("%s: Last reference for pri=%d, destroying pool.\n", 510 TP_LOG(("%s: Last reference for pri=%d, destroying pool.\n",
511 __func__, (int)pri)); 511 __func__, (int)pri));
512 threadpool_remove_percpu(pool_percpu); 512 threadpool_remove_percpu(pool_percpu);
513 } else { 513 } else {
514 pool_percpu = NULL; 514 pool_percpu = NULL;
515 } 515 }
516 mutex_exit(&threadpools_lock); 516 mutex_exit(&threadpools_lock);
517 517
518 if (pool_percpu) 518 if (pool_percpu)
519 threadpool_percpu_destroy(pool_percpu); 519 threadpool_percpu_destroy(pool_percpu);
520} 520}
521 521
522struct threadpool * 522struct threadpool *
523threadpool_percpu_ref(struct threadpool_percpu *pool_percpu) 523threadpool_percpu_ref(struct threadpool_percpu *pool_percpu)
524{ 524{
@@ -665,33 +665,34 @@ threadpool_job_destroy(struct threadpool
665 job->job_lock = NULL; 665 job->job_lock = NULL;
666 KASSERT(job->job_thread == NULL); 666 KASSERT(job->job_thread == NULL);
667 KASSERT(job->job_refcnt == 0); 667 KASSERT(job->job_refcnt == 0);
668 KASSERT(!cv_has_waiters(&job->job_cv)); 668 KASSERT(!cv_has_waiters(&job->job_cv));
669 cv_destroy(&job->job_cv); 669 cv_destroy(&job->job_cv);
670 job->job_fn = threadpool_job_dead; 670 job->job_fn = threadpool_job_dead;
671 (void)strlcpy(job->job_name, "deadjob", sizeof(job->job_name)); 671 (void)strlcpy(job->job_name, "deadjob", sizeof(job->job_name));
672} 672}
673 673
674static int 674static int
675threadpool_job_hold(struct threadpool_job *job) 675threadpool_job_hold(struct threadpool_job *job)
676{ 676{
677 unsigned int refcnt; 677 unsigned int refcnt;
 678
678 do { 679 do {
679 refcnt = job->job_refcnt; 680 refcnt = job->job_refcnt;
680 if (refcnt == UINT_MAX) 681 if (refcnt == UINT_MAX)
681 return EBUSY; 682 return EBUSY;
682 } while (atomic_cas_uint(&job->job_refcnt, refcnt, (refcnt + 1)) 683 } while (atomic_cas_uint(&job->job_refcnt, refcnt, (refcnt + 1))
683 != refcnt); 684 != refcnt);
684  685
685 return 0; 686 return 0;
686} 687}
687 688
688static void 689static void
689threadpool_job_rele(struct threadpool_job *job) 690threadpool_job_rele(struct threadpool_job *job)
690{ 691{
691 unsigned int refcnt; 692 unsigned int refcnt;
692 693
693 do { 694 do {
694 refcnt = job->job_refcnt; 695 refcnt = job->job_refcnt;
695 KASSERT(0 < refcnt); 696 KASSERT(0 < refcnt);
696 if (refcnt == 1) { 697 if (refcnt == 1) {
697 mutex_enter(job->job_lock); 698 mutex_enter(job->job_lock);
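
threadpool_job_hold() above takes a job reference with an atomic_cas_uint loop, refusing to advance past UINT_MAX so the counter cannot wrap, and threadpool_job_rele() undoes it (the hunk is cut off mid-function here). For orientation, a typical consumer wires a job up roughly as follows; my_softc, sc_lock, sc_job and my_job_fn are hypothetical names, not part of this file:

	/* Hypothetical consumer; struct my_softc holds a kmutex_t sc_lock and a sc_job. */
	static void
	my_job_fn(struct threadpool_job *job)
	{
		struct my_softc *sc = container_of(job, struct my_softc, sc_job);

		/* ... perform the deferred work ... */

		mutex_enter(&sc->sc_lock);
		threadpool_job_done(job);	/* clears job_thread under the interlock */
		mutex_exit(&sc->sc_lock);
	}

	/* At attach time, with sc_lock serving as the job's interlock: */
	threadpool_job_init(&sc->sc_job, my_job_fn, &sc->sc_lock, "myjob");
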
@@ -722,43 +723,43 @@ void
722threadpool_schedule_job(struct threadpool *pool, struct threadpool_job *job) 723threadpool_schedule_job(struct threadpool *pool, struct threadpool_job *job)
723{ 724{
724 725
725 KASSERT(mutex_owned(job->job_lock)); 726 KASSERT(mutex_owned(job->job_lock));
726 727
727 /* 728 /*
728 * If the job's already running, let it keep running. The job 729 * If the job's already running, let it keep running. The job
729 * is guaranteed by the interlock not to end early -- if it had 730 * is guaranteed by the interlock not to end early -- if it had
730 * ended early, threadpool_job_done would have set job_thread 731 * ended early, threadpool_job_done would have set job_thread
731 * to NULL under the interlock. 732 * to NULL under the interlock.
732 */ 733 */
733 if (__predict_true(job->job_thread != NULL)) { 734 if (__predict_true(job->job_thread != NULL)) {
734 TP_LOG(("%s: job '%s' already runnining.\n", 735 TP_LOG(("%s: job '%s' already runnining.\n",
735 __func__, job->job_name)); 736 __func__, job->job_name));
736 return; 737 return;
737 } 738 }
738 739
739 /* Otherwise, try to assign a thread to the job. */ 740 /* Otherwise, try to assign a thread to the job. */
740 mutex_spin_enter(&pool->tp_lock); 741 mutex_spin_enter(&pool->tp_lock);
741 if (__predict_false(TAILQ_EMPTY(&pool->tp_idle_threads))) { 742 if (__predict_false(TAILQ_EMPTY(&pool->tp_idle_threads))) {
742 /* Nobody's idle. Give it to the overseer. */ 743 /* Nobody's idle. Give it to the overseer. */
743 TP_LOG(("%s: giving job '%s' to overseer.\n", 744 TP_LOG(("%s: giving job '%s' to overseer.\n",
744 __func__, job->job_name)); 745 __func__, job->job_name));
745 job->job_thread = &pool->tp_overseer; 746 job->job_thread = &pool->tp_overseer;
746 TAILQ_INSERT_TAIL(&pool->tp_jobs, job, job_entry); 747 TAILQ_INSERT_TAIL(&pool->tp_jobs, job, job_entry);
747 } else { 748 } else {
748 /* Assign it to the first idle thread. */ 749 /* Assign it to the first idle thread. */
749 job->job_thread = TAILQ_FIRST(&pool->tp_idle_threads); 750 job->job_thread = TAILQ_FIRST(&pool->tp_idle_threads);
750 TP_LOG(("%s: giving job '%s' to idle thread %p.\n", 751 TP_LOG(("%s: giving job '%s' to idle thread %p.\n",
751 __func__, job->job_name, job->job_thread)); 752 __func__, job->job_name, job->job_thread));
752 TAILQ_REMOVE(&pool->tp_idle_threads, job->job_thread, 753 TAILQ_REMOVE(&pool->tp_idle_threads, job->job_thread,
753 tpt_entry); 754 tpt_entry);
754 threadpool_job_hold(job); 755 threadpool_job_hold(job);
755 job->job_thread->tpt_job = job; 756 job->job_thread->tpt_job = job;
756 } 757 }
757 758
758 /* Notify whomever we gave it to, overseer or idle thread. */ 759 /* Notify whomever we gave it to, overseer or idle thread. */
759 KASSERT(job->job_thread != NULL); 760 KASSERT(job->job_thread != NULL);
760 cv_broadcast(&job->job_thread->tpt_cv); 761 cv_broadcast(&job->job_thread->tpt_cv);
761 mutex_spin_exit(&pool->tp_lock); 762 mutex_spin_exit(&pool->tp_lock);
762} 763}
763 764
764bool 765bool
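
From the caller's side, threadpool_schedule_job() is invoked with the job's interlock held; as the comment in the hunk above explains, a job that is already running is left alone, and otherwise it goes to an idle thread or onto the overseer's queue. Continuing the hypothetical my_softc consumer, scheduling looks like:

	/* Continuing the hypothetical consumer: kick the job. */
	mutex_enter(&sc->sc_lock);		/* the interlock given to threadpool_job_init() */
	threadpool_schedule_job(my_pool, &sc->sc_job);
	mutex_exit(&sc->sc_lock);
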
@@ -835,38 +836,38 @@ threadpool_overseer_thread(void *arg)
835 836
836 /* Wait until we're initialized. */ 837 /* Wait until we're initialized. */
837 mutex_spin_enter(&pool->tp_lock); 838 mutex_spin_enter(&pool->tp_lock);
838 while (overseer->tpt_lwp == NULL) 839 while (overseer->tpt_lwp == NULL)
839 cv_wait(&overseer->tpt_cv, &pool->tp_lock); 840 cv_wait(&overseer->tpt_cv, &pool->tp_lock);
840 841
841 TP_LOG(("%s: starting.\n", __func__)); 842 TP_LOG(("%s: starting.\n", __func__));
842 843
843 for (;;) { 844 for (;;) {
844 /* Wait until there's a job. */ 845 /* Wait until there's a job. */
845 while (TAILQ_EMPTY(&pool->tp_jobs)) { 846 while (TAILQ_EMPTY(&pool->tp_jobs)) {
846 if (ISSET(pool->tp_flags, THREADPOOL_DYING)) { 847 if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
847 TP_LOG(("%s: THREADPOOL_DYING\n", 848 TP_LOG(("%s: THREADPOOL_DYING\n",
848 __func__)); 849 __func__));
849 break; 850 break;
850 } 851 }
851 cv_wait(&overseer->tpt_cv, &pool->tp_lock); 852 cv_wait(&overseer->tpt_cv, &pool->tp_lock);
852 } 853 }
853 if (__predict_false(TAILQ_EMPTY(&pool->tp_jobs))) 854 if (__predict_false(TAILQ_EMPTY(&pool->tp_jobs)))
854 break; 855 break;
855 856
856 /* If there are no threads, we'll have to try to start one. */ 857 /* If there are no threads, we'll have to try to start one. */
857 if (TAILQ_EMPTY(&pool->tp_idle_threads)) { 858 if (TAILQ_EMPTY(&pool->tp_idle_threads)) {
858 TP_LOG(("%s: Got a job, need to create a thread.\n", 859 TP_LOG(("%s: Got a job, need to create a thread.\n",
859 __func__)); 860 __func__));
860 threadpool_hold(pool); 861 threadpool_hold(pool);
861 mutex_spin_exit(&pool->tp_lock); 862 mutex_spin_exit(&pool->tp_lock);
862 863
863 struct threadpool_thread *const thread = 864 struct threadpool_thread *const thread =
864 pool_cache_get(threadpool_thread_pc, PR_WAITOK); 865 pool_cache_get(threadpool_thread_pc, PR_WAITOK);
865 thread->tpt_lwp = NULL; 866 thread->tpt_lwp = NULL;
866 thread->tpt_pool = pool; 867 thread->tpt_pool = pool;
867 thread->tpt_job = NULL; 868 thread->tpt_job = NULL;
868 cv_init(&thread->tpt_cv, "poolthrd"); 869 cv_init(&thread->tpt_cv, "poolthrd");
869 870
870 ktflags = 0; 871 ktflags = 0;
871 ktflags |= KTHREAD_MPSAFE; 872 ktflags |= KTHREAD_MPSAFE;
872 if (pool->tp_pri < PRI_KERNEL) 873 if (pool->tp_pri < PRI_KERNEL)
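
The overseer's thread-creation path above runs outside tp_lock: it takes a pool reference, gets a threadpool_thread from the pool cache, initializes its condvar, and builds up kthread flags starting from KTHREAD_MPSAFE; the priority-dependent part is cut off by the hunk boundary. As general orientation only, creating an MPSAFE kernel thread with kthread_create(9) looks roughly like the following hedged sketch; my_thread_fn, my_arg and the thread name are illustrative, and this is not the file's actual continuation:

	/* Hedged, generic sketch; not the continuation of the hunk above. */
	lwp_t *l;
	int error;

	error = kthread_create(PRI_NONE, KTHREAD_MPSAFE, NULL /* any CPU */,
	    my_thread_fn, my_arg, &l, "mythread");
	if (error != 0)
		printf("kthread_create failed: %d\n", error);
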
@@ -912,39 +913,39 @@ threadpool_overseer_thread(void *arg)
912 mutex_spin_exit(&pool->tp_lock); 913 mutex_spin_exit(&pool->tp_lock);
913 914
914 mutex_enter(job->job_lock); 915 mutex_enter(job->job_lock);
915 /* If the job was cancelled, we'll no longer be its thread. */ 916 /* If the job was cancelled, we'll no longer be its thread. */
916 if (__predict_true(job->job_thread == overseer)) { 917 if (__predict_true(job->job_thread == overseer)) {
917 mutex_spin_enter(&pool->tp_lock); 918 mutex_spin_enter(&pool->tp_lock);
918 if (__predict_false( 919 if (__predict_false(
919 TAILQ_EMPTY(&pool->tp_idle_threads))) { 920 TAILQ_EMPTY(&pool->tp_idle_threads))) {
920 /* 921 /*
921 * Someone else snagged the thread 922 * Someone else snagged the thread
922 * first. We'll have to try again. 923 * first. We'll have to try again.
923 */ 924 */
924 TP_LOG(("%s: '%s' lost race to use idle thread.\n", 925 TP_LOG(("%s: '%s' lost race to use idle thread.\n",
925 __func__, job->job_name)); 926 __func__, job->job_name));
926 TAILQ_INSERT_HEAD(&pool->tp_jobs, job, 927 TAILQ_INSERT_HEAD(&pool->tp_jobs, job,
927 job_entry); 928 job_entry);
928 } else { 929 } else {
929 /* 930 /*
930 * Assign the job to the thread and 931 * Assign the job to the thread and
931 * wake the thread so it starts work. 932 * wake the thread so it starts work.
932 */ 933 */
933 struct threadpool_thread *const thread = 934 struct threadpool_thread *const thread =
934 TAILQ_FIRST(&pool->tp_idle_threads); 935 TAILQ_FIRST(&pool->tp_idle_threads);
935 936
936 TP_LOG(("%s: '%s' gets thread %p\n", 937 TP_LOG(("%s: '%s' gets thread %p\n",
937 __func__, job->job_name, thread)); 938 __func__, job->job_name, thread));
938 KASSERT(thread->tpt_job == NULL); 939 KASSERT(thread->tpt_job == NULL);
939 TAILQ_REMOVE(&pool->tp_idle_threads, thread, 940 TAILQ_REMOVE(&pool->tp_idle_threads, thread,
940 tpt_entry); 941 tpt_entry);
941 thread->tpt_job = job; 942 thread->tpt_job = job;
942 job->job_thread = thread; 943 job->job_thread = thread;
943 cv_broadcast(&thread->tpt_cv); 944 cv_broadcast(&thread->tpt_cv);
944 /* Gave the thread our job reference. */ 945 /* Gave the thread our job reference. */
945 rele_job = false; 946 rele_job = false;
946 } 947 }
947 mutex_spin_exit(&pool->tp_lock); 948 mutex_spin_exit(&pool->tp_lock);
948 } 949 }
949 mutex_exit(job->job_lock); 950 mutex_exit(job->job_lock);
950 if (__predict_false(rele_job)) 951 if (__predict_false(rele_job))
@@ -973,45 +974,45 @@ threadpool_thread(void *arg)
973 /* Wait until we're initialized and on the queue. */ 974 /* Wait until we're initialized and on the queue. */
974 mutex_spin_enter(&pool->tp_lock); 975 mutex_spin_enter(&pool->tp_lock);
975 while (thread->tpt_lwp == NULL) 976 while (thread->tpt_lwp == NULL)
976 cv_wait(&thread->tpt_cv, &pool->tp_lock); 977 cv_wait(&thread->tpt_cv, &pool->tp_lock);
977 978
978 TP_LOG(("%s: starting.\n", __func__)); 979 TP_LOG(("%s: starting.\n", __func__));
979 980
980 KASSERT(thread->tpt_lwp == curlwp); 981 KASSERT(thread->tpt_lwp == curlwp);
981 for (;;) { 982 for (;;) {
982 /* Wait until we are assigned a job. */ 983 /* Wait until we are assigned a job. */
983 while (thread->tpt_job == NULL) { 984 while (thread->tpt_job == NULL) {
984 if (ISSET(pool->tp_flags, THREADPOOL_DYING)) { 985 if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
985 TP_LOG(("%s: THREADPOOL_DYING\n", 986 TP_LOG(("%s: THREADPOOL_DYING\n",
986 __func__)); 987 __func__));
987 break; 988 break;
988 } 989 }
989 if (cv_timedwait(&thread->tpt_cv, &pool->tp_lock, 990 if (cv_timedwait(&thread->tpt_cv, &pool->tp_lock,
990 THREADPOOL_IDLE_TICKS)) 991 THREADPOOL_IDLE_TICKS))
991 break; 992 break;
992 } 993 }
993 if (__predict_false(thread->tpt_job == NULL)) { 994 if (__predict_false(thread->tpt_job == NULL)) {
994 TAILQ_REMOVE(&pool->tp_idle_threads, thread, 995 TAILQ_REMOVE(&pool->tp_idle_threads, thread,
995 tpt_entry); 996 tpt_entry);
996 break; 997 break;
997 } 998 }
998 999
999 struct threadpool_job *const job = thread->tpt_job; 1000 struct threadpool_job *const job = thread->tpt_job;
1000 KASSERT(job != NULL); 1001 KASSERT(job != NULL);
1001 mutex_spin_exit(&pool->tp_lock); 1002 mutex_spin_exit(&pool->tp_lock);
1002 1003
1003 TP_LOG(("%s: running job '%s' on thread %p.\n", 1004 TP_LOG(("%s: running job '%s' on thread %p.\n",
1004 __func__, job->job_name, thread)); 1005 __func__, job->job_name, thread));
1005 1006
1006 /* Set our lwp name to reflect what job we're doing. */ 1007 /* Set our lwp name to reflect what job we're doing. */
1007 lwp_lock(curlwp); 1008 lwp_lock(curlwp);
1008 char *const lwp_name = curlwp->l_name; 1009 char *const lwp_name = curlwp->l_name;
1009 curlwp->l_name = job->job_name; 1010 curlwp->l_name = job->job_name;
1010 lwp_unlock(curlwp); 1011 lwp_unlock(curlwp);
1011 1012
1012 /* Run the job. */ 1013 /* Run the job. */
1013 (*job->job_fn)(job); 1014 (*job->job_fn)(job);
1014 1015
1015 /* Restore our lwp name. */ 1016 /* Restore our lwp name. */
1016 lwp_lock(curlwp); 1017 lwp_lock(curlwp);
1017 curlwp->l_name = lwp_name; 1018 curlwp->l_name = lwp_name;