Skip to content

Commit

Permalink
Improve analytic logs
Browse files Browse the repository at this point in the history
Fixes #3680
  • Loading branch information
sajinieKavindya committed Jan 24, 2025
1 parent 115aae3 commit 17c371c
Show file tree
Hide file tree
Showing 17 changed files with 385 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,16 @@ private void publishInboundEndpointAnalytics(PublishingEvent event) {
ElasticDataSchemaElement inboundEndpointDetails = new ElasticDataSchemaElement();
inboundEndpointDetails.setAttribute(
ElasticConstants.EnvelopDef.INBOUND_ENDPOINT_NAME, event.getComponentName());

Object obj = event.getElasticMetadata().getProperty(SynapseConstants.STATISTICS_METADATA);
if (obj instanceof Map<?, ?>) {
//noinspection unchecked
Map<String, Object> statisticsDetails = (Map<String, Object>) obj;
for (Map.Entry<String, Object> entry : statisticsDetails.entrySet()) {
inboundEndpointDetails.setAttribute(entry.getKey(), entry.getValue());
}
}

analyticsPayload.setAttribute(
ElasticConstants.EnvelopDef.INBOUND_ENDPOINT_DETAILS, inboundEndpointDetails);
attachHttpProperties(analyticsPayload, event.getElasticMetadata());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,20 @@
import org.apache.commons.vfs2.FileObject;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
import org.apache.synapse.aspects.flow.statistics.collectors.RuntimeStatisticCollector;
import org.apache.synapse.commons.vfs.FileObjectDataSource;
import org.apache.synapse.commons.vfs.VFSConstants;
import org.apache.synapse.commons.vfs.VFSUtils;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.inbound.InboundEndpoint;
import org.apache.synapse.inbound.InboundEndpointConstants;
import org.apache.synapse.mediators.base.SequenceMediator;
import org.apache.synapse.transport.customlogsetter.CustomLogSetter;
import org.wso2.carbon.inbound.endpoint.inboundfactory.InboundRequestProcessorFactoryImpl;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import javax.mail.internet.ContentType;
Expand All @@ -58,6 +63,7 @@ public class FileInjectHandler {
private Properties vfsProperties;
private SynapseEnvironment synapseEnvironment;
private Map<String, Object> transportHeaders;
private String fileURI;

public FileInjectHandler(String injectingSeq, String onErrorSeq, boolean sequential,
SynapseEnvironment synapseEnvironment, Properties vfsProperties) {
Expand All @@ -82,6 +88,9 @@ public boolean invoke(Object object, String name) throws SynapseException {
msgCtx.setProperty(SynapseConstants.INBOUND_ENDPOINT_NAME, name);
msgCtx.setProperty(SynapseConstants.ARTIFACT_NAME, SynapseConstants.FAIL_SAFE_MODE_INBOUND_ENDPOINT + name);
msgCtx.setProperty(SynapseConstants.IS_INBOUND, true);
if (RuntimeStatisticCollector.isStatisticsEnabled()) {
populateStatisticsMetadata(msgCtx);
}
InboundEndpoint inboundEndpoint = msgCtx.getConfiguration().getInboundEndpoint(name);
CustomLogSetter.getInstance().setLogAppender(inboundEndpoint.getArtifactContainerName());
String contentType = vfsProperties.getProperty(VFSConstants.TRANSPORT_FILE_CONTENT_TYPE);
Expand Down Expand Up @@ -198,6 +207,11 @@ public void setTransportHeaders(Map<String, Object> transportHeaders) {
this.transportHeaders = transportHeaders;
}

public void setFileURI(String fileURI) {

this.fileURI = fileURI;
}

/**
* Create the initial message context for the file
*/
Expand All @@ -213,5 +227,13 @@ private org.apache.synapse.MessageContext createMessageContext() {
return msgCtx;
}

private void populateStatisticsMetadata(org.apache.synapse.MessageContext msgCtx) {
Map<String, Object> statisticsDetails = new HashMap<String, Object>();
statisticsDetails.put(SynapseConstants.FILE_URI, VFSUtils.maskURLPassword(fileURI));
statisticsDetails.put(InboundEndpointConstants.INBOUND_ENDPOINT_PROTOCOL,
InboundRequestProcessorFactoryImpl.Protocols.file.toString());
msgCtx.setProperty(SynapseConstants.STATISTICS_METADATA, statisticsDetails);
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,7 @@ private FileObject processFile(FileObject file) throws SynapseException {
}

injectHandler.setTransportHeaders(transportHeaders);
injectHandler.setFileURI(this.fileURI);
// injectHandler
if (!injectHandler.invoke(file, name)) {
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,21 @@
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.aspects.flow.statistics.collectors.RuntimeStatisticCollector;
import org.apache.synapse.inbound.InboundEndpoint;
import org.apache.synapse.inbound.InboundEndpointConstants;
import org.apache.synapse.inbound.InboundProcessorParams;
import org.apache.synapse.inbound.InboundResponseSender;
import org.apache.synapse.mediators.base.SequenceMediator;
import org.apache.synapse.transport.customlogsetter.CustomLogSetter;
import org.wso2.carbon.inbound.endpoint.inboundfactory.InboundRequestProcessorFactoryImpl;
import org.wso2.carbon.inbound.endpoint.protocol.hl7.context.MLLPContext;
import org.wso2.carbon.inbound.endpoint.protocol.hl7.util.Axis2HL7Constants;
import org.wso2.carbon.inbound.endpoint.protocol.hl7.util.HL7ExecutorServiceFactory;
import org.wso2.carbon.inbound.endpoint.protocol.hl7.util.HL7MessageUtils;

import java.nio.charset.CharsetDecoder;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -95,6 +98,9 @@ public void processRequest(final MLLPContext mllpContext) throws Exception {
synCtx.setProperty(SynapseConstants.ARTIFACT_NAME,
SynapseConstants.FAIL_SAFE_MODE_INBOUND_ENDPOINT + params.getName());
synCtx.setProperty(SynapseConstants.IS_INBOUND, true);
if (RuntimeStatisticCollector.isStatisticsEnabled()) {
populateStatisticsMetadata(synCtx);
}
InboundEndpoint inboundEndpoint = synCtx.getConfiguration().getInboundEndpoint(params.getName());
CustomLogSetter.getInstance().setLogAppender(inboundEndpoint.getArtifactContainerName());
synCtx.setProperty(MLLPConstants.HL7_INBOUND_MSG_ID, synCtx.getMessageID());
Expand Down Expand Up @@ -155,6 +161,9 @@ public void processError(final MLLPContext mllpContext, final Exception ex) {
synCtx.setProperty(SynapseConstants.ARTIFACT_NAME,
SynapseConstants.FAIL_SAFE_MODE_INBOUND_ENDPOINT + params.getName());
synCtx.setProperty(SynapseConstants.IS_INBOUND, true);
if (RuntimeStatisticCollector.isStatisticsEnabled()) {
populateStatisticsMetadata(synCtx);
}
InboundEndpoint inboundEndpoint = synCtx.getConfiguration().getInboundEndpoint(params.getName());
CustomLogSetter.getInstance().setLogAppender(inboundEndpoint.getArtifactContainerName());
synCtx.setProperty(MLLPConstants.HL7_INBOUND_MSG_ID, synCtx.getMessageID());
Expand Down Expand Up @@ -224,6 +233,15 @@ public Map<String, Object> getInboundParameterMap() {
return parameters;
}

private void populateStatisticsMetadata(MessageContext synCtx) {
Map<String, Object> statisticsDetails = new HashMap<String, Object>();
statisticsDetails.put(SynapseConstants.INBOUND_PORT,
params.getProperties().getProperty(MLLPConstants.PARAM_HL7_PORT));
statisticsDetails.put(InboundEndpointConstants.INBOUND_ENDPOINT_PROTOCOL,
InboundRequestProcessorFactoryImpl.Protocols.hl7.toString());
synCtx.setProperty(SynapseConstants.STATISTICS_METADATA, statisticsDetails);
}

@Override
public void sendBack(MessageContext messageContext) {
MLLPContext mllpContext = (MLLPContext) messageContext.getProperty(MLLPConstants.MLLP_CONTEXT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.synapse.SynapseException;
import org.apache.synapse.api.ApiConstants;
import org.apache.synapse.api.inbound.InboundApiHandler;
import org.apache.synapse.aspects.flow.statistics.collectors.RuntimeStatisticCollector;
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.core.axis2.Axis2Sender;
import org.apache.synapse.core.axis2.MessageContextCreatorForAxis2;
Expand All @@ -51,6 +52,8 @@
import org.wso2.carbon.inbound.endpoint.protocol.http.management.HTTPEndpointManager;

import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -275,9 +278,19 @@ private void injectToSequence(org.apache.synapse.MessageContext synCtx, InboundE
synCtx.setProperty(SynapseConstants.INBOUND_ENDPOINT_NAME, endpoint.getName());
synCtx.setProperty(SynapseConstants.ARTIFACT_NAME,
SynapseConstants.FAIL_SAFE_MODE_INBOUND_ENDPOINT + endpoint.getName());
if (RuntimeStatisticCollector.isStatisticsEnabled()) {
populateStatisticsMetadata(synCtx, endpoint);
}
synCtx.getEnvironment().injectMessage(synCtx, injectingSequence);
}

private void populateStatisticsMetadata(org.apache.synapse.MessageContext synCtx, InboundEndpoint endpoint) {
Map<String, Object> statisticsDetails = new HashMap<String, Object>();
statisticsDetails.put(SynapseConstants.INBOUND_PORT, port);
statisticsDetails.put(InboundEndpointConstants.INBOUND_ENDPOINT_PROTOCOL, endpoint.getProtocol());
synCtx.setProperty(SynapseConstants.STATISTICS_METADATA, statisticsDetails);
}

private SequenceMediator getFaultSequence(org.apache.synapse.MessageContext synCtx, InboundEndpoint endpoint) {
SequenceMediator faultSequence = null;
if (endpoint.getOnErrorSeq() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,25 @@
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
import org.apache.synapse.aspects.flow.statistics.collectors.RuntimeStatisticCollector;
import org.apache.synapse.commons.vfs.VFSConstants;
import org.apache.synapse.commons.vfs.VFSUtils;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.inbound.InboundEndpoint;
import org.apache.synapse.inbound.InboundEndpointConstants;
import org.apache.synapse.mediators.MediatorFaultHandler;
import org.apache.synapse.mediators.base.SequenceMediator;
import org.apache.synapse.transport.customlogsetter.CustomLogSetter;
import org.wso2.carbon.inbound.endpoint.inboundfactory.InboundRequestProcessorFactoryImpl;
import org.wso2.carbon.inbound.endpoint.protocol.generic.GenericConstants;
import org.wso2.carbon.inbound.endpoint.protocol.hl7.core.MLLPConstants;
import org.wso2.carbon.inbound.endpoint.protocol.jms.factory.CachedJMSConnectionFactory;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import javax.jms.BytesMessage;
Expand Down Expand Up @@ -94,6 +100,9 @@ public boolean invoke(Object object, String name) throws SynapseException {
msgCtx.setProperty(SynapseConstants.INBOUND_ENDPOINT_NAME, name);
msgCtx.setProperty(SynapseConstants.ARTIFACT_NAME, SynapseConstants.FAIL_SAFE_MODE_INBOUND_ENDPOINT + name);
msgCtx.setProperty(SynapseConstants.IS_INBOUND, true);
if (RuntimeStatisticCollector.isStatisticsEnabled()) {
populateStatisticsMetadata(msgCtx);
}
InboundEndpoint inboundEndpoint = msgCtx.getConfiguration().getInboundEndpoint(name);

// Adding inbound endpoint parameters as synapse properties
Expand Down Expand Up @@ -290,6 +299,13 @@ public boolean invoke(Object object, String name) throws SynapseException {
return true;
}

private void populateStatisticsMetadata(org.apache.synapse.MessageContext synCtx) {
Map<String, Object> statisticsDetails = new HashMap<String, Object>();
statisticsDetails.put(InboundEndpointConstants.INBOUND_ENDPOINT_PROTOCOL,
InboundRequestProcessorFactoryImpl.Protocols.jms.toString());
synCtx.setProperty(SynapseConstants.STATISTICS_METADATA, statisticsDetails);
}

/**
* Evaluate if JMS session need to be rollback judging
* from properties set to message context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,18 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.aspects.flow.statistics.collectors.RuntimeStatisticCollector;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.inbound.InboundEndpoint;
import org.apache.synapse.inbound.InboundEndpointConstants;
import org.apache.synapse.mediators.base.SequenceMediator;
import org.apache.synapse.transport.customlogsetter.CustomLogSetter;
import org.wso2.carbon.inbound.endpoint.inboundfactory.InboundRequestProcessorFactoryImpl;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;

/**
* KafkaInjectHandler uses to mediate the received Kafka message
Expand Down Expand Up @@ -63,12 +68,15 @@ public KAFKAInjectHandler(String injectingSeq, String onErrorSeq, boolean sequen
* inject the message to the sequence
*/
public boolean invoke(Object object, String name) {
byte[] msg = (byte[]) object;
KafkaMessageContext kafkaMessageContext = (KafkaMessageContext) object;

org.apache.synapse.MessageContext msgCtx = createMessageContext();
msgCtx.setProperty(SynapseConstants.INBOUND_ENDPOINT_NAME, name);
msgCtx.setProperty(SynapseConstants.ARTIFACT_NAME, SynapseConstants.FAIL_SAFE_MODE_INBOUND_ENDPOINT + name);
msgCtx.setProperty(SynapseConstants.IS_INBOUND, true);
if (RuntimeStatisticCollector.isStatisticsEnabled()) {
populateStatisticsMetadata(msgCtx, kafkaMessageContext);
}
InboundEndpoint inboundEndpoint = msgCtx.getConfiguration().getInboundEndpoint(name);
CustomLogSetter.getInstance().setLogAppender(inboundEndpoint.getArtifactContainerName());
log.debug("Processed Kafka Message ");
Expand Down Expand Up @@ -97,7 +105,7 @@ public boolean invoke(Object object, String name) {
}
OMElement documentElement = null;
// set the message payload to the message context
InputStream in = new AutoCloseInputStream(new ByteArrayInputStream(msg));
InputStream in = new AutoCloseInputStream(new ByteArrayInputStream(kafkaMessageContext.getMsg()));
try {
documentElement = builder.processDocument(in, contentType, axis2MsgCtx);
} catch (AxisFault axisFault) {
Expand Down Expand Up @@ -147,4 +155,12 @@ private org.apache.synapse.MessageContext createMessageContext() {
return msgCtx;
}

private void populateStatisticsMetadata(org.apache.synapse.MessageContext synCtx, KafkaMessageContext kafkaMessageContext) {
Map<String, Object> statisticsDetails = new HashMap<String, Object>();
statisticsDetails.put(InboundEndpointConstants.INBOUND_ENDPOINT_PROTOCOL,
InboundRequestProcessorFactoryImpl.Protocols.kafka.toString());
statisticsDetails.put(SynapseConstants.CONNECTION, kafkaMessageContext.getConnection());
statisticsDetails.put(SynapseConstants.TOPIC, kafkaMessageContext.getTopic());
synCtx.setProperty(SynapseConstants.STATISTICS_METADATA, statisticsDetails);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.message.MessageAndMetadata;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.apache.synapse.SynapseException;

Expand Down Expand Up @@ -143,8 +144,11 @@ public void injectMessageToESB(String name) {
}

public void injectMessageToESB(String sequenceName, ConsumerIterator<byte[], byte[]> consumerIterator) {
byte[] msg = consumerIterator.next().message();
injectHandler.invoke(msg, sequenceName);
MessageAndMetadata<byte[], byte[]> messageAndMetadata = consumerIterator.next();
KafkaMessageContext kafkaMessageContext = new KafkaMessageContext(
kafkaProperties.getProperty(KAFKAConstants.ZOOKEEPER_CONNECT),
messageAndMetadata.topic(), messageAndMetadata.message());
injectHandler.invoke(kafkaMessageContext, sequenceName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2025, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.inbound.endpoint.protocol.kafka;

public class KafkaMessageContext {

private String connection;
private String topic;
private byte[] msg;

public KafkaMessageContext(String host, int port, String topic, byte[] msg) {
this.connection = host + ":" + port;
this.topic = topic;
this.msg = msg;
}

public KafkaMessageContext(String connection, String topic, byte[] msg) {
this.connection = connection;
this.topic = topic;
this.msg = msg;
}

public String getTopic() {

return topic;
}

public void setTopic(String topic) {

this.topic = topic;
}

public byte[] getMsg() {

return msg;
}

public void setMsg(byte[] msg) {

this.msg = msg;
}

public String getConnection() {

return connection;
}

public void setConnection(String connection) {

this.connection = connection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,9 @@ public void injectMessageToESB(String name) {
if (log.isDebugEnabled()) {
log.debug("Start : Add to injectHandler to invoke");
}
injectHandler.invoke(bytes, name);
KafkaMessageContext kafkaMessageContext = new KafkaMessageContext(consumer.host(), port, topic,
bytes);
injectHandler.invoke(kafkaMessageContext, name);
if (log.isDebugEnabled()) {
log.debug("End : Add the injectHandler to invoke");
}
Expand Down
Loading

0 comments on commit 17c371c

Please sign in to comment.