Changeset 4747
- Timestamp:
- 06/16/09 16:39:07 (5 months ago)
- Location:
- mpich2/branches/release/mpich2-1.1/src
- Files:
-
- 5 modified
-
mpid/ch3/src/mpid_vc.c (modified) (6 diffs)
-
pm/mpd/Makefile.in (modified) (1 diff)
-
pm/mpd/mpd.py (modified) (11 diffs)
-
pm/mpd/mpdlib.py (modified) (1 diff)
-
pm/mpd/mpdman.py (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
-
mpich2/branches/release/mpich2-1.1/src/mpid/ch3/src/mpid_vc.c
r4390 r4747 21 21 #include <errno.h> 22 22 #endif 23 #include <ctype.h> 23 24 24 25 … … 797 798 #endif 798 799 800 801 #define parse_error() MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**intern", "**intern %s", "parse error") 802 /* advance _c until we find a non whitespace character */ 803 #define skip_space(_c) while (isspace(*(_c))) ++(_c) 804 /* return true iff _c points to a character valid as an indentifier, i.e., [-_a-zA-Z0-9] */ 805 #define isident(_c) (isalnum(_c) || (_c) == '-' || (_c) == '_') 806 807 /* give an error iff *_c != _e */ 808 #define expect_c(_c, _e) do { if (*(_c) != _e) parse_error(); } while (0) 809 #define expect_and_skip_c(_c, _e) do { expect_c(_c, _e); ++c; } while (0) 810 /* give an error iff the first |_m| characters of the string _s are equal to _e */ 811 #define expect_s(_s, _e) (strncmp(_s, _e, strlen(_e)) == 0 && !isident((_s)[strlen(_e)])) 812 813 typedef enum { 814 NULL_MAPPING = 0, 815 VECTOR_MAPPING 816 } mapping_type_t; 817 818 #define VECTOR "vector" 819 820 typedef struct map_block 821 { 822 int start_id; 823 int count; 824 int size; 825 } map_block_t; 826 827 #undef FUNCNAME 828 #define FUNCNAME parse_mapping 829 #undef FCNAME 830 #define FCNAME MPIDI_QUOTE(FUNCNAME) 831 static int parse_mapping(char *map_str, mapping_type_t *type, map_block_t **map, int *nblocks) 832 { 833 int mpi_errno = MPI_SUCCESS; 834 char *c = map_str, *d; 835 int num_blocks = 0; 836 int i; 837 MPIU_CHKPMEM_DECL(1); 838 MPIDI_STATE_DECL(MPID_STATE_PARSE_MAPPING); 839 840 MPIDI_FUNC_ENTER(MPID_STATE_PARSE_MAPPING); 841 842 /* parse string of the form: 843 '(' <format> ',' '(' <num> ',' <num> ',' <num> ')' {',' '(' <num> ',' <num> ',' <num> ')'} ')' 844 845 the values of each 3-tuple have the following meaning (X,Y,Z): 846 X - node id start value 847 Y - number of nodes with size Z 848 Z - number of processes assigned to each node 849 */ 850 MPIU_DBG_MSG_S(CH3_OTHER,VERBOSE,"parsing mapping string '%s'", map_str); 851 852 if (!strlen(map_str)) { 853 /* An empty-string indicates an inability to determine or express the 854 * process layout on the part of the process manager. Consider this a 855 * non-fatal error case. */ 856 *type = NULL_MAPPING; 857 *map = NULL; 858 *nblocks = 0; 859 goto fn_exit; 860 } 861 862 skip_space(c); 863 expect_and_skip_c(c, '('); 864 skip_space(c); 865 866 d = c; 867 if (expect_s(d, VECTOR)) 868 *type = VECTOR_MAPPING; 869 else 870 parse_error(); 871 c += strlen(VECTOR); 872 skip_space(c); 873 874 /* first count the number of block descriptors */ 875 d = c; 876 while (*d) { 877 if (*d == '(') 878 ++num_blocks; 879 ++d; 880 } 881 882 MPIU_CHKPMEM_MALLOC(*map, map_block_t *, sizeof(map_block_t) * num_blocks, mpi_errno, "map"); 883 884 /* parse block descriptors */ 885 for (i = 0; i < num_blocks; ++i) { 886 expect_and_skip_c(c, ','); 887 skip_space(c); 888 889 expect_and_skip_c(c, '('); 890 skip_space(c); 891 892 if (!isdigit(*c)) 893 parse_error(); 894 (*map)[i].start_id = strtol(c, &c, 0); 895 skip_space(c); 896 897 expect_and_skip_c(c, ','); 898 skip_space(c); 899 900 if (!isdigit(*c)) 901 parse_error(); 902 (*map)[i].count = strtol(c, &c, 0); 903 skip_space(c); 904 905 expect_and_skip_c(c, ','); 906 skip_space(c); 907 908 if (!isdigit(*c)) 909 parse_error(); 910 (*map)[i].size = strtol(c, &c, 0); 911 912 expect_and_skip_c(c, ')'); 913 skip_space(c); 914 } 915 916 expect_and_skip_c(c, ')'); 917 918 *nblocks = num_blocks; 919 MPIU_CHKPMEM_COMMIT(); 920 fn_exit: 921 MPIDI_FUNC_EXIT(MPID_STATE_PARSE_MAPPING); 922 return mpi_errno; 923 fn_fail: 924 MPIU_CHKPMEM_REAP(); 925 goto fn_exit; 926 } 927 928 #if 0 929 static void t(const char *s, int nprocs) 930 { 931 int ret; 932 map_block_t *mb; 933 int nblocks=0; 934 int i; 935 mapping_type_t mt = -1; 936 int rank; 937 int block, block_node, node_proc; 938 939 ret = parse_mapping(strdup(s), &mt, &mb, &nblocks); 940 printf("str=\"%s\" type=%d ret=%d\n", s, mt, ret); 941 if (ret) return; 942 for (i = 0; i < nblocks; ++i) 943 printf(" %d: start=%d size=%d count=%d\n", i, mb[i].start_id, mb[i].size, mb[i].count); 944 printf("\n"); 945 946 947 rank = 0; 948 while (rank < nprocs) { 949 int node_id; 950 for (block = 0; block < nblocks; ++block) { 951 node_id = mb[block].start_id; 952 for (block_node = 0; block_node < mb[block].count; ++block_node) { 953 for (node_proc = 0; node_proc < mb[block].size; ++node_proc) { 954 printf(" %d %d\n", rank, node_id); 955 ++rank; 956 if (rank == nprocs) 957 goto done; 958 } 959 ++node_id; 960 } 961 } 962 } 963 done: 964 return; 965 966 } 967 968 969 void test_parse_mapping(void) 970 { 971 t("(vector, (0,1,1))", 5); 972 t("(vector, (0,1,1), (1,5,3), (6,2, 5))", 100); 973 t("(vector, (1,1,1), (0,2,2))", 5); 974 975 t("(vector, (1,1,1), (0,2,2),)", 5); 976 t("XXX, (1,1))", 1); 977 t("vector, (1,1))", 1); 978 t("(vector, (1.11, 2,2))", 1); 979 t("", 1); 980 981 } 982 983 984 #endif 985 986 #undef FUNCNAME 987 #define FUNCNAME populate_ids_from_mapping 988 #undef FCNAME 989 #define FCNAME MPIDI_QUOTE(FUNCNAME) 990 static int populate_ids_from_mapping(char *mapping, int *num_nodes, MPIDI_PG_t *pg, int *did_map) 991 { 992 int mpi_errno = MPI_SUCCESS; 993 /* process-mapping is available */ 994 mapping_type_t mt = -1; 995 map_block_t *mb = NULL; 996 int nblocks = 0; 997 int rank; 998 int block, block_node, node_proc; 999 1000 *did_map = 1; /* reset upon failure */ 1001 1002 mpi_errno = parse_mapping(mapping, &mt, &mb, &nblocks); 1003 if (mpi_errno) MPIU_ERR_POP(mpi_errno); 1004 1005 if (NULL_MAPPING == mt) goto fn_fail; 1006 MPIU_ERR_CHKANDJUMP1(mt != VECTOR_MAPPING, mpi_errno, MPI_ERR_OTHER, "**intern", "**intern %s", "unsupported mapping type"); 1007 1008 rank = 0; 1009 /* for a representation like (block,N,(1,1)) this while loop causes us to 1010 * re-use that sole map block over and over until we have assigned node 1011 * ids to every process */ 1012 while (rank < pg->size) { 1013 for (block = 0; block < nblocks; ++block) { 1014 int node_id = mb[block].start_id; 1015 for (block_node = 0; block_node < mb[block].count; ++block_node) { 1016 if (node_id > *num_nodes) 1017 *num_nodes = node_id; 1018 1019 for (node_proc = 0; node_proc < mb[block].size; ++node_proc) { 1020 pg->vct[rank].node_id = node_id; 1021 ++rank; 1022 if (rank == pg->size) 1023 goto fn_exit; 1024 } 1025 ++node_id; 1026 } 1027 } 1028 } 1029 1030 fn_exit: 1031 ++(*num_nodes); /* add one to get the num instead of the max */ 1032 MPIU_Free(mb); 1033 return mpi_errno; 1034 fn_fail: 1035 *did_map = 0; 1036 goto fn_exit; 1037 } 1038 799 1039 /* Fills in the node_id info from PMI info. Adapted from MPIU_Get_local_procs. 800 1040 This function is collective over the entire PG because PMI_Barrier is called. 801 1041 802 1042 our_pg_rank should be set to -1 if this is not the current process' PG. This 803 1043 is currently not supported due to PMI limitations. 804 1044 805 Algorithm:806 1045 Fallback Algorithm: 1046 807 1047 Each process kvs_puts its hostname and stores the total number of 808 1048 processes (g_num_global). Each process determines the number of nodes 809 1049 (g_num_nodes) and assigns a node id to each process (g_node_ids[]): 810 1050 811 1051 For each hostname the process seaches the list of unique nodes 812 1052 names (node_names[]) for a match. If a match is found, the node id … … 814 1054 added to the list of node names. 815 1055 */ 1056 #undef FUNCNAME 1057 #define FUNCNAME MPIDI_Populate_vc_node_ids 1058 #undef FCNAME 1059 #define FCNAME MPIDI_QUOTE(FUNCNAME) 816 1060 int MPIDI_Populate_vc_node_ids(MPIDI_PG_t *pg, int our_pg_rank) 817 1061 { … … 822 1066 int i, j; 823 1067 char *key; 1068 char *value; 824 1069 int key_max_sz; 1070 int val_max_sz; 825 1071 char *kvs_name; 826 1072 char **node_names; … … 828 1074 int no_local = 0; 829 1075 int odd_even_cliques = 0; 830 MPIU_CHKLMEM_DECL(3); 831 832 #ifdef USE_PMI2_API 833 { 834 int *node_ids; 835 int outlen; 836 int found = FALSE; 837 MPIU_CHKLMEM_MALLOC(node_ids, int *, pg->size * sizeof(int), mpi_errno, "node_ids"); 838 839 mpi_errno = PMI_Info_GetJobAttrIntArray("nodeIDs", node_ids, pg->size, &outlen, &found); 840 if (mpi_errno) MPIU_ERR_POP(mpi_errno); 841 MPIU_ERR_CHKANDJUMP1(outlen != pg->size, mpi_errno, MPI_ERR_OTHER, "**intern", "**intern %s", "did not receive enough nodeids"); 842 g_num_nodes = 0; 843 for (i = 0; i < pg->size; ++i) { 844 pg->vct[i].node_id = node_ids[i]; 845 if (g_num_nodes < node_ids[i]) 846 g_num_nodes = node_ids[i]; 847 } 848 849 ++g_num_nodes; 850 851 /* FIXME: need to handle oddeven cliques DARIUS */ 852 } 853 #else 854 if (our_pg_rank == -1) { 855 /* FIXME this routine can't handle the dynamic process case at this 856 time. This will require more support from the process manager. */ 857 MPIU_Assert(0); 858 } 859 860 mpi_errno = publish_node_id(pg, our_pg_rank); 861 if (mpi_errno) MPIU_ERR_POP(mpi_errno); 1076 MPIU_CHKLMEM_DECL(4); 1077 1078 if (pg->size == 1) { 1079 pg->vct[0].node_id = g_num_nodes++; 1080 goto fn_exit; 1081 } 862 1082 863 1083 /* Used for debugging only. This disables communication over shared memory */ … … 889 1109 } 890 1110 891 /* Allocate space for pmi key */ 1111 #ifdef USE_PMI2_API 1112 #if 0 /* use nodeid list */ 1113 { 1114 int *node_ids; 1115 int outlen; 1116 int found = FALSE; 1117 MPIU_CHKLMEM_MALLOC(node_ids, int *, pg->size * sizeof(int), mpi_errno, "node_ids"); 1118 1119 mpi_errno = PMI_Info_GetJobAttrIntArray("nodeIDs", node_ids, pg->size, &outlen, &found); 1120 if (mpi_errno) MPIU_ERR_POP(mpi_errno); 1121 MPIU_ERR_CHKANDJUMP1(!found, mpi_errno, MPI_ERR_OTHER, "**intern", "**intern %s", "nodeIDs attribute not found"); 1122 MPIU_ERR_CHKANDJUMP1(outlen != pg->size, mpi_errno, MPI_ERR_OTHER, "**intern", "**intern %s", "did not receive enough nodeids"); 1123 g_num_nodes = 0; 1124 for (i = 0; i < pg->size; ++i) { 1125 pg->vct[i].node_id = node_ids[i]; 1126 if (g_num_nodes < node_ids[i]) 1127 g_num_nodes = node_ids[i]; 1128 } 1129 1130 ++g_num_nodes; 1131 1132 /* FIXME: need to handle oddeven cliques DARIUS */ 1133 } 1134 #else 1135 { 1136 char process_mapping[PMI_MAX_VALLEN]; 1137 int outlen; 1138 int found = FALSE; 1139 int i; 1140 map_block_t *mb; 1141 int nblocks; 1142 int rank; 1143 int block, block_node, node_proc; 1144 int did_map = 0; 1145 int num_nodes = 0; 1146 1147 mpi_errno = PMI_Info_GetJobAttr("process-mapping", process_mapping, sizeof(process_mapping), &found); 1148 if (mpi_errno) MPIU_ERR_POP(mpi_errno); 1149 MPIU_ERR_CHKANDJUMP1(!found, mpi_errno, MPI_ERR_OTHER, "**intern", "**intern %s", "process-mapping attribute not found"); 1150 /* this code currently assumes pg is comm_world */ 1151 mpi_errno = populate_ids_from_mapping(process_mapping, &num_nodes, pg, &did_map); 1152 if (mpi_errno) MPIU_ERR_POP(mpi_errno); 1153 MPIU_ERR_CHKANDJUMP1(!did_map, mpi_errno, MPI_ERR_OTHER, "**intern", "**intern %s", "unable to populate node ids from process-mapping"); 1154 g_num_nodes = num_nodes; 1155 } 1156 #endif 1157 #else /* USE_PMI2_API */ 1158 if (our_pg_rank == -1) { 1159 /* FIXME this routine can't handle the dynamic process case at this 1160 time. This will require more support from the process manager. */ 1161 MPIU_Assert(0); 1162 } 1163 1164 /* Allocate space for pmi key and value */ 892 1165 pmi_errno = PMI_KVS_Get_key_length_max(&key_max_sz); 893 1166 MPIU_ERR_CHKANDJUMP1(pmi_errno, mpi_errno, MPI_ERR_OTHER, "**fail", "**fail %d", pmi_errno); 894 895 1167 MPIU_CHKLMEM_MALLOC(key, char *, key_max_sz, mpi_errno, "key"); 896 1168 1169 pmi_errno = PMI_KVS_Get_value_length_max(&val_max_sz); 1170 MPIU_ERR_CHKANDJUMP1(pmi_errno, mpi_errno, MPI_ERR_OTHER, "**fail", "**fail %d", pmi_errno); 1171 MPIU_CHKLMEM_MALLOC(value, char *, val_max_sz, mpi_errno, "value"); 1172 897 1173 mpi_errno = MPIDI_PG_GetConnKVSname(&kvs_name); 1174 if (mpi_errno) MPIU_ERR_POP(mpi_errno); 1175 1176 /* See if process manager supports process-mapping keyval */ 1177 1178 /* FIXME 'process-mapping' only applies for the original PG (MPI_COMM_WORLD) */ 1179 pmi_errno = PMI_KVS_Get(kvs_name, "process-mapping", value, val_max_sz); 1180 if (pmi_errno == 0) { 1181 int did_map = 0; 1182 int num_nodes = 0; 1183 /* this code currently assumes pg is comm_world */ 1184 mpi_errno = populate_ids_from_mapping(value, &num_nodes, pg, &did_map); 1185 if (mpi_errno) MPIU_ERR_POP(mpi_errno); 1186 g_num_nodes = num_nodes; 1187 if (did_map) { 1188 goto fn_exit; 1189 } 1190 else { 1191 MPIU_DBG_MSG_S(CH3_OTHER,TERSE,"did_map==0, unable to populate node ids from mapping=%s",value); 1192 } 1193 /* else fall through to O(N^2) PMI_KVS_Gets version */ 1194 } 1195 else { 1196 MPIU_DBG_MSG(CH3_OTHER,TERSE,"unable to obtain the 'process-mapping' PMI key"); 1197 } 1198 1199 mpi_errno = publish_node_id(pg, our_pg_rank); 898 1200 if (mpi_errno) MPIU_ERR_POP(mpi_errno); 899 1201 -
mpich2/branches/release/mpich2-1.1/src/pm/mpd/Makefile.in
r1225 r4747 86 86 @-rm -f *.o *.dep *~ ${EXAMPLES} core* *.1 *.pyc 87 87 @-rm -f *.gcno *.gcda *.bb *.bbg 88 @-rm -f ${srcdir}/*.gcno ${s crdir}/*.gcda88 @-rm -f ${srcdir}/*.gcno ${srcdir}/*.gcda 89 89 90 90 # echo "some of the following might be executables you want to remove" -
mpich2/branches/release/mpich2-1.1/src/pm/mpd/mpd.py
r4326 r4747 631 631 msg['ringsize'] = 0 632 632 msg['ring_ncpus'] = 0 633 # maps rank => hostname 634 msg['process_mapping'] = {} 633 635 if msg.has_key('try_1st_locally'): 634 636 self.do_mpdrun(msg) … … 798 800 msg['totalview'] = 0 799 801 msg['ifhns'] = {} 802 # maps rank => hostname 803 msg['process_mapping'] = {} 800 804 self.spawnQ.append(msg) 801 805 elif msg['cmd'] == 'publish_name': … … 827 831 sock.send_dict_msg(msgToSend) 828 832 833 def calculate_process_mapping(self,mapping_dict): 834 # mapping_dict maps ranks => hostnames 835 ranks = list(mapping_dict.keys()) 836 ranks.sort() 837 838 # assign node ids based in first-come-first-serve order when iterating 839 # over the ranks in increasing order 840 next_id = 0 841 node_ids = {} 842 for rank in ranks: 843 host = mapping_dict[rank] 844 if not node_ids.has_key(host): 845 node_ids[host] = next_id 846 next_id += 1 847 848 849 # maps {node_id_A: set([rankX,rankY,...]), node_id_B:...} 850 node_to_ranks = {} 851 for rank in ranks: 852 node_id = node_ids[mapping_dict[rank]] 853 if not node_to_ranks.has_key(node_id): 854 node_to_ranks[node_id] = set([]) 855 node_to_ranks[node_id].add(rank) 856 857 # we only handle two cases for now: 858 # 1. block regular 859 # 2. round-robin regular 860 # we do handle a "remainder node" that might not be full 861 delta = -1 862 max_ranks_per_node = 0 863 for node_id in node_to_ranks.keys(): 864 last_rank = -1 865 if len(node_to_ranks[node_id]) > max_ranks_per_node: 866 max_ranks_per_node = len(node_to_ranks[node_id]) 867 ranks = list(node_to_ranks[node_id]) 868 ranks.sort() 869 for rank in ranks: 870 if last_rank != -1: 871 if delta == -1: 872 if node_id == 0: 873 delta = rank - last_rank 874 else: 875 # irregular case detected such as {0:A,1:B,2:B} 876 mpd_print(1, "irregular case A detected") 877 return '' 878 elif (rank - last_rank) != delta: 879 # irregular such as {0:A,1:B,2:A,3:A,4:B} 880 mpd_print(1, "irregular case B detected") 881 return '' 882 last_rank = rank 883 884 num_nodes = len(node_to_ranks.keys()) 885 if delta == 1: 886 return '(vector,(%d,%d,%d))' % (0,num_nodes,max_ranks_per_node) 887 else: 888 # either we are round-robin-regular (delta > 1) or there is only one 889 # process per node (delta == -1), either way results in the same 890 # mapping spec 891 return '(vector,(%d,%d,%d))' % (0,num_nodes,1) 892 829 893 def handle_lhs_input(self,sock): 830 894 msg = self.ring.lhsSock.recv_dict_msg() … … 842 906 self.currRingNCPUs = msg['ring_ncpus'] 843 907 if msg['nstarted'] == msg['nprocs']: 908 # we have started all processes in the job, tell the 909 # requester this and stop forwarding the mpdrun/spawn 910 # message around the loop 844 911 if msg['cmd'] == 'spawn': 845 912 self.spawnInProgress = 0 … … 849 916 'ring_ncpus' : self.currRingNCPUs} 850 917 self.conSock.send_dict_msg(msgToSend) 918 # Tell all MPDs in the ring the final process mapping. In 919 # turn, they will inform all of their child mpdmans. 920 # Only do this in the case of a regular mpdrun. The spawn 921 # case it too complicated to handle this way right now. 922 if msg['cmd'] == 'mpdrun': 923 process_mapping_str = self.calculate_process_mapping(msg['process_mapping']) 924 msgToSend = { 'cmd' : 'process_mapping', 925 'jobid' : msg['jobid'], 926 'mpdid_mpdrun_start' : self.myId, 927 'process_mapping' : process_mapping_str } 928 self.ring.rhsSock.send_dict_msg(msgToSend) 851 929 return 852 930 if not msg['first_loop'] and msg['nstarted_on_this_loop'] == 0: … … 873 951 msg['nstarted_on_this_loop'] = 0 874 952 self.do_mpdrun(msg) 953 elif msg['cmd'] == 'process_mapping': 954 # message transmission terminates once the message has made it all 955 # the way around the loop once 956 if msg['mpdid_mpdrun_start'] != self.myId: 957 self.ring.rhsSock.send_dict_msg(msg) # forward it on around 958 959 # send to all mpdman's for the jobid embedded in the msg 960 jobid = msg['jobid'] 961 962 # there may be no entry for jobid in the activeJobs table if there 963 # weren't any processes from that job actually launched on our host 964 if self.activeJobs.has_key(jobid): 965 for manPid in self.activeJobs[jobid].keys(): 966 manSock = self.activeJobs[jobid][manPid]['socktoman'] 967 manSock.send_dict_msg(msg) 875 968 elif msg['cmd'] == 'mpdtrace_info': 876 969 if msg['dest'] == self.myId: … … 1175 1268 except: 1176 1269 pass 1270 1177 1271 if msg.has_key('jobid'): 1178 1272 jobid = msg['jobid'] … … 1193 1287 for rank in range(lorank,hirank+1): 1194 1288 self.run_one_cli(rank,msg) 1289 # we use myHost under the assumption that there is only 1290 # one mpd per user on a given host. The ifhn only 1291 # affects how the MPDs communicate with each other, not 1292 # which host they are on 1293 msg['process_mapping'][rank] = self.myHost 1195 1294 msg['nstarted'] += 1 1196 1295 msg['nstarted_on_this_loop'] += 1 … … 1205 1304 if self.myIfhn in hostSpecPool or self.myHost in hostSpecPool: 1206 1305 self.run_one_cli(lorank,msg) 1306 msg['process_mapping'][lorank] = self.myHost 1207 1307 msg['nstarted'] += 1 1208 1308 msg['nstarted_on_this_loop'] += 1 … … 1220 1320 (lorank,hirank) = ranks 1221 1321 self.run_one_cli(lorank,msg) 1322 msg['process_mapping'][lorank] = self.myHost 1222 1323 msg['nstarted'] += 1 1223 1324 msg['nstarted_on_this_loop'] += 1 … … 1225 1326 if lorank < hirank: 1226 1327 msg['hosts'][(lorank+1,hirank)] = '_any_' 1328 # self.activeJobs maps: 1329 # { jobid => { mpdman_pid => {...} } } 1227 1330 procsHereForJob = len(self.activeJobs[jobid].keys()) 1228 1331 if procsHereForJob >= self.parmdb['MPD_NCPUS']: -
mpich2/branches/release/mpich2-1.1/src/pm/mpd/mpdlib.py
r4326 r4747 480 480 try: 481 481 mpd_signum = 0 482 (readyToRecv,unused1,unused2) = select.select([self.sock],[],[],timeout) 482 if timeout == -1: 483 # use -1 to indicate indefinite timeout 484 (readyToRecv,unused1,unused2) = select.select([self.sock],[],[]) 485 else: 486 (readyToRecv,unused1,unused2) = select.select([self.sock],[],[],timeout) 483 487 break; 484 488 except os.error, errinfo: -
mpich2/branches/release/mpich2-1.1/src/pm/mpd/mpdman.py
r4421 r4747 374 374 'spawner_mpd' : os.environ['MPDMAN_SPAWNER_MPD'] } 375 375 self.mpdSock.send_dict_msg(msgToSend) 376 376 377 if not self.subproc: 377 378 self.streamHandler.set_handler(self.fd_read_cli_stdout, … … 424 425 # rshipSock.close() 425 426 self.waitPids.append(rshipPid) 427 428 if not self.spawned: 429 # receive the final process mapping from our MPD overlords 430 msg = self.mpdSock.recv_dict_msg(timeout=-1) 431 432 # a few defensive checks now to make sure that the various parts of the 433 # code are all on the same page 434 if not msg.has_key('cmd') or msg['cmd'] != 'process_mapping': 435 mpd_print(1,'expected cmd="process_mapping", got cmd="%s" instead' % (msg.get('cmd','**not_present**'))) 436 sys.exit(-1) 437 if msg['jobid'] != self.jobid: 438 mpd_print(1,'expected jobid="%s", got jobid="%s" instead' % (self.jobid,msg['jobid'])) 439 sys.exit(-1) 440 if not msg.has_key('process_mapping'): 441 mpd_print(1,'expected msg to contain a process_mapping key') 442 sys.exit(-1) 443 self.KVSs[self.default_kvsname]['process-mapping'] = msg['process_mapping'] 444 426 445 427 446 self.tvReady = 0
