Skip to content
Snippets Groups Projects

Rank modification

2 files
+ 82
151
Compare changes
  • Side-by-side
  • Inline

Files

  • bd435ae9
    Implement rank-modification handling by MPI · bd435ae9
    paun51 authored
    This is required by the new JSON format: MPI now receives a list of
    modified ranks and applies them all at once instead of process by process.
    
    Also update rank-swapper-agent/locserv.c accordingly; it is kept for
    testing purposes only, since the new node agent has replaced it.
+ 40
18
@@ -95,8 +95,11 @@ static void errorExit(char* msg) {
@@ -95,8 +95,11 @@ static void errorExit(char* msg) {
exit(1);
exit(1);
}
}
// TODO: Change int return to size_t!
/**
static int getProcessAgentRank(uint32_t jobid, uint32_t vpid, size_t size) {
* Connects to the node agent, sends its own process information
 
* and receives the list of modified ranks
 
*/
 
static int get_modified_ranks(uint32_t jobid, uint32_t vpid, size_t size, opal_vpid_t *modified_ranks) {
struct addrinfo hints;
struct addrinfo hints;
struct addrinfo *result, *rp;
struct addrinfo *result, *rp;
const char* host_ip = "localhost";
const char* host_ip = "localhost";
@@ -154,15 +157,39 @@ static int getProcessAgentRank(uint32_t jobid, uint32_t vpid, size_t size) {
@@ -154,15 +157,39 @@ static int getProcessAgentRank(uint32_t jobid, uint32_t vpid, size_t size) {
}
}
char rank_to_recv[BUFFLEN];
char rank_to_recv[BUFFLEN];
recv(socket_fd, rank_to_recv, BUFFLEN, 0);
memset(rank_to_recv, 0, BUFFLEN);
rank_to_recv[BUFFLEN-1] = '\0';
// drop (later overwrite) the length since it is not needed
 
recv(socket_fd, rank_to_recv, BUFFLEN-1, 0);
 
// receive the actual message content
 
recv(socket_fd, rank_to_recv, BUFFLEN-1, 0);
printf("Received from server: %s\n", rank_to_recv);
printf("Received from server: %s\n", rank_to_recv);
// int received_rank = (int)strtol(rank_to_recv, NULL, 0);
// printf("Received from server: %d\n", received_rank);
// look for msg_data field in JSON string
 
const char msg_data_key[] = "msg_data";
 
char *msg_data_location = strstr(rank_to_recv, msg_data_key);
 
 
// parse the ranks starting with the first number after msg_data_location
 
const char digits[] = "0123456789";
 
char *next_digit, *first_non_digit;
 
first_non_digit = msg_data_location;
 
for (size_t i = 0; i < size; i++) {
 
// get the location of the next digit in the string
 
next_digit = strpbrk(first_non_digit, digits);
 
// parse the rank and store the location of the first non-digit char in first_non_digit
 
modified_ranks[i] = strtoul(next_digit, &first_non_digit, 10);
 
}
 
 
// // Print modified_ranks array for debugging purposes
 
// printf("modified_ranks: [");
 
// for (size_t i = 0; i < size; i++) {
 
// printf("%u", modified_ranks[i]);
 
// if (i+1 < size) printf(", ");
 
// }
 
// printf("]\n");
close(socket_fd);
close(socket_fd);
return vpid; //received_rank;
return 0;
}
}
/*
/*
@@ -171,6 +198,7 @@ static int getProcessAgentRank(uint32_t jobid, uint32_t vpid, size_t size) {
@@ -171,6 +198,7 @@ static int getProcessAgentRank(uint32_t jobid, uint32_t vpid, size_t size) {
int ompi_comm_init(void)
int ompi_comm_init(void)
{
{
ompi_group_t *group;
ompi_group_t *group;
 
opal_vpid_t *modified_ranks;
size_t size;
size_t size;
/* Setup communicator array */
/* Setup communicator array */
@@ -196,19 +224,11 @@ int ompi_comm_init(void)
@@ -196,19 +224,11 @@ int ompi_comm_init(void)
group->grp_proc_pointers = (ompi_proc_t **) calloc (size, sizeof (ompi_proc_t *));
group->grp_proc_pointers = (ompi_proc_t **) calloc (size, sizeof (ompi_proc_t *));
group->grp_proc_count = size;
group->grp_proc_count = size;
// transfer the infos about the current processes to the agent
modified_ranks = (opal_vpid_t *) calloc (size, sizeof (opal_vpid_t));
// for (size_t i = 0 ; i < size ; ++i) {
get_modified_ranks(OMPI_PROC_MY_NAME->jobid, OMPI_PROC_MY_NAME->vpid, size, modified_ranks);
// opal_process_name_t name = {.vpid = i, .jobid = OMPI_PROC_MY_NAME->jobid};
// // call without return just to gain knowledge about the processes
// getProcessAgentRank(name.jobid, name.vpid, size);
// }
getProcessAgentRank(OMPI_PROC_MY_NAME->jobid, OMPI_PROC_MY_NAME->vpid, size);
for (size_t i = 0 ; i < size ; ++i) {
for (size_t i = 0 ; i < size ; ++i) {
opal_process_name_t name = {.vpid = i, .jobid = OMPI_PROC_MY_NAME->jobid};
opal_process_name_t name = {.vpid = modified_ranks[i], .jobid = OMPI_PROC_MY_NAME->jobid};
// /* get desired rank from agent (must be within size!)*/
// name.vpid = getProcessAgentRank(name.jobid, name.vpid, size);
// printf("INIT Loop Rank: %u\n", name.vpid);
/* look for existing ompi_proc_t that matches this name */
/* look for existing ompi_proc_t that matches this name */
group->grp_proc_pointers[i] = (ompi_proc_t *) ompi_proc_lookup (name);
group->grp_proc_pointers[i] = (ompi_proc_t *) ompi_proc_lookup (name);
if (NULL == group->grp_proc_pointers[i]) {
if (NULL == group->grp_proc_pointers[i]) {
@@ -219,6 +239,8 @@ int ompi_comm_init(void)
@@ -219,6 +239,8 @@ int ompi_comm_init(void)
}
}
}
}
 
free(modified_ranks);
 
OMPI_GROUP_SET_INTRINSIC (group);
OMPI_GROUP_SET_INTRINSIC (group);
OMPI_GROUP_SET_DENSE (group);
OMPI_GROUP_SET_DENSE (group);
ompi_set_group_rank(group, ompi_proc_local());
ompi_set_group_rank(group, ompi_proc_local());
Loading