Skip to content

Commit

Permalink
NIFI-13635 Expose processor bulletins as a part of FlowInfo
Browse files Browse the repository at this point in the history
Signed-off-by: Ferenc Erdei <[email protected]>
This closes #9161.
  • Loading branch information
pkedvessy authored and ferencerdei committed Aug 21, 2024
1 parent 31e5afc commit 83b701a
Show file tree
Hide file tree
Showing 10 changed files with 250 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class C2ClientConfig {
private final String c2RequestCompression;
private final String c2AssetDirectory;
private final long bootstrapAcknowledgeTimeout;
private final int c2FlowInfoProcessorBulletinLimit;

private C2ClientConfig(final Builder builder) {
this.c2Url = builder.c2Url;
Expand Down Expand Up @@ -88,6 +89,7 @@ private C2ClientConfig(final Builder builder) {
this.c2RequestCompression = builder.c2RequestCompression;
this.c2AssetDirectory = builder.c2AssetDirectory;
this.bootstrapAcknowledgeTimeout = builder.bootstrapAcknowledgeTimeout;
this.c2FlowInfoProcessorBulletinLimit = builder.c2FlowInfoProcessorBulletinLimit;
}

public String getC2Url() {
Expand Down Expand Up @@ -202,6 +204,10 @@ public long getBootstrapAcknowledgeTimeout() {
return bootstrapAcknowledgeTimeout;
}

public int getC2FlowInfoProcessorBulletinLimit() {
return c2FlowInfoProcessorBulletinLimit;
}

/**
* Builder for client configuration.
*/
Expand Down Expand Up @@ -238,6 +244,7 @@ public static class Builder {
private String c2RequestCompression;
private String c2AssetDirectory;
private long bootstrapAcknowledgeTimeout;
private int c2FlowInfoProcessorBulletinLimit;

public Builder c2Url(String c2Url) {
this.c2Url = c2Url;
Expand Down Expand Up @@ -389,6 +396,11 @@ public Builder bootstrapAcknowledgeTimeout(long bootstrapAcknowledgeTimeout) {
return this;
}

public Builder c2FlowInfoProcessorBulletinLimit(int c2FlowInfoProcessorBulletinLimit) {
this.c2FlowInfoProcessorBulletinLimit = c2FlowInfoProcessorBulletinLimit;
return this;
}

public C2ClientConfig build() {
return new C2ClientConfig(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.net.SocketException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand All @@ -47,6 +48,7 @@
import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.AgentResourceConsumption;
import org.apache.nifi.c2.protocol.api.AgentStatus;
import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
import org.apache.nifi.c2.protocol.api.ResourceInfo;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.DeviceInfo;
Expand Down Expand Up @@ -89,7 +91,7 @@ public C2Heartbeat create(RuntimeInfoWrapper runtimeInfoWrapper) {

heartbeat.setAgentInfo(getAgentInfo(runtimeInfoWrapper.getAgentRepositories(), runtimeInfoWrapper.getManifest()));
heartbeat.setDeviceInfo(generateDeviceInfo());
heartbeat.setFlowInfo(getFlowInfo(runtimeInfoWrapper.getQueueStatus()));
heartbeat.setFlowInfo(getFlowInfo(runtimeInfoWrapper.getQueueStatus(), runtimeInfoWrapper.getProcessorBulletins()));
heartbeat.setCreated(System.currentTimeMillis());

ResourceInfo resourceInfo = new ResourceInfo();
Expand All @@ -99,9 +101,10 @@ public C2Heartbeat create(RuntimeInfoWrapper runtimeInfoWrapper) {
return heartbeat;
}

private FlowInfo getFlowInfo(Map<String, FlowQueueStatus> queueStatus) {
private FlowInfo getFlowInfo(Map<String, FlowQueueStatus> queueStatus, List<ProcessorBulletin> processorBulletins) {
FlowInfo flowInfo = new FlowInfo();
flowInfo.setQueues(queueStatus);
flowInfo.setProcessorBulletins(processorBulletins);
Optional.ofNullable(flowIdHolder.getFlowId()).ifPresent(flowInfo::setFlowId);
return flowInfo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,24 @@
*/
package org.apache.nifi.c2.client.service.model;

import java.util.List;
import java.util.Map;
import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;

public class RuntimeInfoWrapper {
final AgentRepositories repos;
final RuntimeManifest manifest;
final Map<String, FlowQueueStatus> queueStatus;
final List<ProcessorBulletin> processorBulletins;

public RuntimeInfoWrapper(AgentRepositories repos, RuntimeManifest manifest, Map<String, FlowQueueStatus> queueStatus) {
public RuntimeInfoWrapper(AgentRepositories repos, RuntimeManifest manifest, Map<String, FlowQueueStatus> queueStatus, List<ProcessorBulletin> processorBulletins) {
this.repos = repos;
this.manifest = manifest;
this.queueStatus = queueStatus;
this.processorBulletins = processorBulletins;
}

public AgentRepositories getAgentRepositories() {
Expand All @@ -43,4 +47,8 @@ public RuntimeManifest getManifest() {
public Map<String, FlowQueueStatus> getQueueStatus() {
return queueStatus;
}

public List<ProcessorBulletin> getProcessorBulletins() {
return processorBulletins;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import static org.mockito.Mockito.when;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
Expand All @@ -37,6 +39,7 @@
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
import org.apache.nifi.c2.protocol.api.OperationType;
import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
import org.apache.nifi.c2.protocol.api.SupportedOperation;
import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash;
import org.apache.nifi.c2.protocol.component.api.Bundle;
Expand Down Expand Up @@ -117,12 +120,14 @@ void testCreatePopulatesFromRuntimeInfoWrapperForFullHeartbeat() {
AgentRepositories repos = new AgentRepositories();
RuntimeManifest manifest = createManifest();
Map<String, FlowQueueStatus> queueStatus = new HashMap<>();
List<ProcessorBulletin> processorBulletins = new ArrayList<>();

C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus));
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins));

assertEquals(repos, heartbeat.getAgentInfo().getStatus().getRepositories());
assertEquals(manifest, heartbeat.getAgentInfo().getAgentManifest());
assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues());
assertEquals(processorBulletins, heartbeat.getFlowInfo().getProcessorBulletins());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
}

Expand All @@ -134,12 +139,14 @@ void testCreatePopulatesFromRuntimeInfoWrapperForLightHeartbeat() {
AgentRepositories repos = new AgentRepositories();
RuntimeManifest manifest = createManifest();
Map<String, FlowQueueStatus> queueStatus = new HashMap<>();
List<ProcessorBulletin> processorBulletins = new ArrayList<>();

C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus));
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins));

assertEquals(repos, heartbeat.getAgentInfo().getStatus().getRepositories());
assertNull(heartbeat.getAgentInfo().getAgentManifest());
assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues());
assertEquals(processorBulletins, heartbeat.getFlowInfo().getProcessorBulletins());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
}

Expand All @@ -156,7 +163,7 @@ void testAgentManifestHashIsPopulatedInCaseOfRuntimeManifest() {
when(manifestHashProvider.calculateManifestHash(manifest.getBundles(), Collections.emptySet())).thenReturn(MANIFEST_HASH);
when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash());

C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>()));
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new ArrayList<>()));

assertEquals(MANIFEST_HASH, heartbeat.getAgentInfo().getAgentManifestHash());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
Expand All @@ -172,7 +179,7 @@ void testAgentManifestHashIsPopulatedInCaseOfAgentManifest() {
when(manifestHashProvider.calculateManifestHash(manifest.getBundles(), supportedOperations)).thenReturn(MANIFEST_HASH);
when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash());

C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>()));
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new ArrayList<>()));

