Skip to content

Commit

Permalink
Change Event API to be more flexible, log rust stuff to file
Browse files Browse the repository at this point in the history
  • Loading branch information
kegsay committed Nov 10, 2023
1 parent aa4c679 commit f9f6c75
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 50 deletions.
16 changes: 15 additions & 1 deletion internal/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Client interface {
SendMessage(t *testing.T, roomID, text string)
// Wait until an event with the given body is seen. Not all impls expose event IDs
// hence needing to use body as a proxy.
WaitUntilEventInRoom(t *testing.T, roomID, wantBody string) Waiter
WaitUntilEventInRoom(t *testing.T, roomID string, checker func(e Event) bool) Waiter
// Backpaginate in this room by `count` events.
MustBackpaginate(t *testing.T, roomID string, count int)
Type() ClientType
Expand Down Expand Up @@ -69,9 +69,23 @@ type Event struct {
Text string // FFI bindings don't expose the content object
Sender string
// FFI bindings don't expose state key
Target string
// FFI bindings don't expose type
Membership string
}

type Waiter interface {
Wait(t *testing.T, s time.Duration)
}

func CheckEventHasBody(body string) func(e Event) bool {
return func(e Event) bool {
return e.Text == body
}
}

func CheckEventHasMembership(target, membership string) func(e Event) bool {
return func(e Event) bool {
return e.Membership == membership && e.Target == target
}
}
75 changes: 50 additions & 25 deletions internal/api/js.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type JSClient struct {
ctx context.Context
cancel func()
baseJSURL string
listeners map[int32]func(roomID, text string)
listeners map[int32]func(roomID string, ev Event)
listenerID atomic.Int32
userID string
}
Expand All @@ -42,7 +42,7 @@ func NewJSClient(t *testing.T, opts ClientCreationOpts) (Client, error) {
chromedp.WithBrowserLogf(log.Printf), chromedp.WithBrowserErrorf(log.Printf), //chromedp.WithBrowserDebugf(log.Printf),
))
jsc := &JSClient{
listeners: make(map[int32]func(roomID, text string)),
listeners: make(map[int32]func(roomID string, ev Event)),
userID: opts.UserID,
}
// Listen for console logs for debugging AND to communicate live updates
Expand All @@ -60,10 +60,15 @@ func NewJSClient(t *testing.T, opts ClientCreationOpts) (Client, error) {

if strings.HasPrefix(s, CONSOLE_LOG_CONTROL_STRING) {
val := strings.TrimPrefix(s, CONSOLE_LOG_CONTROL_STRING)
// for now the format is always 'room_id||text'
// for now the format is always 'room_id||{event}'
segs := strings.Split(val, "||")
var ev JSEvent
if err := json.Unmarshal([]byte(segs[1]), &ev); err != nil {
colorify("[%s] failed to unmarshal event '%s' into Go %s\n", opts.UserID, segs[1], err)
continue
}
for _, l := range jsc.listeners {
l(segs[0], segs[1])
l(segs[0], jsToEvent(ev))
}
}
}
Expand Down Expand Up @@ -137,7 +142,7 @@ func NewJSClient(t *testing.T, opts ClientCreationOpts) (Client, error) {
if (event.getType() !== "m.room.message") {
return; // only use messages
}
console.log("%s"+event.getRoomId()+"||"+event.getEffectiveEvent().content.body);
console.log("%s"+event.getRoomId()+"||"+JSON.stringify(event.getEffectiveEvent()));
});`, CONSOLE_LOG_CONTROL_STRING))

jsc.ctx = ctx
Expand All @@ -152,7 +157,7 @@ func NewJSClient(t *testing.T, opts ClientCreationOpts) (Client, error) {
// log messages.
func (c *JSClient) Close(t *testing.T) {
c.cancel()
c.listeners = make(map[int32]func(roomID string, text string))
c.listeners = make(map[int32]func(roomID string, ev Event))
}

// StartSyncing to begin syncing from sync v2 / sliding sync.
Expand Down Expand Up @@ -192,24 +197,19 @@ func (c *JSClient) MustBackpaginate(t *testing.T, roomID string, count int) {
))
}

func (c *JSClient) WaitUntilEventInRoom(t *testing.T, roomID, wantBody string) Waiter {
exists := chrome.MustExecuteInto[bool](t, c.ctx, fmt.Sprintf(
`window.__client.getRoom("%s").getLiveTimeline().getEvents().map((e)=>{return e.getContent().body}).includes("%s");`, roomID, wantBody,
))

func (c *JSClient) WaitUntilEventInRoom(t *testing.T, roomID string, checker func(e Event) bool) Waiter {
return &jsTimelineWaiter{
roomID: roomID,
wantBody: wantBody,
client: c,
exists: exists,
roomID: roomID,
checker: checker,
client: c,
}
}

func (c *JSClient) Type() ClientType {
return ClientTypeJS
}

func (c *JSClient) listenForUpdates(callback func(roomID, gotText string)) (cancel func()) {
func (c *JSClient) listenForUpdates(callback func(roomID string, ev Event)) (cancel func()) {
id := c.listenerID.Add(1)
c.listeners[id] = callback
return func() {
Expand All @@ -218,28 +218,31 @@ func (c *JSClient) listenForUpdates(callback func(roomID, gotText string)) (canc
}

type jsTimelineWaiter struct {
roomID string
wantBody string
client *JSClient
exists bool
roomID string
checker func(e Event) bool
client *JSClient
}

func (w *jsTimelineWaiter) Wait(t *testing.T, s time.Duration) {
if w.exists {
return
}
updates := make(chan bool, 3)
cancel := w.client.listenForUpdates(func(roomID, gotText string) {
cancel := w.client.listenForUpdates(func(roomID string, ev Event) {
if w.roomID != roomID {
return
}
if w.wantBody != gotText {
if !w.checker(ev) {
return
}
updates <- true
})
defer cancel()

// check if it already exists by echoing the current timeline. This will call the callback above.
chrome.MustExecute(t, w.client.ctx, fmt.Sprintf(
`window.__client.getRoom("%s")?.getLiveTimeline()?.getEvents().forEach((e)=>{
console.log("%s"+e.getRoomId()+"||"+JSON.stringify(e.getEffectiveEvent()));
});`, w.roomID, CONSOLE_LOG_CONTROL_STRING,
))

start := time.Now()
for {
timeLeft := s - time.Since(start)
Expand All @@ -262,3 +265,25 @@ func colorify(format string, args ...any) {
format = ansiYellowForeground + format + ansiResetForeground
fmt.Printf(format, args...)
}

type JSEvent struct {
Type string `json:"type"`
Sender string `json:"sender,omitempty"`
StateKey *string `json:"state_key,omitempty"`
Content map[string]interface{} `json:"content"`
ID string `json:"event_id"`
}

func jsToEvent(j JSEvent) Event {
var ev Event
ev.Sender = j.Sender
ev.ID = j.ID
switch j.Type {
case "m.room.member":
ev.Target = *j.StateKey
ev.Membership = j.Content["membership"].(string)
case "m.room.message":
ev.Text = j.Content["body"].(string)
}
return ev
}
83 changes: 68 additions & 15 deletions internal/api/rust.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,17 @@ import (

func init() {
matrix_sdk_ffi.SetupTracing(matrix_sdk_ffi.TracingConfiguration{
WriteToStdoutOrSystem: true,
WriteToStdoutOrSystem: false,
Filter: "debug",
WriteToFiles: &matrix_sdk_ffi.TracingFileConfiguration{
Path: ".",
FilePrefix: "rust_sdk",
},
})
}

var zero uint32

type RustRoomInfo struct {
attachedListener bool
room *matrix_sdk_ffi.Room
Expand All @@ -33,6 +39,7 @@ type RustClient struct {
}

func NewRustClient(t *testing.T, opts ClientCreationOpts, ssURL string) (Client, error) {
t.Logf("NewRustClient[%s] creating...", opts.UserID)
ab := matrix_sdk_ffi.NewClientBuilder().HomeserverUrl(opts.BaseURL).SlidingSyncProxy(&ssURL)
client, err := ab.Build()
if err != nil {
Expand All @@ -46,25 +53,30 @@ func NewRustClient(t *testing.T, opts ClientCreationOpts, ssURL string) (Client,
if err != nil {
return nil, fmt.Errorf("Client.Login failed: %s", err)
}
return &RustClient{
c := &RustClient{
userID: opts.UserID,
FFIClient: client,
rooms: make(map[string]*RustRoomInfo),
listeners: make(map[int32]func(roomID string)),
}, nil
}
c.logf(t, "NewRustClient[%s] created client", opts.UserID)
return c, nil
}

func (c *RustClient) Close(t *testing.T) {
t.Helper()
c.logf(t, "Close[%s]", c.userID)
c.FFIClient.Destroy()
}

// StartSyncing to begin syncing from sync v2 / sliding sync.
// Tests should call stopSyncing() at the end of the test.
func (c *RustClient) StartSyncing(t *testing.T) (stopSyncing func()) {
t.Helper()
c.logf(t, "StartSyncing[%s]", c.userID)
syncService, err := c.FFIClient.SyncService().FinishBlocking()
must.NotError(t, fmt.Sprintf("[%s]failed to make sync service", c.userID), err)
c.syncService = syncService
t.Logf("%s: Starting sync service", c.userID)
go syncService.StartBlocking()
return func() {
t.Logf("%s: Stopping sync service", c.userID)
Expand All @@ -75,6 +87,8 @@ func (c *RustClient) StartSyncing(t *testing.T) (stopSyncing func()) {
// IsRoomEncrypted returns true if the room is encrypted. May return an error e.g if you
// provide a bogus room ID.
func (c *RustClient) IsRoomEncrypted(t *testing.T, roomID string) (bool, error) {
t.Helper()
c.logf(t, "IsRoomEncrypted[%s] %s", c.userID, roomID)
r := c.findRoom(roomID)
if r == nil {
rooms := c.FFIClient.Rooms()
Expand All @@ -83,12 +97,14 @@ func (c *RustClient) IsRoomEncrypted(t *testing.T, roomID string) (bool, error)
return r.IsEncrypted()
}

func (c *RustClient) WaitUntilEventInRoom(t *testing.T, roomID, wantBody string) Waiter {
func (c *RustClient) WaitUntilEventInRoom(t *testing.T, roomID string, checker func(Event) bool) Waiter {
t.Helper()
c.logf(t, "WaitUntilEventInRoom[%s] %s", c.userID, roomID)
c.ensureListening(t, roomID)
return &timelineWaiter{
roomID: roomID,
wantBody: wantBody,
client: c,
roomID: roomID,
checker: checker,
client: c,
}
}

Expand All @@ -99,6 +115,8 @@ func (c *RustClient) Type() ClientType {
// SendMessage sends the given text as an m.room.message with msgtype:m.text into the given
// room. Returns the event ID of the sent event.
func (c *RustClient) SendMessage(t *testing.T, roomID, text string) {
t.Helper()
c.logf(t, "SendMessage[%s] %s => %s", c.userID, roomID, text)
// we need a timeline listener before we can send messages, AND that listener must be attached to the
// same *Room you call .Send on :S
r := c.ensureListening(t, roomID)
Expand All @@ -108,7 +126,7 @@ func (c *RustClient) SendMessage(t *testing.T, roomID, text string) {

func (c *RustClient) MustBackpaginate(t *testing.T, roomID string, count int) {
t.Helper()
t.Logf("[%s] MustBackpaginate %d %s", c.userID, count, roomID)
c.logf(t, "[%s] MustBackpaginate %d %s", c.userID, count, roomID)
r := c.findRoom(roomID)
must.NotEqual(t, r, nil, "unknown room")
must.NotError(t, "failed to backpaginate", r.PaginateBackwards(matrix_sdk_ffi.PaginationOptionsSingleRequest{
Expand All @@ -134,6 +152,12 @@ func (c *RustClient) findRoom(roomID string) *matrix_sdk_ffi.Room {
return nil
}

func (c *RustClient) logf(t *testing.T, format string, args ...interface{}) {
t.Helper()
matrix_sdk_ffi.LogEvent("rust.go", &zero, matrix_sdk_ffi.LogLevelInfo, t.Name(), fmt.Sprintf(format, args...))
t.Logf(format, args...)
}

func (c *RustClient) ensureListening(t *testing.T, roomID string) *matrix_sdk_ffi.Room {
r := c.findRoom(roomID)
must.NotEqual(t, r, nil, fmt.Sprintf("room %s does not exist", roomID))
Expand Down Expand Up @@ -220,31 +244,32 @@ func (c *RustClient) listenForUpdates(callback func(roomID string)) (cancel func
}

type timelineWaiter struct {
roomID string
wantBody string
client *RustClient
roomID string
checker func(e Event) bool
client *RustClient
}

func (w *timelineWaiter) Wait(t *testing.T, s time.Duration) {
t.Helper()

checkForEvent := func() bool {
t.Helper()
// check if it exists in the timeline already
info := w.client.rooms[w.roomID]
if info == nil {
fmt.Printf("_____checkForEvent[%s] '%s' room does not exist\n", w.client.userID, w.wantBody)
fmt.Printf("_____checkForEvent[%s] room does not exist\n", w.client.userID)
return false
}
for _, ev := range info.timeline {
if ev == nil {
continue
}
if ev.Text == w.wantBody {
if w.checker(*ev) {
t.Logf("%s: Wait[%s]: event exists in the timeline", w.client.userID, w.roomID)
return true
}
}
fmt.Printf("_____checkForEvent[%s] '%s' checked %d timeline events and no match \n", w.client.userID, w.wantBody, len(info.timeline))
fmt.Printf("_____checkForEvent[%s] checked %d timeline events and no match \n", w.client.userID, len(info.timeline))
return false
}

Expand Down Expand Up @@ -304,6 +329,34 @@ func timelineItemToEvent(item *matrix_sdk_ffi.TimelineItem) *Event {
ID: eventID,
Sender: evv.Sender(),
}
switch k := evv.Content().Kind().(type) {
case matrix_sdk_ffi.TimelineItemContentKindRoomMembership:
complementEvent.Target = k.UserId
change := *k.Change
switch change {
case matrix_sdk_ffi.MembershipChangeInvited:
complementEvent.Membership = "invite"
case matrix_sdk_ffi.MembershipChangeBanned:
fallthrough
case matrix_sdk_ffi.MembershipChangeKickedAndBanned:
complementEvent.Membership = "ban"
case matrix_sdk_ffi.MembershipChangeJoined:
complementEvent.Membership = "join"
case matrix_sdk_ffi.MembershipChangeLeft:
fallthrough
case matrix_sdk_ffi.MembershipChangeInvitationRevoked:
fallthrough
case matrix_sdk_ffi.MembershipChangeInvitationRejected:
fallthrough
case matrix_sdk_ffi.MembershipChangeKicked:
fallthrough
case matrix_sdk_ffi.MembershipChangeUnbanned:
complementEvent.Membership = "leave"
default:
fmt.Printf("%s unhandled membership %d\n", k.UserId, change)
}
}

content := evv.Content()
if content != nil {
msg := content.AsMessage()
Expand Down
Loading

0 comments on commit f9f6c75

Please sign in to comment.