From d6a96ff2d5f336d9e6f5490a4457ae968a91c7b6 Mon Sep 17 00:00:00 2001 From: Mustafa Kemal Gilor Date: Fri, 26 Jul 2024 13:02:27 +0300 Subject: [PATCH] kdump-hdr: implement kdump flattened file format support the code now properly identifies the flattened format and parses the structure correctly. the code now uses struct.unpack() to read integers from binary file. added unit tests to cover the both file formats. Signed-off-by: Mustafa Kemal Gilor --- hotkdump/core/kdumpfile.py | 224 +++++++++++++++----- tests/test_hotkdump.py | 349 +++++++++++++++++++------------- tests/test_kdump_file_header.py | 253 +++++++++++++++++------ tests/utils.py | 61 ++++-- 4 files changed, 607 insertions(+), 280 deletions(-) diff --git a/hotkdump/core/kdumpfile.py b/hotkdump/core/kdumpfile.py index 200c1c2..666eff2 100644 --- a/hotkdump/core/kdumpfile.py +++ b/hotkdump/core/kdumpfile.py @@ -13,6 +13,7 @@ import os import logging +import struct from dataclasses import dataclass, field from hotkdump.core.exceptions import NotAKernelCrashDumpException @@ -45,18 +46,18 @@ def seek_to_first_non_nul(f): f.seek(pos) @staticmethod - def read_int32(f, off=None): - """Read a 4-byte integer from given file.""" - if off: - f.seek(off, os.SEEK_SET) - return int.from_bytes(f.read(4), byteorder="little") + def read_int(f, fmt, off=None): + """Read an integer value from binary file stream.""" - @staticmethod - def read_int64(f, off=None): - """Read a 8-byte integer from given file.""" if off: f.seek(off, os.SEEK_SET) - return int.from_bytes(f.read(8), byteorder="little") + byte_cnt = struct.calcsize(fmt) + raw_bytes = f.read(byte_cnt) + + if not raw_bytes or len(raw_bytes) != byte_cnt: + return None + + return struct.unpack(fmt, raw_bytes)[0] @staticmethod def read_str(f, ln): @@ -102,6 +103,9 @@ def __post_init__(self): self.normalized_version = self.version.split("-", maxsplit=1)[0].lstrip("#") +# TO-DO: Switch to struct.pack / struct.unpack? + + @dataclass() # pylint: disable-next=too-many-instance-attributes class DiskDumpHeader: @@ -143,10 +147,10 @@ def from_fd(fd): machine=BinaryFileReader.read_cstr(fd), domain=BinaryFileReader.read_cstr(fd), ), - timestamp_sec=BinaryFileReader.read_int64(fd, timestamp_offset), - timestamp_usec=BinaryFileReader.read_int64(fd), - status=BinaryFileReader.read_int32(fd), - block_size=BinaryFileReader.read_int32(fd), + timestamp_sec=BinaryFileReader.read_int(fd, "q"), + buf_size=BinaryFileReader.read_int(fd, ">q"), + ) + + def next(self, fd): + """Seek the given file's offset to the next makedumpfile_header""" + + fd.seek(self.self_offset + self.sizeof + self.buf_size, os.SEEK_SET) + return MakeDumpFileDataHeader.from_fd(fd) + + def in_range(self, offset): + """Check whether given offset is in range of this block or not.""" + return self.offset <= offset < (self.offset + self.buf_size) + + @property + def data_offset(self): + """Offset to the beginning of the data.""" + return self.self_offset + self.sizeof + + def __bool__(self): + """Check whether this header is valid.""" + return (self.offset is not None and self.offset >= 0) and ( + self.buf_size is not None and self.buf_size >= 0 + ) + + class KdumpFile: """Helper class for parsing headers from kernel crash dumps generated with kdump. """ - # pylint: disable=too-many-instance-attributes + def parse_flattened(self, fd): + """Parse the diskdumpfile, ksubhdr and vmcoreinfo from a + flattened makedumpfile.""" + + # The first header is guaranteed to be disk_dump_header. + # Read it first. + + mdhdr = MakeDumpFileDataHeader.from_fd(fd) + assert mdhdr.buf_size == DiskDumpHeader.sizeof + self._ddhdr = DiskDumpHeader.from_fd(fd) + + # The next header is kdump_sub_header, which is located at the + # page offset. + disk_dump_header_blocks = 1 + kdump_sub_header_off = disk_dump_header_blocks * self.ddhdr.block_size + + flat_blocks = [] + mdhdr = mdhdr.next(fd) + while mdhdr: + if mdhdr.offset == kdump_sub_header_off: + logging.debug("found makedumpfile_data block for ksubhdr: %s", mdhdr) + self._ksubhdr = KdumpSubHeader.from_fd(fd) + + flat_blocks.append(mdhdr) + + if self.ksubhdr: + for flat_block in flat_blocks: + if flat_block.in_range(self.ksubhdr.offset_vmcoreinfo): + logging.debug( + "found makedumpfile_data block for vmcore: %s", flat_block + ) + fd.seek(flat_block.data_offset) + translated_offset = ( + self.ksubhdr.offset_vmcoreinfo - flat_block.offset + ) + self._vmcoreinfo = VMCoreInfo.from_fd( + fd, + flat_block.data_offset + translated_offset, + self.ksubhdr.size_vmcoreinfo, + ) + return + mdhdr = mdhdr.next(fd) + + def parse_compressed(self, fd): + """Parse the diskdumpfile, ksubhdr and vmcoreinfo from a + kdump compressed file.""" + kdump_hdr_signature = b"KDUMP " + magic = fd.peek(len(kdump_hdr_signature))[0 : len(kdump_hdr_signature)] + if magic != kdump_hdr_signature: + raise NotAKernelCrashDumpException( + f"{self.kdump_file_path} is not a kernel crash dump file" + ) + + self._ddhdr = DiskDumpHeader.from_fd(fd) + # Skip a block + disk_dump_header_blocks = 1 + offset = disk_dump_header_blocks * self.ddhdr.block_size + fd.seek(offset, os.SEEK_SET) + + self._ksubhdr = KdumpSubHeader.from_fd(fd) + self._vmcoreinfo = VMCoreInfo.from_fd( + fd, self.ksubhdr.offset_vmcoreinfo, self.ksubhdr.size_vmcoreinfo + ) + def __init__(self, kdump_file_path) -> None: """Parse kdump file header and expose them as member variables @@ -237,28 +352,29 @@ def __init__(self, kdump_file_path) -> None: Exception: If the kdump_file_path is not recognized as a kdump file """ + self.kdump_file_path = kdump_file_path + self._ddhdr = None + self._ksubhdr = None + self._vmcoreinfo = None with open(kdump_file_path, "rb") as fd: - - # Let's be more forgiving about locating - # the KDUMP signature: - blob = fd.read(1024 * 8) - expected_magic = b"KDUMP " - offset = blob.find(expected_magic) - if offset == -1: - raise NotAKernelCrashDumpException( - f"{kdump_file_path} is not a kernel crash dump file" - ) - - # Seek to the KDUMP signature offset - fd.seek(offset, os.SEEK_SET) - self._ddhdr = DiskDumpHeader.from_fd(fd) - self._ksubhdr = KdumpSubHeader.from_fd(fd, self.ddhdr.block_size) - self._vmcoreinfo = VMCoreInfo.from_fd( - fd, self.ksubhdr.offset_vmcoreinfo, self.ksubhdr.size_vmcoreinfo - ) + # First, check if it's makedumpfile signature + makedumpfile_signature = b"makedumpfile\0\0\0\0" + signature = fd.peek(len(makedumpfile_signature))[ + 0 : len(makedumpfile_signature) + ] + # First check if it's flattened format + # https://github.com/makedumpfile/makedumpfile/blob/bad2a7c4fa75d37a41578441468584963028bdda/IMPLEMENTATION#L285 + if signature == makedumpfile_signature: + logging.debug("the file is in flattened format") + # 1 page makedumpfile_header + # 1 makedumpfile_data_header (16 bytes) + fd.seek(4096) + self.parse_flattened(fd) + else: + self.parse_compressed(fd) logging.debug("kdump_hdr: %s", str(self.ddhdr)) - logging.debug("kdump_subhdr: %s", str(self.ddhdr)) + logging.debug("kdump_subhdr: %s", str(self.ksubhdr)) logging.debug("vmcore-info: %s", str(self.vmcoreinfo)) @property diff --git a/tests/test_hotkdump.py b/tests/test_hotkdump.py index 5f78bb7..590bf3d 100644 --- a/tests/test_hotkdump.py +++ b/tests/test_hotkdump.py @@ -11,22 +11,34 @@ import textwrap -from hotkdump.core.hotkdump import ( - Hotkdump, - HotkdumpParameters, - ExceptionWithLog -) +from hotkdump.core.hotkdump import Hotkdump, HotkdumpParameters, ExceptionWithLog from tests.utils import ( assert_has_no_such_calls, - mock_file, - mock_stat_obj + mock_file_ctx, + mock_stat_obj, + fill_zeros, ) mock.Mock.assert_has_no_such_calls = assert_has_no_such_calls -MOCK_HDR = b'KDUMP \x01\x02\x03\x04sys\0node\0release\0#version-443\0machine\0domain\0\0' +MOCK_HDR = fill_zeros( + b"KDUMP " # signature + + b"\x01\x02\x03\x04" # header_version + + fill_zeros(b"sys", 65) # system + + fill_zeros(b"node", 65) # node + + fill_zeros(b"release", 65) # release + + fill_zeros(b"#version-443", 65) # version + + fill_zeros(b"machine", 65) # machine + + fill_zeros(b"domain", 65) # domain + + b"\x02" * 6 # padding + + b"\x01\x00\x00\x00\x00\x00\x00\x00" # timestamp_sec + + b"\x02\x00\x00\x00\x00\x00\x00\x00" # timestamp_usec + + b"\x03\x00\x00\x00" # status + + b"\x00\x10\x00\x00", # block_size + 4096, +) + fill_zeros(b"", 4096) @mock.patch.multiple( @@ -34,22 +46,23 @@ remove=lambda x: True, listdir=lambda x: [], stat=lambda x: "a", - makedirs=lambda *a, **kw: None -) -@mock.patch.multiple( - "os.path", - dirname=lambda x: x, - realpath=lambda x: x, - exists=lambda x: True + makedirs=lambda *a, **kw: None, ) @mock.patch.multiple( - "shutil", - which=lambda x: x + "os.path", dirname=lambda x: x, realpath=lambda x: x, exists=lambda x: True ) -@mock.patch('builtins.open', mock_file(bytes=MOCK_HDR, name="name")) +@mock.patch.multiple("shutil", which=lambda x: x) +@mock.patch("builtins.open", mock_file_ctx(bytes=MOCK_HDR, name="name")) class HotkdumpTest(TestCase): """test hotkdump class public api""" + def setUp(self): + self.patcher = mock.patch('tempfile.TemporaryDirectory') + self.mock_temp_dir = self.patcher.start() + + def tearDown(self): + self.patcher.stop() + def test_default_construct(self): """Default-construct the class and verify that the class variables are initialized @@ -67,9 +80,13 @@ def test_construct(self): """ params = HotkdumpParameters( - internal_case_number="1", dump_file_path="vmcore", - output_file_path="opf", log_file_path="log", - ddebs_folder_path="ddebs",interactive=True) + internal_case_number="1", + dump_file_path="vmcore", + output_file_path="opf", + log_file_path="log", + ddebs_folder_path="ddebs", + interactive=True, + ) uut = Hotkdump(params) self.assertEqual(uut.params.internal_case_number, "1") self.assertEqual(uut.params.dump_file_path, "vmcore") @@ -92,12 +109,12 @@ def test_arch(self): uut.kdump_file.ddhdr.utsname.machine = "invalid" self.assertRaises(NotImplementedError, uut.get_architecture) - @mock.patch('builtins.open', mock_file(bytes=MOCK_HDR, name="name")) + @mock.patch("builtins.open", mock_file_ctx(bytes=MOCK_HDR, name="name")) def test_kdump_hdr(self): """Test if the kdump_header has the correct values included - in the MOCK_HDR after opening the fake vmcore file + in the MOCK_HDR after opening the fake vmcore file """ - params= HotkdumpParameters(dump_file_path="empty") + params = HotkdumpParameters(dump_file_path="empty") uut = Hotkdump(params) self.assertEqual(uut.kdump_file.ddhdr.header_version, 67305985) self.assertEqual(uut.kdump_file.ddhdr.utsname.domain, "domain") @@ -112,12 +129,14 @@ def test_find_crash_executable_symlink_exists(self): """Verify that the hotkdump uses the crash symlink on the root folder if exists. """ - params= HotkdumpParameters(dump_file_path="empty") + params = HotkdumpParameters(dump_file_path="empty") uut = Hotkdump(params) - with mock.patch.multiple("os.path", - dirname=lambda *a, **kw: "/root/dir", - realpath=lambda *a, **kw: "rp", - exists=lambda *a, **kw: True) as _: + with mock.patch.multiple( + "os.path", + dirname=lambda *a, **kw: "/root/dir", + realpath=lambda *a, **kw: "rp", + exists=lambda *a, **kw: True, + ) as _: value = uut.find_crash_executable() self.assertEqual(value, "/root/dir/../crash") @@ -125,12 +144,14 @@ def test_find_crash_executable_nosymlink_but_path_exists(self): """Verify that the hotkdump uses the crash from PATH when the root-dir `crash` symlink does not exists. """ - params= HotkdumpParameters(dump_file_path="empty") + params = HotkdumpParameters(dump_file_path="empty") uut = Hotkdump(params) - with mock.patch.multiple("os.path", - dirname=lambda *a, **kw: "/root/dir", - realpath=lambda *a, **kw: "rp", - exists=lambda *a, **kw: False): + with mock.patch.multiple( + "os.path", + dirname=lambda *a, **kw: "/root/dir", + realpath=lambda *a, **kw: "rp", + exists=lambda *a, **kw: False, + ): with mock.patch("shutil.which", lambda *a, **kw: "/usr/mybin/crash"): value = uut.find_crash_executable() self.assertEqual(value, "/usr/mybin/crash") @@ -139,24 +160,26 @@ def test_find_crash_executable_notfound(self): """Verify that the hotkdump raises an exception when crash executable could not be found. """ - params= HotkdumpParameters(dump_file_path="empty") + params = HotkdumpParameters(dump_file_path="empty") uut = Hotkdump(params) - with mock.patch.multiple("os.path", - dirname=lambda *a, **kw: "/root/dir", - realpath=lambda *a, **kw: "rp", - exists=lambda *a, **kw: False): + with mock.patch.multiple( + "os.path", + dirname=lambda *a, **kw: "/root/dir", + realpath=lambda *a, **kw: "rp", + exists=lambda *a, **kw: False, + ): with mock.patch("shutil.which", lambda *a, **kw: None): - self.assertRaises(ExceptionWithLog, - uut.find_crash_executable) + self.assertRaises(ExceptionWithLog, uut.find_crash_executable) def test_write_crash_commands_file(self): """Verify that the hotkdump `write_crash_commands_file` writes the correct commands file. """ - params= HotkdumpParameters(dump_file_path="empty", output_file_path="hkd.test") + params = HotkdumpParameters(dump_file_path="empty", output_file_path="hkd.test") uut = Hotkdump(params) uut.temp_working_dir.name = "/tmpdir" - expected_output = textwrap.dedent(r""" + expected_output = textwrap.dedent( + r""" !echo "---------------------------------------" >> hkd.test !echo "Output of 'sys'" >> hkd.test !echo "---------------------------------------" >> hkd.test @@ -207,17 +230,18 @@ def test_write_crash_commands_file(self): ps -m | grep UN | tail -n1 | grep -oE "PID: [0-9]+" | grep -oE "[0-9]+" | awk '{print "bt " $1 " >> hkd.test"}' >> /tmpdir/crash_commands !echo "\nquit >> hkd.test" >> /tmpdir/crash_commands !echo "" >> hkd.test - """).strip() + """ + ).strip() with mock.patch("builtins.open", new_callable=mock.mock_open()) as mo: contents = None def update_contents(c): nonlocal contents contents = c + mo.return_value.__enter__.return_value.write = update_contents mo.return_value.__enter__.return_value.name = "/tmpdir/crash_commands" - self.assertEqual("/tmpdir/crash_commands", - uut.write_crash_commands_file()) + self.assertEqual("/tmpdir/crash_commands", uut.write_crash_commands_file()) mo.assert_called_with("/tmpdir/crash_commands", "w", encoding="utf-8") self.assertEqual(contents, expected_output) @@ -225,12 +249,11 @@ def test_exec(self): """Verify that the hotkdump calls the subprocess.Popen with the correct arguments. """ - params= HotkdumpParameters(dump_file_path="empty") + params = HotkdumpParameters(dump_file_path="empty") uut = Hotkdump(params) with mock.patch("subprocess.Popen", mock.MagicMock()) as p: uut.exec("a", "args", "wd") - p.assert_called_once_with( - "a args", shell=True, cwd="wd") + p.assert_called_once_with("a args", shell=True, cwd="wd") def test_switch_cwd(self): """To be implemented later""" @@ -254,7 +277,6 @@ def test_strip_tags(self): ("5.4.0-146-gke", "5.4.0-146"), ("5.4.0-146-snapdragon", "5.4.0-146"), ("5.4.0-146-raspi2", "5.4.0-146"), - # Tags with version-specific suffix ("5.4.0-146-generic-hwe-16.04", "5.4.0-146"), ("5.4.0-146-generic-hwe-18.04", "5.4.0-146"), @@ -266,7 +288,6 @@ def test_strip_tags(self): ("5.4.0-146-lowlatency-hwe-20.04", "5.4.0-146"), ("5.4.0-146-lowlatency-hwe-22.04", "5.4.0-146"), ("5.4.0-146-lowlatency-hwe-24.04", "5.4.0-146"), - # Tags with version-specific suffix and '-edge' suffix ("5.4.0-146-generic-hwe-16.04-edge", "5.4.0-146"), ("5.4.0-146-generic-hwe-18.04-edge", "5.4.0-146"), @@ -284,28 +305,31 @@ def test_strip_tags(self): test_cases_invalid = [ ("5.4.0-146-generic-hwe-20.04----edge", "5.4.0-146"), ("5.4.0-146_generic-hwe-20.04_edge", "5.4.0-146"), - ("5.4.0-146aws", "5.4.0-146aws"), ("5.4.0-146_azure", "5.4.0-146_azure"), - ("5.4.0-146-generic-hwe-21.04", "5.4.0-146-generic-hwe-21.04") + ("5.4.0-146-generic-hwe-21.04", "5.4.0-146-generic-hwe-21.04"), ] - params= HotkdumpParameters(dump_file_path="empty") + params = HotkdumpParameters(dump_file_path="empty") uut = Hotkdump(params) for input_str, expected_output in test_cases_valid: with self.subTest(input_str=input_str): - self.assertEqual(uut.strip_release_variant_tags( - input_str), expected_output) + self.assertEqual( + uut.strip_release_variant_tags(input_str), expected_output + ) for input_str, expected_output in test_cases_invalid: with self.subTest(input_str=input_str): - self.assertRaises(ExceptionWithLog, - uut.strip_release_variant_tags, input_str) + self.assertRaises( + ExceptionWithLog, uut.strip_release_variant_tags, input_str + ) @mock.patch("os.utime") @mock.patch("hotkdump.core.hotkdump.PullPkg") @mock.patch("hotkdump.core.hotkdump.switch_cwd") - def test_maybe_download_vmlinux_ddeb(self,mock_switch_cwd, mock_pullpkg, mock_utime): + def test_maybe_download_vmlinux_ddeb( + self, mock_switch_cwd, mock_pullpkg, mock_utime + ): """Verify that the hotkdump: - calls the PullPkg when the ddeb is absent - does not call the PullPkg when the ddeb is present @@ -315,22 +339,24 @@ def test_maybe_download_vmlinux_ddeb(self,mock_switch_cwd, mock_pullpkg, mock_ut mock_pull = mock.MagicMock() mock_pullpkg.return_value.pull = mock_pull - switch_cwd = mock.MagicMock() mock_switch_cwd.return_value = switch_cwd # mock_pull.return_value.pull - params= HotkdumpParameters(dump_file_path="empty") + params = HotkdumpParameters(dump_file_path="empty") uut = Hotkdump(params) uut.kdump_file.ddhdr.utsname.release = "5.15.0-1030-gcp" uut.kdump_file.ddhdr.utsname.machine = "x86_64" - uut.kdump_file.ddhdr.utsname.version = "#37-Ubuntu SMP Tue Feb 14 19:37:08 UTC 2023" + uut.kdump_file.ddhdr.utsname.version = ( + "#37-Ubuntu SMP Tue Feb 14 19:37:08 UTC 2023" + ) uut.kdump_file.ddhdr.utsname.normalized_version = "37" - # Test downloading a new ddeb file - expected_ddeb_path = "linux-image-unsigned-5.15.0-1030-gcp-dbgsym_5.15.0-1030.37_amd64.ddeb" + expected_ddeb_path = ( + "linux-image-unsigned-5.15.0-1030-gcp-dbgsym_5.15.0-1030.37_amd64.ddeb" + ) with mock.patch("os.path.exists") as mock_exists: mock_exists.side_effect = [False, True] @@ -340,8 +366,17 @@ def test_maybe_download_vmlinux_ddeb(self,mock_switch_cwd, mock_pullpkg, mock_ut # Assert that pullpkg was invoked with the correct arguments mock_pull.assert_called_once_with( - ["--distro", "ubuntu", "--arch", "amd64", "--pull", - "ddebs", "linux-image-unsigned-5.15.0-1030-gcp", "5.15.0-1030.37"]) + [ + "--distro", + "ubuntu", + "--arch", + "amd64", + "--pull", + "ddebs", + "linux-image-unsigned-5.15.0-1030-gcp", + "5.15.0-1030.37", + ] + ) # Assert that the expected ddeb file path was returned self.assertEqual(result, expected_ddeb_path) @@ -360,7 +395,8 @@ def test_maybe_download_vmlinux_ddeb(self,mock_switch_cwd, mock_pullpkg, mock_ut # # Assert that the file's last access time was updated mock_utime.assert_called_once_with( - expected_ddeb_path, (1234567890.0, 1234567890.0)) + expected_ddeb_path, (1234567890.0, 1234567890.0) + ) # Assert that the expected ddeb file path was returned self.assertEqual(result, expected_ddeb_path) @@ -377,14 +413,18 @@ def test_post_run_ddeb_count_policy(self): count policy after execution as per configured. """ - with mock.patch( - "os.remove") as mock_remove, mock.patch( - "os.listdir") as mock_listdir, mock.patch( - "os.stat") as mock_stat, mock.patch( - "time.time") as mock_time: + with mock.patch("os.remove") as mock_remove, mock.patch( + "os.listdir" + ) as mock_listdir, mock.patch("os.stat") as mock_stat, mock.patch( + "time.time" + ) as mock_time: mock_time.return_value = 1234567890.0 mock_listdir.return_value = [ - 'file1.ddeb', 'file2.ddeb', 'file3.txt', 'file4.ddeb'] + "file1.ddeb", + "file2.ddeb", + "file3.txt", + "file4.ddeb", + ] mock_stat.return_value.st_atime = 3600 mock_stat.return_value.st_size = 500000 @@ -402,16 +442,16 @@ def test_post_run_ddeb_count_policy(self): uut.post_run() # Check if the function has removed the ddebs correctly - mock_stat.assert_has_calls([ - mock.call('/path/to/ddebs/file1.ddeb'), - mock.call('/path/to/ddebs/file2.ddeb') - ]) + mock_stat.assert_has_calls( + [ + mock.call("/path/to/ddebs/file1.ddeb"), + mock.call("/path/to/ddebs/file2.ddeb"), + ] + ) - mock_listdir.assert_called_once_with('/path/to/ddebs') + mock_listdir.assert_called_once_with("/path/to/ddebs") - expected_calls = [ - mock.call('/path/to/ddebs/file4.ddeb') - ] + expected_calls = [mock.call("/path/to/ddebs/file4.ddeb")] mock_remove.assert_has_calls(expected_calls) # Now bump the limit to 3, and re-test @@ -439,23 +479,35 @@ def test_post_run_ddeb_age_policy(self): """Verify that the hotkdump executes the file age policy after execution as per configured. """ - - with mock.patch( - "os.remove") as mock_remove, mock.patch( - "os.listdir") as mock_listdir, mock.patch( - "os.stat") as mock_stat, mock.patch( - "time.time") as mock_time: + with mock.patch("os.remove") as mock_remove, mock.patch( + "os.listdir" + ) as mock_listdir, mock.patch("os.stat") as mock_stat, mock.patch( + "time.time" + ) as mock_time: mock_time.return_value = 1234567890.0 mock_listdir.return_value = [ - 'file1.ddeb', 'file2.ddeb', 'file3.txt', 'file4.ddeb'] - - mock_stat.side_effect = lambda fname: mock_stat_obj(fname, { - "/path/to/ddebs/file1.ddeb": {"atime": 1234567890.0 + 1, "size": 3150}, - "/path/to/ddebs/file2.ddeb": {"atime": 1234567890.0 + 1, "size": 3150}, - "/path/to/ddebs/file4.ddeb": {"atime": 0, "size": 3150}, - # 50000 bytes in total - }) + "file1.ddeb", + "file2.ddeb", + "file3.txt", + "file4.ddeb", + ] + + mock_stat.side_effect = lambda fname: mock_stat_obj( + fname, + { + "/path/to/ddebs/file1.ddeb": { + "atime": 1234567890.0 + 1, + "size": 3150, + }, + "/path/to/ddebs/file2.ddeb": { + "atime": 1234567890.0 + 1, + "size": 3150, + }, + "/path/to/ddebs/file4.ddeb": {"atime": 0, "size": 3150}, + # 50000 bytes in total + }, + ) mock_listdir.reset_mock() # Set up test data @@ -469,16 +521,14 @@ def test_post_run_ddeb_age_policy(self): uut = Hotkdump(params) uut.post_run() - mock_listdir.assert_called_once_with('/path/to/ddebs') + mock_listdir.assert_called_once_with("/path/to/ddebs") - expected_calls = [ - mock.call('/path/to/ddebs/file4.ddeb') - ] + expected_calls = [mock.call("/path/to/ddebs/file4.ddeb")] not_expected_calls = [ - mock.call('/path/to/ddebs/file1.ddeb'), - mock.call('/path/to/ddebs/file2.ddeb'), - mock.call('/path/to/ddebs/file3.txt') + mock.call("/path/to/ddebs/file1.ddeb"), + mock.call("/path/to/ddebs/file2.ddeb"), + mock.call("/path/to/ddebs/file3.txt"), ] mock_remove.assert_has_calls(expected_calls, any_order=True) @@ -489,22 +539,28 @@ def test_post_run_ddeb_total_size_policy(self): total size policy after execution as per configured. """ - - with mock.patch( - "os.remove") as mock_remove, mock.patch( - "os.listdir") as mock_listdir, mock.patch( - "os.stat") as mock_stat, mock.patch( - "time.time") as mock_time: + with mock.patch("os.remove") as mock_remove, mock.patch( + "os.listdir" + ) as mock_listdir, mock.patch("os.stat") as mock_stat, mock.patch( + "time.time" + ) as mock_time: mock_time.return_value = 1234567890.0 mock_listdir.return_value = [ - 'file1.ddeb', 'file2.ddeb', 'file3.txt', 'file4.ddeb'] + "file1.ddeb", + "file2.ddeb", + "file3.txt", + "file4.ddeb", + ] - mock_stat.side_effect = lambda fname: mock_stat_obj(fname, { - "/path/to/ddebs/file1.ddeb": {"atime": 0, "size": 15000}, - "/path/to/ddebs/file2.ddeb": {"atime": 1, "size": 15000}, - "/path/to/ddebs/file4.ddeb": {"atime": 2, "size": 20000}, - # 50000 bytes in total - }) + mock_stat.side_effect = lambda fname: mock_stat_obj( + fname, + { + "/path/to/ddebs/file1.ddeb": {"atime": 0, "size": 15000}, + "/path/to/ddebs/file2.ddeb": {"atime": 1, "size": 15000}, + "/path/to/ddebs/file4.ddeb": {"atime": 2, "size": 20000}, + # 50000 bytes in total + }, + ) # Set up test data params = HotkdumpParameters(dump_file_path="empty") @@ -517,7 +573,7 @@ def test_post_run_ddeb_total_size_policy(self): uut = Hotkdump(params) uut.post_run() - mock_listdir.assert_called_once_with('/path/to/ddebs') + mock_listdir.assert_called_once_with("/path/to/ddebs") # file1, file2 and file4 are in total 50000 bytes in size, which exceeds # the high watermark. the algorithm will start removing ddebs one by one, @@ -526,36 +582,22 @@ def test_post_run_ddeb_total_size_policy(self): # removed while file4 is untouched. expected_calls = [ - mock.call('/path/to/ddebs/file1.ddeb'), - mock.call('/path/to/ddebs/file2.ddeb') + mock.call("/path/to/ddebs/file1.ddeb"), + mock.call("/path/to/ddebs/file2.ddeb"), ] - not_expected_calls = [ - mock.call('/path/to/ddebs/file4.ddeb') - ] + not_expected_calls = [mock.call("/path/to/ddebs/file4.ddeb")] mock_remove.assert_has_calls(expected_calls, any_order=True) mock_remove.assert_has_no_such_calls(not_expected_calls) - def test_post_run_ddeb_retention_disabled(self): + def test_post_run_ddeb_retention_disabled( + self, + ): """Verify that the hotkdump removes the ddeb files post-run when the file retention is disabled. """ - - with mock.patch( - "os.remove") as mock_remove, mock.patch( - "os.listdir") as mock_listdir, mock.patch( - "os.stat") as mock_stat, mock.patch( - "time.time") as mock_time: - mock_time.return_value = 1234567890.0 - mock_listdir.return_value = [ - 'file1.ddeb', 'file2.ddeb', 'file3.txt', 'file4.ddeb'] - mock_stat.side_effect = lambda fname: mock_stat_obj(fname, { - "/path/to/ddebs/file1.ddeb": {"atime": 0, "size": 15000}, - "/path/to/ddebs/file2.ddeb": {"atime": 1, "size": 15000}, - "/path/to/ddebs/file4.ddeb": {"atime": 2, "size": 20000}, - # 50000 bytes in total - }) + with mock.patch("os.remove") as mock_remove: params = HotkdumpParameters(dump_file_path="empty") params.ddebs_folder_path = "/path/to/ddebs" @@ -565,11 +607,32 @@ def test_post_run_ddeb_retention_disabled(self): params.ddeb_retention_settings.size_hwm = 1 params.ddeb_retention_settings.max_count = 1 uut = Hotkdump(params) - uut.post_run() - mock_listdir.assert_called_once_with('/path/to/ddebs') + + with mock.patch("os.listdir") as mock_listdir, mock.patch( + "os.stat" + ) as mock_stat, mock.patch("time.time") as mock_time: + mock_time.return_value = 1234567890.0 + mock_listdir.return_value = [ + "file1.ddeb", + "file2.ddeb", + "file3.txt", + "file4.ddeb", + ] + mock_stat.side_effect = lambda fname, *args, **kwargs: mock_stat_obj( + fname, + { + "/path/to/ddebs/file1.ddeb": {"atime": 0, "size": 15000}, + "/path/to/ddebs/file2.ddeb": {"atime": 1, "size": 15000}, + "/path/to/ddebs/file4.ddeb": {"atime": 2, "size": 20000}, + # 50000 bytes in total + }, + ) + uut.post_run() + + mock_listdir.assert_called_once_with("/path/to/ddebs") expected_calls = [ - mock.call('/path/to/ddebs/file1.ddeb'), - mock.call('/path/to/ddebs/file2.ddeb'), - mock.call('/path/to/ddebs/file4.ddeb') + mock.call("/path/to/ddebs/file1.ddeb"), + mock.call("/path/to/ddebs/file2.ddeb"), + mock.call("/path/to/ddebs/file4.ddeb"), ] - mock_remove.assert_has_calls(expected_calls, any_order=True) + mock_remove.assert_has_calls(expected_calls, any_order=True) diff --git a/tests/test_kdump_file_header.py b/tests/test_kdump_file_header.py index bfa116c..c686e99 100644 --- a/tests/test_kdump_file_header.py +++ b/tests/test_kdump_file_header.py @@ -10,6 +10,7 @@ import os from unittest import mock, TestCase from io import BytesIO +import struct from hotkdump.core.exceptions import ExceptionWithLog @@ -19,20 +20,30 @@ DiskDumpHeader, BinaryFileReader, KdumpSubHeader, + NotAKernelCrashDumpException, ) from hotkdump.core.hotkdump import Hotkdump, HotkdumpParameters -from tests.utils import mock_file +from tests.utils import mock_file_ctx, fill_zeros + + +MOCK_HDR = fill_zeros( + b"KDUMP " # signature + + b"\x01\x02\x03\x04" # header_version + + fill_zeros(b"sys", 65) # system + + fill_zeros(b"node", 65) # node + + fill_zeros(b"release", 65) # release + + fill_zeros(b"#version-443", 65) # version + + fill_zeros(b"machine", 65) # machine + + fill_zeros(b"domain", 65) # domain + + b"\x02" * 6 # padding + + b"\x01\x00\x00\x00\x00\x00\x00\x00" # timestamp_sec + + b"\x02\x00\x00\x00\x00\x00\x00\x00" # timestamp_usec + + b"\x03\x00\x00\x00" # status + + b"\x00\x10\x00\x00", # block_size + 4096, +) + fill_zeros(b"", 4096) - -def fill_zeros(by, n): - by += b"\0" * (n - len(by)) - return by - - -MOCK_HDR = ( - b"KDUMP \x01\x02\x03\x04sys\0node\0release\0#version-443\0machine\0domain\0\0" -) MOCK_HDR_INVALID_NO_SIG = os.urandom(4096) MOCK_VMCOREINFO = b"""key=value this is a key=value value @@ -52,13 +63,21 @@ def test_seek_to_first_non_nul(self): BinaryFileReader.seek_to_first_non_nul(fake_file) self.assertEqual(fake_file.read(3), b"ABC") - def test_read_int32(self): + def test_read_int32_le(self): fake_file = BytesIO(b"\x01\x00\x00\x00") - self.assertEqual(BinaryFileReader.read_int32(fake_file), 1) + self.assertEqual(BinaryFileReader.read_int(fake_file, "i"), 1) + + def test_read_int64_be(self): + fake_file = BytesIO(b"\x00\x00\x00\x00\x00\x00\x00\x01") + self.assertEqual(BinaryFileReader.read_int(fake_file, ">q"), 1) def test_read_str(self): fake_file = BytesIO(b"Hello\x00\x00") @@ -84,7 +103,7 @@ def test_read_cstr(self): class KdumpDiskDumpHeaderTest(TestCase): """kdump header parsing tests""" - @mock.patch("builtins.open", mock_file(bytes=MOCK_HDR, name="name")) + @mock.patch("builtins.open", mock_file_ctx(bytes=MOCK_HDR, name="name")) def test_kdump_hdr(self): """Test kdump file header parsing with a correct header. @@ -118,22 +137,23 @@ def test_from_bytes_io(self, mfile): + b"\x03\x00\x00\x00" # status + b"\x04\x00\x00\x00" # block_size ) - fake_file = BytesIO(fake_file_content) + fake_file = mock_file_ctx(fake_file_content, "test") mfile.return_value = fake_file - header = DiskDumpHeader.from_fd(fake_file) - self.assertEqual(header.signature, "KDUMP ") - self.assertEqual(header.header_version, 1) - self.assertEqual(header.utsname.system, "Linux") - self.assertEqual(header.utsname.node, "node") - self.assertEqual(header.utsname.release, "release") - self.assertEqual(header.utsname.version, "version") - self.assertEqual(header.utsname.machine, "machine") - self.assertEqual(header.utsname.domain, "domain") - self.assertEqual(header.timestamp_sec, 1) - self.assertEqual(header.timestamp_usec, 2) - self.assertEqual(header.status, 3) - self.assertEqual(header.block_size, 4) + with fake_file as fd: + header = DiskDumpHeader.from_fd(fd) + self.assertEqual(header.signature, "KDUMP ") + self.assertEqual(header.header_version, 1) + self.assertEqual(header.utsname.system, "Linux") + self.assertEqual(header.utsname.node, "node") + self.assertEqual(header.utsname.release, "release") + self.assertEqual(header.utsname.version, "version") + self.assertEqual(header.utsname.machine, "machine") + self.assertEqual(header.utsname.domain, "domain") + self.assertEqual(header.timestamp_sec, 1) + self.assertEqual(header.timestamp_usec, 2) + self.assertEqual(header.status, 3) + self.assertEqual(header.block_size, 4) class TestKdumpSubHeader(TestCase): @@ -161,7 +181,7 @@ def test_from_fd(self, mfile): fake_file = BytesIO(self.fake_file_content) mfile.return_value = fake_file - sub_header = KdumpSubHeader.from_fd(fake_file, 0) + sub_header = KdumpSubHeader.from_fd(fake_file) self.assertEqual(sub_header.phys_base, 1) self.assertEqual(sub_header.dump_level, 2) self.assertEqual(sub_header.split, 3) @@ -193,7 +213,7 @@ def test_vmcoreinfo_parse(self): self.assertEqual(v.get("TEST(ABCD)"), "EFGHI") self.assertEqual(v.get("KEY"), "VALUE") - @mock.patch("builtins.open", mock_file(bytes=MOCK_VMCOREINFO, name="name")) + @mock.patch("builtins.open", mock_file_ctx(bytes=MOCK_VMCOREINFO, name="name")) def test_vmcoreinfo_from_file(self): """Check if VMCoreInfo class can read from a file.""" with open("a", "rb") as f: @@ -207,17 +227,20 @@ class TestKdumpFile(TestCase): """Unit tests for KDumpFile class.""" @mock.patch("builtins.open", new_callable=mock.mock_open, read_data=b"") - def test_init_valid_kdump_file_shifted(self, mfile): + def test_init_valid_kdump_file_flattened(self, mfile): fake_vmcoreinfo = b"""key=value this is a key=value value $$=@@ """ fake_file_content = ( - fill_zeros( - os.urandom(2048) # preamble - + b"KDUMP " # signature - + b"\x01\x00\x00\x00" # header_version + fill_zeros(b"makedumpfile\0\0\0\0", 4096) + # makedumpfile_data_header + + struct.pack(">q", 0) # offset + + struct.pack(">q", 464) # size + + fill_zeros( + b"KDUMP " # signature + + struct.pack("q", 4096) # offsert + + struct.pack(">q", 4096) # size + fill_zeros( - b"\x01\x00\x00\x00\x00\x00\x00\x00" # phys_base - + b"\x02\x00\x00\x00" # dump_level - + b"\x03\x00\x00\x00" # split - + b"\x04\x00\x00\x00\x00\x00\x00\x00" # start_pfn - + b"\x05\x00\x00\x00\x00\x00\x00\x00" # end_pfn - + b"\x00\x20\x00\x00\x00\x00\x00\x00" # offset_vmcoreinfo - + b"\x2a\x00\x00\x00\x00\x00\x00\x00" # size_vmcoreinfo - + b"\x08\x00\x00\x00\x00\x00\x00\x00" # offset_note - + b"\x09\x00\x00\x00\x00\x00\x00\x00" # size_note - + b"\x0a\x00\x00\x00\x00\x00\x00\x00" # offset_eraseinfo - + b"\x0b\x00\x00\x00\x00\x00\x00\x00" # size_eraseinfo - + b"\x0c\x00\x00\x00\x00\x00\x00\x00" # start_pfn_64 - + b"\x0d\x00\x00\x00\x00\x00\x00\x00" # end_pfn_64 - + b"\x0e\x00\x00\x00\x00\x00\x00\x00", # max_mapnr_64 + struct.pack("q", 8192) # offset + + struct.pack(">q", 42) # size + fake_vmcoreinfo ) - fake_file = BytesIO(fake_file_content) + fake_file = mock_file_ctx(fake_file_content, "test") mfile.return_value = fake_file kdump_file = KdumpFile("dummy_path") self.assertIsInstance(kdump_file.ddhdr, DiskDumpHeader) + + self.assertEqual(kdump_file.ddhdr.signature, "KDUMP ") + self.assertEqual(kdump_file.ddhdr.header_version, 242526) + self.assertEqual(kdump_file.ddhdr.utsname.system, "Linux") + self.assertEqual(kdump_file.ddhdr.utsname.node, "node") + self.assertEqual(kdump_file.ddhdr.utsname.release, "release") + self.assertEqual(kdump_file.ddhdr.utsname.version, "version") + self.assertEqual(kdump_file.ddhdr.utsname.machine, "machine") + self.assertEqual(kdump_file.ddhdr.utsname.domain, "domain") + self.assertEqual(kdump_file.ddhdr.timestamp_sec, 1234) + self.assertEqual(kdump_file.ddhdr.timestamp_usec, 5678) + self.assertEqual(kdump_file.ddhdr.status, 91011) + self.assertEqual(kdump_file.ddhdr.block_size, 4096) + self.assertIsInstance(kdump_file.ksubhdr, KdumpSubHeader) + self.assertEqual(kdump_file.ksubhdr.phys_base, 1111) + self.assertEqual(kdump_file.ksubhdr.dump_level, 2222) + self.assertEqual(kdump_file.ksubhdr.split, 3333) + self.assertEqual(kdump_file.ksubhdr.start_pfn, 4444) + self.assertEqual(kdump_file.ksubhdr.end_pfn, 5555) + self.assertEqual(kdump_file.ksubhdr.offset_vmcoreinfo, 8192) + self.assertEqual(kdump_file.ksubhdr.size_vmcoreinfo, 42) + self.assertEqual(kdump_file.ksubhdr.offset_note, 8) + self.assertEqual(kdump_file.ksubhdr.size_note, 9) + self.assertEqual(kdump_file.ksubhdr.offset_eraseinfo, 10) + self.assertEqual(kdump_file.ksubhdr.size_eraseinfo, 11) + self.assertEqual(kdump_file.ksubhdr.start_pfn_64, 12) + self.assertEqual(kdump_file.ksubhdr.end_pfn_64, 13) + self.assertEqual(kdump_file.ksubhdr.max_mapnr_64, 14) + self.assertIsInstance(kdump_file.vmcoreinfo, VMCoreInfo) + self.assertEqual(kdump_file.vmcoreinfo.get("key"), "value") + self.assertEqual(kdump_file.vmcoreinfo.get("this is a key"), "value value") + self.assertEqual(kdump_file.vmcoreinfo.get("$$"), "@@") + + @mock.patch("builtins.open", new_callable=mock.mock_open, read_data=b"") + def test_init_valid_kdump_file_compressed(self, mfile): + fake_vmcoreinfo = b"""key=value +this is a key=value value +$$=@@ +""" + fake_file_content = ( + # 1 page diskdump_header + fill_zeros( + b"KDUMP " # signature + + struct.pack(" None: self.bytes = bytes self.name = name + self.io = None def __enter__(self): self.init_ctx() @@ -61,4 +86,8 @@ def st_atime(self): @property def st_size(self): - return self.mock_data[self.name]["size"] \ No newline at end of file + return self.mock_data[self.name]["size"] + + @property + def st_mode(self): + return 16877