From ca5089ebf1afb42bb23349398b4e1881c2047362 Mon Sep 17 00:00:00 2001 From: Nils Goroll Date: Sun, 29 Sep 2024 10:54:09 +0200 Subject: [PATCH] storage_simple: Implement asynchronous iteration and use it for the iterator This commit implements the asynchronous iteration API defined and described in previous commits for the simple storage layer and reimplements the synchronous iterator with it. This commit message does not provide background information, please refer to the two previous commits. Implementation sml_ai_init() initializes the handle and choses either a simple or more elaborate "boc" lease function depending on whether or not a streaming fetch is ongoing (boc present). sml_ai_lease_simple() is just that, dead simple. It iterates the storage segment list and fills the struct vaiov array provided by the caller. It is a good starting point into the implementation. sml_ai_lease_boc() handles the busy case and is more elaborate due to the nature of streaming fetches. It first calls ObjVAIGetExtend() to get the current extent. If no data is available, it returns the appropriate value. Other than that, is basically does the same things as sml_ai_lease_simple() with these exceptions: It also needs to return partial extends ("fragments") and it needs to handle the case where the last available segment is reached, in which case there is no successor to store for the next invocation, but also the last segment could get returned and thus freed before the next invocation. sml_ai_return() is only used for the "boc" case. It removes returned full segments from the list and then frees them outside the boc mtx. sml_ai_fini() is straight forward and should not need explanation. Implementation of sml_iterator() using the new API To reimplement the existing synchronous iterator based on the new API, we first need a little facility to block waiting for a notification. This is struct sml_notify with the four sml_notify* functions. sml_notify() is the callback, sml_notify_wait() blocks waiting for a notification to arrive. Until it runs out of work, the iterator performs these steps: ObjVAIlease() is called repeatedly until either the vaiov array is full or a negative value is returned. This allows the rest of the code to react to the next condition appropriately by sending an OBJ_ITER_FLUSH with the last lease only. Calling func() on each extent is trivial, the complications only come from handling OBJ_ITER_FLUSH, "just in time" returns and error handling. --- bin/varnishd/storage/storage_persistent.c | 1 + bin/varnishd/storage/storage_simple.c | 527 +++++++++++++++++----- bin/varnishd/storage/storage_simple.h | 1 - bin/varnishtest/tests/c00111.vtc | 5 +- 4 files changed, 424 insertions(+), 110 deletions(-) diff --git a/bin/varnishd/storage/storage_persistent.c b/bin/varnishd/storage/storage_persistent.c index df2fc9128e7..9b37de27590 100644 --- a/bin/varnishd/storage/storage_persistent.c +++ b/bin/varnishd/storage/storage_persistent.c @@ -692,6 +692,7 @@ smp_init(void) smp_oc_realmethods.objsetattr = SML_methods.objsetattr; smp_oc_realmethods.objtouch = LRU_Touch; smp_oc_realmethods.objfree = smp_oc_objfree; + smp_oc_realmethods.vai_init = SML_methods.vai_init; } /*-------------------------------------------------------------------- diff --git a/bin/varnishd/storage/storage_simple.c b/bin/varnishd/storage/storage_simple.c index 1dea4dc3940..7910a5895ac 100644 --- a/bin/varnishd/storage/storage_simple.c +++ b/bin/varnishd/storage/storage_simple.c @@ -31,6 +31,8 @@ #include "config.h" +#include + #include "cache/cache_varnishd.h" #include "cache/cache_obj.h" @@ -304,130 +306,440 @@ sml_objfree(struct worker *wrk, struct objcore *oc) wrk->stats->n_object--; } +// kept for reviewers - XXX remove later +#undef VAI_DBG + +struct sml_hdl { + struct vai_hdl_preamble preamble; +#define SML_HDL_MAGIC 0x37dfd996 + struct vai_qe qe; + struct ws *ws; // NULL is malloc() + struct objcore *oc; + struct object *obj; + const struct stevedore *stv; + struct boc *boc; + + struct storage *st; // updated by _lease() + struct storage *last; // if st == NULL + + // only for _lease_boc() + uint64_t st_off; // already returned fragment of current st + uint64_t avail, returned; +}; + +static inline void +sml_ai_vaiov_fill(struct vaiov *vaiov, struct storage *st) +{ + INIT_OBJ(vaiov, VAIOV_MAGIC); + vaiov->iov.iov_base = st->ptr; + vaiov->iov.iov_len = st->len; + vaiov->lease = (uintptr_t)st; + VAI_ASSERT_LEASE(vaiov->lease); +} + +static int +sml_ai_lease_simple(struct worker *wrk, vai_hdl vhdl, struct vaiov *vaiov, int vaiovcnt) +{ + struct sml_hdl *hdl; + struct storage *st, *next; + int r = 0; + + (void) wrk; + CAST_VAI_HDL_NOTNULL(hdl, vhdl, SML_HDL_MAGIC); + AN(vaiov); + AN(vaiovcnt); + + AZ(hdl->st_off); + st = hdl->st; + while (st != NULL && vaiovcnt > 0) { + next = VTAILQ_PREV(st, storagehead, list); + CHECK_OBJ_ORNULL(next, STORAGE_MAGIC); + sml_ai_vaiov_fill(vaiov, st); + vaiov->flags = (next == NULL) ? VAIOV_F_END : 0; + + vaiov++; + r++; + vaiovcnt--; + st = next; + } + hdl->st = st; + return (r); +} + +/* + * on leases while streaming (with a boc): + * + * SML uses the lease return facility to implement the "free behind" for + * OC_F_TRANSIENT objects. When streaming, we also return leases on + * fragments of sts, but we must only "free behind" when we are done with the + * last fragment. + * + * So we use a magic lease to signal "this is only a fragment", which we ignore + * on returns + */ + +// not using 0 to be able to catch accidental null pointers +static const uint64_t sml_ai_lease_frag = 0x8; + +static int +sml_ai_lease_boc(struct worker *wrk, vai_hdl vhdl, struct vaiov *vaiov, int vaiovcnt) +{ + struct sml_hdl *hdl; + enum boc_state_e state; + struct storage *next; + int r = 0; + + CAST_VAI_HDL_NOTNULL(hdl, vhdl, SML_HDL_MAGIC); + AN(vaiov); + AN(vaiovcnt); + + CHECK_VAI_HDL_NOTNULL(hdl, SML_HDL_MAGIC); + AN(vaiov); + AN(vaiovcnt); + if (hdl->avail == hdl->returned) { + hdl->avail = ObjVAIGetExtend(wrk, hdl->oc, hdl->returned, + &state, &hdl->qe); + if (state == BOS_FAILED) + return (-EPIPE); + else if (state == BOS_FINISHED) + (void)0; + else if (hdl->avail == hdl->returned) { + // ObjVAIGetExtend() has scheduled a notification + if (hdl->boc->transit_buffer > 0) + return (-ENOBUFS); + else + return (-EAGAIN); + } + else + assert(state < BOS_FINISHED); + } + Lck_Lock(&hdl->boc->mtx); + if (hdl->st == NULL && hdl->last != NULL) { + /* when the "last" st completed, we did not yet have a next, so + * resume from there. Because "last" might have been returned and + * deleted, we can not just use the pointer, but rather need to + * iterate the st list. + * if we can not find "last", it also has been returned and + * deleted, and the current write head (VTAILQ_LAST) is our next + * st, which can also be null if we are done. + */ + VTAILQ_FOREACH_REVERSE(next, &hdl->obj->list, storagehead, list) { + if (next == hdl->last) { + hdl->st = VTAILQ_PREV(next, storagehead, list); + break; + } + } + } + hdl->last = NULL; + if (hdl->st == NULL) { + assert(hdl->returned == 0 || hdl->avail == hdl->returned); + hdl->st = VTAILQ_LAST(&hdl->obj->list, storagehead); + } + if (hdl->st == NULL) + assert(hdl->avail == hdl->returned); + + while (vaiovcnt > 0 && hdl->avail > hdl->returned) { + CHECK_OBJ_NOTNULL(hdl->st, STORAGE_MAGIC); // ObjVAIGetExtend ensures + assert(hdl->st_off <= hdl->st->space); + size_t av = hdl->avail - hdl->returned; + size_t l = hdl->st->space - hdl->st_off; + AN(l); + if (l > av) + l = av; + INIT_OBJ(vaiov, VAIOV_MAGIC); + vaiov->iov.iov_base = hdl->st->ptr + hdl->st_off; + vaiov->iov.iov_len = l; + if (hdl->st_off + l == hdl->st->space) { + next = VTAILQ_PREV(hdl->st, storagehead, list); + AZ(hdl->last); + if (next == NULL) + hdl->last = hdl->st; + else + CHECK_OBJ(next, STORAGE_MAGIC); +#ifdef VAI_DBG + VSLb(wrk->vsl, SLT_Debug, "off %zu + l %zu == space st %p next st %p stvprv %p", + hdl->st_off, l, hdl->st, next, hdl->boc->stevedore_priv); +#endif + vaiov->lease = (uintptr_t)hdl->st; + hdl->st_off = 0; + hdl->st = next; + } + else { + vaiov->lease = sml_ai_lease_frag; + hdl->st_off += l; + } + hdl->returned += l; + VAI_ASSERT_LEASE(vaiov->lease); + r++; + vaiov++; + vaiovcnt--; + } + + Lck_Unlock(&hdl->boc->mtx); + return (r); +} + +static void v_matchproto_(vai_return_f) +sml_ai_return(struct worker *wrk, vai_hdl vhdl, uint64_t *leases, int leasecnt) +{ + struct storage *st; + struct sml_hdl *hdl; + uint64_t *p, *end; + + (void) wrk; + CAST_VAI_HDL_NOTNULL(hdl, vhdl, SML_HDL_MAGIC); + AN(leases); + AN(leasecnt); + + // callback is only registered if needed + assert(hdl->boc != NULL && (hdl->oc->flags & OC_F_TRANSIENT) != 0); + + // make this a two-stage process to avoid free under the lock + Lck_Lock(&hdl->boc->mtx); + for (p = leases, end = leases + leasecnt; p < end; p++) { + if (*p == sml_ai_lease_frag) + continue; + CAST_OBJ_NOTNULL(st, (void *)*p, STORAGE_MAGIC); + VTAILQ_REMOVE(&hdl->obj->list, st, list); + if (st == hdl->boc->stevedore_priv) + hdl->boc->stevedore_priv = trim_once; + } + Lck_Unlock(&hdl->boc->mtx); + + for (p = leases; p < end; p++) { + if (*p == sml_ai_lease_frag) + continue; + CAST_OBJ_NOTNULL(st, (void *)*p, STORAGE_MAGIC); + sml_stv_free(hdl->stv, st); + } +} + +static void v_matchproto_(vai_fini_f) +sml_ai_fini(struct worker *wrk, vai_hdl *vai_hdlp) +{ + struct sml_hdl *hdl; + + AN(vai_hdlp); + CAST_VAI_HDL_NOTNULL(hdl, *vai_hdlp, SML_HDL_MAGIC); + *vai_hdlp = NULL; + + if (hdl->boc != NULL) { + ObjVAICancel(wrk, hdl->boc, &hdl->qe); + HSH_DerefBoc(wrk, hdl->oc); + hdl->boc = NULL; + } + + if (hdl->ws != NULL) + WS_Release(hdl->ws, 0); + else + free(hdl); +} + +static vai_hdl v_matchproto_(vai_init_f) +sml_ai_init(struct worker *wrk, struct objcore *oc, struct ws *ws, + vai_notify_cb *notify, void *notify_priv) +{ + struct sml_hdl *hdl; + const size_t sz = sizeof *hdl; + + if (ws != NULL && WS_ReserveSize(ws, (unsigned)sz)) + hdl = WS_Reservation(ws); + else { + hdl = malloc(sz); + ws = NULL; + } + + AN(hdl); + INIT_VAI_HDL(hdl, SML_HDL_MAGIC); + hdl->preamble.vai_lease = sml_ai_lease_simple; + hdl->preamble.vai_fini = sml_ai_fini; + hdl->ws = ws; + + hdl->oc = oc; + hdl->obj = sml_getobj(wrk, oc); + CHECK_OBJ_NOTNULL(hdl->obj, OBJECT_MAGIC); + hdl->stv = oc->stobj->stevedore; + CHECK_OBJ_NOTNULL(hdl->stv, STEVEDORE_MAGIC); + + hdl->st = VTAILQ_LAST(&hdl->obj->list, storagehead); + CHECK_OBJ_ORNULL(hdl->st, STORAGE_MAGIC); + + hdl->boc = HSH_RefBoc(oc); + if (hdl->boc == NULL) + return (hdl); + /* we only initialize notifications if we have a boc, so + * any wrong attempt triggers magic checks. + */ + hdl->preamble.vai_lease = sml_ai_lease_boc; + if ((hdl->oc->flags & OC_F_TRANSIENT) != 0) + hdl->preamble.vai_return = sml_ai_return; + hdl->qe.magic = VAI_Q_MAGIC; + hdl->qe.cb = notify; + hdl->qe.hdl = hdl; + hdl->qe.priv = notify_priv; + return (hdl); +} + +/* + * trivial notification to allow the iterator to simply block + */ +struct sml_notify { + unsigned magic; +#define SML_NOTIFY_MAGIC 0x4589af31 + unsigned hasmore; + pthread_mutex_t mtx; + pthread_cond_t cond; +}; + +static void +sml_notify_init(struct sml_notify *sn) +{ + + INIT_OBJ(sn, SML_NOTIFY_MAGIC); + AZ(pthread_mutex_init(&sn->mtx, NULL)); + AZ(pthread_cond_init(&sn->cond, NULL)); +} + +static void +sml_notify_fini(struct sml_notify *sn) +{ + + CHECK_OBJ_NOTNULL(sn, SML_NOTIFY_MAGIC); + AZ(pthread_mutex_destroy(&sn->mtx)); + AZ(pthread_cond_destroy(&sn->cond)); +} + +static void v_matchproto_(vai_notify_cb) +sml_notify(vai_hdl hdl, void *priv) +{ + struct sml_notify *sn; + + (void) hdl; + CAST_OBJ_NOTNULL(sn, priv, SML_NOTIFY_MAGIC); + AZ(pthread_mutex_lock(&sn->mtx)); + sn->hasmore = 1; + AZ(pthread_cond_signal(&sn->cond)); + AZ(pthread_mutex_unlock(&sn->mtx)); + +} + +static void +sml_notify_wait(struct sml_notify *sn) +{ + + CHECK_OBJ_NOTNULL(sn, SML_NOTIFY_MAGIC); + AZ(pthread_mutex_lock(&sn->mtx)); + while (sn->hasmore == 0) + AZ(pthread_cond_wait(&sn->cond, &sn->mtx)); + AN(sn->hasmore); + sn->hasmore = 0; + AZ(pthread_mutex_unlock(&sn->mtx)); +} + static int v_matchproto_(objiterator_f) sml_iterator(struct worker *wrk, struct objcore *oc, void *priv, objiterate_f *func, int final) { - struct boc *boc; - enum boc_state_e state; - struct object *obj; - struct storage *st; - struct storage *checkpoint = NULL; - const struct stevedore *stv; - ssize_t checkpoint_len = 0; - ssize_t len = 0; - int ret = 0, ret2; - ssize_t ol; - ssize_t nl; - ssize_t sl; - void *p; - ssize_t l; - unsigned u; - - obj = sml_getobj(wrk, oc); - CHECK_OBJ_NOTNULL(obj, OBJECT_MAGIC); - stv = oc->stobj->stevedore; - CHECK_OBJ_NOTNULL(stv, STEVEDORE_MAGIC); + struct sml_notify sn; + const int vaiomax = 256; + struct vaiov *vio, *end, vaiov[vaiomax]; + uint64_t ret[vaiomax]; + unsigned u, uu; + vai_hdl hdl; + int nn, n, r, r2, nret; - boc = HSH_RefBoc(oc); + (void) final; // phase out? + sml_notify_init(&sn); + hdl = ObjVAIinit(wrk, oc, NULL, sml_notify, &sn); + AN(hdl); - if (boc == NULL) { - VTAILQ_FOREACH_REVERSE_SAFE( - st, &obj->list, storagehead, list, checkpoint) { + r = u = nret = 0; - u = 0; - if (VTAILQ_PREV(st, storagehead, list) == NULL) + do { + n = 0; + do { + nn = ObjVAIlease(wrk, hdl, vaiov + n, vaiomax - n); + if (nn <= 0) + break; + n += nn; + } while (n < vaiomax); + assert(n >= 0); + assert(n <= vaiomax); + + /* + * n contains the number of vaiovs leased + * nn is the wait/return action or 0 + * + * nn tells us if to flush + */ + for (vio = vaiov, end = vaiov + n; vio < end; vio++) { + CHECK_OBJ(vio, VAIOV_MAGIC); + AZ(u & OBJ_ITER_END); + if (vio->flags & VAIOV_F_END) { + assert(vio == end - 1); u |= OBJ_ITER_END; - if (final) - u |= OBJ_ITER_FLUSH; - if (ret == 0 && st->len > 0) - ret = func(priv, u, st->ptr, st->len); - if (final) { - VTAILQ_REMOVE(&obj->list, st, list); - sml_stv_free(stv, st); - } else if (ret) + } + else + AZ(vio->flags); + + // flush if it is the last IOV and we will wait + // or if we need space in the return leases array + uu = u; + if ((vio == end - 1 && nn < 0) || nret == vaiomax) + uu |= OBJ_ITER_FLUSH; + r = func(priv, uu, vio->iov.iov_base, vio->iov.iov_len); + if (r != 0) break; - } - return (ret); - } - p = NULL; - l = 0; + // whenever we have flushed, return leases + if ((uu & OBJ_ITER_FLUSH) && nret > 0) { + ObjVAIreturn(wrk, hdl, ret, nret); + nret = 0; + } - u = 0; - if (boc->fetched_so_far == 0) { - ret = func(priv, OBJ_ITER_FLUSH, NULL, 0); - if (ret) - return (ret); - } - while (1) { - ol = len; - nl = ObjWaitExtend(wrk, oc, ol, &state); - if (state == BOS_FAILED) { - ret = -1; - break; + assert(nret < vaiomax); + ret[nret++] = vio->lease; } - if (nl == ol) { - assert(state == BOS_FINISHED); - break; - } - assert(nl > ol); - Lck_Lock(&boc->mtx); - AZ(VTAILQ_EMPTY(&obj->list)); - if (checkpoint == NULL) { - st = VTAILQ_LAST(&obj->list, storagehead); - sl = 0; - } else { - st = checkpoint; - sl = checkpoint_len; - ol -= checkpoint_len; + + if (n == 0 && nn < 0) { + r2 = func(priv, OBJ_ITER_FLUSH, NULL, 0); + if (r == 0) + r = r2; } - while (st != NULL) { - if (st->len > ol) { - p = st->ptr + ol; - l = st->len - ol; - len += l; - break; - } - ol -= st->len; - assert(ol >= 0); - nl -= st->len; - assert(nl > 0); - sl += st->len; - st = VTAILQ_PREV(st, storagehead, list); - if (final && checkpoint != NULL) { - if (checkpoint == boc->stevedore_priv) - boc->stevedore_priv = trim_once; - else - VTAILQ_REMOVE(&obj->list, checkpoint, list); - sml_stv_free(stv, checkpoint); + + // for the break of the for loop above, we still need to collect returns + for (vio++; vio < end; vio++) { + CHECK_OBJ(vio, VAIOV_MAGIC); + if (nret == vaiomax) { + ObjVAIreturn(wrk, hdl, ret, nret); + nret = 0; } - checkpoint = st; - checkpoint_len = sl; + ret[nret++] = vio->lease; } - CHECK_OBJ_NOTNULL(obj, OBJECT_MAGIC); - CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC); - st = VTAILQ_PREV(st, storagehead, list); - if (st != NULL && st->len == 0) - st = NULL; - Lck_Unlock(&boc->mtx); - assert(l > 0 || state == BOS_FINISHED); - u = 0; - if (st == NULL || final) - u |= OBJ_ITER_FLUSH; - if (st == NULL && state == BOS_FINISHED) - u |= OBJ_ITER_END; - ret = func(priv, u, p, l); - if (ret) - break; - } - HSH_DerefBoc(wrk, oc); + if (n == 0 && nn == -ENOBUFS && nret > 0) { + ObjVAIreturn(wrk, hdl, ret, nret); + nret = 0; + } + if (r == 0 && (nn == -ENOBUFS || nn == -EAGAIN)) { + sml_notify_wait(&sn); + } + else if (r == 0 && nn < 0) + r = -1; + } while (nn != 0 && r == 0); + if ((u & OBJ_ITER_END) == 0) { - ret2 = func(priv, OBJ_ITER_END, NULL, 0); - if (ret == 0) - ret = ret2; + r2 = func(priv, OBJ_ITER_END, NULL, 0); + if (r == 0) + r = r2; } - return (ret); + + if (nret > 0) + ObjVAIreturn(wrk, hdl, ret, nret); + + ObjVAIfini(wrk, &hdl); + sml_notify_fini(&sn); + + return (r); } /*-------------------------------------------------------------------- @@ -734,6 +1046,7 @@ const struct obj_methods SML_methods = { .objgetattr = sml_getattr, .objsetattr = sml_setattr, .objtouch = LRU_Touch, + .vai_init = sml_ai_init }; static void diff --git a/bin/varnishd/storage/storage_simple.h b/bin/varnishd/storage/storage_simple.h index 881e0b8571c..894de38e153 100644 --- a/bin/varnishd/storage/storage_simple.h +++ b/bin/varnishd/storage/storage_simple.h @@ -39,7 +39,6 @@ struct storage { unsigned magic; #define STORAGE_MAGIC 0x1a4e51c0 - VTAILQ_ENTRY(storage) list; void *priv; diff --git a/bin/varnishtest/tests/c00111.vtc b/bin/varnishtest/tests/c00111.vtc index 706ee7041a7..996d5d258a8 100644 --- a/bin/varnishtest/tests/c00111.vtc +++ b/bin/varnishtest/tests/c00111.vtc @@ -15,7 +15,8 @@ client c1 { } -run varnish v1 -vsl_catchup -varnish v1 -expect fetch_failed == 1 +# with vai, this no longer fails systematically (which is good) +varnish v1 -expect fetch_failed <= 1 varnish v1 -cliok "param.set transit_buffer 4k" @@ -26,4 +27,4 @@ client c2 { varnish v1 -vsl_catchup varnish v1 -expect s_fetch == 2 -varnish v1 -expect fetch_failed == 1 +varnish v1 -expect fetch_failed <= 1