diff --git a/canarytokens/channel.py b/canarytokens/channel.py
index 655e76e60..c4b15fe81 100644
--- a/canarytokens/channel.py
+++ b/canarytokens/channel.py
@@ -18,11 +18,6 @@
from canarytokens.models import (
AnyTokenHit,
AnyTokenExposedHit,
- GoogleChatAlertDetailsSectionData,
- GoogleChatCard,
- GoogleChatCardV2,
- GoogleChatHeader,
- GoogleChatSection,
Memo,
DiscordDetails,
DiscordEmbeds,
@@ -31,7 +26,6 @@
MsTeamsDetailsSection,
MsTeamsPotentialAction,
TokenAlertDetails,
- TokenAlertDetailsGoogleChat,
TokenAlertDetailsDiscord,
TokenAlertDetailsMsTeams,
)
@@ -39,42 +33,6 @@
log = Logger()
-def format_as_googlechat_canaryalert(
- details: TokenAlertDetails,
-) -> TokenAlertDetailsGoogleChat:
- # construct google chat alert , top section
- top_section = GoogleChatSection(header="Alert Details")
- top_section.add_widgets(
- widgets_info=GoogleChatAlertDetailsSectionData(
- channel=details.channel,
- time=details.time.strftime("%Y-%m-%d %H:%M:%S (UTC)"),
- canarytoken=details.token,
- token_reminder=details.memo,
- manage_url=details.manage_url,
- ).get_googlechat_data()
- )
- # construct google chat alert , additional section
- additional_section = GoogleChatSection(header="Additional Details")
- if details.src_data:
- additional_section.add_widgets(widgets_info=details.src_data)
- if details.additional_data:
- additional_section.add_widgets(widgets_info=details.additional_data)
-
- # construct google chat alert card
- card = GoogleChatCard(
- header=GoogleChatHeader(
- title="Canarytoken Triggered",
- imageUrl="https://s3-eu-west-1.amazonaws.com/email-images.canary.tools/canary-logo-round.png",
- imageType="CIRCLE",
- imageAltText="Thinkst Canary",
- ),
- sections=[top_section, additional_section],
- )
- # make google chat payload
- return TokenAlertDetailsGoogleChat(
- cardsV2=[GoogleChatCardV2(cardId="unique-card-id", card=card)]
- )
-
def format_as_discord_canaryalert(
details: TokenAlertDetails,
diff --git a/canarytokens/models.py b/canarytokens/models.py
index 05b12b8b4..663efaa05 100644
--- a/canarytokens/models.py
+++ b/canarytokens/models.py
@@ -2139,82 +2139,6 @@ class Config:
}
-class GoogleChatDecoratedText(BaseModel):
- topLabel: str = ""
- text: str = ""
-
-
-class GoogleChatWidget(BaseModel):
- decoratedText: GoogleChatDecoratedText
-
-
-class GoogleChatAlertDetailsSectionData(BaseModel):
- channel: str = ""
- time: datetime
- canarytoken: Canarytoken
- token_reminder: Memo
- manage_url: HttpUrl
-
- @validator("time", pre=True)
- def validate_time(cls, value):
- if isinstance(value, str):
- return datetime.strptime(value, "%Y-%m-%d %H:%M:%S (UTC)")
- return value
-
- def get_googlechat_data(self) -> Dict[str, str]:
- data = json_safe_dict(self)
- data["Channel"] = data.pop("channel", "")
- data["Time"] = data.pop("time", "")
- data["Canarytoken"] = data.pop("canarytoken", "")
- data["Token Reminder"] = data.pop("token_reminder", "")
- data["Manage URL"] = '{manage_url}'.format(
- manage_url=data.pop("manage_url", "")
- )
- return data
-
- class Config:
- json_encoders = {
- datetime: lambda v: v.strftime("%Y-%m-%d %H:%M:%S (UTC)"),
- }
-
-
-class GoogleChatHeader(BaseModel):
- title: str = "Canarytoken Triggered"
- imageUrl: HttpUrl
- imageType: str = "CIRCLE"
- imageAltText: str = "Thinkst Canary"
-
-
-class GoogleChatSection(BaseModel):
- header: str = ""
- collapsible: bool = False
- widgets: List[GoogleChatWidget] = []
-
- def add_widgets(self, widgets_info: Optional[Dict[str, str]] = {}) -> None:
- for label, text in widgets_info.items():
- if not label or not text:
- continue
- message_text = (
- json.dumps(text) if isinstance(text, dict) else "{}".format(text)
- )
- self.widgets.append(
- GoogleChatWidget(
- decoratedText=GoogleChatDecoratedText(
- topLabel=label, text=message_text
- )
- )
- )
-
-
-class GoogleChatCard(BaseModel):
- header: GoogleChatHeader
- sections: List[GoogleChatSection] = []
-
-
-class GoogleChatCardV2(BaseModel):
- cardId: str = "unique-card-id"
- card: GoogleChatCard
-
class DiscordFieldEntry(BaseModel):
name: str = ""
@@ -2276,12 +2200,6 @@ class Config:
}
-class TokenAlertDetailsGoogleChat(BaseModel):
- cardsV2: List[GoogleChatCardV2]
-
- def json_safe_dict(self) -> Dict[str, str]:
- return json_safe_dict(self)
-
class MsTeamsDetailsSection(BaseModel):
canarytoken: Canarytoken
diff --git a/canarytokens/queries.py b/canarytokens/queries.py
index 218ee96e7..4411a3a47 100644
--- a/canarytokens/queries.py
+++ b/canarytokens/queries.py
@@ -877,22 +877,7 @@ def validate_webhook(url, token_type: models.TokenTypes):
raise WebhookTooLongError()
webhook_type = get_webhook_type(url)
- if webhook_type == WebhookType.GOOGLE_CHAT:
- # construct google chat alert card
- card = models.GoogleChatCard(
- header=models.GoogleChatHeader(
- title="Validating new canarytokens webhook",
- imageUrl="https://s3-eu-west-1.amazonaws.com/email-images.canary.tools/canary-logo-round.png",
- imageType="CIRCLE",
- imageAltText="Thinkst Canary",
- ),
- sections=[],
- )
- # make google chat payload
- payload = models.TokenAlertDetailsGoogleChat(
- cardsV2=[models.GoogleChatCardV2(cardId="unique-card-id", card=card)]
- )
- elif webhook_type == WebhookType.DISCORD:
+ if webhook_type == WebhookType.DISCORD:
# construct discord alert card
embeds = models.DiscordEmbeds(
author=models.DiscordAuthorField(
diff --git a/canarytokens/webhook_formatting.py b/canarytokens/webhook_formatting.py
index d5d4f7bb6..93b7f841b 100644
--- a/canarytokens/webhook_formatting.py
+++ b/canarytokens/webhook_formatting.py
@@ -1,6 +1,6 @@
from __future__ import annotations
import json
-from typing import Union, Literal
+from typing import Optional, Union, Literal
from enum import Enum
import re
from functools import partial
@@ -11,7 +11,6 @@
from canarytokens import constants
from canarytokens.channel import (
format_as_discord_canaryalert,
- format_as_googlechat_canaryalert,
format_as_ms_teams_canaryalert,
)
from canarytokens.models import (
@@ -102,7 +101,7 @@ def _format_alert_details_for_webhook(
if webhook_type == WebhookType.SLACK:
return _format_as_slack_canaryalert(details)
elif webhook_type == WebhookType.GOOGLE_CHAT:
- return format_as_googlechat_canaryalert(details)
+ return _format_as_googlechat_canaryalert(details)
elif webhook_type == WebhookType.DISCORD:
return format_as_discord_canaryalert(details)
elif webhook_type == WebhookType.MS_TEAMS:
@@ -121,9 +120,7 @@ def _format_exposed_details_for_webhook(
if webhook_type == WebhookType.SLACK:
return _format_as_slack_token_exposed(details)
elif webhook_type == WebhookType.GOOGLE_CHAT:
- raise NotImplementedError(
- f"_format_exposed_details_for_webhook not implemented for webhook type: {webhook_type}"
- )
+ return _format_as_googlechat_token_exposed(details)
elif webhook_type == WebhookType.DISCORD:
raise NotImplementedError(
f"_format_exposed_details_for_webhook not implemented for webhook type: {webhook_type}"
@@ -153,8 +150,17 @@ def generate_webhook_test_payload(webhook_type: WebhookType, token_type: TokenTy
]
)
elif webhook_type == WebhookType.GOOGLE_CHAT:
- raise NotImplementedError(
- "generate_webhook_test_payload not implemented for GOOGLE_CHAT"
+ card = GoogleChatCard(
+ header=GoogleChatHeader(
+ title="Validating new Canarytokens webhook",
+ imageUrl=CANARY_LOGO_ROUND_PUBLIC_URL,
+ imageType="CIRCLE",
+ imageAltText="Thinkst Canary",
+ ),
+ sections=[],
+ )
+ return TokenAlertDetailsGoogleChat(
+ cardsV2=[GoogleChatCardV2(cardId="unique-card-id", card=card)]
)
elif webhook_type == WebhookType.DISCORD:
raise NotImplementedError(
@@ -391,6 +397,249 @@ def json_safe_dict(self) -> dict[str, str]:
return json_safe_dict(self)
+def _format_as_googlechat_canaryalert(
+ details: TokenAlertDetails,
+) -> TokenAlertDetailsGoogleChat:
+ # construct google chat alert , top section
+ top_section = GoogleChatSection(
+ header="Alert Details",
+ widgets=_data_to_googlechat_text_widgets(
+ {
+ "channel": details.channel,
+ "time": details.time.strftime("%Y-%m-%d %H:%M:%S (UTC)"),
+ "canarytoken": details.token,
+ "token_reminder": details.memo,
+ }
+ ),
+ )
+
+ # construct google chat alert , additional section
+ additional_widgets: list[GoogleChatTextWithTopLabel] = []
+ if details.src_data:
+ additional_widgets.extend(_data_to_googlechat_text_widgets(details.src_data))
+ if details.additional_data:
+ additional_widgets.extend(
+ _data_to_googlechat_text_widgets(details.additional_data)
+ )
+
+ additional_widgets.append(
+ GoogleChatButtonList(
+ buttons=[
+ GoogleChatButton(
+ text="Manage token",
+ url=details.manage_url,
+ material_icon="settings",
+ )
+ ]
+ )
+ )
+ additional_section = GoogleChatSection(
+ header="Additional Details", widgets=additional_widgets
+ )
+
+ # construct google chat alert card
+ card = GoogleChatCard(
+ header=GoogleChatHeader(
+ title="Canarytoken Triggered",
+ imageUrl=CANARY_LOGO_ROUND_PUBLIC_URL,
+ imageType="CIRCLE",
+ imageAltText="Thinkst Canary",
+ ),
+ sections=[top_section, additional_section],
+ )
+ # make google chat payload
+ return TokenAlertDetailsGoogleChat(
+ cardsV2=[GoogleChatCardV2(cardId="unique-card-id", card=card)]
+ )
+
+
+def _format_as_googlechat_token_exposed(
+ details: TokenExposedDetails,
+) -> TokenAlertDetailsGoogleChat:
+ card = GoogleChatCardV2(
+ cardId="unique-card-id",
+ card=GoogleChatCard(
+ header=GoogleChatHeader(
+ title="Canarytoken Exposed",
+ imageUrl=CANARY_LOGO_ROUND_PUBLIC_URL,
+ imageType="CIRCLE",
+ imageAltText="Thinkst Canary",
+ ),
+ sections=[
+ GoogleChatSection(
+ widgets=[
+ GoogleChatParagraph(
+ text=_get_exposed_token_description(details.token_type)
+ )
+ ]
+ ),
+ GoogleChatSection(
+ header="Exposure Details",
+ widgets=[
+ GoogleChatColumns(
+ column_items=[
+ GoogleChatColumnItems(
+ widgets=[
+ GoogleChatTextWithTopLabel(
+ top_label="Key ID", text=details.key_id
+ ),
+ GoogleChatTextWithTopLabel(
+ top_label="Key exposed here",
+ text=f'{details.public_location}',
+ ),
+ ]
+ ),
+ GoogleChatColumnItems(
+ widgets=[
+ GoogleChatTextWithTopLabel(
+ top_label="Token Reminder",
+ text=details.memo,
+ ),
+ GoogleChatTextWithTopLabel(
+ top_label="Key exposed at",
+ text=details.exposed_time.strftime(
+ "%Y-%m-%d %H:%M:%S (UTC)"
+ ),
+ ),
+ ]
+ ),
+ ]
+ ),
+ GoogleChatButtonList(
+ buttons=[
+ GoogleChatButton(
+ text="Manage token",
+ url=details.manage_url,
+ material_icon="settings",
+ )
+ ]
+ ),
+ ],
+ ),
+ ],
+ ),
+ )
+
+ return TokenAlertDetailsGoogleChat(cardsV2=[card])
+
+
+def _data_to_googlechat_text_widgets(
+ data: dict[str, str]
+) -> list[GoogleChatTextWithTopLabel]:
+ widgets: list[GoogleChatTextWithTopLabel] = []
+ for label, text in data.items():
+ if not label or not text:
+ continue
+
+ message_text = json.dumps(text) if isinstance(text, dict) else "{}".format(text)
+ widgets.append(
+ GoogleChatTextWithTopLabel(
+ text=message_text, top_label=prettify_snake_case(label)
+ )
+ )
+
+ return widgets
+
+
+class GoogleChatWidget(BaseModel):
+ ...
+
+
+class GoogleChatParagraph(GoogleChatWidget):
+ text: str
+
+ def dict(self, *args, **kwargs):
+ return {"textParagraph": {"text": self.text}}
+
+
+class GoogleChatTextWithTopLabel(GoogleChatWidget):
+ text: str
+ top_label: Optional[str] = None
+
+ def dict(self, *args, **kwargs):
+ d = {"decoratedText": {"text": self.text}}
+ if self.top_label:
+ d["decoratedText"]["topLabel"] = self.top_label
+
+ return d
+
+
+class GoogleChatButton(BaseModel):
+ text: str
+ url: HttpUrl
+ material_icon: Optional[str] = None
+ alt_text: Optional[str] = None
+
+ def dict(self, *args, **kwargs):
+ d = {"text": self.text, "onClick": {"openLink": {"url": self.url}}}
+ if self.material_icon:
+ d["icon"] = {
+ "materialIcon": {"name": self.material_icon},
+ "altText": self.alt_text or self.text.lower(),
+ }
+
+ return d
+
+
+class GoogleChatButtonList(GoogleChatWidget):
+ buttons: list[GoogleChatButton]
+
+ def dict(self, *args, **kwargs):
+ return {"buttonList": {"buttons": [button.dict() for button in self.buttons]}}
+
+
+class GoogleChatColumnItems(BaseModel):
+ widgets: list[GoogleChatWidget]
+ horizontalSizeStyle: str = "FILL_MINIMUM_SPACE"
+ horizontalAlignment: str = "START"
+ verticalAlignment: str = "CENTER"
+
+ def dict(self, *args, **kwargs):
+ return {
+ "horizontalSizeStyle": self.horizontalSizeStyle,
+ "horizontalAlignment": self.horizontalAlignment,
+ "verticalAlignment": self.verticalAlignment,
+ "widgets": [widget.dict() for widget in self.widgets],
+ }
+
+
+class GoogleChatColumns(GoogleChatWidget):
+ column_items: list[GoogleChatColumnItems]
+
+ def dict(self, *args, **kwargs):
+ return {"columns": {"columnItems": [ci.dict() for ci in self.column_items]}}
+
+
+class GoogleChatHeader(BaseModel):
+ title: str = "Canarytoken Triggered"
+ imageUrl: HttpUrl
+ imageType: str = "CIRCLE"
+ imageAltText: str = "Thinkst Canary"
+
+
+class GoogleChatSection(BaseModel):
+ header: str = ""
+ collapsible: bool = False
+ widgets: list[GoogleChatWidget] = []
+
+
+class GoogleChatCard(BaseModel):
+ header: GoogleChatHeader
+ sections: list[GoogleChatSection] = []
+
+
+class GoogleChatCardV2(BaseModel):
+ cardId: str = "unique-card-id"
+ card: GoogleChatCard
+
+
+class TokenAlertDetailsGoogleChat(BaseModel):
+ cardsV2: list[GoogleChatCardV2]
+
+ def json_safe_dict(self) -> dict[str, str]:
+ return json_safe_dict(self)
+
+
class TokenAlertDetailGeneric(TokenAlertDetails):
...
diff --git a/tests/units/test_channel_output_webhook.py b/tests/units/test_channel_output_webhook.py
index 853515ee3..03e86f993 100644
--- a/tests/units/test_channel_output_webhook.py
+++ b/tests/units/test_channel_output_webhook.py
@@ -5,13 +5,11 @@
from canarytokens.canarydrop import Canarydrop
from canarytokens.channel import (
- format_as_googlechat_canaryalert,
format_as_ms_teams_canaryalert,
)
from canarytokens.channel_dns import ChannelDNS
from canarytokens.channel_output_webhook import WebhookOutputChannel
from canarytokens.models import (
- TokenAlertDetailsGoogleChat,
TokenAlertDetailsMsTeams,
TokenTypes,
)
@@ -19,7 +17,12 @@
from canarytokens.switchboard import Switchboard
from canarytokens.tokens import Canarytoken
from canarytokens.constants import CANARY_IMAGE_URL
-from canarytokens.webhook_formatting import format_details_for_webhook, get_webhook_type
+from canarytokens.webhook_formatting import (
+ WebhookType,
+ format_details_for_webhook,
+ get_webhook_type,
+ TokenAlertDetailsGoogleChat,
+)
def test_broken_webhook(
@@ -153,7 +156,7 @@ def test_googlechat_webhook_format(
host=settings.PUBLIC_DOMAIN,
)
print("Webhook details = {}".format(details))
- webhook_payload = format_as_googlechat_canaryalert(details=details)
+ webhook_payload = format_details_for_webhook(WebhookType.GOOGLE_CHAT, details)
webhook_payload_json = webhook_payload.json_safe_dict()
print("Webhook_payload json = {}".format(webhook_payload.json()))
diff --git a/tests/units/test_webhook_formatting.py b/tests/units/test_webhook_formatting.py
index ebd607ff7..6bd6eb81b 100644
--- a/tests/units/test_webhook_formatting.py
+++ b/tests/units/test_webhook_formatting.py
@@ -8,6 +8,7 @@
format_details_for_webhook,
get_webhook_type,
TokenAlertDetailGeneric,
+ TokenAlertDetailsGoogleChat,
TokenExposedDetailGeneric,
)
@@ -45,6 +46,8 @@ def test_get_webhook_type(url: str, expected_type: WebhookType):
("exposed", WebhookType.GENERIC, TokenExposedDetailGeneric),
("alert", WebhookType.SLACK, TokenAlertDetailsSlack),
("exposed", WebhookType.SLACK, TokenAlertDetailsSlack),
+ ("alert", WebhookType.GOOGLE_CHAT, TokenAlertDetailsGoogleChat),
+ ("exposed", WebhookType.GOOGLE_CHAT, TokenAlertDetailsGoogleChat),
],
)
def test_format_details_for_webhook_alert_type(