Index: kern_task.c =================================================================== RCS file: /cvs/src/sys/kern/kern_task.c,v retrieving revision 1.27 diff -u -p -r1.27 kern_task.c --- kern_task.c 19 Dec 2019 17:40:11 -0000 1.27 +++ kern_task.c 5 Jun 2020 01:05:09 -0000 @@ -36,6 +36,14 @@ static struct lock_type taskq_lock_type #endif /* WITNESS */ +struct taskq_thread { + SLIST_ENTRY(taskq_thread) + tt_entry; + struct proc *tt_thread; +}; + +SLIST_HEAD(taskq_threads, taskq_thread); + struct taskq { enum { TQ_S_CREATED, @@ -43,12 +51,12 @@ struct taskq { TQ_S_DESTROYED } tq_state; unsigned int tq_running; - unsigned int tq_waiting; unsigned int tq_nthreads; unsigned int tq_flags; const char *tq_name; struct mutex tq_mtx; + struct taskq_threads tq_threads; struct task_list tq_worklist; #ifdef WITNESS struct lock_object tq_lock_object; @@ -60,11 +68,11 @@ static const char taskq_sys_name[] = "sy struct taskq taskq_sys = { TQ_S_CREATED, 0, - 0, 1, 0, taskq_sys_name, MUTEX_INITIALIZER(IPL_HIGH), + SLIST_HEAD_INITIALIZER(taskq_sys.tq_threads), TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist), #ifdef WITNESS { @@ -79,11 +87,11 @@ static const char taskq_sys_mp_name[] = struct taskq taskq_sys_mp = { TQ_S_CREATED, 0, - 0, 1, TASKQ_MPSAFE, taskq_sys_mp_name, MUTEX_INITIALIZER(IPL_HIGH), + SLIST_HEAD_INITIALIZER(taskq_sys_mp.tq_threads), TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist), #ifdef WITNESS { @@ -98,7 +106,6 @@ struct taskq *const systqmp = &taskq_sys void taskq_init(void); /* called in init_main.c */ void taskq_create_thread(void *); -void taskq_barrier_task(void *); int taskq_sleep(const volatile void *, struct mutex *, int, const char *, int); int taskq_next_work(struct taskq *, struct task *); @@ -125,12 +132,12 @@ taskq_create(const char *name, unsigned tq->tq_state = TQ_S_CREATED; tq->tq_running = 0; - tq->tq_waiting = 0; tq->tq_nthreads = nthreads; tq->tq_name = name; tq->tq_flags = flags; mtx_init_flags(&tq->tq_mtx, ipl, name, 0); + SLIST_INIT(&tq->tq_threads); TAILQ_INIT(&tq->tq_worklist); #ifdef WITNESS @@ -220,40 +227,84 @@ taskq_create_thread(void *arg) mtx_leave(&tq->tq_mtx); } -void -taskq_barrier(struct taskq *tq) +struct taskq_barrier { + struct taskq *tb_tq; + struct task tb_t; + struct rwlock tb_lock; + unsigned int tb_nthreads; +}; + +static void +taskq_barrier_wait(struct taskq_barrier *tb) { - struct cond c = COND_INITIALIZER(); - struct task t = TASK_INITIALIZER(taskq_barrier_task, &c); + struct taskq *tq = tb->tb_tq; - WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL); + if (tb->tb_nthreads < tq->tq_nthreads) { + /* tell the next thread we're waiting for it */ + task_add(tq, &tb->tb_t); + + do { + rwsleep_nsec(tb, &tb->tb_lock, PWAIT, "tqbar", INFSLP); + } while (tb->tb_nthreads < tq->tq_nthreads); + } +} + +static void +taskq_barrier_task(void *p) +{ + struct taskq_barrier *tb = p; - SET(t.t_flags, TASK_BARRIER); - task_add(tq, &t); - cond_wait(&c, "tqbar"); + rw_enter_write(&tb->tb_lock); + tb->tb_nthreads++; + taskq_barrier_wait(tb); + rw_exit_write(&tb->tb_lock); + + wakeup_one(tb); /* unwind */ +} + +static void +taskq_do_barrier(struct taskq *tq) +{ + struct taskq_barrier tb = { + .tb_tq = tq, + .tb_t = TASK_INITIALIZER(taskq_barrier_task, &tb), + .tb_lock = RWLOCK_INITIALIZER("tqbarlk"), + .tb_nthreads = 0, + }; + struct proc *thread = curproc; + struct taskq_thread *tt; + + mtx_enter(&tq->tq_mtx); + SLIST_FOREACH(tt, &tq->tq_threads, tt_entry) { + if (tt->tt_thread == thread) { + tb.tb_nthreads++; + break; + } + } + mtx_leave(&tq->tq_mtx); + + rw_enter_write(&tb.tb_lock); + taskq_barrier_wait(&tb); + rw_exit_write(&tb.tb_lock); } void -taskq_del_barrier(struct taskq *tq, struct task *del) +taskq_barrier(struct taskq *tq) { - struct cond c = COND_INITIALIZER(); - struct task t = TASK_INITIALIZER(taskq_barrier_task, &c); - WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL); - if (task_del(tq, del)) - return; - - SET(t.t_flags, TASK_BARRIER); - task_add(tq, &t); - cond_wait(&c, "tqbar"); + taskq_do_barrier(tq); } void -taskq_barrier_task(void *p) +taskq_del_barrier(struct taskq *tq, struct task *t) { - struct cond *c = p; - cond_signal(c); + WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL); + + if (task_del(tq, t)) + return; + + taskq_do_barrier(tq); } void @@ -311,30 +362,13 @@ taskq_next_work(struct taskq *tq, struct struct task *next; mtx_enter(&tq->tq_mtx); -retry: while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) { if (tq->tq_state != TQ_S_RUNNING) { mtx_leave(&tq->tq_mtx); return (0); } - tq->tq_waiting++; msleep_nsec(tq, &tq->tq_mtx, PWAIT, "bored", INFSLP); - tq->tq_waiting--; - } - - if (ISSET(next->t_flags, TASK_BARRIER)) { - /* - * Make sure all other threads are sleeping before we - * proceed and run the barrier task. - */ - if (++tq->tq_waiting == tq->tq_nthreads) { - tq->tq_waiting--; - } else { - msleep_nsec(tq, &tq->tq_mtx, PWAIT, "tqblk", INFSLP); - tq->tq_waiting--; - goto retry; - } } TAILQ_REMOVE(&tq->tq_worklist, next, t_entry); @@ -354,6 +388,7 @@ retry: void taskq_thread(void *xtq) { + struct taskq_thread self = { .tt_thread = curproc }; struct taskq *tq = xtq; struct task work; int last; @@ -361,6 +396,10 @@ taskq_thread(void *xtq) if (ISSET(tq->tq_flags, TASKQ_MPSAFE)) KERNEL_UNLOCK(); + mtx_enter(&tq->tq_mtx); + SLIST_INSERT_HEAD(&tq->tq_threads, &self, tt_entry); + mtx_leave(&tq->tq_mtx); + WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL); while (taskq_next_work(tq, &work)) { @@ -371,6 +410,7 @@ taskq_thread(void *xtq) } mtx_enter(&tq->tq_mtx); + SLIST_REMOVE(&tq->tq_threads, &self, taskq_thread, tt_entry); last = (--tq->tq_running == 0); mtx_leave(&tq->tq_mtx);