Beagle Board - beagleboard.org
Jack Yeh, Lucy Zhu, Antonio, Nick Zhang
Published

Team Spy Family

Autonomous vehicles using BeagleBone AI-64 can follow lanes, stop at red boxes and detect stop signs.

Expert · Full instructions provided · 17
Team Spy Family

Things used in this project

Hardware components

BeagleBoard.org BeagleBone AI-64
×1
Webcam, Logitech® HD Pro
Webcam, Logitech® HD Pro
×1
USB WiFi adapter
×1
portable charger
×1

Software apps and online services

OpenCV
OpenCV

Story

Read more

Code

Optical sensor driver code

C/C++
This is the driver code for the optical sensor installed on the shaft of the car. It increments a counter for every rising edge. If you sample the counter at a constant frequency and take the difference between readings, you get a number which is proportional to the speed of the car.
#include <linux/module.h>
#include <linux/of_device.h>
#include <linux/kernel.h>
#include <linux/gpio/consumer.h>
#include <linux/platform_device.h>
/* YOU WILL NEED OTHER HEADER FILES */
#include <linux/interrupt.h>
#include <linux/gpio.h>

/* Module state: the IRQ line for the encoder GPIO, the GPIO descriptor
 * itself, and a pulse counter. The counter is exported read-only through
 * sysfs (/sys/module/<module>/parameters/op_count) so user space can
 * sample it at a fixed rate and difference readings to estimate speed. */
unsigned int irq_number;               /* IRQ assigned to the encoder GPIO */
struct gpio_desc *speed_encode;        /* optical encoder input pin */
static int op_count = 0;               /* rising-edge count; incremented in IRQ context */
module_param(op_count, int, S_IRUGO);  /* read-only for user space */

/* ADD THE INTERRUPT SERVICE ROUTINE HERE */
static irq_handler_t gpio_irq_handler(unsigned int irq, void *dev_id, struct pt_regs *regs) {
	op_count += 1;
	return (irq_handler_t) IRQ_HANDLED; 
}


/*
 * Probe: claim the encoder GPIO named "userbutton" from the device tree,
 * set its debounce interval, and install the rising-edge interrupt handler.
 *
 * Returns 0 on success or a negative errno on failure. The GPIO is
 * devm-managed, so it is released automatically on failure or removal.
 */
static int led_probe(struct platform_device *pd)
{
	int irq;
	int ret;

	printk("op_encode: Loading module... ");

	speed_encode = devm_gpiod_get(&pd->dev, "userbutton", GPIOD_IN);
	if (IS_ERR(speed_encode)) {
		printk("op_encode: failed to acquire encoder gpio\n");
		return PTR_ERR(speed_encode);
	}

	/* 10,000,000 us debounce, carried over from bring-up: oversized on
	 * paper but empirically what made the encoder behave. Some pins do
	 * not support debounce; that failure is deliberately tolerated. */
	gpiod_set_debounce(speed_encode, 10000000);

	/* gpiod_to_irq() returns a negative errno on failure; check it in a
	 * signed local before storing into the unsigned irq_number global. */
	irq = gpiod_to_irq(speed_encode);
	if (irq < 0) {
		printk("op_encode: gpiod_to_irq failed\n");
		return irq;
	}
	irq_number = irq;

	ret = request_irq(irq_number, (irq_handler_t) gpio_irq_handler,
			  IRQF_TRIGGER_RISING, "op_encode_driver", NULL);
	if (ret) {
		printk("op_encode: request_irq failed\n");
		return ret;
	}

	return 0;
}

/* Remove: release the interrupt line. The GPIO descriptor is devm-managed
 * and is freed automatically by the driver core. */
static int led_remove(struct platform_device *pd)
{
	/* dev_id (NULL) must match the value passed to request_irq() */
	free_irq(irq_number, NULL);
	printk("gpiod_driver: irq freed, module removed... ");
	return 0;
}

/* Device-tree match table: binds this driver to nodes whose compatible
 * string is "hello". The empty entry is the required sentinel. */
static struct of_device_id matchy_match[] = {
	{.compatible = "hello"},
	{/* leave alone - keep this here (end node) */},
};
/* Export the match table so the module can be autoloaded when a matching
 * device-tree node is present. */
MODULE_DEVICE_TABLE(of, matchy_match);

// Platform driver object: hooks the probe/remove callbacks into the driver
// core and attaches the device-tree match table. The .name is irrelevant
// here because binding happens via of_match_table.
static struct platform_driver adam_driver = {
	.probe	 = led_probe,
	.remove	 = led_remove,
	.driver	 = {
	       .name  = "The Rock: this name doesn't even matter",
	       .owner = THIS_MODULE,
	       .of_match_table = matchy_match,
	},
};

// Expands to the module_init()/module_exit() boilerplate that registers
// the driver at load time and unregisters it at unload.
module_platform_driver(adam_driver);

MODULE_DESCRIPTION("424\'s finest");
MODULE_AUTHOR("GOAT");
MODULE_LICENSE("GPL v2");
MODULE_ALIAS("platform:adam_driver");

Elec553FinalProject.py

Python
import cv2
import numpy as np
import matplotlib.pyplot as plt
import math
import sys
import time

# based on: https://www.hackster.io/really-bad-idea/autonomous-path-following-car-6c4992#overview


# Throttle configuration (duty cycles in percent at 50 Hz PWM)
throttlePin = "P9_14"       # BeagleBone header pin for the ESC (informational)
go_forward = 8.22           # forward throttle duty; adjusted at runtime by the speed loop
dont_move_throttle = 7.75   # neutral throttle duty
dont_move_steer = 7.5       # neutral steering duty

# op encode: state for the optical shaft encoder readings
speed = 0             # pulses counted since the previous sample
prev = 0              # previous raw counter reading
go_faster_tick = 0  # Do not change this here. Code will set this value after seeing stop sign

# Encoder related variables
speedCheck = 10 # number of frames in between speed adjustment
TARGET = 30 #Target value we want to get from our encoder 
SPEED_STEP = 0.02     # duty-cycle increment per adjustment
SPEED_CEIL = 8.5      # upper duty-cycle clamp (currently unused below)
SPEED_FLOOR = 8.1     # lower duty-cycle clamp (currently unused below)

# Steering duty-cycle limits (percent)
steeringPin = "P9_16"
left = 9              # maximum-left steering duty
right = 6             # maximum-right steering duty


# Load the YOLO class labels (one per line); used to recognize "stop sign".
with open("yolov3.txt", 'r') as f:
    classes = [line.strip() for line in f.readlines()]

# Load the pre-trained YOLOv3 network once at startup (expensive operation).
net = cv2.dnn.readNet("yolov3.weights", "yolov3.cfg")


def throttle(duty_cycle):
    """Write a throttle duty cycle to the PWM sysfs node.

    At 50 Hz the PWM period is 20,000,000 ns, so one percent of duty
    corresponds to 200,000 ns.

    :param duty_cycle: duty cycle in percent (e.g. 7.75 = neutral)
    """
    pulse_ns = int(duty_cycle * 200000)
    with open('/dev/bone/pwm/1/a/duty_cycle', 'w') as pwm_node:
        pwm_node.write(str(pulse_ns))

def steer(duty_cycle):
    """Write a steering duty cycle to the PWM sysfs node.

    At 50 Hz the PWM period is 20,000,000 ns, so one percent of duty
    corresponds to 200,000 ns.

    :param duty_cycle: duty cycle in percent (7.5 = straight ahead)
    """
    pulse_ns = int(duty_cycle * 200000)
    with open('/dev/bone/pwm/1/b/duty_cycle', 'w') as pwm_node:
        pwm_node.write(str(pulse_ns))

def getRedFloorBoundaries():
    """Load the HSV bounds and success-percentage bounds used to decide
    whether the floor in view is red.

    :return: ([lower HSV, upper HSV], [lower percent, upper percent])
    """
    return getBoundaries("redboundaries.txt")


def isRedFloorVisible(frame):
    """Check whether a red stop box is visible on the floor.

    :param frame: BGR image
    :return: (True if enough red is in view, debug output image)
    """
    return isMostlyColor(frame, getRedFloorBoundaries())


