Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added parallel search logic #10

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions __init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Package initializer: re-export the public search API at package level.
import os
import sys

# Prepend nothing, append the package directory itself to sys.path so the
# `src` subpackage below resolves no matter where this package is imported
# from (e.g. when vendored as a git submodule).
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# Public API of the package.
from src.gaussian_process_search import GaussianProcessSearch
from src.parallel_searcher import ParallelSearcher
from src.search_job_instance import SearchJobInstance
Empty file added src/__init__.py
Empty file.
23 changes: 14 additions & 9 deletions gaussian_process.py → src/gaussian_process_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from skopt.plots import plot_objective, plot_evaluations
from skopt import dump, load

sys.path.append(os.path.dirname(os.path.abspath(__file__)))
import load_save
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import utilities.io

# Session variables
session_params = {}
Expand Down Expand Up @@ -46,7 +46,7 @@ def __init__(self, search_space, fixed_space, evaluator, input_file=None, output
self.solutions = []
if input_file is not None:
try:
data_dict = load_save.load(data_file=input_file)
data_dict = utilities.io.load(data_file=input_file)
self.x_values, self.y_values = self._extract_values(data_dict)
except OSError as e:
raise OSError('Cannot read input file. \n' + str(e))
Expand Down Expand Up @@ -147,7 +147,7 @@ def add_point_value(self, point, value):
self.x_values.append(p)
self.y_values.append(value)

def get_next_candidate(self, n_points):
def get_next_candidate(self, n_points, n_initial_points=None):
"""Returns the next candidates for the skopt acquisition function

Args:
Expand All @@ -162,10 +162,15 @@ def get_next_candidate(self, n_points):
optimizer = Optimizer(
dimensions=self.search_space,
base_estimator='gp',
n_initial_points=len(self.x_values),
acq_func='EI'
n_initial_points=n_initial_points,
initial_point_generator="random",
acq_func='EI',
n_jobs=-1
)
optimizer.tell(self.x_values, y_values) # TODO Does this fit the values???
# x_values = np.array(self.x_values)
x_values = self.x_values
y_values = y_values
optimizer.tell(x_values, y_values, fit=True)
points = optimizer.ask(n_points=n_points)
return self._to_dict_list(points)

Expand Down Expand Up @@ -263,7 +268,7 @@ def save_values(self):

"""
data_dict = self._pack_values()
load_save.save(self.output_file, data_dict)
utilities.io.save(self.output_file, data_dict)

@staticmethod
def _to_key_value(values):
Expand Down Expand Up @@ -319,4 +324,4 @@ def save_checkpoint(self, res):
res_dict[dimension.name].append(point[i])
res_dict['value'] = y_values

load_save.save(self.output_file, res_dict)
utilities.io.save(self.output_file, res_dict)
80 changes: 80 additions & 0 deletions src/parallel_searcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@

import time

class ParallelSearcher:
    """Coordinate several search jobs running in parallel.

    Candidate points are obtained from the optimizer; every finished job
    reports its result back to the optimizer, which is then asked for the
    next candidate before a replacement job is launched.
    """

    def __init__(self, optimizer, job_class):
        """ Instantiate parallel searching

        Args:
            optimizer (GaussianProcessSearch): Optimizer used to find next points to test
            job_class (SearchJobInstance): Implementation of SearchJobInstance to manage jobs
        """
        self.optimizer = optimizer
        self.job_class = job_class

    def __launch(self, instance, candidate, retrials=5):
        """Launch *instance* with the parameters in *candidate*.

        Retries up to *retrials* times, sleeping briefly between attempts.

        Args:
            instance (SearchJobInstance): Job to launch (launch must be non-blocking).
            candidate (dict): Keyword arguments forwarded to ``instance.launch()``.
            retrials (int): Maximum number of launch attempts.
        """
        launch_status = 1  # instance.launch() returns 0 on success
        attempt = 0
        while launch_status != 0 and attempt < retrials:
            launch_status = instance.launch(**candidate)
            attempt += 1
            if launch_status != 0:
                # BUG FIX: the original concatenated an int (`retrials`) onto a
                # str, raising TypeError on the very first failed launch, and
                # displayed the attempt counter starting from 0.
                print(f"There was some error launching the instance. "
                      f"Retrying ({attempt}/{retrials})")
                time.sleep(0.1)

    def optimize(self,
                 n_calls=10,
                 n_random_starts=5,
                 noise=0.01,
                 n_parallel_jobs=1,
                 refresh_rate=1,
                 first_id=0,
                 verbose=True,
                 plot_results=False):
        """Run the parallel optimization loop.

        Keeps ``n_parallel_jobs`` jobs in flight, polling them every
        ``refresh_rate`` seconds; each finished job's (params, result) pair is
        fed back to the optimizer before a replacement job is launched.

        Args:
            n_calls (int): Evaluation budget; the loop stops once exhausted.
            n_random_starts (int): Forwarded to the optimizer as the number of
                initial points when asking for a new candidate.
            noise (float): Unused; kept for interface compatibility.
            n_parallel_jobs (int): Number of jobs kept running concurrently.
            refresh_rate (float): Seconds between polls of the running jobs.
            first_id (int): Id assigned to the first job instance; subsequent
                jobs get consecutive ids.
            verbose (bool): Unused; kept for interface compatibility.
            plot_results (bool): Unused; kept for interface compatibility.
        """
        # Instantiate all initial jobs
        instances = [self.job_class(i)
                     for i in range(first_id, first_id + n_parallel_jobs)]

        # Get all initial candidates: sample at random until the optimizer has
        # at least one observed point to fit on.
        if len(self.optimizer.x_values) == 0:
            candidates = self.optimizer.get_random_candidate(n_parallel_jobs)
        else:
            candidates = self.optimizer.get_next_candidate(n_parallel_jobs)

        # Launch all initial instances
        for instance, candidate in zip(instances, candidates):
            print(candidate)
            self.__launch(instance=instance, candidate=candidate)
            n_calls -= 1

        while n_calls > 0:
            time.sleep(refresh_rate)  # refresh rate in seconds
            for i in range(n_parallel_jobs):
                instance = instances[i]
                if not instance.done():
                    continue
                n_calls -= 1
                instance_params = instance.passed_args
                instance_result = instance.get_result()

                # Display information
                print("*****")
                print("Finished job:", instance.id)
                print("Instance_params:", instance_params)
                print("Instance_result:", instance_result)
                print("*****")

                # Add point-evaluation info to the optimizer
                self.optimizer.add_point_value(instance_params, instance_result)
                self.optimizer.save_values()

                # BUG FIX: the original launched a replacement unconditionally,
                # spawning jobs after the budget was exhausted whose results
                # were never collected.
                if n_calls <= 0:
                    continue

                # Instantiate and launch a replacement job instance
                candidate = self.optimizer.get_next_candidate(1, n_random_starts)[0]
                instances[i] = self.job_class(instance.id + n_parallel_jobs)
                self.__launch(instance=instances[i], candidate=candidate)

                # Display information
                print("*****")
                print("Starting job:", instances[i])
                print("Instance_params:", candidate)
                print("*****")
44 changes: 44 additions & 0 deletions src/search_job_instance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from abc import ABC, abstractmethod

class SearchJobInstance(ABC):
    """Abstract base class wrapping a single search job.

    Concrete subclasses manage launching, polling and collecting the result
    of one evaluation of the objective function. The caller reads
    ``self.passed_args`` back once the job is done, so subclasses should call
    ``super().launch(**kwargs)`` to record the launch arguments.
    """

    def __init__(self, id):
        # Unique identifier of this job; the parallel searcher derives
        # replacement-job ids from it.
        self.id = id

    @abstractmethod
    def launch(self, **kwargs) -> int:
        """Execute command given the objective arguments
        IMPORTANT: Must be non-blocking!

        Subclasses should call ``super().launch(**kwargs)`` so the passed
        arguments are recorded in ``self.passed_args``.

        Returns:
            status(Int): 0 everything is ok, 1 there was some error
        """
        # BUG FIX: the original raised NotImplementedError right after storing
        # the arguments, which made super().launch() unusable even though the
        # caller relies on `passed_args` being set. @abstractmethod already
        # forces subclasses to override this method.
        self.passed_args = kwargs

    @abstractmethod
    def get_result(self) -> float:
        """Return final result of the optimization
        """
        raise NotImplementedError

    @abstractmethod
    def done(self) -> bool:
        """True if job has finished, false otherwise
        IMPORTANT: Must be non-blocking!
        """
        raise NotImplementedError

    @abstractmethod
    def kill(self) -> None:
        """Finish job
        """
        raise NotImplementedError

    @abstractmethod
    def end(self) -> None:
        """Run any task necessary when done
        """
        raise NotImplementedError
Comment on lines +34 to +44
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these two functions called by anyone? If not, I would not include them in the abstract SearchJobInstance

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are not called now. I thought they might be useful but you are right, they are not needed.

I'll remove them for now :)

Empty file added utilities/__init__.py
Empty file.
File renamed without changes.