root/mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_wrcoll.c @ 4458

Revision 4458, 33.1 KB (checked in by robl, 7 months ago)

updates from Sun for Lustre:
- rename a couple hints to be more meaningful
- remove ADIOI_LUSTRE_Calc_others_req and related hints until we can figure out

a good way to do this that doesn't voilate the "we must be correct if user
lies to us via hints" rule.

Line 
1/* -*- Mode: C; c-basic-offset:4 ; -*- */
2/*
3 *   Copyright (C) 1997 University of Chicago.
4 *   See COPYRIGHT notice in top-level directory.
5 *
6 *   Copyright (C) 2007 Oak Ridge National Laboratory
7 *
8 *   Copyright (C) 2008 Sun Microsystems, Lustre group
9 */
10
11#include "ad_lustre.h"
12#include "adio_extern.h"
13
14/* prototypes of functions used for collective writes only. */
15static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, void *buf,
16                                        MPI_Datatype datatype, int nprocs,
17                                        int myrank,
18                                        ADIOI_Access *others_req,
19                                        ADIOI_Access *my_req,
20                                        ADIO_Offset *offset_list,
21                                        int *len_list,
22                                        int contig_access_count,
23                                        int *striping_info,
24                                        int *buf_idx, int *error_code);
25static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, void *buf,
26                                          ADIOI_Flatlist_node *flat_buf,
27                                          char **send_buf,
28                                          ADIO_Offset *offset_list,
29                                          int *len_list, int *send_size,
30                                          MPI_Request *requests,
31                                          int *sent_to_proc, int nprocs,
32                                          int myrank, int contig_access_count,
33                                          int *striping_info,
34                                          int *send_buf_idx,
35                                          int *curr_to_proc,
36                                          int *done_to_proc, int iter,
37                                          MPI_Aint buftype_extent);
38static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, void *buf,
39                                         char *write_buf,
40                                         ADIOI_Flatlist_node *flat_buf,
41                                         ADIO_Offset *offset_list,
42                                         int *len_list, int *send_size,
43                                         int *recv_size, ADIO_Offset off,
44                                         int size, int *count,
45                                         int *start_pos, int *partial_recv,
46                                         int *sent_to_proc, int nprocs,
47                                         int myrank, int buftype_is_contig,
48                                         int contig_access_count,
49                                         int *striping_info,
50                                         ADIOI_Access *others_req,
51                                         int *send_buf_idx,
52                                         int *curr_to_proc,
53                                         int *done_to_proc, int *hole,
54                                         int iter, MPI_Aint buftype_extent,
55                                         int *buf_idx, int *error_code);
56void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
57                      ADIO_Offset *srt_off, int *srt_len, int *start_pos,
58                      int nprocs, int nprocs_recv, int total_elements);
59
60void ADIOI_LUSTRE_WriteStridedColl(ADIO_File fd, void *buf, int count,
61                                   MPI_Datatype datatype,
62                                   int file_ptr_type, ADIO_Offset offset,
63                                   ADIO_Status *status, int *error_code)
64{
65    /* Uses a generalized version of the extended two-phase method described
66     * in "An Extended Two-Phase Method for Accessing Sections of
67     * Out-of-Core Arrays", Rajeev Thakur and Alok Choudhary,
68     * Scientific Programming, (5)4:301--317, Winter 1996.
69     * http://www.mcs.anl.gov/home/thakur/ext2ph.ps
70     */
71
72    ADIOI_Access *my_req;
73    /* array of nprocs access structures, one for each other process has
74       this process's request */
75
76    ADIOI_Access *others_req;
77    /* array of nprocs access structures, one for each other process
78       whose request is written by this process. */
79
80    int i, filetype_is_contig, nprocs, myrank, do_collect = 0;
81    int contig_access_count = 0, buftype_is_contig, interleave_count = 0;
82    int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
83    ADIO_Offset orig_fp, start_offset, end_offset, off;
84    ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *end_offsets = NULL;
85    int *buf_idx = NULL, *len_list = NULL, *striping_info = NULL;
86    int old_error, tmp_error;
87
88    MPI_Comm_size(fd->comm, &nprocs);
89    MPI_Comm_rank(fd->comm, &myrank);
90
91    orig_fp = fd->fp_ind;
92
93    /* IO patten identification if cb_write isn't disabled */
94    if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
95        /* For this process's request, calculate the list of offsets and
96           lengths in the file and determine the start and end offsets. */
97
98        /* Note: end_offset points to the last byte-offset that will be accessed.
99         * e.g., if start_offset=0 and 100 bytes to be read, end_offset=99
100         */
101
102        ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
103                              &offset_list, &len_list, &start_offset,
104                              &end_offset, &contig_access_count);
105
106        /* each process communicates its start and end offsets to other
107         * processes. The result is an array each of start and end offsets
108         * stored in order of process rank.
109         */
110        st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset));
111        end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset));
112        MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
113                      ADIO_OFFSET, fd->comm);
114        MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
115                      ADIO_OFFSET, fd->comm);
116        /* are the accesses of different processes interleaved? */
117        for (i = 1; i < nprocs; i++)
118            if ((st_offsets[i] < end_offsets[i-1]) &&
119                (st_offsets[i] <= end_offsets[i]))
120                interleave_count++;
121        /* This is a rudimentary check for interleaving, but should suffice
122           for the moment. */
123
124        /* Two typical access patterns can benefit from collective write.
125         *   1) the processes are interleaved, and
126         *   2) the req size is small.
127         */
128        if (interleave_count > 0) {
129            do_collect = 1;
130        } else {
131            do_collect = ADIOI_LUSTRE_Docollect(fd, contig_access_count,
132                                                len_list, nprocs);
133        }
134    }
135    ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
136
137    /* Decide if collective I/O should be done */
138    if ((!do_collect && fd->hints->cb_write == ADIOI_HINT_AUTO) ||
139        fd->hints->cb_write == ADIOI_HINT_DISABLE) {
140
141        int filerange_is_contig = 0;
142
143        /* use independent accesses */
144        if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
145            ADIOI_Free(offset_list);
146            ADIOI_Free(len_list);
147            ADIOI_Free(st_offsets);
148            ADIOI_Free(end_offsets);
149        }
150
151        fd->fp_ind = orig_fp;
152        ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
153        if (buftype_is_contig && filetype_is_contig) {
154            if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
155                off = fd->disp + (fd->etype_size) * offset;
156                ADIO_WriteContig(fd, buf, count, datatype,
157                                 ADIO_EXPLICIT_OFFSET,
158                                 off, status, error_code);
159            } else
160                ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
161                                 0, status, error_code);
162        } else {
163            ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
164                              offset, status, error_code);
165        }
166        return;
167    }
168
169    /* Get Lustre hints information */
170    ADIOI_LUSTRE_Get_striping_info(fd, &striping_info, 1);
171
172    /* calculate what portions of the access requests of this process are
173     * located in which process
174     */
175    ADIOI_LUSTRE_Calc_my_req(fd, offset_list, len_list, contig_access_count,
176                             striping_info, nprocs, &count_my_req_procs,
177                             &count_my_req_per_proc, &my_req, &buf_idx);
178
179    /* based on everyone's my_req, calculate what requests of other processes
180     * will be accessed by this process.
181     * count_others_req_procs = number of processes whose requests (including
182     * this process itself) will be accessed by this process
183     * count_others_req_per_proc[i] indicates how many separate contiguous
184     * requests of proc. i will be accessed by this process.
185     */
186
187    ADIOI_Calc_others_req(fd, count_my_req_procs, count_my_req_per_proc,
188                          my_req, nprocs, myrank, &count_others_req_procs,
189                          &others_req);
190    ADIOI_Free(count_my_req_per_proc);
191
192    /* exchange data and write in sizes of no more than stripe_size. */
193    ADIOI_LUSTRE_Exch_and_write(fd, buf, datatype, nprocs, myrank,
194                                others_req, my_req,
195                                offset_list, len_list, contig_access_count,
196                                striping_info, buf_idx, error_code);
197
198    /* If this collective write is followed by an independent write,
199     * it's possible to have those subsequent writes on other processes
200     * race ahead and sneak in before the read-modify-write completes.
201     * We carry out a collective communication at the end here so no one
202     * can start independent i/o before collective I/O completes.
203     *
204     * need to do some gymnastics with the error codes so that if something
205     * went wrong, all processes report error, but if a process has a more
206     * specific error code, we can still have that process report the
207     * additional information */
208
209    old_error = *error_code;
210    if (*error_code != MPI_SUCCESS)
211        *error_code = MPI_ERR_IO;
212
213    /* optimization: if only one process performing i/o, we can perform
214     * a less-expensive Bcast  */
215#ifdef ADIOI_MPE_LOGGING
216    MPE_Log_event(ADIOI_MPE_postwrite_a, 0, NULL);
217#endif
218    if (fd->hints->cb_nodes == 1)
219        MPI_Bcast(error_code, 1, MPI_INT,
220                  fd->hints->ranklist[0], fd->comm);
221    else {
222        tmp_error = *error_code;
223        MPI_Allreduce(&tmp_error, error_code, 1, MPI_INT,
224                      MPI_MAX, fd->comm);
225    }
226#ifdef ADIOI_MPE_LOGGING
227    MPE_Log_event(ADIOI_MPE_postwrite_b, 0, NULL);
228#endif
229
230    if ((old_error != MPI_SUCCESS) && (old_error != MPI_ERR_IO))
231        *error_code = old_error;
232
233
234    if (!buftype_is_contig)
235        ADIOI_Delete_flattened(datatype);
236
237    /* free all memory allocated for collective I/O */
238    /* free others_req */
239    for (i = 0; i < nprocs; i++) {
240        if (others_req[i].count) {
241            ADIOI_Free(others_req[i].offsets);
242            ADIOI_Free(others_req[i].lens);
243            ADIOI_Free(others_req[i].mem_ptrs);
244        }
245    }
246    ADIOI_Free(others_req);
247    /* free my_req here */
248    for (i = 0; i < nprocs; i++) {
249        if (my_req[i].count) {
250            ADIOI_Free(my_req[i].offsets);
251            ADIOI_Free(my_req[i].lens);
252        }
253    }
254    ADIOI_Free(my_req);
255    ADIOI_Free(buf_idx);
256    ADIOI_Free(offset_list);
257    ADIOI_Free(len_list);
258    ADIOI_Free(st_offsets);
259    ADIOI_Free(end_offsets);
260    ADIOI_Free(striping_info);
261
262#ifdef HAVE_STATUS_SET_BYTES
263    if (status) {
264        int bufsize, size;
265        /* Don't set status if it isn't needed */
266        MPI_Type_size(datatype, &size);
267        bufsize = size * count;
268        MPIR_Status_set_bytes(status, datatype, bufsize);
269    }
270    /* This is a temporary way of filling in status. The right way is to
271     * keep track of how much data was actually written during collective I/O.
272     */
273#endif
274
275    fd->fp_sys_posn = -1;       /* set it to null. */
276}
277
278/* If successful, error_code is set to MPI_SUCCESS.  Otherwise an error
279 * code is created and returned in error_code.
280 */
281static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, void *buf,
282                                        MPI_Datatype datatype, int nprocs,
283                                        int myrank, ADIOI_Access *others_req,
284                                        ADIOI_Access *my_req,
285                                        ADIO_Offset *offset_list,
286                                        int *len_list, int contig_access_count,
287                                        int *striping_info, int *buf_idx,
288                                        int *error_code)
289{
290    /* Send data to appropriate processes and write in sizes of no more
291     * than lustre stripe_size.
292     * The idea is to reduce the amount of extra memory required for
293     * collective I/O. If all data were written all at once, which is much
294     * easier, it would require temp space more than the size of user_buf,
295     * which is often unacceptable. For example, to write a distributed
296     * array to a file, where each local array is 8Mbytes, requiring
297     * at least another 8Mbytes of temp space is unacceptable.
298     */
299
300    int hole, i, j, m, flag, ntimes = 1 , max_ntimes, buftype_is_contig;
301    ADIO_Offset st_loc = -1, end_loc = -1, min_st_loc, max_end_loc;
302    ADIO_Offset off, req_off, send_off, iter_st_off, *off_list;
303    ADIO_Offset max_size, step_size = 0;
304    int real_size, req_len, send_len;
305    int *recv_curr_offlen_ptr, *recv_count, *recv_size;
306    int *send_curr_offlen_ptr, *send_size;
307    int *partial_recv, *sent_to_proc, *recv_start_pos;
308    int *send_buf_idx, *curr_to_proc, *done_to_proc;
309    char *write_buf = NULL;
310    MPI_Status status;
311    ADIOI_Flatlist_node *flat_buf = NULL;
312    MPI_Aint buftype_extent;
313    int stripe_size = striping_info[0], avail_cb_nodes = striping_info[2];
314    int data_sieving = 0;
315
316    *error_code = MPI_SUCCESS;  /* changed below if error */
317    /* only I/O errors are currently reported */
318
319    /* calculate the number of writes of stripe size to be done.
320     * That gives the no. of communication phases as well.
321     * Note:
322     *   Because we redistribute data in stripe-contiguous pattern for Lustre,
323     *   each process has the same no. of communication phases.
324     */
325
326    for (i = 0; i < nprocs; i++) {
327        if (others_req[i].count) {
328            st_loc = others_req[i].offsets[0];
329            end_loc = others_req[i].offsets[0];
330            break;
331        }
332    }
333    for (i = 0; i < nprocs; i++) {
334        for (j = 0; j < others_req[i].count; j++) {
335            st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
336            end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j] +
337                                          others_req[i].lens[j] - 1));
338        }
339    }
340    /* this process does no writing. */
341    if ((st_loc == -1) && (end_loc == -1))
342        ntimes = 0;
343    MPI_Allreduce(&end_loc, &max_end_loc, 1, MPI_LONG_LONG_INT, MPI_MAX, fd->comm);
344    /* avoid min_st_loc be -1 */
345    if (st_loc == -1)
346        st_loc = max_end_loc;
347    MPI_Allreduce(&st_loc, &min_st_loc, 1, MPI_LONG_LONG_INT, MPI_MIN, fd->comm);
348    /* align downward */
349    min_st_loc -= min_st_loc % (ADIO_Offset)stripe_size;
350
351    /* Each time, only avail_cb_nodes number of IO clients perform IO,
352     * so, step_size=avail_cb_nodes*stripe_size IO will be performed at most,
353     * and ntimes=whole_file_portion/step_size
354     */
355    step_size = (ADIO_Offset) avail_cb_nodes * stripe_size;
356    max_ntimes = (int)((max_end_loc - min_st_loc) / step_size + 1);
357    if (ntimes)
358        write_buf = (char *) ADIOI_Malloc(stripe_size);
359
360    /* calculate the start offset for each iteration */
361    off_list = (ADIO_Offset *) ADIOI_Malloc(max_ntimes * sizeof(ADIO_Offset));
362    for (m = 0; m < max_ntimes; m ++)
363        off_list[m] = max_end_loc;
364    for (i = 0; i < nprocs; i++) {
365        for (j = 0; j < others_req[i].count; j ++) {
366            req_off = others_req[i].offsets[j];
367            m = (int)((req_off - min_st_loc) / step_size);
368            off_list[m] = ADIOI_MIN(off_list[m], req_off);
369        }
370    }
371
372    recv_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
373    send_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
374    /* their use is explained below. calloc initializes to 0. */
375
376    recv_count = (int *) ADIOI_Malloc(nprocs * sizeof(int));
377    /* to store count of how many off-len pairs per proc are satisfied
378       in an iteration. */
379
380    send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
381    /* total size of data to be sent to each proc. in an iteration.
382       Of size nprocs so that I can use MPI_Alltoall later. */
383
384    recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
385    /* total size of data to be recd. from each proc. in an iteration. */
386
387    sent_to_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
388    /* amount of data sent to each proc so far. Used in
389       ADIOI_Fill_send_buffer. initialized to 0 here. */
390
391    send_buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
392    curr_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
393    done_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
394    /* Above three are used in ADIOI_Fill_send_buffer */
395
396    recv_start_pos = (int *) ADIOI_Malloc(nprocs * sizeof(int));
397    /* used to store the starting value of recv_curr_offlen_ptr[i] in
398       this iteration */
399
400    ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
401    if (!buftype_is_contig) {
402        ADIOI_Flatten_datatype(datatype);
403        flat_buf = ADIOI_Flatlist;
404        while (flat_buf->type != datatype)
405            flat_buf = flat_buf->next;
406    }
407    MPI_Type_extent(datatype, &buftype_extent);
408    /* I need to check if there are any outstanding nonblocking writes to
409     * the file, which could potentially interfere with the writes taking
410     * place in this collective write call. Since this is not likely to be
411     * common, let me do the simplest thing possible here: Each process
412     * completes all pending nonblocking operations before completing.
413     */
414    /*ADIOI_Complete_async(error_code);
415    if (*error_code != MPI_SUCCESS) return;
416    MPI_Barrier(fd->comm);
417    */
418
419    iter_st_off = min_st_loc;
420
421    /* Although we have recognized the data according to OST index,
422     * a read-modify-write will be done if there is a hole between the data.
423     * For example: if blocksize=60, xfersize=30 and stripe_size=100,
424     * then rank0 will collect data [0, 30] and [60, 90] then write. There
425     * is a hole in [30, 60], which will cause a read-modify-write in [0, 90].
426     *
427     * To reduce its impact on the performance, we can disable data sieving
428     * by hint "ds_in_coll".
429     */
430    /* check the hint for data sieving */
431    data_sieving = fd->hints->fs_hints.lustre.ds_in_coll;
432
433    for (m = 0; m < max_ntimes; m++) {
434        /* go through all others_req and my_req to check which will be received
435         * and sent in this iteration.
436         */
437
438        /* Note that MPI guarantees that displacements in filetypes are in
439           monotonically nondecreasing order and that, for writes, the
440           filetypes cannot specify overlapping regions in the file. This
441           simplifies implementation a bit compared to reads. */
442
443        /*
444           off         = start offset in the file for the data to be written in
445                         this iteration
446           iter_st_off = start offset of this iteration
447           real_size   = size of data written (bytes) corresponding to off
448           max_size    = possible maximum size of data written in this iteration
449           req_off     = offset in the file for a particular contiguous request minus
450                         what was satisfied in previous iteration
451           send_off    = offset the request needed by other processes in this iteration
452           req_len     = size corresponding to req_off
453           send_len    = size corresponding to send_off
454         */
455
456        /* first calculate what should be communicated */
457        for (i = 0; i < nprocs; i++)
458            recv_count[i] = recv_size[i] = send_size[i] = 0;
459
460        off = off_list[m];
461        max_size = ADIOI_MIN(step_size, max_end_loc - iter_st_off + 1);
462        real_size = (int) ADIOI_MIN((off / stripe_size + 1) * stripe_size - off,
463                                    end_loc - off + 1);
464
465        for (i = 0; i < nprocs; i++) {
466            if (my_req[i].count) {
467                for (j = send_curr_offlen_ptr[i]; j < my_req[i].count; j++) {
468                    send_off = my_req[i].offsets[j];
469                    send_len = my_req[i].lens[j];
470                    if (send_off < iter_st_off + max_size) {
471                        send_size[i] += send_len;
472                    } else {
473                        break;
474                    }
475                }
476                send_curr_offlen_ptr[i] = j;
477            }
478            if (others_req[i].count) {
479                recv_start_pos[i] = recv_curr_offlen_ptr[i];
480                for (j = recv_curr_offlen_ptr[i]; j < others_req[i].count; j++) {
481                    req_off = others_req[i].offsets[j];
482                    req_len = others_req[i].lens[j];
483                    if (req_off < iter_st_off + max_size) {
484                        recv_count[i]++;
485                        MPI_Address(write_buf + req_off - off,
486                                    &(others_req[i].mem_ptrs[j]));
487                        recv_size[i] += req_len;
488                    } else {
489                        break;
490                    }
491                }
492                recv_curr_offlen_ptr[i] = j;
493            }
494        }
495        /* use variable "hole" to pass data_sieving flag into W_Exchange_data */
496        hole = data_sieving;
497        ADIOI_LUSTRE_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
498                                     len_list, send_size, recv_size, off, real_size,
499                                     recv_count, recv_start_pos, partial_recv,
500                                     sent_to_proc, nprocs, myrank,
501                                     buftype_is_contig, contig_access_count,
502                                     striping_info, others_req, send_buf_idx,
503                                     curr_to_proc, done_to_proc, &hole, m,
504                                     buftype_extent, buf_idx, error_code);
505        if (*error_code != MPI_SUCCESS)
506            goto over;
507
508        flag = 0;
509        for (i = 0; i < nprocs; i++)
510            if (recv_count[i]) {
511                flag = 1;
512                break;
513            }
514        if (flag) {
515            /* check whether to do data sieving */
516            if(data_sieving == ADIOI_HINT_ENABLE) {
517                ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE,
518                                 ADIO_EXPLICIT_OFFSET, off, &status,
519                                 error_code);
520            } else {
521                /* if there is no hole, write data in one time;
522                 * otherwise, write data in several times */
523                if (!hole) {
524                    ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE,
525                                     ADIO_EXPLICIT_OFFSET, off, &status,
526                                     error_code);
527                } else {
528                    for (i = 0; i < nprocs; i++) {
529                        if (others_req[i].count) {
530                            for (j = 0; j < others_req[i].count; j++) {
531                                if (others_req[i].offsets[j] < off + real_size &&
532                                    others_req[i].offsets[j] >= off) {
533                                    ADIO_WriteContig(fd,
534                                                     write_buf + others_req[i].offsets[j] - off,
535                                                     others_req[i].lens[j],
536                                                     MPI_BYTE, ADIO_EXPLICIT_OFFSET,
537                                                     others_req[i].offsets[j], &status,
538                                                     error_code);
539                                    if (*error_code != MPI_SUCCESS)
540                                        goto over;
541                                }
542                            }
543                        }
544                    }
545                }
546            }
547            if (*error_code != MPI_SUCCESS)
548                goto over;
549        }
550        iter_st_off += max_size;
551    }
552over:
553    if (ntimes)
554        ADIOI_Free(write_buf);
555    ADIOI_Free(recv_curr_offlen_ptr);
556    ADIOI_Free(send_curr_offlen_ptr);
557    ADIOI_Free(recv_count);
558    ADIOI_Free(send_size);
559    ADIOI_Free(recv_size);
560    ADIOI_Free(sent_to_proc);
561    ADIOI_Free(recv_start_pos);
562    ADIOI_Free(send_buf_idx);
563    ADIOI_Free(curr_to_proc);
564    ADIOI_Free(done_to_proc);
565    ADIOI_Free(off_list);
566}
567
568/* Sets error_code to MPI_SUCCESS if successful, or creates an error code
569 * in the case of error.
570 */
571static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, void *buf,
572                                         char *write_buf,
573                                         ADIOI_Flatlist_node *flat_buf,
574                                         ADIO_Offset *offset_list,
575                                         int *len_list, int *send_size,
576                                         int *recv_size, ADIO_Offset off,
577                                         int size, int *count,
578                                         int *start_pos, int *partial_recv,
579                                         int *sent_to_proc, int nprocs,
580                                         int myrank, int buftype_is_contig,
581                                         int contig_access_count,
582                                         int *striping_info,
583                                         ADIOI_Access *others_req,
584                                         int *send_buf_idx,
585                                         int *curr_to_proc, int *done_to_proc,
586                                         int *hole, int iter,
587                                         MPI_Aint buftype_extent,
588                                         int *buf_idx, int *error_code)
589{
590    int i, j, nprocs_recv, nprocs_send, err;
591    char **send_buf = NULL;
592    MPI_Request *requests, *send_req;
593    MPI_Datatype *recv_types;
594    MPI_Status *statuses, status;
595    int *srt_len, sum, sum_recv;
596    ADIO_Offset *srt_off;
597    int data_sieving = *hole;
598    static char myname[] = "ADIOI_W_EXCHANGE_DATA";
599
600    /* create derived datatypes for recv */
601    nprocs_recv = 0;
602    for (i = 0; i < nprocs; i++)
603        if (recv_size[i])
604            nprocs_recv++;
605
606    recv_types = (MPI_Datatype *) ADIOI_Malloc((nprocs_recv + 1) *
607                                               sizeof(MPI_Datatype));
608    /* +1 to avoid a 0-size malloc */
609
610    j = 0;
611    for (i = 0; i < nprocs; i++) {
612        if (recv_size[i]) {
613            MPI_Type_hindexed(count[i],
614                              &(others_req[i].lens[start_pos[i]]),
615                              &(others_req[i].mem_ptrs[start_pos[i]]),
616                              MPI_BYTE, recv_types + j);
617            /* absolute displacements; use MPI_BOTTOM in recv */
618            MPI_Type_commit(recv_types + j);
619            j++;
620        }
621    }
622
623    /* To avoid a read-modify-write,
624     * check if there are holes in the data to be written.
625     * For this, merge the (sorted) offset lists others_req using a heap-merge.
626     */
627
628    sum = 0;
629    for (i = 0; i < nprocs; i++)
630        sum += count[i];
631    srt_off = (ADIO_Offset *) ADIOI_Malloc((sum + 1) * sizeof(ADIO_Offset));
632    srt_len = (int *) ADIOI_Malloc((sum + 1) * sizeof(int));
633    /* +1 to avoid a 0-size malloc */
634
635    ADIOI_Heap_merge(others_req, count, srt_off, srt_len, start_pos,
636                     nprocs, nprocs_recv, sum);
637
638    /* check if there are any holes */
639    *hole = 0;
640    for (i = 0; i < sum - 1; i++) {
641        if (srt_off[i] + srt_len[i] < srt_off[i + 1]) {
642            *hole = 1;
643            break;
644        }
645    }
646    /* In some cases (see John Bent ROMIO REQ # 835), an odd interaction
647     * between aggregation, nominally contiguous regions, and cb_buffer_size
648     * should be handled with a read-modify-write (otherwise we will write out
649     * more data than we receive from everyone else (inclusive), so override
650     * hole detection
651     */
652    if (*hole == 0) {
653        sum_recv = 0;
654        for (i = 0; i < nprocs; i++)
655            sum_recv += recv_size[i];
656        if (size > sum_recv)
657            *hole = 1;
658    }
659    /* check the hint for data sieving */
660    if (data_sieving == ADIOI_HINT_ENABLE && nprocs_recv && *hole) {
661        ADIO_ReadContig(fd, write_buf, size, MPI_BYTE,
662                        ADIO_EXPLICIT_OFFSET, off, &status, &err);
663        // --BEGIN ERROR HANDLING--
664        if (err != MPI_SUCCESS) {
665            *error_code = MPIO_Err_create_code(err,
666                                               MPIR_ERR_RECOVERABLE,
667                                               myname, __LINE__,
668                                               MPI_ERR_IO,
669                                               "**ioRMWrdwr", 0);
670            ADIOI_Free(recv_types);
671            ADIOI_Free(srt_off);
672            ADIOI_Free(srt_len);
673            return;
674        }
675        // --END ERROR HANDLING--
676    }
677    ADIOI_Free(srt_off);
678    ADIOI_Free(srt_len);
679
680    nprocs_send = 0;
681    for (i = 0; i < nprocs; i++)
682        if (send_size[i])
683            nprocs_send++;
684
685    if (fd->atomicity) {
686        /* bug fix from Wei-keng Liao and Kenin Coloma */
687        requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + 1) *
688                                                sizeof(MPI_Request));
689        send_req = requests;
690    } else {
691        requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1)*
692                                                sizeof(MPI_Request));
693        /* +1 to avoid a 0-size malloc */
694
695        /* post receives */
696        j = 0;
697        for (i = 0; i < nprocs; i++) {
698            if (recv_size[i]) {
699                MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i,
700                          myrank + i + 100 * iter, fd->comm, requests + j);
701                j++;
702            }
703        }
704        send_req = requests + nprocs_recv;
705    }
706
707    /* post sends.
708     * if buftype_is_contig, data can be directly sent from
709     * user buf at location given by buf_idx. else use send_buf.
710     */
711    if (buftype_is_contig) {
712        j = 0;
713        for (i = 0; i < nprocs; i++)
714            if (send_size[i]) {
715                MPI_Isend(((char *) buf) + buf_idx[i], send_size[i],
716                          MPI_BYTE, i, myrank + i + 100 * iter, fd->comm,
717                          send_req + j);
718                j++;
719                buf_idx[i] += send_size[i];
720            }
721    } else if (nprocs_send) {
722        /* buftype is not contig */
723        send_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char *));
724        for (i = 0; i < nprocs; i++)
725            if (send_size[i])
726                send_buf[i] = (char *) ADIOI_Malloc(send_size[i]);
727
728        ADIOI_LUSTRE_Fill_send_buffer(fd, buf, flat_buf, send_buf, offset_list,
729                                      len_list, send_size, send_req,
730                                      sent_to_proc, nprocs, myrank,
731                                      contig_access_count, striping_info,
732                                      send_buf_idx, curr_to_proc, done_to_proc,
733                                      iter, buftype_extent);
734        /* the send is done in ADIOI_Fill_send_buffer */
735    }
736
737        /* bug fix from Wei-keng Liao and Kenin Coloma */
738    if (fd->atomicity) {
739        j = 0;
740        for (i = 0; i < nprocs; i++) {
741            MPI_Status wkl_status;
742            if (recv_size[i]) {
743                MPI_Recv(MPI_BOTTOM, 1, recv_types[j], i,
744                         myrank + i + 100 * iter, fd->comm, &wkl_status);
745                j++;
746            }
747        }
748    }
749
750    for (i = 0; i < nprocs_recv; i++)
751        MPI_Type_free(recv_types + i);
752    ADIOI_Free(recv_types);
753
754        /* bug fix from Wei-keng Liao and Kenin Coloma */
755        /* +1 to avoid a 0-size malloc */
756    if (fd->atomicity) {
757        statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + 1) *
758                                               sizeof(MPI_Status));
759    } else {
760        statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1) *
761                                               sizeof(MPI_Status));
762    }
763
764#ifdef NEEDS_MPI_TEST
765    i = 0;
766    if (fd->atomicity) {
767        /* bug fix from Wei-keng Liao and Kenin Coloma */
768        while (!i)
769            MPI_Testall(nprocs_send, send_req, &i, statuses);
770    } else {
771        while (!i)
772            MPI_Testall(nprocs_send + nprocs_recv, requests, &i, statuses);
773    }
774#else
775        /* bug fix from Wei-keng Liao and Kenin Coloma */
776    if (fd->atomicity)
777        MPI_Waitall(nprocs_send, send_req, statuses);
778    else
779        MPI_Waitall(nprocs_send + nprocs_recv, requests, statuses);
780#endif
781    ADIOI_Free(statuses);
782    ADIOI_Free(requests);
783    if (!buftype_is_contig && nprocs_send) {
784        for (i = 0; i < nprocs; i++)
785            if (send_size[i])
786                ADIOI_Free(send_buf[i]);
787        ADIOI_Free(send_buf);
788    }
789}
790
791#define ADIOI_BUF_INCR \
792{ \
793    while (buf_incr) { \
794        size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
795        user_buf_idx += size_in_buf; \
796        flat_buf_sz -= size_in_buf; \
797        if (!flat_buf_sz) { \
798            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
799            else { \
800                flat_buf_idx = 0; \
801                n_buftypes++; \
802            } \
803            user_buf_idx = flat_buf->indices[flat_buf_idx] + \
804                              n_buftypes*buftype_extent; \
805            flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
806        } \
807        buf_incr -= size_in_buf; \
808    } \
809}
810
811
812#define ADIOI_BUF_COPY \
813{ \
814    while (size) { \
815        size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
816        memcpy(&(send_buf[p][send_buf_idx[p]]), \
817               ((char *) buf) + user_buf_idx, size_in_buf); \
818        send_buf_idx[p] += size_in_buf; \
819        user_buf_idx += size_in_buf; \
820        flat_buf_sz -= size_in_buf; \
821        if (!flat_buf_sz) { \
822            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
823            else { \
824                flat_buf_idx = 0; \
825                n_buftypes++; \
826            } \
827            user_buf_idx = flat_buf->indices[flat_buf_idx] + \
828                              n_buftypes*buftype_extent; \
829            flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
830        } \
831        size -= size_in_buf; \
832        buf_incr -= size_in_buf; \
833    } \
834    ADIOI_BUF_INCR \
835}
836
837static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, void *buf,
838                                          ADIOI_Flatlist_node *flat_buf,
839                                          char **send_buf,
840                                          ADIO_Offset *offset_list,
841                                          int *len_list, int *send_size,
842                                          MPI_Request *requests,
843                                          int *sent_to_proc, int nprocs,
844                                          int myrank,
845                                          int contig_access_count,
846                                          int *striping_info,
847                                          int *send_buf_idx,
848                                          int *curr_to_proc,
849                                          int *done_to_proc, int iter,
850                                          MPI_Aint buftype_extent)
851{
852    /* this function is only called if buftype is not contig */
853    int i, p, flat_buf_idx, size;
854    int flat_buf_sz, buf_incr, size_in_buf, jj, n_buftypes;
855    ADIO_Offset off, len, rem_len, user_buf_idx;
856
857    /* curr_to_proc[p] = amount of data sent to proc. p that has already
858     * been accounted for so far
859     * done_to_proc[p] = amount of data already sent to proc. p in
860     * previous iterations
861     * user_buf_idx = current location in user buffer
862     * send_buf_idx[p] = current location in send_buf of proc. p
863     */
864
865    for (i = 0; i < nprocs; i++) {
866        send_buf_idx[i] = curr_to_proc[i] = 0;
867        done_to_proc[i] = sent_to_proc[i];
868    }
869    jj = 0;
870
871    user_buf_idx = flat_buf->indices[0];
872    flat_buf_idx = 0;
873    n_buftypes = 0;
874    flat_buf_sz = flat_buf->blocklens[0];
875
876    /* flat_buf_idx = current index into flattened buftype
877     * flat_buf_sz = size of current contiguous component in flattened buf
878     */
879    for (i = 0; i < contig_access_count; i++) {
880        off = offset_list[i];
881        rem_len = (ADIO_Offset) len_list[i];
882
883        /*this request may span to more than one process */
884        while (rem_len != 0) {
885            len = rem_len;
886            /* NOTE: len value is modified by ADIOI_Calc_aggregator() to be no
887             * longer than the single region that processor "p" is responsible
888             * for.
889             */
890            p = ADIOI_LUSTRE_Calc_aggregator(fd, off, &len, striping_info);
891
892            if (send_buf_idx[p] < send_size[p]) {
893                if (curr_to_proc[p] + len > done_to_proc[p]) {
894                    if (done_to_proc[p] > curr_to_proc[p]) {
895                        size = (int) ADIOI_MIN(curr_to_proc[p] + len -
896                                               done_to_proc[p],
897                                               send_size[p] -
898                                               send_buf_idx[p]);
899                        buf_incr = done_to_proc[p] - curr_to_proc[p];
900                        ADIOI_BUF_INCR
901                            buf_incr = (int) (curr_to_proc[p] + len -
902                                              done_to_proc[p]);
903                        curr_to_proc[p] = done_to_proc[p] + size;
904                        ADIOI_BUF_COPY
905                    } else {
906                        size = (int) ADIOI_MIN(len, send_size[p] -
907                                               send_buf_idx[p]);
908                        buf_incr = (int) len;
909                        curr_to_proc[p] += size;
910                        ADIOI_BUF_COPY
911                    }
912                    if (send_buf_idx[p] == send_size[p]) {
913                        MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p,
914                                  myrank + p + 100 * iter, fd->comm,
915                                  requests + jj);
916                        jj++;
917                    }
918                } else {
919                    curr_to_proc[p] += (int) len;
920                    buf_incr = (int) len;
921                    ADIOI_BUF_INCR
922                }
923            } else {
924                buf_incr = (int) len;
925                ADIOI_BUF_INCR
926            }
927            off += len;
928            rem_len -= len;
929        }
930    }
931    for (i = 0; i < nprocs; i++)
932        if (send_size[i])
933            sent_to_proc[i] = curr_to_proc[i];
934}
Note: See TracBrowser for help on using the browser.