Skip to content

Commit

Permalink
verify piece in separate goroutine
Browse files Browse the repository at this point in the history
  • Loading branch information
cenkalti committed Feb 18, 2019
1 parent b67e10f commit 57baeda
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 56 deletions.
15 changes: 8 additions & 7 deletions internal/piece/piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import (

// Piece of a torrent.
type Piece struct {
Index uint32 // index in torrent
Length uint32 // always equal to Info.PieceLength except last piece
Blocks Blocks
Data filesection.Piece // the place to write downloaded bytes
Hash []byte
Writing bool
Done bool
Index uint32 // index in torrent
Length uint32 // always equal to Info.PieceLength except last piece
Blocks Blocks
Data filesection.Piece // the place to write downloaded bytes
Hash []byte
Verifying bool
Writing bool
Done bool
}

func NewPieces(info *metainfo.Info, files []storage.File) []Piece {
Expand Down
8 changes: 4 additions & 4 deletions internal/piecepicker/piecepicker.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (p *PiecePicker) findPiece(pe *peer.Peer) *myPiece {
func (p *PiecePicker) pickAllowedFast(pe *peer.Peer) *myPiece {
for _, pi := range pe.AllowedFast.Pieces {
mp := &p.pieces[pi.Index]
if mp.Done || mp.Writing {
if mp.Done || mp.Writing || mp.Verifying {
continue
}
if mp.Requested.Len() == 0 && mp.Having.Has(pe) {
Expand All @@ -170,7 +170,7 @@ func (p *PiecePicker) pickRarest(pe *peer.Peer) *myPiece {
var hasUnrequested bool
// Select unrequested piece
for _, mp := range p.piecesByAvailability {
if mp.Done || mp.Writing {
if mp.Done || mp.Writing || mp.Verifying {
continue
}
if mp.Requested.Len() == 0 && mp.Having.Has(pe) {
Expand All @@ -194,7 +194,7 @@ func (p *PiecePicker) pickEndgame(pe *peer.Peer) *myPiece {
})
// Select unrequested piece
for _, mp := range p.piecesByAvailability {
if mp.Done || mp.Writing {
if mp.Done || mp.Writing || mp.Verifying {
continue
}
if mp.Requested.Len() < p.maxDuplicateDownload && mp.Having.Has(pe) {
Expand All @@ -211,7 +211,7 @@ func (p *PiecePicker) pickStalled(pe *peer.Peer) *myPiece {
})
// Select unrequested piece
for _, mp := range p.piecesByStalled {
if mp.Done || mp.Writing {
if mp.Done || mp.Writing || mp.Verifying {
continue
}
if mp.RunningDownloads() > 0 {
Expand Down
35 changes: 35 additions & 0 deletions internal/pieceverifier/pieceverifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package pieceverifier

import (
"crypto/sha1" // nolint: gosec

"github.com/cenkalti/rain/internal/peer"
"github.com/cenkalti/rain/internal/piece"
)

type PieceVerifier struct {
Piece *piece.Piece
Peer *peer.Peer
Buffer []byte
Length uint32
OK bool
}

func New(p *piece.Piece, pe *peer.Peer, buf []byte, length uint32) *PieceVerifier {
return &PieceVerifier{
Piece: p,
Peer: pe,
Buffer: buf,
Length: length,
}
}

func (v *PieceVerifier) Run(resultC chan *PieceVerifier, closeC chan struct{}) {
data := v.Buffer[:v.Length]
v.OK = v.Piece.VerifyHash(data, sha1.New()) // nolint: gosec

select {
case resultC <- v:
case <-closeC:
}
}
16 changes: 2 additions & 14 deletions internal/piecewriter/piecewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,20 @@ type PieceWriter struct {
Buffer []byte
Lenght uint32
Error error

closeC chan struct{}
doneC chan struct{}
}

func New(p *piece.Piece, buf []byte, length uint32) *PieceWriter {
return &PieceWriter{
Piece: p,
Buffer: buf,
Lenght: length,
closeC: make(chan struct{}),
doneC: make(chan struct{}),
}
}

func (w *PieceWriter) Close() {
close(w.closeC)
<-w.doneC
}

func (w *PieceWriter) Run(resultC chan *PieceWriter) {
defer close(w.doneC)

func (w *PieceWriter) Run(resultC chan *PieceWriter, closeC chan struct{}) {
_, w.Error = w.Piece.Data.Write(w.Buffer[:w.Lenght])
select {
case resultC <- w:
case <-w.closeC:
case <-closeC:
}
}
34 changes: 4 additions & 30 deletions torrent/messagehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/cenkalti/rain/internal/peer"
"github.com/cenkalti/rain/internal/peerconn/peerwriter"
"github.com/cenkalti/rain/internal/peerprotocol"
"github.com/cenkalti/rain/internal/piecewriter"
"github.com/cenkalti/rain/internal/pieceverifier"
"github.com/cenkalti/rain/internal/tracker"
)

Expand Down Expand Up @@ -62,35 +62,9 @@ func (t *torrent) handlePieceMessage(pm peer.PieceMessage) {
t.closePieceDownloader(pd)
pe.StopSnubTimer()

ok = piece.VerifyHash(pd.Buffer[:pd.Piece.Length], sha1.New()) // nolint: gosec
if !ok {
t.resumerStats.BytesWasted += int64(len(msg.Data))
t.log.Error("received corrupt piece")
t.closePeer(pd.Peer)
t.startPieceDownloaderFor(pe)
return
}

if t.piecePicker != nil {
for _, pe := range t.piecePicker.RequestedPeers(piece.Index) {
pd2 := t.pieceDownloaders[pe]
t.closePieceDownloader(pd2)
pd2.CancelPending()
}
}

if piece.Writing {
panic("piece already writing")
}
piece.Writing = true

t.blockPieceMessages = t.pieceMessages
t.pieceMessages = nil

pw := piecewriter.New(piece, pd.Buffer, pd.Piece.Length)
go pw.Run(t.pieceWriterResultC)

t.startPieceDownloaderFor(pe)
piece.Verifying = true
pv := pieceverifier.New(piece, pe, pd.Buffer, pd.Piece.Length)
go pv.Run(t.pieceVerifierResultC, t.doneC)
}

func (t *torrent) handlePeerMessage(pm peer.Message) {
Expand Down
2 changes: 2 additions & 0 deletions torrent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cenkalti/rain/internal/peer"
"github.com/cenkalti/rain/internal/piececache"
"github.com/cenkalti/rain/internal/piecedownloader"
"github.com/cenkalti/rain/internal/pieceverifier"
"github.com/cenkalti/rain/internal/piecewriter"
"github.com/cenkalti/rain/internal/resourcemanager"
"github.com/cenkalti/rain/internal/resumer"
Expand Down Expand Up @@ -96,6 +97,7 @@ func (o *options) NewTorrent(infoHash []byte, sto storage.Storage) (*torrent, er
peerSnubbedC: make(chan *peer.Peer),
infoDownloaders: make(map[*peer.Peer]*infodownloader.InfoDownloader),
infoDownloadersSnubbed: make(map[*peer.Peer]*infodownloader.InfoDownloader),
pieceVerifierResultC: make(chan *pieceverifier.PieceVerifier),
pieceWriterResultC: make(chan *piecewriter.PieceWriter),
optimisticUnchokedPeers: make([]*peer.Peer, 0, cfg.OptimisticUnchokedPeers),
completeC: make(chan struct{}),
Expand Down
31 changes: 31 additions & 0 deletions torrent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cenkalti/rain/internal/peerconn"
"github.com/cenkalti/rain/internal/peerprotocol"
"github.com/cenkalti/rain/internal/piecedownloader"
"github.com/cenkalti/rain/internal/piecewriter"
)

var errClosed = errors.New("torrent is closed")
Expand Down Expand Up @@ -112,6 +113,36 @@ func (t *torrent) run() {
case req.Response <- announcer.Response{Torrent: tr}:
case <-req.Cancel:
}
case pv := <-t.pieceVerifierResultC:
pv.Piece.Verifying = false

if !pv.OK {
t.resumerStats.BytesWasted += int64(pv.Length)
t.log.Error("received corrupt piece")
t.closePeer(pv.Peer)
return
}

if pv.Piece.Writing {
break
}
pv.Piece.Writing = true

if t.piecePicker != nil {
for _, pe := range t.piecePicker.RequestedPeers(pv.Piece.Index) {
pd2 := t.pieceDownloaders[pe]
t.closePieceDownloader(pd2)
pd2.CancelPending()
}
}

t.blockPieceMessages = t.pieceMessages
t.pieceMessages = nil

pw := piecewriter.New(pv.Piece, pv.Buffer, pv.Piece.Length)
go pw.Run(t.pieceWriterResultC, t.doneC)

t.startPieceDownloaderFor(pv.Peer)
case pw := <-t.pieceWriterResultC:
pw.Piece.Writing = false

Expand Down
4 changes: 3 additions & 1 deletion torrent/torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cenkalti/rain/internal/piececache"
"github.com/cenkalti/rain/internal/piecedownloader"
"github.com/cenkalti/rain/internal/piecepicker"
"github.com/cenkalti/rain/internal/pieceverifier"
"github.com/cenkalti/rain/internal/piecewriter"
"github.com/cenkalti/rain/internal/resourcemanager"
"github.com/cenkalti/rain/internal/resumer"
Expand Down Expand Up @@ -113,7 +114,8 @@ type torrent struct {
infoDownloaders map[*peer.Peer]*infodownloader.InfoDownloader
infoDownloadersSnubbed map[*peer.Peer]*infodownloader.InfoDownloader

pieceWriterResultC chan *piecewriter.PieceWriter
pieceVerifierResultC chan *pieceverifier.PieceVerifier
pieceWriterResultC chan *piecewriter.PieceWriter

// Some peers are optimistically unchoked regardless of their download rate.
optimisticUnchokedPeers []*peer.Peer
Expand Down

0 comments on commit 57baeda

Please sign in to comment.