Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Controller #10

Merged
merged 15 commits into from
Jan 8, 2025
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@ rb_ws/bags
docker-compose.yml~
*TEMP_DO_NOT_EDIT.txt
rb_ws/src/buggy/bags/*
*.bag
*.bag
.vision*/*
mehulgoel873 marked this conversation as resolved.
Show resolved Hide resolved
vision/data*/*
161 changes: 161 additions & 0 deletions rb_ws/src/buggy/buggy/controller/controller_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import threading
import sys

import numpy as np

import rclpy
from rclpy.node import Node

from std_msgs.msg import Float32, Float64, Bool
from nav_msgs.msg import Odometry

sys.path.append("/rb_ws/src/buggy/buggy")
from util.trajectory import Trajectory
from controller.stanley_controller import StanleyController

class Controller(Node):

def __init__(self):
"""
Constructor for Controller class.

Creates a ROS node with a publisher that periodically sends a message
indicating whether the node is still alive.

"""
super().__init__('controller')
self.get_logger().info('INITIALIZED.')


#Parameters
self.declare_parameter("dist", 0.0) #Starting Distance along path
start_dist = self.get_parameter("dist").value

self.declare_parameter("traj_name", "buggycourse_safe.json")
TiaSinghania marked this conversation as resolved.
Show resolved Hide resolved
traj_name = self.get_parameter("traj_name").value
self.cur_traj = Trajectory(json_filepath="/rb_ws/src/buggy/paths/" + traj_name) #TODO: Fixed filepath, not good

start_index = self.cur_traj.get_index_from_distance(start_dist)

self.declare_parameter("controller_name", "stanley")

controller_name = self.get_parameter("controller_name").value
print(controller_name.lower)
if (controller_name.lower() == "stanley"):
self.controller = StanleyController(start_index = start_index, namespace = self.get_namespace(), node=self) #IMPORT STANLEY
else:
self.get_logger().error("Invalid Controller Name: " + controller_name.lower())
raise Exception("Invalid Controller Argument")

# Publishers
self.init_check_publisher = self.create_publisher(Bool,
"debug/init_safety_check", 1
)
self.steer_publisher = self.create_publisher(
Float64, "input/steering", 1
)
self.heading_publisher = self.create_publisher(
Float32, "auton/debug/heading", 1
)

# Subscribers
self.odom_subscriber = self.create_subscription(Odometry, 'self/state', self.odom_listener, 1)
self.traj_subscriber = self.create_subscription(Odometry, 'self/cur_traj', self.traj_listener, 1)

self.lock = threading.Lock()

self.odom = None
self.passed_init = False

timer_period = 0.01 # seconds (100 Hz)
self.timer = self.create_timer(timer_period, self.loop)

def odom_listener(self, msg : Odometry):
'''
This is the subscriber that updates the buggies state for navigation
msg, should be a CLEAN state as defined in the wiki
'''
with self.lock:
self.odom = msg

def traj_listener(self, msg): #TYPE UNKOWN AS OF NOW?? CUSTOM TYPE WHEN #TODO: FIGURE OUT BEFORE MERGE
TiaSinghania marked this conversation as resolved.
Show resolved Hide resolved
'''
This is the subscriber that updates the buggies trajectory for navigation
'''
with self.lock:
self.cur_traj, self.controller.current_traj_index = Trajectory.unpack(msg)

def init_check(self):
"""
Checks if it's safe to switch the buggy into autonomous driving mode.
Specifically, it checks:
if we can recieve odometry messages from the buggy
if the covariance is acceptable (less than 1 meter)
if the buggy thinks it is facing in the correct direction wrt the local trajectory (not 180 degrees flipped)

Returns:
A boolean describing the status of the buggy (safe for auton or unsafe for auton)
"""
if (self.odom == None):
self.get_logger().warn("WARNING: no available position estimate")
return False

elif (self.odom.pose.covariance[0] ** 2 + self.odom.pose.covariance[7] ** 2 > 1**2):
self.get_logger().warn("checking position estimate certainty")
return False

#Originally under a lock, doesn't seem necessary?
current_heading = self.odom.pose.pose.orientation.z
closest_heading = self.cur_traj.get_heading_by_index(self.cur_traj.get_closest_index_on_path(self.odom.pose.pose.position.x, self.odom.pose.pose.position.y))

self.get_logger().info("current heading: " + str(np.rad2deg(current_heading)))
msg = Float32()
msg.data = np.rad2deg(current_heading)
self.heading_publisher.publish(msg)

#Converting headings from [-pi, pi] to [0, 2pi]
if (current_heading < 0):
current_heading = 2 * np.pi + current_heading
if (closest_heading < 0):
closest_heading = 2 * np.pi + closest_heading

if (abs(current_heading - closest_heading) >= np.pi/2):
self.get_logger().warn("WARNING: INCORRECT HEADING! restart stack. Current heading [-180, 180]: " + str(np.rad2deg(current_heading)))
return False

return True

def loop(self):
if not self.passed_init:
self.passed_init = self.init_check()
msg = Bool()
msg.data = self.passed_init
self.init_check_publisher.publish(msg)
if self.passed_init:
self.get_logger().info("Passed Initialization Check")
else:
return

self.heading_publisher.publish(Float32(data=np.rad2deg(self.odom.pose.pose.orientation.z)))
steering_angle = self.controller.compute_control(self.odom, self.cur_traj)
steering_angle_deg = np.rad2deg(steering_angle)
self.steer_publisher.publish(Float64(data=float(steering_angle_deg)))



def main(args=None):
rclpy.init(args=args)

controller = Controller()

rclpy.spin(controller)

# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
controller.destroy_node()
rclpy.shutdown()


if __name__ == '__main__':
main()
49 changes: 49 additions & 0 deletions rb_ws/src/buggy/buggy/controller/controller_superclass.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from abc import ABC, abstractmethod
import sys

from nav_msgs.msg import Odometry


sys.path.append("/rb_ws/src/buggy/buggy")
from util.trajectory import Trajectory

class Controller(ABC):
"""
Base class for all controllers.

The controller takes in the current state of the buggy and a reference
trajectory. It must then compute the desired control output.

The method that it does this by is up to the implementation, of course.
Example schemes include Pure Pursuit, Stanley, and LQR.
"""

# TODO: move this to a constants class
NAND_WHEELBASE = 1.3
SC_WHEELBASE = 1.104

def __init__(self, start_index: int, namespace : str, node) -> None:
self.namespace = namespace
if namespace.upper() == '/NAND':
Controller.WHEELBASE = self.NAND_WHEELBASE
else:
Controller.WHEELBASE = self.SC_WHEELBASE

self.current_traj_index = start_index
self.node = node

@abstractmethod
def compute_control(
self, state_msg: Odometry, trajectory: Trajectory,
) -> float:
"""
Computes the desired control output given the current state and reference trajectory

Args:
state: (Odometry): state of the buggy, including position, attitude and associated twists
trajectory (Trajectory): reference trajectory

Returns:
float (desired steering angle)
"""
raise NotImplementedError
150 changes: 150 additions & 0 deletions rb_ws/src/buggy/buggy/controller/stanley_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import sys
import numpy as np

from sensor_msgs.msg import NavSatFix
from geometry_msgs.msg import Pose as ROSPose
from nav_msgs.msg import Odometry

sys.path.append("/rb_ws/src/buggy/buggy")
from util.trajectory import Trajectory
from controller.controller_superclass import Controller
from util.pose import Pose

import utm


class StanleyController(Controller):
"""
Stanley Controller (front axle used as reference point)
mehulgoel873 marked this conversation as resolved.
Show resolved Hide resolved
Referenced from this paper: https://ai.stanford.edu/~gabeh/papers/hoffmann_stanley_control07.pdf
"""

LOOK_AHEAD_DIST_CONST = 0.05 # s
MIN_LOOK_AHEAD_DIST = 0.1 #m
MAX_LOOK_AHEAD_DIST = 2.0 #m

CROSS_TRACK_GAIN = 1.3
K_SOFT = 1.0 # m/s
K_D_YAW = 0.012 # rad / (rad/s)

def __init__(self, start_index, namespace, node):
super(StanleyController, self).__init__(start_index, namespace, node)
self.debug_reference_pos_publisher = self.node.create_publisher(
NavSatFix, "auton/debug/reference_navsat", 1
)
self.debug_error_publisher = self.node.create_publisher(
ROSPose, "auton/debug/error", 1
)

def compute_control(self, state_msg : Odometry, trajectory : Trajectory):
"""Computes the steering angle determined by Stanley controller.
Does this by looking at the crosstrack error + heading error

Args:
state_msg: ros Odometry message
trajectory (Trajectory): reference trajectory

Returns:
float (desired steering angle)
"""
if self.current_traj_index >= trajectory.get_num_points() - 1:
self.node.get_logger.error("[Stanley]: Ran out of path to follow!")
raise Exception("[Stanley]: Ran out of path to follow!")

current_rospose = state_msg.pose.pose
current_speed = np.sqrt(
state_msg.twist.twist.linear.x**2 + state_msg.twist.twist.linear.y**2
)
yaw_rate = state_msg.twist.twist.angular.z
heading = current_rospose.orientation.z
x, y = current_rospose.position.x, current_rospose.position.y #(Easting, Northing)

front_x = x + StanleyController.WHEELBASE * np.cos(heading)
TiaSinghania marked this conversation as resolved.
Show resolved Hide resolved
front_y = y + StanleyController.WHEELBASE * np.sin(heading)

# setting range of indices to search so we don't have to search the entire path
traj_index = trajectory.get_closest_index_on_path(
front_x,
front_y,
start_index=self.current_traj_index - 20,
end_index=self.current_traj_index + 50,
)
self.current_traj_index = max(traj_index, self.current_traj_index)
TiaSinghania marked this conversation as resolved.
Show resolved Hide resolved


lookahead_dist = np.clip(
self.LOOK_AHEAD_DIST_CONST * current_speed,
self.MIN_LOOK_AHEAD_DIST,
self.MAX_LOOK_AHEAD_DIST)

traj_dist = trajectory.get_distance_from_index(self.current_traj_index) + lookahead_dist

ref_heading = trajectory.get_heading_by_index(
trajectory.get_index_from_distance(traj_dist))

error_heading = ref_heading - heading
error_heading = np.arctan2(np.sin(error_heading), np.cos(error_heading)) #Bounds error_heading

# Calculate cross track error by finding the distance from the buggy to the tangent line of
# the reference trajectory
closest_position = trajectory.get_position_by_index(self.current_traj_index)
next_position = trajectory.get_position_by_index(
self.current_traj_index + 0.0001
TiaSinghania marked this conversation as resolved.
Show resolved Hide resolved
)
x1 = closest_position[0]
y1 = closest_position[1]
x2 = next_position[0]
y2 = next_position[1]
error_dist = -((x - x1) * (y2 - y1) - (y - y1) * (x2 - x1)) / np.sqrt(
(y2 - y1) ** 2 + (x2 - x1) ** 2
)


cross_track_component = -np.arctan2(
StanleyController.CROSS_TRACK_GAIN * error_dist, current_speed + StanleyController.K_SOFT
)

# Calculate yaw rate error
r_meas = yaw_rate
r_traj = current_speed * (trajectory.get_heading_by_index(trajectory.get_index_from_distance(traj_dist) + 0.05)
TiaSinghania marked this conversation as resolved.
Show resolved Hide resolved
- trajectory.get_heading_by_index(trajectory.get_index_from_distance(traj_dist))) / 0.05

#Determine steering_command
steering_cmd = error_heading + cross_track_component + StanleyController.K_D_YAW * (r_traj - r_meas)
steering_cmd = np.clip(steering_cmd, -np.pi / 9, np.pi / 9)
TiaSinghania marked this conversation as resolved.
Show resolved Hide resolved


#Calculate error, where x is in orientation of buggy, y is cross track error
current_pose = Pose(current_rospose.position.x, current_rospose.position.y, heading)
reference_error = current_pose.convert_point_from_global_to_local_frame(closest_position)
reference_error -= np.array([StanleyController.WHEELBASE, 0]) #Translae back to back wheel to get accurate error

error_pose = ROSPose()
error_pose.position.x = reference_error[0]
error_pose.position.y = reference_error[1]
self.debug_error_publisher.publish(error_pose)

# Publish reference position for debugging
try:
reference_navsat = NavSatFix()
lat, lon = utm.to_latlon(closest_position[0], closest_position[1], 17, "T")
reference_navsat.latitude = lat
reference_navsat.longitude = lon
self.debug_reference_pos_publisher.publish(reference_navsat)
except Exception as e:
self.node.get_logger().warn(
"[Stanley] Unable to convert closest track position lat lon; Error: " + str(e)
)

return steering_cmd











Loading
Loading