Skip to content

Commit

Permalink
Merge pull request #79 from neuroglia-io/fix-unfault-subscription
Browse files Browse the repository at this point in the history
Repaired the ability to recover a faulted subscription without restart
  • Loading branch information
cdavernas authored Jan 22, 2025
2 parents 2e92505 + a38c9e0 commit 343e524
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,10 @@ public virtual async Task<TResource> PatchAsync(Patch patch, string name, string
public virtual async Task<TResource> PatchStatusAsync(Patch patch, string name, string? @namespace = null, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(patch);
var uri = string.IsNullOrWhiteSpace(@namespace) ? $"{this.Path}/{name}/status" : $"{this.Path}/namespace/{@namespace}/{name}/status";
var json = this.Serializer.SerializeToText(patch);
using var content = new StringContent(json, Encoding.UTF8, MediaTypeNames.Application.Json);
using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Patch, $"{this.Path}/status") { Content = content }, cancellationToken).ConfigureAwait(false);
using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Patch, uri) { Content = content }, cancellationToken).ConfigureAwait(false);
using var response = await this.ProcessResponseAsync(await this.HttpClient.SendAsync(request, cancellationToken).ConfigureAwait(false), cancellationToken).ConfigureAwait(false);
json = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
return this.Serializer.Deserialize<TResource>(json)!;
Expand Down
17 changes: 17 additions & 0 deletions src/core/CloudStreams.Core.Api/ClusterResourceApiController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,23 @@ public virtual async Task<IActionResult> PatchResource(string name, [FromBody] P
return this.Process(await this.Mediator.ExecuteAsync(new PatchResourceCommand<TResource>(name, null, patch, dryRun), cancellationToken).ConfigureAwait(false));
}

/// <summary>
/// Patches the specified resource status
/// </summary>
/// <param name="patch">The patch to apply</param>
/// <param name="name">The name of the resource to patch</param>
/// <param name="dryRun">A boolean indicating whether or not to persist changes</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new <see cref="IActionResult"/></returns>
[HttpPatch("{name}/status")]
[ProducesResponseType(typeof(IAsyncEnumerable<Resource>), (int)HttpStatusCode.OK)]
[ProducesErrorResponseType(typeof(Neuroglia.ProblemDetails))]
public virtual async Task<IActionResult> PatchResourceStatus(string name, [FromBody] Patch patch, bool dryRun = false, CancellationToken cancellationToken = default)
{
if (!this.ModelState.IsValid) return this.ValidationProblem(this.ModelState);
return this.Process(await this.Mediator.ExecuteAsync(new PatchResourceStatusCommand<TResource>(name, null, patch, dryRun), cancellationToken).ConfigureAwait(false));
}

/// <summary>
/// Deletes the specified resource
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using Json.Patch;
using Neuroglia.Data.Infrastructure.ResourceOriented;
using Neuroglia.Data.PatchModel.Services;
using Neuroglia.Data;
using System.Xml.Linq;
using Neuroglia.Serialization.Json;

namespace CloudStreams.Core.Application.Commands.Resources.Generic;

/// <summary>
/// Represents the <see cref="ICommand"/> used to delete an existing <see cref="IResource"/>
/// Represents the <see cref="ICommand"/> used to patch an existing <see cref="IResource"/>
/// </summary>
/// <typeparam name="TResource">The type of <see cref="IResource"/> to patch</typeparam>
public class PatchResourceCommand<TResource>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright © 2024-Present The Cloud Streams Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace CloudStreams.Core.Application.Commands.Resources.Generic;

/// <summary>
/// Represents the <see cref="ICommand"/> used to patch the status of an existing <see cref="IResource"/>
/// </summary>
/// <typeparam name="TResource">The type of <see cref="IResource"/> to patch</typeparam>
public class PatchResourceStatusCommand<TResource>
: Command<TResource>
where TResource : class, IResource, new()
{

/// <summary>
/// Initializes a new <see cref="PatchResourceStatusCommand{TResource}"/>
/// </summary>
/// <param name="name">The name of the <see cref="IResource"/> to patch</param>
/// <param name="namespace">The namespace the <see cref="IResource"/> to patch belongs to</param>
/// <param name="patch">The patch to apply</param>
/// <param name="dryRun">A boolean indicating whether or not to persist changes</param>
public PatchResourceStatusCommand(string name, string? @namespace, Patch patch, bool dryRun)
{
if (string.IsNullOrWhiteSpace(name)) throw new ArgumentNullException(nameof(name));
this.Name = name;
this.Namespace = @namespace;
this.Patch = patch ?? throw new ArgumentNullException(nameof(patch));
this.DryRun = dryRun;
}

/// <summary>
/// Gets the name of the <see cref="IResource"/> to patch
/// </summary>
public string Name { get; }

/// <summary>
/// Gets the name of the <see cref="IResource"/> to patch
/// </summary>
public string? Namespace { get; }

/// <summary>
/// Gets the patch to apply
/// </summary>
public Patch Patch { get; }

/// <summary>
/// Gets a boolean indicating whether or not to persist changes
/// </summary>
public bool DryRun { get; }

}

