Source code for poulet_py.hardware.triggers.gpio
try:
from time import sleep, time
from typing import Literal
from gpiozero import Button
from pydantic import Field, PrivateAttr
from poulet_py import LOGGER, BaseTrigger
except ImportError as e:
msg = """
Missing 'gpio' module. Install options:
- Dedicated: pip install poulet_py[gpio]
- Sub-Module: pip install poulet_py[triggers]
- Module: pip install poulet_py[hardware]
- Full: pip install poulet_py[all]
"""
raise ImportError(msg) from e
[docs]
class GPIOTrigger(BaseTrigger):
"""GPIO-based trigger using gpiozero."""
pin: int = Field(..., description="GPIO pin number")
pull_up: bool = Field(False, description="Use pull-up resistor")
edge: Literal["rising", "falling", "both"] = Field("rising", description="Edge to detect")
_triggered: bool = PrivateAttr(False)
_device: Button | None = PrivateAttr(None)
[docs]
def __init__(self, **data):
super().__init__(**data)
self._setup()
def _setup(self) -> None:
"""Setup GPIO device."""
try:
self._device = Button(self.pin, pull_up=self.pull_up)
self._device.when_activated = self._on_rising
self._device.when_deactivated = self._on_falling
except Exception as e:
msg = f"Failed to initialize GPIO pin {self.pin}: {e}"
raise RuntimeError(msg) from e
def _on_rising(self):
if self.edge in ("rising", "both"):
self._triggered = True
def _on_falling(self):
if self.edge in ("falling", "both"):
self._triggered = True
[docs]
def wait(self) -> bool:
"""Wait for GPIO event."""
try:
start = time()
self._triggered = False
while not self._triggered:
if self.timeout and time() - start > self.timeout:
return False
sleep(0.001)
return True
except Exception as e:
LOGGER.error(f"Error waiting for GPIO event: {e}")
return False
[docs]
def cleanup(self) -> None:
"""Cleanup GPIO resources."""
if self._device is not None:
self._device.close()
self._device = None