Skip to content

Commit

Permalink
Add Google Chat webhook exposed key handling (#606)
Browse files Browse the repository at this point in the history
  • Loading branch information
gjcthinkst authored Nov 20, 2024
1 parent f221b26 commit 4aef1db
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 152 deletions.
42 changes: 0 additions & 42 deletions canarytokens/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@
from canarytokens.models import (
AnyTokenHit,
AnyTokenExposedHit,
GoogleChatAlertDetailsSectionData,
GoogleChatCard,
GoogleChatCardV2,
GoogleChatHeader,
GoogleChatSection,
Memo,
DiscordDetails,
DiscordEmbeds,
Expand All @@ -31,50 +26,13 @@
MsTeamsDetailsSection,
MsTeamsPotentialAction,
TokenAlertDetails,
TokenAlertDetailsGoogleChat,
TokenAlertDetailsDiscord,
TokenAlertDetailsMsTeams,
)

log = Logger()


def format_as_googlechat_canaryalert(
details: TokenAlertDetails,
) -> TokenAlertDetailsGoogleChat:
# construct google chat alert , top section
top_section = GoogleChatSection(header="Alert Details")
top_section.add_widgets(
widgets_info=GoogleChatAlertDetailsSectionData(
channel=details.channel,
time=details.time.strftime("%Y-%m-%d %H:%M:%S (UTC)"),
canarytoken=details.token,
token_reminder=details.memo,
manage_url=details.manage_url,
).get_googlechat_data()
)
# construct google chat alert , additional section
additional_section = GoogleChatSection(header="Additional Details")
if details.src_data:
additional_section.add_widgets(widgets_info=details.src_data)
if details.additional_data:
additional_section.add_widgets(widgets_info=details.additional_data)

# construct google chat alert card
card = GoogleChatCard(
header=GoogleChatHeader(
title="Canarytoken Triggered",
imageUrl="https://s3-eu-west-1.amazonaws.com/email-images.canary.tools/canary-logo-round.png",
imageType="CIRCLE",
imageAltText="Thinkst Canary",
),
sections=[top_section, additional_section],
)
# make google chat payload
return TokenAlertDetailsGoogleChat(
cardsV2=[GoogleChatCardV2(cardId="unique-card-id", card=card)]
)


def format_as_discord_canaryalert(
details: TokenAlertDetails,
Expand Down
82 changes: 0 additions & 82 deletions canarytokens/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2139,82 +2139,6 @@ class Config:
}


class GoogleChatDecoratedText(BaseModel):
topLabel: str = ""
text: str = ""


class GoogleChatWidget(BaseModel):
decoratedText: GoogleChatDecoratedText


class GoogleChatAlertDetailsSectionData(BaseModel):
channel: str = ""
time: datetime
canarytoken: Canarytoken
token_reminder: Memo
manage_url: HttpUrl

@validator("time", pre=True)
def validate_time(cls, value):
if isinstance(value, str):
return datetime.strptime(value, "%Y-%m-%d %H:%M:%S (UTC)")
return value

def get_googlechat_data(self) -> Dict[str, str]:
data = json_safe_dict(self)
data["Channel"] = data.pop("channel", "")
data["Time"] = data.pop("time", "")
data["Canarytoken"] = data.pop("canarytoken", "")
data["Token Reminder"] = data.pop("token_reminder", "")
data["Manage URL"] = '<a href="{manage_url}">{manage_url}</a>'.format(
manage_url=data.pop("manage_url", "")
)
return data

class Config:
json_encoders = {
datetime: lambda v: v.strftime("%Y-%m-%d %H:%M:%S (UTC)"),
}


class GoogleChatHeader(BaseModel):
title: str = "Canarytoken Triggered"
imageUrl: HttpUrl
imageType: str = "CIRCLE"
imageAltText: str = "Thinkst Canary"


class GoogleChatSection(BaseModel):
header: str = ""
collapsible: bool = False
widgets: List[GoogleChatWidget] = []

def add_widgets(self, widgets_info: Optional[Dict[str, str]] = {}) -> None:
for label, text in widgets_info.items():
if not label or not text:
continue
message_text = (
json.dumps(text) if isinstance(text, dict) else "{}".format(text)
)
self.widgets.append(
GoogleChatWidget(
decoratedText=GoogleChatDecoratedText(
topLabel=label, text=message_text
)
)
)


class GoogleChatCard(BaseModel):
header: GoogleChatHeader
sections: List[GoogleChatSection] = []


class GoogleChatCardV2(BaseModel):
cardId: str = "unique-card-id"
card: GoogleChatCard


class DiscordFieldEntry(BaseModel):
name: str = ""
Expand Down Expand Up @@ -2276,12 +2200,6 @@ class Config:
}


class TokenAlertDetailsGoogleChat(BaseModel):
cardsV2: List[GoogleChatCardV2]

def json_safe_dict(self) -> Dict[str, str]:
return json_safe_dict(self)


class MsTeamsDetailsSection(BaseModel):
canarytoken: Canarytoken
Expand Down
17 changes: 1 addition & 16 deletions canarytokens/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -877,22 +877,7 @@ def validate_webhook(url, token_type: models.TokenTypes):
raise WebhookTooLongError()

webhook_type = get_webhook_type(url)
if webhook_type == WebhookType.GOOGLE_CHAT:
# construct google chat alert card
card = models.GoogleChatCard(
header=models.GoogleChatHeader(
title="Validating new canarytokens webhook",
imageUrl="https://s3-eu-west-1.amazonaws.com/email-images.canary.tools/canary-logo-round.png",
imageType="CIRCLE",
imageAltText="Thinkst Canary",
),
sections=[],
)
# make google chat payload
payload = models.TokenAlertDetailsGoogleChat(
cardsV2=[models.GoogleChatCardV2(cardId="unique-card-id", card=card)]
)
elif webhook_type == WebhookType.DISCORD:
if webhook_type == WebhookType.DISCORD:
# construct discord alert card
embeds = models.DiscordEmbeds(
author=models.DiscordAuthorField(
Expand Down
Loading

0 comments on commit 4aef1db

Please sign in to comment.