From 857af6a4c06f03ff0b9e4fb55112337863cfa5f7 Mon Sep 17 00:00:00 2001 From: FKHals <5229803-FKHals@users.noreply.gitlab.com> Date: Mon, 26 Sep 2022 15:32:38 +0200 Subject: [PATCH] Use TCP connection from MPI to node-agent instead of the previously used IPC connection (linux domain sockets) in preparation to connect with the new asynchronous node-agent. --- ompi/communicator/comm_init.c | 56 ++++++++++++++++++--------- rank-swapper-agent/client.sh | 2 +- rank-swapper-agent/locserv.c | 73 ++++++++++++++++++++++------------- rank-swapper-agent/server.sh | 4 +- 4 files changed, 87 insertions(+), 48 deletions(-) diff --git a/ompi/communicator/comm_init.c b/ompi/communicator/comm_init.c index 6b81e04c90..70e9f74895 100644 --- a/ompi/communicator/comm_init.c +++ b/ompi/communicator/comm_init.c @@ -96,41 +96,61 @@ static void errorExit(char* msg) { // TODO: Change int return to size_t! static int getProcessAgentRank(uint32_t jobid, uint32_t vpid, size_t size) { - struct sockaddr_un strAddr; - socklen_t lenAddr; - int fdSock; - - if ((fdSock=socket(PF_UNIX, SOCK_STREAM, 0)) < 0) { - errorExit("socket"); + struct addrinfo hints; + struct addrinfo *result, *rp; + const char* host_ip = "localhost"; + const char * agent_port = getenv("DPM_AGENT_PORT"); + if (NULL == agent_port) { + errorExit("Could not find DPM_AGENT_PORT env"); } + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; //opting for reliable sequenced socket type + hints.ai_flags = 0; + hints.ai_protocol = IPPROTO_TCP; + + int error_code = getaddrinfo(host_ip, agent_port, &hints, &result); + if (0 != error_code) { + errorExit("Could not get addrinfo!"); + } + + int socket_fd = 0; + for (rp = result; rp != NULL; rp = rp->ai_next) { + socket_fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + + if (socket_fd == -1) { + continue; + } - strAddr.sun_family=AF_LOCAL; /* Unix domain */ - const char * socket_path = getenv("DPM_AGENT_SOCKET"); // check for success - if (NULL == socket_path) { - errorExit("Could not find DPM_AGENT_SOCKET env"); + if (0 == connect(socket_fd, rp->ai_addr, rp->ai_addrlen)) { + break; //successfully connected + } else { + close(socket_fd); //close socket when we cannot connect + } } - strcpy(strAddr.sun_path, socket_path); - lenAddr=sizeof(strAddr.sun_family)+strlen(strAddr.sun_path); - if (connect(fdSock, (struct sockaddr*)&strAddr, lenAddr) !=0 ) { - errorExit("connect"); + freeaddrinfo(result); //free result + // exit if the result-linked-list has been traversed until the end without a successful + // connection + if (NULL == rp) { + errorExit("Could not connect to host. Host not available or host/port invalid."); } pid_t pid = getpid(); char info_to_send[BUFFLEN]; - snprintf(info_to_send, BUFFLEN+1, + snprintf(info_to_send, BUFFLEN, "%d,%u,%u,%zu", pid, vpid, jobid, size); - if (send(fdSock, info_to_send, BUFFLEN+1, 0) < 0) { + if (send(socket_fd, info_to_send, BUFFLEN, 0) < 0) { errorExit("send"); } char rank_to_recv[BUFFLEN]; - recv(fdSock, rank_to_recv, BUFFLEN+1, 0); + recv(socket_fd, rank_to_recv, BUFFLEN, 0); int received_rank = (int)strtol(rank_to_recv, NULL, 0); printf("Received from server: %d\n", received_rank); - close(fdSock); + close(socket_fd); return received_rank; } diff --git a/rank-swapper-agent/client.sh b/rank-swapper-agent/client.sh index 19dad87afc..d2abec61ea 100755 --- a/rank-swapper-agent/client.sh +++ b/rank-swapper-agent/client.sh @@ -3,7 +3,7 @@ # the export not done directly since not all shells allow the value of a # variable to be set at the same time it is exported OMPI=/home/openmpi-install; export OMPI -DPM_AGENT_SOCKET=$OMPI/dpm_socket_socket; export DPM_AGENT_SOCKET +DPM_AGENT_PORT=25000; export DPM_AGENT_PORT # BEWARE: the order (custom OpenMPI first/prepended) is important since the # first gets picked and there may be other MPI installations on the system! PATH=$OMPI/bin:$PATH; export PATH diff --git a/rank-swapper-agent/locserv.c b/rank-swapper-agent/locserv.c index 8bfde488ce..a749b80791 100644 --- a/rank-swapper-agent/locserv.c +++ b/rank-swapper-agent/locserv.c @@ -18,7 +18,9 @@ #include <stdint.h> #include <string.h> #include <stdlib.h> +#include <netdb.h> #include <sys/socket.h> +#include <netinet/in.h> #include <sys/un.h> #include <time.h> #include <unistd.h> @@ -122,33 +124,49 @@ uint32_t getDesiredVpid(uint32_t jobid, uint32_t vpid, size_t size, HashStorage* } int main(void) { - + + // TCP server based on http://www.microhowto.info/howto/listen_for_and_accept_tcp_connections_in_c.html + char client_message[BUFFLEN]; - - struct sockaddr_un strAddr; - socklen_t lenAddr; - int fdSock; - int fdConn; - - if ((fdSock=socket(PF_UNIX, SOCK_STREAM, 0)) < 0) { - errorExit("socket"); + + const char* host_ip = "localhost"; + + const char * agent_port = getenv("DPM_AGENT_PORT"); + if (NULL == agent_port) { + errorExit("Could not find DPM_AGENT_PORT env"); } - // create unix domain socket in the installation directory of OpenMPI - const char * socket_path = getenv("DPM_AGENT_SOCKET"); // check for success - if (NULL == socket_path) { - errorExit("Could not find DPM_AGENT_SOCKET env"); + struct addrinfo hints; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = 0; + hints.ai_flags = AI_PASSIVE|AI_ADDRCONFIG; + struct addrinfo* result = 0; + int error_code = getaddrinfo(host_ip, agent_port, &hints, &result); + if (0 != error_code) { + errorExit("Could not get addrinfo!"); } - unlink (socket_path); /* Make sure that socket_path does not exist yet */ - strAddr.sun_family=AF_LOCAL; /* Unix Domain */ - strcpy(strAddr.sun_path, socket_path); - lenAddr=sizeof(strAddr.sun_family)+strlen(strAddr.sun_path); - if (bind(fdSock, (struct sockaddr*)&strAddr, lenAddr) != 0) { - errorExit("bind"); + + int server_fd = socket(result->ai_family,result->ai_socktype,result->ai_protocol); + if (-1 == server_fd) { + errorExit("Could not create server socket!"); } - - if (listen(fdSock, MAX_CONNECTIONS) != 0) { - errorExit("listen"); + + // FIXME Remove after debugging + int reuseaddr = 1; + if (-1 == setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr))) { + errorExit("Could set SO_REUSEADDR!"); + } + + if (-1 == bind(server_fd, result->ai_addr, result->ai_addrlen)) { + errorExit("Could not bind address!"); + } + + freeaddrinfo(result); + + if (0 != listen(server_fd, MAX_CONNECTIONS)) { + errorExit("failed to listen for connections"); } // create hashtable with two different types of key-data-combinations: @@ -159,11 +177,12 @@ int main(void) { // initialize hsearch storage struct HashStorage hstorage = { .key_index = 0, .data_index = 0 }; - while ((fdConn=accept(fdSock, (struct sockaddr*)&strAddr, &lenAddr)) >= 0) { + int session_fd = 0; + while ((session_fd = accept(server_fd, NULL, NULL)) >= 0) { // since recv() seems to be atomar, no concurrent behaviour must be // considered - recv(fdConn , client_message , BUFFLEN , 0); + recv(session_fd, client_message, BUFFLEN, 0); printf("%s\n", client_message); pid_t pid; @@ -177,16 +196,16 @@ int main(void) { char rankString[RANK_STR_LEN]; snprintf(rankString, RANK_STR_LEN+1, "%u", desiredRank); - if (send(fdConn, rankString, RANK_STR_LEN+1, 0) < 0) { + if (send(session_fd, rankString, RANK_STR_LEN+1, 0) < 0) { errorExit("send"); } printf("Send Rank: %d\n\n", desiredRank); - close(fdConn); + close(session_fd); } hdestroy(); - close(fdSock); + close(server_fd); return 0; } diff --git a/rank-swapper-agent/server.sh b/rank-swapper-agent/server.sh index 563310e0b6..6f0f70c278 100755 --- a/rank-swapper-agent/server.sh +++ b/rank-swapper-agent/server.sh @@ -1,5 +1,5 @@ #!/bin/bash -export OMPI=/home/openmpi-install -export DPM_AGENT_SOCKET=$OMPI/dpm_socket_socket +OMPI=/home/openmpi-install; export OMPI +DPM_AGENT_PORT=25000; export DPM_AGENT_PORT ./locserv -- GitLab