Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ART-11306] Make quay-doomsday-backup async and send its results to Slack #1278

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 61 additions & 25 deletions pyartcd/pyartcd/pipelines/quay_doomsday_backup.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import os
from typing import Optional
import click
from time import sleep
import shutil
import asyncio
from tenacity import AsyncRetrying, stop_after_attempt

from pyartcd.runtime import Runtime
from pyartcd.cli import cli, pass_runtime
from artcommonlib.exectools import cmd_assert
from pyartcd.cli import cli, pass_runtime, click_coroutine
from artcommonlib.exectools import cmd_assert_async
from doozerlib.util import mkdirs


Expand All @@ -23,10 +24,12 @@ def __init__(self, runtime: Runtime, version: str, arches: Optional[str]):
self.runtime = runtime
self.version = version
self.workdir = "./workspace"
self.slack_client = self.runtime.new_slack_client()
self.slack_client.bind_channel(version)

self.arches = arches.split(",") if arches else ALL_ARCHES_LIST

def sync_arch(self, arch: str):
async def sync_arch(self, arch: str) -> bool:
if arch not in ALL_ARCHES_LIST:
raise Exception(f"Invalid arch: {arch}")

Expand All @@ -44,35 +47,68 @@ def sync_arch(self, arch: str):
f"s3://ocp-doomsday-registry/release-image/{path}"
]

self.runtime.logger.info("Running mirror command: %s", mirror_cmd)
cmd_assert(mirror_cmd, retries=N_RETRIES)
self.runtime.logger.info("Mirror command ran successfully")
if self.runtime.dry_run:
self.runtime.logger.info("[DRY RUN] Would have run %s", " ".join(aws_cmd))
# Setup tenacity retry behavior for calling mirror_cmd and aws_cmd
# because cmd_assert_async does not have retry logic
retry = AsyncRetrying(reraise=True, stop=stop_after_attempt(N_RETRIES))
try:
self.runtime.logger.info("[%s] Running mirror command: %s", arch, mirror_cmd)
await retry(cmd_assert_async, mirror_cmd)
self.runtime.logger.info("[%s] Mirror command ran successfully", arch)
if self.runtime.dry_run:
self.runtime.logger.info("[DRY RUN] [%s] Would have run %s", arch, " ".join(aws_cmd))
self.runtime.logger.info("[DRY RUN] [%s] Would have messaged Slack", arch)
else:
await asyncio.sleep(5)
self.runtime.logger.info("[%s] Running aws command: %s", arch, aws_cmd)
await retry(cmd_assert_async, aws_cmd)
self.runtime.logger.info("[%s] AWS command ran successfully", arch)
await asyncio.sleep(5)

await self.slack_client.say_in_thread(f":white_check_mark: Successfully synced {self.version}-{arch}")

except ChildProcessError as e:
self.runtime.logger.error("[%s] Failed to sync: %s", arch, e)
if self.runtime.dry_run:
self.runtime.logger.info("[DRY RUN] [%s] Would have messaged Slack", arch)
else:
await self.slack_client.say_in_thread(f":warning: Failed to sync {self.version}-{arch}: {e}")
return False

if os.path.exists(f"{self.workdir}/{path}"):
self.runtime.logger.info("[%s] Cleaning dir: %s", arch, f"{self.workdir}/{path}")
shutil.rmtree(f"{self.workdir}/{path}")

return True

async def run(self) -> None:
mkdirs(self.workdir)

if not self.runtime.dry_run:
slack_response = await self.slack_client.say_in_thread(f":construction: Syncing arches {', '.join(self.arches)} of {self.version} to AWS S3 Bucket :construction:")
slack_channel_id = slack_response["channel"]
main_message_ts = slack_response["message"]["ts"]
else:
sleep(5)
self.runtime.logger.info("Running aws command: %s", aws_cmd)
cmd_assert(aws_cmd, retries=N_RETRIES)
self.runtime.logger.info("AWS command ran successfully")
sleep(5)
self.runtime.logger.info("[DRY RUN] Would have messaged Slack")

if os.path.exists(path):
self.runtime.logger.info("Cleaning dir: %s", path)
shutil.rmtree(path)
tasks = [self.sync_arch(arch) for arch in self.arches]
results = await asyncio.gather(*tasks)

def run(self):
mkdirs(self.workdir)

for arch in self.arches:
self.runtime.logger.info("Now syncing arch %s", arch)
self.sync_arch(arch)
# Report the results to Slack
if not self.runtime.dry_run:
if all(results):
await self.slack_client._client.reactions_add(channel=slack_channel_id, timestamp=main_message_ts, name="done_it_is")
else:
await self.slack_client.say_in_thread(":x: Failed to sync some arches", broadcast=True)
else:
self.runtime.logger.info("[DRY RUN] Would have messaged Slack")


@cli.command("quay-doomsday-backup", help="Run doomsday pipeline for the specified version and all arches unless --arches is specified")
@click.option("--arches", required=False, help="Comma separated list of arches to sync")
@click.option("--version", required=True, help="Release to sync, e.g. 4.15.3")
@pass_runtime
def quay_doomsday_backup(runtime: Runtime, arches: str, version: str):
@click_coroutine
async def quay_doomsday_backup(runtime: Runtime, arches: str, version: str):

# In 4.12 we sync only x86_64 and s390x
if version.startswith("4.12"):
Expand All @@ -81,4 +117,4 @@ def quay_doomsday_backup(runtime: Runtime, arches: str, version: str):
doomsday_pipeline = QuayDoomsdaySync(runtime=runtime,
arches=arches,
version=version)
doomsday_pipeline.run()
await doomsday_pipeline.run()
8 changes: 4 additions & 4 deletions pyartcd/pyartcd/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ def bind_channel(self, channel_or_release: Optional[str]):
else:
raise ValueError(f"Invalid channel_or_release value: {channel_or_release}")

async def say_in_thread(self, message: str, reaction: Optional[str] = None):
async def say_in_thread(self, message: str, reaction: Optional[str] = None, broadcast: bool = False):
if not self._thread_ts:
response_data = await self.say(message, thread_ts=None, reaction=reaction)
self._thread_ts = response_data["ts"]
return response_data
else:
return await self.say(message, thread_ts=self._thread_ts, reaction=reaction)
return await self.say(message, thread_ts=self._thread_ts, reaction=reaction, broadcast=broadcast)

async def say(self, message: str, thread_ts: Optional[str] = None, reaction: Optional[str] = None):
async def say(self, message: str, thread_ts: Optional[str] = None, reaction: Optional[str] = None, broadcast: bool = False):
attachments = []
if self.build_url:
attachments.append({
Expand All @@ -62,7 +62,7 @@ async def say(self, message: str, thread_ts: Optional[str] = None, reaction: Opt
return {"message": {"ts": "fake"}, "ts": "fake"}
response = await self._client.chat_postMessage(channel=self.channel, text=message, thread_ts=thread_ts,
username=self.as_user, link_names=True, attachments=attachments,
icon_emoji=self.icon_emoji, reply_broadcast=False)
icon_emoji=self.icon_emoji, reply_broadcast=broadcast)
# https://api.slack.com/methods/reactions.add
if reaction:
await self._client.reactions_add(
Expand Down
Loading