Skip to content
Snippets Groups Projects
Commit 3b6d6cc7 authored by René Pascal Becker's avatar René Pascal Becker
Browse files

Receive Publish Status

parent 9fdfce51
No related branches found
No related tags found
No related merge requests found
...@@ -163,10 +163,42 @@ int _send_message(int socket_fd, int message_id, char* data) ...@@ -163,10 +163,42 @@ int _send_message(int socket_fd, int message_id, char* data)
return 1; return 1;
sendMessageFail: sendMessageFail:
_destroy_message(msg); _destroy_message(msg);
close(socket_fd);
return 0; return 0;
} }
char* _receive_message(int socket_fd)
{
uint32_t msg_length = 0;
if (recv(socket_fd, &msg_length, sizeof(msg_length), 0) < 0)
return NULL;
msg_length = le32toh(msg_length);
char* data = (char*)xmalloc(msg_length);
if (recv(socket_fd, data, msg_length, 0) < 0)
{
free(data);
return NULL;
}
return data;
}
char* _cpy_message_data(char* json)
{
// Look for data in the JSON message.
const char msg_data_key[] = "msg_data\":\"";
char* start = strstr(json, msg_data_key) + strlen(msg_data_key);
// Get length of data
int len = 0;
while (start[len] != '\"')
len++;
len++;
char* result = (char*)xmalloc(len);
strlcpy(result, start, len);
return result;
}
struct PmixInfoData _parse_info_data(const pmix_info_t info[], size_t ninfo) struct PmixInfoData _parse_info_data(const pmix_info_t info[], size_t ninfo)
{ {
// Setup default info data struct // Setup default info data struct
...@@ -220,8 +252,28 @@ int publish_to_dpm_agent(const pmix_proc_t *proc, const pmix_info_t info[], size ...@@ -220,8 +252,28 @@ int publish_to_dpm_agent(const pmix_proc_t *proc, const pmix_info_t info[], size
"%d,%s,%zu,%s,%s", (int)STORE_PUBLISH, "%d,%s,%zu,%s,%s", (int)STORE_PUBLISH,
(const char*)proc->nspace, (size_t)proc->rank, info_data.key, info_data.value); (const char*)proc->nspace, (size_t)proc->rank, info_data.key, info_data.value);
if (!_send_message(socket_fd, DPM_AGENT_KEYSTORE_MSG, data)) if (!_send_message(socket_fd, DPM_AGENT_KEYSTORE_MSG, data)) {
close(socket_fd);
return 0; return 0;
}
// Receive result
char* status_msg = _receive_message(socket_fd);
if (!status_msg) {
close(socket_fd);
return 0;
}
char* status_data = _cpy_message_data(status_msg);
_destroy_message(status_msg);
// Check result
if (strcmp(status_data, "0") != 0 || strlen(status_data) != 1) {
_destroy_message(status_data);
close(socket_fd);
return 0;
}
_destroy_message(status_data);
close(socket_fd); close(socket_fd);
return 1; return 1;
} }
...@@ -261,8 +313,10 @@ int lookup_from_dpm_agent(const pmix_proc_t *proc, const pmix_info_t info[], siz ...@@ -261,8 +313,10 @@ int lookup_from_dpm_agent(const pmix_proc_t *proc, const pmix_info_t info[], siz
"%d,%s,%zu,%s", (int)STORE_LOOKUP, "%d,%s,%zu,%s", (int)STORE_LOOKUP,
(const char*)proc->nspace, (size_t)proc->rank, key); (const char*)proc->nspace, (size_t)proc->rank, key);
if (!_send_message(socket_fd, DPM_AGENT_KEYSTORE_MSG, data)) if (!_send_message(socket_fd, DPM_AGENT_KEYSTORE_MSG, data)) {
close(socket_fd);
return 0; return 0;
}
close(socket_fd); close(socket_fd);
return 1; return 1;
} }
...@@ -336,7 +390,8 @@ void _launch_app(const pmix_proc_t* parent, const pmix_app_t* app) ...@@ -336,7 +390,8 @@ void _launch_app(const pmix_proc_t* parent, const pmix_app_t* app)
char** env = env_array_copy(app->env); char** env = env_array_copy(app->env);
env_array_append_fmt(&env, "HOME", "/root"); env_array_append_fmt(&env, "HOME", "/root");
env_array_append_fmt(&env, "DPM_AGENT_PORT", "25000"); env_array_append_fmt(&env, "DPM_AGENT_PORT", "25000");
env_array_append_fmt(&env, "SLURM_VRM_JOBID", "dynamic-%d", pmixp_info_jobid()); env_array_append_fmt(&env, "SLURM_VRM_JOBID", "%u", pmixp_info_jobid());
execve(SLURM_PREFIX"/bin/srun", argv, env); execve(SLURM_PREFIX"/bin/srun", argv, env);
abort(); abort();
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment