Skip to content

Commit

Permalink
iterators, golint, and internal packages, oh my\!
Browse files Browse the repository at this point in the history
  • Loading branch information
puellanivis committed Nov 12, 2024
1 parent 5a107ff commit f1f69e9
Show file tree
Hide file tree
Showing 30 changed files with 807 additions and 318 deletions.
57 changes: 28 additions & 29 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@ import (
"os"
"path"
"slices"
"sync"
"sync/atomic"
"syscall"
"time"
"unsafe"

sshfx "github.com/pkg/sftp/v2/encoding/ssh/filexfer"
"github.com/pkg/sftp/v2/encoding/ssh/filexfer/openssh"
"github.com/pkg/sftp/v2/internal/pool"
"github.com/pkg/sftp/v2/internal/sync"

"golang.org/x/crypto/ssh"
)
Expand All @@ -35,10 +34,10 @@ type clientConn struct {
reqid atomic.Uint32
rd io.Reader

resPool *pool.WorkPool[result]
resPool *sync.WorkPool[result]

bufPool *pool.SlicePool[[]byte, byte]
pktPool *pool.Pool[sshfx.RawPacket]
bufPool *sync.SlicePool[[]byte, byte]
pktPool *sync.Pool[sshfx.RawPacket]

mu sync.Mutex
closed chan struct{}
Expand Down Expand Up @@ -551,10 +550,10 @@ func NewClientPipe(ctx context.Context, rd io.Reader, wr io.WriteCloser, opts ..

cl.exts = exts

cl.conn.resPool = pool.NewWorkPool[result](cl.maxInflight)
cl.conn.resPool = sync.NewWorkPool[result](cl.maxInflight)

cl.conn.bufPool = pool.NewSlicePool[[]byte](cl.maxInflight, int(cl.maxPacket))
cl.conn.pktPool = pool.NewPool[sshfx.RawPacket](cl.maxInflight)
cl.conn.bufPool = sync.NewSlicePool[[]byte](cl.maxInflight, int(cl.maxPacket))
cl.conn.pktPool = sync.NewPool[sshfx.RawPacket](cl.maxInflight)

go func() {
if err := cl.conn.recvLoop(cl.maxPacket); err != nil {
Expand All @@ -565,8 +564,10 @@ func NewClientPipe(ctx context.Context, rd io.Reader, wr io.WriteCloser, opts ..
return cl, nil
}

// ReportPoolMetrics writes buffer pool metrics to the given writer.
// ReportPoolMetrics writes buffer pool metrics to the given writer, if pool metrics are enabled.
// It is expected that this is only useful during testing, and benchmarking.
//
// To enable you must include `-tag sftp.sync.metrics` to your go command-line.
func (cl *Client) ReportPoolMetrics(wr io.Writer) {
if cl.conn.bufPool != nil {
hits, total := cl.conn.bufPool.Hits()
Expand Down Expand Up @@ -993,36 +994,34 @@ func (d *Dir) Name() string {
return d.name
}

// readdir returns an iterator over the directory entries of the directory.
// We do not expose an iterator, because none have been defined yet,
// rangedir returns an iterator over the directory entries of the directory.
// We do not expose an iterator, because none has been standardized yet.
// and we do not want to accidentally implement an inconsistent API.
// However, for internal usage, we can definitely make use of this to simplify the common parts of ReadDir and Readdir.
//
// Callers must guarantee synchronization by either holding the file lock, or holding an exclusive reference.
func (d *Dir) readdir(ctx context.Context) iter.Seq2[*sshfx.NameEntry, error] {
func (d *Dir) rangedir(ctx context.Context) iter.Seq2[*sshfx.NameEntry, error] {
return func(yield func(v *sshfx.NameEntry, err error) bool) {
// We have saved entries, use those first.
if len(d.entries) > 0 {
for i, ent := range d.entries {
if !yield(ent, nil) {
// Early break, delete the entries we have yielded.
d.entries = slices.Delete(d.entries, 0, i+1)
return
}
// Pull from saved entries first.
for i, ent := range d.entries {
if !yield(ent, nil) {
// Early break, delete the entries we have yielded.
d.entries = slices.Delete(d.entries, 0, i+1)
return
}

// We got through all the remaining entries, delete all the entries.
d.entries = slices.Delete(d.entries, 0, len(d.entries))
}

// We got through all the remaining entries, delete all the entries.
d.entries = slices.Delete(d.entries, 0, len(d.entries))

for {
pkt, err := getPacket[sshfx.NamePacket](ctx, d.cl, &sshfx.ReadDirPacket{
Handle: d.handle,
})
if err != nil {
// There are no remaining entries to save here,
// SFTP can only return either an error or a result, never both.
if err == io.EOF {
if errors.Is(err, io.EOF) {
yield(nil, io.EOF)
return
}
Expand Down Expand Up @@ -1069,9 +1068,9 @@ func (d *Dir) ReaddirContext(ctx context.Context, n int) ([]fs.FileInfo, error)

var ret []fs.FileInfo

for ent, err := range d.readdir(ctx) {
for ent, err := range d.rangedir(ctx) {
if err != nil {
if err == io.EOF && n <= 0 {
if errors.Is(err, io.EOF) && n <= 0 {
return ret, nil
}

Expand Down Expand Up @@ -1115,9 +1114,9 @@ func (d *Dir) ReadDirContext(ctx context.Context, n int) ([]fs.DirEntry, error)

var ret []fs.DirEntry

for ent, err := range d.readdir(ctx) {
for ent, err := range d.rangedir(ctx) {
if err != nil {
if err == io.EOF && n <= 0 {
if errors.Is(err, io.EOF) && n <= 0 {
return ret, nil
}

Expand Down Expand Up @@ -1923,7 +1922,7 @@ func (f *File) Read(b []byte) (int, error) {

f.offset += int64(n)

if err == io.EOF && n != 0 {
if errors.Is(err, io.EOF) && n != 0 {
return n, nil
}

Expand Down
11 changes: 11 additions & 0 deletions encoding/ssh/filexfer/attrs.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,41 +251,52 @@ func (e *ExtendedAttribute) UnmarshalBinary(data []byte) error {

// NameEntry implements the SSH_FXP_NAME repeated data type from draft-ietf-secsh-filexfer-02
//
// It implements both [fs.FileInfo] and [fs.DirEntry].
//
// This type is incompatible with versions 4 or higher.
type NameEntry struct {
Filename string
Longname string
Attrs Attributes
}

// Name implements [fs.FileInfo].
func (e *NameEntry) Name() string {
return path.Base(e.Filename)
}

// Size implements [fs.FileInfo].
func (e *NameEntry) Size() int64 {
return int64(e.Attrs.Size)
}

// Mode implements [fs.FileInfo].
func (e *NameEntry) Mode() fs.FileMode {
return ToGoFileMode(e.Attrs.Permissions)
}

// ModTime implements [fs.FileInfo].
func (e *NameEntry) ModTime() time.Time {
return time.Unix(int64(e.Attrs.MTime), 0)
}

// IsDir implements [fs.FileInfo].
func (e *NameEntry) IsDir() bool {
return e.Attrs.Permissions.IsDir()
}

// Sys implements [fs.FileInfo].
// It returns a pointer of type *Attribute to the Attr field of this name entry.
func (e *NameEntry) Sys() any {
return &e.Attrs
}

// Type implements [fs.DirEntry].
func (e *NameEntry) Type() fs.FileMode {
return ToGoFileMode(e.Attrs.Permissions).Type()
}

// Info implements [fs.DirEntry].
func (e *NameEntry) Info() (fs.FileInfo, error) {
return e, nil
}
Expand Down
4 changes: 2 additions & 2 deletions encoding/ssh/filexfer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"encoding/binary"
"errors"

"github.com/pkg/sftp/v2/internal/pool"
"github.com/pkg/sftp/v2/internal/sync"
)

// Various encoding errors.
Expand All @@ -22,7 +22,7 @@ type Buffer struct {
Err error
}

var bufPool = pool.NewPool[Buffer](64)
var bufPool = sync.NewPool[Buffer](64)

// NewBuffer creates and initializes a new buffer using buf as its initial contents.
// The new buffer takes ownership of buf, and the caller should not use buf after this call.
Expand Down
13 changes: 9 additions & 4 deletions encoding/ssh/filexfer/fxp.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package sshfx
import (
"fmt"

"github.com/pkg/sftp/v2/internal/pool"
"github.com/pkg/sftp/v2/internal/sync"
)

// PacketType defines the various SFTP packet types.
Expand Down Expand Up @@ -126,11 +126,16 @@ func (f PacketType) String() string {
}

var (
readPool = pool.NewPool[ReadPacket](64)
writePool = pool.NewPool[WritePacket](64)
wrDataPool = pool.NewSlicePool[[]byte](64, DefaultMaxDataLength)
readPool = sync.NewPool[ReadPacket](64)
writePool = sync.NewPool[WritePacket](64)
wrDataPool = sync.NewSlicePool[[]byte](64, DefaultMaxDataLength)
)

// PoolReturn adds a packet to an internal pool for its type, if one exists.
// If a pool has not been setup, then it is a no-op.
//
// Currently, this is only setup for [ReadPacket] and [WritePacket],
// as these are generally the most heavily used packet types.
func PoolReturn(p Packet) {
switch p := p.(type) {
case *ReadPacket:
Expand Down
2 changes: 2 additions & 0 deletions encoding/ssh/filexfer/packets.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ func (p *RawPacket) UnmarshalBinary(data []byte) error {
return p.UnmarshalFrom(NewBuffer(clone[:n]))
}

// PacketBody unmarshals and returns the concretely typed Packet that this raw packet encodes.
// It returns an error if the packet type is not recognized, or unmarshalling the packet fails.
func (p *RawPacket) PacketBody() (Packet, error) {
body, err := newPacketFromType(p.PacketType)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions examples/buffered-read-benchmark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/pkg/sftp/v2"
)

// Various flags to control the benchmark.
var (
User = flag.String("user", os.Getenv("USER"), "ssh username")
Pass = flag.String("pass", os.Getenv("SOCKSIE_SSH_PASSWORD"), "ssh password")
Expand Down
1 change: 1 addition & 0 deletions examples/buffered-write-benchmark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pkg/sftp/v2"
)

// Various flags to control the benchmark.
var (
User = flag.String("user", os.Getenv("USER"), "ssh username")
Pass = flag.String("pass", os.Getenv("SOCKSIE_SSH_PASSWORD"), "ssh password")
Expand Down
1 change: 1 addition & 0 deletions examples/streaming-read-benchmark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/pkg/sftp/v2"
)

// Various flags to control the benchmark.
var (
User = flag.String("user", os.Getenv("USER"), "ssh username")
Pass = flag.String("pass", os.Getenv("SOCKSIE_SSH_PASSWORD"), "ssh password")
Expand Down
1 change: 1 addition & 0 deletions examples/streaming-write-benchmark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/pkg/sftp/v2"
)

// Various flags to control the benchmark.
var (
User = flag.String("user", os.Getenv("USER"), "ssh username")
Pass = flag.String("pass", os.Getenv("SOCKSIE_SSH_PASSWORD"), "ssh password")
Expand Down
Loading

0 comments on commit f1f69e9

Please sign in to comment.