Skip to content

Commit

Permalink
working version of task migration with single worker
Browse files Browse the repository at this point in the history
  • Loading branch information
dmwu committed Aug 7, 2016
1 parent deafb30 commit 2b3413b
Show file tree
Hide file tree
Showing 2 changed files with 264 additions and 25 deletions.
82 changes: 57 additions & 25 deletions tailFirst_perTask.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import sys,os,argparse
import random
import heapq
import numpy as np
from numpy import mean
from copy import deepcopy
from itertools import groupby
from operator import itemgetter
from math import floor

def executionTime():
return random.expovariate(0.01)
sample = np.random.pareto(1,1)+1
return round(sample[0],1)

def jobSizeGenerate():
return int(random.expovariate(0.02))
return int(random.expovariate(0.025))

def mockPlace(heap, tasks_num,duration):
candidates=[]
Expand All @@ -28,9 +31,9 @@ def main(jobs_num, worker_num, probRatio=5):
dsrjfSet=[]
speedupOverSTF=[]
speedupOverFIFO=[]
speedupOverSRJF =[]
speedupOverta =[]
speedupOverDSRJF = []
for iteration in range(30):
for iteration in range(20):
workers = [[] for i in range(worker_num)]
schedulers = []
for jobIndex in range(jobs_num):
Expand All @@ -54,15 +57,17 @@ def main(jobs_num, worker_num, probRatio=5):
fifo = FIFO(deepcopy(workers),2)
(ta) = tailAware(deepcopy(workers))
dsrjf = DistributedSRJF(deepcopy(workers))
if(srjf >= dsrjf):
print'*'*30
stfSet.append(stf)
srjfSet.append(srjf)
fifoSet.append(fifo)
taSet.append(ta)
dsrjfSet.append(dsrjf)
speedupOverSRJF.append(float(srjf)/ta)
speedupOverFIFO.append(float(fifo)/ta)
speedupOverSTF.append(float(stf)/ta)
speedupOverDSRJF.append(float(dsrjf)/ta)
speedupOverta.append(float(ta)/srjf)
speedupOverFIFO.append(float(fifo)/srjf)
speedupOverSTF.append(float(stf)/srjf)
speedupOverDSRJF.append(float(dsrjf)/srjf)
print("stf",stf,"srjf",srjf,"dsrjf",dsrjf,"fifo",fifo,"ta",ta)
print ("STF:", mean(stfSet))
print ("SRJF:", mean(srjfSet))
Expand All @@ -73,8 +78,8 @@ def main(jobs_num, worker_num, probRatio=5):
#print ("speedupOverFIFO", speedupOverFIFO)
#print ("len(ta)",len(ta),"len(fifo)",len(fifo),"len(srjf)",len(srjf),"len(workers)",\
# len(set([x for worker in workers for (y,x) in worker])))
print ("speedupOverSRJF==================")
print ("max",max(speedupOverSRJF),"min",min(speedupOverSRJF),"mean",mean(speedupOverSRJF))
print ("speedupOverta==================")
print ("max",max(speedupOverta),"min",min(speedupOverta),"mean",mean(speedupOverta))
print ("speedupOverFIFO==================")
print ("max",max(speedupOverFIFO),"min",min(speedupOverFIFO),"mean",mean(speedupOverFIFO))
print ("speedupOverSTF==================")
Expand Down Expand Up @@ -111,7 +116,9 @@ def SRJF(placements):
timeAccu =[0]*len(placements)
jobOrder =[]
totalRemainTasks = sum([len(x) for x in placements])
JobTracker = {}
JobTotal= {}
updatesOfRemWork=[]

def tallyEachJob(placements):
items = [item for sublist in placements for item in sublist]
items.sort(key=itemgetter(1))
Expand All @@ -128,19 +135,42 @@ def findClosestKey(JobTracker, key):
if k <= key:
return k

