Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Partition weather station model predictions table, part 1 #4059

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""partition weather_station_model_predictions part 1

Revision ID: 07007f659064
Revises: c5bea0920d53
Create Date: 2024-11-04 10:41:31.466124

"""

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "07007f659064"
down_revision = "c5bea0920d53"
branch_labels = None
depends_on = None

### Adapted from pgslice "prep" command
# BEGIN;
# CREATE TABLE "public"."weather_station_model_predictions_intermediate" (LIKE "public"."weather_station_model_predictions" INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING STORAGE INCLUDING COMMENTS INCLUDING STATISTICS INCLUDING GENERATED INCLUDING COMPRESSION) PARTITION BY RANGE ("prediction_timestamp");
# CREATE UNIQUE INDEX ON "public"."weather_station_model_predictions_intermediate" USING btree (station_code, prediction_model_run_timestamp_id, prediction_timestamp);
# CREATE INDEX ON "public"."weather_station_model_predictions_intermediate" USING btree (id);
# CREATE INDEX ON "public"."weather_station_model_predictions_intermediate" USING btree (prediction_model_run_timestamp_id);
# CREATE INDEX ON "public"."weather_station_model_predictions_intermediate" USING btree (prediction_timestamp);
# CREATE INDEX ON "public"."weather_station_model_predictions_intermediate" USING btree (station_code);
# CREATE INDEX ON "public"."weather_station_model_predictions_intermediate" USING btree (update_date);
# CREATE INDEX ON "public"."weather_station_model_predictions_intermediate" USING btree (prediction_timestamp, station_code);
# ALTER TABLE "public"."weather_station_model_predictions_intermediate" ADD FOREIGN KEY (prediction_model_run_timestamp_id) REFERENCES prediction_model_run_timestamps(id);
# COMMENT ON TABLE "public"."weather_station_model_predictions_intermediate" IS 'column:prediction_timestamp,period:month,cast:timestamptz,version:3';
# COMMIT;


def upgrade():
    """Create the partitioned intermediate table (adapted from pgslice "prep").

    Creates ``weather_station_model_predictions_intermediate`` as a copy of the
    structure of ``weather_station_model_predictions``, range partitioned on
    ``prediction_timestamp``, then adds explicitly named indexes, the foreign
    key to ``prediction_model_run_timestamps`` and the table comment.
    """
    intermediate = '"public"."weather_station_model_predictions_intermediate"'

    op.execute(
        f'CREATE TABLE {intermediate} (LIKE "public"."weather_station_model_predictions" '
        "INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING STORAGE INCLUDING COMMENTS "
        "INCLUDING STATISTICS INCLUDING GENERATED INCLUDING COMPRESSION) "
        'PARTITION BY RANGE ("prediction_timestamp");'
    )

    # (index kind, index name, indexed columns), in creation order.
    # Indexes are named explicitly so that downgrade() can drop them by name.
    index_specs = (
        ("UNIQUE INDEX", "wsmp_unique_record_idx", "station_code, prediction_model_run_timestamp_id, prediction_timestamp"),
        ("INDEX", "wsmp_id_idx", "id"),
        ("INDEX", "wsmp_prediction_model_run_timestamp_id_idx", "prediction_model_run_timestamp_id"),
        ("INDEX", "wsmp_prediction_timestamp_idx", "prediction_timestamp"),
        ("INDEX", "wsmp_station_code_idx", "station_code"),
        ("INDEX", "wsmp_update_date_idx", "update_date"),
        ("INDEX", "wsmp_prediction_station_code_idx", "prediction_timestamp, station_code"),
    )
    for index_kind, index_name, columns in index_specs:
        op.execute(f"CREATE {index_kind} {index_name} ON {intermediate} USING btree ({columns});")

    op.execute(
        f"ALTER TABLE {intermediate} ADD CONSTRAINT wsmp_id_fk "
        "FOREIGN KEY (prediction_model_run_timestamp_id) REFERENCES prediction_model_run_timestamps(id);"
    )
    # Table comment copied from the pgslice "prep" output quoted above in this file.
    op.execute(f"COMMENT ON TABLE {intermediate} IS 'column:prediction_timestamp,period:month,cast:timestamptz,version:3';")


def downgrade():
    """Undo upgrade(): remove the comment, foreign key, indexes and the intermediate table.

    Objects are removed in the reverse of the order in which upgrade() created them.
    """
    intermediate = '"public"."weather_station_model_predictions_intermediate"'

    op.execute(f"COMMENT ON TABLE {intermediate} IS NULL;")
    op.execute(f"ALTER TABLE {intermediate} DROP CONSTRAINT wsmp_id_fk;")

    # Index names, last created first.
    index_names = (
        "wsmp_prediction_station_code_idx",
        "wsmp_update_date_idx",
        "wsmp_station_code_idx",
        "wsmp_prediction_timestamp_idx",
        "wsmp_prediction_model_run_timestamp_id_idx",
        "wsmp_id_idx",
        "wsmp_unique_record_idx",
    )
    for index_name in index_names:
        op.execute(f"DROP INDEX {index_name};")

    op.execute(f"DROP TABLE {intermediate}")
107 changes: 107 additions & 0 deletions api/alembic/versions/362d268606f3_partition_weather_station_model_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""partition weather_station_model_predictions part 2

Revision ID: 362d268606f3
Revises: 07007f659064
Create Date: 2024-11-04 11:02:57.501656

"""

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "362d268606f3"
down_revision = "07007f659064"
branch_labels = None
depends_on = None

