Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce formatting #612

Merged
merged 1 commit into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions .github/workflows/check.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
name: Check
on:
push:
branches:
- main
pull_request:
branches:
- "*"
jobs:
format:
runs-on: ubuntu-latest
name: Format
steps:
- name: Check out repository
uses: actions/checkout@v3

- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.8"

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pipenv
pipenv install --dev

- name: Run format check
run: |
pipenv run yapf --diff --recursive .
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:

- name: Run tests
run: |
pipenv run flake8 --ignore=W391 ./nats/js/
pipenv run flake8 --ignore="W391, W503, W504" ./nats/js/
pipenv run pytest -x -vv -s --continue-on-collection-errors
env:
PATH: $HOME/nats-server:$PATH
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ ci: deps
# pipenv run yapf --recursive --diff $(SOURCE_CODE)
# pipenv run yapf --recursive --diff tests
# pipenv run mypy
pipenv run flake8 --ignore=W391 ./nats/js/
pipenv run flake8 --ignore="W391, W503, W504" ./nats/js/
pipenv run pytest -x -vv -s --continue-on-collection-errors

watch:
Expand Down
10 changes: 6 additions & 4 deletions benchmark/inbox_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,23 @@

from nats.nuid import NUID

INBOX_PREFIX = bytearray(b'_INBOX.')
INBOX_PREFIX = bytearray(b"_INBOX.")


def gen_inboxes_nuid(n):
nuid = NUID()
for i in range(0, n):
inbox = INBOX_PREFIX[:]
inbox.extend(nuid.next())
inbox.extend(b'.')
inbox.extend(b".")
inbox.extend(nuid.next())

if __name__ == '__main__':

if __name__ == "__main__":
benchs = [
"gen_inboxes_nuid(100000)",
"gen_inboxes_nuid(1000000)",
]
]
for bench in benchs:
print(f"=== {bench}")
prof.run(bench)
111 changes: 61 additions & 50 deletions benchmark/latency_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,66 +10,77 @@
HASH_MODULO = 250

try:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except:
pass
pass


def show_usage():
message = """
message = """
Usage: latency_perf [options]

