bale real-time sorting conveyor ito using deep learning at raspberry pi 5. Nag-train ako ng deep learning model sa roboflow.com then ginawan ko ng workflow.
Process:
1st IR sensor will detect the object to trigger the camera module v3 to capture
2nd IR sensor will detect the object to trigger the servo motor to divert the object if the decision is "REJECT". kung ang decision ng model is "ACCEPTED", hindi gagalaw yung servo motor.
Pa-check naman po kung tama yung code
Process:
1st IR sensor will detect the object to trigger the camera module v3 to capture
2nd IR sensor will detect the object to trigger the servo motor to divert the object if the decision is "REJECT". kung ang decision ng model is "ACCEPTED", hindi gagalaw yung servo motor.
Pa-check naman po kung tama yung code
Python:
[B]import os
import time
from collections import deque
import cv2
from gpiozero import DigitalInputDevice, Servo
from inference import InferencePipeline # SDK: video + workflows
# =========================
# CONFIGURATION
# =========================
# Roboflow API key (or set ROBOFLOW_API_KEY in env)
API_KEY = os.environ.get("ROBOFLOW_API_KEY", "api key")
# From the "Deploy Workflow" panel in Roboflow
WORKSPACE_NAME = "okra-conveyorv2" # e.g. "okra-conveyorv2"
WORKFLOW_ID = "custom-workflow-2" # e.g. "okra-sorting-workflow"
# Name of the key in Workflow outputs that contains the decision string
# Ensure your Workflow has an output (e.g. E×ρréššion / JSON Parser) named "decision"
WORKFLOW_DECISION_KEY = "decision" # expected: "ACCEPT" / "REJECT"
# GPIO PINS
PIN_IR_CAMERA = 17 # Sensor 1: detect okra entering
PIN_IR_SERVO = 27 # Sensor 2: at sorter
PIN_SERVO = 12 # Servo pin
# SERVO SETTINGS
SERVO_OPEN = -0.5 # "default" chute position
SERVO_KICK = 0.5 # "reject" position
KICK_DURATION = 0.3 # seconds to hold reject position
# Sleep debouncing for IR sensors (seconds)
SENSOR_DEBOUNCE = 0.4
# =========================
# HARDWARE INIT
# =========================
print("[INIT] Initializing hardware...")
servo = Servo(PIN_SERVO)
ir_camera = DigitalInputDevice(PIN_IR_CAMERA)
ir_sorter = DigitalInputDevice(PIN_IR_SERVO)
servo.value = SERVO_OPEN
# Queue to keep decisions between sensor 1 and sensor 2
decision_queue = deque()
print("[READY] Hardware initialized. Starting Workflow pipeline...")
# =========================
# WORKFLOW CALLBACK
# =========================
def my_sink(result, video_frame):
"""
Called by InferencePipeline on every frame.
- `result`: dict of Workflow outputs for this frame.
- `video_frame.image`: numpy array (BGR) for display / debugging.
"""
global decision_queue
frame = video_frame.image
# --- OPTIONAL: Display the live annotated / raw frame ---
# If your Workflow has a visualization block (e.g. "label_visualization"),
# you can show that instead:
#
# if "label_visualization" in result:
# frame_to_show = result["label_visualization"].numpy_image
# else:
# frame_to_show = frame
#
frame_to_show = frame
cv2.imshow("Okra Sorter (Workflow Stream)", frame_to_show)
cv2.waitKey(1)
# --- SENSOR 1: TRIGGER INFERENCE DECISION ---
# First IR sensor: object enters camera zone
if ir_camera.is_active:
print("[SENSOR 1] Okra detected. Using this frame's Workflow decision...")
# Extract decision for THIS frame from Workflow output.
# NOTE: You *must* ensure your Workflow exposes a top-level key
# named WORKFLOW_DECISION_KEY that contains "ACCEPT"/"REJECT".
decision = "REJECT" # default fallback
if WORKFLOW_DECISION_KEY in result:
decision = str(result[WORKFLOW_DECISION_KEY])
else:
# If this prints all the time, inspect `result` and adjust parsing.
print(f"[WARN] '{WORKFLOW_DECISION_KEY}' not found in result. Raw result:")
print(result)
print(f"[SENSOR 1] AI decision: {decision}")
decision_queue.append(decision)
# Simple debounce so we don't enqueue many decisions for one object
time.sleep(SENSOR_DEBOUNCE)
# --- SENSOR 2: TRIGGER SERVO BASED ON QUEUED DECISION ---
# Second IR sensor: object has reached the sorting point
if ir_sorter.is_active:
if decision_queue:
action = decision_queue.popleft()
print(f"[SENSOR 2] Sorting action: {action}")
# Move servo ONLY if decision == "REJECT"
if action.upper() == "REJECT":
print("[ACTUATOR] REJECT -> Kicking object off conveyor")
servo.value = SERVO_KICK
time.sleep(KICK_DURATION)
servo.value = SERVO_OPEN
else:
print("[ACTUATOR] ACCEPT -> Letting object pass")
else:
print("[SENSOR 2] No decision in queue (ghost object?)")
# Debounce for sensor 2
time.sleep(SENSOR_DEBOUNCE)
# =========================
# START PIPELINE
# =========================
pipeline = InferencePipeline.init_with_workflow(
api_key=API_KEY,
workspace_name=WORKSPACE_NAME,
workflow_id=WORKFLOW_ID,
video_reference=0, # 0 = default camera / /dev/video0
max_fps=30,
on_prediction=my_sink # tie Workflow directly to video stream
)
# This matches the pattern in the Workflows docs for processing video streams.
# [[Deploy a workflow](https://docs.roboflow.com/workflows/deploy-a-workflow#process-video-stream-rtsp-webcam)]
try:
pipeline.start() # starts a background thread that calls my_sink per frame
pipeline.join() # wait until pipeline stops (Ctrl+C to interrupt)
except KeyboardInterrupt:
print("\n[INFO] KeyboardInterrupt received. Exiting...")
finally:
print("[CLEANUP] Cleaning up GPIO and OpenCV windows...")
servo.close()
cv2.destroyAllWindows()[/B]