您现在的位置是:网站首页> PY&Rust

Python硬件相关开发

  • PY&Rust
  • 2023-11-26
  • 488人已阅读
摘要

Python硬件相关开发

串口通讯

Python GPIO操作

使用ffmpeg+opencv读取摄像头并推流到rtmp服务器



串口通讯

经常做嵌入式开发的兄弟们应该对串口、RS232、RS485比较熟悉,主要用来传感器数据采集和设备之间通信等,是一个引用很广泛的通信方式,window系统PC的USB口通过USB转RS232线连接上嵌入式设备的RS232接口。万能语言python也有对应的库pyserial可以支持串口通信。

1.什么是RS232

1.1RS232接口介绍

RS-232标准接口是常用的串行通信接口标准之一。而工业控制的RS-232口一般只使用RXD、TXD、GND三条线。在大部分的嵌入式设备都会使用RS232接口,可以通过接口实现对设备通信。

4.png


RS232接口任何一条信号线的电压均为负逻辑关系。即:逻辑“1”为-3—-15V;逻辑“0”:+3—+15V,噪声容限为2V。即要求接收器能识别高于+3V的信号作为逻辑“0”,低于-3V的信号作为逻辑“1”,TTL电平为5V为逻辑正,0为逻辑负。与TTL电平不兼容故需使用电平转换电路方能与TTL电路连接。但是RS232的传输距离有限,最大传输距离标准值为50英尺,实际上也只能用在15米左右


(1)RS232没有时钟线,只有两根数据线,分别是rx和tx,这两根线都是1bit位宽的。其中rx是接受数据的线,tx是发送数据的线。通过串口线rx一位接收数据,从最低位到最高位依次接收,最后形成8bit数据。


(2)串口数据的发送与接收是基于帧结构,即一帧一帧的发送与接收数据。每一帧除了中间包含 8bit 有效数据外,每一帧的开头都必须有一个起始位,且固定为 0。在每一帧的结束时也必须有一个停止位,且固定为 1,即最基本的帧结构(不包括校验等)有 10bit。在不发送或者不接收数据的情况下,rx 和 tx 处于空闲状态,此时 rx 和 tx 线都保持 高电平,如果有数据帧传输时,首先会有一个起始位,然后是 8bit 的数据位,接着有 1bit 的停止位,然后 rx 和 tx 继续进入空闲状态,然后等待下一次的数据传输。如下图所示 为一个最基本的 RS232 帧结构。


4.png


1.2通信参数

(1)波特率


在信息传输通道中,携带数据信息的信号单元叫码元(因为串口是 1bit 进 行传输的,所以其码元就是代表一个二进制数),每秒钟通过信号传输的码元数称为码元 的传输速率,简称波特率,常用符号“Baud”表示,其单位为“波特每秒(Bps)”。串口常见的波特率有 4800、9600、115200 等


2)比特率


每秒钟通信信道传输的信息量称为位传输速率,简称比特率,其单位为 “每秒比特数(bps)”。比特率可由波特率计算得出,公式为:比特率=波特率 * 单个调制状态对应的二进制位数。如果使用的是 9600 的波特率,其串口的比特率为:9600Bps * 1bit= 9600bps。



4.png


1.3 RS232串口开发流程

开发前需要掌握RS232串口开发流程,在使用python的pyserial库时,我们可先用我们的串口调试工具确认串口指令功能是否正常,而后再进行下一步的调用实验,避免因串口功能异常,影响正常的代码编写,具体如下。


(1)接入RS232转USB串口线


在电脑的设备管理器里找到对应的USB端口号(若识别不到设备,请相应的驱动包是否正确安装)。


(2)打开串口调试助手工具(sscom、XCOM等)


选中对应的USB端口号,设置波特率、根据需要勾选hex发送、设置校验位,打开串口。


(3)发送指令,接受返回值


连接串口、设置串口通信的参数(波特率),校验方式(奇偶校验、CRC-16校验和LRC校验)。


2. pySerial

pySerial封装了串口通讯模块,可支持Linux、Windows、BSD(可能支持所有支持POSIX的操作系统),支持Jython(Java)和IconPython(.NET and Mono)。所有程序全由Python完成,除了标准库外不依赖其他包,安装pyserial库。


