Skip to content

Commit

Permalink
Add aggregator reader (QuantConnect#5942)
Browse files Browse the repository at this point in the history
* Add BaseDataCollectionAggregatorReader

- Add enumerable for BaseDataCollection
- Add BaseDataCollectionAggregatorReader
- Clean up CollectionSubscriptionDataSourceReader

* Address reviews

- Add new FileFormat that will be handled by the new Reader
- Adding unit tests for the new BaseDataCollectionAggregatorReader
- Some DataSourceReader duplication cleaning up

* Minor adjustment after some live trading tests
  • Loading branch information
Martin-Molinero authored Sep 27, 2021
1 parent 0f7dfe8 commit d739873
Show file tree
Hide file tree
Showing 14 changed files with 291 additions and 173 deletions.
2 changes: 1 addition & 1 deletion Common/Data/Custom/Tiingo/TiingoPrice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public override SubscriptionDataSource GetSource(SubscriptionDataConfig config,

var tiingoTicker = TiingoSymbolMapper.GetTiingoTicker(config.Symbol);
var source = Invariant($"https://api.tiingo.com/tiingo/daily/{tiingoTicker}/prices?startDate={startDate:yyyy-MM-dd}&token={Tiingo.AuthCode}");
return new SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile, FileFormat.Collection);
return new SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile, FileFormat.UnfoldingCollection);
}

/// <summary>
Expand Down
14 changes: 11 additions & 3 deletions Common/Data/FileFormat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,19 @@ public enum FileFormat
/// <summary>
/// Reader returns a BaseDataCollection object.
/// </summary>
Collection,
/// <remarks>Lean will unfold the collection and consume it as individual data points</remarks>
UnfoldingCollection,

/// <summary>
/// Data stored using an intermediate index source
/// </summary>
Index
Index,

/// <summary>
/// Data type inherits from BaseDataCollection.
/// Reader method can return a non BaseDataCollection type which will be folded, based on unique time,
/// into an instance of the data type.
/// </summary>
FoldingCollection
}
}
}
23 changes: 21 additions & 2 deletions Common/Data/UniverseSelection/BaseDataCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@
*/

using System;
using System.Collections.Generic;
using System.Linq;
using System.Collections;
using System.Collections.Generic;

namespace QuantConnect.Data.UniverseSelection
{
/// <summary>
/// This type exists for transport of data as a single packet
/// </summary>
public class BaseDataCollection : BaseData
public class BaseDataCollection : BaseData, IEnumerable<BaseData>
{
private DateTime _endTime;

Expand Down Expand Up @@ -98,5 +99,23 @@ public override BaseData Clone()
{
return new BaseDataCollection(Time, EndTime, Symbol, Data);
}

/// <summary>
/// Returns an IEnumerator for this enumerable Object. The enumerator provides
/// a simple way to access all the contents of a collection.
/// </summary>
public IEnumerator<BaseData> GetEnumerator()
{
return (Data ?? Enumerable.Empty<BaseData>()).GetEnumerator();
}

/// <summary>
/// Returns an IEnumerator for this enumerable Object. The enumerator provides
/// a simple way to access all the contents of a collection.
/// </summary>
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}
}
85 changes: 85 additions & 0 deletions Engine/DataFeeds/BaseDataCollectionAggregatorReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

using System;
using QuantConnect.Data;
using QuantConnect.Interfaces;
using System.Collections.Generic;
using QuantConnect.Data.UniverseSelection;

namespace QuantConnect.Lean.Engine.DataFeeds
{
/// <summary>
/// Data source reader that will aggregate data points into a base data collection
/// </summary>
public class BaseDataCollectionAggregatorReader : TextSubscriptionDataSourceReader
{
private readonly Type _collectionType;
private BaseDataCollection _collection;

/// <summary>
/// Initializes a new instance of the <see cref="TextSubscriptionDataSourceReader"/> class
/// </summary>
/// <param name="dataCacheProvider">This provider caches files if needed</param>
/// <param name="config">The subscription's configuration</param>
/// <param name="date">The date this factory was produced to read data for</param>
/// <param name="isLiveMode">True if we're in live mode, false for backtesting</param>
public BaseDataCollectionAggregatorReader(IDataCacheProvider dataCacheProvider, SubscriptionDataConfig config, DateTime date, bool isLiveMode)
: base(dataCacheProvider, config, date, isLiveMode)
{
_collectionType = config.Type;
}

/// <summary>
/// Reads the specified <paramref name="source"/>
/// </summary>
/// <param name="source">The source to be read</param>
/// <returns>An <see cref="IEnumerable{BaseData}"/> that contains the data in the source</returns>
public override IEnumerable<BaseData> Read(SubscriptionDataSource source)
{
foreach (var point in base.Read(source))
{
if (point is BaseDataCollection)
{
yield return point;
}
else
{
if (_collection != null && _collection.EndTime != point.EndTime)
{
yield return _collection;
_collection = null;
}

if (_collection == null)
{
_collection = (BaseDataCollection)Activator.CreateInstance(_collectionType);
_collection.Time = point.Time;
_collection.Symbol = point.Symbol;
_collection.EndTime = point.EndTime;
}
_collection.Data.Add(point);
}
}

if (_collection != null)
{
yield return _collection;
_collection = null;
}
}
}
}
57 changes: 36 additions & 21 deletions Engine/DataFeeds/BaseSubscriptionDataSourceReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
*/

