
what's the purpose of line_allocated_event for redis? #70

Open
uguryiilmz opened this issue May 12, 2023 · 1 comment

@uguryiilmz

Hello, thanks for all the resources you provided in the book; they are helping me tremendously to understand what a clean architecture can look like in the Python world. I just have a quick question about this line in the end-to-end test:

`subscription = redis_client.subscribe_to("line_allocated")`

What is the line_allocated channel needed for? I can see that the message is published to the "change_batch_quantity" channel, and we are subscribed to a different channel, yet somehow we can still read the messages that result from change_batch_quantity. I might be missing something simple, as I don't have much experience with Redis pub/sub; I thought the channel names had to match for messages to be exchanged. Regardless, the tests pass, so it seems I'm missing something here. Thank you.

@pytest.mark.usefixtures("postgres_db")
@pytest.mark.usefixtures("restart_api")
@pytest.mark.usefixtures("restart_redis_pubsub")
def test_change_batch_quantity_leading_to_reallocation():
    # start with two batches and an order allocated to one of them
    orderid, sku = random_orderid(), random_sku()
    earlier_batch, later_batch = random_batchref("old"), random_batchref("newer")
    api_client.post_to_add_batch(earlier_batch, sku, qty=10, eta="2011-01-01")
    api_client.post_to_add_batch(later_batch, sku, qty=10, eta="2011-01-02")
    r = api_client.post_to_allocate(orderid, sku, 10)
    assert r.ok
    response = api_client.get_allocation(orderid)
    assert response.json()[0]["batchref"] == earlier_batch

    subscription = redis_client.subscribe_to("line_allocated")

    # change quantity on allocated batch so it's less than our order
    redis_client.publish_message(
        "change_batch_quantity",
        {"batchref": earlier_batch, "qty": 5},
    )

    # wait until we see a message saying the order has been reallocated
    messages = []
    for attempt in Retrying(stop=stop_after_delay(3), reraise=True):
        with attempt:
            message = subscription.get_message(timeout=1)
            if message:
                messages.append(message)
                print(messages)
            data = json.loads(messages[-1]["data"])
            assert data["orderid"] == orderid
            assert data["batchref"] == later_batch
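
On the channel-name point: in Redis pub/sub, a subscriber only ever receives messages published to the channels it subscribed to; the two names in the test never need to match, because the service under test is what bridges them. A minimal in-memory sketch illustrating this channel isolation (the FakePubSub class here is hypothetical, purely for illustration, not the real redis-py API):

```python
from collections import defaultdict
from queue import Queue, Empty

class FakePubSub:
    """Hypothetical in-memory stand-in for Redis pub/sub channel routing."""
    def __init__(self):
        self.channels = defaultdict(list)

    def subscribe_to(self, channel):
        # each subscription gets its own queue, bound to exactly one channel
        q = Queue()
        self.channels[channel].append(q)
        return q

    def publish(self, channel, message):
        # only subscribers of *this* channel receive the message
        for q in self.channels[channel]:
            q.put(message)

bus = FakePubSub()
sub = bus.subscribe_to("line_allocated")

# publishing to a different channel never reaches our subscription
bus.publish("change_batch_quantity", {"batchref": "b1", "qty": 5})
try:
    first = sub.get_nowait()
except Empty:
    first = None  # channels are isolated: nothing arrives here

# in the real test, the pub/sub *service* reacts to change_batch_quantity
# and eventually publishes to line_allocated, which is what we read:
bus.publish("line_allocated", {"orderid": "o1", "batchref": "b2"})
second = sub.get_nowait()
print(first, second)
```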
@uguryiilmz
Author

Upon investigating more, I noticed everything is working as expected. The book was right that event-driven systems make the flow harder to follow. So if anyone else is having trouble understanding what's going on, here's the explanation:

  1. creating a subscription to the "line_allocated" channel; this channel only receives a message when a reallocation happens

  2. publishing a message on the 'change_batch_quantity' channel. This part is important: when a new message arrives on this channel, it triggers the change_batch_quantity() command handler on the Product domain model. When the new quantity drops below what is allocated, the model also adds a new event to its event list with this line:
    self.events.append(events.Deallocated(line.orderid, line.sku, line.qty))

  3. as long as there are events to process, the unit of work collects them and the message bus pops them off and dispatches each one to its own handler. The Deallocated event's handler is the reallocate function in the service layer's handlers, which in turn puts another message on our queue: commands.Allocate()

  4. now we have commands.Allocate remaining in our queue, which calls the allocate handler again; a successful allocation adds an events.Allocated event to the queue. That event's handler is publish_allocated_event, which publishes the message we finally read from our subscription:
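
The four steps above can be sketched as a minimal message-bus loop. All names here (ChangeBatchQuantity, Deallocated, handle, etc.) are simplified stand-ins for the book's real classes, and the handlers push follow-up messages straight onto the queue rather than collecting them from a unit of work:

```python
from collections import deque
from dataclasses import dataclass

# Simplified stand-ins for the book's commands and events (illustrative only)
@dataclass
class ChangeBatchQuantity:
    batchref: str
    qty: int

@dataclass
class Deallocated:
    orderid: str

@dataclass
class Allocate:
    orderid: str

@dataclass
class Allocated:
    orderid: str
    batchref: str

published = []  # stands in for the redis event publisher

def change_batch_quantity(cmd, queue):
    # step 2: shrinking the batch forces a Deallocated event onto the queue
    queue.append(Deallocated(orderid="order1"))

def reallocate(event, queue):
    # step 3: the Deallocated handler issues a new Allocate command
    queue.append(Allocate(orderid=event.orderid))

def allocate(cmd, queue):
    # step 4: allocating raises an Allocated event
    queue.append(Allocated(orderid=cmd.orderid, batchref="batch2"))

def publish_allocated_event(event, queue):
    # finally, the Allocated handler publishes to "line_allocated"
    published.append(("line_allocated", event))

HANDLERS = {
    ChangeBatchQuantity: change_batch_quantity,
    Deallocated: reallocate,
    Allocate: allocate,
    Allocated: publish_allocated_event,
}

def handle(message):
    # keep dispatching until the chain of commands/events runs dry
    queue = deque([message])
    while queue:
        m = queue.popleft()
        HANDLERS[type(m)](m, queue)

handle(ChangeBatchQuantity(batchref="batch1", qty=5))
print(published)  # exactly one ("line_allocated", Allocated(...)) entry
```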

def publish_allocated_event(
    event: events.Allocated,
    uow: unit_of_work.AbstractUnitOfWork,
):
    logging.info("event is %s", event)
    print("event is", event)
    redis_eventpublisher.publish("line_allocated", event)
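
For reference, redis_eventpublisher.publish in the book is essentially a thin wrapper that serializes the event dataclass to JSON and calls Redis's PUBLISH. A sketch under that assumption (StubRedis is a hypothetical stand-in that records publishes instead of sending them, so this runs without a real redis.Redis connection):

```python
import json
from dataclasses import dataclass, asdict

@dataclass
class Allocated:
    orderid: str
    sku: str
    qty: int
    batchref: str

class StubRedis:
    """Hypothetical stand-in for redis.Redis; records publishes instead of sending them."""
    def __init__(self):
        self.published = []
    def publish(self, channel, payload):
        self.published.append((channel, payload))

r = StubRedis()

def publish(channel, event):
    # serialize the event dataclass to JSON before handing it to Redis
    r.publish(channel, json.dumps(asdict(event)))

publish("line_allocated", Allocated("o1", "LAMP", 10, "batch2"))
channel, payload = r.published[0]
print(channel, json.loads(payload)["batchref"])
```

This JSON payload is what the end-to-end test's `json.loads(messages[-1]["data"])` line decodes on the subscriber side.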

It looks a little complicated, but that is the whole workflow for the end-to-end test. Of course, the test could be expanded to also assert on which messages get published to the 'change_batch_quantity' channel, to see the flow in more detail.
