
Event write ordering not preserved #68

Open
rickardoberg opened this issue Nov 2, 2016 · 21 comments

@rickardoberg

Server: 3.8.1
JVM Client: eventstore-client_2.11 2.4.1

No custom Akka conf, just defaults. BetterOrdering:true on server.

As reported on the mailing list, it seems the JVM client does not preserve write order. A minimal test creates 1000 events with the ordinal as metadata and, on each write ack, outputs ordinal + commit position. Here's a run without synchronous ack (i.e. without waiting for the future to complete before sending the next write):
Wrote:695:1650
Wrote:26:1876
Wrote:818:2101
Wrote:690:2327
Wrote:774:2553
Wrote:985:2779
Wrote:594:3005
Wrote:516:3231
Wrote:886:3457
Wrote:453:3683
Wrote:862:3909
Wrote:471:4135
Wrote:553:4361
Wrote:674:4587
Wrote:274:4813
Wrote:845:5039
Wrote:151:5265
Wrote:184:5491
....

The futures are completed in the order the writes were issued, but the commit positions show that the events themselves have been reordered.

Adding a wait on each write future (created through ask(connection, message, writeTimeout), where message is a WriteEvents object) produces the following output, with no other changes:
Wrote:0:1651
Wrote:1:1875
Wrote:2:2099
Wrote:3:2323
Wrote:4:2547
Wrote:5:2771
Wrote:6:2995
Wrote:7:3219
Wrote:8:3443
Wrote:9:3667
Wrote:10:3891
Wrote:11:4116
Wrote:12:4341
Wrote:13:4566
Wrote:14:4791
Wrote:15:5016
Wrote:16:5241
Wrote:17:5466
Wrote:18:5691
Wrote:19:5916
Wrote:20:6141
Wrote:21:6366
Wrote:22:6591
Wrote:23:6816
Wrote:24:7041
Wrote:25:7266
Wrote:26:7491
Wrote:27:7716
Wrote:28:7941
Wrote:29:8166
Wrote:30:8391
Wrote:31:8616
Wrote:32:8841
Wrote:33:9066
Wrote:34:9291
Wrote:35:95
....

The commit position increases along with ordinal, so write order has been preserved.
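
To make the contrast concrete: the only difference between the two runs is whether each append is awaited before the next one is issued. A minimal sketch of both modes, in terms of the msemys/esjc client discussed below; the eventstore handle, stream name, and event payloads are placeholders:

// fire-and-forget: futures complete in write order, but commit positions
// show the server interleaving the events
for (int i = 0; i < 1000; i++)
{
    eventstore.appendToStream("foo", ExpectedVersion.any(),
            asList(EventData.newBuilder().type("bar").data("i:" + i).metadata("").build()));
}

// synchronous: block on each future before the next write; order is
// preserved at the cost of one round-trip per event
for (int i = 0; i < 1000; i++)
{
    eventstore.appendToStream("foo", ExpectedVersion.any(),
            asList(EventData.newBuilder().type("bar").data("i:" + i).metadata("").build()))
              .join();
}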

For me it is critical to understand what I can do, if anything, to keep the writes in the same order as they were produced. Adding a wait on the future after each event is not really an option.

For reference, the use case is to copy events from one EventStore to another. We currently have 6M events, and with the synchronous write (to make it safe) it looks like it will take almost a week to do this copy, which is really really really bad.

Thanks!

@rickardoberg
Author

Testing with the msemys/esjc JVM client shows that it reorders writes as well, BUT with that one I can put in a simple semaphore throttle to allow (for example) 10 concurrent writes, and write order will still be preserved. With the Akka client the semaphore throttle doesn't help at all; I still get reordering.

@t3hnar
Contributor

t3hnar commented Nov 2, 2016

On its own the JVM client is sequential, like any other actor: if you fire 10 messages from the same thread, they will be processed by the client in that order. So I suppose TCP + the server is where the reordering may happen.

Anyway, I'd like to help you resolve this problem. Could you please post a simple example reproducing the case?

Do you write to the same stream?

@rickardoberg
Author

Scratch that: even with the other JVM client I can still get reordering, even with just a 10-write semaphore (i.e. no more than 10 outstanding writes). As of now it doesn't seem possible to do an EventStore copy (subscribe to one, write to another) without waiting for each write to finish, which in practice means it's not possible, because of the time it would take for any reasonably sized store. Are there any tricks to get this to work?

@rickardoberg
Author

rickardoberg commented Nov 2, 2016

@t3hnar no, I write to one stream per aggregate. Does that make a difference?

@rickardoberg
Author

rickardoberg commented Nov 2, 2016

I can't easily post the Akka version code, because it uses too many internal helper classes. However, here's the esjc version of the test, same issue:

import static java.util.Arrays.asList;

import com.github.msemys.esjc.EventData;
import com.github.msemys.esjc.EventStore;
import com.github.msemys.esjc.EventStoreBuilder;
import com.github.msemys.esjc.ExpectedVersion;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;

public class EventStoreOrderTest
{
    @Test
    public void testOrdering() throws IOException, InterruptedException
    {
        EventStore eventstore = EventStoreBuilder.newBuilder()
                                                 .singleNodeAddress("127.0.0.1", 1113)
                                                 .operationTimeout(Duration.ofSeconds(30))
                                                 .userCredentials("admin", "changeit")
                                                 .build();

        int COUNT = 1_000_000;
        Semaphore throttle = new Semaphore(10);       // no more than 10 writes in flight
        CountDownLatch latch = new CountDownLatch(COUNT);
        AtomicInteger next = new AtomicInteger();     // ordinal expected to be acked next
        for (int i = 0; i < COUNT; i++)
        {
            final int j = i;
            throttle.acquire();
            eventstore.appendToStream("foo", ExpectedVersion.any(), asList(
                    EventData.newBuilder()
                             .type("bar")
                             .data("i:" + i)
                             .metadata("")
                             .build()))
                      .thenAccept(r ->
                      {
                          // report acks that arrive out of the order the writes were issued
                          if (j != next.get())
                          {
                              System.out.println("Expected " + next.get() + ", got " + j);
                          }
                          next.incrementAndGet();
                          throttle.release();
                          latch.countDown();
                      });
        }

        latch.await();

        eventstore.disconnect();
    }
}

@rickardoberg
Author

Actually, as you can see in the test above, it writes to the single "foo" stream; that makes no difference, it still reorders.

@t3hnar
Contributor

t3hnar commented Nov 2, 2016

Indeed, you need to write the next batch after the previous one has completed, and select a proper batch size to increase throughput.

In the meantime I'm going to create a test to verify that there is no such issue in the JVM client itself, and that the reordering we are facing here happens in the (TCP + server) part. Or I will ship a fix if an issue is found. Looks like the plan.

@rickardoberg
Author

How do I create a batch? I would be happy to do so; I have repeatedly asked on the list how to do this, but so far no luck.

@t3hnar
Contributor

t3hnar commented Nov 2, 2016

asList(
    EventData.newBuilder()
             .type("bar")
             .data("i:" + i)
             .metadata("")
             .build()))

Currently you are passing asList with a single element, but it is better to pass many events in a single write.
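
For illustration, a batched append with esjc could look like this. A sketch only, reusing the eventstore handle from the test above; the stream name, payloads, and batch size of 500 are placeholders (java.util.List and ArrayList imports assumed):

List<EventData> batch = new ArrayList<>();
for (int i = 0; i < 500; i++)
{
    batch.add(EventData.newBuilder()
                       .type("bar")
                       .data("i:" + i)
                       .metadata("")
                       .build());
}
// one appendToStream call: the events keep their list order in the stream
eventstore.appendToStream("foo", ExpectedVersion.any(), batch)
          .thenAccept(r -> System.out.println("batch committed at " + r.logPosition));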

@rickardoberg
Author

@t3hnar true, but if I put many in there (same with Akka JVM client) then it is impossible to use different streams. You have to put all of them into the same one. Right?

@t3hnar
Contributor

t3hnar commented Nov 2, 2016

Yes, so basically you need to group by stream and write sequentially per stream. Different streams will be written concurrently; this should cover most of your needs.
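
To illustrate, one way to arrange this with esjc is to keep a chain of futures per stream, so each stream's writes are serialized while different streams run in parallel. The class below is a hypothetical sketch, not part of any client API:

import static java.util.Arrays.asList;

import com.github.msemys.esjc.EventData;
import com.github.msemys.esjc.EventStore;
import com.github.msemys.esjc.ExpectedVersion;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PerStreamWriter
{
    private final EventStore store;
    // tail of the write chain for each stream id
    private final ConcurrentMap<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    public PerStreamWriter(EventStore store)
    {
        this.store = store;
    }

    public void write(String streamId, EventData event)
    {
        // chain each write after the previous write to the same stream,
        // preserving per-stream order; distinct streams proceed concurrently
        tails.compute(streamId, (id, tail) ->
                tail == null
                        ? append(streamId, event)
                        : tail.thenCompose(done -> append(streamId, event)));
    }

    private CompletableFuture<Void> append(String streamId, EventData event)
    {
        return store.appendToStream(streamId, ExpectedVersion.any(), asList(event))
                    .thenAccept(r -> { });
    }
}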

@rickardoberg
Author

@t3hnar that will make it quite complex, and it will be hard to get exactly the same output sequence as the input sequence. Not to mention the speed will still be really bad: most of the time the batch will only have one event in it (if I see a new stream id I have to flush the batch currently being built), yet I need to wait for the future to complete before continuing. Really really bad.

Fortunately we don't really use the fact that we have one stream per aggregate, as we load aggregate snapshots from the database rather than from events, so I could try putting them all into one stream and effectively destroy any stream id information that I currently have. That would at least make it sort of work, and not take a week to complete.

@gregoryyoung
Contributor

With the .NET client you can get ordering if writing async from a single thread, if BetterOrdering is set.

@rickardoberg
Author

@gregoryyoung that's good to know. Then it should be possible to fix this in the JVM client, I guess? It would be great if you could replicate it locally and see if there's ANYTHING to be done. For now I'm going to try erasing my stream ids and doing batching as per the above. I'm doing a run now, and it seems to have much better performance (200 messages/second, compared to 18 messages/second in our current production run, which I will have to abort because it would just take too long to finish).

@rickardoberg
Author

rickardoberg commented Nov 3, 2016

Just to follow up: we have now completed our blue/green deployment from the production cluster that was broken due to reordered events into a new one where the events were put back in order. By doing the copy (subscribe to the old ES, write to the new ES) using only a single stream, we got performance that was adequate and copied the 6M events in a few hours.
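
For reference, the copy was conceptually the loop below. This is a rough sketch in esjc terms, with method and field names recalled from memory rather than verified (readAllEventsForward, AllEventsSlice and friends mirror the .NET client); the source/target handles, batch size, and target stream name are placeholders:

Position position = Position.START;
while (true)
{
    AllEventsSlice slice = source.readAllEventsForward(position, 500, false).join();
    List<EventData> batch = new ArrayList<>();
    for (ResolvedEvent resolved : slice.events)
    {
        RecordedEvent e = resolved.event;
        // a real run would also skip system events (types starting with '$')
        batch.add(EventData.newBuilder()
                           .type(e.eventType)
                           .data(e.data)
                           .metadata(e.metadata)
                           .build());
    }
    if (!batch.isEmpty())
    {
        // one synchronous batched write per slice: order preserved, still fast
        target.appendToStream("all-events", ExpectedVersion.any(), batch).join();
    }
    if (slice.isEndOfStream())
    {
        break;
    }
    position = slice.nextPosition;
}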

The conclusion for now seems to be that you can either have the stream id be per aggregate, allowing you to load your aggregates from ES and do concurrency checking based on version, OR use a single stream id, allowing you to do b/g deployments using the subscribe/write method. You cannot (practically speaking) get both, at least not with either of the available Java clients.

@t3hnar
Contributor

t3hnar commented Nov 3, 2016

@rickardoberg good to know you restored the data.

I think we can close the ticket.

@rickardoberg
Author

@t3hnar well, so yes and no. I still think it's an issue that the JVM client reorders writes. If you want to open a new ticket more specific to that, and reference this one, that'd be fine.

@rickardoberg
Author

@t3hnar I looked at the test, but I don't read Scala so can't really tell what it's trying to do. Can you explain? Does it test for the case outlined in this issue?

@t3hnar
Contributor

t3hnar commented Nov 7, 2016

Yep, the test just starts 1000 asynchronous writes, then expects 1000 acks, and then verifies that the order of the acks matches the order in which the writes were started.

actor ! WriteEvents(streamId, List(newEventData), ExpectedVersion.Exact(x)) is an asynchronous write

expectMsgAllClassOf(List.fill(n)(classOf[WriteEventsCompleted]): _*) expects the acks

actual shouldEqual expected is self-explanatory :)

@rickardoberg
Author

@t3hnar and it is checking the number written, not the position in the log (which obviously is correct)? I'm a bit confused, because if that passes, then what am I doing wrong? Or does that fail?

@t3hnar
Contributor

t3hnar commented Nov 7, 2016

  • my test writes to the same stream, so checking the order of responses plus the seqNr is correct
  • you could switch from using Future to direct message passing, which should improve ordering (see the sketch below). To do that you would have to dive into http://akka.io to understand the basic principles and how to implement message passing with Java
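
A rough sketch of what direct message passing could look like from Java. The builder and actor class names below follow the client's Java examples from memory and are assumptions to verify against the project README; the stream name and count are placeholders:

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import eventstore.WriteEventsCompleted;
import eventstore.j.EventDataBuilder;
import eventstore.j.WriteEventsBuilder;
import eventstore.tcp.ConnectionActor;
import java.util.UUID;

public class MessagePassingExample
{
    public static class Writer extends UntypedActor
    {
        // connection actor with default settings assumed
        private final ActorRef connection = getContext().actorOf(ConnectionActor.getProps());
        private int acked = 0;

        @Override
        public void preStart()
        {
            // fire all writes from this single actor; the client processes
            // them sequentially, in the order they were sent
            for (int i = 0; i < 1000; i++)
            {
                connection.tell(new WriteEventsBuilder("foo")
                        .addEvent(new EventDataBuilder("bar")
                                .eventId(UUID.randomUUID())
                                .data("i:" + i)
                                .build())
                        .expectAnyVersion()
                        .build(), getSelf());
            }
        }

        @Override
        public void onReceive(Object message)
        {
            if (message instanceof WriteEventsCompleted)
            {
                // acks arrive here as plain messages, no Future involved
                if (++acked == 1000) getContext().system().terminate();
            }
            else
            {
                unhandled(message);
            }
        }
    }

    public static void main(String[] args)
    {
        ActorSystem system = ActorSystem.create();
        system.actorOf(Props.create(Writer.class));
    }
}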
