Index: /trunk/src/lib/Components/qsim.py
===================================================================
--- /trunk/src/lib/Components/qsim.py (revision 1600)
+++ /trunk/src/lib/Components/qsim.py (revision 1601)
@@ -37,5 +37,4 @@
 default_SPECIFICITY = 0.9
 default_FAILURE_LOG = "failure.lists"
-
 logging.basicConfig()
 logger = logging.getLogger('Qsim')
@@ -47,4 +46,5 @@
 SET_event = set(['I', 'Q', 'S', 'E', 'F', 'R'])
 FAULTAWARE = False
+IDEALWALLTIME = True
 
 def parseline(line):
@@ -158,4 +158,5 @@
                             "failure_time", "location", "state", "is_visible", 
                             "args",
+                            "user",
                             "system_state",
                             "starttime",
@@ -183,5 +184,6 @@
         
         self.walltime = spec.get("walltime")   #in minutes
-        
+        self.user = spec.get("user", "unknown")
+        self.project = spec.get("project", "unknown")
         self.nodes = spec.get("nodes", 0)
         self.runtime = spec.get("runtime", 0)
@@ -233,5 +235,5 @@
         self.policy = policy
         #create default queue
-        self.add_queues([{"name":"default", "policy":self.policy}])         
+        self.add_queues([{"name":"default", "policy":"default"}])         
  
     def add_jobs(self, specs, callback=None, cargs={}):
@@ -240,6 +242,8 @@
         for spec in specs:
             if spec['queue'] not in queue_names:
-                self.add_queues([{"name":spec['queue'], "policy":self.policy}])
-                queue_names.append(spec['queue'])
+                spec['queue'] = "default"
+   
+ #               self.add_queues([{"name":spec['queue'], "policy":self.policy}])
+ #               queue_names.append(spec['queue'])
                
         results = []
@@ -300,4 +304,5 @@
         self.init_partition(partnames)
         self.part_size_list = []
+     
         for part in self.partitions.itervalues():
             if int(part.size) not in self.part_size_list:
@@ -337,4 +342,5 @@
         self.queues = SimQueueDict(policy=kwargs['policy'])
         self.init_queues()
+        self.visible_jobs = []
         
         #initialize failures
@@ -350,4 +356,7 @@
         #initialize PBS-style logger
         self.pbslog = PBSlogger(self.output_log)
+        
+        #initialize debug logger
+        self.dbglog = PBSlogger(self.output_log+"-debug")
         
         #finish tag
@@ -484,4 +493,5 @@
             #convert walltime from 'hh:mm:ss' to float of minutes
             format_walltime = tmp.get('Resource_List.walltime')
+            spec['walltime'] = 0
             if format_walltime:
                 segs = format_walltime.split(':')
@@ -489,8 +499,13 @@
             else:  #invalid job entry, discard
                 spec['valid'] = False
-            
+                
             if tmp.get('start') and tmp.get('end'):
                 act_run_time = float(tmp.get('end')) - float(tmp.get('start'))
                 spec['runtime'] = str(round(act_run_time, 1))
+                
+                if IDEALWALLTIME:
+                    wtime = (round(act_run_time / 60, 2) + float(spec['walltime']))/2
+                    #wtime = act_run_time / 60
+                    spec['walltime'] = str(round(wtime, 2))
             else:
                 spec['valid'] = False
@@ -498,7 +513,13 @@
             if tmp.get('Resource_List.nodect'):
                 spec['nodes'] = tmp.get('Resource_List.nodect')
+
             else:  #invalid job entry, discard
                 spec['valid'] = False
-            
+             
+            if tmp.get('user'):
+                spec['user'] = tmp.get('user')
+            if tmp.get('project'):
+                spec['project'] = tmp.get('project')            
+
             spec['state'] = 'invisible'
             spec['start_time'] = '0'
@@ -543,9 +564,9 @@
             log_walltime = "%s:%s:00" % (walltime_hours, walltime_minutes)
             if eventtype == 'S':  #start running 
-                message = "%s;S;%d;queue=%s qtime=%s Resource_List.ncpus=%s Resource_List.walltime=%s start=%s exec_host=%s" % \
+                message = "%s;S;%d;queue=%s qtime=%s Resource_List.nodect=%s Resource_List.walltime=%s start=%s exec_host=%s" % \
                 (timestamp, spec['jobid'], spec['queue'], spec['submittime'], 
                  spec['nodes'], log_walltime, spec['start_time'], ":".join(spec['location']))
             elif eventtype == 'E':  #end
