Ros智行mini,opencv,Gmapping建图,自主导航auto_slam,人脸识别,语音控制

功能

photo

一、Gmapping建图

二、自主导航 起始点 、终点

三、人脸识别

四、语音控制

完成任务: 机器人先建图 建完图后给出目标点,机器人就可以完成调用自主导航走到目标点,期间会调用激光雷达扫描局部环境来进行自主避障,到达终点后进行语音播报和人脸识别

photo

主要功能文件

photo
按照工作目录来讲

一、Gmapping

就是开启运动服务器 然后通过语音控制或者键盘控制让机器人跑一遍地图,在跑的时候机器人会调用激光雷达进行环境扫描 ,绘制地图

photo

二、自主导航

给定机器人初始路径点,结束路径点并存入文件,有起始位置,有终点位置,机器人就能使用move_base动作服务器将机器人导航到每个路径点,运行时出现障碍物 激光雷达进行环境扫描 绘制出局部地图 进行自主避障

auto_slam.py

#!/usr/bin/env python
import rospy

import actionlib
import roslaunch
from actionlib_msgs.msg import *
from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal
from nav_msgs.msg import Path
from std_msgs.msg import String
from geometry_msgs.msg import PoseWithCovarianceStamped
from tf_conversions import transformations
from xml.dom.minidom import parse
from math import pi
import tf
###定义对象
class navigation_demo:
    def __init__(self):
        self.set_pose_pub = rospy.Publisher('/initialpose', PoseWithCovarianceStamped, queue_size=5)

        self.move_base = actionlib.SimpleActionClient("move_base", MoveBaseAction)### 运动信息的节点(动作服务器)
        self.move_base.wait_for_server(rospy.Duration(60))###用于等待与动作服务器的连接建立。
        
        
        self.tf_listener = tf.TransformListener()### 监听阵列
        self.get_point = rospy.Publisher('get_pos', String, queue_size=5)
        
        self.plist = []
        self.success_count = 0
        
    def set_plist(self,plist):
        self.plist = plist
        
     ## 初始化机器人姿态       
    def set_pose(self, p):
        if self.move_base is None:
            return False

        x, y, th = p

        pose = PoseWithCovarianceStamped()
        pose.header.stamp = rospy.Time.now()
        pose.header.frame_id = 'map'
        pose.pose.pose.position.x = x
        pose.pose.pose.position.y = y
        q = transformations.quaternion_from_euler(0.0, 0.0, th/180.0*pi)
        pose.pose.pose.orientation.x = q[0]
        pose.pose.pose.orientation.y = q[1]
        pose.pose.pose.orientation.z = q[2]
        pose.pose.pose.orientation.w = q[3]

        self.set_pose_pub.publish(pose)
        return True

     

    # 当导航行为完成时的回调函数
    def _done_cb(self, status, result):
        rospy.loginfo("navigation done! status:%d result:%s"%(status, result))
	# 当导航行为激活时的回调函数
    def _active_cb(self):
        rospy.loginfo("[Navi] navigation has be actived")
	 # 导航过程中的反馈回调函数
    def _feedback_cb(self, feedback):
        rospy.loginfo("[Navi] navigation feedback\r\n%s"%feedback)
        
    def goto(self, p):
        goal = MoveBaseGoal()### 定义MoveBaseGoal对象 进行

        goal.target_pose.header.frame_id = 'map'  ###建立坐标’
        ### 设置goal的移动目标地点
        goal.target_pose.header.stamp = rospy.Time.now()
        goal.target_pose.pose.position.x = p[0]
        goal.target_pose.pose.position.y = p[1]
        goal.target_pose.pose.position.z = p[2]
        #q = transformations.quaternion_from_euler(0.0, 0.0, p[2]/180.0*pi)###欧拉数转化为四元数,三维空间的旋转方向
        goal.target_pose.pose.orientation.x = p[3]
        goal.target_pose.pose.orientation.y = p[4]
        goal.target_pose.pose.orientation.z = p[5]
        goal.target_pose.pose.orientation.w = p[6]
		### 发送导航目标,并指定回调函数
        self.move_base.send_goal(goal, self._done_cb, self._active_cb, self._feedback_cb)
        # 等待导航结果,超时时间为60秒
        result = self.move_base.wait_for_result(rospy.Duration(60))### 是否到达这个导航点
        print(result)
       
        state = self.move_base.get_state()
        if state == GoalStatus.SUCCEEDED:
            self.success_count += 1
            ### 到达的导航点是否为最终目标点
            if len(self.plist) == self.success_count:
                rospy.loginfo("arrived goal point")
                self.get_point.publish("1")
                self.isSendVoice = False
        return True

    def cancel(self):
        self.move_base.cancel_all_goals()
        return True
    
