Restructure for better large bucket support
* Use a synchronous queue to handle the processing of events in parallel
* Switch to aws-sdk v3
* Add a lot of logging
* Enable use of the SDK's start_after listing parameter to fetch only new objects (useful when keys sort chronologically by time, as with S3 access logs); see the sketch after this list
* Limit the batch size of the S3 request
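
For context, start_after maps directly onto the S3 ListObjectsV2 parameter of the same name. A minimal sketch of the idea with aws-sdk-s3 v3 (region, bucket name, and key are placeholders, not values from this commit):

require "aws-sdk-s3"

# Placeholder bucket and key; start_after is passed straight through to
# the ListObjectsV2 API call.
bucket = Aws::S3::Resource.new(region: "us-east-1").bucket("my-log-bucket")

# Only keys that sort lexicographically after the given key are returned,
# so for time-ordered keys each poll fetches previously unseen objects.
bucket.objects(start_after: "2023/11/01/access-log-000123").limit(1000).each do |object|
  puts object.key
end
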
ph authored and eherot committed Nov 2, 2023
1 parent b7f42d7 commit ce1edd1
Showing 26 changed files with 1,573 additions and 1,049 deletions.
485 changes: 155 additions & 330 deletions lib/logstash/inputs/s3.rb

Large diffs are not rendered by default.

81 changes: 81 additions & 0 deletions lib/logstash/inputs/s3/event_processor.rb
@@ -0,0 +1,81 @@
# encoding: utf-8
require "logstash/inputs/base"

module LogStash module Inputs class S3 < LogStash::Inputs::Base
  # Take the raw event from the files and apply the codec
  # and the metadata.
  class EventProcessor
    def initialize(logstash_inputs_s3, codec, queue, include_object_properties, logger)
      @queue = queue
      @codec = codec
      @logstash_inputs_s3 = logstash_inputs_s3
      @include_object_properties = include_object_properties
      @logger = logger
      # The CloudFront field names are part of the plugin's configuration;
      # fetching them here (assumed to be exposed via `params`) avoids
      # calling `event.set` with a nil field name below.
      @cloudfront_version_key = logstash_inputs_s3.params["cloudfront_version_key"]
      @cloudfront_fields_key = logstash_inputs_s3.params["cloudfront_fields_key"]
    end

    def process(line, metadata, remote_file_data)
      @codec.decode(line) do |event|
        # We are making an assumption about the CloudFront log format:
        # the user will use the plain or the line codec, and the message
        # key will contain the actual line content.
        # If the event contains only metadata it will be dropped.
        # This matches the behavior of the pre-1.5 plugin.
        #
        # The line needs to go through the codec to replace unknown
        # bytes in the log stream before doing a regexp match, otherwise
        # you will get an `Error: invalid byte sequence in UTF-8`.
        if event_is_metadata?(event)
          @logger.debug('Event is metadata, updating the current cloudfront metadata', :event => event)
          return update_metadata(metadata, event)
        end

        @logger.debug('Event is not metadata, pushing to queue', :event => event, :metadata => metadata)
        push_decoded_event(@queue, metadata, remote_file_data, event)
      end
    end

    private

    def push_decoded_event(queue, metadata, remote_file_data, event)
      @logstash_inputs_s3.send(:decorate, event)

      if @include_object_properties
        event.set("[@metadata][s3]", remote_file_data.to_h)
      else
        event.set("[@metadata][s3]", {})
      end

      # The object key is not set separately here because it is already
      # included in `remote_file_data.to_h` above.
      event.set(@cloudfront_version_key, metadata[:cloudfront_version]) unless metadata[:cloudfront_version].nil?
      event.set(@cloudfront_fields_key, metadata[:cloudfront_fields]) unless metadata[:cloudfront_fields].nil?

      queue << event
    end

    def event_is_metadata?(event)
      return false unless event.get("message").is_a?(String)
      line = event.get("message")
      version_metadata?(line) || fields_metadata?(line)
    end

    def version_metadata?(line)
      line.start_with?('#Version: ')
    end

    def fields_metadata?(line)
      line.start_with?('#Fields: ')
    end

    def update_metadata(metadata, event)
      line = event.get('message').strip

      if version_metadata?(line)
        metadata[:cloudfront_version] = line.split(/#Version: (.+)/).last
      end

      if fields_metadata?(line)
        metadata[:cloudfront_fields] = line.split(/#Fields: (.+)/).last
      end
    end
  end
end end end
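
A hypothetical wiring of the EventProcessor above; `plugin` and `object_summary` are stand-ins for the configured plugin instance and the S3 object summary, not part of this commit:

require "logstash/codecs/plain"

# `plugin` stands in for the configured LogStash::Inputs::S3 instance and
# `object_summary` for the Aws::S3::ObjectSummary of the file being read.
queue = Queue.new
codec = LogStash::Codecs::Plain.new
processor = LogStash::Inputs::S3::EventProcessor.new(plugin, codec, queue, true, plugin.logger)

metadata = {} # accumulates :cloudfront_version / :cloudfront_fields across lines
processor.process("#Version: 1.0", metadata, object_summary)       # metadata only, nothing queued
processor.process("GET /index.html 200", metadata, object_summary) # decoded and pushed onto queue
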
20 changes: 0 additions & 20 deletions lib/logstash/inputs/s3/patch.rb

This file was deleted.

98 changes: 98 additions & 0 deletions lib/logstash/inputs/s3/poller.rb
@@ -0,0 +1,98 @@
# encoding: utf-8
require "logstash/inputs/base"

require "logstash/inputs/s3/remote_file"
require "stud/interval"

module LogStash module Inputs class S3 < LogStash::Inputs::Base
  class Poller
    DEFAULT_OPTIONS = {
      :polling_interval => 1,
      :use_start_after => false,
      :batch_size => 1000,
      :buckets_options => {},
      :gzip_pattern => "\.gz(ip)?$"
    }

    def initialize(bucket, sincedb, logger, options = {})
      @bucket = bucket
      @sincedb = sincedb
      @logger = logger
      @stopped = false
      @last_key_fetched = nil

      @options = DEFAULT_OPTIONS.merge(options)
    end

    def run(&block)
      Stud.interval(options[:polling_interval]) do
        Stud.stop! if stop?

        retrieved_count = retrieve_objects(&block)

        # If we retrieved a full batch there are probably more objects
        # to retrieve, so don't wait for the next interval.
        redo if retrieved_count == options[:batch_size]
      end
    end

    def stop
      @stopped = true
    end

    private
    attr_reader :options

    def retrieve_objects(&block)
      @logger.debug("Retrieving objects from S3", :options => options)

      retrieved_count = 0
      remote_objects.limit(options[:batch_size]).each do |object|
        return retrieved_count if stop?

        block.call(RemoteFile.new(object, @logger, options[:gzip_pattern]))

        if options[:use_start_after]
          @last_key_fetched = object.key
          @logger.debug("Setting last_key_fetched", :last_key_fetched => @last_key_fetched)
        end

        retrieved_count += 1
      end

      retrieved_count
    end

    def remote_objects
      @logger.info("Instantiating S3 object collection", :bucket_listing_options => bucket_listing_options)
      @bucket.objects(bucket_listing_options)
    end

    def bucket_listing_options
      output = {}

      if options[:use_start_after]
        if @last_key_fetched
          @logger.debug("Setting start_after to last_key_fetched",
                        :last_key_fetched => @last_key_fetched)
          output[:start_after] = @last_key_fetched
        elsif (oldest_key = @sincedb.oldest_key)
          @logger.debug("Setting start_after to SinceDB.oldest_key", :oldest_key => oldest_key)
          output[:start_after] = oldest_key
        else
          @logger.debug("use_start_after is enabled but no previous key was found in the " +
                        "sincedb and @last_key_fetched is nil. Starting from the beginning " +
                        "of the bucket.")
        end
      else
        @logger.debug("use_start_after is disabled, relying on last_modified to filter seen objects")
      end

      output.merge(options[:buckets_options])
    end

    def stop?
      @stopped
    end
  end
end end end
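
A hypothetical driver for the Poller above; `sincedb`, `logger`, and `ingest` are stand-ins for the plugin's SinceDB, its logger, and downstream processing:

require "aws-sdk-s3"

# `bucket` is an aws-sdk v3 Aws::S3::Bucket; region and name are placeholders.
bucket = Aws::S3::Resource.new(region: "us-east-1").bucket("my-log-bucket")
poller = LogStash::Inputs::S3::Poller.new(bucket, sincedb, logger,
                                          :use_start_after => true,
                                          :batch_size => 500)

# Blocks on Stud.interval, yielding a RemoteFile for each listed object.
poller.run { |remote_file| ingest(remote_file) }
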
71 changes: 71 additions & 0 deletions lib/logstash/inputs/s3/post_processor.rb
@@ -0,0 +1,71 @@
# encoding: utf-8
require "logstash/inputs/base"

require "fileutils"

module LogStash module Inputs class S3 < LogStash::Inputs::Base
  class PostProcessor
    class UpdateSinceDB
      def initialize(sincedb)
        @sincedb = sincedb
      end

      def process(remote_file)
        @sincedb.completed(remote_file)
      end
    end

    class BackupLocally
      def initialize(backup_to_dir)
        @backup_dir = backup_to_dir
      end

      def process(remote_file)
        destination = File.join(@backup_dir, remote_file.key)

        if File.exist?(destination)
          destination = File.join(@backup_dir, "#{remote_file.key}_#{remote_file.version}")
        end

        # The key may contain slashes, so make sure the parent
        # directories exist before writing.
        FileUtils.mkdir_p(File.dirname(destination))

        case remote_file.file
        when StringIO
          File.open(destination, "wb") { |f| f.write(remote_file.file.read) }
        when File
          FileUtils.cp(remote_file.file.path, destination)
        end
      end
    end

    class BackupToBucket
      attr_reader :backup_bucket, :backup_prefix

      def initialize(backup_bucket, backup_prefix = nil)
        @backup_bucket = backup_bucket
        @backup_prefix = backup_prefix
      end

      def process(remote_file)
        remote_file.remote_object.copy_to(destination(remote_file))
      end

      def destination(remote_file)
        "#{@backup_bucket}/#{rename(remote_file.key)}"
      end

      def rename(key)
        backup_prefix.nil? ? key : "#{backup_prefix}#{key}"
      end
    end

    class MoveToBucket < BackupToBucket
      def process(remote_file)
        remote_file.remote_object.move_to(destination(remote_file))
      end
    end

    class DeleteFromSourceBucket
      def process(remote_file)
        remote_file.remote_object.delete
      end
    end
  end
end end end
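
A hypothetical chain built from the post processors above; in the plugin the set would be derived from the backup and delete settings, and `sincedb` and `remote_file` are stand-ins:

# `sincedb` and `remote_file` stand in for the plugin's SinceDB and the
# RemoteFile that was just ingested.
post_processors = [
  LogStash::Inputs::S3::PostProcessor::UpdateSinceDB.new(sincedb),
  LogStash::Inputs::S3::PostProcessor::BackupToBucket.new("my-backup-bucket", "done/"),
  LogStash::Inputs::S3::PostProcessor::DeleteFromSourceBucket.new
]

# Each processor sees the file after its content has been pushed downstream.
post_processors.each { |processor| processor.process(remote_file) }
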
92 changes: 92 additions & 0 deletions lib/logstash/inputs/s3/processing_policy_validator.rb
@@ -0,0 +1,92 @@
# encoding: utf-8
require "logstash/inputs/base"

module LogStash module Inputs class S3 < LogStash::Inputs::Base
  class ProcessingPolicyValidator
    class SkipEndingDirectory
      ENDING_DIRECTORY_STRING = "/"

      def self.process?(remote_file)
        !remote_file.key.end_with?(ENDING_DIRECTORY_STRING)
      end
    end

    class SkipEmptyFile
      def self.process?(remote_file)
        remote_file.content_length > 0
      end
    end

    class IgnoreNewerThan
      def initialize(seconds)
        @seconds = seconds
      end

      def process?(remote_file)
        Time.now - remote_file.last_modified >= @seconds
      end
    end

    class IgnoreOlderThan
      def initialize(seconds)
        @seconds = seconds
      end

      def process?(remote_file)
        Time.now - remote_file.last_modified <= @seconds
      end
    end

    class AlreadyProcessed
      def initialize(sincedb)
        @sincedb = sincedb
      end

      def process?(remote_file)
        !@sincedb.processed?(remote_file)
      end
    end

    class ExcludePattern
      def initialize(pattern)
        @pattern = Regexp.new(pattern)
      end

      def process?(remote_file)
        remote_file.key !~ @pattern
      end
    end

    class ExcludeBackupedFiles < ExcludePattern
      def initialize(backup_prefix)
        super(/^#{backup_prefix}/)
      end
    end

    def initialize(logger, *policies)
      @logger = logger
      @policies = []
      add_policy(policies)
    end

    def add_policy(*policies)
      @policies = @policies.concat([policies].flatten)
    end

    def process?(remote_file)
      @policies.all? do |policy|
        if policy.process?(remote_file)
          true
        else
          @logger.debug("Skipping file because of policy", :remote_file => remote_file, :policy => policy.class)
          return false
        end
      end
    end

    def count
      @policies.count
    end
  end
end; end; end
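
A hypothetical composition of the policies above; the plugin would assemble this list from its configuration, and `logger`, `sincedb`, and `remote_file` are stand-ins:

# Stateless policies are passed as classes (they expose self.process?),
# stateful ones as instances.
validator = LogStash::Inputs::S3::ProcessingPolicyValidator.new(
  logger,
  LogStash::Inputs::S3::ProcessingPolicyValidator::SkipEndingDirectory,
  LogStash::Inputs::S3::ProcessingPolicyValidator::SkipEmptyFile,
  LogStash::Inputs::S3::ProcessingPolicyValidator::AlreadyProcessed.new(sincedb),
  LogStash::Inputs::S3::ProcessingPolicyValidator::ExcludePattern.new('\.tmp$')
)

validator.process?(remote_file) # true only if every policy accepts the file
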