Skip to content

Commit

Permalink
refactor(WIP): inclusive error handling in batch operations (#1053)
Browse files Browse the repository at this point in the history
  • Loading branch information
bobeal authored Nov 30, 2023
1 parent c092be7 commit c5d7e60
Show file tree
Hide file tree
Showing 7 changed files with 303 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,8 @@ import arrow.core.raise.either
import arrow.core.right
import com.egm.stellio.search.authorization.AuthorizationService
import com.egm.stellio.search.model.UpdateResult
import com.egm.stellio.search.web.BatchEntityError
import com.egm.stellio.search.web.BatchEntitySuccess
import com.egm.stellio.search.web.BatchOperationResult
import com.egm.stellio.search.web.*
import com.egm.stellio.shared.model.BadRequestDataException
import com.egm.stellio.shared.model.JsonLdEntity
import com.egm.stellio.shared.model.NgsiLdEntity
import com.egm.stellio.shared.util.Sub
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional
Expand All @@ -30,8 +26,10 @@ class EntityOperationService(
/**
* Splits [entities] by their existence in the DB.
*/
suspend fun splitEntitiesByExistence(entities: List<NgsiLdEntity>): Pair<List<NgsiLdEntity>, List<NgsiLdEntity>> {
val extractIdFunc: (NgsiLdEntity) -> URI = { it.id }
suspend fun splitEntitiesByExistence(
entities: List<JsonLdNgsiLdEntity>
): Pair<List<JsonLdNgsiLdEntity>, List<JsonLdNgsiLdEntity>> {
val extractIdFunc: (JsonLdNgsiLdEntity) -> URI = { it.entityId() }
return splitEntitiesByExistenceGeneric(entities, extractIdFunc)
}

Expand All @@ -58,20 +56,16 @@ class EntityOperationService(
* @return a [BatchOperationResult]
*/
suspend fun create(
entities: List<NgsiLdEntity>,
jsonLdEntities: List<JsonLdEntity>,
entities: List<JsonLdNgsiLdEntity>,
sub: Sub?
): BatchOperationResult {
val creationResults = entities.map { ngsiLdEntity ->
val creationResults = entities.map { jsonLdNgsiLdEntity ->
either {
val jsonLdEntity = jsonLdEntities.find { jsonLdEntity ->
ngsiLdEntity.id.toString() == jsonLdEntity.id
}!!
entityPayloadService.createEntity(ngsiLdEntity, jsonLdEntity, sub)
entityPayloadService.createEntity(jsonLdNgsiLdEntity.second, jsonLdNgsiLdEntity.first, sub)
.map {
BatchEntitySuccess(ngsiLdEntity.id)
BatchEntitySuccess(jsonLdNgsiLdEntity.entityId())
}.mapLeft { apiException ->
BatchEntityError(ngsiLdEntity.id, arrayListOf(apiException.message))
BatchEntityError(jsonLdNgsiLdEntity.entityId(), arrayListOf(apiException.message))
}.bind()
}
}.fold(
Expand Down Expand Up @@ -124,7 +118,7 @@ class EntityOperationService(
* @return a [BatchOperationResult] with list of replaced ids and list of errors.
*/
@Transactional
suspend fun replace(entities: List<Pair<NgsiLdEntity, JsonLdEntity>>, sub: Sub?): BatchOperationResult =
suspend fun replace(entities: List<JsonLdNgsiLdEntity>, sub: Sub?): BatchOperationResult =
processEntities(entities, false, sub, ::replaceEntity)

/**
Expand All @@ -136,18 +130,18 @@ class EntityOperationService(
*/
@Transactional
suspend fun update(
entities: List<Pair<NgsiLdEntity, JsonLdEntity>>,
entities: List<JsonLdNgsiLdEntity>,
disallowOverwrite: Boolean = false,
sub: Sub?
): BatchOperationResult =
processEntities(entities, disallowOverwrite, sub, ::updateEntity)

private suspend fun processEntities(
entities: List<Pair<NgsiLdEntity, JsonLdEntity>>,
entities: List<JsonLdNgsiLdEntity>,
disallowOverwrite: Boolean = false,
sub: Sub?,
processor:
suspend (Pair<NgsiLdEntity, JsonLdEntity>, Boolean, Sub?) -> Either<BatchEntityError, BatchEntitySuccess>
suspend (JsonLdNgsiLdEntity, Boolean, Sub?) -> Either<BatchEntityError, BatchEntitySuccess>
): BatchOperationResult =
entities.map {
processEntity(it, disallowOverwrite, sub, processor)
Expand All @@ -162,16 +156,16 @@ class EntityOperationService(
)

private suspend fun processEntity(
entity: Pair<NgsiLdEntity, JsonLdEntity>,
entity: JsonLdNgsiLdEntity,
disallowOverwrite: Boolean = false,
sub: Sub?,
processor:
suspend (Pair<NgsiLdEntity, JsonLdEntity>, Boolean, Sub?) -> Either<BatchEntityError, BatchEntitySuccess>
suspend (JsonLdNgsiLdEntity, Boolean, Sub?) -> Either<BatchEntityError, BatchEntitySuccess>
): Either<BatchEntityError, BatchEntitySuccess> =
kotlin.runCatching {
processor(entity, disallowOverwrite, sub)
}.fold(
onFailure = { BatchEntityError(entity.first.id, arrayListOf(it.message!!)).left() },
onFailure = { BatchEntityError(entity.entityId(), arrayListOf(it.message!!)).left() },
onSuccess = { it }
)

Expand All @@ -181,57 +175,55 @@ class EntityOperationService(
@Transactional(rollbackFor = [BadRequestDataException::class])
@Throws(BadRequestDataException::class)
suspend fun replaceEntity(
entity: Pair<NgsiLdEntity, JsonLdEntity>,
entity: JsonLdNgsiLdEntity,
disallowOverwrite: Boolean,
sub: Sub?
): Either<BatchEntityError, BatchEntitySuccess> =
either {
val (ngsiLdEntity, jsonLdEntity) = entity
temporalEntityAttributeService.deleteTemporalAttributesOfEntity(ngsiLdEntity.id)
val updateResult = entityPayloadService.appendAttributes(
ngsiLdEntity.id,
jsonLdEntity.getAttributes(),
disallowOverwrite,
sub
).bind()

if (updateResult.notUpdated.isNotEmpty())
BadRequestDataException(
updateResult.notUpdated.joinToString(", ") { it.attributeName + " : " + it.reason }
).left().bind<UpdateResult>()
else updateResult.right().bind()
}.map {
BatchEntitySuccess(entity.first.id)
}.mapLeft {
BatchEntityError(entity.first.id, arrayListOf(it.message))
}
): Either<BatchEntityError, BatchEntitySuccess> = either {
val (jsonLdEntity, ngsiLdEntity) = entity
temporalEntityAttributeService.deleteTemporalAttributesOfEntity(ngsiLdEntity.id)
val updateResult = entityPayloadService.appendAttributes(
ngsiLdEntity.id,
jsonLdEntity.getAttributes(),
disallowOverwrite,
sub
).bind()

if (updateResult.notUpdated.isNotEmpty())
BadRequestDataException(
updateResult.notUpdated.joinToString(", ") { it.attributeName + " : " + it.reason }
).left().bind<UpdateResult>()
else updateResult.right().bind()
}.map {
BatchEntitySuccess(entity.entityId())
}.mapLeft {
BatchEntityError(entity.entityId(), arrayListOf(it.message))
}

/*
* Transactional because it should not replace entity attributes if new ones could not be replaced.
*/
@Transactional
suspend fun updateEntity(
entity: Pair<NgsiLdEntity, JsonLdEntity>,
entity: JsonLdNgsiLdEntity,
disallowOverwrite: Boolean,
sub: Sub?
): Either<BatchEntityError, BatchEntitySuccess> =
either {
val (ngsiLdEntity, jsonLdEntity) = entity
val updateResult = entityPayloadService.appendAttributes(
ngsiLdEntity.id,
jsonLdEntity.getAttributes(),
disallowOverwrite,
sub
).bind()
if (updateResult.notUpdated.isEmpty())
updateResult.right().bind()
else
BadRequestDataException(
ArrayList(updateResult.notUpdated.map { it.attributeName + " : " + it.reason }).joinToString()
).left().bind<UpdateResult>()
}.map {
BatchEntitySuccess(entity.first.id, it)
}.mapLeft {
BatchEntityError(entity.first.id, arrayListOf(it.message))
}
): Either<BatchEntityError, BatchEntitySuccess> = either {
val (jsonLdEntity, ngsiLdEntity) = entity
val updateResult = entityPayloadService.appendAttributes(
ngsiLdEntity.id,
jsonLdEntity.getAttributes(),
disallowOverwrite,
sub
).bind()
if (updateResult.notUpdated.isEmpty())
updateResult.right().bind()
else
BadRequestDataException(
ArrayList(updateResult.notUpdated.map { it.attributeName + " : " + it.reason }).joinToString()
).left().bind<UpdateResult>()
}.map {
BatchEntitySuccess(entity.entityId(), it)
}.mapLeft {
BatchEntityError(entity.entityId(), arrayListOf(it.message))
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package com.egm.stellio.search.web

import com.egm.stellio.search.model.UpdateResult
import com.egm.stellio.shared.model.APIException
import com.egm.stellio.shared.model.JsonLdEntity
import com.egm.stellio.shared.model.NgsiLdEntity
import com.egm.stellio.shared.util.toUri
import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.annotation.JsonValue
import java.net.URI
Expand All @@ -19,6 +22,12 @@ data class BatchOperationResult(
@JsonIgnore
fun getSuccessfulEntitiesIds() = success.map { it.entityId }

@JsonIgnore
fun addEntitiesToErrors(entities: List<Pair<String, APIException>>) =
entities.forEach {
errors.add(BatchEntityError(it.first.toUri(), arrayListOf(it.second.message)))
}

@JsonIgnore
fun addEntitiesToErrors(entities: List<NgsiLdEntity>, errorMessage: String) =
addIdsToErrors(entities.map { it.id }, errorMessage)
Expand All @@ -41,3 +50,15 @@ data class BatchEntityError(
val entityId: URI,
val error: MutableList<String>
)

typealias JsonLdNgsiLdEntity = Pair<JsonLdEntity, NgsiLdEntity>

fun List<JsonLdNgsiLdEntity>.extractNgsiLdEntities(): List<NgsiLdEntity> = this.map { it.second }
fun JsonLdNgsiLdEntity.entityId(): URI = this.second.id

// a temporary data class to hold the result of deserializing, expanding and transforming to NGSI-LD entities
// the entities received in a batch operation
data class BatchEntityPreparation(
val success: List<JsonLdNgsiLdEntity> = emptyList(),
val errors: List<Pair<String, APIException>> = emptyList()
)
Loading

0 comments on commit c5d7e60

Please sign in to comment.