using System;
using System.Collections.Generic;
using System.ComponentModel;
using QuantConnect.Data;
using System.ComponentModel;
using QuantConnect.Interfaces;
using System.Collections.Generic;
using QuantConnect.Lean.Engine.DataFeeds.Transport;

namespace QuantConnect.Lean.Engine.DataFeeds
Expand All @@ -41,7 +41,7 @@ public abstract class BaseSubscriptionDataSourceReader : ISubscriptionDataSource
/// Event fired when the specified source is considered invalid, this may
/// be from a missing file or failure to download a remote source
/// </summary>
public abstract event EventHandler<InvalidSourceEventArgs> InvalidSource;
public event EventHandler<InvalidSourceEventArgs> InvalidSource;

/// <summary>
/// Creates a new instance
Expand All @@ -66,40 +66,55 @@ protected BaseSubscriptionDataSourceReader(IDataCacheProvider dataCacheProvider,
/// <returns>A new instance of <see cref="IStreamReader"/> to read the source, or null if there was an error</returns>
protected IStreamReader CreateStreamReader(SubscriptionDataSource subscriptionDataSource)
{
IStreamReader reader;
switch (subscriptionDataSource.TransportMedium)
IStreamReader reader = null;
try
{
case SubscriptionTransportMedium.LocalFile:
reader = HandleLocalFileSource(subscriptionDataSource);
break;
switch (subscriptionDataSource.TransportMedium)
{
case SubscriptionTransportMedium.LocalFile:
reader = new LocalFileSubscriptionStreamReader(DataCacheProvider, subscriptionDataSource.Source);
break;

case SubscriptionTransportMedium.RemoteFile:
reader = HandleRemoteSourceFile(subscriptionDataSource);
break;
case SubscriptionTransportMedium.RemoteFile:
reader = HandleRemoteSourceFile(subscriptionDataSource);
break;

case SubscriptionTransportMedium.Rest:
reader = new RestSubscriptionStreamReader(subscriptionDataSource.Source, subscriptionDataSource.Headers, IsLiveMode);
break;
case SubscriptionTransportMedium.Rest:
reader = new RestSubscriptionStreamReader(subscriptionDataSource.Source, subscriptionDataSource.Headers, IsLiveMode);
break;

default:
throw new InvalidEnumArgumentException("Unexpected SubscriptionTransportMedium specified: " + subscriptionDataSource.TransportMedium);
default:
throw new InvalidEnumArgumentException("Unexpected SubscriptionTransportMedium specified: " + subscriptionDataSource.TransportMedium);
}
}
catch (Exception e)
{
OnInvalidSource(subscriptionDataSource, e);
return reader;
}

if (reader == null || reader.EndOfStream)
{
OnInvalidSource(subscriptionDataSource, new Exception($"The reader was empty for source: ${subscriptionDataSource.Source}"));
return null;
}
return reader;
}

/// <summary>
/// Opens up an IStreamReader for a local file source
/// Event invocator for the <see cref="InvalidSource"/> event
/// </summary>
protected IStreamReader HandleLocalFileSource(SubscriptionDataSource source)
/// <param name="source">The <see cref="SubscriptionDataSource"/> that was invalid</param>
/// <param name="exception">The exception if one was raised, otherwise null</param>
protected void OnInvalidSource(SubscriptionDataSource source, Exception exception)
{
// handles zip or text files
return new LocalFileSubscriptionStreamReader(DataCacheProvider, source.Source);
InvalidSource?.Invoke(this, new InvalidSourceEventArgs(source, exception));
}

/// <summary>
/// Opens up an IStreamReader for a remote file source
/// </summary>
protected IStreamReader HandleRemoteSourceFile(SubscriptionDataSource source)
private IStreamReader HandleRemoteSourceFile(SubscriptionDataSource source)
{
SubscriptionDataSourceReader.CheckRemoteFileCache();

Expand Down
Loading

0 comments on commit d739873

Please sign in to comment.