Skip to content
Snippets Groups Projects
Commit 84dd119d authored by René Pascal Becker's avatar René Pascal Becker
Browse files

Debug Statements and Connect

parent 69755f67
No related branches found
No related tags found
No related merge requests found
......@@ -143,6 +143,19 @@ static pmix_status_t _fencenb_fn(const pmix_proc_t procs_v2[], size_t nprocs,
static pmix_status_t _dmodex_fn(const pmix_proc_t *proc,
const pmix_info_t info[], size_t ninfo,
pmix_modex_cbfunc_t cbfunc, void *cbdata) {
THESIS_LOG
ptr = fopen("/home/pmix", "a");
fprintf(ptr, "Proc: (NS: %s, Rank: %u)\n", proc->nspace, proc->rank);
for (size_t i = 0; i < ninfo; i++) {
pmix_info_t *info_el = &info[i];
fprintf(ptr, "Info %lu: %s\n", i, info_el->key);
if (info_el->value.type == PMIX_STRING) {
fprintf(ptr, "\tValue: %s\n", info_el->value.data.string);
}
}
fclose(ptr);
int rc;
rc = pmixp_dmdx_get(proc->nspace, proc->rank, cbfunc, cbdata);
......@@ -220,7 +233,8 @@ static pmix_status_t _spawn_fn(const pmix_proc_t *proc,
}
fclose(ptr);
pmixp_spawn(proc, apps, napps, cbfunc, cbdata);
if (!pmixp_spawn(proc, apps, napps, cbfunc, cbdata))
return PMIX_ERROR;
PMIXP_DEBUG("called");
return PMIX_OPERATION_SUCCEEDED;
......@@ -245,9 +259,9 @@ static pmix_status_t _connect_fn(const pmix_proc_t procs[], size_t nprocs,
}
fclose(ptr);
collect_connected(procs, nprocs, info, ninfo, cbfunc, cbdata);
if (collect_connected(procs, nprocs, info, ninfo, cbfunc, cbdata))
return PMIX_SUCCESS;
return PMIX_ERROR;
}
static pmix_status_t _disconnect_fn(const pmix_proc_t procs[], size_t nprocs,
......
This diff is collapsed.
This diff is collapsed.
......@@ -47,6 +47,10 @@
fflush(stdout); \
debug("%s [%d]: %s:%d: " format "", pmixp_info_hostname(), \
pmixp_info_nodeid(), THIS_FILE, __LINE__, ##args); \
FILE *__ptr = fopen("/home/pmixp_debug", "a"); \
fprintf(__ptr, "%s [%d]: %s:%d: " format "\n", pmixp_info_hostname(), \
pmixp_info_nodeid(), THIS_FILE, __LINE__, ##args); \
fclose(__ptr); \
}
#define PMIXP_ERROR_STD(format, args...) \
......@@ -65,6 +69,11 @@
error(" %s: %s: %s [%d]: %s:%d: " format, plugin_type, __func__, \
pmixp_info_hostname(), pmixp_info_nodeid(), THIS_FILE, __LINE__, \
##args); \
FILE *___ptr = fopen("/home/pmixp_debug", "a"); \
fprintf(___ptr, "ERROR: %s: %s: %s [%d]: %s:%d: " format "\n", \
plugin_type, __func__, pmixp_info_hostname(), pmixp_info_nodeid(), \
THIS_FILE, __LINE__, ##args); \
fclose(___ptr); \
}
#define PMIXP_ABORT(format, args...) \
......
......@@ -35,18 +35,15 @@
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include "pmixp_common.h"
#include "pmixp_dmdx.h"
#include "pmixp_server.h"
#include "pmixp_client.h"
#include "pmixp_common.h"
#include "pmixp_server.h"
/* set default direct modex timeout to 10 sec */
#define DMDX_DEFAULT_TIMEOUT 10
typedef enum {
DMDX_REQUEST = 1,
DMDX_RESPONSE
} dmdx_type_t;
typedef enum { DMDX_REQUEST = 1, DMDX_RESPONSE } dmdx_type_t;
typedef struct {
uint32_t seq_num;
......@@ -68,8 +65,7 @@ typedef struct {
int rank;
} dmdx_caddy_t;
void _dmdx_free_caddy(dmdx_caddy_t *caddy)
{
void _dmdx_free_caddy(dmdx_caddy_t *caddy) {
if (NULL == caddy) {
/* nothing to do */
return;
......@@ -83,26 +79,22 @@ void _dmdx_free_caddy(dmdx_caddy_t *caddy)
static List _dmdx_requests;
static uint32_t _dmdx_seq_num = 1;
static void _respond_with_error(int seq_num, int nodeid,
char *sender_ns, int status);
static void _respond_with_error(int seq_num, int nodeid, char *sender_ns,
int status);
int pmixp_dmdx_init(void)
{
int pmixp_dmdx_init(void) {
_dmdx_requests = list_create(xfree_ptr);
_dmdx_seq_num = 1;
return SLURM_SUCCESS;
}
int pmixp_dmdx_finalize(void)
{
int pmixp_dmdx_finalize(void) {
list_destroy(_dmdx_requests);
return 0;
}
static void _setup_header(buf_t *buf, dmdx_type_t t,
const char *nspace, int rank, int status)
{
static void _setup_header(buf_t *buf, dmdx_type_t t, const char *nspace,
int rank, int status) {
char *str;
/* 1. pack message type */
unsigned char type = (char)t;
......@@ -125,8 +117,7 @@ static void _setup_header(buf_t *buf, dmdx_type_t t,
pack32((uint32_t)status, buf);
}
static int _read_type(buf_t *buf, dmdx_type_t *type)
{
static int _read_type(buf_t *buf, dmdx_type_t *type) {
unsigned char t;
int rc;
/* 1. unpack message type */
......@@ -138,9 +129,8 @@ static int _read_type(buf_t *buf, dmdx_type_t *type)
return SLURM_SUCCESS;
}
static int _read_info(buf_t *buf, char **ns, int *rank,
char **sender_ns, int *status)
{
static int _read_info(buf_t *buf, char **ns, int *rank, char **sender_ns,
int *status) {
uint32_t cnt, uint32_tmp;
int rc;
*ns = NULL;
......@@ -179,9 +169,8 @@ static int _read_info(buf_t *buf, char **ns, int *rank,
return SLURM_SUCCESS;
}
static void _respond_with_error(int seq_num, int nodeid,
char *sender_ns, int status)
{
static void _respond_with_error(int seq_num, int nodeid, char *sender_ns,
int status) {
buf_t *buf = create_buf(NULL, 0);
pmixp_ep_t ep;
int rc;
......@@ -193,19 +182,17 @@ static void _respond_with_error(int seq_num, int nodeid,
_setup_header(buf, DMDX_RESPONSE, sender_ns, -1, status);
/* send response */
PMIXP_DEBUG("Err response");
rc = pmixp_server_send_nb(&ep, PMIXP_MSG_DMDX, seq_num, buf,
pmixp_server_sent_buf_cb, buf);
if (SLURM_SUCCESS != rc) {
char *nodename = pmixp_info_job_host(nodeid);
PMIXP_ERROR("Cannot send direct modex error response to %s",
nodename);
PMIXP_ERROR("Cannot send direct modex error response to %s", nodename);
xfree(nodename);
}
}
static void _dmdx_pmix_cb(int status, char *data, size_t sz,
void *cbdata)
{
static void _dmdx_pmix_cb(int status, char *data, size_t sz, void *cbdata) {
dmdx_caddy_t *caddy = (dmdx_caddy_t *)cbdata;
buf_t *buf = pmixp_server_buf_new();
pmixp_ep_t ep;
......@@ -221,20 +208,19 @@ static void _dmdx_pmix_cb(int status, char *data, size_t sz,
/* send the request */
ep.type = PMIXP_EP_NOIDEID;
ep.ep.nodeid = caddy->sender_nodeid;
PMIXP_DEBUG("dmdx cb");
rc = pmixp_server_send_nb(&ep, PMIXP_MSG_DMDX, caddy->seq_num, buf,
pmixp_server_sent_buf_cb, buf);
if (SLURM_SUCCESS != rc) {
char *nodename = pmixp_info_job_host(caddy->sender_nodeid);
/* not much we can do here. Caller will react by timeout */
PMIXP_ERROR("Cannot send direct modex response to %s",
nodename);
PMIXP_ERROR("Cannot send direct modex response to %s", nodename);
}
_dmdx_free_caddy(caddy);
}
int pmixp_dmdx_get(const pmix_nspace_t nspace, int rank,
void *cbfunc, void *cbdata)
{
int pmixp_dmdx_get(const pmix_nspace_t nspace, int rank, void *cbfunc,
void *cbdata) {
dmdx_req_info_t *req;
buf_t *buf;
int rc;
......@@ -265,25 +251,24 @@ int pmixp_dmdx_get(const pmix_nspace_t nspace, int rank,
list_append(_dmdx_requests, req);
/* send the request */
PMIXP_DEBUG("dmdx get");
rc = pmixp_server_send_nb(&ep, PMIXP_MSG_DMDX, seq, buf,
pmixp_server_sent_buf_cb, buf);
/* check the return status */
if (SLURM_SUCCESS != rc) {
char *nodename = pmixp_info_job_host(ep.ep.nodeid);
PMIXP_ERROR("Cannot send direct modex request to %s, size %d",
nodename, get_buf_offset(buf));
PMIXP_ERROR("Cannot send direct modex request to %s, size %d", nodename,
get_buf_offset(buf));
xfree(nodename);
pmixp_lib_modex_invoke(cbfunc, SLURM_ERROR, NULL, 0,
cbdata, NULL, NULL);
pmixp_lib_modex_invoke(cbfunc, SLURM_ERROR, NULL, 0, cbdata, NULL, NULL);
rc = SLURM_ERROR;
}
return rc;
}
static void _dmdx_req(buf_t *buf, int nodeid, uint32_t seq_num)
{
static void _dmdx_req(buf_t *buf, int nodeid, uint32_t seq_num) {
int rank, rc;
int status;
char *ns = NULL, *sender_ns = NULL;
......@@ -307,8 +292,7 @@ static void _dmdx_req(buf_t *buf, int nodeid, uint32_t seq_num)
char *nodename = pmixp_info_job_host(nodeid);
PMIXP_ERROR("Bad request from %s: asked for nspace = %s, mine is %s",
nodename, ns, pmixp_info_namespace());
_respond_with_error(seq_num, nodeid, sender_ns,
PMIX_ERR_INVALID_NAMESPACE);
_respond_with_error(seq_num, nodeid, sender_ns, PMIX_ERR_INVALID_NAMESPACE);
xfree(nodename);
goto exit;
}
......@@ -316,10 +300,10 @@ static void _dmdx_req(buf_t *buf, int nodeid, uint32_t seq_num)
nsptr = pmixp_nspaces_local();
if (nsptr->ntasks <= rank) {
char *nodename = pmixp_info_job_host(nodeid);
PMIXP_ERROR("Bad request from %s: nspace \"%s\" has only %d ranks, asked for %d",
PMIXP_ERROR(
"Bad request from %s: nspace \"%s\" has only %d ranks, asked for %d",
nodename, ns, nsptr->ntasks, rank);
_respond_with_error(seq_num, nodeid, sender_ns,
PMIX_ERR_BAD_PARAM);
_respond_with_error(seq_num, nodeid, sender_ns, PMIX_ERR_BAD_PARAM);
xfree(nodename);
goto exit;
}
......@@ -344,9 +328,9 @@ static void _dmdx_req(buf_t *buf, int nodeid, uint32_t seq_num)
(void *)caddy);
if (SLURM_SUCCESS != rc) {
char *nodename = pmixp_info_job_host(nodeid);
PMIXP_ERROR("Can't request modex data from libpmix-server, requesting host = %s, nspace = %s, rank = %d, rc = %d",
nodename, caddy->proc.nspace,
caddy->proc.rank, rc);
PMIXP_ERROR("Can't request modex data from libpmix-server, requesting host "
"= %s, nspace = %s, rank = %d, rc = %d",
nodename, caddy->proc.nspace, caddy->proc.rank, rc);
_respond_with_error(seq_num, nodeid, caddy->sender_ns, rc);
_dmdx_free_caddy(caddy);
xfree(nodename);
......@@ -359,15 +343,13 @@ exit:
* anyway. We've notified libpmix, that's enough */
}
static int _dmdx_req_cmp(void *x, void *key)
{
static int _dmdx_req_cmp(void *x, void *key) {
dmdx_req_info_t *req = (dmdx_req_info_t *)x;
uint32_t seq_num = *((uint32_t *)key);
return (req->seq_num == seq_num);
}
static void _dmdx_resp(buf_t *buf, int nodeid, uint32_t seq_num)
{
static void _dmdx_resp(buf_t *buf, int nodeid, uint32_t seq_num) {
dmdx_req_info_t *req;
int rank, rc = SLURM_SUCCESS;
int status;
......@@ -381,8 +363,8 @@ static void _dmdx_resp(buf_t *buf, int nodeid, uint32_t seq_num)
if (NULL == req) {
char *nodename = pmixp_info_job_host(nodeid);
/* We haven't sent this request! */
PMIXP_ERROR("Received DMDX response with bad seq_num=%d from %s!",
seq_num, nodename);
PMIXP_ERROR("Received DMDX response with bad seq_num=%d from %s!", seq_num,
nodename);
list_iterator_destroy(it);
rc = SLURM_ERROR;
xfree(nodename);
......@@ -393,22 +375,22 @@ static void _dmdx_resp(buf_t *buf, int nodeid, uint32_t seq_num)
rc = _read_info(buf, &ns, &rank, &sender_ns, &status);
if (SLURM_SUCCESS != rc) {
/* notify libpmix about an error */
pmixp_lib_modex_invoke(req->cbfunc, SLURM_ERROR, NULL, 0,
req->cbdata, NULL, NULL);
pmixp_lib_modex_invoke(req->cbfunc, SLURM_ERROR, NULL, 0, req->cbdata, NULL,
NULL);
goto exit;
}
/* get the modex blob */
if (SLURM_SUCCESS != (rc = unpackmem_ptr(&data, &size, buf))) {
/* notify libpmix about an error */
pmixp_lib_modex_invoke(req->cbfunc, SLURM_ERROR, NULL, 0,
req->cbdata, NULL, NULL);
pmixp_lib_modex_invoke(req->cbfunc, SLURM_ERROR, NULL, 0, req->cbdata, NULL,
NULL);
goto exit;
}
/* call back to libpmix-server */
pmixp_lib_modex_invoke(req->cbfunc, status, data, size,
req->cbdata, pmixp_free_buf, (void *)buf);
pmixp_lib_modex_invoke(req->cbfunc, status, data, size, req->cbdata,
pmixp_free_buf, (void *)buf);
/* release tracker & list iterator */
req = NULL;
......@@ -424,8 +406,7 @@ exit:
* anyway. We've notified libpmix, that's enough */
}
void pmixp_dmdx_process(buf_t *buf, int nodeid, uint32_t seq)
{
void pmixp_dmdx_process(buf_t *buf, int nodeid, uint32_t seq) {
dmdx_type_t type = 0;
_read_type(buf, &type);
......@@ -445,8 +426,7 @@ void pmixp_dmdx_process(buf_t *buf, int nodeid, uint32_t seq)
}
}
void pmixp_dmdx_timeout_cleanup(void)
{
void pmixp_dmdx_timeout_cleanup(void) {
ListIterator it = list_iterator_create(_dmdx_requests);
dmdx_req_info_t *req = NULL;
time_t ts = time(NULL);
......@@ -456,21 +436,18 @@ void pmixp_dmdx_timeout_cleanup(void)
if ((ts - req->ts) > pmixp_info_timeout()) {
#ifndef NDEBUG
/* respond with the timeout to libpmix */
int nodeid = pmixp_nspace_resolve(req->nspace,
req->rank);
int nodeid = pmixp_nspace_resolve(req->nspace, req->rank);
char *nodename = pmixp_info_job_host(nodeid);
xassert(NULL != nodename);
PMIXP_ERROR("timeout: ns=%s, rank=%d, host=%s, ts=%lu",
req->nspace, req->rank,
(NULL != nodename) ? nodename : "unknown",
ts);
PMIXP_ERROR("timeout: ns=%s, rank=%d, host=%s, ts=%lu", req->nspace,
req->rank, (NULL != nodename) ? nodename : "unknown", ts);
if (NULL != nodename) {
xfree(nodename);
}
#endif
/* PMIX_ERR_TIMEOUT */
pmixp_lib_modex_invoke(req->cbfunc, SLURM_ERROR, NULL, 0,
req->cbdata, NULL, NULL);
pmixp_lib_modex_invoke(req->cbfunc, SLURM_ERROR, NULL, 0, req->cbdata,
NULL, NULL);
/* release tracker & list iterator */
list_delete_item(it);
}
......
......@@ -35,13 +35,12 @@
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include "pmixp_common.h"
#include "pmixp_nspaces.h"
#include "pmixp_common.h"
pmixp_db_t _pmixp_nspaces;
static void _xfree_nspace(void *n)
{
static void _xfree_nspace(void *n) {
pmixp_namespace_t *nsptr = n;
xfree(nsptr->task_cnts);
xfree(nsptr->task_map);
......@@ -49,8 +48,7 @@ static void _xfree_nspace(void *n)
xfree(nsptr);
}
int pmixp_nspaces_init(void)
{
int pmixp_nspaces_init(void) {
char *mynspace, *task_map;
uint32_t nnodes, ntasks, *task_cnts;
int nodeid, rc;
......@@ -68,22 +66,20 @@ int pmixp_nspaces_init(void)
task_map = pmixp_info_task_map();
hl = pmixp_info_step_hostlist();
/* Initialize local namespace */
rc = pmixp_nspaces_add(mynspace, nnodes, nodeid, ntasks, task_cnts,
task_map, hostlist_copy(hl));
rc = pmixp_nspaces_add(mynspace, nnodes, nodeid, ntasks, task_cnts, task_map,
hostlist_copy(hl));
_pmixp_nspaces.local = pmixp_nspaces_find(mynspace);
return rc;
}
int pmixp_nspaces_finalize(void)
{
int pmixp_nspaces_finalize(void) {
list_destroy(_pmixp_nspaces.nspaces);
return 0;
}
int pmixp_nspaces_add(char *name, uint32_t nnodes, int node_id,
uint32_t ntasks, uint32_t *task_cnts,
char *task_map_packed, hostlist_t hl)
{
int pmixp_nspaces_add(char *name, uint32_t nnodes, int node_id, uint32_t ntasks,
uint32_t *task_cnts, char *task_map_packed,
hostlist_t hl) {
pmixp_namespace_t *nsptr = xmalloc(sizeof(pmixp_namespace_t));
int i;
......@@ -103,8 +99,8 @@ int pmixp_nspaces_add(char *name, uint32_t nnodes, int node_id,
nsptr->task_cnts[i] = task_cnts[i];
}
nsptr->task_map_packed = xstrdup(task_map_packed);
nsptr->task_map = unpack_process_mapping_flat(task_map_packed, nnodes,
ntasks, NULL);
nsptr->task_map =
unpack_process_mapping_flat(task_map_packed, nnodes, ntasks, NULL);
if (nsptr->task_map == NULL) {
xfree(nsptr->task_cnts);
xfree(nsptr->task_map_packed);
......@@ -115,14 +111,12 @@ int pmixp_nspaces_add(char *name, uint32_t nnodes, int node_id,
return SLURM_SUCCESS;
}
pmixp_namespace_t *pmixp_nspaces_local(void)
{
pmixp_namespace_t *pmixp_nspaces_local(void) {
xassert(_pmixp_nspaces.magic == PMIXP_NSPACE_DB_MAGIC);
return _pmixp_nspaces.local;
}
pmixp_namespace_t *pmixp_nspaces_find(const char *name)
{
pmixp_namespace_t *pmixp_nspaces_find(const char *name) {
xassert(_pmixp_nspaces.magic == PMIXP_NSPACE_DB_MAGIC);
ListIterator it = list_iterator_create(_pmixp_nspaces.nspaces);
......@@ -139,9 +133,8 @@ exit:
return nsptr;
}
hostlist_t pmixp_nspace_rankhosts(pmixp_namespace_t *nsptr, const uint32_t *ranks,
size_t nranks)
{
hostlist_t pmixp_nspace_rankhosts(pmixp_namespace_t *nsptr,
const uint32_t *ranks, size_t nranks) {
hostlist_t hl = hostlist_create("");
int i;
for (i = 0; i < nranks; i++) {
......@@ -155,8 +148,9 @@ hostlist_t pmixp_nspace_rankhosts(pmixp_namespace_t *nsptr, const uint32_t *rank
return hl;
}
int pmixp_nspace_resolve(const char *name, int rank)
{
int pmixp_nspace_resolve(const char *name, int rank) {
PMIXP_DEBUG("Resolve %s : %d", name, rank);
pmixp_namespace_t *nsptr;
xassert(_pmixp_nspaces.magic == PMIXP_NSPACE_DB_MAGIC);
......@@ -164,6 +158,7 @@ int pmixp_nspace_resolve(const char *name, int rank)
ListIterator it = list_iterator_create(_pmixp_nspaces.nspaces);
while ((nsptr = list_next(it))) {
xassert(nsptr->magic == PMIXP_NSPACE_MAGIC);
PMIXP_DEBUG("NS %s", nsptr->name);
if (0 == xstrcmp(nsptr->name, name)) {
break;
}
......
This diff is collapsed.
......@@ -382,6 +382,38 @@ int lookup_from_dpm_agent(const pmix_proc_t *proc, const pmix_info_t info[],
// CONNECT ----
struct CollectConnectInfo {
pmix_op_cbfunc_t cbfunc;
void *cbdata;
int socket_fd;
};
void *_collect_connect_receiver(void *data) {
struct CollectConnectInfo *info = (struct CollectConnectInfo *)data;
// Receive result
char *data_msg = _receive_message(info->socket_fd);
if (!data_msg)
goto collectConnectCleanup;
// Fetch message string
char *result = _cpy_message_data(data_msg);
_destroy_message(data_msg);
// Check result
if (strcmp(result, "0") != 0 || strlen(result) != 1)
info->cbfunc(PMIX_ERROR, info->cbdata);
else
info->cbfunc(PMIX_SUCCESS, info->cbdata);
_destroy_message(result);
// Cleanup
collectConnectCleanup:
close(info->socket_fd);
free(info);
return NULL;
}
int collect_connected(const pmix_proc_t procs[], size_t proc_count,
const pmix_info_t infos[], size_t info_count,
pmix_op_cbfunc_t cbfunc, void *cbdata) {
......@@ -427,9 +459,16 @@ int collect_connected(const pmix_proc_t procs[], size_t proc_count,
}
free(msg_data);
// TODO: thread to receive go!
close(socket_fd);
return 0;
// Create thread to receive asynchronously
pthread_t th1;
struct CollectConnectInfo *connect_info =
(struct CollectConnectInfo *)malloc(sizeof(struct CollectConnectInfo));
connect_info->cbdata = cbdata;
connect_info->cbfunc = cbfunc;
connect_info->socket_fd = socket_fd;
pthread_create(&th1, NULL, _collect_connect_receiver, connect_info);
pthread_detach(th1);
return 1;
}
// SPAWN ----
......@@ -468,7 +507,7 @@ void _populate_srun_argv(const pmix_app_t *app, int app_argc, char **argv) {
}
int _validate_launch(const pmix_proc_t *proc, const pmix_app_t apps[],
size_t napps) {
size_t napps, size_t **_out_ids, size_t *_out_nids) {
// Make sure we do not launch "nothing"
if (!napps)
return 0;
......@@ -513,10 +552,9 @@ int _validate_launch(const pmix_proc_t *proc, const pmix_app_t apps[],
// Receive assign response
char *msg = _receive_message(socket_fd);
if (!msg) {
close(socket_fd);
if (!msg)
return 0;
}
msg_data = _cpy_message_data(msg);
_destroy_message(msg);
......@@ -525,15 +563,38 @@ int _validate_launch(const pmix_proc_t *proc, const pmix_app_t apps[],
strtok(msg, ",");
strtok(NULL, ",");
FILE *ptr = fopen("/home/pmix", "a");
fprintf(ptr, "ASSIGN RESPONSE %s\n", strtok(NULL, ","));
fclose(ptr);
// Check if we are allowed to proceed
if (strcmp(strtok(NULL, ","), "0") == 0) {
_destroy_message(msg_data);
return 0;
}
close(socket_fd);
// Ignore nonce
strtok(NULL, ",");
// Parse all assigned ids
char *count;
uint32_t totalCount = 0;
size_t *ids = (size_t *)malloc(0);
while ((count = strtok(NULL, ",")) != NULL &&
count[0] != '\"') { // last check for start of "msg_type"
uint32_t numTasks;
sscanf(count, "%u", &numTasks);
ids = realloc(ids, totalCount + numTasks);
for (uint32_t i = 0; i < numTasks; i++)
sscanf(strtok(NULL, ","), "%zu", &ids[totalCount + i]);
totalCount += numTasks;
}
*_out_ids = ids;
*_out_nids = totalCount;
_destroy_message(msg_data);
return 1;
}
void _launch_app(const pmix_proc_t *parent, const pmix_app_t *app) {
void _launch_app(const pmix_proc_t *parent, const pmix_app_t *app,
size_t dynamicId) {
int app_argc;
char **argv = _create_srun_argv(app, &app_argc);
_populate_srun_argv(app, app_argc, argv);
......@@ -559,7 +620,7 @@ void _launch_app(const pmix_proc_t *parent, const pmix_app_t *app) {
env_array_append_fmt(&env, "HOME", "/root");
env_array_append_fmt(&env, "DPM_AGENT_PORT", "25000");
env_array_append_fmt(&env, "SLURM_VRM_JOBID", "%u", pmixp_info_jobid());
env_array_append_fmt(&env, "DPM_PMIX_DYNAMIC_ID", "%zu", dynamicId);
execve(SLURM_PREFIX "/bin/srun", argv, env);
abort();
}
......@@ -567,9 +628,13 @@ void _launch_app(const pmix_proc_t *parent, const pmix_app_t *app) {
int pmixp_spawn(const pmix_proc_t *proc, const pmix_app_t apps[], size_t napps,
pmix_spawn_cbfunc_t cbfunc, void *cbdata) {
// First ask for permission
if (!_validate_launch(proc, apps, napps))
size_t *dynamicIds;
size_t nDynamicIds;
if (!_validate_launch(proc, apps, napps, &dynamicIds, &nDynamicIds))
return 0;
size_t currentApp = 0;
THESIS_LOG
ptr = fopen("/home/pmix", "a");
......@@ -590,9 +655,11 @@ int pmixp_spawn(const pmix_proc_t *proc, const pmix_app_t apps[], size_t napps,
__pid_t pid = fork();
if (pid == 0) {
_launch_app(proc, app);
_launch_app(proc, app, dynamicIds[currentApp]);
}
currentApp++;
}
free(dynamicIds);
fclose(ptr);
return 1;
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment