Skip to content

Commit

Permalink
add token deletion tests, fix broken tests
Browse files Browse the repository at this point in the history
  • Loading branch information
wleightond committed Jun 13, 2024
1 parent a579213 commit 6b3b80b
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 22 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ on:
branches:
- "master"
- "dev"
- "Feature_branch_New_UI"

workflow_dispatch:
inputs:
Expand Down
7 changes: 4 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
import time
from distutils.util import strtobool
from pathlib import Path
from typing import Generator, Optional
from typing import Any, Generator, Optional
from unittest import mock

import pytest
from redis import StrictRedis
import requests
from requests import HTTPError
import uvicorn # type: ignore
Expand Down Expand Up @@ -259,7 +260,7 @@ def setup_db_connection_only(settings: SwitchboardSettings):
@pytest.fixture(scope="function", autouse=False)
def setup_db( # noqa: C901
settings: SwitchboardSettings, frontend_settings: FrontendSettings
):
) -> Generator[Any, Any, Optional[StrictRedis]]:
redis_hostname = "localhost" if strtobool(os.getenv("CI", "False")) else "redis"
DB.set_db_details(hostname=redis_hostname, port=6379)
# Kubeconfig token needs a client cert in redis.
Expand Down Expand Up @@ -307,7 +308,7 @@ def setup_db( # noqa: C901
add_canary_page("post.jsp")
add_canary_path_element("tags")

yield
yield db

for key in db.scan_iter():
if not any(key.startswith(key_prefix) for key_prefix in prefixes_to_persist):
Expand Down
35 changes: 23 additions & 12 deletions tests/integration/test_against_token_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
DNSTokenHistory,
DNSTokenRequest,
DNSTokenResponse,
DownloadFmtTypes,
DownloadIncidentListJsonRequest,
FastRedirectTokenHistory,
FastRedirectTokenRequest,
FastRedirectTokenResponse,
Expand All @@ -43,6 +45,7 @@
from tests.utils import (
clear_stats_on_webhook,
create_token,
delete_token,
get_stats_from_webhook,
get_token_history,
log_4_shell_fire_token,
Expand All @@ -56,32 +59,40 @@
)


@pytest.mark.parametrize("version", [None])
def test_basic_v3(version, runv3, runv2): # pragma: no cover
@pytest.mark.parametrize("version", [v3])
def test_delete_token(version, runv3, runv2):
run_or_skip(version, runv2=runv2, runv3=runv3)
token_request = DNSTokenRequest(
webhook_url=slack_webhook_test,
email="test@test.com",
memo="We are v3",
)
resp = create_token(token_request, version=version)

# Check dns token has correct attributes
token_info = DNSTokenResponse(**resp)
# assert dns_token_info.webhook_url == token_request.webhook_url
assert token_info.hostname.split(".")[0] == token_info.token

# Trigger DNS token
_ = plain_fire_token(token_info, version=version)
# Trigger it once
_ = plain_fire_token(token_info, version)

# Check that the returned history has a single hit.
# Check that we can query the token's history
resp = get_token_history(token_info=token_info, version=version)
_ = DNSTokenHistory(**resp)

token_history = DNSTokenHistory(**resp)
resp = delete_token(token_info.token, token_info.auth_token, version)
assert resp.get("message") == "success"

# TODO: what other fields do we want to assert on.
# note: making them TokenHistory have stronger validators is
# the better option.
assert len(token_history.hits) == 1
token_history_request = DownloadIncidentListJsonRequest(
token=token_info.token,
# TODO: auth vs. auth_token choose one at least at the object level
auth=token_info.auth_token,
fmt=DownloadFmtTypes.INCIDENTLISTJSON,
)
resp = requests.get(
url=f"{version.server_url}/download", params=token_history_request
)

assert resp.status_code == 403


@pytest.mark.parametrize(
Expand Down
7 changes: 5 additions & 2 deletions tests/units/test_frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ def test_get_generate_page(test_client: TestClient) -> None:


def test_redirect_base_to_generate(test_client: TestClient) -> None:
if FrontendSettings().NEW_UI:
pytest.skip("New UI does not redirect to /generate")
response = test_client.get("/")
assert response.status_code == 200
assert response.url.path == "/generate"
Expand Down Expand Up @@ -534,8 +536,7 @@ def test_history_page(
).dict(),
)
assert resp.status_code == 200
# TODO: Make this a stricter test
assert cd.canarytoken.value() in resp.content.decode()
# now that it's a Vue app we can't test further here


@pytest.mark.parametrize(
Expand All @@ -558,6 +559,8 @@ def test_authorised_page_access(
endpoint (str): endpoint to attempt to access.
verb (str): HTTP verb for endpoint.
"""
if FrontendSettings().NEW_UI:
pytest.skip("New UI redirects these to index.html")
resp = test_client.post(
"/generate",
data=DNSTokenRequest(
Expand Down
105 changes: 101 additions & 4 deletions tests/units/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pytest
from pydantic import EmailStr
from redis import StrictRedis

from canarytokens import queries
from canarytokens.canarydrop import Canarydrop
Expand All @@ -14,9 +15,17 @@
Memo,
TokenTypes,
)
from canarytokens.redismanager import (
KEY_AUTH_IDX,
KEY_CANARYDROP,
KEY_CANARYDROPS_TIMELINE,
KEY_EMAIL_IDX,
KEY_WEBHOOK_IDX,
)
from canarytokens.queries import (
add_canarydrop_hit,
add_webhook_token_idx,
delete_canarydrop,
delete_email_tokens,
delete_webhook_tokens,
get_canarydrop,
get_canarydrop_and_authenticate,
Expand All @@ -27,11 +36,47 @@


def test_add_delete_webhook(setup_db):
token_value = Canarytoken.generate()
db: StrictRedis = setup_db
webhook_url = "https://slack.com/api/api.test"
ret = add_webhook_token_idx(webhook_url, canarytoken=token_value)
assert ret == 1

canarytoken = Canarytoken()
canarydrop = Canarydrop(
generate=True,
type=TokenTypes.DNS,
alert_webhook_enabled=True,
alert_webhook_url=webhook_url,
canarytoken=canarytoken,
memo="stuff happened",
browser_scanner_enabled=False,
)
save_canarydrop(canarydrop)

key = KEY_WEBHOOK_IDX + webhook_url
assert len(db.smembers(key)) == 1
delete_webhook_tokens(webhook=webhook_url)
assert not db.exists(key)


def test_add_delete_email(setup_db):
db: StrictRedis = setup_db
email = "test@test.com"

canarytoken = Canarytoken()
canarydrop = Canarydrop(
generate=True,
type=TokenTypes.DNS,
alert_email_enabled=True,
alert_email_recipient=email,
canarytoken=canarytoken,
memo="stuff happened",
browser_scanner_enabled=False,
)
save_canarydrop(canarydrop)

key = KEY_EMAIL_IDX + email
assert len(db.smembers(key)) == 1
delete_email_tokens(email_address=email)
assert not db.exists(key)


def test_add_hit_get_canarytoken(setup_db):
Expand Down Expand Up @@ -96,6 +141,58 @@ def test_add_hit_get_canarytoken_wrong_type(setup_db):
assert cd.triggered_details


def test_delete_drop(setup_db):
db: StrictRedis = setup_db

email = "test@test.com"
webhook = "https://slack.com/api/api.test"

canarytoken = Canarytoken()

canarydrop = Canarydrop(
generate=True,
type=TokenTypes.DNS,
alert_email_enabled=True,
alert_email_recipient=email,
alert_webhook_enabled=True,
alert_webhook_url=webhook,
canarytoken=canarytoken,
memo="stuff happened",
browser_scanner_enabled=False,
)

critical_keys = [
KEY_CANARYDROP + canarytoken.value(),
KEY_EMAIL_IDX + email,
KEY_WEBHOOK_IDX + webhook,
KEY_AUTH_IDX + canarydrop.auth,
]

save_canarydrop(canarydrop)
token_hit = DNSTokenHit(
time_of_hit=datetime.utcnow().strftime("%s.%f"),
src_ip="127.0.0.1",
geo_info=GeoIPBogonInfo(ip="127.0.0.1", bogon=True),
is_tor_relay=False,
input_channel="dns",
additional_info=AdditionalInfo(),
location="/get",
useragent="Unknown",
)
add_canarydrop_hit(token_hit=token_hit, canarytoken=canarytoken)
cd = get_canarydrop(canarytoken=canarytoken)

for key in critical_keys:
assert db.exists(key)
assert db.zscore(KEY_CANARYDROPS_TIMELINE, canarytoken.value()) is not None

delete_canarydrop(cd)

for key in critical_keys:
assert not db.exists(key)
assert db.zscore(KEY_CANARYDROPS_TIMELINE, canarytoken.value()) is None


def test_remove_tokens_with_email_x(setup_db):
"""
There is a 1 to many mapping of email to tokens.
Expand Down
36 changes: 35 additions & 1 deletion tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
CustomImageTokenRequest,
CustomImageTokenResponse,
DNSTokenResponse,
DownloadFmtTypes,
DownloadGetRequestModel,
DownloadIncidentListJsonRequest,
GeoIPBogonInfo,
Expand All @@ -50,6 +51,7 @@
WindowsDirectoryTokenResponse,
)
from canarytokens.tokens import Canarytoken
from frontend.app import ROOT_API_ENDPOINT

log = Logger("test_utils")

Expand Down Expand Up @@ -292,7 +294,7 @@ def get_token_history(
],
version: Union[V2, V3],
expected_len: int = 1,
fmt="incidentlist_json",
fmt: DownloadFmtTypes = DownloadFmtTypes.INCIDENTLISTJSON,
) -> Dict[str, str]:
token_history_request = DownloadIncidentListJsonRequest(
token=token_info.token,
Expand Down Expand Up @@ -488,6 +490,38 @@ def create_token(token_request: TokenRequest, version: Union[V2, V3]):
return data


@retry_on_failure(
retry_when_raised=(requests.exceptions.HTTPError, CanaryTokenCreationError)
)
def delete_token(token: str, auth: str, version: Union[V2, V3]):
delete_url = f"{version.server_url}{ROOT_API_ENDPOINT}/delete"
data = {"token": token, "auth": auth}
timeout = request_timeout
if not isinstance(version, V3):
raise ValueError(f"Version not supported: {version}")

resp = session.post(
url=delete_url,
timeout=timeout,
json=data,
headers={"Connection": "close"},
)
# TODO / DESIGN: The webhook receiver sometimes chokes due to ngrok rate limit 429 error.
# retry for now. +1 for a webhook receiver as a docker service.
if resp.status_code == 429: # webhook failed not the servers fault
raise CanaryTokenCreationError("Webhook failed to validate") # pragma: no cover
if resp.status_code >= 400:
log.error(
f"Token deletion error: \n\t{token=}\n\t{resp.status_code=}; {resp.json()=}"
)
resp.raise_for_status()
data = resp.json()
resp.close()
session.close()

return data


def make_token_alert_detail(
channel: Optional[str] = None,
token_type: Optional[TokenTypes] = None,
Expand Down

0 comments on commit 6b3b80b

Please sign in to comment.