Add document and support report interval in metric store
This closes #171.
yunfengzhou-hub authored Jul 28, 2023
1 parent a12083e commit df400be
Showing 16 changed files with 541 additions and 46 deletions.
3 changes: 3 additions & 0 deletions docs/content/README.md
@@ -29,5 +29,8 @@ applications.
- [Hive](connectors/hive.md)
- [Deep Dive](deep-dive)
- [Built-in Optimizations](deep-dive/optimizations.md)
- [Metric Stores](metric-stores)
- [Overview](metric-stores/overview.md)
- [Prometheus](metric-stores/prometheus.md)
- [How To](how-to)
- [Deploy FeatHub Job on Alibaba Cloud](how-to/deploy-on-alibaba-cloud.md)
4 changes: 4 additions & 0 deletions docs/content/metric-stores/README.md
@@ -0,0 +1,4 @@
# Metric stores

- [Overview](overview.md)
- [Prometheus](prometheus.md)
113 changes: 113 additions & 0 deletions docs/content/metric-stores/overview.md
@@ -0,0 +1,113 @@
# Overview

A MetricStore provides properties to report the metrics of a FeatHub job to an
external metric service. A metric refers to a statistic of a characteristic of
a feature.

## Metric store configurations

Below are the common configurations shared by different metric store
implementations; a configuration sketch follows the table. The document of each
metric store describes its store-specific configurations.

| Key | Required | Default | Type | Description |
| ------------------- | -------- | --------- | ------- | ------------------------------------------------------------ |
| type | Required | - | String | The type of the metric store to use. |
| report_interval_sec | Optional | 10 | Float | The interval in seconds to report metrics. |
| namespace           | Optional | "default" | String  | The namespace under which metrics are reported to the metric store. Metrics within different namespaces will not overwrite each other. |
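
As an illustration, the common keys sit at the top level of the `metric_store`
configuration section, alongside a nested section named after the store type.
Below is a minimal sketch with hypothetical values (see the Prometheus document
for a complete example):

```python
# A minimal sketch of the common metric store configuration keys.
metric_store_config = {
    "type": "prometheus",       # the metric store implementation to use
    "report_interval_sec": 10,  # report metrics every 10 seconds (the default)
    "namespace": "default",     # namespace used in metric names and tags
    "prometheus": {
        # Store-specific configurations go here; see the document of
        # each metric store.
    },
}
```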

## Defining metrics

FeatHub supports defining metrics at per-feature granularity. Below is an
example of defining metrics for a feature.

```python
f_total_cost = Feature(
    name="total_cost",
    transform=SlidingWindowTransform(
        expr="cost",
        agg_func="SUM",
        group_by_keys=["name"],
        window_size=timedelta(days=2),
        step_size=timedelta(days=1),
    ),
    metrics=[
        # Counts the feature values greater than 100 within the last day.
        Count(
            filter_expr="> 100",
            window_size=timedelta(days=1),
        ),
        # Reports the proportion of feature values that are NULL within the last hour.
        Ratio(
            filter_expr="IS NULL",
            window_size=timedelta(hours=1),
        ),
    ],
)
```

## Metric reporting format

Metrics reported by metric stores have the following format by default (an
example follows the list below). Some metric stores might override the default
format; check the document of each metric store for details.

- Metric name: `"{namespace}_{feature_name}_{metric_type}"`
  - namespace: The namespace of the metric store.
  - feature_name: The name of the host feature.
  - metric_type: The type of the metric.
- Metric tags:
  - namespace: The namespace of the metric store.
  - table_name: The name of the sink that the host features are written to.
  - feature_name: The name of the host feature.
  - other metric-specific tags.
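
For instance, given the Count metric defined on `total_cost` above and the
default namespace, the reported metric would look roughly as follows (the table
name is hypothetical and depends on your sink):

```
Metric name: default_total_cost_count
Metric tags:
    namespace       = "default"
    table_name      = "table_1"
    feature_name    = "total_cost"
    metric_type     = "count"
    filter_expr     = "> 100"
    window_size_sec = 86400
```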

## Built-in metrics

Below are the metric types of FeatHub's built-in metrics, along with their
parameters and their exposed tags.

### Count

Count is a metric that shows the number of feature values. It has the following
parameters:

- filter_expr: Optional, with None as the default value. If it is not None, it
  represents a partial FeatHub expression that evaluates to a boolean value.
  The partial expression should be a binary operator whose left child is absent
  and will be filled in with the host feature name. For example, "IS NULL" will
  be enriched into "{feature_name} IS NULL". Only feature values that evaluate
  this expression to True will be considered when computing the metric.
- window_size: Optional, with 0 as the default value. The time range over which
  the metric is computed. It should be zero or a positive time span. If it is
  zero, the metric is computed over all feature values that have been processed
  since the FeatHub job was created.

It exposes the following metric-specific tags:

- metric_type: "count"
- filter_expr: The value of the filter_expr parameter.
- window_size_sec: The value of the window_size parameter in seconds.

### Ratio

Ratio is a metric that shows the proportion of feature values that match
filter_expr among all feature values. For example, if 5 out of 100 feature
values in the window evaluate "IS NULL" to True, the reported ratio is 0.05.
It has the following parameters:

- filter_expr: A partial FeatHub expression that evaluates to a boolean value.
  The partial expression should be a binary operator whose left child is absent
  and will be filled in with the host feature name. For example, "IS NULL" will
  be enriched into "{feature_name} IS NULL". Only feature values that evaluate
  this expression to True will be counted when computing the metric.
- window_size: Optional, with 0 as the default value. The time range over which
  the metric is computed. It should be zero or a positive time span. If it is
  zero, the metric is computed over all feature values that have been processed
  since the FeatHub job was created.

It exposes the following metric-specific tags:

- metric_type: "ratio"
- filter_expr: The value of the filter_expr parameter.
- window_size_sec: The value of the window_size parameter in seconds.

62 changes: 62 additions & 0 deletions docs/content/metric-stores/prometheus.md
@@ -0,0 +1,62 @@
# Prometheus

The PrometheusMetricStore supports reporting metrics to a Prometheus service
through [Prometheus Pushgateway](https://github.com/prometheus/pushgateway).

## Metric reporting format

The metrics reported by PrometheusMetricStore generally match the default format
with the following modifications.

- It uses the label name "job" instead of "namespace" to hold the namespace
value of the metric store.
- table_name is reported not only as a label of the metric, but is also used as
  the grouping key of the job in Prometheus PushGateway (see the example below).
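
For instance (a hypothetical illustration reusing the `total_cost` Count metric
from the overview), the series pushed to the Pushgateway would look roughly
like:

```
default_total_cost_count{job="default", table_name="table_1", feature_name="total_cost", metric_type="count", ...}
```

Here the `job` label holds the namespace, and `table_name` doubles as the
grouping key under which the job's metrics are grouped in the Pushgateway.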


## Configurations

The following describes the configurations used to set up a
PrometheusMetricStore instance.

| Key | Required | Default | Type | Description |
| ------------------ | -------- | ------- | ------- | ------------------------------------------------------------ |
| server_url | Required | - | String | The PushGateway server URL including scheme, host name, and port. |
| delete_on_shutdown | Optional | True    | Boolean | Whether to delete metrics from Prometheus when the job finishes. When set to true, FeatHub makes a best-effort attempt to delete the metrics, but deletion is not guaranteed. |

## Examples

Here is an example that creates a FeathubClient reporting metrics to
Prometheus.

```python
client = FeathubClient(
    {
        "processor": {
            "type": "local",
        },
        "online_store": {
            "types": ["memory"],
            "memory": {},
        },
        "registry": {
            "type": "local",
            "local": {
                "namespace": "default",
            },
        },
        "feature_service": {
            "type": "local",
            "local": {},
        },
        "metric_store": {
            "type": "prometheus",
            "report_interval_sec": 5,
            "prometheus": {
                "server_url": "localhost:8080",
                "delete_on_shutdown": False,
            },
        },
    }
)
```
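
Note that `report_interval_sec` is one of the common metric store
configurations and therefore sits at the top level of the `metric_store`
section, while `server_url` and `delete_on_shutdown` are Prometheus-specific
and nest under the `"prometheus"` key.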
63 changes: 63 additions & 0 deletions …/com/alibaba/feathub/flink/udf/JavaUdfUtils.java
@@ -0,0 +1,63 @@
/*
* Copyright 2022 The FeatHub Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.feathub.flink.udf;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.List;

/** Utility methods for Java UDFs. */
public class JavaUdfUtils {
    /**
     * Evaluates a Java UDF against a DataStream.
     *
     * @param stream DataStream containing input data for the UDF.
     * @param className The classpath of the UDF to be evaluated. Must be a subclass of {@link
     *     OneInputStreamOperator}.
     * @param parameters The constructor parameters of the UDF.
     * @return DataStream containing output data of the UDF.
     */
    @SuppressWarnings({"unchecked"})
    public static DataStream<Row> evalJavaUdf(
            DataStream<Row> stream, String className, List<?> parameters) throws Exception {
        if (stream.getParallelism() != 1) {
            // The parallelism of currently supported UDFs must be 1.
            throw new UnsupportedOperationException(
                    "The parallelism of the upstream of a Java UDF must be 1.");
        }

        // TODO: Support cases in which the input and output types differ.
        return stream.transform(
                        "JavaUDF",
                        stream.getType(),
                        (OneInputStreamOperator<Row, Row>) instantiate(className, parameters))
                .setParallelism(stream.getParallelism());
    }

    // Instantiates the given class reflectively, looking up a constructor whose
    // parameter types match the runtime classes of the given parameters.
    private static Object instantiate(String className, List<?> parameters) throws Exception {
        Class<?> clazz = Class.forName(className);
        List<Class<?>> parameterClasses = new ArrayList<>();
        for (Object parameter : parameters) {
            parameterClasses.add(parameter.getClass());
        }
        return clazz.getConstructor(parameterClasses.toArray(new Class[0]))
                .newInstance(parameters.toArray());
    }
}
92 changes: 92 additions & 0 deletions …/com/alibaba/feathub/flink/udf/PeriodicEmitLastValueOperator.java
@@ -0,0 +1,92 @@
/*
* Copyright 2022 The FeatHub Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.feathub.flink.udf;

import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Row;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;

/** An operator that periodically emits the last value it has received from the upstream operator. */
public class PeriodicEmitLastValueOperator extends AbstractStreamOperator<Row>
        implements OneInputStreamOperator<Row, Row>, Serializable, BoundedOneInput {
    private final long emitIntervalMs;
    @Nullable private Row lastValue;
    private ListState<Row> lastValueState;

    public PeriodicEmitLastValueOperator(Double emitIntervalSec) {
        this.emitIntervalMs = (long) (emitIntervalSec * 1000);
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        lastValueState =
                context.getOperatorStateStore()
                        .getListState(new ListStateDescriptor<>("lastValueState", Row.class));
        Iterator<Row> iterator = lastValueState.get().iterator();
        if (iterator.hasNext()) {
            lastValue = iterator.next();
        }
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        if (lastValue != null) {
            lastValueState.update(Collections.singletonList(lastValue));
        }
    }

    @Override
    public void processElement(StreamRecord<Row> element) {
        // Register the periodic emit timer when the first element arrives.
        if (lastValue == null) {
            getProcessingTimeService()
                    .registerTimer(
                            getProcessingTimeService().getCurrentProcessingTime() + emitIntervalMs,
                            new EmitLastValueCallback());
        }
        lastValue = element.getValue();
    }

    @Override
    public void endInput() {
        // Flush the last value when the bounded input ends.
        if (lastValue != null) {
            output.collect(new StreamRecord<>(lastValue));
        }
    }

    private class EmitLastValueCallback implements ProcessingTimeService.ProcessingTimeCallback {
        @Override
        public void onProcessingTime(long timestamp) {
            // Emit the latest value and re-register the timer for the next interval.
            output.collect(new StreamRecord<>(lastValue));
            getProcessingTimeService().registerTimer(timestamp + emitIntervalMs, this);
        }
    }
}