Source code for genericroboticarm.sila_server.server
# Generated by sila2.code_generator; sila2.__version__: 0.10.4
from contextlib import contextmanager
from typing import Iterator, Optional
from uuid import UUID
from threading import BoundedSemaphore, Lock
from sila2.server import SilaServer
from genericroboticarm.robo_APIs import InteractiveTransfer, RoboInterface
from .feature_implementations.implicitinteractionservice_impl import ImplicitInteractionServiceImpl
from .feature_implementations.labwaretransfermanipulatorcontroller_impl import LabwareTransferManipulatorControllerImpl
from .feature_implementations.robotcontroller_impl import RobotControllerImpl
from .feature_implementations.roboteachingservice_impl import RoboTeachingServiceImpl
from .generated.implicitinteractionservice import ImplicitInteractionServiceFeature
from .generated.labwaretransfermanipulatorcontroller import LabwareTransferManipulatorControllerFeature
from .generated.robotcontroller import RobotControllerFeature
from .generated.roboteachingservice import RoboTeachingServiceFeature
[docs]
class RobotCommandQueueFull(RuntimeError):
pass
[docs]
class Server(SilaServer):
robot: RoboInterface
def __init__(self, robo_interface: RoboInterface, server_uuid: Optional[UUID] = None):
super().__init__(
server_name=robo_interface.name,
server_type="RoboArmServer",
server_version="0.1",
server_description=f"Server to control a {robo_interface.name}",
server_vendor_url="https://gitlab.com/SiLA2/sila_python",
server_uuid=server_uuid,
)
self.robot = robo_interface
self.sila_lock = Lock()
self.robot_command_slots = BoundedSemaphore(2)
self.robotcontroller = RobotControllerImpl(self)
self.set_feature_implementation(RobotControllerFeature, self.robotcontroller)
self.roboteachingservice = RoboTeachingServiceImpl(self)
self.set_feature_implementation(RoboTeachingServiceFeature, self.roboteachingservice)
print("is interactive: ", isinstance(robo_interface, InteractiveTransfer))
if isinstance(robo_interface, InteractiveTransfer):
self.labwaretransfermanipulatorcontroller = LabwareTransferManipulatorControllerImpl(self)
self.set_feature_implementation(
LabwareTransferManipulatorControllerFeature, self.labwaretransfermanipulatorcontroller
)
self.implicitinteractionservice = ImplicitInteractionServiceImpl(self, robo_interface.interacting_devices)
self.set_feature_implementation(ImplicitInteractionServiceFeature, self.implicitinteractionservice)
[docs]
def stop(self, grace_period: Optional[float] = None) -> None:
self.robot.manual_mover.stop()
self.robot.end_connection()
super().stop(grace_period)
[docs]
@contextmanager
def robot_command(self) -> Iterator[None]:
accepted = self.robot_command_slots.acquire(blocking=False)
if not accepted:
raise RobotCommandQueueFull(
"Robot is already executing one SiLA command and has one command waiting."
)
try:
with self.sila_lock:
yield
finally:
self.robot_command_slots.release()