Changeset 1601

Show
Ignore:
Timestamp:
07/06/09 13:28:41 (5 months ago)
Author:
wtang
Message:

making qsim faster by optimzing derived allocator code (qsim._find_job_location)

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • trunk/src/lib/Components/qsim.py

    r1600 r1601  
    3737default_SPECIFICITY = 0.9 
    3838default_FAILURE_LOG = "failure.lists" 
    39  
    4039logging.basicConfig() 
    4140logger = logging.getLogger('Qsim') 
     
    4746SET_event = set(['I', 'Q', 'S', 'E', 'F', 'R']) 
    4847FAULTAWARE = False 
     48IDEALWALLTIME = True 
    4949 
    5050def parseline(line): 
     
    158158                            "failure_time", "location", "state", "is_visible",  
    159159                            "args", 
     160                            "user", 
    160161                            "system_state", 
    161162                            "starttime", 
     
    183184         
    184185        self.walltime = spec.get("walltime")   #in minutes 
    185          
     186        self.user = spec.get("user", "unknown") 
     187        self.project = spec.get("project", "unknown") 
    186188        self.nodes = spec.get("nodes", 0) 
    187189        self.runtime = spec.get("runtime", 0) 
     
    233235        self.policy = policy 
    234236        #create default queue 
    235         self.add_queues([{"name":"default", "policy":self.policy}])          
     237        self.add_queues([{"name":"default", "policy":"default"}])          
    236238  
    237239    def add_jobs(self, specs, callback=None, cargs={}): 
     
    240242        for spec in specs: 
    241243            if spec['queue'] not in queue_names: 
    242                 self.add_queues([{"name":spec['queue'], "policy":self.policy}]) 
    243                 queue_names.append(spec['queue']) 
     244                spec['queue'] = "default" 
     245    
     246 #               self.add_queues([{"name":spec['queue'], "policy":self.policy}]) 
     247 #               queue_names.append(spec['queue']) 
    244248                
    245249        results = [] 
     
    300304        self.init_partition(partnames) 
    301305        self.part_size_list = [] 
     306      
    302307        for part in self.partitions.itervalues(): 
    303308            if int(part.size) not in self.part_size_list: 
     
    337342        self.queues = SimQueueDict(policy=kwargs['policy']) 
    338343        self.init_queues() 
     344        self.visible_jobs = [] 
    339345         
    340346        #initialize failures 
     
    350356        #initialize PBS-style logger 
    351357        self.pbslog = PBSlogger(self.output_log) 
     358         
     359        #initialize debug logger 
     360        self.dbglog = PBSlogger(self.output_log+"-debug") 
    352361         
    353362        #finish tag 
     
    484493            #convert walltime from 'hh:mm:ss' to float of minutes 
    485494            format_walltime = tmp.get('Resource_List.walltime') 
     495            spec['walltime'] = 0 
    486496            if format_walltime: 
    487497                segs = format_walltime.split(':') 
     
    489499            else:  #invalid job entry, discard 
    490500                spec['valid'] = False 
    491              
     501                 
    492502            if tmp.get('start') and tmp.get('end'): 
    493503                act_run_time = float(tmp.get('end')) - float(tmp.get('start')) 
    494504                spec['runtime'] = str(round(act_run_time, 1)) 
     505                 
     506                if IDEALWALLTIME: 
     507                    wtime = (round(act_run_time / 60, 2) + float(spec['walltime']))/2 
     508                    #wtime = act_run_time / 60 
     509                    spec['walltime'] = str(round(wtime, 2)) 
    495510            else: 
    496511                spec['valid'] = False 
     
    498513            if tmp.get('Resource_List.nodect'): 
    499514                spec['nodes'] = tmp.get('Resource_List.nodect') 
     515 
    500516            else:  #invalid job entry, discard 
    501517                spec['valid'] = False 
    502              
     518              
     519            if tmp.get('user'): 
     520                spec['user'] = tmp.get('user') 
     521            if tmp.get('project'): 
     522                spec['project'] = tmp.get('project')             
     523 
    503524            spec['state'] = 'invisible' 
    504525            spec['start_time'] = '0' 
     
    543564            log_walltime = "%s:%s:00" % (walltime_hours, walltime_minutes) 
    544565            if eventtype == 'S':  #start running  
    545                 message = "%s;S;%d;queue=%s qtime=%s Resource_List.ncpus=%s Resource_List.walltime=%s start=%s exec_host=%s" % \ 
     566                message = "%s;S;%d;queue=%s qtime=%s Resource_List.nodect=%s Resource_List.walltime=%s start=%s exec_host=%s" % \ 
    546567                (timestamp, spec['jobid'], spec['queue'], spec['submittime'],  
    547568                 spec['nodes'], log_walltime, spec['start_time'], ":".join(spec['location'])) 
    548569            elif eventtype == 'E':  #end 
    549                 message = "%s;E;%d;queue=%s qtime=%s Resource_List.ncpus=%s Resource_List.walltime=%s start=%s end=%f exec_host=%s runtime=%s" % \ 
     570                message = "%s;E;%d;queue=%s qtime=%s Resource_List.nodect=%s Resource_List.walltime=%s start=%s end=%f exec_host=%s runtime=%s" % \ 
    550571                (timestamp, spec['jobid'], spec['queue'], spec['submittime'], spec['nodes'], log_walltime, spec['start_time'],  
    551572                 round(float(spec['end_time']), 1), ":".join(spec['location']), 
     
    553574            elif eventtype == 'F':  #failure 
    554575                frag_runtime = round(float(spec['failure_time']) - float(spec['start_time']), 1)  #running time before failure(after the latest start) 
    555                 message = "%s;F;%d;queue=%s qtime=%s Resource_List.ncpus=%s Resource_List.walltime=%s exec_host=%s start=%s frag_runtime=%s complete=%f" % \ 
     576                message = "%s;F;%d;queue=%s qtime=%s Resource_List.nodect=%s Resource_List.walltime=%s exec_host=%s start=%s frag_runtime=%s complete=%f" % \ 
    556577                (timestamp, spec['jobid'], spec['queue'], spec['submittime'],  
    557578                 spec['nodes'], log_walltime, ":".join(spec['location']), spec['start_time'],  
     
    559580                ) 
    560581            elif eventtype == 'P':  #pending 
    561                 message = "%s;P;%d;queue=%s qtime=%s Resource_List.ncpus=%s Resource_List.walltime=%s exec_host=%s start=%s" % \ 
     582                message = "%s;P;%d;queue=%s qtime=%s Resource_List.nodect=%s Resource_List.walltime=%s exec_host=%s start=%s" % \ 
    562583                (timestamp, spec['jobid'], spec['queue'], spec['submittime'],  
    563584                 spec['nodes'], log_walltime, ":".join(spec['location']), spec['start_time'],  
    564585                ) 
    565                 print "message=", message 
    566586            else: 
    567587                print "invalid event type, type=", type 
     
    632652                updates.update(rec_updates) 
    633653                 
     654            updates['has_resources'] = False 
     655                 
    634656            if updates.has_key('state'): 
    635657                newstate = updates['state'] 
    636                  
     658             
    637659            if CHECKPOINT: 
    638660                print "enter checkpoint handling****" 
     
    640662                frag_runtime = float(jobspec['failure_time']) - float(jobspec['start_time']) 
    641663                updates['remain_time'] = jobspec['remain_time'] - frag_runtime 
    642              
    643             updates['has_resources'] = False 
    644664                
    645665        else:#other event 
     
    666686        ids_str = self.get_current_time_job() 
    667687        ids = ids_str.split(':') 
     688        cur_event = self.get_current_time_event() 
    668689        for id in ids: 
    669690            for spec in specs: 
    670691                spec['jobid'] = int(id) 
    671             ret = self.queues.get_jobs(specs, _update_job_states, updates) 
     692            ret_jobs = self.queues.get_jobs(specs, _update_job_states, updates) 
     693            if cur_event == "Q": 
     694                self.visible_jobs.extend(ret_jobs) 
     695            elif cur_event=="E": 
     696                self.visible_jobs = [j for j in self.visible_jobs if j not in ret_jobs] 
    672697        return 0 
    673698      
     
    759784            else: 
    760785                self.update_job_states(specs, {}) 
    761             #self.update_job_states(specs, {})  #needchange : according to time stamp, update specific job's attribution 
    762          
     786            
    763787        if len(self.recovering_jobs) > 0: 
    764788            self.update_recovering_jobs({}) 
    765789         
    766790        self.increment_tag = True 
    767         for spec in specs: 
    768             spec['is_visible'] = True 
    769             spec['jobid'] = "*"   # can't omitted, reset the spec['jobid'] assinged in update_job_states, (once a tricky bug, cost me nearly one day to find!) 
    770         jobs = self.queues.get_jobs(specs) 
    771  
    772         #make all job queue "default" so that the scheduler won't skip some jobs based on queue-partition relationship, only in simulation! 
    773         for job in jobs: 
    774             job.queue = "default" 
    775              
     791         
     792        jobs = self.visible_jobs            
    776793#        print "running jobs=", [job.jobid for job in self.running_jobs] 
    777794#        print "queueing jobs=", [job.jobid for job in self.queuing_jobs] 
     795#        print "visible jobs=", [job.jobid for job in self.visible_jobs] 
    778796#        print "return jobs=", len(jobs)  
    779797 
     
    802820     
    803821    def _get_queuing_jobs(self): 
    804         return self.queues.get_jobs([{'jobid':"*", 'state':"queued"}]) 
     822        return [job for job in self.visible_jobs if job.is_runnable==True] 
    805823    queuing_jobs = property(_get_queuing_jobs) 
    806824     
    807825    def _get_running_jobs(self): 
    808         return self.queues.get_jobs([{'jobid':"*", 'state':"running"}]) 
     826        return [job for job in self.visible_jobs if job.has_resources==True] 
    809827    running_jobs = property(_get_running_jobs) 
    810828     
     
    812830        return self.queues.get_jobs([{'jobid':"*", 'state':"recovering"}]) 
    813831    recovering_jobs = property(_get_recovering_jobs) 
     832     
     833    def get_visible_jobs(self): 
     834        return self.visible_jobs; 
     835    get_visible_jobs = exposed(get_visible_jobs) 
     836     
     837    def get_running_jobs(self): 
     838        return [job for job in self.visible_jobs if job.has_resources==True] 
     839    get_running_jobs = exposed(get_running_jobs) 
     840     
     841    def get_queuing_jobs(self): 
     842        return [job for job in self.visible_jobs if job.is_runnable==True] 
     843    get_queuing_jobs = exposed(get_queuing_jobs) 
    814844     
    815845    def _get_job_by_id(self, jobid): 
     
    820850            return None 
    821851    
    822     def get_recovering_jobs(self, specs): 
    823         return self.recovering_jobs 
    824     get_running_jobs = exposed(query(get_recovering_jobs)) 
    825             
    826852    def add_queues(self, specs): 
    827853        '''add queues''' 
     
    899925                if next_sec < closest_fail_sec: 
    900926                    closest_fail_sec =next_sec 
    901          
    902927                         
    903928        if closest_fail_sec == MAXINT: 
     
    10981123        self.run_jobs(jobspecs, [partname]) 
    10991124         
     1125    def possible_locations(self, job): 
     1126        '''find the partitions with the size that can right accomodates the job 
     1127        (returned partions are not necessarily idle)''' 
     1128        locations = [] 
     1129        proper_partsize = 64 
     1130        job_nodes = int(job['nodes']) 
     1131         
     1132        for psize in self.part_size_list: 
     1133            if psize >= job_nodes: 
     1134                proper_partsize = psize 
     1135                break              
     1136             
     1137        for part in self.cached_partitions.itervalues(): 
     1138            if int(part.size) == proper_partsize: 
     1139                locations.append(part) 
     1140                 
     1141        return locations 
    11001142 
    11011143    def _find_job_location(self, args, drain_partitions=set(), backfilling=False): 
     
    11101152        best_score = sys.maxint 
    11111153        best_partition = None 
    1112                  
    1113         available_partitions = set() 
    1114         if required: 
    1115             for p_name in required: 
    1116                 available_partitions.add(self.cached_partitions[p_name]) 
    1117                 available_partitions.update(self.cached_partitions[p_name]._children) 
    1118         else: 
    1119             for p in self.cached_partitions.itervalues(): 
    1120                 skip = False 
    1121                 for bad_name in forbidden: 
    1122                     if p.name==bad_name or bad_name in p.children or bad_name in p.parents: 
    1123                         skip = True 
    1124                         break 
    1125                 if not skip: 
    1126                     available_partitions.add(p) 
    1127          
    1128         available_partitions -= drain_partitions 
    1129         now = self.get_current_time_sec() 
    1130          
    1131         for partition in available_partitions: 
    1132             # check if the current partition is linked to the job's queue (but if reservation locations were 
    1133             # passed in via the "required" argument, then we know it's all good) 
    1134 #            if not required and queue not in partition.queue.split(':'): 
    1135 #                continue 
    1136              
    1137             # if the job needs more time than the partition currently has available, look elsewhere     
     1154         
     1155        # get partitions of proper size as the candidates 
     1156        candidate_partitions = self.possible_locations(args) 
     1157        #exclude the partitions already drained 
     1158        if drain_partitions: 
     1159            candidate_partitions = [part for part in candidate_partitions if part not in drain_partitions] 
     1160         
     1161        now = self.get_current_time_sec()          
     1162        for partition in candidate_partitions: 
     1163             
     1164            #skip partitions that are not "idle" 
     1165            if partition.state != "idle": 
     1166                continue 
     1167             
    11381168            if backfilling: 
     1169                #skip the partition with too short cutoff to backfill the job 
    11391170                if 60*float(walltime) > (partition.backfill_time - now): 
    11401171                    continue 
    11411172                 
    1142             if self.can_run(partition, nodes, self.cached_partitions): 
    1143                 # let's check the impact on partitions that would become blocked 
    1144                 #print "can run ", partition.name 
    1145                 score = 0 
    1146                 for p in partition.parents: 
    1147                     if self.cached_partitions[p].state == "idle" and self.cached_partitions[p].scheduled: 
    1148                         score += 1 
    1149                  
     1173            # let's check the impact on partitions that would become blocked 
     1174            score = 0 
     1175            for p in partition.parents: 
     1176                if self.cached_partitions[p].state == "idle" and self.cached_partitions[p].scheduled: 
     1177                    score += 1 
     1178                 
     1179                for ch in partition.children: 
     1180                    score += 0.01 
     1181                
    11501182                if (FAULTAWARE): 
    11511183                    Pf = 0 
     
    11561188                if score < best_score: 
    11571189                    best_score = score 
    1158                     best_partition = partition         
     1190                    best_partition = partition 
     1191                elif score == best_score: 
     1192                    if partition.name > best_partition.name: 
     1193                        best_partition = partition 
    11591194 
    11601195        if best_partition: 
    1161             #print "return bestpartition=",{jobid: [best_partition.name]} 
     1196            #print "return bestpartition=",{jobid: [best_partition.name, best_partition.state]} 
    11621197            return {jobid: [best_partition.name]} 
    11631198  
     
    11701205            return {} 
    11711206         
    1172         self._partitions_lock.acquire() 
    1173         try: 
    1174             self.cached_partitions = copy.deepcopy(self.partitions) 
    1175         except Exception,e: 
    1176             print e 
    1177             self.logger.error("error in copy.deepcopy", exc_info=True) 
    1178             return {} 
    1179         finally: 
    1180             self._partitions_lock.release() 
    1181  
    1182         # first, figure out backfilling cutoffs per partition (which we'll also use for picking which partition to drain) 
     1207        self.cached_partitions = self.partitions 
     1208 
     1209        # first, figure out backfilling cutoffs per partition (which we'll also  
     1210        # use for picking which partition to drain) 
    11831211        job_end_times = {} 
    11841212        for item in end_times: 
     
    12171245        draining_jobs = set() 
    12181246        cannot_start = set() 
     1247 
    12191248        for idx in range(len(arg_list)): 
    12201249            winning_job = arg_list[idx] 
     
    12471276                    drain_partitions.add(location) 
    12481277                    #self.logger.info("job %s is draining %s" % (winning_job['jobid'], location.name)) 
     1278                    #self.dbglog.LogMessage("job %s is draining %s" % (winning_job['jobid'], location.name)) 
    12491279                    location.draining = True 
    12501280                    draining_jobs.add(winning_job['jobid']) 
     
    12521282            # at this time, we only want to try launching one job at a time 
    12531283            if best_partition_dict: 
     1284                  
     1285            #    msg =  "idx=%s, jj=%s, job=%s, partition=%s" % (idx, jj, job['jobid'],best_partition_dict[job['jobid']]) 
     1286                #print msg                 
     1287            #    self.dbglog.LogMessage(msg) 
    12541288                break 
    12551289         
     
    12571291        if not best_partition_dict: 
    12581292             
    1259             # arg_list.sort(self._walltimecmp) 
     1293        # arg_list.sort(self._walltimecmp) 
     1294        #   msg = "try to backfill jobs..." 
     1295        #   self.dbglog.LogMessage(msg) 
    12601296 
    12611297            for args in arg_list: 
    12621298                partition_name = self._find_job_location(args, backfilling=True) 
    12631299                if partition_name: 
    1264                     self.logger.info("backfilling job %s" % args['jobid']) 
     1300                    msg = "backfilling job %s(%s)" % (args['jobid'], args['nodes']) 
     1301                    self.logger.info(msg) 
     1302                    self.dbglog.LogMessage(msg) 
    12651303                    best_partition_dict.update(partition_name) 
    1266                     break 
    1267  
     1304                    break                 
     1305                     
    12681306        # reserve the stuff in the best_partition_dict, as those partitions are allegedly going to  
    12691307        # be running jobs very soon 
    12701308        # 
    12711309        # also, this is the only part of finding a job location where we need to lock anything 
    1272         self._partitions_lock.acquire() 
     1310        #self._partitions_lock.acquire() 
    12731311        try: 
    12741312            for p in self.partitions.itervalues(): 
     
    12791317            for partition_list in best_partition_dict.itervalues(): 
    12801318                part = self.partitions[partition_list[0]]  
    1281                 part.reserved_until = self.get_current_time_sec() + 5*60 
     1319                ##part.reserved_until = self.get_current_time_sec() + 5*60 
    12821320                part.state = "starting job" 
    12831321                for p in part._parents: 
     
    12891327        except: 
    12901328            self.logger.error("error in find_job_location", exc_info=True) 
    1291         self._partitions_lock.release() 
     1329        #self._partitions_lock.release() 
    12921330         
    12931331        #print "best_partition_dict=", best_partition_dict