diff --git a/uncoder-core/app/routers/ioc_translate.py b/uncoder-core/app/routers/ioc_translate.py index 7eb702ed..51ecc96d 100644 --- a/uncoder-core/app/routers/ioc_translate.py +++ b/uncoder-core/app/routers/ioc_translate.py @@ -4,11 +4,10 @@ from app.models.ioc_translation import CTIPlatform, OneTranslationCTIData from app.models.translation import InfoMessage -from app.translator.cti_translator import CTITranslator +from app.translator.cti_translator import cti_translator from app.translator.tools.const import HashType, IocParsingRule, IOCType iocs_router = APIRouter() -cti_translator = CTITranslator() @iocs_router.post("/iocs/translate", description="Parse IOCs from text.") @@ -46,3 +45,32 @@ def parse_and_translate_iocs( info_message = InfoMessage(message=translations, severity="error") return OneTranslationCTIData(info=info_message, status=status, target_platform_id=platform.id) + + +@iocs_router.post("/iocs/generate", description="Parse IOCs from text and based on input data generate translation") +@iocs_router.post("/iocs/generate", include_in_schema=False) +def parse_iocs_and_generate_rule( + text: str = Body(..., description="Text to parse IOCs from", embed=True), + platform: CTIPlatform = Body(..., description="Platform to parse IOCs to", embed=True), + iocs_per_query: int = Body(25, description="IOCs per query limit", embed=True), + title: str = Body(..., description="Title", embed=True), + description: str = Body(..., description="Description", embed=True), + references: list[str] = Body(..., description="References", embed=True), + created_date: str = Body(..., description="Rule created date", embed=True), + mitre_tags: Optional[list[str]] = Body(..., description="Mitra tactics and techniques", embed=True), +) -> OneTranslationCTIData: + status, translations = cti_translator.generate( + title=title, + text=text, + platform_data=platform, + description=description, + references=references, + created_date=created_date, + mitre_tags=mitre_tags, + iocs_per_query=iocs_per_query, + ) + if status: + return OneTranslationCTIData(status=status, translations=translations, target_platform_id=platform.id) + + info_message = InfoMessage(message=translations, severity="error") + return OneTranslationCTIData(info=info_message, status=status, target_platform_id=platform.id) diff --git a/uncoder-core/app/translator/core/mixins/tokens.py b/uncoder-core/app/translator/core/mixins/tokens.py new file mode 100644 index 00000000..2bba49c7 --- /dev/null +++ b/uncoder-core/app/translator/core/mixins/tokens.py @@ -0,0 +1,18 @@ +from app.translator.core.const import QUERY_TOKEN_TYPE +from app.translator.core.custom_types.tokens import LogicalOperatorType, OperatorType +from app.translator.core.mapping import SourceMapping +from app.translator.core.models.query_tokens.field_value import FieldValue +from app.translator.core.models.query_tokens.identifier import Identifier + + +class ExtraConditionMixin: + def generate_extra_conditions(self, source_mapping: SourceMapping) -> list[QUERY_TOKEN_TYPE]: + extra_tokens = [] + for field, value in source_mapping.conditions.items(): + extra_tokens.extend( + [ + FieldValue(source_name=field, operator=Identifier(token_type=OperatorType.EQ), value=value), + Identifier(token_type=LogicalOperatorType.AND), + ] + ) + return extra_tokens diff --git a/uncoder-core/app/translator/core/parser_cti.py b/uncoder-core/app/translator/core/parser_cti.py index 05ca14fa..41df2773 100644 --- a/uncoder-core/app/translator/core/parser_cti.py +++ b/uncoder-core/app/translator/core/parser_cti.py @@ -52,13 +52,17 @@ def get_iocs_from_string( include_source_ip: Optional[bool] = False, ) -> dict: iocs = Iocs() - string = self.replace_dots_hxxp(string, ioc_parsing_rules) if not include_ioc_types or "ip" in include_ioc_types: - iocs.ip.extend(self._find_all_str_by_regex(string, IP_IOC_REGEXP_PATTERN)) + for ip in self._find_all_str_by_regex(string, IP_IOC_REGEXP_PATTERN): + iocs.ip.append(self.replace_dots_hxxp(ip)) if not include_ioc_types or "domain" in include_ioc_types: - iocs.domain.extend(self._find_all_str_by_regex(string, DOMAIN_IOC_REGEXP_PATTERN)) + for domain in self._find_all_str_by_regex(string, DOMAIN_IOC_REGEXP_PATTERN): + for domain_val in domain: + if domain_val: + iocs.domain.append(self.replace_dots_hxxp(domain_val)) if not include_ioc_types or "url" in include_ioc_types: - iocs.url.extend(self._find_all_str_by_regex(string, URL_IOC_REGEXP_PATTERN)) + for url in self._find_all_str_by_regex(string, URL_IOC_REGEXP_PATTERN): + iocs.url.append(self.replace_dots_hxxp(url).rstrip(".")) if not include_ioc_types or "hash" in include_ioc_types: if not include_hash_types: include_hash_types = list(hash_regexes.keys()) @@ -74,7 +78,7 @@ def get_iocs_from_string( raise IocsLimitExceededException(f"IOCs count {total_count} exceeds limit {limit}.") return iocs.return_iocs(include_source_ip) - def replace_dots_hxxp(self, string: str, ioc_parsing_rules: Optional[list[IocParsingRule]]) -> str: + def replace_dots_hxxp(self, string: str, ioc_parsing_rules: Optional[list[IocParsingRule]] = None) -> str: if ioc_parsing_rules is None or "replace_dots" in ioc_parsing_rules: string = self._replace_dots(string) if ioc_parsing_rules is None or "replace_hxxp" in ioc_parsing_rules: diff --git a/uncoder-core/app/translator/core/render_cti.py b/uncoder-core/app/translator/core/render_cti.py index 20bfb7bf..b0cf631e 100644 --- a/uncoder-core/app/translator/core/render_cti.py +++ b/uncoder-core/app/translator/core/render_cti.py @@ -16,6 +16,7 @@ limitations under the License. ----------------------------------------------------------------- """ +from abc import abstractmethod from app.translator.core.models.iocs import IocsChunkValue from app.translator.core.models.platform_details import PlatformDetails @@ -46,6 +47,10 @@ def render(self, data: list[list[IocsChunkValue]]) -> list[str]: final_result.append(self.final_result_for_one.format(result=data_values[0])) return final_result + @abstractmethod + def generate(self, data: dict[str, list[list[IocsChunkValue]]], **kwargs) -> list[str]: + raise NotImplementedError("Abstract method") + def collect_data_values(self, chunk: list[IocsChunkValue]) -> list[str]: data_values = [] key_chunk = [] diff --git a/uncoder-core/app/translator/cti_translator.py b/uncoder-core/app/translator/cti_translator.py index 79b25fc4..2d345b3e 100644 --- a/uncoder-core/app/translator/cti_translator.py +++ b/uncoder-core/app/translator/cti_translator.py @@ -6,6 +6,7 @@ from app.translator.core.models.iocs import IocsChunkValue from app.translator.core.parser_cti import CTIParser from app.translator.managers import RenderCTIManager, render_cti_manager +from app.translator.tools.const import DefaultHashType, DefaultIocParsingRule, DefaultIOCType, iocs_types_map from app.translator.tools.decorators import handle_translation_exceptions @@ -45,6 +46,31 @@ def __render_translation(self, parsed_data: dict, platform_data: CTIPlatform, io ) return render_cti.render(chunked_iocs) + def __sort_iocs_by_type(self, parsed_data: dict) -> dict: + result = {} + for key, values in iocs_types_map.items(): + if not result.get(key): + result[key] = {} + for generic_field, iocs_list in parsed_data.items(): + if generic_field in values: + result[key][generic_field] = iocs_list + return result + + @handle_translation_exceptions + def __generate_translation( + self, parsed_data: dict, platform_data: CTIPlatform, iocs_per_query: int, **kwargs + ) -> list[str]: + render_cti = self.render_manager.get(platform_data.id) + + sorted_data = self.__sort_iocs_by_type(parsed_data) + chunked_iocs = {} + for key, chunk in sorted_data.items(): + if ioc_chuck := self.__get_iocs_chunk( + chunks_size=iocs_per_query, data=chunk, mapping=render_cti.default_mapping + ): + chunked_iocs[key] = ioc_chuck + return render_cti.generate(chunked_iocs, **kwargs) + def translate( self, text: str, @@ -70,6 +96,37 @@ def translate( ) return status, parsed_data + def generate( + self, + text: str, + title: str, + description: str, + references: list[str], + created_date: str, + mitre_tags: Optional[list[str]], + platform_data: CTIPlatform, + iocs_per_query: int = CTI_IOCS_PER_QUERY_LIMIT, + ) -> (bool, list[str]): + status, parsed_data = self.__parse_iocs_from_string( + text=text, + include_ioc_types=DefaultIOCType, + include_hash_types=DefaultHashType, + ioc_parsing_rules=DefaultIocParsingRule, + include_source_ip=True, + ) + if status: + kwargs = { + "title": title, + "description": description, + "references": references, + "created_date": created_date, + "mitre_tags": mitre_tags, + } + return self.__generate_translation( + parsed_data=parsed_data, platform_data=platform_data, iocs_per_query=iocs_per_query, **kwargs + ) + return status, parsed_data + @staticmethod def __get_iocs_chunk( chunks_size: int, data: dict[str, list[str]], mapping: dict[str, str] @@ -86,3 +143,6 @@ def __get_iocs_chunk( @classmethod def get_renders(cls) -> list: return cls.render_manager.get_platforms_details + + +cti_translator = CTITranslator() diff --git a/uncoder-core/app/translator/platforms/arcsight/renders/arcsight_cti.py b/uncoder-core/app/translator/platforms/arcsight/renders/arcsight_cti.py index 778ef04e..311e124b 100644 --- a/uncoder-core/app/translator/platforms/arcsight/renders/arcsight_cti.py +++ b/uncoder-core/app/translator/platforms/arcsight/renders/arcsight_cti.py @@ -1,15 +1,14 @@ from app.translator.core.models.platform_details import PlatformDetails from app.translator.core.render_cti import RenderCTI from app.translator.managers import render_cti_manager -from app.translator.platforms.arcsight.const import ARCSIGHT_QUERY_DETAILS -from app.translator.platforms.arcsight.mappings.arcsight_cti import DEFAULT_ARCSIGHT_MAPPING +from app.translator.platforms.arcsight.const import DEFAULT_ARCSIGHT_CTI_MAPPING, arcsight_query_details @render_cti_manager.register class ArcsightKeyword(RenderCTI): - details: PlatformDetails = PlatformDetails(**ARCSIGHT_QUERY_DETAILS) + details: PlatformDetails = arcsight_query_details - default_mapping = DEFAULT_ARCSIGHT_MAPPING + default_mapping = DEFAULT_ARCSIGHT_CTI_MAPPING field_value_template: str = "{key} = {value}" or_operator: str = " OR " group_or_operator: str = " OR " diff --git a/uncoder-core/app/translator/platforms/sigma/const.py b/uncoder-core/app/translator/platforms/sigma/const.py index aaedda41..87d06daa 100644 --- a/uncoder-core/app/translator/platforms/sigma/const.py +++ b/uncoder-core/app/translator/platforms/sigma/const.py @@ -8,4 +8,16 @@ "group_id": "sigma", } +DEFAULT_SIGMA_CTI_MAPPING = { + "SourceIP": "src-ip", + "DestinationIP": "dst-ip", + "Domain": "cs-host", + "URL": "c-uri", + "HashMd5": "Hashes", + "HashSha1": "Hashes", + "HashSha256": "Hashes", + "HashSha512": "Hashes", +} + + sigma_rule_details = PlatformDetails(**SIGMA_RULE_DETAILS) diff --git a/uncoder-core/app/translator/platforms/sigma/renders/sigma_cti.py b/uncoder-core/app/translator/platforms/sigma/renders/sigma_cti.py new file mode 100644 index 00000000..51c6ce20 --- /dev/null +++ b/uncoder-core/app/translator/platforms/sigma/renders/sigma_cti.py @@ -0,0 +1,69 @@ +import uuid +import yaml + +from app.translator.core.custom_types.meta_info import SeverityType +from app.translator.core.models.iocs import IocsChunkValue +from app.translator.core.models.platform_details import PlatformDetails +from app.translator.core.render_cti import RenderCTI +from app.translator.managers import render_cti_manager +from app.translator.platforms.sigma.const import sigma_rule_details, DEFAULT_SIGMA_CTI_MAPPING +from app.translator.tools.const import LOGSOURCE_MAP + + +@render_cti_manager.register +class SigmaRenderCTI(RenderCTI): + details: PlatformDetails = sigma_rule_details + default_mapping = DEFAULT_SIGMA_CTI_MAPPING + + def render(self, data: list[list[IocsChunkValue]]) -> list[str]: + final_result = [] + for iocs_chunk in data: + data_values = self.collect_sigma_data_values(iocs_chunk) + rule = { + "title": "Sigma automatically generated based on IOCs", + "id": uuid.uuid4().__str__(), + "description": "Detects suspicious activity based on IOCs.", + "status": "experimental", + "author": "SOC Prime", + "logsource": {"product": "windows"}, + "fields": list(data_values.keys()), + "detection": {"selection": data_values, "condition": "selection"}, + "level": SeverityType.low, + "falsepositives": "", + } + final_result.append(yaml.dump(rule, default_flow_style=False, sort_keys=False)) + return final_result + + def collect_sigma_data_values(self, chunk: list[IocsChunkValue]) -> dict: + raw_data_values = {} + for value in chunk: + if value.platform_field in raw_data_values.keys(): + raw_data_values[value.platform_field].append(value.value) + else: + raw_data_values[value.platform_field] = [value.value] + return raw_data_values + + def generate(self, data: dict[list[list[IocsChunkValue]]], **kwargs): + final_result = [] + for key, iocs_chunks in data.items(): + for iocs_chunk in iocs_chunks: + data_values = self.collect_sigma_data_values(iocs_chunk) + rule = { + "title": f"IOCs ({key}) to detect: {kwargs['title']}", + "id": uuid.uuid4().__str__(), + "description": kwargs["description"], + "status": "stable", + "author": "Uncoder Autogenerated", + "logsource": LOGSOURCE_MAP.get(key), + "fields": list(data_values.keys()), + "detection": {"selection": data_values, "condition": "selection"}, + "level": SeverityType.medium, + "falsepositives": "", + "references": kwargs["references"], + "date": kwargs["created_date"], + "modified": kwargs["created_date"], + } + if kwargs.get("mitre_tags"): + rule["tags"] = kwargs["mitre_tags"] + final_result.append(yaml.dump(rule, default_flow_style=False, sort_keys=False)) + return final_result \ No newline at end of file diff --git a/uncoder-core/app/translator/tools/const.py b/uncoder-core/app/translator/tools/const.py index caee962e..ab7bae78 100644 --- a/uncoder-core/app/translator/tools/const.py +++ b/uncoder-core/app/translator/tools/const.py @@ -1,15 +1,35 @@ -from typing import Literal +from typing import Literal, get_args -IP_IOC_REGEXP_PATTERN = r"(?:^|[ \/\[(\"',;>|])((?:25[0-5]|2[0-4]\d|[0-1]?\d{1,2})(?:\.(?:25[0-5]|2[0-4]\d|[0-1]?\d{1,2})){3})(?=[\s)\]\"',;:\/?\n<|]|$)" # noqa: E501 -DOMAIN_IOC_REGEXP_PATTERN = r"(?:^|[\s\/\[\]@(\"',;{>|])(?:(?:http[s]?|ftp):\/\/?)?([^:\\\/\s({\[\]@\"'`,]+\.[a-zA-Z]+)(?:(?:(?:[/|:]\w+)*\/)(?:[\w\-.]+[^#?\s]+)?(?:[\w/\-&?=%.#]+(?:\(\))?)?)?(?=[\s)\]\"',;<|]|$)" # noqa: E501 -URL_IOC_REGEXP_PATTERN = r"(?:^|[\s\/\[\]@(\"',;{>|])((?:(?:http[s]?|ftp):\/\/?)+(?:[^:\\\/\s({\[\]@\"'`,]+\.[a-zA-Z0-9]+)(?:(?:(?:[/|:]\w+)*\/)(?:[\w\-.]+[^#?\s<']+)?(?:[\w/\-&?=%.#]+(?:\(\))?)?)?)(?=[\s)\]\"',;<|]|$)" # noqa: E501 +IP_IOC_REGEXP_PATTERN = r"(?:^|[ \/\[(\"',;>|])((?:25[0-5]|2[0-4]\d|[0-1]?\d{1,2})(?:\[?\(?\{?\.\]?\)?\}?(?:25[0-5]|2[0-4]\d|[0-1]?\d{1,2})){3})(?=[\s)\]\"',;:\/?\n<|]|$)" # noqa: E501 +DOMAIN_IOC_REGEXP_PATTERN = r"((?:[A-Za-z0-9|-]+\[?\{?\(?\.\)?\}?\]?)*[A-Za-z0-9|-]+(?:\[|\{|\()\.(?:\)|\}|\])[A-Za-z|-]+)|((?:[A-Za-z0-9|-]+(?:\[|\{|\()\.(?:\)|\}|\]))*[A-Za-z0-9|-]+(?:\[|\{|\()\.(?:\)|\}|\])[A-Za-z|-]+)|(?:h[xX][xX]p[s]?|f[txX]p[s]?):\/\/([^\/\s,:;]+)+" # noqa: E501 +URL_IOC_REGEXP_PATTERN = r"(?:h[xX][xX]p[s]?|f[txX]p[s]?):\/\/[^\s,:;]+" IOCType = Literal["ip", "domain", "url", "hash"] HashType = Literal["md5", "sha1", "sha256", "sha512"] -IocParsingRule = Literal["replace_dots", "remove_private_and_reserved_ips", "replace_hxxp"] +IocParsingRule = Literal["remove_private_and_reserved_ips"] + +DefaultIOCType = list(get_args(IOCType)) +DefaultHashType = list(get_args(HashType)) +DefaultIocParsingRule = list(get_args(IocParsingRule)) HASH_MAP = {"md5": "HashMd5", "sha1": "HashSha1", "sha256": "HashSha256", "sha512": "HashSha512"} +iocs_types_map = { + "url": ["URL"], + "domain": ["Domain"], + "ip": ["DestinationIP", "SourceIP"], + "hash": ["HashMd5", "HashSha1", "HashSha256", "HashSha512"], +} + +LOGSOURCE_MAP = { + "hash": {"category": "file_event"}, + "domain": {"category": "proxy"}, + "url": {"category": "proxy"}, + "ip": {"category": "firewall"}, + "emails": {"category": "mail"}, + "files": {"category": "file_event"}, +} + hash_regexes = { "md5": r"(?:^|[\s\/\[(\"',;{>|])([A-Fa-f0-9]{32})(?=[\s)\]\"',;\n<|]|$)", "sha1": r"(?:^|[\s\/\[(\"',;{>|])([A-Fa-f0-9]{40})(?=[\s)\]\"',;\n<|]|$)",