forked from pyrocko/pyrocko
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprogress_demo.py
151 lines (114 loc) · 3.65 KB
/
progress_demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import logging
from time import sleep
import random
from threading import Thread
import pyrocko
from pyrocko import progress
pyrocko.app_init('info', 'terminal')
logger = logging.getLogger('main')
def main():
with progress.view():
counting_generator()
counting_generator_no_len()
counting_context_manager()
counting_multiline_message()
counting_fails()
counting_nested()
counting_multi()
counting_threads()
def counting_generator():
task = progress.task('counting (generator)', logger=logger)
nitems = 50
for iitem in task(range(nitems)):
sleep(0.1)
def counting_generator_no_len():
def my_range(n):
for i in range(n):
yield i
task = progress.task('counting (generator, no len)', logger=logger)
nitems = 50
for iitem in task(my_range(nitems)):
sleep(0.1)
def counting_context_manager():
nitems = 50
with progress.task(
'counting (context manager)',
nitems,
logger=logger) as task:
for iitem in range(nitems):
if iitem > nitems // 2:
message = 'over half already done!'
else:
message = None
task.update(iitem, message)
sleep(0.1)
def counting_multiline_message():
nitems = 50
with progress.task(
'counting (multiline message)',
nitems,
logger=logger) as task:
for iitem in range(nitems):
message = '\nWe are now at %i.' % iitem
task.update(iitem, message)
sleep(0.1)
def counting_fails():
nitems = 50
with progress.task(
'failing task',
nitems,
logger=logger) as task:
try:
for counter in task(range(nitems)):
sleep(0.1)
if counter == 35:
1 // 0
except Exception as e:
logger.warning('Caught an exception: %s' % e)
def counting_nested():
task1 = progress.task('outer', logger=logger)
for i in task1(range(5)):
task2 = progress.task('inner %i' % i, logger=logger)
for j in task2(range(5)):
task3 = progress.task('inner inner %i %i' % (i, j), logger=logger)
for k in task3(range(50)):
sleep(0.01)
def counting_multi():
ntasks = 4
nitems = 30
with progress.task('counting multi', ntasks*nitems, logger=logger) as task:
subtasks = [
task.task('worker %i' % itask, nitems, logger=logger)
for itask in range(ntasks)]
counters = [0] * ntasks
try:
while any(counter < nitems for counter in counters):
for itask in range(ntasks):
if random.random() < 0.5:
if counters[itask] < nitems:
counters[itask] += 1
subtasks[itask].update(counters[itask])
task.update(sum(counters))
sleep(0.1)
except Exception:
for subtask in subtasks:
subtask.fail()
finally:
for subtask in subtasks:
subtask.done()
sleep(0.5)
def counting_threads():
ntasks = 4
nitems = 30
def count(itask, nitems):
with progress.task('thread %i' % itask, nitems, logger=logger) as task:
for iitem in task(range(nitems)):
sleep(0.1)
threads = [
Thread(target=count, args=[itask, nitems])
for itask in range(ntasks)]
for thread in threads:
thread.start()
while threads:
threads.pop().join()
main()