Skip to content

Commit

Permalink
Updated main.py
Browse files Browse the repository at this point in the history
Fixed lots of errors and bad logic
Added error handling for any type of incorrect command usage
Added info command
  • Loading branch information
ThatSINEWAVE authored Mar 7, 2024
1 parent 6e64cd1 commit 03b355a
Showing 1 changed file with 147 additions and 123 deletions.
270 changes: 147 additions & 123 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import sys
import itertools
import os
import queue


def load_config():
Expand All @@ -24,7 +23,7 @@ def load_config():
stop_event = threading.Event()
mode = "all" # Default mode is 'all'
show_timestamps = True
command_queue = queue.Queue()
selected_parts_names = []
welcome_art = r"""
_____ _____ _ _ ______ __ __ __ __ ______ _ _____
/ ____||_ _|| \ | || ____|\ \ / //\\ \ / /| ____|( )/ ____|
Expand All @@ -38,59 +37,36 @@ def load_config():
| | | | | |/ __|| __| / _ \ | '_ ` _ \ | _ / | ___/
| |____ | |_| |\__ \| |_ | (_) || | | | | || | \ \ | |
\_____| \__,_||___/ \__| \___/ |_| |_| |_||_| \_\|_|
"""


def command_input_thread():
while True:
cmd = input("[CustomRP] Enter command: ").strip().lower()
command_queue.put(cmd)
def custom_print(message):
"""Prints a message with a timestamp if enabled."""
timestamp = time.strftime("[%Y-%m-%d %H:%M:%S]", time.localtime())
if show_timestamps:
print(message.replace("{TIME_STAMP}", timestamp))
else:
print(message.replace("{TIME_STAMP} ", ""))


def clear_console():
os.system('cls' if os.name == 'nt' else 'clear')


def animated_countdown(duration, prefix=""):
global stop_event, command_queue
symbols = itertools.cycle(['|', '/', '-', '\\'])
end_time = time.time() + duration

while time.time() < end_time and not stop_event.is_set():
while time.time() < end_time:
sys.stdout.write(f"\r{prefix} {next(symbols)} Remaining: {int(end_time - time.time())}s ")
sys.stdout.flush()

# Check for commands more frequently within the countdown
for _ in range(10): # Split the sleep into smaller intervals for responsiveness
time.sleep(0.01)

# Check for new commands in the command queue
try:
cmd = command_queue.get_nowait()
if cmd == "stop":
clear_console()
print(welcome_art)
print("[CustomRP] Stopping Rich Presence...")
stop_event.set()
return # Exit the countdown early
else:
clear_console()
print(welcome_art)
print("[CustomRP] Rich Presence is active. Only 'stop' command is accepted.")
except queue.Empty:
# Queue is empty, no commands to process
pass

if stop_event.is_set():
return # Check if stop_event has been set to exit early

sys.stdout.write('\r[CustomRP] Done! \n')
time.sleep(0.1)
sys.stdout.write('\rDone! \n')


def update_presence(application_id, message_sets, timer_interval, presence_mode):
global stop_event
global stop_event, selected_parts_names
print("[CustomRP] Attempting to connect to Discord RPC...")
RPC = Presence(client_id=application_id)
try:
Expand All @@ -99,64 +75,62 @@ def update_presence(application_id, message_sets, timer_interval, presence_mode)
except Exception as e:
print(f"[CustomRP] Could not connect to Discord RPC: {e}")
return

# Determine which message sets to use based on mode
if presence_mode == "multi":
active_message_sets = [msg_set for msg_set in message_sets if msg_set.get("name") in selected_parts_names]
elif presence_mode == "single":
active_message_sets = [msg_set for msg_set in message_sets if msg_set.get("name") in selected_parts_names]
else: # "all"
active_message_sets = message_sets

index = 0
animation_symbols = itertools.cycle(['|', '/', '-', '\\'])
while not stop_event.is_set():
clear_console()
print(welcome_art)
# This check ensures we loop through message_sets correctly in 'all' and 'multi' modes
if index >= len(message_sets):
index = 0 # Reset index if out of range

