Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added PingService #480

Merged
merged 5 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/ping/ping.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ async def handle_ping(stream: INetStream) -> None:
try:
payload = await stream.read(PING_LENGTH)
peer_id = stream.muxed_conn.peer_id
if payload != None:
if payload is not None:
print(f"received ping from {peer_id}")

await stream.write(payload)
print(f"responded with pong to {peer_id}")

except:
except Exception:
await stream.reset()


Expand Down
43 changes: 43 additions & 0 deletions libp2p/host/ping.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import logging
import secrets
import time
from typing import (
List,
)

import trio

from libp2p.host.host_interface import (
IHost,
)
from libp2p.network.stream.exceptions import (
StreamClosed,
StreamEOF,
Expand Down Expand Up @@ -65,7 +73,42 @@ async def handle_ping(stream: INetStream) -> None:
try:
should_continue = await _handle_ping(stream, peer_id)
if not should_continue:
await stream.close()
return
except Exception:
await stream.reset()
return


async def _ping(stream: INetStream) -> int:
"""
Helper function to perform a single ping operation on a given stream,
returns integer value rtt - which denotes round trip time for a ping request in ms
"""
ping_bytes = secrets.token_bytes(PING_LENGTH)
before = time.time()
await stream.write(ping_bytes)
pong_bytes = await stream.read(PING_LENGTH)
rtt = int((time.time() - before) * (10**6))
if ping_bytes != pong_bytes:
logger.debug("invalid pong response")
raise
return rtt


class PingService:
"""PingService executes pings and returns RTT in miliseconds."""

def __init__(self, host: IHost):
self._host = host

async def ping(self, peer_id: PeerID, ping_amt: int = 1) -> List[int]:
stream = await self._host.new_stream(peer_id, [ID])

try:
rtts = [await _ping(stream) for _ in range(ping_amt)]
await stream.close()
return rtts
except Exception:
await stream.close()
raise
1 change: 1 addition & 0 deletions newsfragments/344.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added ``PingService`` class in ``host/ping.py`` which can be used to initiate ping requests to peers and added tests for the same
26 changes: 26 additions & 0 deletions tests/core/host/test_ping.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from libp2p.host.ping import (
ID,
PING_LENGTH,
PingService,
)
from libp2p.tools.factories import (
host_pair_factory,
Expand Down Expand Up @@ -47,3 +48,28 @@ async def test_ping_several(security_protocol):
# NOTE: this interval can be `0` for this test.
await trio.sleep(0)
await stream.close()


@pytest.mark.trio
async def test_ping_service_once(security_protocol):
async with host_pair_factory(security_protocol=security_protocol) as (
host_a,
host_b,
):
ping_service = PingService(host_b)
rtts = await ping_service.ping(host_a.get_id())
assert len(rtts) == 1
assert rtts[0] < 10**6


@pytest.mark.trio
async def test_ping_service_several(security_protocol):
async with host_pair_factory(security_protocol=security_protocol) as (
host_a,
host_b,
):
ping_service = PingService(host_b)
rtts = await ping_service.ping(host_a.get_id(), ping_amt=SOME_PING_COUNT)
assert len(rtts) == SOME_PING_COUNT
for rtt in rtts:
assert rtt < 10**6