forked from donnemartin/gitsome
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjobs.py
273 lines (223 loc) · 7.48 KB
/
jobs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
"""
Job control for the xonsh shell.
"""
import os
import sys
import time
import signal
import builtins
from subprocess import TimeoutExpired
from xonsh.tools import ON_WINDOWS
# File descriptor of the shell's stderr stream; job control uses it as the
# terminal handle.  It is None when no usable descriptor can be obtained.
try:
    _shell_tty = sys.stderr.fileno()
except (OSError, ValueError, AttributeError):
    # OSError: the stream has no underlying descriptor
    #          (io.UnsupportedOperation is an OSError subclass).
    # ValueError: the stream has been closed.
    # AttributeError: sys.stderr is None or a stand-in without fileno().
    _shell_tty = None
if ON_WINDOWS:
def _continue(obj):
    """Return the signal used to resume ``obj``.

    In the Windows branch there is no such signal, so the answer is always
    ``None`` and ``obj`` is not inspected.
    """
    # Explicit None: callers pass this value on as "signal to send".
    return None
def _kill(obj):
    """Terminate ``obj`` via its own ``kill()`` method and return the result."""
    outcome = obj.kill()
    return outcome
def ignore_sigtstp():
    """Do nothing; the Windows branch installs no SIGTSTP handling."""
    return None
def _set_pgrp(info):
    """Leave ``info`` untouched; the Windows branch records no process group."""
    return None
def wait_for_active_job(signal_to_send=None):
    """
    Block until the active foreground job has exited.

    Returns immediately when there is no active job or when the active job
    is a background job.  A ``KeyboardInterrupt`` raised while waiting kills
    the job.  ``signal_to_send`` is accepted for interface parity with the
    non-Windows implementation and is not used here.
    """
    _clear_dead_jobs()
    active = builtins.__xonsh_active_job__
    if active is None:
        return
    job = builtins.__xonsh_all_jobs__[active]
    proc = job['obj']
    if job['bg']:
        return
    # Poll in short slices so that ctrl-c is noticed promptly.
    while proc.returncode is None:
        try:
            proc.wait(0.01)
        except KeyboardInterrupt:
            proc.kill()
        except TimeoutExpired:
            continue
    # Once the process has an exit status there is no active job any more.
    if proc.poll() is not None:
        builtins.__xonsh_active_job__ = None
else:
def _continue(obj):
    """Return the signal used to resume ``obj``: always ``SIGCONT``.

    ``obj`` is not inspected.
    """
    resume_signal = signal.SIGCONT
    return resume_signal
def _kill(obj):
    """Send ``SIGKILL`` to the process whose id is ``obj.pid``."""
    target_pid = obj.pid
    os.kill(target_pid, signal.SIGKILL)
def ignore_sigtstp():
    """Install ``SIG_IGN`` as the handler for ``SIGTSTP`` in this process."""
    signum, action = signal.SIGTSTP, signal.SIG_IGN
    signal.signal(signum, action)
def _set_pgrp(info):
    """Store the process group id of ``info['obj']`` under ``info['pgrp']``.

    If the process can no longer be found, ``info`` is left unchanged.
    """
    child_pid = info['obj'].pid
    try:
        group_id = os.getpgid(child_pid)
    except ProcessLookupError:
        return
    info['pgrp'] = group_id
# Process group of the shell itself, captured once at import time.
_shell_pgrp = os.getpgrp()
# Signals that are blocked while terminal ownership is being changed.
_block_when_giving = (
    signal.SIGTTOU,
    signal.SIGTTIN,
    signal.SIGTSTP,
)
def _give_terminal_to(pgid):
    """Make process group ``pgid`` the foreground group of the shell's tty.

    Does nothing when ``_shell_tty`` is ``None`` or does not refer to a
    terminal.  Errors raised by ``os.tcsetpgrp`` propagate to the caller,
    but the signal mask is restored first.
    """
    # over-simplified version of:
    #    give_terminal_to from bash 4.3 source, jobs.c, line 4030
    # this will give the terminal to the process group pgid
    if _shell_tty is None or not os.isatty(_shell_tty):
        return
    # Keep SIGTTOU/SIGTTIN/SIGTSTP blocked for the duration of the hand-over.
    oldmask = signal.pthread_sigmask(signal.SIG_BLOCK, _block_when_giving)
    try:
        os.tcsetpgrp(_shell_tty, pgid)
    finally:
        # Restore the previous mask even if tcsetpgrp fails; otherwise the
        # shell would keep those signals blocked from then on.
        signal.pthread_sigmask(signal.SIG_SETMASK, oldmask)
