From 57baedaa225fdc05787e8e7524f4f59857258610 Mon Sep 17 00:00:00 2001 From: Cenk Alti Date: Mon, 18 Feb 2019 15:56:20 +0300 Subject: [PATCH] verify piece in separate goroutine --- internal/piece/piece.go | 15 ++++++----- internal/piecepicker/piecepicker.go | 8 +++--- internal/pieceverifier/pieceverifier.go | 35 +++++++++++++++++++++++++ internal/piecewriter/piecewriter.go | 16 ++--------- torrent/messagehandler.go | 34 +++--------------------- torrent/options.go | 2 ++ torrent/run.go | 31 ++++++++++++++++++++++ torrent/torrent.go | 4 ++- 8 files changed, 89 insertions(+), 56 deletions(-) create mode 100644 internal/pieceverifier/pieceverifier.go diff --git a/internal/piece/piece.go b/internal/piece/piece.go index 46f89b76..d9956b78 100644 --- a/internal/piece/piece.go +++ b/internal/piece/piece.go @@ -11,13 +11,14 @@ import ( // Piece of a torrent. type Piece struct { - Index uint32 // index in torrent - Length uint32 // always equal to Info.PieceLength except last piece - Blocks Blocks - Data filesection.Piece // the place to write downloaded bytes - Hash []byte - Writing bool - Done bool + Index uint32 // index in torrent + Length uint32 // always equal to Info.PieceLength except last piece + Blocks Blocks + Data filesection.Piece // the place to write downloaded bytes + Hash []byte + Verifying bool + Writing bool + Done bool } func NewPieces(info *metainfo.Info, files []storage.File) []Piece { diff --git a/internal/piecepicker/piecepicker.go b/internal/piecepicker/piecepicker.go index f95251e3..e696f061 100644 --- a/internal/piecepicker/piecepicker.go +++ b/internal/piecepicker/piecepicker.go @@ -151,7 +151,7 @@ func (p *PiecePicker) findPiece(pe *peer.Peer) *myPiece { func (p *PiecePicker) pickAllowedFast(pe *peer.Peer) *myPiece { for _, pi := range pe.AllowedFast.Pieces { mp := &p.pieces[pi.Index] - if mp.Done || mp.Writing { + if mp.Done || mp.Writing || mp.Verifying { continue } if mp.Requested.Len() == 0 && mp.Having.Has(pe) { @@ -170,7 +170,7 @@ func (p *PiecePicker) pickRarest(pe *peer.Peer) *myPiece { var hasUnrequested bool // Select unrequested piece for _, mp := range p.piecesByAvailability { - if mp.Done || mp.Writing { + if mp.Done || mp.Writing || mp.Verifying { continue } if mp.Requested.Len() == 0 && mp.Having.Has(pe) { @@ -194,7 +194,7 @@ func (p *PiecePicker) pickEndgame(pe *peer.Peer) *myPiece { }) // Select unrequested piece for _, mp := range p.piecesByAvailability { - if mp.Done || mp.Writing { + if mp.Done || mp.Writing || mp.Verifying { continue } if mp.Requested.Len() < p.maxDuplicateDownload && mp.Having.Has(pe) { @@ -211,7 +211,7 @@ func (p *PiecePicker) pickStalled(pe *peer.Peer) *myPiece { }) // Select unrequested piece for _, mp := range p.piecesByStalled { - if mp.Done || mp.Writing { + if mp.Done || mp.Writing || mp.Verifying { continue } if mp.RunningDownloads() > 0 { diff --git a/internal/pieceverifier/pieceverifier.go b/internal/pieceverifier/pieceverifier.go new file mode 100644 index 00000000..663a36af --- /dev/null +++ b/internal/pieceverifier/pieceverifier.go @@ -0,0 +1,35 @@ +package pieceverifier + +import ( + "crypto/sha1" // nolint: gosec + + "github.com/cenkalti/rain/internal/peer" + "github.com/cenkalti/rain/internal/piece" +) + +type PieceVerifier struct { + Piece *piece.Piece + Peer *peer.Peer + Buffer []byte + Length uint32 + OK bool +} + +func New(p *piece.Piece, pe *peer.Peer, buf []byte, length uint32) *PieceVerifier { + return &PieceVerifier{ + Piece: p, + Peer: pe, + Buffer: buf, + Length: length, + } +} + +func (v *PieceVerifier) Run(resultC chan *PieceVerifier, closeC chan struct{}) { + data := v.Buffer[:v.Length] + v.OK = v.Piece.VerifyHash(data, sha1.New()) // nolint: gosec + + select { + case resultC <- v: + case <-closeC: + } +} diff --git a/internal/piecewriter/piecewriter.go b/internal/piecewriter/piecewriter.go index 90e2f172..c2e6f7a2 100644 --- a/internal/piecewriter/piecewriter.go +++ b/internal/piecewriter/piecewriter.go @@ -9,9 +9,6 @@ type PieceWriter struct { Buffer []byte Lenght uint32 Error error - - closeC chan struct{} - doneC chan struct{} } func New(p *piece.Piece, buf []byte, length uint32) *PieceWriter { @@ -19,22 +16,13 @@ func New(p *piece.Piece, buf []byte, length uint32) *PieceWriter { Piece: p, Buffer: buf, Lenght: length, - closeC: make(chan struct{}), - doneC: make(chan struct{}), } } -func (w *PieceWriter) Close() { - close(w.closeC) - <-w.doneC -} - -func (w *PieceWriter) Run(resultC chan *PieceWriter) { - defer close(w.doneC) - +func (w *PieceWriter) Run(resultC chan *PieceWriter, closeC chan struct{}) { _, w.Error = w.Piece.Data.Write(w.Buffer[:w.Lenght]) select { case resultC <- w: - case <-w.closeC: + case <-closeC: } } diff --git a/torrent/messagehandler.go b/torrent/messagehandler.go index 43d12d62..b6b48630 100644 --- a/torrent/messagehandler.go +++ b/torrent/messagehandler.go @@ -12,7 +12,7 @@ import ( "github.com/cenkalti/rain/internal/peer" "github.com/cenkalti/rain/internal/peerconn/peerwriter" "github.com/cenkalti/rain/internal/peerprotocol" - "github.com/cenkalti/rain/internal/piecewriter" + "github.com/cenkalti/rain/internal/pieceverifier" "github.com/cenkalti/rain/internal/tracker" ) @@ -62,35 +62,9 @@ func (t *torrent) handlePieceMessage(pm peer.PieceMessage) { t.closePieceDownloader(pd) pe.StopSnubTimer() - ok = piece.VerifyHash(pd.Buffer[:pd.Piece.Length], sha1.New()) // nolint: gosec - if !ok { - t.resumerStats.BytesWasted += int64(len(msg.Data)) - t.log.Error("received corrupt piece") - t.closePeer(pd.Peer) - t.startPieceDownloaderFor(pe) - return - } - - if t.piecePicker != nil { - for _, pe := range t.piecePicker.RequestedPeers(piece.Index) { - pd2 := t.pieceDownloaders[pe] - t.closePieceDownloader(pd2) - pd2.CancelPending() - } - } - - if piece.Writing { - panic("piece already writing") - } - piece.Writing = true - - t.blockPieceMessages = t.pieceMessages - t.pieceMessages = nil - - pw := piecewriter.New(piece, pd.Buffer, pd.Piece.Length) - go pw.Run(t.pieceWriterResultC) - - t.startPieceDownloaderFor(pe) + piece.Verifying = true + pv := pieceverifier.New(piece, pe, pd.Buffer, pd.Piece.Length) + go pv.Run(t.pieceVerifierResultC, t.doneC) } func (t *torrent) handlePeerMessage(pm peer.Message) { diff --git a/torrent/options.go b/torrent/options.go index 7fe73a8f..51d84e32 100644 --- a/torrent/options.go +++ b/torrent/options.go @@ -20,6 +20,7 @@ import ( "github.com/cenkalti/rain/internal/peer" "github.com/cenkalti/rain/internal/piececache" "github.com/cenkalti/rain/internal/piecedownloader" + "github.com/cenkalti/rain/internal/pieceverifier" "github.com/cenkalti/rain/internal/piecewriter" "github.com/cenkalti/rain/internal/resourcemanager" "github.com/cenkalti/rain/internal/resumer" @@ -96,6 +97,7 @@ func (o *options) NewTorrent(infoHash []byte, sto storage.Storage) (*torrent, er peerSnubbedC: make(chan *peer.Peer), infoDownloaders: make(map[*peer.Peer]*infodownloader.InfoDownloader), infoDownloadersSnubbed: make(map[*peer.Peer]*infodownloader.InfoDownloader), + pieceVerifierResultC: make(chan *pieceverifier.PieceVerifier), pieceWriterResultC: make(chan *piecewriter.PieceWriter), optimisticUnchokedPeers: make([]*peer.Peer, 0, cfg.OptimisticUnchokedPeers), completeC: make(chan struct{}), diff --git a/torrent/run.go b/torrent/run.go index a9f099ad..3394b73d 100644 --- a/torrent/run.go +++ b/torrent/run.go @@ -17,6 +17,7 @@ import ( "github.com/cenkalti/rain/internal/peerconn" "github.com/cenkalti/rain/internal/peerprotocol" "github.com/cenkalti/rain/internal/piecedownloader" + "github.com/cenkalti/rain/internal/piecewriter" ) var errClosed = errors.New("torrent is closed") @@ -112,6 +113,36 @@ func (t *torrent) run() { case req.Response <- announcer.Response{Torrent: tr}: case <-req.Cancel: } + case pv := <-t.pieceVerifierResultC: + pv.Piece.Verifying = false + + if !pv.OK { + t.resumerStats.BytesWasted += int64(pv.Length) + t.log.Error("received corrupt piece") + t.closePeer(pv.Peer) + return + } + + if pv.Piece.Writing { + break + } + pv.Piece.Writing = true + + if t.piecePicker != nil { + for _, pe := range t.piecePicker.RequestedPeers(pv.Piece.Index) { + pd2 := t.pieceDownloaders[pe] + t.closePieceDownloader(pd2) + pd2.CancelPending() + } + } + + t.blockPieceMessages = t.pieceMessages + t.pieceMessages = nil + + pw := piecewriter.New(pv.Piece, pv.Buffer, pv.Piece.Length) + go pw.Run(t.pieceWriterResultC, t.doneC) + + t.startPieceDownloaderFor(pv.Peer) case pw := <-t.pieceWriterResultC: pw.Piece.Writing = false diff --git a/torrent/torrent.go b/torrent/torrent.go index 52ee981d..d17b05ec 100644 --- a/torrent/torrent.go +++ b/torrent/torrent.go @@ -22,6 +22,7 @@ import ( "github.com/cenkalti/rain/internal/piececache" "github.com/cenkalti/rain/internal/piecedownloader" "github.com/cenkalti/rain/internal/piecepicker" + "github.com/cenkalti/rain/internal/pieceverifier" "github.com/cenkalti/rain/internal/piecewriter" "github.com/cenkalti/rain/internal/resourcemanager" "github.com/cenkalti/rain/internal/resumer" @@ -113,7 +114,8 @@ type torrent struct { infoDownloaders map[*peer.Peer]*infodownloader.InfoDownloader infoDownloadersSnubbed map[*peer.Peer]*infodownloader.InfoDownloader - pieceWriterResultC chan *piecewriter.PieceWriter + pieceVerifierResultC chan *pieceverifier.PieceVerifier + pieceWriterResultC chan *piecewriter.PieceWriter // Some peers are optimistically unchoked regardless of their download rate. optimisticUnchokedPeers []*peer.Peer