Changeset 4931
- Timestamp:
- 07/09/09 15:00:30 (8 months ago)
- Location:
- mpich2/branches/dev/dkim/src/pm/hydra
- Files:
-
- 9 modified
-
include/hydra_utils.h (modified) (1 diff)
-
pm/pmiserv/pmi_proxy_cb.c (modified) (2 diffs)
-
pm/pmiserv/pmi_proxy_utils.c (modified) (4 diffs)
-
ui/mpiexec/callback.c (modified) (2 diffs)
-
ui/mpiexec/mpiexec.c (modified) (4 diffs)
-
ui/mpiexec/mpiexec.h (modified) (1 diff)
-
ui/mpiexec/utils.c (modified) (8 diffs)
-
ui/utils/uiu.c (modified) (1 diff)
-
utils/sock/sock.c (modified) (4 diffs)
Legend:
- Unmodified
- Added
- Removed
-
mpich2/branches/dev/dkim/src/pm/hydra/include/hydra_utils.h
r4908 r4931 230 230 HYD_Status HYDU_sock_set_nonblock(int fd); 231 231 HYD_Status HYDU_sock_set_cloexec(int fd); 232 HYD_Status HYDU_sock_stdout_cb(int fd, HYD_Event_t events, int stdout_fd, int *closed); 233 HYD_Status HYDU_sock_stdout_file_cb(int fd, HYD_Event_t events, int stdout_fd, int *closed, int message_length); 234 /*HYD_Status HYDU_sock_stderr_file_cb(int fd, HYD_Event_t events, int stdout_fd, int *closed, int message_length);*/ 232 HYD_Status HYDU_sock_stdout_cb(int fd, HYD_Event_t events, int stdout_fd, int *closed, int message_length); 235 233 HYD_Status HYDU_sock_stdin_cb(int fd, HYD_Event_t events, int stdin_fd, char *buf, 236 234 int *buf_count, int *buf_offset, int *closed); -
mpich2/branches/dev/dkim/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c
r4912 r4931 128 128 HYDU_ERR_POP(status, "make header and message error\n"); 129 129 130 if (pmi_header.message_length >= 0) 131 { 132 status = HYD_PMCD_pmi_proxy_write_header(HYD_PMCD_pmi_proxy_params.upstream.out, &pmi_header); 133 HYDU_ERR_POP(status, "header write error\n"); 134 status = HYD_PMCD_pmi_proxy_write_message(HYD_PMCD_pmi_proxy_params.upstream.out, buf, &pmi_header); 135 HYDU_ERR_POP(status, "message write error\n"); 136 } 130 status = HYD_PMCD_pmi_proxy_write_header(HYD_PMCD_pmi_proxy_params.upstream.out, &pmi_header); 131 HYDU_ERR_POP(status, "header write error\n"); 132 133 status = HYD_PMCD_pmi_proxy_write_message(HYD_PMCD_pmi_proxy_params.upstream.out, buf, &pmi_header); 134 HYDU_ERR_POP(status, "message write error\n"); 137 135 138 136 if (pmi_header.message_length == 0) 139 137 closed = 1; 140 141 /*status = HYDU_sock_stdout_cb(fd, events, HYD_PMCD_pmi_proxy_params.upstream.out, &closed);142 HYDU_ERR_POP(status, "stdout callback error\n");*/143 138 144 139 if (closed) { … … 183 178 HYDU_ERR_POP(status, "make header and message error\n"); 184 179 185 if (pmi_header.message_length >= 0) 186 { 187 status = HYD_PMCD_pmi_proxy_write_header(HYD_PMCD_pmi_proxy_params.upstream.err, &pmi_header); 188 HYDU_ERR_POP(status, "header write error\n"); 189 status = HYD_PMCD_pmi_proxy_write_message(HYD_PMCD_pmi_proxy_params.upstream.err, buf, &pmi_header); 190 HYDU_ERR_POP(status, "message write error\n"); 191 } 180 status = HYD_PMCD_pmi_proxy_write_header(HYD_PMCD_pmi_proxy_params.upstream.err, &pmi_header); 181 HYDU_ERR_POP(status, "header write error\n"); 182 183 status = HYD_PMCD_pmi_proxy_write_message(HYD_PMCD_pmi_proxy_params.upstream.err, buf, &pmi_header); 184 HYDU_ERR_POP(status, "message write error\n"); 192 185 193 186 if (pmi_header.message_length == 0) 194 187 closed = 1; 195 196 /*status = HYDU_sock_stdout_cb(fd, events, HYD_PMCD_pmi_proxy_params.upstream.err, &closed);197 HYDU_ERR_POP(status, "stdout callback error\n");*/198 188 199 189 if 
(closed) { -
mpich2/branches/dev/dkim/src/pm/hydra/pm/pmiserv/pmi_proxy_utils.c
r4910 r4931 671 671 672 672 if (i == HYD_PMCD_pmi_proxy_params.exec_proc_count) 673 pmi_header->pmi_id = -1;673 HYDU_ERR_SETANDJUMP(status, HYD_INVALID_PARAM, "no matching fd\n"); 674 674 675 675 pmi_header->pmi_id = HYDU_local_to_global_id(i, HYD_PMCD_pmi_proxy_params.partition_core_count, HYD_PMCD_pmi_proxy_params.segment_list, HYD_PMCD_pmi_proxy_params.global_core_count); … … 699 699 { 700 700 int i, written; 701 const int intsize = sizeof(int); 701 702 HYD_Status status = HYD_SUCCESS; 702 703 … … 704 705 705 706 i = 0; 706 while (i != sizeof(int))707 while (i != intsize) 707 708 { 708 written = write(upstream_fd, (char *)&(pmi_header->pmi_id) + i, sizeof(int) - i); 709 710 if (written < 0 && errno != EAGAIN) 711 HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "write error on %d (%s)\n", upstream_fd, HYDU_strerror(errno)); 709 written = write(upstream_fd, (char *)&(pmi_header->pmi_id) + i, intsize - i); 712 710 713 711 if (written > 0) 714 712 i += written; 713 else if (written < 0 && errno != EAGAIN) 714 HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "write error on %d (%s)\n", upstream_fd, HYDU_strerror(errno)); 715 715 } 716 716 717 717 i = 0; 718 while (i != sizeof(int))718 while (i != intsize) 719 719 { 720 written = write(upstream_fd, (char *)&(pmi_header->message_length) + i, sizeof(int) - i); 721 722 if (written < 0 && errno != EAGAIN) 723 HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "write error on %d (%s)\n", upstream_fd, HYDU_strerror(errno)); 720 written = write(upstream_fd, (char *)&(pmi_header->message_length) + i, intsize - i); 724 721 725 722 if (written > 0) 726 723 i += written; 724 else if (written < 0 && errno != EAGAIN) 725 HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "write error on %d (%s)\n", upstream_fd, HYDU_strerror(errno)); 727 726 } 728 727 … … 747 746 written = write(upstream_fd, buf + i, pmi_header->message_length - i); 748 747 749 if (written < 0 && errno != EAGAIN) 748 if (written > 0) 749 { 750 i +=written; 751 } 752 else if (written 
< 0 && errno != EAGAIN) 750 753 { 751 754 HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "write error on %d (%s)\n", upstream_fd, HYDU_strerror(errno)); 752 755 } 753 else if (written == 0)754 {755 break;756 }757 else758 i += written;759 756 } 760 757 -
mpich2/branches/dev/dkim/src/pm/hydra/ui/mpiexec/callback.c
r4921 r4931 50 50 HYDU_FUNC_ENTER(); 51 51 52 HYD_UII_mpx_read_header(fd, &rank, &message_length); 52 status = HYD_UII_mpx_read_header(fd, &rank, &message_length); 53 HYDU_ERR_SETANDJUMP2(status, status, "header read error on fd %d: %s\n", fd, HYDU_strerror(errno)); 53 54 54 55 if (rank < 0 || rank > HYD_handle.process_count) 55 { 56 goto fn_fail; 57 } 56 HYDU_ERR_SETANDJUMP2(status, HYD_INVALID_PARAM, "invalid rank %d returned on fd %d\n", rank, fd); 58 57 59 /* Write output to stdout fd */58 /* Write output to appropriate stdout fd */ 60 59 if (HYD_handle.stdout_file && HYD_handle.multiple_stdout_files) 61 status = HYDU_sock_stdout_ file_cb(fd, events, HYD_handle.stdout_file_fds[rank], &closed, message_length);60 status = HYDU_sock_stdout_cb(fd, events, HYD_handle.stdout_file_fds[rank], &closed, message_length); 62 61 else if (HYD_handle.stdout_file) 63 status = HYDU_sock_stdout_ file_cb(fd, events, HYD_handle.stdout_file_fds[0], &closed, message_length);62 status = HYDU_sock_stdout_cb(fd, events, HYD_handle.stdout_file_fds[0], &closed, message_length); 64 63 else 65 status = HYDU_sock_stdout_ file_cb(fd, events, 1, &closed, message_length);64 status = HYDU_sock_stdout_cb(fd, events, 1, &closed, message_length); 66 65 67 66 HYDU_ERR_SETANDJUMP2(status, status, "stdout callback error on fd %d: %s\n", … … 90 89 HYDU_FUNC_ENTER(); 91 90 92 HYD_UII_mpx_read_header(fd, &rank, &message_length); 91 status = HYD_UII_mpx_read_header(fd, &rank, &message_length); 92 HYDU_ERR_SETANDJUMP2(status, status, "header read error on fd %d: %s\n", fd, HYDU_strerror(errno)); 93 93 94 94 if (rank < 0 || rank > HYD_handle.process_count) 95 { 96 goto fn_fail; 97 } 95 HYDU_ERR_SETANDJUMP2(status, HYD_INVALID_PARAM, "invalid rank %d returned on fd %d\n", rank, fd); 98 96 99 /* Write output to stderr fd */97 /* Write output to appropriate stderr fd */ 100 98 if (HYD_handle.stderr_file && HYD_handle.multiple_stderr_files) 101 status = HYDU_sock_stdout_ file_cb(fd, events, 
HYD_handle.stderr_file_fds[rank], &closed, message_length);99 status = HYDU_sock_stdout_cb(fd, events, HYD_handle.stderr_file_fds[rank], &closed, message_length); 102 100 else if (HYD_handle.stderr_file) 103 status = HYDU_sock_stdout_ file_cb(fd, events, HYD_handle.stderr_file_fds[0], &closed, message_length);101 status = HYDU_sock_stdout_cb(fd, events, HYD_handle.stderr_file_fds[0], &closed, message_length); 104 102 else 105 status = HYDU_sock_stdout_ file_cb(fd, events, 2, &closed, message_length);103 status = HYDU_sock_stdout_cb(fd, events, 2, &closed, message_length); 106 104 107 105 HYDU_ERR_SETANDJUMP2(status, status, "stdout callback error on %d (%s)\n", -
mpich2/branches/dev/dkim/src/pm/hydra/ui/mpiexec/mpiexec.c
r4921 r4931 247 247 /* If output from ALL processes are dumped into a single file */ 248 248 else if (HYD_handle.stdout_file) { 249 partition = HYD_handle.partition_list; 249 250 HYD_handle.stdout_file_fds = (int *) malloc(sizeof(int)); 250 HYD_handle.stdout_file_fds[0] = -1; 251 252 name = make_filename(HYD_handle.stdout_file, partition->base->name, partition->base->partition_id, HYD_handle.process_count); 253 HYD_handle.stdout_file_fds[0] = open(name, O_RDWR | O_CREAT, 0644); 254 HYDU_FREE(name); 251 255 } 252 256 … … 271 275 /* If error from ALL processes are dumped into a single file */ 272 276 else if (HYD_handle.stderr_file) { 277 partition = HYD_handle.partition_list; 273 278 HYD_handle.stderr_file_fds = (int *) malloc(sizeof(int)); 274 HYD_handle.stderr_file_fds[0] = -1; 279 280 name = make_filename(HYD_handle.stderr_file, partition->base->name, partition->base->partition_id, HYD_handle.process_count); 281 HYD_handle.stderr_file_fds[0] = open(name, O_RDWR | O_CREAT, 0644); 282 HYDU_FREE(name); 275 283 } 276 284 277 285 FORALL_ACTIVE_PARTITIONS(partition, HYD_handle.partition_list) { 278 286 if (partition->base->out != -1) { 279 /* If output from ALL processes are dumped into a single file */280 if (HYD_handle.stdout_file && HYD_handle.stdout_file_fds[0] == -1)281 {282 name = make_filename(HYD_handle.stdout_file, partition->base->name, partition->base->partition_id, HYD_handle.process_count);283 284 close(1);285 HYD_handle.stdout_file_fds[0] = open(name, O_RDWR | O_CREAT, 0644);286 HYDU_FREE(name);287 }288 289 287 status = HYD_DMX_register_fd(1, &partition->base->out, HYD_STDOUT, NULL, 290 288 HYD_UII_mpx_stdout_cb); … … 293 291 294 292 if (partition->base->err != -1) { 295 /* If error from ALL processes are dumped into a single file */296 if (HYD_handle.stderr_file && HYD_handle.stderr_file_fds[0] == -1)297 {298 name = make_filename(HYD_handle.stderr_file, partition->base->name, partition->base->partition_id, HYD_handle.process_count);299 300 close(2);301 
HYD_handle.stderr_file_fds[0] = open(name, O_RDWR | O_CREAT, 0644);302 HYDU_FREE(name);303 }304 305 293 status = HYD_DMX_register_fd(1, &partition->base->err, HYD_STDOUT, NULL, 306 294 HYD_UII_mpx_stderr_cb); … … 346 334 HYDU_ERR_POP(status, "process manager error on finalize\n"); 347 335 348 if (HYD_handle.stdout_file_fds)349 HYDU_FREE(HYD_handle.stdout_file_fds);350 351 if (HYD_handle.stderr_file_fds)352 HYDU_FREE(HYD_handle.stderr_file_fds);353 354 336 /* Free the mpiexec params */ 355 337 HYD_UIU_free_params(); -
mpich2/branches/dev/dkim/src/pm/hydra/ui/mpiexec/mpiexec.h
r4921 r4931 15 15 HYD_Status HYD_UII_mpx_stdin_cb(int fd, HYD_Event_t events, void *userp); 16 16 char *make_filename(char *input, char *hostname, int rank, int proc_count); 17 void HYD_UII_mpx_read_header(int fd, int *rank, int *message_length); 17 HYD_Status HYD_UII_mpx_read_header(int fd, int *rank, int *message_length); 18 18 19 19 #endif /* MPIEXEC_H_INCLUDED */ -
mpich2/branches/dev/dkim/src/pm/hydra/ui/mpiexec/utils.c
r4921 r4931 40 40 } 41 41 42 /* parses filename argument and returns the final formatted filename */ 42 43 char *make_filename(char *input, char *hostname, int rank, int proc_count) 43 44 { … … 47 48 char *hostnameptr = NULL; 48 49 char *temp = NULL; 50 const int charsize = sizeof(char); 49 51 HYD_Status status = HYD_SUCCESS; 50 52 … … 53 55 hostnameptr = strstr(input, "%q"); 54 56 55 /* find how many digits proc_count is */ /* is this necessary? */57 /* find how many digits proc_count is */ 56 58 divten = proc_count / 10; 57 59 … … 62 64 } 63 65 66 difference = num_digits - strlen(rankstr); 67 64 68 /* create a string for rank which contains leading zeroes * 65 * ie - 001 instead of just 1 for a job running 100-1000 * 66 * processes. This makes reading through the list of * 67 * output files much easier as they are sorted correctly */ 68 difference = num_digits - strlen(rankstr); 69 69 * ie - 001 instead of just 1 for a job running 1000 * 70 * processes. This makes reading through the list of * 71 * output files much easier as they are sorted correctly */ 70 72 if (difference > 0) 71 73 { 72 temp = (char *) calloc(num_digits + 1, sizeof(char));74 temp = (char *) calloc(num_digits + 1, charsize); 73 75 74 76 i = 0; … … 92 94 93 95 /* allocate - use calloc to initialize to zero */ 94 char *filename = (char *) calloc(size, sizeof(char));96 char *filename = (char *) calloc(size, charsize); 95 97 96 98 /* read each character from the input one at a time and put into * … … 101 103 while (input[i + l] && i + l < size) 102 104 { 105 /* if current position in array does not match a flag location */ 103 106 if (&input[i + l] != rankptr && &input[i + l] != hostnameptr) 104 107 { 105 108 filename[i + k] = input[i + l]; 106 109 } 110 /* if current position matches location of '%p' */ 107 111 else if (&input[i + l] == rankptr) 108 112 { … … 115 119 l++; 116 120 } 121 /* current position matches '%q' */ 117 122 else 118 123 { … … 133 138 } 134 139 135 
voidHYD_UII_mpx_read_header(int fd, int *rank, int *message_length)140 HYD_Status HYD_UII_mpx_read_header(int fd, int *rank, int *message_length) 136 141 { 137 142 int i, ret; 143 const int intsize = sizeof(int); 144 HYD_Status status = HYD_SUCCESS; 145 146 HYDU_FUNC_ENTER() 138 147 139 148 i = 0; 140 while (i != sizeof(int))149 while (i != intsize) 141 150 { 142 ret = read(fd, (char *)rank + i, sizeof(int) - i); 143 144 /*if (ret < 0 && errno != EAGAIN) 145 { 146 /* ERROR MESSAGE HERE */ 147 /*break;*/ 148 /*}*/ 151 ret = read(fd, (char *)rank + i, intsize - i); 149 152 150 153 if (ret > 0) 151 i += ret; 152 else 153 break; 154 { 155 i += ret; 156 } 157 else if (ret < 0 && errno != EAGAIN) 158 { 159 HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "read error on %d (%s)\n", fd, HYDU_strerror(errno)); 160 } 161 else if (ret == 0 && i != intsize) 162 { 163 HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "socket closed before reading %d expected bytes on fd %d\n", intsize, fd); 164 } 154 165 } 155 166 156 167 i = 0; 157 while (i != sizeof(int))168 while (i != intsize) 158 169 { 159 ret = read(fd, (char *)message_length + i, sizeof(int) - i); 160 161 /*if (ret < 0 && errno != EAGAIN) 162 { 163 /* ERROR MESSAGE HERE */ 164 /*break;*/ 165 /*}*/ 170 ret = read(fd, (char *)message_length + i, intsize - i); 166 171 167 172 if (ret > 0) 168 i += ret; 169 else 170 break; 173 { 174 i += ret; 175 } 176 else if (ret < 0 && errno != EAGAIN) 177 { 178 HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "read error on %d (%s)\n", fd, HYDU_strerror(errno)); 179 } 180 else if (ret == 0 && i != intsize) 181 { 182 HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "socket closed before reading %d expected bytes on fd %d\n", intsize, fd); 183 } 171 184 } 185 186 fn_exit: 187 HYDU_FUNC_EXIT(); 188 return status; 189 190 fn_fail: 191 goto fn_exit; 172 192 } 173 193 -
mpich2/branches/dev/dkim/src/pm/hydra/ui/utils/uiu.c
r4908 r4931 66 66 void HYD_UIU_free_params(void) 67 67 { 68 if (HYD_handle.stdout_file_fds) 69 { 70 if (HYD_handle.multiple_stdout_files) 71 { 72 int i; 73 for (i = 0; i < HYD_handle.process_count; i++) 74 if (HYD_handle.stdout_file_fds[i] != -1) 75 close(HYD_handle.stdout_file_fds[i]); 76 } 77 else if (HYD_handle.stdout_file_fds[0] != -1) 78 close(HYD_handle.stdout_file_fds[0]); 79 80 HYDU_FREE(HYD_handle.stdout_file_fds); 81 } 82 83 if (HYD_handle.stderr_file_fds) 84 { 85 if (HYD_handle.multiple_stderr_files) 86 { 87 int i; 88 for (i = 0; i < HYD_handle.process_count; i++) 89 if (HYD_handle.stderr_file_fds[i] != -1) 90 close(HYD_handle.stderr_file_fds[i]); 91 } 92 else if (HYD_handle.stderr_file_fds[0] != -1) 93 close(HYD_handle.stderr_file_fds[0]); 94 95 HYDU_FREE(HYD_handle.stderr_file_fds); 96 } 97 68 98 if (HYD_handle.stdout_file) 69 99 HYDU_FREE(HYD_handle.stdout_file); -
mpich2/branches/dev/dkim/src/pm/hydra/utils/sock/sock.c
r4920 r4931 376 376 } 377 377 378 379 HYD_Status HYDU_sock_stdout_cb(int fd, HYD_Event_t events, int stdout_fd, int *closed) 378 HYD_Status HYDU_sock_stdout_cb(int fd, HYD_Event_t events, int stdout_fd, int *closed, int message_length) 380 379 { 381 380 int count, written, ret; … … 390 389 HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "stdout handler got stdin event\n"); 391 390 392 count = read(fd, buf, HYD_TMPBUF_SIZE);393 if (count < 0) {394 HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "read error on %d (%s)\n",395 fd, HYDU_strerror(errno));396 }397 else if (count == 0) {398 /* The connection has closed */399 *closed = 1;400 goto fn_exit;401 }402 403 written = 0;404 while (written != count) {405 ret = write(stdout_fd, buf + written, count - written);406 if (ret < 0 && errno != EAGAIN)407 HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "write error on %d (%s)\n",408 stdout_fd, HYDU_strerror(errno));409 if (ret > 0)410 written += ret;411 }412 413 fn_exit:414 HYDU_FUNC_EXIT();415 return status;416 417 fn_fail:418 goto fn_exit;419 }420 421 HYD_Status HYDU_sock_stdout_file_cb(int fd, HYD_Event_t events, int stdout_fd, int *closed, int message_length)422 {423 int count, written, ret;424 char buf[HYD_TMPBUF_SIZE];425 HYD_Status status = HYD_SUCCESS;426 427 HYDU_FUNC_ENTER();428 429 *closed = 0;430 431 if (events & HYD_STDIN)432 HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "stdout handler got stdin event\n");433 434 391 if (stdout_fd < 0) 435 goto fn_exit;392 HYDU_ERR_SETANDJUMP1(status, HYD_INVALID_PARAM, "callback received invalid output fd %d\n", stdout_fd); 436 393 437 394 /* Connection has closed */ 395 /* Knowing the connection has closed is no longer given by a read * 396 * and checking if the read returned 0. Headers are ALWAYS sent * 397 * and ALWAYS received. message_length ensures messages/headers * 398 * are not cannabalized by other reads. 
*/ 438 399 if (message_length == -1) 439 400 { … … 442 403 } 443 404 405 /* Header should have already been read at this point. We now * 406 * read for the message. */ 444 407 count = 0; 445 408 do … … 453 416 else if (ret < 0 && errno != EAGAIN) 454 417 { 455 *closed = 1; 456 break; 418 HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "read error on %d (%s)\n", fd, HYDU_strerror(errno)); 419 } 420 else if (ret == 0 && count != message_length) 421 { 422 HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "socket closed before reading %d expected bytes on fd %d\n", message_length, fd); 457 423 } 458 424 } while(count != message_length); 459 425 460 if (count < 0) { 461 HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "read error on %d (%s)\n", 462 fd, HYDU_strerror(errno)); 463 } 464 426 427 /* Write the message out. If message_length == 0, nothing is written */ 465 428 written = 0; 466 429 while (written != count) {
