From 20545eccbda00a0e73d1d8a0695704ff7e40225e Mon Sep 17 00:00:00 2001 From: utas-raymondng Date: Wed, 28 Aug 2024 14:53:23 +1000 Subject: [PATCH 1/2] Avoid flooding and cause geonetwork too busy error --- .../configuration/IndexerConfig.java | 24 +++++++++++++++++++ .../controller/IndexerController.java | 8 ------- .../esindexer/service/IndexerServiceImpl.java | 10 ++++++-- .../service/IndexerServiceTests.java | 22 ++++++++++++----- .../src/test/resources/application-test.yaml | 4 ++++ 5 files changed, 52 insertions(+), 16 deletions(-) diff --git a/indexer/src/main/java/au/org/aodn/esindexer/configuration/IndexerConfig.java b/indexer/src/main/java/au/org/aodn/esindexer/configuration/IndexerConfig.java index 14376027..c2a74274 100644 --- a/indexer/src/main/java/au/org/aodn/esindexer/configuration/IndexerConfig.java +++ b/indexer/src/main/java/au/org/aodn/esindexer/configuration/IndexerConfig.java @@ -1,13 +1,19 @@ package au.org.aodn.esindexer.configuration; import au.org.aodn.esindexer.utils.VocabsUtils; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.retry.annotation.EnableRetry; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; @Configuration @EnableRetry +@EnableAsync public class IndexerConfig { /** * We need to create component here because we do not want to run test with real http connection @@ -20,4 +26,22 @@ public class IndexerConfig { public VocabsUtils createVocabsUtils() { return new VocabsUtils(); } + /** + * This executor is used to limit the number of concurrent call to index metadata so not to flood the + * geonetwork. 
This is useful because GeoNetwork does not care about the re-index calls it invokes, hence + * the Elasticsearch of GeoNetwork may be flooded by its re-index calls. + * @return - An async task executor + */ + @Bean(name = "asyncIndexMetadata") + public Executor taskExecutor( + @Value("${app.indexing.pool.core:5}") Integer coreSize, + @Value("${app.indexing.pool.max:10}") Integer coreMax) { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(coreSize); // Number of concurrent threads + executor.setMaxPoolSize(coreMax); // Max number of concurrent threads + executor.setQueueCapacity(5000); // Size of the queue + executor.setThreadNamePrefix("Async-"); + executor.initialize(); + return executor; + } } diff --git a/indexer/src/main/java/au/org/aodn/esindexer/controller/IndexerController.java b/indexer/src/main/java/au/org/aodn/esindexer/controller/IndexerController.java index b1dab74e..ecdde0b6 100644 --- a/indexer/src/main/java/au/org/aodn/esindexer/controller/IndexerController.java +++ b/indexer/src/main/java/au/org/aodn/esindexer/controller/IndexerController.java @@ -34,14 +34,6 @@ public class IndexerController { @Autowired GeoNetworkService geonetworkResourceService; -// @GetMapping(path="/gn_records/{uuid}", produces = "application/json") -// @Operation(description = "Get a document from GeoNetwork Elasticsearch by UUID") -// public ResponseEntity getMetadataRecordFromGeoNetworkElasticsearchByUUID(@PathVariable("uuid") String uuid) { -// logger.info("getting a document by UUID: " + uuid); -// JSONObject response = geonetworkResourceService.searchMetadataBy(uuid); -// return ResponseEntity.status(HttpStatus.OK).body(response.toString()); -// } - @GetMapping(path="/records/{uuid}", produces = "application/json") @Operation(description = "Get a document from GeoNetwork by UUID directly - JSON format response") public ResponseEntity getMetadataRecordFromGeoNetworkByUUID(@PathVariable("uuid") String uuid) { diff --git
a/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerServiceImpl.java b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerServiceImpl.java index 41a6e4c4..da4956e0 100644 --- a/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerServiceImpl.java +++ b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerServiceImpl.java @@ -3,7 +3,6 @@ import au.org.aodn.esindexer.configuration.AppConstants; import au.org.aodn.esindexer.exception.*; import au.org.aodn.esindexer.utils.JaxbUtils; -import au.org.aodn.esindexer.utils.StringUtil; import au.org.aodn.metadata.iso19115_3_2018.MDMetadataType; import au.org.aodn.stac.model.RecordSuggest; import au.org.aodn.stac.model.StacCollectionModel; @@ -29,6 +28,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.io.ByteArrayInputStream; @@ -170,7 +170,13 @@ protected StacCollectionModel getMappedMetadataValues(String metadataValues) thr return stacCollectionModel; } - + /** + * Used to index a particular UUID; async execution limits the number of concurrent calls of this function to avoid flooding + * the system. + * @param metadataValues - The XML of the metadata + * @return - The STAC doc in string format.
+ */ + @Async("asyncIndexMetadata") public ResponseEntity indexMetadata(String metadataValues) { try { StacCollectionModel mappedMetadataValues = this.getMappedMetadataValues(metadataValues); diff --git a/indexer/src/test/java/au/org/aodn/esindexer/service/IndexerServiceTests.java b/indexer/src/test/java/au/org/aodn/esindexer/service/IndexerServiceTests.java index e2eabaec..f37f68b9 100644 --- a/indexer/src/test/java/au/org/aodn/esindexer/service/IndexerServiceTests.java +++ b/indexer/src/test/java/au/org/aodn/esindexer/service/IndexerServiceTests.java @@ -27,7 +27,7 @@ public class IndexerServiceTests extends BaseTestClass { protected GeoNetworkServiceImpl geoNetworkService; @Autowired - protected IndexerServiceImpl indexerService; + protected IndexerService indexerService; @Autowired protected ObjectMapper indexerObjectMapper; @@ -60,9 +60,14 @@ public void verifyIsMetadataPublished() throws IOException { insertMetadataRecords(uuid1, "classpath:canned/sample1.xml"); insertMetadataRecords(uuid2, "classpath:canned/sample2.xml"); - Assertions.assertTrue(indexerService.isMetadataPublished(uuid1), uuid1 + " published"); - Assertions.assertTrue(indexerService.isMetadataPublished(uuid2), uuid2 + " published"); - Assertions.assertFalse(indexerService.isMetadataPublished("not-exist"), "Not exist and not published"); + if(indexerService instanceof IndexerServiceImpl impl) { + Assertions.assertTrue(impl.isMetadataPublished(uuid1), uuid1 + " published"); + Assertions.assertTrue(impl.isMetadataPublished(uuid2), uuid2 + " published"); + Assertions.assertFalse(impl.isMetadataPublished("not-exist"), "Not exist and not published"); + } + else { + Assertions.fail("IndexerServiceImpl expected"); + } } finally { deleteRecord(uuid1); @@ -77,8 +82,13 @@ public void verifyIsMetadataPublished() throws IOException { public void verifyGeoNetworkInstanceReinstalled() throws IOException { String uuid = "9e5c3031-a026-48b3-a153-a70c2e2b78b9"; try { - insertMetadataRecords(uuid, 
"classpath:canned/sample1.xml"); - Assertions.assertTrue(indexerService.isGeoNetworkInstanceReinstalled(1), "New installed"); + if(indexerService instanceof IndexerServiceImpl impl) { + insertMetadataRecords(uuid, "classpath:canned/sample1.xml"); + Assertions.assertTrue(impl.isGeoNetworkInstanceReinstalled(1), "New installed"); + } + else { + Assertions.fail("IndexerServiceImpl expected"); + } } finally { deleteRecord(uuid); diff --git a/indexer/src/test/resources/application-test.yaml b/indexer/src/test/resources/application-test.yaml index 86cd1763..3baa2d6d 100644 --- a/indexer/src/test/resources/application-test.yaml +++ b/indexer/src/test/resources/application-test.yaml @@ -1,5 +1,9 @@ # Client calling the Indexer API must provide this token in the Authorization header app: + indexing: + pool: + core: 1 + max: 1 http: authToken: sample-auth-token From 50ec4359452646f9e100e4641ae85b54afc991c9 Mon Sep 17 00:00:00 2001 From: utas-raymondng Date: Wed, 28 Aug 2024 14:59:33 +1000 Subject: [PATCH 2/2] Change interface so we can connect to proxy of IndexerService --- .../esindexer/service/IndexerService.java | 3 +++ .../esindexer/service/IndexerServiceImpl.java | 6 ++++-- .../service/IndexerServiceTests.java | 20 +++++-------------- 3 files changed, 12 insertions(+), 17 deletions(-) diff --git a/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerService.java b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerService.java index 084a9104..88760bca 100644 --- a/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerService.java +++ b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerService.java @@ -12,6 +12,7 @@ import java.util.List; public interface IndexerService { + // Event call back to notify caller, this avoid gateway timeout as we have message back to browser interface Callback { void onProgress(Object update); void onComplete(Object result); @@ -20,4 +21,6 @@ interface Callback { ResponseEntity deleteDocumentByUUID(String uuid) 
throws IOException; List indexAllMetadataRecordsFromGeoNetwork(boolean confirm, Callback callback) throws IOException; Hit getDocumentByUUID(String uuid) throws IOException; + boolean isMetadataPublished(String uuid); + boolean isGeoNetworkInstanceReinstalled(long portalIndexDocumentsCount); } diff --git a/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerServiceImpl.java b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerServiceImpl.java index da4956e0..fe372f8c 100644 --- a/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerServiceImpl.java +++ b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerServiceImpl.java @@ -101,7 +101,8 @@ public Hit getDocumentByUUID(String uuid) throws IOException { } } - protected boolean isGeoNetworkInstanceReinstalled(long portalIndexDocumentsCount) { + @Override + public boolean isGeoNetworkInstanceReinstalled(long portalIndexDocumentsCount) { /** * compare if GeoNetwork has 1 only metadata (the recently added one which triggered the indexer) * and the portal index has more than 0 documents (the most recent metadata yet indexed to portal index at this point) @@ -109,7 +110,8 @@ protected boolean isGeoNetworkInstanceReinstalled(long portalIndexDocumentsCount return geoNetworkResourceService.isMetadataRecordsCountLessThan(2) && portalIndexDocumentsCount > 0; } - protected boolean isMetadataPublished(String uuid) { + @Override + public boolean isMetadataPublished(String uuid) { /* read for the published status from GN Elasticsearch index, the flag is not part of the XML body */ try { geoNetworkResourceService.searchRecordBy(uuid); diff --git a/indexer/src/test/java/au/org/aodn/esindexer/service/IndexerServiceTests.java b/indexer/src/test/java/au/org/aodn/esindexer/service/IndexerServiceTests.java index f37f68b9..015e3cc7 100644 --- a/indexer/src/test/java/au/org/aodn/esindexer/service/IndexerServiceTests.java +++ b/indexer/src/test/java/au/org/aodn/esindexer/service/IndexerServiceTests.java @@ 
-60,14 +60,9 @@ public void verifyIsMetadataPublished() throws IOException { insertMetadataRecords(uuid1, "classpath:canned/sample1.xml"); insertMetadataRecords(uuid2, "classpath:canned/sample2.xml"); - if(indexerService instanceof IndexerServiceImpl impl) { - Assertions.assertTrue(impl.isMetadataPublished(uuid1), uuid1 + " published"); - Assertions.assertTrue(impl.isMetadataPublished(uuid2), uuid2 + " published"); - Assertions.assertFalse(impl.isMetadataPublished("not-exist"), "Not exist and not published"); - } - else { - Assertions.fail("IndexerServiceImpl expected"); - } + Assertions.assertTrue(indexerService.isMetadataPublished(uuid1), uuid1 + " published"); + Assertions.assertTrue(indexerService.isMetadataPublished(uuid2), uuid2 + " published"); + Assertions.assertFalse(indexerService.isMetadataPublished("not-exist"), "Not exist and not published"); } finally { deleteRecord(uuid1); @@ -82,13 +77,8 @@ public void verifyIsMetadataPublished() throws IOException { public void verifyGeoNetworkInstanceReinstalled() throws IOException { String uuid = "9e5c3031-a026-48b3-a153-a70c2e2b78b9"; try { - if(indexerService instanceof IndexerServiceImpl impl) { - insertMetadataRecords(uuid, "classpath:canned/sample1.xml"); - Assertions.assertTrue(impl.isGeoNetworkInstanceReinstalled(1), "New installed"); - } - else { - Assertions.fail("IndexerServiceImpl expected"); - } + insertMetadataRecords(uuid, "classpath:canned/sample1.xml"); + Assertions.assertTrue(indexerService.isGeoNetworkInstanceReinstalled(1), "New installed"); } finally { deleteRecord(uuid);