Skip to content

Commit

Permalink
notifications | lifecycle impl
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Prinz Setter <[email protected]>
  • Loading branch information
alphaprinz committed Nov 13, 2024
1 parent e1bf29e commit e2f6412
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 54 deletions.
3 changes: 1 addition & 2 deletions src/api/common_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -1413,10 +1413,9 @@ module.exports = {
's3:ObjectTagging:*',
's3:ObjectTagging:Put',
's3:ObjectTagging:Delete',
/*We plan to support LifecycleExpiration
's3:LifecycleExpiration:*',
's3:LifecycleExpiration:Delete',
's3:LifecycleExpiration:DeleteMarkerCreated',*/
's3:LifecycleExpiration:DeleteMarkerCreated',
],
}
}
Expand Down
9 changes: 9 additions & 0 deletions src/api/object_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,9 @@ module.exports = {
},
limit: {
type: 'integer'
},
reply_objects: {
type: 'boolean'
}
}
},
Expand All @@ -1159,6 +1162,12 @@ module.exports = {
properties: {
num_objects_deleted: {
type: 'integer'
},
deleted_objects: {
type: 'array',
items: {
$ref: '#/definitions/object_info'
}
}
}
},
Expand Down
10 changes: 5 additions & 5 deletions src/endpoint/endpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const { NamespaceMonitor } = require('../server/bg_services/namespace_monitor');
const { SemaphoreMonitor } = require('../server/bg_services/semaphore_monitor');
const prom_reporting = require('../server/analytic_services/prometheus_reporting');
const { PersistentLogger } = require('../util/persistent_logger');
const { get_notification_logger } = require('../util/notifications_util');
const NoobaaEvent = require('../manage_nsfs/manage_nsfs_events_utils').NoobaaEvent;
const cluster = /** @type {import('node:cluster').Cluster} */ (
/** @type {unknown} */ (require('node:cluster'))
Expand Down Expand Up @@ -139,11 +140,10 @@ async function main(options = {}) {
poll_interval: config.NSFS_GLACIER_LOGS_POLL_INTERVAL,
});

notification_logger = config.NOTIFICATION_LOG_DIR &&
new PersistentLogger(config.NOTIFICATION_LOG_DIR, node_name + '_' + config.NOTIFICATION_LOG_NS, {
locking: 'SHARED',
poll_interval: config.NSFS_GLACIER_LOGS_POLL_INTERVAL,
});
notification_logger = config.NOTIFICATION_LOG_DIR && get_notification_logger(
'SHARED', //shared locking for endpoints
undefined, //use default namespace based on hostname
config.NSFS_GLACIER_LOGS_POLL_INTERVAL);

process.on('warning', e => dbg.warn(e.stack));

Expand Down
10 changes: 2 additions & 8 deletions src/endpoint/s3/s3_bucket_logging.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const http_utils = require('../../util/http_utils');
const dgram = require('node:dgram');
const { Buffer } = require('node:buffer');
const config = require('../../../config');
const {compose_notification, check_notif_relevant} = require('../../util/notifications_util');
const {compose_notification_req, check_notif_relevant} = require('../../util/notifications_util');

