Skip to content

Commit

Permalink
improve robustness, and failure case of not detecting ASLR slide
Browse files Browse the repository at this point in the history
  • Loading branch information
gaasedelen committed Sep 13, 2021
1 parent b53f4f2 commit 80d4eb2
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 24 deletions.
15 changes: 14 additions & 1 deletion plugins/tenet/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,20 @@ def load_trace(self, filepath):
#

self.reader = TraceReader(filepath, self.arch, disassembler[self])
pmsg(f"Loaded trace of {self.reader.trace.length:,} instructions...")
pmsg(f"Loaded trace {self.reader.trace.filepath}")
pmsg(f"- {self.reader.trace.length:,} instructions...")

if self.reader.analysis.slide != None:
pmsg(f"- {self.reader.analysis.slide:08X} ASLR slide...")
else:
disassembler.warning("Failed to automatically detect ASLR base!\n\nSee console for more info...")
pmsg(" +------------------------------------------------------")
pmsg(" |- ERROR: Failed to detect ASLR base for this trace.")
pmsg(" | --------------------------------------- ")
pmsg(" +-+ You can 'try' rebasing the database to the correct ASLR base")
pmsg(" | if you know it, and reload the trace. Otherwise, it is possible")
pmsg(" | your trace is just... very small and Tenet was not confident")
pmsg(" | predicting an ASLR slide.")

#
# we only hook directly into the disassembler / UI / subsystems once
Expand Down
127 changes: 104 additions & 23 deletions plugins/tenet/trace/analysis.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import bisect
import collections

from tenet.util.log import pmsg

#-----------------------------------------------------------------------------
# analysis.py -- Trace Analysis
#-----------------------------------------------------------------------------
Expand All @@ -27,6 +29,7 @@ def __init__(self, trace, dctx):
self._trace = trace
self._remapped_regions = []
self._unmapped_entry_points = []
self.slide = None
self._analyze()

#-------------------------------------------------------------------------
Expand Down Expand Up @@ -73,64 +76,142 @@ def _analyze_aslr(self):
"""
dctx, trace = self._dctx, self._trace

# get instruction addresses from disassembler
instuction_addresses = dctx.get_instruction_addresses()
# get *all* of the instruction addresses from disassembler
instruction_addresses = dctx.get_instruction_addresses()

#
# bucket the instruction addresses from the disassembler
# based on non-aslr'd bits (lower 12 bits, 0xFFF)
#

# bucket the disas addresses based on non-aslr'd bits
binary_buckets = collections.defaultdict(list)
for address in instuction_addresses:
for address in instruction_addresses:
bits = address & 0xFFF
binary_buckets[bits].append(address)

# get the set of ips from the trace
# get the set of unique, executed addresses from the trace
trace_addresses = trace.ip_addrs

# keep trace addresses that 'seem' interesting
#
# scan the executed addresses from the trace, and discard
# any that cannot be bucketed by the non ASLR-d bits that
# match the open executable
#

trace_buckets = collections.defaultdict(list)
for executed_address in trace_addresses:
bits = executed_address & 0xFFF
if bits not in binary_buckets:
continue
trace_buckets[bits].append(executed_address)

#
# this is where things get a little bit interesting. we compute the
# distance between addresses in the trace and disassembler buckets
#
# the distance that appears most frequently is likely to be the ASLR
# slide to align the disassembler imagebase and trace addresses
#

slide_buckets = collections.defaultdict(list)
for bits, instruction_addresses in binary_buckets.items():
for bits, bin_addresses in binary_buckets.items():
for executed_address in trace_buckets[bits]:
for address in instruction_addresses:
distance = address - executed_address
for disas_address in bin_addresses:
distance = disas_address - executed_address
slide_buckets[distance].append(executed_address)

hits = []
for slide, executed_addresses in slide_buckets.items():
hits.append((len(executed_addresses), slide))
# basically the executable 'range' of the open binary
disas_low_address = instruction_addresses[0]
disas_high_address = instruction_addresses[-1]

# convert to set for O(1) lookup in following loop
instruction_addresses = set(instruction_addresses)

#
# loop through all the slide buckets, from the most frequent distance
# (ASLR slide) to least frequent. the goal now is to sanity check the
# ranges to find one that seems to couple tightly with the disassembler
#

for k in sorted(slide_buckets, key=lambda k: len(slide_buckets[k]), reverse=True):
expected = len(slide_buckets[k])

hits.sort(reverse=True)
#
# TODO: uh, if it's getting this small, I don't feel comfortable
# selecting an ASLR slide. the user might be loading a tiny trace
# with literally 'less than 10' unique instructions (?) that
# would map to the database
#

#for num_executed, slide in hits:
# print(f"{num_executed} items, slide {slide:08X}")
#for address in sorted(slide_buckets[hits[0][1]]):
# print(f"Executed: {address:08X} --> {address + hits[0][1]:08X}")
if expected < 10:
continue

hit, seen = 0, 0
for address in trace_addresses:

# add the ASLR slide for this bucket to a traced address
rebased_address = address + k

# the rebased address seems like it falls within the disassembler ranges
if disas_low_address <= rebased_address < disas_high_address:
seen += 1

# but does the address *actually* exist in the disassembler?
if rebased_address in instruction_addresses:
hit += 1

#
# the first *high* hit ratio is almost certainly the correct
# ASLR, practically speaking this should probably be 1.00, but
# I lowered it a bit to give a bit of flexibility.
#
# NOTE/TODO: a lower 'hit' ratio *could* occur if a lot of
# undefined instruction addresses in the disassembler get
# executed in the trace. this could be packed code / malware,
# in which case we will have to perform more aggressive analysis
#

if (hit / seen) > 0.95:
#print(f"ASLR Slide: {k:08X} Quality: {hit/seen:0.2f} (h {hit} s {seen} e {expected})")
slide = k
break

#
# if we do not break from the loop, we failed to find an adequate
# slide, which is very bad.
#
# NOTE/TODO: uh what do we do if we fail the ASLR slide?
#

else:
self.slide = None
return False

# fetch the top hit
_, slide = hits[0]
#
# TODO: err, lol this is all kind of dirty. should probably refactor
# and clean up this whole 'remapped_regions' stuff.
#

m1 = [instuction_addresses[0], instuction_addresses[-1]]
m1 = [disas_low_address, disas_high_address]

if slide < 0:
m2 = [m1[0] - slide, m1[1] - slide]
else:
m2 = [m1[0] + slide, m1[1] + slide]

self.slide = slide
self._remapped_regions.append((m1, m2))

#print(f"BIN ADDRESSES: {len(instuction_addresses)}")
#print(f"TRC ADDRESSES: {len(trace_addresses)}")
#print(f"INT ADDRESSES: {len(interesting_addresses)}")
return True

def _analyze_unmapped(self):
"""
Analyze trace execution to identify entry/exit to unmapped segments.
"""
if self.slide is None:
return

# alias for readability and speed
trace, ips = self._trace, self._trace.ip_addrs
lower_mapped, upper_mapped = self._remapped_regions[0][1]

Expand Down

0 comments on commit 80d4eb2

Please sign in to comment.