From 0ed3a19a7d8309a054d1391461aaafd6a95539ff Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Wed, 6 Sep 2023 10:57:52 +0300 Subject: [PATCH] fifottl: fix release/bury next_event setup `take` sets up the next event time as current time + `ttr`, while `release` and `bury` do not update the next event time. This can lead to a task being deleted after `take time` + `ttr` after `release` or `bury` in the queue fiber. To fix the issue we can copy approach from the `queue` module [1][2]. 1. https://github.com/tarantool/queue/blob/481b5fbf47a7db5c504c64a688b4f8a5fc765a45/queue/abstract/driver/fifottl.lua#L348-L351 2. https://github.com/tarantool/queue/blob/481b5fbf47a7db5c504c64a688b4f8a5fc765a45/queue/abstract/driver/fifottl.lua#L367-L370 Closes #65 --- CHANGELOG.md | 2 + sharded_queue/drivers/fifottl.lua | 22 +++++++++-- test/ttl_test.lua | 61 +++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 266cea1..878d29b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Fixed +- A deletion of a released task after `ttr` in the `fifottl` driver. + ## 0.1.0 - 2023-06-16 The first release provides cartridge roles that implement a distributed diff --git a/sharded_queue/drivers/fifottl.lua b/sharded_queue/drivers/fifottl.lua index f65097b..05510ad 100644 --- a/sharded_queue/drivers/fifottl.lua +++ b/sharded_queue/drivers/fifottl.lua @@ -375,7 +375,6 @@ function method.ack(args) end function method.peek(args) - local task = box.space[args.tube_name]:get(args.task_id) if args.extra and args.extra.log_request then @@ -386,7 +385,15 @@ function method.peek(args) end function method.release(args) - local task = box.space[args.tube_name]:update(args.task_id, { {'=', index.status, state.READY} }) + box.begin() + local task = box.space[args.tube_name]:get(args.task_id) + if task ~= nil then + task = box.space[args.tube_name]:update(args.task_id, { + {'=', index.status, state.READY}, + {'=', index.next_event, task[index.created] + task[index.ttl]}, + }) + end + box.commit() if args.extra and args.extra.log_request then log_operation("release", task) @@ -401,7 +408,16 @@ function method.bury(args) update_stat(args.tube_name, 'bury') wc_signal(args.tube_name) - local task = box.space[args.tube_name]:update(args.task_id, { {'=', index.status, state.BURIED} }) + box.begin() + local task = box.space[args.tube_name]:get(args.task_id) + if task ~= nil then + task = box.space[args.tube_name]:update(args.task_id, { + {'=', index.status, state.BURIED}, + {'=', index.next_event, task[index.created] + task[index.ttl]}, + }) + end + box.commit() + if args.extra and args.extra.log_request then log_operation("bury", task) end diff --git a/test/ttl_test.lua b/test/ttl_test.lua index e647e6c..746f90c 100644 --- a/test/ttl_test.lua +++ b/test/ttl_test.lua @@ -150,3 +150,64 @@ function g.test_delayed_tasks() t.assert_equals(peek_task[utils.index.status], utils.state.TAKEN) end + +function g.test_ttr_release_no_delete_task() + local tube_name = 'ttr_release_no_delete_task_test' + g.queue_conn:call('queue.create_tube', { + tube_name, + { + wait_factor = 1, + ttr = 0.2, + log_request = true, + } + }) + + g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { + 'simple data', + }) + local taken_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'take')) + local released_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'release'), { + taken_task[utils.index.task_id] + }) + t.assert_equals(released_task[utils.index.data], 'simple data') + t.assert_equals(released_task[utils.index.status], utils.state.READY) + + -- Wait for a clenup fiber. + fiber.sleep(1) + + local retaken_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'take'), {0.5}) + t.assert_not_equals(retaken_task, box.NULL) + t.assert_equals(retaken_task[utils.index.data], 'simple data') + t.assert_equals(retaken_task[utils.index.status], utils.state.TAKEN) +end + +function g.test_ttr_bury_no_delete_task() + local tube_name = 'ttr_bury_no_delete_task_test' + g.queue_conn:call('queue.create_tube', { + tube_name, + { + wait_factor = 1, + ttr = 0.2, + log_request = true, + } + }) + + g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { + 'simple data', + }) + local taken_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'take')) + local buried_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'bury'), { + taken_task[utils.index.task_id] + }) + t.assert_equals(buried_task[utils.index.data], 'simple data') + t.assert_equals(buried_task[utils.index.status], utils.state.BURIED) + + -- Wait for a clenup fiber. + fiber.sleep(1) + + g.queue_conn:call(utils.shape_cmd(tube_name, 'kick'), {1}) + local retaken_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'take'), {0.5}) + t.assert_not_equals(retaken_task, box.NULL) + t.assert_equals(retaken_task[utils.index.data], 'simple data') + t.assert_equals(retaken_task[utils.index.status], utils.state.TAKEN) +end