Skip to content

Commit

Permalink
MODINVSTOR-941: Batch holdings update updates items
Browse files Browse the repository at this point in the history
Closes https://issues.folio.org/browse/MODINVSTOR-941
"Location changes in holdings records via batch-synchronous APIs not triggering update to item effective location"
Partly addresses https://issues.folio.org/browse/MODINVSTOR-870
"POST/PUT instance/holding/item performance"

* Update items when doing a bulk holdings updates. Item properties to update:
  * effective shelving order
  * effective call number components
  * effective location id
* Database trigger fills holdings HRID if needed
* No performance regression
* Performance improvements

Move all Java code needed to update affected items into SQL functions.
Otherwise Java code must fetch and check item records that requires
round trips from Java to DB and back to Java and database locks for
consistency resulting in an unacceptable performance regression.
Performance of this bulk holdings update API is a critical requirement for
several institutions.

Move Java code that fetches the next holdings HRID from database into a
database trigger. This avoids a holdings record query and an HRID query
in Java.

The code changes are minimal so that is can easily been back-ported to Orchid
and released as a critical service patch (CSP).

Separate pull requests will remove all Java code that is duplicated in
SQL functions; they will not be back-ported as CSPs:
https://issues.folio.org/browse/MODINVSTOR-1090
https://issues.folio.org/browse/MODINVSTOR-1091

- [x] Adjust existing tests and add new ones for the the new implementation
  • Loading branch information
julianladisch committed Aug 2, 2023
1 parent 9b7b8fc commit e47e432
Show file tree
Hide file tree
Showing 17 changed files with 967 additions and 79 deletions.
6 changes: 5 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
</dependencyManagement>

