Skip to content

Commit

Permalink
Rollback upsert_holdings
Browse files Browse the repository at this point in the history
  • Loading branch information
psmagin committed Oct 10, 2023
1 parent c4365b1 commit 2c4296a
Show file tree
Hide file tree
Showing 17 changed files with 83 additions and 987 deletions.
6 changes: 1 addition & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
</dependencyManagement>

<dependencies>
<!-- https://mvnrepository.com/artifact/org.marc4j/marc4j -->
<dependency>
<groupId>org.marc4j</groupId>
<artifactId>marc4j</artifactId>
Expand Down Expand Up @@ -185,11 +186,6 @@
<artifactId>vertx-unit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion ramls/holdingsrecord.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
},
"hrid": {
"type": "string",
"description": "the human readable ID, also called eye readable ID. Unique. On creation it can be provided, otherwise the system assigns the next sequential ID using the settings of the /hrid-settings-storage/hrid-settings API. An update with a different hrid is rejected. An update without hrid is populated with the holdings record's current hrid."
"description": "the human readable ID, also called eye readable ID. A system-assigned sequential ID which maps to the Instance ID"
},
"holdingsTypeId": {
"type": "string",
Expand Down
26 changes: 0 additions & 26 deletions src/main/java/org/folio/persist/HoldingsRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,10 @@

import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.Tuple;
import java.util.List;
import java.util.Map;
import org.folio.cql2pgjson.CQL2PgJSON;
import org.folio.dbschema.ObjectMapperTool;
import org.folio.rest.jaxrs.model.HoldingsRecord;
import org.folio.rest.persist.cql.CQLWrapper;

Expand All @@ -21,28 +17,6 @@ public HoldingsRepository(Context context, Map<String, String> okapiHeaders) {
super(postgresClient(context, okapiHeaders), HOLDINGS_RECORD_TABLE, HoldingsRecord.class);
}

/**
* Upsert holdings records.
*
* <p>Returns
* { "holdingsRecords": [{"old": {...}, "new": {...}}, ...],
* "items": [{"old": {...}, "new": {...}}, ...]
* }
* providing old and new jsonb content of all updated holdings and items. For a newly
* inserted holding only "new" is provided.
*
* @param holdingsRecords records to insert or update (upsert)
*/
public Future<JsonObject> upsert(List<HoldingsRecord> holdingsRecords) {
try {
var array = ObjectMapperTool.getMapper().writeValueAsString(holdingsRecords);
return postgresClient.selectSingle("SELECT upsert_holdings($1::text::jsonb)", Tuple.of(array))
.map(row -> row.getJsonObject(0));
} catch (Exception e) {
return Future.failedFuture(e);
}
}

/**
* Delete by CQL. For each deleted record return a {@link Row} with the instance id String
* and with the holdings' jsonb String.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.folio.rest.exceptions.NotFoundException;
import org.folio.rest.exceptions.ValidationException;
import org.folio.rest.jaxrs.model.Errors;
import org.folio.rest.persist.PgExceptionFacade;
import org.folio.rest.persist.PgExceptionUtil;
import org.folio.rest.persist.cql.CQLQueryValidationException;
import org.folio.rest.tools.client.exceptions.ResponseException;
Expand Down Expand Up @@ -69,10 +68,6 @@ public static Response failureResponse(Throwable error) {
} else if (error instanceof ResponseException) {
return ((ResponseException) error).getResponse();
}
var sqlState = new PgExceptionFacade(error).getSqlState();
if ("239HR".equals(sqlState)) { // Cannot change hrid of holdings record
return textPlainResponse(400, error);
}
String message = PgExceptionUtil.badRequestMessage(error);
if (message != null) {
return textPlainResponse(400, message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import java.util.Collection;
import java.util.List;
import javax.ws.rs.core.Response;
Expand Down Expand Up @@ -93,13 +92,6 @@ public Future<Void> publishAllRemoved() {
return domainEventService.publishAllRecordsRemoved();
}

public Future<Void> publishUpserted(String instanceId, JsonObject oldRecord, JsonObject newRecord) {
if (oldRecord == null) {
return domainEventService.publishRecordCreated(instanceId, newRecord.encode());
}
return domainEventService.publishRecordUpdated(instanceId, oldRecord.encode(), newRecord.encode());
}

public Handler<Response> publishUpdated(D oldRecord) {
return response -> {
if (!isUpdateSuccessResponse(response)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,6 @@ Future<Void> publishRecordUpdated(String instanceId, T oldRecord, T newRecord) {
return publish(instanceId, domainEvent);
}

Future<Void> publishRecordUpdated(String instanceId, String oldRecord, String newRecord) {
var domainEvent = DomainEventRaw.updateEvent(oldRecord, newRecord, tenantId(okapiHeaders));
return publish(instanceId, domainEvent);
}

Future<Void> publishRecordsUpdated(Collection<Triple<String, T, T>> updatedRecords) {
if (updatedRecords.isEmpty()) {
return succeededFuture();
Expand All @@ -139,11 +134,6 @@ Future<Void> publishRecordCreated(String instanceId, T newRecord) {
return publish(instanceId, domainEvent);
}

Future<Void> publishRecordCreated(String id, String newRecord) {
var domainEvent = DomainEventRaw.createEvent(newRecord, tenantId(okapiHeaders));
return publish(id, domainEvent);
}

Future<Void> publishRecordsCreated(List<Pair<String, T>> records) {
if (records.isEmpty()) {
return succeededFuture();
Expand Down
122 changes: 56 additions & 66 deletions src/main/java/org/folio/services/holding/HoldingsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@
import static org.folio.rest.jaxrs.resource.HoldingsStorageBatchSynchronous.PostHoldingsStorageBatchSynchronousResponse;
import static org.folio.rest.persist.PgUtil.deleteById;
import static org.folio.rest.persist.PgUtil.post;
import static org.folio.rest.persist.PgUtil.postSync;
import static org.folio.rest.persist.PgUtil.postgresClient;
import static org.folio.services.batch.BatchOperationContextFactory.buildBatchOperationContext;
import static org.folio.validator.HridValidators.refuseWhenHridChanged;

import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -30,28 +31,29 @@
import org.folio.persist.HoldingsRepository;
import org.folio.persist.InstanceInternalRepository;
import org.folio.rest.jaxrs.model.HoldingsRecord;
import org.folio.rest.jaxrs.model.Item;
import org.folio.rest.persist.PostgresClient;
import org.folio.rest.persist.SQLConnection;
import org.folio.rest.support.CqlQuery;
import org.folio.rest.support.EndpointFailureHandler;
import org.folio.rest.support.HridManager;
import org.folio.rest.tools.utils.OptimisticLockingUtil;
import org.folio.services.caches.ConsortiumData;
import org.folio.services.caches.ConsortiumDataCache;
import org.folio.services.consortium.ConsortiumService;
import org.folio.services.consortium.ConsortiumServiceImpl;
import org.folio.services.consortium.entities.SharingInstance;
import org.folio.services.domainevent.HoldingDomainEventPublisher;
import org.folio.services.domainevent.ItemDomainEventPublisher;
import org.folio.services.item.ItemService;
import org.folio.validator.CommonValidators;
import org.folio.validator.NotesValidators;

public class HoldingsService {
private static final Logger log = getLogger(HoldingsService.class);
private static final String INSTANCE_ID = "instanceId";
private final Context vertxContext;
private final Map<String, String> okapiHeaders;
private final PostgresClient postgresClient;
private final HridManager hridManager;
private final ItemService itemService;
private final HoldingsRepository holdingsRepository;
private final ItemDomainEventPublisher itemEventService;
private final HoldingDomainEventPublisher domainEventPublisher;
Expand All @@ -62,6 +64,7 @@ public HoldingsService(Context context, Map<String, String> okapiHeaders) {
this.vertxContext = context;
this.okapiHeaders = okapiHeaders;

itemService = new ItemService(context, okapiHeaders);
postgresClient = postgresClient(context, okapiHeaders);
hridManager = new HridManager(postgresClient);
holdingsRepository = new HoldingsRepository(context, okapiHeaders);
Expand All @@ -86,7 +89,7 @@ public Future<Response> updateHoldingRecord(String holdingId, HoldingsRecord hol
return holdingsRepository.getById(holdingId)
.compose(existingHoldingsRecord -> {
if (holdingsRecordFound(existingHoldingsRecord)) {
return updateHolding(holdingsRecord);
return updateHolding(existingHoldingsRecord, holdingsRecord);
} else {
return createHolding(holdingsRecord);
}
Expand Down Expand Up @@ -159,75 +162,37 @@ public Future<Response> createHoldings(List<HoldingsRecord> holdings, boolean up
}
return Future.succeededFuture();
})
.compose(ar -> {
if (upsert) {
return upsertHoldings(holdings, optimisticLocking);
} else {
return hridManager.populateHridForHoldings(holdings)
.compose(NotesValidators::refuseHoldingLongNotes)
.compose(result -> buildBatchOperationContext(upsert, holdings,
holdingsRepository, HoldingsRecord::getId))
.compose(batchOperation -> postSync(HOLDINGS_RECORD_TABLE, holdings, MAX_ENTITIES,
upsert, optimisticLocking, okapiHeaders, vertxContext, PostHoldingsStorageBatchSynchronousResponse.class)
.onSuccess(domainEventPublisher.publishCreatedOrUpdated(batchOperation)));
}
});
}

public Future<Response> upsertHoldings(List<HoldingsRecord> holdings, boolean optimisticLocking) {
try {
if (optimisticLocking) {
OptimisticLockingUtil.unsetVersionIfMinusOne(holdings);
} else {
if (! OptimisticLockingUtil.isSuppressingOptimisticLockingAllowed()) {
return Future.succeededFuture(Response.status(413).entity(
"DB_ALLOW_SUPPRESS_OPTIMISTIC_LOCKING environment variable doesn't allow "
+ "to disable optimistic locking").build());
}
OptimisticLockingUtil.setVersionToMinusOne(holdings);
}

return NotesValidators.refuseHoldingLongNotes(holdings)
.compose(x -> holdingsRepository.upsert(holdings))
.onSuccess(this::publishEvents)
.map(Response.status(201).build())
.otherwise(EndpointFailureHandler::failureResponse);
} catch (ReflectiveOperationException e) {
throw new SecurityException(e);
}
.compose(ar -> hridManager.populateHridForHoldings(holdings)
.compose(NotesValidators::refuseHoldingLongNotes)
.compose(result -> buildBatchOperationContext(upsert, holdings,
holdingsRepository, HoldingsRecord::getId))
.compose(batchOperation -> postSync(HOLDINGS_RECORD_TABLE, holdings, MAX_ENTITIES,
upsert, optimisticLocking, okapiHeaders, vertxContext, PostHoldingsStorageBatchSynchronousResponse.class)
.onSuccess(domainEventPublisher.publishCreatedOrUpdated(batchOperation))));
}

private Future<Response> updateHolding(HoldingsRecord newHoldings) {
private Future<Response> updateHolding(HoldingsRecord oldHoldings, HoldingsRecord newHoldings) {
newHoldings.setEffectiveLocationId(calculateEffectiveLocation(newHoldings));

if (Integer.valueOf(-1).equals(newHoldings.getVersion())) {
newHoldings.setVersion(null); // enforce optimistic locking
}

return NotesValidators.refuseLongNotes(newHoldings)
.compose(x -> holdingsRepository.upsert(List.of(newHoldings)))
.onSuccess(this::publishEvents)
.map(x -> PutHoldingsStorageHoldingsByHoldingsRecordIdResponse.respond204());
}
return refuseWhenHridChanged(oldHoldings, newHoldings)
.compose(notUsed -> NotesValidators.refuseLongNotes(newHoldings))
.compose(notUsed -> {
final Promise<List<Item>> overallResult = promise();

postgresClient.startTx(
connection -> holdingsRepository.update(connection, oldHoldings.getId(), newHoldings)
.compose(updateRes -> itemService.updateItemsOnHoldingChanged(connection, newHoldings))
.onComplete(handleTransaction(connection, overallResult)));

private void publishEvents(JsonObject holdingsItems) {
Map<String, String> instanceIdByHoldingsRecordId = new HashMap<>();
holdingsItems.getJsonArray("holdingsRecords").forEach(o -> {
var oldHolding = ((JsonObject) o).getJsonObject("old");
var newHolding = ((JsonObject) o).getJsonObject("new");
var instanceId = newHolding.getString(INSTANCE_ID);
var holdingId = newHolding.getString("id");
instanceIdByHoldingsRecordId.put(holdingId, instanceId);
domainEventPublisher.publishUpserted(instanceId, oldHolding, newHolding);
});
holdingsItems.getJsonArray("items").forEach(o -> {
var oldItem = ((JsonObject) o).getJsonObject("old");
var newItem = ((JsonObject) o).getJsonObject("new");
var instanceId = instanceIdByHoldingsRecordId.get(newItem.getString("holdingsRecordId"));
oldItem.put(INSTANCE_ID, instanceId);
newItem.put(INSTANCE_ID, instanceId);
itemEventService.publishUpserted(instanceId, oldItem, newItem);
});
return overallResult.future()
.compose(itemsBeforeUpdate -> itemEventService.publishUpdated(oldHoldings, newHoldings, itemsBeforeUpdate))
.<Response>map(res -> PutHoldingsStorageHoldingsByHoldingsRecordIdResponse.respond204())
.onSuccess(domainEventPublisher.publishUpdated(oldHoldings));
});
}

private String calculateEffectiveLocation(HoldingsRecord holdingsRecord) {
Expand All @@ -241,6 +206,31 @@ private String calculateEffectiveLocation(HoldingsRecord holdingsRecord) {
}
}

private <T> Handler<AsyncResult<T>> handleTransaction(
AsyncResult<SQLConnection> connection, Promise<T> overallResult) {

return transactionResult -> {
if (transactionResult.succeeded()) {
postgresClient.endTx(connection, commitResult -> {
if (commitResult.succeeded()) {
overallResult.complete(transactionResult.result());
} else {
log.error("Unable to commit transaction", commitResult.cause());
overallResult.fail(commitResult.cause());
}
});
} else {
log.error("Reverting transaction");
postgresClient.rollbackTx(connection, revertResult -> {
if (revertResult.failed()) {
log.error("Unable to revert transaction", revertResult.cause());
}
overallResult.fail(transactionResult.cause());
});
}
};
}

private CompositeFuture createShadowInstancesIfNeeded(List<HoldingsRecord> holdingsRecords,
ConsortiumData consortiumData) {
Map<String, Future<SharingInstance>> instanceFuturesMap = new HashMap<>();
Expand Down

This file was deleted.

Loading

0 comments on commit 2c4296a

Please sign in to comment.