Skip to content

Commit

Permalink
Merge pull request #102 from eWaterCycle/sing-capture-output
Browse files Browse the repository at this point in the history
Adds logs() to BmiClientSingularity and BmiClientDocker
  • Loading branch information
sverhoeven authored Jul 20, 2021
2 parents 15213c4 + 6001693 commit 1882d5f
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 35 deletions.
31 changes: 11 additions & 20 deletions grpc4bmi/bmi_client_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,7 @@
from typeguard import check_argument_types, qualified_name

from grpc4bmi.bmi_grpc_client import BmiClient


class DeadDockerContainerException(ChildProcessError):
"""
Exception for when a Docker container has died.
Args:
message (str): Human readable error message
exitcode (int): The non-zero exit code of the container
logs (str): Logs the container produced
"""
def __init__(self, message, exitcode, logs, *args):
super().__init__(message, *args)
#: Exit code of container
self.exitcode = exitcode
#: Stdout and stderr of container
self.logs = logs
from grpc4bmi.exceptions import DeadContainerException


class BmiClientDocker(BmiClient):
Expand Down Expand Up @@ -62,6 +45,7 @@ class BmiClientDocker(BmiClient):
See :py:class:`grpc4bmi.bmi_client_singularity.BmiClientSingularity` for examples using `input_dirs` and `work_dir`.
"""

def __init__(self, image: str, work_dir: str, image_port=50051, host=None,
input_dirs: Iterable[str] = tuple(),
user=os.getuid(), remove=False, delay=5,
Expand Down Expand Up @@ -99,9 +83,9 @@ def __init__(self, image: str, work_dir: str, image_port=50051, host=None,
self.container.reload()
if self.container.status == 'exited':
exitcode = self.container.attrs["State"]["ExitCode"]
logs = self.container.logs()
logs = self.logs()
msg = f'Failed to start Docker container with image {image}, Container log: {logs}'
raise DeadDockerContainerException(msg, exitcode, logs)
raise DeadContainerException(msg, exitcode, logs)

super(BmiClientDocker, self).__init__(BmiClient.create_grpc_channel(port=port, host=host), timeout=timeout)

Expand All @@ -111,3 +95,10 @@ def __del__(self):

def get_value_ref(self, var_name):
raise NotImplementedError("Cannot exchange memory references across process boundary")

def logs(self) -> str:
"""Returns complete combined stdout and stderr written by the Docker container.
"""
if hasattr(self, "container"):
return self.container.logs().decode('utf8')
return ''
65 changes: 56 additions & 9 deletions grpc4bmi/bmi_client_singularity.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
import logging
import os
import subprocess
import time
from os.path import abspath
import subprocess
import logging
from tempfile import SpooledTemporaryFile
from typing import Iterable

import semver
from typeguard import check_argument_types, qualified_name

from grpc4bmi.bmi_grpc_client import BmiClient
from grpc4bmi.exceptions import DeadContainerException, SingularityVersionException

REQUIRED_SINGULARITY_VERSION = '>=3.6.0'
REQUIRED_SINGULARITY_VERSION = '3.6.0'


def check_singularity_version():
p = subprocess.Popen(['singularity', 'version'], stdout=subprocess.PIPE)
(stdout, _stderr) = p.communicate()
if p.returncode != 0:
raise Exception('Unable to determine singularity version')
if not semver.match(stdout.decode('utf-8').replace('_', '-'), REQUIRED_SINGULARITY_VERSION):
raise Exception(f'Wrong version of singularity found, require version {REQUIRED_SINGULARITY_VERSION}')
raise SingularityVersionException('Unable to determine singularity version')
local_version = semver.VersionInfo.parse(stdout.decode('utf-8').replace('_', '-'))
if local_version < REQUIRED_SINGULARITY_VERSION:
raise SingularityVersionException(f'Wrong version ({local_version}) of singularity found, '
f'require version {REQUIRED_SINGULARITY_VERSION}')
return True


Expand Down Expand Up @@ -76,6 +80,12 @@ class BmiClientSingularity(BmiClient):
By default will try forever to connect to gRPC server inside container.
Set to low number to escape endless wait.
capture_logs (bool): Whether to capture stdout and stderr of container .
If false then redirects output to null device never to be seen again.
If true then redirects output to temporary file which can be read with :py:func:`BmiClientSingularity.logs()`.
The temporary file gets removed when this object is deleted.
**Example 1: Config file already inside image**
MARRMoT has an `example config file <https://github.com/wknoben/MARRMoT/blob/master/BMI/Config/BMI_testcase_m01_BuffaloRiver_TN_USA.mat>`_ inside its Docker image.
Expand Down Expand Up @@ -176,7 +186,10 @@ class BmiClientSingularity(BmiClient):
del client_rhine
"""
def __init__(self, image: str, work_dir: str, input_dirs: Iterable[str] = tuple(), delay=0, timeout=None):