def wait_for_active_job(signal_to_send=None):
    """
    Wait for the active job to finish, to be killed by SIGINT, or to be
    suspended by ctrl-z.

    Returns immediately when there is no active job or when the active job
    is a background job.  While waiting, the terminal is owned by the job's
    process group; it is handed back to the shell afterwards, including when
    signalling or waiting raises.

    ``signal_to_send``, when not ``None``, is delivered to the job's process
    after it has been given the terminal.
    """
    _clear_dead_jobs()
    act = builtins.__xonsh_active_job__
    if act is None:
        return
    job = builtins.__xonsh_all_jobs__[act]
    obj = job['obj']
    if job['bg']:
        return
    pgrp = job['pgrp']
    obj.done = False

    # give the terminal over to the fg process
    _give_terminal_to(pgrp)
    try:
        # if necessary, send the specified signal to this process
        # (this hook was added because vim, emacs, etc, seem to need to have
        # the terminal when they receive SIGCONT from the "fg" command)
        if signal_to_send is not None:
            os.kill(obj.pid, signal_to_send)
        # WUNTRACED makes waitpid return for stopped children as well
        _, status = os.waitpid(obj.pid, os.WUNTRACED)
        if os.WIFSTOPPED(status):
            # the job was suspended: keep it around as a stopped bg job
            obj.done = True
            job['bg'] = True
            job['status'] = 'stopped'
            print()  # get a newline because ^Z will have been printed
            print_one_job(act)
        elif os.WIFSIGNALED(status):
            print()  # get a newline because ^C will have been printed
        if obj.poll() is not None:
            builtins.__xonsh_active_job__ = None
    finally:
        # give terminal back to the shell, even if something above failed;
        # otherwise the shell would be left without its own terminal
        _give_terminal_to(_shell_pgrp)
def _clear_dead_jobs():
    """Drop every job whose process has exited from the job table.

    If the active job is among those removed (or there was no active job to
    begin with), another job is picked via ``_reactivate_job``.
    """
    all_jobs = builtins.__xonsh_all_jobs__
    # Collect first, delete afterwards: the dict must not change size while
    # it is being iterated.
    finished = {
        number
        for number, job in all_jobs.items()
        if job['obj'].poll() is not None
    }
    for number in finished:
        del all_jobs[number]
        if builtins.__xonsh_active_job__ == number:
            builtins.__xonsh_active_job__ = None
    if builtins.__xonsh_active_job__ is None:
        _reactivate_job()
def _reactivate_job():
    """Make the most recently started job the active one.

    Leaves the active job unchanged when the job table is empty.
    """
    all_jobs = builtins.__xonsh_all_jobs__
    if not all_jobs:
        return
    newest = max(all_jobs, key=lambda number: all_jobs[number]['started'])
    builtins.__xonsh_active_job__ = newest
def print_one_job(num):
    """Print a one-line summary of job number ``num``.

    The line shows an active-job marker, the job number, its status, the
    command text, a trailing ``&`` for background jobs and the last pid.
    Unknown job numbers are silently ignored.
    """
    all_jobs = builtins.__xonsh_all_jobs__
    if num not in all_jobs:
        return
    job = all_jobs[num]
    marker = ' '
    if num == builtins.__xonsh_active_job__:
        marker = '*'
    state = job['status']
    # Each command is either a list of words or an already-joined string.
    pieces = []
    for part in job['cmds']:
        pieces.append(' '.join(part) if isinstance(part, list) else part)
    command_text = ' '.join(pieces)
    last_pid = job['pids'][-1]
    suffix = ' &' if job['bg'] else ''
    line = '{}[{}] {}: {}{} ({})'.format(
        marker, num, state, command_text, suffix, last_pid)
    print(line)
def get_next_job_number():
    """Return the smallest positive job number not currently in use.

    Dead jobs are cleared first, so their numbers become available again.
    """
    _clear_dead_jobs()
    taken = builtins.__xonsh_all_jobs__
    # With len(taken) numbers in use, at least one of 1..len(taken)+1 is free.
    return next(n for n in range(1, len(taken) + 2) if n not in taken)