def isMostlyColor(image, boundaries):
    """Decide whether the fraction of pixels inside an HSV color range
    falls within the given success bounds.

    :param image: BGR image
    :param boundaries: ([lower HSV, upper HSV], [low percent, high percent])
    :return: (bool result, masked HSV image for debugging)
    """
    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

    # Split the color range from the acceptance percentages.
    (low_bound, high_bound), (pct_low, pct_high) = boundaries

    mask = cv2.inRange(hsv, np.array(low_bound), np.array(high_bound))
    debug_view = cv2.bitwise_and(hsv, hsv, mask=mask)

    # Percentage of pixels that fell inside the color range.
    matched_pct = np.count_nonzero(mask) * 100 / np.size(mask)

    # Success iff strictly above the lower bound and at most the upper.
    succeeded = pct_low < matched_pct <= pct_high
    return succeeded, debug_view


def getBoundaries(filename):
    """Parse HSV color boundaries and success percentages from a file.

    File format (comma separated, one line each):
        line 0: lower H, S, V [, lower success percent]
        line 1: upper H, S, V [, upper success percent]

    When the optional fourth field is absent, the percentages default to
    50 (lower) and 100 (upper).

    :param filename: path of the boundary file
    :return: ([lower HSV, upper HSV], [lower percent, upper percent])
    """
    defaults = (50, 100)
    with open(filename, "r") as handle:
        rows = handle.readlines()
        fields = [rows[0].split(","), rows[1].split(",")]

        # Fourth column is the optional success percentage.
        percentages = [
            float(row[3]) if len(row) >= 4 else default
            for row, default in zip(fields, defaults)
        ]
        # First three columns are the H, S, V channel bounds.
        color_bounds = [[int(value) for value in row[:3]] for row in fields]
    return color_bounds, percentages


def initialize_car():
    """Put both PWM channels at their neutral duty cycles so the ESC and
    the steering servo start from a known idle state.

    :return: none
    """
    # Neutral steering first, then neutral throttle; the two writes go to
    # independent PWM channels.
    steer(dont_move_steer)
    throttle(dont_move_throttle)
   

def stop():
    """Halt the car by returning the throttle to neutral.

    :return: none
    """
    throttle(dont_move_throttle)
    

def go():
    """Drive forward at the current default throttle duty cycle
    (module-level ``go_forward``, which the speed loop adjusts).

    :return: none
    """
    throttle(go_forward)

def detect_edges(frame):
    """Isolate the blue lane lines in a frame and return their edge map.

    :param frame: BGR image
    :return: single-channel Canny edge image
    """
    # Blue is easiest to threshold in HSV space.
    hsv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    blue_lo = np.array([90, 120, 0], dtype="uint8")
    blue_hi = np.array([150, 255, 255], dtype="uint8")
    blue_mask = cv2.inRange(hsv_frame, blue_lo, blue_hi)

    # Canny on the binary mask keeps only the lane outlines.
    return cv2.Canny(blue_mask, 50, 100)