通信流程


(1)确定要使用的串口号。


(2)配置波特率、数据位、奇偶校验位、停止位、DTR/DSR、RTS/CTS 和 XON/XOFF。


(3)打开串口。


(4)收发数据。


(5)关闭串口。


首先要安装库


pip install pyserial

2.1 pyserial库常用函数介绍

serial = serial.Serial(‘COM1’, 115200)  打开COM1并设置波特率为115200,COM1只适用于Windows

serial = serial.Serial(‘/dev/ttyS0’, 115200)  打开/dev/ttyS0并设置波特率为115200, 只适用于Linux

print(serial .portstr)          能看到第一个串口的标识

serial .write(“hello”)     往串口里面写数据

serial .close()               关闭serial 表示的串口

serial .open()                打开串口

data = serial .read(num)      读num个字符

data = serial .readline()     读一行数据,以/n结束,要是没有/n就一直读,阻塞。

serial .baudrate = 9600       设置波特率

print(serial)                  可查看当前串口的状态信息

serial .isOpen()             当前串口是否已经打开

serial.inWaiting()           判断当前接收的数据

serial.flushInput()          清除输入缓冲区数据

serial.flushOutput()         中止当前输出并清除输出缓冲区数据

2.2 python发送字符串指令

import time

import serial


#RS232指令,使用一个字典,把需要被操作的RS232命令封装到一起

RS232_Command = {

    'command1_utf8': 'DCBA',

    'command2_utf8': 'ABCD',

}

#把字符串类型转换为bytes数据流进行发送,RS232命令发送函数

def serial_sent_utf(command):

    #从字典里获取对应的RS232命令

    var = RS232_Command["%s" % command]

    #encode()函数是编码,把字符串数据转换成bytes数据流

    ser.write(var.encode())

    data = ser.read(10)

    # 获取指令的返回值,并且进行类型转换,转换为字符串后便可以进行字符串对比,因而便可以根据返回值进行判断是否执行特定功能

    data = str(data, encoding="utf-8")

    return data


if __name__ == '__main__':

    #实现串口的连接

    ser = serial.Serial('COM7', 115200, timeout=3)#连接com7端口

    command1_utf8 = serial_sent_utf('command1_utf8')

    command2_utf8 = serial_sent_utf('command2_utf8')

基本流程如下


(1)连接串口,设置串口通信的参数


使用 ser = serial.Serial(‘COM7’, 115200, timeout=3) 函数实现串口端口的连接,这里的COM7为PC连接嵌入式设备的USB端口号,115200为串口的波特率,timeout为连接超时。


(2)发送命令


serial_sent_utf(command),使用var = RS232_Command[“%s” % command]把需要发送的RS232命令赋值给 var,然后通过pyserial库里面的 ser.write(var.encode())函数发送到嵌入式设备,这里要注意 var.encode(),因为这里我们发送的RS232命令为字符串格式数据,但一般发送到嵌入式设备的数据是比特流格式,所以这边使用encode()函数进行编码,把发送的数据变成比特流发送。


(3)接收返回值


发送了RS232命令后,一般都会有返回值,通过pyserial库里面的data = ser.read()函数获取返回值,并把返回值赋值给data,使用read()相关函数时要注意避免出现一直读取返回值的情况。这里获取到的数据格式也是比特流数据,所以为了便于后续对返回值的判断等操作,这里加入了函数str(data, encoding=“utf-8”)对data的数值格式进行了转换,转换成utf-8的格式数据。


2.3python发送发送HEX指令

import time

import serial


#RS232指令,使用一个字典,把需要被操作的RS232命令封装到一起

RS232_Command = {

    'command_hex1': '6B 30 31 73 50 32 30 31 0D',

    'command_hex2': '6B 30 31 73 50 32 30 30 0D',

}


#把16进制的数据转换为bytes数据流进行发送,RS232命令发送函数

def serial_sent_hex(command):

    #bytes.fromhex(),使用这个函数进行数据转换,可以把16进制的数值转换字节数据(即比特流,字符串与比特流间还可以用encode()和decode()进行编解码)

    var = bytes.fromhex(RS232_Command["%s" % command])

    ser.write(var)

    data = ser.read(10)

    # 获取指令的返回值,并且进行类型转换,转换为字符串后便可以进行字符串对比,因而便可以根据返回值进行判断是否执行特定功能

    data = str(data, encoding="utf-8")

    return data


