diff --git a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java index d9cd3aa90ff70..070cc02d16736 100644 --- a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java +++ b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java @@ -58,6 +58,7 @@ public class C2ClientConfig { private final String c2RequestCompression; private final String c2AssetDirectory; private final long bootstrapAcknowledgeTimeout; + private final int c2FlowInfoProcessorBulletinLimit; private C2ClientConfig(final Builder builder) { this.c2Url = builder.c2Url; @@ -88,6 +89,7 @@ private C2ClientConfig(final Builder builder) { this.c2RequestCompression = builder.c2RequestCompression; this.c2AssetDirectory = builder.c2AssetDirectory; this.bootstrapAcknowledgeTimeout = builder.bootstrapAcknowledgeTimeout; + this.c2FlowInfoProcessorBulletinLimit = builder.c2FlowInfoProcessorBulletinLimit; } public String getC2Url() { @@ -202,6 +204,10 @@ public long getBootstrapAcknowledgeTimeout() { return bootstrapAcknowledgeTimeout; } + public int getC2FlowInfoProcessorBulletinLimit() { + return c2FlowInfoProcessorBulletinLimit; + } + /** * Builder for client configuration. */ @@ -238,6 +244,7 @@ public static class Builder { private String c2RequestCompression; private String c2AssetDirectory; private long bootstrapAcknowledgeTimeout; + private int c2FlowInfoProcessorBulletinLimit; public Builder c2Url(String c2Url) { this.c2Url = c2Url; @@ -389,6 +396,11 @@ public Builder bootstrapAcknowledgeTimeout(long bootstrapAcknowledgeTimeout) { return this; } + public Builder c2FlowInfoProcessorBulletinLimit(int c2FlowInfoProcessorBulletinLimit) { + this.c2FlowInfoProcessorBulletinLimit = c2FlowInfoProcessorBulletinLimit; + return this; + } + public C2ClientConfig build() { return new C2ClientConfig(this); } diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java index 06549b566363a..653d9ba48fa0c 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java @@ -35,6 +35,7 @@ import java.net.SocketException; import java.util.Collections; import java.util.Comparator; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -47,6 +48,7 @@ import org.apache.nifi.c2.protocol.api.AgentRepositories; import org.apache.nifi.c2.protocol.api.AgentResourceConsumption; import org.apache.nifi.c2.protocol.api.AgentStatus; +import org.apache.nifi.c2.protocol.api.ProcessorBulletin; import org.apache.nifi.c2.protocol.api.ResourceInfo; import org.apache.nifi.c2.protocol.api.C2Heartbeat; import org.apache.nifi.c2.protocol.api.DeviceInfo; @@ -89,7 +91,7 @@ public C2Heartbeat create(RuntimeInfoWrapper runtimeInfoWrapper) { heartbeat.setAgentInfo(getAgentInfo(runtimeInfoWrapper.getAgentRepositories(), runtimeInfoWrapper.getManifest())); heartbeat.setDeviceInfo(generateDeviceInfo()); - heartbeat.setFlowInfo(getFlowInfo(runtimeInfoWrapper.getQueueStatus())); + heartbeat.setFlowInfo(getFlowInfo(runtimeInfoWrapper.getQueueStatus(), runtimeInfoWrapper.getProcessorBulletins())); heartbeat.setCreated(System.currentTimeMillis()); ResourceInfo resourceInfo = new ResourceInfo(); @@ -99,9 +101,10 @@ public C2Heartbeat create(RuntimeInfoWrapper runtimeInfoWrapper) { return heartbeat; } - private FlowInfo getFlowInfo(Map queueStatus) { + private FlowInfo getFlowInfo(Map queueStatus, List processorBulletins) { FlowInfo flowInfo = new FlowInfo(); flowInfo.setQueues(queueStatus); + flowInfo.setProcessorBulletins(processorBulletins); Optional.ofNullable(flowIdHolder.getFlowId()).ifPresent(flowInfo::setFlowId); return flowInfo; } diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java index c017ac0d116de..b7653ea4248e5 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java @@ -16,20 +16,24 @@ */ package org.apache.nifi.c2.client.service.model; +import java.util.List; import java.util.Map; import org.apache.nifi.c2.protocol.api.AgentRepositories; import org.apache.nifi.c2.protocol.api.FlowQueueStatus; +import org.apache.nifi.c2.protocol.api.ProcessorBulletin; import org.apache.nifi.c2.protocol.component.api.RuntimeManifest; public class RuntimeInfoWrapper { final AgentRepositories repos; final RuntimeManifest manifest; final Map queueStatus; + final List processorBulletins; - public RuntimeInfoWrapper(AgentRepositories repos, RuntimeManifest manifest, Map queueStatus) { + public RuntimeInfoWrapper(AgentRepositories repos, RuntimeManifest manifest, Map queueStatus, List processorBulletins) { this.repos = repos; this.manifest = manifest; this.queueStatus = queueStatus; + this.processorBulletins = processorBulletins; } public AgentRepositories getAgentRepositories() { @@ -43,4 +47,8 @@ public RuntimeManifest getManifest() { public Map getQueueStatus() { return queueStatus; } + + public List getProcessorBulletins() { + return processorBulletins; + } } diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java index 4fb20aeb86454..ceaa42d4faa94 100644 --- a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java +++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java @@ -24,9 +24,11 @@ import static org.mockito.Mockito.when; import java.io.File; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Supplier; @@ -37,6 +39,7 @@ import org.apache.nifi.c2.protocol.api.C2Heartbeat; import org.apache.nifi.c2.protocol.api.FlowQueueStatus; import org.apache.nifi.c2.protocol.api.OperationType; +import org.apache.nifi.c2.protocol.api.ProcessorBulletin; import org.apache.nifi.c2.protocol.api.SupportedOperation; import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash; import org.apache.nifi.c2.protocol.component.api.Bundle; @@ -117,12 +120,14 @@ void testCreatePopulatesFromRuntimeInfoWrapperForFullHeartbeat() { AgentRepositories repos = new AgentRepositories(); RuntimeManifest manifest = createManifest(); Map queueStatus = new HashMap<>(); + List processorBulletins = new ArrayList<>(); - C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus)); + C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins)); assertEquals(repos, heartbeat.getAgentInfo().getStatus().getRepositories()); assertEquals(manifest, heartbeat.getAgentInfo().getAgentManifest()); assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues()); + assertEquals(processorBulletins, heartbeat.getFlowInfo().getProcessorBulletins()); assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash()); } @@ -134,12 +139,14 @@ void testCreatePopulatesFromRuntimeInfoWrapperForLightHeartbeat() { AgentRepositories repos = new AgentRepositories(); RuntimeManifest manifest = createManifest(); Map queueStatus = new HashMap<>(); + List processorBulletins = new ArrayList<>(); - C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus)); + C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins)); assertEquals(repos, heartbeat.getAgentInfo().getStatus().getRepositories()); assertNull(heartbeat.getAgentInfo().getAgentManifest()); assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues()); + assertEquals(processorBulletins, heartbeat.getFlowInfo().getProcessorBulletins()); assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash()); } @@ -156,7 +163,7 @@ void testAgentManifestHashIsPopulatedInCaseOfRuntimeManifest() { when(manifestHashProvider.calculateManifestHash(manifest.getBundles(), Collections.emptySet())).thenReturn(MANIFEST_HASH); when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash()); - C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>())); + C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new ArrayList<>())); assertEquals(MANIFEST_HASH, heartbeat.getAgentInfo().getAgentManifestHash()); assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash()); @@ -172,7 +179,7 @@ void testAgentManifestHashIsPopulatedInCaseOfAgentManifest() { when(manifestHashProvider.calculateManifestHash(manifest.getBundles(), supportedOperations)).thenReturn(MANIFEST_HASH); when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash()); - C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>())); + C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new ArrayList<>())); assertEquals(MANIFEST_HASH, heartbeat.getAgentInfo().getAgentManifestHash()); assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash()); diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java index f4508e7fede65..24bcc0700a532 100644 --- a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java +++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java @@ -58,7 +58,7 @@ void testDescribeManifestOperationHandlerCreateSuccess() { void testDescribeManifestOperationHandlerPopulatesAckSuccessfully() { RuntimeManifest manifest = new RuntimeManifest(); manifest.setIdentifier("manifestId"); - RuntimeInfoWrapper runtimeInfoWrapper = new RuntimeInfoWrapper(null, manifest, null); + RuntimeInfoWrapper runtimeInfoWrapper = new RuntimeInfoWrapper(null, manifest, null, null); C2Heartbeat heartbeat = new C2Heartbeat(); AgentInfo agentInfo = new AgentInfo(); diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java index b6c9cc93ec1d6..f2f683c1a3390 100644 --- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java +++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java @@ -20,6 +20,7 @@ import io.swagger.v3.oas.annotations.media.Schema; import java.io.Serializable; +import java.util.List; import java.util.Map; public class FlowInfo implements Serializable { @@ -29,6 +30,7 @@ public class FlowInfo implements Serializable { private FlowUri flowUri; private Map components; private Map queues; + private List processorBulletins; @Schema(description = "A unique identifier of the flow currently deployed on the agent") public String getFlowId() { @@ -66,4 +68,13 @@ public void setQueues(Map queues) { this.queues = queues; } + @Schema(description = "Bulletins of each processors") + public List getProcessorBulletins() { + return processorBulletins; + } + + public void setProcessorBulletins(List processorBulletins) { + this.processorBulletins = processorBulletins; + } + } diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ProcessorBulletin.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ProcessorBulletin.java new file mode 100644 index 0000000000000..67e123fbb3192 --- /dev/null +++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ProcessorBulletin.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.protocol.api; + +import io.swagger.v3.oas.annotations.media.Schema; +import java.io.Serializable; +import java.util.Date; + +public class ProcessorBulletin implements Serializable { + + private static final long serialVersionUID = 1L; + + private Date timestamp; + private long id; + private String nodeAddress; + private String level; + private String category; + private String message; + private String groupId; + private String groupName; + private String groupPath; + private String sourceId; + private String sourceName; + private String flowFileUuid; + + @Schema(description = "When this bulletin was generated.") + public Date getTimestamp() { + return timestamp; + } + + public void setTimestamp(Date timestamp) { + this.timestamp = timestamp; + } + + @Schema(description = "The id of the bulletin.") + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + @Schema(description = "If clustered, the address of the node from which the bulletin originated.") + public String getNodeAddress() { + return nodeAddress; + } + + public void setNodeAddress(String nodeAddress) { + this.nodeAddress = nodeAddress; + } + + @Schema(description = "The level of the bulletin.") + public String getLevel() { + return level; + } + + public void setLevel(String level) { + this.level = level; + } + + @Schema(description = "The category of this bulletin.") + public String getCategory() { + return category; + } + + public void setCategory(String category) { + this.category = category; + } + + @Schema(description = "The bulletin message.") + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + @Schema(description = "The group id of the source component.") + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + @Schema(description = "The group name of the source component.") + public String getGroupName() { + return groupName; + } + + public void setGroupName(String groupName) { + this.groupName = groupName; + } + + @Schema(description = "The group path of the source component.") + public String getGroupPath() { + return groupPath; + } + + public void setGroupPath(String groupPath) { + this.groupPath = groupPath; + } + + @Schema(description = "The id of the source component.") + public String getSourceId() { + return sourceId; + } + + public void setSourceId(String sourceId) { + this.sourceId = sourceId; + } + + @Schema(description = "The name of the source component.") + public String getSourceName() { + return sourceName; + } + + public void setSourceName(String sourceName) { + this.sourceName = sourceName; + } + + @Schema(description = "The id of the flow file.") + public String getFlowFileUuid() { + return flowFileUuid; + } + + public void setFlowFileUuid(String flowFileUuid) { + this.flowFileUuid = flowFileUuid; + } +} diff --git a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java index b346eed20be83..c40d2be198690 100644 --- a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java +++ b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java @@ -92,6 +92,7 @@ public enum MiNiFiProperties { C2_SECURITY_KEYSTORE_TYPE("c2.security.keystore.type", "JKS", false, false, VALID), C2_REQUEST_COMPRESSION("c2.request.compression", "none", false, true, VALID), C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT("c2.bootstrap.acknowledge.timeout", "15 sec", false, true, VALID), + C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT("c2.flow.info.processor.bulletin.limit", "1000", false, true, NON_NEGATIVE_INTEGER_VALIDATOR ), NIFI_MINIFI_NOTIFIER_INGESTORS("nifi.minifi.notifier.ingestors", null, false, true, VALID), NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_CONFIG_PATH("nifi.minifi.notifier.ingestors.file.config.path", null, false, true, VALID), NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_POLLING_PERIOD_SECONDS("nifi.minifi.notifier.ingestors.file.polling.period.seconds", null, false, true, NON_NEGATIVE_INTEGER_VALIDATOR), diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java index 677798976ad2a..02366147d51be 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java @@ -44,6 +44,7 @@ import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_REST_READ_TIMEOUT; import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_REST_URL; import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_REST_URL_ACK; +import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT; import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_RUNTIME_MANIFEST_IDENTIFIER; import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_RUNTIME_TYPE; import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_KEYSTORE_LOCATION; @@ -59,11 +60,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.nio.file.Path; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; + import org.apache.commons.lang3.tuple.Pair; import org.apache.nifi.bootstrap.BootstrapCommunicator; import org.apache.nifi.c2.client.C2ClientConfig; @@ -90,6 +94,7 @@ import org.apache.nifi.c2.protocol.api.AgentRepositories; import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus; import org.apache.nifi.c2.protocol.api.FlowQueueStatus; +import org.apache.nifi.c2.protocol.api.ProcessorBulletin; import org.apache.nifi.c2.serializer.C2JacksonSerializer; import org.apache.nifi.c2.serializer.C2Serializer; import org.apache.nifi.controller.FlowController; @@ -113,6 +118,8 @@ import org.apache.nifi.minifi.commons.service.StandardFlowPropertyEncryptor; import org.apache.nifi.minifi.commons.service.StandardFlowSerDeService; import org.apache.nifi.nar.ExtensionManagerHolder; +import org.apache.nifi.reporting.BulletinQuery; +import org.apache.nifi.reporting.ComponentType; import org.apache.nifi.services.FlowService; import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; @@ -173,8 +180,9 @@ public C2NifiClientService(NiFiProperties niFiProperties, FlowController flowCon this.c2OperationManager = new C2OperationManager( client, c2OperationHandlerProvider, heartbeatLock, operationQueueDAO, c2OperationRestartHandler); + Supplier runtimeInfoWrapperSupplier = () -> generateRuntimeInfo(clientConfig.getC2FlowInfoProcessorBulletinLimit()); this.c2HeartbeatManager = new C2HeartbeatManager( - client, heartbeatFactory, heartbeatLock, this::generateRuntimeInfo, c2OperationManager); + client, heartbeatFactory, heartbeatLock, runtimeInfoWrapperSupplier, c2OperationManager); } private C2ClientConfig generateClientConfig(NiFiProperties properties) { @@ -206,6 +214,8 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) { .c2RestPathHeartbeat(properties.getProperty(C2_REST_PATH_HEARTBEAT.getKey(), C2_REST_PATH_HEARTBEAT.getDefaultValue())) .c2RestPathAcknowledge(properties.getProperty(C2_REST_PATH_ACKNOWLEDGE.getKey(), C2_REST_PATH_ACKNOWLEDGE.getDefaultValue())) .bootstrapAcknowledgeTimeout(durationPropertyInMilliSecs(properties, C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT)) + .c2FlowInfoProcessorBulletinLimit(parseInt(properties + .getProperty(C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getKey(), C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getDefaultValue()))) .build(); } @@ -231,10 +241,12 @@ private C2OperationHandlerProvider c2OperationHandlerProvider(NiFiProperties niF UpdateConfigurationStrategy updateConfigurationStrategy = new DefaultUpdateConfigurationStrategy(flowController, flowService, new StandardFlowEnrichService(niFiProperties), flowPropertyEncryptor, StandardFlowSerDeService.defaultInstance(), niFiProperties.getProperty(FLOW_CONFIGURATION_FILE)); + Supplier runtimeInfoWrapperSupplier = () -> generateRuntimeInfo( + parseInt(niFiProperties.getProperty(C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getKey(), C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getDefaultValue()))); return new C2OperationHandlerProvider(List.of( new UpdateConfigurationOperationHandler(client, flowIdHolder, updateConfigurationStrategy, emptyOperandPropertiesProvider), - new DescribeManifestOperationHandler(heartbeatFactory, this::generateRuntimeInfo, emptyOperandPropertiesProvider), + new DescribeManifestOperationHandler(heartbeatFactory, runtimeInfoWrapperSupplier, emptyOperandPropertiesProvider), TransferDebugOperationHandler.create(client, emptyOperandPropertiesProvider, transferDebugCommandHelper.debugBundleFiles(), transferDebugCommandHelper::excludeSensitiveText), UpdateAssetOperationHandler.create(client, emptyOperandPropertiesProvider, @@ -271,10 +283,10 @@ public void stop() { } } - private synchronized RuntimeInfoWrapper generateRuntimeInfo() { + private synchronized RuntimeInfoWrapper generateRuntimeInfo(int processorBulletinLimit) { AgentManifest agentManifest = new AgentManifest(runtimeManifestService.getManifest()); agentManifest.setSupportedOperations(supportedOperationsProvider.getSupportedOperations()); - return new RuntimeInfoWrapper(getAgentRepositories(), agentManifest, getQueueStatus()); + return new RuntimeInfoWrapper(getAgentRepositories(), agentManifest, getQueueStatus(), getBulletins(processorBulletinLimit)); } private AgentRepositories getAgentRepositories() { @@ -321,4 +333,38 @@ private Map getQueueStatus() { }) .collect(toMap(Pair::getKey, Pair::getValue)); } + + private List getBulletins(int processorBulletinLimit) { + if (processorBulletinLimit > 0) { + String groupId = flowController.getEventAccess() + .getGroupStatus(ROOT_GROUP_ID) + .getId(); + BulletinQuery query = new BulletinQuery.Builder() + .sourceType(ComponentType.PROCESSOR) + .groupIdMatches(groupId) + .limit(processorBulletinLimit) + .build(); + + return flowController.getBulletinRepository() + .findBulletins(query) + .stream() + .map(bulletin -> { + ProcessorBulletin processorBulletin = new ProcessorBulletin(); + processorBulletin.setCategory(bulletin.getCategory()); + processorBulletin.setFlowFileUuid(bulletin.getFlowFileUuid()); + processorBulletin.setGroupId(bulletin.getGroupId()); + processorBulletin.setGroupName(bulletin.getGroupName()); + processorBulletin.setGroupPath(bulletin.getGroupPath()); + processorBulletin.setId(bulletin.getId()); + processorBulletin.setLevel(bulletin.getLevel()); + processorBulletin.setMessage(bulletin.getMessage()); + processorBulletin.setNodeAddress(bulletin.getNodeAddress()); + processorBulletin.setSourceId(bulletin.getSourceId()); + processorBulletin.setSourceName(bulletin.getSourceName()); + processorBulletin.setTimestamp(bulletin.getTimestamp()); + return processorBulletin; + }).toList(); + } + return new ArrayList<>(); + } } \ No newline at end of file diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf index 61afd6f355718..761bc7bfc0987 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf @@ -113,7 +113,8 @@ nifi.minifi.sensitive.props.algorithm= #c2.security.keystore.password= #c2.security.keystore.type=JKS #c2.request.compression=none - +# Number of processor bulletins exposed as a part of flow info. +#c2.flow.info.processor.bulletin.limit=0 ### MiNiFi Notifiers For Flow Updates # If C2 is enabled these should be disabled