def region_of_interest(edges):
    """Mask the edge image so only the lower half of the frame remains.

    The lane markings relevant to steering sit in the bottom half of the
    frame; the top half mostly contains noise.

    :param edges: single-channel edge image
    :return: edge image with the top half zeroed out
    """
    height, width = edges.shape
    mask = np.zeros_like(edges)

    # Use integer division so the vertex array holds true integers instead
    # of silently truncated floats (cv2.fillPoly needs int32 vertices).
    polygon = np.array([[
        (0, height),
        (0, height // 2),
        (width, height // 2),
        (width, height),
    ]], np.int32)

    cv2.fillPoly(mask, polygon, 255)

    # Keep edges only where the mask is set.
    cropped_edges = cv2.bitwise_and(edges, mask)

    return cropped_edges


def detect_line_segments(cropped_edges):
    """Run the probabilistic Hough transform over the masked edge image.

    :param cropped_edges: single-channel edge image (ROI already applied)
    :return: array of segments in [[x1, y1, x2, y2]] form, or None
    """
    # 1 px distance resolution, 1 degree angular resolution, and a low
    # vote threshold so short lane marks are still detected.
    return cv2.HoughLinesP(
        cropped_edges,
        1,            # rho resolution (pixels)
        np.pi / 180,  # theta resolution (radians)
        10,           # minimum votes
        np.array([]),
        minLineLength=5,
        maxLineGap=150,
    )


def average_slope_intercept(frame, line_segments):
    """Combine raw Hough segments into at most two lane lines.

    Negative-slope segments confined to the left 2/3 of the frame vote for
    the left lane line; non-negative-slope segments confined to the right
    2/3 vote for the right one. Each non-empty group is averaged into a
    single (slope, intercept) pair and converted to endpoint form.

    :param frame: BGR image (only its shape is used)
    :param line_segments: output of detect_line_segments (or None)
    :return: list of zero to two lane lines in [[x1, y1, x2, y2]] form
    """
    lane_lines = []

    if line_segments is None:
        # Nothing detected in this frame.
        return lane_lines

    height, width, _ = frame.shape
    left_fit = []
    right_fit = []

    boundary = 1 / 3
    left_region_boundary = width * (1 - boundary)
    right_region_boundary = width * boundary

    for line_segment in line_segments:
        for x1, y1, x2, y2 in line_segment:
            if x1 == x2:
                # Vertical segment: infinite slope, skip it.
                continue

            # (The previous np.polyfit call here was unused dead work.)
            slope = (y2 - y1) / (x2 - x1)
            intercept = y1 - (slope * x1)

            if slope < 0:
                if x1 < left_region_boundary and x2 < left_region_boundary:
                    left_fit.append((slope, intercept))
            else:
                if x1 > right_region_boundary and x2 > right_region_boundary:
                    right_fit.append((slope, intercept))

    # Average each group only when it is non-empty: np.average on an empty
    # list emits a RuntimeWarning and yields NaN.
    if left_fit:
        lane_lines.append(make_points(frame, np.average(left_fit, axis=0)))

    if right_fit:
        lane_lines.append(make_points(frame, np.average(right_fit, axis=0)))

    return lane_lines


def make_points(frame, line):
    """Convert a (slope, intercept) lane fit into pixel endpoints spanning
    from the bottom of the frame up to its vertical midpoint.

    :param frame: BGR image (only its shape is used)
    :param line: (slope, intercept) of a lane line
    :return: [[x1, y1, x2, y2]]
    """
    height, _, _ = frame.shape
    slope, intercept = line

    bottom_y = height          # start at the bottom edge of the frame
    mid_y = int(bottom_y / 2)  # end at the vertical center

    # A perfectly horizontal fit would make x unbounded; nudge the slope.
    if slope == 0:
        slope = 0.1

    bottom_x = int((bottom_y - intercept) / slope)
    mid_x = int((mid_y - intercept) / slope)

    return [[bottom_x, bottom_y, mid_x, mid_y]]


def display_lines(frame, lines, line_color=(0, 255, 0), line_width=6):
    """Overlay the detected lane lines on a blended copy of the frame.

    :param frame: BGR image
    :param lines: lane lines in [[x1, y1, x2, y2]] form, or None
    :param line_color: BGR color for the drawn lines
    :param line_width: thickness in pixels
    :return: frame blended with the drawn lines
    """
    overlay = np.zeros_like(frame)

    if lines is not None:
        for entry in lines:
            for x1, y1, x2, y2 in entry:
                cv2.line(overlay, (x1, y1), (x2, y2), line_color, line_width)

    return cv2.addWeighted(frame, 0.8, overlay, 1, 1)


def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5):
    """Draw the current heading as a line from the bottom-center of the
    frame toward the steering direction.

    :param frame: BGR image
    :param steering_angle: heading in degrees (90 = straight ahead)
    :param line_color: BGR color of the heading line
    :param line_width: thickness in pixels
    :return: frame blended with the heading line
    """
    heading_image = np.zeros_like(frame)
    height, width, _ = frame.shape

    steering_angle_radian = steering_angle / 180.0 * math.pi

    # Guard against tan() == 0 (steering_angle of exactly 0 degrees),
    # which previously raised ZeroDivisionError.
    tangent = math.tan(steering_angle_radian)
    if tangent == 0:
        tangent = 1e-9  # treat as an effectively horizontal heading

    x1 = int(width / 2)
    y1 = height
    # Clamp the far endpoint to a range cv2.line accepts; the line is
    # clipped to the frame when drawn anyway.
    x2 = int(max(min(x1 - height / 2 / tangent, 2**30), -2**30))
    y2 = int(height / 2)

    cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width)
    heading_image = cv2.addWeighted(frame, 0.8, heading_image, 1, 1)

    return heading_image


