usage-based reporting & billing v2 #1123

Merged (4 commits) on Jul 25, 2023
@@ -108,7 +108,9 @@ Sources {
   "description": "Logs related to the processing of a Flow capture, derivation, or materialization",
   "type": "object",
   "properties": {
-    "shard": { "$ref": "ops-shard-schema.json" },
+    "shard": {
+      "$ref": "ops-shard-schema.json"
+    },
     "ts": {
       "description": "Timestamp corresponding to the start of the transaction",
       "type": "string",
@@ -189,7 +191,9 @@ Sources {
   "description": "Statistics related to the processing of a Flow capture, derivation, or materialization",
   "type": "object",
   "properties": {
-    "shard": { "$ref": "ops-shard-schema.json" },
+    "shard": {
+      "$ref": "ops-shard-schema.json"
+    },
     "ts": {
       "description": "Timestamp corresponding to the start of the transaction",
       "type": "string",
@@ -198,12 +202,16 @@ Sources {
     "openSecondsTotal": {
       "description": "Total time that the transaction was open before starting to commit",
       "type": "number",
-      "reduce": { "strategy": "sum" }
+      "reduce": {
+        "strategy": "sum"
+      }
     },
     "txnCount": {
       "description": "Total number of transactions represented by this stats document",
       "type": "integer",
-      "reduce": { "strategy": "sum" }
+      "reduce": {
+        "strategy": "sum"
+      }
     },
     "capture": {
       "description": "Capture stats, organized by collection. The keys of this object are the collection names, and the values are the stats for that collection.",
@@ -219,23 +227,37 @@ Sources {
             "$ref": "#/$defs/docsAndBytes"
           }
         },
-        "reduce": { "strategy": "merge" }
+        "reduce": {
+          "strategy": "merge"
+        }
       },
-      "reduce": { "strategy": "merge" }
+      "reduce": {
+        "strategy": "merge"
+      }
     },
     "materialize": {
       "description": "A map of each binding source (collection name) to combiner stats for that binding",
       "type": "object",
       "additionalProperties": {
         "type": "object",
         "properties": {
-          "left": { "$ref": "#/$defs/docsAndBytes" },
-          "right": { "$ref": "#/$defs/docsAndBytes" },
-          "out": { "$ref": "#/$defs/docsAndBytes" }
+          "left": {
+            "$ref": "#/$defs/docsAndBytes"
+          },
+          "right": {
+            "$ref": "#/$defs/docsAndBytes"
+          },
+          "out": {
+            "$ref": "#/$defs/docsAndBytes"
+          }
         },
-        "reduce": { "strategy": "merge" }
+        "reduce": {
+          "strategy": "merge"
+        }
       },
-      "reduce": { "strategy": "merge" }
+      "reduce": {
+        "strategy": "merge"
+      }
     },
     "derive": {
       "type": "object",
@@ -246,20 +268,73 @@ Sources {
           "additionalProperties": {
             "$ref": "#/$defs/transformStats"
           },
-          "reduce": { "strategy": "merge" }
+          "reduce": {
+            "strategy": "merge"
+          }
         },
-        "published": { "$ref": "#/$defs/docsAndBytes" },
-        "out": { "$ref": "#/$defs/docsAndBytes" }
+        "published": {
+          "$ref": "#/$defs/docsAndBytes"
+        },
+        "out": {
+          "$ref": "#/$defs/docsAndBytes"
+        }
       },
-      "reduce": { "strategy": "merge" }
+      "reduce": {
+        "strategy": "merge"
+      }
     },
+    "interval": {
+      "type": "object",
+      "properties": {
+        "uptimeSeconds": {
+          "description": "Number of seconds that the task shard is metered as having been running",
+          "type": "integer",
+          "reduce": {
+            "strategy": "sum"
+          },
+          "minimum": 1
+        },
+        "usageRate": {
+          "description": "Usage rate which adjusts `uptimeSeconds` to determine the task's effective usage",
+          "type": "number",
+          "minimum": 0,
+          "default": 0
+        }
+      },
+      "required": ["uptimeSeconds"],
+      "reduce": {
+        "strategy": "merge"
+      }
+    }
   },
-  "reduce": { "strategy": "merge" },
-  "required": ["shard", "ts", "txnCount", "openSecondsTotal"],
+  "reduce": {
+    "strategy": "merge"
+  },
+  "required": [
+    "shard",
+    "ts"
+  ],
   "oneOf": [
-    { "required": ["capture"] },
-    { "required": ["derive"] },
-    { "required": ["materialize"] }
+    {
+      "required": [
+        "capture"
+      ]
+    },
+    {
+      "required": [
+        "derive"
+      ]
+    },
+    {
+      "required": [
+        "materialize"
+      ]
+    },
+    {
+      "required": [
+        "interval"
+      ]
+    }
   ],
   "$defs": {
     "docsAndBytes": {
@@ -269,17 +344,26 @@ Sources {
         "description": "Total number of documents",
         "type": "integer",
         "default": 0,
-        "reduce": { "strategy": "sum" }
+        "reduce": {
+          "strategy": "sum"
+        }
       },
       "bytesTotal": {
         "description": "Total number of bytes representing the JSON encoded documents",
         "type": "integer",
         "default": 0,
-        "reduce": { "strategy": "sum" }
+        "reduce": {
+          "strategy": "sum"
+        }
       }
     },
-    "reduce": { "strategy": "merge" },
-    "required": ["docsTotal", "bytesTotal"]
+    "reduce": {
+      "strategy": "merge"
+    },
+    "required": [
+      "docsTotal",
+      "bytesTotal"
+    ]
   },
   "transformStats": {
     "description": "Stats for a specific transform of a derivation, which will have an update, publish, or both.",
@@ -294,8 +378,12 @@ Sources {
           "$ref": "#/$defs/docsAndBytes"
         }
       },
-      "required": ["input"],
-      "reduce": { "strategy": "merge" }
+      "required": [
+        "input"
+      ],
+      "reduce": {
+        "strategy": "merge"
+      }
     }
   }
 },
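The new `interval` section above combines across stats documents according to its reduce annotations: `uptimeSeconds` carries the `sum` strategy, while the enclosing object reduces with `merge`. Below is a minimal Rust sketch of that combination with invented names (not part of the Flow codebase), assuming, since this diff does not say, that a property without its own reduce annotation, such as `usageRate`, simply takes the newer document's value.

// Hypothetical illustration of how two `interval` stats documents combine
// under the schema's reduce annotations. All names here are invented for
// this sketch.
#[derive(Debug, Clone, Copy)]
struct IntervalStats {
    uptime_seconds: u32, // annotated "reduce": {"strategy": "sum"}
    usage_rate: f32,     // no reduce annotation of its own
}

fn reduce_intervals(prior: IntervalStats, next: IntervalStats) -> IntervalStats {
    IntervalStats {
        // "sum" strategy: metered uptime accumulates across documents.
        uptime_seconds: prior.uptime_seconds + next.uptime_seconds,
        // Assumption: under the parent's "merge" strategy, an un-annotated
        // property takes the right-hand (newer) value.
        usage_rate: next.usage_rate,
    }
}

fn main() {
    let a = IntervalStats { uptime_seconds: 300, usage_rate: 1.0 };
    let b = IntervalStats { uptime_seconds: 300, usage_rate: 1.0 };
    let merged = reduce_intervals(a, b);
    assert_eq!(merged.uptime_seconds, 600);
    println!("{:?}", merged);
}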
crates/proto-flow/src/ops.rs (22 additions & 6 deletions)
@@ -143,12 +143,7 @@ pub mod log {
 }
 /// Stats is Flow's unified representation of task metrics and statistics.
 ///
-/// TODO(johnny): We should evolve this into a consolidated message
-/// having a serde serialization corresponding to the ops/stats
-/// collection.
-///
-/// Let's make this the one true representation for stats.
-/// So far, I've just done Derive and top-level fields.
+/// Next tag: 10.
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct Stats {
@@ -185,6 +180,8 @@ pub struct Stats {
         ::prost::alloc::string::String,
         stats::Binding,
     >,
+    #[prost(message, optional, tag = "9")]
+    pub interval: ::core::option::Option<stats::Interval>,
 }
 /// Nested message and enum types in `Stats`.
 pub mod stats {
@@ -241,6 +238,25 @@ pub mod stats {
             pub input: ::core::option::Option<super::DocsAndBytes>,
         }
     }
+    /// Interval metrics are emitted at regular intervals.
+    #[allow(clippy::derive_partial_eq_without_eq)]
+    #[derive(Clone, PartialEq, ::prost::Message)]
+    pub struct Interval {
+        /// Number of seconds that the task shard is metered as having been running.
+        /// This is measured by sampling for uptime at fixed wall-clock intervals
+        /// (for example, at precisely XX:05:00, XX:10:00, XX:15:00, and so on).
+        #[prost(uint32, tag = "1")]
+        pub uptime_seconds: u32,
+        /// Usage rate adjustment which accompanies and adjusts `uptime_seconds`.
+        /// The effective number of "used" task seconds is:
+        /// round(uptime_seconds * usage_rate)
+        ///
+        /// At present, capture and materialization tasks always use a fixed value of 1.0,
+        /// while derivation tasks use a fixed value of 0.0.
+        /// The choice of `usage_rate` MAY have more criteria in the future.
+        #[prost(float, tag = "2")]
+        pub usage_rate: f32,
+    }
 }
 /// The type of a catalog task.
 #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
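As a worked example of the effective-usage rule documented on `Interval`, round(uptime_seconds * usage_rate), here is a small hypothetical Rust sketch; only the formula, the five-minute sampling cadence, and the fixed 1.0 and 0.0 rates come from the doc comments above.

// Hypothetical sketch of the effective-usage calculation documented on
// `Interval`: round(uptime_seconds * usage_rate).
fn effective_usage_seconds(uptime_seconds: u32, usage_rate: f32) -> u32 {
    (uptime_seconds as f32 * usage_rate).round() as u32
}

fn main() {
    // A capture or materialization shard metered for a full five-minute
    // sampling interval at its fixed rate of 1.0 accrues the full 300 seconds.
    assert_eq!(effective_usage_seconds(300, 1.0), 300);
    // A derivation shard over the same interval, at its fixed rate of 0.0,
    // accrues no usage.
    assert_eq!(effective_usage_seconds(300, 0.0), 0);
}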