Skip to content

Commit

Permalink
Azure Functions Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
sjuarez committed Jan 13, 2025
1 parent be10e1f commit 7ac2c12
Show file tree
Hide file tree
Showing 14 changed files with 75 additions and 151 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.genexus.cloud.serverless.azure.handler;
import com.genexus.cloud.serverless.model.*;

import com.genexus.cloud.serverless.model.*;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.ExecutionContext;

Expand All @@ -17,41 +17,43 @@ public AzureBlobStorageHandler() throws Exception {
super();
}
public void run(
@BlobTrigger(name = "content", source = "EventGrid", path = "%blob_path%", dataType = "binary") byte[] content,
@BlobTrigger(name = "content", source = "EventGrid", path = "test-triggerinput-java/{name}", dataType = "binary") byte[] content,
@BindingName("name") String name,
final ExecutionContext context
) throws Exception {

context.getLogger().info("GeneXus Blob Storage trigger handler. Function processed: " + context.getFunctionName() + " Invocation Id: " + context.getInvocationId());
setupServerlessMappings(context.getFunctionName());

String functionName = context.getFunctionName().trim().toUpperCase();
String storageEnvVar = String.format("GX_AZURE_%s_BLOB_STORAGE", functionName);
String containerEnvVar = String.format("GX_AZURE_%s_BLOB_STORAGE_CONTAINER", functionName);
String storageAccount = System.getenv(storageEnvVar);
String containerName = System.getenv(containerEnvVar);
String blobUri;
if (storageAccount != null && !storageAccount.isEmpty() && containerName != null && !containerName.isEmpty())
blobUri = String.format("https://%s.blob.core.windows.net/%s/%s", storageAccount, containerName, "name");
else {
blobUri = "name";
context.getLogger().warning(String.format("Could not return complete URI. Please configure GX_AZURE_%s_BLOB_STORAGE and GX_AZURE_%s_BLOB_STORAGE_CONTAINER app settings.",functionName,functionName));
}
switch (executor.getMethodSignatureIdx()) {
case 0:
EventMessage msg = new EventMessage();
msg.setMessageId(context.getInvocationId());
msg.setMessageSourceType(EventMessageSourceType.BLOB);
Instant nowUtc = Instant.now();
msg.setMessageDate(Date.from(nowUtc));
msg.setMessageData(Base64.getEncoder().encodeToString(content));
List<EventMessageProperty> msgAtts = msg.getMessageProperties();
msg.setMessageData(blobUri);
msgAtts.add(new EventMessageProperty("Id", context.getInvocationId()));
msgAtts.add(new EventMessageProperty("name", name));

msgs.add(msg);
break;
case 1:
case 2:
rawMessage = Base64.getEncoder().encodeToString(content);
}
try {
EventMessageResponse response = dispatchEvent(msgs, rawMessage);
if (response.hasFailed()) {
logger.error(String.format("Messages were not handled. Error: %s", response.getErrorMessage()));
throw new RuntimeException(response.getErrorMessage()); //Throw the exception so the runtime can Retry the operation.
}
} catch (Exception e) {
logger.error("HandleRequest execution error", e);
throw e; //Throw the exception so the runtime can Retry the operation.
rawMessage = blobUri;
}
ExecuteDynamic(msgs,rawMessage);
}
}

Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package com.genexus.cloud.serverless.azure.handler;

import com.genexus.cloud.serverless.JSONHelper;
import com.genexus.cloud.serverless.model.*;
import com.microsoft.azure.functions.ExecutionContext;
Expand All @@ -16,37 +17,26 @@ public void run(
final ExecutionContext context) throws Exception {

context.getLogger().info("GeneXus CosmosDB trigger handler. Function processed: " + context.getFunctionName() + " Invocation Id: " + context.getInvocationId());

UUID eventId = UUID.randomUUID();
EventMessages msgs = new EventMessages();
EventMessagesList eventMessagesList = new EventMessagesList();
String rawMessage= "";
setupServerlessMappings(context.getFunctionName());

switch (executor.getMethodSignatureIdx()) {
case 0:
msgs = setupEventMessages(eventId,TryGetDocuments(items));
msgs = setupEventMessages(eventId,TryGetDocuments(items, context));
break;
case 4:
eventMessagesList = setupEventMessagesList(TryGetDocuments(items));
eventMessagesList = setupEventMessagesList(TryGetDocuments(items, context));
break;
default:
rawMessage = JSONHelper.toJSONString(items);
break;
}
try {
EventMessageResponse response = dispatchEvent(msgs, eventMessagesList, rawMessage);
if (response.hasFailed()) {
logger.error(String.format("Messages were not handled. Error: %s", response.getErrorMessage()));
throw new RuntimeException(response.getErrorMessage()); //Throw the exception so the runtime can Retry the operation.
}
} catch (Exception e) {
logger.error("HandleRequest execution error", e);
throw e; //Throw the exception so the runtime can Retry the operation.
}
ExecuteDynamic(msgs,eventMessagesList,rawMessage);
}

