Changeset 4747

Show
Ignore:
Timestamp:
06/16/09 16:39:07 (5 months ago)
Author:
goodell
Message:

Merge process mapping fixes from trunk -> mpich2-1.1 branch.

This commit significantly reduces the amount of time that MPI_Init takes
when using mpd on systems with many nodes.

This is the merge of 4 commits:
r4666 - Merging process mapping code into trunk
r4671 - Fixed srcdir typo in the mpd makefile
r4705 - Prevent mpds from crashing when jobs are launched that don't use all hosts.
r4706 - Multiple fixes for mpd's new process mapping capability.

Location:
mpich2/branches/release/mpich2-1.1/src
Files:
5 modified

Legend:

Unmodified
Added
Removed
  • mpich2/branches/release/mpich2-1.1/src/mpid/ch3/src/mpid_vc.c

    r4390 r4747  
    2121#include <errno.h> 
    2222#endif 
     23#include <ctype.h> 
    2324 
    2425 
     
    797798#endif 
    798799 
     800 
     801#define parse_error() MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**intern", "**intern %s", "parse error") 
     802/* advance _c until we find a non whitespace character */ 
     803#define skip_space(_c) while (isspace(*(_c))) ++(_c) 
     804/* return true iff _c points to a character valid as an indentifier, i.e., [-_a-zA-Z0-9] */ 
     805#define isident(_c) (isalnum(_c) || (_c) == '-' || (_c) == '_') 
     806 
     807/* give an error iff *_c != _e */ 
     808#define expect_c(_c, _e) do { if (*(_c) != _e) parse_error(); } while (0) 
     809#define expect_and_skip_c(_c, _e) do { expect_c(_c, _e); ++c; } while (0) 
     810/* give an error iff the first |_m| characters of the string _s are equal to _e */ 
     811#define expect_s(_s, _e) (strncmp(_s, _e, strlen(_e)) == 0 && !isident((_s)[strlen(_e)])) 
     812 
     813typedef enum { 
     814    NULL_MAPPING = 0, 
     815    VECTOR_MAPPING 
     816} mapping_type_t; 
     817 
     818#define VECTOR "vector" 
     819 
     820typedef struct map_block 
     821{ 
     822    int start_id; 
     823    int count; 
     824    int size; 
     825} map_block_t; 
     826 
     827#undef FUNCNAME 
     828#define FUNCNAME parse_mapping 
     829#undef FCNAME 
     830#define FCNAME MPIDI_QUOTE(FUNCNAME) 
     831static int parse_mapping(char *map_str, mapping_type_t *type, map_block_t **map, int *nblocks) 
     832{ 
     833    int mpi_errno = MPI_SUCCESS; 
     834    char *c = map_str, *d; 
     835    int num_blocks = 0; 
     836    int i; 
     837    MPIU_CHKPMEM_DECL(1); 
     838    MPIDI_STATE_DECL(MPID_STATE_PARSE_MAPPING); 
     839 
     840    MPIDI_FUNC_ENTER(MPID_STATE_PARSE_MAPPING); 
     841 
     842    /* parse string of the form: 
     843       '(' <format> ',' '(' <num> ',' <num> ',' <num> ')' {',' '(' <num> ',' <num> ',' <num> ')'} ')' 
     844 
     845       the values of each 3-tuple have the following meaning (X,Y,Z): 
     846         X - node id start value 
     847         Y - number of nodes with size Z 
     848         Z - number of processes assigned to each node 
     849     */ 
     850    MPIU_DBG_MSG_S(CH3_OTHER,VERBOSE,"parsing mapping string '%s'", map_str); 
     851 
     852    if (!strlen(map_str)) { 
     853        /* An empty-string indicates an inability to determine or express the 
     854         * process layout on the part of the process manager.  Consider this a 
     855         * non-fatal error case. */ 
     856        *type = NULL_MAPPING; 
     857        *map = NULL; 
     858        *nblocks = 0; 
     859        goto fn_exit; 
     860    } 
     861 
     862    skip_space(c); 
     863    expect_and_skip_c(c, '('); 
     864    skip_space(c); 
     865 
     866    d = c; 
     867    if (expect_s(d, VECTOR)) 
     868        *type = VECTOR_MAPPING; 
     869    else 
     870        parse_error(); 
     871    c += strlen(VECTOR); 
     872    skip_space(c); 
     873 
     874    /* first count the number of block descriptors */ 
     875    d = c; 
     876    while (*d) { 
     877        if (*d == '(') 
     878            ++num_blocks; 
     879        ++d; 
     880    } 
     881 
     882    MPIU_CHKPMEM_MALLOC(*map, map_block_t *, sizeof(map_block_t) * num_blocks, mpi_errno, "map"); 
     883 
     884    /* parse block descriptors */ 
     885    for (i = 0; i < num_blocks; ++i) { 
     886        expect_and_skip_c(c, ','); 
     887        skip_space(c); 
     888 
     889        expect_and_skip_c(c, '('); 
     890        skip_space(c); 
     891 
     892        if (!isdigit(*c)) 
     893            parse_error(); 
     894        (*map)[i].start_id = strtol(c, &c, 0); 
     895        skip_space(c); 
     896 
     897        expect_and_skip_c(c, ','); 
     898        skip_space(c); 
     899 
     900        if (!isdigit(*c)) 
     901            parse_error(); 
     902        (*map)[i].count = strtol(c, &c, 0); 
     903        skip_space(c); 
     904 
     905        expect_and_skip_c(c, ','); 
     906        skip_space(c); 
     907 
     908        if (!isdigit(*c)) 
     909            parse_error(); 
     910        (*map)[i].size = strtol(c, &c, 0); 
     911 
     912        expect_and_skip_c(c, ')'); 
     913        skip_space(c); 
     914    } 
     915 
     916    expect_and_skip_c(c, ')'); 
     917 
     918    *nblocks = num_blocks; 
     919    MPIU_CHKPMEM_COMMIT(); 
     920fn_exit: 
     921    MPIDI_FUNC_EXIT(MPID_STATE_PARSE_MAPPING); 
     922    return mpi_errno; 
     923fn_fail: 
     924    MPIU_CHKPMEM_REAP(); 
     925    goto fn_exit; 
     926} 
     927 
     928#if 0 
     929static void t(const char *s, int nprocs) 
     930{ 
     931    int ret; 
     932    map_block_t *mb; 
     933    int nblocks=0; 
     934    int i; 
     935    mapping_type_t mt = -1; 
     936    int rank; 
     937    int block, block_node, node_proc; 
     938 
     939    ret = parse_mapping(strdup(s), &mt, &mb, &nblocks); 
     940    printf("str=\"%s\" type=%d ret=%d\n", s, mt, ret); 
     941    if (ret) return; 
     942    for (i = 0; i < nblocks; ++i) 
     943        printf("    %d: start=%d size=%d count=%d\n", i, mb[i].start_id, mb[i].size, mb[i].count); 
     944    printf("\n"); 
     945 
     946 
     947    rank = 0; 
     948    while (rank < nprocs) { 
     949        int node_id; 
     950        for (block = 0; block < nblocks; ++block) { 
     951            node_id = mb[block].start_id; 
     952            for (block_node = 0; block_node < mb[block].count; ++block_node) { 
     953                for (node_proc = 0; node_proc < mb[block].size; ++node_proc) { 
     954                    printf("    %d  %d\n", rank, node_id); 
     955                    ++rank; 
     956                    if (rank == nprocs) 
     957                        goto done; 
     958                } 
     959                ++node_id; 
     960            } 
     961        } 
     962    } 
     963done: 
     964    return; 
     965 
     966} 
     967 
     968 
     969 void test_parse_mapping(void) 
     970{ 
     971    t("(vector, (0,1,1))", 5); 
     972    t("(vector, (0,1,1), (1,5,3), (6,2, 5))", 100); 
     973    t("(vector, (1,1,1), (0,2,2))", 5); 
     974     
     975    t("(vector, (1,1,1), (0,2,2),)", 5); 
     976    t("XXX, (1,1))", 1); 
     977    t("vector, (1,1))", 1); 
     978    t("(vector, (1.11, 2,2))", 1); 
     979    t("", 1); 
     980 
     981} 
     982 
     983 
     984#endif 
     985 
     986#undef FUNCNAME 
     987#define FUNCNAME populate_ids_from_mapping 
     988#undef FCNAME 
     989#define FCNAME MPIDI_QUOTE(FUNCNAME) 
     990static int populate_ids_from_mapping(char *mapping, int *num_nodes, MPIDI_PG_t *pg, int *did_map) 
     991{ 
     992    int mpi_errno = MPI_SUCCESS; 
     993    /* process-mapping is available */ 
     994    mapping_type_t mt = -1; 
     995    map_block_t *mb = NULL; 
     996    int nblocks = 0; 
     997    int rank; 
     998    int block, block_node, node_proc; 
     999 
     1000    *did_map = 1; /* reset upon failure */ 
     1001 
     1002    mpi_errno = parse_mapping(mapping, &mt, &mb, &nblocks); 
     1003    if (mpi_errno) MPIU_ERR_POP(mpi_errno); 
     1004 
     1005    if (NULL_MAPPING == mt) goto fn_fail; 
     1006    MPIU_ERR_CHKANDJUMP1(mt != VECTOR_MAPPING, mpi_errno, MPI_ERR_OTHER, "**intern", "**intern %s", "unsupported mapping type"); 
     1007 
     1008    rank = 0; 
     1009    /* for a representation like (block,N,(1,1)) this while loop causes us to 
     1010     * re-use that sole map block over and over until we have assigned node 
     1011     * ids to every process */ 
     1012    while (rank < pg->size) { 
     1013        for (block = 0; block < nblocks; ++block) { 
     1014            int node_id = mb[block].start_id; 
     1015            for (block_node = 0; block_node < mb[block].count; ++block_node) { 
     1016                if (node_id > *num_nodes) 
     1017                    *num_nodes = node_id; 
     1018 
     1019                for (node_proc = 0; node_proc < mb[block].size; ++node_proc) { 
     1020                    pg->vct[rank].node_id = node_id; 
     1021                    ++rank; 
     1022                    if (rank == pg->size) 
     1023                        goto fn_exit; 
     1024                } 
     1025                ++node_id; 
     1026            } 
     1027        } 
     1028    } 
     1029 
     1030fn_exit: 
     1031    ++(*num_nodes); /* add one to get the num instead of the max */ 
     1032    MPIU_Free(mb); 
     1033    return mpi_errno; 
     1034fn_fail: 
     1035    *did_map = 0; 
     1036    goto fn_exit; 
     1037} 
     1038 
    7991039/* Fills in the node_id info from PMI info.  Adapted from MPIU_Get_local_procs. 
    8001040   This function is collective over the entire PG because PMI_Barrier is called. 
    801     
     1041 
    8021042   our_pg_rank should be set to -1 if this is not the current process' PG.  This 
    8031043   is currently not supported due to PMI limitations. 
    8041044 
    805    Algorithm: 
    806    
     1045   Fallback Algorithm: 
     1046 
    8071047   Each process kvs_puts its hostname and stores the total number of 
    8081048   processes (g_num_global).  Each process determines the number of nodes 
    8091049   (g_num_nodes) and assigns a node id to each process (g_node_ids[]): 
    810     
     1050 
    8111051     For each hostname the process seaches the list of unique nodes 
    8121052     names (node_names[]) for a match.  If a match is found, the node id 
     
    8141054     added to the list of node names. 
    8151055*/ 
     1056#undef FUNCNAME 
     1057#define FUNCNAME MPIDI_Populate_vc_node_ids 
     1058#undef FCNAME 
     1059#define FCNAME MPIDI_QUOTE(FUNCNAME) 
    8161060int MPIDI_Populate_vc_node_ids(MPIDI_PG_t *pg, int our_pg_rank) 
    8171061{ 
     
    8221066    int i, j; 
    8231067    char *key; 
     1068    char *value; 
    8241069    int key_max_sz; 
     1070    int val_max_sz; 
    8251071    char *kvs_name; 
    8261072    char **node_names; 
     
    8281074    int no_local = 0; 
    8291075    int odd_even_cliques = 0; 
    830     MPIU_CHKLMEM_DECL(3); 
    831  
    832 #ifdef USE_PMI2_API 
    833     { 
    834         int *node_ids; 
    835         int outlen; 
    836         int found = FALSE; 
    837         MPIU_CHKLMEM_MALLOC(node_ids, int *, pg->size * sizeof(int), mpi_errno, "node_ids"); 
    838  
    839         mpi_errno = PMI_Info_GetJobAttrIntArray("nodeIDs", node_ids, pg->size, &outlen, &found); 
    840         if (mpi_errno) MPIU_ERR_POP(mpi_errno); 
    841         MPIU_ERR_CHKANDJUMP1(outlen != pg->size, mpi_errno, MPI_ERR_OTHER, "**intern", "**intern %s", "did not receive enough nodeids"); 
    842         g_num_nodes = 0; 
    843         for (i = 0; i < pg->size; ++i) { 
    844             pg->vct[i].node_id = node_ids[i]; 
    845             if (g_num_nodes < node_ids[i]) 
    846                 g_num_nodes = node_ids[i]; 
    847         } 
    848          
    849         ++g_num_nodes; 
    850          
    851         /* FIXME: need to handle oddeven cliques DARIUS */ 
    852     } 
    853 #else 
    854     if (our_pg_rank == -1) { 
    855         /* FIXME this routine can't handle the dynamic process case at this 
    856            time.  This will require more support from the process manager. */ 
    857         MPIU_Assert(0); 
    858     } 
    859  
    860     mpi_errno = publish_node_id(pg, our_pg_rank); 
    861     if (mpi_errno) MPIU_ERR_POP(mpi_errno); 
     1076    MPIU_CHKLMEM_DECL(4); 
     1077 
     1078    if (pg->size == 1) { 
     1079        pg->vct[0].node_id = g_num_nodes++; 
     1080        goto fn_exit; 
     1081    } 
    8621082 
    8631083    /* Used for debugging only.  This disables communication over shared memory */ 
     
    8891109    } 
    8901110 
    891     /* Allocate space for pmi key */ 
     1111#ifdef USE_PMI2_API 
     1112#if 0 /* use nodeid list */ 
     1113    { 
     1114        int *node_ids; 
     1115        int outlen; 
     1116        int found = FALSE; 
     1117        MPIU_CHKLMEM_MALLOC(node_ids, int *, pg->size * sizeof(int), mpi_errno, "node_ids"); 
     1118 
     1119        mpi_errno = PMI_Info_GetJobAttrIntArray("nodeIDs", node_ids, pg->size, &outlen, &found); 
     1120        if (mpi_errno) MPIU_ERR_POP(mpi_errno); 
     1121        MPIU_ERR_CHKANDJUMP1(!found, mpi_errno, MPI_ERR_OTHER, "**intern", "**intern %s", "nodeIDs attribute not found"); 
     1122        MPIU_ERR_CHKANDJUMP1(outlen != pg->size, mpi_errno, MPI_ERR_OTHER, "**intern", "**intern %s", "did not receive enough nodeids"); 
     1123        g_num_nodes = 0; 
     1124        for (i = 0; i < pg->size; ++i) { 
     1125            pg->vct[i].node_id = node_ids[i]; 
     1126            if (g_num_nodes < node_ids[i]) 
     1127                g_num_nodes = node_ids[i]; 
     1128        } 
     1129 
     1130        ++g_num_nodes; 
     1131 
     1132        /* FIXME: need to handle oddeven cliques DARIUS */ 
     1133    } 
     1134#else 
     1135    { 
     1136        char process_mapping[PMI_MAX_VALLEN]; 
     1137        int outlen; 
     1138        int found = FALSE; 
     1139        int i; 
     1140        map_block_t *mb; 
     1141        int nblocks; 
     1142        int rank; 
     1143        int block, block_node, node_proc; 
     1144        int did_map = 0; 
     1145        int num_nodes = 0; 
     1146 
     1147        mpi_errno = PMI_Info_GetJobAttr("process-mapping", process_mapping, sizeof(process_mapping), &found); 
     1148        if (mpi_errno) MPIU_ERR_POP(mpi_errno); 
     1149        MPIU_ERR_CHKANDJUMP1(!found, mpi_errno, MPI_ERR_OTHER, "**intern", "**intern %s", "process-mapping attribute not found"); 
     1150        /* this code currently assumes pg is comm_world */ 
     1151        mpi_errno = populate_ids_from_mapping(process_mapping, &num_nodes, pg, &did_map); 
     1152        if (mpi_errno) MPIU_ERR_POP(mpi_errno); 
     1153        MPIU_ERR_CHKANDJUMP1(!did_map, mpi_errno, MPI_ERR_OTHER, "**intern", "**intern %s", "unable to populate node ids from process-mapping"); 
     1154        g_num_nodes = num_nodes; 
     1155    } 
     1156#endif 
     1157#else /* USE_PMI2_API */ 
     1158    if (our_pg_rank == -1) { 
     1159        /* FIXME this routine can't handle the dynamic process case at this 
     1160           time.  This will require more support from the process manager. */ 
     1161        MPIU_Assert(0); 
     1162    } 
     1163 
     1164    /* Allocate space for pmi key and value */ 
    8921165    pmi_errno = PMI_KVS_Get_key_length_max(&key_max_sz); 
    8931166    MPIU_ERR_CHKANDJUMP1(pmi_errno, mpi_errno, MPI_ERR_OTHER, "**fail", "**fail %d", pmi_errno); 
    894  
    8951167    MPIU_CHKLMEM_MALLOC(key, char *, key_max_sz, mpi_errno, "key"); 
    8961168 
     1169    pmi_errno = PMI_KVS_Get_value_length_max(&val_max_sz); 
     1170    MPIU_ERR_CHKANDJUMP1(pmi_errno, mpi_errno, MPI_ERR_OTHER, "**fail", "**fail %d", pmi_errno); 
     1171    MPIU_CHKLMEM_MALLOC(value, char *, val_max_sz, mpi_errno, "value"); 
     1172 
    8971173    mpi_errno = MPIDI_PG_GetConnKVSname(&kvs_name); 
     1174    if (mpi_errno) MPIU_ERR_POP(mpi_errno); 
     1175 
     1176    /* See if process manager supports process-mapping keyval */ 
     1177 
     1178    /* FIXME 'process-mapping' only applies for the original PG (MPI_COMM_WORLD) */ 
     1179    pmi_errno = PMI_KVS_Get(kvs_name, "process-mapping", value, val_max_sz); 
     1180    if (pmi_errno == 0) { 
     1181        int did_map = 0; 
     1182        int num_nodes = 0; 
     1183        /* this code currently assumes pg is comm_world */ 
     1184        mpi_errno = populate_ids_from_mapping(value, &num_nodes, pg, &did_map); 
     1185        if (mpi_errno) MPIU_ERR_POP(mpi_errno); 
     1186        g_num_nodes = num_nodes; 
     1187        if (did_map) { 
     1188            goto fn_exit; 
     1189        } 
     1190        else { 
     1191            MPIU_DBG_MSG_S(CH3_OTHER,TERSE,"did_map==0, unable to populate node ids from mapping=%s",value); 
     1192        } 
     1193        /* else fall through to O(N^2) PMI_KVS_Gets version */ 
     1194    } 
     1195    else { 
     1196        MPIU_DBG_MSG(CH3_OTHER,TERSE,"unable to obtain the 'process-mapping' PMI key"); 
     1197    } 
     1198 
     1199    mpi_errno = publish_node_id(pg, our_pg_rank); 
    8981200    if (mpi_errno) MPIU_ERR_POP(mpi_errno); 
    8991201 
  • mpich2/branches/release/mpich2-1.1/src/pm/mpd/Makefile.in

    r1225 r4747  
    8686        @-rm -f *.o *.dep *~ ${EXAMPLES} core* *.1 *.pyc 
    8787        @-rm -f *.gcno *.gcda *.bb *.bbg 
    88         @-rm -f ${srcdir}/*.gcno ${scrdir}/*.gcda 
     88        @-rm -f ${srcdir}/*.gcno ${srcdir}/*.gcda 
    8989 
    9090# echo "some of the following might be executables you want to remove" 
  • mpich2/branches/release/mpich2-1.1/src/pm/mpd/mpd.py

    r4326 r4747  
    631631            msg['ringsize'] = 0 
    632632            msg['ring_ncpus'] = 0 
     633            # maps rank => hostname 
     634            msg['process_mapping'] = {} 
    633635            if msg.has_key('try_1st_locally'): 
    634636                self.do_mpdrun(msg) 
     
    798800            msg['totalview'] = 0 
    799801            msg['ifhns'] = {} 
     802            # maps rank => hostname 
     803            msg['process_mapping'] = {} 
    800804            self.spawnQ.append(msg) 
    801805        elif msg['cmd'] == 'publish_name': 
     
    827831            sock.send_dict_msg(msgToSend) 
    828832 
     833    def calculate_process_mapping(self,mapping_dict): 
     834        # mapping_dict maps ranks => hostnames 
     835        ranks = list(mapping_dict.keys()) 
     836        ranks.sort() 
     837 
     838        # assign node ids based in first-come-first-serve order when iterating 
     839        # over the ranks in increasing order 
     840        next_id = 0 
     841        node_ids = {} 
     842        for rank in ranks: 
     843            host = mapping_dict[rank] 
     844            if not node_ids.has_key(host): 
     845                node_ids[host] = next_id 
     846                next_id += 1 
     847 
     848 
     849        # maps {node_id_A: set([rankX,rankY,...]), node_id_B:...} 
     850        node_to_ranks = {} 
     851        for rank in ranks: 
     852            node_id = node_ids[mapping_dict[rank]] 
     853            if not node_to_ranks.has_key(node_id): 
     854                node_to_ranks[node_id] = set([]) 
     855            node_to_ranks[node_id].add(rank) 
     856 
     857        # we only handle two cases for now: 
     858        # 1. block regular 
     859        # 2. round-robin regular 
     860        # we do handle a "remainder node" that might not be full 
     861        delta = -1 
     862        max_ranks_per_node = 0 
     863        for node_id in node_to_ranks.keys(): 
     864            last_rank = -1 
     865            if len(node_to_ranks[node_id]) > max_ranks_per_node: 
     866                max_ranks_per_node = len(node_to_ranks[node_id]) 
     867            ranks = list(node_to_ranks[node_id]) 
     868            ranks.sort() 
     869            for rank in ranks: 
     870                if last_rank != -1: 
     871                    if delta == -1: 
     872                        if node_id == 0: 
     873                            delta = rank - last_rank 
     874                        else: 
     875                            # irregular case detected such as {0:A,1:B,2:B} 
     876                            mpd_print(1, "irregular case A detected") 
     877                            return '' 
     878                    elif (rank - last_rank) != delta: 
     879                        # irregular such as {0:A,1:B,2:A,3:A,4:B} 
     880                        mpd_print(1, "irregular case B detected") 
     881                        return '' 
     882                last_rank = rank 
     883 
     884        num_nodes = len(node_to_ranks.keys()) 
     885        if delta == 1: 
     886            return '(vector,(%d,%d,%d))' % (0,num_nodes,max_ranks_per_node) 
     887        else: 
     888            # either we are round-robin-regular (delta > 1) or there is only one 
     889            # process per node (delta == -1), either way results in the same 
     890            # mapping spec 
     891            return '(vector,(%d,%d,%d))' % (0,num_nodes,1) 
     892 
    829893    def handle_lhs_input(self,sock): 
    830894        msg = self.ring.lhsSock.recv_dict_msg() 
     
    842906                    self.currRingNCPUs = msg['ring_ncpus'] 
    843907                if msg['nstarted'] == msg['nprocs']: 
     908                    # we have started all processes in the job, tell the 
     909                    # requester this and stop forwarding the mpdrun/spawn 
     910                    # message around the loop 
    844911                    if msg['cmd'] == 'spawn': 
    845912                        self.spawnInProgress = 0 
     
    849916                                      'ring_ncpus' : self.currRingNCPUs} 
    850917                        self.conSock.send_dict_msg(msgToSend) 
     918                    # Tell all MPDs in the ring the final process mapping.  In 
     919                    # turn, they will inform all of their child mpdmans. 
     920                    # Only do this in the case of a regular mpdrun.  The spawn 
     921                    # case it too complicated to handle this way right now. 
     922                    if msg['cmd'] == 'mpdrun': 
     923                        process_mapping_str = self.calculate_process_mapping(msg['process_mapping']) 
     924                        msgToSend = { 'cmd' : 'process_mapping', 
     925                                      'jobid' : msg['jobid'], 
     926                                      'mpdid_mpdrun_start' : self.myId, 
     927                                      'process_mapping' : process_mapping_str } 
     928                        self.ring.rhsSock.send_dict_msg(msgToSend) 
    851929                    return 
    852930                if not msg['first_loop']  and  msg['nstarted_on_this_loop'] == 0: 
     
    873951                msg['nstarted_on_this_loop'] = 0 
    874952            self.do_mpdrun(msg) 
     953        elif msg['cmd'] == 'process_mapping': 
     954            # message transmission terminates once the message has made it all 
     955            # the way around the loop once 
     956            if msg['mpdid_mpdrun_start'] != self.myId: 
     957                self.ring.rhsSock.send_dict_msg(msg) # forward it on around 
     958 
     959            # send to all mpdman's for the jobid embedded in the msg 
     960            jobid = msg['jobid'] 
     961 
     962            # there may be no entry for jobid in the activeJobs table if there 
     963            # weren't any processes from that job actually launched on our host 
     964            if self.activeJobs.has_key(jobid): 
     965                for manPid in self.activeJobs[jobid].keys(): 
     966                    manSock = self.activeJobs[jobid][manPid]['socktoman'] 
     967                    manSock.send_dict_msg(msg) 
    875968        elif msg['cmd'] == 'mpdtrace_info': 
    876969            if msg['dest'] == self.myId: 
     
    11751268            except: 
    11761269                pass 
     1270 
    11771271        if msg.has_key('jobid'): 
    11781272            jobid = msg['jobid'] 
     
    11931287                    for rank in range(lorank,hirank+1): 
    11941288                        self.run_one_cli(rank,msg) 
     1289                        # we use myHost under the assumption that there is only 
     1290                        # one mpd per user on a given host.  The ifhn only 
     1291                        # affects how the MPDs communicate with each other, not 
     1292                        # which host they are on 
     1293                        msg['process_mapping'][rank] = self.myHost 
    11951294                        msg['nstarted'] += 1 
    11961295                        msg['nstarted_on_this_loop'] += 1 
     
    12051304                    if self.myIfhn in hostSpecPool  or  self.myHost in hostSpecPool: 
    12061305                        self.run_one_cli(lorank,msg) 
     1306                        msg['process_mapping'][lorank] = self.myHost 
    12071307                        msg['nstarted'] += 1 
    12081308                        msg['nstarted_on_this_loop'] += 1 
     
    12201320                        (lorank,hirank) = ranks 
    12211321                        self.run_one_cli(lorank,msg) 
     1322                        msg['process_mapping'][lorank] = self.myHost 
    12221323                        msg['nstarted'] += 1 
    12231324                        msg['nstarted_on_this_loop'] += 1 
     
    12251326                        if lorank < hirank: 
    12261327                            msg['hosts'][(lorank+1,hirank)] = '_any_' 
     1328                        # self.activeJobs maps: 
     1329                        # { jobid => { mpdman_pid => {...} } } 
    12271330                        procsHereForJob = len(self.activeJobs[jobid].keys()) 
    12281331                        if procsHereForJob >= self.parmdb['MPD_NCPUS']: 
  • mpich2/branches/release/mpich2-1.1/src/pm/mpd/mpdlib.py

    r4326 r4747  
    480480                    try: 
    481481                        mpd_signum = 0 
    482                         (readyToRecv,unused1,unused2) = select.select([self.sock],[],[],timeout) 
     482                        if timeout == -1: 
     483                            # use -1 to indicate indefinite timeout 
     484                            (readyToRecv,unused1,unused2) = select.select([self.sock],[],[]) 
     485                        else: 
     486                            (readyToRecv,unused1,unused2) = select.select([self.sock],[],[],timeout) 
    483487                        break; 
    484488                    except os.error, errinfo: 
  • mpich2/branches/release/mpich2-1.1/src/pm/mpd/mpdman.py

    r4421 r4747  
    374374                      'spawner_mpd' : os.environ['MPDMAN_SPAWNER_MPD'] } 
    375375        self.mpdSock.send_dict_msg(msgToSend) 
     376 
    376377        if not self.subproc: 
    377378            self.streamHandler.set_handler(self.fd_read_cli_stdout, 
     
    424425            # rshipSock.close() 
    425426            self.waitPids.append(rshipPid) 
     427 
     428        if not self.spawned: 
     429            # receive the final process mapping from our MPD overlords 
     430            msg = self.mpdSock.recv_dict_msg(timeout=-1) 
     431 
     432            # a few defensive checks now to make sure that the various parts of the 
     433            # code are all on the same page 
     434            if not msg.has_key('cmd') or msg['cmd'] != 'process_mapping': 
     435                mpd_print(1,'expected cmd="process_mapping", got cmd="%s" instead' % (msg.get('cmd','**not_present**'))) 
     436                sys.exit(-1) 
     437            if msg['jobid'] != self.jobid: 
     438                mpd_print(1,'expected jobid="%s", got jobid="%s" instead' % (self.jobid,msg['jobid'])) 
     439                sys.exit(-1) 
     440            if not msg.has_key('process_mapping'): 
     441                mpd_print(1,'expected msg to contain a process_mapping key') 
     442                sys.exit(-1) 
     443            self.KVSs[self.default_kvsname]['process-mapping'] = msg['process_mapping'] 
     444 
    426445 
    427446        self.tvReady = 0