Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added new add-in to fail the job when endpoint is busy. #39

Merged
merged 5 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<Version>10.6.4</Version>
<Version>10.6.5</Version>
<AssemblyVersion>1.0.0.0</AssemblyVersion>
<Title>OData Provider</Title>
<Description>The Odata Provider lets you fetch and map data from or to any OData endpoint.</Description>
Expand Down
17 changes: 16 additions & 1 deletion src/ODataProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ public string EndpointId
[AddInParameterSection("Advanced activity settings")]
public string DeltaModifier { get; set; }

[AddInParameter("Fail job if endpoint is busy or down")]
[AddInParameterEditor(typeof(YesNoParameterEditor), "Tooltip=If endpoint is busy or down during the run time, it will not insert already imported rows.")]
[AddInParameterGroup("Source")]
[AddInParameterSection("Advanced activity settings")]
public bool FailJobOnEndpointIsBusy { get; set; }

#endregion

#region AddInManager/ConfigurableAddIn Destination
Expand Down Expand Up @@ -455,7 +461,7 @@ public override ISourceReader GetReader(Mapping mapping)
DoNotStoreLastResponseInLogFile = false;
}

return new ODataSourceReader(new HttpRestClient(_credentials, RequestTimeout), Logger, mapping, _endpoint, Mode, DeltaModifier, MaximumPageSize, RunLastRequest, RequestIntervals, DoNotStoreLastResponseInLogFile);
return new ODataSourceReader(new HttpRestClient(_credentials, RequestTimeout), Logger, mapping, _endpoint, Mode, DeltaModifier, MaximumPageSize, RunLastRequest, RequestIntervals, DoNotStoreLastResponseInLogFile, FailJobOnEndpointIsBusy);
}

/// <inheritdoc />
Expand Down Expand Up @@ -541,6 +547,7 @@ public override void UpdateSourceSettings(ISource source)
RequestIntervals = newProvider.RequestIntervals;
DoNotStoreLastResponseInLogFile = newProvider.DoNotStoreLastResponseInLogFile;
EndpointId = newProvider.EndpointId;
FailJobOnEndpointIsBusy = newProvider.FailJobOnEndpointIsBusy;
}

public ODataProvider() { }
Expand Down Expand Up @@ -614,6 +621,12 @@ public ODataProvider(XmlNode xmlNode) : this()
ContinueOnError = node.FirstChild.Value == "True";
}
break;
case "Failjobonendpointisbusy":
if (node.HasChildNodes)
{
FailJobOnEndpointIsBusy = node.FirstChild.Value == "True";
}
break;
}
}
}
Expand All @@ -633,6 +646,7 @@ public override string Serialize()
root.Add(CreateParameterNode(GetType(), "Predefined endpoint", EndpointId));
root.Add(CreateParameterNode(GetType(), "Destination endpoint", DestinationEndpointId));
root.Add(CreateParameterNode(GetType(), "Continue on error", ContinueOnError.ToString()));
root.Add(CreateParameterNode(GetType(), "Fail job if endpoint is busy or down", FailJobOnEndpointIsBusy.ToString()));
return document.ToString();
}

Expand All @@ -649,6 +663,7 @@ public override void SaveAsXml(XmlTextWriter textWriter)
textWriter.WriteElementString("Predefinedendpoint", EndpointId);
textWriter.WriteElementString("Destinationendpoint", DestinationEndpointId);
textWriter.WriteElementString("Continueonerror", ContinueOnError.ToString());
textWriter.WriteElementString("Failjobonendpointisbusy", FailJobOnEndpointIsBusy.ToString());
GetSchema().SaveAsXml(textWriter);
}

Expand Down
19 changes: 15 additions & 4 deletions src/ODataSourceReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net;
Expand Down Expand Up @@ -44,6 +43,7 @@
private bool _requestTimedOutFromGlobalSettings;
private readonly int _maximumCharacterLengthOfAutoAddedSelectStatement = 1250;
private readonly int _timeoutInMilliseconds;
private readonly bool _failJobOnEndpointIsBusy;