### Adapted from pgslice "add_partitions" command, partitions previous 6 months, and the future 3 months
# BEGIN;
# CREATE TABLE "public"."weather_station_model_predictions_202405" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM ('2024-05-01 00:00:00 UTC') TO ('2024-06-01 00:00:00 UTC');
# ALTER TABLE "public"."weather_station_model_predictions_202405" ADD PRIMARY KEY ("id");

# CREATE TABLE "public"."weather_station_model_predictions_202406" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM ('2024-06-01 00:00:00 UTC') TO ('2024-07-01 00:00:00 UTC');
# ALTER TABLE "public"."weather_station_model_predictions_202406" ADD PRIMARY KEY ("id");


# CREATE TABLE "public"."weather_station_model_predictions_202407" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM ('2024-07-01 00:00:00 UTC') TO ('2024-08-01 00:00:00 UTC');
# ALTER TABLE "public"."weather_station_model_predictions_202407" ADD PRIMARY KEY ("id");

# CREATE TABLE "public"."weather_station_model_predictions_202408" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM ('2024-08-01 00:00:00 UTC') TO ('2024-09-01 00:00:00 UTC');
# ALTER TABLE "public"."weather_station_model_predictions_202408" ADD PRIMARY KEY ("id");

# CREATE TABLE "public"."weather_station_model_predictions_202409" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM ('2024-09-01 00:00:00 UTC') TO ('2024-10-01 00:00:00 UTC');
# ALTER TABLE "public"."weather_station_model_predictions_202409" ADD PRIMARY KEY ("id");

# CREATE TABLE "public"."weather_station_model_predictions_202410" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM ('2024-10-01 00:00:00 UTC') TO ('2024-11-01 00:00:00 UTC');
# ALTER TABLE "public"."weather_station_model_predictions_202410" ADD PRIMARY KEY ("id");

# CREATE TABLE "public"."weather_station_model_predictions_202411" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM ('2024-11-01 00:00:00 UTC') TO ('2024-12-01 00:00:00 UTC');
# ALTER TABLE "public"."weather_station_model_predictions_202411" ADD PRIMARY KEY ("id");

# CREATE TABLE "public"."weather_station_model_predictions_202412" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM ('2024-12-01 00:00:00 UTC') TO ('2025-01-01 00:00:00 UTC');
# ALTER TABLE "public"."weather_station_model_predictions_202412" ADD PRIMARY KEY ("id");

# CREATE TABLE "public"."weather_station_model_predictions_202501" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM ('2025-01-01 00:00:00 UTC') TO ('2025-02-01 00:00:00 UTC');
# ALTER TABLE "public"."weather_station_model_predictions_202501" ADD PRIMARY KEY ("id");

