-
Notifications
You must be signed in to change notification settings - Fork 163
/
Copy pathlisten_new_direct.py
119 lines (101 loc) · 4.5 KB
/
listen_new_direct.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import asyncio
import json
import websockets
import base58
import base64
import struct
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from config import WSS_ENDPOINT, PUMP_PROGRAM
# Load the IDL JSON file
with open('../idl/pump_fun_idl.json', 'r') as f:
idl = json.load(f)
# Extract the "create" instruction definition
create_instruction = next(instr for instr in idl['instructions'] if instr['name'] == 'create')
def parse_create_instruction(data):
if len(data) < 8:
return None
offset = 8
parsed_data = {}
# Parse fields based on CreateEvent structure
fields = [
('name', 'string'),
('symbol', 'string'),
('uri', 'string'),
('mint', 'publicKey'),
('bondingCurve', 'publicKey'),
('user', 'publicKey'),
]
try:
for field_name, field_type in fields:
if field_type == 'string':
length = struct.unpack('<I', data[offset:offset+4])[0]
offset += 4
value = data[offset:offset+length].decode('utf-8')
offset += length
elif field_type == 'publicKey':
value = base58.b58encode(data[offset:offset+32]).decode('utf-8')
offset += 32
parsed_data[field_name] = value
return parsed_data
except:
return None
def print_transaction_details(log_data):
print(f"Signature: {log_data.get('signature')}")
for log in log_data.get('logs', []):
if log.startswith("Program data:"):
try:
data = base58.b58decode(log.split(": ")[1]).decode('utf-8')
print(f"Data: {data}")
except:
pass
async def listen_for_new_tokens():
while True:
try:
async with websockets.connect(WSS_ENDPOINT) as websocket:
subscription_message = json.dumps({
"jsonrpc": "2.0",
"id": 1,
"method": "logsSubscribe",
"params": [
{"mentions": [str(PUMP_PROGRAM)]},
{"commitment": "processed"}
]
})
await websocket.send(subscription_message)
print(f"Listening for new token creations from program: {PUMP_PROGRAM}")
# Wait for subscription confirmation
response = await websocket.recv()
print(f"Subscription response: {response}")
while True:
try:
response = await websocket.recv()
data = json.loads(response)
if 'method' in data and data['method'] == 'logsNotification':
log_data = data['params']['result']['value']
logs = log_data.get('logs', [])
if any("Program log: Instruction: Create" in log for log in logs):
for log in logs:
if "Program data:" in log:
try:
encoded_data = log.split(": ")[1]
decoded_data = base64.b64decode(encoded_data)
parsed_data = parse_create_instruction(decoded_data)
if parsed_data and 'name' in parsed_data:
print("Signature:", log_data.get('signature'))
for key, value in parsed_data.items():
print(f"{key}: {value}")
print("##########################################################################################")
except Exception as e:
print(f"Failed to decode: {log}")
print(f"Error: {str(e)}")
except Exception as e:
print(f"An error occurred while processing message: {e}")
break
except Exception as e:
print(f"Connection error: {e}")
print("Reconnecting in 5 seconds...")
await asyncio.sleep(5)
if __name__ == "__main__":
asyncio.run(listen_for_new_tokens())