diff --git a/.gitignore b/.gitignore index 64de12b6c..13035ac99 100644 --- a/.gitignore +++ b/.gitignore @@ -231,3 +231,5 @@ _Pvt_Extensions # FAKE - F# Make .fake/ /deployment/docker-compose/secrets/basic.json +/src/dashboard/Synapse.Dashboard/wwwroot/lib/bootstrap/node_modules +/src/dashboard/Synapse.Dashboard/wwwroot/lib/bootstrap/package-lock.json diff --git a/Synapse.sln b/Synapse.sln index 015cef214..7da6cabb5 100644 --- a/Synapse.sln +++ b/Synapse.sln @@ -43,8 +43,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Synapse.UnitTests", "tests\ EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Synapse.Api.Client.Core", "src\api\Synapse.Api.Client.Core\Synapse.Api.Client.Core.csproj", "{3E650D2D-B018-4056-8922-FC7DF68233B1}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Synapse.Operator.Application", "src\operator\Synapse.Operator.Application\Synapse.Operator.Application.csproj", "{7627C036-B55A-479C-AB0D-C859D4A11238}" -EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "runtime", "runtime", "{175CE1C5-FE17-4C8B-8823-E812BAD4E527}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Synapse.Runtime.Abstractions", "src\runtime\Synapse.Runtime.Abstractions\Synapse.Runtime.Abstractions.csproj", "{A0E5E7F2-8C9C-4F36-B3FD-C09074893023}" @@ -57,6 +55,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Synapse.Runner", "src\runne EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Synapse.Dashboard.StateManagement", "src\dashboard\Synapse.Dashboard.StateManagement\Synapse.Dashboard.StateManagement.csproj", "{91EF9F64-4997-407C-B353-C26B1421D0FB}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Synapse.Operator", "src\operator\Synapse.Operator\Synapse.Operator.csproj", "{A9085F4A-5FDF-4F4A-B267-A03BC5E0FDB0}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -107,10 +107,6 @@ Global {3E650D2D-B018-4056-8922-FC7DF68233B1}.Debug|Any CPU.Build.0 = Debug|Any CPU {3E650D2D-B018-4056-8922-FC7DF68233B1}.Release|Any CPU.ActiveCfg = Release|Any CPU {3E650D2D-B018-4056-8922-FC7DF68233B1}.Release|Any CPU.Build.0 = Release|Any CPU - {7627C036-B55A-479C-AB0D-C859D4A11238}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {7627C036-B55A-479C-AB0D-C859D4A11238}.Debug|Any CPU.Build.0 = Debug|Any CPU - {7627C036-B55A-479C-AB0D-C859D4A11238}.Release|Any CPU.ActiveCfg = Release|Any CPU - {7627C036-B55A-479C-AB0D-C859D4A11238}.Release|Any CPU.Build.0 = Release|Any CPU {A0E5E7F2-8C9C-4F36-B3FD-C09074893023}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {A0E5E7F2-8C9C-4F36-B3FD-C09074893023}.Debug|Any CPU.Build.0 = Debug|Any CPU {A0E5E7F2-8C9C-4F36-B3FD-C09074893023}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -131,6 +127,10 @@ Global {91EF9F64-4997-407C-B353-C26B1421D0FB}.Debug|Any CPU.Build.0 = Debug|Any CPU {91EF9F64-4997-407C-B353-C26B1421D0FB}.Release|Any CPU.ActiveCfg = Release|Any CPU {91EF9F64-4997-407C-B353-C26B1421D0FB}.Release|Any CPU.Build.0 = Release|Any CPU + {A9085F4A-5FDF-4F4A-B267-A03BC5E0FDB0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A9085F4A-5FDF-4F4A-B267-A03BC5E0FDB0}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A9085F4A-5FDF-4F4A-B267-A03BC5E0FDB0}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A9085F4A-5FDF-4F4A-B267-A03BC5E0FDB0}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -153,13 +153,13 @@ Global {6F7ED286-A02C-4E78-9FE3-2689BD94B97C} = {F041A1CB-45FA-4432-BAD2-DE2EE174C6FC} {CB1DF439-BFBA-408D-BC09-3FF94E6C4F0F} = {F041A1CB-45FA-4432-BAD2-DE2EE174C6FC} {3E650D2D-B018-4056-8922-FC7DF68233B1} = {63715FC0-736D-4972-A865-41126155DF45} - {7627C036-B55A-479C-AB0D-C859D4A11238} = {32EAD165-3D99-42CD-B3AF-05136DCC7F35} {175CE1C5-FE17-4C8B-8823-E812BAD4E527} = {4B9AF05C-9D6D-48C0-994D-D4A5C28FA24D} {A0E5E7F2-8C9C-4F36-B3FD-C09074893023} = {175CE1C5-FE17-4C8B-8823-E812BAD4E527} {DC24E506-602F-4FD9-B8C0-CEA6B2AD8888} = {175CE1C5-FE17-4C8B-8823-E812BAD4E527} {F327B8F1-9A13-4924-AE1B-E69788AC73E7} = {175CE1C5-FE17-4C8B-8823-E812BAD4E527} {E5FAA9BA-07C3-49CF-AD3B-897AE1D0B018} = {1DA47E5F-B23A-4D3C-96AA-4BD2662AB946} {91EF9F64-4997-407C-B353-C26B1421D0FB} = {7DF998B8-0FB1-470E-8ED0-EA1CC7B16901} + {A9085F4A-5FDF-4F4A-B267-A03BC5E0FDB0} = {32EAD165-3D99-42CD-B3AF-05136DCC7F35} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {2A6C03D6-355A-4B39-9F2B-D0FDE429C0E2} diff --git a/src/api/Synapse.Api.Client.Core/Services/ISynapseApiClient.cs b/src/api/Synapse.Api.Client.Core/Services/ISynapseApiClient.cs index 5dcb89218..7bb2bbd5c 100644 --- a/src/api/Synapse.Api.Client.Core/Services/ISynapseApiClient.cs +++ b/src/api/Synapse.Api.Client.Core/Services/ISynapseApiClient.cs @@ -11,6 +11,11 @@ public interface ISynapseApiClient /// IClusterResourceApiClient Namespaces { get; } + /// + /// Gets the Synapse API used to manage s + /// + INamespacedResourceApiClient Operators { get; } + /// /// Gets the Synapse API used to manage s /// diff --git a/src/api/Synapse.Api.Client.Http/Configuration/SynapseHttpApiClientOptions.cs b/src/api/Synapse.Api.Client.Http/Configuration/SynapseHttpApiClientOptions.cs index 8cf83c31c..96c4b68bd 100644 --- a/src/api/Synapse.Api.Client.Http/Configuration/SynapseHttpApiClientOptions.cs +++ b/src/api/Synapse.Api.Client.Http/Configuration/SynapseHttpApiClientOptions.cs @@ -6,9 +6,18 @@ public class SynapseHttpApiClientOptions { + /// + /// Initializes a new + /// + public SynapseHttpApiClientOptions() + { + var uri = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Api.Uri); + this.BaseAddress = string.IsNullOrWhiteSpace(uri) ? null! : new(uri, UriKind.RelativeOrAbsolute); + } + /// /// Gets/sets the base address of the Cloud Streams API to connect to /// - public required virtual Uri BaseAddress { get; set; } + public virtual Uri BaseAddress { get; set; } } diff --git a/src/api/Synapse.Api.Client.Http/Services/ResourceHttpApiClient.cs b/src/api/Synapse.Api.Client.Http/Services/ResourceHttpApiClient.cs index 437c2090d..c34c11659 100644 --- a/src/api/Synapse.Api.Client.Http/Services/ResourceHttpApiClient.cs +++ b/src/api/Synapse.Api.Client.Http/Services/ResourceHttpApiClient.cs @@ -96,11 +96,10 @@ public virtual async Task GetDefinitionAsync(CancellationTok } /// - public virtual async Task> ListAsync(string @namespace, IEnumerable? labelSelectors = null, CancellationToken cancellationToken = default) + public virtual async Task> ListAsync(string? @namespace = null, IEnumerable? labelSelectors = null, CancellationToken cancellationToken = default) { - ArgumentException.ThrowIfNullOrWhiteSpace(@namespace); var resource = new TResource(); - var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}"; + var uri = string.IsNullOrWhiteSpace(@namespace) ? $"/api/{resource.Definition.Version}/{resource.Definition.Plural}" : $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}"; var queryStringArguments = new Dictionary(); if (labelSelectors?.Any() == true) queryStringArguments.Add(nameof(labelSelectors), labelSelectors.Select(s => s.ToString()).Join(',')); if (queryStringArguments.Count != 0) uri += $"?{queryStringArguments.Select(kvp => $"{kvp.Key}={kvp.Value}").Join('&')}"; @@ -228,7 +227,7 @@ public virtual async Task DeleteAsync(string name, string @namespace, Cancellati ArgumentException.ThrowIfNullOrWhiteSpace(name); ArgumentException.ThrowIfNullOrWhiteSpace(@namespace); var resource = new TResource(); - var uri = $"/api/{resource.Definition.Version}/{@namespace}/{resource.Definition.Plural}/{name}"; + var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}/{name}"; using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Delete, uri), cancellationToken).ConfigureAwait(false); await ProcessResponseAsync(await this.HttpClient.SendAsync(request, cancellationToken).ConfigureAwait(false), cancellationToken).ConfigureAwait(false); } diff --git a/src/api/Synapse.Api.Client.Http/Services/SynapseHttpApiClient.cs b/src/api/Synapse.Api.Client.Http/Services/SynapseHttpApiClient.cs index 977a008b4..6ea85fabf 100644 --- a/src/api/Synapse.Api.Client.Http/Services/SynapseHttpApiClient.cs +++ b/src/api/Synapse.Api.Client.Http/Services/SynapseHttpApiClient.cs @@ -69,6 +69,9 @@ public SynapseHttpApiClient(IServiceProvider serviceProvider, ILoggerFactory log /// protected HttpClient HttpClient { get; } + /// + public INamespacedResourceApiClient Operators { get; private set; } = null!; + /// public IClusterResourceApiClient Namespaces { get; private set; } = null!; diff --git a/src/api/Synapse.Api.Http/Controllers/OperatorsController.cs b/src/api/Synapse.Api.Http/Controllers/OperatorsController.cs new file mode 100644 index 000000000..f27058554 --- /dev/null +++ b/src/api/Synapse.Api.Http/Controllers/OperatorsController.cs @@ -0,0 +1,16 @@ +using Synapse.Resources; + +namespace Synapse.Api.Http.Controllers; + +/// +/// Represents the used to manage s +/// +/// The service used to mediate calls +[Route("api/v1/operators")] +public class OperatorsController(IMediator mediator) + : NamespacedResourceController(mediator) +{ + + + +} diff --git a/src/api/Synapse.Api.Http/NamespacedResourceController.cs b/src/api/Synapse.Api.Http/NamespacedResourceController.cs index cea4465ea..1b12de5ff 100644 --- a/src/api/Synapse.Api.Http/NamespacedResourceController.cs +++ b/src/api/Synapse.Api.Http/NamespacedResourceController.cs @@ -30,6 +30,21 @@ public async Task GetResource(string name, string @namespace, Can return this.Process(await this.Mediator.ExecuteAsync(new GetResourceQuery(name, @namespace), cancellationToken).ConfigureAwait(false)); } + /// + /// Lists matching resources + /// + /// A comma-separated list of label selectors, if any + /// A + /// A new + [HttpGet] + [ProducesResponseType(typeof(Collection), (int)HttpStatusCode.OK)] + [ProducesErrorResponseType(typeof(Neuroglia.ProblemDetails))] + public virtual async Task GetResources(string? labelSelector = null, CancellationToken cancellationToken = default) + { + if (!this.TryParseLabelSelectors(labelSelector, out var labelSelectors)) return this.InvalidLabelSelector(labelSelector!); + return this.Process(await this.Mediator.ExecuteAsync(new GetResourcesQuery(null, labelSelectors), cancellationToken).ConfigureAwait(false)); + } + /// /// Lists matching resources /// @@ -46,6 +61,23 @@ public virtual async Task GetResources(string @namespace, string? return this.Process(await this.Mediator.ExecuteAsync(new GetResourcesQuery(@namespace, labelSelectors), cancellationToken).ConfigureAwait(false)); } + /// + /// Lists matching resources + /// + /// A comma-separated list of label selectors, if any + /// The maximum amount, if any, of results to list at once + /// A token, defined by a previously retrieved collection, used to continue enumerating through matches + /// A + /// A new + [HttpGet("list")] + [ProducesResponseType(typeof(Collection), (int)HttpStatusCode.OK)] + [ProducesErrorResponseType(typeof(Neuroglia.ProblemDetails))] + public virtual async Task ListResources(string? labelSelector = null, ulong? maxResults = null, string? continuationToken = null, CancellationToken cancellationToken = default) + { + if (!this.TryParseLabelSelectors(labelSelector, out var labelSelectors)) return this.InvalidLabelSelector(labelSelector!); + return this.Process(await this.Mediator.ExecuteAsync(new ListResourcesQuery(null, labelSelectors, maxResults, continuationToken), cancellationToken).ConfigureAwait(false)); + } + /// /// Lists matching resources /// diff --git a/src/api/Synapse.Api.Http/Services/ResourceWatchEventHubController.cs b/src/api/Synapse.Api.Http/Services/ResourceWatchEventHubController.cs index 6713fba0f..81cd552da 100644 --- a/src/api/Synapse.Api.Http/Services/ResourceWatchEventHubController.cs +++ b/src/api/Synapse.Api.Http/Services/ResourceWatchEventHubController.cs @@ -86,7 +86,7 @@ public virtual async Task WatchResourcesAsync(string connectionId, ResourceDefin }); } watch = await this.Resources.WatchAsync(definition.Group, definition.Version, definition.Plural, @namespace, cancellationToken: cancellationToken).ConfigureAwait(false); - watch.SubscribeAsync(e => this.OnResourceWatchEventAsync(connectionId, e)); + watch.SubscribeAsync(e => this.OnResourceWatchEventAsync(connectionId, e), this.CancellationTokenSource.Token); subscriptions.AddOrUpdate(subscriptionKey, watch, (key, current) => { current.Dispose(); diff --git a/src/core/Synapse.Core.Infrastructure.Containers.Docker/DockerContainer.cs b/src/core/Synapse.Core.Infrastructure.Containers.Docker/DockerContainer.cs index 48b21af97..a17dd5287 100644 --- a/src/core/Synapse.Core.Infrastructure.Containers.Docker/DockerContainer.cs +++ b/src/core/Synapse.Core.Infrastructure.Containers.Docker/DockerContainer.cs @@ -24,6 +24,8 @@ public class DockerContainer(string id, IDockerClient dockerClient) : IContainer { + bool _disposed; + /// /// Gets the container's ID /// @@ -35,15 +37,17 @@ public class DockerContainer(string id, IDockerClient dockerClient) protected virtual IDockerClient DockerClient { get; } = dockerClient; /// - public virtual StreamReader? StandardOutput { get; set; } + public virtual StreamReader? StandardOutput { get; protected set; } + + /// + public virtual StreamReader? StandardError { get; protected set; } /// - public virtual StreamReader? StandardError { get; set; } + public long? ExitCode { get; protected set; } /// public virtual async Task StartAsync(CancellationToken cancellationToken = default) { - await this.DockerClient.Containers.StartContainerAsync(this.Id, new() { }, cancellationToken).ConfigureAwait(false); #pragma warning disable CS0618 // Type or member is obsolete var standardOutputStream = await this.DockerClient.Containers.GetContainerLogsAsync(this.Id, new() { Follow = true, ShowStdout = true, ShowStderr = true, Timestamps = false }, cancellationToken).ConfigureAwait(false); @@ -54,9 +58,49 @@ public virtual async Task StartAsync(CancellationToken cancellationToken = defau } /// - public virtual Task WaitForExitAsync(CancellationToken cancellationToken = default) => this.DockerClient.Containers.WaitContainerAsync(this.Id, cancellationToken); + public virtual async Task WaitForExitAsync(CancellationToken cancellationToken = default) + { + var response = await this.DockerClient.Containers.WaitContainerAsync(this.Id, cancellationToken).ConfigureAwait(false); + this.ExitCode = response.StatusCode; + } /// public virtual Task StopAsync(CancellationToken cancellationToken = default) => this.DockerClient.Containers.StopContainerAsync(this.Id, new() { }, cancellationToken); + /// + /// Disposes of the + /// + /// A boolean indicating whether or not the is being disposed of + /// A new awaitable + protected virtual async ValueTask DisposeAsync(bool disposing) + { + if (!this._disposed) return; + this._disposed = true; + await Task.CompletedTask.ConfigureAwait(false); + } + + /// + public async ValueTask DisposeAsync() + { + await this.DisposeAsync(true).ConfigureAwait(false); + GC.SuppressFinalize(this); + } + + /// + /// Disposes of the + /// + /// A boolean indicating whether or not the is being disposed of + protected virtual void Dispose(bool disposing) + { + if (!this._disposed) return; + this._disposed = true; + } + + /// + public void Dispose() + { + this.Dispose(true); + GC.SuppressFinalize(this); + } + } \ No newline at end of file diff --git a/src/core/Synapse.Core.Infrastructure/Extensions/IServiceCollectionExtensions.cs b/src/core/Synapse.Core.Infrastructure/Extensions/IServiceCollectionExtensions.cs index 23ea868de..ea4f50a0e 100644 --- a/src/core/Synapse.Core.Infrastructure/Extensions/IServiceCollectionExtensions.cs +++ b/src/core/Synapse.Core.Infrastructure/Extensions/IServiceCollectionExtensions.cs @@ -39,9 +39,9 @@ public static IServiceCollection AddSynapse(this IServiceCollection services, IC services.AddPluginProvider(); var redisConnectionString = configuration.GetConnectionString(RedisDatabase.ConnectionStringName); - services.AddPlugin(typeof(IDatabase), string.IsNullOrWhiteSpace(redisConnectionString) ? null : provider => provider.GetRequiredService(), serviceLifetime: ServiceLifetime.Scoped); + services.AddPlugin(typeof(IDatabase), string.IsNullOrWhiteSpace(redisConnectionString) ? null : provider => provider.GetRequiredService(), serviceLifetime: ServiceLifetime.Singleton); - if (!string.IsNullOrWhiteSpace(redisConnectionString)) services.AddRedisDatabase(redisConnectionString, ServiceLifetime.Scoped); + if (!string.IsNullOrWhiteSpace(redisConnectionString)) services.AddRedisDatabase(redisConnectionString, ServiceLifetime.Singleton); services.AddHostedService(); services.AddPlugin(typeof(IRepository), provider => provider.GetRequiredService>(), serviceLifetime: ServiceLifetime.Scoped); diff --git a/src/core/Synapse.Core.Infrastructure/Services/Interfaces/IContainer.cs b/src/core/Synapse.Core.Infrastructure/Services/Interfaces/IContainer.cs index 7c77735e3..d8c2d9901 100644 --- a/src/core/Synapse.Core.Infrastructure/Services/Interfaces/IContainer.cs +++ b/src/core/Synapse.Core.Infrastructure/Services/Interfaces/IContainer.cs @@ -17,6 +17,7 @@ namespace Synapse.Core.Infrastructure.Services; /// Defines the fundamentals of a container /// public interface IContainer + : IDisposable, IAsyncDisposable { /// @@ -29,6 +30,11 @@ public interface IContainer /// StreamReader? StandardError { get; } + /// + /// Gets the container's exit code + /// + long? ExitCode { get; } + /// /// Starts the container /// diff --git a/src/core/Synapse.Core.Infrastructure/Services/Interfaces/IContainerPlatform.cs b/src/core/Synapse.Core.Infrastructure/Services/Interfaces/IContainerPlatform.cs index 9af0fd811..3aff3dfd5 100644 --- a/src/core/Synapse.Core.Infrastructure/Services/Interfaces/IContainerPlatform.cs +++ b/src/core/Synapse.Core.Infrastructure/Services/Interfaces/IContainerPlatform.cs @@ -16,7 +16,7 @@ namespace Synapse.Core.Infrastructure.Services; /// -/// Defines the fundamentals of a service used to interact with a container platform +/// Defines the fundamentals of a service used to manage containers /// public interface IContainerPlatform { diff --git a/src/core/Synapse.Core/OperatorRuntimeMode.cs b/src/core/Synapse.Core/OperatorRuntimeMode.cs new file mode 100644 index 000000000..cfe5368a1 --- /dev/null +++ b/src/core/Synapse.Core/OperatorRuntimeMode.cs @@ -0,0 +1,28 @@ +namespace Synapse; + +/// +/// Enumerates all default operator runtime modes +/// +public static class OperatorRuntimeMode +{ + + /// + /// Gets the native operator runtime mode + /// + public const string Native = "native"; + /// + /// Gets the containerized operator runtime mode + /// + public const string Containerized = "containerized"; + + /// + /// Gets a new containing all default operator runtime modes + /// + /// A new containing all default operator runtime modes + public static IEnumerable AsEnumerable() + { + yield return Native; + yield return Containerized; + } + +} \ No newline at end of file diff --git a/src/core/Synapse.Core/OperatorStatusPhase.cs b/src/core/Synapse.Core/OperatorStatusPhase.cs new file mode 100644 index 000000000..dafdeb477 --- /dev/null +++ b/src/core/Synapse.Core/OperatorStatusPhase.cs @@ -0,0 +1,41 @@ +// Copyright © 2024-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Synapse; + +/// +/// Exposes all default operator status phases +/// +public static class OperatorStatusPhase +{ + + /// + /// Indicates that the operator is running + /// + public const string Running = "running"; + /// + /// Indicates that the operator has been stopped and is not running + /// + public const string Stopped = "stopped"; + + /// + /// Gets an containing default operator status phases + /// + /// A new containing default operator status phases + public static IEnumerable AsEnumerable() + { + yield return Running; + yield return Stopped; + } + +} diff --git a/src/core/Synapse.Core/Resources/CertificateValidationStrategyDefinition.cs b/src/core/Synapse.Core/Resources/CertificateValidationStrategyDefinition.cs new file mode 100644 index 000000000..22b2d6229 --- /dev/null +++ b/src/core/Synapse.Core/Resources/CertificateValidationStrategyDefinition.cs @@ -0,0 +1,32 @@ +// Copyright © 2024-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.ComponentModel; + +namespace Synapse.Resources; + +/// +/// Represents the configuration of a certificate validation strategy +/// +[DataContract] +public record CertificateValidationStrategyDefinition +{ + + /// + /// Gets/sets a boolean indicating whether or not to validate certificates when performing requests + /// + [DefaultValue(true)] + [DataMember(Order = 1, Name = "validate"), JsonPropertyOrder(1), JsonPropertyName("validate"), YamlMember(Order = 1, Alias = "validate")] + public virtual bool Validate { get; set; } = true; + +} \ No newline at end of file diff --git a/src/core/Synapse.Core/Resources/NativeRunnerProcessDefinition.cs b/src/core/Synapse.Core/Resources/NativeRunnerProcessDefinition.cs new file mode 100644 index 000000000..943e1a0c6 --- /dev/null +++ b/src/core/Synapse.Core/Resources/NativeRunnerProcessDefinition.cs @@ -0,0 +1,35 @@ +// Copyright © 2024-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Synapse.Resources; + +/// +/// Represents the definition of a native workflow runner process +/// +[DataContract] +public record NativeRunnerProcessDefinition +{ + + /// + /// Gets/sets the path to the file to execute to run a workflow instance + /// + [DataMember(Order = 1, Name = "executable"), JsonPropertyOrder(1), JsonPropertyName("executable"), YamlMember(Order = 1, Alias = "executable")] + public virtual string Executable { get; set; } = Path.Combine(AppContext.BaseDirectory, "bin", "runner", "Synapse.Runner"); + + /// + /// Gets/sets the working directory + /// + [DataMember(Order = 2, Name = "directory"), JsonPropertyOrder(2), JsonPropertyName("directory"), YamlMember(Order = 2, Alias = "directory")] + public virtual string Directory { get; set; } = Path.Combine(AppContext.BaseDirectory, "bin", "runner"); + +} diff --git a/src/core/Synapse.Core/Resources/Operator.cs b/src/core/Synapse.Core/Resources/Operator.cs index 5842affe1..f21a4473b 100644 --- a/src/core/Synapse.Core/Resources/Operator.cs +++ b/src/core/Synapse.Core/Resources/Operator.cs @@ -20,7 +20,7 @@ namespace Synapse.Resources; /// [DataContract] public record Operator - : Resource + : Resource { /// diff --git a/src/core/Synapse.Core/Resources/Operator.yaml b/src/core/Synapse.Core/Resources/Operator.yaml new file mode 100644 index 000000000..0c3efe6cf --- /dev/null +++ b/src/core/Synapse.Core/Resources/Operator.yaml @@ -0,0 +1,31 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: operators.synapse.io +spec: + scope: Namespaced + group: synapse.io + names: + plural: operators + singular: operator + kind: Operators + shortNames: + - op + versions: + - name: v1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + description: Configures the Synapse Operator + properties: {} #todo + status: + type: object + required: + - spec + subresources: + status: {} \ No newline at end of file diff --git a/src/core/Synapse.Core/Resources/OperatorSpec.cs b/src/core/Synapse.Core/Resources/OperatorSpec.cs index f6b4072dd..c0ebaedb0 100644 --- a/src/core/Synapse.Core/Resources/OperatorSpec.cs +++ b/src/core/Synapse.Core/Resources/OperatorSpec.cs @@ -20,6 +20,20 @@ namespace Synapse.Resources; public record OperatorSpec { + /// + /// Gets/sets the options used to configure the runtime used by the Synapse Operator to spawn workflow instance processes + /// + [DataMember(Order = 1, Name = "runner"), JsonPropertyOrder(1), JsonPropertyName("runner"), YamlMember(Order = 1, Alias = "runner")] + public virtual RunnerDefinition Runner { get; set; } = new() + { + Runtime = new() + }; + /// + /// Gets/sets a key/value mapping of the labels to select both workflows and workflow instances by. + /// If not set, the broker will attempt to pick up all unclaimed workflows and workflow instances + /// + [DataMember(Order = 2, Name = "selector"), JsonPropertyOrder(2), JsonPropertyName("selector"), YamlMember(Order = 2, Alias = "selector")] + public virtual IDictionary? Selector { get; set; } -} \ No newline at end of file +} diff --git a/src/core/Synapse.Core/Resources/OperatorStatus.cs b/src/core/Synapse.Core/Resources/OperatorStatus.cs new file mode 100644 index 000000000..3222f4ee2 --- /dev/null +++ b/src/core/Synapse.Core/Resources/OperatorStatus.cs @@ -0,0 +1,29 @@ +// Copyright © 2024-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Synapse.Resources; + +/// +/// Represents an object used to describe the status of an operator +/// +[DataContract] +public record OperatorStatus +{ + + /// + /// Gets/sets the operator's current status phase + /// + [DataMember(Name = "phase", Order = 1), JsonPropertyName("phase"), JsonPropertyOrder(1), YamlMember(Alias = "phase", Order = 1)] + public virtual string? Phase { get; set; } + +} \ No newline at end of file diff --git a/src/core/Synapse.Core/Resources/RunnerDefinition.cs b/src/core/Synapse.Core/Resources/RunnerDefinition.cs new file mode 100644 index 000000000..4494c390f --- /dev/null +++ b/src/core/Synapse.Core/Resources/RunnerDefinition.cs @@ -0,0 +1,50 @@ +// Copyright © 2024-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Synapse.Resources; + +/// +/// Represents the definition of a Synapse workflow runner +/// +[DataContract] +public record RunnerDefinition +{ + + /// + /// Gets/sets the endpoint that references the base address and authentication policy for the Synapse API used by runners + /// + [Required] + [DataMember(Order = 1, Name = "api"), JsonPropertyOrder(1), JsonPropertyName("api"), YamlMember(Order = 1, Alias = "api")] + public virtual EndpointDefinition Api { get; set; } = null!; + + /// + /// Gets/sets the options used to configure the runtime used by the Synapse Operator to spawn workflow instance processes + /// + [Required] + [DataMember(Order = 2, Name = "runtime"), JsonPropertyOrder(2), JsonPropertyName("runtime"), YamlMember(Order = 2, Alias = "runtime")] + public virtual RuntimeDefinition Runtime { get; set; } = new() + { + Container = new() + { + Image = "ghcr.io/serverlessworkflow/synapse/runner" + } + }; + + /// + /// Gets/sets the endpoint that references the base address and authentication policy for the Synapse API used by runners + /// + [Required] + [DataMember(Order = 3, Name = "certificate"), JsonPropertyOrder(3), JsonPropertyName("certificate"), YamlMember(Order = 3, Alias = "certificate")] + public virtual CertificateValidationStrategyDefinition Certificates { get; set; } = null!; + +} diff --git a/src/core/Synapse.Core/Resources/RuntimeDefinition.cs b/src/core/Synapse.Core/Resources/RuntimeDefinition.cs new file mode 100644 index 000000000..f140835fe --- /dev/null +++ b/src/core/Synapse.Core/Resources/RuntimeDefinition.cs @@ -0,0 +1,43 @@ +// Copyright © 2024-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using ServerlessWorkflow.Sdk.Models.Processes; + +namespace Synapse.Resources; + +/// +/// Represents an object used to configure the runtime used to spawn workflow instance processes +/// +[DataContract] +public record RuntimeDefinition +{ + + /// + /// Gets/sets the options used to configure the runtime, when using the native mode + /// + [DataMember(Order = 1, Name = "native"), JsonPropertyOrder(1), JsonPropertyName("native"), YamlMember(Order = 1, Alias = "native")] + public virtual NativeRunnerProcessDefinition? Native { get; set; } + + /// + /// Gets/sets the options used to configure the runtime, when using the container mode + /// + [DataMember(Order = 2, Name = "container"), JsonPropertyOrder(2), JsonPropertyName("container"), YamlMember(Order = 2, Alias = "container")] + public virtual ContainerProcessDefinition? Container { get; set; } + + /// + /// Gets the runtime mode + /// + [IgnoreDataMember, JsonIgnore, YamlIgnore] + public virtual string Mode => this.Native != null ? OperatorRuntimeMode.Native : this.Container != null ? OperatorRuntimeMode.Containerized : throw new Exception("The runtime mode must be set"); + +} diff --git a/src/core/Synapse.Core/Synapse.Core.csproj b/src/core/Synapse.Core/Synapse.Core.csproj index 3be50a739..910d13698 100644 --- a/src/core/Synapse.Core/Synapse.Core.csproj +++ b/src/core/Synapse.Core/Synapse.Core.csproj @@ -14,11 +14,13 @@ + + diff --git a/src/core/Synapse.Core/SynapseDefaults.cs b/src/core/Synapse.Core/SynapseDefaults.cs index f6cfce5e6..b94f14c23 100644 --- a/src/core/Synapse.Core/SynapseDefaults.cs +++ b/src/core/Synapse.Core/SynapseDefaults.cs @@ -58,7 +58,29 @@ public static class Api /// /// Gets the environment variable used to configure the base uri of the Synapse API to use /// - public const string Uri = Prefix + "HOST"; + public const string Uri = Prefix + "URI"; + + } + + /// + /// Exposes constants about operator-related environment variables + /// + public static class Operator + { + + /// + /// Gets the prefix for all operator related environment variables + /// + public const string Prefix = EnvironmentVariables.Prefix + "OPERATOR_"; + + /// + /// Gets the environment variable used to configure the operator's namespace + /// + public const string Namespace = Prefix + "NAMESPACE"; + /// + /// Gets the environment variable used to configure the operator's name + /// + public const string Name = Prefix + "NAME"; } @@ -104,6 +126,11 @@ public static class Definitions /// public static ResourceDefinition WorkflowInstance { get; } = new WorkflowInstanceResourceDefinition(); + /// + /// Gets the definition of Operator resources + /// + public static ResourceDefinition Operator { get; } = new OperatorResourceDefinition(); + /// /// Gets a new containing Synapse default resource definitions /// @@ -112,6 +139,7 @@ public static IEnumerable AsEnumerable() { yield return Workflow; yield return WorkflowInstance; + yield return Operator; } } diff --git a/src/dashboard/Synapse.Dashboard/Components/Layout/MainLayout.razor b/src/dashboard/Synapse.Dashboard/Components/Layout/MainLayout.razor index 5fca86796..3c5be5d6f 100644 --- a/src/dashboard/Synapse.Dashboard/Components/Layout/MainLayout.razor +++ b/src/dashboard/Synapse.Dashboard/Components/Layout/MainLayout.razor @@ -17,8 +17,13 @@ + + diff --git a/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/ClusterResourceManagementComponent.cs b/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/ClusterResourceManagementComponent.cs new file mode 100644 index 000000000..50fe7b4aa --- /dev/null +++ b/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/ClusterResourceManagementComponent.cs @@ -0,0 +1,27 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Synapse.Dashboard.Components; + +/// +/// Represents the base class for all components used to manage s +/// +/// The type of to manage +public abstract class ClusterResourceManagementComponent + : ResourceManagementComponent, TResource> + where TResource : Resource, new() +{ + + + +} \ No newline at end of file diff --git a/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/ClusterResourceManagementComponentStore.cs b/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/ClusterResourceManagementComponentStore.cs index 652c65158..d5fa05f85 100644 --- a/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/ClusterResourceManagementComponentStore.cs +++ b/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/ClusterResourceManagementComponentStore.cs @@ -26,187 +26,51 @@ namespace Synapse.Dashboard.Components.ResourceManagement; /// The service used to interact with the Synapse API /// The websocket service client public class ClusterResourceManagementComponentStore(ISynapseApiClient apiClient, ResourceWatchEventHubClient resourceEventHub) - : ComponentStore>(new()) + : ResourceManagementComponentStoreBase(apiClient, resourceEventHub) where TResource : Resource, new() { - ResourceDefinition? _definition; - EquatableList? _resources; - - /// - /// Gets an used to observe s of the specified type - /// - public IObservable Definition => this.Select(s => s.Definition); - - /// - /// Gets an used to observe s of the specified type - /// - public IObservable?> Resources => this.Select(s => s.Resources); - - /// - /// Gets an used to observe changes - /// - public IObservable Loading => this.Select(state => state.Loading).DistinctUntilChanged(); - - /// - /// Gets the service used to interact with the Synapse API - /// - protected ISynapseApiClient ApiClient { get; } = apiClient; - - /// - /// Gets the websocket service client - /// - protected ResourceWatchEventHubClient ResourceEventHub { get; } = resourceEventHub; - - /// - /// Gets the service used to monitor resources of the specified type - /// - protected Api.Client.Services.ResourceWatch ResourceWatch { get; private set; } = null!; - - /// - /// Gets an that represents the store's subscription - /// - protected IDisposable ResourceWatchSubscription { get; private set; } = null!; - /// - public override async Task InitializeAsync() - { - await this.ResourceEventHub.StartAsync().ConfigureAwait(false); - this.ResourceWatch = await this.ResourceEventHub.WatchAsync().ConfigureAwait(false); - this.ResourceWatch.SubscribeAsync(OnResourceWatchEventAsync, onErrorAsync: ex => Task.Run(() => Console.WriteLine(ex))); - await base.InitializeAsync(); - } - - /// - /// Fetches the definition of the managed type - /// - /// A new awaitable - public virtual async Task GetResourceDefinitionAsync() + public override async Task GetResourceDefinitionAsync() { - this._definition = await this.ApiClient.ManageCluster().GetDefinitionAsync().ConfigureAwait(false); + this.ResourceDefinition = await this.ApiClient.ManageCluster().GetDefinitionAsync().ConfigureAwait(false); this.Reduce(s => s with { - Definition = this._definition + Definition = this.ResourceDefinition }); } - /// - /// Lists all the channels managed by Synapse - /// - /// A new awaitable - public virtual async Task ListResourcesAsync() + /// + public override async Task ListResourcesAsync() { this.Reduce(state => state with { Loading = true }); - this._resources = new EquatableList(await (await this.ApiClient.ManageCluster().ListAsync().ConfigureAwait(false)).ToListAsync().ConfigureAwait(false)); + this.ResourceList = new EquatableList(await (await this.ApiClient.ManageCluster().ListAsync().ConfigureAwait(false)).ToListAsync().ConfigureAwait(false)); this.Reduce(s => s with { - Resources = this._resources, + Resources = this.ResourceList, Loading = false }); } - /// - /// Deletes the specified - /// - /// The to delete - /// A new awaitable - public virtual async Task DeleteResourceAsync(TResource resource) + /// + public override async Task DeleteResourceAsync(TResource resource) { await this.ApiClient.ManageCluster().DeleteAsync(resource.GetName()).ConfigureAwait(false); - var match = this._resources?.ToList().FirstOrDefault(r => r.GetName() == resource.GetName() && r.GetNamespace() == resource.GetNamespace()); + var match = this.ResourceList?.ToList().FirstOrDefault(r => r.GetName() == resource.GetName() && r.GetNamespace() == resource.GetNamespace()); var resourceCollectionChanged = false; if (match != null) { - this._resources!.Remove(match); + this.ResourceList!.Remove(match); resourceCollectionChanged = true; } if (!resourceCollectionChanged) return; this.Reduce(s => s with { - Resources = this._resources + Resources = this.ResourceList }); } - /// - /// Handles the specified - /// - /// The to handle - /// A new awaitable - protected virtual Task OnResourceWatchEventAsync(IResourceWatchEvent e) - { - switch (e.Type) - { - case ResourceWatchEventType.Created: - this.Reduce(state => - { - var resources = state.Resources == null ? [] : new EquatableList(state.Resources); - resources.Add(e.Resource); - return state with - { - Resources = resources - }; - }); - break; - case ResourceWatchEventType.Updated: - this.Reduce(state => - { - if (state.Resources == null) - { - return state; - } - var resources = new EquatableList(state.Resources); - var resource = resources.FirstOrDefault(r => r.GetQualifiedName() == e.Resource.GetQualifiedName()); - if (resource == null) return state; - var index = resources.IndexOf(resource); - resources.Remove(resource); - resources.Insert(index, e.Resource); - return state with - { - Resources = resources - }; - }); - break; - case ResourceWatchEventType.Deleted: - this.Reduce(state => - { - if (state.Resources == null) - { - return state; - } - var resources = new EquatableList(state.Resources); - var resource = resources.FirstOrDefault(r => r.GetQualifiedName() == e.Resource.GetQualifiedName()); - if (resource == null) return state; - resources.Remove(resource); - return state with - { - Resources = resources - }; - }); - break; - default: - throw new NotSupportedException($"The specified {nameof(ResourceWatchEventType)} '{e.Type}' is not supported"); - } - return Task.CompletedTask; - } - - /// - protected override void Dispose(bool disposing) - { - if (!disposing) return; - this.ResourceWatchSubscription?.Dispose(); - base.Dispose(disposing); - } - - /// - protected override async ValueTask DisposeAsync(bool disposing) - { - if (!disposing) return; - await this.ResourceWatch.DisposeAsync().ConfigureAwait(false); - this.ResourceWatchSubscription.Dispose(); - base.Dispose(disposing); - } - -} \ No newline at end of file +} diff --git a/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/NamespacedResourceManagementComponent.cs b/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/NamespacedResourceManagementComponent.cs new file mode 100644 index 000000000..5dc8f821e --- /dev/null +++ b/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/NamespacedResourceManagementComponent.cs @@ -0,0 +1,27 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Synapse.Dashboard.Components; + +/// +/// Represents the base class for all components used to manage s +/// +/// The type of to manage +public abstract class NamespacedResourceManagementComponent + : ResourceManagementComponent, TResource> + where TResource : Resource, new() +{ + + + +} diff --git a/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/NamespacedResourceManagementComponentStore.cs b/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/NamespacedResourceManagementComponentStore.cs index 8fc9f5e3c..8564a04d2 100644 --- a/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/NamespacedResourceManagementComponentStore.cs +++ b/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/NamespacedResourceManagementComponentStore.cs @@ -26,187 +26,51 @@ namespace Synapse.Dashboard.Components.ResourceManagement; /// The service used to interact with the Synapse API /// The websocket service client public class NamespacedResourceManagementComponentStore(ISynapseApiClient apiClient, ResourceWatchEventHubClient resourceEventHub) - : ComponentStore>(new()) + : ResourceManagementComponentStoreBase(apiClient, resourceEventHub) where TResource : Resource, new() { - ResourceDefinition? _definition; - EquatableList? _resources; - - /// - /// Gets an used to observe s of the specified type - /// - public IObservable Definition => this.Select(s => s.Definition); - - /// - /// Gets an used to observe s of the specified type - /// - public IObservable?> Resources => this.Select(s => s.Resources); - - /// - /// Gets an used to observe changes - /// - public IObservable Loading => this.Select(state => state.Loading).DistinctUntilChanged(); - - /// - /// Gets the service used to interact with the Synapse API - /// - protected ISynapseApiClient ApiClient { get; } = apiClient; - - /// - /// Gets the websocket service client - /// - protected ResourceWatchEventHubClient ResourceEventHub { get; } = resourceEventHub; - - /// - /// Gets the service used to monitor resources of the specified type - /// - protected Api.Client.Services.ResourceWatch ResourceWatch { get; private set; } = null!; - - /// - /// Gets an that represents the store's subscription - /// - protected IDisposable ResourceWatchSubscription { get; private set; } = null!; - /// - public override async Task InitializeAsync() - { - await this.ResourceEventHub.StartAsync().ConfigureAwait(false); - this.ResourceWatch = await this.ResourceEventHub.WatchAsync().ConfigureAwait(false); - this.ResourceWatch.SubscribeAsync(OnResourceWatchEventAsync, onErrorAsync: ex => Task.Run(() => Console.WriteLine(ex))); - await base.InitializeAsync(); - } - - /// - /// Fetches the definition of the managed type - /// - /// A new awaitable - public virtual async Task GetResourceDefinitionAsync() + public override async Task GetResourceDefinitionAsync() { - this._definition = await this.ApiClient.ManageNamespaced().GetDefinitionAsync().ConfigureAwait(false); + this.ResourceDefinition = await this.ApiClient.ManageNamespaced().GetDefinitionAsync().ConfigureAwait(false); this.Reduce(s => s with { - Definition = this._definition + Definition = this.ResourceDefinition }); } - /// - /// Lists all the channels managed by Synapse - /// - /// A new awaitable - public virtual async Task ListResourcesAsync() + /// + public override async Task ListResourcesAsync() { this.Reduce(state => state with { Loading = true }); - this._resources = new EquatableList(await (await this.ApiClient.ManageNamespaced().ListAsync("all").ConfigureAwait(false)).ToListAsync().ConfigureAwait(false)); + this.ResourceList = new EquatableList(await (await this.ApiClient.ManageNamespaced().ListAsync(null!).ConfigureAwait(false)).ToListAsync().ConfigureAwait(false)); this.Reduce(s => s with { - Resources = this._resources, + Resources = this.ResourceList, Loading = false }); } - /// - /// Deletes the specified - /// - /// The to delete - /// A new awaitable - public virtual async Task DeleteResourceAsync(TResource resource) + /// + public override async Task DeleteResourceAsync(TResource resource) { await this.ApiClient.ManageNamespaced().DeleteAsync(resource.GetName(), resource.GetNamespace()!).ConfigureAwait(false); - var match = this._resources?.ToList().FirstOrDefault(r => r.GetName() == resource.GetName() && r.GetNamespace() == resource.GetNamespace()); + var match = this.ResourceList?.ToList().FirstOrDefault(r => r.GetName() == resource.GetName() && r.GetNamespace() == resource.GetNamespace()); var resourceCollectionChanged = false; if (match != null) { - this._resources!.Remove(match); + this.ResourceList!.Remove(match); resourceCollectionChanged = true; } if (!resourceCollectionChanged) return; this.Reduce(s => s with { - Resources = this._resources + Resources = this.ResourceList }); } - /// - /// Handles the specified - /// - /// The to handle - /// A new awaitable - protected virtual Task OnResourceWatchEventAsync(IResourceWatchEvent e) - { - switch (e.Type) - { - case ResourceWatchEventType.Created: - this.Reduce(state => - { - var resources = state.Resources == null ? [] : new EquatableList(state.Resources); - resources.Add(e.Resource); - return state with - { - Resources = resources - }; - }); - break; - case ResourceWatchEventType.Updated: - this.Reduce(state => - { - if (state.Resources == null) - { - return state; - } - var resources = new EquatableList(state.Resources); - var resource = resources.FirstOrDefault(r => r.GetQualifiedName() == e.Resource.GetQualifiedName()); - if (resource == null) return state; - var index = resources.IndexOf(resource); - resources.Remove(resource); - resources.Insert(index, e.Resource); - return state with - { - Resources = resources - }; - }); - break; - case ResourceWatchEventType.Deleted: - this.Reduce(state => - { - if (state.Resources == null) - { - return state; - } - var resources = new EquatableList(state.Resources); - var resource = resources.FirstOrDefault(r => r.GetQualifiedName() == e.Resource.GetQualifiedName()); - if (resource == null) return state; - resources.Remove(resource); - return state with - { - Resources = resources - }; - }); - break; - default: - throw new NotSupportedException($"The specified {nameof(ResourceWatchEventType)} '{e.Type}' is not supported"); - } - return Task.CompletedTask; - } - - /// - protected override void Dispose(bool disposing) - { - if (!disposing) return; - this.ResourceWatchSubscription?.Dispose(); - base.Dispose(disposing); - } - - /// - protected override async ValueTask DisposeAsync(bool disposing) - { - if (!disposing) return; - await this.ResourceWatch.DisposeAsync().ConfigureAwait(false); - this.ResourceWatchSubscription.Dispose(); - base.Dispose(disposing); - } - } diff --git a/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/ResourceManagementComponent.cs b/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/ResourceManagementComponent.cs index 6d5db4da8..2b80fb6e9 100644 --- a/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/ResourceManagementComponent.cs +++ b/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/ResourceManagementComponent.cs @@ -11,16 +11,21 @@ // See the License for the specific language governing permissions and // limitations under the License. +using Json.Schema; + namespace Synapse.Dashboard.Components; /// /// Represents the base class for all components used to manage s /// +/// The type of to use /// The type of to manage -public abstract class ResourceManagementComponent - : StatefulComponent, ResourceManagementComponentState> +public abstract class ResourceManagementComponent + : StatefulComponent> + where TStore : ResourceManagementComponentStoreBase where TResource : Resource, new() { + /// /// Gets/sets the service to build a bridge with the monaco interop extension /// @@ -75,7 +80,7 @@ protected override async Task OnInitializedAsync() this.Definition = definition; if (this.Definition != null && this.MonacoInterop != null) { - await this.MonacoInterop.AddValidationSchemaAsync(this.Serializer.SerializeToText(this.Definition.Spec.Versions.First().Schema.OpenAPIV3Schema), $"https://synapse.io/schemas/{typeof(TResource).Name.ToLower()}.json", $"{typeof(TResource).Name.ToLower()}").ConfigureAwait(false); + await this.MonacoInterop.AddValidationSchemaAsync(this.Serializer.SerializeToText(this.Definition.Spec.Versions.First().Schema.OpenAPIV3Schema ?? new JsonSchemaBuilder().Build()), $"https://synapse.io/schemas/{typeof(TResource).Name.ToLower()}.json", $"{typeof(TResource).Name.ToLower()}").ConfigureAwait(false); } } }, cancellationToken: this.CancellationTokenSource.Token); @@ -88,14 +93,14 @@ protected override async Task OnInitializedAsync() /// Patches the 's fields after a change /// /// The patch to apply - private void OnStateChanged(Action> patch) + private void OnStateChanged(Action> patch) { patch(this); this.StateHasChanged(); } /// - /// Updates the + /// Updates the /// /// protected void OnResourceCollectionChanged(EquatableList? resources) diff --git a/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/ResourceManagementComponentStoreBase.cs b/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/ResourceManagementComponentStoreBase.cs new file mode 100644 index 000000000..e57b7babb --- /dev/null +++ b/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/ResourceManagementComponentStoreBase.cs @@ -0,0 +1,184 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Synapse.Api.Client.Services; +using System.Reactive.Linq; + +namespace Synapse.Dashboard.Components.ResourceManagement; + +/// +/// Represents a used to manage Synapse s of the specified type +/// +/// The type of s to manage +/// +/// Initializes a new +/// +/// The service used to interact with the Synapse API +/// The websocket service client +public abstract class ResourceManagementComponentStoreBase(ISynapseApiClient apiClient, ResourceWatchEventHubClient resourceEventHub) + : ComponentStore>(new()) + where TResource : Resource, new() +{ + + /// + /// Gets an used to observe s of the specified type + /// + public IObservable Definition => this.Select(s => s.Definition); + + /// + /// Gets an used to observe s of the specified type + /// + public IObservable?> Resources => this.Select(s => s.Resources); + + /// + /// Gets an used to observe changes + /// + public IObservable Loading => this.Select(state => state.Loading).DistinctUntilChanged(); + + /// + /// Gets the service used to interact with the Synapse API + /// + protected ISynapseApiClient ApiClient { get; } = apiClient; + + /// + /// Gets the websocket service client + /// + protected ResourceWatchEventHubClient ResourceEventHub { get; } = resourceEventHub; + + /// + /// Gets the service used to monitor resources of the specified type + /// + protected Api.Client.Services.ResourceWatch ResourceWatch { get; private set; } = null!; + + /// + /// Gets an that represents the store's subscription + /// + protected IDisposable ResourceWatchSubscription { get; private set; } = null!; + + /// + /// Gets the definition of managed s + /// + protected ResourceDefinition? ResourceDefinition { get; set; } + + /// + /// Gets a list containing local copies of managed s + /// + protected EquatableList? ResourceList { get; set; } + + /// + public override async Task InitializeAsync() + { + await this.ResourceEventHub.StartAsync().ConfigureAwait(false); + this.ResourceWatch = await this.ResourceEventHub.WatchAsync().ConfigureAwait(false); + this.ResourceWatch.SubscribeAsync(OnResourceWatchEventAsync, onErrorAsync: ex => Task.Run(() => Console.WriteLine(ex))); + await base.InitializeAsync(); + } + + /// + /// Fetches the definition of the managed type + /// + /// A new awaitable + public abstract Task GetResourceDefinitionAsync(); + + /// + /// Lists all the channels managed by Synapse + /// + /// A new awaitable + public abstract Task ListResourcesAsync(); + + /// + /// Deletes the specified + /// + /// The to delete + /// A new awaitable + public abstract Task DeleteResourceAsync(TResource resource); + + /// + /// Handles the specified + /// + /// The to handle + /// A new awaitable + protected virtual Task OnResourceWatchEventAsync(IResourceWatchEvent e) + { + switch (e.Type) + { + case ResourceWatchEventType.Created: + this.Reduce(state => + { + var resources = state.Resources == null ? [] : new EquatableList(state.Resources); + resources.Add(e.Resource); + return state with + { + Resources = resources + }; + }); + break; + case ResourceWatchEventType.Updated: + this.Reduce(state => + { + if (state.Resources == null) + { + return state; + } + var resources = new EquatableList(state.Resources); + var resource = resources.FirstOrDefault(r => r.GetQualifiedName() == e.Resource.GetQualifiedName()); + if (resource == null) return state; + var index = resources.IndexOf(resource); + resources.Remove(resource); + resources.Insert(index, e.Resource); + return state with + { + Resources = resources + }; + }); + break; + case ResourceWatchEventType.Deleted: + this.Reduce(state => + { + if (state.Resources == null) + { + return state; + } + var resources = new EquatableList(state.Resources); + var resource = resources.FirstOrDefault(r => r.GetQualifiedName() == e.Resource.GetQualifiedName()); + if (resource == null) return state; + resources.Remove(resource); + return state with + { + Resources = resources + }; + }); + break; + default: + throw new NotSupportedException($"The specified {nameof(ResourceWatchEventType)} '{e.Type}' is not supported"); + } + return Task.CompletedTask; + } + + /// + protected override void Dispose(bool disposing) + { + if (!disposing) return; + this.ResourceWatchSubscription?.Dispose(); + base.Dispose(disposing); + } + + /// + protected override async ValueTask DisposeAsync(bool disposing) + { + if (!disposing) return; + await this.ResourceWatch.DisposeAsync().ConfigureAwait(false); + this.ResourceWatchSubscription.Dispose(); + base.Dispose(disposing); + } +} \ No newline at end of file diff --git a/src/dashboard/Synapse.Dashboard/Pages/About/View.razor b/src/dashboard/Synapse.Dashboard/Pages/About/View.razor new file mode 100644 index 000000000..dd62240df --- /dev/null +++ b/src/dashboard/Synapse.Dashboard/Pages/About/View.razor @@ -0,0 +1,52 @@ +@namespace Synapse.Dashboard.Components +@page "/about" + +Synapse - About + + +
+ +

