Skip to content

Commit

Permalink
Merge pull request #1559 from bstasyszyn/1550
Browse files Browse the repository at this point in the history
feat: Limit number of anchors to sync per task run
  • Loading branch information
fqutishat authored May 10, 2023
2 parents f77ef5f + 25418ee commit cfe12d7
Show file tree
Hide file tree
Showing 10 changed files with 677 additions and 405 deletions.
76 changes: 55 additions & 21 deletions cmd/orb-server/startcmd/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ const (
defaultTaskMgrCheckInterval = 10 * time.Second
defaultDataExpiryCheckInterval = time.Minute
defaultAnchorSyncInterval = time.Minute
defaultAnchorSyncMinActivityAge = time.Minute
defaultAnchorSyncAcceleratedInterval = 15 * time.Second
defaultAnchorSyncMinActivityAge = 10 * time.Minute
defaultAnchorSyncMaxActivities = 500
defaultVCTProofMonitoringInterval = 10 * time.Second
defaultVCTLogMonitoringInterval = 10 * time.Second
defaultVCTLogMonitoringMaxTreeSize = 50000
Expand Down Expand Up @@ -632,10 +634,22 @@ const (
"this service is following. Defaults to 1m if not set. " +
commonEnvVarUsageText + anchorSyncIntervalEnvKey

anchorSyncMaxActivitiesFlagName = "sync-max-activities"
anchorSyncMaxActivitiesEnvKey = "ANCHOR_EVENT_SYNC_MAX_ACTIVITIES"
anchorSyncMaxActivitiesFlagUsage = "The maximum number of activities to be synchronized in a single task run. Defaults to 500 if not set. " +
commonEnvVarUsageText + anchorSyncMaxActivitiesEnvKey

anchorSyncAcceleratedIntervalFlagName = "sync-accelerated-interval"
anchorSyncAcceleratedIntervalEnvKey = "ANCHOR_EVENT_SYNC_ACCELERATED_INTERVAL"
anchorSyncNextIntervalFlagUsage = "The interval in which to run the activity sync task after the maximum number of activities " +
"(specified by sync-max-activities) have been processed for the current task run. This should be smaller than the default interval " +
"in order to accelerate processing. Defaults to 15s if not set. " +
commonEnvVarUsageText + anchorSyncAcceleratedIntervalEnvKey

anchorSyncMinActivityAgeFlagName = "sync-min-activity-age"
anchorSyncMinActivityAgeEnvKey = "ANCHOR_EVENT_SYNC_MIN_ACTIVITY_AGE"
anchorSyncMinActivityAgeFlagUsage = "The minimum age of an activity to be synchronized. The activity will be " +
"processed only if its age is greater than this value. Defaults to 1m if not set. " +
"processed only if its age is greater than this value. Defaults to 10m if not set. " +
commonEnvVarUsageText + anchorSyncMinActivityAgeEnvKey

activityPubClientCacheSizeFlagName = "apclient-cache-size"
Expand Down Expand Up @@ -1584,13 +1598,15 @@ func getVCTParams(cmd *cobra.Command) (*vctParams, error) {
}

type activityPubParams struct {
pageSize int
anchorSyncPeriod time.Duration
anchorSyncMinActivityAge time.Duration
clientCacheSize int
clientCacheExpiration time.Duration
iriCacheSize int
iriCacheExpiration time.Duration
pageSize int
anchorSyncPeriod time.Duration
anchorSyncAcceleratedPeriod time.Duration
anchorSyncMinActivityAge time.Duration
anchorSyncMaxActivities int
clientCacheSize int
clientCacheExpiration time.Duration
iriCacheSize int
iriCacheExpiration time.Duration
}

func getActivityPubParams(cmd *cobra.Command) (*activityPubParams, error) {
Expand All @@ -1599,7 +1615,7 @@ func getActivityPubParams(cmd *cobra.Command) (*activityPubParams, error) {
return nil, fmt.Errorf("%s: %w", activityPubPageSizeFlagName, err)
}

syncPeriod, minActivityAge, err := getAnchorSyncParameters(cmd)
syncPeriod, acceleratedSyncPeriod, minActivityAge, maxActivities, err := getAnchorSyncParameters(cmd)
if err != nil {
return nil, err
}
Expand All @@ -1615,13 +1631,15 @@ func getActivityPubParams(cmd *cobra.Command) (*activityPubParams, error) {
}

return &activityPubParams{
pageSize: activityPubPageSize,
anchorSyncPeriod: syncPeriod,
anchorSyncMinActivityAge: minActivityAge,
clientCacheSize: apClientCacheSize,
clientCacheExpiration: apClientCacheExpiration,
iriCacheSize: apIRICacheSize,
iriCacheExpiration: apIRICacheExpiration,
pageSize: activityPubPageSize,
anchorSyncPeriod: syncPeriod,
anchorSyncAcceleratedPeriod: acceleratedSyncPeriod,
anchorSyncMinActivityAge: minActivityAge,
anchorSyncMaxActivities: maxActivities,
clientCacheSize: apClientCacheSize,
clientCacheExpiration: apClientCacheExpiration,
iriCacheSize: apIRICacheSize,
iriCacheExpiration: apIRICacheExpiration,
}, nil
}

Expand Down Expand Up @@ -2182,19 +2200,33 @@ func getActivityPubCacheParameters(cmd *cobra.Command, params *cacheParams) (int
return cacheSize, cacheExpiration, nil
}

func getAnchorSyncParameters(cmd *cobra.Command) (syncPeriod, minActivityAge time.Duration, err error) {
func getAnchorSyncParameters(cmd *cobra.Command) (syncPeriod, acceleratedSyncPeriod,
minActivityAge time.Duration, maxActivities int, err error,
) {
syncPeriod, err = cmdutil.GetDuration(cmd, anchorSyncIntervalFlagName, anchorSyncIntervalEnvKey, defaultAnchorSyncInterval)
if err != nil {
return 0, 0, fmt.Errorf("%s: %w", anchorSyncIntervalFlagName, err)
return 0, 0, 0, 0, fmt.Errorf("%s: %w", anchorSyncIntervalFlagName, err)
}

acceleratedSyncPeriod, err = cmdutil.GetDuration(cmd, anchorSyncAcceleratedIntervalFlagName, anchorSyncAcceleratedIntervalEnvKey,
defaultAnchorSyncAcceleratedInterval)
if err != nil {
return 0, 0, 0, 0, fmt.Errorf("%s: %w", anchorSyncIntervalFlagName, err)
}

minActivityAge, err = cmdutil.GetDuration(cmd, anchorSyncMinActivityAgeFlagName, anchorSyncMinActivityAgeEnvKey,
defaultAnchorSyncMinActivityAge)
if err != nil {
return 0, 0, fmt.Errorf("%s: %w", anchorSyncMinActivityAgeFlagName, err)
return 0, 0, 0, 0, fmt.Errorf("%s: %w", anchorSyncMinActivityAgeFlagName, err)
}

maxActivities, err = cmdutil.GetInt(cmd, anchorSyncMaxActivitiesFlagName, anchorSyncMaxActivitiesEnvKey,
defaultAnchorSyncMaxActivities)
if err != nil {
return 0, 0, 0, 0, fmt.Errorf("%s: %w", anchorSyncMinActivityAgeFlagName, err)
}

return syncPeriod, minActivityAge, nil
return syncPeriod, acceleratedSyncPeriod, minActivityAge, maxActivities, nil
}

func newAPServiceParams(apServiceID, externalEndpoint string,
Expand Down Expand Up @@ -2377,6 +2409,8 @@ func createFlags(startCmd *cobra.Command) {
startCmd.Flags().StringP(httpTimeoutFlagName, "", "", httpTimeoutFlagUsage)
startCmd.Flags().StringP(httpDialTimeoutFlagName, "", "", httpDialTimeoutFlagUsage)
startCmd.Flags().StringP(anchorSyncIntervalFlagName, anchorSyncIntervalFlagShorthand, "", anchorSyncIntervalFlagUsage)
startCmd.Flags().StringP(anchorSyncAcceleratedIntervalFlagName, "", "", anchorSyncNextIntervalFlagUsage)
startCmd.Flags().StringP(anchorSyncMaxActivitiesFlagName, "", "", anchorSyncMaxActivitiesFlagUsage)
startCmd.Flags().StringP(anchorSyncMinActivityAgeFlagName, "", "", anchorSyncMinActivityAgeFlagUsage)
startCmd.Flags().StringP(vctProofMonitoringIntervalFlagName, "", "", vctProofMonitoringIntervalFlagUsage)
startCmd.Flags().StringP(vctProofMonitoringExpiryPeriodFlagName, "", "", vctProofMonitoringExpiryPeriodFlagUsage)
Expand Down
8 changes: 5 additions & 3 deletions cmd/orb-server/startcmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,9 +873,11 @@ func startOrbServices(parameters *orbParameters) error {

err = anchorsynctask.Register(
anchorsynctask.Config{
ServiceIRI: parameters.apServiceParams.serviceIRI(),
Interval: parameters.activityPub.anchorSyncPeriod,
MinActivityAge: parameters.activityPub.anchorSyncMinActivityAge,
ServiceIRI: parameters.apServiceParams.serviceIRI(),
Interval: parameters.activityPub.anchorSyncPeriod,
AcceleratedInterval: parameters.activityPub.anchorSyncAcceleratedPeriod,
MinActivityAge: parameters.activityPub.anchorSyncMinActivityAge,
MaxActivitiesToSync: parameters.activityPub.anchorSyncMaxActivities,
},
taskMgr, apClient, apStore, storeProviders.provider,
func() apspi.InboxHandler {
Expand Down
Loading

0 comments on commit cfe12d7

Please sign in to comment.