root/mpich2/trunk/src/pm/hydra/demux/demux.c @ 4887

Revision 4887, 4.8 KB (checked in by balaji, 5 months ago)

Warning stomp.

Line 
1/* -*- Mode: C; c-basic-offset:4 ; -*- */
2/*
3 *  (C) 2008 by Argonne National Laboratory.
4 *      See COPYRIGHT in top-level directory.
5 */
6
7#include "hydra.h"
8#include "hydra_utils.h"
9#include "demux.h"
10
11static int num_cb_fds = 0;
12
13typedef struct HYD_DMXI_callback {
14    int num_fds;
15    int *fd;
16    HYD_Event_t events;
17    void *userp;
18     HYD_Status(*callback) (int fd, HYD_Event_t events, void *userp);
19
20    struct HYD_DMXI_callback *next;
21} HYD_DMXI_callback_t;
22
23static HYD_DMXI_callback_t *cb_list = NULL;
24
25HYD_Status HYD_DMX_register_fd(int num_fds, int *fd, HYD_Event_t events, void *userp,
26                               HYD_Status(*callback) (int fd, HYD_Event_t events, void *userp))
27{
28    HYD_DMXI_callback_t *cb_element, *run;
29    int i;
30    HYD_Status status = HYD_SUCCESS;
31
32    HYDU_FUNC_ENTER();
33
34    for (i = 0; i < num_fds; i++)
35        if (fd[i] < 0)
36            HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR, "registering bad fd %d\n", fd[i]);
37
38    HYDU_MALLOC(cb_element, HYD_DMXI_callback_t *, sizeof(HYD_DMXI_callback_t), status);
39    cb_element->num_fds = num_fds;
40    HYDU_MALLOC(cb_element->fd, int *, num_fds * sizeof(int), status);
41    memcpy(cb_element->fd, fd, num_fds * sizeof(int));
42    cb_element->events = events;
43    cb_element->userp = userp;
44    cb_element->callback = callback;
45    cb_element->next = NULL;
46
47    if (cb_list == NULL) {
48        cb_list = cb_element;
49    }
50    else {
51        run = cb_list;
52        while (run->next)
53            run = run->next;
54        run->next = cb_element;
55    }
56
57    num_cb_fds += num_fds;
58
59  fn_exit:
60    HYDU_FUNC_EXIT();
61    return status;
62
63  fn_fail:
64    goto fn_exit;
65}
66
67
68HYD_Status HYD_DMX_deregister_fd(int fd)
69{
70    int i;
71    HYD_DMXI_callback_t *cb_element;
72    HYD_Status status = HYD_SUCCESS;
73
74    HYDU_FUNC_ENTER();
75
76    cb_element = cb_list;
77    while (cb_element) {
78        for (i = 0; i < cb_element->num_fds; i++) {
79            if (cb_element->fd[i] == fd) {
80                cb_element->fd[i] = -1;
81                num_cb_fds--;
82                goto fn_exit;
83            }
84        }
85        cb_element = cb_element->next;
86    }
87
88    /* FD is not found */
89    HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
90                         "could not find fd to deregister: %d\n", fd);
91
92  fn_exit:
93    HYDU_FUNC_EXIT();
94    return status;
95
96  fn_fail:
97    goto fn_exit;
98}
99
100
101HYD_Status HYD_DMX_wait_for_event(int wtime)
102{
103    int total_fds, i, j, events, ret;
104    HYD_DMXI_callback_t *run;
105    struct pollfd *pollfds = NULL;
106    HYD_Status status = HYD_SUCCESS;
107
108    HYDU_FUNC_ENTER();
109
110    HYDU_MALLOC(pollfds, struct pollfd *, num_cb_fds * sizeof(struct pollfd), status);
111
112    run = cb_list;
113    i = 0;
114    while (run) {
115        for (j = 0; j < run->num_fds; j++) {
116            if (run->fd[j] == -1)
117                continue;
118
119            pollfds[i].fd = run->fd[j];
120
121            pollfds[i].events = 0;
122            if (run->events & HYD_STDOUT)
123                pollfds[i].events |= POLLIN;
124            if (run->events & HYD_STDIN)
125                pollfds[i].events |= POLLOUT;
126
127            i++;
128        }
129        run = run->next;
130    }
131    total_fds = i;
132
133    while (1) {
134        ret = poll(pollfds, total_fds, wtime);
135        if (ret < 0) {
136            if (errno == EINTR) {
137                /* We were interrupted by a system call; this is not
138                 * an error case in the regular sense; but the upper
139                 * layer needs to gracefully cleanup the processes. */
140                status = HYD_SUCCESS;
141                goto fn_exit;
142            }
143            HYDU_ERR_SETANDJUMP1(status, HYD_SOCK_ERROR, "poll error (%s)\n",
144                                 HYDU_strerror(errno));
145        }
146        break;
147    }
148
149    run = cb_list;
150    i = 0;
151    while (run) {
152        for (j = 0; j < run->num_fds; j++) {
153            if (run->fd[j] == -1)
154                continue;
155
156            if (pollfds[i].revents) {
157                events = 0;
158                if (pollfds[i].revents & POLLOUT)
159                    events |= HYD_STDIN;
160                if (pollfds[i].revents & POLLIN)
161                    events |= HYD_STDOUT;
162
163                status = run->callback(pollfds[i].fd, events, run->userp);
164                HYDU_ERR_POP(status, "callback returned error status\n");
165            }
166
167            i++;
168            if (i == total_fds)
169                break;
170        }
171        run = run->next;
172
173        if (i == total_fds)
174            break;
175    }
176
177  fn_exit:
178    if (pollfds)
179        HYDU_FREE(pollfds);
180    HYDU_FUNC_EXIT();
181    return status;
182
183  fn_fail:
184    goto fn_exit;
185}
186
187
188HYD_Status HYD_DMX_finalize(void)
189{
190    HYD_DMXI_callback_t *run1, *run2;
191    HYD_Status status = HYD_SUCCESS;
192
193    HYDU_FUNC_ENTER();
194
195    run1 = cb_list;
196    while (run1) {
197        run2 = run1->next;
198        if (run1->fd)
199            HYDU_FREE(run1->fd);
200        HYDU_FREE(run1);
201        run1 = run2;
202    }
203    cb_list = NULL;
204
205    HYDU_FUNC_EXIT();
206    return status;
207}
Note: See TracBrowser for help on using the browser.