# CREATE TABLE "public"."weather_station_model_predictions_202502" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM ('2025-02-01 00:00:00 UTC') TO ('2025-03-01 00:00:00 UTC');
# ALTER TABLE "public"."weather_station_model_predictions_202502" ADD PRIMARY KEY ("id");
# COMMIT;


def upgrade():
    """Create the monthly partitions of the intermediate table (adapted from pgslice "add_partitions").

    One partition of ``weather_station_model_predictions_intermediate`` is
    created per month from 2024-05 through 2025-02, each named with a
    ``YYYYMM`` suffix, and each is given a primary key on ``id``.
    """
    # Consecutive month boundaries; every adjacent pair bounds one partition.
    month_starts = (
        "2024-05-01",
        "2024-06-01",
        "2024-07-01",
        "2024-08-01",
        "2024-09-01",
        "2024-10-01",
        "2024-11-01",
        "2024-12-01",
        "2025-01-01",
        "2025-02-01",
        "2025-03-01",
    )
    for lower, upper in zip(month_starts, month_starts[1:]):
        # The partition suffix is the YYYYMM of the lower bound.
        partition = f'"public"."weather_station_model_predictions_{lower[:4]}{lower[5:7]}"'
        op.execute(
            f'CREATE TABLE {partition} PARTITION OF "public"."weather_station_model_predictions_intermediate" '
            f"FOR VALUES FROM ('{lower} 00:00:00 UTC') TO ('{upper} 00:00:00 UTC');"
        )
        op.execute(f'ALTER TABLE {partition} ADD PRIMARY KEY ("id");')


def downgrade():
    """Drop the monthly partitions created by upgrade(), newest first."""
    partition_suffixes = (
        "202502",
        "202501",
        "202412",
        "202411",
        "202410",
        "202409",
        "202408",
        "202407",
        "202406",
        "202405",
    )
    for suffix in partition_suffixes:
        op.execute(f"DROP TABLE weather_station_model_predictions_{suffix};")
2 changes: 2 additions & 0 deletions openshift/pgslice/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Base image: the published pgslice image, pinned to v0.6.1.
FROM ankane/pgslice:v0.6.1
# Add the fill/analyze/swap helper script to the image's working directory.
COPY fill_partition_data.sh .
52 changes: 52 additions & 0 deletions openshift/pgslice/docker/fill_partition_data.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#!/bin/bash

# Runs the pgslice `fill`, `analyze` and `swap` commands, in that order,
# for the table named by TABLE, against the database described by the PG_*
# environment variables.
#
# usage example:
# PG_PASSWORD=wps PG_HOSTNAME=localhost PG_PORT=5432 PG_USER=wps PG_DATABASE=wps TABLE=table ./fill_partition_data.sh

# Abort on the first failing command, so that `swap` never runs after a
# failed `fill` or `analyze`.
set -e

# variable checks
# "${VAR+0}" expands to "0" when VAR is set (even to an empty string) and to
# nothing when it is unset, so each check only rejects unset variables.
if [ -z "${PG_PASSWORD+0}" ]
then
    echo "PG_PASSWORD not specified"
    echo "Specify a postgres password"
    exit 1
fi

if [ -z "${PG_HOSTNAME+0}" ]
then
    echo "PG_HOSTNAME not specified"
    echo "Specify a postgres hostname"
    exit 1
fi

if [ -z "${PG_PORT+0}" ]
then
    echo "PG_PORT not specified"
    echo "Specify a postgres port"
    exit 1
fi

if [ -z "${PG_USER+0}" ]
then
    echo "PG_USER not specified"
    echo "Specify a postgres user"
    exit 1
fi

if [ -z "${PG_DATABASE+0}" ]
then
    echo "PG_DATABASE not specified"
    echo "Specify a postgres database"
    exit 1
fi

if [ -z "${TABLE+0}" ]
then
    echo "TABLE not specified"
    echo "Specify a postgres table"
    exit 1
fi

