root/mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_serv_launch.c @ 4887

Revision 4887, 19.4 KB (checked in by balaji, 5 months ago)

Warning stomp.

Line 
1/* -*- Mode: C; c-basic-offset:4 ; -*- */
2/*
3 *  (C) 2008 by Argonne National Laboratory.
4 *      See COPYRIGHT in top-level directory.
5 */
6
7#include "hydra.h"
8#include "hydra_utils.h"
9#include "pmci.h"
10#include "pmi_handle.h"
11#include "bsci.h"
12#include "demux.h"
13#include "pmi_serv.h"
14
15static char *pmi_port_str = NULL;
16static char *proxy_port_str = NULL;
17
18static void *launch_helper(void *args)
19{
20    struct HYD_Partition *partition = (struct HYD_Partition *) args;
21    enum HYD_PMCD_pmi_proxy_cmds cmd;
22    HYD_Status status = HYD_SUCCESS;
23
24    /*
25     * Here are the steps we will follow:
26     *
27     * 1. Put all the arguments to pass in to a string list.
28     *
29     * 2. Connect to the proxy (this will be our primary control
30     *    socket).
31     *
32     * 3. Read this string list and write the following to the socket:
33     *    (a) The PROC_INFO command.
34     *    (b) Integer sized data with the number of arguments to
35     *        follow.
36     *    (c) For each argument to pass, first send an integer which
37     *        tells the proxy how many bytes are coming in that
38     *        argument.
39     *
40     * 4. Open two new sockets and connect them to the proxy.
41     *
42     * 5. On the first new socket, send USE_AS_STDOUT and the second
43     *    send USE_AS_STDERR.
44     *
45     * 6. For the first process, open a separate socket and send the
46     *    USE_AS_STDIN command on it.
47     *
48     * 7. We need to figure out what to do with the LAUNCH_JOB
49     *    command; since it's going on a different socket, it might go
50     *    out-of-order. Maybe a state machine on the proxy to see if
51     *    it got all the information it needs to launch the job would
52     *    work.
53     */
54
55    status = HYDU_sock_connect(partition->base->name, HYD_handle.proxy_port,
56                               &partition->control_fd);
57    HYDU_ERR_POP(status, "unable to connect to proxy\n");
58
59    status = HYD_PMCD_pmi_send_exec_info(partition);
60    HYDU_ERR_POP(status, "error sending executable info\n");
61
62    /* Create an stdout socket */
63    status = HYDU_sock_connect(partition->base->name, HYD_handle.proxy_port,
64                               &partition->base->out);
65    HYDU_ERR_POP(status, "unable to connect to proxy\n");
66
67    cmd = USE_AS_STDOUT;
68    status = HYDU_sock_write(partition->base->out, &cmd, sizeof(enum HYD_PMCD_pmi_proxy_cmds));
69    HYDU_ERR_POP(status, "unable to write data to proxy\n");
70
71    /* Create an stderr socket */
72    status = HYDU_sock_connect(partition->base->name, HYD_handle.proxy_port,
73                               &partition->base->err);
74    HYDU_ERR_POP(status, "unable to connect to proxy\n");
75
76    cmd = USE_AS_STDERR;
77    status = HYDU_sock_write(partition->base->err, &cmd, sizeof(enum HYD_PMCD_pmi_proxy_cmds));
78    HYDU_ERR_POP(status, "unable to write data to proxy\n");
79
80    /* If rank 0 is here, create an stdin socket */
81    if (partition->base->partition_id == 0) {
82        status = HYDU_sock_connect(partition->base->name, HYD_handle.proxy_port,
83                                   &partition->base->in);
84        HYDU_ERR_POP(status, "unable to connect to proxy\n");
85
86        cmd = USE_AS_STDIN;
87        status = HYDU_sock_write(partition->base->in, &cmd,
88                                 sizeof(enum HYD_PMCD_pmi_proxy_cmds));
89        HYDU_ERR_POP(status, "unable to write data to proxy\n");
90    }
91
92  fn_exit:
93    return NULL;
94
95  fn_fail:
96    goto fn_exit;
97}
98
99static HYD_Status
100create_and_listen_portstr(HYD_Status(*callback) (int fd, HYD_Event_t events, void *userp),
101                          char **port_str)
102{
103    int listenfd;
104    char *port_range, *sport;
105    uint16_t port;
106    char hostname[MAX_HOSTNAME_LEN];
107    HYD_Status status = HYD_SUCCESS;
108
109    /* Check if the user wants us to use a port within a certain
110     * range. */
111    port_range = getenv("MPIEXEC_PORTRANGE");
112    if (!port_range)
113        port_range = getenv("MPIEXEC_PORT_RANGE");
114    if (!port_range)
115        port_range = getenv("MPICH_PORT_RANGE");
116
117    /* Listen on a port in the port range */
118    port = 0;
119    status = HYDU_sock_listen(&listenfd, port_range, &port);
120    HYDU_ERR_POP(status, "unable to listen on port\n");
121
122    /* Register the listening socket with the demux engine */
123    status = HYD_DMX_register_fd(1, &listenfd, HYD_STDOUT, NULL, callback);
124    HYDU_ERR_POP(status, "unable to register fd\n");
125
126    /* Create a port string for MPI processes to use to connect to */
127    if (gethostname(hostname, MAX_HOSTNAME_LEN) < 0)
128        HYDU_ERR_SETANDJUMP2(status, HYD_SOCK_ERROR,
129                             "gethostname error (hostname: %s; errno: %d)\n", hostname, errno);
130
131    sport = HYDU_int_to_str(port);
132    HYDU_MALLOC(*port_str, char *, strlen(hostname) + 1 + strlen(sport) + 1, status);
133    HYDU_snprintf(*port_str, strlen(hostname) + 1 + strlen(sport) + 1,
134                  "%s:%s", hostname, sport);
135    HYDU_FREE(sport);
136
137  fn_exit:
138    return status;
139
140  fn_fail:
141    goto fn_exit;
142}
143
144static HYD_Status fill_in_proxy_args(HYD_Launch_mode_t mode, char **proxy_args)
145{
146    int i, arg;
147    char *path_str[HYD_NUM_TMP_STRINGS];
148    HYD_Status status = HYD_SUCCESS;
149
150    if (mode != HYD_LAUNCH_RUNTIME && mode != HYD_LAUNCH_BOOT &&
151        mode != HYD_LAUNCH_BOOT_FOREGROUND)
152        goto fn_exit;
153
154    arg = 0;
155    i = 0;
156    path_str[i++] = HYDU_strdup(HYD_handle.base_path);
157    path_str[i++] = HYDU_strdup("pmi_proxy");
158    path_str[i] = NULL;
159    status = HYDU_str_alloc_and_join(path_str, &proxy_args[arg++]);
160    HYDU_ERR_POP(status, "unable to join strings\n");
161    HYDU_free_strlist(path_str);
162
163    proxy_args[arg++] = HYDU_strdup("--launch-mode");
164    proxy_args[arg++] = HYDU_int_to_str(mode);
165
166    proxy_args[arg++] = HYDU_strdup("--proxy-port");
167    if (mode == HYD_LAUNCH_RUNTIME)
168        proxy_args[arg++] = HYDU_strdup(proxy_port_str);
169    else
170        proxy_args[arg++] = HYDU_int_to_str(HYD_handle.proxy_port);
171
172    if (HYD_handle.debug)
173        proxy_args[arg++] = HYDU_strdup("--debug");
174
175    if (HYD_handle.enablex != -1) {
176        proxy_args[arg++] = HYDU_strdup("--enable-x");
177        proxy_args[arg++] = HYDU_int_to_str(HYD_handle.enablex);
178    }
179
180    proxy_args[arg++] = HYDU_strdup("--bootstrap");
181    proxy_args[arg++] = HYDU_strdup(HYD_handle.bootstrap);
182
183    if (HYD_handle.bootstrap_exec) {
184        proxy_args[arg++] = HYDU_strdup("--bootstrap-exec");
185        proxy_args[arg++] = HYDU_strdup(HYD_handle.bootstrap_exec);
186    }
187
188    proxy_args[arg++] = NULL;
189
190  fn_exit:
191    return status;
192
193  fn_fail:
194    goto fn_exit;
195}
196
197static HYD_Status fill_in_exec_args(void)
198{
199    int i, arg, process_id;
200    int inherited_env_count, user_env_count, system_env_count;
201    int segment_count, exec_count, total_args;
202    static int proxy_count = 0;
203    HYD_Env_t *env;
204    struct HYD_Partition *partition;
205    struct HYD_Partition_exec *exec;
206    struct HYD_Partition_segment *segment;
207    HYD_Status status = HYD_SUCCESS;
208
209    /* Create the arguments list for each proxy */
210    process_id = 0;
211    FORALL_ACTIVE_PARTITIONS(partition, HYD_handle.partition_list) {
212        for (inherited_env_count = 0, env = HYD_handle.inherited_env; env;
213             env = env->next, inherited_env_count++);
214        for (user_env_count = 0, env = HYD_handle.user_env; env;
215             env = env->next, user_env_count++);
216        for (system_env_count = 0, env = HYD_handle.system_env; env;
217             env = env->next, system_env_count++);
218
219        for (segment_count = 0, segment = partition->segment_list; segment;
220             segment = segment->next)
221            segment_count++;
222
223        for (exec_count = 0, exec = partition->exec_list; exec; exec = exec->next)
224            exec_count++;
225
226        total_args = HYD_NUM_TMP_STRINGS; /* For the basic arguments */
227
228        /* Environments */
229        total_args += inherited_env_count;
230        total_args += user_env_count;
231        total_args += system_env_count;
232
233        /* For each segment add a few strings */
234        total_args += (segment_count * HYD_NUM_TMP_STRINGS);
235
236        /* For each exec add a few strings */
237        total_args += (exec_count * HYD_NUM_TMP_STRINGS);
238
239        HYDU_MALLOC(partition->base->exec_args, char **, total_args * sizeof(char *),
240                    status);
241
242        arg = 0;
243        partition->base->exec_args[arg++] = HYDU_strdup("--global-core-count");
244        partition->base->exec_args[arg++] = HYDU_int_to_str(HYD_handle.global_core_count);
245
246        partition->base->exec_args[arg++] = HYDU_strdup("--wdir");
247        partition->base->exec_args[arg++] = HYDU_strdup(HYD_handle.wdir);
248
249        partition->base->exec_args[arg++] = HYDU_strdup("--pmi-port-str");
250        if (HYD_handle.pm_env)
251            partition->base->exec_args[arg++] = HYDU_strdup(pmi_port_str);
252        else
253            partition->base->exec_args[arg++] = HYDU_strdup("HYDRA_NULL");
254
255#if defined HAVE_PROC_BINDING
256        partition->base->exec_args[arg++] = HYDU_strdup("--binding");
257        partition->base->exec_args[arg++] = HYDU_int_to_str(HYD_handle.binding);
258        if (HYD_handle.user_bind_map)
259            partition->base->exec_args[arg++] = HYDU_strdup(HYD_handle.user_bind_map);
260        else if (partition->user_bind_map)
261            partition->base->exec_args[arg++] = HYDU_strdup(partition->user_bind_map);
262        else
263            partition->base->exec_args[arg++] = HYDU_strdup("HYDRA_NULL");
264#else
265        if (HYD_handle.binding != HYD_BIND_UNSET && HYD_handle.binding != HYD_BIND_NONE)
266            HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
267                                "Hydra not configured with process binding\n");
268#endif /* HAVE_PROC_BINDING */
269
270        partition->base->exec_args[arg++] = HYDU_strdup("--inherited-env");
271        for (i = 0, env = HYD_handle.inherited_env; env; env = env->next, i++);
272        partition->base->exec_args[arg++] = HYDU_int_to_str(i);
273        partition->base->exec_args[arg++] = NULL;
274        HYDU_list_append_env_to_str(HYD_handle.inherited_env, partition->base->exec_args);
275
276        arg = HYDU_strlist_lastidx(partition->base->exec_args);
277        partition->base->exec_args[arg++] = HYDU_strdup("--user-env");
278        for (i = 0, env = HYD_handle.user_env; env; env = env->next, i++);
279        partition->base->exec_args[arg++] = HYDU_int_to_str(i);
280        partition->base->exec_args[arg++] = NULL;
281        HYDU_list_append_env_to_str(HYD_handle.user_env, partition->base->exec_args);
282
283        arg = HYDU_strlist_lastidx(partition->base->exec_args);
284        partition->base->exec_args[arg++] = HYDU_strdup("--system-env");
285        for (i = 0, env = HYD_handle.system_env; env; env = env->next, i++);
286        partition->base->exec_args[arg++] = HYDU_int_to_str(i);
287        partition->base->exec_args[arg++] = NULL;
288        HYDU_list_append_env_to_str(HYD_handle.system_env, partition->base->exec_args);
289
290        arg = HYDU_strlist_lastidx(partition->base->exec_args);
291        partition->base->exec_args[arg++] = HYDU_strdup("--genv-prop");
292        partition->base->exec_args[arg++] = HYDU_int_to_str(HYD_handle.prop);
293        partition->base->exec_args[arg++] = NULL;
294
295        /* Pass the segment information */
296        for (segment = partition->segment_list; segment; segment = segment->next) {
297            arg = HYDU_strlist_lastidx(partition->base->exec_args);
298            partition->base->exec_args[arg++] = HYDU_strdup("--segment");
299
300            partition->base->exec_args[arg++] = HYDU_strdup("--segment-start-pid");
301            partition->base->exec_args[arg++] = HYDU_int_to_str(segment->start_pid);
302
303            partition->base->exec_args[arg++] = HYDU_strdup("--segment-proc-count");
304            partition->base->exec_args[arg++] = HYDU_int_to_str(segment->proc_count);
305            partition->base->exec_args[arg++] = NULL;
306        }
307
308        /* Now pass the local executable information */
309        for (exec = partition->exec_list; exec; exec = exec->next) {
310            arg = HYDU_strlist_lastidx(partition->base->exec_args);
311            partition->base->exec_args[arg++] = HYDU_strdup("--exec");
312
313            partition->base->exec_args[arg++] = HYDU_strdup("--exec-proc-count");
314            partition->base->exec_args[arg++] = HYDU_int_to_str(exec->proc_count);
315
316            partition->base->exec_args[arg++] = HYDU_strdup("--exec-local-env");
317            for (i = 0, env = exec->user_env; env; env = env->next, i++);
318            partition->base->exec_args[arg++] = HYDU_int_to_str(i);
319            partition->base->exec_args[arg++] = NULL;
320            HYDU_list_append_env_to_str(exec->user_env, partition->base->exec_args);
321
322            arg = HYDU_strlist_lastidx(partition->base->exec_args);
323            partition->base->exec_args[arg++] = HYDU_strdup("--exec-env-prop");
324            partition->base->exec_args[arg++] = HYDU_int_to_str(exec->prop);
325            partition->base->exec_args[arg++] = NULL;
326
327            HYDU_list_append_strlist(exec->exec, partition->base->exec_args);
328
329            process_id += exec->proc_count;
330        }
331
332        if (HYD_handle.debug) {
333            printf("Arguments being passed to proxy %d:\n", proxy_count++);
334            HYDU_print_strlist(partition->base->exec_args);
335            printf("\n");
336        }
337    }
338
339  fn_exit:
340    return status;
341
342  fn_fail:
343    goto fn_exit;
344}
345
346HYD_Status HYD_PMCI_launch_procs(void)
347{
348    struct HYD_Partition *partition;
349    enum HYD_PMCD_pmi_proxy_cmds cmd;
350    int fd, len, id;
351#if defined HAVE_THREAD_SUPPORT
352    struct HYD_Thread_context *thread_context = NULL;
353#endif /* HAVE_THREAD_SUPPORT */
354    char *proxy_args[HYD_NUM_TMP_STRINGS] = { NULL };
355    HYD_Status status = HYD_SUCCESS;
356
357    HYDU_FUNC_ENTER();
358
359    status = HYDU_set_common_signals(HYD_PMCD_pmi_serv_signal_cb);
360    HYDU_ERR_POP(status, "unable to set signal\n");
361
362    /* Initialize PMI */
363    status = create_and_listen_portstr(HYD_PMCD_pmi_connect_cb, &pmi_port_str);
364    HYDU_ERR_POP(status, "unable to create PMI port\n");
365    HYDU_Debug(HYD_handle.debug, "Got a PMI port string of %s\n", pmi_port_str);
366
367    status = HYD_PMCD_pmi_init();
368    HYDU_ERR_POP(status, "unable to create process group\n");
369
370    if (HYD_handle.launch_mode == HYD_LAUNCH_RUNTIME) {
371        status = create_and_listen_portstr(HYD_PMCD_pmi_serv_control_connect_cb,
372                                           &proxy_port_str);
373        HYDU_ERR_POP(status, "unable to create PMI port\n");
374        HYDU_Debug(HYD_handle.debug, "Got a proxy port string of %s\n", proxy_port_str);
375
376        status = fill_in_proxy_args(HYD_handle.launch_mode, proxy_args);
377        HYDU_ERR_POP(status, "unable to fill in proxy arguments\n");
378
379        status = fill_in_exec_args();
380        HYDU_ERR_POP(status, "unable to fill in executable arguments\n");
381
382        status = HYD_BSCI_launch_procs(proxy_args, "--partition-id", HYD_handle.partition_list);
383        HYDU_ERR_POP(status, "bootstrap server cannot launch processes\n");
384    }
385    else if (HYD_handle.launch_mode == HYD_LAUNCH_BOOT ||
386             HYD_handle.launch_mode == HYD_LAUNCH_BOOT_FOREGROUND) {
387        status = fill_in_proxy_args(HYD_handle.launch_mode, proxy_args);
388        HYDU_ERR_POP(status, "unable to fill in proxy arguments\n");
389
390        status = HYD_BSCI_launch_procs(proxy_args, "--partition-id", HYD_handle.partition_list);
391        HYDU_ERR_POP(status, "bootstrap server cannot launch processes\n");
392    }
393    else if (HYD_handle.launch_mode == HYD_LAUNCH_SHUTDOWN) {
394        FORALL_ACTIVE_PARTITIONS(partition, HYD_handle.partition_list) {
395            status = HYDU_sock_connect(partition->base->name, HYD_handle.proxy_port, &fd);
396            if (status != HYD_SUCCESS) {
397                /* Don't abort. Try to shutdown as many proxies as possible */
398                HYDU_Error_printf("Unable to connect to proxy at %s\n", partition->base->name);
399                continue;
400            }
401
402            cmd = PROXY_SHUTDOWN;
403            status = HYDU_sock_write(fd, &cmd, sizeof(enum HYD_PMCD_pmi_proxy_cmds));
404            HYDU_ERR_POP(status, "unable to write data to proxy\n");
405
406            close(fd);
407        }
408    }
409    else if (HYD_handle.launch_mode == HYD_LAUNCH_PERSISTENT) {
410        status = fill_in_exec_args();
411        HYDU_ERR_POP(status, "unable to fill in proxy arguments\n");
412
413        len = 0;
414        FORALL_ACTIVE_PARTITIONS(partition, HYD_handle.partition_list)
415            len++;
416
417#if defined HAVE_THREAD_SUPPORT
418        HYDU_CALLOC(thread_context, struct HYD_Thread_context *, len,
419                    sizeof(struct HYD_Thread_context), status);
420        if (!thread_context)
421            HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
422                                "Unable to allocate memory for thread context\n");
423#endif /* HAVE_THREAD_SUPPORT */
424
425        id = 0;
426        FORALL_ACTIVE_PARTITIONS(partition, HYD_handle.partition_list) {
427#if defined HAVE_THREAD_SUPPORT
428            HYDU_create_thread(launch_helper, (void *) partition, &thread_context[id]);
429#else
430            launch_helper(partition);
431#endif /* HAVE_THREAD_SUPPORT */
432            id++;
433        }
434
435        id = 0;
436        FORALL_ACTIVE_PARTITIONS(partition, HYD_handle.partition_list) {
437#if defined HAVE_THREAD_SUPPORT
438            HYDU_join_thread(thread_context[id]);
439#endif /* HAVE_THREAD_SUPPORT */
440
441            status = HYD_DMX_register_fd(1, &partition->control_fd, HYD_STDOUT, partition,
442                                         HYD_PMCD_pmi_serv_control_cb);
443            HYDU_ERR_POP(status, "unable to register control fd\n");
444
445            id++;
446        }
447    }
448
449  fn_exit:
450    if (pmi_port_str)
451        HYDU_FREE(pmi_port_str);
452    if (proxy_port_str)
453        HYDU_FREE(proxy_port_str);
454#if defined HAVE_THREAD_SUPPORT
455    if (thread_context)
456        HYDU_FREE(thread_context);
457#endif /* HAVE_THREAD_SUPPORT */
458    HYDU_free_strlist(proxy_args);
459    HYDU_FUNC_EXIT();
460    return status;
461
462  fn_fail:
463    goto fn_exit;
464}
465
466
467HYD_Status HYD_PMCI_wait_for_completion(void)
468{
469    struct HYD_Partition *partition;
470    int sockets_open, all_procs_exited;
471    HYD_Status status = HYD_SUCCESS;
472
473    HYDU_FUNC_ENTER();
474
475    if ((HYD_handle.launch_mode == HYD_LAUNCH_BOOT) || (HYD_handle.launch_mode == HYD_LAUNCH_SHUTDOWN)) {
476        status = HYD_SUCCESS;
477    }
478    else {
479        while (1) {
480            /* Wait for some event to occur */
481            status = HYD_DMX_wait_for_event(HYDU_time_left(HYD_handle.start, HYD_handle.timeout));
482            HYDU_ERR_POP(status, "error waiting for event\n");
483
484            /* If the timeout expired, raise a SIGINT and kill all the
485             * processes */
486            if (HYDU_time_left(HYD_handle.start, HYD_handle.timeout) == 0)
487                raise(SIGINT);
488
489            /* Check to see if there's any open read socket left; if
490             * there are, we will just wait for more events. */
491            sockets_open = 0;
492            FORALL_ACTIVE_PARTITIONS(partition, HYD_handle.partition_list) {
493                if (partition->base->out != -1 || partition->base->err != -1) {
494                    sockets_open++;
495                    break;
496                }
497            }
498
499            if (sockets_open && HYDU_time_left(HYD_handle.start, HYD_handle.timeout))
500                continue;
501
502            break;
503        }
504
505        do {
506            /* Check if the exit status has already arrived */
507            all_procs_exited = 1;
508            FORALL_ACTIVE_PARTITIONS(partition, HYD_handle.partition_list) {
509                if (partition->exit_status == NULL) {
510                    all_procs_exited = 0;
511                    break;
512                }
513            }
514
515            if (all_procs_exited)
516                break;
517
518            /* If not, wait for some event to occur */
519            status = HYD_DMX_wait_for_event(HYDU_time_left(HYD_handle.start, HYD_handle.timeout));
520            HYDU_ERR_POP(status, "error waiting for event\n");
521        } while (1);
522    }
523
524  fn_exit:
525    HYDU_FUNC_EXIT();
526    return status;
527
528  fn_fail:
529    goto fn_exit;
530}
Note: See TracBrowser for help on using the browser.