-                message = "%s;E;%d;queue=%s qtime=%s Resource_List.ncpus=%s Resource_List.walltime=%s start=%s end=%f exec_host=%s runtime=%s" % \
+                message = "%s;E;%d;queue=%s qtime=%s Resource_List.nodect=%s Resource_List.walltime=%s start=%s end=%f exec_host=%s runtime=%s" % \
                 (timestamp, spec['jobid'], spec['queue'], spec['submittime'], spec['nodes'], log_walltime, spec['start_time'], 
                  round(float(spec['end_time']), 1), ":".join(spec['location']),
@@ -553,5 +574,5 @@
             elif eventtype == 'F':  #failure
                 frag_runtime = round(float(spec['failure_time']) - float(spec['start_time']), 1)  #running time before failure(after the latest start)
-                message = "%s;F;%d;queue=%s qtime=%s Resource_List.ncpus=%s Resource_List.walltime=%s exec_host=%s start=%s frag_runtime=%s complete=%f" % \
+                message = "%s;F;%d;queue=%s qtime=%s Resource_List.nodect=%s Resource_List.walltime=%s exec_host=%s start=%s frag_runtime=%s complete=%f" % \
                 (timestamp, spec['jobid'], spec['queue'], spec['submittime'], 
                  spec['nodes'], log_walltime, ":".join(spec['location']), spec['start_time'], 
@@ -559,9 +580,8 @@
                 )
             elif eventtype == 'P':  #pending
-                message = "%s;P;%d;queue=%s qtime=%s Resource_List.ncpus=%s Resource_List.walltime=%s exec_host=%s start=%s" % \
+                message = "%s;P;%d;queue=%s qtime=%s Resource_List.nodect=%s Resource_List.walltime=%s exec_host=%s start=%s" % \
                 (timestamp, spec['jobid'], spec['queue'], spec['submittime'], 
                  spec['nodes'], log_walltime, ":".join(spec['location']), spec['start_time'], 
                 )
-                print "message=", message
             else:
                 print "invalid event type, type=", type
@@ -632,7 +652,9 @@
                 updates.update(rec_updates)
                 
+            updates['has_resources'] = False
+                
             if updates.has_key('state'):
                 newstate = updates['state']
-                
+            
             if CHECKPOINT:
                 print "enter checkpoint handling****"
@@ -640,6 +662,4 @@
                 frag_runtime = float(jobspec['failure_time']) - float(jobspec['start_time'])
                 updates['remain_time'] = jobspec['remain_time'] - frag_runtime
-            
-            updates['has_resources'] = False
                
         else:#other event
@@ -666,8 +686,13 @@
         ids_str = self.get_current_time_job()
         ids = ids_str.split(':')
+        cur_event = self.get_current_time_event()
         for id in ids:
             for spec in specs:
                 spec['jobid'] = int(id)
-            ret = self.queues.get_jobs(specs, _update_job_states, updates)
+            ret_jobs = self.queues.get_jobs(specs, _update_job_states, updates)
+            if cur_event == "Q":
+                self.visible_jobs.extend(ret_jobs)
+            elif cur_event=="E":
+                self.visible_jobs = [j for j in self.visible_jobs if j not in ret_jobs]
         return 0
      
@@ -759,21 +784,14 @@
             else:
                 self.update_job_states(specs, {})
-            #self.update_job_states(specs, {})  #needchange : according to time stamp, update specific job's attribution
-        
+           
         if len(self.recovering_jobs) > 0:
             self.update_recovering_jobs({})
         
         self.increment_tag = True
-        for spec in specs:
-            spec['is_visible'] = True
-            spec['jobid'] = "*"   # can't omitted, reset the spec['jobid'] assinged in update_job_states, (once a tricky bug, cost me nearly one day to find!)
-        jobs = self.queues.get_jobs(specs)
-
-        #make all job queue "default" so that the scheduler won't skip some jobs based on queue-partition relationship, only in simulation!
-        for job in jobs:
-            job.queue = "default"
-            
+        
+        jobs = self.visible_jobs           
 #        print "running jobs=", [job.jobid for job in self.running_jobs]
 #        print "queueing jobs=", [job.jobid for job in self.queuing_jobs]
+#        print "visible jobs=", [job.jobid for job in self.visible_jobs]
 #        print "return jobs=", len(jobs) 
 
@@ -802,9 +820,9 @@
     
     def _get_queuing_jobs(self):
-        return self.queues.get_jobs([{'jobid':"*", 'state':"queued"}])
+        return [job for job in self.visible_jobs if job.is_runnable==True]
     queuing_jobs = property(_get_queuing_jobs)
     
     def _get_running_jobs(self):
-        return self.queues.get_jobs([{'jobid':"*", 'state':"running"}])
+        return [job for job in self.visible_jobs if job.has_resources==True]
     running_jobs = property(_get_running_jobs)
     
@@ -812,4 +830,16 @@
         return self.queues.get_jobs([{'jobid':"*", 'state':"recovering"}])
     recovering_jobs = property(_get_recovering_jobs)
+    
+    def get_visible_jobs(self):
+        return self.visible_jobs;
+    get_visible_jobs = exposed(get_visible_jobs)
+    
+    def get_running_jobs(self):
+        return [job for job in self.visible_jobs if job.has_resources==True]
+    get_running_jobs = exposed(get_running_jobs)
+    
+    def get_queuing_jobs(self):
+        return [job for job in self.visible_jobs if job.is_runnable==True]
+    get_queuing_jobs = exposed(get_queuing_jobs)
     
     def _get_job_by_id(self, jobid):
@@ -820,8 +850,4 @@
             return None
    
-    def get_recovering_jobs(self, specs):
-        return self.recovering_jobs
-    get_running_jobs = exposed(query(get_recovering_jobs))
-           
     def add_queues(self, specs):
         '''add queues'''
@@ -899,5 +925,4 @@
                 if next_sec < closest_fail_sec:
                     closest_fail_sec =next_sec
-        
                         
         if closest_fail_sec == MAXINT:
@@ -1098,4 +1123,21 @@
         self.run_jobs(jobspecs, [partname])
         
+    def possible_locations(self, job):
+        '''find the partitions with the size that can right accomodates the job
+        (returned partions are not necessarily idle)'''
+        locations = []
+        proper_partsize = 64
+        job_nodes = int(job['nodes'])
+        
+        for psize in self.part_size_list:
+            if psize >= job_nodes:
+                proper_partsize = psize
+                break             
+            
+        for part in self.cached_partitions.itervalues():
+            if int(part.size) == proper_partsize:
+                locations.append(part)
+                
+        return locations
 
     def _find_job_location(self, args, drain_partitions=set(), backfilling=False):
@@ -1110,42 +1152,32 @@
         best_score = sys.maxint
         best_partition = None
-                
-        available_partitions = set()
-        if required:
-            for p_name in required:
-                available_partitions.add(self.cached_partitions[p_name])
-                available_partitions.update(self.cached_partitions[p_name]._children)
-        else:
-            for p in self.cached_partitions.itervalues():
-                skip = False
-                for bad_name in forbidden:
-                    if p.name==bad_name or bad_name in p.children or bad_name in p.parents:
-                        skip = True
-                        break
-                if not skip:
-                    available_partitions.add(p)
-        
-        available_partitions -= drain_partitions
-        now = self.get_current_time_sec()
-        
-        for partition in available_partitions:
-            # check if the current partition is linked to the job's queue (but if reservation locations were
-            # passed in via the "required" argument, then we know it's all good)
-#            if not required and queue not in partition.queue.split(':'):
-#                continue
-            
-            # if the job needs more time than the partition currently has available, look elsewhere    
+        
+        # get partitions of proper size as the candidates
+        candidate_partitions = self.possible_locations(args)
+        #exclude the partitions already drained
+        if drain_partitions:
+            candidate_partitions = [part for part in candidate_partitions if part not in drain_partitions]
+        
+        now = self.get_current_time_sec()         
+        for partition in candidate_partitions:
+            
+            #skip partitions that are not "idle"
+            if partition.state != "idle":
+                continue
+            
             if backfilling:
+                #skip the partition with too short cutoff to backfill the job
                 if 60*float(walltime) > (partition.backfill_time - now):
                     continue
                 
-            if self.can_run(partition, nodes, self.cached_partitions):
-                # let's check the impact on partitions that would become blocked
-                #print "can run ", partition.name
-                score = 0
-                for p in partition.parents:
-                    if self.cached_partitions[p].state == "idle" and self.cached_partitions[p].scheduled:
-                        score += 1
-                
+            # let's check the impact on partitions that would become blocked
+            score = 0
+            for p in partition.parents:
+                if self.cached_partitions[p].state == "idle" and self.cached_partitions[p].scheduled:
+                    score += 1
+                
+                for ch in partition.children:
+                    score += 0.01
+               
                 if (FAULTAWARE):
                     Pf = 0
@@ -1156,8 +1188,11 @@
                 if score < best_score:
                     best_score = score
-                    best_partition = partition        
+                    best_partition = partition
+                elif score == best_score:
+                    if partition.name > best_partition.name:
+                        best_partition = partition
 
         if best_partition:
-            #print "return bestpartition=",{jobid: [best_partition.name]}
+            #print "return bestpartition=",{jobid: [best_partition.name, best_partition.state]}
             return {jobid: [best_partition.name]}
  
@@ -1170,15 +1205,8 @@
             return {}
         
-        self._partitions_lock.acquire()
-        try:
-            self.cached_partitions = copy.deepcopy(self.partitions)
-        except Exception,e:
-            print e
-            self.logger.error("error in copy.deepcopy", exc_info=True)
-            return {}
-        finally:
-            self._partitions_lock.release()
-
-        # first, figure out backfilling cutoffs per partition (which we'll also use for picking which partition to drain)
+        self.cached_partitions = self.partitions
+
+        # first, figure out backfilling cutoffs per partition (which we'll also 
+        # use for picking which partition to drain)
         job_end_times = {}
         for item in end_times:
@@ -1217,4 +1245,5 @@
         draining_jobs = set()
         cannot_start = set()
+
         for idx in range(len(arg_list)):
             winning_job = arg_list[idx]
@@ -1247,4 +1276,5 @@
                     drain_partitions.add(location)
                     #self.logger.info("job %s is draining %s" % (winning_job['jobid'], location.name))
+                    #self.dbglog.LogMessage("job %s is draining %s" % (winning_job['jobid'], location.name))
                     location.draining = True
                     draining_jobs.add(winning_job['jobid'])
@@ -1252,4 +1282,8 @@
             # at this time, we only want to try launching one job at a time
             if best_partition_dict:
+                 
+            #    msg =  "idx=%s, jj=%s, job=%s, partition=%s" % (idx, jj, job['jobid'],best_partition_dict[job['jobid']])
+                #print msg                
+            #    self.dbglog.LogMessage(msg)
                 break
         
@@ -1257,18 +1291,22 @@
         if not best_partition_dict:
             
-            # arg_list.sort(self._walltimecmp)
+        # arg_list.sort(self._walltimecmp)
+        #   msg = "try to backfill jobs..."
+        #   self.dbglog.LogMessage(msg)
 
             for args in arg_list:
                 partition_name = self._find_job_location(args, backfilling=True)
                 if partition_name:
-                    self.logger.info("backfilling job %s" % args['jobid'])
+                    msg = "backfilling job %s(%s)" % (args['jobid'], args['nodes'])
+                    self.logger.info(msg)
+                    self.dbglog.LogMessage(msg)
                     best_partition_dict.update(partition_name)
-                    break
-
+                    break                
+                    
         # reserve the stuff in the best_partition_dict, as those partitions are allegedly going to 
         # be running jobs very soon
         #
         # also, this is the only part of finding a job location where we need to lock anything
-        self._partitions_lock.acquire()
+        #self._partitions_lock.acquire()
         try:
             for p in self.partitions.itervalues():
@@ -1279,5 +1317,5 @@
             for partition_list in best_partition_dict.itervalues():
                 part = self.partitions[partition_list[0]] 
-                part.reserved_until = self.get_current_time_sec() + 5*60
+                ##part.reserved_until = self.get_current_time_sec() + 5*60
                 part.state = "starting job"
                 for p in part._parents:
@@ -1289,5 +1327,5 @@
         except:
             self.logger.error("error in find_job_location", exc_info=True)
-        self._partitions_lock.release()
+        #self._partitions_lock.release()
         
         #print "best_partition_dict=", best_partition_dict
