Skip to content

Commit

Permalink
fifottl: fix release/bury next_event setup
Browse files Browse the repository at this point in the history
`take` sets up the next event time as current time + `ttr`, while
`release` and `bury` do not update the next event time. This can lead
to a task being deleted after `take time` + `ttr` after `release`
or `bury` in the queue fiber.

To fix the issue we can copy approach from the `queue` module [1][2].

1. https://github.com/tarantool/queue/blob/481b5fbf47a7db5c504c64a688b4f8a5fc765a45/queue/abstract/driver/fifottl.lua#L348-L351
2. https://github.com/tarantool/queue/blob/481b5fbf47a7db5c504c64a688b4f8a5fc765a45/queue/abstract/driver/fifottl.lua#L367-L370

Closes #65
  • Loading branch information
oleg-jukovec committed Sep 6, 2023
1 parent 625423d commit 0ed3a19
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Fixed

- A deletion of a released task after `ttr` in the `fifottl` driver.

## 0.1.0 - 2023-06-16

The first release provides cartridge roles that implement a distributed
Expand Down
22 changes: 19 additions & 3 deletions sharded_queue/drivers/fifottl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,6 @@ function method.ack(args)
end

function method.peek(args)

local task = box.space[args.tube_name]:get(args.task_id)

if args.extra and args.extra.log_request then
Expand All @@ -386,7 +385,15 @@ function method.peek(args)
end

function method.release(args)
local task = box.space[args.tube_name]:update(args.task_id, { {'=', index.status, state.READY} })
box.begin()
local task = box.space[args.tube_name]:get(args.task_id)
if task ~= nil then
task = box.space[args.tube_name]:update(args.task_id, {
{'=', index.status, state.READY},
{'=', index.next_event, task[index.created] + task[index.ttl]},
})
end
box.commit()

if args.extra and args.extra.log_request then
log_operation("release", task)
Expand All @@ -401,7 +408,16 @@ function method.bury(args)
update_stat(args.tube_name, 'bury')
wc_signal(args.tube_name)

local task = box.space[args.tube_name]:update(args.task_id, { {'=', index.status, state.BURIED} })
box.begin()
local task = box.space[args.tube_name]:get(args.task_id)
if task ~= nil then
task = box.space[args.tube_name]:update(args.task_id, {
{'=', index.status, state.BURIED},
{'=', index.next_event, task[index.created] + task[index.ttl]},
})
end
box.commit()

if args.extra and args.extra.log_request then
log_operation("bury", task)
end
Expand Down
61 changes: 61 additions & 0 deletions test/ttl_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,64 @@ function g.test_delayed_tasks()
t.assert_equals(peek_task[utils.index.status], utils.state.TAKEN)

end

function g.test_ttr_release_no_delete_task()
local tube_name = 'ttr_release_no_delete_task_test'
g.queue_conn:call('queue.create_tube', {
tube_name,
{
wait_factor = 1,
ttr = 0.2,
log_request = true,
}
})

g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), {
'simple data',
})
local taken_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'take'))
local released_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'release'), {
taken_task[utils.index.task_id]
})
t.assert_equals(released_task[utils.index.data], 'simple data')
t.assert_equals(released_task[utils.index.status], utils.state.READY)

-- Wait for a clenup fiber.
fiber.sleep(1)

local retaken_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'take'), {0.5})
t.assert_not_equals(retaken_task, box.NULL)
t.assert_equals(retaken_task[utils.index.data], 'simple data')
t.assert_equals(retaken_task[utils.index.status], utils.state.TAKEN)
end

function g.test_ttr_bury_no_delete_task()
local tube_name = 'ttr_bury_no_delete_task_test'
g.queue_conn:call('queue.create_tube', {
tube_name,
{
wait_factor = 1,
ttr = 0.2,
log_request = true,
}
})

g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), {
'simple data',
})
local taken_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'take'))
local buried_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'bury'), {
taken_task[utils.index.task_id]
})
t.assert_equals(buried_task[utils.index.data], 'simple data')
t.assert_equals(buried_task[utils.index.status], utils.state.BURIED)

-- Wait for a clenup fiber.
fiber.sleep(1)

g.queue_conn:call(utils.shape_cmd(tube_name, 'kick'), {1})
local retaken_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'take'), {0.5})
t.assert_not_equals(retaken_task, box.NULL)
t.assert_equals(retaken_task[utils.index.data], 'simple data')
t.assert_equals(retaken_task[utils.index.status], utils.state.TAKEN)
end

0 comments on commit 0ed3a19

Please sign in to comment.