Skip to content

Commit

Permalink
[CELEBORN-1490][CIP-6] Add Flink hybrid shuffle doc
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Add Flink hybrid shuffle doc

### Why are the changes needed?
We need the doc for the new hybrid shuffle mode.

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?

no neeed.

Closes apache#2867 from reswqa/add-hs-doc.

Authored-by: Weijie Guo <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
  • Loading branch information
reswqa authored and SteNicholas committed Nov 1, 2024
1 parent 165e914 commit 41fdb8a
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 2 deletions.
28 changes: 27 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,17 @@ spark.executor.userClassPathFirst false
```

### Deploy Flink client

**Important: Only Flink batch jobs are supported for now.**

Copy `$CELEBORN_HOME/flink/*.jar` to `$FLINK_HOME/lib/`.

#### Flink Configuration
To use Celeborn, the following flink configurations should be added.
Celeborn supports two Flink integration strategies: remote shuffle service (since Flink 1.14) and [hybrid shuffle](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/batch/batch_shuffle/#hybrid-shuffle) (since Flink 1.20).

To use Celeborn, you can choose one of them and add the following Flink configurations.

##### Flink Remote Shuffle Service Configuration
```properties
shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING
Expand All @@ -337,6 +344,25 @@ taskmanager.memory.task.off-heap.size: 512m
```
**Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_BLOCKING`.

##### Flink Hybrid Shuffle Configuration
```properties
shuffle-service-factory.class: org.apache.flink.runtime.io.network.NettyShuffleServiceFactory
taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class: org.apache.celeborn.plugin.flink.tiered.CelebornTierFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL
jobmanager.partition.hybrid.partition-data-consume-constraint: ALL_PRODUCERS_FINISHED

celeborn.master.endpoints: clb-1:9097,clb-2:9097,clb-3:9097
celeborn.client.shuffle.batchHandleReleasePartition.enabled: true
celeborn.client.push.maxReqsInFlight: 128
# Network connections between peers
celeborn.data.io.numConnectionsPerPeer: 16
# threads number may vary according to your cluster but do not set to 1
celeborn.data.io.threads: 32
celeborn.client.shuffle.batchHandleCommitPartition.threads: 32
celeborn.rpc.dispatcher.numThreads: 32
```
**Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_HYBRID_FULL`.

### Deploy MapReduce client
Copy `$CELEBORN_HOME/mr/*.jar` into `mapreduce.application.classpath` and `yarn.application.classpath`.
Meanwhile, configure the following settings in YARN and MapReduce config.
Expand Down
16 changes: 16 additions & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ INFO [async-reply] Controller: CommitFiles for local-1690000152711-0 success wit
```

## Start Flink with Celeborn

**Important: Only Flink batch jobs are supported for now.**

#### Copy Celeborn Client to Flink's lib
Celeborn release binary contains clients for Flink 1.14.x, Flink 1.15.x, Flink 1.17.x, Flink 1.18.x, Flink 1.19.x and Flink 1.20.x, copy the corresponding client jar into Flink's
`lib/` directory:
Expand All @@ -138,12 +141,25 @@ vi conf/flink-conf.yaml
cd $FLINK_HOME
vi conf/config.yaml
```

Choose one of flink integration strategies and add the following configuration:

**(Support Flink 1.14 and above versions) Flink Remote Shuffle Service Config**
```properties
shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING
```
**Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_BLOCKING`.

**(Support Flink 1.20 and above versions) Flink [hybrid shuffle](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/batch/batch_shuffle/#hybrid-shuffle) Config**
```properties
shuffle-service-factory.class: org.apache.flink.runtime.io.network.NettyShuffleServiceFactory
taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class: org.apache.celeborn.plugin.flink.tiered.CelebornTierFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL
jobmanager.partition.hybrid.partition-data-consume-constraint: ALL_PRODUCERS_FINISHED
```
**Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_HYBRID_FULL`.

Then deploy the example word count job to the running cluster:
```shell
cd $FLINK_HOME
Expand Down
28 changes: 27 additions & 1 deletion docs/deploy.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,17 @@ spark.executor.userClassPathFirst false
```

## Deploy Flink client

**Important: Only Flink batch jobs are supported for now.**

Copy `$CELEBORN_HOME/flink/*.jar` to `$FLINK_HOME/lib/`.

### Flink Configuration
To use Celeborn, the following flink configurations should be added.
Celeborn supports two Flink integration strategies: remote shuffle service (since Flink 1.14) and [hybrid shuffle](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/batch/batch_shuffle/#hybrid-shuffle) (since Flink 1.20).

To use Celeborn, you can choose one of them and add the following Flink configurations.

#### Flink Remote Shuffle Service Configuration
```properties
shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING
Expand All @@ -232,6 +239,25 @@ taskmanager.memory.task.off-heap.size: 512m
```
**Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_BLOCKING`.

##### Flink Hybrid Shuffle Configuration
```properties
shuffle-service-factory.class: org.apache.flink.runtime.io.network.NettyShuffleServiceFactory
taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class: org.apache.celeborn.plugin.flink.tiered.CelebornTierFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL
jobmanager.partition.hybrid.partition-data-consume-constraint: ALL_PRODUCERS_FINISHED

celeborn.master.endpoints: clb-1:9097,clb-2:9097,clb-3:9097
celeborn.client.shuffle.batchHandleReleasePartition.enabled: true
celeborn.client.push.maxReqsInFlight: 128
# Network connections between peers
celeborn.data.io.numConnectionsPerPeer: 16
# threads number may vary according to your cluster but do not set to 1
celeborn.data.io.threads: 32
celeborn.client.shuffle.batchHandleCommitPartition.threads: 32
celeborn.rpc.dispatcher.numThreads: 32
```
**Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_HYBRID_FULL`.

## Deploy MapReduce client
Copy `$CELEBORN_HOME/mr/*.jar` into `mapreduce.application.classpath` and `yarn.application.classpath`.
Meanwhile, configure the following settings in YARN and MapReduce config.
Expand Down

0 comments on commit 41fdb8a

Please sign in to comment.