Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Azure Functions Fixes #925

Merged
merged 4 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.genexus.cloud.serverless.azure.handler;
import com.genexus.cloud.serverless.model.*;

import com.genexus.cloud.serverless.model.*;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.ExecutionContext;

Expand All @@ -17,41 +17,43 @@ public AzureBlobStorageHandler() throws Exception {
super();
}
public void run(
@BlobTrigger(name = "content", source = "EventGrid", path = "%blob_path%", dataType = "binary") byte[] content,
@BlobTrigger(name = "content", source = "EventGrid", path = "test-triggerinput-java/{name}", dataType = "binary") byte[] content,
@BindingName("name") String name,
final ExecutionContext context
) throws Exception {

context.getLogger().info("GeneXus Blob Storage trigger handler. Function processed: " + context.getFunctionName() + " Invocation Id: " + context.getInvocationId());
setupServerlessMappings(context.getFunctionName());

String functionName = context.getFunctionName().trim().toUpperCase();
String storageEnvVar = String.format("GX_AZURE_%s_BLOB_STORAGE", functionName);
String containerEnvVar = String.format("GX_AZURE_%s_BLOB_STORAGE_CONTAINER", functionName);
String storageAccount = System.getenv(storageEnvVar);
String containerName = System.getenv(containerEnvVar);
String blobUri;
if (storageAccount != null && !storageAccount.isEmpty() && containerName != null && !containerName.isEmpty())
blobUri = String.format("https://%s.blob.core.windows.net/%s/%s", storageAccount, containerName, "name");
else {
blobUri = "name";
context.getLogger().warning(String.format("Could not return complete URI. Please configure GX_AZURE_%s_BLOB_STORAGE and GX_AZURE_%s_BLOB_STORAGE_CONTAINER app settings.",functionName,functionName));
}
switch (executor.getMethodSignatureIdx()) {
case 0:
EventMessage msg = new EventMessage();
msg.setMessageId(context.getInvocationId());
msg.setMessageSourceType(EventMessageSourceType.BLOB);
Instant nowUtc = Instant.now();
msg.setMessageDate(Date.from(nowUtc));
msg.setMessageData(Base64.getEncoder().encodeToString(content));
List<EventMessageProperty> msgAtts = msg.getMessageProperties();
msg.setMessageData(blobUri);
msgAtts.add(new EventMessageProperty("Id", context.getInvocationId()));
msgAtts.add(new EventMessageProperty("name", name));

msgs.add(msg);
break;
case 1:
case 2:
rawMessage = Base64.getEncoder().encodeToString(content);
}
try {
EventMessageResponse response = dispatchEvent(msgs, rawMessage);
if (response.hasFailed()) {
logger.error(String.format("Messages were not handled. Error: %s", response.getErrorMessage()));
throw new RuntimeException(response.getErrorMessage()); //Throw the exception so the runtime can Retry the operation.
}
} catch (Exception e) {
logger.error("HandleRequest execution error", e);
throw e; //Throw the exception so the runtime can Retry the operation.
rawMessage = blobUri;
}
ExecuteDynamic(msgs,rawMessage);
}
}

Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package com.genexus.cloud.serverless.azure.handler;

import com.genexus.cloud.serverless.JSONHelper;
import com.genexus.cloud.serverless.model.*;
import com.microsoft.azure.functions.ExecutionContext;
Expand All @@ -16,37 +17,26 @@ public void run(
final ExecutionContext context) throws Exception {

context.getLogger().info("GeneXus CosmosDB trigger handler. Function processed: " + context.getFunctionName() + " Invocation Id: " + context.getInvocationId());

UUID eventId = UUID.randomUUID();
EventMessages msgs = new EventMessages();
EventMessagesList eventMessagesList = new EventMessagesList();
String rawMessage= "";
setupServerlessMappings(context.getFunctionName());

switch (executor.getMethodSignatureIdx()) {
case 0:
msgs = setupEventMessages(eventId,TryGetDocuments(items));
msgs = setupEventMessages(eventId,TryGetDocuments(items, context));
break;
case 4:
eventMessagesList = setupEventMessagesList(TryGetDocuments(items));
eventMessagesList = setupEventMessagesList(TryGetDocuments(items, context));
break;
default:
rawMessage = JSONHelper.toJSONString(items);
break;
}
try {
EventMessageResponse response = dispatchEvent(msgs, eventMessagesList, rawMessage);
if (response.hasFailed()) {
logger.error(String.format("Messages were not handled. Error: %s", response.getErrorMessage()));
throw new RuntimeException(response.getErrorMessage()); //Throw the exception so the runtime can Retry the operation.
}
} catch (Exception e) {
logger.error("HandleRequest execution error", e);
throw e; //Throw the exception so the runtime can Retry the operation.
}
ExecuteDynamic(msgs,eventMessagesList,rawMessage);
}

