Skip to content

Commit

Permalink
initializing the branch #11057
Browse files Browse the repository at this point in the history
  • Loading branch information
landreev committed Dec 13, 2024
1 parent e3b5795 commit dab3884
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 15 deletions.
11 changes: 10 additions & 1 deletion src/main/java/edu/harvard/iq/dataverse/api/Datasets.java
Original file line number Diff line number Diff line change
Expand Up @@ -4443,8 +4443,17 @@ public Response monitorGlobusDownload(@Context ContainerRequestContext crc, @Pat
return wr.getResponse();
}

JsonObject jsonObject = null;
try {
jsonObject = JsonUtil.getJsonObject(jsonData);
} catch (Exception ex) {
logger.warning("Globus download monitoring: error parsing json: " + jsonData + " " + ex.getMessage());
return badRequest("Error parsing json body");

}

// Async Call
globusService.globusDownload(jsonData, dataset, authUser);
globusService.globusDownload(jsonObject, dataset, authUser);

return ok("Async call to Globus Download started");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1157,9 +1157,11 @@ private void processUploadedFiles(JsonArray filesJsonArray, Dataset dataset, Aut
}

@Asynchronous
public void globusDownload(String jsonData, Dataset dataset, User authUser) throws MalformedURLException {
public void globusDownload(JsonObject jsonObject, Dataset dataset, User authUser) throws MalformedURLException {

String logTimestamp = logFormatter.format(new Date());
Date startDate = new Date();

String logTimestamp = logFormatter.format(startDate);
Logger globusLogger = Logger.getLogger(
"edu.harvard.iq.dataverse.upload.client.DatasetServiceBean." + "GlobusDownload" + logTimestamp);

Expand All @@ -1182,17 +1184,8 @@ public void globusDownload(String jsonData, Dataset dataset, User authUser) thro
globusLogger = logger;
}

globusLogger.info("Starting a globusDownload ");

JsonObject jsonObject = null;
try {
jsonObject = JsonUtil.getJsonObject(jsonData);
} catch (Exception jpe) {
jpe.printStackTrace();
globusLogger.log(Level.SEVERE, "Error parsing dataset json. Json: {0}", jsonData);
// TODO: stop the process after this parsing exception.
}

globusLogger.info("Starting monitoring a globus download task");

String taskIdentifier = jsonObject.getString("taskIdentifier");

GlobusEndpoint endpoint = getGlobusEndpoint(dataset);
Expand All @@ -1215,7 +1208,38 @@ public void globusDownload(String jsonData, Dataset dataset, User authUser) thro
// Something is wrong - the rule should be there (a race with the cache timing
// out?)
logger.warning("ruleId not found for taskId: " + taskIdentifier);
// @todo: do we need to bail out then, or ...?
}

// Wait before first check
try {
Thread.sleep(3000);
} catch (InterruptedException ie) {
logger.warning("caught an Interrupted Exception while trying to sleep for 3 sec. in globusDownload()");
}

if (FeatureFlags.GLOBUS_USE_EXPERIMENTAL_ASYNC_FRAMEWORK.enabled()) {

// Save the task information in the database so that the Globus monitoring
// service can continue checking on its progress.

GlobusTaskInProgress taskInProgress = new GlobusTaskInProgress(taskIdentifier,
GlobusTaskInProgress.TaskType.DOWNLOAD,
dataset,
endpoint.getClientToken(),
authUser instanceof AuthenticatedUser ? authUser : null,
ruleId,
new Timestamp(startDate.getTime()));
em.persist(taskInProgress);

if (fileHandler != null) {
fileHandler.close();
}

// return and forget
return;
}

task = globusStatusCheck(endpoint, taskIdentifier, globusLogger);
// @todo null check?
String taskStatus = GlobusUtil.getTaskStatus(task);
Expand Down Expand Up @@ -1258,20 +1282,41 @@ private GlobusTaskState globusStatusCheck(GlobusEndpoint endpoint, String taskId
GlobusTaskState task = null;
int pollingInterval = SystemConfig.getIntLimitFromStringOrDefault(
settingsSvc.getValueForKey(SettingsServiceBean.Key.GlobusPollingInterval), 50);
int retries = 0;
do {
try {
globusLogger.info("checking globus transfer task " + taskId);
Thread.sleep(pollingInterval * 1000);
// Call the (centralized) Globus API to check on the task state/status:
task = getTask(endpoint.getClientToken(), taskId, globusLogger);
taskCompleted = GlobusUtil.isTaskCompleted(task);
if (taskCompleted) {
if (task.getStatus().equalsIgnoreCase("ACTIVE")) {
retries++;
// isTaskCompleted() method assumes that a task that is still
// being reported as "ACTIVE" is in fact completed if its
// "nice status" is neither "ok" nor "queued". If that is the
// case, we want it to happen more than 3 times in a row (see
// the "retries > 3" check below) before we give up on this task.
globusLogger.fine("Task is reported as \"ACTIVE\", but appears completed (nice_status: "
+ task.getNice_status()
+ ", "
+ retries
+ " attempts so far");
taskCompleted = retries > 3;
}
} else {
retries = 0;
}

} catch (Exception ex) {
logger.warning("Caught exception while in globusStatusCheck(); stack trace below");
ex.printStackTrace();
}

} while (!taskCompleted);

globusLogger.info("globus transfer task completed successfully");
globusLogger.info("globus transfer task completed");
return task;
}

Expand Down
3 changes: 3 additions & 0 deletions src/main/java/edu/harvard/iq/dataverse/globus/GlobusUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public static boolean isTaskCompleted(GlobusTaskState task) {
String status = task.getStatus();
if (status != null) {
if (status.equalsIgnoreCase("ACTIVE")) {
// TODO: "nice_status": "CONNECTION_FAILED" *may* mean
// that it's a Globus issue on the endpoint side that is
// in fact recoverable; should we add it to the list here?
if (task.getNice_status().equalsIgnoreCase("ok")
|| task.getNice_status().equalsIgnoreCase("queued")) {
return false;
Expand Down

0 comments on commit dab3884

Please sign in to comment.