forked from Screenly/Anthias
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscheduler_test.py
173 lines (141 loc) · 5.26 KB
/
scheduler_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
from datetime import datetime
from datetime import timedelta
import functools
import unittest
import viewer
from lib import db
from lib import assets_helper
import settings
import os
asset_x = {
'mimetype': u'web',
'asset_id': u'4c8dbce552edb5812d3a866cfe5f159d',
'name': u'WireLoad',
'uri': u'http://www.wireload.net',
'start_date': datetime.now() - timedelta(days=3),
'end_date': datetime.now() + timedelta(days=3),
'duration': u'5',
'is_enabled': 1,
'nocache': 0,
'is_processing': 0,
'play_order': 1,
'skip_asset_check': 0
}
asset_x_diff = {
'duration': u'10'
}
asset_y = {
'mimetype': u'image',
'asset_id': u'7e978f8c1204a6f70770a1eb54a76e9b',
'name': u'Google',
'uri': u'https://www.google.com/images/srpr/logo3w.png',
'start_date': datetime.now() - timedelta(days=1),
'end_date': datetime.now() + timedelta(days=2),
'duration': u'6',
'is_enabled': 1,
'nocache': 0,
'is_processing': 0,
'play_order': 0,
'skip_asset_check': 0
}
asset_z = {
'mimetype': u'image',
'asset_id': u'7e978f8c1204a6f70770a1eb54a76e9c',
'name': u'Google',
'uri': u'https://www.google.com/images/srpr/logo3w.png',
'start_date': datetime.now() - timedelta(days=1),
'end_date': datetime.now() + timedelta(days=1),
'duration': u'6',
'is_enabled': 1,
'nocache': 0,
'is_processing': 0,
'play_order': 2,
'skip_asset_check': 0
}
asset_tomorrow = {
'mimetype': u'image',
'asset_id': u'7e978f8c1204a6f70770a1eb54a76e9c',
'name': u'Google',
'uri': u'https://www.google.com/images/srpr/logo3w.png',
'start_date': datetime.now() + timedelta(days=1),
'end_date': datetime.now() + timedelta(days=1),
'duration': u'6',
'is_enabled': 1,
'nocache': 0,
'is_processing': 0,
'play_order': 2,
'skip_asset_check': 0
}
FAKE_DB_PATH = '/tmp/fakedb'
class FakeDatetime:
def __init__(self, need_time):
self.need_time = need_time
def utcnow(self):
return self.need_time
def now(self):
return self.need_time
class SchedulerTest(unittest.TestCase):
def setUp(self):
self.old_db_path = settings.settings['database']
viewer.db_conn = db.conn(':memory:')
with db.commit(viewer.db_conn) as cursor:
cursor.execute(assets_helper.create_assets_table)
def tearDown(self):
settings.settings['database'] = self.old_db_path
settings.settings['shuffle_playlist'] = False
viewer.datetime, assets_helper.get_time = datetime, lambda: datetime.utcnow()
viewer.db_conn.close()
try:
os.remove(FAKE_DB_PATH)
except:
pass
def test_generate_asset_list_assets_should_be_y_and_x(self):
assets_helper.create_multiple(viewer.db_conn, [asset_x, asset_y])
assets, _ = viewer.generate_asset_list()
self.assertEqual(assets, [asset_y, asset_x])
def test_generate_asset_list_check_deadline_if_both_active(self):
# if x and y currently active
assets_helper.create_multiple(viewer.db_conn, [asset_x, asset_y])
_, deadline = viewer.generate_asset_list()
self.assertEqual(deadline, asset_y['end_date'])
def test_generate_asset_list_check_deadline_if_asset_scheduled(self):
"""If asset_x is active and asset_x[end_date] == (now + 3) and asset_tomorrow will be active tomorrow then
deadline should be asset_tomorrow[start_date]
"""
assets_helper.create_multiple(viewer.db_conn, [asset_x, asset_tomorrow])
_, deadline = viewer.generate_asset_list()
self.assertEqual(deadline, asset_tomorrow['start_date'])
def test_get_next_asset_should_be_y_and_x(self):
assets_helper.create_multiple(viewer.db_conn, [asset_x, asset_y])
sch = viewer.Scheduler()
expect_y = sch.get_next_asset()
expect_x = sch.get_next_asset()
self.assertEqual([expect_y, expect_x], [asset_y, asset_x])
def test_keep_same_position_on_playlist_update(self):
assets_helper.create_multiple(viewer.db_conn, [asset_x, asset_y])
sch = viewer.Scheduler()
sch.get_next_asset()
assets_helper.create(viewer.db_conn, asset_z)
sch.update_playlist()
self.assertEqual(sch.index, 1)
def test_counter_should_increment_after_full_asset_loop(self):
settings.settings['shuffle_playlist'] = True
assets_helper.create_multiple(viewer.db_conn, [asset_x, asset_y])
sch = viewer.Scheduler()
self.assertEqual(sch.counter, 0)
sch.get_next_asset()
sch.get_next_asset()
self.assertEqual(sch.counter, 1)
def test_check_get_db_mtime(self):
settings.settings['database'] = FAKE_DB_PATH
with open(FAKE_DB_PATH, 'a'):
os.utime(FAKE_DB_PATH, (0, 0))
self.assertEqual(0, viewer.Scheduler().get_db_mtime())
def test_playlist_should_be_updated_after_deadline_reached(self):
assets_helper.create_multiple(viewer.db_conn, [asset_x, asset_y])
_, deadline = viewer.generate_asset_list()
fake = FakeDatetime(deadline + timedelta(seconds=1))
viewer.datetime, assets_helper.get_time = fake, lambda: fake.utcnow()
sch = viewer.Scheduler()
sch.refresh_playlist()
self.assertEqual([asset_x], sch.assets)