Skip to content

Commit

Permalink
Run ruff format
Browse files Browse the repository at this point in the history
  • Loading branch information
caspervonb committed Sep 25, 2024
1 parent 7397b84 commit 5f0b119
Show file tree
Hide file tree
Showing 54 changed files with 2,201 additions and 2,243 deletions.
10 changes: 6 additions & 4 deletions benchmark/inbox_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,23 @@

from nats.nuid import NUID

INBOX_PREFIX = bytearray(b'_INBOX.')
INBOX_PREFIX = bytearray(b"_INBOX.")


def gen_inboxes_nuid(n):
nuid = NUID()
for i in range(0, n):
inbox = INBOX_PREFIX[:]
inbox.extend(nuid.next())
inbox.extend(b'.')
inbox.extend(b".")
inbox.extend(nuid.next())

if __name__ == '__main__':

if __name__ == "__main__":
benchs = [
"gen_inboxes_nuid(100000)",
"gen_inboxes_nuid(1000000)",
]
]
for bench in benchs:
print(f"=== {bench}")
prof.run(bench)
105 changes: 55 additions & 50 deletions benchmark/latency_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,66 +10,71 @@
HASH_MODULO = 250

try:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except:
pass
pass


def show_usage():
message = """
message = """
Usage: latency_perf [options]
options:
-n ITERATIONS Iterations to spec (default: 1000)
-S SUBJECT Send subject (default: (test)
"""
print(message)
print(message)


def show_usage_and_die():
show_usage()
sys.exit(1)
show_usage()
sys.exit(1)


async def main():
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--iterations', default=DEFAULT_ITERATIONS, type=int)
parser.add_argument('-S', '--subject', default='test')
parser.add_argument('--servers', default=[], action='append')
args = parser.parse_args()

servers = args.servers
if len(args.servers) < 1:
servers = ["nats://127.0.0.1:4222"]

try:
nc = await nats.connect(servers)
except Exception as e:
sys.stderr.write(f"ERROR: {e}")
show_usage_and_die()

async def handler(msg):
await nc.publish(msg.reply, b'')
await nc.subscribe(args.subject, cb=handler)

# Start the benchmark
start = time.monotonic()
to_send = args.iterations

print("Sending {} request/responses on [{}]".format(
args.iterations, args.subject))
while to_send > 0:
to_send -= 1
if to_send == 0:
break

await nc.request(args.subject, b'')
if (to_send % HASH_MODULO) == 0:
sys.stdout.write("#")
sys.stdout.flush()

duration = time.monotonic() - start
ms = "%.3f" % ((duration/args.iterations) * 1000)
print(f"\nTest completed : {ms} ms avg request/response latency")
await nc.close()

if __name__ == '__main__':
asyncio.run(main())
parser = argparse.ArgumentParser()
parser.add_argument("-n", "--iterations", default=DEFAULT_ITERATIONS, type=int)
parser.add_argument("-S", "--subject", default="test")
parser.add_argument("--servers", default=[], action="append")
args = parser.parse_args()

servers = args.servers
if len(args.servers) < 1:
servers = ["nats://127.0.0.1:4222"]

try:
nc = await nats.connect(servers)
except Exception as e:
sys.stderr.write(f"ERROR: {e}")
show_usage_and_die()

async def handler(msg):
await nc.publish(msg.reply, b"")

await nc.subscribe(args.subject, cb=handler)

# Start the benchmark
start = time.monotonic()
to_send = args.iterations

print("Sending {} request/responses on [{}]".format(args.iterations, args.subject))
while to_send > 0:
to_send -= 1
if to_send == 0:
break

await nc.request(args.subject, b"")
if (to_send % HASH_MODULO) == 0:
sys.stdout.write("#")
sys.stdout.flush()

duration = time.monotonic() - start
ms = "%.3f" % ((duration / args.iterations) * 1000)
print(f"\nTest completed : {ms} ms avg request/response latency")
await nc.close()


if __name__ == "__main__":
asyncio.run(main())
38 changes: 20 additions & 18 deletions benchmark/parser_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,20 @@


class DummyNatsClient:

def __init__(self):
self._subs = {}
self._pongs = []
self._pings_outstanding = 0
self._pongs_received = 0
self._server_info = {"max_payload": 1048576, "auth_required": False }
self._server_info = {"max_payload": 1048576, "auth_required": False}
self.stats = {
'in_msgs': 0,
'out_msgs': 0,
'in_bytes': 0,
'out_bytes': 0,
'reconnects': 0,
'errors_received': 0
}
"in_msgs": 0,
"out_msgs": 0,
"in_bytes": 0,
"out_bytes": 0,
"reconnects": 0,
"errors_received": 0,
}

async def _send_command(self, cmd):
pass
Expand All @@ -31,31 +30,34 @@ async def _process_ping(self):
pass

async def _process_msg(self, sid, subject, reply, data):
self.stats['in_msgs'] += 1
self.stats['in_bytes'] += len(data)
self.stats["in_msgs"] += 1
self.stats["in_bytes"] += len(data)

async def _process_err(self, err=None):
pass


def generate_msg(subject, nbytes, reply=""):
msg = []
protocol_line = "MSG {subject} 1 {reply} {nbytes}\r\n".format(
subject=subject, reply=reply, nbytes=nbytes).encode()
subject=subject, reply=reply, nbytes=nbytes
).encode()
msg.append(protocol_line)
msg.append(b'A' * nbytes)
msg.append(b'r\n')
return b''.join(msg)
msg.append(b"A" * nbytes)
msg.append(b"r\n")
return b"".join(msg)


def parse_msgs(max_msgs=1, nbytes=1):
buf = b''.join([generate_msg("foo", nbytes) for i in range(0, max_msgs)])
buf = b"".join([generate_msg("foo", nbytes) for i in range(0, max_msgs)])
print("--- buffer size: {}".format(len(buf)))
loop = asyncio.get_event_loop()
ps = Parser(DummyNatsClient())
loop.run_until_complete(ps.parse(buf))
print("--- stats: ", ps.nc.stats)

if __name__ == '__main__':

if __name__ == "__main__":
benchs = [
"parse_msgs(max_msgs=10000, nbytes=1)",
"parse_msgs(max_msgs=100000, nbytes=1)",
Expand All @@ -71,7 +73,7 @@ def parse_msgs(max_msgs=1, nbytes=1):
"parse_msgs(max_msgs=100000, nbytes=8192)",
"parse_msgs(max_msgs=10000, nbytes=16384)",
"parse_msgs(max_msgs=100000, nbytes=16384)",
]
]

for bench in benchs:
print(f"=== {bench}")
Expand Down
46 changes: 28 additions & 18 deletions benchmark/pub_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,19 @@
import nats

try:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except:
pass
pass

DEFAULT_FLUSH_TIMEOUT = 30
DEFAULT_NUM_MSGS = 100000
DEFAULT_MSG_SIZE = 16
DEFAULT_BATCH_SIZE = 100
HASH_MODULO = 1000


def show_usage():
message = """
Usage: pub_perf [options]
Expand All @@ -30,32 +32,34 @@ def show_usage():
"""
print(message)


def show_usage_and_die():
show_usage()
sys.exit(1)


async def main():
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--count', default=DEFAULT_NUM_MSGS, type=int)
parser.add_argument('-s', '--size', default=DEFAULT_MSG_SIZE, type=int)
parser.add_argument('-S', '--subject', default='test')
parser.add_argument('-b', '--batch', default=DEFAULT_BATCH_SIZE, type=int)
parser.add_argument('--servers', default=[], action='append')
parser.add_argument("-n", "--count", default=DEFAULT_NUM_MSGS, type=int)
parser.add_argument("-s", "--size", default=DEFAULT_MSG_SIZE, type=int)
parser.add_argument("-S", "--subject", default="test")
parser.add_argument("-b", "--batch", default=DEFAULT_BATCH_SIZE, type=int)
parser.add_argument("--servers", default=[], action="append")
args = parser.parse_args()

data = []
for i in range(0, args.size):
s = "%01x" % randint(0, 15)
data.append(s.encode())
payload = b''.join(data)
payload = b"".join(data)

servers = args.servers
if len(args.servers) < 1:
servers = ["nats://127.0.0.1:4222"]

# Make sure we're connected to a server first..
try:
nc = await nats.connect(servers, pending_size=1024*1024)
nc = await nats.connect(servers, pending_size=1024 * 1024)
except Exception as e:
sys.stderr.write(f"ERROR: {e}")
show_usage_and_die()
Expand All @@ -64,8 +68,11 @@ async def main():
start = time.time()
to_send = args.count

print("Sending {} messages of size {} bytes on [{}]".format(
args.count, args.size, args.subject))
print(
"Sending {} messages of size {} bytes on [{}]".format(
args.count, args.size, args.subject
)
)
while to_send > 0:
for i in range(0, args.batch):
to_send -= 1
Expand All @@ -86,11 +93,14 @@ async def main():
print(f"Server flush timeout after {DEFAULT_FLUSH_TIMEOUT}")

elapsed = time.time() - start
mbytes = "%.1f" % (((args.size * args.count)/elapsed) / (1024*1024))
print("\nTest completed : {} msgs/sec ({}) MB/sec".format(
args.count/elapsed,
mbytes))
mbytes = "%.1f" % (((args.size * args.count) / elapsed) / (1024 * 1024))
print(
"\nTest completed : {} msgs/sec ({}) MB/sec".format(
args.count / elapsed, mbytes
)
)
await nc.close()

if __name__ == '__main__':
asyncio.run(main())

if __name__ == "__main__":
asyncio.run(main())
Loading

0 comments on commit 5f0b119

Please sign in to comment.