options:
-n ITERATIONS Iterations to spec (default: 1000)
-S SUBJECT Send subject (default: (test)
"""
print(message)
print(message)


def show_usage_and_die():
show_usage()
sys.exit(1)
show_usage()
sys.exit(1)


async def main():
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--iterations', default=DEFAULT_ITERATIONS, type=int)
parser.add_argument('-S', '--subject', default='test')
parser.add_argument('--servers', default=[], action='append')
args = parser.parse_args()

servers = args.servers
if len(args.servers) < 1:
servers = ["nats://127.0.0.1:4222"]

try:
nc = await nats.connect(servers)
except Exception as e:
sys.stderr.write(f"ERROR: {e}")
show_usage_and_die()

async def handler(msg):
await nc.publish(msg.reply, b'')
await nc.subscribe(args.subject, cb=handler)

# Start the benchmark
start = time.monotonic()
to_send = args.iterations

print("Sending {} request/responses on [{}]".format(
args.iterations, args.subject))
while to_send > 0:
to_send -= 1
if to_send == 0:
break

await nc.request(args.subject, b'')
if (to_send % HASH_MODULO) == 0:
sys.stdout.write("#")
sys.stdout.flush()

duration = time.monotonic() - start
ms = "%.3f" % ((duration/args.iterations) * 1000)
print(f"\nTest completed : {ms} ms avg request/response latency")
await nc.close()

if __name__ == '__main__':
asyncio.run(main())
parser = argparse.ArgumentParser()
parser.add_argument(
"-n", "--iterations", default=DEFAULT_ITERATIONS, type=int
)
parser.add_argument("-S", "--subject", default="test")
parser.add_argument("--servers", default=[], action="append")
args = parser.parse_args()

servers = args.servers
if len(args.servers) < 1:
servers = ["nats://127.0.0.1:4222"]

try:
nc = await nats.connect(servers)
except Exception as e:
sys.stderr.write(f"ERROR: {e}")
show_usage_and_die()

async def handler(msg):
await nc.publish(msg.reply, b"")

await nc.subscribe(args.subject, cb=handler)

# Start the benchmark
start = time.monotonic()
to_send = args.iterations

print(
"Sending {} request/responses on [{}]".format(
args.iterations, args.subject
)
)
while to_send > 0:
to_send -= 1
if to_send == 0:
break

await nc.request(args.subject, b"")
if (to_send % HASH_MODULO) == 0:
sys.stdout.write("#")
sys.stdout.flush()

duration = time.monotonic() - start
ms = "%.3f" % ((duration / args.iterations) * 1000)
print(f"\nTest completed : {ms} ms avg request/response latency")
await nc.close()


if __name__ == "__main__":
asyncio.run(main())
37 changes: 20 additions & 17 deletions benchmark/parser_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ def __init__(self):
self._pongs = []
self._pings_outstanding = 0
self._pongs_received = 0
self._server_info = {"max_payload": 1048576, "auth_required": False }
self._server_info = {"max_payload": 1048576, "auth_required": False}
self.stats = {
'in_msgs': 0,
'out_msgs': 0,
'in_bytes': 0,
'out_bytes': 0,
'reconnects': 0,
'errors_received': 0
}
"in_msgs": 0,
"out_msgs": 0,
"in_bytes": 0,
"out_bytes": 0,
"reconnects": 0,
"errors_received": 0,
}

async def _send_command(self, cmd):
pass
Expand All @@ -31,31 +31,34 @@ async def _process_ping(self):
pass

async def _process_msg(self, sid, subject, reply, data):
self.stats['in_msgs'] += 1
self.stats['in_bytes'] += len(data)
self.stats["in_msgs"] += 1
self.stats["in_bytes"] += len(data)

async def _process_err(self, err=None):
pass


def generate_msg(subject, nbytes, reply=""):
msg = []
protocol_line = "MSG {subject} 1 {reply} {nbytes}\r\n".format(
subject=subject, reply=reply, nbytes=nbytes).encode()
subject=subject, reply=reply, nbytes=nbytes
).encode()
msg.append(protocol_line)
msg.append(b'A' * nbytes)
msg.append(b'r\n')
return b''.join(msg)
msg.append(b"A" * nbytes)
msg.append(b"r\n")
return b"".join(msg)


def parse_msgs(max_msgs=1, nbytes=1):
buf = b''.join([generate_msg("foo", nbytes) for i in range(0, max_msgs)])
buf = b"".join([generate_msg("foo", nbytes) for i in range(0, max_msgs)])
print("--- buffer size: {}".format(len(buf)))
loop = asyncio.get_event_loop()
ps = Parser(DummyNatsClient())
loop.run_until_complete(ps.parse(buf))
print("--- stats: ", ps.nc.stats)

if __name__ == '__main__':

if __name__ == "__main__":
benchs = [
"parse_msgs(max_msgs=10000, nbytes=1)",
"parse_msgs(max_msgs=100000, nbytes=1)",
Expand All @@ -71,7 +74,7 @@ def parse_msgs(max_msgs=1, nbytes=1):
"parse_msgs(max_msgs=100000, nbytes=8192)",
"parse_msgs(max_msgs=10000, nbytes=16384)",
"parse_msgs(max_msgs=100000, nbytes=16384)",
]
]

for bench in benchs:
print(f"=== {bench}")
Expand Down
46 changes: 28 additions & 18 deletions benchmark/pub_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,19 @@
import nats

try:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except:
pass
pass

DEFAULT_FLUSH_TIMEOUT = 30
DEFAULT_NUM_MSGS = 100000
DEFAULT_MSG_SIZE = 16
DEFAULT_BATCH_SIZE = 100
HASH_MODULO = 1000


def show_usage():
message = """
Usage: pub_perf [options]
Expand All @@ -30,32 +32,34 @@ def show_usage():
"""
print(message)


def show_usage_and_die():
show_usage()
sys.exit(1)


async def main():
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--count', default=DEFAULT_NUM_MSGS, type=int)
parser.add_argument('-s', '--size', default=DEFAULT_MSG_SIZE, type=int)
parser.add_argument('-S', '--subject', default='test')
parser.add_argument('-b', '--batch', default=DEFAULT_BATCH_SIZE, type=int)
parser.add_argument('--servers', default=[], action='append')
parser.add_argument("-n", "--count", default=DEFAULT_NUM_MSGS, type=int)
parser.add_argument("-s", "--size", default=DEFAULT_MSG_SIZE, type=int)
parser.add_argument("-S", "--subject", default="test")
parser.add_argument("-b", "--batch", default=DEFAULT_BATCH_SIZE, type=int)
parser.add_argument("--servers", default=[], action="append")
args = parser.parse_args()

data = []
for i in range(0, args.size):
s = "%01x" % randint(0, 15)
data.append(s.encode())
payload = b''.join(data)
payload = b"".join(data)

servers = args.servers
if len(args.servers) < 1:
servers = ["nats://127.0.0.1:4222"]

# Make sure we're connected to a server first..
try:
nc = await nats.connect(servers, pending_size=1024*1024)
nc = await nats.connect(servers, pending_size=1024 * 1024)
except Exception as e:
sys.stderr.write(f"ERROR: {e}")
show_usage_and_die()
Expand All @@ -64,8 +68,11 @@ async def main():
start = time.time()
to_send = args.count

print("Sending {} messages of size {} bytes on [{}]".format(
args.count, args.size, args.subject))
print(
"Sending {} messages of size {} bytes on [{}]".format(
args.count, args.size, args.subject
)
)
while to_send > 0:
for i in range(0, args.batch):
to_send -= 1
Expand All @@ -86,11 +93,14 @@ async def main():
print(f"Server flush timeout after {DEFAULT_FLUSH_TIMEOUT}")

elapsed = time.time() - start
mbytes = "%.1f" % (((args.size * args.count)/elapsed) / (1024*1024))
print("\nTest completed : {} msgs/sec ({}) MB/sec".format(
args.count/elapsed,
mbytes))
mbytes = "%.1f" % (((args.size * args.count) / elapsed) / (1024 * 1024))
print(
"\nTest completed : {} msgs/sec ({}) MB/sec".format(
args.count / elapsed, mbytes
)
)
await nc.close()

if __name__ == '__main__':
asyncio.run(main())

if __name__ == "__main__":
asyncio.run(main())
Loading
Loading