cloudprog-hw4/lambda-detect-face.py
2023-05-02 05:56:46 +01:00

54 lines
1.3 KiB
Python

import os
import json
import urllib.parse
import boto3
TOPIC = os.getenv("MQTT_TOPIC", "")
rekognition = boto3.client('rekognition')
iot_data = boto3.client('iot-data')
# trigger by S3 events
def lambda_handler(event, _context):
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
try:
faces = detect_faces(key, bucket)
except Exception as e:
print(e)
print(f"Error while detecting faces in object {key} from bucket {bucket}.")
raise e
try:
iot_publish({ "faces": faces })
except Exception as e:
print(e)
print(f"Error while publishing to MQTT topic {TOPIC}.")
raise e
return str(faces)
def detect_faces(photo, bucket):
response = rekognition.detect_faces(
Image={
'S3Object': {
'Bucket': bucket,
'Name': photo
}
},
Attributes=['ALL']
)
return len(response['FaceDetails'])
def iot_publish(payload):
iot_data.publish(
topic=TOPIC,
qos=0,
payload=json.dumps(payload)
)