Skip to content

Commit

Permalink
added stopping and RTH script in sim and deploy env
Browse files Browse the repository at this point in the history
  • Loading branch information
Rather1337 committed Nov 28, 2024
1 parent 4a46d10 commit 3b4ef6f
Show file tree
Hide file tree
Showing 3 changed files with 529 additions and 41 deletions.
247 changes: 211 additions & 36 deletions lsy_drone_racing/envs/drone_racing_deploy_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@

import gymnasium
import numpy as np
import minsnap_trajectories as ms

Check failure on line 31 in lsy_drone_racing/envs/drone_racing_deploy_env.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (F401)

lsy_drone_racing/envs/drone_racing_deploy_env.py:31:32: F401 `minsnap_trajectories` imported but unused
from scipy.interpolate import CubicSpline
from lsy_drone_racing.utils.quintic_spline import QuinticSpline
import rospy

Check failure on line 34 in lsy_drone_racing/envs/drone_racing_deploy_env.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (F401)

lsy_drone_racing/envs/drone_racing_deploy_env.py:34:8: F401 `rospy` imported but unused
from gymnasium import spaces
from scipy.spatial.transform import Rotation as R
Expand Down Expand Up @@ -165,43 +168,215 @@ def step(

def close(self):
"""Close the environment by stopping the drone and landing."""

Check failure on line 170 in lsy_drone_racing/envs/drone_racing_deploy_env.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (D202)

lsy_drone_racing/envs/drone_racing_deploy_env.py:170:9: D202 No blank lines allowed after function docstring (found 1)
start_pos = self.vicon.pos[self.vicon.drone_name]
gate_rot = R.from_euler("xyz", self.config.env.track.gates[-1].rpy)
final_pos = start_pos + gate_rot.as_matrix()[:, 1]
# Slow down after last gate and rise over the gates
t_max = 2.0
t_start = time.perf_counter()
while (dt := time.perf_counter() - t_start) < t_max:
rospy.sleep(0.033)
alpha = np.sqrt(np.minimum(dt / t_max, 1.0)) # Non-linear breaking
target_pos = alpha * final_pos + (1 - alpha) * start_pos
self.cf.cmdFullState(target_pos, np.zeros(3), np.zeros(3), 0, np.zeros(3))
# Fly up to avoid collisions
t_max = 2.0
t_start = time.perf_counter()
start_pos = self.vicon.pos[self.vicon.drone_name]
final_pos[2] = 2.0
while (dt := time.perf_counter() - t_start) < t_max:
rospy.sleep(0.033)
alpha = np.minimum(dt / t_max, 1.0)
target_pos = alpha * final_pos + (1 - alpha) * start_pos
self.cf.cmdFullState(target_pos, np.zeros(3), np.zeros(3), 0, np.zeros(3))
# Move back to intial state
t_max = 4.0
t_start = time.perf_counter()
start_pos = self.vicon.pos[self.vicon.drone_name]
final_pos = np.array([*self.config.env.track.drone.pos[:2], 2.0])
offset = 1.0 # Additional time at the end to really reach the des. position
while (dt := time.perf_counter() - t_start) < t_max + offset:
alpha = min(dt / t_max, 1.0)
target_pos = alpha * final_pos + (1 - alpha) * start_pos
# Additionally pass position difference as vel for more landing accuracy
vel = target_pos - self.vicon.pos[self.vicon.drone_name]
self.cf.cmdFullState(target_pos, vel, np.zeros(3), 0, np.zeros(3))
rospy.sleep(0.033)


debug = True # print statements + makes the tracks plot in sim
decelerate = False # makes the drone simulate the stopping motion
return_home = False # makes the drone simulate the return to home after stopping
if return_home: decelerate = True

Check failure on line 175 in lsy_drone_racing/envs/drone_racing_deploy_env.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E701)

lsy_drone_racing/envs/drone_racing_deploy_env.py:175:23: E701 Multiple statements on one line (colon)

x_end = 2.0 # distance the drone is supposed to stop behind the last gate
t_RTH = 9.0 # time it takes to get back to start (return to home)
t_step_ctrl = 1/self.config.env.freq # control step
t_hover = 2.0 # time the drone hovers before RTH and before landing
height_homing = 2.0 # height of the return path

##### First, simply stop the drone with constant acceleration
if decelerate:
# do one step to get the current acceleration
start_pos = self.obs["pos"]
start_vel = self.obs["vel"]
self.step(np.array([*start_pos,
*start_vel,
0,0,0,
0,0,0,0]))
time.sleep(t_step_ctrl)

start_acc = -(start_vel - self.obs["vel"])/t_step_ctrl

