15 #define BREAKPOINT raise(SIGTRAP)
17 #define FACT1 0.366025403785
58 MPI_Comm_rank(MPI_COMM_WORLD, &
ThisTask);
113 printf("tw->WorkSet[0] = %d (%d) %s:%d\n", tw->WorkSet ? tw->WorkSet[0] : 0, tw->WorkSetSize, __FILE__, __LINE__); \
120 const size_t thread_id = omp_get_thread_num();
131 size_t localbunch = tw->
BunchSize/omp_get_max_threads();
134 if(localbunch > tw->
BunchSize - thread_id * localbunch)
139 for(j = 0; j <
NTask; j++)
150 return local_exports;
164 const size_t NumThreads = omp_get_max_threads();
165 MPI_Comm_size(MPI_COMM_WORLD, &tw->
NTask);
173 int64_t nmin, nmax, total;
177 message(0,
"Treewalk %s iter %d: total part %ld max/MPI: %ld min/MPI: %ld balance: %g.\n",
205 if(freebytes <= 4096 * 11 * bytesperbuffer) {
206 endrun(1231245,
"Not enough memory for exporting any particles: needed %d bytes have %d. \n", bytesperbuffer, freebytes-4096*10);
208 freebytes -= 4096 * 10 * bytesperbuffer;
210 tw->
BunchSize = (size_t) floor(((
double)freebytes)/ bytesperbuffer);
212 const size_t twogb = 1024*1024*3092L;
217 endrun(2,
"Only enough free memory to export %d elements.\n", tw->
BunchSize);
250 for(d = 0; d < 3; d ++) {
251 query->
Pos[d] =
P[i].Pos[d];
265 tw->
fill(i, query, tw);
273 result->ID = query->ID;
281 tw->
reduce(i, result, mode, tw);
283 if(
P[i].ID != result->ID)
284 endrun(2,
"Mismatched ID (%ld != %ld) for particle %d in treewalk reduction, mode %d\n",
P[i].ID, result->ID, i, mode);
317 int end = chnk + chnksz;
325 for(k = chnk; k < end; k++) {
337 const int rt = tw->
visit(input, output, lv);
355 message(1,
"Tree export buffer full with %ld particles. start %ld lastsucceeded: %ld end %d size %ld.\n",
357 #pragma omp atomic write
360 if(lastSucceeded < end) {
367 endrun(5,
"Something screwed up in export queue: nexp %ld (local %d) last %d < index %d\n", lv->
Nexport,
373 }
while(chnk < tw->WorkSetSize);
377 return lastSucceeded;
382 cmpint(
const void *a,
const void *b)
384 const int * aa = (
const int *) a;
385 const int * bb = (
const int *) b;
386 if(aa < bb)
return -1;
387 if(aa > bb)
return 1;
396 tw->
NThread = omp_get_max_threads();
398 if(!tw->
haswork && !may_have_garbage)
424 size_t schedsz = size/tw->
NThread+1;
425 #pragma omp parallel for schedule(static, schedsz)
426 for(i=0; i < size; i++)
428 const int tid = omp_get_thread_num();
430 const int p_i = active_set ? active_set[i] : (int) i;
439 if(nqthr[tid] >= tsize)
440 endrun(5,
"tid = %d nqthr = %d, tsize = %d size = %d, tw->Nthread = %d i = %d\n", tid, nqthr[tid], tsize, size, tw->
NThread, i);
442 thrqueue[tid][nqthr[tid]] = p_i;
455 for(i = 0; i < nqueue - 1; i ++) {
457 endrun(8829,
"A few particles are twicely active.\n");
481 #pragma omp parallel reduction(min: lastSucceeded)
483 int tid = omp_get_thread_num();
484 lastSucceeded =
real_ev(local_exports, tw, &dataindexoffset[tid], &nexports[tid], ¤tIndex);
491 for(i = 0; i < tw->
NThread; i++)
494 if(tw->
Nexport != dataindexoffset[i])
519 MPI_Allreduce(&done, &ndone, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
535 int nlist = tw->
Nlist;
536 #pragma omp parallel reduction(+: nnodes) reduction(+: nlist)
544 for(j = 0; j < tw->
Nimport; j++) {
549 tw->
visit(input, output, lv);
570 endrun(1,
"Trying to export a ghost particle.\n");
572 const int target = lv->
target;
580 if(exportflag[task] != target)
582 exportflag[task] = target;
594 exportnodecount[task] = 0;
595 exportindex[task] = nexp;
622 endrun(0,
"Tree has been freed before this treewalk.\n");
631 #pragma omp parallel for
679 #pragma omp parallel for
695 MPI_Type_contiguous(elsize, MPI_BYTE, &type);
696 MPI_Type_commit(&type);
707 MPI_Type_free(&type);
715 int NTask = tw->NTask;
728 for(i = 0; i < tw->Nexport; i++) {
736 tw->timewait1 +=
timediff(tstart, tend);
750 void * recvbuf =
mymalloc(
"EvDataGet", tw->Nimport * tw->query_type_elsize);
751 char * sendbuf = (
char *)
mymalloc(
"EvDataIn", tw->Nexport * tw->query_type_elsize);
755 #pragma omp parallel for
756 for(j = 0; j < tw->Nexport; j++)
764 tw->timecomp1 +=
timediff(tstart, tend);
767 ev_communicate(sendbuf, recvbuf, tw->query_type_elsize, sndrcv, 0);
769 tw->timecommsumm1 +=
timediff(tstart, tend);
771 tw->dataget = (
char *) recvbuf;
774 size_t total_full_nodes = 0;
776 for(i = 0; i < tw->Nexport; i++) {
784 nodelist_hist[j-1]++;
788 message(1,
"Avg. node utilisation: %g Nodes %lu %lu %lu %lu %lu %lu %lu %lu\n", (
double)total_full_nodes/tw->Nexport,nodelist_hist[0], nodelist_hist[1],nodelist_hist[2], nodelist_hist[3], nodelist_hist[4],nodelist_hist[5], nodelist_hist[6], nodelist_hist[7] );
816 const int Nexport = tw->
Nexport;
818 char * recvbuf = (
char*)
mymalloc(
"EvDataOut",
828 for(j = 0; j < Nexport; j++) {
835 int * UniqueOff = (
int *)
mymalloc(
"UniqueIndex",
sizeof(
int) * (Nexport + 1));
839 for(j = 1; j < Nexport; j++) {
841 UniqueOff[++Nunique] = j;
844 UniqueOff[++Nunique] = Nexport;
847 #pragma omp parallel for
848 for(j = 0; j < Nunique; j++)
852 int start = UniqueOff[j];
853 int end = UniqueOff[j + 1];
854 for(k = start; k < end; k++) {
878 static int ev_task_cmp_by_top_node(
const void * p1,
const void * p2) {
879 const struct ev_task * t1 = p1, * t2 = p2;
880 if(t1->top_node > t2->top_node)
return 1;
881 if(t1->top_node < t2->top_node)
return -1;
885 static void fill_task_queue (
TreeWalk * tw,
struct ev_task * tq,
int * pq,
int length) {
887 #pragma omp parallel for if(length > 1024)
888 for(i = 0; i < length; i++) {
937 endrun(5,
"Tried to run treewalk for particle mask %d but tree mask is %d.\n", iter->
mask, lv->
tw->
tree->
mask);
941 int ninteractions = 0;
944 for(inode = 0; (lv->
mode == 0 && inode < 1)|| (lv->
mode == 1 && inode < NODELISTLENGTH && I->NodeList[inode] >= 0); inode++)
948 if(numcand < 0)
return numcand;
954 for(numngb = 0; numngb < numcand; numngb ++) {
955 int other = lv->
ngblist[numngb];
958 if(
P[other].IsGarbage)
962 if(!((1<<
P[other].Type) & iter->
mask)) {
976 double h2 = dist * dist;
977 for(d = 0; d < 3; d ++) {
980 r2 += iter->
dist[d] * iter->
dist[d];
983 if(r2 > h2)
continue;
993 ninteractions += numngb;
1017 dist = iter->
Hsml + 0.5 * current->
len;
1024 for(d = 0; d < 3; d ++) {
1026 if(dx > dist)
return 0;
1027 if(dx < -dist)
return 0;
1033 if(r2 > dist * dist) {
1062 const double BoxSize = tree->
BoxSize;
1070 endrun(12312,
"Particles should be added before getting here! no = %d, father = %d (ptype = %d) start=%d mode = %d\n", no, fat, tree->
Nodes[fat].
f.
ChildType, startnode, lv->
mode);
1074 endrun(12312,
"Pseudo-Particles should be added before getting here! no = %d, father = %d (ptype = %d)\n", no, fat, tree->
Nodes[fat].
f.
ChildType);
1077 struct NODE *current = &tree->
Nodes[no];
1083 if(current->
f.
TopLevel && no != startnode) {
1090 if(0 ==
cull_node(I, iter, current, BoxSize)) {
1099 int * suns = current->
s.
suns;
1104 int type = (current->
s.
Types >> (3*i)) % 8;
1106 if(!((1<<type) & iter->
mask))
1109 lv->
ngblist[numcand++] = suns[i];
1118 endrun(12312,
"Secondary for particle %d from node %d found pseudo at %d.\n", lv->
target, startnode, current);
1129 no = current->
s.
suns[0];
1154 for(inode = 0; (lv->
mode == 0 && inode < 1)|| (lv->
mode == 1 && inode < NODELISTLENGTH && I->NodeList[inode] >= 0); inode++)
1158 const double BoxSize = tree->
BoxSize;
1162 struct NODE *current = &tree->
Nodes[no];
1175 if(0 ==
cull_node(I, iter, current, BoxSize)) {
1184 int * suns = current->
s.
suns;
1189 int type = (current->
s.
Types >> (3*i)) % 8;
1191 if(!((1<<type) & iter->
mask))
1195 int other = suns[i];
1197 if(
P[other].IsGarbage)
1201 if(!((1<<
P[other].Type) & iter->
mask))
1204 double dist = iter->
Hsml;
1207 double h2 = dist * dist;
1208 for(d = 0; d < 3; d ++) {
1211 r2 += iter->
dist[d] * iter->
dist[d];
1214 if(r2 > h2)
continue;
1218 iter->
other = other;
1229 endrun(12312,
"Secondary for particle %d from node %d found pseudo at %d.\n", lv->
target, I->
NodeList[inode], current);
1240 no = current->
s.
suns[0];
1257 int NumThreads = omp_get_max_threads();
1267 int * ReDoQueue = tw->
WorkSet;
1272 int orig_queue_alloc = (tw->
haswork != NULL);
1279 int * CurQueue = ReDoQueue;
1281 for(i = 0; i < NumThreads; i++) {
1289 ReDoQueue = (
int *)
mymalloc2(
"ReDoQueue", size *
sizeof(
int) * NumThreads);
1293 ReDoQueue = (
int *)
mymalloc(
"ReDoQueue", size *
sizeof(
int) * NumThreads);
1312 MPI_Allreduce(&size, &ntot, 1,
MPI_INT64, MPI_SUM, MPI_COMM_WORLD);
1317 for(i = 1; i < NumThreads; i++) {
1323 double minngb, maxngb;
1324 MPI_Reduce(&tw->
maxnumngb[0], &maxngb, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
1325 MPI_Reduce(&tw->
minnumngb[0], &minngb, 1, MPI_DOUBLE, MPI_MIN, 0, MPI_COMM_WORLD);
1326 message(0,
"Max ngb=%g, min ngb=%g\n", maxngb, minngb);
1329 ReDoQueue = (
int *)
myrealloc(ReDoQueue,
sizeof(
int) * size);
1347 if(ntot == 1 && size > 0 && tw->
Niteration > 20 ) {
1348 int pp = ReDoQueue[0];
1349 message(1,
"Remaining i=%d, t %d, pos %g %g %g, hsml: %g\n", pp,
P[pp].Type,
P[pp].Pos[0],
P[pp].Pos[1],
P[pp].Pos[2],
P[pp].Hsml);
1354 endrun(1155,
"failed to converge density for %ld particles\n", ntot);
1366 ngb_narrow_down(
double *right,
double *left,
const double *radius,
const double *numNgb,
int maxcmpt,
int desnumngb,
int *closeidx,
double BoxSize)
1370 double ngbdist = fabs(numNgb[0] - desnumngb);
1371 for(j = 1; j < maxcmpt; j++){
1372 double newdist = fabs(numNgb[j] - desnumngb);
1373 if(newdist < ngbdist){
1381 for(j = 0; j < maxcmpt; j++){
1382 if(numNgb[j] < desnumngb)
1384 if(numNgb[j] > desnumngb){
1390 double hsml = radius[close];
1392 if(*right > 0.99 * BoxSize){
1394 if(maxcmpt > 1 && (radius[maxcmpt-1]>radius[maxcmpt-2]))
1395 dngbdv = (numNgb[maxcmpt-1]-numNgb[maxcmpt-2])/(pow(radius[maxcmpt-1],3) - pow(radius[maxcmpt-2],3));
1397 double newhsml = 4 * hsml;
1399 double dngb = (desnumngb - numNgb[maxcmpt-1]);
1400 double newvolume = pow(hsml,3) + dngb / dngbdv;
1401 if(pow(newvolume, 1./3) < newhsml)
1402 newhsml = pow(newvolume, 1./3);
1412 if(radius[1] > radius[0])
1413 dngbdv = (numNgb[1] - numNgb[0]) / (pow(radius[1],3) - pow(radius[0],3));
1415 if(maxcmpt == 1 && radius[0] > 0)
1416 dngbdv = numNgb[0] / pow(radius[0],3);
1419 double dngb = desnumngb - numNgb[0];
1420 double newvolume = pow(hsml,3) + dngb / dngbdv;
1421 hsml = pow(newvolume, 1./3);
void message(int where, const char *fmt,...)
void endrun(int where, const char *fmt,...)
int force_tree_allocated(const ForceTree *tree)
int force_get_father(int no, const ForceTree *tree)
#define PARTICLE_NODE_TYPE
static int node_is_particle(int no, const ForceTree *tree)
static int node_is_pseudo_particle(int no, const ForceTree *tree)
#define ta_malloc2(name, type, nele)
#define mymalloc(name, size)
#define ta_malloc(name, type, nele)
#define myrealloc(ptr, size)
#define mymalloc2(name, size)
#define report_memory_usage(x)
#define mymalloc_freebytes()
int param_get_int(ParameterSet *ps, const char *name)
struct part_manager_type PartManager[1]
#define NEAREST(x, BoxSize)
struct topleaf_data * TopLeaves
size_t NThisParticleExport
enum NgbTreeFindSymmetric symmetric
int NodeList[NODELISTLENGTH]
TreeWalkHasWorkFunction haswork
size_t ngbiter_type_elsize
TreeWalkProcessFunction preprocess
TreeWalkProcessFunction postprocess
TreeWalkNgbIterFunction ngbiter
int work_set_stolen_from_active
TreeWalkReduceResultFunction reduce
TreeWalkVisitFunction visit
size_t result_type_elsize
TreeWalkFillQueryFunction fill
int NodeList[NODELISTLENGTH]
void gadget_setup_thread_arrays(int *dest, int *srcs[], size_t sizes[], size_t total_size, int narrays)
int MPI_Alltoallv_sparse(void *sendbuf, int *sendcnts, int *sdispls, MPI_Datatype sendtype, void *recvbuf, int *recvcnts, int *rdispls, MPI_Datatype recvtype, MPI_Comm comm)
double timediff(double t0, double t1)
size_t gadget_compact_thread_arrays(int *dest, int *srcs[], size_t sizes[], int narrays)
static int atomic_fetch_and_add(int *ptr, int value)
static void ev_begin(TreeWalk *tw, int *active_set, const size_t size)
static void ev_free_threadlocals(struct TreeWalkThreadLocals local_exports)
static struct data_nodelist * DataNodeList
int treewalk_visit_ngbiter(TreeWalkQueryBase *I, TreeWalkResultBase *O, LocalTreeWalk *lv)
static void ev_finish(TreeWalk *tw)
int treewalk_visit_nolist_ngbiter(TreeWalkQueryBase *I, TreeWalkResultBase *O, LocalTreeWalk *lv)
static void ev_reduce_result(const struct SendRecvBuffer sndrcv, TreeWalk *tw)
void treewalk_run(TreeWalk *tw, int *active_set, size_t size)
static void ev_primary(TreeWalk *tw)
static int data_index_compare(const void *a, const void *b)
static int cull_node(const TreeWalkQueryBase *const I, const TreeWalkNgbIterBase *const iter, const struct NODE *const current, const double BoxSize)
void treewalk_build_queue(TreeWalk *tw, int *active_set, const size_t size, int may_have_garbage)
static void ev_init_thread(const struct TreeWalkThreadLocals local_exports, TreeWalk *const tw, LocalTreeWalk *lv)
double ngb_narrow_down(double *right, double *left, const double *radius, const double *numNgb, int maxcmpt, int desnumngb, int *closeidx, double BoxSize)
static void ev_communicate(void *sendbuf, void *recvbuf, size_t elsize, const struct SendRecvBuffer sndrcv, int import)
static void ev_secondary(TreeWalk *tw)
void treewalk_do_hsml_loop(TreeWalk *tw, int *queue, int64_t queuesize, int update_hsml)
static TreeWalk * GDB_current_ev
static int real_ev(struct TreeWalkThreadLocals local_exports, TreeWalk *tw, size_t *dataindexoffset, size_t *nexports, int *currentIndex)
static struct SendRecvBuffer ev_get_remote(TreeWalk *tw)
static struct data_index * DataIndexTable
static int ngb_treefind_threads(TreeWalkQueryBase *I, TreeWalkResultBase *O, TreeWalkNgbIterBase *iter, int startnode, LocalTreeWalk *lv)
static void treewalk_reduce_result(TreeWalk *tw, TreeWalkResultBase *result, int i, enum TreeWalkReduceMode mode)
static int ImportBufferBoost
int treewalk_export_particle(LocalTreeWalk *lv, int no)
void set_treewalk_params(ParameterSet *ps)
static int data_index_compare_by_index(const void *a, const void *b)
static int ev_ndone(TreeWalk *tw)
static void treewalk_init_result(TreeWalk *tw, TreeWalkResultBase *result, TreeWalkQueryBase *query)
static void treewalk_init_query(TreeWalk *tw, TreeWalkQueryBase *query, int i, int *NodeList)
static struct TreeWalkThreadLocals ev_alloc_threadlocals(TreeWalk *tw, const int NTask, const int NumThreads)