Skip to content

Commit

Permalink
internal: split storage.lua to logical blocks
Browse files Browse the repository at this point in the history
The patch splits a large file `storage.lua` with a Cartridge
role for a few smaller files based on their functionality.
It will help to create a Tarantool 3.0 role from that blocks
instead of copy-paste.

Part of #68
  • Loading branch information
oleg-jukovec committed Apr 3, 2024
1 parent fb9e8ac commit 9cf8eae
Show file tree
Hide file tree
Showing 10 changed files with 279 additions and 181 deletions.
5 changes: 5 additions & 0 deletions sharded-queue-scm-1.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ build = {
['sharded_queue.stash'] = 'sharded_queue/stash.lua',
['sharded_queue.state'] = 'sharded_queue/state.lua',
['sharded_queue.stats.storage'] = 'sharded_queue/stats/storage.lua',
['sharded_queue.storage.config'] = 'sharded_queue/storage/config.lua',
['sharded_queue.storage.drivers'] = 'sharded_queue/storage/drivers.lua',
['sharded_queue.storage.methods'] = 'sharded_queue/storage/methods.lua',
['sharded_queue.storage.metrics'] = 'sharded_queue/storage/metrics.lua',
['sharded_queue.storage.tubes'] = 'sharded_queue/storage/tubes.lua',
['sharded_queue.version'] = 'sharded_queue/version.lua',
},
},
Expand Down
7 changes: 6 additions & 1 deletion sharded_queue/api.lua
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,12 @@ local function init(opts)
end

local function validate_config(cfg)
return utils.validate_config_cfg(cfg)
local cfg_tubes = cfg.tubes or {}
local ok, err = utils.validate_tubes(cfg_tubes, false)
if not ok then
return ok, err
end
return utils.validate_cfg(cfg_tubes['cfg'])
end

local function apply_config(cfg, opts)
Expand Down
199 changes: 26 additions & 173 deletions sharded_queue/storage.lua
Original file line number Diff line number Diff line change
@@ -1,208 +1,61 @@
local fiber = require('fiber')
local json = require('json')
local log = require('log')

local cartridge = require('cartridge')

local metrics = require('sharded_queue.metrics')
local stash = require('sharded_queue.stash')
local state = require('sharded_queue.state')
local config = require('sharded_queue.storage.config')
local methods = require('sharded_queue.storage.methods')
local metrics = require('sharded_queue.storage.metrics')
local stats_storage = require('sharded_queue.stats.storage')
local tubes = require('sharded_queue.storage.tubes').new()
local utils = require('sharded_queue.utils')

local DEFAULT_DRIVER = 'sharded_queue.drivers.fifottl'

local stash_names = {
cfg = '__sharded_queue_storage_cfg',
metrics_stats = '__sharded_queue_storage_metrics_stats',
}
stash.setup(stash_names)

local methods = {
'statistic',
'put',
'take',
'delete',
'touch',
'ack',
'peek',
'release',
'bury',
'kick',
}

local storage = {
cfg = stash.get(stash_names.cfg),
metrics_stats = metrics.init(stash.get(stash_names.metrics_stats)),
}

if storage.cfg.metrics == nil then
storage.cfg.metrics = true
end

if storage.cfg.metrics then
storage.cfg.metrics = metrics.is_supported()
end
local function init(opts)

local queue_drivers = {}
local function get_driver(driver_name)
if queue_drivers[driver_name] == nil then
queue_drivers[driver_name] = require(driver_name)
end
return queue_drivers[driver_name]
end

local tubes = {}

local function map_tubes(cfg_tubes)
local result = {}
for tube_name, tube_opts in pairs(cfg_tubes) do
if tube_name['cfg'] ~= nil or tube_opts.enable == nil then
-- do not add 'cfg' as a tube
local driver_name = tube_opts.driver or DEFAULT_DRIVER
result[tube_name] = get_driver(driver_name)
end
end
return result
end
local function validate_config(cfg)
local cfg_tubes = cfg.tubes or {}

local function metrics_enable()
local get_statistic = function(tube)
return stats_storage.get(tube)
local ok, err = utils.validate_tubes(cfg_tubes, true)
if not ok then
return ok, err
end

storage.metrics_stats:enable('storage', tubes, get_statistic)
return utils.validate_cfg(cfg_tubes['cfg'])
end

local function metrics_disable()
storage.metrics_stats:disable()
end

local function validate_config(cfg)
local cfg_tubes = cfg.tubes or {}
for tube_name, tube_opts in pairs(cfg_tubes) do
if tube_opts.driver ~= nil then
if type('tube_opts.driver') ~= 'string' then
return nil, 'Driver name must be a valid module name for tube' .. tube_name
end
local ok, _ = pcall(require, tube_opts.driver)
if not ok then
return nil, ('Driver %s could not be loaded for tube %s'):format(tube_opts.driver, tube_name)
end
local function apply_config(cfg, opts)
local cfg_tubes = table.deepcopy(cfg.tubes or {})
if cfg_tubes['cfg'] ~= nil then
local options = cfg_tubes['cfg']
if options.metrics ~= nil then
config.metrics = options.metrics and true or false
end
cfg_tubes['cfg'] = nil
end

return utils.validate_config_cfg(cfg)
end

local function apply_config(cfg, opts)
if opts.is_master then
stats_storage.init()

local cfg_tubes = cfg.tubes or {}
if cfg_tubes['cfg'] ~= nil then
local options = cfg_tubes['cfg']
if options.metrics ~= nil then
storage.cfg.metrics = options.metrics and true or false
end
end

