Skip to content

Commit

Permalink
Support DCERPC.V5 library and SMB2 for smb_login and smb_lookupsid mo…
Browse files Browse the repository at this point in the history
…dules
  • Loading branch information
asolino committed Apr 24, 2015
1 parent e6b7be5 commit fccc584
Showing 1 changed file with 28 additions and 68 deletions.
96 changes: 28 additions & 68 deletions patator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2546,15 +2546,18 @@ def execute(self, host, port='389', binddn='', bindpw='', basedn='', ssl='0'):

# SMB {{{
try:
from impacket import smb as impacket_smb
from impacket.dcerpc import dcerpc, transport, lsarpc
from impacket.smbconnection import SMBConnection, SessionError
from impacket import nt_errors
from impacket.dcerpc.v5 import transport, lsat, lsad
from impacket.dcerpc.v5.dtypes import MAXIMUM_ALLOWED
from impacket.dcerpc.v5.samr import SID_NAME_USE
except ImportError:
notfound.append('impacket')

class SMB_Connection(TCP_Connection):

def close(self):
self.fp.get_socket().close()
self.fp.getSMBServer().get_socket().close()

class Response_SMB(Response_Base):
indicatorsfmt = [('code', -8), ('size', -4), ('time', 6)]
Expand All @@ -2579,34 +2582,9 @@ class SMB_login(TCP_Cache):

Response = Response_SMB

# ripped from medusa smbnt.c
error_map = {
0xffff: 'UNKNOWN_ERROR_CODE',
0x0000: 'STATUS_SUCCESS',
0x000d: 'STATUS_INVALID_PARAMETER',
0x005e: 'STATUS_NO_LOGON_SERVERS',
0x006d: 'STATUS_LOGON_FAILURE',
0x006e: 'STATUS_ACCOUNT_RESTRICTION',
0x006f: 'STATUS_INVALID_LOGON_HOURS',
0x0002: 'STATUS_INVALID_LOGON_HOURS (LM Authentication)',
0x0070: 'STATUS_INVALID_WORKSTATION',
0x0002: 'STATUS_INVALID_WORKSTATION (LM Authentication)',
0x0071: 'STATUS_PASSWORD_EXPIRED',
0x0072: 'STATUS_ACCOUNT_DISABLED',
0x015b: 'STATUS_LOGON_TYPE_NOT_GRANTED',
0x018d: 'STATUS_TRUSTED_RELATIONSHIP_FAILURE',
0x0193: 'STATUS_ACCOUNT_EXPIRED',
0x0002: 'STATUS_ACCOUNT_EXPIRED_OR_DISABLED (LM Authentication)',
0x0224: 'STATUS_PASSWORD_MUST_CHANGE',
0x0002: 'STATUS_PASSWORD_MUST_CHANGE (LM Authentication)',
0x0234: 'STATUS_ACCOUNT_LOCKED_OUT (No LM Authentication Code)',
0x0001: 'AS400_STATUS_LOGON_FAILURE',
0x0064: 'The machine you are logging onto is protected by an authentication firewall.',
}

def connect(self, host, port):
# if port == 445, impacket will use <host> instead of '*SMBSERVER' as the remote_name
fp = impacket_smb.SMB('*SMBSERVER', host, sess_port=int(port))
fp = SMBConnection('*SMBSERVER', host, sess_port=int(port))

return SMB_Connection(fp)

Expand All @@ -2632,28 +2610,14 @@ def execute(self, host, port='139', user=None, password='', password_hash=None,
fp.login(user, password, domain)

logger.debug('No error')
code, mesg = '0', '%s\\%s (%s)' % (fp.get_server_domain(), fp.get_server_name(), fp.get_server_os())
code, mesg = '0', '%s\\%s (%s)' % (fp.getServerDomain(), fp.getServerName(), fp.getServerOS())

self.reset()

except impacket_smb.SessionError as e:

if e.error_class == 0:
code = '%04x' % e.error_code
mesg = self.error_map.get(e.error_code & 0x0000fffff, '')

else:
code = '%x%04x' % (e.error_class, e.error_code)

error_class = e.error_classes.get(e.error_class, None) # -> ("ERRNT", nt_msgs)
if error_class:
class_str = error_class[0] # "ERRNT"
error_tuple = error_class[1].get(e.error_code, None) # -> ("STATUS_LOGON_FAILURE","Logon failure: unknown user name or bad password.") or None

if error_tuple:
mesg = '%s %s' % error_tuple
else:
mesg = '%s' % class_str
except SessionError as e:
# Something failed, bye
code = str(hex(e.getErrorCode()))
mesg = nt_errors.ERROR_MESSAGES[e.getErrorCode()][0]

if persistent == '0':
self.reset()
Expand Down Expand Up @@ -2691,37 +2655,33 @@ class SMB_lookupsid(TCP_Cache):
def connect(self, host, port, user, password):
smbt = transport.SMBTransport(host, int(port), r'\lsarpc', user, password)

dce = dcerpc.DCERPC_v5(smbt)
dce = smbt.get_dce_rpc()
dce.connect()
dce.bind(lsarpc.MSRPC_UUID_LSARPC)

fp = lsarpc.DCERPCLsarpc(dce)
return DCE_Connection(fp, smbt)
dce.bind(lsat.MSRPC_UUID_LSAT)

# http://msdn.microsoft.com/en-us/library/windows/desktop/hh448528%28v=vs.85%29.aspx
SID_NAME_USER = [0, 'User', 'Group', 'Domain', 'Alias', 'WellKnownGroup', 'DeletedAccount', 'Invalid', 'Unknown', 'Computer', 'Label']
return DCE_Connection(dce, smbt)

def execute(self, host, port='139', user='', password='', sid=None, rid=None, persistent='1'):

fp, _ = self.bind(host, port, user, password)
try:
fp, _ = self.bind(host, port, user, password)
except SessionError, e:
error_code = nt_errors.ERROR_MESSAGES[e.getErrorCode()][0]
# Something failed, bye
return self.Response(1, error_code )

if rid:
sid = '%s-%s' % (sid, rid)

op2 = fp.LsarOpenPolicy2('\\', access_mask=0x02000000)
res = fp.LsarLookupSids(op2['ContextHandle'], [sid])

if res['ErrorCode'] == 0:
op2 = lsat.hLsarOpenPolicy2(fp, MAXIMUM_ALLOWED | lsat.POLICY_LOOKUP_NAMES)
try:
res = lsat.hLsarLookupSids(fp, op2['PolicyHandle'], [sid],lsat.LSAP_LOOKUP_LEVEL.LsapLookupWksta)
code, names = 0, []

for d in res.formatDict():

if 'types' in d: # http://code.google.com/p/impacket/issues/detail?id=10
names.append(','.join('%s\\%s (%s)' % (d['domain'], n, self.SID_NAME_USER[t]) for n, t in zip(d['names'], d['types'])))
else:
names.append(','.join('%s\\%s' % (d['domain'], n) for n in d['names']))

else:
for n, item in enumerate(res['TranslatedNames']['Names']):
if item['Use'] != SID_NAME_USE.SidTypeUnknown:
names.append("%s\\%s (%s)" % (res['ReferencedDomains']['Domains'][item['DomainIndex']]['Name'], item['Name'], SID_NAME_USE.enumItems(item['Use']).name))
except Exception, e:
code, names = 1, ['unknown'] # STATUS_SOME_NOT_MAPPED

if persistent == '0':
Expand Down

0 comments on commit fccc584

Please sign in to comment.