2023-01-10 01:52:32 +08:00

136 lines
3.4 KiB
Python

import cv2
import numpy as np
import requests
import urllib.request
import urllib.parse
import serial
import os
from inference import predict, labels
from extract import garbage_extract_no_preprocess
CAM_IP = "192.168.154.28"
API_BASE = "http://localhost:5000/api"
PHOTO_PATH = "/tmp/photo.jpg"
label_name = ["紙類", "塑膠類", "鋁罐"]
def findCOM():
for i in range(0, 10):
if os.path.exists(f'/dev/ttyACM{i}'):
return f'/dev/ttyACM{i}'
def API(endpoint):
try:
requests.get(API_BASE + endpoint)
except requests.exceptions.RequestException as e:
print(f"[!] Request failed: {e}")
def loop():
ser = serial.Serial(findCOM(), 9600)
try:
while True:
print("[+] Waiting for arduino...")
response = ser.read(1)
print(response)
if (response == b's'):
print("[+] Object placed!")
API('/putdown')
print("[+] Waiting for weight...")
weight = float(ser.readline().decode())
print(f"[+] Got weight: {weight}!")
print("[+] Taking photo...")
takePhoto()
API('/pic')
print("[+] Took photo!")
print("[+] Predicting...")
label_idx = getPrediction(weight)
print(f"[+] Got prediction: {label_name[label_idx]}!")
print("[+] Sending prediction...")
ser.write(bytes([label_idx]))
API(f'/result?{urllib.parse.quote_plus(label_name[label_idx])}')
print("[+] Sent prediction!")
print("[+] Waiting for finish...")
while ser.read(1) != b'd':
pass
print("[+] Finished!")
API('/ready')
except:
ser.close()
def takePhoto():
try:
URL = f"http://{CAM_IP}:8080/photo.jpg"
imgResp = requests.get(URL).content
imgNp = np.array(bytearray(imgResp), dtype=np.uint8)
img = cv2.imdecode(imgNp, -1)
img = cv2.rotate(img, cv2.ROTATE_90_CLOCKWISE)
img = garbage_extract_no_preprocess(img)
cv2.imwrite(PHOTO_PATH, img)
except Exception as e:
print(f"[!] Photo / Preprocess error: {e}")
def getPrediction(weight):
try:
img = cv2.imread(PHOTO_PATH)
label = predict(img, weight)
except Exception as e:
print(f"[!] Predict error: {e}")
return 3
if label in ['paper_cup', 'paper_box', 'paper_milkbox']:
return 0
elif label in ['plastic']:
return 1
elif label in ['can']:
return 2
def test():
import time
print("[+] Waiting for arduino...")
time.sleep(1)
print("[+] Object placed!")
API('/putdown')
print("[+] Waiting for weight...")
time.sleep(1.5)
weight = 30.0
print(f"[+] Got weight: {weight}!")
print("[+] Taking photo...")
takePhoto()
API(f'/pic')
print("[+] Took photo!")
print("[+] Predicting...")
label_idx = getPrediction(weight)
print(f"[+] Got prediction: {label_name[label_idx]}!")
print("[+] Sending prediction...")
API(f'/result?type={urllib.parse.quote_plus(label_name[label_idx])}')
print("[+] Sent prediction!")
print("[+] Waiting for finish...")
time.sleep(5)
print("[+] Finished!")
API('/ready')
time.sleep(5)
while True:
test()
# loop()