Skip to content

Commit

Permalink
working in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
gmartinez95 committed Aug 10, 2023
1 parent e4990b5 commit 5058981
Showing 1 changed file with 362 additions and 0 deletions.
362 changes: 362 additions & 0 deletions tests/test_api_services_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,362 @@
'''
Faraday Penetration Test IDE
Copyright (C) 2013 Infobyte LLC (http://www.infobytesec.com/)
See the file 'doc/LICENSE' for the license information
'''
"""Tests for many API endpoints that do not depend on workspace_name"""
try:
from urllib import urlencode
except ImportError:
from urllib.parse import urlencode

import pytest
import json

from faraday.server.api.modules.services import ServiceView
from tests import factories
from tests.test_api_non_workspaced_base import ReadWriteAPITests, BulkDeleteTestsMixin, BulkUpdateTestsMixin
from faraday.server.models import (
Service, Credential, Vulnerability
)
from tests.factories import HostFactory, EmptyCommandFactory, CredentialFactory, VulnerabilityFactory


@pytest.mark.usefixtures('logged_user')
class TestListServiceView(ReadWriteAPITests, BulkUpdateTestsMixin, BulkDeleteTestsMixin):
model = Service
factory = factories.ServiceFactory
api_endpoint = 'services'
view_class = ServiceView
patchable_fields = ['name']

def control_cant_change_data(self, data: dict):
if 'parent' in data:
data['parent'] = self.first_object.host_id
return data

def test_service_list_backwards_compatibility(self, test_client,
second_workspace, session):
self.factory.create(workspace=second_workspace)
session.commit()
res = test_client.get(self.url())
assert res.status_code == 200
assert 'services' in res.json
for service in res.json['services']:
assert {'id', 'key', 'value'} == set(service.keys())
object_properties = [
'status',
'protocol',
'description',
'_rev',
'owned',
'owner',
'credentials',
'name',
'version',
'_id',
'metadata'
]
expected = set(object_properties)
result = set(service['value'].keys())
assert expected <= result

def test_create_service(self, test_client, host, session):
session.commit()
data = {
"name": "ftp",
"description": "test. test",
"owned": False,
"ports": [21],
"protocol": "tcp",
"status": "open",
"parent": host.id
}
res = test_client.post(self.url(), data=data)
assert res.status_code == 201
service = Service.query.get(res.json['_id'])
assert service.name == "ftp"
assert service.port == 21
assert service.host is host

@pytest.mark.skip # more detailed test above
def test_create_succeeds(self, test_client):
pass

def test_create_fails_with_invalid_status(self, test_client,
host, session):
session.commit()
data = {
"name": "ftp",
"description": "test. test",
"owned": False,
"ports": [21],
"protocol": "tcp",
"status": "asdasdasd",
"parent": host.id
}
res = test_client.post(self.url(), data=data)
assert res.status_code == 400
assert b'Must be one of' in res.data

def test_create_fails_with_no_host_id(self, test_client,
host, session):
session.commit()
data = {
"name": "ftp",
"description": "test. test",
"owned": False,
"ports": [21],
"protocol": "tcp",
"status": "open",
}
res = test_client.post(self.url(), data=data)
assert res.status_code == 400
assert b'Parent id is required' in res.data

def test_create_fails_with_host_of_other_workspace(self, test_client,
host, session,
second_workspace):
session.commit()
assert host.workspace_id != second_workspace.id
data = {
"name": "ftp",
"description": "test. test",
"owned": False,
"ports": [21],
"protocol": "tcp",
"status": "open",
"parent": host.id
}
res = test_client.post(self.url(workspace=second_workspace), data=data)
assert res.status_code == 400
assert b'Host with id' in res.data

def test_update_fails_with_host_of_other_workspace(self, test_client,
second_workspace,
host_factory,
session):
host = host_factory.create(workspace=second_workspace)
session.commit()
assert host.workspace_id != self.first_object.workspace_id
data = {
"name": "ftp",
"description": "test. test",
"owned": False,
"ports": [21],
"protocol": "tcp",
"status": "open",
"parent": host.id
}
res = test_client.put(self.url(self.first_object), data=data)
assert res.status_code == 400
assert b'Can\'t change service parent.' in res.data

def test_create_service_returns_conflict_if_already_exists(self, test_client, host, session):
session.commit()
service = self.first_object
data = {
"name": service.name,
"description": service.description,
"owned": service.owned,
"ports": [service.port],
"protocol": service.protocol,
"status": service.status,
"parent": service.host_id
}
res = test_client.post(self.url(workspace=service.workspace), data=data)
assert res.status_code == 409
message = json.loads(res.data)
assert message['object']['_id'] == service.id

def _raw_put_data(self, id, parent=None, status='open', protocol='tcp', ports=None):
if not ports:
ports = [22]
raw_data = {"status": status,
"protocol": protocol,
"description": "",
"_rev": "",
"metadata": {"update_time": 1510945708000, "update_user": "", "update_action": 0, "creator": "",
"create_time": 1510945708000, "update_controller_action": "", "owner": "leonardo",
"command_id": None},
"owned": False,
"owner": "",
"version": "",
"_id": id,
"ports": ports,
"name": "ssh2",
"type": "Service"}
if parent:
raw_data['parent'] = parent
return raw_data

def test_update_with_json_from_webui(self, test_client, session):
service = self.factory()
session.commit()
raw_data = self._raw_put_data(service.id)

