本文介绍在物联网应用开发(IoT Studio)平台使用树莓派摄像头实现人脸识别功能。即将树莓派摄像头采集的人像,存储到阿里云对象存储(OSS)中,同时,通过设备属性上报获取图片,并调用人脸识别API进行人脸识别后,在钉钉群中推送验证结果。
流程图
物料准备
硬件 | 说明 |
---|---|
树莓派主板 | 将摄像头和红外线传感器连接到树莓派主板。
|
适用于树莓派的摄像头 | |
红外线传感器 |
创建存储空间
本示例中,摄像头拍摄的图像将存储在阿里云对象存储中,因此需为图像存储创建一个存储空间Bucket。开发设备端SDK时,需要配置该Bucket信息。
创建产品和设备
为产品定义物模型
设备端SDK开发
因为树莓派基于Python语言,需要配置两个Python SDK: IoT设备端接入SDK和OSS文件上传SDK。
设备端SDK开发代码示例如下:
- 引入相关库。
import aliyunsdkiotclient.AliyunIotMqttClient as iot ##导入阿里云的设备MQTT库,如果import失败需要先pip3 install import json import multiprocessing import time import random import oss2 ##导入阿里云的OSS库,如果import失败需要先pip3 install oss2 from picamera import PiCamera ##树莓派的摄像头,系统自带 import RPi.GPIO as GPIO ##GPIO口,接红外PIR使用
- OSS Bucket访问授权 。
auth = oss2.Auth('**YourAccessKeyId**','**YourAccessKeySecret**') ##OSS的授权,需要您的阿里云账号AccessKey ID和AccessKey Secret,具体查看https://usercenter.console.aliyun.com/#/manage/ak bucket = oss2.Bucket(auth,'http://oss-******.aliyuncs.com','**YourBucketName**') ##请在OSS控制台,Bucket的概览页查看具体信息 global picURLtoIoT camera = PiCamera() camera.resolution = (800,600) ##拍照分辨率,越高越容易分析,但是上传速度越慢
- 初始化树莓派。
def init(): GPIO.setwarnings(False) GPIO.setmode(GPIO.BOARD) GPIO.setup(3, GPIO.IN) pass
- 定义图像上传OSS Bucket。
def take_photo(): ticks = int(time.time()) fileName = 'test%s.jpg' % ticks ##在文件名加入时间戳作为简易加密手段 filePath = '/home/pi/Pictures/%s' % fileName camera.capture(filePath) bucket.put_object_from_file('bucket_file_name/%s', fileName) ##在这里改bucket名称 global picURLtoIoT picURLtoIoT = 'http://**您的bucket名称**.oss-******.aliyuncs.com/bucket_file_name/%s' % fileName ##图片存储URL,需替换为您的bucket名称和bucket内文件名称 print(str(picURLtoIoT))
- 定义红外线感应器检测到有人时,摄像头拍照,否则休眠5秒。
def detectPeople(): if GPIO.input(3) == True: take_photo() else: time.sleep(5) options = { 'productKey':'**设备的ProductKey**', 'deviceName':'**设备的deviceName**', 'deviceSecret':'**设备的deviceSecret**', 'port':****, 'host':'iot-as-mqtt.******.aliyuncs.com' ##物联网平台域名 } host = options['productKey'] + '.' + options['host'] def on_message(client, userdata, msg): topic = '/' + productKey + '/' + deviceName + '/update' print(msg.payload) def on_connect(client, userdata, flags_dict, rc): print("Connected with result code " + str(rc)) def on_disconnect(client, userdata, flags_dict, rc): print("Disconnected.")
- 定义设备上报数据到物联网平台。
def upload_device(client): topic = '/sys/'+options['productKey']+'/'+options['deviceName']+'/thing/event/property/post' while True: payload_json = { 'id': int(time.time()), 'params': { 'people':1 ##物模型里布尔值以0和1的形式上报 'picURL': picURLtoIoT, }, 'method': "thing.event.property.post" } print('send data to iot server: ' + str(payload_json)) client.publish(topic, payload=str(payload_json)) if __name__ == '__main__': client = iot.getAliyunIotMqttClient(options['productKey'], options['deviceName'], options['deviceSecret'], secure_mode=3) client.on_connect = on_connect client.connect(host=host, port=options['port'], keepalive=60) p = multiprocessing.Process(target=upload_device, args=(client,)) p.start() detectPeople() GPIO.cleanup() client.loop_forever()
完整的示例代码,请参见本文结尾附录:设备端SDK代码示例。
创建业务服务
调试与发布
- 在业务服务编辑页面,单击右上方的部署按钮
,部署服务。
- 部署完成后,单击启动按钮,启动服务。
- 调试服务。
- 单击发布按钮,发布服务。
附录:设备端SDK代码示例
import aliyunsdkiotclient.AliyunIotMqttClient as iot ##导入阿里云的设备MQTT库,如果import失败需要先pip3 install
import json
import multiprocessing
import time
import random
import oss2 ##导入阿里云的OSS库,如果import失败需要先pip3 install oss2
from picamera import PiCamera ##树莓派的摄像头,系统自带
import RPi.GPIO as GPIO ##GPIO口,接红外PIR使用
auth = oss2.Auth('**YourAccessKeyId**','**YourAccessKeySecret**') ##OSS的授权,需要您的阿里云账号AccessKey ID和AccessKey Secret,具体查看https://usercenter.console.aliyun.com/#/manage/ak
bucket = oss2.Bucket(auth,'http://oss-******.aliyuncs.com','**YourBucketName**') ##请在OSS控制台,Bucket的概览页查看具体信息
global picURLtoIoT
camera = PiCamera()
camera.resolution = (800,600) ##拍照分辨率,越高越容易分析,但是上传速度越慢
##初始化树莓派
def init():
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BOARD)
GPIO.setup(3, GPIO.IN)
pass
def take_photo():
ticks = int(time.time())
fileName = 'test%s.jpg' % ticks ##在文件名加入时间戳作为简易加密手段
filePath = '/home/pi/Pictures/%s' % fileName
camera.capture(filePath)
bucket.put_object_from_file('bucket_file_name/%s', fileName) ##在这里改bucket名称
global picURLtoIoT
picURLtoIoT = 'http://**您的bucket名称**.oss-******.aliyuncs.com/bucket_file_name/%s' % fileName ##图片存储URL,需替换为您的bucket名称和bucket内文件名称
print(str(picURLtoIoT))
##如果检测到有人则拍照,否则休眠5秒
def detectPeople():
if GPIO.input(3) == True:
take_photo()
else:
time.sleep(5)
options = {
'productKey':'**设备的ProductKey**',
'deviceName':'**设备的deviceName**',
'deviceSecret':'**设备的deviceSecret**',
'port':****,
'host':'iot-as-mqtt.******.aliyuncs.com' ##物联网平台域名
}
host = options['productKey'] + '.' + options['host']
def on_message(client, userdata, msg):
topic = '/' + productKey + '/' + deviceName + '/update'
print(msg.payload)
def on_connect(client, userdata, flags_dict, rc):
print("Connected with result code " + str(rc))
def on_disconnect(client, userdata, flags_dict, rc):
print("Disconnected.")
##设备上报属性
def upload_device(client):
topic = '/sys/'+options['productKey']+'/'+options['deviceName']+'/thing/event/property/post'
while True:
payload_json = {
'id': int(time.time()),
'params': {
'people':1 ##物模型里布尔值以0和1的形式上报
'picURL': picURLtoIoT,
},
'method': "thing.event.property.post"
}
print('send data to iot server: ' + str(payload_json))
client.publish(topic, payload=str(payload_json))
if __name__ == '__main__':
client = iot.getAliyunIotMqttClient(options['productKey'], options['deviceName'], options['deviceSecret'], secure_mode=3)
client.on_connect = on_connect
client.connect(host=host, port=options['port'], keepalive=60)
p = multiprocessing.Process(target=upload_device, args=(client,))
p.start()
detectPeople()
GPIO.cleanup()
client.loop_forever()