Skip to content

Commit

Permalink
fix(temporal): incorrect parsing of date-based aggregation period dur…
Browse files Browse the repository at this point in the history
…ations (#1072)
  • Loading branch information
bobeal authored Dec 28, 2023
1 parent d2153cb commit eae6e1e
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package com.egm.stellio.search.model

import java.time.Duration
import java.time.Period
import java.time.temporal.TemporalAmount

data class TemporalEntitiesQuery(
val entitiesQuery: EntitiesQuery,
val temporalQuery: TemporalQuery,
Expand All @@ -10,4 +14,16 @@ data class TemporalEntitiesQuery(
fun isAggregatedWithDefinedDuration(): Boolean =
withAggregatedValues &&
(temporalQuery.aggrPeriodDuration != null && temporalQuery.aggrPeriodDuration != "PT0S")

fun computeAggrPeriodDuration(): TemporalAmount {
val splitted = temporalQuery.aggrPeriodDuration!!.split("T")
return if (splitted.size == 1) // has only date-based fields
Period.parse(temporalQuery.aggrPeriodDuration)
else {
val duration = Duration.parse("PT" + splitted[1])
if ("P" == splitted[0]) // has only time-based fields
duration
else Period.parse(splitted[0]).plus(duration)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class ScopeService(
)
.bind("entity_id", entityId)
.bind("time_property", timeproperty.name)
.oneToResult { toZonedDateTime(it["first"]) }
.oneToResult { toOptionalZonedDateTime(it["first"]) }
.getOrNull()

private fun Json.replaceScopeValue(newScopeValue: Any): Map<String, Any> =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import org.springframework.r2dbc.core.bind
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.net.URI
import java.time.Duration
import java.time.ZonedDateTime
import java.util.UUID

Expand Down Expand Up @@ -174,7 +173,11 @@ class AttributeInstanceService(
this.allToMappedList { rowToAttributeInstanceResult(it, temporalEntitiesQuery) }
}.fold(
{ it.right() },
{ OperationNotSupportedException(INCONSISTENT_VALUES_IN_AGGREGATION_MESSAGE).left() }
{
OperationNotSupportedException(
it.cause?.message ?: INCONSISTENT_VALUES_IN_AGGREGATION_MESSAGE
).left()
}
)
}

Expand Down Expand Up @@ -275,7 +278,7 @@ class AttributeInstanceService(
if (!temporalEntitiesQuery.isAggregatedWithDefinedDuration())
toZonedDateTime(row["endTime"])
else
startDateTime.plus(Duration.parse(temporalEntitiesQuery.temporalQuery.aggrPeriodDuration))
startDateTime.plus(temporalEntitiesQuery.computeAggrPeriodDuration())
// in a row, there is the result for each requested aggregation method
val values = temporalEntitiesQuery.temporalQuery.aggrMethods!!.map {
val value = row["${it.method}_value"] ?: ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ class AggregatedQueryServiceTests : WithTimescaleContainer, WithKafkaContainer {
}

@Test
fun `ìt should aggregate on the whole time range if no aggrPeriodDuration is given`() = runTest {
fun `it should aggregate on the whole time range if no aggrPeriodDuration is given`() = runTest {
val temporalEntityAttribute = createTemporalEntityAttribute(TemporalEntityAttribute.AttributeValueType.NUMBER)
(1..10).forEach { i ->
val attributeInstance = gimmeAttributeInstance(teaUuid)
Expand All @@ -372,6 +372,35 @@ class AggregatedQueryServiceTests : WithTimescaleContainer, WithKafkaContainer {
}
}

@ParameterizedTest
@CsvSource(
"P1D, 10",
"PT48H, 6",
"PT24H, 10",
"PT12H, 10",
"P1W, 2",
"P2W, 1",
"P1M, 2"
)
fun `it should aggregate on the asked aggrPeriodDuration`(
aggrPeriodDuration: String,
expectedNumberOfBuckets: Int
) = runTest {
val temporalEntityAttribute = createTemporalEntityAttribute(TemporalEntityAttribute.AttributeValueType.NUMBER)
val startTimestamp = ZonedDateTime.parse("2023-12-28T12:00:00Z")
(1..10).forEach { i ->
val attributeInstance = gimmeAttributeInstance(teaUuid)
.copy(time = startTimestamp.plusDays(i.toLong()))
attributeInstanceService.create(attributeInstance)
}

val temporalEntitiesQuery = createTemporalEntitiesQuery("avg", aggrPeriodDuration)
attributeInstanceService.search(temporalEntitiesQuery, temporalEntityAttribute, startTimestamp)
.shouldSucceedWith { results ->
assertEquals(expectedNumberOfBuckets, results.size)
}
}

@Test
fun `it should handle aggregates for an attribute having different types of values in history`() = runTest {
val temporalEntityAttribute = createTemporalEntityAttribute(TemporalEntityAttribute.AttributeValueType.ARRAY)
Expand All @@ -388,7 +417,7 @@ class AggregatedQueryServiceTests : WithTimescaleContainer, WithKafkaContainer {
attributeInstanceService.search(temporalEntitiesQuery, temporalEntityAttribute, now)
.shouldFail {
assertInstanceOf(OperationNotSupportedException::class.java, it)
assertEquals(INCONSISTENT_VALUES_IN_AGGREGATION_MESSAGE, it.message)
assertEquals("cannot get array length of a scalar", it.message)
}
}

Expand All @@ -407,12 +436,15 @@ class AggregatedQueryServiceTests : WithTimescaleContainer, WithKafkaContainer {
return temporalEntityAttribute
}

private fun createTemporalEntitiesQuery(aggrMethod: String): TemporalEntitiesQuery =
private fun createTemporalEntitiesQuery(
aggrMethod: String,
aggrPeriodDuration: String = "P1D"
): TemporalEntitiesQuery =
gimmeTemporalEntitiesQuery(
TemporalQuery(
timerel = TemporalQuery.Timerel.AFTER,
timeAt = now.minusHours(1),
aggrPeriodDuration = "P1D",
aggrPeriodDuration = aggrPeriodDuration,
aggrMethods = listOfNotNull(TemporalQuery.Aggregate.forMethod(aggrMethod))
),
withAggregatedValues = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ fun APIException.toErrorResponse(): ResponseEntity<*> =
generateErrorResponse(HttpStatus.BAD_REQUEST, InvalidRequestResponse(this.message))
is BadRequestDataException ->
generateErrorResponse(HttpStatus.BAD_REQUEST, BadRequestDataResponse(this.message))
is OperationNotSupportedException ->
generateErrorResponse(HttpStatus.BAD_REQUEST, OperationNotSupportedResponse(this.message))
is AccessDeniedException ->
generateErrorResponse(HttpStatus.FORBIDDEN, AccessDeniedResponse(this.message))
is NotImplementedException ->
Expand Down

0 comments on commit eae6e1e

Please sign in to comment.