def get_steering_angle(frame, lane_lines):
    """Derive a steering angle in degrees from the detected lane lines.

    90 means heading straight; the offset from 90 grows with the lateral
    error between the frame center and the lane midpoint.

    :param frame: BGR image (only its shape is used)
    :param lane_lines: list of zero to two lane lines in [[x1, y1, x2, y2]] form
    :return: steering angle in degrees
    """
    height, width, _ = frame.shape
    y_offset = int(height / 2)

    lane_count = len(lane_lines)
    if lane_count == 2:
        # Aim at the midpoint between the two lines' upper endpoints.
        _, _, left_x2, _ = lane_lines[0][0]
        _, _, right_x2, _ = lane_lines[1][0]
        x_offset = (left_x2 + right_x2) / 2 - int(width / 2)
    elif lane_count == 1:
        # Follow the direction of the single visible line.
        x1, _, x2, _ = lane_lines[0][0]
        x_offset = x2 - x1
    elif lane_count == 0:
        # Nothing detected: keep going straight.
        x_offset = 0

    angle_to_mid_deg = int(math.atan(x_offset / y_offset) * 180.0 / math.pi)
    return angle_to_mid_deg + 90


def plot_pd(p_vals, d_vals, error, show_img=False):
    """Plot the P and D controller terms against the error signal and
    save the figure as pd_plot.png.

    :param p_vals: proportional-term samples, one per frame
    :param d_vals: derivative-term samples, one per frame
    :param error: error samples, one per frame
    :param show_img: also display the figure interactively when True
    """
    fig, pd_axis = plt.subplots()
    frames = np.arange(len(p_vals))
    pd_axis.plot(frames, p_vals, '-', label="P values")
    pd_axis.plot(frames, d_vals, '-', label="D values")

    # Second y-axis so the error scale does not dwarf the PD terms.
    err_axis = pd_axis.twinx()
    err_axis.plot(frames, error, '--r', label="Error")

    pd_axis.set_xlabel("Frames")
    pd_axis.set_ylabel("PD Value")
    err_axis.set_ylim(-90, 90)
    err_axis.set_ylabel("Error Value")

    plt.title("PD Values over time")
    fig.legend()
    fig.tight_layout()
    plt.savefig("pd_plot.png")

    if show_img:
        plt.show()
    plt.clf()

def plot_pwm(speed_pwms, turn_pwms, error, show_img=False):
    """Plot the throttle and steering PWM values against the error signal
    and save the figure as pwm_plot.png.

    :param speed_pwms: throttle duty-cycle samples, one per frame
    :param turn_pwms: steering duty-cycle samples, one per frame
    :param error: error samples, one per frame
    :param show_img: also display the figure interactively when True
    """
    fig, ax1 = plt.subplots()
    t_ax = np.arange(len(speed_pwms))
    ax1.plot(t_ax, speed_pwms, '-', label="Speed PWM")
    ax1.plot(t_ax, turn_pwms, '-', label="Steering PWM")
    # Second y-axis so the error scale does not dwarf the PWM values.
    ax2 = ax1.twinx()
    ax2.plot(t_ax, error, '--r', label="Error")

    ax1.set_xlabel("Frames")
    ax1.set_ylabel("PWM Values")
    ax2.set_ylabel("Error Value")

    plt.title("PWM Values over time")
    fig.legend()
    # Consistency with plot_pd: tighten the layout so labels are not cut off.
    fig.tight_layout()
    plt.savefig("pwm_plot.png")

    if show_img:
        plt.show()
    plt.clf()

# CYY add for image detect ----- start
def get_output_layers(net):
    """Return the names of the network's unconnected output layers.

    Handles both cv2.dnn API variants: newer OpenCV returns a flat array
    of 1-based indices, older versions return an Nx1 array.

    :param net: a loaded cv2.dnn network
    :return: list of output layer names
    """
    layer_names = net.getLayerNames()
    try:
        # Flat array of 1-based layer indices.
        return [layer_names[i - 1] for i in net.getUnconnectedOutLayers()]
    except (IndexError, TypeError):
        # Nested form: each entry is a single-element array. The previous
        # bare `except:` hid unrelated failures; only the two errors the
        # nested form actually produces are caught now.
        return [layer_names[i[0] - 1] for i in net.getUnconnectedOutLayers()]

