From fca50be595af3788e8d5214b06340e1baa4e10e8 Mon Sep 17 00:00:00 2001 From: rpi Date: Thu, 4 May 2023 08:00:38 +0100 Subject: [PATCH] feat: Button control and RGB LED --- led_control.py | 62 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 43 insertions(+), 19 deletions(-) diff --git a/led_control.py b/led_control.py index 486078a..c2f1698 100644 --- a/led_control.py +++ b/led_control.py @@ -11,7 +11,7 @@ import requests from awscrt import mqtt from awsiot import mqtt_connection_builder -DEBUG = 1 +DEBUG = 0 UUID = f"cloudprog-{uuid.uuid4()}" @@ -35,10 +35,20 @@ cam = cv2.VideoCapture() if not DEBUG: import RPi.GPIO as GPIO - LED_PIN = 32 + STATUS_LED_PIN = 35 + DETECT_LED_PIN = 37 + ABSENT_LED_PIN = 36 + BTN_PIN = 38 GPIO.setmode(GPIO.BOARD) - GPIO.setup(LED_PIN, GPIO.OUT) + GPIO.setup(STATUS_LED_PIN, GPIO.OUT) + GPIO.setup(DETECT_LED_PIN, GPIO.OUT) + GPIO.setup(ABSENT_LED_PIN, GPIO.OUT) + GPIO.setup(BTN_PIN, GPIO.IN) + # reset + GPIO.output(STATUS_LED_PIN, GPIO.LOW) + GPIO.output(DETECT_LED_PIN, GPIO.LOW) + GPIO.output(ABSENT_LED_PIN, GPIO.LOW) def prompt_logging(message, prompt=PROMPT): @@ -86,16 +96,17 @@ def parse_led_payload(payload): data = json.loads(payload.decode()) prompt_logging(f"detected faces: {data.get('faces')}") if data.get("faces"): - led_control(True) + led_callback(True) else: - led_control(False) + led_callback(False) -def led_control(light): +def led_callback(light): prompt_logging(["Light off", "Light on"][light]) if not DEBUG: - GPIO.output(LED_PIN, GPIO.HIGH if light else GPIO.LOW) + GPIO.output(STATUS_LED_PIN, GPIO.LOW) + GPIO.output(DETECT_LED_PIN if light else ABSENT_LED_PIN, GPIO.HIGH) def parse_s3_url_payload(payload): @@ -119,8 +130,26 @@ def post_presigned_s3_url(data, img_bytes): return res +def picture_request(_channel): + if not DEBUG: + # reset + GPIO.output(STATUS_LED_PIN, GPIO.HIGH) + GPIO.output(DETECT_LED_PIN, GPIO.LOW) + GPIO.output(ABSENT_LED_PIN, GPIO.LOW) + + filename = datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d-%H-%M-%S") + FORMAT + publish_future, packet_id = mqtt_connection.publish( + topic=S3_UPLOAD_TOPIC, + payload=json.dumps({ "filename": filename }), + qos=mqtt.QoS.AT_MOST_ONCE + ) + publish_result = publish_future.result() + # print(publish_result) + print(f"Upload request {filename} sent!") + + async def main(loop): - global filename + global filename, mqtt_connection # create MQTT connection mqtt_connection = mqtt_connection_builder.mtls_from_path( endpoint=ENDPOINT, @@ -158,19 +187,14 @@ async def main(loop): # print(subscribe_result) print(f"Subscribed to {S3_URL_TOPIC}") + if not DEBUG: + GPIO.add_event_detect(BTN_PIN, GPIO.RISING, callback=picture_request, bouncetime=300) + while True: command = input(PROMPT) - + if command == "p": - filename = datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d-%H-%M-%S") + FORMAT - publish_future, packet_id = mqtt_connection.publish( - topic=S3_UPLOAD_TOPIC, - payload=json.dumps({ "filename": filename }), - qos=mqtt.QoS.AT_MOST_ONCE - ) - publish_result = publish_future.result() - # print(publish_result) - print(f"Upload request {filename} sent!") + picture_request(BTN_PIN) elif command == "q": break else: @@ -202,4 +226,4 @@ except KeyboardInterrupt: pass finally: if not DEBUG: - GPIO.cleanup() \ No newline at end of file + GPIO.cleanup()