Skip to content

Commit

Permalink
Merge pull request #128 from aodn/features/5851-resume-reload
Browse files Browse the repository at this point in the history
Features/5851 resume reload
  • Loading branch information
utas-raymondng authored Sep 3, 2024
2 parents 1294de4 + 90a214a commit 502aa98
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,22 @@ public ResponseEntity<ObjectNode> getDocumentByUUID(@PathVariable("uuid") String
ObjectNode response = indexerService.getDocumentByUUID(uuid).source();
return ResponseEntity.status(HttpStatus.OK).body(response);
}

/**
* A synchronized load operation, useful for local run but likely fail in cloud due to gateway time out. No response
* come back unlike everything done. Please use async load with postman if you want feedback constantly.
*
* @param confirm - Must set to true to begin load
* @param beginWithUuid - You want to start load with particular uuid, it is useful for resume previous incomplete reload
* @return A string contains all ingested record status
* @throws IOException - Any failure during reload, it is the called to handle the error
*/
@PostMapping(path="/all", consumes = "application/json", produces = "application/json")
@Operation(security = { @SecurityRequirement(name = "X-API-Key") }, description = "Index all metadata records from GeoNetwork")
public ResponseEntity<String> indexAllMetadataRecords(@RequestParam(value = "confirm", defaultValue = "false") Boolean confirm) throws IOException {
List<BulkResponse> responses = indexerService.indexAllMetadataRecordsFromGeoNetwork(confirm, null);
public ResponseEntity<String> indexAllMetadataRecords(
@RequestParam(value = "confirm", defaultValue = "false") Boolean confirm,
@RequestParam(value = "beginWithUuid", required=false) String beginWithUuid) throws IOException {

List<BulkResponse> responses = indexerService.indexAllMetadataRecordsFromGeoNetwork(beginWithUuid, confirm, null);
return ResponseEntity.ok(responses.toString());
}
/**
Expand All @@ -62,19 +73,23 @@ public ResponseEntity<String> indexAllMetadataRecords(@RequestParam(value = "con
* Noted: There is a bug in postman desktop, so either you run postman using web-browser with agent directly
* or you need to have version 10.2 or above in order to get the emitted result
*
* @param confirm
* @return
* @param confirm - Must set to true to begin load
* @param beginWithUuid - You want to start load with particular uuid, it is useful for resume previous incomplete reload
* @return The SSeEmitter for status update, you can use it to tell which record is being ingested and ingest status.
*/
@PostMapping(path="/async/all")
@Operation(security = { @SecurityRequirement(name = "X-API-Key") }, description = "Index all metadata records from GeoNetwork")
public SseEmitter indexAllMetadataRecordsAsync(@RequestParam(value = "confirm", defaultValue = "false") Boolean confirm) {
public SseEmitter indexAllMetadataRecordsAsync(
@RequestParam(value = "confirm", defaultValue = "false") Boolean confirm,
@RequestParam(value = "beginWithUuid", required=false) String beginWithUuid) {

final SseEmitter emitter = new SseEmitter(0L); // 0L means no timeout;

IndexerService.Callback callback = new IndexerService.Callback() {
@Override
public void onProgress(Object update) {
try {
log.info("Send update to client");
log.info("Send update with content - {}", update.toString());
SseEmitter.SseEventBuilder event = SseEmitter.event()
.data(update.toString())
.id(String.valueOf(update.hashCode()))
Expand All @@ -83,6 +98,8 @@ public void onProgress(Object update) {
emitter.send(event);
}
catch (IOException e) {
// In case of fail, try close the stream, if it cannot be closed. (likely stream terminated
// already, the load error out and we need to result from a particular uuid.
emitter.completeWithError(e);
}
}
Expand All @@ -107,7 +124,7 @@ public void onComplete(Object result) {

new Thread(() -> {
try {
indexerService.indexAllMetadataRecordsFromGeoNetwork(confirm, callback);
indexerService.indexAllMetadataRecordsFromGeoNetwork(beginWithUuid, confirm, callback);
}
catch(IOException e) {
emitter.completeWithError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public interface GeoNetworkService {
* Return Iterable of records, noted that the item inside can be null, so please check null on each item
* @return
*/
Iterable<String> getAllMetadataRecords();
Iterable<String> getAllMetadataRecords(String beginWithUuid);
/**
* This function can avoid elastic outsync and achieve what we need here as the only use case is
* check if there is only 1 document in elastic.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,6 @@ public class GeoNetworkServiceImpl implements GeoNetworkService {

@Value("${elasticsearch.query.pageSize:100}")
protected int ES_PAGE_SIZE;
// Use for debug only if run the indexer locally and hit an issue, you do
// not want to start from the start, by setting this env value, it will start from the UUID
// that follows.
@Value("${elasticsearch.query.startingUUID:#{null}}")
protected String startingUUID;

protected FIFOCache<String, Map<String, ?>> cache;
protected RestTemplate indexerRestTemplate;
Expand Down Expand Up @@ -420,8 +415,8 @@ public boolean isMetadataRecordsCountLessThan(int c) {
backoff = @Backoff(delay = 1500L)
)
@Override
public Iterable<String> getAllMetadataRecords() {
SearchRequest req = createSearchAllUUID(startingUUID);
public Iterable<String> getAllMetadataRecords(String beginWithUUid) {
SearchRequest req = createSearchAllUUID(beginWithUUid);
try {
final AtomicReference<String> lastUUID = new AtomicReference<>(null);
final AtomicReference<SearchResponse<ObjectNode>> response =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ interface Callback {
}
CompletableFuture<ResponseEntity<String>> indexMetadata(String metadataValues) throws IOException, FactoryException, TransformException, JAXBException;
ResponseEntity<String> deleteDocumentByUUID(String uuid) throws IOException;
List<BulkResponse> indexAllMetadataRecordsFromGeoNetwork(boolean confirm, Callback callback) throws IOException;
List<BulkResponse> indexAllMetadataRecordsFromGeoNetwork(String beginWithUuid, boolean confirm, Callback callback) throws IOException;
Hit<ObjectNode> getDocumentByUUID(String uuid) throws IOException;
boolean isMetadataPublished(String uuid);
boolean isGeoNetworkInstanceReinstalled(long portalIndexDocumentsCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,21 +251,26 @@ public ResponseEntity<String> deleteDocumentByUUID(String uuid) throws IOExcepti
}
}

public List<BulkResponse> indexAllMetadataRecordsFromGeoNetwork(boolean confirm, Callback callback) throws IOException {
public List<BulkResponse> indexAllMetadataRecordsFromGeoNetwork(String beginWithUuid, boolean confirm, Callback callback) throws IOException {
if (!confirm) {
throw new IndexAllRequestNotConfirmedException("Please confirm that you want to index all metadata records from GeoNetwork");
}

// recreate index from mapping JSON file
elasticSearchIndexService.createIndexFromMappingJSONFile(AppConstants.PORTAL_RECORDS_MAPPING_JSON_FILE, indexName);

log.info("Indexing all metadata records from GeoNetwork");
if(beginWithUuid == null) {
log.info("Indexing all metadata records from GeoNetwork");
// recreate index from mapping JSON file
elasticSearchIndexService.createIndexFromMappingJSONFile(AppConstants.PORTAL_RECORDS_MAPPING_JSON_FILE, indexName);
}
else {
log.info("Resume indexing records from GeoNetwork at {}", beginWithUuid);
}

BulkRequest.Builder bulkRequest = new BulkRequest.Builder();
List<BulkResponse> results = new ArrayList<>();

long dataSize = 0;
for (String metadataRecord : geoNetworkResourceService.getAllMetadataRecords()) {
long total = 0;
for (String metadataRecord : geoNetworkResourceService.getAllMetadataRecords(beginWithUuid)) {
if(metadataRecord != null) {
try {
// get mapped metadata values from GeoNetwork to STAC collection schema
Expand Down Expand Up @@ -298,13 +303,15 @@ public List<BulkResponse> indexAllMetadataRecordsFromGeoNetwork(boolean confirm,
)
);
dataSize += size;
total++;

if(callback != null) {
callback.onProgress(
String.format(
"Add uuid %s to batch, current batch size is %s",
"Add uuid %s to batch, batch size is %s, total is %s",
mappedMetadataValues.getUuid(),
dataSize)
dataSize,
total)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,6 @@ public void login() {
}

public void deleteRecord(String... uuids) {
CountDownLatch latch = new CountDownLatch(1);

HttpEntity<String> requestEntity = getRequestEntity(null);

// retry the request if the server is not ready yet (sometimes will return 403 and can be resolved by retrying )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public void setup() throws IOException {
dockerComposeContainer.getServicePort(GeoNetworkSearchTestConfig.GN_NAME, GeoNetworkSearchTestConfig.GN_PORT))
);
clearElasticIndex(INDEX_NAME);
triggerIndexer(getRequestEntity(null), true);
}

@AfterEach
Expand All @@ -71,7 +72,7 @@ public void clear() throws IOException {
/**
* We need to make sure this works before you can do any meaningful transformation
*
* @throws IOException
* @throws IOException - Not expect to throw
*/
@Test
@Order(1)
Expand All @@ -84,7 +85,7 @@ public void verifyInsertMetadataWorks() throws IOException {
Assertions.assertFalse(geoNetworkService.isMetadataRecordsCountLessThan(1), "Compare false");
Assertions.assertTrue(geoNetworkService.isMetadataRecordsCountLessThan(2), "Compare true");

Iterable<String> i = geoNetworkService.getAllMetadataRecords();
Iterable<String> i = geoNetworkService.getAllMetadataRecords(null);

for (String x : i) {
if (x != null) {
Expand Down Expand Up @@ -193,7 +194,7 @@ public void verifyAllMetadataRecords() throws IOException {
insertMetadataRecords("9e5c3031-a026-48b3-a153-a70c2e2b78b9", "classpath:canned/sample1.xml");
insertMetadataRecords("830f9a83-ae6b-4260-a82a-24c4851f7119", "classpath:canned/sample2.xml");

Iterable<String> i = geoNetworkService.getAllMetadataRecords();
Iterable<String> i = geoNetworkService.getAllMetadataRecords(null);

// The content verified above, just make sure it returned the correct number
int count = 0;
Expand Down Expand Up @@ -236,7 +237,7 @@ public void verifyAllMetadataRecordWithPage() throws IOException, JAXBException
insertMetadataRecords(UUID6, "classpath:canned/sample6.xml");
insertMetadataRecords(UUID7, "classpath:canned/sample7.xml");

Iterable<String> i = geoNetworkService.getAllMetadataRecords();
Iterable<String> i = geoNetworkService.getAllMetadataRecords(null);

final List<MDMetadataType> xml = new ArrayList<>();
for(String x : i) {
Expand Down Expand Up @@ -348,7 +349,7 @@ public void verifyRetryOnGeonetworkBusyWorks() throws IOException, JAXBException
geoNetworkService.setGn4ElasticClient(spyClient);

// Should not flow exception and retry correctly given some call throw IOException
Iterable<String> i = geoNetworkService.getAllMetadataRecords();
Iterable<String> i = geoNetworkService.getAllMetadataRecords(null);

// It should handle retry internally and without throwing exception
for(String x : i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void verifyGetDocumentCount() throws IOException {
insertMetadataRecords(uuid1, "classpath:canned/sample2.xml");
insertMetadataRecords(uuid2, "classpath:canned/sample1.xml");

indexerService.indexAllMetadataRecordsFromGeoNetwork(true, null);
indexerService.indexAllMetadataRecordsFromGeoNetwork(null, true, null);

// The sample1 geometry have error [1:9695] failed to parse field [summaries.proj:geometry] of type [geo_shape]
// ErrorCause: {"type":"illegal_argument_exception","reason":"Polygon self-intersection at lat=57.0 lon=-66.0"}
Expand All @@ -123,7 +123,7 @@ public void verifyDeleteDocumentByUUID() throws IOException {
insertMetadataRecords(uuid1, "classpath:canned/sample2.xml");
insertMetadataRecords(uuid2, "classpath:canned/sample3.xml");

indexerService.indexAllMetadataRecordsFromGeoNetwork(true, null);
indexerService.indexAllMetadataRecordsFromGeoNetwork(null, true, null);
Assertions.assertEquals(2L, elasticSearchIndexService.getDocumentsCount(INDEX_NAME), "Doc count correct");

// Only 2 doc in elastic, if we delete it then should be zero
Expand All @@ -144,7 +144,7 @@ public void verifyGetDocumentByUUID() throws IOException {

insertMetadataRecords(uuid, "classpath:canned/sample4.xml");

indexerService.indexAllMetadataRecordsFromGeoNetwork(true, null);
indexerService.indexAllMetadataRecordsFromGeoNetwork(null, true, null);
Hit<ObjectNode> objectNodeHit = indexerService.getDocumentByUUID(uuid);

String test = String.valueOf(Objects.requireNonNull(objectNodeHit.source()));
Expand Down Expand Up @@ -175,7 +175,7 @@ public void verifyAssociatedRecordIndexer() throws IOException{
insertMetadataRecords(siblingId, "classpath:canned/associated/sibling.xml");
insertMetadataRecords(childId, "classpath:canned/associated/child.xml");

indexerService.indexAllMetadataRecordsFromGeoNetwork(true, null);
indexerService.indexAllMetadataRecordsFromGeoNetwork(null, true, null);
var targetResult = indexerService.getDocumentByUUID(targetRecordId);
String resultJson = String.valueOf(Objects.requireNonNull(targetResult.source()));

Expand Down Expand Up @@ -203,7 +203,7 @@ public void verifyLogoLinkAddedOnIndex() throws IOException {

insertMetadataRecords(uuid, "classpath:canned/sample5.xml");

indexerService.indexAllMetadataRecordsFromGeoNetwork(true, null);
indexerService.indexAllMetadataRecordsFromGeoNetwork(null,true, null);
Hit<ObjectNode> objectNodeHit = indexerService.getDocumentByUUID(uuid);

String test = String.valueOf(Objects.requireNonNull(objectNodeHit.source()));
Expand All @@ -229,7 +229,7 @@ public void verifyThumbnailLinkAddedOnIndex() throws IOException {

insertMetadataRecords(uuid, "classpath:canned/sample6.xml");

indexerService.indexAllMetadataRecordsFromGeoNetwork(true, null);
indexerService.indexAllMetadataRecordsFromGeoNetwork(null, true, null);
Hit<ObjectNode> objectNodeHit = indexerService.getDocumentByUUID(uuid);

String test = String.valueOf(Objects.requireNonNull(objectNodeHit.source()));
Expand All @@ -256,7 +256,7 @@ public void verifyThumbnailLinkNullAddedOnIndex() throws IOException {

insertMetadataRecords(uuid, "classpath:canned/sample7.xml");

indexerService.indexAllMetadataRecordsFromGeoNetwork(true, null);
indexerService.indexAllMetadataRecordsFromGeoNetwork(null,true, null);
Hit<ObjectNode> objectNodeHit = indexerService.getDocumentByUUID(uuid);

String test = String.valueOf(Objects.requireNonNull(objectNodeHit.source()));
Expand All @@ -283,7 +283,7 @@ public void verifyAbstractPhrases() throws IOException {

insertMetadataRecords(uuid, "classpath:canned/sample7.xml");

indexerService.indexAllMetadataRecordsFromGeoNetwork(true, null);
indexerService.indexAllMetadataRecordsFromGeoNetwork(null,true, null);
Hit<ObjectNode> objectNodeHit = indexerService.getDocumentByUUID(uuid);

String test = Objects.requireNonNull(objectNodeHit.source()).toPrettyString();
Expand Down

0 comments on commit 502aa98

Please sign in to comment.