diff --git a/src/Runner.Common/RunServer.cs b/src/Runner.Common/RunServer.cs index c042796b124..50ad0556018 100644 --- a/src/Runner.Common/RunServer.cs +++ b/src/Runner.Common/RunServer.cs @@ -62,7 +62,10 @@ public Task GetJobMessageAsync(string id, CancellationTo CheckConnection(); return RetryRequest( async () => await _runServiceHttpClient.GetJobMessageAsync(requestUri, id, VarUtil.OS, cancellationToken), cancellationToken, - shouldRetry: ex => ex is not TaskOrchestrationJobAlreadyAcquiredException); + shouldRetry: ex => + ex is not TaskOrchestrationJobNotFoundException && // HTTP status 404 + ex is not TaskOrchestrationJobAlreadyAcquiredException && // HTTP status 409 + ex is not TaskOrchestrationJobUnprocessableException); // HTTP status 422 } public Task CompleteJobAsync( diff --git a/src/Runner.Listener/ErrorThrottler.cs b/src/Runner.Listener/ErrorThrottler.cs new file mode 100644 index 00000000000..8525c728573 --- /dev/null +++ b/src/Runner.Listener/ErrorThrottler.cs @@ -0,0 +1,44 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using GitHub.Runner.Common; +using GitHub.Services.Common; + +namespace GitHub.Runner.Listener +{ + [ServiceLocator(Default = typeof(ErrorThrottler))] + public interface IErrorThrottler : IRunnerService + { + void Reset(); // Clears the consecutive-error count. + Task IncrementAndWaitAsync(CancellationToken token); // Records one more error; every error after the first waits for a backoff delay before returning. + } + + public sealed class ErrorThrottler : RunnerService, IErrorThrottler + { + internal static readonly TimeSpan MinBackoff = TimeSpan.FromSeconds(1); + internal static readonly TimeSpan MaxBackoff = TimeSpan.FromMinutes(1); + internal static readonly TimeSpan BackoffCoefficient = TimeSpan.FromSeconds(1); + private int _count = 0; // Consecutive errors recorded since the last Reset(). + + public void Reset() + { + _count = 0; + } + + public async Task IncrementAndWaitAsync(CancellationToken token) + { + if (++_count <= 1) // First error since the last reset: return without delaying. + { + return; + } + + TimeSpan backoff = BackoffTimerHelper.GetExponentialBackoff( + attempt: _count - 2, // 0-based attempt + minBackoff: MinBackoff, + maxBackoff: 
MaxBackoff, + deltaBackoff: BackoffCoefficient); + Trace.Warning($"Back off {backoff.TotalSeconds} seconds before next attempt. Current consecutive error count: {_count}"); + await HostContext.Delay(backoff, token); + } + } +} diff --git a/src/Runner.Listener/Runner.cs b/src/Runner.Listener/Runner.cs index 7509e112457..d83cefa0645 100644 --- a/src/Runner.Listener/Runner.cs +++ b/src/Runner.Listener/Runner.cs @@ -32,10 +32,25 @@ public sealed class Runner : RunnerService, IRunner private bool _inConfigStage; private ManualResetEvent _completedCommand = new(false); + // + // Helps avoid excessive calls to Run Service when encountering non-retriable errors from /acquirejob. + // Normally we rely on the HTTP clients to back off between retry attempts. However, acquiring a job + // involves calls to both Run Service and Broker. And Run Service and Broker communicate with each other + // in an async fashion. + // + // When Run Service encounters a non-retriable error, it sends an async message to Broker. The runner will, + // however, immediately call Broker to get the next message. If the async event from Run Service to Broker + // has not yet been processed, the next message from Broker may be the same job message. + // + // The error throttler helps us back off when encountering successive, non-retriable errors from /acquirejob. 
+ // + private IErrorThrottler _acquireJobThrottler; + public override void Initialize(IHostContext hostContext) { base.Initialize(hostContext); _term = HostContext.GetService(); + _acquireJobThrottler = HostContext.CreateService(); } public async Task ExecuteCommand(CommandSettings command) @@ -565,13 +580,16 @@ private async Task RunAsync(RunnerSettings settings, bool runOnce = false) await runServer.ConnectAsync(new Uri(messageRef.RunServiceUrl), creds); try { - jobRequestMessage = - await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, - messageQueueLoopTokenSource.Token); + jobRequestMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token); + _acquireJobThrottler.Reset(); } - catch (TaskOrchestrationJobAlreadyAcquiredException) + catch (Exception ex) when ( + ex is TaskOrchestrationJobNotFoundException || // HTTP status 404 + ex is TaskOrchestrationJobAlreadyAcquiredException || // HTTP status 409 + ex is TaskOrchestrationJobUnprocessableException) // HTTP status 422 { - Trace.Info("Job is already acquired, skip this message."); + Trace.Info($"Skipping message Job. 
{ex.Message}"); + await _acquireJobThrottler.IncrementAndWaitAsync(messageQueueLoopTokenSource.Token); continue; } catch (Exception ex) diff --git a/src/Test/L0/Listener/ErrorThrottlerL0.cs b/src/Test/L0/Listener/ErrorThrottlerL0.cs new file mode 100644 index 00000000000..e4118b181f0 --- /dev/null +++ b/src/Test/L0/Listener/ErrorThrottlerL0.cs @@ -0,0 +1,213 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using GitHub.DistributedTask.WebApi; +using GitHub.Runner.Listener; +using GitHub.Runner.Listener.Configuration; +using GitHub.Runner.Common.Tests; +using System.Runtime.CompilerServices; +using GitHub.Services.WebApi; +using Moq; +using Xunit; + +namespace GitHub.Runner.Common.Tests.Listener +{ + public sealed class ErrorThrottlerL0 + { + [Theory] + [InlineData(1)] + [InlineData(2)] + [InlineData(3)] + [InlineData(4)] + [InlineData(5)] + [InlineData(6)] + [InlineData(7)] + [InlineData(8)] + public async void TestIncrementAndWait(int totalAttempts) + { + using (TestHostContext hc = CreateTestContext()) + { + // Arrange + var errorThrottler = new ErrorThrottler(); + errorThrottler.Initialize(hc); + var eventArgs = new List(); + hc.Delaying += (sender, args) => + { + eventArgs.Add(args); + }; + + // Act + for (int attempt = 1; attempt <= totalAttempts; attempt++) + { + await errorThrottler.IncrementAndWaitAsync(CancellationToken.None); + } + + // Assert + Assert.Equal(totalAttempts - 1, eventArgs.Count); + for (int i = 0; i < eventArgs.Count; i++) + { + // Expected milliseconds + int expectedMin; + int expectedMax; + + switch (i) + { + case 0: + expectedMin = 1000; // Min backoff + expectedMax = 1000; + break; + case 1: + expectedMin = 1800; // Min + 0.8 * Coefficient + expectedMax = 2200; // Min + 1.2 * Coefficient + break; + case 2: + expectedMin = 3400; // Min + 0.8 * Coefficient * 3 + expectedMax = 4600; // Min + 1.2 * Coefficient * 3 + break; + case 3: + expectedMin = 6600; // Min + 0.8 * Coefficient 
* 7 + expectedMax = 9400; // Min + 1.2 * Coefficient * 7 + break; + case 4: + expectedMin = 13000; // Min + 0.8 * Coefficient * 15 + expectedMax = 19000; // Min + 1.2 * Coefficient * 15 + break; + case 5: + expectedMin = 25800; // Min + 0.8 * Coefficient * 31 + expectedMax = 38200; // Min + 1.2 * Coefficient * 31 + break; + case 6: + expectedMin = 51400; // Min + 0.8 * Coefficient * 63 + expectedMax = 60000; // Max backoff + break; + case 7: + expectedMin = 60000; + expectedMax = 60000; + break; + default: + throw new NotSupportedException("Unexpected eventArgs count"); + } + + var actualMilliseconds = eventArgs[i].Delay.TotalMilliseconds; + Assert.True(expectedMin <= actualMilliseconds, $"Unexpected min delay for eventArgs[{i}]. Expected min {expectedMin}, actual {actualMilliseconds}"); + Assert.True(expectedMax >= actualMilliseconds, $"Unexpected max delay for eventArgs[{i}]. Expected max {expectedMax}, actual {actualMilliseconds}"); + } + } + } + + [Fact] + public async void TestReset() + { + using (TestHostContext hc = CreateTestContext()) + { + // Arrange + var errorThrottler = new ErrorThrottler(); + errorThrottler.Initialize(hc); + var eventArgs = new List(); + hc.Delaying += (sender, args) => + { + eventArgs.Add(args); + }; + + // Act + await errorThrottler.IncrementAndWaitAsync(CancellationToken.None); + await errorThrottler.IncrementAndWaitAsync(CancellationToken.None); + await errorThrottler.IncrementAndWaitAsync(CancellationToken.None); + errorThrottler.Reset(); + await errorThrottler.IncrementAndWaitAsync(CancellationToken.None); + await errorThrottler.IncrementAndWaitAsync(CancellationToken.None); + await errorThrottler.IncrementAndWaitAsync(CancellationToken.None); + + // Assert + Assert.Equal(4, eventArgs.Count); + for (int i = 0; i < eventArgs.Count; i++) + { + // Expected milliseconds + int expectedMin; + int expectedMax; + + switch (i) + { + case 0: + case 2: + expectedMin = 1000; // Min backoff + expectedMax = 1000; + break; + case 1: + case 3: 
+ expectedMin = 1800; // Min + 0.8 * Coefficient + expectedMax = 2200; // Min + 1.2 * Coefficient + break; + default: + throw new NotSupportedException("Unexpected eventArgs count"); + } + + var actualMilliseconds = eventArgs[i].Delay.TotalMilliseconds; + Assert.True(expectedMin <= actualMilliseconds, $"Unexpected min delay for eventArgs[{i}]. Expected min {expectedMin}, actual {actualMilliseconds}"); + Assert.True(expectedMax >= actualMilliseconds, $"Unexpected max delay for eventArgs[{i}]. Expected max {expectedMax}, actual {actualMilliseconds}"); + } + } + } + + [Fact] + public async void TestReceivesCancellationToken() + { + using (TestHostContext hc = CreateTestContext()) + { + // Arrange + var errorThrottler = new ErrorThrottler(); + errorThrottler.Initialize(hc); + var eventArgs = new List(); + hc.Delaying += (sender, args) => + { + eventArgs.Add(args); + }; + var cancellationTokenSource1 = new CancellationTokenSource(); + var cancellationTokenSource2 = new CancellationTokenSource(); + var cancellationTokenSource3 = new CancellationTokenSource(); + + // Act + await errorThrottler.IncrementAndWaitAsync(cancellationTokenSource1.Token); + await errorThrottler.IncrementAndWaitAsync(cancellationTokenSource2.Token); + await errorThrottler.IncrementAndWaitAsync(cancellationTokenSource3.Token); + + // Assert + Assert.Equal(2, eventArgs.Count); + Assert.Equal(cancellationTokenSource2.Token, eventArgs[0].Token); + Assert.Equal(cancellationTokenSource3.Token, eventArgs[1].Token); + } + } + + [Fact] + public async void TestReceivesSender() + { + using (TestHostContext hc = CreateTestContext()) + { + // Arrange + var errorThrottler = new ErrorThrottler(); + errorThrottler.Initialize(hc); + var senders = new List(); + hc.Delaying += (sender, args) => + { + senders.Add(sender); + }; + + // Act + await errorThrottler.IncrementAndWaitAsync(CancellationToken.None); + await errorThrottler.IncrementAndWaitAsync(CancellationToken.None); + await 
errorThrottler.IncrementAndWaitAsync(CancellationToken.None); + + // Assert + Assert.Equal(2, senders.Count); + Assert.Equal(hc, senders[0]); + Assert.Equal(hc, senders[1]); + } + } + + private TestHostContext CreateTestContext([CallerMemberName] String testName = "") + { + return new TestHostContext(this, testName); + } + } +} diff --git a/src/Test/L0/Listener/RunnerL0.cs b/src/Test/L0/Listener/RunnerL0.cs index 03f251a7720..b29f8835c2b 100644 --- a/src/Test/L0/Listener/RunnerL0.cs +++ b/src/Test/L0/Listener/RunnerL0.cs @@ -23,6 +23,7 @@ public sealed class RunnerL0 private Mock _term; private Mock _configStore; private Mock _updater; + private Mock _acquireJobThrottler; public RunnerL0() { @@ -35,6 +36,7 @@ public RunnerL0() _term = new Mock(); _configStore = new Mock(); _updater = new Mock(); + _acquireJobThrottler = new Mock(); } private Pipelines.AgentJobRequestMessage CreateJobRequestMessage(string jobName) @@ -67,6 +69,7 @@ public async void TestRunAsync() hc.SetSingleton(_promptManager.Object); hc.SetSingleton(_runnerServer.Object); hc.SetSingleton(_configStore.Object); + hc.EnqueueInstance(_acquireJobThrottler.Object); runner.Initialize(hc); var settings = new RunnerSettings { @@ -174,6 +177,7 @@ public async void TestExecuteCommandForRunAsService(string[] args, bool configur hc.SetSingleton(_promptManager.Object); hc.SetSingleton(_messageListener.Object); hc.SetSingleton(_configStore.Object); + hc.EnqueueInstance(_acquireJobThrottler.Object); var command = new CommandSettings(hc, args); @@ -205,6 +209,7 @@ public async void TestMachineProvisionerCLI() hc.SetSingleton(_promptManager.Object); hc.SetSingleton(_messageListener.Object); hc.SetSingleton(_configStore.Object); + hc.EnqueueInstance(_acquireJobThrottler.Object); var command = new CommandSettings(hc, new[] { "run" }); @@ -242,6 +247,7 @@ public async void TestRunOnce() hc.SetSingleton(_promptManager.Object); hc.SetSingleton(_runnerServer.Object); hc.SetSingleton(_configStore.Object); + 
hc.EnqueueInstance(_acquireJobThrottler.Object); runner.Initialize(hc); var settings = new RunnerSettings { @@ -338,6 +344,7 @@ public async void TestRunOnceOnlyTakeOneJobMessage() hc.SetSingleton(_promptManager.Object); hc.SetSingleton(_runnerServer.Object); hc.SetSingleton(_configStore.Object); + hc.EnqueueInstance(_acquireJobThrottler.Object); runner.Initialize(hc); var settings = new RunnerSettings { @@ -439,6 +446,7 @@ public async void TestRunOnceHandleUpdateMessage() hc.SetSingleton(_runnerServer.Object); hc.SetSingleton(_configStore.Object); hc.SetSingleton(_updater.Object); + hc.EnqueueInstance(_acquireJobThrottler.Object); runner.Initialize(hc); var settings = new RunnerSettings @@ -522,6 +530,7 @@ public async void TestRemoveLocalRunnerConfig() hc.SetSingleton(_configurationManager.Object); hc.SetSingleton(_configStore.Object); hc.SetSingleton(_promptManager.Object); + hc.EnqueueInstance(_acquireJobThrottler.Object); var command = new CommandSettings(hc, new[] { "remove", "--local" }); diff --git a/src/Test/L0/TestHostContext.cs b/src/Test/L0/TestHostContext.cs index a3e484b14c8..c44f13f1c4a 100644 --- a/src/Test/L0/TestHostContext.cs +++ b/src/Test/L0/TestHostContext.cs @@ -30,9 +30,11 @@ public sealed class TestHostContext : IHostContext, IDisposable private string _tempDirectoryRoot = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString("D")); private StartupType _startupType; public event EventHandler Unloading; + public event EventHandler Delaying; public CancellationToken RunnerShutdownToken => _runnerShutdownTokenSource.Token; public ShutdownReason RunnerShutdownReason { get; private set; } public ISecretMasker SecretMasker => _secretMasker; + public TestHostContext(object testClass, [CallerMemberName] string testName = "") { ArgUtil.NotNull(testClass, nameof(testClass)); @@ -92,6 +94,14 @@ public StartupType StartupType public async Task Delay(TimeSpan delay, CancellationToken token) { + // Event callback + EventHandler handler = Delaying; + 
if (handler != null) + { + handler(this, new DelayEventArgs(delay, token)); + } + + // Delay zero await Task.Delay(TimeSpan.Zero); } @@ -361,4 +371,19 @@ private void LoadContext_Unloading(AssemblyLoadContext obj) } } } + + public class DelayEventArgs : EventArgs + { + public DelayEventArgs( + TimeSpan delay, + CancellationToken token) + { + Delay = delay; + Token = token; + } + + public TimeSpan Delay { get; } + + public CancellationToken Token { get; } + } }