[Docs] Offline batch inference guide (static assignment) #4144

Draft
wants to merge 12 commits into master
315 changes: 315 additions & 0 deletions docs/source/examples/batch-inference.rst
@@ -0,0 +1,315 @@
.. _offline-batch-inference:

Large-Scale Offline Batch Inference
===================================

Offline batch inference generates model predictions over a fixed set of input data. By batching multiple inputs into a single request, you can significantly improve GPU utilization and reduce the total inference cost.

Common use cases include:

* Large-scale document processing (summarization, entity retrieval, etc.)
* Data pre-processing for training
* Synthetic data generation
* Scientific data analysis
* ...

.. SkyPilot enables large scale batch inference with a simple interface, offering the following benefits:

.. * Cost-effective: Pay only for the resources you use, and even cheaper spot instances.
.. * Faster: Scales out your jobs to multiple machines from any available resource pool.
.. * Robust: Automatically handles failures and recovers jobs.
.. * Easy to use: Abstracts away the complexity of distributed computing, giving you a simple interface to manage your jobs.
.. * Mounted Storage: Access data on object store as if they are local files.

We now walk through the steps for developing and scaling out batch inference with SkyPilot,
using a real-world example: the LMSys-1M dataset and the popular open-source model Meta-Llama-3.1-8B-Instruct.
All scripts can be found in the `examples in SkyPilot repository <>`__.

Throughout this guide we use the following terms:

* **Chunk**: a small slice of the dataset stored as a single JSONL file (e.g., ``part_0.jsonl``).
* **Group**: a set of chunks assigned to one machine, listed in a group metadata file (e.g., ``groups/0.txt``).
* **Worker**: a SkyPilot managed job that processes all chunks in one group.

**TL;DR:** To run batch inference on the LMSys-1M dataset with the Meta-Llama-3.1-8B-Instruct model, launch one worker job per data group:

.. code-block:: bash

    NUM_PARALLEL_GROUPS=16
    for i in $(seq 0 $((NUM_PARALLEL_GROUPS - 1))); do
        sky jobs launch -y -n group-$i worker.yaml \
            --env DATA_GROUP_METADATA=./groups/$i.txt \
            --env MODEL_NAME=meta-llama/Meta-Llama-3.1-8B-Instruct &
    done

.. TODO: Add an image of the ``sky jobs queue`` output here.

.. _split-data-into-smaller-chunks:

Split Data into Smaller Chunks
------------------------------

The `LMSys-1M dataset <https://huggingface.co/datasets/lmsys/lmsys-chat-1m>`_ contains 1M conversations stored in 6 parquet files:

.. code-block::

    train-00000-of-00006-*.parquet
    train-00001-of-00006-*.parquet
    train-00002-of-00006-*.parquet
    train-00003-of-00006-*.parquet
    train-00004-of-00006-*.parquet
    train-00005-of-00006-*.parquet


To scale out the inference to more than 6 nodes, we first split the entire dataset into smaller chunks.

.. note::

    This step is optional. We provide an R2 bucket with the LMSys-1M dataset already divided into smaller chunks: ``r2://skypilot-lmsys-chat-1m`` (you can browse it `here <https://pub-109f99b93eac4c22939d0ed4385f0dcd.r2.dev>`_).

.. TODO: confirm r2 bucket's public access

We first start a machine for development. The following command starts a small CPU machine with the current directory synced and an R2 bucket mounted at ``/data``, so we can operate on the data as if it were on local disk.

.. code-block:: bash

    sky launch -c dev dev.yaml --workdir .
    # SSH into the remote machine.
    ssh dev
    # Running on remote machine.
    cd sky_workdir
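
The ``dev.yaml`` used here simply mounts the dataset bucket at ``/data`` (see ``examples/batch_inference/dev.yaml``):

.. code-block:: yaml

    file_mounts:
      /data:
        name: skypilot-lmsys-chat-1m
        store: r2
        mode: MOUNT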

On the remote dev machine, we download the LMSys-1M dataset and split it into smaller chunks, using a utility script:

.. code-block:: bash

    python utils/download_and_convert.py

This script converts the dataset into 200 data chunks, each containing 5,000 conversations, and generates a metadata file (``metadata.txt``) listing the paths of all chunks. The output directory then contains:

.. code-block::

    metadata.txt
    part_0.jsonl
    part_1.jsonl
    ...
    part_199.jsonl
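
``utils/download_and_convert.py`` is included with the example. As a rough sketch of what such a conversion script does (the chunk size and the ``conversation`` field follow this guide; the use of the HuggingFace ``datasets`` library and the exact paths are assumptions):

.. code-block:: python

    import json

    from datasets import load_dataset

    CHUNK_SIZE = 5000     # Conversations per chunk.
    OUTPUT_DIR = '/data'  # The mounted R2 bucket.

    dataset = load_dataset('lmsys/lmsys-chat-1m', split='train')

    chunk_paths = []
    for chunk_id, start in enumerate(range(0, len(dataset), CHUNK_SIZE)):
        chunk_path = f'part_{chunk_id}.jsonl'
        chunk = dataset.select(range(start, min(start + CHUNK_SIZE, len(dataset))))
        with open(f'{OUTPUT_DIR}/{chunk_path}', 'w') as f:
            for row in chunk:
                # Keep only the conversation field used by the inference script.
                f.write(json.dumps({'conversation': row['conversation']}) + '\n')
        chunk_paths.append(chunk_path)

    # Write the metadata file: one chunk path per line.
    with open(f'{OUTPUT_DIR}/metadata.txt', 'w') as f:
        f.write('\n'.join(chunk_paths))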

.. note::

    We use an R2 bucket because it has no data egress fees, so we can scale out the inference to multiple regions and clouds without additional costs for data reading.


.. _develop-inference-script:

Develop Inference Script
------------------------

With the data split into smaller chunks, we can now develop an inference script to generate predictions for each chunk.

First, we start another dev machine, this time with a GPU, so we can develop and debug the inference script by logging into the machine and running it directly.

.. code-block:: bash

    sky launch -c dev dev.yaml --gpus L4 --workdir .
    ssh dev
    cd sky_workdir

We now develop the inference script to generate predictions for the first turn of each conversation in the LMSys-1M dataset.

The following example script aggregates multiple inputs into a single batch for better GPU utilization and processes an entire data chunk batch by batch:

.. code-block:: python

    import json
    import os

    from tqdm import tqdm
    from vllm import LLM, SamplingParams

    BATCH_CHAR_COUNT = 2000
    DATA_PATH = '/data/part_0.jsonl'
    OUTPUT_PATH = '/output'
    # Sampling parameters for generation (values here are illustrative).
    SAMPLING_PARAMS = SamplingParams(temperature=0.0, max_tokens=512)

    llm = LLM(model='meta-llama/Meta-Llama-3.1-8B-Instruct', tensor_parallel_size=1)

    def batch_inference(llm: LLM, data_path: str):
        # Processing one chunk can take about 1-2 hours on an L4 GPU.
        print(f'Processing {data_path}...')
        data_name = data_path.split('/')[-1]

        # Read data (jsonl); each line is a json object.
        with open(data_path, 'r') as f:
            data = f.readlines()
        # Extract the first message of each conversation.
        messages = [
            json.loads(d.strip())['conversation'][0]['content'] for d in data
        ]

        def generate(batch):
            outputs = llm.generate(batch, SAMPLING_PARAMS, use_tqdm=False)
            return [' '.join(o.text for o in output.outputs) for output in outputs]

        # Run inference, grouping messages into batches by character count.
        batch_char_count = 0
        batch_messages = []
        generated_text = []
        for message in tqdm(messages):
            batch_messages.append(message)
            batch_char_count += len(message)
            if batch_char_count > BATCH_CHAR_COUNT:
                generated_text.extend(generate(batch_messages))
                batch_messages = []
                batch_char_count = 0
        # Flush the last partial batch.
        if batch_messages:
            generated_text.extend(generate(batch_messages))

        # Save predictions, one line per input message.
        os.makedirs(OUTPUT_PATH, exist_ok=True)
        with open(os.path.join(OUTPUT_PATH, data_name), 'w') as f:
            for text in generated_text:
                f.write(text + '\n')

    batch_inference(llm, DATA_PATH)

For the complete script, see `examples/batch_inference/inference.py <https://github.com/skypilot-org/skypilot/blob/main/examples/batch_inference/inference.py>`_. You can test it on the dev machine with ``HF_TOKEN=<your-huggingface-token> python inference.py``.

After testing it on the dev machine, we can compose a task yaml (`inference.yaml <https://github.com/skypilot-org/skypilot/blob/main/examples/batch_inference/inference.yaml>`_) to run the inference in the cloud.
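
A minimal ``inference.yaml`` along these lines could work (a sketch only: the actual file is linked above, and the setup commands and output bucket name here are assumptions):

.. code-block:: yaml

    envs:
      HF_TOKEN:  # Passed in via --env at launch time.

    resources:
      accelerators: L4

    workdir: .

    file_mounts:
      /data:
        name: skypilot-lmsys-chat-1m
        store: r2
        mode: MOUNT
      /output:
        name: my-batch-inference-output  # Hypothetical output bucket.
        store: r2
        mode: MOUNT

    setup: |
      pip install vllm tqdm

    run: |
      python inference.py

We can then launch it with: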

.. code-block:: bash

    # Set HuggingFace token for accessing Llama model weights.
    export HF_TOKEN=...
    sky launch -c inf ./inference.yaml \
        --env HF_TOKEN

.. TODO: make r2 bucket publically accessible
.. tested with inference.py and inference.yaml on 2024-09-15 and works well.

.. _scale-out-to-multiple-nodes:

Scale Out to Multiple Nodes
---------------------------

To scale out the inference to multiple machines, we assign the data chunks to multiple groups so that each machine processes one group.

The following script (`group_data.py <https://github.com/skypilot-org/skypilot/blob/main/examples/batch_inference/group_data.py>`_) reads the metadata file and splits the data chunk paths into multiple groups.

.. code-block:: python

    import math
    from typing import List

    NUM_GROUPS = 16

    def group_data(data_paths: List[str], num_groups: int):
        # Split the data chunk paths into multiple groups.
        data_groups = []
        group_size = math.ceil(len(data_paths) / num_groups)
        for i in range(num_groups):
            data_groups.append(data_paths[i * group_size:(i + 1) * group_size])
        return data_groups

    # data_paths: chunk paths read from metadata.txt (see the full script).
    data_groups = group_data(data_paths, NUM_GROUPS)

    # Save each group's chunk paths to a separate file.
    for i, data_group in enumerate(data_groups):
        with open(f'./groups/{i}.txt', 'w') as f:
            f.write('\n'.join(data_group))


.. code-block::

    # ./groups/0.txt
    part_0.jsonl
    part_1.jsonl
    ...
    part_12.jsonl

On the dev machine, we use the ``group_data.py`` script to split the data chunks into as many groups as the number of machines we want to scale out to.

.. code-block:: bash

    python group_data.py \
        --data-metadata ./metadata.txt \
        --num-groups 16

After that, we can launch a job for each group to process the groups in parallel.

.. code-block:: bash

    # Launch a job for each group
    NUM_GROUPS=16
    for i in $(seq 0 $((NUM_GROUPS - 1))); do
        # We use & to launch jobs in parallel
        sky jobs launch -y -d -n group-$i worker.yaml \
            --env DATA_GROUP_METADATA=./groups/$i.txt &
    done
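
``worker.yaml`` follows the same shape as the ``inference.yaml`` sketch above (same resources, mounts, and setup); the main differences are an environment variable carrying the group's chunk list and a run command that loops over those chunks. A sketch of the differing parts (``worker.py`` is a hypothetical entry point that wraps the per-chunk logic from ``inference.py``):

.. code-block:: yaml

    envs:
      DATA_GROUP_METADATA:  # Path (relative to the workdir) to this group's chunk list.
      MODEL_NAME: meta-llama/Meta-Llama-3.1-8B-Instruct  # Can be overridden via --env.

    run: |
      # Process every chunk listed in the group metadata file.
      python worker.py \
        --data-group-metadata $DATA_GROUP_METADATA \
        --model-name $MODEL_NAME \
        --output-path /output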

.. Tested worker on 2024-09-15 with a group containing multiple data parts.

Cut Costs by ~5x with Spot Instances and Specialized AI Clouds
--------------------------------------------------------------

Batch inference can get expensive when it involves large models and high-end
GPUs. By leveraging spot instances and specialized AI clouds, you can achieve
around a 5x cost reduction, at the expense of weaker robustness guarantees.

To handle the robustness issue, we wrap our batch inference code so that it
resumes after a spot preemption or a node/GPU failure.

The following code checks which chunks have already completed and continues
with the unfinished chunks whenever the job is restarted.

.. code-block:: python

    from typing import List

    def continue_batch_inference(llm: LLM, data_paths: List[str], output_path: str):
        # Automatically skip chunks that are already processed; resume the rest.
        for data_path in data_paths:
            data_name = data_path.split('/')[-1]
            succeed_indicator = os.path.join(output_path, data_name + '.succeed')
            if os.path.exists(succeed_indicator):
                print(f'Skipping {data_path} because it has been processed.')
                continue

            # batch_inference (defined above) processes the chunk and saves
            # its predictions under the output path.
            batch_inference(llm, data_path)

            # Create an empty indicator file to mark the chunk as done.
            open(succeed_indicator, 'w').close()

To let SkyPilot search across all available spot instances and specialized AI
clouds based on cost, we add the following fields to ``worker.yaml``. This lets
SkyPilot find the cheapest resources among different accelerator types (L4,
L40, etc.) and pricing models (on-demand and spot) across all enabled cloud
providers.

.. code-block:: yaml

    resources:
      accelerators: {L4, L40, A10, A10g, A100, A100-80GB}
      any_of:
        - use_spot: true
        - use_spot: false

We then start the batch inference workers with the same script:

.. code-block:: bash

    # Use spot instances to reduce costs
    NUM_GROUPS=16
    for i in $(seq 0 $((NUM_GROUPS - 1))); do
        sky jobs launch -y -n group-$i worker.yaml \
            --env DATA_GROUP_METADATA=./groups/$i.txt &
    done

.. Tested worker on 2024-09-15 with continue_batch_inference.


Advanced Tips
-------------

1. Data placement: To avoid expensive data egress costs, you can place your input data on Cloudflare R2,
   which does not charge for data egress, so reading the data from any region or cloud incurs no extra cost.

.. TODO: how to deal with output data?

2. Reduce restart overhead: Keeping the average overhead per recovery (provisioning, setup, and potential progress loss on failure)
   within half an hour is ideal for efficient use of spot instances, according to our `paper <>`_.

3. Chunk size: The time to process a data chunk is roughly proportional to its size (number of samples), which determines the potential progress loss on failure mentioned in *Tip 2*. Before splitting the dataset into chunks, benchmark the time to
   process a single chunk and size the chunks accordingly; see the short example below.
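
As a quick back-of-the-envelope illustration of *Tips 2 and 3* (the timings below are assumptions for illustration, not benchmarks):

.. code-block:: python

    # Rough chunk sizing based on Tips 2 and 3 (all numbers are assumptions).
    conversations_per_chunk = 5000   # Current chunk size.
    minutes_per_chunk = 90           # e.g., measured on one L4 in a quick benchmark.
    target_max_loss_minutes = 30     # Tip 2: keep overhead within ~half an hour.

    # On preemption, at most one in-flight chunk of progress is lost, so pick a
    # chunk size whose processing time stays within the target.
    suggested_chunk_size = int(
        conversations_per_chunk * target_max_loss_minutes / minutes_per_chunk)
    print(suggested_chunk_size)  # ~1666 conversations per chunk.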


Next steps
----------

1. Learn more about :ref:`SkyPilot Managed Jobs <managed-jobs>`.
2. Join `SkyPilot community Slack <https://slack.skypilot.co>`__ for questions and requests.

6 changes: 6 additions & 0 deletions docs/source/examples/managed-jobs.rst
@@ -483,3 +483,9 @@ The :code:`resources` field has the same spec as a normal SkyPilot job; see `her
stopped or live). For them to take effect, tear down the existing controller
first, which requires all in-progress jobs to finish or be canceled.


Next Steps
----------

* :ref:`Many Parallel Jobs <many-jobs>`
* :ref:`Offline Batch Inference <offline-batch-inference>`
2 changes: 1 addition & 1 deletion docs/source/getting-started/quickstart.rst
@@ -220,7 +220,7 @@ Congratulations! In this quickstart, you have launched a cluster, run a task, a
Next steps:

- Adapt :ref:`Tutorial: AI Training <ai-training>` to start running your own project on SkyPilot!
- See the :ref:`Task YAML reference <yaml-spec>`, :ref:`CLI reference <cli>`, and `more examples <https://github.com/skypilot-org/skypilot/tree/master/examples>`_
- See the :ref:`Task YAML reference <yaml-spec>`, :ref:`CLI reference <cli>`, and more examples: `LLM recipes <https://github.com/skypilot-org/skypilot/tree/master/llm>`_, `AI Gallery <https://skypilot.readthedocs.io/en/latest/gallery/index.html>`_, and `examples <https://github.com/skypilot-org/skypilot/tree/master/examples>`_.
- To learn more, try out `SkyPilot Tutorials <https://github.com/skypilot-org/skypilot-tutorial>`_ in Jupyter notebooks

We invite you to explore SkyPilot's unique features in the rest of the documentation.
6 changes: 6 additions & 0 deletions examples/batch_inference/dev.yaml
@@ -0,0 +1,6 @@
file_mounts:
  /data:
    name: skypilot-lmsys-chat-1m
    store: r2
    mode: MOUNT

29 changes: 29 additions & 0 deletions examples/batch_inference/group_data.py
@@ -0,0 +1,29 @@
import argparse
import math
import os
from typing import List


def group_data(data_paths: List[str], num_groups: int):
    # Split the data chunk paths into multiple groups.
    data_groups = []
    group_size = math.ceil(len(data_paths) / num_groups)
    for i in range(num_groups):
        data_groups.append(data_paths[i * group_size:(i + 1) * group_size])
    return data_groups


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--data-metadata', type=str, required=True,
        help='The file that contains all paths to the data chunks to be grouped.')
    parser.add_argument(
        '--num-groups', type=int, required=True,
        help='The number of groups to create.')
    args = parser.parse_args()

    with open(args.data_metadata, 'r') as f:
        data_paths = [path.strip() for path in f.readlines()]

    data_groups = group_data(data_paths, args.num_groups)

    # Save data groups to different files.
    os.makedirs('./groups', exist_ok=True)
    for i, data_group in enumerate(data_groups):
        with open(f'./groups/{i}.txt', 'w') as f:
            f.write('\n'.join(data_group))