From 69a62cad2fd21370fb6cb8e91505d884f7b082d1 Mon Sep 17 00:00:00 2001 From: "Alec Thomson (S&A, Kensington WA)" Date: Tue, 16 Apr 2024 15:21:06 +0800 Subject: [PATCH 01/11] Use sql --- arrakis/init_database.py | 61 +++++++++++++++++++++++++++------------- 1 file changed, 42 insertions(+), 19 deletions(-) diff --git a/arrakis/init_database.py b/arrakis/init_database.py index 73d107f8..45d6fe88 100644 --- a/arrakis/init_database.py +++ b/arrakis/init_database.py @@ -7,6 +7,7 @@ from typing import Dict, List, Optional, Tuple, Union import numpy as np +import pandas as pd from astropy import units as u from astropy.coordinates import Angle, SkyCoord, search_around_sky from astropy.table import Table, vstack @@ -226,10 +227,7 @@ def get_catalogue(survey_dir: Path, epoch: int = 0) -> Table: Table: RACS catalogue table. """ - basedir = survey_dir / "db" / f"epoch_{epoch}" - logger.info(f"Loading RACS database from {basedir}") - data_file = basedir / "field_data.csv" - database = Table.read(data_file) + database = read_racs_database(survey_dir, epoch, table="field_data") # Remove rows with SBID < 0 database = database[database["SBID"] >= 0] @@ -238,20 +236,21 @@ def get_catalogue(survey_dir: Path, epoch: int = 0) -> Table: FIELD = row["FIELD_NAME"] SBID = row["SBID"] # Find FIELD and SBID in beamfile name - beamfile = basedir / f"beam_inf_{SBID}-{FIELD}.csv" - if not beamfile.exists(): - raise FileNotFoundError(f"{beamfile} not found!") - racs_fields = Table.read(beamfile) + racs_fields = read_racs_database( + survey_dir, epoch, table=f"beam_inf_{SBID}-{FIELD}" + ) racs_fields.add_column(FIELD, name="FIELD_NAME", index=0) racs_fields.add_column(SBID, name="SBID", index=0) # Add in all others for row in tqdm(database[1:], desc="Reading RACS database", file=TQDM_OUT): - beamfile = basedir / f"beam_inf_{row['SBID']}-{row['FIELD_NAME']}.csv" - if not beamfile.exists(): - logger.error(f"{beamfile} not found!") + try: + tab = read_racs_database( + survey_dir, epoch, table=f"beam_inf_{row['SBID']}-{row['FIELD_NAME']}" + ) + except Exception as e: + logger.error(e) continue - tab = Table.read(beamfile) try: tab.add_column(row["FIELD_NAME"], name="FIELD_NAME", index=0) tab.add_column(row["SBID"], name="SBID", index=0) @@ -339,11 +338,15 @@ def beam_inf( """Get the beam information""" tabs: List[Table] = [] for row in tqdm(database, desc="Reading beam info", file=TQDM_OUT): - fname = basedir / f"beam_inf_{row['SBID']}-{row['FIELD_NAME']}.csv" - if not fname.exists(): - logger.error(f"{fname} not found!") + try: + tab = read_racs_database( + survey_dir=basedir, + epoch=epoch, + table=f"beam_inf_{row['SBID']}-{row['FIELD_NAME']}", + ) + except Exception as e: + logger.error(e) continue - tab = Table.read(fname) if len(tab) == 0: logger.error(f"{row['SBID']}-{row['FIELD_NAME']} failed...") continue @@ -370,6 +373,28 @@ def beam_inf( return insert_res +def read_racs_database( + survey_dir: Path, + epoch: int, + table: str, +) -> Table: + epoch_name = f"epoch_{epoch}" + if survey_dir.parent.name == "postgresql:": + _dbstring = f"{survey_dir.parent.name}//{survey_dir.name}/{epoch_name}" + _df = pd.read_sql( + f"SELECT * from {table}", + f"{_dbstring}", + ) + return Table.from_pandas(_df) + + basedir = survey_dir / "db" / epoch_name + data_file = basedir / f"{table}.csv" + if not data_file.exists(): + raise FileNotFoundError(f"{data_file} not found!") + + return Table.read(data_file) + + def field_database( survey_dir: Path, host: str, @@ -389,9 +414,7 @@ def field_database( Returns: Tuple[InsertManyResult, InsertManyResult]: Field and beam info insert object. """ - basedir = survey_dir / "db" / f"epoch_{epoch}" - data_file = basedir / "field_data.csv" - database = Table.read(data_file) + database = read_racs_database(survey_dir, epoch, table="field_data") if "COMMENT" in database.colnames: database["COMMENT"] = database["COMMENT"].astype(str) # Remove rows with SBID < 0 From 6acf72d3617c4ad695e74a344e37e1b30e6200e4 Mon Sep 17 00:00:00 2001 From: "Alec Thomson (S&A, Kensington WA)" Date: Tue, 16 Apr 2024 15:22:12 +0800 Subject: [PATCH 02/11] Update pyproject --- pyproject.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 6fcea9fb..d87a2938 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,7 +40,7 @@ ipython = "*" matplotlib = ">=3.8" numba = "*" numba_progress = "*" -pandas = "*" +pandas = ">=2" psutil = "*" pymongo = "*" pymultinest = "*" @@ -65,6 +65,8 @@ PolSpectra = ">=1.1.0" setuptools = "*" fixms = ">=0.2" fitscube = ">=0.3" +psycopg2 = "*" +sqlalchemy = "*" [tool.poetry.dev-dependencies] black = ">=23" From fd341697efb1379fc313f7f2069c8284186c084b Mon Sep 17 00:00:00 2001 From: Alec Thomson Date: Tue, 16 Apr 2024 15:33:37 +0800 Subject: [PATCH 03/11] fixes --- pyproject.toml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index d87a2938..9311be39 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,7 +48,7 @@ pytest = "*" python_casacore = "*" RACS-tools = ">=3" radio_beam = "*" -RMextract = {git = "https://github.com/AlecThomson/RMextract@race"} +RMextract = {git = "https://github.com/AlecThomson/RMextract@race", optional=true} schwimmbad = "*" scipy = "*" spectral_cube = ">=0.6.3" @@ -65,7 +65,7 @@ PolSpectra = ">=1.1.0" setuptools = "*" fixms = ">=0.2" fitscube = ">=0.3" -psycopg2 = "*" +psycopg2-binary = "*" sqlalchemy = "*" [tool.poetry.dev-dependencies] @@ -84,6 +84,7 @@ docs = [ "m2r2", "numpydoc", ] +RMextract = ["RMextract"] [build-system] requires = ["poetry-core>=1.2", "numpy"] From d67d75c1602e4bd47c51315b52b9bdea401785d7 Mon Sep 17 00:00:00 2001 From: "Alec Thomson (S&A, Kensington WA)" Date: Tue, 16 Apr 2024 15:36:21 +0800 Subject: [PATCH 04/11] Fix problems --- arrakis/init_database.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/arrakis/init_database.py b/arrakis/init_database.py index 45d6fe88..f5711b8a 100644 --- a/arrakis/init_database.py +++ b/arrakis/init_database.py @@ -118,7 +118,7 @@ def source_database( # Use pandas and follow # https://medium.com/analytics-vidhya/how-to-upload-a-pandas-dataframe-to-mongodb-ffa18c0953c1 df_i = islandcat.to_pandas() - if type(df_i["Source_ID"][0]) is bytes: + if isinstance(df_i["Source_ID"][0], bytes): logger.info("Decoding strings!") str_df = df_i.select_dtypes([object]) str_df = str_df.stack().str.decode("utf-8").unstack() @@ -145,7 +145,7 @@ def source_database( logger.info(f"Index created: {idx_res}") df_c = compcat.to_pandas() - if type(df_c["Source_ID"][0]) is bytes: + if isinstance(df_c["Source_ID"][0], bytes): logger.info("Decoding strings!") str_df = df_c.select_dtypes([object]) str_df = str_df.stack().str.decode("utf-8").unstack() @@ -329,7 +329,7 @@ def get_beams(mastercat: Table, database: Table, epoch: int = 0) -> List[Dict]: def beam_inf( database: Table, - basedir: Path, + survey_dir: Path, host: str, epoch: int, username: Optional[str] = None, @@ -340,7 +340,7 @@ def beam_inf( for row in tqdm(database, desc="Reading beam info", file=TQDM_OUT): try: tab = read_racs_database( - survey_dir=basedir, + survey_dir=survey_dir, epoch=epoch, table=f"beam_inf_{row['SBID']}-{row['FIELD_NAME']}", ) @@ -437,7 +437,7 @@ def field_database( beam_res = beam_inf( database=database, - basedir=basedir, + survey_dir=survey_dir, host=host, epoch=epoch, username=username, From 3ff57a5195359c50b840a84ed2d8c3bd492d72ab Mon Sep 17 00:00:00 2001 From: "Alec Thomson (S&A, Kensington WA)" Date: Tue, 16 Apr 2024 15:40:37 +0800 Subject: [PATCH 05/11] Add logging --- arrakis/init_database.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/arrakis/init_database.py b/arrakis/init_database.py index f5711b8a..4745fe33 100644 --- a/arrakis/init_database.py +++ b/arrakis/init_database.py @@ -380,6 +380,7 @@ def read_racs_database( ) -> Table: epoch_name = f"epoch_{epoch}" if survey_dir.parent.name == "postgresql:": + logger.info("Reading RACS data from postgresql...") _dbstring = f"{survey_dir.parent.name}//{survey_dir.name}/{epoch_name}" _df = pd.read_sql( f"SELECT * from {table}", @@ -387,6 +388,7 @@ def read_racs_database( ) return Table.from_pandas(_df) + logger.info("Reading RACS data from CSVs...") basedir = survey_dir / "db" / epoch_name data_file = basedir / f"{table}.csv" if not data_file.exists(): From 6f690220ade13f5fe76fa6c5d8395750098712bb Mon Sep 17 00:00:00 2001 From: "Alec Thomson (S&A, Kensington WA)" Date: Tue, 16 Apr 2024 16:09:50 +0800 Subject: [PATCH 06/11] Put table in string --- arrakis/init_database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrakis/init_database.py b/arrakis/init_database.py index 4745fe33..4f3f99f1 100644 --- a/arrakis/init_database.py +++ b/arrakis/init_database.py @@ -383,7 +383,7 @@ def read_racs_database( logger.info("Reading RACS data from postgresql...") _dbstring = f"{survey_dir.parent.name}//{survey_dir.name}/{epoch_name}" _df = pd.read_sql( - f"SELECT * from {table}", + f"SELECT * from '{table}'", f"{_dbstring}", ) return Table.from_pandas(_df) From 22e5e8a6a11091f8b8f5d5d208533c0c14e3553a Mon Sep 17 00:00:00 2001 From: "Alec Thomson (S&A, Kensington WA)" Date: Tue, 16 Apr 2024 16:23:45 +0800 Subject: [PATCH 07/11] Need inverted strings --- arrakis/init_database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrakis/init_database.py b/arrakis/init_database.py index 4f3f99f1..c5c36a5a 100644 --- a/arrakis/init_database.py +++ b/arrakis/init_database.py @@ -383,7 +383,7 @@ def read_racs_database( logger.info("Reading RACS data from postgresql...") _dbstring = f"{survey_dir.parent.name}//{survey_dir.name}/{epoch_name}" _df = pd.read_sql( - f"SELECT * from '{table}'", + f'SELECT * from "{table}"', f"{_dbstring}", ) return Table.from_pandas(_df) From 2ac7709d3de81e3b4fdce7658974163f09806e7b Mon Sep 17 00:00:00 2001 From: "Alec Thomson (S&A, Kensington WA)" Date: Tue, 16 Apr 2024 16:50:17 +0800 Subject: [PATCH 08/11] Docs --- arrakis/init_database.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/arrakis/init_database.py b/arrakis/init_database.py index c5c36a5a..0ae3ea66 100644 --- a/arrakis/init_database.py +++ b/arrakis/init_database.py @@ -378,6 +378,16 @@ def read_racs_database( epoch: int, table: str, ) -> Table: + """Read the RACS database from CSVs or postgresql + + Args: + survey_dir (Path): Path to RACS database (i.e. 'askap_surveys/racs' repo). + epoch (int): RACS epoch number. + table (str): Table name. + + Returns: + Table: RACS database table. + """ epoch_name = f"epoch_{epoch}" if survey_dir.parent.name == "postgresql:": logger.info("Reading RACS data from postgresql...") From 4dd4c1cbe6615020400e22f44419fc974babede4 Mon Sep 17 00:00:00 2001 From: "Alec Thomson (S&A, Kensington WA)" Date: Wed, 17 Apr 2024 11:04:30 +0800 Subject: [PATCH 09/11] Add script --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 9311be39..6c1987b9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -113,6 +113,7 @@ hello_mpi_world = { reference="scripts/hello_mpi_world.py", type="file"} make_links = { reference="scripts/make_links.py", type="file"} spica = { reference="scripts/spica.py", type="file"} tar_cubelets = { reference="scripts/tar_cubelets.py", type="file"} +create_mongodb = { reference="scripts/create_mongodb.py", type="file"} [tool.isort] profile = "black" From 1f9b071eaae993e635d922c26bbc45c73f4a3324 Mon Sep 17 00:00:00 2001 From: "Alec Thomson (S&A, Kensington WA)" Date: Wed, 17 Apr 2024 11:04:39 +0800 Subject: [PATCH 10/11] Add docs --- docs/source/start.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/source/start.rst b/docs/source/start.rst index 927987b6..8aa4f1ab 100644 --- a/docs/source/start.rst +++ b/docs/source/start.rst @@ -21,6 +21,10 @@ For example, you can start mongo using (for NUMA systems like Pawsey): :: .. tip:: It can be very convenient to run this database on a VM service like Pawsey's Nimbus cloud. You can then access the database from anywhere, and you don't need to worry about the database being deleted when you log out. This will require some network setup, such as opening the port for MongoDB (27017) on the VM. Get in touch with your local helpdesk if you need help with this. +For conveniance, we have provided a helper script to setup a MongoDB with both an admin and read-only user in the `scripts` directory. You can run this script with the following command: :: + + create_mongodb.py + RACS database ============= .. attention:: From e9db87c52361af9ce48c4af36e970ba85504f1ad Mon Sep 17 00:00:00 2001 From: "Alec Thomson (S&A, Kensington WA)" Date: Wed, 17 Apr 2024 11:11:05 +0800 Subject: [PATCH 11/11] Update changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e4e54819..afe2e02b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +* Allow PostgreSQL RACS database to be ingested in `spice_init` +* Add helper scipt `create_mongodb.py` + ## [2.2.0] - 2024-04-11 ### What's Changed