if __name__ == '__main__':

    #实现串口的连接

    ser = serial.Serial('COM7', 115200, timeout=3)

    command_hex1 = serial_sent_hex('command_hex1')

    command_hex2 = serial_sent_hex('command_hex2')

不同地方就是其对发送的数据类型的不一样,发送HEX格式的命令,这里用了var = bytes.fromhex(RS232_Command[“%s” % command]),这里使用了bytes.fromhex()函数把hex的数据类型转换成比特流数据。


综上所述,在对串口库的使用过程中,要特别留意对发送数据的处理:要根据实际情况进行数据转换,把字符或HEX格式数据转换成bytes数据流发送。


2.4 ubunt 读取串口

(1)首先查看外接串口的端口号


(2)运行接收代码


'''

date:...

author:

function:

'''

import serial

data_ser = serial.Serial("/dev/ttyUSB0",9600,timeout = 5)

data_ser.flushInput()#清楚缓存


if __name__ == '__main__':

    while True:

        data_count = data_ser.inWaiting()#判断当前接收的数据

        if data_count !=0 :#如果接收数据不为空,则读取数据

            recv = data_ser.read(data_ser.in_waiting).decode("gbk")

            print(recv)



Python GPIO操作

常用GPIO库

RPi.GPIO和gpiozero

pip install RPi.GPIO

pip install gpiozero


使用RPi.GPIO库控制GPIO

下面是一个使用RPi.GPIO库控制GPIO的示例代码:

import RPi.GPIO as GPIO

import time


# 设置GPIO模式为BCM模式

GPIO.setmode(GPIO.BCM)


# 设置GPIO引脚为输出模式

GPIO.setup(17, GPIO.OUT)


# 控制GPIO引脚输出高电平

GPIO.output(17, GPIO.HIGH)


# 延时1秒

time.sleep(1)


# 控制GPIO引脚输出低电平

GPIO.output(17, GPIO.LOW)


# 清理GPIO设置

GPIO.cleanup()

上述代码中,首先导入RPi.GPIO库和time库。然后,通过GPIO.setmode(GPIO.BCM)将GPIO模式设置为BCM模式,这是树莓派上常用的一种模式。接下来,通过GPIO.setup(17, GPIO.OUT)将GPIO引脚17设置为输出模式。然后,通过GPIO.output(17, GPIO.HIGH)将引脚17输出高电平,等待1秒后再输出低电平,并最后通过GPIO.cleanup()清理GPIO设置。


使用gpiozero库控制GPIO

下面是一个使用gpiozero库控制GPIO的示例代码:

from gpiozero import LED

from time import sleep


# 初始化LED对象

led = LED(17)


# 控制LED亮1秒,然后熄灭1秒,重复5次

for i in range(5):

    led.on()

    sleep(1)

    led.off()

    sleep(1)


上述代码中,首先导入LED类和sleep函数。然后,通过LED(17)创建一个LED对象,将GPIO引脚17与该对象绑定。接下来,通过led.on()和led.off()方法控制LED的亮和灭,通过sleep(1)方法实现1秒的延时。最后,通过循环重复亮和灭的过程5次。



使用ffmpeg+opencv读取摄像头并推流到rtmp服务器

1、环境

python3

OSX 12.5

vscode 


2、安装ffmpeg

brew install ffmpeg 


3、安装cv2


pip install opencv-python

# or

pip --default-timeout=100 install opencv-python -i https://pypi.douban.com/simple

 

4、脚本


import cv2

 

# subprocess 模块允许我们启动一个新进程,并连接到它们的输入/输出/错误管道,从而获取返回值。

import subprocess

 

# 视频读取对象

cap = cv2.VideoCapture(0)

 

# 推流地址

rtmp = "rtmp://192.168.10.225:1935/stream/example"# 推流的服务器地址

 

# 设置推流的参数

