From 04e1532fc54002411768087a126c878f556c2c3c Mon Sep 17 00:00:00 2001 From: Kip Macy Date: Wed, 20 Aug 2008 02:42:08 +0000 Subject: Xen 3.2 now interleaves watch events with regular message notifications. More graciously handle processing messages and watch events inline prior to threads being up and running. MFC after: 1 month --- sys/xen/xenbus/xenbus_comms.c | 1 + sys/xen/xenbus/xenbus_comms.h | 7 ++- sys/xen/xenbus/xenbus_dev.c | 2 - sys/xen/xenbus/xenbus_probe.c | 3 +- sys/xen/xenbus/xenbus_probe_backend.c | 3 - sys/xen/xenbus/xenbus_xs.c | 107 +++++++++++++++++++++++----------- 6 files changed, 79 insertions(+), 44 deletions(-) (limited to 'sys/xen') diff --git a/sys/xen/xenbus/xenbus_comms.c b/sys/xen/xenbus/xenbus_comms.c index 6fd34404f230..8f0f171cf255 100644 --- a/sys/xen/xenbus/xenbus_comms.c +++ b/sys/xen/xenbus/xenbus_comms.c @@ -114,6 +114,7 @@ int xb_write(const void *tdata, unsigned len) while (len != 0) { void *dst; unsigned int avail; + wait_event_interruptible(&xb_waitq, (intf->req_prod - intf->req_cons) != XENSTORE_RING_SIZE); diff --git a/sys/xen/xenbus/xenbus_comms.h b/sys/xen/xenbus/xenbus_comms.h index 4ec46f66423e..871afd56e00f 100644 --- a/sys/xen/xenbus/xenbus_comms.h +++ b/sys/xen/xenbus/xenbus_comms.h @@ -38,10 +38,14 @@ int xb_write(const void *data, unsigned len); int xb_read(void *data, unsigned len); int xs_input_avail(void); extern int xb_waitq; +extern int xenbus_running; #define __wait_event_interruptible(wchan, condition, ret) \ do { \ for (;;) { \ + if (xenbus_running == 0) { \ + break; \ + } \ if (condition) \ break; \ if ((ret = !tsleep(wchan, PWAIT | PCATCH, "waitev", hz/10))) \ @@ -96,9 +100,6 @@ do { \ #define BUG_ON PANIC_IF #define semaphore sema #define rw_semaphore sema -typedef struct mtx spinlock_t; -#define spin_lock mtx_lock -#define spin_unlock mtx_unlock #define DEFINE_SPINLOCK(lock) struct mtx lock #define DECLARE_MUTEX(lock) struct sema lock #define u32 uint32_t diff --git a/sys/xen/xenbus/xenbus_dev.c b/sys/xen/xenbus/xenbus_dev.c index 3eb6408929ec..8a13322ac18b 100644 --- a/sys/xen/xenbus/xenbus_dev.c +++ b/sys/xen/xenbus/xenbus_dev.c @@ -58,8 +58,6 @@ __FBSDID("$FreeBSD$"); #define BUG_ON PANIC_IF #define semaphore sema #define rw_semaphore sema -#define spin_lock mtx_lock -#define spin_unlock mtx_unlock #define DEFINE_SPINLOCK(lock) struct mtx lock #define DECLARE_MUTEX(lock) struct sema lock #define u32 uint32_t diff --git a/sys/xen/xenbus/xenbus_probe.c b/sys/xen/xenbus/xenbus_probe.c index 85a3e579b511..fc6821727772 100644 --- a/sys/xen/xenbus/xenbus_probe.c +++ b/sys/xen/xenbus/xenbus_probe.c @@ -1048,8 +1048,9 @@ xenbus_probe_sysinit(void *unused) /* Enumerate devices in xenstore. */ xenbus_probe_devices(&xenbus_frontend); register_xenbus_watch(&fe_watch); +#ifdef notyet xenbus_backend_probe_and_watch(); - +#endif /* Notify others that xenstore is up */ EVENTHANDLER_INVOKE(xenstore_event); diff --git a/sys/xen/xenbus/xenbus_probe_backend.c b/sys/xen/xenbus/xenbus_probe_backend.c index 8ef66ceac511..3cb8e67283c1 100644 --- a/sys/xen/xenbus/xenbus_probe_backend.c +++ b/sys/xen/xenbus/xenbus_probe_backend.c @@ -67,8 +67,6 @@ __FBSDID("$FreeBSD$"); #define BUG_ON PANIC_IF #define semaphore sema #define rw_semaphore sema -#define spin_lock mtx_lock -#define spin_unlock mtx_unlock #define DEFINE_SPINLOCK(lock) struct mtx lock #define DECLARE_MUTEX(lock) struct sema lock #define u32 uint32_t @@ -76,7 +74,6 @@ __FBSDID("$FreeBSD$"); #define simple_strtoul strtoul #define ARRAY_SIZE(x) (sizeof(x)/sizeof(x[0])) #define list_empty TAILQ_EMPTY -#define wake_up wakeup extern struct xendev_list_head xenbus_device_backend_list; #if 0 diff --git a/sys/xen/xenbus/xenbus_xs.c b/sys/xen/xenbus/xenbus_xs.c index 61a977964282..c4039f616fbe 100644 --- a/sys/xen/xenbus/xenbus_xs.c +++ b/sys/xen/xenbus/xenbus_xs.c @@ -57,13 +57,11 @@ __FBSDID("$FreeBSD$"); #include #include -int xs_process_msg(void); +static int xs_process_msg(enum xsd_sockmsg_type *type); #define kmalloc(size, unused) malloc(size, M_DEVBUF, M_WAITOK) #define BUG_ON PANIC_IF #define DEFINE_SPINLOCK(lock) struct mtx lock -#define spin_lock mtx_lock -#define spin_unlock mtx_unlock #define u32 uint32_t #define list_del(head, ent) TAILQ_REMOVE(head, ent, list) #define simple_strtoul strtoul @@ -71,6 +69,8 @@ int xs_process_msg(void); #define list_empty TAILQ_EMPTY #define streq(a, b) (strcmp((a), (b)) == 0) +int xenwatch_running = 0; +int xenbus_running = 0; struct kvec { const void *iov_base; @@ -100,7 +100,7 @@ struct xs_stored_msg { struct xs_handle { /* A list of replies. Currently only one will ever be outstanding. */ TAILQ_HEAD(xs_handle_list, xs_stored_msg) reply_list; - spinlock_t reply_lock; + struct mtx reply_lock; int reply_waitq; /* One request at a time. */ @@ -154,7 +154,7 @@ static void *read_reply(enum xsd_sockmsg_type *type, unsigned int *len) /* * Give other domain time to run :-/ */ - for (i = 0; i < 10000; i++) + for (i = 0; i < 100000; i++) HYPERVISOR_yield(); xs_process_msg(); } @@ -249,11 +249,14 @@ static void *xs_talkv(struct xenbus_transaction t, for (i = 0; i < num_vecs; i++) msg.len += iovec[i].iov_len; + printf("xs_talkv "); + sx_xlock(&xs_state.request_mutex); err = xb_write(&msg, sizeof(msg)); if (err) { sx_xunlock(&xs_state.request_mutex); + printf("xs_talkv failed %d\n", err); return ERR_PTR(err); } @@ -261,6 +264,7 @@ static void *xs_talkv(struct xenbus_transaction t, err = xb_write(iovec[i].iov_base, iovec[i].iov_len);; if (err) { sx_xunlock(&xs_state.request_mutex); + printf("xs_talkv failed %d\n", err); return ERR_PTR(err); } } @@ -277,8 +281,19 @@ static void *xs_talkv(struct xenbus_transaction t, kfree(ret); return ERR_PTR(-err); } - - BUG_ON(msg.type != type); + + if (xenwatch_running == 0) { + while (!TAILQ_EMPTY(&watch_events)) { + struct xs_stored_msg *wmsg = TAILQ_FIRST(&watch_events); + list_del(&watch_events, wmsg); + wmsg->u.watch.handle->callback( + wmsg->u.watch.handle, + (const char **)wmsg->u.watch.vec, + wmsg->u.watch.vec_size); + } + } + BUG_ON(msg.type != type); + return ret; } @@ -290,6 +305,7 @@ static void *xs_single(struct xenbus_transaction t, { struct kvec iovec; + printf("xs_single %s ", string); iovec.iov_base = (const void *)string; iovec.iov_len = strlen(string) + 1; return xs_talkv(t, type, &iovec, 1, len); @@ -339,7 +355,7 @@ static char **split(char *strings, unsigned int len, unsigned int *num) char *p, **ret; /* Count the strings. */ - *num = count_strings(strings, len); + *num = count_strings(strings, len) + 1; /* Transfer to one big alloc for easy freeing. */ ret = kmalloc(*num * sizeof(char *) + len, GFP_KERNEL); @@ -354,6 +370,8 @@ static char **split(char *strings, unsigned int len, unsigned int *num) for (p = strings, *num = 0; p < strings + len; p += strlen(p) + 1) ret[(*num)++] = p; + ret[*num] = strings + len; + return ret; } @@ -361,7 +379,7 @@ char **xenbus_directory(struct xenbus_transaction t, const char *dir, const char *node, unsigned int *num) { char *strings, *path; - unsigned int len; + unsigned int len = 0; path = join(dir, node); if (IS_ERR(path)) @@ -405,6 +423,7 @@ void *xenbus_read(struct xenbus_transaction t, if (IS_ERR(path)) return (void *)path; + printf("xs_read "); ret = xs_single(t, XS_READ, path, len); kfree(path); return ret; @@ -430,6 +449,7 @@ int xenbus_write(struct xenbus_transaction t, iovec[1].iov_base = string; iovec[1].iov_len = strlen(string); + printf("xenbus_write dir=%s val=%s ", dir, string); ret = xs_error(xs_talkv(t, XS_WRITE, iovec, ARRAY_SIZE(iovec), NULL)); kfree(path); return ret; @@ -504,6 +524,7 @@ int xenbus_transaction_end(struct xenbus_transaction t, int abort) else strcpy(abortstr, "T"); + printf("xenbus_transaction_end "); err = xs_error(xs_single(t, XS_TRANSACTION_END, abortstr, NULL)); up_read(&xs_state.suspend_mutex); @@ -641,18 +662,18 @@ int register_xenbus_watch(struct xenbus_watch *watch) down_read(&xs_state.suspend_mutex); - spin_lock(&watches_lock); + mtx_lock(&watches_lock); BUG_ON(find_watch(token) != NULL); LIST_INSERT_HEAD(&watches, watch, list); - spin_unlock(&watches_lock); + mtx_unlock(&watches_lock); err = xs_watch(watch->node, token); /* Ignore errors due to multiple registration. */ if ((err != 0) && (err != -EEXIST)) { - spin_lock(&watches_lock); + mtx_lock(&watches_lock); LIST_REMOVE(watch, list); - spin_unlock(&watches_lock); + mtx_unlock(&watches_lock); } up_read(&xs_state.suspend_mutex); @@ -671,10 +692,10 @@ void unregister_xenbus_watch(struct xenbus_watch *watch) down_read(&xs_state.suspend_mutex); - spin_lock(&watches_lock); + mtx_lock(&watches_lock); BUG_ON(!find_watch(token)); LIST_REMOVE(watch, list); - spin_unlock(&watches_lock); + mtx_unlock(&watches_lock); err = xs_unwatch(watch->node, token); if (err) @@ -684,7 +705,7 @@ void unregister_xenbus_watch(struct xenbus_watch *watch) up_read(&xs_state.suspend_mutex); /* Cancel pending watch events. */ - spin_lock(&watch_events_lock); + mtx_lock(&watch_events_lock); TAILQ_FOREACH_SAFE(msg, &watch_events, list, tmp) { if (msg->u.watch.handle != watch) continue; @@ -692,7 +713,7 @@ void unregister_xenbus_watch(struct xenbus_watch *watch) kfree(msg->u.watch.vec); kfree(msg); } - spin_unlock(&watch_events_lock); + mtx_unlock(&watch_events_lock); /* Flush any currently-executing callback, unless we are it. :-) */ if (curproc->p_pid != xenwatch_pid) { @@ -728,17 +749,19 @@ static void xenwatch_thread(void *unused) { struct xs_stored_msg *msg; + xenwatch_running = 1; for (;;) { - wait_event_interruptible(&watch_events_waitq, - !list_empty(&watch_events)); + + while (list_empty(&watch_events)) + pause("xenwatch", hz/10); sx_xlock(&xenwatch_mutex); - spin_lock(&watch_events_lock); + mtx_lock(&watch_events_lock); msg = TAILQ_FIRST(&watch_events); if (msg) list_del(&watch_events, msg); - spin_unlock(&watch_events_lock); + mtx_unlock(&watch_events_lock); if (msg != NULL) { @@ -754,7 +777,7 @@ static void xenwatch_thread(void *unused) } } -int xs_process_msg(void) +static int xs_process_msg(enum xsd_sockmsg_type *type) { struct xs_stored_msg *msg; char *body; @@ -783,7 +806,8 @@ int xs_process_msg(void) return err; } body[msg->hdr.len] = '\0'; - + + *type = msg->hdr.type; if (msg->hdr.type == XS_WATCH_EVENT) { msg->u.watch.vec = split(body, msg->hdr.len, &msg->u.watch.vec_size); @@ -792,26 +816,31 @@ int xs_process_msg(void) return PTR_ERR(msg->u.watch.vec); } - spin_lock(&watches_lock); + mtx_lock(&watches_lock); msg->u.watch.handle = find_watch( msg->u.watch.vec[XS_WATCH_TOKEN]); if (msg->u.watch.handle != NULL) { - spin_lock(&watch_events_lock); + mtx_lock(&watch_events_lock); TAILQ_INSERT_TAIL(&watch_events, msg, list); - wakeup(&watch_events_waitq); - spin_unlock(&watch_events_lock); + if (xenwatch_running) + wakeup(&watch_events_waitq); + mtx_unlock(&watch_events_lock); } else { kfree(msg->u.watch.vec); kfree(msg); } - spin_unlock(&watches_lock); + mtx_unlock(&watches_lock); } else { + printf("event=%d ", *type); msg->u.reply.body = body; - spin_lock(&xs_state.reply_lock); + mtx_lock(&xs_state.reply_lock); TAILQ_INSERT_TAIL(&xs_state.reply_list, msg, list); - spin_unlock(&xs_state.reply_lock); - wakeup(&xs_state.reply_waitq); + mtx_unlock(&xs_state.reply_lock); + if (xenbus_running) + wakeup(&xs_state.reply_waitq); } + if (*type == XS_WATCH_EVENT) + printf("\n"); return 0; } @@ -819,12 +848,17 @@ int xs_process_msg(void) static void xenbus_thread(void *unused) { int err; + enum xsd_sockmsg_type type; + + xenbus_running = 1; + pause("xenbus", hz/10); for (;;) { - err = xs_process_msg(); - if (err) + err = xs_process_msg(&type); + if (err) printf("XENBUS error %d while reading " "message\n", err); + } } @@ -835,10 +869,13 @@ int xs_init(void) TAILQ_INIT(&xs_state.reply_list); TAILQ_INIT(&watch_events); - mtx_init(&xs_state.reply_lock, "state reply", NULL, MTX_DEF); - sema_init(&xs_state.suspend_mutex, 1, "xenstore suspend"); sx_init(&xenwatch_mutex, "xenwatch"); + + + mtx_init(&xs_state.reply_lock, "state reply", NULL, MTX_DEF); sx_init(&xs_state.request_mutex, "xenstore request"); + sema_init(&xs_state.suspend_mutex, 1, "xenstore suspend"); + #if 0 mtx_init(&xs_state.suspend_mutex, "xenstore suspend", NULL, MTX_DEF); -- cgit v1.2.3