if debug:
obs_x = []
obs_v = []
cmd_x = []
cmd_v = []
cmd_a = []
start_pos = self.obs["pos"]
start_vel = self.obs["vel"]
obs_x.append(self.obs["pos"])
obs_v.append(self.obs["vel"])
cmd_x.append(start_pos)
cmd_v.append(start_vel)
cmd_a.append(start_acc)

direction = start_vel/np.linalg.norm(start_vel) # unit vector in the direction of travel
direction_angle = np.arccos( (np.dot(direction, [0,0,1])) / (1*1) )
direction_angle = -(direction_angle-np.pi/2) # angle to the floor => negative means v_z < 0

# drone can actually go further to no reach the x_end limit if angle!=0
x_end = x_end/np.cos(direction_angle)
# check if drone would crash into floor or ceiling
x_end_z = start_pos[2] + x_end*np.sin(direction_angle)
if x_end_z < 0.2:
if debug: print("x_end_z<0.2")

Check failure on line 219 in lsy_drone_racing/envs/drone_racing_deploy_env.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E701)

lsy_drone_racing/envs/drone_racing_deploy_env.py:219:25: E701 Multiple statements on one line (colon)
x_end = (0.2 - start_pos[2])/np.sin(direction_angle)
elif x_end_z > 2.5:
if debug: print("x_end_z>2.5")

Check failure on line 222 in lsy_drone_racing/envs/drone_racing_deploy_env.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E701)

lsy_drone_racing/envs/drone_racing_deploy_env.py:222:25: E701 Multiple statements on one line (colon)
x_end = (2.5 - start_pos[2])/np.sin(direction_angle)

if debug:
print(f"start_pos_z={start_pos[2]}, x_end_z={start_pos[2] + x_end*np.sin(direction_angle)}, x_end={x_end}")
print(f"direction_angle={direction_angle*180/np.pi}°")

const_acc = np.linalg.norm(start_vel)**2/(x_end) # this is just an estimate of what constant deceleration is necessary
t_brake_max = np.sqrt(4*x_end/const_acc) # the time it takes to brake completely
t_brake = np.arange(0, t_brake_max, t_step_ctrl)

if debug:
print(f"t_brake={t_brake_max}")
print(f"v_gate = {start_vel}")

quintic_spline = QuinticSpline(0, t_brake_max, start_pos, start_vel, start_acc,
start_pos+direction*x_end, start_vel*0, start_acc*0)
ref_pos_stop = quintic_spline(t_brake, order=0)
ref_vel_stop = quintic_spline(t_brake, order=1)
ref_acc_stop = quintic_spline(t_brake, order=2)

# if debug:
# try:
# step = 5
# for i in np.arange(0, len(ref_pos_stop[:,0]) - step, step):
# p.addUserDebugLine(
# ref_pos_stop[i,:],
# ref_pos_stop[i + step,:],
# lineColorRGB=[0, 1, 0],
# lineWidth=2,
# lifeTime=0, # 0 means the line persists indefinitely
# physicsClientId=0,
# )
# except p.error:
# print("PyBullet not available") # Ignore errors if PyBullet is not available

# apply controls to slow down
for i in np.arange(len(ref_pos_stop)):
pos = self.obs["pos"]
vel = self.obs["vel"]
direction = vel/np.linalg.norm(vel[:2])
if debug:
obs_x.append(self.obs["pos"])
obs_v.append(self.obs["vel"])
cmd_x.append(ref_pos_stop[i])
cmd_v.append(ref_vel_stop[i])
cmd_a.append(ref_acc_stop[i])
if np.linalg.norm(vel) < 0.05:
print(f"leaving stopping loop at v={vel}")
break

self.step(np.array([ref_pos_stop[i,0],ref_pos_stop[i,1],ref_pos_stop[i,2],
ref_vel_stop[i,0],ref_vel_stop[i,1],ref_vel_stop[i,2],
ref_acc_stop[i,0],ref_acc_stop[i,1],ref_acc_stop[i,2],
0,0,0,0]))
time.sleep(t_step_ctrl)

# apply controls to hover for some more time
for i in np.arange(int(t_hover/t_step_ctrl)):
if debug:
obs_x.append(self.obs["pos"])
obs_v.append(self.obs["vel"])
cmd_x.append(np.array(pos))
cmd_v.append(np.array([0,0,0]))
cmd_a.append(np.array([0,0,0]))
self.step(np.array([pos[0],pos[1],pos[2],
0,0,0,0,0,0,
0,0,0,0]))
time.sleep(t_step_ctrl)

