diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index fa0969d398..cd7d0596a9 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -375,7 +375,7 @@ def start(self) -> None: self.zmq_context.destroy() delta = time.time() - start - logger.info("Processed {} tasks in {} seconds".format(self.count, delta)) + logger.info(f"Processed {self.count} tasks in {delta} seconds") logger.warning("Exiting") def process_task_outgoing_incoming( @@ -396,9 +396,8 @@ def process_task_outgoing_incoming( try: msg = json.loads(message[1].decode('utf-8')) except Exception: - logger.warning("Got Exception reading message from manager: {!r}".format( - manager_id), exc_info=True) - logger.debug("Message: \n{!r}\n".format(message[1])) + logger.warning(f"Got Exception reading message from manager: {manager_id!r}", exc_info=True) + logger.debug("Message:\n %r\n", message[1]) return # perform a bit of validation on the structure of the deserialized @@ -406,7 +405,7 @@ def process_task_outgoing_incoming( # in obviously malformed cases if not isinstance(msg, dict) or 'type' not in msg: logger.error(f"JSON message was not correctly formatted from manager: {manager_id!r}") - logger.debug("Message: \n{!r}\n".format(message[1])) + logger.debug("Message:\n %r\n", message[1]) return if msg['type'] == 'registration': @@ -425,7 +424,7 @@ def process_task_outgoing_incoming( self.connected_block_history.append(msg['block_id']) interesting_managers.add(manager_id) - logger.info("Adding manager: {!r} to ready queue".format(manager_id)) + logger.info(f"Adding manager: {manager_id!r} to ready queue") m = self._ready_managers[manager_id] # m is a ManagerRecord, but msg is a dict[Any,Any] and so can @@ -434,12 +433,12 @@ def process_task_outgoing_incoming( # later. m.update(msg) # type: ignore[typeddict-item] - logger.info("Registration info for manager {!r}: {}".format(manager_id, msg)) + logger.info(f"Registration info for manager {manager_id!r}: {msg}") self._send_monitoring_info(monitoring_radio, m) if (msg['python_v'].rsplit(".", 1)[0] != self.current_platform['python_v'].rsplit(".", 1)[0] or msg['parsl_v'] != self.current_platform['parsl_v']): - logger.error("Manager {!r} has incompatible version info with the interchange".format(manager_id)) + logger.error(f"Manager {manager_id!r} has incompatible version info with the interchange") logger.debug("Setting kill event") kill_event.set() e = VersionMismatch("py.v={} parsl.v={}".format(self.current_platform['python_v'].rsplit(".", 1)[0], @@ -452,16 +451,15 @@ def process_task_outgoing_incoming( self.results_outgoing.send(pkl_package) logger.error("Sent failure reports, shutting down interchange") else: - logger.info("Manager {!r} has compatible Parsl version {}".format(manager_id, msg['parsl_v'])) - logger.info("Manager {!r} has compatible Python version {}".format(manager_id, - msg['python_v'].rsplit(".", 1)[0])) + logger.info(f"Manager {manager_id!r} has compatible Parsl version {msg['parsl_v']}") + logger.info(f"Manager {manager_id!r} has compatible Python version {msg['python_v'].rsplit('.', 1)[0]}") elif msg['type'] == 'heartbeat': self._ready_managers[manager_id]['last_heartbeat'] = time.time() - logger.debug("Manager {!r} sent heartbeat via tasks connection".format(manager_id)) + logger.debug("Manager %r sent heartbeat via tasks connection", manager_id) self.task_outgoing.send_multipart([manager_id, b'', PKL_HEARTBEAT_CODE]) elif msg['type'] == 'drain': self._ready_managers[manager_id]['draining'] = True - logger.debug(f"Manager {manager_id!r} requested drain") + logger.debug("Manager %r requested drain", manager_id) else: logger.error(f"Unexpected message type received from manager: {msg['type']}") logger.debug("leaving task_outgoing section") @@ -484,9 +482,11 @@ def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_r def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: # Check if there are tasks that could be sent to managers - logger.debug("Managers count (interesting/total): {interesting}/{total}".format( - total=len(self._ready_managers), - interesting=len(interesting_managers))) + logger.debug( + "Managers count (interesting/total): {}/{}", + len(interesting_managers), + len(self._ready_managers) + ) if interesting_managers and not self.pending_task_queue.empty(): shuffled_managers = self.manager_selector.sort_managers(self._ready_managers, interesting_managers) @@ -497,7 +497,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: tasks_inflight = len(m['tasks']) real_capacity = m['max_capacity'] - tasks_inflight - if (real_capacity and m['active'] and not m['draining']): + if real_capacity and m["active"] and not m["draining"]: tasks = self.get_tasks(real_capacity) if tasks: self.task_outgoing.send_multipart([manager_id, b'', pickle.dumps(tasks)]) @@ -506,19 +506,19 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: tids = [t['task_id'] for t in tasks] m['tasks'].extend(tids) m['idle_since'] = None - logger.debug("Sent tasks: {} to manager {!r}".format(tids, manager_id)) + logger.debug("Sent tasks: %s to manager %r", tids, manager_id) # recompute real_capacity after sending tasks real_capacity = m['max_capacity'] - tasks_inflight if real_capacity > 0: - logger.debug("Manager {!r} has free capacity {}".format(manager_id, real_capacity)) + logger.debug("Manager %r has free capacity %s", manager_id, real_capacity) # ... so keep it in the interesting_managers list else: - logger.debug("Manager {!r} is now saturated".format(manager_id)) + logger.debug("Manager %r is now saturated", manager_id) interesting_managers.remove(manager_id) else: interesting_managers.remove(manager_id) # logger.debug("Nothing to send to manager {}".format(manager_id)) - logger.debug("leaving _ready_managers section, with {} managers still interesting".format(len(interesting_managers))) + logger.debug("leaving _ready_managers section, with %s managers still interesting", len(interesting_managers)) else: logger.debug("either no interesting managers or no tasks, so skipping manager pass") @@ -528,9 +528,9 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ logger.debug("entering results_incoming section") manager_id, *all_messages = self.results_incoming.recv_multipart() if manager_id not in self._ready_managers: - logger.warning("Received a result from a un-registered manager: {!r}".format(manager_id)) + logger.warning(f"Received a result from a un-registered manager: {manager_id!r}") else: - logger.debug(f"Got {len(all_messages)} result items in batch from manager {manager_id!r}") + logger.debug("Got %s result items in batch from manager %r", len(all_messages), manager_id) b_messages = [] @@ -548,10 +548,10 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ monitoring_radio.send(r['payload']) elif r['type'] == 'heartbeat': - logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection") + logger.debug("Manager %r sent heartbeat via results connection", manager_id) b_messages.append((p_message, r)) else: - logger.error("Interchange discarding result_queue message of unknown type: {}".format(r['type'])) + logger.error("Interchange discarding result_queue message of unknown type: %s", r["type"]) got_result = False m = self._ready_managers[manager_id] @@ -560,14 +560,16 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ if r['type'] == 'result': got_result = True try: - logger.debug(f"Removing task {r['task_id']} from manager record {manager_id!r}") + logger.debug("Removing task %s from manager record %r", r["task_id"], manager_id) m['tasks'].remove(r['task_id']) except Exception: # If we reach here, there's something very wrong. - logger.exception("Ignoring exception removing task_id {} for manager {!r} with task list {}".format( + logger.exception( + "Ignoring exception removing task_id %s for manager %r with task list %s", r['task_id'], manager_id, - m['tasks'])) + m["tasks"] + ) b_messages_to_send = [] for (b_message, _) in b_messages: @@ -578,7 +580,7 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ self.results_outgoing.send_multipart(b_messages_to_send) logger.debug("Sent messages on results_outgoing") - logger.debug(f"Current tasks on manager {manager_id!r}: {m['tasks']}") + logger.debug("Current tasks on manager %r: %s", manager_id, m["tasks"]) if len(m['tasks']) == 0 and m['idle_since'] is None: m['idle_since'] = time.time()