diff --git a/cmd/orb-server/startcmd/params.go b/cmd/orb-server/startcmd/params.go index 809aa31c6..2f8fda2c2 100644 --- a/cmd/orb-server/startcmd/params.go +++ b/cmd/orb-server/startcmd/params.go @@ -121,7 +121,9 @@ const ( defaultTaskMgrCheckInterval = 10 * time.Second defaultDataExpiryCheckInterval = time.Minute defaultAnchorSyncInterval = time.Minute - defaultAnchorSyncMinActivityAge = time.Minute + defaultAnchorSyncAcceleratedInterval = 15 * time.Second + defaultAnchorSyncMinActivityAge = 10 * time.Minute + defaultAnchorSyncMaxActivities = 500 defaultVCTProofMonitoringInterval = 10 * time.Second defaultVCTLogMonitoringInterval = 10 * time.Second defaultVCTLogMonitoringMaxTreeSize = 50000 @@ -632,10 +634,22 @@ const ( "this service is following. Defaults to 1m if not set. " + commonEnvVarUsageText + anchorSyncIntervalEnvKey + anchorSyncMaxActivitiesFlagName = "sync-max-activities" + anchorSyncMaxActivitiesEnvKey = "ANCHOR_EVENT_SYNC_MAX_ACTIVITIES" + anchorSyncMaxActivitiesFlagUsage = "The maximum number of activities to be synchronized in a single task run. Defaults to 500 if not set. " + + commonEnvVarUsageText + anchorSyncMaxActivitiesEnvKey + + anchorSyncAcceleratedIntervalFlagName = "sync-accelerated-interval" + anchorSyncAcceleratedIntervalEnvKey = "ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL" + anchorSyncNextIntervalFlagUsage = "The interval in which to run the activity sync task after the maximum number of activities " + + "(specified by sync-max-activities) have been processed for the current task run. This should be smaller than the default interval " + + "in order to accelerate processing. Defaults to 15s if not set. " + + commonEnvVarUsageText + anchorSyncAcceleratedIntervalEnvKey + anchorSyncMinActivityAgeFlagName = "sync-min-activity-age" anchorSyncMinActivityAgeEnvKey = "ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE" anchorSyncMinActivityAgeFlagUsage = "The minimum age of an activity to be synchronized. The activity will be " + - "processed only if its age is greater than this value. Defaults to 1m if not set. " + + "processed only if its age is greater than this value. Defaults to 10m if not set. " + commonEnvVarUsageText + anchorSyncMinActivityAgeEnvKey activityPubClientCacheSizeFlagName = "apclient-cache-size" @@ -1584,13 +1598,15 @@ func getVCTParams(cmd *cobra.Command) (*vctParams, error) { } type activityPubParams struct { - pageSize int - anchorSyncPeriod time.Duration - anchorSyncMinActivityAge time.Duration - clientCacheSize int - clientCacheExpiration time.Duration - iriCacheSize int - iriCacheExpiration time.Duration + pageSize int + anchorSyncPeriod time.Duration + anchorSyncAcceleratedPeriod time.Duration + anchorSyncMinActivityAge time.Duration + anchorSyncMaxActivities int + clientCacheSize int + clientCacheExpiration time.Duration + iriCacheSize int + iriCacheExpiration time.Duration } func getActivityPubParams(cmd *cobra.Command) (*activityPubParams, error) { @@ -1599,7 +1615,7 @@ func getActivityPubParams(cmd *cobra.Command) (*activityPubParams, error) { return nil, fmt.Errorf("%s: %w", activityPubPageSizeFlagName, err) } - syncPeriod, minActivityAge, err := getAnchorSyncParameters(cmd) + syncPeriod, acceleratedSyncPeriod, minActivityAge, maxActivities, err := getAnchorSyncParameters(cmd) if err != nil { return nil, err } @@ -1615,13 +1631,15 @@ func getActivityPubParams(cmd *cobra.Command) (*activityPubParams, error) { } return &activityPubParams{ - pageSize: activityPubPageSize, - anchorSyncPeriod: syncPeriod, - anchorSyncMinActivityAge: minActivityAge, - clientCacheSize: apClientCacheSize, - clientCacheExpiration: apClientCacheExpiration, - iriCacheSize: apIRICacheSize, - iriCacheExpiration: apIRICacheExpiration, + pageSize: activityPubPageSize, + anchorSyncPeriod: syncPeriod, + anchorSyncAcceleratedPeriod: acceleratedSyncPeriod, + anchorSyncMinActivityAge: minActivityAge, + anchorSyncMaxActivities: maxActivities, + clientCacheSize: apClientCacheSize, + clientCacheExpiration: apClientCacheExpiration, + iriCacheSize: apIRICacheSize, + iriCacheExpiration: apIRICacheExpiration, }, nil } @@ -2182,19 +2200,33 @@ func getActivityPubCacheParameters(cmd *cobra.Command, params *cacheParams) (int return cacheSize, cacheExpiration, nil } -func getAnchorSyncParameters(cmd *cobra.Command) (syncPeriod, minActivityAge time.Duration, err error) { +func getAnchorSyncParameters(cmd *cobra.Command) (syncPeriod, acceleratedSyncPeriod, + minActivityAge time.Duration, maxActivities int, err error, +) { syncPeriod, err = cmdutil.GetDuration(cmd, anchorSyncIntervalFlagName, anchorSyncIntervalEnvKey, defaultAnchorSyncInterval) if err != nil { - return 0, 0, fmt.Errorf("%s: %w", anchorSyncIntervalFlagName, err) + return 0, 0, 0, 0, fmt.Errorf("%s: %w", anchorSyncIntervalFlagName, err) + } + + acceleratedSyncPeriod, err = cmdutil.GetDuration(cmd, anchorSyncAcceleratedIntervalFlagName, anchorSyncAcceleratedIntervalEnvKey, + defaultAnchorSyncAcceleratedInterval) + if err != nil { + return 0, 0, 0, 0, fmt.Errorf("%s: %w", anchorSyncIntervalFlagName, err) } minActivityAge, err = cmdutil.GetDuration(cmd, anchorSyncMinActivityAgeFlagName, anchorSyncMinActivityAgeEnvKey, defaultAnchorSyncMinActivityAge) if err != nil { - return 0, 0, fmt.Errorf("%s: %w", anchorSyncMinActivityAgeFlagName, err) + return 0, 0, 0, 0, fmt.Errorf("%s: %w", anchorSyncMinActivityAgeFlagName, err) + } + + maxActivities, err = cmdutil.GetInt(cmd, anchorSyncMaxActivitiesFlagName, anchorSyncMaxActivitiesEnvKey, + defaultAnchorSyncMaxActivities) + if err != nil { + return 0, 0, 0, 0, fmt.Errorf("%s: %w", anchorSyncMinActivityAgeFlagName, err) } - return syncPeriod, minActivityAge, nil + return syncPeriod, acceleratedSyncPeriod, minActivityAge, maxActivities, nil } func newAPServiceParams(apServiceID, externalEndpoint string, @@ -2377,6 +2409,8 @@ func createFlags(startCmd *cobra.Command) { startCmd.Flags().StringP(httpTimeoutFlagName, "", "", httpTimeoutFlagUsage) startCmd.Flags().StringP(httpDialTimeoutFlagName, "", "", httpDialTimeoutFlagUsage) startCmd.Flags().StringP(anchorSyncIntervalFlagName, anchorSyncIntervalFlagShorthand, "", anchorSyncIntervalFlagUsage) + startCmd.Flags().StringP(anchorSyncAcceleratedIntervalFlagName, "", "", anchorSyncNextIntervalFlagUsage) + startCmd.Flags().StringP(anchorSyncMaxActivitiesFlagName, "", "", anchorSyncMaxActivitiesFlagUsage) startCmd.Flags().StringP(anchorSyncMinActivityAgeFlagName, "", "", anchorSyncMinActivityAgeFlagUsage) startCmd.Flags().StringP(vctProofMonitoringIntervalFlagName, "", "", vctProofMonitoringIntervalFlagUsage) startCmd.Flags().StringP(vctProofMonitoringExpiryPeriodFlagName, "", "", vctProofMonitoringExpiryPeriodFlagUsage) diff --git a/cmd/orb-server/startcmd/start.go b/cmd/orb-server/startcmd/start.go index aeeb3338e..910640e35 100644 --- a/cmd/orb-server/startcmd/start.go +++ b/cmd/orb-server/startcmd/start.go @@ -873,9 +873,11 @@ func startOrbServices(parameters *orbParameters) error { err = anchorsynctask.Register( anchorsynctask.Config{ - ServiceIRI: parameters.apServiceParams.serviceIRI(), - Interval: parameters.activityPub.anchorSyncPeriod, - MinActivityAge: parameters.activityPub.anchorSyncMinActivityAge, + ServiceIRI: parameters.apServiceParams.serviceIRI(), + Interval: parameters.activityPub.anchorSyncPeriod, + AcceleratedInterval: parameters.activityPub.anchorSyncAcceleratedPeriod, + MinActivityAge: parameters.activityPub.anchorSyncMinActivityAge, + MaxActivitiesToSync: parameters.activityPub.anchorSyncMaxActivities, }, taskMgr, apClient, apStore, storeProviders.provider, func() apspi.InboxHandler { diff --git a/internal/pkg/log/fields.go b/internal/pkg/log/fields.go index 39e9412c5..b99c7e88d 100644 --- a/internal/pkg/log/fields.go +++ b/internal/pkg/log/fields.go @@ -19,135 +19,138 @@ import ( // Log Fields. const ( - FieldURI = "uri" - FieldURIs = "uris" - FieldURL = "url" - FieldSenderURL = "sender" - FieldConfig = "config" - FieldServiceName = "service" - FieldServiceIRI = "serviceIri" - FieldServiceEndpoint = "serviceEndpoint" - FieldActorID = "actorId" - FieldOriginActorID = "originActorId" - FieldActivityType = "activityType" - FieldActivityID = "activityId" - FieldMessageID = "messageId" - FieldData = "data" - FieldMetadata = "metadata" - FieldRequestURL = "requestUrl" - FieldRequestHeaders = "requestHeaders" - FieldRequestBody = "requestBody" - FieldSize = "size" - FieldMaxSize = "maxSize" - FieldCacheExpiration = "cacheExpiration" - FieldCacheRefreshInterval = "cacheRefreshInterval" - FieldCacheRefreshAttempts = "cacheRefreshAttempts" - FieldTarget = "target" - FieldTargets = "targets" - FieldHTTPMethod = "httpMethod" - FieldParameter = "parameter" - FieldParameters = "parameters" - FieldAcceptListType = "acceptListType" - FieldAdditions = "additions" - FieldDeletions = "deletions" - FieldReferenceType = "referenceType" - FieldAnchorURI = "anchorUri" - FieldAnchorURIs = "anchorURIs" - FieldAnchorHash = "anchorHash" - FieldAnchorEventURI = "anchorEventUri" - FieldObjectIRI = "objectIri" - FieldReferenceIRI = "reference" - FieldKeyID = "keyId" - FieldKeyType = "keyType" - FieldKeyOwner = "keyOwner" - FieldCurrent = "current" - FieldNext = "next" - FieldTotal = "total" - FieldMinimum = "minimum" - FieldType = "type" - FieldQuery = "query" - FieldSuffix = "suffix" - FieldSuffixes = "suffixes" - FieldVerifiableCredential = "vc" - FieldVerifiableCredentialID = "vcId" - FieldHash = "hash" - FieldHashlink = "hashlink" - FieldLocalHashlink = "localHashlink" - FieldParent = "parent" - FieldParents = "parents" - FieldProof = "proof" - FieldCreatedTime = "createdTime" - FieldWitnessURI = "witnessUri" - FieldWitnessURIs = "witnessURIs" - FieldWitnessPolicy = "witnessPolicy" - FieldAnchorOrigin = "anchorOrigin" - FieldAnchorOriginEndpoint = "anchorOriginEndpoint" - FieldOperationType = "operationType" - FieldOperation = "operation" - FieldCoreIndex = "coreIndex" - FieldKey = "key" - FieldValue = "value" - FieldCID = "cid" - FieldResolvedCID = "resolvedCid" - FieldAnchorCID = "anchorCid" - FieldCIDVersion = "cidVersion" - FieldMultihash = "multihash" - FieldCASData = "casData" - FieldDomain = "domain" - FieldLink = "link" - FieldLinks = "links" - FieldTaskMgrInstanceID = "taskMgrInstanceId" - FieldTaskID = "taskId" - FieldRetries = "retries" - FieldMaxRetries = "maxRetries" - FieldSubscriberPoolSize = "subscriberPoolSize" - FieldTaskMonitorInterval = "taskMonitorInterval" - FieldTaskExpiration = "taskExpiration" - FieldDeliveryDelay = "deliveryDelay" - FieldOperationID = "operationId" - FieldPermitHolder = "permitHolder" - FieldTimeSinceLastUpdate = "timeSinceLastUpdate" - FieldGenesisTime = "genesisTime" - FieldSidetreeProtocol = "sidetreeProtocol" - FieldSidetreeTxn = "sidetreeTxn" - FieldDID = "did" - FieldHRef = "href" - FieldID = "id" - FieldResource = "resource" - FieldResolutionResult = "resolutionResult" - FieldResolutionModel = "resolutionModel" - FieldResolutionEndpoints = "resolutionEndpoints" - FieldAuthToken = "authToken" - FieldAuthTokens = "authTokens" - FieldAddress = "address" - FieldAttributedTo = "attributedTo" - FieldAnchorLink = "anchorLink" - FieldAnchorLinkset = "anchorLinkset" - FieldVersion = "version" - FieldDeliveryAttempts = "deliveryAttempts" - FieldProperty = "property" - FieldStorageName = "storeName" - FieldIssuer = "issuer" - FieldStatus = "status" - FieldLogURL = "logURL" - FieldNamespace = "namespace" - FieldCanonicalRef = "canonicalRef" - FieldAnchorString = "anchorString" - FieldJRD = "jrd" - FieldBackoff = "backoff" - FieldTimeout = "timeout" - FieldMaxTime = "maxTime" - FieldLogMonitor = "logMonitor" - FieldLogMonitors = "logMonitors" - FieldIndex = "index" - FieldFromIndex = "fromIndex" - FieldToIndex = "toIndex" - FieldSource = "source" - FieldAge = "age" - FieldMinAge = "minAge" - FieldLogSpec = "logSpec" - FieldTracingProvider = "tracingProvider" - FieldMaxOperationsToRepost = "maxOperationsToRepost" + FieldURI = "uri" + FieldURIs = "uris" + FieldURL = "url" + FieldSenderURL = "sender" + FieldConfig = "config" + FieldServiceName = "service" + FieldServiceIRI = "serviceIri" + FieldServiceEndpoint = "serviceEndpoint" + FieldActorID = "actorId" + FieldOriginActorID = "originActorId" + FieldActivityType = "activityType" + FieldActivityID = "activityId" + FieldMessageID = "messageId" + FieldData = "data" + FieldMetadata = "metadata" + FieldRequestURL = "requestUrl" + FieldRequestHeaders = "requestHeaders" + FieldRequestBody = "requestBody" + FieldSize = "size" + FieldMaxSize = "maxSize" + FieldCacheExpiration = "cacheExpiration" + FieldCacheRefreshInterval = "cacheRefreshInterval" + FieldCacheRefreshAttempts = "cacheRefreshAttempts" + FieldTarget = "target" + FieldTargets = "targets" + FieldHTTPMethod = "httpMethod" + FieldParameter = "parameter" + FieldParameters = "parameters" + FieldAcceptListType = "acceptListType" + FieldAdditions = "additions" + FieldDeletions = "deletions" + FieldReferenceType = "referenceType" + FieldAnchorURI = "anchorUri" + FieldAnchorURIs = "anchorURIs" + FieldAnchorHash = "anchorHash" + FieldAnchorEventURI = "anchorEventUri" + FieldObjectIRI = "objectIri" + FieldReferenceIRI = "reference" + FieldKeyID = "keyId" + FieldKeyType = "keyType" + FieldKeyOwner = "keyOwner" + FieldCurrent = "current" + FieldNext = "next" + FieldTotal = "total" + FieldMinimum = "minimum" + FieldType = "type" + FieldQuery = "query" + FieldSuffix = "suffix" + FieldSuffixes = "suffixes" + FieldVerifiableCredential = "vc" + FieldVerifiableCredentialID = "vcId" + FieldHash = "hash" + FieldHashlink = "hashlink" + FieldLocalHashlink = "localHashlink" + FieldParent = "parent" + FieldParents = "parents" + FieldProof = "proof" + FieldCreatedTime = "createdTime" + FieldWitnessURI = "witnessUri" + FieldWitnessURIs = "witnessURIs" + FieldWitnessPolicy = "witnessPolicy" + FieldAnchorOrigin = "anchorOrigin" + FieldAnchorOriginEndpoint = "anchorOriginEndpoint" + FieldOperationType = "operationType" + FieldOperation = "operation" + FieldCoreIndex = "coreIndex" + FieldKey = "key" + FieldValue = "value" + FieldCID = "cid" + FieldResolvedCID = "resolvedCid" + FieldAnchorCID = "anchorCid" + FieldCIDVersion = "cidVersion" + FieldMultihash = "multihash" + FieldCASData = "casData" + FieldDomain = "domain" + FieldLink = "link" + FieldLinks = "links" + FieldTaskMgrInstanceID = "taskMgrInstanceId" + FieldTaskID = "taskId" + FieldRetries = "retries" + FieldMaxRetries = "maxRetries" + FieldSubscriberPoolSize = "subscriberPoolSize" + FieldTaskMonitorInterval = "taskMonitorInterval" + FieldTaskExpiration = "taskExpiration" + FieldDeliveryDelay = "deliveryDelay" + FieldOperationID = "operationId" + FieldPermitHolder = "permitHolder" + FieldTimeSinceLastUpdate = "timeSinceLastUpdate" + FieldGenesisTime = "genesisTime" + FieldSidetreeProtocol = "sidetreeProtocol" + FieldSidetreeTxn = "sidetreeTxn" + FieldDID = "did" + FieldHRef = "href" + FieldID = "id" + FieldResource = "resource" + FieldResolutionResult = "resolutionResult" + FieldResolutionModel = "resolutionModel" + FieldResolutionEndpoints = "resolutionEndpoints" + FieldAuthToken = "authToken" + FieldAuthTokens = "authTokens" + FieldAddress = "address" + FieldAttributedTo = "attributedTo" + FieldAnchorLink = "anchorLink" + FieldAnchorLinkset = "anchorLinkset" + FieldVersion = "version" + FieldDeliveryAttempts = "deliveryAttempts" + FieldProperty = "property" + FieldStorageName = "storeName" + FieldIssuer = "issuer" + FieldStatus = "status" + FieldLogURL = "logURL" + FieldNamespace = "namespace" + FieldCanonicalRef = "canonicalRef" + FieldAnchorString = "anchorString" + FieldJRD = "jrd" + FieldBackoff = "backoff" + FieldTimeout = "timeout" + FieldMaxTime = "maxTime" + FieldLogMonitor = "logMonitor" + FieldLogMonitors = "logMonitors" + FieldIndex = "index" + FieldFromIndex = "fromIndex" + FieldToIndex = "toIndex" + FieldSource = "source" + FieldAge = "age" + FieldMinAge = "minAge" + FieldLogSpec = "logSpec" + FieldTracingProvider = "tracingProvider" + FieldMaxOperationsToRepost = "maxOperationsToRepost" + FieldMaxActivitiesToSync = "maxActivitiesToSync" + FieldNumActivitiesSynced = "numActivitiesSynced" + FieldNextActivitySyncInterval = "nextActivitySyncInterval" ) // WithMessageID sets the message-id field. @@ -886,6 +889,21 @@ func WithMaxOperationsToRepost(value int) zap.Field { return zap.Int(FieldMaxOperationsToRepost, value) } +// WithMaxActivitiesToSync sets the maxActivitiesToSync field. +func WithMaxActivitiesToSync(value int) zap.Field { + return zap.Int(FieldMaxActivitiesToSync, value) +} + +// WithNumActivitiesSynced sets the maxActivitiesSynced field. +func WithNumActivitiesSynced(value int) zap.Field { + return zap.Int(FieldNumActivitiesSynced, value) +} + +// WithNextActivitySyncInterval sets the nextActivitySyncInterval field. +func WithNextActivitySyncInterval(value time.Duration) zap.Field { + return zap.Duration(FieldNextActivitySyncInterval, value) +} + type jsonMarshaller struct { key string obj interface{} diff --git a/internal/pkg/log/fields_test.go b/internal/pkg/log/fields_test.go index 5c8215fd4..1f62ec264 100644 --- a/internal/pkg/log/fields_test.go +++ b/internal/pkg/log/fields_test.go @@ -62,7 +62,8 @@ func TestStandardFields(t *testing.T) { WithProof([]byte(`{"id":"https://example.com/proof1"}`)), WithCreatedTime(now), WithWitnessURI(u1), WithWitnessURIs(u1, u2), WithWitnessPolicy("some policy"), WithAnchorOrigin(u1.String()), WithOperationType("Create"), WithCoreIndex("1234"), - WithMaxOperationsToRepost(300), + WithMaxOperationsToRepost(300), WithMaxActivitiesToSync(11), WithNextActivitySyncInterval(3*time.Second), + WithNumActivitiesSynced(123), ) t.Logf(stdOut.String()) @@ -119,6 +120,9 @@ func TestStandardFields(t *testing.T) { require.Equal(t, "Create", l.OperationType) require.Equal(t, "1234", l.CoreIndex) require.Equal(t, 300, l.MaxOperationsToRepost) + require.Equal(t, 11, l.MaxActivitiesToSync) + require.Equal(t, "3s", l.NextActivitySyncInterval) + require.Equal(t, 123, l.NumActivitiesSynced) }) t.Run("json fields 2", func(t *testing.T) { @@ -319,132 +323,135 @@ type logData struct { Msg string `json:"msg"` Error string `json:"error"` - MessageID string `json:"messageId"` - Data string `json:"data"` - ActorID string `json:"actorId"` - ActivityID string `json:"activityId"` - ActivityType string `json:"activityType"` - ServiceIri string `json:"serviceIri"` - Service string `json:"service"` - ServiceEndpoint string `json:"serviceEndpoint"` - Size int `json:"size"` - CacheExpiration string `json:"cacheExpiration"` - Target string `json:"target"` - Parameter string `json:"parameter"` - ReferenceType string `json:"referenceType"` - URI string `json:"uri"` - URIs []string `json:"uris"` - Sender string `json:"sender"` - AnchorURI string `json:"anchorUri"` - AnchorEventURI string `json:"anchorEventUri"` - Config *mockObject `json:"config"` - AcceptListType string `json:"acceptListType"` - Additions []string `json:"additions"` - Deletions []string `json:"deletions"` - RequestURL string `json:"requestUrl"` - RequestHeaders map[string][]string `json:"requestHeaders"` - RequestBody string `json:"requestBody"` - ObjectIRI string `json:"objectIri"` - Reference string `json:"reference"` - KeyID string `json:"keyId"` - KeyOwnerID string `json:"keyOwner"` - KeyType string `json:"keyType"` - Current string `json:"current"` - Next string `json:"next"` - Total int `json:"total"` - Minimum int `json:"minimum"` - Type string `json:"type"` - Query *mockObject `json:"query"` - AnchorHash string `json:"anchorHash"` - Suffix string `json:"suffix"` - VerifiableCredential string `json:"vc"` - VerifiableCredentialID string `json:"vcId"` - Hashlink string `json:"hashlink"` - Parent string `json:"parent"` - Parents []string `json:"parents"` - Proof string `json:"proof"` - CreatedTime string `json:"createdTime"` - WitnessURI string `json:"witnessUri"` - WitnessURIs []string `json:"WitnessURIs"` //nolint:tagliatelle - WitnessPolicy string `json:"witnessPolicy"` - AnchorOrigin string `json:"anchorOrigin"` - OperationType string `json:"operationType"` - CoreIndex string `json:"coreIndex"` - Hash string `json:"hash"` - AnchorOriginEndpoint *mockObject `json:"anchorOriginEndpoint"` - Key string `json:"key"` - CID string `json:"cid"` - ResolvedCID string `json:"resolvedCid"` - AnchorCID string `json:"anchorCid"` - CIDVersion int `json:"cidVersion"` - Multihash string `json:"multihash"` - CASData string `json:"casData"` - Domain string `json:"domain"` - Link string `json:"link"` - Links []string `json:"links"` - TaskMgrInstanceID string `json:"taskMgrInstanceId"` - Retries int `json:"retries"` - MaxRetries int `json:"maxRetries"` - SubscriberPoolSize int `json:"subscriberPoolSize"` - TaskMonitorInterval string `json:"taskMonitorInterval"` - TaskExpiration string `json:"taskExpiration"` - DeliveryDelay string `json:"deliveryDelay"` - OperationID string `json:"operationId"` - PermitHolder string `json:"permitHolder"` - TimeSinceLastUpdate string `json:"timeSinceLastUpdate"` - GenesisTime int `json:"genesisTime"` - DID string `json:"did"` - HRef string `json:"href"` - ID string `json:"id"` - Resource string `json:"resource"` - ResolutionResult *mockObject `json:"resolutionResult"` - ResolutionModel *mockObject `json:"resolutionModel"` - ResolutionEndpoints []string `json:"resolutionEndpoints"` - Metadata *mockObject `json:"metadata"` - SidetreeProtocol *mockObject `json:"sidetreeProtocol"` - OriginActorID string `json:"originActorId"` - Targets []string `json:"targets"` - HTTPMethod string `json:"httpMethod"` - Suffixes []string `json:"suffixes"` - LocalHashlink string `json:"localHashlink"` - AuthToken string `json:"authToken"` - AuthTokens []string `json:"authTokens"` - Address string `json:"address"` - AttributedTo string `json:"attributedTo"` - AnchorLinkset string `json:"anchorLinkset"` - Version string `json:"version"` - MaxSize int `json:"maxSize"` - Parameters *mockObject `json:"parameters"` - URL string `json:"url"` - AnchorURIs []string `json:"anchorURIs"` //nolint:tagliatelle - Operation *mockObject `json:"operation"` - Value string `json:"value"` - TaskID string `json:"taskId"` - SidetreeTxn *mockObject `json:"sidetreeTxn"` - AnchorLink string `json:"anchorLink"` - DeliveryAttempts int `json:"deliveryAttempts"` - Property string `json:"property"` - StoreName string `json:"storeName"` - Issuer string `json:"issuer"` - Status string `json:"status"` - LogURL string `json:"logUrl"` - Namespace string `json:"namespace"` - CanonicalRef string `json:"canonicalRef"` - AnchorString string `json:"anchorString"` - JRD *mockObject `json:"jrd"` - Backoff string `json:"backoff"` - Timeout string `json:"timeout"` - LogMonitor *mockObject `json:"logMonitor"` - LogMonitors []*mockObject `json:"logMonitors"` - MaxTime string `json:"maxTime"` - Index int `json:"index"` - FromIndex int `json:"fromIndex"` - ToIndex int `json:"toIndex"` - Source string `json:"source"` - Age string `json:"age"` - MinAge string `json:"minAge"` - LogSpec string `json:"logSpec"` - MaxOperationsToRepost int `json:"maxOperationsToRepost"` + MessageID string `json:"messageId"` + Data string `json:"data"` + ActorID string `json:"actorId"` + ActivityID string `json:"activityId"` + ActivityType string `json:"activityType"` + ServiceIri string `json:"serviceIri"` + Service string `json:"service"` + ServiceEndpoint string `json:"serviceEndpoint"` + Size int `json:"size"` + CacheExpiration string `json:"cacheExpiration"` + Target string `json:"target"` + Parameter string `json:"parameter"` + ReferenceType string `json:"referenceType"` + URI string `json:"uri"` + URIs []string `json:"uris"` + Sender string `json:"sender"` + AnchorURI string `json:"anchorUri"` + AnchorEventURI string `json:"anchorEventUri"` + Config *mockObject `json:"config"` + AcceptListType string `json:"acceptListType"` + Additions []string `json:"additions"` + Deletions []string `json:"deletions"` + RequestURL string `json:"requestUrl"` + RequestHeaders map[string][]string `json:"requestHeaders"` + RequestBody string `json:"requestBody"` + ObjectIRI string `json:"objectIri"` + Reference string `json:"reference"` + KeyID string `json:"keyId"` + KeyOwnerID string `json:"keyOwner"` + KeyType string `json:"keyType"` + Current string `json:"current"` + Next string `json:"next"` + Total int `json:"total"` + Minimum int `json:"minimum"` + Type string `json:"type"` + Query *mockObject `json:"query"` + AnchorHash string `json:"anchorHash"` + Suffix string `json:"suffix"` + VerifiableCredential string `json:"vc"` + VerifiableCredentialID string `json:"vcId"` + Hashlink string `json:"hashlink"` + Parent string `json:"parent"` + Parents []string `json:"parents"` + Proof string `json:"proof"` + CreatedTime string `json:"createdTime"` + WitnessURI string `json:"witnessUri"` + WitnessURIs []string `json:"WitnessURIs"` //nolint:tagliatelle + WitnessPolicy string `json:"witnessPolicy"` + AnchorOrigin string `json:"anchorOrigin"` + OperationType string `json:"operationType"` + CoreIndex string `json:"coreIndex"` + Hash string `json:"hash"` + AnchorOriginEndpoint *mockObject `json:"anchorOriginEndpoint"` + Key string `json:"key"` + CID string `json:"cid"` + ResolvedCID string `json:"resolvedCid"` + AnchorCID string `json:"anchorCid"` + CIDVersion int `json:"cidVersion"` + Multihash string `json:"multihash"` + CASData string `json:"casData"` + Domain string `json:"domain"` + Link string `json:"link"` + Links []string `json:"links"` + TaskMgrInstanceID string `json:"taskMgrInstanceId"` + Retries int `json:"retries"` + MaxRetries int `json:"maxRetries"` + SubscriberPoolSize int `json:"subscriberPoolSize"` + TaskMonitorInterval string `json:"taskMonitorInterval"` + TaskExpiration string `json:"taskExpiration"` + DeliveryDelay string `json:"deliveryDelay"` + OperationID string `json:"operationId"` + PermitHolder string `json:"permitHolder"` + TimeSinceLastUpdate string `json:"timeSinceLastUpdate"` + GenesisTime int `json:"genesisTime"` + DID string `json:"did"` + HRef string `json:"href"` + ID string `json:"id"` + Resource string `json:"resource"` + ResolutionResult *mockObject `json:"resolutionResult"` + ResolutionModel *mockObject `json:"resolutionModel"` + ResolutionEndpoints []string `json:"resolutionEndpoints"` + Metadata *mockObject `json:"metadata"` + SidetreeProtocol *mockObject `json:"sidetreeProtocol"` + OriginActorID string `json:"originActorId"` + Targets []string `json:"targets"` + HTTPMethod string `json:"httpMethod"` + Suffixes []string `json:"suffixes"` + LocalHashlink string `json:"localHashlink"` + AuthToken string `json:"authToken"` + AuthTokens []string `json:"authTokens"` + Address string `json:"address"` + AttributedTo string `json:"attributedTo"` + AnchorLinkset string `json:"anchorLinkset"` + Version string `json:"version"` + MaxSize int `json:"maxSize"` + Parameters *mockObject `json:"parameters"` + URL string `json:"url"` + AnchorURIs []string `json:"anchorURIs"` //nolint:tagliatelle + Operation *mockObject `json:"operation"` + Value string `json:"value"` + TaskID string `json:"taskId"` + SidetreeTxn *mockObject `json:"sidetreeTxn"` + AnchorLink string `json:"anchorLink"` + DeliveryAttempts int `json:"deliveryAttempts"` + Property string `json:"property"` + StoreName string `json:"storeName"` + Issuer string `json:"issuer"` + Status string `json:"status"` + LogURL string `json:"logUrl"` + Namespace string `json:"namespace"` + CanonicalRef string `json:"canonicalRef"` + AnchorString string `json:"anchorString"` + JRD *mockObject `json:"jrd"` + Backoff string `json:"backoff"` + Timeout string `json:"timeout"` + LogMonitor *mockObject `json:"logMonitor"` + LogMonitors []*mockObject `json:"logMonitors"` + MaxTime string `json:"maxTime"` + Index int `json:"index"` + FromIndex int `json:"fromIndex"` + ToIndex int `json:"toIndex"` + Source string `json:"source"` + Age string `json:"age"` + MinAge string `json:"minAge"` + LogSpec string `json:"logSpec"` + MaxOperationsToRepost int `json:"maxOperationsToRepost"` + MaxActivitiesToSync int `json:"maxActivitiesToSync"` + NextActivitySyncInterval string `json:"nextActivitySyncInterval"` + NumActivitiesSynced int `json:"numActivitiesSynced"` } func unmarshalLogData(t *testing.T, b []byte) *logData { diff --git a/pkg/activitypub/service/anchorsynctask/activitysynctask.go b/pkg/activitypub/service/anchorsynctask/activitysynctask.go index 6cc678f09..eb9fb52aa 100644 --- a/pkg/activitypub/service/anchorsynctask/activitysynctask.go +++ b/pkg/activitypub/service/anchorsynctask/activitysynctask.go @@ -10,6 +10,7 @@ import ( "context" "errors" "fmt" + "math" "net/url" "time" @@ -32,8 +33,10 @@ const logModule = "activity_sync" var logger = log.New(logModule) const ( - defaultInterval = time.Minute - defaultMinActivityAge = time.Minute + defaultInterval = time.Minute + defaultAcceleratedInterval = 15 * time.Second + defaultMinActivityAge = time.Minute + defaultMaxActivitiesToSync = math.MaxInt taskName = "activity-sync" ) @@ -51,60 +54,53 @@ type activityPubClient interface { } type taskManager interface { - RegisterTask(taskType string, interval time.Duration, task func()) + RegisterTaskEx(taskType string, interval time.Duration, task func() time.Duration) } // Config contains configuration parameters for the anchor event synchronization task. type Config struct { - ServiceIRI *url.URL - Interval time.Duration - MinActivityAge time.Duration + ServiceIRI *url.URL + Interval time.Duration + AcceleratedInterval time.Duration + MinActivityAge time.Duration + MaxActivitiesToSync int } type task struct { - serviceIRI *url.URL - apClient activityPubClient - store *syncStore - getHandler func() spi.InboxHandler - activityPubStore store.Store - closed chan struct{} - minActivityAge time.Duration - tracer trace.Tracer + serviceIRI *url.URL + apClient activityPubClient + store *syncStore + getHandler func() spi.InboxHandler + activityPubStore store.Store + closed chan struct{} + minActivityAge time.Duration + maxActivitiesToSync int + acceleratedInterval time.Duration + tracer trace.Tracer } // Register registers the anchor event synchronization task. func Register(cfg Config, taskMgr taskManager, apClient activityPubClient, apStore store.Store, storageProvider storage.Provider, handlerFactory func() spi.InboxHandler, ) error { - interval := cfg.Interval + config := resolveConfig(&cfg) - if interval == 0 { - interval = defaultInterval - } - - minActivityAge := cfg.MinActivityAge - - if minActivityAge == 0 { - minActivityAge = defaultMinActivityAge - } - - t, err := newTask(cfg.ServiceIRI, apClient, apStore, storageProvider, minActivityAge, handlerFactory) + t, err := newTask(config, apClient, apStore, storageProvider, handlerFactory) if err != nil { return fmt.Errorf("create task: %w", err) } logger.Info("Registering activity-sync task.", - logfields.WithServiceIRI(cfg.ServiceIRI), logfields.WithTaskMonitorInterval(interval), - logfields.WithMinAge(minActivityAge)) + logfields.WithServiceIRI(config.ServiceIRI), logfields.WithTaskMonitorInterval(config.Interval), + logfields.WithMinAge(config.MinActivityAge), logfields.WithMaxActivitiesToSync(config.MaxActivitiesToSync)) - taskMgr.RegisterTask(taskName, interval, t.run) + taskMgr.RegisterTaskEx(taskName, config.Interval, t.run) return nil } -func newTask(serviceIRI *url.URL, apClient activityPubClient, apStore store.Store, - storageProvider storage.Provider, minActivityAge time.Duration, - handlerFactory func() spi.InboxHandler, +func newTask(cfg *Config, apClient activityPubClient, apStore store.Store, + storageProvider storage.Provider, handlerFactory func() spi.InboxHandler, ) (*task, error) { s, err := newSyncStore(storageProvider) if err != nil { @@ -112,68 +108,132 @@ func newTask(serviceIRI *url.URL, apClient activityPubClient, apStore store.Stor } return &task{ - serviceIRI: serviceIRI, - apClient: apClient, - store: s, - activityPubStore: apStore, - getHandler: handlerFactory, - minActivityAge: minActivityAge, - closed: make(chan struct{}), - tracer: tracing.Tracer(tracing.SubsystemActivityPub), + serviceIRI: cfg.ServiceIRI, + apClient: apClient, + store: s, + activityPubStore: apStore, + getHandler: handlerFactory, + minActivityAge: cfg.MinActivityAge, + maxActivitiesToSync: cfg.MaxActivitiesToSync, + acceleratedInterval: cfg.AcceleratedInterval, + closed: make(chan struct{}), + tracer: tracing.Tracer(tracing.SubsystemActivityPub), }, nil } -func (m *task) run() { +func (m *task) run() time.Duration { + numFromFollowers, err := m.syncFollowers(m.maxActivitiesToSync) + if err != nil { + logger.Error("Error synchronizing activities", log.WithError(err)) + + return 0 + } + + var numFromFollowing int + + if numFromFollowers < m.maxActivitiesToSync { + numFromFollowing, err = m.syncFollowing(m.maxActivitiesToSync - numFromFollowers) + if err != nil { + logger.Error("Error synchronizing activities", log.WithError(err)) + + return 0 + } + } + + numTotal := numFromFollowers + numFromFollowing + + if numTotal > 0 { + if numTotal >= m.maxActivitiesToSync { + logger.Info("Reached the maximum number of activities to sync. Will continue syncing in the next run.", + logfields.WithNumActivitiesSynced(numTotal), logfields.WithNextActivitySyncInterval(m.acceleratedInterval)) + + return m.acceleratedInterval + } + + logger.Info("Done synchronizing activities", logfields.WithNumActivitiesSynced(numTotal)) + } + + return 0 +} + +func (m *task) syncFollowers(maxActivitiesToSync int) (int, error) { followers, err := m.getServices(store.Follower) if err != nil { logger.Error("Error retrieving my followers list", log.WithError(err)) - return + return 0, err } - if len(followers) > 0 { - for _, serviceIRI := range followers { - err = m.sync(serviceIRI, inbox, func(a *vocab.ActivityType) bool { - // Only sync Create activities that were originated by this service. - return a.Type().Is(vocab.TypeCreate) && a.Actor().String() == m.serviceIRI.String() - }) - if err != nil { - logger.Warn("Error processing activities from inbox of service", - logfields.WithServiceIRI(serviceIRI), log.WithError(err)) + if len(followers) == 0 { + return 0, nil + } + + var numProcessed int + + for _, serviceIRI := range followers { + num, err := m.sync(serviceIRI, inbox, maxActivitiesToSync-numProcessed, func(a *vocab.ActivityType) bool { + // Only sync Create activities that were originated by this service. + return a.Type().Is(vocab.TypeCreate) && a.Actor().String() == m.serviceIRI.String() + }) + if err != nil { + logger.Warn("Error processing activities from inbox of service", + logfields.WithServiceIRI(serviceIRI), log.WithError(err)) + } else { + numProcessed += num + + if numProcessed >= maxActivitiesToSync { + break } } - - logger.Debug("Done synchronizing activities with services that are following me.", - logfields.WithTotal(len(followers))) } + logger.Debug("Done synchronizing activities with services that are following me.", logfields.WithTotal(len(followers)), + logfields.WithNumActivitiesSynced(numProcessed)) + + return numProcessed, nil +} + +func (m *task) syncFollowing(maxActivitiesToSync int) (int, error) { following, err := m.getServices(store.Following) if err != nil { - logger.Error("Error retrieving my following list", log.WithError(err)) + return 0, fmt.Errorf("retrieve following list: %w", err) + } - return + if len(following) == 0 { + return 0, nil } - if len(following) > 0 { - for _, serviceIRI := range following { - err = m.sync(serviceIRI, outbox, func(a *vocab.ActivityType) bool { - return a.Type().IsAny(vocab.TypeCreate, vocab.TypeAnnounce) - }) - if err != nil { - logger.Warn("Error processing activities from outbox of service", - logfields.WithServiceIRI(serviceIRI), log.WithError(err)) + var numProcessed int + + for _, serviceIRI := range following { + num, err := m.sync(serviceIRI, outbox, maxActivitiesToSync-numProcessed, func(a *vocab.ActivityType) bool { + return a.Type().IsAny(vocab.TypeCreate, vocab.TypeAnnounce) + }) + if err != nil { + logger.Warn("Error processing activities from outbox of service", + logfields.WithServiceIRI(serviceIRI), log.WithError(err)) + } else { + numProcessed += num + + if numProcessed >= maxActivitiesToSync { + break } } - - logger.Debug("Done synchronizing activities with services that I'm following.", logfields.WithTotal(len(following))) } + + logger.Debug("Done synchronizing activities with services that I'm following.", logfields.WithTotal(len(following)), + logfields.WithNumActivitiesSynced(numProcessed)) + + return numProcessed, nil } //nolint:cyclop -func (m *task) sync(serviceIRI *url.URL, src activitySource, shouldSync func(*vocab.ActivityType) bool) error { +func (m *task) sync(serviceIRI *url.URL, src activitySource, maxNumActivitiesToProcess int, + shouldSync func(*vocab.ActivityType) bool, +) (int, error) { it, lastSyncedPage, lastSyncedIndex, err := m.getNewActivities(serviceIRI, src) if err != nil { - return fmt.Errorf("get new activities: %w", err) + return 0, fmt.Errorf("get new activities: %w", err) } page, index := lastSyncedPage, lastSyncedIndex @@ -192,7 +252,7 @@ func (m *task) sync(serviceIRI *url.URL, src activitySource, shouldSync func(*vo break } - return fmt.Errorf("next activity: %w", e) + return numProcessed, fmt.Errorf("next activity: %w", e) } currentPage := it.CurrentPage() @@ -219,7 +279,7 @@ func (m *task) sync(serviceIRI *url.URL, src activitySource, shouldSync func(*vo n, e := m.syncActivity(span.Start("sync activities"), serviceIRI, currentPage, a) if e != nil { - return fmt.Errorf("sync activity [%s]: %w", a.ID(), e) + return numProcessed, fmt.Errorf("sync activity [%s]: %w", a.ID(), e) } numProcessed += n @@ -227,6 +287,10 @@ func (m *task) sync(serviceIRI *url.URL, src activitySource, shouldSync func(*vo progress.Log(n, page, currentPage) page, index = currentPage, it.NextIndex()-1 + + if numProcessed >= maxNumActivitiesToProcess { + break + } } if page.String() != lastSyncedPage.String() || index != lastSyncedIndex { @@ -241,7 +305,7 @@ func (m *task) sync(serviceIRI *url.URL, src activitySource, shouldSync func(*vo err = m.store.PutLastSyncedPage(serviceIRI, src, page, index) if err != nil { - return fmt.Errorf("update last synced page [%s] at index [%d]: %w", page, index, err) + return numProcessed, fmt.Errorf("update last synced page [%s] at index [%d]: %w", page, index, err) } } else { logger.Debug("Processed missing anchor events ending at page/index.", @@ -249,7 +313,7 @@ func (m *task) sync(serviceIRI *url.URL, src activitySource, shouldSync func(*vo logfields.WithURL(page), logfields.WithIndex(index)) } - return nil + return numProcessed, nil } func (m *task) syncActivity(ctx context.Context, serviceIRI, currentPage *url.URL, a *vocab.ActivityType) (int, error) { @@ -393,6 +457,28 @@ func (m *task) getLastSyncedPage(serviceIRI *url.URL, src activitySource) (*url. return actor.Outbox(), 0, nil } +func resolveConfig(cfg *Config) *Config { + config := *cfg + + if config.Interval == 0 { + config.Interval = defaultInterval + } + + if config.AcceleratedInterval == 0 { + config.AcceleratedInterval = defaultAcceleratedInterval + } + + if config.MinActivityAge == 0 { + config.MinActivityAge = defaultMinActivityAge + } + + if config.MaxActivitiesToSync == 0 { + config.MaxActivitiesToSync = defaultMaxActivitiesToSync + } + + return &config +} + type progressLogger struct { numProcessedInPage int } diff --git a/pkg/activitypub/service/anchorsynctask/activitysynctask_test.go b/pkg/activitypub/service/anchorsynctask/activitysynctask_test.go index a99f72337..afd52a90f 100644 --- a/pkg/activitypub/service/anchorsynctask/activitysynctask_test.go +++ b/pkg/activitypub/service/anchorsynctask/activitysynctask_test.go @@ -102,12 +102,19 @@ func TestRun(t *testing.T) { WithActivities(activities) t.Run("Success", func(t *testing.T) { + cfg := resolveConfig(&Config{ + ServiceIRI: serviceIRI, + MinActivityAge: time.Second, + MaxActivitiesToSync: 2, + AcceleratedInterval: time.Millisecond, + }) + handler := &mockHandler{} handler.duplicateAnchors = append(handler.duplicateAnchors, announceActivities[1], createActivities[1]) task, err := newTask( - serviceIRI, apClient, apStore, storage.NewMockStoreProvider(), time.Second, + cfg, apClient, apStore, storage.NewMockStoreProvider(), func() spi.InboxHandler { return handler }, @@ -122,12 +129,19 @@ func TestRun(t *testing.T) { time.Sleep(time.Second) + // Split into two runs with MaxActivitiesToSync=2. + task.run() task.run() require.Equal(t, 3, len(handler.activities)) }) t.Run("QueryReferences error", func(t *testing.T) { + cfg := resolveConfig(&Config{ + ServiceIRI: serviceIRI, + MinActivityAge: time.Nanosecond, + }) + errExpected := errors.New("injected query error") s := &mocks.ActivityStore{} @@ -136,7 +150,7 @@ func TestRun(t *testing.T) { handler := &mockHandler{} task, err := newTask( - serviceIRI, apClient, s, storage.NewMockStoreProvider(), time.Nanosecond, + cfg, apClient, s, storage.NewMockStoreProvider(), func() spi.InboxHandler { return handler }, @@ -150,6 +164,11 @@ func TestRun(t *testing.T) { }) t.Run("ReferenceIterator error", func(t *testing.T) { + cfg := resolveConfig(&Config{ + ServiceIRI: serviceIRI, + MinActivityAge: time.Nanosecond, + }) + errExpected := errors.New("injected iterator error") it := &mocks2.ReferenceIterator{} @@ -161,7 +180,7 @@ func TestRun(t *testing.T) { handler := &mockHandler{} task, err := newTask( - serviceIRI, apClient, s, storage.NewMockStoreProvider(), time.Nanosecond, + cfg, apClient, s, storage.NewMockStoreProvider(), func() spi.InboxHandler { return handler }, @@ -175,6 +194,11 @@ func TestRun(t *testing.T) { }) t.Run("GetActor error", func(t *testing.T) { + cfg := resolveConfig(&Config{ + ServiceIRI: serviceIRI, + MinActivityAge: time.Nanosecond, + }) + errExpected := errors.New("injected client error") apClient := mocks.NewActivitPubClient().WithError(errExpected) @@ -182,7 +206,7 @@ func TestRun(t *testing.T) { handler := &mockHandler{} task, err := newTask( - serviceIRI, apClient, apStore, storage.NewMockStoreProvider(), time.Nanosecond, + cfg, apClient, apStore, storage.NewMockStoreProvider(), func() spi.InboxHandler { return handler }, diff --git a/pkg/activitypub/service/mocks/taskmgr.go b/pkg/activitypub/service/mocks/taskmgr.go index 2a15adfa2..24eaccb62 100644 --- a/pkg/activitypub/service/mocks/taskmgr.go +++ b/pkg/activitypub/service/mocks/taskmgr.go @@ -77,7 +77,7 @@ func (m *TaskManager) stop() { type task struct { interval time.Duration - run func() + run func() time.Duration lastRunTime time.Time } @@ -95,7 +95,16 @@ func (m *TaskManager) InstanceID() string { } // RegisterTask registers the given task to be run at the given interval. -func (m *TaskManager) RegisterTask(_ string, interval time.Duration, run func()) { +func (m *TaskManager) RegisterTask(id string, interval time.Duration, run func()) { + m.RegisterTaskEx(id, interval, func() time.Duration { + run() + + return 0 + }) +} + +// RegisterTaskEx registers the given task to be run at the given interval. +func (m *TaskManager) RegisterTaskEx(_ string, interval time.Duration, run func() time.Duration) { m.mutex.Lock() defer m.mutex.Unlock() diff --git a/pkg/taskmgr/taskmgr.go b/pkg/taskmgr/taskmgr.go index fbdc739d9..6121834e2 100644 --- a/pkg/taskmgr/taskmgr.go +++ b/pkg/taskmgr/taskmgr.go @@ -11,7 +11,6 @@ import ( "errors" "fmt" "sync" - "sync/atomic" "time" "github.com/google/uuid" @@ -103,14 +102,26 @@ func (s *Manager) InstanceID() string { } // RegisterTask registers a task to be periodically run at the given interval. -func (s *Manager) RegisterTask(id string, interval time.Duration, task func()) { +func (s *Manager) RegisterTask(id string, interval time.Duration, run func()) { + s.RegisterTaskEx(id, interval, func() time.Duration { + run() + + return 0 + }) +} + +// RegisterTaskEx registers a task to be periodically run at the given interval. +// The task returns an override of the default interval. For example, if 5s is returned then the task +// will run again in 5 seconds. If 0 is returned then the task will run at the default interval. +func (s *Manager) RegisterTaskEx(id string, defaultInterval time.Duration, task func() time.Duration) { s.mutex.Lock() defer s.mutex.Unlock() s.tasks[id] = ®istration{ - handle: task, - id: id, - interval: interval, + handle: task, + id: id, + defaultInterval: defaultInterval, + nextInterval: defaultInterval, } } @@ -153,7 +164,7 @@ func (s *Manager) stop() { } func (s *Manager) run(t *registration) error { - if t.isRunning() { + if t.Running() { s.logger.Debug("Task is still running. Updating timestamp in the permit to tell others that I'm still alive.", logfields.WithTaskID(t.id)) @@ -228,10 +239,10 @@ func (s *Manager) shouldRun(t *registration) (bool, error) { timeSinceLastUpdate := time.Since(timeOfLastUpdate).Truncate(time.Second) if currentPermit.CurrentHolder == s.instanceID { - if timeSinceLastUpdate < t.interval { + if timeSinceLastUpdate < t.NextInterval() { s.logger.Debug("It's currently my duty to run this task but it's not time for it to run.", logfields.WithTaskID(t.id), logfields.WithTimeSinceLastUpdate(timeSinceLastUpdate), - logfields.WithTaskMonitorInterval(t.interval)) + logfields.WithTaskMonitorInterval(t.NextInterval())) return false, nil } @@ -249,7 +260,7 @@ func (s *Manager) shouldRun(t *registration) (bool, error) { // within the cluster have the same interval setting (which they should). // So, "unusually long time" means that the 'last update' time is greater than the Task Manager check interval plus // the task's run interval, in which case we'll assume that the other instance is dead and will take over. - maxTime := s.interval + t.interval + maxTime := s.interval + t.DefaultInterval() if timeSinceLastUpdate > maxTime { s.logger.Info("The current permit holder for this task has not updated the permit in an "+ @@ -299,23 +310,48 @@ func getPermitKey(taskID string) string { } type registration struct { - handle func() - running uint32 - id string - interval time.Duration + handle func() time.Duration + running bool + id string + defaultInterval time.Duration + nextInterval time.Duration + mutex sync.RWMutex } func (r *registration) run() { - if !atomic.CompareAndSwapUint32(&r.running, 0, 1) { + if r.Running() { // Already running. return } - r.handle() + nextInterval := r.handle() + + if nextInterval == 0 { + nextInterval = r.defaultInterval + } + + r.mutex.Lock() + + r.running = false + r.nextInterval = nextInterval - atomic.StoreUint32(&r.running, 0) + r.mutex.Unlock() } -func (r *registration) isRunning() bool { - return atomic.LoadUint32(&r.running) == 1 +func (r *registration) Running() bool { + r.mutex.RLock() + defer r.mutex.RUnlock() + + return r.running +} + +func (r *registration) DefaultInterval() time.Duration { + return r.defaultInterval +} + +func (r *registration) NextInterval() time.Duration { + r.mutex.RLock() + defer r.mutex.RUnlock() + + return r.nextInterval } diff --git a/pkg/taskmgr/taskmgr_test.go b/pkg/taskmgr/taskmgr_test.go index e8dda48f8..c1fa24b4a 100644 --- a/pkg/taskmgr/taskmgr_test.go +++ b/pkg/taskmgr/taskmgr_test.go @@ -67,9 +67,9 @@ func TestService(t *testing.T) { taskMgr := New(coordinationStore, time.Millisecond) err := taskMgr.run(®istration{ - handle: func() {}, - id: "test-task", - interval: time.Millisecond, + handle: func() time.Duration { return 0 }, + id: "test-task", + defaultInterval: time.Millisecond, }) require.Error(t, err) require.Contains(t, err.Error(), "get permit from DB for task [test-task]: get error") @@ -83,9 +83,9 @@ func TestService(t *testing.T) { taskMgr := New(coordinationStore, time.Millisecond) err := taskMgr.run(®istration{ - handle: func() {}, - id: "test-task", - interval: time.Millisecond, + handle: func() time.Duration { return 0 }, + id: "test-task", + defaultInterval: time.Millisecond, }) require.Error(t, err) require.Contains(t, err.Error(), diff --git a/test/bdd/fixtures/docker-compose.yml b/test/bdd/fixtures/docker-compose.yml index 4b05838fc..d48582bec 100644 --- a/test/bdd/fixtures/docker-compose.yml +++ b/test/bdd/fixtures/docker-compose.yml @@ -15,7 +15,7 @@ services: - ORB_SYNC_TIMEOUT=3 - ORB_KMS_TYPE=web - ORB_KMS_ENDPOINT=http://orb.kms:7878 - - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:DEBUG + - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:INFO - VCT_ENABLED=true - ORB_HOST_URL=172.20.0.23:443 - ORB_METRICS_PROVIDER_NAME=prometheus @@ -119,10 +119,18 @@ services: # we're following. # Default value: 1m. - ANCHOR_EVENT_SYNC_INTERVAL=1m + # ANCHOR_EVENT_SYNC_MAX_ACTIVITIES is the maximum number of activities to be synchronized in a single task run. + # Default value: 500. + - ANCHOR_EVENT_SYNC_MAX_ACTIVITIES=5 + # ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL is the interval in which to run the anchor sync task after the maximum number of + # activities (specified by ANCHOR_EVENT_SYNC_MAX_ACTIVITIES) have been processed for the current task run. This + # should be smaller than the default interval in order to accelerate processing. + # Default value: 15s. + - ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL=15s # ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE is the minimum age of an activity to be synchronized. The activity will be processed # only if its age is greater than this value. - # Default value: 1m - - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=1m + # Default value: 10m + - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=20s # VCT_MONITORING_INTERVAL is the interval (period) in which proofs are monitored from various VCTs that promised # to anchor a VC by a certain time. # Default value: 10s. @@ -171,7 +179,7 @@ services: - ORB_SYNC_TIMEOUT=3 - ORB_KMS_TYPE=web - ORB_KMS_ENDPOINT=http://orb.kms:7878 - - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:DEBUG + - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:INFO - VCT_ENABLED=true - ORB_HOST_URL=172.20.0.2:443 - ORB_METRICS_PROVIDER_NAME=prometheus @@ -262,10 +270,18 @@ services: # we're following. # Default value: 1m. - ANCHOR_EVENT_SYNC_INTERVAL=1m + # ANCHOR_EVENT_SYNC_MAX_ACTIVITIES is the maximum number of activities to be synchronized in a single task run. + # Default value: 500. + - ANCHOR_EVENT_SYNC_MAX_ACTIVITIES=5 + # ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL is the interval in which to run the anchor sync task after the maximum number of + # activities (specified by ANCHOR_EVENT_SYNC_MAX_ACTIVITIES) have been processed for the current task run. This + # should be smaller than the default interval in order to accelerate processing. + # Default value: 15s. + - ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL=15s # ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE is the minimum age of an activity to be synchronized. The activity will be processed # only if its age is greater than this value. - # Default value: 1m - - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=1m + # Default value: 10m + - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=20s # VCT_MONITORING_INTERVAL is the interval (period) in which proofs are monitored from various VCTs that promised # to anchor a VC by a certain time. # Default value: 10s. @@ -327,7 +343,7 @@ services: - ORB_SYNC_TIMEOUT=3 - ORB_KMS_TYPE=web - ORB_KMS_ENDPOINT=http://orb.kms:7878 - - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:DEBUG + - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:INFO - ORB_HOST_URL=172.20.0.4:80 - ORB_METRICS_PROVIDER_NAME=prometheus - ORB_PROM_HTTP_URL=172.20.0.4:48827 @@ -405,10 +421,18 @@ services: # we're following. # Default value: 1m. - ANCHOR_EVENT_SYNC_INTERVAL=1m + # ANCHOR_EVENT_SYNC_MAX_ACTIVITIES is the maximum number of activities to be synchronized in a single task run. + # Default value: 500. + - ANCHOR_EVENT_SYNC_MAX_ACTIVITIES=5 + # ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL is the interval in which to run the anchor sync task after the maximum number of + # activities (specified by ANCHOR_EVENT_SYNC_MAX_ACTIVITIES) have been processed for the current task run. This + # should be smaller than the default interval in order to accelerate processing. + # Default value: 15s. + - ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL=15s # ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE is the minimum age of an activity to be synchronized. The activity will be processed # only if its age is greater than this value. - # Default value: 1m - - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=1m + # Default value: 10m + - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=20s - ANCHOR_DATA_URI_MEDIA_TYPE=application/json - ORB_REQUEST_TOKENS=vct-read=vctread,vct-write=vctwrite ports: @@ -435,7 +459,7 @@ services: - ORB_SYNC_TIMEOUT=3 - ORB_KMS_TYPE=web - ORB_KMS_ENDPOINT=http://orb.kms:7878 - - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:DEBUG + - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:INFO - ORB_HOST_URL=172.20.0.5:80 - ORB_METRICS_PROVIDER_NAME=prometheus - ORB_PROM_HTTP_URL=172.20.0.5:48927 @@ -513,10 +537,18 @@ services: # we're following. # Default value: 1m. - ANCHOR_EVENT_SYNC_INTERVAL=1m + # ANCHOR_EVENT_SYNC_MAX_ACTIVITIES is the maximum number of activities to be synchronized in a single task run. + # Default value: 500. + - ANCHOR_EVENT_SYNC_MAX_ACTIVITIES=5 + # ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL is the interval in which to run the anchor sync task after the maximum number of + # activities (specified by ANCHOR_EVENT_SYNC_MAX_ACTIVITIES) have been processed for the current task run. This + # should be smaller than the default interval in order to accelerate processing. + # Default value: 15s. + - ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL=15s # ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE is the minimum age of an activity to be synchronized. The activity will be processed # only if its age is greater than this value. - # Default value: 1m - - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=1m + # Default value: 10m + - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=20s - ANCHOR_DATA_URI_MEDIA_TYPE=application/json - ORB_REQUEST_TOKENS=vct-read=vctread,vct-write=vctwrite ports: @@ -543,7 +575,7 @@ services: - ORB_SYNC_TIMEOUT=3 - ORB_KMS_TYPE=web - ORB_KMS_ENDPOINT=http://orb.kms:7878 - - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:DEBUG + - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:INFO - VCT_ENABLED=true - ORB_HOST_URL=172.20.0.6:443 - ORB_METRICS_PROVIDER_NAME=prometheus @@ -632,10 +664,18 @@ services: # we're following. # Default value: 1m. - ANCHOR_EVENT_SYNC_INTERVAL=1m + # ANCHOR_EVENT_SYNC_MAX_ACTIVITIES is the maximum number of activities to be synchronized in a single task run. + # Default value: 500. + - ANCHOR_EVENT_SYNC_MAX_ACTIVITIES=5 + # ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL is the interval in which to run the anchor sync task after the maximum number of + # activities (specified by ANCHOR_EVENT_SYNC_MAX_ACTIVITIES) have been processed for the current task run. This + # should be smaller than the default interval in order to accelerate processing. + # Default value: 15s. + - ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL=15s # ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE is the minimum age of an activity to be synchronized. The activity will be processed # only if its age is greater than this value. - # Default value: 1m - - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=1m + # Default value: 10m + - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=20s - ORB_REQUEST_TOKENS=vct-read=vctread,vct-write=vctwrite - VCT_LOG_ENTRIES_STORE_ENABLED=true ports: @@ -667,7 +707,7 @@ services: - ORB_HTTP_SIGN_ACTIVE_KEY_ID=aws-kms://arn:aws:kms:ca-central-1:111122223333:alias/http-sign - AWS_ACCESS_KEY_ID=mock - AWS_SECRET_ACCESS_KEY=mock - - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:DEBUG + - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:INFO - ORB_HOST_URL=172.20.0.7:443 - ORB_METRICS_PROVIDER_NAME=prometheus - ORB_PROM_HTTP_URL=172.20.0.7:48727 @@ -756,10 +796,18 @@ services: # we're following. # Default value: 1m. - ANCHOR_EVENT_SYNC_INTERVAL=1m + # ANCHOR_EVENT_SYNC_MAX_ACTIVITIES is the maximum number of activities to be synchronized in a single task run. + # Default value: 500. + - ANCHOR_EVENT_SYNC_MAX_ACTIVITIES=5 + # ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL is the interval in which to run the anchor sync task after the maximum number of + # activities (specified by ANCHOR_EVENT_SYNC_MAX_ACTIVITIES) have been processed for the current task run. This + # should be smaller than the default interval in order to accelerate processing. + # Default value: 15s. + - ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL=15s # ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE is the minimum age of an activity to be synchronized. The activity will be processed # only if its age is greater than this value. - # Default value: 1m - - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=1m + # Default value: 10m + - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=20s - ORB_REQUEST_TOKENS=vct-read=vctread,vct-write=vctwrite ports: - 48726:443 @@ -785,7 +833,7 @@ services: - ORB_SYNC_TIMEOUT=3 - ORB_KMS_TYPE=web - ORB_KMS_ENDPOINT=http://orb5.kms:7878 - - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:DEBUG + - LOG_LEVEL=metrics=INFO:nodeinfo=WARNING:activitypub_store=INFO:expiry-service=INFO:task-manager=INFO:watermill=INFO:INFO - ORB_HOST_URL=172.20.0.31:443 - ORB_METRICS_PROVIDER_NAME=prometheus - ORB_PROM_HTTP_URL=172.20.0.31:48727 @@ -881,10 +929,18 @@ services: # ANCHOR_EVENT_SYNC_INTERVAL is the interval in which anchor events are synchronized with other services that # we're following. # Default value: 1m. - - ANCHOR_EVENT_SYNC_INTERVAL=10s + - ANCHOR_EVENT_SYNC_INTERVAL=20s + # ANCHOR_EVENT_SYNC_MAX_ACTIVITIES is the maximum number of activities to be synchronized in a single task run. + # Default value: 500. + - ANCHOR_EVENT_SYNC_MAX_ACTIVITIES=5 + # ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL is the interval in which to run the anchor sync task after the maximum number of + # activities (specified by ANCHOR_EVENT_SYNC_MAX_ACTIVITIES) have been processed for the current task run. This + # should be smaller than the default interval in order to accelerate processing. + # Default value: 15s. + - ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL=3s # ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE is the minimum age of an activity to be synchronized. The activity will be processed # only if its age is greater than this value. - # Default value: 1m + # Default value: 10m - ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE=20s # WITNESS_POLICY_CACHE_EXPIRATION sets the expiration time of witness policy cache. # Default value: 30s. Set the cache expiration very low since we're updating the policy frequently during the test. @@ -1002,7 +1058,7 @@ services: - KMS_DATABASE_PREFIX=keystore_ - KMS_SECRET_LOCK_TYPE=local - KMS_SECRET_LOCK_KEY_PATH=/etc/kms/secret-lock.key - - KMS_LOG_LEVEL=debug + - KMS_LOG_LEVEL=error ports: - 7878:7878 volumes: @@ -1026,7 +1082,7 @@ services: - KMS_DATABASE_PREFIX=keystore_ - KMS_SECRET_LOCK_TYPE=local - KMS_SECRET_LOCK_KEY_PATH=/etc/kms/secret-lock.key - - KMS_LOG_LEVEL=debug + - KMS_LOG_LEVEL=error ports: - 7978:7878 volumes: