From 7d01ec2df4e492b74d23802ee2b55cd8de895118 Mon Sep 17 00:00:00 2001 From: Ben Pate Date: Sun, 26 Nov 2023 13:27:07 -0700 Subject: [PATCH] Support ActivityStream statistics --- handler/message.go | 2 +- model/linkOrigin.go | 12 ++--- model/message.go | 27 ---------- model/response.go | 11 +++++ render/renderer_message.go | 27 +++++++--- render/renderer_oAuthAuthorization.go | 2 - service/activityStreams.go | 71 ++++++++++++++++----------- service/outbox.go | 26 ++-------- service/outbox_publisher.go | 13 +++-- service/response.go | 59 ++++++++++------------ tools/ascache/client.go | 4 -- 11 files changed, 118 insertions(+), 136 deletions(-) diff --git a/handler/message.go b/handler/message.go index f8c8d3f0f..68c8878a1 100644 --- a/handler/message.go +++ b/handler/message.go @@ -133,7 +133,7 @@ func renderMessage(serverFactory *server.Factory, actionMethod render.ActionMeth actionID := first.String(context.Param("action"), "view") // Create the new Renderer - renderer, err := render.NewMessage(factory, context.Request(), context.Response(), inboxService, &message, actionID) + renderer, err := render.NewMessage(factory, context.Request(), context.Response(), factory.Inbox(), factory.ActivityStreams(), &message, actionID) if err != nil { return derp.Wrap(err, location, "Error creating renderer") diff --git a/model/linkOrigin.go b/model/linkOrigin.go index 90fcfa579..a242f0e53 100644 --- a/model/linkOrigin.go +++ b/model/linkOrigin.go @@ -7,12 +7,12 @@ import ( // OriginLink represents the original source of a stream that has been imported into Emissary. // This could be an external ActivityPub server, RSS Feed, or Tweet. type OriginLink struct { - Type string `json:"type" bson:"type,omitempty"` // The type of message that this document (DIRECT, LIKE, DISLIKE, REPLY, ANNOUNCE) - FollowingID primitive.ObjectID `json:"followinglId" bson:"followingId,omitempty"` // Unique ID of a document in this database - Label string `json:"label" bson:"label,omitempty"` // Human-friendly label of the origin - URL string `json:"url" bson:"url,omitempty"` // Public URL of the origin - ProfileURL string `json:"profileUrl" bson:"profileUrl,omitempty"` // URL of the profile image for this document's image - ImageURL string `json:"imageUrl" bson:"imageUrl,omitempty"` // URL of the cover image for this document's image + Type string `json:"type" bson:"type,omitempty"` // The type of message that this document (DIRECT, LIKE, DISLIKE, REPLY, ANNOUNCE) + FollowingID primitive.ObjectID `json:"followingId" bson:"followingId,omitempty"` // Unique ID of a document in this database + Label string `json:"label" bson:"label,omitempty"` // Human-friendly label of the origin + URL string `json:"url" bson:"url,omitempty"` // Public URL of the origin + ProfileURL string `json:"profileUrl" bson:"profileUrl,omitempty"` // URL of the profile image for this document's image + ImageURL string `json:"imageUrl" bson:"imageUrl,omitempty"` // URL of the cover image for this document's image } // NewOriginLink returns a fully initialized OriginLink diff --git a/model/message.go b/model/message.go index d989eab60..cb104cfb6 100644 --- a/model/message.go +++ b/model/message.go @@ -246,33 +246,6 @@ func increment(value *int) { *value++ } -// Map returns a mep representation of this object -func (message Message) Map() map[string]any { - - return map[string]any{ - "messageId": message.MessageID, - "userId": message.UserID, - "followingId": message.FollowingID, - "folderId": message.FolderID, - "socialRole": message.SocialRole, - "origin": message.Origin, - "references": message.References, - "url": message.URL, - "label": message.Label, - "summary": message.Summary, - "imageUrl": message.ImageURL, - "attributedTo": message.AttributedTo, - "inReplyTyo": message.InReplyTo, - "contentHtml": message.ContentHTML, - "resonses": message.Responses, - "myResponse": message.MyResponse, - "muted": message.Muted, - "readDate": message.ReadDate, - "publishDate": message.PublishDate, - "rank": message.Rank, - } -} - /****************************************** * Mastodon API ******************************************/ diff --git a/model/response.go b/model/response.go index 1ee15ba1d..7e7756581 100644 --- a/model/response.go +++ b/model/response.go @@ -135,10 +135,21 @@ func (response *Response) CalcContent() { // Nothin to return. } +// CreateDateSeconds returns the CreateDate in Unix Epoch seconds (instead of milliseconds) func (response Response) CreateDateSeconds() int64 { return response.CreateDate / 1000 } +// IsEmpty returns TRUE if this Response has no data in it. +func (response Response) IsEmpty() bool { + return response.Type == "" +} + +// NotEmpty returns TRUE if this Response has data in it. +func (response Response) NotEmpty() bool { + return !response.IsEmpty() +} + /****************************************** * Mastodon API ******************************************/ diff --git a/render/renderer_message.go b/render/renderer_message.go index 5a22e6eba..262e97d7d 100644 --- a/render/renderer_message.go +++ b/render/renderer_message.go @@ -20,13 +20,14 @@ import ( // Message renders individual messages from a User's Inbox. type Message struct { - _service *service.Inbox - _message *model.Message + _service *service.Inbox + _message *model.Message + _activityStreams *service.ActivityStreams Common } // NewMessage returns a fully initialized `Message` renderer. -func NewMessage(factory Factory, request *http.Request, response http.ResponseWriter, modelService *service.Inbox, message *model.Message, actionID string) (Message, error) { +func NewMessage(factory Factory, request *http.Request, response http.ResponseWriter, modelService *service.Inbox, activityStreamsService *service.ActivityStreams, message *model.Message, actionID string) (Message, error) { const location = "render.NewMessage" @@ -61,9 +62,10 @@ func NewMessage(factory Factory, request *http.Request, response http.ResponseWr } } return Message{ - _service: modelService, - _message: message, - Common: common, + _service: modelService, + _message: message, + _activityStreams: activityStreamsService, + Common: common, }, nil } @@ -131,7 +133,7 @@ func (w Message) View(actionID string) (template.HTML, error) { const location = "render.Message.View" // Create a new renderer (this will also validate the user's permissions) - subStream, err := NewMessage(w._factory, w._request, w._response, w._service, w._message, actionID) + subStream, err := NewMessage(w._factory, w._request, w._response, w._service, w._activityStreams, w._message, actionID) if err != nil { return template.HTML(""), derp.Wrap(err, location, "Error creating sub-renderer") @@ -146,21 +148,30 @@ func (w Message) templateRole() string { } func (w Message) clone(action string) (Renderer, error) { - return NewMessage(w._factory, w._request, w._response, w._service, w._message, action) + return NewMessage(w._factory, w._request, w._response, w._service, w._activityStreams, w._message, action) } /****************************************** * Data Access Methods ******************************************/ +// MessageID returns the inbox message ID for the object func (w Message) MessageID() string { return w._message.MessageID.Hex() } +// URL returns the public URL for the object func (w Message) URL() string { return w._message.URL } +// ActivityStream returns a hannibal Document that this message wraps +func (w Message) ActivityStream() streams.Document { + result, err := w._activityStreams.Load(w._message.URL) + derp.Report(err) + return result +} + func (w Message) AttributedTo() model.PersonLink { return w._message.AttributedTo } diff --git a/render/renderer_oAuthAuthorization.go b/render/renderer_oAuthAuthorization.go index 5237053da..932dbf977 100644 --- a/render/renderer_oAuthAuthorization.go +++ b/render/renderer_oAuthAuthorization.go @@ -6,7 +6,6 @@ import ( "github.com/EmissarySocial/emissary/model" "github.com/EmissarySocial/emissary/service" "github.com/benpate/derp" - "github.com/davecgh/go-spew/spew" "go.mongodb.org/mongo-driver/bson/primitive" ) @@ -64,7 +63,6 @@ func (r OAuthAuthorization) Website() string { } func (r OAuthAuthorization) RedirectURI() string { - spew.Dump(r._request) return r._request.RedirectURI } diff --git a/service/activityStreams.go b/service/activityStreams.go index 21ff4e33f..7fa85b856 100644 --- a/service/activityStreams.go +++ b/service/activityStreams.go @@ -16,7 +16,7 @@ import ( // ActivityStreams implements the Hannibal HTTP client interface, and provides a cache for ActivityStreams documents. type ActivityStreams struct { documentCollection data.Collection - innerClient streams.Client + innerClient *ascache.Client } /****************************************** @@ -29,7 +29,7 @@ func NewActivityStreams() ActivityStreams { } // Refresh updates the ActivityStreams service with new dependencies -func (service *ActivityStreams) Refresh(innerClient streams.Client, documentCollection data.Collection) { +func (service *ActivityStreams) Refresh(innerClient *ascache.Client, documentCollection data.Collection) { service.innerClient = innerClient service.documentCollection = documentCollection } @@ -50,6 +50,18 @@ func (service *ActivityStreams) Load(uri string, options ...any) (streams.Docume return service.innerClient.Load(uri, options...) } +// Delete removes a single document from the database by its URL +func (service *ActivityStreams) Delete(uri string) error { + + const location = "service.ActivityStreams.Delete" + + if err := service.innerClient.Delete(uri); err != nil { + return derp.Wrap(err, location, "Error deleting document from cache", uri) + } + + return nil +} + /****************************************** * Custom Behaviors ******************************************/ @@ -62,9 +74,8 @@ func (service *ActivityStreams) PurgeCache() error { return derp.NewInternalError("service.ActivityStreams.PurgeCache", "Document Collection not initialized") } - criteria := exp.LessThan("expires", time.Now().Unix()) - // Purge all expired Documents + criteria := exp.LessThan("expires", time.Now().Unix()) if err := service.documentCollection.HardDelete(criteria); err != nil { return derp.Wrap(err, "service.ActivityStreams.PurgeCache", "Error purging documents") } @@ -76,56 +87,56 @@ func (service *ActivityStreams) PurgeCache() error { * Custom Query Methods ******************************************/ -// DeleteDocumentByURL removes a single document from the database by its URL -func (service *ActivityStreams) DeleteDocumentByURL(url string) error { - - // NPE Check - if service.documentCollection == nil { - return derp.NewInternalError("service.ActivityStreams.DeleteDocumentByURL", "Document Collection not initialized") - } - - // Forward request to documentCollection - return service.documentCollection.HardDelete(exp.Equal("uri", url)) -} - // QueryRepliesBeforeDate returns a slice of streams.Document values that are replies to the specified document, and were published before the specified date. func (service *ActivityStreams) QueryRepliesBeforeDate(inReplyTo string, maxDate int64, maxRows int) (streams.Document, error) { + const location = "service.ActivityStreams.QueryRepliesBeforeDate" + // NPE Check if service.documentCollection == nil { - return streams.Document{}, derp.NewInternalError("service.ActivityStreams.QueryRepliesBeforeDate", "Document Collection not initialized") + return streams.Document{}, derp.NewInternalError(location, "Document Collection not initialized") } // Build the query criteria := exp. - Equal("inReplyTo", inReplyTo). + Equal("relationType", "Reply"). + AndEqual("relationHref", inReplyTo). AndLessThan("published", maxDate) results, err := service.documentQuery(criteria, option.SortDesc("published"), option.MaxRows(int64(maxRows))) + if err != nil { + return streams.Document{}, derp.Wrap(err, location, "Error querying database") + } + // Return the results as a streams.Document / collection - return streams.NewDocument(results.Reverse(), streams.WithClient(service)), - derp.Wrap(err, "service.ActivityStreams.QueryRepliesAfterDate", "Error querying database") + return streams.NewDocument(results.Reverse(), streams.WithClient(service)), nil } // QueryRepliesAfterDate returns a slice of streams.Document values that are replies to the specified document, and were published after the specified date. func (service *ActivityStreams) QueryRepliesAfterDate(inReplyTo string, minDate int64, maxRows int) (streams.Document, error) { + const location = "service.ActivityStreams.QueryRepliesAfterDate" + // NPE Check if service.documentCollection == nil { - return streams.Document{}, derp.NewInternalError("service.ActivityStreams.QueryRepliesAfterDate", "Document Collection not initialized") + return streams.Document{}, derp.NewInternalError(location, "Document Collection not initialized") } // Build the query criteria := exp. - Equal("inReplyTo", inReplyTo). + Equal("relationType", "Reply"). + AndEqual("relationHref", inReplyTo). AndGreaterThan("published", minDate) results, err := service.documentQuery(criteria, option.SortAsc("published"), option.MaxRows(int64(maxRows))) + if err != nil { + return streams.Document{}, derp.Wrap(err, location, "Error querying database") + } + // Return the result as a streams.Document / collection - return streams.NewDocument(results, streams.WithClient(service)), - derp.Wrap(err, "service.ActivityStreams.QueryRepliesAfterDate", "Error querying database") + return streams.NewDocument(results, streams.WithClient(service)), nil } /****************************************** @@ -135,9 +146,11 @@ func (service *ActivityStreams) QueryRepliesAfterDate(inReplyTo string, minDate // iterator reads from the database and returns a data.Iterator with the result values. func (service *ActivityStreams) documentIterator(criteria exp.Expression, options ...option.Option) (data.Iterator, error) { + const location = "service.ActivityStreams.documentIterator" + // NPE Check if service.documentCollection == nil { - return nil, derp.NewInternalError("service.ActivityStreams.documentIterator", "Document Collection not initialized") + return nil, derp.NewInternalError(location, "Document Collection not initialized") } // Forward request to documentCollection @@ -147,16 +160,18 @@ func (service *ActivityStreams) documentIterator(criteria exp.Expression, option // query reads from the database and returns a slice of streams.Document values func (service *ActivityStreams) documentQuery(criteria exp.Expression, options ...option.Option) (sliceof.Object[mapof.Any], error) { + const location = "service.ActivityStreams.documentQuery" + // NPE Check if service.documentCollection == nil { - return nil, derp.NewInternalError("service.ActivityStreams.documentQuery", "Document Collection not initialized") + return nil, derp.NewInternalError(location, "Document Collection not initialized") } // Create the Iterator iterator, err := service.documentIterator(criteria, options...) if err != nil { - return nil, derp.Wrap(err, "service.ActivityStreams.Query", "Error querying database") + return nil, derp.Wrap(err, location, "Error querying database") } // Initialize result slice @@ -169,7 +184,7 @@ func (service *ActivityStreams) documentQuery(criteria exp.Expression, options . value = ascache.NewCachedValue() if err := iterator.Error(); err != nil { - return nil, derp.Wrap(err, "emisary.tools.cache.Client.Query", "Error during iteration") + return nil, derp.Wrap(err, location, "Error during iteration") } } diff --git a/service/outbox.go b/service/outbox.go index d7d3f229a..1d6a6c717 100644 --- a/service/outbox.go +++ b/service/outbox.go @@ -104,41 +104,21 @@ func (service *Outbox) Save(outboxMessage *model.OutboxMessage, note string) err // Delete removes an Outbox from the database (virtual delete) func (service *Outbox) Delete(outboxMessage *model.OutboxMessage, note string) error { - // Delete Outbox record last. + // Delete the message from the outbox criteria := exp.Equal("_id", outboxMessage.OutboxMessageID) if err := service.collection.HardDelete(criteria); err != nil { return derp.Wrap(err, "service.Outbox", "Error deleting Outbox", outboxMessage, note) } - if err := service.activityStreamsService.DeleteDocumentByURL(outboxMessage.URL); err != nil { + // Delete the document from the cache + if err := service.activityStreamsService.Delete(outboxMessage.URL); err != nil { return derp.Wrap(err, "service.Outbox", "Error deleting ActivityStream", outboxMessage, note) } return nil } -// DeleteMany removes all child streams from the provided stream (virtual delete) -func (service *Outbox) DeleteMany(criteria exp.Expression, note string) error { - - it, err := service.List(criteria) - - if err != nil { - return derp.Wrap(err, "service.Message.Delete", "Error listing streams to delete", criteria) - } - - outboxMessage := model.NewOutboxMessage() - - for it.Next(&outboxMessage) { - if err := service.Delete(&outboxMessage, note); err != nil { - return derp.Wrap(err, "service.Message.Delete", "Error deleting outboxMessage", outboxMessage) - } - outboxMessage = model.NewOutboxMessage() - } - - return nil -} - /****************************************** * Custom Query Methods ******************************************/ diff --git a/service/outbox_publisher.go b/service/outbox_publisher.go index b33deeeb9..a3ac1086b 100644 --- a/service/outbox_publisher.go +++ b/service/outbox_publisher.go @@ -46,10 +46,15 @@ func (service Outbox) UnPublish(userID primitive.ObjectID, url string, activity // Try to load the existing outbox message outboxMessage := model.NewOutboxMessage() - if err := service.LoadByURL(userID, url, &outboxMessage); err != nil { - if derp.NotFound(err) { - return nil - } + err := service.LoadByURL(userID, url, &outboxMessage) + + // If the message is not found, then there's nothing to delete + if derp.NotFound(err) { + return nil + } + + // Report all other errors to the caller + if err != nil { return derp.Wrap(err, location, "Error loading outbox message", userID, url) } diff --git a/service/response.go b/service/response.go index 86bba43ee..52c1271a1 100644 --- a/service/response.go +++ b/service/response.go @@ -212,55 +212,51 @@ func (service *Response) SetResponse(response *model.Response) error { const location = "service.Response.SetResponse" - // Validate the response + // Normalize response content response.CalcContent() + // Try to load the User who created this Response. Only authenticated Users can create Response records. user := model.NewUser() - err := service.userService.LoadByProfileURL(response.ActorID, &user) - // If this is a legitimate error, then abort. - if (err != nil) && !derp.NotFound(err) { + if err := service.userService.LoadByProfileURL(response.ActorID, &user); err != nil { return derp.Wrap(err, location, "Error loading user", response.ActorID) } - // NOTE if the actorID is not found then the User will be a blank object + // RULE: Set the Response URL using the User's "liked" collection. + response.URL = user.ActivityPubLikedURL() + "/" + response.ResponseID.Hex() - // If a response already exists, then delete it first. + // Search for a previous Response from this User oldResponse := model.NewResponse() - err = service.LoadByActorAndObject(response.ActorID, response.ObjectID, &oldResponse) - // RULE: if the response exists.... - if err == nil { + if err := service.LoadByActorAndObject(response.ActorID, response.ObjectID, &oldResponse); !derp.NilOrNotFound(err) { + return derp.Wrap(err, location, "Error loading original response", oldResponse) + } + + // If the database had a previous Response, then delete it. + if oldResponse.NotEmpty() { - // If there was no change, then there's nothing to do. + // ... except if the new Response is the same as the old Response. + // If there was no change, then there's nothing else to do. if response.IsEqual(oldResponse) { return nil } - // Otherwise, delete the old response (which triggers other logic) + // Otherwise, delete the old Response if err := service.Delete(&oldResponse, ""); err != nil { return derp.Wrap(err, location, "Error deleting old response", oldResponse) } - // Responses from local Actors should be removed from the Outbox - if !user.IsNew() { + // Unpublish from the Outbox, and send the "Undo" activity to followers + undoActivity := pub.Undo(user.ActivityPubURL(), response.GetJSONLD()) - // Create an "Undo" activity - undoActivity := pub.Undo(user.ActivityPubURL(), response.GetJSONLD()) - - // Send the "Undo" activity to followers - if err := service.outboxService.UnPublish(user.UserID, response.URL, undoActivity); err != nil { - derp.Report(derp.Wrap(err, location, "Error publishing Response", response)) - } - } - - // RULE: If there is no response type, then this is a DELETE-ONLY operation. Do not create a new response. - if response.Type == "" { - return nil + if err := service.outboxService.UnPublish(user.UserID, oldResponse.URL, undoActivity); err != nil { + derp.Report(derp.Wrap(err, location, "Error publishing Response", oldResponse)) } + } - } else if !derp.NotFound(err) { - return derp.Wrap(err, location, "Error loading original response", oldResponse) + // RULE: If the "new" response is empty, then this is a DELETE-ONLY operation. Do not create a new response. + if response.IsEmpty() { + return nil } // Save the Response to the database (response service will automatically publish to ActivityPub and beyond) @@ -268,12 +264,9 @@ func (service *Response) SetResponse(response *model.Response) error { return derp.Wrap(err, location, "Error saving response", response) } - // Responses from local Actors should be published to the Outbox - if !user.IsNew() { - - if err := service.outboxService.Publish(user.UserID, response.URL, response.GetJSONLD()); err != nil { - derp.Report(derp.Wrap(err, location, "Error publishing Response", response)) - } + // Publish the new Response to the Outbox, sending "Like" notifications to all followers. + if err := service.outboxService.Publish(user.UserID, response.URL, response.GetJSONLD()); err != nil { + derp.Report(derp.Wrap(err, location, "Error publishing Response", response)) } // Oye cómo va! diff --git a/tools/ascache/client.go b/tools/ascache/client.go index 2191ac306..4532412d3 100644 --- a/tools/ascache/client.go +++ b/tools/ascache/client.go @@ -9,7 +9,6 @@ import ( "github.com/benpate/exp" "github.com/benpate/hannibal/streams" "github.com/benpate/hannibal/vocab" - "github.com/davecgh/go-spew/spew" "github.com/rs/zerolog/log" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -131,8 +130,6 @@ func (client *Client) Delete(uri string) error { const location = "ascache.Client.Delete" - spew.Dump(location, uri) - // NPE Check if client.collection == nil { return derp.NewInternalError(location, "Document Collection not initialized") @@ -143,7 +140,6 @@ func (client *Client) Delete(uri string) error { // If there's nothing in the cache, then there's nothing to delete if derp.NotFound(err) { - spew.Dump(err) return nil }