Skip to content

Commit

Permalink
feat(utils): implement symlink sandbox
Browse files Browse the repository at this point in the history
What/why: implement detection/removal of unsafe symlinks in repos, specifically
covering cachi2 use case: Cachito already does this

How:

- copypasta `_enforce_sandbox()` and related unit tests from Cachito
  ("cachito/cachito/workers/tasks/general.py" and
   "cachito/tests/test_workers/test_tasks/test_general.py", respectively)
- add call to `_enforce_sandbox()`
- add CLI boolean arg `remove-unsafe-symlinks`, which toggles removing all
  symlinks which point to location(s) outside of any cloned repository

Signed-off-by: Ben Alkov <ben.alkov@redhat.com>

rh-pre-commit.version: 2.3.2
rh-pre-commit.check-secrets: ENABLED

Signed-off-by: Ben Alkov <ben.alkov@redhat.com>
  • Loading branch information
ben-alkov committed Dec 6, 2024
1 parent b592101 commit 25b41cc
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 4 deletions.
45 changes: 45 additions & 0 deletions osbs/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import logging
import os
import os.path
from pathlib import Path
import random
import re
import shutil
Expand Down Expand Up @@ -69,6 +70,50 @@ def __repr__(self):
return self.uri


def _enforce_sandbox(repo_root: str, *, remove_unsafe_symlinks: bool) -> None:
"""
Check that there are no symlinks that try to leave the cloned repository.
:param (str | Path) repo_root: absolute path to root of cloned repository
:raises OsbsValidationException: if any symlink points outside of cloned repository
"""
for dirpath, subdirs, files in os.walk(repo_root):
dirpath = Path(dirpath)

for entry in subdirs + files:
# the logic in here actually *requires* f-strings with `!r`. using
# `%r` DOES NOT WORK (tested)
# pylint: disable=logging-fstring-interpolation

# apparently pylint doesn't understand Path
full_path = dirpath / entry # pylint: disable=old-division

try:
real_path = full_path.resolve()
except RuntimeError as e:
if "Symlink loop from " in str(e):
logger.info(f"Symlink loop from {full_path!r}")
continue
logger.exception("RuntimeError encountered")
raise

try:
real_path.relative_to(repo_root)
except ValueError:
# Unlike the real path, the full path is always relative to the root
relative_path = str(full_path.relative_to(repo_root))
if remove_unsafe_symlinks:
full_path.unlink()
logger.warning(
f"The destination of {relative_path!r} is outside of cloned repository. "
"Removing...",
)
else:
raise OsbsValidationException(
f"The destination of {relative_path!r} is outside of cloned repository",
)


