68 for(i = 0; i < NTask; i ++) {
70 count[i] = entries[i].base;
81 MPI_Comm_size(Comm, &plan.NTask);
120 if(iter >= maxiter) {
157 if(!failure && maxiter > 1) {
162 endrun(5, "Still have %ld particles in exchange list\n", plan9.nexchange);
178 int lcompact[6] = {0};
188 MPI_Allreduce(lcompact, compact, 6, MPI_INT, MPI_LOR, Comm);
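The MPI_Allreduce at line 188 folds each rank's per-type compaction flags together with a logical OR, so every task agrees on which slot types need a garbage-collection pass. A minimal sketch of that pattern with standard MPI; only the lcompact/compact names are taken from the excerpt, the wrapper function is illustrative:

    #include <mpi.h>

    /* Combine per-rank "should we compact type t?" flags: MPI_LOR over
     * MPI_INT yields 1 for a type if any rank set its local flag. */
    void agree_on_compaction(const int lcompact[6], int compact[6], MPI_Comm comm)
    {
        MPI_Allreduce(lcompact, compact, 6, MPI_INT, MPI_LOR, comm);
    }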
196 char * slotBuf[6] = {NULL, NULL, NULL, NULL, NULL, NULL};
202 message(1, "Too many particles for exchange: NumPart=%ld count_get = %d count_togo=%d garbage = %d MaxPart=%ld\n",
217 memset(toGoPtr, 0, sizeof(toGoPtr[0]) * plan->NTask);
219 for(n = 0; n < plan->last; n++)
228 int bufPI = toGoPtr[target].slots[type];
229 toGoPtr[target].slots[type] ++;
232 memcpy(slotBuf[type] + (bufPI + plan->toGoOffset[target].slots[type]) * elsize,
233 (char*) sman->info[type].ptr + pman->Base[i].PI * elsize, elsize);
236 toGoPtr[target].base ++;
250 int compact[6] = {0};
258 int64_t newSlots[6] = {0};
266 if(newNumPart > pman->MaxPart) {
267 endrun(787878, "NumPart=%ld MaxPart=%ld\n", newNumPart, pman->MaxPart);
272 int * sendcounts = (int*) ta_malloc("sendcounts", int, plan->NTask);
273 int * senddispls = (int*) ta_malloc("senddispls", int, plan->NTask);
274 int * recvcounts = (int*) ta_malloc("recvcounts", int, plan->NTask);
275 int * recvdispls = (int*) ta_malloc("recvdispls", int, plan->NTask);
285 message(0, "Starting particle data exchange\n");
310 ptr + N_slots * elsize,
316 message(0, "Done with AlltoAllv\n");
319 for(src = 0; src < plan->NTask; src++) {
342 endrun(1, "Exchange: P[%d].ID = %ld (type %d) != SLOT ID = %ld. garbage: %d ReverseLink: %d\n",i,pman->Base[i].ID, pman->Base[i].Type, BASESLOT_PI(PI, ptype, sman)->ID, pman->Base[i].IsGarbage, BASESLOT_PI(PI, ptype, sman)->ReverseLink);
349 endrun(1, "N_slots mismatched\n");
389 size_t numthreads = omp_get_max_threads();
392 size_t narr = plan->nexchange/numthreads+numthreads;
397 size_t *nexthr = ta_malloc("nexthr", size_t, numthreads);
398 int **threx = ta_malloc("threx", int *, numthreads);
405 const double rel_random_shift[3] = {0};
412 size_t schedsz = plan->nexchange/numthreads+1;
413 #pragma omp parallel for schedule(static, schedsz) reduction(+: ngarbage)
414 for(i=0; i < pman->NumPart; i++)
424 int target = layoutfunc(i, layout_userdata);
426 const int tid = omp_get_thread_num();
427 threx[tid][nexthr[tid]] = i;
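The fragments from lines 389-427 build the exchange list in parallel: every OpenMP thread appends candidate particle indices to its own array (threx[tid], counted by nexthr[tid]), and the pieces are merged afterwards (the declarations below list gadget_setup_thread_arrays and gadget_compact_thread_arrays for that). A generic sketch of the same per-thread-list pattern using only OpenMP and the C library; the predicate wants_to_leave stands in for the excerpt's layoutfunc test and is purely illustrative:

    #include <omp.h>
    #include <stdlib.h>
    #include <string.h>

    /* Collect the indices i in [0, n) for which the predicate holds,
     * one private list per thread, then concatenate them into dest.
     * schedule(static) hands each thread a contiguous chunk of at most
     * n/nthr + 1 iterations, so `per` bounds every private list. */
    size_t collect_indices(size_t n, int (*wants_to_leave)(size_t i), int *dest)
    {
        int t, nthr = omp_get_max_threads();
        size_t per = n / nthr + 1;
        int *buf = (int *) malloc((size_t) nthr * per * sizeof(int));
        size_t *cnt = (size_t *) calloc(nthr, sizeof(size_t));
        size_t i, total = 0;

        #pragma omp parallel for schedule(static)
        for(i = 0; i < n; i++) {
            if(!wants_to_leave(i))
                continue;
            int tid = omp_get_thread_num();
            buf[(size_t) tid * per + cnt[tid]] = (int) i;
            cnt[tid]++;
        }

        /* compact the per-thread pieces into one contiguous list */
        for(t = 0; t < nthr; t++) {
            memcpy(dest + total, buf + (size_t) t * per, cnt[t] * sizeof(int));
            total += cnt[t];
        }
        free(buf);
        free(cnt);
        return total;
    }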
448 if (nlimit < 4096L * 6 + plan->NTask * 2 * sizeof(MPI_Request))
449 endrun(1, "Not enough memory free to store requests!\n");
451 nlimit -= 4096 * 2L + plan->NTask * 2 * sizeof(MPI_Request);
457 message(0, "Using %td bytes for exchange.\n", nlimit);
462 if (maxsize < sman->info[ptype].elsize)
467 size_t package = sizeof(pman->Base[0]) + maxsize;
469 endrun(212, "Package is too large, no free memory: package = %lu nlimit = %lu.", package, nlimit);
473 const size_t maxexch = 1024L*1024L*2030L;
483 size_t slotexch[6] = {0};
489 partexch += sizeof(pman->Base[0]);
491 package += sizeof(pman->Base[0]) + sman->info[ptype].elsize + sizeof(ExchangePartCache);
492 if(package >= nlimit || slotexch[ptype] >= maxexch || partexch >= maxexch) {
511 #pragma omp parallel for
512 for(n = 0; n < plan->last; n++)
515 const int target = layoutfunc(i, layout_userdata);
518 if(target >= plan->NTask || target < 0)
519 endrun(4, "layoutfunc for %d returned unreasonable %d for %d tasks\n", i, target, plan->NTask);
523 for(n = 0; n < plan->last; n++)
537 int64_t maxbasetogo=-1, maxbasetoget=-1;
538 for(rank = 1; rank < plan->NTask; rank ++) {
545 if(plan->toGo[rank].base > maxbasetogo)
546 maxbasetogo = plan->toGo[rank].base;
547 if(plan->toGet[rank].base > maxbasetoget)
556 int64_t maxbasetogomax, maxbasetogetmax, sumtogo;
557 MPI_Reduce(&maxbasetogo, &maxbasetogomax, 1, MPI_INT64, MPI_MAX, 0, Comm);
558 MPI_Reduce(&maxbasetoget, &maxbasetogetmax, 1, MPI_INT64, MPI_MAX, 0, Comm);
560 message(0, "iter = %d Total particles in flight: %ld Largest togo: %ld, toget %ld\n", iter, sumtogo, maxbasetogomax, maxbasetogetmax);
566 ((uint64_t *) radix)[0] = ((MyIDType*) data)[0];
575 MPI_Comm_size(MPI_COMM_WORLD, &NTask);
576 MPI_Comm_rank(MPI_COMM_WORLD, &ThisTask);
578 message(0, "Testing ID uniqueness...\n");
582 #pragma omp parallel for
583 for(i = 0; i < pman->NumPart; i++) {
584 ids[i] = pman->Base[i].ID;
593 while(nids > 0 && (ids[nids-1] == (MyIDType)-1)) {
597 #pragma omp parallel for
598 for(i = 1; i < nids; i++) {
599 if(ids[i] <= ids[i - 1])
601 endrun(12, "non-unique (or non-ordered) ID=%013ld found on task=%d (i=%d NumPart=%ld)\n",
608 const int TAG = 0xdead;
616 MPI_Send(ptr, sizeof(MyIDType), MPI_BYTE, ThisTask + 1, TAG, MPI_COMM_WORLD);
619 MPI_Recv(prev, sizeof(MyIDType), MPI_BYTE,
620 ThisTask - 1, TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
624 MPI_Recv(prev, sizeof(MyIDType), MPI_BYTE, ThisTask - 1, TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
625 MPI_Send(prev, sizeof(MyIDType), MPI_BYTE, ThisTask + 1, TAG, MPI_COMM_WORLD);
630 ids+(nids - 1), sizeof(MyIDType), MPI_BYTE,
633 ThisTask - 1, TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
639 if(ids[0] <= *prev && ids[0])
640 endrun(13, "non-unique ID=%ld found on task=%d\n", ids[0], ThisTask);
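The last group of fragments (lines 575-640) belongs to domain_test_id_uniqueness: particle IDs are gathered, sorted across all ranks (the mpsort_mpi macro and the mp_order_by_id radix function appear in the declarations below), each rank verifies its local block is strictly increasing, and the block boundaries are verified by handing the last local ID to the next rank. A self-contained sketch of the boundary check with plain MPI; MPI_Sendrecv replaces the excerpt's matched MPI_Send/MPI_Recv pairs, uint64_t stands in for MyIDType, and the error reporting is illustrative:

    #include <mpi.h>
    #include <stdint.h>
    #include <stdio.h>

    /* ids[0..nids) is this rank's block of the globally sorted ID list. */
    void check_sorted_ids_unique(const uint64_t *ids, int64_t nids, MPI_Comm comm)
    {
        int rank, ntask;
        int64_t i;
        MPI_Comm_rank(comm, &rank);
        MPI_Comm_size(comm, &ntask);

        /* local check: strictly increasing within the block */
        for(i = 1; i < nids; i++)
            if(ids[i] <= ids[i-1]) {
                fprintf(stderr, "duplicate or unsorted ID %lu on task %d\n",
                        (unsigned long) ids[i], rank);
                MPI_Abort(comm, 1);
            }

        /* boundary check: pass the last local ID to the next rank */
        uint64_t last = nids > 0 ? ids[nids-1] : 0;
        uint64_t prev = 0;
        MPI_Sendrecv(&last, 1, MPI_UINT64_T, (rank + 1) % ntask, 0,
                     &prev, 1, MPI_UINT64_T, (rank + ntask - 1) % ntask, 0,
                     comm, MPI_STATUS_IGNORE);
        if(rank > 0 && nids > 0 && prev >= ids[0]) {
            fprintf(stderr, "duplicate ID %lu across tasks %d and %d\n",
                    (unsigned long) ids[0], rank - 1, rank);
            MPI_Abort(comm, 1);
        }
    }

The excerpt's extra receive-and-forward pair at lines 624-625 covers ranks that hold no particles (the predecessor's last ID is passed straight through); the sketch above glosses over that case.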
void real_drift_particle(struct particle_data *pp, struct slots_manager_type *sman, const double ddrift, const double BoxSize, const double random_shift[3])
void message(int where, const char *fmt,...)
void endrun(int where, const char *fmt,...)
static void domain_build_plan(int iter, ExchangeLayoutFunc layoutfunc, const void *layout_userdata, ExchangePlan *plan, struct part_manager_type *pman, MPI_Comm Comm)
static ExchangePlan domain_init_exchangeplan(MPI_Comm Comm)
static void shall_we_compact_slots(int *compact, ExchangePlan *plan, const struct slots_manager_type *sman, MPI_Comm Comm)
void domain_test_id_uniqueness(struct part_manager_type *pman)
static int domain_exchange_once(ExchangePlan *plan, int do_gc, struct part_manager_type *pman, struct slots_manager_type *sman, MPI_Comm Comm)
static void domain_build_exchange_list(ExchangeLayoutFunc layoutfunc, const void *layout_userdata, ExchangePlan *plan, struct DriftData *drift, struct part_manager_type *pman, struct slots_manager_type *sman, MPI_Comm Comm)
static MPI_Datatype MPI_TYPE_PLAN_ENTRY
static void _transpose_plan_entries(ExchangePlanEntry *entries, int *count, int ptype, int NTask)
static void domain_free_exchangeplan(ExchangePlan *plan)
static size_t domain_find_iter_space(ExchangePlan *plan, const struct part_manager_type *pman, const struct slots_manager_type *sman)
static void mp_order_by_id(const void *data, void *radix, void *arg)
int domain_exchange(ExchangeLayoutFunc layoutfunc, const void *layout_userdata, int do_gc, struct DriftData *drift, struct part_manager_type *pman, struct slots_manager_type *sman, int maxiter, MPI_Comm Comm)
int(* ExchangeLayoutFunc)(int p, const void *userdata)
#define mpsort_mpi(base, nmemb, elsize, radix, rsize, arg, comm)
#define mymalloc(name, size)
#define ta_malloc(name, type, nele)
#define myrealloc(ptr, size)
#define mymalloc2(name, size)
#define mymalloc_freebytes()
size_t slots_reserve(int where, int64_t atleast[6], struct slots_manager_type *sman)
MPI_Datatype MPI_TYPE_SLOT[6]
MPI_Datatype MPI_TYPE_PARTICLE
void slots_mark_garbage(int i, struct part_manager_type *pman, struct slots_manager_type *sman)
int slots_gc(int *compact_slots, struct part_manager_type *pman, struct slots_manager_type *sman)
void slots_check_id_consistency(struct part_manager_type *pman, struct slots_manager_type *sman)
#define BASESLOT_PI(PI, ptype, sman)
ExchangePlanEntry toGetSum
ExchangePlanEntry toGoSum
ExchangePartCache * layouts
ExchangePlanEntry * toGet
ExchangePlanEntry * toGetOffset
ExchangePlanEntry * toGoOffset
struct particle_data * Base
void gadget_setup_thread_arrays(int *dest, int *srcs[], size_t sizes[], size_t total_size, int narrays)
int MPI_Alltoallv_sparse(void *sendbuf, int *sendcnts, int *sdispls, MPI_Datatype sendtype, void *recvbuf, int *recvcnts, int *rdispls, MPI_Datatype recvtype, MPI_Comm comm)
size_t gadget_compact_thread_arrays(int *dest, int *srcs[], size_t sizes[], int narrays)
int MPIU_Any(int condition, MPI_Comm comm)
double get_exact_drift_factor(Cosmology *CP, inttime_t ti0, inttime_t ti1)
#define walltime_measure(name)
static enum TransferType ptype