Skip to content

Commit

Permalink
propagate redownload, use download_id in get download
Browse files (browse the repository at this point in the history)
  • Loading branch information
andrii-i committed Apr 10, 2024
1 parent b01cff9 commit cfc4886
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 18 deletions.
7 changes: 4 additions & 3 deletions jupyter_scheduler/download_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ def put(self, download: DescribeDownload):
session.add(download)
session.commit()

def get(self, job_id: str) -> Optional[DescribeDownload]:
def get(self, download_id: str) -> Optional[DescribeDownload]:
with self.session() as session:
download = session.query(Download).filter(Download.job_id == job_id).first()
download = session.query(Download).filter(Download.download_id == download_id).first()

if download:
return DescribeDownload.from_orm(download)
Expand All @@ -45,13 +45,14 @@ def __init__(self, db_url: str):
self.record_manager = DownloadRecordManager(db_url=db_url)
self.queue = Queue()

def download_from_staging(self, job_id: str):
def download_from_staging(self, job_id: str, redownload: bool):
download_initiated_time = get_utc_timestamp()
download_id = generate_uuid()
download = DescribeDownload(
job_id=job_id,
download_id=download_id,
download_initiated_time=download_initiated_time,
redownload=redownload,
)
self.record_manager.put(download)
self.queue.put(download)
Expand Down
8 changes: 4 additions & 4 deletions jupyter_scheduler/download_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ def __init__(
async def process_download_queue(self):
while not self.download_manager.queue.empty():
download = self.download_manager.queue.get()
cache = self.download_manager.record_manager.get(download.job_id)
if not cache or not download:
download_record = self.download_manager.record_manager.get(download.download_id)
if not download_record:
continue
await self.job_files_manager.copy_from_staging(cache.job_id)
self.download_manager.record_manager.delete_download(cache.download_id)
await self.job_files_manager.copy_from_staging(download.job_id, download.redownload)
self.download_manager.delete_download(download.download_id)

async def start(self):
self.download_manager.populate_queue()
Expand Down
1 change: 1 addition & 0 deletions jupyter_scheduler/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ def _download_from_staging(self, job_id: str):
job_id=job_id,
download_id=download_id,
download_initiated_time=download_initiated_time,
redownload=True,
)
with self.db_session() as session:
download_record = Download(**download.dict())
Expand Down
13 changes: 2 additions & 11 deletions jupyter_scheduler/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,16 +394,8 @@ def get(self):


class FilesDownloadHandler(ExtensionHandlerMixin, APIHandler):
# _job_files_manager = None
_download_from_staging = None

# @property
# def job_files_manager(self):
# if not self._job_files_manager:
# self._job_files_manager = self.settings.get("job_files_manager", None)

# return self._job_files_manager

@property
def download_from_staging(self):
if not self._download_from_staging:
Expand All @@ -413,10 +405,9 @@ def download_from_staging(self):

@authenticated
async def get(self, job_id):
# redownload = self.get_query_argument("redownload", False)
redownload = self.get_query_argument("redownload", False)
try:
# await self.job_files_manager.copy_from_staging(job_id=job_id, redownload=redownload)
self.download_from_staging(job_id)
self.download_from_staging(job_id, redownload)
except Exception as e:
self.log.exception(e)
raise HTTPError(500, str(e)) from e
Expand Down
1 change: 1 addition & 0 deletions jupyter_scheduler/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ class DescribeDownload(BaseModel):
job_id: str
download_id: str
download_initiated_time: int
redownload: bool

class Config:
orm_mode = True
Expand Down
1 change: 1 addition & 0 deletions jupyter_scheduler/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class Download(Base):
job_id = Column(String(36), primary_key=True)
download_id = Column(String(36), primary_key=True)
download_initiated_time = Column(Integer)
redownload = Column(Boolean, default=False)


def create_tables(db_url, drop_tables=False):
Expand Down

0 comments on commit cfc4886

Please sign in to comment.