Skip to content

Commit

Permalink
Fix deliver_subject in implicit subscription creation (#615)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Mikołaj Nowak <[email protected]>
  • Loading branch information
m3nowak authored Oct 8, 2024
1 parent 1ecc5e1 commit c90166c
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 3 deletions.
7 changes: 4 additions & 3 deletions nats/js/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,9 +414,10 @@ async def cb(msg):
if inactive_threshold:
config.inactive_threshold = inactive_threshold

# Create inbox for push consumer.
deliver = self._nc.new_inbox()
config.deliver_subject = deliver
# Create inbox for push consumer, if deliver_subject is not assigned already.
if config.deliver_subject is None:
deliver = self._nc.new_inbox()
config.deliver_subject = deliver

# Auto created consumers use the filter subject.
config.filter_subject = subject
Expand Down
44 changes: 44 additions & 0 deletions tests/test_js.py
Original file line number Diff line number Diff line change
Expand Up @@ -1807,6 +1807,50 @@ async def cb(msg):
self.assertEqual(coroutines_before, coroutines_after_unsubscribe)
self.assertNotEqual(coroutines_before, coroutines_after_subscribe)

@async_test
async def test_subscribe_push_config(self):
nc = await nats.connect()
js = nc.jetstream()

await js.add_stream(name="pconfig", subjects=["pconfig"])

s, d = ([], [])

async def cb_s(msg):
s.append(msg.data)

async def cb_d(msg):
d.append(msg.data)

#Create config for our subscriber
cc = nats.js.api.ConsumerConfig(
name="pconfig-ps", deliver_subject="pconfig-deliver"
)

#Make stream consumer with set deliver_subjct
sub_s = await js.subscribe(
"pconfig", stream="pconfig", cb=cb_s, config=cc
)
#Make direct sub on deliver_subject
sub_d = await nc.subscribe("pconfig-deliver", "check-queue", cb=cb_d)

#Stream consumer sub should have configured subject
assert sub_s.subject == "pconfig-deliver"

#Publish some messages
for i in range(10):
await js.publish("pconfig", f'Hello World {i}'.encode())

await asyncio.sleep(0.5)
#Both subs should recieve same messages, but we are not sure about order
assert len(s) == len(d)
assert set(s) == set(d)

#Cleanup
await js.delete_consumer("pconfig", "pconfig-ps")
await js.delete_stream("pconfig")
await nc.close()


class AckPolicyTest(SingleJetStreamServerTestCase):

Expand Down

0 comments on commit c90166c

Please sign in to comment.