async function send_bucket_op_logs(req, res) {
if (req.params && req.params.bucket &&
Expand All @@ -30,13 +30,7 @@ async function send_bucket_op_logs(req, res) {
if (req.notification_logger && bucket_info.notifications) {
for (const notif_conf of bucket_info.notifications) {
if (check_notif_relevant(notif_conf, req)) {
const notif = {
meta: {
connect: notif_conf.Connect,
name: notif_conf.name
},
notif: compose_notification(req, res, bucket_info, notif_conf)
};
const notif = compose_notification_req(req, res, bucket_info, notif_conf);
dbg.log1("logging notif ", notif_conf, ", notif = ", notif);
writes_aggregate.push({
file: req.notification_logger,
Expand Down
59 changes: 57 additions & 2 deletions src/server/bg_services/lifecycle.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ const server_rpc = require('../server_rpc');
const system_store = require('../system_services/system_store').get_instance();
const auth_server = require('../common_services/auth_server');
const config = require('../../../config');
const { get_notification_logger, check_notif_relevant,
OP_TO_EVENT, compose_notification_lifecycle } = require('../../util/notifications_util');

function get_expiration_timestamp(expiration) {
if (!expiration) {
Expand Down Expand Up @@ -37,7 +39,19 @@ async function handle_bucket_rule(system, rule, j, bucket) {
dbg.log0('LIFECYCLE SKIP bucket:', bucket.name, '(bucket id:', bucket._id, ') rule', util.inspect(rule), 'now', now, 'last_sync', rule.last_sync, 'no expiration');
return;
}
dbg.log0('LIFECYCLE PROCESSING bucket:', bucket.name, '(bucket id:', bucket._id, ') rule', util.inspect(rule));
dbg.log0('LIFECYCLE PROCESSING bucket:', bucket.name.unwrap(), '(bucket id:', bucket._id, ') rule', util.inspect(rule));

//we might need to send notifications for deleted objects, if
//1. notifications are enabled AND
//2. bucket has notifications at all AND
//3. bucket has a relevant notification, either
//3.1. notification is without event filtering OR
//3.2. notification is for LifecycleExpiration event
//if so, we need the metadata of the deleted objects from the object server
//
//the && chain can evaluate to a non-boolean (e.g. a string, undefined or null),
//while the rpc schema declares reply_objects as boolean - so coerce explicitly
const reply_objects = Boolean(config.NOTIFICATION_LOG_DIR && bucket.notifications &&
    _.some(bucket.notifications, notif => !notif.Events ||
        _.some(notif.Events, event => event.includes(OP_TO_EVENT.lifecycle_delete.name))));

const res = await server_rpc.client.object.delete_multiple_objects_by_filter({
bucket: bucket.name,
Expand All @@ -47,6 +61,7 @@ async function handle_bucket_rule(system, rule, j, bucket) {
size_greater: rule.filter.object_size_greater_than,
tags: rule.filter.tags,
limit: config.LIFECYCLE_BATCH_SIZE,
reply_objects,
}, {
auth_token: auth_server.make_auth_token({
system_id: system._id,
Expand All @@ -55,6 +70,46 @@ async function handle_bucket_rule(system, rule, j, bucket) {
})
});

//dbg.log0("LIFECYCLE PROCESSING res =", res);

if (res.deleted_objects) {

    //pair each deleted object with every notification conf that is relevant for it
    const writes = [];

    for (const deleted_obj of res.deleted_objects) {
        for (const notif of bucket.notifications) {
            if (check_notif_relevant(notif, {
                    op_name: 'lifecycle_delete',
                    s3_event_method: deleted_obj.created_delete_marker ? 'DeleteMarkerCreated' : 'Delete',
                })) {
                //remember that this deletion needs a notif for this specific notification conf
                writes.push({ notif, deleted_obj });
            }
        }
    }

    //if any notifications are needed, write them in notification log file
    //(otherwise don't do any unnecessary filesystem actions)
    if (writes.length > 0) {
        let logger;
        try {
            logger = get_notification_logger('SHARED');
            //use the append promises directly instead of wrapping them in new Promise():
            //a wrapper that only forwards the fulfilment never settles when append rejects,
            //which would leave Promise.all pending forever.
            //the async callback also turns a synchronous throw from composing
            //the notification into a rejection, so all appends are still started.
            await Promise.all(writes.map(async write => {
                const notif = compose_notification_lifecycle(write.deleted_obj, write.notif, bucket);
                await logger.append(JSON.stringify(notif));
            }));
        } finally {
            //await in case close() is asynchronous, so its failure is not left unhandled
            if (logger) await logger.close();
        }
    }

}

bucket.lifecycle_configuration_rules[j].last_sync = Date.now();
if (res.num_objects_deleted >= config.LIFECYCLE_BATCH_SIZE) should_rerun = true;
dbg.log0('LIFECYCLE Done bucket:', bucket.name, '(bucket id:', bucket._id, ') done deletion of objects per rule',
Expand All @@ -78,7 +133,7 @@ async function background_worker() {

const results = await P.all(_.map(bucket.lifecycle_configuration_rules,
async (lifecycle_rule, j) => {
dbg.log0('LIFECYCLE READ BUCKETS configuration handle_bucket_rule bucket name:', bucket.name, "rule", lifecycle_rule, 'j', j);
dbg.log0('LIFECYCLE READ BUCKETS configuration handle_bucket_rule bucket name:', bucket.name.unwrap(), "rule", lifecycle_rule, 'j', j);
return handle_bucket_rule(system, lifecycle_rule, j, bucket);
}
));
Expand Down
12 changes: 11 additions & 1 deletion src/server/object_services/object_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,7 @@ async function delete_multiple_objects_by_filter(req) {
dbg.log1(`delete_multiple_objects_by_filter: bucket=${req.bucket.name} filter=${util.inspect(req.rpc_params)}`);
const key = new RegExp('^' + _.escapeRegExp(req.rpc_params.prefix));
const bucket_id = req.bucket._id;
const reply_objects = req.rpc_params.reply_objects;
// TODO: change it to perform changes in batch. Won't scale.
const query = {
bucket_id,
Expand All @@ -972,7 +973,16 @@ async function delete_multiple_objects_by_filter(req) {
}))
}
}));
return { num_objects_deleted: objects.length };

const reply = { num_objects_deleted: objects.length };
if (reply_objects) {
//reply needs to include deleted objects
//(this is used for LifecycleExpiration event notifications)
//so map the md into (api friendly) object info
reply.deleted_objects = _.map(objects, get_object_info);
}

return reply;
}

/**
Expand Down
126 changes: 90 additions & 36 deletions src/util/notifications_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const OP_TO_EVENT = Object.freeze({
put_object_acl: { name: 'ObjectAcl' },
put_object_tagging: { name: 'ObjectTagging' },
delete_object_tagging: { name: 'ObjectTagging' },
lifecycle_delete: { name: 'LifecycleExpiration' },
});

class Notificator {
Expand Down Expand Up @@ -86,7 +87,7 @@ class Notificator {
seen_nodes.add(node_namespace);
}
dbg.log1("process_notification_files node_namespace =", node_namespace, ", file =", entry.name);
const log = new PersistentLogger(config.NOTIFICATION_LOG_DIR, node_namespace, { locking: 'EXCLUSIVE' });
const log = get_notification_logger('EXCLUSIVE', node_namespace);
try {
await log.process(async (file, failure_append) => await this._notify(this.fs_context, file, failure_append));
} catch (err) {
Expand Down Expand Up @@ -310,51 +311,65 @@ async function test_notifications(bucket) {
}
}

//see https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
function compose_notification(req, res, bucket, notif_conf) {
let eTag = res.getHeader('ETag');
//eslint-disable-next-line
if (eTag && eTag.startsWith('\"') && eTag.endsWith('\"')) {
eTag = eTag.substring(2, eTag.length - 2);
}
function compose_notification_base(notif_conf, bucket, req) {

const event = OP_TO_EVENT[req.op_name];
const http_verb_capitalized = req.method.charAt(0).toUpperCase() + req.method.slice(1).toLowerCase();
const event_time = new Date();

const notif = {
eventVersion: '2.3',
eventSource: _get_system_name(req) + ':s3',
eventTime: event_time.toISOString(),
eventName: event.name + ':' + (event.method || req.s3_event_method || http_verb_capitalized),
userIdentity: {
principalId: req.object_sdk.requesting_account.name,
},
requestParameters: {
sourceIPAddress: http_utils.parse_client_ip(req),
},
responseElements: {
"x-amz-request-id": req.request_id,
"x-amz-id-2": req.request_id,
},

s3: {
s3SchemaVersion: "1.0",
configurationId: notif_conf.name,
object: {
//default for sequencer, overridden in compose_notification_req for noobaa ns
sequencer: event_time.getTime().toString(16),
},
bucket: {
name: bucket.name,
ownerIdentity: {
principalId: bucket.bucket_owner.unwrap(),
//buckets from s3 reqs are sdk-style, from lifecycle are "raw" system store object
principalId: bucket.bucket_owner ? bucket.bucket_owner.unwrap() : bucket.owner_account.name.unwrap(),
},
arn: "arn:aws:s3:::" + bucket.name,
},
object: {
key: req.params.key,
size: res.getHeader('content-length'),
eTag,
versionId: res.getHeader('x-amz-version-id'),
},
}
}

return notif;

}

//see https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
function compose_notification_req(req, res, bucket, notif_conf) {
let eTag = res.getHeader('ETag');
//a quoted ETag is unwrapped by dropping exactly one character (the quote) from each end;
//dropping two would also cut off the first and last characters of the ETag value itself
//eslint-disable-next-line
if (eTag && eTag.startsWith('\"') && eTag.endsWith('\"')) {
    eTag = eTag.substring(1, eTag.length - 1);
}

const event = OP_TO_EVENT[req.op_name];
const http_verb_capitalized = req.method.charAt(0).toUpperCase() + req.method.slice(1).toLowerCase();

const notif = compose_notification_base(notif_conf, bucket, req);

notif.eventName = event.name + ':' + (event.method || req.s3_event_method || http_verb_capitalized);
notif.userIdentity = {
principalId: req.object_sdk.requesting_account.name,
};
notif.requestParameters = {
sourceIPAddress: http_utils.parse_client_ip(req),
};
notif.responseElements = {
"x-amz-request-id": req.request_id,
"x-amz-id-2": req.request_id,
};
notif.s3.object.key = req.params.key;
notif.s3.object.size = res.getHeader('content-length'),
notif.s3.object.eTag = eTag;
notif.s3.object.versionId = res.getHeader('x-amz-version-id')

//handle glacierEventData
if (res.restore_object_result) {
Expand All @@ -370,21 +385,41 @@ function compose_notification(req, res, bucket, notif_conf) {
if (res.seq) {
//in noobaa-ns we have a sequence from db
notif.s3.object.sequencer = res.seq;
} else {
//fallback to time-based sequence
notif.s3.object.sequencer = event_time.getTime().toString(16);
}

const records = [notif];

return {Records: records};
return compose_meta(notif, notif_conf);
}

/**
 * Compose the notification log entry for an object deleted by a lifecycle rule.
 *
 * Starts from compose_notification_base (called without a request object),
 * fills in the event name and the object details, and wraps the record
 * with compose_meta.
 *
 * @param {Object} deleted_obj info of the deleted object; reads key, size, etag,
 *      version_id and created_delete_marker
 * @param {Object} notif_conf the bucket notification configuration this entry is written for
 * @param {Object} bucket the bucket the object was deleted from
 * @returns {Object} the value returned by compose_meta for the composed record
 */
function compose_notification_lifecycle(deleted_obj, notif_conf, bucket) {

    const notif = compose_notification_base(notif_conf, bucket);

    //choose the event method first: '+' binds tighter than '?:', so concatenating
    //directly in front of the conditional makes the whole (always truthy) string the
    //condition and the event name ends up as just 'DeleteMarkerCreated'
    const event_method = deleted_obj.created_delete_marker ? 'DeleteMarkerCreated' : 'Delete';
    notif.eventName = OP_TO_EVENT.lifecycle_delete.name + ':' + event_method;
    notif.s3.object.key = deleted_obj.key;
    notif.s3.object.size = deleted_obj.size;
    notif.s3.object.eTag = deleted_obj.etag;
    notif.s3.object.versionId = deleted_obj.version_id;

    return compose_meta(notif, notif_conf);

}

/**
 * Wrap a single notification record together with the metadata of the
 * notification configuration it was composed for.
 *
 * @param {Object} record the notification record; becomes the only item of notif.Records
 * @param {Object} notif_conf notification configuration; its Connect and Id fields are copied into meta
 * @returns {{meta: {connect: *, name: *}, notif: {Records: Object[]}}}
 */
function compose_meta(record, notif_conf) {
    const { Connect: connect, Id: name } = notif_conf;
    const meta = { connect, name };
    const notif = { Records: [record] };
    return { meta, notif };
}

function _get_system_name(req) {

if (req.object_sdk.nsfs_config_root) {
if (req && req.object_sdk && req.object_sdk.nsfs_system) {
const name = Object.keys(req.object_sdk.nsfs_system)[0];
return name;
} else {
Expand Down Expand Up @@ -427,7 +462,26 @@ function check_notif_relevant(notif, req) {
return false;
}

/**
 * Create a PersistentLogger over the notification log directory (config.NOTIFICATION_LOG_DIR).
 *
 * @param {"SHARED" | "EXCLUSIVE"} locking counterintuitively, either 'SHARED' for writing or 'EXCLUSIVE' for reading
 * @param {string} [namespace] log namespace; when falsy it defaults to the node name
 *      (NODE_NAME env variable, or the hostname) followed by '_' and config.NOTIFICATION_LOG_NS
 * @param {*} [poll_interval] passed through as the poll_interval option of the logger
 * @returns {PersistentLogger}
 */
function get_notification_logger(locking, namespace, poll_interval) {
    const log_namespace = namespace ||
        (process.env.NODE_NAME || os.hostname()) + '_' + config.NOTIFICATION_LOG_NS;
    const logger_options = { locking, poll_interval };

    return new PersistentLogger(config.NOTIFICATION_LOG_DIR, log_namespace, logger_options);
}

exports.Notificator = Notificator;
exports.test_notifications = test_notifications;
exports.compose_notification = compose_notification;
exports.compose_notification_req = compose_notification_req;
exports.compose_notification_lifecycle = compose_notification_lifecycle;
exports.check_notif_relevant = check_notif_relevant;
exports.get_notification_logger = get_notification_logger;
exports.OP_TO_EVENT = OP_TO_EVENT;

0 comments on commit e2f6412

Please sign in to comment.