From 8bf41564eb0df5adfc42b637fbe013111d2d00da Mon Sep 17 00:00:00 2001 From: Leonid Andreev Date: Mon, 16 Dec 2024 17:15:40 -0500 Subject: [PATCH] more improvements for async. handling of download transfers #11057 --- .../dataverse/globus/GlobusServiceBean.java | 145 +++++++++++------- .../iq/dataverse/globus/GlobusTaskState.java | 1 + .../iq/dataverse/globus/GlobusUtil.java | 6 + .../globus/TaskMonitoringServiceBean.java | 59 ++++++- 4 files changed, 151 insertions(+), 60 deletions(-) diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java index 53de77f2fc8..5f85aa0cd80 100644 --- a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java +++ b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java @@ -154,7 +154,7 @@ private String getRuleId(GlobusEndpoint endpoint, String principal, String permi * @param dataset - the dataset associated with the rule * @param globusLogger - a separate logger instance, may be null */ - public void deletePermission(String ruleId, Dataset dataset, Logger globusLogger) { + private void deletePermission(String ruleId, Dataset dataset, Logger globusLogger) { globusLogger.fine("Start deleting rule " + ruleId + " for dataset " + dataset.getId()); if (ruleId.length() > 0) { if (dataset != null) { @@ -172,7 +172,7 @@ public void deletePermission(String ruleId, Dataset dataset, Logger globusLogger globusLogger.info("Access rule " + ruleId + " was deleted successfully"); } } catch (MalformedURLException ex) { - logger.log(Level.WARNING, + globusLogger.log(Level.WARNING, "Failed to delete access rule " + ruleId + " on endpoint " + endpoint.getId(), ex); } } @@ -444,7 +444,6 @@ private void monitorTemporaryPermissions(String ruleId, long datasetId) { * files are created in general, some calls may use the * class logger) * @return - * @throws MalformedURLException */ public GlobusTaskState getTask(String accessToken, 
String taskId, Logger globusLogger) { @@ -730,8 +729,13 @@ public void globusUpload(JsonObject jsonData, Dataset dataset, String httpReques String logTimestamp = logFormatter.format(startDate); Logger globusLogger = Logger.getLogger( - "edu.harvard.iq.dataverse.upload.client.DatasetServiceBean." + "GlobusUpload" + logTimestamp); - String logFileName = System.getProperty("com.sun.aas.instanceRoot") + File.separator + "logs" + File.separator + "globusUpload_" + dataset.getId() + "_" + logTimestamp + "edu.harvard.iq.dataverse.globus.GlobusServiceBean." + "Globus" + + GlobusTaskInProgress.TaskType.UPLOAD + logTimestamp); + + String logFileName = System.getProperty("com.sun.aas.instanceRoot") + + File.separator + "logs" + + File.separator + "globus" + GlobusTaskInProgress.TaskType.UPLOAD + "_" + + logTimestamp + "_" + dataset.getId() /* globus<TYPE>_<timestamp>_<datasetId>.log: the same name TaskMonitoringServiceBean.getTaskLogHandler() builds, so the monitor keeps writing to this task's log */ + ".log"; FileHandler fileHandler; @@ -1160,12 +1164,18 @@ private void processUploadedFiles(JsonArray filesJsonArray, Dataset dataset, Aut public void globusDownload(JsonObject jsonObject, Dataset dataset, User authUser) throws MalformedURLException { Date startDate = new Date(); - + + // @todo the logger initialization method will be moved into the GlobusUtil + // eventually, for both this and the monitoring service to use String logTimestamp = logFormatter.format(startDate); Logger globusLogger = Logger.getLogger( - "edu.harvard.iq.dataverse.upload.client.DatasetServiceBean." + "GlobusDownload" + logTimestamp); + "edu.harvard.iq.dataverse.globus.GlobusServiceBean." 
+ "Globus" + + GlobusTaskInProgress.TaskType.DOWNLOAD + logTimestamp); - String logFileName = System.getProperty("com.sun.aas.instanceRoot") + File.separator + "logs" + File.separator + "globusDownload_id_" + dataset.getId() + "_" + logTimestamp + String logFileName = System.getProperty("com.sun.aas.instanceRoot") + + File.separator + "logs" + + File.separator + "globus" + GlobusTaskInProgress.TaskType.DOWNLOAD + "_" + + logTimestamp + "_" + dataset.getId() /* globus<TYPE>_<timestamp>_<datasetId>.log: the same name TaskMonitoringServiceBean.getTaskLogHandler() builds, so the monitor keeps writing to this task's log */ + ".log"; FileHandler fileHandler; boolean fileHandlerSuceeded; @@ -1194,8 +1204,8 @@ public void globusDownload(JsonObject jsonObject, Dataset dataset, User authUser // If the rules_cache times out, the permission will be deleted. Presumably that // doesn't affect a // globus task status check - GlobusTaskState task = getTask(endpoint.getClientToken(), taskIdentifier, globusLogger); - String ruleId = getRuleId(endpoint, task.getOwner_id(), "r"); + GlobusTaskState taskState = getTask(endpoint.getClientToken(), taskIdentifier, globusLogger); + String ruleId = getRuleId(endpoint, taskState.getOwner_id(), "r"); if (ruleId != null) { logger.fine("Found rule: " + ruleId); Long datasetId = rulesCache.getIfPresent(ruleId); @@ -1227,51 +1237,25 @@ public void globusDownload(JsonObject jsonObject, Dataset dataset, User authUser GlobusTaskInProgress.TaskType.DOWNLOAD, dataset, endpoint.getClientToken(), - authUser instanceof AuthenticatedUser ? authUser : null, + authUser instanceof AuthenticatedUser ? (AuthenticatedUser)authUser : null, ruleId, new Timestamp(startDate.getTime())); em.persist(taskInProgress); - if (fileHandler != null) { - fileHandler.close(); - } + fileHandler.close(); // return and forget return; } - task = globusStatusCheck(endpoint, taskIdentifier, globusLogger); - // @todo null check? 
- String taskStatus = GlobusUtil.getTaskStatus(task); - - // Transfer is done (success or failure) so delete the rule - if (ruleId != null) { - logger.fine("Deleting: rule: " + ruleId); - deletePermission(ruleId, dataset, globusLogger); - } - - if (taskStatus.startsWith("FAILED") || taskStatus.startsWith("INACTIVE")) { - String comment = "Reason : " + taskStatus.split("#")[1] + "
Short Description : " - + taskStatus.split("#")[2]; - if (authUser != null && authUser instanceof AuthenticatedUser) { - userNotificationService.sendNotification((AuthenticatedUser) authUser, new Timestamp(new Date().getTime()), - UserNotification.Type.GLOBUSDOWNLOADCOMPLETEDWITHERRORS, dataset.getId(), comment, true); - } - - globusLogger.info("Globus task failed during download process: "+comment); - } else if (authUser != null && authUser instanceof AuthenticatedUser) { - - boolean taskSkippedFiles = (task.getSkip_source_errors() == null) ? false : task.getSkip_source_errors(); - if (!taskSkippedFiles) { - userNotificationService.sendNotification((AuthenticatedUser) authUser, - new Timestamp(new Date().getTime()), UserNotification.Type.GLOBUSDOWNLOADCOMPLETED, - dataset.getId()); - } else { - userNotificationService.sendNotification((AuthenticatedUser) authUser, - new Timestamp(new Date().getTime()), UserNotification.Type.GLOBUSDOWNLOADCOMPLETEDWITHERRORS, - dataset.getId(), ""); - } - } + // Check again: + taskState = globusStatusCheck(endpoint, taskIdentifier, globusLogger); + + processCompletedDownloadTask(taskState, + authUser instanceof AuthenticatedUser ? 
(AuthenticatedUser)authUser : null, + dataset, + ruleId, + globusLogger); } Executor executor = Executors.newFixedThreadPool(10); @@ -1540,6 +1524,10 @@ public List findAllOngoingTasks() { return em.createQuery("select object(o) from GlobusTaskInProgress as o order by o.startTime", GlobusTaskInProgress.class).getResultList(); } + public List findAllOngoingTasks(GlobusTaskInProgress.TaskType taskType) { + return em.createQuery("select object(o) from GlobusTaskInProgress as o where o.taskType=:taskType order by o.startTime", GlobusTaskInProgress.class).setParameter("taskType", taskType).getResultList(); + } + public void deleteTask(GlobusTaskInProgress task) { GlobusTaskInProgress mergedTask = em.merge(task); em.remove(mergedTask); @@ -1549,14 +1537,10 @@ public List findExternalUploadsByTaskId(String tas return em.createNamedQuery("ExternalFileUploadInProgress.findByTaskId").setParameter("taskId", taskId).getResultList(); } - public void processCompletedTask(GlobusTaskInProgress globusTask, boolean taskSuccess, String taskStatus, Logger taskLogger) { + public void processCompletedTask(GlobusTaskInProgress globusTask, GlobusTaskState taskState, boolean taskSuccess, String taskStatus, Logger taskLogger) { String ruleId = globusTask.getRuleId(); Dataset dataset = globusTask.getDataset(); AuthenticatedUser authUser = globusTask.getLocalUser(); - if (authUser == null) { - // @todo log error message; do nothing - return; - } if (GlobusTaskInProgress.TaskType.UPLOAD.equals(globusTask.getTaskType())) { List fileUploadsInProgress = findExternalUploadsByTaskId(globusTask.getTaskId()); @@ -1578,10 +1562,67 @@ public void processCompletedTask(GlobusTaskInProgress globusTask, boolean taskSu JsonArray filesJsonArray = filesJsonArrayBuilder.build(); processCompletedUploadTask(dataset, filesJsonArray, authUser, ruleId, taskLogger, taskSuccess, taskStatus); + } else if (GlobusTaskInProgress.TaskType.DOWNLOAD.equals(globusTask.getTaskType())) { + + 
processCompletedDownloadTask(taskState, authUser, dataset, ruleId, taskLogger); + } else { - // @todo eventually, extend this async. framework to handle Glonus downloads as well + logger.warning("Unknown or null TaskType passed to processCompletedTask()"); + } + + } + + private void processCompletedDownloadTask(GlobusTaskState taskState, + AuthenticatedUser authUser, + Dataset dataset, + String ruleId, + Logger taskLogger) { + // The only thing to do on completion of a remote download + // transfer is to delete the permission ACL that Dataverse + // had negotiated for the user before the task was initialized: + + if (ruleId != null) { + deletePermission(ruleId, dataset, taskLogger); } + String taskStatus = GlobusUtil.getTaskStatus(taskState); + + // ... plus log the outcome and send any notifications: + if (taskStatus.startsWith("FAILED") || taskStatus.startsWith("INACTIVE")) { + // Outright, unambiguous failure: + String[] statusParts = taskStatus.split("#"); /* GlobusUtil.getTaskStatus() can return a bare "FAILED" with no '#'-separated details; indexing [1]/[2] blindly would throw and abort the scheduled monitoring run */ + String comment = (statusParts.length > 2) ? "Reason : " + statusParts[1] + "
Short Description : " + statusParts[2] : taskStatus; + taskLogger.info("Globus task failed during download process: " + comment); + + sendNotification(authUser, UserNotification.Type.GLOBUSDOWNLOADCOMPLETEDWITHERRORS, dataset.getId(), comment); + + } else { + // Success, total or partial + boolean taskSkippedFiles = (taskState == null || taskState.getSkip_source_errors() == null) ? false : taskState.getSkip_source_errors(); + + if (!taskSkippedFiles) { + taskLogger.info("Globus task completed successfully"); + + sendNotification(authUser, UserNotification.Type.GLOBUSDOWNLOADCOMPLETED, dataset.getId(), ""); + } else { + taskLogger.info("Globus task completed with partial success (skip source errors)"); + + sendNotification(authUser, UserNotification.Type.GLOBUSDOWNLOADCOMPLETEDWITHERRORS, dataset.getId(), ""); + } + } + } + + private void sendNotification(AuthenticatedUser authUser, + UserNotification.Type type, + Long datasetId, + String comment) { + if (authUser != null) { + userNotificationService.sendNotification(authUser, + new Timestamp(new Date().getTime()), + type, + datasetId, + comment); + } } public void deleteExternalUploadRecords(String taskId) { diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskState.java b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskState.java index b5db20d46c1..2a53df74ee4 100644 --- a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskState.java +++ b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskState.java @@ -4,6 +4,7 @@ * This class is used to store the state of an ongoing Globus task (transfer) * as reported by the Globus task API. 
*/ + public class GlobusTaskState { private String DATA_TYPE; diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusUtil.java b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusUtil.java index 3254114ca49..fd36e2a27bc 100644 --- a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusUtil.java +++ b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusUtil.java @@ -39,6 +39,8 @@ public static boolean isTaskCompleted(GlobusTaskState task) { // TODO: "nice_status": "CONNECTION_FAILED" *may* mean // that it's a Globus issue on the endnode side, that is // in fact recoverable; should we add it to the list here? + // @todo: I'm tempted to just take "ACTIVE" at face value, + // and assume that it's still ongoing. if (task.getNice_status().equalsIgnoreCase("ok") || task.getNice_status().equalsIgnoreCase("queued")) { return false; @@ -61,6 +63,9 @@ public static boolean isTaskSucceeded(GlobusTaskState task) { // has not completed *successfully*. return false; } + // @todo: should we be more careful here, and actually check for + // status.equalsIgnoreCase("SUCCEEDED") etc. before assuming the task + // did in fact succeed? return true; } } @@ -89,6 +94,7 @@ public static String getTaskStatus(GlobusTaskState task) { status = "FAILED"; } } else { + // @todo are we sure? 
status = "FAILED"; } return status; diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java b/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java index fdb2b222804..60e24d62702 100644 --- a/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java +++ b/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java @@ -53,21 +53,30 @@ public void init() { logger.info("Starting Globus task monitoring service"); int pollingInterval = SystemConfig.getIntLimitFromStringOrDefault( settingsSvc.getValueForKey(SettingsServiceBean.Key.GlobusPollingInterval), 600); - this.scheduler.scheduleWithFixedDelay(this::checkOngoingTasks, + this.scheduler.scheduleWithFixedDelay(this::checkOngoingUploadTasks, 0, pollingInterval, TimeUnit.SECONDS); + + // A separate monitoring service for ongoing download tasks: + this.scheduler.scheduleWithFixedDelay(this::checkOngoingDownloadTasks, + 0, pollingInterval, /* honor the configured GlobusPollingInterval, same as the upload monitor, instead of a hard-coded test value */ + TimeUnit.SECONDS); + } else { logger.info("Skipping Globus task monitor initialization"); } + + } /** * This method will be executed on a timer-like schedule, continuously - * monitoring all the ongoing external Globus tasks (transfers). + * monitoring all the ongoing external Globus tasks (transfers TO remote + * Globus endnodes). 
*/ - public void checkOngoingTasks() { - logger.fine("Performing a scheduled external Globus task check"); - List tasks = globusService.findAllOngoingTasks(); + public void checkOngoingUploadTasks() { + logger.fine("Performing a scheduled external Globus UPLOAD task check"); + List tasks = globusService.findAllOngoingTasks(GlobusTaskInProgress.TaskType.UPLOAD); tasks.forEach(t -> { FileHandler taskLogHandler = getTaskLogHandler(t); @@ -76,7 +85,7 @@ public void checkOngoingTasks() { GlobusTaskState retrieved = globusService.getTask(t.getGlobusToken(), t.getTaskId(), taskLogger); if (GlobusUtil.isTaskCompleted(retrieved)) { // Do our thing, finalize adding the files to the dataset - globusService.processCompletedTask(t, GlobusUtil.isTaskSucceeded(retrieved), GlobusUtil.getTaskStatus(retrieved), taskLogger); + globusService.processCompletedTask(t, retrieved, GlobusUtil.isTaskSucceeded(retrieved), GlobusUtil.getTaskStatus(retrieved), taskLogger); // Whether it finished successfully, or failed in the process, // there's no need to keep monitoring this task, so we can // delete it. @@ -92,6 +101,36 @@ public void checkOngoingTasks() { }); } + /** + * This method will be executed on a timer-like schedule, continuously + * monitoring all the ongoing external Globus download tasks (transfers by + * Dataverse users FROM remote, Dataverse-managed Globus endnodes). 
+ */ + public void checkOngoingDownloadTasks() { + logger.fine("Performing a scheduled external Globus DOWNLOAD task check"); + List tasks = globusService.findAllOngoingTasks(GlobusTaskInProgress.TaskType.DOWNLOAD); + + tasks.forEach(t -> { + FileHandler taskLogHandler = getTaskLogHandler(t); + Logger taskLogger = getTaskLogger(t, taskLogHandler); + + GlobusTaskState retrieved = globusService.getTask(t.getGlobusToken(), t.getTaskId(), taskLogger); + if (GlobusUtil.isTaskCompleted(retrieved)) { + globusService.processCompletedTask(t, retrieved, GlobusUtil.isTaskSucceeded(retrieved), GlobusUtil.getTaskStatus(retrieved), taskLogger); + // globusService.processCompletedTask(t, GlobusUtil.isTaskSucceeded(retrieved), GlobusUtil.getTaskStatus(retrieved), taskLogger); + // Whether it finished successfully or failed, the task can now + // be deleted. + globusService.deleteTask(t); + } + + if (taskLogHandler != null) { + // @todo it should be prudent to cache these loggers and handlers + // between monitoring runs (should be fairly easy to do) + taskLogHandler.close(); + } + }); + } + private FileHandler getTaskLogHandler(GlobusTaskInProgress task) { if (task == null) { return null; @@ -100,7 +139,10 @@ private FileHandler getTaskLogHandler(GlobusTaskInProgress task) { Date startDate = new Date(task.getStartTime().getTime()); String logTimeStamp = logFormatter.format(startDate); - String logFileName = System.getProperty("com.sun.aas.instanceRoot") + File.separator + "logs" + File.separator + "globusUpload_" + task.getDataset().getId() + "_" + logTimeStamp + String logFileName = System.getProperty("com.sun.aas.instanceRoot") + + File.separator + "logs" + + File.separator + "globus" + task.getTaskType() + "_" + + logTimeStamp + "_" + task.getDataset().getId() + ".log"; FileHandler fileHandler; try { @@ -120,7 +162,8 @@ private Logger getTaskLogger(GlobusTaskInProgress task, FileHandler logFileHandl String logTimeStamp = logFormatter.format(startDate); Logger taskLogger = 
Logger.getLogger( - "edu.harvard.iq.dataverse.upload.client.DatasetServiceBean." + "GlobusUpload" + logTimeStamp); + "edu.harvard.iq.dataverse.globus.GlobusServiceBean." + "Globus" + + task.getTaskType() + logTimeStamp); taskLogger.setUseParentHandlers(false); taskLogger.addHandler(logFileHandler);