diff --git a/geonetwork4-api/pom.xml b/geonetwork4-api/pom.xml
index 288016de..7c5e2e1d 100644
--- a/geonetwork4-api/pom.xml
+++ b/geonetwork4-api/pom.xml
@@ -28,6 +28,11 @@
org.glassfish.jaxb
jaxb-runtime
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <optional>true</optional>
+        </dependency>
diff --git a/indexer/src/main/java/au/org/aodn/esindexer/model/GeoNetworkField.java b/geonetwork4-api/src/main/java/au/org/aodn/metadata/geonetwork/GeoNetworkField.java
similarity index 72%
rename from indexer/src/main/java/au/org/aodn/esindexer/model/GeoNetworkField.java
rename to geonetwork4-api/src/main/java/au/org/aodn/metadata/geonetwork/GeoNetworkField.java
index 7da75110..8fa66f6d 100644
--- a/indexer/src/main/java/au/org/aodn/esindexer/model/GeoNetworkField.java
+++ b/geonetwork4-api/src/main/java/au/org/aodn/metadata/geonetwork/GeoNetworkField.java
@@ -1,4 +1,4 @@
-package au.org.aodn.esindexer.model;
+package au.org.aodn.metadata.geonetwork;
import lombok.Getter;
diff --git a/indexer/src/main/java/au/org/aodn/esindexer/configuration/AppConstants.java b/indexer/src/main/java/au/org/aodn/esindexer/configuration/AppConstants.java
index df00f348..cfef4039 100644
--- a/indexer/src/main/java/au/org/aodn/esindexer/configuration/AppConstants.java
+++ b/indexer/src/main/java/au/org/aodn/esindexer/configuration/AppConstants.java
@@ -2,7 +2,7 @@
public interface AppConstants {
String PORTAL_RECORDS_MAPPING_JSON_FILE = "portal_records_index_schema.json";
- String DATASET_INDEX_MAPPING_JSON_FILE = "dataset_index_schema.json";
+ String DATASET_INDEX_MAPPING_JSON_FILE = "data_index_schema.json";
String FORMAT_XML = "xml";
String FORMAT_ISO19115_3_2018 = "iso19115-3.2018";
diff --git a/indexer/src/main/java/au/org/aodn/esindexer/configuration/DatasetAccessConfig.java b/indexer/src/main/java/au/org/aodn/esindexer/configuration/DatasetAccessConfig.java
index 6e88bfdf..c6439dcd 100644
--- a/indexer/src/main/java/au/org/aodn/esindexer/configuration/DatasetAccessConfig.java
+++ b/indexer/src/main/java/au/org/aodn/esindexer/configuration/DatasetAccessConfig.java
@@ -1,17 +1,24 @@
package au.org.aodn.esindexer.configuration;
+import au.org.aodn.esindexer.service.DataAccessService;
import au.org.aodn.esindexer.service.DataAccessServiceImpl;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.web.client.RestTemplate;
@Configuration
public class DatasetAccessConfig {
- @Bean(name = "DataAccessService")
+ @Bean
+ @ConditionalOnMissingBean(DataAccessService.class)
public DataAccessServiceImpl createDataAccessService(
- @Value("${dataaccess.host:defaultForTesting}") String serverUrl
- ){
- return new DataAccessServiceImpl(serverUrl);
+ @Value("${dataaccess.host:http://localhost:5000}") String serverUrl,
+ @Value("${dataaccess.baseUrl:/api/v1/das/}") String baseUrl,
+ @Autowired RestTemplate template){
+
+ return new DataAccessServiceImpl(serverUrl, baseUrl, template);
}
}
diff --git a/indexer/src/main/java/au/org/aodn/esindexer/configuration/ObjectMapperConfig.java b/indexer/src/main/java/au/org/aodn/esindexer/configuration/ObjectMapperConfig.java
index 0e9ae7a2..d7b166a5 100644
--- a/indexer/src/main/java/au/org/aodn/esindexer/configuration/ObjectMapperConfig.java
+++ b/indexer/src/main/java/au/org/aodn/esindexer/configuration/ObjectMapperConfig.java
@@ -1,10 +1,7 @@
package au.org.aodn.esindexer.configuration;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.PropertyNamingStrategy;
+import com.fasterxml.jackson.databind.*;
+import com.fasterxml.jackson.databind.json.JsonMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -12,13 +9,13 @@
public class ObjectMapperConfig {
@Bean("indexerObjectMapper")
public static ObjectMapper objectMapper() {
- ObjectMapper objectMapper = new ObjectMapper();
-
- // Enable pretty printing for JSON output
- objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
-
- // Ignore unknown properties during deserialization
- objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
+ ObjectMapper objectMapper = JsonMapper.builder()
+ .enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES)
+ // Enable pretty printing for JSON output
+ .enable(SerializationFeature.INDENT_OUTPUT)
+ // Ignore unknown properties during deserialization
+ .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
+ .build();
// Use a specific date format for serialization and deserialization (if needed)
// objectMapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd"));
diff --git a/indexer/src/main/java/au/org/aodn/esindexer/controller/IndexerController.java b/indexer/src/main/java/au/org/aodn/esindexer/controller/IndexerController.java
index ac31331d..cd0a2eae 100644
--- a/indexer/src/main/java/au/org/aodn/esindexer/controller/IndexerController.java
+++ b/indexer/src/main/java/au/org/aodn/esindexer/controller/IndexerController.java
@@ -1,8 +1,7 @@
package au.org.aodn.esindexer.controller;
-import au.org.aodn.esindexer.service.DataAccessService;
-import au.org.aodn.esindexer.service.GeoNetworkService;
-import au.org.aodn.esindexer.service.IndexerService;
+import au.org.aodn.esindexer.model.TemporalExtent;
+import au.org.aodn.esindexer.service.*;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.swagger.v3.oas.annotations.Operation;
@@ -19,10 +18,9 @@
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
-import java.util.ArrayList;
+import java.time.LocalDate;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
@RestController
@RequestMapping(value = "/api/v1/indexer/index")
@@ -31,7 +29,10 @@
public class IndexerController {
@Autowired
- IndexerService indexerService;
+ IndexerMetadataService indexerMetadata;
+
+ @Autowired
+ IndexCloudOptimizedService indexCloudOptimizedData;
@Autowired
GeoNetworkService geonetworkResourceService;
@@ -51,7 +52,7 @@ public ResponseEntity getMetadataRecordFromGeoNetworkByUUID(@PathVariabl
@Operation(description = "Get a document from portal index by UUID")
public ResponseEntity getDocumentByUUID(@PathVariable("uuid") String uuid) throws IOException {
log.info("getting a document form portal by UUID: {}", uuid);
- ObjectNode response = indexerService.getDocumentByUUID(uuid).source();
+ ObjectNode response = indexerMetadata.getDocumentByUUID(uuid).source();
return ResponseEntity.status(HttpStatus.OK).body(response);
}
/**
@@ -69,13 +70,12 @@ public ResponseEntity indexAllMetadataRecords(
@RequestParam(value = "confirm", defaultValue = "false") Boolean confirm,
@RequestParam(value = "beginWithUuid", required=false) String beginWithUuid) throws IOException {
- List responses = indexerService.indexAllMetadataRecordsFromGeoNetwork(beginWithUuid, confirm, null);
+ List responses = indexerMetadata.indexAllMetadataRecordsFromGeoNetwork(beginWithUuid, confirm, null);
return ResponseEntity.ok(responses.toString());
}
/**
* Emit result to FE so it will not result in gateway time-out. You need to run it with postman or whatever tools
* support server side event, the content type needs to be text/event-stream in order to work
- *
* Noted: There is a bug in postman desktop, so either you run postman using web-browser with agent directly
* or you need to have version 10.2 or above in order to get the emitted result
*
@@ -90,8 +90,65 @@ public SseEmitter indexAllMetadataRecordsAsync(
@RequestParam(value = "beginWithUuid", required=false) String beginWithUuid) {
final SseEmitter emitter = new SseEmitter(0L); // 0L means no timeout;
+ final IndexService.Callback callback = createCallback(emitter);
+
+ new Thread(() -> {
+ try {
+ indexerMetadata.indexAllMetadataRecordsFromGeoNetwork(beginWithUuid, confirm, callback);
+ }
+ catch(IOException e) {
+ emitter.completeWithError(e);
+ }
+ }).start();
+
+ return emitter;
+ }
+
+ @PostMapping(path="/{uuid}", produces = "application/json")
+ @Operation(security = { @SecurityRequirement(name = "X-API-Key") }, description = "Index a metadata record by UUID")
+ public ResponseEntity addDocumentByUUID(@PathVariable("uuid") String uuid) throws IOException, FactoryException, JAXBException, TransformException {
+ String metadataValues = geonetworkResourceService.searchRecordBy(uuid);
+
+ CompletableFuture> f = indexerMetadata.indexMetadata(metadataValues);
+ // Return when done make it back to sync instead of async
+ return f.join();
+ }
+
+ @DeleteMapping(path="/{uuid}", produces = "application/json")
+ @Operation(security = { @SecurityRequirement(name = "X-API-Key") }, description = "Delete a metadata record by UUID")
+ public ResponseEntity deleteDocumentByUUID(@PathVariable("uuid") String uuid) throws IOException {
+ return indexerMetadata.deleteDocumentByUUID(uuid);
+ }
+
+ @PostMapping(path="/{uuid}/dataset", produces = "application/json")
+ @Operation(security = {@SecurityRequirement(name = "X-API-Key") }, description = "Index a dataset by UUID")
+ public SseEmitter indexDatasetByUUID(@PathVariable("uuid") String uuid) {
+
+ final SseEmitter emitter = new SseEmitter(0L); // 0L means no timeout;
+ final IndexService.Callback callback = createCallback(emitter);
- IndexerService.Callback callback = new IndexerService.Callback() {
+ new Thread(() -> {
+ try {
+ List<TemporalExtent> temporalExtents = dataAccessService.getTemporalExtentOf(uuid);
+ if (!temporalExtents.isEmpty()) {
+ // Only first block works from dataservice api
+ LocalDate startDate = temporalExtents.get(0).getLocalStartDate();
+ LocalDate endDate = temporalExtents.get(0).getLocalEndDate();
+ log.info("Indexing dataset with UUID: {} from {} to {}", uuid, startDate, endDate);
+
+ indexCloudOptimizedData.indexCloudOptimizedData(uuid, startDate, endDate, callback);
+ }
+ }
+ finally {
+ emitter.complete();
+ }
+ }).start();
+
+ return emitter;
+ }
+
+ protected IndexerMetadataService.Callback createCallback(SseEmitter emitter) {
+ return new IndexService.Callback() {
@Override
public void onProgress(Object update) {
try {
@@ -102,8 +159,7 @@ public void onProgress(Object update) {
.name("Indexer update event");
emitter.send(event);
- }
- catch (IOException e) {
+ } catch (IOException e) {
// In case of fail, try close the stream, if it cannot be closed. (likely stream terminated
// already, the load error out and we need to result from a particular uuid.
emitter.completeWithError(e);
@@ -121,57 +177,10 @@ public void onComplete(Object result) {
emitter.send(event);
emitter.complete();
- }
- catch (IOException e) {
+ } catch (IOException e) {
emitter.completeWithError(e);
}
}
};
-
- new Thread(() -> {
- try {
- indexerService.indexAllMetadataRecordsFromGeoNetwork(beginWithUuid, confirm, callback);
- }
- catch(IOException e) {
- emitter.completeWithError(e);
- }
- }).start();
-
- return emitter;
- }
-
- @PostMapping(path="/{uuid}", produces = "application/json")
- @Operation(security = { @SecurityRequirement(name = "X-API-Key") }, description = "Index a metadata record by UUID")
- public ResponseEntity addDocumentByUUID(@PathVariable("uuid") String uuid) throws IOException, FactoryException, JAXBException, TransformException, ExecutionException, InterruptedException {
- String metadataValues = geonetworkResourceService.searchRecordBy(uuid);
-
- CompletableFuture> f = indexerService.indexMetadata(metadataValues);
- // Return when done make it back to sync instead of async
- return f.join();
- }
-
- @DeleteMapping(path="/{uuid}", produces = "application/json")
- @Operation(security = { @SecurityRequirement(name = "X-API-Key") }, description = "Delete a metadata record by UUID")
- public ResponseEntity deleteDocumentByUUID(@PathVariable("uuid") String uuid) throws IOException {
- return indexerService.deleteDocumentByUUID(uuid);
- }
-
- @PostMapping(path="/{uuid}/dataset", produces = "application/json")
- @Operation(security = {@SecurityRequirement(name = "X-API-Key") }, description = "Index a dataset by UUID")
- public ResponseEntity> indexDatasetByUUID(@PathVariable("uuid") String uuid) {
-
- var temporalExtents = dataAccessService.getTemporalExtentOf(uuid);
- var startDate = temporalExtents.getStartDate();
- var endDate = temporalExtents.getEndDate();
- log.info("Indexing dataset with UUID: {} from {} to {}", uuid, startDate, endDate);
-
- var responses = indexerService.indexDataset(uuid, startDate, endDate);
-
- List result = new ArrayList<>();
- for (BulkResponse response : responses) {
- result.add(response.toString());
- }
-
- return ResponseEntity.ok(result);
}
}
diff --git a/indexer/src/main/java/au/org/aodn/esindexer/model/CloudOptimizedEntry.java b/indexer/src/main/java/au/org/aodn/esindexer/model/CloudOptimizedEntry.java
new file mode 100644
index 00000000..fa961d87
--- /dev/null
+++ b/indexer/src/main/java/au/org/aodn/esindexer/model/CloudOptimizedEntry.java
@@ -0,0 +1,127 @@
+package au.org.aodn.esindexer.model;
+
+import com.fasterxml.jackson.annotation.*;
+import lombok.Data;
+
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.time.temporal.Temporal;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * A container that represents a data entry from cloud optimized storage; if you add fields, please
+ * update the hashCode() and equals()
+ */
+@Data
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CloudOptimizedEntry {
+
+ static final BigDecimal MIN = new BigDecimal(Double.MIN_VALUE);
+
+ @JsonIgnore
+ protected DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+ @JsonIgnore
+ protected DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+
+ @JsonIgnore
+ protected Temporal time;
+
+ @JsonIgnore
+ protected BigDecimal longitude;
+
+ @JsonIgnore
+ protected BigDecimal latitude;
+
+ @JsonIgnore
+ protected BigDecimal depth;
+
+ @JsonIgnore
+ public ZonedDateTime getZonedDateTime() {
+ return ((LocalDateTime)this.time).atZone(ZoneOffset.UTC);
+ }
+
+ @JsonProperty("depth")
+ public void setDepth(Double v) {
+ this.depth = new BigDecimal(v);
+ }
+
+ @JsonProperty("longitude")
+ public void setLongitude(Double v) {
+ this.longitude = new BigDecimal(v);
+ }
+
+ @JsonProperty("latitude")
+ public void setLatitude(Double v) {
+ this.latitude = new BigDecimal(v);
+ }
+
+ @JsonProperty("time")
+ public void setTime(String time) {
+ try {
+ this.time = LocalDateTime.parse(time, DATETIME_FORMATTER);
+ }
+ catch(DateTimeParseException pe) {
+ this.time = LocalDateTime.parse(time, DATE_FORMATTER);
+ }
+ }
+ /**
+ * Must use getter functions, as a child class may override them
+ * @return - The hashcode
+ */
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ this.getTime(),
+ this.getLongitude(),
+ this.getLatitude(),
+ this.getDepth()
+ );
+ }
+ /**
+ * Must use getter functions, as a child class may override them
+ * @param obj - Input
+ * @return - Compare result
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ else if (obj instanceof CloudOptimizedEntry that) {
+ int compareLong = Optional
+ .ofNullable(that.getLongitude())
+ .orElse(MIN)
+ .compareTo(Optional.ofNullable(getLongitude())
+ .orElse(MIN));
+
+ int compareLat = Optional
+ .ofNullable(that.getLatitude())
+ .orElse(MIN)
+ .compareTo(Optional.ofNullable(getLatitude())
+ .orElse(MIN));
+
+ int compareDepth = Optional
+ .ofNullable(that.getDepth())
+ .orElse(MIN)
+ .compareTo(Optional.ofNullable(getDepth())
+ .orElse(MIN));
+
+ boolean compareTime = Objects.equals(that.getTime(), getTime());
+
+ return compareLong == 0 &&
+ compareLat == 0 &&
+ compareDepth == 0 &&
+ compareTime;
+ }
+ else {
+ return false;
+ }
+ }
+}
diff --git a/indexer/src/main/java/au/org/aodn/esindexer/model/CloudOptimizedEntryReducePrecision.java b/indexer/src/main/java/au/org/aodn/esindexer/model/CloudOptimizedEntryReducePrecision.java
new file mode 100644
index 00000000..b6576068
--- /dev/null
+++ b/indexer/src/main/java/au/org/aodn/esindexer/model/CloudOptimizedEntryReducePrecision.java
@@ -0,0 +1,58 @@
+package au.org.aodn.esindexer.model;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.time.*;
+import java.time.format.DateTimeParseException;
+
+/**
+ * This class modifies the time entry to consider the year and month only, reducing precision; it is used
+ * later to aggregate data based on year and month only
+ */
+public class CloudOptimizedEntryReducePrecision extends CloudOptimizedEntry {
+
+ @Override
+ public void setTime(String time) {
+ try {
+ LocalDateTime l = LocalDateTime.parse(time, DATETIME_FORMATTER);
+ this.time = YearMonth.of(l.getYear(), l.getMonth());
+ }
+ catch(DateTimeParseException pe) {
+ LocalDate l = LocalDate.parse(time, DATE_FORMATTER);
+ this.time = YearMonth.of(l.getYear(), l.getMonth());
+ }
+ }
+
+ @Override
+ public void setLongitude(Double v) {
+ this.longitude = new BigDecimal(v).setScale(2, RoundingMode.HALF_UP);
+ }
+
+ @Override
+ public void setLatitude(Double v) {
+ this.latitude = new BigDecimal(v).setScale(2, RoundingMode.HALF_UP);
+ }
+ /**
+ * Round to the nearest multiple of ten, with two decimal places so later parsing will generate .00
+ * @param v - Input
+ */
+ @Override
+ public void setDepth(Double v) {
+ this.depth = new BigDecimal(v)
+ .divide(BigDecimal.TEN, RoundingMode.HALF_UP)
+ .setScale(0, RoundingMode.HALF_UP)
+ .multiply(BigDecimal.TEN)
+ .setScale(2, RoundingMode.HALF_UP);
+ }
+ /**
+ * Returns a zoned datetime; because we use YearMonth internally, the day is set to 1 and the time to zero
+ * @return - A ZonedDateTime at day 1 time 0
+ */
+ @Override
+ public ZonedDateTime getZonedDateTime() {
+ return ((YearMonth)this.time)
+ .atDay(1)
+ .atTime(0,0,0)
+ .atZone(ZoneOffset.UTC);
+ }
+}
diff --git a/indexer/src/main/java/au/org/aodn/esindexer/model/DatasetEsEntry.java b/indexer/src/main/java/au/org/aodn/esindexer/model/DatasetEsEntry.java
index 5b91fc98..f8514fe6 100644
--- a/indexer/src/main/java/au/org/aodn/esindexer/model/DatasetEsEntry.java
+++ b/indexer/src/main/java/au/org/aodn/esindexer/model/DatasetEsEntry.java
@@ -5,6 +5,6 @@
public record DatasetEsEntry(
String uuid,
String yearMonth,
- List data) {
+ List data) {
}
diff --git a/indexer/src/main/java/au/org/aodn/esindexer/model/DatasetProvider.java b/indexer/src/main/java/au/org/aodn/esindexer/model/DatasetProvider.java
index fd3aae2b..85903605 100644
--- a/indexer/src/main/java/au/org/aodn/esindexer/model/DatasetProvider.java
+++ b/indexer/src/main/java/au/org/aodn/esindexer/model/DatasetProvider.java
@@ -1,21 +1,24 @@
package au.org.aodn.esindexer.model;
import au.org.aodn.esindexer.service.DataAccessService;
+import au.org.aodn.stac.model.StacItemModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.util.StopWatch;
import java.time.LocalDate;
import java.time.YearMonth;
-import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
public class DatasetProvider {
- private final String uuid;
- private YearMonth currentYearMonth;
- private final YearMonth endYearMonth;
- private final DataAccessService dataAccessService;
+ protected Logger log = LoggerFactory.getLogger(DatasetProvider.class);
+ protected final String uuid;
+ protected YearMonth currentYearMonth;
+ protected final YearMonth endYearMonth;
+ protected final DataAccessService dataAccessService;
public DatasetProvider(String uuid, LocalDate startDate, LocalDate endDate, DataAccessService dataAccessService) {
this.uuid = uuid;
@@ -24,7 +27,7 @@ public DatasetProvider(String uuid, LocalDate startDate, LocalDate endDate, Data
this.endYearMonth = YearMonth.from(endDate);
}
- public Iterable getIterator() {
+ public Iterable<List<StacItemModel>> getIterator() {
return () -> new Iterator<>() {
@Override
public boolean hasNext() {
@@ -32,27 +35,27 @@ public boolean hasNext() {
}
@Override
- public DatasetEsEntry next() {
+ public List<StacItemModel> next() {
// please keep it for a while since it benefits the performance optimisation
StopWatch timer = new StopWatch();
- timer.start("Data querying");
+ timer.start(String.format("Data querying for %s %s", currentYearMonth.getYear(), currentYearMonth.getMonth()));
if (!hasNext()) {
throw new NoSuchElementException();
}
- List data = Arrays.stream(dataAccessService.getIndexingDatasetBy(
+ List<StacItemModel> data = dataAccessService.getIndexingDatasetBy(
uuid,
LocalDate.of(currentYearMonth.getYear(), currentYearMonth.getMonthValue(), 1),
LocalDate.of(currentYearMonth.getYear(), currentYearMonth.getMonthValue(), currentYearMonth.lengthOfMonth())
- )).toList();
+ );
currentYearMonth = currentYearMonth.plusMonths(1);
if (data.isEmpty()) {
return null;
}
timer.stop();
- System.out.println(timer.prettyPrint());
- return new DatasetEsEntry(uuid, currentYearMonth.toString(), data);
+ log.info(timer.prettyPrint());
+ return data;
}
};
}
diff --git a/indexer/src/main/java/au/org/aodn/esindexer/model/Datum.java b/indexer/src/main/java/au/org/aodn/esindexer/model/Datum.java
deleted file mode 100644
index 0a403d5a..00000000
--- a/indexer/src/main/java/au/org/aodn/esindexer/model/Datum.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package au.org.aodn.esindexer.model;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import lombok.Data;
-
-// If more fields are needed to be filtered, please add more columns here
-// and don't forget updating the override equals() method
-@Data
-@JsonInclude(JsonInclude.Include.NON_NULL)
-public class Datum {
-
- private String time;
- private Double longitude;
- private Double latitude;
- private Double depth;
-
- private long count;
-
- @JsonCreator
- public Datum(
- @JsonProperty("time") String time,
- @JsonProperty("longitude") Double longitude,
- @JsonProperty("latitude") Double latitude,
- @JsonProperty("depth") Double depth) {
- this.time = time;
- this.longitude = longitude;
- this.latitude = latitude;
- this.depth = depth;
- this.count = 1L;
- }
-
- // putting all same record into one instance and increment the count is more efficient
- public void incrementCount() {
- count++;
- }
-
- // Don't include variable "count" in the equals() method.
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null || getClass() != obj.getClass()) {
- return false;
- }
- Datum that = (Datum) obj;
-
- return Double.compare(that.longitude, longitude) == 0 &&
- Double.compare(that.latitude, latitude) == 0 &&
- Double.compare(
- that.depth != null? that.depth : 0.0,
- depth != null? depth : 0.0
- ) == 0 &&
- time.equals(that.time);
- }
-}
diff --git a/indexer/src/main/java/au/org/aodn/esindexer/model/MediaType.java b/indexer/src/main/java/au/org/aodn/esindexer/model/MediaType.java
deleted file mode 100644
index b1253621..00000000
--- a/indexer/src/main/java/au/org/aodn/esindexer/model/MediaType.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package au.org.aodn.esindexer.model;
-
-import lombok.Getter;
-
-@Getter
-public enum MediaType {
- TEXT_HTML("text/html"),
- IMAGE_PNG("image/png"),
- APPLICATION_JSON("application/json"),
- ;
-
- private final String value;
-
- MediaType(String value) {
- this.value = value;
- }
-}
diff --git a/indexer/src/main/java/au/org/aodn/esindexer/model/RelationType.java b/indexer/src/main/java/au/org/aodn/esindexer/model/RelationType.java
index 74869845..546072b3 100644
--- a/indexer/src/main/java/au/org/aodn/esindexer/model/RelationType.java
+++ b/indexer/src/main/java/au/org/aodn/esindexer/model/RelationType.java
@@ -1,7 +1,7 @@
package au.org.aodn.esindexer.model;
import lombok.Getter;
-
+// TODO: Should use some lib provide value or move it to stacmodel folder
@Getter
public enum RelationType {
SELF("self"),
diff --git a/indexer/src/main/java/au/org/aodn/esindexer/model/TemporalExtent.java b/indexer/src/main/java/au/org/aodn/esindexer/model/TemporalExtent.java
index eac9d51d..31af6f39 100644
--- a/indexer/src/main/java/au/org/aodn/esindexer/model/TemporalExtent.java
+++ b/indexer/src/main/java/au/org/aodn/esindexer/model/TemporalExtent.java
@@ -1,23 +1,35 @@
package au.org.aodn.esindexer.model;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
import java.time.LocalDate;
+import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
+@Builder
+@Setter
+@Getter
public class TemporalExtent{
+
+ static DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ");
+
@JsonProperty("start_date")
- String startDate;
+ protected String startDate;
+
@JsonProperty("end_date")
- String endDate;
+ protected String endDate;
- public LocalDate getStartDate() {
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
- return LocalDate.parse(startDate, formatter);
+ @JsonIgnore
+ public LocalDate getLocalStartDate() {
+ return ZonedDateTime.parse(startDate, DATE_FORMAT).toLocalDate();
}
- public LocalDate getEndDate() {
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
- return LocalDate.parse(endDate, formatter);
+ @JsonIgnore
+ public LocalDate getLocalEndDate() {
+ return ZonedDateTime.parse(endDate, DATE_FORMAT).toLocalDate();
}
}
diff --git a/indexer/src/main/java/au/org/aodn/esindexer/service/DataAccessService.java b/indexer/src/main/java/au/org/aodn/esindexer/service/DataAccessService.java
index cb782643..4030d211 100644
--- a/indexer/src/main/java/au/org/aodn/esindexer/service/DataAccessService.java
+++ b/indexer/src/main/java/au/org/aodn/esindexer/service/DataAccessService.java
@@ -1,15 +1,12 @@
package au.org.aodn.esindexer.service;
-import au.org.aodn.esindexer.model.Datum;
import au.org.aodn.esindexer.model.TemporalExtent;
+import au.org.aodn.stac.model.StacItemModel;
import java.time.LocalDate;
+import java.util.List;
public interface DataAccessService {
- Datum[] getIndexingDatasetBy(String uuid, LocalDate startDate, LocalDate endDate);
- String getServiceUrl();
- void setServiceUrl(String url);
-
-
- TemporalExtent getTemporalExtentOf(String uuid);
+ List<StacItemModel> getIndexingDatasetBy(String uuid, LocalDate startDate, LocalDate endDate);
+ List<TemporalExtent> getTemporalExtentOf(String uuid);
}
diff --git a/indexer/src/main/java/au/org/aodn/esindexer/service/DataAccessServiceImpl.java b/indexer/src/main/java/au/org/aodn/esindexer/service/DataAccessServiceImpl.java
index 73f81e14..34bb68f9 100644
--- a/indexer/src/main/java/au/org/aodn/esindexer/service/DataAccessServiceImpl.java
+++ b/indexer/src/main/java/au/org/aodn/esindexer/service/DataAccessServiceImpl.java
@@ -1,43 +1,38 @@
package au.org.aodn.esindexer.service;
import au.org.aodn.esindexer.exception.MetadataNotFoundException;
-import au.org.aodn.esindexer.model.Datum;
+import au.org.aodn.esindexer.model.CloudOptimizedEntry;
+import au.org.aodn.esindexer.model.CloudOptimizedEntryReducePrecision;
import au.org.aodn.esindexer.model.TemporalExtent;
-import org.springframework.beans.factory.annotation.Autowired;
+import au.org.aodn.esindexer.utils.GeometryUtils;
+import au.org.aodn.stac.model.StacItemModel;
+import lombok.Getter;
+import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.*;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;
import java.time.LocalDate;
-import java.util.ArrayList;
+import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+@Getter
public class DataAccessServiceImpl implements DataAccessService {
- private String serverUrl;
+ protected String accessEndPoint;
+ protected RestTemplate restTemplate;
- @Override
- public String getServiceUrl() {
- return serverUrl;
- }
-
- @Override
- public void setServiceUrl(String url) {
- this.serverUrl = url;
- }
-
- public DataAccessServiceImpl(String serverUrl) {
- setServiceUrl(serverUrl);
+ public DataAccessServiceImpl(String serverUrl, String baseUrl, RestTemplate restTemplate) {
+ this.accessEndPoint = serverUrl + baseUrl;
+ this.restTemplate = restTemplate;
}
- @Autowired
- private RestTemplate restTemplate;
-
@Override
- public Datum[] getIndexingDatasetBy(String uuid, LocalDate startDate, LocalDate endDate) {
+ public List<StacItemModel> getIndexingDatasetBy(String uuid, LocalDate startDate, LocalDate endDate) {
// currently, we force to get data in the same year to simplify the logic
if (startDate.getYear() != endDate.getYear()) {
@@ -54,36 +49,35 @@ public Datum[] getIndexingDatasetBy(String uuid, LocalDate startDate, LocalDate
.queryParam("is_to_index", "true")
.queryParam("start_date", startDate)
.queryParam("end_date", endDate)
+ .queryParam("columns", List.of("TIME","DEPTH","LONGITUDE","LATITUDE"))
.buildAndExpand(uuid)
.toUriString();
- ResponseEntity responseEntity = restTemplate.exchange(
+ ResponseEntity<List<CloudOptimizedEntryReducePrecision>> responseEntity = restTemplate.exchange(
url,
HttpMethod.GET,
request,
- Datum[].class,
+ new ParameterizedTypeReference<>() {},
params
);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
- List data = new ArrayList<>();
if (responseEntity.getBody() != null) {
- data = List.of(responseEntity.getBody());
+ return toStacItemModel(uuid, aggregateData(responseEntity.getBody()));
}
- var dataToIndex = aggregateData(data);
- return dataToIndex.toArray(new Datum[0]);
}
throw new RuntimeException("Unable to retrieve dataset with UUID: " + uuid );
-
- } catch (HttpClientErrorException.NotFound e) {
+ }
+ catch (HttpClientErrorException.NotFound e) {
throw new MetadataNotFoundException("Unable to find dataset with UUID: " + uuid + " in GeoNetwork");
- } catch (Exception e) {
+ }
+ catch (Exception e) {
throw new RuntimeException("Exception thrown while retrieving dataset with UUID: " + uuid + e.getMessage(), e);
}
}
@Override
- public TemporalExtent getTemporalExtentOf(String uuid) {
+ public List<TemporalExtent> getTemporalExtentOf(String uuid) {
try {
HttpEntity request = getRequestEntity(null, null);
@@ -94,11 +88,11 @@ public TemporalExtent getTemporalExtentOf(String uuid) {
.buildAndExpand(uuid)
.toUriString();
- ResponseEntity responseEntity = restTemplate.exchange(
+ ResponseEntity<List<TemporalExtent>> responseEntity = restTemplate.exchange(
url,
HttpMethod.GET,
request,
- TemporalExtent.class,
+ new ParameterizedTypeReference<>() {},
params
);
@@ -110,29 +104,51 @@ public TemporalExtent getTemporalExtentOf(String uuid) {
throw new RuntimeException("Exception thrown while retrieving dataset with UUID: " + uuid + e.getMessage(), e);
}
}
-
/**
* Summarize the data by counting the number if all the concerned fields are the same
* @param data the data to summarize
* @return the summarized data
*/
- private List aggregateData(List data) {
- var aggregatedData = new ArrayList();
- for (var datum: data) {
- if (aggregatedData.contains(datum)) {
- var existingDatum = aggregatedData.get(aggregatedData.indexOf(datum));
- existingDatum.incrementCount();
- } else {
- aggregatedData.add(datum);
- }
- }
- return aggregatedData;
+ protected Map extends CloudOptimizedEntry, Long> aggregateData(List extends CloudOptimizedEntry> data) {
+ return data.stream()
+ .collect(Collectors.groupingBy(
+ d -> d,
+ Collectors.counting()
+ ));
}
-
- private String getDataAccessEndpoint() {
- return getServiceUrl() + "/api/v1/das/";
+ /**
+ * Convert the aggregated entries (grouped and counted via the entry's equals/hashCode) into formatted STAC items
+ * @param uuid - The parent uuid associated with the input
+ * @param data - The aggregated data
+ * @return - List of formatted stac item
+ */
+ protected List toStacItemModel(String uuid, Map extends CloudOptimizedEntry, Long> data) {
+ return data.entrySet().stream()
+ .filter(d -> d.getKey().getLongitude() != null && d.getKey().getLatitude() != null)
+ .map(d ->
+ StacItemModel.builder()
+ .collection(uuid) // collection point to the uuid of parent
+ .uuid(String
+ .join("|",
+ uuid,
+ d.getKey().getTime().toString(),
+ d.getKey().getLongitude().toString(),
+ d.getKey().getLatitude().toString(),
+ d.getKey().getDepth().toString()
+ )
+ )
+ .geometry(GeometryUtils.createGeoJson(d.getKey().getLongitude(), d.getKey().getLatitude(), d.getKey().getDepth()))
+ .properties(Map.of(
+ "count", d.getValue(),
+ "time", d.getKey().getZonedDateTime().format(DateTimeFormatter.ISO_ZONED_DATE_TIME)))
+ .build()
+ )
+ .toList();
}
+ protected String getDataAccessEndpoint() {
+ return this.accessEndPoint;
+ }
// parameters are not in use for now. May be useful in the future so just keep it
protected HttpEntity getRequestEntity(MediaType accept, String body) {
@@ -143,6 +159,6 @@ protected HttpEntity getRequestEntity(MediaType accept, String body) {
MediaType.valueOf("application/*+json"),
MediaType.ALL
));
- return body == null ? new org.springframework.http.HttpEntity<>(headers) : new org.springframework.http.HttpEntity<>(body, headers);
+ return body == null ? new HttpEntity<>(headers) : new HttpEntity<>(body, headers);
}
}
diff --git a/indexer/src/main/java/au/org/aodn/esindexer/service/IndexCloudOptimizedService.java b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexCloudOptimizedService.java
new file mode 100644
index 00000000..0f5e7879
--- /dev/null
+++ b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexCloudOptimizedService.java
@@ -0,0 +1,11 @@
+package au.org.aodn.esindexer.service;
+
+import co.elastic.clients.elasticsearch.core.BulkResponse;
+
+import java.time.LocalDate;
+import java.util.List;
+
+public interface IndexCloudOptimizedService extends IndexService {
+ List indexCloudOptimizedData(String uuid, LocalDate startDate, LocalDate endDate, IndexService.Callback callback);
+
+}
diff --git a/indexer/src/main/java/au/org/aodn/esindexer/service/IndexCloudOptimizedServiceImpl.java b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexCloudOptimizedServiceImpl.java
new file mode 100644
index 00000000..ca083613
--- /dev/null
+++ b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexCloudOptimizedServiceImpl.java
@@ -0,0 +1,88 @@
+package au.org.aodn.esindexer.service;
+
+import au.org.aodn.esindexer.model.DatasetProvider;
+import au.org.aodn.stac.model.StacItemModel;
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch.core.BulkResponse;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.context.annotation.Scope;
+import org.springframework.context.annotation.ScopedProxyMode;
+import org.springframework.stereotype.Service;
+
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+@Slf4j
+@Service
+@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS)
+public class IndexCloudOptimizedServiceImpl extends IndexServiceImpl implements IndexCloudOptimizedService {
+
+ protected DataAccessService dataAccessService;
+ protected ObjectMapper indexerObjectMapper;
+ protected String indexName;
+
+ @Lazy
+ @Autowired
+ protected IndexCloudOptimizedServiceImpl self;
+
+ @Autowired
+ public IndexCloudOptimizedServiceImpl(
+ @Value("${elasticsearch.cloud_optimized_index.name}") String indexName,
+ @Qualifier("portalElasticsearchClient") ElasticsearchClient elasticsearchClient,
+ ObjectMapper indexerObjectMapper,
+ DataAccessService dataAccessService) {
+
+ super(elasticsearchClient, indexerObjectMapper);
+
+ this.indexName = indexName;
+ this.indexerObjectMapper = indexerObjectMapper;
+ this.dataAccessService = dataAccessService;
+ }
+ /**
+ * Index the cloud optimized data
+ * @param uuid - The UUID of data you want to index
+ * @param startDate - The start range to index
+ * @param endDate - The end range to index
+ * @return - The index result
+ */
+ @Override
+ public List indexCloudOptimizedData(String uuid, LocalDate startDate, LocalDate endDate, IndexService.Callback callback) {
+
+ List responses = new ArrayList<>();
+
+ Iterable> dataset = new DatasetProvider(uuid, startDate, endDate, dataAccessService).getIterator();
+ BulkRequestProcessor bulkRequestProcessor = new BulkRequestProcessor<>(
+ indexName, (item) -> Optional.empty(),self, callback
+ );
+
+ try {
+ for (List entries : dataset) {
+ if (entries != null) {
+ for(StacItemModel entry: entries) {
+ log.debug("add dataset into b with UUID: {} and props: {}", entry.getUuid(), entry.getProperties());
+ bulkRequestProcessor.processItem(entry.getUuid(), entry)
+ .ifPresent(responses::add);
+ }
+ }
+ }
+ bulkRequestProcessor
+ .flush()
+ .ifPresent(responses::add);
+
+ log.info("Finished execute bulk indexing records to index: {}", indexName);
+ callback.onComplete(responses);
+ }
+ catch (Exception e) {
+ log.error("Failed", e);
+ throw new RuntimeException("Exception thrown while indexing dataset with UUID: " + uuid + " | " + e.getMessage(), e);
+ }
+ return responses;
+ }
+}
diff --git a/indexer/src/main/java/au/org/aodn/esindexer/service/IndexService.java b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexService.java
new file mode 100644
index 00000000..b0f7ab84
--- /dev/null
+++ b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexService.java
@@ -0,0 +1,19 @@
+package au.org.aodn.esindexer.service;
+
+import co.elastic.clients.elasticsearch.core.BulkRequest;
+import co.elastic.clients.elasticsearch.core.BulkResponse;
+import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.function.Function;
+
+public interface IndexService {
+ // Event callback to notify the caller; this avoids gateway timeout as we send messages back to the browser
+ interface Callback {
+ void onProgress(Object update);
+ void onComplete(Object result);
+ }
+ long getBatchSize();
+ BulkResponse executeBulk(BulkRequest.Builder bulkRequest, Function> mapper, IndexerMetadataService.Callback callback) throws IOException;
+}
diff --git a/indexer/src/main/java/au/org/aodn/esindexer/service/IndexServiceImpl.java b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexServiceImpl.java
new file mode 100644
index 00000000..f9c36103
--- /dev/null
+++ b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexServiceImpl.java
@@ -0,0 +1,211 @@
+package au.org.aodn.esindexer.service;
+
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch.core.BulkRequest;
+import co.elastic.clients.elasticsearch.core.BulkResponse;
+import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.aop.support.AopUtils;
+import org.springframework.http.HttpStatus;
+import org.springframework.retry.annotation.Backoff;
+import org.springframework.retry.annotation.Retryable;
+import org.springframework.web.client.HttpServerErrorException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+
+@Slf4j
+public abstract class IndexServiceImpl implements IndexService {
+
+ protected static final long DEFAULT_BACKOFF_TIME = 3000L;
+ protected ElasticsearchClient elasticClient;
+ protected ObjectMapper indexerObjectMapper;
+
+ /**
+ * This processor helps to shield the complexity of Elasticsearch bulk saves, where the batch size cannot be
+ * too large. You keep calling processItem to add new items. Once the batch reaches the max size it will flush
+ * to Elastic, then reset and allow adding new items.
+ * You must call flush at the end so that any remaining items get pushed.
+ * @param <T> the type of document being indexed
+ */
+ protected class BulkRequestProcessor {
+ private final IndexService proxyImpl;
+ private final Callback callback;
+ private final Function> mapper;
+ private final String indexName;
+
+ private BulkRequest.Builder bulkRequest = new BulkRequest.Builder();
+ private long dataSize = 0;
+ private long total = 0;
+ /**
+ * A class used to write requests in batches, to improve writing speed and not flood Elastic with too many calls.
+ * @param indexName - The elastic index name that stores the output json.
+ * @param mapper - An optional mapper to find the original object from source given some input
+ * @param proxyImpl - Must be an IndexService wrapped by the Spring proxy so that @Retryable on executeBulk takes effect
+ * @param callback - A callback to the front end to avoid timeout on aws gateway on long-running processes
+ */
+ BulkRequestProcessor(String indexName, Function> mapper, IndexService proxyImpl, Callback callback) {
+ this.indexName = indexName;
+ this.mapper = mapper;
+ this.callback = callback;
+
+ if(AopUtils.isAopProxy(proxyImpl) || AopUtils.isJdkDynamicProxy(proxyImpl)) {
+ this.proxyImpl = proxyImpl;
+ }
+ else {
+ throw new RuntimeException("Spring AOP component expected, please pass the Autowrired instance");
+ }
+ }
+
+ Optional processItem(String id, T item) throws IOException {
+ if(item != null) {
+ int size = indexerObjectMapper.writeValueAsBytes(item).length;
+
+ // We need to split the batch into smaller size to avoid data too large error in ElasticSearch,
+ // the limit is 10mb, so to make check before document add and push batch if size is too big
+ //
+ // dataSize = 0 is the init case, just in case we have a very big doc that exceeds the limit
+ // and we have not add it to the bulkRequest, hardcode to 5M which should be safe,
+ // usually it is 5M - 15M
+ //
+ if (dataSize + size > IndexServiceImpl.this.getBatchSize() && dataSize != 0) {
+ if (callback != null) {
+ callback.onProgress(String.format("Execute batch as bulk request is big enough %s", dataSize + size));
+ }
+
+ Optional result = Optional.of(reduceResponse(proxyImpl.executeBulk(bulkRequest, mapper, callback)));
+
+ dataSize = 0;
+ bulkRequest = new BulkRequest.Builder();
+
+ return result;
+ }
+ // Add item to bulk request to Elasticsearch
+ bulkRequest.operations(op -> op
+ .index(idx -> idx
+ .id(id)
+ .index(indexName)
+ .document(item)
+ )
+ );
+ dataSize += size;
+ total++;
+
+ if (callback != null) {
+ callback.onProgress(
+ String.format(
+ "Add uuid %s to batch, batch size is %s, total is %s",
+ id,
+ dataSize,
+ total)
+ );
+ }
+ }
+ return Optional.empty();
+ }
+
+ Optional flush() throws IOException {
+ if(total != 0) {
+ return Optional.of(reduceResponse(proxyImpl.executeBulk(bulkRequest, mapper, callback)));
+ }
+ else {
+ return Optional.empty();
+ }
+ }
+ }
+
+ public IndexServiceImpl(ElasticsearchClient elasticClient, ObjectMapper indexerObjectMapper) {
+ this.elasticClient = elasticClient;
+ this.indexerObjectMapper = indexerObjectMapper;
+ }
+
+ public static BulkResponse reduceResponse(BulkResponse in) {
+ List errors = in.items()
+ .stream()
+ .filter(p -> !(p.status() == HttpStatus.CREATED.value() || p.status() == HttpStatus.OK.value()))
+ .toList();
+
+ return errors.isEmpty() ?
+ BulkResponse.of(f -> f.items(new ArrayList<>()).errors(false).took(in.took())) :
+ BulkResponse.of(f -> f.items(errors).errors(true).took(in.took()));
+ }
+
+ @Override
+ public long getBatchSize() {
+ return 5242880;
+ }
+ /**
+ * Keep retry until success, it is ok to insert docs to elastic again because we use _id as identifier.
+ * In case any service is not available, we will keep retrying many times; with 1000 retries at a 3s
+ * backoff we wait roughly 50 mins, which is big enough for aws process restart.
+ *
+ * @param bulkRequest - The bulk request
+ * @param callback - The event call back to avoid timeout
+ * @return - The bulk insert result
+ * @throws IOException - Exceptions on error
+ * @throws HttpServerErrorException.ServiceUnavailable - Exceptions on geonetwork die or elastic not available
+ */
+ @Retryable(
+ retryFor = {Exception.class, HttpServerErrorException.ServiceUnavailable.class},
+ maxAttempts = 1000,
+ backoff = @Backoff(delay = DEFAULT_BACKOFF_TIME)
+ )
+ @Override
+ public BulkResponse executeBulk(BulkRequest.Builder bulkRequest, Function> mapper, IndexerMetadataService.Callback callback) throws IOException, HttpServerErrorException.ServiceUnavailable {
+ try {
+ // Keep retry until success
+ BulkResponse result = elasticClient.bulk(bulkRequest.build());
+
+ // Flush after insert, otherwise you need to wait for next auto-refresh. It is
+ // especially a problem with autotest, where assert happens very fast.
+ elasticClient.indices().refresh();
+
+ // Report status if success
+ if(callback != null) {
+ callback.onProgress(reduceResponse(result));
+ }
+
+ // Log errors, if any
+ if (!result.items().isEmpty()) {
+ if (result.errors()) {
+ log.error("Bulk load have errors? {}", true);
+ } else {
+ log.info("Bulk load have errors? {}", false);
+ }
+ for (BulkResponseItem item : result.items()) {
+ if (item.error() != null) {
+ Optional i = mapper.apply(item);
+ i.ifPresent(a -> {
+ try {
+ log.error("UUID {} {} {} {}",
+ item.id(),
+ item.error().reason(),
+ item.error().causedBy(),
+ indexerObjectMapper
+ .writerWithDefaultPrettyPrinter()
+ .writeValueAsString(a)
+ );
+ } catch (JsonProcessingException e) {
+ log.error("Fail to convert item with json mapper, {}", item.id());
+ }
+ });
+ }
+ }
+ }
+ return result;
+ }
+ catch(Exception e) {
+ // Report status if not success, this help to keep connection
+ if(callback != null) {
+ callback.onProgress("Exception on bulk save, will retry : " + e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+}
diff --git a/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerService.java b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerMetadataService.java
similarity index 67%
rename from indexer/src/main/java/au/org/aodn/esindexer/service/IndexerService.java
rename to indexer/src/main/java/au/org/aodn/esindexer/service/IndexerMetadataService.java
index 3b8616fe..d7ca1536 100644
--- a/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerService.java
+++ b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerMetadataService.java
@@ -1,6 +1,5 @@
package au.org.aodn.esindexer.service;
-import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -10,22 +9,16 @@
import org.springframework.http.ResponseEntity;
import java.io.IOException;
-import java.time.LocalDate;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-public interface IndexerService {
- // Event call back to notify caller, this avoid gateway timeout as we have message back to browser
- interface Callback {
- void onProgress(Object update);
- void onComplete(Object result);
- }
+public interface IndexerMetadataService extends IndexService {
+
CompletableFuture> indexMetadata(String metadataValues) throws IOException, FactoryException, TransformException, JAXBException;
- List indexDataset(String uuid, LocalDate startDate, LocalDate endDate);
ResponseEntity deleteDocumentByUUID(String uuid) throws IOException;
List indexAllMetadataRecordsFromGeoNetwork(String beginWithUuid, boolean confirm, Callback callback) throws IOException;
Hit getDocumentByUUID(String uuid) throws IOException;
+ Hit getDocumentByUUID(String uuid, String indexName) throws IOException;
boolean isMetadataPublished(String uuid);
boolean isGeoNetworkInstanceReinstalled(long portalIndexDocumentsCount);
- BulkResponse executeBulk(BulkRequest.Builder bulkRequest, Callback callback) throws IOException;
}
diff --git a/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerServiceImpl.java b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerMetadataServiceImpl.java
similarity index 67%
rename from indexer/src/main/java/au/org/aodn/esindexer/service/IndexerServiceImpl.java
rename to indexer/src/main/java/au/org/aodn/esindexer/service/IndexerMetadataServiceImpl.java
index 3421d9e8..851148f2 100644
--- a/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerServiceImpl.java
+++ b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerMetadataServiceImpl.java
@@ -3,7 +3,6 @@
import au.org.aodn.ardcvocabs.model.VocabModel;
import au.org.aodn.esindexer.configuration.AppConstants;
import au.org.aodn.esindexer.exception.*;
-import au.org.aodn.esindexer.model.DatasetProvider;
import au.org.aodn.esindexer.utils.GcmdKeywordUtils;
import au.org.aodn.esindexer.utils.JaxbUtils;
import au.org.aodn.metadata.iso19115_3_2018.MDMetadataType;
@@ -27,36 +26,31 @@
import org.opengis.referencing.FactoryException;
import org.opengis.referencing.operation.TransformException;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Scope;
import org.springframework.context.annotation.ScopedProxyMode;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
-import org.springframework.retry.annotation.Backoff;
-import org.springframework.retry.annotation.Retryable;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
-import org.springframework.web.client.HttpServerErrorException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.time.LocalDate;
import java.util.*;
import java.util.concurrent.*;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import java.util.function.Function;
import static au.org.aodn.esindexer.utils.CommonUtils.safeGet;
@Slf4j
@Service
@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS)
-public class IndexerServiceImpl implements IndexerService {
+public class IndexerMetadataServiceImpl extends IndexServiceImpl implements IndexerMetadataService {
protected String indexName;
- protected String datasetIndexName;
protected String tokensAnalyserName;
protected GeoNetworkService geoNetworkResourceService;
protected ElasticsearchClient portalElasticsearchClient;
@@ -66,46 +60,46 @@ public class IndexerServiceImpl implements IndexerService {
protected JaxbUtils jaxbUtils;
protected RankingService rankingService;
protected VocabService vocabService;
- protected DataAccessService dataAccessService;
protected GcmdKeywordUtils gcmdKeywordUtils;
- protected static final long DEFAULT_BACKOFF_TIME = 3000L;
@Lazy
@Autowired
- protected IndexerService self;
+ protected IndexerMetadataService self;
@Autowired
- public IndexerServiceImpl(
+ public IndexerMetadataServiceImpl(
@Value("${elasticsearch.index.name}") String indexName,
- @Value("${elasticsearch.dataset_index.name}") String datasetIndexName,
@Value("${elasticsearch.analyser.tokens.name}") String tokensAnalyserName,
ObjectMapper indexerObjectMapper,
JaxbUtils jaxbUtils,
RankingService rankingService,
GeoNetworkService geoNetworkResourceService,
- ElasticsearchClient portalElasticsearchClient,
+ @Qualifier("portalElasticsearchClient") ElasticsearchClient elasticsearchClient,
ElasticSearchIndexService elasticSearchIndexService,
StacCollectionMapperService stacCollectionMapperService,
VocabService vocabService,
- DataAccessService dataAccessService,
GcmdKeywordUtils gcmdKeywordUtils
) {
+ super(elasticsearchClient, indexerObjectMapper);
+
this.indexName = indexName;
- this.datasetIndexName = datasetIndexName;
this.tokensAnalyserName = tokensAnalyserName;
this.indexerObjectMapper = indexerObjectMapper;
this.jaxbUtils = jaxbUtils;
this.rankingService = rankingService;
this.geoNetworkResourceService = geoNetworkResourceService;
- this.portalElasticsearchClient = portalElasticsearchClient;
+ this.portalElasticsearchClient = elasticsearchClient;
this.elasticSearchIndexService = elasticSearchIndexService;
this.stacCollectionMapperService = stacCollectionMapperService;
this.vocabService = vocabService;
- this.dataAccessService = dataAccessService;
this.gcmdKeywordUtils = gcmdKeywordUtils;
}
public Hit getDocumentByUUID(String uuid) throws IOException {
+ return getDocumentByUUID(uuid, indexName);
+ }
+
+ public Hit getDocumentByUUID(String uuid, String indexName) throws IOException {
try {
SearchResponse response = portalElasticsearchClient
.search(s -> s
@@ -173,7 +167,6 @@ protected StacCollectionModel getMappedMetadataValues(String metadataValues) thr
StacCollectionModel stacCollectionModel = stacCollectionMapperService.mapToSTACCollection(metadataType);
// evaluate completeness
- Integer completeness = rankingService.evaluateCompleteness(stacCollectionModel);
// TODO: in future, evaluate other aspects of the data such as relevance, quality, etc using NLP
/* expand score with other aspect of the data such as relevance, quality, etc.
@@ -182,7 +175,7 @@ protected StacCollectionModel getMappedMetadataValues(String metadataValues) thr
* e.g completeness = 80, relevance = 90, quality = 100
* final score = (80 + 90 + 100) / 3 = 90
*/
- Integer score = completeness;
+ Integer score = rankingService.evaluateCompleteness(stacCollectionModel);
stacCollectionModel.getSummaries().setScore(score);
@@ -293,48 +286,6 @@ public CompletableFuture> indexMetadata(String metadataVa
}
}
- // TODO: Refactor this method later since it uses similar logic as indexAllMetadataRecordsFromGeoNetwork
- @Override
- public List indexDataset(String uuid, LocalDate startDate, LocalDate endDate) {
-
- BulkRequest.Builder bulkRequest = new BulkRequest.Builder();
- List responses = new ArrayList<>();
-
- long dataSize = 0;
- final long maxSize = 5242880; // is 5mb
-
- var dataset = new DatasetProvider(uuid, startDate, endDate, dataAccessService).getIterator();
- try {
- for (var entry : dataset) {
- if (entry == null) {
- continue;
- }
- log.info("add dataset into b with UUID: {} and yearMonth: {}", entry.uuid(), entry.yearMonth());
-
- bulkRequest.operations(operation -> operation.index(
- indexReq -> indexReq
- .id(entry.uuid() + entry.yearMonth())
- .index(datasetIndexName)
- .document(entry)
- ));
- dataSize += indexerObjectMapper.writeValueAsBytes(entry).length;
- if (dataSize > maxSize) {
- log.info("Execute bulk request as bulk request is big enough {}", dataSize);
- responses.add(reduceResponse(self.executeBulk(bulkRequest, null)));
- dataSize = 0;
- bulkRequest = new BulkRequest.Builder();
- }
- }
- log.info("Finished execute bulk indexing records to index: {}", datasetIndexName);
- responses.add(reduceResponse(self.executeBulk(bulkRequest, null)));
- } catch (Exception e) {
- log.error("Failed", e);
- throw new RuntimeException("Exception thrown while indexing dataset with UUID: " + uuid + " | " + e.getMessage(), e);
- }
- return responses;
- }
-
-
public ResponseEntity deleteDocumentByUUID(String uuid) throws IOException {
log.info("Deleting document with UUID: {} from index: {}", uuid, indexName);
try {
@@ -374,11 +325,20 @@ public List indexAllMetadataRecordsFromGeoNetwork(
log.info("Resume indexing records from GeoNetwork at {}", beginWithUuid);
}
- BulkRequest.Builder bulkRequest = new BulkRequest.Builder();
+ Function> mapper = (item) ->
+ {
+ try {
+ return Optional.of(this.getMappedMetadataValues(
+ geoNetworkResourceService.searchRecordBy(item.id())
+ ));
+ } catch (IOException | FactoryException | TransformException | JAXBException e) {
+ return Optional.empty();
+ }
+ };
+
List results = new ArrayList<>();
+ BulkRequestProcessor bulkRequestProcessor = new BulkRequestProcessor<>(indexName, mapper, self, callback);
- long dataSize = 0;
- long total = 0;
// We need to keep sending messages to client to avoid timeout on long processing
ExecutorService executor = Executors.newFixedThreadPool(2);
@@ -429,55 +389,24 @@ public List indexAllMetadataRecordsFromGeoNetwork(
final StacCollectionModel mappedMetadataValues = value.get();
if(mappedMetadataValues != null) {
- int size = indexerObjectMapper.writeValueAsBytes(mappedMetadataValues).length;
-
- // We need to split the batch into smaller size to avoid data too large error in ElasticSearch,
- // the limit is 10mb, so to make check before document add and push batch if size is too big
- //
- // dataSize = 0 is init case, just in case we have a very big doc that exceed the limit
- // and we have not add it to the bulkRequest, hardcode to 5M which should be safe,
- // usually it is 5M - 15M
- //
- if (dataSize + size > 5242880 && dataSize != 0) {
- if (callback != null) {
- callback.onProgress(String.format("Execute batch as bulk request is big enough %s", dataSize + size));
- }
-
- results.add(reduceResponse(self.executeBulk(bulkRequest, callback)));
-
- dataSize = 0;
- bulkRequest = new BulkRequest.Builder();
- }
- // Add item to bulk request to Elasticsearch
- bulkRequest.operations(op -> op
- .index(idx -> idx
- .id(mappedMetadataValues.getUuid())
- .index(indexName)
- .document(mappedMetadataValues)
- )
- );
- dataSize += size;
- total++;
-
- if (callback != null) {
- callback.onProgress(
- String.format(
- "Add uuid %s to batch, batch size is %s, total is %s",
- mappedMetadataValues.getUuid(),
- dataSize,
- total)
- );
- }
+ bulkRequestProcessor
+ .processItem(mappedMetadataValues.getUuid(), mappedMetadataValues)
+ .ifPresent(results::add);
}
}
}
+
// In case there are residual, just report error
- BulkResponse temp = reduceResponse(self.executeBulk(bulkRequest, callback));
- results.add(temp);
+ bulkRequestProcessor
+ .flush()
+ .ifPresent(response -> {
+ results.add(response);
+
+ if(callback != null) {
+ callback.onComplete(response);
+ }
+ });
- if(callback != null) {
- callback.onComplete(temp);
- }
// TODO now processing for record_suggestions index
log.info("Finished execute bulk indexing records to index: {}",indexName);
@@ -498,84 +427,4 @@ public List indexAllMetadataRecordsFromGeoNetwork(
}
return results;
}
- /**
- * Keep retry until success, it is ok to insert docs to elastic again because we use _id as identifier.
- * In case any service is not available, we will keep retry many times, with 100 retry we try 25 mins which is
- * big enough for aws process restart.
- *
- * @param bulkRequest - The bulk request
- * @param callback - The event call back to avoid timeout
- * @return - The bulk insert result
- * @throws IOException - Exceptions on error
- * @throws HttpServerErrorException.ServiceUnavailable - Exceptions on geonetwork die or elastic not available
- */
- @Retryable(
- retryFor = {Exception.class, HttpServerErrorException.ServiceUnavailable.class},
- maxAttempts = 1000,
- backoff = @Backoff(delay = DEFAULT_BACKOFF_TIME)
- )
- @Override
- public BulkResponse executeBulk(BulkRequest.Builder bulkRequest, Callback callback) throws IOException, HttpServerErrorException.ServiceUnavailable {
- try {
- // Keep retry until success
- BulkResponse result = portalElasticsearchClient.bulk(bulkRequest.build());
-
- // Flush after insert, otherwise you need to wait for next auto-refresh. It is
- // especially a problem with autotest, where assert happens very fast.
- portalElasticsearchClient.indices().refresh();
-
- // Report status if success
- if(callback != null) {
- callback.onProgress(reduceResponse(result));
- }
-
- // Log errors, if any
- if (!result.items().isEmpty()) {
- if (result.errors()) {
- log.error("Bulk load have errors? {}", true);
- } else {
- log.info("Bulk load have errors? {}", false);
- }
- for (BulkResponseItem item : result.items()) {
- if (item.error() != null) {
- try {
- log.error("UUID {} {} {} {}",
- item.id(),
- item.error().reason(),
- item.error().causedBy(),
- indexerObjectMapper
- .writerWithDefaultPrettyPrinter()
- .writeValueAsString(
- this.getMappedMetadataValues(
- geoNetworkResourceService.searchRecordBy(item.id())
- )
- )
- );
- } catch (FactoryException | TransformException | JAXBException e) {
- log.warn("Parse error on display stac record");
- }
- }
- }
- }
- return result;
- }
- catch(Exception e) {
- // Report status if not success, this help to keep connection
- if(callback != null) {
- callback.onProgress("Exception on bulk save, will retry : " + e.getMessage());
- }
- throw e;
- }
- }
-
- protected static BulkResponse reduceResponse(BulkResponse in) {
- List errors = in.items()
- .stream()
- .filter(p -> !(p.status() == HttpStatus.CREATED.value() || p.status() == HttpStatus.OK.value()))
- .toList();
-
- return errors.isEmpty() ?
- BulkResponse.of(f -> f.items(new ArrayList<>()).errors(false).took(in.took())) :
- BulkResponse.of(f -> f.items(errors).errors(true).took(in.took()));
- }
}
diff --git a/indexer/src/main/java/au/org/aodn/esindexer/service/StacCollectionMapperService.java b/indexer/src/main/java/au/org/aodn/esindexer/service/StacCollectionMapperService.java
index 95b7a5b2..e5d02c15 100644
--- a/indexer/src/main/java/au/org/aodn/esindexer/service/StacCollectionMapperService.java
+++ b/indexer/src/main/java/au/org/aodn/esindexer/service/StacCollectionMapperService.java
@@ -1,12 +1,11 @@
package au.org.aodn.esindexer.service;
import au.org.aodn.esindexer.utils.AssociatedRecordsUtil;
-import au.org.aodn.esindexer.model.GeoNetworkField;
-import au.org.aodn.esindexer.model.MediaType;
import au.org.aodn.esindexer.model.RelationType;
import au.org.aodn.esindexer.utils.*;
import au.org.aodn.stac.model.*;
+import au.org.aodn.metadata.geonetwork.GeoNetworkField;
import au.org.aodn.metadata.iso19115_3_2018.*;
import au.org.aodn.stac.util.JsonUtil;
import jakarta.xml.bind.JAXBElement;
@@ -15,6 +14,7 @@
import org.mapstruct.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import java.io.IOException;
@@ -26,7 +26,6 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import static au.org.aodn.esindexer.model.GeoNetworkField.*;
import static au.org.aodn.esindexer.utils.CommonUtils.safeGet;
import static au.org.aodn.esindexer.utils.StringUtil.capitalizeFirstLetter;
@@ -318,14 +317,14 @@ List