local existing_tubes = tubes

tubes = map_tubes(cfg_tubes)

-- try create tube --
for tube_name, driver in pairs(tubes) do
if existing_tubes[tube_name] == nil then
tubes[tube_name].create({
name = tube_name,
options = cfg_tubes[tube_name]
})
stats_storage.reset(tube_name)
end
end

-- try drop tube --
for tube_name, driver in pairs(existing_tubes) do
if tubes[tube_name] == nil then
driver.drop(tube_name)
end
local new = tubes:update(cfg_tubes)
for _, tube in ipairs(new) do
stats_storage.reset(tube)
end

-- register tube methods --
for _, name in pairs(methods) do
local func = function(args)
if args == nil then args = {} end
args.options = cfg_tubes[args.tube_name] or {}

local tube_name = args.tube_name
if tubes[tube_name].method[name] == nil then error(('Method %s not implemented in tube %s'):format(name, tube_name)) end

local before = fiber.clock()
local ok, ret, err = pcall(tubes[tube_name].method[name], args)
local after = fiber.clock()

if storage.cfg.metrics then
storage.metrics_stats:observe(after - before,
tube_name, name, ok and err == nil)
end

if not ok then
error(ret)
end

return ret, err
end

local global_name = 'tube_' .. name
rawset(_G, global_name, func)
box.schema.func.create(global_name, { if_not_exists = true })
end

local tube_statistic_func = function(args)
local before = fiber.clock()
local ok, ret, err = pcall(stats_storage.get, args.tube_name)
local after = fiber.clock()
if storage.cfg.metrics then
storage.metrics_stats:observe(after - before,
args.tube_name, 'statistic', ok and err == nil)
end

if not ok then
error(ret)
end

return ret, err
end

rawset(_G, 'tube_statistic', tube_statistic_func)
box.schema.func.create('tube_statistic', { if_not_exists = true })
methods.init(metrics, tubes)
end

if storage.cfg.metrics then
metrics_enable()
if config.metrics then
metrics.enable(tubes:map())
else
metrics_disable()
metrics.disable()
end

return true
end

local function init(opts)

end

return {
init = init,
apply_config = apply_config,
validate_config = validate_config,
apply_config = apply_config,
_VERSION = require('sharded_queue.version'),

dependencies = {
'cartridge.roles.vshard-storage',
},

__private = {
methods = methods,
}
}
19 changes: 19 additions & 0 deletions sharded_queue/storage/config.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
local metrics = require('sharded_queue.metrics')
local stash = require('sharded_queue.stash')

local stash_names = {
config = '__sharded_queue_storage_config',
}
stash.setup(stash_names)

local config = stash.get(stash_names.config)

if config.metrics == nil then
config.metrics = true
end

if config.metrics then
config.metrics = metrics.is_supported()
end

return config
15 changes: 15 additions & 0 deletions sharded_queue/storage/drivers.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
local DEFAULT_DRIVER = 'sharded_queue.drivers.fifottl'
local queue_drivers = {}

local function get_driver(driver_name)
driver_name = driver_name or DEFAULT_DRIVER

if queue_drivers[driver_name] == nil then
queue_drivers[driver_name] = require(driver_name)
end
return queue_drivers[driver_name]
end

return {
get = get_driver,
}
67 changes: 67 additions & 0 deletions sharded_queue/storage/methods.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
local fiber = require('fiber')
local stats_storage = require('sharded_queue.stats.storage')

local methods = {
'statistic',
'put',
'take',
'delete',
'touch',
'ack',
'peek',
'release',
'bury',
'kick',
}

local function init(metrics, tubes)
for _, method in pairs(methods) do
local func = function(args)
args = args or {}
args.options = tubes:get_options(args.tube_name) or {}

local tube_name = args.tube_name
local before = fiber.clock()
local ok, ret, err = pcall(tubes.call, tubes, tube_name, method, args)
local latency = fiber.clock() - before

metrics.observe(latency, tube_name, method, ok and err == nil)

if not ok then
error(ret)
end

return ret, err
end

local global_name = 'tube_' .. method
rawset(_G, global_name, func)
box.schema.func.create(global_name, { if_not_exists = true })
end

local tube_statistic_func = function(args)
local before = fiber.clock()
local ok, ret, err = pcall(stats_storage.get, args.tube_name)
local latency = fiber.clock() - before

metrics.observe(latency, args.tube_name, 'statistic', ok and err == nil)

if not ok then
error(ret)
end

return ret, err
end

rawset(_G, 'tube_statistic', tube_statistic_func)
box.schema.func.create('tube_statistic', { if_not_exists = true })
end

local function get_list()
return methods
end

return {
init = init,
get_list = get_list,
}
32 changes: 32 additions & 0 deletions sharded_queue/storage/metrics.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
local metrics = require('sharded_queue.metrics')
local stash = require('sharded_queue.stash')
local stats_storage = require('sharded_queue.stats.storage')

local stash_names = {
metrics_stats = '__sharded_queue_storage_metrics_stats',
}
stash.setup(stash_names)

local metrics_stats = metrics.init(stash.get(stash_names.metrics_stats))

local function enable(tubes)
local get_statistic = function(tube)
return stats_storage.get(tube)
end

metrics_stats:enable('storage', tubes, get_statistic)
end

local function observe(latency, tube, method, ok)
metrics_stats:observe(latency, tube, method, ok)
end

local function disable()
metrics_stats:disable()
end

return {
enable = enable,
observe = observe,
disable = disable,
}
Loading

0 comments on commit 9cf8eae

Please sign in to comment.