Skip to content
Snippets Groups Projects

Rank modification

Merged felixkhals requested to merge feature/handle_rank_modification_msg into master
2 files
+ 82
151
Compare changes
  • Side-by-side
  • Inline
Files
2
  • bd435ae9
    Implement rank-modification handling by MPI · bd435ae9
    paun51 authored
    due to the new JSON format. MPI now gets a list of modified ranks it
    applies at once instead of doing it process by process.
    
    Also update rank-swapper-agent/locserv.c accordingly for testing
    purposes (since it has been replaced by the new node agent).
@@ -95,8 +95,11 @@ static void errorExit(char* msg) {
exit(1);
}
// TODO: Change int return to size_t!
static int getProcessAgentRank(uint32_t jobid, uint32_t vpid, size_t size) {
/**
* Connects to the node agent, sends its own process information
* and receives the list of modified ranks
*/
static int get_modified_ranks(uint32_t jobid, uint32_t vpid, size_t size, opal_vpid_t *modified_ranks) {
struct addrinfo hints;
struct addrinfo *result, *rp;
const char* host_ip = "localhost";
@@ -154,15 +157,39 @@ static int getProcessAgentRank(uint32_t jobid, uint32_t vpid, size_t size) {
}
char rank_to_recv[BUFFLEN];
recv(socket_fd, rank_to_recv, BUFFLEN, 0);
rank_to_recv[BUFFLEN-1] = '\0';
memset(rank_to_recv, 0, BUFFLEN);
// drop (later overwrite) the length since it is not needed
recv(socket_fd, rank_to_recv, BUFFLEN-1, 0);
// receive the actual message content
recv(socket_fd, rank_to_recv, BUFFLEN-1, 0);
printf("Received from server: %s\n", rank_to_recv);
// int received_rank = (int)strtol(rank_to_recv, NULL, 0);
// printf("Received from server: %d\n", received_rank);
// look for msg_data field in JSON string
const char msg_data_key[] = "msg_data";
char *msg_data_location = strstr(rank_to_recv, msg_data_key);
// parse the ranks starting with the first number after msg_data_location
const char digits[] = "0123456789";
char *next_digit, *first_non_digit;
first_non_digit = msg_data_location;
for (size_t i = 0; i < size; i++) {
// get the location of the next digit in the string
next_digit = strpbrk(first_non_digit, digits);
// parse the rank and store the location of the first non-digit char in first_non_digit
modified_ranks[i] = strtoul(next_digit, &first_non_digit, 10);
}
// // Print modified_ranks array for debugging purposes
// printf("modified_ranks: [");
// for (size_t i = 0; i < size; i++) {
// printf("%u", modified_ranks[i]);
// if (i+1 < size) printf(", ");
// }
// printf("]\n");
close(socket_fd);
return vpid; //received_rank;
return 0;
}
/*
@@ -171,6 +198,7 @@ static int getProcessAgentRank(uint32_t jobid, uint32_t vpid, size_t size) {
int ompi_comm_init(void)
{
ompi_group_t *group;
opal_vpid_t *modified_ranks;
size_t size;
/* Setup communicator array */
@@ -196,19 +224,11 @@ int ompi_comm_init(void)
group->grp_proc_pointers = (ompi_proc_t **) calloc (size, sizeof (ompi_proc_t *));
group->grp_proc_count = size;
// transfer the infos about the current processes to the agent
// for (size_t i = 0 ; i < size ; ++i) {
// opal_process_name_t name = {.vpid = i, .jobid = OMPI_PROC_MY_NAME->jobid};
// // call without return just to gain knowledge about the processes
// getProcessAgentRank(name.jobid, name.vpid, size);
// }
getProcessAgentRank(OMPI_PROC_MY_NAME->jobid, OMPI_PROC_MY_NAME->vpid, size);
modified_ranks = (opal_vpid_t *) calloc (size, sizeof (opal_vpid_t));
get_modified_ranks(OMPI_PROC_MY_NAME->jobid, OMPI_PROC_MY_NAME->vpid, size, modified_ranks);
for (size_t i = 0 ; i < size ; ++i) {
opal_process_name_t name = {.vpid = i, .jobid = OMPI_PROC_MY_NAME->jobid};
// /* get desired rank from agent (must be within size!)*/
// name.vpid = getProcessAgentRank(name.jobid, name.vpid, size);
// printf("INIT Loop Rank: %u\n", name.vpid);
opal_process_name_t name = {.vpid = modified_ranks[i], .jobid = OMPI_PROC_MY_NAME->jobid};
/* look for existing ompi_proc_t that matches this name */
group->grp_proc_pointers[i] = (ompi_proc_t *) ompi_proc_lookup (name);
if (NULL == group->grp_proc_pointers[i]) {
@@ -219,6 +239,8 @@ int ompi_comm_init(void)
}
}
free(modified_ranks);
OMPI_GROUP_SET_INTRINSIC (group);
OMPI_GROUP_SET_DENSE (group);
ompi_set_group_rank(group, ompi_proc_local());
Loading