##### Second, return to home along a spline
if return_home:
t_returnhome = np.arange(0, t_RTH, t_step_ctrl)
start_pos = self.obs["pos"]
start_vel = self.obs["vel"]

landing_pos = np.array([*self.config.env.track.drone.pos[:2], 0.25]) # 0.25m above actual landing pos

intermed_delta = landing_pos - start_pos
intermed_pos1 = [start_pos[0] + intermed_delta[0]/4, start_pos[1] + intermed_delta[1]/4, height_homing]
intermed_pos2 = [intermed_pos1[0] + intermed_delta[0]/2, intermed_pos1[1] + intermed_delta[1]/2, intermed_pos1[2]]
intermed_pos3 = [0,0,0]
intermed_pos3[0] = (5*landing_pos[0]+intermed_pos2[0])/6
intermed_pos3[1] = (5*landing_pos[1]+intermed_pos2[1])/6
intermed_pos3[2] = (landing_pos[2]+intermed_pos2[2])/2

waypoints = np.array([
start_pos,
intermed_pos1,
intermed_pos2,
intermed_pos3,
landing_pos,
])
spline = CubicSpline(np.linspace(0, t_RTH, len(waypoints)), waypoints, bc_type=((1, [0,0,0]), (1, [0,0,0]))) # bc type set boundary conditions for the derivative (here 1)
ref_pos_return = spline(t_returnhome)
spline_v = spline.derivative()
ref_vel_return = spline_v(t_returnhome)

# if debug:
# try:
# step = 5
# for i in np.arange(0, len(ref_pos_return) - step, step):
# p.addUserDebugLine(
# ref_pos_return[i],
# ref_pos_return[i + step],
# lineColorRGB=[0, 0, 1],
# lineWidth=2,
# lifeTime=0, # 0 means the line persists indefinitely
# physicsClientId=0,
# )
# p.addUserDebugText("x", start_pos, textColorRGB=[0,1,0])
# p.addUserDebugText("x", intermed_pos1, textColorRGB=[0,0,1])
# p.addUserDebugText("x", intermed_pos2, textColorRGB=[0,0,1])
# p.addUserDebugText("x", intermed_pos3, textColorRGB=[0,0,1])
# p.addUserDebugText("x", landing_pos, textColorRGB=[0,0,1])
# except p.error:
# print("PyBullet not available") # Ignore errors if PyBullet is not available


for i in np.arange(len(t_returnhome)):
self.step(np.array([ref_pos_return[i,0],ref_pos_return[i,1],ref_pos_return[i,2],
ref_vel_return[i,0],ref_vel_return[i,1],ref_vel_return[i,2],
0,0,0,0,0,0,0]))
time.sleep(t_step_ctrl)


for i in np.arange(int(t_hover/t_step_ctrl)): # hover for some more time
self.step(np.array([ref_pos_return[-1,0],ref_pos_return[-1,1],ref_pos_return[-1,2],0,0,0,0,0,0,0,0,0,0]))
time.sleep(t_step_ctrl)

# if debug:
# plt.rcParams.update(plt.rcParamsDefault)
# fig, ax = plt.subplots(1,3)
# obs_x = np.array(obs_x)
# cmd_x = np.array(cmd_x)
# ax[0].plot(np.arange(len(obs_x[:,0])), -obs_x[:,0], label="obs")
# ax[0].plot(np.arange(len(cmd_x[:,0])), -cmd_x[:,0], label="cmd")
# ax[0].set_title("Position")

# obs_v = np.array(obs_v)
# cmd_v = np.array(cmd_v)
# ax[1].plot(np.arange(len(obs_v[:,0])), -obs_v[:,0], label="obs")
# ax[1].plot(np.arange(len(cmd_v[:,0])), -cmd_v[:,0], label="cmd")
# ax[1].set_title("Velocity")

# obs_a = np.array(obs_v)
# cmd_a = np.array(cmd_a)
# obs_a[1:-1] = (obs_a[2:]-obs_a[:-2])/(2*t_step_ctrl)
# ax[2].plot(np.arange(len(obs_a[:,0])), -obs_a[:,0], label="obs")
# ax[2].plot(np.arange(len(cmd_a[:,0])), -cmd_a[:,0], label="cmd")
# ax[2].set_title("Acceleration")
# plt.legend()
# plt.show()

time.sleep(t_step_ctrl)
self.cf.notifySetpointsStop()
self.cf.land(0.05, 3.0)
self.cf.land(0.05, 2.0)


@property
def obs(self) -> dict:
Expand Down
Loading

0 comments on commit 3b4ef6f

Please sign in to comment.