From 789e50b2e568d5c86a3f54293968d0531df51536 Mon Sep 17 00:00:00 2001 From: cthermolia-grnet Date: Thu, 1 Dec 2022 15:51:34 +0200 Subject: [PATCH 1/8] ARGO-4123 Issue with generating events when there is a downtime --- .../main/java/argo/amr/ApiResourceManager.java | 1 - flink_jobs_v2/ams_ingest_metric/pom.xml | 2 +- .../java/argo/streaming/AmsIngestMetric.java | 2 +- .../java/argo/streaming/AmsStreamStatus.java | 18 +++++++++++++++--- .../src/main/java/status/StatusManager.java | 6 ++++++ .../src/main/java/sync/DowntimeCache.java | 5 +++++ 6 files changed, 28 insertions(+), 6 deletions(-) diff --git a/flink_jobs_v2/ApiResourceManager/src/main/java/argo/amr/ApiResourceManager.java b/flink_jobs_v2/ApiResourceManager/src/main/java/argo/amr/ApiResourceManager.java index 467c0aca..687c821f 100644 --- a/flink_jobs_v2/ApiResourceManager/src/main/java/argo/amr/ApiResourceManager.java +++ b/flink_jobs_v2/ApiResourceManager/src/main/java/argo/amr/ApiResourceManager.java @@ -297,7 +297,6 @@ public void getRemoteDowntimes() { String path = "https://%s/api/v2/downtimes?date=%s"; String fullURL = String.format(path, this.endpoint, this.date); String content = this.requestManager.getResource(fullURL); - this.data.put(ApiResource.DOWNTIMES, this.apiResponseParser.getJsonData(content, false)); } diff --git a/flink_jobs_v2/ams_ingest_metric/pom.xml b/flink_jobs_v2/ams_ingest_metric/pom.xml index 6ece83cb..7579c15c 100644 --- a/flink_jobs_v2/ams_ingest_metric/pom.xml +++ b/flink_jobs_v2/ams_ingest_metric/pom.xml @@ -382,4 +382,4 @@ language governing permissions and limitations under the License. --> --> - + \ No newline at end of file diff --git a/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/AmsIngestMetric.java b/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/AmsIngestMetric.java index 283832f4..d09fa219 100644 --- a/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/AmsIngestMetric.java +++ b/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/AmsIngestMetric.java @@ -266,4 +266,4 @@ private static void configJID() { //config the JID in the log4j.properties String jobId = getJID(); MDC.put("JID", jobId); } -} +} \ No newline at end of file diff --git a/flink_jobs_v2/stream_status/src/main/java/argo/streaming/AmsStreamStatus.java b/flink_jobs_v2/stream_status/src/main/java/argo/streaming/AmsStreamStatus.java index 17070db4..2685a708 100644 --- a/flink_jobs_v2/stream_status/src/main/java/argo/streaming/AmsStreamStatus.java +++ b/flink_jobs_v2/stream_status/src/main/java/argo/streaming/AmsStreamStatus.java @@ -49,6 +49,7 @@ import argo.avro.GroupEndpoint; import argo.avro.MetricData; import argo.avro.MetricProfile; +import java.text.SimpleDateFormat; import org.apache.commons.lang.StringUtils; import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.FileSystem; @@ -101,6 +102,9 @@ public class AmsStreamStatus { static Logger LOG = LoggerFactory.getLogger(AmsStreamStatus.class); private static String runDate; + private static String apiToken; + private static String apiEndpoint; + private static ApiResourceManager amr; /** * Sets configuration parameters to streaming enviroment @@ -188,8 +192,8 @@ public static void main(String[] args) throws Exception { String subMetric = parameterTool.getRequired("ams.sub.metric"); - String apiEndpoint = parameterTool.getRequired("api.endpoint"); - String apiToken = parameterTool.getRequired("api.token"); + apiEndpoint = parameterTool.getRequired("api.endpoint"); + apiToken = 
parameterTool.getRequired("api.token"); String reportID = parameterTool.getRequired("report.uuid"); int apiInterval = parameterTool.getInt("api.interval"); runDate = parameterTool.get("run.date"); @@ -209,7 +213,7 @@ public static void main(String[] args) throws Exception { String strictParam = parameterTool.get("interval.strict"); strictInterval = getInterval(strictParam); } - ApiResourceManager amr = new ApiResourceManager(apiEndpoint, apiToken); + amr = new ApiResourceManager(apiEndpoint, apiToken); // fetch // set params @@ -573,6 +577,14 @@ public void flatMap1(Tuple2 value, Collector out) String monHost = item.getMonitoringHost(); String message = item.getMessage(); String summary = item.getSummary(); + String dayStamp = tsMon.split("T")[0]; + + if (!sm.checkIfExistDowntime(dayStamp)) { + amr.setDate(dayStamp); + amr.getRemoteDowntimes(); + ArrayList downList = new ArrayList(Arrays.asList(amr.getListDowntimes())); + sm.addDowntimeSet(dayStamp, downList); + } // if daily generation is enable check if has day changed? if (config.daily && sm.hasDayChanged(sm.getTsLatest(), tsMon)) { diff --git a/flink_jobs_v2/stream_status/src/main/java/status/StatusManager.java b/flink_jobs_v2/stream_status/src/main/java/status/StatusManager.java index be02c439..7f15de2b 100644 --- a/flink_jobs_v2/stream_status/src/main/java/status/StatusManager.java +++ b/flink_jobs_v2/stream_status/src/main/java/status/StatusManager.java @@ -1179,4 +1179,10 @@ private boolean isStatusToRepeat(int status) { return false; } + + public boolean checkIfExistDowntime(String timestamp) { + + return this.dc.getCache().keySet().contains(timestamp); + } + } diff --git a/flink_jobs_v2/stream_status/src/main/java/sync/DowntimeCache.java b/flink_jobs_v2/stream_status/src/main/java/sync/DowntimeCache.java index a2a7eaf7..d6e78cbd 100644 --- a/flink_jobs_v2/stream_status/src/main/java/sync/DowntimeCache.java +++ b/flink_jobs_v2/stream_status/src/main/java/sync/DowntimeCache.java @@ -120,4 +120,9 @@ public String toString() { return this.cache.toString(); } + public TreeMap getCache() { + return cache; + } + + } From 8fb6062d4f8b1be9e785b7bfa4caab387c233da6 Mon Sep 17 00:00:00 2001 From: Konstantinos Kagkelidis Date: Thu, 8 Dec 2022 03:36:16 +0200 Subject: [PATCH 2/8] ARGO-4132 Fix class cast issue due to Avro SpecificDatumReader caching --- .../ams/connector/ArgoMessagingSource.java | 29 +- .../main/java/argo/avro/MetricDataOld.java | 536 ------------------ .../java/argo/streaming/AmsIngestMetric.java | 20 +- .../streaming/HBaseMetricOutputFormat.java | 2 +- .../main/java/argo/streaming/MetricParse.java | 4 +- 5 files changed, 38 insertions(+), 553 deletions(-) delete mode 100644 flink_jobs_v2/ams_ingest_metric/src/main/java/argo/avro/MetricDataOld.java diff --git a/flink_jobs_v2/ams-connector/src/main/java/ams/connector/ArgoMessagingSource.java b/flink_jobs_v2/ams-connector/src/main/java/ams/connector/ArgoMessagingSource.java index 806d1e68..2cc3a431 100644 --- a/flink_jobs_v2/ams-connector/src/main/java/ams/connector/ArgoMessagingSource.java +++ b/flink_jobs_v2/ams-connector/src/main/java/ams/connector/ArgoMessagingSource.java @@ -30,6 +30,8 @@ public class ArgoMessagingSource extends RichSourceFunction { private boolean useProxy = false; private String proxyURL = ""; private transient Object rateLck; // lock for waiting to establish rate + private boolean advanceOffset = true; + private volatile boolean isRunning = true; @@ -49,6 +51,21 @@ public ArgoMessagingSource(String endpoint, String port, String token, String pr 
this.runDate=runDate; } + + // second constructor with advanceOffset parameter + public ArgoMessagingSource(String endpoint, String port, String token, String project, String sub, int batch, Long interval, String runDate, boolean advanceOffset) { + this.endpoint = endpoint; + this.port = port; + this.token = token; + this.project = project; + this.sub = sub; + this.interval = interval; + this.batch = batch; + this.verify = true; + this.runDate=runDate; + this.advanceOffset = advanceOffset; + + } /** * Set verify to true or false. If set to false AMS client will be able to contact AMS endpoints that use self-signed certificates @@ -115,8 +132,16 @@ public void open(Configuration parameters) throws Exception { if (this.useProxy) { client.setProxy(this.proxyURL); } - int offset=client.offset(); //get the offset of the subscription, that corresponds to the date - client.modifyOffset(offset); //mofify the offset of the subscription to point to the offset index of the date. if date is null then the index points to the latest offset (max) + + // if advanceOffset is set to true (default) advance the offset to the latest or based on the run date provided + if (advanceOffset) { + // get the offset of the subscription, that corresponds to the date + int offset=client.offset(); + // modify the offset of the subscription to point to the offset index of the date. + // if date is null then the index points to the latest offset (max) + client.modifyOffset(offset); + } + } catch (KeyManagementException e) { e.printStackTrace(); } catch (NoSuchAlgorithmException e) { diff --git a/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/avro/MetricDataOld.java b/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/avro/MetricDataOld.java deleted file mode 100644 index dafb8d40..00000000 --- a/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/avro/MetricDataOld.java +++ /dev/null @@ -1,536 +0,0 @@ -/** - * Autogenerated by Avro - * - * DO NOT EDIT DIRECTLY - */ -package argo.avro; -@SuppressWarnings("all") -@org.apache.avro.specific.AvroGenerated -public class MetricDataOld extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { - public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"MetricData\",\"namespace\":\"argo.avro\",\"fields\":[{\"name\":\"timestamp\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"service\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"hostname\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"metric\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"status\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"monitoring_host\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}]},{\"name\":\"summary\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}]},{\"name\":\"message\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}]},{\"name\":\"tags\",\"type\":[\"null\",{\"type\":\"map\",\"values\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"avro.java.string\":\"String\"}]}]}"); - public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } - @Deprecated public java.lang.String timestamp; - @Deprecated public java.lang.String service; - @Deprecated public java.lang.String hostname; - @Deprecated public java.lang.String metric; - @Deprecated
public java.lang.String status; - @Deprecated public java.lang.String monitoring_host; - @Deprecated public java.lang.String summary; - @Deprecated public java.lang.String message; - @Deprecated public java.util.Map tags; - - /** - * Default constructor. - */ - public MetricDataOld() {} - - /** - * All-args constructor. - */ - public MetricDataOld(java.lang.String timestamp, java.lang.String service, java.lang.String hostname, java.lang.String metric, java.lang.String status, java.lang.String monitoring_host, java.lang.String summary, java.lang.String message, java.util.Map tags) { - this.timestamp = timestamp; - this.service = service; - this.hostname = hostname; - this.metric = metric; - this.status = status; - this.monitoring_host = monitoring_host; - this.summary = summary; - this.message = message; - this.tags = tags; - } - - public org.apache.avro.Schema getSchema() { return SCHEMA$; } - // Used by DatumWriter. Applications should not call. - public java.lang.Object get(int field$) { - switch (field$) { - case 0: return timestamp; - case 1: return service; - case 2: return hostname; - case 3: return metric; - case 4: return status; - case 5: return monitoring_host; - case 6: return summary; - case 7: return message; - case 8: return tags; - default: throw new org.apache.avro.AvroRuntimeException("Bad index"); - } - } - // Used by DatumReader. Applications should not call. - @SuppressWarnings(value="unchecked") - public void put(int field$, java.lang.Object value$) { - switch (field$) { - case 0: timestamp = (java.lang.String)value$; break; - case 1: service = (java.lang.String)value$; break; - case 2: hostname = (java.lang.String)value$; break; - case 3: metric = (java.lang.String)value$; break; - case 4: status = (java.lang.String)value$; break; - case 5: monitoring_host = (java.lang.String)value$; break; - case 6: summary = (java.lang.String)value$; break; - case 7: message = (java.lang.String)value$; break; - case 8: tags = (java.util.Map)value$; break; - default: throw new org.apache.avro.AvroRuntimeException("Bad index"); - } - } - - /** - * Gets the value of the 'timestamp' field. - */ - public java.lang.String getTimestamp() { - return timestamp; - } - - /** - * Sets the value of the 'timestamp' field. - * @param value the value to set. - */ - public void setTimestamp(java.lang.String value) { - this.timestamp = value; - } - - /** - * Gets the value of the 'service' field. - */ - public java.lang.String getService() { - return service; - } - - /** - * Sets the value of the 'service' field. - * @param value the value to set. - */ - public void setService(java.lang.String value) { - this.service = value; - } - - /** - * Gets the value of the 'hostname' field. - */ - public java.lang.String getHostname() { - return hostname; - } - - /** - * Sets the value of the 'hostname' field. - * @param value the value to set. - */ - public void setHostname(java.lang.String value) { - this.hostname = value; - } - - /** - * Gets the value of the 'metric' field. - */ - public java.lang.String getMetric() { - return metric; - } - - /** - * Sets the value of the 'metric' field. - * @param value the value to set. - */ - public void setMetric(java.lang.String value) { - this.metric = value; - } - - /** - * Gets the value of the 'status' field. - */ - public java.lang.String getStatus() { - return status; - } - - /** - * Sets the value of the 'status' field. - * @param value the value to set. 
- */ - public void setStatus(java.lang.String value) { - this.status = value; - } - - /** - * Gets the value of the 'monitoring_host' field. - */ - public java.lang.String getMonitoringHost() { - return monitoring_host; - } - - /** - * Sets the value of the 'monitoring_host' field. - * @param value the value to set. - */ - public void setMonitoringHost(java.lang.String value) { - this.monitoring_host = value; - } - - /** - * Gets the value of the 'summary' field. - */ - public java.lang.String getSummary() { - return summary; - } - - /** - * Sets the value of the 'summary' field. - * @param value the value to set. - */ - public void setSummary(java.lang.String value) { - this.summary = value; - } - - /** - * Gets the value of the 'message' field. - */ - public java.lang.String getMessage() { - return message; - } - - /** - * Sets the value of the 'message' field. - * @param value the value to set. - */ - public void setMessage(java.lang.String value) { - this.message = value; - } - - /** - * Gets the value of the 'tags' field. - */ - public java.util.Map getTags() { - return tags; - } - - /** - * Sets the value of the 'tags' field. - * @param value the value to set. - */ - public void setTags(java.util.Map value) { - this.tags = value; - } - - /** Creates a new MetricData RecordBuilder */ - public static argo.avro.MetricDataOld.Builder newBuilder() { - return new argo.avro.MetricDataOld.Builder(); - } - - /** Creates a new MetricData RecordBuilder by copying an existing Builder */ - public static argo.avro.MetricDataOld.Builder newBuilder(argo.avro.MetricDataOld.Builder other) { - return new argo.avro.MetricDataOld.Builder(other); - } - - /** Creates a new MetricData RecordBuilder by copying an existing MetricData instance */ - public static argo.avro.MetricDataOld.Builder newBuilder(argo.avro.MetricDataOld other) { - return new argo.avro.MetricDataOld.Builder(other); - } - - /** - * RecordBuilder for MetricData instances. 
- */ - public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase - implements org.apache.avro.data.RecordBuilder { - - private java.lang.String timestamp; - private java.lang.String service; - private java.lang.String hostname; - private java.lang.String metric; - private java.lang.String status; - private java.lang.String monitoring_host; - private java.lang.String summary; - private java.lang.String message; - private java.util.Map tags; - - /** Creates a new Builder */ - private Builder() { - super(argo.avro.MetricDataOld.SCHEMA$); - } - - /** Creates a Builder by copying an existing Builder */ - private Builder(argo.avro.MetricDataOld.Builder other) { - super(other); - } - - /** Creates a Builder by copying an existing MetricData instance */ - private Builder(argo.avro.MetricDataOld other) { - super(argo.avro.MetricDataOld.SCHEMA$); - if (isValidValue(fields()[0], other.timestamp)) { - this.timestamp = data().deepCopy(fields()[0].schema(), other.timestamp); - fieldSetFlags()[0] = true; - } - if (isValidValue(fields()[1], other.service)) { - this.service = data().deepCopy(fields()[1].schema(), other.service); - fieldSetFlags()[1] = true; - } - if (isValidValue(fields()[2], other.hostname)) { - this.hostname = data().deepCopy(fields()[2].schema(), other.hostname); - fieldSetFlags()[2] = true; - } - if (isValidValue(fields()[3], other.metric)) { - this.metric = data().deepCopy(fields()[3].schema(), other.metric); - fieldSetFlags()[3] = true; - } - if (isValidValue(fields()[4], other.status)) { - this.status = data().deepCopy(fields()[4].schema(), other.status); - fieldSetFlags()[4] = true; - } - if (isValidValue(fields()[5], other.monitoring_host)) { - this.monitoring_host = data().deepCopy(fields()[5].schema(), other.monitoring_host); - fieldSetFlags()[5] = true; - } - if (isValidValue(fields()[6], other.summary)) { - this.summary = data().deepCopy(fields()[6].schema(), other.summary); - fieldSetFlags()[6] = true; - } - if (isValidValue(fields()[7], other.message)) { - this.message = data().deepCopy(fields()[7].schema(), other.message); - fieldSetFlags()[7] = true; - } - if (isValidValue(fields()[8], other.tags)) { - this.tags = data().deepCopy(fields()[8].schema(), other.tags); - fieldSetFlags()[8] = true; - } - } - - /** Gets the value of the 'timestamp' field */ - public java.lang.String getTimestamp() { - return timestamp; - } - - /** Sets the value of the 'timestamp' field */ - public argo.avro.MetricDataOld.Builder setTimestamp(java.lang.String value) { - validate(fields()[0], value); - this.timestamp = value; - fieldSetFlags()[0] = true; - return this; - } - - /** Checks whether the 'timestamp' field has been set */ - public boolean hasTimestamp() { - return fieldSetFlags()[0]; - } - - /** Clears the value of the 'timestamp' field */ - public argo.avro.MetricDataOld.Builder clearTimestamp() { - timestamp = null; - fieldSetFlags()[0] = false; - return this; - } - - /** Gets the value of the 'service' field */ - public java.lang.String getService() { - return service; - } - - /** Sets the value of the 'service' field */ - public argo.avro.MetricDataOld.Builder setService(java.lang.String value) { - validate(fields()[1], value); - this.service = value; - fieldSetFlags()[1] = true; - return this; - } - - /** Checks whether the 'service' field has been set */ - public boolean hasService() { - return fieldSetFlags()[1]; - } - - /** Clears the value of the 'service' field */ - public argo.avro.MetricDataOld.Builder clearService() { - service = null; - 
fieldSetFlags()[1] = false; - return this; - } - - /** Gets the value of the 'hostname' field */ - public java.lang.String getHostname() { - return hostname; - } - - /** Sets the value of the 'hostname' field */ - public argo.avro.MetricDataOld.Builder setHostname(java.lang.String value) { - validate(fields()[2], value); - this.hostname = value; - fieldSetFlags()[2] = true; - return this; - } - - /** Checks whether the 'hostname' field has been set */ - public boolean hasHostname() { - return fieldSetFlags()[2]; - } - - /** Clears the value of the 'hostname' field */ - public argo.avro.MetricDataOld.Builder clearHostname() { - hostname = null; - fieldSetFlags()[2] = false; - return this; - } - - /** Gets the value of the 'metric' field */ - public java.lang.String getMetric() { - return metric; - } - - /** Sets the value of the 'metric' field */ - public argo.avro.MetricDataOld.Builder setMetric(java.lang.String value) { - validate(fields()[3], value); - this.metric = value; - fieldSetFlags()[3] = true; - return this; - } - - /** Checks whether the 'metric' field has been set */ - public boolean hasMetric() { - return fieldSetFlags()[3]; - } - - /** Clears the value of the 'metric' field */ - public argo.avro.MetricDataOld.Builder clearMetric() { - metric = null; - fieldSetFlags()[3] = false; - return this; - } - - /** Gets the value of the 'status' field */ - public java.lang.String getStatus() { - return status; - } - - /** Sets the value of the 'status' field */ - public argo.avro.MetricDataOld.Builder setStatus(java.lang.String value) { - validate(fields()[4], value); - this.status = value; - fieldSetFlags()[4] = true; - return this; - } - - /** Checks whether the 'status' field has been set */ - public boolean hasStatus() { - return fieldSetFlags()[4]; - } - - /** Clears the value of the 'status' field */ - public argo.avro.MetricDataOld.Builder clearStatus() { - status = null; - fieldSetFlags()[4] = false; - return this; - } - - /** Gets the value of the 'monitoring_host' field */ - public java.lang.String getMonitoringHost() { - return monitoring_host; - } - - /** Sets the value of the 'monitoring_host' field */ - public argo.avro.MetricDataOld.Builder setMonitoringHost(java.lang.String value) { - validate(fields()[5], value); - this.monitoring_host = value; - fieldSetFlags()[5] = true; - return this; - } - - /** Checks whether the 'monitoring_host' field has been set */ - public boolean hasMonitoringHost() { - return fieldSetFlags()[5]; - } - - /** Clears the value of the 'monitoring_host' field */ - public argo.avro.MetricDataOld.Builder clearMonitoringHost() { - monitoring_host = null; - fieldSetFlags()[5] = false; - return this; - } - - /** Gets the value of the 'summary' field */ - public java.lang.String getSummary() { - return summary; - } - - /** Sets the value of the 'summary' field */ - public argo.avro.MetricDataOld.Builder setSummary(java.lang.String value) { - validate(fields()[6], value); - this.summary = value; - fieldSetFlags()[6] = true; - return this; - } - - /** Checks whether the 'summary' field has been set */ - public boolean hasSummary() { - return fieldSetFlags()[6]; - } - - /** Clears the value of the 'summary' field */ - public argo.avro.MetricDataOld.Builder clearSummary() { - summary = null; - fieldSetFlags()[6] = false; - return this; - } - - /** Gets the value of the 'message' field */ - public java.lang.String getMessage() { - return message; - } - - /** Sets the value of the 'message' field */ - public argo.avro.MetricDataOld.Builder 
setMessage(java.lang.String value) { - validate(fields()[7], value); - this.message = value; - fieldSetFlags()[7] = true; - return this; - } - - /** Checks whether the 'message' field has been set */ - public boolean hasMessage() { - return fieldSetFlags()[7]; - } - - /** Clears the value of the 'message' field */ - public argo.avro.MetricDataOld.Builder clearMessage() { - message = null; - fieldSetFlags()[7] = false; - return this; - } - - /** Gets the value of the 'tags' field */ - public java.util.Map getTags() { - return tags; - } - - /** Sets the value of the 'tags' field */ - public argo.avro.MetricDataOld.Builder setTags(java.util.Map value) { - validate(fields()[8], value); - this.tags = value; - fieldSetFlags()[8] = true; - return this; - } - - /** Checks whether the 'tags' field has been set */ - public boolean hasTags() { - return fieldSetFlags()[8]; - } - - /** Clears the value of the 'tags' field */ - public argo.avro.MetricDataOld.Builder clearTags() { - tags = null; - fieldSetFlags()[8] = false; - return this; - } - - @Override - public MetricDataOld build() { - try { - MetricDataOld record = new MetricDataOld(); - record.timestamp = fieldSetFlags()[0] ? this.timestamp : (java.lang.String) defaultValue(fields()[0]); - record.service = fieldSetFlags()[1] ? this.service : (java.lang.String) defaultValue(fields()[1]); - record.hostname = fieldSetFlags()[2] ? this.hostname : (java.lang.String) defaultValue(fields()[2]); - record.metric = fieldSetFlags()[3] ? this.metric : (java.lang.String) defaultValue(fields()[3]); - record.status = fieldSetFlags()[4] ? this.status : (java.lang.String) defaultValue(fields()[4]); - record.monitoring_host = fieldSetFlags()[5] ? this.monitoring_host : (java.lang.String) defaultValue(fields()[5]); - record.summary = fieldSetFlags()[6] ? this.summary : (java.lang.String) defaultValue(fields()[6]); - record.message = fieldSetFlags()[7] ? this.message : (java.lang.String) defaultValue(fields()[7]); - record.tags = fieldSetFlags()[8] ? 
this.tags : (java.util.Map) defaultValue(fields()[8]); - return record; - } catch (Exception e) { - throw new org.apache.avro.AvroRuntimeException(e); - } - } - } -} diff --git a/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/AmsIngestMetric.java b/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/AmsIngestMetric.java index 283832f4..1dac8aeb 100644 --- a/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/AmsIngestMetric.java +++ b/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/AmsIngestMetric.java @@ -34,7 +34,6 @@ import com.google.gson.JsonParser; import argo.avro.MetricData; -import argo.avro.MetricDataOld; import org.apache.flink.api.common.JobID; import org.slf4j.MDC; @@ -57,7 +56,8 @@ * optional turn on/off ssl verify */ public class AmsIngestMetric { - // setup logger + private static final DatumReader METRIC_DATA_READER = (DatumReader)new SpecificData().createDatumReader(MetricData.getClassSchema()); + // setup logger static Logger LOG = LoggerFactory.getLogger(AmsIngestMetric.class); private static String runDate; @@ -156,7 +156,7 @@ public static void main(String[] args) throws Exception { } // Ingest sync avro encoded data from AMS endpoint - ArgoMessagingSource ams = new ArgoMessagingSource(endpoint, port, token, project, sub, batch, interval, runDate); + ArgoMessagingSource ams = new ArgoMessagingSource(endpoint, port, token, project, sub, batch, interval, runDate, false); if (parameterTool.has("ams.verify")) { ams.setVerify(parameterTool.getBoolean("ams.verify")); @@ -188,18 +188,14 @@ public void flatMap(String value, Collector out) throws Exception { byte[] decoded64 = Base64.decodeBase64(data.getBytes("UTF-8")); // Decode from avro - DatumReader avroReader = new SpecificDatumReader(MetricData.getClassSchema()); + + + Decoder decoder = DecoderFactory.get().binaryDecoder(decoded64, null); MetricData item; - try { - item = avroReader.read(null, decoder); - } catch (java.io.EOFException ex) { - //convert from old to new - avroReader = new SpecificDatumReader(MetricDataOld.getClassSchema(), MetricData.getClassSchema()); - decoder = DecoderFactory.get().binaryDecoder(decoded64, null); - item = avroReader.read(null, decoder); - } + item = METRIC_DATA_READER.read(null, decoder); + if (item != null) { LOG.info("Captured data -- {}", item.toString()); out.collect(item); diff --git a/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/HBaseMetricOutputFormat.java b/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/HBaseMetricOutputFormat.java index 5a4b084e..3fdce8a1 100644 --- a/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/HBaseMetricOutputFormat.java +++ b/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/HBaseMetricOutputFormat.java @@ -13,7 +13,7 @@ import org.apache.hadoop.hbase.util.Bytes; import argo.avro.MetricData; -import argo.avro.MetricDataOld; + /** * Hbase Output Format for storing Metric Data to an hbase destination diff --git a/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/MetricParse.java b/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/MetricParse.java index 40873bfb..9b6aaa6a 100644 --- a/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/MetricParse.java +++ b/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/MetricParse.java @@ -16,7 +16,7 @@ import com.google.gson.JsonElement; import argo.avro.MetricData; -import argo.avro.MetricDataOld; + @@ -32,7 +32,7 @@ public static ArrayList parseGroupEndpoint(byte[] avroBytes) 
throws ArrayList result = new ArrayList(); - DatumReader avroReader = new SpecificDatumReader(MetricDataOld.getClassSchema(),MetricData.getClassSchema(),new SpecificData()); + DatumReader avroReader = new SpecificDatumReader(MetricData.getClassSchema(),MetricData.getClassSchema(),new SpecificData()); BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(avroBytes, null); while (!decoder.isEnd()){ From d54f461f88764d66909c2e4955acda86c5eb7391 Mon Sep 17 00:00:00 2001 From: cthermolia-grnet Date: Thu, 1 Dec 2022 15:51:34 +0200 Subject: [PATCH 3/8] Fix NULL exception during using ArgoMessagingSource in stream status --- .../java/argo/streaming/AmsIngestMetric.java | 4 -- .../java/argo/streaming/AmsStreamStatus.java | 55 ++++++++++--------- 2 files changed, 28 insertions(+), 31 deletions(-) diff --git a/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/AmsIngestMetric.java b/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/AmsIngestMetric.java index a0ff8502..954ba93a 100644 --- a/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/AmsIngestMetric.java +++ b/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/AmsIngestMetric.java @@ -1,18 +1,14 @@ package argo.streaming; import ams.connector.ArgoMessagingSource; -import java.util.Arrays; import java.util.concurrent.TimeUnit; -import org.apache.avro.AvroRuntimeException; import org.apache.avro.io.DatumReader; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.specific.SpecificData; -import org.apache.avro.specific.SpecificDatumReader; import org.apache.commons.codec.binary.Base64; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.utils.ParameterTool; diff --git a/flink_jobs_v2/stream_status/src/main/java/argo/streaming/AmsStreamStatus.java b/flink_jobs_v2/stream_status/src/main/java/argo/streaming/AmsStreamStatus.java index 2685a708..d71f960b 100644 --- a/flink_jobs_v2/stream_status/src/main/java/argo/streaming/AmsStreamStatus.java +++ b/flink_jobs_v2/stream_status/src/main/java/argo/streaming/AmsStreamStatus.java @@ -104,8 +104,7 @@ public class AmsStreamStatus { private static String runDate; private static String apiToken; private static String apiEndpoint; - private static ApiResourceManager amr; - + /** * Sets configuration parameters to streaming enviroment * @@ -213,7 +212,7 @@ public static void main(String[] args) throws Exception { String strictParam = parameterTool.get("interval.strict"); strictInterval = getInterval(strictParam); } - amr = new ApiResourceManager(apiEndpoint, apiToken); + ApiResourceManager amr = new ApiResourceManager(apiEndpoint, apiToken); // fetch // set params @@ -358,6 +357,7 @@ private static class MetricDataWithGroup extends RichCoFlatMapFunction mpsList = new ArrayList(Arrays.asList(amr.getListMetrics())); - ArrayList egpList = new ArrayList(Arrays.asList(amr.getListGroupEndpoints())); + ArrayList mpsList = new ArrayList(Arrays.asList(this.amr.getListMetrics())); + ArrayList egpList = new ArrayList(Arrays.asList(this.amr.getListGroupEndpoints())); mps = new MetricProfileManager(); mps.loadFromList(mpsList); @@ -498,6 +498,7 @@ private static class StatusMap extends RichCoFlatMapFunction opsList = new ArrayList(); opsList.add(opsJSON); ArrayList apsList = new ArrayList(); apsList.add(apsJSON); - ArrayList 
downList = new ArrayList(Arrays.asList(amr.getListDowntimes())); - ArrayList mpsList = new ArrayList(Arrays.asList(amr.getListMetrics())); - ArrayList egpListFull = new ArrayList(Arrays.asList(amr.getListGroupEndpoints())); + ArrayList downList = new ArrayList(Arrays.asList(this.amr.getListDowntimes())); + ArrayList mpsList = new ArrayList(Arrays.asList(this.amr.getListMetrics())); + ArrayList egpListFull = new ArrayList(Arrays.asList(this.amr.getListGroupEndpoints())); // create a new status manager sm = new StatusManager(); @@ -544,7 +545,7 @@ public void open(Configuration parameters) throws IOException, ParseException, U sm.setStrictInterval(strictInterval); // sm.setTimeout(config.timeout); sm.setReport(config.report); - sm.setGroupType(amr.getEgroup()); + sm.setGroupType(this.amr.getEgroup()); // load all the connector data sm.loadAll(config.runDate, downList, egpListFull, mpsList, apsList, opsList); @@ -580,10 +581,10 @@ public void flatMap1(Tuple2 value, Collector out) String dayStamp = tsMon.split("T")[0]; if (!sm.checkIfExistDowntime(dayStamp)) { - amr.setDate(dayStamp); - amr.getRemoteDowntimes(); - ArrayList downList = new ArrayList(Arrays.asList(amr.getListDowntimes())); - sm.addDowntimeSet(dayStamp, downList); + this.amr.setDate(dayStamp); + this.amr.getRemoteDowntimes(); + ArrayList downList = new ArrayList(Arrays.asList(this.amr.getListDowntimes())); + sm.addDowntimeSet(dayStamp, downList); } // if daily generation is enable check if has day changed? From 4934b06d5769b2529d49eb5c10090ec7539f2119 Mon Sep 17 00:00:00 2001 From: Konstantinos Kagkelidis Date: Thu, 8 Dec 2022 16:27:14 +0200 Subject: [PATCH 4/8] Remove deprecated hbase code and MetricParse class --- .../java/argo/streaming/AmsIngestMetric.java | 14 +- .../streaming/HBaseMetricOutputFormat.java | 137 ------------------ .../main/java/argo/streaming/MetricParse.java | 65 --------- 3 files changed, 1 insertion(+), 215 deletions(-) delete mode 100644 flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/HBaseMetricOutputFormat.java delete mode 100644 flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/MetricParse.java diff --git a/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/AmsIngestMetric.java b/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/AmsIngestMetric.java index 954ba93a..dc4255e3 100644 --- a/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/AmsIngestMetric.java +++ b/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/AmsIngestMetric.java @@ -223,19 +223,7 @@ public void flatMap(String value, Collector out) throws Exception { metricDataPOJO.addSink(bs); } - // Check if saving to Hbase is desired - if (hasHbaseArgs(parameterTool)) { - // Initialize Output : Hbase Output Format - HBaseMetricOutputFormat hbf = new HBaseMetricOutputFormat(); - hbf.setMaster(parameterTool.getRequired("hbase.master")); - hbf.setMasterPort(parameterTool.getRequired("hbase.master-port")); - hbf.setZkQuorum(parameterTool.getRequired("hbase.zk.quorum")); - hbf.setZkPort(parameterTool.getRequired("hbase.zk.port")); - hbf.setNamespace(parameterTool.getRequired("hbase.namespace")); - hbf.setTableName(parameterTool.getRequired("hbase.table")); - - metricDataPOJO.writeUsingOutputFormat(hbf); - } + // Create a job title message to discern job in flink dashboard/cli StringBuilder jobTitleSB = new StringBuilder(); diff --git a/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/HBaseMetricOutputFormat.java 
b/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/HBaseMetricOutputFormat.java deleted file mode 100644 index 3fdce8a1..00000000 --- a/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/HBaseMetricOutputFormat.java +++ /dev/null @@ -1,137 +0,0 @@ -package argo.streaming; - -import java.io.IOException; - -import org.apache.flink.api.common.io.OutputFormat; -import org.apache.flink.configuration.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.util.Bytes; - -import argo.avro.MetricData; - - -/** - * Hbase Output Format for storing Metric Data to an hbase destination - */ -public class HBaseMetricOutputFormat implements OutputFormat { - - private String master = null; - private String masterPort = null; - private String zkQuorum = null; - private String zkPort = null; - private String namespace = null; - private String tname = null; - private Connection connection = null; - private Table ht = null; - - - private static final long serialVersionUID = 1L; - - // Setters - public void setMasterPort(String masterPort) { - this.masterPort = masterPort; - } - - public void setMaster(String master) { - this.master = master; - } - - public void setZkQuorum(String zkQuorum) { - this.zkQuorum = zkQuorum; - } - - public void setZkPort(String zkPort) { - this.zkPort = zkPort; - } - - public void setNamespace(String namespace) { - this.namespace = namespace; - } - - public void setTableName(String tname) { - this.tname = tname; - } - - @Override - public void configure(Configuration parameters) { - - } - - /** - * Initialize Hbase remote connection - */ - @Override - public void open(int taskNumber, int numTasks) throws IOException { - // Create hadoop based configuration for hclient to use - org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create(); - // Modify configuration to job needs - config.setInt("timeout", 120000); - if (masterPort != null && !masterPort.isEmpty()){ - config.set("hbase.master", master + ":" + masterPort); - }else { - config.set("hbase.master", master + ":60000"); - } - - config.set("hbase.zookeeper.quorum", zkQuorum); - config.set("hbase.zookeeper.property.clientPort", (zkPort)); - // Create the connection - connection = ConnectionFactory.createConnection(config); - if (namespace != null) { - ht = connection.getTable(TableName.valueOf(namespace + ":" + tname)); - } else { - ht = connection.getTable(TableName.valueOf(tname)); - } - - } - - /** - * Store a Metric Data object as an Hbase Record - */ - @Override - public void writeRecord(MetricData record) throws IOException { - - String ts = record.getTimestamp(); - String host = record.getHostname(); - String service = record.getService(); - String metric = record.getMetric(); - String mHost = record.getMonitoringHost(); - String status = record.getStatus(); - String summary = record.getSummary(); - String msg = record.getMessage(); - String tags = record.getTags().toString(); - - // Compile key - String key = host + "|" + service + "|" + metric + "|" +ts+ "|" + mHost; - - // Prepare columns - Put put = new Put(Bytes.toBytes(key)); - put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("timestamp"), Bytes.toBytes(ts)); - put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("host"), Bytes.toBytes(host)); - 
put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("service"), Bytes.toBytes(service)); - put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("metric"), Bytes.toBytes(metric)); - put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("monitoring_host"), Bytes.toBytes(mHost)); - put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("status"), Bytes.toBytes(status)); - put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("summary"), Bytes.toBytes(summary)); - put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("msg"), Bytes.toBytes(msg)); - put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("tags"), Bytes.toBytes(tags)); - - // Insert row in hbase - ht.put(put); - - } - - /** - * Close Hbase Connection - */ - @Override - public void close() throws IOException { - ht.close(); - connection.close(); - } - -} \ No newline at end of file diff --git a/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/MetricParse.java b/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/MetricParse.java deleted file mode 100644 index 9b6aaa6a..00000000 --- a/flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/MetricParse.java +++ /dev/null @@ -1,65 +0,0 @@ -package argo.streaming; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.Map.Entry; - -import org.apache.avro.io.BinaryDecoder; -import org.apache.avro.io.DatumReader; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.specific.SpecificData; -import org.apache.avro.specific.SpecificDatumReader; - -import com.google.gson.JsonElement; - -import argo.avro.MetricData; - - - - -/** - * SyncParse is a utility class providing methods to parse specific connector data in avro format - */ -public class MetricParse { - - /** - * Parses a byte array and decodes avro MetricData objects - */ - public static ArrayList parseGroupEndpoint(byte[] avroBytes) throws IOException{ - - ArrayList result = new ArrayList(); - - DatumReader avroReader = new SpecificDatumReader(MetricData.getClassSchema(),MetricData.getClassSchema(),new SpecificData()); - BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(avroBytes, null); - - while (!decoder.isEnd()){ - MetricData cur = avroReader.read(null, decoder); - result.add(cur); - } - - return result; - } - - - /** - * Parses attributes from a json attribute element - */ - public static Map parseAttributes(JsonElement jAttr) throws IOException{ - - Map result = new HashMap(); - if (jAttr!=null){ - Set> jItems = jAttr.getAsJsonObject().entrySet(); - - for (Entry jItem : jItems){ - result.put(jItem.getKey(), jItem.getValue().getAsString()); - } - } - - return result; - } - - -} From 29c0f26199921041e0c41ae505f0f2bf60bb0271 Mon Sep 17 00:00:00 2001 From: cthermolia-grnet Date: Fri, 9 Dec 2022 09:51:25 +0200 Subject: [PATCH 5/8] ARGO-4133 AMS connector offset advancement based on date --- .../src/main/java/argo/streaming/AmsStreamStatus.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/flink_jobs_v2/stream_status/src/main/java/argo/streaming/AmsStreamStatus.java b/flink_jobs_v2/stream_status/src/main/java/argo/streaming/AmsStreamStatus.java index d71f960b..96ee0a9a 100644 --- a/flink_jobs_v2/stream_status/src/main/java/argo/streaming/AmsStreamStatus.java +++ b/flink_jobs_v2/stream_status/src/main/java/argo/streaming/AmsStreamStatus.java @@ -96,6 +96,9 @@ * Any of these formats is transformed to minutes in the computations if not * defined the default value is 1440m * + * -- 
latest.offset (Optional) boolean true/false, to define if the argo messaging source + * should set the offset at the latest or at the start of the runDate. By default, if not defined, the + * offset should be the latest. */ public class AmsStreamStatus { // setup logger @@ -238,7 +241,11 @@ public static void main(String[] args) throws Exception { // Establish the metric data AMS stream // Ingest sync avro encoded data from AMS endpoint - ArgoMessagingSource amsMetric = new ArgoMessagingSource(endpoint, port, token, project, subMetric, batch, interval, runDate); + String offsetDt=null; + if(!parameterTool.getBoolean("latest.offset")){ + offsetDt=runDate; + } + ArgoMessagingSource amsMetric = new ArgoMessagingSource(endpoint, port, token, project, subMetric, batch, interval, offsetDt); ArgoApiSource apiSync = new ArgoApiSource(apiEndpoint, apiToken, reportID, apiInterval, interval); if (parameterTool.has("ams.verify")) { From 148d57bf57fe8e5a5dc9e461cae610f34a1301e4 Mon Sep 17 00:00:00 2001 From: cthermolia-grnet Date: Tue, 20 Dec 2022 16:45:33 +0200 Subject: [PATCH 6/8] Minor fix initializing the latest.offset parameter --- .../src/main/java/argo/streaming/AmsStreamStatus.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink_jobs_v2/stream_status/src/main/java/argo/streaming/AmsStreamStatus.java b/flink_jobs_v2/stream_status/src/main/java/argo/streaming/AmsStreamStatus.java index 96ee0a9a..5632f7dd 100644 --- a/flink_jobs_v2/stream_status/src/main/java/argo/streaming/AmsStreamStatus.java +++ b/flink_jobs_v2/stream_status/src/main/java/argo/streaming/AmsStreamStatus.java @@ -242,7 +242,7 @@ public static void main(String[] args) throws Exception { // Establish the metric data AMS stream // Ingest sync avro encoded data from AMS endpoint String offsetDt=null; - if(!parameterTool.getBoolean("latest.offset")){ + if(parameterTool.has("latest.offset") && !parameterTool.getBoolean("latest.offset")){ offsetDt=runDate; } ArgoMessagingSource amsMetric = new ArgoMessagingSource(endpoint, port, token, project, subMetric, batch, interval, offsetDt); From 4162a9c2411569ab3a802bdccaba331f89144441 Mon Sep 17 00:00:00 2001 From: cthermolia-grnet Date: Fri, 3 Jun 2022 09:12:25 +0300 Subject: [PATCH 7/8] ARGO-3378 Support a tenant that has been declared as having combined data in job submission --- .../src/main/java/argo/amr/ApiResource.java | 2 +- .../java/argo/amr/ApiResourceManager.java | 50 ++++++++-- .../main/java/argo/amr/ApiResponseParser.java | 26 ++++- .../main/java/argo/batch/ArgoMultiJob.java | 97 +++++++++++++------ 4 files changed, 136 insertions(+), 39 deletions(-) diff --git a/flink_jobs_v2/ApiResourceManager/src/main/java/argo/amr/ApiResource.java b/flink_jobs_v2/ApiResourceManager/src/main/java/argo/amr/ApiResource.java index d6cc60e9..ebdf241c 100644 --- a/flink_jobs_v2/ApiResourceManager/src/main/java/argo/amr/ApiResource.java +++ b/flink_jobs_v2/ApiResourceManager/src/main/java/argo/amr/ApiResource.java @@ -1,5 +1,5 @@ package argo.amr; public enum ApiResource { - CONFIG, OPS, METRIC, AGGREGATION, THRESHOLDS, TOPOENDPOINTS, TOPOGROUPS, WEIGHTS, DOWNTIMES, RECOMPUTATIONS, MTAGS + CONFIG, OPS, METRIC, AGGREGATION, THRESHOLDS, TOPOENDPOINTS, TOPOGROUPS, WEIGHTS, DOWNTIMES, RECOMPUTATIONS, MTAGS, TENANTFEED } \ No newline at end of file diff --git a/flink_jobs_v2/ApiResourceManager/src/main/java/argo/amr/ApiResourceManager.java b/flink_jobs_v2/ApiResourceManager/src/main/java/argo/amr/ApiResourceManager.java index 687c821f..54ca1751 100644 ---
a/flink_jobs_v2/ApiResourceManager/src/main/java/argo/amr/ApiResourceManager.java +++ b/flink_jobs_v2/ApiResourceManager/src/main/java/argo/amr/ApiResourceManager.java @@ -33,6 +33,7 @@ public class ApiResourceManager { private String weightsID; private RequestManager requestManager; private ApiResponseParser apiResponseParser; + private boolean isCombined; //private boolean verify; //private int timeoutSec; @@ -47,8 +48,8 @@ public ApiResourceManager(String endpoint, String token) { this.reportID = ""; this.date = ""; this.weightsID = ""; - this.tenant=""; - this.egroup=""; + this.tenant = ""; + this.egroup = ""; this.requestManager = new RequestManager("", this.token); this.apiResponseParser = new ApiResponseParser(this.reportName, this.metricID, this.aggregationID, this.opsID, this.threshID, this.tenant, this.egroup); } @@ -182,8 +183,6 @@ public String getEgroup() { public void setEgroup(String egroup) { this.egroup = egroup; } - - /** * Retrieves the remote report configuration based on reportID main class @@ -352,8 +351,8 @@ public void parseReport() { this.opsID = this.apiResponseParser.getOpsID(); this.threshID = this.apiResponseParser.getThreshID(); this.reportName = this.apiResponseParser.getReportName(); - this.tenant=this.apiResponseParser.getTenant(); - this.egroup=this.apiResponseParser.getEgroup(); + this.tenant = this.apiResponseParser.getTenant(); + this.egroup = this.apiResponseParser.getEgroup(); } /** @@ -457,12 +456,43 @@ public MetricProfile[] getListMetrics() { return rArr; } + /** + * Retrieves the remote tenant feed from argo-web-api and stores the + * content in the enum map + */ + public void getRemoteTenantFeed() { + String path = "https://%s/api/v2/feeds/data"; + String fullURL = String.format(path, this.endpoint); + String content = this.requestManager.getResource(fullURL); + if (content != null) { + this.data.put(ApiResource.TENANTFEED, this.apiResponseParser.getJsonData(content, true)); + } + } + + public String[] getListTenants() { + List results = new ArrayList(); + if (!this.data.containsKey(ApiResource.TENANTFEED)) { + String[] rArr = new String[results.size()]; + rArr = results.toArray(rArr); + return rArr; + } + + String content = this.data.get(ApiResource.TENANTFEED); + results = this.apiResponseParser.getListTenants(content); + String[] rArr = new String[results.size()]; + rArr = results.toArray(rArr); + return rArr; + } + /** * Executes all steps to retrieve the complete amount of the available * profile, topology, weights and downtime information from argo-web-api */ public void getRemoteAll() { // Start with report and configuration + if (isCombined) { + this.getRemoteTenantFeed(); + } this.getRemoteConfig(); // parse remote report config to be able to get the other profiles @@ -488,4 +518,12 @@ public void getRemoteAll() { this.getRemoteMetricTags(); } + public boolean isIsCombined() { + return isCombined; + } + + public void setIsCombined(boolean isCombined) { + this.isCombined = isCombined; + } + } diff --git a/flink_jobs_v2/ApiResourceManager/src/main/java/argo/amr/ApiResponseParser.java b/flink_jobs_v2/ApiResourceManager/src/main/java/argo/amr/ApiResponseParser.java index 321fa363..ab94f615 100644 --- a/flink_jobs_v2/ApiResourceManager/src/main/java/argo/amr/ApiResponseParser.java +++ b/flink_jobs_v2/ApiResourceManager/src/main/java/argo/amr/ApiResponseParser.java @@ -14,8 +14,8 @@ import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; + import
java.util.ArrayList; -import java.util.EnumMap; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -103,13 +103,11 @@ public void setEgroup(String egroup) { this.egroup = egroup; } - /** * Extract first JSON item from data JSON array in api response * * @param content JSON content of the full repsonse (status + data) * @return First available item in data array as JSON string representation - * */ public String getJsonData(String content, boolean asArray) { @@ -118,10 +116,17 @@ public String getJsonData(String content, boolean asArray) { JsonElement jElement = jsonParser.parse(content); JsonObject jRoot = jElement.getAsJsonObject(); // Get the data array and the first item + if (asArray) { + if (jRoot.get("data") == null) { + return null; + } return jRoot.get("data").toString(); } JsonArray jData = jRoot.get("data").getAsJsonArray(); + if (!jData.iterator().hasNext()) { + return null; + } JsonElement jItem = jData.get(0); return jItem.toString(); } @@ -163,6 +168,21 @@ public void parseReport(String content) { } + public List getListTenants(String content) { + List results = new ArrayList(); + + JsonParser jsonParser = new JsonParser(); + JsonElement jElement = jsonParser.parse(content); + JsonArray jArray = jElement.getAsJsonArray(); + JsonObject jRoot = jArray.get(0).getAsJsonObject(); + JsonArray tenants = jRoot.get("tenants").getAsJsonArray(); + for (int i = 0; i < tenants.size(); i++) { + String jItem = tenants.get(i).getAsString(); + results.add(jItem); + } + return results; + } + /** * Parses the Downtime content retrieved from argo-web-api and provides a * list of Downtime avro objects to be used in the next steps of the diff --git a/flink_jobs_v2/batch_multi/src/main/java/argo/batch/ArgoMultiJob.java b/flink_jobs_v2/batch_multi/src/main/java/argo/batch/ArgoMultiJob.java index 022b63df..2efa7c08 100644 --- a/flink_jobs_v2/batch_multi/src/main/java/argo/batch/ArgoMultiJob.java +++ b/flink_jobs_v2/batch_multi/src/main/java/argo/batch/ArgoMultiJob.java @@ -49,7 +49,10 @@ import trends.status.GroupTrendsCounter; import trends.status.MetricTrendsCounter; import trends.status.ServiceTrendsCounter; +import utils.Utils; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.flink.api.common.JobID; import org.joda.time.DateTime; @@ -58,7 +61,7 @@ /** * Implements an ARGO Status Batch Job in flink - * + *

* Submit job in flink cluster using the following parameters: --pdata: path to * previous day's metric data file (For hdfs use: * hdfs://namenode:port/path/to/file) --mdata: path to metric data file (For @@ -119,13 +122,17 @@ public static void main(String[] args) throws Exception { if (params.get("clearMongo") != null && params.getBoolean("clearMongo") == true) { clearMongo = true; } - + boolean isCombined = false; + if (params.get("isCombined") != null && params.getBoolean("isCombined") == true) { + isCombined = true; + } String apiEndpoint = params.getRequired("api.endpoint"); String apiToken = params.getRequired("api.token"); reportID = params.getRequired("report.id"); ApiResourceManager amr = new ApiResourceManager(apiEndpoint, apiToken); + // fetch // set params if (params.has("api.proxy")) { @@ -136,7 +143,7 @@ public static void main(String[] args) throws Exception { amr.setTimeoutSec(params.getInt("api.timeout")); } runDate = params.getRequired("run.date"); - + amr.setIsCombined(isCombined); amr.setReportID(reportID); amr.setDate(runDate); amr.getRemoteAll(); @@ -148,6 +155,9 @@ public static void main(String[] args) throws Exception { cfgMgr.loadJsonString(confData); enableComputations(cfgMgr.activeComputations, params); + if(isCombined){ + amr.getTenant(); + } DataSource opsDS = env.fromElements(amr.getResourceJSON(ApiResource.OPS)); DataSource apsDS = env.fromElements(amr.getResourceJSON(ApiResource.AGGREGATION)); @@ -177,6 +187,8 @@ public static void main(String[] args) throws Exception { thrDS = env.fromElements(amr.getResourceJSON(ApiResource.THRESHOLDS)); } + // Get conf data + DataSet mpsDS = env.fromElements(amr.getListMetrics()); DataSet egpDS = env.fromElements(amr.getListGroupEndpoints()); DataSet ggpDS = env.fromElements(new GroupGroup()); @@ -189,32 +201,59 @@ public static void main(String[] args) throws Exception { if (listDowntimes.length > 0) { downDS = env.fromElements(amr.getListDowntimes()); } - // todays metric data - Path in = new Path(params.getRequired("mdata")); - AvroInputFormat mdataAvro = new AvroInputFormat(in, MetricData.class); - DataSet mdataDS = env.createInput(mdataAvro); - // previous metric data - Path pin = new Path(params.getRequired("pdata")); - AvroInputFormat pdataAvro = new AvroInputFormat(pin, MetricData.class); - DataSet pdataDS = env.createInput(pdataAvro); - DataSet pdataCleanDS = pdataDS.flatMap(new ExcludeMetricData()).withBroadcastSet(recDS, "rec"); + List tenantList=new ArrayList<>(); + if (isCombined) { + tenantList = Arrays.asList(amr.getListTenants()); + }else{ + tenantList.add(amr.getTenant()); + } + List tenantPaths=new ArrayList<>(); + + DateTime currentDate=Utils.convertStringtoDate("yyyy-MM-dd",runDate); + String previousDate=Utils.convertDateToString("yyyy-MM-dd",currentDate.minusDays(1)); + for(String tenant: tenantList){ + Path[] paths=new Path[2]; + paths[0]=new Path(params.get("basispath")+"/"+tenant+"/mdata/"+runDate); + paths[1] =new Path(params.get("basispath")+"/"+tenant+"/mdata/"+previousDate); + tenantPaths.add(paths); + } + DataSet allMetricData=null; + for(Path[] path:tenantPaths) { + // todays metric data + Path in = path[0]; + AvroInputFormat mdataAvro = new AvroInputFormat(in, MetricData.class); + DataSet mdataDS = env.createInput(mdataAvro); + + // previous metric data + Path pin = path[1]; + AvroInputFormat pdataAvro = new AvroInputFormat(pin, MetricData.class); + DataSet pdataDS = env.createInput(pdataAvro); + - // Find the latest day - DataSet pdataMin = pdataCleanDS.groupBy("service", "hostname", 
"metric") - .sortGroup("timestamp", Order.DESCENDING).first(1); + DataSet pdataCleanDS = pdataDS.flatMap(new ExcludeMetricData()).withBroadcastSet(recDS, "rec"); - // Union todays data with the latest statuses from previous day - DataSet mdataPrevTotalDS = mdataDS.union(pdataMin); + // Find the latest day + DataSet pdataMin = pdataCleanDS.groupBy("service", "hostname", "metric") + .sortGroup("timestamp", Order.DESCENDING).first(1); + // Union todays data with the latest statuses from previous day + DataSet mdataPrevTotalDS = mdataDS.union(pdataMin); + + if(allMetricData==null) { + allMetricData = mdataPrevTotalDS; + }else{ + allMetricData=allMetricData.union(mdataPrevTotalDS); + } + } // Use yesterday's latest statuses and todays data to find the missing ones and add them to the mix - DataSet fillMissDS = mdataPrevTotalDS.reduceGroup(new FillMissing(params)) + DataSet fillMissDS = allMetricData.reduceGroup(new FillMissing(params)) .withBroadcastSet(mpsDS, "mps").withBroadcastSet(egpDS, "egp").withBroadcastSet(ggpDS, "ggp") .withBroadcastSet(opsDS, "ops").withBroadcastSet(confDS, "conf"); // Discard unused data and attach endpoint group as information - DataSet mdataTrimDS = mdataPrevTotalDS.flatMap(new PickEndpoints(params)) + DataSet mdataTrimDS = allMetricData.flatMap(new PickEndpoints(params)) .withBroadcastSet(mpsDS, "mps").withBroadcastSet(egpDS, "egp").withBroadcastSet(ggpDS, "ggp") .withBroadcastSet(recDS, "rec").withBroadcastSet(confDS, "conf").withBroadcastSet(thrDS, "thr") .withBroadcastSet(opsDS, "ops").withBroadcastSet(apsDS, "aps"); @@ -237,6 +276,7 @@ public static void main(String[] args) throws Exception { .reduceGroup(new CalcMetricTimeline(params)).withBroadcastSet(mpsDS, "mps").withBroadcastSet(opsDS, "ops") .withBroadcastSet(apsDS, "aps"); + //Create StatusMetricTimeline dataset for endpoints DataSet statusEndpointTimeline = statusMetricTimeline.groupBy("group", "service", "hostname") .reduceGroup(new CalcEndpointTimeline(params, now)).withBroadcastSet(mpsDS, "mps").withBroadcastSet(opsDS, "ops") @@ -320,7 +360,6 @@ public static void main(String[] args) throws Exception { DataSet serviceTrends = statusServiceTimeline.flatMap(new CalcServiceFlipFlopTrends()); DataSet groupTrends = statusGroupTimeline.flatMap(new CalcGroupFlipFlopTrends()); if (calcFlipFlops) { - DataSet noZeroMetricFlipFlops = metricTrends.filter(new ZeroMetricFlipFlopFilter()); if (rankNum != null) { //sort and rank data noZeroMetricFlipFlops = noZeroMetricFlipFlops.sortPartition("flipflops", Order.DESCENDING).setParallelism(1).first(rankNum); @@ -372,7 +411,7 @@ public static void main(String[] args) throws Exception { if (calcStatusTrends) { //flatMap dataset to tuples and count the apperances of each status type to the timeline - DataSet< Tuple8< String, String, String, String, String, Integer, Integer, String>> metricStatusTrendsData = metricTrends.flatMap(new MetricTrendsCounter()).withBroadcastSet(opsDS, "ops").withBroadcastSet(mtagsDS, "mtags"); + DataSet> metricStatusTrendsData = metricTrends.flatMap(new MetricTrendsCounter()).withBroadcastSet(opsDS, "ops").withBroadcastSet(mtagsDS, "mtags"); //filter dataset for each status type and write to mongo db filterByStatusAndWriteMongo(MongoTrendsOutput.TrendsType.TRENDS_STATUS_METRIC, "status_trends_metrics", metricStatusTrendsData, "critical"); filterByStatusAndWriteMongo(MongoTrendsOutput.TrendsType.TRENDS_STATUS_METRIC, "status_trends_metrics", metricStatusTrendsData, "warning"); @@ -380,7 +419,7 @@ public static void main(String[] args) throws 
@@ -237,6 +276,7 @@ public static void main(String[] args) throws Exception {
                 .reduceGroup(new CalcMetricTimeline(params)).withBroadcastSet(mpsDS, "mps").withBroadcastSet(opsDS, "ops")
                 .withBroadcastSet(apsDS, "aps");
 
+
         //Create StatusMetricTimeline dataset for endpoints
         DataSet statusEndpointTimeline = statusMetricTimeline.groupBy("group", "service", "hostname")
                 .reduceGroup(new CalcEndpointTimeline(params, now)).withBroadcastSet(mpsDS, "mps").withBroadcastSet(opsDS, "ops")
@@ -320,7 +360,6 @@ public static void main(String[] args) throws Exception {
         DataSet serviceTrends = statusServiceTimeline.flatMap(new CalcServiceFlipFlopTrends());
         DataSet groupTrends = statusGroupTimeline.flatMap(new CalcGroupFlipFlopTrends());
         if (calcFlipFlops) {
-
             DataSet noZeroMetricFlipFlops = metricTrends.filter(new ZeroMetricFlipFlopFilter());
             if (rankNum != null) { //sort and rank data
                 noZeroMetricFlipFlops = noZeroMetricFlipFlops.sortPartition("flipflops", Order.DESCENDING).setParallelism(1).first(rankNum);
@@ -372,7 +411,7 @@ public static void main(String[] args) throws Exception {
 
         if (calcStatusTrends) {
             //flatMap dataset to tuples and count the apperances of each status type to the timeline
-            DataSet< Tuple8< String, String, String, String, String, Integer, Integer, String>> metricStatusTrendsData = metricTrends.flatMap(new MetricTrendsCounter()).withBroadcastSet(opsDS, "ops").withBroadcastSet(mtagsDS, "mtags");
+            DataSet<Tuple8<String, String, String, String, String, Integer, Integer, String>> metricStatusTrendsData = metricTrends.flatMap(new MetricTrendsCounter()).withBroadcastSet(opsDS, "ops").withBroadcastSet(mtagsDS, "mtags");
             //filter dataset for each status type and write to mongo db
             filterByStatusAndWriteMongo(MongoTrendsOutput.TrendsType.TRENDS_STATUS_METRIC, "status_trends_metrics", metricStatusTrendsData, "critical");
             filterByStatusAndWriteMongo(MongoTrendsOutput.TrendsType.TRENDS_STATUS_METRIC, "status_trends_metrics", metricStatusTrendsData, "warning");
@@ -380,7 +419,7 @@ public static void main(String[] args) throws Exception {
 
             /*=============================================================================================*/
             //flatMap dataset to tuples and count the apperances of each status type to the timeline
-            DataSet< Tuple8< String, String, String, String, String, Integer, Integer, String>> endpointStatusTrendsData = endpointTrends.flatMap(new EndpointTrendsCounter()).withBroadcastSet(opsDS, "ops");
+            DataSet<Tuple8<String, String, String, String, String, Integer, Integer, String>> endpointStatusTrendsData = endpointTrends.flatMap(new EndpointTrendsCounter()).withBroadcastSet(opsDS, "ops");
 
             //filter dataset for each status type and write to mongo db
             filterByStatusAndWriteMongo(MongoTrendsOutput.TrendsType.TRENDS_STATUS_ENDPOINT, "status_trends_endpoints", endpointStatusTrendsData, "critical");
@@ -390,7 +429,7 @@ public static void main(String[] args) throws Exception {
         /**
          * **************************************************************************************************
          */
-        DataSet< Tuple8< String, String, String, String, String, Integer, Integer, String>> serviceStatusTrendsData = serviceTrends.flatMap(new ServiceTrendsCounter()).withBroadcastSet(opsDS, "ops");
+        DataSet<Tuple8<String, String, String, String, String, Integer, Integer, String>> serviceStatusTrendsData = serviceTrends.flatMap(new ServiceTrendsCounter()).withBroadcastSet(opsDS, "ops");
         //filter dataset for each status type and write to mongo db
         filterByStatusAndWriteMongo(MongoTrendsOutput.TrendsType.TRENDS_STATUS_SERVICE, "status_trends_services", serviceStatusTrendsData, "critical");
         filterByStatusAndWriteMongo(MongoTrendsOutput.TrendsType.TRENDS_STATUS_SERVICE, "status_trends_services", serviceStatusTrendsData, "warning");
@@ -401,7 +440,7 @@ public static void main(String[] args) throws Exception {
          */
         //group data by group and count flip flops
         //flatMap dataset to tuples and count the apperances of each status type to the timeline
-        DataSet< Tuple8< String, String, String, String, String, Integer, Integer, String>> groupStatusTrendsData = groupTrends.flatMap(new GroupTrendsCounter()).withBroadcastSet(opsDS, "ops");
+        DataSet<Tuple8<String, String, String, String, String, Integer, Integer, String>> groupStatusTrendsData = groupTrends.flatMap(new GroupTrendsCounter()).withBroadcastSet(opsDS, "ops");
         //filter dataset for each status type and write to mongo db
         filterByStatusAndWriteMongo(MongoTrendsOutput.TrendsType.TRENDS_STATUS_GROUP, "status_trends_groups", groupStatusTrendsData, "critical");
         filterByStatusAndWriteMongo(MongoTrendsOutput.TrendsType.TRENDS_STATUS_GROUP, "status_trends_groups", groupStatusTrendsData, "warning");
@@ -422,9 +461,9 @@ public static void main(String[] args) throws Exception {
 
     }
 
-    private static void filterByStatusAndWriteMongo(MongoTrendsOutput.TrendsType mongoTrendsType, String uri, DataSet< Tuple8< String, String, String, String, String, Integer, Integer, String>> data, String status) {
+    private static void filterByStatusAndWriteMongo(MongoTrendsOutput.TrendsType mongoTrendsType, String uri, DataSet<Tuple8<String, String, String, String, String, Integer, Integer, String>> data, String status) {
 
-        DataSet< Tuple8< String, String, String, String, String, Integer, Integer, String>> filteredData = data.filter(new StatusAndDurationFilter(status)); //filter dataset by status type and status appearances>0
+        DataSet<Tuple8<String, String, String, String, String, Integer, Integer, String>> filteredData = data.filter(new StatusAndDurationFilter(status)); //filter dataset by status type and status appearances>0
 
         if (rankNum != null) {
             filteredData = filteredData.sortPartition(7, Order.DESCENDING).setParallelism(1).first(rankNum);
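The rank branch above is the standard DataSet-API top-N trick: setParallelism(1) collapses the data to one partition, so the partition-local sortPartition produces a global order and first(rankNum) returns the true top N. A minimal, self-contained illustration on a toy Tuple2 dataset (not the job's Tuple8):

    import org.apache.flink.api.common.operators.Order;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class TopNSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<String, Integer>> data = env.fromElements(
                    Tuple2.of("host-a", 5),
                    Tuple2.of("host-b", 12),
                    Tuple2.of("host-c", 9));

            // One partition => the partition-local sort is effectively a
            // global sort, so first(2) yields the global top 2 on field 1.
            DataSet<Tuple2<String, Integer>> top2 = data
                    .sortPartition(1, Order.DESCENDING)
                    .setParallelism(1)
                    .first(2);

            top2.print(); // (host-b,12) then (host-c,9)
        }
    }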
@@ -436,15 +475,15 @@ private static void filterByStatusAndWriteMongo(MongoTrendsOutput.TrendsType mon
         }
 
     // write status trends to mongo db
-    private static void writeStatusTrends(DataSet< Tuple8< String, String, String, String, String, Integer, Integer, String>> outputData, String uri, final MongoTrendsOutput.TrendsType mongoCase, DataSet< Tuple8< String, String, String, String, String, Integer, Integer, String>> data, String status) {
+    private static void writeStatusTrends(DataSet<Tuple8<String, String, String, String, String, Integer, Integer, String>> outputData, String uri, final MongoTrendsOutput.TrendsType mongoCase, DataSet<Tuple8<String, String, String, String, String, Integer, Integer, String>> data, String status) {
 
         //MongoTrendsOutput.TrendsType.TRENDS_STATUS_ENDPOINT
         MongoTrendsOutput metricMongoOut = new MongoTrendsOutput(dbURI, uri, mongoCase, reportID, runDate, clearMongo);
 
-        DataSet<Trends> trends = outputData.map(new MapFunction< Tuple8< String, String, String, String, String, Integer, Integer, String>, Trends>() {
+        DataSet<Trends> trends = outputData.map(new MapFunction<Tuple8<String, String, String, String, String, Integer, Integer, String>, Trends>() {
             @Override
-            public Trends map(Tuple8< String, String, String, String, String, Integer, Integer, String> in) throws Exception {
+            public Trends map(Tuple8<String, String, String, String, String, Integer, Integer, String> in) throws Exception {
                 switch (mongoCase) {
                     case TRENDS_STATUS_METRIC:
                         return new Trends(in.f0, in.f1, in.f2, in.f3, in.f4, in.f5, in.f6, in.f7);
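Only the TRENDS_STATUS_METRIC branch of the switch survives in this excerpt, so the other Trends constructor variants are not reproduced here. Structurally, the change is just re-typing the anonymous MapFunction that repacks tuple fields into the Mongo document type. A generic sketch of that shape, simplified to a Tuple3 and a String output instead of the job's Tuple8 and Trends:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple3;

    public class TupleRepackSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple3<String, String, Integer>> rows = env.fromElements(
                    Tuple3.of("SITE_A", "CRITICAL", 4),
                    Tuple3.of("SITE_B", "WARNING", 1));

            // Same shape as the job's Tuple8 -> Trends map: an anonymous
            // MapFunction whose declared generics must match the input tuple.
            DataSet<String> docs = rows.map(new MapFunction<Tuple3<String, String, Integer>, String>() {
                @Override
                public String map(Tuple3<String, String, Integer> in) throws Exception {
                    return in.f0 + "/" + in.f1 + ":" + in.f2;
                }
            });

            docs.print(); // SITE_A/CRITICAL:4 and SITE_B/WARNING:1
        }
    }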
From 841144045c11d959b0afd088b21278d016337bec Mon Sep 17 00:00:00 2001
From: Konstantinos Kagkelidis
Date: Thu, 2 Feb 2023 17:47:04 +0200
Subject: [PATCH 8/8] Bump to v2.1.1

---
 flink_jobs_v2/ApiResourceManager/pom.xml |  4 ++--
 flink_jobs_v2/ProfilesManager/pom.xml    |  4 ++--
 flink_jobs_v2/Timelines/pom.xml          |  4 ++--
 flink_jobs_v2/ams-connector/pom.xml      |  4 ++--
 flink_jobs_v2/ams_ingest_metric/pom.xml  |  4 ++--
 flink_jobs_v2/ams_ingest_sync/pom.xml    |  4 ++--
 flink_jobs_v2/batch_multi/pom.xml        | 10 +++++-----
 flink_jobs_v2/pom.xml                    |  2 +-
 flink_jobs_v2/stream_status/pom.xml      | 10 +++++-----
 9 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/flink_jobs_v2/ApiResourceManager/pom.xml b/flink_jobs_v2/ApiResourceManager/pom.xml
index ad52bead..0db6a9cc 100644
--- a/flink_jobs_v2/ApiResourceManager/pom.xml
+++ b/flink_jobs_v2/ApiResourceManager/pom.xml
@@ -4,9 +4,9 @@
     <parent>
         <groupId>flink.jobs.v2</groupId>
         <artifactId>flink_jobs_v2</artifactId>
-        <version>2.1.0</version>
+        <version>2.1.1</version>
     </parent>
-    <version>2.1.0</version>
+    <version>2.1.1</version>
     <groupId>api.resource.manager</groupId>
     <artifactId>ApiResourceManager</artifactId>
     <packaging>jar</packaging>
diff --git a/flink_jobs_v2/ProfilesManager/pom.xml b/flink_jobs_v2/ProfilesManager/pom.xml
index 33daffb2..d6aef8da 100644
--- a/flink_jobs_v2/ProfilesManager/pom.xml
+++ b/flink_jobs_v2/ProfilesManager/pom.xml
@@ -4,11 +4,11 @@
     <parent>
         <groupId>flink.jobs.v2</groupId>
         <artifactId>flink_jobs_v2</artifactId>
-        <version>2.1.0</version>
+        <version>2.1.1</version>
     </parent>
     <groupId>profiles.manager</groupId>
     <artifactId>ProfilesManager</artifactId>
-    <version>2.1.0</version>
+    <version>2.1.1</version>
     <packaging>jar</packaging>
diff --git a/flink_jobs_v2/Timelines/pom.xml b/flink_jobs_v2/Timelines/pom.xml
index daed1286..50cd73f8 100644
--- a/flink_jobs_v2/Timelines/pom.xml
+++ b/flink_jobs_v2/Timelines/pom.xml
@@ -4,11 +4,11 @@
     <parent>
         <groupId>flink.jobs.v2</groupId>
         <artifactId>flink_jobs_v2</artifactId>
-        <version>2.1.0</version>
+        <version>2.1.1</version>
     </parent>
     <groupId>timeline.manager</groupId>
     <artifactId>Timelines</artifactId>
-    <version>2.1.0</version>
+    <version>2.1.1</version>
     <packaging>jar</packaging>
diff --git a/flink_jobs_v2/ams-connector/pom.xml b/flink_jobs_v2/ams-connector/pom.xml
index 17013fe5..9a22721b 100644
--- a/flink_jobs_v2/ams-connector/pom.xml
+++ b/flink_jobs_v2/ams-connector/pom.xml
@@ -16,11 +16,11 @@
     <parent>
         <groupId>flink.jobs.v2</groupId>
         <artifactId>flink_jobs_v2</artifactId>
-        <version>2.1.0</version>
+        <version>2.1.1</version>
     </parent>
 
     <groupId>ams.connector</groupId>
-    <version>2.1.0</version>
+    <version>2.1.1</version>
 
     <artifactId>ams.connector</artifactId>
     <name>Connect to AMS</name>
diff --git a/flink_jobs_v2/ams_ingest_metric/pom.xml b/flink_jobs_v2/ams_ingest_metric/pom.xml
index 7579c15c..95e17534 100644
--- a/flink_jobs_v2/ams_ingest_metric/pom.xml
+++ b/flink_jobs_v2/ams_ingest_metric/pom.xml
@@ -14,7 +14,7 @@ language governing permissions and limitations under the License. -->
     <groupId>argo.streaming</groupId>
     <artifactId>ams-ingest-metric</artifactId>
-    <version>2.1.0</version>
+    <version>2.1.1</version>
     <packaging>jar</packaging>
 
     <name>ARGO AMS Ingest Metric Data job</name>
@@ -63,7 +63,7 @@ language governing permissions and limitations under the License. -->
         <dependency>
             <groupId>ams.connector</groupId>
             <artifactId>ams-connector</artifactId>
-            <version>2.1.0</version>
+            <version>2.1.1</version>
             <type>jar</type>
         </dependency>
diff --git a/flink_jobs_v2/ams_ingest_sync/pom.xml b/flink_jobs_v2/ams_ingest_sync/pom.xml
index e7fe8b30..c9ae5c21 100644
--- a/flink_jobs_v2/ams_ingest_sync/pom.xml
+++ b/flink_jobs_v2/ams_ingest_sync/pom.xml
@@ -14,7 +14,7 @@ language governing permissions and limitations under the License. -->
     <modelVersion>4.0.0</modelVersion>
     <groupId>argo.streaming</groupId>
-    <version>2.1.0</version>
+    <version>2.1.1</version>
     <artifactId>ams-ingest-sync</artifactId>
 
     <name>Stream sync data from AMS to HDFS</name>
@@ -57,7 +57,7 @@ language governing permissions and limitations under the License. -->
         <dependency>
             <groupId>ams.connector</groupId>
             <artifactId>ams-connector</artifactId>
-            <version>2.1.0</version>
+            <version>2.1.1</version>
             <type>jar</type>
         </dependency>
diff --git a/flink_jobs_v2/batch_multi/pom.xml b/flink_jobs_v2/batch_multi/pom.xml
index 93ca60c5..c416b780 100644
--- a/flink_jobs_v2/batch_multi/pom.xml
+++ b/flink_jobs_v2/batch_multi/pom.xml
@@ -15,11 +15,11 @@ language governing permissions and limitations under the License. -->
     <parent>
         <groupId>flink.jobs.v2</groupId>
         <artifactId>flink_jobs_v2</artifactId>
-        <version>2.1.0</version>
+        <version>2.1.1</version>
     </parent>
     <groupId>argo.batch</groupId>
     <artifactId>ArgoMultiJob</artifactId>
-    <version>2.0.0</version>
+    <version>2.1.1</version>
     <packaging>jar</packaging>
 
     <name>Argo Multi Job</name>
@@ -175,19 +175,19 @@ language governing permissions and limitations under the License. -->
         <dependency>
             <groupId>timeline.manager</groupId>
             <artifactId>Timelines</artifactId>
-            <version>2.1.0</version>
+            <version>2.1.1</version>
             <type>jar</type>
         </dependency>
         <dependency>
             <groupId>profiles.manager</groupId>
             <artifactId>ProfilesManager</artifactId>
-            <version>2.1.0</version>
+            <version>2.1.1</version>
             <type>jar</type>
         </dependency>
         <dependency>
             <groupId>api.resource.manager</groupId>
             <artifactId>ApiResourceManager</artifactId>
-            <version>2.1.0</version>
+            <version>2.1.1</version>
             <type>jar</type>
         </dependency>
diff --git a/flink_jobs_v2/pom.xml b/flink_jobs_v2/pom.xml
index d8ebce62..541811c4 100644
--- a/flink_jobs_v2/pom.xml
+++ b/flink_jobs_v2/pom.xml
@@ -3,7 +3,7 @@
     <modelVersion>4.0.0</modelVersion>
     <groupId>flink.jobs.v2</groupId>
     <artifactId>flink_jobs_v2</artifactId>
-    <version>2.1.0</version>
+    <version>2.1.1</version>
     <packaging>pom</packaging>
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
diff --git a/flink_jobs_v2/stream_status/pom.xml b/flink_jobs_v2/stream_status/pom.xml
index 86c136d5..b521b266 100644
--- a/flink_jobs_v2/stream_status/pom.xml
+++ b/flink_jobs_v2/stream_status/pom.xml
@@ -14,11 +14,11 @@ language governing permissions and limitations under the License. -->
     <parent>
         <groupId>flink.jobs.v2</groupId>
         <artifactId>flink_jobs_v2</artifactId>
-        <version>2.1.0</version>
+        <version>2.1.1</version>
     </parent>
     <groupId>argo.streaming</groupId>
     <artifactId>streaming-status</artifactId>
-    <version>2.1.0</version>
+    <version>2.1.1</version>
     <packaging>jar</packaging>
 
     <name>ARGO Streaming status job</name>
@@ -66,19 +66,19 @@ language governing permissions and limitations under the License. -->
         <dependency>
             <groupId>ams.connector</groupId>
             <artifactId>ams-connector</artifactId>
-            <version>2.1.0</version>
+            <version>2.1.1</version>
             <type>jar</type>
         </dependency>
         <dependency>
             <groupId>api.resource.manager</groupId>
             <artifactId>ApiResourceManager</artifactId>
-            <version>2.1.0</version>
+            <version>2.1.1</version>
             <type>jar</type>
         </dependency>
         <dependency>
             <groupId>profiles.manager</groupId>
             <artifactId>ProfilesManager</artifactId>
-            <version>2.1.0</version>
+            <version>2.1.1</version>
             <type>jar</type>
         </dependency>