Skip to content

Commit

Permalink
Add support for Access Point ARN
Browse files Browse the repository at this point in the history
  • Loading branch information
chemamartinez committed Oct 31, 2024
1 parent b1c7478 commit d3b8e50
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 9 deletions.
17 changes: 17 additions & 0 deletions x-pack/filebeat/input/awss3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package awss3
import (
"errors"
"fmt"
"strings"
"time"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
Expand Down Expand Up @@ -139,6 +140,9 @@ type backupConfig struct {

func (c *backupConfig) GetBucketName() string {
if c.BackupToBucketArn != "" {
if isAccessPointARN(c.BackupToBucketArn) {
return c.BackupToBucketArn
}
return getBucketNameFromARN(c.BackupToBucketArn)
}
return c.NonAWSBackupToBucketName
Expand Down Expand Up @@ -226,6 +230,10 @@ func (c config) getBucketName() string {
return c.NonAWSBucketName
}
if c.BucketARN != "" {
// Check if it's an Access Point ARN
if isAccessPointARN(c.BucketARN) {
return c.BucketARN // Return full ARN for Access Points
}
return getBucketNameFromARN(c.BucketARN)
}
return ""
Expand Down Expand Up @@ -277,3 +285,12 @@ func (c config) getFileSelectors() []fileSelectorConfig {
}
return []fileSelectorConfig{{ReaderConfig: c.ReaderConfig}}
}

// Helper function to detect if an ARN is an Access Point
func isAccessPointARN(arn string) bool {
arnParts := strings.Split(arn, ":")
if len(arnParts) < 6 {
return false
}
return strings.Contains(arn, ":accesspoint/")
}
23 changes: 22 additions & 1 deletion x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package awss3

import (
"fmt"
"strings"

awssdk "github.com/aws/aws-sdk-go-v2/aws"

Expand Down Expand Up @@ -48,7 +49,27 @@ func (im *s3InputManager) Create(cfg *conf.C) (v2.Input, error) {
return nil, fmt.Errorf("initializing AWS config: %w", err)
}

if config.AWSConfig.Endpoint != "" {
if config.BucketARN != "" && isAccessPointARN(config.BucketARN) {
// When using the access point ARN, requests must be directed to the
// access point hostname. The access point hostname takes the form
// AccessPointName-AccountId.s3-accesspoint.Region.amazonaws.com
arnParts := strings.Split(config.BucketARN, ":")
accountID := arnParts[4]
region := arnParts[3]
accessPointName := strings.Split(arnParts[5], "/")[1]

// Construct the endpoint for the Access Point
endpoint := fmt.Sprintf("%s-%s.s3-accesspoint.%s.amazonaws.com", accessPointName, accountID, region)

// Set up a custom endpoint resolver for Access Points
awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {

Check failure on line 65 in x-pack/filebeat/input/awss3/input.go

View workflow job for this annotation

GitHub Actions / lint (linux)

SA1019: awsConfig.EndpointResolverWithOptions is deprecated: with the release of endpoint resolution v2 in API clients, EndpointResolver and EndpointResolverWithOptions are deprecated. Providing a value for this field will likely prevent you from using newer endpoint-related service features. See API client options EndpointResolverV2 and BaseEndpoint. (staticcheck)
return awssdk.Endpoint{

Check failure on line 66 in x-pack/filebeat/input/awss3/input.go

View workflow job for this annotation

GitHub Actions / lint (linux)

SA1019: awssdk.Endpoint is deprecated: This structure was used with the global [EndpointResolver] interface, which has been deprecated in favor of service-specific endpoint resolution. See the deprecation docs on that interface for more information. (staticcheck)
URL: fmt.Sprintf("https://%s", endpoint),
SigningRegion: region,
HostnameImmutable: true,
}, nil
})
} else if config.AWSConfig.Endpoint != "" {
// Add a custom endpointResolver to the awsConfig so that all the requests are routed to this endpoint
awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {

Check failure on line 74 in x-pack/filebeat/input/awss3/input.go

View workflow job for this annotation

GitHub Actions / lint (linux)

SA1019: awsConfig.EndpointResolverWithOptions is deprecated: with the release of endpoint resolution v2 in API clients, EndpointResolver and EndpointResolverWithOptions are deprecated. Providing a value for this field will likely prevent you from using newer endpoint-related service features. See API client options EndpointResolverV2 and BaseEndpoint. (staticcheck)
return awssdk.Endpoint{

Check failure on line 75 in x-pack/filebeat/input/awss3/input.go

View workflow job for this annotation

GitHub Actions / lint (linux)

SA1019: awssdk.Endpoint is deprecated: This structure was used with the global [EndpointResolver] interface, which has been deprecated in favor of service-specific endpoint resolution. See the deprecation docs on that interface for more information. (staticcheck)
Expand Down
35 changes: 27 additions & 8 deletions x-pack/filebeat/input/awss3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@ import (

func createS3API(ctx context.Context, config config, awsConfig awssdk.Config) (*awsS3API, error) {
s3Client := s3.NewFromConfig(awsConfig, config.s3ConfigModifier)
regionName, err := getRegionForBucket(ctx, s3Client, config.getBucketName())
if err != nil {
return nil, fmt.Errorf("failed to get AWS region for bucket: %w", err)
}
// Can this really happen?
if regionName != awsConfig.Region {
awsConfig.Region = regionName
s3Client = s3.NewFromConfig(awsConfig, config.s3ConfigModifier)

// Only attempt to get the region for Bucket ARNs, not Access Point ARNs
if !isAccessPointARN(config.getBucketARN()) {
regionName, err := getRegionForBucket(ctx, s3Client, config.getBucketName())
if err != nil {
return nil, fmt.Errorf("failed to get AWS region for bucket: %w", err)
}
// Can this really happen?
if regionName != awsConfig.Region {
awsConfig.Region = regionName
s3Client = s3.NewFromConfig(awsConfig, config.s3ConfigModifier)
}
}

return newAWSs3API(s3Client), nil
Expand All @@ -43,6 +47,12 @@ func createPipelineClient(pipeline beat.Pipeline, acks *awsACKHandler) (beat.Cli
}

func getRegionForBucket(ctx context.Context, s3Client *s3.Client, bucketName string) (string, error) {
// Skip region fetching if it's an Access Point ARN
if isAccessPointARN(bucketName) {
// Extract the region from the ARN (e.g., arn:aws:s3:us-west-2:123456789012:accesspoint/my-access-point)
return getRegionFromARN(bucketName), nil
}

getBucketLocationOutput, err := s3Client.GetBucketLocation(ctx, &s3.GetBucketLocationInput{
Bucket: awssdk.String(bucketName),
})
Expand All @@ -59,6 +69,15 @@ func getRegionForBucket(ctx context.Context, s3Client *s3.Client, bucketName str
return string(getBucketLocationOutput.LocationConstraint), nil
}

// Helper function to extract region from ARN
func getRegionFromARN(arn string) string {
arnParts := strings.Split(arn, ":")
if len(arnParts) > 3 {
return arnParts[3] // The fourth part of ARN is region
}
return ""
}

func getBucketNameFromARN(bucketARN string) string {
bucketMetadata := strings.Split(bucketARN, ":")
bucketName := bucketMetadata[len(bucketMetadata)-1]
Expand Down

0 comments on commit d3b8e50

Please sign in to comment.