Skip to content

Commit

Permalink
Merge pull request #36 from matrix-org/kegan/waiter
Browse files Browse the repository at this point in the history
Waiter: change up the interface to improve dev UX
  • Loading branch information
kegsay authored Mar 22, 2024
2 parents 5812cdd + b45b1e8 commit 1922d73
Show file tree
Hide file tree
Showing 12 changed files with 88 additions and 65 deletions.
7 changes: 6 additions & 1 deletion internal/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,12 @@ type Event struct {
}

type Waiter interface {
Wait(t ct.TestLike, s time.Duration)
// Wait for something to happen, up until the timeout s. If nothing happens,
// fail the test with the formatted string provided.
Waitf(t ct.TestLike, s time.Duration, format string, args ...any)
// Wait for something to happen, up until the timeout s. If nothing happens,
// return an error with the formatted string provided.
TryWaitf(t ct.TestLike, s time.Duration, format string, args ...any) error
}

func CheckEventHasBody(body string) func(e Event) bool {
Expand Down
17 changes: 13 additions & 4 deletions internal/api/js/js.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,15 @@ type jsTimelineWaiter struct {
client *JSClient
}

func (w *jsTimelineWaiter) Wait(t ct.TestLike, s time.Duration) {
func (w *jsTimelineWaiter) Waitf(t ct.TestLike, s time.Duration, format string, args ...any) {
t.Helper()
err := w.TryWaitf(t, s, format, args...)
if err != nil {
ct.Fatalf(t, err.Error())
}
}

func (w *jsTimelineWaiter) TryWaitf(t ct.TestLike, s time.Duration, format string, args ...any) error {
t.Helper()
updates := make(chan bool, 3)
cancel := w.client.listenForUpdates(func(roomID string, ev api.Event) {
Expand All @@ -494,17 +502,18 @@ func (w *jsTimelineWaiter) Wait(t ct.TestLike, s time.Duration) {
});`, w.roomID, CONSOLE_LOG_CONTROL_STRING,
))

msg := fmt.Sprintf(format, args...)
start := time.Now()
for {
timeLeft := s - time.Since(start)
if timeLeft <= 0 {
ct.Fatalf(t, "%s (js): Wait[%s]: timed out", w.client.userID, w.roomID)
return fmt.Errorf("%s (js): Wait[%s]: timed out: %s", w.client.userID, w.roomID, msg)
}
select {
case <-time.After(timeLeft):
ct.Fatalf(t, "%s (js): Wait[%s]: timed out", w.client.userID, w.roomID)
return fmt.Errorf("%s (js): Wait[%s]: timed out: %s", w.client.userID, w.roomID, msg)
case <-updates:
return
return nil // event exists
}
}
}
Expand Down
21 changes: 15 additions & 6 deletions internal/api/rust/rust.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,15 @@ type timelineWaiter struct {
client *RustClient
}

func (w *timelineWaiter) Wait(t ct.TestLike, s time.Duration) {
func (w *timelineWaiter) Waitf(t ct.TestLike, s time.Duration, format string, args ...any) {
t.Helper()
err := w.TryWaitf(t, s, format, args...)
if err != nil {
ct.Fatalf(t, err.Error())
}
}

func (w *timelineWaiter) TryWaitf(t ct.TestLike, s time.Duration, format string, args ...any) error {
t.Helper()

checkForEvent := func() bool {
Expand All @@ -629,7 +637,7 @@ func (w *timelineWaiter) Wait(t ct.TestLike, s time.Duration) {
}

if checkForEvent() {
return
return nil // event exists
}

updates := make(chan bool, 3)
Expand All @@ -653,21 +661,22 @@ func (w *timelineWaiter) Wait(t ct.TestLike, s time.Duration) {

// check again in case it was added after the previous checkForEvent but before AddListener
if checkForEvent() {
return
return nil // event exists
}

msg := fmt.Sprintf(format, args...)
// either no timeline or doesn't exist yet, start blocking
start := time.Now()
for {
timeLeft := s - time.Since(start)
if timeLeft <= 0 {
ct.Fatalf(t, "%s (rust): Wait[%s]: timed out", w.client.userID, w.roomID)
return fmt.Errorf("%s (rust): Wait[%s]: timed out: %s", w.client.userID, w.roomID, msg)
}
select {
case <-time.After(timeLeft):
ct.Fatalf(t, "%s (rust): Wait[%s]: timed out", w.client.userID, w.roomID)
return fmt.Errorf("%s (rust): Wait[%s]: timed out %s", w.client.userID, w.roomID, msg)
case <-updates:
return
return nil // event exists
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions internal/tests/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ func TestReceiveTimeline(t *testing.T) {
stopSyncing := client.MustStartSyncing(t)
defer stopSyncing()
// wait until we see the latest event
client.WaitUntilEventInRoom(t, roomID, api.CheckEventHasEventID(eventIDs[len(eventIDs)-1])).Wait(t, 5*time.Second)
client.WaitUntilEventInRoom(t, roomID, api.CheckEventHasEventID(eventIDs[len(eventIDs)-1])).Waitf(t, 5*time.Second, "client did not see latest event")
// ensure we have backpaginated if we need to. It is valid for a client to only sync the latest
// event in the room, so we have to backpaginate here.
client.MustBackpaginate(t, roomID, len(eventIDs))
// ensure we see all the events
for _, eventID := range eventIDs {
client.WaitUntilEventInRoom(t, roomID, api.CheckEventHasEventID(eventID)).Wait(t, 5*time.Second)
client.WaitUntilEventInRoom(t, roomID, api.CheckEventHasEventID(eventID)).Waitf(t, 5*time.Second, "client did not see event %s", eventID)
}
// check event content is correct
for i, eventID := range eventIDs {
Expand All @@ -123,7 +123,7 @@ func TestReceiveTimeline(t *testing.T) {
// ensure we see all the events
for i, eventID := range eventIDs {
t.Logf("waiting for event %d : %s", i, eventID)
client.WaitUntilEventInRoom(t, roomID, api.CheckEventHasEventID(eventID)).Wait(t, 5*time.Second)
client.WaitUntilEventInRoom(t, roomID, api.CheckEventHasEventID(eventID)).Waitf(t, 5*time.Second, "client did not see event %s", eventID)
}
// now send another live event and ensure we see it. This ensure we can still wait for events after having
// previously waited for events.
Expand All @@ -135,7 +135,7 @@ func TestReceiveTimeline(t *testing.T) {
"body": "Final",
},
})
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "client did not see final message")

// check event content is correct
for i, eventID := range eventIDs {
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestCanWaitUntilEventInRoomBeforeRoomIsKnown(t *testing.T) {
completed := helpers.NewWaiter()
waiter := client.WaitUntilEventInRoom(t, roomID, api.CheckEventHasEventID(eventID))
go func() {
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "client did not seee event %s", eventID)
completed.Finish()
}()
t.Logf("waiting for event %s", eventID)
Expand Down
2 changes: 1 addition & 1 deletion tests/device_keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestFailedDeviceKeyDownloadRetries(t *testing.T) {
t,
roomID,
api.CheckEventHasBody("checking whether we can send a message"),
).Wait(t, 5*time.Second)
).Waitf(t, 5*time.Second, "bob did not see alice's decrypted message")

})
})
Expand Down
14 changes: 7 additions & 7 deletions tests/federation_connectivity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestNewUserCannotGetKeysForOfflineServer(t *testing.T) {
waiter := bob.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody(wantMsgBody))
evID := alice.SendMessage(t, roomID, wantMsgBody)
t.Logf("bob (%s) waiting for event %s", bob.Type(), evID)
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "bob did not see alice's message '%s'", wantMsgBody)

// now bob's HS becomes unreachable
tc.Deployment.PauseServer(t, "hs2")
Expand All @@ -58,7 +58,7 @@ func TestNewUserCannotGetKeysForOfflineServer(t *testing.T) {
waiter = alice.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody(wantUndecryptableMsgBody))
undecryptableEventID := charlie.SendMessage(t, roomID, wantUndecryptableMsgBody)
t.Logf("alice (%s) waiting for event %s", alice.Type(), undecryptableEventID)
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "alice did not see charlie's messages '%s'", wantUndecryptableMsgBody)

// now bob's server comes back online
tc.Deployment.UnpauseServer(t, "hs2")
Expand All @@ -76,7 +76,7 @@ func TestNewUserCannotGetKeysForOfflineServer(t *testing.T) {
waiter = bob.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody(wantMsgBody))
evID = charlie.SendMessage(t, roomID, wantMsgBody)
t.Logf("bob (%s) waiting for event %s", bob.Type(), evID)
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "bob did not see charlie's message '%s'", wantMsgBody)

// make sure bob cannot decrypt the msg from when his server was offline
// TODO: this isn't ideal, see https://github.com/matrix-org/matrix-rust-sdk/issues/2864
Expand Down Expand Up @@ -122,11 +122,11 @@ func TestExistingSessionCannotGetKeysForOfflineServer(t *testing.T) {
waiter := bob.WaitUntilEventInRoom(t, roomIDab, api.CheckEventHasBody(wantMsgBody))
evID := alice.SendMessage(t, roomIDab, wantMsgBody)
t.Logf("bob (%s) waiting for event %s", bob.Type(), evID)
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "bob did not see alice's message: '%s'", wantMsgBody)
waiter = bob.WaitUntilEventInRoom(t, roomIDbc, api.CheckEventHasBody(wantMsgBody))
evID = charlie.SendMessage(t, roomIDbc, wantMsgBody)
t.Logf("bob (%s) waiting for event %s", bob.Type(), evID)
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "bob did not see charlie's message: '%s'", wantMsgBody)

// now bob's HS becomes unreachable
tc.Deployment.PauseServer(t, "hs2")
Expand All @@ -144,13 +144,13 @@ func TestExistingSessionCannotGetKeysForOfflineServer(t *testing.T) {
waiter = alice.WaitUntilEventInRoom(t, roomIDab, api.CheckEventHasBody(wantDecryptableMsgBody))
decryptableEventID := charlie.SendMessage(t, roomIDab, wantDecryptableMsgBody)
t.Logf("alice (%s) waiting for event %s", alice.Type(), decryptableEventID)
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "alice did not see charlie's message: '%s'", wantDecryptableMsgBody)

// now bob's server comes back online
tc.Deployment.UnpauseServer(t, "hs2")

waiter = bob.WaitUntilEventInRoom(t, roomIDab, api.CheckEventHasBody(wantDecryptableMsgBody))
waiter.Wait(t, 10*time.Second) // longer time to allow for retries
waiter.Waitf(t, 10*time.Second, "bob did not see charlie's message: '%s'", wantDecryptableMsgBody) // longer time to allow for retries
})
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func main() {
fmt.Println("Sent event " + eventID + " waiting for remote echo")

waiter := client.WaitUntilEventInRoom(t, roomID, api.CheckEventHasEventID(eventID))
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "client did not see event %s", eventID)

time.Sleep(time.Second)
fmt.Println("exiting")
Expand Down
4 changes: 2 additions & 2 deletions tests/key_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestCanBackupKeys(t *testing.T) {
waiter := backupCreator.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody(body))
evID := backupCreator.SendMessage(t, roomID, body)
t.Logf("backupCreator (%s) waiting for event %s", backupCreator.Type(), evID)
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "backup creator did not see own message %s", evID)

// Now backupCreator backs up his keys. Some clients may automatically do this, but let's be explicit about it.
recoveryKey := backupCreator.MustBackupKeys(t)
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestBackupWrongRecoveryKeyFails(t *testing.T) {
waiter := backupCreator.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody(body))
evID := backupCreator.SendMessage(t, roomID, body)
t.Logf("backupCreator (%s) waiting for event %s", backupCreator.Type(), evID)
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "backup creator did not see own message %s", evID)

// Now backupCreator backs up his keys. Some clients may automatically do this, but let's be explicit about it.
recoveryKey := backupCreator.MustBackupKeys(t)
Expand Down
24 changes: 12 additions & 12 deletions tests/membership_acls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestAliceBobEncryptionWorks(t *testing.T) {

// Bob receives the message
t.Logf("bob (%s) waiting for event %s", bob.Type(), evID)
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "bob did not see alice's message")
})
})
}
Expand Down Expand Up @@ -115,15 +115,15 @@ func TestCanDecryptMessagesAfterInviteButBeforeJoin(t *testing.T) {
sentinelBody := "Sentinel"
waiter := bob.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody(sentinelBody))
alice.SendMessage(t, roomID, sentinelBody)
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "bob did not see alice's message")

// Explicitly ask for a pagination, rather than assuming the SDK will return events
// earlier than the join by default. This is important because:
// - sync v2 (JS SDK) it depends on the timeline limit, which is 20 by default but we don't want to assume.
// - sliding sync (FFI) it won't return events before the join by default, relying on clients using the prev_batch token.
waiter = bob.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody(wantMsgBody))
bob.MustBackpaginate(t, roomID, 5) // number is arbitrary, just needs to be >=2
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "bob did not see backpaginated message")
})
}

Expand All @@ -143,13 +143,13 @@ func TestBobCanSeeButNotDecryptHistoryInPublicRoom(t *testing.T) {
waiter := alice.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody(beforeJoinBody))
evID := alice.SendMessage(t, roomID, beforeJoinBody)
t.Logf("alice (%s) waiting for event %s", alice.Type(), evID)
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "alice did not see own message")

// now bob joins the room
tc.Bob.MustJoinRoom(t, roomID, []string{clientTypeA.HS})
time.Sleep(time.Second) // wait for it to appear on the client else rust crashes if it cannot find the room FIXME
waiter = bob.WaitUntilEventInRoom(t, roomID, api.CheckEventHasMembership(bob.UserID(), "join"))
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "bob did not see own join")

// bob hits scrollback and should see but not be able to decrypt the message
bob.MustBackpaginate(t, roomID, 5)
Expand Down Expand Up @@ -198,24 +198,24 @@ func TestOnRejoinBobCanSeeButNotDecryptHistoryInPublicRoom(t *testing.T) {
waiter := bob.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody(bothJoinedBody))
evID := alice.SendMessage(t, roomID, bothJoinedBody)
t.Logf("bob (%s) waiting for event %s", bob.Type(), evID)
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "bob did not see alice's message")

// now bob leaves the room, wait for alice to see it
waiter = alice.WaitUntilEventInRoom(t, roomID, api.CheckEventHasMembership(bob.UserID(), "leave"))
tc.Bob.MustLeaveRoom(t, roomID)
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "alice did not see bob's leave")

// now alice sends another message, which should use a key that bob does not have. Wait for the remote echo to come back.
onlyAliceBody := "Only me on my lonesome"
waiter = alice.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody(onlyAliceBody))
evID = alice.SendMessage(t, roomID, onlyAliceBody)
t.Logf("alice (%s) waiting for event %s", alice.Type(), evID)
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "alice did not see own message")

// now bob rejoins the room, wait until he sees it.
tc.Bob.MustJoinRoom(t, roomID, []string{clientTypeA.HS})
waiter = bob.WaitUntilEventInRoom(t, roomID, api.CheckEventHasMembership(bob.UserID(), "join"))
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "bob did not see own join")
// this is required for some reason else tests fail
time.Sleep(time.Second)

Expand Down Expand Up @@ -270,7 +270,7 @@ func TestOnNewDeviceBobCanSeeButNotDecryptHistoryInPublicRoom(t *testing.T) {
waiter := bob.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody(onlyFirstDeviceBody))
evID := alice.SendMessage(t, roomID, onlyFirstDeviceBody)
t.Logf("bob (%s) waiting for event %s", bob.Type(), evID)
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "bob did not see alice's message")

// now bob logs in on a new device. He should NOT be able to decrypt this event (though can see it due to history visibility)
csapiBob2 := tc.MustRegisterNewDevice(t, tc.Bob, clientTypeB.HS, "NEW_DEVICE")
Expand All @@ -294,7 +294,7 @@ func TestOnNewDeviceBobCanSeeButNotDecryptHistoryInPublicRoom(t *testing.T) {
waiter = bob2.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody(decryptableBody))
evID = alice.SendMessage(t, roomID, decryptableBody)
t.Logf("bob2 (%s) waiting for event %s", bob2.Type(), evID)
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "bob2 did not see alice's message")

// now bob logs out
bob2StopSyncing()
Expand All @@ -310,7 +310,7 @@ func TestOnNewDeviceBobCanSeeButNotDecryptHistoryInPublicRoom(t *testing.T) {
waiter = bob.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody(undecryptableBody))
evID = alice.SendMessage(t, roomID, undecryptableBody)
t.Logf("bob (%s) waiting for event %s", bob.Type(), evID)
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "bob did not see alice's event %s", evID)

// now bob logs in again
bob2 = tc.MustLoginClient(t, csapiBob2, clientTypeB)
Expand Down
12 changes: 6 additions & 6 deletions tests/one_time_keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ func TestFallbackKeyIsUsedIfOneTimeKeysRunOut(t *testing.T) {
)
tc.Charlie.MustJoinRoom(t, roomID, []string{keyConsumerClientType.HS})
tc.Alice.MustJoinRoom(t, roomID, []string{keyConsumerClientType.HS})
charlie.WaitUntilEventInRoom(t, roomID, api.CheckEventHasMembership(alice.UserID(), "join")).Wait(t, 5*time.Second)
bob.WaitUntilEventInRoom(t, roomID, api.CheckEventHasMembership(alice.UserID(), "join")).Wait(t, 5*time.Second)
alice.WaitUntilEventInRoom(t, roomID, api.CheckEventHasMembership(alice.UserID(), "join")).Wait(t, 5*time.Second)
charlie.WaitUntilEventInRoom(t, roomID, api.CheckEventHasMembership(alice.UserID(), "join")).Waitf(t, 5*time.Second, "charlie did not see alice's join")
bob.WaitUntilEventInRoom(t, roomID, api.CheckEventHasMembership(alice.UserID(), "join")).Waitf(t, 5*time.Second, "bob did not see alice's join")
alice.WaitUntilEventInRoom(t, roomID, api.CheckEventHasMembership(alice.UserID(), "join")).Waitf(t, 5*time.Second, "alice did not see own join")
bob.SendMessage(t, roomID, "Hello world!")
charlie.SendMessage(t, roomID, "Goodbye world!")
waiter = alice.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody("Hello world!"))
Expand All @@ -146,9 +146,9 @@ func TestFallbackKeyIsUsedIfOneTimeKeysRunOut(t *testing.T) {
must.Equal(t, otkCount, 0, "OTKs were uploaded when they should have been blocked by mitmproxy")
})
// rust sdk needs /keys/upload to 200 OK before it will decrypt the hello world msg
waiter.Wait(t, 5*time.Second)
waiter.Waitf(t, 5*time.Second, "alice did not see bob's message")
// check charlie's message is also here
alice.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody("Goodbye world!")).Wait(t, 5*time.Second)
alice.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody("Goodbye world!")).Waitf(t, 5*time.Second, "alice did not see charlie's message")

// now /keys/upload is unblocked, make sure we upload new keys
alice.SendMessage(t, roomID, "Kick the client to upload OTKs... hopefully")
Expand Down Expand Up @@ -253,7 +253,7 @@ func TestFailedKeysClaimRetries(t *testing.T) {
// call /keys/claim. We should retry though.
tc.Bob.MustJoinRoom(t, roomID, []string{clientType.HS})
time.Sleep(time.Second) // FIXME using WaitUntilEventInRoom panics on rust because the room isn't there yet
bob.WaitUntilEventInRoom(t, roomID, api.CheckEventHasMembership(tc.Bob.UserID, "join")).Wait(t, 5*time.Second)
bob.WaitUntilEventInRoom(t, roomID, api.CheckEventHasMembership(tc.Bob.UserID, "join")).Waitf(t, 5*time.Second, "bob did not see own join event")

// Now send a message. On Rust, just sending 1 msg is enough to kick retry schedule.
// JS SDK won't retry the /keys/claim automatically. Try sending another event to kick it.
Expand Down
Loading

0 comments on commit 1922d73

Please sign in to comment.