###定义回调函数
def callback(msg):###调用回调函数   向订阅话题发消息 就会调用回调函数
    
    doc = parse("/home/bcsh/waypoints.xml")### parse对象处理xml文档 Dom
    root_element = doc.documentElement###文档根结点
    points = root_element.getElementsByTagName("Waypoint")### 每个航点包含七个
    
    plist = []
    
    rospy.loginfo("set pose...")
    navi = navigation_demo() ##创建一个navigation_demo对象  
    
  
    
    for p in points:
        point = [0] * 7
        point[0] = float(p.getElementsByTagName("Pos_x")[0].childNodes[0].data)
        point[1] = float(p.getElementsByTagName("Pos_y")[0].childNodes[0].data)
        point[2] = float(p.getElementsByTagName("Pos_z")[0].childNodes[0].data)
        ###三维空间旋转方向的四元数
        point[3] = float(p.getElementsByTagName("Ori_x")[0].childNodes[0].data)
        point[4] = float(p.getElementsByTagName("Ori_y")[0].childNodes[0].data)
        point[5] = float(p.getElementsByTagName("Ori_z")[0].childNodes[0].data)
        point[6] = float(p.getElementsByTagName("Ori_w")[0].childNodes[0].data)
        plist.append(point)
        
    print(plist)
    
    rospy.loginfo("goto goal...")
    navi.set_plist(plist)
    
    for waypoint in plist:
        #print(waypoint)
        navi.goto(waypoint)
        
if __name__ == "__main__":
    rospy.init_node('auto_slam_node',anonymous=True)#### 初始化ROS节点,命名'auto_slam_node'
    rospy.Subscriber("auto_slam", String,callback)###订阅 "auto_slam" 话题并设置回调函数处理消息
    
    rospy.spin()
    r = rospy.Rate(0.2)# 创建一个rate对象以控制循环速率
    r.sleep()

首先第一步完成建图

photo
关于waypoints.xml

创建完图之后,用Rviz 插件 waterplus_map_tools 通过输入指令进行航点标注,

photo

三、 人脸识别

Take_photo.py

照片存放位置

photo

Face_Rec.py

1.Take_photo.py

拍照 存储 调用人脸识别

TakePhoto类继承了之前 ROS 与 Opencv 接口类,在这个类里面我们重写了 process_imag 函数,使得该函数可以完成人脸识别功能。核心函数为 detectMultiScale 函数,这个函数实现了将视频中的人脸提取出来,反馈值为 faces,faces 是由多个数组组成,每个数组代表人脸在当前图像中的位置(x,y,w,h)分别代表人脸框的左上角点的坐标,人脸框的宽度和长度。

#!/usr/bin/env python

import rospy
import cv2
from ros_opencv import ROS2OPENCV
import sys, select, os

# 定义一个类 TakePhoto,继承 ROS2OPENCV 类
class TakePhoto(ROS2OPENCV):
    def __init__(self, node_name): 
        # 调用 ROS2OPENCV 类的构造函数
        super(TakePhoto, self).__init__(node_name)
        self.detect_box = None ##用于存储检测到的人脸的框的坐标信息。。
        self.result = None ###存储处理后的图像,其中人脸被矩形框标记
        self.count = 0   ##用于计数保存的人脸图像数量,初始化为 0,每次按下 'p' 键保存一张图像时递增。
        self.person_name = rospy.get_param('~person_name', 'name_one')
        self.face_cascade = cv2.CascadeClassifier('/home/bcsh/robot_ws/src/match_mini/scripts/cascades/haarcascade_frontalface_default.xml')###Haar 级联分类器 存放一组描述人脸特征的模型,用来识别人脸
        self.dirname = "/home/bcsh/robot_ws/src/match_mini/scripts/p1/" + self.person_name + "/"
        self.X = None
        self.Y = None
        
        
    # 定义图像处理函数
    def process_image(self, frame):
       # print("sss")
        src = frame.copy()##复制输入的图像帧,以便在不修改原始数据的情况下进行处理。
        gray = cv2.cvtColor(src, cv2.COLOR_BGR2GRAY)##将复制的图像帧转换为灰度图像,因为 Haar 级联分类器通常在灰度图像上执行人脸检测。
        faces = self.face_cascade.detectMultiScale(gray, 1.3, 5)###使用预训练的 Haar 级联分类器检测灰度图像中的人脸。detectMultiScale 返回一个包含检测到的人脸位置坐标的列表。
        result = src.copy() ###以便在上面绘制矩形框。self.result 保存了处理后的图像。
        self.result = result
        #### 遍历检测到的人脸,并在图像上画矩形框
        
       ### 遍历检测到的人脸坐标,将每个人脸用蓝色矩形框标记在图像上。同时,如果按下 'p' 键并且 self.count 小于 20,将当前人脸图像保存到指定的目录,并递增 self.count。
        for (x, y, w, h) in faces:
            ### 给人脸用矩阵框住  左上角,长度,宽度,颜色等参数 
            result = cv2.rectangle(result, (x, y), (x+w, y+h), (255, 0, 0), 2)
            f = cv2.resize(gray[y:y+h, x:x+w], (200, 200))##对存储图片尺寸进行处理
            if self.count<20:
                # 如果按下 'p' 键,保存人脸图像
                if key == 'p' :
                    cv2.imwrite(self.dirname + '%s.pgm' % str(self.count), f)
                    self.count += 1
        return result
    
        
