Skip to content

csolidum/fluent-plugin-s3

 
 

Repository files navigation

Amazon S3 plugin for Fluentd

Build Status

Overview

s3 output plugin buffers event logs in local file and upload it to S3 periodically.

This plugin splits files exactly by using the time of event logs (not the time when the logs are received). For example, a log '2011-01-02 message B' is reached, and then another log '2011-01-03 message B' is reached in this order, the former one is stored in "20110102.gz" file, and latter one in "20110103.gz" file.

s3 input plugin reads data from S3 periodically. This plugin uses SQS queue on the region same as S3 bucket. We must setup SQS queue and S3 event notification before use this plugin.

Requirements

fluent-plugin-s3 fluentd ruby
>= 1.0.0 >= v0.14.0 >= 2.1
< 1.0.0 >= v0.12.0 >= 1.9

Installation

Simply use RubyGems:

$ gem install fluent-plugin-s3 -v "~> 0.8" --no-document # for fluentd v0.12 or later
$ gem install fluent-plugin-s3 -v 1.0.0 --no-document # for fluentd v1.0 or later

Configuration: credentials

Both S3 input/output plugin provide several credential methods for authentication/authorization.

AWS key and secret authentication

These parameters are required when your agent is not running on EC2 instance with an IAM Role. When using an IAM role, make sure to configure instance_profile_credentials. Usage can be found below.

aws_key_id

AWS access key id.

aws_sec_key

AWS secret key.

assume_role_credentials

Typically, you use AssumeRole for cross-account access or federation.

<match *>
  @type s3

  <assume_role_credentials>
    role_arn          ROLE_ARN
    role_session_name ROLE_SESSION_NAME
  </assume_role_credentials>
</match>

See also:

role_arn (required)

The Amazon Resource Name (ARN) of the role to assume.

role_session_name (required)

An identifier for the assumed role session.

policy

An IAM policy in JSON format.

duration_seconds

The duration, in seconds, of the role session. The value can range from 900 seconds (15 minutes) to 3600 seconds (1 hour). By default, the value is set to 3600 seconds.

external_id

A unique identifier that is used by third parties when assuming roles in their customers' accounts.

web_identity_credentials

Similar to the assume_role_credentials, but for usage in EKS.

<match *>
  @type s3

  <web_identity_credentials>
    role_arn          ROLE_ARN
    role_session_name ROLE_SESSION_NAME
    web_identity_token_file AWS_WEB_IDENTITY_TOKEN_FILE
  </web_identity_credentials>
</match>

See also:

role_arn (required)

The Amazon Resource Name (ARN) of the role to assume.

role_session_name (required)

An identifier for the assumed role session.

web_identity_token_file (required)

The absolute path to the file on disk containing the OIDC token

policy

An IAM policy in JSON format.

duration_seconds

The duration, in seconds, of the role session. The value can range from 900 seconds (15 minutes) to 43200 seconds (12 hours). By default, the value is set to 3600 seconds.

instance_profile_credentials

Retrieve temporary security credentials via HTTP request. This is useful on EC2 instance.

<match *>
  @type s3

  <instance_profile_credentials>
    ip_address IP_ADDRESS
    port       PORT
  </instance_profile_credentials>
</match>

See also:

retries

Number of times to retry when retrieving credentials. Default is 5.

ip_address

Default is 169.254.169.254.

port

Default is 80.

http_open_timeout

Default is 5.

http_read_timeout

Default is 5.

shared_credentials

This loads AWS access credentials from local ini file. This is useful for local developing.

<match *>
  @type s3

  <shared_credentials>
    path         PATH
    profile_name PROFILE_NAME
  </shared_credentials>
</match>

See also:

path

Path to the shared file. Defaults to "#{Dir.home}/.aws/credentials".

profile_name

Defaults to 'default' or [ENV]('AWS_PROFILE').

Configuration: Output

v1.0 style

With fluentd v1 and fluent-plugin-s3 v1.0.0 or later, use new buffer configuration to dynamic parameters.