internal void SaveRequestResponseFile()
{
Expand Down Expand Up @@ -95,7 +95,7 @@
/// <param name="mapping">The mapping.</param>
/// <param name="endpoint">The endpoint.</param>
/// <param name="nextPaginationUrlName">Name of the next pagination URL. "odata.nextLink" (case insensitive) is supposed to be a standard.</param>
internal ODataSourceReader(IHttpRestClient httpRestClient, ILogger logger, Mapping mapping, Endpoint endpoint, string mode, string deltaModifier, int maximumPageSize, bool readFromLastRequestResponse, int requestIntervals, bool doNotStoreLastResponseInLogFile, string nextPaginationUrlName = "odata.nextLink")
internal ODataSourceReader(IHttpRestClient httpRestClient, ILogger logger, Mapping mapping, Endpoint endpoint, string mode, string deltaModifier, int maximumPageSize, bool readFromLastRequestResponse, int requestIntervals, bool doNotStoreLastResponseInLogFile, bool failJobOnEndpointIsBusy, string nextPaginationUrlName = "odata.nextLink")
{
_totalResponseResult = new List<Dictionary<string, object>>();
_httpRestClient = httpRestClient;
Expand All @@ -109,6 +109,7 @@
_requestIntervals = requestIntervals;
_doNotStoreLastResponseInLogFile = doNotStoreLastResponseInLogFile;
_timeoutInMilliseconds = GetTimeOutInMilliseconds();
_failJobOnEndpointIsBusy = failJobOnEndpointIsBusy;
string logFileName = Scheduling.Task.MakeSafeFileName(mapping.Job.Name) + $"_{_mapping.SourceTable.Name}.log";

IDictionary<string, string> headers = GetAllHeaders();
Expand Down Expand Up @@ -578,11 +579,11 @@
if (endpointAuthentication.IsTokenBased())
{
string token = GetToken(_endpoint, endpointAuthentication);
task = RetryHelper.RetryOnExceptionAsync<Exception>(10, async () => { _httpRestClient.GetAsync(url, HandleStream, token, (Dictionary<string, string>)headers).Wait(new CancellationTokenSource(_timeoutInMilliseconds).Token); }, _logger);

Check warning on line 582 in src/ODataSourceReader.cs

View workflow job for this annotation

GitHub Actions / call-workflow / Build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 582 in src/ODataSourceReader.cs

View workflow job for this annotation

GitHub Actions / call-workflow / Build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.
}
else
{
task = RetryHelper.RetryOnExceptionAsync<Exception>(10, async () => { _httpRestClient.GetAsync(url, HandleStream, endpointAuthentication, (Dictionary<string, string>)headers).Wait(new CancellationTokenSource(_timeoutInMilliseconds).Token); }, _logger);

Check warning on line 586 in src/ODataSourceReader.cs

View workflow job for this annotation

GitHub Actions / call-workflow / Build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 586 in src/ODataSourceReader.cs

View workflow job for this annotation

GitHub Actions / call-workflow / Build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.
}
if (task.IsCanceled)
{
Expand All @@ -604,7 +605,7 @@
_logger?.Info($"This is retry {retryCounter} out of 2");
HandleRequest(url, loggerInfo, headers, retryCounter);
}

return false;
}
}
Expand Down Expand Up @@ -699,11 +700,11 @@
if (endpointAuthentication.IsTokenBased())
{
string token = GetToken(_endpoint, endpointAuthentication);
task = RetryHelper.RetryOnExceptionAsync<Exception>(10, async () => { _httpRestClient.GetAsync(checkUrl, HandleResponse, token, (Dictionary<string, string>)GetAllHeaders()).Wait(new CancellationTokenSource(_timeoutInMilliseconds).Token); }, _logger);

Check warning on line 703 in src/ODataSourceReader.cs

View workflow job for this annotation

GitHub Actions / call-workflow / Build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 703 in src/ODataSourceReader.cs

View workflow job for this annotation

GitHub Actions / call-workflow / Build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.
}
else
{
task = RetryHelper.RetryOnExceptionAsync<Exception>(10, async () => { _httpRestClient.GetAsync(checkUrl, HandleResponse, endpointAuthentication, (Dictionary<string, string>)GetAllHeaders()).Wait(new CancellationTokenSource(_timeoutInMilliseconds).Token); }, _logger);

Check warning on line 707 in src/ODataSourceReader.cs

View workflow job for this annotation

GitHub Actions / call-workflow / Build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 707 in src/ODataSourceReader.cs

View workflow job for this annotation

GitHub Actions / call-workflow / Build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.
}
if (task.IsCanceled)
{
Expand All @@ -720,7 +721,17 @@
}
else
{
_logger?.Info($"{checkUrl} returned the HttpStatusCode of: '{responseStatusCode}' ");
using var stream = new StreamReader(responseStream);
var streamResponse = stream.ReadToEnd();

if (_failJobOnEndpointIsBusy)
{
throw new WebException($"{checkUrl} returned: {streamResponse} with the HttpStatusCode of: '{responseStatusCode}' ");
}
else
{
_logger?.Info($"{checkUrl} returned: {streamResponse} with the HttpStatusCode of: '{responseStatusCode}' ");
}
}
}
return result;
Expand Down
Loading