Skip to content

Commit

Permalink
[#47] Adding filesystem support for save_df
Browse files Browse the repository at this point in the history
...
  • Loading branch information
gauglertodd committed Oct 23, 2024
1 parent 8868835 commit d47af0c
Showing 1 changed file with 11 additions and 19 deletions.
30 changes: 11 additions & 19 deletions raydar/task_tracker/task_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import logging
import os
from collections.abc import Iterable
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Type

import coolname
import pandas as pd
import polars as pl
import pyarrow.fs as fs
import pyarrow.parquet as pq
import ray
from packaging.version import Version
from ray.serve import shutdown
Expand Down Expand Up @@ -84,13 +86,7 @@ def exit(self) -> None:

@ray.remote(resources={"node:__internal_head__": 0.1}, num_cpus=0)
class AsyncMetadataTracker:
def __init__(
self,
name: str,
namespace: str,
path: Optional[str] = None,
enable_perspective_dashboard: bool = False,
):
def __init__(self, name: str, namespace: str, enable_perspective_dashboard: bool = False, filesystem: Type[fs.FileSystem] = fs.LocalFileSystem):
"""An async Ray Actor Class to track task level metadata.
This class constructs an AsyncMetadataTrackerCallback actor, which points back to this actor. Its process(...)
Expand All @@ -114,13 +110,13 @@ def __init__(
lifetime="detached",
get_if_exists=True,
).remote(name, namespace)
self.path = path
self.df = None
self.finished_tasks = {}
self.user_defined_metadata = {}
self.perspective_dashboard_enabled = enable_perspective_dashboard
self.pending_tasks = []
self.perspective_table_name = f"{name}_data"
self.filesystem = filesystem()

# WARNING: Do not move this import. Importing these modules elsewhere can cause
# difficult-to-diagnose "There is no current event loop in thread 'ray_client_server_" errors.
Expand Down Expand Up @@ -306,14 +302,10 @@ def get_proxy_server(self) -> ray.serve.handle.DeploymentHandle:
return self.proxy_server
raise Exception("This task_tracker has no active proxy_server.")

def save_df(self) -> None:
"""Saves the internally maintained dataframe of task related information from the ray GCS"""
self.get_df()
if self.path is not None and self.df is not None:
logger.info(f"Writing DataFrame to {self.path}")
self.df.write_parquet(self.path)
return True
return False
def save_df(self, path: str) -> None:
    """Write the tracked task dataframe to ``path`` as a Parquet file.

    The dataframe returned by ``self.get_df()`` is converted to an Arrow table
    and handed to ``pyarrow.parquet.write_table`` together with
    ``self.filesystem``.

    Args:
        path: Destination passed to ``write_table`` alongside ``self.filesystem``.
    """
    logger.info(f"Writing DataFrame to {path}")
    # Log first, then materialize the table, matching the original call order.
    arrow_table = self.get_df().to_arrow()
    pq.write_table(arrow_table, path, filesystem=self.filesystem)

def clear_df(self) -> None:
"""Clears the internally maintained dataframe of task related information from the ray GCS"""
Expand Down Expand Up @@ -363,9 +355,9 @@ def get_df(self, process_user_metadata_column=False) -> pl.DataFrame:
return df_with_user_metadata
return df

def save_df(self) -> None:
def save_df(self, path: str) -> None:
"""Ask this object's AsyncMetadataTracker actor to save its dataframe to `path`, blocking on `ray.get` until the remote call returns."""
return ray.get(self.tracker.save_df.remote())
return ray.get(self.tracker.save_df.remote(path))

def clear(self) -> None:
"""Clear the dataframe used by this object's AsyncMetadataTracker actor"""
Expand Down

0 comments on commit d47af0c

Please sign in to comment.