Synapse

+
+
+ +
+ + + + + + + + + + + + + + + + + + + +
Version + @typeof(MainLayout).Assembly.GetName().Version!.ToString(3) +
LicenseApache 2.0
CopyrightCopyright © 2024-Present The Synapse Authors. All Rights Reserved.
Repositoryhttps://github.com/serverlessworkflow/synapse/
+
+ +
+
+
+ GitHub logo +

+ A question, an idea, want to contribute? +
+ Join in the fun on GitHub! + New contributors are welcome! +

+
+
+
\ No newline at end of file diff --git a/src/dashboard/Synapse.Dashboard/Pages/Namespaces/List/View.razor b/src/dashboard/Synapse.Dashboard/Pages/Namespaces/List/View.razor new file mode 100644 index 000000000..7abcb9472 --- /dev/null +++ b/src/dashboard/Synapse.Dashboard/Pages/Namespaces/List/View.razor @@ -0,0 +1,48 @@ +@namespace Synapse.Dashboard.Pages.Namespaces.List +@page "/namespaces" +@inherits ClusterResourceManagementComponent + +Namespaces + +
+ @if (Loading) + { +
+
+ Loading... +
+
+ } + + + + + + + + + + @if (Resources != null && Resources.Any()) + { + + + + + + + + } + +
NameCreated At
@resource.Metadata.Name@resource.Metadata.CreationTimestamp.ToString() + + +
+
+ + + + + + + + \ No newline at end of file diff --git a/src/dashboard/Synapse.Dashboard/Pages/Operators/List/View.razor b/src/dashboard/Synapse.Dashboard/Pages/Operators/List/View.razor new file mode 100644 index 000000000..17e942fbd --- /dev/null +++ b/src/dashboard/Synapse.Dashboard/Pages/Operators/List/View.razor @@ -0,0 +1,66 @@ +@namespace Synapse.Dashboard.Pages.Operators.List +@page "/operators" +@inherits NamespacedResourceManagementComponent + +Operators + +
+ @if (Loading) + { +
+
+ Loading... +
+
+ } + + + + + + + + + + + + @if (Resources != null && Resources.Any()) + { + + + + + + + + + + } + +
NamespaceNameCreated AtStatus
@resource.Metadata.Namespace@resource.Metadata.Name@resource.Metadata.CreationTimestamp.ToString()@resource.Status?.Phase + + +
+
+ + + + + + + + + +@{ + + string GetStatusClass(Operator @operator) + { + return @operator.Status?.Phase switch + { + OperatorStatusPhase.Running => "bg-primary", + OperatorStatusPhase.Stopped => "bg-danger", + _ => "bg-secondary" + }; + } + +} \ No newline at end of file diff --git a/src/dashboard/Synapse.Dashboard/Pages/WorkflowInstances/List/View.razor b/src/dashboard/Synapse.Dashboard/Pages/WorkflowInstances/List/View.razor new file mode 100644 index 000000000..bbadef672 --- /dev/null +++ b/src/dashboard/Synapse.Dashboard/Pages/WorkflowInstances/List/View.razor @@ -0,0 +1,80 @@ +@namespace Synapse.Dashboard.Pages.WorkflowInstances.List +@page "/workflow-instances" +@inherits NamespacedResourceManagementComponent + +Workflow Instances + +
+ @if (Loading) + { +
+
+ Loading... +
+
+ } + + + + + + + + + + + + + @if (Resources != null && Resources.Any()) + { + + + + + + + + + + + } + +
NamespaceNameCreated AtOperatorStatus
@resource.Metadata.Namespace@resource.Metadata.Name@resource.Metadata.CreationTimestamp.ToString() + @if (resource.Metadata.Labels?.TryGetValue(SynapseDefaults.Resources.Labels.Operator, out var operatorName) == true && !string.IsNullOrWhiteSpace(operatorName)) + { + @operatorName + } + else + { + - + } + @resource.Status?.Phase + + +
+
+ + + + + + + + + +@{ + + string GetStatusClass(WorkflowInstance instance) + { + return instance.Status?.Phase switch + { + WorkflowInstanceStatusPhase.Pending => "bg-secondary", + WorkflowInstanceStatusPhase.Running => "bg-primary", + WorkflowInstanceStatusPhase.Faulted => "bg-danger", + WorkflowInstanceStatusPhase.Cancelled => "bg-warning", + WorkflowInstanceStatusPhase.Completed => "bg-success", + _ => "bg-secondary" + }; + } + +} \ No newline at end of file diff --git a/src/dashboard/Synapse.Dashboard/Pages/Workflows/List/View.razor b/src/dashboard/Synapse.Dashboard/Pages/Workflows/List/View.razor index 2dea66b04..483da1dc4 100644 --- a/src/dashboard/Synapse.Dashboard/Pages/Workflows/List/View.razor +++ b/src/dashboard/Synapse.Dashboard/Pages/Workflows/List/View.razor @@ -1,6 +1,6 @@ @namespace Synapse.Dashboard.Pages.Workflows.List @page "/" -@inherits ResourceManagementComponent +@inherits NamespacedResourceManagementComponent Workflows @@ -30,7 +30,7 @@ @resource.Metadata.Namespace - @resource.Metadata.Name + @resource.Metadata.Name @resource.Metadata.CreationTimestamp.ToString() @resource.Spec.Versions.Count @resource.Spec.Versions.Last().Document.Version diff --git a/src/dashboard/Synapse.Dashboard/wwwroot/img/github-logo.png b/src/dashboard/Synapse.Dashboard/wwwroot/img/github-logo.png new file mode 100644 index 000000000..84ed9088e Binary files /dev/null and b/src/dashboard/Synapse.Dashboard/wwwroot/img/github-logo.png differ diff --git a/src/operator/Synapse.Operator.Application/Synapse.Operator.Application.csproj b/src/operator/Synapse.Operator.Application/Synapse.Operator.Application.csproj deleted file mode 100644 index 844fc1c71..000000000 --- a/src/operator/Synapse.Operator.Application/Synapse.Operator.Application.csproj +++ /dev/null @@ -1,16 +0,0 @@ - - - - net8.0 - enable - enable - en - True - - - - - - - - diff --git a/src/operator/Synapse.Operator.Application/Usings.cs b/src/operator/Synapse.Operator.Application/Usings.cs deleted file mode 100644 index d22c2dcaa..000000000 --- a/src/operator/Synapse.Operator.Application/Usings.cs +++ /dev/null @@ -1,12 +0,0 @@ -global using Microsoft.Extensions.DependencyInjection; -global using Microsoft.Extensions.Logging; -global using Microsoft.Extensions.Options; -global using Neuroglia; -global using Neuroglia.Data; -global using Neuroglia.Data.Infrastructure.ResourceOriented; -global using Neuroglia.Data.Infrastructure.ResourceOriented.Configuration; -global using Neuroglia.Data.Infrastructure.ResourceOriented.Services; -global using ServerlessWorkflow.Sdk.Models; -global using Synapse.Resources; -global using Synapse.Runtime.Services; -global using System.Collections.Concurrent; diff --git a/src/operator/Synapse.Operator/Configuration/OperatorOptions.cs b/src/operator/Synapse.Operator/Configuration/OperatorOptions.cs new file mode 100644 index 000000000..c7d689e14 --- /dev/null +++ b/src/operator/Synapse.Operator/Configuration/OperatorOptions.cs @@ -0,0 +1,33 @@ +namespace Synapse.Operator.Configuration; + +/// +/// Represents the options used to configure a Synapse Operator application +/// +public class OperatorOptions +{ + + /// + /// Initializes a new + /// + public OperatorOptions() + { + Namespace = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Operator.Namespace)!; + Name = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Operator.Name)!; + } + + /// + /// Gets/sets the operator's namespace + /// + public virtual string Namespace { get; set; } + + /// + /// Gets/sets the operator's name + /// + public virtual string Name { get; set; } + + /// + /// Gets/sets the options used to configure the runners spawned by a Synapse Operator + /// + public virtual RunnerDefinition Runner { get; set; } = new(); + +} diff --git a/src/operator/Synapse.Operator/Dockerfile b/src/operator/Synapse.Operator/Dockerfile new file mode 100644 index 000000000..373f400ef --- /dev/null +++ b/src/operator/Synapse.Operator/Dockerfile @@ -0,0 +1,27 @@ +FROM mcr.microsoft.com/dotnet/runtime:8.0 AS base +USER root +RUN apt-get update +RUN apt-get install -y jq +USER app +WORKDIR /app + +FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build +ARG BUILD_CONFIGURATION=Release +WORKDIR /src +COPY ["src/operator/Synapse.Operator/Synapse.Operator.csproj", "src/operator/Synapse.Operator/"] +COPY ["src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj", "src/core/Synapse.Core.Infrastructure/"] +COPY ["src/core/Synapse.Core/Synapse.Core.csproj", "src/core/Synapse.Core/"] +COPY ["src/runtime/Synapse.Runtime.Abstractions/Synapse.Runtime.Abstractions.csproj", "src/runtime/Synapse.Runtime.Abstractions/"] +RUN dotnet restore "./src/operator/Synapse.Operator/Synapse.Operator.csproj" +COPY . . +WORKDIR "/src/src/operator/Synapse.Operator" +RUN dotnet build "./Synapse.Operator.csproj" -c $BUILD_CONFIGURATION -o /app/build + +FROM build AS publish +ARG BUILD_CONFIGURATION=Release +RUN dotnet publish "./Synapse.Operator.csproj" -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false + +FROM base AS final +WORKDIR /app +COPY --from=publish /app/publish . +ENTRYPOINT ["dotnet", "Synapse.Operator.dll"] \ No newline at end of file diff --git a/src/operator/Synapse.Operator/Program.cs b/src/operator/Synapse.Operator/Program.cs new file mode 100644 index 000000000..c03bd3c99 --- /dev/null +++ b/src/operator/Synapse.Operator/Program.cs @@ -0,0 +1,57 @@ +if (args.Length != 0 && args.Contains("--debug") && !Debugger.IsAttached) Debugger.Launch(); + +var builder = Host.CreateDefaultBuilder() + .ConfigureAppConfiguration((context, config) => + { + config.AddJsonFile("appsettings.json", true, true); + config.AddJsonFile($"appsettings.{context.HostingEnvironment.EnvironmentName}.json", true, true); + config.AddEnvironmentVariables("SYNAPSE_OPERATOR"); + config.AddCommandLine(args); + config.AddKeyPerFile("/run/secrets/synapse", true, true); + }) + .ConfigureServices((context, services) => + { + services.Configure(context.Configuration); + services.AddLogging(builder => + { + builder.AddSimpleConsole(options => + { + options.TimestampFormat = "[HH:mm:ss] "; + }); + }); + services.AddSingleton(); + services.AddSynapse(context.Configuration); + services.AddSingleton(provider => + { + var configuration = new DockerClientConfiguration(); + return configuration.CreateClient(); + }); + services.AddSingleton(); + services.AddScoped(); + services.AddScoped(); + services.AddScoped(provider => + { + var options = provider.GetRequiredService>().CurrentValue; + return options.Runner.Runtime.Mode switch + { + OperatorRuntimeMode.Native => provider.GetRequiredService(), + OperatorRuntimeMode.Containerized => provider.GetRequiredService(), + _ => throw new NotSupportedException($"The specified operator runtime mode '{options.Runner.Runtime.Mode}' is not supported") + }; + }); + + services.AddScoped(); + services.AddScoped(provider => provider.GetRequiredService()); + + services.AddScoped(); + services.AddScoped>(provider => provider.GetRequiredService()); + + services.AddScoped(); + services.AddScoped>(provider => provider.GetRequiredService()); + + services.AddHostedService(); + }); + +using var app = builder.Build(); + +await app.RunAsync(); diff --git a/src/operator/Synapse.Operator/Properties/launchSettings.json b/src/operator/Synapse.Operator/Properties/launchSettings.json new file mode 100644 index 000000000..589c3fd7f --- /dev/null +++ b/src/operator/Synapse.Operator/Properties/launchSettings.json @@ -0,0 +1,13 @@ +{ + "profiles": { + "Synapse.Operator": { + "commandName": "Project", + "environmentVariables": { + "DOTNET_ENVIRONMENT": "Development" + } + }, + "Container (Dockerfile)": { + "commandName": "Docker" + } + } +} \ No newline at end of file diff --git a/src/operator/Synapse.Operator/Services/Application.cs b/src/operator/Synapse.Operator/Services/Application.cs new file mode 100644 index 000000000..71b958215 --- /dev/null +++ b/src/operator/Synapse.Operator/Services/Application.cs @@ -0,0 +1,39 @@ +namespace Synapse.Operator.Services; + +internal class Application(IServiceProvider serviceProvider) + : IHostedService, IDisposable +{ + + readonly IServiceScope _scope = serviceProvider.CreateScope(); + IServiceProvider ServiceProvider => this._scope.ServiceProvider; + + OperatorController _operatorController = null!; + WorkflowController _workflowController = null!; + WorkflowInstanceController _workflowInstanceController = null!; + + public async Task StartAsync(CancellationToken cancellationToken) + { + this._operatorController = this.ServiceProvider.GetRequiredService(); + this._workflowController = this.ServiceProvider.GetRequiredService(); + this._workflowInstanceController = this.ServiceProvider.GetRequiredService(); + await this._operatorController.StartAsync(cancellationToken).ConfigureAwait(false); + await Task.WhenAll( + [ + this._workflowController.StartAsync(cancellationToken), + this._workflowInstanceController.StartAsync(cancellationToken) + ]).ConfigureAwait(false); + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + await Task.WhenAll( + [ + this._operatorController.StopAsync(cancellationToken), + this._workflowController.StopAsync(cancellationToken), + this._workflowInstanceController.StopAsync(cancellationToken) + ]).ConfigureAwait(false); + } + + void IDisposable.Dispose() => this._scope.Dispose(); + +} \ No newline at end of file diff --git a/src/operator/Synapse.Operator/Services/ApplicationUserAccessor.cs b/src/operator/Synapse.Operator/Services/ApplicationUserAccessor.cs new file mode 100644 index 000000000..c7c23c551 --- /dev/null +++ b/src/operator/Synapse.Operator/Services/ApplicationUserAccessor.cs @@ -0,0 +1,16 @@ +using Neuroglia.Security.Services; +using System.Security.Claims; + +namespace Synapse.Operator.Services; + +/// +/// Represents an implementation used to access the user that represents the executing application +/// +public class ApplicationUserAccessor + : IUserAccessor +{ + + /// + public ClaimsPrincipal? User => new(new ClaimsIdentity()); + +} diff --git a/src/operator/Synapse.Operator/Services/Interfaces/IOperatorController.cs b/src/operator/Synapse.Operator/Services/Interfaces/IOperatorController.cs new file mode 100644 index 000000000..0f23ed39a --- /dev/null +++ b/src/operator/Synapse.Operator/Services/Interfaces/IOperatorController.cs @@ -0,0 +1,15 @@ +namespace Synapse.Operator.Services; + +/// +/// Defines the fundamentals of the service used to access the current Synapse Operator +/// +public interface IOperatorController + : IHostedService +{ + + /// + /// Gets the service used to monitor the current + /// + IResourceMonitor Operator { get; } + +} \ No newline at end of file diff --git a/src/operator/Synapse.Operator.Application/Services/Interfaces/IWorkflowScheduler.cs b/src/operator/Synapse.Operator/Services/Interfaces/IWorkflowScheduler.cs similarity index 89% rename from src/operator/Synapse.Operator.Application/Services/Interfaces/IWorkflowScheduler.cs rename to src/operator/Synapse.Operator/Services/Interfaces/IWorkflowScheduler.cs index b798342cb..c4426bbb7 100644 --- a/src/operator/Synapse.Operator.Application/Services/Interfaces/IWorkflowScheduler.cs +++ b/src/operator/Synapse.Operator/Services/Interfaces/IWorkflowScheduler.cs @@ -1,4 +1,4 @@ -namespace Synapse.Operator.Application.Services; +namespace Synapse.Operator.Services; /// /// Defines the fundamentals of a service used to manage the scheduling or a workflow process @@ -13,4 +13,4 @@ public interface IWorkflowScheduler /// A new awaitable Task ScheduleAsync(CancellationToken cancellationToken = default); -} \ No newline at end of file +} diff --git a/src/operator/Synapse.Operator/Services/OperatorController.cs b/src/operator/Synapse.Operator/Services/OperatorController.cs new file mode 100644 index 000000000..01ed7913e --- /dev/null +++ b/src/operator/Synapse.Operator/Services/OperatorController.cs @@ -0,0 +1,95 @@ +using System.Net; +using System.Reactive.Linq; +using static Synapse.SynapseDefaults.EnvironmentVariables; + +namespace Synapse.Operator.Services; + +/// +/// Represents the default implementation of the interface +/// +/// The service used to manage s +/// The current +/// The current +public class OperatorController(IResourceRepository repository, IOptionsMonitor options, IOptionsMonitor runnerOptions) + : IOperatorController +{ + + /// + /// Gets the service used to manage s + /// + protected IResourceRepository Repository { get; } = repository; + + /// + /// Gets the current + /// + protected OperatorOptions Options => options.CurrentValue; + + /// + /// Gets the current + /// + protected RunnerDefinition RunnerOptions => runnerOptions.CurrentValue; + + /// + public IResourceMonitor Operator { get; protected set; } = null!; + + /// + public virtual async Task StartAsync(CancellationToken cancellationToken) + { + Resources.Operator? @operator = null; + try + { + @operator = await this.Repository.GetAsync(this.Options.Name, this.Options.Namespace, cancellationToken).ConfigureAwait(false); + } + catch (ProblemDetailsException ex) when (ex.Problem.Status == (int)HttpStatusCode.NotFound) { } + finally + { + if (@operator == null) + { + @operator = new Resources.Operator(new ResourceMetadata(this.Options.Name, this.Options.Namespace), new OperatorSpec() + { + Runner = this.Options.Runner + }); + await this.Repository.AddAsync(@operator, false, cancellationToken).ConfigureAwait(false); + } + this.Operator = await this.Repository.MonitorAsync(this.Options.Name, this.Options.Namespace, false, cancellationToken).ConfigureAwait(false); + await this.SetOperatorStatusPhaseAsync(OperatorStatusPhase.Running, cancellationToken).ConfigureAwait(false); + this.Operator.Where(e => e.Type == ResourceWatchEventType.Updated).Select(o => o.Resource.Spec).DistinctUntilChanged().Subscribe(_ => this.OnOperatorSpecChanged(), token: cancellationToken); + this.OnOperatorSpecChanged(); + } + } + + /// + /// Sets the 's status phase + /// + /// The 's status phase + /// A + /// A new awaitable + protected virtual async Task SetOperatorStatusPhaseAsync(string phase, CancellationToken cancellationToken = default) + { + if (this.Operator.Resource.Status?.Phase == phase) return; + var updatedResource = this.Operator.Resource.Clone()!; + var originalResource = this.Operator.Resource.Clone()!; + updatedResource.Status ??= new(); + updatedResource.Status.Phase = phase; + var patch = JsonPatchUtility.CreateJsonPatchFromDiff(originalResource, updatedResource); + await this.Repository.PatchStatusAsync(new(PatchType.JsonPatch, patch), updatedResource.GetName(), updatedResource.GetNamespace(), false, cancellationToken).ConfigureAwait(false); + } + + /// + /// Handles changes to the operator spec, and update the operator options accordingly + /// + protected virtual void OnOperatorSpecChanged() + { + this.Options.Runner = this.Operator.Resource.Spec.Runner; + this.RunnerOptions.Api = this.Options.Runner.Api; + this.RunnerOptions.Runtime = this.Options.Runner.Runtime; + this.RunnerOptions.Certificates = this.Options.Runner.Certificates; + } + + /// + public async Task StopAsync(CancellationToken cancellationToken) + { + await this.SetOperatorStatusPhaseAsync(OperatorStatusPhase.Stopped, cancellationToken).ConfigureAwait(false); + } + +} \ No newline at end of file diff --git a/src/operator/Synapse.Operator.Application/Services/WorkflowController.cs b/src/operator/Synapse.Operator/Services/WorkflowController.cs similarity index 78% rename from src/operator/Synapse.Operator.Application/Services/WorkflowController.cs rename to src/operator/Synapse.Operator/Services/WorkflowController.cs index 255b5f6cd..1a38fae31 100644 --- a/src/operator/Synapse.Operator.Application/Services/WorkflowController.cs +++ b/src/operator/Synapse.Operator/Services/WorkflowController.cs @@ -1,4 +1,7 @@ -namespace Synapse.Operator.Application.Services; +using Neuroglia.Reactive; +using System.Reactive.Linq; + +namespace Synapse.Operator.Services; /// /// Represents the service used to manage resources @@ -7,8 +10,9 @@ /// The service used to create s /// The service used to access the current /// The service used to manage s -/// The service used to monitor the current -public class WorkflowController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions> controllerOptions, IResourceRepository resources, IResourceMonitor @operator) +/// The current +/// The service used to access the current +public class WorkflowController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions> controllerOptions, IResourceRepository resources, IOptions operatorOptions, IOperatorController operatorAccessor) : ResourceController(loggerFactory, controllerOptions, resources) { @@ -17,16 +21,37 @@ public class WorkflowController(IServiceProvider serviceProvider, ILoggerFactory /// protected IServiceProvider ServiceProvider { get; } = serviceProvider; + /// + /// Gets the running 's options + /// + protected OperatorOptions OperatorOptions { get; } = operatorOptions.Value; + /// /// Gets the service used to monitor the current /// - protected IResourceMonitor Operator { get; } = @operator; + protected IResourceMonitor Operator => operatorAccessor.Operator; /// /// Gets a that contains current s /// protected ConcurrentDictionary Schedulers { get; } = []; + /// + public override async Task StartAsync(CancellationToken cancellationToken) + { + await base.StartAsync(cancellationToken).ConfigureAwait(false); + foreach (var workflowInstance in this.Resources.Values.ToList()) await this.OnResourceCreatedAsync(workflowInstance, cancellationToken).ConfigureAwait(false); + this.Operator!.Select(b => b.Resource.Spec.Selector).SubscribeAsync(this.OnResourceSelectorChangedAsync, cancellationToken: cancellationToken); + await this.OnResourceSelectorChangedAsync(this.Operator!.Resource.Spec.Selector).ConfigureAwait(false); + } + + /// + protected override Task ReconcileAsync(CancellationToken cancellationToken = default) + { + this.Options.LabelSelectors = this.Operator?.Resource.Spec.Selector?.Select(s => new LabelSelector(s.Key, LabelSelectionOperator.Equals, s.Value)).ToList(); + return base.ReconcileAsync(cancellationToken); + } + /// /// Creates a new for the specified workflow /// @@ -52,6 +77,7 @@ protected virtual async Task CreateSchedulerAsync(Workflow re protected virtual async Task TryClaimAsync(Workflow resource, CancellationToken cancellationToken) { ArgumentNullException.ThrowIfNull(resource); + if (this.Operator == null) throw new Exception("The controller must be started before attempting any operation"); if (resource.Metadata.Labels != null && resource.Metadata.Labels.TryGetValue(SynapseDefaults.Resources.Labels.Operator, out var operatorQualifiedName) && operatorQualifiedName == this.Operator.Resource.GetQualifiedName()) return true; try { @@ -107,9 +133,9 @@ public override async Task StopAsync(CancellationToken cancellationToken) /// protected override async Task OnResourceCreatedAsync(Workflow resource, CancellationToken cancellationToken = default) { - if (!await this.TryClaimAsync(resource, cancellationToken).ConfigureAwait(false)) return; var definition = resource.Spec.Versions.GetLatest(); if (definition.Schedule == null) return; + if (!await this.TryClaimAsync(resource, cancellationToken).ConfigureAwait(false)) return; var scheduler = await this.CreateSchedulerAsync(resource, definition, cancellationToken).ConfigureAwait(false); await scheduler.ScheduleAsync(cancellationToken).ConfigureAwait(false); } @@ -136,4 +162,11 @@ protected override async Task OnResourceDeletedAsync(Workflow resource, Cancella } } + /// + /// Handles changes to the current operator's subscription selector + /// + /// A key/value mapping of the labels both workflows and workflow instances to select must define + /// A new awaitable + protected virtual Task OnResourceSelectorChangedAsync(IDictionary? selector) => this.ReconcileAsync(this.CancellationTokenSource.Token); + } diff --git a/src/operator/Synapse.Operator.Application/Services/WorkflowInstanceController.cs b/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs similarity index 80% rename from src/operator/Synapse.Operator.Application/Services/WorkflowInstanceController.cs rename to src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs index 36d19825e..8919def65 100644 --- a/src/operator/Synapse.Operator.Application/Services/WorkflowInstanceController.cs +++ b/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs @@ -1,4 +1,7 @@ -namespace Synapse.Operator.Application.Services; +using Neuroglia.Reactive; +using System.Reactive.Linq; + +namespace Synapse.Operator.Services; /// /// Represents the service used to manage resources @@ -7,9 +10,8 @@ /// The service used to create s /// The service used to access the current /// The service used to manage s -/// The service used to monitor the current -/// The service used to create and run es -public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions> controllerOptions, IResourceRepository repository, IResourceMonitor @operator, IWorkflowRuntime runtime) +/// The service used to access the current +public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions> controllerOptions, IResourceRepository repository, IOperatorController operatorAccessor) : ResourceController(loggerFactory, controllerOptions, repository) { @@ -21,18 +23,37 @@ public class WorkflowInstanceController(IServiceProvider serviceProvider, ILogge /// /// Gets the service used to monitor the current /// - protected IResourceMonitor Operator { get; } = @operator; + protected IResourceMonitor Operator => operatorAccessor.Operator; /// /// Gets the service used to create and run es /// - protected IWorkflowRuntime Runtime { get; } = runtime; + protected IWorkflowRuntime Runtime => this.ServiceProvider.GetRequiredService(); /// /// Gets a that contains current es /// protected ConcurrentDictionary Processes { get; } = []; + /// + public override async Task StartAsync(CancellationToken cancellationToken) + { + await base.StartAsync(cancellationToken).ConfigureAwait(false); + foreach (var workflowInstance in this.Resources.Values.ToList()) + { + await this.OnResourceCreatedAsync(workflowInstance, cancellationToken).ConfigureAwait(false); + } + this.Operator!.Select(b => b.Resource.Spec.Selector).SubscribeAsync(this.OnResourceSelectorChangedAsync, cancellationToken: cancellationToken); + await this.OnResourceSelectorChangedAsync(this.Operator!.Resource.Spec.Selector).ConfigureAwait(false); + } + + /// + protected override Task ReconcileAsync(CancellationToken cancellationToken = default) + { + this.Options.LabelSelectors = this.Operator?.Resource.Spec.Selector?.Select(s => new LabelSelector(s.Key, LabelSelectionOperator.Equals, s.Value)).ToList(); + return base.ReconcileAsync(cancellationToken); + } + /// /// Creates a new for the specified workflow /// @@ -131,6 +152,13 @@ protected override async Task OnResourceDeletedAsync(WorkflowInstance workflowIn if (this.Processes.TryRemove(workflowInstance.GetQualifiedName(), out var process)) await process.DisposeAsync().ConfigureAwait(false); } + /// + /// Handles changes to the current operator's subscription selector + /// + /// A key/value mapping of the labels both workflows and workflow instances to select must define + /// A new awaitable + protected virtual Task OnResourceSelectorChangedAsync(IDictionary? selector) => this.ReconcileAsync(this.CancellationTokenSource.Token); + /// protected override async ValueTask DisposeAsync(bool disposing) { diff --git a/src/operator/Synapse.Operator.Application/Services/WorkflowScheduler.cs b/src/operator/Synapse.Operator/Services/WorkflowScheduler.cs similarity index 98% rename from src/operator/Synapse.Operator.Application/Services/WorkflowScheduler.cs rename to src/operator/Synapse.Operator/Services/WorkflowScheduler.cs index 99c998d60..af5c21f0e 100644 --- a/src/operator/Synapse.Operator.Application/Services/WorkflowScheduler.cs +++ b/src/operator/Synapse.Operator/Services/WorkflowScheduler.cs @@ -1,4 +1,4 @@ -namespace Synapse.Operator.Application.Services; +namespace Synapse.Operator.Services; /// /// Represents the service used to manage the scheduling and execution of a specific diff --git a/src/operator/Synapse.Operator/Synapse.Operator.csproj b/src/operator/Synapse.Operator/Synapse.Operator.csproj new file mode 100644 index 000000000..3ed212659 --- /dev/null +++ b/src/operator/Synapse.Operator/Synapse.Operator.csproj @@ -0,0 +1,47 @@ + + + + net8.0 + enable + enable + en + True + Exe + ghcr.io/serverlessworkflow/synapse/operator + Linux + ..\..\.. + + + + + + + + + + PreserveNewest + true + PreserveNewest + + + PreserveNewest + true + PreserveNewest + + + + + + + + + + + + + + + + + + diff --git a/src/operator/Synapse.Operator/Usings.cs b/src/operator/Synapse.Operator/Usings.cs new file mode 100644 index 000000000..857075e06 --- /dev/null +++ b/src/operator/Synapse.Operator/Usings.cs @@ -0,0 +1,24 @@ +global using Docker.DotNet; +global using Microsoft.Extensions.Configuration; +global using Microsoft.Extensions.DependencyInjection; +global using Microsoft.Extensions.DependencyInjection.Extensions; +global using Microsoft.Extensions.Hosting; +global using Microsoft.Extensions.Logging; +global using Microsoft.Extensions.Options; +global using Neuroglia; +global using Neuroglia.Data; +global using Neuroglia.Data.Infrastructure.ResourceOriented; +global using Neuroglia.Data.Infrastructure.ResourceOriented.Configuration; +global using Neuroglia.Data.Infrastructure.ResourceOriented.Services; +global using Neuroglia.Security.Services; +global using ServerlessWorkflow.Sdk.Models; +global using Synapse; +global using Synapse.Core.Infrastructure.Containers; +global using Synapse.Core.Infrastructure.Services; +global using Synapse.Operator; +global using Synapse.Operator.Configuration; +global using Synapse.Operator.Services; +global using Synapse.Resources; +global using Synapse.Runtime.Services; +global using System.Collections.Concurrent; +global using System.Diagnostics; diff --git a/src/operator/Synapse.Operator/appsettings.Development.json b/src/operator/Synapse.Operator/appsettings.Development.json new file mode 100644 index 000000000..4f95f7ede --- /dev/null +++ b/src/operator/Synapse.Operator/appsettings.Development.json @@ -0,0 +1,18 @@ +{ + "ConnectionStrings": { + "redis": "localhost:6379" + }, + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + }, + "Namespace": "default", + "Name": "operator-1", + "Runner": { + "Api": { + "Uri": "http://localhost:5257" + } + } +} diff --git a/src/operator/Synapse.Operator/appsettings.json b/src/operator/Synapse.Operator/appsettings.json new file mode 100644 index 000000000..10f68b8c8 --- /dev/null +++ b/src/operator/Synapse.Operator/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + }, + "AllowedHosts": "*" +} diff --git a/src/runner/Synapse.Runner/Configuration/RunnerOptions.cs b/src/runner/Synapse.Runner/Configuration/RunnerOptions.cs index d2ad909cc..bc480a75c 100644 --- a/src/runner/Synapse.Runner/Configuration/RunnerOptions.cs +++ b/src/runner/Synapse.Runner/Configuration/RunnerOptions.cs @@ -1,4 +1,6 @@ -namespace Synapse.Runner.Configuration; +using Synapse.Api.Client.Http.Configuration; + +namespace Synapse.Runner.Configuration; /// /// Represents the options used to configure a Synapse Runner application @@ -6,9 +8,14 @@ public class RunnerOptions { + /// + /// Gets/sets the options used to configure the Synapse API the runner must use + /// + public virtual SynapseHttpApiClientOptions Api { get; set; } = new(); + /// /// Gets/sets the options used to configure the workflow the Synapse Runner must run and how /// - public required virtual WorkflowOptions Workflow { get; set; } + public virtual WorkflowOptions Workflow { get; set; } = new(); } diff --git a/src/runner/Synapse.Runner/Program.cs b/src/runner/Synapse.Runner/Program.cs index ffdb15160..a1b7cb9b3 100644 --- a/src/runner/Synapse.Runner/Program.cs +++ b/src/runner/Synapse.Runner/Program.cs @@ -1,22 +1,19 @@ -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Synapse.Api.Client; -using Synapse.Runner.Services; -using System.Diagnostics; - -if (args.Length != 0 && args.Contains("--debug") && !Debugger.IsAttached) Debugger.Launch(); +if (args.Length != 0 && args.Contains("--debug") && !Debugger.IsAttached) Debugger.Launch(); var builder = Host.CreateDefaultBuilder() - .ConfigureAppConfiguration(config => - { - config.AddJsonFile("appsettings.json", true, true); - config.AddEnvironmentVariables("SYNAPSE"); - config.AddCommandLine(args); - config.AddKeyPerFile("/run/secrets/synapse", true, true); - }) + .ConfigureAppConfiguration((context, config) => + { + config.AddJsonFile("appsettings.json", true, true); + config.AddJsonFile($"appsettings.{context.HostingEnvironment.EnvironmentName}.json", true, true); + config.AddEnvironmentVariables("SYNAPSE"); + config.AddCommandLine(args); + config.AddKeyPerFile("/run/secrets/synapse", true, true); + }) .ConfigureServices((context, services) => { + var options = new RunnerOptions(); + context.Configuration.Bind(options); + services.Configure(context.Configuration); services.AddLogging(builder => { builder.AddSimpleConsole(options => @@ -24,7 +21,12 @@ options.TimestampFormat = "[HH:mm:ss] "; }); }); - services.AddSynapseHttpApiClient(http => { }); + services.AddJsonSerializer(); + services.AddYamlDotNetSerializer(); + services.AddSynapseHttpApiClient(http => + { + http.BaseAddress = options.Api.BaseAddress; + }); services.AddHostedService(); }); diff --git a/src/runner/Synapse.Runner/Services/Executors/ShellProcessExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/ShellProcessExecutor.cs index 2675f56cb..f1de567fc 100644 --- a/src/runner/Synapse.Runner/Services/Executors/ShellProcessExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/ShellProcessExecutor.cs @@ -1,5 +1,4 @@ -using System.Diagnostics; -using System.Net; +using System.Net; namespace Synapse.Runner.Services.Executors; diff --git a/src/runner/Synapse.Runner/Services/WorkflowExecutorInitializer.cs b/src/runner/Synapse.Runner/Services/WorkflowExecutorInitializer.cs index 5b01b0aed..2ce6a8c4b 100644 --- a/src/runner/Synapse.Runner/Services/WorkflowExecutorInitializer.cs +++ b/src/runner/Synapse.Runner/Services/WorkflowExecutorInitializer.cs @@ -1,9 +1,6 @@ -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Options; +using Microsoft.Extensions.Options; using Neuroglia.Data.Expressions.Services; using Synapse.Api.Client.Services; -using Synapse.Runner.Configuration; namespace Synapse.Runner.Services; diff --git a/src/runner/Synapse.Runner/Synapse.Runner.csproj b/src/runner/Synapse.Runner/Synapse.Runner.csproj index d725f4005..57e5adad9 100644 --- a/src/runner/Synapse.Runner/Synapse.Runner.csproj +++ b/src/runner/Synapse.Runner/Synapse.Runner.csproj @@ -12,6 +12,24 @@ ..\..\.. + + + + + + + + PreserveNewest + true + PreserveNewest + + + PreserveNewest + true + PreserveNewest + + + diff --git a/src/runner/Synapse.Runner/Usings.cs b/src/runner/Synapse.Runner/Usings.cs index 700b82066..a0cb0f4fe 100644 --- a/src/runner/Synapse.Runner/Usings.cs +++ b/src/runner/Synapse.Runner/Usings.cs @@ -11,13 +11,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -global using Synapse; -global using Synapse.Core.Infrastructure.Services; -global using Synapse.Resources; +global using Microsoft.Extensions.Configuration; +global using Microsoft.Extensions.DependencyInjection; +global using Microsoft.Extensions.Hosting; +global using Microsoft.Extensions.Logging; +global using Neuroglia.Serialization; global using ServerlessWorkflow.Sdk; global using ServerlessWorkflow.Sdk.Models; global using ServerlessWorkflow.Sdk.Models.Calls; global using ServerlessWorkflow.Sdk.Models.Processes; global using ServerlessWorkflow.Sdk.Models.Tasks; -global using Microsoft.Extensions.Logging; -global using Neuroglia.Serialization; +global using Synapse.Api.Client; +global using Synapse.Core.Infrastructure.Services; +global using Synapse.Resources; +global using Synapse.Runner.Configuration; +global using Synapse.Runner.Services; +global using System.Diagnostics; \ No newline at end of file diff --git a/src/runner/Synapse.Runner/appsettings.Development.json b/src/runner/Synapse.Runner/appsettings.Development.json new file mode 100644 index 000000000..bd4e0fb91 --- /dev/null +++ b/src/runner/Synapse.Runner/appsettings.Development.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + }, + "ServeDashboard": true +} diff --git a/src/runner/Synapse.Runner/appsettings.json b/src/runner/Synapse.Runner/appsettings.json new file mode 100644 index 000000000..10f68b8c8 --- /dev/null +++ b/src/runner/Synapse.Runner/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + }, + "AllowedHosts": "*" +} diff --git a/src/runtime/Synapse.Runtime.Abstractions/Configuration/ApiOptions.cs b/src/runtime/Synapse.Runtime.Abstractions/Configuration/ApiOptions.cs deleted file mode 100644 index 025e964f4..000000000 --- a/src/runtime/Synapse.Runtime.Abstractions/Configuration/ApiOptions.cs +++ /dev/null @@ -1,14 +0,0 @@ -namespace Synapse.Runtime.Configuration; - -/// -/// Represents the options used to configure the Synapse API to use -/// -public class ApiOptions -{ - - /// - /// Gets/sets the URI that references the Synapse API to use - /// - public required Uri Uri { get; set; } - -} diff --git a/src/runtime/Synapse.Runtime.Abstractions/Configuration/RuntimeOptions.cs b/src/runtime/Synapse.Runtime.Abstractions/Configuration/RuntimeOptions.cs deleted file mode 100644 index 7f1d4e015..000000000 --- a/src/runtime/Synapse.Runtime.Abstractions/Configuration/RuntimeOptions.cs +++ /dev/null @@ -1,19 +0,0 @@ -namespace Synapse.Runtime.Configuration; - -/// -/// Represents the base class for all runtime options -/// -public abstract class RuntimeOptions -{ - - /// - /// Gets/sets the options used to configure the API used by processes created by the configured runtime - /// - public required ApiOptions Api { get; set; } - - /// - /// Gets/sets a boolean indicating whether or not to skip certificate validation - /// - public bool SkipCertificateValidation { get; set; } - -} diff --git a/src/runtime/Synapse.Runtime.Abstractions/Services/Interfaces/IWorkflowProcess.cs b/src/runtime/Synapse.Runtime.Abstractions/Services/Interfaces/IWorkflowProcess.cs index bd5056152..0b608f61d 100644 --- a/src/runtime/Synapse.Runtime.Abstractions/Services/Interfaces/IWorkflowProcess.cs +++ b/src/runtime/Synapse.Runtime.Abstractions/Services/Interfaces/IWorkflowProcess.cs @@ -20,12 +20,12 @@ public interface IWorkflowProcess /// /// Gets an used to observe the 's STDOUT /// - IObservable StandardOutput { get; } + IObservable? StandardOutput { get; } /// /// Gets an used to observe the 's STDERR /// - IObservable StandardError { get; } + IObservable? StandardError { get; } /// /// Gets the 's exit code diff --git a/src/runtime/Synapse.Runtime.Abstractions/Services/WorkflowProcessBase.cs b/src/runtime/Synapse.Runtime.Abstractions/Services/WorkflowProcessBase.cs index 9c42412ae..ec40f6775 100644 --- a/src/runtime/Synapse.Runtime.Abstractions/Services/WorkflowProcessBase.cs +++ b/src/runtime/Synapse.Runtime.Abstractions/Services/WorkflowProcessBase.cs @@ -16,10 +16,10 @@ public abstract class WorkflowProcessBase public abstract string Id { get; } /// - public abstract IObservable StandardOutput { get; } + public abstract IObservable? StandardOutput { get; } /// - public abstract IObservable StandardError { get; } + public abstract IObservable? StandardError { get; } /// public abstract long? ExitCode { get; } diff --git a/src/runtime/Synapse.Runtime.Containerized/Services/ContainerProcess.cs b/src/runtime/Synapse.Runtime.Containerized/Services/ContainerProcess.cs new file mode 100644 index 000000000..b25f5b82c --- /dev/null +++ b/src/runtime/Synapse.Runtime.Containerized/Services/ContainerProcess.cs @@ -0,0 +1,88 @@ +using Synapse.Core.Infrastructure.Services; +using System.Reactive.Linq; + +namespace Synapse.Runtime.Services; + +/// +/// Represents the container implementation of the +/// + +public class ContainerProcess + : WorkflowProcessBase +{ + + /// + /// Initializes a new + /// + /// The underlying + public ContainerProcess(IContainer container) + { + ArgumentNullException.ThrowIfNull(container); + this.Id = Guid.NewGuid().ToString(); + this.Container = container; + } + + /// + public override string Id { get; } + + /// + /// Gets the underlying + /// + protected IContainer Container { get; } + + IObservable? _standardOutput; + /// + public override IObservable? StandardOutput => this._standardOutput; + + IObservable? _standardError; + /// + public override IObservable? StandardError => this._standardError; + + long? _exitCode; + /// + public override long? ExitCode => this._exitCode; + + /// + public override async Task StartAsync(CancellationToken cancellationToken = default) + { + await this.Container.StartAsync(cancellationToken).ConfigureAwait(false); + this._standardOutput = Observable.FromAsync(async () => (await this.Container.StandardOutput!.ReadLineAsync(cancellationToken).ConfigureAwait(false))!).Repeat().TakeWhile(line => line != null); + this._standardError = Observable.FromAsync(async () => (await this.Container.StandardError!.ReadLineAsync(cancellationToken).ConfigureAwait(false))!).Repeat().TakeWhile(line => line != null); + _ = this.WaitForExitAsync(cancellationToken).ConfigureAwait(false); + } + + /// + /// Waits for the container to complete + /// + /// A + /// A new awaitable + protected virtual async Task WaitForExitAsync(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + await this.Container.WaitForExitAsync(cancellationToken); + this._exitCode = this.Container.ExitCode; + this.OnExited(); + } + } + + /// + public override async Task StopAsync(CancellationToken cancellationToken = default) + { + await this.Container.StopAsync(cancellationToken).ConfigureAwait(false); + } + + /// + protected override async ValueTask DisposeAsync(bool disposing) + { + await this.Container.DisposeAsync().ConfigureAwait(false); + await base.DisposeAsync(disposing).ConfigureAwait(false); + } + + /// + protected override void Dispose(bool disposing) + { + this.Container.Dispose(); + base.Dispose(disposing); + } +} \ No newline at end of file diff --git a/src/runtime/Synapse.Runtime.Containerized/Services/ContainerRuntime.cs b/src/runtime/Synapse.Runtime.Containerized/Services/ContainerRuntime.cs new file mode 100644 index 000000000..30b066db7 --- /dev/null +++ b/src/runtime/Synapse.Runtime.Containerized/Services/ContainerRuntime.cs @@ -0,0 +1,70 @@ +using Synapse.Core.Infrastructure.Services; + +namespace Synapse.Runtime.Services; + +/// +/// Represents the native implementation of the +/// +/// +/// Initializes a new +/// +/// The service used to create s +/// The current +/// The service used to access the current +/// The service used to manage containers +public class ContainerRuntime(ILoggerFactory loggerFactory, IHostEnvironment environment, IContainerPlatform containerPlatform, IOptionsMonitor options) + : WorkflowRuntimeBase(loggerFactory) +{ + + /// + /// Gets the current + /// + protected IHostEnvironment Environment { get; } = environment; + + /// + /// Gets the service used to manage containers + /// + protected IContainerPlatform ContainerPlatform { get; } = containerPlatform; + + /// + /// Gets the current + /// + protected RunnerDefinition Options => options.CurrentValue; + + /// + /// Gets a containing all known runner processes + /// + protected ConcurrentDictionary Processes { get; } = new(); + + /// + public override async Task CreateProcessAsync(Workflow workflow, WorkflowInstance workflowInstance, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(workflow); + ArgumentNullException.ThrowIfNull(workflowInstance); + var containerDefinition = this.Options.Runtime.Container!.Clone()!; + containerDefinition.Environment ??= []; + containerDefinition.Environment[SynapseDefaults.EnvironmentVariables.Api.Uri] = this.Options.Api.Uri.OriginalString; + containerDefinition.Environment[SynapseDefaults.EnvironmentVariables.Workflow.Instance] = workflowInstance.GetQualifiedName(); + var container = await this.ContainerPlatform.CreateAsync(containerDefinition, cancellationToken).ConfigureAwait(false); + return this.Processes.AddOrUpdate(workflowInstance.GetQualifiedName(), new ContainerProcess(container), (key, current) => current); + } + + /// + protected override async ValueTask DisposeAsync(bool disposing) + { + if (!disposing) return; + foreach (var process in this.Processes) process.Value.Dispose(); + this.Processes.Clear(); + await base.DisposeAsync(disposing).ConfigureAwait(false); + } + + /// + protected override void Dispose(bool disposing) + { + if (!disposing) return; + foreach (var process in this.Processes) process.Value.Dispose(); + this.Processes.Clear(); + base.Dispose(disposing); + } + +} diff --git a/src/runtime/Synapse.Runtime.Containerized/Usings.cs b/src/runtime/Synapse.Runtime.Containerized/Usings.cs new file mode 100644 index 000000000..754daa625 --- /dev/null +++ b/src/runtime/Synapse.Runtime.Containerized/Usings.cs @@ -0,0 +1,6 @@ +global using Microsoft.Extensions.Hosting; +global using Microsoft.Extensions.Logging; +global using Microsoft.Extensions.Options; +global using Neuroglia.Data.Infrastructure.ResourceOriented; +global using Synapse.Resources; +global using System.Collections.Concurrent; diff --git a/src/runtime/Synapse.Runtime.Native/Configuration/NativeRuntimeOptions.cs b/src/runtime/Synapse.Runtime.Native/Configuration/NativeRuntimeOptions.cs deleted file mode 100644 index 34848b582..000000000 --- a/src/runtime/Synapse.Runtime.Native/Configuration/NativeRuntimeOptions.cs +++ /dev/null @@ -1,37 +0,0 @@ -namespace Synapse.Runtime.Configuration; - -/// -/// Represents the options used to configure a Synapse Docker-based runtime -/// -public class NativeRuntimeOptions - : RuntimeOptions -{ - - /// - /// Gets the name of the default runner file - /// - public const string DefaultRunnerExecutable = "Synapse.Runner"; - - /// - /// Gets/sets the name of the worker executable file to run - /// - public virtual string RunnerExecutable { get; set; } = DefaultRunnerExecutable; - - /// - /// Gets/sets the directory in which to run the worker process - /// - public virtual string WorkingDirectory { get; set; } = Path.Combine(AppContext.BaseDirectory, "bin", "runner"); - - /// - /// Gets the full name of the worker file to run - /// - /// The full name of the worker file to run - public virtual string GetWorkerFileName() - { - var directory = this.WorkingDirectory; - var fileName = this.RunnerExecutable; - if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) fileName += ".exe"; - return Path.Combine(directory, fileName); - } - -} diff --git a/src/runtime/Synapse.Runtime.Native/Services/NativeProcess.cs b/src/runtime/Synapse.Runtime.Native/Services/NativeProcess.cs index 2afeab696..6fd0bbcbd 100644 --- a/src/runtime/Synapse.Runtime.Native/Services/NativeProcess.cs +++ b/src/runtime/Synapse.Runtime.Native/Services/NativeProcess.cs @@ -54,4 +54,18 @@ public override Task StopAsync(CancellationToken cancellationToken = default) return Task.CompletedTask; } + /// + protected override async ValueTask DisposeAsync(bool disposing) + { + this.Process.Dispose(); + await base.DisposeAsync(disposing).ConfigureAwait(false); + } + + /// + protected override void Dispose(bool disposing) + { + this.Process.Dispose(); + base.Dispose(disposing); + } + } \ No newline at end of file diff --git a/src/runtime/Synapse.Runtime.Native/Services/NativeRuntime.cs b/src/runtime/Synapse.Runtime.Native/Services/NativeRuntime.cs index 7a21d16bd..ac98a1ec4 100644 --- a/src/runtime/Synapse.Runtime.Native/Services/NativeRuntime.cs +++ b/src/runtime/Synapse.Runtime.Native/Services/NativeRuntime.cs @@ -11,8 +11,8 @@ namespace Synapse.Runtime.Services; /// The service used to create s /// The current /// The service used to create s -/// The service used to access the current -public class NativeRuntime(ILoggerFactory loggerFactory, IHostEnvironment environment, IHttpClientFactory httpClientFactory, IOptions options) +/// The service used to access the current +public class NativeRuntime(ILoggerFactory loggerFactory, IHostEnvironment environment, IHttpClientFactory httpClientFactory, IOptionsMonitor options) : WorkflowRuntimeBase(loggerFactory) { @@ -27,55 +27,29 @@ public class NativeRuntime(ILoggerFactory loggerFactory, IHostEnvironment enviro protected HttpClient HttpClient { get; } = httpClientFactory.CreateClient(); /// - /// Gets the current + /// Gets the current /// - protected NativeRuntimeOptions Options { get; } = options.Value; + protected RunnerDefinition Options => options.CurrentValue; /// /// Gets a containing all known worker processes /// - protected ConcurrentDictionary Processes { get; } = new(); - - /// - /// Downloads and installs the worker binaries, if not already present - /// - /// A - /// A new awaitable - protected virtual async Task InstallWorkerAsync(CancellationToken cancellationToken) - { - this.Logger.LogInformation("Downloading the Runner application..."); - var workerDirectory = new DirectoryInfo(this.Options.WorkingDirectory); - if (!workerDirectory.Exists) workerDirectory.Create(); - if (File.Exists(this.Options.GetWorkerFileName())) - { - this.Logger.LogInformation("Runner application already present locally. Skipping download."); //todo: config based: the user might want to get latest every time - return; - } - string? target; - if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) target = "win-x64.zip"; - else if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)) target = "linux-x64.tar.gz"; - else if (RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) target = "osx-x64.tar.gz"; - else throw new PlatformNotSupportedException(); - using var packageStream = await this.HttpClient.GetStreamAsync($"https://github.com/serverlessworkflow/synapse/releases/download/v{typeof(NativeRuntime).Assembly.GetName().Version!.ToString(3)!}/synapse-runner-{target}", cancellationToken); //todo: config based - using ZipArchive archive = new(packageStream, ZipArchiveMode.Read); - this.Logger.LogInformation("Runner application successfully downloaded. Extracting..."); - archive.ExtractToDirectory(workerDirectory.FullName, true); - this.Logger.LogInformation("Runner application successfully extracted"); - } + protected ConcurrentDictionary Processes { get; } = new(); /// public override Task CreateProcessAsync(Workflow workflow, WorkflowInstance workflowInstance, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(workflow); ArgumentNullException.ThrowIfNull(workflowInstance); - var fileName = this.Options.GetWorkerFileName(); + if (this.Options.Runtime.Native == null) throw new NullReferenceException("The native runtime must be configured"); + var fileName = this.Options.Runtime.Native.Executable; var args = string.Empty; if (this.Environment.IsDevelopment()) args += "--debug"; var startInfo = new ProcessStartInfo() { FileName = fileName, Arguments = args, - WorkingDirectory = this.Options.WorkingDirectory, + WorkingDirectory = this.Options.Runtime.Native.Directory, RedirectStandardOutput = true, RedirectStandardError = true, CreateNoWindow = true, @@ -83,13 +57,13 @@ public override Task CreateProcessAsync(Workflow workflow, Wor }; startInfo.Environment.Add(SynapseDefaults.EnvironmentVariables.Api.Uri, this.Options.Api.Uri.OriginalString); startInfo.Environment.Add(SynapseDefaults.EnvironmentVariables.Workflow.Instance, workflowInstance.GetQualifiedName()); - if (this.Options.SkipCertificateValidation) startInfo.Environment.Add(SynapseDefaults.EnvironmentVariables.SkipCertificateValidation, "true"); + if (!this.Options.Certificates.Validate) startInfo.Environment.Add(SynapseDefaults.EnvironmentVariables.SkipCertificateValidation, "true"); var process = new Process() { StartInfo = startInfo, EnableRaisingEvents = true }; - return Task.FromResult(new NativeProcess(process)); + return Task.FromResult(this.Processes.AddOrUpdate(workflowInstance.GetQualifiedName(), new NativeProcess(process), (key, current) => current)); } /// diff --git a/src/runtime/Synapse.Runtime.Native/Usings.cs b/src/runtime/Synapse.Runtime.Native/Usings.cs index 3b76ef190..6f8399581 100644 --- a/src/runtime/Synapse.Runtime.Native/Usings.cs +++ b/src/runtime/Synapse.Runtime.Native/Usings.cs @@ -2,9 +2,6 @@ global using Microsoft.Extensions.Logging; global using Microsoft.Extensions.Options; global using Synapse.Resources; -global using Synapse.Runtime.Configuration; global using System.Collections.Concurrent; global using System.Diagnostics; -global using System.IO.Compression; global using System.Reactive.Linq; -global using System.Runtime.InteropServices;