Skip to content

Commit

Permalink
feat(Runner): Continued development of the Synapse Runner application
Browse files Browse the repository at this point in the history
Signed-off-by: Charles d'Avernas <[email protected]>
  • Loading branch information
cdavernas committed May 29, 2024
1 parent a0f29de commit 6cdf503
Show file tree
Hide file tree
Showing 75 changed files with 1,743 additions and 515 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,5 @@ _Pvt_Extensions
# FAKE - F# Make
.fake/
/deployment/docker-compose/secrets/basic.json
/src/dashboard/Synapse.Dashboard/wwwroot/lib/bootstrap/node_modules
/src/dashboard/Synapse.Dashboard/wwwroot/lib/bootstrap/package-lock.json
14 changes: 7 additions & 7 deletions Synapse.sln
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Synapse.UnitTests", "tests\
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Synapse.Api.Client.Core", "src\api\Synapse.Api.Client.Core\Synapse.Api.Client.Core.csproj", "{3E650D2D-B018-4056-8922-FC7DF68233B1}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Synapse.Operator.Application", "src\operator\Synapse.Operator.Application\Synapse.Operator.Application.csproj", "{7627C036-B55A-479C-AB0D-C859D4A11238}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "runtime", "runtime", "{175CE1C5-FE17-4C8B-8823-E812BAD4E527}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Synapse.Runtime.Abstractions", "src\runtime\Synapse.Runtime.Abstractions\Synapse.Runtime.Abstractions.csproj", "{A0E5E7F2-8C9C-4F36-B3FD-C09074893023}"
Expand All @@ -57,6 +55,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Synapse.Runner", "src\runne
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Synapse.Dashboard.StateManagement", "src\dashboard\Synapse.Dashboard.StateManagement\Synapse.Dashboard.StateManagement.csproj", "{91EF9F64-4997-407C-B353-C26B1421D0FB}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Synapse.Operator", "src\operator\Synapse.Operator\Synapse.Operator.csproj", "{A9085F4A-5FDF-4F4A-B267-A03BC5E0FDB0}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -107,10 +107,6 @@ Global
{3E650D2D-B018-4056-8922-FC7DF68233B1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3E650D2D-B018-4056-8922-FC7DF68233B1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3E650D2D-B018-4056-8922-FC7DF68233B1}.Release|Any CPU.Build.0 = Release|Any CPU
{7627C036-B55A-479C-AB0D-C859D4A11238}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7627C036-B55A-479C-AB0D-C859D4A11238}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7627C036-B55A-479C-AB0D-C859D4A11238}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7627C036-B55A-479C-AB0D-C859D4A11238}.Release|Any CPU.Build.0 = Release|Any CPU
{A0E5E7F2-8C9C-4F36-B3FD-C09074893023}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A0E5E7F2-8C9C-4F36-B3FD-C09074893023}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A0E5E7F2-8C9C-4F36-B3FD-C09074893023}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand All @@ -131,6 +127,10 @@ Global
{91EF9F64-4997-407C-B353-C26B1421D0FB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{91EF9F64-4997-407C-B353-C26B1421D0FB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{91EF9F64-4997-407C-B353-C26B1421D0FB}.Release|Any CPU.Build.0 = Release|Any CPU
{A9085F4A-5FDF-4F4A-B267-A03BC5E0FDB0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A9085F4A-5FDF-4F4A-B267-A03BC5E0FDB0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A9085F4A-5FDF-4F4A-B267-A03BC5E0FDB0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A9085F4A-5FDF-4F4A-B267-A03BC5E0FDB0}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -153,13 +153,13 @@ Global
{6F7ED286-A02C-4E78-9FE3-2689BD94B97C} = {F041A1CB-45FA-4432-BAD2-DE2EE174C6FC}
{CB1DF439-BFBA-408D-BC09-3FF94E6C4F0F} = {F041A1CB-45FA-4432-BAD2-DE2EE174C6FC}
{3E650D2D-B018-4056-8922-FC7DF68233B1} = {63715FC0-736D-4972-A865-41126155DF45}
{7627C036-B55A-479C-AB0D-C859D4A11238} = {32EAD165-3D99-42CD-B3AF-05136DCC7F35}
{175CE1C5-FE17-4C8B-8823-E812BAD4E527} = {4B9AF05C-9D6D-48C0-994D-D4A5C28FA24D}
{A0E5E7F2-8C9C-4F36-B3FD-C09074893023} = {175CE1C5-FE17-4C8B-8823-E812BAD4E527}
{DC24E506-602F-4FD9-B8C0-CEA6B2AD8888} = {175CE1C5-FE17-4C8B-8823-E812BAD4E527}
{F327B8F1-9A13-4924-AE1B-E69788AC73E7} = {175CE1C5-FE17-4C8B-8823-E812BAD4E527}
{E5FAA9BA-07C3-49CF-AD3B-897AE1D0B018} = {1DA47E5F-B23A-4D3C-96AA-4BD2662AB946}
{91EF9F64-4997-407C-B353-C26B1421D0FB} = {7DF998B8-0FB1-470E-8ED0-EA1CC7B16901}
{A9085F4A-5FDF-4F4A-B267-A03BC5E0FDB0} = {32EAD165-3D99-42CD-B3AF-05136DCC7F35}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2A6C03D6-355A-4B39-9F2B-D0FDE429C0E2}
Expand Down
5 changes: 5 additions & 0 deletions src/api/Synapse.Api.Client.Core/Services/ISynapseApiClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ public interface ISynapseApiClient
/// </summary>
IClusterResourceApiClient<Namespace> Namespaces { get; }

/// <summary>
/// Gets the Synapse API used to manage <see cref="Operator"/>s
/// </summary>
INamespacedResourceApiClient<Operator> Operators { get; }

/// <summary>
/// Gets the Synapse API used to manage <see cref="Workflow"/>s
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,18 @@
public class SynapseHttpApiClientOptions
{

/// <summary>
/// Initializes a new <see cref="SynapseHttpApiClientOptions"/>
/// </summary>
public SynapseHttpApiClientOptions()
{
var uri = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Api.Uri);
this.BaseAddress = string.IsNullOrWhiteSpace(uri) ? null! : new(uri, UriKind.RelativeOrAbsolute);
}

/// <summary>
/// Gets/sets the base address of the Cloud Streams API to connect to
/// </summary>
public required virtual Uri BaseAddress { get; set; }
public virtual Uri BaseAddress { get; set; }

}
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,10 @@ public virtual async Task<ResourceDefinition> GetDefinitionAsync(CancellationTok
}

