forked from StarRocks/starrocks
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathquery-trace-thread-pov.py
144 lines (120 loc) · 4.41 KB
/
query-trace-thread-pov.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python
# Copyright 2021-present StarRocks, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This tool is to change POV of trace graph, from pipeline driver to thread.
# usage:
# ./query-trace-thread-pov.py < pipeline-driver.trace > thread.trace // use pipe
# ./query-trace-thread-pov.py --input pipeline-driver.trace --output thread.trace // use file
# ./query-trace-thread-pov.py < pipeline-driver.trace | gzip > thread.trace.gz // use pipe and compress
import json
import sys
def load_events(fh):
for s in fh:
s = s.strip()
if not s: continue
if s.startswith('{'):
if s.startswith('{"traceEvents"'): continue
if s.startswith('{}'): continue
if s.endswith(','):
s = s[:-1]
yield json.loads(s)
def remove_fields(d):
for f in ('id', 'tidx', 'args'):
if f in d:
del d[f]
def fix_events(fh, events, prefilters, postfilters):
io_tid_set = set()
exec_tid_set = set()
fh.write('{"traceEvents":[\n')
# pid: fragment instance id low bits.
# tid: pipeline driver pointer.
def fix_io_task(ev):
d = ev.copy()
d.update({'pid': '0', 'tid': d['tidx'], 'name': d['cat']})
d['ph'] = d['ph'].upper()
d['cat'] = d['tid']
# id: chunk source pointer.
return d
def fix_driver(ev):
d = ev.copy()
d.update({'pid': 0, 'tid': d['tidx'], 'name': d['cat'] + '_' + d['tid']})
d['cat'] = d['tid']
return d
def fix_operator(ev):
d = ev.copy()
d.update({'pid': 0, 'tid': d['tidx']})
return d
for ev in events:
name = ev.get('name')
cat = ev.get('cat')
d = None
if any(f(ev) for f in prefilters): continue
if name == 'io_task':
d = fix_io_task(ev)
io_tid_set.add(d['tid'])
else:
if name == 'process' and cat.startswith('driver'):
d = fix_driver(ev)
elif cat in ('pull_chunk', 'push_chunk'):
d = fix_operator(ev)
if d:
exec_tid_set.add(d['tid'])
if not d:
continue
remove_fields(d)
if any(f(ev) for f in postfilters): continue
fh.write(json.dumps(d) + ',\n')
headers = []
headers.append(dict(name='process_name', ph='M', pid=0, args={'name': 'starrocks_be'}))
for tid in io_tid_set:
d = dict(name='thread_name', ph="M", pid=0, tid=tid, args={"name": "IOThread-%s" % tid})
headers.append(d)
for tid in exec_tid_set:
d = dict(name='thread_name', ph="M", pid=0, tid=tid, args={"name": "ExecThread-%s" % tid})
headers.append(d)
for d in headers:
fh.write(json.dumps(d) + ',\n')
fh.write('{}]}')
def drill_events(fh, events, text):
fh.write('{"traceEvents":[\n')
for ev in events:
if ev['ph'] == 'M' or ev['name'].find(text) != -1:
fh.write(json.dumps(ev) + ',\n')
fh.write('{}]}')
def main():
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--input', help='input file')
parser.add_argument('--output', help='output file')
parser.add_argument('-v', action='store_true', help='output detailed version')
parser.add_argument('-c', action='store_true', help='compress output file')
parser.add_argument('--drill', help='to drill down a single operator by name')
args = parser.parse_args()
fin = sys.stdin
if args.input:
fin = open(args.input)
fout = sys.stdout
if args.output:
fout = open(args.output, 'w')
prefilters = []
postfilters = []
if not args.v:
prefilters.append(lambda x: x.get('cat') in ('push_chunk', 'pull_chunk'))
events = load_events(fin)
if args.drill:
drill_events(fout, events, args.drill)
else:
fix_events(fout, events, prefilters, postfilters)
if __name__ == '__main__':
main()