diff --git a/ADLoc b/ADLoc index 63456e2..c62ec07 160000 --- a/ADLoc +++ b/ADLoc @@ -1 +1 @@ -Subproject commit 63456e26194d87bea1be6e0175aca7830d87cddf +Subproject commit c62ec0757addd8cb0893661d22d52be02b9dbc18 diff --git a/CCTorch b/CCTorch index c83436a..a0f3f45 160000 --- a/CCTorch +++ b/CCTorch @@ -1 +1 @@ -Subproject commit c83436a0edcf23b790c3b9076c37f3085f3c845a +Subproject commit a0f3f458fe38e082e099a346f3f58ec4002ba39a diff --git a/PhaseNet b/PhaseNet index d5a17ec..4f36f9a 160000 --- a/PhaseNet +++ b/PhaseNet @@ -1 +1 @@ -Subproject commit d5a17ec9d09bc6b1bdb3cfb811ee413d17ab0d3b +Subproject commit 4f36f9ab0d273446a349729044fb753451e8faca diff --git a/scripts/args.py b/scripts/args.py index b6931b5..885c718 100644 --- a/scripts/args.py +++ b/scripts/args.py @@ -19,6 +19,9 @@ def parse_args(): parser.add_argument("--num_nodes", type=int, default=1, help="number of nodes") parser.add_argument("--node_rank", type=int, default=0, help="node rank") + ## ADLOC + parser.add_argument("--iter", type=int, default=0, help="iteration") + ## CCTorch parser.add_argument("--dtct_pair", action="store_true", help="run convert_dtcc.py") diff --git a/scripts/cut_templates_cc.py b/scripts/cut_templates_cc.py index 43fb73a..c6fe29c 100644 --- a/scripts/cut_templates_cc.py +++ b/scripts/cut_templates_cc.py @@ -378,7 +378,9 @@ def cut_templates(root_path, region, config): xmin, ymin = proj(config["minlongitude"], config["minlatitude"]) xmax, ymax = proj(config["maxlongitude"], config["maxlatitude"]) - zmin, zmax = config["mindepth"], config["maxdepth"] + # zmin, zmax = config["mindepth"], config["maxdepth"] + zmin = config["mindepth"] if "mindepth" in config else 0.0 + zmax = config["maxdepth"] if "maxdepth" in config else 60.0 config["xlim_km"] = (xmin, xmax) config["ylim_km"] = (ymin, ymax) config["zlim_km"] = (zmin, zmax) diff --git a/scripts/download_station.py b/scripts/download_station.py index 551b035..331e379 100644 --- a/scripts/download_station.py +++ b/scripts/download_station.py @@ -176,6 +176,7 @@ def parse_inventory_csv(inventory, mseed_ids=[]): sensor_description = channel.sensor.description channel_list.append( { + "station_id": f"{network.code}.{station.code}.{channel.location_code}.{channel.code[:-1]}", "network": network.code, "station": station.code, "location": channel.location_code, @@ -247,7 +248,9 @@ def parse_inventory_csv(inventory, mseed_ids=[]): tmp["provider"] = provider stations.append(tmp) stations = pd.concat(stations) - stations = stations.groupby(["network", "station", "location", "channel"], dropna=False).first().reset_index() + # stations = stations.groupby(["network", "station", "location", "channel"], dropna=False).first().reset_index() + stations = stations.sort_values(by=["station_id", "channel"]) + stations = stations.groupby(["station_id"], dropna=False).first().reset_index() print(f"Merged {len(stations)} channels") stations.to_csv(f"{root_path}/{result_dir}/stations.csv", index=False) if protocol != "file": diff --git a/scripts/merge_gamma_picks.py b/scripts/merge_gamma_picks.py new file mode 100644 index 0000000..b68c918 --- /dev/null +++ b/scripts/merge_gamma_picks.py @@ -0,0 +1,94 @@ +# %% +import json +import multiprocessing as mp +import os +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed +from datetime import datetime, timedelta, timezone +from threading import Lock, Thread + +import fsspec +import numpy as np +import pandas as pd +import pyproj +from obspy import read_inventory +from obspy.clients.fdsn import 
Client +from sklearn.cluster import DBSCAN +from tqdm import tqdm +from args import parse_args +from glob import glob + + +def load_data(year, jday, data_path, root_path, bucket, protocol, token): + + fs = fsspec.filesystem(protocol, token=token) + adloc_events_csv = f"{data_path}/{year:04d}/adloc_events_{jday:03d}.csv" + adloc_picks_csv = f"{data_path}/{year:04d}/adloc_picks_{jday:03d}.csv" + if protocol == "file": + events = pd.read_csv(f"{root_path}/{adloc_events_csv}", parse_dates=["time"]) + picks = pd.read_csv(f"{root_path}/{adloc_picks_csv}", parse_dates=["phase_time"]) + else: + with fs.open(f"{bucket}/{adloc_events_csv}", "r") as fp: + events = pd.read_csv(fp, parse_dates=["time"]) + with fs.open(f"{bucket}/{adloc_picks_csv}", "r") as fp: + picks = pd.read_csv(fp, parse_dates=["phase_time"]) + + events["year"] = year + events["jday"] = jday + picks["year"] = year + picks["jday"] = jday + + return events, picks + + +# %% +if __name__ == "__main__": + + args = parse_args() + root_path = args.root_path + region = args.region + + data_path = f"{region}/gamma" + result_path = f"{region}/gamma" + + # %% + # protocol = "gs" + # token_json = f"{os.environ['HOME']}/.config/gcloud/application_default_credentials.json" + # with open(token_json, "r") as fp: + # token = json.load(fp) + # fs = fsspec.filesystem(protocol, token=token) + + # %% + event_csvs = sorted(glob(f"{root_path}/{data_path}/????/????.???.events.csv")) + + # %% + events = [] + picks = [] + for event_csv in tqdm(event_csvs, desc="Load event csvs"): + pick_csv = event_csv.replace("events.csv", "picks.csv") + year, jday = event_csv.split("/")[-1].split(".")[:2] + events_ = pd.read_csv(event_csv, dtype=str) + picks_ = pd.read_csv(pick_csv, dtype=str) + events_["year"] = year + events_["jday"] = jday + picks_["year"] = year + picks_["jday"] = jday + events.append(events_) + picks.append(picks_) + + events = pd.concat(events, ignore_index=True) + picks = pd.concat(picks, ignore_index=True) + + events["dummy_id"] = events["year"] + "." + events["jday"] + "." + events["event_index"] + picks["dummy_id"] = picks["year"] + "." + picks["jday"] + "." 
+ picks["event_index"] + + events["event_index"] = np.arange(len(events)) + picks = picks.drop("event_index", axis=1) + picks = picks.merge(events[["dummy_id", "event_index"]], on="dummy_id") + + events.drop(["year", "jday", "dummy_id"], axis=1, inplace=True) + picks.drop(["year", "jday", "dummy_id"], axis=1, inplace=True) + + events.to_csv(f"{root_path}/{result_path}/gamma_events.csv", index=False) + picks.to_csv(f"{root_path}/{result_path}/gamma_picks.csv", index=False) + +# %% diff --git a/scripts/merge_phasenet_picks.py b/scripts/merge_phasenet_picks.py new file mode 100644 index 0000000..901274c --- /dev/null +++ b/scripts/merge_phasenet_picks.py @@ -0,0 +1,136 @@ +# %% +import json +import multiprocessing as mp +import os +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed +from datetime import datetime, timedelta, timezone +from threading import Lock, Thread + +import fsspec +import numpy as np +import pandas as pd +import pyproj +from obspy import read_inventory +from obspy.clients.fdsn import Client +from sklearn.cluster import DBSCAN +from tqdm import tqdm +from args import parse_args +from glob import glob + + +def scan_csv(year, root_path, fs=None, bucket=None, protocol="file"): + # %% + csv_list = [] + if protocol != "file": + jdays = fs.ls(f"{bucket}/{region}/{folder}/{year}") + else: + jdays = os.listdir(f"{root_path}/{region}/phasenet/picks/{year}/") + + for jday in jdays: + if protocol != "file": + csvs = fs.glob(f"{jday}/??/*.csv") + else: + csvs = glob(f"{root_path}/{region}/phasenet/picks/{year}/{jday}/??/*.csv") + + csv_list.extend([[year, jday, csv] for csv in csvs]) + + csvs = pd.DataFrame(csv_list, columns=["year", "jday", "csv"]) + csv_file = f"{root_path}/{region}/phasenet/csv_list_{year}.csv" + csvs.to_csv(csv_file, index=False) + + return csv_file + + +# %% +def read_csv(rows, region, year, jday, root_path, fs=None, bucket=None): + + picks = [] + for i, row in rows.iterrows(): + # if fs.info(row["csv"])["size"] == 0: + # continue + # with fs.open(row["csv"], "r") as f: + # picks_ = pd.read_csv(f, dtype=str) + if os.path.getsize(row["csv"]) == 0: + continue + with open(row["csv"], "r") as f: + picks_ = pd.read_csv(f, dtype=str) + picks.append(picks_) + + if len(picks) > 0: + picks = pd.concat(picks, ignore_index=True) + if not os.path.exists(f"{root_path}/{region}/phasenet/{year}"): + os.makedirs(f"{root_path}/{region}/phasenet/{year}", exist_ok=True) + picks.to_csv(f"{root_path}/{region}/phasenet/{year}/{year}.{jday}.csv", index=False) + # fs.put( + # f"{root_path}/{region}/phasenet/{year}/{jday}/{year}.{jday}.csv", + # f"{bucket}/{region}/phasenet_merged/{year}/{year}.{jday}.csv", + # ) + else: + with open(f"{root_path}/{region}/phasenet/{year}/{year}.{jday}.csv", "w") as f: + f.write("") + + +# %% +if __name__ == "__main__": + + args = parse_args() + root_path = args.root_path + region = args.region + + data_path = f"{region}/phasenet/picks" + result_path = f"{region}/phasenet" + + # %% + # protocol = "gs" + # token_json = f"{os.environ['HOME']}/.config/gcloud/application_default_credentials.json" + # with open(token_json, "r") as fp: + # token = json.load(fp) + # fs = fsspec.filesystem(protocol, token=token) + + # %% + years = os.listdir(f"{root_path}/{region}/phasenet/picks") + + for year in years: + + csv_list = scan_csv(year, root_path) + + # %% + csv_list = pd.read_csv(csv_list, dtype=str) + + # for jday, csvs in csv_list.groupby("jday"): + # read_csv(csvs, region, year, jday, root_path) + # raise + + # ncpu = 
os.cpu_count() + ncpu = 64 + print(f"Number of processors: {ncpu}") + csv_by_jday = csv_list.groupby("jday") + pbar = tqdm(total=len(csv_by_jday), desc=f"Loading csv files (year {year})") + + # with mp.Pool(ncpu) as pool: + ctx = mp.get_context("spawn") + with ctx.Pool(ncpu) as pool: + jobs = [] + for jday, csvs in csv_by_jday: + job = pool.apply_async( + read_csv, (csvs, region, year, jday, root_path), callback=lambda _: pbar.update() + ) + jobs.append(job) + pool.close() + pool.join() + for job in jobs: + output = job.get() + if output: + print(output) + + pbar.close() + + # %% + csvs = glob(f"{root_path}/{region}/phasenet/????/????.???.csv") + picks = [] + for csv in tqdm(csvs, desc="Merge csv files"): + picks.append(pd.read_csv(csv, dtype=str)) + picks = pd.concat(picks, ignore_index=True) + picks.to_csv(f"{root_path}/{region}/phasenet/phasenet_picks.csv", index=False) + +# %% diff --git a/scripts/plot_catalog.py b/scripts/plot_catalog.py index 8062f25..7417769 100644 --- a/scripts/plot_catalog.py +++ b/scripts/plot_catalog.py @@ -48,7 +48,7 @@ # %% -gamma_file = f"{root_path}/{region}/gamma/gamma_events_000_001.csv" +gamma_file = f"{root_path}/{region}/gamma/gamma_events.csv" gamma_exist = False if os.path.exists(gamma_file): print(f"Reading {gamma_file}") diff --git a/scripts/run_adloc.py b/scripts/run_adloc.py index 6024b8e..0817fdb 100644 --- a/scripts/run_adloc.py +++ b/scripts/run_adloc.py @@ -45,8 +45,8 @@ def run_adloc( # %% data_path = f"{root_path}/{region}/gamma" - picks_file = os.path.join(data_path, f"gamma_picks_{node_rank:03d}_{num_nodes:03d}.csv") - events_file = os.path.join(data_path, f"gamma_events_{node_rank:03d}_{num_nodes:03d}.csv") + picks_file = os.path.join(data_path, f"gamma_picks.csv") + events_file = os.path.join(data_path, f"gamma_events.csv") # stations_file = os.path.join(data_path, "stations.csv") stations_file = f"{root_path}/{region}/obspy/stations.json" @@ -66,7 +66,7 @@ def run_adloc( stations.reset_index(drop=True, inplace=True) config["mindepth"] = config["mindepth"] if "mindepth" in config else 0.0 - config["maxdepth"] = config["maxdepth"] if "maxdepth" in config else 30.0 + config["maxdepth"] = config["maxdepth"] if "maxdepth" in config else 60.0 config["use_amplitude"] = True # ## Eikonal for 1D velocity model @@ -203,6 +203,7 @@ def run_adloc( for iter in range(MAX_SST_ITER): # picks, events = invert_location_iter(picks, stations, config, estimator, events_init=events_init, iter=iter) picks, events = invert_location(picks, stations, config, estimator, events_init=events_init, iter=iter) + station_term_amp = ( picks[picks["mask"] == 1.0].groupby("idx_sta").agg({"residual_amplitude": "median"}).reset_index() ) diff --git a/scripts/run_adloc_v2.py b/scripts/run_adloc_v2.py new file mode 100644 index 0000000..c24d4b6 --- /dev/null +++ b/scripts/run_adloc_v2.py @@ -0,0 +1,411 @@ +# %% +import argparse +import json +import multiprocessing as mp +import os +from typing import Dict, List, NamedTuple + +import fsspec +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +from adloc.eikonal2d import init_eikonal2d +from adloc.sacloc2d import ADLoc +from adloc.utils import invert_location, invert_location_iter +from args import parse_args +from glob import glob + +# from utils import plotting_ransac +from utils.plotting import plotting, plotting_ransac +from pyproj import Proj + + +# %% +def run_adloc( + root_path: str, + region: str, + config: Dict, + jdays: list, + iter: int = 0, + picks_csv: str = None, + protocol: str = "file", + 
bucket: str = "", + token: Dict = None, +): + + # %% + fs = fsspec.filesystem(protocol=protocol, token=token) + + for jday in jdays: + print(f"Processing jday: {jday}") + + year = int(jday.split(".")[0]) + jday = int(jday.split(".")[1]) + + # %% + data_path = f"{region}/gamma/{year:04d}" + result_path = f"{region}/adloc/{year:04d}" + if not os.path.exists(f"{root_path}/{result_path}"): + os.makedirs(f"{root_path}/{result_path}") + figure_path = f"{root_path}/{region}/adloc/figures/" + if not os.path.exists(figure_path): + os.makedirs(figure_path) + + # %% + if iter == 0: + # station_json = f"{region}/obspy/stations.csv" + stations_csv = f"{region}/obspy/stations.csv" + picks_csv = f"{data_path}/{year:04d}.{jday:03d}.picks.csv" + events_csv = f"{data_path}/{year:04d}.{jday:03d}.events.csv" + else: + # station_json = f"{result_path}/stations_sst_{iter-1}.csv" + stations_csv = f"{result_path}/{year:04d}.{jday:03d}.stations_sst_{iter-1}.csv" + picks_csv = f"{result_path}/{year:04d}.{jday:03d}.picks_sst_{iter-1}.csv" + events_csv = f"{result_path}/{year:04d}.{jday:03d}.events_sst_{iter-1}.csv" + + adloc_events_csv = f"{result_path}/{year:04d}.{jday:03d}.events_sst_{iter}.csv" + adloc_picks_csv = f"{result_path}/{year:04d}.{jday:03d}.picks_sst_{iter}.csv" + # stations_csv = f"{result_path}/stations_sst_{iter}.csv" + station_term_csv = f"{result_path}/{year:04d}.{jday:03d}.stations_sst_{iter}.csv" + + try: + if protocol == "file": + picks = pd.read_csv(f"{root_path}/{picks_csv}", parse_dates=["phase_time"]) + events = pd.read_csv(f"{root_path}/{events_csv}", parse_dates=["time"]) + else: + with fs.open(f"{bucket}/{picks_csv}") as fp: + picks = pd.read_csv(fp, parse_dates=["phase_time"]) + with fs.open(f"{bucket}/{events_csv}") as fp: + events = pd.read_csv(fp, parse_dates=["time"]) + except Exception as e: + print(f"Error reading {picks_csv}: {e}") + return + + events = None + + # picks["phase_time"] = pd.to_datetime(picks["phase_time"]) + # events["time"] = pd.to_datetime(events["time"]) + + # drop unnecessary columns + picks.drop(["id", "timestamp", "type", "amp", "prob", "event_idx"], axis=1, inplace=True, errors="ignore") + + # # increase weight for P/S pairs + # phase_counts = ( + # picks.groupby(["event_index", "station_id"])["phase_type"].nunique().reset_index(name="phase_count") + # ) + # merged = picks.merge(phase_counts, on=["event_index", "station_id"]) + # merged.loc[merged["phase_count"] == 2, "phase_score"] *= 1.5 + # picks = merged.drop(columns=["phase_count"]) + + # stations = pd.read_csv(stations_file, sep="\t") + if protocol == "file": + # stations = pd.read_json(f"{root_path}/{station_json}", orient="index") + stations = pd.read_csv(f"{root_path}/{stations_csv}") + else: + with fs.open(f"{bucket}/{station_json}") as fp: + stations = pd.read_json(fp, orient="index") + + # stations = stations[["station_id", "longitude", "latitude", "elevation_m", "num_picks"]] + # stations["station_id"] = stations.index + # stations.reset_index(drop=True, inplace=True) + + # %% + config["mindepth"] = config["mindepth"] if "mindepth" in config else 0.0 + config["maxdepth"] = config["maxdepth"] if "maxdepth" in config else 60.0 + config["use_amplitude"] = True + + # %% + ## Automatic region; you can also specify a region + # lon0 = stations["longitude"].median() + # lat0 = stations["latitude"].median() + lon0 = (config["minlongitude"] + config["maxlongitude"]) / 2 + lat0 = (config["minlatitude"] + config["maxlatitude"]) / 2 + proj = Proj(f"+proj=sterea +lon_0={lon0} +lat_0={lat0} +units=km") + + # 
%% + stations["depth_km"] = -stations["elevation_m"] / 1000 + if "station_term_time_p" not in stations.columns: + stations["station_term_time_p"] = 0.0 + if "station_term_time_s" not in stations.columns: + stations["station_term_time_s"] = 0.0 + if "station_term_amplitude" not in stations.columns: + stations["station_term_amplitude"] = 0.0 + stations[["x_km", "y_km"]] = stations.apply( + lambda x: pd.Series(proj(longitude=x.longitude, latitude=x.latitude)), axis=1 + ) + stations["z_km"] = stations["elevation_m"].apply(lambda x: -x / 1e3) + + if events is not None: + events[["x_km", "y_km"]] = events.apply( + lambda x: pd.Series(proj(longitude=x.longitude, latitude=x.latitude)), axis=1 + ) + events["z_km"] = events["depth_km"] + + ## set up the config; you can also specify the region manually + if ("xlim_km" not in config) or ("ylim_km" not in config) or ("zlim_km" not in config): + + # project minlatitude, maxlatitude, minlongitude, maxlongitude to ymin, ymax, xmin, xmax + xmin, ymin = proj(config["minlongitude"], config["minlatitude"]) + xmax, ymax = proj(config["maxlongitude"], config["maxlatitude"]) + zmin, zmax = config["mindepth"], config["maxdepth"] + config["xlim_km"] = (xmin, xmax) + config["ylim_km"] = (ymin, ymax) + config["zlim_km"] = (zmin, zmax) + + config["vel"] = {"P": 6.0, "S": 6.0 / 1.73} + + # %% + config["eikonal"] = None + + # ## Eikonal for 1D velocity model + # zz = [0.0, 5.5, 16.0, 32.0] + # vp = [5.5, 5.5, 6.7, 7.8] + # vp_vs_ratio = 1.73 + # vs = [v / vp_vs_ratio for v in vp] + # Northern California (Gil7) + zz = [0.0, 1.0, 3.0, 4.0, 5.0, 17.0, 25.0, 62.0] + vp = [3.2, 3.2, 4.5, 4.8, 5.51, 6.21, 6.89, 7.83] + vs = [1.5, 1.5, 2.4, 2.78, 3.18, 3.40, 3.98, 4.52] + h = 0.3 + + if os.path.exists(f"{root_path}/{region}/obspy/velocity.csv"): + velocity = pd.read_csv(f"{root_path}/{region}/obspy/velocity.csv") + zz = velocity["z_km"].values + vp = velocity["vp"].values + vs = velocity["vs"].values + h = 0.1 + + vel = {"Z": zz, "P": vp, "S": vs} + config["eikonal"] = { + "vel": vel, + "h": h, + "xlim_km": config["xlim_km"], + "ylim_km": config["ylim_km"], + "zlim_km": config["zlim_km"], + } + config["eikonal"] = init_eikonal2d(config["eikonal"]) + + # %% config for location + config["min_picks"] = 6 # for sampling not for filtering + config["min_picks_ratio"] = 0.5 # for sampling + config["max_residual_time"] = 1.0 + config["max_residual_amplitude"] = 1.0 + config["min_score"] = 0.5 + config["min_p_picks"] = 1.5 # for filtering + config["min_s_picks"] = 1.5 # for filtering + + config["bfgs_bounds"] = ( + (config["xlim_km"][0] - 1, config["xlim_km"][1] + 1), # x + (config["ylim_km"][0] - 1, config["ylim_km"][1] + 1), # y + # (config["zlim_km"][0], config["zlim_km"][1] + 1), # z + (0, config["zlim_km"][1] + 1), + (None, None), # t + ) + + # %% + plt.figure() + print(stations[["station_term_time_p", "station_term_time_s", "station_term_amplitude"]].describe()) + plt.scatter( + stations["x_km"], stations["y_km"], c=stations["station_term_time_p"], cmap="viridis_r", s=100, marker="^" + ) + plt.colorbar() + plt.xlabel("X (km)") + plt.ylabel("Y (km)") + plt.xlim(config["xlim_km"]) + plt.ylim(config["ylim_km"]) + plt.title("Stations") + plt.savefig(f"{figure_path}/stations_sst_{iter}.png", bbox_inches="tight", dpi=300) + + # %% + mapping_phase_type_int = {"P": 0, "S": 1} + config["vel"] = {mapping_phase_type_int[k]: v for k, v in config["vel"].items()} + picks["phase_type"] = picks["phase_type"].map(mapping_phase_type_int) + if "phase_amplitude" in picks.columns: + 
picks["phase_amplitude"] = picks["phase_amplitude"].apply( + lambda x: np.log10(x) + 2.0 + ) # convert to log10(cm/s) + + # %% + # reindex in case the index does not start from 0 or is not continuous + stations["idx_sta"] = np.arange(len(stations)) + if events is not None: + # reindex in case the index does not start from 0 or is not continuous + events["idx_eve"] = np.arange(len(events)) + + else: + picks = picks.merge(stations[["station_id", "x_km", "y_km", "z_km"]], on="station_id") + events = picks.groupby("event_index").agg( + {"x_km": "mean", "y_km": "mean", "z_km": "mean", "phase_time": "min"} + ) + picks.drop(["x_km", "y_km", "z_km"], axis=1, inplace=True) + events["z_km"] = 10.0 # km default depth + events.rename({"phase_time": "time"}, axis=1, inplace=True) + events["event_index"] = events.index + events.reset_index(drop=True, inplace=True) + events["idx_eve"] = np.arange(len(events)) + + picks = picks.merge(events[["event_index", "idx_eve"]], on="event_index") + picks = picks.merge(stations[["station_id", "idx_sta"]], on="station_id") + + # %% + estimator = ADLoc(config, stations=stations[["x_km", "y_km", "z_km"]].values, eikonal=config["eikonal"]) + + # %% + NCPU = mp.cpu_count() + # MAX_SST_ITER = 1 + # MIN_SST_S = 0.01 + events_init = events.copy() + + if True: + # for iter in range(MAX_SST_ITER): + # picks, events = invert_location_iter(picks, stations, config, estimator, events_init=events_init, iter=iter) + picks, events = invert_location( + picks, stations, config, estimator, events_init=events_init, iter=f"{year}.{jday}" + ) + + # %% + station_num_picks = picks[picks["mask"] == 1.0].groupby("idx_sta").size().reset_index(name="num_picks") + prev_num_picks = stations["num_picks"].values + current_num_picks = stations["idx_sta"].map(station_num_picks.set_index("idx_sta")["num_picks"]).fillna(0) + stations["num_picks"] = prev_num_picks + current_num_picks + + # %% + station_term_amp = ( + picks[picks["mask"] == 1.0].groupby("idx_sta").agg({"residual_amplitude": "median"}).reset_index() + ) + # stations["station_term_amplitude"] += ( + # stations["idx_sta"].map(station_term_amp.set_index("idx_sta")["residual_amplitude"]).fillna(0) + # ) + prev_station_term_amp = stations["station_term_amplitude"].values + current_station_term_amp = ( + stations["idx_sta"].map(station_term_amp.set_index("idx_sta")["residual_amplitude"]).fillna(0) + ) + stations["station_term_amplitude"] = ( + prev_station_term_amp * prev_num_picks + current_station_term_amp * current_num_picks + ) / (prev_num_picks + current_num_picks) + + ## Same P and S station term + # station_term_time = picks[picks["mask"] == 1.0].groupby("idx_sta").agg({"residual_time": "mean"}).reset_index() + # stations["station_term_time_p"] += ( + # stations["idx_sta"].map(station_term_time.set_index("idx_sta")["residual_time"]).fillna(0) + # ) + # stations["station_term_time_s"] += ( + # stations["idx_sta"].map(station_term_time.set_index("idx_sta")["residual_time"]).fillna(0) + # ) + + ## Separate P and S station term + station_term_time = ( + picks[picks["mask"] == 1.0] + .groupby(["idx_sta", "phase_type"]) + .agg({"residual_time": "mean"}) + .reset_index() + ) + # stations["station_term_time_p"] += ( + # stations["idx_sta"] + # .map(station_term_time[station_term_time["phase_type"] == 0].set_index("idx_sta")["residual_time"]) + # .fillna(0) + # ) + # stations["station_term_time_s"] += ( + # stations["idx_sta"] + # .map(station_term_time[station_term_time["phase_type"] == 1].set_index("idx_sta")["residual_time"]) + # .fillna(0) + 
# ) + prev_station_term_time_p = stations["station_term_time_p"].values + current_station_term_time_p = ( + stations["idx_sta"] + .map(station_term_time[station_term_time["phase_type"] == 0].set_index("idx_sta")["residual_time"]) + .fillna(0) + ) + stations["station_term_time_p"] = ( + prev_station_term_time_p * prev_num_picks + current_station_term_time_p * current_num_picks + ) / (prev_num_picks + current_num_picks) + + prev_station_term_time_s = stations["station_term_time_s"].values + current_station_term_time_s = ( + stations["idx_sta"] + .map(station_term_time[station_term_time["phase_type"] == 1].set_index("idx_sta")["residual_time"]) + .fillna(0) + ) + stations["station_term_time_s"] = ( + prev_station_term_time_s * prev_num_picks + current_station_term_time_s * current_num_picks + ) / (prev_num_picks + current_num_picks) + + # %% + if "event_index" not in events.columns: + events["event_index"] = events.merge(picks[["idx_eve", "event_index"]], on="idx_eve")["event_index"] + events[["longitude", "latitude"]] = events.apply( + lambda x: pd.Series(proj(x["x_km"], x["y_km"], inverse=True)), axis=1 + ) + events["depth_km"] = events["z_km"] + events.drop(["idx_eve", "x_km", "y_km", "z_km"], axis=1, inplace=True, errors="ignore") + events.sort_values(["time"], inplace=True) + + picks["phase_amplitude"] = 10 ** (picks["phase_amplitude"] - 2.0) + picks.rename({"mask": "adloc_mask", "residual_time": "adloc_residual_time"}, axis=1, inplace=True) + if "residual_amplitude" in picks.columns: + picks.rename({"residual_amplitude": "adloc_residual_amplitude"}, axis=1, inplace=True) + picks["phase_type"] = picks["phase_type"].map({0: "P", 1: "S"}) + picks.drop(["idx_eve", "idx_sta"], axis=1, inplace=True, errors="ignore") + picks.sort_values(["phase_time"], inplace=True) + + stations.drop(["idx_sta", "x_km", "y_km", "z_km"], axis=1, inplace=True, errors="ignore") + + picks.to_csv(f"{root_path}/{adloc_picks_csv}", index=False) + events.to_csv(f"{root_path}/{adloc_events_csv}", index=False) + stations.to_csv(f"{root_path}/{station_term_csv}", index=False) + + if protocol != "file": + fs.put(f"{root_path}/{adloc_picks_csv}", f"{bucket}/{adloc_picks_csv}") + fs.put(f"{root_path}/{adloc_events_csv}", f"{bucket}/{adloc_events_csv}") + + +# %% +if __name__ == "__main__": + + args = parse_args() + + # protocol = "gs" + # token_json = "application_default_credentials.json" + # # token_json = f"{os.environ['HOME']}/.config/gcloud/application_default_credentials.json" + # with open(token_json, "r") as fp: + # token = json.load(fp) + + # fs = fsspec.filesystem(protocol, token=token) + + protocol = "file" + token = None + fs = fsspec.filesystem(protocol) + + region = args.region + root_path = args.root_path + bucket = args.bucket + num_nodes = args.num_nodes + node_rank = args.node_rank + iter = args.iter + + # %% + jdays = sorted(glob(f"{root_path}/{region}/gamma/????/????.???.events.csv")) + jdays = [jday.split("/")[-1].replace(".csv", "") for jday in jdays] + print(f"Number of pick files: {len(jdays)}") + jdays = [jdays[i::num_nodes] for i in range(num_nodes)] + + # %% + if protocol == "file": + with open(f"{root_path}/{region}/config.json", "r") as fp: + config = json.load(fp) + else: + with fs.open(f"{bucket}/{region}/config.json", "r") as fp: + config = json.load(fp) + + # %% + print(f"{jdays[node_rank] = }") + run_adloc( + root_path=root_path, + region=region, + config=config, + jdays=jdays[node_rank], + iter=iter, + protocol=protocol, + token=token, + bucket=bucket, + ) diff --git 
a/scripts/run_gamma_v2.py b/scripts/run_gamma_v2.py new file mode 100644 index 0000000..4f04cda --- /dev/null +++ b/scripts/run_gamma_v2.py @@ -0,0 +1,344 @@ +# %% +import argparse +import json +import os +from typing import Dict, NamedTuple + +import fsspec +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +from gamma.utils import association +from pyproj import Proj +from glob import glob +from args import parse_args + + +def run_gamma( + root_path: str, + region: str, + config: Dict, + jdays: list, + protocol: str = "file", + bucket: str = "", + token: Dict = None, +): + + # %% + fs = fsspec.filesystem(protocol=protocol, token=token) + + for jday in jdays: + + print(f"Processing {jday}") + + year = int(jday.split(".")[0]) + jday = int(jday.split(".")[1]) + + # %% + result_path = f"{region}/gamma/{year:04d}" + if not os.path.exists(f"{root_path}/{result_path}"): + os.makedirs(f"{root_path}/{result_path}") + + # %% + # station_csv = data_path / "stations.csv" + # station_json = f"{region}/results/network/stations.json" + station_json = f"{region}/obspy/stations.json" + # if picks_csv is None: + # picks_csv = f"{region}/results/phase_picking/{year:04d}/phase_picks_{jday:03d}.csv" + picks_csv = f"{region}/phasenet/{year:04d}/{year:04d}.{jday:03d}.csv" + gamma_events_csv = f"{result_path}/{year:04d}.{jday:03d}.events.csv" + gamma_picks_csv = f"{result_path}/{year:04d}.{jday:03d}.picks.csv" + + # %% + ## read picks + try: + if protocol == "file": + picks = pd.read_csv(f"{root_path}/{picks_csv}", parse_dates=["phase_time"]) + else: + # picks = pd.read_csv(f"{protocol}://{bucket}/{picks_csv}", parse_dates=["phase_time"]) + with fs.open(f"{bucket}/{picks_csv}", "r") as fp: + picks = pd.read_csv(fp, parse_dates=["phase_time"]) + except Exception as e: + print(f"Error reading {picks_csv}: {e}") + return NamedTuple("outputs", events=str, picks=str)(events=gamma_events_csv, picks=gamma_picks_csv) + + # ################### + # # stations = pd.read_json("tests/bo20240616/stations.json", orient="index") + # # stations["station_id"] = stations.index + # # gamma_picks = pd.read_csv("tests/bo20240616/gamma_picks_20230101_30min.csv") + # # phasenet_picks = pd.read_csv("tests/bo20240616/phasenet_picks_20230101_30min.csv") + # # phasenet_picks["phase_time"] = pd.to_datetime(phasenet_picks["phase_time"]) + # # gamma_picks["phase_time"] = pd.to_datetime(gamma_picks["phase_time"]) + # root_path = "./" + # gamma_events_csv = f"tests/bo20240616/gamma_events_{jday:03d}.csv" + # gamma_picks_csv = f"tests/bo20240616/gamma_picks_{jday:03d}.csv" + # picks = pd.read_csv("tests/bo20240616/phasenet_picks_20230101_30min.csv") + # ################### + + picks.rename( + columns={ + "station_id": "id", + "phase_time": "timestamp", + "phase_type": "type", + "phase_score": "prob", + "phase_amplitude": "amp", + }, + inplace=True, + ) + # picks["id"] = picks["id"].apply(lambda x: ".".join(x.split(".")[:2])) # remove channel + + ## read stations + if protocol == "file": + stations = pd.read_json(f"{root_path}/{station_json}", orient="index") + else: + with fs.open(f"{bucket}/{station_json}", "r") as fp: + stations = pd.read_json(fp, orient="index") + stations["id"] = stations.index + # stations["id"] = stations["id"].apply(lambda x: ".".join(x.split(".")[:2])) # remove channel + # stations = stations.groupby("id").first().reset_index() + + # ################### + # stations = pd.read_json("tests/bo20240616/stations.json", orient="index") + # stations["id"] = stations.index + # ################### + + if 
"longitude0" not in config: + config["longitude0"] = (config["minlongitude"] + config["maxlongitude"]) / 2 + if "latitude0" not in config: + config["latitude0"] = (config["minlatitude"] + config["maxlatitude"]) / 2 + proj = Proj(f"+proj=sterea +lon_0={config['longitude0']} +lat_0={config['latitude0']} +units=km") + stations[["x(km)", "y(km)"]] = stations.apply( + lambda x: pd.Series(proj(longitude=x.longitude, latitude=x.latitude)), axis=1 + ) + stations["z(km)"] = stations["elevation_m"].apply(lambda x: -x / 1e3) + + ### setting GMMA configs + config["use_dbscan"] = True + config["use_amplitude"] = True + config["method"] = "BGMM" + if config["method"] == "BGMM": ## BayesianGaussianMixture + config["oversample_factor"] = 2 + if config["method"] == "GMM": ## GaussianMixture + config["oversample_factor"] = 1 + + ## earthquake location + config["vel"] = {"p": 6.0, "s": 6.0 / 1.75} + config["dims"] = ["x(km)", "y(km)", "z(km)"] + xmin, ymin = proj(config["minlongitude"], config["minlatitude"]) + xmax, ymax = proj(config["maxlongitude"], config["maxlatitude"]) + config["x(km)"] = (xmin, xmax) + config["y(km)"] = (ymin, ymax) + if "gamma" not in config: + config["z(km)"] = (0, 60) + else: + config["z(km)"] = [config["gamma"]["zmin_km"], config["gamma"]["zmax_km"]] + config["bfgs_bounds"] = ( + (config["x(km)"][0] - 1, config["x(km)"][1] + 1), # x + (config["y(km)"][0] - 1, config["y(km)"][1] + 1), # y + (0, config["z(km)"][1] + 1), # z + (None, None), # t + ) + + ## DBSCAN + # config["dbscan_eps"] = estimate_eps(stations, config["vel"]["p"]) # s + # eps_year = { + # 2023: 10, + # 2022: 10.5, + # 2021: 10.5, + # 2020: 10.5, + # 2019: 10.5, + # 2018: 10.5, + # } + config["dbscan_eps"] = 12 + config["dbscan_min_samples"] = 6 + + ## Eikonal for 1D velocity model + # Southern California + # zz = [0.0, 5.5, 16.0, 32.0] + # vp = [5.5, 5.5, 6.7, 7.8] + # vp_vs_ratio = 1.73 + # vs = [v / vp_vs_ratio for v in vp] + # Northern California (Gil7) + zz = [0.0, 1.0, 3.0, 4.0, 5.0, 17.0, 25.0, 62.0] + vp = [3.2, 3.2, 4.5, 4.8, 5.51, 6.21, 6.89, 7.83] + vs = [1.5, 1.5, 2.4, 2.78, 3.18, 3.40, 3.98, 4.52] + h = 0.3 + vel = {"z": zz, "p": vp, "s": vs} + config["eikonal"] = { + "vel": vel, + "h": h, + "xlim": config["x(km)"], + "ylim": config["y(km)"], + "zlim": config["z(km)"], + } + + ## set number of cpus + config["ncpu"] = 32 + + ## filtering + config["min_picks_per_eq"] = 5 + config["min_p_picks_per_eq"] = 0 + config["min_s_picks_per_eq"] = 0 + config["max_sigma11"] = 3.0 * 5 # s + config["max_sigma22"] = 1.0 * 3 # log10(m/s) + config["max_sigma12"] = 1.0 * 3 # covariance + + ## filter picks without amplitude measurements + if config["use_amplitude"]: + picks = picks[picks["amp"] != -1] + + for k, v in config.items(): + print(f"{k}: {v}") + + # %% + event_idx0 = 0 ## current earthquake index + assignments = [] + events, assignments = association(picks, stations, config, event_idx0, config["method"]) + event_idx0 += len(events) + + if len(events) > 0: + ## create catalog + events = pd.DataFrame(events) + events[["longitude", "latitude"]] = events.apply( + lambda x: pd.Series(proj(longitude=x["x(km)"], latitude=x["y(km)"], inverse=True)), axis=1 + ) + events["depth_km"] = events["z(km)"] + events.sort_values("time", inplace=True) + with open(f"{root_path}/{gamma_events_csv}", "w") as fp: + events.to_csv(fp, index=False, float_format="%.3f", date_format="%Y-%m-%dT%H:%M:%S.%f") + + # plt.figure() + # plt.scatter( + # events["longitude"], + # events["latitude"], + # c=events["depth_km"], + # s=max(0.1, min(10, 5000 
/ len(events))), + # alpha=0.3, + # linewidths=0, + # cmap="viridis_r", + # ) + # plt.colorbar() + # plt.title(f"Number of events: {len(events)}") + # plt.savefig(f"{root_path}/{gamma_events_csv.replace('.csv', '')}_scaler60_oversample8.png") + + ## add assignment to picks + assignments = pd.DataFrame(assignments, columns=["pick_index", "event_index", "gamma_score"]) + picks = picks.join(assignments.set_index("pick_index")).fillna(-1).astype({"event_index": int}) + picks.rename( + columns={ + "id": "station_id", + "timestamp": "phase_time", + "type": "phase_type", + "prob": "phase_score", + "amp": "phase_amplitude", + }, + inplace=True, + ) + picks.sort_values(["phase_time"], inplace=True) + with open(f"{root_path}/{gamma_picks_csv}", "w") as fp: + picks.to_csv(fp, index=False, date_format="%Y-%m-%dT%H:%M:%S.%f") + + if protocol != "file": + fs.put(f"{root_path}/{gamma_events_csv}", f"{bucket}/{gamma_events_csv}") + fs.put(f"{root_path}/{gamma_picks_csv}", f"{bucket}/{gamma_picks_csv}") + + else: + print(f"No events associated in {picks_csv}") + with open(f"{root_path}/{gamma_events_csv}", "w") as fp: + pass + with open(f"{root_path}/{gamma_picks_csv}", "w") as fp: + pass + + # # %% copy to results/phase_association + # if not os.path.exists(f"{root_path}/{region}/results/phase_association"): + # os.makedirs(f"{root_path}/{region}/results/phase_association") + # os.system( + # f"cp {root_path}/{gamma_events_csv} {root_path}/{region}/results/phase_association/events_{jday:03d}.csv" + # ) + # os.system( + # f"cp {root_path}/{gamma_picks_csv} {root_path}/{region}/results/phase_association/picks_{jday:03d}.csv" + # ) + # if protocol != "file": + # fs.put( + # f"{root_path}/{gamma_events_csv}", + # f"{bucket}/{region}/results/phase_association/events_{jday:03d}.csv", + # ) + # print( + # f"Uploaded {root_path}/{gamma_events_csv} to {bucket}/{region}/results/phase_association/events_{jday:03d}.csv" + # ) + # fs.put( + # f"{root_path}/{gamma_picks_csv}", + # f"{bucket}/{region}/results/phase_association/picks_{jday:03d}.csv", + # ) + # print( + # f"Uploaded {root_path}/{gamma_picks_csv} to {bucket}/{region}/results/phase_association/picks_{jday:03d}.csv" + # ) + + # outputs = NamedTuple("outputs", events=str, picks=str) + # return outputs(events=gamma_events_csv, picks=gamma_picks_csv) + + +# def parse_args(): +# parser = argparse.ArgumentParser(description="Run Gamma on NCEDC/SCEDC data") +# parser.add_argument("--node_rank", type=int, default=0) +# parser.add_argument("--num_nodes", type=int, default=1) +# parser.add_argument("--year", type=int, default=2023) +# parser.add_argument("--root_path", type=str, default="local") +# parser.add_argument("--region", type=str, default="Cal") +# parser.add_argument("--bucket", type=str, default="quakeflow_catalog") +# return parser.parse_args() + + +if __name__ == "__main__": + + os.environ["OMP_NUM_THREADS"] = "8" + + args = parse_args() + + # protocol = "gs" + # token_json = f"application_default_credentials.json" + # # token_json = f"{os.environ['HOME']}/.config/gcloud/application_default_credentials.json" + # with open(token_json, "r") as fp: + # token = json.load(fp) + + # fs = fsspec.filesystem(protocol, token=token) + + protocol = "file" + token = None + fs = fsspec.filesystem(protocol) + + region = args.region + root_path = args.root_path + bucket = args.bucket + num_nodes = args.num_nodes + node_rank = args.node_rank + + # %% + # calc_jdays = lambda year: 366 if (year % 4 == 0 and year % 100 != 0) or (year % 400 == 0) else 365 + # jdays = 
[f"{year}.{i:03d}" for i in range(1, calc_jdays(year) + 1)] + # jdays = [jdays[i::num_nodes] for i in range(num_nodes)] + jdays = glob(f"{root_path}/{region}/phasenet/????/????.???.csv") + jdays = [jday.split("/")[-1].replace(".csv", "") for jday in jdays] + print(f"Number of pick files: {len(jdays)}") + + jdays = [jdays[i::num_nodes] for i in range(num_nodes)] + + # %% + if protocol == "file": + with open(f"{root_path}/{region}/config.json", "r") as fp: + config = json.load(fp) + else: + with fs.open(f"{bucket}/{region}/config.json", "r") as fp: + config = json.load(fp) + + # %% + print(f"{jdays[node_rank] = }") + run_gamma( + root_path=root_path, + region=region, + config=config, + jdays=jdays[node_rank], + protocol=protocol, + token=token, + bucket=args.bucket, + ) diff --git a/scripts/run_phasenet_v2.py b/scripts/run_phasenet_v2.py index 6d5cefb..2f3a11c 100644 --- a/scripts/run_phasenet_v2.py +++ b/scripts/run_phasenet_v2.py @@ -1,27 +1,27 @@ # %% from typing import Dict, List +from args import parse_args +import os +from collections import defaultdict +from glob import glob +import fsspec +import numpy as np +import json +import os +import sys -from kfp import dsl - -@dsl.component(base_image="zhuwq0/phasenet-api:latest") def run_phasenet( root_path: str, region: str, config: Dict, - rank: int = 0, + node_rank: int = 0, + num_nodes: int = 1, model_path: str = "../PhaseNet/", - mseed_list: List = None, protocol: str = "file", bucket: str = "", token: Dict = None, ) -> str: - # %% - import os - from collections import defaultdict - from glob import glob - - import fsspec # %% fs = fsspec.filesystem(protocol=protocol, token=token) @@ -33,177 +33,47 @@ def run_phasenet( # %% waveform_dir = f"{region}/waveforms" - if not os.path.exists(f"{root_path}/{waveform_dir}"): - if protocol != "file": - fs.get(f"{bucket}/{waveform_dir}/", f"{root_path}/{waveform_dir}/", recursive=True) + mseed_list = sorted(glob(f"{root_path}/{waveform_dir}/????/???/??/*.mseed")) # %% - def mapping_mseed_path( - data_path="", - network="??", - year="????", - jday="???", - month="??", - day="??", - hour="??", - station="*", - location="*", - channel="*", - datacenter="quakeflow", - group_channel=False, - protocol="file", - anon=False, - token=None, - ): - data_path = data_path.rstrip("/") - if datacenter.lower() == "quakeflow": - mseed_path = f"{data_path}/{year}-{jday}/{hour}/{network}.{station}.{location}.{channel}.mseed" - elif datacenter.lower() == "ncedc": - location = location.replace("-", "_") - # mseed_path = f"s3://ncedc-pds/continuous_waveforms/{network}/{year}/{year}.{jday}/{station}.{network}.{channel}.{location}.D.{year}.{jday}" - mseed_path = ( - f"{data_path}/{network}/{year}/{year}.{jday}/{station}.{network}.{channel}.{location}.D.{year}.{jday}" - ) - elif datacenter.lower() == "scedc": - location = location.replace("-", "_") - if "*" not in station: - station = f"{station:_<5}" - # mseed_path = f"s3://scedc-pds/continuous_waveforms/{network}/{year}/{year}_{jday}/{network}{station:_<5}{channel}_{location}{year}{jday}.ms" - mseed_path = f"{data_path}/{year}/{year}_{jday}/{network}{station}{channel}_{location}{year}{jday}.ms" - else: - raise ValueError("datacenter not supported") - - fs = fsspec.filesystem(protocol=protocol, anon=anon, token=token) - print(f"{mseed_path = }") - mseed_list = sorted(fs.glob(mseed_path)) - if protocol != "file": - mseed_list = [f"{protocol}://{mseed}" for mseed in mseed_list] - - if group_channel: - groups = defaultdict(list) - for mseed in mseed_list: - if 
datacenter.lower() == "quakeflow": - key = mseed.replace(".mseed", "")[:-1] + "*" - groups[key].append(mseed) - elif datacenter.lower() == "ncedc": - key = mseed[:-13] + "*" + mseed[-12:] # e.g. SQK.BG.DP*..D.2023.079 - groups[key].append(mseed) - elif datacenter.lower() == "scedc": - key = mseed[:-14] + "*" + mseed[-13:] # e.g. CISLB__BH*___2015002.ms - groups[key].append(mseed) - else: - raise ValueError("datacenter not supported") - mseed_list = [] - for key in groups: - mseed_list.append(",".join(sorted(groups[key]))) - - return mseed_list - - # %% - - if mseed_list is None: - # mseed_list = sorted(glob(f"{root_path}/{waveform_dir}/????-???/??/*.mseed")) - ####### - datacenter = "quakeflow" - protocol = "file" - data_path = f"{root_path}/{region}/waveforms" - network = "??" - year = "????" - jday = "???" - group_channel = True - ####### - # datacenter = "scedc" - # protocol = "s3" - # data_path = "s3://scedc-pds/continuous_waveforms" - # network = "CI" - # year = "2023" - # jday = "001" - # group_channel = True - ####### - # datacenter = "ncedc" - # protocol = "s3" - # data_path = "s3://ncedc-pds/continuous_waveforms" - # network = "NC" - # year = "2023" - # jday = "001" - # group_channel = True - ####### - mseed_list = mapping_mseed_path( - data_path=data_path, - network=network, - year=year, - jday=jday, - datacenter=datacenter, - group_channel=group_channel, - protocol=protocol, - anon=True, - token=token, - ) - print(f"{len(mseed_list) = }") - print("\n".join(mseed_list[:5])) - else: - with open(f"{root_path}/{region}/{mseed_list}", "r") as fp: - mseed_list = fp.read().split("\n") - - # %% group channels into stations - mseed_list = sorted(list(set([x.split(".mseed")[0][:-1] + "*.mseed" for x in mseed_list]))) + processed = sorted(glob(f"{root_path}/{result_path}/picks/????/???/??/*.csv")) + processed = [f.replace(f"{root_path}/{result_path}/picks/", "").replace(".csv", "")[:-1] for f in processed] + print(f"Processed: {len(processed)}") # %% - ## filter out processed events - # processed_list = sorted(list(fs.glob(f"{result_path}/????-???/??/*.csv"))) - # processed_event_id = [f.name.split("/")[-1].replace(".csv", ".mseed") for f in processed_list] - # file_list = [f for f in file_list if f.split("/")[-1] not in processed_event_id] - # mseed_list = sorted(list(set(mseed_list))) + mseed_3c = defaultdict(list) + for mseed in mseed_list: + key = mseed.replace(f"{root_path}/{waveform_dir}/", "").replace(".mseed", "")[:-1] + if key in processed: + continue + mseed_3c[key].append(mseed) + mseed_3c = [",".join(sorted(v)) for k, v in mseed_3c.items()] + print(f"Unprocessed: {len(mseed_3c)}") + mseed_3c = list(np.array_split(mseed_3c, num_nodes)[node_rank]) # %% - if protocol != "file": - fs.get(f"{bucket}/{region}/results/data/inventory.xml", f"{root_path}/{region}/results/data/inventory.xml") + mseed_file = f"{root_path}/{result_path}/mseed_list_{node_rank:03d}_{num_nodes:03d}.csv" + with open(mseed_file, "w") as fp: + fp.write("\n".join(mseed_3c)) # %% - with open(f"{root_path}/{result_path}/mseed_list_{rank:03d}.csv", "w") as fp: - fp.write("fname\n") - fp.write("\n".join(mseed_list)) + inventory_path = f"{root_path}/{region}/obspy/inventory" # %% os.system( - f"python {model_path}/phasenet/predict.py --model={model_path}/model/190703-214543 --data_dir=./ --data_list={root_path}/{result_path}/mseed_list_{rank:03d}.csv --response_xml={root_path}/{region}/results/data/inventory.xml --format=mseed --amplitude --highpass_filter=1.0 --result_dir={root_path}/{result_path} 
--result_fname=phasenet_picks_{rank:03d} --batch_size=1" + f"python {model_path}/phasenet/predict.py --model={model_path}/model/190703-214543 --data_dir=./ --data_list={mseed_file} --response_xml={inventory_path} --format=mseed --amplitude --highpass_filter=1.0 --result_dir={root_path}/{result_path} --result_fname=phasenet_picks_{node_rank:03d}_{num_nodes:03d} --batch_size=1" ) - if protocol != "file": - fs.put( - f"{root_path}/{result_path}/phasenet_picks_{rank:03d}.csv", - f"{bucket}/{result_path}/phasenet_picks_{rank:03d}.csv", - ) - - # copy to results/phase_picking - if not os.path.exists(f"{root_path}/{region}/results/phase_picking"): - os.makedirs(f"{root_path}/{region}/results/phase_picking") - os.system( - f"cp {root_path}/{result_path}/phasenet_picks_{rank:03d}.csv {root_path}/{region}/results/phase_picking/phase_picks_{rank:03d}.csv" - ) - if protocol != "file": - fs.put( - f"{root_path}/{result_path}/phasenet_picks_{rank:03d}.csv", - f"{bucket}/{region}/results/phase_picking/phase_picks_{rank:03d}.csv", - ) - - return f"{result_path}/phasenet_picks_{rank:03d}.csv" - if __name__ == "__main__": - import json - import os - import sys - root_path = "local" - region = "demo" - if len(sys.argv) > 1: - root_path = sys.argv[1] - region = sys.argv[2] + args = parse_args() + root_path = args.root_path + region = args.region with open(f"{root_path}/{region}/config.json", "r") as fp: config = json.load(fp) - run_phasenet.execute(root_path=root_path, region=region, config=config) + run_phasenet(root_path=root_path, region=region, config=config) # %%
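
Usage sketch (illustrative, not part of the patch): the new per-day scripts share two conventions. run_gamma_v2.py and run_adloc_v2.py split the list of day files across nodes round-robin via jdays[i::num_nodes] and each node processes jdays[node_rank]; merge_gamma_picks.py makes the per-day event indices globally unique by keying events and picks on a year.jday.event_index "dummy_id" before re-numbering. The minimal, self-contained Python sketch below mirrors both patterns; the day list and toy DataFrames are hypothetical values chosen only for illustration.

import numpy as np
import pandas as pd

# (1) Round-robin partition, as in run_gamma_v2.py / run_adloc_v2.py:
#     node k processes jdays[k::num_nodes].
jdays = [f"2023.{i:03d}" for i in range(1, 11)]   # hypothetical day list
num_nodes = 3
partitions = [jdays[i::num_nodes] for i in range(num_nodes)]
assert sorted(sum(partitions, [])) == sorted(jdays)  # every day assigned exactly once

# (2) Global re-indexing, as in merge_gamma_picks.py: per-day event_index
#     values collide across days, so build a year.jday.event_index key first.
events = pd.DataFrame({"year": ["2023", "2023"], "jday": ["001", "002"], "event_index": ["0", "0"]})
picks = pd.DataFrame({"year": ["2023", "2023"], "jday": ["001", "002"], "event_index": ["0", "0"]})
events["dummy_id"] = events["year"] + "." + events["jday"] + "." + events["event_index"]
picks["dummy_id"] = picks["year"] + "." + picks["jday"] + "." + picks["event_index"]
events["event_index"] = np.arange(len(events))        # new globally unique index
picks = picks.drop("event_index", axis=1)
picks = picks.merge(events[["dummy_id", "event_index"]], on="dummy_id")
print(picks[["dummy_id", "event_index"]])

The dummy_id merge is how merge_gamma_picks.py reattaches picks to the re-numbered events before writing the combined gamma_events.csv and gamma_picks.csv consumed by run_adloc.py.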