Skip to content

Commit

Permalink
NIFI-12427 Add channel name attribute to ConsumeSlack
Browse files Browse the repository at this point in the history
This closes apache#8078

Signed-off-by: David Handermann <[email protected]>
  • Loading branch information
pvillard31 authored and exceptionfactory committed Dec 5, 2023
1 parent eeb2b1a commit d0dd4e0
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
Expand Down Expand Up @@ -239,37 +238,45 @@ private List<ConsumeChannel> createChannels(final ProcessContext context, final
final ConsumeSlackClient client = initializeClient(slackApp);

// Split channel ID's by commas and trim any white space
final List<String> channelIds = new ArrayList<>();
final String channelIdsValue = context.getProperty(CHANNEL_IDS).getValue();
Arrays.stream(channelIdsValue.split(","))
final List<String> channels = new ArrayList<>();
final String channelsValue = context.getProperty(CHANNEL_IDS).getValue();
Arrays.stream(channelsValue.split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.forEach(channelIds::add);

// If any of the Channel ID's is a channel name instead, fetch all channel ID's and replace names with ID's
final boolean lookupChannels = channelIds.stream().anyMatch(id -> id.startsWith("#"));
if (lookupChannels) {
final Map<String, String> channelMapping = client.fetchChannelIds();
final ListIterator<String> channelItr = channelIds.listIterator();

while (channelItr.hasNext()) {
final String channelIdOrName = channelItr.next().replace("#", "");
final String resolved = channelMapping.get(channelIdOrName);
if (resolved != null) {
channelItr.remove();
channelItr.add(resolved);
getLogger().info("Resolved Channel {} to ID {}", channelIdOrName, resolved);
}
}
}
.forEach(channels::add);

// Fetch all channel ID's to have a name/ID channel mapping
Map<String, String> channelMapping = client.fetchChannelIds();

// Create ConsumeChannel objects for each Channel ID
final UsernameLookup usernameLookup = new UsernameLookup(client, getLogger());

final List<ConsumeChannel> consumeChannels = new ArrayList<>();
for (final String channelId : channelIds) {
for (final String channel : channels) {

String channelName;
String channelId;

final String channelIdOrName = channel.replace("#", "");
channelId = channelMapping.get(channelIdOrName);

if(channelId != null) {
channelName = channelIdOrName;
getLogger().info("Resolved Channel {} to ID {}", channelName, channelId);
} else {
channelId = channelIdOrName;
channelName = channelMapping
.keySet()
.stream()
.filter(entry -> channelIdOrName.equals(channelMapping.get(entry)))
.findFirst()
.orElse("");
getLogger().info("Resolved Channel ID {} to name {}", channelId, channelName);
}

final ConsumeChannel consumeChannel = new ConsumeChannel.Builder()
.channelId(channelId)
.channelName(channelName)
.batchSize(context.getProperty(BATCH_SIZE).asInteger())
.client(client)
.includeMessageBlocks(context.getProperty(INCLUDE_MESSAGE_BLOCKS).asBoolean())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class ConsumeChannel {

private final ConsumeSlackClient client;
private final String channelId;
private final String channelName;
private final int batchSize;
private final long replyMonitorFrequencyMillis;
private final long replyMonitorWindowMillis;
Expand All @@ -80,6 +81,7 @@ public class ConsumeChannel {
private ConsumeChannel(final Builder builder) {
this.client = builder.client;
this.channelId = builder.channelId;
this.channelName = builder.channelName;
this.batchSize = builder.batchSize;
this.replyMonitorFrequencyMillis = builder.replyMonitorFrequencyMillis;
this.replyMonitorWindowMillis = builder.replyMonitorWindowMillis;
Expand Down Expand Up @@ -545,6 +547,7 @@ private ConsumptionResults consumeMessages(final ProcessContext context, final P
// Determine attributes for outbound FlowFile
final Map<String, String> attributes = new HashMap<>();
attributes.put("slack.channel.id", channelId);
attributes.put("slack.channel.name", channelName);
attributes.put("slack.message.count", Integer.toString(messageCount));
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");

Expand Down Expand Up @@ -698,6 +701,7 @@ public boolean isYielded() {
public static class Builder {
private ConsumeSlackClient client;
private String channelId;
private String channelName;
private boolean includeMessageBlocks;
private boolean resolveUsernames;
private int batchSize = 50;
Expand All @@ -713,6 +717,11 @@ public Builder channelId(final String channelId) {
return this;
}

public Builder channelName(final String channelName) {
this.channelName = channelName;
return this;
}

public Builder client(final ConsumeSlackClient client) {
this.client = client;
return this;
Expand Down Expand Up @@ -827,6 +836,7 @@ public boolean isContinuePolling() {
return continuePolling;
}

@Override
public boolean isMore() {
return isMore;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public void testSuccessfullyReceivedSingleMessage() throws JsonProcessingExcepti
assertEquals(message, outputMessages[0]);

outFlowFile1.assertAttributeEquals("slack.channel.id", "cid1");
outFlowFile1.assertAttributeEquals("slack.channel.name", "cname1");
}


Expand All @@ -115,6 +116,7 @@ public void testReceivedMultipleMessages() throws JsonProcessingException {
assertArrayEquals(expectedMessages, outputMessages);

outFlowFile1.assertAttributeEquals("slack.channel.id", "cid1");
outFlowFile1.assertAttributeEquals("slack.channel.name", "cname1");
}


Expand Down Expand Up @@ -562,7 +564,9 @@ public void addUserMapping(final String userId, final String username) {

@Override
public Map<String, String> fetchChannelIds() {
return Collections.emptyMap();
final Map<String, String> nameIdMapping = new HashMap<String, String>();
nameIdMapping.put("cname1", "cid1");
return nameIdMapping;
}

private void checkRateLimit() {
Expand Down

0 comments on commit d0dd4e0

Please sign in to comment.