Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gcp2mdx transfer #16

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions gcp2mdx-transfer/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# rclone config
rclone.conf

# log
log*
logs/
*.log

# stamp
stamps/
90 changes: 90 additions & 0 deletions gcp2mdx-transfer/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# A makefile to transfer data from GCS to mdx object storage (S3 compatible)

PARALLEL := 8
BWLIMIT := 100M
STATS_INTERVAL := 1m
RCLONE_GLOBAL_FLAGS := -v --config=rclone.conf --stats=${STATS_INTERVAL}
RCLONE_COPY_FLAGS := --checksum --bwlimit=${BWLIMIT} --transfers=${PARALLEL} --checkers=${PARALLEL}

SRC_BUCKET_NON_LUSTRE := gcs:llama-2-172b-checkpoints
SRC_BUCKET := gcs:llama-2-172b-lustre-checkpoints
DEST_BUCKET := mdx-s3:llama-2-172b-checkpoints

begin_non_lustre := 500
end_non_lustre := 12500
begin_lustre := 13000
end_10percent := 30000
begin_50percent := 31000
end_50percent := 150000
begin_100percent := 152000
end_100percent := 300000

_checkpoints_non_lustre = $(shell seq -f 'stamps/iter_%07g' $(begin_non_lustre) 500 $(end_non_lustre))
_checkpoints_10percent = $(shell seq -f 'stamps/iter_%07g' $(begin_lustre) 500 $(end_10percent))
_checkpoints_50percent = $(shell seq -f 'stamps/iter_%07g' $(begin_50percent) 1000 $(end_50percent))
_checkpoints_100percent = $(shell seq -f 'stamps/iter_%07g' $(begin_100percent) 2000 $(end_100percent))

# Default is dry run mode
dry_run := 1

# Dry run mode
TOUCH_FLAGS =
ifeq ($(dry_run),1)
RCLONE_GLOBAL_FLAGS += --dry-run
TOUCH_FLAGS += --no-create
endif

# Watch flags
WATCH_FLAGS :=
ifdef SLACK_WEBHOOK
WATCH_FLAGS += --slack_webhook ${SLACK_WEBHOOK}
endif

# Targets
.DEFAULT_GOAL = all
.PHONY: checkpoints_non_lustre checkpoints_10percent checkpoints_50percent

checkpoints_non_lustre : $(_checkpoints_non_lustre)
checkpoints_10percent : $(_checkpoints_10percent)
checkpoints_50percent : $(_checkpoints_50percent)
checkpoints_100percent : $(_checkpoints_100percent)

all : $(checkpoints_non_lustre) $(checkpoints_10percent) $(checkpoints_50percent) $(checkpoints_100percent)

$(_checkpoints_non_lustre) : stamps/% : stamps/dir logs/dir
./watch.py ${WATCH_FLAGS} \
rclone ${RCLONE_GLOBAL_FLAGS} copy \
${RCLONE_COPY_FLAGS} \
--log-file=logs/$*.log \
${SRC_BUCKET_NON_LUSTRE}/$*/ ${DEST_BUCKET}/$* && \
touch $(TOUCH_FLAGS) $@

$(_checkpoints_10percent) : stamps/% : stamps/dir logs/dir
./watch.py ${WATCH_FLAGS} \
rclone ${RCLONE_GLOBAL_FLAGS} copy \
${RCLONE_COPY_FLAGS} \
--log-file=logs/$*.log \
${SRC_BUCKET}/$*/ ${DEST_BUCKET}/$* && \
touch $(TOUCH_FLAGS) $@

$(_checkpoints_50percent) : stamps/% : stamps/dir logs/dir
./watch.py ${WATCH_FLAGS} \
rclone ${RCLONE_GLOBAL_FLAGS} copy \
${RCLONE_COPY_FLAGS} \
--log-file=logs/$*.log \
${SRC_BUCKET}/$*/ ${DEST_BUCKET}/$* && \
touch $(TOUCH_FLAGS) $@

$(_checkpoints_100percent) : stamps/% : stamps/dir logs/dir
./watch.py ${WATCH_FLAGS} \
rclone ${RCLONE_GLOBAL_FLAGS} copy \
${RCLONE_COPY_FLAGS} \
--log-file=logs/$*.log \
${SRC_BUCKET}/$*/ ${DEST_BUCKET}/$* && \
touch $(TOUCH_FLAGS) $@

logs/dir:
mkdir -p $@

stamps/dir:
mkdir -p $@
5 changes: 5 additions & 0 deletions gcp2mdx-transfer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# GCP to mdx data transfer script

## Links

