root/trunk/src/lib/Components/qsim.py @ 1600

Revision 1600, 49.3 KB (checked in by wtang, 5 months ago)

change 'location' attribute from string to list, corresponding to the change of new cqm

Line 
1#!/usr/bin/env python
2
3'''Cobalt Queue Simulator library'''
4
5import ConfigParser
6import copy
7import logging
8import math
9import os
10import os.path
11import random
12import signal
13import sys
14import time
15import urlparse
16
17from ConfigParser import SafeConfigParser, NoSectionError, NoOptionError
18from datetime import datetime
19
20import Cobalt
21import Cobalt.Cqparse
22import Cobalt.Util
23
24from Cobalt.Components.base import Component, exposed, query, automatic, locking
25from Cobalt.Components.cqm import QueueDict, Queue
26from Cobalt.Components.simulator import Simulator
27from Cobalt.Data import Data, DataList
28from Cobalt.Exceptions import ComponentLookupError
29from Cobalt.Proxy import ComponentProxy, local_components
30from Cobalt.Server import XMLRPCServer, find_intended_location
31
32MAXINT = 2021072587
33MIDPLANE_SIZE = 512
34default_SCALE = 2000000
35default_SHAPE = 0.9 
36default_SENSITIVITY = 0.7
37default_SPECIFICITY = 0.9
38default_FAILURE_LOG = "failure.lists"
39
40logging.basicConfig()
41logger = logging.getLogger('Qsim')
42
43RECOVERYOPT = 1 # by default, the failed job is sent back to the rear of the queue
44CHECKPOINT = False  #not used in this version
45MTTR = 1200   #time to repair partition(in sec), a failed partition will be available again in MTTR seconds,
46FRACTION = 1  #factor to tune workload, the times between job arrival will be multipled with FRACTION.(1 means no change.)
47SET_event = set(['I', 'Q', 'S', 'E', 'F', 'R'])
48FAULTAWARE = False
49
50def parseline(line):
51    '''parse a line in work load file, return a temp
52    dictionary with parsed fields in the line'''
53    temp = {}
54    firstparse = line.split(';')
55    temp['EventType'] = firstparse[1]
56    if temp['EventType'] == 'Q':
57        temp['submittime'] = firstparse[0]
58    temp['jobid'] = firstparse[2]
59    substr = firstparse.pop()
60    if len(substr) > 0:
61        secondparse = substr.split(' ')
62        for item in secondparse:
63            tup = item.partition('=')
64            if not temp.has_key(tup[0]):
65                temp[tup[0]] = tup[2]
66    return temp
67
68def parse_work_load(filename):
69    '''parse the whole work load file, return a raw job dictionary''' 
70    temp = {'jobid':'*', 'submittime':'*', 'queue':'*', 
71            'Resource_List.walltime':'*','nodes':'*', 'runtime':'*'}
72    # raw_job_dict = { '<jobid>':temp, '<jobid2>':temp2, ...}
73    raw_job_dict = {}
74    wlf = open(filename, 'r')
75    for line in wlf:
76        if line[0].isdigit():
77            line = line.strip('\n')
78            temp = parseline(line)
79            jobid = temp['jobid']
80            #new job id encountered, add a new entry for this job
81            if not raw_job_dict.has_key(jobid):
82                raw_job_dict[jobid] = temp
83            else:  #not a new job id, update the existing entry
84                raw_job_dict[jobid].update(temp)
85    return raw_job_dict
86
87def tune_workload(specs, frac):
88    '''tune workload heavier or lighter'''
89   
90    print "inside tune_workload"
91   
92    def _subtimecmp(spec1, spec2):
93        return cmp(spec1.get('submittime'), spec2.get('submittime'))
94   
95    specs.sort(_subtimecmp)
96       
97    #calc mtbs
98    lastsubtime = 0
99    for spec in specs:
100        if (lastsubtime==0):
101            interval = 0
102        else:
103            interval = spec['submittime'] - lastsubtime
104        lastsubtime =  spec['submittime']
105        spec['interval'] = interval
106   
107     #tune workload heavy or light
108   
109    last_newsubtime = specs[0].get('submittime')
110    i = 0
111    for spec in specs:
112        interval = spec['interval']
113        newsubtime = last_newsubtime + frac* interval
114        spec['submittime'] = newsubtime
115        spec['interval'] = frac* interval
116        last_newsubtime = newsubtime
117        i += 1
118   
119    print "in adjust: last submit job=", specs[len(specs)-1].get('submittime')
120
121def sec_to_date(sec, format="%m/%d/%Y %H:%M:%S"):
122    tmp = datetime.fromtimestamp(sec)
123    fmtdate = tmp.strftime(format)
124    return fmtdate   
125                     
126def date_to_sec(fmtdate, format="%m/%d/%Y %H:%M:%S"):
127    t_tuple = time.strptime(fmtdate, format)
128    sec = time.mktime(t_tuple)
129    return sec
130
131def qsim_quit():
132    print "pid=", os.getpid()
133    os.kill(os.getpid(), signal.SIGINT)
134
135class Job (Data):
136    '''Job for simulation
137    Job attribute description and type:
138    jobid: int
139    submittime:  unix second, float
140    queue: queue name, string
141    walltime: estimate runtime, minutes, string
142    nodes: node number, string
143    runtime: seconds, string
144    remain_time: seconds, float
145    start_time: unix second, float
146    end_time: unix second, float
147    failure_time: unix second, float
148    location: list of string(partition name)
149    state: ['invisible', 'running', 'queued', 'ended', 'pending']  string
150    is_visible: true/false
151    recovery_opt,     #0-4
152    first_subtime: unix second, float, the time that the job sumibitted for the first time
153    enque_time: the time the job start waiting in queue, used by scheduler?
154    '''
155   
156    fields = Data.fields + ["jobid", "submittime", "queue", "walltime",
157                            "nodes","runtime", "start_time", "end_time",
158                            "failure_time", "location", "state", "is_visible", 
159                            "args",
160                            "system_state",
161                            "starttime",
162                            "project",
163                            "is_runnable",
164                            "is_active",
165                            "has_resources",
166                            #below are qsim specific fields
167                            "remain_time",   
168                            "recovery_opt",     #0-4
169                            "arrival_time",
170                            "enque_time",
171                            "checkpoint",   #0,1
172                            ]   
173
174    def __init__(self, spec):
175        Data.__init__(self, spec)
176        self.tag = 'job'
177        #following fields are initialized at beginning of simulation
178        self.jobid = int(spec.get("jobid"))
179        self.queue = spec.get("queue", "default")
180        #self.queue = "default"
181               
182        self.submittime = spec.get("submittime")   #in seconds
183       
184        self.walltime = spec.get("walltime")   #in minutes
185       
186        self.nodes = spec.get("nodes", 0)
187        self.runtime = spec.get("runtime", 0)
188        self.remain_time = float(self.runtime)       
189        self.start_time = spec.get('start_time', '0')
190        self.end_time = spec.get('end_time', '0')
191        self.state = spec.get("state", "invisible")
192        self.system_state = ''
193        self.starttime = 0
194        self.arrival_time = 0
195        self.failure_time = 0
196        self.has_resources = False
197        self.is_runnable = False
198        self.is_visible = False
199        self.args = []
200        self.progress = 0
201        self.recovery_opt = spec.get("recovery_opt", RECOVERYOPT)
202        self.checkpoint = 1
203        self.location = []
204
205class JobList(DataList):
206    '''the list of job objects'''
207    item_cls = Job
208   
209    def __init__(self, _queue):
210        self.queue = _queue
211
212class SimQueue (Queue):
213    '''SimQueue object, extended from cqm.Queue,
214     the attribute jobs is qsim.JobList'''
215   
216    def __init__(self, spec):
217        Queue.__init__(self, spec)
218        self.jobs = JobList(self)
219        self.state = 'running'
220        self.tag = 'queue'
221       
222    def get_joblist(self):
223        '''return the job list'''
224        return self.jobs
225 
226class SimQueueDict(QueueDict):
227    '''Queue Dict class for simulating, extended from cqm.QueueDict'''
228    item_cls = SimQueue
229    key = "name"
230
231    def __init__(self, policy):
232        QueueDict.__init__(self)
233        self.policy = policy
234        #create default queue
235        self.add_queues([{"name":"default", "policy":self.policy}])         
236 
237    def add_jobs(self, specs, callback=None, cargs={}):
238        '''add jobs to queues, if specified queue not exist, create one''' 
239        queue_names = self.keys()
240        for spec in specs:
241            if spec['queue'] not in queue_names:
242                self.add_queues([{"name":spec['queue'], "policy":self.policy}])
243                queue_names.append(spec['queue'])
244               
245        results = []
246         # add the jobs to the appropriate JobList
247        for spec in specs:
248            results += self[spec['queue']].jobs.q_add([spec], callback, cargs)
249           
250        return results
251
252class PBSlogger:
253    '''Logger to generate PBS-style event log'''
254
255    def __init__(self, name):
256        #get log directory
257        CP = ConfigParser.ConfigParser()
258        CP.read(Cobalt.CONFIG_FILES)
259        try:
260            self.logdir = os.path.expandvars(CP.get('cqm', 'log_dir'))
261        except ConfigParser.NoOptionError:
262            self.logdir = '.'
263           
264        #determine log filename
265        if name:
266            filename = "%s/qsim-%s.log" % (self.logdir, name)
267        else:
268            self.date = time.localtime()[:3]
269            date_string = "%s_%02d_%02d" % self.date
270            filename = "%s/qsim-%s.log" % (self.logdir, date_string)   
271       
272        self.logfile = open(filename, 'w')
273        self.name = name
274
275    def closeLog(self):
276        self.logfile.close()
277
278    def LogMessage(self, message):
279        '''log message into pbs-style log'''
280        try:
281            self.logfile.write("%s\n" % (message))
282            self.logfile.flush()
283        except IOError, e:
284            logger.error("PBSlogger failure : %s" % e)
285   
286class Qsimulator(Simulator):
287    '''Cobalt Queue Simulator'''
288   
289    implementation = "qsim"
290    name = "queue-manager"
291    alias = Simulator.name
292
293    def __init__(self, *args, **kwargs):
294       
295        print "kwargs= ",  kwargs
296       
297        #initialize partitions
298        Simulator.__init__(self, *args, **kwargs)
299        partnames = self._partitions.keys()
300        self.init_partition(partnames)
301        self.part_size_list = []
302        for part in self.partitions.itervalues():
303            if int(part.size) not in self.part_size_list:
304                self.part_size_list.append(int(part.size))
305        self.part_size_list.sort()
306   
307        #get command line parameters
308        self.FAILURE_FREE = True
309        self.FRACTION = kwargs.get("fraction", 1)
310        self.workload_file =  kwargs.get("workload")
311        self.output_log = kwargs.get("outputlog")
312        self.failure_log = kwargs.get('failurelog')
313       
314        self.weibull = kwargs.get('weibull')
315        if self.weibull:
316            self.SCALE = float(kwargs.get('scale'))
317            if self.SCALE == 0:
318                self.SCALE = default_SCALE
319            self.SHAPE = float(kwargs.get('shape'))
320            if self.SHAPE == 0:
321                self.SHAPE = default_SHAPE
322       
323        self.fault_aware = kwargs.get('faultaware')
324        self.SENSITIVITY = default_SENSITIVITY
325        self.SPECIFICITY = default_SPECIFICITY
326        if self.fault_aware:
327            self.SENSITIVITY = float(kwargs.get('sensitivity', default_SENSITIVITY))
328            self.SPECIFICITY = float(kwargs.get('specificity', defalt_SPECIFICITY))
329               
330        if self.failure_log or self.weibull:
331            self.FAILURE_FREE = False
332       
333        #initialize time stamps and job queues
334        #time stamp format: ('EVENT', 'time_stamp_date', time_stamp_second, {'job_id':str(jobid), 'location':[partition1, partition2,...]})
335        self.time_stamps = [('I', '0', 0, {})]
336        self.cur_time_index = 0
337        self.queues = SimQueueDict(policy=kwargs['policy'])
338        self.init_queues()
339       
340        #initialize failures
341        self.failure_dict = {}
342        if not self.FAILURE_FREE:
343            if self.failure_log: 
344                #if specified failure log, use log trace failure
345                self.inject_failures()
346            elif self.weibull:
347                #else MAKE failures by Weibull distribution
348                self.make_failures()
349       
350        #initialize PBS-style logger
351        self.pbslog = PBSlogger(self.output_log)
352       
353        #finish tag
354        self.finished = False
355       
356        #tag for controlling time stamp increment
357        self.increment_tag = True
358       
359        #register local alias "system" for this component
360        local_components["system"] = self
361        print "Simulation starts:"
362             
363    def register_alias(self):
364        '''register alternate name for the Qsimulator, by registering in slp
365        with another name for the same location. in this case 'system' is the
366        alternate name'''
367        try:
368            slp = Cobalt.Proxy.ComponentProxy("service-location", defer=False)
369        except ComponentLookupError:
370            print >> sys.stderr, "unable to find service-location"
371            qsim_quit()
372        svc_location = slp.locate(self.name)
373        if svc_location:
374            slp.register(self.alias, svc_location)
375    register_alias = automatic(register_alias, 30)
376   
377    def is_finished(self):
378        return self.finished
379    is_finished = exposed(is_finished)
380   
381    def init_partition(self, namelist):
382        '''add all paritions and apply activate and enable'''
383        func = self.add_partitions
384        args = ([{'tag':'partition', 'name':partname, 'size':"*",
385                  'functional':False, 'scheduled':False, 'queue':"*",
386                  'deps':[]} for partname in namelist],)
387        apply(func, args)
388       
389        func = self.set_partitions
390        args = ([{'tag':'partition', 'name':partname} for partname in namelist],
391                {'scheduled':True, 'functional': True})
392        apply(func, args)
393
394       
395    def get_current_time_event(self):
396        return self.time_stamps[self.cur_time_index][0]
397   
398    def get_current_time(self):
399        '''get current time in date format'''
400        return self.time_stamps[self.cur_time_index][1]
401   
402    def get_current_time_sec(self):
403        return self.time_stamps[self.cur_time_index][2]
404    get_current_time_sec = exposed(get_current_time_sec)
405       
406    def get_current_time_job(self):
407        ret = None
408        if self.time_stamps[self.cur_time_index][3].has_key('jobid'):
409            ret = self.time_stamps[self.cur_time_index][3]['jobid']
410        return ret
411   
412    def get_current_time_partition(self):
413        if self.get_current_time_event() in set(["R","S"]):
414            return self.time_stamps[self.cur_time_index][3]['location']
415        else:
416            return None
417   
418    def get_current_time_stamp(self):
419        '''get current time stamp index'''
420        return self.cur_time_index
421    get_current_time_stamp = exposed(get_current_time_stamp)
422   
423    def get_current_time_stamp_tuple(self):
424        return  self.time_stamps[self.cur_time_index]
425
426    def time_increment(self):
427        '''the current time stamp increments by 1'''
428        if self.cur_time_index < len(self.time_stamps) - 1:
429            self.cur_time_index += 1
430            print " "
431            print str(self.get_current_time()) + \
432            " Time stamp is incremented by 1, current time stamp: " + \
433            str(self.cur_time_index)
434        else:
435            print str(self.get_current_time()) +\
436            " Reached maximum time stamp: %s, simulating finished! " \
437             %  (str(self.cur_time_index))
438            self.finished = True
439            self.pbslog.closeLog()
440            qsim_quit()  #simulation completed, exit!!!
441        return self.cur_time_index
442       
443    def insert_time_stamp(self, new_time_date, event, info):
444        '''insert time stamps in the same order'''
445        if event not in SET_event:
446            print "invalid event type,", event
447            return
448       
449        new_time_sec = date_to_sec(new_time_date)
450        new_time_tuple = (event, new_time_date, new_time_sec, info)
451               
452        pos = len(self.time_stamps)
453       
454        while new_time_sec < self.time_stamps[pos-1][2]:
455            pos = pos - 1
456   
457        self.time_stamps.insert(pos, new_time_tuple)
458        #print "insert time stamp ", new_time_tuple, " at pos ", pos
459        return pos
460   
461    def init_queues(self):
462        '''parses the work load log file, initializes queues and sorted time
463        stamp list'''
464       
465        raw_jobs = parse_work_load(self.workload_file)
466        specs = []
467       
468        tag = 0
469        for key in raw_jobs:
470            spec = {'valid':True}
471            tmp = raw_jobs[key]
472           
473            spec['jobid'] = tmp.get('jobid')
474            spec['queue'] = tmp.get('queue')
475           
476            #convert submittime from "%m/%d/%Y %H:%M:%S" to Unix time sec
477            format_sub_time = tmp.get('submittime')
478            if format_sub_time:
479                spec['submittime'] = date_to_sec(format_sub_time)
480                spec['first_subtime'] = spec['submittime']  #set the first submit time               
481            else:
482                spec['valid'] = False
483               
484            #convert walltime from 'hh:mm:ss' to float of minutes
485            format_walltime = tmp.get('Resource_List.walltime')
486            if format_walltime:
487                segs = format_walltime.split(':')
488                spec['walltime'] = str(int(segs[0])*60 + int(segs[1]))
489            else:  #invalid job entry, discard
490                spec['valid'] = False
491           
492            if tmp.get('start') and tmp.get('end'):
493                act_run_time = float(tmp.get('end')) - float(tmp.get('start'))
494                spec['runtime'] = str(round(act_run_time, 1))
495            else:
496                spec['valid'] = False
497               
498            if tmp.get('Resource_List.nodect'):
499                spec['nodes'] = tmp.get('Resource_List.nodect')
500            else:  #invalid job entry, discard
501                spec['valid'] = False
502           
503            spec['state'] = 'invisible'
504            spec['start_time'] = '0'
505            spec['end_time'] = '0'
506           
507            #add the job spec to the spec list           
508            if spec['valid'] == True:
509                specs.append(spec)
510               
511        #adjust workload density
512        if FRACTION != 1:
513            tune_workload(specs, FRACTION)
514            print "workload adjusted: last submit job=", specs[len(specs)-1].get('submittime')
515       
516        print "Initializing jobs and time stamps list, wait one moment... ..."
517        for spec in specs:
518            format_sub_time = sec_to_date(spec['submittime']) 
519            if not self.time_stamps.__contains__(format_sub_time):
520                    self.insert_time_stamp(format_sub_time, 'Q', {'jobid':str(spec['jobid'])})
521       
522        print "total job number:", len(specs)
523        self.add_jobs(specs)
524
525        return 0
526   
527    def log_job_event(self, eventtype, timestamp, spec):
528        '''log job events(Queue,Start,End) to PBS-style log'''
529        def len2 (_input):
530            _input = str(_input)
531            if len(_input) == 1:
532                return "0" + _input
533            else:
534                return _input
535        if eventtype == 'Q':  #submitted(queued) for the first time
536            message = "%s;Q;%d;queue=%s" % (timestamp, spec['jobid'], spec['queue'])
537        elif eventtype == 'R':  #resume running after failure recovery
538            message = "%s;R;%s" % (timestamp, ":".join(spec['location']))
539        else:
540            wall_time = spec['walltime']
541            walltime_minutes = len2(int(float(wall_time)) % 60)
542            walltime_hours = len2(int(float(wall_time)) // 60)
543            log_walltime = "%s:%s:00" % (walltime_hours, walltime_minutes)
544            if eventtype == 'S':  #start running
545                message = "%s;S;%d;queue=%s qtime=%s Resource_List.ncpus=%s Resource_List.walltime=%s start=%s exec_host=%s" % \
546                (timestamp, spec['jobid'], spec['queue'], spec['submittime'], 
547                 spec['nodes'], log_walltime, spec['start_time'], ":".join(spec['location']))
548            elif eventtype == 'E':  #end
549                message = "%s;E;%d;queue=%s qtime=%s Resource_List.ncpus=%s Resource_List.walltime=%s start=%s end=%f exec_host=%s runtime=%s" % \
550                (timestamp, spec['jobid'], spec['queue'], spec['submittime'], spec['nodes'], log_walltime, spec['start_time'], 
551                 round(float(spec['end_time']), 1), ":".join(spec['location']),
552                 spec['runtime'])
553            elif eventtype == 'F':  #failure
554                frag_runtime = round(float(spec['failure_time']) - float(spec['start_time']), 1)  #running time before failure(after the latest start)
555                message = "%s;F;%d;queue=%s qtime=%s Resource_List.ncpus=%s Resource_List.walltime=%s exec_host=%s start=%s frag_runtime=%s complete=%f" % \
556                (timestamp, spec['jobid'], spec['queue'], spec['submittime'], 
557                 spec['nodes'], log_walltime, ":".join(spec['location']), spec['start_time'], 
558                 frag_runtime, round(frag_runtime / float(spec['runtime']), 2)
559                )
560            elif eventtype == 'P':  #pending
561                message = "%s;P;%d;queue=%s qtime=%s Resource_List.ncpus=%s Resource_List.walltime=%s exec_host=%s start=%s" % \
562                (timestamp, spec['jobid'], spec['queue'], spec['submittime'], 
563                 spec['nodes'], log_walltime, ":".join(spec['location']), spec['start_time'], 
564                )
565                print "message=", message
566            else:
567                print "invalid event type, type=", type
568                return
569        self.pbslog.LogMessage(message)
570               
571    def get_new_states(self, jobspec):
572        '''return the new state updates of a specific job at specific time
573        stamp, including invisible->queued, running->ended'''
574       
575        updates = {}
576        curstate = jobspec['state']
577        newstate = curstate
578        job_id = jobspec['jobid']
579       
580        cur_event = self.get_current_time_event()
581 
582        #handle job submssion event       
583        if cur_event == 'Q' and curstate == "invisible":
584            newstate = "queued"
585            updates['is_runnable'] = True
586            updates['is_visible'] = True
587            self.log_job_event('Q', self.get_current_time(), jobspec)
588       
589        #handle job completion event
590        elif cur_event == 'E' and curstate == "running":
591            newstate = "ended"
592            updates['is_runnable'] = False
593            updates['has_resources'] = False
594            updates['is_visible'] = False
595           
596            #release partition immediately
597            partitions = jobspec['location']
598            for partition in partitions:
599                self.release_partition(partition)
600            self.queues.del_jobs([{'jobid':job_id}])
601           
602            #write to output log
603            if jobspec['end_time']:
604                end = float(jobspec['end_time'])
605            else:
606                end = 0
607            end_datetime = sec_to_date(end)                                                                                                   
608            self.log_job_event('E', end_datetime, jobspec)
609       
610        #handle job failure event       
611        elif cur_event == 'F' and curstate == "running":
612            print "entered failure handling"
613 
614            #release partition
615            partitions = jobspec['location']
616            for partition in partitions:
617                print "partition %s start repairing" % (partition)
618                self.start_repair_partition(partition)
619             
620            #write to output log
621            if jobspec['failure_time']:
622                fail = float(jobspec['failure_time'])
623            else:
624                fail = 0
625            failure_datetime = sec_to_date(fail)
626            self.log_job_event('F', failure_datetime, jobspec)
627            print self.get_current_time(), " job %d failed at %s!!" % (job_id, ":".join(jobspec['location']))
628           
629            rec_updates = self.recovery_mgr(jobspec)
630           
631            if not rec_updates == {}:
632                updates.update(rec_updates)
633               
634            if updates.has_key('state'):
635                newstate = updates['state']
636               
637            if CHECKPOINT:
638                print "enter checkpoint handling****"
639                #runtime before failed after latest start
640                frag_runtime = float(jobspec['failure_time']) - float(jobspec['start_time'])
641                updates['remain_time'] = jobspec['remain_time'] - frag_runtime
642           
643            updates['has_resources'] = False
644               
645        else:#other event
646            pass
647       
648        if updates and not curstate == newstate:
649            print self.get_current_time(), "state changed, job", job_id, \
650             ":", curstate, "->", newstate
651            updates['state'] = newstate
652     
653        return updates
654   
655    def update_job_states(self, specs, updates):
656        '''update the state of the jobs associated to the current time stamp'''
657       
658        def _update_job_states(job, newattr):
659            '''callback function to update job states'''
660            temp = job.to_rx()
661            newattr = self.get_new_states(temp)
662            if newattr:
663                temp.update(newattr)
664                job.update(newattr)
665                   
666        ids_str = self.get_current_time_job()
667        ids = ids_str.split(':')
668        for id in ids:
669            for spec in specs:
670                spec['jobid'] = int(id)
671            ret = self.queues.get_jobs(specs, _update_job_states, updates)
672        return 0
673     
674    def run_job_updates(self, jobspec, newattr):
675        ''' return the state updates (including state queued -> running,
676        setting the start_time, end_time)'''
677        updates = {}
678       
679        #print "enter run_job_updates, jobspec=", jobspec
680       
681        start = self.get_current_time_sec()
682        updates['start_time'] = start
683        updates['starttime'] = start
684
685        updates['state'] = 'running'
686        updates['system_state'] = 'running'
687        updates['is_runnable'] = False
688        updates['has_resources'] = True
689
690        print self.get_current_time(), "run job state change, job", jobspec['jobid'], \
691             ":", jobspec['state'], "->", updates['state']
692             
693        #determine whether the job is going to fail before completion
694        location = newattr['location']
695        duration = jobspec['remain_time']
696        #print "duration=", duration
697        nearest_failure = self.get_next_failure(location, start, duration)
698        if (nearest_failure):
699            updates['failure_time'] = date_to_sec(nearest_failure)
700            new_time_stamp = nearest_failure
701            self.insert_time_stamp(new_time_stamp, 'F', {'jobid':str(jobspec['jobid'])})
702        else:  # will complete
703            end = start + duration
704            updates['end_time'] = end
705            new_time_stamp = sec_to_date(end)
706            #print "new_time_stamp=", new_time_stamp
707            self.insert_time_stamp(new_time_stamp, 'E', {'jobid':str(jobspec['jobid'])})
708       
709        updates.update(newattr)
710   
711        return updates
712 
713    def start_job(self, specs, updates):
714        '''update the job state and start_time and end_time when cqadm --run
715        is issued to a group of jobs'''
716        partitions = updates['location']
717        for partition in partitions:
718            self.reserve_partition(partition)
719           
720        def _start_job(job, newattr):
721            '''callback function to update job start/end time'''
722            temp = job.to_rx()
723            newattr = self.run_job_updates(temp, newattr)
724            temp.update(newattr)
725            job.update(newattr)
726            self.log_job_event('S', self.get_current_time(), temp)
727        return self.queues.get_jobs(specs, _start_job, updates)
728   
729    def add_jobs(self, specs):
730        '''Add a job, currently for unit test only'''
731        response = self.queues.add_jobs(specs)
732        return response
733    add_jobs = exposed(query(add_jobs))
734   
735    def get_jobs(self, specs):
736        '''get a list of jobs, each time triggers time stamp increment and job
737        states update'''
738
739        jobs = []
740        if self.increment_tag:
741            self.time_increment()
742            eventtype = self.get_current_time_event()
743            print "current event type====", eventtype
744            if eventtype == "R":
745                self.release_repaired_partition()
746               
747                #if the repaired job associated with some pending jobs,
748                #returen empty list to scheduler, in order to ensure the next
749                #time stamp will restart the pending job other than scheduling other jobs at this time stamp
750                #this will avoid run multiple jobs on the same partition(once a bug, solved)
751                if self.get_current_time_job():
752                    return jobs
753                   
754            elif eventtype == "S":
755               
756                self.restart_pending_job() 
757                return jobs
758           
759            else:
760                self.update_job_states(specs, {})
761            #self.update_job_states(specs, {})  #needchange : according to time stamp, update specific job's attribution
762       
763        if len(self.recovering_jobs) > 0:
764            self.update_recovering_jobs({})
765       
766        self.increment_tag = True
767        for spec in specs:
768            spec['is_visible'] = True
769            spec['jobid'] = "*"   # can't omitted, reset the spec['jobid'] assinged in update_job_states, (once a tricky bug, cost me nearly one day to find!)
770        jobs = self.queues.get_jobs(specs)
771
772        #make all job queue "default" so that the scheduler won't skip some jobs based on queue-partition relationship, only in simulation!
773        for job in jobs:
774            job.queue = "default"
775           
776#        print "running jobs=", [job.jobid for job in self.running_jobs]
777#        print "queueing jobs=", [job.jobid for job in self.queuing_jobs]
778#        print "return jobs=", len(jobs)
779
780        return jobs
781    get_jobs = exposed(query(get_jobs))
782   
783    def update_recovering_jobs(self, updates):
784        print "enter update_recovering_jobs()"
785       
786        def _update_recovering_jobs(job, newattr):
787            '''callback function to update job states'''
788            temp = job.to_rx()
789            print "temp=", temp
790            newattr = self.recovery_mgr(temp)
791            print "update_recovering_jobs newattr=", newattr
792            print "temp=", temp
793            if newattr:
794                temp.update(newattr)
795                job.update(newattr)
796               
797        ids = [job.jobid for job in self.recovering_jobs]
798        print "ids=", ids
799       
800        ret = self.queues.get_jobs([{'tag':"job", 'state': "recovering"}], _update_recovering_jobs, updates)
801        return 0
802   
803    def _get_queuing_jobs(self):
804        return self.queues.get_jobs([{'jobid':"*", 'state':"queued"}])
805    queuing_jobs = property(_get_queuing_jobs)
806   
807    def _get_running_jobs(self):
808        return self.queues.get_jobs([{'jobid':"*", 'state':"running"}])
809    running_jobs = property(_get_running_jobs)
810   
811    def _get_recovering_jobs(self):
812        return self.queues.get_jobs([{'jobid':"*", 'state':"recovering"}])
813    recovering_jobs = property(_get_recovering_jobs)
814   
815    def _get_job_by_id(self, jobid):
816        jobs = self.queues.get_jobs([{'jobid':jobid}])
817        if len(jobs) == 1:
818            return jobs[0]
819        else:
820            return None
821   
822    def get_recovering_jobs(self, specs):
823        return self.recovering_jobs
824    get_running_jobs = exposed(query(get_recovering_jobs))
825           
826    def add_queues(self, specs):
827        '''add queues'''
828        return self.queues.add_queues(specs)
829    add_queues = exposed(query(add_queues))
830   
831    def get_queues(self, specs):
832        '''get queues'''
833        return self.queues.get_queues(specs)
834    get_queues = exposed(query(get_queues))
835
836    def run_jobs(self, specs, nodelist):
837        '''run a queued job, by updating the job state, start_time and
838        end_time'''
839        print "run job specs=", specs, " on partion", nodelist
840        if specs:
841            self.start_job(specs, {'location': nodelist})
842            #set tag false, enable scheduling another job at the same time
843            self.increment_tag = False
844        #print "current running jobs=", [job.jobid for job in self.running_jobs]
845        return self.running_jobs
846    run_jobs = exposed(query(run_jobs))
847   
848   
849    def get_midplanes(self, partname):
850        '''return a list of sub-partitions each contains 512-nodes(midplane)'''
851        midplane_list = []
852        partition = self._partitions[partname]
853       
854        if partition.size == MIDPLANE_SIZE:
855            midplane_list.append(partname)
856        elif partition.size > MIDPLANE_SIZE:
857            children = partition.children
858            for part in children:
859                if self._partitions[part].size == MIDPLANE_SIZE:
860                    midplane_list.append(part)
861        else:
862            parents = partition.parents
863            for part in parents:
864                if self._partitions[part].size == MIDPLANE_SIZE:
865                    midplane_list.append(part)
866                           
867        return midplane_list
868   
869    def get_next_failure(self, location, now, duration): 
870        '''return the next(closest) failure moment according the partition failure list'''
871       
872        if (self.FAILURE_FREE):
873            return None
874       
875        def _find_next_failure(partname, now):
876            next = None
877            failure_list = self.failure_dict[partname]
878            if failure_list:
879                for fail_time in failure_list:
880                    if date_to_sec(fail_time) > now:
881                        next = fail_time
882                        break
883            return next
884                                       
885        closest_fail_sec = MAXINT
886        partitions = location
887
888        midplanes = set()
889        for partition in partitions:
890            tmp_midplanes = self.get_midplanes(partition)
891            for item in tmp_midplanes:
892                if item not in midplanes:
893                    midplanes.add(item)
894                       
895        for midplane in midplanes:
896            next = _find_next_failure(midplane, now)
897            if (next):
898                next_sec = date_to_sec(next)
899                if next_sec < closest_fail_sec:
900                    closest_fail_sec =next_sec
901       
902                       
903        if closest_fail_sec == MAXINT:
904            next_failure_date = None
905        else:
906            job_end_sec = now + duration
907            if closest_fail_sec < job_end_sec:
908                next_failure_date = sec_to_date(closest_fail_sec)
909            else:
910                next_failure_date = None
911               
912        #print "next_failure_date=", next_failure_date
913               
914        return next_failure_date                 
915
916    def will_job_fail(self, mtbf, nodes, hours):
917        '''simulate static failure chance, [not used]'''
918        return False
919        print "mtbf=%d, nodes=%d, hours=%f" % (mtbf,nodes,hours)
920        failure_chance = 1 - (1 - hours * 1.0/mtbf) ** nodes
921        if failure_chance > 0.7 :
922            failure_chance = 0.7
923        random_num = random.random()
924        print "failure chance=%f, random_num=%f" % (failure_chance, random_num)
925        if random_num < failure_chance:
926            return True
927        else:
928            return False
929       
930    def nodes_static(self):
931        '''static the node requested by each job, [not used]'''
932        jobs = self.queues.get_jobs([{'jobid':"*", 'queue':"*", 'nodes':"*"}])
933        nodesdict = {}
934        for job in jobs:
935            nodes = int(job.nodes)
936            nodesstr = nodes
937            if (nodesdict.has_key(nodesstr)):
938                nodesdict[nodesstr] =  nodesdict[nodesstr] + 1
939            else:
940                nodesdict[nodesstr] = 1
941        keys = nodesdict.keys()
942        keys.sort()
943        for key in keys:
944            print key, ":", nodesdict[key]
945           
946    def gen_failure_list(self, scale, shape, startdate, enddate):
947        '''generate a synthetic failure time list based on weibull distribution
948         and start/end date time'''
949        failure_moments = []
950        ttf_list = []
951               
952        start = date_to_sec(startdate)
953        end = date_to_sec(enddate)
954       
955        cur_failure = start
956       
957        while True:
958            ttf = random.weibullvariate(scale,shape)
959            cur_failure += ttf
960            if cur_failure < end:
961                ttf_list.append(ttf)
962                failure_moments.append(sec_to_date(cur_failure))
963            else:
964                break
965        return failure_moments, ttf_list
966   
967    def make_failures(self):
968        '''generate failure lists for each 512-nodes partition'''
969        ttf_dict = {}
970        start = self.time_stamps[1][1]
971        end = self.time_stamps[len(self.time_stamps)-1][1]
972       
973        for partition in self._partitions.values():
974            if partition.size == MIDPLANE_SIZE:
975                fl, ttfs = self.gen_failure_list(self.SCALE, self.SHAPE, start, end)
976                self.failure_dict[partition.name] = fl
977                ttf_dict[partition.name] = ttfs
978                       
979        partnames = self.failure_dict.keys()
980        partnames.sort()
981        f = open(default_FAILURE_LOG, "w")
982        total_f = 0
983        mtbf = 0
984        for part in partnames:
985            f_list = self.failure_dict[part]
986            print part, " ", f_list
987            f.write("%s;%s\n" % (part, ";".join(f_list)))
988            total_f +=  len(f_list)
989           
990            ttfs = ttf_dict[part] 
991            if len(ttfs)==0:
992                mtbf = 0
993            else:
994                total = 0
995               
996                for ttf in ttfs:
997                    total += ttf
998                    mtbf = total / len(ttfs)
999        start_sec = date_to_sec(start)
1000        end_sec = date_to_sec(end)
1001        f.write("Total=%d\nMTBF=%f" % (total_f, (end_sec-start_sec)/(total_f*3600)))
1002
1003        f.close()
1004       
1005    def inject_failures(self):
1006        '''parse failure trace log to make failure list for each 1-midplane partition'''
1007               
1008        raw_job_dict = {}
1009        partnames = set(self._partitions.keys())
1010        flog = open(self.failure_log, "r")
1011        self.failure_dict = {}
1012        for line in flog:
1013            print "line=", line
1014            line = line.strip('\n')
1015            parsedline = line.split(";")
1016            print "parsedline=", parsedline
1017            failure_list = []
1018            part = parsedline[0]
1019            if part in partnames:
1020                for i in range(1, len(parsedline)):
1021                    failure_moment = parsedline[i]
1022                    if len(failure_moment) == 0:
1023                        continue
1024                    failure_list.append(failure_moment)
1025                self.failure_dict[part] = failure_list
1026        partnames = self.failure_dict.keys()
1027        partnames.sort()
1028        for part in partnames:
1029            f_list = self.failure_dict[part]
1030            print part, " ", f_list   
1031       
1032    def get_failure_chance(self, location, duration):
1033        now = date_to_sec(self.get_current_time())
1034        next_fail = self.get_next_failure(location, now, duration)
1035        if (next_fail != None):
1036            return self.SENSITIVITY
1037        else:
1038            return 1 - self.SPECIFICITY
1039    get_failure_chance = exposed(get_failure_chance)
1040   
1041    def recovery_mgr(self, jobspec):
1042        """Recovery manager, this function can be extended to support various recovery options.
1043        at this version, the failed job is sent back to the rear of the queue. The extended code
1044        is ready and available at private code branch(wtang)."""
1045   
1046        updates = {}
1047       
1048        updates = self.handle_reque_rear(jobspec) 
1049
1050        recovery_option = jobspec['recovery_opt']
1051        print "rec_opt=", recovery_option
1052       
1053        #if_else structure remains room for recovery option extending   
1054        if recovery_option == 1:
1055            #resubmit the job
1056            #resubmit the job, the submit time changed to NOW
1057            updates = self.handle_reque_rear(jobspec) 
1058       
1059        return updates
1060
1061    def handle_reque_rear(self, jobspec):
1062        '''handle option 1 - resubmit the job to rear of waiting queue'''
1063        updates = {}
1064        updates['state'] = "queued"
1065        updates['start_time'] = 0
1066        updates['submittime'] = self.get_current_time_sec()
1067        return updates
1068       
1069    def start_repair_partition(self, partname):
1070        '''partition failed, assuming get repaired MTTR seconds later'''
1071        now = self.get_current_time_sec()
1072        time_to_repair = now + MTTR
1073        time_to_repair_date = sec_to_date(time_to_repair)
1074        self.insert_time_stamp(time_to_repair_date, "R", {'location':partname})
1075       
1076    def release_repaired_partition(self):
1077        '''enter release_repaired_partition() partition repaired'''
1078        partition = self.get_current_time_partition()
1079        if partition == None:
1080            return False
1081        self.release_partition(partition)
1082        print "partition %s gets repaired" % (partition)
1083        self.log_job_event('R', self.get_current_time(), {'location':partition})
1084        return True
1085   
1086    def restart_pending_job(self):
1087        '''restart jobs that pending for the nodes repair'''
1088        partname = self.get_current_time_partition()
1089        print "enter restart_pending_job() partname=", partname
1090       
1091        ids_str = self.get_current_time_job()
1092        ids = ids_str.split(':')
1093        jobspecs = []
1094        for id in ids:
1095            spec = {'tag':'job', 'jobid':int(id)}
1096            jobspecs.append(spec)
1097        print "restart pending job ", jobspecs, " on repaired partition ", partname
1098        self.run_jobs(jobspecs, [partname])
1099       
1100
1101    def _find_job_location(self, args, drain_partitions=set(), backfilling=False):
1102        jobid = args['jobid']
1103        nodes = args['nodes']
1104        queue = args['queue']
1105        utility_score = args['utility_score']
1106        walltime = args['walltime']
1107        forbidden = args.get("forbidden", [])
1108        required = args.get("required", [])
1109
1110        best_score = sys.maxint
1111        best_partition = None
1112               
1113        available_partitions = set()
1114        if required:
1115            for p_name in required:
1116                available_partitions.add(self.cached_partitions[p_name])
1117                available_partitions.update(self.cached_partitions[p_name]._children)
1118        else:
1119            for p in self.cached_partitions.itervalues():
1120                skip = False
1121                for bad_name in forbidden:
1122                    if p.name==bad_name or bad_name in p.children or bad_name in p.parents:
1123                        skip = True
1124                        break
1125                if not skip:
1126                    available_partitions.add(p)
1127       
1128        available_partitions -= drain_partitions
1129        now = self.get_current_time_sec()
1130       
1131        for partition in available_partitions:
1132            # check if the current partition is linked to the job's queue (but if reservation locations were
1133            # passed in via the "required" argument, then we know it's all good)
1134#            if not required and queue not in partition.queue.split(':'):
1135#                continue
1136           
1137            # if the job needs more time than the partition currently has available, look elsewhere   
1138            if backfilling:
1139                if 60*float(walltime) > (partition.backfill_time - now):
1140                    continue
1141               
1142            if self.can_run(partition, nodes, self.cached_partitions):
1143                # let's check the impact on partitions that would become blocked
1144                #print "can run ", partition.name
1145                score = 0
1146                for p in partition.parents:
1147                    if self.cached_partitions[p].state == "idle" and self.cached_partitions[p].scheduled:
1148                        score += 1
1149               
1150                if (FAULTAWARE):
1151                    Pf = 0
1152                    Pf = self.get_failure_chance(partition.name, 60*float(walltime))
1153                    score += Pf
1154               
1155                # the lower the score, the fewer new partitions will be blocked by this selection
1156                if score < best_score:
1157                    best_score = score
1158                    best_partition = partition       
1159
1160        if best_partition:
1161            #print "return bestpartition=",{jobid: [best_partition.name]}
1162            return {jobid: [best_partition.name]}
1163 
1164    def find_job_location(self, arg_list, end_times):
1165               
1166        best_partition_dict = {}
1167       
1168        if self.bridge_in_error:
1169            print "bridge_in_error"
1170            return {}
1171       
1172        self._partitions_lock.acquire()
1173        try:
1174            self.cached_partitions = copy.deepcopy(self.partitions)
1175        except Exception,e:
1176            print e
1177            self.logger.error("error in copy.deepcopy", exc_info=True)
1178            return {}
1179        finally:
1180            self._partitions_lock.release()
1181
1182        # first, figure out backfilling cutoffs per partition (which we'll also use for picking which partition to drain)
1183        job_end_times = {}
1184        for item in end_times:
1185            job_end_times[item[0][0]] = item[1]
1186                   
1187        now = self.get_current_time_sec()
1188        for p in self.cached_partitions.itervalues():
1189            if p.state == "idle":
1190                p.backfill_time = now
1191            else:
1192                p.backfill_time = now + 5*60
1193            p.draining = False
1194       
1195        for p in self.cached_partitions.itervalues():   
1196            if p.name in job_end_times:
1197                if job_end_times[p.name] > p.backfill_time:
1198                    p.backfill_time = job_end_times[p.name]
1199               
1200                for parent_name in p.parents:
1201                    parent_partition = self.cached_partitions[parent_name]
1202                    if p.backfill_time > parent_partition.backfill_time:
1203                        parent_partition.backfill_time = p.backfill_time
1204       
1205        for p in self.cached_partitions.itervalues():
1206            if p.backfill_time == now:
1207                continue
1208           
1209            for child_name in p.children:
1210                child_partition = self.cached_partitions[child_name]
1211                if child_partition.backfill_time == now or child_partition.backfill_time > p.backfill_time:
1212                    child_partition.backfill_time = p.backfill_time
1213       
1214        # first time through, try for starting jobs based on utility scores
1215        drain_partitions = set()
1216        # the sets draining_jobs and cannot_start are for efficiency, not correctness
1217        draining_jobs = set()
1218        cannot_start = set()
1219        for idx in range(len(arg_list)):
1220            winning_job = arg_list[idx]
1221            for jj in range(idx, len(arg_list)):
1222                job = arg_list[jj]
1223               
1224                # this job isn't good enough!
1225                if job['utility_score'] < winning_job['threshold']:
1226                    break
1227
1228                if job['jobid'] not in cannot_start:
1229                    partition_name = self._find_job_location(job, drain_partitions)
1230                    if partition_name:
1231                        best_partition_dict.update(partition_name)
1232                        break
1233               
1234                cannot_start.add(job['jobid'])
1235               
1236                # we already picked a drain location for the winning job
1237                if winning_job['jobid'] in draining_jobs:
1238                    continue
1239
1240                location = self._find_drain_partition(winning_job)
1241                if location is not None:
1242                    for p_name in location.parents:
1243                        drain_partitions.add(self.cached_partitions[p_name])
1244                    for p_name in location.children:
1245                        drain_partitions.add(self.cached_partitions[p_name])
1246                        self.cached_partitions[p_name].draining = True
1247                    drain_partitions.add(location)
1248                    #self.logger.info("job %s is draining %s" % (winning_job['jobid'], location.name))
1249                    location.draining = True
1250                    draining_jobs.add(winning_job['jobid'])
1251           
1252            # at this time, we only want to try launching one job at a time
1253            if best_partition_dict:
1254                break
1255       
1256        # the next time through, try to backfill, but only if we couldn't find anything to start
1257        if not best_partition_dict:
1258           
1259            # arg_list.sort(self._walltimecmp)
1260
1261            for args in arg_list:
1262                partition_name = self._find_job_location(args, backfilling=True)
1263                if partition_name:
1264                    self.logger.info("backfilling job %s" % args['jobid'])
1265                    best_partition_dict.update(partition_name)
1266                    break
1267
1268        # reserve the stuff in the best_partition_dict, as those partitions are allegedly going to
1269        # be running jobs very soon
1270        #
1271        # also, this is the only part of finding a job location where we need to lock anything
1272        self._partitions_lock.acquire()
1273        try:
1274            for p in self.partitions.itervalues():
1275                # push the backfilling info from the local cache back to the real objects
1276                p.draining = self.cached_partitions[p.name].draining
1277                p.backfill_time = self.cached_partitions[p.name].backfill_time
1278               
1279            for partition_list in best_partition_dict.itervalues():
1280                part = self.partitions[partition_list[0]] 
1281                part.reserved_until = self.get_current_time_sec() + 5*60
1282                part.state = "starting job"
1283                for p in part._parents:
1284                    if p.state == "idle":
1285                        p.state = "blocked by starting job"
1286                for p in part._children:
1287                    if p.state == "idle":
1288                        p.state = "blocked by starting job"
1289        except:
1290            self.logger.error("error in find_job_location", exc_info=True)
1291        self._partitions_lock.release()
1292       
1293        #print "best_partition_dict=", best_partition_dict
1294       
1295        return best_partition_dict
1296    find_job_location = locking(exposed(find_job_location))
Note: See TracBrowser for help on using the browser.