diff --git a/src/test/unit_tests/coretest.js b/src/test/unit_tests/coretest.js index 9d084d06bc..0151baa404 100644 --- a/src/test/unit_tests/coretest.js +++ b/src/test/unit_tests/coretest.js @@ -8,6 +8,7 @@ const mocha = require('mocha'); const assert = require('assert'); const crypto = require('crypto'); const fs = require('fs'); +const path = require('path'); // keep me first - this is setting envs that should be set before other modules are required. const CORETEST = 'coretest'; @@ -49,6 +50,9 @@ const system_store = require('../../server/system_services/system_store').get_in const MDStore = require('../../server/object_services/md_store').MDStore; const pool_server = require('../../server/system_services/pool_server'); const pool_ctrls = require('../../server/system_services/pool_controllers'); +const { PersistentLogger } = require('../../util/persistent_logger'); +const os = require('os'); +const { TMP_PATH } = require('../system_tests/test_utils'); // Set the pools server pool controller factory to create pools with // backed by in process agents. @@ -132,11 +136,18 @@ function setup(options = {}) { _.each(server_rpc.rpc._services, (service, srv) => api_coverage.add(srv)); + const notification_logger = + new PersistentLogger(path.join(TMP_PATH, 'test_notifications', 'notif_logs'), + process.env.NODE_NAME || os.hostname() + '_' + config.NOTIFICATION_LOG_NS, { + locking: 'SHARED', + poll_interval: config.NSFS_GLACIER_LOGS_POLL_INTERVAL, + }); + const object_io = new ObjectIO(); const endpoint_request_handler = endpoint.create_endpoint_handler( - endpoint.create_init_request_sdk(server_rpc.rpc, rpc_client, object_io), [], false); + endpoint.create_init_request_sdk(server_rpc.rpc, rpc_client, object_io), [], false, undefined, notification_logger); const endpoint_request_handler_sts = endpoint.create_endpoint_handler( - endpoint.create_init_request_sdk(server_rpc.rpc, rpc_client, object_io), [], true); + endpoint.create_init_request_sdk(server_rpc.rpc, rpc_client, object_io), [], true, undefined, notification_logger); async function announce(msg) { if (process.env.SUPPRESS_LOGS) return; diff --git a/src/test/unit_tests/index.js b/src/test/unit_tests/index.js index ecec0b3775..9f252ccc74 100644 --- a/src/test/unit_tests/index.js +++ b/src/test/unit_tests/index.js @@ -99,6 +99,7 @@ require('./test_tiering_ttl_worker'); // require('./test_tiering_upload'); //require('./test_s3_worm'); require('./test_bucket_logging'); +require('./test_notifications'); // UPGRADE // require('./test_postgres_upgrade'); // TODO currently working with mongo -> once changing to postgres - need to uncomment diff --git a/src/test/unit_tests/test_notifications.js b/src/test/unit_tests/test_notifications.js new file mode 100644 index 0000000000..8c66c1ae3b --- /dev/null +++ b/src/test/unit_tests/test_notifications.js @@ -0,0 +1,207 @@ +/* Copyright (C) 2016 NooBaa */ +'use strict'; + +// disabling init_rand_seed as it takes longer than the actual test execution +process.env.DISABLE_INIT_RANDOM_SEED = "true"; + +const fs = require('fs'); +const path = require('path'); + +const config = require('../../../config'); + +// setup coretest first to prepare the env +const { get_coretest_path, TMP_PATH } = require('../system_tests/test_utils'); +const coretest_path = get_coretest_path(); +const coretest = require(coretest_path); + +const { rpc_client, EMAIL, POOL_LIST } = coretest; + +console.log('POOL_LIST =', POOL_LIST); + +const tmp_path = path.join(TMP_PATH, 'test_notifications'); +//dir for connect files +const tmp_connect = path.join(tmp_path, 'connect'); +//dir for notification persistent files +const notif_logs_path = path.join(tmp_path, 'notif_logs'); +config.NOTIFICATION_LOG_DIR = notif_logs_path; + +coretest.setup({pools_to_create: [POOL_LIST[1]]}); +const { S3 } = require('@aws-sdk/client-s3'); +const mocha = require('mocha'); +const { NodeHttpHandler } = require("@smithy/node-http-handler"); +const http = require('http'); +const assert = require('assert'); +const timer = require('node:timers/promises'); +const notifications_util = require('../../util/notifications_util'); + +const http_connect_filename = 'http_connect.kv'; +const http_connect_path = path.join(tmp_connect, http_connect_filename); +//content of connect file, will be written to a file in before() +const http_connect = +'agent_request_object={"host": "localhost", "port": 9999, "timeout": 100}\n' + +'request_options_object={"auth": "amit:passw"}\n' + +'notification_protocol=http\n' + +'name=http_notif'; + +const Bucket = 'notif'; + +let http_server = null; +let server_done = false; +let expected_bucket; +let expected_event_name; + +// eslint-disable-next-line max-lines-per-function +mocha.describe('notifications', function() { + + this.timeout(20000); // eslint-disable-line no-invalid-this + + describe('notifications', () => { + + const s3_creds = { + forcePathStyle: true, + region: config.DEFAULT_REGION, + requestHandler: new NodeHttpHandler({ + httpAgent: new http.Agent({ keepAlive: false }) + }) + }; + let s3; + + mocha.before(async function() { + + const admin_keys = (await rpc_client.account.read_account({ email: EMAIL, })).access_keys; + s3_creds.credentials = { + accessKeyId: admin_keys[0].access_key.unwrap(), + secretAccessKey: admin_keys[0].secret_key.unwrap(), + }; + s3_creds.endpoint = coretest.get_http_address(); + s3 = new S3(s3_creds); + + //create http connect file + fs.mkdirSync(tmp_connect, {recursive: true}); + fs.writeFileSync(http_connect_path, http_connect); + + fs.mkdirSync(notif_logs_path, {recursive: true}); + + await s3.createBucket({ + Bucket, + }); + + http_server = http.createServer(async function(req, res) { + const chunks = []; + + for await (const chunk of req) { + chunks.push(chunk); + } + + const input = Buffer.concat(chunks); + const notif = JSON.parse(input.toString()); + + assert.strictEqual(notif.Records[0].s3.bucket.name, expected_bucket, 'wrong bucket name in notification'); + assert.strictEqual(notif.Records[0].eventName, expected_event_name, 'wrong event name in notification'); + + res.writeHead(200, {'Content-Type': 'text/plain'}); + res.end(); + server_done = true; + }).listen(9999); + }); + + mocha.after(() => { + http_server.close(); + }); + + mocha.it('set/get notif conf s3ops', async () => { + await s3.putBucketNotificationConfiguration({ + Bucket, + NotificationConfiguration: { + TopicConfigurations: [{ + "Id": "system_test_http_no_event", + "TopicArn": http_connect_path, + }], + }, + }); + + const get = await s3.getBucketNotificationConfiguration({Bucket}); + assert.strictEqual(get.TopicConfigurations[0].Id, 'system_test_http_no_event'); + }); + + mocha.it('simple notif put', async () => { + await s3.putObject({ + Bucket, + Key: 'f1', + Body: 'this is the body', + }); + + await notify_await_result({bucket: Bucket, event_name: 'ObjectCreated:Put'}); + }); + + + mocha.it('simple notif delete', async () => { + await s3.deleteObject({ + Bucket, + Key: 'f1', + }); + + await notify_await_result({bucket: Bucket, event_name: 'ObjectRemoved:Delete'}); + }); + + + mocha.it('event notif', async () => { + + const set = await s3.putBucketNotificationConfiguration({ + Bucket, + NotificationConfiguration: { + TopicConfigurations: [{ + "Id": "system_test_http_event", + "TopicArn": http_connect_path, + "Events": ["s3:ObjectCreated:*"], + }], + }, + }); + + assert.strictEqual(set.$metadata.httpStatusCode, 200); + + await s3.putObject({ + Bucket, + Key: 'f1', + Body: 'this is the body', + }); + + await notify_await_result({bucket: Bucket, event_name: 'ObjectCreated:Put'}); + + await s3.deleteObject({ + Bucket, + Key: 'f1', + }); + + //there shouldn't be a notification for the delete, wait 2 seconds to validate this + await notify_await_result({timeout: 2000}); + }); + }); + +}); + +const step_wait = 100; +async function notify_await_result({bucket, event_name, timeout}) { + + //remember expected result here so server could compare it to actual result later + expected_bucket = bucket; + expected_event_name = event_name; + server_done = false; + + //busy-sync wait for server + //eslint-disable-next-line no-unmodified-loop-condition + while (!server_done) { + console.log('awaiting for notification to arrive, timeout =', timeout); + await new notifications_util.Notificator({name: 'coretest notificator'}).process_notification_files(); + await timer.setTimeout(step_wait); + if (timeout !== undefined) { + timeout -= step_wait; + //timeout if we're validating notification did not arrive + if (timeout < 0) break; + } + } + + //if we were not expecting to get a notification (time is undefined), + //make sure server_done remained false + assert.strictEqual(timeout === undefined || !server_done, true, "unexpected notification received"); +}