root/mpich2/trunk/src/pm/hydra/utils/sock/sock.c @ 4887

Revision 4887, 12.2 KB (checked in by balaji, 5 months ago)

Warning stomp.

Line 
1/* -*- Mode: C; c-basic-offset:4 ; -*- */
2/*
3 *  (C) 2008 by Argonne National Laboratory.
4 *      See COPYRIGHT in top-level directory.
5 */
6
7#include "hydra_utils.h"
8
9static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
10
11HYD_Status HYDU_sock_listen(int *listen_fd, char *port_range, uint16_t * port)
12{
13    struct sockaddr_in sa;
14    int one = 1;
15    uint16_t low_port, high_port;
16    char *port_str;
17    uint16_t i;
18    HYD_Status status = HYD_SUCCESS;
19
20    HYDU_FUNC_ENTER();
21
22    low_port = 0;
23    high_port = 0;
24    if (port_range) {
25        /* If port range is set, we always pick from there */
26        *port = 0;
27
28        port_str = strtok(port_range, ":");
29        if (port_str == NULL)
30            HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "error parsing port range\n");
31        low_port = atoi(port_str);
32
33        port_str = strtok(NULL, ":");
34        if (port_str == NULL)
35            HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "error parsing port range\n");
36        high_port = atoi(port_str);
37
38        if (high_port < low_port)
39            HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "high port < low port\n");
40    }
41    else {
42        /* If port range is NULL, if a port is already provided, we
43         * pick that. Otherwise, we search for an available port. */
44        low_port = *port;
45        high_port = *port;
46    }
47
48    *listen_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
49    if (*listen_fd < 0)
50        HYDU_ERR_SETANDJUMP1(status, HYD_SOCK_ERROR, "cannot open socket (%s)\n",
51                             HYDU_strerror(errno));
52
53    if (setsockopt(*listen_fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(int)) < 0)
54        HYDU_ERR_SETANDJUMP(status, HYD_SOCK_ERROR, "cannot set TCP_NODELAY\n");
55
56    for (i = low_port; i <= high_port; i++) {
57        memset((void *) &sa, 0, sizeof(sa));
58        sa.sin_family = AF_INET;
59        sa.sin_port = htons(i);
60        sa.sin_addr.s_addr = INADDR_ANY;
61
62        /* The sockets standard does not guarantee that a successful
63         * return here means that this is set. However, REUSEADDR not
64         * being set is not a fatal error, so we ignore that
65         * case. However, we do check for error cases, which means
66         * that something bad has happened. */
67        if (setsockopt(*listen_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0)
68            HYDU_ERR_SETANDJUMP(status, HYD_SOCK_ERROR, "cannot set SO_REUSEADDR\n");
69
70        if (bind(*listen_fd, (struct sockaddr *) &sa, sizeof(sa)) < 0) {
71            close(*listen_fd);
72            /* If the address is in use, we should try the next
73             * port. Otherwise, it's an error. */
74            if (errno != EADDRINUSE)
75                HYDU_ERR_SETANDJUMP1(status, HYD_SOCK_ERROR, "bind error (%s)\n",
76                                     HYDU_strerror(errno));
77        }
78        else    /* We got a port */
79            break;
80    }
81
82    *port = i;
83    if (*port > high_port)
84        HYDU_ERR_SETANDJUMP(status, HYD_SOCK_ERROR, "no port to bind\n");
85
86    if (listen(*listen_fd, -1) < 0)
87        HYDU_ERR_SETANDJUMP1(status, HYD_SOCK_ERROR, "listen error (%s)\n",
88                             HYDU_strerror(errno));
89
90    /* We asked for any port, so we need to find out which port we
91     * actually got. */
92    if (*port == 0) {
93        socklen_t sinlen = sizeof(sa);
94
95        if (getsockname(*listen_fd, (struct sockaddr *) &sa, &sinlen) < 0)
96            HYDU_ERR_SETANDJUMP1(status, HYD_SOCK_ERROR, "getsockname error (%s)\n",
97                                 HYDU_strerror(errno));
98        *port = ntohs(sa.sin_port);
99    }
100
101  fn_exit:
102    HYDU_FUNC_EXIT();
103    return status;
104
105  fn_fail:
106    goto fn_exit;
107}
108
109
110HYD_Status HYDU_sock_connect(const char *host, uint16_t port, int *fd)
111{
112    struct hostent *ht;
113    struct sockaddr_in sa;
114    HYD_Status status = HYD_SUCCESS;
115
116    HYDU_FUNC_ENTER();
117
118    memset((char *) &sa, 0, sizeof(struct sockaddr_in));
119    sa.sin_family = AF_INET;
120    sa.sin_port = htons(port);
121
122    /* Get the remote host's IP address */
123    pthread_mutex_lock(&mutex);
124    ht = gethostbyname(host);
125    pthread_mutex_unlock(&mutex);
126    if (ht == NULL)
127        HYDU_ERR_SETANDJUMP1(status, HYD_INVALID_PARAM,
128                             "unable to get host address (%s)\n", HYDU_strerror(errno));
129    memcpy(&sa.sin_addr, ht->h_addr_list[0], ht->h_length);
130
131    /* Create a socket and set the required options */
132    *fd = socket(AF_INET, SOCK_STREAM, 0);
133    if (*fd < 0)
134        HYDU_ERR_SETANDJUMP1(status, HYD_SOCK_ERROR, "cannot open socket (%s)\n",
135                             HYDU_strerror(errno));
136
137    /* Not being able to connect is not an error in all cases. So we
138     * return an error, but only print a warning message. The upper
139     * layer can decide what to do with the return status. */
140    if (connect(*fd, (struct sockaddr *) &sa, sizeof(sa)) < 0) {
141        HYDU_Error_printf("connect error (%s)\n", HYDU_strerror(errno));
142        status = HYD_SOCK_ERROR;
143        goto fn_fail;
144    }
145
146  fn_exit:
147    HYDU_FUNC_EXIT();
148    return status;
149
150  fn_fail:
151    goto fn_exit;
152}
153
154
155HYD_Status HYDU_sock_accept(int listen_fd, int *fd)
156{
157    HYD_Status status = HYD_SUCCESS;
158
159    HYDU_FUNC_ENTER();
160
161    *fd = accept(listen_fd, 0, 0);
162    if (*fd < 0)
163        HYDU_ERR_SETANDJUMP1(status, HYD_SOCK_ERROR, "accept error (%s)\n",
164                             HYDU_strerror(errno));
165
166  fn_exit:
167    HYDU_FUNC_EXIT();
168    return status;
169
170  fn_fail:
171    goto fn_exit;
172}
173
174
175/* HYD_Sock_readline: Return the next newline-terminated string of
176 * maximum length maxlen.  This is a buffered version, and reads from
177 * fd as necessary. */
178HYD_Status HYDU_sock_readline(int fd, char *buf, int maxlen, int *linelen)
179{
180    int n;
181    HYD_Status status = HYD_SUCCESS;
182
183    HYDU_FUNC_ENTER();
184
185    *linelen = 0;
186    while (1) {
187        n = read(fd, buf + *linelen, maxlen - *linelen - 1);
188        if (n == 0) {   /* No more data to read */
189            break;
190        }
191        else if (n < 0) {
192            if (errno == EINTR)
193                continue;
194            HYDU_ERR_SETANDJUMP1(status, HYD_SOCK_ERROR, "read error (%s)\n",
195                                 HYDU_strerror(errno));
196        }
197
198        *linelen += n;
199        break;
200    }
201
202    /* Done reading; pad the last byte with a NULL */
203    buf[*linelen - 1] = 0;
204
205    /* Run through the read data and see if there are any new lines in
206     * there */
207    for (n = 0; n < *linelen; n++) {
208        if (*(buf + n) == '\n') {
209            *(buf + n) = 0;
210            *linelen = n;
211            break;
212        }
213    }
214
215  fn_exit:
216    HYDU_FUNC_EXIT();
217    return status;
218
219  fn_fail:
220    goto fn_exit;
221}
222
223
224HYD_Status HYDU_sock_writeline(int fd, const char *buf, int maxsize)
225{
226    int n;
227    HYD_Status status = HYD_SUCCESS;
228
229    HYDU_FUNC_ENTER();
230
231    if (buf[maxsize - 1] != '\n')
232        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "line does not end in newline\n");
233
234    do {
235        n = write(fd, buf, maxsize);
236    } while (n < 0 && errno == EINTR);
237
238    if (n < maxsize)
239        HYDU_ERR_SETANDJUMP1(status, HYD_SOCK_ERROR, "write error (%s)\n",
240                             HYDU_strerror(errno));
241
242  fn_exit:
243    HYDU_FUNC_EXIT();
244    return status;
245
246  fn_fail:
247    goto fn_exit;
248}
249
250
251HYD_Status HYDU_sock_read(int fd, void *buf, int maxlen, int *count,
252                          enum HYDU_sock_comm_flag flag)
253{
254    int tmp;
255    HYD_Status status = HYD_SUCCESS;
256
257    HYDU_FUNC_ENTER();
258
259    *count = 0;
260    while (1) {
261        do {
262            tmp = read(fd, buf, maxlen);
263        } while (tmp < 0 && errno == EINTR);
264
265        if (tmp < 0) {
266            *count = tmp;
267            break;
268        }
269        *count += tmp;
270
271        if (flag != HYDU_SOCK_COMM_MSGWAIT || *count == maxlen || tmp == 0)
272            break;
273    };
274
275    if (*count < 0)
276        HYDU_ERR_SETANDJUMP1(status, HYD_SOCK_ERROR, "read errno (%s)\n",
277                             HYDU_strerror(errno));
278
279  fn_exit:
280    HYDU_FUNC_EXIT();
281    return status;
282
283  fn_fail:
284    goto fn_exit;
285}
286
287
288HYD_Status HYDU_sock_write(int fd, const void *buf, int maxsize)
289{
290    int n;
291    HYD_Status status = HYD_SUCCESS;
292
293    HYDU_FUNC_ENTER();
294
295    do {
296        n = write(fd, buf, maxsize);
297    } while (n < 0 && errno == EINTR);
298
299    if (n < maxsize)
300        HYDU_ERR_SETANDJUMP1(status, HYD_SOCK_ERROR, "write error (%s)\n",
301                             HYDU_strerror(errno));
302
303  fn_exit:
304    HYDU_FUNC_EXIT();
305    return status;
306
307  fn_fail:
308    goto fn_exit;
309}
310
311
312HYD_Status HYDU_sock_trywrite(int fd, const void *buf, int maxsize)
313{
314    int n;
315    HYD_Status status = HYD_SUCCESS;
316
317    HYDU_FUNC_ENTER();
318
319    do {
320        n = write(fd, buf, maxsize);
321    } while (n < 0 && errno == EINTR);
322
323    if (n < maxsize) {
324        HYDU_Warn_printf("write error (%s)\n", HYDU_strerror(errno));
325        status = HYD_SOCK_ERROR;
326        goto fn_fail;
327    }
328
329  fn_exit:
330    HYDU_FUNC_EXIT();
331    return status;
332
333  fn_fail:
334    goto fn_exit;
335}
336
337
338HYD_Status HYDU_sock_set_nonblock(int fd)
339{
340    int flags;
341    HYD_Status status = HYD_SUCCESS;
342
343    HYDU_FUNC_ENTER();
344
345    flags = fcntl(fd, F_GETFL, 0);
346    if (flags < 0)
347        HYDU_ERR_SETANDJUMP1(status, HYD_SOCK_ERROR, "fcntl failed on %d\n", fd);
348    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
349        HYDU_ERR_SETANDJUMP1(status, HYD_SOCK_ERROR, "fcntl failed on %d\n", fd);
350
351  fn_exit:
352    HYDU_FUNC_EXIT();
353    return status;
354
355  fn_fail:
356    goto fn_exit;
357}
358
359
360HYD_Status HYDU_sock_set_cloexec(int fd)
361{
362    int flags;
363    HYD_Status status = HYD_SUCCESS;
364
365    HYDU_FUNC_ENTER();
366
367    flags = fcntl(fd, F_GETFL, 0);
368    if (flags >= 0) {
369        /* Make sure that exec'd processes don't get this fd */
370        flags |= FD_CLOEXEC;
371        fcntl(fd, F_SETFD, flags);
372    }
373
374    HYDU_FUNC_EXIT();
375    return status;
376}
377
378
379HYD_Status HYDU_sock_stdout_cb(int fd, HYD_Event_t events, int stdout_fd, int *closed)
380{
381    int count, written, ret;
382    char buf[HYD_TMPBUF_SIZE];
383    HYD_Status status = HYD_SUCCESS;
384
385    HYDU_FUNC_ENTER();
386
387    *closed = 0;
388
389    if (events & HYD_STDIN)
390        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "stdout handler got stdin event\n");
391
392    count = read(fd, buf, HYD_TMPBUF_SIZE);
393    if (count < 0) {
394        HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "read error on %d (%s)\n",
395                             fd, HYDU_strerror(errno));
396    }
397    else if (count == 0) {
398        /* The connection has closed */
399        *closed = 1;
400        goto fn_exit;
401    }
402
403    written = 0;
404    while (written != count) {
405        ret = write(stdout_fd, buf + written, count - written);
406        if (ret < 0 && errno != EAGAIN)
407            HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "write error on %d (%s)\n",
408                                 stdout_fd, HYDU_strerror(errno));
409        if (ret > 0)
410            written += ret;
411    }
412
413  fn_exit:
414    HYDU_FUNC_EXIT();
415    return status;
416
417  fn_fail:
418    goto fn_exit;
419}
420
421
422HYD_Status HYDU_sock_stdin_cb(int fd, HYD_Event_t events, int stdin_fd, char *buf,
423                              int *buf_count, int *buf_offset, int *closed)
424{
425    int count;
426    HYD_Status status = HYD_SUCCESS;
427
428    HYDU_FUNC_ENTER();
429
430    *closed = 0;
431
432    if (events & HYD_STDOUT)
433        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "stdin handler got stdout event\n");
434
435    while (1) {
436        /* If we already have buffered data, send it out */
437        if (*buf_count) {
438            count = write(fd, buf + *buf_offset, *buf_count);
439            if (count < 0)
440                HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR, "write error on %d (%s)\n",
441                                     fd, HYDU_strerror(errno))
442                    * buf_offset += count;
443            *buf_count -= count;
444            break;
445        }
446
447        /* If we are still here, we need to refill our temporary buffer */
448        count = read(stdin_fd, buf, HYD_TMPBUF_SIZE);
449        if (count < 0) {
450            if (errno == EINTR || errno == EAGAIN || errno == ENOTCONN) {
451                /* This call was interrupted or there was no data to read; just break out. */
452                *closed = 1;
453                break;
454            }
455
456            HYDU_ERR_SETANDJUMP1(status, HYD_SOCK_ERROR, "read error on 0 (%s)\n",
457                                 HYDU_strerror(errno));
458        }
459        else if (count == 0) {
460            /* The connection has closed */
461            *closed = 1;
462            break;
463        }
464        *buf_count += count;
465    }
466
467  fn_exit:
468    HYDU_FUNC_EXIT();
469    return status;
470
471  fn_fail:
472    goto fn_exit;
473}
Note: See TracBrowser for help on using the browser.