Skip to content

Commit

Permalink
Add MS Teams webhook exposed key handling (#608)
Browse files Browse the repository at this point in the history
  • Loading branch information
gjcthinkst authored Nov 20, 2024
1 parent fdce653 commit 8e71ee4
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 125 deletions.
26 changes: 0 additions & 26 deletions canarytokens/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,13 @@
AnyTokenHit,
AnyTokenExposedHit,
Memo,
MsTeamsTitleSection,
MsTeamsDetailsSection,
MsTeamsPotentialAction,
TokenAlertDetails,
TokenAlertDetailsMsTeams,
)

log = Logger()



def format_as_ms_teams_canaryalert(
details: TokenAlertDetails,
) -> TokenAlertDetailsMsTeams:
sections = [
MsTeamsTitleSection(activityTitle="<b>Canarytoken triggered</b>"),
MsTeamsDetailsSection(
canarytoken=details.token,
token_reminder=details.memo,
src_data=details.src_data if details.src_data else None,
additional_data=details.additional_data,
),
]

return TokenAlertDetailsMsTeams(
summary="Canarytoken triggered",
sections=sections,
potentialAction=[
MsTeamsPotentialAction(name="Manage", target=[details.manage_url])
],
)


class Channel(object):
CHANNEL = "Base"

Expand Down
66 changes: 0 additions & 66 deletions canarytokens/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
from canarytokens.constants import (
CANARYTOKEN_ALPHABET,
CANARYTOKEN_LENGTH,
CANARY_IMAGE_URL,
MEMO_MAX_CHARACTERS,
)
from canarytokens.utils import (
Expand Down Expand Up @@ -2140,71 +2139,6 @@ class Config:




class MsTeamsDetailsSection(BaseModel):
canarytoken: Canarytoken
token_reminder: Memo
src_data: Optional[dict[str, Any]] = None
additional_data: Optional[dict[str, Any]] = None

def dict(self, *args, **kwargs):
data = json_safe_dict(self)
data["Canarytoken"] = data.pop("canarytoken", "")
data["Token Reminder"] = data.pop("token_reminder", "")
if "src_data" in data:
data["Source Data"] = data.pop("src_data", "")

if data["additional_data"]:
add_data = data.pop("additional_data", {})
data.update(add_data)

facts = []
for k, v in data.items():
if not v:
continue

if isinstance(v, dict):
v = dict_to_csv(v)
else:
v = str(v)

facts.append({"name": prettify_snake_case(k), "value": v})

return {"facts": facts}


class MsTeamsTitleSection(BaseModel):
activityTitle: str
activityImage = CANARY_IMAGE_URL


class MsTeamsPotentialAction(BaseModel):
name: str
target: List[AnyHttpUrl]
type: str = "ViewAction"
context: str = "http://schema.org"

def dict(self, *args, **kwargs):
d = super().dict(*args, **kwargs)

d["@type"] = d.pop("type")
d["@context"] = d.pop("context")

return d


class TokenAlertDetailsMsTeams(BaseModel):
"""Details that are sent to MS Teams webhooks."""

summary: str
themeColor = "ff0000"
sections: Optional[List[Union[MsTeamsTitleSection, MsTeamsDetailsSection]]] = None
potentialAction: Optional[List[MsTeamsPotentialAction]] = None

def json_safe_dict(self) -> Dict[str, str]:
return json_safe_dict(self)


class UserName(ConstrainedStr):
max_lengthint: int = 30
strip_whitespace: bool = True
Expand Down
13 changes: 2 additions & 11 deletions canarytokens/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import re
import secrets
from ipaddress import IPv4Address
from typing import Literal, Optional
from typing import Literal, Optional, Union

import advocate
import requests
Expand Down Expand Up @@ -42,7 +42,6 @@
KEY_WIREGUARD_KEYMAP,
)
from canarytokens.webhook_formatting import (
WebhookType,
generate_webhook_test_payload,
get_webhook_type,
)
Expand Down Expand Up @@ -877,15 +876,7 @@ def validate_webhook(url, token_type: models.TokenTypes):
raise WebhookTooLongError()