JobTracker[0] = tallyEachJob(placements)
while(totalRemainTasks > 0):
for i in range(len(placements)):
if totalRemainTasks > 0:
mostRecentKey = findClosestKey(JobTracker,timeAccu[i])
jobOrder = JobTracker[mostRecentKey]
if(len(placements[i]) > 0):
(duration, jobIndex) = min(placements[i], key=lambda x: jobOrder[x[1]] )
placements[i].remove((duration,jobIndex))
timeAccu[i] += duration
JobTracker[timeAccu[i]]=tallyEachJob(placements)
execLog.append((timeAccu[i],jobIndex))
totalRemainTasks -= 1
def readRemWork(time):
effectiveUpdates = [(y,z) for (x,y,z) in updatesOfRemWork if x<=time]
JTCopy = deepcopy(JobTotal)
for (duration,jobIndex) in effectiveUpdates:
JTCopy[jobIndex] -= duration
return JTCopy



JobTotal= tallyEachJob(placements)
heap =[]
for i in range(len(placements)):
heap.append((0,i))
heapq.heapify(heap)
while(len(heap)>0):
(curTime, workerID) = heapq.heappop(heap)
if(len(placements[workerID]) > 0):
jobOrder = readRemWork(curTime)
(duration, jobIndex) = min(placements[workerID], key=lambda x: jobOrder[x[1]] )
placements[workerID].remove((duration,jobIndex))
updatesOfRemWork.append((curTime+duration,duration,jobIndex))
execLog.append((curTime+duration,jobIndex))
heapq.heappush(heap, (curTime+duration,workerID))

# while(totalRemainTasks > 0):
# for i in range(len(placements)):
# if totalRemainTasks > 0:
# mostRecentKey = findClosestKey(JobTracker,timeAccu[i])
# jobOrder = JobTracker[mostRecentKey]
# if(len(placements[i]) > 0):
# (duration, jobIndex) = min(placements[i], key=lambda x: jobOrder[x[1]] )
# placements[i].remove((duration,jobIndex))
# timeAccu[i] += duration
# JobTracker[timeAccu[i]]=tallyEachJob(placements)
# execLog.append((timeAccu[i],jobIndex))
# totalRemainTasks -= 1

execLog.sort(key=itemgetter(1))
JCT = [reduce(lambda x,y: (max(x[0],y[0]),x[1]), group) for _,group in groupby(execLog,key=itemgetter(1))]
Expand Down Expand Up @@ -225,6 +255,9 @@ def checkAndPerform(worker,budgetsPerWorker,indexOfBottleneck):


if __name__ == "__main__":
placements = [[(3.3, 0), (1.6, 1), (1.6, 1)],
[(1.1, 3), (1.1, 3), (3.3, 0), (1.3, 2)],
[(1.1, 3), (1.3, 2), (1.6, 1), (1.3, 2), (3.3, 0)]]
parser = argparse.ArgumentParser()
parser.add_argument("jobNum", type=int, help= "specify how many jobs")
parser.add_argument("workerNum", type=int, help= "how many workers")
Expand All @@ -234,4 +267,3 @@ def checkAndPerform(worker,budgetsPerWorker,indexOfBottleneck):
#parser.add_argument("iterations", type=int, help="specify the number of iterations",default=1)
args = parser.parse_args()
main(args.jobNum,args.workerNum)

207 changes: 207 additions & 0 deletions taskMigration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
import sys,os,argparse
import numpy as np
import Queue
import math
import random
import logging
from abc import ABCMeta, abstractmethod
from numpy import mean
from copy import deepcopy
from itertools import groupby
from operator import itemgetter
from math import floor
# ---------------------------------------------------------------------------
# Simulation parameters (module-level constants read by the classes below).
# ---------------------------------------------------------------------------
#taskDurations = random.pareto(1,1)+1
# Number of execution queues a Worker creates; also scales the arrival rate
# computed in Simulation.__init__.
numCores = 8
# Copied into Worker.memCapacity; TaskAssign admits a task only while
# usedMem + memDemand stays within it.  Presumably GB, like memLower -- confirm.
memoryCapacity = 128.0
# Multiplier applied to (pareto sample + 1) in taskGeneration, i.e. the
# smallest duration a task can be given.
paretoK = 0.05
# Shape parameter passed to np.random.pareto.  Simulation.__init__ divides by
# (paretoA - 1), hence the constraint below.
paretoA = 1.001 #should be larger than 1
# Upper cap applied to every generated task duration.
maxTaskDuration = 360000.0
# A task's memory demand is interpolated linearly between memLower and
# memUpper according to duration / maxTaskDuration.
memLower = 1 #unit GB
memUpper = 20
# NOTE(review): not referenced anywhere in the visible part of this file.
taskAssignWaitingTime = 3 # unit second
# Extra delay added on top of the queue threshold before a TaskMigration fires.
switchingOverhead = 0.001
# TaskArrival keeps scheduling a further arrival while taskid < numTasks.
numTasks = 30000