current_set = message_sets[index]
set_name = current_set.get('name', f'Set {index + 1}')
if index >= len(active_message_sets):
index = 0 # Loop back to the first message set

# Constructs kwargs for RPC.update()
update_kwargs = {
"state": current_set["state"]["text"] if "state" in current_set and current_set["state"].get("enabled",
False) else "",
"details": current_set["details"]["text"] if "details" in current_set and current_set["details"].get(
"enabled", False) else "",
"large_image": current_set["large_image"]["key"] if "large_image" in current_set and current_set[
"large_image"].get("enabled", False) else "",
"small_image": current_set["small_image"]["key"] if "small_image" in current_set and current_set[
"small_image"].get("enabled", False) else "",
"large_text": current_set["large_text"]["text"] if "large_text" in current_set and current_set[
"large_text"].get("enabled", False) else "",
"small_text": current_set["small_text"]["text"] if "small_text" in current_set and current_set[
"small_text"].get("enabled", False) else "",
"buttons": [{"label": button["label"], "url": button["url"]} for button in current_set.get("buttons", []) if
button.get("enabled", True)]
}
if "startTimestamp" in current_set and current_set["startTimestamp"].get("enabled", False):
update_kwargs["start"] = current_set["startTimestamp"]["time"]
if "endTimestamp" in current_set and current_set["endTimestamp"].get("enabled", False):
update_kwargs["end"] = current_set["endTimestamp"]["time"]
if "party_size" in current_set and current_set["party_size"].get("enabled", False):
update_kwargs["party_id"] = "party_" + str(index)
update_kwargs["party_size"] = [current_set["party_size"]["current_size"],
current_set["party_size"]["max_size"]]
current_set = active_message_sets[index]
set_name = current_set.get('name', f'Set {index + 1}')

update_kwargs = build_update_kwargs(current_set)
try:
RPC.update(**update_kwargs)
symbol = next(animation_symbols)
print(f"[CustomRP] Current presence: '{set_name}' {symbol}")
# Increment the index for 'all' and 'multi' modes to cycle through message sets
if presence_mode in ["all", "multi"]:
index += 1
custom_print(f"[CustomRP] Current presence: '{set_name}' {symbol}")
animated_countdown(timer_interval, prefix=f"[CustomRP] Next update in:")
index += 1
except Exception as e:
print(f"[CustomRP] Failed to update Discord presence: {e}")
break # Exit the loop on failure to update presence
break

RPC.clear()
RPC.close()
print("[CustomRP] Discord RPC connection closed.")
stop_event.clear()


def build_update_kwargs(current_set):
update_kwargs = {
"state": current_set["state"]["text"] if "state" in current_set and current_set["state"].get("enabled", False) else None,
"details": current_set["details"]["text"] if "details" in current_set and current_set["details"].get("enabled", False) else None,
"start": current_set.get("startTimestamp", {}).get("time") if current_set.get("startTimestamp", {}).get("enabled", False) else None,
"end": current_set.get("endTimestamp", {}).get("time") if current_set.get("endTimestamp", {}).get("enabled", False) else None,
"large_image": current_set.get("large_image", {}).get("key") if current_set.get("large_image", {}).get("enabled", False) else None,
"small_image": current_set.get("small_image", {}).get("key") if current_set.get("small_image", {}).get("enabled", False) else None,
"large_text": current_set.get("large_text", {}).get("text") if current_set.get("large_text", {}).get("enabled", False) else None,
"small_text": current_set.get("small_text", {}).get("text") if current_set.get("small_text", {}).get("enabled", False) else None,
"buttons": [{"label": button["label"], "url": button["url"]} for button in current_set.get("buttons", []) if button.get("enabled", True)] if current_set.get("buttons") else None
}
# Remove keys with None values
return {k: v for k, v in update_kwargs.items() if v is not None}


