Source code for genericroboticarm.__main__

"""_____________________________________________________________________

:PROJECT: genericroboticarm

* Main module command line interface *

:details:  Main module command line interface.
           !!! Warning: it should have a diffent name than the package name.

.. note:: -
________________________________________________________________________
"""
import asyncio
import argparse
import logging
import sys

from genericroboticarm import __version__

from genericroboticarm.robo_APIs import collect_robo_implementations
from genericroboticarm.control.robo_dash import RoboDashApp


logging.basicConfig(
    format="%(levelname)-4s| %(module)s.%(funcName)s: %(message)s",
    level=logging.WARNING,
)


[docs] def parse_command_line(): """Console script for genericroboticarm.""" description = "genericroboticarm" parser = argparse.ArgumentParser(description=description) parser.add_argument("_", nargs="*") parser.add_argument( "-r", "--robot", action="store", default="Dummy", help="Name of the robot implementation (Dummy by default). Options are: Dummy, Dorna, XArm, " "PF400, UppsalaXArm, JoyIt", ) parser.add_argument("-a", "--address", action="store", default="127.0.0.1", help="IP address of the SiLA server") parser.add_argument("-p", "--port", action="store", default=50051, help="Port of the SiLA server") parser.add_argument( "-s", "--simulation", action=argparse.BooleanOptionalAction, default=False, help="Set to True to start the robot in simulation mode", ) parser.add_argument( "-u", "--unitelabs", action=argparse.BooleanOptionalAction, default=False, help="Starts the sila interface as a unitelabs connector instead of sila_python server.", ) parser.add_argument("--com-port", action="store", default=None, help="Com port for the hardware. Only required for JoyIt.") parser.add_argument("--robot-ip", action="store", default=None, help="The ip for the robot hardware. Only required for XArm and, PF400 and Dorna") parser.add_argument("--robot-port", action="store", default=None, help="The port for the robot hardware. Only required for PF400") parser.add_argument("-v", "--version", action="version", version="%(prog)s " + __version__) return parser.parse_args()
[docs] def main(): """Creates and starts the connector application""" logging.basicConfig(format="%(levelname)-4s| %(module)s.%(funcName)s: %(message)s", level=logging.DEBUG) args = parse_command_line() # select the robot implementation robot_name = args.robot implementations = collect_robo_implementations() names = [implementation.get_name() for implementation in implementations] print(f"Found RoboInterface implementations: {names}") for implementation in implementations: if implementation.get_name() == robot_name: robot_type = implementation break else: logging.error(f"Unknown robot {robot_name}. Available: {', '.join(names)}") return required_kwargs = robot_type.get_used_cli_args() kwargs = {} for key, cl_attr in required_kwargs.items(): try: kwargs[key] = getattr(args, cl_attr) except AttributeError: logging.error(f"You must specify {cl_attr} to use {robot_name}.") return robo_impl = robot_type(**kwargs) dash_app = RoboDashApp(robo_interface=robo_impl, port=8055) dash_app.run() if args.unitelabs: from .connector import create_app asyncio.run(create_app(robo_impl)) else: from .sila_server import Server # start the sila sever server = Server(robo_interface=robo_impl) server.start_insecure(args.address, int(args.port)) if sys.stdin.isatty(): print("\nPress 'q' -> enter to exit.") while not input() == 'q': pass server.stop(grace_period=.5) else: # Non-interactive (Docker) - wait for interrupt signal import signal import threading logging.info("Waiting for ctrl-c to stop server...") stop_event = threading.Event() def quit(signum, frame): logging.info(f"Received signal {signum}, stopping server...") stop_event.set() signal.signal(signal.SIGINT, quit) signal.signal(signal.SIGTERM, quit) stop_event.wait()
if __name__ == "__main__": main()