Skip to content

Collaborative astar #1247

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions PathPlanning/TimeBasedPathPlanning/BaseClasses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
import math
from PathPlanning.TimeBasedPathPlanning.GridWithDynamicObstacles import (
Grid,
Position,
)
from PathPlanning.TimeBasedPathPlanning.Node import NodePath
import random
import numpy.random as numpy_random

# Seed randomness for reproducibility
RANDOM_SEED = 50
random.seed(RANDOM_SEED)
numpy_random.seed(RANDOM_SEED)

class SingleAgentPlanner(ABC):
"""
Base class for single agent planners
"""

@staticmethod
@abstractmethod
def plan(grid: Grid, start: Position, goal: Position, verbose: bool = False) -> NodePath:
pass

@dataclass
class StartAndGoal:
# Index of this agent
index: int
# Start position of the robot
start: Position
# Goal position of the robot
goal: Position

def distance_start_to_goal(self) -> float:
return pow(self.goal.x - self.start.x, 2) + pow(self.goal.y - self.start.y, 2)

class MultiAgentPlanner(ABC):
"""
Base class for multi-agent planners
"""

@staticmethod
@abstractmethod
def plan(grid: Grid, start_and_goal_positions: list[StartAndGoal], verbose: bool = False) -> list[NodePath]:
"""
Plan for all agents. Returned paths are in the order of the `StartAndGoal` list this object was instantiated with
"""
pass
95 changes: 67 additions & 28 deletions PathPlanning/TimeBasedPathPlanning/GridWithDynamicObstacles.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,7 @@
import matplotlib.pyplot as plt
from enum import Enum
from dataclasses import dataclass

@dataclass(order=True)
class Position:
x: int
y: int

def as_ndarray(self) -> np.ndarray:
return np.array([self.x, self.y])

def __add__(self, other):
if isinstance(other, Position):
return Position(self.x + other.x, self.y + other.y)
raise NotImplementedError(
f"Addition not supported for Position and {type(other)}"
)

def __sub__(self, other):
if isinstance(other, Position):
return Position(self.x - other.x, self.y - other.y)
raise NotImplementedError(
f"Subtraction not supported for Position and {type(other)}"
)

def __hash__(self):
return hash((self.x, self.y))
from PathPlanning.TimeBasedPathPlanning.Node import NodePath, Position

@dataclass
class Interval:
Expand All @@ -43,6 +19,8 @@ class ObstacleArrangement(Enum):
RANDOM = 0
# Obstacles start in a line in y at center of grid and move side-to-side in x
ARRANGEMENT1 = 1
# Static obstacle arrangement
NARROW_CORRIDOR = 2

"""
Generates a 2d numpy array with lists for elements.
Expand Down Expand Up @@ -87,6 +65,8 @@ def __init__(
self.obstacle_paths = self.generate_dynamic_obstacles(num_obstacles)
elif obstacle_arrangement == ObstacleArrangement.ARRANGEMENT1:
self.obstacle_paths = self.obstacle_arrangement_1(num_obstacles)
elif obstacle_arrangement == ObstacleArrangement.NARROW_CORRIDOR:
self.obstacle_paths = self.generate_narrow_corridor_obstacles(num_obstacles)

for i, path in enumerate(self.obstacle_paths):
obs_idx = i + 1 # avoid using 0 - that indicates free space in the grid
Expand Down Expand Up @@ -184,6 +164,26 @@ def obstacle_arrangement_1(self, obs_count: int) -> list[list[Position]]:
obstacle_paths.append(path)

return obstacle_paths

def generate_narrow_corridor_obstacles(self, obs_count: int) -> list[list[Position]]:
obstacle_paths = []

for y in range(0, self.grid_size[1]):
if y > obs_count:
break

if y == self.grid_size[1] // 2:
# Skip the middle row
continue

obstacle_path = []
x = 10 # middle of the grid
for t in range(0, self.time_limit - 1):
obstacle_path.append(Position(x, y))

obstacle_paths.append(obstacle_path)

return obstacle_paths

"""
Check if the given position is valid at time t
Expand All @@ -196,11 +196,11 @@ def obstacle_arrangement_1(self, obs_count: int) -> list[list[Position]]:
bool: True if position/time combination is valid, False otherwise
"""
def valid_position(self, position: Position, t: int) -> bool:
# Check if new position is in grid
# Check if position is in grid
if not self.inside_grid_bounds(position):
return False

# Check if new position is not occupied at time t
# Check if position is not occupied at time t
return self.reservation_matrix[position.x, position.y, t] == 0

"""
Expand Down Expand Up @@ -289,9 +289,48 @@ def get_safe_intervals_at_cell(self, cell: Position) -> list[Interval]:
# both the time step when it is entering the cell, and the time step when it is leaving the cell.
intervals = [interval for interval in intervals if interval.start_time != interval.end_time]
return intervals

