root/mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_cb.c @ 4887

Revision 4887, 10.0 KB (checked in by balaji, 5 months ago)

Warning stomp.

Line 
1/* -*- Mode: C; c-basic-offset:4 ; -*- */
2/*
3 *  (C) 2008 by Argonne National Laboratory.
4 *      See COPYRIGHT in top-level directory.
5 */
6
7#include "hydra.h"
8#include "hydra_utils.h"
9#include "pmi_handle.h"
10#include "pmi_handle_common.h"
11#include "pmi_handle_v1.h"
12#include "pmci.h"
13#include "bsci.h"
14#include "demux.h"
15#include "pmi_serv.h"
16
17HYD_Status HYD_PMCD_pmi_connect_cb(int fd, HYD_Event_t events, void *userp)
18{
19    int accept_fd;
20    HYD_Status status = HYD_SUCCESS;
21
22    HYDU_FUNC_ENTER();
23
24    /* We got a PMI connection */
25    status = HYDU_sock_accept(fd, &accept_fd);
26    HYDU_ERR_POP(status, "accept error\n");
27
28    status = HYD_DMX_register_fd(1, &accept_fd, HYD_STDOUT, NULL, HYD_PMCD_pmi_cmd_cb);
29    HYDU_ERR_POP(status, "unable to register fd\n");
30
31  fn_exit:
32    HYDU_FUNC_EXIT();
33    return status;
34
35  fn_fail:
36    goto fn_exit;
37}
38
39
40HYD_Status HYD_PMCD_pmi_cmd_cb(int fd, HYD_Event_t events, void *userp)
41{
42    int linelen, i, cmdlen;
43    char *buf = NULL, *tbuf = NULL, *cmd, *args[HYD_NUM_TMP_STRINGS];
44    char *str1 = NULL, *str2 = NULL;
45    struct HYD_PMCD_pmi_handle_fns *h;
46    HYD_Status status = HYD_SUCCESS;
47    int buflen = 0;
48    char *bufptr;
49
50    HYDU_FUNC_ENTER();
51
52    /* We got a PMI command */
53
54    buflen = HYD_TMPBUF_SIZE;
55
56    HYDU_MALLOC(buf, char *, buflen, status);
57    bufptr = buf;
58   
59    /*
60     * FIXME: This is a big hack. We temporarily initialize to
61     * PMI-v1. If the incoming message is an "init", it will
62     * reinitialize the function pointers. If we get an unsolicited
63     * command, we just use the PMI-1 version for it.
64     *
65     * This part of the code should not know anything about PMI-1
66     * vs. PMI-2. But the simple PMI client-side code is so hacked up,
67     * that commands can arrive out-of-order and this is necessary.
68     */
69    if (HYD_PMCD_pmi_handle == NULL)
70        HYD_PMCD_pmi_handle = HYD_PMCD_pmi_v1;
71
72    do {
73        status = HYDU_sock_read(fd, bufptr, 6, &linelen, HYDU_SOCK_COMM_MSGWAIT);
74        HYDU_ERR_POP(status, "unable to read the length of the command");
75        buflen -= linelen;
76        bufptr += linelen;
77
78        /* Unexpected termination of connection */
79        if (linelen == 0)
80            break;
81
82        /* If we get "cmd=" here, we just assume that this is PMI-1
83         * format (or a PMI-2 command that is backward compatible). */
84        if (!strncmp(buf, "cmd=", strlen("cmd="))) {
85            /* PMI-1 format command; read the rest of it */
86            status = HYDU_sock_readline(fd, bufptr, buflen, &linelen);
87            HYDU_ERR_POP(status, "PMI read line error\n");
88            buflen -= linelen;
89            bufptr += linelen;
90
91            /* Unexpected termination of connection */
92            if (linelen == 0)
93                break;
94            else 
95                *(bufptr-1) = '\0';
96
97            /* Here we only get PMI-1 commands or backward compatible
98             * PMI-2 commands, so we always explicitly use the PMI-1
99             * delimiter. This allows us to get backward-compatible
100             * PMI-2 commands interleaved with regular PMI-2
101             * commands. */
102            tbuf = HYDU_strdup(buf);
103            cmd = strtok(tbuf, HYD_PMCD_pmi_v1->delim);
104            for (i = 0; i < HYD_NUM_TMP_STRINGS; i++) {
105                args[i] = strtok(NULL, HYD_PMCD_pmi_v1->delim);
106                if (args[i] == NULL)
107                    break;
108            }
109
110            if (!strcmp("cmd=init", cmd)) {
111                /* Init is generic to all PMI implementations */
112                status = HYD_PMCD_pmi_handle_init(fd, args);
113                goto fn_exit;
114            }
115        }
116        else {  /* PMI-2 command */
117            *bufptr = '\0';
118            cmdlen = atoi(buf);
119
120            status = HYDU_sock_read(fd, buf, cmdlen, &linelen, HYDU_SOCK_COMM_MSGWAIT);
121            HYDU_ERR_POP(status, "PMI read line error\n");
122            buf[linelen] = 0;
123        }
124    } while (0);
125
126    if (linelen == 0) {
127        /* This is not a clean close. If a finalize was called, we
128         * would have deregistered this socket. The application might
129         * have aborted. Just cleanup all the processes */
130        status = HYD_PMCD_pmi_serv_cleanup();
131        if (status != HYD_SUCCESS) {
132            HYDU_Warn_printf("bootstrap server returned error cleaning up processes\n");
133            status = HYD_SUCCESS;
134            goto fn_fail;
135        }
136
137        status = HYD_DMX_deregister_fd(fd);
138        if (status != HYD_SUCCESS) {
139            HYDU_Warn_printf("unable to deregister fd %d\n", fd);
140            status = HYD_SUCCESS;
141            goto fn_fail;
142        }
143
144        close(fd);
145        goto fn_exit;
146    }
147
148    /* Use the PMI version specific command delimiter to find what
149     * command we got and call the appropriate handler
150     * function. Before we get an "init", we are preinitialized to
151     * PMI-1, so we will use that delimited even for PMI-2 for this
152     * one command. From the next command onward, we will use the
153     * PMI-2 specific delimiter. */
154    cmd = strtok(buf, HYD_PMCD_pmi_handle->delim);
155    for (i = 0; i < HYD_NUM_TMP_STRINGS; i++) {
156        args[i] = strtok(NULL, HYD_PMCD_pmi_handle->delim);
157        if (args[i] == NULL)
158            break;
159    }
160
161    if (cmd == NULL) {
162        status = HYD_SUCCESS;
163    }
164    else {
165        /* Search for the PMI command in our table */
166        status = HYDU_strsplit(cmd, &str1, &str2, '=');
167        HYDU_ERR_POP(status, "string split returned error\n");
168
169        h = HYD_PMCD_pmi_handle->handle_fns;
170        while (h->handler) {
171            if (!strcmp(str2, h->cmd)) {
172                status = h->handler(fd, args);
173                HYDU_ERR_POP(status, "PMI handler returned error\n");
174                break;
175            }
176            h++;
177        }
178        if (!h->handler) {
179            /* We don't understand the command */
180            HYDU_Error_printf("Unrecognized PMI command: %s | cleaning up processes\n", cmd);
181
182            /* Cleanup all the processes and return. We don't need to
183             * check the return status since we are anyway returning
184             * an error */
185            HYD_PMCD_pmi_serv_cleanup();
186            HYDU_ERR_SETANDJUMP(status, HYD_SUCCESS, "");
187        }
188    }
189
190  fn_exit:
191    if (tbuf)
192        HYDU_FREE(tbuf);
193    if (buf)
194        HYDU_FREE(buf);
195    if (str1)
196        HYDU_FREE(str1);
197    if (str2)
198        HYDU_FREE(str2);
199    HYDU_FUNC_EXIT();
200    return status;
201
202  fn_fail:
203    goto fn_exit;
204}
205
206
207HYD_Status HYD_PMCD_pmi_serv_control_connect_cb(int fd, HYD_Event_t events, void *userp)
208{
209    int accept_fd, partition_id, count;
210    struct HYD_Partition *partition;
211    HYD_Status status = HYD_SUCCESS;
212
213    HYDU_FUNC_ENTER();
214
215    /* We got a control socket connection */
216    status = HYDU_sock_accept(fd, &accept_fd);
217    HYDU_ERR_POP(status, "accept error\n");
218
219    /* Read the partition ID */
220    status = HYDU_sock_read(accept_fd, &partition_id, sizeof(int), &count,
221                            HYDU_SOCK_COMM_MSGWAIT);
222    HYDU_ERR_POP(status, "sock read returned error\n");
223
224    /* Find the partition */
225    FORALL_PARTITIONS(partition, HYD_handle.partition_list) {
226        if (partition->base->partition_id == partition_id)
227            break;
228    }
229    HYDU_ERR_CHKANDJUMP1(status, partition == NULL, HYD_INTERNAL_ERROR,
230                         "cannot find partition with ID %d\n", partition_id);
231
232    /* This will be the control socket for this partition */
233    partition->control_fd = accept_fd;
234
235    /* Send out the executable information */
236    status = HYD_PMCD_pmi_send_exec_info(partition);
237    HYDU_ERR_POP(status, "unable to send exec info to proxy\n");
238
239    status = HYD_DMX_register_fd(1, &accept_fd, HYD_STDOUT, partition,
240                                 HYD_PMCD_pmi_serv_control_cb);
241    HYDU_ERR_POP(status, "unable to register fd\n");
242
243  fn_exit:
244    HYDU_FUNC_EXIT();
245    return status;
246
247  fn_fail:
248    goto fn_exit;
249}
250
251
252HYD_Status HYD_PMCD_pmi_serv_control_cb(int fd, HYD_Event_t events, void *userp)
253{
254    struct HYD_Partition *partition;
255    struct HYD_Partition_exec *exec;
256    int count, proc_count;
257    HYD_Status status = HYD_SUCCESS;
258
259    HYDU_FUNC_ENTER();
260
261    partition = (struct HYD_Partition *) userp;
262
263    proc_count = 0;
264    for (exec = partition->exec_list; exec; exec = exec->next)
265        proc_count += exec->proc_count;
266
267    HYDU_MALLOC(partition->exit_status, int *, proc_count * sizeof(int), status);
268
269    status = HYDU_sock_read(fd, (void *) partition->exit_status, proc_count * sizeof(int),
270                            &count, HYDU_SOCK_COMM_MSGWAIT);
271    HYDU_ERR_POP(status, "unable to read status from proxy\n");
272
273    status = HYD_DMX_deregister_fd(fd);
274    HYDU_ERR_POP(status, "error deregistering fd\n");
275
276    close(fd);
277
278  fn_exit:
279    HYDU_FUNC_EXIT();
280    return status;
281
282  fn_fail:
283    goto fn_exit;
284}
285
286
287HYD_Status HYD_PMCD_pmi_serv_cleanup(void)
288{
289    struct HYD_Partition *partition;
290    enum HYD_PMCD_pmi_proxy_cmds cmd;
291    HYD_Status status = HYD_SUCCESS, overall_status = HYD_SUCCESS;
292
293    HYDU_FUNC_ENTER();
294
295    /* FIXME: Instead of doing this from this process itself, fork a
296     * bunch of processes to do this. */
297    /* Connect to all proxies and send a KILL command */
298    FORALL_ACTIVE_PARTITIONS(partition, HYD_handle.partition_list) {
299        cmd = KILL_JOB;
300        status = HYDU_sock_trywrite(partition->control_fd, &cmd,
301                                    sizeof(enum HYD_PMCD_pmi_proxy_cmds));
302        if (status != HYD_SUCCESS) {
303            HYDU_Warn_printf("unable to send data to the proxy on %s\n",
304                             partition->base->name);
305            overall_status = HYD_INTERNAL_ERROR;
306            continue;   /* Move on to the next proxy */
307        }
308    }
309
310    HYDU_FUNC_EXIT();
311
312    return overall_status;
313}
314
315
316void HYD_PMCD_pmi_serv_signal_cb(int sig)
317{
318    HYDU_FUNC_ENTER();
319
320    if (sig == SIGINT || sig == SIGQUIT || sig == SIGTERM
321#if defined SIGSTOP
322        || sig == SIGSTOP
323#endif /* SIGSTOP */
324#if defined SIGCONT
325        || sig == SIGCONT
326#endif /* SIGSTOP */
327) {
328        /* There's nothing we can do with the return value for now. */
329        HYD_PMCD_pmi_serv_cleanup();
330        exit(-1);
331    }
332    else {
333        /* Ignore other signals for now */
334    }
335
336    HYDU_FUNC_EXIT();
337    return;
338}
Note: See TracBrowser for help on using the browser.