Merge pull request #63 from AlecThomson/use_sql
Use sql
AlecThomson authored Apr 17, 2024
2 parents b27e34a + e9db87c commit df825e2
Showing 4 changed files with 71 additions and 25 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

* Allow a PostgreSQL RACS database to be ingested in `spice_init`
* Add helper script `create_mongodb.py`

## [2.2.0] - 2024-04-11
### What's Changed

81 changes: 58 additions & 23 deletions arrakis/init_database.py
@@ -7,6 +7,7 @@
from typing import Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
from astropy import units as u
from astropy.coordinates import Angle, SkyCoord, search_around_sky
from astropy.table import Table, vstack
@@ -117,7 +118,7 @@ def source_database(
# Use pandas and follow
# https://medium.com/analytics-vidhya/how-to-upload-a-pandas-dataframe-to-mongodb-ffa18c0953c1
df_i = islandcat.to_pandas()
if type(df_i["Source_ID"][0]) is bytes:
if isinstance(df_i["Source_ID"][0], bytes):
logger.info("Decoding strings!")
str_df = df_i.select_dtypes([object])
str_df = str_df.stack().str.decode("utf-8").unstack()
@@ -144,7 +145,7 @@ def source_database(
logger.info(f"Index created: {idx_res}")

df_c = compcat.to_pandas()
if type(df_c["Source_ID"][0]) is bytes:
if isinstance(df_c["Source_ID"][0], bytes):
logger.info("Decoding strings!")
str_df = df_c.select_dtypes([object])
str_df = str_df.stack().str.decode("utf-8").unstack()
@@ -226,10 +227,7 @@ def get_catalogue(survey_dir: Path, epoch: int = 0) -> Table:
Table: RACS catalogue table.
"""
basedir = survey_dir / "db" / f"epoch_{epoch}"
logger.info(f"Loading RACS database from {basedir}")
data_file = basedir / "field_data.csv"
database = Table.read(data_file)
database = read_racs_database(survey_dir, epoch, table="field_data")
# Remove rows with SBID < 0
database = database[database["SBID"] >= 0]

@@ -238,20 +236,21 @@ def get_catalogue(survey_dir: Path, epoch: int = 0) -> Table:
FIELD = row["FIELD_NAME"]
SBID = row["SBID"]
# Find FIELD and SBID in beamfile name
beamfile = basedir / f"beam_inf_{SBID}-{FIELD}.csv"
if not beamfile.exists():
raise FileNotFoundError(f"{beamfile} not found!")
racs_fields = Table.read(beamfile)
racs_fields = read_racs_database(
survey_dir, epoch, table=f"beam_inf_{SBID}-{FIELD}"
)
racs_fields.add_column(FIELD, name="FIELD_NAME", index=0)
racs_fields.add_column(SBID, name="SBID", index=0)

# Add in all others
for row in tqdm(database[1:], desc="Reading RACS database", file=TQDM_OUT):
beamfile = basedir / f"beam_inf_{row['SBID']}-{row['FIELD_NAME']}.csv"
if not beamfile.exists():
logger.error(f"{beamfile} not found!")
try:
tab = read_racs_database(
survey_dir, epoch, table=f"beam_inf_{row['SBID']}-{row['FIELD_NAME']}"
)
except Exception as e:
logger.error(e)
continue
tab = Table.read(beamfile)
try:
tab.add_column(row["FIELD_NAME"], name="FIELD_NAME", index=0)
tab.add_column(row["SBID"], name="SBID", index=0)
@@ -330,7 +329,7 @@ def get_beams(mastercat: Table, database: Table, epoch: int = 0) -> List[Dict]:

def beam_inf(
database: Table,
basedir: Path,
survey_dir: Path,
host: str,
epoch: int,
username: Optional[str] = None,
@@ -339,11 +338,15 @@
"""Get the beam information"""
tabs: List[Table] = []
for row in tqdm(database, desc="Reading beam info", file=TQDM_OUT):
fname = basedir / f"beam_inf_{row['SBID']}-{row['FIELD_NAME']}.csv"
if not fname.exists():
logger.error(f"{fname} not found!")
try:
tab = read_racs_database(
survey_dir=survey_dir,
epoch=epoch,
table=f"beam_inf_{row['SBID']}-{row['FIELD_NAME']}",
)
except Exception as e:
logger.error(e)
continue
tab = Table.read(fname)
if len(tab) == 0:
logger.error(f"{row['SBID']}-{row['FIELD_NAME']} failed...")
continue
@@ -370,6 +373,40 @@ def beam_inf(
return insert_res


def read_racs_database(
survey_dir: Path,
epoch: int,
table: str,
) -> Table:
"""Read the RACS database from CSVs or postgresql
Args:
survey_dir (Path): Path to RACS database (i.e. 'askap_surveys/racs' repo).
epoch (int): RACS epoch number.
table (str): Table name.
Returns:
Table: RACS database table.
"""
epoch_name = f"epoch_{epoch}"
if survey_dir.parent.name == "postgresql:":
logger.info("Reading RACS data from postgresql...")
_dbstring = f"{survey_dir.parent.name}//{survey_dir.name}/{epoch_name}"
_df = pd.read_sql(
f'SELECT * from "{table}"',
f"{_dbstring}",
)
return Table.from_pandas(_df)

logger.info("Reading RACS data from CSVs...")
basedir = survey_dir / "db" / epoch_name
data_file = basedir / f"{table}.csv"
if not data_file.exists():
raise FileNotFoundError(f"{data_file} not found!")

return Table.read(data_file)


def field_database(
survey_dir: Path,
host: str,
@@ -389,9 +426,7 @@
Returns:
Tuple[InsertManyResult, InsertManyResult]: Field and beam info insert object.
"""
basedir = survey_dir / "db" / f"epoch_{epoch}"
data_file = basedir / "field_data.csv"
database = Table.read(data_file)
database = read_racs_database(survey_dir, epoch, table="field_data")
if "COMMENT" in database.colnames:
database["COMMENT"] = database["COMMENT"].astype(str)
# Remove rows with SBID < 0
@@ -414,7 +449,7 @@

beam_res = beam_inf(
database=database,
basedir=basedir,
survey_dir=survey_dir,
host=host,
epoch=epoch,
username=username,
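The new read_racs_database helper selects its backend from the path prefix: an ordinary filesystem path is read as per-table CSVs under db/epoch_N, while a survey_dir beginning with "postgresql:" is rebuilt into a SQLAlchemy connection string whose database component is the epoch name. A minimal usage sketch (the paths, host, and credentials below are placeholders, not part of this change):

    from pathlib import Path

    from arrakis.init_database import read_racs_database

    # CSV mode: survey_dir is a checkout of the askap_surveys/racs repo
    field_data = read_racs_database(
        survey_dir=Path("/data/askap_surveys/racs"),
        epoch=0,
        table="field_data",
    )

    # PostgreSQL mode: the "postgresql:" prefix routes through pd.read_sql, and
    # the connection string becomes postgresql://user:password@dbhost:5432/epoch_0
    field_data = read_racs_database(
        survey_dir=Path("postgresql://user:password@dbhost:5432"),
        epoch=0,
        table="field_data",
    )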
4 changes: 4 additions & 0 deletions docs/source/start.rst
@@ -21,6 +21,10 @@ For example, you can start mongo using (for NUMA systems like Pawsey): ::
.. tip::
It can be very convenient to run this database on a VM service like Pawsey's Nimbus cloud. You can then access the database from anywhere, and you don't need to worry about the database being deleted when you log out. This will require some network setup, such as opening the port for MongoDB (27017) on the VM. Get in touch with your local helpdesk if you need help with this.

For convenience, we have provided a helper script in the ``scripts`` directory that sets up a MongoDB instance with both an admin and a read-only user. You can run this script with the following command: ::

create_mongodb.py

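The helper script itself is not shown in this diff; as a rough sketch of the same idea (the host, passwords, and role choices below are placeholders, not taken from the script), the two users could be created with ``pymongo``: ::

    from pymongo import MongoClient

    # Connect to a freshly started MongoDB instance (placeholder host/port)
    client = MongoClient("mongodb://localhost:27017")

    # Admin user with full privileges -- change the password!
    client.admin.command("createUser", "admin", pwd="changeme", roles=["root"])

    # Read-only user for consumers of the database
    client.admin.command(
        "createUser",
        "reader",
        pwd="changeme",
        roles=[{"role": "readAnyDatabase", "db": "admin"}],
    )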
RACS database
=============
.. attention::
8 changes: 6 additions & 2 deletions pyproject.toml
@@ -40,15 +40,15 @@ ipython = "*"
matplotlib = ">=3.8"
numba = "*"
numba_progress = "*"
pandas = "*"
pandas = ">=2"
psutil = "*"
pymongo = "*"
pymultinest = "*"
pytest = "*"
python_casacore = "*"
RACS-tools = ">=3"
radio_beam = "*"
RMextract = {git = "https://github.com/AlecThomson/RMextract@race"}
RMextract = {git = "https://github.com/AlecThomson/RMextract@race", optional=true}
schwimmbad = "*"
scipy = "*"
spectral_cube = ">=0.6.3"
@@ -65,6 +65,8 @@ PolSpectra = ">=1.1.0"
setuptools = "*"
fixms = ">=0.2"
fitscube = ">=0.3"
psycopg2-binary = "*"
sqlalchemy = "*"

[tool.poetry.dev-dependencies]
black = ">=23"
@@ -82,6 +84,7 @@ docs = [
"m2r2",
"numpydoc",
]
RMextract = ["RMextract"]

[build-system]
requires = ["poetry-core>=1.2", "numpy"]
@@ -110,6 +113,7 @@ hello_mpi_world = { reference="scripts/hello_mpi_world.py", type="file"}
make_links = { reference="scripts/make_links.py", type="file"}
spica = { reference="scripts/spica.py", type="file"}
tar_cubelets = { reference="scripts/tar_cubelets.py", type="file"}
create_mongodb = { reference="scripts/create_mongodb.py", type="file"}

[tool.isort]
profile = "black"
