From 63088a20b383795ac835260570d1d7f1210fae96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cenk=20Alt=C4=B1?= Date: Sun, 31 Oct 2021 16:18:53 -0400 Subject: [PATCH] add cli command to download torrent from magnet --- .../resumer/boltdbresumer/boltdbresumer.go | 108 +++++++++++----- internal/resumer/boltdbresumer/spec.go | 4 + internal/rpctypes/rpctypes.go | 1 + main.go | 117 +++++++++++++++++- rainrpc/client.go | 3 + torrent/session_add.go | 6 + torrent/session_load.go | 2 + torrent/session_rpc_handler.go | 12 +- torrent/session_torrent.go | 9 +- torrent/torrent.go | 15 ++- torrent/torrent_commands.go | 4 + torrent/torrent_metadataextension.go | 11 +- torrent/torrent_start.go | 9 ++ 13 files changed, 259 insertions(+), 42 deletions(-) diff --git a/internal/resumer/boltdbresumer/boltdbresumer.go b/internal/resumer/boltdbresumer/boltdbresumer.go index 689afb93..27698d45 100644 --- a/internal/resumer/boltdbresumer/boltdbresumer.go +++ b/internal/resumer/boltdbresumer/boltdbresumer.go @@ -13,39 +13,43 @@ import ( // Keys for the persisten storage. var Keys = struct { - InfoHash []byte - Port []byte - Name []byte - Trackers []byte - URLList []byte - FixedPeers []byte - Dest []byte - Info []byte - Bitfield []byte - AddedAt []byte - BytesDownloaded []byte - BytesUploaded []byte - BytesWasted []byte - SeededFor []byte - Started []byte - CompleteCmdRun []byte + InfoHash []byte + Port []byte + Name []byte + Trackers []byte + URLList []byte + FixedPeers []byte + Dest []byte + Info []byte + Bitfield []byte + AddedAt []byte + BytesDownloaded []byte + BytesUploaded []byte + BytesWasted []byte + SeededFor []byte + Started []byte + StopAfterDownload []byte + StopAfterMetadata []byte + CompleteCmdRun []byte }{ - InfoHash: []byte("info_hash"), - Port: []byte("port"), - Name: []byte("name"), - Trackers: []byte("trackers"), - URLList: []byte("url_list"), - FixedPeers: []byte("fixed_peers"), - Dest: []byte("dest"), - Info: []byte("info"), - Bitfield: []byte("bitfield"), - AddedAt: []byte("added_at"), - BytesDownloaded: []byte("bytes_downloaded"), - BytesUploaded: []byte("bytes_uploaded"), - BytesWasted: []byte("bytes_wasted"), - SeededFor: []byte("seeded_for"), - Started: []byte("started"), - CompleteCmdRun: []byte("complete_cmd_run"), + InfoHash: []byte("info_hash"), + Port: []byte("port"), + Name: []byte("name"), + Trackers: []byte("trackers"), + URLList: []byte("url_list"), + FixedPeers: []byte("fixed_peers"), + Dest: []byte("dest"), + Info: []byte("info"), + Bitfield: []byte("bitfield"), + AddedAt: []byte("added_at"), + BytesDownloaded: []byte("bytes_downloaded"), + BytesUploaded: []byte("bytes_uploaded"), + BytesWasted: []byte("bytes_wasted"), + SeededFor: []byte("seeded_for"), + Started: []byte("started"), + StopAfterDownload: []byte("stop_after_download"), + StopAfterMetadata: []byte("stop_after_metadata"), + CompleteCmdRun: []byte("complete_cmd_run"), } // Resumer contains methods for saving/loading resume information of a torrent to a BoltDB database. @@ -103,6 +107,8 @@ func (r *Resumer) Write(torrentID string, spec *Spec) error { _ = b.Put(Keys.BytesWasted, []byte(strconv.FormatInt(spec.BytesWasted, 10))) _ = b.Put(Keys.SeededFor, []byte(spec.SeededFor.String())) _ = b.Put(Keys.Started, []byte(strconv.FormatBool(spec.Started))) + _ = b.Put(Keys.StopAfterDownload, []byte(strconv.FormatBool(spec.StopAfterDownload))) + _ = b.Put(Keys.StopAfterMetadata, []byte(strconv.FormatBool(spec.StopAfterMetadata))) _ = b.Put(Keys.CompleteCmdRun, []byte(strconv.FormatBool(spec.CompleteCmdRun))) return nil }) @@ -141,6 +147,28 @@ func (r *Resumer) WriteStarted(torrentID string, value bool) error { }) } +// WriteStopAfterDownload writes the start status of a torrent. +func (r *Resumer) WriteStopAfterDownload(torrentID string, value bool) error { + return r.db.Update(func(tx *bbolt.Tx) error { + b := tx.Bucket(r.bucket).Bucket([]byte(torrentID)) + if b == nil { + return nil + } + return b.Put(Keys.StopAfterDownload, []byte(strconv.FormatBool(value))) + }) +} + +// WriteStopAfterMetadata writes the start status of a torrent. +func (r *Resumer) WriteStopAfterMetadata(torrentID string, value bool) error { + return r.db.Update(func(tx *bbolt.Tx) error { + b := tx.Bucket(r.bucket).Bucket([]byte(torrentID)) + if b == nil { + return nil + } + return b.Put(Keys.StopAfterMetadata, []byte(strconv.FormatBool(value))) + }) +} + // WriteCompleteCmdRun writes the start status of a torrent. func (r *Resumer) WriteCompleteCmdRun(torrentID string) error { return r.db.Update(func(tx *bbolt.Tx) error { @@ -289,6 +317,22 @@ func (r *Resumer) Read(torrentID string) (spec *Spec, err error) { } } + value = b.Get(Keys.StopAfterDownload) + if value != nil { + spec.StopAfterDownload, err = strconv.ParseBool(string(value)) + if err != nil { + return err + } + } + + value = b.Get(Keys.StopAfterMetadata) + if value != nil { + spec.StopAfterMetadata, err = strconv.ParseBool(string(value)) + if err != nil { + return err + } + } + value = b.Get(Keys.CompleteCmdRun) if value != nil { spec.CompleteCmdRun, err = strconv.ParseBool(string(value)) diff --git a/internal/resumer/boltdbresumer/spec.go b/internal/resumer/boltdbresumer/spec.go index 77cd224c..2c26da8e 100644 --- a/internal/resumer/boltdbresumer/spec.go +++ b/internal/resumer/boltdbresumer/spec.go @@ -23,6 +23,7 @@ type Spec struct { SeededFor time.Duration Started bool StopAfterDownload bool + StopAfterMetadata bool CompleteCmdRun bool } @@ -38,6 +39,7 @@ type jsonSpec struct { BytesWasted int64 Started bool StopAfterDownload bool + StopAfterMetadata bool CompleteCmdRun bool // JSON unsafe types @@ -61,6 +63,7 @@ func (s Spec) MarshalJSON() ([]byte, error) { BytesWasted: s.BytesWasted, Started: s.Started, StopAfterDownload: s.StopAfterDownload, + StopAfterMetadata: s.StopAfterMetadata, CompleteCmdRun: s.CompleteCmdRun, InfoHash: base64.StdEncoding.EncodeToString(s.InfoHash), @@ -102,6 +105,7 @@ func (s *Spec) UnmarshalJSON(b []byte) error { s.BytesWasted = j.BytesWasted s.Started = j.Started s.StopAfterDownload = j.StopAfterDownload + s.StopAfterMetadata = j.StopAfterMetadata s.CompleteCmdRun = j.CompleteCmdRun return nil } diff --git a/internal/rpctypes/rpctypes.go b/internal/rpctypes/rpctypes.go index 375cf2e2..d10f3b08 100644 --- a/internal/rpctypes/rpctypes.go +++ b/internal/rpctypes/rpctypes.go @@ -176,6 +176,7 @@ type AddTorrentOptions struct { ID string Stopped bool StopAfterDownload bool + StopAfterMetadata bool } // AddTorrentRequest contains request arguments for Session.AddTorrent method. diff --git a/main.go b/main.go index 6fd1d59f..3005e608 100644 --- a/main.go +++ b/main.go @@ -104,6 +104,32 @@ func main() { }, Action: handleDownload, }, + { + Name: "magnet-to-torrent", + Usage: "download torrent from magnet link", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "config,c", + Usage: "read config from `FILE`", + Value: "~/rain/config.yaml", + }, + cli.StringFlag{ + Name: "magnet,m", + Usage: "magnet link", + Required: true, + }, + cli.StringFlag{ + Name: "output,o", + Usage: "output file", + }, + cli.DurationFlag{ + Name: "timeout,t", + Usage: "command fails if torrent cannot be downloaded after duration", + Value: time.Minute, + }, + }, + Action: handleMagnetToTorrent, + }, { Name: "server", Usage: "run rpc server and torrent client", @@ -160,6 +186,14 @@ func main() { Name: "stopped", Usage: "do not start torrent automatically", }, + cli.BoolFlag{ + Name: "stop-after-download", + Usage: "stop the torrent after download is finished", + }, + cli.BoolFlag{ + Name: "stop-after-metadata", + Usage: "stop the torrent after metadata download is finished", + }, cli.StringFlag{ Name: "id", Usage: "if id is not given, a unique id is automatically generated", @@ -733,6 +767,83 @@ func handleDownload(c *cli.Context) error { } } +func handleMagnetToTorrent(c *cli.Context) error { + arg := c.String("magnet") + output := c.String("output") + timeout := c.Duration("timeout") + cfg, err := prepareConfig(c) + if err != nil { + return err + } + dbFile, err := ioutil.TempFile("", "") + if err != nil { + return err + } + dbFileName := dbFile.Name() + defer os.Remove(dbFileName) + err = dbFile.Close() + if err != nil { + return err + } + cfg.Database = dbFileName + ses, err := torrent.NewSession(cfg) + if err != nil { + return err + } + defer ses.Close() + opt := &torrent.AddTorrentOptions{ + StopAfterMetadata: true, + } + t, err := ses.AddURI(arg, opt) + if err != nil { + return err + } + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) + timeoutC := time.After(timeout) + metadataC := t.NotifyMetadata() + for { + select { + case s := <-ch: + log.Noticef("received %s, stopping server", s) + err = t.Stop() + if err != nil { + return err + } + case <-time.After(timeout): + stats := t.Stats() + log.Infof("Status: %s, Peers: %d\n", stats.Status.String(), stats.Peers.Total) + case <-metadataC: + name := output + if name == "" { + name = t.Name() + ".torrent" + } + data, err := t.Torrent() + if err != nil { + return err + } + f, err := os.Create(name) + if err != nil { + return err + } + _, err = f.Write(data) + if err != nil { + return err + } + err = f.Close() + if err != nil { + return err + } + fmt.Println(name) + return nil + case <-timeoutC: + return fmt.Errorf("metadata cannot be downloaded in %s, try increasing timeout", timeout.String()) + case err = <-t.NotifyStop(): + return err + } + } +} + func handleBeforeClient(c *cli.Context) error { clt = rainrpc.NewClient(c.String("url")) clt.SetTimeout(c.Duration("timeout")) @@ -772,8 +883,10 @@ func handleAdd(c *cli.Context) error { var marshalErr error arg := c.String("torrent") addOpt := &rainrpc.AddTorrentOptions{ - Stopped: c.Bool("stopped"), - ID: c.String("id"), + Stopped: c.Bool("stopped"), + StopAfterDownload: c.Bool("stop-after-download"), + StopAfterMetadata: c.Bool("stop-after-metadata"), + ID: c.String("id"), } if isURI(arg) { resp, err := clt.AddURI(arg, addOpt) diff --git a/rainrpc/client.go b/rainrpc/client.go index 12a0b396..6820ec0f 100644 --- a/rainrpc/client.go +++ b/rainrpc/client.go @@ -62,6 +62,7 @@ type AddTorrentOptions struct { ID string Stopped bool StopAfterDownload bool + StopAfterMetadata bool } // AddTorrent adds a new torrent by reading .torrent file. @@ -75,6 +76,7 @@ func (c *Client) AddTorrent(f io.Reader, options *AddTorrentOptions) (*rpctypes. args.AddTorrentOptions.ID = options.ID args.AddTorrentOptions.Stopped = options.Stopped args.AddTorrentOptions.StopAfterDownload = options.StopAfterDownload + args.AddTorrentOptions.StopAfterMetadata = options.StopAfterMetadata } var reply rpctypes.AddTorrentResponse return &reply.Torrent, c.client.Call("Session.AddTorrent", args, &reply) @@ -87,6 +89,7 @@ func (c *Client) AddURI(uri string, options *AddTorrentOptions) (*rpctypes.Torre args.AddTorrentOptions.ID = options.ID args.AddTorrentOptions.Stopped = options.Stopped args.AddTorrentOptions.StopAfterDownload = options.StopAfterDownload + args.AddTorrentOptions.StopAfterMetadata = options.StopAfterMetadata } var reply rpctypes.AddURIResponse return &reply.Torrent, c.client.Call("Session.AddURI", args, &reply) diff --git a/torrent/session_add.go b/torrent/session_add.go index 4a13cef5..09c0ddb0 100644 --- a/torrent/session_add.go +++ b/torrent/session_add.go @@ -30,6 +30,8 @@ type AddTorrentOptions struct { Stopped bool // Stop torrent after all pieces are downloaded. StopAfterDownload bool + // Stop torrent after metadata is downloaded from magnet links. + StopAfterMetadata bool } // AddTorrent adds a new torrent to the session by reading .torrent metainfo from reader. @@ -89,6 +91,7 @@ func (s *Session) addTorrentStopped(r io.Reader, opt *AddTorrentOptions) (*Torre resumer.Stats{}, webseedsource.NewList(mi.URLList), opt.StopAfterDownload, + opt.StopAfterMetadata, false, // completeCmdRun ) if err != nil { @@ -109,6 +112,7 @@ func (s *Session) addTorrentStopped(r io.Reader, opt *AddTorrentOptions) (*Torre Info: mi.Info.Bytes, AddedAt: t.addedAt, StopAfterDownload: opt.StopAfterDownload, + StopAfterMetadata: opt.StopAfterMetadata, } err = s.resumer.Write(id, rspec) if err != nil { @@ -200,6 +204,7 @@ func (s *Session) addMagnet(link string, opt *AddTorrentOptions) (*Torrent, erro resumer.Stats{}, nil, // webseedSources opt.StopAfterDownload, + opt.StopAfterMetadata, false, // completeCmdRun ) if err != nil { @@ -219,6 +224,7 @@ func (s *Session) addMagnet(link string, opt *AddTorrentOptions) (*Torrent, erro FixedPeers: ma.Peers, AddedAt: t.addedAt, StopAfterDownload: opt.StopAfterDownload, + StopAfterMetadata: opt.StopAfterMetadata, } err = s.resumer.Write(id, rspec) if err != nil { diff --git a/torrent/session_load.go b/torrent/session_load.go index d70e30a1..3606496b 100644 --- a/torrent/session_load.go +++ b/torrent/session_load.go @@ -104,6 +104,7 @@ func (s *Session) loadExistingTorrent(id string) (tt *Torrent, hasStarted bool, }, webseedsource.NewList(spec.URLList), spec.StopAfterDownload, + spec.StopAfterMetadata, spec.CompleteCmdRun, ) if err != nil { @@ -168,6 +169,7 @@ func (s *Session) CompactDatabase(output string) error { Info: t.torrent.info.Bytes, AddedAt: t.torrent.addedAt, StopAfterDownload: t.torrent.stopAfterDownload, + StopAfterMetadata: t.torrent.stopAfterMetadata, } err = res.Write(t.torrent.id, spec) if err != nil { diff --git a/torrent/session_rpc_handler.go b/torrent/session_rpc_handler.go index 7f718116..ccaba245 100644 --- a/torrent/session_rpc_handler.go +++ b/torrent/session_rpc_handler.go @@ -42,8 +42,10 @@ func (h *rpcHandler) ListTorrents(args *rpctypes.ListTorrentsRequest, reply *rpc func (h *rpcHandler) AddTorrent(args *rpctypes.AddTorrentRequest, reply *rpctypes.AddTorrentResponse) error { r := base64.NewDecoder(base64.StdEncoding, strings.NewReader(args.Torrent)) opt := &AddTorrentOptions{ - Stopped: args.AddTorrentOptions.Stopped, - ID: args.AddTorrentOptions.ID, + Stopped: args.AddTorrentOptions.Stopped, + ID: args.AddTorrentOptions.ID, + StopAfterDownload: args.StopAfterDownload, + StopAfterMetadata: args.StopAfterMetadata, } t, err := h.session.AddTorrent(r, opt) var e *InputError @@ -59,8 +61,10 @@ func (h *rpcHandler) AddTorrent(args *rpctypes.AddTorrentRequest, reply *rpctype func (h *rpcHandler) AddURI(args *rpctypes.AddURIRequest, reply *rpctypes.AddURIResponse) error { opt := &AddTorrentOptions{ - Stopped: args.AddTorrentOptions.Stopped, - ID: args.AddTorrentOptions.ID, + Stopped: args.AddTorrentOptions.Stopped, + ID: args.AddTorrentOptions.ID, + StopAfterDownload: args.StopAfterDownload, + StopAfterMetadata: args.StopAfterMetadata, } t, err := h.session.AddURI(args.URI, opt) var e *InputError diff --git a/torrent/session_torrent.go b/torrent/session_torrent.go index 5f0dbc85..6e8fffea 100644 --- a/torrent/session_torrent.go +++ b/torrent/session_torrent.go @@ -100,12 +100,19 @@ func (t *Torrent) NotifyStop() <-chan error { } // NotifyComplete returns a channel for notifying completion. -// The channel is closed once all pieces are downloaded successfully. +// The channel is closed once all torrent pieces are downloaded successfully. // NotifyComplete must be called after calling Start(). func (t *Torrent) NotifyComplete() <-chan struct{} { return t.torrent.NotifyComplete() } +// NotifyMetadata returns a channel for notifying completion of metadata download from magnet links. +// The channel is closed once all metadata pieces are downloaded successfully. +// NotifyMetadata must be called after calling Start(). +func (t *Torrent) NotifyMetadata() <-chan struct{} { + return t.torrent.NotifyMetadata() +} + // AddPeer adds a new peer to the torrent. Does nothing if torrent is stopped. func (t *Torrent) AddPeer(addr string) error { return t.torrent.addPeerString(addr) diff --git a/torrent/torrent.go b/torrent/torrent.go index 4f64ca3a..8d0765a4 100644 --- a/torrent/torrent.go +++ b/torrent/torrent.go @@ -117,9 +117,12 @@ type torrent struct { pieceWriterResultC chan *piecewriter.PieceWriter - // This channel is closed once all pieces are downloaded and verified. + // This channel is closed once all torrent pieces are downloaded and verified. completeC chan struct{} + // This channel is closed once all metadata pieces are downloaded and verified. + completeMetadataC chan struct{} + // True after all pieces are download, verified and written to disk. completed bool @@ -247,15 +250,20 @@ type torrent struct { // Set to true when manual verification is requested doVerify bool - // If true, the torrent is stopped automatically when all pieces are downloaded. + // If true, the torrent is stopped automatically when all torrent pieces are downloaded. stopAfterDownload bool + // If true, the torrent is stopped automatically when all metadata pieces are downloaded. + stopAfterMetadata bool + // True means that completeCmd has run before. completeCmdRun bool log logger.Logger } +// newTorrent2 is a constructor for torrent struct. +// loadExistingTorrents, addTorrentStopped and addMagnet ultimately calls this method. func newTorrent2( s *Session, id string, @@ -271,6 +279,7 @@ func newTorrent2( stats resumer.Stats, // initial stats from previous run ws []*webseedsource.WebseedSource, stopAfterDownload bool, + stopAfterMetadata bool, completeCmdRun bool, ) (*torrent, error) { if len(infoHash) != 20 { @@ -306,6 +315,7 @@ func newTorrent2( infoDownloadersSnubbed: make(map[*peer.Peer]*infodownloader.InfoDownloader), pieceWriterResultC: make(chan *piecewriter.PieceWriter), completeC: make(chan struct{}), + completeMetadataC: make(chan struct{}), closeC: make(chan chan struct{}), startCommandC: make(chan struct{}), stopCommandC: make(chan struct{}), @@ -350,6 +360,7 @@ func newTorrent2( webseedRetryC: make(chan *webseedsource.WebseedSource), doneC: make(chan struct{}), stopAfterDownload: stopAfterDownload, + stopAfterMetadata: stopAfterMetadata, completeCmdRun: completeCmdRun, } if len(t.webseedSources) > s.config.WebseedMaxSources { diff --git a/torrent/torrent_commands.go b/torrent/torrent_commands.go index c6581f95..8be2d5d0 100644 --- a/torrent/torrent_commands.go +++ b/torrent/torrent_commands.go @@ -55,6 +55,10 @@ func (t *torrent) NotifyComplete() <-chan struct{} { return t.completeC } +func (t *torrent) NotifyMetadata() <-chan struct{} { + return t.completeMetadataC +} + type notifyErrorCommand struct { errCC chan chan error } diff --git a/torrent/torrent_metadataextension.go b/torrent/torrent_metadataextension.go index 07f9933a..4fa76b08 100644 --- a/torrent/torrent_metadataextension.go +++ b/torrent/torrent_metadataextension.go @@ -97,7 +97,16 @@ func (t *torrent) handleMetadataMessage(pe *peer.Peer, msg peerprotocol.Extensio t.stop(fmt.Errorf("cannot write resume info: %s", err)) break } - t.startAllocator() + select { + case <-t.completeMetadataC: + default: + close(t.completeMetadataC) + } + if t.stopAfterMetadata { + t.stop(nil) + } else { + t.startAllocator() + } case peerprotocol.ExtensionMetadataMessageTypeReject: id, ok := t.infoDownloaders[pe] if ok { diff --git a/torrent/torrent_start.go b/torrent/torrent_start.go index 9664d5c7..658d9df7 100644 --- a/torrent/torrent_start.go +++ b/torrent/torrent_start.go @@ -35,6 +35,15 @@ func (t *torrent) start() { t.downloadSpeed = metrics.NewMeter() t.uploadSpeed = metrics.NewMeter() + if t.info != nil && t.stopAfterMetadata { + t.stopAfterMetadata = false + _ = t.session.resumer.WriteStopAfterMetadata(t.id, false) + } + if t.completed && t.stopAfterDownload { + t.stopAfterDownload = false + _ = t.session.resumer.WriteStopAfterDownload(t.id, false) + } + if t.info != nil { if t.pieces != nil { if t.bitfield != nil {