Skip to content

Commit

Permalink
fix: improve Slack rate limiting logic when updating alert groups (#5287)
Browse files Browse the repository at this point in the history

# What this PR does

https://www.loom.com/share/1ac33822301444748133ffe72638ddc4

The two asks in the [original GH
issue](grafana/oncall-private#2947) were:

> 1. Make the error message clearer. We can identify if it's delivering
or updating and being rate-limited. This is possible because Slack sets
limits per API method. Also, this limit is a per-slack channel while we
are posting messages & applying ratelimit per on-call integration, which
confuses customers.
> 2. Debounce update alert group message in Slack

Both of these have been addressed in this PR.

## Which issue(s) this PR closes

Closes grafana/oncall-private#2947

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] Added the relevant release notes label (see labels prefixed w/
`release:`). These labels dictate how your PR will
    show up in the autogenerated release notes.
  • Loading branch information
joeyorlando authored Dec 2, 2024
1 parent fa071bc commit 26946f0
Show file tree
Hide file tree
Showing 16 changed files with 975 additions and 187 deletions.
8 changes: 5 additions & 3 deletions engine/apps/alerts/models/alert_receive_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
metrics_remove_deleted_integration_from_cache,
metrics_update_integration_cache,
)
from apps.slack.constants import SLACK_RATE_LIMIT_DELAY, SLACK_RATE_LIMIT_TIMEOUT
from apps.slack.constants import SLACK_RATE_LIMIT_TIMEOUT
from apps.slack.tasks import post_slack_rate_limit_message
from apps.slack.utils import post_message_to_channel
from common.api_helpers.utils import create_engine_url
Expand Down Expand Up @@ -442,12 +442,14 @@ def is_rate_limited_in_slack(self) -> bool:
and self.rate_limited_in_slack_at + SLACK_RATE_LIMIT_TIMEOUT > timezone.now()
)

# NOTE(review): this is a diff view with indentation stripped. The next two lines appear to be the
# pre-change and post-change signatures of the same method (the default `delay` is dropped and
# `error_message_verb` is added) — confirm against the repository before relying on either.
def start_send_rate_limit_message_task(self, delay=SLACK_RATE_LIMIT_DELAY):
def start_send_rate_limit_message_task(self, error_message_verb: str, delay: int) -> None:
# The Celery task ID is generated up front so it can be saved on the model before the task is queued.
task_id = celery_uuid()

# Persist the task ID and the moment the rate limit was recorded; only these two columns are written.
self.rate_limit_message_task_id = task_id
self.rate_limited_in_slack_at = timezone.now()
self.save(update_fields=["rate_limit_message_task_id", "rate_limited_in_slack_at"])
# NOTE(review): presumably the pre-change call, which passes only the primary key to the task.
post_slack_rate_limit_message.apply_async((self.pk,), countdown=delay, task_id=task_id)

# NOTE(review): presumably the post-change call; it additionally forwards `error_message_verb` to the task.
# In both variants the task is queued with `countdown=delay` and the pre-generated `task_id`.
post_slack_rate_limit_message.apply_async((self.pk, error_message_verb), countdown=delay, task_id=task_id)

@property
def alert_groups_count(self) -> int:
Expand Down
39 changes: 3 additions & 36 deletions engine/apps/slack/alert_group_slack_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,9 @@

from apps.slack.client import SlackClient
from apps.slack.errors import (
SlackAPICantUpdateMessageError,
SlackAPIChannelArchivedError,
SlackAPIChannelInactiveError,
SlackAPIChannelNotFoundError,
SlackAPIInvalidAuthError,
SlackAPIMessageNotFoundError,
SlackAPIRatelimitError,
SlackAPITokenError,
)

