Skip to content

Commit

Permalink
Merge pull request #613 from salesforce/revert-611-W-13913006-count-c…
Browse files Browse the repository at this point in the history
…lient-events

Revert "@W-13913006 count client facing events"
  • Loading branch information
sagikSF authored Aug 29, 2023
2 parents fd0bfc9 + b0e296e commit 41a31bd
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.datorama.oss.timbermill.common;

import com.datorama.oss.timbermill.TaskIndexer;
import com.datorama.oss.timbermill.pipe.LocalOutputPipe;
import com.datorama.oss.timbermill.unit.Event;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
Expand Down Expand Up @@ -308,7 +307,6 @@ public class ElasticsearchUtil {
private static final Set<String> envsSet = Sets.newConcurrentHashSet();
private static final Pattern metadataPatten = Pattern.compile("metadata.*");
private static Pattern notToSkipRegexPattern;
private static Pattern clientFacingEventsPattern = LocalOutputPipe.getClientFacingEventsPattern();

public static Set<String> getEnvSet() {
return envsSet;
Expand All @@ -324,18 +322,7 @@ public static void drainAndIndex(BlockingQueue<Event> eventsQueue, TaskIndexer t
Collection<Event> unfilteredEvents = new ArrayList<>();
eventsQueue.drainTo(unfilteredEvents, maxElement);
Collection<Event> events = filterEvents(unfilteredEvents, skipEventsAtDrainFlag, notToSkipRegex);

if (clientFacingEventsPattern != null){
events.forEach(e -> {
if (clientFacingEventsPattern.matcher(e.getName()).matches()){
KamonConstants.MESSAGES_IN_INPUT_QUEUE_RANGE_SAMPLER.withTag("client_facing", true).decrement();
} else {
KamonConstants.MESSAGES_IN_INPUT_QUEUE_RANGE_SAMPLER.withTag("client_facing", false).decrement();
}
});
} else {
KamonConstants.MESSAGES_IN_INPUT_QUEUE_RANGE_SAMPLER.withoutTags().decrement(events.size());
}
KamonConstants.MESSAGES_IN_INPUT_QUEUE_RANGE_SAMPLER.withoutTags().decrement(events.size());
logErrorInEventsMap(events.stream().filter(event -> event.getTaskId() != null).collect(Collectors.groupingBy(Event::getTaskId)), "drainAndIndex");

events.forEach(e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.regex.Pattern;

public class LocalOutputPipe implements EventOutputPipe {

Expand All @@ -42,7 +41,6 @@ public class LocalOutputPipe implements EventOutputPipe {
private boolean stoppedRunning = false;
private static final Logger LOG = LoggerFactory.getLogger(LocalOutputPipe.class);
private LoadingCache<String, RateLimiter> rateLimiterMap;
private static Pattern clientFacingEventsPattern = null;

private LocalOutputPipe(Builder builder) {
if (builder.elasticUrl == null){
Expand Down Expand Up @@ -123,15 +121,7 @@ public static void doPushEventToQueues(PersistenceHandler persistenceHandler, Bl
KamonConstants.MESSAGES_IN_OVERFLOWED_QUEUE_RANGE_SAMPLER.withoutTags().increment();
}
} else {
if (clientFacingEventsPattern != null) {
if (clientFacingEventsPattern.matcher(event.getName()).matches()) {
KamonConstants.MESSAGES_IN_INPUT_QUEUE_RANGE_SAMPLER.withTag("client_facing", true).increment();
} else {
KamonConstants.MESSAGES_IN_INPUT_QUEUE_RANGE_SAMPLER.withTag("client_facing", false).increment();
}
} else {
KamonConstants.MESSAGES_IN_INPUT_QUEUE_RANGE_SAMPLER.withoutTags().increment();
}
KamonConstants.MESSAGES_IN_INPUT_QUEUE_RANGE_SAMPLER.withoutTags().increment();
}
}

Expand Down Expand Up @@ -180,14 +170,6 @@ public void setRateLimiterMap(LoadingCache<String, RateLimiter> rateLimiterMap)
this.rateLimiterMap = rateLimiterMap;
}

public static void setClientFacingEventsPattern(Pattern newClientFacingEventsPattern){
clientFacingEventsPattern = newClientFacingEventsPattern;
}

public static Pattern getClientFacingEventsPattern(){
return clientFacingEventsPattern;
}

public static class Builder {

Bulker bulker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ public class TimbermillService {

private static Pattern notToSkipRegexPattern = null;
private static Pattern metadataPatten = Pattern.compile("metadata.*");
private String clientFacingEventsRegex;
public static Pattern clientFacingEventsPattern = null;

private boolean skipEventsAtInsertFlag;
private boolean skipEventsAtDrainFlag;
Expand Down Expand Up @@ -118,7 +116,6 @@ public TimbermillService(@Value("${INDEX_BULK_SIZE:200000}") Integer indexBulkSi
@Value("${LIMIT_FOR_PERIOD:30000}") int limitForPeriod,
@Value("${LIMIT_REFRESH_PERIOD_MINUTES:1}") int limitRefreshPeriod,
@Value("${RATE_LIMITER_CAPACITY:1000000}") int rateLimiterCapacity,
@Value("${client.facing.events.regex:''}") String clientFacingEventsRegex,
@Value("${skip.events.at.insert.flag:false}") boolean skipEventsAtInsertFlag,
@Value("${skip.events.at.drain.flag:false}") boolean skipEventsAtDrainFlag,
@Value("${not.to.skip.events.regex:.*}") String notToSkipRegex){
Expand All @@ -132,10 +129,6 @@ public TimbermillService(@Value("${INDEX_BULK_SIZE:200000}") Integer indexBulkSi
this.skipEventsAtDrainFlag = skipEventsAtDrainFlag;
this.notToSkipRegex = notToSkipRegex;

if (!clientFacingEventsRegex.isEmpty()){
clientFacingEventsPattern = Pattern.compile(clientFacingEventsRegex);
}

RedisService redisService = null;
if (!StringUtils.isEmpty(redisHost)) {
redisService = new RedisService(redisHost, redisPort, redisPass, redisMaxMemory,
Expand All @@ -155,7 +148,6 @@ public TimbermillService(@Value("${INDEX_BULK_SIZE:200000}") Integer indexBulkSi
AbstractCacheHandler cacheHandler = CacheHandlerUtil.getCacheHandler(cacheStrategy, cacheParams);
this.eventsMaxElement = eventsMaxElement;
taskIndexer = new TaskIndexer(pluginsJson, daysRotation, es, timbermillVersion, cacheHandler);
if(!(clientFacingEventsPattern == null)){LocalOutputPipe.setClientFacingEventsPattern(clientFacingEventsPattern);}
cronsRunner.runCrons(bulkPersistentFetchCronExp, eventsPersistentFetchCronExp, persistenceHandler, es, deletionCronExp,
eventsQueue, overflowedQueue, mergingCronExp, redisService, rateLimiterMap, indexMergerCronExp);
startQueueSpillerThread();
Expand Down

0 comments on commit 41a31bd

Please sign in to comment.