diff options
Diffstat (limited to 'bufferevent.c')
-rw-r--r-- | bufferevent.c | 115 |
1 files changed, 68 insertions, 47 deletions
diff --git a/bufferevent.c b/bufferevent.c index 490b59839cca..08c0486c087d 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -66,8 +66,7 @@ static void bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_); void bufferevent_suspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what) { - struct bufferevent_private *bufev_private = - EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); BEV_LOCK(bufev); if (!bufev_private->read_suspended) bufev->be_ops->disable(bufev, EV_READ); @@ -78,8 +77,7 @@ bufferevent_suspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags w void bufferevent_unsuspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what) { - struct bufferevent_private *bufev_private = - EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); BEV_LOCK(bufev); bufev_private->read_suspended &= ~what; if (!bufev_private->read_suspended && (bufev->enabled & EV_READ)) @@ -90,8 +88,7 @@ bufferevent_unsuspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags void bufferevent_suspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what) { - struct bufferevent_private *bufev_private = - EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); BEV_LOCK(bufev); if (!bufev_private->write_suspended) bufev->be_ops->disable(bufev, EV_WRITE); @@ -102,8 +99,7 @@ bufferevent_suspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags void bufferevent_unsuspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what) { - struct bufferevent_private *bufev_private = - EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); BEV_LOCK(bufev); bufev_private->write_suspended &= ~what; if (!bufev_private->write_suspended && (bufev->enabled & EV_WRITE)) @@ -111,6 +107,28 @@ bufferevent_unsuspend_write_(struct bufferevent *bufev, bufferevent_suspend_flag BEV_UNLOCK(bufev); } +/** + * Sometimes bufferevent's implementation can overrun high watermarks + * (one of examples is openssl) and in this case if the read callback + * will not handle enough data do over condition above the read + * callback will never be called again (due to suspend above). + * + * To avoid this we are scheduling read callback again here, but only + * from the user callback to avoid multiple scheduling: + * - when the data had been added to it + * - when the data had been drained from it (user specified read callback) + */ +static void bufferevent_inbuf_wm_check(struct bufferevent *bev) +{ + if (!bev->wm_read.high) + return; + if (!(bev->enabled & EV_READ)) + return; + if (evbuffer_get_length(bev->input) < bev->wm_read.high) + return; + + bufferevent_trigger(bev, EV_READ, BEV_OPT_DEFER_CALLBACKS); +} /* Callback to implement watermarks on the input buffer. Only enabled * if the watermark is set. */ @@ -147,6 +165,7 @@ bufferevent_run_deferred_callbacks_locked(struct event_callback *cb, void *arg) if (bufev_private->readcb_pending && bufev->readcb) { bufev_private->readcb_pending = 0; bufev->readcb(bufev, bufev->cbarg); + bufferevent_inbuf_wm_check(bufev); } if (bufev_private->writecb_pending && bufev->writecb) { bufev_private->writecb_pending = 0; @@ -187,6 +206,7 @@ bufferevent_run_deferred_callbacks_unlocked(struct event_callback *cb, void *arg void *cbarg = bufev->cbarg; bufev_private->readcb_pending = 0; UNLOCKED(readcb(bufev, cbarg)); + bufferevent_inbuf_wm_check(bufev); } if (bufev_private->writecb_pending && bufev->writecb) { bufferevent_data_cb writecb = bufev->writecb; @@ -221,8 +241,7 @@ void bufferevent_run_readcb_(struct bufferevent *bufev, int options) { /* Requires that we hold the lock and a reference */ - struct bufferevent_private *p = - EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + struct bufferevent_private *p = BEV_UPCAST(bufev); if (bufev->readcb == NULL) return; if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) { @@ -230,6 +249,7 @@ bufferevent_run_readcb_(struct bufferevent *bufev, int options) SCHEDULE_DEFERRED(p); } else { bufev->readcb(bufev, bufev->cbarg); + bufferevent_inbuf_wm_check(bufev); } } @@ -237,8 +257,7 @@ void bufferevent_run_writecb_(struct bufferevent *bufev, int options) { /* Requires that we hold the lock and a reference */ - struct bufferevent_private *p = - EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + struct bufferevent_private *p = BEV_UPCAST(bufev); if (bufev->writecb == NULL) return; if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) { @@ -266,8 +285,7 @@ void bufferevent_run_eventcb_(struct bufferevent *bufev, short what, int options) { /* Requires that we hold the lock and a reference */ - struct bufferevent_private *p = - EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + struct bufferevent_private *p = BEV_UPCAST(bufev); if (bufev->errorcb == NULL) return; if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) { @@ -297,14 +315,12 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private, if (!bufev->input) { if ((bufev->input = evbuffer_new()) == NULL) - return -1; + goto err; } if (!bufev->output) { - if ((bufev->output = evbuffer_new()) == NULL) { - evbuffer_free(bufev->input); - return -1; - } + if ((bufev->output = evbuffer_new()) == NULL) + goto err; } bufev_private->refcnt = 1; @@ -316,7 +332,8 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private, bufev->be_ops = ops; - bufferevent_ratelim_init_(bufev_private); + if (bufferevent_ratelim_init_(bufev_private)) + goto err; /* * Set to EV_WRITE so that using bufferevent_write is going to @@ -327,20 +344,14 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private, #ifndef EVENT__DISABLE_THREAD_SUPPORT if (options & BEV_OPT_THREADSAFE) { - if (bufferevent_enable_locking_(bufev, NULL) < 0) { - /* cleanup */ - evbuffer_free(bufev->input); - evbuffer_free(bufev->output); - bufev->input = NULL; - bufev->output = NULL; - return -1; - } + if (bufferevent_enable_locking_(bufev, NULL) < 0) + goto err; } #endif if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS)) == BEV_OPT_UNLOCK_CALLBACKS) { event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS"); - return -1; + goto err; } if (options & BEV_OPT_UNLOCK_CALLBACKS) event_deferred_cb_init_( @@ -361,6 +372,17 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private, evbuffer_set_parent_(bufev->output, bufev); return 0; + +err: + if (bufev->input) { + evbuffer_free(bufev->input); + bufev->input = NULL; + } + if (bufev->output) { + evbuffer_free(bufev->output); + bufev->output = NULL; + } + return -1; } void @@ -459,8 +481,7 @@ bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf) int bufferevent_enable(struct bufferevent *bufev, short event) { - struct bufferevent_private *bufev_private = - EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); short impl_events = event; int r = 0; @@ -474,6 +495,8 @@ bufferevent_enable(struct bufferevent *bufev, short event) if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0) r = -1; + if (r) + event_debug(("%s: cannot enable 0x%hx on %p", __func__, event, bufev)); bufferevent_decref_and_unlock_(bufev); return r; @@ -533,8 +556,7 @@ int bufferevent_disable_hard_(struct bufferevent *bufev, short event) { int r = 0; - struct bufferevent_private *bufev_private = - EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); BEV_LOCK(bufev); bufev->enabled &= ~event; @@ -557,6 +579,8 @@ bufferevent_disable(struct bufferevent *bufev, short event) if (bufev->be_ops->disable(bufev, event) < 0) r = -1; + if (r) + event_debug(("%s: cannot disable 0x%hx on %p", __func__, event, bufev)); BEV_UNLOCK(bufev); return r; @@ -570,8 +594,7 @@ void bufferevent_setwatermark(struct bufferevent *bufev, short events, size_t lowmark, size_t highmark) { - struct bufferevent_private *bufev_private = - EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); BEV_LOCK(bufev); if (events & EV_WRITE) { @@ -656,8 +679,7 @@ bufferevent_flush(struct bufferevent *bufev, void bufferevent_incref_and_lock_(struct bufferevent *bufev) { - struct bufferevent_private *bufev_private = - BEV_UPCAST(bufev); + struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); BEV_LOCK(bufev); ++bufev_private->refcnt; } @@ -683,8 +705,7 @@ bufferevent_transfer_lock_ownership_(struct bufferevent *donor, int bufferevent_decref_and_unlock_(struct bufferevent *bufev) { - struct bufferevent_private *bufev_private = - EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); int n_cbs = 0; #define MAX_CBS 16 struct event_callback *cbs[MAX_CBS]; @@ -727,8 +748,7 @@ bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_) { struct bufferevent *bufev = arg_; struct bufferevent *underlying; - struct bufferevent_private *bufev_private = - EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); BEV_LOCK(bufev); underlying = bufferevent_get_underlying(bufev); @@ -794,8 +814,7 @@ bufferevent_free(struct bufferevent *bufev) void bufferevent_incref(struct bufferevent *bufev) { - struct bufferevent_private *bufev_private = - EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); /* XXX: now that this function is public, we might want to * - return the count from this function @@ -851,6 +870,8 @@ bufferevent_setfd(struct bufferevent *bev, evutil_socket_t fd) BEV_LOCK(bev); if (bev->be_ops->ctrl) res = bev->be_ops->ctrl(bev, BEV_CTRL_SET_FD, &d); + if (res) + event_debug(("%s: cannot set fd for %p to "EV_SOCK_FMT, __func__, bev, fd)); BEV_UNLOCK(bev); return res; } @@ -864,6 +885,8 @@ bufferevent_getfd(struct bufferevent *bev) BEV_LOCK(bev); if (bev->be_ops->ctrl) res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_FD, &d); + if (res) + event_debug(("%s: cannot get fd for %p", __func__, bev)); BEV_UNLOCK(bev); return (res<0) ? -1 : d.fd; } @@ -871,8 +894,7 @@ bufferevent_getfd(struct bufferevent *bev) enum bufferevent_options bufferevent_get_options_(struct bufferevent *bev) { - struct bufferevent_private *bev_p = - EVUTIL_UPCAST(bev, struct bufferevent_private, bev); + struct bufferevent_private *bev_p = BEV_UPCAST(bev); enum bufferevent_options options; BEV_LOCK(bev); @@ -948,8 +970,7 @@ int bufferevent_generic_adj_timeouts_(struct bufferevent *bev) { const short enabled = bev->enabled; - struct bufferevent_private *bev_p = - EVUTIL_UPCAST(bev, struct bufferevent_private, bev); + struct bufferevent_private *bev_p = BEV_UPCAST(bev); int r1=0, r2=0; if ((enabled & EV_READ) && !bev_p->read_suspended && evutil_timerisset(&bev->timeout_read)) |