Expand All @@ -34,41 +30,12 @@ def __init__(
else:
self._slack_client = SlackClient(slack_team_identity)

# Pushes freshly rendered attachments and blocks for `alert_group` to its existing Slack message
# via `chat_update`, addressing the message by `_channel_id` and its `slack_id` timestamp.
# NOTE(review): this is a diff view with indentation stripped; this method appears to be the code the
# commit removes from this file — confirm against the repository.
def update_alert_group_slack_message(self, alert_group: "AlertGroup") -> None:
logger.info(f"Update message for alert_group {alert_group.pk}")

try:
self._slack_client.chat_update(
# TODO: once _channel_id has been fully migrated to channel, remove _channel_id
# see https://raintank-corp.slack.com/archives/C06K1MQ07GS/p173255546
# channel=alert_group.slack_message.channel.slack_id,
channel=alert_group.slack_message._channel_id,
ts=alert_group.slack_message.slack_id,
attachments=alert_group.render_slack_attachments(),
blocks=alert_group.render_slack_blocks(),
)
logger.info(f"Message has been updated for alert_group {alert_group.pk}")
# Rate-limit handling: consults the integration's `is_maintenace_integration` and
# `is_rate_limited_in_slack` flags, may schedule the rate-limit notice (passing the error's
# `retry_after` as the first argument), logs that the update was skipped, and may re-raise.
# NOTE(review): with indentation lost, it cannot be determined from this view which `if` the
# `else: raise` belongs to, nor whether the log call sits inside the inner `if` — verify.
except SlackAPIRatelimitError as e:
if not alert_group.channel.is_maintenace_integration:
if not alert_group.channel.is_rate_limited_in_slack:
alert_group.channel.start_send_rate_limit_message_task(e.retry_after)
logger.info(
f"Message has not been updated for alert_group {alert_group.pk} due to slack rate limit."
)
else:
raise
# The Slack API errors listed below are swallowed on purpose: no retry, no log, no re-raise.
except (
SlackAPIMessageNotFoundError,
SlackAPICantUpdateMessageError,
SlackAPIChannelInactiveError,
SlackAPITokenError,
SlackAPIChannelNotFoundError,
):
pass

def publish_message_to_alert_group_thread(
self, alert_group: "AlertGroup", attachments=None, mrkdwn=True, unfurl_links=True, text=None
) -> None:
"""
TODO: refactor this method and move it to the `SlackMessage` model, such that we can remove this class..
"""
# TODO: refactor checking the possibility of sending message to slack
# do not try to post message to slack if integration is rate limited
if alert_group.channel.is_rate_limited_in_slack:
Expand Down
1 change: 0 additions & 1 deletion engine/apps/slack/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

SLACK_RATE_LIMIT_TIMEOUT = datetime.timedelta(minutes=5)
SLACK_RATE_LIMIT_DELAY = 10
CACHE_UPDATE_INCIDENT_SLACK_MESSAGE_LIFETIME = 60 * 10

BLOCK_SECTION_TEXT_MAX_SIZE = 2800
PRIVATE_METADATA_MAX_LENGTH = 3000
Expand Down
92 changes: 91 additions & 1 deletion engine/apps/slack/models/slack_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import typing
import uuid

from celery import uuid as celery_uuid
from django.core.cache import cache
from django.db import models
from django.utils import timezone

from apps.slack.client import SlackClient
from apps.slack.constants import BLOCK_SECTION_TEXT_MAX_SIZE
Expand All @@ -15,6 +18,7 @@
SlackAPIRatelimitError,
SlackAPITokenError,
)
from apps.slack.tasks import update_alert_group_slack_message

if typing.TYPE_CHECKING:
from apps.alerts.models import AlertGroup
Expand All @@ -30,6 +34,8 @@ class SlackMessage(models.Model):
alert_group: typing.Optional["AlertGroup"]
channel: "SlackChannel"

ALERT_GROUP_UPDATE_DEBOUNCE_INTERVAL_SECONDS = 45

id = models.CharField(primary_key=True, default=uuid.uuid4, editable=False, max_length=36)
slack_id = models.CharField(max_length=100)

Expand Down Expand Up @@ -85,7 +91,7 @@ class SlackMessage(models.Model):

active_update_task_id = models.CharField(max_length=100, null=True, default=None)
"""
ID of the latest celery task to update the message
DEPRECATED/TODO: drop this field in a separate PR/release
"""

