Skip to content

Commit

Permalink
Date filter, logging and types (#26)
Browse files Browse the repository at this point in the history
* added logging, date filter and types

Signed-off-by: Shashank Reddy Boyapally <sboyapal@redhat.com>

* added types for constructor

Signed-off-by: Shashank Reddy Boyapally <sboyapal@redhat.com>

* add pull request to workflows, add new unit test

Signed-off-by: Shashank Reddy Boyapally <sboyapal@redhat.com>

* pylint error

Signed-off-by: Shashank Reddy Boyapally <sboyapal@redhat.com>

* bumped version

Signed-off-by: Shashank Reddy Boyapally <sboyapal@redhat.com>

* updated utility for functional testing

Signed-off-by: Shashank Reddy Boyapally <sboyapal@redhat.com>

---------

Signed-off-by: Shashank Reddy Boyapally <sboyapal@redhat.com>
  • Loading branch information
shashank-boyapally authored Jul 10, 2024
1 parent f715ad5 commit 69145e4
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 48 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/unit_test.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
name: Run pytest on Pull Request

on: [push]
on:
push:
pull_request:

jobs:
test:
Expand Down
49 changes: 49 additions & 0 deletions fmatch/logrus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""
Logger as a common package
"""

import logging
import sys


class SingletonLogger:
"""Singleton logger to set logging at one single place
Returns:
_type_: _description_
"""

instance = {}

def __new__(cls, debug: int, name: str):
if (not cls.instance) or name not in cls.instance:
cls.instance[name] = cls._initialize_logger(debug,name)
return cls.instance[name]

@staticmethod
def _initialize_logger(debug: int, name: str) -> logging.Logger:
level = debug # if debug else logging.INFO
logger = logging.getLogger(name)
logger.propagate=False
if not logger.hasHandlers():
logger.setLevel(level)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(level)
formatter = logging.Formatter(
"%(asctime)s - %(name)-10s - %(levelname)s - file: %(filename)s - line: %(lineno)d - %(message)s" # pylint: disable = line-too-long
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger

@classmethod
def getLogger(cls, name:str) -> logging.Logger:
"""Return logger in instance
Args:
name (str): name of the logger
Returns:
logging.Logger: logger
"""
return cls.instance.get(name, None)
106 changes: 65 additions & 41 deletions fmatch/matcher.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
""" metadata matcher
"""

# pylint: disable = invalid-name, invalid-unary-operand-type
# pylint: disable = invalid-name, invalid-unary-operand-type, no-member
import os
import sys
import logging
from datetime import datetime
from typing import List, Dict, Any

# pylint: disable=import-error
from elasticsearch import Elasticsearch
Expand All @@ -13,35 +14,28 @@
# pylint: disable=import-error
import pandas as pd
from elasticsearch_dsl import Search, Q
from elasticsearch_dsl.response import Response
from fmatch.logrus import SingletonLogger


class Matcher:
"""Matcher"""

def __init__(
self, index="ospst-perf-scale-ci",
level=logging.INFO,
ES_URL=os.getenv("ES_SERVER"),
verify_certs=True
self,
index: str ="ospst-perf-scale-ci",
level: int =logging.INFO,
ES_URL: str =os.getenv("ES_SERVER"),
verify_certs: bool =True,
):
self.index = index
self.es_url = ES_URL
self.search_size = 10000
self.logger = logging.getLogger("Matcher")
self.logger.setLevel(level)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(level)
formatter = logging.Formatter(
"%(asctime)s [%(name)s:%(filename)s:%(lineno)d] %(levelname)s: %(message)s"
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
# We can set the ES logging higher if we want additional debugging
logging.getLogger("elasticsearch").setLevel(logging.WARN)
self.logger = SingletonLogger(debug=level, name="Matcher")
self.es = Elasticsearch([self.es_url], timeout=30, verify_certs=verify_certs)
self.data = None

def get_metadata_by_uuid(self, uuid, index=None):
def get_metadata_by_uuid(self, uuid: str, index: str = None) -> dict:
"""Returns back metadata when uuid is given
Args:
Expand All @@ -62,7 +56,7 @@ def get_metadata_by_uuid(self, uuid, index=None):
result = dict(hits[0].to_dict()["_source"])
return result

def query_index(self, index, search):
def query_index(self, index: str, search: Search) -> Response:
"""generic query function
Args:
Expand All @@ -73,33 +67,50 @@ def query_index(self, index, search):
self.logger.debug("Executing query \r\n%s", search.to_dict())
return search.execute()

def get_uuid_by_metadata(self, meta, index=None):
def get_uuid_by_metadata(
self, meta: Dict[str, Any], index: str = None, lookback_date: datetime = None
) -> List[Dict[str, str]]:
"""get_uuid_by_metadata"""
if index is None:
index = self.index
version = meta["ocpVersion"][:4]

must_clause = [
(
Q("match", **{field: str(value)})
if isinstance(value, str)
else Q("match", **{field: value})
)
for field, value in meta.items()
if field not in "ocpVersion"
]

filter_clause = [
Q("wildcard", ocpVersion=f"{version}*"),
Q("match", jobStatus="success"),
]
if isinstance(lookback_date, datetime):
lookback_date = lookback_date.strftime("%Y-%m-%dT%H:%M:%SZ")
if lookback_date:
filter_clause.append(Q("range", timestamp={"gt": lookback_date}))
query = Q(
"bool",
must=[
Q(
"match", **{field: str(value)}
) if isinstance(value, str) else Q('match', **{field: value})
for field, value in meta.items()
if field not in "ocpVersion"
],
filter=[
Q("wildcard", ocpVersion=f"{version}*"),
Q("match", jobStatus="success"),
],
must=must_clause,
filter=filter_clause,
)
s = Search(using=self.es, index=index).query(query).extra(size=self.search_size)
result = self.query_index(index, s)
hits = result.hits.hits
uuids_docs = [{ "uuid":hit.to_dict()["_source"]["uuid"],
"buildUrl":hit.to_dict()["_source"]["buildUrl"]} for hit in hits]
uuids_docs = [
{
"uuid": hit.to_dict()["_source"]["uuid"],
"buildUrl": hit.to_dict()["_source"]["buildUrl"],
}
for hit in hits
]
return uuids_docs

def match_kube_burner(self, uuids, index):
def match_kube_burner(self, uuids: List[str], index: str) -> List[Dict[str, Any]]:
"""match kube burner runs
Args:
uuids (list): list of uuids
Expand All @@ -121,7 +132,7 @@ def match_kube_burner(self, uuids, index):
runs = [item.to_dict()["_source"] for item in result.hits.hits]
return runs

def filter_runs(self, pdata, data):
def filter_runs(self, pdata: Dict[Any, Any], data: Dict[Any, Any]) -> List[str]:
"""filter out runs with different jobIterations
Args:
pdata (_type_): _description_
Expand All @@ -138,7 +149,9 @@ def filter_runs(self, pdata, data):
ids_df = ndf.loc[df["jobConfig.jobIterations"] == iterations]
return ids_df["uuid"].to_list()

def getResults(self, uuid: str, uuids: list, index_str: str, metrics: dict):
def getResults(
self, uuid: str, uuids: List[str], index_str: str, metrics: Dict[str, Any]
) -> Dict[Any, Any]:
"""
Get results of elasticsearch data query based on uuid(s) and defined metrics
Expand All @@ -156,7 +169,7 @@ def getResults(self, uuid: str, uuids: list, index_str: str, metrics: dict):
metric_queries = []
not_queries = [
~Q("match", **{not_item_key: not_item_value})
for not_item_key, not_item_value in metrics.get("not",{}).items()
for not_item_key, not_item_value in metrics.get("not", {}).items()
]
metric_queries = [
Q("match", **{metric_key: metric_value})
Expand All @@ -180,7 +193,9 @@ def getResults(self, uuid: str, uuids: list, index_str: str, metrics: dict):
runs = [item.to_dict()["_source"] for item in result.hits.hits]
return runs

def get_agg_metric_query(self, uuids, index, metrics):
def get_agg_metric_query(
self, uuids: List[str], index: str, metrics: Dict[str, Any]
):
"""burner_metric_query will query for specific metrics data.
Args:
Expand Down Expand Up @@ -222,7 +237,9 @@ def get_agg_metric_query(self, uuids, index, metrics):
data = self.parse_agg_results(result, agg_value, agg_type)
return data

def parse_agg_results(self, data: dict, agg_value, agg_type):
def parse_agg_results(
self, data: Dict[Any, Any], agg_value: str, agg_type: str
) -> List[Dict[Any, Any]]:
"""parse out CPU data from kube-burner query
Args:
data (dict): Aggregated data from Elasticsearch DSL query
Expand All @@ -249,7 +266,9 @@ def parse_agg_results(self, data: dict, agg_value, agg_type):
res.append(dat)
return res

def convert_to_df(self, data, columns=None):
def convert_to_df(
self, data: Dict[Any, Any], columns: List[str] = None
) -> pd.DataFrame:
"""convert to a dataframe
Args:
data (_type_): _description_
Expand All @@ -263,7 +282,12 @@ def convert_to_df(self, data, columns=None):
odf = pd.DataFrame(odf, columns=columns)
return odf

def save_results(self, df, csv_file_path="output.csv", columns=None):
def save_results(
self,
df: pd.DataFrame,
csv_file_path: str = "output.csv",
columns: List[str] = None,
) -> None:
"""write results to CSV
Args:
df (_type_): _description_
Expand Down
22 changes: 17 additions & 5 deletions fmatch/test_fmatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,47 @@
test_fmatch
"""

from datetime import datetime
import sys
import warnings
# pylint: disable=import-error
import pandas as pd

# pylint: disable=import-error
from matcher import Matcher

match = Matcher(index="perf_scale_ci")
res=match.get_metadata_by_uuid("b4afc724-f175-44d1-81ff-a8255fea034f",'perf_scale_ci')
warnings.filterwarnings("ignore", message="Unverified HTTPS request.*")
warnings.filterwarnings(
"ignore", category=UserWarning, message=".*Connecting to.*verify_certs=False.*"
)

match = Matcher(index="perf_scale_ci*", verify_certs=False)
res=match.get_metadata_by_uuid("b4afc724-f175-44d1-81ff-a8255fea034f",'perf_scale_ci*')

meta = {}
meta["masterNodesType"] = "m6a.xlarge"
meta["workerNodesType"] = "m6a.xlarge"
meta["platform"] = "AWS"
meta["masterNodesCount"] = 3
meta["workerNodesCount"] = 24
meta["workerNodesCount"] = 6
meta["jobStatus"] = "success"
meta["ocpVersion"] = "4.15"
meta["ocpVersion"] = "4.17"
meta["networkType"] = "OVNKubernetes"
meta["benchmark.keyword"] = "cluster-density-v2"
# meta['encrypted'] = "true"
# meta['ipsec'] = "false"
# meta['fips'] = "false"

uuids = match.get_uuid_by_metadata(meta)
print("All uuids",len(uuids))
date= datetime.strptime("2024-07-01T13:46:24Z","%Y-%m-%dT%H:%M:%SZ")
uuids2= match.get_uuid_by_metadata(meta,lookback_date=date)
print("lookback uuids",len(uuids2))
uuids2 = match.get_uuid_by_metadata(meta)
if len(uuids) == 0:
print("No UUID present for given metadata")
sys.exit()
runs = match.match_kube_burner(uuids)
runs = match.match_kube_burner(uuids,"ripsaw-kube-burner*")

ids = match.filter_runs(runs, runs)
podl_metrics = {
Expand Down
24 changes: 24 additions & 0 deletions fmatch/tests/test_matcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# pylint: disable = import-error, duplicate-code
import os
from unittest.mock import patch
import datetime

from elasticsearch_dsl import Search
from elasticsearch_dsl.response import Response
Expand Down Expand Up @@ -76,6 +77,29 @@ def test_get_uuid_by_metadata(matcher_instance):
"buildUrl":"buildUrl1"}]
assert result == expected

def test_get_uuid_by_metadata_lookback(matcher_instance):
matcher_instance.es.search = lambda *args, **kwargs: {
"hits": {
"hits": [{"_source": {"uuid": "uuid1",
"buildUrl":"buildUrl1",
"timestamp":"2024-07-10T13:46:24Z"}},
{"_source": {"uuid": "uuid2",
"buildUrl":"buildUrl1",
"timestamp":"2024-07-08T13:46:24Z"}}]
}
}
meta = {
"field1": "value1",
"ocpVersion": "4.15",
}
date= datetime.datetime.strptime("2024-07-07T13:46:24Z","%Y-%m-%dT%H:%M:%SZ")
result = matcher_instance.get_uuid_by_metadata(meta=meta, lookback_date=date)
expected= [{"uuid": "uuid1",
"buildUrl":"buildUrl1"},
{"uuid": "uuid2",
"buildUrl":"buildUrl1"}]
assert result == expected


def test_match_kube_burner(matcher_instance):
result = matcher_instance.match_kube_burner(["uuid1"],index="ospst-*")
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from setuptools import setup, find_packages


VERSION = '0.0.7'
VERSION = '0.0.8'
DESCRIPTION = 'Common package for matching runs with provided metadata'
# pylint: disable= line-too-long
LONG_DESCRIPTION = "A package that allows to match metadata and get runs and create csv files with queried metrics"
Expand Down

0 comments on commit 69145e4

Please sign in to comment.