Changeset 1601
- Timestamp:
- 07/06/09 13:28:41 (5 months ago)
- Files:
-
- 1 modified
-
trunk/src/lib/Components/qsim.py (modified) (33 diffs)
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/lib/Components/qsim.py
r1600 r1601 37 37 default_SPECIFICITY = 0.9 38 38 default_FAILURE_LOG = "failure.lists" 39 40 39 logging.basicConfig() 41 40 logger = logging.getLogger('Qsim') … … 47 46 SET_event = set(['I', 'Q', 'S', 'E', 'F', 'R']) 48 47 FAULTAWARE = False 48 IDEALWALLTIME = True 49 49 50 50 def parseline(line): … … 158 158 "failure_time", "location", "state", "is_visible", 159 159 "args", 160 "user", 160 161 "system_state", 161 162 "starttime", … … 183 184 184 185 self.walltime = spec.get("walltime") #in minutes 185 186 self.user = spec.get("user", "unknown") 187 self.project = spec.get("project", "unknown") 186 188 self.nodes = spec.get("nodes", 0) 187 189 self.runtime = spec.get("runtime", 0) … … 233 235 self.policy = policy 234 236 #create default queue 235 self.add_queues([{"name":"default", "policy": self.policy}])237 self.add_queues([{"name":"default", "policy":"default"}]) 236 238 237 239 def add_jobs(self, specs, callback=None, cargs={}): … … 240 242 for spec in specs: 241 243 if spec['queue'] not in queue_names: 242 self.add_queues([{"name":spec['queue'], "policy":self.policy}]) 243 queue_names.append(spec['queue']) 244 spec['queue'] = "default" 245 246 # self.add_queues([{"name":spec['queue'], "policy":self.policy}]) 247 # queue_names.append(spec['queue']) 244 248 245 249 results = [] … … 300 304 self.init_partition(partnames) 301 305 self.part_size_list = [] 306 302 307 for part in self.partitions.itervalues(): 303 308 if int(part.size) not in self.part_size_list: … … 337 342 self.queues = SimQueueDict(policy=kwargs['policy']) 338 343 self.init_queues() 344 self.visible_jobs = [] 339 345 340 346 #initialize failures … … 350 356 #initialize PBS-style logger 351 357 self.pbslog = PBSlogger(self.output_log) 358 359 #initialize debug logger 360 self.dbglog = PBSlogger(self.output_log+"-debug") 352 361 353 362 #finish tag … … 484 493 #convert walltime from 'hh:mm:ss' to float of minutes 485 494 format_walltime = tmp.get('Resource_List.walltime') 495 spec['walltime'] = 0 486 496 if format_walltime: 487 497 segs = format_walltime.split(':') … … 489 499 else: #invalid job entry, discard 490 500 spec['valid'] = False 491 501 492 502 if tmp.get('start') and tmp.get('end'): 493 503 act_run_time = float(tmp.get('end')) - float(tmp.get('start')) 494 504 spec['runtime'] = str(round(act_run_time, 1)) 505 506 if IDEALWALLTIME: 507 wtime = (round(act_run_time / 60, 2) + float(spec['walltime']))/2 508 #wtime = act_run_time / 60 509 spec['walltime'] = str(round(wtime, 2)) 495 510 else: 496 511 spec['valid'] = False … … 498 513 if tmp.get('Resource_List.nodect'): 499 514 spec['nodes'] = tmp.get('Resource_List.nodect') 515 500 516 else: #invalid job entry, discard 501 517 spec['valid'] = False 502 518 519 if tmp.get('user'): 520 spec['user'] = tmp.get('user') 521 if tmp.get('project'): 522 spec['project'] = tmp.get('project') 523 503 524 spec['state'] = 'invisible' 504 525 spec['start_time'] = '0' … … 543 564 log_walltime = "%s:%s:00" % (walltime_hours, walltime_minutes) 544 565 if eventtype == 'S': #start running 545 message = "%s;S;%d;queue=%s qtime=%s Resource_List.n cpus=%s Resource_List.walltime=%s start=%s exec_host=%s" % \566 message = "%s;S;%d;queue=%s qtime=%s Resource_List.nodect=%s Resource_List.walltime=%s start=%s exec_host=%s" % \ 546 567 (timestamp, spec['jobid'], spec['queue'], spec['submittime'], 547 568 spec['nodes'], log_walltime, spec['start_time'], ":".join(spec['location'])) 548 569 elif eventtype == 'E': #end 549 message = "%s;E;%d;queue=%s qtime=%s Resource_List.n cpus=%s Resource_List.walltime=%s start=%s end=%f exec_host=%s runtime=%s" % \570 message = "%s;E;%d;queue=%s qtime=%s Resource_List.nodect=%s Resource_List.walltime=%s start=%s end=%f exec_host=%s runtime=%s" % \ 550 571 (timestamp, spec['jobid'], spec['queue'], spec['submittime'], spec['nodes'], log_walltime, spec['start_time'], 551 572 round(float(spec['end_time']), 1), ":".join(spec['location']), … … 553 574 elif eventtype == 'F': #failure 554 575 frag_runtime = round(float(spec['failure_time']) - float(spec['start_time']), 1) #running time before failure(after the latest start) 555 message = "%s;F;%d;queue=%s qtime=%s Resource_List.n cpus=%s Resource_List.walltime=%s exec_host=%s start=%s frag_runtime=%s complete=%f" % \576 message = "%s;F;%d;queue=%s qtime=%s Resource_List.nodect=%s Resource_List.walltime=%s exec_host=%s start=%s frag_runtime=%s complete=%f" % \ 556 577 (timestamp, spec['jobid'], spec['queue'], spec['submittime'], 557 578 spec['nodes'], log_walltime, ":".join(spec['location']), spec['start_time'], … … 559 580 ) 560 581 elif eventtype == 'P': #pending 561 message = "%s;P;%d;queue=%s qtime=%s Resource_List.n cpus=%s Resource_List.walltime=%s exec_host=%s start=%s" % \582 message = "%s;P;%d;queue=%s qtime=%s Resource_List.nodect=%s Resource_List.walltime=%s exec_host=%s start=%s" % \ 562 583 (timestamp, spec['jobid'], spec['queue'], spec['submittime'], 563 584 spec['nodes'], log_walltime, ":".join(spec['location']), spec['start_time'], 564 585 ) 565 print "message=", message566 586 else: 567 587 print "invalid event type, type=", type … … 632 652 updates.update(rec_updates) 633 653 654 updates['has_resources'] = False 655 634 656 if updates.has_key('state'): 635 657 newstate = updates['state'] 636 658 637 659 if CHECKPOINT: 638 660 print "enter checkpoint handling****" … … 640 662 frag_runtime = float(jobspec['failure_time']) - float(jobspec['start_time']) 641 663 updates['remain_time'] = jobspec['remain_time'] - frag_runtime 642 643 updates['has_resources'] = False644 664 645 665 else:#other event … … 666 686 ids_str = self.get_current_time_job() 667 687 ids = ids_str.split(':') 688 cur_event = self.get_current_time_event() 668 689 for id in ids: 669 690 for spec in specs: 670 691 spec['jobid'] = int(id) 671 ret = self.queues.get_jobs(specs, _update_job_states, updates) 692 ret_jobs = self.queues.get_jobs(specs, _update_job_states, updates) 693 if cur_event == "Q": 694 self.visible_jobs.extend(ret_jobs) 695 elif cur_event=="E": 696 self.visible_jobs = [j for j in self.visible_jobs if j not in ret_jobs] 672 697 return 0 673 698 … … 759 784 else: 760 785 self.update_job_states(specs, {}) 761 #self.update_job_states(specs, {}) #needchange : according to time stamp, update specific job's attribution 762 786 763 787 if len(self.recovering_jobs) > 0: 764 788 self.update_recovering_jobs({}) 765 789 766 790 self.increment_tag = True 767 for spec in specs: 768 spec['is_visible'] = True 769 spec['jobid'] = "*" # can't omitted, reset the spec['jobid'] assinged in update_job_states, (once a tricky bug, cost me nearly one day to find!) 770 jobs = self.queues.get_jobs(specs) 771 772 #make all job queue "default" so that the scheduler won't skip some jobs based on queue-partition relationship, only in simulation! 773 for job in jobs: 774 job.queue = "default" 775 791 792 jobs = self.visible_jobs 776 793 # print "running jobs=", [job.jobid for job in self.running_jobs] 777 794 # print "queueing jobs=", [job.jobid for job in self.queuing_jobs] 795 # print "visible jobs=", [job.jobid for job in self.visible_jobs] 778 796 # print "return jobs=", len(jobs) 779 797 … … 802 820 803 821 def _get_queuing_jobs(self): 804 return self.queues.get_jobs([{'jobid':"*", 'state':"queued"}])822 return [job for job in self.visible_jobs if job.is_runnable==True] 805 823 queuing_jobs = property(_get_queuing_jobs) 806 824 807 825 def _get_running_jobs(self): 808 return self.queues.get_jobs([{'jobid':"*", 'state':"running"}])826 return [job for job in self.visible_jobs if job.has_resources==True] 809 827 running_jobs = property(_get_running_jobs) 810 828 … … 812 830 return self.queues.get_jobs([{'jobid':"*", 'state':"recovering"}]) 813 831 recovering_jobs = property(_get_recovering_jobs) 832 833 def get_visible_jobs(self): 834 return self.visible_jobs; 835 get_visible_jobs = exposed(get_visible_jobs) 836 837 def get_running_jobs(self): 838 return [job for job in self.visible_jobs if job.has_resources==True] 839 get_running_jobs = exposed(get_running_jobs) 840 841 def get_queuing_jobs(self): 842 return [job for job in self.visible_jobs if job.is_runnable==True] 843 get_queuing_jobs = exposed(get_queuing_jobs) 814 844 815 845 def _get_job_by_id(self, jobid): … … 820 850 return None 821 851 822 def get_recovering_jobs(self, specs):823 return self.recovering_jobs824 get_running_jobs = exposed(query(get_recovering_jobs))825 826 852 def add_queues(self, specs): 827 853 '''add queues''' … … 899 925 if next_sec < closest_fail_sec: 900 926 closest_fail_sec =next_sec 901 902 927 903 928 if closest_fail_sec == MAXINT: … … 1098 1123 self.run_jobs(jobspecs, [partname]) 1099 1124 1125 def possible_locations(self, job): 1126 '''find the partitions with the size that can right accomodates the job 1127 (returned partions are not necessarily idle)''' 1128 locations = [] 1129 proper_partsize = 64 1130 job_nodes = int(job['nodes']) 1131 1132 for psize in self.part_size_list: 1133 if psize >= job_nodes: 1134 proper_partsize = psize 1135 break 1136 1137 for part in self.cached_partitions.itervalues(): 1138 if int(part.size) == proper_partsize: 1139 locations.append(part) 1140 1141 return locations 1100 1142 1101 1143 def _find_job_location(self, args, drain_partitions=set(), backfilling=False): … … 1110 1152 best_score = sys.maxint 1111 1153 best_partition = None 1112 1113 available_partitions = set() 1114 if required: 1115 for p_name in required: 1116 available_partitions.add(self.cached_partitions[p_name]) 1117 available_partitions.update(self.cached_partitions[p_name]._children) 1118 else: 1119 for p in self.cached_partitions.itervalues(): 1120 skip = False 1121 for bad_name in forbidden: 1122 if p.name==bad_name or bad_name in p.children or bad_name in p.parents: 1123 skip = True 1124 break 1125 if not skip: 1126 available_partitions.add(p) 1127 1128 available_partitions -= drain_partitions 1129 now = self.get_current_time_sec() 1130 1131 for partition in available_partitions: 1132 # check if the current partition is linked to the job's queue (but if reservation locations were 1133 # passed in via the "required" argument, then we know it's all good) 1134 # if not required and queue not in partition.queue.split(':'): 1135 # continue 1136 1137 # if the job needs more time than the partition currently has available, look elsewhere 1154 1155 # get partitions of proper size as the candidates 1156 candidate_partitions = self.possible_locations(args) 1157 #exclude the partitions already drained 1158 if drain_partitions: 1159 candidate_partitions = [part for part in candidate_partitions if part not in drain_partitions] 1160 1161 now = self.get_current_time_sec() 1162 for partition in candidate_partitions: 1163 1164 #skip partitions that are not "idle" 1165 if partition.state != "idle": 1166 continue 1167 1138 1168 if backfilling: 1169 #skip the partition with too short cutoff to backfill the job 1139 1170 if 60*float(walltime) > (partition.backfill_time - now): 1140 1171 continue 1141 1172 1142 if self.can_run(partition, nodes, self.cached_partitions): 1143 # let's check the impact on partitions that would become blocked 1144 #print "can run ", partition.name 1145 score = 0 1146 for p in partition.parents: 1147 if self.cached_partitions[p].state == "idle" and self.cached_partitions[p].scheduled: 1148 score += 1 1149 1173 # let's check the impact on partitions that would become blocked 1174 score = 0 1175 for p in partition.parents: 1176 if self.cached_partitions[p].state == "idle" and self.cached_partitions[p].scheduled: 1177 score += 1 1178 1179 for ch in partition.children: 1180 score += 0.01 1181 1150 1182 if (FAULTAWARE): 1151 1183 Pf = 0 … … 1156 1188 if score < best_score: 1157 1189 best_score = score 1158 best_partition = partition 1190 best_partition = partition 1191 elif score == best_score: 1192 if partition.name > best_partition.name: 1193 best_partition = partition 1159 1194 1160 1195 if best_partition: 1161 #print "return bestpartition=",{jobid: [best_partition.name ]}1196 #print "return bestpartition=",{jobid: [best_partition.name, best_partition.state]} 1162 1197 return {jobid: [best_partition.name]} 1163 1198 … … 1170 1205 return {} 1171 1206 1172 self._partitions_lock.acquire() 1173 try: 1174 self.cached_partitions = copy.deepcopy(self.partitions) 1175 except Exception,e: 1176 print e 1177 self.logger.error("error in copy.deepcopy", exc_info=True) 1178 return {} 1179 finally: 1180 self._partitions_lock.release() 1181 1182 # first, figure out backfilling cutoffs per partition (which we'll also use for picking which partition to drain) 1207 self.cached_partitions = self.partitions 1208 1209 # first, figure out backfilling cutoffs per partition (which we'll also 1210 # use for picking which partition to drain) 1183 1211 job_end_times = {} 1184 1212 for item in end_times: … … 1217 1245 draining_jobs = set() 1218 1246 cannot_start = set() 1247 1219 1248 for idx in range(len(arg_list)): 1220 1249 winning_job = arg_list[idx] … … 1247 1276 drain_partitions.add(location) 1248 1277 #self.logger.info("job %s is draining %s" % (winning_job['jobid'], location.name)) 1278 #self.dbglog.LogMessage("job %s is draining %s" % (winning_job['jobid'], location.name)) 1249 1279 location.draining = True 1250 1280 draining_jobs.add(winning_job['jobid']) … … 1252 1282 # at this time, we only want to try launching one job at a time 1253 1283 if best_partition_dict: 1284 1285 # msg = "idx=%s, jj=%s, job=%s, partition=%s" % (idx, jj, job['jobid'],best_partition_dict[job['jobid']]) 1286 #print msg 1287 # self.dbglog.LogMessage(msg) 1254 1288 break 1255 1289 … … 1257 1291 if not best_partition_dict: 1258 1292 1259 # arg_list.sort(self._walltimecmp) 1293 # arg_list.sort(self._walltimecmp) 1294 # msg = "try to backfill jobs..." 1295 # self.dbglog.LogMessage(msg) 1260 1296 1261 1297 for args in arg_list: 1262 1298 partition_name = self._find_job_location(args, backfilling=True) 1263 1299 if partition_name: 1264 self.logger.info("backfilling job %s" % args['jobid']) 1300 msg = "backfilling job %s(%s)" % (args['jobid'], args['nodes']) 1301 self.logger.info(msg) 1302 self.dbglog.LogMessage(msg) 1265 1303 best_partition_dict.update(partition_name) 1266 break 1267 1304 break 1305 1268 1306 # reserve the stuff in the best_partition_dict, as those partitions are allegedly going to 1269 1307 # be running jobs very soon 1270 1308 # 1271 1309 # also, this is the only part of finding a job location where we need to lock anything 1272 self._partitions_lock.acquire()1310 #self._partitions_lock.acquire() 1273 1311 try: 1274 1312 for p in self.partitions.itervalues(): … … 1279 1317 for partition_list in best_partition_dict.itervalues(): 1280 1318 part = self.partitions[partition_list[0]] 1281 part.reserved_until = self.get_current_time_sec() + 5*601319 ##part.reserved_until = self.get_current_time_sec() + 5*60 1282 1320 part.state = "starting job" 1283 1321 for p in part._parents: … … 1289 1327 except: 1290 1328 self.logger.error("error in find_job_location", exc_info=True) 1291 self._partitions_lock.release()1329 #self._partitions_lock.release() 1292 1330 1293 1331 #print "best_partition_dict=", best_partition_dict
![(please configure the [header_logo] section in trac.ini)](/projects/cobalt/chrome/common/trac_banner.png)