def __init__(self, image: str, work_dir: str, input_dirs: Iterable[str] = tuple(), delay=0, timeout=None,
capture_logs=True,
):
assert check_argument_types()
if type(input_dirs) == str:
msg = f'type of argument "input_dirs" must be collections.abc.Iterable; ' \
Expand All @@ -198,7 +211,7 @@ def __init__(self, image: str, work_dir: str, input_dirs: Iterable[str] = tuple(
raise NotADirectoryError(input_dir)
args += ["--bind", f'{input_dir}:{input_dir}:ro']
self.work_dir = abspath(work_dir)
if self.work_dir in set([abspath(d) for d in input_dirs]):
if self.work_dir in {abspath(d) for d in input_dirs}:
raise ValueError('Found work_dir equal to one of the input directories. Please drop that input dir.')
if not os.path.isdir(self.work_dir):
raise NotADirectoryError(self.work_dir)
Expand All @@ -207,14 +220,48 @@ def __init__(self, image: str, work_dir: str, input_dirs: Iterable[str] = tuple(
args += ["--pwd", self.work_dir]
args.append(image)
logging.info(f'Running {image} singularity container on port {port}')
self.container = subprocess.Popen(args, preexec_fn=os.setsid)
if capture_logs:
self.logfile = SpooledTemporaryFile(max_size=2 ** 16, # keep until 65Kb in memory if bigger write to disk
prefix='grpc4bmi-singularity-log',
mode='w+t',
encoding='utf8')
stdout = self.logfile
else:
stdout = subprocess.DEVNULL
self.container = subprocess.Popen(args, preexec_fn=os.setsid, stderr=subprocess.STDOUT, stdout=stdout)
time.sleep(delay)
returncode = self.container.poll()
if returncode is not None:
raise DeadContainerException(
f'singularity container {image} prematurely exited with code {returncode}',
returncode,
self.logs()
)
super(BmiClientSingularity, self).__init__(BmiClient.create_grpc_channel(port=port, host=host), timeout=timeout)

def __del__(self):
if hasattr(self, "container"):
self.container.terminate()
self.container.wait()
if hasattr(self, "logfile"):
# Force deletion of log file
self.logfile.close()

def get_value_ref(self, var_name):
raise NotImplementedError("Cannot exchange memory references across process boundary")

def logs(self) -> str:
"""Returns complete combined stdout and stderr written by the Singularity container.
When object was created with `log_enable=False` argument then always returns empty string.
"""
if not hasattr(self, "logfile"):
return ''

current_position = self.logfile.tell()
# Read from start
self.logfile.seek(0)
content = self.logfile.read()
# Write from last position
self.logfile.seek(current_position)
return content
21 changes: 21 additions & 0 deletions grpc4bmi/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@

class DeadContainerException(ChildProcessError):
"""
Exception for when a container has died.
Args:
message (str): Human readable error message
exitcode (int): The non-zero exit code of the container
logs (str): Logs the container produced
"""
def __init__(self, message, exitcode, logs, *args):
super().__init__(message, *args)
#: Exit code of container
self.exitcode = exitcode
#: Stdout and stderr of container
self.logs = logs


class SingularityVersionException(ValueError):
pass
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def read(fname):


setup(name="grpc4bmi",
version="0.2.12",
version="0.2.13",
author="Gijs van den Oord",
author_email="g.vandenoord@esciencecenter.nl",
description="Run your BMI implementation in a separate process and expose it as BMI-python with GRPC",
Expand All @@ -31,7 +31,7 @@ def read(fname):
"numpy",
"docker",
"basic-modeling-interface",
"semver",
"semver>=2.10.0",
"typeguard",
],
extras_require={
Expand Down
15 changes: 11 additions & 4 deletions test/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import docker
import pytest

from grpc4bmi.bmi_client_docker import BmiClientDocker, DeadDockerContainerException
from grpc4bmi.bmi_client_docker import BmiClientDocker
from grpc4bmi.exceptions import DeadContainerException

walrus_docker_image = 'ewatercycle/walrus-grpc4bmi:v0.2.0'

Expand Down Expand Up @@ -79,12 +80,12 @@ def test_workdir_absent(self, tmp_path):

def test_container_start_failure(self, exit_container, tmp_path):
expected = r"Failed to start Docker container with image"
with pytest.raises(DeadDockerContainerException, match=expected) as excinfo:
with pytest.raises(DeadContainerException, match=expected) as excinfo:
BmiClientDocker(image=exit_container, work_dir=str(tmp_path))

assert excinfo.value.exitcode == 25
assert b'my stderr' in excinfo.value.logs
assert b'my stdout' in excinfo.value.logs
assert 'my stderr' in excinfo.value.logs
assert 'my stdout' in excinfo.value.logs

def test_same_inputdir_and_workdir(self, tmp_path):
some_dir = str(tmp_path)
Expand All @@ -105,3 +106,9 @@ def test_inputdirs_as_number(self, tmp_path):
some_dir = str(tmp_path)
with pytest.raises(TypeError, match='must be collections.abc.Iterable; got int instead'):
BmiClientDocker(image=walrus_docker_image, input_dirs=42, work_dir=some_dir)

def test_logs(self, walrus_model, capfd):
logs = walrus_model.logs()

assert 'R[write to console]' in logs
assert 'R[write to console]' not in capfd.readouterr().out
27 changes: 27 additions & 0 deletions test/test_singularity.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import subprocess
from tempfile import TemporaryDirectory
from textwrap import dedent
Expand All @@ -8,6 +9,7 @@
from nbformat.v4 import new_notebook, new_code_cell

from grpc4bmi.bmi_client_singularity import BmiClientSingularity
from grpc4bmi.exceptions import DeadContainerException
from test.conftest import write_config, write_datafile

IMAGE_NAME = "docker://ewatercycle/walrus-grpc4bmi:v0.2.0"
Expand Down Expand Up @@ -175,6 +177,31 @@ def test_inputdirs_as_number(self, tmp_path):
BmiClientSingularity(image=IMAGE_NAME, input_dirs=42, work_dir=some_dir)


class TestRedirectOutput:
EXPECTED = 'Hello from Docker!'

@pytest.fixture
def image(self):
hello_image = 'docker://hello-world'
# Cache image, first test does not use delay time to build image
os.system(f'singularity run {hello_image}')
return hello_image

def test_default(self, image, tmp_path, capfd):
with pytest.raises(DeadContainerException) as excinf:
BmiClientSingularity(image=image, work_dir=str(tmp_path), delay=2)

assert self.EXPECTED not in capfd.readouterr().out
assert self.EXPECTED in excinf.value.logs

def test_devnull(self, image, tmp_path, capfd):
with pytest.raises(DeadContainerException) as excinf:
BmiClientSingularity(image=image, work_dir=str(tmp_path), capture_logs=False, delay=2)

assert self.EXPECTED not in capfd.readouterr().out
assert self.EXPECTED not in excinf.value.logs


@pytest.fixture
def notebook(tmp_path):
tmp_path.mkdir(exist_ok=True)
Expand Down

0 comments on commit 1882d5f

Please sign in to comment.