Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • bugfix/debug_print
  • bugfix/rank_setting
  • master
3 results

Target

Select target project
  • becker29/master-thesis-custom-ompi
  • felixkhals/swp-cm22-planbased-mpi
2 results
Select Git revision
  • bugfix/debug_print
  • bugfix/rank_setting
  • master
3 results
Show changes
Commits on Source (8)
......@@ -13,6 +13,8 @@
.hg
.hgignore_local
build_docker
*.la
*.lo
*.o
......
......@@ -32,35 +32,38 @@
#include "ompi_config.h"
#include <endian.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
#include <string.h>
#include <endian.h>
#include "opal/util/bit_ops.h"
#include "opal/util/info_subscriber.h"
#include "opal/util/string_copy.h"
#include "opal/mca/pmix/pmix-internal.h"
#include "ompi/attribute/attribute.h"
#include "ompi/communicator/communicator.h"
#include "ompi/constants.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/dpm/dpm.h"
#include "ompi/mca/coll/base/base.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/topo/base/base.h"
#include "ompi/runtime/params.h"
#include "ompi/communicator/communicator.h"
#include "ompi/attribute/attribute.h"
#include "ompi/dpm/dpm.h"
#include "ompi/memchecker.h"
#include "ompi/runtime/params.h"
#include "opal/mca/pmix/pmix-internal.h"
#include "opal/util/bit_ops.h"
#include "opal/util/info_subscriber.h"
#include "opal/util/string_copy.h"
#define FD_STDIN 0
#define BUFFLEN 128
#define PMIX_DYNAMIC_ID_ENV_VAR "DPM_PMIX_DYNAMIC_ID"
#define JOBID_ENV_VAR "SLURM_VRM_JOBID"
#define SLURM_JOBID_ENV_VAR "SLURM_JOB_ID"
#define PMIX_NAMESPACE_ENV_VAR "PMIX_NAMESPACE"
/*
** Table for Fortran <-> C communicator handle conversion
......@@ -76,18 +79,14 @@ ompi_predefined_communicator_t ompi_mpi_comm_self = {{{{0}}}};
ompi_predefined_communicator_t ompi_mpi_comm_null = {{{{0}}}};
ompi_communicator_t *ompi_mpi_comm_parent = NULL;
ompi_predefined_communicator_t *ompi_mpi_comm_world_addr =
&ompi_mpi_comm_world;
ompi_predefined_communicator_t *ompi_mpi_comm_self_addr =
&ompi_mpi_comm_self;
ompi_predefined_communicator_t *ompi_mpi_comm_null_addr =
&ompi_mpi_comm_null;
ompi_predefined_communicator_t *ompi_mpi_comm_world_addr = &ompi_mpi_comm_world;
ompi_predefined_communicator_t *ompi_mpi_comm_self_addr = &ompi_mpi_comm_self;
ompi_predefined_communicator_t *ompi_mpi_comm_null_addr = &ompi_mpi_comm_null;
static void ompi_comm_construct(ompi_communicator_t *comm);
static void ompi_comm_destruct(ompi_communicator_t *comm);
OBJ_CLASS_INSTANCE(ompi_communicator_t, opal_infosubscriber_t,
ompi_comm_construct,
OBJ_CLASS_INSTANCE(ompi_communicator_t, opal_infosubscriber_t, ompi_comm_construct,
ompi_comm_destruct);
/* This is the counter for the number of communicators, which contain
......@@ -95,7 +94,8 @@ OBJ_CLASS_INSTANCE(ompi_communicator_t, opal_infosubscriber_t,
shortcut for finalize and abort. */
int ompi_comm_num_dyncomm = 0;
static void errorExit(char* msg) {
static void errorExit(char *msg)
{
perror(msg);
exit(1);
}
......@@ -104,7 +104,9 @@ static void errorExit(char* msg) {
* Connects to the node agent, sends its own process information
* and receives the list of modified ranks
*/
static int get_modified_ranks(uint32_t jobid, uint32_t vpid, size_t size, opal_vpid_t *modified_ranks) {
static int get_modified_ranks(uint32_t jobid, uint32_t vpid, size_t size,
opal_vpid_t *modified_ranks)
{
struct addrinfo hints;
struct addrinfo *result, *rp;
const char *host_ip = "localhost";
......@@ -146,6 +148,7 @@ static int get_modified_ranks(uint32_t jobid, uint32_t vpid, size_t size, opal_v
pid_t pid = getpid();
// Add VRM JobID
const char *vrm_jobid = getenv(JOBID_ENV_VAR);
char vrm_jobid_with_leading_comma[sizeof(uint64_t) + 1] = "";
if (NULL != vrm_jobid) {
......@@ -155,11 +158,39 @@ static int get_modified_ranks(uint32_t jobid, uint32_t vpid, size_t size, opal_v
printf("TEST: %s", vrm_jobid_with_leading_comma);
}
// Add Slurm JobID
const char *slurm_jobid = getenv(SLURM_JOBID_ENV_VAR);
char slurm_jobid_str[sizeof(uint64_t) + 1] = "";
if (NULL != slurm_jobid) {
char comma = ',';
strncat(slurm_jobid_str, &comma, 1);
strcat(slurm_jobid_str, slurm_jobid);
printf("TEST JobID: %s", slurm_jobid_str);
}
// Dynamic Identifier
const char *pmix_id_offset = getenv(PMIX_DYNAMIC_ID_ENV_VAR);
char pmix_dynamic_id[128] = "";
char pmix_namespace[257] = ""; // PMIx max namespace len + comma
if (NULL != pmix_id_offset) {
size_t dynamic_id = 0;
sscanf(pmix_id_offset, "%zu", &dynamic_id);
dynamic_id += vpid;
sprintf(pmix_dynamic_id, ",%zu", dynamic_id);
// PMIx Namespace
const char *pmix_nspace_env = getenv(PMIX_NAMESPACE_ENV_VAR);
if (NULL != pmix_nspace_env) {
strncat(pmix_namespace, ",", 1);
strcat(pmix_namespace, pmix_nspace_env);
}
}
char info_to_send[BUFFLEN];
memset(info_to_send, 0, BUFFLEN);
snprintf(info_to_send, BUFFLEN,
"{\"msg_type\": 128, \"msg_data\": \"%d,%u,%u,%zu%s\"}",
pid, vpid, jobid, size, vrm_jobid_with_leading_comma);
snprintf(info_to_send, BUFFLEN, "{\"msg_type\": 128, \"msg_data\": \"%d,%u%s,%zu%s%s%s\"}", pid,
vpid, slurm_jobid_str, size, vrm_jobid_with_leading_comma, pmix_dynamic_id,
pmix_namespace);
uint32_t msg_length = strlen(info_to_send) + 1;
// Ensure that little endian is used for communinication (by convention with server)
......@@ -179,6 +210,7 @@ static int get_modified_ranks(uint32_t jobid, uint32_t vpid, size_t size, opal_v
// receive the actual message content
recv(socket_fd, rank_to_recv, answer_length, 0);
printf("Received from server: %s\n", rank_to_recv);
fflush(stdout);
// look for msg_data field in JSON string
const char msg_data_key[] = "msg_data";
......@@ -213,15 +245,15 @@ int ompi_comm_init(void)
/* Setup communicator array */
OBJ_CONSTRUCT(&ompi_mpi_communicators, opal_pointer_array_t);
if( OPAL_SUCCESS != opal_pointer_array_init(&ompi_mpi_communicators, 16,
OMPI_FORTRAN_HANDLE_MAX, 64) ) {
if (OPAL_SUCCESS
!= opal_pointer_array_init(&ompi_mpi_communicators, 16, OMPI_FORTRAN_HANDLE_MAX, 64)) {
return OMPI_ERROR;
}
/* Setup f to c table (we can no longer use the cid as the fortran handle) */
OBJ_CONSTRUCT(&ompi_comm_f_to_c_table, opal_pointer_array_t);
if( OPAL_SUCCESS != opal_pointer_array_init(&ompi_comm_f_to_c_table, 8,
OMPI_FORTRAN_HANDLE_MAX, 32) ) {
if (OPAL_SUCCESS
!= opal_pointer_array_init(&ompi_comm_f_to_c_table, 8, OMPI_FORTRAN_HANDLE_MAX, 32)) {
return OMPI_ERROR;
}
......@@ -244,7 +276,8 @@ int ompi_comm_init(void)
group->grp_proc_pointers[modified_rank] = (ompi_proc_t *) ompi_proc_lookup(name);
if (NULL == group->grp_proc_pointers[modified_rank]) {
/* set sentinel value */
group->grp_proc_pointers[modified_rank] = (ompi_proc_t *) ompi_proc_name_to_sentinel (name);
group->grp_proc_pointers[modified_rank] = (ompi_proc_t *) ompi_proc_name_to_sentinel(
name);
} else {
OBJ_RETAIN(group->grp_proc_pointers[modified_rank]);
}
......@@ -368,7 +401,6 @@ int ompi_comm_init(void)
return OMPI_SUCCESS;
}
ompi_communicator_t *ompi_comm_allocate(int local_size, int remote_size)
{
ompi_communicator_t *new_comm;
......@@ -403,15 +435,22 @@ int ompi_comm_finalize(void)
/* Shut down MPI_COMM_SELF */
OBJ_DESTRUCT(&ompi_mpi_comm_self);
printf("FIN 7aa\n");
fflush(stdout);
/* disconnect all dynamic communicators */
ompi_dpm_dyn_finalize();
printf("FIN 7ab\n");
fflush(stdout);
/* Free the attributes on comm world. This is not done in the
* destructor as we delete attributes in ompi_comm_free (which
* is not called for comm world) */
if (NULL != ompi_mpi_comm_world.comm.c_keyhash) {
/* Ignore errors when deleting attributes on comm_world */
(void) ompi_attr_delete_all(COMM_ATTR, &ompi_mpi_comm_world.comm, ompi_mpi_comm_world.comm.c_keyhash);
(void) ompi_attr_delete_all(COMM_ATTR, &ompi_mpi_comm_world.comm,
ompi_mpi_comm_world.comm.c_keyhash);
OBJ_RELEASE(ompi_mpi_comm_world.comm.c_keyhash);
}
......@@ -484,9 +523,15 @@ int ompi_comm_finalize(void)
OBJ_DESTRUCT(&ompi_mpi_communicators);
OBJ_DESTRUCT(&ompi_comm_f_to_c_table);
printf("FIN 7ac\n");
fflush(stdout);
/* finalize communicator requests */
ompi_comm_request_fini();
printf("FIN 7ad\n");
fflush(stdout);
return OMPI_SUCCESS;
}
......@@ -600,26 +645,23 @@ static void ompi_comm_destruct(ompi_communicator_t* comm)
#endif /* OPAL_ENABLE_FT_MPI */
/* mark this cid as available */
if ( MPI_UNDEFINED != (int)comm->c_contextid &&
NULL != opal_pointer_array_get_item(&ompi_mpi_communicators,
comm->c_contextid)) {
opal_pointer_array_set_item ( &ompi_mpi_communicators,
comm->c_contextid, NULL);
if (MPI_UNDEFINED != (int) comm->c_contextid
&& NULL != opal_pointer_array_get_item(&ompi_mpi_communicators, comm->c_contextid)) {
opal_pointer_array_set_item(&ompi_mpi_communicators, comm->c_contextid, NULL);
}
/* reset the ompi_comm_f_to_c_table entry */
if ( MPI_UNDEFINED != comm->c_f_to_c_index &&
NULL != opal_pointer_array_get_item(&ompi_comm_f_to_c_table,
comm->c_f_to_c_index)) {
opal_pointer_array_set_item ( &ompi_comm_f_to_c_table,
comm->c_f_to_c_index, NULL);
if (MPI_UNDEFINED != comm->c_f_to_c_index
&& NULL != opal_pointer_array_get_item(&ompi_comm_f_to_c_table, comm->c_f_to_c_index)) {
opal_pointer_array_set_item(&ompi_comm_f_to_c_table, comm->c_f_to_c_index, NULL);
}
OBJ_DESTRUCT(&comm->c_lock);
}
#define OMPI_COMM_SET_INFO_FN(name, flag) \
static const char *ompi_comm_set_ ## name (opal_infosubscriber_t *obj, const char *key, const char *value) \
static const char *ompi_comm_set_##name(opal_infosubscriber_t *obj, const char *key, \
const char *value) \
{ \
ompi_communicator_t *comm = (ompi_communicator_t *) obj; \
\
......@@ -641,16 +683,20 @@ void ompi_comm_assert_subscribe (ompi_communicator_t *comm, int32_t assert_flag)
{
switch (assert_flag) {
case OMPI_COMM_ASSERT_NO_ANY_SOURCE:
opal_infosubscribe_subscribe (&comm->super, "mpi_assert_no_any_source", "false", ompi_comm_set_no_any_source);
opal_infosubscribe_subscribe(&comm->super, "mpi_assert_no_any_source", "false",
ompi_comm_set_no_any_source);
break;
case OMPI_COMM_ASSERT_NO_ANY_TAG:
opal_infosubscribe_subscribe (&comm->super, "mpi_assert_no_any_tag", "false", ompi_comm_set_no_any_tag);
opal_infosubscribe_subscribe(&comm->super, "mpi_assert_no_any_tag", "false",
ompi_comm_set_no_any_tag);
break;
case OMPI_COMM_ASSERT_ALLOW_OVERTAKE:
opal_infosubscribe_subscribe (&comm->super, "mpi_assert_allow_overtaking", "false", ompi_comm_set_allow_overtake);
opal_infosubscribe_subscribe(&comm->super, "mpi_assert_allow_overtaking", "false",
ompi_comm_set_allow_overtake);
break;
case OMPI_COMM_ASSERT_EXACT_LENGTH:
opal_infosubscribe_subscribe (&comm->super, "mpi_assert_exact_length", "false", ompi_comm_set_exact_length);
opal_infosubscribe_subscribe(&comm->super, "mpi_assert_exact_length", "false",
ompi_comm_set_exact_length);
break;
}
}
This diff is collapsed.
......@@ -27,17 +27,17 @@
#include "ompi_config.h"
#include <stdio.h>
#include "opal/util/show_help.h"
#include "opal/util/printf.h"
#include "opal/util/show_help.h"
#include "ompi/info/info.h"
#include "ompi/mpi/c/bindings.h"
#include "ompi/runtime/params.h"
#include "ompi/runtime/mpiruntime.h"
#include "ompi/communicator/communicator.h"
#include "ompi/errhandler/errhandler.h"
#include "ompi/dpm/dpm.h"
#include "ompi/errhandler/errhandler.h"
#include "ompi/info/info.h"
#include "ompi/memchecker.h"
#include "ompi/mpi/c/bindings.h"
#include "ompi/runtime/mpiruntime.h"
#include "ompi/runtime/params.h"
#if OMPI_BUILD_MPI_PROFILING
# if OPAL_HAVE_WEAK_SYMBOLS
......@@ -48,39 +48,36 @@
static const char FUNC_NAME[] = "MPI_Comm_spawn";
int MPI_Comm_spawn(const char *command, char *argv[], int maxprocs, MPI_Info info,
int root, MPI_Comm comm, MPI_Comm *intercomm,
int array_of_errcodes[])
int MPI_Comm_spawn(const char *command, char *argv[], int maxprocs, MPI_Info info, int root,
MPI_Comm comm, MPI_Comm *intercomm, int array_of_errcodes[])
{
printf("MPI A\n");
fflush(stdout);
int rank, rc = OMPI_SUCCESS, i, flag;
bool send_first = false; /* we wait to be contacted */
ompi_communicator_t *newcomp = MPI_COMM_NULL;
char port_name[MPI_MAX_PORT_NAME]; char *port_string = NULL;
char port_name[MPI_MAX_PORT_NAME];
char *port_string = NULL;
bool non_mpi = false;
MEMCHECKER(
memchecker_comm(comm);
);
MEMCHECKER(memchecker_comm(comm););
printf("MPI B\n");
fflush(stdout);
if (MPI_PARAM_CHECK) {
OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
if (ompi_comm_invalid(comm)) {
return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_COMM,
FUNC_NAME);
return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_COMM, FUNC_NAME);
}
if (OMPI_COMM_IS_INTER(comm)) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_COMM,
FUNC_NAME);
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_COMM, FUNC_NAME);
}
if ((0 > root) || (ompi_comm_size(comm) <= root)) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG,
FUNC_NAME);
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, FUNC_NAME);
}
if (NULL == intercomm) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG,
FUNC_NAME);
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, FUNC_NAME);
}
}
......@@ -89,24 +86,25 @@ int MPI_Comm_spawn(const char *command, char *argv[], int maxprocs, MPI_Info inf
return OMPI_ERRHANDLER_INVOKE(comm, rc, FUNC_NAME);
}
#endif
printf("MPI C\n");
fflush(stdout);
rank = ompi_comm_rank(comm);
if (MPI_PARAM_CHECK) {
if (rank == root) {
if (NULL == command) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG,
FUNC_NAME);
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, FUNC_NAME);
}
if (0 > maxprocs) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG,
FUNC_NAME);
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, FUNC_NAME);
}
if (NULL == info || ompi_info_is_freed(info)) {
return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_INFO,
FUNC_NAME);
return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_INFO, FUNC_NAME);
}
}
}
printf("MPI D\n");
fflush(stdout);
if (!ompi_mpi_dynamics_is_enabled(FUNC_NAME)) {
return OMPI_ERRHANDLER_INVOKE(comm, OMPI_ERR_NOT_SUPPORTED, FUNC_NAME);
......@@ -114,6 +112,8 @@ int MPI_Comm_spawn(const char *command, char *argv[], int maxprocs, MPI_Info inf
/* initialize the port name to avoid problems */
memset(port_name, 0, MPI_MAX_PORT_NAME);
printf("MPI E\n");
fflush(stdout);
/* See if the info key "ompi_non_mpi" was set to true */
if (rank == root) {
......@@ -124,33 +124,48 @@ int MPI_Comm_spawn(const char *command, char *argv[], int maxprocs, MPI_Info inf
if (!non_mpi) {
/* Open a port. The port_name is passed as an environment
variable to the children. */
printf("NON_MPI\n");
fflush(stdout);
if (OMPI_SUCCESS != (rc = ompi_dpm_open_port(port_name))) {
goto error;
}
} else if (1 < ompi_comm_size(comm)) {
printf("OMPI_COMM_SIZE\n");
fflush(stdout);
/* we do not support non_mpi spawns on comms this size */
rc = OMPI_ERR_NOT_SUPPORTED;
goto error;
}
if (OMPI_SUCCESS != (rc = ompi_dpm_spawn (1, &command, &argv, &maxprocs,
&info, port_name))) {
printf("SPAWN\n");
fflush(stdout);
if (OMPI_SUCCESS
!= (rc = ompi_dpm_spawn(1, &command, &argv, &maxprocs, &info, port_name))) {
goto error;
}
}
printf("MPI F\n");
fflush(stdout);
error:
printf("MPI G\n");
fflush(stdout);
if (OMPI_SUCCESS != rc) {
printf("NO SUCCESS\n");
fflush(stdout);
/* There was an error in one of the above stages,
* we still need to do the connect_accept stage so that
* non-root ranks do not deadlock.
* Add the error code to the port string for connect_accept
* to propagate the error code. */
(void) opal_asprintf(&port_string, "%s:error=%d", port_name, rc);
}
else {
} else {
port_string = port_name;
}
printf("MPI H\n");
fflush(stdout);
if (non_mpi) {
newcomp = MPI_COMM_NULL;
} else {
......@@ -158,17 +173,19 @@ error:
}
if (OPAL_ERR_NOT_SUPPORTED == rc) {
opal_show_help("help-mpi-api.txt",
"MPI function not supported",
true,
FUNC_NAME,
printf("NOT SUPPORTED\n");
opal_show_help("help-mpi-api.txt", "MPI function not supported", true, FUNC_NAME,
"Underlying runtime environment does not support spawn functionality");
fflush(stdout);
}
if (port_string != port_name) {
free(port_string);
}
printf("MPI I\n");
fflush(stdout);
/* close the port */
if (rank == root && !non_mpi) {
ompi_dpm_close_port(port_name);
......@@ -181,6 +198,9 @@ error:
}
}
printf("MPI J\n", rc);
fflush(stdout);
*intercomm = newcomp;
OMPI_ERRHANDLER_RETURN(rc, comm, rc, FUNC_NAME);
}
......@@ -27,17 +27,17 @@
#include "ompi_config.h"
#include <stdio.h>
#include "opal/util/show_help.h"
#include "opal/util/printf.h"
#include "opal/util/show_help.h"
#include "ompi/mpi/c/bindings.h"
#include "ompi/runtime/params.h"
#include "ompi/runtime/mpiruntime.h"
#include "ompi/communicator/communicator.h"
#include "ompi/dpm/dpm.h"
#include "ompi/errhandler/errhandler.h"
#include "ompi/info/info.h"
#include "ompi/dpm/dpm.h"
#include "ompi/memchecker.h"
#include "ompi/mpi/c/bindings.h"
#include "ompi/runtime/mpiruntime.h"
#include "ompi/runtime/params.h"
#if OMPI_BUILD_MPI_PROFILING
# if OPAL_HAVE_WEAK_SYMBOLS
......@@ -48,28 +48,26 @@
static const char FUNC_NAME[] = "MPI_Comm_spawn_multiple";
int MPI_Comm_spawn_multiple(int count, char *array_of_commands[], char **array_of_argv[],
const int array_of_maxprocs[], const MPI_Info array_of_info[],
int root, MPI_Comm comm, MPI_Comm *intercomm,
int array_of_errcodes[])
const int array_of_maxprocs[], const MPI_Info array_of_info[], int root,
MPI_Comm comm, MPI_Comm *intercomm, int array_of_errcodes[])
{
printf("MPI A\n");
fflush(stdout);
int i = 0, rc = 0, rank = 0, size = 0, flag;
ompi_communicator_t *newcomp = MPI_COMM_NULL;
bool send_first = false; /* they are contacting us first */
char port_name[MPI_MAX_PORT_NAME]; char *port_string = NULL;
char port_name[MPI_MAX_PORT_NAME];
char *port_string = NULL;
bool non_mpi = false, cumulative = false;
MEMCHECKER(
memchecker_comm(comm);
);
MEMCHECKER(memchecker_comm(comm););
if (MPI_PARAM_CHECK) {
OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
if (ompi_comm_invalid(comm)) {
return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_COMM,
FUNC_NAME);
return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_COMM, FUNC_NAME);
}
if (OMPI_COMM_IS_INTER(comm)) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_COMM, FUNC_NAME);
......@@ -82,6 +80,9 @@ int MPI_Comm_spawn_multiple(int count, char *array_of_commands[], char **array_o
}
}
printf("MPI B\n");
fflush(stdout);
rank = ompi_comm_rank(comm);
if (MPI_PARAM_CHECK) {
if (rank == root) {
......@@ -98,17 +99,14 @@ int MPI_Comm_spawn_multiple(int count, char *array_of_commands[], char **array_o
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_INFO, FUNC_NAME);
}
for (i = 0; i < count; ++i) {
if (NULL == array_of_info[i] ||
ompi_info_is_freed(array_of_info[i])) {
return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_INFO,
FUNC_NAME);
if (NULL == array_of_info[i] || ompi_info_is_freed(array_of_info[i])) {
return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_INFO, FUNC_NAME);
}
/* If ompi_non_mpi is set to true on any info, it must
be set to true on all of them. Note that not
setting ompi_non_mpi is the same as setting it to
false. */
ompi_info_get_bool(array_of_info[i], "ompi_non_mpi", &non_mpi,
&flag);
ompi_info_get_bool(array_of_info[i], "ompi_non_mpi", &non_mpi, &flag);
if (flag && 0 == i) {
/* If this is the first info, save its
ompi_non_mpi value */
......@@ -119,9 +117,7 @@ int MPI_Comm_spawn_multiple(int count, char *array_of_commands[], char **array_o
/* If this info's effective value doesn't agree with
the rest of them, error */
if (cumulative != non_mpi) {
return OMPI_ERRHANDLER_NOHANDLE_INVOKE(
MPI_ERR_INFO,
FUNC_NAME);
return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_INFO, FUNC_NAME);
}
}
for (i = 0; i < count; i++) {
......@@ -135,16 +131,21 @@ int MPI_Comm_spawn_multiple(int count, char *array_of_commands[], char **array_o
}
}
printf("MPI C\n");
fflush(stdout);
if (!ompi_mpi_dynamics_is_enabled(FUNC_NAME)) {
return OMPI_ERRHANDLER_INVOKE(comm, OMPI_ERR_NOT_SUPPORTED, FUNC_NAME);
}
printf("MPI D\n");
fflush(stdout);
if (rank == root) {
if (MPI_INFO_NULL == array_of_info[0]) {
non_mpi = false;
} else {
ompi_info_get_bool(array_of_info[0], "ompi_non_mpi", &non_mpi,
&flag);
ompi_info_get_bool(array_of_info[0], "ompi_non_mpi", &non_mpi, &flag);
if (!flag) {
non_mpi = false;
}
......@@ -160,6 +161,8 @@ int MPI_Comm_spawn_multiple(int count, char *array_of_commands[], char **array_o
/* initialize the port name to avoid problems */
memset(port_name, 0, MPI_MAX_PORT_NAME);
printf("MPI E\n");
fflush(stdout);
if (rank == root) {
if (!non_mpi) {
......@@ -173,14 +176,20 @@ int MPI_Comm_spawn_multiple(int count, char *array_of_commands[], char **array_o
rc = OMPI_ERR_NOT_SUPPORTED;
goto error;
}
if (OMPI_SUCCESS != (rc = ompi_dpm_spawn(count, (const char **) array_of_commands,
array_of_argv, array_of_maxprocs,
array_of_info, port_name))) {
if (OMPI_SUCCESS
!= (rc = ompi_dpm_spawn(count, (const char **) array_of_commands, array_of_argv,
array_of_maxprocs, array_of_info, port_name))) {
goto error;
}
}
printf("MPI F\n");
fflush(stdout);
error:
printf("MPI G\n");
fflush(stdout);
if (OMPI_SUCCESS != rc) {
/* There was an error in one of the above stages,
* we still need to do the connect_accept stage so that
......@@ -188,8 +197,7 @@ error:
* Add the error code to the port string for connect_accept
* to propagate the error code. */
(void) opal_asprintf(&port_string, "%s:error=%d", port_name, rc);
}
else {
} else {
port_string = port_name;
}
......@@ -199,11 +207,11 @@ error:
rc = ompi_dpm_connect_accept(comm, root, port_string, send_first, &newcomp);
}
printf("MPI H\n");
fflush(stdout);
if (OPAL_ERR_NOT_SUPPORTED == rc) {
opal_show_help("help-mpi-api.txt",
"MPI function not supported",
true,
FUNC_NAME,
opal_show_help("help-mpi-api.txt", "MPI function not supported", true, FUNC_NAME,
"Underlying runtime environment does not support spawn functionality");
}
......@@ -216,6 +224,9 @@ error:
ompi_dpm_close_port(port_name);
}
printf("MPI I\n");
fflush(stdout);
/* set array of errorcodes */
if (MPI_ERRCODES_IGNORE != array_of_errcodes) {
if (MPI_COMM_NULL != newcomp) {
......@@ -230,7 +241,9 @@ error:
}
}
printf("MPI J\n");
fflush(stdout);
*intercomm = newcomp;
OMPI_ERRHANDLER_RETURN(rc, comm, rc, FUNC_NAME);
}
......@@ -20,10 +20,10 @@
#include "ompi_config.h"
#include "ompi/mpi/c/bindings.h"
#include "ompi/runtime/params.h"
#include "ompi/errhandler/errhandler.h"
#include "ompi/mpi/c/bindings.h"
#include "ompi/runtime/ompi_spc.h"
#include "ompi/runtime/params.h"
#if OMPI_BUILD_MPI_PROFILING
# if OPAL_HAVE_WEAK_SYMBOLS
......@@ -34,19 +34,26 @@
static const char FUNC_NAME[] = "MPI_Finalize";
int MPI_Finalize(void)
{
/* If --with-spc and ompi_mpi_spc_dump_enabled were specified, print
* all of the final SPC values aggregated across the whole MPI run.
* Also, free all SPC memory.
*/
printf("FIN 1\n");
fflush(stdout);
SPC_FINI();
printf("FIN 2\n");
fflush(stdout);
if (MPI_PARAM_CHECK) {
OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
}
printf("FIN 3\n");
fflush(stdout);
/* Pretty simple */
return ompi_mpi_finalize();
......
......@@ -47,49 +47,49 @@
# include <netdb.h>
#endif
#include "opal/util/event.h"
#include "opal/util/output.h"
#include "opal/runtime/opal_progress.h"
#include "opal/mca/allocator/base/base.h"
#include "opal/mca/base/base.h"
#include "opal/sys/atomic.h"
#include "opal/runtime/opal.h"
#include "opal/util/show_help.h"
#include "opal/util/opal_environ.h"
#include "opal/mca/mpool/base/base.h"
#include "opal/mca/mpool/base/mpool_base_tree.h"
#include "opal/mca/rcache/base/base.h"
#include "opal/mca/allocator/base/base.h"
#include "opal/mca/pmix/pmix-internal.h"
#include "opal/mca/rcache/base/base.h"
#include "opal/runtime/opal.h"
#include "opal/runtime/opal_progress.h"
#include "opal/sys/atomic.h"
#include "opal/util/event.h"
#include "opal/util/opal_environ.h"
#include "opal/util/output.h"
#include "opal/util/show_help.h"
#include "opal/util/timings.h"
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/errhandler/errcode.h"
#include "ompi/attribute/attribute.h"
#include "ompi/communicator/communicator.h"
#include "ompi/constants.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/message/message.h"
#include "ompi/op/op.h"
#include "ompi/dpm/dpm.h"
#include "ompi/errhandler/errcode.h"
#include "ompi/file/file.h"
#include "ompi/info/info.h"
#include "ompi/runtime/mpiruntime.h"
#include "ompi/attribute/attribute.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/bml/bml.h"
#include "ompi/mca/pml/base/base.h"
#include "ompi/mca/bml/base/base.h"
#include "ompi/mca/osc/base/base.h"
#include "ompi/mca/part/base/base.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "ompi/mca/bml/bml.h"
#include "ompi/mca/coll/base/base.h"
#include "ompi/runtime/ompi_rte.h"
#include "ompi/mca/topo/base/base.h"
#include "ompi/mca/io/io.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "ompi/mca/hook/base/base.h"
#include "ompi/mca/io/base/base.h"
#include "ompi/mca/io/io.h"
#include "ompi/mca/osc/base/base.h"
#include "ompi/mca/part/base/base.h"
#include "ompi/mca/pml/base/base.h"
#include "ompi/mca/pml/base/pml_base_bsend.h"
#include "ompi/runtime/params.h"
#include "ompi/dpm/dpm.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/topo/base/base.h"
#include "ompi/message/message.h"
#include "ompi/mpiext/mpiext.h"
#include "ompi/mca/hook/base/base.h"
#include "ompi/op/op.h"
#include "ompi/runtime/mpiruntime.h"
#include "ompi/runtime/ompi_rte.h"
#include "ompi/runtime/params.h"
extern bool ompi_enable_timing;
......@@ -112,11 +112,13 @@ int ompi_mpi_finalize(void)
ompi_datatype_t *datatype;
pmix_status_t rc;
printf("FIN 4\n");
fflush(stdout);
ompi_hook_base_mpi_finalize_top();
int32_t state = ompi_mpi_state;
if (state < OMPI_MPI_STATE_INIT_COMPLETED ||
state >= OMPI_MPI_STATE_FINALIZE_STARTED) {
if (state < OMPI_MPI_STATE_INIT_COMPLETED || state >= OMPI_MPI_STATE_FINALIZE_STARTED) {
/* Note that if we're not initialized or already finalized, we
cannot raise an MPI error. The best that we can do is
write something to stderr. */
......@@ -125,13 +127,11 @@ int ompi_mpi_finalize(void)
hostname = opal_gethostname();
if (state < OMPI_MPI_STATE_INIT_COMPLETED) {
opal_show_help("help-mpi-runtime.txt",
"mpi_finalize: not initialized",
true, hostname, pid);
opal_show_help("help-mpi-runtime.txt", "mpi_finalize: not initialized", true, hostname,
pid);
} else if (state >= OMPI_MPI_STATE_FINALIZE_STARTED) {
opal_show_help("help-mpi-runtime.txt",
"mpi_finalize:invoked_multiple_times",
true, hostname, pid);
opal_show_help("help-mpi-runtime.txt", "mpi_finalize:invoked_multiple_times", true,
hostname, pid);
}
return MPI_ERR_OTHER;
}
......@@ -140,13 +140,15 @@ int ompi_mpi_finalize(void)
ompi_mpiext_fini();
printf("FIN 5\n");
fflush(stdout);
/* Per MPI-2:4.8, we have to free MPI_COMM_SELF before doing
anything else in MPI_FINALIZE (to include setting up such that
MPI_FINALIZED will return true). */
if (NULL != ompi_mpi_comm_self.comm.c_keyhash) {
ompi_attr_delete_all(COMM_ATTR, &ompi_mpi_comm_self,
ompi_mpi_comm_self.comm.c_keyhash);
ompi_attr_delete_all(COMM_ATTR, &ompi_mpi_comm_self, ompi_mpi_comm_self.comm.c_keyhash);
OBJ_RELEASE(ompi_mpi_comm_self.comm.c_keyhash);
ompi_mpi_comm_self.comm.c_keyhash = NULL;
}
......@@ -154,7 +156,8 @@ int ompi_mpi_finalize(void)
#if OPAL_ENABLE_FT_MPI
if (ompi_ftmpi_enabled) {
ompi_communicator_t *comm = &ompi_mpi_comm_world.comm;
OPAL_OUTPUT_VERBOSE((50, ompi_ftmpi_output_handle, "FT: Rank %d entering finalize", ompi_comm_rank(comm)));
OPAL_OUTPUT_VERBOSE(
(50, ompi_ftmpi_output_handle, "FT: Rank %d entering finalize", ompi_comm_rank(comm)));
/* grpcomm barrier does not tolerate /new/ failures. Let's make sure
* we drain all preexisting failures before we proceed;
......@@ -178,12 +181,14 @@ int ompi_mpi_finalize(void)
/* finalize the fault tolerant infrastructure (revoke,
* failure propagator, etc). From now-on we do not tolerate new failures. */
OPAL_OUTPUT_VERBOSE((50, ompi_ftmpi_output_handle, "FT: Rank %05d turning off FT", ompi_comm_rank(comm)));
OPAL_OUTPUT_VERBOSE(
(50, ompi_ftmpi_output_handle, "FT: Rank %05d turning off FT", ompi_comm_rank(comm)));
ompi_comm_failure_detector_finalize();
ompi_comm_failure_propagator_finalize();
ompi_comm_revoke_finalize();
ompi_comm_rbcast_finalize();
opal_output_verbose(40, ompi_ftmpi_output_handle, "Rank %05d: DONE WITH FINALIZE", ompi_comm_rank(comm));
opal_output_verbose(40, ompi_ftmpi_output_handle, "Rank %05d: DONE WITH FINALIZE",
ompi_comm_rank(comm));
}
#endif /* OPAL_ENABLE_FT_MPI */
......@@ -193,8 +198,7 @@ int ompi_mpi_finalize(void)
COMM_SELF is destroyed / all the attribute callbacks have been
invoked) */
opal_atomic_wmb();
opal_atomic_swap_32(&ompi_mpi_state,
OMPI_MPI_STATE_FINALIZE_PAST_COMM_SELF_DESTRUCT);
opal_atomic_swap_32(&ompi_mpi_state, OMPI_MPI_STATE_FINALIZE_PAST_COMM_SELF_DESTRUCT);
/* As finalize is the last legal MPI call, we are allowed to force the release
* of the user buffer used for bsend, before going anywhere further.
......@@ -205,6 +209,9 @@ int ompi_mpi_finalize(void)
opal_progress_set_event_flag(OPAL_EVLOOP_ONCE | OPAL_EVLOOP_NONBLOCK);
#endif
printf("FIN 6\n");
fflush(stdout);
/* Redo ORTE calling opal_progress_event_users_increment() during
MPI lifetime, to get better latency when not using TCP */
opal_progress_event_users_increment();
......@@ -293,7 +300,8 @@ int ompi_mpi_finalize(void)
* communications/actions to complete. See
* https://github.com/open-mpi/ompi/issues/1576 for the
* original bug report. */
if (PMIX_SUCCESS != (rc = PMIx_Fence_nb(NULL, 0, NULL, 0, fence_cbfunc, (void*)&active))) {
if (PMIX_SUCCESS
!= (rc = PMIx_Fence_nb(NULL, 0, NULL, 0, fence_cbfunc, (void *) &active))) {
ret = opal_pmix_convert_status(rc);
OMPI_ERROR_LOG(ret);
/* Reset the active flag to false, to avoid waiting for
......@@ -303,6 +311,9 @@ int ompi_mpi_finalize(void)
OMPI_LAZY_WAIT_FOR_COMPLETION(active);
}
printf("FIN 7\n");
fflush(stdout);
/* Shut down any bindings-specific issues: C++, F77, F90 */
/* Remove all memory associated by MPI_REGISTER_DATAREP (per
......@@ -325,6 +336,9 @@ int ompi_mpi_finalize(void)
OBJ_RELEASE(datatype);
OBJ_DESTRUCT(&ompi_mpi_f90_complex_hashtable);
printf("FIN 7a\n");
fflush(stdout);
/* Free communication objects */
/* free file resources */
......@@ -332,6 +346,9 @@ int ompi_mpi_finalize(void)
goto done;
}
printf("FIN 7b\n");
fflush(stdout);
/* free window resources */
if (OMPI_SUCCESS != (ret = ompi_win_finalize())) {
goto done;
......@@ -343,6 +360,8 @@ int ompi_mpi_finalize(void)
goto done;
}
printf("FIN 7c\n");
fflush(stdout);
/* free communicator resources. this MUST come before finalizing the PML
* as this will call into the pml */
......@@ -350,6 +369,9 @@ int ompi_mpi_finalize(void)
goto done;
}
printf("FIN 8\n");
fflush(stdout);
/* call del_procs on all allocated procs even though some may not be known
* to the pml layer. the pml layer is expected to be resilient and ignore
* any unknown procs. */
......@@ -420,6 +442,9 @@ int ompi_mpi_finalize(void)
goto done;
}
printf("FIN 9\n");
fflush(stdout);
/* Free all other resources */
/* free op resources */
......@@ -472,6 +497,9 @@ int ompi_mpi_finalize(void)
goto done;
}
printf("FIN 10\n");
fflush(stdout);
/* free proc resources */
if (OMPI_SUCCESS != (ret = ompi_proc_finalize())) {
goto done;
......@@ -516,11 +544,19 @@ int ompi_mpi_finalize(void)
/* All done */
printf("FIN 11\n");
fflush(stdout);
done:
printf("FIN 12\n");
fflush(stdout);
opal_atomic_wmb();
opal_atomic_swap_32(&ompi_mpi_state, OMPI_MPI_STATE_FINALIZE_COMPLETED);
ompi_hook_base_mpi_finalize_bottom();
printf("FIN 13\n");
fflush(stdout);
return ret;
}
......@@ -46,59 +46,59 @@
#include "mpi.h"
#include "opal/class/opal_list.h"
#include "opal/mca/allocator/base/base.h"
#include "opal/mca/base/base.h"
#include "opal/mca/btl/base/base.h"
#include "opal/mca/hwloc/base/base.h"
#include "opal/runtime/opal_progress.h"
#include "opal/mca/mpool/base/base.h"
#include "opal/mca/pmix/base/base.h"
#include "opal/mca/rcache/base/base.h"
#include "opal/mca/rcache/rcache.h"
#include "opal/mca/threads/threads.h"
#include "opal/runtime/opal.h"
#include "opal/runtime/opal_progress.h"
#include "opal/util/arch.h"
#include "opal/util/argv.h"
#include "opal/util/output.h"
#include "opal/util/error.h"
#include "opal/util/stacktrace.h"
#include "opal/util/show_help.h"
#include "opal/runtime/opal.h"
#include "opal/util/event.h"
#include "opal/mca/allocator/base/base.h"
#include "opal/mca/rcache/base/base.h"
#include "opal/mca/rcache/rcache.h"
#include "opal/mca/mpool/base/base.h"
#include "opal/mca/btl/base/base.h"
#include "opal/mca/pmix/base/base.h"
#include "opal/util/timings.h"
#include "opal/util/opal_environ.h"
#include "opal/util/output.h"
#include "opal/util/show_help.h"
#include "opal/util/stacktrace.h"
#include "opal/util/timings.h"
#include "ompi/constants.h"
#include "ompi/mpi/fortran/base/constants.h"
#include "ompi/runtime/mpiruntime.h"
#include "ompi/runtime/params.h"
#include "ompi/attribute/attribute.h"
#include "ompi/communicator/communicator.h"
#include "ompi/info/info.h"
#include "ompi/constants.h"
#include "ompi/debuggers/debuggers.h"
#include "ompi/dpm/dpm.h"
#include "ompi/errhandler/errcode.h"
#include "ompi/errhandler/errhandler.h"
#include "ompi/interlib/interlib.h"
#include "ompi/request/request.h"
#include "ompi/message/message.h"
#include "ompi/op/op.h"
#include "ompi/mca/op/op.h"
#include "ompi/mca/op/base/base.h"
#include "ompi/file/file.h"
#include "ompi/attribute/attribute.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/bml/bml.h"
#include "ompi/mca/pml/base/base.h"
#include "ompi/info/info.h"
#include "ompi/interlib/interlib.h"
#include "ompi/mca/bml/base/base.h"
#include "ompi/mca/osc/base/base.h"
#include "ompi/mca/part/base/base.h"
#include "ompi/mca/bml/bml.h"
#include "ompi/mca/coll/base/base.h"
#include "ompi/mca/io/io.h"
#include "ompi/mca/hook/base/base.h"
#include "ompi/mca/io/base/base.h"
#include "ompi/runtime/ompi_rte.h"
#include "ompi/debuggers/debuggers.h"
#include "ompi/proc/proc.h"
#include "ompi/mca/io/io.h"
#include "ompi/mca/op/base/base.h"
#include "ompi/mca/op/op.h"
#include "ompi/mca/osc/base/base.h"
#include "ompi/mca/part/base/base.h"
#include "ompi/mca/pml/base/base.h"
#include "ompi/mca/pml/base/pml_base_bsend.h"
#include "ompi/dpm/dpm.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/message/message.h"
#include "ompi/mpi/fortran/base/constants.h"
#include "ompi/mpiext/mpiext.h"
#include "ompi/mca/hook/base/base.h"
#include "ompi/op/op.h"
#include "ompi/proc/proc.h"
#include "ompi/request/request.h"
#include "ompi/runtime/mpiruntime.h"
#include "ompi/runtime/ompi_rte.h"
#include "ompi/runtime/params.h"
#include "ompi/util/timings.h"
/* newer versions of gcc have poisoned this deprecated feature */
......@@ -108,8 +108,7 @@
implicity from libmpi, there are times when the malloc initialize
hook in the memory component doesn't work. So we have to do it
from here, since any MPI code is going to call MPI_Init... */
OPAL_DECLSPEC void (*__malloc_initialize_hook) (void) =
opal_memory_base_malloc_init_hook;
OPAL_DECLSPEC void (*__malloc_initialize_hook)(void) = opal_memory_base_malloc_init_hook;
#endif
/* This is required for the boundaries of the hash tables used to store
......@@ -171,10 +170,8 @@ ompi_predefined_datatype_t *ompi_mpi_2real_addr = &ompi_mpi_2real;
ompi_predefined_datatype_t *ompi_mpi_2dblprec_addr = &ompi_mpi_2dblprec;
ompi_predefined_datatype_t *ompi_mpi_2integer_addr = &ompi_mpi_2integer;
struct ompi_status_public_t *ompi_mpi_status_ignore_addr =
(ompi_status_public_t *) 0;
struct ompi_status_public_t *ompi_mpi_statuses_ignore_addr =
(ompi_status_public_t *) 0;
struct ompi_status_public_t *ompi_mpi_status_ignore_addr = (ompi_status_public_t *) 0;
struct ompi_status_public_t *ompi_mpi_statuses_ignore_addr = (ompi_status_public_t *) 0;
/*
* These variables are here, rather than under ompi/mpi/c/foo.c
......@@ -234,7 +231,6 @@ MPI_Fint *MPI_F_STATUS_IGNORE = NULL;
MPI_Fint *MPI_F_STATUSES_IGNORE = NULL;
#endif /* OMPI_BUILD_FORTRAN_BINDINGS */
/* Constants for the Fortran layer. These values are referred to via
common blocks in the Fortran equivalents. See
ompi/mpi/fortran/base/constants.h for a more detailed explanation.
......@@ -286,29 +282,25 @@ extern int ompi_mpi_event_tick_rate;
* Static functions used to configure the interactions between the OPAL and
* the runtime.
*/
static char*
_process_name_print_for_opal(const opal_process_name_t procname)
static char *_process_name_print_for_opal(const opal_process_name_t procname)
{
ompi_process_name_t *rte_name = (ompi_process_name_t *) &procname;
return OMPI_NAME_PRINT(rte_name);
}
static int
_process_name_compare(const opal_process_name_t p1, const opal_process_name_t p2)
static int _process_name_compare(const opal_process_name_t p1, const opal_process_name_t p2)
{
ompi_process_name_t *o1 = (ompi_process_name_t *) &p1;
ompi_process_name_t *o2 = (ompi_process_name_t *) &p2;
return ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL, o1, o2);
}
static int _convert_string_to_process_name(opal_process_name_t *name,
const char* name_string)
static int _convert_string_to_process_name(opal_process_name_t *name, const char *name_string)
{
return ompi_rte_convert_string_to_process_name(name, name_string);
}
static int _convert_process_name_to_string(char** name_string,
const opal_process_name_t *name)
static int _convert_process_name_to_string(char **name_string, const opal_process_name_t *name)
{
return ompi_rte_convert_process_name_to_string(name_string, name);
}
......@@ -332,8 +324,7 @@ void ompi_mpi_thread_level(int requested, int *provided)
ompi_mpi_main_thread = opal_thread_get_self();
}
ompi_mpi_thread_multiple = (ompi_mpi_thread_provided ==
MPI_THREAD_MULTIPLE);
ompi_mpi_thread_multiple = (ompi_mpi_thread_provided == MPI_THREAD_MULTIPLE);
}
static int ompi_register_mca_variables(void)
......@@ -352,10 +343,8 @@ static int ompi_register_mca_variables(void)
ompi_enable_timing = false;
(void) mca_base_var_register("ompi", "ompi", NULL, "timing",
"Request that critical timing loops be measured",
MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&ompi_enable_timing);
MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY, &ompi_enable_timing);
#if OPAL_ENABLE_FT_MPI
/* Before loading any other part of the MPI library, we need to load
......@@ -379,9 +368,7 @@ static void fence_release(pmix_status_t status, void *cbdata)
OPAL_POST_OBJECT(active);
}
static void evhandler_reg_callbk(pmix_status_t status,
size_t evhandler_ref,
void *cbdata)
static void evhandler_reg_callbk(pmix_status_t status, size_t evhandler_ref, void *cbdata)
{
opal_pmix_lock_t *lock = (opal_pmix_lock_t *) cbdata;
......@@ -389,9 +376,7 @@ static void evhandler_reg_callbk(pmix_status_t status,
OPAL_PMIX_WAKEUP_THREAD(lock);
}
int ompi_mpi_init(int argc, char **argv, int requested, int *provided,
bool reinit_ok)
int ompi_mpi_init(int argc, char **argv, int requested, int *provided, bool reinit_ok)
{
int ret;
ompi_proc_t **procs;
......@@ -412,14 +397,12 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided,
int32_t expected = OMPI_MPI_STATE_NOT_INITIALIZED;
int32_t desired = OMPI_MPI_STATE_INIT_STARTED;
opal_atomic_wmb();
if (!opal_atomic_compare_exchange_strong_32(&ompi_mpi_state, &expected,
desired)) {
if (!opal_atomic_compare_exchange_strong_32(&ompi_mpi_state, &expected, desired)) {
// If we failed to atomically transition ompi_mpi_state from
// NOT_INITIALIZED to INIT_STARTED, then someone else already
// did that, and we should return.
if (expected >= OMPI_MPI_STATE_FINALIZE_STARTED) {
opal_show_help("help-mpi-runtime.txt",
"mpi_init: already finalized", true);
opal_show_help("help-mpi-runtime.txt", "mpi_init: already finalized", true);
return MPI_ERR_OTHER;
} else if (expected >= OMPI_MPI_STATE_INIT_STARTED) {
// In some cases (e.g., oshmem_shmem_init()), we may call
......@@ -433,8 +416,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided,
return MPI_SUCCESS;
}
opal_show_help("help-mpi-runtime.txt",
"mpi_init: invoked multiple times", true);
opal_show_help("help-mpi-runtime.txt", "mpi_init: invoked multiple times", true);
return MPI_ERR_OTHER;
}
}
......@@ -508,7 +490,8 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided,
goto error;
}
if (OPAL_SUCCESS != (ret = opal_arch_set_fortran_logical_size(sizeof(ompi_fortran_logical_t)))) {
if (OPAL_SUCCESS
!= (ret = opal_arch_set_fortran_logical_size(sizeof(ompi_fortran_logical_t)))) {
error = "ompi_mpi_init: opal_arch_set_fortran_logical_size failed";
goto error;
}
......@@ -547,7 +530,6 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided,
ompi_hook_base_mpi_init_top_post_opal(argc, argv, requested, provided);
OMPI_TIMING_NEXT("initialization");
/* Setup RTE */
......@@ -567,7 +549,8 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided,
/* give it a name so we can distinguish it */
PMIX_INFO_LOAD(&info[1], PMIX_EVENT_HDLR_NAME, "MPI-Default", PMIX_STRING);
OPAL_PMIX_CONSTRUCT_LOCK(&mylock);
PMIx_Register_event_handler(codes, 1, info, 2, ompi_errhandler_callback, evhandler_reg_callbk, (void*)&mylock);
PMIx_Register_event_handler(codes, 1, info, 2, ompi_errhandler_callback, evhandler_reg_callbk,
(void *) &mylock);
OPAL_PMIX_WAIT_THREAD(&mylock);
rc = mylock.status;
OPAL_PMIX_DESTRUCT_LOCK(&mylock);
......@@ -609,8 +592,8 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided,
error = "ompi_op_base_open() failed";
goto error;
}
if (OMPI_SUCCESS !=
(ret = ompi_op_base_find_available(OPAL_ENABLE_PROGRESS_THREADS,
if (OMPI_SUCCESS
!= (ret = ompi_op_base_find_available(OPAL_ENABLE_PROGRESS_THREADS,
ompi_mpi_thread_multiple))) {
error = "ompi_op_base_find_available() failed";
goto error;
......@@ -669,9 +652,8 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided,
/* Select which MPI components to use */
if (OMPI_SUCCESS !=
(ret = mca_pml_base_select(OPAL_ENABLE_PROGRESS_THREADS,
ompi_mpi_thread_multiple))) {
if (OMPI_SUCCESS
!= (ret = mca_pml_base_select(OPAL_ENABLE_PROGRESS_THREADS, ompi_mpi_thread_multiple))) {
error = "mca_pml_base_select() failed";
goto error;
}
......@@ -691,8 +673,8 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided,
}
OMPI_TIMING_NEXT("commit");
#if (OPAL_ENABLE_TIMING)
if (OMPI_TIMING_ENABLED && !opal_pmix_base_async_modex &&
opal_pmix_collect_all_data && !ompi_singleton) {
if (OMPI_TIMING_ENABLED && !opal_pmix_base_async_modex && opal_pmix_collect_all_data
&& !ompi_singleton) {
if (PMIX_SUCCESS != (rc = PMIx_Fence(NULL, 0, NULL, 0))) {
ret - opal_pmix_convert_status(rc);
error = "timing: pmix-barrier-1 failed";
......@@ -723,9 +705,8 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided,
active = true;
OPAL_POST_OBJECT(&active);
PMIX_INFO_LOAD(&info[0], PMIX_COLLECT_DATA, &opal_pmix_collect_all_data, PMIX_BOOL);
if( PMIX_SUCCESS != (rc = PMIx_Fence_nb(NULL, 0, NULL, 0,
fence_release,
(void*)&active))) {
if (PMIX_SUCCESS
!= (rc = PMIx_Fence_nb(NULL, 0, NULL, 0, fence_release, (void *) &active))) {
ret = opal_pmix_convert_status(rc);
error = "PMIx_Fence_nb() failed";
goto error;
......@@ -753,30 +734,27 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided,
OMPI_TIMING_NEXT("modex");
/* select buffered send allocator component to be used */
if( OMPI_SUCCESS !=
(ret = mca_pml_base_bsend_init(ompi_mpi_thread_multiple))) {
if (OMPI_SUCCESS != (ret = mca_pml_base_bsend_init(ompi_mpi_thread_multiple))) {
error = "mca_pml_base_bsend_init() failed";
goto error;
}
if (OMPI_SUCCESS !=
(ret = mca_coll_base_find_available(OPAL_ENABLE_PROGRESS_THREADS,
if (OMPI_SUCCESS
!= (ret = mca_coll_base_find_available(OPAL_ENABLE_PROGRESS_THREADS,
ompi_mpi_thread_multiple))) {
error = "mca_coll_base_find_available() failed";
goto error;
}
if (OMPI_SUCCESS !=
(ret = ompi_osc_base_find_available(OPAL_ENABLE_PROGRESS_THREADS,
if (OMPI_SUCCESS
!= (ret = ompi_osc_base_find_available(OPAL_ENABLE_PROGRESS_THREADS,
ompi_mpi_thread_multiple))) {
error = "ompi_osc_base_find_available() failed";
goto error;
}
if (OMPI_SUCCESS !=
(ret = mca_part_base_select(OPAL_ENABLE_PROGRESS_THREADS,
ompi_mpi_thread_multiple))) {
if (OMPI_SUCCESS
!= (ret = mca_part_base_select(OPAL_ENABLE_PROGRESS_THREADS, ompi_mpi_thread_multiple))) {
error = "mca_part_base_select() failed";
goto error;
}
......@@ -887,8 +865,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided,
Otherwise, if we got some other failure, fall through to print
a generic message. */
if (OMPI_ERR_UNREACH == ret) {
opal_show_help("help-mpi-runtime.txt",
"mpi_init:startup:pml-add-procs-fail", true);
opal_show_help("help-mpi-runtime.txt", "mpi_init:startup:pml-add-procs-fail", true);
error = NULL;
goto error;
} else if (OMPI_SUCCESS != ret) {
......@@ -905,13 +882,17 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided,
if (ompi_ftmpi_enabled) {
const char *evmethod;
rc = ompi_comm_rbcast_init();
if( OMPI_SUCCESS != rc ) return rc;
if (OMPI_SUCCESS != rc)
return rc;
rc = ompi_comm_revoke_init();
if( OMPI_SUCCESS != rc ) return rc;
if (OMPI_SUCCESS != rc)
return rc;
rc = ompi_comm_failure_propagator_init();
if( OMPI_SUCCESS != rc ) return rc;
if (OMPI_SUCCESS != rc)
return rc;
rc = ompi_comm_failure_detector_init();
if( OMPI_SUCCESS != rc ) return rc;
if (OMPI_SUCCESS != rc)
return rc;
evmethod = event_base_get_method(opal_sync_event_base);
if (0 == strcmp("select", evmethod)) {
......@@ -924,8 +905,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided,
* Dump all MCA parameters if requested
*/
if (ompi_mpi_show_mca_params) {
ompi_show_all_mca_params(ompi_mpi_comm_world.comm.c_my_rank,
nprocs,
ompi_show_all_mca_params(ompi_mpi_comm_world.comm.c_my_rank, nprocs,
ompi_process_info.nodename);
}
......@@ -949,8 +929,8 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided,
active = true;
OPAL_POST_OBJECT(&active);
PMIX_INFO_LOAD(&info[0], PMIX_COLLECT_DATA, &flag, PMIX_BOOL);
if (PMIX_SUCCESS != (rc = PMIx_Fence_nb(NULL, 0, info, 1,
fence_release, (void*)&active))) {
if (PMIX_SUCCESS
!= (rc = PMIx_Fence_nb(NULL, 0, info, 1, fence_release, (void *) &active))) {
ret = opal_pmix_convert_status(rc);
error = "PMIx_Fence_nb() failed";
goto error;
......@@ -1003,14 +983,12 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided,
(since dpm.mark_dyncomm is not set in the communicator creation
function else), but before dpm.dyncom_init, since this function
might require collective for the CID allocation. */
if (OMPI_SUCCESS !=
(ret = mca_coll_base_comm_select(MPI_COMM_WORLD))) {
if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select(MPI_COMM_WORLD))) {
error = "mca_coll_base_comm_select(MPI_COMM_WORLD) failed";
goto error;
}
if (OMPI_SUCCESS !=
(ret = mca_coll_base_comm_select(MPI_COMM_SELF))) {
if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select(MPI_COMM_SELF))) {
error = "mca_coll_base_comm_select(MPI_COMM_SELF) failed";
goto error;
}
......@@ -1052,7 +1030,8 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided,
/* start the failure detector */
if (ompi_ftmpi_enabled) {
rc = ompi_comm_failure_detector_start();
if( OMPI_SUCCESS != rc ) return rc;
if (OMPI_SUCCESS != rc)
return rc;
}
#endif
......@@ -1062,8 +1041,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided,
/* Only print a message if one was not already printed */
if (NULL != error && OMPI_ERR_SILENT != ret) {
const char *err_msg = opal_strerror(ret);
opal_show_help("help-mpi-runtime.txt",
"mpi_init:startup:internal-failure", true,
opal_show_help("help-mpi-runtime.txt", "mpi_init:startup:internal-failure", true,
"MPI_INIT", "MPI_INIT", error, err_msg, ret);
}
ompi_hook_base_mpi_init_error(argc, argv, requested, provided);
......
......@@ -7,58 +7,42 @@
* Sample MPI "hello world" application in C
*/
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include "mpi.h"
#define BUFFLEN 64
extern char **environ;
/* Spawn Modes (dynamic job spawning):
* 1: Spawn just one process (in one job)
* 2: Spawn 2 processes in 2 different jobs
* 3: Spawn 2 prcoesses in one (shared) job
* 3: Spawn 3 processes in one (shared) job
* 0 or other: Do not spawn a dynamic process/job
*/
#define SPAWN_MODE 0
#define SPAWN_MODE 1
void errorExit(char* msg);
int main(int argc, char *argv[])
{
FILE *ptr = fopen("/home/test_out", "a");
fprintf(ptr, "TEST APP STARTED\n");
char **s = environ;
/*void notifyProcessAgent(pid_t pid, int rank, const char *eventInfo) {
struct sockaddr_un strAddr;
socklen_t lenAddr;
int fdSock;
if ((fdSock=socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
errorExit("socket");
}
strAddr.sun_family=AF_LOCAL; // Unix domain
strcpy(strAddr.sun_path, SOCKET_PATH);
lenAddr=sizeof(strAddr.sun_family)+strlen(strAddr.sun_path);
if (connect(fdSock, (struct sockaddr*)&strAddr, lenAddr) !=0 ) {
errorExit("connect");
// PRINT ENV
for (; *s; s++) {
fprintf(ptr, "%s\n", *s);
}
char info2Send[BUFFLEN];
snprintf(info2Send, BUFFLEN+1, "%s: %d, %d", eventInfo, pid, rank);
if (send(fdSock, info2Send, BUFFLEN+1, 0) < 0) {
errorExit("send");
char *parentPort = getenv("OMPI_PARENT_PORT");
if (parentPort) {
fprintf(ptr, "Hey we have a parent port! %s", parentPort);
}
printf("\nData send!\n");
char rank2Recv[BUFFLEN];
recv(fdSock, rank2Recv, BUFFLEN+1, 0);
int receivedNumber = (int)strtol(rank2Recv, NULL, 0);
printf("Received from server: %d\n", receivedNumber);
close(fdSock);
}*/
int main(int argc, char* argv[]) {
fclose(ptr);
int rank, size, len;
pid_t pid;
......@@ -73,69 +57,87 @@ int main(int argc, char* argv[]) {
// notifyProcessAgent(pid, rank, "Spawned");
printf("Hello, world, I am %d of %d, PID: %d\n", rank, size, pid);
fflush(stdout);
// dynamically spawn child process
// https://mpi.deino.net/mpi_functions/MPI_Comm_spawn.html
if (1 == SPAWN_MODE) {
int np = 1;
int errcodes[1];
int np = 2;
int errcodes[2];
MPI_Comm parentcomm, intercomm;
MPI_Comm_get_parent(&parentcomm);
if (parentcomm == MPI_COMM_NULL) {
MPI_Comm_spawn( "hello", MPI_ARGV_NULL, np, MPI_INFO_NULL, 0,
MPI_COMM_WORLD, &intercomm, errcodes );
MPI_Comm_spawn("/home/ompi/rank-swapper-agent/hello", MPI_ARGV_NULL, np, MPI_INFO_NULL,
0, MPI_COMM_WORLD, &intercomm, errcodes);
printf("I'm the parent.\n");
fflush(stdout);
} else {
printf("I'm the spawned.\n");
fflush(stdout);
}
if (0 != errcodes[0]) {
printf("ERROR_SPAWN: code: %d\n", errcodes[0]);
fflush(stdout);
}
// (instead of Comm_multiple) spawns a second, different intercommunicator
} else if (2 == SPAWN_MODE) {
int np = 1;
int errcodes[1];
MPI_Comm parentcomm, intercomm;
int np = 2;
int errcodes[2];
MPI_Comm parentcomm;
MPI_Comm intercomm[2];
MPI_Comm_get_parent(&parentcomm);
if (parentcomm == MPI_COMM_NULL) {
for (int i = 0; i < 2; i++) {
MPI_Comm_spawn( "hello", MPI_ARGV_NULL, np, MPI_INFO_NULL, 0,
MPI_COMM_WORLD, &intercomm, errcodes );
MPI_Comm_spawn("/home/ompi/rank-swapper-agent/hello", MPI_ARGV_NULL, np,
MPI_INFO_NULL, 0, MPI_COMM_WORLD, &intercomm[i], errcodes);
if (0 != errcodes[0]) {
printf("ERROR_SPAWN: code: %d\n", errcodes[0]);
fflush(stdout);
}
}
printf("I'm the parent.\n");
fflush(stdout);
} else {
printf("I'm the spawned.\n");
fflush(stdout);
}
} else if (3 == SPAWN_MODE) {
int np[2] = { 1, 1 };
int errcodes[2];
int np[2] = {2, 1};
int errcodes[3];
MPI_Comm parentcomm, intercomm;
char *cmds[2] = { "hello", "hello" };
MPI_Info infos[2] = { MPI_INFO_NULL, MPI_INFO_NULL };
char *cmds[3] = {"/home/ompi/rank-swapper-agent/hello",
"/home/ompi/rank-swapper-agent/hello",
"/home/ompi/rank-swapper-agent/hello"};
MPI_Info infos[3] = {MPI_INFO_NULL, MPI_INFO_NULL, MPI_INFO_NULL};
MPI_Comm_get_parent(&parentcomm);
if (parentcomm == MPI_COMM_NULL) {
// Create n more processes using the "hello" executable
MPI_Comm_spawn_multiple(2, cmds, MPI_ARGVS_NULL, np, infos, 0, MPI_COMM_WORLD, &intercomm, errcodes );
MPI_Comm_spawn_multiple(2, cmds, MPI_ARGVS_NULL, np, infos, 0, MPI_COMM_WORLD,
&intercomm, errcodes);
printf("I'm the parent.\n");
fflush(stdout);
} else {
printf("I'm the spawned.\n");
fflush(stdout);
}
for (int i = 0; i < 2; i++) {
for (int i = 0; i < 3; i++) {
if (0 != errcodes[i]) {
printf("ERROR_SPAWN: code: %d\n", errcodes[i]);
fflush(stdout);
}
}
}
sleep(5);
// printf("Sleeping.\n");
// fflush(stdout);
// sleep(5);
// printf("Done sleeping.\n");
// fflush(stdout);
MPI_Finalize();
......@@ -143,4 +145,3 @@ int main(int argc, char* argv[]) {
return 0;
}