From 4aef1db90e59d3e3d7f316fc90d1b22a89be9b62 Mon Sep 17 00:00:00 2001 From: gjcthinkst <135841050+gjcthinkst@users.noreply.github.com> Date: Wed, 20 Nov 2024 15:32:30 +0200 Subject: [PATCH] Add Google Chat webhook exposed key handling (#606) --- canarytokens/channel.py | 42 ---- canarytokens/models.py | 82 ------- canarytokens/queries.py | 17 +- canarytokens/webhook_formatting.py | 265 ++++++++++++++++++++- tests/units/test_channel_output_webhook.py | 11 +- tests/units/test_webhook_formatting.py | 3 + 6 files changed, 268 insertions(+), 152 deletions(-) diff --git a/canarytokens/channel.py b/canarytokens/channel.py index 655e76e60..c4b15fe81 100644 --- a/canarytokens/channel.py +++ b/canarytokens/channel.py @@ -18,11 +18,6 @@ from canarytokens.models import ( AnyTokenHit, AnyTokenExposedHit, - GoogleChatAlertDetailsSectionData, - GoogleChatCard, - GoogleChatCardV2, - GoogleChatHeader, - GoogleChatSection, Memo, DiscordDetails, DiscordEmbeds, @@ -31,7 +26,6 @@ MsTeamsDetailsSection, MsTeamsPotentialAction, TokenAlertDetails, - TokenAlertDetailsGoogleChat, TokenAlertDetailsDiscord, TokenAlertDetailsMsTeams, ) @@ -39,42 +33,6 @@ log = Logger() -def format_as_googlechat_canaryalert( - details: TokenAlertDetails, -) -> TokenAlertDetailsGoogleChat: - # construct google chat alert , top section - top_section = GoogleChatSection(header="Alert Details") - top_section.add_widgets( - widgets_info=GoogleChatAlertDetailsSectionData( - channel=details.channel, - time=details.time.strftime("%Y-%m-%d %H:%M:%S (UTC)"), - canarytoken=details.token, - token_reminder=details.memo, - manage_url=details.manage_url, - ).get_googlechat_data() - ) - # construct google chat alert , additional section - additional_section = GoogleChatSection(header="Additional Details") - if details.src_data: - additional_section.add_widgets(widgets_info=details.src_data) - if details.additional_data: - additional_section.add_widgets(widgets_info=details.additional_data) - - # construct google chat alert card - card = GoogleChatCard( - header=GoogleChatHeader( - title="Canarytoken Triggered", - imageUrl="https://s3-eu-west-1.amazonaws.com/email-images.canary.tools/canary-logo-round.png", - imageType="CIRCLE", - imageAltText="Thinkst Canary", - ), - sections=[top_section, additional_section], - ) - # make google chat payload - return TokenAlertDetailsGoogleChat( - cardsV2=[GoogleChatCardV2(cardId="unique-card-id", card=card)] - ) - def format_as_discord_canaryalert( details: TokenAlertDetails, diff --git a/canarytokens/models.py b/canarytokens/models.py index 05b12b8b4..663efaa05 100644 --- a/canarytokens/models.py +++ b/canarytokens/models.py @@ -2139,82 +2139,6 @@ class Config: } -class GoogleChatDecoratedText(BaseModel): - topLabel: str = "" - text: str = "" - - -class GoogleChatWidget(BaseModel): - decoratedText: GoogleChatDecoratedText - - -class GoogleChatAlertDetailsSectionData(BaseModel): - channel: str = "" - time: datetime - canarytoken: Canarytoken - token_reminder: Memo - manage_url: HttpUrl - - @validator("time", pre=True) - def validate_time(cls, value): - if isinstance(value, str): - return datetime.strptime(value, "%Y-%m-%d %H:%M:%S (UTC)") - return value - - def get_googlechat_data(self) -> Dict[str, str]: - data = json_safe_dict(self) - data["Channel"] = data.pop("channel", "") - data["Time"] = data.pop("time", "") - data["Canarytoken"] = data.pop("canarytoken", "") - data["Token Reminder"] = data.pop("token_reminder", "") - data["Manage URL"] = '{manage_url}'.format( - manage_url=data.pop("manage_url", "") - ) - return data - - class Config: - json_encoders = { - datetime: lambda v: v.strftime("%Y-%m-%d %H:%M:%S (UTC)"), - } - - -class GoogleChatHeader(BaseModel): - title: str = "Canarytoken Triggered" - imageUrl: HttpUrl - imageType: str = "CIRCLE" - imageAltText: str = "Thinkst Canary" - - -class GoogleChatSection(BaseModel): - header: str = "" - collapsible: bool = False - widgets: List[GoogleChatWidget] = [] - - def add_widgets(self, widgets_info: Optional[Dict[str, str]] = {}) -> None: - for label, text in widgets_info.items(): - if not label or not text: - continue - message_text = ( - json.dumps(text) if isinstance(text, dict) else "{}".format(text) - ) - self.widgets.append( - GoogleChatWidget( - decoratedText=GoogleChatDecoratedText( - topLabel=label, text=message_text - ) - ) - ) - - -class GoogleChatCard(BaseModel): - header: GoogleChatHeader - sections: List[GoogleChatSection] = [] - - -class GoogleChatCardV2(BaseModel): - cardId: str = "unique-card-id" - card: GoogleChatCard - class DiscordFieldEntry(BaseModel): name: str = "" @@ -2276,12 +2200,6 @@ class Config: } -class TokenAlertDetailsGoogleChat(BaseModel): - cardsV2: List[GoogleChatCardV2] - - def json_safe_dict(self) -> Dict[str, str]: - return json_safe_dict(self) - class MsTeamsDetailsSection(BaseModel): canarytoken: Canarytoken diff --git a/canarytokens/queries.py b/canarytokens/queries.py index 218ee96e7..4411a3a47 100644 --- a/canarytokens/queries.py +++ b/canarytokens/queries.py @@ -877,22 +877,7 @@ def validate_webhook(url, token_type: models.TokenTypes): raise WebhookTooLongError() webhook_type = get_webhook_type(url) - if webhook_type == WebhookType.GOOGLE_CHAT: - # construct google chat alert card - card = models.GoogleChatCard( - header=models.GoogleChatHeader( - title="Validating new canarytokens webhook", - imageUrl="https://s3-eu-west-1.amazonaws.com/email-images.canary.tools/canary-logo-round.png", - imageType="CIRCLE", - imageAltText="Thinkst Canary", - ), - sections=[], - ) - # make google chat payload - payload = models.TokenAlertDetailsGoogleChat( - cardsV2=[models.GoogleChatCardV2(cardId="unique-card-id", card=card)] - ) - elif webhook_type == WebhookType.DISCORD: + if webhook_type == WebhookType.DISCORD: # construct discord alert card embeds = models.DiscordEmbeds( author=models.DiscordAuthorField( diff --git a/canarytokens/webhook_formatting.py b/canarytokens/webhook_formatting.py index d5d4f7bb6..93b7f841b 100644 --- a/canarytokens/webhook_formatting.py +++ b/canarytokens/webhook_formatting.py @@ -1,6 +1,6 @@ from __future__ import annotations import json -from typing import Union, Literal +from typing import Optional, Union, Literal from enum import Enum import re from functools import partial @@ -11,7 +11,6 @@ from canarytokens import constants from canarytokens.channel import ( format_as_discord_canaryalert, - format_as_googlechat_canaryalert, format_as_ms_teams_canaryalert, ) from canarytokens.models import ( @@ -102,7 +101,7 @@ def _format_alert_details_for_webhook( if webhook_type == WebhookType.SLACK: return _format_as_slack_canaryalert(details) elif webhook_type == WebhookType.GOOGLE_CHAT: - return format_as_googlechat_canaryalert(details) + return _format_as_googlechat_canaryalert(details) elif webhook_type == WebhookType.DISCORD: return format_as_discord_canaryalert(details) elif webhook_type == WebhookType.MS_TEAMS: @@ -121,9 +120,7 @@ def _format_exposed_details_for_webhook( if webhook_type == WebhookType.SLACK: return _format_as_slack_token_exposed(details) elif webhook_type == WebhookType.GOOGLE_CHAT: - raise NotImplementedError( - f"_format_exposed_details_for_webhook not implemented for webhook type: {webhook_type}" - ) + return _format_as_googlechat_token_exposed(details) elif webhook_type == WebhookType.DISCORD: raise NotImplementedError( f"_format_exposed_details_for_webhook not implemented for webhook type: {webhook_type}" @@ -153,8 +150,17 @@ def generate_webhook_test_payload(webhook_type: WebhookType, token_type: TokenTy ] ) elif webhook_type == WebhookType.GOOGLE_CHAT: - raise NotImplementedError( - "generate_webhook_test_payload not implemented for GOOGLE_CHAT" + card = GoogleChatCard( + header=GoogleChatHeader( + title="Validating new Canarytokens webhook", + imageUrl=CANARY_LOGO_ROUND_PUBLIC_URL, + imageType="CIRCLE", + imageAltText="Thinkst Canary", + ), + sections=[], + ) + return TokenAlertDetailsGoogleChat( + cardsV2=[GoogleChatCardV2(cardId="unique-card-id", card=card)] ) elif webhook_type == WebhookType.DISCORD: raise NotImplementedError( @@ -391,6 +397,249 @@ def json_safe_dict(self) -> dict[str, str]: return json_safe_dict(self) +def _format_as_googlechat_canaryalert( + details: TokenAlertDetails, +) -> TokenAlertDetailsGoogleChat: + # construct google chat alert , top section + top_section = GoogleChatSection( + header="Alert Details", + widgets=_data_to_googlechat_text_widgets( + { + "channel": details.channel, + "time": details.time.strftime("%Y-%m-%d %H:%M:%S (UTC)"), + "canarytoken": details.token, + "token_reminder": details.memo, + } + ), + ) + + # construct google chat alert , additional section + additional_widgets: list[GoogleChatTextWithTopLabel] = [] + if details.src_data: + additional_widgets.extend(_data_to_googlechat_text_widgets(details.src_data)) + if details.additional_data: + additional_widgets.extend( + _data_to_googlechat_text_widgets(details.additional_data) + ) + + additional_widgets.append( + GoogleChatButtonList( + buttons=[ + GoogleChatButton( + text="Manage token", + url=details.manage_url, + material_icon="settings", + ) + ] + ) + ) + additional_section = GoogleChatSection( + header="Additional Details", widgets=additional_widgets + ) + + # construct google chat alert card + card = GoogleChatCard( + header=GoogleChatHeader( + title="Canarytoken Triggered", + imageUrl=CANARY_LOGO_ROUND_PUBLIC_URL, + imageType="CIRCLE", + imageAltText="Thinkst Canary", + ), + sections=[top_section, additional_section], + ) + # make google chat payload + return TokenAlertDetailsGoogleChat( + cardsV2=[GoogleChatCardV2(cardId="unique-card-id", card=card)] + ) + + +def _format_as_googlechat_token_exposed( + details: TokenExposedDetails, +) -> TokenAlertDetailsGoogleChat: + card = GoogleChatCardV2( + cardId="unique-card-id", + card=GoogleChatCard( + header=GoogleChatHeader( + title="Canarytoken Exposed", + imageUrl=CANARY_LOGO_ROUND_PUBLIC_URL, + imageType="CIRCLE", + imageAltText="Thinkst Canary", + ), + sections=[ + GoogleChatSection( + widgets=[ + GoogleChatParagraph( + text=_get_exposed_token_description(details.token_type) + ) + ] + ), + GoogleChatSection( + header="Exposure Details", + widgets=[ + GoogleChatColumns( + column_items=[ + GoogleChatColumnItems( + widgets=[ + GoogleChatTextWithTopLabel( + top_label="Key ID", text=details.key_id + ), + GoogleChatTextWithTopLabel( + top_label="Key exposed here", + text=f'{details.public_location}', + ), + ] + ), + GoogleChatColumnItems( + widgets=[ + GoogleChatTextWithTopLabel( + top_label="Token Reminder", + text=details.memo, + ), + GoogleChatTextWithTopLabel( + top_label="Key exposed at", + text=details.exposed_time.strftime( + "%Y-%m-%d %H:%M:%S (UTC)" + ), + ), + ] + ), + ] + ), + GoogleChatButtonList( + buttons=[ + GoogleChatButton( + text="Manage token", + url=details.manage_url, + material_icon="settings", + ) + ] + ), + ], + ), + ], + ), + ) + + return TokenAlertDetailsGoogleChat(cardsV2=[card]) + + +def _data_to_googlechat_text_widgets( + data: dict[str, str] +) -> list[GoogleChatTextWithTopLabel]: + widgets: list[GoogleChatTextWithTopLabel] = [] + for label, text in data.items(): + if not label or not text: + continue + + message_text = json.dumps(text) if isinstance(text, dict) else "{}".format(text) + widgets.append( + GoogleChatTextWithTopLabel( + text=message_text, top_label=prettify_snake_case(label) + ) + ) + + return widgets + + +class GoogleChatWidget(BaseModel): + ... + + +class GoogleChatParagraph(GoogleChatWidget): + text: str + + def dict(self, *args, **kwargs): + return {"textParagraph": {"text": self.text}} + + +class GoogleChatTextWithTopLabel(GoogleChatWidget): + text: str + top_label: Optional[str] = None + + def dict(self, *args, **kwargs): + d = {"decoratedText": {"text": self.text}} + if self.top_label: + d["decoratedText"]["topLabel"] = self.top_label + + return d + + +class GoogleChatButton(BaseModel): + text: str + url: HttpUrl + material_icon: Optional[str] = None + alt_text: Optional[str] = None + + def dict(self, *args, **kwargs): + d = {"text": self.text, "onClick": {"openLink": {"url": self.url}}} + if self.material_icon: + d["icon"] = { + "materialIcon": {"name": self.material_icon}, + "altText": self.alt_text or self.text.lower(), + } + + return d + + +class GoogleChatButtonList(GoogleChatWidget): + buttons: list[GoogleChatButton] + + def dict(self, *args, **kwargs): + return {"buttonList": {"buttons": [button.dict() for button in self.buttons]}} + + +class GoogleChatColumnItems(BaseModel): + widgets: list[GoogleChatWidget] + horizontalSizeStyle: str = "FILL_MINIMUM_SPACE" + horizontalAlignment: str = "START" + verticalAlignment: str = "CENTER" + + def dict(self, *args, **kwargs): + return { + "horizontalSizeStyle": self.horizontalSizeStyle, + "horizontalAlignment": self.horizontalAlignment, + "verticalAlignment": self.verticalAlignment, + "widgets": [widget.dict() for widget in self.widgets], + } + + +class GoogleChatColumns(GoogleChatWidget): + column_items: list[GoogleChatColumnItems] + + def dict(self, *args, **kwargs): + return {"columns": {"columnItems": [ci.dict() for ci in self.column_items]}} + + +class GoogleChatHeader(BaseModel): + title: str = "Canarytoken Triggered" + imageUrl: HttpUrl + imageType: str = "CIRCLE" + imageAltText: str = "Thinkst Canary" + + +class GoogleChatSection(BaseModel): + header: str = "" + collapsible: bool = False + widgets: list[GoogleChatWidget] = [] + + +class GoogleChatCard(BaseModel): + header: GoogleChatHeader + sections: list[GoogleChatSection] = [] + + +class GoogleChatCardV2(BaseModel): + cardId: str = "unique-card-id" + card: GoogleChatCard + + +class TokenAlertDetailsGoogleChat(BaseModel): + cardsV2: list[GoogleChatCardV2] + + def json_safe_dict(self) -> dict[str, str]: + return json_safe_dict(self) + + class TokenAlertDetailGeneric(TokenAlertDetails): ... diff --git a/tests/units/test_channel_output_webhook.py b/tests/units/test_channel_output_webhook.py index 853515ee3..03e86f993 100644 --- a/tests/units/test_channel_output_webhook.py +++ b/tests/units/test_channel_output_webhook.py @@ -5,13 +5,11 @@ from canarytokens.canarydrop import Canarydrop from canarytokens.channel import ( - format_as_googlechat_canaryalert, format_as_ms_teams_canaryalert, ) from canarytokens.channel_dns import ChannelDNS from canarytokens.channel_output_webhook import WebhookOutputChannel from canarytokens.models import ( - TokenAlertDetailsGoogleChat, TokenAlertDetailsMsTeams, TokenTypes, ) @@ -19,7 +17,12 @@ from canarytokens.switchboard import Switchboard from canarytokens.tokens import Canarytoken from canarytokens.constants import CANARY_IMAGE_URL -from canarytokens.webhook_formatting import format_details_for_webhook, get_webhook_type +from canarytokens.webhook_formatting import ( + WebhookType, + format_details_for_webhook, + get_webhook_type, + TokenAlertDetailsGoogleChat, +) def test_broken_webhook( @@ -153,7 +156,7 @@ def test_googlechat_webhook_format( host=settings.PUBLIC_DOMAIN, ) print("Webhook details = {}".format(details)) - webhook_payload = format_as_googlechat_canaryalert(details=details) + webhook_payload = format_details_for_webhook(WebhookType.GOOGLE_CHAT, details) webhook_payload_json = webhook_payload.json_safe_dict() print("Webhook_payload json = {}".format(webhook_payload.json())) diff --git a/tests/units/test_webhook_formatting.py b/tests/units/test_webhook_formatting.py index ebd607ff7..6bd6eb81b 100644 --- a/tests/units/test_webhook_formatting.py +++ b/tests/units/test_webhook_formatting.py @@ -8,6 +8,7 @@ format_details_for_webhook, get_webhook_type, TokenAlertDetailGeneric, + TokenAlertDetailsGoogleChat, TokenExposedDetailGeneric, ) @@ -45,6 +46,8 @@ def test_get_webhook_type(url: str, expected_type: WebhookType): ("exposed", WebhookType.GENERIC, TokenExposedDetailGeneric), ("alert", WebhookType.SLACK, TokenAlertDetailsSlack), ("exposed", WebhookType.SLACK, TokenAlertDetailsSlack), + ("alert", WebhookType.GOOGLE_CHAT, TokenAlertDetailsGoogleChat), + ("exposed", WebhookType.GOOGLE_CHAT, TokenAlertDetailsGoogleChat), ], ) def test_format_details_for_webhook_alert_type(