root/trunk/src/lib/Components/qsim.py @ 1601

Revision 1601, 50.6 KB (checked in by wtang, 5 months ago)

making qsim faster by optimzing derived allocator code (qsim._find_job_location)

Line 
1#!/usr/bin/env python
2
3'''Cobalt Queue Simulator library'''
4
5import ConfigParser
6import copy
7import logging
8import math
9import os
10import os.path
11import random
12import signal
13import sys
14import time
15import urlparse
16
17from ConfigParser import SafeConfigParser, NoSectionError, NoOptionError
18from datetime import datetime
19
20import Cobalt
21import Cobalt.Cqparse
22import Cobalt.Util
23
24from Cobalt.Components.base import Component, exposed, query, automatic, locking
25from Cobalt.Components.cqm import QueueDict, Queue
26from Cobalt.Components.simulator import Simulator
27from Cobalt.Data import Data, DataList
28from Cobalt.Exceptions import ComponentLookupError
29from Cobalt.Proxy import ComponentProxy, local_components
30from Cobalt.Server import XMLRPCServer, find_intended_location
31
32MAXINT = 2021072587
33MIDPLANE_SIZE = 512
34default_SCALE = 2000000
35default_SHAPE = 0.9 
36default_SENSITIVITY = 0.7
37default_SPECIFICITY = 0.9
38default_FAILURE_LOG = "failure.lists"
39logging.basicConfig()
40logger = logging.getLogger('Qsim')
41
42RECOVERYOPT = 1 # by default, the failed job is sent back to the rear of the queue
43CHECKPOINT = False  #not used in this version
44MTTR = 1200   #time to repair partition(in sec), a failed partition will be available again in MTTR seconds,
45FRACTION = 1  #factor to tune workload, the times between job arrival will be multipled with FRACTION.(1 means no change.)
46SET_event = set(['I', 'Q', 'S', 'E', 'F', 'R'])
47FAULTAWARE = False
48IDEALWALLTIME = True
49
50def parseline(line):
51    '''parse a line in work load file, return a temp
52    dictionary with parsed fields in the line'''
53    temp = {}
54    firstparse = line.split(';')
55    temp['EventType'] = firstparse[1]
56    if temp['EventType'] == 'Q':
57        temp['submittime'] = firstparse[0]
58    temp['jobid'] = firstparse[2]
59    substr = firstparse.pop()
60    if len(substr) > 0:
61        secondparse = substr.split(' ')
62        for item in secondparse:
63            tup = item.partition('=')
64            if not temp.has_key(tup[0]):
65                temp[tup[0]] = tup[2]
66    return temp
67
68def parse_work_load(filename):
69    '''parse the whole work load file, return a raw job dictionary''' 
70    temp = {'jobid':'*', 'submittime':'*', 'queue':'*', 
71            'Resource_List.walltime':'*','nodes':'*', 'runtime':'*'}
72    # raw_job_dict = { '<jobid>':temp, '<jobid2>':temp2, ...}
73    raw_job_dict = {}
74    wlf = open(filename, 'r')
75    for line in wlf:
76        if line[0].isdigit():
77            line = line.strip('\n')
78            temp = parseline(line)
79            jobid = temp['jobid']
80            #new job id encountered, add a new entry for this job
81            if not raw_job_dict.has_key(jobid):
82                raw_job_dict[jobid] = temp
83            else:  #not a new job id, update the existing entry
84                raw_job_dict[jobid].update(temp)
85    return raw_job_dict
86
87def tune_workload(specs, frac):
88    '''tune workload heavier or lighter'''
89   
90    print "inside tune_workload"
91   
92    def _subtimecmp(spec1, spec2):
93        return cmp(spec1.get('submittime'), spec2.get('submittime'))
94   
95    specs.sort(_subtimecmp)
96       
97    #calc mtbs
98    lastsubtime = 0
99    for spec in specs:
100        if (lastsubtime==0):
101            interval = 0
102        else:
103            interval = spec['submittime'] - lastsubtime
104        lastsubtime =  spec['submittime']
105        spec['interval'] = interval
106   
107     #tune workload heavy or light
108   
109    last_newsubtime = specs[0].get('submittime')
110    i = 0
111    for spec in specs:
112        interval = spec['interval']
113        newsubtime = last_newsubtime + frac* interval
114        spec['submittime'] = newsubtime
115        spec['interval'] = frac* interval
116        last_newsubtime = newsubtime
117        i += 1
118   
119    print "in adjust: last submit job=", specs[len(specs)-1].get('submittime')
120
121def sec_to_date(sec, format="%m/%d/%Y %H:%M:%S"):
122    tmp = datetime.fromtimestamp(sec)
123    fmtdate = tmp.strftime(format)
124    return fmtdate   
125                     
126def date_to_sec(fmtdate, format="%m/%d/%Y %H:%M:%S"):
127    t_tuple = time.strptime(fmtdate, format)
128    sec = time.mktime(t_tuple)
129    return sec
130
131def qsim_quit():
132    print "pid=", os.getpid()
133    os.kill(os.getpid(), signal.SIGINT)
134
135class Job (Data):
136    '''Job for simulation
137    Job attribute description and type:
138    jobid: int
139    submittime:  unix second, float
140    queue: queue name, string
141    walltime: estimate runtime, minutes, string
142    nodes: node number, string
143    runtime: seconds, string
144    remain_time: seconds, float
145    start_time: unix second, float
146    end_time: unix second, float
147    failure_time: unix second, float
148    location: list of string(partition name)
149    state: ['invisible', 'running', 'queued', 'ended', 'pending']  string
150    is_visible: true/false
151    recovery_opt,     #0-4
152    first_subtime: unix second, float, the time that the job sumibitted for the first time
153    enque_time: the time the job start waiting in queue, used by scheduler?
154    '''
155   
156    fields = Data.fields + ["jobid", "submittime", "queue", "walltime",
157                            "nodes","runtime", "start_time", "end_time",
158                            "failure_time", "location", "state", "is_visible", 
159                            "args",
160                            "user",
161                            "system_state",
162                            "starttime",
163                            "project",
164                            "is_runnable",
165                            "is_active",
166                            "has_resources",
167                            #below are qsim specific fields
168                            "remain_time",   
169                            "recovery_opt",     #0-4
170                            "arrival_time",
171                            "enque_time",
172                            "checkpoint",   #0,1
173                            ]   
174
175    def __init__(self, spec):
176        Data.__init__(self, spec)
177        self.tag = 'job'
178        #following fields are initialized at beginning of simulation
179        self.jobid = int(spec.get("jobid"))
180        self.queue = spec.get("queue", "default")
181        #self.queue = "default"
182               
183        self.submittime = spec.get("submittime")   #in seconds
184       
185        self.walltime = spec.get("walltime")   #in minutes
186        self.user = spec.get("user", "unknown")
187        self.project = spec.get("project", "unknown")
188        self.nodes = spec.get("nodes", 0)
189        self.runtime = spec.get("runtime", 0)
190        self.remain_time = float(self.runtime)       
191        self.start_time = spec.get('start_time', '0')
192        self.end_time = spec.get('end_time', '0')
193        self.state = spec.get("state", "invisible")
194        self.system_state = ''
195        self.starttime = 0
196        self.arrival_time = 0
197        self.failure_time = 0
198        self.has_resources = False
199        self.is_runnable = False
200        self.is_visible = False
201        self.args = []
202        self.progress = 0
203        self.recovery_opt = spec.get("recovery_opt", RECOVERYOPT)
204        self.checkpoint = 1
205        self.location = []
206
207class JobList(DataList):
208    '''the list of job objects'''
209    item_cls = Job
210   
211    def __init__(self, _queue):
212        self.queue = _queue
213
214class SimQueue (Queue):
215    '''SimQueue object, extended from cqm.Queue,
216     the attribute jobs is qsim.JobList'''
217   
218    def __init__(self, spec):
219        Queue.__init__(self, spec)
220        self.jobs = JobList(self)
221        self.state = 'running'
222        self.tag = 'queue'
223       
224    def get_joblist(self):
225        '''return the job list'''
226        return self.jobs
227 
228class SimQueueDict(QueueDict):
229    '''Queue Dict class for simulating, extended from cqm.QueueDict'''
230    item_cls = SimQueue
231    key = "name"
232
233    def __init__(self, policy):
234        QueueDict.__init__(self)
235        self.policy = policy
236        #create default queue
237        self.add_queues([{"name":"default", "policy":"default"}])         
238 
239    def add_jobs(self, specs, callback=None, cargs={}):
240        '''add jobs to queues, if specified queue not exist, create one''' 
241        queue_names = self.keys()
242        for spec in specs:
243            if spec['queue'] not in queue_names:
244                spec['queue'] = "default"
245   
246 #               self.add_queues([{"name":spec['queue'], "policy":self.policy}])
247 #               queue_names.append(spec['queue'])
248               
249        results = []
250         # add the jobs to the appropriate JobList
251        for spec in specs:
252            results += self[spec['queue']].jobs.q_add([spec], callback, cargs)
253           
254        return results
255
256class PBSlogger:
257    '''Logger to generate PBS-style event log'''
258
259    def __init__(self, name):
260        #get log directory
261        CP = ConfigParser.ConfigParser()
262        CP.read(Cobalt.CONFIG_FILES)
263        try:
264            self.logdir = os.path.expandvars(CP.get('cqm', 'log_dir'))
265        except ConfigParser.NoOptionError:
266            self.logdir = '.'
267           
268        #determine log filename
269        if name:
270            filename = "%s/qsim-%s.log" % (self.logdir, name)
271        else:
272            self.date = time.localtime()[:3]
273            date_string = "%s_%02d_%02d" % self.date
274            filename = "%s/qsim-%s.log" % (self.logdir, date_string)   
275       
276        self.logfile = open(filename, 'w')
277        self.name = name
278
279    def closeLog(self):
280        self.logfile.close()
281
282    def LogMessage(self, message):
283        '''log message into pbs-style log'''
284        try:
285            self.logfile.write("%s\n" % (message))
286            self.logfile.flush()
287        except IOError, e:
288            logger.error("PBSlogger failure : %s" % e)
289   
290class Qsimulator(Simulator):
291    '''Cobalt Queue Simulator'''
292   
293    implementation = "qsim"
294    name = "queue-manager"
295    alias = Simulator.name
296
297    def __init__(self, *args, **kwargs):
298       
299        print "kwargs= ",  kwargs
300       
301        #initialize partitions
302        Simulator.__init__(self, *args, **kwargs)
303        partnames = self._partitions.keys()
304        self.init_partition(partnames)
305        self.part_size_list = []
306     
307        for part in self.partitions.itervalues():
308            if int(part.size) not in self.part_size_list:
309                self.part_size_list.append(int(part.size))
310        self.part_size_list.sort()
311   
312        #get command line parameters
313        self.FAILURE_FREE = True
314        self.FRACTION = kwargs.get("fraction", 1)
315        self.workload_file =  kwargs.get("workload")
316        self.output_log = kwargs.get("outputlog")
317        self.failure_log = kwargs.get('failurelog')
318       
319        self.weibull = kwargs.get('weibull')
320        if self.weibull:
321            self.SCALE = float(kwargs.get('scale'))
322            if self.SCALE == 0:
323                self.SCALE = default_SCALE
324            self.SHAPE = float(kwargs.get('shape'))
325            if self.SHAPE == 0:
326                self.SHAPE = default_SHAPE
327       
328        self.fault_aware = kwargs.get('faultaware')
329        self.SENSITIVITY = default_SENSITIVITY
330        self.SPECIFICITY = default_SPECIFICITY
331        if self.fault_aware:
332            self.SENSITIVITY = float(kwargs.get('sensitivity', default_SENSITIVITY))
333            self.SPECIFICITY = float(kwargs.get('specificity', defalt_SPECIFICITY))
334               
335        if self.failure_log or self.weibull:
336            self.FAILURE_FREE = False
337       
338        #initialize time stamps and job queues
339        #time stamp format: ('EVENT', 'time_stamp_date', time_stamp_second, {'job_id':str(jobid), 'location':[partition1, partition2,...]})
340        self.time_stamps = [('I', '0', 0, {})]
341        self.cur_time_index = 0
342        self.queues = SimQueueDict(policy=kwargs['policy'])
343        self.init_queues()
344        self.visible_jobs = []
345       
346        #initialize failures
347        self.failure_dict = {}
348        if not self.FAILURE_FREE:
349            if self.failure_log: 
350                #if specified failure log, use log trace failure
351                self.inject_failures()
352            elif self.weibull:
353                #else MAKE failures by Weibull distribution
354                self.make_failures()
355       
356        #initialize PBS-style logger
357        self.pbslog = PBSlogger(self.output_log)
358       
359        #initialize debug logger
360        self.dbglog = PBSlogger(self.output_log+"-debug")
361       
362        #finish tag
363        self.finished = False
364       
365        #tag for controlling time stamp increment
366        self.increment_tag = True
367       
368        #register local alias "system" for this component
369        local_components["system"] = self
370        print "Simulation starts:"
371             
372    def register_alias(self):
373        '''register alternate name for the Qsimulator, by registering in slp
374        with another name for the same location. in this case 'system' is the
375        alternate name'''
376        try:
377            slp = Cobalt.Proxy.ComponentProxy("service-location", defer=False)
378        except ComponentLookupError:
379            print >> sys.stderr, "unable to find service-location"
380            qsim_quit()
381        svc_location = slp.locate(self.name)
382        if svc_location:
383            slp.register(self.alias, svc_location)
384    register_alias = automatic(register_alias, 30)
385   
386    def is_finished(self):
387        return self.finished
388    is_finished = exposed(is_finished)
389   
390    def init_partition(self, namelist):
391        '''add all paritions and apply activate and enable'''
392        func = self.add_partitions
393        args = ([{'tag':'partition', 'name':partname, 'size':"*",
394                  'functional':False, 'scheduled':False, 'queue':"*",
395                  'deps':[]} for partname in namelist],)
396        apply(func, args)
397       
398        func = self.set_partitions
399        args = ([{'tag':'partition', 'name':partname} for partname in namelist],
400                {'scheduled':True, 'functional': True})
401        apply(func, args)
402
403       
404    def get_current_time_event(self):
405        return self.time_stamps[self.cur_time_index][0]
406   
407    def get_current_time(self):
408        '''get current time in date format'''
409        return self.time_stamps[self.cur_time_index][1]
410   
411    def get_current_time_sec(self):
412        return self.time_stamps[self.cur_time_index][2]
413    get_current_time_sec = exposed(get_current_time_sec)
414       
415    def get_current_time_job(self):
416        ret = None
417        if self.time_stamps[self.cur_time_index][3].has_key('jobid'):
418            ret = self.time_stamps[self.cur_time_index][3]['jobid']
419        return ret
420   
421    def get_current_time_partition(self):
422        if self.get_current_time_event() in set(["R","S"]):
423            return self.time_stamps[self.cur_time_index][3]['location']
424        else:
425            return None
426   
427    def get_current_time_stamp(self):
428        '''get current time stamp index'''
429        return self.cur_time_index
430    get_current_time_stamp = exposed(get_current_time_stamp)
431   
432    def get_current_time_stamp_tuple(self):
433        return  self.time_stamps[self.cur_time_index]
434
435    def time_increment(self):
436        '''the current time stamp increments by 1'''
437        if self.cur_time_index < len(self.time_stamps) - 1:
438            self.cur_time_index += 1
439            print " "
440            print str(self.get_current_time()) + \
441            " Time stamp is incremented by 1, current time stamp: " + \
442            str(self.cur_time_index)
443        else:
444            print str(self.get_current_time()) +\
445            " Reached maximum time stamp: %s, simulating finished! " \
446             %  (str(self.cur_time_index))
447            self.finished = True
448            self.pbslog.closeLog()
449            qsim_quit()  #simulation completed, exit!!!
450        return self.cur_time_index
451       
452    def insert_time_stamp(self, new_time_date, event, info):
453        '''insert time stamps in the same order'''
454        if event not in SET_event:
455            print "invalid event type,", event
456            return
457       
458        new_time_sec = date_to_sec(new_time_date)
459        new_time_tuple = (event, new_time_date, new_time_sec, info)
460               
461        pos = len(self.time_stamps)
462       
463        while new_time_sec < self.time_stamps[pos-1][2]:
464            pos = pos - 1
465   
466        self.time_stamps.insert(pos, new_time_tuple)
467        #print "insert time stamp ", new_time_tuple, " at pos ", pos
468        return pos
469   
470    def init_queues(self):
471        '''parses the work load log file, initializes queues and sorted time
472        stamp list'''
473       
474        raw_jobs = parse_work_load(self.workload_file)
475        specs = []
476       
477        tag = 0
478        for key in raw_jobs:
479            spec = {'valid':True}
480            tmp = raw_jobs[key]
481           
482            spec['jobid'] = tmp.get('jobid')
483            spec['queue'] = tmp.get('queue')
484           
485            #convert submittime from "%m/%d/%Y %H:%M:%S" to Unix time sec
486            format_sub_time = tmp.get('submittime')
487            if format_sub_time:
488                spec['submittime'] = date_to_sec(format_sub_time)
489                spec['first_subtime'] = spec['submittime']  #set the first submit time               
490            else:
491                spec['valid'] = False
492               
493            #convert walltime from 'hh:mm:ss' to float of minutes
494            format_walltime = tmp.get('Resource_List.walltime')
495            spec['walltime'] = 0
496            if format_walltime:
497                segs = format_walltime.split(':')
498                spec['walltime'] = str(int(segs[0])*60 + int(segs[1]))
499            else:  #invalid job entry, discard
500                spec['valid'] = False
501               
502            if tmp.get('start') and tmp.get('end'):
503                act_run_time = float(tmp.get('end')) - float(tmp.get('start'))
504                spec['runtime'] = str(round(act_run_time, 1))
505               
506                if IDEALWALLTIME:
507                    wtime = (round(act_run_time / 60, 2) + float(spec['walltime']))/2
508                    #wtime = act_run_time / 60
509                    spec['walltime'] = str(round(wtime, 2))
510            else:
511                spec['valid'] = False
512               
513            if tmp.get('Resource_List.nodect'):
514                spec['nodes'] = tmp.get('Resource_List.nodect')
515
516            else:  #invalid job entry, discard
517                spec['valid'] = False
518             
519            if tmp.get('user'):
520                spec['user'] = tmp.get('user')
521            if tmp.get('project'):
522                spec['project'] = tmp.get('project')           
523
524            spec['state'] = 'invisible'
525            spec['start_time'] = '0'
526            spec['end_time'] = '0'
527           
528            #add the job spec to the spec list           
529            if spec['valid'] == True:
530                specs.append(spec)
531               
532        #adjust workload density
533        if FRACTION != 1:
534            tune_workload(specs, FRACTION)
535            print "workload adjusted: last submit job=", specs[len(specs)-1].get('submittime')
536       
537        print "Initializing jobs and time stamps list, wait one moment... ..."
538        for spec in specs:
539            format_sub_time = sec_to_date(spec['submittime']) 
540            if not self.time_stamps.__contains__(format_sub_time):
541                    self.insert_time_stamp(format_sub_time, 'Q', {'jobid':str(spec['jobid'])})
542       
543        print "total job number:", len(specs)
544        self.add_jobs(specs)
545
546        return 0
547   
548    def log_job_event(self, eventtype, timestamp, spec):
549        '''log job events(Queue,Start,End) to PBS-style log'''
550        def len2 (_input):
551            _input = str(_input)
552            if len(_input) == 1:
553                return "0" + _input
554            else:
555                return _input
556        if eventtype == 'Q':  #submitted(queued) for the first time
557            message = "%s;Q;%d;queue=%s" % (timestamp, spec['jobid'], spec['queue'])
558        elif eventtype == 'R':  #resume running after failure recovery
559            message = "%s;R;%s" % (timestamp, ":".join(spec['location']))
560        else:
561            wall_time = spec['walltime']
562            walltime_minutes = len2(int(float(wall_time)) % 60)
563            walltime_hours = len2(int(float(wall_time)) // 60)
564            log_walltime = "%s:%s:00" % (walltime_hours, walltime_minutes)
565            if eventtype == 'S':  #start running
566                message = "%s;S;%d;queue=%s qtime=%s Resource_List.nodect=%s Resource_List.walltime=%s start=%s exec_host=%s" % \
567                (timestamp, spec['jobid'], spec['queue'], spec['submittime'], 
568                 spec['nodes'], log_walltime, spec['start_time'], ":".join(spec['location']))
569            elif eventtype == 'E':  #end
570                message = "%s;E;%d;queue=%s qtime=%s Resource_List.nodect=%s Resource_List.walltime=%s start=%s end=%f exec_host=%s runtime=%s" % \
571                (timestamp, spec['jobid'], spec['queue'], spec['submittime'], spec['nodes'], log_walltime, spec['start_time'], 
572                 round(float(spec['end_time']), 1), ":".join(spec['location']),
573                 spec['runtime'])
574            elif eventtype == 'F':  #failure
575                frag_runtime = round(float(spec['failure_time']) - float(spec['start_time']), 1)  #running time before failure(after the latest start)
576                message = "%s;F;%d;queue=%s qtime=%s Resource_List.nodect=%s Resource_List.walltime=%s exec_host=%s start=%s frag_runtime=%s complete=%f" % \
577                (timestamp, spec['jobid'], spec['queue'], spec['submittime'], 
578                 spec['nodes'], log_walltime, ":".join(spec['location']), spec['start_time'], 
579                 frag_runtime, round(frag_runtime / float(spec['runtime']), 2)
580                )
581            elif eventtype == 'P':  #pending
582                message = "%s;P;%d;queue=%s qtime=%s Resource_List.nodect=%s Resource_List.walltime=%s exec_host=%s start=%s" % \
583                (timestamp, spec['jobid'], spec['queue'], spec['submittime'], 
584                 spec['nodes'], log_walltime, ":".join(spec['location']), spec['start_time'], 
585                )
586            else:
587                print "invalid event type, type=", type
588                return
589        self.pbslog.LogMessage(message)
590               
591    def get_new_states(self, jobspec):
592        '''return the new state updates of a specific job at specific time
593        stamp, including invisible->queued, running->ended'''
594       
595        updates = {}
596        curstate = jobspec['state']
597        newstate = curstate
598        job_id = jobspec['jobid']
599       
600        cur_event = self.get_current_time_event()
601 
602        #handle job submssion event       
603        if cur_event == 'Q' and curstate == "invisible":
604            newstate = "queued"
605            updates['is_runnable'] = True
606            updates['is_visible'] = True
607            self.log_job_event('Q', self.get_current_time(), jobspec)
608       
609        #handle job completion event
610        elif cur_event == 'E' and curstate == "running":
611            newstate = "ended"
612            updates['is_runnable'] = False
613            updates['has_resources'] = False
614            updates['is_visible'] = False
615           
616            #release partition immediately
617            partitions = jobspec['location']
618            for partition in partitions:
619                self.release_partition(partition)
620            self.queues.del_jobs([{'jobid':job_id}])
621           
622            #write to output log
623            if jobspec['end_time']:
624                end = float(jobspec['end_time'])
625            else:
626                end = 0
627            end_datetime = sec_to_date(end)                                                                                                   
628            self.log_job_event('E', end_datetime, jobspec)
629       
630        #handle job failure event       
631        elif cur_event == 'F' and curstate == "running":
632            print "entered failure handling"
633 
634            #release partition
635            partitions = jobspec['location']
636            for partition in partitions:
637                print "partition %s start repairing" % (partition)
638                self.start_repair_partition(partition)
639             
640            #write to output log
641            if jobspec['failure_time']:
642                fail = float(jobspec['failure_time'])
643            else:
644                fail = 0
645            failure_datetime = sec_to_date(fail)
646            self.log_job_event('F', failure_datetime, jobspec)
647            print self.get_current_time(), " job %d failed at %s!!" % (job_id, ":".join(jobspec['location']))
648           
649            rec_updates = self.recovery_mgr(jobspec)
650           
651            if not rec_updates == {}:
652                updates.update(rec_updates)
653               
654            updates['has_resources'] = False
655               
656            if updates.has_key('state'):
657                newstate = updates['state']
658           
659            if CHECKPOINT:
660                print "enter checkpoint handling****"
661                #runtime before failed after latest start
662                frag_runtime = float(jobspec['failure_time']) - float(jobspec['start_time'])
663                updates['remain_time'] = jobspec['remain_time'] - frag_runtime
664               
665        else:#other event
666            pass
667       
668        if updates and not curstate == newstate:
669            print self.get_current_time(), "state changed, job", job_id, \
670             ":", curstate, "->", newstate
671            updates['state'] = newstate
672     
673        return updates
674   
675    def update_job_states(self, specs, updates):
676        '''update the state of the jobs associated to the current time stamp'''
677       
678        def _update_job_states(job, newattr):
679            '''callback function to update job states'''
680            temp = job.to_rx()
681            newattr = self.get_new_states(temp)
682            if newattr:
683                temp.update(newattr)
684                job.update(newattr)
685                   
686        ids_str = self.get_current_time_job()
687        ids = ids_str.split(':')
688        cur_event = self.get_current_time_event()
689        for id in ids:
690            for spec in specs:
691                spec['jobid'] = int(id)
692            ret_jobs = self.queues.get_jobs(specs, _update_job_states, updates)
693            if cur_event == "Q":
694                self.visible_jobs.extend(ret_jobs)
695            elif cur_event=="E":
696                self.visible_jobs = [j for j in self.visible_jobs if j not in ret_jobs]
697        return 0
698     
699    def run_job_updates(self, jobspec, newattr):
700        ''' return the state updates (including state queued -> running,
701        setting the start_time, end_time)'''
702        updates = {}
703       
704        #print "enter run_job_updates, jobspec=", jobspec
705       
706        start = self.get_current_time_sec()
707        updates['start_time'] = start
708        updates['starttime'] = start
709
710        updates['state'] = 'running'
711        updates['system_state'] = 'running'
712        updates['is_runnable'] = False
713        updates['has_resources'] = True
714
715        print self.get_current_time(), "run job state change, job", jobspec['jobid'], \
716             ":", jobspec['state'], "->", updates['state']
717             
718        #determine whether the job is going to fail before completion
719        location = newattr['location']
720        duration = jobspec['remain_time']
721        #print "duration=", duration
722        nearest_failure = self.get_next_failure(location, start, duration)
723        if (nearest_failure):
724            updates['failure_time'] = date_to_sec(nearest_failure)
725            new_time_stamp = nearest_failure
726            self.insert_time_stamp(new_time_stamp, 'F', {'jobid':str(jobspec['jobid'])})
727        else:  # will complete
728            end = start + duration
729            updates['end_time'] = end
730            new_time_stamp = sec_to_date(end)
731            #print "new_time_stamp=", new_time_stamp
732            self.insert_time_stamp(new_time_stamp, 'E', {'jobid':str(jobspec['jobid'])})
733       
734        updates.update(newattr)
735   
736        return updates
737 
738    def start_job(self, specs, updates):
739        '''update the job state and start_time and end_time when cqadm --run
740        is issued to a group of jobs'''
741        partitions = updates['location']
742        for partition in partitions:
743            self.reserve_partition(partition)
744           
745        def _start_job(job, newattr):
746            '''callback function to update job start/end time'''
747            temp = job.to_rx()
748            newattr = self.run_job_updates(temp, newattr)
749            temp.update(newattr)
750            job.update(newattr)
751            self.log_job_event('S', self.get_current_time(), temp)
752        return self.queues.get_jobs(specs, _start_job, updates)
753   
754    def add_jobs(self, specs):
755        '''Add a job, currently for unit test only'''
756        response = self.queues.add_jobs(specs)
757        return response
758    add_jobs = exposed(query(add_jobs))
759   
760    def get_jobs(self, specs):
761        '''get a list of jobs, each time triggers time stamp increment and job
762        states update'''
763
764        jobs = []
765        if self.increment_tag:
766            self.time_increment()
767            eventtype = self.get_current_time_event()
768            print "current event type====", eventtype
769            if eventtype == "R":
770                self.release_repaired_partition()
771               
772                #if the repaired job associated with some pending jobs,
773                #returen empty list to scheduler, in order to ensure the next
774                #time stamp will restart the pending job other than scheduling other jobs at this time stamp
775                #this will avoid run multiple jobs on the same partition(once a bug, solved)
776                if self.get_current_time_job():
777                    return jobs
778                   
779            elif eventtype == "S":
780               
781                self.restart_pending_job() 
782                return jobs
783           
784            else:
785                self.update_job_states(specs, {})
786           
787        if len(self.recovering_jobs) > 0:
788            self.update_recovering_jobs({})
789       
790        self.increment_tag = True
791       
792        jobs = self.visible_jobs           
793#        print "running jobs=", [job.jobid for job in self.running_jobs]
794#        print "queueing jobs=", [job.jobid for job in self.queuing_jobs]
795#        print "visible jobs=", [job.jobid for job in self.visible_jobs]
796#        print "return jobs=", len(jobs)
797
798        return jobs
799    get_jobs = exposed(query(get_jobs))
800   
801    def update_recovering_jobs(self, updates):
802        print "enter update_recovering_jobs()"
803       
804        def _update_recovering_jobs(job, newattr):
805            '''callback function to update job states'''
806            temp = job.to_rx()
807            print "temp=", temp
808            newattr = self.recovery_mgr(temp)
809            print "update_recovering_jobs newattr=", newattr
810            print "temp=", temp
811            if newattr:
812                temp.update(newattr)
813                job.update(newattr)
814               
815        ids = [job.jobid for job in self.recovering_jobs]
816        print "ids=", ids
817       
818        ret = self.queues.get_jobs([{'tag':"job", 'state': "recovering"}], _update_recovering_jobs, updates)
819        return 0
820   
821    def _get_queuing_jobs(self):
822        return [job for job in self.visible_jobs if job.is_runnable==True]
823    queuing_jobs = property(_get_queuing_jobs)
824   
825    def _get_running_jobs(self):
826        return [job for job in self.visible_jobs if job.has_resources==True]
827    running_jobs = property(_get_running_jobs)
828   
829    def _get_recovering_jobs(self):
830        return self.queues.get_jobs([{'jobid':"*", 'state':"recovering"}])
831    recovering_jobs = property(_get_recovering_jobs)
832   
833    def get_visible_jobs(self):
834        return self.visible_jobs;
835    get_visible_jobs = exposed(get_visible_jobs)
836   
837    def get_running_jobs(self):
838        return [job for job in self.visible_jobs if job.has_resources==True]
839    get_running_jobs = exposed(get_running_jobs)
840   
841    def get_queuing_jobs(self):
842        return [job for job in self.visible_jobs if job.is_runnable==True]
843    get_queuing_jobs = exposed(get_queuing_jobs)
844   
845    def _get_job_by_id(self, jobid):
846        jobs = self.queues.get_jobs([{'jobid':jobid}])
847        if len(jobs) == 1:
848            return jobs[0]
849        else:
850            return None
851   
852    def add_queues(self, specs):
853        '''add queues'''
854        return self.queues.add_queues(specs)
855    add_queues = exposed(query(add_queues))
856   
857    def get_queues(self, specs):
858        '''get queues'''
859        return self.queues.get_queues(specs)
860    get_queues = exposed(query(get_queues))
861
862    def run_jobs(self, specs, nodelist):
863        '''run a queued job, by updating the job state, start_time and
864        end_time'''
865        print "run job specs=", specs, " on partion", nodelist
866        if specs:
867            self.start_job(specs, {'location': nodelist})
868            #set tag false, enable scheduling another job at the same time
869            self.increment_tag = False
870        #print "current running jobs=", [job.jobid for job in self.running_jobs]
871        return self.running_jobs
872    run_jobs = exposed(query(run_jobs))
873   
874   
875    def get_midplanes(self, partname):
876        '''return a list of sub-partitions each contains 512-nodes(midplane)'''
877        midplane_list = []
878        partition = self._partitions[partname]
879       
880        if partition.size == MIDPLANE_SIZE:
881            midplane_list.append(partname)
882        elif partition.size > MIDPLANE_SIZE:
883            children = partition.children
884            for part in children:
885                if self._partitions[part].size == MIDPLANE_SIZE:
886                    midplane_list.append(part)
887        else:
888            parents = partition.parents
889            for part in parents:
890                if self._partitions[part].size == MIDPLANE_SIZE:
891                    midplane_list.append(part)
892                           
893        return midplane_list
894   
895    def get_next_failure(self, location, now, duration): 
896        '''return the next(closest) failure moment according the partition failure list'''
897       
898        if (self.FAILURE_FREE):
899            return None
900       
901        def _find_next_failure(partname, now):
902            next = None
903            failure_list = self.failure_dict[partname]
904            if failure_list:
905                for fail_time in failure_list:
906                    if date_to_sec(fail_time) > now:
907                        next = fail_time
908                        break
909            return next
910                                       
911        closest_fail_sec = MAXINT
912        partitions = location
913
914        midplanes = set()
915        for partition in partitions:
916            tmp_midplanes = self.get_midplanes(partition)
917            for item in tmp_midplanes:
918                if item not in midplanes:
919                    midplanes.add(item)
920                       
921        for midplane in midplanes:
922            next = _find_next_failure(midplane, now)
923            if (next):
924                next_sec = date_to_sec(next)
925                if next_sec < closest_fail_sec:
926                    closest_fail_sec =next_sec
927                       
928        if closest_fail_sec == MAXINT:
929            next_failure_date = None
930        else:
931            job_end_sec = now + duration
932            if closest_fail_sec < job_end_sec:
933                next_failure_date = sec_to_date(closest_fail_sec)
934            else:
935                next_failure_date = None
936               
937        #print "next_failure_date=", next_failure_date
938               
939        return next_failure_date                 
940
941    def will_job_fail(self, mtbf, nodes, hours):
942        '''simulate static failure chance, [not used]'''
943        return False
944        print "mtbf=%d, nodes=%d, hours=%f" % (mtbf,nodes,hours)
945        failure_chance = 1 - (1 - hours * 1.0/mtbf) ** nodes
946        if failure_chance > 0.7 :
947            failure_chance = 0.7
948        random_num = random.random()
949        print "failure chance=%f, random_num=%f" % (failure_chance, random_num)
950        if random_num < failure_chance:
951            return True
952        else:
953            return False
954       
955    def nodes_static(self):
956        '''static the node requested by each job, [not used]'''
957        jobs = self.queues.get_jobs([{'jobid':"*", 'queue':"*", 'nodes':"*"}])
958        nodesdict = {}
959        for job in jobs:
960            nodes = int(job.nodes)
961            nodesstr = nodes
962            if (nodesdict.has_key(nodesstr)):
963                nodesdict[nodesstr] =  nodesdict[nodesstr] + 1
964            else:
965                nodesdict[nodesstr] = 1
966        keys = nodesdict.keys()
967        keys.sort()
968        for key in keys:
969            print key, ":", nodesdict[key]
970           
971    def gen_failure_list(self, scale, shape, startdate, enddate):
972        '''generate a synthetic failure time list based on weibull distribution
973         and start/end date time'''
974        failure_moments = []
975        ttf_list = []
976               
977        start = date_to_sec(startdate)
978        end = date_to_sec(enddate)
979       
980        cur_failure = start
981       
982        while True:
983            ttf = random.weibullvariate(scale,shape)
984            cur_failure += ttf
985            if cur_failure < end:
986                ttf_list.append(ttf)
987                failure_moments.append(sec_to_date(cur_failure))
988            else:
989                break
990        return failure_moments, ttf_list
991   
992    def make_failures(self):
993        '''generate failure lists for each 512-nodes partition'''
994        ttf_dict = {}
995        start = self.time_stamps[1][1]
996        end = self.time_stamps[len(self.time_stamps)-1][1]
997       
998        for partition in self._partitions.values():
999            if partition.size == MIDPLANE_SIZE:
1000                fl, ttfs = self.gen_failure_list(self.SCALE, self.SHAPE, start, end)
1001                self.failure_dict[partition.name] = fl
1002                ttf_dict[partition.name] = ttfs
1003                       
1004        partnames = self.failure_dict.keys()
1005        partnames.sort()
1006        f = open(default_FAILURE_LOG, "w")
1007        total_f = 0
1008        mtbf = 0
1009        for part in partnames:
1010            f_list = self.failure_dict[part]
1011            print part, " ", f_list
1012            f.write("%s;%s\n" % (part, ";".join(f_list)))
1013            total_f +=  len(f_list)
1014           
1015            ttfs = ttf_dict[part] 
1016            if len(ttfs)==0:
1017                mtbf = 0
1018            else:
1019                total = 0
1020               
1021                for ttf in ttfs:
1022                    total += ttf
1023                    mtbf = total / len(ttfs)
1024        start_sec = date_to_sec(start)
1025        end_sec = date_to_sec(end)
1026        f.write("Total=%d\nMTBF=%f" % (total_f, (end_sec-start_sec)/(total_f*3600)))
1027
1028        f.close()
1029       
1030    def inject_failures(self):
1031        '''parse failure trace log to make failure list for each 1-midplane partition'''
1032               
1033        raw_job_dict = {}
1034        partnames = set(self._partitions.keys())
1035        flog = open(self.failure_log, "r")
1036        self.failure_dict = {}
1037        for line in flog:
1038            print "line=", line
1039            line = line.strip('\n')
1040            parsedline = line.split(";")
1041            print "parsedline=", parsedline
1042            failure_list = []
1043            part = parsedline[0]
1044            if part in partnames:
1045                for i in range(1, len(parsedline)):
1046                    failure_moment = parsedline[i]
1047                    if len(failure_moment) == 0:
1048                        continue
1049                    failure_list.append(failure_moment)
1050                self.failure_dict[part] = failure_list
1051        partnames = self.failure_dict.keys()
1052        partnames.sort()
1053        for part in partnames:
1054            f_list = self.failure_dict[part]
1055            print part, " ", f_list   
1056       
1057    def get_failure_chance(self, location, duration):
1058        now = date_to_sec(self.get_current_time())
1059        next_fail = self.get_next_failure(location, now, duration)
1060        if (next_fail != None):
1061            return self.SENSITIVITY
1062        else:
1063            return 1 - self.SPECIFICITY
1064    get_failure_chance = exposed(get_failure_chance)
1065   
1066    def recovery_mgr(self, jobspec):
1067        """Recovery manager, this function can be extended to support various recovery options.
1068        at this version, the failed job is sent back to the rear of the queue. The extended code
1069        is ready and available at private code branch(wtang)."""
1070   
1071        updates = {}
1072       
1073        updates = self.handle_reque_rear(jobspec) 
1074
1075        recovery_option = jobspec['recovery_opt']
1076        print "rec_opt=", recovery_option
1077       
1078        #if_else structure remains room for recovery option extending   
1079        if recovery_option == 1:
1080            #resubmit the job
1081            #resubmit the job, the submit time changed to NOW
1082            updates = self.handle_reque_rear(jobspec) 
1083       
1084        return updates
1085
1086    def handle_reque_rear(self, jobspec):
1087        '''handle option 1 - resubmit the job to rear of waiting queue'''
1088        updates = {}
1089        updates['state'] = "queued"
1090        updates['start_time'] = 0
1091        updates['submittime'] = self.get_current_time_sec()
1092        return updates
1093       
1094    def start_repair_partition(self, partname):
1095        '''partition failed, assuming get repaired MTTR seconds later'''
1096        now = self.get_current_time_sec()
1097        time_to_repair = now + MTTR
1098        time_to_repair_date = sec_to_date(time_to_repair)
1099        self.insert_time_stamp(time_to_repair_date, "R", {'location':partname})
1100       
1101    def release_repaired_partition(self):
1102        '''enter release_repaired_partition() partition repaired'''
1103        partition = self.get_current_time_partition()
1104        if partition == None:
1105            return False
1106        self.release_partition(partition)
1107        print "partition %s gets repaired" % (partition)
1108        self.log_job_event('R', self.get_current_time(), {'location':partition})
1109        return True
1110   
1111    def restart_pending_job(self):
1112        '''restart jobs that pending for the nodes repair'''
1113        partname = self.get_current_time_partition()
1114        print "enter restart_pending_job() partname=", partname
1115       
1116        ids_str = self.get_current_time_job()
1117        ids = ids_str.split(':')
1118        jobspecs = []
1119        for id in ids:
1120            spec = {'tag':'job', 'jobid':int(id)}
1121            jobspecs.append(spec)
1122        print "restart pending job ", jobspecs, " on repaired partition ", partname
1123        self.run_jobs(jobspecs, [partname])
1124       
1125    def possible_locations(self, job):
1126        '''find the partitions with the size that can right accomodates the job
1127        (returned partions are not necessarily idle)'''
1128        locations = []
1129        proper_partsize = 64
1130        job_nodes = int(job['nodes'])
1131       
1132        for psize in self.part_size_list:
1133            if psize >= job_nodes:
1134                proper_partsize = psize
1135                break             
1136           
1137        for part in self.cached_partitions.itervalues():
1138            if int(part.size) == proper_partsize:
1139                locations.append(part)
1140               
1141        return locations
1142
1143    def _find_job_location(self, args, drain_partitions=set(), backfilling=False):
1144        jobid = args['jobid']
1145        nodes = args['nodes']
1146        queue = args['queue']
1147        utility_score = args['utility_score']
1148        walltime = args['walltime']
1149        forbidden = args.get("forbidden", [])
1150        required = args.get("required", [])
1151
1152        best_score = sys.maxint
1153        best_partition = None
1154       
1155        # get partitions of proper size as the candidates
1156        candidate_partitions = self.possible_locations(args)
1157        #exclude the partitions already drained
1158        if drain_partitions:
1159            candidate_partitions = [part for part in candidate_partitions if part not in drain_partitions]
1160       
1161        now = self.get_current_time_sec()         
1162        for partition in candidate_partitions:
1163           
1164            #skip partitions that are not "idle"
1165            if partition.state != "idle":
1166                continue
1167           
1168            if backfilling:
1169                #skip the partition with too short cutoff to backfill the job
1170                if 60*float(walltime) > (partition.backfill_time - now):
1171                    continue
1172               
1173            # let's check the impact on partitions that would become blocked
1174            score = 0
1175            for p in partition.parents:
1176                if self.cached_partitions[p].state == "idle" and self.cached_partitions[p].scheduled:
1177                    score += 1
1178               
1179                for ch in partition.children:
1180                    score += 0.01
1181               
1182                if (FAULTAWARE):
1183                    Pf = 0
1184                    Pf = self.get_failure_chance(partition.name, 60*float(walltime))
1185                    score += Pf
1186               
1187                # the lower the score, the fewer new partitions will be blocked by this selection
1188                if score < best_score:
1189                    best_score = score
1190                    best_partition = partition
1191                elif score == best_score:
1192                    if partition.name > best_partition.name:
1193                        best_partition = partition
1194
1195        if best_partition:
1196            #print "return bestpartition=",{jobid: [best_partition.name, best_partition.state]}
1197            return {jobid: [best_partition.name]}
1198 
1199    def find_job_location(self, arg_list, end_times):
1200               
1201        best_partition_dict = {}
1202       
1203        if self.bridge_in_error:
1204            print "bridge_in_error"
1205            return {}
1206       
1207        self.cached_partitions = self.partitions
1208
1209        # first, figure out backfilling cutoffs per partition (which we'll also
1210        # use for picking which partition to drain)
1211        job_end_times = {}
1212        for item in end_times:
1213            job_end_times[item[0][0]] = item[1]
1214                   
1215        now = self.get_current_time_sec()
1216        for p in self.cached_partitions.itervalues():
1217            if p.state == "idle":
1218                p.backfill_time = now
1219            else:
1220                p.backfill_time = now + 5*60
1221            p.draining = False
1222       
1223        for p in self.cached_partitions.itervalues():   
1224            if p.name in job_end_times:
1225                if job_end_times[p.name] > p.backfill_time:
1226                    p.backfill_time = job_end_times[p.name]
1227               
1228                for parent_name in p.parents:
1229                    parent_partition = self.cached_partitions[parent_name]
1230                    if p.backfill_time > parent_partition.backfill_time:
1231                        parent_partition.backfill_time = p.backfill_time
1232       
1233        for p in self.cached_partitions.itervalues():
1234            if p.backfill_time == now:
1235                continue
1236           
1237            for child_name in p.children:
1238                child_partition = self.cached_partitions[child_name]
1239                if child_partition.backfill_time == now or child_partition.backfill_time > p.backfill_time:
1240                    child_partition.backfill_time = p.backfill_time
1241       
1242        # first time through, try for starting jobs based on utility scores
1243        drain_partitions = set()
1244        # the sets draining_jobs and cannot_start are for efficiency, not correctness
1245        draining_jobs = set()
1246        cannot_start = set()
1247
1248        for idx in range(len(arg_list)):
1249            winning_job = arg_list[idx]
1250            for jj in range(idx, len(arg_list)):
1251                job = arg_list[jj]
1252               
1253                # this job isn't good enough!
1254                if job['utility_score'] < winning_job['threshold']:
1255                    break
1256
1257                if job['jobid'] not in cannot_start:
1258                    partition_name = self._find_job_location(job, drain_partitions)
1259                    if partition_name:
1260                        best_partition_dict.update(partition_name)
1261                        break
1262               
1263                cannot_start.add(job['jobid'])
1264               
1265                # we already picked a drain location for the winning job
1266                if winning_job['jobid'] in draining_jobs:
1267                    continue
1268
1269                location = self._find_drain_partition(winning_job)
1270                if location is not None:
1271                    for p_name in location.parents:
1272                        drain_partitions.add(self.cached_partitions[p_name])
1273                    for p_name in location.children:
1274                        drain_partitions.add(self.cached_partitions[p_name])
1275                        self.cached_partitions[p_name].draining = True
1276                    drain_partitions.add(location)
1277                    #self.logger.info("job %s is draining %s" % (winning_job['jobid'], location.name))
1278                    #self.dbglog.LogMessage("job %s is draining %s" % (winning_job['jobid'], location.name))
1279                    location.draining = True
1280                    draining_jobs.add(winning_job['jobid'])
1281           
1282            # at this time, we only want to try launching one job at a time
1283            if best_partition_dict:
1284                 
1285            #    msg =  "idx=%s, jj=%s, job=%s, partition=%s" % (idx, jj, job['jobid'],best_partition_dict[job['jobid']])
1286                #print msg               
1287            #    self.dbglog.LogMessage(msg)
1288                break
1289       
1290        # the next time through, try to backfill, but only if we couldn't find anything to start
1291        if not best_partition_dict:
1292           
1293        # arg_list.sort(self._walltimecmp)
1294        #   msg = "try to backfill jobs..."
1295        #   self.dbglog.LogMessage(msg)
1296
1297            for args in arg_list:
1298                partition_name = self._find_job_location(args, backfilling=True)
1299                if partition_name:
1300                    msg = "backfilling job %s(%s)" % (args['jobid'], args['nodes'])
1301                    self.logger.info(msg)
1302                    self.dbglog.LogMessage(msg)
1303                    best_partition_dict.update(partition_name)
1304                    break               
1305                   
1306        # reserve the stuff in the best_partition_dict, as those partitions are allegedly going to
1307        # be running jobs very soon
1308        #
1309        # also, this is the only part of finding a job location where we need to lock anything
1310        #self._partitions_lock.acquire()
1311        try:
1312            for p in self.partitions.itervalues():
1313                # push the backfilling info from the local cache back to the real objects
1314                p.draining = self.cached_partitions[p.name].draining
1315                p.backfill_time = self.cached_partitions[p.name].backfill_time
1316               
1317            for partition_list in best_partition_dict.itervalues():
1318                part = self.partitions[partition_list[0]] 
1319                ##part.reserved_until = self.get_current_time_sec() + 5*60
1320                part.state = "starting job"
1321                for p in part._parents:
1322                    if p.state == "idle":
1323                        p.state = "blocked by starting job"
1324                for p in part._children:
1325                    if p.state == "idle":
1326                        p.state = "blocked by starting job"
1327        except:
1328            self.logger.error("error in find_job_location", exc_info=True)
1329        #self._partitions_lock.release()
1330       
1331        #print "best_partition_dict=", best_partition_dict
1332       
1333        return best_partition_dict
1334    find_job_location = locking(exposed(find_job_location))
Note: See TracBrowser for help on using the browser.