From 4abf52976cd18390f2663be68d1d25da06fe90ab Mon Sep 17 00:00:00 2001 From: yampelo Date: Wed, 13 Nov 2019 16:14:53 -0500 Subject: [PATCH] adds from_datasources function for graphing multiple artifacts. --- README.md | 19 +++++++++++- beagle/backends/base_backend.py | 34 +++++++++++++++++++- tests/backend/test_networkx.py | 55 ++++++++++++++++++++++++++++++++- 3 files changed, 105 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index bf3fd27c..97c5c53f 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ Beagle can be used directly as a python library, or through a provided web inter -The library can be used either as a sequence of functional calls. +The library can be used either as a sequence of functional calls from a single datasource. ```python >>> from beagle.datasources import SysmonEVTX @@ -49,6 +49,23 @@ The library can be used either as a sequence of functional calls. ``` +As a graph generated from a set of multiple artifacts + +```python +>>> from beagle.datasources import SysmonEVTX, HXTriage, PCAP +>>> from beagle.backends import NetworkX + +>>> nx = NetworkX.from_datasources( + datasources=[ + SysmonEVTX("malicious.evtx"), + HXTriage("alert.mans"), + PCAP("traffic.pcap"), + ] +) +>>> G = nx.graph() + +``` + Or by strictly calling each intermediate step of the data source to graph process. ```python diff --git a/beagle/backends/base_backend.py b/beagle/backends/base_backend.py index 1583ccdd..8fd1420a 100644 --- a/beagle/backends/base_backend.py +++ b/beagle/backends/base_backend.py @@ -1,8 +1,11 @@ from abc import ABCMeta, abstractmethod -from typing import List, Any, Union +from typing import List, Any, Union, TYPE_CHECKING from beagle.nodes import Node +if TYPE_CHECKING: + from beagle.datasources import DataSource + class Backend(object, metaclass=ABCMeta): """Abstract Backend Class. All Backends must implement the @@ -56,3 +59,32 @@ def is_empty(self) -> bool: """Returns true if there wasn't a graph created. """ raise NotImplementedError() + + @classmethod + def from_datasources( + cls, datasources: Union["DataSource", List["DataSource"]], *args, **kwargs + ) -> "Backend": + """Create a backend instance from a set of datasources + + Parameters + ---------- + datasources : Union[DataSource, List[DataSource]] + A set of datasources to use when creating the backend. + + Returns + ------- + Backend + Returns the configured instance + """ + + nodes = [] # type: List[Node] + + if not isinstance(datasources, List): + datasources = [datasources] + + for datasource in datasources: + nodes += datasource.to_transformer().run() + + instance = cls(*args, nodes=nodes, **kwargs) # type: ignore + + return instance diff --git a/tests/backend/test_networkx.py b/tests/backend/test_networkx.py index 74760d0c..054e7005 100644 --- a/tests/backend/test_networkx.py +++ b/tests/backend/test_networkx.py @@ -1,5 +1,5 @@ import json -from typing import Callable +from typing import Callable, List import networkx import pytest @@ -8,6 +8,22 @@ from beagle.nodes import Process, File +from io import BytesIO + +from scapy.all import Ether, PcapWriter, Packet, IP, UDP, TCP, DNS, DNSQR, DNSRR + +from scapy.layers.http import HTTPRequest, HTTP + +from beagle.datasources.pcap import PCAP + + +def packets_to_datasource_events(packets: List[Packet]) -> PCAP: + f = BytesIO() + PcapWriter(f).write(packets) + f.seek(0) + return PCAP(f) # type: ignore + + @pytest.fixture() def nx() -> Callable[..., NetworkX]: def _backend(*args, **kwargs) -> networkx.Graph: @@ -186,3 +202,40 @@ def test_add_node_overlaps_existing(nx): assert networkx.has_path(G, u, v2) assert "Launched" in G[u][v] assert "Wrote" in G[u][v2] + + +def test_from_datasources(): + packets_1 = [ + Ether(src="ab:ab:ab:ab:ab:ab", dst="12:12:12:12:12:12") + / IP(src="127.0.0.1", dst="192.168.1.1") + / TCP(sport=12345, dport=80) + / HTTP() + / HTTPRequest(Method="GET", Path="/foo", Host="https://google.com") + ] + + packets_2 = [ + # HTTP Packet + Ether(src="ab:ab:ab:ab:ab:ab", dst="12:12:12:12:12:12") + / IP(src="127.0.0.1", dst="192.168.1.1") + / TCP(sport=12345, dport=80) + / HTTP() + / HTTPRequest(Method="GET", Path="/foo", Host="https://google.com"), + # DNS Packet + Ether(src="ab:ab:ab:ab:ab:ab", dst="12:12:12:12:12:12") + / IP(src="127.0.0.1", dst="192.168.1.1") + / UDP(sport=80, dport=53) + / DNS(rd=1, qd=DNSQR(qtype="A", qname="google.com"), an=DNSRR(rdata="123.0.0.1")), + # TCP Packet + Ether(src="ab:ab:ab:ab:ab:ab", dst="12:12:12:12:12:12") + / IP(src="127.0.0.1", dst="192.168.1.1") + / TCP(sport=80, dport=5355), + ] + + nx = NetworkX.from_datasources( + [packets_to_datasource_events(packets) for packets in [packets_1, packets_2]] + ) + + # Make the graph + nx.graph() + + assert not nx.is_empty()