Skip to content

Commit

Permalink
make plugin imports optional
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkjanm committed Nov 15, 2022
1 parent c5ace31 commit 91a007c
Showing 1 changed file with 109 additions and 105 deletions.
214 changes: 109 additions & 105 deletions roadrecon/roadtools/roadrecon/plugins/road2timeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
import yaml
import pandas as pd
import numpy as np
HAS_RTT_MODULES = True
except (ModuleNotFoundError, ImportError) as exc:
print(f"Problem importing required module for ROAD2timeline plugin: {str(exc)}")
raise exc
HAS_RTT_MODULES = False


DESCRIPTION = "Timeline analysis of Azure AD objects"
Expand Down Expand Up @@ -80,109 +80,109 @@ def add_args(parser: argparse.ArgumentParser):
default="timeline-output.jsonl",
)


def populate_timeline_entry(
row: pd.Series, timeline_entry_templates: Dict[str, Dict[str, str]]
) -> str:
"""Attempts to populate a timeline entry template with
the relevant fields from a row in the database.
Parameters
----------
row : pd.Series
A row from a DataFrame, passed by `.apply(..., axis=1)`
timeline_entry_templates : Dict[str, Dict[str, str]]
Dictionary of template strings organized by table and
timestamp column name.
Returns
-------
str
Templated timeline event message
"""

table_templates = timeline_entry_templates.get(row._table_name, {})
template_text = table_templates.get(row._timestamp_column)
if template_text:
try:
return template_text.format(**row.to_dict())
except Exception as exc:
print(f"There was a problem parsing the message: {str(exc)}")
return f"Error parsing template {row._table_name}.{row._timestamp_column}: Object ID - {row._object_id}"
else:
return f"No template found for {row._table_name}.{row._timestamp_column}: Object ID - {row._object_id}"


def to_dataframe(
session: sqlalchemy.orm.session.Session,
table: sqlalchemy.orm.DeclarativeMeta,
) -> pd.DataFrame:
"""Reads sqlite table and converts to pd.DataFrame.
This also converts the string values in datetime columns
into their equivalent Python `datetime` representation.
Parameters
----------
session : sqlalchemy.orm.session.Session
SQLAlchemy session with the ROADtools database
table : sqlalchemy.ext.declarative.api.DeclarativeMeta
An object representing the SQL table in the ROADtools
database
Returns
-------
pd.DataFrame
All data from the table converted to Python data
types
"""
rows = session.query(table).all()
if not rows:
return pd.DataFrame()

df = pd.json_normalize(
[
{
**row._asdict(),
"_object_id": getattr(row, "objectId", None)
or getattr(row, "id", None)
or "Identifier Not Found",
}
for row in rows
]
).assign(_table_name=table.name)

for column in table.columns:
# Attempts to coerce sqlite datetime columns
# into a python Datetime object.
if type(column.type) == sqlalchemy.sql.sqltypes.DATETIME:
if column.name in df.columns:
df[column.name] = df[column.name].apply(pd.to_datetime, errors="coerce")

return df


def copy_dataframe_by_col(df: pd.DataFrame, col: str) -> pd.DataFrame:
"""Returns a copy of a `pd.DataFrame` with additional columns
for a given datetime field.
Parameters
----------
df : pd.DataFrame
Source DataFrame
col : str
Timestamp column of interest
Returns
-------
pd.DataFrame
Copy of the source DataFrame, but with additional
columns detailing the column of interest and dropping
all rows where that timestamp column is not null
"""
df = df.copy()
df["_timestamp"] = df[col]
df["_timestamp_column"] = col
return df[df[col].notnull()]
if HAS_RTT_MODULES:
def populate_timeline_entry(
row: pd.Series, timeline_entry_templates: Dict[str, Dict[str, str]]
) -> str:
"""Attempts to populate a timeline entry template with
the relevant fields from a row in the database.
Parameters
----------
row : pd.Series
A row from a DataFrame, passed by `.apply(..., axis=1)`
timeline_entry_templates : Dict[str, Dict[str, str]]
Dictionary of template strings organized by table and
timestamp column name.
Returns
-------
str
Templated timeline event message
"""

table_templates = timeline_entry_templates.get(row._table_name, {})
template_text = table_templates.get(row._timestamp_column)
if template_text:
try:
return template_text.format(**row.to_dict())
except Exception as exc:
print(f"There was a problem parsing the message: {str(exc)}")
return f"Error parsing template {row._table_name}.{row._timestamp_column}: Object ID - {row._object_id}"
else:
return f"No template found for {row._table_name}.{row._timestamp_column}: Object ID - {row._object_id}"


def to_dataframe(
session: sqlalchemy.orm.session.Session,
table: sqlalchemy.orm.DeclarativeMeta,
) -> pd.DataFrame:
"""Reads sqlite table and converts to pd.DataFrame.
This also converts the string values in datetime columns
into their equivalent Python `datetime` representation.
Parameters
----------
session : sqlalchemy.orm.session.Session
SQLAlchemy session with the ROADtools database
table : sqlalchemy.ext.declarative.api.DeclarativeMeta
An object representing the SQL table in the ROADtools
database
Returns
-------
pd.DataFrame
All data from the table converted to Python data
types
"""
rows = session.query(table).all()
if not rows:
return pd.DataFrame()

df = pd.json_normalize(
[
{
**row._asdict(),
"_object_id": getattr(row, "objectId", None)
or getattr(row, "id", None)
or "Identifier Not Found",
}
for row in rows
]
).assign(_table_name=table.name)

for column in table.columns:
# Attempts to coerce sqlite datetime columns
# into a python Datetime object.
if type(column.type) == sqlalchemy.sql.sqltypes.DATETIME:
if column.name in df.columns:
df[column.name] = df[column.name].apply(pd.to_datetime, errors="coerce")

return df


def copy_dataframe_by_col(df: pd.DataFrame, col: str) -> pd.DataFrame:
"""Returns a copy of a `pd.DataFrame` with additional columns
for a given datetime field.
Parameters
----------
df : pd.DataFrame
Source DataFrame
col : str
Timestamp column of interest
Returns
-------
pd.DataFrame
Copy of the source DataFrame, but with additional
columns detailing the column of interest and dropping
all rows where that timestamp column is not null
"""
df = df.copy()
df["_timestamp"] = df[col]
df["_timestamp_column"] = col
return df[df[col].notnull()]


def main(args: Optional[argparse.Namespace] = None) -> Path:
Expand All @@ -201,6 +201,10 @@ def main(args: Optional[argparse.Namespace] = None) -> Path:
db_metadata = sqlalchemy.MetaData(bind=engine, schema="main")
db_metadata.reflect()

if not HAS_RTT_MODULES:
print('Error importing required modules for road2timeline. Make sure pyyaml, numpy and pandas are installed.')
return

# Find the templates file which is used
# to populate timeline entries from the
# source rows.
Expand Down

0 comments on commit 91a007c

Please sign in to comment.