import sys import json import uuid import asyncio import datetime import traceback import cv2 import requests from awscrt import mqtt from awsiot import mqtt_connection_builder DEBUG = 1 UUID = f"cloudprog-{uuid.uuid4()}" ENDPOINT = "a1io7cze9x1oli-ats.iot.us-east-1.amazonaws.com" CA = "rootCA.pem" CERT = "cert.crt" PRIVKEY = "privkey.pem" LED_TOPIC = "cloudprog/hw4/team02/led" # https://aws.amazon.com/tw/blogs/iot/securely-ingesting-large-sized-payloads-from-iot-devices-to-the-aws-cloud/ S3_UPLOAD_TOPIC = "cloudprog/hw4/team02/s3/upload" S3_URL_TOPIC = "cloudprog/hw4/team02/s3/url" FORMAT = ".jpg" filename = None cam = cv2.VideoCapture(0) if not DEBUG: import RPi.GPIO as GPIO LED_PIN = 32 GPIO.setmode(GPIO.BOARD) GPIO.setup(LED_PIN, GPIO.OUT) # Callback when connection is accidentally lost. def on_connection_interrupted(connection, error, **kwargs): print("Connection interrupted. error: {}".format(error)) # Callback when an interrupted connection is re-established. def on_connection_resumed(connection, return_code, session_present, **kwargs): print("Connection resumed. return_code: {} session_present: {}".format(return_code, session_present)) if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present: print("Session did not persist. Resubscribing to existing topics...") resubscribe_future, _ = connection.resubscribe_existing_topics() # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread, # evaluate result with a callback instead. resubscribe_future.add_done_callback(on_resubscribe_complete) def on_resubscribe_complete(resubscribe_future): resubscribe_results = resubscribe_future.result() print("Resubscribe results: {}".format(resubscribe_results)) for topic, qos in resubscribe_results['topics']: if qos is None: print("Server rejected resubscribe to topic: {}".format(topic)) # Callback when the subscribed topic receives a message def on_message_received(topic, payload, dup, qos, retain, **kwargs): # print("Received message from topic '{}': {}".format(topic, payload)) if topic.endswith("led"): parse_led_payload(payload) elif topic.endswith("s3/url"): parse_s3_url_payload(payload) def parse_led_payload(payload): data = json.loads(payload.decode()) if data.get("faces"): led_control(True) else: led_control(False) def led_control(light): print(["Light off", "Light on"][light]) if not DEBUG: GPIO.output(LED_PIN, GPIO.HIGH if light else GPIO.LOW) def parse_s3_url_payload(payload): data = json.loads(payload.decode()) post_presigned_s3_url(data, take_picture()) def take_picture(): #cam.open() ret, frame = cam.read() ret, img = cv2.imencode(FORMAT, frame) cam.release() return img.tobytes() def post_presigned_s3_url(data, img_bytes): fn = data['fields']['key'] res = requests.post(data['url'], data=data['fields'], files={ "file": (fn, img_bytes) }) print(f"Put to S3. status code: {res.status_code}") return res async def main(loop): global filename # create MQTT connection mqtt_connection = mqtt_connection_builder.mtls_from_path( endpoint=ENDPOINT, cert_filepath=CERT, pri_key_filepath=PRIVKEY, ca_filepath=CA, on_connection_interrupted=on_connection_interrupted, on_connection_resumed=on_connection_resumed, client_id=UUID ) connect_future = mqtt_connection.connect() connect_result = connect_future.result() print(f"{UUID} is connected!") # subscribe to MQTT topic subscribe_future, packet_id = mqtt_connection.subscribe( topic=LED_TOPIC, qos=mqtt.QoS.AT_MOST_ONCE, callback=on_message_received ) subscribe_result = subscribe_future.result() print(subscribe_result) print(f"Subscribed to {LED_TOPIC}") subscribe_future, packet_id = mqtt_connection.subscribe( topic=S3_URL_TOPIC, qos=mqtt.QoS.AT_MOST_ONCE, callback=on_message_received ) subscribe_result = subscribe_future.result() print(subscribe_result) print(f"Subscribed to {S3_URL_TOPIC}") while True: command = input("> ") if command == "p": filename = datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d-%H-%M-%S") + FORMAT publish_future, packet_id = mqtt_connection.publish( topic=S3_UPLOAD_TOPIC, payload=json.dumps({ "filename": filename }), qos=mqtt.QoS.AT_MOST_ONCE ) publish_result = publish_future.result() print(publish_result) print(f"Upload request {filename} sent!") elif command == "q": break loop.stop() def handler(loop, context): e = context['exception'] print(*traceback.format_exception(None, e, e.__traceback__), file=sys.stderr, flush=True) if not DEBUG: GPIO.cleanup() loop.stop() # run loop = asyncio.get_event_loop() loop.set_exception_handler(handler) loop.create_task(main(loop)) loop.run_forever()