.. _offline-batch-inference:

Large-Scale Offline Batch Inference
===================================

Offline batch inference is the process of generating model predictions on a fixed set of input data. By batching multiple inputs into a single request, you can significantly improve GPU utilization and reduce the total inference cost.

It is a common use case for AI:

* Large-scale document processing (summarization, entity retrieval, etc.)
* Data pre-processing for training
* Synthetic data generation
* Scientific data analysis
* ...

.. SkyPilot enables large scale batch inference with a simple interface, offering the following benefits:

.. * Cost-effective: Pay only for the resources you use, and even cheaper spot instances.
.. * Faster: Scales out your jobs to multiple machines from any available resource pool.
.. * Robust: Automatically handles failures and recovers jobs.
.. * Easy to use: Abstracts away the complexity of distributed computing, giving you a simple interface to manage your jobs.
.. * Mounted Storage: Access data on object store as if they are local files.

We now walk through the steps for developing and scaling out batch inference with SkyPilot.
We use a real-world example, the LMSys-1M dataset, and a popular open-source model, Meta-Llama-3.1-8B, to showcase the process.
All scripts can be found in the `examples in SkyPilot repository <https://github.com/skypilot-org/skypilot/tree/main/examples/batch_inference>`__.

**TL;DR:** To run batch inference on the LMSys-1M dataset with the Meta-Llama-3.1-8B model, you can use the following commands:

.. code-block:: bash

    NUM_PARALLEL_GROUPS=16

    for i in $(seq 0 $((NUM_PARALLEL_GROUPS - 1))); do
      sky jobs launch -y -n group-$i worker.yaml \
        --env DATA_GROUP_METADATA=./groups/$i.txt \
        --env MODEL_NAME=meta-llama/Meta-Llama-3.1-8B-Instruct &
    done

.. TODO: add a screenshot of the ``sky jobs queue`` output here.

.. _split-data-into-smaller-chunks:

Split Data into Smaller Chunks
------------------------------

The `LMSys-1M dataset <https://huggingface.co/datasets/lmsys/lmsys-chat-1m>`_ contains 1M conversations in 6 parquet files:

.. code-block::

    train-00000-of-00006-*.parquet
    train-00001-of-00006-*.parquet
    train-00002-of-00006-*.parquet
    train-00003-of-00006-*.parquet
    train-00004-of-00006-*.parquet
    train-00005-of-00006-*.parquet

To scale out the inference to more than 6 nodes, we first split the entire dataset into smaller chunks.

.. note::

    This step is optional. We offer an R2 bucket with the LMSys-1M dataset already divided into smaller chunks: ``r2://skypilot-lmsys-chat-1m`` (you can browse it `here <https://pub-109f99b93eac4c22939d0ed4385f0dcd.r2.dev>`_).

.. TODO: confirm r2 bucket's public access

We first start a machine for development. The following command starts a small CPU machine with the current directory synced and an R2 bucket mounted, so we can operate on the data as if it were on local disk.

.. code-block:: bash

    sky launch -c dev dev.yaml --workdir .
    # SSH into the remote machine.
    ssh dev
    # Running on the remote machine.
    cd sky_workdir
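
For reference, ``dev.yaml`` only needs a small CPU instance and the bucket mount; a minimal sketch could look like the following (the ``file_mounts`` section matches the ``dev.yaml`` included in this example, while the CPU size is only an illustrative assumption):

.. code-block:: yaml

    # dev.yaml (sketch)
    resources:
      cpus: 8+  # Any small CPU machine works for data preparation.

    file_mounts:
      # Mount the R2 bucket containing the dataset at /data.
      /data:
        name: skypilot-lmsys-chat-1m
        store: r2
        mode: MOUNT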

On the remote dev machine, we download the LMSys-1M dataset and split it into smaller chunks, using a utility script:

.. code-block:: bash

    python utils/download_and_convert.py

This script converts the dataset into 200 data chunks, each containing 5000 conversations. A metadata file is also generated listing the paths to all data chunks:

.. code-block::

    # metadata.txt
    part_0.jsonl
    part_1.jsonl
    ...
    part_199.jsonl
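
The exact conversion logic lives in ``utils/download_and_convert.py`` in the example; a minimal sketch of such a script, assuming the Hugging Face ``datasets`` library and access to the gated dataset, could look like:

.. code-block:: python

    # Sketch only; see utils/download_and_convert.py in the example for the real script.
    import json

    from datasets import load_dataset

    CHUNK_SIZE = 5000  # Conversations per chunk.

    dataset = load_dataset('lmsys/lmsys-chat-1m', split='train')

    chunk_paths = []
    buffer = []

    def flush(records, chunk_id):
        path = f'part_{chunk_id}.jsonl'
        with open(path, 'w') as f:
            for record in records:
                # Keep only the field the inference script reads.
                f.write(json.dumps({'conversation': record['conversation']}) + '\n')
        return path

    for record in dataset:
        buffer.append(record)
        if len(buffer) == CHUNK_SIZE:
            chunk_paths.append(flush(buffer, len(chunk_paths)))
            buffer = []
    if buffer:
        chunk_paths.append(flush(buffer, len(chunk_paths)))

    # Write the metadata file listing all chunk paths.
    with open('metadata.txt', 'w') as f:
        f.write('\n'.join(chunk_paths))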

.. note::

    We use an R2 bucket for its zero data egress fees, so we can easily scale out the inference to multiple regions/clouds without additional costs for reading the data.

.. _develop-inference-script:

Develop Inference Script
------------------------

With the data split into smaller chunks, we can now develop an inference script to generate predictions for each chunk.

First, we start another dev machine with a GPU, so we can develop and debug the inference script by logging into the machine and running it directly.

.. code-block:: bash

    sky launch -c dev dev.yaml --gpus L4 --workdir .
    ssh dev
    cd sky_workdir

We now develop the inference script to generate predictions for the first turn of each conversation in the LMSys-1M dataset.

The following is an example script, where we aggregate multiple inputs into a single batch for better GPU utilization and process the entire data chunk batch by batch:

.. code-block:: python

    import json
    import os

    from tqdm import tqdm
    from vllm import LLM, SamplingParams

    BATCH_CHAR_COUNT = 2000
    DATA_PATH = '/data/part_0.jsonl'
    OUTPUT_PATH = '/output'

    # Sampling parameters for generation (see the full script for the exact values used).
    SAMPLING_PARAMS = SamplingParams(temperature=0.7, max_tokens=512)

    llm = LLM(model='meta-llama/Meta-Llama-3.1-8B-Instruct', tensor_parallel_size=1)

    def batch_inference(llm: LLM, data_path: str):
        # Processing one chunk can take about 1-2 hours on an L4 GPU.
        print(f'Processing {data_path}...')
        data_name = data_path.split('/')[-1]

        # Read data (jsonl); each line is a JSON object.
        with open(data_path, 'r') as f:
            data = f.readlines()
        # Extract the first message from each conversation.
        messages = [json.loads(d.strip())['conversation'][0]['content'] for d in data]

        # Run inference, aggregating messages into batches by character count.
        batch_char_count = 0
        batch_messages = []
        generated_text = []
        for message in tqdm(messages):
            batch_char_count += len(message)

            if batch_char_count > BATCH_CHAR_COUNT and batch_messages:
                outputs = llm.generate(batch_messages, SAMPLING_PARAMS, use_tqdm=False)
                for output in outputs:
                    generated_text.append(' '.join([o.text for o in output.outputs]))
                # The current message starts a new batch.
                batch_messages = []
                batch_char_count = len(message)

            batch_messages.append(message)

        # Process the final (partial) batch.
        if batch_messages:
            outputs = llm.generate(batch_messages, SAMPLING_PARAMS, use_tqdm=False)
            for output in outputs:
                generated_text.append(' '.join([o.text for o in output.outputs]))

        # Save predictions.
        os.makedirs(OUTPUT_PATH, exist_ok=True)
        with open(os.path.join(OUTPUT_PATH, data_name), 'w') as f:
            for text in generated_text:
                f.write(text + '\n')

    batch_inference(llm, DATA_PATH)

For the complete script, see `examples/batch_inference/inference.py <https://github.com/skypilot-org/skypilot/blob/main/examples/batch_inference/inference.py>`_. You can run it with ``HF_TOKEN=<your-huggingface-token> python inference.py`` to test it on the dev machine.

After testing it on the dev machine, we can compose a task YAML (`inference.yaml <https://github.com/skypilot-org/skypilot/blob/main/examples/batch_inference/inference.yaml>`_) to run the inference on the clouds.
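
The exact ``inference.yaml`` is in the example repository; a rough sketch of what such a task YAML could contain is shown below (the output bucket name and the setup command are illustrative assumptions):

.. code-block:: yaml

    # inference.yaml (sketch; see the example repository for the real file).
    workdir: .

    envs:
      HF_TOKEN: ""  # Passed in via --env HF_TOKEN.

    resources:
      accelerators: L4

    file_mounts:
      # Input data chunks, mounted from the R2 bucket.
      /data:
        name: skypilot-lmsys-chat-1m
        store: r2
        mode: MOUNT
      # Output predictions, written to your own bucket (illustrative name).
      /output:
        name: my-batch-inference-output
        mode: MOUNT

    setup: |
      pip install vllm tqdm

    run: |
      python inference.py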

.. code-block:: bash

    # Set HuggingFace token for accessing Llama model weights.
    export HF_TOKEN=...
    sky launch -c inf ./inference.yaml \
      --env HF_TOKEN

.. TODO: make r2 bucket publically accessible
.. tested with inference.py and inference.yaml on 2024-09-15 and works well.

.. _scale-out-to-multiple-nodes:

Scale Out to Multiple Nodes
---------------------------

To scale out the inference to multiple machines, we can assign the data chunks to multiple groups, so that each machine processes one group of chunks.

The following script (`group_data.py <https://github.com/skypilot-org/skypilot/blob/main/examples/batch_inference/group_data.py>`_) reads the metadata file and splits the paths of the data chunks into multiple groups.

.. code-block:: python

    import math
    import os
    from typing import List

    NUM_GROUPS = 16

    def group_data(data_paths: List[str], num_groups: int):
        # Split the data chunk paths into multiple groups.
        data_groups = []
        group_size = math.ceil(len(data_paths) / num_groups)
        for i in range(num_groups):
            data_groups.append(data_paths[i * group_size:(i + 1) * group_size])
        return data_groups

    # `data_paths` is the list of chunk paths read from metadata.txt (see the full script).
    data_groups = group_data(data_paths, NUM_GROUPS)

    # Save each group of chunk paths to a separate file.
    os.makedirs('./groups', exist_ok=True)
    for i, data_group in enumerate(data_groups):
        with open(f'./groups/{i}.txt', 'w') as f:
            f.write('\n'.join(data_group))

.. code-block::

    # ./groups/0.txt
    part_0.jsonl
    part_1.jsonl
    ...
    part_12.jsonl

On the dev machine, we use the ``group_data.py`` script to split the data chunks into as many groups as the number of machines we want to scale out to:

.. code-block:: bash

    python group_data.py \
      --data-metadata ./metadata.txt \
      --num-groups 16

After that, we can launch a job for each group to process the groups in parallel.

.. code-block:: bash

    # Launch a job for each group of data chunks.
    NUM_GROUPS=16
    for i in $(seq 0 $((NUM_GROUPS - 1))); do
      # We use & to launch jobs in parallel.
      sky jobs launch -y -d -n group-$i worker.yaml \
        --env DATA_GROUP_METADATA=./groups/$i.txt &
    done
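
``worker.yaml`` receives the path to its group's metadata file through the ``DATA_GROUP_METADATA`` environment variable and processes every chunk listed in it. A rough sketch of the relevant fields (the exact file is in the example repository; the ``--data-group-metadata`` flag is a hypothetical interface of the inference script):

.. code-block:: yaml

    # worker.yaml (sketch): resources and file_mounts mirror inference.yaml above.
    workdir: .

    envs:
      DATA_GROUP_METADATA: ""  # Path to ./groups/<i>.txt, set via --env.
      HF_TOKEN: ""
      MODEL_NAME: meta-llama/Meta-Llama-3.1-8B-Instruct

    run: |
      # Process every data chunk listed in this group's metadata file.
      python inference.py --data-group-metadata "$DATA_GROUP_METADATA"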

.. Tested worker on 2024-09-15 with a group containing multiple data parts.

Cut Costs by ~5x with Spot Instances and Specialized AI Clouds
--------------------------------------------------------------

Batch inference can get expensive when it involves large models and high-end
GPUs. By leveraging spot instances and specialized AI clouds, you can achieve around
a 5x cost reduction, at the cost of weaker robustness guarantees (spot instances can
be preempted at any time).

To handle the robustness issue, we can wrap our batch inference code so that it
resumes from where it left off after a spot preemption or node/GPU failure.

The following code checks which chunks have completed and continues with the
unfinished chunks whenever a failure happens and the job is restarted.

.. code-block:: python

    import os
    from typing import List

    def continue_batch_inference(data_paths: List[str], output_path: str):
        # Automatically skip processed data chunks and resume the rest.
        for data_path in data_paths:
            data_name = data_path.split('/')[-1]
            succeed_indicator = os.path.join(output_path, data_name + '.succeed')
            if os.path.exists(succeed_indicator):
                print(f'Skipping {data_path} because it has been processed.')
                continue

            prediction = batch_inference(data_path, output_path)

            # Persist the predictions, then write the indicator file to mark
            # this chunk as done.
            save_prediction(prediction, output_path)
            mark_as_done(succeed_indicator)
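
``save_prediction`` and ``mark_as_done`` are small helpers that are not shown above; one possible (hypothetical) implementation is:

.. code-block:: python

    import os
    from typing import List

    def save_prediction(prediction: List[str], output_path: str):
        # Hypothetical helper: write the generated text under the output
        # directory (in practice the file name should include the chunk name).
        with open(os.path.join(output_path, 'predictions.jsonl'), 'a') as f:
            for text in prediction:
                f.write(text + '\n')

    def mark_as_done(succeed_indicator: str):
        # Hypothetical helper: create an empty `.succeed` indicator file so
        # a restarted job knows this chunk has already been processed.
        open(succeed_indicator, 'w').close()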

To let SkyPilot search through all available spot instances and specialized
AI clouds with different accelerators based on cost, we add the following fields
to ``worker.yaml``. They allow SkyPilot to search for the cheapest resources
among different accelerator types (L4, L40, etc.) and different pricing models
(on-demand and spot instances) across all enabled cloud providers.

.. code-block:: yaml

    resources:
      accelerators: {L4, L40, A10, A10g, A100, A100-80GB}
      any_of:
        - use_spot: true
        - use_spot: false

We then start the batch inference workers with the same script:

.. code-block:: bash

    # Use spot instances to reduce costs.
    NUM_GROUPS=16
    for i in $(seq 0 $((NUM_GROUPS - 1))); do
      sky jobs launch -y -n group-$i worker.yaml \
        --env DATA_GROUP_METADATA=./groups/$i.txt &
    done
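
To keep an eye on the workers, you can use the managed jobs CLI:

.. code-block:: bash

    # List all managed jobs and their statuses (including spot recoveries).
    sky jobs queue

    # Stream the logs of a specific job; use the job ID shown in the queue.
    sky jobs logs JOB_ID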

.. Tested worker on 2024-09-15 with continue_batch_inference.

Advanced Tips
-------------

1. Data placement: To avoid expensive data egress costs, you can place your input data on Cloudflare R2,
   which does not charge for data egress, so you do not pay anything for reading the data.

.. TODO: how to deal with output data?

2. Reduce restart overhead: Keeping the average overhead (including provisioning, setup, and potential progress loss during failures)
   within half an hour is ideal for more efficient usage of spot instances, according to our `paper <>`_.

3. Chunk size: The time to process a data chunk is closely related to the chunk size (number of samples), which affects the potential
   progress loss during failures mentioned in *Tip 2*. Before splitting the dataset into chunks, you can benchmark the time to process
   a single chunk to pick a good chunk size, e.g. with the simple timing sketch below.
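
For example, a minimal timing harness that reuses ``batch_inference`` from the inference script above could look like:

.. code-block:: python

    import time

    start = time.time()
    batch_inference(llm, '/data/part_0.jsonl')  # Process one chunk.
    elapsed = time.time() - start
    print(f'Processing one chunk took {elapsed / 60:.1f} minutes.')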

Next steps
----------

1. Details of :ref:`SkyPilot Managed Jobs <managed-jobs>`.
2. Join the `SkyPilot community Slack <https://slack.skypilot.co>`__ for questions and requests.
# Mount the R2 bucket with the LMSys-1M data chunks at /data.
file_mounts:
  /data:
    name: skypilot-lmsys-chat-1m
    store: r2
    mode: MOUNT
"""Group data chunk paths from a metadata file into multiple groups."""
import argparse
import math
import os
from typing import List


def group_data(data_paths: List[str], num_groups: int):
    # Split the data chunk paths into multiple groups.
    data_groups = []
    group_size = math.ceil(len(data_paths) / num_groups)
    for i in range(num_groups):
        data_groups.append(data_paths[i * group_size:(i + 1) * group_size])
    return data_groups


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--data-metadata', type=str, required=True, help='The file that contains all paths of the data chunks to be grouped.')
    parser.add_argument('--num-groups', type=int, required=True, help='The number of groups to be created.')
    args = parser.parse_args()

    with open(args.data_metadata, 'r') as f:
        data_paths = f.readlines()
    data_paths = [path.strip() for path in data_paths]

    data_groups = group_data(data_paths, args.num_groups)

    # Save each data group to a separate file under ./groups/.
    os.makedirs('./groups', exist_ok=True)
    for i, data_group in enumerate(data_groups):
        with open(f'./groups/{i}.txt', 'w') as f:
            f.write('\n'.join(data_group))