Source code for genericroboticarm.robo_APIs.precise_flex.pf_implementation

from __future__ import annotations

import time
import traceback
from typing import List, Tuple, Optional, Dict
import socket
from math import dist
import logging
from genericroboticarm.control.graph_manager import JointState
from genericroboticarm.robo_APIs import InteractiveRobot
from .pf_simulation import PFSimulation
from threading import Lock


HOST = "10.10.0.98"
PORT = 10100
# profile_number: Tuple (speed and speed2)
movement_profiles: Dict[int, Tuple] = {
    2: (50, 0),
    4: (15, 0)
}


[docs] class PFSocket: socket: Optional[socket.socket] BYTES_BUFFER = 1024 def __init__(self, host: str, port: int): self.com_lock = Lock() for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM): af, socktype, proto, canonname, sa = res try: self.socket = socket.socket(af, socktype, proto) print("socket: ", self.socket) except OSError as msg: self.socket = None logging.debug(f"{msg} ... {traceback.print_exc()}") print(f"{msg} ... {traceback.print_exc()}") continue try: print("sa:", sa) self.socket.connect(sa) except OSError as msg: print("OSError: ", msg) self.socket.close() self.socket = None continue break if self.socket is None: print('Could not open socket') else: print('Socket connected. Connecting to robot...') self.connect() self.configure_movement_profiles()
[docs] def connect(self): """ Enables the robot high power. Returns an error if power does not come on within the specified timeout. Also attaches the robot. The robot must be attached to allow motion commands. """ power_timeout = 60 # 0 or omitted = do not wait for power to come on. # > 0 = wait this many seconds for power to come on. # -1 = wait indefinitely for power to come on. self.send_command(cmd=f'hp 1 {power_timeout}') # self.send_command(cmd="home") self.send_command(cmd='attach 1') print('Connected device successfully!')
[docs] def disconnect(self): """ Also attaches the robot. The robot must be attached to allow motion commands and disables the robot high power. """ self.send_command(cmd='attach 0') self.send_command(cmd='hp 0') self.send_command(cmd='exit') print('Disconnected device successfully!')
[docs] def configure_movement_profiles(self): for profile, params in movement_profiles.items(): self.set_movement_profile(profile_index=profile, speed=params[0], speed_2=params[1])
[docs] def set_movement_profile(self, profile_index: int, speed: int = None, speed_2: int = None, accel: int = None, decel: int = None, accel_ramp: int = None, decel_ramp: int = None, in_range: int = None, straight: int = None): single_commands_list = ['Speed', 'Speed2', 'Accel', 'AccRamp', 'Decel', 'DecRamp', 'InRange', 'Straight'] parameter_list = [speed, speed_2, accel, decel, accel_ramp, decel_ramp, in_range, straight] if None in parameter_list: # Set only the params that have been changed individually for i, param in enumerate(parameter_list): if param is not None: self.send_command(f'{single_commands_list[i]} {profile_index} {param}\n') return else: # Set all params at once self.send_command( f'Profile {profile_index} {speed} {speed_2} {accel} {decel} {accel_ramp} {decel_ramp} {in_range} ' f'{straight}\n' )
[docs] def send_command(self, cmd: str): with self.com_lock: self.socket.send(str.encode(cmd + "\n")) time.sleep(.01) answer = self.receive_data() str_answer = answer.decode().strip() if "\n" in str_answer: # this is probably some leftovers from the last communication #print(f"old answer to cmd {cmd}: |{str_answer}|") str_answer = str_answer.split("\n")[-1] #if "where" not in cmd: # print(f"cmd: {cmd}\nanswer: {str_answer}") try: if float(str_answer) < 0: logging.error(f"Error in cmd: {cmd}\nanswer: {str_answer}") except ValueError: logging.debug(f"answer {str_answer} to cmd: {cmd} is no float") return str_answer
[docs] def receive_data(self): return self.socket.recv(self.BYTES_BUFFER)
def __del__(self): self.socket.shutdown(0) self.socket.close()
[docs] class PFImplementation(InteractiveRobot): socket: PFSocket | PFSimulation def __init__(self, start_simulating: bool = True, host: str = HOST, port: int = PORT): super().__init__(start_simulating) if start_simulating: self.socket = PFSimulation() else: self.init_connection(host, port) self.move_profile = 2
[docs] @classmethod def get_name(cls) -> str: return "PF400"
[docs] @classmethod def get_used_cli_args(cls) -> dict[str, str]: return {"start_simulating": "simulation", "host": "robot_ip", "port": "robot_port"}
@property def name(self)-> str: if self.is_in_simulation_mode: # this ensures separate saved positions and interactive device list fir simulation and real mode return self.get_name() + "_sim" else: return self.get_name() # RoboInterface
[docs] def init_connection(self, host: str, port: int): self.socket = PFSocket(host, port)
@property def joint_names(self) -> list[str]: return ["x", "y", "z", "yaw", "pitch", "roll"]
[docs] def stop_moving(self): self.socket.send_command("halt") super().stop_moving()
[docs] def set_speed(self, new_speed: float): super().set_speed(new_speed) self.socket.send_command(f"Speed {self.move_profile} {new_speed}")
[docs] def set_acceleration(self, new_acceleration: float, **kwargs): raise NotImplementedError
[docs] def move_relative(self, movements: List[Tuple[str, float]], **kwargs): # there seems to be no inherent method for this # get the current destination if robot is moving or location otherwise destination_str = self.socket.send_command("DestC") coordinates = destination_str.split(" ")[1:] target_state = {name: float(val) for name, val in zip(self.joint_names, coordinates)} # add the relative changes to current coordinates for joint, diff in movements: target_state[joint] += diff self.move_to_coordinates(target_state)
[docs] def move_joint(self, joint_index: int, target_angle: float) -> str: """ Moves a specific joint. The movement will be executed after current movements have finished :param joint_index: :param target_angle: :return: """ j_string = self.socket.send_command("destj") j_positions = j_string.split(" ")[1:] j_positions[joint_index] = str(target_angle) j_string_target = " ".join(j_positions) jmove_command = f"MoveJ {self.move_profile} {j_string_target}" return self.socket.send_command(jmove_command)
[docs] def finish_movement(self, max_time: float = 60, tolerance: float = 1) -> bool: """ Returns True as soon as the robots position is tolerance near its current target or False if this does not happen until max_time seconds elapsed. :param max_time: :param tolerance: :return: """ time_out = time.time() + max_time while time.time() < time_out: target_str = self.socket.send_command("destj") current_pos_str = self.socket.send_command("wherej") target = [float(coord) for coord in target_str.split()] current_pos = [float(coord) for coord in current_pos_str.split()] distance = dist(target, current_pos) if distance < tolerance: return True time.sleep(.05) return False
[docs] def grip_close(self, **kwargs): self.finish_movement() self.move_joint(4, 75)
[docs] def grip_open(self, **kwargs): self.finish_movement() self.move_joint(4, 85)
@property def gripper_fully_closed(self) -> bool: answer = self.socket.send_command("IsFullyClosed") if answer == "-1": return True elif answer == "0": return False else: logging.warning(f"Answer {answer} to command IsFullyClosed is not understood") return False
[docs] def move_to_coordinates(self, coords: JointState, **kwargs) -> None: position_str = " ".join(str(s) for s in coords.values()) cmd = f'MoveC {self.move_profile} {position_str}' self.socket.send_command(cmd)
@property def current_coordinates(self) -> JointState: position_str = "empty" try: position_str = self.socket.send_command("wherec") positions = position_str.split(' ')[1:7] joint_state = {name: float(val) for name, val in zip(self.joint_names, positions)} return joint_state except Exception as ex: print(ex, traceback.print_exc(), "\n", position_str) @property def number_internal_positions(self) -> int: return 1
[docs] def put_labware(self, intermediate_actions: List[str], device: str, position: int): position_identifier = self.site_to_position_identifier(device, position) self.place_at_position(position_identifier)
[docs] def get_labware(self, intermediate_actions: List[str], device: str, position: int): position_identifier = self.site_to_position_identifier(device, position) self.pick_at_position(position_identifier)
[docs] def prepare_for_output(self, internal_pos: int, device: str, position: int) -> bool: # just wait until no movement is executing with self.move_lock: return True
[docs] def prepare_for_input(self, internal_pos: int, device: str, position: int, plate_type: str) -> bool: # just wait until no movement is executing with self.move_lock: return True
[docs] def cancel_transfer(self): pass
[docs] def run_custom_command(self, cmd: str) -> str: return self.socket.send_command(cmd)