diff --git a/x10.runtime/src-cpp/x10aux/network.cc b/x10.runtime/src-cpp/x10aux/network.cc index b97bc01f3b..ec4486a84c 100644 --- a/x10.runtime/src-cpp/x10aux/network.cc +++ b/x10.runtime/src-cpp/x10aux/network.cc @@ -460,13 +460,10 @@ void *x10aux::coll_enter() { return fs; } -void x10aux::coll_handler(void *arg) { - x10::xrx::FinishState* fs = (x10::xrx::FinishState*)arg; - fs->notifyActivityTermination(); -} -//used with ULFM, called only when a collective has failed due to a process failure -void x10aux::failed_coll_handler(void *arg) { +void x10aux::coll_handler(void *arg, bool throwDPE) { x10::xrx::FinishState* fs = (x10::xrx::FinishState*)arg; + if (throwDPE) //triggered only by ULFM when native MPI collectives fail + fs->pushException(x10::lang::DeadPlaceException::_make(x10::lang::String::Lit("[Native] Team contains at least one dead member"))); fs->notifyActivityTermination(); } diff --git a/x10.runtime/src-cpp/x10aux/network.h b/x10.runtime/src-cpp/x10aux/network.h index 4e8aad5b98..1c5f164366 100644 --- a/x10.runtime/src-cpp/x10aux/network.h +++ b/x10.runtime/src-cpp/x10aux/network.h @@ -199,12 +199,10 @@ namespace x10aux { // teams void *coll_enter(); - void coll_handler(void *arg); + void coll_handler(void *arg, bool throwDPE); void *coll_enter2(void *arg); void coll_handler2(x10rt_team t, void *arg); - void failed_coll_handler(void *arg); - void register_place_removed_handler(x10::lang::VoidFun_0_1* body_fun); void notify_place_death(unsigned int place); } diff --git a/x10.runtime/src-java/x10/x10rt/TeamSupport.java b/x10.runtime/src-java/x10/x10rt/TeamSupport.java index 6decf1a09c..6f66e61867 100644 --- a/x10.runtime/src-java/x10/x10rt/TeamSupport.java +++ b/x10.runtime/src-java/x10/x10rt/TeamSupport.java @@ -140,9 +140,8 @@ public static void nativeScatter(int id, int role, int root, Rail src, int sr } } - public static boolean nativeBcast(int id, int role, int root, Rail src, int src_off, + public static void nativeBcast(int id, int role, int root, Rail src, int src_off, Rail dst, int dst_off, int count) { - boolean success = true; if (!X10RT.forceSinglePlace) { int typeCode = getTypeCode(src); assert getTypeCode(dst) == typeCode : "Incompatible src and dst arrays"; @@ -153,12 +152,11 @@ public static boolean nativeBcast(int id, int role, int root, Rail src, int s FinishState fs = ActivityManagement.activityCreationBookkeeping(); try { - success =nativeBcastImpl(id, role, root, srcRaw, src_off, dstRaw, dst_off, count, typeCode, fs); + nativeBcastImpl(id, role, root, srcRaw, src_off, dstRaw, dst_off, count, typeCode, fs); } catch (UnsatisfiedLinkError e) { aboutToDie("nativeBcast"); } } - return success; } public static void nativeAllToAll(int id, int role, Rail src, int src_off, @@ -199,9 +197,8 @@ public static void nativeReduce(int id, int role, int root, Rail src, int src } } - public static boolean nativeAllReduce(int id, int role, Rail src, int src_off, + public static void nativeAllReduce(int id, int role, Rail src, int src_off, Rail dst, int dst_off, int count, int op) { - boolean success = true; if (!X10RT.forceSinglePlace) { int typeCode = getTypeCode(src); Object srcRaw = typeCode == RED_TYPE_COMPLEX ? copyComplexToNewDouble(src, src_off, count) : src.getBackingArray(); @@ -212,12 +209,11 @@ public static boolean nativeAllReduce(int id, int role, Rail src, int src_off FinishState fs = ActivityManagement.activityCreationBookkeeping(); try { - success = nativeAllReduceImpl(id, role, srcRaw, src_off, dstRaw, dst_off, count, op, typeCode, fs); + nativeAllReduceImpl(id, role, srcRaw, src_off, dstRaw, dst_off, count, op, typeCode, fs); } catch (UnsatisfiedLinkError e) { aboutToDie("nativeAllReduce"); } } - return success; } public static void nativeIndexOfMax(int id, int role, Rail src, @@ -296,7 +292,7 @@ private static native void nativeScatterImpl(int id, int role, int root, Object Object dstRaw, int dst_off, int count, int typecode, FinishState fs); - private static native Boolean nativeBcastImpl(int id, int role, int root, Object srcRaw, int src_off, + private static native void nativeBcastImpl(int id, int role, int root, Object srcRaw, int src_off, Object dstRaw, int dst_off, int count, int typecode, FinishState fs); @@ -308,7 +304,7 @@ private static native void nativeReduceImpl(int id, int role, int root, Object s Object dstRaw, int dst_off, int count, int op, int typecode, FinishState fs); - private static native Boolean nativeAllReduceImpl(int id, int role, Object srcRaw, int src_off, + private static native void nativeAllReduceImpl(int id, int role, Object srcRaw, int src_off, Object dstRaw, int dst_off, int count, int op, int typecode, FinishState fs); diff --git a/x10.runtime/src-x10/x10/util/Team.x10 b/x10.runtime/src-x10/x10/util/Team.x10 index 1dbd05bd1c..f49c4caacc 100644 --- a/x10.runtime/src-x10/x10/util/Team.x10 +++ b/x10.runtime/src-x10/x10/util/Team.x10 @@ -7,6 +7,7 @@ * http://www.opensource.org/licenses/eclipse-1.0.php * * (C) Copyright IBM Corporation 2006-2016. + * (C) Copyright Sara Salem Hamouda 2018. */ package x10.util; @@ -213,9 +214,7 @@ public struct Team { finish nativeAgree(id, id==0n?here.id() as Int:Team.roles(id), src, dst); else if (collectiveSupportLevel == X10RT_COLL_ALLBLOCKINGCOLLECTIVES || collectiveSupportLevel == X10RT_COLL_NONBLOCKINGBARRIER) { barrierIgnoreExceptions(); - val success = nativeAgree(id, id==0n?here.id() as Int:Team.roles(id), src, dst); - if (!success) - throw new DeadPlaceException("[Native] Team "+id+" contains at least one dead member"); + finish nativeAgree(id, id==0n?here.id() as Int:Team.roles(id), src, dst); } } else { throw new UnsupportedOperationException("Emulated agreement not supported"); @@ -223,9 +222,10 @@ public struct Team { return dst(0); } - //TODO: support Java - @Native("c++", "x10rt_agree(#id, #role, #src->raw, #dst->raw, ::x10aux::failed_coll_handler, ::x10aux::coll_handler, ::x10aux::coll_enter())") - private static def nativeAgree (id:Int, role:Int, src:Rail[Int], dst:Rail[Int]) :Boolean = false; + private static def nativeAgree (id:Int, role:Int, src:Rail[Int], dst:Rail[Int]) : void { + //FIXME: support Java + @Native("c++", "x10rt_agree(id, role, src->raw, dst->raw, ::x10aux::coll_handler, ::x10aux::coll_enter());") {} + } /** Blocks until all members have received their part of root's array. * Each member receives a contiguous and distinct portion of the src array. @@ -253,7 +253,7 @@ public struct Team { finish nativeScatter(id, id==0n?here.id() as Int:Team.roles(id), root.id() as Int, src, src_off as Int, dst, dst_off as Int, count as Int); else if (collectiveSupportLevel == X10RT_COLL_ALLBLOCKINGCOLLECTIVES || collectiveSupportLevel == X10RT_COLL_NONBLOCKINGBARRIER) { barrierIgnoreExceptions(); - nativeScatter(id, id==0n?here.id() as Int:Team.roles(id), root.id() as Int, src, src_off as Int, dst, dst_off as Int, count as Int); + finish nativeScatter(id, id==0n?here.id() as Int:Team.roles(id), root.id() as Int, src, src_off as Int, dst, dst_off as Int, count as Int); } else state(id).collective_impl[T](LocalTeamState.COLL_SCATTER, root, src, src_off, dst, dst_off, count, 0n, null, null); @@ -301,22 +301,20 @@ public struct Team { finish nativeScatterv(id, my_role, root.id() as Int, src, src_off as Int, scounts, soffsets, dst, dst_off as Int); else if (collectiveSupportLevel == X10RT_COLL_ALLBLOCKINGCOLLECTIVES || collectiveSupportLevel == X10RT_COLL_NONBLOCKINGBARRIER) { barrierIgnoreExceptions(); - val success = nativeScatterv(id, my_role, root.id() as Int, src, src_off as Int, scounts, soffsets, dst, dst_off as Int); - if (!success) - throw new DeadPlaceException("[Native] Team "+id+" contains at least one dead member"); + finish nativeScatterv(id, my_role, root.id() as Int, src, src_off as Int, scounts, soffsets, dst, dst_off as Int); } else{ state(id).collective_impl[T](LocalTeamState.COLL_SCATTERV, root, src, src_off, dst, dst_off, 0n, 0n, soffsets, scounts); } } - //TODO: not supported for Java - //@Native("java", "x10.x10rt.TeamSupport.nativeScatterv(id, role, root, ...);") - @Native("c++", "x10rt_scatterv(#id, #role, #root, #src->raw, #soffsets->raw, #scounts->raw, &(#dst)->raw[#dst_off], #scounts->raw[#role], sizeof(TPMGL(T)), ::x10aux::failed_coll_handler, ::x10aux::coll_handler, ::x10aux::coll_enter())") - private static def nativeScatterv[T] (id:Int, role:Int, root:Int, src:Rail[T], src_off:Int, scounts:Rail[Int], soffsets:Rail[Int], dst:Rail[T], dst_off:Int):Boolean = false; - + private static def nativeScatterv[T] (id:Int, role:Int, root:Int, src:Rail[T], src_off:Int, scounts:Rail[Int], soffsets:Rail[Int], dst:Rail[T], dst_off:Int) : void { + //FIXME: support Java + //@Native("java", "x10.x10rt.TeamSupport.nativeScatterv(id, role, root, ...);") + @Native("c++", "x10rt_scatterv(id, role, root, src->raw, soffsets->raw, scounts->raw, &dst->raw[dst_off], scounts->raw[role], sizeof(TPMGL(T)), ::x10aux::coll_handler, ::x10aux::coll_enter());") {} + } - //TODO: not supported for Java or PAMI + //FIXME: support Java and PAMI public def gather[T] (root:Place, src:Rail[T], src_off:Long, dst:Rail[T], dst_off:Long, count:Long) : void { if (CompilerFlags.checkBounds() && here == root) checkBounds(dst_off + (size() * count) -1, dst.size); checkBounds(src_off+count-1, src.size); @@ -324,13 +322,13 @@ public struct Team { finish nativeGather(id, id==0n?here.id() as Int:Team.roles(id), root.id() as Int, src, src_off as Int, dst, dst_off as Int, count as Int); else if (collectiveSupportLevel == X10RT_COLL_ALLBLOCKINGCOLLECTIVES || collectiveSupportLevel == X10RT_COLL_NONBLOCKINGBARRIER) { barrierIgnoreExceptions(); - nativeGather(id, id==0n?here.id() as Int:Team.roles(id), root.id() as Int, src, src_off as Int, dst, dst_off as Int, count as Int); + finish nativeGather(id, id==0n?here.id() as Int:Team.roles(id), root.id() as Int, src, src_off as Int, dst, dst_off as Int, count as Int); } else state(id).collective_impl[T](LocalTeamState.COLL_GATHER, root, src, src_off, dst, dst_off, count, 0n, null, null); } - //TODO: not supported for Java or PAMI + //FIXME: support Java and PAMI private static def nativeGather[T] (id:Int, role:Int, root:Int, src:Rail[T], src_off:Int, dst:Rail[T], dst_off:Int, count:Int) : void { //@Native("java", "x10.x10rt.TeamSupport.nativeGather(id, role, root, src, src_off, dst, dst_off, count);") @Native("c++", "x10rt_gather(id, role, root, &src->raw[src_off], &dst->raw[dst_off], sizeof(TPMGL(T)), count, ::x10aux::coll_handler, ::x10aux::coll_enter());") {} @@ -368,20 +366,19 @@ public struct Team { finish nativeGatherv(id, my_role, root.id() as Int, src, src_off as Int, dst, dst_off as Int, dcounts, doffsets); else if (collectiveSupportLevel == X10RT_COLL_ALLBLOCKINGCOLLECTIVES || collectiveSupportLevel == X10RT_COLL_NONBLOCKINGBARRIER) { barrierIgnoreExceptions(); - val success = nativeGatherv(id, my_role, root.id() as Int, src, src_off as Int, dst, dst_off as Int, dcounts, doffsets); - if (!success) - throw new DeadPlaceException("[Native] Team "+id+" contains at least one dead member"); + finish nativeGatherv(id, my_role, root.id() as Int, src, src_off as Int, dst, dst_off as Int, dcounts, doffsets); } else{ state(id).collective_impl[T](LocalTeamState.COLL_GATHERV, root, src, src_off, dst, dst_off, 0n, 0n, doffsets, dcounts); } } - //TODO: not supported for Java - //@Native("java", "x10.x10rt.TeamSupport.nativeGatherv(id, role, root, ...);") - @Native("c++", "x10rt_gatherv(#id, #role, #root, &(#src)->raw[#src_off], #dcounts->raw[#role], #dst->raw, #doffsets->raw, #dcounts->raw, sizeof(TPMGL(T)), ::x10aux::failed_coll_handler, ::x10aux::coll_handler, ::x10aux::coll_enter())") - private static def nativeGatherv[T] (id:Int, role:Int, root:Int, src:Rail[T], src_off:Int, dst:Rail[T], dst_off:Int, dcounts:Rail[Int], doffsets:Rail[Int]) : Boolean = false; - + private static def nativeGatherv[T] (id:Int, role:Int, root:Int, src:Rail[T], src_off:Int, dst:Rail[T], dst_off:Int, dcounts:Rail[Int], doffsets:Rail[Int]) : void { + //FIXME: support Java + //@Native("java", "x10.x10rt.TeamSupport.nativeGatherv(id, role, root, ...);") + @Native("c++", "x10rt_gatherv(id, role, root, &src->raw[src_off], dcounts->raw[role], dst->raw, doffsets->raw, dcounts->raw, sizeof(TPMGL(T)), ::x10aux::coll_handler, ::x10aux::coll_enter());") {} + } + /** Blocks until all members have received root's array. * * @param root The member who is supplying the data @@ -403,17 +400,16 @@ public struct Team { finish nativeBcast(id, id==0n?here.id() as Int:Team.roles(id), root.id() as Int, src, src_off as Int, dst, dst_off as Int, count as Int); else if (collectiveSupportLevel == X10RT_COLL_ALLBLOCKINGCOLLECTIVES || collectiveSupportLevel == X10RT_COLL_NONBLOCKINGBARRIER) { barrierIgnoreExceptions(); - val success = nativeBcast(id, id==0n?here.id() as Int:Team.roles(id), root.id() as Int, src, src_off as Int, dst, dst_off as Int, count as Int); - if (!success) - throw new DeadPlaceException("[Native] Team "+id+" contains at least one dead member"); + finish nativeBcast(id, id==0n?here.id() as Int:Team.roles(id), root.id() as Int, src, src_off as Int, dst, dst_off as Int, count as Int); } else state(id).collective_impl[T](LocalTeamState.COLL_BROADCAST, root, src, src_off, dst, dst_off, count, 0n, null, null); } - @Native("java", "x10.x10rt.TeamSupport.nativeBcast(#id, #role, #root, #src, #src_off, #dst, #dst_off, #count)") - @Native("c++", "x10rt_bcast(#id, #role, #root, &(#src)->raw[#src_off], &(#dst)->raw[#dst_off], sizeof(TPMGL(T)), #count, ::x10aux::failed_coll_handler, ::x10aux::coll_handler, ::x10aux::coll_enter())") - private static def nativeBcast[T] (id:Int, role:Int, root:Int, src:Rail[T], src_off:Int, dst:Rail[T], dst_off:Int, count:Int) : Boolean = false; + private static def nativeBcast[T] (id:Int, role:Int, root:Int, src:Rail[T], src_off:Int, dst:Rail[T], dst_off:Int, count:Int) : void { + @Native("java", "x10.x10rt.TeamSupport.nativeBcast(id, role, root, src, src_off, dst, dst_off, count);") + @Native("c++", "x10rt_bcast(id, role, root, &src->raw[src_off], &dst->raw[dst_off], sizeof(TPMGL(T)), count, ::x10aux::coll_handler, ::x10aux::coll_enter());") {} + } /** Blocks until all members have received their part of each other member's array. * Each member receives a contiguous and distinct portion of the src array. @@ -445,7 +441,7 @@ public struct Team { if (DEBUG) Runtime.println(here + " entering pre-alltoall barrier of team "+id); barrierIgnoreExceptions(); if (DEBUG) Runtime.println(here + " entering native alltoall of team "+id); - nativeAlltoall(id, id==0n?here.id() as Int:Team.roles(id), src, src_off as Int, dst, dst_off as Int, count as Int); + finish nativeAlltoall(id, id==0n?here.id() as Int:Team.roles(id), src, src_off as Int, dst, dst_off as Int, count as Int); } // XTENLANG-3434 X10 alltoall is broken /* @@ -554,7 +550,7 @@ public struct Team { if (DEBUG) Runtime.println(here + " entering pre-reduce barrier on team "+id); barrierIgnoreExceptions(); if (DEBUG) Runtime.println(here + " entering native reduce on team "+id); - nativeReduce(id, id==0n?here.id() as Int:Team.roles(id), root.id() as Int, src, src_off as Int, dst, dst_off as Int, count as Int, op); + finish nativeReduce(id, id==0n?here.id() as Int:Team.roles(id), root.id() as Int, src, src_off as Int, dst, dst_off as Int, count as Int, op); if (DEBUG) Runtime.println(here + " Finished native reduce on team "+id); } else { state(id).collective_impl[T](LocalTeamState.COLL_REDUCE, root, src, src_off, dst, dst_off, count, op, null, null); @@ -728,9 +724,7 @@ public struct Team { if (DEBUG) Runtime.println(here + " entering pre-allreduce barrier on team "+id); barrierIgnoreExceptions(); if (DEBUG) Runtime.println(here + " entering native allreduce on team "+id); - val success = nativeAllreduce(id, id==0n?here.id() as Int:Team.roles(id), src, src_off as Int, dst, dst_off as Int, count as Int, op); - if (!success) - throw new DeadPlaceException("[Native] Team "+id+" contains at least one dead member"); + finish nativeAllreduce(id, id==0n?here.id() as Int:Team.roles(id), src, src_off as Int, dst, dst_off as Int, count as Int, op); } else { if (DEBUG) Runtime.println(here + " entering Team.x10 allreduce on team "+id); state(id).collective_impl[T](LocalTeamState.COLL_ALLREDUCE, state(id).places(0), src, src_off, dst, dst_off, count, op, null, null); @@ -738,9 +732,10 @@ public struct Team { if (DEBUG) Runtime.println(here + " Finished allreduce on team "+id); } - @Native("java", "x10.x10rt.TeamSupport.nativeAllReduce(#id, #role, #src, #src_off, #dst, #dst_off, #count, #op)") - @Native("c++", "x10rt_allreduce(#id, #role, &(#src)->raw[#src_off], &(#dst)->raw[#dst_off], (x10rt_red_op_type)(#op), x10rt_get_red_type(), #count, ::x10aux::failed_coll_handler, ::x10aux::coll_handler,::x10aux::coll_enter())") - private static def nativeAllreduce[T](id:Int, role:Int, src:Rail[T], src_off:Int, dst:Rail[T], dst_off:Int, count:Int, op:Int):Boolean = false; + private static def nativeAllreduce[T](id:Int, role:Int, src:Rail[T], src_off:Int, dst:Rail[T], dst_off:Int, count:Int, op:Int) : void { + @Native("java", "x10.x10rt.TeamSupport.nativeAllReduce(id, role, src, src_off, dst, dst_off, count, op);") + @Native("c++", "x10rt_allreduce(id, role, &src->raw[src_off], &dst->raw[dst_off], (x10rt_red_op_type)op, x10rt_get_red_type(), count, ::x10aux::coll_handler, ::x10aux::coll_enter());") {} + } /** Performs a reduction on a single value, returning the result */ public def allreduce (src:Boolean, op:Int):Boolean { @@ -829,7 +824,7 @@ public struct Team { private static def nativeAllreduce[T](id:Int, role:Int, src:Rail[T], dst:Rail[T], op:Int) : void { @Native("java", "x10.x10rt.TeamSupport.nativeAllReduce(id, role, src, 0, dst, 0, 1, op);") - @Native("c++", "x10rt_allreduce(id, role, src->raw, dst->raw, (x10rt_red_op_type)op, x10rt_get_red_type(), 1, ::x10aux::failed_coll_handler, ::x10aux::coll_handler, ::x10aux::coll_enter());") {} + @Native("c++", "x10rt_allreduce(id, role, src->raw, dst->raw, (x10rt_red_op_type)op, x10rt_get_red_type(), 1, ::x10aux::coll_handler, ::x10aux::coll_enter());") {} } /** This operation blocks until all members have received the computed result. @@ -847,7 +842,7 @@ public struct Team { finish nativeIndexOfMax(id, id==0n?here.id() as Int:Team.roles(id), src, dst); else if (collectiveSupportLevel == X10RT_COLL_ALLBLOCKINGCOLLECTIVES || collectiveSupportLevel == X10RT_COLL_NONBLOCKINGBARRIER) { barrierIgnoreExceptions(); - nativeIndexOfMax(id, id==0n?here.id() as Int:Team.roles(id), src, dst); + finish nativeIndexOfMax(id, id==0n?here.id() as Int:Team.roles(id), src, dst); } else state(id).collective_impl[DoubleIdx](LocalTeamState.COLL_INDEXOFMAX, state(id).places(0), src, 0, dst, 0, 1, 0n, null, null); @@ -856,7 +851,7 @@ public struct Team { private static def nativeIndexOfMax(id:Int, role:Int, src:Rail[DoubleIdx], dst:Rail[DoubleIdx]) : void { @Native("java", "x10.x10rt.TeamSupport.nativeIndexOfMax(id, role, src, dst);") - @Native("c++", "x10rt_allreduce(id, role, src->raw, dst->raw, X10RT_RED_OP_MAX, X10RT_RED_TYPE_DBL_S32, 1, ::x10aux::failed_coll_handler, ::x10aux::coll_handler, ::x10aux::coll_enter());") {} + @Native("c++", "x10rt_allreduce(id, role, src->raw, dst->raw, X10RT_RED_OP_MAX, X10RT_RED_TYPE_DBL_S32, 1, ::x10aux::coll_handler, ::x10aux::coll_enter());") {} } /** This operation blocks until all members have received the computed result. @@ -874,7 +869,7 @@ public struct Team { finish nativeIndexOfMin(id, id==0n?here.id() as Int:Team.roles(id), src, dst); else if (collectiveSupportLevel == X10RT_COLL_ALLBLOCKINGCOLLECTIVES || collectiveSupportLevel == X10RT_COLL_NONBLOCKINGBARRIER) { barrierIgnoreExceptions(); - nativeIndexOfMin(id, id==0n?here.id() as Int:Team.roles(id), src, dst); + finish nativeIndexOfMin(id, id==0n?here.id() as Int:Team.roles(id), src, dst); } else state(id).collective_impl[DoubleIdx](LocalTeamState.COLL_INDEXOFMIN, state(id).places(0), src, 0, dst, 0, 1, 0n, null, null); @@ -883,7 +878,7 @@ public struct Team { private static def nativeIndexOfMin(id:Int, role:Int, src:Rail[DoubleIdx], dst:Rail[DoubleIdx]) : void { @Native("java", "x10.x10rt.TeamSupport.nativeIndexOfMin(id, role, src, dst);") - @Native("c++", "x10rt_allreduce(id, role, src->raw, dst->raw, X10RT_RED_OP_MIN, X10RT_RED_TYPE_DBL_S32, 1, ::x10aux::failed_coll_handler, ::x10aux::coll_handler, ::x10aux::coll_enter());") {} + @Native("c++", "x10rt_allreduce(id, role, src->raw, dst->raw, X10RT_RED_OP_MIN, X10RT_RED_TYPE_DBL_S32, 1, ::x10aux::coll_handler, ::x10aux::coll_enter());") {} } /** Create new teams by subdividing an existing team. This is called by each member @@ -943,7 +938,7 @@ public struct Team { if (DEBUGINTERNALS) Runtime.println(here + " calling pre-native split barrier on team "+id+" color="+color+" new_role="+new_role); barrierIgnoreExceptions(); if (DEBUGINTERNALS) Runtime.println(here + " calling native split on team "+id+" color="+color+" new_role="+new_role); - nativeSplit(id, id==0n?here.id() as Int:Team.roles(id), color, new_role as Int, result); + finish nativeSplit(id, id==0n?here.id() as Int:Team.roles(id), color, new_role as Int, result); if (DEBUG) Runtime.println(here + " finished native split on team "+id+" color="+color+" new_role="+new_role); return Team(result(0), newTeamPlaceGroup, new_role); } @@ -968,7 +963,7 @@ public struct Team { finish nativeDel(id, id==0n?here.id() as Int:Team.roles(id)); else if (collectiveSupportLevel == X10RT_COLL_ALLBLOCKINGCOLLECTIVES || collectiveSupportLevel == X10RT_COLL_NONBLOCKINGBARRIER) { barrierIgnoreExceptions(); - nativeDel(id, id==0n?here.id() as Int:Team.roles(id)); + finish nativeDel(id, id==0n?here.id() as Int:Team.roles(id)); } // TODO - see if there is something useful to delete with the local team implementation } diff --git a/x10.runtime/x10rt/common/x10rt_emu_coll.cc b/x10.runtime/x10rt/common/x10rt_emu_coll.cc index b131c55ee6..01f4659598 100644 --- a/x10.runtime/x10rt/common/x10rt_emu_coll.cc +++ b/x10.runtime/x10rt/common/x10rt_emu_coll.cc @@ -406,7 +406,7 @@ void x10rt_emu_team_del (x10rt_team team, x10rt_place role, x10rt_completion_han { assert(gtdb[team]->placev[role] == x10rt_net_here()); gtdb.releaseTeam(team); - ch(arg); + ch(arg, false); } namespace { @@ -519,7 +519,7 @@ static void reduce_c_to_p_update_recv (const x10rt_msg_params *p) } m.reduce.rbuf = recv; if (m.reduce.started) { - m.reduce.ch(m.reduce.arg); + m.reduce.ch(m.reduce.arg, false); } //fprintf(stderr, "%d: Decrementing child from %d to %d\n", (int)role, (int) m.barrier.wait, (int) m.barrier.wait-1); m.barrier.childToReceive--; @@ -672,7 +672,7 @@ bool CollOp::progress (void) } safe_free(this); m.bcast.count = 0; // bcast completed - m.barrier.ch(m.barrier.arg); + m.barrier.ch(m.barrier.arg, false); return true; } } @@ -684,12 +684,12 @@ void CollOp::handlePendingReduce(MemberObj *m) { if (m->reduce.count > 0) { SYNCHRONIZED (global_lock); if (m->reduce.rbuf != NULL) { - m->reduce.ch(m->reduce.arg); + m->reduce.ch(m->reduce.arg, false); } if (m->reduce.rbuf2 != NULL) { m->reduce.rbuf = m->reduce.rbuf2; m->reduce.rbuf2 = NULL; - m->reduce.ch(m->reduce.arg); + m->reduce.ch(m->reduce.arg, false); } } } @@ -733,7 +733,7 @@ static void scatter_copy_recv (const x10rt_msg_params *p) m.scatter.data_done = true; if (m.scatter.barrier_done && m.scatter.ch != NULL) { PREEMPT (global_lock); - m.scatter.ch(m.scatter.arg); + m.scatter.ch(m.scatter.arg, false); } } @@ -744,7 +744,7 @@ namespace { }; } -static void scatter_after_barrier (void *arg) +static void scatter_after_barrier (void *arg, bool dummy) { MemberObj &m = *(static_cast(arg)); TeamObj &t = *gtdb[m.team]; @@ -764,7 +764,7 @@ static void scatter_after_barrier (void *arg) m2->scatter.data_done = true; if (m2->scatter.barrier_done && m2->scatter.ch != NULL) { PREEMPT (global_lock); - m2->scatter.ch(m2->scatter.arg); + m2->scatter.ch(m2->scatter.arg, false); } } else { // serialise all the data @@ -782,7 +782,7 @@ static void scatter_after_barrier (void *arg) // the barrier must have completed or we wouldn't even be here // signal completion to root role if (m.scatter.ch != NULL) { - m.scatter.ch(m.scatter.arg); + m.scatter.ch(m.scatter.arg, false); } } else { @@ -792,7 +792,7 @@ static void scatter_after_barrier (void *arg) m.scatter.barrier_done = true; if (m.scatter.data_done && m.scatter.ch != NULL) { PREEMPT (global_lock); - m.scatter.ch(m.scatter.arg); + m.scatter.ch(m.scatter.arg, false); } } } @@ -835,7 +835,6 @@ bool x10rt_emu_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, void *dbuf, size_t dcount, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { abort(); //not used by Team.x10 @@ -852,7 +851,6 @@ void x10rt_emu_gather (x10rt_team team, x10rt_place role, bool x10rt_emu_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, void *dbuf, const void *doffsets, const void *dcounts, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { abort(); //not used by Team.x10 @@ -861,7 +859,6 @@ bool x10rt_emu_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, bool x10rt_emu_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, void *dbuf, size_t el, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { TeamObj &t = *gtdb[team]; @@ -885,7 +882,7 @@ bool x10rt_emu_bcast (x10rt_team team, x10rt_place role, } -static void alltoall_intermediate (void *arg) +static void alltoall_intermediate (void *arg, bool dummy) { MemberObj &m = *(static_cast(arg)); @@ -1076,7 +1073,7 @@ namespace { }; template - void reduce3 (void *arg) + void reduce3 (void *arg, bool dummy) { MemberObj &m = *(static_cast(arg)); @@ -1234,7 +1231,7 @@ static void receive_new_team (x10rt_team new_team, void *arg) safe_free(m.split.newTeamPlaces); } -static void split (void *arg) +static void split (void *arg, bool dummy) { MemberObj &m = *(static_cast(arg)); TeamObj &t = *gtdb[m.team]; diff --git a/x10.runtime/x10rt/common/x10rt_front.cc b/x10.runtime/x10rt/common/x10rt_front.cc index f951e95e99..06c3e403d8 100644 --- a/x10.runtime/x10rt/common/x10rt_front.cc +++ b/x10.runtime/x10rt/common/x10rt_front.cc @@ -210,13 +210,12 @@ void x10rt_barrier (x10rt_team team, x10rt_place role, x10rt_lgl_barrier(team, role, ch, arg); } -bool x10rt_bcast (x10rt_team team, x10rt_place role, +void x10rt_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, void *dbuf, size_t el, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { - return x10rt_lgl_bcast(team, role, root, sbuf, dbuf, el, count, errch, ch, arg); + x10rt_lgl_bcast(team, role, root, sbuf, dbuf, el, count, ch, arg); } void x10rt_scatter (x10rt_team team, x10rt_place role, @@ -227,14 +226,13 @@ void x10rt_scatter (x10rt_team team, x10rt_place role, x10rt_lgl_scatter(team, role, root, sbuf, dbuf, el, count, ch, arg); } -bool x10rt_scatterv (x10rt_team team, x10rt_place role, +void x10rt_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, void *dbuf, size_t dcount, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { - return x10rt_lgl_scatterv(team, role, root, sbuf, soffsets, scounts, dbuf, dcount, el, errch, ch, arg); + x10rt_lgl_scatterv(team, role, root, sbuf, soffsets, scounts, dbuf, dcount, el, ch, arg); } @@ -246,13 +244,12 @@ void x10rt_gather (x10rt_team team, x10rt_place role, x10rt_lgl_gather (team, role, root, sbuf, dbuf, el, count, ch, arg); } -bool x10rt_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, +void x10rt_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, void *dbuf, const void *doffsets, const void *dcounts, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { - return x10rt_lgl_gatherv (team, role, root, sbuf, scount, dbuf, doffsets, dcounts, el, errch, ch, arg); + x10rt_lgl_gatherv (team, role, root, sbuf, scount, dbuf, doffsets, dcounts, el, ch, arg); } void x10rt_alltoall (x10rt_team team, x10rt_place role, @@ -273,26 +270,24 @@ void x10rt_reduce (x10rt_team team, x10rt_place role, x10rt_lgl_reduce(team, role, root, sbuf, dbuf, op, dtype, count, ch, arg); } -bool x10rt_allreduce (x10rt_team team, x10rt_place role, +void x10rt_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, x10rt_red_op_type op, x10rt_red_type dtype, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { - return x10rt_lgl_allreduce(team, role, sbuf, dbuf, op, dtype, count, errch, ch, arg); + x10rt_lgl_allreduce(team, role, sbuf, dbuf, op, dtype, count, ch, arg); } -bool x10rt_agree (x10rt_team team, x10rt_place role, +void x10rt_agree (x10rt_team team, x10rt_place role, const int *sbuf, int *dbuf, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { - return x10rt_lgl_agree(team, role, sbuf, dbuf, errch, ch, arg); + x10rt_lgl_agree(team, role, sbuf, dbuf, ch, arg); } -void x10rt_one_setter (void *arg) +void x10rt_one_setter (void *arg, bool dummy) { *((int*)arg) = 1; } void x10rt_team_setter (x10rt_team v, void *arg) diff --git a/x10.runtime/x10rt/common/x10rt_internal.h b/x10.runtime/x10rt/common/x10rt_internal.h index 208e1e7495..90313fa3fa 100644 --- a/x10.runtime/x10rt/common/x10rt_internal.h +++ b/x10.runtime/x10rt/common/x10rt_internal.h @@ -151,7 +151,6 @@ X10RT_C void x10rt_emu_barrier (x10rt_team team, x10rt_place role, X10RT_C bool x10rt_emu_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, void *dbuf, size_t el, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); X10RT_C void x10rt_emu_scatter (x10rt_team team, x10rt_place role, @@ -163,7 +162,6 @@ X10RT_C bool x10rt_emu_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, void *dbuf, size_t dcount, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); X10RT_C void x10rt_emu_gather (x10rt_team team, x10rt_place role, @@ -175,7 +173,6 @@ X10RT_C bool x10rt_emu_gatherv (x10rt_team team, x10rt_place role, x10rt_place r const void *sbuf, size_t scount, void *dbuf, const void *doffsets, const void *dcounts, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); X10RT_C void x10rt_emu_alltoall (x10rt_team team, x10rt_place role, diff --git a/x10.runtime/x10rt/common/x10rt_logical.cc b/x10.runtime/x10rt/common/x10rt_logical.cc index 6504321484..6d0ab66277 100644 --- a/x10.runtime/x10rt/common/x10rt_logical.cc +++ b/x10.runtime/x10rt/common/x10rt_logical.cc @@ -101,7 +101,7 @@ static x10rt_error fatal (const char *format, ...) x10rt_stats x10rt_lgl_stats; -static void one_setter (void *arg) +static void one_setter (void *arg, bool throwDPE) { *((int*)arg) = 1; } const char *x10rt_lgl_error_msg (void) { @@ -1069,19 +1069,17 @@ void x10rt_lgl_barrier (x10rt_team team, x10rt_place role, } } -bool x10rt_lgl_bcast (x10rt_team team, x10rt_place role, +void x10rt_lgl_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, void *dbuf, size_t el, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { - ESCAPE_IF_ERR_BOOL; + ESCAPE_IF_ERR; if (has_collectives >= X10RT_COLL_ALLBLOCKINGCOLLECTIVES) { - return x10rt_net_bcast(team, role, root, sbuf, dbuf, el, count, errch, ch, arg); + x10rt_net_bcast(team, role, root, sbuf, dbuf, el, count, ch, arg); } else { - x10rt_emu_bcast(team, role, root, sbuf, dbuf, el, count, errch, ch, arg); + x10rt_emu_bcast(team, role, root, sbuf, dbuf, el, count, ch, arg); while (x10rt_emu_coll_probe()); - return true; //TODO: should not always return true, but x10rt_emu_bcast is not used in Team.x10 } } @@ -1099,33 +1097,28 @@ void x10rt_lgl_scatter (x10rt_team team, x10rt_place role, } } -bool x10rt_lgl_scatterv (x10rt_team team, x10rt_place role, +void x10rt_lgl_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, void *dbuf, size_t dcount, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { - ESCAPE_IF_ERR_BOOL; + ESCAPE_IF_ERR; if (has_collectives >= X10RT_COLL_ALLBLOCKINGCOLLECTIVES) { - return x10rt_net_scatterv(team, role, root, sbuf, soffsets, scounts, dbuf, dcount, el, errch, ch, arg); + x10rt_net_scatterv(team, role, root, sbuf, soffsets, scounts, dbuf, dcount, el, ch, arg); } else { - x10rt_emu_scatterv(team, role, root, sbuf, soffsets, scounts, dbuf, dcount, el, errch, ch, arg); + x10rt_emu_scatterv(team, role, root, sbuf, soffsets, scounts, dbuf, dcount, el, ch, arg); while (x10rt_emu_coll_probe()); - return true; //TODO: should not always return true, but x10rt_emu_scatterv is not used in Team.x10 } } -bool x10rt_lgl_agree (x10rt_team team, x10rt_place role, +void x10rt_lgl_agree (x10rt_team team, x10rt_place role, const int *sbuf, int *dbuf, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { - ESCAPE_IF_ERR_BOOL; + ESCAPE_IF_ERR; if (x10rt_lgl_agreement_support()) { - return x10rt_net_agree(team, role, sbuf, dbuf, errch, ch, arg); - } else { - return false; + x10rt_net_agree(team, role, sbuf, dbuf, ch, arg); } } @@ -1143,20 +1136,18 @@ void x10rt_lgl_gather (x10rt_team team, x10rt_place role, } } -bool x10rt_lgl_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, +void x10rt_lgl_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, void *dbuf, const void *doffsets, const void *dcounts, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { - ESCAPE_IF_ERR_BOOL; + ESCAPE_IF_ERR; if (has_collectives >= X10RT_COLL_ALLBLOCKINGCOLLECTIVES) { - return x10rt_net_gatherv(team, role, root, sbuf, scount, dbuf, doffsets, dcounts, el, errch, ch, arg); + x10rt_net_gatherv(team, role, root, sbuf, scount, dbuf, doffsets, dcounts, el, ch, arg); } else { - x10rt_emu_gatherv(team, role, root, sbuf, scount, dbuf, doffsets, dcounts, el, errch, ch, arg); + x10rt_emu_gatherv(team, role, root, sbuf, scount, dbuf, doffsets, dcounts, el, ch, arg); while (x10rt_emu_coll_probe()); - return true; //TODO: should not always return true, but x10rt_emu_gatherv is not used by Team.x10 } } @@ -1190,20 +1181,18 @@ void x10rt_lgl_reduce (x10rt_team team, x10rt_place role, } } -bool x10rt_lgl_allreduce (x10rt_team team, x10rt_place role, +void x10rt_lgl_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, x10rt_red_op_type op, x10rt_red_type dtype, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { - ESCAPE_IF_ERR_BOOL; + ESCAPE_IF_ERR; if (has_collectives >= X10RT_COLL_ALLBLOCKINGCOLLECTIVES) { - return x10rt_net_allreduce(team, role, sbuf, dbuf, op, dtype, count, errch, ch, arg); + x10rt_net_allreduce(team, role, sbuf, dbuf, op, dtype, count, ch, arg); } else { x10rt_emu_reduce(team, role, 0, sbuf, dbuf, op, dtype, count, ch, arg, true); while (x10rt_emu_coll_probe()); - return true; //TODO: should not always return true, but x10rt_emu_reduce is not used } } diff --git a/x10.runtime/x10rt/include/x10rt_front.h b/x10.runtime/x10rt/include/x10rt_front.h index 0800e40489..f9f470e440 100644 --- a/x10.runtime/x10rt/include/x10rt_front.h +++ b/x10.runtime/x10rt/include/x10rt_front.h @@ -733,10 +733,9 @@ X10RT_C void x10rt_barrier (x10rt_team team, x10rt_place role, * * \param arg User pointer that is passed to the completion handler */ -X10RT_C bool x10rt_bcast (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, void *dbuf, size_t el, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); /** Asynchronously blocks until all members have received their part of root's array. Note that @@ -796,11 +795,10 @@ X10RT_C void x10rt_scatter (x10rt_team team, x10rt_place role, * * \param arg User pointer that is passed to the completion handler */ -X10RT_C bool x10rt_scatterv (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, void *dbuf, size_t dcount, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); @@ -858,11 +856,10 @@ X10RT_C void x10rt_gather (x10rt_team team, x10rt_place role, * * \param arg User pointer that is passed to the completion handler */ -X10RT_C bool x10rt_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, +X10RT_C void x10rt_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, void *dbuf, const void *doffsets, const void *dcounts, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); /** Asynchronously blocks until all members have received their portion of data from each @@ -948,12 +945,11 @@ X10RT_C void x10rt_reduce (x10rt_team team, x10rt_place role, * * \param arg User pointer that is passed to the completion handler */ -X10RT_C bool x10rt_allreduce (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, x10rt_red_op_type op, x10rt_red_type dtype, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); @@ -971,15 +967,14 @@ X10RT_C bool x10rt_allreduce (x10rt_team team, x10rt_place role, * * \param arg User pointer that is passed to the completion handler */ -X10RT_C bool x10rt_agree (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_agree (x10rt_team team, x10rt_place role, const int *sbuf, int *dbuf, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); /** Sets arg to 1. * \param arg Assumed to be an int* */ -X10RT_C void x10rt_one_setter (void *arg); +X10RT_C void x10rt_one_setter (void *arg, bool throwDPE); /** Sets arg to the given team. * \param v The new team is passed in here diff --git a/x10.runtime/x10rt/include/x10rt_logical.h b/x10.runtime/x10rt/include/x10rt_logical.h index d84a61c1d5..c85c3410e7 100644 --- a/x10.runtime/x10rt/include/x10rt_logical.h +++ b/x10.runtime/x10rt/include/x10rt_logical.h @@ -398,10 +398,9 @@ X10RT_C void x10rt_lgl_barrier (x10rt_team team, x10rt_place role, * \param ch As in #x10rt_bcast * \param arg As in #x10rt_bcast */ -X10RT_C bool x10rt_lgl_bcast (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_lgl_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, void *dbuf, size_t el, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); /** \see #x10rt_scatter @@ -433,12 +432,11 @@ X10RT_C void x10rt_lgl_scatter (x10rt_team team, x10rt_place role, * \param ch As in #x10rt_scatterv * \param arg As in #x10rt_scatterv */ -X10RT_C bool x10rt_lgl_scatterv (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_lgl_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, void *dbuf, size_t dcount, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); /** \see #x10rt_gather @@ -470,11 +468,10 @@ X10RT_C void x10rt_lgl_gather (x10rt_team team, x10rt_place role, * \param ch As in #x10rt_gatherv * \param arg As in #x10rt_gatherv */ -X10RT_C bool x10rt_lgl_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, +X10RT_C void x10rt_lgl_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, void *dbuf, const void *doffsets, const void *dcounts, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); /** \see #x10rt_alltoall @@ -521,12 +518,11 @@ X10RT_C void x10rt_lgl_reduce (x10rt_team team, x10rt_place role, * \param ch As in #x10rt_allreduce * \param arg As in #x10rt_allreduce */ -X10RT_C bool x10rt_lgl_allreduce (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_lgl_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, x10rt_red_op_type op, x10rt_red_type dtype, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); /** \see #x10rt_agree @@ -537,9 +533,8 @@ X10RT_C bool x10rt_lgl_allreduce (x10rt_team team, x10rt_place role, * \param ch As in #x10rt_allreduce * \param arg As in #x10rt_allreduce */ -X10RT_C bool x10rt_lgl_agree (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_lgl_agree (x10rt_team team, x10rt_place role, const int *sbuf, int *dbuf, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); #endif diff --git a/x10.runtime/x10rt/include/x10rt_net.h b/x10.runtime/x10rt/include/x10rt_net.h index feb6662212..15f5c7df41 100644 --- a/x10.runtime/x10rt/include/x10rt_net.h +++ b/x10.runtime/x10rt/include/x10rt_net.h @@ -245,10 +245,9 @@ X10RT_C void x10rt_net_barrier (x10rt_team team, x10rt_place role, * \param ch As in #x10rt_lgl_bcast * \param arg As in #x10rt_lgl_bcast */ -X10RT_C bool x10rt_net_bcast (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_net_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, void *dbuf, size_t el, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); /** \see #x10rt_lgl_scatter @@ -280,11 +279,10 @@ X10RT_C void x10rt_net_scatter (x10rt_team team, x10rt_place role, * \param ch As in #x10rt_lgl_scatterv * \param arg As in #x10rt_lgl_scatterv */ -X10RT_C bool x10rt_net_scatterv (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_net_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, void *dbuf, size_t dcount, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); /** \see #x10rt_lgl_gather * \param team As in #x10rt_lgl_gather @@ -316,10 +314,9 @@ X10RT_C void x10rt_net_gather (x10rt_team team, x10rt_place role, * \param ch As in #x10rt_lgl_gatherv * \param arg As in #x10rt_lgl_gatherv */ -X10RT_C bool x10rt_net_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, +X10RT_C void x10rt_net_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, void *dbuf, const void *doffsets, const void *dcounts, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); /** \see #x10rt_lgl_alltoall @@ -366,12 +363,11 @@ X10RT_C void x10rt_net_reduce (x10rt_team team, x10rt_place role, * \param ch As in #x10rt_lgl_allreduce * \param arg As in #x10rt_lgl_allreduce */ -X10RT_C bool x10rt_net_allreduce (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_net_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, x10rt_red_op_type op, x10rt_red_type dtype, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); @@ -383,9 +379,8 @@ X10RT_C bool x10rt_net_allreduce (x10rt_team team, x10rt_place role, * \param ch As in #x10rt_allreduce * \param arg As in #x10rt_allreduce */ -X10RT_C bool x10rt_net_agree (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_net_agree (x10rt_team team, x10rt_place role, const int *sbuf, int *dbuf, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); diff --git a/x10.runtime/x10rt/include/x10rt_types.h b/x10.runtime/x10rt/include/x10rt_types.h index 6862da90f1..00df61ba0a 100644 --- a/x10.runtime/x10rt/include/x10rt_types.h +++ b/x10.runtime/x10rt/include/x10rt_types.h @@ -43,7 +43,7 @@ typedef uint32_t x10rt_team; /** User callback to signal that non-blocking operations have completed. */ -typedef void x10rt_completion_handler (void *arg); +typedef void x10rt_completion_handler (void *arg, bool throwDPE); /** User callback to signal that non-blocking team construction operations have completed. */ diff --git a/x10.runtime/x10rt/jni/jni_team.cc b/x10.runtime/x10rt/jni/jni_team.cc index 3c4938efd0..7071266aa0 100644 --- a/x10.runtime/x10rt/jni/jni_team.cc +++ b/x10.runtime/x10rt/jni/jni_team.cc @@ -111,7 +111,7 @@ typedef struct finishOnlyStruct { jobject globalFinishState; } finishOnlyStruct; -static void finishOnlyCallback(void *arg) { +static void finishOnlyCallback(void *arg, bool throwDPE) { finishOnlyStruct* callbackArg = (finishOnlyStruct*)arg; JNIEnv *env = jniHelper_getEnv(); @@ -166,7 +166,7 @@ typedef struct postCopyStruct { void *dstData; } postCopyStruct; -static void postCopyCallback(void *arg) { +static void postCopyCallback(void *arg, bool dummy) { postCopyStruct *callbackArg = (postCopyStruct*)arg; JNIEnv *env = jniHelper_getEnv(); @@ -429,7 +429,7 @@ JNIEXPORT void JNICALL Java_x10_x10rt_TeamSupport_nativeScatterImpl(JNIEnv *env, * Method: nativeBcastImpl * Signature: (IIILjava/lang/Object;ILjava/lang/Object;IIILx10/lang/FinishState;)V */ -JNIEXPORT jobject JNICALL Java_x10_x10rt_TeamSupport_nativeBcastImpl(JNIEnv *env, jclass klazz, +JNIEXPORT void JNICALL Java_x10_x10rt_TeamSupport_nativeBcastImpl(JNIEnv *env, jclass klazz, jint id, jint role, jint root, jobject src, jint src_off, jobject dst, jint dst_off, @@ -578,7 +578,7 @@ JNIEXPORT jobject JNICALL Java_x10_x10rt_TeamSupport_nativeBcastImpl(JNIEnv *env callbackArg->srcData = srcData; callbackArg->dstData = dstData; - return (jobject)x10rt_bcast(id, role, root, srcData, dstData, el, count, &postCopyCallback, &postCopyCallback, callbackArg); + x10rt_bcast(id, role, root, srcData, dstData, el, count, &postCopyCallback, callbackArg); } @@ -854,7 +854,7 @@ JNIEXPORT void JNICALL Java_x10_x10rt_TeamSupport_nativeReduceImpl(JNIEnv *env, * Method: nativeAllReduceImpl * Signature: (IILjava/lang/Object;ILjava/lang/Object;IIIILx10/lang/FinishState;)V */ -JNIEXPORT jobject JNICALL Java_x10_x10rt_TeamSupport_nativeAllReduceImpl(JNIEnv *env, jclass klazz, +JNIEXPORT void JNICALL Java_x10_x10rt_TeamSupport_nativeAllReduceImpl(JNIEnv *env, jclass klazz, jint id, jint role, jobject src, jint src_off, jobject dst, jint dst_off, @@ -954,9 +954,8 @@ JNIEXPORT jobject JNICALL Java_x10_x10rt_TeamSupport_nativeAllReduceImpl(JNIEnv callbackArg->srcData = srcData; callbackArg->dstData = dstData; - //FIXME: how to call the correct failure call back? - return (jobject)x10rt_allreduce(id, role, srcData, dstData, (x10rt_red_op_type)op, (x10rt_red_type)typecode, - count, &postCopyCallback, &postCopyCallback, callbackArg); + x10rt_allreduce(id, role, srcData, dstData, (x10rt_red_op_type)op, (x10rt_red_type)typecode, + count, &postCopyCallback, callbackArg); } @@ -976,7 +975,7 @@ typedef struct minmaxStruct { DoubleIdx *dstData; } minmaxStruct; -static void minmaxCallback(void *arg) { +static void minmaxCallback(void *arg, bool dummy) { minmaxStruct *callbackArg = (minmaxStruct*)arg; JNIEnv *env = jniHelper_getEnv(); @@ -1024,7 +1023,7 @@ static void indexOfImpl(JNIEnv *env, jint id, jint role, callbackArg->srcData = srcData; callbackArg->dstData = dstData; - x10rt_allreduce(id, role, srcData, dstData, op, X10RT_RED_TYPE_DBL_S32, 1, &minmaxCallback, &minmaxCallback, callbackArg); + x10rt_allreduce(id, role, srcData, dstData, op, X10RT_RED_TYPE_DBL_S32, 1, &minmaxCallback, callbackArg); } diff --git a/x10.runtime/x10rt/mpi/x10rt_mpi.cc b/x10.runtime/x10rt/mpi/x10rt_mpi.cc index de77d54bdc..40c2032cc3 100644 --- a/x10.runtime/x10rt/mpi/x10rt_mpi.cc +++ b/x10.runtime/x10rt/mpi/x10rt_mpi.cc @@ -7,6 +7,7 @@ * http://www.opensource.org/licenses/eclipse-1.0.php * * (C) Copyright IBM Corporation 2006-2016. + * (C) Copyright Sara Salem Hamouda 2018. */ /* MPICH2 mpi.h wants to not have SEEK_SET etc defined for C++ bindings */ @@ -49,9 +50,15 @@ #if MPI_VERSION >= 3 || (defined(OPEN_MPI) && ( OMPI_MAJOR_VERSION >= 2 || (OMPI_MAJOR_VERSION == 1 && OMPI_MINOR_VERSION >= 8))) || (defined(MVAPICH2_NUMVERSION) && MVAPICH2_NUMVERSION == 10900002) #define X10RT_NONBLOCKING_SUPPORTED true +#ifdef OPEN_MPI_ULFM +#define ULFM2 true +#endif //#define X10RT_MPI3_RMA true // performance hasn't been shown to be better than active messages, so disabled by default. Uncomment this line to use RDMA for PUT & GET #else #define X10RT_NONBLOCKING_SUPPORTED false +#ifdef OPEN_MPI_ULFM +#define ULFM1 true +#endif #endif #define X10RT_NET_DEBUG(fmt, ...) do { \ @@ -156,13 +163,21 @@ static inline void release_lock(pthread_mutex_t * lock) { release_lock(&global_state.lock); \ } + #ifdef OPEN_MPI_ULFM void mpiErrorHandler(MPI_Comm * comm, int *errorCode, ...); +#endif + static inline int is_process_failure_error(int mpi_error){ +#ifdef OPEN_MPI_ULFM return mpi_error == MPI_ERR_PROC_FAILED || + mpi_error == MPI_ERR_PROC_FAILED_PENDING || mpi_error == MPI_ERR_REVOKED; -} +#else + return false; #endif +} + /** * Each X10RT API call is broken down into * a X10RT request. Each request of either @@ -543,7 +558,7 @@ x10rt_error x10rt_net_init(int *argc, char ** *argv, x10rt_msg_type *counter) { } else { char *thread_serialized = getenv(X10RT_MPI_THREAD_SERIALIZED); #ifdef OPEN_MPI_ULFM - thread_serialized = "1"; //ULFM does not support MPI_THREAD_MULTIPLE + thread_serialized = "1"; //ULFM1 does not support MPI_THREAD_MULTIPLE #endif int level_required; int level_provided; @@ -708,18 +723,23 @@ x10rt_place x10rt_net_ndead (void) { bool x10rt_net_is_place_dead (x10rt_place p) { #ifdef OPEN_MPI_ULFM - if (p >= global_state.nprocs) return true; - bool found = false; - get_lock(&global_state.lock); - //deadPlaces is not sorted, can't use binary search - for (int i=0; i 0){ + if (p >= global_state.nprocs) return true; + bool found = false; + get_lock(&global_state.lock); + //deadPlaces is not sorted, can't use binary search + for (int i=0; i(ch_); void *arg = reinterpret_cast(arg_); - ch(arg); + ch(arg, false); free(cont); } @@ -2642,7 +2647,7 @@ static void x10rt_net_team_barrier_for_blocking (x10rt_place placec, x10rt_place x10rt_serbuf_free(&b); */ - ch(arg); + ch(arg, false); X10RT_NET_DEBUG("%s", "finished"); } @@ -2697,7 +2702,7 @@ void x10rt_net_team_del (x10rt_team team, x10rt_place role, { X10RT_NET_DEBUG("team=%d, role=%d", team, role); mpi_tdb.releaseTeam(team); - ch(arg); + ch(arg, false); return; } @@ -2741,7 +2746,7 @@ void x10rt_net_team_members (x10rt_team team, x10rt_place *members, x10rt_comple MPI_Group_free(&MPI_GROUP_WORLD); MPI_Group_free(&grp); - ch(arg); + ch(arg, false); } x10rt_place x10rt_net_team_sz (x10rt_team team) @@ -2977,6 +2982,17 @@ MPI_Op mpi_red_op_type(x10rt_red_type dtype, x10rt_red_op_type op) { abort(); \ } \ UNLOCK_IF_MPI_IS_NOT_MULTITHREADED; +#define MPI_AGREEMENT_COLLECTIVE(name, iname, ...) \ + CollectivePostprocess *cp = new CollectivePostprocess(); \ + struct CollectivePostprocessEnv cpe = cp->env; \ + MPI_Request &req = cp->req; \ + LOCK_IF_MPI_IS_NOT_MULTITHREADED; \ + if (MPI_SUCCESS != MPIX_Comm_iagree(__VA_ARGS__, &req)) { \ + fprintf(stderr, "[%s:%d] %s\n", \ + __FILE__, __LINE__, "Error in MPI_" #iname); \ + abort(); \ + } \ + UNLOCK_IF_MPI_IS_NOT_MULTITHREADED; #define MPI_COLLECTIVE_SAVE(var) \ cp->env.env.MPI_COLLECTIVE_NAME.var = var; #define MPI_COLLECTIVE_POSTPROCESS \ @@ -2988,7 +3004,7 @@ MPI_Op mpi_red_op_type(x10rt_red_type dtype, x10rt_red_op_type op) { #define SAVED(var) \ cpe.env.MPI_COLLECTIVE_NAME.var #define MPI_COLLECTIVE_POSTPROCESS_END X10RT_NET_DEBUG("%s: %"PRIxPTR"_%"PRIxPTR,"end postprocess", SAVED(ch), SAVED(arg)); -#elif defined(OPEN_MPI_ULFM) +#else #define MPI_COLLECTIVE(name, iname, ...) \ CollectivePostprocessEnv cpe; \ do { LOCK_IF_MPI_IS_NOT_MULTITHREADED; \ @@ -3000,17 +3016,6 @@ MPI_Op mpi_red_op_type(x10rt_red_type dtype, x10rt_red_op_type op) { } \ UNLOCK_IF_MPI_IS_NOT_MULTITHREADED; \ } while(0) -#define MPI_COLLECTIVE_SAVE(var) \ - cpe.env.MPI_COLLECTIVE_NAME.var = var; -#define MPI_COLLECTIVE_POSTPROCESS \ - cpe.ch = ch; \ - cpe.arg = arg; \ - cpe.env.MPI_COLLECTIVE_NAME.mpiError = cpe.mpiError; \ - X10RT_NET_DEBUG("call handler %s","x10rt_net_handler_" TOSTR(MPI_COLLECTIVE_NAME)); \ - CONCAT(x10rt_net_handler_,MPI_COLLECTIVE_NAME)(cpe); -#define SAVED(var) \ - cpe.env.MPI_COLLECTIVE_NAME.var -#define MPI_COLLECTIVE_POSTPROCESS_END X10RT_NET_DEBUG("calling ULFM blocking collective completed, mpi_return_code is: %d", cpe.mpiError); #define MPI_AGREEMENT_COLLECTIVE(name, iname, ...) \ CollectivePostprocessEnv cpe; \ do { LOCK_IF_MPI_IS_NOT_MULTITHREADED; \ @@ -3022,29 +3027,20 @@ MPI_Op mpi_red_op_type(x10rt_red_type dtype, x10rt_red_op_type op) { } \ UNLOCK_IF_MPI_IS_NOT_MULTITHREADED; \ } while(0) -#else -#define MPI_COLLECTIVE(name, iname, ...) \ - CollectivePostprocessEnv cpe; \ - do { LOCK_IF_MPI_IS_NOT_MULTITHREADED; \ - if (MPI_SUCCESS != MPI_##name(__VA_ARGS__)) { \ - fprintf(stderr, "[%s:%d] %s\n", \ - __FILE__, __LINE__, "Error in MPI_" #name); \ - abort(); \ - } \ - UNLOCK_IF_MPI_IS_NOT_MULTITHREADED; \ - } while(0) #define MPI_COLLECTIVE_SAVE(var) \ cpe.env.MPI_COLLECTIVE_NAME.var = var; #define MPI_COLLECTIVE_POSTPROCESS \ cpe.ch = ch; \ cpe.arg = arg; \ + cpe.env.MPI_COLLECTIVE_NAME.mpiError = cpe.mpiError; \ X10RT_NET_DEBUG("call handler %s","x10rt_net_handler_" TOSTR(MPI_COLLECTIVE_NAME)); \ CONCAT(x10rt_net_handler_,MPI_COLLECTIVE_NAME)(cpe); #define SAVED(var) \ cpe.env.MPI_COLLECTIVE_NAME.var -#define MPI_COLLECTIVE_POSTPROCESS_END +#define MPI_COLLECTIVE_POSTPROCESS_END X10RT_NET_DEBUG("calling blocking collective completed, mpi_return_code is: %d", cpe.mpiError); #endif + static void x10rt_net_handler_barrier(CollectivePostprocessEnv); static void x10rt_net_handler_bcast(CollectivePostprocessEnv); static void x10rt_net_handler_scatter(CollectivePostprocessEnv); @@ -3088,16 +3084,22 @@ void x10rt_net_barrier (x10rt_team team, x10rt_place role, static void x10rt_net_handler_barrier (struct CollectivePostprocessEnv cpe) { X10RT_NET_DEBUG("%s: %"PRIxPTR"_%"PRIxPTR,"begin postprocess", SAVED(ch), SAVED(arg)); X10RT_NET_DEBUG("%s","before postprocess"); - SAVED(ch)(SAVED(arg)); +#ifdef OPEN_MPI_ULFM + if (is_process_failure_error(cpe.mpiError)) + SAVED(ch)(SAVED(arg), true); + else + SAVED(ch)(SAVED(arg), false); +#else + SAVED(ch)(SAVED(arg), false); +#endif X10RT_NET_DEBUG("%s","after postprocess"); MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME } -bool x10rt_net_bcast (x10rt_team team, x10rt_place role, +void x10rt_net_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, void *dbuf, size_t el, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { #define MPI_COLLECTIVE_NAME bcast @@ -3120,7 +3122,9 @@ bool x10rt_net_bcast (x10rt_team team, x10rt_place role, MPI_Comm comm = mpi_tdb.comm(team); + X10RT_NET_DEBUG("%s", "pre bcast"); MPI_COLLECTIVE(Bcast, Ibcast, buf, count, get_mpi_datatype(el), root, comm); + X10RT_NET_DEBUG("%s", "pro bcast"); MPI_COLLECTIVE_SAVE(team); MPI_COLLECTIVE_SAVE(role); @@ -3135,12 +3139,6 @@ bool x10rt_net_bcast (x10rt_team team, x10rt_place role, MPI_COLLECTIVE_SAVE(buf); MPI_COLLECTIVE_POSTPROCESS - -#ifdef OPEN_MPI_ULFM - return MPI_SUCCESS == SAVED(mpiError); -#else - return true; -#endif } static void x10rt_net_handler_bcast (struct CollectivePostprocessEnv cpe) { @@ -3153,11 +3151,11 @@ static void x10rt_net_handler_bcast (struct CollectivePostprocessEnv cpe) { } #ifdef OPEN_MPI_ULFM if (is_process_failure_error(cpe.mpiError)) - SAVED(errch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), true); else - SAVED(ch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), false); #else - SAVED(ch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), false); #endif MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME @@ -3202,7 +3200,14 @@ static void x10rt_net_handler_scatter (struct CollectivePostprocessEnv cpe) { memcpy(SAVED(dbuf), SAVED(buf), SAVED(count) * SAVED(el)); free(SAVED(buf)); } - SAVED(ch)(SAVED(arg)); +#ifdef OPEN_MPI_ULFM + if (is_process_failure_error(cpe.mpiError)) + SAVED(ch)(SAVED(arg), true); + else + SAVED(ch)(SAVED(arg), false); +#else + SAVED(ch)(SAVED(arg), false); +#endif MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME } @@ -3244,17 +3249,23 @@ static void x10rt_net_handler_alltoall (struct CollectivePostprocessEnv cpe) { memcpy(SAVED(dbuf), SAVED(buf), SAVED(count) * SAVED(el)); free(SAVED(buf)); } - SAVED(ch)(SAVED(arg)); +#ifdef OPEN_MPI_ULFM + if (is_process_failure_error(cpe.mpiError)) + SAVED(ch)(SAVED(arg), true); + else + SAVED(ch)(SAVED(arg), false); +#else + SAVED(ch)(SAVED(arg), false); +#endif MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME } -bool x10rt_net_allreduce (x10rt_team team, x10rt_place role, +void x10rt_net_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, x10rt_red_op_type op, x10rt_red_type dtype, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { #define MPI_COLLECTIVE_NAME allreduce @@ -3280,7 +3291,6 @@ bool x10rt_net_allreduce (x10rt_team team, x10rt_place role, MPI_COLLECTIVE_SAVE(op); MPI_COLLECTIVE_SAVE(dtype); MPI_COLLECTIVE_SAVE(count); - MPI_COLLECTIVE_SAVE(errch); MPI_COLLECTIVE_SAVE(ch); MPI_COLLECTIVE_SAVE(arg); @@ -3288,12 +3298,6 @@ bool x10rt_net_allreduce (x10rt_team team, x10rt_place role, MPI_COLLECTIVE_SAVE(buf); MPI_COLLECTIVE_POSTPROCESS - -#ifdef OPEN_MPI_ULFM - return MPI_SUCCESS == SAVED(mpiError); -#else - return true; -#endif } static void x10rt_net_handler_allreduce (struct CollectivePostprocessEnv cpe) { @@ -3303,18 +3307,18 @@ static void x10rt_net_handler_allreduce (struct CollectivePostprocessEnv cpe) { } #ifdef OPEN_MPI_ULFM if (is_process_failure_error(cpe.mpiError)) - SAVED(errch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), true); else - SAVED(ch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), false); #else - SAVED(ch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), false); #endif MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME } -bool x10rt_net_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, - void *dbuf, size_t dcount, size_t el, x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) +void x10rt_net_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, + void *dbuf, size_t dcount, size_t el, x10rt_completion_handler *ch, void *arg) { #define MPI_COLLECTIVE_NAME scatterv assert(global_state.init); @@ -3353,12 +3357,6 @@ bool x10rt_net_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, co MPI_COLLECTIVE_SAVE(soffsets_); MPI_COLLECTIVE_POSTPROCESS - -#ifdef OPEN_MPI_ULFM - return MPI_SUCCESS == SAVED(mpiError); -#else - return true; -#endif } static void x10rt_net_handler_scatterv (struct CollectivePostprocessEnv cpe) { @@ -3373,11 +3371,11 @@ static void x10rt_net_handler_scatterv (struct CollectivePostprocessEnv cpe) { #ifdef OPEN_MPI_ULFM if (is_process_failure_error(cpe.mpiError)) - SAVED(errch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), true); else - SAVED(ch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), false); #else - SAVED(ch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), false); #endif MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME @@ -3421,14 +3419,20 @@ static void x10rt_net_handler_gather (struct CollectivePostprocessEnv cpe) { memcpy(SAVED(dbuf), SAVED(buf), SAVED(gsize) * SAVED(count) * SAVED(el)); free(SAVED(buf)); } - SAVED(ch)(SAVED(arg)); +#ifdef OPEN_MPI_ULFM + if (is_process_failure_error(cpe.mpiError)) + SAVED(ch)(SAVED(arg), true); + else + SAVED(ch)(SAVED(arg), false); +#else + SAVED(ch)(SAVED(arg), false); +#endif MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME } -bool x10rt_net_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, +void x10rt_net_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, void *dbuf, const void *doffsets, const void *dcounts, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { #define MPI_COLLECTIVE_NAME gatherv @@ -3463,12 +3467,6 @@ bool x10rt_net_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, con MPI_COLLECTIVE_SAVE(doffsets_); MPI_COLLECTIVE_POSTPROCESS - -#ifdef OPEN_MPI_ULFM - return MPI_SUCCESS == SAVED(mpiError); -#else - return true; -#endif } static void x10rt_net_handler_gatherv (struct CollectivePostprocessEnv cpe) { @@ -3483,11 +3481,11 @@ static void x10rt_net_handler_gatherv (struct CollectivePostprocessEnv cpe) { #ifdef OPEN_MPI_ULFM if (is_process_failure_error(cpe.mpiError)) - SAVED(errch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), true); else - SAVED(ch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), false); #else - SAVED(ch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), false); #endif MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME @@ -3530,7 +3528,14 @@ static void x10rt_net_handler_allgather (struct CollectivePostprocessEnv cpe) { memcpy(SAVED(dbuf), SAVED(buf), SAVED(gsize) * SAVED(count) * SAVED(el)); free(SAVED(buf)); } - SAVED(ch)(SAVED(arg)); +#ifdef OPEN_MPI_ULFM + if (is_process_failure_error(cpe.mpiError)) + SAVED(ch)(SAVED(arg), true); + else + SAVED(ch)(SAVED(arg), false); +#else + SAVED(ch)(SAVED(arg), false); +#endif MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME } @@ -3572,7 +3577,14 @@ void x10rt_net_allgatherv (x10rt_team team, x10rt_place role, const void *sbuf, static void x10rt_net_handler_allgatherv (struct CollectivePostprocessEnv cpe) { free(SAVED(dcounts_)); free(SAVED(doffsets_)); - SAVED(ch)(SAVED(arg)); +#ifdef OPEN_MPI_ULFM + if (is_process_failure_error(cpe.mpiError)) + SAVED(ch)(SAVED(arg), true); + else + SAVED(ch)(SAVED(arg), false); +#else + SAVED(ch)(SAVED(arg), false); +#endif MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME } @@ -3628,12 +3640,19 @@ static void x10rt_net_handler_alltoallv (struct CollectivePostprocessEnv cpe) { free(SAVED(soffsets_)); free(SAVED(dcounts_)); free(SAVED(doffsets_)); - SAVED(ch)(SAVED(arg)); +#ifdef OPEN_MPI_ULFM + if (is_process_failure_error(cpe.mpiError)) + SAVED(ch)(SAVED(arg), true); + else + SAVED(ch)(SAVED(arg), false); +#else + SAVED(ch)(SAVED(arg), false); +#endif MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME } -bool x10rt_net_agree (x10rt_team team, x10rt_place role, const int *sbuf, int *dbuf, x10rt_completion_handler *errch, +void x10rt_net_agree (x10rt_team team, x10rt_place role, const int *sbuf, int *dbuf, x10rt_completion_handler *ch, void *arg) { #ifdef OPEN_MPI_ULFM @@ -3653,19 +3672,15 @@ bool x10rt_net_agree (x10rt_team team, x10rt_place role, const int *sbuf, int *d MPI_COLLECTIVE_SAVE(arg); MPI_COLLECTIVE_POSTPROCESS - - return MPI_SUCCESS == SAVED(mpiError); -#else - return false; #endif } static void x10rt_net_handler_agree(CollectivePostprocessEnv cpe) { #ifdef OPEN_MPI_ULFM if (is_process_failure_error(cpe.mpiError)) - SAVED(errch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), true); else - SAVED(ch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), false); MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME #endif @@ -3737,7 +3752,14 @@ static void x10rt_net_handler_reduce (struct CollectivePostprocessEnv cpe) { memcpy(SAVED(dbuf), SAVED(buf), SAVED(count) * sizeof_dtype(SAVED(dtype))); free(SAVED(buf)); } - SAVED(ch)(SAVED(arg)); +#ifdef OPEN_MPI_ULFM + if (is_process_failure_error(cpe.mpiError)) + SAVED(ch)(SAVED(arg), true); + else + SAVED(ch)(SAVED(arg), false); +#else + SAVED(ch)(SAVED(arg), false); +#endif MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME } @@ -3935,6 +3957,7 @@ void mpiErrorHandler(MPI_Comm * comm, int *errorCode, ...){ if (placeRemovedCB != NULL && newDeadCount > oldDeadCount) { for (int i = oldDeadCount; i < newDeadCount; ++i) { placeRemovedCB(global_state.deadPlaces[i]); + X10RT_NET_DEBUG("Place(%d): MPI found dead Place(%d)", x10rt_net_here(), global_state.deadPlaces[i]); } } MPI_Group_free(&failedGroup); diff --git a/x10.runtime/x10rt/pami/x10rt_pami.cc b/x10.runtime/x10rt/pami/x10rt_pami.cc index 8ec2b6b401..f478a88501 100644 --- a/x10.runtime/x10rt/pami/x10rt_pami.cc +++ b/x10.runtime/x10rt/pami/x10rt_pami.cc @@ -1748,7 +1748,7 @@ static void collective_operation_complete (pami_context_t context, #ifdef DEBUG fprintf(stderr, "Place %u completed collective operation. cookie=%p\n", state.myPlaceId, cookie); #endif - cbd->tcb(cbd->arg); + cbd->tcb(cbd->arg, false); free(cbd->counts); free(cbd->offsets); free(cbd); @@ -1845,8 +1845,8 @@ void x10rt_net_barrier (x10rt_team team, x10rt_place role, x10rt_completion_hand if (status != PAMI_SUCCESS) error("Unable to post a barrier on team %u", team); } -bool x10rt_net_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, - void *dbuf, size_t el, size_t count, x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) +void x10rt_net_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, + void *dbuf, size_t el, size_t count, x10rt_completion_handler *ch, void *arg) { #ifdef DEBUG fprintf(stderr, "Place %u executing broadcast of %lu %lu-byte elements on team %u, with role=%u, root=%u\n", state.myPlaceId, count, el, team, role, root); @@ -1886,7 +1886,6 @@ bool x10rt_net_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const // copy the data for the root separately. PAMI does not do this for us. if (role == root) memcpy(dbuf, sbuf, count*el); - return true; //PAMI not resilient } void x10rt_net_scatter (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, @@ -1928,9 +1927,9 @@ void x10rt_net_scatter (x10rt_team team, x10rt_place role, x10rt_place root, con // The local copy is not needed. PAMI handles this for us. } -bool x10rt_net_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, +void x10rt_net_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, - void *dbuf, size_t dcount, size_t el, x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) + void *dbuf, size_t dcount, size_t el, x10rt_completion_handler *ch, void *arg) { x10rt_pami_team_callback *tcb = (x10rt_pami_team_callback *)x10rt_malloc(sizeof(x10rt_pami_team_callback)); if (tcb == NULL) error("Unable to allocate memory for a scatterv callback header"); @@ -1979,7 +1978,6 @@ bool x10rt_net_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, PAMI_Context_unlock(state.context); #endif if (status != PAMI_SUCCESS) error("Unable to post a scatterv on team %u", team); - return true; //PAMI is not resilient } void x10rt_net_gather (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, @@ -2019,9 +2017,8 @@ void x10rt_net_gather (x10rt_team team, x10rt_place role, x10rt_place root, cons if (status != PAMI_SUCCESS) error("Unable to post a gather on team %u", team); } -bool x10rt_net_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, +void x10rt_net_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, void *dbuf, const void *doffsets, const void *dcounts, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { x10rt_pami_team_callback *tcb = (x10rt_pami_team_callback *)x10rt_malloc(sizeof(x10rt_pami_team_callback)); @@ -2071,7 +2068,6 @@ bool x10rt_net_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, con PAMI_Context_unlock(state.context); #endif if (status != PAMI_SUCCESS) error("Unable to post a gatherv on team %u", team); - return true; //PAMI is not resilent } void x10rt_net_alltoall (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, @@ -2209,8 +2205,8 @@ void x10rt_net_reduce (x10rt_team team, x10rt_place role, if (status != PAMI_SUCCESS) error("Unable to post a reduce on team %u", team); } -bool x10rt_net_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, - x10rt_red_op_type op, x10rt_red_type dtype, size_t count, x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) +void x10rt_net_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, + x10rt_red_op_type op, x10rt_red_type dtype, size_t count, x10rt_completion_handler *ch, void *arg) { // Issue the collective x10rt_pami_team_callback *tcb = (x10rt_pami_team_callback *)x10rt_malloc(sizeof(x10rt_pami_team_callback)); @@ -2253,16 +2249,13 @@ bool x10rt_net_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, v PAMI_Context_unlock(state.context); #endif if (status != PAMI_SUCCESS) error("Unable to post an allreduce on team %u", team); - return true; //PAMI is not resilient } -bool x10rt_net_agree (x10rt_team team, x10rt_place role, +void x10rt_net_agree (x10rt_team team, x10rt_place role, const int *sbuf, int *dbuf, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { error("x10rt_net_agree not implemented"); - return false; } /* Registering a place removed callback, never used in this implementation */ diff --git a/x10.runtime/x10rt/sockets/x10rt_sockets.cc b/x10.runtime/x10rt/sockets/x10rt_sockets.cc index 801ed95c72..44239d935e 100644 --- a/x10.runtime/x10rt/sockets/x10rt_sockets.cc +++ b/x10.runtime/x10rt/sockets/x10rt_sockets.cc @@ -1618,13 +1618,11 @@ void x10rt_net_barrier (x10rt_team team, x10rt_place role, x10rt_completion_hand fatal_error("x10rt_net_barrier not implemented"); } -bool x10rt_net_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, +void x10rt_net_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, void *dbuf, size_t el, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { fatal_error("x10rt_net_bcast not implemented"); - return false; } void x10rt_net_scatter (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, @@ -1633,11 +1631,10 @@ void x10rt_net_scatter (x10rt_team team, x10rt_place role, x10rt_place root, con fatal_error("x10rt_net_scatter not implemented"); } -bool x10rt_net_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, - void *dbuf, size_t dcount, size_t el,x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) +void x10rt_net_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, + void *dbuf, size_t dcount, size_t el, x10rt_completion_handler *ch, void *arg) { fatal_error("x10rt_net_scatterv not implemented"); - return false; } void x10rt_net_gather (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, @@ -1646,13 +1643,11 @@ void x10rt_net_gather (x10rt_team team, x10rt_place role, x10rt_place root, cons fatal_error("x10rt_net_gather not implemented"); } -bool x10rt_net_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, +void x10rt_net_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, void *dbuf, const void *doffsets, const void *dcounts, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { fatal_error("x10rt_net_gatherv not implemented"); - return false; } void x10rt_net_alltoall (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, @@ -1671,20 +1666,17 @@ void x10rt_net_reduce (x10rt_team team, x10rt_place role, fatal_error("x10rt_net_reduce not implemented"); } -bool x10rt_net_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, - x10rt_red_op_type op, x10rt_red_type dtype, size_t count,x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) +void x10rt_net_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, + x10rt_red_op_type op, x10rt_red_type dtype, size_t count, x10rt_completion_handler *ch, void *arg) { fatal_error("x10rt_net_allreduce not implemented"); - return false; } -bool x10rt_net_agree (x10rt_team team, x10rt_place role, +void x10rt_net_agree (x10rt_team team, x10rt_place role, const int *sbuf, int *dbuf, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { fatal_error("x10rt_net_agree not implemented"); - return false; } const char *x10rt_net_error_msg (void) { return context.errorMsg; } diff --git a/x10.runtime/x10rt/standalone/x10rt_standalone.cc b/x10.runtime/x10rt/standalone/x10rt_standalone.cc index e87e12c194..ab7700a1bf 100644 --- a/x10.runtime/x10rt/standalone/x10rt_standalone.cc +++ b/x10.runtime/x10rt/standalone/x10rt_standalone.cc @@ -976,10 +976,9 @@ void x10rt_net_barrier (x10rt_team team, x10rt_place role, abort(); } -bool x10rt_net_bcast (x10rt_team team, x10rt_place role, +void x10rt_net_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, void *dbuf, size_t el, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { abort(); @@ -993,11 +992,10 @@ void x10rt_net_scatter (x10rt_team team, x10rt_place role, abort(); } -bool x10rt_net_scatterv (x10rt_team team, x10rt_place role, +void x10rt_net_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, void *dbuf, size_t dcount, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { abort(); @@ -1011,11 +1009,10 @@ void x10rt_net_gather (x10rt_team team, x10rt_place role, abort(); } -bool x10rt_net_gatherv (x10rt_team team, x10rt_place role, +void x10rt_net_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, void *dbuf, const void *doffsets, const void *dcounts, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { abort(); @@ -1039,20 +1036,18 @@ void x10rt_net_reduce (x10rt_team team, x10rt_place role, abort(); } -bool x10rt_net_allreduce (x10rt_team team, x10rt_place role, +void x10rt_net_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, x10rt_red_op_type op, x10rt_red_type dtype, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { abort(); } -bool x10rt_net_agree (x10rt_team team, x10rt_place role, +void x10rt_net_agree (x10rt_team team, x10rt_place role, const int *sbuf, int *dbuf, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { abort(); diff --git a/x10.runtime/x10rt/test/x10rt_coll.cc b/x10.runtime/x10rt/test/x10rt_coll.cc index 97326bcbbb..6d4a6d8b72 100644 --- a/x10.runtime/x10rt/test/x10rt_coll.cc +++ b/x10.runtime/x10rt/test/x10rt_coll.cc @@ -145,7 +145,7 @@ static void coll_test (x10rt_team team, x10rt_place role, x10rt_place per_place) << " correctness (if no warnings follow then OK)..." << std::endl; finished = 0; x10rt_bcast(team, role, root, root==role ? sbuf : NULL, dbuf, - el, count, x10rt_one_setter, x10rt_one_setter, &finished); + el, count, x10rt_one_setter, &finished); while (!finished) { x10rt_aborting_probe(); } for (size_t i=0 ; i