forked from anacrolix/pydlnadms
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathffprobe.py
executable file
·84 lines (76 loc) · 2.57 KB
/
ffprobe.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#!/usr/bin/env python3
import errno
import logging
logger = logging.getLogger()
from resource import setrlimit, RLIMIT_CPU
import re
from subprocess import Popen, PIPE, CalledProcessError, list2cmdline
def parse_section(lines, section):
for l in lines:
if l.startswith('[/{section}]'.format(**vars())):
return
option, value = l.split('=', 1)
yield option, value.rstrip()
def parse_stdout(lines):
retval = {}
for l in lines:
section = re.match(r'\[(.+)\]', l).group(1)
options = dict(parse_section(lines, section))
index = options.get('index')
if index is None:
assert section not in retval, section
retval[section] = options
else:
index = int(index)
assert len(retval.setdefault(section, [])) == index, (section, index)
retval[section].append(options)
return retval
def preexec_fn():
# give the process 1 second, 2 if it treats SIGXCPU
setrlimit(RLIMIT_CPU, (1, 2))
def ffprobe(path):
args = ['ffprobe', '-show_format', '-show_streams', path]
process = Popen(
args,
stdout=PIPE,
stderr=PIPE,
preexec_fn=preexec_fn,
close_fds=True)
stdout, stderr = process.communicate()
if process.returncode != 0:
raise CalledProcessError(process.returncode, args)
# TODO an alternative here could be to try several encodings in succession:
# utf-8, cp1252, and the western european one whatever it is
lines = (l.rstrip() for l in stdout.decode('cp1252', errors='ignore').splitlines())
return parse_stdout(lines)
def res_data(path):
try:
data = ffprobe(path)
except OSError as exc:
if exc.errno == errno.ENOENT:
logger.error(exc)
return {}
else:
raise
except CalledProcessError as exc:
logger.warning('{!r} failed with exit code {:d}'.format(
exc.cmd,
exc.returncode))
return {}
data = {k: v for k, v in data['FORMAT'].items() if v != 'N/A'}
from datetime import timedelta
attrs = {}
if 'bit_rate' in data:
attrs['bitrate'] = int(data['bit_rate'].rstrip('0').rstrip('.'))
if 'duration' in data:
attrs['duration'] = timedelta(seconds=float(data['duration']))
return attrs
def main():
logging.basicConfig(level=logging.NOTSET)
import argparse, pprint, sys
parser = argparse.ArgumentParser()
parser.add_argument('file')
namespace = parser.parse_args()
pprint.pprint(ffprobe(namespace.file))
if __name__ == '__main__':
main()