Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: kvs: call content.flush before checkpoint #6240

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
content: require backing store for checkpoint
Problem: A backing store is required for content.flush but it
is not required for content.checkpoint-put.  This is inconsistent
and can lead to checkpointing problems done the line.

Require content.checkpoint-put to only work if there is a backing
store available.  As a consequence, remove code that handled
"cached" checkpoints when a backing store is not available.

Fixes #6251
  • Loading branch information
chu11 committed Dec 18, 2024
commit 38717f7f3ea6c0719017f7d0af0f1c2e37b8dd65
1 change: 0 additions & 1 deletion src/modules/content/cache.c
Original file line number Diff line number Diff line change
@@ -782,7 +782,6 @@ static void content_register_backing_request (flux_t *h,
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "error responding to register-backing request");
(void)cache_flush (cache);
(void)checkpoints_flush (cache->checkpoint);
return;
error:
if (flux_respond_error (h, msg, errno, errstr) < 0)
180 changes: 13 additions & 167 deletions src/modules/content/checkpoint.c
Original file line number Diff line number Diff line change
@@ -28,74 +28,8 @@ struct content_checkpoint {
flux_msg_handler_t **handlers;
uint32_t rank;
struct content_cache *cache;
zhashx_t *hash;
unsigned int hash_dirty;
};

struct checkpoint_data {
struct content_checkpoint *checkpoint;
json_t *value;
uint8_t dirty:1;
bool in_progress;
int refcount;
};

static struct checkpoint_data *
checkpoint_data_incref (struct checkpoint_data *data)
{
if (data)
data->refcount++;
return data;
}

static void checkpoint_data_decref (struct checkpoint_data *data)
{
if (data && --data->refcount == 0) {
if (data->dirty)
data->checkpoint->hash_dirty--;
json_decref (data->value);
free (data);
}
}

/* zhashx_destructor_fn */
static void checkpoint_data_decref_wrapper (void **arg)
{
if (arg) {
struct checkpoint_data *data = *arg;
checkpoint_data_decref (data);
}
}

static struct checkpoint_data *
checkpoint_data_create (struct content_checkpoint *checkpoint,
json_t *value)
{
struct checkpoint_data *data = NULL;

if (!(data = calloc (1, sizeof (*data))))
return NULL;
data->checkpoint = checkpoint;
data->value = json_incref (value);
data->refcount = 1;
return data;
}

static int checkpoint_data_update (struct content_checkpoint *checkpoint,
const char *key,
json_t *value)
{
struct checkpoint_data *data = NULL;

if (!(data = checkpoint_data_create (checkpoint, value)))
return -1;

zhashx_update (checkpoint->hash, key, data);
data->dirty = 1;
checkpoint->hash_dirty++;
return 0;
}

