Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

increase timeout #6

Merged
merged 3 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private R handleResponse(Message rabbitResponse) {

if(exception != null) {
try {
logger.error("Found error on response " + exception);
logger.error("Found error on response {}. Action : {}" ,exception, rabbitResponse.getMessageProperties().getHeaders().get(Headers.METHOD));
throw objectMapper.readValue(exception, CommandExecutionException.class);
} catch (JsonProcessingException e) {
logger.error("Error ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public void dispatchEvent(Event event) {
message.getMessageProperties().getHeaders().put(PROJECT_ID, projectId);
}
eventRabbitTemplate.convertAndSend(RabbitMQEventsConfiguration.EVENT_EXCHANGE, "", message);
logger.info("Sent event message!");
} catch (JsonProcessingException | AmqpException e) {
logger.info("Could not serialize event: {}", e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public RabbitMQEventHandlerWrapper(List<EventHandler<? extends Event>> eventHand

@Override
public void onMessage(Message message) {
logger.info("Handling event with id {}", message.getMessageProperties().getMessageId());
EventHandler eventHandler = eventHandlers.stream()
.filter(handler -> {
String channel = String.valueOf(message.getMessageProperties().getHeaders().get(CHANNEL));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ public RabbitMqCommandHandlerWrapper(List<CommandHandler<? extends Request, ? ex

@Override
public void onMessage(Message message, Channel channel) throws Exception {
logger.info("Received message " + message);

var replyChannel = message.getMessageProperties().getReplyTo();
if (replyChannel == null) {
String errorMessage = Headers.REPLY_CHANNEL + " header is missing. Cannot reply to message.";
Expand Down Expand Up @@ -86,7 +84,6 @@ public void onMessage(Message message, Channel channel) throws Exception {
}

CommandHandler handler = extractHandler(messageType);
logger.info("Dispatch handling to {}", handler.getClass());
parseAndHandleRequest(handler, message, channel, new UserId(userId), accessToken);
}

Expand Down Expand Up @@ -171,10 +168,15 @@ private void authorizeAndReplyToRequest(CommandHandler<Q,R> handler,

private void handleAndReplyToRequest(CommandHandler<Q,R> handler, Channel channel, Message message, UserId userId, Q request, String accessToken) {
var executionContext = new ExecutionContext(userId, accessToken);
long startTime = System.currentTimeMillis();

try {
var response = handler.handleRequest(request, executionContext);
response.subscribe(r -> {
replyWithSuccessResponse(channel, message, userId, r);
long endtime = System.currentTimeMillis();
logger.info("Request executed " + request.getChannel() + ". Time taken for Execution is : " + (endtime-startTime) +"ms");

}, throwable -> {
if (throwable instanceof CommandExecutionException ex) {
logger.info(
Expand All @@ -183,14 +185,23 @@ private void handleAndReplyToRequest(CommandHandler<Q,R> handler, Channel channe
throwable.getMessage(),
request);
replyWithErrorResponse(message,channel, userId, ex.getStatus());
long endtime = System.currentTimeMillis();
logger.info("Request failed " + request.getChannel() + "with error " + throwable.getMessage() + ". Time taken for Execution is : " + (endtime-startTime) +"ms");
}
else {
replyWithErrorResponse(message, channel, userId, HttpStatus.INTERNAL_SERVER_ERROR);
long endtime = System.currentTimeMillis();

logger.info("Request failed " + request.getChannel() + "with error " + throwable.getMessage() + ". Time taken for Execution is : " + (endtime-startTime) +"ms");

}
});
} catch (Throwable throwable) {
logger.error("Uncaught exception when handling request", throwable);
replyWithErrorResponse(message, channel, userId, HttpStatus.INTERNAL_SERVER_ERROR);
long endtime = System.currentTimeMillis();
logger.info("Request failed " + request.getChannel() + ". Time taken for Execution is : " + (endtime-startTime) +"ms");

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public SimpleMessageListenerContainer replyListenerContainer(ConnectionFactory c
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueues(replyQueue);
container.setConcurrency("15-20");
return container;
}

Expand All @@ -125,6 +126,7 @@ public SimpleMessageListenerContainer messageListenerContainers() {
container.setQueueNames(getCommandQueue());
container.setConnectionFactory(connectionFactory);
container.setMessageListener(rabbitMqCommandHandlerWrapper());
container.setConcurrency("15-20");
return container;
}

Expand Down
Loading