/// <inheritdoc/>
public virtual async Task<IAsyncEnumerable<TResource>> ListAsync(string @namespace, IEnumerable<LabelSelector>? labelSelectors = null, CancellationToken cancellationToken = default)
public virtual async Task<IAsyncEnumerable<TResource>> ListAsync(string? @namespace = null, IEnumerable<LabelSelector>? labelSelectors = null, CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(@namespace);
var resource = new TResource();
var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}";
var uri = string.IsNullOrWhiteSpace(@namespace) ? $"/api/{resource.Definition.Version}/{resource.Definition.Plural}" : $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}";
var queryStringArguments = new Dictionary<string, string>();
if (labelSelectors?.Any() == true) queryStringArguments.Add(nameof(labelSelectors), labelSelectors.Select(s => s.ToString()).Join(','));
if (queryStringArguments.Count != 0) uri += $"?{queryStringArguments.Select(kvp => $"{kvp.Key}={kvp.Value}").Join('&')}";
Expand Down Expand Up @@ -228,7 +227,7 @@ public virtual async Task DeleteAsync(string name, string @namespace, Cancellati
ArgumentException.ThrowIfNullOrWhiteSpace(name);
ArgumentException.ThrowIfNullOrWhiteSpace(@namespace);
var resource = new TResource();
var uri = $"/api/{resource.Definition.Version}/{@namespace}/{resource.Definition.Plural}/{name}";
var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}/{name}";
using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Delete, uri), cancellationToken).ConfigureAwait(false);
await ProcessResponseAsync(await this.HttpClient.SendAsync(request, cancellationToken).ConfigureAwait(false), cancellationToken).ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ public SynapseHttpApiClient(IServiceProvider serviceProvider, ILoggerFactory log
/// </summary>
protected HttpClient HttpClient { get; }

/// <inheritdoc/>
public INamespacedResourceApiClient<Operator> Operators { get; private set; } = null!;

/// <inheritdoc/>
public IClusterResourceApiClient<Namespace> Namespaces { get; private set; } = null!;

Expand Down
16 changes: 16 additions & 0 deletions src/api/Synapse.Api.Http/Controllers/OperatorsController.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using Synapse.Resources;

namespace Synapse.Api.Http.Controllers;

/// <summary>
/// Represents the <see cref="NamespacedResourceController{TResource}"/> used to manage <see cref="Operator"/>s
/// </summary>
/// <param name="mediator">The service used to mediate calls</param>
[Route("api/v1/operators")]
public class OperatorsController(IMediator mediator)
: NamespacedResourceController<Operator>(mediator)
{



}
32 changes: 32 additions & 0 deletions src/api/Synapse.Api.Http/NamespacedResourceController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,21 @@ public async Task<IActionResult> GetResource(string name, string @namespace, Can
return this.Process(await this.Mediator.ExecuteAsync(new GetResourceQuery<TResource>(name, @namespace), cancellationToken).ConfigureAwait(false));
}

/// <summary>
/// Lists matching resources
/// </summary>
/// <param name="labelSelector">A comma-separated list of label selectors, if any</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new <see cref="IActionResult"/></returns>
[HttpGet]
[ProducesResponseType(typeof(Collection<Resource>), (int)HttpStatusCode.OK)]
[ProducesErrorResponseType(typeof(Neuroglia.ProblemDetails))]
public virtual async Task<IActionResult> GetResources(string? labelSelector = null, CancellationToken cancellationToken = default)
{
if (!this.TryParseLabelSelectors(labelSelector, out var labelSelectors)) return this.InvalidLabelSelector(labelSelector!);
return this.Process(await this.Mediator.ExecuteAsync(new GetResourcesQuery<TResource>(null, labelSelectors), cancellationToken).ConfigureAwait(false));
}

/// <summary>
/// Lists matching resources
/// </summary>
Expand All @@ -46,6 +61,23 @@ public virtual async Task<IActionResult> GetResources(string @namespace, string?
return this.Process(await this.Mediator.ExecuteAsync(new GetResourcesQuery<TResource>(@namespace, labelSelectors), cancellationToken).ConfigureAwait(false));
}

/// <summary>
/// Lists matching resources
/// </summary>
/// <param name="labelSelector">A comma-separated list of label selectors, if any</param>
/// <param name="maxResults">The maximum amount, if any, of results to list at once</param>
/// <param name="continuationToken">A token, defined by a previously retrieved collection, used to continue enumerating through matches</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new <see cref="IActionResult"/></returns>
[HttpGet("list")]
[ProducesResponseType(typeof(Collection<Resource>), (int)HttpStatusCode.OK)]
[ProducesErrorResponseType(typeof(Neuroglia.ProblemDetails))]
public virtual async Task<IActionResult> ListResources(string? labelSelector = null, ulong? maxResults = null, string? continuationToken = null, CancellationToken cancellationToken = default)
{
if (!this.TryParseLabelSelectors(labelSelector, out var labelSelectors)) return this.InvalidLabelSelector(labelSelector!);
return this.Process(await this.Mediator.ExecuteAsync(new ListResourcesQuery<TResource>(null, labelSelectors, maxResults, continuationToken), cancellationToken).ConfigureAwait(false));
}

/// <summary>
/// Lists matching resources
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public virtual async Task WatchResourcesAsync(string connectionId, ResourceDefin
});
}
watch = await this.Resources.WatchAsync(definition.Group, definition.Version, definition.Plural, @namespace, cancellationToken: cancellationToken).ConfigureAwait(false);
watch.SubscribeAsync(e => this.OnResourceWatchEventAsync(connectionId, e));
watch.SubscribeAsync(e => this.OnResourceWatchEventAsync(connectionId, e), this.CancellationTokenSource.Token);
subscriptions.AddOrUpdate(subscriptionKey, watch, (key, current) =>
{
current.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class DockerContainer(string id, IDockerClient dockerClient)
: IContainer
{

bool _disposed;

/// <summary>
/// Gets the container's ID
/// </summary>
Expand All @@ -35,15 +37,17 @@ public class DockerContainer(string id, IDockerClient dockerClient)
protected virtual IDockerClient DockerClient { get; } = dockerClient;

/// <inheritdoc/>
public virtual StreamReader? StandardOutput { get; set; }
public virtual StreamReader? StandardOutput { get; protected set; }

/// <inheritdoc/>
public virtual StreamReader? StandardError { get; protected set; }

/// <inheritdoc/>
public virtual StreamReader? StandardError { get; set; }
public long? ExitCode { get; protected set; }

/// <inheritdoc/>
public virtual async Task StartAsync(CancellationToken cancellationToken = default)
{

await this.DockerClient.Containers.StartContainerAsync(this.Id, new() { }, cancellationToken).ConfigureAwait(false);
#pragma warning disable CS0618 // Type or member is obsolete
var standardOutputStream = await this.DockerClient.Containers.GetContainerLogsAsync(this.Id, new() { Follow = true, ShowStdout = true, ShowStderr = true, Timestamps = false }, cancellationToken).ConfigureAwait(false);
Expand All @@ -54,9 +58,49 @@ public virtual async Task StartAsync(CancellationToken cancellationToken = defau
}

/// <inheritdoc/>
public virtual Task WaitForExitAsync(CancellationToken cancellationToken = default) => this.DockerClient.Containers.WaitContainerAsync(this.Id, cancellationToken);
public virtual async Task WaitForExitAsync(CancellationToken cancellationToken = default)
{
var response = await this.DockerClient.Containers.WaitContainerAsync(this.Id, cancellationToken).ConfigureAwait(false);
this.ExitCode = response.StatusCode;
}

/// <inheritdoc/>
public virtual Task StopAsync(CancellationToken cancellationToken = default) => this.DockerClient.Containers.StopContainerAsync(this.Id, new() { }, cancellationToken);

/// <summary>
/// Disposes of the <see cref="DockerContainer"/>
/// </summary>
/// <param name="disposing">A boolean indicating whether or not the <see cref="DockerContainer"/> is being disposed of</param>
/// <returns>A new awaitable <see cref="ValueTask"/></returns>
protected virtual async ValueTask DisposeAsync(bool disposing)
{
if (!this._disposed) return;
this._disposed = true;
await Task.CompletedTask.ConfigureAwait(false);
}

/// <inheritdoc/>
public async ValueTask DisposeAsync()
{
await this.DisposeAsync(true).ConfigureAwait(false);
GC.SuppressFinalize(this);
}

/// <summary>
/// Disposes of the <see cref="DockerContainer"/>
/// </summary>
/// <param name="disposing">A boolean indicating whether or not the <see cref="DockerContainer"/> is being disposed of</param>
protected virtual void Dispose(bool disposing)
{
if (!this._disposed) return;
this._disposed = true;
}

/// <inheritdoc/>
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ public static IServiceCollection AddSynapse(this IServiceCollection services, IC
services.AddPluginProvider();

var redisConnectionString = configuration.GetConnectionString(RedisDatabase.ConnectionStringName);
services.AddPlugin(typeof(IDatabase), string.IsNullOrWhiteSpace(redisConnectionString) ? null : provider => provider.GetRequiredService<RedisDatabase>(), serviceLifetime: ServiceLifetime.Scoped);
services.AddPlugin(typeof(IDatabase), string.IsNullOrWhiteSpace(redisConnectionString) ? null : provider => provider.GetRequiredService<RedisDatabase>(), serviceLifetime: ServiceLifetime.Singleton);

if (!string.IsNullOrWhiteSpace(redisConnectionString)) services.AddRedisDatabase(redisConnectionString, ServiceLifetime.Scoped);
if (!string.IsNullOrWhiteSpace(redisConnectionString)) services.AddRedisDatabase(redisConnectionString, ServiceLifetime.Singleton);
services.AddHostedService<Core.Infrastructure.Services.DatabaseInitializer>();

services.AddPlugin(typeof(IRepository<Document>), provider => provider.GetRequiredService<MongoRepository<Document, string>>(), serviceLifetime: ServiceLifetime.Scoped);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace Synapse.Core.Infrastructure.Services;
/// Defines the fundamentals of a container
/// </summary>
public interface IContainer
: IDisposable, IAsyncDisposable
{

/// <summary>
Expand All @@ -29,6 +30,11 @@ public interface IContainer
/// </summary>
StreamReader? StandardError { get; }

/// <summary>
/// Gets the container's exit code
/// </summary>
long? ExitCode { get; }

/// <summary>
/// Starts the container
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
namespace Synapse.Core.Infrastructure.Services;

/// <summary>
/// Defines the fundamentals of a service used to interact with a container platform
/// Defines the fundamentals of a service used to manage containers
/// </summary>
public interface IContainerPlatform
{
Expand Down
28 changes: 28 additions & 0 deletions src/core/Synapse.Core/OperatorRuntimeMode.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
namespace Synapse;

/// <summary>
/// Enumerates all default operator runtime modes
/// </summary>
public static class OperatorRuntimeMode
{

/// <summary>
/// Gets the native operator runtime mode
/// </summary>
public const string Native = "native";
/// <summary>
/// Gets the containerized operator runtime mode
/// </summary>
public const string Containerized = "containerized";

/// <summary>
/// Gets a new <see cref="IEnumerable{T}"/> containing all default operator runtime modes
/// </summary>
/// <returns>A new <see cref="IEnumerable{T}"/> containing all default operator runtime modes</returns>
public static IEnumerable<string> AsEnumerable()
{
yield return Native;
yield return Containerized;
}

}
41 changes: 41 additions & 0 deletions src/core/Synapse.Core/OperatorStatusPhase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright © 2024-Present Neuroglia SRL. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace Synapse;

/// <summary>
/// Exposes all default operator status phases
/// </summary>
public static class OperatorStatusPhase
{

/// <summary>
/// Indicates that the operator is running
/// </summary>
public const string Running = "running";
/// <summary>
/// Indicates that the operator has been stopped and is not running
/// </summary>
public const string Stopped = "stopped";

/// <summary>
/// Gets an <see cref="IEnumerable{T}"/> containing default operator status phases
/// </summary>
/// <returns>A new <see cref="IEnumerable{T}"/> containing default operator status phases</returns>
public static IEnumerable<string> AsEnumerable()
{
yield return Running;
yield return Stopped;
}

}
Loading

0 comments on commit 6cdf503

Please sign in to comment.