# No spaces are allowed around `=` in a shell assignment; with spaces the
# variable is never set and `export` fails on the stray `=` argument.
# NOTE(review): credentials are interpolated as-is; a password containing
# URL-reserved characters (e.g. `@`, `/`, `:`) would need percent-encoding.
export PGSLICE_URL="postgresql://${PG_USER}:${PG_PASSWORD}@${PG_HOSTNAME}:${PG_PORT}/${PG_DATABASE}"
pgslice fill "$TABLE"
pgslice analyze "$TABLE"
pgslice swap "$TABLE"
18 changes: 18 additions & 0 deletions openshift/pgslice/openshift/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# pgslice partitioning image

Uses https://hub.docker.com/r/ankane/pgslice to run the pgslice commands.
Runs `fill`, `analyze` and `swap` for a newly partitioned table whose original table has data, to fill the partitions with the existing data.

## Building

### Create a new build

```bash
oc -n e1e498-tools process -f build.yaml -p VERSION=05-11-2024 | oc -n e1e498-tools apply -f -
```

### Kick off a new build

```bash
oc -n e1e498-tools start-build pgslice --follow
```
64 changes: 64 additions & 0 deletions openshift/pgslice/openshift/build.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
apiVersion: template.openshift.io/v1
kind: Template
metadata:
creationTimestamp: null
name: pgslice
labels:
app: pgslice
phase: build
app.kubernetes.io/name: pgslice
app.kubernetes.io/managed-by: template
parameters:
- name: NAME
value: pgslice
- name: VERSION
description: Output version
required: true
value: "latest"
- name: GIT_URL
value: https://github.com/bcgov/wps.git
- name: GIT_BRANCH
value: task/partition-weather-station-model-predictions
objects:
- apiVersion: v1
kind: ImageStream
metadata:
annotations:
openshift.io/generated-by: OpenShiftNewBuild
labels:
app: pgslice
common: "true"
name: pgslice
spec:
lookupPolicy:
local: false
- apiVersion: v1
kind: BuildConfig
metadata:
annotations:
openshift.io/generated-by: OpenShiftNewBuild
labels:
app: pgslice
name: pgslice
spec:
resources:
limits:
cpu: "2000m"
memory: "512Mi"
requests:
# we might as well start high!
cpu: "500m"
memory: "256Mi"
completionDeadlineSeconds: 600 # 10 minutes.
output:
to:
kind: ImageStreamTag
name: pgslice:${VERSION}
source:
type: Git
git:
uri: ${GIT_URL}
ref: origin/${GIT_BRANCH}
contextDir: openshift/pgslice/docker
strategy:
type: Docker
34 changes: 34 additions & 0 deletions openshift/scripts/oc_provision_fill_partition_job.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#!/bin/sh -l
#
source "$(dirname ${0})/common/common"

#%
#% OpenShift Deploy Helper
#%
#% Intended for use with a pull request-based pipeline.
#% Suffixes incl.: pr-###.
#%
#% Usage:
#%
#%    [PROJ_TARGET] [PG_DATABASE] [TABLE] ${THIS_FILE} [SUFFIX]
#%
#% Examples:
#%
#%    PROJ_TARGET=e1e498-dev PG_DATABASE=wps TABLE=table ${THIS_FILE} pr-0

# NOTE(review): SUFFIX, TEMPLATE_PATH and PROJ_TOOLS are not set in this
# script; presumably they are provided by common/common - confirm.
JOB="job/fill-partition-data-${SUFFIX}"

# create the job
oc -n ${PROJ_TARGET} process -f ${TEMPLATE_PATH}/partition_filler_job.yaml \
    -p SUFFIX=${SUFFIX} \
    -p PG_DATABASE=${PG_DATABASE} \
    -p TABLE=${TABLE} \
    -p CRUNCHYDB_USER=wps-crunchydb-${SUFFIX}-pguser-wps-crunchydb-${SUFFIX} \
    -p PROJ_TOOLS=${PROJ_TOOLS} | jq '.items[0]' | oc -n ${PROJ_TARGET} create -f -
# The follow-up commands target the same namespace the job was created in,
# rather than whatever project the current oc context happens to point at.
# wait for the job to finish
oc -n ${PROJ_TARGET} wait --for=condition=complete ${JOB} --timeout=3600s
# output the log for debugging
oc -n ${PROJ_TARGET} logs -f ${JOB}
# we're done, so get rid of the job
oc -n ${PROJ_TARGET} delete ${JOB}

Loading
Loading