<dependencies>
<!-- https://mvnrepository.com/artifact/org.marc4j/marc4j -->
<dependency>
<groupId>org.marc4j</groupId>
<artifactId>marc4j</artifactId>
Expand Down Expand Up @@ -181,6 +180,11 @@
<artifactId>vertx-unit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion ramls/holdingsrecord.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
},
"hrid": {
"type": "string",
"description": "the human readable ID, also called eye readable ID. A system-assigned sequential ID which maps to the Instance ID"
"description": "the human readable ID, also called eye readable ID. Unique. On creation it can be provided, otherwise the system assigns the next sequential ID using the settings of the /hrid-settings-storage/hrid-settings API. An update with a different hrid is rejected. An update without hrid is populated with the holdings record's current hrid."
},
"holdingsTypeId": {
"type": "string",
Expand Down
48 changes: 48 additions & 0 deletions src/main/java/org/folio/persist/HoldingsRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@

import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.Tuple;
import java.util.List;
import java.util.Map;
import org.folio.cql2pgjson.CQL2PgJSON;
import org.folio.dbschema.ObjectMapperTool;
import org.folio.rest.jaxrs.model.HoldingsRecord;
import org.folio.rest.persist.cql.CQLWrapper;

Expand All @@ -17,6 +21,50 @@ public HoldingsRepository(Context context, Map<String, String> okapiHeaders) {
super(postgresClient(context, okapiHeaders), HOLDINGS_RECORD_TABLE, HoldingsRecord.class);
}

/**
* Upsert a holdings record.
*
* <p>Returns
* { "holdingsRecords": [{"old": {...}, "new": {...}}, ...],
* "items": [{"old": {...}, "new": {...}}, ...]
* }
* providing old and new jsonb content of all updated holdings and items. For a newly
* inserted holding only "new" is provided.
*
* @param holdingsRecord record to insert or update (upsert)
*/
public Future<JsonObject> getAndUpsert(HoldingsRecord holdingsRecord) {
try {
var array = "[" + ObjectMapperTool.getMapper().writeValueAsString(holdingsRecord) + "]";
return postgresClient.selectSingle("SELECT upsert_holdings($1::text::jsonb)", Tuple.of(array))
.map(row -> row.getJsonObject(0));
} catch (Exception e) {
return Future.failedFuture(e);
}
}

/**
* Upsert holdings records.
*
* <p>Returns
* { "holdingsRecords": [{"old": {...}, "new": {...}}, ...],
* "items": [{"old": {...}, "new": {...}}, ...]
* }
* providing old and new jsonb content of all updated holdings and items. For a newly
* inserted holding only "new" is provided.
*
* @param holdingsRecords records to insert or update (upsert)
*/
public Future<JsonObject> upsert(List<HoldingsRecord> holdingsRecords) {
try {
var array = ObjectMapperTool.getMapper().writeValueAsString(holdingsRecords);
return postgresClient.selectSingle("SELECT upsert_holdings($1::text::jsonb)", Tuple.of(array))
.map(row -> row.getJsonObject(0));
} catch (Exception e) {
return Future.failedFuture(e);
}
}

/**
* Delete by CQL. For each deleted record return a {@link Row} with the instance id String
* and with the holdings' jsonb String.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.folio.rest.exceptions.NotFoundException;
import org.folio.rest.exceptions.ValidationException;
import org.folio.rest.jaxrs.model.Errors;
import org.folio.rest.persist.PgExceptionFacade;
import org.folio.rest.persist.PgExceptionUtil;
import org.folio.rest.persist.cql.CQLQueryValidationException;
import org.folio.rest.tools.client.exceptions.ResponseException;
Expand Down Expand Up @@ -68,6 +69,13 @@ public static Response failureResponse(Throwable error) {
} else if (error instanceof ResponseException) {
return ((ResponseException) error).getResponse();
}
var sqlState = new PgExceptionFacade(error).getSqlState();
switch (sqlState) {
case "239HR": // Cannot change hrid of holdings record
return textPlainResponse(400, error);
default:
// continue
}
String message = PgExceptionUtil.badRequestMessage(error);
if (message != null) {
return textPlainResponse(400, message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import java.util.Collection;
import java.util.List;
import javax.ws.rs.core.Response;
Expand Down Expand Up @@ -92,6 +93,13 @@ public Future<Void> publishAllRemoved() {
return domainEventService.publishAllRecordsRemoved();
}

public Future<Void> publishUpserted(String instanceId, JsonObject oldRecord, JsonObject newRecord) {
if (oldRecord == null) {
return domainEventService.publishRecordCreated(instanceId, newRecord.encode());
}
return domainEventService.publishRecordUpdated(instanceId, oldRecord.encode(), newRecord.encode());
}

public Handler<Response> publishUpdated(D oldRecord) {
return response -> {
if (!isUpdateSuccessResponse(response)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ Future<Void> publishRecordUpdated(String instanceId, T oldRecord, T newRecord) {
return publish(instanceId, domainEvent);
}

Future<Void> publishRecordUpdated(String instanceId, String oldRecord, String newRecord) {
var domainEvent = DomainEventRaw.updateEvent(oldRecord, newRecord, tenantId(okapiHeaders));
return publish(instanceId, domainEvent);
}

Future<Void> publishRecordsUpdated(Collection<Triple<String, T, T>> updatedRecords) {
if (updatedRecords.isEmpty()) {
return succeededFuture();
Expand All @@ -133,6 +138,11 @@ Future<Void> publishRecordCreated(String instanceId, T newRecord) {
return publish(instanceId, domainEvent);
}

Future<Void> publishRecordCreated(String id, String newRecord) {
var domainEvent = DomainEventRaw.createEvent(newRecord, tenantId(okapiHeaders));
return publish(id, domainEvent);
}

Future<Void> publishRecordsCreated(List<Pair<String, T>> records) {
if (records.isEmpty()) {
return succeededFuture();
Expand Down
110 changes: 59 additions & 51 deletions src/main/java/org/folio/services/holding/HoldingsService.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.folio.services.holding;

import static io.vertx.core.Promise.promise;
import static org.apache.logging.log4j.LogManager.getLogger;
import static org.folio.rest.impl.HoldingsStorageApi.HOLDINGS_RECORD_TABLE;
import static org.folio.rest.impl.StorageHelper.MAX_ENTITIES;
import static org.folio.rest.jaxrs.resource.HoldingsStorage.DeleteHoldingsStorageHoldingsByHoldingsRecordIdResponse;
Expand All @@ -14,39 +13,33 @@
import static org.folio.rest.persist.PgUtil.postSync;
import static org.folio.rest.persist.PgUtil.postgresClient;
import static org.folio.services.batch.BatchOperationContextFactory.buildBatchOperationContext;
import static org.folio.validator.HridValidators.refuseWhenHridChanged;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
import org.folio.persist.HoldingsRepository;
import org.folio.rest.jaxrs.model.HoldingsRecord;
import org.folio.rest.jaxrs.model.Item;
import org.folio.rest.persist.PostgresClient;
import org.folio.rest.persist.SQLConnection;
import org.folio.rest.support.CqlQuery;
import org.folio.rest.support.EndpointFailureHandler;
import org.folio.rest.support.HridManager;
import org.folio.rest.tools.utils.OptimisticLockingUtil;
import org.folio.services.domainevent.HoldingDomainEventPublisher;
import org.folio.services.domainevent.ItemDomainEventPublisher;
import org.folio.services.item.ItemService;
import org.folio.validator.CommonValidators;
import org.folio.validator.NotesValidators;

public class HoldingsService {
private static final Logger log = getLogger(HoldingsService.class);

private final Context vertxContext;
private final Map<String, String> okapiHeaders;
private final PostgresClient postgresClient;
private final HridManager hridManager;
private final ItemService itemService;
private final HoldingsRepository holdingsRepository;
private final ItemDomainEventPublisher itemEventService;
private final HoldingDomainEventPublisher domainEventPublisher;
Expand All @@ -55,7 +48,6 @@ public HoldingsService(Context context, Map<String, String> okapiHeaders) {
this.vertxContext = context;
this.okapiHeaders = okapiHeaders;

itemService = new ItemService(context, okapiHeaders);
postgresClient = postgresClient(context, okapiHeaders);
hridManager = new HridManager(postgresClient);
holdingsRepository = new HoldingsRepository(context, okapiHeaders);
Expand Down Expand Up @@ -134,6 +126,10 @@ public Future<Response> deleteHoldings(String cql) {
}

public Future<Response> createHoldings(List<HoldingsRecord> holdings, boolean upsert, boolean optimisticLocking) {
if (upsert) {
return upsertHoldings(holdings, optimisticLocking);
}

for (HoldingsRecord holdingsRecord : holdings) {
holdingsRecord.setEffectiveLocationId(calculateEffectiveLocation(holdingsRecord));
}
Expand All @@ -147,28 +143,65 @@ public Future<Response> createHoldings(List<HoldingsRecord> holdings, boolean up
.onSuccess(domainEventPublisher.publishCreatedOrUpdated(batchOperation)));
}


public Future<Response> upsertHoldings(List<HoldingsRecord> holdings, boolean optimisticLocking) {
try {
for (HoldingsRecord holdingsRecord : holdings) {
holdingsRecord.setEffectiveLocationId(calculateEffectiveLocation(holdingsRecord));
}

if (optimisticLocking) {
OptimisticLockingUtil.unsetVersionIfMinusOne(holdings);
} else {
if (! OptimisticLockingUtil.isSuppressingOptimisticLockingAllowed()) {
return Future.succeededFuture(Response.status(413).entity(
"DB_ALLOW_SUPPRESS_OPTIMISTIC_LOCKING environment variable doesn't allow "
+ "to disable optimistic locking").build());
}
OptimisticLockingUtil.setVersionToMinusOne(holdings);
}

return NotesValidators.refuseHoldingLongNotes(holdings)
.compose(x -> holdingsRepository.upsert(holdings))
.onSuccess(this::publishEvents)
.map(Response.status(201).build())
.otherwise(EndpointFailureHandler::failureResponse);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}

private Future<Response> updateHolding(HoldingsRecord oldHoldings, HoldingsRecord newHoldings) {
newHoldings.setEffectiveLocationId(calculateEffectiveLocation(newHoldings));

if (Integer.valueOf(-1).equals(newHoldings.getVersion())) {
newHoldings.setVersion(null); // enforce optimistic locking
}

return refuseWhenHridChanged(oldHoldings, newHoldings)
.compose(notUsed -> NotesValidators.refuseLongNotes(newHoldings))
.compose(notUsed -> {
final Promise<List<Item>> overallResult = promise();

postgresClient.startTx(
connection -> holdingsRepository.update(connection, oldHoldings.getId(), newHoldings)
.compose(updateRes -> itemService.updateItemsOnHoldingChanged(connection, newHoldings))
.onComplete(handleTransaction(connection, overallResult)));
return NotesValidators.refuseLongNotes(newHoldings)
.compose(x -> holdingsRepository.getAndUpsert(newHoldings))
.onSuccess(this::publishEvents)
.map(x -> PutHoldingsStorageHoldingsByHoldingsRecordIdResponse.respond204());
}

return overallResult.future()
.compose(itemsBeforeUpdate -> itemEventService.publishUpdated(oldHoldings, newHoldings, itemsBeforeUpdate))
.<Response>map(res -> PutHoldingsStorageHoldingsByHoldingsRecordIdResponse.respond204())
.onSuccess(domainEventPublisher.publishUpdated(oldHoldings));
});
private void publishEvents(JsonObject holdingsItems) {
Map<String, String> instanceIdByHoldingsRecordId = new HashMap<>();
holdingsItems.getJsonArray("holdingsRecords").forEach(o -> {
var oldHolding = ((JsonObject) o).getJsonObject("old");
var newHolding = ((JsonObject) o).getJsonObject("new");
var instanceId = newHolding.getString("instanceId");
var holdingId = newHolding.getString("id");
instanceIdByHoldingsRecordId.put(holdingId, instanceId);
domainEventPublisher.publishUpserted(instanceId, oldHolding, newHolding);
});
holdingsItems.getJsonArray("items").forEach(o -> {
var oldItem = ((JsonObject) o).getJsonObject("old");
var newItem = ((JsonObject) o).getJsonObject("new");
var instanceId = instanceIdByHoldingsRecordId.get(newItem.getString("holdingsRecordId"));
oldItem.put("instanceId", instanceId);
newItem.put("instanceId", instanceId);
itemEventService.publishUpserted(instanceId, oldItem, newItem);
});
}

private String calculateEffectiveLocation(HoldingsRecord holdingsRecord) {
Expand All @@ -182,31 +215,6 @@ private String calculateEffectiveLocation(HoldingsRecord holdingsRecord) {
}
}

private <T> Handler<AsyncResult<T>> handleTransaction(
AsyncResult<SQLConnection> connection, Promise<T> overallResult) {

return transactionResult -> {
if (transactionResult.succeeded()) {
postgresClient.endTx(connection, commitResult -> {
if (commitResult.succeeded()) {
overallResult.complete(transactionResult.result());
} else {
log.error("Unable to commit transaction", commitResult.cause());
overallResult.fail(commitResult.cause());
}
});
} else {
log.error("Reverting transaction");
postgresClient.rollbackTx(connection, revertResult -> {
if (revertResult.failed()) {
log.error("Unable to revert transaction", revertResult.cause());
}
overallResult.fail(transactionResult.cause());
});
}
};
}

private boolean holdingsRecordFound(HoldingsRecord holdingsRecord) {
return holdingsRecord != null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
-- Fill in the next hrid from the sequence when the instance/holding/item
-- doesn't have a hrid.
CREATE OR REPLACE FUNCTION hrid_trigger() RETURNS TRIGGER
AS $$
DECLARE
name TEXT;
hrid TEXT;
prefix TEXT;
zeroes BOOLEAN;
BEGIN
IF NEW.jsonb->'hrid' IS NOT NULL THEN
RETURN NEW;
END IF;
name = CASE TG_TABLE_NAME
WHEN 'instance' THEN 'instances'
WHEN 'holdings_record' THEN 'holdings'
WHEN 'item' THEN 'items'
END;
SELECT nextval('hrid_' || name || '_seq'), jsonb->name->>'prefix', jsonb->>'commonRetainLeadingZeroes'
INTO STRICT hrid, prefix, zeroes FROM hrid_settings;
IF zeroes IS TRUE THEN
hrid = repeat('0', 11 - length(hrid)) || hrid;
END IF;
NEW.jsonb = jsonb_set(NEW.jsonb, '{hrid}', to_jsonb(concat(prefix, hrid)));
RETURN NEW;
END;
$$ language 'plpgsql';

-- currently only the holding hrid has a trigger; instance and item hrid still use Java code

DROP TRIGGER IF EXISTS hrid_holdings_record ON holdings_record CASCADE;
CREATE TRIGGER hrid_holdings_record BEFORE INSERT ON holdings_record FOR EACH ROW EXECUTE FUNCTION hrid_trigger();
Loading

0 comments on commit e47e432

Please sign in to comment.