Skip to content

Commit

Permalink
Implement identity mapping
Browse files Browse the repository at this point in the history
The main addition in this commit is the implementation and utilization of
`PosixIdentityMapper`, which wraps the Globus Identity Mapper tool.  Basic use
looks like:

```python
im = PosixIdentityMapper("path/some/conf.json", "some uuid or other id")
local_user_id = im.map_identity([{"some": "list"}, {"of": "globus ids"}])
```

The `im` object watches the configuration file for changes (it `stat`s the file
every few seconds) and atomically reloads the mapping configuration whenever
the file changes.

The previous approach -- a simple JSON file (`local_user_lookup.json`) holding a
dictionary that mapped Globus usernames to local usernames -- has been removed.

Also:

- update the default configuration to point to an example configuration
  document
- disallow specification of an engine for a multi-user config (where it doesn't
  make any sense)

[sc-26671]
  • Loading branch information
khk-globus committed Nov 4, 2023
1 parent 60ffcc6 commit defa6ab
Show file tree
Hide file tree
Showing 14 changed files with 782 additions and 104 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

import inspect
import os
import pathlib
import warnings

from globus_compute_endpoint.engines import HighThroughputEngine
Expand Down Expand Up @@ -147,6 +149,7 @@ def __init__(
# Tuning info
heartbeat_period=30,
heartbeat_threshold=120,
identity_mapping_config_path: os.PathLike | None = None,
idle_heartbeats_soft=0,
idle_heartbeats_hard=5760, # Two days, divided by `heartbeat_period`
detach_endpoint=True,
Expand Down Expand Up @@ -194,6 +197,11 @@ def __init__(
self.multi_user = multi_user is True
self.force_mu_allow_same_user = force_mu_allow_same_user is True
self.mu_child_ep_grace_period_s = mu_child_ep_grace_period_s
self.identity_mapping_config_path = identity_mapping_config_path
if self.identity_mapping_config_path:
_p = pathlib.Path(self.identity_mapping_config_path)
if not _p.exists():
warnings.warn(f"Identity mapping config path not found ({_p})")

# Auth
self.allowed_functions = allowed_functions
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[
{
"comment": "For more examples, see: https://docs.globus.org/globus-connect-server/v5.4/identity-mapping-guide/",
"DATA_TYPE": "external_identity_mapping#1.0.0",
"command": ["/bin/false", "--some", "flag", "-a", "-b", "-c"]
},
{
"comment": "For more examples, see: https://docs.globus.org/globus-connect-server/v5.4/identity-mapping-guide/",
"DATA_TYPE": "expression_identity_mapping#1.0.0",
"mappings": [
{
"source": "{username}",
"match": "(.*)@example\\.com",
"output": "{0}"
}
]
}
]
26 changes: 22 additions & 4 deletions compute_endpoint/globus_compute_endpoint/endpoint/config/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
import typing as t
from types import ModuleType

import pydantic
from globus_compute_endpoint import engines, strategies
from parsl import addresses as parsl_addresses
from parsl import channels as parsl_channels
from parsl import launchers as parsl_launchers
from parsl import providers as parsl_providers
from pydantic import BaseModel, validator
from pydantic import BaseModel, FilePath, validator


def _validate_import(field: str, package: ModuleType):
Expand Down Expand Up @@ -94,14 +95,15 @@ class Config:


class ConfigModel(BaseConfigModel):
engine: EngineModel
engine: t.Optional[EngineModel]
display_name: t.Optional[str]
environment: t.Optional[str]
funcx_service_address: t.Optional[str]
multi_user: t.Optional[bool]
allowed_functions: t.Optional[t.List[str]]
heartbeat_period: t.Optional[int]
heartbeat_threshold: t.Optional[int]
identity_mapping_config_path: t.Optional[FilePath]
idle_heartbeats_soft: t.Optional[int]
idle_heartbeats_hard: t.Optional[int]
detach_endpoint: t.Optional[bool]
Expand All @@ -114,10 +116,26 @@ class ConfigModel(BaseConfigModel):

_validate_engine = _validate_params("engine")

@pydantic.root_validator
@classmethod
def _validate(cls, values):
    """Cross-check ``multi_user`` against ``engine`` and the identity mapping path.

    The rules enforced are:

    * multi-user configuration: ``engine`` must be absent and
      ``identity_mapping_config_path`` must be present;
    * any other configuration: ``engine`` must be present and
      ``identity_mapping_config_path`` must be absent.

    :param values: the field values collected so far for the model
    :returns: ``values`` unchanged when the combination is consistent
    :raises ValueError: when the combination of fields is inconsistent
    """
    is_multi_user = values.get("multi_user") is True
    has_engine = bool(values.get("engine"))
    has_identity_mapping = bool(values.get("identity_mapping_config_path"))

    if is_multi_user:
        msg_engine = "no engine if multi-user"
        msg_identity = "multi-user requires identity_mapping_config_path"
    else:
        msg_engine = "missing engine"
        msg_identity = "identity_mapping_config_path should not be specified"

    # Raise explicitly instead of using `assert`: assert statements are
    # removed when Python runs with -O, which would silently disable these
    # configuration checks.
    if is_multi_user == has_engine:
        raise ValueError(msg_engine)
    if is_multi_user != has_identity_mapping:
        raise ValueError(msg_identity)
    return values

def dict(self, *args, **kwargs):
    """Return the model as a dictionary, with ``engine`` exposed as ``executors``.

    All arguments are forwarded unchanged to the parent ``dict()``.

    :returns: the parent's dictionary with the ``"engine"`` key removed and an
        ``"executors"`` key added; ``"executors"`` is a one-element list
        holding the engine, or ``None`` when no engine is configured
    """
    # Slight modification is needed here since we still
    # store the engine/executor in a list named executors
    ret = super().dict(*args, **kwargs)

    # The engine is optional, so pop with a default rather than assuming the
    # key is present, and only build the list when there is an engine.
    engine = ret.pop("engine", None)
    ret["executors"] = [engine] if engine else None
    return ret
25 changes: 25 additions & 0 deletions compute_endpoint/globus_compute_endpoint/endpoint/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ def user_config_schema_path(endpoint_dir: pathlib.Path) -> pathlib.Path:
def _user_environment_path(endpoint_dir: pathlib.Path) -> pathlib.Path:
    """Return the location of ``user_environment.yaml`` inside ``endpoint_dir``."""
    filename = "user_environment.yaml"
    return endpoint_dir.joinpath(filename)

@staticmethod
def _example_identity_mapping_configuration_path(
    endpoint_dir: pathlib.Path,
) -> pathlib.Path:
    """Return the location of the example identity mapping config.

    :param endpoint_dir: directory that holds the endpoint's configuration
    :returns: ``endpoint_dir`` joined with
        ``example_identity_mapping_config.json``
    """
    filename = "example_identity_mapping_config.json"
    return endpoint_dir.joinpath(filename)

@staticmethod
def update_config_file(
original_path: pathlib.Path,
Expand All @@ -85,6 +91,12 @@ def update_config_file(

if multi_user:
config_dict["multi_user"] = multi_user
config_dict.pop("engine", None)
config_dict["identity_mapping_config_path"] = str(
Endpoint._example_identity_mapping_configuration_path(
target_path.parent
)
)

config_text = yaml.safe_dump(config_dict)
target_path.write_text(config_text)
Expand Down Expand Up @@ -133,24 +145,33 @@ def init_endpoint_dir(
if multi_user:
# template must be readable by user-endpoint processes (see
# endpoint_manager.py)
owner_only = 0o0600
world_readable = 0o0644 & ((0o0777 - user_umask) | 0o0444)
world_executable = 0o0711 & ((0o0777 - user_umask) | 0o0111)
endpoint_dir.chmod(world_executable)

src_user_tmpl_path = package_dir / "config/user_config_template.yaml"
src_user_schem_path = package_dir / "config/user_config_schema.json"
src_user_env_path = package_dir / "config/user_environment.yaml"
src_example_idmap_path = (
package_dir / "config/example_identity_mapping_config.json"
)
dst_user_tmpl_path = Endpoint.user_config_template_path(endpoint_dir)
dst_user_schem_path = Endpoint.user_config_schema_path(endpoint_dir)
dst_user_env_path = Endpoint._user_environment_path(endpoint_dir)
dst_idmap_conf_path = (
Endpoint._example_identity_mapping_configuration_path(endpoint_dir)
)

shutil.copy(src_user_tmpl_path, dst_user_tmpl_path)
shutil.copy(src_user_schem_path, dst_user_schem_path)
shutil.copy(src_user_env_path, dst_user_env_path)
shutil.copy(src_example_idmap_path, dst_idmap_conf_path)

dst_user_tmpl_path.chmod(world_readable)
dst_user_schem_path.chmod(world_readable)
dst_user_env_path.chmod(world_readable)
dst_idmap_conf_path.chmod(owner_only)

finally:
os.umask(user_umask)
Expand All @@ -177,10 +198,14 @@ def configure_endpoint(
if multi_user:
user_conf_tmpl_path = Endpoint.user_config_template_path(conf_dir)
user_env_path = Endpoint._user_environment_path(conf_dir)
idmap_ex_conf_path = Endpoint._example_identity_mapping_configuration_path(
conf_dir
)

print(f"Created multi-user profile for endpoint named <{ep_name}>")
print(
f"\n\tConfiguration file: {config_path}\n"
f"\n\tExample identity mapping configuration: {idmap_ex_conf_path}\n"
f"\n\tUser endpoint configuration template: {user_conf_tmpl_path}"
f"\n\tUser endpoint environment variables: {user_env_path}"
"\n\nUse the `start` subcommand to run it:\n"
Expand Down
107 changes: 71 additions & 36 deletions compute_endpoint/globus_compute_endpoint/endpoint/endpoint_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import globus_compute_sdk as GC
from cachetools import TTLCache
from globus_compute_endpoint.endpoint.identity_mapper import PosixIdentityMapper
from pydantic import BaseModel

try:
Expand Down Expand Up @@ -114,6 +115,15 @@ def __init__(

endpoint_uuid = Endpoint.get_or_create_endpoint_uuid(conf_dir, endpoint_uuid)

if not config.identity_mapping_config_path:
msg = (
"No identity mapping file specified; please specify"
" identity_mapping_config_path"
)
log.error(msg)
print(msg, file=sys.stderr)
exit(os.EX_OSFILE)

if not reg_info:
try:
client_options = {
Expand Down Expand Up @@ -188,11 +198,32 @@ def __init__(
" considered a very dangerous override -- please use with care,"
" especially if allowing this endpoint to be utilized by multiple"
" users."
f"\n Endpoint (UID, GID): ({os.getuid()}, {os.getgid()}) "
f"\n Endpoint (UID, GID): ({os.getuid()}, {os.getgid()})"
)
else:
self._allow_same_user = not is_privileged(self._mu_user)

try:
self.identity_mapper = PosixIdentityMapper(
config.identity_mapping_config_path, self._endpoint_uuid_str
)

except PermissionError as e:
msg = f"({type(e).__name__}) {e}"
log.error(msg)
print(msg, file=sys.stderr)
exit(os.EX_NOPERM)

except Exception as e:
msg = (
f"({type(e).__name__}) {e} -- Unable to read identity mapping"
f" configuration from: {config.identity_mapping_config_path}"
)
log.debug(msg, exc_info=e)
log.error(msg)
print(msg, file=sys.stderr)
exit(os.EX_CONFIG)

# sanitize passwords in logs
log_reg_info = re.subn(r"://.*?@", r"://***:***@", repr(reg_info))
log.debug(f"Registration information: {log_reg_info}")
Expand Down Expand Up @@ -337,6 +368,9 @@ def start(self):
self._command_stop_event.set()
self._kill_event.set()

if self.identity_mapper:
self.identity_mapper.stop_watching()

os.killpg(os.getpgid(0), signal.SIGTERM)

proc_uid, proc_gid = os.getuid(), os.getgid()
Expand Down Expand Up @@ -374,19 +408,6 @@ def start(self):
def _event_loop(self):
self._command.start()

local_user_lookup = {}
try:
with open("local_user_lookup.json") as f:
local_user_lookup = json.load(f)
except Exception as e:
msg = (
f"Unable to load local users ({e.__class__.__name__}) {e}\n"
" Will be unable to respond to any commands; update the lookup file"
f" and either restart (stop, start) or SIGHUP ({os.getpid()}) this"
" endpoint."
)
log.error(msg)

valid_method_name_re = re.compile(r"^cmd_[A-Za-z][0-9A-Za-z_]{0,99}$")
max_skew_s = 180 # 3 minutes; ignore commands with out-of-date timestamp
while not self._time_to_stop:
Expand Down Expand Up @@ -444,25 +465,43 @@ def _event_loop(self):
continue

try:
globus_uuid = msg["globus_uuid"]
globus_username = msg["globus_username"]
effective_identity = msg["globus_effective_identity"]
identity_set = msg["globus_identity_set"]
except Exception as e:
log.error(f"Invalid server command. ({e.__class__.__name__}) {e}")
self._command.ack(d_tag)
continue

identity_for_log = (
f"\n Globus effective identity: {effective_identity}"
f"\n Globus identity set: {identity_set}"
)
try:
local_user = local_user_lookup[globus_username]
local_username = self.identity_mapper.map_identity(identity_set)
if not local_username:
raise LookupError()
except LookupError as e:
log.error(
"Identity failed to map to a local user name."
f" ({e.__class__.__name__}) {e}{identity_for_log}"
)
self._command.ack(d_tag)
continue
except Exception as e:
log.warning(f"Invalid or unknown user. ({e.__class__.__name__}) {e}")
msg = "Unhandled error attempting to map user."
log.debug(f"{msg}{identity_for_log}", exc_info=e)
log.error(f"{msg} ({e.__class__.__name__}) {e}{identity_for_log}")
self._command.ack(d_tag)
continue

try:
local_user_rec = pwd.getpwnam(local_user)
local_user_rec = pwd.getpwnam(local_username)

except Exception as e:
log.warning(
f"Invalid or unknown local username. ({e.__class__.__name__}) {e}"
log.error(
f"({type(e).__name__}) {e}\n"
" Identity mapped to a local user name, but local user does not"
f" exist.\n Local user name: {local_username}{identity_for_log}"
)
self._command.ack(d_tag)
continue
Expand All @@ -477,24 +516,24 @@ def _event_loop(self):

command_func(local_user_rec, command_args, command_kwargs)
log.info(
f"Command process successfully forked for '{globus_username}'"
f" ('{globus_uuid}')."
f"Command process successfully forked for '{local_username}'"
f" (Globus effective identity: {effective_identity})."
)
except (InvalidCommandError, InvalidUserError) as err:
log.error(str(err))
except (InvalidCommandError, InvalidUserError) as e:
log.error(f"({type(e).__name__}) {e}{identity_for_log}")

except Exception:
log.exception(
f"Unable to execute command: {command}\n"
f" args: {command_args}\n"
f" kwargs: {command_kwargs}"
f" kwargs: {command_kwargs}{identity_for_log}"
)
finally:
self._command.ack(d_tag)

def cmd_start_endpoint(
self,
local_user_rec: pwd.struct_passwd,
user_record: pwd.struct_passwd,
args: list[str] | None,
kwargs: dict | None,
):
Expand All @@ -513,15 +552,11 @@ def cmd_start_endpoint(
f"User endpoint {ep_name} is already running (pid: {p}); "
"caching arguments in case it's about to shut down"
)
self._cached_cmd_start_args[p] = (local_user_rec, args, kwargs)
self._cached_cmd_start_args[p] = (user_record, args, kwargs)
return

udir, uid, gid, uname = (
local_user_rec.pw_dir,
local_user_rec.pw_uid,
local_user_rec.pw_gid,
local_user_rec.pw_name,
)
udir, uid, gid = user_record.pw_dir, user_record.pw_uid, user_record.pw_gid
uname = user_record.pw_name

if not self._allow_same_user:
p_uname = self._mu_user.pw_name
Expand Down Expand Up @@ -553,7 +588,7 @@ def cmd_start_endpoint(
if pid > 0:
proc_args_s = f"({uname}, {ep_name}) {' '.join(proc_args)}"
self._children[pid] = UserEndpointRecord(
ep_name=ep_name, local_user_info=local_user_rec, arguments=proc_args_s
ep_name=ep_name, local_user_info=user_record, arguments=proc_args_s
)
log.info(f"Creating new user endpoint (pid: {pid}) [{proc_args_s}]")
return
Expand Down Expand Up @@ -693,7 +728,7 @@ def cmd_start_endpoint(

# fcntl.F_GETPIPE_SZ is not available in Python versions less than 3.10
F_GETPIPE_SZ = 1032
# 256 - Allow some head room for multiple kernel-specific factors
# 256 - Allow some headroom for multiple kernel-specific factors
max_buf_size = fcntl.fcntl(write_handle, F_GETPIPE_SZ) - 256
stdin_data_size = len(stdin_data)
if stdin_data_size > max_buf_size:
Expand Down
Loading

0 comments on commit defa6ab

Please sign in to comment.