This repository has been archived by the owner on Nov 8, 2018. It is now read-only.

Set hostname and port through environment variable #45

Open · wants to merge 2 commits into base: master
Changes from all commits
17 changes: 13 additions & 4 deletions distkeras/networking.py
@@ -3,17 +3,26 @@
 ## BEGIN Imports. ##############################################################

 import pickle

+import os
 import socket

 ## END Imports. ################################################################

 def determine_host_address():
-    """Determines the human-readable host address of the local machine."""
-    host_address = socket.gethostbyname(socket.gethostname())
-
+    """Determines the human-readable host address of the local machine.
+    First checks whether the environment variable DISTKERAS_HOSTNAME is set and,
+    if so, uses it as the host name. Otherwise, falls back to a socket lookup."""
+    host_env_var = 'DISTKERAS_HOSTNAME'
+    host_address = os.environ.get(host_env_var, socket.gethostbyname(socket.gethostname()))
     return host_address

+def determine_port():
+    """Determines the port to connect to on the local machine.
+    First checks whether the environment variable DISTKERAS_PORT is set and,
+    if so, uses it as the port. Otherwise, assigns a default."""
+    port_env_var = 'DISTKERAS_PORT'
+    port = int(os.environ.get(port_env_var, 5000))
+    return port

 def recvall(connection, num_bytes):
     """Reads `num_bytes` bytes from the specified connection.
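For reviewers, a quick sketch of the lookup order the two helpers above implement (not part of the diff; the environment variable names and the port default come from this change, the concrete values are invented for illustration):

import os
import socket

# An explicitly set DISTKERAS_HOSTNAME / DISTKERAS_PORT wins; otherwise the
# previous behaviour applies: resolve the local host name and use port 5000.
os.environ['DISTKERAS_HOSTNAME'] = '10.0.0.5'   # illustrative value
os.environ['DISTKERAS_PORT'] = '6000'           # illustrative value

host = os.environ.get('DISTKERAS_HOSTNAME', socket.gethostbyname(socket.gethostname()))
port = int(os.environ.get('DISTKERAS_PORT', 5000))
print(host, port)  # prints: 10.0.0.5 6000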
47 changes: 20 additions & 27 deletions distkeras/trainers.py
@@ -22,7 +22,7 @@
 from distkeras.utils import set_keras_base_directory
 from distkeras.utils import unpickle_object

-from distkeras.networking import determine_host_address
+from distkeras.networking import determine_host_address, determine_port

 from distkeras.workers import ADAGWorker
 from distkeras.workers import AEASGDWorker
@@ -338,10 +338,10 @@ def train(self, dataframe, shuffle=False):
         if shuffle:
             dataframe = shuffle(dataframe)
         # Check if we need to repartition the dataframe.
-        if num_partitions >= self.num_workers:
-            dataframe = dataframe.coalesce(self.num_workers)
+        if num_partitions >= self.num_ensembles:
+            dataframe = dataframe.coalesce(self.num_ensembles)
         else:
-            dataframe = dataframe.repartition(self.num_workers)
+            dataframe = dataframe.repartition(self.num_ensembles)
         # Start the training procedure.
         self.record_training_start()
         # Train the models in parallel.
@@ -368,12 +368,11 @@ class DistributedTrainer(Trainer):
         num_epoch: int. Number of epochs.
         batch_size: int. Mini-batch size.
         num_workers: int. Number of distributed workers.
-        master_port: int. port number for the parameter server.
         loss_weights: optional list or dict specifying weights for different losses.
     """

     def __init__(self, keras_model, worker_optimizer, loss, metrics=["accuracy"], num_workers=2, batch_size=32,
-                 features_col="features", label_col="label", num_epoch=1, master_port=5000, loss_weights=None):
+                 features_col="features", label_col="label", num_epoch=1, loss_weights=None):
         super(DistributedTrainer, self).__init__(keras_model, loss, worker_optimizer, metrics, loss_weights)
         self.num_workers = num_workers
         self.batch_size = batch_size
@@ -383,7 +382,7 @@ def __init__(self, keras_model, worker_optimizer, loss, metrics=["accuracy"], nu
         self.parameter_server = None
         self.parameter_server_thread = None
         self.master_host = determine_host_address()
-        self.master_port = master_port
+        self.master_port = determine_port()
         self.learning_rate = 1.0

     def set_minibatch_size(self, size):
@@ -558,18 +557,17 @@ class AsynchronousDistributedTrainer(DistributedTrainer):
         num_epoch: int. Number of epochs.
         batch_size: int. Mini-batch size.
         num_workers: int. Number of distributed workers.
-        master_port: int. port number for the parameter server.
         loss_weights: optional list or dict specifying weights for different losses.

     # Note
         By default, the parallelization factor is set to 1.
     """

     def __init__(self, keras_model, worker_optimizer, loss, metrics=["accuracy"], num_workers=2, batch_size=32,
-                 features_col="features", label_col="label", num_epoch=1, master_port=5000, loss_weights=None):
+                 features_col="features", label_col="label", num_epoch=1, loss_weights=None):
         super(AsynchronousDistributedTrainer, self).__init__(keras_model, worker_optimizer, loss, metrics,
                                                              num_workers, batch_size, features_col,
-                                                             label_col, num_epoch, master_port, loss_weights)
+                                                             label_col, num_epoch, loss_weights)
         # Initialize asynchronous methods variables.
         self.parallelism_factor = 1

@@ -665,15 +663,14 @@ class AEASGD(AsynchronousDistributedTrainer):
             Higher values mean that the model is allowed to "explore" its surroundings.
             Smaller values are correlated with less exploration. We use the value
             recommend by the authors.
-        master_port: int. port number for the parameter server.
         loss_weights: optional list or dict specifying weights for different losses.
     """

     def __init__(self, keras_model, worker_optimizer, loss, metrics=["accuracy"], num_workers=2, batch_size=32,
                  features_col="features", label_col="label", num_epoch=1, communication_window=32,
-                 rho=5.0, learning_rate=0.1, master_port=5000, loss_weights=None):
+                 rho=5.0, learning_rate=0.1, loss_weights=None):
         super(AEASGD, self).__init__(keras_model, worker_optimizer, loss, metrics, num_workers,
-                                      batch_size, features_col, label_col, num_epoch, master_port, loss_weights)
+                                      batch_size, features_col, label_col, num_epoch, loss_weights)
         self.communication_window = communication_window
         self.rho = rho
         self.learning_rate = learning_rate
@@ -713,14 +710,13 @@ class DOWNPOUR(AsynchronousDistributedTrainer):
             computed before updating the center variable. For DOWNPOUR we
             recommend small communication windows.
         learning_rate: float. Learning rate.
-        master_port: int. port number for the parameter server.
         loss_weights: optional list or dict specifying weights for different losses.
     """

     def __init__(self, keras_model, worker_optimizer, loss, metrics=["accuracy"], num_workers=2, batch_size=32,
-                 features_col="features", label_col="label", num_epoch=1, communication_window=5, master_port=5000, loss_weights=None):
+                 features_col="features", label_col="label", num_epoch=1, communication_window=5, loss_weights=None):
         super(DOWNPOUR, self).__init__(keras_model, worker_optimizer, loss, metrics, num_workers,
-                                        batch_size, features_col, label_col, num_epoch, master_port, loss_weights)
+                                        batch_size, features_col, label_col, num_epoch, loss_weights)
         self.communication_window = communication_window

     def allocate_worker(self):
@@ -762,15 +758,14 @@ class EAMSGD(AsynchronousDistributedTrainer):
             Smaller values are correlated with less exploration. We use the value
             recommend by the authors.
         momentum: float. Momentum term.
-        master_port: int. port number for the parameter server.
         loss_weights: optional list or dict specifying weights for different losses.
     """

     def __init__(self, keras_model, worker_optimizer, loss, metrics=["accuracy"], num_workers=2, batch_size=32,
                  features_col="features", label_col="label", num_epoch=1, communication_window=32,
-                 rho=5.0, learning_rate=0.1, momentum=0.9, master_port=5000, loss_weights=None):
+                 rho=5.0, learning_rate=0.1, momentum=0.9, loss_weights=None):
         super(EAMSGD, self).__init__(keras_model, worker_optimizer, loss, metrics, num_workers,
-                                      batch_size, features_col, label_col, num_epoch, master_port, loss_weights)
+                                      batch_size, features_col, label_col, num_epoch, loss_weights)
         self.communication_window = communication_window
         self.rho = rho
         self.learning_rate = learning_rate
@@ -808,15 +803,14 @@ class ADAG(AsynchronousDistributedTrainer):
             This parameter describes the number of mini-batches that will be
             computed before updating the center variable. For DOWNPOUR based
             algorithms we recommend large communication windows.
-        master_port: int. port number for the parameter server.
         loss_weights: optional list or dict specifying weights for different losses.
     """

     def __init__(self, keras_model, worker_optimizer, loss, metrics=["accuracy"], num_workers=2, batch_size=32,
-                 features_col="features", label_col="label", num_epoch=1, communication_window=12, master_port=5000, loss_weights=None):
+                 features_col="features", label_col="label", num_epoch=1, communication_window=12, loss_weights=None):
         # Initialize the parent object.
         super(ADAG, self).__init__(keras_model, worker_optimizer, loss, metrics, num_workers,
-                                    batch_size, features_col, label_col, num_epoch, master_port, loss_weights)
+                                    batch_size, features_col, label_col, num_epoch, loss_weights)
         # Set algorithm parameters.
         self.communication_window = communication_window

@@ -858,15 +852,14 @@ class DynSGD(AsynchronousDistributedTrainer):
             This parameter describes the number of mini-batches that will be
             computed before updating the center variable. For DOWNPOUR based
             algorithms we recommend large communication windows.
-        master_port: int. port number for the parameter server.
         loss_weights: optional list or dict specifying weights for different losses.
     """

     def __init__(self, keras_model, worker_optimizer, loss, metrics=["accuracy"], num_workers=2, batch_size=32,
-                 features_col="features", label_col="label", num_epoch=1, communication_window=5, master_port=5000, loss_weights=None):
+                 features_col="features", label_col="label", num_epoch=1, communication_window=5, loss_weights=None):
         # Initialize the parent object.
         super(DynSGD, self).__init__(keras_model, worker_optimizer, loss, metrics, num_workers,
-                                      batch_size, features_col, label_col, num_epoch, master_port, loss_weights)
+                                      batch_size, features_col, label_col, num_epoch, loss_weights)
         # Set algorithm parameters.
         self.communication_window = communication_window

@@ -890,10 +883,10 @@ class Experimental(AsynchronousDistributedTrainer):

     def __init__(self, keras_model, worker_optimizer, loss, metrics=["accuracy"], num_workers=2, batch_size=32,
                  features_col="features", label_col="label", num_epoch=1, communication_window=5,
-                 learning_rate=1.0, master_port=5000, loss_weights=None):
+                 learning_rate=1.0, loss_weights=None):
         # Initialize the parent object.
         super(Experimental, self).__init__(keras_model, worker_optimizer, loss, metrics, num_workers,
-                                            batch_size, features_col, label_col, num_epoch, master_port, loss_weights)
+                                            batch_size, features_col, label_col, num_epoch, loss_weights)
         # Set the algorithm parameters.
         self.communication_window = communication_window
         self.learning_rate = learning_rate
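To summarize the user-facing effect, a hedged migration sketch (the trainer and parameter names are taken from this diff; the tiny Keras model and the host/port values are placeholders, and this assumes keras and dist-keras are importable on the driver):

import os

# Previously: ADAG(..., master_port=5001). With this change the port (and
# optionally the hostname) are taken from the environment before the trainer
# is constructed; the trainer calls determine_host_address()/determine_port().
os.environ['DISTKERAS_HOSTNAME'] = '127.0.0.1'  # placeholder host
os.environ['DISTKERAS_PORT'] = '5001'

from keras.models import Sequential
from keras.layers import Dense
from distkeras.trainers import ADAG

model = Sequential([Dense(2, input_dim=4, activation='softmax')])  # stand-in model

trainer = ADAG(keras_model=model, worker_optimizer='adam',
               loss='categorical_crossentropy', num_workers=4,
               communication_window=12)
print(trainer.master_host, trainer.master_port)  # -> 127.0.0.1 5001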