private List<Map<String,Object>> TryGetDocuments(List<Object> items){
private List<Map<String,Object>> TryGetDocuments(List<Object> items, ExecutionContext context){
List<Map<String,Object>> documents = new ArrayList<>();
if (items != null && !items.isEmpty()) {
for (Object item : items) {
Expand All @@ -57,7 +47,7 @@ private List<Map<String,Object>> TryGetDocuments(List<Object> items){
}
catch (Exception ex)
{
logger.error(String.format("Messages were not handled. Error trying to read Cosmos DB data. %s", ex.toString()));
context.getLogger().severe(String.format("Messages were not handled. Error trying to read Cosmos DB data. %s", ex.toString()));
throw new RuntimeException(String.format("Error trying to read Cosmos DB data. %s", ex.toString())); //Throw the exception so the runtime can Retry the operation.
}
}
Expand All @@ -66,6 +56,7 @@ private List<Map<String,Object>> TryGetDocuments(List<Object> items){
}
return null;
}

private EventMessagesList setupEventMessagesList(List<Map<String,Object>> jsonList)
{
EventMessagesList messagesList = new EventMessagesList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.genexus.cloud.serverless.model.EventMessage;
import com.genexus.cloud.serverless.model.EventMessageProperty;
import com.genexus.cloud.serverless.model.EventMessageResponse;
import com.genexus.cloud.serverless.model.EventMessages;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.annotation.EventGridTrigger;
Expand All @@ -25,20 +24,11 @@ public void run(
final ExecutionContext context) throws Exception {
context.getLogger().info("GeneXus Event Grid CloudEvents trigger handler. Function processed: " + context.getFunctionName() + " Invocation Id: " + context.getInvocationId());
setupServerlessMappings(context.getFunctionName());
setupEventGridMessage(eventJson);

try {
EventMessageResponse response = dispatchEvent(msgs, rawMessage);
if (response.hasFailed()) {
logger.error(String.format("Messages were not handled. Error: %s", response.getErrorMessage()));
throw new RuntimeException(response.getErrorMessage()); //Throw the exception so the runtime can Retry the operation.
}
} catch (Exception e) {
logger.error("HandleRequest execution error", e);
throw e; //Throw the exception so the runtime can Retry the operation.
}
setupEventGridMessage(eventJson, context);
ExecuteDynamic(msgs, rawMessage);
}
protected void setupEventGridMessage(String eventJson) throws JsonProcessingException {

protected void setupEventGridMessage(String eventJson, ExecutionContext context) throws JsonProcessingException {
switch (executor.getMethodSignatureIdx()) {
case 0:
try {
Expand Down Expand Up @@ -67,7 +57,7 @@ protected void setupEventGridMessage(String eventJson) throws JsonProcessingExce
msgs.add(msg);
}
catch (Exception e) {
logger.error("HandleRequest execution error", e);
context.getLogger().severe(String.format("HandleRequest execution error: %s",e.getMessage()));
throw e;}
break;
case 1:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,10 @@ public AzureEventGridHandler() throws Exception {
public void run(
@EventGridTrigger(name = "eventgridEvent") EventGridEvent event,
final ExecutionContext context) throws Exception {

context.getLogger().info("GeneXus Event Grid trigger handler. Function processed: " + context.getFunctionName() + " Invocation Id: " + context.getInvocationId());

setupServerlessMappings(context.getFunctionName());
setupEventGridMessage(event);

try {
EventMessageResponse response = dispatchEvent(msgs, rawMessage);
if (response.hasFailed()) {
logger.error(String.format("Messages were not handled. Error: %s", response.getErrorMessage()));
throw new RuntimeException(response.getErrorMessage()); //Throw the exception so the runtime can Retry the operation.
}
} catch (Exception e) {
logger.error("HandleRequest execution error", e);
throw e; //Throw the exception so the runtime can Retry the operation.
}
ExecuteDynamic(msgs, rawMessage);
}

protected void setupEventGridMessage(EventGridEvent event) {
Expand All @@ -46,15 +34,12 @@ protected void setupEventGridMessage(EventGridEvent event) {
msg.setMessageVersion(event.getDataVersion());
msg.setMessageDate(new Date());
msg.setMessageData(event.getData().toString());

List<EventMessageProperty> msgAtts = msg.getMessageProperties();
msgAtts.add(new EventMessageProperty("Id", event.getId()));

msgAtts.add(new EventMessageProperty("Subject",event.getSubject()));
msgAtts.add(new EventMessageProperty("Topic",event.getTopic()));
if (event.getEventTime()!= null)
msgAtts.add(new EventMessageProperty("EventTime",event.getEventTime().toString()));

msgs.add(msg);
break;
case 1:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ public class AzureFunctionConfiguration extends ServerlessFunctionConfiguration

public AzureFunctionConfiguration() {
}

public AzureFunctionConfiguration(String functionName, String gxEntrypoint) {
this.functionName = functionName;
this.gxEntrypoint = gxEntrypoint;
}

public void setFunctionName(String functionName) {
this.functionName = functionName;
}

public void setGXEntrypoint(String gxEntrypoint) {
this.gxEntrypoint = gxEntrypoint;
}
Expand All @@ -30,6 +33,7 @@ public String getFunctionName() {
public String getGXEntrypoint() {
return gxEntrypoint;
}

@Override
public boolean isValidConfiguration () {
return functionName != null && !functionName.trim().isEmpty() && gxEntrypoint != null && !gxEntrypoint.trim().isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ public class AzureFunctionConfigurationHelper {

public static List<AzureFunctionConfiguration> getFunctionsMapConfiguration() throws FunctionConfigurationException {
File configFile = new File(FUNCTION_CONFIG_PATH);

if (configFile.exists()) {
try {
String jsonConfig = new String(Files.readAllBytes(Paths.get(FUNCTION_CONFIG_PATH)));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package com.genexus.cloud.serverless.azure.handler;

import com.genexus.cloud.serverless.model.*;

import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.ExecutionContext;

import java.time.Instant;
import java.util.List;
import java.util.*;
Expand All @@ -13,6 +11,7 @@ public class AzureQueueHandler extends AzureEventHandler {
public AzureQueueHandler() throws Exception {
super();
}

public void run(
@QueueTrigger(name = "message", queueName = "%queue_name%") String message,
@BindingName("Id") final String id,
Expand Down Expand Up @@ -41,15 +40,6 @@ public void run(
msgAtts.add(new EventMessageProperty("PopReceipt", popReceipt));
msgs.add(msg);
}
try {
EventMessageResponse response = dispatchEvent(msgs, message);
if (response.hasFailed()) {
logger.error(String.format("Messages were not handled. Error: %s", response.getErrorMessage()));
throw new RuntimeException(response.getErrorMessage()); //Throw the exception so the runtime can Retry the operation.
}
} catch (Exception e) {
logger.error("HandleRequest execution error", e);
throw e; //Throw the exception so the runtime can Retry the operation.
}
ExecuteDynamic(msgs, message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.genexus.cloud.serverless.helpers.ServiceBusBatchMessageProcessor;
import com.genexus.cloud.serverless.helpers.ServiceBusProcessedMessage;
import com.genexus.cloud.serverless.model.*;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.annotation.*;

Expand All @@ -22,15 +21,6 @@ public void run(
setupServerlessMappings(context.getFunctionName());
ServiceBusBatchMessageProcessor queueBatchMessageProcessor = new ServiceBusBatchMessageProcessor();
ServiceBusProcessedMessage queueMessage = queueBatchMessageProcessor.processQueueMessage(context, executor, messages);
try {
EventMessageResponse response = dispatchEvent(queueMessage.getEventMessages(),queueMessage.getRawMessage());
if (response.hasFailed()) {
logger.error(String.format("Messages were not handled. Error: %s", response.getErrorMessage()));
throw new RuntimeException(response.getErrorMessage()); //Throw the exception so the runtime can Retry the operation.
}
} catch (Exception e) {
logger.error("HandleRequest execution error", e);
throw e; //Throw the exception so the runtime can Retry the operation.
}
ExecuteDynamic(queueMessage.getEventMessages(),queueMessage.getRawMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.genexus.cloud.serverless.helpers.ServiceBusProcessedMessage;
import com.genexus.cloud.serverless.helpers.ServiceBusSingleMessageProcessor;
import com.genexus.cloud.serverless.model.*;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.annotation.BindingName;
import com.microsoft.azure.functions.annotation.Cardinality;
Expand All @@ -26,15 +25,6 @@ public void run(
setupServerlessMappings(context.getFunctionName());
ServiceBusSingleMessageProcessor queueSingleMessageProcessor = new ServiceBusSingleMessageProcessor();
ServiceBusProcessedMessage queueMessage = queueSingleMessageProcessor.processQueueMessage(executor,messageId,enqueuedTimeUtc,context,message);
try {
EventMessageResponse response = dispatchEvent(queueMessage.getEventMessages(), queueMessage.getRawMessage());
if (response.hasFailed()) {
logger.error(String.format("Messages were not handled. Error: %s", response.getErrorMessage()));
throw new RuntimeException(response.getErrorMessage()); //Throw the exception so the runtime can Retry the operation.
}
} catch (Exception e) {
logger.error("HandleRequest execution error", e);
throw e; //Throw the exception so the runtime can Retry the operation.
}
ExecuteDynamic(queueMessage.getEventMessages(), queueMessage.getRawMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.genexus.cloud.serverless.helpers.ServiceBusBatchMessageProcessor;
import com.genexus.cloud.serverless.helpers.ServiceBusProcessedMessage;
import com.genexus.cloud.serverless.model.*;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.annotation.*;

Expand All @@ -22,15 +21,6 @@ public void run(
setupServerlessMappings(context.getFunctionName());
ServiceBusBatchMessageProcessor queueBatchMessageProcessor = new ServiceBusBatchMessageProcessor();
ServiceBusProcessedMessage queueMessage = queueBatchMessageProcessor.processQueueMessage(context, executor, messages);
try {
EventMessageResponse response = dispatchEvent(queueMessage.getEventMessages(), queueMessage.getRawMessage());
if (response.hasFailed()) {
logger.error(String.format("Messages were not handled. Error: %s", response.getErrorMessage()));
throw new RuntimeException(response.getErrorMessage()); //Throw the exception so the runtime can Retry the operation.
}
} catch (Exception e) {
logger.error("HandleRequest execution error", e);
throw e; //Throw the exception so the runtime can Retry the operation.
}
ExecuteDynamic(queueMessage.getEventMessages(), queueMessage.getRawMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.genexus.cloud.serverless.helpers.ServiceBusProcessedMessage;
import com.genexus.cloud.serverless.helpers.ServiceBusSingleMessageProcessor;
import com.genexus.cloud.serverless.model.*;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.annotation.BindingName;
import com.microsoft.azure.functions.annotation.Cardinality;
Expand All @@ -26,16 +25,7 @@ public void run(
setupServerlessMappings(context.getFunctionName());
ServiceBusSingleMessageProcessor queueSingleMessageProcessor = new ServiceBusSingleMessageProcessor();
ServiceBusProcessedMessage queueMessage = queueSingleMessageProcessor.processQueueMessage(executor,messageId,enqueuedTimeUtc,context,message);
try {
EventMessageResponse response = dispatchEvent(queueMessage.getEventMessages(),queueMessage.getRawMessage());
if (response.hasFailed()) {
logger.error(String.format("Messages were not handled. Error: %s", response.getErrorMessage()));
throw new RuntimeException(response.getErrorMessage()); //Throw the exception so the runtime can Retry the operation.
}
} catch (Exception e) {
logger.error("HandleRequest execution error", e);
throw e; //Throw the exception so the runtime can Retry the operation.
}
ExecuteDynamic(queueMessage.getEventMessages(),queueMessage.getRawMessage());
}
}

Loading

0 comments on commit 7ac2c12

Please sign in to comment.