Skip to content

Commit

Permalink
feat: Limit jobs executed in parallel eclipse-tractusx#892
Browse files Browse the repository at this point in the history
concept: Efficient orchestration of edc negotations eclipse-tractusx#894
test: Remediation of technical Debts /irs/orders api eclipse-tractusx/sig-release#933
chore: Resolve Null pointer exception while registering the company eclipse-tractusx#888
feat: Build href URL correctly for accessing submodel assets eclipse-tractusx#889
  • Loading branch information
ds-lcapellino committed Jan 14, 2025
1 parent 8533972 commit c01f345
Show file tree
Hide file tree
Showing 42 changed files with 2,349 additions and 509 deletions.
9 changes: 2 additions & 7 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,10 @@ _**For better traceability add the corresponding GitHub issue number in each cha

## [Unreleased]

### Added
- Added api key authentication for edc notification requests

### Changed

- Added the discovery type configurable, with a default value of bpnl in (ConnectorEndpointsService) (#12)

### Removed
- Removed subjectId from AssetAdministrationShellDescriptor object
- Build href URL correctly for accessing submodel assets #889
- Resolve Null pointer exception while registering the company #888

## [5.4.1] - 2024-08-19

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ data:
trustedPort: {{ .Values.service.trustedPort }}
irs:
job:
batch:
threadCount: {{ .Values.job.batch.threadCount }}
scheduled:
threadCount: {{ .Values.job.scheduled.threadCount }}
cached:
threadCount: {{ .Values.job.cached.threadCount }}
apiUrl: {{ tpl (.Values.irsUrl | default "http://localhost") . | quote }}
security:
api:
Expand Down Expand Up @@ -124,6 +131,8 @@ data:
cacheTTL: {{ .Values.edc.discoveryFinderClient.cacheTTL | quote }}
connectorEndpointService:
cacheTTL: {{ .Values.edc.connectorEndpointService.cacheTTL | quote }}
orchestration:
thread-pool-size: {{ .Values.edc.orchestration.threadPoolSize | quote }}
ess:
localBpn: {{ tpl (.Values.bpn | default "") . | quote }}
localEdcEndpoint: {{ tpl (.Values.ess.edc.host | default "") . | quote }}
Expand Down
7 changes: 7 additions & 0 deletions charts/item-relationship-service/values-umbrella.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ resources:
# IRS Configuration #
#####################
irsUrl: http://irs-item-relationship-service:8080
job:
batch:
threadCount: 5
scheduled:
threadCount: 5
cached:
threadCount: 0
bpn: BPNL00000003AZQP
apiKeyAdmin: "password" # <api-key-admin> Admin auth key, Should be changed!
apiKeyRegular: "password" # <api-key-regular> View auth key, Should be changed!
Expand Down
9 changes: 9 additions & 0 deletions charts/item-relationship-service/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ readinessProbe:
# IRS Configuration #
#####################
irsUrl: # "https://<irs-url>"
job:
batch:
threadCount: 5
scheduled:
threadCount: 5
cached:
threadCount: 0
bpn: # BPN for this IRS instance; only users with this BPN are allowed to access the API
apiKeyAdmin: "password" # <api-key-admin> Admin auth key, Should be changed!
apiKeyRegular: "password" # <api-key-regular> View auth key, Should be changed!
Expand Down Expand Up @@ -226,6 +233,8 @@ edc:
cacheTTL: PT24H # Time to live for DiscoveryFinderClient for findDiscoveryEndpoints method cache
connectorEndpointService:
cacheTTL: PT24H # Time to live for ConnectorEndpointService for fetchConnectorEndpoints method cache
orchestration:
threadPoolSize: 5 # Thread pool size for maximum parallel negotiations

ess:
edc:
Expand Down
25 changes: 25 additions & 0 deletions docs/src/uml-diagrams/runtime-view/edc-flow-orchestrated-ddtr.puml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
@startuml
skinparam monochrome true
skinparam shadowing false
skinparam defaultFontName "Architects daughter"


autonumber "<b>[000]"

participant "EdcSubmodelClient" as ESC
participant "NegotiationOrchestrator" as NO
participant "EDCCatalogFacade" as ECF

ESC -> NO: get ContractOffers for type DTR
NO -> ECF: get ContractOffers for type DTR
NO <-- ECF: List<ContractOffer>
ESC <-- NO: List<ContractOffer>

loop for each ContractOffer
ESC -> NO: negotiate EndpointDataReference(ContractOffer)
ESC <-- NO: Future<EndpointDataReference>
end loop

ESC -> ESC: wait for completion of Futures

@enduml
46 changes: 46 additions & 0 deletions docs/src/uml-diagrams/runtime-view/edc-flow-orchestrated.puml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
@startuml
skinparam monochrome true
skinparam shadowing false
skinparam defaultFontName "Architects daughter"


autonumber "<b>[000]"

participant "EdcSubmodelClient" as ESC
participant "NegotiationOrchestrator" as NO
participant "EndpointDataReferenceCacheService" as EDRCache
participant "EDCCatalogFacade" as ECF
participant "ContractNegotiationService" as CNS

ESC -> NO: get ContractOffer for assetId
NO -> ECF: get ContractOffer for assetId
note left
parallel catalog requests
can be limited by facilitating an
ExecutorService with limited threads
end note
NO <-- ECF: ContractOffer
ESC <-- NO: ContractOffer
ESC -> NO: negotiate EndpointDataReference(ContractOffer)

NO -> EDRCache: get EndpointDataReference
alt cache contains valid entry for assetId
NO <-- EDRCache: EndpointDataReference
else no valid entry for assetId in cache
alt ongoing negotiation for asseId
NO -> NO: EndpointDataReference
else no ongoing negotiation for asseId
NO -> CNS: start contract negotiation
note left
parallel negotiations
can be limited by facilitating an
ExecutorService with limited threads
end note
NO <-- CNS: EndpointDataReference
end alt
end alt

ESC <-- NO: Future<EndpointDataReference>
ESC -> ESC: wait for completion of Future

@enduml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@

import java.time.Clock;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import io.github.resilience4j.retry.RetryRegistry;
import io.micrometer.core.aop.TimedAspect;
Expand All @@ -48,16 +52,13 @@
import org.eclipse.tractusx.irs.connector.job.JobStore;
import org.eclipse.tractusx.irs.connector.job.JobTTL;
import org.eclipse.tractusx.irs.data.CxTestDataContainer;
import org.eclipse.tractusx.irs.edc.client.AsyncPollingService;
import org.eclipse.tractusx.irs.edc.client.ContractNegotiationService;
import org.eclipse.tractusx.irs.edc.client.EDCCatalogFacade;
import org.eclipse.tractusx.irs.edc.client.EdcConfiguration;
import org.eclipse.tractusx.irs.edc.client.EdcDataPlaneClient;
import org.eclipse.tractusx.irs.edc.client.EdcOrchestrator;
import org.eclipse.tractusx.irs.edc.client.EdcSubmodelClient;
import org.eclipse.tractusx.irs.edc.client.EdcSubmodelClientImpl;
import org.eclipse.tractusx.irs.edc.client.EdcSubmodelClientLocalStub;
import org.eclipse.tractusx.irs.edc.client.EdcSubmodelFacade;
import org.eclipse.tractusx.irs.edc.client.cache.endpointdatareference.EndpointDataReferenceCacheService;
import org.eclipse.tractusx.irs.registryclient.DigitalTwinRegistryService;
import org.eclipse.tractusx.irs.registryclient.central.DigitalTwinRegistryClient;
import org.eclipse.tractusx.irs.registryclient.central.DigitalTwinRegistryClientLocalStub;
Expand All @@ -82,7 +83,6 @@
})
public class JobConfiguration {
public static final String JOB_BLOB_PERSISTENCE = "JobPersistence";
public static final int EXECUTOR_CORE_POOL_SIZE = 5;
private static final Integer EXPIRE_AFTER_DAYS = 7;

@Bean
Expand All @@ -94,12 +94,16 @@ public OutboundMeterRegistryService outboundMeterRegistryService(final MeterRegi
@Bean
public JobOrchestrator<ItemDataRequest, AASTransferProcess> jobOrchestrator(
final DigitalTwinDelegate digitalTwinDelegate,
@Qualifier(JOB_BLOB_PERSISTENCE) final BlobPersistence blobStore, final JobStore jobStore,
final MeterRegistryService meterService, final ApplicationEventPublisher applicationEventPublisher,
@Qualifier(JOB_BLOB_PERSISTENCE) final BlobPersistence blobStore,
final JobStore jobStore,
final MeterRegistryService meterService,
final ApplicationEventPublisher applicationEventPublisher,
@Value("${irs.job.jobstore.ttl.failed:}") final Duration ttlFailedJobs,
@Value("${irs.job.jobstore.ttl.completed:}") final Duration ttlCompletedJobs, final JsonUtil jsonUtil) {
@Value("${irs.job.jobstore.ttl.completed:}") final Duration ttlCompletedJobs,
final JsonUtil jsonUtil,
@Value("${irs.job.cached.threadCount}") final int threadCount) {

final var manager = new AASTransferProcessManager(digitalTwinDelegate, Executors.newCachedThreadPool(),
final var manager = new AASTransferProcessManager(digitalTwinDelegate, cachedExecutorService(threadCount),
blobStore, jsonUtil);
final var logic = new TreeRecursiveLogic(blobStore, jsonUtil, new ItemTreesAssembler());
final var handler = new AASRecursiveJobHandler(logic);
Expand All @@ -109,8 +113,24 @@ public JobOrchestrator<ItemDataRequest, AASTransferProcess> jobOrchestrator(
}

@Bean
public ScheduledExecutorService scheduledExecutorService() {
return Executors.newScheduledThreadPool(EXECUTOR_CORE_POOL_SIZE);
public ExecutorService cachedExecutorService(@Value("${irs.job.cached.threadCount}") final int threadCount) {
final long keepAliveTime = 60L;

return new ThreadPoolExecutor(
threadCount,
Integer.MAX_VALUE,
keepAliveTime, TimeUnit.SECONDS,
new SynchronousQueue<>());
}

@Bean
public ScheduledExecutorService scheduledExecutorService(@Value("${irs.job.scheduled.threadCount}") final int threadCount) {
return Executors.newScheduledThreadPool(threadCount);
}

@Bean
public ExecutorService fixedThreadPoolExecutorService(@Value("${irs-edc-client.controlplane.orchestration.thread-pool-size:}") final int threadPoolSize) {
return Executors.newFixedThreadPool(threadPoolSize);
}

@Bean
Expand Down Expand Up @@ -174,12 +194,8 @@ public EdcSubmodelClient edcLocalSubmodelClient(final CxTestDataContainer cxTest

@Profile({ "!local && !stubtest" })
@Bean
public EdcSubmodelClient edcSubmodelClient(final EdcConfiguration edcConfiguration,
final ContractNegotiationService contractNegotiationService, final EdcDataPlaneClient edcDataPlaneClient,
final AsyncPollingService pollingService, final RetryRegistry retryRegistry,
final EDCCatalogFacade catalogFacade,
final EndpointDataReferenceCacheService endpointDataReferenceCacheService) {
return new EdcSubmodelClientImpl(edcConfiguration, contractNegotiationService, edcDataPlaneClient,
pollingService, retryRegistry, catalogFacade, endpointDataReferenceCacheService);
public EdcSubmodelClient edcSubmodelClient(final EdcConfiguration edcConfiguration, final EdcDataPlaneClient edcDataPlaneClient,
final EdcOrchestrator edcOrchestrator, final RetryRegistry retryRegistry) {
return new EdcSubmodelClientImpl(edcConfiguration, edcDataPlaneClient, edcOrchestrator, retryRegistry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ private Optional<List<AspectModel>> readAllFromSemanticHub() {
log.info("Got response from semantic hub '{}'", semanticHubPage.toString());
aspectModelsCollection.addAll(
semanticHubPage.orElseThrow().toPageImpl(config.getPageSize()).getContent());
} while (semanticHubPage.isPresent() && semanticHubPage.get().toPageImpl(config.getPageSize()).hasNext());
} while (semanticHubPage.get().toPageImpl(config.getPageSize()).hasNext());

return Optional.of(aspectModelsCollection);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -64,6 +67,7 @@ public class BatchOrderEventListener {
private final EssService essService;
private final ApplicationEventPublisher applicationEventPublisher;
private final TimeoutSchedulerBatchProcessingService timeoutScheduler;
private final ExecutorCompletionServiceFactory executorCompletionServiceFactory;

@Async
@EventListener
Expand Down Expand Up @@ -112,35 +116,49 @@ public void handleBatchProcessingFinishedEvent(final BatchProcessingFinishedEven
}

private void startBatch(final BatchOrder batchOrder, final Batch batch) {
final ExecutorCompletionService<JobProgress> executorCompletionService = executorCompletionServiceFactory.create();

final List<PartChainIdentificationKey> keyStream = batch.getJobProgressList()
.stream()
.map(JobProgress::getIdentificationKey)
.toList();
if (batchOrder.getJobType().equals(BatchOrder.JobType.REGULAR)) {
batch.setJobProgressList(keyStream.stream()
.map(identificationKey -> createRegisterJob(batchOrder, identificationKey))
.map(registerJob -> createJobProgress(
irsItemGraphQueryService.registerItemJob(registerJob,
batch.getBatchId()),
registerJob.getKey()))
.toList());
} else if (batchOrder.getJobType().equals(BatchOrder.JobType.ESS)) {
batch.setJobProgressList(keyStream.stream()
.map(identificationKey -> createRegisterBpnInvestigationBatchOrder(
batchOrder, identificationKey))
.map(registerJob -> createJobProgress(
essService.startIrsJob(registerJob, batch.getBatchId()),
registerJob.getKey()))
.toList());
}
.stream()
.map(JobProgress::getIdentificationKey)
.toList();

keyStream.forEach(identificationKey -> executorCompletionService
.submit(() -> getJobProgress(batchOrder, batch, identificationKey)));

final List<JobProgress> jobProgressList = new ArrayList<>();

keyStream.forEach(key -> {
try {
jobProgressList.add(executorCompletionService.take().get());
} catch (ExecutionException e) {
log.error("Job execution for global asset id: {} failed: {}", key.getGlobalAssetId(), e.getCause().getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

batch.setJobProgressList(jobProgressList);
batch.setStartedOn(ZonedDateTime.now(ZoneOffset.UTC));
batchStore.save(batch.getBatchId(), batch);
timeoutScheduler.registerBatchTimeout(batch.getBatchId(), batchOrder.getTimeout());
timeoutScheduler.registerJobsTimeout(batch.getJobProgressList().stream().map(JobProgress::getJobId).toList(),
batchOrder.getJobTimeout());
}

private JobProgress getJobProgress(final BatchOrder batchOrder, final Batch batch,
final PartChainIdentificationKey identificationKey) {
if (BatchOrder.JobType.REGULAR.equals(batchOrder.getJobType())) {
final var registerJob = createRegisterJob(batchOrder, identificationKey);
return createJobProgress(irsItemGraphQueryService.registerItemJob(registerJob, batch.getBatchId()),
registerJob.getKey());
} else if (BatchOrder.JobType.ESS.equals(batchOrder.getJobType())) {
final var registerJob = createRegisterBpnInvestigationBatchOrder(batchOrder, identificationKey);
return createJobProgress(essService.startIrsJob(registerJob, batch.getBatchId()), registerJob.getKey());
}
throw new IllegalArgumentException("Unsupported job type: " + batchOrder.getJobType());
}

private JobProgress createJobProgress(final JobHandle jobHandle, final PartChainIdentificationKey identificationKey) {
return JobProgress.builder()
.jobId(jobHandle.getId())
Expand Down Expand Up @@ -174,15 +192,17 @@ private RegisterBpnInvestigationJob createRegisterBpnInvestigationBatchOrder(fin
private ProcessingState calculateBatchOrderState(final List<ProcessingState> stateList) {
if (stateList.stream().anyMatch(ProcessingState.PROCESSING::equals)) {
return ProcessingState.PROCESSING;
} else if (stateList.stream().anyMatch(ProcessingState.ERROR::equals)) {
}
if (stateList.stream().anyMatch(ProcessingState.ERROR::equals)) {
return ProcessingState.ERROR;
} else if (stateList.stream().anyMatch(ProcessingState.PARTIAL::equals)) {
}
if (stateList.stream().anyMatch(ProcessingState.PARTIAL::equals)) {
return ProcessingState.PARTIAL;
} else if (stateList.stream().allMatch(ProcessingState.COMPLETED::equals)) {
}
if (stateList.stream().allMatch(ProcessingState.COMPLETED::equals)) {
return ProcessingState.COMPLETED;
} else {
return ProcessingState.PARTIAL;
}
return ProcessingState.PARTIAL;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.tractusx.irs.IrsApplication;
import org.eclipse.tractusx.irs.common.auth.SecurityHelperService;
import org.eclipse.tractusx.irs.component.PartChainIdentificationKey;
import org.eclipse.tractusx.irs.component.RegisterBatchOrder;
import org.eclipse.tractusx.irs.component.RegisterBpnInvestigationBatchOrder;
Expand Down Expand Up @@ -60,7 +59,6 @@ public class CreationBatchService {
private final BatchStore batchStore;
private final ApplicationEventPublisher applicationEventPublisher;
private final JobEventLinkedQueueListener jobEventLinkedQueueListener;
private final SecurityHelperService securityHelperService;
private final IrsConfiguration irsConfiguration;

public UUID create(final RegisterBatchOrder request) {
Expand Down
Loading

0 comments on commit c01f345

Please sign in to comment.