import json import uuid import asyncio import datetime import cv2 import requests from awscrt import mqtt from awsiot import mqtt_connection_builder DEBUG = 0 UUID = f"cloudprog-{uuid.uuid4()}" ENDPOINT = "" CA = "" CERT = "" PRIVKEY = "" LED_TOPIC = "" # https://aws.amazon.com/tw/blogs/iot/securely-ingesting-large-sized-payloads-from-iot-devices-to-the-aws-cloud/ S3_UPLOAD_TOPIC = "" S3_URL_TOPIC = "" FORMAT = "jpg" filename = None cam = cv2.VideoCapture(0) if not DEBUG: import RPi.GPIO as GPIO LED_PIN = 32 GPIO.setmode(GPIO.BOARD) GPIO.setup(LED_PIN, GPIO.OUT) # Callback when connection is accidentally lost. def on_connection_interrupted(connection, error, **kwargs): print("Connection interrupted. error: {}".format(error)) # Callback when an interrupted connection is re-established. def on_connection_resumed(connection, return_code, session_present, **kwargs): print("Connection resumed. return_code: {} session_present: {}".format(return_code, session_present)) if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present: print("Session did not persist. Resubscribing to existing topics...") resubscribe_future, _ = connection.resubscribe_existing_topics() # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread, # evaluate result with a callback instead. resubscribe_future.add_done_callback(on_resubscribe_complete) def on_resubscribe_complete(resubscribe_future): resubscribe_results = resubscribe_future.result() print("Resubscribe results: {}".format(resubscribe_results)) for topic, qos in resubscribe_results['topics']: if qos is None: print("Server rejected resubscribe to topic: {}".format(topic)) # Callback when the subscribed topic receives a message def on_message_received(topic, payload, dup, qos, retain, **kwargs): # print("Received message from topic '{}': {}".format(topic, payload)) if topic.endswith("led"): parse_led_payload(payload) elif topic.endswith("s3/url"): parse_s3_url_payload(payload) def parse_led_payload(payload): data = json.loads(payload) if data.get("faces"): led_control(True) else: led_control(False) def led_control(light): print(["Light off", "Light on"][light]) if not DEBUG: GPIO.output(LED_PIN, GPIO.HIGH if light else GPIO.LOW) def parse_s3_url_payload(payload): data = json.dumps(payload) put_presigned_s3_url(data["url"], take_picture()) def take_picture(): cam.open() ret, frame = cam.read() ret, img = cv2.imencode(FORMAT, frame) cam.release() return img.tobytes() def put_presigned_s3_url(url, img_bytes): res = requests.put(url, data=img_bytes) print(f"Put to S3. status code: {res.status_code}") return res async def main(): global filename # create MQTT connection mqtt_connection = mqtt_connection_builder.mtls_from_path( endpoint=ENDPOINT, cert_filepath=CERT, pri_key_filepath=PRIVKEY, ca_filepath=CA, on_connection_interrupted=on_connection_interrupted, on_connection_resumed=on_connection_resumed, client_id=UUID ) connect_future = mqtt_connection.connect() connect_result = connect_future.result() print(f"{UUID} is connected!") # subscribe to MQTT topic subscribe_future, packet_id = mqtt_connection.subscribe( topic=LED_TOPIC, qos=mqtt.QoS.AT_LEAST_ONCE, callback=on_message_received ) subscribe_result = subscribe_future.result() print(f"Subscribed to {LED_TOPIC}") subscribe_future, packet_id = mqtt_connection.subscribe( topic=S3_URL_TOPIC, qos=mqtt.QoS.AT_LEAST_ONCE, callback=on_message_received ) subscribe_result = subscribe_future.result() print(f"Subscribed to {LED_TOPIC}") while True: command = input("> ") if command == "p": filename = datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d-%H-%M-%S") + "." + FORMAT publish_future, packet_id = mqtt_connection.publish( topic=S3_UPLOAD_TOPIC, qos=1, payload=json.dumps({ "filename": filename }) ) publish_result = publish_future.result() print(f"Upload request {filename} sent!") # run loop = asyncio.get_event_loop() loop.set_exception_handler(lambda loop, context: [print(context['message']), GPIO.cleanup()]) loop.create_task(main()) loop.run_forever()