From 443c8ee65d5d8c65168a446d9c012e96f45fb79a Mon Sep 17 00:00:00 2001 From: Idan Novogroder Date: Tue, 5 Nov 2024 17:16:19 +0200 Subject: [PATCH 1/4] Fix OOM lakectl FS upload bug --- cmd/lakectl/cmd/retry_client.go | 4 ++-- cmd/lakectl/cmd/root.go | 7 ++++--- pkg/api/helpers/upload.go | 12 ++++++------ pkg/local/sync.go | 5 +++-- 4 files changed, 15 insertions(+), 13 deletions(-) diff --git a/cmd/lakectl/cmd/retry_client.go b/cmd/lakectl/cmd/retry_client.go index 05da56c9ec3..a1afb3e3592 100644 --- a/cmd/lakectl/cmd/retry_client.go +++ b/cmd/lakectl/cmd/retry_client.go @@ -18,7 +18,7 @@ var ( notTrustedErrorRe = regexp.MustCompile(`certificate is not trusted`) ) -func NewRetryClient(retriesCfg RetriesCfg, transport *http.Transport) *http.Client { +func NewRetryClient(retriesCfg RetriesCfg, transport *http.Transport) *retryablehttp.Client { retryClient := retryablehttp.NewClient() if transport != nil { retryClient.HTTPClient.Transport = transport @@ -28,7 +28,7 @@ func NewRetryClient(retriesCfg RetriesCfg, transport *http.Transport) *http.Clie retryClient.RetryWaitMin = retriesCfg.MinWaitInterval retryClient.RetryWaitMax = retriesCfg.MaxWaitInterval retryClient.CheckRetry = lakectlRetryPolicy - return retryClient.StandardClient() + return retryClient } // lakectl retry policy - we retry in the following cases: diff --git a/cmd/lakectl/cmd/root.go b/cmd/lakectl/cmd/root.go index 26b4253fc40..d6324b49efc 100644 --- a/cmd/lakectl/cmd/root.go +++ b/cmd/lakectl/cmd/root.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/hashicorp/go-retryablehttp" "net/http" "os" "path/filepath" @@ -467,7 +468,7 @@ func sendStats(cmd *cobra.Command, cmdSuffix string) { } } -func getHTTPClient() *http.Client { +func getHTTPClient() *retryablehttp.Client { // Override MaxIdleConnsPerHost to allow highly concurrent access to our API client. // This is done to avoid accumulating many sockets in `TIME_WAIT` status that were closed // only to be immediately reopened. @@ -475,7 +476,7 @@ func getHTTPClient() *http.Client { transport := http.DefaultTransport.(*http.Transport).Clone() transport.MaxIdleConnsPerHost = DefaultMaxIdleConnsPerHost if !cfg.Server.Retries.Enabled { - return &http.Client{Transport: transport} + return NewRetryClient(RetriesCfg{MaxAttempts: 1}, transport) } return NewRetryClient(cfg.Server.Retries, transport) } @@ -498,7 +499,7 @@ func getClient() *apigen.ClientWithResponses { oss := osinfo.GetOSInfo() client, err := apigen.NewClientWithResponses( serverEndpoint, - apigen.WithHTTPClient(httpClient), + apigen.WithHTTPClient(httpClient.StandardClient()), apigen.WithRequestEditorFn(basicAuthProvider.Intercept), apigen.WithRequestEditorFn(func(ctx context.Context, req *http.Request) error { // This UA string structure is agreed upon diff --git a/pkg/api/helpers/upload.go b/pkg/api/helpers/upload.go index 27e09027a6f..90c24ecd94b 100644 --- a/pkg/api/helpers/upload.go +++ b/pkg/api/helpers/upload.go @@ -7,6 +7,7 @@ import ( "encoding/hex" "errors" "fmt" + "github.com/hashicorp/go-retryablehttp" "io" "mime" "mime/multipart" @@ -104,7 +105,7 @@ func ClientUpload(ctx context.Context, client apigen.ClientWithResponsesInterfac // It supports both multipart and single part uploads. type PreSignUploader struct { Concurrency int - HTTPClient *http.Client + HTTPClient *retryablehttp.Client Client apigen.ClientWithResponsesInterface MultipartSupport bool } @@ -124,7 +125,7 @@ type presignUpload struct { numParts int } -func NewPreSignUploader(client apigen.ClientWithResponsesInterface, httpClient *http.Client, multipartSupport bool) *PreSignUploader { +func NewPreSignUploader(client apigen.ClientWithResponsesInterface, httpClient *retryablehttp.Client, multipartSupport bool) *PreSignUploader { return &PreSignUploader{ Concurrency: DefaultUploadConcurrency, HTTPClient: httpClient, @@ -300,7 +301,7 @@ func (u *presignUpload) initMultipart(ctx context.Context) (*apigen.PresignMulti } func (u *presignUpload) uploadPart(ctx context.Context, partReader *io.SectionReader, partURL string) (string, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodPut, partURL, partReader) + req, err := retryablehttp.NewRequestWithContext(ctx, http.MethodPut, partURL, partReader) if err != nil { return "", err } @@ -340,8 +341,7 @@ func (u *presignUpload) uploadObject(ctx context.Context) (*apigen.ObjectStats, // Passing Reader with content length == 0 results in 501 Not Implemented body = u.reader } - - req, err := http.NewRequestWithContext(ctx, http.MethodPut, preSignURL, body) + req, err := retryablehttp.NewRequestWithContext(ctx, http.MethodPut, preSignURL, body) if err != nil { return nil, err } @@ -403,7 +403,7 @@ func (u *presignUpload) Upload(ctx context.Context) (*apigen.ObjectStats, error) return u.uploadObject(ctx) } -func ClientUploadPreSign(ctx context.Context, client apigen.ClientWithResponsesInterface, httpClient *http.Client, repoID, branchID, objPath string, metadata map[string]string, contentType string, contents io.ReadSeeker, presignMultipartSupport bool) (*apigen.ObjectStats, error) { +func ClientUploadPreSign(ctx context.Context, client apigen.ClientWithResponsesInterface, httpClient *retryablehttp.Client, repoID, branchID, objPath string, metadata map[string]string, contentType string, contents io.ReadSeeker, presignMultipartSupport bool) (*apigen.ObjectStats, error) { // upload loop, retry on conflict uploader := NewPreSignUploader(client, httpClient, presignMultipartSupport) for { diff --git a/pkg/local/sync.go b/pkg/local/sync.go index fa82508334d..2bbde193ffb 100644 --- a/pkg/local/sync.go +++ b/pkg/local/sync.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/hashicorp/go-retryablehttp" "io" "net/http" "os" @@ -47,13 +48,13 @@ type Tasks struct { type SyncManager struct { ctx context.Context client *apigen.ClientWithResponses - httpClient *http.Client + httpClient *retryablehttp.Client progressBar *ProgressPool tasks Tasks cfg Config } -func NewSyncManager(ctx context.Context, client *apigen.ClientWithResponses, httpClient *http.Client, cfg Config) *SyncManager { +func NewSyncManager(ctx context.Context, client *apigen.ClientWithResponses, httpClient *retryablehttp.Client, cfg Config) *SyncManager { sm := &SyncManager{ ctx: ctx, client: client, From 523932ed663d07e73cc594d889d521ee92194c2d Mon Sep 17 00:00:00 2001 From: Idan Novogroder Date: Tue, 5 Nov 2024 17:21:59 +0200 Subject: [PATCH 2/4] Go imported --- cmd/lakectl/cmd/root.go | 2 +- pkg/api/helpers/upload.go | 2 +- pkg/local/sync.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/lakectl/cmd/root.go b/cmd/lakectl/cmd/root.go index d6324b49efc..584c9cda21f 100644 --- a/cmd/lakectl/cmd/root.go +++ b/cmd/lakectl/cmd/root.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "github.com/hashicorp/go-retryablehttp" "net/http" "os" "path/filepath" @@ -14,6 +13,7 @@ import ( "github.com/deepmap/oapi-codegen/pkg/securityprovider" "github.com/go-openapi/swag" + "github.com/hashicorp/go-retryablehttp" "github.com/mitchellh/go-homedir" "github.com/mitchellh/mapstructure" "github.com/spf13/cobra" diff --git a/pkg/api/helpers/upload.go b/pkg/api/helpers/upload.go index 90c24ecd94b..507c0346f4c 100644 --- a/pkg/api/helpers/upload.go +++ b/pkg/api/helpers/upload.go @@ -7,7 +7,6 @@ import ( "encoding/hex" "errors" "fmt" - "github.com/hashicorp/go-retryablehttp" "io" "mime" "mime/multipart" @@ -18,6 +17,7 @@ import ( "strings" "github.com/go-openapi/swag" + "github.com/hashicorp/go-retryablehttp" "github.com/treeverse/lakefs/pkg/api/apigen" "github.com/treeverse/lakefs/pkg/api/apiutil" "github.com/treeverse/lakefs/pkg/block/azure" diff --git a/pkg/local/sync.go b/pkg/local/sync.go index 2bbde193ffb..f4e3b8be6f8 100644 --- a/pkg/local/sync.go +++ b/pkg/local/sync.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/hashicorp/go-retryablehttp" "io" "net/http" "os" @@ -19,6 +18,7 @@ import ( "time" "github.com/go-openapi/swag" + "github.com/hashicorp/go-retryablehttp" "github.com/jedib0t/go-pretty/v6/progress" "github.com/treeverse/lakefs/pkg/api/apigen" "github.com/treeverse/lakefs/pkg/api/helpers" From 3af12978222ec6a30eb30082166e3d5faffc0103 Mon Sep 17 00:00:00 2001 From: Idan Novogroder Date: Wed, 6 Nov 2024 10:54:39 +0200 Subject: [PATCH 3/4] Changed the fix --- cmd/lakectl/cmd/retry_client.go | 4 ++-- cmd/lakectl/cmd/root.go | 7 +++---- pkg/api/helpers/upload.go | 12 ++++++------ pkg/local/progress.go | 5 +++++ pkg/local/sync.go | 5 ++--- 5 files changed, 18 insertions(+), 15 deletions(-) diff --git a/cmd/lakectl/cmd/retry_client.go b/cmd/lakectl/cmd/retry_client.go index a1afb3e3592..05da56c9ec3 100644 --- a/cmd/lakectl/cmd/retry_client.go +++ b/cmd/lakectl/cmd/retry_client.go @@ -18,7 +18,7 @@ var ( notTrustedErrorRe = regexp.MustCompile(`certificate is not trusted`) ) -func NewRetryClient(retriesCfg RetriesCfg, transport *http.Transport) *retryablehttp.Client { +func NewRetryClient(retriesCfg RetriesCfg, transport *http.Transport) *http.Client { retryClient := retryablehttp.NewClient() if transport != nil { retryClient.HTTPClient.Transport = transport @@ -28,7 +28,7 @@ func NewRetryClient(retriesCfg RetriesCfg, transport *http.Transport) *retryable retryClient.RetryWaitMin = retriesCfg.MinWaitInterval retryClient.RetryWaitMax = retriesCfg.MaxWaitInterval retryClient.CheckRetry = lakectlRetryPolicy - return retryClient + return retryClient.StandardClient() } // lakectl retry policy - we retry in the following cases: diff --git a/cmd/lakectl/cmd/root.go b/cmd/lakectl/cmd/root.go index 584c9cda21f..26b4253fc40 100644 --- a/cmd/lakectl/cmd/root.go +++ b/cmd/lakectl/cmd/root.go @@ -13,7 +13,6 @@ import ( "github.com/deepmap/oapi-codegen/pkg/securityprovider" "github.com/go-openapi/swag" - "github.com/hashicorp/go-retryablehttp" "github.com/mitchellh/go-homedir" "github.com/mitchellh/mapstructure" "github.com/spf13/cobra" @@ -468,7 +467,7 @@ func sendStats(cmd *cobra.Command, cmdSuffix string) { } } -func getHTTPClient() *retryablehttp.Client { +func getHTTPClient() *http.Client { // Override MaxIdleConnsPerHost to allow highly concurrent access to our API client. // This is done to avoid accumulating many sockets in `TIME_WAIT` status that were closed // only to be immediately reopened. @@ -476,7 +475,7 @@ func getHTTPClient() *retryablehttp.Client { transport := http.DefaultTransport.(*http.Transport).Clone() transport.MaxIdleConnsPerHost = DefaultMaxIdleConnsPerHost if !cfg.Server.Retries.Enabled { - return NewRetryClient(RetriesCfg{MaxAttempts: 1}, transport) + return &http.Client{Transport: transport} } return NewRetryClient(cfg.Server.Retries, transport) } @@ -499,7 +498,7 @@ func getClient() *apigen.ClientWithResponses { oss := osinfo.GetOSInfo() client, err := apigen.NewClientWithResponses( serverEndpoint, - apigen.WithHTTPClient(httpClient.StandardClient()), + apigen.WithHTTPClient(httpClient), apigen.WithRequestEditorFn(basicAuthProvider.Intercept), apigen.WithRequestEditorFn(func(ctx context.Context, req *http.Request) error { // This UA string structure is agreed upon diff --git a/pkg/api/helpers/upload.go b/pkg/api/helpers/upload.go index 507c0346f4c..27e09027a6f 100644 --- a/pkg/api/helpers/upload.go +++ b/pkg/api/helpers/upload.go @@ -17,7 +17,6 @@ import ( "strings" "github.com/go-openapi/swag" - "github.com/hashicorp/go-retryablehttp" "github.com/treeverse/lakefs/pkg/api/apigen" "github.com/treeverse/lakefs/pkg/api/apiutil" "github.com/treeverse/lakefs/pkg/block/azure" @@ -105,7 +104,7 @@ func ClientUpload(ctx context.Context, client apigen.ClientWithResponsesInterfac // It supports both multipart and single part uploads. type PreSignUploader struct { Concurrency int - HTTPClient *retryablehttp.Client + HTTPClient *http.Client Client apigen.ClientWithResponsesInterface MultipartSupport bool } @@ -125,7 +124,7 @@ type presignUpload struct { numParts int } -func NewPreSignUploader(client apigen.ClientWithResponsesInterface, httpClient *retryablehttp.Client, multipartSupport bool) *PreSignUploader { +func NewPreSignUploader(client apigen.ClientWithResponsesInterface, httpClient *http.Client, multipartSupport bool) *PreSignUploader { return &PreSignUploader{ Concurrency: DefaultUploadConcurrency, HTTPClient: httpClient, @@ -301,7 +300,7 @@ func (u *presignUpload) initMultipart(ctx context.Context) (*apigen.PresignMulti } func (u *presignUpload) uploadPart(ctx context.Context, partReader *io.SectionReader, partURL string) (string, error) { - req, err := retryablehttp.NewRequestWithContext(ctx, http.MethodPut, partURL, partReader) + req, err := http.NewRequestWithContext(ctx, http.MethodPut, partURL, partReader) if err != nil { return "", err } @@ -341,7 +340,8 @@ func (u *presignUpload) uploadObject(ctx context.Context) (*apigen.ObjectStats, // Passing Reader with content length == 0 results in 501 Not Implemented body = u.reader } - req, err := retryablehttp.NewRequestWithContext(ctx, http.MethodPut, preSignURL, body) + + req, err := http.NewRequestWithContext(ctx, http.MethodPut, preSignURL, body) if err != nil { return nil, err } @@ -403,7 +403,7 @@ func (u *presignUpload) Upload(ctx context.Context) (*apigen.ObjectStats, error) return u.uploadObject(ctx) } -func ClientUploadPreSign(ctx context.Context, client apigen.ClientWithResponsesInterface, httpClient *retryablehttp.Client, repoID, branchID, objPath string, metadata map[string]string, contentType string, contents io.ReadSeeker, presignMultipartSupport bool) (*apigen.ObjectStats, error) { +func ClientUploadPreSign(ctx context.Context, client apigen.ClientWithResponsesInterface, httpClient *http.Client, repoID, branchID, objPath string, metadata map[string]string, contentType string, contents io.ReadSeeker, presignMultipartSupport bool) (*apigen.ObjectStats, error) { // upload loop, retry on conflict uploader := NewPreSignUploader(client, httpClient, presignMultipartSupport) for { diff --git a/pkg/local/progress.go b/pkg/local/progress.go index 239a78027fc..52cc7c14f39 100644 --- a/pkg/local/progress.go +++ b/pkg/local/progress.go @@ -135,6 +135,7 @@ func NewProgressPool() *ProgressPool { type fileWrapper struct { file io.Seeker reader io.Reader + closer io.Closer } func (f fileWrapper) Read(p []byte) (n int, err error) { @@ -144,3 +145,7 @@ func (f fileWrapper) Read(p []byte) (n int, err error) { func (f fileWrapper) Seek(offset int64, whence int) (int64, error) { return f.file.Seek(offset, whence) } + +func (f fileWrapper) Close() error { + return nil +} diff --git a/pkg/local/sync.go b/pkg/local/sync.go index f4e3b8be6f8..fa82508334d 100644 --- a/pkg/local/sync.go +++ b/pkg/local/sync.go @@ -18,7 +18,6 @@ import ( "time" "github.com/go-openapi/swag" - "github.com/hashicorp/go-retryablehttp" "github.com/jedib0t/go-pretty/v6/progress" "github.com/treeverse/lakefs/pkg/api/apigen" "github.com/treeverse/lakefs/pkg/api/helpers" @@ -48,13 +47,13 @@ type Tasks struct { type SyncManager struct { ctx context.Context client *apigen.ClientWithResponses - httpClient *retryablehttp.Client + httpClient *http.Client progressBar *ProgressPool tasks Tasks cfg Config } -func NewSyncManager(ctx context.Context, client *apigen.ClientWithResponses, httpClient *retryablehttp.Client, cfg Config) *SyncManager { +func NewSyncManager(ctx context.Context, client *apigen.ClientWithResponses, httpClient *http.Client, cfg Config) *SyncManager { sm := &SyncManager{ ctx: ctx, client: client, From cc90b445f499dada2da5c0f2a006936ba7850f9e Mon Sep 17 00:00:00 2001 From: Idan Novogroder Date: Wed, 6 Nov 2024 11:04:48 +0200 Subject: [PATCH 4/4] Remove unused field --- pkg/local/progress.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/local/progress.go b/pkg/local/progress.go index 52cc7c14f39..5e6d6e3c7df 100644 --- a/pkg/local/progress.go +++ b/pkg/local/progress.go @@ -135,7 +135,6 @@ func NewProgressPool() *ProgressPool { type fileWrapper struct { file io.Seeker reader io.Reader - closer io.Closer } func (f fileWrapper) Read(p []byte) (n int, err error) {