Skip to content

Commit

Permalink
support infinite loop over alpaca dataset
Browse files Browse the repository at this point in the history
ghstack-source-id: 38cbc277e2a177bc0baf35450a661835b97a7f22
Pull Request resolved: #92
  • Loading branch information
tianyu-l committed Feb 26, 2024
1 parent ae85e97 commit bd6fe55
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 16 deletions.
40 changes: 25 additions & 15 deletions torchtrain/datasets/alpaca.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class AlpacaDataset(IterableDataset):
seq_len (int): max sequence length
world_size (int): number of data parallel processes participating in training
rank (int): rank of the current data parallel process
infinite (bool): whether to loop infinitely over the dataset
Data input format:
{
Expand All @@ -43,38 +44,47 @@ def __init__(
seq_len: int = 2048,
world_size: int = 1,
rank: int = 0,
infinite: bool = False,
**kwargs
) -> None:
# TODO: This is a temporary solution for small datasets like Alpaca.
# For larger datasets we need to use a more scalable approach.
# Setting `streaming=True` works for large dataset, but the speed is slow.
ds = load_dataset("tatsu-lab/alpaca", split="train")
self.data_iterator = iter(split_dataset_by_node(ds, rank, world_size))
self._data = split_dataset_by_node(ds, rank, world_size)
self._tokenizer = tokenizer
self.seq_len = seq_len
self.infinite = infinite

def __iter__(self):
    """Yield ``(input, label)`` tensor pairs packed from the tokenized samples.

    Each sample's ``"text"`` field is encoded with ``bos=True, eos=True`` and
    appended to a running token buffer. Whenever the buffer holds at least
    ``seq_len + 1`` tokens, a window of that size is cut from its front and
    split into ``input`` (all but the last token) and ``label`` (all but the
    first token), so both are ``seq_len`` long and ``label`` is ``input``
    shifted by one position.

    When ``self.infinite`` is true, the data is re-read from the start after
    every full pass and leftover tokens carry over into the next pass.
    Otherwise iteration ends after a single pass and any leftover tokens
    that do not fill a whole window are dropped.

    Yields:
        Tuple of two ``torch.LongTensor`` objects ``(input, label)``.
    """
    max_buffer_token_len = 1 + self.seq_len
    all_tokens: List[int] = []

    while True:
        # Tracks whether this pass contributed any tokens at all; without
        # it, an empty shard combined with ``infinite=True`` would spin
        # forever without ever yielding.
        pass_added_tokens = False

        for sample in self._data:
            sample_tokens = self._tokenizer.encode(
                sample["text"], bos=True, eos=True
            )
            buffered_before = len(all_tokens)
            all_tokens.extend(sample_tokens)
            if len(all_tokens) > buffered_before:
                pass_added_tokens = True

            while len(all_tokens) >= max_buffer_token_len:
                x = torch.LongTensor(all_tokens[:max_buffer_token_len])
                # keep only the tokens that were not consumed by this window
                all_tokens = all_tokens[max_buffer_token_len:]
                inputs = x[:-1]
                labels = x[1:]
                yield inputs, labels

        if not self.infinite or not pass_added_tokens:
            break


def build_alpaca_data_loader(
    tokenizer: TokenizerIf,
    batch_size: int,
    seq_len: int,
    world_size: int,
    rank: int,
    infinite: bool = True,
):
    """Build a ``DataLoader`` over an ``AlpacaDataset``.

    Args:
        tokenizer (TokenizerIf): tokenizer handed to the dataset
        batch_size (int): batch size passed to the ``DataLoader``
        seq_len (int): max sequence length, forwarded to the dataset
        world_size (int): number of data parallel processes, forwarded to
            the dataset
        rank (int): rank of the current data parallel process, forwarded to
            the dataset
        infinite (bool): whether the dataset should loop infinitely; note
            that this defaults to ``True`` here

    Returns:
        A ``DataLoader`` wrapping the constructed ``AlpacaDataset``.
    """
    alpaca_ds = AlpacaDataset(tokenizer, seq_len, world_size, rank, infinite)

    return DataLoader(alpaca_ds, batch_size=batch_size)
4 changes: 3 additions & 1 deletion train.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ def main(job_config: JobConfig):
)
checkpoint.load()

data_iterator = iter(data_loader)

with maybe_run_profiler(job_config) as torch_profiler:
checkpoint.reset()
# variables used to keep info for metrics logging
Expand All @@ -180,7 +182,7 @@ def main(job_config: JobConfig):
train_state.step += 1
# get batch
data_load_start = timer()
batch = next(iter(data_loader))
batch = next(data_iterator)
input_ids, labels = batch
input_ids = input_ids.cuda()
labels = labels.cuda()
Expand Down

0 comments on commit bd6fe55

Please sign in to comment.