diff --git a/deployer/src/cloudfoundry_bridge.py b/deployer/src/cloudfoundry_bridge.py index 558c7d5..05776e2 100644 --- a/deployer/src/cloudfoundry_bridge.py +++ b/deployer/src/cloudfoundry_bridge.py @@ -48,7 +48,8 @@ def build_config(self, local=False): "token": self._vcap_services.token, "dashboard_url": self._vcap_services.dashboard_url, "kafka_servers": self._vcap_services.kafka_servers, - "kafka_topic": self._vcap_services.topic_name, + "kafka_observations_topic": self._vcap_services.observations_topic_name, + "kafka_rule_engine_topic": self._vcap_services.rule_engine_topic_name, "kafka_zookeeper_quorum": kafka_zookeeper_quorum, "application_name": "rule_engine_" + self._vcap_services.dashboard_url_normalized_for_gearpump, "dashboard_strict_ssl": self._vcap_services.dashboard_strict_ssl, diff --git a/deployer/src/vcap.py b/deployer/src/vcap.py index 41ab720..b5c047a 100644 --- a/deployer/src/vcap.py +++ b/deployer/src/vcap.py @@ -41,7 +41,7 @@ def __init__(self): self.__parse_dashboard_url(ups) self.__gather_rule_engine_token(ups) - self.__gather_topic_name(ups) + self.__gather_topics_names(ups) self.__parse_gearpump_credentials() self.__parse_kerberos_hbase_properties(ups) @@ -68,9 +68,10 @@ def __parse_dashboard_url(self, ups): .replace("-", "_") \ .replace(".", "_") - def __gather_topic_name(self, ups): + def __gather_topics_names(self, ups): kafka_ups = self.__get_ups_by_name(ups, 'kafka-ups') - self.topic_name = kafka_ups['credentials']['topic'] + self.observations_topic_name = kafka_ups['credentials']['topics']['observations'] + self.rule_engine_topic_name = kafka_ups['credentials']['topics']['rule_engine'] def __parse_gearpump_credentials(self): self.gearpump_credentials = self.json['gearpump'][0]['credentials'] diff --git a/src/main/java/com/intel/ruleengine/gearpump/graph/Config.java b/src/main/java/com/intel/ruleengine/gearpump/graph/Config.java index f77b93b..b015153 100644 --- a/src/main/java/com/intel/ruleengine/gearpump/graph/Config.java +++ b/src/main/java/com/intel/ruleengine/gearpump/graph/Config.java @@ -27,7 +27,8 @@ class Config { private Boolean dashboard_strict_ssl; private String kafka_servers; private String kafka_zookeeper_quorum; - private String kafka_topic; + private String kafka_observations_topic; + private String kafka_rule_engine_topic; private String application_name; private String hadoop_security_authentication; private String krb_kdc; @@ -38,12 +39,20 @@ class Config { private String krb_regionserver_principal; - public String getKafka_topic() { - return kafka_topic; + public String getKafka_observations_topic() { + return kafka_observations_topic; } - public void setKafka_topic(String kafka_topic) { - this.kafka_topic = kafka_topic; + public void setKafka_observations_topic(String kafka_topic) { + this.kafka_observations_topic = kafka_topic; + } + + public String getKafka_rule_engine_topic() { + return kafka_rule_engine_topic; + } + + public void setKafka_rule_engine_topic(String kafka_topic) { + this.kafka_rule_engine_topic = kafka_topic; } public String getApplication_name() { @@ -170,7 +179,8 @@ public void setDashboard_strict_ssl(Boolean dashboard_strict_ssl) { public String toString() { String sep = ", "; StringBuilder builder = new StringBuilder() - .append("kafka_topic: ").append(getKafka_topic()).append(sep) + .append("kafka_observations_topic: ").append(getKafka_observations_topic()).append(sep) + .append("kafka_rule_engine_topic: ").append(getKafka_rule_engine_topic()).append(sep) .append("application_name: ").append(getApplication_name()).append(sep) .append("zookeeper_hbase_quorum: ").append(getZookeeper_hbase_quorum()).append(sep) .append("hbase_table_prefix: ").append(getHbase_table_prefix()).append(sep) diff --git a/src/main/java/com/intel/ruleengine/gearpump/graph/GraphConfig.java b/src/main/java/com/intel/ruleengine/gearpump/graph/GraphConfig.java index abfc562..3f4a03b 100644 --- a/src/main/java/com/intel/ruleengine/gearpump/graph/GraphConfig.java +++ b/src/main/java/com/intel/ruleengine/gearpump/graph/GraphConfig.java @@ -5,6 +5,9 @@ import com.intel.ruleengine.gearpump.data.HbaseProperties; import com.intel.ruleengine.gearpump.data.KerberosProperties; import com.intel.ruleengine.gearpump.tasks.KafkaSourceProcessor; +import com.intel.ruleengine.gearpump.tasks.KafkaSourceObservationsProcessor; +import com.intel.ruleengine.gearpump.tasks.KafkaSourceRulesUpdateProcessor; + import com.intel.ruleengine.gearpump.util.LogHelper; import io.gearpump.cluster.UserConfig; import org.slf4j.Logger; @@ -41,7 +44,8 @@ public GraphConfig(String... args) { .withString(DashboardConfig.DASHBOARD_TOKEN_PROPERTY, externalConfig.getToken()) .withString(DashboardConfig.DASHBOARD_URL_PROPERTY, externalConfig.getDashboard_url()) .withBoolean(DashboardConfig.DASHBOARD_STRICT_SSL_VERIFICATION, externalConfig.getDashboard_strict_ssl()) - .withString(KafkaSourceProcessor.KAFKA_TOPIC_PROPERTY, externalConfig.getKafka_topic()) + .withString(KafkaSourceObservationsProcessor.KAFKA_TOPIC_PROPERTY, externalConfig.getKafka_observations_topic()) + .withString(KafkaSourceRulesUpdateProcessor.KAFKA_TOPIC_PROPERTY, externalConfig.getKafka_rule_engine_topic()) .withString(KafkaSourceProcessor.KAFKA_URI_PROPERTY, externalConfig.getKafka_servers()) .withString(KafkaSourceProcessor.KAFKA_ZOOKEEPER_PROPERTY, externalConfig.getKafka_zookeeper_quorum()) .withString(HbaseProperties.AUTHENTICATION_METHOD, externalConfig.getHadoop_security_authentication()) diff --git a/src/main/java/com/intel/ruleengine/gearpump/graph/GraphDefinition.java b/src/main/java/com/intel/ruleengine/gearpump/graph/GraphDefinition.java index 9fbd711..2f30ae3 100644 --- a/src/main/java/com/intel/ruleengine/gearpump/graph/GraphDefinition.java +++ b/src/main/java/com/intel/ruleengine/gearpump/graph/GraphDefinition.java @@ -24,7 +24,8 @@ class GraphDefinition { private final Map> definition; - private final Processor kafkaSourceProcessor; + private final Processor kafkaSourceObservationsProcessor; + private final Processor kafkaSourceRulesUpdateProcessor; private final Processor checkObservationInRules; private final Processor sendAlerts; @@ -43,7 +44,8 @@ class GraphDefinition { persistComponentAlerts = processorsBuilder.getPersistComponentAlertsProccesor(); checkRules = processorsBuilder.getCheckRulesProcessor(); getRulesForComponent = processorsBuilder.getRulesForComponentProcessor(); - kafkaSourceProcessor = processorsBuilder.getKafkaSource(); + kafkaSourceObservationsProcessor = processorsBuilder.getKafkaSourceObservations(); + kafkaSourceRulesUpdateProcessor = processorsBuilder.getKafkaSourceRulesUpdate(); persistObservation = processorsBuilder.getPersistObservationProcessor(); this.definition = new HashMap<>(); buildDefinition(); @@ -54,16 +56,16 @@ public Map> getDefinition() { } private void buildDefinition() { - definition.put(kafkaSourceProcessor, Arrays.asList(getRulesForComponent)); + definition.put(kafkaSourceObservationsProcessor, Arrays.asList(getRulesForComponent)); definition.put(getRulesForComponent, Arrays.asList(persistObservation)); definition.put(persistObservation, Arrays.asList(checkObservationInRules)); - definition.put(checkObservationInRules, Arrays.asList(persistComponentAlerts)); definition.put(persistComponentAlerts, Arrays.asList(checkRules)); - definition.put(checkRules, Arrays.asList(sendAlerts)); + definition.put(sendAlerts, new ArrayList<>()); + + definition.put(kafkaSourceRulesUpdateProcessor, Arrays.asList(downloadRulesTask)); definition.put(downloadRulesTask, Arrays.asList(persistRulesTask)); definition.put(persistRulesTask, new ArrayList<>()); - definition.put(sendAlerts, new ArrayList<>()); } } diff --git a/src/main/java/com/intel/ruleengine/gearpump/graph/ProcessorsBuilder.java b/src/main/java/com/intel/ruleengine/gearpump/graph/ProcessorsBuilder.java index bd5738a..caaa52c 100644 --- a/src/main/java/com/intel/ruleengine/gearpump/graph/ProcessorsBuilder.java +++ b/src/main/java/com/intel/ruleengine/gearpump/graph/ProcessorsBuilder.java @@ -1,6 +1,7 @@ package com.intel.ruleengine.gearpump.graph; -import com.intel.ruleengine.gearpump.tasks.KafkaSourceProcessor; +import com.intel.ruleengine.gearpump.tasks.KafkaSourceObservationsProcessor; +import com.intel.ruleengine.gearpump.tasks.KafkaSourceRulesUpdateProcessor; import com.intel.ruleengine.gearpump.tasks.processors.*; import io.gearpump.cluster.UserConfig; import io.gearpump.streaming.javaapi.Processor; @@ -32,8 +33,12 @@ class ProcessorsBuilder { this.parallelismDefinition = parallelismDefinition; } - public Processor getKafkaSource() { - return new KafkaSourceProcessor(userConfig).getKafkaSourceProcessor(parallelismDefinition.getKafkaSourceProcessorsNumber()); + public Processor getKafkaSourceRulesUpdate() { + return new KafkaSourceRulesUpdateProcessor(userConfig).getKafkaSourceProcessor(parallelismDefinition.getKafkaSourceProcessorsNumber()); + } + + public Processor getKafkaSourceObservations() { + return new KafkaSourceObservationsProcessor(userConfig).getKafkaSourceProcessor(parallelismDefinition.getKafkaSourceProcessorsNumber()); } public Processor getSendAlertsProcessor() { diff --git a/src/main/java/com/intel/ruleengine/gearpump/tasks/KafkaSourceObservationsProcessor.java b/src/main/java/com/intel/ruleengine/gearpump/tasks/KafkaSourceObservationsProcessor.java new file mode 100644 index 0000000..131bb49 --- /dev/null +++ b/src/main/java/com/intel/ruleengine/gearpump/tasks/KafkaSourceObservationsProcessor.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2016 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.intel.ruleengine.gearpump.tasks; + +import com.intel.ruleengine.gearpump.tasks.KafkaSourceProcessor; +import io.gearpump.cluster.UserConfig; + + +public class KafkaSourceObservationsProcessor extends KafkaSourceProcessor { + + public static final String KAFKA_TOPIC_PROPERTY = "KAFKA_OBSERVATIONS_TOPIC"; + + private static final String NAME = "KafkaSourceObservations"; + + public KafkaSourceObservationsProcessor(UserConfig userConfig) { + super(userConfig, NAME, userConfig.getString(KAFKA_TOPIC_PROPERTY).get()); + } +} diff --git a/src/main/java/com/intel/ruleengine/gearpump/tasks/KafkaSourceProcessor.java b/src/main/java/com/intel/ruleengine/gearpump/tasks/KafkaSourceProcessor.java index f8ef3a6..711e6cd 100644 --- a/src/main/java/com/intel/ruleengine/gearpump/tasks/KafkaSourceProcessor.java +++ b/src/main/java/com/intel/ruleengine/gearpump/tasks/KafkaSourceProcessor.java @@ -33,13 +33,14 @@ public class KafkaSourceProcessor { public static final String KAFKA_URI_PROPERTY = "KAFKA_URI"; public static final String KAFKA_ZOOKEEPER_PROPERTY = "KAFKA_URI_ZOOKEEPER"; - private static final String NAME = "KafkaSource"; + private static String name; private final KafkaSource kafkaSource; private final ClientContext context; - public KafkaSourceProcessor(UserConfig userConfig) { - String topic = userConfig.getString(KAFKA_TOPIC_PROPERTY).get(); + public KafkaSourceProcessor(UserConfig userConfig, String name, String topic) { + this.name = name; + String zookeeperQuorum = userConfig.getString(KAFKA_ZOOKEEPER_PROPERTY).get(); String serverUri = userConfig.getString(KAFKA_URI_PROPERTY).get(); @@ -61,6 +62,6 @@ public KafkaSourceProcessor(UserConfig userConfig) { } public Processor getKafkaSourceProcessor(int parallelProcessorNumber) { - return Processor.source(kafkaSource, parallelProcessorNumber, NAME, UserConfig.empty(), context.system()); + return Processor.source(kafkaSource, parallelProcessorNumber, name, UserConfig.empty(), context.system()); } } diff --git a/src/main/java/com/intel/ruleengine/gearpump/tasks/KafkaSourceRulesUpdateProcessor.java b/src/main/java/com/intel/ruleengine/gearpump/tasks/KafkaSourceRulesUpdateProcessor.java new file mode 100644 index 0000000..a9ec2f7 --- /dev/null +++ b/src/main/java/com/intel/ruleengine/gearpump/tasks/KafkaSourceRulesUpdateProcessor.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2016 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.intel.ruleengine.gearpump.tasks; + +import com.intel.ruleengine.gearpump.tasks.KafkaSourceProcessor; +import io.gearpump.cluster.UserConfig; + +public class KafkaSourceRulesUpdateProcessor extends KafkaSourceProcessor { + + public static final String KAFKA_TOPIC_PROPERTY = "KAFKA_RULES_UPDATE_TOPIC"; + + private static final String NAME = "KafkaSourceRulesUpdate"; + + public KafkaSourceRulesUpdateProcessor(UserConfig userConfig) { + super(userConfig, NAME, userConfig.getString(KAFKA_TOPIC_PROPERTY).get()); + } +} diff --git a/src/main/java/com/intel/ruleengine/gearpump/tasks/processors/DownloadRulesTask.java b/src/main/java/com/intel/ruleengine/gearpump/tasks/processors/DownloadRulesTask.java index 90d65ad..3de6849 100644 --- a/src/main/java/com/intel/ruleengine/gearpump/tasks/processors/DownloadRulesTask.java +++ b/src/main/java/com/intel/ruleengine/gearpump/tasks/processors/DownloadRulesTask.java @@ -45,7 +45,6 @@ public class DownloadRulesTask extends RuleEngineTask { private static final String START_MSG = "start"; private static final String CONTINUE_MSG = "continue"; private final RulesApi rulesApi; - private static final int TRIGER_INTERVAL = 10; //in seconds private Map> componentsRules; public DownloadRulesTask(TaskContext context, UserConfig userConfig) { @@ -75,14 +74,12 @@ public void onNext(Message message) { } catch (Exception e) { getLogger().error("Unknown error during rules downloading.", e); } - getContext().scheduleOnce(FiniteDuration.create(TRIGER_INTERVAL, TimeUnit.SECONDS), new SelfTrigger()); } private Map> getComponentsRules() throws InvalidDashboardResponseException { List componentsRules = rulesApi.getActiveComponentsRules(); RuleParser ruleParser = new RuleParser(componentsRules); Map> result = ruleParser.getComponentRules(); - return result; } @@ -94,12 +91,4 @@ public static Processor getProcessor(UserConfig config, int parallelProcessorNum return createProcessor(DownloadRulesTask.class, config, parallelProcessorNumber, TASK_NAME); } - private class SelfTrigger extends scala.runtime.AbstractFunction0 { - @Override - public Object apply() { - self().tell(new Message(CONTINUE_MSG, now()), self()); - return null; - } - } - } diff --git a/src/main/java/com/intel/ruleengine/gearpump/tasks/processors/GetComponentRulesTask.java b/src/main/java/com/intel/ruleengine/gearpump/tasks/processors/GetComponentRulesTask.java index 71fdbba..fb8207c 100644 --- a/src/main/java/com/intel/ruleengine/gearpump/tasks/processors/GetComponentRulesTask.java +++ b/src/main/java/com/intel/ruleengine/gearpump/tasks/processors/GetComponentRulesTask.java @@ -57,6 +57,8 @@ public GetComponentRulesTask(TaskContext context, UserConfig userConfig, RulesRe public void onNext(Message message) { try { getLogger().info("GetRulesTask started"); + getLogger().warn("GetRulesTask started"); + observations = getInputMessage(message); sendObservations(); } catch (InvalidMessageTypeException e) { diff --git a/src/test/java/com/intel/ruleengine/gearpump/tasks/processors/DownloadRulesTaskTest.java b/src/test/java/com/intel/ruleengine/gearpump/tasks/processors/DownloadRulesTaskTest.java index d155ab5..364b498 100644 --- a/src/test/java/com/intel/ruleengine/gearpump/tasks/processors/DownloadRulesTaskTest.java +++ b/src/test/java/com/intel/ruleengine/gearpump/tasks/processors/DownloadRulesTaskTest.java @@ -104,7 +104,6 @@ public void onNextConditionsShouldBeFulfilled() throws InvalidDashboardResponseE downloadRulesTask.onNext(message); verify(taskContext, times(1)).output(expectedOutput); - verify(taskContext, times(1)).scheduleOnce(any(), any()); } @Test @@ -112,7 +111,6 @@ public void onNextShouldCatchInvalidDashboardResponseException() throws InvalidD when(rulesApi.getActiveComponentsRules()).thenThrow(InvalidDashboardResponseException.class); downloadRulesTask.onNext(message); verify(taskContext, never()).output(any()); - verify(taskContext, times(1)).scheduleOnce(any(), any()); } @Test @@ -120,6 +118,5 @@ public void onNextShouldCatchException() throws InvalidDashboardResponseExceptio when(rulesApi.getActiveComponentsRules()).thenThrow(Exception.class); downloadRulesTask.onNext(message); verify(taskContext, never()).output(any()); - verify(taskContext, times(1)).scheduleOnce(any(), any()); } }