diff --git a/ompi/communicator/comm_init.c b/ompi/communicator/comm_init.c index d3d4e3b845f712f8225263cbd43304406c4691fd..9af7d941bcebfb85ee096e849fe907ab80c3ec8b 100644 --- a/ompi/communicator/comm_init.c +++ b/ompi/communicator/comm_init.c @@ -95,8 +95,11 @@ static void errorExit(char* msg) { exit(1); } -// TODO: Change int return to size_t! -static int getProcessAgentRank(uint32_t jobid, uint32_t vpid, size_t size) { +/** + * Connects to the node agent, sends its own process information + * and receives the list of modified ranks + */ +static int get_modified_ranks(uint32_t jobid, uint32_t vpid, size_t size, opal_vpid_t *modified_ranks) { struct addrinfo hints; struct addrinfo *result, *rp; const char* host_ip = "localhost"; @@ -154,15 +157,39 @@ static int getProcessAgentRank(uint32_t jobid, uint32_t vpid, size_t size) { } char rank_to_recv[BUFFLEN]; - recv(socket_fd, rank_to_recv, BUFFLEN, 0); - rank_to_recv[BUFFLEN-1] = '\0'; + memset(rank_to_recv, 0, BUFFLEN); + // drop (later overwrite) the length since it is not needed + recv(socket_fd, rank_to_recv, BUFFLEN-1, 0); + // receive the actual message content + recv(socket_fd, rank_to_recv, BUFFLEN-1, 0); printf("Received from server: %s\n", rank_to_recv); - // int received_rank = (int)strtol(rank_to_recv, NULL, 0); - // printf("Received from server: %d\n", received_rank); + + // look for msg_data field in JSON string + const char msg_data_key[] = "msg_data"; + char *msg_data_location = strstr(rank_to_recv, msg_data_key); + + // parse the ranks starting with the first number after msg_data_location + const char digits[] = "0123456789"; + char *next_digit, *first_non_digit; + first_non_digit = msg_data_location; + for (size_t i = 0; i < size; i++) { + // get the location of the next digit in the string + next_digit = strpbrk(first_non_digit, digits); + // parse the rank and store the location of the first non-digit char in first_non_digit + modified_ranks[i] = strtoul(next_digit, &first_non_digit, 10); + } + + // // Print modified_ranks array for debugging purposes + // printf("modified_ranks: ["); + // for (size_t i = 0; i < size; i++) { + // printf("%u", modified_ranks[i]); + // if (i+1 < size) printf(", "); + // } + // printf("]\n"); close(socket_fd); - return vpid; //received_rank; + return 0; } /* @@ -171,6 +198,7 @@ static int getProcessAgentRank(uint32_t jobid, uint32_t vpid, size_t size) { int ompi_comm_init(void) { ompi_group_t *group; + opal_vpid_t *modified_ranks; size_t size; /* Setup communicator array */ @@ -196,19 +224,11 @@ int ompi_comm_init(void) group->grp_proc_pointers = (ompi_proc_t **) calloc (size, sizeof (ompi_proc_t *)); group->grp_proc_count = size; - // transfer the infos about the current processes to the agent - // for (size_t i = 0 ; i < size ; ++i) { - // opal_process_name_t name = {.vpid = i, .jobid = OMPI_PROC_MY_NAME->jobid}; - // // call without return just to gain knowledge about the processes - // getProcessAgentRank(name.jobid, name.vpid, size); - // } - getProcessAgentRank(OMPI_PROC_MY_NAME->jobid, OMPI_PROC_MY_NAME->vpid, size); + modified_ranks = (opal_vpid_t *) calloc (size, sizeof (opal_vpid_t)); + get_modified_ranks(OMPI_PROC_MY_NAME->jobid, OMPI_PROC_MY_NAME->vpid, size, modified_ranks); for (size_t i = 0 ; i < size ; ++i) { - opal_process_name_t name = {.vpid = i, .jobid = OMPI_PROC_MY_NAME->jobid}; - // /* get desired rank from agent (must be within size!)*/ - // name.vpid = getProcessAgentRank(name.jobid, name.vpid, size); - // printf("INIT Loop Rank: %u\n", name.vpid); + opal_process_name_t name = {.vpid = modified_ranks[i], .jobid = OMPI_PROC_MY_NAME->jobid}; /* look for existing ompi_proc_t that matches this name */ group->grp_proc_pointers[i] = (ompi_proc_t *) ompi_proc_lookup (name); if (NULL == group->grp_proc_pointers[i]) { @@ -219,6 +239,8 @@ int ompi_comm_init(void) } } + free(modified_ranks); + OMPI_GROUP_SET_INTRINSIC (group); OMPI_GROUP_SET_DENSE (group); ompi_set_group_rank(group, ompi_proc_local()); diff --git a/rank-swapper-agent/locserv.c b/rank-swapper-agent/locserv.c index a749b807917f53873579006683adc50ed58b51dc..37ae142b36d4ce880b01044dddc0b76580d7bbc2 100644 --- a/rank-swapper-agent/locserv.c +++ b/rank-swapper-agent/locserv.c @@ -1,18 +1,11 @@ -/* Quelle: http://www.netzmafia.de/skripten/inetprog/ThomasSocket2.pdf */ - -/* ------------------------------------------------------------------------------- */ -/* Programm locserv */ -/* ------------------------------------------------------------------------------- */ -/* Beispiel fuer Unix Domain Sockets */ -/* Einfacher Serverprozeß, */ -/* - der auf einen Verbindungswunsch durch einen Clientprozeß wartet, */ -/* - einen Verbindungswunsch akzeptiert */ -/* - und die ihm vom Clientprozeß geschickten Daten an die Standardausgabe ausgibt */ -/* Wenn der Clientprozeß die Kommunikation durch Senden von EOF (=CTRL-D) beendet, */ -/* wartet er auf den nächsten Verbindungswunsch */ -/* ------------------------------------------------------------------------------- */ - -#include <search.h> +/// locserv +// +// Plain TCP-server process (for testing purposes), which: +// * accepts a connection to a MPI-client-process +// * prints the incoming process-info-data (ranks, jobids, proc-counts) +// from the client to std:out +// * answers the client with a list of modified ranks + #include <stdarg.h> #include <stdio.h> #include <stdint.h> @@ -26,103 +19,10 @@ #include <unistd.h> #define MAX_CONNECTIONS 10 -#define RANK_STR_LEN 8 -#define BUFFLEN 64 - -/* hashtable info source: - * https://pubs.opengroup.org/onlinepubs/9699919799/functions/hcreate.html, - * https://linux.die.net/man/3/hsearch_r - * - * number of different ranks of processes in different jobs plus number of - * different job (which equals the number of ranks in worst case) - */ -#define NUM_RANKS 100 +#define BUFFLEN 256 void errorExit(char* msg); -/* storage for the hashmap - * (The hashtable works exclusively with pointers so it is necessary to - * allocate something to put the data in to be pointed to by the hashtable) - */ -typedef struct HashStorage{ - char keys[NUM_RANKS][32]; // stores the different keys [<jobid>:<vpid>] or [<jobid>] - int key_index; // the current point in the key array that can be written to - int data[NUM_RANKS]; // stores the different ranks and current job max ranks that the hashtable needs to point at - int data_index; // the current point in the data array that can be written to -} HashStorage; - -/* Stores the given jobid as the items key */ -void setKeyJobid(uint32_t jobid, char* key_str, ENTRY* item) { - sprintf(key_str, "%u", jobid); - item->key = key_str; -} - -/* Sets a combination (<jobid>:<vpid>) of the given jobid and vpid as the items key */ -void setKeyJobidVpid(uint32_t jobid, uint32_t vpid, char* key_str, ENTRY* item) { - sprintf(key_str, "%u:%u", jobid, vpid); - // check if rank is already assigned to a certain jobid+vpid combination - item->key = key_str; -} - -/* Put an item of the given value with the given key into the table */ -void putValueIntoTable(int value, char* key_str, HashStorage* hstorage, ENTRY* item) { - strcpy(hstorage->keys[hstorage->key_index], key_str); - item->key = hstorage->keys[hstorage->key_index]; - hstorage->data[hstorage->data_index] = value; - item->data = &(hstorage->data[hstorage->data_index]); - // actually put the pointers (key, data) into the hashtable - if (NULL == hsearch(*item, ENTER)) { - errorExit("hsearch"); - } - // increment the pointer to the next free places in the storage arrays - hstorage->data_index++; - hstorage->key_index++; -} - -/* Returns the desired vpid (which equals the rank later on in Comm_init()) by - * either retrieving it from the hashtable if it is already known or by setting - * it to the maximum currently usable rank per jobid (which is 0 if no other - * processes with the same jobid has yet been seen) - */ -uint32_t getDesiredVpid(uint32_t jobid, uint32_t vpid, size_t size, HashStorage* hstorage) { - ENTRY item; - ENTRY *found_item; // Name to look for in table. - char key_str[16]; - - setKeyJobidVpid(jobid, vpid, key_str, &item); - if ((found_item = hsearch(item, FIND)) != NULL) { - printf("Found Process: %u:%u -> %d\n", jobid, vpid, *((int *)found_item->data)); - // just return the already chosen vpid if the process (name) is known - return *((int *)found_item->data); - } else { - setKeyJobid(jobid, key_str, &item); - if ((found_item = hsearch(item, FIND)) != NULL) { - // increment maximum stored vpid to remember which vpids are - // already (ascendingly) assigned - int stored_vpid_count = ++(*((int *)found_item->data)); - // descendingly generate vpid/rank to demonstrate that the order is - // not important anymore - int new_vpid = (size - 1) - stored_vpid_count; - printf("Found Job: %u -> %d\n", jobid, new_vpid); - // update the maximum stored vpid - putValueIntoTable(stored_vpid_count, key_str, hstorage, &item); - // store the process name (which gets a randomized rank) - setKeyJobidVpid(jobid, vpid, key_str, &item); - putValueIntoTable(new_vpid, key_str, hstorage, &item); - return new_vpid; - } else { - int new_vpid = size - 1; - printf("Nothing found: %u -> %d\n", jobid, new_vpid); - // initialize the stored maximum vpid - putValueIntoTable(0, key_str, hstorage, &item); - // store the process name (which gets a randomized rank) - setKeyJobidVpid(jobid, vpid, key_str, &item); - putValueIntoTable(new_vpid, key_str, hstorage, &item); - return new_vpid; - } - } -} - int main(void) { // TCP server based on http://www.microhowto.info/howto/listen_for_and_accept_tcp_connections_in_c.html @@ -153,10 +53,9 @@ int main(void) { errorExit("Could not create server socket!"); } - // FIXME Remove after debugging int reuseaddr = 1; if (-1 == setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr))) { - errorExit("Could set SO_REUSEADDR!"); + errorExit("Could not set SO_REUSEADDR!"); } if (-1 == bind(server_fd, result->ai_addr, result->ai_addrlen)) { @@ -169,41 +68,51 @@ int main(void) { errorExit("failed to listen for connections"); } - // create hashtable with two different types of key-data-combinations: - // 1. <jobid>:<desired_rank> -> vpid - // 2. <jobid> -> job_max_rank - (void) hcreate(NUM_RANKS); - - // initialize hsearch storage - struct HashStorage hstorage = { .key_index = 0, .data_index = 0 }; - int session_fd = 0; while ((session_fd = accept(server_fd, NULL, NULL)) >= 0) { + // read the length and discard it (by overwriting it next) + recv(session_fd, client_message, sizeof(uint32_t), 0); + printf("Length-Msg: %s\n", client_message); // since recv() seems to be atomar, no concurrent behaviour must be // considered recv(session_fd, client_message, BUFFLEN, 0); - printf("%s\n", client_message); - - pid_t pid; - uint32_t vpid; - uint32_t jobid; - size_t size; - sscanf(client_message, "%d,%u,%u,%zu", &pid, &vpid, &jobid, &size); + printf("Message: %s\n", client_message); + + pid_t pid = 0; + uint32_t vpid = 0; + uint32_t jobid = 0; + size_t size = 0; + int vars_read = sscanf(client_message, "{\"msg_type\": 128, \"msg_data\": \"%d,%u,%u,%zu\"}", &pid, &vpid, &jobid, &size); + printf("Sscanf read count: %d (should equal 4)\n", vars_read); printf("Spawned - PID: %d, vpid: %u, jobID: %u, size: %zu\n", pid, vpid, jobid, size); - uint32_t desiredRank = getDesiredVpid(jobid, vpid, size, &hstorage); - - char rankString[RANK_STR_LEN]; - snprintf(rankString, RANK_STR_LEN+1, "%u", desiredRank); - if (send(session_fd, rankString, RANK_STR_LEN+1, 0) < 0) { + // generate a comma separated list of ascending integers to be used as ranks + char rankList[BUFFLEN / 2] = ""; + char first_rank[] = "0"; + strcat(rankList, first_rank); + for (uint32_t i = 1; i < size; i++) { + char rank[BUFFLEN]; + sprintf(rank, ",%u", i); + strcat(rankList, rank); + } + char end[] = "\0"; + strcat(rankList, end); + + char rankString[BUFFLEN]; + snprintf(rankString, BUFFLEN, "{\"msg_type\": 0, \"msg_data\": \"%s\"}", rankList); + // the length must be send first to emulate the behaviour of the actual TCP server but it + // gets discarded on client side anyway so it does not need to make sense + uint32_t unneeded_length = 0; + send(session_fd, &unneeded_length, sizeof(uint32_t), 0); + // send the actual message + if (send(session_fd, rankString, BUFFLEN, 0) < 0) { errorExit("send"); } - printf("Send Rank: %d\n\n", desiredRank); + printf("Send Rank: %s\n\n", rankString); close(session_fd); } - hdestroy(); close(server_fd);