@contextlib.contextmanager
def checkout_git_repo(git_url, target_dir=None, commit=None, retry_times=GIT_MAX_RETRIES,
branch=None, depth=None):
Expand Down
122 changes: 118 additions & 4 deletions tests/utils/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
of the BSD license. See the LICENSE file for details.
"""
from __future__ import absolute_import
from typing import Union
from unittest import mock

from flexmock import flexmock
import os
Expand All @@ -18,14 +20,17 @@
import logging
from time import sleep
import time
from pathlib import Path
from textwrap import dedent

from osbs.constants import REPO_CONTAINER_CONFIG, USER_WARNING_LEVEL
from osbs.repo_utils import RepoInfo
from osbs.utils import (git_repo_humanish_part_from_uri, sanitize_strings_for_openshift,
make_name_from_git, get_instance_token_file_name, clone_git_repo,
get_repo_info, UserWarningsStore, ImageName, reset_git_repo)
from osbs.exceptions import OsbsException, OsbsCommitNotFound, OsbsLocallyModified
from osbs.utils import (_enforce_sandbox, git_repo_humanish_part_from_uri,
sanitize_strings_for_openshift, make_name_from_git,
get_instance_token_file_name, clone_git_repo,
get_repo_info, UserWarningsStore, ImageName, reset_git_repo,)
from osbs.exceptions import (OsbsException, OsbsCommitNotFound, OsbsLocallyModified,
OsbsValidationException,)
from tests.constants import (TEST_DOCKERFILE_GIT, TEST_DOCKERFILE_SHA1, TEST_DOCKERFILE_INIT_SHA1,
TEST_DOCKERFILE_BRANCH)
import osbs.kerberos_ccache
Expand Down Expand Up @@ -491,3 +496,112 @@ def test_store_user_warnings(logs, expected, wrong_input, caplog):

user_warnings = str(user_warnings).splitlines()
assert sorted(user_warnings) == sorted(expected)


class Symlink(str):
"""
Use this to create symlinks via write_file_tree().
The value of a Symlink instance is the target path (path to make a symlink to).
"""


def write_file_tree(tree_def: dict, rooted_at: Union[str, Path], *, exist_dirs_ok: bool = False):
"""
Write a file tree to disk.
:param tree_def: Definition of file tree, see usage for intuitive examples
:param rooted_at: Root of file tree, must be an existing directory
:param exist_dirs_ok: If True, existing directories will not cause this function to fail
"""
root = Path(rooted_at)
for entry, value in tree_def.items():
entry_path = root / entry
if isinstance(value, Symlink):
os.symlink(value, entry_path)
elif isinstance(value, str):
entry_path.write_text(value)
else:
entry_path.mkdir(exist_ok=exist_dirs_ok)
write_file_tree(value, entry_path)


@pytest.mark.parametrize(
"file_tree,bad_symlink",
[
# good
pytest.param({}, None, id="empty-no-symlink"),
pytest.param(
{"symlink_to_self": Symlink(".")},
None,
id="self-symlink-ok"),
pytest.param(
{"subdir": {"symlink_to_parent": Symlink("..")}},
None,
id="parent-symlink-ok"),
pytest.param(
{"symlink_to_subdir": Symlink("subdir/some_file"),
"subdir": {"some_file": "foo"}},
None,
id="subdir-symlink-ok"),
# bad
pytest.param(
{"symlink_to_parent": Symlink("..")},
"symlink_to_parent",
id="parent-symlink-bad"),
pytest.param(
{"symlink_to_root": Symlink("/")},
"symlink_to_root",
id="root-symlink-bad"),
pytest.param(
{"subdir": {"symlink_to_parent_parent": Symlink("../..")}},
"subdir/symlink_to_parent_parent",
id="parent-parent-symlink-bad"),
pytest.param(
{"subdir": {"symlink_to_root": Symlink("/")}},
"subdir/symlink_to_root",
id="subdir-root-symlink-bad"),
],
)
def test_enforce_sandbox(file_tree, bad_symlink, tmp_path):
write_file_tree(file_tree, tmp_path)
if bad_symlink:
error = f"The destination of {bad_symlink!r} is outside of cloned repository"
with pytest.raises(OsbsValidationException, match=error):
_enforce_sandbox(tmp_path, remove_unsafe_symlinks=False)
assert Path(tmp_path / bad_symlink).exists()
_enforce_sandbox(tmp_path, remove_unsafe_symlinks=True)
assert not Path(tmp_path / bad_symlink).exists()
else:
_enforce_sandbox(tmp_path, remove_unsafe_symlinks=False)
_enforce_sandbox(tmp_path, remove_unsafe_symlinks=True)


def test_enforce_sandbox_symlink_loop(tmp_path, caplog):
workers_logger = logging.getLogger("cachito.workers.tasks.general")
workers_logger.disabled = False
workers_logger.setLevel(logging.INFO)

file_tree = {"foo_b": Symlink("foo_a"), "foo_a": Symlink("foo_b")}
write_file_tree(file_tree, tmp_path)
_enforce_sandbox(tmp_path, remove_unsafe_symlinks=True)
assert "Symlink loop from " in caplog.text


@mock.patch("pathlib.Path.resolve")
def test_enforce_sandbox_runtime_error(mock_resolve, tmp_path):
workers_logger = logging.getLogger("cachito.workers.tasks.general")
workers_logger.disabled = False
workers_logger.setLevel(logging.INFO)

error = "RuntimeError is triggered"

def side_effect():
raise RuntimeError(error)

mock_resolve.side_effect = side_effect

file_tree = {"foo_b": Symlink("foo_a"), "foo_a": Symlink("foo_b")}
write_file_tree(file_tree, tmp_path)
with pytest.raises(RuntimeError, match=error):
_enforce_sandbox(tmp_path, remove_unsafe_symlinks=True)

0 comments on commit 25b41cc

Please sign in to comment.