Skip to content

Commit

Permalink
NIFI-12331 Added PublishSlack Processor
Browse files Browse the repository at this point in the history
- Removed deprecated PutSlack and PostSlack

This closes #8120

Signed-off-by: David Handermann <[email protected]>
  • Loading branch information
markap14 authored and exceptionfactory committed Dec 5, 2023
1 parent ab8a82b commit a21993e
Show file tree
Hide file tree
Showing 20 changed files with 789 additions and 2,094 deletions.
45 changes: 7 additions & 38 deletions nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,45 +26,26 @@
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.slack.api</groupId>
<artifactId>bolt-socket-mode</artifactId>
<version>1.32.1</version>
<version>1.36.1</version>
</dependency>
<!-- Required by bolt-socket-mode but the library itself doesn't have the dependency. -->
<dependency>
<groupId>org.glassfish.tyrus.bundles</groupId>
<artifactId>tyrus-standalone-client</artifactId>
<version>1.20</version>
</dependency>

<dependency>
<groupId>javax.json</groupId>
<artifactId>javax.json-api</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>javax.json</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>${org.apache.httpcomponents.httpclient.version}</version>
<version>1.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
Expand All @@ -82,17 +63,6 @@
<artifactId>mockwebserver</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.1.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
Expand Down Expand Up @@ -146,6 +116,5 @@
<artifactId>nifi-proxy-configuration-api</artifactId>
<scope>test</scope>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,23 @@
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.slack.consume.ConsumeChannel;
import org.apache.nifi.processors.slack.consume.ConsumeSlackClient;
import org.apache.nifi.processors.slack.consume.ConsumeSlackUtil;
import org.apache.nifi.processors.slack.consume.UsernameLookup;
import org.apache.nifi.processors.slack.util.RateLimit;
import org.apache.nifi.processors.slack.util.SlackResponseUtil;
import org.apache.nifi.util.StringUtils;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

@PrimaryNodeOnly
@TriggerSerially
Expand All @@ -86,7 +86,7 @@
@WritesAttribute(attribute = "slack.message.count", description = "The number of slack messages that are included in the FlowFile"),
@WritesAttribute(attribute = "mime.type", description = "Set to application/json, as the output will always be in JSON format")
})
@SeeAlso({ListenSlack.class, PostSlack.class, PutSlack.class})
@SeeAlso({ListenSlack.class})
@Tags({"slack", "conversation", "conversation.history", "social media", "team", "text", "unstructured"})
@CapabilityDescription("Retrieves messages from one or more configured Slack channels. The messages are written out in JSON format. " +
"See Usage / Additional Details for more information about how to configure this Processor and enable it to retrieve messages from Slack.")
Expand Down Expand Up @@ -182,7 +182,7 @@ public class ConsumeSlack extends AbstractProcessor implements VerifiableProcess
.build();


private final AtomicLong nextRequestTime = new AtomicLong(0L);
private final RateLimit rateLimit = new RateLimit(getLogger());
private final Queue<ConsumeChannel> channels = new LinkedBlockingQueue<>();
private volatile App slackApp;

Expand Down Expand Up @@ -329,7 +329,8 @@ private ConsumeChannel getChannel() {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// Check to see if we are currently in a backoff period due to Slack's Rate Limit
if (isRateLimited()) {
if (rateLimit.isLimitReached()) {
getLogger().debug("Will not consume from Slack because rate limit has been reached");
context.yield();
return;
}
Expand All @@ -355,32 +356,18 @@ public void onTrigger(final ProcessContext context, final ProcessSession session


private void yieldOnException(final Throwable t, final String channelId, final ProcessContext context) {
if (ConsumeSlackUtil.isRateLimited(t)) {
if (SlackResponseUtil.isRateLimited(t)) {
getLogger().warn("Slack indicated that the Rate Limit has been exceeded when attempting to retrieve messages for channel {}", channelId);
} else {
getLogger().error("Failed to retrieve messages for channel {}", channelId, t);
}

final int retryAfterSeconds = ConsumeSlackUtil.getRetryAfterSeconds(t);
final long timeOfNextRequest = System.currentTimeMillis() + (retryAfterSeconds * 1000L);
nextRequestTime.getAndUpdate(currentTime -> Math.max(currentTime, timeOfNextRequest));
final int retryAfterSeconds = SlackResponseUtil.getRetryAfterSeconds(t);
rateLimit.retryAfter(Duration.ofSeconds(retryAfterSeconds));
context.yield();
}


private boolean isRateLimited() {
final long nextTime = nextRequestTime.get();
if (nextTime > 0 && System.currentTimeMillis() < nextTime) {
getLogger().debug("Will not retrieve any messages until {} due to Slack's Rate Limit", new Date(nextTime));
return true;
} else if (nextTime > 0) {
// Set nextRequestTime to 0 so that we no longer bother to make system calls to System.currentTimeMillis()
nextRequestTime.compareAndSet(nextTime, 0);
}

return false;
}

@Override
public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
final List<ConfigVerificationResult> results = new ArrayList<>();
Expand Down Expand Up @@ -466,7 +453,7 @@ public Map<String, String> fetchChannelIds() throws SlackApiException, IOExcepti
continue;
}

final String errorMessage = ConsumeSlackUtil.getErrorMessage(response.getError(), response.getNeeded(), response.getProvided(), response.getWarning());
final String errorMessage = SlackResponseUtil.getErrorMessage(response.getError(), response.getNeeded(), response.getProvided(), response.getWarning());
throw new RuntimeException("Failed to determine Channel IDs: " + errorMessage);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
@WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "Set to application/json, as the output will always be in JSON format")
})
@SeeAlso({ConsumeSlack.class, PostSlack.class, PutSlack.class})
@SeeAlso({ConsumeSlack.class})
@Tags({"slack", "real-time", "event", "message", "command", "listen", "receive", "social media", "team", "text", "unstructured"})
@CapabilityDescription("Retrieves real-time messages or Slack commands from one or more Slack conversations. The messages are written out in JSON format. " +
"Note that this Processor should be used to obtain real-time messages and commands from Slack and does not provide a mechanism for obtaining historical messages. " +
Expand Down
Loading

0 comments on commit a21993e

Please sign in to comment.