root/mpich2/trunk/src/pm/hydra/pm/pmiserv/pmi_handle_v2.c @ 4887

Revision 4887, 24.7 KB (checked in by balaji, 5 months ago)

Warning stomp.

Line 
1/* -*- Mode: C; c-basic-offset:4 ; -*- */
2/*
3 *  (C) 2008 by Argonne National Laboratory.
4 *      See COPYRIGHT in top-level directory.
5 */
6
7#include "hydra.h"
8#include "hydra_utils.h"
9#include "bsci.h"
10#include "demux.h"
11#include "pmi_handle.h"
12#include "pmi_handle_v2.h"
13
14/* TODO: abort, create_kvs, destroy_kvs, getbyidx, spawn */
15static struct HYD_PMCD_pmi_handle_fns pmi_v2_handle_fns_foo[] = {
16    {"fullinit", HYD_PMCD_pmi_handle_v2_fullinit},
17    {"job-getid", HYD_PMCD_pmi_handle_v2_job_getid},
18    {"info-putnodeattr", HYD_PMCD_pmi_handle_v2_info_putnodeattr},
19    {"info-getnodeattr", HYD_PMCD_pmi_handle_v2_info_getnodeattr},
20    {"info-getjobattr", HYD_PMCD_pmi_handle_v2_info_getjobattr},
21    {"kvs-put", HYD_PMCD_pmi_handle_v2_kvs_put},
22    {"kvs-get", HYD_PMCD_pmi_handle_v2_kvs_get},
23    {"kvs-fence", HYD_PMCD_pmi_handle_v2_kvs_fence},
24    {"finalize", HYD_PMCD_pmi_handle_v2_finalize},
25    {"\0", NULL}
26};
27
28static struct HYD_PMCD_pmi_handle pmi_v2_foo = { PMI_V2_DELIM, pmi_v2_handle_fns_foo };
29
30struct HYD_PMCD_pmi_handle *HYD_PMCD_pmi_v2 = &pmi_v2_foo;
31
32struct token {
33    char *key;
34    char *val;
35};
36
37static struct attr_reqs {
38    int fd;
39    char *thrid;
40    char **req;
41    enum req_type {
42        GET_NODE_ATTR,
43        KVS_GET
44    } type;
45    struct attr_reqs *next;
46} *outstanding_attr_reqs = NULL;
47
48static void print_attr_reqs(void)
49{
50    int i;
51    struct attr_reqs *areq;
52
53    dprintf("Outstanding reqs: ");
54    for (areq = outstanding_attr_reqs; areq; areq = areq->next) {
55        dprintf("%d:%d(", areq->fd, areq->type);
56        for (i = 0; areq->req[i]; i++) {
57            dprintf("%s", areq->req[i]);
58            if (areq->req[i + 1]) {
59                dprintf(",");
60            }
61        }
62        dprintf(") ");
63    }
64    dprintf("\n");
65}
66
67static HYD_Status send_command(int fd, char *cmd)
68{
69    char cmdlen[7];
70    HYD_Status status = HYD_SUCCESS;
71
72    HYDU_FUNC_ENTER();
73
74    HYDU_snprintf(cmdlen, 7, "%6d", strlen(cmd));
75    status = HYDU_sock_write(fd, cmdlen, 6);
76    HYDU_ERR_POP(status, "error writing PMI line\n");
77
78    status = HYDU_sock_write(fd, cmd, strlen(cmd));
79    HYDU_ERR_POP(status, "error writing PMI line\n");
80
81  fn_exit:
82    HYDU_FUNC_EXIT();
83    return status;
84
85  fn_fail:
86    goto fn_exit;
87}
88
89
90static HYD_Status args_to_tokens(char *args[], struct token **tokens, int *count)
91{
92    int i;
93    char *arg;
94    HYD_Status status = HYD_SUCCESS;
95
96    for (i = 0; args[i]; i++);
97    *count = i;
98    HYDU_MALLOC(*tokens, struct token *, *count * sizeof(struct token), status);
99
100    for (i = 0; args[i]; i++) {
101        arg = HYDU_strdup(args[i]);
102        (*tokens)[i].key = strtok(arg, "=");
103        (*tokens)[i].val = strtok(NULL, "=");
104    }
105
106  fn_exit:
107    return status;
108
109  fn_fail:
110    goto fn_exit;
111}
112
113static int progress_nest_count = 0;
114static int req_complete = 0;
115
116static void free_attr_req(struct attr_reqs *areq)
117{
118    HYDU_free_strlist(areq->req);
119    HYDU_FREE(areq);
120}
121
122static HYD_Status queue_outstanding_req(int fd, enum req_type req_type, char *args[])
123{
124    struct attr_reqs *attr_req, *a;
125    HYD_Status status = HYD_SUCCESS;
126
127    HYDU_MALLOC(attr_req, struct attr_reqs *, sizeof(struct attr_reqs), status);
128    attr_req->fd = fd;
129    attr_req->type = req_type;
130    attr_req->next = NULL;
131
132    status = HYDU_strdup_list(args, &attr_req->req);
133    HYDU_ERR_POP(status, "unable to dup args\n");
134
135    if (outstanding_attr_reqs == NULL)
136        outstanding_attr_reqs = attr_req;
137    else {
138        a = outstanding_attr_reqs;
139        while (a->next)
140            a = a->next;
141        a->next = attr_req;
142    }
143    print_attr_reqs();
144
145  fn_exit:
146    return status;
147
148  fn_fail:
149    goto fn_exit;
150}
151
152static HYD_Status poke_progress(void)
153{
154    struct attr_reqs *areq, *tmp;
155    HYD_Status status = HYD_SUCCESS;
156
157    progress_nest_count++;
158
159    if (outstanding_attr_reqs == NULL)
160        goto fn_exit;
161
162    for (areq = outstanding_attr_reqs; areq;) {
163        req_complete = 0;
164
165        if (areq->type == GET_NODE_ATTR) {
166            status = HYD_PMCD_pmi_handle_v2_info_getnodeattr(areq->fd, areq->req);
167            HYDU_ERR_POP(status, "getnodeattr returned error\n");
168        }
169        else if (areq->type == KVS_GET) {
170            status = HYD_PMCD_pmi_handle_v2_kvs_get(areq->fd, areq->req);
171            HYDU_ERR_POP(status, "kvs_get returned error\n");
172        }
173
174        tmp = areq->next;
175        if (req_complete) {
176            if (areq == outstanding_attr_reqs) {
177                outstanding_attr_reqs = areq->next;
178            }
179            else {
180                for (tmp = outstanding_attr_reqs; tmp->next != areq; tmp = tmp->next);
181                tmp->next = areq->next;
182            }
183            tmp = areq->next;
184            free_attr_req(areq);
185        }
186
187        areq = tmp;
188        print_attr_reqs();
189    }
190
191  fn_exit:
192    progress_nest_count--;
193    return status;
194
195  fn_fail:
196    goto fn_exit;
197}
198
199
200static char *find_token_keyval(struct token *tokens, int count, const char *key)
201{
202    int i;
203
204    for (i = 0; i < count; i++) {
205        if (!strcmp(tokens[i].key, key))
206            return tokens[i].val;
207    }
208
209    return NULL;
210}
211
212
213HYD_Status HYD_PMCD_pmi_handle_v2_fullinit(int fd, char *args[])
214{
215    int id, rank, i;
216    char *tmp[HYD_NUM_TMP_STRINGS], *cmd, *rank_str;
217    HYD_PMCD_pmi_pg_t *run;
218    struct token *tokens;
219    int token_count;
220    HYD_Status status = HYD_SUCCESS;
221
222    HYDU_FUNC_ENTER();
223
224    status = args_to_tokens(args, &tokens, &token_count);
225    HYDU_ERR_POP(status, "unable to convert args to tokens\n");
226
227    rank_str = find_token_keyval(tokens, token_count, "pmirank");
228    HYDU_ERR_CHKANDJUMP(status, rank_str == NULL, HYD_INTERNAL_ERROR,
229                        "unable to find pmirank token\n");
230    id = atoi(rank_str);
231
232    i = 0;
233    tmp[i++] = HYDU_strdup("cmd=fullinit-response;pmi-version=2;pmi-subversion=0;rank=");
234
235    status = HYD_PMCD_pmi_id_to_rank(id, &rank);
236    HYDU_ERR_POP(status, "unable to convert ID to rank\n");
237    tmp[i++] = HYDU_int_to_str(rank);
238
239    tmp[i++] = HYDU_strdup(";size=");
240    tmp[i++] = HYDU_int_to_str(HYD_pg_list->num_procs);
241    tmp[i++] = HYDU_strdup(";appnum=0;debugged=FALSE;pmiverbose=0;rc=0;");
242    tmp[i++] = NULL;
243
244    status = HYDU_str_alloc_and_join(tmp, &cmd);
245    HYDU_ERR_POP(status, "error while joining strings\n");
246
247    for (i = 0; tmp[i]; i++)
248        HYDU_FREE(tmp[i]);
249
250    status = send_command(fd, cmd);
251    HYDU_ERR_POP(status, "send command failed\n");
252
253    HYDU_FREE(cmd);
254
255    run = HYD_pg_list;
256    while (run->next)
257        run = run->next;
258
259    /* Add the process to the last PG */
260    status = HYD_PMCD_pmi_add_process_to_pg(run, fd, rank);
261    HYDU_ERR_POP(status, "unable to add process to pg\n");
262
263  fn_exit:
264    HYDU_FUNC_EXIT();
265    return status;
266
267  fn_fail:
268    goto fn_exit;
269}
270
271
272HYD_Status HYD_PMCD_pmi_handle_v2_job_getid(int fd, char *args[])
273{
274    char *tmp[HYD_NUM_TMP_STRINGS], *cmd, *thrid;
275    int i;
276    HYD_PMCD_pmi_process_t *process;
277    struct token *tokens;
278    int token_count;
279    HYD_Status status = HYD_SUCCESS;
280
281    HYDU_FUNC_ENTER();
282
283    status = args_to_tokens(args, &tokens, &token_count);
284    HYDU_ERR_POP(status, "unable to convert args to tokens\n");
285
286    thrid = find_token_keyval(tokens, token_count, "thrid");
287
288    /* Find the group id corresponding to this fd */
289    process = HYD_PMCD_pmi_find_process(fd);
290    if (process == NULL)        /* We didn't find the process */
291        HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
292                             "unable to find process structure for fd %d\n", fd);
293
294    i = 0;
295    tmp[i++] = HYDU_strdup("cmd=job-getid-response;");
296    if (thrid) {
297        tmp[i++] = HYDU_strdup("thrid=");
298        tmp[i++] = HYDU_strdup(thrid);
299        tmp[i++] = HYDU_strdup(";");
300    }
301    tmp[i++] = HYDU_strdup("jobid=");
302    tmp[i++] = HYDU_strdup(process->node->pg->kvs->kvs_name);
303    tmp[i++] = HYDU_strdup(";rc=0;");
304    tmp[i++] = NULL;
305
306    status = HYDU_str_alloc_and_join(tmp, &cmd);
307    HYDU_ERR_POP(status, "unable to join strings\n");
308
309    HYDU_free_strlist(tmp);
310
311    status = send_command(fd, cmd);
312    HYDU_ERR_POP(status, "send command failed\n");
313
314    HYDU_FREE(cmd);
315
316  fn_exit:
317    HYDU_FUNC_EXIT();
318    return status;
319
320  fn_fail:
321    goto fn_exit;
322}
323
324
325HYD_Status HYD_PMCD_pmi_handle_v2_info_putnodeattr(int fd, char *args[])
326{
327    char *tmp[HYD_NUM_TMP_STRINGS], *cmd, *key_pair_str;
328    char *key, *val, *thrid;
329    int i, ret;
330    HYD_PMCD_pmi_process_t *process;
331    struct token *tokens;
332    int token_count;
333    HYD_Status status = HYD_SUCCESS;
334
335    HYDU_FUNC_ENTER();
336
337    status = args_to_tokens(args, &tokens, &token_count);
338    HYDU_ERR_POP(status, "unable to convert args to tokens\n");
339
340    key = find_token_keyval(tokens, token_count, "key");
341    HYDU_ERR_CHKANDJUMP(status, key == NULL, HYD_INTERNAL_ERROR, "unable to find key token\n");
342
343    val = find_token_keyval(tokens, token_count, "value");
344    HYDU_ERR_CHKANDJUMP(status, val == NULL, HYD_INTERNAL_ERROR,
345                        "unable to find value token\n");
346
347    thrid = find_token_keyval(tokens, token_count, "thrid");
348
349    /* Find the group id corresponding to this fd */
350    process = HYD_PMCD_pmi_find_process(fd);
351    if (process == NULL)        /* We didn't find the process */
352        HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
353                             "unable to find process structure for fd %d\n", fd);
354
355    status = HYD_PMCD_pmi_add_kvs(key, val, process->node->kvs, &key_pair_str, &ret);
356    HYDU_ERR_POP(status, "unable to put data into kvs\n");
357
358    i = 0;
359    tmp[i++] = HYDU_strdup("cmd=info-putnodeattr-response;");
360    if (thrid) {
361        tmp[i++] = HYDU_strdup("thrid=");
362        tmp[i++] = HYDU_strdup(thrid);
363        tmp[i++] = HYDU_strdup(";");
364    }
365    tmp[i++] = HYDU_strdup("rc=");
366    tmp[i++] = HYDU_int_to_str(ret);
367    tmp[i++] = HYDU_strdup(";");
368    tmp[i++] = NULL;
369
370    status = HYDU_str_alloc_and_join(tmp, &cmd);
371    HYDU_ERR_POP(status, "unable to join strings\n");
372
373    HYDU_free_strlist(tmp);
374
375    status = send_command(fd, cmd);
376    HYDU_ERR_POP(status, "send command failed\n");
377
378    HYDU_FREE(cmd);
379
380    /* Poke the progress engine before exiting */
381    status = poke_progress();
382    HYDU_ERR_POP(status, "poke progress error\n");
383
384  fn_exit:
385    HYDU_FUNC_EXIT();
386    return status;
387
388  fn_fail:
389    goto fn_exit;
390}
391
392
393HYD_Status HYD_PMCD_pmi_handle_v2_info_getnodeattr(int fd, char *args[])
394{
395    int i, found;
396    HYD_PMCD_pmi_process_t *process;
397    HYD_PMCD_pmi_kvs_pair_t *run;
398    char *key, *waitval, *thrid;
399    char *tmp[HYD_NUM_TMP_STRINGS] = { 0 }, *cmd;
400    struct token *tokens;
401    int token_count;
402    HYD_Status status = HYD_SUCCESS;
403
404    HYDU_FUNC_ENTER();
405
406    status = args_to_tokens(args, &tokens, &token_count);
407    HYDU_ERR_POP(status, "unable to convert args to tokens\n");
408
409    key = find_token_keyval(tokens, token_count, "key");
410    HYDU_ERR_CHKANDJUMP(status, key == NULL, HYD_INTERNAL_ERROR, "unable to find key token\n");
411
412    waitval = find_token_keyval(tokens, token_count, "wait");
413    thrid = find_token_keyval(tokens, token_count, "thrid");
414
415    /* Find the group id corresponding to this fd */
416    process = HYD_PMCD_pmi_find_process(fd);
417    if (process == NULL)        /* We didn't find the process */
418        HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
419                             "unable to find process structure for fd %d\n", fd);
420
421    found = 0;
422    for (run = process->node->kvs->key_pair; run; run = run->next) {
423        if (!strcmp(run->key, key)) {
424            found = 1;
425            break;
426        }
427    }
428
429    if (!found) {       /* We need to decide whether to return not found or queue up */
430        /* If we are already nested, get out of here */
431        if (progress_nest_count)
432            goto fn_exit;
433
434        if (waitval && !strcmp(waitval, "TRUE")) {
435            /* queue up */
436            status = queue_outstanding_req(fd, GET_NODE_ATTR, args);
437            HYDU_ERR_POP(status, "unable to queue outstanding request\n");
438        }
439        else {
440            /* Tell the client that we can't find the attribute */
441            i = 0;
442            tmp[i++] = HYDU_strdup("cmd=info-getnodeattr-response;");
443            if (thrid) {
444                tmp[i++] = HYDU_strdup("thrid=");
445                tmp[i++] = HYDU_strdup(thrid);
446                tmp[i++] = HYDU_strdup(";");
447            }
448            tmp[i++] = HYDU_strdup("found=FALSE;rc=0;");
449            tmp[i++] = NULL;
450
451            status = HYDU_str_alloc_and_join(tmp, &cmd);
452            HYDU_ERR_POP(status, "unable to join strings\n");
453            HYDU_free_strlist(tmp);
454
455            status = send_command(fd, cmd);
456            HYDU_ERR_POP(status, "send command failed\n");
457            HYDU_FREE(cmd);
458        }
459    }
460    else {      /* We found the attribute */
461        i = 0;
462        tmp[i++] = HYDU_strdup("cmd=info-getnodeattr-response;");
463        if (thrid) {
464            tmp[i++] = HYDU_strdup("thrid=");
465            tmp[i++] = HYDU_strdup(thrid);
466            tmp[i++] = HYDU_strdup(";");
467        }
468        tmp[i++] = HYDU_strdup("found=TRUE;value=");
469        tmp[i++] = HYDU_strdup(run->val);
470        tmp[i++] = HYDU_strdup(";rc=0;");
471        tmp[i++] = NULL;
472
473        status = HYDU_str_alloc_and_join(tmp, &cmd);
474        HYDU_ERR_POP(status, "unable to join strings\n");
475        HYDU_free_strlist(tmp);
476
477        status = send_command(fd, cmd);
478        HYDU_ERR_POP(status, "send command failed\n");
479        HYDU_FREE(cmd);
480
481        if (progress_nest_count)
482            req_complete = 1;
483    }
484
485  fn_exit:
486    HYDU_FUNC_EXIT();
487    return status;
488
489  fn_fail:
490    goto fn_exit;
491}
492
493
494HYD_Status HYD_PMCD_pmi_handle_v2_info_getjobattr(int fd, char *args[])
495{
496    int i, ret;
497    HYD_PMCD_pmi_process_t *process;
498    HYD_PMCD_pmi_kvs_pair_t *run;
499    char *key, *thrid;
500    char *tmp[HYD_NUM_TMP_STRINGS], *cmd, *node_list, *key_pair_str;
501    struct token *tokens;
502    int token_count, found;
503    HYD_Status status = HYD_SUCCESS;
504
505    HYDU_FUNC_ENTER();
506
507    status = args_to_tokens(args, &tokens, &token_count);
508    HYDU_ERR_POP(status, "unable to convert args to tokens\n");
509
510    key = find_token_keyval(tokens, token_count, "key");
511    HYDU_ERR_CHKANDJUMP(status, key == NULL, HYD_INTERNAL_ERROR, "unable to find key token\n");
512
513    thrid = find_token_keyval(tokens, token_count, "thrid");
514
515    /* Find the group id corresponding to this fd */
516    process = HYD_PMCD_pmi_find_process(fd);
517    if (process == NULL)        /* We didn't find the process */
518        HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
519                             "unable to find process structure for fd %d\n", fd);
520
521    /* If no format is specified, use the default values */
522    if (strcmp(key, "process-mapping") == 0)
523        key = "process-mapping-vector";
524
525    /* Try to find the key */
526    found = 0;
527    for (run = process->node->pg->kvs->key_pair; run; run = run->next) {
528        if (!strcmp(run->key, key)) {
529            found = 1;
530            break;
531        }
532    }
533
534    if (found == 0) {
535        /* Didn't find the job attribute; see if we know how to
536         * generate it */
537        if (strcmp(key, "process-mapping-vector") == 0) {
538            /* Create a vector format */
539            status = HYD_PMCD_pmi_process_mapping(process, HYD_PMCD_pmi_vector, &node_list);
540            HYDU_ERR_POP(status, "Unable to get process mapping information\n");
541
542            if (strlen(node_list) > MAXVALLEN)
543                HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
544                                    "key value larger than maximum allowed\n");
545
546            status = HYD_PMCD_pmi_add_kvs("process-mapping-vector", node_list,
547                                          process->node->pg->kvs, &key_pair_str, &ret);
548            HYDU_ERR_POP(status, "unable to add process_mapping to KVS\n");
549        }
550        else if (strcmp(key, "process-mapping-explicit") == 0) {
551            status = HYD_PMCD_pmi_process_mapping(process, HYD_PMCD_pmi_explicit, &node_list);
552            HYDU_ERR_POP(status, "Unable to get process mapping information\n");
553
554            if (strlen(node_list) > MAXVALLEN)
555                HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
556                                    "key value larger than maximum allowed\n");
557
558            status = HYD_PMCD_pmi_add_kvs("process-mapping-explicit", node_list,
559                                          process->node->pg->kvs, &key_pair_str, &ret);
560            HYDU_ERR_POP(status, "unable to add process_mapping to KVS\n");
561        }
562
563        /* Search for the key again */
564        for (run = process->node->pg->kvs->key_pair; run; run = run->next) {
565            if (!strcmp(run->key, key)) {
566                found = 1;
567                break;
568            }
569        }
570    }
571
572    i = 0;
573    tmp[i++] = HYDU_strdup("cmd=info-getjobattr-response;");
574    if (thrid) {
575        tmp[i++] = HYDU_strdup("thrid=");
576        tmp[i++] = HYDU_strdup(thrid);
577        tmp[i++] = HYDU_strdup(";");
578    }
579    tmp[i++] = HYDU_strdup("found=");
580    if (found) {
581        tmp[i++] = HYDU_strdup("TRUE;value=");
582        tmp[i++] = HYDU_strdup(run->val);
583        tmp[i++] = HYDU_strdup(";rc=0;");
584    }
585    else {
586        tmp[i++] = HYDU_strdup("FALSE;rc=0;");
587    }
588    tmp[i++] = NULL;
589
590    status = HYDU_str_alloc_and_join(tmp, &cmd);
591    HYDU_ERR_POP(status, "unable to join strings\n");
592
593    HYDU_free_strlist(tmp);
594
595    status = send_command(fd, cmd);
596    HYDU_ERR_POP(status, "send command failed\n");
597
598    HYDU_FREE(cmd);
599
600  fn_exit:
601    HYDU_FUNC_EXIT();
602    return status;
603
604  fn_fail:
605    goto fn_exit;
606}
607
608
609HYD_Status HYD_PMCD_pmi_handle_v2_kvs_put(int fd, char *args[])
610{
611    char *tmp[HYD_NUM_TMP_STRINGS], *cmd, *key_pair_str;
612    char *key, *val, *thrid;
613    int i, ret;
614    HYD_PMCD_pmi_process_t *process;
615    struct token *tokens;
616    int token_count;
617    HYD_Status status = HYD_SUCCESS;
618
619    HYDU_FUNC_ENTER();
620
621    status = args_to_tokens(args, &tokens, &token_count);
622    HYDU_ERR_POP(status, "unable to convert args to tokens\n");
623
624    key = find_token_keyval(tokens, token_count, "key");
625    HYDU_ERR_CHKANDJUMP(status, key == NULL, HYD_INTERNAL_ERROR, "unable to find key token\n");
626
627    val = find_token_keyval(tokens, token_count, "value");
628    HYDU_ERR_CHKANDJUMP(status, val == NULL, HYD_INTERNAL_ERROR,
629                        "unable to find value token\n");
630
631    thrid = find_token_keyval(tokens, token_count, "thrid");
632
633    /* Find the group id corresponding to this fd */
634    process = HYD_PMCD_pmi_find_process(fd);
635    if (process == NULL)        /* We didn't find the process */
636        HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
637                             "unable to find process structure for fd %d\n", fd);
638
639    status = HYD_PMCD_pmi_add_kvs(key, val, process->node->pg->kvs, &key_pair_str, &ret);
640    HYDU_ERR_POP(status, "unable to put data into kvs\n");
641
642    i = 0;
643    tmp[i++] = HYDU_strdup("cmd=kvs-put-response;");
644    if (thrid) {
645        tmp[i++] = HYDU_strdup("thrid=");
646        tmp[i++] = HYDU_strdup(thrid);
647        tmp[i++] = HYDU_strdup(";");
648    }
649    tmp[i++] = HYDU_strdup("rc=");
650    tmp[i++] = HYDU_int_to_str(ret);
651    tmp[i++] = HYDU_strdup(";");
652    tmp[i++] = NULL;
653
654    status = HYDU_str_alloc_and_join(tmp, &cmd);
655    HYDU_ERR_POP(status, "unable to join strings\n");
656
657    HYDU_free_strlist(tmp);
658
659    status = send_command(fd, cmd);
660    HYDU_ERR_POP(status, "send command failed\n");
661
662    HYDU_FREE(cmd);
663
664    /* Poke the progress engine before exiting */
665    status = poke_progress();
666    HYDU_ERR_POP(status, "poke progress error\n");
667
668  fn_exit:
669    HYDU_FUNC_EXIT();
670    return status;
671
672  fn_fail:
673    goto fn_exit;
674}
675
676
677HYD_Status HYD_PMCD_pmi_handle_v2_kvs_get(int fd, char *args[])
678{
679    int i, found;
680    HYD_PMCD_pmi_process_t *process, *prun;
681    HYD_PMCD_pmi_node_t *node;
682    HYD_PMCD_pmi_kvs_pair_t *run;
683    char *key, *thrid;
684    char *tmp[HYD_NUM_TMP_STRINGS], *cmd;
685    struct token *tokens;
686    int token_count, consistent_epoch;
687    HYD_Status status = HYD_SUCCESS;
688
689    HYDU_FUNC_ENTER();
690
691    status = args_to_tokens(args, &tokens, &token_count);
692    HYDU_ERR_POP(status, "unable to convert args to tokens\n");
693
694    key = find_token_keyval(tokens, token_count, "key");
695    HYDU_ERR_CHKANDJUMP(status, key == NULL, HYD_INTERNAL_ERROR, "unable to find key token\n");
696
697    thrid = find_token_keyval(tokens, token_count, "thrid");
698
699    /* Find the group id corresponding to this fd */
700    process = HYD_PMCD_pmi_find_process(fd);
701    if (process == NULL)        /* We didn't find the process */
702        HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
703                             "unable to find process structure for fd %d\n", fd);
704
705    found = 0;
706    for (run = process->node->pg->kvs->key_pair; run; run = run->next) {
707        if (!strcmp(run->key, key)) {
708            found = 1;
709            break;
710        }
711    }
712
713    if (!found) {
714        /* If we are already nested, get out of here */
715        if (progress_nest_count)
716            goto fn_exit;
717
718        consistent_epoch = 1;
719        for (node = process->node->pg->node_list; node; node = node->next) {
720            for (prun = node->process_list; prun; prun = prun->next) {
721                if (prun->epoch != process->epoch) {
722                    /* The epochs are not consistent */
723                    consistent_epoch = 0;
724                    break;
725                }
726            }
727        }
728
729        if (consistent_epoch == 0) {
730            /* queue up */
731            status = queue_outstanding_req(fd, KVS_GET, args);
732            HYDU_ERR_POP(status, "unable to queue outstanding request\n");
733
734            /* We are done */
735            goto fn_exit;
736        }
737    }
738
739    i = 0;
740    tmp[i++] = HYDU_strdup("cmd=kvs-get-response;");
741    if (thrid) {
742        tmp[i++] = HYDU_strdup("thrid=");
743        tmp[i++] = HYDU_strdup(thrid);
744        tmp[i++] = HYDU_strdup(";");
745    }
746    if (found) {
747        tmp[i++] = HYDU_strdup("found=TRUE;value=");
748        tmp[i++] = HYDU_strdup(run->val);
749        tmp[i++] = HYDU_strdup(";");
750    }
751    else {
752        tmp[i++] = HYDU_strdup("found=FALSE;");
753    }
754    tmp[i++] = HYDU_strdup("rc=0;");
755    tmp[i++] = NULL;
756
757    status = HYDU_str_alloc_and_join(tmp, &cmd);
758    HYDU_ERR_POP(status, "unable to join strings\n");
759    HYDU_free_strlist(tmp);
760
761    status = send_command(fd, cmd);
762    HYDU_ERR_POP(status, "send command failed\n");
763
764    HYDU_FREE(cmd);
765
766  fn_exit:
767    HYDU_FUNC_EXIT();
768    return status;
769
770  fn_fail:
771    goto fn_exit;
772}
773
774
775HYD_Status HYD_PMCD_pmi_handle_v2_kvs_fence(int fd, char *args[])
776{
777    HYD_PMCD_pmi_process_t *process;
778    char *tmp[HYD_NUM_TMP_STRINGS], *cmd, *thrid;
779    struct token *tokens;
780    int token_count, i;
781    HYD_Status status = HYD_SUCCESS;
782
783    HYDU_FUNC_ENTER();
784
785    status = args_to_tokens(args, &tokens, &token_count);
786    HYDU_ERR_POP(status, "unable to convert args to tokens\n");
787
788    thrid = find_token_keyval(tokens, token_count, "thrid");
789
790    /* Find the group id corresponding to this fd */
791    process = HYD_PMCD_pmi_find_process(fd);
792    if (process == NULL)        /* We didn't find the process */
793        HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR,
794                             "unable to find process structure for fd %d\n", fd);
795
796    process->epoch++;   /* We have reached the next epoch */
797
798    process->node->pg->barrier_count++;
799
800    i = 0;
801    tmp[i++] = HYDU_strdup("cmd=kvs-fence-response;");
802    if (thrid) {
803        tmp[i++] = HYDU_strdup("thrid=");
804        tmp[i++] = HYDU_strdup(thrid);
805        tmp[i++] = HYDU_strdup(";");
806    }
807    tmp[i++] = HYDU_strdup("rc=0;");
808    tmp[i++] = NULL;
809
810    status = HYDU_str_alloc_and_join(tmp, &cmd);
811    HYDU_ERR_POP(status, "unable to join strings\n");
812    HYDU_free_strlist(tmp);
813
814    status = send_command(fd, cmd);
815    HYDU_ERR_POP(status, "send command failed\n");
816    HYDU_FREE(cmd);
817
818    /* Poke the progress engine before exiting */
819    status = poke_progress();
820    HYDU_ERR_POP(status, "poke progress error\n");
821
822  fn_exit:
823    HYDU_FUNC_EXIT();
824    dprintf("[%d] out of fence\n", fd);
825    return status;
826
827  fn_fail:
828    goto fn_exit;
829}
830
831
832HYD_Status HYD_PMCD_pmi_handle_v2_finalize(int fd, char *args[])
833{
834    char *thrid;
835    char *tmp[HYD_NUM_TMP_STRINGS], *cmd;
836    struct token *tokens;
837    int token_count, i;
838    HYD_Status status = HYD_SUCCESS;
839
840    HYDU_FUNC_ENTER();
841
842    status = args_to_tokens(args, &tokens, &token_count);
843    HYDU_ERR_POP(status, "unable to convert args to tokens\n");
844
845    thrid = find_token_keyval(tokens, token_count, "thrid");
846
847    i = 0;
848    tmp[i++] = HYDU_strdup("cmd=finalize-response;");
849    if (thrid) {
850        tmp[i++] = HYDU_strdup("thrid=");
851        tmp[i++] = HYDU_strdup(thrid);
852        tmp[i++] = HYDU_strdup(";");
853    }
854    tmp[i++] = HYDU_strdup("rc=0;");
855    tmp[i++] = NULL;
856
857    status = HYDU_str_alloc_and_join(tmp, &cmd);
858    HYDU_ERR_POP(status, "unable to join strings\n");
859    HYDU_free_strlist(tmp);
860
861    status = send_command(fd, cmd);
862    HYDU_ERR_POP(status, "send command failed\n");
863    HYDU_FREE(cmd);
864
865    if (status == HYD_SUCCESS) {
866        status = HYD_DMX_deregister_fd(fd);
867        HYDU_ERR_POP(status, "unable to register fd\n");
868        close(fd);
869    }
870
871  fn_exit:
872    HYDU_FUNC_EXIT();
873    return status;
874
875  fn_fail:
876    goto fn_exit;
877}
Note: See TracBrowser for help on using the browser.