Skip to content

Commit

Permalink
Merge pull request #44 from autolab/persistent_job_queue_prodcom
Browse files Browse the repository at this point in the history
Persistent job queue prodcom
  • Loading branch information
Ilter committed Mar 18, 2015
2 parents 0806f99 + 460eea4 commit c3f61db
Show file tree
Hide file tree
Showing 1,061 changed files with 869 additions and 219,276 deletions.
9 changes: 9 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,12 @@ scripts/*.sh
vmms/id_rsa*
# config
config.py

# Virtualenv
.Python
bin
lib
include

# Mac OS X custom attribute files
.DS_Store
21 changes: 13 additions & 8 deletions config.py → config.template.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# config.py - Global configuration constants and runtime info
#

from vmms.localSSH import *
import logging

# Config - defines
Expand All @@ -22,18 +21,17 @@ class Config:
PORT = 3000

# Log file. Setting this to None sends the server output to stdout
LOGFILE = None
LOGFILE = "tango3.log"

# Logging level
LOGLEVEL = logging.INFO
LOGLEVEL = logging.DEBUG

# Courselabs directory. Must be created before starting Tango
COURSELABS = ""
COURSELABS = "<INSERT_PATH_HERE>"

# VMMS to use. Must be set to a VMMS implemented in vmms/ before
# starting Tango
# starting Tango. Options are: "localSSH", "tashiSSH", "ec2SSH"
VMMS_NAME = "localSSH"
VMMS = LocalSSH()

#####
# Part 2: Constants that shouldn't need to change very often.
Expand Down Expand Up @@ -88,7 +86,7 @@ class Config:
POOL_SIZE = 2

# Path for tashi images
TASHI_IMAGE_PATH = ""
TASHI_IMAGE_PATH = "/raid/tashi/images/"

# Optionally log finer-grained timing information
LOG_TIMING = True
Expand All @@ -109,7 +107,14 @@ class Config:
copyout_errors=0

######
# Part 4: EC2 Constants
# Part 4: Settings for shared memory
#
USE_REDIS = False
REDIS_HOSTNAME = "127.0.0.1"
REDIS_PORT = 6379

######
# Part 5: EC2 Constants
#
EC2_REGION = ''
DEFAULT_AMI = ''
Expand Down
93 changes: 93 additions & 0 deletions jobManager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#
# JobManager - Thread that assigns jobs to worker threads
#
# The job manager thread wakes up every so often, scans the job list
# for new unassigned jobs, and tries to assign them.
#
# Assigning a job will try to get a preallocated VM that is ready,
# otherwise will pass 'None' as the preallocated vm. A worker thread
# is launched that will handle things from here on. If anything goes
# wrong, the job is made dead with the error.
#
import time, threading, logging

from config import Config
from worker import Worker

from jobQueue import JobQueue
from preallocator import Preallocator

class JobManager:
    """Dispatcher that assigns pending jobs from the job queue to Workers.

    Creating a JobManager immediately starts a thread running the dispatch
    loop.  On each pass the loop asks the queue for the next pending job,
    marks it assigned, picks a VM for it (possibly None) and starts a Worker.
    Any exception raised while dispatching makes the job dead with the error
    text as the reason.
    """

    def __init__(self, queue, vmms, preallocator):
        """Store the collaborators and start the dispatch thread.

        queue        -- job queue polled for pending jobs
        vmms         -- mapping indexed by job.vm.vmms to get a job's VMMS
        preallocator -- asked for a VM when Config.REUSE_VMS is off
        """
        # NOTE(review): this sets an attribute on the JobManager itself, not
        # on the thread started below, so that thread is not a daemon thread.
        # Left untouched because the stand-alone entry point appears to rely
        # on the thread keeping the process alive -- confirm before changing.
        self.daemon = True
        self.jobQueue = queue
        self.vmms = vmms
        self.preallocator = preallocator
        self.log = logging.getLogger("JobManager")
        threading.Thread(target=self.__manage).start()

    @staticmethod
    def _vmLabel(vm):
        """Return vm.name for logging, or "None" when no VM was found."""
        if vm is None:
            return "None"
        return vm.name

    def __manage(self):
        """Dispatch loop: runs forever, one dispatch attempt per period."""
        while True:
            # Reset every pass so a VM from an earlier iteration can never
            # leak into the current one.
            vm = None
            if Config.REUSE_VMS:
                jobId, vm = self.jobQueue.getNextPendingJobReuse()
            else:
                jobId = self.jobQueue.getNextPendingJob()

            if jobId:
                job = self.jobQueue.get(jobId)
                try:
                    # Mark the job assigned
                    self.jobQueue.assignJob(job.id)

                    # Try to find a vm on the free list and allocate it to
                    # the worker if successful.
                    if Config.REUSE_VMS:
                        preVM = vm
                    else:
                        preVM = self.preallocator.allocVM(job.vm.name)

                    # Now dispatch the job to a worker.  preVM may be None
                    # when no preallocated VM is ready, so the log line must
                    # not dereference it directly; doing so used to raise
                    # AttributeError and kill the job instead of running it.
                    self.log.info("Dispatched job %s:%d to %s [try %d]" %
                                  (job.name, job.id, self._vmLabel(preVM),
                                   job.retries))
                    job.appendTrace("%s|Dispatched job %s:%d [try %d]" %
                                    (time.ctime(time.time()+time.timezone),
                                     job.name, job.id, job.retries))
                    vmms = self.vmms[job.vm.vmms]  # VMMS object for this job
                    Worker(job, vmms, self.jobQueue, self.preallocator,
                           preVM).start()

                except Exception as err:
                    # Anything that goes wrong during dispatch kills the job
                    # and records the error text as the reason.
                    self.jobQueue.makeDead(job.id, str(err))

            # Sleep for a bit and then check again
            time.sleep(Config.DISPATCH_PERIOD)


if __name__ == "__main__":
    # Stand-alone entry point: builds the VMMS named by Config.VMMS_NAME, a
    # Preallocator and a JobQueue, then starts a JobManager on them.  Nothing
    # is started unless Config.USE_REDIS is set.

    if not Config.USE_REDIS:
        # Adjacent literals instead of a backslash continuation inside the
        # string, which glued "stand-alone" and "JobManager" together (or
        # padded them with the continuation line's indentation).
        print("You need to have Redis running to be able to initiate "
              "stand-alone JobManager")
    else:
        vmms = None

        if Config.VMMS_NAME == "localSSH":
            from vmms.localSSH import LocalSSH
            vmms = LocalSSH()
        elif Config.VMMS_NAME == "tashiSSH":
            from vmms.tashiSSH import TashiSSH
            vmms = TashiSSH()
        elif Config.VMMS_NAME == "ec2SSH":
            from vmms.ec2SSH import Ec2SSH
            vmms = Ec2SSH()
        else:
            # Fail here with a clear message rather than registering None as
            # the VMMS and failing later inside the dispatch thread.
            raise ValueError(
                "Unknown Config.VMMS_NAME %r; expected one of "
                "'localSSH', 'tashiSSH', 'ec2SSH'" % (Config.VMMS_NAME,))

        # JobManager looks the VMMS up by name (self.vmms[job.vm.vmms]).
        vmms = {Config.VMMS_NAME: vmms}
        preallocator = Preallocator(vmms)
        queue = JobQueue(preallocator)

        # The constructor starts the dispatch thread.
        JobManager(queue, vmms, preallocator)

        print("Starting the stand-alone Tango JobManager")

Loading

0 comments on commit c3f61db

Please sign in to comment.