No data for singleton aggregations? #61

Open
jelos98 opened this issue Oct 17, 2023 · 1 comment

jelos98 commented Oct 17, 2023

What is the expected behavior when only a single input occurs within a window? We're trying to run a simple sum aggregation over events that have a long tail of infrequent events (and a smaller number of high-frequency events). We boiled it down to a test that runs in a test harness. The aggregation function simply sums the counts and takes the max of the timestamps; it is omitted for simplicity, because the function itself doesn't appear to matter for this.

        ScottyWindowOperator<String, UserEventCounter, UserEventCounter> keyedScottyWindowOperator = ...
        UserEventCounter element1 = new UserEventCounter();
        element1.userKey = "user1";
        element1.timestamp = 1;
        element1.count = 4;

        UserEventCounter element2 = new UserEventCounter();
        element2.userKey = "user1";
        element2.timestamp = 16;
        element2.count = 5;

        UserEventCounter element3 = new UserEventCounter();
        element3.userKey = "user1";
        element3.timestamp = 33;
        element3.count = 6;

        testHarness.processElement(element1, 1);
        testHarness.processWatermark(11);
        testHarness.processElement(element2, 16);
        testHarness.processWatermark(25);        
        testHarness.processWatermark(30);        
        testHarness.processElement(element3, 33);
        testHarness.processWatermark(35);
        testHarness.processWatermark(40);
        testHarness.processWatermark(45);
        testHarness.processWatermark(50);
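
For reference, the aggregation described above (sum the counts, keep the larger timestamp) could look roughly like the following. This is only a minimal plain-Java sketch: the `UserEventCounter` class here is a hypothetical stand-in with the three fields used in the test, and `combine` is an illustrative name, not the actual omitted function or a Scotty interface.

```java
// Hypothetical stand-in for the UserEventCounter type used in the test above;
// only the three fields that the test sets are modeled.
class UserEventCounter {
    String userKey;
    long timestamp;
    long count;

    UserEventCounter(String userKey, long timestamp, long count) {
        this.userKey = userKey;
        this.timestamp = timestamp;
        this.count = count;
    }
}

class SumCountMaxTimestamp {
    // Combines two partial aggregates for the same key:
    // counts are summed, and the later (larger) timestamp is kept.
    static UserEventCounter combine(UserEventCounter a, UserEventCounter b) {
        return new UserEventCounter(
                a.userKey,
                Math.max(a.timestamp, b.timestamp),
                a.count + b.count);
    }

    public static void main(String[] args) {
        UserEventCounter merged = combine(
                new UserEventCounter("user1", 1, 4),
                new UserEventCounter("user1", 16, 5));
        System.out.println(merged.userKey + " " + merged.timestamp + " " + merged.count);
    }
}
```

With a function of this shape, a window holding a single element would simply yield that element unchanged, since `combine` is never needed for it.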
        

We modified ScottyWindowOperator to add some debugging prints (the value in processElement, and the AggregateWindow in processWatermarks before filtering on aggregation.hasValue()), and we see the following sequence:

processElement|K     user1 W@             1| = UserEventCounter(userKey=user1, timestamp=1, count=4)
processElement|K     user1 W@            16| = UserEventCounter(userKey=user1, timestamp=16, count=5)

processWatermark|user1 @           11| = WindowResult(Time,0-10,[ScottySumUserEventCounter->UserEventCounter(userKey=user1, timestamp=1, count=4)])

processElement|K     user1 W@            33| = UserEventCounter(userKey=user1, timestamp=33, count=6)

processWatermark|user1 @           25| = WindowResult(Time,15-25,[ScottySumUserEventCounter->])
processWatermark|user1 @           25| = WindowResult(Time,10-20,[ScottySumUserEventCounter->])
processWatermark|user1 @           25| = WindowResult(Time,5-15,[ScottySumUserEventCounter->])

Observations:

  • The only element that's actually emitted is WindowResult(Time,0-10,[ScottySumUserEventCounter->UserEventCounter(userKey=user1, timestamp=1, count=4)])
  • The three windows at the end aren't actually emitted, but if they were, they would presumably be the ones containing element2 above.
  • Notably, element3 never even shows up in processWatermark.
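
To make it easier to check which windows each element should land in, here is a small plain-Java sketch of sliding-window assignment. The window size of 10 and slide of 5 are assumptions inferred from the window bounds in the debug output (0-10, 5-15, 10-20, 15-25). The sketch also assumes windows are aligned at 0, include their start and exclude their end, and never start below 0. It does not use or reproduce Scotty's own slicing logic.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowAssignment {
    // Returns the {start, end} bounds of every sliding window that contains
    // the given timestamp, treating windows as [start, end) and aligned at 0.
    static List<long[]> windowsFor(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start that is <= timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards while the window [start, start + size) still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            if (start >= 0) { // assumption: no windows before time 0
                windows.add(new long[] {start, start + size});
            }
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] timestamps = {1, 16, 33}; // timestamps of element1..element3
        for (long ts : timestamps) {
            StringBuilder line = new StringBuilder("ts=" + ts + " ->");
            for (long[] w : windowsFor(ts, 10, 5)) {
                line.append(" ").append(w[0]).append("-").append(w[1]);
            }
            System.out.println(line);
        }
    }
}
```

Under these assumptions, timestamp 16 falls into windows 10-20 and 15-25 but not 5-15, and timestamp 33 falls into 25-35 and 30-40, neither of which appears in the debug output above.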

Does Scotty, as implemented, only work if you have a consistent (non-sporadic) stream of events? Is there any workaround for keys that only receive sporadic traffic like this?

Member

powibol commented Nov 16, 2023

Hi @jelos98, thank you for your request and detailed explanation in the ticket. We will look into the issue and get back to you as soon as possible.