/// <summary>
/// Represents the service used to handle <see cref="PatchResourceStatusCommand"/>s
/// </summary>
/// <typeparam name="TResource">The type of <see cref="IResource"/> to patch</typeparam>
/// <param name="repository">The service used to manage <see cref="IResource"/>s</param>
public class PatchResourceStatusCommandHandler<TResource>(IResourceRepository repository)
: ICommandHandler<PatchResourceStatusCommand<TResource>, TResource>
where TResource : class, IResource, new()
{

/// <inheritdoc/>
public virtual async Task<IOperationResult<TResource>> HandleAsync(PatchResourceStatusCommand<TResource> command, CancellationToken cancellationToken)
{
var resource = await repository.PatchStatusAsync<TResource>(command.Patch, command.Name, command.Namespace, null, command.DryRun, cancellationToken).ConfigureAwait(false);
return this.Ok(resource);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright © 2024-Present The Cloud Streams Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace CloudStreams.Core.Application.Commands.Resources;

/// <summary>
/// Represents the <see cref="ICommand"/> used to patch the status of an existing <see cref="IResource"/>
/// </summary>
public class PatchResourceStatusCommand
: Command<IResource>
{

/// <summary>
/// Initializes a new <see cref="PatchResourceStatusCommand"/>
/// </summary>
protected PatchResourceStatusCommand() { }

/// <summary>
/// Initializes a new <see cref="PatchResourceStatusCommand"/>
/// </summary>
/// <param name="group">The API group the resource to patch belongs to</param>
/// <param name="version">The version of the resource to patch</param>
/// <param name="plural">The plural name of the type of resource to patch</param>
/// <param name="name">The name of the <see cref="IResource"/> to patch</param>
/// <param name="namespace">The namespace the <see cref="IResource"/> to patch belongs to</param>
/// <param name="patch">The patch to apply</param>
/// <param name="dryRun">A boolean indicating whether or not to persist changes</param>
public PatchResourceStatusCommand(string group, string version, string plural, string name, string? @namespace, Patch patch, bool dryRun)
{
if (string.IsNullOrWhiteSpace(group)) throw new ArgumentNullException(nameof(group));
if (string.IsNullOrWhiteSpace(version)) throw new ArgumentNullException(nameof(version));
if (string.IsNullOrWhiteSpace(plural)) throw new ArgumentNullException(nameof(plural));
if (string.IsNullOrWhiteSpace(name)) throw new ArgumentNullException(nameof(name));
this.Group = group;
this.Version = version;
this.Plural = plural;
this.Name = name;
this.Namespace = @namespace;
this.Patch = patch ?? throw new ArgumentNullException(nameof(patch));
this.DryRun = dryRun;
}

/// <summary>
/// Gets the API group the resource to patch belongs to
/// </summary>
public string Group { get; } = null!;

/// <summary>
/// Gets the version of the resource to patch
/// </summary>
public string Version { get; } = null!;

/// <summary>
/// Gets the plural name of the type of resource to patch
/// </summary>
public string Plural { get; } = null!;

/// <summary>
/// Gets the name of the <see cref="IResource"/> to patch
/// </summary>
public string Name { get; } = null!;

/// <summary>
/// Gets the name of the <see cref="IResource"/> to patch
/// </summary>
public string? Namespace { get; }

/// <summary>
/// Gets the patch to apply
/// </summary>
public Patch Patch { get; } = null!;

/// <summary>
/// Gets a boolean indicating whether or not to persist changes
/// </summary>
public bool DryRun { get; }

}

/// <summary>
/// Represents the service used to handle <see cref="PatchResourceStatusCommand"/>s
/// </summary>
/// <param name="repository">The service used to manage <see cref="IResource"/>s</param>
public class PatchResourceStatusCommandHandler(IResourceRepository repository)
: ICommandHandler<PatchResourceStatusCommand, IResource>
{

/// <inheritdoc/>
public virtual async Task<IOperationResult<IResource>> HandleAsync(PatchResourceStatusCommand command, CancellationToken cancellationToken)
{
var resource = await repository.PatchSubResourceAsync(command.Patch, command.Group, command.Version, command.Plural, command.Name, "status", command.Namespace, null, command.DryRun, cancellationToken).ConfigureAwait(false);
return this.Ok(resource);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ public static IServiceCollection AddCoreApiCommands(this IServiceCollection serv
handlerImplementationType = typeof(PatchResourceCommandHandler<>).MakeGenericType(resourceType);
services.Add(new ServiceDescriptor(handlerServiceType, handlerImplementationType, serviceLifetime));

commandType = typeof(PatchResourceStatusCommand<>).MakeGenericType(resourceType);
resultType = typeof(IOperationResult<>).MakeGenericType(resourceType);
handlerServiceType = typeof(IRequestHandler<,>).MakeGenericType(commandType, resultType);
handlerImplementationType = typeof(PatchResourceStatusCommandHandler<>).MakeGenericType(resourceType);
services.Add(new ServiceDescriptor(handlerServiceType, handlerImplementationType, serviceLifetime));

commandType = typeof(DeleteResourceCommand<>).MakeGenericType(resourceType);
resultType = typeof(IOperationResult<>).MakeGenericType(resourceType);
handlerServiceType = typeof(IRequestHandler<,>).MakeGenericType(commandType, resultType);
Expand Down

0 comments on commit 343e524

Please sign in to comment.