def handle_command(command):
global config, presence_thread, stop_event, mode # Declare 'mode' here along with other globals
global config, presence_thread, stop_event, mode, selected_parts_names # Declare 'mode' here along with other globals

clear_console() # Clear the console at the beginning of handling any command
print(welcome_art) # Display the welcome art each time a command is handled
Expand All @@ -173,6 +147,7 @@ def handle_command(command):
print("[CustomRP] timer <seconds> - Sets the update interval for the presence.")
print("[CustomRP] appid <application_id> - Updates the application ID.")
print("[CustomRP] help - You are here, this is the help command :)")
print("[CustomRP] info - Information about current setup.")
print("[CustomRP] about - Provides information about this script.")
print("[CustomRP] quit - Exits the script.")

Expand All @@ -189,20 +164,21 @@ def handle_command(command):

stop_event.clear() # Reset the stop event for the new thread

print("[CustomRP] Starting Discord Rich Presence in '{}' mode.".format(mode))
custom_print("[CustomRP] Starting Discord Rich Presence in '{}' mode.".format(mode))
presence_thread = threading.Thread(target=update_presence, args=(
config["application_id"], config["message_sets"], config["timer_interval"], mode))
presence_thread.start()

elif command == "stop":
if presence_thread is not None:
if presence_thread is None or not presence_thread.is_alive():
print("[CustomRP] Cannot stop the Rich Presence because it is not currently running.")
print("[CustomRP] Please type 'help' to view all commands.")
else:
stop_event.set()
if presence_thread.is_alive():
presence_thread.join()
presence_thread.join()
presence_thread = None
print("[CustomRP] Stopping Rich Presence...")
print("[CustomRP] Discord RPC connection closed.")
print("[CustomRP] Enter command: ")
print("[CustomRP] Discord Rich Presence has been stopped.")
print("[CustomRP] Please wait a second for Discord to update.")

elif command_name == "timer":
if len(command_args) == 1:
Expand Down Expand Up @@ -240,54 +216,102 @@ def handle_command(command):
time.sleep(2)
clear_console()
sys.exit(0)

elif command == "info":
print("[CustomRP] General Information:")
print(f"[CustomRP] App ID: {config['application_id']}")
print(f"[CustomRP] Current Mode: {mode.title()}")
if mode != "all":
used_message_sets = ', '.join(selected_parts_names) if selected_parts_names else 'None'
print(f"[CustomRP] Message Sets Used: {used_message_sets}")
else:
print("[CustomRP] Message Sets Used: All")
print(f"[CustomRP] Timer Interval: {config['timer_interval']} seconds")

all_message_sets = ", ".join([msg_set.get('name') for msg_set in config["message_sets"]])
print(f"[CustomRP] All Message Sets in Config: {all_message_sets}")

