diff --git a/parsl/curvezmq.py b/parsl/curvezmq.py index 3e0ad9409f..e90e13a5bd 100644 --- a/parsl/curvezmq.py +++ b/parsl/curvezmq.py @@ -160,6 +160,9 @@ def socket(self, socket_type: int, *args, **kwargs) -> zmq.Socket: except zmq.ZMQError as e: raise ValueError("Invalid CurveZMQ key format") from e sock.setsockopt(zmq.CURVE_SERVER, True) # Must come before bind + + # This flag enables IPV6 in addition to IPV4 + sock.setsockopt(zmq.IPV6, True) return sock def term(self): @@ -202,4 +205,5 @@ def socket(self, socket_type: int, *args, **kwargs) -> zmq.Socket: sock.setsockopt(zmq.CURVE_SERVERKEY, server_public_key) except zmq.ZMQError as e: raise ValueError("Invalid CurveZMQ key format") from e + sock.setsockopt(zmq.IPV6, True) return sock diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index a1def0466a..9acf9313b5 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -86,12 +86,14 @@ address : string An address to connect to the main Parsl process which is reachable from the network in which - workers will be running. This field expects an IPv4 address (xxx.xxx.xxx.xxx). + workers will be running. This field expects an IPv4 address (xxx.xxx.xxx.xxx) (when + enable_ipv6 is not set) Most login nodes on clusters have several network interfaces available, only some of which can be reached from the compute nodes. This field can be used to limit the executor to listen only on a specific interface, and limiting connections to the internal network. By default, the executor will attempt to enumerate and connect through all possible addresses. Setting an address here overrides the default behavior. + If `enable_ipv6` is set, specify an IPv6 address in brackets, for eg: address=[::1] default=None worker_ports : (int, int) @@ -224,6 +226,10 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageIn Parsl will create names as integers starting with 0. default: empty list + + enable_ipv6: bool + Set this flag to enable communication over IPV6 in addition to IPV4. + default: False """ @typeguard.typechecked @@ -253,7 +259,8 @@ def __init__(self, worker_logdir_root: Optional[str] = None, manager_selector: ManagerSelector = RandomManagerSelector(), block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True, - encrypted: bool = False): + encrypted: bool = False, + enable_ipv6: bool = False): logger.debug("Initializing HighThroughputExecutor") @@ -268,6 +275,13 @@ def __init__(self, self.address = address self.address_probe_timeout = address_probe_timeout self.manager_selector = manager_selector + self.enable_ipv6 = enable_ipv6 + if self.enable_ipv6: + # This will force usage of IPV6 for internal communication + self._internal_address = "[::1]" + else: + self._internal_address = "127.0.0.1" + if self.address: self.all_addresses = address else: @@ -408,13 +422,13 @@ def start(self): ) self.outgoing_q = zmq_pipes.TasksOutgoing( - "127.0.0.1", self.interchange_port_range, self.cert_dir + self._internal_address, self.interchange_port_range, self.cert_dir ) self.incoming_q = zmq_pipes.ResultsIncoming( - "127.0.0.1", self.interchange_port_range, self.cert_dir + self._internal_address, self.interchange_port_range, self.cert_dir ) self.command_client = zmq_pipes.CommandClient( - "127.0.0.1", self.interchange_port_range, self.cert_dir + self._internal_address, self.interchange_port_range, self.cert_dir ) self._result_queue_thread = None @@ -515,7 +529,7 @@ def _start_local_interchange_process(self) -> None: get the worker task and result ports that the interchange has bound to. """ - interchange_config = {"client_address": "127.0.0.1", + interchange_config = {"client_address": self._internal_address, "client_ports": (self.outgoing_q.port, self.incoming_q.port, self.command_client.port), diff --git a/parsl/executors/high_throughput/probe.py b/parsl/executors/high_throughput/probe.py index 6b8defe601..bb7b4b4843 100644 --- a/parsl/executors/high_throughput/probe.py +++ b/parsl/executors/high_throughput/probe.py @@ -32,6 +32,7 @@ def probe_addresses(addresses, task_port, timeout=120): for addr in addresses: socket = context.socket(zmq.DEALER) socket.setsockopt(zmq.LINGER, 0) + socket.setsockopt(zmq.IPV6, True) url = "tcp://{}:{}".format(addr, task_port) logger.debug("Trying to connect back on {}".format(url)) socket.connect(url) diff --git a/parsl/tests/configs/htex_local.py b/parsl/tests/configs/htex_local.py index 5553a7f70a..548f9d2845 100644 --- a/parsl/tests/configs/htex_local.py +++ b/parsl/tests/configs/htex_local.py @@ -10,6 +10,7 @@ def fresh_config(): executors=[ HighThroughputExecutor( label="htex_local", + enable_ipv6=True, worker_debug=True, cores_per_worker=1, encrypted=True,