Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Timeout mechanism for values #1812

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions packages/server-admin-ui/src/views/ServerConfig/Settings.js
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,26 @@ class ServerSettings extends Component {
</FormText>
</Col>
</FormGroup>
<FormGroup row>
<Col md="2">
<Label htmlFor="defaultTimeout">
Default timeout for data (seconds)
</Label>
</Col>
<Col xs="12" md={fieldColWidthMd}>
<Input
type="text"
name="defaultTimeout"
onChange={this.handleChange}
value={this.state.defaultTimeout}
/>
<FormText color="muted">
Unless overridden in metadata for the path data older than
this will be automatically set to null to clear outdated
values. Zero value disables default timeout mechanism.
</FormText>
</Col>
</FormGroup>
<FormGroup row>
<Col md="2">
<Label htmlFor="loggingDirectory">
Expand Down
1 change: 1 addition & 0 deletions src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ export interface Config {
proxy_port?: number
hostname?: string
pruneContextsMinutes?: number
defaultTimeout?: number
mdns?: boolean
sslport?: number
port?: number
Expand Down
59 changes: 55 additions & 4 deletions src/deltacache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,28 @@ import { FullSignalK, getSourceId } from '@signalk/signalk-schema'
import _, { isUndefined } from 'lodash'
import { toDelta } from './streambundle'
import { ContextMatcher, SignalKServer, StreamBundle } from './types'
import { Context, NormalizedDelta, SourceRef } from '@signalk/server-api'
import { Context, NormalizedDelta, Path, SourceRef } from '@signalk/server-api'

interface StringKeyed {
[key: string]: any
}

function sendNullsForOutdatedValues(
x: any,
oldestValidTs: number,
sendNullFor: (context: Context, path: Path, $source: SourceRef) => void
) {
Object.values(x).forEach((v: any) => {
if (v.timestamp) {
if (v.timestamp > oldestValidTs && v.value !== null) {
sendNullFor(v.context, v.path as Path, v.$source as SourceRef)
}
} else {
sendNullsForOutdatedValues(v, oldestValidTs, sendNullFor)
}
})
}

export default class DeltaCache {
cache: StringKeyed = {}
lastModifieds: StringKeyed = {}
Expand All @@ -39,12 +55,44 @@ export default class DeltaCache {
}
} = {}

constructor(app: SignalKServer, streambundle: StreamBundle) {
constructor(
app: SignalKServer,
streambundle: StreamBundle,
defaultTimeout: number
) {
this.app = app
streambundle.keys.onValue((key) => {
streambundle.getBus(key).onValue(this.onValue.bind(this))
})

const sendNullFor = (context: Context, path: Path, $source: SourceRef) => {
app.handleMessage('n/a', {
context,
updates: [
{
$source,
values: [
{
path,
value: null
}
]
}
]
})
}

if (defaultTimeout) {
debug(`defaultTimeout set as ${defaultTimeout}, starting background task`)
setInterval(() => {
sendNullsForOutdatedValues(
this.cache,
Date.now() - defaultTimeout * 1000,
sendNullFor
)
}, defaultTimeout * 1000)
}

// String.split() is heavy enough and called frequently enough
// to warrant caching the result. Has a noticeable effect
// on throughput of a server going full blast with the n2k
Expand Down Expand Up @@ -89,14 +137,17 @@ export default class DeltaCache {
true
)

const msgCopy = Object.assign({}, msg) as any
msgCopy.timestamp = Date.now()

if (msg.path.length !== 0) {
leaf[sourceRef] = msg
leaf[sourceRef] = msgCopy
} else if (msg.value) {
_.keys(msg.value).forEach((key) => {
if (!leaf[key]) {
leaf[key] = {}
}
leaf[key][sourceRef] = msg
leaf[key][sourceRef] = msgCopy
})
}
this.lastModifieds[msg.context] = Date.now()
Expand Down
6 changes: 5 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,11 @@ class Server {
)
app.signalk.on('delta', app.streambundle.pushDelta.bind(app.streambundle))
app.subscriptionmanager = new SubscriptionManager(app)
app.deltaCache = new DeltaCache(app, app.streambundle)
app.deltaCache = new DeltaCache(
app,
app.streambundle,
app.config.settings.defaultTimeout
)

app.getHello = () => ({
name: app.config.name,
Expand Down
5 changes: 5 additions & 0 deletions src/serverroutes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ module.exports = function (
},
loggingDirectory: app.config.settings.loggingDirectory,
pruneContextsMinutes: app.config.settings.pruneContextsMinutes || 60,
defaultTimeout: app.config.settings.defaultTimeout || 0,
keepMostRecentLogsOnly:
isUndefined(app.config.settings.keepMostRecentLogsOnly) ||
app.config.settings.keepMostRecentLogsOnly,
Expand Down Expand Up @@ -612,6 +613,10 @@ module.exports = function (
)
}

if (!isUndefined(settings.defaultTimeout)) {
app.config.settings.defaultTimeout = Number(settings.defaultTimeout)
}

if (!isUndefined(settings.keepMostRecentLogsOnly)) {
app.config.settings.keepMostRecentLogsOnly =
settings.keepMostRecentLogsOnly
Expand Down
Loading