Skip to content

Commit

Permalink
Merge pull request #910 from Cofinity-X/chore/upstream-contribution
Browse files Browse the repository at this point in the history
feat: efficient orchestration of edc negotiations https://github.com/…
  • Loading branch information
ds-lcapellino authored Jan 14, 2025
2 parents 3fa6e38 + 53c1eaa commit 935b7cd
Show file tree
Hide file tree
Showing 27 changed files with 1,498 additions and 426 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ _**For better traceability add the corresponding GitHub issue number in each cha

- Resolve Null pointer exception while registering the company #888
- Added the discovery type configurable, with a default value of bpnl in (ConnectorEndpointsService) (#12)
- Changed orchestration of EDC negotations to be more efficient https://github.com/eclipse-tractusx/sig-release/issues/931

### Removed
- Removed subjectId from AssetAdministrationShellDescriptor object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ data:
cacheTTL: {{ .Values.edc.discoveryFinderClient.cacheTTL | quote }}
connectorEndpointService:
cacheTTL: {{ .Values.edc.connectorEndpointService.cacheTTL | quote }}
orchestration:
thread-pool-size: {{ .Values.edc.orchestration.threadPoolSize | quote }}
ess:
localBpn: {{ tpl (.Values.bpn | default "") . | quote }}
localEdcEndpoint: {{ tpl (.Values.ess.edc.host | default "") . | quote }}
Expand Down
2 changes: 2 additions & 0 deletions charts/item-relationship-service/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ edc:
cacheTTL: PT24H # Time to live for DiscoveryFinderClient for findDiscoveryEndpoints method cache
connectorEndpointService:
cacheTTL: PT24H # Time to live for ConnectorEndpointService for fetchConnectorEndpoints method cache
orchestration:
threadPoolSize: 5 # Thread pool size for maximum parallel negotiations

ess:
edc:
Expand Down
25 changes: 25 additions & 0 deletions docs/src/uml-diagrams/runtime-view/edc-flow-orchestrated-ddtr.puml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
@startuml
skinparam monochrome true
skinparam shadowing false
skinparam defaultFontName "Architects daughter"


autonumber "<b>[000]"

participant "EdcSubmodelClient" as ESC
participant "NegotiationOrchestrator" as NO
participant "EDCCatalogFacade" as ECF

ESC -> NO: get ContractOffers for type DTR
NO -> ECF: get ContractOffers for type DTR
NO <-- ECF: List<ContractOffer>
ESC <-- NO: List<ContractOffer>

loop for each ContractOffer
ESC -> NO: negotiate EndpointDataReference(ContractOffer)
ESC <-- NO: Future<EndpointDataReference>
end loop

ESC -> ESC: wait for completion of Futures

@enduml
46 changes: 46 additions & 0 deletions docs/src/uml-diagrams/runtime-view/edc-flow-orchestrated.puml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
@startuml
skinparam monochrome true
skinparam shadowing false
skinparam defaultFontName "Architects daughter"


autonumber "<b>[000]"

participant "EdcSubmodelClient" as ESC
participant "NegotiationOrchestrator" as NO
participant "EndpointDataReferenceCacheService" as EDRCache
participant "EDCCatalogFacade" as ECF
participant "ContractNegotiationService" as CNS

ESC -> NO: get ContractOffer for assetId
NO -> ECF: get ContractOffer for assetId
note left
parallel catalog requests
can be limited by facilitating an
ExecutorService with limited threads
end note
NO <-- ECF: ContractOffer
ESC <-- NO: ContractOffer
ESC -> NO: negotiate EndpointDataReference(ContractOffer)

NO -> EDRCache: get EndpointDataReference
alt cache contains valid entry for assetId
NO <-- EDRCache: EndpointDataReference
else no valid entry for assetId in cache
alt ongoing negotiation for asseId
NO -> NO: EndpointDataReference
else no ongoing negotiation for asseId
NO -> CNS: start contract negotiation
note left
parallel negotiations
can be limited by facilitating an
ExecutorService with limited threads
end note
NO <-- CNS: EndpointDataReference
end alt
end alt

ESC <-- NO: Future<EndpointDataReference>
ESC -> ESC: wait for completion of Future

@enduml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.time.Clock;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

Expand All @@ -48,16 +49,13 @@
import org.eclipse.tractusx.irs.connector.job.JobStore;
import org.eclipse.tractusx.irs.connector.job.JobTTL;
import org.eclipse.tractusx.irs.data.CxTestDataContainer;
import org.eclipse.tractusx.irs.edc.client.AsyncPollingService;
import org.eclipse.tractusx.irs.edc.client.ContractNegotiationService;
import org.eclipse.tractusx.irs.edc.client.EDCCatalogFacade;
import org.eclipse.tractusx.irs.edc.client.EdcConfiguration;
import org.eclipse.tractusx.irs.edc.client.EdcDataPlaneClient;
import org.eclipse.tractusx.irs.edc.client.EdcOrchestrator;
import org.eclipse.tractusx.irs.edc.client.EdcSubmodelClient;
import org.eclipse.tractusx.irs.edc.client.EdcSubmodelClientImpl;
import org.eclipse.tractusx.irs.edc.client.EdcSubmodelClientLocalStub;
import org.eclipse.tractusx.irs.edc.client.EdcSubmodelFacade;
import org.eclipse.tractusx.irs.edc.client.cache.endpointdatareference.EndpointDataReferenceCacheService;
import org.eclipse.tractusx.irs.registryclient.DigitalTwinRegistryService;
import org.eclipse.tractusx.irs.registryclient.central.DigitalTwinRegistryClient;
import org.eclipse.tractusx.irs.registryclient.central.DigitalTwinRegistryClientLocalStub;
Expand Down Expand Up @@ -113,6 +111,11 @@ public ScheduledExecutorService scheduledExecutorService() {
return Executors.newScheduledThreadPool(EXECUTOR_CORE_POOL_SIZE);
}

@Bean
public ExecutorService fixedThreadPoolExecutorService(@Value("${irs-edc-client.controlplane.orchestration.thread-pool-size:}") final int threadPoolSize) {
return Executors.newFixedThreadPool(threadPoolSize);
}

@Bean
public Clock clock() {
return Clock.systemUTC();
Expand Down Expand Up @@ -174,12 +177,8 @@ public EdcSubmodelClient edcLocalSubmodelClient(final CxTestDataContainer cxTest

@Profile({ "!local && !stubtest" })
@Bean
public EdcSubmodelClient edcSubmodelClient(final EdcConfiguration edcConfiguration,
final ContractNegotiationService contractNegotiationService, final EdcDataPlaneClient edcDataPlaneClient,
final AsyncPollingService pollingService, final RetryRegistry retryRegistry,
final EDCCatalogFacade catalogFacade,
final EndpointDataReferenceCacheService endpointDataReferenceCacheService) {
return new EdcSubmodelClientImpl(edcConfiguration, contractNegotiationService, edcDataPlaneClient,
pollingService, retryRegistry, catalogFacade, endpointDataReferenceCacheService);
public EdcSubmodelClient edcSubmodelClient(final EdcConfiguration edcConfiguration, final EdcDataPlaneClient edcDataPlaneClient,
final EdcOrchestrator edcOrchestrator, final RetryRegistry retryRegistry) {
return new EdcSubmodelClientImpl(edcConfiguration, edcDataPlaneClient, edcOrchestrator, retryRegistry);
}
}
2 changes: 2 additions & 0 deletions irs-api/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ irs-edc-client:
datareference:
storage:
duration: PT1H # Time after which stored data references will be cleaned up, ISO 8601 Duration
orchestration:
thread-pool-size: 5 # Thread pool size for maximum parallel negotiations

submodel:
request-ttl: ${EDC_SUBMODEL_REQUEST_TTL:PT10M} # How long to wait for an async EDC submodel retrieval to finish, ISO 8601 Duration
Expand Down
Loading

0 comments on commit 935b7cd

Please sign in to comment.