from __future__ import annotations
import time
import traceback
from typing import List, Tuple, Optional, Dict
import socket
from math import dist
import logging
from genericroboticarm.control.graph_manager import JointState
from genericroboticarm.robo_APIs import InteractiveRobot
from .pf_simulation import PFSimulation
from threading import Lock
HOST = "10.10.0.98"
PORT = 10100
# profile_number: Tuple (speed and speed2)
movement_profiles: Dict[int, Tuple] = {
2: (50, 0),
4: (15, 0)
}
[docs]
class PFSocket:
socket: Optional[socket.socket]
BYTES_BUFFER = 1024
def __init__(self, host: str, port: int):
self.com_lock = Lock()
for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
try:
self.socket = socket.socket(af, socktype, proto)
print("socket: ", self.socket)
except OSError as msg:
self.socket = None
logging.debug(f"{msg} ... {traceback.print_exc()}")
print(f"{msg} ... {traceback.print_exc()}")
continue
try:
print("sa:", sa)
self.socket.connect(sa)
except OSError as msg:
print("OSError: ", msg)
self.socket.close()
self.socket = None
continue
break
if self.socket is None:
print('Could not open socket')
else:
print('Socket connected. Connecting to robot...')
self.connect()
self.configure_movement_profiles()
[docs]
def connect(self):
"""
Enables the robot high power. Returns an error if power does not come on within the specified
timeout.
Also attaches the robot. The robot must be attached to allow motion commands.
"""
power_timeout = 60
# 0 or omitted = do not wait for power to come on.
# > 0 = wait this many seconds for power to come on.
# -1 = wait indefinitely for power to come on.
self.send_command(cmd=f'hp 1 {power_timeout}')
# self.send_command(cmd="home")
self.send_command(cmd='attach 1')
print('Connected device successfully!')
[docs]
def disconnect(self):
"""
Also attaches the robot. The robot must be attached to allow motion commands and disables the robot high power.
"""
self.send_command(cmd='attach 0')
self.send_command(cmd='hp 0')
self.send_command(cmd='exit')
print('Disconnected device successfully!')
[docs]
def set_movement_profile(self, profile_index: int, speed: int = None, speed_2: int = None, accel: int = None,
decel: int = None, accel_ramp: int = None, decel_ramp: int = None, in_range: int = None,
straight: int = None):
single_commands_list = ['Speed', 'Speed2', 'Accel', 'AccRamp', 'Decel', 'DecRamp', 'InRange', 'Straight']
parameter_list = [speed, speed_2, accel, decel, accel_ramp, decel_ramp, in_range, straight]
if None in parameter_list:
# Set only the params that have been changed individually
for i, param in enumerate(parameter_list):
if param is not None:
self.send_command(f'{single_commands_list[i]} {profile_index} {param}\n')
return
else:
# Set all params at once
self.send_command(
f'Profile {profile_index} {speed} {speed_2} {accel} {decel} {accel_ramp} {decel_ramp} {in_range} '
f'{straight}\n'
)
[docs]
def send_command(self, cmd: str):
with self.com_lock:
self.socket.send(str.encode(cmd + "\n"))
time.sleep(.01)
answer = self.receive_data()
str_answer = answer.decode().strip()
if "\n" in str_answer:
# this is probably some leftovers from the last communication
#print(f"old answer to cmd {cmd}: |{str_answer}|")
str_answer = str_answer.split("\n")[-1]
#if "where" not in cmd:
# print(f"cmd: {cmd}\nanswer: {str_answer}")
try:
if float(str_answer) < 0:
logging.error(f"Error in cmd: {cmd}\nanswer: {str_answer}")
except ValueError:
logging.debug(f"answer {str_answer} to cmd: {cmd} is no float")
return str_answer
[docs]
def receive_data(self):
return self.socket.recv(self.BYTES_BUFFER)
def __del__(self):
self.socket.shutdown(0)
self.socket.close()
[docs]
class PFImplementation(InteractiveRobot):
socket: PFSocket | PFSimulation
def __init__(self, start_simulating: bool = True, host: str = HOST, port: int = PORT):
super().__init__(start_simulating)
if start_simulating:
self.socket = PFSimulation()
else:
self.init_connection(host, port)
self.move_profile = 2
[docs]
@classmethod
def get_name(cls) -> str:
return "PF400"
[docs]
@classmethod
def get_used_cli_args(cls) -> dict[str, str]:
return {"start_simulating": "simulation", "host": "robot_ip", "port": "robot_port"}
@property
def name(self)-> str:
if self.is_in_simulation_mode:
# this ensures separate saved positions and interactive device list fir simulation and real mode
return self.get_name() + "_sim"
else:
return self.get_name()
# RoboInterface
[docs]
def init_connection(self, host: str, port: int):
self.socket = PFSocket(host, port)
@property
def joint_names(self) -> list[str]:
return ["x", "y", "z", "yaw", "pitch", "roll"]
[docs]
def stop_moving(self):
self.socket.send_command("halt")
super().stop_moving()
[docs]
def set_speed(self, new_speed: float):
super().set_speed(new_speed)
self.socket.send_command(f"Speed {self.move_profile} {new_speed}")
[docs]
def set_acceleration(self, new_acceleration: float, **kwargs):
raise NotImplementedError
[docs]
def move_relative(self, movements: List[Tuple[str, float]], **kwargs):
# there seems to be no inherent method for this
# get the current destination if robot is moving or location otherwise
destination_str = self.socket.send_command("DestC")
coordinates = destination_str.split(" ")[1:]
target_state = {name: float(val) for name, val in zip(self.joint_names, coordinates)}
# add the relative changes to current coordinates
for joint, diff in movements:
target_state[joint] += diff
self.move_to_coordinates(target_state)
[docs]
def move_joint(self, joint_index: int, target_angle: float) -> str:
"""
Moves a specific joint. The movement will be executed after current movements have finished
:param joint_index:
:param target_angle:
:return:
"""
j_string = self.socket.send_command("destj")
j_positions = j_string.split(" ")[1:]
j_positions[joint_index] = str(target_angle)
j_string_target = " ".join(j_positions)
jmove_command = f"MoveJ {self.move_profile} {j_string_target}"
return self.socket.send_command(jmove_command)
[docs]
def finish_movement(self, max_time: float = 60, tolerance: float = 1) -> bool:
"""
Returns True as soon as the robots position is tolerance near its current target or False
if this does not happen until max_time seconds elapsed.
:param max_time:
:param tolerance:
:return:
"""
time_out = time.time() + max_time
while time.time() < time_out:
target_str = self.socket.send_command("destj")
current_pos_str = self.socket.send_command("wherej")
target = [float(coord) for coord in target_str.split()]
current_pos = [float(coord) for coord in current_pos_str.split()]
distance = dist(target, current_pos)
if distance < tolerance:
return True
time.sleep(.05)
return False
[docs]
def grip_close(self, **kwargs):
self.finish_movement()
self.move_joint(4, 75)
[docs]
def grip_open(self, **kwargs):
self.finish_movement()
self.move_joint(4, 85)
@property
def gripper_fully_closed(self) -> bool:
answer = self.socket.send_command("IsFullyClosed")
if answer == "-1":
return True
elif answer == "0":
return False
else:
logging.warning(f"Answer {answer} to command IsFullyClosed is not understood")
return False
[docs]
def move_to_coordinates(self, coords: JointState, **kwargs) -> None:
position_str = " ".join(str(s) for s in coords.values())
cmd = f'MoveC {self.move_profile} {position_str}'
self.socket.send_command(cmd)
@property
def current_coordinates(self) -> JointState:
position_str = "empty"
try:
position_str = self.socket.send_command("wherec")
positions = position_str.split(' ')[1:7]
joint_state = {name: float(val) for name, val in zip(self.joint_names, positions)}
return joint_state
except Exception as ex:
print(ex, traceback.print_exc(), "\n", position_str)
@property
def number_internal_positions(self) -> int:
return 1
[docs]
def put_labware(self, intermediate_actions: List[str], device: str, position: int):
position_identifier = self.site_to_position_identifier(device, position)
self.place_at_position(position_identifier)
[docs]
def get_labware(self, intermediate_actions: List[str], device: str, position: int):
position_identifier = self.site_to_position_identifier(device, position)
self.pick_at_position(position_identifier)
[docs]
def prepare_for_output(self, internal_pos: int, device: str, position: int) -> bool:
# just wait until no movement is executing
with self.move_lock:
return True
[docs]
def cancel_transfer(self):
pass
[docs]
def run_custom_command(self, cmd: str) -> str:
return self.socket.send_command(cmd)