import collections
import os
import select
class GPIOError(IOError):
    """Base class for GPIO errors.

    Derives from ``IOError`` so it can be constructed with an
    ``(errno, message)`` pair, as done when wrapping ``OSError`` failures.
    """
    pass
class EdgeEvent(collections.namedtuple('EdgeEvent', ['edge', 'timestamp'])):
    """Named tuple ``(edge, timestamp)`` describing a single GPIO edge event."""

    def __new__(cls, edge, timestamp):
        """EdgeEvent containing the event edge and event time reported by Linux.

        Args:
            edge (str): event edge, either "rising" or "falling".
            timestamp (int): event time in nanoseconds.

        """
        return super(EdgeEvent, cls).__new__(cls, edge, timestamp)
class GPIO(object):
    """Common GPIO interface and factory for the concrete GPIO backends.

    Every method and property here raises ``NotImplementedError`` except for
    the context-manager helpers, ``__del__`` and ``poll_multiple()``; the
    concrete behavior is supplied by the backend classes bound to the
    module-level names ``CdevGPIO`` and ``SysfsGPIO``.
    """

    def __new__(cls, *args, **kwargs):
        """Delegate object creation to a backend chosen by argument count.

        More than two positional arguments selects ``CdevGPIO``; anything
        else selects ``SysfsGPIO``. Keyword arguments are forwarded but do
        not take part in the selection.
        """
        if len(args) > 2:
            return CdevGPIO.__new__(cls, *args, **kwargs)
        else:
            return SysfsGPIO.__new__(cls, *args, **kwargs)

    def __del__(self):
        # Release the GPIO when the object is garbage collected.
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, t, value, traceback):
        # Always close on leaving the ``with`` block; exceptions propagate.
        self.close()

    # Methods

    def read(self):
        """Read the state of the GPIO.

        Returns:
            bool: ``True`` for high state, ``False`` for low state.

        Raises:
            GPIOError: if an I/O or OS error occurs.

        """
        raise NotImplementedError()

    def write(self, value):
        """Set the state of the GPIO to `value`.

        Args:
            value (bool): ``True`` for high state, ``False`` for low state.

        Raises:
            GPIOError: if an I/O or OS error occurs.
            TypeError: if `value` type is not bool.

        """
        raise NotImplementedError()

    def poll(self, timeout=None):
        """Poll a GPIO for the edge event configured with the .edge property
        with an optional timeout.

        For character device GPIOs, the edge event should be consumed with
        `read_event()`. For sysfs GPIOs, the edge event should be consumed with
        `read()`.

        `timeout` can be a positive number for a timeout in seconds, zero for a
        non-blocking poll, or negative or None for a blocking poll. Default is
        a blocking poll.

        Args:
            timeout (int, float, None): timeout duration in seconds.

        Returns:
            bool: ``True`` if an edge event occurred, ``False`` on timeout.

        Raises:
            GPIOError: if an I/O or OS error occurs.
            TypeError: if `timeout` type is not None, int, or float.

        """
        raise NotImplementedError()

    def read_event(self):
        """Read the edge event that occurred with the GPIO.

        This method is intended for use with character device GPIOs and is
        unsupported by sysfs GPIOs.

        Returns:
            EdgeEvent: a namedtuple containing the string edge event that
            occurred (either ``"rising"`` or ``"falling"``), and the event time
            reported by Linux in nanoseconds.

        Raises:
            GPIOError: if an I/O or OS error occurs.
            NotImplementedError: if called on a sysfs GPIO.

        """
        raise NotImplementedError()

    @staticmethod
    def poll_multiple(gpios, timeout=None):
        """Poll multiple GPIOs for the edge event configured with the .edge
        property with an optional timeout.

        For character device GPIOs, the edge event should be consumed with
        `read_event()`. For sysfs GPIOs, the edge event should be consumed with
        `read()`.

        `timeout` can be a positive number for a timeout in seconds, zero for a
        non-blocking poll, or negative or None for a blocking poll. Default is
        a blocking poll.

        Args:
            gpios (list): list of GPIO objects to poll.
            timeout (int, float, None): timeout duration in seconds.

        Returns:
            list: list of GPIO objects for which an edge event occurred.

        Raises:
            GPIOError: if an I/O or OS error occurs.
            TypeError: if `timeout` type is not None, int, or float.

        """
        if not isinstance(timeout, (int, float, type(None))):
            raise TypeError("Invalid timeout type, should be integer, float, or None.")

        # Setup poll
        p = select.poll()

        # Register GPIO file descriptors and build map of fd to object
        fd_gpio_map = {}
        for gpio in gpios:
            # Read the fd property once per GPIO
            fd = gpio.fd
            if isinstance(gpio, SysfsGPIO):
                p.register(fd, select.POLLPRI | select.POLLERR)
            else:
                p.register(fd, select.POLLIN | select.POLLRDNORM)

            fd_gpio_map[fd] = gpio

        # Scale timeout to milliseconds. Zero, negative and None are passed
        # through unchanged: select.poll treats zero as non-blocking and
        # negative/None as a blocking poll.
        if isinstance(timeout, (int, float)) and timeout > 0:
            timeout *= 1000

        # Poll
        events = p.poll(timeout)

        # Gather GPIOs that had edge events occur
        results = []
        for (fd, _) in events:
            gpio = fd_gpio_map[fd]

            results.append(gpio)

            if isinstance(gpio, SysfsGPIO):
                # Rewind for read
                try:
                    os.lseek(fd, 0, os.SEEK_SET)
                except OSError as e:
                    raise GPIOError(e.errno, "Rewinding GPIO: " + e.strerror)

        return results

    def close(self):
        """Close the GPIO.

        Raises:
            GPIOError: if an I/O or OS error occurs.

        """
        raise NotImplementedError()

    # Immutable properties

    @property
    def devpath(self):
        """Get the device path of the underlying GPIO device.

        :type: str
        """
        raise NotImplementedError()

    @property
    def fd(self):
        """Get the line file descriptor of the GPIO object.

        :type: int
        """
        raise NotImplementedError()

    @property
    def line(self):
        """Get the GPIO object's line number.

        :type: int
        """
        raise NotImplementedError()

    @property
    def name(self):
        """Get the line name of the GPIO.

        This method is intended for use with character device GPIOs and always
        returns the empty string for sysfs GPIOs.

        :type: str
        """
        raise NotImplementedError()

    @property
    def label(self):
        """Get the line consumer label of the GPIO.

        This method is intended for use with character device GPIOs and always
        returns the empty string for sysfs GPIOs.

        :type: str
        """
        raise NotImplementedError()

    @property
    def chip_fd(self):
        """Get the GPIO chip file descriptor of the GPIO object.

        This method is intended for use with character device GPIOs and is
        unsupported by sysfs GPIOs.

        Raises:
            NotImplementedError: if accessed on a sysfs GPIO.

        :type: int
        """
        raise NotImplementedError()

    @property
    def chip_name(self):
        """Get the name of the GPIO chip associated with the GPIO.

        :type: str
        """
        raise NotImplementedError()

    @property
    def chip_label(self):
        """Get the label of the GPIO chip associated with the GPIO.

        :type: str
        """
        raise NotImplementedError()

    # Mutable properties

    def _get_direction(self):
        raise NotImplementedError()

    def _set_direction(self, direction):
        raise NotImplementedError()

    direction = property(_get_direction, _set_direction)
    """Get or set the GPIO's direction. Can be "in", "out", "high", "low".

    Direction "in" is input; "out" is output, initialized to low; "high" is
    output, initialized to high; and "low" is output, initialized to low.

    Raises:
        GPIOError: if an I/O or OS error occurs.
        TypeError: if `direction` type is not str.
        ValueError: if `direction` value is invalid.

    :type: str
    """

    def _get_edge(self):
        raise NotImplementedError()

    def _set_edge(self, edge):
        raise NotImplementedError()

    edge = property(_get_edge, _set_edge)
    """Get or set the GPIO's interrupt edge. Can be "none", "rising",
    "falling", "both".

    Raises:
        GPIOError: if an I/O or OS error occurs.
        TypeError: if `edge` type is not str.
        ValueError: if `edge` value is invalid.

    :type: str
    """

    def _get_bias(self):
        raise NotImplementedError()

    def _set_bias(self, bias):
        raise NotImplementedError()

    bias = property(_get_bias, _set_bias)
    """Get or set the GPIO's line bias. Can be "default", "pull_up",
    "pull_down", "disable".

    This property is not supported by sysfs GPIOs.

    Raises:
        GPIOError: if an I/O or OS error occurs.
        TypeError: if `bias` type is not str.
        ValueError: if `bias` value is invalid.

    :type: str
    """

    def _get_drive(self):
        raise NotImplementedError()

    def _set_drive(self, drive):
        raise NotImplementedError()

    drive = property(_get_drive, _set_drive)
    """Get or set the GPIO's line drive. Can be "default" (for push-pull),
    "open_drain", "open_source".

    This property is not supported by sysfs GPIOs.

    Raises:
        GPIOError: if an I/O or OS error occurs.
        TypeError: if `drive` type is not str.
        ValueError: if `drive` value is invalid.

    :type: str
    """

    def _get_inverted(self):
        raise NotImplementedError()

    def _set_inverted(self, inverted):
        raise NotImplementedError()

    inverted = property(_get_inverted, _set_inverted)
    """Get or set the GPIO's inverted (active low) property.

    Raises:
        GPIOError: if an I/O or OS error occurs.
        TypeError: if `inverted` type is not bool.

    :type: bool
    """

    # String representation

    def __str__(self):
        """Get the string representation of the GPIO.

        :type: str
        """
        raise NotImplementedError()
# Assign GPIO classes
#
# NOTE(review): these imports sit after the GPIO class definition rather than
# at the top of the file -- presumably because the backend modules import
# names from this module; confirm before moving them.
from . import gpio_cdev1
from . import gpio_cdev2
from . import gpio_sysfs

# Use the v2 character device backend when it reports itself as supported,
# otherwise fall back to the v1 backend.
if gpio_cdev2.Cdev2GPIO.SUPPORTED:
    CdevGPIO = gpio_cdev2.Cdev2GPIO
else:
    CdevGPIO = gpio_cdev1.Cdev1GPIO

SysfsGPIO = gpio_sysfs.SysfsGPIO