Skip to content

Commit

Permalink
Refactor nag functionality in to NagTimer class.
Browse files Browse the repository at this point in the history
Add default 30 second nag timer to gclient subprocesses.

BUG=227537

Review URL: https://chromiumcodereview.appspot.com/14826003

git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/depot_tools@198207 0039d316-1c4b-4281-b951-d872f2087c98
  • Loading branch information
[email protected] committed May 3, 2013
1 parent 2fd6c3f commit 12b07e7
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 32 deletions.
15 changes: 15 additions & 0 deletions gclient_scm.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ class SCMWrapper(object):
This is the abstraction layer to bind to different SCM.
"""
nag_timer = 30
nag_max = 3

def __init__(self, url=None, root_dir=None, relpath=None):
self.url = url
self._root_dir = root_dir
Expand Down Expand Up @@ -195,6 +198,8 @@ def pack(self, options, args, file_list):
gclient_utils.CheckCallAndFilter(
['git', 'diff', merge_base],
cwd=self.checkout_path,
nag_timer=self.nag_timer,
nag_max=self.nag_max,
filter_fn=GitDiffFilterer(self.relpath).Filter)

def UpdateSubmoduleConfig(self):
Expand All @@ -208,6 +213,8 @@ def UpdateSubmoduleConfig(self):
cmd4 = ['git', 'config', 'fetch.recurseSubmodules', 'false']
kwargs = {'cwd': self.checkout_path,
'print_stdout': False,
'nag_timer': self.nag_timer,
'nag_max': self.nag_max,
'filter_fn': lambda x: None}
try:
gclient_utils.CheckCallAndFilter(cmd, **kwargs)
Expand Down Expand Up @@ -852,6 +859,8 @@ def _Capture(self, args):
return subprocess2.check_output(
['git'] + args,
stderr=subprocess2.PIPE,
nag_timer=self.nag_timer,
nag_max=self.nag_max,
cwd=self.checkout_path).strip()

def _UpdateBranchHeads(self, options, fetch=False):
Expand Down Expand Up @@ -879,6 +888,8 @@ def _UpdateBranchHeads(self, options, fetch=False):
def _Run(self, args, options, **kwargs):
kwargs.setdefault('cwd', self.checkout_path)
kwargs.setdefault('print_stdout', True)
kwargs.setdefault('nag_timer', self.nag_timer)
kwargs.setdefault('nag_max', self.nag_max)
stdout = kwargs.get('stdout', sys.stdout)
stdout.write('\n________ running \'git %s\' in \'%s\'\n' % (
' '.join(args), kwargs['cwd']))
Expand Down Expand Up @@ -928,6 +939,8 @@ def pack(self, options, args, file_list):
['svn', 'diff', '-x', '--ignore-eol-style'] + args,
cwd=self.checkout_path,
print_stdout=False,
nag_timer=self.nag_timer,
nag_max=self.nag_max,
filter_fn=SvnDiffFilterer(self.relpath).Filter)

def update(self, options, args, file_list):
Expand Down Expand Up @@ -1225,6 +1238,8 @@ def FullUrlForRelativeUrl(self, url):
def _Run(self, args, options, **kwargs):
"""Runs a commands that goes to stdout."""
kwargs.setdefault('cwd', self.checkout_path)
kwargs.setdefault('nag_timer', self.nag_timer)
kwargs.setdefault('nag_max', self.nag_max)
gclient_utils.CheckCallAndFilterAndHeader(['svn'] + args,
always=options.verbose, **kwargs)

Expand Down
24 changes: 23 additions & 1 deletion gclient_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ def KillAllRemainingChildren():

def CheckCallAndFilter(args, stdout=None, filter_fn=None,
print_stdout=None, call_filter_on_first_line=False,
**kwargs):
nag_timer=None, nag_max=None, **kwargs):
"""Runs a command and calls back a filter function if needed.
Accepts all subprocess2.Popen() parameters plus:
Expand All @@ -399,13 +399,30 @@ def CheckCallAndFilter(args, stdout=None, filter_fn=None,
# Do a flush of stdout before we begin reading from the subprocess2's stdout
stdout.flush()

nag = None
if nag_timer:
# Hack thread.index to force correct annotation.
index = getattr(threading.currentThread(), 'index', 0)
def _nag_cb(elapsed):
setattr(threading.currentThread(), 'index', index)
stdout.write(' No output for %.0f seconds from command:\n' % elapsed)
stdout.write(' %s\n' % kid.cmd_str)
if (nag_max and
int('%.0f' % (elapsed / nag_timer)) >= nag_max):
stdout.write(' ... killing it!\n')
kid.kill()
nag = subprocess2.NagTimer(nag_timer, _nag_cb)
nag.start()

# Also, we need to forward stdout to prevent weird re-ordering of output.
# This has to be done on a per byte basis to make sure it is not buffered:
# normally buffering is done for each line, but if svn requests input, no
# end-of-line character is output after the prompt and it would not show up.
try:
in_byte = kid.stdout.read(1)
if in_byte:
if nag:
nag.event()
if call_filter_on_first_line:
filter_fn(None)
in_line = ''
Expand All @@ -422,6 +439,8 @@ def CheckCallAndFilter(args, stdout=None, filter_fn=None,
filter_fn(in_line)
in_line = ''
in_byte = kid.stdout.read(1)
if in_byte and nag:
nag.event()
# Flush the rest of buffered output. This is only an issue with
# stdout/stderr not ending with a \n.
if len(in_line):
Expand All @@ -435,6 +454,9 @@ def CheckCallAndFilter(args, stdout=None, filter_fn=None,
except KeyboardInterrupt:
print >> sys.stderr, 'Failed while running "%s"' % ' '.join(args)
raise
finally:
if nag:
nag.cancel()

if rv:
raise subprocess2.CalledProcessError(
Expand Down
89 changes: 60 additions & 29 deletions subprocess2.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,42 @@ def fix_lang(name):
return env


class NagTimer(object):
"""
Triggers a callback when a time interval passes without an event being fired.
For example, the event could be receiving terminal output from a subprocess;
and the callback could print a warning to stderr that the subprocess appeared
to be hung.
"""
def __init__(self, interval, cb):
self.interval = interval
self.cb = cb
self.timer = threading.Timer(self.interval, self.fn)
self.last_output = self.previous_last_output = 0

def start(self):
self.last_output = self.previous_last_output = time.time()
self.timer.start()

def event(self):
self.last_output = time.time()

def fn(self):
now = time.time()
if self.last_output == self.previous_last_output:
self.cb(now - self.previous_last_output)
# Use 0.1 fudge factor, just in case
# (self.last_output - now) is very close to zero.
sleep_time = (self.last_output - now - 0.1) % self.interval
self.previous_last_output = self.last_output
self.timer = threading.Timer(sleep_time + 0.1, self.fn)
self.timer.start()

def cancel(self):
self.timer.cancel()


class Popen(subprocess.Popen):
"""Wraps subprocess.Popen() with various workarounds.
Expand Down Expand Up @@ -192,6 +228,7 @@ def __init__(self, args, **kwargs):
self.start = time.time()
self.timeout = None
self.nag_timer = None
self.nag_max = None
self.shell = kwargs.get('shell', None)
# Silence pylint on MacOSX
self.returncode = None
Expand Down Expand Up @@ -230,8 +267,7 @@ def _tee_threads(self, input): # pylint: disable=W0622
# because of memory exhaustion.
queue = Queue.Queue()
done = threading.Event()
timer = []
last_output = [time.time()] * 2
nag = None

def write_stdin():
try:
Expand All @@ -253,28 +289,12 @@ def _queue_pipe_read(pipe, name):
data = pipe.read(1)
if not data:
break
last_output[0] = time.time()
if nag:
nag.event()
queue.put((name, data))
finally:
queue.put(name)

def nag_fn():
now = time.time()
if done.is_set():
return
if last_output[0] == last_output[1]:
logging.warn(' No output for %.0f seconds from command:' % (
now - last_output[1]))
logging.warn(' %s' % self.cmd_str)
# Use 0.1 fudge factor in case:
# now ~= last_output[0] + self.nag_timer
sleep_time = self.nag_timer + last_output[0] - now - 0.1
while sleep_time < 0:
sleep_time += self.nag_timer
last_output[1] = last_output[0]
timer[0] = threading.Timer(sleep_time, nag_fn)
timer[0].start()

def timeout_fn():
try:
done.wait(self.timeout)
Expand Down Expand Up @@ -313,8 +333,15 @@ def wait_fn():
t.start()

if self.nag_timer:
timer.append(threading.Timer(self.nag_timer, nag_fn))
timer[0].start()
def _nag_cb(elapsed):
logging.warn(' No output for %.0f seconds from command:' % elapsed)
logging.warn(' %s' % self.cmd_str)
if (self.nag_max and
int('%.0f' % (elapsed / self.nag_timer)) >= self.nag_max):
queue.put('timeout')
done.set() # Must do this so that timeout thread stops waiting.
nag = NagTimer(self.nag_timer, _nag_cb)
nag.start()

timed_out = False
try:
Expand All @@ -327,20 +354,22 @@ def wait_fn():
self.stderr_cb(item[1])
else:
# A thread terminated.
threads[item].join()
del threads[item]
if item in threads:
threads[item].join()
del threads[item]
if item == 'wait':
# Terminate the timeout thread if necessary.
done.set()
elif item == 'timeout' and not timed_out and self.poll() is None:
logging.debug('Timed out after %fs: killing' % self.timeout)
logging.debug('Timed out after %.0fs: killing' % (
time.time() - self.start))
self.kill()
timed_out = True
finally:
# Stop the threads.
done.set()
if timer:
timer[0].cancel()
if nag:
nag.cancel()
if 'wait' in threads:
# Accelerate things, otherwise it would hang until the child process is
# done.
Expand All @@ -353,7 +382,8 @@ def wait_fn():
self.returncode = TIMED_OUT

# pylint: disable=W0221,W0622
def communicate(self, input=None, timeout=None, nag_timer=None):
def communicate(self, input=None, timeout=None, nag_timer=None,
nag_max=None):
"""Adds timeout and callbacks support.
Returns (stdout, stderr) like subprocess.Popen().communicate().
Expand All @@ -365,6 +395,7 @@ def communicate(self, input=None, timeout=None, nag_timer=None):
"""
self.timeout = timeout
self.nag_timer = nag_timer
self.nag_max = nag_max
if (not self.timeout and not self.nag_timer and
not self.stdout_cb and not self.stderr_cb):
return super(Popen, self).communicate(input)
Expand Down Expand Up @@ -393,7 +424,7 @@ def communicate(self, input=None, timeout=None, nag_timer=None):
return (stdout, stderr)


def communicate(args, timeout=None, nag_timer=None, **kwargs):
def communicate(args, timeout=None, nag_timer=None, nag_max=None, **kwargs):
"""Wraps subprocess.Popen().communicate() and add timeout support.
Returns ((stdout, stderr), returncode).
Expand Down
10 changes: 9 additions & 1 deletion tests/gclient_scm_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ def testDir(self):
'RunCommand',
'cleanup',
'diff',
'nag_max',
'nag_timer',
'pack',
'relpath',
'revert',
Expand Down Expand Up @@ -496,6 +498,8 @@ def testUpdateSingleCheckout(self):
gclient_scm.gclient_utils.CheckCallAndFilterAndHeader(
['svn', 'checkout', '--depth', 'empty', self.url, self.base_path],
always=True,
nag_max=3,
nag_timer=30,
cwd=self.root_dir)
gclient_scm.scm.SVN.RunAndGetFileList(
options.verbose,
Expand Down Expand Up @@ -530,7 +534,7 @@ def testUpdateSingleCheckoutSVN14(self):
files_list = self.mox.CreateMockAnything()
gclient_scm.gclient_utils.CheckCallAndFilterAndHeader(
['svn', 'export', join(self.url, 'DEPS'), join(self.base_path, 'DEPS')],
always=True, cwd=self.root_dir)
nag_timer=30, nag_max=3, always=True, cwd=self.root_dir)

self.mox.ReplayAll()
scm = self._scm_wrapper(url=self.url, root_dir=self.root_dir,
Expand Down Expand Up @@ -563,6 +567,8 @@ def testUpdateSingleCheckoutSVNUpgrade(self):
gclient_scm.gclient_utils.CheckCallAndFilterAndHeader(
['svn', 'checkout', '--depth', 'empty', self.url, self.base_path],
always=True,
nag_max=3,
nag_timer=30,
cwd=self.root_dir)
gclient_scm.scm.SVN.RunAndGetFileList(
options.verbose,
Expand Down Expand Up @@ -787,6 +793,8 @@ def testDir(self):
'RunCommand',
'cleanup',
'diff',
'nag_max',
'nag_timer',
'pack',
'UpdateSubmoduleConfig',
'relpath',
Expand Down
2 changes: 1 addition & 1 deletion tests/subprocess2_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def __init__(self, args, **kwargs):
results['args'] = args
@staticmethod
# pylint: disable=W0622
def communicate(input=None, timeout=None, nag_timer=None):
def communicate(input=None, timeout=None, nag_max=None, nag_timer=None):
return None, None
self.mock(subprocess2, 'Popen', fake_Popen)
return results
Expand Down

0 comments on commit 12b07e7

Please sign in to comment.