| 1 | /* -*- Mode: C; c-basic-offset:4 ; -*- */ |
|---|
| 2 | /* |
|---|
| 3 | * Copyright (C) 1997 University of Chicago. |
|---|
| 4 | * See COPYRIGHT notice in top-level directory. |
|---|
| 5 | * |
|---|
| 6 | * Copyright (C) 2007 Oak Ridge National Laboratory |
|---|
| 7 | * |
|---|
| 8 | * Copyright (C) 2008 Sun Microsystems, Lustre group |
|---|
| 9 | */ |
|---|
| 10 | |
|---|
| 11 | #include "ad_lustre.h" |
|---|
| 12 | #include "adio_extern.h" |
|---|
| 13 | |
|---|
| 14 | #undef AGG_DEBUG |
|---|
| 15 | |
|---|
| 16 | void ADIOI_LUSTRE_Get_striping_info(ADIO_File fd, int **striping_info_ptr, |
|---|
| 17 | int mode) |
|---|
| 18 | { |
|---|
| 19 | int *striping_info = NULL; |
|---|
| 20 | /* get striping information: |
|---|
| 21 | * striping_info[0]: stripe_size |
|---|
| 22 | * striping_info[1]: stripe_count |
|---|
| 23 | * striping_info[2]: avail_cb_nodes |
|---|
| 24 | */ |
|---|
| 25 | int stripe_size, stripe_count, CO = 1, CO_max = 1, CO_nodes, lflag; |
|---|
| 26 | int avail_cb_nodes, divisor, nprocs_for_coll = fd->hints->cb_nodes; |
|---|
| 27 | char *value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL + 1) * sizeof(char)); |
|---|
| 28 | |
|---|
| 29 | /* Get hints value */ |
|---|
| 30 | /* stripe size */ |
|---|
| 31 | stripe_size = fd->hints->striping_unit; |
|---|
| 32 | /* stripe count */ |
|---|
| 33 | /* stripe_size and stripe_count have been validated in ADIOI_LUSTRE_Open() */ |
|---|
| 34 | stripe_count = fd->hints->striping_factor; |
|---|
| 35 | |
|---|
| 36 | /* Calculate the available number of I/O clients, that is |
|---|
| 37 | * avail_cb_nodes=min(cb_nodes, stripe_count*CO), where |
|---|
| 38 | * CO=1 by default |
|---|
| 39 | */ |
|---|
| 40 | if (!mode) { |
|---|
| 41 | /* for collective read, |
|---|
| 42 | * if "CO" clients access the same OST simultaneously, |
|---|
| 43 | * the OST disk seek time would be much. So, to avoid this, |
|---|
| 44 | * it might be better if 1 client only accesses 1 OST. |
|---|
| 45 | * So, we set CO = 1 to meet the above requirement. |
|---|
| 46 | */ |
|---|
| 47 | CO = 1; |
|---|
| 48 | /*XXX: maybe there are other better way for collective read */ |
|---|
| 49 | } else { |
|---|
| 50 | /* CO_max: the largest number of IO clients for each ost group */ |
|---|
| 51 | CO_max = (nprocs_for_coll - 1)/ stripe_count + 1; |
|---|
| 52 | /* CO also has been validated in ADIOI_LUSTRE_Open(), >0 */ |
|---|
| 53 | CO = fd->hints->fs_hints.lustre.co_ratio; |
|---|
| 54 | CO = ADIOI_MIN(CO_max, CO); |
|---|
| 55 | } |
|---|
| 56 | /* Calculate how many IO clients we need */ |
|---|
| 57 | /* To avoid extent lock conflicts, |
|---|
| 58 | * avail_cb_nodes should divide (stripe_count*CO) exactly, |
|---|
| 59 | * so that each OST is accessed by only one or more constant clients. */ |
|---|
| 60 | CO_nodes = stripe_count * CO; |
|---|
| 61 | avail_cb_nodes = ADIOI_MIN(nprocs_for_coll, CO_nodes); |
|---|
| 62 | if (avail_cb_nodes == CO_nodes) { |
|---|
| 63 | do { |
|---|
| 64 | /* find the divisor of CO_nodes */ |
|---|
| 65 | divisor = 1; |
|---|
| 66 | do { |
|---|
| 67 | divisor ++; |
|---|
| 68 | } while (CO_nodes % divisor); |
|---|
| 69 | CO_nodes = CO_nodes / divisor; |
|---|
| 70 | /* if stripe_count*CO is a prime number, change nothing */ |
|---|
| 71 | if ((CO_nodes <= avail_cb_nodes) && (CO_nodes != 1)) { |
|---|
| 72 | avail_cb_nodes = CO_nodes; |
|---|
| 73 | break; |
|---|
| 74 | } |
|---|
| 75 | } while (CO_nodes != 1); |
|---|
| 76 | } |
|---|
| 77 | |
|---|
| 78 | *striping_info_ptr = (int *) ADIOI_Malloc(3 * sizeof(int)); |
|---|
| 79 | striping_info = *striping_info_ptr; |
|---|
| 80 | striping_info[0] = stripe_size; |
|---|
| 81 | striping_info[1] = stripe_count; |
|---|
| 82 | striping_info[2] = avail_cb_nodes; |
|---|
| 83 | |
|---|
| 84 | ADIOI_Free(value); |
|---|
| 85 | } |
|---|
| 86 | |
|---|
| 87 | int ADIOI_LUSTRE_Calc_aggregator(ADIO_File fd, ADIO_Offset off, |
|---|
| 88 | ADIO_Offset *len, int *striping_info) |
|---|
| 89 | { |
|---|
| 90 | int rank_index, rank; |
|---|
| 91 | ADIO_Offset avail_bytes; |
|---|
| 92 | int stripe_size = striping_info[0]; |
|---|
| 93 | int avail_cb_nodes = striping_info[2]; |
|---|
| 94 | |
|---|
| 95 | /* Produce the stripe-contiguous pattern for Lustre */ |
|---|
| 96 | rank_index = (int)((off / stripe_size) % avail_cb_nodes); |
|---|
| 97 | |
|---|
| 98 | /* we index into fd_end with rank_index, and fd_end was allocated to be no |
|---|
| 99 | * bigger than fd->hins->cb_nodes. If we ever violate that, we're |
|---|
| 100 | * overrunning arrays. Obviously, we should never ever hit this abort |
|---|
| 101 | */ |
|---|
| 102 | if (rank_index >= fd->hints->cb_nodes) |
|---|
| 103 | MPI_Abort(MPI_COMM_WORLD, 1); |
|---|
| 104 | |
|---|
| 105 | avail_bytes = (off / (ADIO_Offset)stripe_size + 1) * |
|---|
| 106 | (ADIO_Offset)stripe_size - off; |
|---|
| 107 | if (avail_bytes < *len) { |
|---|
| 108 | /* this proc only has part of the requested contig. region */ |
|---|
| 109 | *len = avail_bytes; |
|---|
| 110 | } |
|---|
| 111 | /* map our index to a rank */ |
|---|
| 112 | /* NOTE: FOR NOW WE DON'T HAVE A MAPPING...JUST DO 0..NPROCS_FOR_COLL */ |
|---|
| 113 | rank = fd->hints->ranklist[rank_index]; |
|---|
| 114 | |
|---|
| 115 | return rank; |
|---|
| 116 | } |
|---|
| 117 | |
|---|
| 118 | /* ADIOI_LUSTRE_Calc_my_req() - calculate what portions of the access requests |
|---|
| 119 | * of this process are located in the file domains of various processes |
|---|
| 120 | * (including this one) |
|---|
| 121 | */ |
|---|
| 122 | void ADIOI_LUSTRE_Calc_my_req(ADIO_File fd, ADIO_Offset *offset_list, |
|---|
| 123 | int *len_list, int contig_access_count, |
|---|
| 124 | int *striping_info, int nprocs, |
|---|
| 125 | int *count_my_req_procs_ptr, |
|---|
| 126 | int **count_my_req_per_proc_ptr, |
|---|
| 127 | ADIOI_Access **my_req_ptr, |
|---|
| 128 | int **buf_idx_ptr) |
|---|
| 129 | { |
|---|
| 130 | /* Nothing different from ADIOI_Calc_my_req(), except calling |
|---|
| 131 | * ADIOI_Lustre_Calc_aggregator() instead of the old one */ |
|---|
| 132 | int *count_my_req_per_proc, count_my_req_procs, *buf_idx; |
|---|
| 133 | int i, l, proc; |
|---|
| 134 | ADIO_Offset avail_len, rem_len, curr_idx, off; |
|---|
| 135 | ADIOI_Access *my_req; |
|---|
| 136 | |
|---|
| 137 | *count_my_req_per_proc_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int)); |
|---|
| 138 | count_my_req_per_proc = *count_my_req_per_proc_ptr; |
|---|
| 139 | /* count_my_req_per_proc[i] gives the no. of contig. requests of this |
|---|
| 140 | * process in process i's file domain. calloc initializes to zero. |
|---|
| 141 | * I'm allocating memory of size nprocs, so that I can do an |
|---|
| 142 | * MPI_Alltoall later on. |
|---|
| 143 | */ |
|---|
| 144 | |
|---|
| 145 | buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int)); |
|---|
| 146 | /* buf_idx is relevant only if buftype_is_contig. |
|---|
| 147 | * buf_idx[i] gives the index into user_buf where data received |
|---|
| 148 | * from proc. i should be placed. This allows receives to be done |
|---|
| 149 | * without extra buffer. This can't be done if buftype is not contig. |
|---|
| 150 | */ |
|---|
| 151 | |
|---|
| 152 | /* initialize buf_idx to -1 */ |
|---|
| 153 | for (i = 0; i < nprocs; i++) |
|---|
| 154 | buf_idx[i] = -1; |
|---|
| 155 | |
|---|
| 156 | /* one pass just to calculate how much space to allocate for my_req; |
|---|
| 157 | * contig_access_count was calculated way back in ADIOI_Calc_my_off_len() |
|---|
| 158 | */ |
|---|
| 159 | for (i = 0; i < contig_access_count; i++) { |
|---|
| 160 | /* short circuit offset/len processing if len == 0 |
|---|
| 161 | * (zero-byte read/write |
|---|
| 162 | */ |
|---|
| 163 | if (len_list[i] == 0) |
|---|
| 164 | continue; |
|---|
| 165 | off = offset_list[i]; |
|---|
| 166 | avail_len = len_list[i]; |
|---|
| 167 | /* note: we set avail_len to be the total size of the access. |
|---|
| 168 | * then ADIOI_LUSTRE_Calc_aggregator() will modify the value to return |
|---|
| 169 | * the amount that was available. |
|---|
| 170 | */ |
|---|
| 171 | proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info); |
|---|
| 172 | count_my_req_per_proc[proc]++; |
|---|
| 173 | |
|---|
| 174 | /* figure out how many data is remaining in the access |
|---|
| 175 | * we'll take care of this data (if there is any) |
|---|
| 176 | * in the while loop below. |
|---|
| 177 | */ |
|---|
| 178 | rem_len = len_list[i] - avail_len; |
|---|
| 179 | |
|---|
| 180 | while (rem_len != 0) { |
|---|
| 181 | off += avail_len; /* point to first remaining byte */ |
|---|
| 182 | avail_len = rem_len; /* save remaining size, pass to calc */ |
|---|
| 183 | proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info); |
|---|
| 184 | count_my_req_per_proc[proc]++; |
|---|
| 185 | rem_len -= avail_len; /* reduce remaining length by amount from fd */ |
|---|
| 186 | } |
|---|
| 187 | } |
|---|
| 188 | |
|---|
| 189 | /* now allocate space for my_req, offset, and len */ |
|---|
| 190 | *my_req_ptr = (ADIOI_Access *) ADIOI_Malloc(nprocs * sizeof(ADIOI_Access)); |
|---|
| 191 | my_req = *my_req_ptr; |
|---|
| 192 | |
|---|
| 193 | count_my_req_procs = 0; |
|---|
| 194 | for (i = 0; i < nprocs; i++) { |
|---|
| 195 | if (count_my_req_per_proc[i]) { |
|---|
| 196 | my_req[i].offsets = (ADIO_Offset *) |
|---|
| 197 | ADIOI_Malloc(count_my_req_per_proc[i] * |
|---|
| 198 | sizeof(ADIO_Offset)); |
|---|
| 199 | my_req[i].lens = (int *) ADIOI_Malloc(count_my_req_per_proc[i] * |
|---|
| 200 | sizeof(int)); |
|---|
| 201 | count_my_req_procs++; |
|---|
| 202 | } |
|---|
| 203 | my_req[i].count = 0; /* will be incremented where needed later */ |
|---|
| 204 | } |
|---|
| 205 | |
|---|
| 206 | /* now fill in my_req */ |
|---|
| 207 | curr_idx = 0; |
|---|
| 208 | for (i = 0; i < contig_access_count; i++) { |
|---|
| 209 | /* short circuit offset/len processing if len == 0 |
|---|
| 210 | * (zero-byte read/write */ |
|---|
| 211 | if (len_list[i] == 0) |
|---|
| 212 | continue; |
|---|
| 213 | off = offset_list[i]; |
|---|
| 214 | avail_len = len_list[i]; |
|---|
| 215 | proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info); |
|---|
| 216 | |
|---|
| 217 | /* for each separate contiguous access from this process */ |
|---|
| 218 | if (buf_idx[proc] == -1) |
|---|
| 219 | buf_idx[proc] = (int) curr_idx; |
|---|
| 220 | |
|---|
| 221 | l = my_req[proc].count; |
|---|
| 222 | curr_idx += (int) avail_len; /* NOTE: Why is curr_idx an int? Fix? */ |
|---|
| 223 | |
|---|
| 224 | rem_len = len_list[i] - avail_len; |
|---|
| 225 | |
|---|
| 226 | /* store the proc, offset, and len information in an array |
|---|
| 227 | * of structures, my_req. Each structure contains the |
|---|
| 228 | * offsets and lengths located in that process's FD, |
|---|
| 229 | * and the associated count. |
|---|
| 230 | */ |
|---|
| 231 | my_req[proc].offsets[l] = off; |
|---|
| 232 | my_req[proc].lens[l] = (int) avail_len; |
|---|
| 233 | my_req[proc].count++; |
|---|
| 234 | |
|---|
| 235 | while (rem_len != 0) { |
|---|
| 236 | off += avail_len; |
|---|
| 237 | avail_len = rem_len; |
|---|
| 238 | proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, |
|---|
| 239 | striping_info); |
|---|
| 240 | if (buf_idx[proc] == -1) |
|---|
| 241 | buf_idx[proc] = (int) curr_idx; |
|---|
| 242 | |
|---|
| 243 | l = my_req[proc].count; |
|---|
| 244 | curr_idx += avail_len; |
|---|
| 245 | rem_len -= avail_len; |
|---|
| 246 | |
|---|
| 247 | my_req[proc].offsets[l] = off; |
|---|
| 248 | my_req[proc].lens[l] = (int) avail_len; |
|---|
| 249 | my_req[proc].count++; |
|---|
| 250 | } |
|---|
| 251 | } |
|---|
| 252 | |
|---|
| 253 | #ifdef AGG_DEBUG |
|---|
| 254 | for (i = 0; i < nprocs; i++) { |
|---|
| 255 | if (count_my_req_per_proc[i] > 0) { |
|---|
| 256 | FPRINTF(stdout, "data needed from %d (count = %d):\n", |
|---|
| 257 | i, my_req[i].count); |
|---|
| 258 | for (l = 0; l < my_req[i].count; l++) { |
|---|
| 259 | FPRINTF(stdout, " off[%d] = %lld, len[%d] = %d\n", |
|---|
| 260 | l, my_req[i].offsets[l], l, my_req[i].lens[l]); |
|---|
| 261 | } |
|---|
| 262 | } |
|---|
| 263 | } |
|---|
| 264 | #if 0 |
|---|
| 265 | for (i = 0; i < nprocs; i++) { |
|---|
| 266 | FPRINTF(stdout, "buf_idx[%d] = 0x%x\n", i, buf_idx[i]); |
|---|
| 267 | } |
|---|
| 268 | #endif |
|---|
| 269 | #endif |
|---|
| 270 | |
|---|
| 271 | *count_my_req_procs_ptr = count_my_req_procs; |
|---|
| 272 | *buf_idx_ptr = buf_idx; |
|---|
| 273 | } |
|---|
| 274 | |
|---|
| 275 | int ADIOI_LUSTRE_Docollect(ADIO_File fd, int contig_access_count, |
|---|
| 276 | int *len_list, int nprocs) |
|---|
| 277 | { |
|---|
| 278 | /* If the processes are non-interleaved, we will check the req_size. |
|---|
| 279 | * if (avg_req_size > big_req_size) { |
|---|
| 280 | * docollect = 0; |
|---|
| 281 | * } |
|---|
| 282 | */ |
|---|
| 283 | |
|---|
| 284 | int i, docollect = 1, lflag, big_req_size = 0; |
|---|
| 285 | ADIO_Offset req_size = 0, total_req_size; |
|---|
| 286 | int avg_req_size, total_access_count; |
|---|
| 287 | |
|---|
| 288 | /* calculate total_req_size and total_access_count */ |
|---|
| 289 | for (i = 0; i < contig_access_count; i++) |
|---|
| 290 | req_size += len_list[i]; |
|---|
| 291 | MPI_Allreduce(&req_size, &total_req_size, 1, MPI_LONG_LONG_INT, MPI_SUM, |
|---|
| 292 | fd->comm); |
|---|
| 293 | MPI_Allreduce(&contig_access_count, &total_access_count, 1, MPI_INT, MPI_SUM, |
|---|
| 294 | fd->comm); |
|---|
| 295 | /* estimate average req_size */ |
|---|
| 296 | avg_req_size = (int)(total_req_size / total_access_count); |
|---|
| 297 | /* get hint of big_req_size */ |
|---|
| 298 | big_req_size = fd->hints->fs_hints.lustre.coll_threshold; |
|---|
| 299 | /* Don't perform collective I/O if there are big requests */ |
|---|
| 300 | if ((big_req_size > 0) && (avg_req_size > big_req_size)) |
|---|
| 301 | docollect = 0; |
|---|
| 302 | |
|---|
| 303 | return docollect; |
|---|
| 304 | } |
|---|