diff --git a/nats/js/client.py b/nats/js/client.py index d972fc66..7958dbb3 100644 --- a/nats/js/client.py +++ b/nats/js/client.py @@ -414,9 +414,10 @@ async def cb(msg): if inactive_threshold: config.inactive_threshold = inactive_threshold - # Create inbox for push consumer. - deliver = self._nc.new_inbox() - config.deliver_subject = deliver + # Create inbox for push consumer, if deliver_subject is not assigned already. + if config.deliver_subject is None: + deliver = self._nc.new_inbox() + config.deliver_subject = deliver # Auto created consumers use the filter subject. config.filter_subject = subject diff --git a/tests/test_js.py b/tests/test_js.py index 720c9819..9c14bb0d 100644 --- a/tests/test_js.py +++ b/tests/test_js.py @@ -1807,6 +1807,50 @@ async def cb(msg): self.assertEqual(coroutines_before, coroutines_after_unsubscribe) self.assertNotEqual(coroutines_before, coroutines_after_subscribe) + @async_test + async def test_subscribe_push_config(self): + nc = await nats.connect() + js = nc.jetstream() + + await js.add_stream(name="pconfig", subjects=["pconfig"]) + + s, d = ([], []) + + async def cb_s(msg): + s.append(msg.data) + + async def cb_d(msg): + d.append(msg.data) + + #Create config for our subscriber + cc = nats.js.api.ConsumerConfig( + name="pconfig-ps", deliver_subject="pconfig-deliver" + ) + + #Make stream consumer with set deliver_subjct + sub_s = await js.subscribe( + "pconfig", stream="pconfig", cb=cb_s, config=cc + ) + #Make direct sub on deliver_subject + sub_d = await nc.subscribe("pconfig-deliver", "check-queue", cb=cb_d) + + #Stream consumer sub should have configured subject + assert sub_s.subject == "pconfig-deliver" + + #Publish some messages + for i in range(10): + await js.publish("pconfig", f'Hello World {i}'.encode()) + + await asyncio.sleep(0.5) + #Both subs should recieve same messages, but we are not sure about order + assert len(s) == len(d) + assert set(s) == set(d) + + #Cleanup + await js.delete_consumer("pconfig", "pconfig-ps") + await js.delete_stream("pconfig") + await nc.close() + class AckPolicyTest(SingleJetStreamServerTestCase):