From 57ce240ec15c81b1cc19b1a6ba6695d27454fa6d Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Wed, 3 Apr 2024 10:46:49 +0300 Subject: [PATCH] api: add Tarantool 3 roles * The role `roles.sharded-queue-router` communicates with storages as a vshard router and contains the queue API. * The role `roles.sharded-queue-storage` stores and manages information about tubes on vshard storages. Closes #68 --- .github/workflows/tests.yml | 66 ++++-- CHANGELOG.md | 4 + Makefile | 14 +- README.md | 61 ++++-- config.yaml | 77 +++++++ init.lua | 49 ++--- instances.yaml | 11 + instances.yml | 24 --- replicasets.yml | 20 -- roles/sharded-queue-router.lua | 70 +++++++ roles/sharded-queue-storage.lua | 81 ++++++++ sharded-queue-scm-1.rockspec | 8 +- sharded_queue/drivers/fifo.lua | 5 + sharded_queue/drivers/fifottl.lua | 5 + sharded_queue/roles.lua | 26 +++ sharded_queue/router/queue.lua | 34 +-- sharded_queue/stats/storage.lua | 5 + sharded_queue/storage/methods.lua | 11 + sharded_queue/storage/vshard_utils.lua | 93 +++++++++ sharded_queue/utils.lua | 42 ++++ test/api_test.lua | 17 +- test/create_test.lua | 13 +- test/drop_test.lua | 25 +-- test/entrypoint/config.yml | 70 +++++++ .../{config.lua => config_cartridge.lua} | 46 ++++- test/helper/config_tarantool.lua | 189 +++++++++++++++++ test/helper/init.lua | 7 + test/helper/server.lua | 194 ++++++++++++++++++ test/helper/utils.lua | 16 ++ test/metrics_test.lua | 41 ++-- test/simple_test.lua | 33 +-- test/statistics_test.lua | 8 +- test/storage_test.lua | 8 +- test/take_exp_backoff_test.lua | 103 +++------- test/timeout_test.lua | 24 +-- test/ttl_test.lua | 53 ++--- 36 files changed, 1183 insertions(+), 370 deletions(-) create mode 100644 config.yaml mode change 100755 => 100644 init.lua create mode 100644 instances.yaml delete mode 100644 instances.yml delete mode 100644 replicasets.yml create mode 100644 roles/sharded-queue-router.lua create mode 100644 roles/sharded-queue-storage.lua create mode 100644 sharded_queue/roles.lua create mode 100644 sharded_queue/storage/vshard_utils.lua create mode 100644 test/entrypoint/config.yml rename test/helper/{config.lua => config_cartridge.lua} (75%) create mode 100644 test/helper/config_tarantool.lua create mode 100644 test/helper/init.lua create mode 100644 test/helper/server.lua diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 16ee12d..a83bf08 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -18,34 +18,74 @@ jobs: fail-fast: false matrix: tarantool: - - '1.10' - '2.8' - metrics: - - '' - - '1.0.0' + - '2.10' + cartridge: [true] coveralls: [false] + metrics: [false] include: - tarantool: '2.11' - metrics: '1.0.0' + cartridge: true coveralls: true + metrics: true + - tarantool: 'master' + cartridge: false + coveralls: false + metrics: false runs-on: ubuntu-20.04 steps: + - name: Clone the module + uses: actions/checkout@v3 + + - name: Setup tt + run: | + curl -L https://tarantool.io/release/2/installer.sh | sudo bash + sudo apt install -y tt + - name: Install tarantool ${{ matrix.tarantool }} - uses: tarantool/setup-tarantool@v2 + uses: tarantool/setup-tarantool@v3 with: tarantool-version: ${{ matrix.tarantool }} + if: matrix.tarantool != 'master' - - name: Clone the module - uses: actions/checkout@v3 + - name: Get Tarantool master latest commit + if: matrix.tarantool == 'master' + run: | + commit_hash=$(git ls-remote https://github.com/tarantool/tarantool.git --branch master | head -c 8) + echo "LATEST_COMMIT=${commit_hash}" >> $GITHUB_ENV + shell: bash - - name: Install requirements - run: make deps + - name: Cache Tarantool master + if: matrix.tarantool == 'master' + id: cache-latest + uses: actions/cache@v3 + with: + path: "${GITHUB_WORKSPACE}/bin" + key: cache-latest-${{ env.LATEST_COMMIT }} + + - name: Setup Tarantool CE (master) + if: matrix.tarantool == 'master' && steps.cache-latest.outputs.cache-hit != 'true' + run: | + tt init + tt install tarantool master - - name: Install metrics - if: matrix.metrics != '' + - name: Add tt Tarantool to PATH + if: matrix.tarantool == 'master' + run: echo "${GITHUB_WORKSPACE}/bin" >> $GITHUB_PATH + + - name: Install requirements run: | - tarantoolctl rocks install metrics ${{ matrix.metrics }} + tarantool --version + make deps + + - name: Install Cartridge requirements + run: make deps-cartridge + if: ${{ matrix.cartridge }} + + - name: Install metrics requirements + run: make deps-metrics + if: ${{ matrix.metrics }} - name: Run linter run: make lint diff --git a/CHANGELOG.md b/CHANGELOG.md index 739c3ac..970c29f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. `tasks` table. The metric contains labels in the following format: `{name = "tube_name", state = "task_state"}` +- Role `roles.shareded-queue-router` for Tarantool 3 (#68). +- Role `roles.shareded-queue-storage` for Tarantool 3 (#68). ### Changed @@ -44,6 +46,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. `tnt_sharded_queue_api_statistics_tasks` (#71). The metric now has labels in the format `{name = "tube_name", state = "task_state"}` instead of `{name = "tube_name", status = "task_state"}`. +- The dependency `cartridge` is removed from the `rockspec` since the module + does not require it to work with Tarantool 3 (#68). ### Fixed diff --git a/Makefile b/Makefile index 5fe6522..93dd5f0 100644 --- a/Makefile +++ b/Makefile @@ -39,15 +39,19 @@ build: .PHONY: deps deps: - $(TTCTL) rocks install cartridge 2.8.0 + $(TTCTL) rocks install vshard 0.1.26 $(TTCTL) rocks install luacheck 0.26.0 $(TTCTL) rocks install luacov 0.13.0 $(TTCTL) rocks install luacov-coveralls 0.2.3-1 --server=http://luarocks.org - $(TTCTL) rocks install luatest 0.5.7 + $(TTCTL) rocks install luatest 1.0.1 -.PHONY: deps-full -deps-full: deps - tarantoolctl rocks install metrics 1.0.0 +.PHONY: deps-cartridge +deps-cartridge: deps-cartridge + $(TTCTL) rocks install cartridge 2.9.0 + +.PHONY: deps-metrics +deps-metrics: deps-metrics + $(TTCTL) rocks install metrics 1.0.0 .PHONY: lint lint: diff --git a/README.md b/README.md index 9730870..f73d998 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ - # Tarantool Sharded Queue Application -This module provides cartridge roles implementing of a distributed queue compatible with [Tarantool queue](https://github.com/tarantool/queue) (*fifiottl driver*) +This module provides roles for the Tarantool 3 and for the Tarantool +Cartridge implementing of a distributed queue compatible with [Tarantool queue](https://github.com/tarantool/queue) (*fifiottl driver*) ```mermaid flowchart TB @@ -50,9 +50,28 @@ flowchart TB end ``` -## Usage in a cartridge application +## Usage in a Tarantool 3 application + +You need to install the Tarantool 3.0.2+ or 3.1+. + +1. Add a dependency to your application rockspec. +2. Enable `roles.sharded-queue-router` role on all sharding `router` instances. +3. Enable `roles.sharded-queue-storage` role on all sharding `storage` + instances. +4. Configure tubes for both roles. +5. Do not forget to bootstrap the vshard in your application. See + [init.lua](./init.lua) as an example. + +You could see a full example of the configuration in the [config.yaml](./config.yaml). -1. Add dependency to your application rockspec +Be careful, it is impossible to create or drop tubes dynamically by API calls +with Tarantool 3. You need to update the role configuration instead. + +## Usage in a Tarantool Cartridge application + +1. Add a dependency to your application rockspec. You need to make sure that +the dependency of the Tarantool Cartridge 2.9.0 is enabled because the +`sharded-queue` does not have it by default. 2. Add roles to your application: ```init.lua cartridge.cfg({ @@ -74,13 +93,13 @@ cartridge.cfg({ ## Usage as a ready-to-deploy service Prepare `tt` environment: -``` +```shell tt init git clone https://github.com/tarantool/sharded-queue instances.enabled/sharded-queue ``` Run: -``` +```shell tt pack --app-list sharded-queue rpm --version 1.0.0 ``` @@ -90,10 +109,10 @@ For more details refer to [tt](https://github.com/tarantool/tt/) ## Usage from client perspective -The good old queue api is located on all instances of the router masters that we launched. -For a test configuration, this is one router on `localhost:3301` +The good old queue api is located on all instances of the router masters that +we launched. For a test configuration, this is one router on `localhost:3301` -``` +```shell tarantool@user:~/sharded_queue$ tarantool Tarantool 1.10.3-6-gfbf53b9 type 'help' for interactive help @@ -117,7 +136,7 @@ tarantool> queue_conn:call('queue.tube.test_tube:take') ``` -You may also set up tubes using cluster-wide config: +You may also set up tubes using Cartridge cluster-wide config: ```config.yml tubes: tube_1: @@ -136,27 +155,26 @@ supported: * `metrics` - enable or disable stats collection by metrics. metrics >= 0.11.0 is required. It is enabled by default. -## Running locally (as an example) +## Running locally with Tarantool 3 (as an example) Prepare `tt` environment: -``` +```shell tt init git clone https://github.com/tarantool/sharded-queue instances.enabled/sharded-queue ``` Install dependencies: -``` +```shell tt build sharded-queue ``` Start default configuration: -``` +```shell tt start sharded-queue -tt cartridge replicasets setup --bootstrap-vshard --name sharded-queue ``` To stop, say: -``` +```shell tt stop sharded-queue ``` @@ -164,8 +182,10 @@ tt stop sharded-queue Say: -``` +```shell make deps +make deps-cartridge # For Tarantool < 3. +make deps-metrics # For Tarantool < 3. make test ``` @@ -174,7 +194,7 @@ make test The module exports several metrics if the module `metrics` >= 0.11 is installed and the feature is not disabled by the configuration. -### Role sharded_queue.api +### Router (`roles.sharded-queue-router` or `sharded_queue.api` for the Cartridge) * Metric `tnt_sharded_queue_api_statistics_calls_total` is a counter with the number of requests broken down by [the type of request][queue-statistics]. @@ -205,7 +225,7 @@ installed and the feature is not disabled by the configuration. A list of possible call methods: `put`, `take`, `delete`, `release`, `touch`, `ack`, `bury`, `kick`, `peek`, `drop`. -### Role sharded_queue.storage +### Storage (`roles.sharded-queue-storage` or `sharded_queue.storage` for the Cartridge) * Metric `tnt_sharded_queue_storage_statistics_calls_total` is a counter with the number of requests broken down by [the type of request][queue-statistics]. @@ -269,5 +289,8 @@ installed and the feature is not disabled by the configuration. If you use **fifottl** driver (default), you can log driver's method calls with `log_request` (log router's and storage's operations). +* You can not create or drop tubes by API calls with Tarantool 3. You need + to update the role configuration instead. + [metrics-summary]: https://www.tarantool.io/en/doc/latest/book/monitoring/api_reference/#summary [queue-statistics]: https://github.com/tarantool/queue?tab=readme-ov-file#getting-statistics diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..28f22d5 --- /dev/null +++ b/config.yaml @@ -0,0 +1,77 @@ +credentials: + users: + client: + password: 'secret' + roles: [super] + replicator: + password: 'secret' + roles: [replication] + storage: + password: 'secret' + roles: [sharding] + +iproto: + advertise: + peer: + login: replicator + sharding: + login: storage + +sharding: + bucket_count: 3000 + +groups: + storages: + roles: ['roles.sharded-queue-storage'] + roles_cfg: + roles.sharded-queue-storage: + cfg: + metrics: true + tubes: + test_tube: {} + sharding: + roles: [storage] + replication: + failover: manual + replicasets: + storage-001: + leader: storage-001-a + instances: + storage-001-a: + iproto: + listen: + - uri: localhost:3302 + storage-001-b: + iproto: + listen: + - uri: localhost:3303 + storage-002: + leader: storage-002-a + instances: + storage-002-a: + iproto: + listen: + - uri: localhost:3304 + storage-002-b: + iproto: + listen: + - uri: localhost:3305 + routers: + roles: ['roles.sharded-queue-router'] + roles_cfg: + roles.sharded-queue-router: + cfg: + metrics: true + tubes: + test_tube: {} + app: + module: init + sharding: + roles: [router] + replicasets: + router-001: + instances: + router-001-a: + iproto: + listen: + - uri: localhost:3301 diff --git a/init.lua b/init.lua old mode 100755 new mode 100644 index 9949d38..b16873b --- a/init.lua +++ b/init.lua @@ -1,38 +1,13 @@ -#!/usr/bin/env tarantool - -require('strict').on() - -if package.setsearchroot ~= nil then - package.setsearchroot() -else - -- Workaround for rocks loading in tarantool 1.10 - -- It can be removed in tarantool > 2.2 - -- By default, when you do require('mymodule'), tarantool looks into - -- the current working directory and whatever is specified in - -- package.path and package.cpath. If you run your app while in the - -- root directory of that app, everything goes fine, but if you try to - -- start your app with "tarantool myapp/init.lua", it will fail to load - -- its modules, and modules from myapp/.rocks. - local fio = require('fio') - local app_dir = fio.abspath(fio.dirname(arg[0])) - package.path = app_dir .. '/?.lua;' .. package.path - package.path = app_dir .. '/?/init.lua;' .. package.path - package.path = app_dir .. '/.rocks/share/tarantool/?.lua;' .. package.path - package.path = app_dir .. '/.rocks/share/tarantool/?/init.lua;' .. package.path - package.cpath = app_dir .. '/?.so;' .. package.cpath - package.cpath = app_dir .. '/?.dylib;' .. package.cpath - package.cpath = app_dir .. '/.rocks/lib/tarantool/?.so;' .. package.cpath - package.cpath = app_dir .. '/.rocks/lib/tarantool/?.dylib;' .. package.cpath +local vshard = require('vshard') +local log = require('log') + +-- Bootstrap the vshard router. +while true do + local ok, err = vshard.router.bootstrap({ + if_not_bootstrapped = true, + }) + if ok then + break + end + log.info(('Router bootstrap error: %s'):format(err)) end - -local cartridge = require('cartridge') - -local ok, err = cartridge.cfg({ - roles = { - 'sharded_queue.storage', - 'sharded_queue.api' - }, -}, { -}) - -assert(ok, tostring(err)) diff --git a/instances.yaml b/instances.yaml new file mode 100644 index 0000000..910b35b --- /dev/null +++ b/instances.yaml @@ -0,0 +1,11 @@ +--- +storage-001-a: + +storage-001-b: + +storage-002-a: + +storage-002-b: + +router-001-a: + diff --git a/instances.yml b/instances.yml deleted file mode 100644 index c2bf49b..0000000 --- a/instances.yml +++ /dev/null @@ -1,24 +0,0 @@ -sharded-queue.router: - workdir: ./tmp/db_dev/3301 - advertise_uri: localhost:3301 - http_port: 8081 - -sharded-queue.s1-master: - workdir: ./tmp/db_dev/3302 - advertise_uri: localhost:3302 - http_port: 8082 - -sharded-queue.s1-replica: - workdir: ./tmp/db_dev/3303 - advertise_uri: localhost:3303 - http_port: 8083 - -sharded-queue.s2-master: - workdir: ./tmp/db_dev/3304 - advertise_uri: localhost:3304 - http_port: 8084 - -sharded-queue.s2-replica: - workdir: ./tmp/db_dev/3305 - advertise_uri: localhost:3305 - http_port: 8085 diff --git a/replicasets.yml b/replicasets.yml deleted file mode 100644 index a9af742..0000000 --- a/replicasets.yml +++ /dev/null @@ -1,20 +0,0 @@ -router: - instances: - - router - roles: - - sharded_queue.api -s-1: - instances: - - s1-master - - s1-replica - roles: - - sharded_queue.storage -s-2: - instances: - - s2-master - - s2-replica - roles: - - sharded_queue.storage - weight: 1 - all_rw: false - vshard_group: default diff --git a/roles/sharded-queue-router.lua b/roles/sharded-queue-router.lua new file mode 100644 index 0000000..66e2e9b --- /dev/null +++ b/roles/sharded-queue-router.lua @@ -0,0 +1,70 @@ +local config = require('sharded_queue.router.config') +local is_metrics_supported = require('sharded_queue.metrics').is_supported +local metrics = require('sharded_queue.router.metrics') +local roles = require('sharded_queue.roles') +local utils = require('sharded_queue.utils') +local queue = require('sharded_queue.router.queue') + +local role_name = "roles.sharded-queue-router" + +local function validate(conf) + if not roles.is_tarantool_role_supported() then + error(role_name .. ": the role is supported only for Tarantool 3.0.2 or newer") + end + if not roles.is_sharding_role_enabled('router') then + error(role_name .. ": instance must be a sharding router to use the role") + end + + conf = conf or {} + local ok, err = utils.validate_tubes(conf.tubes or {}, false) + if not ok then + error(role_name .. ": " .. err) + end + ok, err = utils.validate_cfg(conf.cfg or {}) + if not ok then + error(role_name .. ": " .. err) + end + return true +end + +local function apply(conf) + conf = conf or {} + + queue.export_globals() + + local conf_tubes = conf.tubes or {} + local conf_cfg = conf.cfg or {} + if conf_cfg.metrics ~= nil then + config.metrics = conf_cfg.metrics and true or false + else + config.metrics = is_metrics_supported() + end + + for tube_name, options in pairs(conf_tubes) do + if queue.map()[tube_name] == nil then + queue.add(tube_name, metrics, options) + end + end + + for tube_name, _ in pairs(queue.map()) do + if conf_tubes[tube_name] == nil then + queue.remove(tube_name) + end + end + + if config.metrics then + metrics.enable(queue) + else + metrics.disable() + end +end + +local function stop() + queue.clear_globals() +end + +return { + validate = validate, + apply = apply, + stop = stop, +} diff --git a/roles/sharded-queue-storage.lua b/roles/sharded-queue-storage.lua new file mode 100644 index 0000000..5679fbc --- /dev/null +++ b/roles/sharded-queue-storage.lua @@ -0,0 +1,81 @@ +local config = require('sharded_queue.storage.config') +local is_metrics_supported = require('sharded_queue.metrics').is_supported +local methods = require('sharded_queue.storage.methods') +local metrics = require('sharded_queue.storage.metrics') +local roles = require('sharded_queue.roles') +local stats_storage = require('sharded_queue.stats.storage') +local tubes = require('sharded_queue.storage.tubes').new() +local utils = require('sharded_queue.utils') + +local role_name = "roles.sharded-queue-storage" +local watcher = nil + +local function validate(conf) + if not roles.is_tarantool_role_supported() then + error(role_name .. ": the role is supported only for Tarantool 3.0.2 or newer") + end + if not roles.is_sharding_role_enabled('storage') then + error(role_name .. ": instance must be a sharding storage to use the role") + end + + conf = conf or {} + local ok, err = utils.validate_tubes(conf.tubes or {}, true) + if not ok then + error(role_name .. ": " .. err) + end + ok, err = utils.validate_cfg(conf.cfg or {}) + if not ok then + error(role_name .. ": " .. err) + end + return true +end + +local function apply(conf) + conf = conf or {} + + local conf_tubes = conf.tubes or {} + local conf_cfg = conf.cfg or {} + if conf_cfg.metrics ~= nil then + config.metrics = conf_cfg.metrics and true or false + else + config.metrics = is_metrics_supported() + end + + if watcher ~= nil then + watcher:unregister() + end + watcher = box.watch('box.status', function(_, status) + if status.is_ro == false then + stats_storage.init() + + local new = tubes:update(conf_tubes) + for _, tube in ipairs(new) do + stats_storage.reset(tube) + end + + if config.metrics then + metrics.enable(tubes:map()) + end + methods.init(metrics, tubes) + end + end) + + if config.metrics then + metrics.enable(tubes:map()) + else + metrics.disable() + end +end + +local function stop() + if watcher ~= nil then + watcher:unregister() + watcher = nil + end +end + +return { + validate = validate, + apply = apply, + stop = stop, +} diff --git a/sharded-queue-scm-1.rockspec b/sharded-queue-scm-1.rockspec index 7b8dcfb..a87bb09 100755 --- a/sharded-queue-scm-1.rockspec +++ b/sharded-queue-scm-1.rockspec @@ -5,8 +5,8 @@ source = { branch = 'master'; } dependencies = { - 'lua >= 5.1'; - 'cartridge >= 2.0.0, < 3.0.0', + 'lua >= 5.1', + 'vshard >= 0.1.26-1', } external_dependencies = { @@ -20,6 +20,8 @@ build = { build_target = 'all', install = { lua = { + ["roles.sharded-queue-router"] = "roles/sharded-queue-router.lua", + ["roles.sharded-queue-storage"] = "roles/sharded-queue-storage.lua", ['sharded_queue.api'] = 'sharded_queue/api.lua', ['sharded_queue.storage'] = 'sharded_queue/storage.lua', ['sharded_queue.drivers.fifo'] = 'sharded_queue/drivers/fifo.lua', @@ -27,6 +29,7 @@ build = { ['sharded_queue.time'] = 'sharded_queue/time.lua', ['sharded_queue.utils'] = 'sharded_queue/utils.lua', ['sharded_queue.metrics'] = 'sharded_queue/metrics.lua', + ['sharded_queue.roles'] = 'sharded_queue/roles.lua', ['sharded_queue.stash'] = 'sharded_queue/stash.lua', ['sharded_queue.state'] = 'sharded_queue/state.lua', ['sharded_queue.stats.storage'] = 'sharded_queue/stats/storage.lua', @@ -39,6 +42,7 @@ build = { ['sharded_queue.storage.methods'] = 'sharded_queue/storage/methods.lua', ['sharded_queue.storage.metrics'] = 'sharded_queue/storage/metrics.lua', ['sharded_queue.storage.tubes'] = 'sharded_queue/storage/tubes.lua', + ['sharded_queue.storage.vshard_utils'] = 'sharded_queue/storage/vshard_utils.lua', ['sharded_queue.version'] = 'sharded_queue/version.lua', }, }, diff --git a/sharded_queue/drivers/fifo.lua b/sharded_queue/drivers/fifo.lua index edb2ff7..16bf653 100644 --- a/sharded_queue/drivers/fifo.lua +++ b/sharded_queue/drivers/fifo.lua @@ -2,6 +2,7 @@ local state = require('sharded_queue.state') local utils = require('sharded_queue.utils') local log = require('log') -- luacheck: ignore local stats = require('sharded_queue.stats.storage') +local vshard_utils = require('sharded_queue.storage.vshard_utils') local function update_stat(tube_name, name) stats.update(tube_name, name, '+', 1) @@ -10,6 +11,7 @@ end local method = {} local function tube_create(args) + local user = vshard_utils.get_this_replica_user() or 'guest' local space_options = {} local if_not_exists = args.options.if_not_exists or true space_options.if_not_exists = if_not_exists @@ -47,6 +49,9 @@ local function tube_create(args) if_not_exists = if_not_exists }) + box.schema.user.grant(user, 'read,write', 'space', args.name, + {if_not_exists = true}) + return space end diff --git a/sharded_queue/drivers/fifottl.lua b/sharded_queue/drivers/fifottl.lua index e6afca2..c7be755 100644 --- a/sharded_queue/drivers/fifottl.lua +++ b/sharded_queue/drivers/fifottl.lua @@ -3,6 +3,7 @@ local state = require('sharded_queue.state') local utils = require('sharded_queue.utils') local stats = require('sharded_queue.stats.storage') local time = require('sharded_queue.time') +local vshard_utils = require('sharded_queue.storage.vshard_utils') local log = require('log') -- luacheck: ignore local index = { @@ -136,6 +137,7 @@ end -- QUEUE METHODs -- local function tube_create(args) + local user = vshard_utils.get_this_replica_user() or 'guest' local space_options = {} local if_not_exists = args.options.if_not_exists or true space_options.if_not_exists = if_not_exists @@ -201,6 +203,9 @@ local function tube_create(args) if_not_exists = if_not_exists }) + box.schema.user.grant(user, 'read,write', 'space', args.name, + {if_not_exists = true}) + -- run fiber for tracking event fiber.create(fiber_common, args.name) end diff --git a/sharded_queue/roles.lua b/sharded_queue/roles.lua new file mode 100644 index 0000000..8dc62b5 --- /dev/null +++ b/sharded_queue/roles.lua @@ -0,0 +1,26 @@ +local config = require('config') +local utils = require('sharded_queue.utils') + +local function is_sharding_role_enabled(expected) + local sharding_roles = config:get('sharding.roles') + for _, role in ipairs(sharding_roles or {}) do + if role == expected then + return true + end + end + return false +end + +local function is_tarantool_role_supported() + local major, minor, patch = utils.get_tarantool_version() + if major <= 2 + or major == 3 and minor == 0 and patch <= 1 then + return false + end + return true +end + +return { + is_sharding_role_enabled = is_sharding_role_enabled, + is_tarantool_role_supported = is_tarantool_role_supported, +} diff --git a/sharded_queue/router/queue.lua b/sharded_queue/router/queue.lua index c92d86f..accccb7 100644 --- a/sharded_queue/router/queue.lua +++ b/sharded_queue/router/queue.lua @@ -46,35 +46,7 @@ end -- The Tarantool 3.0 does not support to update dinamically a configuration, so -- a user must update the configuration by itself. if is_cartridge_package then - local function validate_options(options) - if not options then return true end - - if options.wait_factor then - if type(options.wait_factor) ~= 'number' - or options.wait_factor < 1 - then - return false, "wait_factor must be number greater than or equal to 1" - end - end - - local _, err = utils.normalize.log_request(options.log_request) - if err then - return false, err - end - - if options.wait_max ~= nil then - local err - options.wait_max, err = utils.normalize.wait_max(options.wait_max) - if err ~= nil then - return false, err - end - end - - return true - end - queue_global.create_tube = function(tube_name, options) - require('log').info("CREATE TUBE") local tubes = cartridge.config_get_deepcopy('tubes') or {} if tube_name == 'cfg' then @@ -85,7 +57,7 @@ if is_cartridge_package then return nil end - local ok , err = validate_options(options) + local ok, err = utils.validate_options(options) if not ok then error(err) end options = table.deepcopy(options or {}) @@ -107,6 +79,10 @@ local function export_globals() rawset(_G, 'queue', queue_global) end +local function clear_globals() + rawset(_G, 'queue', nil) +end + local function add(name, metrics, options) queue_global.tube[name] = tube.new(name, metrics, options) end diff --git a/sharded_queue/stats/storage.lua b/sharded_queue/stats/storage.lua index d2184e8..320e972 100644 --- a/sharded_queue/stats/storage.lua +++ b/sharded_queue/stats/storage.lua @@ -1,6 +1,7 @@ ---- Module used to store storage-specific statistics. local state = require('sharded_queue.state') +local vshard_utils = require('sharded_queue.storage.vshard_utils') local statistics = {} @@ -17,6 +18,7 @@ local actions = { } function statistics.init() + local user = vshard_utils.get_this_replica_user() or 'guest' local space_stat = box.schema.space.create('_queue_statistics', { if_not_exists = true }) space_stat:format({ @@ -39,6 +41,9 @@ function statistics.init() }, if_not_exists = true }) + + box.schema.user.grant(user, 'read,write', 'space', '_queue_statistics', + {if_not_exists = true}) end function statistics.update(tube_name, stat_name, operation, value) diff --git a/sharded_queue/storage/methods.lua b/sharded_queue/storage/methods.lua index 63e8283..6791888 100644 --- a/sharded_queue/storage/methods.lua +++ b/sharded_queue/storage/methods.lua @@ -1,4 +1,6 @@ local fiber = require('fiber') +local vshard_utils = require('sharded_queue.storage.vshard_utils') + local stats_storage = require('sharded_queue.stats.storage') local methods = { @@ -15,10 +17,15 @@ local methods = { } local function init(metrics, tubes) + local user = vshard_utils.get_this_replica_user() or 'guest' + for _, method in pairs(methods) do local func = function(args) args = args or {} args.options = tubes:get_options(args.tube_name) or {} + if args.options.priority == nil and args.options.pri ~= nil then + args.options.priority = args.options.pri + end local tube_name = args.tube_name local before = fiber.clock() @@ -37,6 +44,8 @@ local function init(metrics, tubes) local global_name = 'tube_' .. method rawset(_G, global_name, func) box.schema.func.create(global_name, { if_not_exists = true }) + box.schema.user.grant(user, 'execute', 'function', global_name, + {if_not_exists = true}) end local tube_statistic_func = function(args) @@ -55,6 +64,8 @@ local function init(metrics, tubes) rawset(_G, 'tube_statistic', tube_statistic_func) box.schema.func.create('tube_statistic', { if_not_exists = true }) + box.schema.user.grant(user, 'execute', 'function', 'tube_statistic', + {if_not_exists = true}) end local function get_list() diff --git a/sharded_queue/storage/vshard_utils.lua b/sharded_queue/storage/vshard_utils.lua new file mode 100644 index 0000000..98f359d --- /dev/null +++ b/sharded_queue/storage/vshard_utils.lua @@ -0,0 +1,93 @@ +-- The source is copied from: +-- https://github.com/tarantool/crud/blob/99315a53ef75056ac057b63f5ce1bbadef6a1418/crud/common/vshard_utils.lua + +local luri = require('uri') + +local vshard = require('vshard') + +local vshard_utils = {} + +function vshard_utils.get_self_vshard_replicaset() + local box_info = box.info() + + local ok, storage_info = pcall(vshard.storage.info) + assert(ok, 'vshard.storage.cfg() must be called first') + + if vshard_utils.get_vshard_identification_mode() == 'name_as_key' then + local replicaset_name = box_info.replicaset.name + + return replicaset_name, storage_info.replicasets[replicaset_name] + else + local replicaset_uuid + if box_info.replicaset ~= nil then + replicaset_uuid = box_info.replicaset.uuid + else + replicaset_uuid = box_info.cluster.uuid + end + + return replicaset_uuid, storage_info.replicasets[replicaset_uuid] + end +end + +function vshard_utils.get_self_vshard_replica_id() + local box_info = box.info() + + if vshard_utils.get_vshard_identification_mode() == 'name_as_key' then + return box_info.name + else + return box_info.uuid + end +end + +function vshard_utils.get_replicaset_id(vshard_router, replicaset) + -- https://github.com/tarantool/vshard/issues/460. + local known_replicasets = vshard_router:routeall() + + for known_replicaset_id, known_replicaset in pairs(known_replicasets) do + if known_replicaset == replicaset then + return known_replicaset_id + end + end + + return nil +end + +function vshard_utils.get_vshard_identification_mode() + -- https://github.com/tarantool/vshard/issues/460. + assert(vshard.storage.internal.current_cfg ~= nil, 'available only on vshard storage') + return vshard.storage.internal.current_cfg.identification_mode +end + +function vshard_utils.get_this_replica_user() + local replicaset_key, replicaset = vshard_utils.get_self_vshard_replicaset() + + if replicaset == nil or replicaset.master == nil then + error(string.format( + 'Failed to find a vshard configuration ' .. + 'for storage replicaset with key %q.', + replicaset_key)) + end + + local uri + if replicaset.master == 'auto' then + -- https://github.com/tarantool/vshard/issues/467. + uri = vshard.storage.internal.this_replica.uri + else + uri = replicaset.master.uri + end + + return luri.parse(uri).login +end + +function vshard_utils.get_replicaset_master(replicaset, opts) + opts = opts or {} + local cached = opts.cached or false + + if (not cached) and replicaset.locate_master ~= nil then + replicaset:locate_master() + end + + return replicaset.master +end + +return vshard_utils diff --git a/sharded_queue/utils.lua b/sharded_queue/utils.lua index e1999e3..b6c31da 100644 --- a/sharded_queue/utils.lua +++ b/sharded_queue/utils.lua @@ -74,6 +74,33 @@ function utils.normalize.wait_max(wait_max) return wait_max end +function utils.validate_options(options) + if not options then return true end + + if options.wait_factor then + if type(options.wait_factor) ~= 'number' + or options.wait_factor < 1 + then + return false, "wait_factor must be number greater than or equal to 1" + end + end + + local _, err = utils.normalize.log_request(options.log_request) + if err then + return false, err + end + + if options.wait_max ~= nil then + local err + options.wait_max, err = utils.normalize.wait_max(options.wait_max) + if err ~= nil then + return false, err + end + end + + return true +end + function utils.validate_tubes(tubes, on_storage) for tube_name, tube_opts in pairs(tubes) do if tube_opts.driver ~= nil then @@ -89,6 +116,10 @@ function utils.validate_tubes(tubes, on_storage) end end end + local ok, err = utils.validate_options(tube_opts) + if not ok then + return nil, err + end end return true @@ -114,4 +145,15 @@ function utils.validate_cfg(cfg) return true end +function utils.get_tarantool_version() + local version_parts = rawget(_G, '_TARANTOOL'):split('-', 3) + + local major_minor_patch_parts = version_parts[1]:split('.', 2) + local major = tonumber(major_minor_patch_parts[1]) + local minor = tonumber(major_minor_patch_parts[2]) + local patch = tonumber(major_minor_patch_parts[3]) + + return major, minor, patch +end + return utils diff --git a/test/api_test.lua b/test/api_test.lua index fdf7794..165ca04 100644 --- a/test/api_test.lua +++ b/test/api_test.lua @@ -3,20 +3,21 @@ local t = require('luatest') local g = t.group('api') -local api = require('sharded_queue.api') -local config = require('test.helper.config') +local _, api = pcall(require, 'sharded_queue.api') +local helper = require('test.helper') local utils = require('test.helper.utils') local tube = require('sharded_queue.router.tube') local is_metrics_supported = utils.is_metrics_supported() g.before_all(function() - g.queue_conn = config.cluster:server('queue-router').net_box - g.queue_conn_ro = config.cluster:server('queue-router-1').net_box - g.cfg = g.queue_conn:eval("return require('sharded_queue.api').cfg") + t.skip_if(utils.is_tarantool_3(), 'the role is available only for Cartridge') + g.queue_conn = helper.get_evaler('queue-router') + g.queue_conn_ro = helper.get_evaler('queue-router-1') + g.cfg = helper.get_cfg() end) g.after_each(function() - g.queue_conn:eval("require('sharded_queue.api').cfg(...)", {g.cfg}) + helper.set_cfg(g.cfg) end) g.test_exported_api = function() @@ -115,8 +116,8 @@ g.test_role_statistics_read_only_router = function() end g.test_api_version = function() - local api_conn = config.cluster:server('queue-router').net_box - local storage_conn = config.cluster:server('queue-storage-1-0').net_box + local api_conn = helper.get_evaler('queue-router') + local storage_conn = helper.get_evaler('queue-storage-1-0') local api_version = api_conn:eval( "return require('sharded_queue.api')._VERSION" diff --git a/test/create_test.lua b/test/create_test.lua index fc8bcb2..bd1fe1d 100644 --- a/test/create_test.lua +++ b/test/create_test.lua @@ -1,11 +1,10 @@ local t = require('luatest') local g = t.group('create_test') -local config = require('test.helper.config') +local helper = require('test.helper') g.before_all(function() - g.api = config.cluster:server('queue-router').net_box - g.storage = config.cluster:server('queue-storage-1-0').net_box + g.storage = helper.get_evaler('queue-storage-1-0') end) for test_name, options in pairs({ @@ -19,9 +18,7 @@ for test_name, options in pairs({ }) do g['test_create_tube_defauls_' .. test_name] = function() local tube_name = 'creates_tube_defaults_' .. test_name .. '_test' - g.api:call('queue.create_tube', { - tube_name, options - }) + helper.create_tube(tube_name, options) local space = g.storage:eval(string.format([[ local space = box.space.%s @@ -61,9 +58,7 @@ for test_name, options in pairs({ }) do g['test_create_tube_opts' .. test_name] = function() local tube_name = 'create_tube_opts_' .. test_name .. '_test' - g.api:call('queue.create_tube', { - tube_name, options - }) + helper.create_tube(tube_name, options) local space = g.storage:eval(string.format([[ local space = box.space.%s diff --git a/test/drop_test.lua b/test/drop_test.lua index baeb65d..d3621e3 100644 --- a/test/drop_test.lua +++ b/test/drop_test.lua @@ -1,20 +1,18 @@ local t = require('luatest') local g = t.group('drop_test') -local config = require('test.helper.config') +local helper = require('test.helper') local utils = require('test.helper.utils') g.before_all(function() - g.queue_conn = config.cluster:server('queue-router').net_box + g.queue_conn = helper.get_evaler('queue-router') end) function g.test_drop_empty() local tube_name = 'drop_empty_test' - g.queue_conn:call('queue.create_tube', { - tube_name - }) - g.queue_conn:call(utils.shape_cmd(tube_name, 'drop')) + helper.create_tube(tube_name) + helper.drop_tube(tube_name) local cur_stat = g.queue_conn:call('queue.statistics', { tube_name }) t.assert_equals(cur_stat, nil) @@ -23,20 +21,15 @@ end function g.test_drop_and_recreate() local tube_name = 'drop_and_recreate_test' - g.queue_conn:call('queue.create_tube', { - tube_name - }) + helper.create_tube(tube_name) g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { '*' } ) - g.queue_conn:call(utils.shape_cmd(tube_name, 'drop')) + helper.drop_tube(tube_name) - -- recreate tube with same name - t.assert(g.queue_conn:call('queue.create_tube', { - tube_name - })) + -- Recreate tube with same name. + helper.create_tube(tube_name) local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { '*' } ) t.assert_equals(task[utils.index.data], '*') t.assert_equals(task[utils.index.status], utils.state.READY) - -end \ No newline at end of file +end diff --git a/test/entrypoint/config.yml b/test/entrypoint/config.yml new file mode 100644 index 0000000..ccaf6d6 --- /dev/null +++ b/test/entrypoint/config.yml @@ -0,0 +1,70 @@ +credentials: + users: + guest: + roles: [super] + storage: + roles: [sharding] + password: "storage" + +iproto: + listen: + - uri: 'unix/:./{{ instance_name }}.iproto' + advertise: + sharding: + login: 'storage' + password: 'storage' + +sharding: + bucket_count: 1234 + sched_ref_quota: 258 + +groups: + group-001: + replicasets: + replicaset-001: + sharding: + roles: [router] + roles: + - roles.sharded-queue-router + database: + replicaset_uuid: 'aaaaaaaa-0000-4000-b000-000000000000' + instances: + queue-router: + database: + instance_uuid: 'aaaaaaaa-aaaa-4000-b000-000000000001' + mode: rw + queue-router-1: + database: + instance_uuid: 'aaaaaaaa-aaaa-4000-b000-000000000002' + replicaset-002: + sharding: + roles: [storage] + roles: + - roles.sharded-queue-storage + database: + replicaset_uuid: 'bbbbbbbb-bbbb-4000-b000-000000000001' + instances: + queue-storage-1-0: + database: + instance_uuid: 'bbbbbbbb-bbbb-4000-b000-000000000001' + mode: rw + queue-storage-1-1: + database: + instance_uuid: 'bbbbbbbb-bbbb-4000-b000-000000000002' + mode: ro + replicaset-003: + sharding: + roles: [storage] + roles: + - roles.sharded-queue-storage + database: + replicaset_uuid: 'cccccccc-0000-4000-b000-000000000000' + instances: + queue-storage-2-0: + database: + instance_uuid: 'cccccccc-cccc-4000-b000-000000000001' + mode: rw + queue-storage-2-1: + database: + instance_uuid: 'cccccccc-cccc-4000-b000-000000000002' + mode: ro diff --git a/test/helper/config.lua b/test/helper/config_cartridge.lua similarity index 75% rename from test/helper/config.lua rename to test/helper/config_cartridge.lua index fec4c82..6b8b8e3 100644 --- a/test/helper/config.lua +++ b/test/helper/config_cartridge.lua @@ -1,6 +1,8 @@ local fio = require('fio') local t = require('luatest') local cartridge_helpers = require('cartridge.test-helpers') +local utils = require('test.helper.utils') + require('json').cfg { encode_use_tostring = true } local config = {} @@ -8,7 +10,6 @@ local config = {} config.root = fio.dirname(fio.abspath(package.search('init'))) config.datadir = fio.pathjoin(config.root, 'dev') -config.unitdir = fio.pathjoin(config.datadir, 'unit') config.cluster = cartridge_helpers.Cluster:new({ datadir = config.datadir, @@ -82,18 +83,45 @@ config.cluster = cartridge_helpers.Cluster:new({ } }) -t.before_suite(function () +function config.get_cfg() + return config.eval('queue-router', "return require('sharded_queue.api').cfg") +end + +function config.set_cfg(cfg) + config.eval('queue-router', "require('sharded_queue.api').cfg(...)", + {cfg}) +end + +function config.create_tube(tube_name, options) + config.eval('queue-router', "queue.create_tube(...)", {tube_name, options}) +end + +function config.drop_tube(tube_name) + pcall(function() + config.eval('queue-router', utils.shape_cmd(tube_name, 'drop') .. "()") + end) +end + +function config.eval(server, ...) + return config.cluster:server(server).net_box:eval(...) +end + +function config.get_evaler(server) + return config.cluster:server(server).net_box +end + +t.before_suite(function() fio.rmtree(config.datadir) fio.mktree(config.datadir) config.cluster:start() - - fio.mktree(config.unitdir) - box.cfg{ - work_dir=config.unitdir, - wal_mode='none' - } + config.servers = {} + for _, srv in pairs(config.cluster.servers) do + config.servers[srv.alias] = srv + end end) -t.after_suite(function () config.cluster:stop() end) +t.after_suite(function() + config.cluster:stop() +end) return config diff --git a/test/helper/config_tarantool.lua b/test/helper/config_tarantool.lua new file mode 100644 index 0000000..d5e4ddb --- /dev/null +++ b/test/helper/config_tarantool.lua @@ -0,0 +1,189 @@ +local fio = require('fio') +local t = require('luatest') +local fun = require('fun') +local yaml = require('yaml') + +local server = require('test.helper.server') + +local config = {} + +local roles = { + 'roles.sharded-queue-router', + 'roles.sharded-queue-storage', +} +config.root = fio.dirname(fio.abspath(package.search('init'))) +config.datadir = fio.pathjoin(config.root, 'dev') +config.configpath = fio.pathjoin(config.root, 'test', 'entrypoint', 'config.yml') +config.devconfigpath = fio.pathjoin(config.root, 'dev', 'config.yml') + +function config.start_example_replicaset() + local opts = { + config_file = config.devconfigpath, + chdir = config.datadir, + } + local servers = { + "queue-router", + "queue-router-1", + "queue-storage-1-0", + "queue-storage-1-1", + "queue-storage-2-0", + "queue-storage-2-1", + } + config.servers = {} + for _, server_name in ipairs(servers) do + local server_opts = fun.chain(opts, {alias = server_name}):tomap() + config.servers[server_name] = server:new(server_opts) + end + + for _, srv in pairs(config.servers) do + srv:start({wait_until_ready = false}) + end + + for _, srv in pairs(config.servers) do + srv:wait_until_ready() + end + config.bootstrap() +end + +function config.stop_example_replicaset() + for _, srv in pairs(config.servers) do + srv:drop() + end +end + +function config.bootstrap() + local routers = { + config.servers['queue-router'], + config.servers['queue-router-1'], + } + for _, router in ipairs(routers) do + router:eval("require('vshard').router.bootstrap({if_not_bootstrapped = true})") + end +end + +function config.reload() + for _, srv in pairs(config.servers) do + srv:eval("require('config'):reload()") + end + for _, srv in pairs(config.servers) do + srv:wait_until_ready() + end + config.bootstrap() +end + +local function read_config(path) + local src = fio.open(path) + local data = src:read() + src:close() + return yaml.decode(data) +end + +local function write_config(path, decoded) + local dst = fio.open(path, {'O_CREAT', 'O_WRONLY', 'O_TRUNC'}) + dst:write(yaml.encode(decoded)) + dst:close() +end + +function config.get_cfg() + local decoded = read_config(config.devconfigpath) + + if decoded['roles_cfg'] ~= nil and decoded['roles_cfg'][roles[1]] then + return decoded['roles_cfg'][roles[1]]['cfg'] + end + return nil +end + +function config.set_cfg(cfg) + local decoded = read_config(config.devconfigpath) + + if decoded['roles_cfg'] == nil then + decoded['roles_cfg'] = {} + end + for _, role in ipairs(roles) do + if decoded['roles_cfg'][role] == nil then + decoded['roles_cfg'][role] = {} + end + decoded['roles_cfg'][role]['cfg'] = cfg + end + + write_config(config.devconfigpath, decoded) + + config.reload() +end + +function config.create_tube(tube_name, options) + options = options or {} + + local decoded = read_config(config.devconfigpath) + + if decoded['roles_cfg'] == nil then + decoded['roles_cfg'] = {} + end + for _, role in ipairs(roles) do + if decoded['roles_cfg'][role] == nil then + decoded['roles_cfg'][role] = {} + end + if decoded['roles_cfg'][role]['tubes'] == nil then + decoded['roles_cfg'][role]['tubes'] = {} + end + decoded['roles_cfg'][role]['tubes'][tube_name] = {} + for k, v in pairs(options or {}) do + decoded['roles_cfg'][role]['tubes'][tube_name][k] = v + end + end + + write_config(config.devconfigpath, decoded) + + config.reload() +end + +function config.drop_tube(tube_name) + local decoded = read_config(config.devconfigpath) + + if decoded['roles_cfg'] == nil then + return + end + for _, role in ipairs(roles) do + if decoded['roles_cfg'][role] == nil then + return + end + if decoded['roles_cfg'][role]['tubes'] == nil then + return + end + if decoded['roles_cfg'][role]['tubes'][tube_name] == nil then + return + end + decoded['roles_cfg'][role]['tubes'][tube_name] = nil + end + + write_config(config.devconfigpath, decoded) + + config.reload() +end + +function config.eval(server_name, ...) + return config.servers[server_name]:eval(...) +end + +function config.get_evaler(server_name) + return config.servers[server_name] +end + +t.before_suite(function() + fio.rmtree(config.datadir) + fio.mktree(config.datadir) + + fio.copyfile(config.configpath, config.devconfigpath) + fio.copytree(fio.pathjoin(config.root, 'roles'), + fio.pathjoin(config.datadir, 'roles')) + fio.copytree(fio.pathjoin(config.root, 'sharded_queue'), + fio.pathjoin(config.datadir, 'sharded_queue')) + + config.start_example_replicaset() +end) + +t.after_suite(function() + config.stop_example_replicaset() +end) + +return config diff --git a/test/helper/init.lua b/test/helper/init.lua new file mode 100644 index 0000000..b201c12 --- /dev/null +++ b/test/helper/init.lua @@ -0,0 +1,7 @@ +local utils = require('test.helper.utils') + +if utils.is_tarantool_3() then + return require('test.helper.config_tarantool') +else + return require('test.helper.config_cartridge') +end diff --git a/test/helper/server.lua b/test/helper/server.lua new file mode 100644 index 0000000..7cc5f2c --- /dev/null +++ b/test/helper/server.lua @@ -0,0 +1,194 @@ +-- https://github.com/tarantool/tarantool/blob/5040fba9cf1da942371721e36e81c7372699600c/test/luatest_helpers/server.lua +local fun = require('fun') +local yaml = require('yaml') +local urilib = require('uri') +local fio = require('fio') +local luatest = require('luatest') + +-- Join paths in an intuitive way. +-- +-- If a component is nil, it is skipped. +-- +-- If a component is an absolute path, it skips all the previous +-- components. +-- +-- The wrapper is written for two components for simplicity. +local function pathjoin(a, b) + -- No first path -- skip it. + if a == nil then + return b + end + -- No second path -- skip it. + if b == nil then + return a + end + -- The absolute path is checked explicitly due to gh-8816. + if b:startswith('/') then + return b + end + return fio.pathjoin(a, b) +end + +-- Determine advertise URI for given instance from a cluster +-- configuration. +local function find_advertise_uri(config, instance_name, dir) + if config == nil or next(config) == nil then + return nil + end + + -- Determine listen and advertise options that are in effect + -- for the given instance. + local advertise + local listen + + for _, group in pairs(config.groups or {}) do + for _, replicaset in pairs(group.replicasets or {}) do + local instance = (replicaset.instances or {})[instance_name] + if instance == nil then + break + end + if instance.iproto ~= nil then + if instance.iproto.advertise ~= nil then + advertise = advertise or instance.iproto.advertise.client + end + listen = listen or instance.iproto.listen + end + if replicaset.iproto ~= nil then + if replicaset.iproto.advertise ~= nil then + advertise = advertise or replicaset.iproto.advertise.client + end + listen = listen or replicaset.iproto.listen + end + if group.iproto ~= nil then + if group.iproto.advertise ~= nil then + advertise = advertise or group.iproto.advertise.client + end + listen = listen or group.iproto.listen + end + end + end + + if config.iproto ~= nil then + if config.iproto.advertise ~= nil then + advertise = advertise or config.iproto.advertise.client + end + listen = listen or config.iproto.listen + end + + local uris + if advertise ~= nil then + uris = {{uri = advertise}} + else + uris = listen + end + + for _, uri in ipairs(uris or {}) do + uri = table.copy(uri) + uri.uri = uri.uri:gsub('{{ *instance_name *}}', instance_name) + uri.uri = uri.uri:gsub('unix/:%./', ('unix/:%s/'):format(dir)) + local u = urilib.parse(uri) + if u.ipv4 ~= '0.0.0.0' and u.ipv6 ~= '::' and u.service ~= '0' then + return uri + end + end + error('No suitable URI to connect is found') +end + +local Server = luatest.Server:inherit({}) + +-- Adds the following options: +-- +-- * config_file (string) +-- +-- An argument of the `--config <...>` CLI option. +-- +-- Used to deduce advertise URI to connect net.box to the +-- instance. +-- +-- The special value '' means running without `--config <...>` +-- CLI option (but still pass `--name `). +-- * remote_config (table) +-- +-- If `config_file` is not passed, this config value is used to +-- deduce the advertise URI to connect net.box to the instance. +Server.constructor_checks = fun.chain(Server.constructor_checks, { + config_file = 'string', + remote_config = '?table', +}):tomap() + +function Server:initialize() + if self.config_file ~= nil then + self.command = arg[-1] + + self.args = fun.chain(self.args or {}, { + '--name', self.alias + }):totable() + + if self.config_file ~= '' then + table.insert(self.args, '--config') + table.insert(self.args, self.config_file) + + -- Take into account self.chdir to calculate a config + -- file path. + local config_file_path = pathjoin(self.chdir, self.config_file) + + -- Read the provided config file. + local fh, err = fio.open(config_file_path, {'O_RDONLY'}) + if fh == nil then + error(('Unable to open file %q: %s'):format(config_file_path, + err)) + end + self.config = yaml.decode(fh:read()) + fh:close() + end + + if self.net_box_uri == nil then + local config = self.config or self.remote_config + + -- NB: listen and advertise URIs are relative to + -- process.work_dir, which, in turn, is relative to + -- self.chdir. + local work_dir + if config.process ~= nil and config.process.work_dir ~= nil then + work_dir = config.process.work_dir + end + local dir = pathjoin(self.chdir, work_dir) + self.net_box_uri = find_advertise_uri(config, self.alias, dir) + end + end + getmetatable(getmetatable(self)).initialize(self) +end + +function Server:connect_net_box() + getmetatable(getmetatable(self)).connect_net_box(self) + + if self.config_file == nil then + return + end + + if not self.net_box then + return + end + + -- Replace the ready condition. + local saved_eval = self.net_box.eval + self.net_box.eval = function(self, expr, args, opts) -- luacheck: ignore + if expr == 'return _G.ready' then + expr = "return require('config'):info().status == 'ready' or " .. + "require('config'):info().status == 'check_warnings'" + end + return saved_eval(self, expr, args, opts) + end +end + +-- Enable the startup waiting if the advertise URI of the instance +-- is determined. +function Server:start(opts) + opts = opts or {} + if self.config_file and opts.wait_until_ready == nil then + opts.wait_until_ready = self.net_box_uri ~= nil + end + getmetatable(getmetatable(self)).start(self, opts) +end + +return Server diff --git a/test/helper/utils.lua b/test/helper/utils.lua index 7b5c8fd..7a55387 100644 --- a/test/helper/utils.lua +++ b/test/helper/utils.lua @@ -48,4 +48,20 @@ function utils.is_metrics_supported() return metrics.unregister_callback and counter.remove and true or false end +local function get_tarantool_version() + local version_parts = rawget(_G, '_TARANTOOL'):split('-', 3) + + local major_minor_patch_parts = version_parts[1]:split('.', 2) + local major = tonumber(major_minor_patch_parts[1]) + local minor = tonumber(major_minor_patch_parts[2]) + local patch = tonumber(major_minor_patch_parts[3]) + + return major, minor, patch +end + +function utils.is_tarantool_3() + local major = get_tarantool_version() + return major == 3 +end + return utils diff --git a/test/metrics_test.lua b/test/metrics_test.lua index 09c8082..1114c49 100644 --- a/test/metrics_test.lua +++ b/test/metrics_test.lua @@ -1,13 +1,13 @@ local t = require('luatest') local g = t.group('metrics_test') -local config = require('test.helper.config') +local helper = require('test.helper') local json = require('json') local utils = require('test.helper.utils') g.before_all(function() - g.queue_conn = config.cluster:server('queue-router').net_box - g.cfg = g.queue_conn:eval("return require('sharded_queue.api').cfg") + g.queue_conn = helper.get_evaler('queue-router') + g.cfg = helper.get_cfg() end) g.before_each(function() @@ -17,7 +17,7 @@ g.before_each(function() end) g.after_each(function() - g.queue_conn:eval("require('sharded_queue.api').cfg(...)", {g.cfg}) + helper.set_cfg(g.cfg) end) local function filter_metrics(metrics, labels) @@ -45,10 +45,11 @@ local metrics = require('metrics') metrics.invoke_callbacks() return metrics.collect() ]] + if instance == nil then metrics = g.queue_conn:eval(eval) else - metrics = config.cluster:server(instance).net_box:eval(eval) + metrics = helper.eval(instance, eval) end for _, v in ipairs(metrics) do @@ -67,6 +68,9 @@ local function merge_metrics(first, second) local found = false for _, f in pairs(first) do if f.metric_name == s.metric_name then + -- It clears the storage name. + f.label_pairs.alias = nil + s.label_pairs.alias = nil if json.encode(f.label_pairs) == json.encode(s.label_pairs) then found = true f.value = f.value + s.value @@ -111,7 +115,6 @@ local function assert_metric(metrics, name, label, values, filters) for k, v in pairs(values) do local filtered = filter_metrics(metric, {[label] = k}) - json.encode(filtered) t.assert_equals(#filtered, 1, label .. "_" .. k) t.assert_equals(filtered[1].value, v, label .. "_" .. k) end @@ -119,9 +122,8 @@ end g.test_metrics_api = function() local tube_name = 'metrics_api_test' - g.queue_conn:call('queue.create_tube', { - tube_name - }) + helper.create_tube(tube_name) + g.queue_conn = helper.get_evaler('queue-router') local task_count = 64 for i = 1, task_count do @@ -191,12 +193,8 @@ end g.test_metrics_api_disabled = function() local tube_name = 'metrics_api_disabled_test' - g.queue_conn:eval("require('sharded_queue.api').cfg(...)", - {{metrics = false}}) - - g.queue_conn:call('queue.create_tube', { - tube_name - }) + helper.set_cfg({metrics = false}) + helper.create_tube(tube_name) g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { 1, { delay = 3 , ttl = 3, ttr = 1} @@ -214,9 +212,7 @@ end g.test_metrics_api_disable = function() local tube_name = 'metrics_api_disable_test' - g.queue_conn:call('queue.create_tube', { - tube_name - }) + helper.create_tube(tube_name) g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { 1, { delay = 3 , ttl = 3, ttr = 1} @@ -243,8 +239,7 @@ g.test_metrics_api_disable = function() t.assert_not_equals(get_metric(metrics, "tnt_sharded_queue_storage_role_stats_sum"), {}) t.assert_not_equals(get_metric(metrics, "tnt_sharded_queue_storage_role_stats_count"), {}) - g.queue_conn:eval("require('sharded_queue.api').cfg(...)", - {{metrics = false}}) + helper.set_cfg({metrics = false}) metrics = get_router_metrics(tube_name) t.assert_equals(metrics, {}) @@ -259,11 +254,9 @@ end g.test_metrics_storage = function() local tube_name = 'metrics_storage_test' - g.queue_conn:call('queue.create_tube', { - tube_name - }) + helper.create_tube(tube_name) - local storage = config.cluster:server('queue-storage-1-0').net_box + local storage = helper.get_evaler('queue-storage-1-0') local methods = { 'statistic', 'put', diff --git a/test/simple_test.lua b/test/simple_test.lua index eea6a7e..a7413c3 100644 --- a/test/simple_test.lua +++ b/test/simple_test.lua @@ -1,11 +1,11 @@ local t = require('luatest') local g = t.group('simple_test') -local config = require('test.helper.config') +local helper = require('test.helper') local utils = require('test.helper.utils') g.before_all(function() - g.queue_conn = config.cluster:server('queue-router').net_box + g.queue_conn = helper.get_evaler('queue-router') end) for test_name, options in pairs({ @@ -18,10 +18,7 @@ for test_name, options in pairs({ g['test_put_taken_' .. test_name] = function() local tube_name = 'put_taken_test_' .. test_name - g.queue_conn:call('queue.create_tube', { - tube_name, - options - }) + helper.create_tube(tube_name, options) -- tasks data for putting local task_count = 100 @@ -36,12 +33,10 @@ for test_name, options in pairs({ local task_ids = {} for _, data in pairs(tasks_data) do local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { data }) - local peek_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'peek'), { task[utils.index.task_id] }) - t.assert_equals(peek_task[utils.index.status], utils.state.READY) task_ids[task[utils.index.task_id]] = true end @@ -73,13 +68,12 @@ end g.test_take_with_options = function() local tube_name = 'test_take_with_options' - g.queue_conn:call('queue.create_tube', { - tube_name, + helper.create_tube(tube_name, { temporary = true, driver = 'sharded_queue.drivers.fifo', } - }) + ) local options, timeout, data = {}, 1, 'data' for _, take_args in pairs({{}, {timeout}, {timeout, options}, {box.NULL, options}}) do @@ -90,20 +84,19 @@ g.test_take_with_options = function() end function g.test_invalid_driver() - t.assert_error_msg_contains('Driver unexistent could not be loaded', function() g.queue_conn:call('queue.create_tube', { + t.assert_error_msg_contains('Driver unexistent could not be loaded', function() helper.create_tube( 'invalid', { driver = 'unexistent' } - }) + ) end) + helper.drop_tube('invalid') end function g.test_delete() local tube_name = 'delete_test' - g.queue_conn:call('queue.create_tube', { - tube_name - }) + helper.create_tube(tube_name) -- task data for putting local task_count = 20 @@ -167,9 +160,7 @@ end function g.test_release() local tube_name = 'release_test' - g.queue_conn:call('queue.create_tube', { - tube_name - }) + helper.create_tube(tube_name) local task_count = 10 local tasks_data = {} @@ -227,9 +218,7 @@ end function g.test_bury_kick() local tube_name = 'bury_kick_test' - g.queue_conn:call('queue.create_tube', { - tube_name - }) + helper.create_tube(tube_name) local cur_stat diff --git a/test/statistics_test.lua b/test/statistics_test.lua index 30d81f8..750749d 100644 --- a/test/statistics_test.lua +++ b/test/statistics_test.lua @@ -1,19 +1,17 @@ local t = require('luatest') local g = t.group('statistics_test') -local config = require('test.helper.config') +local helper = require('test.helper') local utils = require('test.helper.utils') local fiber = require('fiber') g.before_all(function() - g.queue_conn = config.cluster:server('queue-router').net_box + g.queue_conn = helper.get_evaler('queue-router') end) function g.test_statistics() local tube_name = 'statistics_test' - g.queue_conn:call('queue.create_tube', { - tube_name - }) + helper.create_tube(tube_name) local task_count = 64 local middle = 32 diff --git a/test/storage_test.lua b/test/storage_test.lua index 521a3d5..5f9311e 100644 --- a/test/storage_test.lua +++ b/test/storage_test.lua @@ -1,14 +1,12 @@ -#!/usr/bin/env tarantool - local t = require('luatest') local g = t.group('storage') -local config = require('test.helper.config') +local helper = require('test.helper') local methods = require('sharded_queue.storage.methods') g.before_all(function() - g.storage_master = config.cluster:server('queue-storage-1-0').net_box - g.storage_ro = config.cluster:server('queue-storage-1-1').net_box + g.storage_master = helper.get_evaler('queue-storage-1-0') + g.storage_ro = helper.get_evaler('queue-storage-1-1') end) g.test_storage_methods = function() diff --git a/test/take_exp_backoff_test.lua b/test/take_exp_backoff_test.lua index b910171..13ce00c 100644 --- a/test/take_exp_backoff_test.lua +++ b/test/take_exp_backoff_test.lua @@ -1,18 +1,12 @@ local t = require('luatest') local g = t.group('exponential_backoff_test') -local log = require('log') -- luacheck: ignore - -local config = require('test.helper.config') +local helper = require('test.helper') local utils = require('test.helper.utils') local fiber = require('fiber') g.before_all(function() - --- Workaround for https://github.com/tarantool/cartridge/issues/462 - config.cluster:server('queue-router').net_box:close() - config.cluster:server('queue-router').net_box = nil - config.cluster:server('queue-router'):connect_net_box() - g.queue_conn = config.cluster:server('queue-router').net_box + g.queue_conn = helper.get_evaler('queue-router') end) local function task_take(tube_name, timeout, channel, options) @@ -25,9 +19,7 @@ end function g.test_default_wait_factor() local tube_name = 'test_default_wait_factor' - g.queue_conn:call('queue.create_tube', { - tube_name - }) + helper.create_tube(tube_name) local timeout = 10 -- second local attemts = 8 -- attempts count @@ -35,7 +27,7 @@ function g.test_default_wait_factor() local take_time_expected = 0.01*(math.pow(2, attemts+1) - 1) -- 5.12 local channel = fiber.channel(2) - local start = fiber.time64() + local start = fiber.time64() fiber.create(task_take, tube_name, timeout, channel) fiber.sleep(put_wait + 1) @@ -62,17 +54,12 @@ function g.test_success() -- expected time is 1.56 in case wait_factor = 5 local tube_name = 'test_success_exp_backoff' - g.queue_conn:call('queue.create_tube', { - tube_name, - { - wait_factor = 5, - } - }) + helper.create_tube(tube_name, {wait_factor = 5}) local timeout = 10 local channel = fiber.channel(2) - local start = fiber.time64() + local start = fiber.time64() fiber.create(task_take, tube_name, timeout, channel) fiber.sleep(1) @@ -88,53 +75,35 @@ function g.test_success() end function g.test_invalid_factors() - local tube_name = 'test_tinvalid_factors' - t.assert_error_msg_contains('wait_factor', g.queue_conn.call, - g.queue_conn, - 'queue.create_tube', - { tube_name, { - wait_factor = 0.5, - } - }) + t.assert_error_msg_contains('wait_factor', helper.create_tube, + tube_name, {wait_factor = 0.5} + ) - t.assert_error_msg_contains('wait_factor', g.queue_conn.call, - g.queue_conn, - 'queue.create_tube', - { tube_name, { - wait_factor = 'not factor', - } - }) + t.assert_error_msg_contains('wait_factor', helper.create_tube, + tube_name, {wait_factor = 'not factor'} + ) + + helper.drop_tube(tube_name) end function g.test_invalid_wait_max() - local tube_name = 'test_invalid_wait_max' - t.assert_error_msg_contains('wait_max', g.queue_conn.call, - g.queue_conn, - 'queue.create_tube', - { tube_name, { - wait_max = -8, - } - }) + t.assert_error_msg_contains('wait_max', helper.create_tube, + tube_name, {wait_max = -8} + ) - t.assert_error_msg_contains('wait_max', g.queue_conn.call, - g.queue_conn, - 'queue.create_tube', - { tube_name, { - wait_max = 0, - } - }) + t.assert_error_msg_contains('wait_max', helper.create_tube, + tube_name, {wait_max = 0} + ) - t.assert_error_msg_contains('wait_max', g.queue_conn.call, - g.queue_conn, - 'queue.create_tube', - { tube_name, { - wait_max = 'not number', - } - }) + t.assert_error_msg_contains('wait_max', helper.create_tube, + tube_name, {wait_max = 'not number'} + ) + + helper.drop_tube(tube_name) end function g.test_invalid_wait_max_on_take() @@ -151,18 +120,18 @@ function g.test_invalid_wait_max_on_take() -- expected time is 3.31 in case wait_factor = 5 local tube_name = 'test_invalid_wait_max_on_take' - g.queue_conn:call('queue.create_tube', { + helper.create_tube( tube_name, { wait_factor = 5, wait_max = 1, } - }) + ) local timeout = 10 local channel = fiber.channel(2) - local start = fiber.time64() + local start = fiber.time64() fiber.create(task_take, tube_name, timeout, channel, { wait_max = -10, -- invalid wait_factor, using 0.3 }) @@ -197,16 +166,11 @@ function g.test_wait_max_on_tube() -- expected time is 3.27 local tube_name = 'test_wait_max_on_take_tube' - g.queue_conn:call('queue.create_tube', { - tube_name, - { - wait_max = 1, - } - }) + helper.create_tube(tube_name, {wait_max = 1}) local timeout = 10 local channel = fiber.channel(2) - local start = fiber.time64() + local start = fiber.time64() fiber.create(task_take, tube_name, timeout, channel) fiber.sleep(2.8) @@ -240,17 +204,16 @@ function g.test_wait_max_in_take() -- expected time is 3.27 local tube_name = 'test_wait_max_in_take_tube' - g.queue_conn:call('queue.create_tube', { - tube_name, + helper.create_tube(tube_name, { wait_factor = 2, wait_max = 100, } - }) + ) local timeout = 10 local channel = fiber.channel(2) - local start = fiber.time64() + local start = fiber.time64() fiber.create(task_take, tube_name, timeout, channel, {wait_max = 1.0}) fiber.sleep(2.8) diff --git a/test/timeout_test.lua b/test/timeout_test.lua index 2d6f093..837f56f 100644 --- a/test/timeout_test.lua +++ b/test/timeout_test.lua @@ -1,18 +1,12 @@ local t = require('luatest') local g = t.group('timeout_test') -local log = require('log') -- luacheck: ignore - -local config = require('test.helper.config') +local helper = require('test.helper') local utils = require('test.helper.utils') local fiber = require('fiber') g.before_all(function() - --- Workaround for https://github.com/tarantool/cartridge/issues/462 - config.cluster:server('queue-router').net_box:close() - config.cluster:server('queue-router').net_box = nil - config.cluster:server('queue-router'):connect_net_box() - g.queue_conn = config.cluster:server('queue-router').net_box + g.queue_conn = helper.get_evaler('queue-router') end) local function task_take(tube_name, timeout, channel) @@ -31,12 +25,7 @@ function g.test_try_waiting() -- CHECK uptime and value - nil local tube_name = 'try_waiting_test' - g.queue_conn:call('queue.create_tube', { - tube_name, - { - wait_factor = 1, - } - }) + helper.create_tube(tube_name, {wait_factor = 1}) local timeout = 3 -- second @@ -62,12 +51,7 @@ function g.test_wait_put_taking() -- CHEK what was taken successfully local tube_name = 'wait_put_taking_test' - g.queue_conn:call('queue.create_tube', { - tube_name, - { - wait_factor = 1, - } - }) + helper.create_tube(tube_name, {wait_factor = 1}) local timeout = 3 diff --git a/test/ttl_test.lua b/test/ttl_test.lua index 746f90c..24a8865 100644 --- a/test/ttl_test.lua +++ b/test/ttl_test.lua @@ -1,22 +1,25 @@ local t = require('luatest') local g = t.group('ttl_test') -local config = require('test.helper.config') +local helper = require('test.helper') local utils = require('test.helper.utils') local fiber = require('fiber') g.before_all(function() - g.queue_conn = config.cluster:server('queue-router').net_box + g.queue_conn = helper.get_evaler('queue-router') end) -local function lookup_task(task_id, tube_name, cluster) +local function lookup_task(task_id, tube_name) local call_string = ("return box.space.%s:get(%s):tomap({names_only=true})"):format(tube_name, task_id) local ok, stored_task - for _, server in pairs(cluster.servers) do + for server, _ in pairs(helper.servers) do ok, stored_task = pcall(function() - return server.net_box:eval(call_string) + local evaler = helper.get_evaler(server) + return evaler:eval(call_string) end) - if ok then break end + if ok then + break + end end return stored_task end @@ -24,16 +27,13 @@ end function g.test_fifottl_config() local tube_name = 'test_fifottl_config' local tube_options = { ttl = 43, ttr = 15, priority = 17, wait_factor = 1 } - g.queue_conn:call('queue.create_tube', { - tube_name, - tube_options - }) + helper.create_tube(tube_name, tube_options) local task_id = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { 'simple data', })[1] - local stored_task = lookup_task(task_id, tube_name, config.cluster) + local stored_task = lookup_task(task_id, tube_name) t.assert_equals(stored_task.ttl, tube_options.ttl * 1000000) t.assert_equals(stored_task.ttr, tube_options.ttr * 1000000) @@ -43,31 +43,23 @@ end function g.test_fifottl_config_pri() local tube_name = 'test_fifottl_config_pri' local tube_options = { ttl = 43, ttr = 15, pri = 15, wait_factor = 1 } - g.queue_conn:call('queue.create_tube', { - tube_name, - tube_options - }) + helper.create_tube(tube_name, tube_options) local task_id = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { 'simple data', })[1] - local stored_task = lookup_task(task_id, tube_name, config.cluster) + local stored_task = lookup_task(task_id, tube_name) t.assert_equals(stored_task.priority, tube_options.pri) task_id = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { 'simple data', {pri = 18}})[1] - stored_task = lookup_task(task_id, tube_name, config.cluster) + stored_task = lookup_task(task_id, tube_name) t.assert_equals(stored_task.priority, 18) end function g.test_touch_task() local tube_name = 'touch_task_test' - g.queue_conn:call('queue.create_tube', { - tube_name, - { - wait_factor = 1, - } - }) + helper.create_tube(tube_name, {wait_factor = 1}) local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { 'simple data', @@ -98,12 +90,7 @@ end function g.test_delayed_tasks() local tube_name = 'delayed_tasks_test' - g.queue_conn:call('queue.create_tube', { - tube_name, - { - wait_factor = 1, - } - }) + helper.create_tube(tube_name, {wait_factor = 1}) -- task delayed for 0.1 sec local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { 'simple data', @@ -153,14 +140,14 @@ end function g.test_ttr_release_no_delete_task() local tube_name = 'ttr_release_no_delete_task_test' - g.queue_conn:call('queue.create_tube', { + helper.create_tube( tube_name, { wait_factor = 1, ttr = 0.2, log_request = true, } - }) + ) g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { 'simple data', @@ -183,14 +170,14 @@ end function g.test_ttr_bury_no_delete_task() local tube_name = 'ttr_bury_no_delete_task_test' - g.queue_conn:call('queue.create_tube', { + helper.create_tube( tube_name, { wait_factor = 1, ttr = 0.2, log_request = true, } - }) + ) g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { 'simple data',