186 lines
4.9 KiB
Python
186 lines
4.9 KiB
Python
import sys
|
|
import json
|
|
import uuid
|
|
import asyncio
|
|
import datetime
|
|
import traceback
|
|
|
|
import cv2
|
|
import requests
|
|
|
|
from awscrt import mqtt
|
|
from awsiot import mqtt_connection_builder
|
|
|
|
# When truthy, the RPi.GPIO import/setup below and the GPIO write in
# led_control() are skipped, so the script can run without GPIO hardware.
DEBUG = 0

# MQTT client id; a new random suffix is generated on every start.
UUID = f"cloudprog-{uuid.uuid4()}"

# AWS IoT endpoint and the certificate/key file paths handed to
# mqtt_connection_builder.mtls_from_path() in main(). Left empty here.
ENDPOINT = ""
CA = ""
CERT = ""
PRIVKEY = ""

# Topic whose messages drive the LED (see parse_led_payload()).
LED_TOPIC = ""

# Topics for the presigned-URL upload flow:
# https://aws.amazon.com/tw/blogs/iot/securely-ingesting-large-sized-payloads-from-iot-devices-to-the-aws-cloud/
S3_UPLOAD_TOPIC = ""
S3_URL_TOPIC = ""

# Image format name; used as the filename extension for upload requests.
FORMAT = "jpg"

# Name of the most recently requested upload; set by main().
filename = None

# Capture device 0, shared by take_picture().
cam = cv2.VideoCapture(0)

if not DEBUG:
    import RPi.GPIO as GPIO

    # Pin number in BOARD (physical header) numbering, per setmode() below.
    LED_PIN = 32

    GPIO.setmode(GPIO.BOARD)
    GPIO.setup(LED_PIN, GPIO.OUT)
|
|
|
|
|
|
# Callback when connection is accidentally lost.
def on_connection_interrupted(connection, error, **kwargs):
    """Print a notice that the MQTT connection was interrupted.

    Args:
        connection: The affected connection (unused).
        error: The error reported with the interruption; echoed in the notice.
        **kwargs: Additional callback arguments, ignored.
    """
    notice = "Connection interrupted. error: {}".format(error)
    print(notice)
|
|
|
|
|
|
# Callback when an interrupted connection is re-established.
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    """Print the resume status and resubscribe when the session was lost.

    Args:
        connection: The connection that was re-established.
        return_code: Connect return code reported for the reconnect.
        session_present: Whether the previous session persisted.
        **kwargs: Additional callback arguments, ignored.
    """
    print("Connection resumed. return_code: {} session_present: {}".format(return_code, session_present))

    # Nothing to restore unless the reconnect was accepted AND the session
    # (and with it the subscriptions) did not persist.
    if return_code != mqtt.ConnectReturnCode.ACCEPTED or session_present:
        return

    print("Session did not persist. Resubscribing to existing topics...")
    pending, _packet_id = connection.resubscribe_existing_topics()

    # Cannot synchronously wait for resubscribe result because we're on the
    # connection's event-loop thread, evaluate result with a callback instead.
    pending.add_done_callback(on_resubscribe_complete)
|
|
|
|
|
|
def on_resubscribe_complete(resubscribe_future):
    """Print the resubscribe outcome and name every rejected topic.

    Args:
        resubscribe_future: Completed future whose result is a mapping with a
            ``'topics'`` entry of ``(topic, qos)`` pairs; a ``qos`` of ``None``
            is reported as a rejection.
    """
    outcome = resubscribe_future.result()
    print("Resubscribe results: {}".format(outcome))

    rejected = (name for name, granted in outcome['topics'] if granted is None)
    for name in rejected:
        print("Server rejected resubscribe to topic: {}".format(name))
|
|
|
|
|
|
# Callback when the subscribed topic receives a message
def on_message_received(topic, payload, dup, qos, retain, **kwargs):
    """Route an incoming MQTT message to its parser by topic suffix.

    Topics ending in ``"led"`` go to ``parse_led_payload``; topics ending in
    ``"s3/url"`` go to ``parse_s3_url_payload``. Any other topic is ignored.

    Args:
        topic: Topic the message arrived on.
        payload: Raw message payload, passed through to the parser.
        dup, qos, retain, **kwargs: Delivery metadata, unused.
    """
    if topic.endswith("led"):
        parse_led_payload(payload)
        return

    if topic.endswith("s3/url"):
        parse_s3_url_payload(payload)
|
|
|
|
|
|
def parse_led_payload(payload):
    """Switch the LED according to the ``"faces"`` entry of a JSON payload.

    Args:
        payload: Encoded JSON object (bytes). The LED is turned on when its
            ``"faces"`` value is truthy, and off when it is falsy or absent.
    """
    message = json.loads(payload.decode())
    faces_present = bool(message.get("faces"))
    led_control(faces_present)
|
|
|
|
|
|
def led_control(light):
    """Print the requested LED state and, unless ``DEBUG``, drive the pin.

    Args:
        light: ``True`` to switch the LED on, ``False`` to switch it off.
            It is also used as an index into the two status labels.
    """
    labels = ("Light off", "Light on")
    print(labels[light])

    if DEBUG:
        # No GPIO available in debug mode; the printed status is all we do.
        return

    level = GPIO.HIGH if light else GPIO.LOW
    GPIO.output(LED_PIN, level)
