Skip to content

Commit

Permalink
Support ActivityStream statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
benpate committed Nov 26, 2023
1 parent 56c75cf commit 7d01ec2
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 136 deletions.
2 changes: 1 addition & 1 deletion handler/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func renderMessage(serverFactory *server.Factory, actionMethod render.ActionMeth
actionID := first.String(context.Param("action"), "view")

// Create the new Renderer
renderer, err := render.NewMessage(factory, context.Request(), context.Response(), inboxService, &message, actionID)
renderer, err := render.NewMessage(factory, context.Request(), context.Response(), factory.Inbox(), factory.ActivityStreams(), &message, actionID)

if err != nil {
return derp.Wrap(err, location, "Error creating renderer")
Expand Down
12 changes: 6 additions & 6 deletions model/linkOrigin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
// OriginLink represents the original source of a stream that has been imported into Emissary.
// This could be an external ActivityPub server, RSS Feed, or Tweet.
type OriginLink struct {
Type string `json:"type" bson:"type,omitempty"` // The type of message that this document (DIRECT, LIKE, DISLIKE, REPLY, ANNOUNCE)
FollowingID primitive.ObjectID `json:"followinglId" bson:"followingId,omitempty"` // Unique ID of a document in this database
Label string `json:"label" bson:"label,omitempty"` // Human-friendly label of the origin
URL string `json:"url" bson:"url,omitempty"` // Public URL of the origin
ProfileURL string `json:"profileUrl" bson:"profileUrl,omitempty"` // URL of the profile image for this document's image
ImageURL string `json:"imageUrl" bson:"imageUrl,omitempty"` // URL of the cover image for this document's image
Type string `json:"type" bson:"type,omitempty"` // The type of message that this document (DIRECT, LIKE, DISLIKE, REPLY, ANNOUNCE)
FollowingID primitive.ObjectID `json:"followingId" bson:"followingId,omitempty"` // Unique ID of a document in this database
Label string `json:"label" bson:"label,omitempty"` // Human-friendly label of the origin
URL string `json:"url" bson:"url,omitempty"` // Public URL of the origin
ProfileURL string `json:"profileUrl" bson:"profileUrl,omitempty"` // URL of the profile image for this document's image
ImageURL string `json:"imageUrl" bson:"imageUrl,omitempty"` // URL of the cover image for this document's image
}

// NewOriginLink returns a fully initialized OriginLink
Expand Down
27 changes: 0 additions & 27 deletions model/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,33 +246,6 @@ func increment(value *int) {
*value++
}

// Map returns a mep representation of this object
func (message Message) Map() map[string]any {

return map[string]any{
"messageId": message.MessageID,
"userId": message.UserID,
"followingId": message.FollowingID,
"folderId": message.FolderID,
"socialRole": message.SocialRole,
"origin": message.Origin,
"references": message.References,
"url": message.URL,
"label": message.Label,
"summary": message.Summary,
"imageUrl": message.ImageURL,
"attributedTo": message.AttributedTo,
"inReplyTyo": message.InReplyTo,
"contentHtml": message.ContentHTML,
"resonses": message.Responses,
"myResponse": message.MyResponse,
"muted": message.Muted,
"readDate": message.ReadDate,
"publishDate": message.PublishDate,
"rank": message.Rank,
}
}

/******************************************
* Mastodon API
******************************************/
Expand Down
11 changes: 11 additions & 0 deletions model/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,21 @@ func (response *Response) CalcContent() {
// Nothin to return.
}

// CreateDateSeconds returns the CreateDate in Unix Epoch seconds (instead of milliseconds)
func (response Response) CreateDateSeconds() int64 {
return response.CreateDate / 1000
}

// IsEmpty returns TRUE if this Response has no data in it.
func (response Response) IsEmpty() bool {
return response.Type == ""
}

// NotEmpty returns TRUE if this Response has data in it.
func (response Response) NotEmpty() bool {
return !response.IsEmpty()
}

/******************************************
* Mastodon API
******************************************/
Expand Down
27 changes: 19 additions & 8 deletions render/renderer_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import (

// Message renders individual messages from a User's Inbox.
type Message struct {
_service *service.Inbox
_message *model.Message
_service *service.Inbox
_message *model.Message
_activityStreams *service.ActivityStreams
Common
}

// NewMessage returns a fully initialized `Message` renderer.
func NewMessage(factory Factory, request *http.Request, response http.ResponseWriter, modelService *service.Inbox, message *model.Message, actionID string) (Message, error) {
func NewMessage(factory Factory, request *http.Request, response http.ResponseWriter, modelService *service.Inbox, activityStreamsService *service.ActivityStreams, message *model.Message, actionID string) (Message, error) {

const location = "render.NewMessage"

Expand Down Expand Up @@ -61,9 +62,10 @@ func NewMessage(factory Factory, request *http.Request, response http.ResponseWr
}
}
return Message{
_service: modelService,
_message: message,
Common: common,
_service: modelService,
_message: message,
_activityStreams: activityStreamsService,
Common: common,
}, nil
}

Expand Down Expand Up @@ -131,7 +133,7 @@ func (w Message) View(actionID string) (template.HTML, error) {
const location = "render.Message.View"

// Create a new renderer (this will also validate the user's permissions)
subStream, err := NewMessage(w._factory, w._request, w._response, w._service, w._message, actionID)
subStream, err := NewMessage(w._factory, w._request, w._response, w._service, w._activityStreams, w._message, actionID)

if err != nil {
return template.HTML(""), derp.Wrap(err, location, "Error creating sub-renderer")
Expand All @@ -146,21 +148,30 @@ func (w Message) templateRole() string {
}

func (w Message) clone(action string) (Renderer, error) {
return NewMessage(w._factory, w._request, w._response, w._service, w._message, action)
return NewMessage(w._factory, w._request, w._response, w._service, w._activityStreams, w._message, action)
}

/******************************************
* Data Access Methods
******************************************/

// MessageID returns the inbox message ID for the object
func (w Message) MessageID() string {
return w._message.MessageID.Hex()
}

// URL returns the public URL for the object
func (w Message) URL() string {
return w._message.URL
}

// ActivityStream returns a hannibal Document that this message wraps
func (w Message) ActivityStream() streams.Document {
result, err := w._activityStreams.Load(w._message.URL)
derp.Report(err)
return result
}

func (w Message) AttributedTo() model.PersonLink {
return w._message.AttributedTo
}
Expand Down
2 changes: 0 additions & 2 deletions render/renderer_oAuthAuthorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/EmissarySocial/emissary/model"
"github.com/EmissarySocial/emissary/service"
"github.com/benpate/derp"
"github.com/davecgh/go-spew/spew"
"go.mongodb.org/mongo-driver/bson/primitive"
)

Expand Down Expand Up @@ -64,7 +63,6 @@ func (r OAuthAuthorization) Website() string {
}

func (r OAuthAuthorization) RedirectURI() string {
spew.Dump(r._request)
return r._request.RedirectURI
}

Expand Down
71 changes: 43 additions & 28 deletions service/activityStreams.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
// ActivityStreams implements the Hannibal HTTP client interface, and provides a cache for ActivityStreams documents.
type ActivityStreams struct {
documentCollection data.Collection
innerClient streams.Client
innerClient *ascache.Client
}

/******************************************
Expand All @@ -29,7 +29,7 @@ func NewActivityStreams() ActivityStreams {
}

// Refresh updates the ActivityStreams service with new dependencies
func (service *ActivityStreams) Refresh(innerClient streams.Client, documentCollection data.Collection) {
func (service *ActivityStreams) Refresh(innerClient *ascache.Client, documentCollection data.Collection) {
service.innerClient = innerClient
service.documentCollection = documentCollection
}
Expand All @@ -50,6 +50,18 @@ func (service *ActivityStreams) Load(uri string, options ...any) (streams.Docume
return service.innerClient.Load(uri, options...)
}

// Delete removes a single document from the database by its URL
func (service *ActivityStreams) Delete(uri string) error {

const location = "service.ActivityStreams.Delete"

if err := service.innerClient.Delete(uri); err != nil {
return derp.Wrap(err, location, "Error deleting document from cache", uri)
}

return nil
}

/******************************************
* Custom Behaviors
******************************************/
Expand All @@ -62,9 +74,8 @@ func (service *ActivityStreams) PurgeCache() error {
return derp.NewInternalError("service.ActivityStreams.PurgeCache", "Document Collection not initialized")
}

criteria := exp.LessThan("expires", time.Now().Unix())

// Purge all expired Documents
criteria := exp.LessThan("expires", time.Now().Unix())
if err := service.documentCollection.HardDelete(criteria); err != nil {
return derp.Wrap(err, "service.ActivityStreams.PurgeCache", "Error purging documents")
}
Expand All @@ -76,56 +87,56 @@ func (service *ActivityStreams) PurgeCache() error {
* Custom Query Methods
******************************************/

// DeleteDocumentByURL removes a single document from the database by its URL
func (service *ActivityStreams) DeleteDocumentByURL(url string) error {

// NPE Check
if service.documentCollection == nil {
return derp.NewInternalError("service.ActivityStreams.DeleteDocumentByURL", "Document Collection not initialized")
}

// Forward request to documentCollection
return service.documentCollection.HardDelete(exp.Equal("uri", url))
}

// QueryRepliesBeforeDate returns a slice of streams.Document values that are replies to the specified document, and were published before the specified date.
func (service *ActivityStreams) QueryRepliesBeforeDate(inReplyTo string, maxDate int64, maxRows int) (streams.Document, error) {

const location = "service.ActivityStreams.QueryRepliesBeforeDate"

// NPE Check
if service.documentCollection == nil {
return streams.Document{}, derp.NewInternalError("service.ActivityStreams.QueryRepliesBeforeDate", "Document Collection not initialized")
return streams.Document{}, derp.NewInternalError(location, "Document Collection not initialized")
}

// Build the query
criteria := exp.
Equal("inReplyTo", inReplyTo).
Equal("relationType", "Reply").
AndEqual("relationHref", inReplyTo).
AndLessThan("published", maxDate)

results, err := service.documentQuery(criteria, option.SortDesc("published"), option.MaxRows(int64(maxRows)))

if err != nil {
return streams.Document{}, derp.Wrap(err, location, "Error querying database")
}

// Return the results as a streams.Document / collection
return streams.NewDocument(results.Reverse(), streams.WithClient(service)),
derp.Wrap(err, "service.ActivityStreams.QueryRepliesAfterDate", "Error querying database")
return streams.NewDocument(results.Reverse(), streams.WithClient(service)), nil
}

// QueryRepliesAfterDate returns a slice of streams.Document values that are replies to the specified document, and were published after the specified date.
func (service *ActivityStreams) QueryRepliesAfterDate(inReplyTo string, minDate int64, maxRows int) (streams.Document, error) {

const location = "service.ActivityStreams.QueryRepliesAfterDate"

// NPE Check
if service.documentCollection == nil {
return streams.Document{}, derp.NewInternalError("service.ActivityStreams.QueryRepliesAfterDate", "Document Collection not initialized")
return streams.Document{}, derp.NewInternalError(location, "Document Collection not initialized")
}

// Build the query
criteria := exp.
Equal("inReplyTo", inReplyTo).
Equal("relationType", "Reply").
AndEqual("relationHref", inReplyTo).
AndGreaterThan("published", minDate)

results, err := service.documentQuery(criteria, option.SortAsc("published"), option.MaxRows(int64(maxRows)))

if err != nil {
return streams.Document{}, derp.Wrap(err, location, "Error querying database")
}

// Return the result as a streams.Document / collection
return streams.NewDocument(results, streams.WithClient(service)),
derp.Wrap(err, "service.ActivityStreams.QueryRepliesAfterDate", "Error querying database")
return streams.NewDocument(results, streams.WithClient(service)), nil
}

/******************************************
Expand All @@ -135,9 +146,11 @@ func (service *ActivityStreams) QueryRepliesAfterDate(inReplyTo string, minDate
// iterator reads from the database and returns a data.Iterator with the result values.
func (service *ActivityStreams) documentIterator(criteria exp.Expression, options ...option.Option) (data.Iterator, error) {

const location = "service.ActivityStreams.documentIterator"

// NPE Check
if service.documentCollection == nil {
return nil, derp.NewInternalError("service.ActivityStreams.documentIterator", "Document Collection not initialized")
return nil, derp.NewInternalError(location, "Document Collection not initialized")
}

// Forward request to documentCollection
Expand All @@ -147,16 +160,18 @@ func (service *ActivityStreams) documentIterator(criteria exp.Expression, option
// query reads from the database and returns a slice of streams.Document values
func (service *ActivityStreams) documentQuery(criteria exp.Expression, options ...option.Option) (sliceof.Object[mapof.Any], error) {

const location = "service.ActivityStreams.documentQuery"

// NPE Check
if service.documentCollection == nil {
return nil, derp.NewInternalError("service.ActivityStreams.documentQuery", "Document Collection not initialized")
return nil, derp.NewInternalError(location, "Document Collection not initialized")
}

// Create the Iterator
iterator, err := service.documentIterator(criteria, options...)

if err != nil {
return nil, derp.Wrap(err, "service.ActivityStreams.Query", "Error querying database")
return nil, derp.Wrap(err, location, "Error querying database")
}

// Initialize result slice
Expand All @@ -169,7 +184,7 @@ func (service *ActivityStreams) documentQuery(criteria exp.Expression, options .
value = ascache.NewCachedValue()

if err := iterator.Error(); err != nil {
return nil, derp.Wrap(err, "emisary.tools.cache.Client.Query", "Error during iteration")
return nil, derp.Wrap(err, location, "Error during iteration")
}
}

Expand Down
26 changes: 3 additions & 23 deletions service/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,41 +104,21 @@ func (service *Outbox) Save(outboxMessage *model.OutboxMessage, note string) err
// Delete removes an Outbox from the database (virtual delete)
func (service *Outbox) Delete(outboxMessage *model.OutboxMessage, note string) error {

// Delete Outbox record last.
// Delete the message from the outbox
criteria := exp.Equal("_id", outboxMessage.OutboxMessageID)

if err := service.collection.HardDelete(criteria); err != nil {
return derp.Wrap(err, "service.Outbox", "Error deleting Outbox", outboxMessage, note)
}

if err := service.activityStreamsService.DeleteDocumentByURL(outboxMessage.URL); err != nil {
// Delete the document from the cache
if err := service.activityStreamsService.Delete(outboxMessage.URL); err != nil {
return derp.Wrap(err, "service.Outbox", "Error deleting ActivityStream", outboxMessage, note)
}

return nil
}

// DeleteMany removes all child streams from the provided stream (virtual delete)
func (service *Outbox) DeleteMany(criteria exp.Expression, note string) error {

it, err := service.List(criteria)

if err != nil {
return derp.Wrap(err, "service.Message.Delete", "Error listing streams to delete", criteria)
}

outboxMessage := model.NewOutboxMessage()

for it.Next(&outboxMessage) {
if err := service.Delete(&outboxMessage, note); err != nil {
return derp.Wrap(err, "service.Message.Delete", "Error deleting outboxMessage", outboxMessage)
}
outboxMessage = model.NewOutboxMessage()
}

return nil
}

/******************************************
* Custom Query Methods
******************************************/
Expand Down
Loading

0 comments on commit 7d01ec2

Please sign in to comment.