Skip to content

Commit

Permalink
Merge branch 'master' into MODINVSTOR-1086
Browse files Browse the repository at this point in the history
  • Loading branch information
andrei-razumnov committed Sep 14, 2023
2 parents 0492667 + e4bddfb commit d70939e
Show file tree
Hide file tree
Showing 19 changed files with 990 additions and 81 deletions.
6 changes: 5 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
</dependencyManagement>

<dependencies>
<!-- https://mvnrepository.com/artifact/org.marc4j/marc4j -->
<dependency>
<groupId>org.marc4j</groupId>
<artifactId>marc4j</artifactId>
Expand Down Expand Up @@ -186,6 +185,11 @@
<artifactId>vertx-unit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion ramls/holdingsrecord.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
},
"hrid": {
"type": "string",
"description": "the human readable ID, also called eye readable ID. A system-assigned sequential ID which maps to the Instance ID"
"description": "the human readable ID, also called eye readable ID. Unique. On creation it can be provided, otherwise the system assigns the next sequential ID using the settings of the /hrid-settings-storage/hrid-settings API. An update with a different hrid is rejected. An update without hrid is populated with the holdings record's current hrid."
},
"holdingsTypeId": {
"type": "string",
Expand Down
2 changes: 1 addition & 1 deletion ramls/item.json
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@
},
"holdingsRecord2": {
"type": "object",
"description": "Item associated holdings record object.",
"description": "Fake property for mod-graphql to determine record relationships.",
"folio:$ref": "holdingsrecord.json",
"readonly": true,
"folio:isVirtual": true,
Expand Down
26 changes: 26 additions & 0 deletions src/main/java/org/folio/persist/HoldingsRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@

import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.Tuple;
import java.util.List;
import java.util.Map;
import org.folio.cql2pgjson.CQL2PgJSON;
import org.folio.dbschema.ObjectMapperTool;
import org.folio.rest.jaxrs.model.HoldingsRecord;
import org.folio.rest.persist.cql.CQLWrapper;

Expand All @@ -17,6 +21,28 @@ public HoldingsRepository(Context context, Map<String, String> okapiHeaders) {
super(postgresClient(context, okapiHeaders), HOLDINGS_RECORD_TABLE, HoldingsRecord.class);
}

/**
* Upsert holdings records.
*
* <p>Returns
* { "holdingsRecords": [{"old": {...}, "new": {...}}, ...],
* "items": [{"old": {...}, "new": {...}}, ...]
* }
* providing old and new jsonb content of all updated holdings and items. For a newly
* inserted holding only "new" is provided.
*
* @param holdingsRecords records to insert or update (upsert)
*/
public Future<JsonObject> upsert(List<HoldingsRecord> holdingsRecords) {
try {
var array = ObjectMapperTool.getMapper().writeValueAsString(holdingsRecords);
return postgresClient.selectSingle("SELECT upsert_holdings($1::text::jsonb)", Tuple.of(array))
.map(row -> row.getJsonObject(0));
} catch (Exception e) {
return Future.failedFuture(e);
}
}

/**
* Delete by CQL. For each deleted record return a {@link Row} with the instance id String
* and with the holdings' jsonb String.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.folio.rest.exceptions.NotFoundException;
import org.folio.rest.exceptions.ValidationException;
import org.folio.rest.jaxrs.model.Errors;
import org.folio.rest.persist.PgExceptionFacade;
import org.folio.rest.persist.PgExceptionUtil;
import org.folio.rest.persist.cql.CQLQueryValidationException;
import org.folio.rest.tools.client.exceptions.ResponseException;
Expand Down Expand Up @@ -68,6 +69,10 @@ public static Response failureResponse(Throwable error) {
} else if (error instanceof ResponseException) {
return ((ResponseException) error).getResponse();
}
var sqlState = new PgExceptionFacade(error).getSqlState();
if ("239HR".equals(sqlState)) { // Cannot change hrid of holdings record
return textPlainResponse(400, error);
}
String message = PgExceptionUtil.badRequestMessage(error);
if (message != null) {
return textPlainResponse(400, message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import java.util.Collection;
import java.util.List;
import javax.ws.rs.core.Response;
Expand Down Expand Up @@ -92,6 +93,13 @@ public Future<Void> publishAllRemoved() {
return domainEventService.publishAllRecordsRemoved();
}

public Future<Void> publishUpserted(String instanceId, JsonObject oldRecord, JsonObject newRecord) {
if (oldRecord == null) {
return domainEventService.publishRecordCreated(instanceId, newRecord.encode());
}
return domainEventService.publishRecordUpdated(instanceId, oldRecord.encode(), newRecord.encode());
}

public Handler<Response> publishUpdated(D oldRecord) {
return response -> {
if (!isUpdateSuccessResponse(response)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ Future<Void> publishRecordUpdated(String instanceId, T oldRecord, T newRecord) {
return publish(instanceId, domainEvent);
}

Future<Void> publishRecordUpdated(String instanceId, String oldRecord, String newRecord) {
var domainEvent = DomainEventRaw.updateEvent(oldRecord, newRecord, tenantId(okapiHeaders));
return publish(instanceId, domainEvent);
}

Future<Void> publishRecordsUpdated(Collection<Triple<String, T, T>> updatedRecords) {
if (updatedRecords.isEmpty()) {
return succeededFuture();
Expand All @@ -133,6 +138,11 @@ Future<Void> publishRecordCreated(String instanceId, T newRecord) {
return publish(instanceId, domainEvent);
}

Future<Void> publishRecordCreated(String id, String newRecord) {
var domainEvent = DomainEventRaw.createEvent(newRecord, tenantId(okapiHeaders));
return publish(id, domainEvent);
}

Future<Void> publishRecordsCreated(List<Pair<String, T>> records) {
if (records.isEmpty()) {
return succeededFuture();
Expand Down
115 changes: 62 additions & 53 deletions src/main/java/org/folio/services/holding/HoldingsService.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.folio.services.holding;

import static io.vertx.core.Promise.promise;
import static org.apache.logging.log4j.LogManager.getLogger;
import static org.folio.rest.impl.HoldingsStorageApi.HOLDINGS_RECORD_TABLE;
import static org.folio.rest.impl.StorageHelper.MAX_ENTITIES;
import static org.folio.rest.jaxrs.resource.HoldingsStorage.DeleteHoldingsStorageHoldingsByHoldingsRecordIdResponse;
Expand All @@ -14,39 +13,34 @@
import static org.folio.rest.persist.PgUtil.postSync;
import static org.folio.rest.persist.PgUtil.postgresClient;
import static org.folio.services.batch.BatchOperationContextFactory.buildBatchOperationContext;
import static org.folio.validator.HridValidators.refuseWhenHridChanged;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
import org.folio.persist.HoldingsRepository;
import org.folio.rest.jaxrs.model.HoldingsRecord;
import org.folio.rest.jaxrs.model.Item;
import org.folio.rest.persist.PostgresClient;
import org.folio.rest.persist.SQLConnection;
import org.folio.rest.support.CqlQuery;
import org.folio.rest.support.EndpointFailureHandler;
import org.folio.rest.support.HridManager;
import org.folio.rest.tools.utils.OptimisticLockingUtil;
import org.folio.services.domainevent.HoldingDomainEventPublisher;
import org.folio.services.domainevent.ItemDomainEventPublisher;
import org.folio.services.item.ItemService;
import org.folio.validator.CommonValidators;
import org.folio.validator.NotesValidators;

public class HoldingsService {
private static final Logger log = getLogger(HoldingsService.class);

private static final String INSTANCE_ID = "instanceId";
private final Context vertxContext;
private final Map<String, String> okapiHeaders;
private final PostgresClient postgresClient;
private final HridManager hridManager;
private final ItemService itemService;
private final HoldingsRepository holdingsRepository;
private final ItemDomainEventPublisher itemEventService;
private final HoldingDomainEventPublisher domainEventPublisher;
Expand All @@ -55,7 +49,6 @@ public HoldingsService(Context context, Map<String, String> okapiHeaders) {
this.vertxContext = context;
this.okapiHeaders = okapiHeaders;

itemService = new ItemService(context, okapiHeaders);
postgresClient = postgresClient(context, okapiHeaders);
hridManager = new HridManager(postgresClient);
holdingsRepository = new HoldingsRepository(context, okapiHeaders);
Expand All @@ -77,7 +70,7 @@ public Future<Response> updateHoldingRecord(String holdingId, HoldingsRecord hol
return holdingsRepository.getById(holdingId)
.compose(existingHoldingsRecord -> {
if (holdingsRecordFound(existingHoldingsRecord)) {
return updateHolding(existingHoldingsRecord, holdingsRecord);
return updateHolding(holdingsRecord);
} else {
return createHolding(holdingsRecord);
}
Expand Down Expand Up @@ -134,6 +127,10 @@ public Future<Response> deleteHoldings(String cql) {
}

public Future<Response> createHoldings(List<HoldingsRecord> holdings, boolean upsert, boolean optimisticLocking) {
if (upsert) {
return upsertHoldings(holdings, optimisticLocking);
}

for (HoldingsRecord holdingsRecord : holdings) {
holdingsRecord.setEffectiveLocationId(calculateEffectiveLocation(holdingsRecord));
}
Expand All @@ -147,28 +144,65 @@ public Future<Response> createHoldings(List<HoldingsRecord> holdings, boolean up
.onSuccess(domainEventPublisher.publishCreatedOrUpdated(batchOperation)));
}

private Future<Response> updateHolding(HoldingsRecord oldHoldings, HoldingsRecord newHoldings) {

public Future<Response> upsertHoldings(List<HoldingsRecord> holdings, boolean optimisticLocking) {
try {
for (HoldingsRecord holdingsRecord : holdings) {
holdingsRecord.setEffectiveLocationId(calculateEffectiveLocation(holdingsRecord));
}

if (optimisticLocking) {
OptimisticLockingUtil.unsetVersionIfMinusOne(holdings);
} else {
if (! OptimisticLockingUtil.isSuppressingOptimisticLockingAllowed()) {
return Future.succeededFuture(Response.status(413).entity(
"DB_ALLOW_SUPPRESS_OPTIMISTIC_LOCKING environment variable doesn't allow "
+ "to disable optimistic locking").build());
}
OptimisticLockingUtil.setVersionToMinusOne(holdings);
}

return NotesValidators.refuseHoldingLongNotes(holdings)
.compose(x -> holdingsRepository.upsert(holdings))
.onSuccess(this::publishEvents)
.map(Response.status(201).build())
.otherwise(EndpointFailureHandler::failureResponse);
} catch (ReflectiveOperationException e) {
throw new SecurityException(e);
}
}

private Future<Response> updateHolding(HoldingsRecord newHoldings) {
newHoldings.setEffectiveLocationId(calculateEffectiveLocation(newHoldings));

if (Integer.valueOf(-1).equals(newHoldings.getVersion())) {
newHoldings.setVersion(null); // enforce optimistic locking
}

return refuseWhenHridChanged(oldHoldings, newHoldings)
.compose(notUsed -> NotesValidators.refuseLongNotes(newHoldings))
.compose(notUsed -> {
final Promise<List<Item>> overallResult = promise();

postgresClient.startTx(
connection -> holdingsRepository.update(connection, oldHoldings.getId(), newHoldings)
.compose(updateRes -> itemService.updateItemsOnHoldingChanged(connection, newHoldings))
.onComplete(handleTransaction(connection, overallResult)));
return NotesValidators.refuseLongNotes(newHoldings)
.compose(x -> holdingsRepository.upsert(List.of(newHoldings)))
.onSuccess(this::publishEvents)
.map(x -> PutHoldingsStorageHoldingsByHoldingsRecordIdResponse.respond204());
}

return overallResult.future()
.compose(itemsBeforeUpdate -> itemEventService.publishUpdated(oldHoldings, newHoldings, itemsBeforeUpdate))
.<Response>map(res -> PutHoldingsStorageHoldingsByHoldingsRecordIdResponse.respond204())
.onSuccess(domainEventPublisher.publishUpdated(oldHoldings));
});
private void publishEvents(JsonObject holdingsItems) {
Map<String, String> instanceIdByHoldingsRecordId = new HashMap<>();
holdingsItems.getJsonArray("holdingsRecords").forEach(o -> {
var oldHolding = ((JsonObject) o).getJsonObject("old");
var newHolding = ((JsonObject) o).getJsonObject("new");
var instanceId = newHolding.getString(INSTANCE_ID);
var holdingId = newHolding.getString("id");
instanceIdByHoldingsRecordId.put(holdingId, instanceId);
domainEventPublisher.publishUpserted(instanceId, oldHolding, newHolding);
});
holdingsItems.getJsonArray("items").forEach(o -> {
var oldItem = ((JsonObject) o).getJsonObject("old");
var newItem = ((JsonObject) o).getJsonObject("new");
var instanceId = instanceIdByHoldingsRecordId.get(newItem.getString("holdingsRecordId"));
oldItem.put(INSTANCE_ID, instanceId);
newItem.put(INSTANCE_ID, instanceId);
itemEventService.publishUpserted(instanceId, oldItem, newItem);
});
}

private String calculateEffectiveLocation(HoldingsRecord holdingsRecord) {
Expand All @@ -182,31 +216,6 @@ private String calculateEffectiveLocation(HoldingsRecord holdingsRecord) {
}
}

private <T> Handler<AsyncResult<T>> handleTransaction(
AsyncResult<SQLConnection> connection, Promise<T> overallResult) {

return transactionResult -> {
if (transactionResult.succeeded()) {
postgresClient.endTx(connection, commitResult -> {
if (commitResult.succeeded()) {
overallResult.complete(transactionResult.result());
} else {
log.error("Unable to commit transaction", commitResult.cause());
overallResult.fail(commitResult.cause());
}
});
} else {
log.error("Reverting transaction");
postgresClient.rollbackTx(connection, revertResult -> {
if (revertResult.failed()) {
log.error("Unable to revert transaction", revertResult.cause());
}
overallResult.fail(transactionResult.cause());
});
}
};
}

private boolean holdingsRecordFound(HoldingsRecord holdingsRecord) {
return holdingsRecord != null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
-- Fill in the next hrid from the sequence when the instance/holding/item
-- doesn't have a hrid.
CREATE OR REPLACE FUNCTION hrid_trigger() RETURNS TRIGGER
AS $$
DECLARE
name TEXT;
hrid TEXT;
prefix TEXT;
zeroes BOOLEAN;
BEGIN
IF NEW.jsonb->'hrid' IS NOT NULL THEN
RETURN NEW;
END IF;
name = CASE TG_TABLE_NAME
WHEN 'instance' THEN 'instances'
WHEN 'holdings_record' THEN 'holdings'
WHEN 'item' THEN 'items'
END;
SELECT nextval('hrid_' || name || '_seq'), jsonb->name->>'prefix', jsonb->>'commonRetainLeadingZeroes'
INTO STRICT hrid, prefix, zeroes FROM hrid_settings;
IF zeroes IS TRUE THEN
hrid = repeat('0', 11 - length(hrid)) || hrid;
END IF;
NEW.jsonb = jsonb_set(NEW.jsonb, '{hrid}', to_jsonb(concat(prefix, hrid)));
RETURN NEW;
END;
$$ language 'plpgsql';

-- currently only the holding hrid has a trigger; instance and item hrid still use Java code

DROP TRIGGER IF EXISTS hrid_holdings_record ON holdings_record CASCADE;
CREATE TRIGGER hrid_holdings_record BEFORE INSERT ON holdings_record FOR EACH ROW EXECUTE FUNCTION hrid_trigger();
Loading

0 comments on commit d70939e

Please sign in to comment.