cache_obj: Add an asynchronous iteration API
This commit adds a new object iteration API to support asynchronous IO.

Background

To process object bodies, the Object API so far provides ObjIterate(), which
calls a storage specific iterator function. It in turn calls a caller-provided
objiterate_f function on individual, contiguous segments of data (extents).

In turn, objiterate_f gets called with either no flags, or one of OBJ_ITER_FLUSH
and OBJ_ITER_END. The storage iterator uses these flags to signal the lifetime of
the provided extents: They remain valid until a flag is present, so the caller
may delay use until an extent is provided with a flag set. Or, seen from the
other end, objiterate_f needs to ensure it does not use any previously received
extent once a flag has been seen.
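
The lifetime rule can be illustrated with a small stand-alone sketch. It is not
Varnish code: the sk_ names, the flag values and the fixed-size buffers are
assumptions made for illustration, and only the callback shape mirrors an
objiterate_f. The consumer remembers extents without copying them and uses them
all, at the latest, when a flag arrives.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include <sys/types.h>

/* Stand-ins for the Varnish definitions; the values are assumptions. */
#define SK_ITER_FLUSH	0x01
#define SK_ITER_END	0x02

#define SK_MAX_PENDING	8

struct sk_consumer {
	const void	*ptr[SK_MAX_PENDING];	/* extents still owned by storage */
	size_t		len[SK_MAX_PENDING];
	int		npending;
	char		out[64];		/* stands in for the client socket */
	size_t		outlen;
	int		flushes;
};

/* Use all pending extents (here: copy them out), then forget them. */
static void
sk_flush(struct sk_consumer *c)
{
	for (int i = 0; i < c->npending; i++) {
		assert(c->outlen + c->len[i] <= sizeof c->out);
		memcpy(c->out + c->outlen, c->ptr[i], c->len[i]);
		c->outlen += c->len[i];
	}
	c->npending = 0;
	c->flushes++;
}

/* Same shape as an objiterate_f: called once per extent. */
static int
sk_iterate_cb(void *priv, unsigned flush, const void *ptr, ssize_t len)
{
	struct sk_consumer *c = priv;

	if (len > 0) {
		if (c->npending == SK_MAX_PENDING)
			sk_flush(c);	/* out of slots: use extents early */
		c->ptr[c->npending] = ptr;
		c->len[c->npending] = (size_t)len;
		c->npending++;
	}
	/* A flag ends the lifetime of everything received so far. */
	if (flush & (SK_ITER_FLUSH | SK_ITER_END))
		sk_flush(c);
	return (0);
}

/* Drive the callback the way a storage iterator might. */
int
sk_demo(struct sk_consumer *c)
{
	memset(c, 0, sizeof *c);
	(void)sk_iterate_cb(c, 0, "hel", 3);
	(void)sk_iterate_cb(c, 0, "lo ", 3);
	(void)sk_iterate_cb(c, SK_ITER_FLUSH, "wor", 3);
	(void)sk_iterate_cb(c, SK_ITER_END, "ld", 2);
	return (c->flushes);
}
```

In this sketch the four extents are used in two batches, one per flag. Note
that the storage side decides when flags appear; the consumer can only react.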

objiterate_f cannot make any assumption as to if or when it is going to be
called: if the storage iterator function needs time to retrieve data, or if a
streaming fetch is in progress, objiterate_f may get called later or not at
all.

Or, again seen from the other end, the storage iterator function assumes that it
is called from a thread which may block at any time.

Why this change?

The model described above is fundamentally incompatible with asynchronous,
event-driven IO models, where a single thread might serve multiple requests
concurrently for efficiency and, consequently, no called function may ever
block.

This additional API is intended to provide an interface suitable for such
asynchronous models. As before, the asynchronous iterator is also owned by a
storage-specific implementation, but instead of using a thread for its state,
it keeps that state in a data structure opaque to the caller.

API Usage

The basic model for the API is that the storage engine "leases" to the caller a
number of extents, which the caller is then free to use until it returns the
leases to the storage engine.

The storage engine can also signal to the caller that it can not return more
extents unless some are returned or that it simply can not return any at this
time for other reasons (for example, because it is waiting for data on a
streaming fetch). In both cases, the storage engine promises to call the
caller's notification function when it is ready to provide more extents or
iteration has ended.
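
The lease / return / notify cycle can be sketched with a toy storage engine.
Everything named toy_ here is hypothetical and not part of Varnish-Cache; the
sketch only borrows the return value convention this commit message describes
for ObjVAIlease() (positive count, zero at the end, -EAGAIN, -ENOBUFS). It also
assumes that the toy delivers its notification synchronously, which a real
storage engine need not do.

```c
#include <assert.h>
#include <errno.h>

/* Toy storage engine: every name in this sketch is hypothetical. */
struct toy_store {
	int total;		/* extents in the whole object */
	int arrived;		/* extents fetched so far */
	int next;		/* next extent to lease out */
	int outstanding;	/* leased but not yet returned */
	int max_out;		/* most leases allowed at once */
	int armed;		/* a notification is owed */
	void (*cb)(void *);	/* caller's notification callback */
	void *priv;
};

/* Deliver the promised notification, if one is owed. */
static void
toy_wakeup(struct toy_store *st)
{
	if (st->armed) {
		st->armed = 0;
		st->cb(st->priv);
	}
}

/* n > 0: leases handed out, 0: end of object, < 0: wait for notification */
static int
toy_lease(struct toy_store *st, int *ext, int n)
{
	int i = 0;

	if (st->next == st->total)
		return (0);
	while (i < n && st->next < st->arrived &&
	    st->outstanding < st->max_out) {
		ext[i++] = st->next++;
		st->outstanding++;
	}
	if (i > 0)
		return (i);
	st->armed = 1;
	return (st->outstanding == st->max_out ? -ENOBUFS : -EAGAIN);
}

/* The caller hands back n leases. */
static void
toy_return(struct toy_store *st, int n)
{
	st->outstanding -= n;
	toy_wakeup(st);
}

/* The caller's callback only records that leasing may be retried. */
static void
toy_notified(void *priv)
{
	*(int *)priv = 1;
}

/* Consume a five-extent object; returns extents seen, counts waits. */
int
toy_demo(int *waits)
{
	int ready = 1, held = 0, seen = 0, r, ext[4];
	struct toy_store st = { .total = 5, .arrived = 3, .max_out = 2,
	    .cb = toy_notified, .priv = &ready };

	for (*waits = 0;;) {
		if (!ready) {
			/* An event loop would serve other requests here;
			 * the toy just lets one more extent arrive. */
			(*waits)++;
			st.arrived++;
			toy_wakeup(&st);
			continue;
		}
		r = toy_lease(&st, ext, 4);
		if (r == 0)
			break;
		if (r > 0) {
			held += r;	/* usable until returned */
			seen += r;
			continue;
		}
		assert(r == -EAGAIN || r == -ENOBUFS);
		ready = 0;		/* wait for the notification */
		if (r == -ENOBUFS) {
			toy_return(&st, held);	/* toy notifies at once */
			held = 0;
		}
	}
	toy_return(&st, held);
	return (st.outstanding == 0 ? seen : -1);
}
```

The caller never blocks inside the lease call: when nothing can be leased, it
records that it is waiting and resumes only after the callback has fired.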

The API consists of four functions:

- ObjVAIinit() requests an asynchronous iteration on an object. The caller
  provides an optional workspace for the storage engine to use for its state,
  and the notification callback / private pointer introduced with the previous
  commit. Its use is explained below.

  ObjVAIinit() returns either an opaque handle owned jointly by the Object layer
  in Varnish-Cache and the storage engine, or NULL if the storage engine does
  not provide asynchronous iteration.

All other API functions work on the handle returned by ObjVAIinit():

- ObjVAIlease() returns the next extents from the object body in a
  caller-provided array. Each extent is a struct vaiov, which contains a struct
  iovec (see iovec(3type) / readv(2)) with the actual extent, a flags field to
  signal the last extent (mirroring OBJ_ITER_END) and an integer identifying the
  lease. The "lease" integer (uint64_t) is opaque to the caller and needs to be
  returned as-is later, but is guaranteed by storage to be a multiple of 8. The
  caller can use the three low bits, which are therefore zero, to temporarily
  stash a tiny amount of additional state in the lease, provided it clears them
  again before returning the lease.

  ObjVAIlease() returns either a positive integer giving the number of leases
  provided, zero if the end of the object has been reached, or a negative
  integer for "call again later" and error conditions:

  -EAGAIN signals that no more data is available at this point, and the storage
  engine will call the notification function when the condition changes.

  -ENOBUFS behaves identically, but also requires the caller to return more
  leases.

  -EPIPE mirrors BOS_FAILED on the busy object.

  Any other -(errno) can be used by the storage engine to signal other error
  conditions.

- ObjVAIreturn() returns leases to the storage when the caller is done with
  them.

  For efficiency, leases should be returned in batches, and at the latest when
  ObjVAIlease() requests it by returning -ENOBUFS.

- ObjVAIfini() finalizes iteration. The handle must not be used thereafter.
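
Because leases are multiples of 8, a caller can borrow the three low bits
between lease and return. The helpers below are a hedged sketch of that idea
only: the sk_lease_ names are hypothetical, the lease values in the usage note
are made up, and real code would take the leases from struct vaiov and pass the
cleaned array to ObjVAIreturn().

```c
#include <assert.h>
#include <stdint.h>

/* The low three bits of a lease are zero, so a caller may borrow them. */
#define SK_LEASE_TAGMASK ((uint64_t)0x7)

/* Stash a small tag (0..7) in a lease as received from storage. */
static uint64_t
sk_lease_tag(uint64_t lease, unsigned tag)
{
	assert((lease & SK_LEASE_TAGMASK) == 0);
	assert(tag <= SK_LEASE_TAGMASK);
	return (lease | tag);
}

/* Read the tag back. */
static unsigned
sk_lease_gettag(uint64_t lease)
{
	return ((unsigned)(lease & SK_LEASE_TAGMASK));
}

/* Restore the lease to the value storage handed out. */
static uint64_t
sk_lease_untag(uint64_t lease)
{
	return (lease & ~SK_LEASE_TAGMASK);
}

/*
 * Copy a batch of possibly tagged leases into an array that is safe to
 * hand back to storage. Returns how many entries carried a tag.
 */
int
sk_lease_strip(const uint64_t *tagged, uint64_t *clean, int n)
{
	int i, ntagged = 0;

	for (i = 0; i < n; i++) {
		if (sk_lease_gettag(tagged[i]) != 0)
			ntagged++;
		clean[i] = sk_lease_untag(tagged[i]);
	}
	return (ntagged);
}
```

A caller could, for instance, tag the lease 0x1008 with the value 5 while the
extent is in flight, and run the whole batch through sk_lease_strip() before
returning it, so that storage sees exactly the values it issued.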

Implementation

One particular aspect of the implementation is that the storage engine returns
the "lease", "return" and "fini" functions to be used with the handle. This
allows the storage engine to provide functions tailored to the attributes of the
storage object: for example, streaming fetches require more elaborate handling
than settled storage objects.

Consequently, the vai_hdl which is, by design, opaque to the caller, is not
entirely opaque to the object layer: The implementation requires it to start
with a struct vai_hdl_preamble containing the function pointers to be called by
ObjVAIlease(), ObjVAIreturn() and ObjVAIfini(). The return function pointer
vai_return is optional.
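
The preamble technique can be sketched independently of Varnish. In the
following stand-alone example all sk_ and simple_ names, the magic value and
the simplified signatures are assumptions; only the layout idea (a handle that
starts with a struct of function pointers, with an optional return function)
follows the description above. That the fini function clears the caller's
handle is likewise an assumption of this sketch.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef void *sk_hdl;			/* opaque to the caller */

typedef int sk_lease_f(sk_hdl, int n);
typedef void sk_return_f(sk_hdl, int n);
typedef void sk_fini_f(sk_hdl *);

#define SK_PREAMBLE_MAGIC 0x5ca1ab1eU	/* arbitrary value for this sketch */

struct sk_preamble {
	unsigned	magic;
	sk_lease_f	*lease;
	sk_return_f	*ret;		/* optional */
	sk_fini_f	*fini;
};

/* A storage-private handle: the preamble must be the first member. */
struct simple_hdl {
	struct sk_preamble	preamble;
	int			remaining;
	int			finished;
};

static int
simple_lease(sk_hdl h, int n)
{
	struct simple_hdl *sh = h;

	if (n > sh->remaining)
		n = sh->remaining;
	sh->remaining -= n;
	return (n);
}

static void
simple_fini(sk_hdl *hp)
{
	struct simple_hdl *sh = *hp;

	sh->finished = 1;
	*hp = NULL;		/* assumption: fini clears the handle */
}

void
simple_init(struct simple_hdl *sh, int extents)
{
	memset(sh, 0, sizeof *sh);
	sh->preamble.magic = SK_PREAMBLE_MAGIC;
	sh->preamble.lease = simple_lease;
	sh->preamble.fini = simple_fini;	/* no return function needed */
	sh->remaining = extents;
}

/* Generic layer: knows only the preamble, not the storage's own state. */
static struct sk_preamble *
sk_preamble_of(sk_hdl h)
{
	struct sk_preamble *p = h;

	assert(p != NULL && p->magic == SK_PREAMBLE_MAGIC);
	return (p);
}

int
sk_lease(sk_hdl h, int n)
{
	struct sk_preamble *p = sk_preamble_of(h);

	assert(p->lease != NULL);
	return (p->lease(h, n));
}

void
sk_return(sk_hdl h, int n)
{
	struct sk_preamble *p = sk_preamble_of(h);

	if (p->ret != NULL)	/* the return function is optional */
		p->ret(h, n);
}

void
sk_fini(sk_hdl *hp)
{
	struct sk_preamble *p;

	assert(hp != NULL);
	p = sk_preamble_of(*hp);
	assert(p->fini != NULL);
	p->fini(hp);
}
```

The cast from the handle to the preamble is valid C because a pointer to a
struct also points to its first member. A different storage implementation
could install other functions in the same slots without the generic layer
changing.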

More details about the implementation will become clear with the next commit,
which implements SML's synchronous iterator using the new API.
nigoroll committed Nov 1, 2024
1 parent 6097a68 commit 7651240
Showing 3 changed files with 189 additions and 1 deletion.
16 changes: 16 additions & 0 deletions bin/varnishd/cache/cache.h
@@ -42,6 +42,7 @@
#include <pthread.h>
#include <stdarg.h>
#include <sys/types.h>
#include <sys/uio.h>

#include "vdef.h"
#include "vrt.h"
@@ -784,6 +785,21 @@ int ObjCheckFlag(struct worker *, struct objcore *, enum obj_flags of);
typedef void *vai_hdl;
typedef void vai_notify_cb(vai_hdl, void *priv);

struct vaiov {
	unsigned magic;
#define VAIOV_MAGIC 0x7a107a10
	unsigned flags;
#define VAIOV_F_END 1 // last VAIOV
	uint64_t lease;
	struct iovec iov;
};

vai_hdl ObjVAIinit(struct worker *, struct objcore *, struct ws *,
    vai_notify_cb *, void *);
int ObjVAIlease(struct worker *, vai_hdl, struct vaiov *, int);
void ObjVAIreturn(struct worker *, vai_hdl, uint64_t *, int);
void ObjVAIfini(struct worker *, vai_hdl *);

/* cache_req_body.c */
ssize_t VRB_Iterate(struct worker *, struct vsl_log *, struct req *,
objiterate_f *func, void *priv);
94 changes: 94 additions & 0 deletions bin/varnishd/cache/cache_obj.c
@@ -183,6 +183,100 @@ ObjIterate(struct worker *wrk, struct objcore *oc,
	return (om->objiterator(wrk, oc, priv, func, final));
}

/*====================================================================
* ObjVAI...(): Asynchronous Iteration
*
*
* ObjVAIinit() returns an opaque handle, or NULL if not supported
*
* A VAI handle must not be used concurrently
*
* the vai_notify_cb(priv) will be called asynchronously by the storage
* engine when a -EAGAIN / -ENOBUFS condition is over and ObjVAIlease()
* can be called again.
*
* Note:
* - the callback gets executed by an arbitrary thread
* - WITH the boc mtx held
* so it should never block and only do minimal work
*
* ObjVAIlease() fills the vaiov array passed in with leases. returns:
*
* -EAGAIN: nothing available at the moment, storage will notify, no use to
* call again until notification
* -ENOBUFS: caller needs to return leases, storage will notify
* -EPIPE: BOS_FAILED for busy object
* -(errno): other problem, fatal
* 0: EOF
* n: number of vaiovs filled
*
* struct vaiov:
*
* the returned leases can be used by the caller until returned with
* ObjVAIreturn(). The storage guarantees that the lease member is a
* multiple of 8 (that is, the lower three bits are zero). These can be
* used by the caller between lease and return, but must be returned to
* zero before returning.
*
* ObjVAIreturn() returns leases
*
* it must be called with an array of lease values from vaiovs
* received when the caller can guarantee that they are no longer accessed
*
* ObjVAIfini() finalizes iteration
*
* it must be called when iteration is done, irrespective of error status
*/

vai_hdl
ObjVAIinit(struct worker *wrk, struct objcore *oc, struct ws *ws,
    vai_notify_cb *cb, void *cb_priv)
{
	const struct obj_methods *om = obj_getmethods(oc);

	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);

	if (om->vai_init == NULL)
		return (NULL);
	return (om->vai_init(wrk, oc, ws, cb, cb_priv));
}

int
ObjVAIlease(struct worker *wrk, vai_hdl vhdl, struct vaiov *vaiov, int n)
{
	struct vai_hdl_preamble *vaip = vhdl;

	AN(vaip);
	assert(vaip->magic2 == VAI_HDL_PREAMBLE_MAGIC2);
	AN(vaip->vai_lease);
	return (vaip->vai_lease(wrk, vhdl, vaiov, n));
}

void
ObjVAIreturn(struct worker *wrk, vai_hdl vhdl, uint64_t *leases, int n)
{
	struct vai_hdl_preamble *vaip = vhdl;

	AN(vaip);
	assert(vaip->magic2 == VAI_HDL_PREAMBLE_MAGIC2);
	/* vai_return is optional */
	if (vaip->vai_return == NULL)
		return;
	vaip->vai_return(wrk, vhdl, leases, n);
}

void
ObjVAIfini(struct worker *wrk, vai_hdl *vhdlp)
{
	AN(vhdlp);
	struct vai_hdl_preamble *vaip = *vhdlp;

	AN(vaip);
	assert(vaip->magic2 == VAI_HDL_PREAMBLE_MAGIC2);
	/* check the function that is actually called below */
	AN(vaip->vai_fini);
	vaip->vai_fini(wrk, vhdlp);
}

/*====================================================================
* ObjGetSpace()
*
80 changes: 79 additions & 1 deletion bin/varnishd/cache/cache_obj.h
@@ -70,6 +70,83 @@ struct vai_qe {
	void *priv;
};

#define VAI_ASSERT_LEASE(x) AZ((x) & 0x7)

/*
* start an iteration. the ws can be used (reserved) by storage
* the void * will be passed as the second argument to vai_notify_cb
*/
typedef vai_hdl vai_init_f(struct worker *, struct objcore *, struct ws *,
    vai_notify_cb *, void *);

/*
* lease io vectors from storage
*
* vai_hdl is from vai_init_f
* vaiov / vaiovcnt is space provided by the caller to return leases
*
* return:
* -EAGAIN: nothing available at the moment, storage will notify, no use to
* call again until notification
* -ENOBUFS: caller needs to return leases, storage will notify
* -EPIPE: BOS_FAILED for busy object
* -(errno): other problem, fatal
* 0: EOF
* n: number of vaiovs filled
*/
typedef int vai_lease_f(struct worker *, vai_hdl, struct vaiov *vaiov, int vaiovcnt);

/*
* return leases
*/
typedef void vai_return_f(struct worker *, vai_hdl, uint64_t *leases, int leasecnt);

/*
* finish iteration, vai_return_f must have been called on all leases
*/
typedef void vai_fini_f(struct worker *, vai_hdl *);

/*
* vai_hdl must start with this preamble such that when cast to it, cache_obj.c
* has access to the methods.
*
* The first magic is owned by storage, the second magic is owned by cache_obj.c
* and must be initialized to VAI_HDL_PREAMBLE_MAGIC2
*
*/

struct vai_hdl_preamble {
	unsigned magic; // owned by storage
	unsigned magic2;
#define VAI_HDL_PREAMBLE_MAGIC2 0x7a15d162
	vai_lease_f *vai_lease;
	vai_return_f *vai_return; // optional
	uintptr_t reserve[4]; // abi fwd compat
	vai_fini_f *vai_fini;
};

#define INIT_VAI_HDL(to, x) do { \
	(void)memset(to, 0, sizeof *(to)); \
	(to)->preamble.magic = (x); \
	(to)->preamble.magic2 = VAI_HDL_PREAMBLE_MAGIC2; \
} while (0)

/* the macro argument is parenthesized so that expressions are safe to pass */
#define CHECK_VAI_HDL(obj, x) do { \
	assert((obj)->preamble.magic == (x)); \
	assert((obj)->preamble.magic2 == VAI_HDL_PREAMBLE_MAGIC2); \
} while (0)

#define CHECK_VAI_HDL_NOTNULL(obj, x) do { \
	AN(obj); \
	CHECK_VAI_HDL(obj, x); \
} while (0)

#define CAST_VAI_HDL_NOTNULL(obj, ptr, x) do { \
	AN(ptr); \
	(obj) = (ptr); \
	CHECK_VAI_HDL(obj, x); \
} while (0)

struct obj_methods {
/* required */
objfree_f *objfree;
@@ -84,5 +161,6 @@ struct obj_methods {
objslim_f *objslim;
objtouch_f *objtouch;
objsetstate_f *objsetstate;
/* async iteration (VAI) */
vai_init_f *vai_init;
};
