Skip to content

Commit

Permalink
NIFI-13450 Regenerate MiNiFi Agent Manifest between Heartbeats
Browse files Browse the repository at this point in the history
Signed-off-by: Csaba Bejan <[email protected]>

This closes #9005
  • Loading branch information
ferencerdei authored and bejancsaba committed Jun 26, 2024
1 parent af69646 commit 212dd94
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
Expand All @@ -36,15 +38,15 @@ public class C2HeartbeatManager implements Runnable {
private final C2Client client;
private final C2HeartbeatFactory c2HeartbeatFactory;
private final ReentrantLock heartbeatLock;
private final RuntimeInfoWrapper runtimeInfoWrapper;
private final Supplier<RuntimeInfoWrapper> runtimeInfoSupplier;
private final C2OperationManager c2OperationManager;

public C2HeartbeatManager(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, ReentrantLock heartbeatLock, RuntimeInfoWrapper runtimeInfoWrapper,
public C2HeartbeatManager(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, ReentrantLock heartbeatLock, Supplier<RuntimeInfoWrapper> runtimeInfoSupplier,
C2OperationManager c2OperationManager) {
this.client = client;
this.c2HeartbeatFactory = c2HeartbeatFactory;
this.heartbeatLock = heartbeatLock;
this.runtimeInfoWrapper = runtimeInfoWrapper;
this.runtimeInfoSupplier = runtimeInfoSupplier;
this.c2OperationManager = c2OperationManager;
}

Expand All @@ -56,7 +58,7 @@ public void run() {
}
try {
LOGGER.debug("Heartbeat lock is acquired, sending heartbeat");
C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoSupplier.get());
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
} catch (Exception e) {
LOGGER.error("Failed to send/process heartbeat", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
Expand All @@ -52,6 +54,9 @@ public class C2HeartbeatManagerTest {
@Mock
private ReentrantLock mockHeartbeatLock;

@Mock
private Supplier<RuntimeInfoWrapper> mockRuntimeInfoWrapperSupplier;

@Mock
private RuntimeInfoWrapper mockRuntimeInfoWrapper;

Expand All @@ -75,6 +80,7 @@ void shouldSkipSendingHeartbeatIfHeartbeatLockIsAcquired() {

@Test
void shouldSendHeartbeatAndProcessEmptyResponse() {
when(mockRuntimeInfoWrapperSupplier.get()).thenReturn(mockRuntimeInfoWrapper);
when(mockHeartbeatLock.tryLock()).thenReturn(true);
C2Heartbeat mockC2Heartbeat = mock(C2Heartbeat.class);
when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenReturn(mockC2Heartbeat);
Expand All @@ -91,6 +97,7 @@ void shouldSendHeartbeatAndProcessEmptyResponse() {

@Test
void shouldSendHeartbeatAndProcessResponseWithNoOperation() {
when(mockRuntimeInfoWrapperSupplier.get()).thenReturn(mockRuntimeInfoWrapper);
when(mockHeartbeatLock.tryLock()).thenReturn(true);
C2Heartbeat mockC2Heartbeat = mock(C2Heartbeat.class);
when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenReturn(mockC2Heartbeat);
Expand All @@ -108,6 +115,7 @@ void shouldSendHeartbeatAndProcessResponseWithNoOperation() {

@Test
void shouldSendHeartbeatAndProcessResponseWithMultipleOperation() {
when(mockRuntimeInfoWrapperSupplier.get()).thenReturn(mockRuntimeInfoWrapper);
when(mockHeartbeatLock.tryLock()).thenReturn(true);
C2Heartbeat mockC2Heartbeat = mock(C2Heartbeat.class);
when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenReturn(mockC2Heartbeat);
Expand All @@ -128,6 +136,7 @@ void shouldSendHeartbeatAndProcessResponseWithMultipleOperation() {

@Test
void shouldReleaseHeartbeatLockWhenExceptionOccurs() {
when(mockRuntimeInfoWrapperSupplier.get()).thenReturn(mockRuntimeInfoWrapper);
when(mockHeartbeatLock.tryLock()).thenReturn(true);
when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenThrow(new RuntimeException());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public C2NifiClientService(NiFiProperties niFiProperties, FlowController flowCon
this.c2OperationManager = new C2OperationManager(
client, c2OperationHandlerProvider, heartbeatLock, operationQueueDAO, c2OperationRestartHandler);
this.c2HeartbeatManager = new C2HeartbeatManager(
client, heartbeatFactory, heartbeatLock, generateRuntimeInfo(), c2OperationManager);
client, heartbeatFactory, heartbeatLock, this::generateRuntimeInfo, c2OperationManager);
}

private C2ClientConfig generateClientConfig(NiFiProperties properties) {
Expand Down

0 comments on commit 212dd94

Please sign in to comment.