Skip to content

Commit

Permalink
fix(Api): Added a new IWorkflowInstanceApiClient interface and defaul…
Browse files Browse the repository at this point in the history
…t implementation
  • Loading branch information
cdavernas committed Aug 19, 2024
1 parent 923efd7 commit 640f663
Show file tree
Hide file tree
Showing 16 changed files with 227 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,6 @@ public interface ISynapseApiClient
/// <summary>
/// Gets the Synapse API used to manage <see cref="WorkflowInstance"/>s
/// </summary>
INamespacedResourceApiClient<WorkflowInstance> WorkflowInstances { get; }
IWorkflowInstanceApiClient WorkflowInstances { get; }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright © 2024-Present The Synapse Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Neuroglia.Data.Infrastructure;

namespace Synapse.Api.Client.Services;

/// <summary>
/// Defines the fundamentals of a service used to manage <see cref="WorkflowInstance"/>s using the Synapse API
/// </summary>
public interface IWorkflowInstanceApiClient
: INamespacedResourceApiClient<WorkflowInstance>
{

/// <summary>
/// Reads the logs of the specified <see cref="WorkflowInstance"/>
/// </summary>
/// <param name="name">The name of the <see cref="WorkflowInstance"/> to read the logs of</param>
/// <param name="namespace">The namespace the <see cref="WorkflowInstance"/> to read the logs of belongs to</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>The logs of the specified <see cref="WorkflowInstance"/></returns>
Task<string> ReadLogsAsync(string name, string @namespace, CancellationToken cancellationToken = default);

/// <summary>
/// Watches the logs of the specified <see cref="WorkflowInstance"/>
/// </summary>
/// <param name="name">The name of the <see cref="WorkflowInstance"/> to watch the logs of</param>
/// <param name="namespace">The namespace the <see cref="WorkflowInstance"/> to watch the logs of belongs to</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new <see cref="IAsyncEnumerable{T}"/> used to watch the logs of the specified <see cref="WorkflowInstance"/></returns>
Task<IAsyncEnumerable<ITextDocumentWatchEvent>> WatchLogsAsync(string name, string @namespace, CancellationToken cancellationToken = default);

}
Original file line number Diff line number Diff line change
Expand Up @@ -268,4 +268,4 @@ public virtual async Task DeleteAsync(string name, string @namespace, Cancellati
await this.ProcessResponseAsync(await this.HttpClient.SendAsync(request, cancellationToken).ConfigureAwait(false), cancellationToken).ConfigureAwait(false);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ public SynapseHttpApiClient(IServiceProvider serviceProvider, ILoggerFactory log
this.Serializer = serializer;
this.HttpClient = httpClient;
this.Documents = ActivatorUtilities.CreateInstance<DocumentHttpApiClient>(this.ServiceProvider, this.HttpClient);
foreach (var apiProperty in GetType().GetProperties().Where(p => p.CanRead && p.PropertyType.GetGenericType(typeof(IResourceApiClient<>)) != null))
foreach (var apiProperty in GetType().GetProperties().Where(p => p.CanRead && p.PropertyType.GetGenericType(typeof(IResourceApiClient<>)) != null && p.Name != nameof(WorkflowInstances)))
{
var apiType = apiProperty.PropertyType.GetGenericType(typeof(IResourceApiClient<>))!;
var resourceType = apiType.GetGenericArguments()[0];
var api = ActivatorUtilities.CreateInstance(this.ServiceProvider, typeof(ResourceHttpApiClient<>).MakeGenericType(resourceType), this.HttpClient);
apiProperty.SetValue(this, api);
}
this.WorkflowInstances = ActivatorUtilities.CreateInstance<WorkflowInstanceHttpApiClient>(this.ServiceProvider, this.HttpClient);
this.Users = ActivatorUtilities.CreateInstance<UserHttpApiClient>(this.ServiceProvider, this.HttpClient);
}

Expand Down Expand Up @@ -89,6 +90,6 @@ public SynapseHttpApiClient(IServiceProvider serviceProvider, ILoggerFactory log
public INamespacedResourceApiClient<Workflow> Workflows { get; private set; } = null!;

/// <inheritdoc/>
public INamespacedResourceApiClient<WorkflowInstance> WorkflowInstances { get; private set; } = null!;
public IWorkflowInstanceApiClient WorkflowInstances { get; private set; } = null!;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright © 2024-Present The Synapse Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Neuroglia.Data.Infrastructure;

namespace Synapse.Api.Client.Services;

/// <summary>
/// Represents the default HTTP implementation of the <see cref="IWorkflowInstanceApiClient"/> interface
/// </summary>
/// <inheritdoc/>
public class WorkflowInstanceHttpApiClient(IServiceProvider serviceProvider, ILogger<ResourceHttpApiClient<WorkflowInstance>> logger, IOptions<SynapseHttpApiClientOptions> options, IJsonSerializer jsonSerializer, HttpClient httpClient)
: ResourceHttpApiClient<WorkflowInstance>(serviceProvider, logger, options, jsonSerializer, httpClient), IWorkflowInstanceApiClient
{

/// <inheritdoc/>
public virtual async Task<string> ReadLogsAsync(string name, string @namespace, CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(name);
ArgumentException.ThrowIfNullOrWhiteSpace(@namespace);
var resource = new WorkflowInstance();
var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}/{name}/logs";
using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Get, uri), cancellationToken).ConfigureAwait(false);
using var response = await this.ProcessResponseAsync(await this.HttpClient.SendAsync(request, cancellationToken).ConfigureAwait(false), cancellationToken).ConfigureAwait(false);
return await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
public virtual async Task<IAsyncEnumerable<ITextDocumentWatchEvent>> WatchLogsAsync(string name, string @namespace, CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(name);
ArgumentException.ThrowIfNullOrWhiteSpace(@namespace);
var resource = new WorkflowInstance();
var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}/{name}/logs/watch";
using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Get, uri), cancellationToken).ConfigureAwait(false);
var response = await this.ProcessResponseAsync(await this.HttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false), cancellationToken).ConfigureAwait(false);
var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
return this.JsonSerializer.DeserializeAsyncEnumerable<TextDocumentWatchEvent>(responseStream, cancellationToken)!;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ protected virtual async Task<bool> TryFilterEventAsync(EventFilterDefinition fil
{
if (!await this.ExpressionEvaluator.EvaluateConditionAsync(valueStr, e, cancellationToken: cancellationToken)) return false;
}
else if (!string.IsNullOrWhiteSpace(valueStr) && !Regex.IsMatch(value.ToString() ?? string.Empty, valueStr, RegexOptions.IgnoreCase)) return false;
else if (!string.IsNullOrWhiteSpace(valueStr) && !Regex.IsMatch(value?.ToString() ?? string.Empty, valueStr, RegexOptions.IgnoreCase)) return false;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@ public virtual async Task<TaskInstance> CreateTaskAsync(TaskDefinition definitio
var contextReference = string.Empty;
if (context == null)
{
if(parent?.ContextData != null)
if (parent?.ContextData != null)
{
contextReference = parent.Instance.ContextReference;
context = parent.ContextData;
}
else if(!string.IsNullOrWhiteSpace(this.Instance.Status?.ContextReference))
else if (!string.IsNullOrWhiteSpace(this.Instance.Status?.ContextReference))
{
contextReference = this.Instance.Status?.ContextReference;
var contextDocument = await this.Api.Documents.GetAsync(this.Instance.Status!.ContextReference, cancellationToken).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ protected virtual IServiceCollection ConfigureServices(IServiceCollection servic
services.AddSingleton<ITaskExecutorFactory, TaskExecutorFactory>();
services.AddMemoryCacheRepository<Document, string>();
services.AddScoped<IResourceRepository, MockResourceRepository>();
services.AddScoped<ITextDocumentRepository<string>, MockTextDocumentRepository<string>>();
services.AddScoped<ITextDocumentRepository>(provider => provider.GetRequiredService<ITextDocumentRepository<string>>());
services.AddCloudEventBus();
services.AddHttpClient();
services.AddSingleton<DockerContainerPlatform>();
Expand Down
2 changes: 2 additions & 0 deletions tests/Synapse.UnitTests/Cases/Runner/RunnerTestsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ protected virtual IServiceCollection ConfigureServices(IServiceCollection servic
services.AddPythonScriptExecutor();
services.AddMemoryCacheRepository<Document, string>();
services.AddScoped<IResourceRepository, MockResourceRepository>();
services.AddScoped<ITextDocumentRepository<string>, MockTextDocumentRepository<string>>();
services.AddScoped<ITextDocumentRepository>(provider => provider.GetRequiredService<ITextDocumentRepository<string>>());
services.AddCloudEventBus();
services.AddHttpClient();
services.AddSingleton<DockerContainerPlatform>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,6 @@ public async Task Call_Function_Should_Work()
Name = "${ .username }"
})
],
};
var parameters = new Neuroglia.EquatableDictionary<string, object>()
{
new("username", "${ .user.username }")
};
var taskDefinition = new CallTaskDefinition()
{
Call = functionName,
With = parameters,
Input = new()
{
Schema = new()
Expand All @@ -58,6 +49,15 @@ public async Task Call_Function_Should_Work()
}
}
};
var parameters = new Neuroglia.EquatableDictionary<string, object>()
{
new("username", "${ .user.username }")
};
var taskDefinition = new CallTaskDefinition()
{
Call = functionName,
With = parameters
};
var workflowDefinition = new WorkflowDefinitionBuilder()
.WithNamespace("default")
.WithName("fake-workflow")
Expand Down
3 changes: 2 additions & 1 deletion tests/Synapse.UnitTests/Cases/Runtime/NativeRuntimeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ protected override void ConfigureServices(IServiceCollection services)
};
options.Runtime.Native = new()
{
Directory = Path.Combine("..", "..", "..", "..", "..", "src", "runner", "Synapse.Runner", "bin", "Debug", "net8.0")
Directory = Path.Combine("..", "..", "..", "..", "..", "src", "runner", "Synapse.Runner", "bin", "Debug", "net8.0"),
Executable = "Synapse.Runner"
};
});
services.AddSingleton<IWorkflowRuntime, NativeRuntime>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ protected WorkflowRuntimeTestsBase()
services.AddSingleton<ITaskExecutorFactory, TaskExecutorFactory>();
services.AddMemoryCacheRepository<Document, string>();
services.AddScoped<IResourceRepository, MockResourceRepository>();
services.AddScoped<ITextDocumentRepository<string>, MockTextDocumentRepository<string>>();
services.AddScoped<ITextDocumentRepository>(provider => provider.GetRequiredService<ITextDocumentRepository<string>>());
services.AddCloudEventBus();
services.AddHttpClient();
services.AddSingleton<DockerContainerPlatform>();
Expand Down
3 changes: 2 additions & 1 deletion tests/Synapse.UnitTests/Services/MockCloudFlowsApiClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ internal class MockSynapseApiClient(IServiceProvider serviceProvider)

