From 0236ff11882fa4406626a956679137abe9278b6c Mon Sep 17 00:00:00 2001 From: Gesyk Nazar <77268518+nazargesyk@users.noreply.github.com> Date: Fri, 13 Dec 2024 13:50:09 +0200 Subject: [PATCH 01/10] gis-9379 add new endpoint /iocs/generate --- uncoder-core/app/routers/ioc_translate.py | 32 ++++++++- .../app/translator/core/mixins/tokens.py | 18 +++++ .../app/translator/core/render_cti.py | 5 ++ uncoder-core/app/translator/cti_translator.py | 60 ++++++++++++++++ .../arcsight/renders/arcsight_cti.py | 7 +- .../platforms/sigma/renders/sigma_cti.py | 69 +++++++++++++++++++ uncoder-core/app/translator/tools/const.py | 28 ++++++-- 7 files changed, 209 insertions(+), 10 deletions(-) create mode 100644 uncoder-core/app/translator/core/mixins/tokens.py create mode 100644 uncoder-core/app/translator/platforms/sigma/renders/sigma_cti.py diff --git a/uncoder-core/app/routers/ioc_translate.py b/uncoder-core/app/routers/ioc_translate.py index 7eb702ed..51ecc96d 100644 --- a/uncoder-core/app/routers/ioc_translate.py +++ b/uncoder-core/app/routers/ioc_translate.py @@ -4,11 +4,10 @@ from app.models.ioc_translation import CTIPlatform, OneTranslationCTIData from app.models.translation import InfoMessage -from app.translator.cti_translator import CTITranslator +from app.translator.cti_translator import cti_translator from app.translator.tools.const import HashType, IocParsingRule, IOCType iocs_router = APIRouter() -cti_translator = CTITranslator() @iocs_router.post("/iocs/translate", description="Parse IOCs from text.") @@ -46,3 +45,32 @@ def parse_and_translate_iocs( info_message = InfoMessage(message=translations, severity="error") return OneTranslationCTIData(info=info_message, status=status, target_platform_id=platform.id) + + +@iocs_router.post("/iocs/generate", description="Parse IOCs from text and based on input data generate translation") +@iocs_router.post("/iocs/generate", include_in_schema=False) +def parse_iocs_and_generate_rule( + text: str = Body(..., description="Text to parse IOCs from", embed=True), + platform: CTIPlatform = Body(..., description="Platform to parse IOCs to", embed=True), + iocs_per_query: int = Body(25, description="IOCs per query limit", embed=True), + title: str = Body(..., description="Title", embed=True), + description: str = Body(..., description="Description", embed=True), + references: list[str] = Body(..., description="References", embed=True), + created_date: str = Body(..., description="Rule created date", embed=True), + mitre_tags: Optional[list[str]] = Body(..., description="Mitra tactics and techniques", embed=True), +) -> OneTranslationCTIData: + status, translations = cti_translator.generate( + title=title, + text=text, + platform_data=platform, + description=description, + references=references, + created_date=created_date, + mitre_tags=mitre_tags, + iocs_per_query=iocs_per_query, + ) + if status: + return OneTranslationCTIData(status=status, translations=translations, target_platform_id=platform.id) + + info_message = InfoMessage(message=translations, severity="error") + return OneTranslationCTIData(info=info_message, status=status, target_platform_id=platform.id) diff --git a/uncoder-core/app/translator/core/mixins/tokens.py b/uncoder-core/app/translator/core/mixins/tokens.py new file mode 100644 index 00000000..2bba49c7 --- /dev/null +++ b/uncoder-core/app/translator/core/mixins/tokens.py @@ -0,0 +1,18 @@ +from app.translator.core.const import QUERY_TOKEN_TYPE +from app.translator.core.custom_types.tokens import LogicalOperatorType, OperatorType +from app.translator.core.mapping import SourceMapping +from app.translator.core.models.query_tokens.field_value import FieldValue +from app.translator.core.models.query_tokens.identifier import Identifier + + +class ExtraConditionMixin: + def generate_extra_conditions(self, source_mapping: SourceMapping) -> list[QUERY_TOKEN_TYPE]: + extra_tokens = [] + for field, value in source_mapping.conditions.items(): + extra_tokens.extend( + [ + FieldValue(source_name=field, operator=Identifier(token_type=OperatorType.EQ), value=value), + Identifier(token_type=LogicalOperatorType.AND), + ] + ) + return extra_tokens diff --git a/uncoder-core/app/translator/core/render_cti.py b/uncoder-core/app/translator/core/render_cti.py index 20bfb7bf..b0cf631e 100644 --- a/uncoder-core/app/translator/core/render_cti.py +++ b/uncoder-core/app/translator/core/render_cti.py @@ -16,6 +16,7 @@ limitations under the License. ----------------------------------------------------------------- """ +from abc import abstractmethod from app.translator.core.models.iocs import IocsChunkValue from app.translator.core.models.platform_details import PlatformDetails @@ -46,6 +47,10 @@ def render(self, data: list[list[IocsChunkValue]]) -> list[str]: final_result.append(self.final_result_for_one.format(result=data_values[0])) return final_result + @abstractmethod + def generate(self, data: dict[str, list[list[IocsChunkValue]]], **kwargs) -> list[str]: + raise NotImplementedError("Abstract method") + def collect_data_values(self, chunk: list[IocsChunkValue]) -> list[str]: data_values = [] key_chunk = [] diff --git a/uncoder-core/app/translator/cti_translator.py b/uncoder-core/app/translator/cti_translator.py index 79b25fc4..2d345b3e 100644 --- a/uncoder-core/app/translator/cti_translator.py +++ b/uncoder-core/app/translator/cti_translator.py @@ -6,6 +6,7 @@ from app.translator.core.models.iocs import IocsChunkValue from app.translator.core.parser_cti import CTIParser from app.translator.managers import RenderCTIManager, render_cti_manager +from app.translator.tools.const import DefaultHashType, DefaultIocParsingRule, DefaultIOCType, iocs_types_map from app.translator.tools.decorators import handle_translation_exceptions @@ -45,6 +46,31 @@ def __render_translation(self, parsed_data: dict, platform_data: CTIPlatform, io ) return render_cti.render(chunked_iocs) + def __sort_iocs_by_type(self, parsed_data: dict) -> dict: + result = {} + for key, values in iocs_types_map.items(): + if not result.get(key): + result[key] = {} + for generic_field, iocs_list in parsed_data.items(): + if generic_field in values: + result[key][generic_field] = iocs_list + return result + + @handle_translation_exceptions + def __generate_translation( + self, parsed_data: dict, platform_data: CTIPlatform, iocs_per_query: int, **kwargs + ) -> list[str]: + render_cti = self.render_manager.get(platform_data.id) + + sorted_data = self.__sort_iocs_by_type(parsed_data) + chunked_iocs = {} + for key, chunk in sorted_data.items(): + if ioc_chuck := self.__get_iocs_chunk( + chunks_size=iocs_per_query, data=chunk, mapping=render_cti.default_mapping + ): + chunked_iocs[key] = ioc_chuck + return render_cti.generate(chunked_iocs, **kwargs) + def translate( self, text: str, @@ -70,6 +96,37 @@ def translate( ) return status, parsed_data + def generate( + self, + text: str, + title: str, + description: str, + references: list[str], + created_date: str, + mitre_tags: Optional[list[str]], + platform_data: CTIPlatform, + iocs_per_query: int = CTI_IOCS_PER_QUERY_LIMIT, + ) -> (bool, list[str]): + status, parsed_data = self.__parse_iocs_from_string( + text=text, + include_ioc_types=DefaultIOCType, + include_hash_types=DefaultHashType, + ioc_parsing_rules=DefaultIocParsingRule, + include_source_ip=True, + ) + if status: + kwargs = { + "title": title, + "description": description, + "references": references, + "created_date": created_date, + "mitre_tags": mitre_tags, + } + return self.__generate_translation( + parsed_data=parsed_data, platform_data=platform_data, iocs_per_query=iocs_per_query, **kwargs + ) + return status, parsed_data + @staticmethod def __get_iocs_chunk( chunks_size: int, data: dict[str, list[str]], mapping: dict[str, str] @@ -86,3 +143,6 @@ def __get_iocs_chunk( @classmethod def get_renders(cls) -> list: return cls.render_manager.get_platforms_details + + +cti_translator = CTITranslator() diff --git a/uncoder-core/app/translator/platforms/arcsight/renders/arcsight_cti.py b/uncoder-core/app/translator/platforms/arcsight/renders/arcsight_cti.py index 778ef04e..311e124b 100644 --- a/uncoder-core/app/translator/platforms/arcsight/renders/arcsight_cti.py +++ b/uncoder-core/app/translator/platforms/arcsight/renders/arcsight_cti.py @@ -1,15 +1,14 @@ from app.translator.core.models.platform_details import PlatformDetails from app.translator.core.render_cti import RenderCTI from app.translator.managers import render_cti_manager -from app.translator.platforms.arcsight.const import ARCSIGHT_QUERY_DETAILS -from app.translator.platforms.arcsight.mappings.arcsight_cti import DEFAULT_ARCSIGHT_MAPPING +from app.translator.platforms.arcsight.const import DEFAULT_ARCSIGHT_CTI_MAPPING, arcsight_query_details @render_cti_manager.register class ArcsightKeyword(RenderCTI): - details: PlatformDetails = PlatformDetails(**ARCSIGHT_QUERY_DETAILS) + details: PlatformDetails = arcsight_query_details - default_mapping = DEFAULT_ARCSIGHT_MAPPING + default_mapping = DEFAULT_ARCSIGHT_CTI_MAPPING field_value_template: str = "{key} = {value}" or_operator: str = " OR " group_or_operator: str = " OR " diff --git a/uncoder-core/app/translator/platforms/sigma/renders/sigma_cti.py b/uncoder-core/app/translator/platforms/sigma/renders/sigma_cti.py new file mode 100644 index 00000000..a1f2829d --- /dev/null +++ b/uncoder-core/app/translator/platforms/sigma/renders/sigma_cti.py @@ -0,0 +1,69 @@ +import uuid +import yaml + +from app.translator.core.custom_types.meta_info import SeverityType +from app.translator.core.models.iocs import IocsChunkValue +from app.translator.core.models.platform_details import PlatformDetails +from app.translator.core.render_cti import RenderCTI +from app.translator.managers import render_cti_manager +from app.translator.platforms.sigma.const import sigma_rule_details, DEFAULT_SIGMA_CTI_MAPPING +from app.translator.tools.const import LOGSOURCE_MAP + + +@render_cti_manager.register +class SigmaRenderCTI(RenderCTI): + details: PlatformDetails = sigma_rule_details + default_mapping = DEFAULT_SIGMA_CTI_MAPPING + + def render(self, data: list[list[IocsChunkValue]]) -> list[str]: + final_result = [] + for iocs_chunk in data: + data_values = self.collect_sigma_data_values(iocs_chunk) + rule = { + "title": "Sigma automatically generated based on IOCs", + "id": uuid.uuid4().__str__(), + "description": "Detects suspicious activity based on IOCs.", + "status": "experimental", + "author": "SOC Prime", + "logsource": {"product": "windows"}, + "fields": list(data_values.keys()), + "detection": {"selection": data_values, "condition": "selection"}, + "level": SeverityType.low, + "falsepositives": "", + } + final_result.append(yaml.dump(rule, default_flow_style=False, sort_keys=False)) + return final_result + + def collect_sigma_data_values(self, chunk: list[IocsChunkValue]) -> dict: + raw_data_values = {} + for value in chunk: + if value.platform_field in raw_data_values.keys(): + raw_data_values[value.platform_field].append(value.value) + else: + raw_data_values[value.platform_field] = [value.value] + return raw_data_values + + def generate(self, data: dict[list[list[IocsChunkValue]]], **kwargs): + final_result = [] + for key, iocs_chunks in data.items(): + for iocs_chunk in iocs_chunks: + data_values = self.collect_sigma_data_values(iocs_chunk) + rule = { + "title": f"IOCs ({key}) to detect: {kwargs['title']}", + "id": uuid.uuid4().__str__(), + "description": kwargs["description"], + "status": "stable", + "author": "SOC Prime Team", + "logsource": LOGSOURCE_MAP.get(key), + "fields": list(data_values.keys()), + "detection": {"selection": data_values, "condition": "selection"}, + "level": SeverityType.medium, + "falsepositives": "", + "references": kwargs["references"], + "date": kwargs["created_date"], + "modified": kwargs["created_date"], + } + if kwargs.get("mitre_tags"): + rule["tags"] = kwargs["mitre_tags"] + final_result.append(yaml.dump(rule, default_flow_style=False, sort_keys=False)) + return final_result \ No newline at end of file diff --git a/uncoder-core/app/translator/tools/const.py b/uncoder-core/app/translator/tools/const.py index caee962e..14aafe51 100644 --- a/uncoder-core/app/translator/tools/const.py +++ b/uncoder-core/app/translator/tools/const.py @@ -1,15 +1,35 @@ -from typing import Literal +import typing IP_IOC_REGEXP_PATTERN = r"(?:^|[ \/\[(\"',;>|])((?:25[0-5]|2[0-4]\d|[0-1]?\d{1,2})(?:\.(?:25[0-5]|2[0-4]\d|[0-1]?\d{1,2})){3})(?=[\s)\]\"',;:\/?\n<|]|$)" # noqa: E501 DOMAIN_IOC_REGEXP_PATTERN = r"(?:^|[\s\/\[\]@(\"',;{>|])(?:(?:http[s]?|ftp):\/\/?)?([^:\\\/\s({\[\]@\"'`,]+\.[a-zA-Z]+)(?:(?:(?:[/|:]\w+)*\/)(?:[\w\-.]+[^#?\s]+)?(?:[\w/\-&?=%.#]+(?:\(\))?)?)?(?=[\s)\]\"',;<|]|$)" # noqa: E501 URL_IOC_REGEXP_PATTERN = r"(?:^|[\s\/\[\]@(\"',;{>|])((?:(?:http[s]?|ftp):\/\/?)+(?:[^:\\\/\s({\[\]@\"'`,]+\.[a-zA-Z0-9]+)(?:(?:(?:[/|:]\w+)*\/)(?:[\w\-.]+[^#?\s<']+)?(?:[\w/\-&?=%.#]+(?:\(\))?)?)?)(?=[\s)\]\"',;<|]|$)" # noqa: E501 -IOCType = Literal["ip", "domain", "url", "hash"] -HashType = Literal["md5", "sha1", "sha256", "sha512"] -IocParsingRule = Literal["replace_dots", "remove_private_and_reserved_ips", "replace_hxxp"] +IOCType = typing.Literal["ip", "domain", "url", "hash"] +HashType = typing.Literal["md5", "sha1", "sha256", "sha512"] +IocParsingRule = typing.Literal["replace_dots", "remove_private_and_reserved_ips", "replace_hxxp"] + +DefaultIOCType = list(typing.get_args(IOCType)) +DefaultHashType = list(typing.get_args(HashType)) +DefaultIocParsingRule = list(typing.get_args(IocParsingRule)) HASH_MAP = {"md5": "HashMd5", "sha1": "HashSha1", "sha256": "HashSha256", "sha512": "HashSha512"} +iocs_types_map = { + "url": ["URL"], + "domain": ["Domain"], + "ip": ["DestinationIP", "SourceIP"], + "hash": ["HashMd5", "HashSha1", "HashSha256", "HashSha512"], +} + +LOGSOURCE_MAP = { + "hash": {"category": "process_creation"}, + "domain": {"category": "proxy"}, + "url": {"category": "proxy"}, + "ip": {"category": "proxy"}, + "emails": {"category": "mail"}, + "files": {"category": "file_event"}, +} + hash_regexes = { "md5": r"(?:^|[\s\/\[(\"',;{>|])([A-Fa-f0-9]{32})(?=[\s)\]\"',;\n<|]|$)", "sha1": r"(?:^|[\s\/\[(\"',;{>|])([A-Fa-f0-9]{40})(?=[\s)\]\"',;\n<|]|$)", From ea3262cd0c83b772ae21d7fc86b8319e201b2c16 Mon Sep 17 00:00:00 2001 From: Gesyk Nazar <77268518+nazargesyk@users.noreply.github.com> Date: Fri, 13 Dec 2024 13:51:40 +0200 Subject: [PATCH 02/10] gis-9379 add new endpoint /iocs/generate --- uncoder-core/app/translator/tools/const.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/uncoder-core/app/translator/tools/const.py b/uncoder-core/app/translator/tools/const.py index 14aafe51..3097ab06 100644 --- a/uncoder-core/app/translator/tools/const.py +++ b/uncoder-core/app/translator/tools/const.py @@ -1,16 +1,16 @@ -import typing +from typing import get_args, Literal IP_IOC_REGEXP_PATTERN = r"(?:^|[ \/\[(\"',;>|])((?:25[0-5]|2[0-4]\d|[0-1]?\d{1,2})(?:\.(?:25[0-5]|2[0-4]\d|[0-1]?\d{1,2})){3})(?=[\s)\]\"',;:\/?\n<|]|$)" # noqa: E501 DOMAIN_IOC_REGEXP_PATTERN = r"(?:^|[\s\/\[\]@(\"',;{>|])(?:(?:http[s]?|ftp):\/\/?)?([^:\\\/\s({\[\]@\"'`,]+\.[a-zA-Z]+)(?:(?:(?:[/|:]\w+)*\/)(?:[\w\-.]+[^#?\s]+)?(?:[\w/\-&?=%.#]+(?:\(\))?)?)?(?=[\s)\]\"',;<|]|$)" # noqa: E501 URL_IOC_REGEXP_PATTERN = r"(?:^|[\s\/\[\]@(\"',;{>|])((?:(?:http[s]?|ftp):\/\/?)+(?:[^:\\\/\s({\[\]@\"'`,]+\.[a-zA-Z0-9]+)(?:(?:(?:[/|:]\w+)*\/)(?:[\w\-.]+[^#?\s<']+)?(?:[\w/\-&?=%.#]+(?:\(\))?)?)?)(?=[\s)\]\"',;<|]|$)" # noqa: E501 -IOCType = typing.Literal["ip", "domain", "url", "hash"] -HashType = typing.Literal["md5", "sha1", "sha256", "sha512"] -IocParsingRule = typing.Literal["replace_dots", "remove_private_and_reserved_ips", "replace_hxxp"] +IOCType = Literal["ip", "domain", "url", "hash"] +HashType = Literal["md5", "sha1", "sha256", "sha512"] +IocParsingRule = Literal["replace_dots", "remove_private_and_reserved_ips", "replace_hxxp"] -DefaultIOCType = list(typing.get_args(IOCType)) -DefaultHashType = list(typing.get_args(HashType)) -DefaultIocParsingRule = list(typing.get_args(IocParsingRule)) +DefaultIOCType = list(get_args(IOCType)) +DefaultHashType = list(get_args(HashType)) +DefaultIocParsingRule = list(get_args(IocParsingRule)) HASH_MAP = {"md5": "HashMd5", "sha1": "HashSha1", "sha256": "HashSha256", "sha512": "HashSha512"} From 58aa2543309d2e0defe0707307c523cbd621771a Mon Sep 17 00:00:00 2001 From: Gesyk Nazar <77268518+nazargesyk@users.noreply.github.com> Date: Fri, 10 Jan 2025 10:10:22 +0200 Subject: [PATCH 03/10] gis-9379 change sigma cti mapping --- uncoder-core/app/translator/platforms/sigma/const.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/uncoder-core/app/translator/platforms/sigma/const.py b/uncoder-core/app/translator/platforms/sigma/const.py index aaedda41..5687babc 100644 --- a/uncoder-core/app/translator/platforms/sigma/const.py +++ b/uncoder-core/app/translator/platforms/sigma/const.py @@ -8,4 +8,16 @@ "group_id": "sigma", } +DEFAULT_SIGMA_CTI_MAPPING = { + "SourceIP": "SourceIP", + "DestinationIP": "DestinationIP", + "Domain": "Domain", + "URL": "URL", + "HashMd5": "HashMd5", + "HashSha1": "HashSha1", + "HashSha256": "HashSha256", + "HashSha512": "HashSha512", +} + + sigma_rule_details = PlatformDetails(**SIGMA_RULE_DETAILS) From 1426c79ad63ee957a5f8c5442c88dccb27a32b94 Mon Sep 17 00:00:00 2001 From: Gesyk Nazar <77268518+nazargesyk@users.noreply.github.com> Date: Mon, 20 Jan 2025 13:25:37 +0200 Subject: [PATCH 04/10] gis-9379 fixes --- .../app/translator/platforms/sigma/const.py | 16 ++++++++-------- .../platforms/sigma/renders/sigma_cti.py | 2 +- uncoder-core/app/translator/tools/const.py | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/uncoder-core/app/translator/platforms/sigma/const.py b/uncoder-core/app/translator/platforms/sigma/const.py index 5687babc..87d06daa 100644 --- a/uncoder-core/app/translator/platforms/sigma/const.py +++ b/uncoder-core/app/translator/platforms/sigma/const.py @@ -9,14 +9,14 @@ } DEFAULT_SIGMA_CTI_MAPPING = { - "SourceIP": "SourceIP", - "DestinationIP": "DestinationIP", - "Domain": "Domain", - "URL": "URL", - "HashMd5": "HashMd5", - "HashSha1": "HashSha1", - "HashSha256": "HashSha256", - "HashSha512": "HashSha512", + "SourceIP": "src-ip", + "DestinationIP": "dst-ip", + "Domain": "cs-host", + "URL": "c-uri", + "HashMd5": "Hashes", + "HashSha1": "Hashes", + "HashSha256": "Hashes", + "HashSha512": "Hashes", } diff --git a/uncoder-core/app/translator/platforms/sigma/renders/sigma_cti.py b/uncoder-core/app/translator/platforms/sigma/renders/sigma_cti.py index a1f2829d..51c6ce20 100644 --- a/uncoder-core/app/translator/platforms/sigma/renders/sigma_cti.py +++ b/uncoder-core/app/translator/platforms/sigma/renders/sigma_cti.py @@ -53,7 +53,7 @@ def generate(self, data: dict[list[list[IocsChunkValue]]], **kwargs): "id": uuid.uuid4().__str__(), "description": kwargs["description"], "status": "stable", - "author": "SOC Prime Team", + "author": "Uncoder Autogenerated", "logsource": LOGSOURCE_MAP.get(key), "fields": list(data_values.keys()), "detection": {"selection": data_values, "condition": "selection"}, diff --git a/uncoder-core/app/translator/tools/const.py b/uncoder-core/app/translator/tools/const.py index 3097ab06..247af3a5 100644 --- a/uncoder-core/app/translator/tools/const.py +++ b/uncoder-core/app/translator/tools/const.py @@ -8,7 +8,7 @@ HashType = Literal["md5", "sha1", "sha256", "sha512"] IocParsingRule = Literal["replace_dots", "remove_private_and_reserved_ips", "replace_hxxp"] -DefaultIOCType = list(get_args(IOCType)) +DefaultIOCType = list(get_args(Literal["ip", "url", "hash"])) DefaultHashType = list(get_args(HashType)) DefaultIocParsingRule = list(get_args(IocParsingRule)) From 7b7c354a73df7f77e26286cb1fc6280edadc82e0 Mon Sep 17 00:00:00 2001 From: Gesyk Nazar <77268518+nazargesyk@users.noreply.github.com> Date: Mon, 20 Jan 2025 14:19:09 +0200 Subject: [PATCH 05/10] gis-9379 fixes --- uncoder-core/app/translator/core/parser_cti.py | 2 +- uncoder-core/app/translator/tools/const.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/uncoder-core/app/translator/core/parser_cti.py b/uncoder-core/app/translator/core/parser_cti.py index 05ca14fa..107946e4 100644 --- a/uncoder-core/app/translator/core/parser_cti.py +++ b/uncoder-core/app/translator/core/parser_cti.py @@ -58,7 +58,7 @@ def get_iocs_from_string( if not include_ioc_types or "domain" in include_ioc_types: iocs.domain.extend(self._find_all_str_by_regex(string, DOMAIN_IOC_REGEXP_PATTERN)) if not include_ioc_types or "url" in include_ioc_types: - iocs.url.extend(self._find_all_str_by_regex(string, URL_IOC_REGEXP_PATTERN)) + iocs.url.extend([url.rstrip(".") for url in self._find_all_str_by_regex(string, URL_IOC_REGEXP_PATTERN)]) if not include_ioc_types or "hash" in include_ioc_types: if not include_hash_types: include_hash_types = list(hash_regexes.keys()) diff --git a/uncoder-core/app/translator/tools/const.py b/uncoder-core/app/translator/tools/const.py index 247af3a5..142acacf 100644 --- a/uncoder-core/app/translator/tools/const.py +++ b/uncoder-core/app/translator/tools/const.py @@ -1,8 +1,8 @@ from typing import get_args, Literal -IP_IOC_REGEXP_PATTERN = r"(?:^|[ \/\[(\"',;>|])((?:25[0-5]|2[0-4]\d|[0-1]?\d{1,2})(?:\.(?:25[0-5]|2[0-4]\d|[0-1]?\d{1,2})){3})(?=[\s)\]\"',;:\/?\n<|]|$)" # noqa: E501 +IP_IOC_REGEXP_PATTERN = r"(?:^|[ \/\[(\"',;>|])((?:25[0-5]|2[0-4]\d|[0-1]?\d{1,2})(?:\[?\(?\{?\.\]?\)?\}?(?:25[0-5]|2[0-4]\d|[0-1]?\d{1,2})){3})(?=[\s)\]\"',;:\/?\n<|]|$)" # noqa: E501 DOMAIN_IOC_REGEXP_PATTERN = r"(?:^|[\s\/\[\]@(\"',;{>|])(?:(?:http[s]?|ftp):\/\/?)?([^:\\\/\s({\[\]@\"'`,]+\.[a-zA-Z]+)(?:(?:(?:[/|:]\w+)*\/)(?:[\w\-.]+[^#?\s]+)?(?:[\w/\-&?=%.#]+(?:\(\))?)?)?(?=[\s)\]\"',;<|]|$)" # noqa: E501 -URL_IOC_REGEXP_PATTERN = r"(?:^|[\s\/\[\]@(\"',;{>|])((?:(?:http[s]?|ftp):\/\/?)+(?:[^:\\\/\s({\[\]@\"'`,]+\.[a-zA-Z0-9]+)(?:(?:(?:[/|:]\w+)*\/)(?:[\w\-.]+[^#?\s<']+)?(?:[\w/\-&?=%.#]+(?:\(\))?)?)?)(?=[\s)\]\"',;<|]|$)" # noqa: E501 +URL_IOC_REGEXP_PATTERN = r"(?:h[xX][xX]p[s]?|f[txX]p[s]?):\/\/[^\s,:;]+" # noqa: E501 IOCType = Literal["ip", "domain", "url", "hash"] HashType = Literal["md5", "sha1", "sha256", "sha512"] From 1dc67450cf3c9774be1dedd2a6be3b5c0754e2d5 Mon Sep 17 00:00:00 2001 From: Gesyk Nazar <77268518+nazargesyk@users.noreply.github.com> Date: Mon, 20 Jan 2025 16:21:24 +0200 Subject: [PATCH 06/10] gis-9379 unlock domains --- uncoder-core/app/translator/tools/const.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/uncoder-core/app/translator/tools/const.py b/uncoder-core/app/translator/tools/const.py index 142acacf..144e4deb 100644 --- a/uncoder-core/app/translator/tools/const.py +++ b/uncoder-core/app/translator/tools/const.py @@ -1,14 +1,14 @@ -from typing import get_args, Literal +from typing import Literal, get_args IP_IOC_REGEXP_PATTERN = r"(?:^|[ \/\[(\"',;>|])((?:25[0-5]|2[0-4]\d|[0-1]?\d{1,2})(?:\[?\(?\{?\.\]?\)?\}?(?:25[0-5]|2[0-4]\d|[0-1]?\d{1,2})){3})(?=[\s)\]\"',;:\/?\n<|]|$)" # noqa: E501 -DOMAIN_IOC_REGEXP_PATTERN = r"(?:^|[\s\/\[\]@(\"',;{>|])(?:(?:http[s]?|ftp):\/\/?)?([^:\\\/\s({\[\]@\"'`,]+\.[a-zA-Z]+)(?:(?:(?:[/|:]\w+)*\/)(?:[\w\-.]+[^#?\s]+)?(?:[\w/\-&?=%.#]+(?:\(\))?)?)?(?=[\s)\]\"',;<|]|$)" # noqa: E501 -URL_IOC_REGEXP_PATTERN = r"(?:h[xX][xX]p[s]?|f[txX]p[s]?):\/\/[^\s,:;]+" # noqa: E501 +DOMAIN_IOC_REGEXP_PATTERN = r"((?:[A-Za-z0-9|-]+\[?\{?\(?\.\)?\}?\]?)*[A-Za-z0-9|-]+(?:\[|\{|\()\.(?:\)|\}|\])[A-Za-z|-]+)|((?:[A-Za-z0-9|-]+(?:\[|\{|\()\.(?:\)|\}|\]))*[A-Za-z0-9|-]+(?:\[|\{|\()\.(?:\)|\}|\])[A-Za-z|-]+)|(?:h[xX][xX]p[s]?|f[txX]p[s]?):\/\/([^\/\s,:;]+)+" # noqa: E501 +URL_IOC_REGEXP_PATTERN = r"(?:h[xX][xX]p[s]?|f[txX]p[s]?):\/\/[^\s,:;]+" IOCType = Literal["ip", "domain", "url", "hash"] HashType = Literal["md5", "sha1", "sha256", "sha512"] IocParsingRule = Literal["replace_dots", "remove_private_and_reserved_ips", "replace_hxxp"] -DefaultIOCType = list(get_args(Literal["ip", "url", "hash"])) +DefaultIOCType = list(get_args(IOCType)) DefaultHashType = list(get_args(HashType)) DefaultIocParsingRule = list(get_args(IocParsingRule)) From e078abf2f7fca143417c9fbca462c8a6be3a7e02 Mon Sep 17 00:00:00 2001 From: Gesyk Nazar <77268518+nazargesyk@users.noreply.github.com> Date: Mon, 20 Jan 2025 17:52:49 +0200 Subject: [PATCH 07/10] gis-9379 fix --- uncoder-core/app/translator/tools/const.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uncoder-core/app/translator/tools/const.py b/uncoder-core/app/translator/tools/const.py index 144e4deb..d6bee8c3 100644 --- a/uncoder-core/app/translator/tools/const.py +++ b/uncoder-core/app/translator/tools/const.py @@ -10,7 +10,7 @@ DefaultIOCType = list(get_args(IOCType)) DefaultHashType = list(get_args(HashType)) -DefaultIocParsingRule = list(get_args(IocParsingRule)) +DefaultIocParsingRule = list(get_args(Literal["remove_private_and_reserved_ips"])) HASH_MAP = {"md5": "HashMd5", "sha1": "HashSha1", "sha256": "HashSha256", "sha512": "HashSha512"} From 16138fee0dc40db7d4718ee387417d7ff59d8893 Mon Sep 17 00:00:00 2001 From: Gesyk Nazar <77268518+nazargesyk@users.noreply.github.com> Date: Mon, 20 Jan 2025 18:51:44 +0200 Subject: [PATCH 08/10] gis-9379 fix --- uncoder-core/app/translator/core/parser_cti.py | 15 +++++++++++---- uncoder-core/app/translator/tools/const.py | 8 ++++---- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/uncoder-core/app/translator/core/parser_cti.py b/uncoder-core/app/translator/core/parser_cti.py index 107946e4..e0059aae 100644 --- a/uncoder-core/app/translator/core/parser_cti.py +++ b/uncoder-core/app/translator/core/parser_cti.py @@ -52,13 +52,20 @@ def get_iocs_from_string( include_source_ip: Optional[bool] = False, ) -> dict: iocs = Iocs() - string = self.replace_dots_hxxp(string, ioc_parsing_rules) if not include_ioc_types or "ip" in include_ioc_types: iocs.ip.extend(self._find_all_str_by_regex(string, IP_IOC_REGEXP_PATTERN)) if not include_ioc_types or "domain" in include_ioc_types: - iocs.domain.extend(self._find_all_str_by_regex(string, DOMAIN_IOC_REGEXP_PATTERN)) + for domain in self._find_all_str_by_regex(string, DOMAIN_IOC_REGEXP_PATTERN): + for domain_val in domain: + if domain_val: + iocs.domain.extend(self.replace_dots_hxxp(domain_val)) if not include_ioc_types or "url" in include_ioc_types: - iocs.url.extend([url.rstrip(".") for url in self._find_all_str_by_regex(string, URL_IOC_REGEXP_PATTERN)]) + iocs.url.extend( + [ + self.replace_dots_hxxp(url).rstrip(".") + for url in self._find_all_str_by_regex(string, URL_IOC_REGEXP_PATTERN) + ] + ) if not include_ioc_types or "hash" in include_ioc_types: if not include_hash_types: include_hash_types = list(hash_regexes.keys()) @@ -74,7 +81,7 @@ def get_iocs_from_string( raise IocsLimitExceededException(f"IOCs count {total_count} exceeds limit {limit}.") return iocs.return_iocs(include_source_ip) - def replace_dots_hxxp(self, string: str, ioc_parsing_rules: Optional[list[IocParsingRule]]) -> str: + def replace_dots_hxxp(self, string: str, ioc_parsing_rules: Optional[list[IocParsingRule]] = None) -> str: if ioc_parsing_rules is None or "replace_dots" in ioc_parsing_rules: string = self._replace_dots(string) if ioc_parsing_rules is None or "replace_hxxp" in ioc_parsing_rules: diff --git a/uncoder-core/app/translator/tools/const.py b/uncoder-core/app/translator/tools/const.py index d6bee8c3..ab7bae78 100644 --- a/uncoder-core/app/translator/tools/const.py +++ b/uncoder-core/app/translator/tools/const.py @@ -6,11 +6,11 @@ IOCType = Literal["ip", "domain", "url", "hash"] HashType = Literal["md5", "sha1", "sha256", "sha512"] -IocParsingRule = Literal["replace_dots", "remove_private_and_reserved_ips", "replace_hxxp"] +IocParsingRule = Literal["remove_private_and_reserved_ips"] DefaultIOCType = list(get_args(IOCType)) DefaultHashType = list(get_args(HashType)) -DefaultIocParsingRule = list(get_args(Literal["remove_private_and_reserved_ips"])) +DefaultIocParsingRule = list(get_args(IocParsingRule)) HASH_MAP = {"md5": "HashMd5", "sha1": "HashSha1", "sha256": "HashSha256", "sha512": "HashSha512"} @@ -22,10 +22,10 @@ } LOGSOURCE_MAP = { - "hash": {"category": "process_creation"}, + "hash": {"category": "file_event"}, "domain": {"category": "proxy"}, "url": {"category": "proxy"}, - "ip": {"category": "proxy"}, + "ip": {"category": "firewall"}, "emails": {"category": "mail"}, "files": {"category": "file_event"}, } From 5818932725ad6b0ed5b85d1faddb5535f0d24ebf Mon Sep 17 00:00:00 2001 From: Gesyk Nazar <77268518+nazargesyk@users.noreply.github.com> Date: Tue, 21 Jan 2025 09:41:54 +0200 Subject: [PATCH 09/10] fix --- uncoder-core/app/translator/core/parser_cti.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/uncoder-core/app/translator/core/parser_cti.py b/uncoder-core/app/translator/core/parser_cti.py index e0059aae..c402789c 100644 --- a/uncoder-core/app/translator/core/parser_cti.py +++ b/uncoder-core/app/translator/core/parser_cti.py @@ -53,12 +53,13 @@ def get_iocs_from_string( ) -> dict: iocs = Iocs() if not include_ioc_types or "ip" in include_ioc_types: - iocs.ip.extend(self._find_all_str_by_regex(string, IP_IOC_REGEXP_PATTERN)) + for ip in self._find_all_str_by_regex(string, IP_IOC_REGEXP_PATTERN): + iocs.ip.append(self.replace_dots_hxxp(ip)) if not include_ioc_types or "domain" in include_ioc_types: for domain in self._find_all_str_by_regex(string, DOMAIN_IOC_REGEXP_PATTERN): for domain_val in domain: if domain_val: - iocs.domain.extend(self.replace_dots_hxxp(domain_val)) + iocs.domain.append(self.replace_dots_hxxp(domain_val)) if not include_ioc_types or "url" in include_ioc_types: iocs.url.extend( [ From 7bcae2ec556b5588fd9c43d87eed357885656583 Mon Sep 17 00:00:00 2001 From: Gesyk Nazar <77268518+nazargesyk@users.noreply.github.com> Date: Tue, 21 Jan 2025 09:55:26 +0200 Subject: [PATCH 10/10] fix --- uncoder-core/app/translator/core/parser_cti.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/uncoder-core/app/translator/core/parser_cti.py b/uncoder-core/app/translator/core/parser_cti.py index c402789c..41df2773 100644 --- a/uncoder-core/app/translator/core/parser_cti.py +++ b/uncoder-core/app/translator/core/parser_cti.py @@ -61,12 +61,8 @@ def get_iocs_from_string( if domain_val: iocs.domain.append(self.replace_dots_hxxp(domain_val)) if not include_ioc_types or "url" in include_ioc_types: - iocs.url.extend( - [ - self.replace_dots_hxxp(url).rstrip(".") - for url in self._find_all_str_by_regex(string, URL_IOC_REGEXP_PATTERN) - ] - ) + for url in self._find_all_str_by_regex(string, URL_IOC_REGEXP_PATTERN): + iocs.url.append(self.replace_dots_hxxp(url).rstrip(".")) if not include_ioc_types or "hash" in include_ioc_types: if not include_hash_types: include_hash_types = list(hash_regexes.keys())