-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathktc_persisting.py
126 lines (104 loc) · 4.51 KB
/
ktc_persisting.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# KTC - Klipper Tool Changer code (v.2)
# Persistent storage of variables for the KTC
#
# Copyright (C) 2024 Andrei Ignat <[email protected]>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
#
# This is based on the Klipper save_variables.py to be able to
# save variables from KTC independent of other plugins.
# Using the default save_variables.py will cause conflicts when
# other plugins such as HappyHare try to reference the file again.
# This will use a different filename to avoid this incompatibility.
#
# This module will also only save the variables when needed and no more.
# This is to avoid excessive writes to the SD card and overhead on the system.
import os.path, ast, configparser, typing
# Only import these modules in Dev environment. Consult Dev_doc.md for more info.
if typing.TYPE_CHECKING:
from ...klipper.klippy import configfile
from ...klipper.klippy import klippy, reactor
from. import ktc_log
# Constant values moved here to avoid circular imports
# Path of the file the KTC variables are stored in. The leading "~" is
# expanded with os.path.expanduser() when KtcPersisting is constructed.
KTC_SAVE_VARIABLES_FILENAME = "~/ktc_variables.cfg"
# Interval, in seconds, between two runs of the periodic save timer
# registered by KtcPersisting.
KTC_SAVE_VARIABLES_DELAY = 10
class KtcPersisting:
    """Persistent storage of KTC variables in a dedicated variables file.

    The variables are kept in memory in ``self.content`` as
    ``{section: {name: value}}``. Changes only mark the content as pending
    (``self.ready_to_save``); a reactor timer that wakes up every
    ``KTC_SAVE_VARIABLES_DELAY`` seconds writes the file when something is
    pending, so several changes in a row result in a single write.

    NOTE: the file is read and written through ``configparser``, which
    lower-cases option (variable) names by default.
    """

    def __init__(self, config: 'configfile.ConfigWrapper'):
        """Load the stored variables and start the periodic save timer.

        The variables file is created empty when it does not exist yet.

        Raises:
            OSError: If the variables file cannot be created.
            Exception: If the existing variables file cannot be parsed.
        """
        self.printer: 'klippy.Printer' = config.get_printer()
        self.reactor: 'reactor.Reactor' = self.printer.get_reactor()
        self.log = typing.cast('ktc_log.KtcLog', self.printer.load_object(
            config, "ktc_log"
        ))

        self.filename = os.path.expanduser(KTC_SAVE_VARIABLES_FILENAME)
        self.content = {}
        self.ready_to_save = False

        # Set up timer to only save values when needed
        # and no more than once every 10 seconds to allow for
        # multiple changes and avoid excessive writes
        self.timer_save = self.reactor.register_timer(
            self._save_changes_timer_event,
            self.reactor.monotonic() + KTC_SAVE_VARIABLES_DELAY,
        )

        if not os.path.exists(self.filename):
            # Create an empty file so that load_content() has something to read.
            with open(self.filename, "w", encoding="utf-8"):
                pass
        self.load_content()

    def disconnect(self):
        """Stop the periodic save timer (used when Klipper shuts down)."""
        self.reactor.update_timer(self.timer_save, self.reactor.NEVER)

    def load_content(self):
        """Read the variables file and replace ``self.content`` with it.

        Every stored value is parsed with ``ast.literal_eval``.
        ``self.content`` is only replaced when the whole file parsed.

        Raises:
            Exception: If the file cannot be read or a value is not a
                valid Python literal. The original error is chained.
        """
        sections = {}
        # Interpolation is disabled so that values containing '%' are read
        # back exactly as they were written.
        varfile = configparser.ConfigParser(interpolation=None)
        try:
            # The file is written as UTF-8, so read it as UTF-8 as well.
            varfile.read(self.filename, encoding="utf-8")
            # Load variables for each section
            for section in varfile.sections():
                sections[section] = {
                    name: ast.literal_eval(val)
                    for name, val in varfile.items(section)
                }
        except Exception as e:
            msg = "Unable to parse existing KTC variable file: %s" % (self.filename,)
            raise Exception(msg) from e
        self.content = sections

    def save_variable(self, varname: str, value: str, section: str = "Variables",
                      force_save: bool = False):
        """Store a variable and mark the content as pending to be saved.

        Args:
            varname: Name of the variable.
            value: String holding a Python literal (e.g. ``"3"``,
                ``"'text'"``, ``"[1, 2]"``); it is parsed with
                ``ast.literal_eval`` and the parsed value is stored.
            section: Section the variable is stored in.
            force_save: When True the file is written immediately instead
                of at the next timer event.

        Raises:
            Exception: If ``value`` is not a valid Python literal. Nothing
                is stored in that case.
        """
        try:
            parsed = ast.literal_eval(value)
        except (ValueError, TypeError, SyntaxError) as e:
            # literal_eval raises SyntaxError (not ValueError) for input that
            # is not valid Python at all, so catch it as well.
            raise Exception("Unable to parse '%s' as a literal: %s" % (value, e)) from e

        self.content.setdefault(section, {})[varname] = parsed
        self.ready_to_save = True
        if force_save:
            self.force_save()

    def force_save(self):
        """Write the variables file right now, even if nothing is pending."""
        self.ready_to_save = True
        self._save_changes_timer_event(self.reactor.monotonic())

    def _write_content(self):
        """Serialize ``self.content`` and write it to ``self.filename``."""
        # Interpolation is disabled so that values containing '%' can be
        # stored; the default interpolation rejects them in set().
        varfile = configparser.ConfigParser(interpolation=None)
        for section, variables in sorted(self.content.items()):
            varfile.add_section(section)
            for name, val in sorted(variables.items()):
                varfile.set(section, name, repr(val))
        # The file is only opened (and truncated) once the whole content
        # has been serialized successfully.
        with open(self.filename, "w", encoding="utf-8") as f:
            varfile.write(f)

    def _save_changes_timer_event(self, eventtime):
        """Reactor timer callback: write the file if changes are pending.

        Args:
            eventtime: Reactor time at which the callback is run.

        Returns:
            The reactor time at which the timer should wake up next.

        Raises:
            Exception: Any error raised while writing is logged and
                re-raised; the content then stays marked as pending.
        """
        if self.ready_to_save:
            try:
                self._write_content()
            except Exception as e:
                self.log.debug("_save_changes_timer_event:Exception: %s" % (str(e)))
                raise
            # Only clear the flag once the data really is on disk.
            self.ready_to_save = False
        return eventtime + KTC_SAVE_VARIABLES_DELAY

    def get_status(self, eventtime=None):  # pylint: disable=unused-argument
        """Return the status dict with the stored variables under "content"."""
        status = {
            "content": self.content,
        }
        return status
def load_config(config):
    """Module entry point: build the KtcPersisting object for ``config``."""
    persisting = KtcPersisting(config)
    return persisting