Skip to content

Commit

Permalink
Improve shutdown (#8335)
Browse files Browse the repository at this point in the history
  • Loading branch information
Martin-Molinero authored Sep 19, 2024
1 parent 86fd80a commit f2f1d06
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 31 deletions.
38 changes: 16 additions & 22 deletions Common/Isolator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
using System.Threading.Tasks;
using QuantConnect.Logging;
using QuantConnect.Util;
using static QuantConnect.StringExtensions;

namespace QuantConnect
{
Expand All @@ -36,22 +35,6 @@ public CancellationTokenSource CancellationTokenSource
get; private set;
}

/// <summary>
/// Algo cancellation controls - cancellation token for algorithm thread.
/// </summary>
public CancellationToken CancellationToken
{
get { return CancellationTokenSource.Token; }
}

/// <summary>
/// Check if this task isolator is cancelled, and exit the analysis
/// </summary>
public bool IsCancellationRequested
{
get { return CancellationTokenSource.IsCancellationRequested; }
}

/// <summary>
/// Initializes a new instance of the <see cref="Isolator"/> class
/// </summary>
Expand Down Expand Up @@ -117,7 +100,7 @@ private bool MonitorTask(Task task,
memoryCap *= 1024 * 1024;
var spikeLimit = memoryCap*2;

while (!task.IsCompleted && utcNow < end)
while (!task.IsCompleted && !CancellationTokenSource.IsCancellationRequested && utcNow < end)
{
// if over 80% allocation force GC then sample
var sample = Convert.ToDouble(GC.GetTotalMemory(memoryUsed > memoryCap * 0.8));
Expand Down Expand Up @@ -166,15 +149,26 @@ private bool MonitorTask(Task task,
utcNow = DateTime.UtcNow;
}

if (task.IsCompleted == false && string.IsNullOrEmpty(message))
if (task.IsCompleted == false)
{
message = Messages.Isolator.MemoryUsageMonitorTaskTimedOut(timeSpan);
Log.Trace($"Isolator.ExecuteWithTimeLimit(): {message}");
if (CancellationTokenSource.IsCancellationRequested)
{
Log.Trace($"Isolator.ExecuteWithTimeLimit(): Operation was canceled");
throw new OperationCanceledException("Operation was canceled");
}
else if (string.IsNullOrEmpty(message))
{
message = Messages.Isolator.MemoryUsageMonitorTaskTimedOut(timeSpan);
Log.Trace($"Isolator.ExecuteWithTimeLimit(): {message}");
}
}

if (!string.IsNullOrEmpty(message))
{
CancellationTokenSource.Cancel();
if (!CancellationTokenSource.IsCancellationRequested)
{
CancellationTokenSource.Cancel();
}
Log.Error($"Security.ExecuteWithTimeLimit(): {message}");
throw new TimeoutException(message);
}
Expand Down
17 changes: 15 additions & 2 deletions Engine/AlgorithmManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class AlgorithmManager
private IAlgorithm _algorithm;
private readonly object _lock;
private readonly bool _liveMode;
private CancellationTokenSource _cancellationTokenSource;

/// <summary>
/// Publicly accessible algorithm status
Expand Down Expand Up @@ -111,14 +112,17 @@ public AlgorithmManager(bool liveMode, AlgorithmNodePacket job = null)
/// <param name="results">Result handler object</param>
/// <param name="realtime">Realtime processing object</param>
/// <param name="leanManager">ILeanManager implementation that is updated periodically with the IAlgorithm instance</param>
/// <param name="token">Cancellation token</param>
/// <param name="cancellationTokenSource">Cancellation token source to monitor</param>
/// <remarks>Modify with caution</remarks>
public void Run(AlgorithmNodePacket job, IAlgorithm algorithm, ISynchronizer synchronizer, ITransactionHandler transactions, IResultHandler results, IRealTimeHandler realtime, ILeanManager leanManager, CancellationToken token)
public void Run(AlgorithmNodePacket job, IAlgorithm algorithm, ISynchronizer synchronizer, ITransactionHandler transactions, IResultHandler results, IRealTimeHandler realtime, ILeanManager leanManager, CancellationTokenSource cancellationTokenSource)
{
//Initialize:
DataPoints = 0;
_algorithm = algorithm;

var token = cancellationTokenSource.Token;
_cancellationTokenSource = cancellationTokenSource;

var backtestMode = (job.Type == PacketType.BacktestNode);
var methodInvokers = new Dictionary<Type, MethodInvoker>();
var marginCallFrequency = TimeSpan.FromMinutes(5);
Expand Down Expand Up @@ -607,6 +611,15 @@ public void SetStatus(AlgorithmStatus state)
{
_algorithm.SetStatus(state);
}

if (state == AlgorithmStatus.Deleted)
{
if (!_cancellationTokenSource.IsCancellationRequested)
{
// if the algorithm was deleted or stopped, let's give the algorithm a few seconds to shutdown and cancel it out
_cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(5));
}
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion Engine/Engine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ public void Run(AlgorithmNodePacket job, AlgorithmManager manager, string assemb
// -> Using this Data Feed,
// -> Send Orders to this TransactionHandler,
// -> Send Results to ResultHandler.
algorithmManager.Run(job, algorithm, synchronizer, AlgorithmHandlers.Transactions, AlgorithmHandlers.Results, AlgorithmHandlers.RealTime, SystemHandlers.LeanManager, isolator.CancellationToken);
algorithmManager.Run(job, algorithm, synchronizer, AlgorithmHandlers.Transactions, AlgorithmHandlers.Results, AlgorithmHandlers.RealTime, SystemHandlers.LeanManager, isolator.CancellationTokenSource);
}
catch (Exception err)
{
Expand Down
3 changes: 2 additions & 1 deletion Tests/Brokerages/Paper/PaperBrokerageTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public void AppliesDividendsOnce()
var realTime = new BacktestingRealTimeHandler();
using var nullLeanManager = new AlgorithmManagerTests.NullLeanManager();

using var tokenSource = new CancellationTokenSource();
// run algorithm manager
manager.Run(job,
algorithm,
Expand All @@ -138,7 +139,7 @@ public void AppliesDividendsOnce()
results,
realTime,
nullLeanManager,
new CancellationToken()
tokenSource
);

var postDividendCash = algorithm.Portfolio.CashBook[Currencies.USD].Amount;
Expand Down
38 changes: 35 additions & 3 deletions Tests/Common/IsolatorTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
Expand All @@ -21,7 +21,7 @@

namespace QuantConnect.Tests.Common
{
[TestFixture]
[TestFixture, Parallelizable(ParallelScope.All)]
public class IsolatorTests
{
[Test]
Expand All @@ -45,6 +45,38 @@ public void WorksCorrectlyUsingWorker()
}
}

[Test]
public void Cancellation()
{
var isolator = new Isolator();
var executed = false;
var ended = false;
var canceled = false;
var result = false;
isolator.CancellationTokenSource.CancelAfter(TimeSpan.FromMilliseconds(100));
try
{
result = isolator.ExecuteWithTimeLimit(
TimeSpan.FromSeconds(5),
() => {
executed = true;
Thread.Sleep(5000);
ended = true;
},
5000,
sleepIntervalMillis: 10
);
}
catch (OperationCanceledException)
{
canceled = true;
}
Assert.IsTrue(canceled);
Assert.IsFalse(result);
Assert.IsTrue(executed);
Assert.IsFalse(ended);
}

[TestCase(Language.Python, true)]
[TestCase(Language.Python, false)]
[TestCase(Language.CSharp, true)]
Expand Down Expand Up @@ -98,4 +130,4 @@ private class TestWorkerThread : WorkerThread

}
}
}
}
4 changes: 2 additions & 2 deletions Tests/Engine/AlgorithmManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ public void TestAlgorithmManagerSpeed()
var results = new BacktestingResultHandler();
var realtime = new BacktestingRealTimeHandler();
using var leanManager = new NullLeanManager();
var token = new CancellationToken();
var nullSynchronizer = new NullSynchronizer(algorithm);

algorithm.Initialize();
Expand All @@ -136,7 +135,8 @@ public void TestAlgorithmManagerSpeed()

Log.Trace("Starting algorithm manager loop to process " + nullSynchronizer.Count + " time slices");
var sw = Stopwatch.StartNew();
algorithmManager.Run(job, algorithm, nullSynchronizer, transactions, results, realtime, leanManager, token);
using var tokenSource = new CancellationTokenSource();
algorithmManager.Run(job, algorithm, nullSynchronizer, transactions, results, realtime, leanManager, tokenSource);
sw.Stop();

realtime.Exit();
Expand Down

0 comments on commit f2f1d06

Please sign in to comment.