public INamespacedResourceApiClient<Workflow> Workflows { get; } = ActivatorUtilities.CreateInstance<MockNamespacedResourceApiClient<Workflow>>(serviceProvider);

public INamespacedResourceApiClient<WorkflowInstance> WorkflowInstances { get; } = ActivatorUtilities.CreateInstance<MockNamespacedResourceApiClient<WorkflowInstance>>(serviceProvider);
public IWorkflowInstanceApiClient WorkflowInstances { get; } = ActivatorUtilities.CreateInstance<MockWorkflowInstanceApiClient>(serviceProvider);

public IDocumentApiClient Documents { get; } = ActivatorUtilities.CreateInstance<MockDocumentApiClient>(serviceProvider);

public INamespacedResourceApiClient<ServiceAccount> ServiceAccounts => throw new NotImplementedException();

public IUserApiClient Users => throw new NotImplementedException();

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@ internal class MockNamespacedResourceApiClient<TResource>(IResourceRepository re

public Task<TResource> ReplaceStatusAsync(TResource resource, CancellationToken cancellationToken = default) => resources.ReplaceStatusAsync(resource, false, cancellationToken);

}
}
76 changes: 76 additions & 0 deletions tests/Synapse.UnitTests/Services/MockTextDocumentRepository.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright © 2024-Present The Synapse Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Neuroglia.Data.Infrastructure;
using Neuroglia.Data.Infrastructure.Services;
using System.Runtime.CompilerServices;