static void checkpoint_get_continuation (flux_future_t *f, void *arg)
{
struct content_checkpoint *checkpoint = arg;
@@ -111,9 +45,6 @@ static void checkpoint_get_continuation (flux_future_t *f, void *arg)
if (flux_rpc_get_unpack (f, "{s:o}", "value", &value) < 0)
goto error;

if (checkpoint_data_update (checkpoint, key, value) < 0)
goto error;

if (flux_respond_pack (checkpoint->h, msg, "{s:O}", "value", value) < 0)
flux_log_error (checkpoint->h, "error responding to checkpoint-get");

@@ -176,24 +107,16 @@ void content_checkpoint_get_request (flux_t *h, flux_msg_handler_t *mh,
const char *key;
const char *errstr = NULL;

if (flux_request_unpack (msg, NULL, "{s:s}", "key", &key) < 0)
goto error;

if (checkpoint->rank == 0
&& !content_cache_backing_loaded (checkpoint->cache)) {
struct checkpoint_data *data = zhashx_lookup (checkpoint->hash, key);
if (!data) {
errstr = "checkpoint key unavailable";
errno = ENOENT;
goto error;
}
if (flux_respond_pack (h, msg,
"{s:O}",
"value", data->value) < 0)
flux_log_error (h, "error responding to checkpoint-get");
return;
errstr = "checkpoint get unavailable, no backing store";
errno = ENOSYS;
goto error;
}

if (flux_request_unpack (msg, NULL, "{s:s}", "key", &key) < 0)
goto error;

if (checkpoint_get_forward (checkpoint,
msg,
key,
@@ -279,24 +202,20 @@ void content_checkpoint_put_request (flux_t *h, flux_msg_handler_t *mh,
json_t *value;
const char *errstr = NULL;

if (checkpoint->rank == 0
&& !content_cache_backing_loaded (checkpoint->cache)) {
errstr = "checkpoint put unavailable, no backing store";
errno = ENOSYS;
goto error;
}

if (flux_request_unpack (msg,
NULL,
"{s:s s:o}",
"key", &key,
"value", &value) < 0)
goto error;

if (checkpoint->rank == 0) {
if (checkpoint_data_update (checkpoint, key, value) < 0)
goto error;

if (!content_cache_backing_loaded (checkpoint->cache)) {
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (checkpoint->h, "error responding to checkpoint-put");
return;
}
}

if (checkpoint_put_forward (checkpoint,
msg,
key,
@@ -311,72 +230,6 @@ void content_checkpoint_put_request (flux_t *h, flux_msg_handler_t *mh,
flux_log_error (h, "error responding to checkpoint-put request");
}

static void checkpoint_flush_continuation (flux_future_t *f, void *arg)
{
struct checkpoint_data *data = arg;
int rv;

assert (data);
if ((rv = flux_rpc_get (f, NULL)) < 0)
flux_log_error (data->checkpoint->h, "checkpoint flush rpc");
if (!rv) {
data->dirty = 0;
data->checkpoint->hash_dirty--;
}
data->in_progress = false;
checkpoint_data_decref (data);
flux_future_destroy (f);
}

static int checkpoint_flush (struct content_checkpoint *checkpoint,
struct checkpoint_data *data)
{
if (data->dirty && !data->in_progress) {
const char *key = zhashx_cursor (checkpoint->hash);
const char *topic = "content-backing.checkpoint-put";
flux_future_t *f;
if (!(f = flux_rpc_pack (checkpoint->h, topic, 0, 0,
"{s:s s:O}",
"key", key,
"value", data->value))
|| flux_future_then (f,
-1,
checkpoint_flush_continuation,
(void *)checkpoint_data_incref (data)) < 0) {
flux_log_error (checkpoint->h, "%s: checkpoint flush", __FUNCTION__);
flux_future_destroy (f);
return -1;
}
data->in_progress = true;
}
return 0;
}

int checkpoints_flush (struct content_checkpoint *checkpoint)
{
int last_errno = 0;
int rc = 0;

if (checkpoint->hash_dirty > 0) {
struct checkpoint_data *data = zhashx_first (checkpoint->hash);
while (data) {
if (checkpoint_flush (checkpoint, data) < 0) {
last_errno = errno;
rc = -1;
/* A few errors we will consider "unrecoverable", so
* break out */
if (errno == ENOSYS
|| errno == ENOMEM)
break;
}
data = zhashx_next (checkpoint->hash);
}
}
if (rc < 0)
errno = last_errno;
return rc;
}

static const struct flux_msg_handler_spec htab[] = {
{
FLUX_MSGTYPE_REQUEST,
@@ -398,7 +251,6 @@ void content_checkpoint_destroy (struct content_checkpoint *checkpoint)
if (checkpoint) {
int saved_errno = errno;
flux_msg_handler_delvec (checkpoint->handlers);
zhashx_destroy (&checkpoint->hash);
free (checkpoint);
errno = saved_errno;
}
@@ -417,16 +269,10 @@ struct content_checkpoint *content_checkpoint_create (
checkpoint->rank = rank;
checkpoint->cache = cache;

if (!(checkpoint->hash = zhashx_new ()))
goto nomem;
zhashx_set_destructor (checkpoint->hash, checkpoint_data_decref_wrapper);

if (flux_msg_handler_addvec (h, htab, checkpoint, &checkpoint->handlers) < 0)
goto error;
return checkpoint;

nomem:
errno = ENOMEM;
error:
content_checkpoint_destroy (checkpoint);
return NULL;
2 changes: 0 additions & 2 deletions src/modules/content/checkpoint.h
Original file line number Diff line number Diff line change
@@ -19,8 +19,6 @@ struct content_checkpoint *content_checkpoint_create (
struct content_cache *cache);
void content_checkpoint_destroy (struct content_checkpoint *checkpoint);

int checkpoints_flush (struct content_checkpoint *checkpoint);

#endif /* !_CONTENT_CHECKPOINT_H */

/*