<match pattern>
  @type s3

  aws_key_id YOUR_AWS_KEY_ID
  aws_sec_key YOUR_AWS_SECRET_KEY
  s3_bucket YOUR_S3_BUCKET_NAME
  s3_region ap-northeast-1

  path logs/${tag}/%Y/%m/%d/
  s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}

  # if you want to use ${tag} or %Y/%m/%d/ like syntax in path / s3_object_key_format,
  # need to specify tag for ${tag} and time for %Y/%m/%d in <buffer> argument.
  <buffer tag,time>
    @type file
    path /var/log/fluent/s3
    timekey 3600 # 1 hour partition
    timekey_wait 10m
    timekey_use_utc true # use utc
  </buffer>
  <format>
    @type json
  </format>
</match>

For <buffer>, you can use any record field in path / s3_object_key_format.

path logs/${tag}/${foo}
<buffer tag,foo>
  # parameters...
</buffer>

See official article for more detail: Config: Buffer Section - Fluentd

Note that this configuration doesn't work with fluentd v0.12.

v0.12 style

This configuration works with both fluentd v0.12 and v1.0.

<match pattern>
  @type s3

  aws_key_id YOUR_AWS_KEY_ID
  aws_sec_key YOUR_AWS_SECRET_KEY
  s3_bucket YOUR_S3_BUCKET_NAME
  s3_region ap-northeast-1

  path logs/
  s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
  buffer_path /var/log/fluent/s3
  time_slice_format %Y%m%d-%H
  time_slice_wait 10m
  utc
  format json
</match>

If you want to embed tag in path / s3_object_key_format, you need to use fluent-plugin-forest plugin.

aws_iam_retries

This parameter is deprecated. Use instance_profile_credentials instead.

The number of attempts to make (with exponential backoff) when loading instance profile credentials from the EC2 metadata service using an IAM role. Defaults to 5 retries.

s3_bucket (required)

S3 bucket name.

s3_region

s3 region name. For example, US West (Oregon) Region is "us-west-2". The full list of regions are available here. > http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region. We recommend using s3_region instead of s3_endpoint.

s3_endpoint

endpoint for S3 compatible services. For example, Riak CS based storage or something. This option is deprecated for AWS S3, use s3_region instead.

See also AWS article: Working with Regions.

enable_transfer_acceleration

Enable S3 Transfer Acceleration for uploads. IMPORTANT: For this to work, you must first enable this feature on your destination S3 bucket.

use_bundled_cert

For cases where the default SSL certificate is unavailable (e.g. Windows), you can set this option to true in order to use the AWS SDK bundled certificate. Default is false.

This fixes the following error often seen in Windows:

SSL_connect returned=1 errno=0 state=SSLv3 read server certificate B: certificate verify failed (Seahorse::Client::NetworkingError)

ssl_verify_peer

Verify SSL certificate of the endpoint. Default is true. Set false when you want to ignore the endpoint SSL certificate.

s3_object_key_format

The format of S3 object keys. You can use several built-in variables:

  • %{path}
  • %{time_slice}
  • %{index}
  • %{file_extension}
  • %{hex_random}
  • %{uuid_flush}
  • %{hostname}

to decide keys dynamically.

  • %{path} is exactly the value of path configured in the configuration file. E.g., "logs/" in the example configuration above.
  • %{time_slice} is the time-slice in text that are formatted with time_slice_format.
  • %{index} is the sequential number starts from 0, increments when multiple files are uploaded to S3 in the same time slice.
  • %{file_extension} depends on store_as parameter.
  • %{uuid_flush} a uuid that is replaced everytime the buffer will be flushed. If you want to use this placeholder, install uuidtools gem first.
  • %{hostname} is replaced with Socket.gethostname result.
  • %{hex_random} a random hex string that is replaced for each buffer chunk, not assured to be unique. This is used to follow a way of performance tuning, Add a Hex Hash Prefix to Key Name, written in Request Rate and Performance Considerations - Amazon Simple Storage Service. You can configure the length of string with a hex_random_length parameter (Default: 4).

The default format is %{path}%{time_slice}_%{index}.%{file_extension}. In addition, you can use buffer placeholders in this parameter, so you can embed tag, time and record value like below:

s3_object_key_format %{path}/events/%Y%m%d/${tag}_%{index}.%{file_extension}
<buffer tag,time>
  # buffer parameters...