def checkifStopsign(image, classes, net):
    """Run YOLO over a frame and report whether a stop sign is present.

    :param image: BGR image
    :param classes: list of YOLO class labels
    :param net: loaded cv2.dnn YOLO network
    :return: True if a "stop sign" detection survives non-max suppression
    """
    frame_w = image.shape[1]
    frame_h = image.shape[0]
    scale = 0.00392  # ~1/255: normalize pixel values for the network

    blob = cv2.dnn.blobFromImage(image, scale, (416,416), (0,0,0), True, crop=False)
    net.setInput(blob)
    outs = net.forward(get_output_layers(net))

    class_ids = []
    confidences = []
    boxes = []
    conf_threshold = 0.5
    nms_threshold = 0.4

    # Collect every detection whose best class score clears the threshold.
    for out in outs:
        for detection in out:
            scores = detection[5:]
            best_class = np.argmax(scores)
            best_score = scores[best_class]
            if best_score > 0.5:
                center_x = int(detection[0] * frame_w)
                center_y = int(detection[1] * frame_h)
                box_w = int(detection[2] * frame_w)
                box_h = int(detection[3] * frame_h)
                class_ids.append(best_class)
                confidences.append(float(best_score))
                boxes.append([center_x - box_w / 2, center_y - box_h / 2, box_w, box_h])

    # Non-maximum suppression drops overlapping duplicate boxes.
    indices = cv2.dnn.NMSBoxes(boxes, confidences, conf_threshold, nms_threshold)
    for idx in indices:
        label = classes[class_ids[int(idx)]]
        print(label)
        if label == "stop sign":
            return True
    return False
# CYY add for image detect ----- end


# Max number of iterations of the main control loop before the script exits
max_ticks = 2000

# set up the car throttle and steering PWMs at neutral
initialize_car()

# set up video (camera index 2, low resolution for processing speed)
video = cv2.VideoCapture(2)
video.set(cv2.CAP_PROP_FRAME_WIDTH, 320)
video.set(cv2.CAP_PROP_FRAME_HEIGHT, 240)

# wait for video to load
time.sleep(1)

# PD controller gains and state
kp = 0.09      # proportional gain
kd = kp*0.1    # derivative gain
lastTime = 0   # timestamp of the previous loop iteration (0 on first pass)
lastError = 0  # error from the previous loop iteration
# counter for number of ticks
counter = 0

# start the engines
go()

# arrays for making the final graphs
p_vals = []    # proportional term per frame
d_vals = []    # derivative term per frame
err_vals = []  # error per frame
speed_pwm = [] # throttle duty per frame
steer_pwm = [] # steering duty per frame
current_speed = go_forward  # NOTE(review): assigned once, never read below

stopSignCheck = 1       # check for the red floor every N frames
sightDebug = False      # show intermediate debug windows when True
isStopSignBool = False  # most recent red-floor detection result
isAtRedBox = False      # NOTE(review): set here but never updated below
stopSeen = 0            # number of red stops encountered so far
speedCap = False        # NOTE(review): unused flag
pause_counter = 0       # frame index until which red-floor checks are suppressed