if __name__ == '__main__':
    try:
        # 初始化节点并运行
        node_name = "take_photo_rec"
        TakePhoto(node_name)
        rospy.spin()
    except KeyboardInterrupt:
        print "Shutting down face detector node."
cv2.destroyAllWindows()

Face_Rec.py

#!/usr/bin/env python
# encoding: utf-8

import sys,os,cv2
import numpy as np

import rospy

from geometry_msgs.msg import Twist
from std_msgs.msg import String

pub = rospy.Publisher('cmd_vel', Twist, queue_size = 1)

speed = 0.3
turn = 1.0

face_path = "/home/bcsh/robot_ws/src/match_mini/scripts/data"
face_name = ""

def read_images(path, sz=None):
    c = 0
    X, y = [], []
    names = []
    for dirname, dirnames, filenames in os.walk(path):
        for subdirname in dirnames:
            subject_path = os.path.join(dirname, subdirname)
            for filename in os.listdir(subject_path):
                try:
                    if (filename == ".directory"):
                        continue
                    filepath = os.path.join(subject_path, filename)
                    im = cv2.imread(os.path.join(subject_path, filename), cv2.IMREAD_GRAYSCALE)
                    if (im is None):
                        print("image" + filepath + "is None")
                    if (sz is not None):
                        im = cv2.resize(im, sz)
                    X.append(np.asarray(im, dtype=np.uint8))
                    y.append(c)
                except:
                    print("unexpected error")
                    raise
            c = c + 1
            names.append(subdirname)
    ###函数返回一个包含主题名称(names)、图像数据(X)和相应标签(y)的列表。
    return [names, X, y]

def face_rec():
    [names,X, y] = read_images(face_path)
    y = np.asarray(y, dtype=np.int32)
    #model = cv2.face_EigenFaceRecognizer.create()
    ### 创建训练模型
    model = cv2.face.LBPHFaceRecognizer_create()
    model.train(np.asarray(X), np.asarray(y))
    

    face_cascade = cv2.CascadeClassifier(
        '/home/bcsh/robot_ws/src/match_mini/scripts/cascades/haarcascade_frontalface_default.xml')
    cap = cv2.VideoCapture(0)
    ###调用cv的图象识别
    ### 大筐筐 视图
    cv2.namedWindow("face_detector",0)  ##框框名字
    cv2.resizeWindow("face_detector",480,320)## 框框大小
    while True:
        ret, frame = cap.read()## frame 传过来的一帧图片
        ### 对图片进行处理
        x, y = frame.shape[0:2]
        small_frame = cv2.resize(frame, (int(y / 2), int(x / 2)))
        result = small_frame.copy()
        gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY)
        faces = face_cascade.detectMultiScale(gray, 1.3, 5)##人脸在当前图像中的位置(x,y,w,h)
        for (x, y, w, h) in faces:
            ### 小框框
            result = cv2.rectangle(result, (x, y), (x + w, y + h), (255, 0, 0), 2)
            roi = gray[y:y + h,x:x + w]
            try:
                roi = cv2.resize(roi, (200, 200), interpolation=cv2.INTER_LINEAR)
                ### 模型预测 对新图像 p_label,p_confidence进行预测
                [p_label, p_confidence] = model.predict(roi)
                cv2.putText(result, names[p_label], (x, y - 20), cv2.FONT_HERSHEY_SIMPLEX, 1, 255, 2)
                
		print("p_confidence = " + str( ) +"  name=" + names[p_label])
		if p_confidence<60 and names[p_label] == face_name:
            ### 机器人停止位置有一段距离  所以因为距离误差,就得实际改变 置信度  因为要一直往一个方向走 所以p_confidence必须小于 所以只能实际情况确定来小于 机器人能识别置信度的最大值

                   	
                   offset_x = ((x+w) / 2 - 240)
                   target_area = w * h###摄像头看见人脸的目标区域
                   linear_vel = 0
                   angular_vel = 0
                
                   print(target_area)
                    ## 到一定距离才能识别
                   if target_area<100:
                    linear_vel = 0.0
                   elif target_area >110:
                    linear_vel = 0.3
                   else:
                    linear_vel = 0.0
                  
                   if offset_x > 0:
                    angular_vel = 0.1
                
                   if offset_x < 0:
                    angxular_vel = -0.1
		   update_cmd(linear_vel,angular_vel)
		
		
                    
            except:
                continue
	#update_cmd(linear_vel,angular_vel)
        cv2.imshow("face_detector", result)
        if cv2.waitKey(30) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()