class Meta:
Expand Down Expand Up @@ -259,3 +265,87 @@ def send_slack_notification(
slack_user_identity.send_link_to_slack_message(slack_message)
except (SlackAPITokenError, SlackAPIMethodNotSupportedForChannelTypeError):
pass

def _get_update_message_cache_key(self) -> str:
    """Build the cache key under which this message's active update task ID is stored.

    The key is derived from the primary key of the associated alert group.
    """
    alert_group_pk = self.alert_group.pk
    return f"update_alert_group_slack_message_{alert_group_pk}"

def get_active_update_task_id(self) -> typing.Optional[str]:
    """Look up the update task ID cached for this message.

    Returns the cached value, or ``None`` when the cache holds no entry under this message's key.
    """
    cache_key = self._get_update_message_cache_key()
    return cache.get(cache_key, default=None)

def set_active_update_task_id(self, task_id: str) -> None:
    """Store `task_id` in the cache as this message's active update task.

    NOTE: the entry is given a timeout of twice the debounce interval so that it is EVENTUALLY evicted.
    The background task which updates the message is expected to remove the task ID itself; the timeout
    is a safety net for the case where that task never runs or never completes. Once the entry is gone,
    a later call to schedule an update is no longer blocked by it.
    """
    expiry_seconds = 2 * self.ALERT_GROUP_UPDATE_DEBOUNCE_INTERVAL_SECONDS
    cache.set(self._get_update_message_cache_key(), task_id, timeout=expiry_seconds)

def mark_active_update_task_as_complete(self) -> None:
    """Record that the message update finished and release the active-task marker.

    Sets `last_updated` to the current time, saves only that field, and then deletes this
    message's entry from the cache.
    """
    completed_at = timezone.now()
    self.last_updated = completed_at
    self.save(update_fields=["last_updated"])

    cache_key = self._get_update_message_cache_key()
    cache.delete(cache_key)

def update_alert_groups_message(self, debounce: bool) -> None:
    """Queue a background update of the alert group's Slack message.

    Steps:
    - Bail out (with a warning) when this message has no alert group attached.
    - When `debounce` is True and an update task ID is already cached, bail out so that a second
      task is not queued on top of the pending one.
    - Work out the countdown: with `debounce` the task waits for whatever is left of
      `ALERT_GROUP_UPDATE_DEBOUNCE_INTERVAL_SECONDS` since `last_updated`, but never less than
      10 seconds; without `debounce` the countdown is 0.
    - Cache the new task ID, then queue `update_alert_group_slack_message` with that ID.

    debounce: bool
        True  - skip scheduling if an update task is already active, otherwise schedule after the countdown.
        False - always schedule, with no countdown.
    """
    if not self.alert_group:
        logger.warning(
            f"skipping update_alert_groups_message as SlackMessage {self.pk} has no alert_group associated with it"
        )
        return

    active_update_task_id = self.get_active_update_task_id()
    already_scheduled = active_update_task_id is not None
    if debounce and already_scheduled:
        logger.info(
            f"skipping update_alert_groups_message as SlackMessage {self.pk} has an active update task "
            f"{active_update_task_id} and debounce is set to True"
        )
        return

    current_time = timezone.now()

    # last_updated was not populated historically, so it may be None; treat that as "updated just now"
    previous_update = self.last_updated or current_time
    elapsed_seconds = int((current_time - previous_update).total_seconds())
    seconds_left = self.ALERT_GROUP_UPDATE_DEBOUNCE_INTERVAL_SECONDS - elapsed_seconds

    if debounce:
        countdown = max(seconds_left, 10)
    else:
        countdown = 0

    logger.info(
        f"updating message for alert_group {self.alert_group.pk} in {countdown} seconds "
        f"(debounce interval: {self.ALERT_GROUP_UPDATE_DEBOUNCE_INTERVAL_SECONDS})"
    )

    # NOTE: the task ID must be in the cache BEFORE the task is queued. The task compares its own
    # celery task ID with the cached one, so queueing first would open a race in which the task
    # starts before the ID has been stored.
    #
    # (see update_alert_group_slack_message task for more details)
    new_task_id = celery_uuid()
    self.set_active_update_task_id(new_task_id)
    update_alert_group_slack_message.apply_async((self.pk,), countdown=countdown, task_id=new_task_id)
Loading

0 comments on commit 26946f0

Please sign in to comment.