feat: Button control and RGB LED

This commit is contained in:
rpi 2023-05-04 08:00:38 +01:00
parent f3beb33466
commit fca50be595

View File

@ -11,7 +11,7 @@ import requests
from awscrt import mqtt
from awsiot import mqtt_connection_builder
DEBUG = 1
DEBUG = 0
UUID = f"cloudprog-{uuid.uuid4()}"
@ -35,10 +35,20 @@ cam = cv2.VideoCapture()
if not DEBUG:
import RPi.GPIO as GPIO
LED_PIN = 32
STATUS_LED_PIN = 35
DETECT_LED_PIN = 37
ABSENT_LED_PIN = 36
BTN_PIN = 38
GPIO.setmode(GPIO.BOARD)
GPIO.setup(LED_PIN, GPIO.OUT)
GPIO.setup(STATUS_LED_PIN, GPIO.OUT)
GPIO.setup(DETECT_LED_PIN, GPIO.OUT)
GPIO.setup(ABSENT_LED_PIN, GPIO.OUT)
GPIO.setup(BTN_PIN, GPIO.IN)
# reset
GPIO.output(STATUS_LED_PIN, GPIO.LOW)
GPIO.output(DETECT_LED_PIN, GPIO.LOW)
GPIO.output(ABSENT_LED_PIN, GPIO.LOW)
def prompt_logging(message, prompt=PROMPT):
@ -86,16 +96,17 @@ def parse_led_payload(payload):
data = json.loads(payload.decode())
prompt_logging(f"detected faces: {data.get('faces')}")
if data.get("faces"):
led_control(True)
led_callback(True)
else:
led_control(False)
led_callback(False)
def led_control(light):
def led_callback(light):
prompt_logging(["Light off", "Light on"][light])
if not DEBUG:
GPIO.output(LED_PIN, GPIO.HIGH if light else GPIO.LOW)
GPIO.output(STATUS_LED_PIN, GPIO.LOW)
GPIO.output(DETECT_LED_PIN if light else ABSENT_LED_PIN, GPIO.HIGH)
def parse_s3_url_payload(payload):
@ -119,8 +130,26 @@ def post_presigned_s3_url(data, img_bytes):
return res
def picture_request(_channel):
if not DEBUG:
# reset
GPIO.output(STATUS_LED_PIN, GPIO.HIGH)
GPIO.output(DETECT_LED_PIN, GPIO.LOW)
GPIO.output(ABSENT_LED_PIN, GPIO.LOW)
filename = datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d-%H-%M-%S") + FORMAT
publish_future, packet_id = mqtt_connection.publish(
topic=S3_UPLOAD_TOPIC,
payload=json.dumps({ "filename": filename }),
qos=mqtt.QoS.AT_MOST_ONCE
)
publish_result = publish_future.result()
# print(publish_result)
print(f"Upload request {filename} sent!")
async def main(loop):
global filename
global filename, mqtt_connection
# create MQTT connection
mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint=ENDPOINT,
@ -158,19 +187,14 @@ async def main(loop):
# print(subscribe_result)
print(f"Subscribed to {S3_URL_TOPIC}")
if not DEBUG:
GPIO.add_event_detect(BTN_PIN, GPIO.RISING, callback=picture_request, bouncetime=300)
while True:
command = input(PROMPT)
if command == "p":
filename = datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d-%H-%M-%S") + FORMAT
publish_future, packet_id = mqtt_connection.publish(
topic=S3_UPLOAD_TOPIC,
payload=json.dumps({ "filename": filename }),
qos=mqtt.QoS.AT_MOST_ONCE
)
publish_result = publish_future.result()
# print(publish_result)
print(f"Upload request {filename} sent!")
picture_request(BTN_PIN)
elif command == "q":
break
else: