root/mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_wrcoll.c @ 4889

Revision 4889, 33.2 KB (checked in by robl, 5 months ago)

From Pascal Deveze <pascal.deveze@…>: Update lustre for ROMIO's 64-bit extent work (use ADIO_Offset instead of ints)

Line 
1/* -*- Mode: C; c-basic-offset:4 ; -*- */
2/*
3 *   Copyright (C) 1997 University of Chicago.
4 *   See COPYRIGHT notice in top-level directory.
5 *
6 *   Copyright (C) 2007 Oak Ridge National Laboratory
7 *
8 *   Copyright (C) 2008 Sun Microsystems, Lustre group
9 */
10
11#include "ad_lustre.h"
12#include "adio_extern.h"
13
14/* prototypes of functions used for collective writes only. */
15static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, void *buf,
16                                        MPI_Datatype datatype, int nprocs,
17                                        int myrank,
18                                        ADIOI_Access *others_req,
19                                        ADIOI_Access *my_req,
20                                        ADIO_Offset *offset_list,
21                                        ADIO_Offset *len_list,
22                                        int contig_access_count,
23                                        int *striping_info,
24                                        int *buf_idx, int *error_code);
25static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, void *buf,
26                                          ADIOI_Flatlist_node *flat_buf,
27                                          char **send_buf,
28                                          ADIO_Offset *offset_list,
29                                          ADIO_Offset *len_list, int *send_size,
30                                          MPI_Request *requests,
31                                          int *sent_to_proc, int nprocs,
32                                          int myrank, int contig_access_count,
33                                          int *striping_info,
34                                          int *send_buf_idx,
35                                          int *curr_to_proc,
36                                          int *done_to_proc, int iter,
37                                          MPI_Aint buftype_extent);
38static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, void *buf,
39                                         char *write_buf,
40                                         ADIOI_Flatlist_node *flat_buf,
41                                         ADIO_Offset *offset_list,
42                                         ADIO_Offset *len_list, int *send_size,
43                                         int *recv_size, ADIO_Offset off,
44                                         int size, int *count,
45                                         int *start_pos, int *partial_recv,
46                                         int *sent_to_proc, int nprocs,
47                                         int myrank, int buftype_is_contig,
48                                         int contig_access_count,
49                                         int *striping_info,
50                                         ADIOI_Access *others_req,
51                                         int *send_buf_idx,
52                                         int *curr_to_proc,
53                                         int *done_to_proc, int *hole,
54                                         int iter, MPI_Aint buftype_extent,
55                                         int *buf_idx, int *error_code);
56void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
57                      ADIO_Offset *srt_off, int *srt_len, int *start_pos,
58                      int nprocs, int nprocs_recv, int total_elements);
59
60void ADIOI_LUSTRE_WriteStridedColl(ADIO_File fd, void *buf, int count,
61                                   MPI_Datatype datatype,
62                                   int file_ptr_type, ADIO_Offset offset,
63                                   ADIO_Status *status, int *error_code)
64{
65    /* Uses a generalized version of the extended two-phase method described
66     * in "An Extended Two-Phase Method for Accessing Sections of
67     * Out-of-Core Arrays", Rajeev Thakur and Alok Choudhary,
68     * Scientific Programming, (5)4:301--317, Winter 1996.
69     * http://www.mcs.anl.gov/home/thakur/ext2ph.ps
70     */
71
72    ADIOI_Access *my_req;
73    /* array of nprocs access structures, one for each other process has
74       this process's request */
75
76    ADIOI_Access *others_req;
77    /* array of nprocs access structures, one for each other process
78       whose request is written by this process. */
79
80    int i, filetype_is_contig, nprocs, myrank, do_collect = 0;
81    int contig_access_count = 0, buftype_is_contig, interleave_count = 0;
82    int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
83    ADIO_Offset orig_fp, start_offset, end_offset, off;
84    ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *end_offsets = NULL;
85    ADIO_Offset *len_list = NULL;
86    int *buf_idx = NULL, *striping_info = NULL;
87    int old_error, tmp_error;
88
89    MPI_Comm_size(fd->comm, &nprocs);
90    MPI_Comm_rank(fd->comm, &myrank);
91
92    orig_fp = fd->fp_ind;
93
94    /* IO patten identification if cb_write isn't disabled */
95    if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
96        /* For this process's request, calculate the list of offsets and
97           lengths in the file and determine the start and end offsets. */
98
99        /* Note: end_offset points to the last byte-offset that will be accessed.
100         * e.g., if start_offset=0 and 100 bytes to be read, end_offset=99
101         */
102
103        ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
104                              &offset_list, &len_list, &start_offset,
105                              &end_offset, &contig_access_count);
106
107        /* each process communicates its start and end offsets to other
108         * processes. The result is an array each of start and end offsets
109         * stored in order of process rank.
110         */
111        st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset));
112        end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset));
113        MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
114                      ADIO_OFFSET, fd->comm);
115        MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
116                      ADIO_OFFSET, fd->comm);
117        /* are the accesses of different processes interleaved? */
118        for (i = 1; i < nprocs; i++)
119            if ((st_offsets[i] < end_offsets[i-1]) &&
120                (st_offsets[i] <= end_offsets[i]))
121                interleave_count++;
122        /* This is a rudimentary check for interleaving, but should suffice
123           for the moment. */
124
125        /* Two typical access patterns can benefit from collective write.
126         *   1) the processes are interleaved, and
127         *   2) the req size is small.
128         */
129        if (interleave_count > 0) {
130            do_collect = 1;
131        } else {
132            do_collect = ADIOI_LUSTRE_Docollect(fd, contig_access_count,
133                                                len_list, nprocs);
134        }
135    }
136    ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
137
138    /* Decide if collective I/O should be done */
139    if ((!do_collect && fd->hints->cb_write == ADIOI_HINT_AUTO) ||
140        fd->hints->cb_write == ADIOI_HINT_DISABLE) {
141
142        int filerange_is_contig = 0;
143
144        /* use independent accesses */
145        if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
146            ADIOI_Free(offset_list);
147            ADIOI_Free(len_list);
148            ADIOI_Free(st_offsets);
149            ADIOI_Free(end_offsets);
150        }
151
152        fd->fp_ind = orig_fp;
153        ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
154        if (buftype_is_contig && filetype_is_contig) {
155            if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
156                off = fd->disp + (fd->etype_size) * offset;
157                ADIO_WriteContig(fd, buf, count, datatype,
158                                 ADIO_EXPLICIT_OFFSET,
159                                 off, status, error_code);
160            } else
161                ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
162                                 0, status, error_code);
163        } else {
164            ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
165                              offset, status, error_code);
166        }
167        return;
168    }
169
170    /* Get Lustre hints information */
171    ADIOI_LUSTRE_Get_striping_info(fd, &striping_info, 1);
172
173    /* calculate what portions of the access requests of this process are
174     * located in which process
175     */
176    ADIOI_LUSTRE_Calc_my_req(fd, offset_list, len_list, contig_access_count,
177                             striping_info, nprocs, &count_my_req_procs,
178                             &count_my_req_per_proc, &my_req, &buf_idx);
179
180    /* based on everyone's my_req, calculate what requests of other processes
181     * will be accessed by this process.
182     * count_others_req_procs = number of processes whose requests (including
183     * this process itself) will be accessed by this process
184     * count_others_req_per_proc[i] indicates how many separate contiguous
185     * requests of proc. i will be accessed by this process.
186     */
187
188    ADIOI_Calc_others_req(fd, count_my_req_procs, count_my_req_per_proc,
189                          my_req, nprocs, myrank, &count_others_req_procs,
190                          &others_req);
191    ADIOI_Free(count_my_req_per_proc);
192
193    /* exchange data and write in sizes of no more than stripe_size. */
194    ADIOI_LUSTRE_Exch_and_write(fd, buf, datatype, nprocs, myrank,
195                                others_req, my_req,
196                                offset_list, len_list, contig_access_count,
197                                striping_info, buf_idx, error_code);
198
199    /* If this collective write is followed by an independent write,
200     * it's possible to have those subsequent writes on other processes
201     * race ahead and sneak in before the read-modify-write completes.
202     * We carry out a collective communication at the end here so no one
203     * can start independent i/o before collective I/O completes.
204     *
205     * need to do some gymnastics with the error codes so that if something
206     * went wrong, all processes report error, but if a process has a more
207     * specific error code, we can still have that process report the
208     * additional information */
209
210    old_error = *error_code;
211    if (*error_code != MPI_SUCCESS)
212        *error_code = MPI_ERR_IO;
213
214    /* optimization: if only one process performing i/o, we can perform
215     * a less-expensive Bcast  */
216#ifdef ADIOI_MPE_LOGGING
217    MPE_Log_event(ADIOI_MPE_postwrite_a, 0, NULL);
218#endif
219    if (fd->hints->cb_nodes == 1)
220        MPI_Bcast(error_code, 1, MPI_INT,
221                  fd->hints->ranklist[0], fd->comm);
222    else {
223        tmp_error = *error_code;
224        MPI_Allreduce(&tmp_error, error_code, 1, MPI_INT,
225                      MPI_MAX, fd->comm);
226    }
227#ifdef ADIOI_MPE_LOGGING
228    MPE_Log_event(ADIOI_MPE_postwrite_b, 0, NULL);
229#endif
230
231    if ((old_error != MPI_SUCCESS) && (old_error != MPI_ERR_IO))
232        *error_code = old_error;
233
234
235    if (!buftype_is_contig)
236        ADIOI_Delete_flattened(datatype);
237
238    /* free all memory allocated for collective I/O */
239    /* free others_req */
240    for (i = 0; i < nprocs; i++) {
241        if (others_req[i].count) {
242            ADIOI_Free(others_req[i].offsets);
243            ADIOI_Free(others_req[i].lens);
244            ADIOI_Free(others_req[i].mem_ptrs);
245        }
246    }
247    ADIOI_Free(others_req);
248    /* free my_req here */
249    for (i = 0; i < nprocs; i++) {
250        if (my_req[i].count) {
251            ADIOI_Free(my_req[i].offsets);
252            ADIOI_Free(my_req[i].lens);
253        }
254    }
255    ADIOI_Free(my_req);
256    ADIOI_Free(buf_idx);
257    ADIOI_Free(offset_list);
258    ADIOI_Free(len_list);
259    ADIOI_Free(st_offsets);
260    ADIOI_Free(end_offsets);
261    ADIOI_Free(striping_info);
262
263#ifdef HAVE_STATUS_SET_BYTES
264    if (status) {
265        int bufsize, size;
266        /* Don't set status if it isn't needed */
267        MPI_Type_size(datatype, &size);
268        bufsize = size * count;
269        MPIR_Status_set_bytes(status, datatype, bufsize);
270    }
271    /* This is a temporary way of filling in status. The right way is to
272     * keep track of how much data was actually written during collective I/O.
273     */
274#endif
275
276    fd->fp_sys_posn = -1;       /* set it to null. */
277}
278
279/* If successful, error_code is set to MPI_SUCCESS.  Otherwise an error
280 * code is created and returned in error_code.
281 */
282static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, void *buf,
283                                        MPI_Datatype datatype, int nprocs,
284                                        int myrank, ADIOI_Access *others_req,
285                                        ADIOI_Access *my_req,
286                                        ADIO_Offset *offset_list,
287                                        ADIO_Offset *len_list, 
288                                        int contig_access_count,
289                                        int *striping_info, int *buf_idx,
290                                        int *error_code)
291{
292    /* Send data to appropriate processes and write in sizes of no more
293     * than lustre stripe_size.
294     * The idea is to reduce the amount of extra memory required for
295     * collective I/O. If all data were written all at once, which is much
296     * easier, it would require temp space more than the size of user_buf,
297     * which is often unacceptable. For example, to write a distributed
298     * array to a file, where each local array is 8Mbytes, requiring
299     * at least another 8Mbytes of temp space is unacceptable.
300     */
301
302    int hole, i, j, m, flag, ntimes = 1 , max_ntimes, buftype_is_contig;
303    ADIO_Offset st_loc = -1, end_loc = -1, min_st_loc, max_end_loc;
304    ADIO_Offset off, req_off, send_off, iter_st_off, *off_list;
305    ADIO_Offset max_size, step_size = 0;
306    int real_size, req_len, send_len;
307    int *recv_curr_offlen_ptr, *recv_count, *recv_size;
308    int *send_curr_offlen_ptr, *send_size;
309    int *partial_recv, *sent_to_proc, *recv_start_pos;
310    int *send_buf_idx, *curr_to_proc, *done_to_proc;
311    char *write_buf = NULL;
312    MPI_Status status;
313    ADIOI_Flatlist_node *flat_buf = NULL;
314    MPI_Aint buftype_extent;
315    int stripe_size = striping_info[0], avail_cb_nodes = striping_info[2];
316    int data_sieving = 0;
317
318    *error_code = MPI_SUCCESS;  /* changed below if error */
319    /* only I/O errors are currently reported */
320
321    /* calculate the number of writes of stripe size to be done.
322     * That gives the no. of communication phases as well.
323     * Note:
324     *   Because we redistribute data in stripe-contiguous pattern for Lustre,
325     *   each process has the same no. of communication phases.
326     */
327
328    for (i = 0; i < nprocs; i++) {
329        if (others_req[i].count) {
330            st_loc = others_req[i].offsets[0];
331            end_loc = others_req[i].offsets[0];
332            break;
333        }
334    }
335    for (i = 0; i < nprocs; i++) {
336        for (j = 0; j < others_req[i].count; j++) {
337            st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
338            end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j] +
339                                          others_req[i].lens[j] - 1));
340        }
341    }
342    /* this process does no writing. */
343    if ((st_loc == -1) && (end_loc == -1))
344        ntimes = 0;
345    MPI_Allreduce(&end_loc, &max_end_loc, 1, MPI_LONG_LONG_INT, MPI_MAX, fd->comm);
346    /* avoid min_st_loc be -1 */
347    if (st_loc == -1)
348        st_loc = max_end_loc;
349    MPI_Allreduce(&st_loc, &min_st_loc, 1, MPI_LONG_LONG_INT, MPI_MIN, fd->comm);
350    /* align downward */
351    min_st_loc -= min_st_loc % (ADIO_Offset)stripe_size;
352
353    /* Each time, only avail_cb_nodes number of IO clients perform IO,
354     * so, step_size=avail_cb_nodes*stripe_size IO will be performed at most,
355     * and ntimes=whole_file_portion/step_size
356     */
357    step_size = (ADIO_Offset) avail_cb_nodes * stripe_size;
358    max_ntimes = (int)((max_end_loc - min_st_loc) / step_size + 1);
359    if (ntimes)
360        write_buf = (char *) ADIOI_Malloc(stripe_size);
361
362    /* calculate the start offset for each iteration */
363    off_list = (ADIO_Offset *) ADIOI_Malloc(max_ntimes * sizeof(ADIO_Offset));
364    for (m = 0; m < max_ntimes; m ++)
365        off_list[m] = max_end_loc;
366    for (i = 0; i < nprocs; i++) {
367        for (j = 0; j < others_req[i].count; j ++) {
368            req_off = others_req[i].offsets[j];
369            m = (int)((req_off - min_st_loc) / step_size);
370            off_list[m] = ADIOI_MIN(off_list[m], req_off);
371        }
372    }
373
374    recv_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
375    send_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
376    /* their use is explained below. calloc initializes to 0. */
377
378    recv_count = (int *) ADIOI_Malloc(nprocs * sizeof(int));
379    /* to store count of how many off-len pairs per proc are satisfied
380       in an iteration. */
381
382    send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
383    /* total size of data to be sent to each proc. in an iteration.
384       Of size nprocs so that I can use MPI_Alltoall later. */
385
386    recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
387    /* total size of data to be recd. from each proc. in an iteration. */
388
389    sent_to_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
390    /* amount of data sent to each proc so far. Used in
391       ADIOI_Fill_send_buffer. initialized to 0 here. */
392
393    send_buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
394    curr_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
395    done_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
396    /* Above three are used in ADIOI_Fill_send_buffer */
397
398    recv_start_pos = (int *) ADIOI_Malloc(nprocs * sizeof(int));
399    /* used to store the starting value of recv_curr_offlen_ptr[i] in
400       this iteration */
401
402    ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
403    if (!buftype_is_contig) {
404        ADIOI_Flatten_datatype(datatype);
405        flat_buf = ADIOI_Flatlist;
406        while (flat_buf->type != datatype)
407            flat_buf = flat_buf->next;
408    }
409    MPI_Type_extent(datatype, &buftype_extent);
410    /* I need to check if there are any outstanding nonblocking writes to
411     * the file, which could potentially interfere with the writes taking
412     * place in this collective write call. Since this is not likely to be
413     * common, let me do the simplest thing possible here: Each process
414     * completes all pending nonblocking operations before completing.
415     */
416    /*ADIOI_Complete_async(error_code);
417    if (*error_code != MPI_SUCCESS) return;
418    MPI_Barrier(fd->comm);
419    */
420
421    iter_st_off = min_st_loc;
422
423    /* Although we have recognized the data according to OST index,
424     * a read-modify-write will be done if there is a hole between the data.
425     * For example: if blocksize=60, xfersize=30 and stripe_size=100,
426     * then rank0 will collect data [0, 30] and [60, 90] then write. There
427     * is a hole in [30, 60], which will cause a read-modify-write in [0, 90].
428     *
429     * To reduce its impact on the performance, we can disable data sieving
430     * by hint "ds_in_coll".
431     */
432    /* check the hint for data sieving */
433    data_sieving = fd->hints->fs_hints.lustre.ds_in_coll;
434
435    for (m = 0; m < max_ntimes; m++) {
436        /* go through all others_req and my_req to check which will be received
437         * and sent in this iteration.
438         */
439
440        /* Note that MPI guarantees that displacements in filetypes are in
441           monotonically nondecreasing order and that, for writes, the
442           filetypes cannot specify overlapping regions in the file. This
443           simplifies implementation a bit compared to reads. */
444
445        /*
446           off         = start offset in the file for the data to be written in
447                         this iteration
448           iter_st_off = start offset of this iteration
449           real_size   = size of data written (bytes) corresponding to off
450           max_size    = possible maximum size of data written in this iteration
451           req_off     = offset in the file for a particular contiguous request minus
452                         what was satisfied in previous iteration
453           send_off    = offset the request needed by other processes in this iteration
454           req_len     = size corresponding to req_off
455           send_len    = size corresponding to send_off
456         */
457
458        /* first calculate what should be communicated */
459        for (i = 0; i < nprocs; i++)
460            recv_count[i] = recv_size[i] = send_size[i] = 0;
461
462        off = off_list[m];
463        max_size = ADIOI_MIN(step_size, max_end_loc - iter_st_off + 1);
464        real_size = (int) ADIOI_MIN((off / stripe_size + 1) * stripe_size - off,
465                                    end_loc - off + 1);
466
467        for (i = 0; i < nprocs; i++) {
468            if (my_req[i].count) {
469                for (j = send_curr_offlen_ptr[i]; j < my_req[i].count; j++) {
470                    send_off = my_req[i].offsets[j];
471                    send_len = my_req[i].lens[j];
472                    if (send_off < iter_st_off + max_size) {
473                        send_size[i] += send_len;
474                    } else {
475                        break;
476                    }
477                }
478                send_curr_offlen_ptr[i] = j;
479            }
480            if (others_req[i].count) {
481                recv_start_pos[i] = recv_curr_offlen_ptr[i];
482                for (j = recv_curr_offlen_ptr[i]; j < others_req[i].count; j++) {
483                    req_off = others_req[i].offsets[j];
484                    req_len = others_req[i].lens[j];
485                    if (req_off < iter_st_off + max_size) {
486                        recv_count[i]++;
487                        MPI_Address(write_buf + req_off - off,
488                                    &(others_req[i].mem_ptrs[j]));
489                        recv_size[i] += req_len;
490                    } else {
491                        break;
492                    }
493                }
494                recv_curr_offlen_ptr[i] = j;
495            }
496        }
497        /* use variable "hole" to pass data_sieving flag into W_Exchange_data */
498        hole = data_sieving;
499        ADIOI_LUSTRE_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
500                                     len_list, send_size, recv_size, off, real_size,
501                                     recv_count, recv_start_pos, partial_recv,
502                                     sent_to_proc, nprocs, myrank,
503                                     buftype_is_contig, contig_access_count,
504                                     striping_info, others_req, send_buf_idx,
505                                     curr_to_proc, done_to_proc, &hole, m,
506                                     buftype_extent, buf_idx, error_code);
507        if (*error_code != MPI_SUCCESS)
508            goto over;
509
510        flag = 0;
511        for (i = 0; i < nprocs; i++)
512            if (recv_count[i]) {
513                flag = 1;
514                break;
515            }
516        if (flag) {
517            /* check whether to do data sieving */
518            if(data_sieving == ADIOI_HINT_ENABLE) {
519                ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE,
520                                 ADIO_EXPLICIT_OFFSET, off, &status,
521                                 error_code);
522            } else {
523                /* if there is no hole, write data in one time;
524                 * otherwise, write data in several times */
525                if (!hole) {
526                    ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE,
527                                     ADIO_EXPLICIT_OFFSET, off, &status,
528                                     error_code);
529                } else {
530                    for (i = 0; i < nprocs; i++) {
531                        if (others_req[i].count) {
532                            for (j = 0; j < others_req[i].count; j++) {
533                                if (others_req[i].offsets[j] < off + real_size &&
534                                    others_req[i].offsets[j] >= off) {
535                                    ADIO_WriteContig(fd,
536                                                     write_buf + others_req[i].offsets[j] - off,
537                                                     others_req[i].lens[j],
538                                                     MPI_BYTE, ADIO_EXPLICIT_OFFSET,
539                                                     others_req[i].offsets[j], &status,
540                                                     error_code);
541                                    if (*error_code != MPI_SUCCESS)
542                                        goto over;
543                                }
544                            }
545                        }
546                    }
547                }
548            }
549            if (*error_code != MPI_SUCCESS)
550                goto over;
551        }
552        iter_st_off += max_size;
553    }
554over:
555    if (ntimes)
556        ADIOI_Free(write_buf);
557    ADIOI_Free(recv_curr_offlen_ptr);
558    ADIOI_Free(send_curr_offlen_ptr);
559    ADIOI_Free(recv_count);
560    ADIOI_Free(send_size);
561    ADIOI_Free(recv_size);
562    ADIOI_Free(sent_to_proc);
563    ADIOI_Free(recv_start_pos);
564    ADIOI_Free(send_buf_idx);
565    ADIOI_Free(curr_to_proc);
566    ADIOI_Free(done_to_proc);
567    ADIOI_Free(off_list);
568}
569
570/* Sets error_code to MPI_SUCCESS if successful, or creates an error code
571 * in the case of error.
572 */
573static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, void *buf,
574                                         char *write_buf,
575                                         ADIOI_Flatlist_node *flat_buf,
576                                         ADIO_Offset *offset_list,
577                                         ADIO_Offset *len_list, int *send_size,
578                                         int *recv_size, ADIO_Offset off,
579                                         int size, int *count,
580                                         int *start_pos, int *partial_recv,
581                                         int *sent_to_proc, int nprocs,
582                                         int myrank, int buftype_is_contig,
583                                         int contig_access_count,
584                                         int *striping_info,
585                                         ADIOI_Access *others_req,
586                                         int *send_buf_idx,
587                                         int *curr_to_proc, int *done_to_proc,
588                                         int *hole, int iter,
589                                         MPI_Aint buftype_extent,
590                                         int *buf_idx, int *error_code)
591{
592    int i, j, nprocs_recv, nprocs_send, err;
593    char **send_buf = NULL;
594    MPI_Request *requests, *send_req;
595    MPI_Datatype *recv_types;
596    MPI_Status *statuses, status;
597    int *srt_len, sum, sum_recv;
598    ADIO_Offset *srt_off;
599    int data_sieving = *hole;
600    static char myname[] = "ADIOI_W_EXCHANGE_DATA";
601
602    /* create derived datatypes for recv */
603    nprocs_recv = 0;
604    for (i = 0; i < nprocs; i++)
605        if (recv_size[i])
606            nprocs_recv++;
607
608    recv_types = (MPI_Datatype *) ADIOI_Malloc((nprocs_recv + 1) *
609                                               sizeof(MPI_Datatype));
610    /* +1 to avoid a 0-size malloc */
611
612    j = 0;
613    for (i = 0; i < nprocs; i++) {
614        if (recv_size[i]) {
615            MPI_Type_hindexed(count[i],
616                              &(others_req[i].lens[start_pos[i]]),
617                              &(others_req[i].mem_ptrs[start_pos[i]]),
618                              MPI_BYTE, recv_types + j);
619            /* absolute displacements; use MPI_BOTTOM in recv */
620            MPI_Type_commit(recv_types + j);
621            j++;
622        }
623    }
624
625    /* To avoid a read-modify-write,
626     * check if there are holes in the data to be written.
627     * For this, merge the (sorted) offset lists others_req using a heap-merge.
628     */
629
630    sum = 0;
631    for (i = 0; i < nprocs; i++)
632        sum += count[i];
633    srt_off = (ADIO_Offset *) ADIOI_Malloc((sum + 1) * sizeof(ADIO_Offset));
634    srt_len = (int *) ADIOI_Malloc((sum + 1) * sizeof(int));
635    /* +1 to avoid a 0-size malloc */
636
637    ADIOI_Heap_merge(others_req, count, srt_off, srt_len, start_pos,
638                     nprocs, nprocs_recv, sum);
639
640    /* check if there are any holes */
641    *hole = 0;
642    for (i = 0; i < sum - 1; i++) {
643        if (srt_off[i] + srt_len[i] < srt_off[i + 1]) {
644            *hole = 1;
645            break;
646        }
647    }
648    /* In some cases (see John Bent ROMIO REQ # 835), an odd interaction
649     * between aggregation, nominally contiguous regions, and cb_buffer_size
650     * should be handled with a read-modify-write (otherwise we will write out
651     * more data than we receive from everyone else (inclusive), so override
652     * hole detection
653     */
654    if (*hole == 0) {
655        sum_recv = 0;
656        for (i = 0; i < nprocs; i++)
657            sum_recv += recv_size[i];
658        if (size > sum_recv)
659            *hole = 1;
660    }
661    /* check the hint for data sieving */
662    if (data_sieving == ADIOI_HINT_ENABLE && nprocs_recv && *hole) {
663        ADIO_ReadContig(fd, write_buf, size, MPI_BYTE,
664                        ADIO_EXPLICIT_OFFSET, off, &status, &err);
665        // --BEGIN ERROR HANDLING--
666        if (err != MPI_SUCCESS) {
667            *error_code = MPIO_Err_create_code(err,
668                                               MPIR_ERR_RECOVERABLE,
669                                               myname, __LINE__,
670                                               MPI_ERR_IO,
671                                               "**ioRMWrdwr", 0);
672            ADIOI_Free(recv_types);
673            ADIOI_Free(srt_off);
674            ADIOI_Free(srt_len);
675            return;
676        }
677        // --END ERROR HANDLING--
678    }
679    ADIOI_Free(srt_off);
680    ADIOI_Free(srt_len);
681
682    nprocs_send = 0;
683    for (i = 0; i < nprocs; i++)
684        if (send_size[i])
685            nprocs_send++;
686
687    if (fd->atomicity) {
688        /* bug fix from Wei-keng Liao and Kenin Coloma */
689        requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + 1) *
690                                                sizeof(MPI_Request));
691        send_req = requests;
692    } else {
693        requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1)*
694                                                sizeof(MPI_Request));
695        /* +1 to avoid a 0-size malloc */
696
697        /* post receives */
698        j = 0;
699        for (i = 0; i < nprocs; i++) {
700            if (recv_size[i]) {
701                MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i,
702                          myrank + i + 100 * iter, fd->comm, requests + j);
703                j++;
704            }
705        }
706        send_req = requests + nprocs_recv;
707    }
708
709    /* post sends.
710     * if buftype_is_contig, data can be directly sent from
711     * user buf at location given by buf_idx. else use send_buf.
712     */
713    if (buftype_is_contig) {
714        j = 0;
715        for (i = 0; i < nprocs; i++)
716            if (send_size[i]) {
717                MPI_Isend(((char *) buf) + buf_idx[i], send_size[i],
718                          MPI_BYTE, i, myrank + i + 100 * iter, fd->comm,
719                          send_req + j);
720                j++;
721                buf_idx[i] += send_size[i];
722            }
723    } else if (nprocs_send) {
724        /* buftype is not contig */
725        send_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char *));
726        for (i = 0; i < nprocs; i++)
727            if (send_size[i])
728                send_buf[i] = (char *) ADIOI_Malloc(send_size[i]);
729
730        ADIOI_LUSTRE_Fill_send_buffer(fd, buf, flat_buf, send_buf, offset_list,
731                                      len_list, send_size, send_req,
732                                      sent_to_proc, nprocs, myrank,
733                                      contig_access_count, striping_info,
734                                      send_buf_idx, curr_to_proc, done_to_proc,
735                                      iter, buftype_extent);
736        /* the send is done in ADIOI_Fill_send_buffer */
737    }
738
739        /* bug fix from Wei-keng Liao and Kenin Coloma */
740    if (fd->atomicity) {
741        j = 0;
742        for (i = 0; i < nprocs; i++) {
743            MPI_Status wkl_status;
744            if (recv_size[i]) {
745                MPI_Recv(MPI_BOTTOM, 1, recv_types[j], i,
746                         myrank + i + 100 * iter, fd->comm, &wkl_status);
747                j++;
748            }
749        }
750    }
751
752    for (i = 0; i < nprocs_recv; i++)
753        MPI_Type_free(recv_types + i);
754    ADIOI_Free(recv_types);
755
756        /* bug fix from Wei-keng Liao and Kenin Coloma */
757        /* +1 to avoid a 0-size malloc */
758    if (fd->atomicity) {
759        statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + 1) *
760                                               sizeof(MPI_Status));
761    } else {
762        statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1) *
763                                               sizeof(MPI_Status));
764    }
765
766#ifdef NEEDS_MPI_TEST
767    i = 0;
768    if (fd->atomicity) {
769        /* bug fix from Wei-keng Liao and Kenin Coloma */
770        while (!i)
771            MPI_Testall(nprocs_send, send_req, &i, statuses);
772    } else {
773        while (!i)
774            MPI_Testall(nprocs_send + nprocs_recv, requests, &i, statuses);
775    }
776#else
777        /* bug fix from Wei-keng Liao and Kenin Coloma */
778    if (fd->atomicity)
779        MPI_Waitall(nprocs_send, send_req, statuses);
780    else
781        MPI_Waitall(nprocs_send + nprocs_recv, requests, statuses);
782#endif
783    ADIOI_Free(statuses);
784    ADIOI_Free(requests);
785    if (!buftype_is_contig && nprocs_send) {
786        for (i = 0; i < nprocs; i++)
787            if (send_size[i])
788                ADIOI_Free(send_buf[i]);
789        ADIOI_Free(send_buf);
790    }
791}
792
793#define ADIOI_BUF_INCR \
794{ \
795    while (buf_incr) { \
796        size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
797        user_buf_idx += size_in_buf; \
798        flat_buf_sz -= size_in_buf; \
799        if (!flat_buf_sz) { \
800            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
801            else { \
802                flat_buf_idx = 0; \
803                n_buftypes++; \
804            } \
805            user_buf_idx = flat_buf->indices[flat_buf_idx] + \
806                              n_buftypes*buftype_extent; \
807            flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
808        } \
809        buf_incr -= size_in_buf; \
810    } \
811}
812
813
814#define ADIOI_BUF_COPY \
815{ \
816    while (size) { \
817        size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
818        memcpy(&(send_buf[p][send_buf_idx[p]]), \
819               ((char *) buf) + user_buf_idx, size_in_buf); \
820        send_buf_idx[p] += size_in_buf; \
821        user_buf_idx += size_in_buf; \
822        flat_buf_sz -= size_in_buf; \
823        if (!flat_buf_sz) { \
824            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
825            else { \
826                flat_buf_idx = 0; \
827                n_buftypes++; \
828            } \
829            user_buf_idx = flat_buf->indices[flat_buf_idx] + \
830                              n_buftypes*buftype_extent; \
831            flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
832        } \
833        size -= size_in_buf; \
834        buf_incr -= size_in_buf; \
835    } \
836    ADIOI_BUF_INCR \
837}
838
839static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, void *buf,
840                                          ADIOI_Flatlist_node *flat_buf,
841                                          char **send_buf,
842                                          ADIO_Offset *offset_list,
843                                          ADIO_Offset *len_list, int *send_size,
844                                          MPI_Request *requests,
845                                          int *sent_to_proc, int nprocs,
846                                          int myrank,
847                                          int contig_access_count,
848                                          int *striping_info,
849                                          int *send_buf_idx,
850                                          int *curr_to_proc,
851                                          int *done_to_proc, int iter,
852                                          MPI_Aint buftype_extent)
853{
854    /* this function is only called if buftype is not contig */
855    int i, p, flat_buf_idx, size;
856    int flat_buf_sz, buf_incr, size_in_buf, jj, n_buftypes;
857    ADIO_Offset off, len, rem_len, user_buf_idx;
858
859    /* curr_to_proc[p] = amount of data sent to proc. p that has already
860     * been accounted for so far
861     * done_to_proc[p] = amount of data already sent to proc. p in
862     * previous iterations
863     * user_buf_idx = current location in user buffer
864     * send_buf_idx[p] = current location in send_buf of proc. p
865     */
866
867    for (i = 0; i < nprocs; i++) {
868        send_buf_idx[i] = curr_to_proc[i] = 0;
869        done_to_proc[i] = sent_to_proc[i];
870    }
871    jj = 0;
872
873    user_buf_idx = flat_buf->indices[0];
874    flat_buf_idx = 0;
875    n_buftypes = 0;
876    flat_buf_sz = flat_buf->blocklens[0];
877
878    /* flat_buf_idx = current index into flattened buftype
879     * flat_buf_sz = size of current contiguous component in flattened buf
880     */
881    for (i = 0; i < contig_access_count; i++) {
882        off = offset_list[i];
883        rem_len = (ADIO_Offset) len_list[i];
884
885        /*this request may span to more than one process */
886        while (rem_len != 0) {
887            len = rem_len;
888            /* NOTE: len value is modified by ADIOI_Calc_aggregator() to be no
889             * longer than the single region that processor "p" is responsible
890             * for.
891             */
892            p = ADIOI_LUSTRE_Calc_aggregator(fd, off, &len, striping_info);
893
894            if (send_buf_idx[p] < send_size[p]) {
895                if (curr_to_proc[p] + len > done_to_proc[p]) {
896                    if (done_to_proc[p] > curr_to_proc[p]) {
897                        size = (int) ADIOI_MIN(curr_to_proc[p] + len -
898                                               done_to_proc[p],
899                                               send_size[p] -
900                                               send_buf_idx[p]);
901                        buf_incr = done_to_proc[p] - curr_to_proc[p];
902                        ADIOI_BUF_INCR
903                            buf_incr = (int) (curr_to_proc[p] + len -
904                                              done_to_proc[p]);
905                        curr_to_proc[p] = done_to_proc[p] + size;
906                        ADIOI_BUF_COPY
907                    } else {
908                        size = (int) ADIOI_MIN(len, send_size[p] -
909                                               send_buf_idx[p]);
910                        buf_incr = (int) len;
911                        curr_to_proc[p] += size;
912                        ADIOI_BUF_COPY
913                    }
914                    if (send_buf_idx[p] == send_size[p]) {
915                        MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p,
916                                  myrank + p + 100 * iter, fd->comm,
917                                  requests + jj);
918                        jj++;
919                    }
920                } else {
921                    curr_to_proc[p] += (int) len;
922                    buf_incr = (int) len;
923                    ADIOI_BUF_INCR
924                }
925            } else {
926                buf_incr = (int) len;
927                ADIOI_BUF_INCR
928            }
929            off += len;
930            rem_len -= len;
931        }
932    }
933    for (i = 0; i < nprocs; i++)
934        if (send_size[i])
935            sent_to_proc[i] = curr_to_proc[i];
936}
Note: See TracBrowser for help on using the browser.