assertEquals(MANIFEST_HASH, heartbeat.getAgentInfo().getAgentManifestHash());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void testDescribeManifestOperationHandlerCreateSuccess() {
void testDescribeManifestOperationHandlerPopulatesAckSuccessfully() {
RuntimeManifest manifest = new RuntimeManifest();
manifest.setIdentifier("manifestId");
RuntimeInfoWrapper runtimeInfoWrapper = new RuntimeInfoWrapper(null, manifest, null);
RuntimeInfoWrapper runtimeInfoWrapper = new RuntimeInfoWrapper(null, manifest, null, null);

C2Heartbeat heartbeat = new C2Heartbeat();
AgentInfo agentInfo = new AgentInfo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.swagger.v3.oas.annotations.media.Schema;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

public class FlowInfo implements Serializable {
Expand All @@ -29,6 +30,7 @@ public class FlowInfo implements Serializable {
private FlowUri flowUri;
private Map<String, ComponentStatus> components;
private Map<String, FlowQueueStatus> queues;
private List<ProcessorBulletin> processorBulletins;

@Schema(description = "A unique identifier of the flow currently deployed on the agent")
public String getFlowId() {
Expand Down Expand Up @@ -66,4 +68,13 @@ public void setQueues(Map<String, FlowQueueStatus> queues) {
this.queues = queues;
}

@Schema(description = "Bulletins of each processors")
public List<ProcessorBulletin> getProcessorBulletins() {
return processorBulletins;
}

public void setProcessorBulletins(List<ProcessorBulletin> processorBulletins) {
this.processorBulletins = processorBulletins;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.c2.protocol.api;

import io.swagger.v3.oas.annotations.media.Schema;
import java.io.Serializable;
import java.util.Date;

public class ProcessorBulletin implements Serializable {

private static final long serialVersionUID = 1L;

private Date timestamp;
private long id;
private String nodeAddress;
private String level;
private String category;
private String message;
private String groupId;
private String groupName;
private String groupPath;
private String sourceId;
private String sourceName;
private String flowFileUuid;

@Schema(description = "When this bulletin was generated")
public Date getTimestamp() {
return timestamp;
}

public void setTimestamp(Date timestamp) {
this.timestamp = timestamp;
}

@Schema(description = "The id of the bulletin")
public long getId() {
return id;
}

public void setId(long id) {
this.id = id;
}

@Schema(description = "If clustered, the address of the node from which the bulletin originated")
public String getNodeAddress() {
return nodeAddress;
}

public void setNodeAddress(String nodeAddress) {
this.nodeAddress = nodeAddress;
}

@Schema(description = "The level of the bulletin")
public String getLevel() {
return level;
}

public void setLevel(String level) {
this.level = level;
}

@Schema(description = "The category of this bulletin")
public String getCategory() {
return category;
}

public void setCategory(String category) {
this.category = category;
}

@Schema(description = "The bulletin message")
public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}

@Schema(description = "The group id of the source component")
public String getGroupId() {
return groupId;
}

public void setGroupId(String groupId) {
this.groupId = groupId;
}

@Schema(description = "The group name of the source component")
public String getGroupName() {
return groupName;
}

public void setGroupName(String groupName) {
this.groupName = groupName;
}

@Schema(description = "The group path of the source component")
public String getGroupPath() {
return groupPath;
}

public void setGroupPath(String groupPath) {
this.groupPath = groupPath;
}

@Schema(description = "The id of the source component")
public String getSourceId() {
return sourceId;
}

public void setSourceId(String sourceId) {
this.sourceId = sourceId;
}

@Schema(description = "The name of the source component")
public String getSourceName() {
return sourceName;
}

public void setSourceName(String sourceName) {
this.sourceName = sourceName;
}

@Schema(description = "The id of the flow file")
public String getFlowFileUuid() {
return flowFileUuid;
}

public void setFlowFileUuid(String flowFileUuid) {
this.flowFileUuid = flowFileUuid;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public enum MiNiFiProperties {
C2_SECURITY_KEYSTORE_TYPE("c2.security.keystore.type", "JKS", false, false, VALID),
C2_REQUEST_COMPRESSION("c2.request.compression", "none", false, true, VALID),
C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT("c2.bootstrap.acknowledge.timeout", "15 sec", false, true, VALID),
C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT("c2.flow.info.processor.bulletin.limit", "1000", false, true, NON_NEGATIVE_INTEGER_VALIDATOR ),
NIFI_MINIFI_NOTIFIER_INGESTORS("nifi.minifi.notifier.ingestors", null, false, true, VALID),
NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_CONFIG_PATH("nifi.minifi.notifier.ingestors.file.config.path", null, false, true, VALID),
NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_POLLING_PERIOD_SECONDS("nifi.minifi.notifier.ingestors.file.polling.period.seconds", null, false, true, NON_NEGATIVE_INTEGER_VALIDATOR),
Expand Down
Loading

0 comments on commit 83b701a

Please sign in to comment.