diff --git a/rank-swapper-agent/.gitignore b/rank-swapper-agent/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..f639b59cf707e0be382089316782ac90f2c4bc46 --- /dev/null +++ b/rank-swapper-agent/.gitignore @@ -0,0 +1,4 @@ +socket +hello +locserv +*.o diff --git a/rank-swapper-agent/Makefile b/rank-swapper-agent/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..7aefe6ec5f0320ff57f1066ca4fe63c46a9ddcaa --- /dev/null +++ b/rank-swapper-agent/Makefile @@ -0,0 +1,25 @@ +export OMPI := /<path-to-my-openmpi-installation>/openmpi-install +export PATH := $(PATH):$(OMPI)/bin +export LD_LIBRARY_PATH := $(LD_LIBRARY_PATH):$(OMPI)/bin +# requirement: GNU make, see https://unix.stackexchange.com/questions/11530/adding-directory-to-path-through-makefile/261844#261844 + +CC = gcc +SRC = locserv hello_c +OBJ = $(SRC:.c=.o) +CFLAGS = -W -Wall -Wextra -Wpedantic -pedantic + +.PHONY: clean + +all: locserv hello + +com_util.o: com_util.c + $(CC) $(CFLAGS) -c $< + +locserv: locserv.c com_util.o + $(CC) $(CFLAGS) -o $@ $< com_util.o + +hello: hello_c.c com_util.o + mpicc -o $@ $< com_util.o + +clean: + rm -f $(OBJ) com_util.o diff --git a/rank-swapper-agent/README.md b/rank-swapper-agent/README.md new file mode 100644 index 0000000000000000000000000000000000000000..d11311f35cec794be9ef2de5320e64d248a733fe --- /dev/null +++ b/rank-swapper-agent/README.md @@ -0,0 +1,19 @@ +# OpenMPI Rank Swapper Agent + +0. Set the required environment variable `OMPI` which is most probably the path to your directory `openmpi-install` where the current OpenMPI-installation resides (just replace `<path-to-my-openmpi-installation>`). You can export it either in the `Makefile` (if a real MPI program should be run) or in the `client.sh` (if the test client should be run) or in your current shell if you do not mind repeating that for different shell instances. + +1. Compile everything: +```shell +make all +``` + +2. Starting the Agent/Server: +```shell +./server.sh +``` + +3. Starting the OpenMPI-Client (other MPI-Programs should also suffice): +```shell +./client.sh +``` + diff --git a/rank-swapper-agent/client.sh b/rank-swapper-agent/client.sh new file mode 100755 index 0000000000000000000000000000000000000000..cb1d85889e5ee0f9657cf4452d3b32ee04aa9eef --- /dev/null +++ b/rank-swapper-agent/client.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +export OMPI=/<path-to-my-openmpi-installation>/openmpi-install +export PATH=$PATH:$OMPI/bin +export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$OMPI/bin + +PROCESSES=1 + +HOSTFILE="--hostfile hostfile.txt" +ENABLE_HYPERTHREADING="--bind-to hwthread" +ALLOW_OVERLOAD="--bind-to core:overload-allowed" +ENABLE_OVERSUBSCRIBE="--map-by :OVERSUBSCRIBE" +# --bind-to hwthread: makes the usage of threads instead of just cores possible +mpirun --np $PROCESSES $ALLOW_OVERLOAD $ENABLE_OVERSUBSCRIBE $HOSTFILE hello diff --git a/rank-swapper-agent/com_util.c b/rank-swapper-agent/com_util.c new file mode 100644 index 0000000000000000000000000000000000000000..5a2a40f1c1241d2d3c45aaa64fcd08aae8608139 --- /dev/null +++ b/rank-swapper-agent/com_util.c @@ -0,0 +1,23 @@ +/* Quelle: http://www.netzmafia.de/skripten/inetprog/ThomasSocket2.pdf */ + +/* ------------------------------------------------------------------------------ */ +/* C-Quell-Modul com_util.c */ +/* ------------------------------------------------------------------------------ */ +/* Hilfsfunktionen, die in den Programmen für locserv und loccli */ +/* eingesetzt werden */ +/* ------------------------------------------------------------------------------ */ + +#include <unistd.h> +#include <stdio.h> +#include <stdlib.h> + +/* ------------------------------------------------------------------------------ */ +/* Ausgabe einer Meldung msg, gefolgt von der Fehlermeldung, die dem in errno */ +/* gespeicherten Fehlercode entspricht */ +/* anschließende Beendigung des Programms */ + +void errorExit(char* msg) { + perror(msg); + exit(1); +} + diff --git a/rank-swapper-agent/hello_c.c b/rank-swapper-agent/hello_c.c new file mode 100644 index 0000000000000000000000000000000000000000..d8b3e6111762a2a5f8010d7eda71b1b6c37faddc --- /dev/null +++ b/rank-swapper-agent/hello_c.c @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2004-2006 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2006 Cisco Systems, Inc. All rights reserved. + * + * Sample MPI "hello world" application in C + */ + +#include <stdlib.h> +#include <unistd.h> +#include <stdio.h> +#include <sys/socket.h> +#include <sys/un.h> + +#include "mpi.h" + +#define SOCKET_PATH "/home/beach/Dokumente/Uni/Informatik_BA/Bachelorarbeit_MPI/sockets_mpi_test/socket" +#define BUFFLEN 64 + +/* Spawn Modes (dynamic job spawning): + * 1: Spawn just one process (in one job) + * 2: Spawn 2 processes in 2 different jobs + * 3: Spawn 2 prcoesses in one (shared) job + * 0 or other: Do not spawn a dynamic process/job + */ +#define SPAWN_MODE 0 + +void errorExit(char* msg); + +/*void notifyProcessAgent(pid_t pid, int rank, const char *eventInfo) { + struct sockaddr_un strAddr; + socklen_t lenAddr; + int fdSock; + + if ((fdSock=socket(PF_UNIX, SOCK_STREAM, 0)) < 0) { + errorExit("socket"); + } + + strAddr.sun_family=AF_LOCAL; // Unix domain + strcpy(strAddr.sun_path, SOCKET_PATH); + lenAddr=sizeof(strAddr.sun_family)+strlen(strAddr.sun_path); + if (connect(fdSock, (struct sockaddr*)&strAddr, lenAddr) !=0 ) { + errorExit("connect"); + } + + char info2Send[BUFFLEN]; + snprintf(info2Send, BUFFLEN+1, "%s: %d, %d", eventInfo, pid, rank); + if (send(fdSock, info2Send, BUFFLEN+1, 0) < 0) { + errorExit("send"); + } + printf("\nData send!\n"); + + char rank2Recv[BUFFLEN]; + recv(fdSock, rank2Recv, BUFFLEN+1, 0); + int receivedNumber = (int)strtol(rank2Recv, NULL, 0); + printf("Received from server: %d\n", receivedNumber); + + close(fdSock); +}*/ + +int main(int argc, char* argv[]) { + + int rank, size, len; + pid_t pid; + char version[MPI_MAX_LIBRARY_VERSION_STRING]; + + MPI_Init(&argc, &argv); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &size); + + pid = getpid(); + + //notifyProcessAgent(pid, rank, "Spawned"); + + printf("Hello, world, I am %d of %d, PID: %d\n", rank, size, pid); + + // dynamically spawn child process + // https://mpi.deino.net/mpi_functions/MPI_Comm_spawn.html + if (1 == SPAWN_MODE) { + + int np = 1; + int errcodes[1]; + MPI_Comm parentcomm, intercomm; + MPI_Comm_get_parent( &parentcomm ); + if (parentcomm == MPI_COMM_NULL) { + MPI_Comm_spawn( "hello", MPI_ARGV_NULL, np, MPI_INFO_NULL, 0, + MPI_COMM_WORLD, &intercomm, errcodes ); + printf("I'm the parent.\n"); + } else { + printf("I'm the spawned.\n"); + } + if (0 != errcodes[0]) { + printf("ERROR_SPAWN: code: %d\n", errcodes[0]); + } + + // (instead of Comm_multiple) spawns a second, different intercommunicator + } else if (2 == SPAWN_MODE) { + + int np = 1; + int errcodes[1]; + MPI_Comm parentcomm, intercomm; + MPI_Comm_get_parent( &parentcomm ); + if (parentcomm == MPI_COMM_NULL) { + for (int i = 0; i < 2; i++) { + MPI_Comm_spawn( "hello", MPI_ARGV_NULL, np, MPI_INFO_NULL, 0, + MPI_COMM_WORLD, &intercomm, errcodes ); + if (0 != errcodes[0]) { + printf("ERROR_SPAWN: code: %d\n", errcodes[0]); + } + } + printf("I'm the parent.\n"); + } else { + printf("I'm the spawned.\n"); + } + + } else if (3 == SPAWN_MODE) { + + int np[2] = { 1, 1 }; + int errcodes[2]; + MPI_Comm parentcomm, intercomm; + char *cmds[2] = { "hello", "hello" }; + MPI_Info infos[2] = { MPI_INFO_NULL, MPI_INFO_NULL }; + MPI_Comm_get_parent( &parentcomm ); + if (parentcomm == MPI_COMM_NULL) { + // Create n more processes using the "hello" executable + MPI_Comm_spawn_multiple(2, cmds, MPI_ARGVS_NULL, np, infos, 0, MPI_COMM_WORLD, &intercomm, errcodes ); + printf("I'm the parent.\n"); + } else { + printf("I'm the spawned.\n"); + } + for (int i = 0; i < 2; i++) { + if (0 != errcodes[i]) { + printf("ERROR_SPAWN: code: %d\n", errcodes[i]); + } + } + } + + sleep(5); + + MPI_Finalize(); + + //notifyProcessAgent(pid, rank, "Ended"); + + return 0; +} + diff --git a/rank-swapper-agent/hostfile.txt b/rank-swapper-agent/hostfile.txt new file mode 100644 index 0000000000000000000000000000000000000000..69990516239c4520dc38c1451f54565aa29dc508 --- /dev/null +++ b/rank-swapper-agent/hostfile.txt @@ -0,0 +1 @@ +localhost slots=4 max-slots=4 diff --git a/rank-swapper-agent/locserv.c b/rank-swapper-agent/locserv.c new file mode 100644 index 0000000000000000000000000000000000000000..828ae8e66770e6e0ae51300adc1c7b903303a601 --- /dev/null +++ b/rank-swapper-agent/locserv.c @@ -0,0 +1,189 @@ +/* Quelle: http://www.netzmafia.de/skripten/inetprog/ThomasSocket2.pdf */ + +/* ------------------------------------------------------------------------------- */ +/* Programm locserv */ +/* ------------------------------------------------------------------------------- */ +/* Beispiel fuer Unix Domain Sockets */ +/* Einfacher Serverprozeß, */ +/* - der auf einen Verbindungswunsch durch einen Clientprozeß wartet, */ +/* - einen Verbindungswunsch akzeptiert */ +/* - und die ihm vom Clientprozeß geschickten Daten an die Standardausgabe ausgibt */ +/* Wenn der Clientprozeß die Kommunikation durch Senden von EOF (=CTRL-D) beendet, */ +/* wartet er auf den nächsten Verbindungswunsch */ +/* ------------------------------------------------------------------------------- */ + +#include <search.h> +#include <stdarg.h> +#include <stdio.h> +#include <stdint.h> +#include <string.h> +#include <stdlib.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <time.h> +#include <unistd.h> + +#define SOCKET_PATH "/home/beach/Dokumente/Uni/Informatik_BA/Bachelorarbeit_MPI/sockets_mpi_test/socket" +#define MAX_CONNECTIONS 10 +#define RANK_STR_LEN 8 +#define BUFFLEN 64 + +/* hashtable info source: + * https://pubs.opengroup.org/onlinepubs/9699919799/functions/hcreate.html, + * https://linux.die.net/man/3/hsearch_r + * + * number of different ranks of processes in different jobs plus number of + * different job (which equals the number of ranks in worst case) + */ +#define NUM_RANKS 100 + +void errorExit(char* msg); + +/* storage for the hashmap + * (The hashtable works exclusively with pointers so it is necessary to + * allocate something to put the data in to be pointed to by the hashtable) + */ +typedef struct HashStorage{ + char keys[NUM_RANKS][32]; // stores the different keys [<jobid>:<vpid>] or [<jobid>] + int key_index; // the current point in the key array that can be written to + int data[NUM_RANKS]; // stores the different ranks and current job max ranks that the hashtable needs to point at + int data_index; // the current point in the data array that can be written to +} HashStorage; + +/* Stores the given jobid as the items key */ +void setKeyJobid(uint32_t jobid, char* key_str, ENTRY* item) { + sprintf(key_str, "%u", jobid); + item->key = key_str; +} + +/* Sets a combination (<jobid>:<vpid>) of the given jobid and vpid as the items key */ +void setKeyJobidVpid(uint32_t jobid, uint32_t vpid, char* key_str, ENTRY* item) { + sprintf(key_str, "%u:%u", jobid, vpid); + // check if rank is already assigned to a certain jobid+vpid combination + item->key = key_str; +} + +/* Put an item of the given value with the given key into the table */ +void putValueIntoTable(int value, char* key_str, HashStorage* hstorage, ENTRY* item) { + strcpy(hstorage->keys[hstorage->key_index], key_str); + item->key = hstorage->keys[hstorage->key_index]; + hstorage->data[hstorage->data_index] = value; + item->data = &(hstorage->data[hstorage->data_index]); + // actually put the pointers (key, data) into the hashtable + if (NULL == hsearch(*item, ENTER)) { + errorExit("hsearch"); + } + // increment the pointer to the next free places in the storage arrays + hstorage->data_index++; + hstorage->key_index++; +} + +/* Returns the desired vpid (which equals the rank later on in Comm_init()) by + * either retrieving it from the hashtable if it is already known or by setting + * it to the maximum currently usable rank per jobid (which is 0 if no other + * processes with the same jobid has yet been seen) + */ +uint32_t getDesiredVpid(uint32_t jobid, uint32_t vpid, size_t size, HashStorage* hstorage) { + ENTRY item; + ENTRY *found_item; // Name to look for in table. + char key_str[16]; + + setKeyJobidVpid(jobid, vpid, key_str, &item); + if ((found_item = hsearch(item, FIND)) != NULL) { + printf("Found Process: %u:%u -> %d\n", jobid, vpid, *((int *)found_item->data)); + // just return the already chosen vpid if the process (name) is known + return *((int *)found_item->data); + } else { + setKeyJobid(jobid, key_str, &item); + if ((found_item = hsearch(item, FIND)) != NULL) { + // increment maximum stored vpid to remember which vpids are + // already (ascendingly) assigned + int stored_vpid_count = ++(*((int *)found_item->data)); + // descendingly generate vpid/rank to demonstrate that the order is + // not important anymore + int new_vpid = (size - 1) - stored_vpid_count; + printf("Found Job: %u -> %d\n", jobid, new_vpid); + // update the maximum stored vpid + putValueIntoTable(stored_vpid_count, key_str, hstorage, &item); + // store the process name (which gets a randomized rank) + setKeyJobidVpid(jobid, vpid, key_str, &item); + putValueIntoTable(new_vpid, key_str, hstorage, &item); + return new_vpid; + } else { + int new_vpid = size - 1; + printf("Nothing found: %u -> %d\n", jobid, new_vpid); + // initialize the stored maximum vpid + putValueIntoTable(0, key_str, hstorage, &item); + // store the process name (which gets a randomized rank) + setKeyJobidVpid(jobid, vpid, key_str, &item); + putValueIntoTable(new_vpid, key_str, hstorage, &item); + return new_vpid; + } + } +} + +int main(void) { + + char client_message[BUFFLEN]; + + struct sockaddr_un strAddr; + socklen_t lenAddr; + int fdSock; + int fdConn; + + if ((fdSock=socket(PF_UNIX, SOCK_STREAM, 0)) < 0) { + errorExit("socket"); + } + + unlink (SOCKET_PATH); /* Sicherstellung, daß SOCKET_PATH nicht existiert */ + strAddr.sun_family=AF_LOCAL; /* Unix Domain */ + strcpy(strAddr.sun_path, SOCKET_PATH); + lenAddr=sizeof(strAddr.sun_family)+strlen(strAddr.sun_path); + if (bind(fdSock, (struct sockaddr*)&strAddr, lenAddr) != 0) { + errorExit("bind"); + } + + if (listen(fdSock, MAX_CONNECTIONS) != 0) { + errorExit("listen"); + } + + // create hashtable with two different types of key-data-combinations: + // 1. <jobid>:<desired_rank> -> vpid + // 2. <jobid> -> job_max_rank + (void) hcreate(NUM_RANKS); + + // initialize hsearch storage + struct HashStorage hstorage = { .key_index = 0, .data_index = 0 }; + + while ((fdConn=accept(fdSock, (struct sockaddr*)&strAddr, &lenAddr)) >= 0) { + + // since recv() seems to be atomar, no concurrent behaviour must be + // considered + recv(fdConn , client_message , BUFFLEN , 0); + printf("%s\n", client_message); + + pid_t pid; + uint32_t vpid; + uint32_t jobid; + size_t size; + sscanf(client_message, "Spawned - PID: %d, vpid: %u, jobID: %u, size: %zu", &pid, &vpid, &jobid, &size); + //printf("\nCheck: PID: %d, vpid: %u, jobID: %u, size: %zu", pid, vpid, jobid, size); + + uint32_t desiredRank = getDesiredVpid(jobid, vpid, size, &hstorage); + + char rankString[RANK_STR_LEN]; + snprintf(rankString, RANK_STR_LEN+1, "%u", desiredRank); + if (send(fdConn, rankString, RANK_STR_LEN+1, 0) < 0) { + errorExit("send"); + } + printf("Send Rank: %d\n\n", desiredRank); + + close(fdConn); + } + hdestroy(); + + close(fdSock); + + return 0; +} + diff --git a/rank-swapper-agent/server.sh b/rank-swapper-agent/server.sh new file mode 100755 index 0000000000000000000000000000000000000000..2f2bdcdf5cad1c7f6d55370f8b93d44e3523869a --- /dev/null +++ b/rank-swapper-agent/server.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +./locserv