Skip to content

Commit

Permalink
project: enable pylint
Browse files Browse the repository at this point in the history
added a comprehensive configuration for pylint. made the following changes to fix pylint warnings:

- class names converted to CamelCase
- replaced filter() calls with list comprehensions
- renamed variable names that conflict with builtins
- defined variables for some magic constants

added pylint job to CI.

also made the following refactorings:

- the project now uses dataclasses. added dataclasses dependency for py3.6
- extras/py36-test-dependencies.py now lists regular dependencies as well.
- renamed extras/py36-test-dependencies.py to extras/py36-all-dependencies.py
- Added FolderRetentionManagerSettings dataclass for storing FolderRetentionManager settings.
- Added load_policies_from_settings() method to FolderRetentionManager, which automatically creates policies based on setting values.
- Added debug logs to FolderRetentionManager.execute_policies
- Replaced all f-strings with "%" formatting in log calls for lazy evaluation
- Added HotkdumpParameters dataclass.
- Hotkdump class is now constructed with HotkdumpParameters.
- Added encoding parameter to write open() calls
- Moved the Hotkdump workflow from main to class member run() function
. Moved main() method logic from hotkdump_impl.py to main.py
- Renamed hotkdump_impl.py to hotkdump.py
- Moved switch_cwd method from Hotkdump to utils

Fixes #62

Signed-off-by: Mustafa Kemal Gilor <mustafa.gilor@canonical.com>
  • Loading branch information
xmkg committed Jun 3, 2024
1 parent 34fcac9 commit 6b56d64
Show file tree
Hide file tree
Showing 14 changed files with 587 additions and 369 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,15 @@ jobs:
$SUDO_CMD apt -y install ${{ matrix.vars.aptpkg }}
pip3 install tox ${{ matrix.vars.pipargs }}
shell: bash
- name: Run unit tests ()
- name: Run unit tests
run: |
echo "Environment" ${{ matrix.toxenv }}
tox -e "${{ matrix.toxenv }}"
- name: Run pylint
if: matrix.toxenv== 'py312'
run: |
tox -e pylint
Test-Snap:
runs-on: ubuntu-20.04
steps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,10 @@
with open("pyproject.toml", "r") as f:
data = toml.load(f)

# Dependencies
for line in data.get("project").get("dependencies"):
print(line)

# Test dependencies
for line in data.get("project").get("optional-dependencies").get("testing"):
print(line)
print(line)
2 changes: 1 addition & 1 deletion hotkdump/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python3

# Copyright 2023 Canonical Limited.
# SPDX-License-Identifier: GPL-3.0
# SPDX-License-Identifier: GPL-3.0
2 changes: 1 addition & 1 deletion hotkdump/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python3

# Copyright 2023 Canonical Limited.
# SPDX-License-Identifier: GPL-3.0
# SPDX-License-Identifier: GPL-3.0
5 changes: 2 additions & 3 deletions hotkdump/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
# Copyright 2023 Canonical Limited.
# SPDX-License-Identifier: GPL-3.0

"""
`hotkdump` exception types.
"""`hotkdump` exception types.
"""

import logging
Expand All @@ -18,4 +17,4 @@ def __init__(self, msg) -> None:

class NotAKernelCrashDumpException(ExceptionWithLog):
"""Exception thrown when a file given to kdump_file_header
is not recognized as a crash dump file."""
is not recognized as a crash dump file."""
135 changes: 101 additions & 34 deletions hotkdump/core/folder_retention_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,41 @@
# Copyright 2023 Canonical Limited.
# SPDX-License-Identifier: GPL-3.0

"""
Folder retention manager and its' policies.
"""Folder retention manager and its' policies.
"""

import abc
import os
import logging
import time
from dataclasses import dataclass

from hotkdump.core.utils import pretty_size
from hotkdump.core.exceptions import ExceptionWithLog


class retention_policy_base(abc.ABC):
class RetentionPolicyBase(abc.ABC):
"""Base class for all retention policies.
"""

@abc.abstractproperty
@property
@abc.abstractmethod
def name(self):
""""""
"""Name of the policy."""

@abc.abstractclassmethod
def execute(self, file_infos: list):
""""""
@abc.abstractmethod
def execute(self, file_infos: list):
"""Implementation of the policy."""

def remove_file(self, file_info):
"""Remove a file."""
(path, stat) = file_info
os.remove(path)
logging.debug(
f"removed {path} to reclaim {pretty_size(stat.st_size)}. age: {(time.time() - stat.st_atime):.2f} seconds. reason: {self.name}")
logging.debug("removed %s to reclaim %s. age: %2f seconds. reason: %s ",
path, pretty_size(stat.st_size), (time.time() - stat.st_atime), self.name)


class rpolicy_total_file_count(retention_policy_base):
class RPTotalFileCount(RetentionPolicyBase):
"""File retention policy based on maximum file count
Args:
Expand All @@ -46,18 +51,21 @@ def __init__(self, max_file_count) -> None:
def name(self):
return "max file count policy"

def execute(self, file_infos: list) -> list:
def execute(self, file_infos: list) -> list:
if len(file_infos) > self.max_file_count:
to_remove = file_infos[self.max_file_count:]
to_remove = file_infos[self.max_file_count :]

for info in to_remove:
self.remove_file(info)

return list(filter(lambda i: i not in to_remove, file_infos))
return [x for x in file_infos if x not in to_remove]
return file_infos

def __str__(self):
return f"{self.name}: {self.max_file_count}"


