diff --git a/indexer/src/main/java/au/org/aodn/esindexer/configuration/AppConstants.java b/indexer/src/main/java/au/org/aodn/esindexer/configuration/AppConstants.java index a65ff1b1..df00f348 100644 --- a/indexer/src/main/java/au/org/aodn/esindexer/configuration/AppConstants.java +++ b/indexer/src/main/java/au/org/aodn/esindexer/configuration/AppConstants.java @@ -2,6 +2,7 @@ public interface AppConstants { String PORTAL_RECORDS_MAPPING_JSON_FILE = "portal_records_index_schema.json"; + String DATASET_INDEX_MAPPING_JSON_FILE = "dataset_index_schema.json"; String FORMAT_XML = "xml"; String FORMAT_ISO19115_3_2018 = "iso19115-3.2018"; diff --git a/indexer/src/main/java/au/org/aodn/esindexer/configuration/DatasetAccessConfig.java b/indexer/src/main/java/au/org/aodn/esindexer/configuration/DatasetAccessConfig.java new file mode 100644 index 00000000..6e88bfdf --- /dev/null +++ b/indexer/src/main/java/au/org/aodn/esindexer/configuration/DatasetAccessConfig.java @@ -0,0 +1,17 @@ +package au.org.aodn.esindexer.configuration; + +import au.org.aodn.esindexer.service.DataAccessServiceImpl; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class DatasetAccessConfig { + + @Bean(name = "DataAccessService") + public DataAccessServiceImpl createDataAccessService( + @Value("${dataaccess.host:defaultForTesting}") String serverUrl + ){ + return new DataAccessServiceImpl(serverUrl); + } +} diff --git a/indexer/src/main/java/au/org/aodn/esindexer/controller/IndexerController.java b/indexer/src/main/java/au/org/aodn/esindexer/controller/IndexerController.java index e4fdeb68..5ac921dd 100644 --- a/indexer/src/main/java/au/org/aodn/esindexer/controller/IndexerController.java +++ b/indexer/src/main/java/au/org/aodn/esindexer/controller/IndexerController.java @@ -1,22 +1,27 @@ package au.org.aodn.esindexer.controller; -import au.org.aodn.esindexer.service.IndexerService; +import au.org.aodn.esindexer.model.Dataset; +import au.org.aodn.esindexer.service.DataAccessService; import au.org.aodn.esindexer.service.GeoNetworkService; +import au.org.aodn.esindexer.service.IndexerService; import co.elastic.clients.elasticsearch.core.BulkResponse; import com.fasterxml.jackson.databind.node.ObjectNode; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.security.SecurityRequirement; +import io.swagger.v3.oas.annotations.tags.Tag; import jakarta.xml.bind.JAXBException; import lombok.extern.slf4j.Slf4j; import org.opengis.referencing.FactoryException; import org.opengis.referencing.operation.TransformException; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.*; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; -import io.swagger.v3.oas.annotations.tags.Tag; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; +import java.time.LocalDate; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -33,6 +38,9 @@ public class IndexerController { @Autowired GeoNetworkService geonetworkResourceService; + @Autowired + DataAccessService dataAccessService; + @GetMapping(path="/records/{uuid}", produces = "application/json") @Operation(description = "Get a document from GeoNetwork by UUID directly - JSON format response") public ResponseEntity getMetadataRecordFromGeoNetworkByUUID(@PathVariable("uuid") String uuid) { @@ -149,4 +157,67 @@ public ResponseEntity addDocumentByUUID(@PathVariable("uuid") String uui public ResponseEntity deleteDocumentByUUID(@PathVariable("uuid") String uuid) throws IOException { return indexerService.deleteDocumentByUUID(uuid); } + + @PostMapping(path="/{uuid}/dataset", produces = "application/json") + @Operation(security = {@SecurityRequirement(name = "X-API-Key") }, description = "Index a dataset by UUID") + public ResponseEntity> addDatasetByUUID(@PathVariable("uuid") String uuid) { + + // For making sure the dataset entry is not too big, they will be split into smaller chunks by yearmonth + // By default, we assume the dataset started from 1970-01, and until now + LocalDate maxDate = LocalDate.now(); + LocalDate startDate = LocalDate.of(1970, 1, 1); + List>> futures = new ArrayList<>(); + + try{ + while (startDate.isBefore(maxDate)) { + // For speed optimizing, check whether data is existing in this year. If no data, skip to next year + var endDate = startDate.plusYears(1).minusDays(1); + var hasData = dataAccessService.doesDataExist(uuid, startDate, endDate); + if (!hasData) { + log.info("No data found for dataset {} from {} to {}", uuid, startDate, endDate); + startDate = startDate.plusYears(1); + continue; + } + + futures.addAll(indexDatasetMonthly(uuid, startDate, endDate)); + startDate = startDate.plusYears(1); + } + + CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + allFutures.join(); + List results = new ArrayList<>(); + for (CompletableFuture> future : futures) { + results.add(future.join().getBody()); + } + + return ResponseEntity.ok(results); + } catch (Exception e) { + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(List.of(e.getMessage())); + } + } + + private List>> indexDatasetMonthly( + String uuid, + LocalDate startDate, + LocalDate maxDate + ) throws InterruptedException, ExecutionException { + List>> futures = new ArrayList<>(); + var startDateToLoop = startDate; + + while (startDateToLoop.isBefore(maxDate)) { + var endDate = startDateToLoop.plusMonths(1).minusDays(1); + + Dataset dataset = dataAccessService.getIndexingDatasetBy(uuid, startDateToLoop, endDate); + if (dataset != null && dataset.data() != null && !dataset.data().isEmpty()) { + CompletableFuture> future = indexerService.indexDataset(dataset); + futures.add(future); + log.info("Indexing dataset {} from {} to {}", uuid, startDateToLoop, endDate); + future.get(); + } + startDateToLoop = startDateToLoop.plusMonths(1); + } + + return futures; + } + } diff --git a/indexer/src/main/java/au/org/aodn/esindexer/model/Dataset.java b/indexer/src/main/java/au/org/aodn/esindexer/model/Dataset.java new file mode 100644 index 00000000..7c9a5949 --- /dev/null +++ b/indexer/src/main/java/au/org/aodn/esindexer/model/Dataset.java @@ -0,0 +1,10 @@ +package au.org.aodn.esindexer.model; + +import java.time.YearMonth; +import java.util.List; + +public record Dataset( + String uuid, + YearMonth yearMonth, + List data +) {} diff --git a/indexer/src/main/java/au/org/aodn/esindexer/model/Datum.java b/indexer/src/main/java/au/org/aodn/esindexer/model/Datum.java new file mode 100644 index 00000000..81360e8c --- /dev/null +++ b/indexer/src/main/java/au/org/aodn/esindexer/model/Datum.java @@ -0,0 +1,49 @@ +package au.org.aodn.esindexer.model; + +import lombok.Getter; +import lombok.Setter; + +import java.time.LocalDate; + +// If more fields are needed to be filtered, please add more columns here +// and don't forget updating the override equals() method +@Getter +@Setter +public class Datum { + + + private final LocalDate time; + private final double longitude; + private final double latitude; + private final double depth; + + private long count = 1; + + public Datum(LocalDate time, double longitude, double latitude, double depth) { + this.time = time; + this.longitude = longitude; + this.latitude = latitude; + this.depth = depth; + } + + // putting all same record into one instance and increment the count is more efficient + public void incrementCount() { + count++; + } + + // Don't include variable "count" in the equals() method. + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + Datum that = (Datum) obj; + return Double.compare(that.longitude, longitude) == 0 && + Double.compare(that.latitude, latitude) == 0 && + Double.compare(that.depth, depth) == 0 && + time.equals(that.time); + } +} diff --git a/indexer/src/main/java/au/org/aodn/esindexer/service/DataAccessService.java b/indexer/src/main/java/au/org/aodn/esindexer/service/DataAccessService.java new file mode 100644 index 00000000..82654d48 --- /dev/null +++ b/indexer/src/main/java/au/org/aodn/esindexer/service/DataAccessService.java @@ -0,0 +1,12 @@ +package au.org.aodn.esindexer.service; + +import au.org.aodn.esindexer.model.Dataset; + +import java.time.LocalDate; + +public interface DataAccessService { + Dataset getIndexingDatasetBy(String uuid, LocalDate startDate, LocalDate endDate); + boolean doesDataExist(String uuid, LocalDate startDate, LocalDate endDate); + String getServiceUrl(); + void setServiceUrl(String url); +} diff --git a/indexer/src/main/java/au/org/aodn/esindexer/service/DataAccessServiceImpl.java b/indexer/src/main/java/au/org/aodn/esindexer/service/DataAccessServiceImpl.java new file mode 100644 index 00000000..466a9354 --- /dev/null +++ b/indexer/src/main/java/au/org/aodn/esindexer/service/DataAccessServiceImpl.java @@ -0,0 +1,154 @@ +package au.org.aodn.esindexer.service; + +import au.org.aodn.esindexer.exception.MetadataNotFoundException; +import au.org.aodn.esindexer.model.Datum; +import au.org.aodn.esindexer.model.Dataset; +import org.springframework.http.*; +import org.springframework.web.client.HttpClientErrorException; +import org.springframework.web.client.RestTemplate; +import org.springframework.web.util.UriComponentsBuilder; + +import java.time.LocalDate; +import java.time.YearMonth; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DataAccessServiceImpl implements DataAccessService { + + private String serverUrl; + + @Override + public String getServiceUrl() { + return serverUrl; + } + + @Override + public void setServiceUrl(String url) { + this.serverUrl = url; + } + + public DataAccessServiceImpl(String serverUrl) { + setServiceUrl(serverUrl); + } + + private final RestTemplate restTemplate = new RestTemplate(); + @Override + public Dataset getIndexingDatasetBy(String uuid, LocalDate startDate, LocalDate endDate) { + + // currently, we force to get data in the same month and year to simplify the logic + if (startDate.getMonth() != endDate.getMonth() || startDate.getYear() != endDate.getYear()) { + throw new IllegalArgumentException("Start date and end date must be in the same month and year"); + } + + try { + HttpEntity request = getRequestEntity(null, null); + + Map params = new HashMap<>(); + params.put("uuid", uuid); + + String url = UriComponentsBuilder.fromHttpUrl(getDataAccessEndpoint() + "/data/{uuid}") + .queryParam("is_to_index", "true") + .queryParam("start_date", startDate) + .queryParam("end_date", endDate) + .buildAndExpand(uuid) + .toUriString(); + + ResponseEntity responseEntity = restTemplate.exchange( + url, + HttpMethod.GET, + request, + Datum[].class, + params + ); + + + + if (responseEntity.getStatusCode().is2xxSuccessful()) { + List data = new ArrayList<>(); + if (responseEntity.getBody() != null) { + data = List.of(responseEntity.getBody()); + } + var dataToIndex = aggregateData(data); + return new Dataset( + uuid, + YearMonth.of(startDate.getYear(), startDate.getMonth()), + dataToIndex + ); + } + throw new RuntimeException("Unable to retrieve dataset with UUID: " + uuid ); + + } catch (HttpClientErrorException.NotFound e) { + throw new MetadataNotFoundException("Unable to find dataset with UUID: " + uuid + " in GeoNetwork"); + } catch (Exception e) { + throw new RuntimeException("Exception thrown while retrieving dataset with UUID: " + uuid + e.getMessage(), e); + } + } + + @Override + public boolean doesDataExist(String uuid, LocalDate startDate, LocalDate endDate) { + try { + HttpEntity request = getRequestEntity(null, null); + + Map params = new HashMap<>(); + params.put("uuid", uuid); + + String url = UriComponentsBuilder.fromHttpUrl(getDataAccessEndpoint() + "/data/{uuid}/has_data") + .queryParam("start_date", startDate) + .queryParam("end_date", endDate) + .buildAndExpand(uuid) + .toUriString(); + + ResponseEntity responseEntity = restTemplate.exchange( + url, + HttpMethod.GET, + request, + Boolean.class, + params + ); + + return Boolean.TRUE.equals(responseEntity.getBody()); + + } catch (HttpClientErrorException.NotFound e) { + throw new MetadataNotFoundException("Unable to find dataset with UUID: " + uuid + " in GeoNetwork"); + } catch (Exception e) { + throw new RuntimeException("Exception thrown while retrieving dataset with UUID: " + uuid + e.getMessage(), e); + } + } + + /** + * Summarize the data by counting the number if all the concerned fields are the same + * @param data the data to summarize + * @return the summarized data + */ + private List aggregateData(List data) { + var aggregatedData = new ArrayList(); + for (var datum: data) { + if (aggregatedData.contains(datum)) { + var existingDatum = aggregatedData.get(aggregatedData.indexOf(datum)); + existingDatum.incrementCount(); + } else { + aggregatedData.add(datum); + } + } + return aggregatedData; + } + + private String getDataAccessEndpoint() { + return getServiceUrl() + "/api/v1/das/"; + } + + + // parameters are not in use for now. May be useful in the future so just keep it + protected HttpEntity getRequestEntity(MediaType accept, String body) { + HttpHeaders headers = new HttpHeaders(); + headers.setAccept(List.of( + MediaType.TEXT_PLAIN, + MediaType.APPLICATION_JSON, + MediaType.valueOf("application/*+json"), + MediaType.ALL + )); + return body == null ? new org.springframework.http.HttpEntity<>(headers) : new org.springframework.http.HttpEntity<>(body, headers); + } +} diff --git a/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerService.java b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerService.java index 4fb77267..64f74700 100644 --- a/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerService.java +++ b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerService.java @@ -1,5 +1,6 @@ package au.org.aodn.esindexer.service; +import au.org.aodn.esindexer.model.Dataset; import co.elastic.clients.elasticsearch.core.BulkRequest; import co.elastic.clients.elasticsearch.core.BulkResponse; import co.elastic.clients.elasticsearch.core.search.Hit; @@ -20,6 +21,7 @@ interface Callback { void onComplete(Object result); } CompletableFuture> indexMetadata(String metadataValues) throws IOException, FactoryException, TransformException, JAXBException; + CompletableFuture> indexDataset(Dataset dataset); ResponseEntity deleteDocumentByUUID(String uuid) throws IOException; List indexAllMetadataRecordsFromGeoNetwork(String beginWithUuid, boolean confirm, Callback callback) throws IOException; Hit getDocumentByUUID(String uuid) throws IOException; diff --git a/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerServiceImpl.java b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerServiceImpl.java index c64c4a23..b743ef66 100644 --- a/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerServiceImpl.java +++ b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerServiceImpl.java @@ -2,6 +2,7 @@ import au.org.aodn.esindexer.configuration.AppConstants; import au.org.aodn.esindexer.exception.*; +import au.org.aodn.esindexer.model.Dataset; import au.org.aodn.esindexer.utils.GcmdKeywordUtils; import au.org.aodn.esindexer.utils.JaxbUtils; import au.org.aodn.metadata.iso19115_3_2018.MDMetadataType; @@ -51,6 +52,7 @@ public class IndexerServiceImpl implements IndexerService { protected String indexName; + protected String datasetIndexName; protected String tokensAnalyserName; protected GeoNetworkService geoNetworkResourceService; protected ElasticsearchClient portalElasticsearchClient; @@ -70,6 +72,7 @@ public class IndexerServiceImpl implements IndexerService { @Autowired public IndexerServiceImpl( @Value("${elasticsearch.index.name}") String indexName, + @Value("${elasticsearch.dataset_index.name}") String datasetIndexName, @Value("${elasticsearch.analyser.tokens.name}") String tokensAnalyserName, ObjectMapper indexerObjectMapper, JaxbUtils jaxbUtils, @@ -82,6 +85,7 @@ public IndexerServiceImpl( GcmdKeywordUtils gcmdKeywordUtils ) { this.indexName = indexName; + this.datasetIndexName = datasetIndexName; this.tokensAnalyserName = tokensAnalyserName; this.indexerObjectMapper = indexerObjectMapper; this.jaxbUtils = jaxbUtils; @@ -252,6 +256,36 @@ public CompletableFuture> indexMetadata(String metadataVa } } + @Override + public CompletableFuture> indexDataset(Dataset dataset) { + try { + + IndexRequest request; + try(InputStream inputStream = new ByteArrayInputStream(indexerObjectMapper.writeValueAsBytes(dataset))){ + log.info("Ingesting a new dataset with UUID: {} to index: {}", dataset.uuid(), indexName); + log.debug("{}", dataset); + + request = IndexRequest.of(builder -> builder + .id(dataset.uuid() + dataset.yearMonth()) + .index(datasetIndexName) + .withJson(inputStream) + ); + + IndexResponse response = portalElasticsearchClient.index(request); + log.info("Dataset with UUID: {} indexed with version: {}", dataset.uuid(), response.version()); + return CompletableFuture.completedFuture(ResponseEntity.status(HttpStatus.OK).body(response.toString())); + + } catch (Exception e) { + log.error(e.getMessage()); + throw new IndexingRecordException(e.getMessage()); + } + + } catch(Exception e) { + log.error(e.getMessage()); + throw new MappingValueException(e.getMessage()); + } + } + public ResponseEntity deleteDocumentByUUID(String uuid) throws IOException { log.info("Deleting document with UUID: {} from index: {}", uuid, indexName); diff --git a/indexer/src/main/resources/application-dev.yaml b/indexer/src/main/resources/application-dev.yaml index 885be539..15c0b4be 100644 --- a/indexer/src/main/resources/application-dev.yaml +++ b/indexer/src/main/resources/application-dev.yaml @@ -18,6 +18,9 @@ geonetwork: index: "records" endpoint: /geonetwork/srv/api/search +dataaccess: + host: http://localhost:5000 + logging: level: au.org.aodn.indexer: DEBUG diff --git a/indexer/src/main/resources/application.yaml b/indexer/src/main/resources/application.yaml index 5f0e2de0..ebaf6972 100644 --- a/indexer/src/main/resources/application.yaml +++ b/indexer/src/main/resources/application.yaml @@ -40,6 +40,8 @@ elasticsearch: name: portal_records vocabs_index: name: vocabs_index + dataset_index: + name: dataset_index analyser: tokens: name: shingle_analyser diff --git a/indexer/src/main/resources/config_files/dataset_index_schema.json b/indexer/src/main/resources/config_files/dataset_index_schema.json new file mode 100644 index 00000000..6e427ea2 --- /dev/null +++ b/indexer/src/main/resources/config_files/dataset_index_schema.json @@ -0,0 +1,33 @@ +{ + "mappings": { + "properties": { + "uuid": { + "type": "keyword" + }, + "year_month": { + "type": "keyword" + }, + "data": { + "type": "nested", + "properties": { + "time": { + "type": "date", + "format": "yyyy-MM-dd" + }, + "longitude": { + "type": "float" + }, + "latitude": { + "type": "float" + }, + "depth": { + "type": "float" + }, + "count": { + "type": "integer" + } + } + } + } + } +} diff --git a/indexer/src/test/java/au/org/aodn/esindexer/service/StacCollectionMapperServiceTest.java b/indexer/src/test/java/au/org/aodn/esindexer/service/StacCollectionMapperServiceTest.java index 954ba909..8a0f6d67 100644 --- a/indexer/src/test/java/au/org/aodn/esindexer/service/StacCollectionMapperServiceTest.java +++ b/indexer/src/test/java/au/org/aodn/esindexer/service/StacCollectionMapperServiceTest.java @@ -105,6 +105,7 @@ public void cleanUp() { public void createIndexerService() throws IOException { indexerService = new IndexerServiceImpl( "any-works", + "any-works-dataset", "shingle_analyser", objectMapper, jaxbUtils,