webhook_type = get_webhook_type(url)
if webhook_type == WebhookType.MS_TEAMS:
section = models.MsTeamsTitleSection(
activityTitle="<b>Validating new Canarytokens webhook</b>"
)
payload = models.TokenAlertDetailsMsTeams(
summary="Validating new Canarytokens webhook", sections=[section]
)
else:
payload = generate_webhook_test_payload(webhook_type, token_type)
payload = generate_webhook_test_payload(webhook_type, token_type)

response = advocate.post(
url,
Expand Down
1 change: 0 additions & 1 deletion canarytokens/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import subprocess
from pathlib import Path
from typing import Any, Literal, Union
import json

import pycountry_convert
from pydantic import BaseModel
Expand Down
134 changes: 122 additions & 12 deletions canarytokens/webhook_formatting.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,14 @@
from pydantic import BaseModel, HttpUrl, parse_obj_as, validator

from canarytokens import constants
from canarytokens.utils import json_safe_dict, prettify_snake_case
from canarytokens.channel import (
format_as_ms_teams_canaryalert,
)
from canarytokens.utils import json_safe_dict, prettify_snake_case, dict_to_csv
from canarytokens.models import (
readable_token_type_names,
Memo,
TokenTypes,
TokenAlertDetails,
TokenExposedDetails,
)
from canarytokens.utils import json_safe_dict, prettify_snake_case


CANARY_LOGO_ROUND_PUBLIC_URL = parse_obj_as(
HttpUrl,
Expand Down Expand Up @@ -111,7 +106,7 @@ def _format_alert_details_for_webhook(
elif webhook_type == WebhookType.DISCORD:
return _format_as_discord_canaryalert(details)
elif webhook_type == WebhookType.MS_TEAMS:
return format_as_ms_teams_canaryalert(details)
return _format_as_ms_teams_canaryalert(details)
elif webhook_type == WebhookType.GENERIC:
return TokenAlertDetailGeneric(**details.dict())
else:
Expand All @@ -130,9 +125,7 @@ def _format_exposed_details_for_webhook(
elif webhook_type == WebhookType.DISCORD:
return _format_as_discord_token_exposed(details)
elif webhook_type == WebhookType.MS_TEAMS:
raise NotImplementedError(
f"_format_exposed_details_for_webhook not implemented for webhook type: {webhook_type}"
)
return _format_as_ms_teams_token_exposed(details)
elif webhook_type == WebhookType.GENERIC:
return TokenExposedDetailGeneric(**details.dict())
else:
Expand Down Expand Up @@ -176,8 +169,11 @@ def generate_webhook_test_payload(webhook_type: WebhookType, token_type: TokenTy
)
return TokenAlertDetailsDiscord(embeds=[embeds])
elif webhook_type == WebhookType.MS_TEAMS:
raise NotImplementedError(
"generate_webhook_test_payload not implemented for MS_TEAMS"
section = MsTeamsTitleSection(
activityTitle="<b>Validating new Canarytokens webhook</b>"
)
return TokenAlertDetailsMsTeams(
summary="Validating new Canarytokens webhook", sections=[section]
)
elif webhook_type == WebhookType.GENERIC:
return TokenAlertDetails(
Expand Down Expand Up @@ -761,6 +757,120 @@ def json_safe_dict(self) -> dict[str, str]:
return json_safe_dict(self)


def _format_as_ms_teams_canaryalert(
details: TokenAlertDetails,
) -> TokenAlertDetailsMsTeams:
facts = [
MsTeamsFact(name="Canarytoken", value=details.token),
MsTeamsFact(name="Token Reminder", value=details.memo),
]

if details.src_data:
facts.extend(_data_to_ms_teams_facts(details.src_data))
if details.additional_data:
facts.extend(_data_to_ms_teams_facts(details.additional_data))

sections = [
MsTeamsTitleSection(activityTitle="<b>Canarytoken Triggered</b>"),
MsTeamsDetailsSection(facts=facts),
]

return TokenAlertDetailsMsTeams(
summary="Canarytoken Triggered",
themeColor=HexColor.ERROR.value_without_hash,
sections=sections,
potentialAction=[
MsTeamsPotentialAction(name="Manage token", target=[details.manage_url])
],
)


def _format_as_ms_teams_token_exposed(
details: TokenExposedDetails,
) -> TokenAlertDetailsMsTeams:
facts = [
MsTeamsFact(name="Key ID", value=details.key_id),
MsTeamsFact(name="Token Reminder", value=details.memo),
MsTeamsFact(
name="Key exposed at",
value=details.exposed_time.strftime("%Y-%m-%d %H:%M:%S (UTC)"),
),
MsTeamsFact(name="Key exposed here", value=details.public_location),
]

sections = [
MsTeamsTitleSection(activityTitle="<b>Canarytoken Exposed</b>"),
MsTeamsDetailsSection(
facts=facts, text=_get_exposed_token_description(details.token_type)
),
]

return TokenAlertDetailsMsTeams(
summary="Canarytoken Exposed",
themeColor=HexColor.WARNING.value_without_hash,
sections=sections,
potentialAction=[
MsTeamsPotentialAction(name="Manage token", target=[details.manage_url])
],
)


def _data_to_ms_teams_facts(data: dict[str, Union[str, dict]]) -> list[MsTeamsFact]:
facts: list[MsTeamsFact] = []

for label, value in data.items():
if not label or not value:
continue

message_text = dict_to_csv(value) if isinstance(value, dict) else value
facts.append(MsTeamsFact(name=prettify_snake_case(label), value=message_text))

return facts


class MsTeamsFact(BaseModel):
name: str
value: str


class MsTeamsDetailsSection(BaseModel):
facts: list[MsTeamsFact]
text: Optional[str] = None


class MsTeamsTitleSection(BaseModel):
activityTitle: str
activityImage = CANARY_LOGO_ROUND_PUBLIC_URL


class MsTeamsPotentialAction(BaseModel):
name: str
target: list[HttpUrl]
type: str = "ViewAction"
context: str = "http://schema.org"

def dict(self, *args, **kwargs):
d = super().dict(*args, **kwargs)

d["@type"] = d.pop("type")
d["@context"] = d.pop("context")

return d


class TokenAlertDetailsMsTeams(BaseModel):
"""Details that are sent to MS Teams webhooks."""

summary: str
themeColor: str = HexColor.CANARY_GREEN.value
sections: Optional[list[Union[MsTeamsTitleSection, MsTeamsDetailsSection]]] = None
potentialAction: Optional[list[MsTeamsPotentialAction]] = None
text: Optional[str] = None

def json_safe_dict(self) -> dict[str, str]:
return json_safe_dict(self)


class TokenAlertDetailGeneric(TokenAlertDetails):
...

Expand Down
15 changes: 6 additions & 9 deletions tests/units/test_channel_output_webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@
from twisted.logger import capturedLogs

from canarytokens.canarydrop import Canarydrop
from canarytokens.channel import (
format_as_ms_teams_canaryalert,
)
from canarytokens.channel_dns import ChannelDNS
from canarytokens.channel_output_webhook import WebhookOutputChannel
from canarytokens.models import (
TokenAlertDetailsMsTeams,
TokenTypes,
)
from canarytokens.settings import FrontendSettings, SwitchboardSettings
Expand All @@ -22,6 +18,7 @@
format_details_for_webhook,
get_webhook_type,
TokenAlertDetailsGoogleChat,
TokenAlertDetailsMsTeams,
)


Expand Down Expand Up @@ -277,23 +274,23 @@ def test_ms_teams_webhook_format(
protocol=input_channel.switchboard_scheme,
host=settings.PUBLIC_DOMAIN,
)
webhook_payload = format_as_ms_teams_canaryalert(details=details)
webhook_payload = format_details_for_webhook(WebhookType.MS_TEAMS, details)
payload = webhook_payload.json_safe_dict()

assert payload["summary"] == "Canarytoken triggered"
assert payload["themeColor"] == "ff0000"
assert payload["summary"] == "Canarytoken Triggered"
assert payload["themeColor"] == "d32f2f"
assert payload["potentialAction"] == [
{
"@context": "http://schema.org",
"@type": "ViewAction",
"name": "Manage",
"name": "Manage token",
"target": [details.manage_url],
}
]
assert len(payload["sections"]) == 2

assert payload["sections"][0] == {
"activityTitle": "<b>Canarytoken triggered</b>",
"activityTitle": "<b>Canarytoken Triggered</b>",
"activityImage": CANARY_IMAGE_URL,
}

Expand Down
Loading

0 comments on commit 8e71ee4

Please sign in to comment.