elif command.startswith("mode"):
mode_args = command.split(maxsplit=1)
if len(mode_args) < 2:
print("[CustomRP] Please select a mode: 'all', 'single', or 'multi'.")
print(
"[CustomRP] Use 'mode all', 'mode single', or type 'mode multi' and enter names of the parts afterwards.")
# Inform the user about the current mode and selected parts if no mode is specified
if len(mode_args) == 1:
current_mode_info = f"[CustomRP] Currently in mode '{mode}'."
if mode == "all":
current_mode_info += " Using all message sets."
elif selected_parts_names:
current_mode_info += f" Using message sets: {', '.join(selected_parts_names)}."
else:
current_mode_info += " No specific message sets selected."
print(current_mode_info)
print("[CustomRP] Please specify a mode: 'all', 'single', or 'multi'.")
return
new_mode = mode_args[1]
if new_mode not in ["all", "single", "multi"]:
print("[CustomRP] Invalid mode specified. Use 'all', 'single', or 'multi'.")
return
_, selected_mode = mode_args
if selected_mode == "all" or selected_mode == "single":
mode = selected_mode
print(f"[CustomRP] Mode set to {mode}.")
if mode == "single":
message_name = input("[CustomRP] Enter the name of the part you want to use: ").strip()
selected_sets = [part for part in config["message_sets"] if part.get("name") == message_name]
if selected_sets:
config["message_sets"] = selected_sets
print(f"[CustomRP] Mode set to single, using part: {message_name}")
else:
print("[CustomRP] Specified part not found in config.json.")
elif selected_mode == "multi":
names_input = input(
"[CustomRP] Enter the names of the parts you want to use, separated by a comma: ").strip()
names = [name.strip() for name in names_input.split(',')]
selected_sets = [part for part in config["message_sets"] if part.get("name") in names]
if selected_sets:
config["message_sets"] = selected_sets
mode = "multi"
print(f"[CustomRP] Mode set to multi, using parts: {', '.join(names)}")
if new_mode == "all":
mode = "all"
selected_parts_names.clear()
print("[CustomRP] Mode set to 'all'. Using all message sets.")
else:
# Display available parts from config before asking for user input
available_parts = [msg_set.get("name") for msg_set in config["message_sets"]]
print("[CustomRP] Available message sets: " + ", ".join(available_parts))
if selected_parts_names:
print(f"[CustomRP] Previously selected message sets: {', '.join(selected_parts_names)}")
selected_names_input = input(
"[CustomRP] Enter the names of the message sets you want to use, separated by a comma: ").strip()
selected_parts = [name.strip() for name in selected_names_input.split(',')]
valid_parts, invalid_parts = verify_parts(selected_parts, config["message_sets"])
if invalid_parts:
print(f"[CustomRP] The following message sets do not exist: {', '.join(invalid_parts)}.")
print("[CustomRP] Please enter valid part names.")
return
if new_mode == "single" and len(valid_parts) > 1:
print("[CustomRP] 'single' mode allows only one part. Please enter only one valid part name.")
return
else:
print("[CustomRP] Specified parts not found in config.json.")
selected_parts_names = valid_parts if new_mode == "multi" else valid_parts[:1]
mode = new_mode
print(f"[CustomRP] Mode set to '{mode}'.")
if selected_parts_names:
print(f"[CustomRP] Using message sets: {', '.join(selected_parts_names)}.")
else:
print(f"[CustomRP] '{command_name}' is not a known command. Please try again.")
print("[CustomRP] Type 'help' for a list of commands.")
print("[CustomRP] Unknown command. Type 'help' for a list of commands.")


def verify_parts(parts_list, message_sets):
valid_parts = []
invalid_parts = []
available_part_names = [msg_set.get("name") for msg_set in message_sets]

for part in parts_list:
if part in available_part_names:
valid_parts.append(part)
else:
invalid_parts.append(part)

return valid_parts, invalid_parts


if __name__ == "__main__":
clear_console()
print(welcome_art)
print("[CustomRP] Welcome to ThatSINEWAVE's custom rich presence script!")
print("[CustomRP] To get started, please type 'help'")

# Start the command input thread
input_thread = threading.Thread(target=command_input_thread, daemon=True)
input_thread.start()

while True:
if not command_queue.empty():
user_input = command_queue.get()
try:
clear_console()
print(welcome_art)
print("[CustomRP] Welcome to ThatSINEWAVE's custom rich presence script!")
print("[CustomRP] To get started, please type 'help'")

while True:
user_input = input("[CustomRP] Enter command: ").strip().lower()
handle_command(user_input)
else:
time.sleep(0.1) # Adjust as necessary for responsiveness vs CPU usage
except KeyboardInterrupt:
clear_console()
print(welcome_art)
print("\n[CustomRP] Interrupt received, shutting down...")
if presence_thread and presence_thread.is_alive():
stop_event.set()
presence_thread.join()
print("[CustomRP] Cleaned up and exiting. Bye!")
time.sleep(2)
clear_console()
sys.exit(0)

0 comments on commit 03b355a

Please sign in to comment.