Skip to content
Snippets Groups Projects
Commit 857af6a4 authored by FKHals's avatar FKHals
Browse files

Use TCP connection from MPI to node-agent

instead of the previously used IPC connection (linux domain sockets) in
preparation to connect with the new asynchronous node-agent.
parent 8b411e81
No related branches found
No related tags found
1 merge request!3Rank modification
...@@ -96,41 +96,61 @@ static void errorExit(char* msg) { ...@@ -96,41 +96,61 @@ static void errorExit(char* msg) {
// TODO: Change int return to size_t! // TODO: Change int return to size_t!
static int getProcessAgentRank(uint32_t jobid, uint32_t vpid, size_t size) { static int getProcessAgentRank(uint32_t jobid, uint32_t vpid, size_t size) {
struct sockaddr_un strAddr; struct addrinfo hints;
socklen_t lenAddr; struct addrinfo *result, *rp;
int fdSock; const char* host_ip = "localhost";
const char * agent_port = getenv("DPM_AGENT_PORT");
if (NULL == agent_port) {
errorExit("Could not find DPM_AGENT_PORT env");
}
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM; //opting for reliable sequenced socket type
hints.ai_flags = 0;
hints.ai_protocol = IPPROTO_TCP;
int error_code = getaddrinfo(host_ip, agent_port, &hints, &result);
if (0 != error_code) {
errorExit("Could not get addrinfo!");
}
if ((fdSock=socket(PF_UNIX, SOCK_STREAM, 0)) < 0) { int socket_fd = 0;
errorExit("socket"); for (rp = result; rp != NULL; rp = rp->ai_next) {
socket_fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (socket_fd == -1) {
continue;
} }
strAddr.sun_family=AF_LOCAL; /* Unix domain */ if (0 == connect(socket_fd, rp->ai_addr, rp->ai_addrlen)) {
const char * socket_path = getenv("DPM_AGENT_SOCKET"); // check for success break; //successfully connected
if (NULL == socket_path) { } else {
errorExit("Could not find DPM_AGENT_SOCKET env"); close(socket_fd); //close socket when we cannot connect
}
} }
strcpy(strAddr.sun_path, socket_path); freeaddrinfo(result); //free result
lenAddr=sizeof(strAddr.sun_family)+strlen(strAddr.sun_path); // exit if the result-linked-list has been traversed until the end without a successful
if (connect(fdSock, (struct sockaddr*)&strAddr, lenAddr) !=0 ) { // connection
errorExit("connect"); if (NULL == rp) {
errorExit("Could not connect to host. Host not available or host/port invalid.");
} }
pid_t pid = getpid(); pid_t pid = getpid();
char info_to_send[BUFFLEN]; char info_to_send[BUFFLEN];
snprintf(info_to_send, BUFFLEN+1, snprintf(info_to_send, BUFFLEN,
"%d,%u,%u,%zu", "%d,%u,%u,%zu",
pid, vpid, jobid, size); pid, vpid, jobid, size);
if (send(fdSock, info_to_send, BUFFLEN+1, 0) < 0) { if (send(socket_fd, info_to_send, BUFFLEN, 0) < 0) {
errorExit("send"); errorExit("send");
} }
char rank_to_recv[BUFFLEN]; char rank_to_recv[BUFFLEN];
recv(fdSock, rank_to_recv, BUFFLEN+1, 0); recv(socket_fd, rank_to_recv, BUFFLEN, 0);
int received_rank = (int)strtol(rank_to_recv, NULL, 0); int received_rank = (int)strtol(rank_to_recv, NULL, 0);
printf("Received from server: %d\n", received_rank); printf("Received from server: %d\n", received_rank);
close(fdSock); close(socket_fd);
return received_rank; return received_rank;
} }
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
# the export not done directly since not all shells allow the value of a # the export not done directly since not all shells allow the value of a
# variable to be set at the same time it is exported # variable to be set at the same time it is exported
OMPI=/home/openmpi-install; export OMPI OMPI=/home/openmpi-install; export OMPI
DPM_AGENT_SOCKET=$OMPI/dpm_socket_socket; export DPM_AGENT_SOCKET DPM_AGENT_PORT=25000; export DPM_AGENT_PORT
# BEWARE: the order (custom OpenMPI first/prepended) is important since the # BEWARE: the order (custom OpenMPI first/prepended) is important since the
# first gets picked and there may be other MPI installations on the system! # first gets picked and there may be other MPI installations on the system!
PATH=$OMPI/bin:$PATH; export PATH PATH=$OMPI/bin:$PATH; export PATH
......
...@@ -18,7 +18,9 @@ ...@@ -18,7 +18,9 @@
#include <stdint.h> #include <stdint.h>
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
#include <netdb.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <netinet/in.h>
#include <sys/un.h> #include <sys/un.h>
#include <time.h> #include <time.h>
#include <unistd.h> #include <unistd.h>
...@@ -123,32 +125,48 @@ uint32_t getDesiredVpid(uint32_t jobid, uint32_t vpid, size_t size, HashStorage* ...@@ -123,32 +125,48 @@ uint32_t getDesiredVpid(uint32_t jobid, uint32_t vpid, size_t size, HashStorage*
int main(void) { int main(void) {
// TCP server based on http://www.microhowto.info/howto/listen_for_and_accept_tcp_connections_in_c.html
char client_message[BUFFLEN]; char client_message[BUFFLEN];
struct sockaddr_un strAddr; const char* host_ip = "localhost";
socklen_t lenAddr;
int fdSock; const char * agent_port = getenv("DPM_AGENT_PORT");
int fdConn; if (NULL == agent_port) {
errorExit("Could not find DPM_AGENT_PORT env");
}
if ((fdSock=socket(PF_UNIX, SOCK_STREAM, 0)) < 0) { struct addrinfo hints;
errorExit("socket"); memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = 0;
hints.ai_flags = AI_PASSIVE|AI_ADDRCONFIG;
struct addrinfo* result = 0;
int error_code = getaddrinfo(host_ip, agent_port, &hints, &result);
if (0 != error_code) {
errorExit("Could not get addrinfo!");
} }
// create unix domain socket in the installation directory of OpenMPI int server_fd = socket(result->ai_family,result->ai_socktype,result->ai_protocol);
const char * socket_path = getenv("DPM_AGENT_SOCKET"); // check for success if (-1 == server_fd) {
if (NULL == socket_path) { errorExit("Could not create server socket!");
errorExit("Could not find DPM_AGENT_SOCKET env");
} }
unlink (socket_path); /* Make sure that socket_path does not exist yet */
strAddr.sun_family=AF_LOCAL; /* Unix Domain */ // FIXME Remove after debugging
strcpy(strAddr.sun_path, socket_path); int reuseaddr = 1;
lenAddr=sizeof(strAddr.sun_family)+strlen(strAddr.sun_path); if (-1 == setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr))) {
if (bind(fdSock, (struct sockaddr*)&strAddr, lenAddr) != 0) { errorExit("Could set SO_REUSEADDR!");
errorExit("bind");
} }
if (listen(fdSock, MAX_CONNECTIONS) != 0) { if (-1 == bind(server_fd, result->ai_addr, result->ai_addrlen)) {
errorExit("listen"); errorExit("Could not bind address!");
}
freeaddrinfo(result);
if (0 != listen(server_fd, MAX_CONNECTIONS)) {
errorExit("failed to listen for connections");
} }
// create hashtable with two different types of key-data-combinations: // create hashtable with two different types of key-data-combinations:
...@@ -159,11 +177,12 @@ int main(void) { ...@@ -159,11 +177,12 @@ int main(void) {
// initialize hsearch storage // initialize hsearch storage
struct HashStorage hstorage = { .key_index = 0, .data_index = 0 }; struct HashStorage hstorage = { .key_index = 0, .data_index = 0 };
while ((fdConn=accept(fdSock, (struct sockaddr*)&strAddr, &lenAddr)) >= 0) { int session_fd = 0;
while ((session_fd = accept(server_fd, NULL, NULL)) >= 0) {
// since recv() seems to be atomar, no concurrent behaviour must be // since recv() seems to be atomar, no concurrent behaviour must be
// considered // considered
recv(fdConn , client_message , BUFFLEN , 0); recv(session_fd, client_message, BUFFLEN, 0);
printf("%s\n", client_message); printf("%s\n", client_message);
pid_t pid; pid_t pid;
...@@ -177,16 +196,16 @@ int main(void) { ...@@ -177,16 +196,16 @@ int main(void) {
char rankString[RANK_STR_LEN]; char rankString[RANK_STR_LEN];
snprintf(rankString, RANK_STR_LEN+1, "%u", desiredRank); snprintf(rankString, RANK_STR_LEN+1, "%u", desiredRank);
if (send(fdConn, rankString, RANK_STR_LEN+1, 0) < 0) { if (send(session_fd, rankString, RANK_STR_LEN+1, 0) < 0) {
errorExit("send"); errorExit("send");
} }
printf("Send Rank: %d\n\n", desiredRank); printf("Send Rank: %d\n\n", desiredRank);
close(fdConn); close(session_fd);
} }
hdestroy(); hdestroy();
close(fdSock); close(server_fd);
return 0; return 0;
} }
......
#!/bin/bash #!/bin/bash
export OMPI=/home/openmpi-install OMPI=/home/openmpi-install; export OMPI
export DPM_AGENT_SOCKET=$OMPI/dpm_socket_socket DPM_AGENT_PORT=25000; export DPM_AGENT_PORT
./locserv ./locserv
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment