diff --git a/src/api/common_api.js b/src/api/common_api.js
index e9f4c948a2..454f14811a 100644
--- a/src/api/common_api.js
+++ b/src/api/common_api.js
@@ -1413,10 +1413,9 @@ module.exports = {
                 's3:ObjectTagging:*',
                 's3:ObjectTagging:Put',
                 's3:ObjectTagging:Delete',
-                /*We plan to support LifecycleExpiration
                 's3:LifecycleExpiration:*',
                 's3:LifecycleExpiration:Delete',
-                's3:LifecycleExpiration:DeleteMarkerCreated',*/
+                's3:LifecycleExpiration:DeleteMarkerCreated',
             ],
         }
     }
diff --git a/src/api/object_api.js b/src/api/object_api.js
index aabae97bff..25b5beae87 100644
--- a/src/api/object_api.js
+++ b/src/api/object_api.js
@@ -1151,6 +1151,9 @@ module.exports = {
                     },
                     limit: {
                         type: 'integer'
+                    },
+                    reply_objects: {
+                        type: 'boolean'
                     }
                 }
             },
@@ -1159,6 +1162,12 @@ module.exports = {
                 properties: {
                     num_objects_deleted: {
                         type: 'integer'
+                    },
+                    deleted_objects: {
+                        type: 'array',
+                        items: {
+                            $ref: '#/definitions/object_info'
+                        }
                     }
                 }
             },
diff --git a/src/endpoint/endpoint.js b/src/endpoint/endpoint.js
index 3fda31f315..231cf31dd5 100755
--- a/src/endpoint/endpoint.js
+++ b/src/endpoint/endpoint.js
@@ -43,6 +43,7 @@ const { NamespaceMonitor } = require('../server/bg_services/namespace_monitor');
 const { SemaphoreMonitor } = require('../server/bg_services/semaphore_monitor');
 const prom_reporting = require('../server/analytic_services/prometheus_reporting');
 const { PersistentLogger } = require('../util/persistent_logger');
+const { get_notification_logger } = require('../util/notifications_util');
 const NoobaaEvent = require('../manage_nsfs/manage_nsfs_events_utils').NoobaaEvent;
 const cluster = /** @type {import('node:cluster').Cluster} */ (
     /** @type {unknown} */ (require('node:cluster'))
@@ -139,11 +140,10 @@ async function main(options = {}) {
                 poll_interval: config.NSFS_GLACIER_LOGS_POLL_INTERVAL,
             });
 
-        notification_logger = config.NOTIFICATION_LOG_DIR &&
-            new PersistentLogger(config.NOTIFICATION_LOG_DIR, node_name + '_' + config.NOTIFICATION_LOG_NS, {
-                locking: 'SHARED',
-                poll_interval: config.NSFS_GLACIER_LOGS_POLL_INTERVAL,
-            });
+        notification_logger = config.NOTIFICATION_LOG_DIR && get_notification_logger(
+            'SHARED', //shared locking for endpoints
+            undefined, //use default namespace based on hostname
+            config.NSFS_GLACIER_LOGS_POLL_INTERVAL);
 
         process.on('warning', e => dbg.warn(e.stack));
 
diff --git a/src/endpoint/s3/s3_bucket_logging.js b/src/endpoint/s3/s3_bucket_logging.js
index 47acf8e4c2..0b75eb3f5f 100644
--- a/src/endpoint/s3/s3_bucket_logging.js
+++ b/src/endpoint/s3/s3_bucket_logging.js
@@ -6,7 +6,7 @@ const http_utils = require('../../util/http_utils');
 const dgram = require('node:dgram');
 const { Buffer } = require('node:buffer');
 const config = require('../../../config');