class rpolicy_total_file_size(retention_policy_base):
class RPTotalFileSize(RetentionPolicyBase):
"""File retention policy based on total file size
Args:
Expand All @@ -72,27 +80,34 @@ def __init__(self, low_watermark_bytes, high_watermark_bytes) -> None:
def name(self):
return "total file size policy"

def execute(self, file_infos: list) -> list:
def execute(self, file_infos: list) -> list:

def total_size(): return sum(
[finfo[1].st_size for finfo in file_infos])
def total_size():
return sum(finfo[1].st_size for finfo in file_infos)

if total_size() >= self.high_watermark_bytes:
logging.debug(
f"total ddeb size of {pretty_size(total_size())} exceeds the high watermark {pretty_size(self.high_watermark_bytes)}, starting cleanup.")
"total ddeb size of %s exceeds the high watermark %s, starting cleanup.",
pretty_size(total_size()), pretty_size(self.high_watermark_bytes)
)
# Remove files until total size is below the low watermark
while len(file_infos) > 0:
if total_size() < self.low_wm_bytes:
logging.debug(
f"total ddeb folder size is now below {pretty_size(self.low_wm_bytes)} low watermark, stopping cleanup.")
"total ddeb folder size is now below %s low watermark, stopping cleanup.",
pretty_size(self.low_wm_bytes)
)
break
ddeb_info = file_infos.pop()
self.remove_file(ddeb_info)

return file_infos

def __str__(self):
return f"{self.name}: {self.low_wm_bytes}:{self.high_watermark_bytes}"

class rpolicy_age(retention_policy_base):

class RPAge(RetentionPolicyBase):
"""File retention policy based on file age
Args:
Expand All @@ -106,16 +121,20 @@ def __init__(self, max_age_secs) -> None:
def name(self):
return "age policy"

def execute(self, file_infos: list) -> list:
to_remove = [v for v in file_infos if (
time.time() - v[1].st_atime) >= self.max_age_secs]
def execute(self, file_infos: list) -> list:
to_remove = [
v for v in file_infos if (time.time() - v[1].st_atime) >= self.max_age_secs
]
for file_info in to_remove:
self.remove_file(file_info)

return list(filter(lambda i: i not in to_remove, file_infos))
return [x for x in file_infos if x not in to_remove]

def __str__(self):
return f"{self.name}: {self.max_age_secs}"


class rpolicy_no_criteria(retention_policy_base):
class RPNoCriteria(RetentionPolicyBase):
"""File retention policy without a criteria.
Removes all supplied files.
Expand All @@ -127,38 +146,86 @@ class rpolicy_no_criteria(retention_policy_base):
def name(self):
return "no criteria policy"

def execute(self, file_infos: list) -> list:
def execute(self, file_infos: list) -> list:
for file_info in file_infos:
self.remove_file(file_info)
return []


class folder_retention_manager(object):
@dataclass
class FolderRetentionManagerSettings():
"""Settings for folder retention manager."""
enabled: bool
size_hwm: int
size_lwm: int
max_age_secs: int
max_count: int

@property
def _size_enabled(self):
return self.size_hwm and self.size_lwm

def validate_sanity(self):
"""Check whether option values are not contradicting and sane."""
if self._size_enabled:
if self.size_hwm < self.size_lwm:
raise ExceptionWithLog(
"ddeb high watermark cannot be less than low watermark!")


class FolderRetentionManager():
"""Policy-based folder retention manager.
"""

def __init__(self, folder_paths, filter_function) -> None:
self.folder_paths = folder_paths
self.filter_function = filter_function
self.policies = list()
self.policies = []
self.files = self._gather_files()

def load_policies_from_settings(self, settings: FolderRetentionManagerSettings):
"""Load policies depending on setting values."""

if not settings.enabled:
self.policies = []
self.add_policy(RPNoCriteria())
return

if settings.max_count:
self.add_policy(RPTotalFileCount(settings.max_count))
if settings.max_age_secs:
self.add_policy(RPAge(settings.max_age_secs))
# if not self.ddeb_retention_enabled:
if settings.size_hwm and settings.size_lwm:
self.add_policy(RPTotalFileSize(settings.size_lwm, settings.size_hwm))

def _gather_files(self):
files = []
for root in self.folder_paths:
files += [(f"{root}/{file}", os.stat(f"{root}/{file}"))
for file in os.listdir(root) if self.filter_function(file)]
files += [
(f"{root}/{file}", os.stat(f"{root}/{file}"))
for file in os.listdir(root)
if self.filter_function(file)
]
# Sort ddebs by their last access time
files.sort(key=lambda f: f[1].st_atime, reverse=True)

logging.debug("gather_files: %s", str(files))
return files

def add_policy(self, policy):
"""Add a policy to influence folder retention."""
self.policies.append(policy)

def execute_policies(self):
"""Execute all available retention policies over the folder paths."""
files = self.files

for policy in self.policies:
logging.debug("executing policy `%s`", policy)
files = policy.execute(files)
logging.debug(
f"postrun-aftermath: ddeb folder final size {pretty_size(sum([finfo[1].st_size for finfo in files]))}, {len(files)} cached ddebs remain.")
"postrun-aftermath: ddeb folder final size %s, %d cached ddebs remain.",
pretty_size(sum(finfo[1].st_size for finfo in files)), len(files)
)

return list(filter(lambda i: i not in files, self.files))
return [x for x in self.files if x not in files]
Loading

0 comments on commit 6b56d64

Please sign in to comment.