diff --git a/sharded-queue-scm-1.rockspec b/sharded-queue-scm-1.rockspec index b4210c4..3e53455 100755 --- a/sharded-queue-scm-1.rockspec +++ b/sharded-queue-scm-1.rockspec @@ -30,6 +30,11 @@ build = { ['sharded_queue.stash'] = 'sharded_queue/stash.lua', ['sharded_queue.state'] = 'sharded_queue/state.lua', ['sharded_queue.stats.storage'] = 'sharded_queue/stats/storage.lua', + ['sharded_queue.storage.config'] = 'sharded_queue/storage/config.lua', + ['sharded_queue.storage.drivers'] = 'sharded_queue/storage/drivers.lua', + ['sharded_queue.storage.methods'] = 'sharded_queue/storage/methods.lua', + ['sharded_queue.storage.metrics'] = 'sharded_queue/storage/metrics.lua', + ['sharded_queue.storage.tubes'] = 'sharded_queue/storage/tubes.lua', ['sharded_queue.version'] = 'sharded_queue/version.lua', }, }, diff --git a/sharded_queue/api.lua b/sharded_queue/api.lua index 4036a8b..53a6587 100644 --- a/sharded_queue/api.lua +++ b/sharded_queue/api.lua @@ -540,7 +540,12 @@ local function init(opts) end local function validate_config(cfg) - return utils.validate_config_cfg(cfg) + local cfg_tubes = cfg.tubes or {} + local ok, err = utils.validate_tubes(cfg_tubes, false) + if not ok then + return ok, err + end + return utils.validate_cfg(cfg_tubes['cfg']) end local function apply_config(cfg, opts) diff --git a/sharded_queue/storage.lua b/sharded_queue/storage.lua index c3ecdb5..f33d03f 100644 --- a/sharded_queue/storage.lua +++ b/sharded_queue/storage.lua @@ -1,208 +1,61 @@ -local fiber = require('fiber') -local json = require('json') -local log = require('log') - -local cartridge = require('cartridge') - -local metrics = require('sharded_queue.metrics') -local stash = require('sharded_queue.stash') -local state = require('sharded_queue.state') +local config = require('sharded_queue.storage.config') +local methods = require('sharded_queue.storage.methods') +local metrics = require('sharded_queue.storage.metrics') local stats_storage = require('sharded_queue.stats.storage') +local tubes = require('sharded_queue.storage.tubes').new() local utils = require('sharded_queue.utils') -local DEFAULT_DRIVER = 'sharded_queue.drivers.fifottl' - -local stash_names = { - cfg = '__sharded_queue_storage_cfg', - metrics_stats = '__sharded_queue_storage_metrics_stats', -} -stash.setup(stash_names) - -local methods = { - 'statistic', - 'put', - 'take', - 'delete', - 'touch', - 'ack', - 'peek', - 'release', - 'bury', - 'kick', -} - -local storage = { - cfg = stash.get(stash_names.cfg), - metrics_stats = metrics.init(stash.get(stash_names.metrics_stats)), -} - -if storage.cfg.metrics == nil then - storage.cfg.metrics = true -end - -if storage.cfg.metrics then - storage.cfg.metrics = metrics.is_supported() -end +local function init(opts) -local queue_drivers = {} -local function get_driver(driver_name) - if queue_drivers[driver_name] == nil then - queue_drivers[driver_name] = require(driver_name) - end - return queue_drivers[driver_name] end -local tubes = {} - -local function map_tubes(cfg_tubes) - local result = {} - for tube_name, tube_opts in pairs(cfg_tubes) do - if tube_name['cfg'] ~= nil or tube_opts.enable == nil then - -- do not add 'cfg' as a tube - local driver_name = tube_opts.driver or DEFAULT_DRIVER - result[tube_name] = get_driver(driver_name) - end - end - return result -end +local function validate_config(cfg) + local cfg_tubes = cfg.tubes or {} -local function metrics_enable() - local get_statistic = function(tube) - return stats_storage.get(tube) + local ok, err = utils.validate_tubes(cfg_tubes, true) + if not ok then + return ok, err end - - storage.metrics_stats:enable('storage', tubes, get_statistic) + return utils.validate_cfg(cfg_tubes['cfg']) end -local function metrics_disable() - storage.metrics_stats:disable() -end - -local function validate_config(cfg) - local cfg_tubes = cfg.tubes or {} - for tube_name, tube_opts in pairs(cfg_tubes) do - if tube_opts.driver ~= nil then - if type('tube_opts.driver') ~= 'string' then - return nil, 'Driver name must be a valid module name for tube' .. tube_name - end - local ok, _ = pcall(require, tube_opts.driver) - if not ok then - return nil, ('Driver %s could not be loaded for tube %s'):format(tube_opts.driver, tube_name) - end +local function apply_config(cfg, opts) + local cfg_tubes = table.deepcopy(cfg.tubes or {}) + if cfg_tubes['cfg'] ~= nil then + local options = cfg_tubes['cfg'] + if options.metrics ~= nil then + config.metrics = options.metrics and true or false end + cfg_tubes['cfg'] = nil end - return utils.validate_config_cfg(cfg) -end - -local function apply_config(cfg, opts) if opts.is_master then stats_storage.init() - local cfg_tubes = cfg.tubes or {} - if cfg_tubes['cfg'] ~= nil then - local options = cfg_tubes['cfg'] - if options.metrics ~= nil then - storage.cfg.metrics = options.metrics and true or false - end - end - - local existing_tubes = tubes - - tubes = map_tubes(cfg_tubes) - - -- try create tube -- - for tube_name, driver in pairs(tubes) do - if existing_tubes[tube_name] == nil then - tubes[tube_name].create({ - name = tube_name, - options = cfg_tubes[tube_name] - }) - stats_storage.reset(tube_name) - end - end - - -- try drop tube -- - for tube_name, driver in pairs(existing_tubes) do - if tubes[tube_name] == nil then - driver.drop(tube_name) - end + local new = tubes:update(cfg_tubes) + for _, tube in ipairs(new) do + stats_storage.reset(tube) end - -- register tube methods -- - for _, name in pairs(methods) do - local func = function(args) - if args == nil then args = {} end - args.options = cfg_tubes[args.tube_name] or {} - - local tube_name = args.tube_name - if tubes[tube_name].method[name] == nil then error(('Method %s not implemented in tube %s'):format(name, tube_name)) end - - local before = fiber.clock() - local ok, ret, err = pcall(tubes[tube_name].method[name], args) - local after = fiber.clock() - - if storage.cfg.metrics then - storage.metrics_stats:observe(after - before, - tube_name, name, ok and err == nil) - end - - if not ok then - error(ret) - end - - return ret, err - end - - local global_name = 'tube_' .. name - rawset(_G, global_name, func) - box.schema.func.create(global_name, { if_not_exists = true }) - end - - local tube_statistic_func = function(args) - local before = fiber.clock() - local ok, ret, err = pcall(stats_storage.get, args.tube_name) - local after = fiber.clock() - if storage.cfg.metrics then - storage.metrics_stats:observe(after - before, - args.tube_name, 'statistic', ok and err == nil) - end - - if not ok then - error(ret) - end - - return ret, err - end - - rawset(_G, 'tube_statistic', tube_statistic_func) - box.schema.func.create('tube_statistic', { if_not_exists = true }) + methods.init(metrics, tubes) end - if storage.cfg.metrics then - metrics_enable() + if config.metrics then + metrics.enable(tubes:map()) else - metrics_disable() + metrics.disable() end return true end -local function init(opts) - -end - return { init = init, - apply_config = apply_config, validate_config = validate_config, + apply_config = apply_config, _VERSION = require('sharded_queue.version'), dependencies = { 'cartridge.roles.vshard-storage', }, - - __private = { - methods = methods, - } } diff --git a/sharded_queue/storage/config.lua b/sharded_queue/storage/config.lua new file mode 100644 index 0000000..409c342 --- /dev/null +++ b/sharded_queue/storage/config.lua @@ -0,0 +1,19 @@ +local metrics = require('sharded_queue.metrics') +local stash = require('sharded_queue.stash') + +local stash_names = { + config = '__sharded_queue_storage_config', +} +stash.setup(stash_names) + +local config = stash.get(stash_names.config) + +if config.metrics == nil then + config.metrics = true +end + +if config.metrics then + config.metrics = metrics.is_supported() +end + +return config diff --git a/sharded_queue/storage/drivers.lua b/sharded_queue/storage/drivers.lua new file mode 100644 index 0000000..62b989b --- /dev/null +++ b/sharded_queue/storage/drivers.lua @@ -0,0 +1,15 @@ +local DEFAULT_DRIVER = 'sharded_queue.drivers.fifottl' +local queue_drivers = {} + +local function get_driver(driver_name) + driver_name = driver_name or DEFAULT_DRIVER + + if queue_drivers[driver_name] == nil then + queue_drivers[driver_name] = require(driver_name) + end + return queue_drivers[driver_name] +end + +return { + get = get_driver, +} diff --git a/sharded_queue/storage/methods.lua b/sharded_queue/storage/methods.lua new file mode 100644 index 0000000..63e8283 --- /dev/null +++ b/sharded_queue/storage/methods.lua @@ -0,0 +1,67 @@ +local fiber = require('fiber') +local stats_storage = require('sharded_queue.stats.storage') + +local methods = { + 'statistic', + 'put', + 'take', + 'delete', + 'touch', + 'ack', + 'peek', + 'release', + 'bury', + 'kick', +} + +local function init(metrics, tubes) + for _, method in pairs(methods) do + local func = function(args) + args = args or {} + args.options = tubes:get_options(args.tube_name) or {} + + local tube_name = args.tube_name + local before = fiber.clock() + local ok, ret, err = pcall(tubes.call, tubes, tube_name, method, args) + local latency = fiber.clock() - before + + metrics.observe(latency, tube_name, method, ok and err == nil) + + if not ok then + error(ret) + end + + return ret, err + end + + local global_name = 'tube_' .. method + rawset(_G, global_name, func) + box.schema.func.create(global_name, { if_not_exists = true }) + end + + local tube_statistic_func = function(args) + local before = fiber.clock() + local ok, ret, err = pcall(stats_storage.get, args.tube_name) + local latency = fiber.clock() - before + + metrics.observe(latency, args.tube_name, 'statistic', ok and err == nil) + + if not ok then + error(ret) + end + + return ret, err + end + + rawset(_G, 'tube_statistic', tube_statistic_func) + box.schema.func.create('tube_statistic', { if_not_exists = true }) +end + +local function get_list() + return methods +end + +return { + init = init, + get_list = get_list, +} diff --git a/sharded_queue/storage/metrics.lua b/sharded_queue/storage/metrics.lua new file mode 100644 index 0000000..67ed4d8 --- /dev/null +++ b/sharded_queue/storage/metrics.lua @@ -0,0 +1,32 @@ +local metrics = require('sharded_queue.metrics') +local stash = require('sharded_queue.stash') +local stats_storage = require('sharded_queue.stats.storage') + +local stash_names = { + metrics_stats = '__sharded_queue_storage_metrics_stats', +} +stash.setup(stash_names) + +local metrics_stats = metrics.init(stash.get(stash_names.metrics_stats)) + +local function enable(tubes) + local get_statistic = function(tube) + return stats_storage.get(tube) + end + + metrics_stats:enable('storage', tubes, get_statistic) +end + +local function observe(latency, tube, method, ok) + metrics_stats:observe(latency, tube, method, ok) +end + +local function disable() + metrics_stats:disable() +end + +return { + enable = enable, + observe = observe, + disable = disable, +} diff --git a/sharded_queue/storage/tubes.lua b/sharded_queue/storage/tubes.lua new file mode 100644 index 0000000..f29417c --- /dev/null +++ b/sharded_queue/storage/tubes.lua @@ -0,0 +1,85 @@ +local drivers = require('sharded_queue.storage.drivers') + +local tubes = {} + +local function map_tubes(cfg_tubes) + cfg_tubes = cfg_tubes or {} + + local result = {} + for tube_name, tube_opts in pairs(cfg_tubes) do + if tube_opts.enable == nil or tube_opts.enable == true then + result[tube_name] = drivers.get(tube_opts.driver) + end + end + return result +end + +local function call(self, tube, method, args) + if self.tubes[tube] == nil then + error(('Tube %s not exist'):format(tube)) + end + if self.tubes[tube].method[method] == nil then + error(('Method %s not implemented in tube %s'):format(method, tube)) + end + return self.tubes[tube].method[method](args) +end + +local function map(self) + return self.tubes +end + +local function get_options(self, tube) + return self.options[tube] +end + +local function update(self, cfg_tubes) + local existing_tubes = self:map() + + self.options = cfg_tubes or {} + self.tubes = map_tubes(cfg_tubes) + + -- Create new. + local new = {} + for tube_name, driver in pairs(self.tubes) do + if existing_tubes[tube_name] == nil then + self.tubes[tube_name].create({ + name = tube_name, + options = cfg_tubes[tube_name] + }) + table.insert(new, tube_name) + end + end + + -- Remove old. + local old = {} + for tube_name, driver in pairs(existing_tubes) do + if self.tubes[tube_name] == nil then + driver.drop(tube_name) + table.insert(old, tube_name) + end + end + + return new, old +end + +local mt = { + __index = { + call = call, + get_options = get_options, + map = map, + update = update, + }, +} + +local function new() + local ret = { + tubes = {}, + options = {}, + } + setmetatable(ret, mt) + return ret +end + +return { + new = new, +} diff --git a/sharded_queue/utils.lua b/sharded_queue/utils.lua index 3f483b4..e1999e3 100644 --- a/sharded_queue/utils.lua +++ b/sharded_queue/utils.lua @@ -74,13 +74,31 @@ function utils.normalize.wait_max(wait_max) return wait_max end -function utils.validate_config_cfg(cfg) - cfg = cfg.tubes or {} - if cfg['cfg'] == nil then +function utils.validate_tubes(tubes, on_storage) + for tube_name, tube_opts in pairs(tubes) do + if tube_opts.driver ~= nil then + if type(tube_opts.driver) ~= 'string' then + local msg = 'Driver name must be a valid module name for tube %s' + return nil, msg:format(tube_name) + end + if on_storage then + local ok, _ = pcall(require, tube_opts.driver) + if not ok then + local msg = 'Driver %s could not be loaded for tube %s' + return nil, msg:format(tube_opts.driver, tube_name) + end + end + end + end + + return true +end + +function utils.validate_cfg(cfg) + if cfg == nil then return true end - cfg = cfg['cfg'] if type(cfg) ~= 'table' then return nil, '"cfg" must be a table' end diff --git a/test/storage_test.lua b/test/storage_test.lua index ad10ff5..521a3d5 100644 --- a/test/storage_test.lua +++ b/test/storage_test.lua @@ -3,9 +3,8 @@ local t = require('luatest') local g = t.group('storage') -local storage = require('sharded_queue.storage') local config = require('test.helper.config') - +local methods = require('sharded_queue.storage.methods') g.before_all(function() g.storage_master = config.cluster:server('queue-storage-1-0').net_box @@ -17,7 +16,7 @@ g.test_storage_methods = function() local ro = g.storage_ro:eval("return box.cfg.read_only") t.assert_equals(ro, true) - for _,method in pairs(storage.__private.methods) do + for _, method in pairs(methods.get_list()) do local global_name = 'tube_' .. method -- Master storage t.assert_equals(g.storage_master:eval(string.format('return box.schema.func.exists("%s")', global_name)), true)