Skip to content

Commit

Permalink
add tests for the windows fake file system token
Browse files Browse the repository at this point in the history
  • Loading branch information
thinkst-pieter committed Nov 20, 2024
1 parent d80fff6 commit e8de142
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 0 deletions.
93 changes: 93 additions & 0 deletions tests/integration/test_windows_fake_fs_token.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import random
import pytest

from canarytokens.models import (
WindowsFakeFSTokenHistory,
WindowsFakeFSTokenRequest,
WindowsFakeFSTokenResponse,
)
from canarytokens.webhook_formatting import TokenAlertDetailGeneric

from tests.utils import (
clear_stats_on_webhook,
trigger_windows_fake_fs_token,
create_token,
get_stats_from_webhook,
get_token_history,
run_or_skip,
v3,
)


@pytest.mark.parametrize("version", [v3])
@pytest.mark.parametrize(
"file_name, process_name",
[
("doc b.docx", "explorer.exe"),
("doc b.doc", "explorer.exe"),
],
)
def test_windows_fake_fs_token_fires(
file_name: str,
process_name: str,
webhook_receiver,
version,
runv2,
runv3,
):
"""
Tests the Windows Fake FS token.
"""
run_or_skip(version, runv2=runv2, runv3=runv3)
expected_hits = 1

# Create a Windows Fake File System token request
memo = "Test stuff break stuff test stuff sometimes build stuff"
root_dir = r"C:\Secrets"
file_structure = "TESTFS"

token_request = WindowsFakeFSTokenRequest(
webhook_url=webhook_receiver,
memo=memo,
windows_fake_fs_root=root_dir,
windows_fake_fs_file_structure=file_structure,
)
resp = create_token(token_request, version=version)

# Check dns token has correct attributes
token_info = WindowsFakeFSTokenResponse(**resp)
assert token_info.token in token_info.hostname.split(".")

clear_stats_on_webhook(webhook_receiver, token=token_info.token)
# Trigger CMD token twice, make sure the invocation ID limits it to one hit
invocation_id = random.randint(1000, 10000)
_ = trigger_windows_fake_fs_token(
token_info=token_info,
version=version,
invocation_id=invocation_id,
file_name=file_name,
process_name=process_name,
)
_ = trigger_windows_fake_fs_token(
token_info=token_info,
version=version,
invocation_id=invocation_id,
file_name=file_name,
process_name=process_name,
)

stats = get_stats_from_webhook(webhook_receiver, token=token_info.token)
if stats is not None:
# Check that what was sent to the webhook is consistent.
assert len(stats) == expected_hits
assert stats[0]["memo"] == memo
_ = TokenAlertDetailGeneric(**stats[0])

# Check that the returned history has a single hit.
resp = get_token_history(token_info=token_info, version=version)

token_history = WindowsFakeFSTokenHistory(**resp)
# TODO: what other fields do we want to assert on.
# note: making them TokenHistory have stronger validators is
# the better option.
assert len(token_history.hits) == expected_hits
22 changes: 22 additions & 0 deletions tests/units/test_tokens.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import Optional

import pytest
import random
import base64

from canarytokens import tokens as t
from canarytokens.models import TokenTypes
Expand Down Expand Up @@ -125,6 +127,26 @@ def test_windows_fake_fs_pattern(query, invocation_id, file_name, process_name):
assert data["src_data"]["windows_fake_fs_process_name"] == process_name.lower()


def test_windows_fake_fs_base32_padding():
invocation_id = f"{random.randint(1000, 10000)}"
for counter in range(1, 20):
test_data = "a" * counter
base32_data = base64.b32encode(test_data.encode("utf-8")).decode()
base32_data_no_padding = base32_data.replace("=", "")
query = "u{invocation_id}.f{file_name}.i{process_name}.{domain}".format(
invocation_id=invocation_id,
file_name=base32_data_no_padding,
process_name=base32_data_no_padding,
domain="someid.sometoken.com",
)

m = t.windows_fake_fs_pattern.match(query)
data = t.Canarytoken._windows_fake_fs(m)
assert data["src_data"]["windows_fake_fs_invocation_id"] == invocation_id
assert data["src_data"]["windows_fake_fs_file_name"] == test_data
assert data["src_data"]["windows_fake_fs_process_name"] == test_data


def test_canarytoken_create_and_fetch():
ct = t.Canarytoken()
ct_new = t.Canarytoken(value=ct.value())
Expand Down
33 changes: 33 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from functools import wraps
from logging import Logger
from typing import Callable, Dict, Optional, Union
import base64
import re

import dns.resolver
import pytest
Expand All @@ -29,6 +31,7 @@
AzureIDTokenResponse,
AzureIDAdditionalInfo,
CMDTokenResponse,
WindowsFakeFSTokenResponse,
CustomBinaryTokenRequest,
CustomBinaryTokenResponse,
CustomImageTokenRequest,
Expand Down Expand Up @@ -203,6 +206,36 @@ def trigger_cmd_token(
return target


def trigger_windows_fake_fs_token(
token_info: WindowsFakeFSTokenResponse,
version: Union[V2, V3],
invocation_id: int,
file_name: str = "doc b.docx",
process_name: str = "explorer.exe",
) -> str:
"""
Triggers a Windows Fake File System token by making a dns query with the expected parameters as Windows would.
"""
alert_domain_pattern = re.compile(
r"\$alertDomain = \"([A-Za-z0-9.]*)\"", re.IGNORECASE | re.MULTILINE
)
re_search_result = alert_domain_pattern.search(token_info.powershell_file)
domain = re_search_result.groups()[0]

target = "u{invocation_id}.f{file_name}.i{process_name}.{domain}".format(
invocation_id=invocation_id,
file_name=base64.b32encode(file_name.encode("utf-8")).decode().replace("=", ""),
process_name=base64.b32encode(process_name.encode("utf-8"))
.decode()
.replace("=", ""),
domain=domain,
)

resolver = grab_resolver(version=version)
resolver.resolve(target, "A")
return target


def retry_on_failure(
retry_when_raised: tuple[Exception, ...],
retry_intervals: tuple[float, ...] = (3.0, 3.0, 5.0, 5.0),
Expand Down

0 comments on commit e8de142

Please sign in to comment.