Skip to content
Snippets Groups Projects
Commit fa822f3b authored by FKHals's avatar FKHals
Browse files

Use TCP connection from MPI to node-agent

instead of the previously used IPC connection (Unix domain sockets), in
preparation for connecting to the new asynchronous node-agent.
parent 8b411e81
Branches
No related tags found
1 merge request: !1 Use TCP connection from MPI to node-agent
......@@ -96,23 +96,43 @@ static void errorExit(char* msg) {
// TODO: Change int return to size_t!
static int getProcessAgentRank(uint32_t jobid, uint32_t vpid, size_t size) {
struct sockaddr_un strAddr;
socklen_t lenAddr;
int fdSock;
struct addrinfo hints;
struct addrinfo *result, *rp;
const char* host_ip = "localhost";
const char * agent_port = getenv("DPM_AGENT_PORT");
if (NULL == agent_port) {
errorExit("Could not find DPM_AGENT_PORT env");
}
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM; //opting for reliable sequenced socket type
hints.ai_flags = 0;
hints.ai_protocol = IPPROTO_TCP;
int error_code = getaddrinfo(host_ip, agent_port, &hints, &result);
if (0 != error_code) {
errorExit("Could not get addrinfo!");
}
if ((fdSock=socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
errorExit("socket");
int socket_fd = 0;
for (rp = result; rp != NULL; rp = rp->ai_next) {
socket_fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (socket_fd == -1) {
continue;
}
strAddr.sun_family=AF_LOCAL; /* Unix domain */
const char * socket_path = getenv("DPM_AGENT_SOCKET"); // check for success
if (NULL == socket_path) {
errorExit("Could not find DPM_AGENT_SOCKET env");
if (0 == connect(socket_fd, rp->ai_addr, rp->ai_addrlen)) {
break; //successfully connected
} else {
close(socket_fd); //close socket when we cannot connect
}
}
strcpy(strAddr.sun_path, socket_path);
lenAddr=sizeof(strAddr.sun_family)+strlen(strAddr.sun_path);
if (connect(fdSock, (struct sockaddr*)&strAddr, lenAddr) !=0 ) {
errorExit("connect");
freeaddrinfo(result); //free result
// exit if the result-linked-list has been traversed until the end without a successful
// connection
if (NULL == rp) {
errorExit("Could not connect to host. Host not available or host/port invalid.");
}
pid_t pid = getpid();
......@@ -121,16 +141,16 @@ static int getProcessAgentRank(uint32_t jobid, uint32_t vpid, size_t size) {
snprintf(info_to_send, BUFFLEN+1,
"%d,%u,%u,%zu",
pid, vpid, jobid, size);
if (send(fdSock, info_to_send, BUFFLEN+1, 0) < 0) {
if (send(socket_fd, info_to_send, BUFFLEN+1, 0) < 0) {
errorExit("send");
}
char rank_to_recv[BUFFLEN];
recv(fdSock, rank_to_recv, BUFFLEN+1, 0);
recv(socket_fd, rank_to_recv, BUFFLEN+1, 0);
int received_rank = (int)strtol(rank_to_recv, NULL, 0);
printf("Received from server: %d\n", received_rank);
close(fdSock);
close(socket_fd);
return received_rank;
}
......
export OMPI := /home/openmpi-install
export OMPI := ../openmpi-install
# BEWARE: the order (custom OpenMPI first/prepended) is important since the
# first gets picked and there may be other MPI installations on the system!
export PATH := $(OMPI)/bin:$(PATH)
......
......@@ -2,8 +2,8 @@
# the export is not done directly since not all shells allow the value of a
# variable to be set at the same time it is exported
OMPI=/home/openmpi-install; export OMPI
DPM_AGENT_SOCKET=$OMPI/dpm_socket_socket; export DPM_AGENT_SOCKET
OMPI=../openmpi-install; export OMPI
DPM_AGENT_PORT=25000; export DPM_AGENT_PORT
# BEWARE: the order (custom OpenMPI first/prepended) is important since the
# first gets picked and there may be other MPI installations on the system!
PATH=$OMPI/bin:$PATH; export PATH
......
......@@ -18,7 +18,9 @@
#include <stdint.h>
#include <string.h>
#include <stdlib.h>
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/un.h>
#include <time.h>
#include <unistd.h>
......@@ -123,32 +125,48 @@ uint32_t getDesiredVpid(uint32_t jobid, uint32_t vpid, size_t size, HashStorage*
int main(void) {
// TCP server based on http://www.microhowto.info/howto/listen_for_and_accept_tcp_connections_in_c.html
char client_message[BUFFLEN];
struct sockaddr_un strAddr;
socklen_t lenAddr;
int fdSock;
int fdConn;
const char* host_ip = "localhost";
const char * agent_port = getenv("DPM_AGENT_PORT");
if (NULL == agent_port) {
errorExit("Could not find DPM_AGENT_PORT env");
}
if ((fdSock=socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
errorExit("socket");
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = 0;
hints.ai_flags = AI_PASSIVE|AI_ADDRCONFIG;
struct addrinfo* result = 0;
int error_code = getaddrinfo(host_ip, agent_port, &hints, &result);
if (0 != error_code) {
errorExit("Could not get addrinfo!");
}
// create unix domain socket in the installation directory of OpenMPI
const char * socket_path = getenv("DPM_AGENT_SOCKET"); // check for success
if (NULL == socket_path) {
errorExit("Could not find DPM_AGENT_SOCKET env");
int server_fd = socket(result->ai_family,result->ai_socktype,result->ai_protocol);
if (-1 == server_fd) {
errorExit("Could not create server socket!");
}
unlink (socket_path); /* Make sure that socket_path does not exist yet */
strAddr.sun_family=AF_LOCAL; /* Unix Domain */
strcpy(strAddr.sun_path, socket_path);
lenAddr=sizeof(strAddr.sun_family)+strlen(strAddr.sun_path);
if (bind(fdSock, (struct sockaddr*)&strAddr, lenAddr) != 0) {
errorExit("bind");
// FIXME Remove after debugging
int reuseaddr = 1;
if (-1 == setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr))) {
errorExit("Could set SO_REUSEADDR!");
}
if (listen(fdSock, MAX_CONNECTIONS) != 0) {
errorExit("listen");
if (-1 == bind(server_fd, result->ai_addr, result->ai_addrlen)) {
errorExit("Could not bind address!");
}
freeaddrinfo(result);
if (0 != listen(server_fd, MAX_CONNECTIONS)) {
errorExit("failed to listen for connections");
}
// create hashtable with two different types of key-data-combinations:
......@@ -159,11 +177,12 @@ int main(void) {
// initialize hsearch storage
struct HashStorage hstorage = { .key_index = 0, .data_index = 0 };
while ((fdConn=accept(fdSock, (struct sockaddr*)&strAddr, &lenAddr)) >= 0) {
int session_fd = 0;
while ((session_fd = accept(server_fd, NULL, NULL)) >= 0) {
// since recv() seems to be atomic, no concurrent behaviour needs to be
// considered
recv(fdConn , client_message , BUFFLEN , 0);
recv(session_fd, client_message, BUFFLEN, 0);
printf("%s\n", client_message);
pid_t pid;
......@@ -177,16 +196,16 @@ int main(void) {
char rankString[RANK_STR_LEN];
snprintf(rankString, RANK_STR_LEN+1, "%u", desiredRank);
if (send(fdConn, rankString, RANK_STR_LEN+1, 0) < 0) {
if (send(session_fd, rankString, RANK_STR_LEN+1, 0) < 0) {
errorExit("send");
}
printf("Send Rank: %d\n\n", desiredRank);
close(fdConn);
close(session_fd);
}
hdestroy();
close(fdSock);
close(server_fd);
return 0;
}
......
#!/bin/bash
export OMPI=/home/openmpi-install
export DPM_AGENT_SOCKET=$OMPI/dpm_socket_socket
OMPI=../openmpi-install; export OMPI
DPM_AGENT_PORT=25000; export DPM_AGENT_PORT
./locserv
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment