diff --git a/libtorrent/src/download/delegator.cc b/libtorrent/src/download/delegator.cc index 711cd461..5d713380 100644 --- a/libtorrent/src/download/delegator.cc +++ b/libtorrent/src/download/delegator.cc @@ -52,192 +52,131 @@ namespace torrent { -struct DelegatorCheckAffinity { - DelegatorCheckAffinity(Delegator* delegator, Block** target, unsigned int index, const PeerInfo* peerInfo) : - m_delegator(delegator), m_target(target), m_index(index), m_peerInfo(peerInfo) {} - - bool operator () (BlockList* d) { - return m_index == d->index() && (*m_target = m_delegator->delegate_piece(d, m_peerInfo)) != NULL; +std::vector +Delegator::delegate(PeerChunks* peerChunks, uint32_t affinity, uint32_t maxPieces) { + // TODO: Make sure we don't queue the same piece several time on the same peer when + // it timeout cancels them. + std::vector new_transfers; + PeerInfo* peerInfo = peerChunks->peer_info(); + + // TODO: What if the hash failed? Don't want data from that peer again. + if (affinity >= 0) { + for (BlockList* itr : m_transfers) { + if (new_transfers.size() >= maxPieces) + return new_transfers; + if (affinity == itr->index()) + delegate_from_blocklist(new_transfers, maxPieces, itr, peerInfo); + } } - - Delegator* m_delegator; - Block** m_target; - unsigned int m_index; - const PeerInfo* m_peerInfo; -}; - -struct DelegatorCheckSeeder { - DelegatorCheckSeeder(Delegator* delegator, Block** target, const PeerInfo* peerInfo) : - m_delegator(delegator), m_target(target), m_peerInfo(peerInfo) {} - - bool operator () (BlockList* d) { - return d->by_seeder() && (*m_target = m_delegator->delegate_piece(d, m_peerInfo)) != NULL; + + // Prioritize full seeders + if (peerChunks->is_seeder()) { + for (BlockList* itr : m_transfers) { + if (new_transfers.size() >= maxPieces) + return new_transfers; + if (itr->by_seeder()) + delegate_from_blocklist(new_transfers, maxPieces, itr, peerInfo); + } + // Create new high priority pieces. + delegate_new_chunks(new_transfers, maxPieces, peerChunks, true); + // Create new normal priority pieces. + delegate_new_chunks(new_transfers, maxPieces, peerChunks, false); } - - Delegator* m_delegator; - Block** m_target; - const PeerInfo* m_peerInfo; -}; - -struct DelegatorCheckPriority { - DelegatorCheckPriority(Delegator* delegator, Block** target, priority_t p, const PeerChunks* peerChunks) : - m_delegator(delegator), m_target(target), m_priority(p), m_peerChunks(peerChunks) {} - - bool operator () (BlockList* d) { - return - m_priority == d->priority() && - m_peerChunks->bitfield()->get(d->index()) && - (*m_target = m_delegator->delegate_piece(d, m_peerChunks->peer_info())) != NULL; + if (new_transfers.size() >= maxPieces) + return new_transfers; + + // Find existing high priority pieces. + for (BlockList* itr : m_transfers) { + if (new_transfers.size() >= maxPieces) + return new_transfers; + if (itr->priority() == PRIORITY_HIGH && peerChunks->bitfield()->get(itr->index())) + delegate_from_blocklist(new_transfers, maxPieces, itr, peerInfo); } - Delegator* m_delegator; - Block** m_target; - priority_t m_priority; - const PeerChunks* m_peerChunks; -}; - -// TODO: Should this ensure we don't download pieces that are priority off? -struct DelegatorCheckAggressive { - DelegatorCheckAggressive(Delegator* delegator, Block** target, uint16_t* o, const PeerChunks* peerChunks) : - m_delegator(delegator), m_target(target), m_overlapp(o), m_peerChunks(peerChunks) {} - - bool operator () (BlockList* d) { - Block* tmp; - - if (!m_peerChunks->bitfield()->get(d->index()) || - d->priority() == PRIORITY_OFF || - (tmp = m_delegator->delegate_aggressive(d, m_overlapp, m_peerChunks->peer_info())) == NULL) - return false; + // Create new high priority pieces. + delegate_new_chunks(new_transfers, maxPieces, peerChunks, true); - *m_target = tmp; - return m_overlapp == 0; + // Find existing normal priority pieces. + for (BlockList* itr : m_transfers) { + if (new_transfers.size() >= maxPieces) + return new_transfers; + if (itr->priority() == PRIORITY_NORMAL && peerChunks->bitfield()->get(itr->index())) + delegate_from_blocklist(new_transfers, maxPieces, itr, peerInfo); } - Delegator* m_delegator; - Block** m_target; - uint16_t* m_overlapp; - const PeerChunks* m_peerChunks; -}; - -BlockTransfer* -Delegator::delegate(PeerChunks* peerChunks, int affinity) { - // TODO: Make sure we don't queue the same piece several time on the same peer when - // it timeout cancels them. - Block* target = NULL; - - // Find piece with same index as affinity. This affinity should ensure that we - // never start another piece while the chunk this peer used to download is still - // in progress. - // - // TODO: What if the hash failed? Don't want data from that peer again. - if (affinity >= 0 && - std::find_if(m_transfers.begin(), m_transfers.end(), DelegatorCheckAffinity(this, &target, affinity, peerChunks->peer_info())) - != m_transfers.end()) - return target->insert(peerChunks->peer_info()); - - if (peerChunks->is_seeder() && (target = delegate_seeder(peerChunks)) != NULL) - return target->insert(peerChunks->peer_info()); - - // High priority pieces. - if (std::find_if(m_transfers.begin(), m_transfers.end(), DelegatorCheckPriority(this, &target, PRIORITY_HIGH, peerChunks)) - != m_transfers.end()) - return target->insert(peerChunks->peer_info()); - - // Find normal priority pieces. - if ((target = new_chunk(peerChunks, true))) - return target->insert(peerChunks->peer_info()); - - // Normal priority pieces. - if (std::find_if(m_transfers.begin(), m_transfers.end(), DelegatorCheckPriority(this, &target, PRIORITY_NORMAL, peerChunks)) - != m_transfers.end()) - return target->insert(peerChunks->peer_info()); - - if ((target = new_chunk(peerChunks, false))) - return target->insert(peerChunks->peer_info()); + // Create new normal priority pieces. + delegate_new_chunks(new_transfers, maxPieces, peerChunks, false); if (!m_aggressive) - return NULL; + return new_transfers; - // Aggressive mode, look for possible downloads that already have + // In aggressive mode, look for possible downloads that already have // one or more queued. // No more than 4 per piece. uint16_t overlapped = 5; - std::find_if(m_transfers.begin(), m_transfers.end(), DelegatorCheckAggressive(this, &target, &overlapped, peerChunks)); - - return target ? target->insert(peerChunks->peer_info()) : NULL; -} - -Block* -Delegator::delegate_seeder(PeerChunks* peerChunks) { - Block* target = NULL; - - if (std::find_if(m_transfers.begin(), m_transfers.end(), DelegatorCheckSeeder(this, &target, peerChunks->peer_info())) - != m_transfers.end()) - return target; - - if ((target = new_chunk(peerChunks, true))) - return target; - - if ((target = new_chunk(peerChunks, false))) - return target; - - return NULL; + for (BlockList* itr : m_transfers) { + if (new_transfers.size() >= maxPieces) + return new_transfers; + if (peerChunks->bitfield()->get(itr->index()) && itr->priority() != PRIORITY_OFF) { + for (auto bl_itr = itr->begin(); bl_itr != itr->end() && overlapped != 0; bl_itr++) { + if (new_transfers.size() >= maxPieces || bl_itr->size_not_stalled() >= overlapped) + break; + if (!bl_itr->is_finished()) { + BlockTransfer* inserted_info = bl_itr->insert(peerInfo); + if (inserted_info != NULL) { + new_transfers.push_back(inserted_info); + overlapped = bl_itr->size_not_stalled(); + } + } + } + } + } + return new_transfers; } -Block* -Delegator::new_chunk(PeerChunks* pc, bool highPriority) { - uint32_t index = m_slot_chunk_find(pc, highPriority); +void +Delegator::delegate_new_chunks(std::vector &transfers, uint32_t maxPieces, PeerChunks* pc, bool highPriority) { + // Find new chunks and if successful, add all possible pieces into `transfers` + while (transfers.size() < maxPieces) { + uint32_t index = m_slot_chunk_find(pc, highPriority); - if (index == ~(uint32_t)0) - return NULL; + if (index == ~(uint32_t)0) + return; - TransferList::iterator itr = m_transfers.insert(Piece(index, 0, m_slot_chunk_size(index)), block_size); + TransferList::iterator itr = m_transfers.insert(Piece(index, 0, m_slot_chunk_size(index)), block_size); - (*itr)->set_by_seeder(pc->is_seeder()); + (*itr)->set_by_seeder(pc->is_seeder()); - if (highPriority) - (*itr)->set_priority(PRIORITY_HIGH); - else - (*itr)->set_priority(PRIORITY_NORMAL); + if (highPriority) + (*itr)->set_priority(PRIORITY_HIGH); + else + (*itr)->set_priority(PRIORITY_NORMAL); - return &*(*itr)->begin(); + delegate_from_blocklist(transfers, maxPieces, *itr, pc->peer_info()); + } } -Block* -Delegator::delegate_piece(BlockList* c, const PeerInfo* peerInfo) { - Block* p = NULL; - - for (BlockList::iterator i = c->begin(); i != c->end(); ++i) { - if (i->is_finished() || !i->is_stalled()) - continue; - - if (i->size_all() == 0) { - // No one is downloading this, assign. - return &*i; - - } else if (p == NULL && i->find(peerInfo) == NULL) { - // Stalled but we really want to finish this piece. Check 'p' so - // that we don't end up queuing the pieces in reverse. - p = &*i; +void +Delegator::delegate_from_blocklist(std::vector &transfers, uint32_t maxPieces, BlockList* c, PeerInfo* peerInfo) { + for (auto i = c->begin(); i != c->end() && transfers.size() < maxPieces; ++i) { + // If not finished and stalled, and no one is downloading this, then assign + if (!i->is_finished() && i->is_stalled() && i->size_all() == 0) + transfers.push_back(i->insert(peerInfo)); + } + if (transfers.size() >= maxPieces) + return; + + // Fill any remaining slots with potentially stalled pieces. + for (auto i = c->begin(); i != c->end() && transfers.size() < maxPieces; ++i) { + if (!i->is_finished() && i->is_stalled()) { + BlockTransfer* inserted_info = i->insert(peerInfo); + if (inserted_info != NULL) + transfers.push_back(inserted_info); } } - - return p; } -Block* -Delegator::delegate_aggressive(BlockList* c, uint16_t* overlapped, const PeerInfo* peerInfo) { - Block* p = NULL; - - for (BlockList::iterator i = c->begin(); i != c->end() && *overlapped != 0; ++i) - if (!i->is_finished() && i->size_not_stalled() < *overlapped && i->find(peerInfo) == NULL) { - p = &*i; - *overlapped = i->size_not_stalled(); - } - - return p; -} } // namespace torrent diff --git a/libtorrent/src/download/delegator.h b/libtorrent/src/download/delegator.h index 2e36fc96..340126e0 100644 --- a/libtorrent/src/download/delegator.h +++ b/libtorrent/src/download/delegator.h @@ -64,7 +64,7 @@ class Delegator { TransferList* transfer_list() { return &m_transfers; } const TransferList* transfer_list() const { return &m_transfers; } - BlockTransfer* delegate(PeerChunks* peerChunks, int affinity); + std::vector delegate(PeerChunks* peerChunks, uint32_t affinity, uint32_t maxPieces); bool get_aggressive() { return m_aggressive; } void set_aggressive(bool a) { m_aggressive = a; } @@ -72,16 +72,9 @@ class Delegator { slot_peer_chunk& slot_chunk_find() { return m_slot_chunk_find; } slot_size& slot_chunk_size() { return m_slot_chunk_size; } - // Don't call this from the outside. - Block* delegate_piece(BlockList* c, const PeerInfo* peerInfo); - Block* delegate_aggressive(BlockList* c, uint16_t* overlapped, const PeerInfo* peerInfo); - private: - // Start on a new chunk, returns .end() if none possible. bf is - // remote peer's bitfield. - Block* new_chunk(PeerChunks* pc, bool highPriority); - - Block* delegate_seeder(PeerChunks* peerChunks); + void delegate_from_blocklist(std::vector &transfers, uint32_t maxPieces, BlockList* c, PeerInfo* peerInfo); + void delegate_new_chunks(std::vector &transfers, uint32_t maxPieces, PeerChunks* pc, bool highPriority); TransferList m_transfers; diff --git a/libtorrent/src/protocol/peer_connection_base.cc b/libtorrent/src/protocol/peer_connection_base.cc index c02998fb..9ade6b97 100644 --- a/libtorrent/src/protocol/peer_connection_base.cc +++ b/libtorrent/src/protocol/peer_connection_base.cc @@ -970,24 +970,26 @@ PeerConnectionBase::try_request_pieces() { while (request_list()->queued_size() < pipeSize && m_up->can_write_request()) { - // Delegator should return a vector of pieces, and it should be - // passed the number of pieces it should delegate. Try to ensure - // it receives large enough request to fill a whole chunk if the - // peer is fast enough. + // It should get the right number the first time around, but loop just to be sure + int maxRequests = m_up->max_write_request(); + int maxQueued = pipeSize - request_list()->queued_size(); + int maxPieces = std::max(std::min(maxRequests, maxQueued), 1); - const Piece* p = request_list()->delegate(); - - if (p == NULL) - break; - - if (!m_download->file_list()->is_valid_piece(*p) || !m_peerChunks.bitfield()->get(p->index())) - throw internal_error("PeerConnectionBase::try_request_pieces() tried to use an invalid piece."); + std::vector pieces = request_list()->delegate(maxPieces); + if (pieces.empty()) { + return false; + } + + for (auto& p : pieces) { + if (!m_download->file_list()->is_valid_piece(*p) || !m_peerChunks.bitfield()->get(p->index())) + throw internal_error("PeerConnectionBase::try_request_pieces() tried to use an invalid piece."); - m_up->write_request(*p); + m_up->write_request(*p); - LT_LOG_PIECE_EVENTS("(down) requesting %" PRIu32 " %" PRIu32 " %" PRIu32, - p->index(), p->offset(), p->length()); - success = true; + LT_LOG_PIECE_EVENTS("(down) requesting %" PRIu32 " %" PRIu32 " %" PRIu32, + p->index(), p->offset(), p->length()); + success = true; + } } return success; diff --git a/libtorrent/src/protocol/peer_connection_metadata.cc b/libtorrent/src/protocol/peer_connection_metadata.cc index e68d0eff..760b48c5 100644 --- a/libtorrent/src/protocol/peer_connection_metadata.cc +++ b/libtorrent/src/protocol/peer_connection_metadata.cc @@ -436,11 +436,13 @@ PeerConnectionMetadata::try_request_metadata_pieces() { if (!m_up->can_write_extension() || m_extensions->has_pending_message()) return false; - const Piece* p = request_list()->delegate(); + std::vector pieces = request_list()->delegate(1); - if (p == NULL) + if (pieces.empty()) return false; + const Piece* p = pieces.front(); + if (!m_download->file_list()->is_valid_piece(*p) || !m_peerChunks.bitfield()->get(p->index())) throw internal_error("PeerConnectionMetadata::try_request_metadata_pieces() tried to use an invalid piece."); diff --git a/libtorrent/src/protocol/protocol_base.h b/libtorrent/src/protocol/protocol_base.h index 5022abff..cf95f320 100644 --- a/libtorrent/src/protocol/protocol_base.h +++ b/libtorrent/src/protocol/protocol_base.h @@ -143,6 +143,8 @@ class ProtocolBase { bool can_write_piece() const { return m_buffer.reserved_left() >= sizeof_piece; } bool can_write_port() const { return m_buffer.reserved_left() >= sizeof_port; } bool can_write_extension() const { return m_buffer.reserved_left() >= sizeof_extension; } + + size_type max_write_request() const { return m_buffer.reserved_left() / sizeof_request; } bool can_read_have_body() const { return m_buffer.remaining() >= sizeof_have_body; } bool can_read_request_body() const { return m_buffer.remaining() >= sizeof_request_body; } diff --git a/libtorrent/src/protocol/request_list.cc b/libtorrent/src/protocol/request_list.cc index 0f403a31..ea81d1ef 100644 --- a/libtorrent/src/protocol/request_list.cc +++ b/libtorrent/src/protocol/request_list.cc @@ -124,19 +124,25 @@ RequestList::~RequestList() { priority_queue_erase(&taskScheduler, &m_delay_process_unordered); } -const Piece* -RequestList::delegate() { - BlockTransfer* transfer = m_delegator->delegate(m_peerChunks, m_affinity); +std::vector +RequestList::delegate(uint32_t maxPieces) { + std::vector transfers = m_delegator->delegate(m_peerChunks, m_affinity, maxPieces); + + std::vector pieces; + if (transfers.empty()) + return pieces; instrumentation_update(INSTRUMENTATION_TRANSFER_REQUESTS_DELEGATED, 1); - if (transfer == NULL) - return NULL; + for (auto& itr : transfers) { + m_queues.push_back(bucket_queued, itr); + pieces.push_back(&(itr->piece())); + } - m_affinity = transfer->index(); - m_queues.push_back(bucket_queued, transfer); + // Use the last index returned for the next affinity + m_affinity = transfers.back()->index(); - return &transfer->piece(); + return pieces; } void diff --git a/libtorrent/src/protocol/request_list.h b/libtorrent/src/protocol/request_list.h index 9e5c6a8d..dab1bdf2 100644 --- a/libtorrent/src/protocol/request_list.h +++ b/libtorrent/src/protocol/request_list.h @@ -38,6 +38,7 @@ #define LIBTORRENT_REQUEST_LIST_H #include +#include #include "torrent/data/block_transfer.h" #include "utils/instrumentation.h" @@ -80,7 +81,7 @@ class RequestList { // Some parameters here, like how fast we are downloading and stuff // when we start considering those. - const Piece* delegate(); + std::vector delegate(uint32_t maxPieces); void stall_initial(); void stall_prolonged(); diff --git a/libtorrent/src/torrent/data/block.cc b/libtorrent/src/torrent/data/block.cc index dab8da4f..7caaf33b 100644 --- a/libtorrent/src/torrent/data/block.cc +++ b/libtorrent/src/torrent/data/block.cc @@ -80,7 +80,7 @@ Block::~Block() { BlockTransfer* Block::insert(PeerInfo* peerInfo) { if (find_queued(peerInfo) || find_transfer(peerInfo)) - throw internal_error("Block::insert(...) find_queued(peerInfo) || find_transfer(peerInfo)."); + return NULL; m_notStalled++; diff --git a/libtorrent/src/torrent/data/block.h b/libtorrent/src/torrent/data/block.h index 287183dd..9e429126 100644 --- a/libtorrent/src/torrent/data/block.h +++ b/libtorrent/src/torrent/data/block.h @@ -125,11 +125,13 @@ class LIBTORRENT_EXPORT Block { // will just delete the object. Made static so it can be called when // block == NULL. static void release(BlockTransfer* transfer); + + // Only allow move constructions + Block(const Block&) = delete; + void operator = (const Block&) = delete; + Block(Block&&) = default; private: - Block(const Block&); - void operator = (const Block&); - void invalidate_transfer(BlockTransfer* transfer) LIBTORRENT_NO_EXPORT; void remove_erased_transfers() LIBTORRENT_NO_EXPORT; diff --git a/libtorrent/src/torrent/data/block_list.h b/libtorrent/src/torrent/data/block_list.h index 712fd056..a9a10981 100644 --- a/libtorrent/src/torrent/data/block_list.h +++ b/libtorrent/src/torrent/data/block_list.h @@ -44,64 +44,22 @@ namespace torrent { -// Temporary workaround until we can use C++11's std::vector::emblace_back. -template -class no_copy_vector { +class LIBTORRENT_EXPORT BlockList : private std::vector { public: - typedef Type value_type; - typedef size_t size_type; - typedef value_type& reference; - typedef ptrdiff_t difference_type; - - typedef value_type* iterator; - typedef const value_type* const_iterator; - - no_copy_vector() : m_size(0), m_values(NULL) {} - ~no_copy_vector() { clear(); } - - size_type size() const { return m_size; } - bool empty() const { return m_size == 0; } - - void resize(size_type s) { clear(); m_size = s; m_values = new value_type[s]; } - - void clear() { delete [] m_values; m_values = NULL; m_size = 0; } - - iterator begin() { return m_values; } - const_iterator begin() const { return m_values; } - - iterator end() { return m_values + m_size; } - const_iterator end() const { return m_values + m_size; } - - value_type& back() { return *(m_values + m_size - 1); } - - value_type& operator[](size_type idx) { return m_values[idx]; } - -private: - no_copy_vector(const no_copy_vector&); - void operator = (const no_copy_vector&); - - size_type m_size; - Block* m_values; -}; - -class LIBTORRENT_EXPORT BlockList : public no_copy_vector { -public: - typedef no_copy_vector base_type; - typedef uint32_t size_type; + typedef uint32_t size_type; + typedef std::vector base_type; using base_type::value_type; using base_type::reference; using base_type::difference_type; using base_type::iterator; - // using base_type::reverse_iterator; + using base_type::const_iterator; using base_type::size; using base_type::empty; using base_type::begin; using base_type::end; - // using base_type::rbegin; - // using base_type::rend; using base_type::operator[];