def update_cmd(linear_speed, angular_speed):
    twist = Twist()
    twist.linear.x = 1*linear_speed; twist.linear.y = 1*linear_speed; twist.linear.z = 1*linear_speed;
    twist.angular.x = 0; twist.angular.y = 0; twist.angular.z = 1*angular_speed
    pub.publish(twist) 

def callback(msg):
    global face_path
    global face_name
    
	
    if msg.data == "liwei":
    	face_name = "liwei"
    if msg.data == "yaom":
    	face_name = "yaom"
    face_rec()

if __name__ == "__main__":
    rospy.init_node('face_detector')
    rospy.Subscriber("auto_face", String, callback)###订阅消息 定义回调函数
    
    rospy.spin()

四、语音控制

voicecontroller.py

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import sys
reload(sys)
sys.setdefaultencoding('utf8')
import os
import rospy

from respeaker_interface import x
from respeaker_audio import RespeakerAudio
from std_msgs.msg import String


class VoiceController(object):
    def __init__(self, node_name):
        self.node_name = node_name
        rospy.init_node(node_name)
        rospy.on_shutdown(self.shutdown)
        self.respeaker_interface = RespeakerInterface()
        self.respeaker_audio = RespeakerAudio()
        self.ask_pub = rospy.Publisher('cmd_msg', String, queue_size=5)

    def shutdown(self):
        self.respeaker_interface.close()
        self.respeaker_audio.stop()
def callback(msg):
    os.system("mpg123 /home/bcsh/robot_ws/src/match_mini/voice/zhuabu.mp3")

if __name__ == '__main__':
    voice_controller = VoiceController("voice_controller")
    auto_slam = rospy.Publisher('auto_slam', String, queue_size=10)  # 定义了话题对象auto_slam 发布话题的时候会调用话题的回调函数
    auto_face = rospy.Publisher('auto_face', String, queue_size=10)  # 定义了话题对象auto_face 发布话题的时候会调用话题的回调函数
    rospy.Subscriber("get_pos", String,callback, queue_size=10)
    rate = rospy.Rate(100)

    isPub = False
    while not rospy.is_shutdown():
        text = voice_controller.respeaker_audio.record()### 记录音频输入流
        if text.find("开始") >= 0 and isPub is not True:
            auto_slam.publish("start")
            isPub = True
        if text.find("右") >= 0:
           print("send liwei to auto_face")
           auto_face.publish("liwei")
        elif text.find("偷") >= 0:
	   		print("send yaom to auto_face")
	   		auto_face.publish("yaom")

        direction = voice_controller.respeaker_interface.direction
        print(text)
        print(direction)
        rate.sleep()

用到的类

RespeakerInterface 类用于与 Respeaker 设备进行通信

respeaker_audio.py

#!/usr/bin/env python

import pyaudio
from baidu_speech_api import BaiduVoiceApi
import json
import sys
import os
from aip.speech import AipSpeech
from contextlib import contextmanager

# 重新设置默认字符编码为 utf-8
reload(sys)
sys.setdefaultencoding("utf-8")

# 定义音频采样参数
CHUNK = 1024
RECORD_SECONDS = 5

# 百度语音识别 API 的应用参数
APP_ID = '41721436'
API_KEY = 'QG7UA5m5YZC0PLTw3qWzh2Xd'
SECRET_KEY = 'Y9Q22OM13s2oXLzMUzETiQk96SX7Geq3'

