Changeset 4931

Show
Ignore:
Timestamp:
07/09/09 15:00:30 (8 months ago)
Author:
dkim
Message:

Code cleanup.

Location:
mpich2/branches/dev/dkim/src/pm/hydra
Files:
9 modified

Legend:

Unmodified
Added
Removed
  • mpich2/branches/dev/dkim/src/pm/hydra/include/hydra_utils.h

    r4908 r4931  
    230230HYD_Status HYDU_sock_set_nonblock(int fd); 
    231231HYD_Status HYDU_sock_set_cloexec(int fd); 
    232 HYD_Status HYDU_sock_stdout_cb(int fd, HYD_Event_t events, int stdout_fd, int *closed); 
    233 HYD_Status HYDU_sock_stdout_file_cb(int fd, HYD_Event_t events, int stdout_fd, int *closed, int message_length); 
    234 /*HYD_Status HYDU_sock_stderr_file_cb(int fd, HYD_Event_t events, int stdout_fd, int *closed, int message_length);*/ 
     232HYD_Status HYDU_sock_stdout_cb(int fd, HYD_Event_t events, int stdout_fd, int *closed, int message_length); 
    235233HYD_Status HYDU_sock_stdin_cb(int fd, HYD_Event_t events, int stdin_fd, char *buf, 
    236234                              int *buf_count, int *buf_offset, int *closed); 
  • mpich2/branches/dev/dkim/src/pm/hydra/pm/pmiserv/pmi_proxy_cb.c

    r4912 r4931  
    128128    HYDU_ERR_POP(status, "make header and message error\n"); 
    129129 
    130     if (pmi_header.message_length >= 0) 
    131         { 
    132             status = HYD_PMCD_pmi_proxy_write_header(HYD_PMCD_pmi_proxy_params.upstream.out, &pmi_header); 
    133             HYDU_ERR_POP(status, "header write error\n"); 
    134             status = HYD_PMCD_pmi_proxy_write_message(HYD_PMCD_pmi_proxy_params.upstream.out, buf, &pmi_header); 
    135             HYDU_ERR_POP(status, "message write error\n"); 
    136         } 
     130    status = HYD_PMCD_pmi_proxy_write_header(HYD_PMCD_pmi_proxy_params.upstream.out, &pmi_header); 
     131    HYDU_ERR_POP(status, "header write error\n"); 
     132     
     133    status = HYD_PMCD_pmi_proxy_write_message(HYD_PMCD_pmi_proxy_params.upstream.out, buf, &pmi_header); 
     134    HYDU_ERR_POP(status, "message write error\n"); 
    137135 
    138136    if (pmi_header.message_length == 0) 
    139137        closed = 1; 
    140  
    141     /*status = HYDU_sock_stdout_cb(fd, events, HYD_PMCD_pmi_proxy_params.upstream.out, &closed); 
    142       HYDU_ERR_POP(status, "stdout callback error\n");*/ 
    143138 
    144139    if (closed) { 
     
    183178    HYDU_ERR_POP(status, "make header and message error\n"); 
    184179 
    185     if (pmi_header.message_length >= 0) 
    186         { 
    187             status = HYD_PMCD_pmi_proxy_write_header(HYD_PMCD_pmi_proxy_params.upstream.err, &pmi_header); 
    188             HYDU_ERR_POP(status, "header write error\n"); 
    189             status = HYD_PMCD_pmi_proxy_write_message(HYD_PMCD_pmi_proxy_params.upstream.err, buf, &pmi_header); 
    190             HYDU_ERR_POP(status, "message write error\n"); 
    191         } 
     180    status = HYD_PMCD_pmi_proxy_write_header(HYD_PMCD_pmi_proxy_params.upstream.err, &pmi_header); 
     181    HYDU_ERR_POP(status, "header write error\n"); 
     182 
     183    status = HYD_PMCD_pmi_proxy_write_message(HYD_PMCD_pmi_proxy_params.upstream.err, buf, &pmi_header); 
     184    HYDU_ERR_POP(status, "message write error\n"); 
    192185 
    193186    if (pmi_header.message_length == 0) 
    194187        closed = 1; 
    195  
    196     /*status = HYDU_sock_stdout_cb(fd, events, HYD_PMCD_pmi_proxy_params.upstream.err, &closed); 
    197       HYDU_ERR_POP(status, "stdout callback error\n");*/ 
    198188 
    199189    if (closed) { 
  • mpich2/branches/dev/dkim/src/pm/hydra/pm/pmiserv/pmi_proxy_utils.c

    r4910 r4931  
    671671 
    672672    if (i == HYD_PMCD_pmi_proxy_params.exec_proc_count) 
    673         pmi_header->pmi_id = -1; 
     673        HYDU_ERR_SETANDJUMP(status, HYD_INVALID_PARAM, "no matching fd\n"); 
    674674 
    675675    pmi_header->pmi_id = HYDU_local_to_global_id(i, HYD_PMCD_pmi_proxy_params.partition_core_count, HYD_PMCD_pmi_proxy_params.segment_list, HYD_PMCD_pmi_proxy_params.global_core_count); 
     
    699699{ 
    700700    int i, written; 
     701    const int intsize = sizeof(int); 
    701702    HYD_Status status = HYD_SUCCESS; 
    702703 
     
    704705 
    705706    i = 0; 
    706     while (i != sizeof(int)) 
     707    while (i != intsize) 
    707708        { 
    708             written = write(upstream_fd, (char *)&(pmi_header->pmi_id) + i, sizeof(int) - i); 
    709  
    710             if (written < 0 && errno != EAGAIN) 
    711                     HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "write error on %d (%s)\n", upstream_fd, HYDU_strerror(errno)); 
     709            written = write(upstream_fd, (char *)&(pmi_header->pmi_id) + i, intsize - i); 
    712710 
    713711            if (written > 0) 
    714712                i += written; 
     713            else if (written < 0 && errno != EAGAIN) 
     714                    HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "write error on %d (%s)\n", upstream_fd, HYDU_strerror(errno)); 
    715715        } 
    716716 
    717717    i = 0; 
    718     while (i != sizeof(int)) 
     718    while (i != intsize) 
    719719        { 
    720             written = write(upstream_fd, (char *)&(pmi_header->message_length) + i, sizeof(int) - i); 
    721  
    722             if (written < 0 && errno != EAGAIN) 
    723                     HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "write error on %d (%s)\n", upstream_fd, HYDU_strerror(errno)); 
     720            written = write(upstream_fd, (char *)&(pmi_header->message_length) + i, intsize - i); 
    724721 
    725722            if (written > 0) 
    726723                i += written; 
     724            else if (written < 0 && errno != EAGAIN) 
     725                    HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "write error on %d (%s)\n", upstream_fd, HYDU_strerror(errno)); 
    727726        } 
    728727 
     
    747746            written = write(upstream_fd, buf + i, pmi_header->message_length - i); 
    748747 
    749             if (written < 0 && errno != EAGAIN) 
     748            if (written > 0) 
     749                { 
     750                    i +=written; 
     751                } 
     752            else if (written < 0 && errno != EAGAIN) 
    750753                { 
    751754                    HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "write error on %d (%s)\n", upstream_fd, HYDU_strerror(errno)); 
    752755                } 
    753             else if (written == 0) 
    754                 { 
    755                     break; 
    756                 } 
    757             else 
    758                 i += written; 
    759756        } 
    760757 
  • mpich2/branches/dev/dkim/src/pm/hydra/ui/mpiexec/callback.c

    r4921 r4931  
    5050    HYDU_FUNC_ENTER(); 
    5151 
    52     HYD_UII_mpx_read_header(fd, &rank, &message_length); 
     52    status = HYD_UII_mpx_read_header(fd, &rank, &message_length); 
     53    HYDU_ERR_SETANDJUMP2(status, status, "header read error on fd %d: %s\n", fd, HYDU_strerror(errno)); 
    5354 
    5455    if (rank < 0 || rank > HYD_handle.process_count) 
    55         { 
    56             goto fn_fail; 
    57         } 
     56        HYDU_ERR_SETANDJUMP2(status, HYD_INVALID_PARAM, "invalid rank %d returned on fd %d\n", rank, fd); 
    5857 
    59     /* Write output to stdout fd */ 
     58    /* Write output to appropriate stdout fd */ 
    6059    if (HYD_handle.stdout_file && HYD_handle.multiple_stdout_files) 
    61         status = HYDU_sock_stdout_file_cb(fd, events, HYD_handle.stdout_file_fds[rank], &closed, message_length); 
     60        status = HYDU_sock_stdout_cb(fd, events, HYD_handle.stdout_file_fds[rank], &closed, message_length); 
    6261    else if (HYD_handle.stdout_file) 
    63         status = HYDU_sock_stdout_file_cb(fd, events, HYD_handle.stdout_file_fds[0], &closed, message_length); 
     62        status = HYDU_sock_stdout_cb(fd, events, HYD_handle.stdout_file_fds[0], &closed, message_length); 
    6463    else 
    65         status = HYDU_sock_stdout_file_cb(fd, events, 1, &closed, message_length); 
     64        status = HYDU_sock_stdout_cb(fd, events, 1, &closed, message_length); 
    6665 
    6766    HYDU_ERR_SETANDJUMP2(status, status, "stdout callback error on fd %d: %s\n", 
     
    9089    HYDU_FUNC_ENTER(); 
    9190 
    92     HYD_UII_mpx_read_header(fd, &rank, &message_length); 
     91    status = HYD_UII_mpx_read_header(fd, &rank, &message_length); 
     92    HYDU_ERR_SETANDJUMP2(status, status, "header read error on fd %d: %s\n", fd, HYDU_strerror(errno)); 
    9393 
    9494    if (rank < 0 || rank > HYD_handle.process_count) 
    95         { 
    96             goto fn_fail; 
    97         } 
     95        HYDU_ERR_SETANDJUMP2(status, HYD_INVALID_PARAM, "invalid rank %d returned on fd %d\n", rank, fd); 
    9896 
    99     /* Write output to stderr fd */ 
     97    /* Write output to appropriate stderr fd */ 
    10098    if (HYD_handle.stderr_file && HYD_handle.multiple_stderr_files) 
    101         status = HYDU_sock_stdout_file_cb(fd, events, HYD_handle.stderr_file_fds[rank], &closed, message_length); 
     99        status = HYDU_sock_stdout_cb(fd, events, HYD_handle.stderr_file_fds[rank], &closed, message_length); 
    102100    else if (HYD_handle.stderr_file) 
    103         status = HYDU_sock_stdout_file_cb(fd, events, HYD_handle.stderr_file_fds[0], &closed, message_length); 
     101        status = HYDU_sock_stdout_cb(fd, events, HYD_handle.stderr_file_fds[0], &closed, message_length); 
    104102    else 
    105         status = HYDU_sock_stdout_file_cb(fd, events, 2, &closed, message_length); 
     103        status = HYDU_sock_stdout_cb(fd, events, 2, &closed, message_length); 
    106104 
    107105    HYDU_ERR_SETANDJUMP2(status, status, "stdout callback error on %d (%s)\n", 
  • mpich2/branches/dev/dkim/src/pm/hydra/ui/mpiexec/mpiexec.c

    r4921 r4931  
    247247    /* If output from ALL processes are dumped into a single file */ 
    248248    else if (HYD_handle.stdout_file) { 
     249        partition = HYD_handle.partition_list; 
    249250        HYD_handle.stdout_file_fds = (int *) malloc(sizeof(int)); 
    250         HYD_handle.stdout_file_fds[0] = -1; 
     251 
     252        name = make_filename(HYD_handle.stdout_file, partition->base->name, partition->base->partition_id, HYD_handle.process_count); 
     253        HYD_handle.stdout_file_fds[0] = open(name, O_RDWR | O_CREAT, 0644); 
     254        HYDU_FREE(name); 
    251255    } 
    252256 
     
    271275    /* If error from ALL processes are dumped into a single file */ 
    272276    else if (HYD_handle.stderr_file) { 
     277        partition = HYD_handle.partition_list; 
    273278        HYD_handle.stderr_file_fds = (int *) malloc(sizeof(int)); 
    274         HYD_handle.stderr_file_fds[0] = -1; 
     279 
     280        name = make_filename(HYD_handle.stderr_file, partition->base->name, partition->base->partition_id, HYD_handle.process_count); 
     281        HYD_handle.stderr_file_fds[0] = open(name, O_RDWR | O_CREAT, 0644); 
     282        HYDU_FREE(name); 
    275283    } 
    276284 
    277285    FORALL_ACTIVE_PARTITIONS(partition, HYD_handle.partition_list) { 
    278286        if (partition->base->out != -1) { 
    279             /* If output from ALL processes are dumped into a single file */ 
    280             if (HYD_handle.stdout_file && HYD_handle.stdout_file_fds[0] == -1) 
    281                 { 
    282                     name = make_filename(HYD_handle.stdout_file, partition->base->name, partition->base->partition_id, HYD_handle.process_count); 
    283  
    284                     close(1); 
    285                     HYD_handle.stdout_file_fds[0] = open(name, O_RDWR | O_CREAT, 0644); 
    286                     HYDU_FREE(name); 
    287                 } 
    288  
    289287            status = HYD_DMX_register_fd(1, &partition->base->out, HYD_STDOUT, NULL, 
    290288                                         HYD_UII_mpx_stdout_cb); 
     
    293291 
    294292        if (partition->base->err != -1) { 
    295             /* If error from ALL processes are dumped into a single file */ 
    296             if (HYD_handle.stderr_file && HYD_handle.stderr_file_fds[0] == -1) 
    297                 { 
    298                     name = make_filename(HYD_handle.stderr_file, partition->base->name, partition->base->partition_id, HYD_handle.process_count); 
    299  
    300                     close(2); 
    301                     HYD_handle.stderr_file_fds[0] = open(name, O_RDWR | O_CREAT, 0644); 
    302                     HYDU_FREE(name); 
    303                 } 
    304  
    305293            status = HYD_DMX_register_fd(1, &partition->base->err, HYD_STDOUT, NULL, 
    306294                                         HYD_UII_mpx_stderr_cb); 
     
    346334    HYDU_ERR_POP(status, "process manager error on finalize\n"); 
    347335 
    348     if (HYD_handle.stdout_file_fds) 
    349         HYDU_FREE(HYD_handle.stdout_file_fds); 
    350  
    351     if (HYD_handle.stderr_file_fds) 
    352         HYDU_FREE(HYD_handle.stderr_file_fds); 
    353  
    354336    /* Free the mpiexec params */ 
    355337    HYD_UIU_free_params(); 
  • mpich2/branches/dev/dkim/src/pm/hydra/ui/mpiexec/mpiexec.h

    r4921 r4931  
    1515HYD_Status HYD_UII_mpx_stdin_cb(int fd, HYD_Event_t events, void *userp); 
    1616char *make_filename(char *input, char *hostname, int rank, int proc_count); 
    17 void HYD_UII_mpx_read_header(int fd, int *rank, int *message_length); 
     17HYD_Status HYD_UII_mpx_read_header(int fd, int *rank, int *message_length); 
    1818 
    1919#endif /* MPIEXEC_H_INCLUDED */ 
  • mpich2/branches/dev/dkim/src/pm/hydra/ui/mpiexec/utils.c

    r4921 r4931  
    4040} 
    4141 
     42/* parses filename argument and returns the final formatted filename */ 
    4243char *make_filename(char *input, char *hostname, int rank, int proc_count) 
    4344{ 
     
    4748    char *hostnameptr = NULL; 
    4849    char *temp = NULL; 
     50    const int charsize = sizeof(char); 
    4951    HYD_Status status = HYD_SUCCESS; 
    5052 
     
    5355    hostnameptr = strstr(input, "%q"); 
    5456 
    55     /* find how many digits proc_count is */  /* is this necessary? */ 
     57    /* find how many digits proc_count is */ 
    5658    divten = proc_count / 10; 
    5759 
     
    6264        } 
    6365 
     66    difference = num_digits - strlen(rankstr); 
     67 
    6468    /* create a string for rank which contains leading zeroes * 
    65      * ie - 001 instead of just 1 for a job running 100-1000 * 
    66      * processes.  This makes reading through the list of    * 
    67      * output files much easier as they are sorted correctly */ 
    68     difference = num_digits - strlen(rankstr); 
    69  
     69     * ie - 001 instead of just 1 for a job running 1000      * 
     70     * processes.  This makes reading through the list of     * 
     71     * output files much easier as they are sorted correctly  */ 
    7072    if (difference > 0) 
    7173        { 
    72             temp = (char *) calloc(num_digits + 1, sizeof(char)); 
     74            temp = (char *) calloc(num_digits + 1, charsize); 
    7375 
    7476            i = 0; 
     
    9294 
    9395    /* allocate - use calloc to initialize to zero */ 
    94     char *filename = (char *) calloc(size, sizeof(char)); 
     96    char *filename = (char *) calloc(size, charsize); 
    9597 
    9698    /* read each character from the input one at a time and put into         * 
     
    101103    while (input[i + l] && i + l < size) 
    102104        { 
     105            /* if current position in array does not match a flag location */ 
    103106            if (&input[i + l] != rankptr && &input[i + l] != hostnameptr) 
    104107                { 
    105108                    filename[i + k] = input[i + l]; 
    106109                } 
     110            /* if current position matches location of '%p' */ 
    107111            else if (&input[i + l] == rankptr) 
    108112                { 
     
    115119                    l++; 
    116120                } 
     121            /* current position matches '%q' */ 
    117122            else 
    118123                { 
     
    133138} 
    134139 
    135 void HYD_UII_mpx_read_header(int fd, int *rank, int *message_length) 
     140HYD_Status HYD_UII_mpx_read_header(int fd, int *rank, int *message_length) 
    136141{ 
    137142    int i, ret; 
     143    const int intsize = sizeof(int); 
     144    HYD_Status status = HYD_SUCCESS; 
     145 
     146    HYDU_FUNC_ENTER() 
    138147 
    139148    i = 0; 
    140     while (i != sizeof(int)) 
     149    while (i != intsize) 
    141150        { 
    142             ret = read(fd, (char *)rank + i, sizeof(int) - i); 
    143  
    144             /*if (ret < 0 && errno != EAGAIN) 
    145                 { 
    146                     /* ERROR MESSAGE HERE */ 
    147             /*break;*/ 
    148                     /*}*/ 
     151            ret = read(fd, (char *)rank + i, intsize - i); 
    149152 
    150153            if (ret > 0) 
    151                 i += ret; 
    152             else 
    153                 break; 
     154                { 
     155                    i += ret; 
     156                } 
     157            else if (ret < 0 && errno != EAGAIN) 
     158                { 
     159                    HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "read error on %d (%s)\n", fd, HYDU_strerror(errno)); 
     160                } 
     161            else if (ret == 0 && i != intsize) 
     162                { 
     163                    HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "socket closed before reading %d expected bytes on fd %d\n", intsize, fd); 
     164                } 
    154165        } 
    155166 
    156167    i = 0; 
    157     while (i != sizeof(int)) 
     168    while (i != intsize) 
    158169        { 
    159             ret = read(fd, (char *)message_length + i, sizeof(int) - i); 
    160  
    161             /*if (ret < 0 && errno != EAGAIN) 
    162                 { 
    163                     /* ERROR MESSAGE HERE */ 
    164             /*break;*/ 
    165                     /*}*/ 
     170            ret = read(fd, (char *)message_length + i, intsize - i); 
    166171 
    167172            if (ret > 0) 
    168                 i += ret; 
    169             else 
    170                 break; 
     173                { 
     174                    i += ret; 
     175                } 
     176            else if (ret < 0 && errno != EAGAIN) 
     177                { 
     178                    HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "read error on %d (%s)\n", fd, HYDU_strerror(errno)); 
     179                } 
     180            else if (ret == 0 && i != intsize) 
     181                { 
     182                    HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "socket closed before reading %d expected bytes on fd %d\n", intsize, fd); 
     183                } 
    171184        } 
     185 
     186 fn_exit: 
     187    HYDU_FUNC_EXIT(); 
     188    return status; 
     189 
     190 fn_fail: 
     191    goto fn_exit; 
    172192} 
    173193 
  • mpich2/branches/dev/dkim/src/pm/hydra/ui/utils/uiu.c

    r4908 r4931  
    6666void HYD_UIU_free_params(void) 
    6767{ 
     68    if (HYD_handle.stdout_file_fds) 
     69        { 
     70            if (HYD_handle.multiple_stdout_files) 
     71                { 
     72                    int i; 
     73                    for (i = 0; i < HYD_handle.process_count; i++) 
     74                        if (HYD_handle.stdout_file_fds[i] != -1) 
     75                            close(HYD_handle.stdout_file_fds[i]); 
     76                } 
     77            else if (HYD_handle.stdout_file_fds[0] != -1) 
     78                close(HYD_handle.stdout_file_fds[0]); 
     79                                   
     80            HYDU_FREE(HYD_handle.stdout_file_fds); 
     81        } 
     82 
     83    if (HYD_handle.stderr_file_fds) 
     84        { 
     85            if (HYD_handle.multiple_stderr_files) 
     86                { 
     87                    int i; 
     88                    for (i = 0; i < HYD_handle.process_count; i++) 
     89                        if (HYD_handle.stderr_file_fds[i] != -1) 
     90                            close(HYD_handle.stderr_file_fds[i]); 
     91                } 
     92            else if (HYD_handle.stderr_file_fds[0] != -1) 
     93                close(HYD_handle.stderr_file_fds[0]); 
     94                                   
     95            HYDU_FREE(HYD_handle.stderr_file_fds); 
     96        } 
     97 
    6898    if (HYD_handle.stdout_file) 
    6999        HYDU_FREE(HYD_handle.stdout_file); 
  • mpich2/branches/dev/dkim/src/pm/hydra/utils/sock/sock.c

    r4920 r4931  
    376376} 
    377377 
    378  
    379 HYD_Status HYDU_sock_stdout_cb(int fd, HYD_Event_t events, int stdout_fd, int *closed) 
     378HYD_Status HYDU_sock_stdout_cb(int fd, HYD_Event_t events, int stdout_fd, int *closed, int message_length) 
    380379{ 
    381380    int count, written, ret; 
     
    390389        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "stdout handler got stdin event\n"); 
    391390 
    392     count = read(fd, buf, HYD_TMPBUF_SIZE); 
    393     if (count < 0) { 
    394         HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "read error on %d (%s)\n", 
    395                              fd, HYDU_strerror(errno)); 
    396     } 
    397     else if (count == 0) { 
    398         /* The connection has closed */ 
    399         *closed = 1; 
    400         goto fn_exit; 
    401     } 
    402  
    403     written = 0; 
    404     while (written != count) { 
    405         ret = write(stdout_fd, buf + written, count - written); 
    406         if (ret < 0 && errno != EAGAIN) 
    407             HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "write error on %d (%s)\n", 
    408                                  stdout_fd, HYDU_strerror(errno)); 
    409         if (ret > 0) 
    410             written += ret; 
    411     } 
    412  
    413   fn_exit: 
    414     HYDU_FUNC_EXIT(); 
    415     return status; 
    416  
    417   fn_fail: 
    418     goto fn_exit; 
    419 } 
    420  
    421 HYD_Status HYDU_sock_stdout_file_cb(int fd, HYD_Event_t events, int stdout_fd, int *closed, int message_length) 
    422 { 
    423     int count, written, ret; 
    424     char buf[HYD_TMPBUF_SIZE]; 
    425     HYD_Status status = HYD_SUCCESS; 
    426  
    427     HYDU_FUNC_ENTER(); 
    428  
    429     *closed = 0; 
    430  
    431     if (events & HYD_STDIN) 
    432         HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "stdout handler got stdin event\n"); 
    433  
    434391    if (stdout_fd < 0) 
    435         goto fn_exit; 
     392        HYDU_ERR_SETANDJUMP1(status, HYD_INVALID_PARAM, "callback received invalid output fd %d\n", stdout_fd); 
    436393 
    437394    /* Connection has closed */ 
     395    /* Knowing the connection has closed is no longer given by a read * 
     396     * and checking if the read returned 0.  Headers are ALWAYS sent  * 
     397     * and ALWAYS received.  message_length ensures messages/headers  * 
     398     * are not cannabalized by other reads.                           */ 
    438399    if (message_length == -1) 
    439400        { 
     
    442403        } 
    443404 
     405    /* Header should have already been read at this point.  We now * 
     406     * read for the message.                                       */ 
    444407    count = 0; 
    445408    do 
     
    453416            else if (ret < 0 && errno != EAGAIN) 
    454417                { 
    455                     *closed = 1; 
    456                     break; 
     418                    HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "read error on %d (%s)\n", fd, HYDU_strerror(errno)); 
     419                } 
     420            else if (ret == 0 && count != message_length) 
     421                { 
     422                    HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "socket closed before reading %d expected bytes on fd %d\n", message_length, fd); 
    457423                } 
    458424        } while(count != message_length); 
    459425 
    460     if (count < 0) { 
    461         HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "read error on %d (%s)\n", 
    462                              fd, HYDU_strerror(errno)); 
    463     } 
    464  
     426 
     427    /* Write the message out.  If message_length == 0, nothing is written */ 
    465428    written = 0; 
    466429    while (written != count) {