-
Tristan Walter authoredTristan Walter authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
closed_loop.py 2.97 KiB
import numpy as np
import cv2 as cv
import TRex
import subprocess
from queue import Queue, Empty
import sys
from threading import Thread
import platform
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueue_output(out, queue):
for line in iter(out.readline, b''):
queue.put(line)
out.close()
q = Queue()
t = None
dimensions = None
process = None
scale_factor = 0.5
def request_features():
return "position" #"position,visual_field,midline"
def update_tracking():
global process, t, q, dimensions
if type(None) == type(process):
print("initializing subprocess --")
process = subprocess.Popen(['python', '-i'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE, bufsize=1, close_fds=ON_POSIX)
t = Thread(target=enqueue_output, args=(process.stdout, q))
t.daemon = True # thread dies with the program
t.start()
dimensions = TRex.video_size()
if frame % 100 == 0:
s = (str(frame)+"*2\n").encode('ascii')
print(frame, "sending message", s)
process.stdin.write(s)
process.stdin.flush()
try:
while True:
line = q.get_nowait() # or q.get(timeout=.1)
line = line.decode('utf-8')
print("output:",line)
# do work with 'line'
except Empty:
pass
image = np.zeros((int(dimensions["height"] * scale_factor), int(dimensions["width"] * scale_factor), 3), dtype=np.uint8)
#print(positions.shape, ids.shape)
#print(visual_field.shape, ids, colors)
#print(midlines)
for i, key in zip(range(len(ids)), ids):
color = (int(colors[i * 3]), int(colors[i * 3 + 1]), int(colors[i * 3 + 2]))
pos = positions[i] * scale_factor
if len(midlines) > i:
midline = (midlines[i] * scale_factor + pos ).astype(np.int)
pos = tuple((pos + centers[i] * scale_factor).astype(np.int))
if len(midlines) > i and not np.isinf(midline[0]).any():
print(midlines, i, midline[0].min(), midline[0].max())
cv.circle(image, tuple(midline[0]), 5, (255, 0, 0), -1)
for j in range(1, len(midline)):
cv.line(image, tuple(midline[j-1, :]), tuple(midline[j]), (255, 255, 255))
cv.circle(image, pos, 5, color, -1)
if key != 1 or i >= len(visual_field):
continue
for id in np.unique(np.concatenate((visual_field[i]))):
if id == -1 or id == key or not id in ids:
continue
j = np.where(ids == id)[0]
other = tuple(((positions[j] + centers[j]) * scale_factor)[0].astype(np.int))
cv.line(image, pos, other, (255, 255, 255))
#print("tracking", frame, key, positions[key][3])
#cv.imwrite("image.png", image)
cv.putText(image, str(frame), (10, 10), cv.FONT_HERSHEY_PLAIN, 0.5, (255, 255, 255))
image = cv.cvtColor(image, cv.COLOR_BGR2RGB)
TRex.imshow("image", image)