</buffer>

For instance, using the example configuration above, actual object keys on S3 will be something like:

"logs/20130111-22_0.gz"
"logs/20130111-23_0.gz"
"logs/20130111-23_1.gz"
"logs/20130112-00_0.gz"

With the configuration:

s3_object_key_format %{path}/events/ts=%{time_slice}/events_%{index}.%{file_extension}
path log
time_slice_format %Y%m%d-%H

You get:

"log/events/ts=20130111-22/events_0.gz"
"log/events/ts=20130111-23/events_0.gz"
"log/events/ts=20130111-23/events_1.gz"
"log/events/ts=20130112-00/events_0.gz"

NOTE: ${hostname} placeholder is deprecated since v0.8. You can get same result by using configuration's embedded ruby code feature.

s3_object_key_format %{path}%{time_slice}_%{hostname}%{index}.%{file_extension}
s3_object_key_format "%{path}%{time_slice}_#{Socket.gethostname}%{index}.%{file_extension}"

Above two configurations are same. The important point is wrapping "" is needed for #{Socket.gethostname}.

force_path_style

:force_path_style (Boolean) — default: false — When set to true, the bucket name is always left in the request URI and never moved to the host as a sub-domain. See Plugins::S3BucketDns for more details.

This parameter is deprecated. See AWS announcement: https://aws.amazon.com/blogs/aws/amazon-s3-path-deprecation-plan-the-rest-of-the-story/

store_as

archive format on S3. You can use several format:

  • gzip (default)
  • json
  • text
  • lzo (Need lzop command)
  • lzma2 (Need xz command)
  • gzip_command (Need gzip command)
    • This compressor uses an external gzip command, hence would result in utilizing CPU cores well compared with gzip

See Use your compression algorithm section for adding another format.

<format> or format

Change one line format in the S3 object. Supported formats are "out_file", "json", "ltsv", "single_value" and other formatter plugins. See also official Formatter article.

  • out_file (default).

      time\ttag\t{..json1..}
      time\ttag\t{..json2..}
      ...
    
  • json

      {..json1..}
      {..json2..}
      ...
    

At this format, "time" and "tag" are omitted. But you can set these information to the record by setting <inject> option. If you set following configuration in S3 output:

# v1
<format>
  @type json
</format>
<inject>
  time_key log_time
</inject>
# v0.12
@format json
include_time_key true
time_key log_time # default is time

then the record has log_time field.

{"log_time":"time string",...}

See also official Inject Section article.

  • ltsv

      key1:value1\tkey2:value2
      key1:value1\tkey2:value2
      ...
    
  • single_value

Use specified value instead of entire recode. If you get '{"message":"my log"}', then contents are

my log1
my log2
...

You can change key name by "message_key" option.

auto_create_bucket

Create S3 bucket if it does not exists. Default is true.

check_bucket

Check mentioned bucket if it exists in AWS or not. Default is true.

When it is false, fluentd will not check aws s3 for the existence of the mentioned bucket. This is the case where bucket will be pre-created before running fluentd.

check_object

Check object before creation if it exists or not. Default is true.

When it is false, s3_object_key_format will be %{path}%{time_slice}_%{hms_slice}.%{file_extension} by default where, hms_slice will be time-slice in hhmmss format, so that each object will be unique. Example object name, assuming it is created on 2016/16/11 3:30:54 PM 20161611_153054.txt (extension can be anything as per user's choice)

check_apikey_on_start

Check AWS key on start. Default is true.

proxy_uri

uri of proxy environment.

path

path prefix of the files on S3. Default is "" (no prefix). buffer placeholder is supported, so you can embed tag, time and record value like below.

path logs/%Y%m%d/${tag}/
<buffer tag,time>
  # buffer parameters...
</buffer>

buffer_path (for v0.12)

path prefix of the files to buffer logs.

This parameter is for v0.12. Use <buffer>'s path in v1.

time_slice_format(for v0.12)

Format of the time used as the file name. Default is '%Y%m%d'. Use '%Y%m%d%H' to split files hourly.

This parameter is for v0.12. Use buffer placeholder for path / s3_object_key_format in v1.

time_slice_wait(for v0.12)

The time to wait old logs. Default is 10 minutes. Specify larger value if old logs may reach.

This parameter is for v0.12. Use <buffer>'s timekey_wait in v1.

utc

Use UTC instead of local time.

storage_class

Set storage class. Possible values are STANDARD, REDUCED_REDUNDANCY, STANDARD_IA from Ruby SDK.

reduced_redundancy

Use S3 reduced redundancy storage for 33% cheaper pricing. Default is false.

This is deprecated. Use storage_class REDUCED_REDUNDANCY instead.

acl

Permission for the object in S3. This is useful for cross-account access using IAM roles. Valid values are:

  • private (default)
  • public-read
  • public-read-write (not recommended - see Canned ACL)
  • authenticated-read
  • bucket-owner-read
  • bucket-owner-full-control

To use cross-account access, you will need to create a bucket policy granting the specific access required. Refer to the AWS documentation for examples.

grant_full_control

Allows grantee READ, READ_ACP, and WRITE_ACP permissions on the object. This is useful for cross-account access using IAM roles.

Valid values are id="Grantee-CanonicalUserID". Please specify the grantee's canonical user ID.

e.g. id="79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"

Note that a canonical user ID is different from an AWS account ID. Please refer to AWS documentation for more details.

grant_read

Allows grantee to read the object data and its metadata. Valid values are id="Grantee-CanonicalUserID". Please specify the grantee's canonical user ID.

e.g. id="79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"

grant_read_acp

Allows grantee to read the object ACL. Valid values are id="Grantee-CanonicalUserID". Please specify the grantee's canonical user ID.

e.g. id="79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"

grant_write_acp

Allows grantee to write the ACL for the applicable object. Valid values are id="Grantee-CanonicalUserID". Please specify the grantee's canonical user ID.

e.g. id="79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"

hex_random_length

The length of %{hex_random} placeholder. Default is 4 as written in Request Rate and Performance Considerations - Amazon Simple Storage Service. The maximum length is 16.

index_format

%{index} is formatted by sprintf using this format_string. Default is '%d'. Zero padding is supported e.g. %04d to ensure minimum length four digits. %{index} can be in lowercase or uppercase hex using '%x' or '%X'

overwrite

Overwrite already existing path. Default is false, which raises an error if a s3 object of the same path already exists, or increment the %{index} placeholder until finding an absent path.

use_server_side_encryption

The Server-side encryption algorithm used when storing this object in S3 (e.g., AES256, aws:kms)

ssekms_key_id

Specifies the AWS KMS key ID to use for object encryption. You have to set "aws:kms" to use_server_side_encryption to use the KMS encryption.

sse_customer_algorithm

Specifies the algorithm to use to when encrypting the object (e.g., AES256).

sse_customer_key

Specifies the AWS KMS key ID to use for object encryption.

sse_customer_key_md5

Specifies the 128-bit MD5 digest of the encryption key according to RFC 1321.

compute_checksums

AWS SDK uses MD5 for API request/response by default. On FIPS enabled environment, OpenSSL returns an error because MD5 is disabled. If you want to use this plugin on FIPS enabled environment, set compute_checksums false.

signature_version

Signature version for API request. s3 means signature version 2 and v4 means signature version 4. Default is nil (Following SDK's default). It would be useful when you use S3 compatible storage that accepts only signature version 2.

warn_for_delay

Given a threshold to treat events as delay, output warning logs if delayed events were put into s3.

bucket_lifecycle_rule

Specify one or more lifecycle rules for the bucket

<bucket_lifecycle_rule>
  id UNIQUE_ID_FOR_THE_RULE
  prefix OPTIONAL_PREFIX # Objects whose keys begin with this prefix will be affected by the rule. If not specified all objects of the bucket will be affected
  expiration_days NUMBER_OF_DAYS # The number of days before the object will expire
</bucket_lifecycle_rule>

Input: Setup

  1. Create new SQS queue (use same region as S3)
  2. Set proper permission to new queue
  3. Configure S3 event notification
  4. Write configuration file such as fluent.conf
  5. Run fluentd

Configuration: Input

<source>
  @type s3

  aws_key_id YOUR_AWS_KEY_ID
  aws_sec_key YOUR_AWS_SECRET_KEY
  s3_bucket YOUR_S3_BUCKET_NAME
  s3_region ap-northeast-1
  add_object_metadata true

  <sqs>
    queue_name YOUR_SQS_QUEUE_NAME
  </sqs>
</source>

add_object_metadata

Whether or not object metadata should be added to the record. Defaults to false. See below for details.

s3_bucket (required)

S3 bucket name.

s3_region

S3 region name. For example, US West (Oregon) Region is "us-west-2". The full list of regions are available here. > http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region. We recommend using s3_region instead of s3_endpoint.

store_as

archive format on S3. You can use serveral format:

  • gzip (default)
  • json
  • text
  • lzo (Need lzop command)
  • lzma2 (Need xz command)
  • gzip_command (Need gzip command)
    • This compressor uses an external gzip command, hence would result in utilizing CPU cores well compared with gzip

See 'Use your compression algorithm' section for adding another format.

format

Parse a line as this format in the S3 object. Supported formats are "apache_error", "apache2", "syslog", "json", "tsv", "ltsv", "csv", "nginx" and "none".

check_apikey_on_start

Check AWS key on start. Default is true.

proxy_uri

URI of proxy environment.

sqs/queue_name (required)

SQS queue name. Need to create SQS queue on the region same as S3 bucket.

sqs/skip_delete

When true, messages are not deleted after polling block. Default is false.

sqs/wait_time_seconds

The long polling interval. Default is 20.

sqs/retry_error_interval

Interval to retry polling SQS if polling unsuccessful, in seconds. Default is 300.

Object Metadata Added To Records

If the add_object_metadata option is set to true, then the name of the bucket and the key for a given object will be added to each log record as s3_bucket and s3_key, respectively. This metadata can be used by filter plugins or other downstream processors to better identify the source of a given record.

IAM Policy

The following is an example for a IAM policy needed to write to an s3 bucket (matches my-s3bucket/logs, my-s3bucket-test, etc.).

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket"
      ],
      "Resource": "arn:aws:s3:::my-s3bucket"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject"
      ],
      "Resource": "arn:aws:s3:::my-s3bucket/*"
    }
  ]
}

Note that the bucket must already exist and auto_create_bucket has no effect in this case.

s3:GetObject is needed for object check to avoid object overwritten. If you set check_object false, s3:GetObject is not needed.

Refer to the AWS documentation for example policies.

Using IAM roles with a properly configured IAM policy are preferred over embedding access keys on EC2 instances.

Example when check_bucket false and check_object false

When the mentioned configuration will be made, fluentd will work with the minimum IAM poilcy, like:

"Statement": [{
  "Effect": "Allow",
  "Action": "s3:PutObject",
  "Resource": ["*"]
}]

Use your (de)compression algorithm

s3 plugin has pluggable compression mechanizm like Fluentd's input / output plugin. If you set 'store_as xxx', out_s3 plugin searches fluent/plugin/s3_compressor_xxx.rb and in_s3 plugin searches fluent/plugin/s3_extractor_xxx.rb. You can define your (de)compression with 'S3Output::Compressor'/S3Input::Extractor classes. Compressor API is here:

module Fluent # Since fluent-plugin-s3 v1.0.0 or later, use Fluent::Plugin instead of Fluent
  class S3Output
    class XXXCompressor < Compressor
      S3Output.register_compressor('xxx', self)

      # Used to file extension
      def ext
        'xxx'
      end

      # Used to file content type
      def content_type
        'application/x-xxx'
      end

      # chunk is buffer chunk. tmp is destination file for upload
      def compress(chunk, tmp)
        # call command or something
      end
    end
  end
end

Extractor is similar to Compressor See bundled Compressor/Extractor classes for more detail.

Website, license, et. al.

Web site http://fluentd.org/
Documents http://docs.fluentd.org/
Source repository http://github.com/fluent/fluent-plugin-s3
Discussion http://groups.google.com/group/fluentd
Author Sadayuki Furuhashi
Copyright (c) 2011 FURUHASHI Sadayuki
License Apache License, Version 2.0

About

Amazon S3 input and output plugin for Fluentd

Resources

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Ruby 100.0%