Commit: update phasenet plus
zhuwq0 committed Nov 1, 2024
1 parent da0705a commit da9ee53
Showing 8 changed files with 488 additions and 322 deletions.
3 changes: 3 additions & 0 deletions scripts/args.py
@@ -19,6 +19,9 @@ def parse_args():
parser.add_argument("--num_nodes", type=int, default=1, help="number of nodes")
parser.add_argument("--node_rank", type=int, default=0, help="node rank")

## Model
parser.add_argument("--model", type=str, default="phasenet", help="model")

## ADLOC
parser.add_argument("--iter", type=int, default=0, help="iteration")

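The new --model flag threads a model name through the downstream merge scripts. A minimal sketch of how it is consumed, with illustrative paths (not part of this commit):

    from args import parse_args

    args = parse_args()
    model = args.model  # "phasenet" by default; merge_phasenet_plus_picks.py below hard-codes "phasenet_plus"
    result_path = f"{args.region}/{model}"  # merged outputs land under this prefix
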
43 changes: 22 additions & 21 deletions scripts/merge_phasenet_picks.py
@@ -18,31 +18,31 @@
from glob import glob


def scan_csv(year, root_path, fs=None, bucket=None, protocol="file"):
def scan_csv(year, root_path, region, model, fs=None, bucket=None, protocol="file"):
    # %%
    csv_list = []
    if protocol != "file":
        jdays = fs.ls(f"{bucket}/{region}/{folder}/{year}")
        jdays = fs.ls(f"{bucket}/{region}/{model}/picks/{year}")
    else:
        jdays = os.listdir(f"{root_path}/{region}/phasenet/picks/{year}/")
        jdays = os.listdir(f"{root_path}/{region}/{model}/picks/{year}/")

    for jday in jdays:
        if protocol != "file":
            csvs = fs.glob(f"{jday}/??/*.csv")
        else:
            csvs = glob(f"{root_path}/{region}/phasenet/picks/{year}/{jday}/??/*.csv")
            csvs = glob(f"{root_path}/{region}/{model}/picks/{year}/{jday}/??/*.csv")

        csv_list.extend([[year, jday, csv] for csv in csvs])

    csvs = pd.DataFrame(csv_list, columns=["year", "jday", "csv"])
    csv_file = f"{root_path}/{region}/phasenet/csv_list_{year}.csv"
    csv_file = f"{root_path}/{region}/{model}/csv_list_{year}.csv"
    csvs.to_csv(csv_file, index=False)

    return csv_file


# %%
def read_csv(rows, region, year, jday, root_path, fs=None, bucket=None):
def read_csv(rows, region, model, year, jday, root_path, fs=None, bucket=None):

    picks = []
    for i, row in rows.iterrows():
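
Both functions in this hunk operate on a year/day-of-year hierarchy: scan_csv indexes it, and read_csv (continued below) merges each day. The layout, reconstructed from the glob patterns above (station file names are illustrative):

    # {root_path}/{region}/{model}/picks/{year}/{jday}/{hh}/{net.sta}.csv
    # e.g. local_data/demo/phasenet/picks/2024/001/00/NC.KCT.csv
    #
    # index written to: {root_path}/{region}/{model}/csv_list_{year}.csv
    # with columns: year, jday, csv
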
@@ -58,15 +58,15 @@ def read_csv(rows, region, year, jday, root_path, fs=None, bucket=None):

    if len(picks) > 0:
        picks = pd.concat(picks, ignore_index=True)
        if not os.path.exists(f"{root_path}/{region}/phasenet/{year}"):
            os.makedirs(f"{root_path}/{region}/phasenet/{year}", exist_ok=True)
        picks.to_csv(f"{root_path}/{region}/phasenet/{year}/{year}.{jday}.csv", index=False)
        if not os.path.exists(f"{root_path}/{region}/{model}/{year}"):
            os.makedirs(f"{root_path}/{region}/{model}/{year}", exist_ok=True)
        picks.to_csv(f"{root_path}/{region}/{model}/{year}/{year}.{jday}.csv", index=False)
        # fs.put(
        #     f"{root_path}/{region}/phasenet/{year}/{jday}/{year}.{jday}.csv",
        #     f"{bucket}/{region}/phasenet_merged/{year}/{year}.{jday}.csv",
        # )
    else:
        with open(f"{root_path}/{region}/phasenet/{year}/{year}.{jday}.csv", "w") as f:
        with open(f"{root_path}/{region}/{model}/{year}/{year}.{jday}.csv", "w") as f:
            f.write("")


@@ -76,9 +76,9 @@ def read_csv(rows, region, year, jday, root_path, fs=None, bucket=None):
    args = parse_args()
    root_path = args.root_path
    region = args.region
    model = args.model

    data_path = f"{region}/phasenet/picks"
    result_path = f"{region}/phasenet"
    result_path = f"{region}/{model}"

    # %%
    # protocol = "gs"
@@ -88,32 +88,33 @@ def read_csv(rows, region, year, jday, root_path, fs=None, bucket=None):
    # fs = fsspec.filesystem(protocol, token=token)

    # %%
    years = os.listdir(f"{root_path}/{region}/phasenet/picks")
    # years = os.listdir(f"{root_path}/{region}/{model}/picks_{model}")
    years = glob(f"{root_path}/{region}/{model}/picks_{model}/????/")
    years = [year.rstrip("/").split("/")[-1] for year in years]
    print(f"Years: {years}")

    for year in years:

        csv_list = scan_csv(year, root_path)
        csv_list = scan_csv(year, root_path, region, model)

        # %%
        csv_list = pd.read_csv(csv_list, dtype=str)

        # for jday, csvs in csv_list.groupby("jday"):
        #     read_csv(csvs, region, year, jday, root_path)
        #     read_csv(csvs, region, model, year, jday, root_path)
        #     raise

        # ncpu = os.cpu_count()
        ncpu = 64
        ncpu = min(64, mp.cpu_count())
        print(f"Number of processors: {ncpu}")
        csv_by_jday = csv_list.groupby("jday")
        pbar = tqdm(total=len(csv_by_jday), desc=f"Loading csv files (year {year})")

        # with mp.Pool(ncpu) as pool:
        ctx = mp.get_context("spawn")
        with ctx.Pool(ncpu) as pool:
            jobs = []
            for jday, csvs in csv_by_jday:
                job = pool.apply_async(
                    read_csv, (csvs, region, year, jday, root_path), callback=lambda _: pbar.update()
                    read_csv, (csvs, region, model, year, jday, root_path), callback=lambda _: pbar.update()
                )
                jobs.append(job)
            pool.close()
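
The merge fans out one task per day through a spawn-context pool, ticking the tqdm bar from the completion callback. The same pattern in isolation (toy worker, not from the repo):

    import multiprocessing as mp

    from tqdm import tqdm

    def square(x):
        return x * x

    if __name__ == "__main__":
        ctx = mp.get_context("spawn")  # spawn avoids fork-related deadlocks with threaded libraries
        with ctx.Pool(4) as pool:
            pbar = tqdm(total=10)
            jobs = [pool.apply_async(square, (i,), callback=lambda _: pbar.update()) for i in range(10)]
            pool.close()
            pool.join()
            results = [job.get() for job in jobs]  # .get() re-raises any worker exception
        pbar.close()
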
@@ -126,11 +127,11 @@ def read_csv(rows, region, year, jday, root_path, fs=None, bucket=None):
        pbar.close()

    # %%
    csvs = glob(f"{root_path}/{region}/phasenet/????/????.???.csv")
    csvs = glob(f"{root_path}/{region}/{model}/????/????.???.csv")
    picks = []
    for csv in tqdm(csvs, desc="Merge csv files"):
        picks.append(pd.read_csv(csv, dtype=str))
    picks = pd.concat(picks, ignore_index=True)
    picks.to_csv(f"{root_path}/{region}/phasenet/phasenet_picks.csv", index=False)
    picks.to_csv(f"{root_path}/{region}/{model}/{model}_picks.csv", index=False)

# %%
134 changes: 134 additions & 0 deletions scripts/merge_phasenet_plus_picks.py
@@ -0,0 +1,134 @@
# %%
import json
import multiprocessing as mp
import os

import fsspec
import numpy as np
import pandas as pd
from tqdm import tqdm
from args import parse_args
from glob import glob


def scan_csv(year, root_path, region, model, data="picks", fs=None, bucket=None, protocol="file"):
    # %%
    csv_list = []
    if protocol != "file":
        jdays = fs.ls(f"{bucket}/{region}/{model}/{data}_{model}/{year}")
    else:
        jdays = os.listdir(f"{root_path}/{region}/{model}/{data}_{model}/{year}/")

    for jday in jdays:
        if protocol != "file":
            csvs = fs.glob(f"{jday}/??/*.csv")
        else:
            csvs = glob(f"{root_path}/{region}/{model}/{data}_{model}/{year}/{jday}/??/*.csv")

        csv_list.extend([[year, jday, csv] for csv in csvs])

    csvs = pd.DataFrame(csv_list, columns=["year", "jday", "csv"])
    csv_file = f"{root_path}/{region}/{model}/{data}_list_{year}.csv"
    csvs.to_csv(csv_file, index=False)

    return csv_file
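
Unlike the plain PhaseNet merger, scan_csv here takes a data argument so one walk covers both picks and events. The assumed layout, reconstructed from the patterns above:

    # {root_path}/{region}/phasenet_plus/picks_phasenet_plus/{year}/{jday}/{hh}/*.csv
    # {root_path}/{region}/phasenet_plus/events_phasenet_plus/{year}/{jday}/{hh}/*.csv
    #
    # index files: {root_path}/{region}/phasenet_plus/picks_list_{year}.csv
    #              {root_path}/{region}/phasenet_plus/events_list_{year}.csv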


# %%
def read_csv(rows, region, model, data, year, jday, root_path, fs=None, bucket=None):

    picks = []
    for i, row in rows.iterrows():
        # if fs.info(row["csv"])["size"] == 0:
        #     continue
        # with fs.open(row["csv"], "r") as f:
        #     picks_ = pd.read_csv(f, dtype=str)
        if os.path.getsize(row["csv"]) == 0:
            continue
        with open(row["csv"], "r") as f:
            picks_ = pd.read_csv(f, dtype=str)
        picks.append(picks_)

    if len(picks) > 0:
        picks = pd.concat(picks, ignore_index=True)
        if not os.path.exists(f"{root_path}/{region}/{model}/{year}"):
            os.makedirs(f"{root_path}/{region}/{model}/{year}", exist_ok=True)
        picks.to_csv(f"{root_path}/{region}/{model}/{year}/{year}.{jday}.{data}.csv", index=False)
        # fs.put(
        #     f"{root_path}/{region}/phasenet/{year}/{jday}/{year}.{jday}.csv",
        #     f"{bucket}/{region}/phasenet_merged/{year}/{year}.{jday}.csv",
        # )
    else:
        with open(f"{root_path}/{region}/{model}/{year}/{year}.{jday}.{data}.csv", "w") as f:
            f.write("")


# %%
if __name__ == "__main__":

    args = parse_args()
    root_path = args.root_path
    region = args.region
    # model = args.model
    model = "phasenet_plus"

    result_path = f"{region}/{model}"

    # %%
    # protocol = "gs"
    # token_json = f"{os.environ['HOME']}/.config/gcloud/application_default_credentials.json"
    # with open(token_json, "r") as fp:
    #     token = json.load(fp)
    # fs = fsspec.filesystem(protocol, token=token)

    # %%
    # years = os.listdir(f"{root_path}/{region}/{model}/picks_{model}")
    years = glob(f"{root_path}/{region}/{model}/picks_{model}/????/")
    years = [year.rstrip("/").split("/")[-1] for year in years]
    print(f"Years: {years}")

    for year in years:

        for data in ["picks", "events"]:

            csv_list = scan_csv(year, root_path, region, model, data)

            # %%
            csv_list = pd.read_csv(csv_list, dtype=str)

            # for jday, csvs in csv_list.groupby("jday"):
            #     read_csv(csvs, region, model, data, year, jday, root_path)
            #     raise

            ncpu = min(64, mp.cpu_count())
            print(f"Number of processors: {ncpu}")
            csv_by_jday = csv_list.groupby("jday")
            pbar = tqdm(total=len(csv_by_jday), desc=f"Loading {data} csv files (year {year})")

            ctx = mp.get_context("spawn")
            with ctx.Pool(ncpu) as pool:
                jobs = []
                for jday, csvs in csv_by_jday:
                    job = pool.apply_async(
                        read_csv, (csvs, region, model, data, year, jday, root_path), callback=lambda _: pbar.update()
                    )
                    jobs.append(job)
                pool.close()
                pool.join()
                for job in jobs:
                    output = job.get()
                    if output:
                        print(output)

            pbar.close()

    # %%
    for data in ["picks", "events"]:
        csvs = glob(f"{root_path}/{region}/{model}/????/????.???.{data}.csv")
        picks = []
        for csv in tqdm(csvs, desc=f"Merge {data} csv files"):
            picks.append(pd.read_csv(csv, dtype=str))
        picks = pd.concat(picks, ignore_index=True)
        picks.to_csv(f"{root_path}/{region}/{model}/{model}_{data}.csv", index=False)
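
The end products are two flat catalogs per region. Loading them back looks like this (root and region are illustrative):

    import pandas as pd

    picks = pd.read_csv("local_data/demo/phasenet_plus/phasenet_plus_picks.csv", dtype=str)
    events = pd.read_csv("local_data/demo/phasenet_plus/phasenet_plus_events.csv", dtype=str)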

# %%
5 changes: 4 additions & 1 deletion scripts/run_adloc.py
@@ -46,6 +46,9 @@ def run_adloc(
    data_path = f"{root_path}/{region}/gamma"
    picks_file = os.path.join(data_path, f"gamma_picks.csv")
    events_file = os.path.join(data_path, f"gamma_events.csv")
    # picks_file = f"{root_path}/{region}/phasenet_plus/phasenet_plus_picks_associated.csv"
    # events_file = f"{root_path}/{region}/phasenet_plus/phasenet_plus_events_associated.csv"

    # stations_file = os.path.join(data_path, "stations.csv")
    stations_file = f"{root_path}/{region}/obspy/stations.json"

@@ -93,7 +96,7 @@ def run_adloc(
events[["x_km", "y_km"]] = events.apply(
lambda x: pd.Series(proj(longitude=x.longitude, latitude=x.latitude)), axis=1
)
events["z_km"] = events["depth_km"]
events["z_km"] = events["depth_km"] if "depth_km" in events.columns else 10.0

## set up the config; you can also specify the region manually
if ("xlim_km" not in config) or ("ylim_km" not in config) or ("zlim_km" not in config):
(remaining 4 changed files not shown)
