"""
Used to accumulate key presses and create movement commands from them.
"""
from queue import Queue
import time
from threading import Thread
from typing import NamedTuple, List, Tuple, Any
MOVE_FREQUENCY = 10
[docs]
class MoveKey(NamedTuple):
# key to listen to
key: str
# number of the joint to be moved
joint_index: int
# whether moved forward
positive: bool
move_keys = [
MoveKey("ArrowLeft", 0, True),
MoveKey("ArrowRight", 0, False),
MoveKey("ArrowUp", 1, True),
MoveKey("ArrowDown", 1, False),
MoveKey('y', 2, True),
MoveKey('a', 2, False),
MoveKey('x', 3, True),
MoveKey('s', 3, False),
MoveKey('c', 4, True),
MoveKey('d', 4, False),
MoveKey('v', 5, True),
MoveKey('f', 5, False),
MoveKey('b', 6, True),
MoveKey('g', 6, False),
MoveKey('n', 7, True),
MoveKey('h', 7, False),
MoveKey('m', 8, True),
MoveKey('j', 8, False),
]
[docs]
class ManualMovementManager:
input_queue: Queue[str]
def __init__(self, robot: Any, joint_names: List[str]):
self.input_queue = Queue[str]()
self._stop = False
self.joint_names = joint_names
# start the listener
Thread(daemon=True, target=self.listen).start()
self.robot = robot
self.sending_stats = []
[docs]
def listen(self):
"""
will run forever and accumulate received key presses.
:return:
"""
while not self._stop:
# wait to match the update frequency
time.sleep(1/MOVE_FREQUENCY)
movements = self.empty_queue()
if any(movements):
self.robot.move_relative(movements)
[docs]
def empty_queue(self) -> List[Tuple[str, int]]:
# collect all inputs
movements = [0 for move_key in move_keys if move_key.positive]
while not self.input_queue.empty():
key = self.input_queue.get()
# check whether the key has a meaning and add a movement to the respective joint
for move_key in move_keys:
# check whether there are enough joints in the current robot
if move_key.joint_index <= len(self.joint_names):
if key == move_key.key:
if move_key.positive:
movements[move_key.joint_index] += 1
else:
movements[move_key.joint_index] -= 1
to_move = []
for idx, diff in enumerate(movements):
# only add relevant key-presses
if diff and idx < len(self.joint_names):
to_move.append((self.joint_names[idx], diff))
return to_move
[docs]
def receive_key(self, key: str):
self.input_queue.put(key)
[docs]
def stop(self):
self._stop = True