namespace Synapse.UnitTests.Services;

internal class MockTextDocumentRepository<TKey>
: ITextDocumentRepository<TKey>
where TKey : IEquatable<TKey>
{
public Task AppendAsync(TKey key, string text, CancellationToken cancellationToken = default) => Task.CompletedTask;

public Task AppendAsync(object key, string text, CancellationToken cancellationToken = default) => Task.CompletedTask;

public Task DeleteAsync(TKey key, CancellationToken cancellationToken = default) => Task.CompletedTask;

public Task DeleteAsync(object key, CancellationToken cancellationToken = default) => Task.CompletedTask;

public async IAsyncEnumerable<ITextDocument<TKey>> GetAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await Task.CompletedTask;
yield break;
}

public Task<ITextDocument<TKey>?> GetAsync(TKey key, CancellationToken cancellationToken = default) => Task.FromResult((ITextDocument<TKey>?)null);

public Task<ITextDocument?> GetAsync(object key, CancellationToken cancellationToken = default) => Task.FromResult((ITextDocument?)null);

public Task<System.Collections.Generic.ICollection<ITextDocument<TKey>>> ListAsync(int? max = null, int? skip = null, CancellationToken cancellationToken = default) => Task.FromResult((System.Collections.Generic.ICollection<ITextDocument<TKey>>)[]);

