-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathupdate_server.py
442 lines (363 loc) · 14.3 KB
/
update_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
import socket
import psycopg2
from psycopg2 import sql
from collections import deque
import threading
import time
import select
import signal
import json
# Update Server details
HOST_OWN_SERVER = '0.0.0.0' # Bind to all interfaces
PORT_OWN_SERVER = 11111 # Port to listen on
# Replica Server details (replica server)
HOST_ANOTHER_SERVER = '192.168.12.224' # Change to Server B's address
PORT_ANOTHER_SERVER = 22222 # Change to Server B's port
# Main Server details
HOST_MAIN_SERVER = '192.168.12.140'
PORT_MAIN_SERVER = 12347
# Global variables
shutdown_flag = False
server_socket = None
replica_node_status = "RUNNING"
missing_queries = deque([])
missing_query_manager = None
# PostgreSQL connection details
DB_NAME = "autonomous_car_database_0"
DB_USER = "postgres"
DB_PASSWORD = "coriander0714"
DB_HOST = "localhost"
DB_PORT = "5432"
# TABLE_NAME = "test1_table"
# Log file name
LOG_FILE = "sql_log.txt"
LOG_DIFF_FILE = "sql_log_diff.txt"
# check this server is update server or replica server
is_update_server = True
update_server_host = HOST_OWN_SERVER
update_server_port = PORT_OWN_SERVER
replica_server_host = HOST_ANOTHER_SERVER
replica_server_port = PORT_ANOTHER_SERVER
# --- Database Functions ---
def connect_postgres(dbname="postgres"):
"""Connect to PostgreSQL using psycopg2 with the specified database."""
try:
connection = psycopg2.connect(user=DB_USER,
password=DB_PASSWORD,
host=DB_HOST,
dbname=dbname,
port=DB_PORT)
connection.autocommit = True
print(f"Connected to PostgreSQL database '{dbname}'.")
return connection
except Exception as error:
print(f"Failed to connect to PostgreSQL: {error}")
return None
def create_database_if_not_exists():
"""Create the database if it doesn't already exist."""
connection = connect_postgres()
if connection is None:
return
try:
cursor = connection.cursor()
cursor.execute(
f"SELECT 1 FROM pg_database WHERE datname = '{DB_NAME}';")
if not cursor.fetchone():
cursor.execute(f"CREATE DATABASE {DB_NAME};")
print(f"Database '{DB_NAME}' created successfully.")
else:
print(f"Database '{DB_NAME}' already exists.")
except Exception as error:
print(f"Failed to create database: {error}")
finally:
if connection:
cursor.close()
connection.close()
def delete_database(database_name):
"""
Drops the specified database if it exists.
"""
try:
# Connect to the default database (usually "postgres")
connection = psycopg2.connect(
user=DB_USER,
password=DB_PASSWORD,
host=DB_HOST,
port=DB_PORT,
dbname=
"postgres" # Connect to a database other than the one you want to drop
)
connection.autocommit = True # Enable auto-commit for DROP DATABASE
cursor = connection.cursor()
# Check if the database exists and drop it
drop_query = f"DROP DATABASE IF EXISTS {database_name};"
cursor.execute(drop_query)
print(f"Database '{database_name}' has been dropped (if it existed).")
except Exception as e:
print(f"Error dropping database '{database_name}': {e}")
finally:
if connection:
cursor.close()
connection.close()
def create_table():
"""Create the 'test1_table' table in the database if it doesn't exist."""
try:
conn = connect_postgres(DB_NAME)
cursor = conn.cursor()
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
id SERIAL PRIMARY KEY,
message TEXT NOT NULL,
received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""
cursor.execute(create_table_query)
print(f"Table '{TABLE_NAME}' is ready.")
except Exception as error:
print(f"Failed to create table: {error}")
finally:
if conn:
cursor.close()
conn.close()
def execute_sql_message(sql_message):
"""Execute the SQL message in the database."""
try:
# Connect to the specified PostgreSQL database
conn = connect_postgres(DB_NAME)
cursor = conn.cursor()
# Execute the provided SQL command
cursor.execute(sql_message)
conn.commit()
# If execution is successful, return success message with the SQL command
if sql_message.startswith("SELECT"):
res = cursor.fetchall()
res_string = "\n".join([
", ".join(f"{desc[0]}: {str(value)}"
for desc, value in zip(cursor.description, row))
for row in res
])
return res_string
return f"Ack: {sql_message}"
except Exception as error:
print(f"Error executing SQL: {error}")
# If execution fails, return failure message with the SQL command
return f"Fail."
finally:
# Ensure resources are cleaned up: close the cursor and the connection
if conn:
cursor.close()
conn.close()
# --- Logging Functions ---
def write_log_to_file(sql_message, file):
"""Write the SQL operation to the log file."""
try:
with open(file, "a") as log_file:
log_file.write(sql_message + "\n")
print("SQL operation logged successfully.")
except Exception as error:
print(f"Failed to write to log file: {error}")
# --- Replica Sync Functions ---
def check_replica_node_status():
while not shutdown_flag:
try:
replica_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
replica_socket.connect((HOST_ANOTHER_SERVER, PORT_ANOTHER_SERVER))
replica_socket.send("TEST".encode())
# Wait for acknowledgment
acknowledgment = replica_socket.recv(1024).decode()
replica_socket.close()
print(
f"Reconnected to Server B and received acknowledgment from Server B: {acknowledgment}"
)
break
except Exception as error:
print(f"Failed to sync with Server B: {error}")
time.sleep(5)
def sync_missing_queries():
global missing_queries
while missing_queries:
sql_message = missing_queries[0]
print(f"Resyncing with Server B: {sql_message}")
try:
replica_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
replica_socket.connect((HOST_ANOTHER_SERVER, PORT_ANOTHER_SERVER))
replica_socket.send(sql_message.encode())
# Wait for acknowledgment
acknowledgment = replica_socket.recv(1024).decode()
replica_socket.close()
# Remove the query from the queue
missing_queries.popleft()
print(
f"Received resync acknowledgment from Server B: {acknowledgment}"
)
except Exception as error:
print(f"Failed to resync with Server B: {error}")
break
def manage_missing_queries():
global missing_queries, replica_node_status
while replica_node_status == "DOWN":
# Check the status of the replica node
check_replica_node_status()
with open(LOG_DIFF_FILE, "r") as log_file:
lines = log_file.readlines()
for line in lines:
missing_queries.append(line.strip())
replica_node_status = "RECOVERING"
# resync missing queries with server B
sync_missing_queries()
try:
with open(LOG_DIFF_FILE, "w") as log_file:
for sql_message in missing_queries:
log_file.write(sql_message + "\n")
print("SQL operation diff logged successfully.")
except Exception as error:
print(f"Failed to write to log file: {error}")
# check if all queries are synced
if not missing_queries:
replica_node_status = "RUNNING"
print("All missing queries synced with Server B.")
else:
replica_node_status = "DOWN"
missing_queries.clear()
def sync_with_replica_server(sql_message):
"""Send the SQL message to replica server B and wait for acknowledgment."""
global missing_queries, replica_node_status, missing_query_manager
if replica_node_status == "DOWN":
write_log_to_file(sql_message, LOG_DIFF_FILE)
print(f"Server B is down.")
return "DOWN"
elif replica_node_status == "RECOVERING":
missing_queries.append(sql_message)
print("Recovering with Server B...")
return "RECOVERING"
else:
try:
replica_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
replica_socket.connect((HOST_ANOTHER_SERVER, PORT_ANOTHER_SERVER))
replica_socket.send(sql_message.encode())
# Wait for acknowledgment
acknowledgment = replica_socket.recv(1024).decode()
replica_socket.close()
print(f"Received acknowledgment from Server B: {acknowledgment}")
replica_node_status = "RUNNING"
return acknowledgment
except Exception as error:
replica_node_status = "DOWN"
write_log_to_file(sql_message, LOG_DIFF_FILE)
missing_query_manager = threading.Thread(
target=manage_missing_queries)
missing_query_manager.start()
print(f"Failed to sync with Server B: {error}")
return f"Failed: {error}"
# --- SQL Message Processing ---
def process_sql_message(sql_message):
"""
Process the SQL message:
- Log to the local file
- Execute the SQL message
- Sync with Server B
- Return response to the client
"""
# Step 1: Execute in local database
response = execute_sql_message(sql_message)
if not response.startswith("Fail"):
# Step 2: Write to log file
write_log_to_file(sql_message, LOG_FILE)
return response
def handle_sigint(signum, frame):
global shutdown_flag, server_socket
print("Received SIGINT. Shutting down...")
shutdown_flag = True
if server_socket:
print("Closing server socket and releasing port...")
server_socket.close()
def start_server():
create_database_if_not_exists()
global shutdown_flag
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setblocking(False)
server_socket.bind((HOST_OWN_SERVER, PORT_OWN_SERVER))
server_socket.listen(5)
print(f"Server listening on {server_socket.getsockname()}")
poller = select.poll()
poller.register(server_socket, select.POLLIN)
# fd_to_socket = {server_socket.fileno(): server_socket}
signal.signal(signal.SIGINT, handle_sigint)
registered_sockets = {}
try:
while not shutdown_flag:
events = poller.poll(1)
for fileno, event in events:
if fileno == server_socket.fileno():
client_socket, client_address = server_socket.accept()
print(f"Connection from {client_address}")
client_socket.setblocking(False)
poller.register(client_socket, select.POLLIN)
registered_sockets[client_socket] = True
elif event == select.POLLIN:
print("Ready to handle client request")
client_socket = socket.fromfd(fileno, socket.AF_INET,
socket.SOCK_STREAM)
handle_client_request(client_socket)
# Safe unregistration and cleanup
if client_socket in registered_sockets and registered_sockets[
client_socket]:
poller.unregister(client_socket)
registered_sockets[
client_socket] = False # Mark as unregistered
client_socket.close()
except KeyboardInterrupt:
print("Shutting down server.")
finally:
server_socket.close()
def handle_client_request(sock):
"""
Handle a single client request:
- Receive SQL message from the client.
- Process the SQL message (execution and logging).
- Sync with the replica server if the request is from the main server.
- Send a response back to the client who made the request.
"""
try:
# Receive data from the client
data = sock.recv(1024).decode()
print(data)
if not data:
raise ConnectionResetError("Client disconnected")
if data.startswith("Fail") or data.startswith(
"Ack") or data.startswith("TEST"):
print(f"Received Message: {data}")
sock.send(data.encode())
else:
print(f"Received Query: {data}")
# Get client's IP and port
client_ip, client_port = sock.getpeername()
print(f"Request from IP: {client_ip}, Port: {client_port}")
# Process the SQL message
response = process_sql_message(data)
# Check if the request is from the main server
if client_ip == HOST_MAIN_SERVER and not data.startswith("SELECT"):
print(
f"Request from main server ({client_ip}), performing sync..."
)
# Sync with the replica server and include sync status in the response
sync_status = sync_with_replica_server(data)
response += f" | Sync: {sync_status}"
else:
print(
f"Request from other client ({client_ip}), no sync needed."
)
# Send the response back to the requesting client
print("\n====================================\n")
sock.send(response.encode())
except Exception as e:
print(f"Error handling client request: {e}")
# Send error message back to the client
error_response = f"Error: {str(e)}"
sock.send(error_response.encode())
finally:
# Always close the socket after handling the request
sock.close()
# --- Main Execution ---
if __name__ == "__main__":
start_server()