Skip to content
Snippets Groups Projects
Commit 771c9f42 authored by FKHals's avatar FKHals
Browse files

Merge commit '70778c68' as 'rank-swapper-agent'

parents ea69cc96 70778c68
Branches
No related tags found
No related merge requests found
socket
hello
locserv
*.o
export OMPI := /<path-to-my-openmpi-installation>/openmpi-install
export PATH := $(PATH):$(OMPI)/bin
export LD_LIBRARY_PATH := $(LD_LIBRARY_PATH):$(OMPI)/bin
# requirement: GNU make, see https://unix.stackexchange.com/questions/11530/adding-directory-to-path-through-makefile/261844#261844
CC = gcc
SRC = locserv hello_c
OBJ = $(SRC:.c=.o)
CFLAGS = -W -Wall -Wextra -Wpedantic -pedantic
.PHONY: clean
all: locserv hello
com_util.o: com_util.c
$(CC) $(CFLAGS) -c $<
locserv: locserv.c com_util.o
$(CC) $(CFLAGS) -o $@ $< com_util.o
hello: hello_c.c com_util.o
mpicc -o $@ $< com_util.o
clean:
rm -f $(OBJ) com_util.o
# OpenMPI Rank Swapper Agent
0. Set the required environment variable `OMPI` which is most probably the path to your directory `openmpi-install` where the current OpenMPI-installation resides (just replace `<path-to-my-openmpi-installation>`). You can export it either in the `Makefile` (if a real MPI program should be run) or in the `client.sh` (if the test client should be run) or in your current shell if you do not mind repeating that for different shell instances.
1. Compile everything:
```shell
make all
```
2. Starting the Agent/Server:
```shell
./server.sh
```
3. Starting the OpenMPI-Client (other MPI-Programs should also suffice):
```shell
./client.sh
```
#!/bin/bash
export OMPI=/<path-to-my-openmpi-installation>/openmpi-install
export PATH=$PATH:$OMPI/bin
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$OMPI/bin
PROCESSES=1
HOSTFILE="--hostfile hostfile.txt"
ENABLE_HYPERTHREADING="--bind-to hwthread"
ALLOW_OVERLOAD="--bind-to core:overload-allowed"
ENABLE_OVERSUBSCRIBE="--map-by :OVERSUBSCRIBE"
# --bind-to hwthread: makes the usage of threads instead of just cores possible
mpirun --np $PROCESSES $ALLOW_OVERLOAD $ENABLE_OVERSUBSCRIBE $HOSTFILE hello
/* Quelle: http://www.netzmafia.de/skripten/inetprog/ThomasSocket2.pdf */
/* ------------------------------------------------------------------------------ */
/* C-Quell-Modul com_util.c */
/* ------------------------------------------------------------------------------ */
/* Hilfsfunktionen, die in den Programmen für locserv und loccli */
/* eingesetzt werden */
/* ------------------------------------------------------------------------------ */
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
/* ------------------------------------------------------------------------------ */
/* Ausgabe einer Meldung msg, gefolgt von der Fehlermeldung, die dem in errno */
/* gespeicherten Fehlercode entspricht */
/* anschließende Beendigung des Programms */
void errorExit(char* msg) {
perror(msg);
exit(1);
}
/*
* Copyright (c) 2004-2006 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2006 Cisco Systems, Inc. All rights reserved.
*
* Sample MPI "hello world" application in C
*/
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <sys/socket.h>
#include <sys/un.h>
#include "mpi.h"
#define SOCKET_PATH "/home/beach/Dokumente/Uni/Informatik_BA/Bachelorarbeit_MPI/sockets_mpi_test/socket"
#define BUFFLEN 64
/* Spawn Modes (dynamic job spawning):
* 1: Spawn just one process (in one job)
* 2: Spawn 2 processes in 2 different jobs
* 3: Spawn 2 prcoesses in one (shared) job
* 0 or other: Do not spawn a dynamic process/job
*/
#define SPAWN_MODE 0
void errorExit(char* msg);
/*void notifyProcessAgent(pid_t pid, int rank, const char *eventInfo) {
struct sockaddr_un strAddr;
socklen_t lenAddr;
int fdSock;
if ((fdSock=socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
errorExit("socket");
}
strAddr.sun_family=AF_LOCAL; // Unix domain
strcpy(strAddr.sun_path, SOCKET_PATH);
lenAddr=sizeof(strAddr.sun_family)+strlen(strAddr.sun_path);
if (connect(fdSock, (struct sockaddr*)&strAddr, lenAddr) !=0 ) {
errorExit("connect");
}
char info2Send[BUFFLEN];
snprintf(info2Send, BUFFLEN+1, "%s: %d, %d", eventInfo, pid, rank);
if (send(fdSock, info2Send, BUFFLEN+1, 0) < 0) {
errorExit("send");
}
printf("\nData send!\n");
char rank2Recv[BUFFLEN];
recv(fdSock, rank2Recv, BUFFLEN+1, 0);
int receivedNumber = (int)strtol(rank2Recv, NULL, 0);
printf("Received from server: %d\n", receivedNumber);
close(fdSock);
}*/
int main(int argc, char* argv[]) {
int rank, size, len;
pid_t pid;
char version[MPI_MAX_LIBRARY_VERSION_STRING];
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
pid = getpid();
//notifyProcessAgent(pid, rank, "Spawned");
printf("Hello, world, I am %d of %d, PID: %d\n", rank, size, pid);
// dynamically spawn child process
// https://mpi.deino.net/mpi_functions/MPI_Comm_spawn.html
if (1 == SPAWN_MODE) {
int np = 1;
int errcodes[1];
MPI_Comm parentcomm, intercomm;
MPI_Comm_get_parent( &parentcomm );
if (parentcomm == MPI_COMM_NULL) {
MPI_Comm_spawn( "hello", MPI_ARGV_NULL, np, MPI_INFO_NULL, 0,
MPI_COMM_WORLD, &intercomm, errcodes );
printf("I'm the parent.\n");
} else {
printf("I'm the spawned.\n");
}
if (0 != errcodes[0]) {
printf("ERROR_SPAWN: code: %d\n", errcodes[0]);
}
// (instead of Comm_multiple) spawns a second, different intercommunicator
} else if (2 == SPAWN_MODE) {
int np = 1;
int errcodes[1];
MPI_Comm parentcomm, intercomm;
MPI_Comm_get_parent( &parentcomm );
if (parentcomm == MPI_COMM_NULL) {
for (int i = 0; i < 2; i++) {
MPI_Comm_spawn( "hello", MPI_ARGV_NULL, np, MPI_INFO_NULL, 0,
MPI_COMM_WORLD, &intercomm, errcodes );
if (0 != errcodes[0]) {
printf("ERROR_SPAWN: code: %d\n", errcodes[0]);
}
}
printf("I'm the parent.\n");
} else {
printf("I'm the spawned.\n");
}
} else if (3 == SPAWN_MODE) {
int np[2] = { 1, 1 };
int errcodes[2];
MPI_Comm parentcomm, intercomm;
char *cmds[2] = { "hello", "hello" };
MPI_Info infos[2] = { MPI_INFO_NULL, MPI_INFO_NULL };
MPI_Comm_get_parent( &parentcomm );
if (parentcomm == MPI_COMM_NULL) {
// Create n more processes using the "hello" executable
MPI_Comm_spawn_multiple(2, cmds, MPI_ARGVS_NULL, np, infos, 0, MPI_COMM_WORLD, &intercomm, errcodes );
printf("I'm the parent.\n");
} else {
printf("I'm the spawned.\n");
}
for (int i = 0; i < 2; i++) {
if (0 != errcodes[i]) {
printf("ERROR_SPAWN: code: %d\n", errcodes[i]);
}
}
}
sleep(5);
MPI_Finalize();
//notifyProcessAgent(pid, rank, "Ended");
return 0;
}
localhost slots=4 max-slots=4
/* Quelle: http://www.netzmafia.de/skripten/inetprog/ThomasSocket2.pdf */
/* ------------------------------------------------------------------------------- */
/* Programm locserv */
/* ------------------------------------------------------------------------------- */
/* Beispiel fuer Unix Domain Sockets */
/* Einfacher Serverprozeß, */
/* - der auf einen Verbindungswunsch durch einen Clientprozeß wartet, */
/* - einen Verbindungswunsch akzeptiert */
/* - und die ihm vom Clientprozeß geschickten Daten an die Standardausgabe ausgibt */
/* Wenn der Clientprozeß die Kommunikation durch Senden von EOF (=CTRL-D) beendet, */
/* wartet er auf den nächsten Verbindungswunsch */
/* ------------------------------------------------------------------------------- */
#include <search.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <time.h>
#include <unistd.h>
#define SOCKET_PATH "/home/beach/Dokumente/Uni/Informatik_BA/Bachelorarbeit_MPI/sockets_mpi_test/socket"
#define MAX_CONNECTIONS 10
#define RANK_STR_LEN 8
#define BUFFLEN 64
/* hashtable info source:
* https://pubs.opengroup.org/onlinepubs/9699919799/functions/hcreate.html,
* https://linux.die.net/man/3/hsearch_r
*
* number of different ranks of processes in different jobs plus number of
* different job (which equals the number of ranks in worst case)
*/
#define NUM_RANKS 100
void errorExit(char* msg);
/* storage for the hashmap
* (The hashtable works exclusively with pointers so it is necessary to
* allocate something to put the data in to be pointed to by the hashtable)
*/
typedef struct HashStorage{
char keys[NUM_RANKS][32]; // stores the different keys [<jobid>:<vpid>] or [<jobid>]
int key_index; // the current point in the key array that can be written to
int data[NUM_RANKS]; // stores the different ranks and current job max ranks that the hashtable needs to point at
int data_index; // the current point in the data array that can be written to
} HashStorage;
/* Stores the given jobid as the items key */
void setKeyJobid(uint32_t jobid, char* key_str, ENTRY* item) {
sprintf(key_str, "%u", jobid);
item->key = key_str;
}
/* Sets a combination (<jobid>:<vpid>) of the given jobid and vpid as the items key */
void setKeyJobidVpid(uint32_t jobid, uint32_t vpid, char* key_str, ENTRY* item) {
sprintf(key_str, "%u:%u", jobid, vpid);
// check if rank is already assigned to a certain jobid+vpid combination
item->key = key_str;
}
/* Put an item of the given value with the given key into the table */
void putValueIntoTable(int value, char* key_str, HashStorage* hstorage, ENTRY* item) {
strcpy(hstorage->keys[hstorage->key_index], key_str);
item->key = hstorage->keys[hstorage->key_index];
hstorage->data[hstorage->data_index] = value;
item->data = &(hstorage->data[hstorage->data_index]);
// actually put the pointers (key, data) into the hashtable
if (NULL == hsearch(*item, ENTER)) {
errorExit("hsearch");
}
// increment the pointer to the next free places in the storage arrays
hstorage->data_index++;
hstorage->key_index++;
}
/* Returns the desired vpid (which equals the rank later on in Comm_init()) by
* either retrieving it from the hashtable if it is already known or by setting
* it to the maximum currently usable rank per jobid (which is 0 if no other
* processes with the same jobid has yet been seen)
*/
uint32_t getDesiredVpid(uint32_t jobid, uint32_t vpid, size_t size, HashStorage* hstorage) {
ENTRY item;
ENTRY *found_item; // Name to look for in table.
char key_str[16];
setKeyJobidVpid(jobid, vpid, key_str, &item);
if ((found_item = hsearch(item, FIND)) != NULL) {
printf("Found Process: %u:%u -> %d\n", jobid, vpid, *((int *)found_item->data));
// just return the already chosen vpid if the process (name) is known
return *((int *)found_item->data);
} else {
setKeyJobid(jobid, key_str, &item);
if ((found_item = hsearch(item, FIND)) != NULL) {
// increment maximum stored vpid to remember which vpids are
// already (ascendingly) assigned
int stored_vpid_count = ++(*((int *)found_item->data));
// descendingly generate vpid/rank to demonstrate that the order is
// not important anymore
int new_vpid = (size - 1) - stored_vpid_count;
printf("Found Job: %u -> %d\n", jobid, new_vpid);
// update the maximum stored vpid
putValueIntoTable(stored_vpid_count, key_str, hstorage, &item);
// store the process name (which gets a randomized rank)
setKeyJobidVpid(jobid, vpid, key_str, &item);
putValueIntoTable(new_vpid, key_str, hstorage, &item);
return new_vpid;
} else {
int new_vpid = size - 1;
printf("Nothing found: %u -> %d\n", jobid, new_vpid);
// initialize the stored maximum vpid
putValueIntoTable(0, key_str, hstorage, &item);
// store the process name (which gets a randomized rank)
setKeyJobidVpid(jobid, vpid, key_str, &item);
putValueIntoTable(new_vpid, key_str, hstorage, &item);
return new_vpid;
}
}
}
int main(void) {
char client_message[BUFFLEN];
struct sockaddr_un strAddr;
socklen_t lenAddr;
int fdSock;
int fdConn;
if ((fdSock=socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
errorExit("socket");
}
unlink (SOCKET_PATH); /* Sicherstellung, daß SOCKET_PATH nicht existiert */
strAddr.sun_family=AF_LOCAL; /* Unix Domain */
strcpy(strAddr.sun_path, SOCKET_PATH);
lenAddr=sizeof(strAddr.sun_family)+strlen(strAddr.sun_path);
if (bind(fdSock, (struct sockaddr*)&strAddr, lenAddr) != 0) {
errorExit("bind");
}
if (listen(fdSock, MAX_CONNECTIONS) != 0) {
errorExit("listen");
}
// create hashtable with two different types of key-data-combinations:
// 1. <jobid>:<desired_rank> -> vpid
// 2. <jobid> -> job_max_rank
(void) hcreate(NUM_RANKS);
// initialize hsearch storage
struct HashStorage hstorage = { .key_index = 0, .data_index = 0 };
while ((fdConn=accept(fdSock, (struct sockaddr*)&strAddr, &lenAddr)) >= 0) {
// since recv() seems to be atomar, no concurrent behaviour must be
// considered
recv(fdConn , client_message , BUFFLEN , 0);
printf("%s\n", client_message);
pid_t pid;
uint32_t vpid;
uint32_t jobid;
size_t size;
sscanf(client_message, "Spawned - PID: %d, vpid: %u, jobID: %u, size: %zu", &pid, &vpid, &jobid, &size);
//printf("\nCheck: PID: %d, vpid: %u, jobID: %u, size: %zu", pid, vpid, jobid, size);
uint32_t desiredRank = getDesiredVpid(jobid, vpid, size, &hstorage);
char rankString[RANK_STR_LEN];
snprintf(rankString, RANK_STR_LEN+1, "%u", desiredRank);
if (send(fdConn, rankString, RANK_STR_LEN+1, 0) < 0) {
errorExit("send");
}
printf("Send Rank: %d\n\n", desiredRank);
close(fdConn);
}
hdestroy();
close(fdSock);
return 0;
}
#!/bin/bash
./locserv
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment