forked from kizniche/Mycodo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmethod.py
298 lines (258 loc) · 12.9 KB
/
method.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
# coding=utf-8
import datetime
from math import sin, radians
from mycodo.config import SQL_DATABASE_MYCODO
from mycodo.databases.utils import session_scope
from mycodo.utils.database import db_retrieve_table_daemon
MYCODO_DB_PATH = 'sqlite:///' + SQL_DATABASE_MYCODO
def bezier_curve_y_out(shift_angle, P0, P1, P2, P3, second_of_day=None):
"""
For a cubic Bezier segment described by the 2-tuples P0, ..., P3, return
the y-value associated with the given x-value.
Ex: getYfromXforBezSegment((10,0), (5,-5), (5,5), (0,0), 3.2)
"""
try:
import numpy as np
except ImportError:
np = None
if not np:
return 0
seconds_per_day = 24*60*60
# Check if the second of the day is provided.
# If provided, calculate y of the Bezier curve with that x
# Otherwise, use the current second of the day
if second_of_day is None:
now = datetime.datetime.now()
dt = datetime.timedelta(hours=now.hour,
minutes=now.minute,
seconds=now.second)
seconds = dt.total_seconds()
else:
seconds = second_of_day
# Shift the entire graph using 0 - 360 to determine the degree
if shift_angle:
percent_angle = shift_angle/360
angle_seconds = percent_angle*seconds_per_day
if seconds+angle_seconds > seconds_per_day:
seconds_shifted = seconds+angle_seconds-seconds_per_day
else:
seconds_shifted = seconds+angle_seconds
percent_of_day = seconds_shifted/seconds_per_day
else:
percent_of_day = seconds/seconds_per_day
x = percent_of_day*(P0[0]-P3[0])
# First, get the t-value associated with x-value, where t is the
# parameterization of the Bezier curve and ranges from 0 to 1.
# We need the coefficients of the polynomial describing cubic Bezier
# (cubic polynomial in t)
coefficients = [-P0[0] + 3*P1[0] - 3*P2[0] + P3[0],
3*P0[0] - 6*P1[0] + 3*P2[0],
-3*P0[0] + 3*P1[0],
P0[0] - x]
# Find roots of the polynomial to determine the parameter t
roots = np.roots(coefficients)
# Find the root which is between 0 and 1, and is also real
correct_root = None
for root in roots:
if np.isreal(root) and 0 <= root <= 1:
correct_root = root
# Check a valid root was found
if correct_root is None:
print('Error, no valid root found. Are you sure your Bezier curve '
'represents a valid function when projected into the xy-plane?')
return 0
param_t = correct_root
# From the value for the t parameter, find the corresponding y-value
# using the formula for cubic Bezier curves
y = (1-param_t)**3*P0[1] + 3*(1-param_t)**2*param_t*P1[1] + 3*(1-param_t)*param_t**2*P2[1] + param_t**3*P3[1]
assert np.isreal(y)
# Typecast y from np.complex128 to float64
y = y.real
return y
def calculate_method_setpoint(method_id, table, this_controller, Method, MethodData, logger):
"""
Calculates the setpoint from a method
:param method_id: ID of Method to be used
:param table: Table of the this_controller using this function
:param this_controller: The this_controller using this function
:param Method: Method SQL table
:param MethodData: MethodData SQL table
:param logger: The logger to use
:return: 0 (success) or 1 (error) and a setpoint value
"""
method = db_retrieve_table_daemon(Method)
method_key = method.filter(Method.unique_id == method_id).first()
method_data = db_retrieve_table_daemon(MethodData)
method_data = method_data.filter(MethodData.method_id == method_id)
method_data_all = method_data.filter(MethodData.output_id.is_(None)).all()
method_data_first = method_data.filter(MethodData.output_id.is_(None)).first()
now = datetime.datetime.now()
# Calculate where the current time/date is within the time/date method
if method_key.method_type == 'Date':
for each_method in method_data_all:
start_time = datetime.datetime.strptime(each_method.time_start, '%Y-%m-%d %H:%M:%S')
end_time = datetime.datetime.strptime(each_method.time_end, '%Y-%m-%d %H:%M:%S')
if start_time < now < end_time:
setpoint_start = each_method.setpoint_start
if each_method.setpoint_end:
setpoint_end = each_method.setpoint_end
else:
setpoint_end = each_method.setpoint_start
setpoint_diff = abs(setpoint_end - setpoint_start)
total_seconds = (end_time - start_time).total_seconds()
part_seconds = (now - start_time).total_seconds()
percent_total = part_seconds / total_seconds
if setpoint_start < setpoint_end:
new_setpoint = setpoint_start + (setpoint_diff * percent_total)
else:
new_setpoint = setpoint_start - (setpoint_diff * percent_total)
logger.debug("[Method] Start: {start} End: {end}".format(
start=start_time, end=end_time))
logger.debug("[Method] Start: {start} End: {end}".format(
start=setpoint_start, end=setpoint_end))
logger.debug("[Method] Total: {tot} Part total: {par} ({per}%)".format(
tot=total_seconds, par=part_seconds, per=percent_total))
logger.debug("[Method] New Setpoint: {sp}".format(
sp=new_setpoint))
return new_setpoint, False
# Calculate where the current Hour:Minute:Seconds is within the Daily method
elif method_key.method_type == 'Daily':
daily_now = datetime.datetime.now().strftime('%H:%M:%S')
daily_now = datetime.datetime.strptime(str(daily_now), '%H:%M:%S')
for each_method in method_data_all:
start_time = datetime.datetime.strptime(each_method.time_start, '%H:%M:%S')
end_time = datetime.datetime.strptime(each_method.time_end, '%H:%M:%S')
if start_time < daily_now < end_time:
setpoint_start = each_method.setpoint_start
if each_method.setpoint_end:
setpoint_end = each_method.setpoint_end
else:
setpoint_end = each_method.setpoint_start
setpoint_diff = abs(setpoint_end-setpoint_start)
total_seconds = (end_time-start_time).total_seconds()
part_seconds = (daily_now-start_time).total_seconds()
percent_total = part_seconds/total_seconds
if setpoint_start < setpoint_end:
new_setpoint = setpoint_start+(setpoint_diff*percent_total)
else:
new_setpoint = setpoint_start-(setpoint_diff*percent_total)
logger.debug("[Method] Start: {start} End: {end}".format(
start=start_time.strftime('%H:%M:%S'),
end=end_time.strftime('%H:%M:%S')))
logger.debug("[Method] Start: {start} End: {end}".format(
start=setpoint_start, end=setpoint_end))
logger.debug("[Method] Total: {tot} Part total: {par} ({per}%)".format(
tot=total_seconds, par=part_seconds, per=percent_total))
logger.debug("[Method] New Setpoint: {sp}".format(
sp=new_setpoint))
return new_setpoint, False
# Calculate sine y-axis value from the x-axis (seconds of the day)
elif method_key.method_type == 'DailySine':
new_setpoint = sine_wave_y_out(method_data_first.amplitude,
method_data_first.frequency,
method_data_first.shift_angle,
method_data_first.shift_y)
return new_setpoint, False
# Calculate Bezier curve y-axis value from the x-axis (seconds of the day)
elif method_key.method_type == 'DailyBezier':
new_setpoint = bezier_curve_y_out(
method_data_first.shift_angle,
(method_data_first.x0, method_data_first.y0),
(method_data_first.x1, method_data_first.y1),
(method_data_first.x2, method_data_first.y2),
(method_data_first.x3, method_data_first.y3))
return new_setpoint, False
# Calculate the duration in the method based on self.method_start_time
elif method_key.method_type == 'Duration':
start_time = datetime.datetime.strptime(
str(this_controller.method_start_time), '%Y-%m-%d %H:%M:%S.%f')
ended = False
# Check if method_end_time is not None
if this_controller.method_end_time:
# Convert time string to datetime object
end_time = datetime.datetime.strptime(
str(this_controller.method_end_time), '%Y-%m-%d %H:%M:%S.%f')
if now > start_time:
ended = True
seconds_from_start = (now - start_time).total_seconds()
total_sec = 0
previous_total_sec = 0
previous_end = None
method_restart = False
for each_method in method_data_all:
# If duration_sec is 0, method has instruction to restart
if each_method.duration_sec == 0:
method_restart = True
else:
previous_end = each_method.setpoint_end
total_sec += each_method.duration_sec
if previous_total_sec <= seconds_from_start < total_sec:
row_start_time = float(start_time.strftime('%s')) + previous_total_sec
row_since_start_sec = (now - (start_time + datetime.timedelta(0, previous_total_sec))).total_seconds()
percent_row = row_since_start_sec / each_method.duration_sec
setpoint_start = each_method.setpoint_start
if each_method.setpoint_end:
setpoint_end = each_method.setpoint_end
else:
setpoint_end = each_method.setpoint_start
setpoint_diff = abs(setpoint_end - setpoint_start)
if setpoint_start < setpoint_end:
new_setpoint = setpoint_start + (setpoint_diff * percent_row)
else:
new_setpoint = setpoint_start - (setpoint_diff * percent_row)
logger.debug(
"[Method] Start: {start} Seconds Since: {sec}".format(
start=start_time, sec=seconds_from_start))
logger.debug(
"[Method] Start time of row: {start}".format(
start=datetime.datetime.fromtimestamp(row_start_time)))
logger.debug(
"[Method] Sec since start of row: {sec}".format(
sec=row_since_start_sec))
logger.debug(
"[Method] Percent of row: {per}".format(
per=percent_row))
logger.debug(
"[Method] New Setpoint: {sp}".format(
sp=new_setpoint))
return new_setpoint, False
previous_total_sec = total_sec
if this_controller.method_start_time:
if method_restart:
if end_time and now > end_time:
ended = True
else:
# Method has been instructed to restart
with session_scope(MYCODO_DB_PATH) as db_session:
mod_method = db_session.query(table)
mod_method = mod_method.filter(
table.unique_id == this_controller.unique_id).first()
mod_method.method_start_time = datetime.datetime.now()
db_session.commit()
return previous_end, False
else:
ended = True
if ended:
# Duration method has ended, reset method_start_time locally and in DB
with session_scope(MYCODO_DB_PATH) as db_session:
mod_method = db_session.query(table).filter(
table.unique_id == this_controller.unique_id).first()
mod_method.method_start_time = 'Ended'
mod_method.method_end_time = None
db_session.commit()
return None, True
# Setpoint not needing to be calculated, use default setpoint
return None, False
def sine_wave_y_out(amplitude, frequency, shift_angle,
shift_y, angle_in=None):
if angle_in is None:
now = datetime.datetime.now()
dt = datetime.timedelta(hours=now.hour,
minutes=now.minute,
seconds=now.second)
secs_per_day = 24 * 60 * 60
angle = dt.total_seconds() / secs_per_day * 360
else:
angle = angle_in
y = (amplitude * sin(radians(frequency * (angle - shift_angle)))) + shift_y
return y