public async IAsyncEnumerable<string> ReadAsync(TKey key, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await Task.CompletedTask;
yield break;
}

public async IAsyncEnumerable<string> ReadAsync(object key, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await Task.CompletedTask;
yield break;
}

public Task<string> ReadToEndAsync(TKey key, CancellationToken cancellationToken = default) => Task.FromResult(string.Empty);

public Task<string> ReadToEndAsync(object key, CancellationToken cancellationToken = default) => Task.FromResult(string.Empty);

public Task ReplaceAsync(TKey key, string text, CancellationToken cancellationToken = default) => Task.CompletedTask;

public Task ReplaceAsync(object key, string text, CancellationToken cancellationToken = default) => Task.CompletedTask;

public Task<ITextDocumentWatch> WatchAsync(TKey key, CancellationToken cancellationToken = default) => Task.FromResult((ITextDocumentWatch)null!);

public Task<ITextDocumentWatch> WatchAsync(object key, CancellationToken cancellationToken = default) => Task.FromResult((ITextDocumentWatch)null!);

async IAsyncEnumerable<ITextDocument> ITextDocumentRepository.GetAllAsync([EnumeratorCancellation] CancellationToken cancellationToken)
{
await Task.CompletedTask;
yield break;
}

Task<System.Collections.Generic.ICollection<ITextDocument>> ITextDocumentRepository.ListAsync(int? max, int? skip, CancellationToken cancellationToken) => Task.FromResult((System.Collections.Generic.ICollection<ITextDocument>)[]);

}
29 changes: 29 additions & 0 deletions tests/Synapse.UnitTests/Services/MockWorkflowInstanceApiClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright © 2024-Present The Synapse Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Neuroglia.Data.Infrastructure;
using Neuroglia.Data.Infrastructure.ResourceOriented.Services;
using Neuroglia.Data.Infrastructure.Services;
using Synapse.Api.Client.Services;

namespace Synapse.UnitTests.Services;

internal class MockWorkflowInstanceApiClient(IResourceRepository resources, ITextDocumentRepository<string> logs)
: MockNamespacedResourceApiClient<WorkflowInstance>(resources), IWorkflowInstanceApiClient
{

public Task<string> ReadLogsAsync(string name, string @namespace, CancellationToken cancellationToken = default) => logs.ReadToEndAsync($"{name}.{@namespace}", cancellationToken);

public async Task<IAsyncEnumerable<ITextDocumentWatchEvent>> WatchLogsAsync(string name, string @namespace, CancellationToken cancellationToken = default) => (await logs.WatchAsync($"{name}.{@namespace}", cancellationToken)).ToAsyncEnumerable();

}

0 comments on commit 640f663

Please sign in to comment.