Commit 1b1895d

Merge pull request #35 from publicmatt/feature_multiprocessing
Add query multiprocessing
crvernon authored Aug 31, 2023
2 parents 168bddf + f44c73e commit 1b1895d
Showing 2 changed files with 35 additions and 20 deletions.
53 changes: 34 additions & 19 deletions gcamreader/cli.py
@@ -4,6 +4,7 @@
 from click_default_group import DefaultGroup
 import sys
 import subprocess
+import multiprocessing


 @click.group(
@@ -140,26 +141,40 @@ def remote(

     execute(conn, query_path, output_path, force)

+
+def save(data):
+    conn, query, save_to, force = map(data.get, ["conn", "query", "save_to", "force"])
+    if save_to.exists():
+        click.echo(f"output exists: {save_to.name}", err=True)
+        if not force:
+            click.echo(f"skipping: {save_to.name}", err=True)
+            return
+    click.echo(f"running: {query.title}", err=True)
+    try:
+        df = conn.runQuery(query)
+    except subprocess.CalledProcessError as e:
+        click.echo(f"failed: {query.title}", err=True)
+        return
+    if df is None:
+        click.echo(f"empty: {query.title}", err=True)
+        return
+    df.to_csv(save_to, index=False, sep="|")
+    click.echo(f"saved: {save_to.absolute()}", err=True)
+
+
 def execute(conn, query_path: Path, output_path: Path, force: bool):
     # parse query xml
     click.echo(f"parsing: {query_path.name}", err=True)
-    queries = parse_batch_query(str(query_path))
-    for query in queries:
-        out = output_path / f"{str(query.title).replace(' ', '_').lower()}.csv"
-        if out.exists():
-            click.echo(f"output exists: {out.name}", err=True)
-            if not force:
-                click.echo(f"skipping: {out.name}", err=True)
-                continue
-        click.echo(f"running: {query.title}", err=True)
-        try:
-            df = conn.runQuery(query)
-        except subprocess.CalledProcessError as e:
-            click.echo(f"failed: {query.title}", err=True)
-            continue
-        if df is None:
-            click.echo(f"empty: {query.title}", err=True)
-            continue
-        df.to_csv(out, index=False, sep="|")
-        click.echo(f"saved: {out.absolute()}", err=True)
+    queries = []
+    for query in parse_batch_query(str(query_path)):
+        data = {}
+        data["conn"] = conn
+        data["query"] = query
+        data["save_to"] = (
+            output_path / f"{str(query.title).replace(' ', '_').lower()}.csv"
+        )
+        data["force"] = force
+        queries.append(data)
+    with multiprocessing.Pool() as pool:
+        pool.map(save, queries)
     click.echo(f"extract complete", err=True)
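For context, the change swaps the sequential per-query loop for a process pool: execute() now builds one picklable dict per query and hands the whole list to multiprocessing.Pool().map(save, queries). A minimal standalone sketch of that fan-out pattern follows; run_one, the task fields, and the doubled payload are illustrative placeholders, not gcamreader APIs.

import multiprocessing


def run_one(task):
    # Each worker process receives one picklable dict describing a unit of
    # work, mirroring the per-query dicts built in execute() above.
    name, payload = task["name"], task["payload"]
    return f"{name}: {payload * 2}"


def main():
    # Build the full task list up front, then let the pool fan it out.
    tasks = [{"name": f"query_{i}", "payload": i} for i in range(4)]
    # Pool() defaults to os.cpu_count() workers; map() blocks until every
    # task has finished and returns results in input order.
    with multiprocessing.Pool() as pool:
        results = pool.map(run_one, tasks)
    print(results)


if __name__ == "__main__":
    # The __main__ guard matters: on spawn-based platforms (Windows, and
    # macOS by default) child processes re-import this module.
    main()

One consequence of this pattern is that everything in each task dict, including the conn object it carries, is pickled when it crosses the process boundary, so the connection object needs to be picklable for the workers to receive it.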
2 changes: 1 addition & 1 deletion setup.py
@@ -1,6 +1,6 @@
 from setuptools import setup, find_packages

-VERSION = "1.3.1"
+VERSION = "1.4.0"


 def readme():
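The setup.py change bumps the package version to 1.4.0 for the new feature. A small check of the installed version via importlib.metadata, assuming the package is installed under the distribution name gcamreader:

from importlib.metadata import version

# Prints the installed distribution's version string, e.g. "1.4.0" once this
# release is installed; raises PackageNotFoundError if gcamreader is absent.
print(version("gcamreader"))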
