diff --git a/tests/integration/test_windows_fake_fs_token.py b/tests/integration/test_windows_fake_fs_token.py new file mode 100644 index 000000000..c7955e673 --- /dev/null +++ b/tests/integration/test_windows_fake_fs_token.py @@ -0,0 +1,93 @@ +import random +import pytest + +from canarytokens.models import ( + WindowsFakeFSTokenHistory, + WindowsFakeFSTokenRequest, + WindowsFakeFSTokenResponse, +) +from canarytokens.webhook_formatting import TokenAlertDetailGeneric + +from tests.utils import ( + clear_stats_on_webhook, + trigger_windows_fake_fs_token, + create_token, + get_stats_from_webhook, + get_token_history, + run_or_skip, + v3, +) + + +@pytest.mark.parametrize("version", [v3]) +@pytest.mark.parametrize( + "file_name, process_name", + [ + ("doc b.docx", "explorer.exe"), + ("doc b.doc", "explorer.exe"), + ], +) +def test_windows_fake_fs_token_fires( + file_name: str, + process_name: str, + webhook_receiver, + version, + runv2, + runv3, +): + """ + Tests the Windows Fake FS token. + """ + run_or_skip(version, runv2=runv2, runv3=runv3) + expected_hits = 1 + + # Create a Windows Fake File System token request + memo = "Test stuff break stuff test stuff sometimes build stuff" + root_dir = r"C:\Secrets" + file_structure = "TESTFS" + + token_request = WindowsFakeFSTokenRequest( + webhook_url=webhook_receiver, + memo=memo, + windows_fake_fs_root=root_dir, + windows_fake_fs_file_structure=file_structure, + ) + resp = create_token(token_request, version=version) + + # Check dns token has correct attributes + token_info = WindowsFakeFSTokenResponse(**resp) + assert token_info.token in token_info.hostname.split(".") + + clear_stats_on_webhook(webhook_receiver, token=token_info.token) + # Trigger CMD token twice, make sure the invocation ID limits it to one hit + invocation_id = random.randint(1000, 10000) + _ = trigger_windows_fake_fs_token( + token_info=token_info, + version=version, + invocation_id=invocation_id, + file_name=file_name, + process_name=process_name, + ) + _ = trigger_windows_fake_fs_token( + token_info=token_info, + version=version, + invocation_id=invocation_id, + file_name=file_name, + process_name=process_name, + ) + + stats = get_stats_from_webhook(webhook_receiver, token=token_info.token) + if stats is not None: + # Check that what was sent to the webhook is consistent. + assert len(stats) == expected_hits + assert stats[0]["memo"] == memo + _ = TokenAlertDetailGeneric(**stats[0]) + + # Check that the returned history has a single hit. + resp = get_token_history(token_info=token_info, version=version) + + token_history = WindowsFakeFSTokenHistory(**resp) + # TODO: what other fields do we want to assert on. + # note: making them TokenHistory have stronger validators is + # the better option. + assert len(token_history.hits) == expected_hits diff --git a/tests/units/test_tokens.py b/tests/units/test_tokens.py index 12972c1f5..ec5f8f86b 100644 --- a/tests/units/test_tokens.py +++ b/tests/units/test_tokens.py @@ -1,6 +1,8 @@ from typing import Optional import pytest +import random +import base64 from canarytokens import tokens as t from canarytokens.models import TokenTypes @@ -125,6 +127,26 @@ def test_windows_fake_fs_pattern(query, invocation_id, file_name, process_name): assert data["src_data"]["windows_fake_fs_process_name"] == process_name.lower() +def test_windows_fake_fs_base32_padding(): + invocation_id = f"{random.randint(1000, 10000)}" + for counter in range(1, 20): + test_data = "a" * counter + base32_data = base64.b32encode(test_data.encode("utf-8")).decode() + base32_data_no_padding = base32_data.replace("=", "") + query = "u{invocation_id}.f{file_name}.i{process_name}.{domain}".format( + invocation_id=invocation_id, + file_name=base32_data_no_padding, + process_name=base32_data_no_padding, + domain="someid.sometoken.com", + ) + + m = t.windows_fake_fs_pattern.match(query) + data = t.Canarytoken._windows_fake_fs(m) + assert data["src_data"]["windows_fake_fs_invocation_id"] == invocation_id + assert data["src_data"]["windows_fake_fs_file_name"] == test_data + assert data["src_data"]["windows_fake_fs_process_name"] == test_data + + def test_canarytoken_create_and_fetch(): ct = t.Canarytoken() ct_new = t.Canarytoken(value=ct.value()) diff --git a/tests/utils.py b/tests/utils.py index b3a222804..387f5b14b 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -9,6 +9,8 @@ from functools import wraps from logging import Logger from typing import Callable, Dict, Optional, Union +import base64 +import re import dns.resolver import pytest @@ -29,6 +31,7 @@ AzureIDTokenResponse, AzureIDAdditionalInfo, CMDTokenResponse, + WindowsFakeFSTokenResponse, CustomBinaryTokenRequest, CustomBinaryTokenResponse, CustomImageTokenRequest, @@ -203,6 +206,36 @@ def trigger_cmd_token( return target +def trigger_windows_fake_fs_token( + token_info: WindowsFakeFSTokenResponse, + version: Union[V2, V3], + invocation_id: int, + file_name: str = "doc b.docx", + process_name: str = "explorer.exe", +) -> str: + """ + Triggers a Windows Fake File System token by making a dns query with the expected parameters as Windows would. + """ + alert_domain_pattern = re.compile( + r"\$alertDomain = \"([A-Za-z0-9.]*)\"", re.IGNORECASE | re.MULTILINE + ) + re_search_result = alert_domain_pattern.search(token_info.powershell_file) + domain = re_search_result.groups()[0] + + target = "u{invocation_id}.f{file_name}.i{process_name}.{domain}".format( + invocation_id=invocation_id, + file_name=base64.b32encode(file_name.encode("utf-8")).decode().replace("=", ""), + process_name=base64.b32encode(process_name.encode("utf-8")) + .decode() + .replace("=", ""), + domain=domain, + ) + + resolver = grab_resolver(version=version) + resolver.resolve(target, "A") + return target + + def retry_on_failure( retry_when_raised: tuple[Exception, ...], retry_intervals: tuple[float, ...] = (3.0, 3.0, 5.0, 5.0),