class Task(object):
    """A single task moving through the simulated worker.

    Attributes:
        duration: total execution time the task needs.
        remTime: execution time still outstanding; starts equal to
            ``duration``.
        memDemand: memory the task occupies once admitted to a queue.
        taskid: identifier used as the key in the worker's bookkeeping dicts.
    """

    def __init__(self, duration, memDemand, taskid):
        self.taskid = taskid
        self.memDemand = memDemand
        # Nothing has run yet, so the remaining time is the full duration.
        self.duration = self.remTime = duration

def taskGeneration(taskid):
    """Create a Task with a randomly drawn duration and matching memory demand.

    The duration is ``(pareto(paretoA) + 1) * paretoK`` capped at
    ``maxTaskDuration``.  The memory demand grows linearly with the duration,
    from ``memLower`` towards ``memUpper`` (reached at ``maxTaskDuration``).

    Args:
        taskid: identifier stored on the returned Task.

    Returns:
        A new Task instance.
    """
    scaledDraw = (np.random.pareto(paretoA, 1) + 1) * paretoK
    duration = min(float(scaledDraw), maxTaskDuration)
    memSpan = memUpper - memLower
    memDemand = (duration / maxTaskDuration) * memSpan + memLower
    return Task(duration, memDemand, taskid)

class Event(object):
    """Abstract base for everything placed on the simulation event queue."""
    __metaclass__ = ABCMeta

    @abstractmethod
    def __init__(self):
        """Subclasses store whatever state their ``run`` needs."""

    @abstractmethod
    def run(self, current_time):
        """Execute the event at ``current_time``.

        Returns any events that should be added to the queue.
        """

class TaskArrival(Event):
    """Event: a freshly generated task arrives at the worker's central queue.

    Each arrival schedules a TaskAssign for the current instant and, while
    ``taskid < numTasks``, the next TaskArrival after an exponentially
    distributed delay with mean ``interArrivalDelay``.  Task ids therefore
    run from the initial id up to and including ``numTasks``.
    """

    def __init__(self, worker, interArrivalDelay, taskid):
        self.worker = worker
        self.interArrivalDelay = interArrivalDelay
        self.taskid = taskid

    def run(self, currentTime):
        """Enqueue the new task and return the follow-up events.

        Args:
            currentTime: simulation time at which the task arrives.

        Returns:
            A list of ``(time, event)`` tuples: the next arrival (if any)
            followed by the assignment attempt for the current instant.
        """
        task = taskGeneration(self.taskid)
        self.worker.centralQueue.append(task)
        self.worker.arrivalTime[task.taskid] = currentTime
        logging.debug('task arrival (duration, memDemand, id) = (%f,%d,%d) at time %f\n',
                      task.duration, task.memDemand, task.taskid, currentTime)

        newEvents = []
        if self.taskid < numTasks:
            arrivalDelay = random.expovariate(1.0 / self.interArrivalDelay)
            logging.debug("next task will arrival on time %f\n", currentTime + arrivalDelay)
            nextArrival = TaskArrival(self.worker, self.interArrivalDelay, self.taskid + 1)
            newEvents.append((currentTime + arrivalDelay, nextArrival))

        # The assignment attempt must be scheduled for *every* arrival,
        # including the last one: previously the final task was queued but its
        # TaskAssign was dropped, so it could sit in the central queue forever
        # if the worker happened to be idle.
        newEvents.append((currentTime, TaskAssign(self.worker)))
        return newEvents

