root/mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/netmod/tcp/socksm.c @ 4707

Revision 4707, 65.9 KB (checked in by buntinas, 5 months ago)

converted all uses of memcpy to the macro MPIU_Memcpy

Line 
1/* -*- Mode: C; c-basic-offset:4 ; -*- */
2/*
3 *  (C) 2006 by Argonne National Laboratory.
4 *      See COPYRIGHT in top-level directory.
5 */
6
7#define SOCKSM_H_DEFGLOBALS_
8
9#include "tcp_impl.h"
10#include "socksm.h"
11#ifdef USE_PMI2_API
12#include "pmi2.h"
13#else
14#include "pmi.h"
15#endif
16
17/* FIXME trace/log all the state transitions */
18
19typedef struct freenode {
20    int index;
21    struct freenode* next;
22} freenode_t;
23
24static struct {
25    freenode_t *head, *tail;
26} freeq = {NULL, NULL};
27
28static int g_tbl_size = 0;
29static int g_tbl_capacity = CONN_PLFD_TBL_INIT_SIZE;
30static int g_tbl_grow_size = CONN_PLFD_TBL_GROW_SIZE;
31
32static sockconn_t *g_sc_tbl = NULL;
33struct pollfd *MPID_nem_tcp_plfd_tbl = NULL;
34
35sockconn_t MPID_nem_tcp_g_lstn_sc = {0};
36struct pollfd MPID_nem_tcp_g_lstn_plfd = {0};
37
38/* We define this in order to trick the compiler into including
39   information about the MPID_nem_tcp_vc_area type.  This is
40   needed to easily expand the VC_FIELD macro in a debugger.  The
41   'unused' attribute keeps the compiler from complaining.  The 'used'
42   attribute makes sure the symbol is added in the lib, even if it's
43   unused */
44static MPID_nem_tcp_vc_area *dummy_vc_area ATTRIBUTE((unused, used)) = NULL;
45
46#define MAX_SKIP_POLLS_INACTIVE 512 /* something bigger */
47#define MAX_SKIP_POLLS_ACTIVE   128 /* something smaller */
48static int MPID_nem_tcp_skip_polls = MAX_SKIP_POLLS_INACTIVE;
49
50/* Debug function to dump the sockconn table.  This is intended to be
51   called from a debugger.  The 'unused' attribute keeps the compiler
52   from complaining.  The 'used' attribute makes sure the function is
53   added in the lib, despite the fact that it's unused. */
54static void dbg_print_sc_tbl(FILE *stream, int print_free_entries) ATTRIBUTE((unused, used));
55
56#define MPID_NEM_TCP_RECV_MAX_PKT_LEN 1024
57static char *recv_buf;
58
59static struct {
60    handler_func_t sc_state_handler;
61    short sc_state_plfd_events;
62} sc_state_info[CONN_STATE_SIZE];
63
64#define IS_WRITEABLE(plfd)                      \
65    (plfd->revents & POLLOUT) ? 1 : 0
66
67#define IS_READABLE(plfd)                       \
68    (plfd->revents & POLLIN) ? 1 : 0
69
70#define IS_READ_WRITEABLE(plfd)                                 \
71    (plfd->revents & POLLIN && plfd->revents & POLLOUT) ? 1 : 0
72
73#define IS_SAME_PGID(id1, id2) \
74    (strcmp(id1, id2) == 0)
75
76/* Will evaluate to false if either one of these sc's does not have valid pg data */
77#define IS_SAME_CONNECTION(sc1, sc2)                                    \
78    (sc1->pg_is_set && sc2->pg_is_set &&                                \
79     sc1->pg_rank == sc2->pg_rank &&                                    \
80     ((sc1->is_same_pg && sc2->is_same_pg) ||                           \
81      (!sc1->is_same_pg && !sc2->is_same_pg &&                          \
82       IS_SAME_PGID(sc1->pg_id, sc2->pg_id))))
83
84#define INIT_SC_ENTRY(sc, ind)                  \
85    do {                                        \
86        (sc)->fd = CONN_INVALID_FD;             \
87        (sc)->index = ind;                      \
88        (sc)->vc = NULL;                        \
89        (sc)->pg_is_set = FALSE;                \
90        (sc)->is_tmpvc = FALSE;                 \
91        (sc)->state.cstate = CONN_STATE_TS_CLOSED; \
92    } while (0)
93
94#define INIT_POLLFD_ENTRY(plfd)                               \
95    do {                                                      \
96        (plfd)->fd = CONN_INVALID_FD;                         \
97        (plfd)->events = POLLIN;                              \
98    } while (0)
99
100
101/* --BEGIN ERROR HANDLING-- */
102/* This function can be called from a debugger to dump the contents of the
103   g_sc_tbl to the given stream.  If print_free_entries is true entries
104   0..g_tbl_capacity will be printed.  Otherwise, only 0..g_tbl_size will be
105   shown. */
106static void dbg_print_sc_tbl(FILE *stream, int print_free_entries)
107{
108    int i;
109    sockconn_t *sc;
110
111    fprintf(stream, "========================================\n");
112    for (i = 0; i < (print_free_entries ? g_tbl_capacity : g_tbl_size); ++i) {
113        sc = &g_sc_tbl[i];
114#define TF(_b) ((_b) ? "TRUE" : "FALSE")
115        fprintf(stream, "[%d] ptr=%p idx=%d fd=%d state=%s\n",
116                i, sc, sc->index, sc->fd, CONN_STATE_TO_STRING(sc->state.cstate));
117        fprintf(stream, "....pg_is_set=%s is_same_pg=%s is_tmpvc=%s pg_rank=%d pg_id=%s\n",
118                TF(sc->pg_is_set), TF(sc->is_same_pg), TF(sc->is_tmpvc), sc->pg_rank, sc->pg_id);
119#undef TF
120    }
121    fprintf(stream, "========================================\n");
122}
123/* --END ERROR HANDLING-- */
124
125static int find_free_entry(int *index);
126
127#undef FUNCNAME
128#define FUNCNAME alloc_sc_plfd_tbls
129#undef FCNAME
130#define FCNAME MPIDI_QUOTE(FUNCNAME)
131static int alloc_sc_plfd_tbls (void)
132{
133    int i, mpi_errno = MPI_SUCCESS, index = -1;
134    MPIU_CHKPMEM_DECL (2);
135
136    MPIU_Assert(g_sc_tbl == NULL);
137    MPIU_Assert(MPID_nem_tcp_plfd_tbl == NULL);
138
139    MPIU_CHKPMEM_MALLOC (g_sc_tbl, sockconn_t *, g_tbl_capacity * sizeof(sockconn_t), 
140                         mpi_errno, "connection table");
141    MPIU_CHKPMEM_MALLOC (MPID_nem_tcp_plfd_tbl, struct pollfd *, g_tbl_capacity * sizeof(struct pollfd), 
142                         mpi_errno, "pollfd table");
143#if defined(MPICH_DEBUG_MEMINIT)
144    /* We initialize the arrays in order to eliminate spurious valgrind errors
145       that occur when poll(2) returns 0.  See valgrind bugzilla#158425 and
146       remove this code if the fix ever gets into a release of valgrind.
147       [goodell@ 2007-02-25] */
148    memset(MPID_nem_tcp_plfd_tbl, 0, g_tbl_capacity * sizeof(struct pollfd));
149#endif
150
151    for (i = 0; i < g_tbl_capacity; i++) {
152        INIT_SC_ENTRY(((sockconn_t *)&g_sc_tbl[i]), i);
153        INIT_POLLFD_ENTRY(((struct pollfd *)&MPID_nem_tcp_plfd_tbl[i]));
154    }
155    MPIU_CHKPMEM_COMMIT();
156
157    mpi_errno = find_free_entry(&index);
158    if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP (mpi_errno);
159
160    MPIU_Assert(0 == index); /* assumed in other parts of this file */
161    MPIU_Memcpy (&g_sc_tbl[index], &MPID_nem_tcp_g_lstn_sc, sizeof(MPID_nem_tcp_g_lstn_sc));
162    MPIU_Memcpy (&MPID_nem_tcp_plfd_tbl[index], &MPID_nem_tcp_g_lstn_plfd, sizeof(MPID_nem_tcp_g_lstn_plfd));
163    MPIU_Assert(MPID_nem_tcp_plfd_tbl[index].fd == g_sc_tbl[index].fd);
164    MPIU_Assert(MPID_nem_tcp_plfd_tbl[index].events == POLLIN);
165
166 fn_exit:
167    return mpi_errno;
168 fn_fail:
169    MPIU_CHKPMEM_REAP();
170    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "failure. mpi_errno = %d", mpi_errno));
171    goto fn_exit;
172}
173
174#undef FUNCNAME
175#define FUNCNAME free_sc_plfd_tbls
176#undef FCNAME
177#define FCNAME MPIDI_QUOTE(FUNCNAME)
178static int free_sc_plfd_tbls (void)
179{
180    int mpi_errno = MPI_SUCCESS;
181
182    MPIU_Free(g_sc_tbl);
183    MPIU_Free(MPID_nem_tcp_plfd_tbl);
184    return mpi_errno;
185}
186
187/*
188  Reason for not doing realloc for both sc and plfd tables :
189  Either both the tables have to be expanded or both should remain the same size, if
190  enough memory could not be allocated, as we have only one set of variables to control
191  the size of the tables. Also, it is not useful to expand one table and leave the other
192  at the same size, 'coz of memory allocation failures.
193*/
194#undef FUNCNAME
195#define FUNCNAME expand_sc_plfd_tbls
196#undef FCNAME
197#define FCNAME MPIDI_QUOTE(FUNCNAME)
198static int expand_sc_plfd_tbls (void)
199{
200    int mpi_errno = MPI_SUCCESS; 
201    sockconn_t *new_sc_tbl = NULL;
202    struct pollfd *new_plfd_tbl = NULL;
203    int new_capacity = g_tbl_capacity + g_tbl_grow_size, i;
204    MPIU_CHKPMEM_DECL (2);
205
206    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "expand_sc_plfd_tbls Entry"));
207    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "expand_sc_plfd_tbls b4 g_sc_tbl[0].fd=%d", g_sc_tbl[0].fd));
208    MPIU_CHKPMEM_MALLOC (new_sc_tbl, sockconn_t *, new_capacity * sizeof(sockconn_t), 
209                         mpi_errno, "expanded connection table");
210    MPIU_CHKPMEM_MALLOC (new_plfd_tbl, struct pollfd *, new_capacity * sizeof(struct pollfd), 
211                         mpi_errno, "expanded pollfd table");
212
213    MPIU_Memcpy (new_sc_tbl, g_sc_tbl, g_tbl_capacity * sizeof(sockconn_t));
214    MPIU_Memcpy (new_plfd_tbl, MPID_nem_tcp_plfd_tbl, g_tbl_capacity * sizeof(struct pollfd));
215
216    /* VCs have pointers to entries in the sc table.  These
217       are updated here after the expand. */
218    for (i = 1; i < g_tbl_capacity; i++)   /* i=0 = listening socket fd won't have a VC pointer */
219    {
220        /* It's important to only make the assignment if the sc address in the
221           vc matches the old sc address, otherwise we can corrupt the vc's
222           state in certain head-to-head situations. */
223        if (g_sc_tbl[i].vc &&
224            VC_FIELD(g_sc_tbl[i].vc, sc) &&
225            VC_FIELD(g_sc_tbl[i].vc, sc) == &g_sc_tbl[i])
226        {
227            ASSIGN_SC_TO_VC(g_sc_tbl[i].vc, &new_sc_tbl[i]);
228        }
229    }
230
231    MPIU_Free(g_sc_tbl);
232    MPIU_Free(MPID_nem_tcp_plfd_tbl);
233    g_sc_tbl = new_sc_tbl;
234    MPID_nem_tcp_plfd_tbl = new_plfd_tbl;
235    for (i = g_tbl_capacity; i < new_capacity; i++) {
236        INIT_SC_ENTRY(((sockconn_t *)&g_sc_tbl[i]), i);
237        INIT_POLLFD_ENTRY(((struct pollfd *)&MPID_nem_tcp_plfd_tbl[i]));
238    }
239    g_tbl_capacity = new_capacity;
240
241    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "expand_sc_plfd_tbls af g_sc_tbl[0].fd=%d", g_sc_tbl[0].fd));
242    for (i = 0; i < g_tbl_capacity; ++i)
243    {
244        /*         sockconn_t *dbg_sc = g_sc_tbl[i].vc ? VC_FIELD(g_sc_tbl[i].vc, sc) : (sockconn_t*)(-1); */
245
246        /* The state is only valid if the FD is valid.  The VC field is only
247           valid if the state is valid and COMMRDY. */
248        MPIU_Assert(MPID_nem_tcp_plfd_tbl[i].fd == CONN_INVALID_FD ||
249                    g_sc_tbl[i].state.cstate != CONN_STATE_TS_COMMRDY ||
250                    VC_FIELD(g_sc_tbl[i].vc, sc) == &g_sc_tbl[i]);
251    }
252   
253   
254    MPIU_CHKPMEM_COMMIT();   
255 fn_exit:
256    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "expand_sc_plfd_tbls Exit"));
257    return mpi_errno;
258 fn_fail:
259    MPIU_CHKPMEM_REAP();
260    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "failure. mpi_errno = %d", mpi_errno));
261    goto fn_exit;
262}
263
264/*
265  Finds the first free entry in the connection table/pollfd table. Note that both the
266  tables are indexed the same. i.e. each entry in one table corresponds to the
267  entry of the same index in the other table. If an entry is valid in one table, then
268  it is valid in the other table too.
269
270  This function finds the first free entry in both the tables by checking the queue of
271  free elements. If the free queue is empty, then it returns the next available slot
272  in the tables. If the size of the slot is already full, then this expands the table
273  and then returns the next available slot
274*/
275#undef FUNCNAME
276#define FUNCNAME find_free_entry
277#undef FCNAME
278#define FCNAME MPIDI_QUOTE(FUNCNAME)
279static int find_free_entry(int *index)
280{
281    int mpi_errno = MPI_SUCCESS;
282    freenode_t *node;
283
284    if (!Q_EMPTY(freeq)) {
285        Q_DEQUEUE(&freeq, ((freenode_t **)&node)); 
286        *index = node->index;
287        MPIU_Free(node);
288        goto fn_exit;
289    }
290
291    if (g_tbl_size == g_tbl_capacity) {
292        mpi_errno = expand_sc_plfd_tbls();
293        if (mpi_errno != MPI_SUCCESS)
294            goto fn_fail;
295    }
296
297    MPIU_Assert(g_tbl_capacity > g_tbl_size);
298    *index = g_tbl_size;
299    ++g_tbl_size;
300
301 fn_exit:
302    /* This function is the closest thing we have to a constructor, so we throw
303       in a couple of initializers here in case the caller is sloppy about his
304       assumptions. */
305    INIT_SC_ENTRY(&g_sc_tbl[*index], *index);
306    INIT_POLLFD_ENTRY(&MPID_nem_tcp_plfd_tbl[*index]);
307    return mpi_errno;
308 fn_fail:
309    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "failure. mpi_errno = %d", mpi_errno));
310    goto fn_exit;
311}
312
313/* Note:
314   fnd_sc is returned only for certain states. If it is not returned for a state,
315   the handler function can simply pass NULL as the second argument.
316 */
317#undef FUNCNAME
318#define FUNCNAME found_better_sc
319#undef FCNAME
320#define FCNAME MPIDI_QUOTE(FUNCNAME)
321static int found_better_sc(sockconn_t *sc, sockconn_t **fnd_sc)
322{
323    int i, found = FALSE;
324    MPIDI_STATE_DECL(MPID_STATE_FOUND_BETTER_SC);
325
326    MPIDI_FUNC_ENTER(MPID_STATE_FOUND_BETTER_SC);
327
328    /* tmpvc's can never match a better sc */
329    if (sc->is_tmpvc) {
330        found = FALSE;
331        goto fn_exit;
332    }
333
334    /* if we don't know our own pg info, how can we look for a better SC? */
335    MPIU_Assert(sc->pg_is_set);
336
337    for(i = 0; i < g_tbl_size && !found; i++)
338    {
339        sockconn_t *iter_sc = &g_sc_tbl[i];
340        MPID_nem_tcp_Conn_State_t istate = iter_sc->state.cstate;
341
342        if (iter_sc != sc && iter_sc->fd != CONN_INVALID_FD
343            && IS_SAME_CONNECTION(iter_sc, sc))
344        {
345            switch (sc->state.cstate)
346            {
347            case CONN_STATE_TC_C_CNTD:
348                MPIU_Assert(fnd_sc == NULL);
349                if (istate == CONN_STATE_TS_COMMRDY ||
350                    istate == CONN_STATE_TA_C_RANKRCVD ||
351                    istate == CONN_STATE_TC_C_TMPVCSENT)
352                    found = TRUE;
353                break;
354            case CONN_STATE_TA_C_RANKRCVD:
355                MPIU_Assert(fnd_sc != NULL);
356                if (istate == CONN_STATE_TS_COMMRDY || istate == CONN_STATE_TC_C_RANKSENT) {
357                    found = TRUE;
358                    *fnd_sc = iter_sc;
359                }
360                break;               
361            case CONN_STATE_TA_C_TMPVCRCVD:
362                MPIU_Assert(fnd_sc != NULL);
363                if (istate == CONN_STATE_TS_COMMRDY || istate == CONN_STATE_TC_C_TMPVCSENT) {
364                    found = TRUE;
365                    *fnd_sc = iter_sc;
366                }
367                break;               
368                /* Add code for other states here, if need be. */
369            default:
370                /* FIXME: need to handle error condition better */
371                MPIU_Assert (0);
372                break;
373            }
374        }
375    }
376
377fn_exit:
378    if (found) {
379        if (fnd_sc) {
380            MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE,
381                             (MPIU_DBG_FDEST, "found_better_sc(sc=%p (%s), *fnd_sc=%p (%s)) found=TRUE",
382                              sc, CONN_STATE_STR[sc->state.cstate],
383                              *fnd_sc, (*fnd_sc ? CONN_STATE_STR[(*fnd_sc)->state.cstate] : "N/A")));
384        }
385        else {
386            MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE,
387                             (MPIU_DBG_FDEST, "found_better_sc(sc=%p (%s), fnd_sc=(nil)) found=TRUE",
388                              sc, CONN_STATE_STR[sc->state.cstate]));
389        }
390    }
391    else {
392        MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE,
393                         (MPIU_DBG_FDEST, "found_better_sc(sc=%p (%s), *fnd_sc=N/A) found=FALSE",
394                          sc, CONN_STATE_STR[sc->state.cstate]));
395    }
396    MPIDI_FUNC_EXIT(MPID_STATE_FOUND_BETTER_SC);
397    return found;
398}
399
400
401#undef FUNCNAME
402#define FUNCNAME vc_is_in_shutdown
403#undef FCNAME
404#define FCNAME MPIDI_QUOTE(FUNCNAME)
405static int vc_is_in_shutdown(MPIDI_VC_t *vc)
406{
407    int retval = FALSE;
408    MPIU_Assert(vc != NULL);
409    if (vc->state == MPIDI_VC_STATE_REMOTE_CLOSE ||
410        vc->state == MPIDI_VC_STATE_CLOSE_ACKED ||
411        vc->state == MPIDI_VC_STATE_LOCAL_CLOSE ||
412        vc->state == MPIDI_VC_STATE_INACTIVE)
413    {
414        retval = TRUE;
415    }
416
417    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "vc_is_in_shutdown(%p)=%s", vc, (retval ? "TRUE" : "FALSE")));
418    return retval;
419}
420
421#undef FUNCNAME
422#define FUNCNAME send_id_info
423#undef FCNAME
424#define FCNAME MPIDI_QUOTE(FUNCNAME)
425static int send_id_info(const sockconn_t *const sc)
426{
427    int mpi_errno = MPI_SUCCESS;
428    MPIDI_nem_tcp_idinfo_t id_info;
429    MPIDI_nem_tcp_header_t hdr;
430    struct iovec iov[3];
431    int pg_id_len = 0, offset, buf_size, iov_cnt = 2;
432    MPIDI_STATE_DECL(MPID_STATE_SEND_ID_INFO);
433
434    MPIDI_FUNC_ENTER(MPID_STATE_SEND_ID_INFO);
435
436    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "my_pg->id=%s my_pg->rank=%d, sc->pg_rank=%d sc->is_same_pg=%s",
437                                             (char *)MPIDI_Process.my_pg->id, MPIDI_Process.my_pg_rank, sc->pg_rank,
438                                             (sc->is_same_pg ? "TRUE" : "FALSE")));
439    if (!sc->is_same_pg)
440        pg_id_len = strlen(MPIDI_Process.my_pg->id) + 1; 
441
442/*     store ending NULL also */
443/*     FIXME better keep pg_id_len itself as part of MPIDI_Process.my_pg structure to */
444/*     avoid computing the length of string everytime this function is called. */
445   
446    hdr.pkt_type = MPIDI_NEM_TCP_PKT_ID_INFO;
447    hdr.datalen = sizeof(MPIDI_nem_tcp_idinfo_t) + pg_id_len;   
448    id_info.pg_rank = MPIDI_Process.my_pg_rank;
449
450    iov[0].iov_base = (MPID_IOV_BUF_CAST)&hdr;
451    iov[0].iov_len = sizeof(hdr);
452    iov[1].iov_base = (MPID_IOV_BUF_CAST)&id_info;
453    iov[1].iov_len = sizeof(id_info);
454    buf_size = sizeof(hdr) + sizeof(id_info);
455
456    if (!sc->is_same_pg) {
457        iov[2].iov_base = MPIDI_Process.my_pg->id;
458        iov[2].iov_len = pg_id_len;
459        buf_size += pg_id_len;
460        ++iov_cnt;
461    }
462   
463    CHECK_EINTR (offset, writev(sc->fd, iov, iov_cnt));
464    MPIU_ERR_CHKANDJUMP1 (offset == -1 && errno != EAGAIN, mpi_errno, MPI_ERR_OTHER, 
465                          "**write", "**write %s", strerror (errno));
466    MPIU_ERR_CHKANDJUMP1 (offset != buf_size, mpi_errno, MPI_ERR_OTHER, 
467                          "**write", "**write %s", strerror (errno)); 
468/*     FIXME log appropriate error */
469/*     FIXME-Z1  socket is just connected and we are sending a few bytes. So, there should not */
470/*     be a problem of partial data only being written to. If partial data written, */
471/*     handle this. */
472
473 fn_exit:
474    MPIDI_FUNC_EXIT(MPID_STATE_SEND_ID_INFO);
475    return mpi_errno;
476 fn_fail:
477    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "failure. mpi_errno = %d, offset=%d, errno=%d %s", mpi_errno, offset, errno, strerror(errno)));
478    goto fn_exit;   
479}
480
481
482#undef FUNCNAME
483#define FUNCNAME send_tmpvc_info
484#undef FCNAME
485#define FCNAME MPIDI_QUOTE(FUNCNAME)
486static int send_tmpvc_info(const sockconn_t *const sc)
487{
488    int mpi_errno = MPI_SUCCESS;
489    MPIDI_nem_tcp_portinfo_t port_info;
490    MPIDI_nem_tcp_header_t hdr;
491    struct iovec iov[3];
492    int offset, buf_size, iov_cnt = 2;
493    MPIDI_STATE_DECL(MPID_STATE_SEND_TMPVC_INFO);
494
495    MPIDI_FUNC_ENTER(MPID_STATE_SEND_TMPVC_INFO);
496
497    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "my->pg_rank=%d, sc->pg_rank=%d"
498                                             , MPIDI_Process.my_pg_rank, sc->pg_rank));
499
500/*     store ending NULL also */
501/*     FIXME better keep pg_id_len itself as part of MPIDI_Process.my_pg structure to */
502/*     avoid computing the length of string everytime this function is called. */
503   
504    hdr.pkt_type = MPIDI_NEM_TCP_PKT_TMPVC_INFO;
505    hdr.datalen = sizeof(MPIDI_nem_tcp_portinfo_t);
506    port_info.port_name_tag = sc->vc->port_name_tag;
507
508    iov[0].iov_base = (MPID_IOV_BUF_CAST)&hdr;
509    iov[0].iov_len = sizeof(hdr);
510    iov[1].iov_base = (MPID_IOV_BUF_CAST)&port_info;
511    iov[1].iov_len = sizeof(port_info);
512    buf_size = sizeof(hdr) + sizeof(port_info);
513   
514    CHECK_EINTR (offset, writev(sc->fd, iov, iov_cnt));
515    MPIU_ERR_CHKANDJUMP1 (offset == -1 && errno != EAGAIN, mpi_errno, MPI_ERR_OTHER, 
516                          "**write", "**write %s", strerror (errno));
517    MPIU_ERR_CHKANDJUMP1 (offset != buf_size, mpi_errno, MPI_ERR_OTHER, 
518                          "**write", "**write %s", strerror (errno)); 
519/*     FIXME log appropriate error */
520/*     FIXME-Z1  socket is just connected and we are sending a few bytes. So, there should not */
521/*     be a problem of partial data only being written to. If partial data written, */
522/*     handle this. */
523
524 fn_exit:
525    MPIDI_FUNC_EXIT(MPID_STATE_SEND_TMPVC_INFO);
526    return mpi_errno;
527 fn_fail:
528    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "failure. mpi_errno = %d, offset=%d, errno=%d %s", mpi_errno, offset, errno, strerror(errno)));
529    goto fn_exit;   
530}
531
532#undef FUNCNAME
533#define FUNCNAME recv_id_or_tmpvc_info
534#undef FCNAME
535#define FCNAME MPIDI_QUOTE(FUNCNAME)
536static int recv_id_or_tmpvc_info(sockconn_t *const sc, int *got_sc_eof)
537{
538    int mpi_errno = MPI_SUCCESS;
539    MPIDI_nem_tcp_header_t hdr;
540    int pg_id_len = 0, nread, iov_cnt = 1;
541    int hdr_len = sizeof(MPIDI_nem_tcp_header_t);
542    struct iovec iov[2];
543    char *pg_id = NULL;
544
545    MPIU_CHKPMEM_DECL (1);
546    MPIU_CHKLMEM_DECL (1);
547    MPIDI_STATE_DECL(MPID_STATE_RECV_ID_OR_TMPVC_INFO);
548
549    MPIDI_FUNC_ENTER(MPID_STATE_RECV_ID_OR_TMPVC_INFO);
550
551    *got_sc_eof = 0;
552
553    CHECK_EINTR (nread, read(sc->fd, &hdr, hdr_len));
554
555    /* The other side closed this connection (hopefully as part of a
556       head-to-head resolution. */
557    if (0 == nread) {
558        CHANGE_STATE(sc, CONN_STATE_TS_D_QUIESCENT);
559        *got_sc_eof = 1;
560        goto fn_exit;
561    }
562    MPIU_ERR_CHKANDJUMP1 (nread == -1 && errno != EAGAIN, mpi_errno, MPI_ERR_OTHER,
563                          "**read", "**read %s", strerror (errno));
564    MPIU_ERR_CHKANDJUMP1 (nread != hdr_len, mpi_errno, MPI_ERR_OTHER,
565                          "**read", "**read %s", strerror (errno));  /* FIXME-Z1 */
566    MPIU_Assert(hdr.pkt_type == MPIDI_NEM_TCP_PKT_ID_INFO ||
567                hdr.pkt_type == MPIDI_NEM_TCP_PKT_TMPVC_INFO);
568    MPIU_Assert(hdr.datalen != 0);
569   
570    if (hdr.pkt_type == MPIDI_NEM_TCP_PKT_ID_INFO) {
571        iov[0].iov_base = (void *) &(sc->pg_rank);
572        iov[0].iov_len = sizeof(sc->pg_rank);
573        pg_id_len = hdr.datalen - sizeof(MPIDI_nem_tcp_idinfo_t);
574        if (pg_id_len != 0) {
575            MPIU_CHKLMEM_MALLOC (pg_id, char *, pg_id_len, mpi_errno, "sockconn pg_id");
576            iov[1].iov_base = (void *)pg_id;
577            iov[1].iov_len = pg_id_len;
578            ++iov_cnt;
579        } 
580        CHECK_EINTR (nread, readv(sc->fd, iov, iov_cnt));
581        MPIU_ERR_CHKANDJUMP1 (nread == -1 && errno != EAGAIN, mpi_errno, MPI_ERR_OTHER,
582                              "**read", "**read %s", strerror (errno));
583        MPIU_ERR_CHKANDJUMP1 (nread != hdr.datalen, mpi_errno, MPI_ERR_OTHER,
584                              "**read", "**read %s", strerror (errno)); /* FIXME-Z1 */
585        if (pg_id_len == 0) {
586            sc->is_same_pg = TRUE;
587            mpi_errno = MPID_nem_tcp_get_vc_from_conninfo (MPIDI_Process.my_pg->id, 
588                                                                     sc->pg_rank, &sc->vc);
589            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
590            sc->pg_id = NULL;
591        }
592        else {
593            sc->is_same_pg = FALSE;
594            mpi_errno = MPID_nem_tcp_get_vc_from_conninfo (pg_id, sc->pg_rank, &sc->vc);
595            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
596            sc->pg_id = sc->vc->pg->id;
597        }
598
599        MPIU_Assert(sc->vc != NULL);
600        MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "about to incr sc_ref_count sc=%p sc->vc=%p sc_ref_count=%d", sc, sc->vc, VC_FIELD(sc->vc, sc_ref_count)));
601        ++VC_FIELD(sc->vc, sc_ref_count);
602
603        /* very important, without this IS_SAME_CONNECTION will always fail */
604        sc->pg_is_set = TRUE;
605       
606        MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PKT_ID_INFO: sc->fd=%d, sc->vc=%p, sc=%p", sc->fd, sc->vc, sc));
607    }
608    else if (hdr.pkt_type == MPIDI_NEM_TCP_PKT_TMPVC_INFO) {
609        MPIDI_VC_t *vc;
610
611        MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "PKT_TMPVC_INFO: sc->fd=%d", sc->fd));
612        /* create a new VC */
613        MPIU_CHKPMEM_MALLOC (vc, MPIDI_VC_t *, sizeof(MPIDI_VC_t), mpi_errno, "real vc from tmp vc");
614        /* --BEGIN ERROR HANDLING-- */
615        if (vc == NULL) {
616            mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", NULL);
617            goto fn_fail;
618        }
619        /* --END ERROR HANDLING-- */
620
621        MPIDI_VC_Init(vc, NULL, 0);     
622        ((MPIDI_CH3I_VC *)vc->channel_private)->state = MPID_NEM_TCP_VC_STATE_CONNECTED; /* FIXME: is it needed ? */
623        sc->vc = vc; 
624        MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "about to incr sc_ref_count sc=%p sc->vc=%p sc_ref_count=%d", sc, sc->vc, VC_FIELD(sc->vc, sc_ref_count)));
625        ++VC_FIELD(vc, sc_ref_count);
626
627        ASSIGN_SC_TO_VC(vc, sc);
628
629        /* get the port's tag from the packet and stash it in the VC */
630        iov[0].iov_base = (void *) &(sc->vc->port_name_tag);
631        iov[0].iov_len = sizeof(sc->vc->port_name_tag);
632
633        CHECK_EINTR (nread, readv(sc->fd, iov, iov_cnt));
634
635        MPIU_ERR_CHKANDJUMP1 (nread == -1 && errno != EAGAIN, mpi_errno, MPI_ERR_OTHER,
636                              "**read", "**read %s", strerror (errno));
637       
638        MPIU_ERR_CHKANDJUMP1 (nread != hdr.datalen, mpi_errno, MPI_ERR_OTHER,
639                              "**read", "**read %s", strerror (errno)); /* FIXME-Z1 */
640        sc->is_same_pg = FALSE;
641        sc->pg_id = NULL;
642        sc->is_tmpvc = TRUE;
643
644        MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "enqueuing on acceptq vc=%p, sc->fd=%d, tag=%d", vc, sc->fd, sc->vc->port_name_tag));
645        MPIDI_CH3I_Acceptq_enqueue(vc, sc->vc->port_name_tag);
646    }
647
648    MPIU_CHKPMEM_COMMIT();
649 fn_exit:
650    MPIU_CHKLMEM_FREEALL();
651    MPIDI_FUNC_EXIT(MPID_STATE_RECV_ID_OR_TMPVC_INFO);
652    return mpi_errno;
653 fn_fail:
654    MPIU_CHKPMEM_REAP();
655    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "failure. mpi_errno = %d", mpi_errno));
656    goto fn_exit;
657}
658
659#define send_cmd_pkt(fd_, pkt_type_) ( \
660    send_cmd_pkt_func(fd_, pkt_type_) \
661)
662
663/*
664  This function is used to send commands that don't have data but just only
665  the header.
666 */
667#undef FUNCNAME
668#define FUNCNAME send_cmd_pkt
669#undef FCNAME
670#define FCNAME MPIDI_QUOTE(FUNCNAME)
671static int send_cmd_pkt_func(int fd, MPIDI_nem_tcp_pkt_type_t pkt_type)
672{
673    int mpi_errno = MPI_SUCCESS, offset;
674    MPIDI_nem_tcp_header_t pkt;
675    int pkt_len = sizeof(MPIDI_nem_tcp_header_t);
676
677    MPIU_Assert(pkt_type == MPIDI_NEM_TCP_PKT_ID_ACK ||
678                pkt_type == MPIDI_NEM_TCP_PKT_ID_NAK ||
679                pkt_type == MPIDI_NEM_TCP_PKT_DISC_REQ ||
680                pkt_type == MPIDI_NEM_TCP_PKT_DISC_ACK ||
681                pkt_type == MPIDI_NEM_TCP_PKT_DISC_NAK ||
682                pkt_type == MPIDI_NEM_TCP_PKT_TMPVC_ACK ||
683                pkt_type == MPIDI_NEM_TCP_PKT_TMPVC_NAK);
684
685    pkt.pkt_type = pkt_type;
686    pkt.datalen = 0;
687
688    CHECK_EINTR (offset, write(fd, &pkt, pkt_len));
689    MPIU_ERR_CHKANDJUMP1 (offset == -1 && errno != EAGAIN, mpi_errno, MPI_ERR_OTHER,
690                          "**write", "**write %s", strerror (errno));
691    MPIU_ERR_CHKANDJUMP1 (offset != pkt_len, mpi_errno, MPI_ERR_OTHER,
692                          "**write", "**write %s", strerror (errno)); /* FIXME-Z1 */
693 fn_exit:
694    return mpi_errno;
695 fn_fail:
696    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "failure. mpi_errno = %d", mpi_errno));
697    goto fn_exit;
698}
699
700
701/*
702  This function is used to recv commands that don't have data but just only
703  the header.
704 */
705#undef FUNCNAME
706#define FUNCNAME recv_cmd_pkt
707#undef FCNAME
708#define FCNAME MPIDI_QUOTE(FUNCNAME)
709static int recv_cmd_pkt(int fd, MPIDI_nem_tcp_pkt_type_t *pkt_type)
710{
711    int mpi_errno = MPI_SUCCESS, nread;
712    MPIDI_nem_tcp_header_t pkt;
713    int pkt_len = sizeof(MPIDI_nem_tcp_header_t);
714    MPIDI_STATE_DECL(MPID_STATE_RECV_CMD_PKT);
715
716    MPIDI_FUNC_ENTER(MPID_STATE_RECV_CMD_PKT);
717
718    CHECK_EINTR (nread, read(fd, &pkt, pkt_len));
719    MPIU_ERR_CHKANDJUMP1 (nread == -1 && errno != EAGAIN, mpi_errno, MPI_ERR_OTHER,
720                          "**read", "**read %s", strerror (errno));
721    MPIU_ERR_CHKANDJUMP2 (nread != pkt_len, mpi_errno, MPI_ERR_OTHER,
722                          "**read", "**read %d %s", nread, strerror (errno)); /* FIXME-Z1 */
723    MPIU_Assert(pkt.datalen == 0);
724    MPIU_Assert(pkt.pkt_type == MPIDI_NEM_TCP_PKT_ID_ACK ||
725                pkt.pkt_type == MPIDI_NEM_TCP_PKT_ID_NAK ||
726                pkt.pkt_type == MPIDI_NEM_TCP_PKT_TMPVC_ACK ||
727                pkt.pkt_type == MPIDI_NEM_TCP_PKT_TMPVC_NAK ||
728                pkt.pkt_type == MPIDI_NEM_TCP_PKT_DISC_REQ ||
729                pkt.pkt_type == MPIDI_NEM_TCP_PKT_DISC_ACK ||
730                pkt.pkt_type == MPIDI_NEM_TCP_PKT_DISC_NAK);
731    *pkt_type = pkt.pkt_type;
732 fn_exit:
733    MPIDI_FUNC_EXIT(MPID_STATE_RECV_CMD_PKT);
734    return mpi_errno;
735 fn_fail:
736    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "failure. mpi_errno = %d", mpi_errno));
737    goto fn_exit;
738}
739
740
741
742#undef FUNCNAME
743#define FUNCNAME MPID_nem_tcp_connect
744#undef FCNAME
745#define FCNAME MPIDI_QUOTE(FUNCNAME)
746int MPID_nem_tcp_connect(struct MPIDI_VC *const vc) 
747{
748    sockconn_t *sc = NULL;
749    struct pollfd *plfd = NULL;
750    int index = -1;
751    int mpi_errno = MPI_SUCCESS;
752    freenode_t *node;
753    MPIU_CHKLMEM_DECL(1);
754    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_CONNECT);
755
756    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_CONNECT);
757
758    MPIU_Assert(vc != NULL);
759
760    /* We have an active connection, start polling more often */
761    MPID_nem_tcp_skip_polls = MAX_SKIP_POLLS_ACTIVE;   
762       
763    MPIDI_CHANGE_VC_STATE(vc, ACTIVE);
764
765    if (((MPIDI_CH3I_VC *)vc->channel_private)->state == MPID_NEM_TCP_VC_STATE_DISCONNECTED) {
766        struct sockaddr_in *sock_addr;
767        struct in_addr addr;
768        int rc = 0;
769
770        MPIU_Assert(VC_FIELD(vc, sc) == NULL);
771        mpi_errno = find_free_entry(&index);
772        if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP (mpi_errno);
773
774        sc = &g_sc_tbl[index];
775        plfd = &MPID_nem_tcp_plfd_tbl[index];       
776
777        /* FIXME: 
778           We need to set addr and port using bc.
779           If a process is dynamically spawned, vc->pg is NULL.
780           In that case, same procedure is done
781           in MPID_nem_tcp_connect_to_root()
782        */
783        if (vc->pg != NULL) { /* VC is not a temporary one */
784            char *bc;
785            int pmi_errno;
786            int val_max_sz;
787
788#ifdef USE_PMI2_API
789            val_max_sz = PMI_MAX_VALLEN;
790#else
791            pmi_errno = PMI_KVS_Get_value_length_max(&val_max_sz);
792            MPIU_ERR_CHKANDJUMP1(pmi_errno, mpi_errno, MPI_ERR_OTHER, "**fail", "**fail %d", pmi_errno);
793#endif
794            MPIU_CHKLMEM_MALLOC(bc, char *, val_max_sz, mpi_errno, "bc");
795           
796            sc->is_tmpvc = FALSE;
797           
798            mpi_errno = vc->pg->getConnInfo(vc->pg_rank, bc, val_max_sz, vc->pg);
799            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
800
801            mpi_errno = MPID_nem_tcp_get_addr_port_from_bc(bc, &addr, &(VC_FIELD(vc, sock_id).sin_port));
802            VC_FIELD(vc, sock_id).sin_addr.s_addr = addr.s_addr;
803            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
804        }
805        else {
806            sc->is_tmpvc = TRUE;
807        }
808
809        sock_addr = &(VC_FIELD(vc, sock_id));
810
811        CHECK_EINTR(sc->fd, socket(AF_INET, SOCK_STREAM, 0));
812        MPIU_ERR_CHKANDJUMP2(sc->fd == -1, mpi_errno, MPI_ERR_OTHER, "**sock_create", 
813                             "**sock_create %s %d", strerror(errno), errno);
814        plfd->fd = sc->fd;
815        MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "sc->fd=%d, plfd->events=%d, plfd->revents=%d, vc=%p, sc=%p", sc->fd, plfd->events, plfd->revents, vc, sc));
816        mpi_errno = MPID_nem_tcp_set_sockopts(sc->fd);
817        if (mpi_errno) MPIU_ERR_POP (mpi_errno);
818
819        MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "connecting to 0x%08X:%d", sock_addr->sin_addr.s_addr, sock_addr->sin_port));
820        rc = connect(sc->fd, (SA*)sock_addr, sizeof(*sock_addr)); 
821        /* connect should not be called with CHECK_EINTR macro */
822        MPIU_ERR_CHKANDJUMP2 (rc < 0 && errno != EINPROGRESS, mpi_errno, MPI_ERR_OTHER,
823                              "**sock_connect", "**sock_connect %d %s", errno, strerror (errno));
824       
825        if (rc == 0) {
826            CHANGE_STATE(sc, CONN_STATE_TC_C_CNTD);
827        }
828        else {
829            CHANGE_STATE(sc, CONN_STATE_TC_C_CNTING);
830        }
831       
832/*         sc->handler = sc_state_info[sc->state.cstate].sc_state_handler; */
833        ((MPIDI_CH3I_VC *)vc->channel_private)->state = MPID_NEM_TCP_VC_STATE_CONNECTED;
834        sc->pg_rank = vc->pg_rank;
835
836        if (vc->pg != NULL) { /* normal (non-dynamic) connection */
837            if (IS_SAME_PGID(vc->pg->id, MPIDI_Process.my_pg->id)) {
838                sc->is_same_pg = TRUE;
839                sc->pg_id = NULL;
840            }
841            else {
842                sc->is_same_pg = FALSE;
843                sc->pg_id = vc->pg->id;
844            }
845        }
846        else { /* (vc->pg == NULL), dynamic proc connection - temp vc */
847            sc->is_same_pg = FALSE;
848            sc->pg_id = NULL;
849        }
850
851        /* very important, without this IS_SAME_CONNECTION will always fail */
852        sc->pg_is_set = TRUE;
853
854        ASSIGN_SC_TO_VC(vc, sc);
855        sc->vc = vc;
856        MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "about to incr sc_ref_count sc=%p sc->vc=%p sc_ref_count=%d", sc, sc->vc, VC_FIELD(sc->vc, sc_ref_count)));
857        ++VC_FIELD(vc, sc_ref_count);
858    }
859    else if (((MPIDI_CH3I_VC *)vc->channel_private)->state == MPID_NEM_TCP_VC_STATE_CONNECTED) {
860        sc = VC_FIELD(vc, sc);
861        MPIU_Assert(sc != NULL);
862        /* Do nothing here, the caller just needs to wait for the connection
863           state machine to work its way through the states.  Doing something at
864           this point will almost always just mess up any head-to-head
865           resolution. */
866    }
867    else {
868        MPIU_Assert(0);
869    }
870
871 fn_exit:
872    /* MPID_nem_tcp_connpoll(); FIXME-Imp should be called? */
873    MPIU_CHKLMEM_FREEALL();
874    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_CONNECT);
875    return mpi_errno;
876 fn_fail:
877    if (index != -1) {
878        if (sc->fd != CONN_INVALID_FD) {
879            MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "MPID_nem_tcp_connect(). closing fd = %d", sc->fd));
880            close(sc->fd);
881            sc->fd = CONN_INVALID_FD;
882            plfd->fd = CONN_INVALID_FD;
883        }
884        node = MPIU_Malloc(sizeof(freenode_t));     
885        MPIU_ERR_CHKANDSTMT(node == NULL, mpi_errno, MPI_ERR_OTHER, goto fn_exit, "**nomem");
886        node->index = index;
887/*         Note: MPIU_ERR_CHKANDJUMP should not be used here as it will be recursive  */
888/*         within fn_fail */ 
889        Q_ENQUEUE(&freeq, node);
890    }
891    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "failure. mpi_errno = %d", mpi_errno));
892    goto fn_exit;
893}
894
895/* Called to transition an sc to CLOSED.  This might be done as part of a ch3
896   close protocol or it might be done because the sc is in a quiescent state. */
897static int cleanup_sc(sockconn_t *sc)
898{
899    int mpi_errno = MPI_SUCCESS;
900    int rc;
901    struct pollfd *plfd = NULL;
902    freenode_t *node;
903    MPIU_CHKPMEM_DECL(1);
904    MPIDI_STATE_DECL(MPID_STATE_CLEANUP_SC);
905
906    MPIDI_FUNC_ENTER(MPID_STATE_CLEANUP_SC);
907
908    if (sc == NULL)
909        goto fn_exit;
910
911    if (sc->vc) {
912        MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "about to decr sc_ref_count sc=%p sc->vc=%p sc_ref_count=%d", sc, sc->vc, VC_FIELD(sc->vc, sc_ref_count)));
913        MPIU_Assert(VC_FIELD(sc->vc, sc_ref_count) > 0);
914        --VC_FIELD(sc->vc, sc_ref_count);
915    }
916   
917    plfd = &MPID_nem_tcp_plfd_tbl[sc->index]; 
918    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "vc=%p, sc=%p, closing fd=%d", sc->vc, sc, sc->fd));
919
920    CHECK_EINTR(rc, close(sc->fd));
921
922    MPIU_ERR_CHKANDJUMP1 (rc == -1 && errno != EAGAIN && errno != EBADF, mpi_errno, MPI_ERR_OTHER, 
923                          "**close", "**close %s", strerror (errno));
924
925    sc->fd = plfd->fd = CONN_INVALID_FD;
926    if (sc->vc && VC_FIELD(sc->vc, sc) == sc) /* this vc may be connecting/accepting with another sc e.g., this sc lost the tie-breaker */
927    {
928        ((MPIDI_CH3I_VC *)sc->vc->channel_private)->state = MPID_NEM_TCP_VC_STATE_DISCONNECTED;
929        ASSIGN_SC_TO_VC(sc->vc, NULL);
930    }
931
932    CHANGE_STATE(sc, CONN_STATE_TS_CLOSED);
933    sc->vc = NULL;
934
935    MPIU_CHKPMEM_MALLOC (node, freenode_t *, sizeof(freenode_t), mpi_errno, "free node");
936    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
937    node->index = sc->index;
938    Q_ENQUEUE(&freeq, node);
939
940    MPIU_CHKPMEM_COMMIT();
941 fn_exit:
942    MPIDI_FUNC_EXIT(MPID_STATE_CLEANUP_SC);
943    return mpi_errno;
944 fn_fail:
945    MPIU_CHKPMEM_REAP();
946    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "failure. mpi_errno = %d", mpi_errno));
947    goto fn_exit;
948}
949
950/* this function is called when vc->state becomes CLOSE_ACKED */
951/* FIXME XXX DJG do we need to do anything here to ensure that the final
952   close(TRUE) packet has made it into a writev call?  The code might have a
953   race for queued messages. */
954#undef FUNCNAME
955#define FUNCNAME MPID_nem_tcp_cleanup
956#undef FCNAME
957#define FCNAME MPIDI_QUOTE(FUNCNAME)
958int MPID_nem_tcp_cleanup (struct MPIDI_VC *const vc)
959{
960    int mpi_errno = MPI_SUCCESS, i;
961    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_CLEANUP);
962
963    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_CLEANUP);
964
965    MPIU_Assert(vc->state == MPIDI_VC_STATE_CLOSE_ACKED);
966
967    if (VC_FIELD(vc, sc) != NULL) {
968        mpi_errno = cleanup_sc(VC_FIELD(vc, sc));
969        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
970    }
971
972    i = 0;
973    while (VC_FIELD(vc, sc_ref_count) > 0 && i < g_tbl_size) {
974        if (g_sc_tbl[i].vc == vc) {
975            /* We've found a proto-connection that doesn't yet have enough
976               information to resolve the head-to-head situation.  If we don't
977               clean him up he'll end up accessing the about-to-be-freed vc. */
978            mpi_errno = cleanup_sc(&g_sc_tbl[i]);
979            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
980            MPIU_Assert(g_sc_tbl[i].vc == NULL);
981        }
982        ++i;
983    }
984
985    /* cleanup_sc can technically cause a reconnect on a per-sc basis, but I
986       don't think that it can happen when cleanup is called.  Let's
987       assert this for now and remove it if we prove that it can happen. */
988    MPIU_Assert(VC_FIELD(vc, sc_ref_count) == 0);
989
990 fn_exit:
991    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_CLEANUP);
992    return mpi_errno;
993 fn_fail:
994    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "failure. mpi_errno = %d", mpi_errno));
995    goto fn_exit;
996}
997
998
999#undef FUNCNAME
1000#define FUNCNAME state_tc_c_cnting_handler
1001#undef FCNAME
1002#define FCNAME MPIDI_QUOTE(FUNCNAME)
1003static int state_tc_c_cnting_handler(struct pollfd *const plfd, sockconn_t *const sc)
1004{
1005    int mpi_errno = MPI_SUCCESS;
1006    MPID_NEM_TCP_SOCK_STATUS_t stat;
1007    MPIDI_STATE_DECL(MPID_STATE_STATE_TC_C_CNTING_HANDLER);
1008
1009    MPIDI_FUNC_ENTER(MPID_STATE_STATE_TC_C_CNTING_HANDLER);
1010   
1011    stat = MPID_nem_tcp_check_sock_status(plfd);
1012
1013    if (stat == MPID_NEM_TCP_SOCK_CONNECTED) {
1014        CHANGE_STATE(sc, CONN_STATE_TC_C_CNTD);
1015    }
1016    else if (stat == MPID_NEM_TCP_SOCK_ERROR_EOF) {
1017        MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "state_tc_c_cnting_handler(): changing to "
1018              "quiescent"));
1019        CHANGE_STATE(sc, CONN_STATE_TS_D_QUIESCENT);
1020        /* FIXME: retry 'n' number of retries before signalling an error to VC layer. */
1021    }
1022    else { /* stat == MPID_NEM_TCP_SOCK_NOEVENT */
1023        /*
1024          Still connecting... let it. While still connecting, even if
1025          a duplicate connection exists and this connection can be closed, it can get
1026          tricky. close/shutdown on a unconnected fd will fail anyways. So, let it either
1027          connect or let the connect itself fail on the fd before making a transition
1028          from this state. However, we are relying on the time taken by connect to
1029          report an error. If we want control over that time, fix the code to poll for
1030          that amount of time or change the socket option to control the time-out of
1031          connect.
1032        */
1033    }
1034
1035    MPIDI_FUNC_EXIT(MPID_STATE_STATE_TC_C_CNTING_HANDLER);
1036    return mpi_errno;
1037}
1038
1039#undef FUNCNAME
1040#define FUNCNAME state_tc_c_cntd_handler
1041#undef FCNAME
1042#define FCNAME MPIDI_QUOTE(FUNCNAME)
1043static int state_tc_c_cntd_handler(struct pollfd *const plfd, sockconn_t *const sc)
1044{
1045    int mpi_errno = MPI_SUCCESS;
1046    MPIDI_STATE_DECL(MPID_STATE_STATE_TC_C_CNTD_HANDLER);
1047
1048    MPIDI_FUNC_ENTER(MPID_STATE_STATE_TC_C_CNTD_HANDLER);
1049
1050    if (found_better_sc(sc, NULL)) {
1051        MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "state_tc_c_cntd_handler(): changing to "
1052              "quiescent"));
1053        CHANGE_STATE(sc, CONN_STATE_TS_D_QUIESCENT);
1054        goto fn_exit;
1055    }
1056   
1057    if (IS_WRITEABLE(plfd)) {
1058        MPIU_DBG_MSG(NEM_SOCK_DET, VERBOSE, "inside if (IS_WRITEABLE(plfd))");
1059        if (!sc->is_tmpvc) { /* normal connection */
1060            if (send_id_info(sc) == MPI_SUCCESS) {
1061                CHANGE_STATE(sc, CONN_STATE_TC_C_RANKSENT);
1062            }
1063            else {
1064                mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME,
1065                                                 __LINE__,  MPI_ERR_OTHER, 
1066                                                 "**fail", 0);
1067                /* FIXME-Danger add error string  actual string : "**cannot send idinfo" */
1068                goto fn_fail;
1069            }
1070        }
1071        else { /* temp VC */
1072            if (send_tmpvc_info(sc) == MPI_SUCCESS) {
1073                CHANGE_STATE(sc, CONN_STATE_TC_C_TMPVCSENT);
1074            }
1075            else {
1076                mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME,
1077                                                 __LINE__, MPI_ERR_OTHER,
1078                                                 "**fail", 0);
1079                goto fn_fail;
1080            }
1081        } 
1082    }
1083    else {
1084        /* Remain in the same state */
1085    }
1086 fn_exit:
1087    MPIDI_FUNC_EXIT(MPID_STATE_STATE_TC_C_CNTD_HANDLER);
1088    return mpi_errno;
1089 fn_fail:
1090    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "failure. mpi_errno = %d", mpi_errno));
1091    goto fn_exit;
1092}
1093
1094#undef FUNCNAME
1095#define FUNCNAME state_c_ranksent_handler
1096#undef FCNAME
1097#define FCNAME MPIDI_QUOTE(FUNCNAME)
1098static int state_c_ranksent_handler(struct pollfd *const plfd, sockconn_t *const sc)
1099{
1100    int mpi_errno = MPI_SUCCESS;
1101    MPIDI_nem_tcp_pkt_type_t pkt_type;
1102    MPIDI_STATE_DECL(MPID_STATE_STATE_C_RANKSENT_HANDLER);
1103
1104    MPIDI_FUNC_ENTER(MPID_STATE_STATE_C_RANKSENT_HANDLER);
1105
1106    if (IS_READABLE(plfd)) {
1107        mpi_errno = recv_cmd_pkt(sc->fd, &pkt_type);
1108        if (mpi_errno != MPI_SUCCESS) {
1109            MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "state_c_ranksent_handler() 1: changing to "
1110              "quiescent.. "));
1111            CHANGE_STATE(sc, CONN_STATE_TS_D_QUIESCENT);
1112            if (vc_is_in_shutdown(sc->vc)) {
1113                mpi_errno = MPI_SUCCESS;
1114            }
1115        }
1116        else {
1117            MPIU_Assert(pkt_type == MPIDI_NEM_TCP_PKT_ID_ACK ||
1118                        pkt_type == MPIDI_NEM_TCP_PKT_ID_NAK);
1119
1120            if (pkt_type == MPIDI_NEM_TCP_PKT_ID_ACK) {
1121                CHANGE_STATE(sc, CONN_STATE_TS_COMMRDY);
1122                ASSIGN_SC_TO_VC(sc->vc, sc);
1123
1124                MPID_nem_tcp_conn_est (sc->vc);
1125                MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "c_ranksent_handler(): connection established (sc=%p, sc->vc=%p, fd=%d)", sc, sc->vc, sc->fd));
1126            }
1127            else { /* pkt_type must be MPIDI_NEM_TCP_PKT_ID_NAK */
1128                CHANGE_STATE(sc, CONN_STATE_TS_D_QUIESCENT);
1129            }
1130        }   
1131    }
1132
1133    MPIDI_FUNC_EXIT(MPID_STATE_STATE_C_RANKSENT_HANDLER);
1134    return mpi_errno;
1135}
1136
1137#undef FUNCNAME
1138#define FUNCNAME state_c_tmpvcsent_handler
1139#undef FCNAME
1140#define FCNAME MPIDI_QUOTE(FUNCNAME)
1141static int state_c_tmpvcsent_handler(struct pollfd *const plfd, sockconn_t *const sc)
1142{
1143    int mpi_errno = MPI_SUCCESS;
1144    MPIDI_nem_tcp_pkt_type_t pkt_type;
1145    MPIDI_STATE_DECL(MPID_STATE_STATE_C_TMPVCSENT_HANDLER);
1146
1147    MPIDI_FUNC_ENTER(MPID_STATE_STATE_C_TMPVCSENT_HANDLER);
1148
1149
1150    if (IS_READABLE(plfd)) {
1151        mpi_errno = recv_cmd_pkt(sc->fd, &pkt_type);
1152        if (mpi_errno != MPI_SUCCESS) {
1153            CHANGE_STATE(sc, CONN_STATE_TS_D_QUIESCENT);
1154            /* no head-to-head issues to deal with, if we failed to recv the
1155               packet then there really was a problem */
1156        }
1157        else {
1158            MPIU_Assert(pkt_type == MPIDI_NEM_TCP_PKT_TMPVC_ACK ||
1159                        pkt_type == MPIDI_NEM_TCP_PKT_TMPVC_NAK);
1160
1161            if (pkt_type == MPIDI_NEM_TCP_PKT_TMPVC_ACK) {
1162                CHANGE_STATE(sc, CONN_STATE_TS_COMMRDY);
1163                ASSIGN_SC_TO_VC(sc->vc, sc);
1164                MPID_nem_tcp_conn_est (sc->vc);
1165                MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "c_tmpvcsent_handler(): connection established (fd=%d, sc=%p, sc->vc=%p)", sc->fd, sc, sc->vc));
1166            }
1167            else { /* pkt_type must be MPIDI_NEM_TCP_PKT_ID_NAK */
1168                MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "state_c_tmpvcsent_handler() 2: changing to quiescent"));
1169                CHANGE_STATE(sc, CONN_STATE_TS_D_QUIESCENT);
1170            }
1171        }   
1172    }
1173
1174    MPIDI_FUNC_EXIT(MPID_STATE_STATE_C_TMPVCSENT_HANDLER);
1175    return mpi_errno;
1176}
1177
1178#undef FUNCNAME
1179#define FUNCNAME state_l_cntd_handler
1180#undef FCNAME
1181#define FCNAME MPIDI_QUOTE(FUNCNAME)
1182static int state_l_cntd_handler(struct pollfd *const plfd, sockconn_t *const sc)
1183{
1184    int mpi_errno = MPI_SUCCESS;
1185    MPID_NEM_TCP_SOCK_STATUS_t stat;
1186    int got_sc_eof = 0;
1187    MPIDI_STATE_DECL(MPID_STATE_STATE_L_CNTD_HANDLER);
1188
1189    MPIDI_FUNC_ENTER(MPID_STATE_STATE_L_CNTD_HANDLER);
1190
1191    stat = MPID_nem_tcp_check_sock_status(plfd);
1192    if (stat == MPID_NEM_TCP_SOCK_ERROR_EOF) {
1193        MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "state_l_cntd_handler() 1: changing to "
1194            "quiescent"));
1195        CHANGE_STATE(sc, CONN_STATE_TS_D_QUIESCENT);
1196        goto fn_exit;
1197    }
1198
1199    /* We have an active connection, start polling more often */
1200    MPID_nem_tcp_skip_polls = MAX_SKIP_POLLS_ACTIVE;
1201
1202    if (IS_READABLE(plfd)) {
1203        mpi_errno = recv_id_or_tmpvc_info(sc, &got_sc_eof);
1204        if (mpi_errno == MPI_SUCCESS) {
1205            if (got_sc_eof) {
1206                /* recv_id_or_tmpvc already moved the sc to QUIESCENT, just return */
1207                goto fn_exit;
1208            }
1209
1210            if (!sc->is_tmpvc) {
1211                CHANGE_STATE(sc, CONN_STATE_TA_C_RANKRCVD);
1212            }
1213            else {
1214                CHANGE_STATE(sc, CONN_STATE_TA_C_TMPVCRCVD);
1215            }
1216        }
1217        else {
1218            MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "state_l_cntd_handler() 2: changing to "
1219               "quiescent"));
1220            CHANGE_STATE(sc, CONN_STATE_TS_D_QUIESCENT);
1221
1222            MPIU_ERR_POP(mpi_errno);
1223        }
1224    }
1225    else {
1226        /* remain in same state */
1227        MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "!IS_READABLE(plfd) fd=%d events=%#x revents=%#x", plfd->fd, plfd->events, plfd->revents));
1228    }
1229
1230 fn_exit:
1231    MPIDI_FUNC_EXIT(MPID_STATE_STATE_L_CNTD_HANDLER);
1232    return mpi_errno;
1233 fn_fail:
1234    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "failure. mpi_errno = %d", mpi_errno));
1235    goto fn_exit;
1236
1237}
1238
1239/*
1240  Returns TRUE, if the process(self) wins against the remote process
1241  FALSE, otherwise
1242 */
1243#undef FUNCNAME
1244#define FUNCNAME do_i_win
1245#undef FCNAME
1246#define FCNAME MPIDI_QUOTE(FUNCNAME)
1247static int do_i_win(sockconn_t *rmt_sc)
1248{
1249    int win = FALSE;
1250    MPIDI_STATE_DECL(MPID_STATE_DO_I_WIN);
1251
1252    MPIDI_FUNC_ENTER(MPID_STATE_DO_I_WIN);
1253
1254    MPIU_Assert(rmt_sc->pg_is_set);
1255
1256    if (rmt_sc->is_same_pg) {
1257        if (MPIDI_Process.my_pg_rank > rmt_sc->pg_rank)
1258            win = TRUE;
1259    }
1260    else {
1261        if (strcmp(MPIDI_Process.my_pg->id, rmt_sc->pg_id) > 0)
1262            win = TRUE;
1263    }
1264
1265    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE,
1266                     (MPIU_DBG_FDEST, "do_i_win(rmt_sc=%p (%s)) win=%s is_same_pg=%s my_pg_rank=%d rmt_pg_rank=%d",
1267                      rmt_sc, CONN_STATE_STR[rmt_sc->state.cstate],
1268                      (win ? "TRUE" : "FALSE"),(rmt_sc->is_same_pg ? "TRUE" : "FALSE"), MPIDI_Process.my_pg_rank,
1269                      rmt_sc->pg_rank));
1270    MPIDI_FUNC_EXIT(MPID_STATE_DO_I_WIN);
1271    return win;
1272}
1273
1274#undef FUNCNAME
1275#define FUNCNAME state_l_rankrcvd_handler
1276#undef FCNAME
1277#define FCNAME MPIDI_QUOTE(FUNCNAME)
1278static int state_l_rankrcvd_handler(struct pollfd *const plfd, sockconn_t *const sc)
1279{
1280    int mpi_errno = MPI_SUCCESS;
1281    MPID_NEM_TCP_SOCK_STATUS_t stat;
1282    sockconn_t *fnd_sc = NULL;
1283    int snd_nak = FALSE;
1284    MPIDI_STATE_DECL(MPID_STATE_STATE_L_RANKRCVD_HANDLER);
1285
1286    MPIDI_FUNC_ENTER(MPID_STATE_STATE_L_RANKRCVD_HANDLER);
1287
1288    stat = MPID_nem_tcp_check_sock_status(plfd);
1289    if (stat == MPID_NEM_TCP_SOCK_ERROR_EOF) {
1290        MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "state_l_rankrcvd_handler() 1: changing to quiescent"));
1291        CHANGE_STATE(sc, CONN_STATE_TS_D_QUIESCENT);
1292        goto fn_exit;
1293    }
1294    if (found_better_sc(sc, &fnd_sc)) {
1295        if (fnd_sc->state.cstate == CONN_STATE_TS_COMMRDY)
1296            snd_nak = TRUE;
1297        else if (fnd_sc->state.cstate == CONN_STATE_TC_C_RANKSENT)
1298            snd_nak = do_i_win(sc);
1299    }
1300    if (IS_WRITEABLE(plfd)) {
1301        if (snd_nak) {
1302            if (send_cmd_pkt(sc->fd, MPIDI_NEM_TCP_PKT_ID_NAK) == MPI_SUCCESS) {
1303                MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "state_l_rankrcvd_handler() 2: changing to quiescent"));
1304                CHANGE_STATE(sc, CONN_STATE_TS_D_QUIESCENT);
1305            }
1306        }
1307        else {
1308            /* The following line is _crucial_ to correct operation.  We need to
1309             * ensure that all head-to-head resolution has completed before we
1310             * move to COMMRDY and send any pending messages.  If we don't this
1311             * connection could shut down before the other connection has a
1312             * chance to finish the connect protocol.  That can lead to all
1313             * kinds of badness, including zombie connections, segfaults, and
1314             * accessing PG/VC info that is no longer present. */
1315            if (VC_FIELD(sc->vc, sc_ref_count) > 1) goto fn_exit;
1316
1317            if (send_cmd_pkt(sc->fd, MPIDI_NEM_TCP_PKT_ID_ACK) == MPI_SUCCESS) {
1318                CHANGE_STATE(sc, CONN_STATE_TS_COMMRDY);
1319                ASSIGN_SC_TO_VC(sc->vc, sc);
1320                MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "connection established: sc=%p, sc->vc=%p, sc->fd=%d, is_same_pg=%s, pg_rank=%d", sc, sc->vc, sc->fd, (sc->is_same_pg ? "TRUE" : "FALSE"), sc->pg_rank));
1321                MPID_nem_tcp_conn_est (sc->vc);
1322            }
1323        }
1324    }
1325
1326 fn_exit:
1327    MPIDI_FUNC_EXIT(MPID_STATE_STATE_L_RANKRCVD_HANDLER);
1328    return mpi_errno;
1329}
1330
1331#undef FUNCNAME
1332#define FUNCNAME state_l_tmpvcrcvd_handler
1333#undef FCNAME
1334#define FCNAME MPIDI_QUOTE(FUNCNAME)
1335static int state_l_tmpvcrcvd_handler(struct pollfd *const plfd, sockconn_t *const sc)
1336{
1337    int mpi_errno = MPI_SUCCESS;
1338    MPID_NEM_TCP_SOCK_STATUS_t stat;
1339    int snd_nak = FALSE;
1340    MPIDI_STATE_DECL(MPID_STATE_STATE_L_TMPVCRCVD_HANDLER);
1341
1342    MPIDI_FUNC_ENTER(MPID_STATE_STATE_L_TMPVCRCVD_HANDLER);
1343
1344    stat = MPID_nem_tcp_check_sock_status(plfd);
1345    if (stat == MPID_NEM_TCP_SOCK_ERROR_EOF) {
1346        CHANGE_STATE(sc, CONN_STATE_TS_D_QUIESCENT);
1347        goto fn_exit;
1348    }
1349    /* we don't want to perform any h2h resolution for temp vcs */
1350    if (IS_WRITEABLE(plfd)) {
1351        if (snd_nak) {
1352            if (send_cmd_pkt(sc->fd, MPIDI_NEM_TCP_PKT_TMPVC_NAK) == MPI_SUCCESS) {
1353                CHANGE_STATE(sc, CONN_STATE_TS_D_QUIESCENT);
1354            }
1355        }
1356        else {
1357            if (send_cmd_pkt(sc->fd, MPIDI_NEM_TCP_PKT_TMPVC_ACK) == MPI_SUCCESS) {
1358                CHANGE_STATE(sc, CONN_STATE_TS_COMMRDY);
1359                ASSIGN_SC_TO_VC(sc->vc, sc);
1360                MPID_nem_tcp_conn_est (sc->vc);
1361                MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "fd=%d: TMPVC_ACK sent, connection established!", sc->fd));
1362            }
1363        }
1364    }
1365
1366 fn_exit:
1367    MPIDI_FUNC_EXIT(MPID_STATE_STATE_L_TMPVCRCVD_HANDLER);
1368    return mpi_errno;
1369}
1370
1371#undef FUNCNAME
1372#define FUNCNAME MPID_nem_tcp_recv_handler
1373#undef FCNAME
1374#define FCNAME MPIDI_QUOTE(FUNCNAME)
1375static int MPID_nem_tcp_recv_handler (struct pollfd *pfd, sockconn_t *sc)
1376{
1377    int mpi_errno = MPI_SUCCESS;
1378    ssize_t bytes_recvd;
1379    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_RECV_HANDLER);
1380
1381    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_RECV_HANDLER);
1382
1383    if (((MPIDI_CH3I_VC *)sc->vc->channel_private)->recv_active == NULL)
1384    {
1385        /* receive a new message */
1386        CHECK_EINTR(bytes_recvd, recv(sc->fd, recv_buf, MPID_NEM_TCP_RECV_MAX_PKT_LEN, 0));
1387        if (bytes_recvd <= 0)
1388        {
1389            if (bytes_recvd == -1 && errno == EAGAIN) /* handle this fast */
1390                goto fn_exit;
1391
1392            if (bytes_recvd == 0)
1393            {
1394                MPIU_Assert(sc != NULL);
1395                MPIU_Assert(sc->vc != NULL);
1396                /* sc->vc->sc will be NULL if sc->vc->state == _INACTIVE */
1397                MPIU_Assert(VC_FIELD(sc->vc, sc) == NULL || VC_FIELD(sc->vc, sc) == sc);
1398
1399                if (vc_is_in_shutdown(sc->vc))
1400                {
1401                    /* there's currently no hook for CH3 to tell nemesis/tcp
1402                       that we are in the middle of a disconnection dance.  So
1403                       if we don't check to see if we are currently
1404                       disconnecting, then we end up with a potential race where
1405                       the other side performs a tcp close() before we do and we
1406                       blow up here. */
1407                    CHANGE_STATE(sc, CONN_STATE_TS_D_QUIESCENT);
1408                    goto fn_exit;
1409                }
1410                else
1411                {
1412                    MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "ERROR: sock (fd=%d) is closed: bytes_recvd == 0", sc->fd );
1413                    MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**sock_closed");
1414                }
1415            }
1416            else
1417            {
1418                MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**read", "**read %s", strerror(errno));
1419            }
1420        }
1421   
1422        MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "New recv " MPIDI_MSG_SZ_FMT " (fd=%d, vc=%p, sc=%p)", bytes_recvd, sc->fd, sc->vc, sc));
1423
1424        mpi_errno = MPID_nem_handle_pkt(sc->vc, recv_buf, bytes_recvd);
1425        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
1426    }
1427    else
1428    {
1429        /* there is a pending receive, receive it directly into the user buffer */
1430        MPID_Request *rreq = ((MPIDI_CH3I_VC *)sc->vc->channel_private)->recv_active;
1431        MPID_IOV *iov = &rreq->dev.iov[rreq->dev.iov_offset];
1432        int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
1433
1434        CHECK_EINTR(bytes_recvd, readv(sc->fd, iov, rreq->dev.iov_count));
1435        if (bytes_recvd <= 0)
1436        {
1437            if (bytes_recvd == -1 && errno == EAGAIN) /* handle this fast */
1438                goto fn_exit;
1439
1440            if (bytes_recvd == 0)
1441            {
1442                MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**sock_closed");
1443            }
1444            else
1445                MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**read", "**read %s", strerror(errno));
1446        }
1447
1448        MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "Cont recv %ld", (long int)bytes_recvd);
1449
1450        /* update the iov */
1451        for (iov = &rreq->dev.iov[rreq->dev.iov_offset]; iov < &rreq->dev.iov[rreq->dev.iov_offset + rreq->dev.iov_count]; ++iov)
1452        {
1453            if (bytes_recvd < iov->MPID_IOV_LEN)
1454            {
1455                iov->MPID_IOV_BUF = (char *)iov->MPID_IOV_BUF + bytes_recvd;
1456                iov->MPID_IOV_LEN -= bytes_recvd;
1457                rreq->dev.iov_count = &rreq->dev.iov[rreq->dev.iov_offset + rreq->dev.iov_count] - iov;
1458                rreq->dev.iov_offset = iov - rreq->dev.iov;
1459                MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "bytes_recvd = %ld", (long int)bytes_recvd);
1460                MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "iov len = %ld", (long int)iov->MPID_IOV_LEN);
1461                MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "iov_offset = %d", rreq->dev.iov_offset);
1462                goto fn_exit;
1463            }
1464            bytes_recvd -= iov->MPID_IOV_LEN;
1465        }
1466       
1467        /* the whole iov has been received */
1468
1469        reqFn = rreq->dev.OnDataAvail;
1470        if (!reqFn)
1471        {
1472            MPIU_Assert(MPIDI_Request_get_type(rreq) != MPIDI_REQUEST_TYPE_GET_RESP);
1473            MPIDI_CH3U_Request_complete(rreq);
1474            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "...complete");
1475            ((MPIDI_CH3I_VC *)sc->vc->channel_private)->recv_active = NULL;
1476        }
1477        else
1478        {
1479            int complete = 0;
1480               
1481            mpi_errno = reqFn(sc->vc, rreq, &complete);
1482            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
1483
1484            if (complete)
1485            {
1486                MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "...complete");
1487                ((MPIDI_CH3I_VC *)sc->vc->channel_private)->recv_active = NULL;
1488            }
1489            else
1490            {
1491                MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "...not complete");
1492            }
1493        }       
1494    }
1495
1496 fn_exit:
1497    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_RECV_HANDLER);
1498    return mpi_errno;
1499 fn_fail:
1500    goto fn_exit;
1501}
1502
1503#undef FUNCNAME
1504#define FUNCNAME state_commrdy_handler
1505#undef FCNAME
1506#define FCNAME MPIDI_QUOTE(FUNCNAME)
1507static int state_commrdy_handler(struct pollfd *const plfd, sockconn_t *const sc)
1508{
1509    int mpi_errno = MPI_SUCCESS;
1510    MPIDI_STATE_DECL(MPID_STATE_STATE_COMMRDY_HANDLER);
1511
1512    MPIDI_FUNC_ENTER(MPID_STATE_STATE_COMMRDY_HANDLER);
1513
1514    if (IS_READABLE(plfd))
1515    {
1516        mpi_errno = MPID_nem_tcp_recv_handler(plfd, sc);
1517        if (mpi_errno) MPIU_ERR_POP (mpi_errno);
1518    }
1519    if (IS_WRITEABLE(plfd))
1520    {
1521        mpi_errno = MPID_nem_tcp_send_queued(sc->vc);
1522        if (mpi_errno) MPIU_ERR_POP (mpi_errno);
1523    }
1524 fn_exit:
1525    MPIDI_FUNC_EXIT(MPID_STATE_STATE_COMMRDY_HANDLER);
1526    return mpi_errno;
1527 fn_fail:
1528    goto fn_exit;
1529
1530}
1531
1532#undef FUNCNAME
1533#define FUNCNAME state_d_quiescent_handler
1534#undef FCNAME
1535#define FCNAME MPIDI_QUOTE(FUNCNAME)
1536static int state_d_quiescent_handler(struct pollfd *const plfd, sockconn_t *const sc)
1537{
1538    int mpi_errno = MPI_SUCCESS;
1539    MPIDI_STATE_DECL(MPID_STATE_STATE_D_QUIESCENT_HANDLER);
1540
1541    MPIDI_FUNC_ENTER(MPID_STATE_STATE_D_QUIESCENT_HANDLER);
1542
1543    mpi_errno = cleanup_sc(sc);
1544    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
1545
1546 fn_exit:
1547    MPIDI_FUNC_EXIT(MPID_STATE_STATE_D_QUIESCENT_HANDLER);
1548    return mpi_errno;
1549 fn_fail:
1550    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "failure. mpi_errno = %d", mpi_errno));
1551    goto fn_exit;
1552}
1553
1554#undef FUNCNAME
1555#define FUNCNAME MPID_nem_tcp_sm_init
1556#undef FCNAME
1557#define FCNAME MPIDI_QUOTE(FUNCNAME)
1558int MPID_nem_tcp_sm_init()
1559{
1560    int mpi_errno = MPI_SUCCESS;
1561    MPIU_CHKPMEM_DECL(1);
1562    /* Set the appropriate handlers */
1563    sc_state_info[CONN_STATE_TS_CLOSED].sc_state_handler = NULL;
1564    sc_state_info[CONN_STATE_TC_C_CNTING].sc_state_handler = state_tc_c_cnting_handler;
1565    sc_state_info[CONN_STATE_TC_C_CNTD].sc_state_handler = state_tc_c_cntd_handler;
1566    sc_state_info[CONN_STATE_TC_C_RANKSENT].sc_state_handler = state_c_ranksent_handler;
1567    sc_state_info[CONN_STATE_TC_C_TMPVCSENT].sc_state_handler = state_c_tmpvcsent_handler;
1568    sc_state_info[CONN_STATE_TA_C_CNTD].sc_state_handler = state_l_cntd_handler;
1569    sc_state_info[CONN_STATE_TA_C_RANKRCVD].sc_state_handler = state_l_rankrcvd_handler;
1570    sc_state_info[CONN_STATE_TA_C_TMPVCRCVD].sc_state_handler = state_l_tmpvcrcvd_handler;
1571    sc_state_info[CONN_STATE_TS_COMMRDY].sc_state_handler = state_commrdy_handler;
1572    sc_state_info[CONN_STATE_TS_D_QUIESCENT].sc_state_handler = state_d_quiescent_handler;
1573
1574    /* Set the appropriate states */
1575    sc_state_info[CONN_STATE_TS_CLOSED].sc_state_plfd_events = 0;
1576    sc_state_info[CONN_STATE_TC_C_CNTING].sc_state_plfd_events = POLLOUT | POLLIN;
1577    sc_state_info[CONN_STATE_TC_C_CNTD].sc_state_plfd_events = POLLOUT | POLLIN;
1578    sc_state_info[CONN_STATE_TC_C_RANKSENT].sc_state_plfd_events = POLLIN;
1579    sc_state_info[CONN_STATE_TC_C_TMPVCSENT].sc_state_plfd_events = POLLIN;
1580    sc_state_info[CONN_STATE_TA_C_CNTD].sc_state_plfd_events = POLLIN;
1581    sc_state_info[CONN_STATE_TA_C_RANKRCVD].sc_state_plfd_events = POLLOUT | POLLIN;
1582    sc_state_info[CONN_STATE_TA_C_TMPVCRCVD].sc_state_plfd_events = POLLOUT | POLLIN;
1583    sc_state_info[CONN_STATE_TS_COMMRDY].sc_state_plfd_events = POLLIN;
1584    sc_state_info[CONN_STATE_TS_D_QUIESCENT].sc_state_plfd_events = POLLOUT | POLLIN;
1585
1586    /* Allocate the PLFD table */
1587    alloc_sc_plfd_tbls();
1588   
1589    MPIU_CHKPMEM_MALLOC(recv_buf, char*, MPID_NEM_TCP_RECV_MAX_PKT_LEN, mpi_errno, "TCP temporary buffer");
1590    MPIU_CHKPMEM_COMMIT();
1591
1592 fn_exit:
1593    return mpi_errno;
1594 fn_fail:
1595    MPIU_CHKPMEM_REAP();
1596    goto fn_exit;
1597}
1598
1599#undef FUNCNAME
1600#define FUNCNAME MPID_nem_tcp_sm_finalize
1601#undef FCNAME
1602#define FCNAME MPIDI_QUOTE(FUNCNAME)
1603int MPID_nem_tcp_sm_finalize()
1604{
1605    freenode_t *node;
1606
1607    /* walk the freeq and free all the elements */
1608    while (!Q_EMPTY(freeq)) {
1609        Q_DEQUEUE(&freeq, ((freenode_t **)&node)); 
1610        MPIU_Free(node);
1611    }
1612
1613    free_sc_plfd_tbls();
1614
1615    MPIU_Free(recv_buf);
1616
1617    return MPI_SUCCESS;
1618}
1619
1620/*
1621 N1: create a new listener fd?? While doing so, if we bind it to the same port used befor,
1622then it is ok. Else,the new port number(and thus the business card) has to be communicated
1623to the other processes (in same and different pg's), which is not quite simple to do.
1624Evaluate the need for it by testing and then do it, if needed.
1625
1626*/
1627#undef FUNCNAME
1628#define FUNCNAME MPID_nem_tcp_connpoll
1629#undef FCNAME
1630#define FCNAME MPIDI_QUOTE(FUNCNAME)
1631int MPID_nem_tcp_connpoll(int in_blocking_poll)
1632{
1633    int mpi_errno = MPI_SUCCESS, n, i;
1634    static int num_skipped_polls = 0;
1635
1636    /* num_polled is needed b/c the call to it_sc->handler() can change the
1637       size of the table, which leads to iterating over invalid revents. */
1638    int num_polled = g_tbl_size;
1639
1640    /* To improve shared memory performance, we don't call the poll()
1641     * system call every time. The MPID_nem_tcp_skip_polls value is
1642     * changed depending on whether we have any active connections.
1643     * We only skip polls when we're in a blocking progress loop in
1644     * order to avoid poor performance if the user does a "MPI_Test();
1645     * compute();" loop waiting for a req to complete. */
1646    if (in_blocking_poll && num_skipped_polls++ < MPID_nem_tcp_skip_polls)
1647        goto fn_exit;
1648    num_skipped_polls = 0;
1649
1650    CHECK_EINTR(n, poll(MPID_nem_tcp_plfd_tbl, num_polled, 0));
1651    MPIU_ERR_CHKANDJUMP1 (n == -1, mpi_errno, MPI_ERR_OTHER, 
1652                          "**poll", "**poll %s", strerror (errno));
1653    /* MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "some sc fd poll event")); */
1654    for(i = 0; i < num_polled; i++)
1655    {
1656        struct pollfd *it_plfd = &MPID_nem_tcp_plfd_tbl[i];
1657        sockconn_t *it_sc = &g_sc_tbl[i];
1658
1659        if (it_plfd->fd != CONN_INVALID_FD && it_plfd->revents != 0)
1660        {
1661            /* We could check for POLLHUP here, but HUP/HUP+EOF is not erroneous
1662             * on many platforms, including modern Linux. */
1663            MPIU_ERR_CHKANDJUMP(it_plfd->revents & POLLERR, mpi_errno, MPI_ERR_OTHER, "**comm_fail");
1664            MPIU_ERR_CHKANDJUMP(it_sc->state.cstate != CONN_STATE_TS_D_QUIESCENT && (it_plfd->revents & POLLNVAL), mpi_errno, MPI_ERR_OTHER, "**comm_fail");
1665           
1666            mpi_errno = it_sc->handler(it_plfd, it_sc);
1667            if (mpi_errno) MPIU_ERR_POP (mpi_errno); 
1668        }
1669    }
1670   
1671 fn_exit:
1672    return mpi_errno;
1673 fn_fail:
1674    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "failure. mpi_errno = %d", mpi_errno));
1675    goto fn_exit;
1676}
1677
1678/*
1679  FIXME
1680  1.Check for accept error.
1681  2.If listening socket dies, create a new listening socket so that future connects
1682  can use that. In that case, how to tell other processes about the new listening port. It's
1683  probably a design issue at MPI level.
1684
1685  N1: There might be a timing window where poll on listen_fd was successful and there was
1686  a connection in the queue. Assume there was only one connection in the listen queue and
1687  before we called accept, the peer had reset the connection. On receiving a TCP RST,
1688  some implementations remove the connection from the listen queue. So, accept on
1689  a non-blocking fd would return error with EWOULDBLOCK.
1690
1691  N2: The peer might have reset or closed the connection. In some implementations,
1692  even if the connection is reset by the peer, accept is successful. By POSIX standard,
1693  if the connection is closed by the peer, accept will still be successful. So, soon
1694  after accept, check whether the new fd is really connected (i.e neither reset nor
1695  EOF received from peer).
1696  Now, it is decided not to check for this condition at this point. After all, in the next
1697  state, anyhow we can close the socket, if we receive an EOF.
1698
1699  N3:  find_free_entry is called within the while loop. It may cause the table to expand. So,
1700  the arguments passed for this callback function may get invalidated. So, it is imperative
1701  that we obtain sc pointer and plfd pointer everytime within the while loop.
1702  Accordingly, the parameters are named unused1 and unused2 for clarity.
1703*/
1704#undef FUNCNAME
1705#define FUNCNAME state_listening_handler
1706#undef FCNAME
1707#define FCNAME MPIDI_QUOTE(FUNCNAME)
1708int MPID_nem_tcp_state_listening_handler(struct pollfd *const unused_1, sockconn_t *const unused_2)
1709        /*  listener fd poll struct and sockconn structure */
1710{
1711    int mpi_errno = MPI_SUCCESS;
1712    int connfd;
1713    socklen_t len;
1714    SA_IN rmt_addr;
1715    struct pollfd *l_plfd;
1716    sockconn_t *l_sc;
1717    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_STATE_LISTENING_HANDLER);
1718
1719    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_STATE_LISTENING_HANDLER);
1720
1721    while (1) {
1722        l_sc = &g_sc_tbl[0];  /* N3 Important */
1723        l_plfd = &MPID_nem_tcp_plfd_tbl[0];
1724        len = sizeof(SA_IN);
1725        MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "before accept"));
1726        if ((connfd = accept(l_sc->fd, (SA *) &rmt_addr, &len)) < 0) {
1727            MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "after accept, l_sc=%p lstnfd=%d connfd=%d, errno=%d:%s ", l_sc, l_sc->fd, connfd, errno, strerror(errno)));
1728            if (errno == EINTR) 
1729                continue;
1730            else if (errno == EWOULDBLOCK)
1731                break; /*  no connection in the listen queue. get out of here.(N1) */
1732            MPIU_ERR_SETANDJUMP1 (mpi_errno, MPI_ERR_OTHER,
1733                                  "**sock_accept", "**sock_accept %s", strerror(errno));
1734        }
1735        else {
1736            int index = -1;
1737            struct pollfd *plfd;
1738            sockconn_t *sc;
1739
1740            MPID_nem_tcp_set_sockopts(connfd); /* (N2) */
1741            mpi_errno = find_free_entry(&index);
1742            if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP (mpi_errno);       
1743            sc = &g_sc_tbl[index];
1744            plfd = &MPID_nem_tcp_plfd_tbl[index];
1745           
1746            sc->fd = plfd->fd = connfd;
1747            sc->pg_rank = CONN_INVALID_RANK;
1748            sc->pg_is_set = FALSE;
1749            sc->is_tmpvc = 0;
1750
1751            CHANGE_STATE(sc, CONN_STATE_TA_C_CNTD);
1752
1753            MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "accept success, added to table, connfd=%d, sc->vc=%p, sc=%p plfd->events=%#x", connfd, sc->vc, sc, plfd->events)); /* sc->vc should be NULL at this point */
1754        }
1755    }
1756
1757 fn_exit:
1758    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_STATE_LISTENING_HANDLER);
1759    return mpi_errno;
1760 fn_fail:
1761    MPIU_DBG_MSG_FMT(NEM_SOCK_DET, VERBOSE, (MPIU_DBG_FDEST, "failure. mpi_errno = %d", mpi_errno));
1762    goto fn_exit;
1763}
Note: See TracBrowser for help on using the browser.