-const {compose_notification, check_notif_relevant} = require('../../util/notifications_util');
+const {compose_notification_req, check_notif_relevant} = require('../../util/notifications_util');
 
 async function send_bucket_op_logs(req, res) {
     if (req.params && req.params.bucket &&
@@ -30,13 +30,7 @@ async function send_bucket_op_logs(req, res) {
         if (req.notification_logger && bucket_info.notifications) {
             for (const notif_conf of bucket_info.notifications) {
                 if (check_notif_relevant(notif_conf, req)) {
-                    const notif = {
-                        meta: {
-                            connect: notif_conf.Connect,
-                            name: notif_conf.name
-                        },
-                        notif: compose_notification(req, res, bucket_info, notif_conf)
-                    };
+                    const notif = compose_notification_req(req, res, bucket_info, notif_conf);
                     dbg.log1("logging notif ", notif_conf, ", notif = ", notif);
                     writes_aggregate.push({
                         file: req.notification_logger,
diff --git a/src/server/bg_services/lifecycle.js b/src/server/bg_services/lifecycle.js
index 0d3b2eacee..732ceb1089 100644
--- a/src/server/bg_services/lifecycle.js
+++ b/src/server/bg_services/lifecycle.js
@@ -10,6 +10,8 @@ const server_rpc = require('../server_rpc');
 const system_store = require('../system_services/system_store').get_instance();
 const auth_server = require('../common_services/auth_server');
 const config = require('../../../config');
+const { get_notification_logger, check_notif_relevant,
+    OP_TO_EVENT, compose_notification_lifecycle } = require('../../util/notifications_util');
 
 function get_expiration_timestamp(expiration) {
     if (!expiration) {
@@ -37,7 +39,19 @@ async function handle_bucket_rule(system, rule, j, bucket) {
         dbg.log0('LIFECYCLE SKIP bucket:', bucket.name, '(bucket id:', bucket._id, ') rule', util.inspect(rule), 'now', now, 'last_sync', rule.last_sync, 'no expiration');
         return;
     }
-    dbg.log0('LIFECYCLE PROCESSING bucket:', bucket.name, '(bucket id:', bucket._id, ') rule', util.inspect(rule));
+    dbg.log0('LIFECYCLE PROCESSING bucket:', bucket.name.unwrap(), '(bucket id:', bucket._id, ') rule', util.inspect(rule));
+
+    //we might need to send notifications for deleted objects, if
+    //1. notifications are enabled AND
+    //2. bucket has notifications at all AND
+    //3. bucket has a relevant notification, either
+    //3.1. notification is without event filtering OR
+    //3.2. notification is for LifecycleExpiration event
+    //if so, we need the metadata of the deleted objects from the object server (rpc schema wants a strict boolean)
+    const reply_objects = Boolean(config.NOTIFICATION_LOG_DIR && bucket.notifications &&
+        _.some(bucket.notifications, notif =>
+            !notif.Events || _.some(notif.Events, event => event.includes(OP_TO_EVENT.lifecycle_delete.name))
+        ));
 
     const res = await server_rpc.client.object.delete_multiple_objects_by_filter({
         bucket: bucket.name,
@@ -47,6 +61,7 @@ async function handle_bucket_rule(system, rule, j, bucket) {
         size_greater: rule.filter.object_size_greater_than,
         tags: rule.filter.tags,
         limit: config.LIFECYCLE_BATCH_SIZE,
+        reply_objects,
     }, {
         auth_token: auth_server.make_auth_token({
             system_id: system._id,
@@ -55,6 +70,46 @@ async function handle_bucket_rule(system, rule, j, bucket) {
         })
     });
 
+    //dbg.log0("LIFECYCLE PROCESSING res =", res);
+
+    if (res.deleted_objects) {
+
+        const writes = [];
+
+        for (const deleted_obj of res.deleted_objects) {
+            for (const notif of bucket.notifications) {
+                if (check_notif_relevant(notif, {
+                    op_name: 'lifecycle_delete',
+                    s3_event_method: deleted_obj.created_delete_marker ? 'DeleteMarkerCreated' : 'Delete',
+                })) {
+                    //remember that this deletion needs a notif for this specific notification conf
+                    writes.push({notif, deleted_obj});
+                }
+            }
+        }
+
+        //if any notifications are needed, write them in notification log file
+        //(otherwise don't do any unnecessary filesystem actions)
+        if (writes.length > 0) {
+            let logger;
+            const promises = [];
+            try {
+                logger = get_notification_logger('SHARED');
+                for (const write of writes) {
+                    const notif = compose_notification_lifecycle(write.deleted_obj, write.notif, bucket);
+                    //push the append promise itself (not a wrapper that only ever resolves),
+                    //so a failed append rejects Promise.all instead of leaving it pending forever
+                    promises.push(logger.append(JSON.stringify(notif)));
+                }
+                await Promise.all(promises);
+            }
+            finally {
+                if (logger) await logger.close();
+            }
+        }
+
+    }
+
     bucket.lifecycle_configuration_rules[j].last_sync = Date.now();
     if (res.num_objects_deleted >= config.LIFECYCLE_BATCH_SIZE) should_rerun = true;
     dbg.log0('LIFECYCLE Done bucket:', bucket.name, '(bucket id:', bucket._id, ') done deletion of objects per rule',
@@ -78,7 +133,7 @@ async function background_worker() {
 
             const results = await P.all(_.map(bucket.lifecycle_configuration_rules,
                 async (lifecycle_rule, j) => {
-                    dbg.log0('LIFECYCLE READ BUCKETS configuration handle_bucket_rule bucket name:', bucket.name, "rule", lifecycle_rule, 'j', j);
+                    dbg.log0('LIFECYCLE READ BUCKETS configuration handle_bucket_rule bucket name:', bucket.name.unwrap(), "rule", lifecycle_rule, 'j', j);
                     return handle_bucket_rule(system, lifecycle_rule, j, bucket);
                 }
             ));
diff --git a/src/server/object_services/object_server.js b/src/server/object_services/object_server.js
index 2052fe88a2..53432f590d 100644
--- a/src/server/object_services/object_server.js
+++ b/src/server/object_services/object_server.js
@@ -949,6 +949,7 @@ async function delete_multiple_objects_by_filter(req) {
     dbg.log1(`delete_multiple_objects_by_filter: bucket=${req.bucket.name} filter=${util.inspect(req.rpc_params)}`);
     const key = new RegExp('^' + _.escapeRegExp(req.rpc_params.prefix));
     const bucket_id = req.bucket._id;
+    const reply_objects = req.rpc_params.reply_objects;
     // TODO: change it to perform changes in batch. Won't scale.
     const query = {
         bucket_id,
@@ -972,7 +973,16 @@ async function delete_multiple_objects_by_filter(req) {
             }))
         }
     }));
-    return { num_objects_deleted: objects.length };
+
+    const reply = { num_objects_deleted: objects.length };
+    if (reply_objects) {
+        //reply needs to include deleted objects
+        //(this is used for LifecycleExpiration event notifications)
+        //so map the md into (api friendly) object info
+        reply.deleted_objects = _.map(objects, obj => get_object_info(obj));
+    }
+
+    return reply;
 }
 
 /**
diff --git a/src/util/notifications_util.js b/src/util/notifications_util.js
index af061a6f1f..d25bf7f178 100644
--- a/src/util/notifications_util.js
+++ b/src/util/notifications_util.js
@@ -22,6 +22,7 @@ const OP_TO_EVENT = Object.freeze({
     put_object_acl: { name: 'ObjectAcl' },
     put_object_tagging: { name: 'ObjectTagging' },
     delete_object_tagging: { name: 'ObjectTagging' },
+    lifecycle_delete: { name: 'LifecycleExpiration' },
 });
 
 class Notificator {
@@ -86,7 +87,7 @@ class Notificator {
                     seen_nodes.add(node_namespace);
                 }
                 dbg.log1("process_notification_files node_namespace =", node_namespace, ", file =", entry.name);
-                const log = new PersistentLogger(config.NOTIFICATION_LOG_DIR, node_namespace, { locking: 'EXCLUSIVE' });
+                const log = get_notification_logger('EXCLUSIVE', node_namespace);
                 try {
                     await log.process(async (file, failure_append) => await this._notify(this.fs_context, file, failure_append));
                 } catch (err) {
@@ -310,51 +311,65 @@ async function test_notifications(bucket) {
     }
 }
 
-//see https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
-function compose_notification(req, res, bucket, notif_conf) {
-    let eTag = res.getHeader('ETag');
-    //eslint-disable-next-line
-    if (eTag && eTag.startsWith('\"') && eTag.endsWith('\"')) {
-        eTag = eTag.substring(2, eTag.length - 2);
-    }
+function compose_notification_base(notif_conf, bucket, req) {
 
-    const event = OP_TO_EVENT[req.op_name];
-    const http_verb_capitalized = req.method.charAt(0).toUpperCase() + req.method.slice(1).toLowerCase();
     const event_time = new Date();
 
     const notif = {
         eventVersion: '2.3',
         eventSource: _get_system_name(req) + ':s3',
         eventTime: event_time.toISOString(),
-        eventName: event.name + ':' + (event.method || req.s3_event_method || http_verb_capitalized),
-        userIdentity: {
-            principalId: req.object_sdk.requesting_account.name,
-        },
-        requestParameters: {
-            sourceIPAddress: http_utils.parse_client_ip(req),
-        },
-        responseElements: {
-            "x-amz-request-id": req.request_id,
-            "x-amz-id-2": req.request_id,
-        },
+
         s3: {
             s3SchemaVersion: "1.0",
             configurationId: notif_conf.name,
+            object: {
+                //default for sequencer, overridden in compose_notification_req for noobaa ns
+                sequencer: event_time.getTime().toString(16),
+            },
             bucket: {
                 name: bucket.name,
                 ownerIdentity: {
-                    principalId: bucket.bucket_owner.unwrap(),
+                    //buckets from s3 reqs are sdk-style, from lifecycle are "raw" system store object
+                    principalId: bucket.bucket_owner ? bucket.bucket_owner.unwrap() : bucket.owner_account.name.unwrap(),
                 },
                 arn: "arn:aws:s3:::" + bucket.name,
             },
-            object: {
-                key: req.params.key,
-                size: res.getHeader('content-length'),
-                eTag,
-                versionId: res.getHeader('x-amz-version-id'),
-            },
         }
+    };
+
+    return notif;
+
+}
+
+//see https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
+function compose_notification_req(req, res, bucket, notif_conf) {
+    let eTag = res.getHeader('ETag');
+    //eslint-disable-next-line
+    if (eTag && eTag.startsWith('\"') && eTag.endsWith('\"')) {
+        eTag = eTag.substring(1, eTag.length - 1);
+    }
+
+    const event = OP_TO_EVENT[req.op_name];
+    const http_verb_capitalized = req.method.charAt(0).toUpperCase() + req.method.slice(1).toLowerCase();
+
+    const notif = compose_notification_base(notif_conf, bucket, req);
+
+    notif.eventName = event.name + ':' + (event.method || req.s3_event_method || http_verb_capitalized);
+    notif.userIdentity = {
+        principalId: req.object_sdk.requesting_account.name,
     };
+    notif.requestParameters = {
+        sourceIPAddress: http_utils.parse_client_ip(req),
+    };
+    notif.responseElements = {
+        "x-amz-request-id": req.request_id,
+        "x-amz-id-2": req.request_id,
+    };
+    notif.s3.object.key = req.params.key;
+    notif.s3.object.size = res.getHeader('content-length');
+    notif.s3.object.eTag = eTag;
+    notif.s3.object.versionId = res.getHeader('x-amz-version-id');
 
     //handle glacierEventData
     if (res.restore_object_result) {
@@ -370,21 +385,41 @@ function compose_notification(req, res, bucket, notif_conf) {
     if (res.seq) {
         //in noobaa-ns we have a sequence from db
         notif.s3.object.sequencer = res.seq;
-    } else {
-        //fallback to time-based sequence
-        notif.s3.object.sequencer = event_time.getTime().toString(16);
     }
 
-    const records = [notif];
-
-    return {Records: records};
+    return compose_meta(notif, notif_conf);
 }
 
+function compose_notification_lifecycle(deleted_obj, notif_conf, bucket) {
+    const notif = compose_notification_base(notif_conf, bucket);
+
+    notif.eventName = OP_TO_EVENT.lifecycle_delete.name + ':' +
+        (deleted_obj.created_delete_marker ? 'DeleteMarkerCreated' : 'Delete');
+    notif.s3.object.key = deleted_obj.key;
+    notif.s3.object.size = deleted_obj.size;
+    notif.s3.object.eTag = deleted_obj.etag;
+    notif.s3.object.versionId = deleted_obj.version_id;
+
+    return compose_meta(notif, notif_conf);
+
+}
+
+function compose_meta(record, notif_conf) {
+    return {
+        meta: {
+            connect: notif_conf.Connect,
+            name: notif_conf.Id
+        },
+        notif: {
+            Records: [record],
+        }
+    };
+}
 
 
 function _get_system_name(req) {
 
-    if (req.object_sdk.nsfs_config_root) {
+    if (req && req.object_sdk && req.object_sdk.nsfs_system) {
         const name = Object.keys(req.object_sdk.nsfs_system)[0];
         return name;
     } else {
@@ -427,7 +462,28 @@ function check_notif_relevant(notif, req) {
     return false;
 }
 
+/**
+ * Create a PersistentLogger on config.NOTIFICATION_LOG_DIR for the given namespace.
+ * @param {"SHARED" | "EXCLUSIVE"} locking counterintuitively, either 'SHARED' for writing or 'EXCLUSIVE' for reading
+ * @param {string} [namespace] log namespace, defaults to <node name>_<config.NOTIFICATION_LOG_NS>
+ * @param [poll_interval] passed through as-is to the PersistentLogger options
+ */
+function get_notification_logger(locking, namespace, poll_interval) {
+    if (!namespace) {
+        const node_name = process.env.NODE_NAME || os.hostname();
+        namespace = node_name + '_' + config.NOTIFICATION_LOG_NS;
+    }
+
+    return new PersistentLogger(config.NOTIFICATION_LOG_DIR, namespace, {
+        locking,
+        poll_interval,
+    });
+}
+
 exports.Notificator = Notificator;
 exports.test_notifications = test_notifications;
-exports.compose_notification = compose_notification;
+exports.compose_notification_req = compose_notification_req;
+exports.compose_notification_lifecycle = compose_notification_lifecycle;
 exports.check_notif_relevant = check_notif_relevant;
+exports.get_notification_logger = get_notification_logger;
+exports.OP_TO_EVENT = OP_TO_EVENT;