Skip to content

Commit

Permalink
ADCM-6210 Test and Job fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Sealwing committed Dec 20, 2024
1 parent b115859 commit fee608f
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 40 deletions.
7 changes: 3 additions & 4 deletions adcm_aio_client/core/actions/_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ async def run(self: Self) -> Job:
from adcm_aio_client.core.objects.cm import Job

# todo build data for config and mapping
data = {"isVerbose": self._verbose, "isBlocking": self._blocking}
data = {"isVerbose": self._verbose, "shouldBlockObject": self._blocking}
response = await self._requester.post(*self.get_own_path(), "run", data=data)
job = Job(requester=self._requester, data=response.as_dict())
return job
return Job(requester=self._requester, data=response.as_dict())

@async_cached_property
async def _mapping_rule(self: Self) -> list[dict] | None:
Expand Down Expand Up @@ -84,7 +83,7 @@ async def _rich_data(self: Self) -> dict:
return (await self._requester.get(*self.get_own_path())).as_dict()


class ActionsAccessor(NonPaginatedChildAccessor):
class ActionsAccessor[Parent: InteractiveObject](NonPaginatedChildAccessor[Parent, Action]):
class_type = Action
filtering = Filtering(FilterByName, FilterByDisplayName)

Expand Down
8 changes: 6 additions & 2 deletions adcm_aio_client/core/objects/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ def _repr(self: Self) -> str:


class RootInteractiveObject(InteractiveObject):
PATH_PREFIX: str

def get_own_path(self: Self) -> Endpoint:
# change here
return self._build_own_path(self.id)
Expand All @@ -106,6 +104,12 @@ def __init__(self: Self, parent: Parent, data: dict[str, Any]) -> None:
def get_own_path(self: Self) -> Endpoint:
return *self._parent.get_own_path(), self.PATH_PREFIX, self.id

@classmethod
async def with_id(cls: type[Self], parent: Parent, object_id: int) -> Self:
object_path = (*parent.get_own_path(), cls.PATH_PREFIX, str(object_id))
response = await parent.requester.get(*object_path)
return cls(parent=parent, data=response.as_dict())


