root/mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_lmt_shm.c @ 4865

Revision 4865, 30.7 KB (checked in by buntinas, 5 months ago)

stomped some warnings

Line 
1/* -*- Mode: C; c-basic-offset:4 ; -*- */
2/*
3 *  (C) 2006 by Argonne National Laboratory.
4 *      See COPYRIGHT in top-level directory.
5 */
6
7#include "mpid_nem_impl.h"
8#include "mpid_nem_datatypes.h"
9
10#include "mpiu_os_wrappers.h"
11#if defined(USE_DBG_LOGGING) && 0
12#define DBG_LMT(x) x
13#else
14#define DBG_LMT(x)
15#endif
16
17#ifdef ENABLE_NO_YIELD
18#define COND_Yield() do { } while(0)
19#else
20#define COND_Yield() MPIDU_Yield()
21#endif
22
23/* Progress queue */
24
25typedef struct lmt_shm_prog_element
26{
27    MPIDI_VC_t *vc;
28    struct lmt_shm_prog_element *next;
29    struct lmt_shm_prog_element *prev;
30} lmt_shm_prog_element_t;
31
32static struct {lmt_shm_prog_element_t *head;} lmt_shm_progress_q = {NULL};
33
34#define LMT_SHM_L_EMPTY() GENERIC_L_EMPTY(lmt_shm_progress_q)
35#define LMT_SHM_L_HEAD() GENERIC_L_HEAD(lmt_shm_progress_q)
36#define LMT_SHM_L_REMOVE(ep) GENERIC_L_REMOVE(&lmt_shm_progress_q, ep, next, prev)
37#define LMT_SHM_L_ADD(ep) GENERIC_L_ADD(&lmt_shm_progress_q, ep, next, prev)
38
39typedef struct MPID_nem_lmt_shm_wait_element
40{
41    int (* progress)(MPIDI_VC_t *vc, MPID_Request *req, int *done);
42    MPID_Request *req;
43    struct MPID_nem_lmt_shm_wait_element *next;
44} MPID_nem_lmt_shm_wait_element_t;
45
46#define LMT_SHM_Q_EMPTY(qp) GENERIC_Q_EMPTY(qp)
47#define LMT_SHM_Q_HEAD(qp) GENERIC_Q_HEAD(qp)
48#define LMT_SHM_Q_ENQUEUE(qp, ep) GENERIC_Q_ENQUEUE(qp, ep, next)
49#define LMT_SHM_Q_ENQUEUE_AT_HEAD(qp, ep) GENERIC_Q_ENQUEUE_AT_HEAD(qp, ep, next)
50#define LMT_SHM_Q_DEQUEUE(qp, epp) GENERIC_Q_DEQUEUE(qp, epp, next)
51#define LMT_SHM_Q_SEARCH_REMOVE(qp, req_id, epp) GENERIC_Q_SEARCH_REMOVE(qp, _e->req->handle == (req_id), epp, \
52                                                                         MPID_nem_lmt_shm_wait_element_t, next)
53#define CHECK_Q(qp) do{\
54    if (LMT_SHM_Q_EMPTY(*(qp))) break;\
55    if (LMT_SHM_Q_HEAD(*(qp))->next == NULL && (qp)->head != (qp)->tail)\
56    {\
57        printf ("ERROR\n");\
58        while(1);\
59    }\
60    \
61    }while(0)
62
63
64/* copy buffer in shared memory */
65#define NUM_BUFS 8
66#define MPID_NEM_COPY_BUF_LEN (32 * 1024)
67
68typedef union
69{
70    volatile int val;
71    char padding[MPID_NEM_CACHE_LINE_LEN];
72} MPID_nem_cacheline_int_t;
73
74typedef union
75{
76    volatile MPIDI_msg_sz_t val;
77    char padding[MPID_NEM_CACHE_LINE_LEN];
78} MPID_nem_cacheline_msg_sz_t;
79
80typedef union
81{
82    struct
83    {
84        OPA_int_t rank; /* OPA_int_t is already volatile */
85        volatile int remote_req_id;
86        DBG_LMT(volatile int ctr;)
87    } val;
88    char padding[MPID_NEM_CACHE_LINE_LEN];
89} MPID_nem_cacheline_owner_info_t;
90
91typedef struct MPID_nem_copy_buf
92{
93    MPID_nem_cacheline_owner_info_t owner_info;
94    MPID_nem_cacheline_int_t sender_present; /* is the sender currently in the lmt progress function for this buffer */
95    MPID_nem_cacheline_int_t receiver_present; /* is the receiver currently in the lmt progress function for this buffer */
96    MPID_nem_cacheline_int_t len[NUM_BUFS];
97    volatile char underflow_buf[MPID_NEM_CACHE_LINE_LEN]; /* used when not all data could be unpacked from previous buffer */
98    volatile char buf[NUM_BUFS][MPID_NEM_COPY_BUF_LEN];
99} MPID_nem_copy_buf_t;
100/* copy buffer flag values */
101#define BUF_EMPTY 0
102#define BUF_FULL  1
103
104#define NO_OWNER -1
105#define IN_USE -2
106
107/* pipeline values : if the data size is less than PIPELINE_THRESHOLD,
108   then copy no more than PIPELINE_MAX_SIZE at a time. */
109#define PIPELINE_MAX_SIZE (16 * 1024)
110#define PIPELINE_THRESHOLD (128 * 1024)
111
112
113static inline int lmt_shm_progress_vc(MPIDI_VC_t *vc, int *done);
114static int lmt_shm_send_progress(MPIDI_VC_t *vc, MPID_Request *req, int *done);
115static int lmt_shm_recv_progress(MPIDI_VC_t *vc, MPID_Request *req, int *done);
116static int MPID_nem_allocate_shm_region(MPID_nem_copy_buf_t **buf_p, MPIU_SHMW_Hnd_t handle);
117static int MPID_nem_attach_shm_region(MPID_nem_copy_buf_t **buf_p, MPIU_SHMW_Hnd_t handle);
118static int MPID_nem_detach_shm_region(MPID_nem_copy_buf_t **buf, MPIU_SHMW_Hnd_t handle);
119static int MPID_nem_delete_shm_region(MPID_nem_copy_buf_t **buf, MPIU_SHMW_Hnd_t *handle_p);
120
121/* number of iterations to wait for the other side to process a buffer */
122#define LMT_POLLS_BEFORE_YIELD 1000
123
124#undef FUNCNAME
125#define FUNCNAME MPID_nem_lmt_shm_initiate_lmt
126#undef FCNAME
127#define FCNAME MPIDI_QUOTE(FUNCNAME)
128int MPID_nem_lmt_shm_initiate_lmt(MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, MPID_Request *req)
129{
130    int mpi_errno = MPI_SUCCESS;
131    MPIDI_msg_sz_t data_sz;
132    int dt_contig;
133    MPI_Aint dt_true_lb;
134    MPID_Datatype * dt_ptr;
135    MPID_nem_pkt_lmt_rts_t * const rts_pkt = (MPID_nem_pkt_lmt_rts_t *)pkt;
136    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_SHM_INITIATE_LMT);
137
138    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_SHM_INITIATE_LMT);
139
140    MPID_nem_lmt_send_RTS(vc, rts_pkt, NULL, 0);
141
142    MPIDI_Datatype_get_info(req->dev.user_count, req->dev.datatype, dt_contig, data_sz, dt_ptr, dt_true_lb);
143    req->ch.lmt_data_sz = data_sz;
144
145 fn_exit:
146    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_SHM_INITIATE_LMT);
147    return mpi_errno;
148 fn_fail:
149    goto fn_exit;
150}
151
152#undef FUNCNAME
153#define FUNCNAME MPID_nem_lmt_shm_start_recv
154#undef FCNAME
155#define FCNAME MPIDI_QUOTE(FUNCNAME)
156int MPID_nem_lmt_shm_start_recv(MPIDI_VC_t *vc, MPID_Request *req, MPID_IOV s_cookie)
157{
158    int mpi_errno = MPI_SUCCESS;
159    int done = FALSE;
160    MPIU_CHKPMEM_DECL(2);
161    MPID_nem_lmt_shm_wait_element_t *e;
162    int queue_initially_empty;
163    MPIDI_CH3I_VC *vc_ch = (MPIDI_CH3I_VC *)vc->channel_private;
164    char *ser_lmt_copy_buf_handle=NULL;
165    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_SHM_START_RECV);
166
167    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_SHM_START_RECV);
168
169    if (vc_ch->lmt_copy_buf == NULL)
170    {
171        int i;
172        mpi_errno = MPID_nem_allocate_shm_region(&vc_ch->lmt_copy_buf, vc_ch->lmt_copy_buf_handle);
173        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
174
175        vc_ch->lmt_copy_buf->sender_present.val   = 0;
176        vc_ch->lmt_copy_buf->receiver_present.val = 0;
177
178        for (i = 0; i < NUM_BUFS; ++i)
179            vc_ch->lmt_copy_buf->len[i].val = 0;
180
181        OPA_store_int(&vc_ch->lmt_copy_buf->owner_info.val.rank, NO_OWNER);
182        vc_ch->lmt_copy_buf->owner_info.val.remote_req_id = MPI_REQUEST_NULL;
183        DBG_LMT(vc_ch->lmt_copy_buf->owner_info.val.ctr = 0);
184    }
185
186    /* send CTS with handle for copy buffer */
187    mpi_errno = MPIU_SHMW_Hnd_get_serialized_by_ref((vc_ch->lmt_copy_buf_handle), &ser_lmt_copy_buf_handle);
188    if(mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }
189   
190    MPID_nem_lmt_send_CTS(vc, req, ser_lmt_copy_buf_handle, (int)strlen(ser_lmt_copy_buf_handle) + 1);
191
192    queue_initially_empty = LMT_SHM_Q_EMPTY(vc_ch->lmt_queue) && vc_ch->lmt_active_lmt == NULL;
193
194    MPIU_CHKPMEM_MALLOC (e, MPID_nem_lmt_shm_wait_element_t *, sizeof (MPID_nem_lmt_shm_wait_element_t), mpi_errno, "lmt wait queue element");
195    e->progress = lmt_shm_recv_progress;
196    e->req = req;
197    LMT_SHM_Q_ENQUEUE(&vc_ch->lmt_queue, e); /* MT: not thread safe */
198
199    /* make progress on that vc */
200    mpi_errno = lmt_shm_progress_vc(vc, &done);
201    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
202
203    /* MT: not thread safe, another thread may have enqueued another
204       lmt after we did, and added this vc to the progress list.  In
205       that case we would be adding the vc twice. */
206    if (!done && queue_initially_empty)
207    {
208        /* lmt send didn't finish, enqueue it to be completed later */
209        lmt_shm_prog_element_t *pe;
210
211        MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "lmt recv not finished:  enqueue");
212
213        MPIU_CHKPMEM_MALLOC (pe, lmt_shm_prog_element_t *, sizeof (lmt_shm_prog_element_t), mpi_errno, "lmt progress queue element");
214        pe->vc = vc;
215        LMT_SHM_L_ADD(pe);
216        MPID_nem_local_lmt_pending = TRUE;
217        MPIU_Assert(!vc_ch->lmt_enqueued);
218        vc_ch->lmt_enqueued = TRUE;
219    }
220
221    MPIU_Assert(LMT_SHM_Q_EMPTY(vc_ch->lmt_queue) || !LMT_SHM_L_EMPTY());
222
223    MPIU_CHKPMEM_COMMIT();
224 fn_exit:
225    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_SHM_START_RECV);
226    return mpi_errno;
227 fn_fail:
228    MPIU_CHKPMEM_REAP();
229    goto fn_exit;
230}
231
232#undef FUNCNAME
233#define FUNCNAME MPID_nem_lmt_shm_start_send
234#undef FCNAME
235#define FCNAME MPIDI_QUOTE(FUNCNAME)
236int MPID_nem_lmt_shm_start_send(MPIDI_VC_t *vc, MPID_Request *req, MPID_IOV r_cookie)
237{
238    int mpi_errno = MPI_SUCCESS;
239    int done = FALSE;
240    int queue_initially_empty;
241    MPID_nem_lmt_shm_wait_element_t *e;
242    MPIDI_CH3I_VC *vc_ch = (MPIDI_CH3I_VC *)vc->channel_private;
243    MPIU_CHKPMEM_DECL(3);
244    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_SHM_START_SEND);
245
246    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_SHM_START_SEND);
247
248    if (vc_ch->lmt_copy_buf == NULL){
249        mpi_errno = MPIU_SHMW_Hnd_deserialize(vc_ch->lmt_copy_buf_handle, r_cookie.MPID_IOV_BUF, strlen(r_cookie.MPID_IOV_BUF));
250        if(mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }
251
252        mpi_errno = MPID_nem_attach_shm_region(&vc_ch->lmt_copy_buf, vc_ch->lmt_copy_buf_handle);
253        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
254        MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "attached to remote copy_buf");
255    }
256    else{
257        char *ser_lmt_copy_buf_handle=NULL;
258        mpi_errno = MPIU_SHMW_Hnd_get_serialized_by_ref(vc_ch->lmt_copy_buf_handle, &ser_lmt_copy_buf_handle);
259        if(mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }
260        if (strncmp(ser_lmt_copy_buf_handle, r_cookie.MPID_IOV_BUF, r_cookie.MPID_IOV_LEN) < 0){
261            /* Each side allocated its own buffer, lexicographically lower valued buffer handle is deleted */
262            mpi_errno = MPID_nem_delete_shm_region(&(vc_ch->lmt_copy_buf), &(vc_ch->lmt_copy_buf_handle));
263            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
264
265            vc_ch->lmt_copy_buf = NULL;
266
267            /* The shared memory handle is not valid any more -- so get a new shm handle */
268            mpi_errno = MPIU_SHMW_Hnd_init(&vc_ch->lmt_copy_buf_handle);
269            if(mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }
270
271            mpi_errno = MPIU_SHMW_Hnd_deserialize(vc_ch->lmt_copy_buf_handle, r_cookie.MPID_IOV_BUF, strlen(r_cookie.MPID_IOV_BUF));
272            if(mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }
273
274            mpi_errno = MPID_nem_attach_shm_region(&vc_ch->lmt_copy_buf, vc_ch->lmt_copy_buf_handle);
275            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
276
277            LMT_SHM_Q_ENQUEUE_AT_HEAD(&vc_ch->lmt_queue, vc_ch->lmt_active_lmt); /* MT: not thread safe */
278            vc_ch->lmt_active_lmt = NULL;
279        }
280
281        MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "deleted my copy_buf and attached to remote");
282    }
283
284    queue_initially_empty = LMT_SHM_Q_EMPTY(vc_ch->lmt_queue) && vc_ch->lmt_active_lmt == NULL;
285
286    MPIU_CHKPMEM_MALLOC (e, MPID_nem_lmt_shm_wait_element_t *, sizeof (MPID_nem_lmt_shm_wait_element_t), mpi_errno, "lmt wait queue element");
287    e->progress = lmt_shm_send_progress;
288    e->req = req;
289    LMT_SHM_Q_ENQUEUE(&vc_ch->lmt_queue, e); /* MT: not thread safe */
290
291    /* make progress on that vc */
292    mpi_errno = lmt_shm_progress_vc(vc, &done);
293    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
294
295    /* MT: not thread safe, another thread may have enqueued another
296       lmt after we did, and added this vc to the progress list.  In
297       that case we would be adding the vc twice. */
298    if (!done && queue_initially_empty)
299    {
300        /* lmt send didn't finish, enqueue it to be completed later */
301        lmt_shm_prog_element_t *pe;
302
303        MPIU_CHKPMEM_MALLOC (pe, lmt_shm_prog_element_t *, sizeof (lmt_shm_prog_element_t), mpi_errno, "lmt progress queue element");
304        pe->vc = vc;
305        LMT_SHM_L_ADD(pe);
306        MPID_nem_local_lmt_pending = TRUE;
307        MPIU_Assert(!vc_ch->lmt_enqueued);
308        vc_ch->lmt_enqueued = TRUE;
309        MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "lmt send not finished:  enqueue");
310   }
311
312    MPIU_Assert(LMT_SHM_Q_EMPTY(vc_ch->lmt_queue) || !LMT_SHM_L_EMPTY());
313
314    MPIU_CHKPMEM_COMMIT();
315 fn_return:
316    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_SHM_START_SEND);
317    return mpi_errno;
318 fn_fail:
319    MPIU_CHKPMEM_REAP();
320    goto fn_return;
321}
322
323#undef FUNCNAME
324#define FUNCNAME get_next_req
325#undef FCNAME
326#define FCNAME MPIDI_QUOTE(FUNCNAME)
327static int get_next_req(MPIDI_VC_t *vc)
328{
329    int mpi_errno = MPI_SUCCESS;
330    MPIDI_CH3I_VC *vc_ch = (MPIDI_CH3I_VC *)vc->channel_private;
331    MPID_nem_copy_buf_t * const copy_buf = vc_ch->lmt_copy_buf;
332    int prev_owner_rank;
333    MPID_Request *req;
334    MPIDI_STATE_DECL(MPID_STATE_GET_NEXT_REQ);
335
336    MPIDI_FUNC_ENTER(MPID_STATE_GET_NEXT_REQ);
337
338    prev_owner_rank = OPA_cas_int(&copy_buf->owner_info.val.rank, NO_OWNER, MPIDI_Process.my_pg_rank);
339
340    if (prev_owner_rank == IN_USE || prev_owner_rank == MPIDI_Process.my_pg_rank)
341    {
342        /* last lmt is not complete (receiver still receiving */
343        MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "waiting for receiver");
344        DBG_LMT(MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "ctr=%d rank=%d", copy_buf->owner_info.val.ctr, vc->pg_rank)));
345        goto fn_exit;
346    }
347
348    if (prev_owner_rank == NO_OWNER)
349    {
350        int i;
351        /* successfully grabbed idle copy buf */
352        OPA_write_barrier();
353        for (i = 0; i < NUM_BUFS; ++i)
354            copy_buf->len[i].val = 0;
355
356        DBG_LMT(++copy_buf->owner_info.val.ctr);
357
358        OPA_write_barrier();
359
360        LMT_SHM_Q_DEQUEUE(&vc_ch->lmt_queue, &vc_ch->lmt_active_lmt);
361        copy_buf->owner_info.val.remote_req_id = vc_ch->lmt_active_lmt->req->ch.lmt_req_id;
362        MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "grabbed idle buf.  remote_req=%d local_req=%d", copy_buf->owner_info.val.remote_req_id, vc_ch->lmt_active_lmt->req->handle));
363        DBG_LMT(MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "ctr=%d rank=%d", copy_buf->owner_info.val.ctr, vc->pg_rank)));
364    }
365    else
366    {
367        /* copy buf is owned by the remote side */
368        /* remote side chooses next transfer */
369        int i = 0;
370
371        OPA_read_barrier();
372       
373        MPIU_DBG_STMT(CH3_CHANNEL, VERBOSE, if (copy_buf->owner_info.val.remote_req_id == MPI_REQUEST_NULL)
374                                                MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "waiting for owner rank=%d", vc->pg_rank));
375           
376        while (copy_buf->owner_info.val.remote_req_id == MPI_REQUEST_NULL)
377        {
378            if (i == LMT_POLLS_BEFORE_YIELD)
379            {
380                COND_Yield();
381                i = 0;
382            }
383            ++i;
384        }
385
386        OPA_read_barrier();
387        LMT_SHM_Q_SEARCH_REMOVE(&vc_ch->lmt_queue, copy_buf->owner_info.val.remote_req_id, &vc_ch->lmt_active_lmt);
388
389        MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "remote side owns buf.  local_req=%d", copy_buf->owner_info.val.remote_req_id);
390        DBG_LMT(MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "ctr=%d rank=%d", copy_buf->owner_info.val.ctr, vc->pg_rank)));
391
392        if (vc_ch->lmt_active_lmt == NULL)
393        {
394            /* request not found  */
395            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "request not found in lmt queue");
396            goto fn_exit;
397        }
398
399        /* found request, clear remote_req_id field to prevent this buffer from matching future reqs */
400        copy_buf->owner_info.val.remote_req_id = MPI_REQUEST_NULL;
401
402        OPA_store_int(&vc_ch->lmt_copy_buf->owner_info.val.rank, IN_USE);
403    }
404
405    req = vc_ch->lmt_active_lmt->req;
406    if (req->dev.segment_ptr == NULL)
407    {
408        /* Check to see if we've already allocated a seg for this req.
409           This can happen if both sides allocated copy buffers, and
410           we decided to use the remote side's buffer. */
411        req->dev.segment_ptr = MPID_Segment_alloc();
412        MPIU_ERR_CHKANDJUMP1((req->dev.segment_ptr == NULL), mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment_alloc");
413        MPID_Segment_init(req->dev.user_buf, req->dev.user_count, req->dev.datatype, req->dev.segment_ptr, 0);
414        req->dev.segment_first = 0;
415    }
416    vc_ch->lmt_buf_num = 0;
417    vc_ch->lmt_surfeit = 0;
418
419 fn_exit:
420    MPIDI_FUNC_EXIT(MPID_STATE_GET_NEXT_REQ);
421    return mpi_errno;
422 fn_fail:
423    goto fn_exit;
424}
425
426/* The message is copied in a pipelined fashion.  There are NUM_BUFS
427   buffers to copy through.  The sender waits until there is an empty
428   buffer, then fills it in and marks the number of bytes copied.
429   Note that because segment_pack() copies on basic-datatype granularity,
430   (i.e., won't copy three bytes of an int) we may not fill the entire
431   buffer each time. */
432
433#undef FUNCNAME
434#define FUNCNAME lmt_shm_send_progress
435#undef FCNAME
436#define FCNAME MPIDI_QUOTE(FUNCNAME)
437static int lmt_shm_send_progress(MPIDI_VC_t *vc, MPID_Request *req, int *done)
438{
439    int mpi_errno = MPI_SUCCESS;
440    MPIDI_CH3I_VC *vc_ch = (MPIDI_CH3I_VC *)vc->channel_private;
441    MPID_nem_copy_buf_t * const copy_buf = vc_ch->lmt_copy_buf;
442    MPIDI_msg_sz_t first;
443    MPIDI_msg_sz_t last;
444    int buf_num;
445    MPIDI_msg_sz_t data_sz, copy_limit;
446    MPIDI_STATE_DECL(MPID_STATE_LMT_SHM_SEND_PROGRESS);
447
448    MPIDI_FUNC_ENTER(MPID_STATE_LMT_SHM_SEND_PROGRESS);
449
450    DBG_LMT(MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "ctr=%d rank=%d", copy_buf->owner_info.val.ctr, vc->pg_rank)));
451
452    copy_buf->sender_present.val = TRUE;
453
454    MPIU_Assert(req == vc_ch->lmt_active_lmt->req);
455/*     MPIU_Assert(MPIDI_Request_get_type(req) == MPIDI_REQUEST_TYPE_SEND); */
456
457    data_sz = req->ch.lmt_data_sz;
458    buf_num = vc_ch->lmt_buf_num;
459    first = req->dev.segment_first;
460
461    do
462    {
463        int i;
464        /* If the buffer is full, wait.  If the receiver is actively
465           working on this transfer, yield the processor and keep
466           waiting, otherwise wait for a bounded amount of time. */
467        i = 0;
468        while (copy_buf->len[buf_num].val != 0)
469        {
470            if (i == LMT_POLLS_BEFORE_YIELD)
471            {
472                if (copy_buf->receiver_present.val)
473                {
474                    COND_Yield();
475                    i = 0;
476                }
477                else
478                {
479                    req->dev.segment_first = first;
480                    vc_ch->lmt_buf_num = buf_num;
481                    *done = FALSE;
482                    MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "first=" MPIDI_MSG_SZ_FMT " data_sz="MPIDI_MSG_SZ_FMT, first, data_sz));       
483                    MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "Waiting on full buffer %d", buf_num);
484                    goto fn_exit;
485                }
486            }
487
488            ++i;
489        }
490
491        OPA_read_write_barrier();
492
493
494        /* we have a free buffer, fill it */
495        if (data_sz <= PIPELINE_THRESHOLD)
496            copy_limit = PIPELINE_MAX_SIZE;
497        else
498            copy_limit = MPID_NEM_COPY_BUF_LEN;
499        last = (data_sz - first <= copy_limit) ? data_sz : first + copy_limit;
500        MPID_Segment_pack(req->dev.segment_ptr, first, &last, (void *)copy_buf->buf[buf_num]); /* cast away volatile */
501        OPA_write_barrier();
502        copy_buf->len[buf_num].val = last - first;
503
504        first = last;
505        buf_num = (buf_num+1) % NUM_BUFS;
506
507        MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "sent data.  last=" MPIDI_MSG_SZ_FMT " data_sz=" MPIDI_MSG_SZ_FMT, last, data_sz));       
508    }
509    while (last < data_sz);
510
511    *done = TRUE;
512    MPIDI_CH3U_Request_complete(req);
513    MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "completed req local_req=%d", req->handle);
514
515
516 fn_exit:
517    copy_buf->sender_present.val = FALSE;
518    MPIDI_FUNC_EXIT(MPID_STATE_LMT_SHM_SEND_PROGRESS);
519    return mpi_errno;
520}
521
522/* Continued from note for lmt_shm_send_progress() above.  The
523   receiver uses segment_unpack(), and just like segment_pack() it may
524   not copy all the data you ask it to.  This means that we may leave
525   some data in the buffer uncopied.  To handle this, we copy the
526   remaining data to just before the next buffer.  Then when the next
527   buffer is filled, we start copying where we just copied the
528   leftover data from last time.  */
529
530#undef FUNCNAME
531#define FUNCNAME lmt_shm_recv_progress
532#undef FCNAME
533#define FCNAME MPIDI_QUOTE(FUNCNAME)
534static int lmt_shm_recv_progress(MPIDI_VC_t *vc, MPID_Request *req, int *done)
535{
536    int mpi_errno = MPI_SUCCESS;
537    MPIDI_CH3I_VC *vc_ch = (MPIDI_CH3I_VC *)vc->channel_private;
538    MPID_nem_copy_buf_t * const copy_buf = vc_ch->lmt_copy_buf;
539    MPIDI_msg_sz_t first;
540    MPIDI_msg_sz_t last, expected_last;
541    int buf_num;
542    MPIDI_msg_sz_t data_sz, len;
543    int i;
544    MPIDI_msg_sz_t surfeit;
545    char *src_buf;
546    char tmpbuf[MPID_NEM_CACHE_LINE_LEN];
547    MPIDI_STATE_DECL(MPID_STATE_LMT_SHM_RECV_PROGRESS);
548
549    MPIDI_FUNC_ENTER(MPID_STATE_LMT_SHM_RECV_PROGRESS);
550
551    DBG_LMT(MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "ctr=%d rank=%d", copy_buf->owner_info.val.ctr, vc->pg_rank)));
552
553    copy_buf->receiver_present.val = TRUE;
554
555    surfeit = vc_ch->lmt_surfeit;
556    data_sz = req->ch.lmt_data_sz;
557    buf_num = vc_ch->lmt_buf_num;
558    first = req->dev.segment_first;
559
560    do
561    {
562        /* If the buffer is empty, wait.  If the sender is actively
563           working on this transfer, yield the processor and keep
564           waiting, otherwise wait for a bounded amount of time. */
565        i = 0;
566        while ((len = copy_buf->len[buf_num].val) == 0)
567        {
568            if (i == LMT_POLLS_BEFORE_YIELD)
569            {
570                if (copy_buf->sender_present.val)
571                {
572                    COND_Yield();
573                    i = 0;
574                }
575                else
576                {
577                    req->dev.segment_first = first;
578                    vc_ch->lmt_buf_num = buf_num;
579                    vc_ch->lmt_surfeit = surfeit;
580                    *done = FALSE;
581                    MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "first=" MPIDI_MSG_SZ_FMT " data_sz=" MPIDI_MSG_SZ_FMT, first, data_sz));
582                    MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "Waiting on empty buffer %d", buf_num);
583                    goto fn_exit;
584                }
585            }
586
587            ++i;
588        }
589
590        OPA_read_barrier();
591
592        /* unpack data including any leftover from the previous buffer */
593        src_buf = ((char *)copy_buf->buf[buf_num]) - surfeit; /* cast away volatile */
594        last = expected_last = (data_sz - first <= surfeit + len) ? data_sz : first + surfeit + len;
595
596        MPID_Segment_unpack(req->dev.segment_ptr, first, &last, src_buf);
597
598        MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "recvd data.  last=" MPIDI_MSG_SZ_FMT " data_sz=" MPIDI_MSG_SZ_FMT, last, data_sz));
599
600        if (surfeit && buf_num > 0)
601        {
602            /* we had leftover data from the previous buffer, we can
603               now mark that buffer as empty */
604
605            OPA_read_write_barrier();
606            copy_buf->len[(buf_num-1)].val = 0;
607            /* Make sure we copied at least the leftover data from last time */
608            MPIU_Assert(last - first > surfeit);
609
610            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "freed previous buffer");
611       }
612
613        if (last < expected_last)
614        {
615            /* we have leftover data in the buffer that we couldn't copy out */
616            char *surfeit_ptr;
617
618            surfeit_ptr = (char *)src_buf + last - first;
619            surfeit = expected_last - last;
620
621            if (buf_num == NUM_BUFS-1)
622            {
623                /* if we're wrapping back to buf 0, then we can copy it directly */
624                MPIU_Memcpy(((char *)copy_buf->buf[0]) - surfeit, surfeit_ptr, surfeit);
625
626                OPA_read_write_barrier();
627                copy_buf->len[buf_num].val = 0;
628            }
629            else
630            {
631                /* otherwise, we need to copy to a tmpbuf first to make sure the src and dest addresses don't overlap */
632                MPIU_Memcpy(tmpbuf, surfeit_ptr, surfeit);
633                MPIU_Memcpy(((char *)copy_buf->buf[buf_num+1]) - surfeit, tmpbuf, surfeit);
634            }
635
636            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "copied leftover data");
637        }
638        else
639        {
640            /* all data was unpacked, we can mark this buffer as empty */
641            surfeit = 0;
642
643            OPA_read_write_barrier();
644            copy_buf->len[buf_num].val = 0;
645        }
646
647        first = last;
648        buf_num = (buf_num+1) % NUM_BUFS;
649    }
650    while (last < data_sz);
651
652    for (i = 0; i < NUM_BUFS; ++i)
653        copy_buf->len[i].val = 0;
654
655    MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "completed request local_req=%d", req->handle);
656    OPA_write_barrier();
657    OPA_store_int(&copy_buf->owner_info.val.rank, NO_OWNER);
658
659    *done = TRUE;
660    MPIDI_CH3U_Request_complete(req);
661
662 fn_exit:
663    copy_buf->receiver_present.val = FALSE;
664    MPIDI_FUNC_EXIT(MPID_STATE_LMT_SHM_RECV_PROGRESS);
665    return mpi_errno;
666}
667
668#undef FUNCNAME
669#define FUNCNAME MPID_nem_lmt_shm_handle_cookie
670#undef FCNAME
671#define FCNAME MPIDI_QUOTE(FUNCNAME)
672int MPID_nem_lmt_shm_handle_cookie(MPIDI_VC_t *vc, MPID_Request *req, MPID_IOV cookie)
673{
674    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_SHM_HANDLE_COOKIE);
675
676    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_SHM_HANDLE_COOKIE);
677
678    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_SHM_HANDLE_COOKIE);
679    return MPI_SUCCESS;
680}
681
682#undef FUNCNAME
683#define FUNCNAME MPID_nem_lmt_shm_done_send
684#undef FCNAME
685#define FCNAME MPIDI_QUOTE(FUNCNAME)
686int MPID_nem_lmt_shm_done_send(MPIDI_VC_t *vc, MPID_Request *req)
687{
688    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_SHM_DONE_SEND);
689
690    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_SHM_DONE_SEND);
691
692    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_SHM_DONE_SEND);
693    return MPI_SUCCESS;
694}
695
696#undef FUNCNAME
697#define FUNCNAME MPID_nem_lmt_shm_done_recv
698#undef FCNAME
699#define FCNAME MPIDI_QUOTE(FUNCNAME)
700int MPID_nem_lmt_shm_done_recv(MPIDI_VC_t *vc, MPID_Request *req)
701{
702    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_SHM_DONE_RECV);
703
704    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_SHM_DONE_RECV);
705
706    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_SHM_DONE_RECV);
707    return MPI_SUCCESS;
708}
709
710#undef FUNCNAME
711#define FUNCNAME MPID_nem_lmt_shm_progress_vc
712#undef FCNAME
713#define FCNAME MPIDI_QUOTE(FUNCNAME)
714static inline int lmt_shm_progress_vc(MPIDI_VC_t *vc, int *done)
715{
716    int mpi_errno = MPI_SUCCESS;
717    int done_req = FALSE;
718    MPID_nem_lmt_shm_wait_element_t *we;
719    MPIDI_CH3I_VC *vc_ch = (MPIDI_CH3I_VC *)vc->channel_private;
720    MPIDI_STATE_DECL(MPID_STATE_LMT_SHM_PROGRESS_VC);
721
722    MPIDI_FUNC_ENTER(MPID_STATE_LMT_SHM_PROGRESS_VC);
723
724    *done = FALSE;
725
726    if (vc_ch->lmt_active_lmt == NULL)
727    {
728        mpi_errno = get_next_req(vc);
729        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
730
731        if (vc_ch->lmt_active_lmt == NULL)
732        {
733            /* couldn't find an appropriate request, try again later */
734            goto fn_exit;
735        }
736    }
737
738    we = vc_ch->lmt_active_lmt;
739    mpi_errno = we->progress(vc, we->req, &done_req);
740    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
741
742    if (done_req)
743    {
744        MPIU_Free(vc_ch->lmt_active_lmt);
745        vc_ch->lmt_active_lmt = NULL;
746
747        if (LMT_SHM_Q_EMPTY(vc_ch->lmt_queue))
748            *done = TRUE;
749    }
750
751 fn_exit:
752    MPIDI_FUNC_EXIT(MPID_STATE_LMT_SHM_PROGRESS_VC);
753    return mpi_errno;
754 fn_fail:
755    goto fn_exit;
756}
757
758#undef FUNCNAME
759#define FUNCNAME MPID_nem_lmt_shm_progress
760#undef FCNAME
761#define FCNAME MPIDI_QUOTE(FUNCNAME)
762int MPID_nem_lmt_shm_progress(void)
763{
764    int mpi_errno = MPI_SUCCESS;
765    lmt_shm_prog_element_t *pe;
766    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_SHM_PROGRESS);
767
768    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_SHM_PROGRESS);
769
770    pe = LMT_SHM_L_HEAD();
771
772    while (pe)
773    {
774        int done = FALSE;
775
776        mpi_errno = lmt_shm_progress_vc(pe->vc, &done);
777        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
778
779        if (done)
780        {
781            lmt_shm_prog_element_t *f;
782            MPIU_Assert(LMT_SHM_Q_EMPTY(((MPIDI_CH3I_VC *)pe->vc->channel_private)->lmt_queue));
783            MPIU_Assert(((MPIDI_CH3I_VC *)pe->vc->channel_private)->lmt_active_lmt == NULL);
784            MPIU_Assert(((MPIDI_CH3I_VC *)pe->vc->channel_private)->lmt_enqueued);
785            ((MPIDI_CH3I_VC *)pe->vc->channel_private)->lmt_enqueued = FALSE;
786
787            f = pe;
788            pe = pe->next;
789            LMT_SHM_L_REMOVE(f);
790            MPIU_Free(f);
791        }
792        else
793            pe = pe->next;
794    }
795
796    if (LMT_SHM_L_EMPTY())
797        MPID_nem_local_lmt_pending = FALSE;
798
799 fn_exit:
800    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_SHM_PROGRESS);
801    return mpi_errno;
802 fn_fail:
803    goto fn_exit;
804}
805
806#undef FUNCNAME
807#define FUNCNAME MPID_nem_allocate_shm_region
808#undef FCNAME
809#define FCNAME MPIDI_QUOTE(FUNCNAME)
810static int MPID_nem_allocate_shm_region(MPID_nem_copy_buf_t **buf_p, MPIU_SHMW_Hnd_t handle)
811{
812    int mpi_errno = MPI_SUCCESS;
813    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_ALLOCATE_SHM_REGION);
814
815    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_ALLOCATE_SHM_REGION);
816
817    if (*buf_p)
818    {
819        /* we're already attached */
820        goto fn_exit;
821    }
822
823    mpi_errno = MPIU_SHMW_Seg_create_and_attach(handle, sizeof(MPID_nem_copy_buf_t), (char **)buf_p, 0);
824    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
825
826 fn_exit:
827    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_ALLOCATE_SHM_REGION);
828    return mpi_errno;
829 fn_fail:
830    goto fn_exit;
831}
832
833#undef FUNCNAME
834#define FUNCNAME MPID_nem_attach_shm_region
835#undef FCNAME
836#define FCNAME MPIDI_QUOTE(FUNCNAME)
837static int MPID_nem_attach_shm_region(MPID_nem_copy_buf_t **buf_p, MPIU_SHMW_Hnd_t handle)
838{
839    int mpi_errno = MPI_SUCCESS;
840    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_ATTACH_SHM_REGION);
841
842    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_ATTACH_SHM_REGION);
843
844    if(*buf_p)
845    {
846        /* we're already attached */
847        goto fn_exit;
848    }
849
850    mpi_errno = MPIU_SHMW_Seg_attach(handle, sizeof(MPID_nem_copy_buf_t), (char **)buf_p, 0);
851    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
852
853    mpi_errno = MPIU_SHMW_Seg_remove(handle);
854    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
855
856 fn_exit:
857    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_ATTACH_SHM_REGION);
858    return mpi_errno;
859 fn_fail:
860    goto fn_exit;
861}
862
863#undef FUNCNAME
864#define FUNCNAME MPID_nem_detach_shm_region
865#undef FCNAME
866#define FCNAME MPIDI_QUOTE(FUNCNAME)
867static int MPID_nem_detach_shm_region(MPID_nem_copy_buf_t **buf_p, MPIU_SHMW_Hnd_t handle)
868{
869    int mpi_errno = MPI_SUCCESS;
870    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_DETACH_SHM_REGION);
871
872    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_DETACH_SHM_REGION);
873
874    mpi_errno = MPIU_SHMW_Seg_detach(handle, (char **)buf_p, sizeof(MPID_nem_copy_buf_t));
875    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
876
877 fn_exit:
878    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_DETACH_SHM_REGION);
879    return mpi_errno;
880 fn_fail:
881    goto fn_exit;
882}
883
884#undef FUNCNAME
885#define FUNCNAME MPID_nem_delete_shm_region
886#undef FCNAME
887#define FCNAME MPIDI_QUOTE(FUNCNAME)
888static int MPID_nem_delete_shm_region(MPID_nem_copy_buf_t **buf_p, MPIU_SHMW_Hnd_t *handle_p)
889{
890    int mpi_errno = MPI_SUCCESS;
891    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_DELETE_SHM_REGION);
892
893    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_DELETE_SHM_REGION);
894
895    mpi_errno = MPIU_SHMW_Seg_remove(*handle_p);
896    if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }
897
898    mpi_errno = MPID_nem_detach_shm_region(buf_p, *handle_p); 
899    if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }
900
901    mpi_errno = MPIU_SHMW_Hnd_finalize(handle_p);
902    if(mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }
903
904 fn_exit:
905    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_DELETE_SHM_REGION);
906    return mpi_errno;
907 fn_fail:
908    goto fn_exit;
909}
Note: See TracBrowser for help on using the browser.