MP-Gadget  5.0.1.dev1-76bc7d4726-dirty
exchange.c File Reference
#include <mpi.h>
#include <omp.h>
#include <string.h>
#include "exchange.h"
#include "slotsmanager.h"
#include "partmanager.h"
#include "walltime.h"
#include "drift.h"
#include "timefac.h"
#include "utils.h"
#include "utils/mpsort.h"

Classes

struct  ExchangePlanEntry
 
struct  ExchangePartCache
 
struct  ExchangePlan
 

Functions

static int domain_exchange_once (ExchangePlan *plan, int do_gc, struct part_manager_type *pman, struct slots_manager_type *sman, MPI_Comm Comm)
 
static void domain_build_plan (int iter, ExchangeLayoutFunc layoutfunc, const void *layout_userdata, ExchangePlan *plan, struct part_manager_type *pman, MPI_Comm Comm)
 
static size_t domain_find_iter_space (ExchangePlan *plan, const struct part_manager_type *pman, const struct slots_manager_type *sman)
 
static void domain_build_exchange_list (ExchangeLayoutFunc layoutfunc, const void *layout_userdata, ExchangePlan *plan, struct DriftData *drift, struct part_manager_type *pman, struct slots_manager_type *sman, MPI_Comm Comm)
 
static void _transpose_plan_entries (ExchangePlanEntry *entries, int *count, int ptype, int NTask)
 
static ExchangePlan domain_init_exchangeplan (MPI_Comm Comm)
 
static void domain_free_exchangeplan (ExchangePlan *plan)
 
int domain_exchange (ExchangeLayoutFunc layoutfunc, const void *layout_userdata, int do_gc, struct DriftData *drift, struct part_manager_type *pman, struct slots_manager_type *sman, int maxiter, MPI_Comm Comm)
 
static void shall_we_compact_slots (int *compact, ExchangePlan *plan, const struct slots_manager_type *sman, MPI_Comm Comm)
 
static void mp_order_by_id (const void *data, void *radix, void *arg)
 
void domain_test_id_uniqueness (struct part_manager_type *pman)
 

Variables

static MPI_Datatype MPI_TYPE_PLAN_ENTRY = 0
 

Function Documentation

◆ _transpose_plan_entries()

static void _transpose_plan_entries ( ExchangePlanEntry *  entries,
int *  count,
int  ptype,
int  NTask 
)
static

Definition at line 65 of file exchange.c.

66 {
67  int i;
68  for(i = 0; i < NTask; i ++) {
69  if(ptype == -1) {
70  count[i] = entries[i].base;
71  } else {
72  count[i] = entries[i].slots[ptype];
73  }
74  }
75 }
int64_t slots[6]
Definition: exchange.c:17
int64_t base
Definition: exchange.c:16
int NTask
Definition: test_exchange.c:23
static enum TransferType ptype
Definition: zeldovich.c:146

References ExchangePlanEntry::base, NTask, ptype, and ExchangePlanEntry::slots.

Referenced by domain_exchange_once().
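
The helper pulls one column out of the array of plan entries, either the base particle count (ptype == -1) or one slot type, into the flat int array that MPI_Alltoallv expects. A minimal, self-contained sketch of that pattern, with a stand-in PlanEntry struct in place of the real ExchangePlanEntry:

#include <stdio.h>
#include <stdint.h>

typedef struct {
    int64_t base;      /* total particles destined for one task */
    int64_t slots[6];  /* per-particle-type slot counts */
} PlanEntry;

static void transpose_counts(const PlanEntry * entries, int * count, int ptype, int ntask)
{
    int i;
    for(i = 0; i < ntask; i++)
        count[i] = (int) ((ptype == -1) ? entries[i].base : entries[i].slots[ptype]);
}

int main(void)
{
    PlanEntry togo[2] = {{10, {4, 6, 0, 0, 0, 0}}, {3, {1, 2, 0, 0, 0, 0}}};
    int sendcounts[2];
    transpose_counts(togo, sendcounts, -1, 2);   /* base counts: 10 3 */
    printf("%d %d\n", sendcounts[0], sendcounts[1]);
    transpose_counts(togo, sendcounts, 0, 2);    /* type-0 slot counts: 4 1 */
    printf("%d %d\n", sendcounts[0], sendcounts[1]);
    return 0;
}

Passing ptype = -1 selects the base particle counts and any other value selects that slot column, matching the calls made from domain_exchange_once().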


◆ domain_build_exchange_list()

static void domain_build_exchange_list ( ExchangeLayoutFunc  layoutfunc,
const void *  layout_userdata,
ExchangePlan *  plan,
struct DriftData *  drift,
struct part_manager_type *  pman,
struct slots_manager_type *  sman,
MPI_Comm  Comm 
)
static

Definition at line 386 of file exchange.c.

387 {
388  int i;
389  size_t numthreads = omp_get_max_threads();
390  plan->nexchange = pman->NumPart;
391  /*static schedule below so we only need this much memory*/
392  size_t narr = plan->nexchange/numthreads+numthreads;
393  plan->ExchangeList = (int *) mymalloc2("exchangelist", sizeof(int) * narr * numthreads);
394  /*Garbage particles are counted so we have an accurate memory estimate*/
395  int ngarbage = 0;
396 
397  size_t *nexthr = ta_malloc("nexthr", size_t, numthreads);
398  int **threx = ta_malloc("threx", int *, numthreads);
399  gadget_setup_thread_arrays(plan->ExchangeList, threx, nexthr,narr,numthreads);
400 
401  int ThisTask;
402  MPI_Comm_rank(Comm, &ThisTask);
403 
404  /* Can't update the random shift without re-decomposing domain*/
405  const double rel_random_shift[3] = {0};
406  /* Find drift factor*/
407  double ddrift = 0;
408  if(drift)
409  ddrift = get_exact_drift_factor(drift->CP, drift->ti0, drift->ti1);
410 
411  /* flag the particles that need to be exported */
412  size_t schedsz = plan->nexchange/numthreads+1;
413  #pragma omp parallel for schedule(static, schedsz) reduction(+: ngarbage)
414  for(i=0; i < pman->NumPart; i++)
415  {
416  if(drift) {
417  real_drift_particle(&pman->Base[i], sman, ddrift, pman->BoxSize, rel_random_shift);
418  pman->Base[i].Ti_drift = drift->ti1;
419  }
420  if(pman->Base[i].IsGarbage) {
421  ngarbage++;
422  continue;
423  }
424  int target = layoutfunc(i, layout_userdata);
425  if(target != ThisTask) {
426  const int tid = omp_get_thread_num();
427  threx[tid][nexthr[tid]] = i;
428  nexthr[tid]++;
429  }
430  }
431  plan->ngarbage = ngarbage;
432  /*Merge step for the queue.*/
433  plan->nexchange = gadget_compact_thread_arrays(plan->ExchangeList, threx, nexthr, numthreads);
434  ta_free(threx);
435  ta_free(nexthr);
436 
437  /*Shrink memory*/
438  plan->ExchangeList = (int *) myrealloc(plan->ExchangeList, sizeof(int) * plan->nexchange);
439 }
void real_drift_particle(struct particle_data *pp, struct slots_manager_type *sman, const double ddrift, const double BoxSize, const double random_shift[3])
Definition: drift.c:22
#define ta_malloc(name, type, nele)
Definition: mymalloc.h:25
#define myrealloc(ptr, size)
Definition: mymalloc.h:18
#define ta_free(p)
Definition: mymalloc.h:28
#define mymalloc2(name, size)
Definition: mymalloc.h:16
inttime_t ti0
Definition: drift.h:15
Cosmology * CP
Definition: drift.h:17
inttime_t ti1
Definition: drift.h:16
size_t nexchange
Definition: exchange.c:39
int64_t ngarbage
Definition: exchange.c:41
int * ExchangeList
Definition: exchange.c:37
struct particle_data * Base
Definition: partmanager.h:74
unsigned int IsGarbage
Definition: partmanager.h:19
inttime_t Ti_drift
Definition: partmanager.h:36
void gadget_setup_thread_arrays(int *dest, int *srcs[], size_t sizes[], size_t total_size, int narrays)
Definition: system.c:600
size_t gadget_compact_thread_arrays(int *dest, int *srcs[], size_t sizes[], int narrays)
Definition: system.c:587
int ThisTask
Definition: test_exchange.c:23
double get_exact_drift_factor(Cosmology *CP, inttime_t ti0, inttime_t ti1)
Definition: timefac.c:64

References part_manager_type::Base, part_manager_type::BoxSize, DriftData::CP, ExchangePlan::ExchangeList, gadget_compact_thread_arrays(), gadget_setup_thread_arrays(), get_exact_drift_factor(), particle_data::IsGarbage, mymalloc2, myrealloc, ExchangePlan::nexchange, ExchangePlan::ngarbage, part_manager_type::NumPart, real_drift_particle(), ta_free, ta_malloc, ThisTask, DriftData::ti0, DriftData::ti1, and particle_data::Ti_drift.

Referenced by domain_exchange().
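
The central trick here is the lock-free per-thread export queue: gadget_setup_thread_arrays() gives each OpenMP thread its own slice of ExchangeList, each thread appends only to its own slice, and gadget_compact_thread_arrays() merges the slices afterwards. A self-contained sketch of the same pattern, with the MP-Gadget helpers replaced by inline equivalents and a dummy test standing in for layoutfunc(i) != ThisTask:

#include <omp.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int main(void)
{
    const int npart = 1000;
    const int nthreads = omp_get_max_threads();
    /* with a static schedule each thread handles one contiguous chunk,
     * so a slice of narr entries per thread is enough */
    const size_t narr = npart / nthreads + nthreads;
    int *exchangelist = (int *) malloc(sizeof(int) * narr * nthreads);
    size_t *nexthr = (size_t *) calloc(nthreads, sizeof(size_t));

    size_t schedsz = npart / nthreads + 1;
    int i;
    #pragma omp parallel for schedule(static, schedsz)
    for(i = 0; i < npart; i++) {
        if(i % 3 == 0) {                 /* stand-in for layoutfunc(i) != ThisTask */
            int tid = omp_get_thread_num();
            exchangelist[tid * narr + nexthr[tid]] = i;
            nexthr[tid]++;
        }
    }

    /* merge step: compact the per-thread slices into one contiguous list */
    size_t total = nexthr[0];
    int tid;
    for(tid = 1; tid < nthreads; tid++) {
        memmove(exchangelist + total, exchangelist + (size_t) tid * narr,
                nexthr[tid] * sizeof(int));
        total += nexthr[tid];
    }
    printf("queued %zu of %d particles for export\n", total, npart);

    free(nexthr);
    free(exchangelist);
    return 0;
}

Because the chunk size equals npart/nthreads + 1, each thread receives at most one contiguous chunk and can never overflow its slice, which is exactly why the function only needs narr * numthreads entries of memory.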


◆ domain_build_plan()

static void domain_build_plan ( int  iter,
ExchangeLayoutFunc  layoutfunc,
const void *  layout_userdata,
ExchangePlan *  plan,
struct part_manager_type *  pman,
MPI_Comm  Comm 
)
static

Definition at line 502 of file exchange.c.

503 {
504  int ptype;
505  size_t n;
506 
507  memset(plan->toGo, 0, sizeof(plan->toGo[0]) * plan->NTask);
508 
509  plan->layouts = (ExchangePartCache *) mymalloc("layoutcache",sizeof(ExchangePartCache) * plan->last);
510 
511  #pragma omp parallel for
512  for(n = 0; n < plan->last; n++)
513  {
514  const int i = plan->ExchangeList[n];
515  const int target = layoutfunc(i, layout_userdata);
516  plan->layouts[n].ptype = pman->Base[i].Type;
517  plan->layouts[n].target = target;
518  if(target >= plan->NTask || target < 0)
519  endrun(4, "layoutfunc for %d returned unreasonable %d for %d tasks\n", i, target, plan->NTask);
520  }
521 
522  /*Do the sum*/
523  for(n = 0; n < plan->last; n++)
524  {
525  plan->toGo[plan->layouts[n].target].base++;
526  plan->toGo[plan->layouts[n].target].slots[plan->layouts[n].ptype]++;
527  }
528 
529  MPI_Alltoall(plan->toGo, 1, MPI_TYPE_PLAN_ENTRY, plan->toGet, 1, MPI_TYPE_PLAN_ENTRY, Comm);
530 
531  memset(&plan->toGoOffset[0], 0, sizeof(plan->toGoOffset[0]));
532  memset(&plan->toGetOffset[0], 0, sizeof(plan->toGetOffset[0]));
533  memcpy(&plan->toGoSum, &plan->toGo[0], sizeof(plan->toGoSum));
534  memcpy(&plan->toGetSum, &plan->toGet[0], sizeof(plan->toGetSum));
535 
536  int rank;
537  int64_t maxbasetogo=-1, maxbasetoget=-1;
538  for(rank = 1; rank < plan->NTask; rank ++) {
539  /* Direct assignment breaks compilers like icc */
540  memcpy(&plan->toGoOffset[rank], &plan->toGoSum, sizeof(plan->toGoSum));
541  memcpy(&plan->toGetOffset[rank], &plan->toGetSum, sizeof(plan->toGetSum));
542 
543  plan->toGoSum.base += plan->toGo[rank].base;
544  plan->toGetSum.base += plan->toGet[rank].base;
545  if(plan->toGo[rank].base > maxbasetogo)
546  maxbasetogo = plan->toGo[rank].base;
547  if(plan->toGet[rank].base > maxbasetoget)
548  maxbasetoget = plan->toGet[rank].base;
549 
550  for(ptype = 0; ptype < 6; ptype++) {
551  plan->toGoSum.slots[ptype] += plan->toGo[rank].slots[ptype];
552  plan->toGetSum.slots[ptype] += plan->toGet[rank].slots[ptype];
553  }
554  }
555 
556  int64_t maxbasetogomax, maxbasetogetmax, sumtogo;
557  MPI_Reduce(&maxbasetogo, &maxbasetogomax, 1, MPI_INT64, MPI_MAX, 0, Comm);
558  MPI_Reduce(&maxbasetoget, &maxbasetogetmax, 1, MPI_INT64, MPI_MAX, 0, Comm);
559  MPI_Reduce(&plan->toGoSum.base, &sumtogo, 1, MPI_INT64, MPI_SUM, 0, Comm);
560  message(0, "iter = %d Total particles in flight: %ld Largest togo: %ld, toget %ld\n", iter, sumtogo, maxbasetogomax, maxbasetogetmax);
561 }
void message(int where, const char *fmt,...)
Definition: endrun.c:175
void endrun(int where, const char *fmt,...)
Definition: endrun.c:147
static MPI_Datatype MPI_TYPE_PLAN_ENTRY
Definition: exchange.c:20
#define mymalloc(name, size)
Definition: mymalloc.h:15
unsigned int target
Definition: exchange.c:25
unsigned int ptype
Definition: exchange.c:24
ExchangePlanEntry toGetSum
Definition: exchange.c:34
ExchangePlanEntry toGoSum
Definition: exchange.c:33
ExchangePartCache * layouts
Definition: exchange.c:45
size_t last
Definition: exchange.c:44
ExchangePlanEntry * toGet
Definition: exchange.c:31
ExchangePlanEntry * toGetOffset
Definition: exchange.c:32
ExchangePlanEntry * toGoOffset
Definition: exchange.c:30
ExchangePlanEntry * toGo
Definition: exchange.c:29
unsigned int Type
Definition: partmanager.h:17
#define MPI_INT64
Definition: system.h:12

References ExchangePlanEntry::base, part_manager_type::Base, endrun(), ExchangePlan::ExchangeList, ExchangePlan::last, ExchangePlan::layouts, message(), MPI_INT64, MPI_TYPE_PLAN_ENTRY, mymalloc, ExchangePlan::NTask, ExchangePartCache::ptype, ptype, ExchangePlanEntry::slots, ExchangePartCache::target, ExchangePlan::toGet, ExchangePlan::toGetOffset, ExchangePlan::toGetSum, ExchangePlan::toGo, ExchangePlan::toGoOffset, ExchangePlan::toGoSum, and particle_data::Type.

Referenced by domain_exchange().
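
The bookkeeping amounts to: count particles per destination (toGo), learn the incoming counts with a single MPI_Alltoall (toGet), and turn both into exclusive prefix sums that serve as offsets into the send and receive buffers. A self-contained MPI sketch of that count/offset step, using plain long counters instead of ExchangePlanEntry:

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    int rank, ntask;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &ntask);

    long *togo = (long *) calloc(ntask, sizeof(long));
    long *toget = (long *) calloc(ntask, sizeof(long));
    long *togooffset = (long *) calloc(ntask, sizeof(long));
    int i;
    for(i = 0; i < ntask; i++)
        togo[i] = (rank + i) % 3;          /* fake per-destination counts */

    /* every rank tells every other rank how much it will send */
    MPI_Alltoall(togo, 1, MPI_LONG, toget, 1, MPI_LONG, MPI_COMM_WORLD);

    /* exclusive prefix sum: the offset for rank r is the sum of counts for ranks < r */
    long togosum = togo[0], togetsum = 0;
    togooffset[0] = 0;
    for(i = 1; i < ntask; i++) {
        togooffset[i] = togosum;
        togosum += togo[i];
    }
    for(i = 0; i < ntask; i++)
        togetsum += toget[i];
    printf("rank %d: sends %ld, receives %ld, last send block starts at offset %ld\n",
           rank, togosum, togetsum, togooffset[ntask - 1]);

    free(togooffset);
    free(toget);
    free(togo);
    MPI_Finalize();
    return 0;
}

The real code carries a six-element slot count alongside the base count in each entry, but the offset arithmetic is the same.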


◆ domain_exchange()

int domain_exchange ( ExchangeLayoutFunc  layoutfunc,
const void *  layout_userdata,
int  do_gc,
struct DriftData *  drift,
struct part_manager_type *  pman,
struct slots_manager_type *  sman,
int  maxiter,
MPI_Comm  Comm 
)

Definition at line 103 of file exchange.c.

103  {
104  int failure = 0;
105 
106  /* register the MPI types used in communication if not yet. */
107  if (MPI_TYPE_PLAN_ENTRY == 0) {
108  MPI_Type_contiguous(sizeof(ExchangePlanEntry), MPI_BYTE, &MPI_TYPE_PLAN_ENTRY);
109  MPI_Type_commit(&MPI_TYPE_PLAN_ENTRY);
110  }
111 
112  /*Structure for building a list of particles that will be exchanged*/
113  ExchangePlan plan = domain_init_exchangeplan(Comm);
114 
115  walltime_measure("/Domain/exchange/init");
116 
117  int iter = 0;
118 
119  do {
120  if(iter >= maxiter) {
121  failure = 1;
122  break;
123  }
124  domain_build_exchange_list(layoutfunc, layout_userdata, &plan, (iter > 0 ? NULL : drift), pman, sman, Comm);
125 
126  /*Exit early if nothing to do*/
127  if(!MPIU_Any(plan.nexchange > 0, Comm))
128  {
129  myfree(plan.ExchangeList);
130  walltime_measure("/Domain/exchange/togo");
131  break;
132  }
133 
134  /* determine for each rank how many particles have to be shifted to other ranks */
135  plan.last = domain_find_iter_space(&plan, pman, sman);
136  domain_build_plan(iter, layoutfunc, layout_userdata, &plan, pman, Comm);
137  walltime_measure("/Domain/exchange/togo");
138 
139 
140  /* Do a GC if we are asked to or if this isn't the last iteration.
141  * The gc decision is made collective in domain_exchange_once,
142  * and a gc will also be done if we have no space for particles.*/
143  int really_do_gc = do_gc || (plan.last < plan.nexchange);
144 
145  failure = domain_exchange_once(&plan, really_do_gc, pman, sman, Comm);
146 
147  myfree(plan.ExchangeList);
148 
149  if(failure)
150  break;
151  iter++;
152  }
153  while(MPIU_Any(plan.last < plan.nexchange, Comm));
154 #ifdef DEBUG
155  /* This does not apply for the FOF code, where the exchange list is pre-assigned
156  * and we only get one iteration. */
157  if(!failure && maxiter > 1) {
158  ExchangePlan plan9 = domain_init_exchangeplan(Comm);
159  /* Do not drift again*/
160  domain_build_exchange_list(layoutfunc, layout_userdata, &plan9, NULL, pman, sman, Comm);
161  if(plan9.nexchange > 0)
162  endrun(5, "Still have %ld particles in exchange list\n", plan9.nexchange);
163  myfree(plan9.ExchangeList);
164  domain_free_exchangeplan(&plan9);
165  }
166 #endif
167  domain_free_exchangeplan(&plan);
168 
169  return failure;
170 }
static void domain_build_plan(int iter, ExchangeLayoutFunc layoutfunc, const void *layout_userdata, ExchangePlan *plan, struct part_manager_type *pman, MPI_Comm Comm)
Definition: exchange.c:502
static ExchangePlan domain_init_exchangeplan(MPI_Comm Comm)
Definition: exchange.c:78
static int domain_exchange_once(ExchangePlan *plan, int do_gc, struct part_manager_type *pman, struct slots_manager_type *sman, MPI_Comm Comm)
Definition: exchange.c:191
static void domain_build_exchange_list(ExchangeLayoutFunc layoutfunc, const void *layout_userdata, ExchangePlan *plan, struct DriftData *drift, struct part_manager_type *pman, struct slots_manager_type *sman, MPI_Comm Comm)
Definition: exchange.c:386
static void domain_free_exchangeplan(ExchangePlan *plan)
Definition: exchange.c:94
static size_t domain_find_iter_space(ExchangePlan *plan, const struct part_manager_type *pman, const struct slots_manager_type *sman)
Definition: exchange.c:443
#define myfree(x)
Definition: mymalloc.h:19
int MPIU_Any(int condition, MPI_Comm comm)
Definition: system.c:545
#define walltime_measure(name)
Definition: walltime.h:8

References domain_build_exchange_list(), domain_build_plan(), domain_exchange_once(), domain_find_iter_space(), domain_free_exchangeplan(), domain_init_exchangeplan(), endrun(), ExchangePlan::ExchangeList, ExchangePlan::last, MPI_TYPE_PLAN_ENTRY, MPIU_Any(), myfree, ExchangePlan::nexchange, and walltime_measure.

Referenced by domain_decompose_full(), domain_maintain(), fof_try_particle_exchange(), test_exchange(), test_exchange_uneven(), test_exchange_with_garbage(), and test_exchange_zero_slots().
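
A hedged usage sketch of this public entry point: it assumes the MP-Gadget headers and an already-initialised particle and slot manager, and that ExchangeLayoutFunc has the (particle index, userdata) -> target rank shape implied by the calls in the listing above. round_robin_layout and redistribute_particles are illustrative names, not functions from the repository:

#include <mpi.h>
#include "exchange.h"
#include "partmanager.h"
#include "slotsmanager.h"

/* send every particle to a rank chosen from its index, purely for illustration */
static int round_robin_layout(int i, const void * userdata)
{
    const int * ntask = (const int *) userdata;
    return i % (*ntask);
}

int redistribute_particles(struct part_manager_type * pman,
                           struct slots_manager_type * sman, MPI_Comm comm)
{
    int ntask;
    MPI_Comm_size(comm, &ntask);
    /* no drift (NULL), request a garbage collection, give up after 10 iterations */
    return domain_exchange(round_robin_layout, &ntask, 1, NULL, pman, sman, 10, comm);
}

The return value is non-zero if the exchange could not complete within maxiter iterations, which is why the callers listed above check it.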


◆ domain_exchange_once()

static int domain_exchange_once ( ExchangePlan *  plan,
int  do_gc,
struct part_manager_type *  pman,
struct slots_manager_type *  sman,
MPI_Comm  Comm 
)
static

Definition at line 191 of file exchange.c.

192 {
193  size_t n;
194  int ptype;
195  struct particle_data *partBuf;
196  char * slotBuf[6] = {NULL, NULL, NULL, NULL, NULL, NULL};
197 
198  /* Check whether the domain exchange will succeed.
199  * Garbage particles will be collected after the particles are exported, so do not need to count.*/
200  int64_t needed = pman->NumPart + plan->toGetSum.base - plan->toGoSum.base - plan->ngarbage;
201  if(needed > pman->MaxPart)
202  message(1,"Too many particles for exchange: NumPart=%ld count_get = %d count_togo=%d garbage = %d MaxPart=%ld\n",
203  pman->NumPart, plan->toGetSum.base, plan->toGoSum.base, plan->ngarbage, pman->MaxPart);
204  if(MPIU_Any(needed > pman->MaxPart, Comm)) {
205  myfree(plan->layouts);
206  return 1;
207  }
208 
209  partBuf = (struct particle_data *) mymalloc2("partBuf", plan->toGoSum.base * sizeof(struct particle_data));
210 
211  for(ptype = 0; ptype < 6; ptype++) {
212  if(!sman->info[ptype].enabled) continue;
213  slotBuf[ptype] = (char *) mymalloc2("SlotBuf", plan->toGoSum.slots[ptype] * sman->info[ptype].elsize);
214  }
215 
216  ExchangePlanEntry * toGoPtr = ta_malloc("toGoPtr", ExchangePlanEntry, plan->NTask);
217  memset(toGoPtr, 0, sizeof(toGoPtr[0]) * plan->NTask);
218 
219  for(n = 0; n < plan->last; n++)
220  {
221  const int i = plan->ExchangeList[n];
222  /* preparing for export */
223  const int target = plan->layouts[n].target;
224 
225  int type = plan->layouts[n].ptype;
226 
227  /* watch out thread unsafe */
228  int bufPI = toGoPtr[target].slots[type];
229  toGoPtr[target].slots[type] ++;
230  size_t elsize = sman->info[type].elsize;
231  if(sman->info[type].enabled)
232  memcpy(slotBuf[type] + (bufPI + plan->toGoOffset[target].slots[type]) * elsize,
233  (char*) sman->info[type].ptr + pman->Base[i].PI * elsize, elsize);
234  /* now copy the base P; after PI has been updated */
235  memcpy(&(partBuf[plan->toGoOffset[target].base + toGoPtr[target].base]), pman->Base+i, sizeof(struct particle_data));
236  toGoPtr[target].base ++;
237  /* mark the particle for removal. Both secondary and base slots will be marked. */
238  slots_mark_garbage(i, pman, sman);
239  }
240 
241  myfree(plan->layouts);
242  ta_free(toGoPtr);
243  walltime_measure("/Domain/exchange/makebuf");
244 
245  /* Do a gc if we were asked to, or if we need one
246  * to have enough space for the incoming material*/
247  int shall_we_gc = do_gc || (pman->NumPart + plan->toGetSum.base > pman->MaxPart);
248  if(MPIU_Any(shall_we_gc, Comm)) {
249  /*Find which slots to gc*/
250  int compact[6] = {0};
251  shall_we_compact_slots(compact, plan, sman, Comm);
252  slots_gc(compact, pman, sman);
253 
254  walltime_measure("/Domain/exchange/garbage");
255  }
256 
257  int64_t newNumPart;
258  int64_t newSlots[6] = {0};
259  newNumPart = pman->NumPart + plan->toGetSum.base;
260 
261  for(ptype = 0; ptype < 6; ptype ++) {
262  if(!sman->info[ptype].enabled) continue;
263  newSlots[ptype] = sman->info[ptype].size + plan->toGetSum.slots[ptype];
264  }
265 
266  if(newNumPart > pman->MaxPart) {
267  endrun(787878, "NumPart=%ld MaxPart=%ld\n", newNumPart, pman->MaxPart);
268  }
269 
270  slots_reserve(1, newSlots, sman);
271 
272  int * sendcounts = (int*) ta_malloc("sendcounts", int, plan->NTask);
273  int * senddispls = (int*) ta_malloc("senddispls", int, plan->NTask);
274  int * recvcounts = (int*) ta_malloc("recvcounts", int, plan->NTask);
275  int * recvdispls = (int*) ta_malloc("recvdispls", int, plan->NTask);
276 
277  _transpose_plan_entries(plan->toGo, sendcounts, -1, plan->NTask);
278  _transpose_plan_entries(plan->toGoOffset, senddispls, -1, plan->NTask);
279  _transpose_plan_entries(plan->toGet, recvcounts, -1, plan->NTask);
280  _transpose_plan_entries(plan->toGetOffset, recvdispls, -1, plan->NTask);
281 
282  /* Ensure the mallocs are finished on all tasks before we start sending the data*/
283  MPI_Barrier(Comm);
284 #ifdef DEBUG
285  message(0, "Starting particle data exchange\n");
286 #endif
287  /* recv at the end */
288  MPI_Alltoallv_sparse(partBuf, sendcounts, senddispls, MPI_TYPE_PARTICLE,
289  pman->Base + pman->NumPart, recvcounts, recvdispls, MPI_TYPE_PARTICLE,
290  Comm);
291 
292  for(ptype = 0; ptype < 6; ptype ++) {
293  /* skip unused slot types */
294  if(!sman->info[ptype].enabled) continue;
295 
296  size_t elsize = sman->info[ptype].elsize;
297  int N_slots = sman->info[ptype].size;
298  char * ptr = sman->info[ptype].ptr;
299  _transpose_plan_entries(plan->toGo, sendcounts, ptype, plan->NTask);
300  _transpose_plan_entries(plan->toGoOffset, senddispls, ptype, plan->NTask);
301  _transpose_plan_entries(plan->toGet, recvcounts, ptype, plan->NTask);
302  _transpose_plan_entries(plan->toGetOffset, recvdispls, ptype, plan->NTask);
303 
304 #ifdef DEBUG
305  message(0, "Starting exchange for slot %d\n", ptype);
306 #endif
307 
308  /* recv at the end */
309  MPI_Alltoallv_sparse(slotBuf[ptype], sendcounts, senddispls, MPI_TYPE_SLOT[ptype],
310  ptr + N_slots * elsize,
311  recvcounts, recvdispls, MPI_TYPE_SLOT[ptype],
312  Comm);
313  }
314 
315 #ifdef DEBUG
316  message(0, "Done with AlltoAllv\n");
317 #endif
318  int src;
319  for(src = 0; src < plan->NTask; src++) {
320  /* unpack each source rank */
321  int64_t newPI[6];
322  int64_t i;
323  for(ptype = 0; ptype < 6; ptype ++) {
324  newPI[ptype] = sman->info[ptype].size + plan->toGetOffset[src].slots[ptype];
325  }
326 
327  for(i = pman->NumPart + plan->toGetOffset[src].base;
328  i < pman->NumPart + plan->toGetOffset[src].base + plan->toGet[src].base;
329  i++) {
330 
331  int ptype = pman->Base[i].Type;
332 
333 
334  pman->Base[i].PI = newPI[ptype];
335 
336  newPI[ptype]++;
337 
338  if(!sman->info[ptype].enabled) continue;
339 
340  int PI = pman->Base[i].PI;
341  if(BASESLOT_PI(PI, ptype, sman)->ID != pman->Base[i].ID) {
342  endrun(1, "Exchange: P[%d].ID = %ld (type %d) != SLOT ID = %ld. garbage: %d ReverseLink: %d\n",i,pman->Base[i].ID, pman->Base[i].Type, BASESLOT_PI(PI, ptype, sman)->ID, pman->Base[i].IsGarbage, BASESLOT_PI(PI, ptype, sman)->ReverseLink);
343  }
344  }
345  for(ptype = 0; ptype < 6; ptype ++) {
346  if(newPI[ptype] !=
347  sman->info[ptype].size + plan->toGetOffset[src].slots[ptype]
348  + plan->toGet[src].slots[ptype]) {
349  endrun(1, "N_slots mismatched\n");
350  }
351  }
352  }
353 
354  walltime_measure("/Domain/exchange/alltoall");
355 
356  myfree(recvdispls);
357  myfree(recvcounts);
358  myfree(senddispls);
359  myfree(sendcounts);
360  for(ptype = 5; ptype >=0; ptype --) {
361  if(!sman->info[ptype].enabled) continue;
362  myfree(slotBuf[ptype]);
363  }
364  myfree(partBuf);
365 
366  pman->NumPart = newNumPart;
367 
368  for(ptype = 0; ptype < 6; ptype++) {
369  if(!sman->info[ptype].enabled) continue;
370  sman->info[ptype].size = newSlots[ptype];
371  }
372 
373 #ifdef DEBUG
374  domain_test_id_uniqueness(pman);
375  slots_check_id_consistency(pman, sman);
376 #endif
377  walltime_measure("/Domain/exchange/finalize");
378 
379  return 0;
380 }
static void shall_we_compact_slots(int *compact, ExchangePlan *plan, const struct slots_manager_type *sman, MPI_Comm Comm)
Definition: exchange.c:175
void domain_test_id_uniqueness(struct part_manager_type *pman)
Definition: exchange.c:570
static void _transpose_plan_entries(ExchangePlanEntry *entries, int *count, int ptype, int NTask)
Definition: exchange.c:65
size_t slots_reserve(int where, int64_t atleast[6], struct slots_manager_type *sman)
Definition: slotsmanager.c:475
MPI_Datatype MPI_TYPE_SLOT[6]
Definition: slotsmanager.c:13
MPI_Datatype MPI_TYPE_PARTICLE
Definition: slotsmanager.c:11
void slots_mark_garbage(int i, struct part_manager_type *pman, struct slots_manager_type *sman)
Definition: slotsmanager.c:577
int slots_gc(int *compact_slots, struct part_manager_type *pman, struct slots_manager_type *sman)
Definition: slotsmanager.c:145
void slots_check_id_consistency(struct part_manager_type *pman, struct slots_manager_type *sman)
Definition: slotsmanager.c:587
#define BASESLOT_PI(PI, ptype, sman)
Definition: slotsmanager.h:132
MyIDType ID
Definition: partmanager.h:38
size_t elsize
Definition: slotsmanager.h:13
int64_t size
Definition: slotsmanager.h:12
char * ptr
Definition: slotsmanager.h:10
struct slot_info info[6]
Definition: slotsmanager.h:112
int MPI_Alltoallv_sparse(void *sendbuf, int *sendcnts, int *sdispls, MPI_Datatype sendtype, void *recvbuf, int *recvcnts, int *rdispls, MPI_Datatype recvtype, MPI_Comm comm)
Definition: system.c:353

References _transpose_plan_entries(), ExchangePlanEntry::base, part_manager_type::Base, BASESLOT_PI, domain_test_id_uniqueness(), slot_info::elsize, slot_info::enabled, endrun(), ExchangePlan::ExchangeList, particle_data::ID, slots_manager_type::info, particle_data::IsGarbage, ExchangePlan::last, ExchangePlan::layouts, part_manager_type::MaxPart, message(), MPI_Alltoallv_sparse(), MPI_TYPE_PARTICLE, MPI_TYPE_SLOT, MPIU_Any(), myfree, mymalloc2, ExchangePlan::ngarbage, ExchangePlan::NTask, part_manager_type::NumPart, particle_data::PI, slot_info::ptr, ExchangePartCache::ptype, ptype, shall_we_compact_slots(), slot_info::size, ExchangePlanEntry::slots, slots_check_id_consistency(), slots_gc(), slots_mark_garbage(), slots_reserve(), ta_free, ta_malloc, ExchangePartCache::target, ExchangePlan::toGet, ExchangePlan::toGetOffset, ExchangePlan::toGetSum, ExchangePlan::toGo, ExchangePlan::toGoOffset, ExchangePlan::toGoSum, particle_data::Type, and walltime_measure.

Referenced by domain_exchange().
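
The data movement itself is a counts/displacements all-to-all in which incoming particles land directly after the resident ones (pman->Base + pman->NumPart) and incoming slot data after sman->info[ptype].size. A self-contained sketch of that append-at-the-end pattern, using plain MPI_Alltoallv instead of MPI_Alltoallv_sparse and ints instead of particle structs:

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    int rank, ntask;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &ntask);

    int *sendcounts = (int *) malloc(ntask * sizeof(int));
    int *senddispls = (int *) malloc(ntask * sizeof(int));
    int *recvcounts = (int *) malloc(ntask * sizeof(int));
    int *recvdispls = (int *) malloc(ntask * sizeof(int));
    int i, nsend = 0, nrecv = 0;
    for(i = 0; i < ntask; i++)
        sendcounts[i] = 1;                           /* send one value to everyone */
    MPI_Alltoall(sendcounts, 1, MPI_INT, recvcounts, 1, MPI_INT, MPI_COMM_WORLD);
    for(i = 0; i < ntask; i++) {
        senddispls[i] = nsend; nsend += sendcounts[i];
        recvdispls[i] = nrecv; nrecv += recvcounts[i];
    }

    int nlocal = 4;                                  /* pretend 4 resident particles */
    int *local = (int *) malloc((nlocal + nrecv) * sizeof(int));
    int *sendbuf = (int *) malloc(nsend * sizeof(int));
    for(i = 0; i < nlocal; i++) local[i] = rank;
    for(i = 0; i < nsend; i++) sendbuf[i] = 100 * rank + i;

    /* receive directly after the resident entries, mirroring pman->Base + NumPart */
    MPI_Alltoallv(sendbuf, sendcounts, senddispls, MPI_INT,
                  local + nlocal, recvcounts, recvdispls, MPI_INT, MPI_COMM_WORLD);
    printf("rank %d now holds %d entries\n", rank, nlocal + nrecv);

    free(sendbuf); free(local);
    free(recvdispls); free(recvcounts); free(senddispls); free(sendcounts);
    MPI_Finalize();
    return 0;
}

In the real routine the send buffer is packed per destination at the toGoOffset displacements, and the exported particles are marked garbage so a later slots_gc() can reclaim them.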


◆ domain_find_iter_space()

static size_t domain_find_iter_space ( ExchangePlan *  plan,
const struct part_manager_type *  pman,
const struct slots_manager_type *  sman 
)
static

Definition at line 443 of file exchange.c.

444 {
445  int ptype;
446  size_t n, nlimit = mymalloc_freebytes();
447 
448  if (nlimit < 4096L * 6 + plan->NTask * 2 * sizeof(MPI_Request))
449  endrun(1, "Not enough memory free to store requests!\n");
450 
451  nlimit -= 4096 * 2L + plan->NTask * 2 * sizeof(MPI_Request);
452 
453  /* Save some memory for memory headers and wasted space at the end of each allocation.
454  * Need max. 2*4096 for each heap-allocated array.*/
455  nlimit -= 4096 * 4L;
456 
457  message(0, "Using %td bytes for exchange.\n", nlimit);
458 
459  size_t maxsize = 0;
460  for(ptype = 0; ptype < 6; ptype ++ ) {
461  if(!sman->info[ptype].enabled) continue;
462  if (maxsize < sman->info[ptype].elsize)
463  maxsize = sman->info[ptype].elsize;
464  /*Reserve space for slotBuf header*/
465  nlimit -= 4096 * 2L;
466  }
467  size_t package = sizeof(pman->Base[0]) + maxsize;
468  if(package >= nlimit || nlimit > mymalloc_freebytes())
469  endrun(212, "Package is too large, no free memory: package = %lu nlimit = %lu.", package, nlimit);
470 
471  /* We want to avoid doing an alltoall with
472  * more than 2GB of material as this hangs.*/
473  const size_t maxexch = 1024L*1024L*2030L;
474 
475  /* Fast path: if we have enough space no matter what type the particles
476  * are we don't need to check them.*/
477  if((plan->nexchange * (sizeof(pman->Base[0]) + maxsize + sizeof(ExchangePartCache)) < nlimit) &&
478  (plan->nexchange * sizeof(pman->Base[0]) < maxexch) && (plan->nexchange * maxsize < maxexch)) {
479  return plan->nexchange;
480  }
481 
482  size_t partexch = 0;
483  size_t slotexch[6] = {0};
484  /*Find how many particles we have space for.*/
485  for(n = 0; n < plan->nexchange; n++)
486  {
487  const int i = plan->ExchangeList[n];
488  const int ptype = pman->Base[i].Type;
489  partexch += sizeof(pman->Base[0]);
490  slotexch[ptype] += sman->info[ptype].elsize;
491  package += sizeof(pman->Base[0]) + sman->info[ptype].elsize + sizeof(ExchangePartCache);
492  if(package >= nlimit || slotexch[ptype] >= maxexch || partexch >= maxexch) {
493 // message(1,"Not enough space for particles: nlimit=%d, package=%d\n",nlimit,package);
494  break;
495  }
496  }
497  return n;
498 }
#define mymalloc_freebytes()
Definition: mymalloc.h:31

References part_manager_type::Base, slot_info::elsize, slot_info::enabled, endrun(), ExchangePlan::ExchangeList, slots_manager_type::info, message(), mymalloc_freebytes, ExchangePlan::nexchange, ExchangePlan::NTask, NTask, ptype, and particle_data::Type.

Referenced by domain_exchange().
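
The fast path divides the available memory by a worst-case per-particle cost (base particle + largest enabled slot + one ExchangePartCache entry) and also respects the roughly 2 GB per-message cap noted in the source. An illustrative arithmetic sketch with made-up sizes, not values from the code:

#include <stdio.h>
#include <stddef.h>

int main(void)
{
    const size_t partsize = 120;                 /* stand-in for sizeof(struct particle_data) */
    const size_t maxslot  = 160;                 /* stand-in for the largest enabled slot elsize */
    const size_t cachesz  = 8;                   /* stand-in for sizeof(ExchangePartCache) */
    const size_t nlimit   = 512L * 1024 * 1024;  /* pretend 512 MB of heap are free */
    const size_t maxexch  = 1024L * 1024 * 2030; /* ~2 GB all-to-all cap from the source */

    size_t by_memory = nlimit / (partsize + maxslot + cachesz);
    size_t by_part   = maxexch / partsize;
    size_t by_slot   = maxexch / maxslot;

    size_t space = by_memory;
    if(by_part < space) space = by_part;
    if(by_slot < space) space = by_slot;
    printf("can export at most ~%zu particles this iteration\n", space);
    return 0;
}

When this bound is smaller than plan->nexchange, domain_exchange() simply loops and ships the remainder in later iterations.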


◆ domain_free_exchangeplan()

static void domain_free_exchangeplan ( ExchangePlan *  plan)
static

Definition at line 94 of file exchange.c.

95 {
96  myfree(plan->toGetOffset);
97  myfree(plan->toGet);
98  myfree(plan->toGoOffset);
99  myfree(plan->toGo);
100 }

References myfree, ExchangePlan::toGet, ExchangePlan::toGetOffset, ExchangePlan::toGo, and ExchangePlan::toGoOffset.

Referenced by domain_exchange().


◆ domain_init_exchangeplan()

static ExchangePlan domain_init_exchangeplan ( MPI_Comm  Comm)
static

toGo[0][task*NTask + partner] gives the number of particles in task 'task' that have to go to task 'partner'. toGo[1] is SPH, toGo[2] is BH and toGo[3] is stars.

Definition at line 78 of file exchange.c.

79 {
80  ExchangePlan plan;
81  MPI_Comm_size(Comm, &plan.NTask);
86  plan.toGo = (ExchangePlanEntry *) mymalloc2("toGo", sizeof(plan.toGo[0]) * plan.NTask);
87  plan.toGoOffset = (ExchangePlanEntry *) mymalloc2("toGo", sizeof(plan.toGo[0]) * plan.NTask);
88  plan.toGet = (ExchangePlanEntry *) mymalloc2("toGet", sizeof(plan.toGo[0]) * plan.NTask);
89  plan.toGetOffset = (ExchangePlanEntry *) mymalloc2("toGet", sizeof(plan.toGo[0]) * plan.NTask);
90  return plan;
91 }

References mymalloc2, ExchangePlan::NTask, ExchangePlan::toGet, ExchangePlan::toGetOffset, ExchangePlan::toGo, and ExchangePlan::toGoOffset.

Referenced by domain_exchange().


◆ domain_test_id_uniqueness()

void domain_test_id_uniqueness ( struct part_manager_type *  pman)

Definition at line 570 of file exchange.c.

571 {
572  int64_t i;
573  MyIDType *ids;
574  int NTask, ThisTask;
575  MPI_Comm_size(MPI_COMM_WORLD, &NTask);
576  MPI_Comm_rank(MPI_COMM_WORLD, &ThisTask);
577 
578  message(0, "Testing ID uniqueness...\n");
579 
580  ids = (MyIDType *) mymalloc("ids", pman->NumPart * sizeof(MyIDType));
581 
582  #pragma omp parallel for
583  for(i = 0; i < pman->NumPart; i++) {
584  ids[i] = pman->Base[i].ID;
585  if(pman->Base[i].IsGarbage)
586  ids[i] = (MyIDType) -1;
587  }
588 
589  mpsort_mpi(ids, pman->NumPart, sizeof(MyIDType), mp_order_by_id, 8, NULL, MPI_COMM_WORLD);
590 
591  /*Remove garbage from the end*/
592  int64_t nids = pman->NumPart;
593  while(nids > 0 && (ids[nids-1] == (MyIDType)-1)) {
594  nids--;
595  }
596 
597  #pragma omp parallel for
598  for(i = 1; i < nids; i++) {
599  if(ids[i] <= ids[i - 1])
600  {
601  endrun(12, "non-unique (or non-ordered) ID=%013ld found on task=%d (i=%d NumPart=%ld)\n",
602  ids[i], ThisTask, i, nids);
603  }
604  }
605 
606  MyIDType * prev = ta_malloc("prev", MyIDType, 1);
607  memset(prev, 0, sizeof(MyIDType));
608  const int TAG = 0xdead;
609 
610  if(NTask > 1) {
611  if(ThisTask == 0) {
612  MyIDType * ptr = prev;
613  if(nids > 0) {
614  ptr = ids;
615  }
616  MPI_Send(ptr, sizeof(MyIDType), MPI_BYTE, ThisTask + 1, TAG, MPI_COMM_WORLD);
617  }
618  else if(ThisTask == NTask - 1) {
619  MPI_Recv(prev, sizeof(MyIDType), MPI_BYTE,
620  ThisTask - 1, TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
621  }
622  else if(nids == 0) {
623  /* simply pass through whatever we get */
624  MPI_Recv(prev, sizeof(MyIDType), MPI_BYTE, ThisTask - 1, TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
625  MPI_Send(prev, sizeof(MyIDType), MPI_BYTE, ThisTask + 1, TAG, MPI_COMM_WORLD);
626  }
627  else
628  {
629  MPI_Sendrecv(
630  ids+(nids - 1), sizeof(MyIDType), MPI_BYTE,
631  ThisTask + 1, TAG,
632  prev, sizeof(MyIDType), MPI_BYTE,
633  ThisTask - 1, TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
634  }
635  }
636 
637  if(ThisTask > 1) {
638  if(nids > 0) {
639  if(ids[0] <= *prev && ids[0])
640  endrun(13, "non-unique ID=%ld found on task=%d\n", ids[0], ThisTask);
641  }
642  }
643 
644  myfree(prev);
645  myfree(ids);
646 }
static void mp_order_by_id(const void *data, void *radix, void *arg)
Definition: exchange.c:565
#define mpsort_mpi(base, nmemb, elsize, radix, rsize, arg, comm)
Definition: mpsort.h:26
uint64_t MyIDType
Definition: types.h:10

References part_manager_type::Base, endrun(), particle_data::ID, particle_data::IsGarbage, message(), mp_order_by_id(), mpsort_mpi, myfree, mymalloc, NTask, part_manager_type::NumPart, ta_malloc, and ThisTask.

Referenced by domain_decompose_full(), domain_exchange_once(), init(), test_exchange(), test_exchange_uneven(), test_exchange_with_garbage(), and test_exchange_zero_slots().
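
The check is: sort all IDs globally, compare neighbouring entries on each rank, then pass each rank's last ID to the next rank so the pair straddling the rank boundary is compared as well. A self-contained sketch of that idea, with the parallel radix sort (mpsort_mpi) replaced by a local qsort and fabricated IDs, purely for illustration:

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>

static int cmp_u64(const void *a, const void *b)
{
    uint64_t x = *(const uint64_t *) a, y = *(const uint64_t *) b;
    return (x > y) - (x < y);
}

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    int rank, ntask;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &ntask);

    /* fake IDs: globally unique and already increasing with rank */
    int64_t i, nids = 8;
    uint64_t *ids = (uint64_t *) malloc(nids * sizeof(uint64_t));
    for(i = 0; i < nids; i++)
        ids[i] = (uint64_t) rank * nids + i + 1;

    qsort(ids, nids, sizeof(uint64_t), cmp_u64);
    for(i = 1; i < nids; i++)
        if(ids[i] <= ids[i - 1])
            fprintf(stderr, "duplicate ID %lu on rank %d\n", (unsigned long) ids[i], rank);

    /* check the pair that straddles the rank boundary */
    uint64_t prev = 0;
    const int TAG = 0xdead;
    if(rank > 0) {
        MPI_Recv(&prev, 1, MPI_UINT64_T, rank - 1, TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        if(ids[0] <= prev)
            fprintf(stderr, "duplicate ID across ranks %d/%d\n", rank - 1, rank);
    }
    if(rank < ntask - 1)
        MPI_Send(&ids[nids - 1], 1, MPI_UINT64_T, rank + 1, TAG, MPI_COMM_WORLD);

    free(ids);
    MPI_Finalize();
    return 0;
}

The real routine additionally moves garbage particles to the end by giving them the key (MyIDType) -1, so they never trigger a false duplicate.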


◆ mp_order_by_id()

static void mp_order_by_id ( const void *  data,
void *  radix,
void *  arg 
)
static

Definition at line 565 of file exchange.c.

565  {
566  ((uint64_t *) radix)[0] = ((MyIDType*) data)[0];
567 }

Referenced by domain_test_id_uniqueness().
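
This is the radix callback passed to mpsort_mpi above: for each element it writes an rsize-byte key (8 bytes here) into the radix buffer, and the sort orders elements by that key. A tiny stand-alone sketch of the callback contract, exercised outside mpsort and using a hypothetical order_by_id name; the exact key interpretation is assumed from the usage above rather than from the mpsort documentation:

#include <stdint.h>
#include <stdio.h>

typedef uint64_t MyID;

static void order_by_id(const void *data, void *radix, void *arg)
{
    (void) arg;                                   /* unused, as in mp_order_by_id() */
    ((uint64_t *) radix)[0] = ((const MyID *) data)[0];
}

int main(void)
{
    MyID ids[2] = {42, 7};
    uint64_t key0, key1;
    order_by_id(&ids[0], &key0, NULL);
    order_by_id(&ids[1], &key1, NULL);
    printf("key(%lu)=%lu key(%lu)=%lu\n",
           (unsigned long) ids[0], (unsigned long) key0,
           (unsigned long) ids[1], (unsigned long) key1);
    return 0;
}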


◆ shall_we_compact_slots()

static void shall_we_compact_slots ( int *  compact,
ExchangePlan *  plan,
const struct slots_manager_type *  sman,
MPI_Comm  Comm 
)
static

Definition at line 175 of file exchange.c.

176 {
177  int ptype;
178  int lcompact[6] = {0};
179  for(ptype = 0; ptype < 6; ptype++) {
180  /* gc if we are low on slot memory. */
181  if (sman->info[ptype].size + plan->toGetSum.slots[ptype] > 0.95 * sman->info[ptype].maxsize)
182  lcompact[ptype] = 1;
183  /* gc if we had a very large exchange. */
184  if(plan->toGoSum.slots[ptype] > 0.1 * sman->info[ptype].size)
185  lcompact[ptype] = 1;
186  }
187  /*Make the slot compaction collective*/
188  MPI_Allreduce(lcompact, compact, 6, MPI_INT, MPI_LOR, Comm);
189 }
int64_t maxsize
Definition: slotsmanager.h:11

References slots_manager_type::info, slot_info::maxsize, ptype, slot_info::size, ExchangePlanEntry::slots, ExchangePlan::toGetSum, and ExchangePlan::toGoSum.

Referenced by domain_exchange_once().
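
Because slots_gc() must be entered by every rank or by none, the local per-type decision is combined with a logical OR over the communicator. A self-contained sketch of that collective-flag pattern (the 95% capacity and 10% turnover thresholds of the real code are replaced by a fake local condition):

#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    int lcompact[6] = {0}, compact[6];
    /* pretend only rank 0 is short on type-0 slot memory */
    if(rank == 0)
        lcompact[0] = 1;

    /* every rank ends up with the same decision for each particle type */
    MPI_Allreduce(lcompact, compact, 6, MPI_INT, MPI_LOR, MPI_COMM_WORLD);
    printf("rank %d: compact type 0? %d\n", rank, compact[0]);

    MPI_Finalize();
    return 0;
}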


Variable Documentation

◆ MPI_TYPE_PLAN_ENTRY

MPI_Datatype MPI_TYPE_PLAN_ENTRY = 0
static

Definition at line 20 of file exchange.c.

Referenced by domain_build_plan(), and domain_exchange().
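
The plan entry is shipped between ranks as an opaque block of bytes, registered once with MPI_Type_contiguous; this is sufficient because every rank runs the same binary and MPI never interprets the contents. A self-contained sketch of the same registration and its use in an all-to-all, with a stand-in PlanEntry struct instead of ExchangePlanEntry:

#include <mpi.h>
#include <stdint.h>
#include <stdlib.h>

typedef struct {
    int64_t base;
    int64_t slots[6];
} PlanEntry;

static MPI_Datatype MPI_TYPE_PLAN = MPI_DATATYPE_NULL;

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    /* register the type once, lazily, as domain_exchange() does */
    if(MPI_TYPE_PLAN == MPI_DATATYPE_NULL) {
        MPI_Type_contiguous(sizeof(PlanEntry), MPI_BYTE, &MPI_TYPE_PLAN);
        MPI_Type_commit(&MPI_TYPE_PLAN);
    }
    int ntask;
    MPI_Comm_size(MPI_COMM_WORLD, &ntask);
    PlanEntry *togo = (PlanEntry *) calloc(ntask, sizeof(PlanEntry));
    PlanEntry *toget = (PlanEntry *) calloc(ntask, sizeof(PlanEntry));
    /* one full entry per destination/source rank, as in domain_build_plan() */
    MPI_Alltoall(togo, 1, MPI_TYPE_PLAN, toget, 1, MPI_TYPE_PLAN, MPI_COMM_WORLD);
    free(toget);
    free(togo);
    MPI_Type_free(&MPI_TYPE_PLAN);
    MPI_Finalize();
    return 0;
}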