v0.4.1 #149 (Merged, 20 commits, Apr 26, 2024)
1 change: 1 addition & 0 deletions .gitignore
@@ -3,6 +3,7 @@

# Byte-compiled / optimized / DLL files
/tmp
/tmp.*
__pycache__/
*.py[cod]
*$py.class
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,13 @@
# Change Log

## v0.4.1 - 20240425
* Better parallelization: parallel and mono data are scheduled at once (previously one after the other)
* `mtdata cache` command added; improves concurrency by supporting multiple recipes
* Added WMT general test 2022 and 2023
* mtdata-bcp47: `-p/--pipe` to map codes from stdin -> stdout
* mtdata-bcp47: `--script {suppress-default,suppress-all,express}`
* Uses `pigz` to read and write gzip files by default when pigz is in PATH; export `USE_PIGZ=0` to disable (see the sketch below)
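
A minimal usage sketch of the new toggles (illustrative only; `USE_PIGZ` and the `mtdata-bcp47` flags are the ones listed above, while the `mtdata get` flag spellings, dataset ID, and paths are assumptions or placeholders):

```bash
# map language codes from stdin to stdout, expressing the script subtags
echo -e "en\nkan\nhin_deva" | mtdata-bcp47 -p -s express

# disable pigz-backed gzip I/O for a run (pigz is picked up automatically when it is in PATH)
export USE_PIGZ=0
mtdata get -l eng-kan --train <dataset-id> --out <out-dir>   # <dataset-id>/<out-dir> are placeholders
```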

## v0.4.0 - 20230326

* Fix: allenai_nllb.json is now included in MANIFEST.in [#137](https://github.com/thammegowda/mtdata/issues/137). Also fixed CI: Travis -> github actions
16 changes: 16 additions & 0 deletions README.md
@@ -364,6 +364,22 @@ mtdata-bcp47 eng English en-US en-GB eng-Latn kan Kannada-Deva hin-Deva kan-Latn
| kan-in | kan_IN | kan | None | IN |
| kn-knda-in | kan_IN | kan | None | IN |

__Pipe Mode__
```bash
# --pipe/-p : maps stdin -> stdout
# -s express : expresses scripts (unlike BCP47 defaults, which suppress the default script)
$ echo -e "en\neng\nfr\nfra\nara\nkan\ntel\neng_Latn\nhin_deva"| mtdata-bcp47 -p -s express
eng_Latn
eng_Latn
fra_Latn
fra_Latn
ara_Arab
kan_Knda
tel_Telu
eng_Latn
hin_Deva
```

**Python API for BCP47 Mapping**
```python
from mtdata.iso.bcp47 import bcp47
2 changes: 1 addition & 1 deletion mtdata/__init__.py
@@ -4,7 +4,7 @@
# Created: 4/4/20


__version__ = '0.4.0'
__version__ = '0.4.1-dev'
__description__ = 'mtdata is a tool to download datasets for machine translation'
__author__ = 'Thamme Gowda'

9 changes: 4 additions & 5 deletions mtdata/cache.py
@@ -175,13 +175,13 @@ def get_local_in_paths(self, path: Path, entry: Entry,):
raise MTDataException(f'Unable to read {entry.did}; the file is neither zip nor tar')

def download(self, url: str, save_at: Path, timeout=(5, 10), entry=None):

valid_flag = self.get_flag_file(save_at)
lock_file = valid_flag.with_suffix("._lock")
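# The flag file marks a completed, validated download; the portalocker lock file
# serializes parallel downloaders of the same URL across processes.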
if valid_flag.exists() and save_at.exists():
return save_at
save_at.parent.mkdir(parents=True, exist_ok=True)
log.info(f"Download: {url} → {save_at}")
log.info(f"Downloading: {url} → {save_at}")
log.debug(f"Acquiring lock on {lock_file}")
with portalocker.Lock(lock_file, 'w', timeout=Defaults.FILE_LOCK_TIMEOUT) as fh:
# check if downloaded by other parallel process
@@ -192,13 +192,12 @@ def download(self, url: str, save_at: Path, timeout=(5, 10), entry=None):
assert resp.status_code == 200, resp.status_code
buf_size = 2 ** 14
tot_bytes = int(resp.headers.get('Content-Length', '0'))
n_buffers = math.ceil(tot_bytes / buf_size) or None
parts = url.split('/')
parts = url.split('/')
desc = [entry and f'{entry.did} |' or '',
tot_bytes and (format_byte_size(tot_bytes) + "|") or "",
parts[2][:24], '...', parts[-1][-24:], # host ... filename
]
desc = ''.join(desc)
desc = ''.join(desc)
with pbar_man.counter(color='green', total=tot_bytes//2**10, unit='KiB', leave=False, position=2,
min_delta=Defaults.PBAR_REFRESH_INTERVAL, desc=f"{desc}"
) as pbar, open(save_at, 'wb', buffering=2**24) as out:
70 changes: 58 additions & 12 deletions mtdata/data.py
@@ -3,19 +3,22 @@
# Author: Thamme Gowda [tg (at) isi (dot) edu]
# Created: 4/5/20
import collections as coll
import concurrent.futures
import json
from itertools import zip_longest
from multiprocessing import Pool
from pathlib import Path
from typing import Optional, List, Tuple, Dict, Union
import random
from typing import Dict, List, Tuple, Union

import portalocker

from mtdata import log, pbar_man, cache_dir as CACHE_DIR, Defaults
from mtdata import Defaults
from mtdata import cache_dir as CACHE_DIR
from mtdata import log, pbar_man
from mtdata.cache import Cache
from mtdata.entry import Entry, DatasetId, LangPair
from mtdata.entry import DatasetId, Entry, LangPair
from mtdata.index import INDEX
from mtdata.iso.bcp47 import bcp47, BCP47Tag
from mtdata.iso.bcp47 import BCP47Tag, bcp47
from mtdata.parser import Parser
from mtdata.utils import IO

@@ -64,6 +67,39 @@ def resolve_entries(cls, dids: List[DatasetId]):
raise Exception(f'Could not find {did}; try "mtdata list | grep -i {did}" to locate it')
return entries

@classmethod
def parallel_download(cls, entries: List[Entry], cache: Cache, n_jobs=1):
"""Download entries in parallel. This is useful when there are many entries to download.
:param entries: list of entries to download
:param cache: cache object to download the entries
:param n_jobs: number of parallel jobs to run
:return: dictionary of entry -> paths. Failed entries will have None path.
"""
if n_jobs == 1:
return [cache.get_entry(ent) for ent in entries]
log.info(f"Downloading {len(entries)} datasets in parallel with {n_jobs} jobs")
result = {}
entries = list(entries) # make a copy
# shuffle to hit different servers at the same time
random.seed(42)
random.shuffle(entries)
status = dict(total=len(entries), success=0, failed=0)
with concurrent.futures.ProcessPoolExecutor(max_workers=n_jobs) as executor:
futures_to_entry = {executor.submit(cache.get_entry, entry): entry for entry in entries}
for future in concurrent.futures.as_completed(futures_to_entry.keys()):
entry:Entry = futures_to_entry[future]
try:
paths = future.result() # paths, ignore
result[entry] = paths
status['success'] += 1
log.info(f"[{status['success']}/{status['total']}] Downloaded {entry.did}")
except Exception as exc:
result[entry] = None
status['failed'] += 1
log.warning(f"Failed to download {entry.did}: {exc} Total failed: {status['failed']}")
log.info(f"Downloaded {status['success']} datasets. Failed to download {status['failed']}")
return result
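# Usage sketch (illustrative, not part of this change): with resolved entries and a
# cache, downloads can be fanned out across processes before preparation, e.g.
#   entries = Dataset.resolve_entries(dids)
#   results = Dataset.parallel_download(entries, Cache(cache_dir), n_jobs=4)
#   failed = [ent.did for ent, path in results.items() if path is None]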

@classmethod
def prepare(cls, langs, out_dir: Path, dataset_ids=Dict[str, List[DatasetId]],
cache_dir: Path = CACHE_DIR, merge_train=False, drop_noise: Tuple[bool, bool] = (True, False),
@@ -81,6 +117,8 @@ def prepare(cls, langs, out_dir: Path, dataset_ids=Dict[str, List[DatasetId]],
for ent in all_entries:
if not ent.is_compatible(langs):
raise Exception(f'Given languages: {langs} and dataset: {ent.did} are not compatible')
if n_jobs > 1:
cls.parallel_download(all_entries, Cache(cache_dir), n_jobs=n_jobs)

dataset = cls(dir=out_dir, langs=langs, cache_dir=cache_dir, drop_train_noise=drop_train_noise,
drop_test_noise=drop_test_noise, drop_dupes=drop_dupes, drop_tests=drop_tests,
@@ -107,8 +145,8 @@ def prepare(cls, langs, out_dir: Path, dataset_ids=Dict[str, List[DatasetId]],
drop_hashes = test_pair_hash | test_seg_hash # set union
dataset.add_train_entries(train_entries, merge_train=merge_train, compress=compress,
drop_hashes=drop_hashes)
for key, dirpath in [('mono_train', dataset.mono_train_parts_dir),
('mono_dev', dataset.mono_tests_dir),
for key, dirpath in [('mono_train', dataset.mono_train_parts_dir),
('mono_dev', dataset.mono_tests_dir),
('mono_test', dataset.mono_tests_dir)]:
if dataset_ids.get(key):
dirpath.mkdir(exist_ok=True)
@@ -206,7 +244,7 @@ def add_train_entries(self, entries, merge_train=False, compress=False, drop_has
log.info('Train stats:\n' + stats_msg)
IO.write_lines(self.dir / 'train.stats.json', stats_msg)
return counts

def add_mono_entry(self, dirpath, entry: Entry, compress=False):
flag_file = dirpath / f'.valid.{entry.did}'
assert len(entry.did.langs) == 1, f'Monolingual entry expected, given {entry.did}'
@@ -362,11 +400,19 @@ def add_parts(self, dir_path, entries, drop_noise=False, compress=False, desc=No

tasks = [dict(dir_path=dir_path, entry=ent, drop_noise=drop_noise, compress=compress,
fail_on_error=fail_on_error) for ent in entries]
pool = Pool(self.n_jobs)
with pbar_man.counter(color='blue', leave=False, total=len(entries), unit='it', desc=desc,
with concurrent.futures.ProcessPoolExecutor(max_workers=self.n_jobs) as executor:
futures = [executor.submit(self.add_part_thread, task) for task in tasks]
with pbar_man.counter(color='blue', leave=False, total=len(entries), unit='it', desc=desc,
autorefresh=True, min_delta=Defaults.PBAR_REFRESH_INTERVAL, position=3) as pbar:
for _ in pool.imap_unordered(self.add_part_thread, tasks):
pbar.update(force=True)
for future in concurrent.futures.as_completed(futures):
try:
future.result()
except Exception as e:
log.error(f"Error in thread: {e}")
if fail_on_error:
raise e
finally:
pbar.update(force=True)

def add_parts_sequential(self, dir_path, entries, drop_noise=False, compress=False, desc=None, fail_on_error=False):
with pbar_man.counter(color='blue', leave=False, total=len(entries), unit='it', desc=desc,
4 changes: 3 additions & 1 deletion mtdata/index/opus/opus_index.py
@@ -14,7 +14,9 @@
data_file = resource_dir / 'opus_index.tsv'
""" To refresh the data_file from OPUS:
$ curl "https://opus.nlpl.eu/opusapi/?preprocessing=moses" > opus_all.json
$ cat opus_all.json | jq -r '.corpora[] | [.corpus, .version, .source, .target] | @tsv' | sort > opus_all.tsv
$ cat opus_all.json | jq -r '.corpora[] | [.corpus, .version, .source, .target] | @tsv' | LC_ALL=C sort > opus_index.tsv

# NOTE: locale is important! See https://twitter.com/thammegowda/status/1783292996773134695
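  Illustration of the pitfall (assumed GNU coreutils `sort` behavior, shown for context):
    printf 'B\na\n' | sort            # en_US.UTF-8 collation: a, B
    printf 'B\na\n' | LC_ALL=C sort   # C locale byte order:   B, a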
"""


80 changes: 46 additions & 34 deletions mtdata/index/statmt.py
@@ -92,23 +92,15 @@ def load_parallel(index: Index):
did=DatasetId(group=GROUP_ID, name='news_commentary', version='14', langs=(l1, l2)),
url=NEWSCOM_v14 % (l1, l2), cite=cite))

for v in [15, 16]:
cite = ('barrault-etal-2020-findings',)
url = f"http://data.statmt.org/news-commentary/v{v}/training/news-commentary-v{v}.%s-%s.tsv.gz"
for pair in ['ar cs', 'ar de', 'ar en', 'ar es', 'ar fr', 'ar hi', 'ar id', 'ar it', 'ar ja', 'ar kk', 'ar nl',
'ar pt', 'ar ru', 'ar zh', 'cs de', 'cs en', 'cs es', 'cs fr', 'cs hi', 'cs id', 'cs it', 'cs ja',
'cs kk', 'cs nl', 'cs pt', 'cs ru', 'cs zh', 'de en', 'de es', 'de fr', 'de hi', 'de id', 'de it',
'de ja', 'de kk', 'de nl', 'de pt', 'de ru', 'de zh', 'en es', 'en fr', 'en hi', 'en id', 'en it',
'en ja', 'en kk', 'en nl', 'en pt', 'en ru', 'en zh', 'es fr', 'es hi', 'es id', 'es it', 'es ja',
'es kk', 'es nl', 'es pt', 'es ru', 'es zh', 'fr hi', 'fr id', 'fr it', 'fr ja', 'fr kk', 'fr nl',
'fr pt', 'fr ru', 'fr zh', 'hi id', 'hi it', 'hi nl', 'hi pt', 'hi ru', 'hi zh', 'id it', 'id ja',
'id kk', 'id nl', 'id pt', 'id ru', 'id zh', 'it kk', 'it nl', 'it pt', 'it ru', 'it zh', 'ja pt',
'ja ru', 'ja zh', 'kk nl', 'kk pt', 'kk ru', 'kk zh', 'nl pt', 'nl ru', 'nl zh', 'pt ru', 'pt zh',
'ru zh']:
l1, l2 = pair.split()
index.add_entry(Entry(did=DatasetId(group=GROUP_ID, name='news_commentary', version=f'{v}', langs=(l1, l2)),
url=url % (l1, l2), cite=cite))

for v in '15 16 17 18 18.1'.split():
cite = ('kocmi-etal-2023-findings',)
major_v = v.split('.')[0] # 18.1 -> 18
url = f"http://data.statmt.org/news-commentary/v{v}/training/news-commentary-v{major_v}.%s-%s.tsv.gz"
langs = "ar cs de en es fr hi id it ja kk nl pt ru zh".split()
for l1, l2 in itertools.combinations(langs, 2):
ent = Entry(did=DatasetId(group=GROUP_ID, name='news_commentary', version=v, langs=(l1, l2)),
url=url % (l1, l2), cite=cite)
index.add_entry(ent)

# ===== Wiki Titles V1
WIKI_TITLES_v1 = 'http://data.statmt.org/wikititles/v1/wikititles-v1.%s-%s.tsv.gz'
@@ -173,6 +165,27 @@ def load_parallel(index: Index):
url='http://data.statmt.org/wmt20/translation-task/dev.tgz',
cite=cite))

# test releases from github
for year, version, pairs in [
('2022', 'v1.2', 'cs-en:B:C cs-uk:A de-en:A:B de-fr:A en-cs:B:C en-de:A:B en-hr:A:stud en-ja:A en-liv:A en-ru:A en-uk:A en-zh:A:B fr-de:A ja-en:A liv-en:A ru-en:A ru-sah:A sah-ru:A uk-cs:A uk-en:A zh-en:A:B'.split()),
('2023', 'v.0.1', 'cs-uk:refA de-en:refA en-cs:refA en-de:refA en-he:refA:refB en-ja:refA en-ru:refA en-uk:refA en-zh:refA he-en:refA:refB ja-en:refA ru-en:refA uk-en:refA zh-en:refA'.split())
]:
url = f"https://github.com/wmt-conference/wmt{year[-2:]}-news-systems/archive/refs/tags/{version}.zip"
for pair in pairs:
pair, *refs = pair.split(':')
src, tgt = pair.split('-')
for ref in refs:
src_file = f"wmt{year[-2:]}-news-systems-*/txt/sources/generaltest{year}.{pair}.src.{src}"
tgt_file = f"wmt{year[-2:]}-news-systems-*/txt/references/generaltest{year}.{pair}.ref.{ref}.{tgt}"
if 'ref' in ref:
version = f'{year}_{ref}'
else:
version = f'{year}_ref{ref}'
did = DatasetId(group=GROUP_ID, name='generaltest', version=version, langs=(src, tgt))
ent = Entry(did=did, filename=f'wmt{year}-news-systems.zip', in_paths=[src_file, tgt_file], in_ext='txt', url=url)
index.add_entry(ent)


# Multi parallel
wmt_sets = {
'2009': ['en', 'cs', 'de', 'es', 'fr'],
@@ -367,7 +380,7 @@ def load_parallel(index: Index):


def load_mono(index: Index):

wmt22_cite = ('kocmi-etal-2022-findings',)
# 1. News Crawl
"""
@@ -376,7 +389,7 @@
for i in $langs; do
curl $base/$i | grep -o 'href="news[^"]*.gz"' | cut -f2 -d\"; sleep 1;
done | tee news.txt
cat news.txt | grep '^news.[0-9]\+.[a-z]\+.shuffled.deduped.gz$' | awk -F '.' '{if ($2 != last) {printf "\n"$2}; printf " "$3; last=$2}'
cat news.txt | grep '^news.[0-9]\+.[a-z]\+.shuffled.deduped.gz$' | sort | awk -F '.' '{if ($2 != last) {printf "\n"$2}; printf " "$3; last=$2}'
"""
news_crawl = """2007 cs de en es fr hu
2008 bn cs de en es fa fr hi hu it ky mk ps pt ru so sr sw ta uk zh
@@ -391,14 +404,14 @@ def load_mono(index: Index):
2017 bg cs de el en es et fi fr hi hr hu it lv pt ro ru tr zh
2018 am bg bn bs cs de el en es et fa fi fr gu hi hr hu it kk kn ko ky lt lv mr nl pa pl ps pt ro ru so sr sw ta te tr uk zh
2019 am bg bn bs cs de el en es et fa fi fr gu hi hr hu it ja kk kn ko ky lt lv mk ml mr nl or pa pl ps pt ro ru so sr sw ta te tr uk zh
2020 af am ar bg bm bn bs cs de el en es et fa fi fr gu ha hi hr hu id ig is it ja kk kn ko ky lt lv mk ml mr nl nr om or pa pl ps pt ro ru rw sn so sr sw ta te tig ti tl tr uk yo zh
2021 af am ar bg bm bn bs cs de el en es et fa fi fr gu ha hi hr hu id ig is it ja kk kn ko ky lt lv mk ml mr nl nr om or pa pl ps pt ro ru rw sn so sr sw ta te tig ti tl tr uk yo zh
2020,2021,2022,2023 af am ar bg bm bn bs cs de el en es et fa fi fr gu ha hi hr hu id ig is it ja kk kn ko ky lt lv mk ml mr nl nr om or pa pl ps pt ro ru rw sn so sr sw ta te tig ti tl tr uk yo zh
"""
news_crawl = [line.strip().split() for line in news_crawl.splitlines() if line.strip()]
for year, *langs in news_crawl:
for lang in langs:
url = f'https://data.statmt.org/news-crawl/{lang}/news.{year}.{lang}.shuffled.deduped.gz'
index += Entry(DatasetId(GROUP_ID, 'news_crawl', str(year), (lang,)), url=url, in_ext='txt', cite=wmt22_cite)
for years, *langs in news_crawl:
for year in years.split(','):
for lang in langs:
url = f'https://data.statmt.org/news-crawl/{lang}/news.{year}.{lang}.shuffled.deduped.gz'
index += Entry(DatasetId(GROUP_ID, 'news_crawl', str(year), (lang,)), url=url, in_ext='txt', cite=wmt22_cite)
# 2. News Discussions
for lang, years in [('en', range(2011, 2019+1)), ('fr', range(2006, 2019+1))]:
for year in years:
@@ -411,12 +424,13 @@ def load_mono(index: Index):
version = '10'
url = f'https://www.statmt.org/europarl/v{version}/training-monolingual/europarl-v{version}.{lang}.tsv.gz'
index += Entry(DatasetId(GROUP_ID, 'europarl', version, (lang,)), url=url, in_ext='tsv', cols=(0,), cite=wmt22_cite)

# 4. News Commentary
for version in '14 15 16 17'.split():
for version in '14 15 16 17 18 18.1'.split():
langs = "ar cs de en es fr hi id it ja kk nl pt ru zh".split()
for lang in langs:
url = f'https://data.statmt.org/news-commentary/v{version}/training-monolingual/news-commentary-v{version}.{lang}.gz'
major_version = version.split('.')[0]
url = f'https://data.statmt.org/news-commentary/v{version}/training-monolingual/news-commentary-v{major_version}.{lang}.gz'
index += Entry(DatasetId(GROUP_ID, 'news_commentary', version, (lang,)), url=url, in_ext='txt', cite=wmt22_cite)

# 5. Common Crawl
@@ -432,8 +446,8 @@ def load_mono(index: Index):
index.add_entry(Entry(DatasetId(GROUP_ID, 'commoncrawl', 'wmt22', (lang,)),
url=f'{prefix}/{path}', in_ext='txt', cite=wmt22_cite))
# 6. Extended Common Crawl
# these files are too large and are split into parts.
# these files are too large and are split into parts.

# 7. Uber Text Corpus
for filename in ["news.tokenized.shuffled.txt.bz2",
"wiki_dump.tokenized.txt.bz2",
@@ -444,8 +458,6 @@ def load_mono(index: Index):
index.add_entry(Entry(DatasetId('LangUk', name, '1', ('uk',)),
url=f'https://lang.org.ua/static/downloads/corpora/{filename}',
in_ext='txt', cite=wmt22_cite))

# 8. Leipzig Corpora: lot of files, so we moved to a separate module
# 9. Legal Ukrainian


# 9. Legal Ukrainian