- [rclone](https://rclone.org/)
11 changes: 11 additions & 0 deletions gcp2mdx-transfer/rclone.conf.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[gcs]
type = google cloud storage
location = {{ gcs_location }}
storage_class =

[mdx-s3]
type = s3
provider = Other
access_key_id = {{ s3_access_key }}
secret_access_key = {{ s3_secret_key }}
endpoint = {{ s3_endpoint }}
32 changes: 32 additions & 0 deletions gcp2mdx-transfer/validate_md5.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/bin/bash

set -e -o pipefail

check_() {
gcs_path="$1"
mdx_path="$2"
gcs_md5="$(rclone --config=rclone.conf md5sum "$gcs_path" | sort -k 2,2)"
mdx_md5="$(rclone --config=rclone.conf md5sum "$mdx_path" | sort -k 2,2)"
if [ "$gcs_md5" = "" ]; then
echo -e "\e[36m[INFO]\e[0m $gcs_path is empty, skipping..."
elif [ "$gcs_md5" != "$mdx_md5" ]; then
echo -e "\e[31m[ERROR]\e[0m MD5 mismatch for $gcs_path"
else
echo -e "\e[32m[OK]\e[0m MD5 match for $gcs_path"
fi
}

mdx_bucket="mdx-s3:llama-2-172b-checkpoints"
gcs_bucket="gcs:llama-2-172b-lustre-checkpoints"
first_iter="$(rclone --config=rclone.conf lsd $gcs_bucket | grep -o 'iter_[0-9]\+' | sed 's/^iter_0*//' | grep -E '^[0-9]+000$' | sort -h | head -n1)"
last_iter="$(rclone --config=rclone.conf lsd $mdx_bucket | grep -o 'iter_[0-9]\+' | sed 's/^iter_0*//' | sort -h | tail -n1)"
echo "First iteration: $first_iter"
echo "Last iteration: $last_iter"

# Checkpoints from 31000 to 150000 with step 1000
begin=31000
end="$last_iter"
step=1000
for iter in $(seq -f 'iter_%07g' $begin $step $end); do
check_ "$gcs_bucket/$iter" "$mdx_bucket/$iter"
done
48 changes: 48 additions & 0 deletions gcp2mdx-transfer/validate_md5_and_delete.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#!/bin/bash

set -e -o pipefail

check_delete() {
gcs_path="$1"
mdx_path="$2"
echo "============== $gcs_path =============="
echo "============== $mdx_path =============="
gcs_md5="$(rclone --config=rclone.conf md5sum "$gcs_path" | sort -k 2,2)"
mdx_md5="$(rclone --config=rclone.conf md5sum "$mdx_path" | sort -k 2,2)"
if [ "$gcs_md5" = "" ]; then
echo -e "\e[31m[ERROR]\e[0m gcs_md5 empty for $gcs_path"
fi
if [ "$mdx_md5" = "" ]; then
echo -e "\e[31m[ERROR]\e[0m mdx_md5 empty for $mdx_path"
fi
if [ "$gcs_md5" != "$mdx_md5" ]; then
echo -e "\e[31m[ERROR]\e[0m MD5 mismatch for $gcs_path"
else
echo -e "\e[32m[OK]\e[0m MD5 match for $gcs_path"
$echo rclone --config=rclone.conf delete --rmdirs "$gcs_path"
echo "Delete $gcs_path successfully"
fi
}

echo=""

# Non lustre checkpoints
begin=500
end=12500
step=500
gcs_bucket="gcs:llama-2-172b-checkpoints"
mdx_bucket="mdx-s3:llama-2-172b-checkpoints"
for iter in $(seq -f 'iter_%07g' $begin $step $end); do
check_delete "$gcs_bucket/$iter" "$mdx_bucket/$iter"
done

# Lustre checkpoints from 13000 to 30000 with step 500
begin=13000
end=30000
step=500
gcs_bucket="gcs:llama-2-172b-lustre-checkpoints"
mdx_bucket="mdx-s3:llama-2-172b-checkpoints"
for iter in $(seq -f 'iter_%07g' $begin $step $end); do
check_delete "$gcs_bucket/$iter" "$mdx_bucket/$iter"
done

46 changes: 46 additions & 0 deletions gcp2mdx-transfer/watch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#!/usr/bin/env python3

'''
Command line tool to run a command and send a message to a Slack webhook if the command fails.
'''

import subprocess
import urllib
import urllib.request
import urllib.parse
import argparse

def call_slack_webhook(webhook, message):
data = urllib.parse.urlencode({'payload': f'{{"text": "{message}"}}'}).encode()
req = urllib.request.Request(webhook, data=data)
urllib.request.urlopen(req)

def main():
parser = argparse.ArgumentParser(description='Watch subprocesses')
parser.add_argument('--slack_webhook', help='Slack webhook URL')
parser.add_argument('command', nargs=argparse.REMAINDER, help='Command to run')

args = parser.parse_args()
webhook = args.slack_webhook
command = args.command
if not command:
parser.error('No command provided')

command_str = ' '.join(command)
print(f'Running command: `{command_str}`')
proc = subprocess.run(command)
if proc.returncode != 0:
message = f'Command failed with exit code {proc.returncode}: `{command_str}`'
print(message)
if webhook:
call_slack_webhook(webhook, message)
else:
message = f'Command successfully ended: `{command_str}`'
print(message)
if webhook:
call_slack_webhook(webhook, message)

return proc.returncode

if __name__ == '__main__':
exit(main())