res = test_client.put(self.url(service, workspace=service.workspace), data=raw_data)
assert res.status_code == 200
updated_service = Service.query.filter_by(id=service.id).first()
assert updated_service.status == 'open'
assert updated_service.name == 'ssh2'

def test_update_cant_change_parent(self, test_client, session):
service = self.factory()
host = HostFactory.create()
session.commit()
raw_data = self._raw_put_data(service.id, parent=host.id)
res = test_client.put(self.url(service, workspace=service.workspace), data=raw_data)
assert res.status_code == 400
assert b'Can\'t change service parent.' in res.data
updated_service = Service.query.filter_by(id=service.id).first()
assert updated_service.name == service.name

def test_update_status(self, test_client, session):
service = self.factory(status='open')
session.commit()
raw_data = self._raw_put_data(service.id, parent=service.host.id, status='closed')
res = test_client.put(self.url(service, workspace=service.workspace), data=raw_data)
assert res.status_code == 200, res.json
updated_service = Service.query.filter_by(id=service.id).first()
assert updated_service.status == 'closed'

def test_update_ports(self, test_client, session):
service = self.factory(port=22)
session.commit()
raw_data = self._raw_put_data(service.id, parent=service.host.id, ports=[221])
res = test_client.put(self.url(service, workspace=service.workspace), data=raw_data)
assert res.status_code == 200, res.json
updated_service = Service.query.filter_by(id=service.id).first()
assert updated_service.port == 221

@pytest.mark.parametrize("method", ["PUT", "PATCH"])
def test_update_cant_change_id(self, test_client, session, method):
service = self.factory()
host = HostFactory.create()
session.commit()
raw_data = self._raw_put_data(service.id)
if method == "PUT":
res = test_client.put(self.url(service, workspace=service.workspace), data=raw_data)
if method == "PATCH":
res = test_client.patch(self.url(service, workspace=service.workspace), data=raw_data)

assert res.status_code == 200, res.json
assert res.json['id'] == service.id

def test_create_service_from_command(self, test_client, session):
host = HostFactory.create(workspace=self.workspace)
command = EmptyCommandFactory.create(workspace=self.workspace)
session.commit()
assert len(command.command_objects) == 0
url = self.url(workspace=command.workspace) + '?' + urlencode({'command_id': command.id})
raw_data = {
"name": "SSH",
"description": "SSH service",
"owned": False,
"ports": [22],
"protocol": "tcp",
"status": "open",
"parent": host.id
}
res = test_client.post(url, data=raw_data)

assert res.status_code == 201
assert len(command.command_objects) == 1
cmd_obj = command.command_objects[0]
assert cmd_obj.object_type == 'service'
assert cmd_obj.object_id == res.json['id']

def test_create_service_without_ost(self, test_client, host, session):
session.commit()
data = {
"name": "ftp",
"description": "test. test",
"owned": False,
"ports": [21],
"protocol": "tcp",
"status": "open",
}
res = test_client.post(self.url(), data=data)
assert res.status_code == 400

def test_load_ports_without_list(self, test_client):
data = {
"name": "ports",
"description": "testing ports load",
"owned": False,
"ports": 21,
"protocol": "tcp",
"status": "open",
}
res = test_client.post(self.url(), data=data)
assert res.status_code == 400

def test_load_ports_with_empty_list(self, test_client):
data = {
"name": "ports",
"description": "testing ports load",
"owned": False,
"ports": [],
"protocol": "tcp",
"status": "open",
}
res = test_client.post(self.url(), data=data)
assert res.status_code == 400

def test_load_ports_with_negative_value(self, test_client, session):
host = HostFactory.create(workspace=self.workspace)
session.commit()
data = {
"name": "ports",
"description": "testing ports load",
"owned": False,
"ports": [-1],
"protocol": "tcp",
"status": "open",
"parent": host.id
}
res = test_client.post(self.url(), data=data)
assert res.status_code == 400

def test_load_invalid_port(self, test_client, session):
host = HostFactory.create(workspace=self.workspace)
session.commit()
data = {
"name": "ports",
"description": "testing ports load",
"owned": False,
"ports": [65536],
"protocol": "tcp",
"status": "open",
"parent": host.id
}
res = test_client.post(self.url(), data=data)
print(res.data)
assert res.status_code == 400

@pytest.mark.usefixtures('ignore_nplusone')
def test_bulk_update_cant_change_id(self, test_client):
super().test_bulk_update_cant_change_id(test_client)

def test_bulk_delete_with_references(self, test_client, session):
previous_creds = Credential.query.count()
previous_vulns = Vulnerability.query.count()
previous_services = Service.query.count()

service_1 = self.factory.create(workspace=self.workspace)
service_2 = self.factory.create(workspace=self.workspace)
service_3 = self.factory.create(workspace=self.workspace)

for _ in range(3):
CredentialFactory.create(service=service_1, workspace=self.workspace)
VulnerabilityFactory.create(service=service_2, workspace=self.workspace)
CredentialFactory.create(service=service_3, workspace=self.workspace)
VulnerabilityFactory.create(service=service_3, workspace=self.workspace)
session.commit()

raw_data = {'ids': [service_1.id, service_2.id, service_3.id]}
res = test_client.delete(self.url(), data=raw_data)

assert res.status_code == 200
assert res.json['deleted'] == 3

0 comments on commit 5058981

Please sign in to comment.