-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathws.py
94 lines (87 loc) · 3.04 KB
/
ws.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# -*- coding: utf-8 -*-
import hashlib, json, datetime, subprocess, ssl, socket
import pandas as pd
from websocket import create_connection, WebSocketTimeoutException
from config import *
class ConnectionTimedOutException(Exception):
    """Exception that carries a message describing a timed-out connection.

    Nothing in the visible part of this module raises it; it is available
    for callers that want a dedicated timeout error type.

    :param message: Description of the failure. Stored unchanged on
        ``self.message``.
    """

    def __init__(self, message):
        # Forward to Exception so ``args`` is populated through the normal path.
        super().__init__(message)
        self.message = message

    def __str__(self):
        # ``__str__`` must return a real string; coerce so that passing a
        # non-string message (for example another exception) does not make
        # ``str(exc)`` raise TypeError.
        return str(self.message)
class AlgoLabSocket():
    """Thin wrapper around a TLS websocket connection.

    The connection target (``hostname``, ``socket_url``) and ``api_hostname``
    are module-level names that this file does not define itself; they are
    expected to come from the ``from config import *`` star import.
    """

    def __init__(self, api_key, hash, verbose=True, callback=None):
        """Store credentials and pre-compute the handshake headers.

        No network traffic happens here; call :meth:`connect` to open the
        socket.

        :param api_key: API key, sent in the ``APIKEY`` header.
        :param hash: Hash code returned by LoginUser; sent in the
            ``Authorization`` header and as ``token`` in every message.
        :param verbose: When true, :meth:`connect` prints progress messages.
        :param callback: Function the socket data is meant to be handed to.
            It is only stored here; no method of this class calls it.

        NOTE(review): the original docstring also listed message types
        (T: tick/price packet, D: depth packet, O: order status). They are
        not constructor parameters -- presumably they belong to the payload
        passed to :meth:`send`; confirm against the API documentation.
        """
        self.verbose = verbose
        self.callback = callback
        self.connected = False
        # The containers below are not read or written by any method of this
        # class; presumably callers populate them -- verify before relying on
        # their shape.
        self.arbitraj = {}
        self.thread_running = False
        self.kurum = {}
        self.hisse = {}
        self.df = pd.DataFrame(columns=["Date", "Hisse", "Yon", "Fiyat", "Lot", "Deger", "Usd", "Alici", "Satici"])
        self.usdtry = 0.0
        self.ws = None
        self.api_key = api_key
        self.hash = hash
        # The "Checker" header is the SHA-256 hex digest of
        # api_key + api_hostname + "/ws".
        self.data = self.api_key + api_hostname + "/ws"
        self.checker = hashlib.sha256(self.data.encode('utf-8')).hexdigest()
        self.request_time = datetime.datetime.now()
        self.headers = {
            "APIKEY": self.api_key,
            "Authorization": self.hash,
            "Checker": self.checker
        }

    def load_ciphers(self):
        """Return the first line printed by the ``openssl ciphers`` command.

        Runs the ``openssl`` executable as a subprocess, so it must be on the
        PATH. No other method of this class calls this helper.
        """
        output = subprocess.run(["openssl", "ciphers"], capture_output=True).stdout
        output_str = output.decode("utf-8")
        ciphers = output_str.strip().split("\n")
        return ciphers[0]

    def close(self):
        """Mark the wrapper as disconnected and close the websocket, if any.

        Safe to call repeatedly and when no connection was ever opened.
        """
        # Detach the handle first so the object is already in its
        # "disconnected" state even if closing the websocket fails.
        ws, self.ws = self.ws, None
        self.connected = False
        if ws is None:
            return
        try:
            # Actually release the connection instead of just dropping the
            # reference, which would leave the socket open.
            ws.close()
        except Exception:
            # Best effort: close() runs on error paths where the socket may
            # already be broken, so a failure here must not mask that error.
            pass

    def connect(self):
        """Open the TLS socket and perform the websocket handshake.

        :return: ``True`` when the connection is established, ``False`` when
            any step fails (the error is printed, not raised).
        """
        if self.verbose:
            print("Socket bağlantisi kuruluyor...")
        context = ssl.create_default_context()
        context.set_ciphers("DEFAULT")
        tcp_sock = None
        tls_sock = None
        try:
            tcp_sock = socket.create_connection((hostname, 443))
            tls_sock = context.wrap_socket(tcp_sock, server_hostname=hostname)
            self.ws = create_connection(socket_url, socket=tls_sock, header=self.headers)
            self.connected = True
        except Exception as e:
            # Release whatever was opened before the failure so a failed
            # attempt does not leave a socket behind. Closing an
            # already-closed or detached socket is harmless.
            for opened in (tls_sock, tcp_sock):
                if opened is not None:
                    try:
                        opened.close()
                    except OSError:
                        pass
            self.close()
            print(f"Socket Hatasi: {e}")
            return False
        if self.verbose:
            print("Socket bağlantisi başarili.")
        return self.connected

    def recv(self):
        """Receive one message from the websocket.

        :return: The received data; ``""`` when the read timed out
            (``WebSocketTimeoutException``); ``None`` on any other error, in
            which case the error is printed and the connection is closed.
        """
        try:
            data = self.ws.recv()
        except WebSocketTimeoutException:
            # A timeout is not fatal: report "nothing received" and keep the
            # connection open.
            data = ""
        except Exception as e:
            print("Recv Error:", e)
            data = None
            self.close()
        return data

    def send(self, d):
        """Send ``d`` as a JSON message with the auth token added.

        :param d: Dict of message fields. A ``"token"`` key in ``d``
            overrides the stored hash.
        :return: Whatever the websocket's ``send`` returns, or ``None`` on
            any error. On error the message is printed and the connection is
            closed -- this includes payloads that cannot be JSON-encoded.
        """
        try:
            # Token first so that keys from ``d`` take precedence.
            data = {"token": self.hash, **d}
            resp = self.ws.send(json.dumps(data))
        except Exception as e:
            print("Send Error:", e)
            resp = None
            self.close()
        return resp