Skip to content
Snippets Groups Projects
Commit e531457b authored by René Pascal Becker's avatar René Pascal Becker
Browse files

Add dmodex daemon

parent 84dd119d
No related branches found
No related tags found
No related merge requests found
......@@ -54,9 +54,11 @@
#include <endian.h>
#include <netdb.h>
#include <pmix_common.h>
#include <pmix_deprecated.h>
#include <pmix_server.h>
#include <pthread.h>
#include <string.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
......@@ -380,6 +382,102 @@ int lookup_from_dpm_agent(const pmix_proc_t *proc, const pmix_info_t info[],
return 1;
}
// DMODEX DAEMON
void _handle_dmodex_result(pmix_status_t status, char *data, size_t sz,
void *cbdata) {
FILE *ptr = fopen("/home/dmodex", "a");
fprintf(ptr, "Running Callback\n");
fprintf(ptr, "Status: %d\n", status);
fprintf(ptr, "Blob: %s\n", data);
fclose(ptr);
}
bool stop_dmodex_daemon = false;
void *dmodex_daemon(void *dpm_socket) {
int socket_fd = (int)(size_t)dpm_socket;
stop_dmodex_daemon = false;
if (!socket_fd)
return NULL;
struct pollfd fd;
fd.fd = socket_fd;
fd.events = POLLIN;
while (!stop_dmodex_daemon) {
FILE *ptr = fopen("/home/dmodex", "a");
fprintf(ptr, "POLL %d\n", socket_fd);
fclose(ptr);
int rv = 0;
if ((rv = poll(&fd, 1, 100) > 0)) {
FILE *ptr = fopen("/home/dmodex", "a");
fprintf(ptr, "RESULT: %d\n", fd.revents);
fclose(ptr);
if (fd.revents & POLLIN) {
FILE *ptr = fopen("/home/dmodex", "a");
fprintf(ptr, "POLLIN\n");
fclose(ptr);
// New message to be read
char *data_msg = _receive_message(socket_fd);
if (!data_msg) {
continue;
}
// Fetch message string
char *msg = _cpy_message_data(data_msg);
_destroy_message(data_msg);
ptr = fopen("/home/dmodex", "a");
fprintf(ptr, "%s\n", msg);
fclose(ptr);
// Parse Data
char *nspace = strtok(msg, ",");
if (nspace == NULL) {
_destroy_message(msg);
continue;
}
char *rank = strtok(NULL, ",");
if (rank == NULL) {
_destroy_message(msg);
continue;
}
ptr = fopen("/home/dmodex", "a");
fprintf(ptr, "Received Request %s,%s\n", nspace, rank);
fclose(ptr);
// Fill proc and submit dmodex request to the server
pmix_proc_t proc;
sscanf(rank, "%u", &proc.rank);
strlcpy(proc.nspace, nspace, PMIX_MAX_NSLEN);
// Call the server
size_t cbval = ((size_t)socket_fd) << 32 | (size_t)proc.rank;
int error = PMIx_server_dmodex_request(&proc, _handle_dmodex_result,
(void *)cbval);
ptr = fopen("/home/dmodex", "a");
fprintf(ptr, "Request error code: %d\n", error);
fclose(ptr);
_destroy_message(msg);
} else if (fd.revents & POLLERR || fd.revents & POLLHUP ||
fd.revents & POLLNVAL) {
FILE *ptr = fopen("/home/dmodex", "a");
fprintf(ptr, "ERROR: %d, %d, %d\n", fd.revents & POLLERR,
fd.revents & POLLHUP, fd.revents & POLLNVAL);
fclose(ptr);
stop_dmodex_daemon = true;
}
}
}
close(socket_fd);
return NULL;
}
// CONNECT ----
struct CollectConnectInfo {
......@@ -400,13 +498,19 @@ void *_collect_connect_receiver(void *data) {
char *result = _cpy_message_data(data_msg);
_destroy_message(data_msg);
// Check result
if (strcmp(result, "0") != 0 || strlen(result) != 1)
info->cbfunc(PMIX_ERROR, info->cbdata);
else
FILE *ptr = fopen("/home/pmix", "a");
fprintf(ptr, "Running Callback %d\n", info->socket_fd);
fclose(ptr);
info->cbfunc(PMIX_SUCCESS, info->cbdata);
_destroy_message(result);
// Startup the dmodex thread
pthread_t th;
pthread_create(&th, NULL, dmodex_daemon, (void *)((size_t)info->socket_fd));
pthread_detach(th);
free(info);
return NULL;
// Cleanup
collectConnectCleanup:
close(info->socket_fd);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment