Merge pull request #2 from stickz/develop
Promote fast download delegator to stable
stickz authored Apr 15, 2024
2 parents 42db34e + 7f0c332 commit 7c8213b
Showing 10 changed files with 150 additions and 245 deletions.
257 changes: 98 additions & 159 deletions libtorrent/src/download/delegator.cc
@@ -52,192 +52,131 @@

 namespace torrent {

-struct DelegatorCheckAffinity {
-  DelegatorCheckAffinity(Delegator* delegator, Block** target, unsigned int index, const PeerInfo* peerInfo) :
-    m_delegator(delegator), m_target(target), m_index(index), m_peerInfo(peerInfo) {}
-
-  bool operator () (BlockList* d) {
-    return m_index == d->index() && (*m_target = m_delegator->delegate_piece(d, m_peerInfo)) != NULL;
-  }
-
-  Delegator* m_delegator;
-  Block** m_target;
-  unsigned int m_index;
-  const PeerInfo* m_peerInfo;
-};
-
-struct DelegatorCheckSeeder {
-  DelegatorCheckSeeder(Delegator* delegator, Block** target, const PeerInfo* peerInfo) :
-    m_delegator(delegator), m_target(target), m_peerInfo(peerInfo) {}
-
-  bool operator () (BlockList* d) {
-    return d->by_seeder() && (*m_target = m_delegator->delegate_piece(d, m_peerInfo)) != NULL;
-  }
-
-  Delegator* m_delegator;
-  Block** m_target;
-  const PeerInfo* m_peerInfo;
-};
-
-struct DelegatorCheckPriority {
-  DelegatorCheckPriority(Delegator* delegator, Block** target, priority_t p, const PeerChunks* peerChunks) :
-    m_delegator(delegator), m_target(target), m_priority(p), m_peerChunks(peerChunks) {}
-
-  bool operator () (BlockList* d) {
-    return
-      m_priority == d->priority() &&
-      m_peerChunks->bitfield()->get(d->index()) &&
-      (*m_target = m_delegator->delegate_piece(d, m_peerChunks->peer_info())) != NULL;
-  }
-
-  Delegator* m_delegator;
-  Block** m_target;
-  priority_t m_priority;
-  const PeerChunks* m_peerChunks;
-};
-
-// TODO: Should this ensure we don't download pieces that are priority off?
-struct DelegatorCheckAggressive {
-  DelegatorCheckAggressive(Delegator* delegator, Block** target, uint16_t* o, const PeerChunks* peerChunks) :
-    m_delegator(delegator), m_target(target), m_overlapp(o), m_peerChunks(peerChunks) {}
-
-  bool operator () (BlockList* d) {
-    Block* tmp;
-
-    if (!m_peerChunks->bitfield()->get(d->index()) ||
-        d->priority() == PRIORITY_OFF ||
-        (tmp = m_delegator->delegate_aggressive(d, m_overlapp, m_peerChunks->peer_info())) == NULL)
-      return false;
-
-    *m_target = tmp;
-    return m_overlapp == 0;
-  }
-
-  Delegator* m_delegator;
-  Block** m_target;
-  uint16_t* m_overlapp;
-  const PeerChunks* m_peerChunks;
-};
-
-BlockTransfer*
-Delegator::delegate(PeerChunks* peerChunks, int affinity) {
+std::vector<BlockTransfer*>
+Delegator::delegate(PeerChunks* peerChunks, uint32_t affinity, uint32_t maxPieces) {
   // TODO: Make sure we don't queue the same piece several time on the same peer when
   // it timeout cancels them.
-  Block* target = NULL;
+  std::vector<BlockTransfer*> new_transfers;
+  PeerInfo* peerInfo = peerChunks->peer_info();

-  // Find piece with same index as affinity. This affinity should ensure that we
-  // never start another piece while the chunk this peer used to download is still
-  // in progress.
-  //
   // TODO: What if the hash failed? Don't want data from that peer again.
-  if (affinity >= 0 &&
-      std::find_if(m_transfers.begin(), m_transfers.end(), DelegatorCheckAffinity(this, &target, affinity, peerChunks->peer_info()))
-      != m_transfers.end())
-    return target->insert(peerChunks->peer_info());
+  if (affinity >= 0) {
+    for (BlockList* itr : m_transfers) {
+      if (new_transfers.size() >= maxPieces)
+        return new_transfers;
+      if (affinity == itr->index())
+        delegate_from_blocklist(new_transfers, maxPieces, itr, peerInfo);
+    }
+  }

-  if (peerChunks->is_seeder() && (target = delegate_seeder(peerChunks)) != NULL)
-    return target->insert(peerChunks->peer_info());
+  // Prioritize full seeders
+  if (peerChunks->is_seeder()) {
+    for (BlockList* itr : m_transfers) {
+      if (new_transfers.size() >= maxPieces)
+        return new_transfers;
+      if (itr->by_seeder())
+        delegate_from_blocklist(new_transfers, maxPieces, itr, peerInfo);
+    }
+    // Create new high priority pieces.
+    delegate_new_chunks(new_transfers, maxPieces, peerChunks, true);
+    // Create new normal priority pieces.
+    delegate_new_chunks(new_transfers, maxPieces, peerChunks, false);
+  }

-  // High priority pieces.
-  if (std::find_if(m_transfers.begin(), m_transfers.end(), DelegatorCheckPriority(this, &target, PRIORITY_HIGH, peerChunks))
-      != m_transfers.end())
-    return target->insert(peerChunks->peer_info());
+  if (new_transfers.size() >= maxPieces)
+    return new_transfers;

-  // Find normal priority pieces.
-  if ((target = new_chunk(peerChunks, true)))
-    return target->insert(peerChunks->peer_info());
+  // Find existing high priority pieces.
+  for (BlockList* itr : m_transfers) {
+    if (new_transfers.size() >= maxPieces)
+      return new_transfers;
+    if (itr->priority() == PRIORITY_HIGH && peerChunks->bitfield()->get(itr->index()))
+      delegate_from_blocklist(new_transfers, maxPieces, itr, peerInfo);
+  }

-  // Normal priority pieces.
-  if (std::find_if(m_transfers.begin(), m_transfers.end(), DelegatorCheckPriority(this, &target, PRIORITY_NORMAL, peerChunks))
-      != m_transfers.end())
-    return target->insert(peerChunks->peer_info());
+  // Create new high priority pieces.
+  delegate_new_chunks(new_transfers, maxPieces, peerChunks, true);

-  if ((target = new_chunk(peerChunks, false)))
-    return target->insert(peerChunks->peer_info());
+  // Find existing normal priority pieces.
+  for (BlockList* itr : m_transfers) {
+    if (new_transfers.size() >= maxPieces)
+      return new_transfers;
+    if (itr->priority() == PRIORITY_NORMAL && peerChunks->bitfield()->get(itr->index()))
+      delegate_from_blocklist(new_transfers, maxPieces, itr, peerInfo);
+  }
+
+  // Create new normal priority pieces.
+  delegate_new_chunks(new_transfers, maxPieces, peerChunks, false);

   if (!m_aggressive)
-    return NULL;
+    return new_transfers;

-  // Aggressive mode, look for possible downloads that already have
+  // In aggressive mode, look for possible downloads that already have
   // one or more queued.

   // No more than 4 per piece.
   uint16_t overlapped = 5;

-  std::find_if(m_transfers.begin(), m_transfers.end(), DelegatorCheckAggressive(this, &target, &overlapped, peerChunks));
-
-  return target ? target->insert(peerChunks->peer_info()) : NULL;
-}
-
-Block*
-Delegator::delegate_seeder(PeerChunks* peerChunks) {
-  Block* target = NULL;
-
-  if (std::find_if(m_transfers.begin(), m_transfers.end(), DelegatorCheckSeeder(this, &target, peerChunks->peer_info()))
-      != m_transfers.end())
-    return target;
-
-  if ((target = new_chunk(peerChunks, true)))
-    return target;
-
-  if ((target = new_chunk(peerChunks, false)))
-    return target;
-
-  return NULL;
-}
+  for (BlockList* itr : m_transfers) {
+    if (new_transfers.size() >= maxPieces)
+      return new_transfers;
+    if (peerChunks->bitfield()->get(itr->index()) && itr->priority() != PRIORITY_OFF) {
+      for (auto bl_itr = itr->begin(); bl_itr != itr->end() && overlapped != 0; bl_itr++) {
+        if (new_transfers.size() >= maxPieces || bl_itr->size_not_stalled() >= overlapped)
+          break;
+        if (!bl_itr->is_finished()) {
+          BlockTransfer* inserted_info = bl_itr->insert(peerInfo);
+          if (inserted_info != NULL) {
+            new_transfers.push_back(inserted_info);
+            overlapped = bl_itr->size_not_stalled();
+          }
+        }
+      }
+    }
+  }
+
+  return new_transfers;
+}

-Block*
-Delegator::new_chunk(PeerChunks* pc, bool highPriority) {
-  uint32_t index = m_slot_chunk_find(pc, highPriority);
+void
+Delegator::delegate_new_chunks(std::vector<BlockTransfer*> &transfers, uint32_t maxPieces, PeerChunks* pc, bool highPriority) {
+  // Find new chunks and if successful, add all possible pieces into `transfers`
+  while (transfers.size() < maxPieces) {
+    uint32_t index = m_slot_chunk_find(pc, highPriority);

-  if (index == ~(uint32_t)0)
-    return NULL;
+    if (index == ~(uint32_t)0)
+      return;

-  TransferList::iterator itr = m_transfers.insert(Piece(index, 0, m_slot_chunk_size(index)), block_size);
+    TransferList::iterator itr = m_transfers.insert(Piece(index, 0, m_slot_chunk_size(index)), block_size);

-  (*itr)->set_by_seeder(pc->is_seeder());
+    (*itr)->set_by_seeder(pc->is_seeder());

-  if (highPriority)
-    (*itr)->set_priority(PRIORITY_HIGH);
-  else
-    (*itr)->set_priority(PRIORITY_NORMAL);
+    if (highPriority)
+      (*itr)->set_priority(PRIORITY_HIGH);
+    else
+      (*itr)->set_priority(PRIORITY_NORMAL);

-  return &*(*itr)->begin();
+    delegate_from_blocklist(transfers, maxPieces, *itr, pc->peer_info());
+  }
 }

-Block*
-Delegator::delegate_piece(BlockList* c, const PeerInfo* peerInfo) {
-  Block* p = NULL;
-
-  for (BlockList::iterator i = c->begin(); i != c->end(); ++i) {
-    if (i->is_finished() || !i->is_stalled())
-      continue;
-
-    if (i->size_all() == 0) {
-      // No one is downloading this, assign.
-      return &*i;
-
-    } else if (p == NULL && i->find(peerInfo) == NULL) {
-      // Stalled but we really want to finish this piece. Check 'p' so
-      // that we don't end up queuing the pieces in reverse.
-      p = &*i;
-    }
-  }
-
-  return p;
-}
-
-Block*
-Delegator::delegate_aggressive(BlockList* c, uint16_t* overlapped, const PeerInfo* peerInfo) {
-  Block* p = NULL;
-
-  for (BlockList::iterator i = c->begin(); i != c->end() && *overlapped != 0; ++i)
-    if (!i->is_finished() && i->size_not_stalled() < *overlapped && i->find(peerInfo) == NULL) {
-      p = &*i;
-      *overlapped = i->size_not_stalled();
-    }
-
-  return p;
-}
+void
+Delegator::delegate_from_blocklist(std::vector<BlockTransfer*> &transfers, uint32_t maxPieces, BlockList* c, PeerInfo* peerInfo) {
+  for (auto i = c->begin(); i != c->end() && transfers.size() < maxPieces; ++i) {
+    // If not finished and stalled, and no one is downloading this, then assign
+    if (!i->is_finished() && i->is_stalled() && i->size_all() == 0)
+      transfers.push_back(i->insert(peerInfo));
+  }
+
+  if (transfers.size() >= maxPieces)
+    return;
+
+  // Fill any remaining slots with potentially stalled pieces.
+  for (auto i = c->begin(); i != c->end() && transfers.size() < maxPieces; ++i) {
+    if (!i->is_finished() && i->is_stalled()) {
+      BlockTransfer* inserted_info = i->insert(peerInfo);
+      if (inserted_info != NULL)
+        transfers.push_back(inserted_info);
+    }
+  }
+}

 } // namespace torrent
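
The shape of the rewrite is easiest to see in isolation: every phase of the new delegate() pushes into one shared vector and checks the maxPieces cap before each insertion, where the old code returned a single Block* per call. Below is a self-contained sketch of that pattern with simplified stand-in types (Transfer, fill_from, and delegate_batch are illustrative, not libtorrent's):

#include <cstdint>
#include <vector>

// Stand-in for BlockTransfer; illustrative only.
struct Transfer { uint32_t index; };

// Fill `out` from one candidate source, stopping at the cap -- the same
// idiom each phase of delegate() and delegate_from_blocklist() uses.
static void fill_from(std::vector<Transfer>& out, uint32_t maxPieces,
                      const std::vector<uint32_t>& candidates) {
  for (uint32_t index : candidates) {
    if (out.size() >= maxPieces) // cap checked before every insertion
      return;
    out.push_back(Transfer{index});
  }
}

std::vector<Transfer> delegate_batch(const std::vector<uint32_t>& affinity_hits,
                                     const std::vector<uint32_t>& high_priority,
                                     const std::vector<uint32_t>& normal_priority,
                                     uint32_t maxPieces) {
  std::vector<Transfer> out;
  fill_from(out, maxPieces, affinity_hits);   // affinity pass
  fill_from(out, maxPieces, high_priority);   // high priority pass
  fill_from(out, maxPieces, normal_priority); // normal priority pass
  return out;                                 // holds at most maxPieces entries
}

The aggressive path keeps the old overlap bookkeeping: overlapped starts at 5 so a block never collects more than four non-stalled downloaders, and it is tightened to size_not_stalled() after each successful insert, so later blocks must be at least as under-served to qualify.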
13 changes: 3 additions & 10 deletions libtorrent/src/download/delegator.h
@@ -64,24 +64,17 @@ class Delegator {
   TransferList* transfer_list() { return &m_transfers; }
   const TransferList* transfer_list() const { return &m_transfers; }

-  BlockTransfer* delegate(PeerChunks* peerChunks, int affinity);
+  std::vector<BlockTransfer*> delegate(PeerChunks* peerChunks, uint32_t affinity, uint32_t maxPieces);

   bool get_aggressive() { return m_aggressive; }
   void set_aggressive(bool a) { m_aggressive = a; }

   slot_peer_chunk& slot_chunk_find() { return m_slot_chunk_find; }
   slot_size& slot_chunk_size() { return m_slot_chunk_size; }

-  // Don't call this from the outside.
-  Block* delegate_piece(BlockList* c, const PeerInfo* peerInfo);
-  Block* delegate_aggressive(BlockList* c, uint16_t* overlapped, const PeerInfo* peerInfo);
-
 private:
-  // Start on a new chunk, returns .end() if none possible. bf is
-  // remote peer's bitfield.
-  Block* new_chunk(PeerChunks* pc, bool highPriority);
-
-  Block* delegate_seeder(PeerChunks* peerChunks);
+  void delegate_from_blocklist(std::vector<BlockTransfer*> &transfers, uint32_t maxPieces, BlockList* c, PeerInfo* peerInfo);
+  void delegate_new_chunks(std::vector<BlockTransfer*> &transfers, uint32_t maxPieces, PeerChunks* pc, bool highPriority);

   TransferList m_transfers;
32 changes: 17 additions & 15 deletions libtorrent/src/protocol/peer_connection_base.cc
@@ -970,24 +970,26 @@ PeerConnectionBase::try_request_pieces() {

   while (request_list()->queued_size() < pipeSize && m_up->can_write_request()) {

-    // Delegator should return a vector of pieces, and it should be
-    // passed the number of pieces it should delegate. Try to ensure
-    // it receives large enough request to fill a whole chunk if the
-    // peer is fast enough.
+    // It should get the right number the first time around, but loop just to be sure
+    int maxRequests = m_up->max_write_request();
+    int maxQueued = pipeSize - request_list()->queued_size();
+    int maxPieces = std::max(std::min(maxRequests, maxQueued), 1);

-    const Piece* p = request_list()->delegate();
-
-    if (p == NULL)
-      break;
-
-    if (!m_download->file_list()->is_valid_piece(*p) || !m_peerChunks.bitfield()->get(p->index()))
-      throw internal_error("PeerConnectionBase::try_request_pieces() tried to use an invalid piece.");
+    std::vector<const Piece*> pieces = request_list()->delegate(maxPieces);
+    if (pieces.empty()) {
+      return false;
+    }

-    m_up->write_request(*p);
+    for (auto& p : pieces) {
+      if (!m_download->file_list()->is_valid_piece(*p) || !m_peerChunks.bitfield()->get(p->index()))
+        throw internal_error("PeerConnectionBase::try_request_pieces() tried to use an invalid piece.");

-    LT_LOG_PIECE_EVENTS("(down) requesting %" PRIu32 " %" PRIu32 " %" PRIu32,
-                        p->index(), p->offset(), p->length());
-    success = true;
+      m_up->write_request(*p);
+
+      LT_LOG_PIECE_EVENTS("(down) requesting %" PRIu32 " %" PRIu32 " %" PRIu32,
+                          p->index(), p->offset(), p->length());
+      success = true;
+    }
   }

   return success;
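
The batch size above is clamped twice: by how many request messages still fit in the write buffer (maxRequests) and by how much room the request pipeline has left (maxQueued), with a floor of one. A standalone illustration of the arithmetic, using made-up values:

#include <algorithm>
#include <cstdio>

int main() {
  int maxRequests = 20; // m_up->max_write_request(): buffer space in whole requests
  int pipeSize = 10;    // pipeline target for this peer (illustrative)
  int queuedSize = 2;   // requests already outstanding
  int maxQueued = pipeSize - queuedSize;
  // Take the smaller limit, but always delegate at least one piece.
  int maxPieces = std::max(std::min(maxRequests, maxQueued), 1);
  std::printf("delegating up to %d pieces\n", maxPieces); // prints 8
  return 0;
}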
6 changes: 4 additions & 2 deletions libtorrent/src/protocol/peer_connection_metadata.cc
@@ -436,11 +436,13 @@ PeerConnectionMetadata::try_request_metadata_pieces() {
   if (!m_up->can_write_extension() || m_extensions->has_pending_message())
     return false;

-  const Piece* p = request_list()->delegate();
+  std::vector<const Piece*> pieces = request_list()->delegate(1);

-  if (p == NULL)
+  if (pieces.empty())
     return false;

+  const Piece* p = pieces.front();
+
   if (!m_download->file_list()->is_valid_piece(*p) || !m_peerChunks.bitfield()->get(p->index()))
     throw internal_error("PeerConnectionMetadata::try_request_metadata_pieces() tried to use an invalid piece.");
2 changes: 2 additions & 0 deletions libtorrent/src/protocol/protocol_base.h
@@ -143,6 +143,8 @@ class ProtocolBase {
   bool can_write_piece() const { return m_buffer.reserved_left() >= sizeof_piece; }
   bool can_write_port() const { return m_buffer.reserved_left() >= sizeof_port; }
   bool can_write_extension() const { return m_buffer.reserved_left() >= sizeof_extension; }
+
+  size_type max_write_request() const { return m_buffer.reserved_left() / sizeof_request; }

   bool can_read_have_body() const { return m_buffer.remaining() >= sizeof_have_body; }
   bool can_read_request_body() const { return m_buffer.remaining() >= sizeof_request_body; }
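
The new max_write_request() is plain integer division: remaining reserved buffer space over the size of one request message. A quick sketch, assuming sizeof_request is the 17-byte wire size of a BitTorrent request (4-byte length prefix, 1-byte id, three 4-byte fields):

#include <cstddef>
#include <cstdio>

constexpr std::size_t sizeof_request = 17; // assumed wire size, see above

// Mirrors the new helper: how many whole request messages still fit.
std::size_t max_write_request(std::size_t reserved_left) {
  return reserved_left / sizeof_request;
}

int main() {
  std::printf("%zu\n", max_write_request(340)); // 340 / 17 -> 20
  std::printf("%zu\n", max_write_request(16));  // less than one message -> 0
  return 0;
}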