Remove ray to enable Python 3.12 compatibility #145

Merged: 1 commit, merged on May 28, 2024
README.md: 1 addition & 1 deletion

@@ -169,7 +169,7 @@ Peak GPU memory usage during the benchmark is `4.2GB` for nougat, and `4.1GB` fo

 **Throughput**

-Marker takes about 4.5GB of VRAM on average per task, so you can convert 10 documents in parallel on an A6000.
+Marker takes about 4GB of VRAM on average per task, so you can convert 12 documents in parallel on an A6000.

 ![Benchmark results](data/images/per_doc.png)
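The new README numbers line up with how the reworked convert.py (below) sizes its worker pool from `settings.INFERENCE_RAM // settings.VRAM_PER_TASK`. A quick sanity check with assumed values (an A6000 has 48GB of VRAM; 4GB per task is the README's stated average; the constants here are stand-ins, not marker's settings object):

```python
INFERENCE_RAM = 48  # GB; assumed total VRAM of an A6000, stand-in for settings.INFERENCE_RAM
VRAM_PER_TASK = 4   # GB; README's average per conversion task, stand-in for settings.VRAM_PER_TASK

# Same integer-division formula convert.py uses to cap the pool size
tasks_per_gpu = INFERENCE_RAM // VRAM_PER_TASK
print(tasks_per_gpu)  # 12, matching "convert 12 documents in parallel"
```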
convert.py: 35 additions & 39 deletions

@@ -1,8 +1,11 @@
-import argparse
 import os
 from typing import Dict, Optional

-import ray
+os.environ["IN_STREAMLIT"] = "true" # Avoid multiprocessing inside surya
+os.environ["PDFTEXT_CPU_WORKERS"] = "1" # Avoid multiprocessing inside pdftext
+
+import pypdfium2 # Needs to be at the top to avoid warnings
+import argparse
+import torch.multiprocessing as mp
 from tqdm import tqdm
 import math
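The two `os.environ` assignments sit above the library imports on purpose: a module that configures itself at import time only sees variables that are already set. A minimal self-contained illustration; the import-time read is simulated here, since the real readers are assumed to be surya and pdftext:

```python
import os

# Must be set before importing anything that reads these values at import
# time; assigning afterwards cannot undo configuration already captured.
os.environ["IN_STREAMLIT"] = "true"
os.environ["PDFTEXT_CPU_WORKERS"] = "1"

# Simulated import-time reader: the value is captured once, when the
# "module" first executes, as surya/pdftext are assumed to do.
cpu_workers = int(os.environ.get("PDFTEXT_CPU_WORKERS", "4"))
print(cpu_workers)  # 1
```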
@@ -19,8 +22,14 @@
 configure_logging()


-@ray.remote(num_cpus=settings.RAY_CORES_PER_WORKER, num_gpus=.05 if settings.CUDA else 0)
-def process_single_pdf(filepath: str, out_folder: str, model_refs, metadata: Optional[Dict] = None, min_length: Optional[int] = None):
+def worker_init(shared_model):
+    global model_refs
+    model_refs = shared_model
+
+
+def process_single_pdf(args):
+    filepath, out_folder, metadata, min_length = args
+
     fname = os.path.basename(filepath)
     if markdown_exists(out_folder, fname):
         return
@@ -46,6 +55,11 @@ def process_single_pdf(filepath: str, out_folder: str, model_refs, metadata: Opt
     except Exception as e:
         print(f"Error converting {filepath}: {e}")
         print(traceback.format_exc())
+    finally:
+        # Release shared memory
+        for model in model_refs:
+            if model:
+                del model


 def main():
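`worker_init` plus a module-level global is the standard `multiprocessing.Pool` pattern for handing one large object to every worker without re-pickling it per task: the pool runs the initializer once in each worker process, and later tasks read the global. A minimal runnable sketch of the pattern, with a toy payload standing in for marker's model list:

```python
import multiprocessing as mp

shared_payload = None  # set once per worker by the initializer


def worker_init(payload):
    # Runs exactly once in each worker process when the pool starts.
    global shared_payload
    shared_payload = payload


def task(n):
    # Tasks reuse the worker's copy instead of receiving it as an argument.
    return f"{shared_payload['name']}-{n}"


if __name__ == "__main__":
    payload = {"name": "models"}  # stand-in for load_all_models() output
    with mp.Pool(processes=2, initializer=worker_init, initargs=(payload,)) as pool:
        print(pool.map(task, range(4)))  # ['models-0', 'models-1', 'models-2', 'models-3']
```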
@@ -86,45 +100,27 @@ def main()

     total_processes = min(len(files_to_convert), args.workers)

-    ray.init(
-        num_cpus=total_processes,
-        num_gpus=1 if settings.CUDA else 0,
-        storage=settings.RAY_CACHE_PATH,
-        _temp_dir=settings.RAY_CACHE_PATH,
-        log_to_driver=settings.DEBUG
-    )
+    # Dynamically set GPU allocation per task based on GPU ram
+    if settings.CUDA:
+        tasks_per_gpu = settings.INFERENCE_RAM // settings.VRAM_PER_TASK if settings.CUDA else 0
+        total_processes = min(tasks_per_gpu, total_processes)

+    mp.set_start_method('spawn') # Required for CUDA, forkserver doesn't work
     model_lst = load_all_models()
-    model_refs = ray.put(model_lst)

-    # Dynamically set GPU allocation per task based on GPU ram
-    gpu_frac = settings.VRAM_PER_TASK / settings.INFERENCE_RAM if settings.CUDA else 0
+    for model in model_lst:
+        if model:
+            model.share_memory()

     print(f"Converting {len(files_to_convert)} pdfs in chunk {args.chunk_idx + 1}/{args.num_chunks} with {total_processes} processes, and storing in {out_folder}")
-    futures = [
-        process_single_pdf.options(num_gpus=gpu_frac).remote(
-            filepath,
-            out_folder,
-            model_refs,
-            metadata=metadata.get(os.path.basename(filepath)),
-            min_length=args.min_length
-        ) for filepath in files_to_convert
-    ]
-
-    # Run all ray conversion tasks
-    progress_bar = tqdm(total=len(futures))
-    while len(futures) > 0:
-        finished, futures = ray.wait(
-            futures, timeout=7.0
-        )
-        finished_lst = ray.get(finished)
-        if isinstance(finished_lst, list):
-            progress_bar.update(len(finished_lst))
-        else:
-            progress_bar.update(1)
-
-    # Shutdown ray to free resources
-    ray.shutdown()
+    task_args = [(f, out_folder, metadata.get(os.path.basename(f)), args.min_length) for f in files_to_convert]
+
+    with mp.Pool(processes=total_processes, initializer=worker_init, initargs=(model_lst,)) as pool:
+        list(tqdm(pool.imap(process_single_pdf, task_args), total=len(task_args), desc="Processing PDFs", unit="pdf"))
+
+    # Delete all CUDA tensors
+    for model in model_lst:
+        if model:
+            del model


 if __name__ == "__main__":
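The `main()` rewrite leans on two torch.multiprocessing details: `spawn` is required so CUDA state is not inherited across a fork, and `share_memory()` moves a model's tensors into shared memory so every worker maps the same weights instead of loading its own copy. A minimal sketch of that combination under assumed conditions, with a tiny CPU model standing in for marker's model list:

```python
import torch
import torch.multiprocessing as mp

model = None  # populated per worker by the initializer


def worker_init(shared_model):
    global model
    model = shared_model  # tensors arrive via shared memory, not a fresh load


def infer(x):
    with torch.no_grad():
        return model(torch.full((1, 4), float(x))).sum().item()


if __name__ == "__main__":
    mp.set_start_method("spawn")  # required for CUDA, matching convert.py
    net = torch.nn.Linear(4, 2)
    net.share_memory()  # put parameters in shared memory before the pool starts
    with mp.Pool(processes=2, initializer=worker_init, initargs=(net,)) as pool:
        print(pool.map(infer, range(4)))
```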
convert_single.py: 1 addition & 0 deletions

@@ -1,3 +1,4 @@
+import pypdfium2 # Needs to be at the top to avoid warnings
 import argparse
 import os

marker/convert.py: 3 additions & 3 deletions

@@ -1,7 +1,7 @@
 import warnings
 warnings.filterwarnings("ignore", category=UserWarning) # Filter torch pytree user warnings

-import pypdfium2 as pdfium
+import pypdfium2 as pdfium # Needs to be at the top to avoid warnings
 from PIL import Image

 from marker.utils import flush_cuda_memory
@@ -33,8 +33,8 @@
 def convert_single_pdf(
         fname: str,
         model_lst: List,
-        max_pages=None,
-        metadata: Optional[Dict]=None,
+        max_pages: int = None,
+        metadata: Optional[Dict] = None,
         langs: Optional[List[str]] = None,
         batch_multiplier: int = 1
 ) -> Tuple[str, Dict[str, Image.Image], Dict]:
marker/models.py: 1 addition & 1 deletion

@@ -71,7 +71,7 @@ def load_all_models(langs=None, device=None, dtype=None, force_load_ocr=False):
     edit = load_editing_model(device, dtype)

     # Only load recognition model if we'll need it for all pdfs
-    ocr = setup_recognition_model(langs, device, dtype) if ((settings.OCR_ENGINE == "surya" and settings.OCR_ALL_PAGES) or force_load_ocr) else None
+    ocr = setup_recognition_model(langs, device, dtype)
     texify = setup_texify_model(device, dtype)
     model_lst = [texify, layout, order, edit, detection, ocr]
     return model_lst
marker/ocr/heuristics.py: 6 additions & 5 deletions

@@ -8,7 +8,11 @@


 def should_ocr_page(page: Page, no_text: bool):
-    detected_lines_found = detected_line_coverage(page)
+    detected_lines_found, total_lines = detected_line_coverage(page)
+
+    # No reason to OCR page if it has no text lines
+    if total_lines == 0:
+        return False

     # OCR page if we got minimal text, or if we got too many spaces
     conditions = [
@@ -55,7 +59,6 @@ def no_text_found(pages: List[Page]):
 def detected_line_coverage(page: Page, intersect_thresh=.5, detection_thresh=.4):
     found_lines = 0
     for detected_line in page.text_lines.bboxes:
-
         # Get bbox and rescale to match dimensions of original page
         detected_bbox = detected_line.bbox
         detected_bbox = rescale_bbox(page.text_lines.image_bbox, page.bbox, detected_bbox)
@@ -69,6 +72,4 @@ def detected_line_coverage(page: Page, intersect_thresh=.5, detection_thresh=.4)
             found_lines += 1

     total_lines = len(page.text_lines.bboxes)
-    if total_lines == 0:
-        return False
-    return found_lines / total_lines > detection_thresh
+    return found_lines / total_lines > detection_thresh, total_lines
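`detected_line_coverage` now reports `total_lines` alongside the coverage verdict, so `should_ocr_page` can separate "page has no detected lines at all" from "page has lines the text layer missed". A stub of the new contract (simplified, no Page objects); note that the division only makes sense once the caller has ruled out `total_lines == 0`, since the guard that used to live inside the function is gone in this diff:

```python
def detected_line_coverage_stub(found_lines: int, total_lines: int,
                                detection_thresh: float = .4):
    # Mirrors the new return shape: (enough coverage?, how many lines exist).
    # Caveat: as in the PR, total_lines == 0 would divide by zero here, so
    # callers must branch on total_lines before trusting the bool.
    return found_lines / total_lines > detection_thresh, total_lines


covered, total = detected_line_coverage_stub(found_lines=3, total_lines=10)
print(covered, total)  # False 10 -> 30% coverage is under the 40% threshold
```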
marker/ocr/recognition.py: 0 additions & 10 deletions

@@ -48,17 +48,7 @@ def run_ocr(doc, pages: List[Page], langs: List[str], rec_model, batch_multiplie
     if ocr_method is None:
         return pages, {"ocr_pages": 0, "ocr_failed": 0, "ocr_success": 0, "ocr_engine": "none"}
     elif ocr_method == "surya":
-        # Load model just in time if we're not OCRing everything
-        del_rec_model = False
-        if rec_model is None:
-            lang_tokens = langs_to_ids(langs)
-            rec_model = setup_recognition_model(lang_tokens)
-            del_rec_model = True
-
         new_pages = surya_recognition(doc, ocr_idxs, langs, rec_model, pages, batch_multiplier=batch_multiplier)
-
-        if del_rec_model:
-            del rec_model
     elif ocr_method == "ocrmypdf":
         new_pages = tesseract_recognition(doc, ocr_idxs, langs)
     else:
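With `load_all_models()` now always constructing the recognition model (see the marker/models.py change above), `run_ocr` no longer needs the just-in-time load/delete bookkeeping: the surya branch can assume `rec_model` already exists. A condensed sketch of the resulting dispatch, with a hypothetical function name and stub return values rather than marker's real calls:

```python
def run_ocr_dispatch(ocr_method, rec_model):
    # Simplified control flow after this PR: no del_rec_model flag, no
    # late setup_recognition_model() call inside the surya branch.
    if ocr_method is None:
        return "skip"
    elif ocr_method == "surya":
        assert rec_model is not None, "preloaded by load_all_models()"
        return "surya_recognition"
    elif ocr_method == "ocrmypdf":
        return "tesseract_recognition"
    else:
        raise ValueError(f"Unknown OCR method {ocr_method}")


print(run_ocr_dispatch("surya", rec_model=object()))  # surya_recognition
```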