forked from threat9/routersploit
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Unifying ftp/ssh/telnet/snmp communication (threat9#505)
* Unifying communication api * Adding error messages
- Loading branch information
Showing
30 changed files
with
510 additions
and
434 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,79 +1,91 @@ | ||
import socket | ||
import ftplib | ||
import io | ||
|
||
from routersploit.core.exploit.exploit import Exploit | ||
from routersploit.core.exploit.exploit import Protocol | ||
from routersploit.core.exploit.option import OptBool | ||
from routersploit.core.exploit.printer import print_error | ||
from routersploit.core.exploit.printer import print_success | ||
|
||
|
||
FTP_TIMEOUT = 8.0 | ||
|
||
|
||
class FTPClient(Exploit): | ||
""" FTP Client exploit """ | ||
class FTPCli(object): | ||
""" FTP Client """ | ||
|
||
target_protocol = Protocol.FTP | ||
def __init__(self, ftp_target, ftp_port, ssl=False, verbosity=False): | ||
self.ftp_target = ftp_target | ||
self.ftp_port = ftp_port | ||
self.verbosity = verbosity | ||
|
||
ssl = OptBool(False, "SSL enabled: true/false") | ||
verbosity = OptBool(True, "Enable verbose output: true/false") | ||
self.peer = "{}:{}".format(self.ftp_target, ftp_port) | ||
|
||
def ftp_create(self): | ||
if self.ssl: | ||
ftp_client = ftplib.FTP_TLS() | ||
if ssl: | ||
self.ftp_client = ftplib.FTP_TLS() | ||
else: | ||
ftp_client = ftplib.FTP() | ||
|
||
return ftp_client | ||
|
||
def ftp_connect(self, retries=1): | ||
ftp_client = self.ftp_create() | ||
self.ftp_client = ftplib.FTP() | ||
|
||
def connect(self, retries=1): | ||
for _ in range(retries): | ||
try: | ||
ftp_client.connect(self.target, self.port, timeout=FTP_TIMEOUT) | ||
except (socket.error, socket.timeout): | ||
print_error("Connection error", verbose=self.verbosity) | ||
self.ftp_client.connect(self.ftp_target, self.ftp_port, timeout=FTP_TIMEOUT) | ||
return self.ftp_client | ||
except Exception as err: | ||
print_error(err, verbose=self.verbosity) | ||
else: | ||
return ftp_client | ||
print_error(self.peer, "FTP Error while connecting to the server", err, verbose=self.verbosity) | ||
|
||
ftp_client.close() | ||
return None | ||
self.ftp_client.close() | ||
|
||
def ftp_login(self, username, password): | ||
ftp_client = self.ftp_connect() | ||
if ftp_client: | ||
try: | ||
ftp_client.login(username, password) | ||
return ftp_client | ||
except Exception as err: | ||
pass | ||
return None | ||
|
||
ftp_client.close() | ||
def login(self, username, password): | ||
try: | ||
self.ftp_client.login(username, password) | ||
print_success(self.peer, "FTP Authentication Successful - Username: '{}' Password: '{}'".format(username, password), verbose=self.verbosity) | ||
return self.ftp_client | ||
except Exception as err: | ||
print_error(self.peer, "FTP Authentication Failed - Username: '{}' Password: '{}'".format(username, password), verbose=self.verbosity) | ||
|
||
self.ftp_client.close() | ||
return None | ||
|
||
def ftp_test_connect(self): | ||
ftp_client = self.ftp_connect() | ||
if ftp_client: | ||
ftp_client.close() | ||
def test_connect(self): | ||
if self.connect(): | ||
self.ftp_client.close() | ||
return True | ||
|
||
return False | ||
|
||
def ftp_get_content(self, ftp_client, remote_file): | ||
if ftp_client: | ||
def get_content(self, remote_file): | ||
try: | ||
fp_content = io.BytesIO() | ||
ftp_client.retrbinary("RETR {}".format(remote_file), fp_content.write) | ||
self.ftp_client.retrbinary("RETR {}".format(remote_file), fp_content.write) | ||
return fp_content.getvalue() | ||
except Exception as err: | ||
print_error(self.peer, "FTP Error while retrieving content", err, verbose=self.verbosity) | ||
|
||
return None | ||
|
||
def ftp_close(self, ftp_client): | ||
if ftp_client: | ||
ftp_client.close() | ||
def close(self): | ||
try: | ||
self.ftp_client.close() | ||
except Exception as err: | ||
print_error(self.peer, "FTP Error while closing connection", err, verbose=self.verbosity) | ||
|
||
return None | ||
|
||
|
||
class FTPClient(Exploit): | ||
""" FTP Client exploit option and api """ | ||
|
||
target_protocol = Protocol.FTP | ||
|
||
ssl = OptBool(False, "SSL enabled: true/false") | ||
verbosity = OptBool(True, "Enable verbose output: true/false") | ||
|
||
def ftp_create(self, target=None, port=None): | ||
ftp_target = target if target else self.target | ||
ftp_port = port if port else self.port | ||
|
||
ftp_client = FTPCli(ftp_target, ftp_port, ssl=self.ssl, verbosity=self.verbosity) | ||
return ftp_client |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.