diff --git a/ompi/communicator/comm_init.c b/ompi/communicator/comm_init.c index 6b81e04c90f4cab8cfffbcc5fe36fd5c2f061bda..70e9f74895726698caa93d7da697cce0e1219f83 100644 --- a/ompi/communicator/comm_init.c +++ b/ompi/communicator/comm_init.c @@ -96,41 +96,61 @@ static void errorExit(char* msg) { // TODO: Change int return to size_t! static int getProcessAgentRank(uint32_t jobid, uint32_t vpid, size_t size) { - struct sockaddr_un strAddr; - socklen_t lenAddr; - int fdSock; - - if ((fdSock=socket(PF_UNIX, SOCK_STREAM, 0)) < 0) { - errorExit("socket"); + struct addrinfo hints; + struct addrinfo *result, *rp; + const char* host_ip = "localhost"; + const char * agent_port = getenv("DPM_AGENT_PORT"); + if (NULL == agent_port) { + errorExit("Could not find DPM_AGENT_PORT env"); } + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; //opting for reliable sequenced socket type + hints.ai_flags = 0; + hints.ai_protocol = IPPROTO_TCP; + + int error_code = getaddrinfo(host_ip, agent_port, &hints, &result); + if (0 != error_code) { + errorExit("Could not get addrinfo!"); + } + + int socket_fd = 0; + for (rp = result; rp != NULL; rp = rp->ai_next) { + socket_fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + + if (socket_fd == -1) { + continue; + } - strAddr.sun_family=AF_LOCAL; /* Unix domain */ - const char * socket_path = getenv("DPM_AGENT_SOCKET"); // check for success - if (NULL == socket_path) { - errorExit("Could not find DPM_AGENT_SOCKET env"); + if (0 == connect(socket_fd, rp->ai_addr, rp->ai_addrlen)) { + break; //successfully connected + } else { + close(socket_fd); //close socket when we cannot connect + } } - strcpy(strAddr.sun_path, socket_path); - lenAddr=sizeof(strAddr.sun_family)+strlen(strAddr.sun_path); - if (connect(fdSock, (struct sockaddr*)&strAddr, lenAddr) !=0 ) { - errorExit("connect"); + freeaddrinfo(result); //free result + // exit if the result-linked-list has been traversed until the end without a successful + // connection + if (NULL == rp) { + errorExit("Could not connect to host. Host not available or host/port invalid."); } pid_t pid = getpid(); char info_to_send[BUFFLEN]; - snprintf(info_to_send, BUFFLEN+1, + snprintf(info_to_send, BUFFLEN, "%d,%u,%u,%zu", pid, vpid, jobid, size); - if (send(fdSock, info_to_send, BUFFLEN+1, 0) < 0) { + if (send(socket_fd, info_to_send, BUFFLEN, 0) < 0) { errorExit("send"); } char rank_to_recv[BUFFLEN]; - recv(fdSock, rank_to_recv, BUFFLEN+1, 0); + recv(socket_fd, rank_to_recv, BUFFLEN, 0); int received_rank = (int)strtol(rank_to_recv, NULL, 0); printf("Received from server: %d\n", received_rank); - close(fdSock); + close(socket_fd); return received_rank; } diff --git a/rank-swapper-agent/client.sh b/rank-swapper-agent/client.sh index 19dad87afc5a708c164eaa2cda40b2bce49e4c37..d2abec61ea0ef8fcd825801117fd3287693ce901 100755 --- a/rank-swapper-agent/client.sh +++ b/rank-swapper-agent/client.sh @@ -3,7 +3,7 @@ # the export not done directly since not all shells allow the value of a # variable to be set at the same time it is exported OMPI=/home/openmpi-install; export OMPI -DPM_AGENT_SOCKET=$OMPI/dpm_socket_socket; export DPM_AGENT_SOCKET +DPM_AGENT_PORT=25000; export DPM_AGENT_PORT # BEWARE: the order (custom OpenMPI first/prepended) is important since the # first gets picked and there may be other MPI installations on the system! PATH=$OMPI/bin:$PATH; export PATH diff --git a/rank-swapper-agent/locserv.c b/rank-swapper-agent/locserv.c index 8bfde488ce7385a1bafc1aa0a5463d7f38d69095..a749b807917f53873579006683adc50ed58b51dc 100644 --- a/rank-swapper-agent/locserv.c +++ b/rank-swapper-agent/locserv.c @@ -18,7 +18,9 @@ #include <stdint.h> #include <string.h> #include <stdlib.h> +#include <netdb.h> #include <sys/socket.h> +#include <netinet/in.h> #include <sys/un.h> #include <time.h> #include <unistd.h> @@ -122,33 +124,49 @@ uint32_t getDesiredVpid(uint32_t jobid, uint32_t vpid, size_t size, HashStorage* } int main(void) { - + + // TCP server based on http://www.microhowto.info/howto/listen_for_and_accept_tcp_connections_in_c.html + char client_message[BUFFLEN]; - - struct sockaddr_un strAddr; - socklen_t lenAddr; - int fdSock; - int fdConn; - - if ((fdSock=socket(PF_UNIX, SOCK_STREAM, 0)) < 0) { - errorExit("socket"); + + const char* host_ip = "localhost"; + + const char * agent_port = getenv("DPM_AGENT_PORT"); + if (NULL == agent_port) { + errorExit("Could not find DPM_AGENT_PORT env"); } - // create unix domain socket in the installation directory of OpenMPI - const char * socket_path = getenv("DPM_AGENT_SOCKET"); // check for success - if (NULL == socket_path) { - errorExit("Could not find DPM_AGENT_SOCKET env"); + struct addrinfo hints; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = 0; + hints.ai_flags = AI_PASSIVE|AI_ADDRCONFIG; + struct addrinfo* result = 0; + int error_code = getaddrinfo(host_ip, agent_port, &hints, &result); + if (0 != error_code) { + errorExit("Could not get addrinfo!"); } - unlink (socket_path); /* Make sure that socket_path does not exist yet */ - strAddr.sun_family=AF_LOCAL; /* Unix Domain */ - strcpy(strAddr.sun_path, socket_path); - lenAddr=sizeof(strAddr.sun_family)+strlen(strAddr.sun_path); - if (bind(fdSock, (struct sockaddr*)&strAddr, lenAddr) != 0) { - errorExit("bind"); + + int server_fd = socket(result->ai_family,result->ai_socktype,result->ai_protocol); + if (-1 == server_fd) { + errorExit("Could not create server socket!"); } - - if (listen(fdSock, MAX_CONNECTIONS) != 0) { - errorExit("listen"); + + // FIXME Remove after debugging + int reuseaddr = 1; + if (-1 == setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr))) { + errorExit("Could set SO_REUSEADDR!"); + } + + if (-1 == bind(server_fd, result->ai_addr, result->ai_addrlen)) { + errorExit("Could not bind address!"); + } + + freeaddrinfo(result); + + if (0 != listen(server_fd, MAX_CONNECTIONS)) { + errorExit("failed to listen for connections"); } // create hashtable with two different types of key-data-combinations: @@ -159,11 +177,12 @@ int main(void) { // initialize hsearch storage struct HashStorage hstorage = { .key_index = 0, .data_index = 0 }; - while ((fdConn=accept(fdSock, (struct sockaddr*)&strAddr, &lenAddr)) >= 0) { + int session_fd = 0; + while ((session_fd = accept(server_fd, NULL, NULL)) >= 0) { // since recv() seems to be atomar, no concurrent behaviour must be // considered - recv(fdConn , client_message , BUFFLEN , 0); + recv(session_fd, client_message, BUFFLEN, 0); printf("%s\n", client_message); pid_t pid; @@ -177,16 +196,16 @@ int main(void) { char rankString[RANK_STR_LEN]; snprintf(rankString, RANK_STR_LEN+1, "%u", desiredRank); - if (send(fdConn, rankString, RANK_STR_LEN+1, 0) < 0) { + if (send(session_fd, rankString, RANK_STR_LEN+1, 0) < 0) { errorExit("send"); } printf("Send Rank: %d\n\n", desiredRank); - close(fdConn); + close(session_fd); } hdestroy(); - close(fdSock); + close(server_fd); return 0; } diff --git a/rank-swapper-agent/server.sh b/rank-swapper-agent/server.sh index 563310e0b6e25784aad1b76b95f129882b663d21..6f0f70c2785f61c4847dd0bb41680c1975d1602a 100755 --- a/rank-swapper-agent/server.sh +++ b/rank-swapper-agent/server.sh @@ -1,5 +1,5 @@ #!/bin/bash -export OMPI=/home/openmpi-install -export DPM_AGENT_SOCKET=$OMPI/dpm_socket_socket +OMPI=/home/openmpi-install; export OMPI +DPM_AGENT_PORT=25000; export DPM_AGENT_PORT ./locserv