Sample application that simulates network intrusion and its detection using Apache Flink and Pravega.
This application is based on the streaming-state-machine example, slightly extended to demonstrate Pravega/Flink integration capabilities.
Events in streams (generated by devices and services such as firewalls, routers, authentication services, etc.) are expected to occur in certain patterns. Any deviation from these patterns indicates an anomaly (an attempted intrusion) that the streaming system should recognize and that should trigger an alert.
The event patterns are tracked per interacting party (here simplified per source IP address) and are validated by a state machine. The state machine's states define what possible events may occur next, and what new states these events will result in.
The final aggregated results are grouped by network id, which acts as a network-domain abstraction hosting multiple server machines.
The aggregated results are (optionally) written to Elasticsearch for visualization in the Kibana user interface.
The following diagram depicts the state machine used in this example.
```
           +--<a>--> W --<b>--> Y --<e>---+
           |                    ^         |     +-----<g>---> TERM
   INITIAL-+                    |         |     |
           |                    |         +--> (Z)---<f>-+
           +--<c>--> X --<b>----+         |     ^        |
                     |                    |     |        |
                     +--------<d>---------+     +--------+
```
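To make the diagram concrete, here is a minimal sketch of how such a transition table could be encoded; the class and method names are illustrative, not the sample's actual implementation:

```java
import java.util.Map;

// Minimal sketch of the transition table in the diagram above; the names
// are illustrative and do not match the sample's actual classes.
public class StateMachineSketch {

    // Maps a state to its valid events and their successor states.
    // A missing entry means the event is invalid, i.e. an anomaly.
    private static final Map<String, Map<String, String>> TRANSITIONS = Map.of(
            "INITIAL", Map.of("a", "W", "c", "X"),
            "W",       Map.of("b", "Y"),
            "X",       Map.of("b", "Y", "d", "Z"),
            "Y",       Map.of("e", "Z"),
            "Z",       Map.of("f", "Z", "g", "TERM"));

    /** Returns the next state, or null if the event is invalid for this state. */
    public static String transition(String current, String event) {
        return TRANSITIONS.getOrDefault(current, Map.of()).get(event);
    }
}
```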
Prerequisites:
- Pravega running (see here for instructions)
- Build pravega-samples repository
- Apache Flink 1.12 running
- ELK running (optional)
The example program is copied to a distribution folder when built as shown above. Navigate to `scenarios/anomaly-detection/build/install/pravega-flink-scenario-anomaly-detection/` for the steps below.
The example is split into three separate programs:
- A utility program to create a Pravega stream for use by the example code.
- An event generator producing simulated network events.
- An anomaly detector consuming events.
All programs share a configuration file (`conf/app.json`) and a startup script (`bin/anomaly-detection`).
Usage:
bin/anomaly-detection --configDir <APP_CONFIG_DIR> --mode <RUN_MODE> [--stream <SCOPE/NAME>] [--controller <CONTROLLER_URI>]
where:
- `APP_CONFIG_DIR` is the directory that contains the `app.json` configuration file
- `RUN_MODE` can take any of the 3 options (1, 2 or 3):
- 1 = Create the configured Pravega stream and scope
- 2 = Publish events to Pravega
- 3 = Run Anomaly detection
- `SCOPE/NAME` optionally overrides the Pravega scope and stream names. Defaults to `examples/NetworkPacket`
- `CONTROLLER_URI` is the URI of the Pravega controller. Defaults to `tcp://127.0.0.1:9090`
Update `conf/app.json` as appropriate (see the "Application Configuration" section).
Run the following command to create the Pravega-based event stream:
$ bin/anomaly-detection --configDir conf/ --mode 1
The event generator is a Flink program that can be run either in the local Flink environment or on a remote Flink cluster.
- To run in the local environment:
$ bin/anomaly-detection --configDir conf/ --mode 2
- To run on a cluster (given a pre-configured Flink client):
$ flink run -c io.pravega.anomalydetection.ApplicationMain lib/pravega-flink-scenario-anomaly-detection-<VERSION>-all.jar --configDir conf/ --mode 2
Leave the program running for a while to generate a few events per second for a number of simulated network clients. Press CTRL-C to exit the generator (local environment only).
The generator is able to produce both valid and invalid state transitions. Invalid transitions occur in one of two ways, depending on the configuration (see the sketch below):
- Automatically, with some probability (as configured by `errorProbFactor`)
- Manually, by pressing ENTER on the console (if `controlledEnv` is `true`)
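A rough sketch of the probabilistic error injection described above; the names are hypothetical and do not match the sample's generator code:

```java
import java.util.List;
import java.util.Random;

// Hypothetical sketch of the generator's error injection, not the
// sample's actual code.
public class ErrorInjectionSketch {

    private static final Random RANDOM = new Random();

    /**
     * Picks the next event for a simulated client: with probability
     * errorProbFactor an event that breaks the state machine, otherwise
     * a legal transition.
     */
    public static String nextEvent(List<String> validEvents,
                                   List<String> invalidEvents,
                                   double errorProbFactor) {
        List<String> pool = RANDOM.nextDouble() < errorProbFactor
                ? invalidEvents   // deliberately invalid -> triggers an alert downstream
                : validEvents;    // follows the state machine
        return pool.get(RANDOM.nextInt(pool.size()));
    }
}
```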
Hint: To run the application on Flink, set the property `controlledEnv: false` in the `app.json` configuration file (as there is no user input). Moreover, the input parameter `--configDir` should contain the absolute path of the configuration directory.
The anomaly detector is another Flink program. The program groups events by source IP address, and maintains a state machine for each address as described above.
Alerts are aggregated using a tumbling window. Note that the Flink program uses event time to aggregate alerts.
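A simplified sketch of the detector's core, keeping one state-machine state per source IP in Flink keyed state (the types are stand-ins, and it reuses the hypothetical `StateMachineSketch.transition` helper from the earlier sketch):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative sketch only. Input: (sourceIp, eventType) tuples keyed by
// sourceIp; output: alert strings for invalid transitions.
public class DetectorSketch
        extends KeyedProcessFunction<String, Tuple2<String, String>, String> {

    private transient ValueState<String> machineState;

    @Override
    public void open(Configuration parameters) {
        machineState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("machine-state", String.class));
    }

    @Override
    public void processElement(Tuple2<String, String> event, Context ctx,
                               Collector<String> out) throws Exception {
        String current = machineState.value() == null ? "INITIAL" : machineState.value();
        String next = StateMachineSketch.transition(current, event.f1);
        if (next == null) {
            // No legal transition from the current state: raise an alert.
            out.collect("ALERT: invalid event '" + event.f1 + "' from " + ctx.getCurrentKey());
            machineState.clear();
        } else if ("TERM".equals(next)) {
            machineState.clear();      // sequence completed normally
        } else {
            machineState.update(next); // advance this IP's state machine
        }
    }
}
```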
- To run in the local environment:
$ bin/anomaly-detection --configDir conf/ --mode 3
- To run on a cluster:
$ flink run -c io.pravega.anomalydetection.ApplicationMain lib/pravega-flink-scenario-anomaly-detection-<VERSION>-all.jar --configDir conf/ --mode 3
Ensure that `$FLINK_HOME/bin` is on your `PATH` to use the `flink` command shown above.
When an anomalous event sequence is detected (due to an invalid event generated by whatever means), an alert is generated to STDOUT and to Elasticsearch (if configured).
The state machine associated with each source IP address is reliably stored in Flink's checkpoint state. In a failure scenario, Flink will automatically recover the job state, ensuring exactly-once processing of input events and their corresponding state transitions. Assuming that the event stream contains only valid events, no spurious alerts should be produced following the recovery.
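For reference, this relies on Flink checkpointing being enabled on the execution environment, along the lines of the sketch below (the interval corresponds to `checkpointIntervalInMilliSec` in `app.json`; this is not the sample's exact code):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: periodic checkpoints snapshot the per-IP state machines so
// they can be restored after a failure (not the sample's exact code).
public class CheckpointSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000); // milliseconds between checkpoints
        // ... build the detector pipeline here ...
        env.execute("anomaly-detection");
    }
}
```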
Perform the steps below to test job state recovery. This works only when checkpointing is enabled and a reliable state backend is configured to store checkpoint/savepoint snapshots (see `flink-conf.yaml`).
- Cancel the currently running job using the `-s` option, which creates a savepoint in the configured savepoint location (as defined in `flink-conf.yaml`):
bin/flink cancel -s <JOB_ID>
- To resume from a savepoint, run the below command:
bin/flink run -s <SAVEPOINT_LOCATION> -c io.pravega.anomalydetection.ApplicationMain <PATH_TO_pravega-flink-scenario-anomaly-detection-<VERSION>-all.jar> --configDir <APP_CONFIG_DIR> --mode 3
The job should recover cleanly from the last checkpointed state. As mentioned above, you should not see any spurious alerts.
The alerts may be visualized using Kibana or by examining the TaskManager's 'stdout' (as shown in the Flink Web UI).
```json
{
  "name": "anomaly-detection",
  "producer": {
    "latencyInMilliSec": 2000,
    "capacity": 100,
    "errorProbFactor": 0.3,
    "controlledEnv": false
  },
  "pipeline": {
    "parallelism": 1,
    "checkpointIntervalInMilliSec": 1000,
    "disableCheckpoint": false,
    "watermarkOffsetInSec": 0,
    "windowIntervalInSeconds": 30,
    "elasticSearch": {
      "sinkResults": false,
      "host": "127.0.0.1",
      "port": 9300,
      "cluster": "elasticsearch",
      "index": "anomaly-index",
      "type": "anomalies"
    }
  }
}
```
- `latencyInMilliSec` - how frequently events are generated and published to Pravega
- `capacity` - the initial number of events during which error records are not generated
- `errorProbFactor` - the probability of simulating an error record; provide a value between 0.0 and 1.0
- `controlledEnv` - when `true`, the `errorProbFactor` value is ignored and an error record is generated only when the user presses ENTER on the console
- `parallelism` - used to define the Pravega segment count (fixed scaling policy) and the Flink job parallelism
- `controllerUri` - Pravega controller endpoint
- `elasticSearch` - final results are written to Elasticsearch if `sinkResults` is set to `true`
- `windowIntervalInSeconds` - the tumbling window interval
- `watermarkOffsetInSec` - the window watermark offset interval
- `disableCheckpoint` - disables checkpointing when `true`; if checkpointing is enabled, make sure the Flink cluster uses an appropriate state backend configuration for checkpoint/savepoint snapshots
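As a hint to how the window settings above might map to Flink constructs, a hedged sketch (not the sample's actual code):

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Illustrative mapping of the pipeline settings to Flink constructs;
// not the sample's actual code.
public class PipelineSettingsSketch {

    // windowIntervalInSeconds -> size of the tumbling event-time window
    static TumblingEventTimeWindows windowFor(long windowIntervalInSeconds) {
        return TumblingEventTimeWindows.of(Time.seconds(windowIntervalInSeconds));
    }

    // watermarkOffsetInSec -> how far the watermark lags behind event time
    static <T> WatermarkStrategy<T> watermarkFor(long watermarkOffsetInSec) {
        return WatermarkStrategy.forBoundedOutOfOrderness(
                Duration.ofSeconds(watermarkOffsetInSec));
    }
}
```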
The steps below were tested on Ubuntu 16.x, Docker 1.13.1, JDK 8, Elasticsearch 5, and Kibana 5.3.0.
Install Elasticsearch and Kibana
Install Elasticsearch
sudo mkdir -p /elk-stack/esdata
sudo chmod 777 -R /elk-stack/
docker run -d --name elasticsearch --net="host" -v /elk-stack/esdata:/usr/share/elasticsearch/data elasticsearch
Verify if ES is running by executing the command:
curl -X GET http://localhost:9200
Update `/etc/hosts` with `127.0.0.1 elasticsearch` (this allows Kibana to look up Elasticsearch).
Install Kibana
docker run -d --name kibana --net="host" kibana
Verify by going to the URL: http://localhost:5601
Create Elastic Search Index and define schema
```
curl -XDELETE "http://localhost:9200/anomaly-index"
curl -XPUT "http://localhost:9200/anomaly-index"
curl -XPUT "http://localhost:9200/anomaly-index/_mapping/anomalies" -d'
{
  "anomalies" : {
    "properties" : {
      "count": {"type": "integer"},
      "location": {"type": "geo_point"},
      "minTimestampAsString": {"type": "date"},
      "maxTimestampAsString": {"type": "date"}
    }
  }
}'
```
Visualize results in Kibana
- Create an index pattern for "anomaly-index" and set it as the default index
- Visualize the metrics using the "Tile Map" option and choose the "sum" aggregation on the field "count"
- Select the Geo-Coordinates bucket for the field "location"
- You can now visualize the total anomalies per geo-location for the simulated time window
(Screenshot: final results filtered by lat/long coordinates in Kibana)