Skip to content

Commit

Permalink
Address PR review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Dec 8, 2023
1 parent b897637 commit 800c8cd
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 38 deletions.
38 changes: 12 additions & 26 deletions broker/fragment/store_azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ type udcAndExp struct {

type azureBackend struct {
// This is a cache of configured Pipelines for each tenant. These do not expire
clients map[string]pipeline.Pipeline
pipelines map[string]pipeline.Pipeline
// This is a cache of Azure storage clients for each tenant. These do not expire
svcClients map[string]service.Client
mu sync.Mutex
clients map[string]*service.Client
mu sync.Mutex
// This is a cache of URL-signing credentials for each tenant. These DO expire
udcs map[string]udcAndExp
}
Expand Down Expand Up @@ -267,25 +267,19 @@ func (a *azureBackend) getAzureServiceClient(endpoint *url.URL) (client *service
return nil, err
}

a.mu.Lock()
if a.svcClients == nil {
a.svcClients = make(map[string]service.Client)
}
a.mu.Unlock()

if endpoint.Scheme == "azure" {
var accountName = os.Getenv("AZURE_ACCOUNT_NAME")
var accountKey = os.Getenv("AZURE_ACCOUNT_KEY")

a.mu.Lock()
client, ok := a.svcClients[accountName]
client, ok := a.clients[accountName]
a.mu.Unlock()

if ok {
log.WithFields(log.Fields{
"storageAccountName": accountName,
}).Info("Re-using cached azure:// service client")
return &client, nil
return client, nil
}

sharedKeyCred, err := service.NewSharedKeyCredential(accountName, accountKey)
Expand All @@ -298,7 +292,7 @@ func (a *azureBackend) getAzureServiceClient(endpoint *url.URL) (client *service
}

a.mu.Lock()
a.svcClients[accountName] = *serviceClient
a.clients[accountName] = serviceClient
a.mu.Lock()
return serviceClient, nil
} else if endpoint.Scheme == "azure-ad" {
Expand All @@ -308,14 +302,14 @@ func (a *azureBackend) getAzureServiceClient(endpoint *url.URL) (client *service
var clientSecret = os.Getenv("AZURE_CLIENT_SECRET")

a.mu.Lock()
client, ok := a.svcClients[cfg.accountTenantID]
client, ok := a.clients[cfg.accountTenantID]
a.mu.Unlock()

if ok {
log.WithFields(log.Fields{
"accountTenantId": cfg.accountTenantID,
}).Info("Re-using cached azure-ad:// service client")
return &client, nil
return client, nil
}

identityCreds, err := azidentity.NewClientSecretCredential(
Expand All @@ -337,7 +331,7 @@ func (a *azureBackend) getAzureServiceClient(endpoint *url.URL) (client *service
}

a.mu.Lock()
a.svcClients[cfg.accountTenantID] = *serviceClient
a.clients[cfg.accountTenantID] = serviceClient
a.mu.Unlock()

return serviceClient, nil
Expand All @@ -351,7 +345,7 @@ func (a *azureBackend) getAzurePipeline(ep *url.URL) (cfg azureStoreConfig, clie
}

a.mu.Lock()
client = a.clients[cfg.accountTenantID]
client = a.pipelines[cfg.accountTenantID]
a.mu.Unlock()

if client != nil {
Expand Down Expand Up @@ -395,11 +389,9 @@ func (a *azureBackend) getAzurePipeline(ep *url.URL) (cfg azureStoreConfig, clie
}

client = azblob.NewPipeline(credentials, azblob.PipelineOptions{})

a.mu.Lock()
if a.clients == nil {
a.clients = make(map[string]pipeline.Pipeline)
}
a.clients[cfg.accountTenantID] = client
a.pipelines[cfg.accountTenantID] = client
a.mu.Unlock()

log.WithFields(log.Fields{
Expand All @@ -423,12 +415,6 @@ func (a *azureBackend) buildBlobURL(cfg azureStoreConfig, client pipeline.Pipeli

// Cache UserDelegationCredentials and refresh them when needed
func (a *azureBackend) getUserDelegationCredential(endpoint *url.URL) (*service.UserDelegationCredential, error) {
a.mu.Lock()
if a.udcs == nil {
a.udcs = make(map[string]udcAndExp)
}
a.mu.Unlock()

var cfg, err = parseAzureEndpoint(endpoint)
if err != nil {
return nil, err
Expand Down
29 changes: 17 additions & 12 deletions broker/fragment/stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"text/template"
"time"

"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
"github.com/gorilla/schema"
"github.com/pkg/errors"
pb "go.gazette.dev/core/broker/protocol"
Expand All @@ -35,10 +37,14 @@ var sharedStores = struct {
azure *azureBackend
fs *fsBackend
}{
s3: newS3Backend(),
gcs: &gcsBackend{},
azure: &azureBackend{},
fs: &fsBackend{},
s3: newS3Backend(),
gcs: &gcsBackend{},
azure: &azureBackend{
pipelines: make(map[string]pipeline.Pipeline),
clients: make(map[string]*service.Client),
udcs: make(map[string]udcAndExp),
},
fs: &fsBackend{},
}

func getBackend(scheme string) backend {
Expand Down Expand Up @@ -194,14 +200,13 @@ func evalPathPostfix(spool Spool, spec *pb.JournalSpec) (string, error) {
// in the implementation of new journal naming taxonomies which don't disrupt
// journal fragments that are already written.
//
// var cfg = RewriterConfig{
// Replace: "/old-path/page-views/
// Find: "/bar/v1/page-views/",
// }
// // Remaps journal name => fragment store URL:
// // "/foo/bar/v1/page-views/part-000" => "s3://my-bucket/foo/old-path/page-views/part-000" // Matched.
// // "/foo/bar/v2/page-views/part-000" => "s3://my-bucket/foo/bar/v2/page-views/part-000" // Not matched.
//
// var cfg = RewriterConfig{
// Replace: "/old-path/page-views/
// Find: "/bar/v1/page-views/",
// }
// // Remaps journal name => fragment store URL:
// // "/foo/bar/v1/page-views/part-000" => "s3://my-bucket/foo/old-path/page-views/part-000" // Matched.
// // "/foo/bar/v2/page-views/part-000" => "s3://my-bucket/foo/bar/v2/page-views/part-000" // Not matched.
type RewriterConfig struct {
// Find is the string to replace in the unmodified journal name.
Find string
Expand Down

0 comments on commit 800c8cd

Please sign in to comment.