From 4000e5bba1cdb936bef787ad94d6c75d8b0eaeaf Mon Sep 17 00:00:00 2001 From: Tony Yang Date: Sun, 30 Apr 2023 18:43:13 +0800 Subject: [PATCH] init: untested code --- Pipfile | 15 ++++ lambda-detect-face.py | 53 +++++++++++++ lambda-gen-s3-url.py | 40 ++++++++++ led_control.py | 170 ++++++++++++++++++++++++++++++++++++++++++ upload.py | 19 +++++ 5 files changed, 297 insertions(+) create mode 100644 Pipfile create mode 100644 lambda-detect-face.py create mode 100644 lambda-gen-s3-url.py create mode 100644 led_control.py create mode 100644 upload.py diff --git a/Pipfile b/Pipfile new file mode 100644 index 0000000..99ed9fd --- /dev/null +++ b/Pipfile @@ -0,0 +1,15 @@ +[[source]] +url = "https://pypi.org/simple" +verify_ssl = true +name = "pypi" + +[packages] +awsiotsdk = "*" +boto3 = "*" +opencv-python = "*" +requests = "*" + +[dev-packages] + +[requires] +python_version = "3.10" diff --git a/lambda-detect-face.py b/lambda-detect-face.py new file mode 100644 index 0000000..b20b290 --- /dev/null +++ b/lambda-detect-face.py @@ -0,0 +1,53 @@ +import os +import json +import urllib.parse + +import boto3 + +TOPIC = os.getenv("MQTT_TOPIC", "") + +rekognition = boto3.client('rekognition') +iot_data = boto3.client('iot-data') + +# trigger by S3 events +def lambda_handler(event, _context): + bucket = event['Records'][0]['s3']['bucket']['name'] + key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8') + + try: + faces = detect_faces(key, bucket) + except Exception as e: + print(e) + print(f"Error while detecting faces in object {key} from bucket {bucket}.") + raise e + + try: + iot_publish({ "faces": faces }) + except Exception as e: + print(e) + print(f"Error while publishing to MQTT topic {TOPIC}.") + raise e + + return str(faces) + + +def detect_faces(photo, bucket): + response = rekognition.detect_faces( + Image={ + 'S3Object': { + 'Bucket': bucket, + 'Name': photo + } + }, + Attributes=['ALL'] + ) + + return len(response['FaceDetails']) + + +def iot_publish(payload): + iot_data.publish( + topic=TOPIC, + qos=1, + payload=json.dumps(payload) + ) \ No newline at end of file diff --git a/lambda-gen-s3-url.py b/lambda-gen-s3-url.py new file mode 100644 index 0000000..ff1f83a --- /dev/null +++ b/lambda-gen-s3-url.py @@ -0,0 +1,40 @@ +import os +import json + +import boto3 +from botocore.exceptions import ClientError + +TOPIC = os.getenv("MQTT_TOPIC", "") +BUCKET = os.getenv("BUCKET", "") + +s3 = boto3.client('s3') +iot_data = boto3.client('iot-data') + +# trigger by MQTT topic events +def lambda_handler(event, _context): + filename = event['filename'] + + try: + url = s3.generate_presigned_put(BUCKET, filename) + print("Got presigned URL: %s", url) + except ClientError as e: + print(e) + print("Couldn't get a presigned URL") + raise e + + try: + iot_publish({ "url": url }) + except Exception as e: + print(e) + print(f"Error while publishing to MQTT topic {TOPIC}.") + raise e + + return url + + +def iot_publish(payload): + iot_data.publish( + topic=TOPIC, + qos=1, + payload=json.dumps(payload) + ) \ No newline at end of file diff --git a/led_control.py b/led_control.py new file mode 100644 index 0000000..2a86409 --- /dev/null +++ b/led_control.py @@ -0,0 +1,170 @@ +import json +import uuid +import asyncio +import datetime + +import cv2 +import requests + +from awscrt import mqtt +from awsiot import mqtt_connection_builder + +DEBUG = 0 + +UUID = f"cloudprog-{uuid.uuid4()}" + +ENDPOINT = "" +CA = "" +CERT = "" +PRIVKEY = "" + +LED_TOPIC = "" + +# https://aws.amazon.com/tw/blogs/iot/securely-ingesting-large-sized-payloads-from-iot-devices-to-the-aws-cloud/ +S3_UPLOAD_TOPIC = "" +S3_URL_TOPIC = "" + +FORMAT = "jpg" + +filename = None +cam = cv2.VideoCapture(0) + +if not DEBUG: + import RPi.GPIO as GPIO + + LED_PIN = 32 + + GPIO.setmode(GPIO.BOARD) + GPIO.setup(LED_PIN, GPIO.OUT) + + +# Callback when connection is accidentally lost. +def on_connection_interrupted(connection, error, **kwargs): + print("Connection interrupted. error: {}".format(error)) + + +# Callback when an interrupted connection is re-established. +def on_connection_resumed(connection, return_code, session_present, **kwargs): + print("Connection resumed. return_code: {} session_present: {}".format(return_code, session_present)) + + if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present: + print("Session did not persist. Resubscribing to existing topics...") + resubscribe_future, _ = connection.resubscribe_existing_topics() + + # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread, + # evaluate result with a callback instead. + resubscribe_future.add_done_callback(on_resubscribe_complete) + + +def on_resubscribe_complete(resubscribe_future): + resubscribe_results = resubscribe_future.result() + print("Resubscribe results: {}".format(resubscribe_results)) + + for topic, qos in resubscribe_results['topics']: + if qos is None: + print("Server rejected resubscribe to topic: {}".format(topic)) + + +# Callback when the subscribed topic receives a message +def on_message_received(topic, payload, dup, qos, retain, **kwargs): + # print("Received message from topic '{}': {}".format(topic, payload)) + + if topic.endswith("led"): + parse_led_payload(payload) + elif topic.endswith("s3/url"): + parse_s3_url_payload(payload) + + +def parse_led_payload(payload): + data = json.loads(payload) + if data.get("faces"): + led_control(True) + else: + led_control(False) + + +def led_control(light): + print(["Light off", "Light on"][light]) + + if not DEBUG: + GPIO.output(LED_PIN, GPIO.HIGH if light else GPIO.LOW) + + +def parse_s3_url_payload(payload): + data = json.dumps(payload) + + put_presigned_s3_url(data["url"], take_picture()) + + +def take_picture(): + cam.open() + ret, frame = cam.read() + ret, img = cv2.imencode(FORMAT, frame) + cam.release() + return img.tobytes() + + +def put_presigned_s3_url(url, img_bytes): + res = requests.put(url, data=img_bytes) + print(f"Put to S3. status code: {res.status_code}") + return res + + +async def main(): + global filename + # create MQTT connection + mqtt_connection = mqtt_connection_builder.mtls_from_path( + endpoint=ENDPOINT, + cert_filepath=CERT, + pri_key_filepath=PRIVKEY, + ca_filepath=CA, + on_connection_interrupted=on_connection_interrupted, + on_connection_resumed=on_connection_resumed, + client_id=UUID + ) + + connect_future = mqtt_connection.connect() + + connect_result = connect_future.result() + print(f"{UUID} is connected!") + + # subscribe to MQTT topic + subscribe_future, packet_id = mqtt_connection.subscribe( + topic=LED_TOPIC, + qos=mqtt.QoS.AT_LEAST_ONCE, + callback=on_message_received + ) + + subscribe_result = subscribe_future.result() + print(f"Subscribed to {LED_TOPIC}") + + subscribe_future, packet_id = mqtt_connection.subscribe( + topic=S3_URL_TOPIC, + qos=mqtt.QoS.AT_LEAST_ONCE, + callback=on_message_received + ) + + subscribe_result = subscribe_future.result() + print(f"Subscribed to {LED_TOPIC}") + + while True: + command = input("> ") + + if command == "p": + filename = datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d-%H-%M-%S") + "." + FORMAT + publish_future, packet_id = mqtt_connection.publish( + topic=S3_UPLOAD_TOPIC, + qos=1, + payload=json.dumps({ "filename": filename }) + ) + publish_result = publish_future.result() + print(f"Upload request {filename} sent!") + + +# run +loop = asyncio.get_event_loop() +loop.set_exception_handler(lambda loop, context: [print(context['message']), GPIO.cleanup()]) + +loop.create_task(main()) + +loop.run_forever() \ No newline at end of file diff --git a/upload.py b/upload.py new file mode 100644 index 0000000..9e64214 --- /dev/null +++ b/upload.py @@ -0,0 +1,19 @@ +import datetime + +import cv2 +import boto3 + +BUCKET = "" +FORMAT = "jpg" + +cam = cv2.VideoCapture(0) +s3 = boto3.client('s3') + +ret, frame = cam.read() +ret, img = cv2.imencode(FORMAT, frame) + +response = s3.put_object( + Body=img.tobytes(), + Bucket=BUCKET, + Key=datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d-%H-%M-%S") + "." + FORMAT +) \ No newline at end of file