Skip to content

Commit

Permalink
Added option for specifying a user to run external extraction utiltie…
Browse files Browse the repository at this point in the history
…s as
  • Loading branch information
devttys0 committed Sep 3, 2021
1 parent bd2c356 commit 8f1dcca
Showing 1 changed file with 32 additions and 38 deletions.
70 changes: 32 additions & 38 deletions src/binwalk/modules/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ class Extractor(Module):
type=int,
kwargs={'max_count': 0},
description='Limit the number of extracted files'),
Option(short='0',
long='run-as',
type=str,
kwargs={'runas_user': 0},
description="Execute external extraction utilities with the specified user's privileges"),
#Option(short='u',
# long='limit',
# type=int,
Expand All @@ -109,10 +114,6 @@ class Extractor(Module):
long='subdirs',
kwargs={'extract_into_subdirs': True},
description="Extract into sub-directories named by the offset"),
Option(short='0',
long='unsafe',
kwargs={'unsafe_extraction': True},
description="Execute external extractions without dropping privileges"),
]

KWARGS = [
Expand All @@ -127,25 +128,24 @@ class Extractor(Module):
Kwarg(name='manual_rules', default=[]),
Kwarg(name='matryoshka', default=0),
Kwarg(name='enabled', default=False),
Kwarg(name='unsafe_extraction', default=False),
Kwarg(name='runas_user', default=None),
]

def load(self):
self.unpriv_uid = None
self.unpriv_gid = None

if self.enabled is True and self.unsafe_extraction is False:
if os.getuid() != 0:
raise ModuleException("Binwalk requires root privileges to safely execute external extraction utilities as an unprivileged user.")

self.shell_call('useradd %s' % self.UNPRIVILEGED_USER_NAME)

user_info = pwd.getpwnam(self.UNPRIVILEGED_USER_NAME)
self.unpriv_uid = user_info.pw_uid
self.unpriv_gid = user_info.pw_gid
else:
self.unpriv_uid = os.getuid()
self.unpriv_gid = os.getgid()
self.runas_uid = None
self.runas_gid = None

if self.enabled is True:
if self.runas_user is None:
if os.getuid() == 0:
raise ModuleException("Binwalk extraction uses many third party utilities, which may not be secure.\nIf you wish to have extraction utilities executed as the current user, use --run-as=<user name>.")

self.runas_uid = os.getuid()
self.runas_gid = os.getgid()
else:
user_info = pwd.getpwnam(self.runas_user)
self.runas_uid = user_info.pw_uid
self.runas_gid = user_info.pw_gid

# Holds a list of extraction rules loaded either from a file or when
# manually specified.
Expand Down Expand Up @@ -560,8 +560,8 @@ def build_output_directory(self, path):
else:
output_directory = self.extraction_directories[path]

# Make sure unpriv user can access this directory
os.chown(output_directory, self.unpriv_uid, self.unpriv_gid)
# Make sure runas user can access this directory
os.chown(output_directory, self.runas_uid, self.runas_gid)

return output_directory

Expand Down Expand Up @@ -856,8 +856,8 @@ def _dd(self, file_name, offset, size, extension, output_file_name=None):
fdout.close()
fdin.close()

# Make sure unprivileged user can access this file
os.chown(fname, self.unpriv_uid, self.unpriv_gid)
# Make sure runasileged user can access this file
os.chown(fname, self.runas_uid, self.runas_gid)
except KeyboardInterrupt as e:
raise e
except Exception as e:
Expand Down Expand Up @@ -938,30 +938,24 @@ def execute(self, cmd, fname, codes=[0, None]):
return (retval, '&&'.join(command_list))

def shell_call(self, command):
# Fork a child to drop privs
child_pid = os.fork()
if child_pid is 0:
# Switch to unprivileged user, if one has been set
if self.unpriv_uid is not None and self.unpriv_gid is not None:
binwalk.core.common.debug("Dropping privileges to %d:%d" % (self.unpriv_uid, self.unpriv_gid))
os.setgid(self.unpriv_uid)
os.setuid(self.unpriv_gid)

# If not in debug mode, create a temporary file to redirect
# stdout and stderr to
# Switch to the run-as user privileges, if one has been set
if self.runas_uid is not None and self.runas_gid is not None:
binwalk.core.common.debug("Switching privileges to %s (%d:%d)" % (self.runas_user, self.runas_uid, self.runas_gid))
os.setgid(self.runas_uid)
os.setuid(self.runas_gid)

# If not in debug mode, redirect output to /dev/null
if not binwalk.core.common.DEBUG:
tmp = tempfile.TemporaryFile()
tmp = subprocess.DEVNULL
else:
tmp = None

# Execute command
binwalk.core.common.debug("subprocess.call(%s, stdout=%s, stderr=%s)" % (command, str(tmp), str(tmp)))
rval = subprocess.call(shlex.split(command), stdout=tmp, stderr=tmp)

# Clean up temp file
if tmp is not None:
tmp.close()

sys.exit(rval)
else:
return os.wait()[1]

0 comments on commit 8f1dcca

Please sign in to comment.