Skip to content

Commit

Permalink
Add Streaming improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
areed1192 committed Apr 26, 2020
1 parent 5dca2f8 commit a3aabf2
Show file tree
Hide file tree
Showing 8 changed files with 1,045 additions and 825 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,5 @@ pypirc
.pypirc

# don't track the json with state
*State.json
*State.json
tests/parse_level_two.py
59 changes: 52 additions & 7 deletions samples/api_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import config.credentials as config
from td.client import TDClient


# Create a new session
TDSession = TDClient(
client_id=config.CLIENT_ID,
Expand All @@ -18,13 +17,37 @@
TDStreamingClient = TDSession.create_streaming_session()

# Level One Quote
TDStreamingClient.level_one_quotes(symbols=["SPY", "IVV", "SDS", "SH", "SPXL", "SPXS", "SPXU", "SSO", "UPRO", "VOO"], fields=list(range(0,50)))
TDStreamingClient.level_one_quotes(symbols=["SPY", "IVV", "SDS", "SH"], fields=list(range(0,50)))

# Level One Option
TDStreamingClient.level_one_options(symbols=['AAPL_040920C115'], fields=list(range(0,42)))
TDStreamingClient.level_one_futures(symbols=['/ES'], fields=list(range(0,42)))

# Data Pipeline function
async def data_pipeline():
"""
This is an example of how to build a data pipeline,
using the library. A common scenario that would warrant
using a pipeline is taking data that is sent back to the stream
and processing it so it can be used in other programs or functions.
Generally speaking, you will need to wrap the operations that process
and handle the data inside an async function. The reason being is so
that you can await the return of data.
However, operations like creating the client, and subscribing to services
can be performed outside the async function. In the example below, we demonstrate
building the pipline, which is connecting to the websocket and logging in.
We then start the pipeline, which is where the services are subscribed to and data
begins streaming. The `start_pipeline()` will return the data as it comes in. From
there, we process the data however we choose.
Additionally, we can also see how to unsubscribe from a stream using logic and how
to close the socket mid-stream.
"""

data_response_count = 0
heartbeat_response_count = 0

# Build the Pipeline.
await TDStreamingClient.build_pipeline()
Expand All @@ -34,18 +57,40 @@ async def data_pipeline():

# Start the Pipeline.
data = await TDStreamingClient.start_pipeline()
# Grab the Data.

# Grab the Data, if there was any. Remember not every message will have `data.`
if 'data' in data:
data_content = data['data'][0]['content']

pprint.pprint(data_content)
print('='*80)

data_content = data['data'][0]['content']
pprint.pprint(data_content, indent=4)

# Here I can grab data as it comes in and do something with it.
if 'key' in data_content[0]:
print('Here is my key: {}'.format(data_content[0]['key']))

print('-'*80)
data_response_count += 1

# If we get a heartbeat notice, let's increment our counter.
elif 'notify' in data:
print(data['notify'][0])
heartbeat_response_count += 1

# Once we have 1 data responses, we can unsubscribe from a service.
if data_response_count == 1:
unsub = await TDStreamingClient.unsubscribe(service='LEVELONE_FUTURES')
data_response_count += 1
print('='*80)
print(unsub)
print('-'*80)

# Once we have 5 heartbeats, let's close the stream. Make sure to break the while loop.
# or else you will encounter an exception.
if heartbeat_response_count == 3:
await TDStreamingClient.close_stream()
break

# Run the pipeline.
asyncio.run(data_pipeline())
18 changes: 0 additions & 18 deletions samples/api_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,3 @@

# Stream it.
TDStreamingClient.stream()


'''
DEFINING CLOSE LOGIC
Closing the stream involves defining the number of seconds you want to keep it open. Right now,
the logic is basic but in future releases we will be able to specify specific times like during
market hours.
'''

# Let's keep the server open for only 10 seconds, so define the time in seconds.
keep_open_in_seconds = 10

# Call the streaming client, and set the logic.
TDStreamingClient.close_logic(run_duration=keep_open_in_seconds)

# # Start Streaming.
TDStreamingClient.stream()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

# I'm in alpha development still, so a compliant version number is a1.
# read this as MAJOR VERSION 0, MINOR VERSION 1, MAINTENANCE VERSION 0
version='0.2.4',
version='0.2.5',

# here is a simple description of the library, this will appear when
# someone searches for the library on https://pypi.org/search
Expand Down
Loading

0 comments on commit a3aabf2

Please sign in to comment.