Skip to content

Commit

Permalink
Merge branch 'main' into flexible-inventory
Browse files Browse the repository at this point in the history
  • Loading branch information
mkanoor authored Jun 29, 2023
2 parents a673beb + 67c5413 commit 4b2e365
Show file tree
Hide file tree
Showing 32 changed files with 416 additions and 125 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.13.0
current_version = 1.0.0
commit = True
tag = True
search = {current_version}
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ jobs:
strategy:
matrix:
python-version:
- "3.8"
- "3.9"
- "3.10"
- "3.11"
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/e2e-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ jobs:
strategy:
matrix:
python-version:
- "3.8"
- "3.9"
- "3.10"
- "3.11"
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/scheduled.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ jobs:
strategy:
matrix:
python-version:
- "3.8"
- "3.9"
- "3.10"
- "3.11"
Expand Down
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ repos:
hooks:
- id: pretty-format-json
language_version: python3
args: ['--autofix', '--no-sort-keys', '--indent', '4']
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@

## [Unreleased]

### Added

### Fixed

### Removed
- Remove official support for Python 3.8

## [1.0.0] - 2023-06-13

### Added
- Sending heartbeat to the server with the session stats
- Added command line option --execution-strategy
Expand All @@ -12,6 +21,11 @@
- In a collection look for playbook in playbooks directory
- Support .yaml and .yml extension for playbooks
- Retract fact for partial and complete matches
- Checking of controller url and token at startup
- rule_uuid and ruleset_uuid provided even when an action fails
- Drools intermittently misses firing of rules
- Resend events lost during websocket disconnect
- limits the number of simultaneously open connection to controller to 30

### Removed

Expand Down
2 changes: 1 addition & 1 deletion ansible_rulebook/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@

"""Top-level package for Ansible Events."""

__version__ = "__version__ = '0.13.0'"
__version__ = "__version__ = '1.0.0'"
12 changes: 10 additions & 2 deletions ansible_rulebook/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ async def run(parsed_args: argparse.ArgumentParser) -> None:
startup_args.controller_ssl_verify = parsed_args.controller_ssl_verify

validate_actions(startup_args)
set_controller_params(startup_args)

if startup_args.check_controller_connection:
await validate_controller_params(startup_args)

if parsed_args.websocket_address:
event_log = asyncio.Queue()
Expand Down Expand Up @@ -139,6 +141,7 @@ async def run(parsed_args: argparse.ArgumentParser) -> None:

logger.info("Main complete")
await event_log.put(dict(type="Exit"))
await job_template_runner.close_session()
if error_found:
raise Exception("One of the source plugins failed")

Expand Down Expand Up @@ -225,6 +228,8 @@ def validate_actions(startup_args: StartupArgs) -> None:
for ruleset in startup_args.rulesets:
for rule in ruleset.rules:
for action in rule.actions:
if action.action == "run_job_template":
startup_args.check_controller_connection = True
if (
action.action in INVENTORY_ACTIONS
and not startup_args.inventory
Expand All @@ -245,9 +250,12 @@ def validate_actions(startup_args: StartupArgs) -> None:
)


def set_controller_params(startup_args: StartupArgs) -> None:
async def validate_controller_params(startup_args: StartupArgs) -> None:
if startup_args.controller_url:
job_template_runner.host = startup_args.controller_url
job_template_runner.token = startup_args.controller_token
if startup_args.controller_ssl_verify:
job_template_runner.verify_ssl = startup_args.controller_ssl_verify

data = await job_template_runner.get_config()
logger.info("AAP Version %s", data["version"])
2 changes: 1 addition & 1 deletion ansible_rulebook/builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ async def run_job_template(
rule_run_at=rule_run_at,
)
if "error" in controller_job:
a_log["reason"] = dict(error=controller_job["error"])
a_log["message"] = controller_job["error"]
await event_log.put(a_log)

if set_facts or post_events:
Expand Down
1 change: 1 addition & 0 deletions ansible_rulebook/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ class StartupArgs:
controller_ssl_verify: str = field(default="")
project_data_file: str = field(default="")
inventory: str = field(default="")
check_controller_connection: bool = field(default=False)
129 changes: 57 additions & 72 deletions ansible_rulebook/job_template_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

class JobTemplateRunner:
JOB_TEMPLATE_SLUG = "/api/v2/job_templates"
VALID_POST_CODES = [200, 201, 202]
CONFIG_SLUG = "/api/v2/config"
JOB_COMPLETION_STATUSES = ["successful", "failed", "error", "canceled"]