private List<Map<String,Object>> TryGetDocuments(List<Object> items){
private List<Map<String,Object>> TryGetDocuments(List<Object> items, ExecutionContext context){
List<Map<String,Object>> documents = new ArrayList<>();
if (items != null && !items.isEmpty()) {
for (Object item : items) {
Expand All @@ -57,7 +47,7 @@ private List<Map<String,Object>> TryGetDocuments(List<Object> items){
}
catch (Exception ex)
{
logger.error(String.format("Messages were not handled. Error trying to read Cosmos DB data. %s", ex.toString()));
context.getLogger().severe(String.format("Messages were not handled. Error trying to read Cosmos DB data. %s", ex.toString()));
throw new RuntimeException(String.format("Error trying to read Cosmos DB data. %s", ex.toString())); //Throw the exception so the runtime can Retry the operation.
}
}
Expand All @@ -66,6 +56,7 @@ private List<Map<String,Object>> TryGetDocuments(List<Object> items){
}
return null;
}

private EventMessagesList setupEventMessagesList(List<Map<String,Object>> jsonList)
{
EventMessagesList messagesList = new EventMessagesList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.genexus.cloud.serverless.model.EventMessage;
import com.genexus.cloud.serverless.model.EventMessageProperty;
import com.genexus.cloud.serverless.model.EventMessageResponse;
import com.genexus.cloud.serverless.model.EventMessages;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.annotation.EventGridTrigger;
Expand All @@ -25,20 +24,11 @@ public void run(
final ExecutionContext context) throws Exception {
context.getLogger().info("GeneXus Event Grid CloudEvents trigger handler. Function processed: " + context.getFunctionName() + " Invocation Id: " + context.getInvocationId());
setupServerlessMappings(context.getFunctionName());
setupEventGridMessage(eventJson);

try {
EventMessageResponse response = dispatchEvent(msgs, rawMessage);
if (response.hasFailed()) {
logger.error(String.format("Messages were not handled. Error: %s", response.getErrorMessage()));
throw new RuntimeException(response.getErrorMessage()); //Throw the exception so the runtime can Retry the operation.
}
} catch (Exception e) {
logger.error("HandleRequest execution error", e);
throw e; //Throw the exception so the runtime can Retry the operation.
}
setupEventGridMessage(eventJson, context);
ExecuteDynamic(msgs, rawMessage);
}
protected void setupEventGridMessage(String eventJson) throws JsonProcessingException {

protected void setupEventGridMessage(String eventJson, ExecutionContext context) throws JsonProcessingException {
switch (executor.getMethodSignatureIdx()) {
case 0:
try {
Expand Down Expand Up @@ -67,7 +57,7 @@ protected void setupEventGridMessage(String eventJson) throws JsonProcessingExce
msgs.add(msg);
}
catch (Exception e) {
logger.error("HandleRequest execution error", e);
context.getLogger().severe(String.format("HandleRequest execution error: %s",e.getMessage()));
throw e;}
break;
case 1:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,10 @@ public AzureEventGridHandler() throws Exception {
public void run(
@EventGridTrigger(name = "eventgridEvent") EventGridEvent event,
final ExecutionContext context) throws Exception {

context.getLogger().info("GeneXus Event Grid trigger handler. Function processed: " + context.getFunctionName() + " Invocation Id: " + context.getInvocationId());

setupServerlessMappings(context.getFunctionName());
setupEventGridMessage(event);

try {
EventMessageResponse response = dispatchEvent(msgs, rawMessage);
if (response.hasFailed()) {
logger.error(String.format("Messages were not handled. Error: %s", response.getErrorMessage()));
throw new RuntimeException(response.getErrorMessage()); //Throw the exception so the runtime can Retry the operation.
}
} catch (Exception e) {
logger.error("HandleRequest execution error", e);
throw e; //Throw the exception so the runtime can Retry the operation.
}
ExecuteDynamic(msgs, rawMessage);
}

protected void setupEventGridMessage(EventGridEvent event) {
Expand All @@ -46,15 +34,12 @@ protected void setupEventGridMessage(EventGridEvent event) {
msg.setMessageVersion(event.getDataVersion());
msg.setMessageDate(new Date());
msg.setMessageData(event.getData().toString());

List<EventMessageProperty> msgAtts = msg.getMessageProperties();
msgAtts.add(new EventMessageProperty("Id", event.getId()));

msgAtts.add(new EventMessageProperty("Subject",event.getSubject()));
msgAtts.add(new EventMessageProperty("Topic",event.getTopic()));
if (event.getEventTime()!= null)
msgAtts.add(new EventMessageProperty("EventTime",event.getEventTime().toString()));

msgs.add(msg);
break;
case 1:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ public class AzureFunctionConfiguration extends ServerlessFunctionConfiguration

public AzureFunctionConfiguration() {
}

public AzureFunctionConfiguration(String functionName, String gxEntrypoint) {
this.functionName = functionName;
this.gxEntrypoint = gxEntrypoint;
}

public void setFunctionName(String functionName) {
this.functionName = functionName;
}

public void setGXEntrypoint(String gxEntrypoint) {
this.gxEntrypoint = gxEntrypoint;
}
Expand All @@ -30,6 +33,7 @@ public String getFunctionName() {
public String getGXEntrypoint() {
return gxEntrypoint;
}

@Override
public boolean isValidConfiguration () {
return functionName != null && !functionName.trim().isEmpty() && gxEntrypoint != null && !gxEntrypoint.trim().isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ public class AzureFunctionConfigurationHelper {

public static List<AzureFunctionConfiguration> getFunctionsMapConfiguration() throws FunctionConfigurationException {
File configFile = new File(FUNCTION_CONFIG_PATH);

if (configFile.exists()) {
try {
String jsonConfig = new String(Files.readAllBytes(Paths.get(FUNCTION_CONFIG_PATH)));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package com.genexus.cloud.serverless.azure.handler;

import com.genexus.cloud.serverless.model.*;

import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.ExecutionContext;

import java.time.Instant;
import java.util.List;
import java.util.*;
Expand All @@ -13,6 +11,7 @@ public class AzureQueueHandler extends AzureEventHandler {
public AzureQueueHandler() throws Exception {
super();
}

public void run(
@QueueTrigger(name = "message", queueName = "%queue_name%") String message,
@BindingName("Id") final String id,
Expand Down Expand Up @@ -41,15 +40,6 @@ public void run(
msgAtts.add(new EventMessageProperty("PopReceipt", popReceipt));
msgs.add(msg);
}
try {
EventMessageResponse response = dispatchEvent(msgs, message);
if (response.hasFailed()) {
logger.error(String.format("Messages were not handled. Error: %s", response.getErrorMessage()));
throw new RuntimeException(response.getErrorMessage()); //Throw the exception so the runtime can Retry the operation.
}
} catch (Exception e) {
logger.error("HandleRequest execution error", e);
throw e; //Throw the exception so the runtime can Retry the operation.
}
ExecuteDynamic(msgs, message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.genexus.cloud.serverless.helpers.ServiceBusBatchMessageProcessor;
import com.genexus.cloud.serverless.helpers.ServiceBusProcessedMessage;
import com.genexus.cloud.serverless.model.*;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.annotation.*;

Expand All @@ -22,15 +21,6 @@ public void run(
setupServerlessMappings(context.getFunctionName());
ServiceBusBatchMessageProcessor queueBatchMessageProcessor = new ServiceBusBatchMessageProcessor();
ServiceBusProcessedMessage queueMessage = queueBatchMessageProcessor.processQueueMessage(context, executor, messages);
try {
EventMessageResponse response = dispatchEvent(queueMessage.getEventMessages(),queueMessage.getRawMessage());
if (response.hasFailed()) {
logger.error(String.format("Messages were not handled. Error: %s", response.getErrorMessage()));
throw new RuntimeException(response.getErrorMessage()); //Throw the exception so the runtime can Retry the operation.
}
} catch (Exception e) {
logger.error("HandleRequest execution error", e);
throw e; //Throw the exception so the runtime can Retry the operation.
}
ExecuteDynamic(queueMessage.getEventMessages(),queueMessage.getRawMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.genexus.cloud.serverless.helpers.ServiceBusProcessedMessage;
import com.genexus.cloud.serverless.helpers.ServiceBusSingleMessageProcessor;
import com.genexus.cloud.serverless.model.*;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.annotation.BindingName;
import com.microsoft.azure.functions.annotation.Cardinality;
Expand All @@ -26,15 +25,6 @@ public void run(
setupServerlessMappings(context.getFunctionName());
ServiceBusSingleMessageProcessor queueSingleMessageProcessor = new ServiceBusSingleMessageProcessor();
ServiceBusProcessedMessage queueMessage = queueSingleMessageProcessor.processQueueMessage(executor,messageId,enqueuedTimeUtc,context,message);
try {
EventMessageResponse response = dispatchEvent(queueMessage.getEventMessages(), queueMessage.getRawMessage());
if (response.hasFailed()) {
logger.error(String.format("Messages were not handled. Error: %s", response.getErrorMessage()));
throw new RuntimeException(response.getErrorMessage()); //Throw the exception so the runtime can Retry the operation.
}
} catch (Exception e) {
logger.error("HandleRequest execution error", e);
throw e; //Throw the exception so the runtime can Retry the operation.
}
ExecuteDynamic(queueMessage.getEventMessages(), queueMessage.getRawMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.genexus.cloud.serverless.helpers.ServiceBusBatchMessageProcessor;
import com.genexus.cloud.serverless.helpers.ServiceBusProcessedMessage;
import com.genexus.cloud.serverless.model.*;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.annotation.*;

Expand All @@ -22,15 +21,6 @@ public void run(
setupServerlessMappings(context.getFunctionName());
ServiceBusBatchMessageProcessor queueBatchMessageProcessor = new ServiceBusBatchMessageProcessor();
ServiceBusProcessedMessage queueMessage = queueBatchMessageProcessor.processQueueMessage(context, executor, messages);
try {
EventMessageResponse response = dispatchEvent(queueMessage.getEventMessages(), queueMessage.getRawMessage());
if (response.hasFailed()) {
logger.error(String.format("Messages were not handled. Error: %s", response.getErrorMessage()));
throw new RuntimeException(response.getErrorMessage()); //Throw the exception so the runtime can Retry the operation.
}
} catch (Exception e) {
logger.error("HandleRequest execution error", e);
throw e; //Throw the exception so the runtime can Retry the operation.
}
ExecuteDynamic(queueMessage.getEventMessages(), queueMessage.getRawMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.genexus.cloud.serverless.helpers.ServiceBusProcessedMessage;
import com.genexus.cloud.serverless.helpers.ServiceBusSingleMessageProcessor;
import com.genexus.cloud.serverless.model.*;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.annotation.BindingName;
import com.microsoft.azure.functions.annotation.Cardinality;
Expand All @@ -26,16 +25,7 @@ public void run(
setupServerlessMappings(context.getFunctionName());
ServiceBusSingleMessageProcessor queueSingleMessageProcessor = new ServiceBusSingleMessageProcessor();
ServiceBusProcessedMessage queueMessage = queueSingleMessageProcessor.processQueueMessage(executor,messageId,enqueuedTimeUtc,context,message);
try {
EventMessageResponse response = dispatchEvent(queueMessage.getEventMessages(),queueMessage.getRawMessage());
if (response.hasFailed()) {
logger.error(String.format("Messages were not handled. Error: %s", response.getErrorMessage()));
throw new RuntimeException(response.getErrorMessage()); //Throw the exception so the runtime can Retry the operation.
}
} catch (Exception e) {
logger.error("HandleRequest execution error", e);
throw e; //Throw the exception so the runtime can Retry the operation.
}
ExecuteDynamic(queueMessage.getEventMessages(),queueMessage.getRawMessage());
}
}

Loading
Loading