Skip to content

Commit

Permalink
twittermeow: use snowflakes properly
Browse files Browse the repository at this point in the history
  • Loading branch information
tulir committed Nov 25, 2024
1 parent 5b5ebef commit 612377f
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 70 deletions.
4 changes: 2 additions & 2 deletions cmd/mautrix-twitter/legacymigrate.sql
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ SELECT
CASE WHEN receiver<>0 THEN CAST(receiver AS TEXT) ELSE '' END, -- room_receiver
'', -- sender_id
'', -- sender_mxid
0, -- timestamp
((id>>22)+1288834974657)*1000000, -- timestamp
0, -- edit_count
'{}' -- metadata
FROM message_old;
Expand All @@ -99,7 +99,7 @@ SELECT
'', -- emoji_id
(SELECT twid FROM portal_old WHERE portal_old.mxid=reaction_old.mx_room), -- room_id
CASE WHEN tw_receiver<>0 THEN CAST(tw_receiver AS TEXT) ELSE '' END, -- room_receiver
0, -- timestamp
((tw_reaction_id>>22)+1288834974657)*1000000, -- timestamp
mxid,
reaction, -- emoji
'{}' -- metadata
Expand Down
15 changes: 7 additions & 8 deletions pkg/connector/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,25 +66,25 @@ func (tc *TwitterClient) FetchMessages(ctx context.Context, params bridgev2.Fetc
Str("newest_raw_ts", messages[len(messages)-1].Time).
Msg("Fetched messages")
for _, msg := range messages {
sentAt, _ := methods.UnixStringMilliToTime(msg.MessageData.Time)
messageTS := methods.ParseSnowflake(msg.ID)
log := log.With().
Str("message_id", msg.MessageData.ID).
Str("message_raw_ts", msg.Time).
Time("message_ts", sentAt).
Time("message_ts", messageTS).
Logger()
if params.AnchorMessage != nil {
if string(params.AnchorMessage.ID) == msg.ID {
log.Warn().Msg("Skipping anchor message")
continue
} else if params.Forward && sentAt.Before(params.AnchorMessage.Timestamp) {
} else if params.Forward && messageTS.Before(params.AnchorMessage.Timestamp) {
log.Warn().Msg("Skipping too old message in forwards backfill")
continue
} else if !params.Forward && sentAt.After(params.AnchorMessage.Timestamp) {
} else if !params.Forward && messageTS.After(params.AnchorMessage.Timestamp) {
log.Warn().Msg("Skipping too new message in backwards backfill")
continue
}
}
log.Trace().Time("message_ts", sentAt).Msg("Converting message")
log.Trace().Msg("Converting message")
// TODO get correct intent
intent := tc.userLogin.Bridge.Matrix.BotIntent()
convertedMsg := &bridgev2.BackfillMessage{
Expand All @@ -94,7 +94,7 @@ func (tc *TwitterClient) FetchMessages(ctx context.Context, params bridgev2.Fetc
Sender: networkid.UserID(msg.MessageData.SenderID),
},
ID: networkid.MessageID(msg.MessageData.ID),
Timestamp: sentAt,
Timestamp: messageTS,
Reactions: tc.convertBackfillReactions(msg.MessageReactions),
}
converted = append(converted, convertedMsg)
Expand All @@ -115,9 +115,8 @@ func (tc *TwitterClient) FetchMessages(ctx context.Context, params bridgev2.Fetc
func (tc *TwitterClient) convertBackfillReactions(reactions []types.MessageReaction) []*bridgev2.BackfillReaction {
backfillReactions := make([]*bridgev2.BackfillReaction, 0)
for _, reaction := range reactions {
reactionTime, _ := methods.UnixStringMilliToTime(reaction.Time)
backfillReaction := &bridgev2.BackfillReaction{
Timestamp: reactionTime,
Timestamp: methods.ParseSnowflake(reaction.ID),
Sender: bridgev2.EventSender{
IsFromMe: reaction.SenderID == string(tc.userLogin.ID),
Sender: networkid.UserID(reaction.SenderID),
Expand Down
6 changes: 1 addition & 5 deletions pkg/connector/client_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,7 @@ func (tc *TwitterClient) syncChannels(ctx context.Context) {
methods.SortMessagesByTime(convInboxData.Messages)
messages := convInboxData.Messages
latestMessage := messages[len(messages)-1]
latestMessageTS, err := methods.UnixStringMilliToTime(latestMessage.MessageData.Time)
if err != nil {
log.Error().Err(err).Msg("failed to convert latest message TS to time.Time:")
return
}
latestMessageTS := methods.ParseSnowflake(latestMessage.MessageData.ID)
evt := &simplevent.ChatResync{
EventMeta: simplevent.EventMeta{
Type: bridgev2.RemoteEventChatResync,
Expand Down
41 changes: 9 additions & 32 deletions pkg/twittermeow/data/response/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,9 @@ func (data *XInboxData) ToEventEntries() ([]interface{}, error) {
return nil, err
}

reactionActionAt, err := methods.UnixStringMilliToTime(reactionEventData.Time)
if err != nil {
return nil, err
}
reactionActionAt := methods.ParseSnowflake(reactionEventData.ID)

// TODO remove pointless re-wrapping of events (applies to all types)
updatedReactionEventData := event.XEventReaction{
Conversation: data.GetConversationByID(reactionEventData.ConversationID),
Time: reactionActionAt,
Expand All @@ -89,10 +87,7 @@ func (data *XInboxData) ToEventEntries() ([]interface{}, error) {
return nil, err
}

createdAt, err := methods.UnixStringMilliToTime(messageEventData.MessageData.Time)
if err != nil {
return nil, err
}
createdAt := methods.ParseSnowflake(messageEventData.MessageData.ID)

updatedEntry = event.XEventMessage{
Conversation: data.GetConversationByID(messageEventData.ConversationID),
Expand All @@ -115,10 +110,7 @@ func (data *XInboxData) ToEventEntries() ([]interface{}, error) {
return nil, err
}

updatedAt, err := methods.UnixStringMilliToTime(convNameUpdateEventData.Time)
if err != nil {
return nil, err
}
updatedAt := methods.ParseSnowflake(convNameUpdateEventData.ID)

updatedEntry = event.XEventConversationNameUpdate{
Conversation: data.GetConversationByID(convNameUpdateEventData.ConversationID),
Expand All @@ -135,10 +127,7 @@ func (data *XInboxData) ToEventEntries() ([]interface{}, error) {
return nil, err
}

eventTime, err := methods.UnixStringMilliToTime(participantsJoinedEventData.Time)
if err != nil {
return nil, err
}
eventTime := methods.ParseSnowflake(participantsJoinedEventData.ID)

updatedEntry = event.XEventParticipantsJoined{
EventID: participantsJoinedEventData.ID,
Expand All @@ -155,10 +144,7 @@ func (data *XInboxData) ToEventEntries() ([]interface{}, error) {
return nil, err
}

deletedAt, err := methods.UnixStringMilliToTime(messageDeletedEventData.Time)
if err != nil {
return nil, err
}
deletedAt := methods.ParseSnowflake(messageDeletedEventData.ID)

updatedEntry = event.XEventMessageDeleted{
Conversation: data.GetConversationByID(messageDeletedEventData.ConversationID),
Expand All @@ -175,10 +161,7 @@ func (data *XInboxData) ToEventEntries() ([]interface{}, error) {
return nil, err
}

deletedAt, err := methods.UnixStringMilliToTime(convDeletedEventData.Time)
if err != nil {
return nil, err
}
deletedAt := methods.ParseSnowflake(convDeletedEventData.ID)

updatedEntry = event.XEventConversationDelete{
ConversationID: convDeletedEventData.ConversationID,
Expand All @@ -194,10 +177,7 @@ func (data *XInboxData) ToEventEntries() ([]interface{}, error) {
return nil, err
}

createdAt, err := methods.UnixStringMilliToTime(convCreatedEventData.Time)
if err != nil {
return nil, err
}
createdAt := methods.ParseSnowflake(convCreatedEventData.ID)

updatedEntry = event.XEventConversationCreated{
EventID: convCreatedEventData.ID,
Expand All @@ -214,10 +194,7 @@ func (data *XInboxData) ToEventEntries() ([]interface{}, error) {
return nil, err
}

updatedAt, err := methods.UnixStringMilliToTime(convMetadataUpdateEventData.Time)
if err != nil {
return nil, err
}
updatedAt := methods.ParseSnowflake(convMetadataUpdateEventData.ID)

updatedEntry = event.XEventConversationMetadataUpdate{
EventID: convMetadataUpdateEventData.ID,
Expand Down
7 changes: 3 additions & 4 deletions pkg/twittermeow/data/response/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type PrettifiedMessage struct {
Reactions []types.MessageReaction
}

// TODO this function and struct are dumb, they should be deleted

func (data *XInboxData) PrettifyMessages(conversationID string) ([]PrettifiedMessage, error) {
messages, err := data.GetMessageEntriesByConversationID(conversationID, true)
if err != nil {
Expand All @@ -74,10 +76,7 @@ func (data *XInboxData) PrettifyMessages(conversationID string) ([]PrettifiedMes

prettifiedMessages := make([]PrettifiedMessage, 0)
for _, msg := range messages {
sentAt, err := methods.UnixStringMilliToTime(msg.Time)
if err != nil {
return nil, err
}
sentAt := methods.ParseSnowflake(msg.ID)

prettifiedMessage := PrettifiedMessage{
EventID: msg.ID,
Expand Down
27 changes: 8 additions & 19 deletions pkg/twittermeow/methods/methods.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package methods

import (
"cmp"
"slices"
"sort"
"strconv"
Expand All @@ -13,38 +12,28 @@ import (
"go.mau.fi/mautrix-twitter/pkg/twittermeow/data/types"
)

func UnixStringMilliToTime(input string) (time.Time, error) {
secs, err := strconv.ParseInt(input, 10, 64)
const TwitterEpoch = 1288834974657

func ParseSnowflake(msgID string) time.Time {
secs, err := strconv.ParseInt(msgID, 10, 64)
if err != nil {
return time.Time{}, err
return time.Time{}
}
return time.UnixMilli(secs), nil
return time.UnixMilli((secs >> 22) + TwitterEpoch)
}

func SortConversationsByTimestamp(conversations map[string]types.Conversation) []types.Conversation {
conversationValues := maps.Values(conversations)
slices.SortFunc(conversationValues, func(a, b types.Conversation) int {
timeA, errA := strconv.ParseInt(a.SortTimestamp, 10, 64)
timeB, errB := strconv.ParseInt(b.SortTimestamp, 10, 64)
if errB != nil || errA != nil {
return 0
}

return cmp.Compare(timeA, timeB)
return strings.Compare(a.SortTimestamp, b.SortTimestamp)
})

return conversationValues
}

func SortMessagesByTime(messages []types.Message) {
slices.SortFunc(messages, func(a, b types.Message) int {
timeA, errA := strconv.ParseInt(a.Time, 10, 64)
timeB, errB := strconv.ParseInt(b.Time, 10, 64)
if errB != nil || errA != nil {
return 0
}

return cmp.Compare(timeA, timeB)
return strings.Compare(a.ID, b.ID)
})
}

Expand Down

0 comments on commit 612377f

Please sign in to comment.