|
| 1 | +import time |
| 2 | +import ast |
| 3 | +import sqlite3 |
| 4 | +import pandas as pd |
| 5 | +from typing import List |
| 6 | +from polygon import WebSocketClient, STOCKS_CLUSTER, CRYPTO_CLUSTER, FOREX_CLUSTER |
| 7 | + |
| 8 | + |
| 9 | +def my_custom_process_message(messages: List[str]): |
| 10 | + """ |
| 11 | + Custom processing function for incoming streaming messages. |
| 12 | + """ |
| 13 | + def add_message_to_list(message): |
| 14 | + """ |
| 15 | + Simple function that parses dict objects from incoming message. |
| 16 | + """ |
| 17 | + messages.append(ast.literal_eval(message)) |
| 18 | + |
| 19 | + return add_message_to_list |
| 20 | + |
| 21 | + |
| 22 | +def main(waiting_time = seconds): |
| 23 | + """ |
| 24 | + Main function which connects to live stream data, and saves incoming data over |
| 25 | + some pre-determined time in an sqlite database. |
| 26 | + """ |
| 27 | + key = 'YOUR-API-KEY-HERE' |
| 28 | + messages = [] |
| 29 | + #my_client = WebSocketClient(STOCKS_CLUSTER, key, my_custom_process_message(messages)) |
| 30 | + my_client = WebSocketClient(CRYPTO_CLUSTER, key, my_custom_process_message(messages)) |
| 31 | + #my_client = WebSocketClient(FOREX_CLUSTER, key, my_custom_process_message(messages)) |
| 32 | + my_client.run_async() |
| 33 | + |
| 34 | + #my_client.subscribe("T.MSFT", "T.AAPL", "T.AMD", "T.NVDA") # Stock data |
| 35 | + my_client.subscribe("XA.BTC-USD", "XA.ETH-USD", "XA.LTC-USD") # Crypto data |
| 36 | + #my_client.subscribe("C.USD/CNH", "C.USD/EUR") # Forex data |
| 37 | + time.sleep(waiting_time) |
| 38 | + |
| 39 | + my_client.close_connection() |
| 40 | + |
| 41 | + df = pd.DataFrame(messages) |
| 42 | + |
| 43 | + df = df.iloc[5:, 0].to_frame() |
| 44 | + df.columns = ["data"] |
| 45 | + df["data"] = df["data"].astype("str") |
| 46 | + |
| 47 | + df = pd.json_normalize(df["data"].apply(lambda x : dict(eval(x)))) |
| 48 | + |
| 49 | + # export data to sqlite |
| 50 | + with sqlite3.connect("realtime_crypto.sqlite") as conn: |
| 51 | + df.to_sql("data", con=conn, if_exists="append", index=False) |
| 52 | + |
| 53 | + |
| 54 | +if __name__ == "__main__": |
| 55 | + main(waiting_time=60 * 420) |
0 commit comments