Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GODRIVER-2520 Remove deadline setters from gridfs #1427

Merged
merged 15 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 62 additions & 128 deletions mongo/gridfs/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"errors"
"fmt"
"io"
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
Expand Down Expand Up @@ -51,9 +50,6 @@ type Bucket struct {
firstWriteDone bool
readBuf []byte
writeBuf []byte

readDeadline time.Time
writeDeadline time.Time
}

// Upload contains options to upload a file to a bucket.
Expand Down Expand Up @@ -120,30 +116,22 @@ func NewBucket(db *mongo.Database, opts ...*options.BucketOptions) (*Bucket, err
return b, nil
}

// SetWriteDeadline sets the write deadline for this bucket.
func (b *Bucket) SetWriteDeadline(t time.Time) error {
b.writeDeadline = t
return nil
}

// SetReadDeadline sets the read deadline for this bucket
func (b *Bucket) SetReadDeadline(t time.Time) error {
b.readDeadline = t
return nil
}

// OpenUploadStream creates a new upload stream for a file given the filename.
func (b *Bucket) OpenUploadStream(filename string, opts ...*options.UploadOptions) (*UploadStream, error) {
return b.OpenUploadStreamWithID(primitive.NewObjectID(), filename, opts...)
func (b *Bucket) OpenUploadStream(
ctx context.Context,
filename string,
opts ...*options.UploadOptions,
) (*UploadStream, error) {
return b.OpenUploadStreamWithID(ctx, primitive.NewObjectID(), filename, opts...)
}

// OpenUploadStreamWithID creates a new upload stream for a file given the file ID and filename.
func (b *Bucket) OpenUploadStreamWithID(fileID interface{}, filename string, opts ...*options.UploadOptions) (*UploadStream, error) {
ctx, cancel := deadlineContext(b.writeDeadline)
if cancel != nil {
defer cancel()
}

func (b *Bucket) OpenUploadStreamWithID(
ctx context.Context,
fileID interface{},
filename string,
opts ...*options.UploadOptions,
) (*UploadStream, error) {
if err := b.checkFirstWrite(ctx); err != nil {
return nil, err
}
Expand All @@ -160,25 +148,30 @@ func (b *Bucket) OpenUploadStreamWithID(fileID interface{}, filename string, opt
//
// If this upload requires a custom write deadline to be set on the bucket, it cannot be done concurrently with other
// write operations on this bucket that also require a custom deadline.
func (b *Bucket) UploadFromStream(filename string, source io.Reader, opts ...*options.UploadOptions) (primitive.ObjectID, error) {
func (b *Bucket) UploadFromStream(
ctx context.Context,
filename string,
source io.Reader,
opts ...*options.UploadOptions,
) (primitive.ObjectID, error) {
fileID := primitive.NewObjectID()
err := b.UploadFromStreamWithID(fileID, filename, source, opts...)
err := b.UploadFromStreamWithID(ctx, fileID, filename, source, opts...)
return fileID, err
}

// UploadFromStreamWithID uploads a file given a source stream.
//
// If this upload requires a custom write deadline to be set on the bucket, it cannot be done concurrently with other
// write operations on this bucket that also require a custom deadline.
func (b *Bucket) UploadFromStreamWithID(fileID interface{}, filename string, source io.Reader, opts ...*options.UploadOptions) error {
us, err := b.OpenUploadStreamWithID(fileID, filename, opts...)
if err != nil {
return err
}

err = us.SetWriteDeadline(b.writeDeadline)
func (b *Bucket) UploadFromStreamWithID(
ctx context.Context,
fileID interface{},
filename string,
source io.Reader,
opts ...*options.UploadOptions,
) error {
us, err := b.OpenUploadStreamWithID(ctx, fileID, filename, opts...)
if err != nil {
_ = us.Close()
return err
}

Expand All @@ -205,8 +198,8 @@ func (b *Bucket) UploadFromStreamWithID(fileID interface{}, filename string, sou
}

// OpenDownloadStream creates a stream from which the contents of the file can be read.
func (b *Bucket) OpenDownloadStream(fileID interface{}) (*DownloadStream, error) {
return b.openDownloadStream(bson.D{
func (b *Bucket) OpenDownloadStream(ctx context.Context, fileID interface{}) (*DownloadStream, error) {
return b.openDownloadStream(ctx, bson.D{
{"_id", fileID},
})
}
Expand All @@ -216,17 +209,21 @@ func (b *Bucket) OpenDownloadStream(fileID interface{}) (*DownloadStream, error)
//
// If this download requires a custom read deadline to be set on the bucket, it cannot be done concurrently with other
// read operations on this bucket that also require a custom deadline.
func (b *Bucket) DownloadToStream(fileID interface{}, stream io.Writer) (int64, error) {
ds, err := b.OpenDownloadStream(fileID)
func (b *Bucket) DownloadToStream(ctx context.Context, fileID interface{}, stream io.Writer) (int64, error) {
ds, err := b.OpenDownloadStream(ctx, fileID)
if err != nil {
return 0, err
}

return b.downloadToStream(ds, stream)
return b.downloadToStream(ctx, ds, stream)
}

// OpenDownloadStreamByName opens a download stream for the file with the given filename.
func (b *Bucket) OpenDownloadStreamByName(filename string, opts ...*options.NameOptions) (*DownloadStream, error) {
func (b *Bucket) OpenDownloadStreamByName(
ctx context.Context,
filename string,
opts ...*options.NameOptions,
) (*DownloadStream, error) {
var numSkip int32 = -1
var sortOrder int32 = 1

Expand All @@ -252,41 +249,32 @@ func (b *Bucket) OpenDownloadStreamByName(filename string, opts ...*options.Name

findOpts := options.Find().SetSkip(int64(numSkip)).SetSort(bson.D{{"uploadDate", sortOrder}})

return b.openDownloadStream(bson.D{{"filename", filename}}, findOpts)
return b.openDownloadStream(ctx, bson.D{{"filename", filename}}, findOpts)
}

// DownloadToStreamByName downloads the file with the given name to the given io.Writer.
//
// If this download requires a custom read deadline to be set on the bucket, it cannot be done concurrently with other
// read operations on this bucket that also require a custom deadline.
func (b *Bucket) DownloadToStreamByName(filename string, stream io.Writer, opts ...*options.NameOptions) (int64, error) {
ds, err := b.OpenDownloadStreamByName(filename, opts...)
func (b *Bucket) DownloadToStreamByName(
ctx context.Context,
filename string,
stream io.Writer,
opts ...*options.NameOptions,
) (int64, error) {
ds, err := b.OpenDownloadStreamByName(ctx, filename, opts...)
if err != nil {
return 0, err
}

return b.downloadToStream(ds, stream)
return b.downloadToStream(ctx, ds, stream)
}

// Delete deletes all chunks and metadata associated with the file with the given file ID.
//
// If this operation requires a custom write deadline to be set on the bucket, it cannot be done concurrently with other
// write operations on this bucket that also require a custom deadline.
//
// Use SetWriteDeadline to set a deadline for the delete operation.
func (b *Bucket) Delete(fileID interface{}) error {
ctx, cancel := deadlineContext(b.writeDeadline)
if cancel != nil {
defer cancel()
}
return b.DeleteContext(ctx, fileID)
}

// DeleteContext deletes all chunks and metadata associated with the file with the given file ID and runs the underlying
// Delete deletes all chunks and metadata associated with the file with the given file ID and runs the underlying
// delete operations with the provided context.
//
// Use the context parameter to time-out or cancel the delete operation. The deadline set by SetWriteDeadline is ignored.
func (b *Bucket) DeleteContext(ctx context.Context, fileID interface{}) error {
func (b *Bucket) Delete(ctx context.Context, fileID interface{}) error {
// If no deadline is set on the passed-in context, Timeout is set on the Client, and context is
// not already a Timeout context, honor Timeout in new Timeout context for operation execution to
// be shared by both delete operations.
Expand All @@ -311,27 +299,16 @@ func (b *Bucket) DeleteContext(ctx context.Context, fileID interface{}) error {
return b.deleteChunks(ctx, fileID)
}

// Find returns the files collection documents that match the given filter.
//
// If this download requires a custom read deadline to be set on the bucket, it cannot be done concurrently with other
// read operations on this bucket that also require a custom deadline.
//
// Use SetReadDeadline to set a deadline for the find operation.
func (b *Bucket) Find(filter interface{}, opts ...*options.GridFSFindOptions) (*mongo.Cursor, error) {
ctx, cancel := deadlineContext(b.readDeadline)
if cancel != nil {
defer cancel()
}

return b.FindContext(ctx, filter, opts...)
}

// FindContext returns the files collection documents that match the given filter and runs the underlying
// Find returns the files collection documents that match the given filter and runs the underlying
// find query with the provided context.
//
// Use the context parameter to time-out or cancel the find operation. The deadline set by SetReadDeadline
// is ignored.
func (b *Bucket) FindContext(ctx context.Context, filter interface{}, opts ...*options.GridFSFindOptions) (*mongo.Cursor, error) {
func (b *Bucket) Find(
ctx context.Context,
filter interface{},
opts ...*options.GridFSFindOptions,
) (*mongo.Cursor, error) {
gfsOpts := options.GridFSFind()
for _, opt := range opts {
if opt == nil {
Expand Down Expand Up @@ -391,20 +368,7 @@ func (b *Bucket) FindContext(ctx context.Context, filter interface{}, opts ...*o
// write operations on this bucket that also require a custom deadline
//
// Use SetWriteDeadline to set a deadline for the rename operation.
func (b *Bucket) Rename(fileID interface{}, newFilename string) error {
ctx, cancel := deadlineContext(b.writeDeadline)
if cancel != nil {
defer cancel()
}

return b.RenameContext(ctx, fileID, newFilename)
}

// RenameContext renames the stored file with the specified file ID and runs the underlying update with the provided
// context.
//
// Use the context parameter to time-out or cancel the rename operation. The deadline set by SetWriteDeadline is ignored.
func (b *Bucket) RenameContext(ctx context.Context, fileID interface{}, newFilename string) error {
func (b *Bucket) Rename(ctx context.Context, fileID interface{}, newFilename string) error {
res, err := b.filesColl.UpdateOne(ctx,
bson.D{{"_id", fileID}},
bson.D{{"$set", bson.D{{"filename", newFilename}}}},
Expand All @@ -420,26 +384,11 @@ func (b *Bucket) RenameContext(ctx context.Context, fileID interface{}, newFilen
return nil
}

// Drop drops the files and chunks collections associated with this bucket.
//
// If this operation requires a custom write deadline to be set on the bucket, it cannot be done concurrently with other
// write operations on this bucket that also require a custom deadline
//
// Use SetWriteDeadline to set a deadline for the drop operation.
func (b *Bucket) Drop() error {
ctx, cancel := deadlineContext(b.writeDeadline)
if cancel != nil {
defer cancel()
}

return b.DropContext(ctx)
}

// DropContext drops the files and chunks collections associated with this bucket and runs the drop operations with
// the provided context.
//
// Use the context parameter to time-out or cancel the drop operation. The deadline set by SetWriteDeadline is ignored.
func (b *Bucket) DropContext(ctx context.Context) error {
func (b *Bucket) Drop(ctx context.Context) error {
// If no deadline is set on the passed-in context, Timeout is set on the Client, and context is
// not already a Timeout context, honor Timeout in new Timeout context for operation execution to
// be shared by both drop operations.
Expand Down Expand Up @@ -469,12 +418,11 @@ func (b *Bucket) GetChunksCollection() *mongo.Collection {
return b.chunksColl
}

func (b *Bucket) openDownloadStream(filter interface{}, opts ...*options.FindOptions) (*DownloadStream, error) {
ctx, cancel := deadlineContext(b.readDeadline)
if cancel != nil {
defer cancel()
}

func (b *Bucket) openDownloadStream(
ctx context.Context,
filter interface{},
opts ...*options.FindOptions,
) (*DownloadStream, error) {
cursor, err := b.findFile(ctx, filter, opts...)
prestonvasquez marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
Expand Down Expand Up @@ -506,21 +454,7 @@ func (b *Bucket) openDownloadStream(filter interface{}, opts ...*options.FindOpt
return newDownloadStream(chunksCursor, foundFile.ChunkSize, &foundFile), nil
}

func deadlineContext(deadline time.Time) (context.Context, context.CancelFunc) {
if deadline.Equal(time.Time{}) {
return context.Background(), nil
}

return context.WithDeadline(context.Background(), deadline)
}

func (b *Bucket) downloadToStream(ds *DownloadStream, stream io.Writer) (int64, error) {
err := ds.SetReadDeadline(b.readDeadline)
if err != nil {
_ = ds.Close()
return 0, err
}

func (b *Bucket) downloadToStream(ctx context.Context, ds *DownloadStream, stream io.Writer) (int64, error) {
copied, err := io.Copy(stream, ds)
if err != nil {
_ = ds.Close()
Expand Down
28 changes: 7 additions & 21 deletions mongo/gridfs/download_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ type DownloadStream struct {
bufferStart int
bufferEnd int
expectedChunk int32 // index of next expected chunk
readDeadline time.Time
fileLen int64
ctx context.Context

// The pointer returned by GetFile. This should not be used in the actual DownloadStream code outside of the
// newDownloadStream constructor because the values can be mutated by the user after calling GetFile. Instead,
Expand Down Expand Up @@ -128,14 +128,10 @@ func (ds *DownloadStream) Close() error {
return nil
}

// SetReadDeadline sets the read deadline for this download stream.
func (ds *DownloadStream) SetReadDeadline(t time.Time) error {
if ds.closed {
return ErrStreamClosed
}

ds.readDeadline = t
return nil
// WithContext sets the context for the DownloadStream, allowing control over
// the execution and behavior of operations associated with the stream.
func (ds *DownloadStream) WithContext(ctx context.Context) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternative to this would be to have a constructor that accepts a context for initializing a download stream.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this setter at all? The only ways to create a DownloadStream are using OpenDownloadStream or OpenDownloadStreamByName, which both accept a Context parameter as of this PR.

Copy link
Collaborator Author

@prestonvasquez prestonvasquez Oct 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The context set by WithContext is specific to the read operation, which is independent of constructing a DownloadStream. For example, this:

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
mt.Cleanup(cancel)

ds, err := bucket.OpenDownloadStreamByName(ctx, fileName) // could time out finding a file, etc
assert.Nil(mt, err, "OpenDownloadStreamByName error: %v", err)

p := make([]byte, len(fileData))
_, err = ds.Read(p)

has a different intent than this:

ds, err := bucket.OpenDownloadStreamByName(context.Background(), fileName) 
assert.Nil(mt, err, "OpenDownloadStreamByName error: %v", err)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
mt.Cleanup(cancel)

ds.WithContext(ctx) // specifically trying to add a context when reading a file

p := make([]byte, len(fileData))
_, err = ds.Read(p)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see your point about how the Context in OpenDownloadStream is only used to get the file info, but is not used for the subsequent ops that read the file info from the database. I think it's worth considering how similar APIs behave.

The Go stdlib offers a few examples of how to apply timeouts to stream reader types that implement io.Reader. The patterns are similar for applying timeouts to stream writer types that implement io.Writer.

The Go net.Conn allows setting a read timeout via SetReadDeadline or SetDeadline.

conn, _ := net.Dial(...)
conn.SetReadDeadline(time.Now().Add(15 * time.Second))

// Will time out in 15 seconds
io.ReadAll(conn)

The Go http.Client allows setting a timeout that applies to the entire lifetime of any request, including dialing, reading headers, and reading the body.

client := &http.Client{
    Timeout: 15 * time.Second,
}
resp, err := client.Get(...)

// Will time out in 15 seconds.
io.ReadAll(http.Body)

Thoughts:

  • Concerning using "read deadline" vs "context", all of the underlying APIs used by the GridFS code accept a Context (they're all just Go driver CRUD calls), so using a Context seems to be the best choice.
  • I think accepting a Context in OpenDownloadStream that is not used for actually downloading the file is confusing and would surprise most users. I recommend using the Context passed to OpenDownloadStream (and OpenDownloadStreamByName) as the Context on a DownloadStream.
  • If we want to allow users to override the Context used when actually downloading the file, we can add a SetContext method to DownloadStream. However, it's not immediately clear if that is necessary, so I'd recommend omitting it for now.

Copy link
Collaborator Author

@prestonvasquez prestonvasquez Oct 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The suggestion of WithContext comes directly from the http package's Request.WithContext API, which uses a context set by this method in its io operations. I am also open to retaining the existing API which, as you note, is the pattern used in net.Conn. I would argue the existing pattern (SetReadDeadline) is unnecessarily asymmetric as DownloadStream does not have a concept of Write and so WithContext or SetDeadline is concise.


I think accepting a Context in OpenDownloadStream that is not used for actually downloading the file is confusing and would surprise most users. I recommend using the Context passed to OpenDownloadStream (and OpenDownloadStreamByName) as the Context on a DownloadStream.

The context timeout starts ticking around when the DownloadStream is constructed. So the user will have to be judicious about how they set the context timeout and when they plan on reading from io. If we go this way, I agree with omitting a setter specific to setting context on the streaming types until it's more clear if there is a use case for it. However, in my opinion this makes the API for DownloadStream more difficult to use. What are your thoughts?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another issue with going the constructor route is that if we ever needed to add a read-specific context timeout, then undoing the constructor propagation of context would be a breaking change.

For example, suppose a user is setting a context on the constructor to time out the find operation, i.e. the construction. And they have no intention of attempting to time out the io read. We would be tempted on the Go Driver team to add a WithContext method to DownloadStream to accommodate this case. However, we couldn't simply revert the context associated with the constructor because that could break another user's logic that expects a timeout to be shared between construction and read. This could be an awkward situation.

I think simply having something like SetReadDeadline is the correct approach to the Download/Upload Stream objects.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The GridFS API section of the CSOT spec actually describes the required behavior of the timeout param, which is basically "use the constructor context":

... all methods in the GridFS Bucket API MUST support the timeoutMS option. For methods that create streams (e.g. open_upload_stream), the option MUST cap the lifetime of the entire stream. ... Methods that interact with a user-provided stream (e.g. upload_from_stream) MUST use timeoutMS as the timeout for the entire upload/download operation.

Concerning the comment

we couldn't simply revert the context associated with the constructor because that could break another user's logic that expects a timeout to be shared between construction and read

If we use the Context passed into the constructor, adding a new WithContext method to a DownloadStream doesn't seem like it would create a breaking change in API behavior.

For example, consider downloading a file with a 30 second timeout:

ctx, cancel := context.WithTimeout(context.Background(), 30 * time.Second)
defer cancel()
ds, _ := bucket.OpenDownloadStream(ctx, ...)
b, _ := io.ReadAll(ds)

Now consider opening a DownloadStream with a 30 second timeout, but reading the file document(s) with no timeout:

ctx, cancel := context.WithTimeout(context.Background(), 30 * time.Second)
defer cancel()
ds, _ := bucket.OpenDownloadStream(ctx, ...)
ds.WithContext(context.Background())
b, _ := io.ReadAll(ds)

Is there an examples where those timeouts would conflict?

It's still not clear that there is a use case for having different timeout behavior for different underlying operations during a GridFS upload/download, so I still recommend omitting it.

Copy link
Collaborator Author

@prestonvasquez prestonvasquez Nov 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@matthewdale For this:

we couldn't simply revert the context associated with the constructor because that could break another user's logic that expects a timeout to be shared between construction and read

I agree that there wouldn't be a conflict if (1) we didn't revert the context on the constructor, and (2) (probably) the WithContext method returned a shallow copy of the DownloadStream. Consider this resolved.

I will update the code to include the requested changes, since it conforms to the specifications. But I also want to make it clear that my concern with this approach is that the context lifecycle begins at construction.

This issue is because we store context on the objects, which is an antipattern, and the documentation linked covers this exact case:

The caller’s lifetime is intermingled with a shared context, and the context is scoped to the lifetime where the Worker is created.

The docs also note that the only reason we should do this is for backwards-compatibility, which is not our issue in 2.x.

Unfortunately, if we want to time out the read operation, we have to do this. However, we can do it more modularly than at instantiation. WithContext gives us more control over what precisely a timeout affects.


Notes:

The http package's NewRequestWithContext also notes this:

For an outgoing client request, the context controls the entire lifetime of a request and its response: obtaining a connection, sending the request, and reading the response headers and body.

Copy link
Collaborator

@matthewdale matthewdale Nov 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed that we're basically using an antipattern, as described almost exactly in the "Storing context in structs leads to confusion" section of that "Contexts and structs" article. However, it's not significantly clearer if we provide a context via WithContext on a DownloadStream (I'd actually argue it's more confusing). It seems like we're designing an API to work around two problems:

  1. io.Reader and io.Writer don't include contexts, so they basically have to be side-loaded for types that implement those interfaces. See an interesting proposal for adding contexts to those interfaces here.
  2. The methods like OpenDownloadStream aren't an atomic "download a file" operation, but are currently the only way to accomplish downloading a file in the API described in the GridFS spec. That creates an conflict between Go Context best practices and the GridFS spec.

There's not much we can do about (1). However, we could separate the upload/download API into different methods, one supporting timeout and one not. For example, keep the existing methods with timeouts that only affect the initial operations but not the returned DownloadStream (i.e. there is no way to time out Read calls):

func (b *Bucket) OpenDownloadStream(ctx context.Context, fileID any) (*DownloadStream, error)

Then add additional methods for upload/download that apply the context to the entire operation.

func (b *Bucket) Download(ctx context.Context, dst io.WriterAt, fileID any) error

That deviates from the spec, but conforms more closely to Go Context best practices.

P.S. The Download method signature is inspired by the AWS SDK's S3 Download method. The io.WriterAt allows downloading multiple file chunks simultaneously. That's not something the GridFS spec covers, but that API would allow for optimization in the future.

Copy link
Collaborator

@matthewdale matthewdale Nov 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From an offline conversation: The least surprising behavior is to make the Context apply to all I/O operations for a DownloadStream or UploadStream, which also matches the GridFS spec. The Download method I proposed above was intended to provide context and is really out of scope of this ticket, so can be ignored. Consider this comment resolved.

ds.ctx = ctx
}

// Read reads the file from the server and writes it to a destination byte slice.
Expand All @@ -148,17 +144,12 @@ func (ds *DownloadStream) Read(p []byte) (int, error) {
return 0, io.EOF
}

ctx, cancel := deadlineContext(ds.readDeadline)
if cancel != nil {
defer cancel()
}

bytesCopied := 0
var err error
for bytesCopied < len(p) {
if ds.bufferStart >= ds.bufferEnd {
// Buffer is empty and can load in data from new chunk.
err = ds.fillBuffer(ctx)
err = ds.fillBuffer(ds.ctx)
if err != nil {
if err == errNoMoreChunks {
if bytesCopied == 0 {
Expand Down Expand Up @@ -190,18 +181,13 @@ func (ds *DownloadStream) Skip(skip int64) (int64, error) {
return 0, nil
}

ctx, cancel := deadlineContext(ds.readDeadline)
if cancel != nil {
defer cancel()
}

var skipped int64
var err error

for skipped < skip {
if ds.bufferStart >= ds.bufferEnd {
// Buffer is empty and can load in data from new chunk.
err = ds.fillBuffer(ctx)
err = ds.fillBuffer(ds.ctx)
if err != nil {
if err == errNoMoreChunks {
return skipped, nil
Expand Down
Loading
Loading