# Main control loop: one iteration per camera frame. Each pass checks for
# the red stop box, derives a steering angle from the lane lines, applies
# a PD correction to the steering PWM, and periodically adjusts throttle
# from the kernel-module pulse counter.
while counter < max_ticks:
    # Grab a frame and shrink it to cut per-frame processing cost.
    ret, original_frame = video.read()
    frame = cv2.resize(original_frame, (160, 120))

    if sightDebug:
        cv2.imshow("Resized Frame", frame)

    # if stopSeen == 0:
    #     floorGreen, _ = isTrafficGreenLightVisible(frame)
    #     if floorGreen:
    #         go()
    #     else:
    #         continue
    # print("stopSeen: ", stopSeen)

    # Check for the red stop box every stopSignCheck frames, but not while
    # we are still inside the pause window after the previous stop.
    if ((counter + 1) % stopSignCheck) == 0 and counter > pause_counter:

        isStopSignBool, floorSight = isRedFloorVisible(frame)
        if isStopSignBool:
            print("see red, stop")
            stop()
            stopSeen += 1
            # Suppress further red checks for 40 frames so the same box
            # is not counted repeatedly.
            pause_counter = counter + 40
            # exit loop on final stop sign
            if stopSeen == 3:
                print("detected final stop sign, stopping")
                break

            time.sleep(2)
            # NOTE(review): a fresh frame is captured into `framee` below,
            # but the stale `frame` is what gets passed to checkifStopsign
            # — this looks like a bug; confirm which frame was intended.
            rett, original_framee = video.read()
            framee = cv2.resize(original_framee, (160, 120))
            if (checkifStopsign(frame, classes, net)):
                print("detected stop sign, stopping")
            else:
                print("detected red box, stopping")
            # if(checkifStopsign(frame, classes, net)):
            #     print("detected actual stop sign, stopping")
            # Reset the throttle to its base value before moving again.
            go_forward = 8.22
            go()




    # process the frame to determine the desired steering angle
    # cv2.imshow("original",frame)
    edges = detect_edges(frame)
    roi = region_of_interest(edges)
    line_segments = detect_line_segments(roi)
    lane_lines = average_slope_intercept(frame, line_segments)
    lane_lines_image = display_lines(frame, lane_lines)
    steering_angle = get_steering_angle(frame, lane_lines)
    # heading_image = display_heading_line(lane_lines_image,steering_angle)
    # cv2.imshow("heading line",heading_image)

    # calculate changes for PD
    # NOTE(review): on the very first iteration lastTime is 0, so dt is a
    # wall-clock-sized value and the derivative term is effectively zero.
    now = time.time()
    dt = now - lastTime
    if sightDebug:
        cv2.imshow("Cropped sight", roi)
    # deviation from straight ahead (90 degrees)
    deviation = steering_angle - 90

    # Speed adjustment every speedCheck frames: read the pulse counter
    # exported by the op_encode kernel module and nudge go_forward toward
    # the TARGET pulse rate.
    if counter % speedCheck == 0:
        with open('/sys/module/op_encode/parameters/op_count', 'r') as fileread:
            read = int(fileread.read())
        speed = read - prev
        prev = read
        # print(speed)
        if speed > TARGET:
            go_forward = go_forward - SPEED_STEP
        elif speed < TARGET:
            go_forward = go_forward + SPEED_STEP
        print(go_forward)
        # NOTE(review): the throttle() call that would apply the adjusted
        # go_forward is commented out below, so the new value only takes
        # effect at the next go() after a stop — confirm this is intended.

    #     if go_forward > SPEED_CEIL:
    #         print("=====speed ceil=====")
    #         go_forward = SPEED_CEIL #limit the highest speed
    #     elif go_forward < SPEED_FLOOR:
    #         print("=====speed floor=====")
    #         go_forward = SPEED_FLOOR
    #     print(go_forward)
        # throttle (go_forward)

    # PD controller: error is the negated deviation from straight ahead.
    error = -deviation
    base_turn = 7.5
    proportional = kp * error
    derivative = kd * (error - lastError) / dt

    # take values for graphs
    p_vals.append(proportional)
    d_vals.append(derivative)
    err_vals.append(error)

    # determine actual turn to do
    turn_amt = base_turn + proportional + derivative


    # Cap the turn command: snap near-neutral values to exactly neutral,
    # and clamp to the servo's left/right duty-cycle limits.
    if 7.2 < turn_amt < 7.8:
        turn_amt = 7.5
    elif turn_amt > left:
        turn_amt = left
    elif turn_amt < right:
        turn_amt = right

    # turn!
    #PWM.set_duty_cycle(steeringPin, turn_amt)
    steer(turn_amt)
    # take values for graphs
    steer_pwm.append(turn_amt)
    speed_pwm.append(go_forward)

    # update PD values for next loop
    lastError = error
    lastTime = time.time()

    # ESC key aborts the run early.
    key = cv2.waitKey(1)
    if key == 27:
        break

    counter += 1

# Clean up resources: release the camera, close any debug windows, and
# park both PWM channels at neutral so the car stops.
video.release()
cv2.destroyAllWindows()
print("Finishing... ")
steer(dont_move_steer)
throttle(dont_move_throttle)

# Save (and display) the PD-term and PWM graphs for post-run analysis.
plot_pd(p_vals, d_vals, err_vals, True)
plot_pwm(speed_pwm, steer_pwm, err_vals, True)

Credits

Jack Yeh

Jack Yeh

1 project • 0 followers
Lucy Zhu

Lucy Zhu

1 project • 1 follower
Antonio

Antonio

1 project • 0 followers
Nick Zhang

Nick Zhang

1 project • 0 followers

Comments

Add project · Sign up / Login