command = ['ffmpeg',

           '-y',

           '-f', 'rawvideo',

           '-vcodec', 'rawvideo',

           '-pix_fmt', 'bgr24',

           '-s', '1280*720',  # 根据输入视频尺寸填写

           '-r', '25',

           '-i', '-',

           '-c:v', 'h264',

           '-pix_fmt', 'yuv420p',

           '-preset', 'ultrafast',

           '-f', 'flv',

           rtmp]

 

 

# 创建、管理子进程

pipe = subprocess.Popen(command, stdin=subprocess.PIPE)

size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))

 

# 循环读取

while cap.isOpened():

    # 读取一帧

    ret, frame = cap.read()

    if frame is None:

        print('read frame err!')

        continue

 

    # 显示一帧

    # fps = int(cap.get(cv2.CAP_PROP_FPS))

    cv2.imshow("frame", frame)

 

    # 按键退出

    if cv2.waitKey(1) & 0xFF == ord('q'):

        break

 

    # 读取尺寸、推流

    img = cv2.resize(frame, size)

 

    pipe.stdin.write(img.tobytes())

    

 

# 关闭窗口

cv2.destroyAllWindows()

 

# 停止读取

cap.release()

备注:分辨率要根据本机摄像头支持的分比率设置,不然可能显示不出图像


树莓派推流问题

最近在用ffmpeg+Python opencv 的方法实现采集摄像头实时视频并完成推流。实现思路是:cv2先获取摄像头视频数据,然后建立一个subprocess.popen管道去推流,然后将视频帧处理完后写入管道中完成图像处理并推流的工作。在windows下调试可以正常实现,代码如下(这个网上很多,我把我调试成功的代码也贴出来)。但是我想要在树莓派上实现,将该部分代码移植到树莓派上后就出现报错:pipe.stdin.write(img.tostring()) BrokenPipeError: [Errno 32] Broken pipe。现象是:代码刚跑起来,刚显示一两帧画面就中断了。

import cv2

import subprocess

 

# 视频读取对象

cap = cv2.VideoCapture(0,cv2.CAP_DSHOW)


# 推流地址

rtsp = "rtsp://192.168.1.100:554/live/test1"# 推流的服务器地址


# 设置推流的参数

command = ['ffmpeg',

           '-y',

           '-f', 'rawvideo',

           '-vcodec', 'rawvideo',

           '-pix_fmt', 'bgr24',

           '-s', '640*480',  # 根据输入视频尺寸填写

           '-r', '30',

           '-i', '-',

           '-c:v', 'h264',

           '-pix_fmt', 'yuv420p',

           '-preset', 'ultrafast',

           '-f', 'rtsp',

           rtsp]


# 创建、管理子进程

pipe = subprocess.Popen(command,shell=True, stdin=subprocess.PIPE)

size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))


# 循环读取

num = 0

while cap.isOpened():

    num = num + 1

    print(num)

    # 读取一帧

    ret, frame = cap.read()

    if frame is None:

        print('read frame err!')

        continue

 

    # 显示一帧

    fps = int(cap.get(cv2.CAP_PROP_FPS))

    cv2.imshow("frame", frame)

    print("fps:",fps)

 

    # 按键退出

    if cv2.waitKey(1) & 0xFF == ord('q'):

        break

 

    # 读取尺寸、推流

    img = cv2.resize(frame, size)

 

    pipe.stdin.write(img.tobytes())


# 关闭窗口

cv2.destroyAllWindows()

 

# 停止读取

cap.release()

问题解决

1.找到问题

  从别人的帖子里发现可以用pipe.poll()来检查当前popen的工作状态,如果然后0,表示进程正常结束,返回1表示进程sleep挂起,返回2,表示子进程不存在,返回-15表示进程被kill,返回None,表示进程正在在运行。我print之后发现,返回了1,所以预估应该是,在树莓派上和windows上的popen创建方法有差别,在树莓派这里popen刚开始工作就被挂起了,终于找到解决办法。

4.png

2.解决办法

  尝试了一下,将创建popen的shell = True 改为 shell = False就可以了。

将如下代码,


pipe = subprocess.Popen(command,shell=True, stdin=subprocess.PIPE)

改为

pipe = subprocess.Popen(command,shell=False, stdin=subprocess.PIPE)

即可






上一篇:Rust学习资料

下一篇:Python与Rust同行者

Top