@contextmanager
def ignore_stderr(enable=True):
    """
    用于忽略标准错误流的上下文管理器。
    """
    if enable:
        devnull = None
        try:
            devnull = os.open(os.devnull, os.O_WRONLY)
            stderr = os.dup(2)
            sys.stderr.flush()
            os.dup2(devnull, 2)
            try:
                yield
            finally:
                os.dup2(stderr, 2)
                os.close(stderr)
        finally:
            if devnull is not None:
                os.close(devnull)
    else:
        yield

class RespeakerAudio(object):
    def __init__(self, channel=0, suppress_error=True):
        """
        初始化 RespeakerAudio 类。
        """
        # 忽略标准错误流以避免输出 PyAudio 警告信息
        with ignore_stderr(enable=suppress_error):
            self.pyaudio = pyaudio.PyAudio()
        
        # 初始化音频参数和设备信息
        self.channels = None
        self.channel = channel
        self.device_index = None
        self.rate = 16000
        self.bitwidth = 2
        self.bitdepth = 16
        
        # 查找 Respeaker 设备
        count = self.pyaudio.get_device_count()
        for i in range(count):
            info = self.pyaudio.get_device_info_by_index(i)
            name = info["name"].encode("utf-8")
            chan = info["maxInputChannels"]
            
            # 如果设备名中包含 "respeaker",则认为是 Respeaker 设备
            if name.lower().find("respeaker") >= 0:
                self.channels = chan
                self.device_index = i
                break  
        
        # 如果没有找到 Respeaker 设备,则使用默认输入设备
        if self.device_index is None:
            info = self.pyaudio.get_default_input_device_info()
            self.channels = info["maxInputChannels"]
            self.device_index = info["index"]
        
        # 确保选择的通道在有效范围内
        self.channel = min(self.channels - 1, max(0, self.channel))
        
        # 打开音频输入流
        self.stream = self.pyaudio.open(
            rate=self.rate,
            format=self.pyaudio.get_format_from_width(self.bitwidth),
            channels=1,
            input=True,
            input_device_index=self.device_index,
        )
        
        # 初始化百度语音 API
        self.aipSpeech = AipSpeech(APP_ID, API_KEY, SECRET_KEY)
        self.baidu = BaiduVoiceApi(appkey=API_KEY, secretkey=SECRET_KEY)
    
    def stop(self):
        """
        停止音频输入流。
        """
        # 停止音频输入流
        self.stream.stop_stream()
        self.stream.close()
        self.stream = None
        
        # 终止 PyAudio
        self.pyaudio.terminate()
 
    def generator_list(self, lst):
        """
        生成列表的生成器。
        """
        for l in lst:
            yield l
            
    def record(self):
        """
        录制音频并发送到百度语音识别 API 进行识别。
        """
        # 启动音频输入流
        self.stream.start_stream()
        print("* recording")
        
        frames = []  # 用于存储音频帧
        
        # 录制指定的音频
        for i in range(0, int(self.rate / CHUNK * RECORD_SECONDS)):
            data = self.stream.read(CHUNK)
            frames.append(data)
            
        print("done recording")
        
        # 停止音频输入流
        self.stream.stop_stream()
        
        print("start to send to Baidu")
        
        # 将录制的音频发送到百度语音识别 API 进行识别
        text = self.baidu.server_api(self.generator_list(frames))
        
        # 解析识别结果并返回
        if text:
            try:
                text = json.loads(text)#### 
                for t in text['result']:
                    print(t)
                    return str(t)
            except KeyError:
                return "get nothing"
        else:
            print("get nothing")
            return "get nothing"

if __name__ == '__main__':
    # 创建 RespeakerAudio 实例
    snowman_audio = RespeakerAudio()
    
    # 持续录制并输出识别结果
    while True:
        text = snowman_audio.record()

最近更新

  1. TCP协议是安全的吗?

    2023-12-10 11:58:02       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2023-12-10 11:58:02       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2023-12-10 11:58:02       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2023-12-10 11:58:02       20 阅读

热门阅读

  1. TCP三次握手、四次挥手及状态转换详解

    2023-12-10 11:58:02       39 阅读
  2. windbg的vscode智能提示js

    2023-12-10 11:58:02       32 阅读
  3. ScheduledThreadPoolExecutor 类定时器弊端和有点

    2023-12-10 11:58:02       32 阅读
  4. facebook广告相比谷歌广告的优势

    2023-12-10 11:58:02       36 阅读
  5. facebook广告运营技巧

    2023-12-10 11:58:02       32 阅读
  6. Linux服务器超级实用的脚本

    2023-12-10 11:58:02       44 阅读
  7. 业务设计原则

    2023-12-10 11:58:02       30 阅读
  8. CTF-Crypto-RSA共模攻击和模不互素攻击

    2023-12-10 11:58:02       34 阅读