Skip to content

Commit

Permalink
feat(cli/configs.py): added migration feature
Browse files Browse the repository at this point in the history
  • Loading branch information
odarotto committed Sep 26, 2024
1 parent 53ae8b5 commit 671688d
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 5 deletions.
94 changes: 93 additions & 1 deletion workflow/cli/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

import json
from typing import Any, Dict, List, Optional
from urllib.parse import urlencode

import click
import requests
import yaml
from rich import pretty
from rich.console import Console
from rich.progress import BarColumn, Progress, TextColumn
from rich.syntax import Syntax
from rich.table import Table
from rich.table import Column, Table
from rich.text import Text
from yaml.loader import SafeLoader

Expand Down Expand Up @@ -345,3 +347,93 @@ def reformat(filename: click.Path):
new_data: str = yaml.dump(data)
file.write(new_data)
console.print(f"Reformatted file {filename}", style="green")


@configs.command("migrate", help="Sends running Config to Pipelines V2.")
@click.argument("name", type=str, required=True)
@click.argument("base_url", type=str, required=True)
@click.option(
"status",
"-s",
"--status",
type=click.Choice(["success", "failure", "running", "queued", "cancelled"]),
default="running",
)
@click.option("delete", "-d", "--delete", is_flag=True, default=False)
def migrate(name: str, base_url: str, status: str, delete: bool):
"""Migrates V1 Configs to V2.
Parameters
----------
name : str
Configuration name.
base_url : str
Pipelines V1 backend URL.
status : str
Status filter.
delete : bool
WIP.
"""
http = HTTPContext(backends=["configs"])
params = {
"skip": 0,
"length": 100,
"projection": json.dumps({}),
"query": json.dumps({"status": {"$in": status.split(",")}}),
"name": name,
}
url = f"{base_url}?{urlencode(params)}"
response = requests.get(url)
pipelines = response.json()
text_column = TextColumn("{task.description}", table_column=Column(ratio=1))
bar_column = BarColumn(bar_width=None, table_column=Column(ratio=2))
progress = Progress(text_column, bar_column, expand=True)

if not pipelines:
console.print(f"No Config objects were found under the name {name}")
return

console.print(
f"Migrating {len(pipelines)} Pipeline objects from {name} collection to V2.",
style="bright_green",
)
with progress:
for obj in pipelines:
task = progress.add_task(f"[cyan]Migrating {obj['id']}", total=1)
# ? Reformat
reformatted_obj = format.reformat(
data=obj,
r_steps=False,
r_matrix=False,
r_version=True,
)
yaml_url = (
f"{base_url}/inspect?"
f"{urlencode({'name': name, 'id': reformatted_obj['id']})}"
)
yaml_response = requests.get(yaml_url)
old_yaml = yaml_response.json()["yaml"]
new_yaml = format.reformat(
data=yaml.safe_load(old_yaml),
r_steps=True,
r_matrix=True,
r_version=True,
)
progress.update(task, advance=0.3)
reformatted_obj["yaml"] = yaml.dump(new_yaml)
# ? Stop monitoring on V1
stopped = False
if obj["status"] == "running":
stop_params = urlencode(
{"name": obj["name"], "query": json.dumps({"id": obj["id"]})}
)
stop_url = f"{base_url}/cancel?" f"{stop_params}"
stop_response = requests.put(stop_url)
if stop_response.ok:
stopped = True
progress.update(task, advance=0.3)
# ? Migrate
migration_response = http.configs.migrate(reformatted_obj)
if migration_response.ok:
progress.update(task, advance=0.4 if stopped else 0.7)
console.print("Migration completed", style="bright_green")
22 changes: 22 additions & 0 deletions workflow/http/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,28 @@ def deploy(self, data: Dict[str, Any]):
response.raise_for_status()
return response.json()

@retry(
reraise=True, wait=wait_random(min=0.3, max=1.8), stop=(stop_after_delay(15))
)
def migrate(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Migrates V1 objects to V2.
Parameters
----------
data : Dict[str, Any]
Object payload.
Returns
-------
Dict[str, Any]
Backend JSON response.
"""
with self.session as session:
url = f"{self.baseurl}/configs/migrate"
response: Response = session.post(url, json=data)
response.raise_for_status()
return response.json()

def count(self) -> Dict[str, Any]:
"""Count all documents in a collection.
Expand Down
11 changes: 7 additions & 4 deletions workflow/utils/format.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def reformat(
r_steps: bool,
r_matrix: bool,
r_version: bool,
console: Console,
console: Optional[Console] = None,
) -> Dict[str, Any]:
"""Reformats data to work on Pipeline V2 backend.
Expand All @@ -60,7 +60,8 @@ def reformat(
"""
# ? Reformat steps
if r_steps:
console.print("Reformatting steps.", style="green")
if console:
console.print("Reformatting steps.", style="green")
steps: List[Dict[str, Any]] = []
for step_name in data["pipeline"].keys():
step = data["pipeline"][step_name]
Expand All @@ -71,7 +72,8 @@ def reformat(

# ? Check top level matrix
if r_matrix:
console.print("Reformatting top level matrix.", style="green")
if console:
console.print("Reformatting top level matrix.", style="green")
matrix: Optional[Dict[str, Any]] = None
if data.get("matrix", None):
matrix = data["matrix"]
Expand All @@ -80,7 +82,8 @@ def reformat(

# ? Fix version
if r_version:
console.print("Fixing version.", style="green")
if console:
console.print("Fixing version.", style="green")
data["version"] = "2"

return data

0 comments on commit 671688d

Please sign in to comment.