-
Notifications
You must be signed in to change notification settings - Fork 82
/
Copy pathconsumer.py
115 lines (104 loc) · 3.84 KB
/
consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import argparse
from collections import defaultdict
import json
import logging
import os
from pathlib import Path
import shutil
import subprocess
import tempfile
from xml.dom import minidom

from pystalk import BeanstalkClient


def parse_args():
    """Build the command-line interface and parse ``sys.argv``.

    Returns:
        argparse.Namespace with ``beanstalkd`` (``host:port`` string,
        defaulting to ``0.0.0.0:11300``) and the mandatory ``extractor``,
        ``tmp`` and ``output`` options.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "-b", "--beanstalkd", default="0.0.0.0:11300", help="beanstalkd host:port.",
    )
    # The remaining options differ only in their flags and help text; all of
    # them are mandatory. Registration order is kept so --help reads the same.
    mandatory_options = (
        ("-x", "--extractor", "Namespaces extractor executable path."),
        ("-t", "--tmp", "Temporary files directory."),
        ("-o", "--output", "Output file path."),
    )
    for short_flag, long_flag, description in mandatory_options:
        cli.add_argument(short_flag, long_flag, required=True, help=description)
    return cli.parse_args()
def run_cmd(log, *cmd):
    """Run an external command and return its combined output as text.

    Args:
        log: Logger that receives the command line and the captured output
            when the command fails.
        *cmd: Program followed by its arguments; executed without a shell.

    Returns:
        The command's stdout with stderr merged into it, decoded to ``str``.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status (after the command line and output have been logged).
    """
    p = subprocess.run(list(cmd), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # External tools may emit bytes that are not valid UTF-8; substitute
    # U+FFFD rather than failing with UnicodeDecodeError on otherwise
    # successful runs.
    output = p.stdout.decode(errors="replace")
    try:
        p.check_returncode()
    except subprocess.CalledProcessError:
        # str() each argument so path-like arguments do not break the join.
        log.info("\"%s\"", "\" \"".join(str(arg) for arg in cmd))
        log.info("\n%s", output)
        raise
    return output
def _nuspec_text(doc, tag):
    """Return the text of the first ``tag`` element in ``doc``, or "" if absent/empty."""
    try:
        return doc.getElementsByTagName(tag)[0].firstChild.nodeValue
    except (IndexError, AttributeError):
        return ""


def execute_job(job, exe, tmp, output, log):
    """Process one package directory and append its metadata as a JSON line.

    The first entry reported by ``ls -1vr`` inside ``job`` is searched for a
    ``*.nupkg`` archive (recursively) and a ``*.nuspec`` manifest. The archive
    is unpacked into a scratch directory, ``exe`` is run on every ``*.dll``
    found there, and the per-namespace counts it prints are summed.

    Args:
        job: Path of the package directory.
        exe: Path of the namespaces extractor executable; it is expected to
            print ``<namespace> <count>`` lines.
        tmp: Root directory for temporary files; a per-process subdirectory
            is used inside it.
        output: Writable text stream that receives one JSON object per line
            with the keys ``name``, ``tags``, ``description``, ``namespaces``.
        log: Logger for progress and warnings.

    Raises:
        subprocess.CalledProcessError: If ``ls`` or ``unzip`` fails.
        IndexError, AttributeError: If the nuspec has no usable ``<id>``.
        ValueError: If an extractor output line is not ``<namespace> <count>``.
    """
    # Drop empty entries: splitting the listing always yields at least one
    # (possibly empty) string, which would make the emptiness check below
    # unreachable and silently fall back to the job directory itself.
    subdirs = [entry for entry in run_cmd(log, "ls", "-1vr", job).splitlines() if entry]
    if not subdirs:
        log.warning("- (subdir) %s", job)
        return
    subdir = Path(job) / subdirs[0]

    nupkg = next(subdir.rglob("*.nupkg"), None)
    if nupkg is None:
        log.warning("- (nupkg) %s", job)
        return
    nuspec_path = next(subdir.glob("*.nuspec"), None)
    if nuspec_path is None:
        log.warning("- (nuspec) %s", job)
        return

    # NOTE(review): minidom is not hardened against maliciously constructed
    # XML; confirm whether nuspec files can come from untrusted sources.
    nuspec = minidom.parse(str(nuspec_path))
    name = nuspec.getElementsByTagName("id")[0].firstChild.nodeValue
    tags = _nuspec_text(nuspec, "tags")
    descr = _nuspec_text(nuspec, "description")

    # The package id comes from the package itself, so it must not be used as
    # a path component: an id containing separators, "..", or an absolute path
    # would redirect both the unzip target and the rmtree below outside the
    # scratch area. Use an opaque directory name instead.
    scratch_root = Path(tmp) / str(os.getpid())
    scratch_root.mkdir(parents=True, exist_ok=True)
    workdir = Path(tempfile.mkdtemp(dir=str(scratch_root)))
    try:
        run_cmd(log, "unzip", "-o", "-d", str(workdir), str(nupkg))
        namespaces = defaultdict(int)
        for dll in workdir.rglob("*.dll"):
            try:
                for line in run_cmd(log, exe, str(dll)).split("\n"):
                    if not line:
                        continue
                    ns, count = line.split()
                    namespaces[ns] += int(count)
            except subprocess.CalledProcessError:
                # One unreadable assembly should not discard the whole package.
                log.warning("failed to extract %s", dll)
        json.dump({"name": name, "tags": tags, "description": descr, "namespaces": namespaces},
                  output, sort_keys=True)
        output.write("\n")
    finally:
        shutil.rmtree(workdir)
    log.info("✔ %s", job)
def main():
    """Entry point: pull jobs from beanstalkd and process each with execute_job.

    Every job yielded by ``client.reserve_iter()`` is handed to
    ``execute_job``; results are appended to the output file. A job whose
    processing raises is logged with its traceback and buried, otherwise it is
    deleted. Failures of the bury/delete calls themselves are logged and
    ignored. The per-process temporary directory is removed on the way out.
    """
    options = parse_args()
    log = logging.getLogger("nuget-meta")
    logging.basicConfig(level=logging.INFO)
    hostname, port_text = options.beanstalkd.split(":")
    client = BeanstalkClient(hostname, int(port_text), auto_decode=True)
    try:
        with open(options.output, "a") as sink:
            for job in client.reserve_iter():
                try:
                    execute_job(job.job_data, options.extractor, options.tmp, sink, log)
                except Exception:
                    # Processing failed: keep the job around (buried) for inspection.
                    log.exception(job)
                    try:
                        client.bury_job(job.job_id)
                    except Exception as err:
                        log.error("bury %s: %s: %s", job.job_data, type(err).__name__, err)
                else:
                    # Processing succeeded: the job is done and can be removed.
                    try:
                        client.delete_job(job.job_id)
                    except Exception as err:
                        log.error("delete %s: %s: %s", job.job_data, type(err).__name__, err)
    finally:
        shutil.rmtree(os.path.join(options.tmp, str(os.getpid())), ignore_errors=True)
if __name__ == "__main__":
    # The bare `exit` helper is installed by the `site` module for interactive
    # use and is missing when Python runs with -S; raising SystemExit directly
    # terminates with main()'s return value without relying on it.
    raise SystemExit(main())