Skip to content

Commit

Permalink
Add Slack webhook exposed key handling (#605)
Browse files Browse the repository at this point in the history
  • Loading branch information
gjcthinkst authored Nov 20, 2024
1 parent cf65a03 commit f221b26
Show file tree
Hide file tree
Showing 6 changed files with 266 additions and 91 deletions.
26 changes: 1 addition & 25 deletions canarytokens/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from __future__ import annotations

import datetime
from typing import Any, Coroutine, List, Optional, Union
from typing import Any, Coroutine, Optional, Union

import twisted.internet.reactor
from twisted.internet import threads
Expand All @@ -24,8 +24,6 @@
GoogleChatHeader,
GoogleChatSection,
Memo,
SlackAttachment,
SlackField,
DiscordDetails,
DiscordEmbeds,
DiscordAuthorField,
Expand All @@ -34,35 +32,13 @@
MsTeamsPotentialAction,
TokenAlertDetails,
TokenAlertDetailsGoogleChat,
TokenAlertDetailsSlack,
TokenAlertDetailsDiscord,
TokenAlertDetailsMsTeams,
)

log = Logger()


def format_as_slack_canaryalert(details: TokenAlertDetails) -> TokenAlertDetailsSlack:
"""
Transforms `TokenAlertDetails` to `TokenAlertDetailsSlack`.
"""
fields: List[SlackField] = [
SlackField(title="Channel", value=details.channel),
SlackField(title="Memo", value=details.memo),
SlackField(
title="time",
value=details.time.strftime("%Y-%m-%d %H:%M:%S (UTC)"),
),
SlackField(title="Manage", value=details.manage_url),
]

attchments = [SlackAttachment(title_link=details.manage_url, fields=fields)]
return TokenAlertDetailsSlack(
# channel="#general",
attachments=attchments,
)


def format_as_googlechat_canaryalert(
details: TokenAlertDetails,
) -> TokenAlertDetailsGoogleChat:
Expand Down
39 changes: 6 additions & 33 deletions canarytokens/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@
CANARY_IMAGE_URL,
MEMO_MAX_CHARACTERS,
)
from canarytokens.utils import prettify_snake_case, dict_to_csv, get_src_ip_continent
from canarytokens.utils import (
json_safe_dict,
prettify_snake_case,
dict_to_csv,
get_src_ip_continent,
)

CANARYTOKEN_RE = re.compile(
".*([" + "".join(CANARYTOKEN_ALPHABET) + "]{" + str(CANARYTOKEN_LENGTH) + "}).*",
Expand Down Expand Up @@ -401,10 +406,6 @@ def __str__(self) -> str:
]


def json_safe_dict(m: BaseModel, exclude: Tuple = ()) -> Dict[str, str]:
return json.loads(m.json(exclude_none=True, exclude=set(exclude)))


class TokenRequest(BaseModel):
"""
TokenRequest holds fields needed to create a Canarytoken.
Expand Down Expand Up @@ -2138,25 +2139,6 @@ class Config:
}


class SlackField(BaseModel):
title: str
value: str
short: bool = True


class SlackAttachment(BaseModel):
title: str = "Canarytoken Triggered"
title_link: HttpUrl
mrkdwn_in: List[str] = ["title"]
fallback: str = ""
fields: List[SlackField]

def __init__(__pydantic_self__, **data: Any) -> None:
# HACK: We can do better here.
data["fallback"] = f"Canarytoken Triggered: {data['title_link']}"
super().__init__(**data)


class GoogleChatDecoratedText(BaseModel):
topLabel: str = ""
text: str = ""
Expand Down Expand Up @@ -2301,15 +2283,6 @@ def json_safe_dict(self) -> Dict[str, str]:
return json_safe_dict(self)


class TokenAlertDetailsSlack(BaseModel):
"""Details that are sent to slack webhooks."""

attachments: List[SlackAttachment]

def json_safe_dict(self) -> Dict[str, str]:
return json_safe_dict(self)


class MsTeamsDetailsSection(BaseModel):
canarytoken: Canarytoken
token_reminder: Memo
Expand Down
25 changes: 2 additions & 23 deletions canarytokens/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import re
import secrets
from ipaddress import IPv4Address
from typing import Dict, List, Literal, Optional, Tuple, Union
from typing import Dict, List, Literal, Optional, Tuple

import advocate
import requests
Expand Down Expand Up @@ -876,29 +876,8 @@ def validate_webhook(url, token_type: models.TokenTypes):
if len(url) > constants.MAX_WEBHOOK_URL_LENGTH:
raise WebhookTooLongError()

payload: Union[
models.TokenAlertDetails,
models.TokenAlertDetailsSlack,
models.TokenAlertDetailsGoogleChat,
models.TokenAlertDetailsDiscord,
models.TokenAlertDetailsMsTeams,
]
webhook_type = get_webhook_type(url)
if webhook_type == WebhookType.SLACK:
payload = models.TokenAlertDetailsSlack(
attachments=[
models.SlackAttachment(
title_link=HttpUrl("https://test.com/check", scheme="https"),
fields=[
models.SlackField(
title="test",
value="Working",
)
],
)
]
)
elif webhook_type == WebhookType.GOOGLE_CHAT:
if webhook_type == WebhookType.GOOGLE_CHAT:
# construct google chat alert card
card = models.GoogleChatCard(
header=models.GoogleChatHeader(
Expand Down
8 changes: 7 additions & 1 deletion canarytokens/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import json
import subprocess
from pathlib import Path
from typing import Any, Literal, Union
from typing import Any, Dict, Literal, Tuple, Union

import pycountry_convert
from pydantic import BaseModel


def json_safe_dict(m: BaseModel, exclude: Tuple = ()) -> Dict[str, str]:
return json.loads(m.json(exclude_none=True, exclude=set(exclude)))


def dict_to_csv(d: dict) -> str:
Expand Down
Loading

0 comments on commit f221b26

Please sign in to comment.