"""
Reserve an agent's path in the grid. Raises an exception if the agent's index is 0, or if a position is
already reserved by a different agent.
"""
def reserve_path(self, node_path: NodePath, agent_index: int):
if agent_index == 0:
raise Exception("Agent index cannot be 0")

for i, node in enumerate(node_path.path):
reservation_finish_time = node.time + 1
if i < len(node_path.path) - 1:
reservation_finish_time = node_path.path[i + 1].time

show_animation = True
self.reserve_position(node.position, agent_index, Interval(node.time, reservation_finish_time))

"""
Reserve a position for the provided agent during the provided time interval.
Raises an exception if the agent's index is 0, or if the position is already reserved by a different agent during the interval.
"""
def reserve_position(self, position: Position, agent_index: int, interval: Interval):
if agent_index == 0:
raise Exception("Agent index cannot be 0")

for t in range(interval.start_time, interval.end_time + 1):
current_reserver = self.reservation_matrix[position.x, position.y, t]
if current_reserver not in [0, agent_index]:
raise Exception(
f"Agent {agent_index} tried to reserve a position already reserved by another agent: {position} at time {t}, reserved by {current_reserver}"
)
self.reservation_matrix[position.x, position.y, t] = agent_index

"""
Clears the initial reservation for an agent by clearing reservations at its start position with its index for
from time 0 to the time limit.
"""
def clear_initial_reservation(self, position: Position, agent_index: int):
for t in range(self.time_limit):
if self.reservation_matrix[position.x, position.y, t] == agent_index:
self.reservation_matrix[position.x, position.y, t] = 0

show_animation = True

def main():
grid = Grid(
Expand Down
94 changes: 94 additions & 0 deletions PathPlanning/TimeBasedPathPlanning/Node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from dataclasses import dataclass
from functools import total_ordering
import numpy as np

@dataclass(order=True)
class Position:
x: int
y: int

def as_ndarray(self) -> np.ndarray:
return np.array([self.x, self.y])

def __add__(self, other):
if isinstance(other, Position):
return Position(self.x + other.x, self.y + other.y)
raise NotImplementedError(
f"Addition not supported for Position and {type(other)}"
)

def __sub__(self, other):
if isinstance(other, Position):
return Position(self.x - other.x, self.y - other.y)
raise NotImplementedError(
f"Subtraction not supported for Position and {type(other)}"
)

def __hash__(self):
return hash((self.x, self.y))

@dataclass()
# Note: Total_ordering is used instead of adding `order=True` to the @dataclass decorator because
# this class needs to override the __lt__ and __eq__ methods to ignore parent_index. Parent
# index is just used to track the path found by the algorithm, and has no effect on the quality
# of a node.
@total_ordering
class Node:
position: Position
time: int
heuristic: int
parent_index: int

"""
This is what is used to drive node expansion. The node with the lowest value is expanded next.
This comparison prioritizes the node with the lowest cost-to-come (self.time) + cost-to-go (self.heuristic)
"""
def __lt__(self, other: object):
if not isinstance(other, Node):
return NotImplementedError(f"Cannot compare Node with object of type: {type(other)}")
return (self.time + self.heuristic) < (other.time + other.heuristic)

"""
Note: cost and heuristic are not included in eq or hash, since they will always be the same
for a given (position, time) pair. Including either cost or heuristic would be redundant.
"""
def __eq__(self, other: object):
if not isinstance(other, Node):
return NotImplementedError(f"Cannot compare Node with object of type: {type(other)}")
return self.position == other.position and self.time == other.time

def __hash__(self):
return hash((self.position, self.time))

class NodePath:
path: list[Node]
positions_at_time: dict[int, Position]

def __init__(self, path: list[Node]):
self.path = path
self.positions_at_time = {}
for i, node in enumerate(path):
reservation_finish_time = node.time + 1
if i < len(path) - 1:
reservation_finish_time = path[i + 1].time

for t in range(node.time, reservation_finish_time):
self.positions_at_time[t] = node.position

"""
Get the position of the path at a given time
"""
def get_position(self, time: int) -> Position | None:
return self.positions_at_time.get(time)

"""
Time stamp of the last node in the path
"""
def goal_reached_time(self) -> int:
return self.path[-1].time

def __repr__(self):
repr_string = ""
for i, node in enumerate(self.path):
repr_string += f"{i}: {node}\n"
return repr_string
Loading
Loading