diff options
Diffstat (limited to 'bufferevent_async.c')
-rw-r--r-- | bufferevent_async.c | 66 |
1 files changed, 43 insertions, 23 deletions
diff --git a/bufferevent_async.c b/bufferevent_async.c index 6395e57a9f0c..40c7c5e8d0d3 100644 --- a/bufferevent_async.c +++ b/bufferevent_async.c @@ -46,6 +46,7 @@ #ifdef _WIN32 #include <winsock2.h> +#include <winerror.h> #include <ws2tcpip.h> #endif @@ -100,11 +101,32 @@ const struct bufferevent_ops bufferevent_ops_async = { be_async_ctrl, }; +static inline void +be_async_run_eventcb(struct bufferevent *bev, short what, int options) +{ bufferevent_run_eventcb_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); } + +static inline void +be_async_trigger_nolock(struct bufferevent *bev, short what, int options) +{ bufferevent_trigger_nolock_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); } + +static inline int +fatal_error(int err) +{ + switch (err) { + /* We may have already associated this fd with a port. + * Let's hope it's this port, and that the error code + * for doing this neer changes. */ + case ERROR_INVALID_PARAMETER: + return 0; + } + return 1; +} + static inline struct bufferevent_async * upcast(struct bufferevent *bev) { struct bufferevent_async *bev_a; - if (bev->be_ops != &bufferevent_ops_async) + if (!BEV_IS_ASYNC(bev)) return NULL; bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev); return bev_a; @@ -217,7 +239,7 @@ bev_async_consider_writing(struct bufferevent_async *beva) &beva->write_overlapped)) { bufferevent_decref_(bev); beva->ok = 0; - bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0); + be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0); } else { beva->write_in_progress = at_most; bufferevent_decrement_write_buckets_(&beva->bev, at_most); @@ -270,7 +292,7 @@ bev_async_consider_reading(struct bufferevent_async *beva) bufferevent_incref_(bev); if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) { beva->ok = 0; - bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0); + be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0); bufferevent_decref_(bev); } else { beva->read_in_progress = at_most; @@ -381,10 +403,10 @@ be_async_destruct(struct bufferevent *bev) bev_async_del_write(bev_async); fd = evbuffer_overlapped_get_fd_(bev->input); - if (fd != (evutil_socket_t)INVALID_SOCKET && + if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET && (bev_p->options & BEV_OPT_CLOSE_ON_FREE)) { evutil_closesocket(fd); - evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET); + evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET); } } @@ -428,8 +450,7 @@ connect_complete(struct event_overlapped *eo, ev_uintptr_t key, else bev_async_set_wsa_error(bev, eo); - bufferevent_run_eventcb_(bev, - ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0); + be_async_run_eventcb(bev, ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0); event_base_del_virtual_(bev->ev_base); @@ -459,16 +480,16 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key, if (bev_a->ok) { if (ok && nbytes) { BEV_RESET_GENERIC_READ_TIMEOUT(bev); - bufferevent_trigger_nolock_(bev, EV_READ, 0); + be_async_trigger_nolock(bev, EV_READ, 0); bev_async_consider_reading(bev_a); } else if (!ok) { what |= BEV_EVENT_ERROR; bev_a->ok = 0; - bufferevent_run_eventcb_(bev, what, 0); + be_async_run_eventcb(bev, what, 0); } else if (!nbytes) { what |= BEV_EVENT_EOF; bev_a->ok = 0; - bufferevent_run_eventcb_(bev, what, 0); + be_async_run_eventcb(bev, what, 0); } } @@ -502,16 +523,16 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key, if (bev_a->ok) { if (ok && nbytes) { BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); - bufferevent_trigger_nolock_(bev, EV_WRITE, 0); + be_async_trigger_nolock(bev, EV_WRITE, 0); bev_async_consider_writing(bev_a); } else if (!ok) { what |= BEV_EVENT_ERROR; bev_a->ok = 0; - bufferevent_run_eventcb_(bev, what, 0); + be_async_run_eventcb(bev, what, 0); } else if (!nbytes) { what |= BEV_EVENT_EOF; bev_a->ok = 0; - bufferevent_run_eventcb_(bev, what, 0); + be_async_run_eventcb(bev, what, 0); } } @@ -532,11 +553,7 @@ bufferevent_async_new_(struct event_base *base, return NULL; if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) { - int err = GetLastError(); - /* We may have alrady associated this fd with a port. - * Let's hope it's this port, and that the error code - * for doing this neer changes. */ - if (err != ERROR_INVALID_PARAMETER) + if (fatal_error(GetLastError())) return NULL; } @@ -580,7 +597,6 @@ bufferevent_async_set_connected_(struct bufferevent *bev) { struct bufferevent_async *bev_async = upcast(bev); bev_async->ok = 1; - bufferevent_init_generic_timeout_cbs_(bev); /* Now's a good time to consider reading/writing */ be_async_enable(bev, bev->enabled); } @@ -654,25 +670,29 @@ be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, data->fd = evbuffer_overlapped_get_fd_(bev->input); return 0; case BEV_CTRL_SET_FD: { + struct bufferevent_async *bev_a = upcast(bev); struct event_iocp_port *iocp; if (data->fd == evbuffer_overlapped_get_fd_(bev->input)) return 0; if (!(iocp = event_base_get_iocp_(bev->ev_base))) return -1; - if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) - return -1; + if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) { + if (fatal_error(GetLastError())) + return -1; + } evbuffer_overlapped_set_fd_(bev->input, data->fd); evbuffer_overlapped_set_fd_(bev->output, data->fd); + bev_a->ok = data->fd >= 0; return 0; } case BEV_CTRL_CANCEL_ALL: { struct bufferevent_async *bev_a = upcast(bev); evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input); - if (fd != (evutil_socket_t)INVALID_SOCKET && + if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET && (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) { closesocket(fd); - evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET); + evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET); } bev_a->ok = 0; return 0; |