def __init__(
Expand All @@ -46,27 +46,36 @@ def __init__(
self.refresh_delay = int(
os.environ.get("EDA_JOB_TEMPLATE_REFRESH_DELAY", 10)
)

async def _get_page(
self, session: aiohttp.ClientSession, href_slug: str, params: dict
) -> dict:
url = urljoin(self.host, href_slug)
async with session.get(
url, params=params, ssl=self._sslcontext
) as response:
response_text = dict(
status=response.status, body=await response.text()
self._session = None

async def close_session(self):
if self._session and not self._session.closed:
await self._session.close()

def _create_session(self):
if self._session is None:
limit = int(os.getenv("EDA_CONTROLLER_CONNECTION_LIMIT", "30"))
self._session = aiohttp.ClientSession(
connector=aiohttp.TCPConnector(limit=limit),
headers=self._auth_headers(),
raise_for_status=True,
)
if response_text["status"] != 200:
raise ControllerApiException(
"Failed to get from %s. Status: %s, Body: %s"
% (
url,
response_text["status"],
response_text.get("body", "empty"),
)
)
return response_text

async def _get_page(self, href_slug: str, params: dict) -> dict:
try:
url = urljoin(self.host, href_slug)
self._create_session()
async with self._session.get(
url, params=params, ssl=self._sslcontext
) as response:
return json.loads(await response.text())
except aiohttp.ClientError as e:
logger.error("Error connecting to controller %s", str(e))
raise ControllerApiException(str(e))

async def get_config(self) -> dict:
logger.info("Attempting to connect to Controller %s", self.host)
return await self._get_page(self.CONFIG_SLUG, {})

def _auth_headers(self) -> dict:
return dict(Authorization=f"Bearer {self.token}")
Expand All @@ -84,26 +93,20 @@ async def _get_job_template_id(self, name: str, organization: str) -> int:
slug = f"{self.JOB_TEMPLATE_SLUG}/"
params = {"name": name}

async with aiohttp.ClientSession(
headers=self._auth_headers()
) as session:
while True:
response = await self._get_page(session, slug, params)
json_body = json.loads(response["body"])
for jt in json_body["results"]:
if (
jt["name"] == name
and dpath.get(
jt, "summary_fields.organization.name", "."
)
== organization
):
return jt["id"]

if json_body.get("next", None):
params["page"] = params.get("page", 1) + 1
else:
break
while True:
json_body = await self._get_page(slug, params)
for jt in json_body["results"]:
if (
jt["name"] == name
and dpath.get(jt, "summary_fields.organization.name", ".")
== organization
):
return jt["id"]

if json_body.get("next", None):
params["page"] = params.get("page", 1) + 1
else:
break

raise JobTemplateNotFoundException(
(
Expand All @@ -123,47 +126,29 @@ async def run_job_template(
url = job["url"]
params = {}

async with aiohttp.ClientSession(
headers=self._auth_headers()
) as session:
while True:
# fetch and process job status
response = await self._get_page(session, url, params)
json_body = json.loads(response["body"])
job_status = json_body["status"]
if job_status in self.JOB_COMPLETION_STATUSES:
return json_body
while True:
# fetch and process job status
json_body = await self._get_page(url, params)
job_status = json_body["status"]
if job_status in self.JOB_COMPLETION_STATUSES:
return json_body

await asyncio.sleep(self.refresh_delay)
await asyncio.sleep(self.refresh_delay)

async def launch(
self, name: str, organization: str, job_params: dict
) -> dict:
jt_id = await self._get_job_template_id(name, organization)
url = urljoin(self.host, f"{self.JOB_TEMPLATE_SLUG}/{jt_id}/launch/")

async with aiohttp.ClientSession(
headers=self._auth_headers()
) as session:
async with session.post(
try:
async with self._session.post(
url, json=job_params, ssl=self._sslcontext
) as post_response:
response = dict(
status=post_response.status,
body=await post_response.text(),
)

if response["status"] not in self.VALID_POST_CODES:
raise ControllerApiException(
"Failed to post to %s. Status: %s, Body: %s"
% (
url,
response["status"],
response.get("body", "empty"),
)
)
json_body = json.loads(response["body"])
return json_body
return json.loads(await post_response.text())
except aiohttp.ClientError as e:
logger.error("Error connecting to controller %s", str(e))
raise ControllerApiException(str(e))


job_template_runner = JobTemplateRunner()
12 changes: 9 additions & 3 deletions ansible_rulebook/rule_set_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import asyncio
import gc
import logging
from datetime import datetime
import uuid
from pprint import PrettyPrinter, pformat
from types import MappingProxyType
from typing import Dict, List, Optional, Union, cast
Expand Down Expand Up @@ -446,11 +446,17 @@ async def _call_action(
dict(
type="Action",
action=action,
action_uuid=str(uuid.uuid4()),
activation_id=settings.identifier,
playbook_name=action_args.get("name"),
status="failed",
run_at=str(datetime.utcnow()),
reason=dict(error=str(error)),
run_at=run_at(),
rule_run_at=rule_run_at,
message=str(error),
rule=rule,
ruleset=ruleset,
rule_uuid=rule_uuid,
ruleset_uuid=ruleset_uuid,
)
)

Expand Down
5 changes: 4 additions & 1 deletion ansible_rulebook/schema/ruleset_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
},
"execution_strategy": {
"type": "string",
"enum": ["sequential", "parallel"],
"enum": [
"parallel",
"sequential"
],
"default": "sequential"
},
"sources": {
Expand Down
17 changes: 14 additions & 3 deletions ansible_rulebook/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,21 +89,32 @@ async def send_event_log_to_websocket(
event_log, websocket_address, websocket_ssl_verify
):
logger.info("websocket %s connecting", websocket_address)
event = None
async for websocket in websockets.connect(
websocket_address,
logger=logger,
ssl=_sslcontext(websocket_address, websocket_ssl_verify),
):
logger.info("websocket %s connected", websocket_address)
event = None
try:
if event:
logger.info("Resending last event...")
await websocket.send(json.dumps(event))
event = None

while True:
event = await event_log.get()
await websocket.send(json.dumps(event))

if event == dict(type="Shutdown"):
return
except websockets.ConnectionClosed:
logger.warning("websocket %s connection closed", websocket_address)

event = None
except websockets.exceptions.ConnectionClosed:
logger.warning(
"websocket %s connection closed, will retry...",
websocket_address,
)
except CancelledError:
logger.info("closing websocket due to task cancelled")
return
Expand Down
Loading

0 comments on commit 4b2e365

Please sign in to comment.