Source code for genericroboticarm.control.manual_movement

"""
Used to accumulate key presses and create movement commands from them.
"""
from queue import Queue
import time
from threading import Thread
from typing import NamedTuple, List, Tuple, Any


MOVE_FREQUENCY = 10


[docs] class MoveKey(NamedTuple): # key to listen to key: str # number of the joint to be moved joint_index: int # whether moved forward positive: bool
move_keys = [ MoveKey("ArrowLeft", 0, True), MoveKey("ArrowRight", 0, False), MoveKey("ArrowUp", 1, True), MoveKey("ArrowDown", 1, False), MoveKey('y', 2, True), MoveKey('a', 2, False), MoveKey('x', 3, True), MoveKey('s', 3, False), MoveKey('c', 4, True), MoveKey('d', 4, False), MoveKey('v', 5, True), MoveKey('f', 5, False), MoveKey('b', 6, True), MoveKey('g', 6, False), MoveKey('n', 7, True), MoveKey('h', 7, False), MoveKey('m', 8, True), MoveKey('j', 8, False), ]
[docs] class ManualMovementManager: input_queue: Queue[str] def __init__(self, robot: Any, joint_names: List[str]): self.input_queue = Queue[str]() self._stop = False self.joint_names = joint_names # start the listener Thread(daemon=True, target=self.listen).start() self.robot = robot self.sending_stats = []
[docs] def listen(self): """ will run forever and accumulate received key presses. :return: """ while not self._stop: # wait to match the update frequency time.sleep(1/MOVE_FREQUENCY) movements = self.empty_queue() if any(movements): self.robot.move_relative(movements)
[docs] def empty_queue(self) -> List[Tuple[str, int]]: # collect all inputs movements = [0 for move_key in move_keys if move_key.positive] while not self.input_queue.empty(): key = self.input_queue.get() # check whether the key has a meaning and add a movement to the respective joint for move_key in move_keys: # check whether there are enough joints in the current robot if move_key.joint_index <= len(self.joint_names): if key == move_key.key: if move_key.positive: movements[move_key.joint_index] += 1 else: movements[move_key.joint_index] -= 1 to_move = [] for idx, diff in enumerate(movements): # only add relevant key-presses if diff and idx < len(self.joint_names): to_move.append((self.joint_names[idx], diff)) return to_move
[docs] def receive_key(self, key: str): self.input_queue.put(key)
[docs] def stop(self): self._stop = True