Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • becker29/master-thesis-custom-ompi
  • felixkhals/swp-cm22-planbased-mpi
2 results
Select Git revision
Show changes
Commits on Source (8)
......@@ -13,6 +13,8 @@
.hg
.hgignore_local
build_docker
*.la
*.lo
*.o
......
This diff is collapsed.
This diff is collapsed.
......@@ -27,86 +27,84 @@
#include "ompi_config.h"
#include <stdio.h>
#include "opal/util/show_help.h"
#include "opal/util/printf.h"
#include "opal/util/show_help.h"
#include "ompi/info/info.h"
#include "ompi/mpi/c/bindings.h"
#include "ompi/runtime/params.h"
#include "ompi/runtime/mpiruntime.h"
#include "ompi/communicator/communicator.h"
#include "ompi/errhandler/errhandler.h"
#include "ompi/dpm/dpm.h"
#include "ompi/errhandler/errhandler.h"
#include "ompi/info/info.h"
#include "ompi/memchecker.h"
#include "ompi/mpi/c/bindings.h"
#include "ompi/runtime/mpiruntime.h"
#include "ompi/runtime/params.h"
#if OMPI_BUILD_MPI_PROFILING
#if OPAL_HAVE_WEAK_SYMBOLS
#pragma weak MPI_Comm_spawn = PMPI_Comm_spawn
#endif
#define MPI_Comm_spawn PMPI_Comm_spawn
# if OPAL_HAVE_WEAK_SYMBOLS
# pragma weak MPI_Comm_spawn = PMPI_Comm_spawn
# endif
# define MPI_Comm_spawn PMPI_Comm_spawn
#endif
static const char FUNC_NAME[] = "MPI_Comm_spawn";
int MPI_Comm_spawn(const char *command, char *argv[], int maxprocs, MPI_Info info,
int root, MPI_Comm comm, MPI_Comm *intercomm,
int array_of_errcodes[])
int MPI_Comm_spawn(const char *command, char *argv[], int maxprocs, MPI_Info info, int root,
MPI_Comm comm, MPI_Comm *intercomm, int array_of_errcodes[])
{
int rank, rc=OMPI_SUCCESS, i, flag;
printf("MPI A\n");
fflush(stdout);
int rank, rc = OMPI_SUCCESS, i, flag;
bool send_first = false; /* we wait to be contacted */
ompi_communicator_t *newcomp=MPI_COMM_NULL;
char port_name[MPI_MAX_PORT_NAME]; char *port_string = NULL;
ompi_communicator_t *newcomp = MPI_COMM_NULL;
char port_name[MPI_MAX_PORT_NAME];
char *port_string = NULL;
bool non_mpi = false;
MEMCHECKER(
memchecker_comm(comm);
);
MEMCHECKER(memchecker_comm(comm););
printf("MPI B\n");
fflush(stdout);
if ( MPI_PARAM_CHECK ) {
if (MPI_PARAM_CHECK) {
OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
if ( ompi_comm_invalid (comm)) {
return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_COMM,
FUNC_NAME);
if (ompi_comm_invalid(comm)) {
return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_COMM, FUNC_NAME);
}
if ( OMPI_COMM_IS_INTER(comm)) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_COMM,
FUNC_NAME);
if (OMPI_COMM_IS_INTER(comm)) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_COMM, FUNC_NAME);
}
if ( (0 > root) || (ompi_comm_size(comm) <= root) ) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG,
FUNC_NAME);
if ((0 > root) || (ompi_comm_size(comm) <= root)) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, FUNC_NAME);
}
if ( NULL == intercomm ) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG,
FUNC_NAME);
if (NULL == intercomm) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, FUNC_NAME);
}
}
#if OPAL_ENABLE_FT_MPI
if( OPAL_UNLIKELY(!ompi_comm_iface_coll_check(comm, &rc)) ) {
if (OPAL_UNLIKELY(!ompi_comm_iface_coll_check(comm, &rc))) {
return OMPI_ERRHANDLER_INVOKE(comm, rc, FUNC_NAME);
}
#endif
rank = ompi_comm_rank ( comm );
if ( MPI_PARAM_CHECK ) {
if ( rank == root ) {
if ( NULL == command ) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG,
FUNC_NAME);
printf("MPI C\n");
fflush(stdout);
rank = ompi_comm_rank(comm);
if (MPI_PARAM_CHECK) {
if (rank == root) {
if (NULL == command) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, FUNC_NAME);
}
if ( 0 > maxprocs ) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG,
FUNC_NAME);
if (0 > maxprocs) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, FUNC_NAME);
}
if (NULL == info || ompi_info_is_freed(info)) {
return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_INFO,
FUNC_NAME);
return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_INFO, FUNC_NAME);
}
}
}
printf("MPI D\n");
fflush(stdout);
if (!ompi_mpi_dynamics_is_enabled(FUNC_NAME)) {
return OMPI_ERRHANDLER_INVOKE(comm, OMPI_ERR_NOT_SUPPORTED, FUNC_NAME);
......@@ -114,61 +112,80 @@ int MPI_Comm_spawn(const char *command, char *argv[], int maxprocs, MPI_Info inf
/* initialize the port name to avoid problems */
memset(port_name, 0, MPI_MAX_PORT_NAME);
printf("MPI E\n");
fflush(stdout);
/* See if the info key "ompi_non_mpi" was set to true */
if (rank == root) {
ompi_info_get_bool(info, "ompi_non_mpi", &non_mpi, &flag);
}
if ( rank == root ) {
if (rank == root) {
if (!non_mpi) {
/* Open a port. The port_name is passed as an environment
variable to the children. */
if (OMPI_SUCCESS != (rc = ompi_dpm_open_port (port_name))) {
printf("NON_MPI\n");
fflush(stdout);
if (OMPI_SUCCESS != (rc = ompi_dpm_open_port(port_name))) {
goto error;
}
} else if (1 < ompi_comm_size(comm)) {
printf("OMPI_COMM_SIZE\n");
fflush(stdout);
/* we do not support non_mpi spawns on comms this size */
rc = OMPI_ERR_NOT_SUPPORTED;
goto error;
}
if (OMPI_SUCCESS != (rc = ompi_dpm_spawn (1, &command, &argv, &maxprocs,
&info, port_name))) {
printf("SPAWN\n");
fflush(stdout);
if (OMPI_SUCCESS
!= (rc = ompi_dpm_spawn(1, &command, &argv, &maxprocs, &info, port_name))) {
goto error;
}
}
printf("MPI F\n");
fflush(stdout);
error:
printf("MPI G\n");
fflush(stdout);
if (OMPI_SUCCESS != rc) {
printf("NO SUCCESS\n");
fflush(stdout);
/* There was an error in one of the above stages,
* we still need to do the connect_accept stage so that
* non-root ranks do not deadlock.
* Add the error code to the port string for connect_accept
* to propagate the error code. */
(void)opal_asprintf(&port_string, "%s:error=%d", port_name, rc);
}
else {
(void) opal_asprintf(&port_string, "%s:error=%d", port_name, rc);
} else {
port_string = port_name;
}
printf("MPI H\n");
fflush(stdout);
if (non_mpi) {
newcomp = MPI_COMM_NULL;
} else {
rc = ompi_dpm_connect_accept (comm, root, port_string, send_first, &newcomp);
rc = ompi_dpm_connect_accept(comm, root, port_string, send_first, &newcomp);
}
if (OPAL_ERR_NOT_SUPPORTED == rc) {
opal_show_help("help-mpi-api.txt",
"MPI function not supported",
true,
FUNC_NAME,
printf("NOT SUPPORTED\n");
opal_show_help("help-mpi-api.txt", "MPI function not supported", true, FUNC_NAME,
"Underlying runtime environment does not support spawn functionality");
fflush(stdout);
}
if(port_string != port_name) {
if (port_string != port_name) {
free(port_string);
}
printf("MPI I\n");
fflush(stdout);
/* close the port */
if (rank == root && !non_mpi) {
ompi_dpm_close_port(port_name);
......@@ -176,11 +193,14 @@ error:
/* set error codes */
if (MPI_ERRCODES_IGNORE != array_of_errcodes) {
for ( i=0; i < maxprocs; i++ ) {
array_of_errcodes[i]=rc;
for (i = 0; i < maxprocs; i++) {
array_of_errcodes[i] = rc;
}
}
printf("MPI J\n", rc);
fflush(stdout);
*intercomm = newcomp;
OMPI_ERRHANDLER_RETURN (rc, comm, rc, FUNC_NAME);
OMPI_ERRHANDLER_RETURN(rc, comm, rc, FUNC_NAME);
}
......@@ -27,88 +27,86 @@
#include "ompi_config.h"
#include <stdio.h>
#include "opal/util/show_help.h"
#include "opal/util/printf.h"
#include "opal/util/show_help.h"
#include "ompi/mpi/c/bindings.h"
#include "ompi/runtime/params.h"
#include "ompi/runtime/mpiruntime.h"
#include "ompi/communicator/communicator.h"
#include "ompi/dpm/dpm.h"
#include "ompi/errhandler/errhandler.h"
#include "ompi/info/info.h"
#include "ompi/dpm/dpm.h"
#include "ompi/memchecker.h"
#include "ompi/mpi/c/bindings.h"
#include "ompi/runtime/mpiruntime.h"
#include "ompi/runtime/params.h"
#if OMPI_BUILD_MPI_PROFILING
#if OPAL_HAVE_WEAK_SYMBOLS
#pragma weak MPI_Comm_spawn_multiple = PMPI_Comm_spawn_multiple
#endif
#define MPI_Comm_spawn_multiple PMPI_Comm_spawn_multiple
# if OPAL_HAVE_WEAK_SYMBOLS
# pragma weak MPI_Comm_spawn_multiple = PMPI_Comm_spawn_multiple
# endif
# define MPI_Comm_spawn_multiple PMPI_Comm_spawn_multiple
#endif
static const char FUNC_NAME[] = "MPI_Comm_spawn_multiple";
int MPI_Comm_spawn_multiple(int count, char *array_of_commands[], char **array_of_argv[],
const int array_of_maxprocs[], const MPI_Info array_of_info[],
int root, MPI_Comm comm, MPI_Comm *intercomm,
int array_of_errcodes[])
const int array_of_maxprocs[], const MPI_Info array_of_info[], int root,
MPI_Comm comm, MPI_Comm *intercomm, int array_of_errcodes[])
{
int i=0, rc=0, rank=0, size=0, flag;
ompi_communicator_t *newcomp=MPI_COMM_NULL;
bool send_first=false; /* they are contacting us first */
char port_name[MPI_MAX_PORT_NAME]; char *port_string = NULL;
printf("MPI A\n");
fflush(stdout);
int i = 0, rc = 0, rank = 0, size = 0, flag;
ompi_communicator_t *newcomp = MPI_COMM_NULL;
bool send_first = false; /* they are contacting us first */
char port_name[MPI_MAX_PORT_NAME];
char *port_string = NULL;
bool non_mpi = false, cumulative = false;
MEMCHECKER(
memchecker_comm(comm);
);
MEMCHECKER(memchecker_comm(comm););
if ( MPI_PARAM_CHECK ) {
if (MPI_PARAM_CHECK) {
OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
if ( ompi_comm_invalid (comm)) {
return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_COMM,
FUNC_NAME);
if (ompi_comm_invalid(comm)) {
return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_COMM, FUNC_NAME);
}
if ( OMPI_COMM_IS_INTER(comm)) {
if (OMPI_COMM_IS_INTER(comm)) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_COMM, FUNC_NAME);
}
if ( (0 > root) || (ompi_comm_size(comm) <= root) ) {
if ((0 > root) || (ompi_comm_size(comm) <= root)) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, FUNC_NAME);
}
if ( NULL == intercomm ) {
if (NULL == intercomm) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, FUNC_NAME);
}
}
rank = ompi_comm_rank ( comm );
if ( MPI_PARAM_CHECK ) {
if ( rank == root ) {
if ( 0 > count ) {
printf("MPI B\n");
fflush(stdout);
rank = ompi_comm_rank(comm);
if (MPI_PARAM_CHECK) {
if (rank == root) {
if (0 > count) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, FUNC_NAME);
}
if ( NULL == array_of_commands ) {
if (NULL == array_of_commands) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, FUNC_NAME);
}
if ( NULL == array_of_maxprocs ) {
if (NULL == array_of_maxprocs) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, FUNC_NAME);
}
if ( NULL == array_of_info ) {
if (NULL == array_of_info) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_INFO, FUNC_NAME);
}
for (i = 0; i < count; ++i) {
if (NULL == array_of_info[i] ||
ompi_info_is_freed(array_of_info[i])) {
return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_INFO,
FUNC_NAME);
if (NULL == array_of_info[i] || ompi_info_is_freed(array_of_info[i])) {
return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_INFO, FUNC_NAME);
}
/* If ompi_non_mpi is set to true on any info, it must
be set to true on all of them. Note that not
setting ompi_non_mpi is the same as setting it to
false. */
ompi_info_get_bool(array_of_info[i], "ompi_non_mpi", &non_mpi,
&flag);
ompi_info_get_bool(array_of_info[i], "ompi_non_mpi", &non_mpi, &flag);
if (flag && 0 == i) {
/* If this is the first info, save its
ompi_non_mpi value */
......@@ -119,32 +117,35 @@ int MPI_Comm_spawn_multiple(int count, char *array_of_commands[], char **array_o
/* If this info's effective value doesn't agree with
the rest of them, error */
if (cumulative != non_mpi) {
return OMPI_ERRHANDLER_NOHANDLE_INVOKE(
MPI_ERR_INFO,
FUNC_NAME);
return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_INFO, FUNC_NAME);
}
}
for ( i=0; i<count; i++ ) {
if ( NULL == array_of_commands[i] ) {
for (i = 0; i < count; i++) {
if (NULL == array_of_commands[i]) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, FUNC_NAME);
}
if ( 0 > array_of_maxprocs[i] ) {
if (0 > array_of_maxprocs[i]) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, FUNC_NAME);
}
}
}
}
printf("MPI C\n");
fflush(stdout);
if (!ompi_mpi_dynamics_is_enabled(FUNC_NAME)) {
return OMPI_ERRHANDLER_INVOKE(comm, OMPI_ERR_NOT_SUPPORTED, FUNC_NAME);
}
printf("MPI D\n");
fflush(stdout);
if (rank == root) {
if (MPI_INFO_NULL == array_of_info[0]) {
non_mpi = false;
} else {
ompi_info_get_bool(array_of_info[0], "ompi_non_mpi", &non_mpi,
&flag);
ompi_info_get_bool(array_of_info[0], "ompi_non_mpi", &non_mpi, &flag);
if (!flag) {
non_mpi = false;
}
......@@ -152,7 +153,7 @@ int MPI_Comm_spawn_multiple(int count, char *array_of_commands[], char **array_o
}
#if OPAL_ENABLE_FT_MPI
if( OPAL_UNLIKELY(!ompi_comm_iface_coll_check(comm, &rc)) ) {
if (OPAL_UNLIKELY(!ompi_comm_iface_coll_check(comm, &rc))) {
return OMPI_ERRHANDLER_INVOKE(comm, rc, FUNC_NAME);
}
#endif
......@@ -160,12 +161,14 @@ int MPI_Comm_spawn_multiple(int count, char *array_of_commands[], char **array_o
/* initialize the port name to avoid problems */
memset(port_name, 0, MPI_MAX_PORT_NAME);
printf("MPI E\n");
fflush(stdout);
if ( rank == root ) {
if (rank == root) {
if (!non_mpi) {
/* Open a port. The port_name is passed as an environment
variable to the children. */
if (OMPI_SUCCESS != (rc = ompi_dpm_open_port (port_name))) {
if (OMPI_SUCCESS != (rc = ompi_dpm_open_port(port_name))) {
goto error;
}
} else if (1 < ompi_comm_size(comm)) {
......@@ -173,41 +176,46 @@ int MPI_Comm_spawn_multiple(int count, char *array_of_commands[], char **array_o
rc = OMPI_ERR_NOT_SUPPORTED;
goto error;
}
if (OMPI_SUCCESS != (rc = ompi_dpm_spawn(count, (const char **) array_of_commands,
array_of_argv, array_of_maxprocs,
array_of_info, port_name))) {
if (OMPI_SUCCESS
!= (rc = ompi_dpm_spawn(count, (const char **) array_of_commands, array_of_argv,
array_of_maxprocs, array_of_info, port_name))) {
goto error;
}
}
printf("MPI F\n");
fflush(stdout);
error:
printf("MPI G\n");
fflush(stdout);
if (OMPI_SUCCESS != rc) {
/* There was an error in one of the above stages,
* we still need to do the connect_accept stage so that
* non-root ranks do not deadlock.
* Add the error code to the port string for connect_accept
* to propagate the error code. */
(void)opal_asprintf(&port_string, "%s:error=%d", port_name, rc);
}
else {
(void) opal_asprintf(&port_string, "%s:error=%d", port_name, rc);
} else {
port_string = port_name;
}
if (non_mpi) {
newcomp = MPI_COMM_NULL;
} else {
rc = ompi_dpm_connect_accept (comm, root, port_string, send_first, &newcomp);
rc = ompi_dpm_connect_accept(comm, root, port_string, send_first, &newcomp);
}
printf("MPI H\n");
fflush(stdout);
if (OPAL_ERR_NOT_SUPPORTED == rc) {
opal_show_help("help-mpi-api.txt",
"MPI function not supported",
true,
FUNC_NAME,
opal_show_help("help-mpi-api.txt", "MPI function not supported", true, FUNC_NAME,
"Underlying runtime environment does not support spawn functionality");
}
if(port_string != port_name) {
if (port_string != port_name) {
free(port_string);
}
......@@ -216,21 +224,26 @@ error:
ompi_dpm_close_port(port_name);
}
printf("MPI I\n");
fflush(stdout);
/* set array of errorcodes */
if (MPI_ERRCODES_IGNORE != array_of_errcodes) {
if (MPI_COMM_NULL != newcomp) {
size = newcomp->c_remote_group->grp_proc_count;
} else {
for ( i=0; i < count; i++) {
for (i = 0; i < count; i++) {
size = size + array_of_maxprocs[i];
}
}
for ( i=0; i < size; i++ ) {
array_of_errcodes[i]=rc;
for (i = 0; i < size; i++) {
array_of_errcodes[i] = rc;
}
}
printf("MPI J\n");
fflush(stdout);
*intercomm = newcomp;
OMPI_ERRHANDLER_RETURN (rc, comm, rc, FUNC_NAME);
OMPI_ERRHANDLER_RETURN(rc, comm, rc, FUNC_NAME);
}
......@@ -20,33 +20,40 @@
#include "ompi_config.h"
#include "ompi/mpi/c/bindings.h"
#include "ompi/runtime/params.h"
#include "ompi/errhandler/errhandler.h"
#include "ompi/mpi/c/bindings.h"
#include "ompi/runtime/ompi_spc.h"
#include "ompi/runtime/params.h"
#if OMPI_BUILD_MPI_PROFILING
#if OPAL_HAVE_WEAK_SYMBOLS
#pragma weak MPI_Finalize = PMPI_Finalize
#endif
#define MPI_Finalize PMPI_Finalize
# if OPAL_HAVE_WEAK_SYMBOLS
# pragma weak MPI_Finalize = PMPI_Finalize
# endif
# define MPI_Finalize PMPI_Finalize
#endif
static const char FUNC_NAME[] = "MPI_Finalize";
int MPI_Finalize(void)
{
/* If --with-spc and ompi_mpi_spc_dump_enabled were specified, print
* all of the final SPC values aggregated across the whole MPI run.
* Also, free all SPC memory.
*/
printf("FIN 1\n");
fflush(stdout);
SPC_FINI();
printf("FIN 2\n");
fflush(stdout);
if (MPI_PARAM_CHECK) {
OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
}
printf("FIN 3\n");
fflush(stdout);
/* Pretty simple */
return ompi_mpi_finalize();
......
......@@ -35,67 +35,67 @@
#include "ompi_config.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
# include <sys/types.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
# include <unistd.h>
#endif
#ifdef HAVE_SYS_PARAM_H
#include <sys/param.h>
# include <sys/param.h>
#endif
#ifdef HAVE_NETDB_H
#include <netdb.h>
# include <netdb.h>
#endif
#include "opal/util/event.h"
#include "opal/util/output.h"
#include "opal/runtime/opal_progress.h"
#include "opal/mca/allocator/base/base.h"
#include "opal/mca/base/base.h"
#include "opal/sys/atomic.h"
#include "opal/runtime/opal.h"
#include "opal/util/show_help.h"
#include "opal/util/opal_environ.h"
#include "opal/mca/mpool/base/base.h"
#include "opal/mca/mpool/base/mpool_base_tree.h"
#include "opal/mca/rcache/base/base.h"
#include "opal/mca/allocator/base/base.h"
#include "opal/mca/pmix/pmix-internal.h"
#include "opal/mca/rcache/base/base.h"
#include "opal/runtime/opal.h"
#include "opal/runtime/opal_progress.h"
#include "opal/sys/atomic.h"
#include "opal/util/event.h"
#include "opal/util/opal_environ.h"
#include "opal/util/output.h"
#include "opal/util/show_help.h"
#include "opal/util/timings.h"
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/errhandler/errcode.h"
#include "ompi/attribute/attribute.h"
#include "ompi/communicator/communicator.h"
#include "ompi/constants.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/message/message.h"
#include "ompi/op/op.h"
#include "ompi/dpm/dpm.h"
#include "ompi/errhandler/errcode.h"
#include "ompi/file/file.h"
#include "ompi/info/info.h"
#include "ompi/runtime/mpiruntime.h"
#include "ompi/attribute/attribute.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/bml/bml.h"
#include "ompi/mca/pml/base/base.h"
#include "ompi/mca/bml/base/base.h"
#include "ompi/mca/osc/base/base.h"
#include "ompi/mca/part/base/base.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "ompi/mca/bml/bml.h"
#include "ompi/mca/coll/base/base.h"
#include "ompi/runtime/ompi_rte.h"
#include "ompi/mca/topo/base/base.h"
#include "ompi/mca/io/io.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "ompi/mca/hook/base/base.h"
#include "ompi/mca/io/base/base.h"
#include "ompi/mca/io/io.h"
#include "ompi/mca/osc/base/base.h"
#include "ompi/mca/part/base/base.h"
#include "ompi/mca/pml/base/base.h"
#include "ompi/mca/pml/base/pml_base_bsend.h"
#include "ompi/runtime/params.h"
#include "ompi/dpm/dpm.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/topo/base/base.h"
#include "ompi/message/message.h"
#include "ompi/mpiext/mpiext.h"
#include "ompi/mca/hook/base/base.h"
#include "ompi/op/op.h"
#include "ompi/runtime/mpiruntime.h"
#include "ompi/runtime/ompi_rte.h"
#include "ompi/runtime/params.h"
extern bool ompi_enable_timing;
static void fence_cbfunc(pmix_status_t status, void *cbdata)
{
volatile bool *active = (volatile bool*)cbdata;
volatile bool *active = (volatile bool *) cbdata;
OPAL_ACQUIRE_OBJECT(active);
*active = false;
OPAL_POST_OBJECT(active);
......@@ -105,18 +105,20 @@ int ompi_mpi_finalize(void)
{
int ret = MPI_SUCCESS;
opal_list_item_t *item;
ompi_proc_t** procs;
ompi_proc_t **procs;
size_t nprocs;
volatile bool active;
uint32_t key;
ompi_datatype_t * datatype;
ompi_datatype_t *datatype;
pmix_status_t rc;
printf("FIN 4\n");
fflush(stdout);
ompi_hook_base_mpi_finalize_top();
int32_t state = ompi_mpi_state;
if (state < OMPI_MPI_STATE_INIT_COMPLETED ||
state >= OMPI_MPI_STATE_FINALIZE_STARTED) {
if (state < OMPI_MPI_STATE_INIT_COMPLETED || state >= OMPI_MPI_STATE_FINALIZE_STARTED) {
/* Note that if we're not initialized or already finalized, we
cannot raise an MPI error. The best that we can do is
write something to stderr. */
......@@ -125,13 +127,11 @@ int ompi_mpi_finalize(void)
hostname = opal_gethostname();
if (state < OMPI_MPI_STATE_INIT_COMPLETED) {
opal_show_help("help-mpi-runtime.txt",
"mpi_finalize: not initialized",
true, hostname, pid);
opal_show_help("help-mpi-runtime.txt", "mpi_finalize: not initialized", true, hostname,
pid);
} else if (state >= OMPI_MPI_STATE_FINALIZE_STARTED) {
opal_show_help("help-mpi-runtime.txt",
"mpi_finalize:invoked_multiple_times",
true, hostname, pid);
opal_show_help("help-mpi-runtime.txt", "mpi_finalize:invoked_multiple_times", true,
hostname, pid);
}
return MPI_ERR_OTHER;
}
......@@ -140,36 +140,39 @@ int ompi_mpi_finalize(void)
ompi_mpiext_fini();
printf("FIN 5\n");
fflush(stdout);
/* Per MPI-2:4.8, we have to free MPI_COMM_SELF before doing
anything else in MPI_FINALIZE (to include setting up such that
MPI_FINALIZED will return true). */
if (NULL != ompi_mpi_comm_self.comm.c_keyhash) {
ompi_attr_delete_all(COMM_ATTR, &ompi_mpi_comm_self,
ompi_mpi_comm_self.comm.c_keyhash);
ompi_attr_delete_all(COMM_ATTR, &ompi_mpi_comm_self, ompi_mpi_comm_self.comm.c_keyhash);
OBJ_RELEASE(ompi_mpi_comm_self.comm.c_keyhash);
ompi_mpi_comm_self.comm.c_keyhash = NULL;
}
#if OPAL_ENABLE_FT_MPI
if( ompi_ftmpi_enabled ) {
ompi_communicator_t* comm = &ompi_mpi_comm_world.comm;
OPAL_OUTPUT_VERBOSE((50, ompi_ftmpi_output_handle, "FT: Rank %d entering finalize", ompi_comm_rank(comm)));
if (ompi_ftmpi_enabled) {
ompi_communicator_t *comm = &ompi_mpi_comm_world.comm;
OPAL_OUTPUT_VERBOSE(
(50, ompi_ftmpi_output_handle, "FT: Rank %d entering finalize", ompi_comm_rank(comm)));
/* grpcomm barrier does not tolerate /new/ failures. Let's make sure
* we drain all preexisting failures before we proceed;
* TODO: when we have better failure support in the runtime, we can
* remove that agreement */
ompi_communicator_t* ncomm;
ompi_communicator_t *ncomm;
ret = ompi_comm_shrink_internal(comm, &ncomm);
if( MPI_SUCCESS != ret ) {
if (MPI_SUCCESS != ret) {
OMPI_ERROR_LOG(ret);
goto done;
}
/* do a barrier with closest neighbors in the ring, using doublering as
* it is synchronous and will help flush all past communications */
ret = ompi_coll_base_barrier_intra_doublering(ncomm, ncomm->c_coll->coll_barrier_module);
if( MPI_SUCCESS != ret ) {
if (MPI_SUCCESS != ret) {
OMPI_ERROR_LOG(ret);
goto done;
}
......@@ -178,12 +181,14 @@ int ompi_mpi_finalize(void)
/* finalize the fault tolerant infrastructure (revoke,
* failure propagator, etc). From now-on we do not tolerate new failures. */
OPAL_OUTPUT_VERBOSE((50, ompi_ftmpi_output_handle, "FT: Rank %05d turning off FT", ompi_comm_rank(comm)));
OPAL_OUTPUT_VERBOSE(
(50, ompi_ftmpi_output_handle, "FT: Rank %05d turning off FT", ompi_comm_rank(comm)));
ompi_comm_failure_detector_finalize();
ompi_comm_failure_propagator_finalize();
ompi_comm_revoke_finalize();
ompi_comm_rbcast_finalize();
opal_output_verbose(40, ompi_ftmpi_output_handle, "Rank %05d: DONE WITH FINALIZE", ompi_comm_rank(comm));
opal_output_verbose(40, ompi_ftmpi_output_handle, "Rank %05d: DONE WITH FINALIZE",
ompi_comm_rank(comm));
}
#endif /* OPAL_ENABLE_FT_MPI */
......@@ -193,18 +198,20 @@ int ompi_mpi_finalize(void)
COMM_SELF is destroyed / all the attribute callbacks have been
invoked) */
opal_atomic_wmb();
opal_atomic_swap_32(&ompi_mpi_state,
OMPI_MPI_STATE_FINALIZE_PAST_COMM_SELF_DESTRUCT);
opal_atomic_swap_32(&ompi_mpi_state, OMPI_MPI_STATE_FINALIZE_PAST_COMM_SELF_DESTRUCT);
/* As finalize is the last legal MPI call, we are allowed to force the release
* of the user buffer used for bsend, before going anywhere further.
*/
(void)mca_pml_base_bsend_detach(NULL, NULL);
(void) mca_pml_base_bsend_detach(NULL, NULL);
#if OPAL_ENABLE_PROGRESS_THREADS == 0
opal_progress_set_event_flag(OPAL_EVLOOP_ONCE | OPAL_EVLOOP_NONBLOCK);
#endif
printf("FIN 6\n");
fflush(stdout);
/* Redo ORTE calling opal_progress_event_users_increment() during
MPI lifetime, to get better latency when not using TCP */
opal_progress_event_users_increment();
......@@ -293,7 +300,8 @@ int ompi_mpi_finalize(void)
* communications/actions to complete. See
* https://github.com/open-mpi/ompi/issues/1576 for the
* original bug report. */
if (PMIX_SUCCESS != (rc = PMIx_Fence_nb(NULL, 0, NULL, 0, fence_cbfunc, (void*)&active))) {
if (PMIX_SUCCESS
!= (rc = PMIx_Fence_nb(NULL, 0, NULL, 0, fence_cbfunc, (void *) &active))) {
ret = opal_pmix_convert_status(rc);
OMPI_ERROR_LOG(ret);
/* Reset the active flag to false, to avoid waiting for
......@@ -303,6 +311,9 @@ int ompi_mpi_finalize(void)
OMPI_LAZY_WAIT_FOR_COMPLETION(active);
}
printf("FIN 7\n");
fflush(stdout);
/* Shut down any bindings-specific issues: C++, F77, F90 */
/* Remove all memory associated by MPI_REGISTER_DATAREP (per
......@@ -315,16 +326,19 @@ int ompi_mpi_finalize(void)
OBJ_DESTRUCT(&ompi_registered_datareps);
/* Remove all F90 types from the hash tables */
OPAL_HASH_TABLE_FOREACH(key, uint32, datatype, &ompi_mpi_f90_integer_hashtable)
OPAL_HASH_TABLE_FOREACH (key, uint32, datatype, &ompi_mpi_f90_integer_hashtable)
OBJ_RELEASE(datatype);
OBJ_DESTRUCT(&ompi_mpi_f90_integer_hashtable);
OPAL_HASH_TABLE_FOREACH(key, uint32, datatype, &ompi_mpi_f90_real_hashtable)
OPAL_HASH_TABLE_FOREACH (key, uint32, datatype, &ompi_mpi_f90_real_hashtable)
OBJ_RELEASE(datatype);
OBJ_DESTRUCT(&ompi_mpi_f90_real_hashtable);
OPAL_HASH_TABLE_FOREACH(key, uint32, datatype, &ompi_mpi_f90_complex_hashtable)
OPAL_HASH_TABLE_FOREACH (key, uint32, datatype, &ompi_mpi_f90_complex_hashtable)
OBJ_RELEASE(datatype);
OBJ_DESTRUCT(&ompi_mpi_f90_complex_hashtable);
printf("FIN 7a\n");
fflush(stdout);
/* Free communication objects */
/* free file resources */
......@@ -332,6 +346,9 @@ int ompi_mpi_finalize(void)
goto done;
}
printf("FIN 7b\n");
fflush(stdout);
/* free window resources */
if (OMPI_SUCCESS != (ret = ompi_win_finalize())) {
goto done;
......@@ -343,6 +360,8 @@ int ompi_mpi_finalize(void)
goto done;
}
printf("FIN 7c\n");
fflush(stdout);
/* free communicator resources. this MUST come before finalizing the PML
* as this will call into the pml */
......@@ -350,16 +369,19 @@ int ompi_mpi_finalize(void)
goto done;
}
printf("FIN 8\n");
fflush(stdout);
/* call del_procs on all allocated procs even though some may not be known
* to the pml layer. the pml layer is expected to be resilient and ignore
* any unknown procs. */
nprocs = 0;
procs = ompi_proc_get_allocated (&nprocs);
procs = ompi_proc_get_allocated(&nprocs);
MCA_PML_CALL(del_procs(procs, nprocs));
free(procs);
/* free pml resource */
if(OMPI_SUCCESS != (ret = mca_pml_base_finalize())) {
if (OMPI_SUCCESS != (ret = mca_pml_base_finalize())) {
goto done;
}
......@@ -380,7 +402,7 @@ int ompi_mpi_finalize(void)
/* Now that all MPI objects dealing with communications are gone,
shut down MCA types having to do with communications */
if (OMPI_SUCCESS != (ret = mca_base_framework_close(&ompi_pml_base_framework) ) ) {
if (OMPI_SUCCESS != (ret = mca_base_framework_close(&ompi_pml_base_framework))) {
OMPI_ERROR_LOG(ret);
goto done;
}
......@@ -401,7 +423,7 @@ int ompi_mpi_finalize(void)
}
/* finalize the DPM subsystem */
if ( OMPI_SUCCESS != (ret = ompi_dpm_finalize())) {
if (OMPI_SUCCESS != (ret = ompi_dpm_finalize())) {
goto done;
}
......@@ -420,6 +442,9 @@ int ompi_mpi_finalize(void)
goto done;
}
printf("FIN 9\n");
fflush(stdout);
/* Free all other resources */
/* free op resources */
......@@ -472,8 +497,11 @@ int ompi_mpi_finalize(void)
goto done;
}
printf("FIN 10\n");
fflush(stdout);
/* free proc resources */
if ( OMPI_SUCCESS != (ret = ompi_proc_finalize())) {
if (OMPI_SUCCESS != (ret = ompi_proc_finalize())) {
goto done;
}
......@@ -494,7 +522,7 @@ int ompi_mpi_finalize(void)
ompi_rte_initialized = false;
/* Now close the hook framework */
if (OMPI_SUCCESS != (ret = mca_base_framework_close(&ompi_hook_base_framework) ) ) {
if (OMPI_SUCCESS != (ret = mca_base_framework_close(&ompi_hook_base_framework))) {
OMPI_ERROR_LOG(ret);
goto done;
}
......@@ -516,11 +544,19 @@ int ompi_mpi_finalize(void)
/* All done */
done:
printf("FIN 11\n");
fflush(stdout);
done:
printf("FIN 12\n");
fflush(stdout);
opal_atomic_wmb();
opal_atomic_swap_32(&ompi_mpi_state, OMPI_MPI_STATE_FINALIZE_COMPLETED);
ompi_hook_base_mpi_finalize_bottom();
printf("FIN 13\n");
fflush(stdout);
return ret;
}
This diff is collapsed.
......@@ -7,58 +7,42 @@
* Sample MPI "hello world" application in C
*/
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include "mpi.h"
#define BUFFLEN 64
extern char **environ;
/* Spawn Modes (dynamic job spawning):
* 1: Spawn just one process (in one job)
* 2: Spawn 2 processes in 2 different jobs
* 3: Spawn 2 prcoesses in one (shared) job
* 3: Spawn 3 processes in one (shared) job
* 0 or other: Do not spawn a dynamic process/job
*/
#define SPAWN_MODE 0
#define SPAWN_MODE 1
void errorExit(char* msg);
int main(int argc, char *argv[])
{
FILE *ptr = fopen("/home/test_out", "a");
fprintf(ptr, "TEST APP STARTED\n");
char **s = environ;
/*void notifyProcessAgent(pid_t pid, int rank, const char *eventInfo) {
struct sockaddr_un strAddr;
socklen_t lenAddr;
int fdSock;
if ((fdSock=socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
errorExit("socket");
}
strAddr.sun_family=AF_LOCAL; // Unix domain
strcpy(strAddr.sun_path, SOCKET_PATH);
lenAddr=sizeof(strAddr.sun_family)+strlen(strAddr.sun_path);
if (connect(fdSock, (struct sockaddr*)&strAddr, lenAddr) !=0 ) {
errorExit("connect");
// PRINT ENV
for (; *s; s++) {
fprintf(ptr, "%s\n", *s);
}
char info2Send[BUFFLEN];
snprintf(info2Send, BUFFLEN+1, "%s: %d, %d", eventInfo, pid, rank);
if (send(fdSock, info2Send, BUFFLEN+1, 0) < 0) {
errorExit("send");
char *parentPort = getenv("OMPI_PARENT_PORT");
if (parentPort) {
fprintf(ptr, "Hey we have a parent port! %s", parentPort);
}
printf("\nData send!\n");
char rank2Recv[BUFFLEN];
recv(fdSock, rank2Recv, BUFFLEN+1, 0);
int receivedNumber = (int)strtol(rank2Recv, NULL, 0);
printf("Received from server: %d\n", receivedNumber);
close(fdSock);
}*/
int main(int argc, char* argv[]) {
fclose(ptr);
int rank, size, len;
pid_t pid;
......@@ -67,80 +51,97 @@ int main(int argc, char* argv[]) {
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
pid = getpid();
//notifyProcessAgent(pid, rank, "Spawned");
// notifyProcessAgent(pid, rank, "Spawned");
printf("Hello, world, I am %d of %d, PID: %d\n", rank, size, pid);
fflush(stdout);
// dynamically spawn child process
// https://mpi.deino.net/mpi_functions/MPI_Comm_spawn.html
if (1 == SPAWN_MODE) {
int np = 1;
int errcodes[1];
int np = 2;
int errcodes[2];
MPI_Comm parentcomm, intercomm;
MPI_Comm_get_parent( &parentcomm );
MPI_Comm_get_parent(&parentcomm);
if (parentcomm == MPI_COMM_NULL) {
MPI_Comm_spawn( "hello", MPI_ARGV_NULL, np, MPI_INFO_NULL, 0,
MPI_COMM_WORLD, &intercomm, errcodes );
MPI_Comm_spawn("/home/ompi/rank-swapper-agent/hello", MPI_ARGV_NULL, np, MPI_INFO_NULL,
0, MPI_COMM_WORLD, &intercomm, errcodes);
printf("I'm the parent.\n");
fflush(stdout);
} else {
printf("I'm the spawned.\n");
fflush(stdout);
}
if (0 != errcodes[0]) {
printf("ERROR_SPAWN: code: %d\n", errcodes[0]);
fflush(stdout);
}
// (instead of Comm_multiple) spawns a second, different intercommunicator
// (instead of Comm_multiple) spawns a second, different intercommunicator
} else if (2 == SPAWN_MODE) {
int np = 1;
int errcodes[1];
MPI_Comm parentcomm, intercomm;
MPI_Comm_get_parent( &parentcomm );
int np = 2;
int errcodes[2];
MPI_Comm parentcomm;
MPI_Comm intercomm[2];
MPI_Comm_get_parent(&parentcomm);
if (parentcomm == MPI_COMM_NULL) {
for (int i = 0; i < 2; i++) {
MPI_Comm_spawn( "hello", MPI_ARGV_NULL, np, MPI_INFO_NULL, 0,
MPI_COMM_WORLD, &intercomm, errcodes );
MPI_Comm_spawn("/home/ompi/rank-swapper-agent/hello", MPI_ARGV_NULL, np,
MPI_INFO_NULL, 0, MPI_COMM_WORLD, &intercomm[i], errcodes);
if (0 != errcodes[0]) {
printf("ERROR_SPAWN: code: %d\n", errcodes[0]);
fflush(stdout);
}
}
printf("I'm the parent.\n");
fflush(stdout);
} else {
printf("I'm the spawned.\n");
fflush(stdout);
}
} else if (3 == SPAWN_MODE) {
int np[2] = { 1, 1 };
int errcodes[2];
int np[2] = {2, 1};
int errcodes[3];
MPI_Comm parentcomm, intercomm;
char *cmds[2] = { "hello", "hello" };
MPI_Info infos[2] = { MPI_INFO_NULL, MPI_INFO_NULL };
MPI_Comm_get_parent( &parentcomm );
char *cmds[3] = {"/home/ompi/rank-swapper-agent/hello",
"/home/ompi/rank-swapper-agent/hello",
"/home/ompi/rank-swapper-agent/hello"};
MPI_Info infos[3] = {MPI_INFO_NULL, MPI_INFO_NULL, MPI_INFO_NULL};
MPI_Comm_get_parent(&parentcomm);
if (parentcomm == MPI_COMM_NULL) {
// Create n more processes using the "hello" executable
MPI_Comm_spawn_multiple(2, cmds, MPI_ARGVS_NULL, np, infos, 0, MPI_COMM_WORLD, &intercomm, errcodes );
MPI_Comm_spawn_multiple(2, cmds, MPI_ARGVS_NULL, np, infos, 0, MPI_COMM_WORLD,
&intercomm, errcodes);
printf("I'm the parent.\n");
fflush(stdout);
} else {
printf("I'm the spawned.\n");
fflush(stdout);
}
for (int i = 0; i < 2; i++) {
for (int i = 0; i < 3; i++) {
if (0 != errcodes[i]) {
printf("ERROR_SPAWN: code: %d\n", errcodes[i]);
fflush(stdout);
}
}
}
sleep(5);
// printf("Sleeping.\n");
// fflush(stdout);
// sleep(5);
// printf("Done sleeping.\n");
// fflush(stdout);
MPI_Finalize();
//notifyProcessAgent(pid, rank, "Ended");
// notifyProcessAgent(pid, rank, "Ended");
return 0;
}