root/mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_aggregate.c @ 4458

Revision 4458, 10.3 KB (checked in by robl, 7 months ago)

updates from Sun for Lustre:
- rename a couple hints to be more meaningful
- remove ADIOI_LUSTRE_Calc_others_req and related hints until we can figure out

a good way to do this that doesn't voilate the "we must be correct if user
lies to us via hints" rule.

Line 
1/* -*- Mode: C; c-basic-offset:4 ; -*- */
2/*
3 *   Copyright (C) 1997 University of Chicago.
4 *   See COPYRIGHT notice in top-level directory.
5 *
6 *   Copyright (C) 2007 Oak Ridge National Laboratory
7 *
8 *   Copyright (C) 2008 Sun Microsystems, Lustre group
9 */
10
11#include "ad_lustre.h"
12#include "adio_extern.h"
13
14#undef AGG_DEBUG
15
16void ADIOI_LUSTRE_Get_striping_info(ADIO_File fd, int **striping_info_ptr,
17                                    int mode)
18{
19    int *striping_info = NULL;
20    /* get striping information:
21     *  striping_info[0]: stripe_size
22     *  striping_info[1]: stripe_count
23     *  striping_info[2]: avail_cb_nodes
24     */
25    int stripe_size, stripe_count, CO = 1, CO_max = 1, CO_nodes, lflag;
26    int avail_cb_nodes, divisor, nprocs_for_coll = fd->hints->cb_nodes;
27    char *value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL + 1) * sizeof(char));
28
29    /* Get hints value */
30    /* stripe size */
31    stripe_size = fd->hints->striping_unit;
32    /* stripe count */
33    /* stripe_size and stripe_count have been validated in ADIOI_LUSTRE_Open() */
34    stripe_count = fd->hints->striping_factor;
35
36    /* Calculate the available number of I/O clients, that is
37     *  avail_cb_nodes=min(cb_nodes, stripe_count*CO), where
38     *  CO=1 by default
39     */
40    if (!mode) {
41        /* for collective read,
42         * if "CO" clients access the same OST simultaneously,
43         * the OST disk seek time would be much. So, to avoid this,
44         * it might be better if 1 client only accesses 1 OST.
45         * So, we set CO = 1 to meet the above requirement.
46         */
47        CO = 1;
48        /*XXX: maybe there are other better way for collective read */
49    } else {
50        /* CO_max: the largest number of IO clients for each ost group */
51        CO_max = (nprocs_for_coll - 1)/ stripe_count + 1;
52        /* CO also has been validated in ADIOI_LUSTRE_Open(), >0 */
53        CO = fd->hints->fs_hints.lustre.co_ratio;
54        CO = ADIOI_MIN(CO_max, CO);
55    }
56    /* Calculate how many IO clients we need */
57    /* To avoid extent lock conflicts,
58     * avail_cb_nodes should divide (stripe_count*CO) exactly,
59     * so that each OST is accessed by only one or more constant clients. */
60    CO_nodes = stripe_count * CO;
61    avail_cb_nodes = ADIOI_MIN(nprocs_for_coll, CO_nodes);
62    if (avail_cb_nodes ==  CO_nodes) {
63        do {
64            /* find the divisor of CO_nodes */
65            divisor = 1;
66            do {
67                divisor ++;
68            } while (CO_nodes % divisor);
69            CO_nodes = CO_nodes / divisor;
70            /* if stripe_count*CO is a prime number, change nothing */
71            if ((CO_nodes <= avail_cb_nodes) && (CO_nodes != 1)) {
72                avail_cb_nodes = CO_nodes;
73                break;
74            }
75        } while (CO_nodes != 1);
76    }
77
78    *striping_info_ptr = (int *) ADIOI_Malloc(3 * sizeof(int));
79    striping_info = *striping_info_ptr;
80    striping_info[0] = stripe_size;
81    striping_info[1] = stripe_count;
82    striping_info[2] = avail_cb_nodes;
83
84    ADIOI_Free(value);
85}
86
87int ADIOI_LUSTRE_Calc_aggregator(ADIO_File fd, ADIO_Offset off,
88                                 ADIO_Offset *len, int *striping_info)
89{
90    int rank_index, rank;
91    ADIO_Offset avail_bytes;
92    int stripe_size = striping_info[0];
93    int avail_cb_nodes = striping_info[2];
94
95    /* Produce the stripe-contiguous pattern for Lustre */
96    rank_index = (int)((off / stripe_size) % avail_cb_nodes);
97
98    /* we index into fd_end with rank_index, and fd_end was allocated to be no
99     * bigger than fd->hins->cb_nodes.   If we ever violate that, we're
100     * overrunning arrays.  Obviously, we should never ever hit this abort
101     */
102    if (rank_index >= fd->hints->cb_nodes)
103            MPI_Abort(MPI_COMM_WORLD, 1);
104
105    avail_bytes = (off / (ADIO_Offset)stripe_size + 1) *
106                  (ADIO_Offset)stripe_size - off;
107    if (avail_bytes < *len) {
108        /* this proc only has part of the requested contig. region */
109        *len = avail_bytes;
110    }
111    /* map our index to a rank */
112    /* NOTE: FOR NOW WE DON'T HAVE A MAPPING...JUST DO 0..NPROCS_FOR_COLL */
113    rank = fd->hints->ranklist[rank_index];
114
115    return rank;
116}
117
118/* ADIOI_LUSTRE_Calc_my_req() - calculate what portions of the access requests
119 * of this process are located in the file domains of various processes
120 * (including this one)
121 */
122void ADIOI_LUSTRE_Calc_my_req(ADIO_File fd, ADIO_Offset *offset_list,
123                              int *len_list, int contig_access_count,
124                              int *striping_info, int nprocs,
125                              int *count_my_req_procs_ptr,
126                              int **count_my_req_per_proc_ptr,
127                              ADIOI_Access **my_req_ptr,
128                              int **buf_idx_ptr)
129{
130    /* Nothing different from ADIOI_Calc_my_req(), except calling
131     * ADIOI_Lustre_Calc_aggregator() instead of the old one */
132    int *count_my_req_per_proc, count_my_req_procs, *buf_idx;
133    int i, l, proc;
134    ADIO_Offset avail_len, rem_len, curr_idx, off;
135    ADIOI_Access *my_req;
136
137    *count_my_req_per_proc_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
138    count_my_req_per_proc = *count_my_req_per_proc_ptr;
139    /* count_my_req_per_proc[i] gives the no. of contig. requests of this
140     * process in process i's file domain. calloc initializes to zero.
141     * I'm allocating memory of size nprocs, so that I can do an
142     * MPI_Alltoall later on.
143     */
144
145    buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
146    /* buf_idx is relevant only if buftype_is_contig.
147     * buf_idx[i] gives the index into user_buf where data received
148     * from proc. i should be placed. This allows receives to be done
149     * without extra buffer. This can't be done if buftype is not contig.
150     */
151
152    /* initialize buf_idx to -1 */
153    for (i = 0; i < nprocs; i++)
154        buf_idx[i] = -1;
155
156    /* one pass just to calculate how much space to allocate for my_req;
157     * contig_access_count was calculated way back in ADIOI_Calc_my_off_len()
158     */
159    for (i = 0; i < contig_access_count; i++) {
160        /* short circuit offset/len processing if len == 0
161         * (zero-byte  read/write
162         */
163        if (len_list[i] == 0)
164            continue;
165        off = offset_list[i];
166        avail_len = len_list[i];
167        /* note: we set avail_len to be the total size of the access.
168         * then ADIOI_LUSTRE_Calc_aggregator() will modify the value to return
169         * the amount that was available.
170         */
171        proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
172        count_my_req_per_proc[proc]++;
173
174        /* figure out how many data is remaining in the access
175         * we'll take care of this data (if there is any)
176         * in the while loop below.
177         */
178        rem_len = len_list[i] - avail_len;
179
180        while (rem_len != 0) {
181            off += avail_len;   /* point to first remaining byte */
182            avail_len = rem_len;        /* save remaining size, pass to calc */
183            proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
184            count_my_req_per_proc[proc]++;
185            rem_len -= avail_len;       /* reduce remaining length by amount from fd */
186        }
187    }
188
189    /* now allocate space for my_req, offset, and len */
190    *my_req_ptr = (ADIOI_Access *) ADIOI_Malloc(nprocs * sizeof(ADIOI_Access));
191    my_req = *my_req_ptr;
192
193    count_my_req_procs = 0;
194    for (i = 0; i < nprocs; i++) {
195        if (count_my_req_per_proc[i]) {
196            my_req[i].offsets = (ADIO_Offset *)
197                                ADIOI_Malloc(count_my_req_per_proc[i] *
198                                             sizeof(ADIO_Offset));
199            my_req[i].lens = (int *) ADIOI_Malloc(count_my_req_per_proc[i] *
200                                                  sizeof(int));
201            count_my_req_procs++;
202        }
203        my_req[i].count = 0;    /* will be incremented where needed later */
204    }
205
206    /* now fill in my_req */
207    curr_idx = 0;
208    for (i = 0; i < contig_access_count; i++) {
209        /* short circuit offset/len processing if len == 0
210         *      (zero-byte  read/write */
211        if (len_list[i] == 0)
212            continue;
213        off = offset_list[i];
214        avail_len = len_list[i];
215        proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
216
217        /* for each separate contiguous access from this process */
218        if (buf_idx[proc] == -1)
219            buf_idx[proc] = (int) curr_idx;
220
221        l = my_req[proc].count;
222        curr_idx += (int) avail_len;    /* NOTE: Why is curr_idx an int?  Fix? */
223
224        rem_len = len_list[i] - avail_len;
225
226        /* store the proc, offset, and len information in an array
227         * of structures, my_req. Each structure contains the
228         * offsets and lengths located in that process's FD,
229         * and the associated count.
230         */
231        my_req[proc].offsets[l] = off;
232        my_req[proc].lens[l] = (int) avail_len;
233        my_req[proc].count++;
234
235        while (rem_len != 0) {
236            off += avail_len;
237            avail_len = rem_len;
238            proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len,
239                                                striping_info);
240            if (buf_idx[proc] == -1)
241                buf_idx[proc] = (int) curr_idx;
242
243            l = my_req[proc].count;
244            curr_idx += avail_len;
245            rem_len -= avail_len;
246
247            my_req[proc].offsets[l] = off;
248            my_req[proc].lens[l] = (int) avail_len;
249            my_req[proc].count++;
250        }
251    }
252
253#ifdef AGG_DEBUG
254    for (i = 0; i < nprocs; i++) {
255        if (count_my_req_per_proc[i] > 0) {
256            FPRINTF(stdout, "data needed from %d (count = %d):\n",
257                            i, my_req[i].count);
258            for (l = 0; l < my_req[i].count; l++) {
259                FPRINTF(stdout, "   off[%d] = %lld, len[%d] = %d\n",
260                                l, my_req[i].offsets[l], l, my_req[i].lens[l]);
261            }
262        }
263    }
264#if 0
265    for (i = 0; i < nprocs; i++) {
266        FPRINTF(stdout, "buf_idx[%d] = 0x%x\n", i, buf_idx[i]);
267    }
268#endif
269#endif
270
271    *count_my_req_procs_ptr = count_my_req_procs;
272    *buf_idx_ptr = buf_idx;
273}
274
275int ADIOI_LUSTRE_Docollect(ADIO_File fd, int contig_access_count,
276                           int *len_list, int nprocs)
277{
278    /* If the processes are non-interleaved, we will check the req_size.
279     *   if (avg_req_size > big_req_size) {
280     *       docollect = 0;
281     *   }
282     */
283
284    int i, docollect = 1, lflag, big_req_size = 0;
285    ADIO_Offset req_size = 0, total_req_size;
286    int avg_req_size, total_access_count;
287
288    /* calculate total_req_size and total_access_count */
289    for (i = 0; i < contig_access_count; i++)
290        req_size += len_list[i];
291    MPI_Allreduce(&req_size, &total_req_size, 1, MPI_LONG_LONG_INT, MPI_SUM,
292               fd->comm);
293    MPI_Allreduce(&contig_access_count, &total_access_count, 1, MPI_INT, MPI_SUM,
294               fd->comm);
295    /* estimate average req_size */
296    avg_req_size = (int)(total_req_size / total_access_count);
297    /* get hint of big_req_size */
298    big_req_size = fd->hints->fs_hints.lustre.coll_threshold;
299    /* Don't perform collective I/O if there are big requests */
300    if ((big_req_size > 0) && (avg_req_size > big_req_size))
301        docollect = 0;
302
303    return docollect;
304}
Note: See TracBrowser for help on using the browser.