def add_job(info):
    """
    Register ``info`` as a new job and make it the active job.

    The start time, a ``'running'`` status and (where supported) the process
    group are recorded on ``info`` before it is stored under the next free
    job number.  Background jobs are announced with a one-line summary.
    """
    info['started'] = time.time()
    info['status'] = 'running'
    _set_pgrp(info)
    job_number = get_next_job_number()
    builtins.__xonsh_all_jobs__[job_number] = info
    builtins.__xonsh_active_job__ = job_number
    if not info['bg']:
        return
    print_one_job(job_number)
def _default_sigint_handler(num, frame):
    """Signal handler that raises ``KeyboardInterrupt``.

    ``num`` and ``frame`` are the standard handler arguments and are unused.
    """
    raise KeyboardInterrupt()
def kill_all_jobs():
    """
    Kill every remaining child process (called when exiting xonsh).

    Jobs that have already exited are cleared first; each surviving job's
    process object is then passed to ``_kill``.
    """
    _clear_dead_jobs()
    survivors = (job['obj'] for job in builtins.__xonsh_all_jobs__.values())
    for proc in survivors:
        _kill(proc)
def jobs(args, stdin=None):
    """
    xonsh command: jobs

    Print one summary line per current job, in ascending job-number order.
    ``args`` and ``stdin`` are accepted but not used.  Always returns
    ``(None, None)``.
    """
    _clear_dead_jobs()
    job_numbers = list(builtins.__xonsh_all_jobs__)
    job_numbers.sort()
    for number in job_numbers:
        print_one_job(number)
    return None, None
def fg(args, stdin=None):
    """
    xonsh command: fg

    Bring a job to the foreground and wait for it: the currently active job
    when no argument is given, or the job whose number is the single
    argument.

    Returns ``None`` on success, or an ``('', error_message)`` tuple when the
    arguments do not identify an existing job.
    """
    _clear_dead_jobs()
    argc = len(args)
    if argc > 1:
        return '', 'fg expects 0 or 1 arguments, not {}\n'.format(argc)
    if argc == 1:
        requested = args[0]
        try:
            act = int(requested)
        except ValueError:
            return '', 'Invalid job: {}\n'.format(requested)
        if act not in builtins.__xonsh_all_jobs__:
            return '', 'Invalid job: {}\n'.format(requested)
    else:
        # no argument: operate on the active job
        act = builtins.__xonsh_active_job__
        if act is None:
            return '', 'Cannot bring nonexistent job to foreground.\n'
    builtins.__xonsh_active_job__ = act
    job = builtins.__xonsh_all_jobs__[act]
    job['bg'] = False
    job['status'] = 'running'
    print_one_job(act)
    # Resume the job (if the platform has a resume signal) and wait for it.
    resume_signal = _continue(job['obj'])
    wait_for_active_job(resume_signal)
def bg(args, stdin=None):
    """
    xonsh command: bg

    Resume execution of the currently active job in the background, or, if a
    single number is given as an argument, resume that job in the background.

    Returns ``None`` on success, or an ``('', error_message)`` tuple when the
    arguments do not identify an existing job.
    """
    _clear_dead_jobs()
    if len(args) == 0:
        # resume the active job in the background
        act = builtins.__xonsh_active_job__
        if act is None:
            return '', 'Cannot send nonexistent job to background.\n'
    elif len(args) == 1:
        try:
            act = int(args[0])
        except ValueError:
            return '', 'Invalid job: {}\n'.format(args[0])
        if act not in builtins.__xonsh_all_jobs__:
            return '', 'Invalid job: {}\n'.format(args[0])
    else:
        return '', 'bg expects 0 or 1 arguments, not {}\n'.format(len(args))
    builtins.__xonsh_active_job__ = act
    job = builtins.__xonsh_all_jobs__[act]
    job['bg'] = True
    job['status'] = 'running'
    print_one_job(act)
    # Deliver the resume signal directly.  wait_for_active_job() returns
    # before sending anything once the job is flagged as a background job,
    # so routing the signal through it would leave a stopped job stopped.
    resume_signal = _continue(job['obj'])
    if resume_signal is not None:
        os.kill(job['obj'].pid, resume_signal)