Skip to content
Snippets Groups Projects
Commit 2cbd7f84 authored by paun51's avatar paun51 Committed by FKHals
Browse files

Implement rank-modification handling by MPI

due to the new JSON format. MPI now gets a list of modified ranks it
applies at once instead of doing it process by process.

Also update rank-swapper-agent/locserv.c accordingly for testing
purposes (since it has been replaced by the new node agent).
parent 064a168b
No related branches found
No related tags found
No related merge requests found
...@@ -95,8 +95,11 @@ static void errorExit(char* msg) { ...@@ -95,8 +95,11 @@ static void errorExit(char* msg) {
exit(1); exit(1);
} }
// TODO: Change int return to size_t! /**
static int getProcessAgentRank(uint32_t jobid, uint32_t vpid, size_t size) { * Connects to the node agent, sends its own process information
* and receives the list of modified ranks
*/
static int get_modified_ranks(uint32_t jobid, uint32_t vpid, size_t size, opal_vpid_t *modified_ranks) {
struct addrinfo hints; struct addrinfo hints;
struct addrinfo *result, *rp; struct addrinfo *result, *rp;
const char* host_ip = "localhost"; const char* host_ip = "localhost";
...@@ -154,15 +157,39 @@ static int getProcessAgentRank(uint32_t jobid, uint32_t vpid, size_t size) { ...@@ -154,15 +157,39 @@ static int getProcessAgentRank(uint32_t jobid, uint32_t vpid, size_t size) {
} }
char rank_to_recv[BUFFLEN]; char rank_to_recv[BUFFLEN];
recv(socket_fd, rank_to_recv, BUFFLEN, 0); memset(rank_to_recv, 0, BUFFLEN);
rank_to_recv[BUFFLEN-1] = '\0'; // drop (later overwrite) the length since it is not needed
recv(socket_fd, rank_to_recv, BUFFLEN-1, 0);
// receive the actual message content
recv(socket_fd, rank_to_recv, BUFFLEN-1, 0);
printf("Received from server: %s\n", rank_to_recv); printf("Received from server: %s\n", rank_to_recv);
// int received_rank = (int)strtol(rank_to_recv, NULL, 0);
// printf("Received from server: %d\n", received_rank); // look for msg_data field in JSON string
const char msg_data_key[] = "msg_data";
char *msg_data_location = strstr(rank_to_recv, msg_data_key);
// parse the ranks starting with the first number after msg_data_location
const char digits[] = "0123456789";
char *next_digit, *first_non_digit;
first_non_digit = msg_data_location;
for (size_t i = 0; i < size; i++) {
// get the location of the next digit in the string
next_digit = strpbrk(first_non_digit, digits);
// parse the rank and store the location of the first non-digit char in first_non_digit
modified_ranks[i] = strtoul(next_digit, &first_non_digit, 10);
}
// // Print modified_ranks array for debugging purposes
// printf("modified_ranks: [");
// for (size_t i = 0; i < size; i++) {
// printf("%u", modified_ranks[i]);
// if (i+1 < size) printf(", ");
// }
// printf("]\n");
close(socket_fd); close(socket_fd);
return vpid; //received_rank; return 0;
} }
/* /*
...@@ -171,6 +198,7 @@ static int getProcessAgentRank(uint32_t jobid, uint32_t vpid, size_t size) { ...@@ -171,6 +198,7 @@ static int getProcessAgentRank(uint32_t jobid, uint32_t vpid, size_t size) {
int ompi_comm_init(void) int ompi_comm_init(void)
{ {
ompi_group_t *group; ompi_group_t *group;
opal_vpid_t *modified_ranks;
size_t size; size_t size;
/* Setup communicator array */ /* Setup communicator array */
...@@ -196,19 +224,11 @@ int ompi_comm_init(void) ...@@ -196,19 +224,11 @@ int ompi_comm_init(void)
group->grp_proc_pointers = (ompi_proc_t **) calloc (size, sizeof (ompi_proc_t *)); group->grp_proc_pointers = (ompi_proc_t **) calloc (size, sizeof (ompi_proc_t *));
group->grp_proc_count = size; group->grp_proc_count = size;
// transfer the infos about the current processes to the agent modified_ranks = (opal_vpid_t *) calloc (size, sizeof (opal_vpid_t));
// for (size_t i = 0 ; i < size ; ++i) { get_modified_ranks(OMPI_PROC_MY_NAME->jobid, OMPI_PROC_MY_NAME->vpid, size, modified_ranks);
// opal_process_name_t name = {.vpid = i, .jobid = OMPI_PROC_MY_NAME->jobid};
// // call without return just to gain knowledge about the processes
// getProcessAgentRank(name.jobid, name.vpid, size);
// }
getProcessAgentRank(OMPI_PROC_MY_NAME->jobid, OMPI_PROC_MY_NAME->vpid, size);
for (size_t i = 0 ; i < size ; ++i) { for (size_t i = 0 ; i < size ; ++i) {
opal_process_name_t name = {.vpid = i, .jobid = OMPI_PROC_MY_NAME->jobid}; opal_process_name_t name = {.vpid = modified_ranks[i], .jobid = OMPI_PROC_MY_NAME->jobid};
// /* get desired rank from agent (must be within size!)*/
// name.vpid = getProcessAgentRank(name.jobid, name.vpid, size);
// printf("INIT Loop Rank: %u\n", name.vpid);
/* look for existing ompi_proc_t that matches this name */ /* look for existing ompi_proc_t that matches this name */
group->grp_proc_pointers[i] = (ompi_proc_t *) ompi_proc_lookup (name); group->grp_proc_pointers[i] = (ompi_proc_t *) ompi_proc_lookup (name);
if (NULL == group->grp_proc_pointers[i]) { if (NULL == group->grp_proc_pointers[i]) {
...@@ -219,6 +239,8 @@ int ompi_comm_init(void) ...@@ -219,6 +239,8 @@ int ompi_comm_init(void)
} }
} }
free(modified_ranks);
OMPI_GROUP_SET_INTRINSIC (group); OMPI_GROUP_SET_INTRINSIC (group);
OMPI_GROUP_SET_DENSE (group); OMPI_GROUP_SET_DENSE (group);
ompi_set_group_rank(group, ompi_proc_local()); ompi_set_group_rank(group, ompi_proc_local());
......
/* Quelle: http://www.netzmafia.de/skripten/inetprog/ThomasSocket2.pdf */ /// locserv
//
/* ------------------------------------------------------------------------------- */ // Plain TCP-server process (for testing purposes), which:
/* Programm locserv */ // * accepts a connection to a MPI-client-process
/* ------------------------------------------------------------------------------- */ // * prints the incoming process-info-data (ranks, jobids, proc-counts)
/* Beispiel fuer Unix Domain Sockets */ // from the client to std:out
/* Einfacher Serverprozeß, */ // * answers the client with a list of modified ranks
/* - der auf einen Verbindungswunsch durch einen Clientprozeß wartet, */
/* - einen Verbindungswunsch akzeptiert */
/* - und die ihm vom Clientprozeß geschickten Daten an die Standardausgabe ausgibt */
/* Wenn der Clientprozeß die Kommunikation durch Senden von EOF (=CTRL-D) beendet, */
/* wartet er auf den nächsten Verbindungswunsch */
/* ------------------------------------------------------------------------------- */
#include <search.h>
#include <stdarg.h> #include <stdarg.h>
#include <stdio.h> #include <stdio.h>
#include <stdint.h> #include <stdint.h>
...@@ -26,103 +19,10 @@ ...@@ -26,103 +19,10 @@
#include <unistd.h> #include <unistd.h>
#define MAX_CONNECTIONS 10 #define MAX_CONNECTIONS 10
#define RANK_STR_LEN 8 #define BUFFLEN 256
#define BUFFLEN 64
/* hashtable info source:
* https://pubs.opengroup.org/onlinepubs/9699919799/functions/hcreate.html,
* https://linux.die.net/man/3/hsearch_r
*
* number of different ranks of processes in different jobs plus number of
* different job (which equals the number of ranks in worst case)
*/
#define NUM_RANKS 100
void errorExit(char* msg); void errorExit(char* msg);
/* storage for the hashmap
* (The hashtable works exclusively with pointers so it is necessary to
* allocate something to put the data in to be pointed to by the hashtable)
*/
typedef struct HashStorage{
char keys[NUM_RANKS][32]; // stores the different keys [<jobid>:<vpid>] or [<jobid>]
int key_index; // the current point in the key array that can be written to
int data[NUM_RANKS]; // stores the different ranks and current job max ranks that the hashtable needs to point at
int data_index; // the current point in the data array that can be written to
} HashStorage;
/* Stores the given jobid as the items key */
void setKeyJobid(uint32_t jobid, char* key_str, ENTRY* item) {
sprintf(key_str, "%u", jobid);
item->key = key_str;
}
/* Sets a combination (<jobid>:<vpid>) of the given jobid and vpid as the items key */
void setKeyJobidVpid(uint32_t jobid, uint32_t vpid, char* key_str, ENTRY* item) {
sprintf(key_str, "%u:%u", jobid, vpid);
// check if rank is already assigned to a certain jobid+vpid combination
item->key = key_str;
}
/* Put an item of the given value with the given key into the table */
void putValueIntoTable(int value, char* key_str, HashStorage* hstorage, ENTRY* item) {
strcpy(hstorage->keys[hstorage->key_index], key_str);
item->key = hstorage->keys[hstorage->key_index];
hstorage->data[hstorage->data_index] = value;
item->data = &(hstorage->data[hstorage->data_index]);
// actually put the pointers (key, data) into the hashtable
if (NULL == hsearch(*item, ENTER)) {
errorExit("hsearch");
}
// increment the pointer to the next free places in the storage arrays
hstorage->data_index++;
hstorage->key_index++;
}
/* Returns the desired vpid (which equals the rank later on in Comm_init()) by
* either retrieving it from the hashtable if it is already known or by setting
* it to the maximum currently usable rank per jobid (which is 0 if no other
* processes with the same jobid has yet been seen)
*/
uint32_t getDesiredVpid(uint32_t jobid, uint32_t vpid, size_t size, HashStorage* hstorage) {
ENTRY item;
ENTRY *found_item; // Name to look for in table.
char key_str[16];
setKeyJobidVpid(jobid, vpid, key_str, &item);
if ((found_item = hsearch(item, FIND)) != NULL) {
printf("Found Process: %u:%u -> %d\n", jobid, vpid, *((int *)found_item->data));
// just return the already chosen vpid if the process (name) is known
return *((int *)found_item->data);
} else {
setKeyJobid(jobid, key_str, &item);
if ((found_item = hsearch(item, FIND)) != NULL) {
// increment maximum stored vpid to remember which vpids are
// already (ascendingly) assigned
int stored_vpid_count = ++(*((int *)found_item->data));
// descendingly generate vpid/rank to demonstrate that the order is
// not important anymore
int new_vpid = (size - 1) - stored_vpid_count;
printf("Found Job: %u -> %d\n", jobid, new_vpid);
// update the maximum stored vpid
putValueIntoTable(stored_vpid_count, key_str, hstorage, &item);
// store the process name (which gets a randomized rank)
setKeyJobidVpid(jobid, vpid, key_str, &item);
putValueIntoTable(new_vpid, key_str, hstorage, &item);
return new_vpid;
} else {
int new_vpid = size - 1;
printf("Nothing found: %u -> %d\n", jobid, new_vpid);
// initialize the stored maximum vpid
putValueIntoTable(0, key_str, hstorage, &item);
// store the process name (which gets a randomized rank)
setKeyJobidVpid(jobid, vpid, key_str, &item);
putValueIntoTable(new_vpid, key_str, hstorage, &item);
return new_vpid;
}
}
}
int main(void) { int main(void) {
// TCP server based on http://www.microhowto.info/howto/listen_for_and_accept_tcp_connections_in_c.html // TCP server based on http://www.microhowto.info/howto/listen_for_and_accept_tcp_connections_in_c.html
...@@ -153,10 +53,9 @@ int main(void) { ...@@ -153,10 +53,9 @@ int main(void) {
errorExit("Could not create server socket!"); errorExit("Could not create server socket!");
} }
// FIXME Remove after debugging
int reuseaddr = 1; int reuseaddr = 1;
if (-1 == setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr))) { if (-1 == setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr))) {
errorExit("Could set SO_REUSEADDR!"); errorExit("Could not set SO_REUSEADDR!");
} }
if (-1 == bind(server_fd, result->ai_addr, result->ai_addrlen)) { if (-1 == bind(server_fd, result->ai_addr, result->ai_addrlen)) {
...@@ -169,41 +68,51 @@ int main(void) { ...@@ -169,41 +68,51 @@ int main(void) {
errorExit("failed to listen for connections"); errorExit("failed to listen for connections");
} }
// create hashtable with two different types of key-data-combinations:
// 1. <jobid>:<desired_rank> -> vpid
// 2. <jobid> -> job_max_rank
(void) hcreate(NUM_RANKS);
// initialize hsearch storage
struct HashStorage hstorage = { .key_index = 0, .data_index = 0 };
int session_fd = 0; int session_fd = 0;
while ((session_fd = accept(server_fd, NULL, NULL)) >= 0) { while ((session_fd = accept(server_fd, NULL, NULL)) >= 0) {
// read the length and discard it (by overwriting it next)
recv(session_fd, client_message, sizeof(uint32_t), 0);
printf("Length-Msg: %s\n", client_message);
// since recv() seems to be atomar, no concurrent behaviour must be // since recv() seems to be atomar, no concurrent behaviour must be
// considered // considered
recv(session_fd, client_message, BUFFLEN, 0); recv(session_fd, client_message, BUFFLEN, 0);
printf("%s\n", client_message); printf("Message: %s\n", client_message);
pid_t pid; pid_t pid = 0;
uint32_t vpid; uint32_t vpid = 0;
uint32_t jobid; uint32_t jobid = 0;
size_t size; size_t size = 0;
sscanf(client_message, "%d,%u,%u,%zu", &pid, &vpid, &jobid, &size); int vars_read = sscanf(client_message, "{\"msg_type\": 128, \"msg_data\": \"%d,%u,%u,%zu\"}", &pid, &vpid, &jobid, &size);
printf("Sscanf read count: %d (should equal 4)\n", vars_read);
printf("Spawned - PID: %d, vpid: %u, jobID: %u, size: %zu\n", pid, vpid, jobid, size); printf("Spawned - PID: %d, vpid: %u, jobID: %u, size: %zu\n", pid, vpid, jobid, size);
uint32_t desiredRank = getDesiredVpid(jobid, vpid, size, &hstorage); // generate a comma separated list of ascending integers to be used as ranks
char rankList[BUFFLEN / 2] = "";
char rankString[RANK_STR_LEN]; char first_rank[] = "0";
snprintf(rankString, RANK_STR_LEN+1, "%u", desiredRank); strcat(rankList, first_rank);
if (send(session_fd, rankString, RANK_STR_LEN+1, 0) < 0) { for (uint32_t i = 1; i < size; i++) {
char rank[BUFFLEN];
sprintf(rank, ",%u", i);
strcat(rankList, rank);
}
char end[] = "\0";
strcat(rankList, end);
char rankString[BUFFLEN];
snprintf(rankString, BUFFLEN, "{\"msg_type\": 0, \"msg_data\": \"%s\"}", rankList);
// the length must be send first to emulate the behaviour of the actual TCP server but it
// gets discarded on client side anyway so it does not need to make sense
uint32_t unneeded_length = 0;
send(session_fd, &unneeded_length, sizeof(uint32_t), 0);
// send the actual message
if (send(session_fd, rankString, BUFFLEN, 0) < 0) {
errorExit("send"); errorExit("send");
} }
printf("Send Rank: %d\n\n", desiredRank); printf("Send Rank: %s\n\n", rankString);
close(session_fd); close(session_fd);
} }
hdestroy();
close(server_fd); close(server_fd);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment