Index: share/man/man9/task_add.9 =================================================================== RCS file: /cvs/src/share/man/man9/task_add.9,v retrieving revision 1.16 diff -u -p -r1.16 task_add.9 --- share/man/man9/task_add.9 14 Sep 2015 15:14:55 -0000 1.16 +++ share/man/man9/task_add.9 6 Dec 2015 03:39:03 -0000 @@ -18,15 +18,23 @@ .Dt TASK_ADD 9 .Os .Sh NAME +.Nm task_set , +.Nm TASK_INITIALIZER , .Nm taskq_create , .Nm taskq_destroy , -.Nm task_set , +.Nm taskq_barrier , .Nm task_add , .Nm task_del , -.Nm TASK_INITIALIZER -.Nd task queues +.Nm taskctx_init , +.Nm TASKCTX_INITIALIZER , +.Nm taskctx_barrier , +.Nm task_dispatch +.Nd task queues and contexts .Sh SYNOPSIS .In sys/task.h +.Ft void +.Fn task_set "struct task *t" "void (*fn)(void *)" "void *arg" +.Fn TASK_INITIALIZER "void (*fn)(void *)" "void *arg" .Ft struct taskq * .Fo taskq_create .Fa "const char *name" @@ -37,19 +45,74 @@ .Ft void .Fn taskq_destroy "struct taskq *tq" .Ft void -.Fn task_set "struct task *t" "void (*fn)(void *)" "void *arg" +.Fn taskq_barrier "struct taskq *tq" .Ft int .Fn task_add "struct taskq *tq" "struct task *t" .Ft int .Fn task_del "struct taskq *tq" "struct task *t" +.Ft void +.Fn taskctx_init "struct taskctx *tc" "int ipl" +.Fn TASKCTX_INITIALIZER "struct taskctx self" "int ipl" +.Ft void +.Fn taskctx_barrier "struct taskctx *tc" +.Ft void +.Fn task_dispatch "struct taskctx *tc" "struct task *t" .Vt extern struct taskq *const systq; .Vt extern struct taskq *const systqmp; -.Fn TASK_INITIALIZER "void (*fn)(void *)" "void *arg" .Sh DESCRIPTION The taskq API provides a mechanism to defer work to a process context. .Pp +The +taskctx +API provides a mechanism to serialise work in a single context. +A taskctx guarantees that all work submitted to it will not run +concurrently and can therefore provide exclusive access to a resource. +It attempts to run the submitted work immediately, unless another CPU is already running work in the taskctx. 
+The other CPU will then run all the submitted work on behalf of the +rest of the system. +.Ss TASKS +Work is represented in both the +taskq +and +taskctx +APIs by task structures. +It is up to the user of the APIs to allocate the task structures. +.Pp +.Fn task_set +prepares the task structure +.Fa t +to be used in a +taskq +with +.Fn task_add +and +.Fn task_del , +or for use in a +taskctx +with +.Fn task_dispatch . +.Fa t +will be prepared to call the function +.Fa fn +with the argument specified by +.Fa arg . +.Pp +A task declaration can be initialised with the +.Fn TASK_INITIALIZER +macro. +The task will be prepared to call the function specified by the +.Fa fn +argument with the +.Fa void * +argument given in +.Fa arg . +.Pp +A task may only be used exclusively by a single taskq or taskctx +at a time. +.Ss TASKQS .Fn taskq_create allocates a taskq and a set of threads to be used to complete work that would be inappropriate for the shared system taskq. @@ -88,33 +151,14 @@ Calling against the system taskq is an error and will lead to undefined behaviour or a system fault. .Pp -It is the responsibility of the caller to provide the -.Fn task_set , -.Fn task_add , -and -.Fn task_del -functions with pre-allocated task structures. -.Pp -.Fn task_set -prepares the task structure -.Fa t -to be used in future calls to -.Fn task_add -and -.Fn task_del . -.Fa t -will be prepared to call the function -.Fa fn -with the argument specified by -.Fa arg . -Once initialised, the -.Fa t -structure can be used repeatedly in calls to -.Fn task_add -and -.Fn task_del -and does not need to be reinitialised unless the function called -and/or its argument must change. +.Fn taskq_barrier +guarantees that any task that was running on the +.Fa tq +taskq when the barrier was called has finished by the time the barrier +returns. +.Fn taskq_barrier +is only supported on taskqs serviced by 1 thread, +and may not be called by a task running in the specified taskq. 
.Pp .Fn task_add schedules the execution of the work specified by the @@ -148,25 +192,60 @@ They can both be used by any subsystem f They are serviced by a single thread and can therefore provide predictable ordering of work. Work can be scheduled on the system taskqs from callers at or below IPL_HIGH. +.Ss TASKCTXS +.Fn taskctx_init +initialises the task context +.Fa tc +for work to be dispatched to. +.Fa ipl +specifies the highest interrupt protection level at which +.Fn task_dispatch +will be called against the initialised taskctx. +See +.Xr spl 9 +for a list of the IPLs. .Pp -A task declaration can be initialised with the -.Fn TASK_INITIALIZER +A taskctx declaration can be initialised with the +.Fn TASKCTX_INITIALIZER macro. -The task will be prepared to call the function specified by the -.Fa fn -argument with the -.Fa void * -argument given in -.Fa arg . +The taskctx being initialised must be passed as +.Fa self . +.Fa ipl +specifies the highest interrupt protection level at which +.Fn task_dispatch +will be called against the initialised taskctx. +.Pp +.Fn taskctx_barrier +guarantees that any task that was running in the +.Fa tc +taskctx when the barrier was called has finished by the time the barrier +returns. +.Pp +.Fn task_dispatch +submits work represented by +.Fa t +to be run within the serialised context of the +.Fa tc +taskctx. +The task structure must already be initialised by +.Fn task_set . .Sh CONTEXT .Fn taskq_create and .Fn taskq_destroy can be called during autoconf, or from process context. +.Pp +.Fn taskq_barrier +and +.Fn taskctx_barrier +can be called from process context. +.Pp +.Fn taskctx_init , .Fn task_set , .Fn task_add , +.Fn task_del , and -.Fn task_del +.Fn task_dispatch can be called during autoconf, from process context, or from interrupt context. 
.Sh RETURN VALUES .Fn taskq_create Index: share/man/man9/Makefile =================================================================== RCS file: /cvs/src/share/man/man9/Makefile,v retrieving revision 1.262 diff -u -p -r1.262 Makefile --- share/man/man9/Makefile 25 Nov 2015 03:09:57 -0000 1.262 +++ share/man/man9/Makefile 6 Dec 2015 03:39:03 -0000 @@ -397,9 +397,14 @@ MLINKS+=systrace.9 systrace_redirect.9 \ systrace.9 systrace_fork.9 systrace.9 systrace_exit.9 MLINKS+=task_add.9 taskq_create.9 \ task_add.9 taskq_destroy.9 \ + task_add.9 taskq_barrier.9 \ task_add.9 task_set.9 \ task_add.9 task_del.9 \ - task_add.9 TASK_INITIALIZER.9 + task_add.9 TASK_INITIALIZER.9 \ + task_add.9 taskctx_init.9 \ + task_add.9 TASKCTX_INITIALIZER.9 \ + task_add.9 taskctx_barrier.9 \ + task_add.9 task_dispatch.9 MLINKS+=time.9 boottime.9 time.9 mono_time.9 time.9 runtime.9 MLINKS+=timeout.9 timeout_add.9 timeout.9 timeout_set.9 \ timeout.9 timeout_pending.9 timeout.9 timeout_del.9 \ Index: sys/dev/pci/if_myx.c =================================================================== RCS file: /cvs/src/sys/dev/pci/if_myx.c,v retrieving revision 1.90 diff -u -p -r1.90 if_myx.c --- sys/dev/pci/if_myx.c 3 Dec 2015 12:45:56 -0000 1.90 +++ sys/dev/pci/if_myx.c 6 Dec 2015 03:39:03 -0000 @@ -34,6 +34,7 @@ #include #include #include +#include #include #include @@ -151,6 +152,7 @@ struct myx_softc { u_int sc_tx_prod; u_int sc_tx_cons; struct myx_slot *sc_tx_slots; + struct task sc_tx_restart; struct ifmedia sc_media; @@ -527,6 +529,8 @@ myx_attachhook(void *arg) IFCAP_CSUM_UDPv4; #endif + task_set(&sc->sc_tx_restart, if_restart_task, ifp); + ifmedia_init(&sc->sc_media, 0, myx_media_change, myx_media_status); ifmedia_add(&sc->sc_media, IFM_ETHER | IFM_AUTO, 0, NULL); ifmedia_set(&sc->sc_media, IFM_ETHER | IFM_AUTO); @@ -1702,9 +1706,8 @@ myx_txeof(struct myx_softc *sc, u_int32_ sc->sc_tx_ring_cons = idx; sc->sc_tx_cons = cons; - ifq_clr_oactive(&ifp->if_snd); if (!ifq_empty(&ifp->if_snd)) - 
if_start(ifp); + if_restart(ifp, &sc->sc_tx_restart); } void Index: sys/kern/kern_task.c =================================================================== RCS file: /cvs/src/sys/kern/kern_task.c,v retrieving revision 1.15 diff -u -p -r1.15 kern_task.c --- sys/kern/kern_task.c 19 Nov 2015 13:19:24 -0000 1.15 +++ sys/kern/kern_task.c 6 Dec 2015 03:39:03 -0000 @@ -22,6 +22,8 @@ #include #include #include +#include +#include #define TASK_ONQUEUE 1 @@ -73,6 +75,10 @@ int taskq_sleep(const volatile void *, s int taskq_next_work(struct taskq *, struct task *, sleepfn); void taskq_thread(void *); +void taskctx_run(struct taskctx *); + +void task_barrier(void *); + void taskq_init(void) { @@ -179,6 +185,30 @@ taskq_create_thread(void *arg) } void +taskq_barrier(struct taskq *tq) +{ + struct sleep_state sls; + unsigned int notdone = 1; + struct task t = TASK_INITIALIZER(task_barrier, ¬done); + + task_add(tq, &t); + + while (notdone) { + sleep_setup(&sls, ¬done, PWAIT, "tqbar"); + sleep_finish(&sls, notdone); + } +} + +void +task_barrier(void *p) +{ + unsigned int *notdone = p; + + *notdone = 0; + wakeup_one(notdone); +} + +void task_set(struct task *t, void (*fn)(void *), void *arg) { t->t_func = fn; @@ -305,4 +335,99 @@ taskq_thread(void *xtq) wakeup_one(&tq->tq_running); kthread_exit(0); +} + +void +taskctx_init(struct taskctx *tc, int ipl) +{ + mtx_init(&tc->tc_mtx, ipl); + TAILQ_INIT(&tc->tc_worklist); + tc->tc_serializer = 0; +} + +void +task_dispatch(struct taskctx *tc, struct task *t) +{ + if (ISSET(t->t_flags, TASK_ONQUEUE)) + return; + + mtx_enter(&tc->tc_mtx); + if (!ISSET(t->t_flags, TASK_ONQUEUE)) { + SET(t->t_flags, TASK_ONQUEUE); + TAILQ_INSERT_TAIL(&tc->tc_worklist, t, t_entry); + } + mtx_leave(&tc->tc_mtx); + + taskctx_run(tc); +} + +void +taskctx_barrier(struct taskctx *tc) +{ + struct sleep_state sls; + unsigned int notdone = 1; + struct task t = TASK_INITIALIZER(task_barrier, ¬done); + + if (tc->tc_serializer == 0) + return; + + task_dispatch(tc, &t); + 
+ while (notdone) { + sleep_setup(&sls, ¬done, PWAIT, "tcbar"); + sleep_finish(&sls, notdone); + } +} + +static inline unsigned int +taskctx_enter(struct taskctx *tc) +{ + return (atomic_inc_int_nv(&tc->tc_serializer) == 1); +} + +static inline unsigned int +taskctx_leave(struct taskctx *tc) +{ + if (atomic_cas_uint(&tc->tc_serializer, 1, 0) == 1) + return (1); + + tc->tc_serializer = 1; + + return (0); +} + +static inline int +taskctx_next_work(struct taskctx *tc, struct task *work) +{ + struct task *t; + int rv = 0; + + mtx_enter(&tc->tc_mtx); + t = TAILQ_FIRST(&tc->tc_worklist); + if (t != NULL) { + TAILQ_REMOVE(&tc->tc_worklist, t, t_entry); + CLR(t->t_flags, TASK_ONQUEUE); + + *work = *t; /* copy to caller to avoid races */ + + rv = 1; + } + mtx_leave(&tc->tc_mtx); + + return (rv); +} + +void +taskctx_run(struct taskctx *tc) +{ + struct task work; + + if (!taskctx_enter(tc)) + return; + + do { + while (taskctx_next_work(tc, &work)) + (*work.t_func)(work.t_arg); + + } while (!taskctx_leave(tc)); } Index: sys/net/if.c =================================================================== RCS file: /cvs/src/sys/net/if.c,v retrieving revision 1.422 diff -u -p -r1.422 if.c --- sys/net/if.c 5 Dec 2015 10:07:55 -0000 1.422 +++ sys/net/if.c 6 Dec 2015 03:39:03 -0000 @@ -153,8 +153,9 @@ void if_input_process(void *); void ifa_print_all(void); #endif -void if_start_mpsafe(struct ifnet *ifp); -void if_start_locked(struct ifnet *ifp); +void if_start_mpsafe(struct ifnet *); +void if_start_locked(struct ifnet *); +void if_start_task(void *); /* * interface index map @@ -513,6 +514,7 @@ if_attach_common(struct ifnet *ifp) TAILQ_INIT(&ifp->if_maddrlist); ifq_init(&ifp->if_snd); + task_set(&ifp->if_start_ctx, if_start_task, ifp); ifp->if_addrhooks = malloc(sizeof(*ifp->if_addrhooks), M_TEMP, M_WAITOK); @@ -557,66 +559,50 @@ if_start_locked(struct ifnet *ifp) KERNEL_UNLOCK(); } -static inline unsigned int -ifq_enter(struct ifqueue *ifq) +void +if_start_mpsafe(struct ifnet *ifp) { 
- return (atomic_inc_int_nv(&ifq->ifq_serializer) == 1); + task_dispatch(&ifp->if_snd.ifq_serializer, &ifp->if_start_ctx); } -static inline unsigned int -ifq_leave(struct ifqueue *ifq) +void +if_start_task(void *p) { - if (atomic_cas_uint(&ifq->ifq_serializer, 1, 0) == 1) - return (1); + struct ifnet *ifp = p; - ifq->ifq_serializer = 1; + if (!ISSET(ifp->if_flags, IFF_RUNNING) || + ifq_empty(&ifp->if_snd) || + ifq_is_oactive(&ifp->if_snd)) + return; - return (0); + ifp->if_start(ifp); } void -if_start_mpsafe(struct ifnet *ifp) +if_start_barrier(struct ifnet *ifp) { - struct ifqueue *ifq = &ifp->if_snd; - - if (!ifq_enter(ifq)) - return; - - do { - if (__predict_false(!ISSET(ifp->if_flags, IFF_RUNNING))) { - ifq->ifq_serializer = 0; - wakeup_one(&ifq->ifq_serializer); - return; - } - - if (ifq_empty(ifq) || ifq_is_oactive(ifq)) - continue; - - ifp->if_start(ifp); + taskctx_barrier(&ifp->if_snd.ifq_serializer); +} - } while (!ifq_leave(ifq)); +void +if_restart(struct ifnet *ifp, struct task *t) +{ + task_dispatch(&ifp->if_snd.ifq_serializer, t); } void -if_start_barrier(struct ifnet *ifp) +if_restart_task(void *p) { - struct sleep_state sls; - struct ifqueue *ifq = &ifp->if_snd; + struct ifnet *ifp = p; - /* this should only be called from converted drivers */ KASSERT(ISSET(ifp->if_xflags, IFXF_MPSAFE)); - /* drivers should only call this on the way down */ - KASSERT(!ISSET(ifp->if_flags, IFF_RUNNING)); - - if (ifq->ifq_serializer == 0) + ifq_clr_oactive(&ifp->if_snd); + if (!ISSET(ifp->if_flags, IFF_RUNNING) || + ifq_empty(&ifp->if_snd)) return; - if_start_mpsafe(ifp); /* spin the wheel to guarantee a wakeup */ - do { - sleep_setup(&sls, &ifq->ifq_serializer, PWAIT, "ifbar"); - sleep_finish(&sls, ifq->ifq_serializer != 0); - } while (ifq->ifq_serializer != 0); + ifp->if_start(ifp); } int @@ -834,8 +820,8 @@ if_input_process(void *xmq) struct mbuf *m; struct ifnet *ifp; struct ifih *ifih; + struct ifref r; struct srpl_iter i; - int s; mq_delist(mq, &ml); if 
(ml_empty(&ml)) @@ -843,12 +829,11 @@ if_input_process(void *xmq) add_net_randomness(ml_len(&ml)); - s = splnet(); while ((m = ml_dequeue(&ml)) != NULL) { - ifp = if_get(m->m_pkthdr.ph_ifidx); + ifp = if_enter(&r, m->m_pkthdr.ph_ifidx); if (ifp == NULL) { m_freem(m); - continue; + goto next; } /* @@ -864,9 +849,9 @@ if_input_process(void *xmq) if (ifih == NULL) m_freem(m); - if_put(ifp); +next: + if_leave(&r, ifp); } - splx(s); } void @@ -1535,28 +1520,49 @@ ifunit(const char *name) return (NULL); } -/* - * Map interface index to interface structure pointer. - */ struct ifnet * -if_get(unsigned int index) +if_enter(struct ifref *r, unsigned int index) { struct if_map *if_map; struct srp *map; - struct ifnet *ifp = NULL; if_map = srp_enter(&if_idxmap.map); if (index < if_map->limit) { map = (struct srp *)(if_map + 1); - ifp = srp_follow(&if_idxmap.map, if_map, &map[index]); - if (ifp != NULL) { - KASSERT(ifp->if_index == index); - if_ref(ifp); - } - srp_leave(&map[index], ifp); - } else - srp_leave(&if_idxmap.map, if_map); + r->p = &map[index]; + return (srp_follow(&if_idxmap.map, if_map, r->p)); + } + srp_leave(&if_idxmap.map, if_map); + + r->p = NULL; + return (NULL); +} + +void +if_leave(struct ifref *r, struct ifnet *ifp) +{ + if (r->p == NULL) + return; + + srp_leave(r->p, ifp); +} + +/* + * Map interface index to interface structure pointer. 
+ */ +struct ifnet * +if_get(unsigned int index) +{ + struct ifref r; + struct ifnet *ifp; + + ifp = if_enter(&r, index); + if (ifp != NULL) { + KASSERT(ifp->if_index == index); + if_ref(ifp); + } + if_leave(&r, ifp); return (ifp); } @@ -2992,6 +2998,7 @@ ifq_purge(struct ifqueue *ifq) void ifq_init(struct ifqueue *ifq) { + taskctx_init(&ifq->ifq_serializer, IPL_NET); mtx_init(&ifq->ifq_mtx, IPL_NET); ifq->ifq_drops = 0; @@ -2999,7 +3006,6 @@ ifq_init(struct ifqueue *ifq) ifq->ifq_ops = &priq_ops; ifq->ifq_q = priq_ops.ifqop_alloc(NULL); - ifq->ifq_serializer = 0; ifq->ifq_len = 0; if (ifq->ifq_maxlen == 0) Index: sys/net/if.h =================================================================== RCS file: /cvs/src/sys/net/if.h,v retrieving revision 1.175 diff -u -p -r1.175 if.h --- sys/net/if.h 5 Dec 2015 19:04:37 -0000 1.175 +++ sys/net/if.h 6 Dec 2015 03:39:03 -0000 @@ -450,6 +450,10 @@ struct if_afreq { struct socket; struct ifnet; +struct ifref { + struct srp *p; +}; + void if_alloc_sadl(struct ifnet *); void if_free_sadl(struct ifnet *); void if_attach(struct ifnet *); @@ -471,6 +475,8 @@ int if_addgroup(struct ifnet *, const ch int if_delgroup(struct ifnet *, const char *); void if_group_routechange(struct sockaddr *, struct sockaddr *); struct ifnet *ifunit(const char *); +struct ifnet *if_enter(struct ifref *, unsigned int); +void if_leave(struct ifref *, struct ifnet *); struct ifnet *if_get(unsigned int); void if_put(struct ifnet *); void ifnewlladdr(struct ifnet *); Index: sys/net/if_var.h =================================================================== RCS file: /cvs/src/sys/net/if_var.h,v retrieving revision 1.64 diff -u -p -r1.64 if_var.h --- sys/net/if_var.h 5 Dec 2015 16:24:59 -0000 1.64 +++ sys/net/if_var.h 6 Dec 2015 03:39:03 -0000 @@ -42,6 +42,7 @@ #include #include #include +#include #include /* @@ -73,7 +74,6 @@ struct rtentry; struct timeout; struct arpcom; struct ifnet; -struct task; /* * Structure describing a `cloning' interface. 
@@ -108,12 +108,13 @@ struct ifq_ops { }; struct ifqueue { + struct taskctx ifq_serializer; + struct mutex ifq_mtx; uint64_t ifq_drops; const struct ifq_ops *ifq_ops; void *ifq_q; unsigned int ifq_len; - unsigned int ifq_serializer; unsigned int ifq_oactive; unsigned int ifq_maxlen; @@ -173,6 +174,7 @@ struct ifnet { /* and the entries */ int (*if_ll_output)(struct ifnet *, struct mbuf *, struct sockaddr *, struct rtentry *); /* initiate output routine */ + struct task if_start_ctx; void (*if_start)(struct ifnet *); /* ioctl routine */ int (*if_ioctl)(struct ifnet *, u_long, caddr_t); @@ -365,6 +367,8 @@ extern struct taskq *softnettq; void if_start(struct ifnet *); void if_start_barrier(struct ifnet *); +void if_restart(struct ifnet *, struct task *); +void if_restart_task(void *); int if_enqueue_try(struct ifnet *, struct mbuf *); int if_enqueue(struct ifnet *, struct mbuf *); void if_input(struct ifnet *, struct mbuf_list *); Index: sys/sys/task.h =================================================================== RCS file: /cvs/src/sys/sys/task.h,v retrieving revision 1.8 diff -u -p -r1.8 task.h --- sys/sys/task.h 9 Feb 2015 03:15:41 -0000 1.8 +++ sys/sys/task.h 6 Dec 2015 03:39:03 -0000 @@ -20,6 +20,7 @@ #define _SYS_TASKQ_H_ #include +#include struct taskq; @@ -30,6 +31,12 @@ struct task { unsigned int t_flags; }; +struct taskctx { + struct mutex tc_mtx; + TAILQ_HEAD(, task) tc_worklist; + unsigned int tc_serializer; +}; + #define TASKQ_MPSAFE (1 << 0) #define TASKQ_CANTSLEEP (1 << 1) @@ -39,6 +46,7 @@ extern struct taskq *const systqmp; struct taskq *taskq_create(const char *, unsigned int, int, unsigned int); void taskq_destroy(struct taskq *); +void taskq_barrier(struct taskq *); void task_set(struct task *, void (*)(void *), void *); int task_add(struct taskq *, struct task *); @@ -46,6 +54,14 @@ int task_del(struct taskq *, struct ta #define TASK_INITIALIZER(_f, _a) \ { { NULL, NULL }, (_f), (_a), 0 } + +void taskctx_init(struct taskctx *, int); +void 
taskctx_barrier(struct taskctx *); + +void task_dispatch(struct taskctx *, struct task *); + +#define TASKCTX_INITIALIZER(_tc, _i) \ + { MUTEX_INITIALIZER(_i), TAILQ_HEAD_INITIALIZER(_tc.tc_worklist), 0 } #endif /* _KERNEL */