class MaintenanceMode:
def __init__(
Expand Down
1 change: 0 additions & 1 deletion adcm_aio_client/core/objects/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ def actions(self: Self) -> ActionsAccessor:
return ActionsAccessor(parent=self, path=(*self.get_own_path(), "actions"), requester=self._requester)


# todo whole section lacking implementation (and maybe code move is required)
class WithConfig(ConfigOwner):
@cached_property
async def config(self: Self) -> ObjectConfig:
Expand Down
112 changes: 80 additions & 32 deletions adcm_aio_client/core/objects/cm.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,63 +460,108 @@ async def _get_hosts(
return tuple(hosts)


def default_exit_condition(job: "Job") -> bool:
return job.get_status() in DEFAULT_JOB_TERMINAL_STATUSES
async def default_exit_condition(job: "Job") -> bool:
return await job.get_status() in DEFAULT_JOB_TERMINAL_STATUSES


class Job[Object: "InteractiveObject"](WithStatus, WithActions, RootInteractiveObject):
class Job(WithStatus, RootInteractiveObject):
PATH_PREFIX = "tasks"

@property
def name(self: Self) -> str:
return str(self._data["name"])

@property
@cached_property
def start_time(self: Self) -> datetime | None:
# todo test it that it should be datetime, not str
return self._data["startTime"]
time = self._data["startTime"]
if time is None:
return time

@property
def finish_time(self: Self) -> datetime | None:
return self._data["endTime"]
return datetime.fromisoformat(time)

@property
def object(self: Self) -> Object:
obj_data = self._data["objects"][0]
obj_type = obj_data["type"]
@cached_property
def finish_time(self: Self) -> datetime | None:
time = self._data["endTime"]
if time is None:
return time

obj_dict = {
"host": Host,
"component": Component,
"provider": HostProvider,
"cluster": Cluster,
"service": Service,
"adcm": ADCM,
}
return datetime.fromisoformat(time)

return self._construct(what=obj_dict[obj_type], from_data=obj_data)
@async_cached_property
async def object(self: Self) -> InteractiveObject:
objects_raw = self._parse_objects()
return await self._retrieve_target(objects_raw)

@property
def action(self: Self) -> Action:
return self._construct(what=Action, from_data=self._data["action"])
@async_cached_property
async def action(self: Self) -> Action:
target = await self.object
return Action(parent=target, data=self._data["action"])

async def wait(
self: Self,
timeout: int | None = None,
poll_interval: int = 10,
exit_condition: Callable[[Self], bool] = default_exit_condition,
exit_condition: Callable[[Self], Awaitable[bool]] = default_exit_condition,
) -> Self:
timeout_condition = datetime.max if timeout is None else (datetime.now() + timedelta(seconds=timeout)) # noqa: DTZ005

while datetime.now() < timeout_condition: # noqa: DTZ005
if exit_condition(self):
if await exit_condition(self):
return self

await asyncio.sleep(poll_interval)

raise TimeoutError
message = "Failed to meet exit condition for job"
if timeout:
message = f"{message} in {timeout} seconds with {poll_interval} second interval"

raise TimeoutError(message)

async def terminate(self: Self) -> None:
await self._requester.post(*self.get_own_path(), "terminate", data={})

def _parse_objects(self: Self) -> dict[str, int]:
return {entry["type"]: entry["id"] for entry in self._data["objects"]}

async def _retrieve_target(self: Self, objects: dict[str, int]) -> InteractiveObject:
match objects:
case {"action_host_group": id_}:
objects.pop("action_host_group")
owner = await self._retrieve_target(objects)
return await ActionHostGroup.with_id(parent=owner, object_id=id_)

case {"host": id_}:
return await Host.with_id(requester=self._requester, object_id=id_)

case {"component": id_}:
objects.pop("component")

owner = await self._retrieve_target(objects)
if not isinstance(owner, Service):
message = f"Incorrect owner for component detected from job data: {owner}"
raise TypeError(message)

return await Component.with_id(parent=owner, object_id=id_)

case {"service": id_}:
objects.pop("service")

owner = await self._retrieve_target(objects)
if not isinstance(owner, Cluster):
message = f"Incorrect owner for service detected from job data: {owner}"
raise TypeError(message)

return await Service.with_id(parent=owner, object_id=id_)

case {"cluster": id_}:
return await Cluster.with_id(requester=self._requester, object_id=id_)

case {"provider": id_}:
return await HostProvider.with_id(requester=self._requester, object_id=id_)
case _:
message = f"Failed to detect Job's owner based on {objects}"
raise RuntimeError(message)


class JobsNode(PaginatedAccessor[Job]):
class_type = Job
Expand All @@ -532,23 +577,26 @@ class JobsNode(PaginatedAccessor[Job]):

# override accessor methods to allow passing object

async def get(self: Self, *, object: InteractiveObject | None = None, **filters: FilterValue) -> Job:
async def get(self: Self, *, object: InteractiveObject | None = None, **filters: FilterValue) -> Job: # noqa: A002
object_filter = self._prepare_filter_by_object(object)
all_filters = filters | object_filter
return await super().get(**all_filters)

async def get_or_none(self: Self, *, object: InteractiveObject | None = None, **filters: FilterValue) -> Job | None:
async def get_or_none(self: Self, *, object: InteractiveObject | None = None, **filters: FilterValue) -> Job | None: # noqa: A002
object_filter = self._prepare_filter_by_object(object)
all_filters = filters | object_filter
return await super().get(**all_filters)

async def filter(self: Self, *, object: InteractiveObject | None = None, **filters: FilterValue) -> list[Job]:
async def filter(self: Self, *, object: InteractiveObject | None = None, **filters: FilterValue) -> list[Job]: # noqa: A002
object_filter = self._prepare_filter_by_object(object)
all_filters = filters | object_filter
return await super().filter(**all_filters)

async def iter(
self: Self, *, object: InteractiveObject | None = None, **filters: FilterValue
self: Self,
*,
object: InteractiveObject | None = None, # noqa: A002
**filters: FilterValue,
) -> AsyncGenerator[Job, None]:
object_filter = self._prepare_filter_by_object(object)
all_filters = filters | object_filter
Expand Down
16 changes: 16 additions & 0 deletions tests/integration/bundles/complex_cluster/actions.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
- name: letsgo
hosts: localhost
connection: local
gather_facts: no

tasks:
- name: Success
debug:
msg: "successful step"
tags: [ok]

- name: Fail
fail:
msg: "failed step"
tags: [fail]

2 changes: 1 addition & 1 deletion tests/integration/setup_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from testcontainers.postgres import DbContainer, PostgresContainer

postgres_image_name = "postgres:latest"
adcm_image_name = "hub.adsw.io/adcm/adcm:feature_ADCM-6181"
adcm_image_name = "hub.adsw.io/adcm/adcm:develop"
adcm_container_name = "test_adcm"
postgres_name = "test_pg_db"

Expand Down
126 changes: 126 additions & 0 deletions tests/integration/test_jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from datetime import datetime
from itertools import chain
import asyncio

import pytest
import pytest_asyncio

from adcm_aio_client.core.client import ADCMClient
from adcm_aio_client.core.filters import Filter, FilterValue
from adcm_aio_client.core.objects._common import WithActions
from adcm_aio_client.core.objects.cm import Bundle, Cluster, Component, Job

pytestmark = [pytest.mark.asyncio]


async def run_non_blocking(target: WithActions, **filters: FilterValue) -> Job:
action = await target.actions.get(**filters)
action.blocking = False
return await action.run()


@pytest_asyncio.fixture()
async def prepare_environment(
adcm_client: ADCMClient,
complex_cluster_bundle: Bundle,
simple_hostprovider_bundle: Bundle,
) -> list[Job]:
cluster_bundle = complex_cluster_bundle
hostprovider_bundle = simple_hostprovider_bundle

clusters: list[Cluster] = await asyncio.gather(
*(adcm_client.clusters.create(cluster_bundle, f"wow-{i}") for i in range(5))
)
hostproviders = await asyncio.gather(
*(adcm_client.hostproviders.create(hostprovider_bundle, f"yay-{i}") for i in range(5))
)
await asyncio.gather(
*(adcm_client.hosts.create(hp, f"host-{hp.name}-{i}") for i in range(5) for hp in hostproviders)
)
hosts = await adcm_client.hosts.all()

services = tuple(
chain.from_iterable(
await asyncio.gather(
*(cluster.services.add(Filter(attr="name", op="eq", value="with_actions")) for cluster in clusters)
)
)
)
components = tuple(chain.from_iterable(await asyncio.gather(*(service.components.all() for service in services))))

host_groups = await asyncio.gather(
*(
object_.action_host_groups.create(name=f"ahg for {object_.__class__.__name__}")
for object_ in chain(clusters, services, components)
)
)

object_jobs = asyncio.gather(
*(
run_non_blocking(object_, name__eq="success")
for object_ in chain(clusters, services, components, hosts, hostproviders)
)
)

group_jobs = asyncio.gather(*(run_non_blocking(group, name__in=["fail"]) for group in host_groups))

return await object_jobs + await group_jobs


@pytest.mark.usefixtures("prepare_environment")
async def test_jobs_api(adcm_client: ADCMClient) -> None:
await _test_basic_api(adcm_client)
await _test_job_object(adcm_client)
await _test_collection_fitlering(adcm_client)


async def is_running(job: Job) -> bool:
return await job.get_status() == "running"


async def _test_basic_api(adcm_client: ADCMClient) -> None:
# fields: id, name, display_name, start_time, finish_time
# properties: object, action
# methods: get_status, wait, terminate
# refresh (with wait)
cluster, *_ = await adcm_client.clusters.list(query={"limit": 1, "offset": 3})
service = await cluster.services.get(name__contains="action")
component = await service.components.get(display_name__icontains="wESo")

action = await component.actions.get(display_name__ieq="Lots of me")
job = await action.run()
# depending on retrieval time it's "one of"
assert await job.get_status() in ("created", "running")
assert job.start_time is None
assert job.finish_time is None
assert (await job.action).id == action.id

await job.wait(exit_condition=is_running, timeout=5, poll_interval=1)
assert job.start_time is None
await job.refresh()
assert isinstance(job.start_time, datetime)
assert job.finish_time is None

target = await job.object
assert isinstance(target, Component)
assert target.id == component.id
assert target.service.id == component.service.id

await job.wait(timeout=10, poll_interval=1)

assert await job.get_status() == "success"
assert job.finish_time is None
await job.refresh()
assert isinstance(job.finish_time, datetime)


async def _test_job_object(adcm_client: ADCMClient) -> None:
# filter by object
# detect object from Job
...


async def _test_collection_fitlering(adcm_client: ADCMClient) -> None:
# filters: status, name, display_name, action
# special filter: object
...

0 comments on commit fee608f

Please sign in to comment.