Source code for genericroboticarm.sila_server.server

# Generated by sila2.code_generator; sila2.__version__: 0.10.4

from contextlib import contextmanager
from typing import Iterator, Optional
from uuid import UUID
from threading import BoundedSemaphore, Lock

from sila2.server import SilaServer

from genericroboticarm.robo_APIs import InteractiveTransfer, RoboInterface

from .feature_implementations.implicitinteractionservice_impl import ImplicitInteractionServiceImpl
from .feature_implementations.labwaretransfermanipulatorcontroller_impl import LabwareTransferManipulatorControllerImpl
from .feature_implementations.robotcontroller_impl import RobotControllerImpl
from .feature_implementations.roboteachingservice_impl import RoboTeachingServiceImpl
from .generated.implicitinteractionservice import ImplicitInteractionServiceFeature
from .generated.labwaretransfermanipulatorcontroller import LabwareTransferManipulatorControllerFeature
from .generated.robotcontroller import RobotControllerFeature
from .generated.roboteachingservice import RoboTeachingServiceFeature


[docs] class RobotCommandQueueFull(RuntimeError): pass
[docs] class Server(SilaServer): robot: RoboInterface def __init__(self, robo_interface: RoboInterface, server_uuid: Optional[UUID] = None): super().__init__( server_name=robo_interface.name, server_type="RoboArmServer", server_version="0.1", server_description=f"Server to control a {robo_interface.name}", server_vendor_url="https://gitlab.com/SiLA2/sila_python", server_uuid=server_uuid, ) self.robot = robo_interface self.sila_lock = Lock() self.robot_command_slots = BoundedSemaphore(2) self.robotcontroller = RobotControllerImpl(self) self.set_feature_implementation(RobotControllerFeature, self.robotcontroller) self.roboteachingservice = RoboTeachingServiceImpl(self) self.set_feature_implementation(RoboTeachingServiceFeature, self.roboteachingservice) print("is interactive: ", isinstance(robo_interface, InteractiveTransfer)) if isinstance(robo_interface, InteractiveTransfer): self.labwaretransfermanipulatorcontroller = LabwareTransferManipulatorControllerImpl(self) self.set_feature_implementation( LabwareTransferManipulatorControllerFeature, self.labwaretransfermanipulatorcontroller ) self.implicitinteractionservice = ImplicitInteractionServiceImpl(self, robo_interface.interacting_devices) self.set_feature_implementation(ImplicitInteractionServiceFeature, self.implicitinteractionservice)
[docs] def stop(self, grace_period: Optional[float] = None) -> None: self.robot.manual_mover.stop() self.robot.end_connection() super().stop(grace_period)
[docs] @contextmanager def robot_command(self) -> Iterator[None]: accepted = self.robot_command_slots.acquire(blocking=False) if not accepted: raise RobotCommandQueueFull( "Robot is already executing one SiLA command and has one command waiting." ) try: with self.sila_lock: yield finally: self.robot_command_slots.release()