import cv2 import numpy as np import requests import urllib.request import urllib.parse import serial import os from inference import predict, labels from extract import garbage_extract_no_preprocess CAM_IP = "192.168.154.28" API_BASE = "http://localhost:5000/api" PHOTO_PATH = "/tmp/photo.jpg" label_name = ["紙類", "塑膠類", "鋁罐"] def findCOM(): for i in range(0, 10): if os.path.exists(f'/dev/ttyACM{i}'): return f'/dev/ttyACM{i}' def API(endpoint): try: requests.get(API_BASE + endpoint) except requests.exceptions.RequestException as e: print(f"[!] Request failed: {e}") def loop(): ser = serial.Serial(findCOM(), 9600) try: while True: print("[+] Waiting for arduino...") response = ser.read(1) print(response) if (response == b's'): print("[+] Object placed!") API('/putdown') print("[+] Waiting for weight...") weight = float(ser.readline().decode()) print(f"[+] Got weight: {weight}!") print("[+] Taking photo...") takePhoto() API('/pic') print("[+] Took photo!") print("[+] Predicting...") label_idx = getPrediction(weight) print(f"[+] Got prediction: {label_name[label_idx]}!") print("[+] Sending prediction...") ser.write(bytes([label_idx])) API(f'/result?type={urllib.parse.quote_plus(label_name[label_idx])}') print("[+] Sent prediction!") print("[+] Waiting for finish...") while ser.read(1) != b'd': pass print("[+] Finished!") API('/ready') except: ser.close() def takePhoto(): try: URL = f"http://{CAM_IP}:8080/photo.jpg" imgResp = requests.get(URL).content imgNp = np.array(bytearray(imgResp), dtype=np.uint8) img = cv2.imdecode(imgNp, -1) img = cv2.rotate(img, cv2.ROTATE_90_CLOCKWISE) img = garbage_extract_no_preprocess(img) cv2.imwrite(PHOTO_PATH, img) except Exception as e: print(f"[!] Photo / Preprocess error: {e}") def getPrediction(weight): try: img = cv2.imread(PHOTO_PATH) label = predict(img, weight) except Exception as e: print(f"[!] Predict error: {e}") return 3 if label in ['paper_cup', 'paper_box', 'paper_milkbox']: return 0 elif label in ['plastic']: return 1 elif label in ['can']: return 2 def test(): import time print("[+] Waiting for arduino...") time.sleep(1) print("[+] Object placed!") API('/putdown') print("[+] Waiting for weight...") time.sleep(1.5) weight = 30.0 print(f"[+] Got weight: {weight}!") print("[+] Taking photo...") takePhoto() API(f'/pic') print("[+] Took photo!") print("[+] Predicting...") label_idx = getPrediction(weight) print(f"[+] Got prediction: {label_name[label_idx]}!") print("[+] Sending prediction...") API(f'/result?type={urllib.parse.quote_plus(label_name[label_idx])}') print("[+] Sent prediction!") print("[+] Waiting for finish...") time.sleep(5) print("[+] Finished!") API('/ready') time.sleep(5) while True: # test() loop()