|
|
|
|
|
|
def parse_s3_url_payload(payload):
    """Take a picture and upload it to the URL carried in a JSON payload.

    Args:
        payload: Encoded JSON object (bytes) with a ``"url"`` entry.

    Raises:
        KeyError: If the decoded object has no ``"url"`` entry.
    """
    message = json.loads(payload.decode())
    # Read the URL before touching the camera so a malformed message fails
    # without capturing a frame.
    upload_url = message["url"]
    image_bytes = take_picture()
    put_presigned_s3_url(upload_url, image_bytes)
|
|
|
|
|
|
def take_picture():
    """Capture one frame from the shared camera and return it encoded.

    The module-level ``cam`` is (re)opened on device 0 if needed, a single
    frame is read and encoded in the format named by ``FORMAT``, and the
    camera is released again whether or not the capture succeeded.

    Returns:
        bytes: The encoded image.

    Raises:
        RuntimeError: If the camera cannot be opened, no frame can be read,
            or the frame cannot be encoded.
    """
    # VideoCapture.open() requires the device index; it is only needed when a
    # previous call has already released the capture.
    if not cam.isOpened() and not cam.open(0):
        raise RuntimeError("Could not open camera device 0")

    try:
        ok, frame = cam.read()
        if not ok:
            raise RuntimeError("Could not read a frame from the camera")

        # imencode() picks the encoder from a file extension, which must
        # include the leading dot (".jpg", not "jpg").
        ok, img = cv2.imencode("." + FORMAT, frame)
        if not ok:
            raise RuntimeError("Could not encode the frame as {}".format(FORMAT))

        return img.tobytes()
    finally:
        cam.release()
|
|
|
|
|
|
def put_presigned_s3_url(url, img_bytes, timeout=30):
    """Upload image bytes with an HTTP PUT and print the status code.

    Args:
        url: Destination URL for the PUT request.
        img_bytes: Request body to upload.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the caller indefinitely.

    Returns:
        The ``requests`` response object.

    Raises:
        requests.exceptions.RequestException: If the request fails or times out.
    """
    res = requests.put(url, data=img_bytes, timeout=timeout)
    print(f"Put to S3. status code: {res.status_code}")
    return res
|
|
|
|
|
|
async def main():
    """Connect to AWS IoT over MQTT, subscribe, and run the command prompt.

    Commands read from standard input:
        p: publish an upload request carrying a timestamped filename to
           ``S3_UPLOAD_TOPIC``.
        q: disconnect, release GPIO (unless ``DEBUG``) and stop the event
           loop so the program can exit.

    Every MQTT operation is waited on synchronously with ``Future.result()``
    and ``input()`` blocks too, so this coroutine never yields to the loop.
    """
    global filename

    # create MQTT connection
    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint=ENDPOINT,
        cert_filepath=CERT,
        pri_key_filepath=PRIVKEY,
        ca_filepath=CA,
        on_connection_interrupted=on_connection_interrupted,
        on_connection_resumed=on_connection_resumed,
        client_id=UUID,
    )
    mqtt_connection.connect().result()
    print(f"{UUID} is connected!")

    # subscribe to MQTT topics
    for topic in (LED_TOPIC, S3_URL_TOPIC):
        _subscribe(mqtt_connection, topic)

    while True:
        command = input("> ")

        if command == "p":
            filename = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S") + "." + FORMAT
            # NOTE(review): presumably a cloud-side handler answers on
            # S3_URL_TOPIC with the upload URL - confirm against the backend.
            publish_future, _packet_id = mqtt_connection.publish(
                topic=S3_UPLOAD_TOPIC,
                payload=json.dumps({"filename": filename}),
                qos=mqtt.QoS.AT_LEAST_ONCE,
            )
            print(publish_future.result())
            print(f"Upload request {filename} sent!")
        elif command == "q":
            break

    # Leaving the prompt must also end the program: the loop is driven by
    # run_forever(), so it has to be stopped explicitly or "q" would hang.
    mqtt_connection.disconnect().result()
    if not DEBUG:
        GPIO.cleanup()
    asyncio.get_event_loop().stop()


def _subscribe(mqtt_connection, topic):
    """Subscribe to ``topic`` with ``on_message_received`` and wait for the ack."""
    subscribe_future, _packet_id = mqtt_connection.subscribe(
        topic=topic,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_message_received,
    )
    print(subscribe_future.result())
    print(f"Subscribed to {topic}")
|
|
|
|
|
|
def handler(loop, context):
    """Event-loop exception handler: report the failure, clean up, stop.

    Args:
        loop: The event loop that reported the problem; it is stopped here.
        context: Error context dict supplied by asyncio. Its ``'exception'``
            entry is optional, so the ``'message'`` entry is printed instead
            when no exception object is attached.
    """
    e = context.get('exception')
    if e is not None:
        print(*traceback.format_exception(None, e, e.__traceback__), file=sys.stderr, flush=True)
    else:
        print(context.get('message', 'Unhandled error in event loop'), file=sys.stderr, flush=True)

    if not DEBUG:
        GPIO.cleanup()

    loop.stop()
|
|
|
|
# run
# Create the loop explicitly: calling asyncio.get_event_loop() with no current
# loop is deprecated and raises on the newest Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.set_exception_handler(handler)

# NOTE(review): the task is intentionally not stored. handler() appears to be
# reached for an exception escaping main() only once the finished task is
# discarded with its exception unretrieved - confirm before keeping a reference.
loop.create_task(main())

try:
    loop.run_forever()
finally:
    # Release the loop's resources once it has been stopped.
    loop.close()
|