class TaskAssign(Event):
    """Event: try to admit the head of the central queue to the first queue.

    The head task is admitted only if its memory demand still fits into the
    worker's memory capacity; otherwise nothing happens.
    """

    def __init__(self, worker):
        self.worker = worker

    def run(self, currentTime):
        """Admit at most one task; return a TaskProcessing event if needed.

        Returns:
            ``[(currentTime, TaskProcessing(worker, 0))]`` when the admitted
            task is the only one in the first queue, otherwise ``[]``.
        """
        worker = self.worker
        if not worker.centralQueue:
            return []

        head = worker.centralQueue[0]
        if not (worker.usedMem + head.memDemand <= worker.memCapacity):
            # Not enough free memory: the task stays at the head of the queue.
            return []

        worker.usedMem += head.memDemand
        worker.centralQueue.pop(0)
        firstQueue = worker.queues[0]
        firstQueue.append(head)
        logging.debug("assign task %d on the first queue at time %f\n", head.taskid, currentTime)

        if len(firstQueue) != 1:
            # The first queue was already busy; no new processing event needed.
            return []
        # The first queue was empty before this task, so start processing it.
        return [(currentTime, TaskProcessing(worker, 0))]


class TaskProcessing(Event):
    """Event: decide what happens to the task at the front of one queue.

    If the front task's remaining time fits within the queue's threshold it
    is scheduled to finish; otherwise it is scheduled to migrate to the next
    queue once the threshold (plus the switching overhead) has elapsed.
    """

    def __init__(self, worker, queueid):
        self.worker = worker
        self.queueid = queueid

    def run(self, currentTime):
        """Return the single follow-up event for this queue, if any."""
        worker = self.worker
        queue = worker.queues[self.queueid]

        if not queue:
            if self.queueid == 0 and len(worker.centralQueue) > 0:
                # The first queue drained while tasks are still waiting:
                # pull the next one in.
                logging.debug("the first queue is empty, a new taskAssignEvent is scheduled at time %f\n", currentTime)
                return [(currentTime, TaskAssign(worker))]
            return []

        frontTask = queue[0]
        logging.debug("processing task %d on queue %d at time %f\n", frontTask.taskid, self.queueid, currentTime)

        if frontTask.remTime <= worker.thresholds[self.queueid]:
            # Short enough to complete on this queue.
            finishTime = currentTime + frontTask.remTime
            return [(finishTime, TaskEnd(worker, frontTask, self.queueid))]

        # Too long for this queue; there must be a next queue to move to.
        assert(self.queueid < numCores-1)
        curThreshold = worker.thresholds[self.queueid]
        migration = TaskMigration(worker, frontTask, self.queueid, self.queueid + 1)
        return [(currentTime + switchingOverhead + curThreshold, migration)]



class TaskMigration(Event):
    """Event: move a task that exhausted its queue's threshold to another queue."""

    def __init__(self, worker, task, curID, desID):
        self.worker = worker
        self.task = task
        self.curID = curID
        self.desID = desID
        assert(desID >=1 )

    def run(self, currentTime):
        """Perform the migration and restart processing on the affected queues.

        Returns:
            A TaskProcessing event for the source queue, plus one for the
            destination queue when the migrated task is its only entry.
        """
        worker = self.worker
        # Remove the front of the source queue and charge the task for the
        # threshold it has just consumed there.
        worker.queues[self.curID].pop(0)
        self.task.remTime -= worker.thresholds[self.curID]
        destination = worker.queues[self.desID]
        destination.append(self.task)
        logging.debug("migrating task %d from queue %d to queue %d at time %f\n",
                      self.task.taskid, self.desID-1, self.desID, currentTime)

        followUps = [(currentTime, TaskProcessing(worker, self.curID))]
        if len(destination) == 1:
            # The destination queue was empty, so it needs a kick to start.
            logging.debug("the destination queue is empty\n")
            followUps.append((currentTime, TaskProcessing(worker, self.desID)))
        return followUps


