Source code for genericroboticarm.control.graph_manager

"""

"""

from __future__ import annotations

import logging
from pathlib import Path

import networkx as nx
from typing import List, Dict, Callable, Optional
from math import dist
import platformdirs
from importlib import resources

JointState = Dict[str, float]


[docs] def default_dist(p: JointState, q: JointState) -> float: return dist(p.values(), q.values())
[docs] class GraphManager: g: nx.Graph dist: Callable[[JointState, JointState], float] def __init__(self, name: str): self.dist = default_dist # Filename without the path self.filename = f"position_graph_{name}.gml" self.default_dir = "genericroboticarm.graphs" self.custom_dir = platformdirs.user_config_path(appname="GenericRoboticArm", ensure_exists=True) try: self.load(self.filename) except FileNotFoundError: self.g = nx.Graph()
[docs] def load(self, file_name: Optional[str] = None): """ Loads a networkx graph :param file_name: :return: """ try: path = Path(self.custom_dir) / file_name self.g = nx.read_gml(path) except FileNotFoundError: try: # Load default resource with resources.path(self.default_dir, file_name) as path: self.g = nx.read_gml(path) except FileNotFoundError: # If the file is not found in either location, raise a more descriptive error raise FileNotFoundError(f"File {file_name} not found in {self.custom_dir} or {self.default_dir}")
[docs] def save(self, filename: str): """ Saves the current networkx graph :param filename: :return: """ dest_path = self.custom_dir/filename nx.write_gml(self.g, dest_path, stringizer=str)
[docs] def position_known(self, identifier: str) -> bool: known = identifier in self.g.nodes return known
[docs] def change_position(self, identifier: str, pos: JointState): """ :param identifier: :param pos: :return: """ self.g.nodes[identifier].update(pos) self.update_distances() self.save(self.filename)
[docs] def add_position(self, identifier: str, pos: JointState): """ :param pos: :param identifier: :return: """ self.g.add_node(identifier, **pos) self.save(self.filename)
[docs] def add_intermediates(self, from_pos: str, to_pos: str, num: int, name_generator: Callable[[int], str]): """ Adds a specified number of intermediate positions between two specified positions. The new positions will be equidistant points on a straight line between the two positions. You need to add a name generator function. This function will receive the number of the intermediate position as an integer starting at 0. The intermediate positions will be enumerated starting near the 'from_pos' position. :return: """ start = self.get_coordinates(from_pos) end = self.get_coordinates(to_pos) for i in range(num): new_identifier = name_generator(i) # create intermediate points a*end + (1-a)*start with a going 1/(num+1) to num/(num+1) factor = (i+1)/(num+1) joint_state = { joint_name: factor*end[joint_name] + (1-factor)*value for joint_name, value in start.items() } self.add_position(new_identifier, joint_state)
[docs] def add_connection(self, head: str, tail: str): """ :param head: :param tail: :return: """ # it is important (for this code) to consistently have a attribute called dist tail_pos = self.g.nodes[tail] head_pos = self.g.nodes[head] length = self.dist(tail_pos, head_pos) self.g.add_edge(tail, head, dist=length) self.save(self.filename)
[docs] def update_distances(self): """ Applies the distance function to all edges :return: """ for u, v, data in self.g.edges(data=True): u_coordinates = self.get_coordinates(u) v_coordinates = self.get_coordinates(v) data["dist"] = self.dist(u_coordinates, v_coordinates)
[docs] def get_nearest_position(self, pos: JointState) -> str: """ :param pos: :return: """ def criteria(n: str): return self.dist(self.g.nodes[n], pos) sorted_positions = sorted(self.g.nodes, key=criteria) # the first one should be the nearest return sorted_positions[0]
[docs] def get_coordinates(self, identifier: str) -> JointState: """ :param identifier: :return: """ data = self.g.nodes[identifier] return data.copy()
[docs] def get_shortest_path(self, start: str, target: str) -> List[str]: """ Returns the shortest path from start to target as a list of node identifiers including the target but not the start. Returns an empty list if no path exists. :param start: :param target: :return: """ if nx.has_path(self.g, start, target): path = nx.shortest_path(self.g, start, target, weight='dist') logging.info("shortest path is:", path) # the first entry is the current node. we remove it. return path[1:] else: logging.warning(f"There is no path between {start} and {target}.") return []
[docs] def remove_position(self, identifier: str): """ :param identifier: :return: """ if identifier in self.g.nodes: self.g.remove_node(identifier) self.save(self.filename)
[docs] def remove_connection(self, head: str, tail: str): """ :param head: :param tail: :return: """ if (tail, head) in self.g.edges: self.g.remove_edge(tail, head) self.save(self.filename)