class TaskEnd(Event):
    """Event: a task completes on one of the worker's queues."""

    def __init__(self, worker, task, queueid):
        self.worker = worker
        self.task = task
        self.queueid = queueid

    def run(self, currentTime):
        """Retire the task, record its slowdown, and resume the queue.

        Slowdown is the time from arrival to completion divided by the
        task's duration.

        Returns:
            A single TaskProcessing event for the queue the task ran on.
        """
        worker, task = self.worker, self.task
        logging.debug("task %d end on queue %d at time %f\n", task.taskid, self.queueid, currentTime)

        # Free the queue slot and the memory held by the finished task.
        worker.queues[self.queueid].pop(0)
        task.remTime =0
        worker.usedMem -= task.memDemand

        worker.finishTime[task.taskid] = currentTime
        arrivalTime = worker.arrivalTime[task.taskid]
        slowdown = (currentTime - arrivalTime)/task.duration
        logging.info("task %d arrivals at %f with duration %f, finishs at %f, slowdown is %f\n",
                     task.taskid, arrivalTime, task.duration, currentTime, slowdown)
        worker.slowDownStat[task.taskid] = slowdown

        return [(currentTime, TaskProcessing(worker, self.queueid))]

class Worker(object):
    """State of one simulated machine: memory, queues and per-task statistics."""

    def __init__(self, id):
        self.id = id

        # Memory accounting.
        self.memCapacity = memoryCapacity
        self.usedMem = 0

        # Tasks wait in the central queue until admitted to queues[0].
        self.centralQueue = []
        self.queues = [[] for _ in range(numCores)]

        # Queue k has threshold 0.1 * 2**k; the last queue has no limit.
        self.thresholds = [2**k*0.1 for k in range(numCores-1)] + [np.inf]

        # Per-task records keyed by task id.
        self.slowDownStat = {}
        self.finishTime = {}
        self.arrivalTime = {}

class Simulation(object):
    """Discrete-event simulation of a single worker under a given load.

    Args:
        load: target utilisation; the mean inter-arrival delay is the mean
            (uncapped) task duration ``paretoA*paretoK/(paretoA-1)`` divided
            by ``load * numCores``.
    """

    def __init__(self, load):
        self.load = load
        self.centralQueue = []
        self.interArrivalDelay = (paretoA*paretoK/(paretoA-1))/(load*numCores)
        self.eventQueue = Queue.PriorityQueue()
        self.worker = Worker(0)
        # Monotonic counter used as a tie-breaker for events with equal time.
        self._eventSeq = 0

    def _schedule(self, time, event):
        """Put ``event`` on the queue, ordered by time and then insertion order.

        Entries are ``(time, seq, event)``.  Without ``seq``, two entries with
        the same time would be ordered by comparing the Event objects
        themselves, which is arbitrary on Python 2 and a TypeError on Python 3.
        """
        self.eventQueue.put((time, self._eventSeq, event))
        self._eventSeq += 1

    def run(self):
        """Process events until the queue drains, then log the mean slowdown."""
        self._schedule(0, TaskArrival(self.worker, self.interArrivalDelay, 0))
        lastTime = 0
        while not self.eventQueue.empty():
            (currentTime, _, event) = self.eventQueue.get()
            # Simulated time must never move backwards.
            assert currentTime >= lastTime
            lastTime = currentTime
            for (eventTime, newEvent) in event.run(currentTime):
                self._schedule(eventTime, newEvent)

        # Materialise the values so np.mean receives a real sequence, and
        # report NaN (without a numpy warning) when no task has finished.
        slowdowns = list(self.worker.slowDownStat.values())
        averageSlowdown = np.mean(slowdowns) if slowdowns else float('nan')
        logging.critical("=========average slowdown is=============%f\n",
                         averageSlowdown)


def main():
    """Run one simulation at load 0.7, printing only CRITICAL-level messages."""
    logging.basicConfig(format='%(message)s', level=logging.CRITICAL)
    Simulation(0.7).run()


if __name__ == "__main__":
    main()


0 comments on commit 2b3413b

Please sign in to comment.