poulet_py.hardware package#

class Arduino(ports=None)[source]#

Bases: object

__init__(ports=None)[source]#

Initialize the Arduino class with the given ports.

Args:

ports (list): List of serial port addresses.

close_port()[source]#

Close all serial connections.

log_error(error_message)[source]#
read_data(data_parser=<class 'float'>)[source]#

Read data from the Arduino and parse it.

Args:
data_parser (callable): Function to parse the data read from the Arduino.

Defaults to float.

Sets:

self.parsed_data: Parsed data from Arduino.

save_data(data)[source]#

Save the data to a CSV file.

Args:

timestamp: The timestamp to be saved.

set_error_log_path(folder_path, error_file_name)[source]#

Set the error log path for the Arduino.

Args:

folder_path (str): Path to the folder where the error log will be stored. error_file_name (str): Name of the error log file.

set_output_file(path, extra_name, data_columns=['data'], base_file_name='arduino')[source]#

Set the output file for the Arduino.

Args:

path (str): Path to the folder where the output file will be saved. extra_name (str): An additional name to be added to the base file name. base_file_name (str): Base name of the output file. Defaults to ‘arduino’.

set_timer(start_time)[source]#

Sets the timer for the camera.

Args:

start_time (float): The time at which the camera recording started.

class BaslerCamera(max_cameras=2)[source]#

Bases: object

A class to interact with multiple Basler cameras using pypylon and OpenCV. Each camera will record to its own video file and log timestamps to a CSV.

__init__(max_cameras=2)[source]#

Initializes the BaslerCamera object by enumerating devices and attaching up to max_cameras.

Args:

max_cameras (int): The maximum number of cameras to use.

capture_frame()[source]#

Captures a single frame from whichever camera has a frame ready. The frame is written to its corresponding video file and timestamp logged.

log_error(error_message)[source]#
recording(data_save_folder, cage_id, n_mouse, condition, mouse_ids=[], duration_s=10, buffer_s=10, total_rec=4, fps=30, video_format='mp4')[source]#
save_metadata(base_file_name='basler-camera', extra_name='')[source]#

Saves metadata about the recording for each camera to a JSON file.

Args:

path (str): Directory to save the metadata files. base_file_name (str, optional): Base name for the metadata files. extra_name (str, optional): Extra name to add to the file names.

save_timestamp(camera_index, timestamp)[source]#

Save a timestamp to the CSV file for the specified camera.

Args:

camera_index (int): Index of the camera. timestamp (float): Timestamp to record.

set_error_log_path(path, file_name)[source]#

Sets the error log file.

Args:

path (str): Directory for the error log. file_name (str): Name of the error log file.

set_frames_per_second(frames_per_second)[source]#

Sets the frame rate for each camera.

Args:

frames_per_second (float): Desired frame rate in frames per second.

set_output_file(path, extra_name, base_file_name='basler-camera')[source]#

Sets up output video files and timestamp CSV files for all cameras.

Args:

path (str): Directory to save the output files. extra_name (str): Extra name to add to the file names. base_file_name (str): Base name for the files.

set_timer(start_time)[source]#

Sets the timer for the camera.

Args:

start_time (float): The time at which the camera recording started.

start_streaming()[source]#

Starts the grabbing (streaming) for all cameras.

stop_streaming()[source]#

Stops the streaming and closes all cameras and video writers.

stream_video(window_width=None, window_height=None)[source]#

Streams the live video feed from all cameras. Each camera is shown in its own window.

Args:

window_width (int, optional): Width to resize the window. window_height (int, optional): Height to resize the window.

class JulaboChiller(port=None, baudrate=9600, timeout=1)[source]#

Bases: object

Class to interact with a Julabo water chiller via serial port.

__init__(port=None, baudrate=9600, timeout=1)[source]#

Initialize the JulaboChiller with the given serial port configuration.

Args:

port (str): The serial port (e.g., ‘COM12’). baudrate (int, optional): The baud rate for the serial communication. Default is 9600. timeout (int or float, optional): The read timeout value. Default is 1 second.

check_started()[source]#

Check whether the chiller has started.

Returns:

str: The started status.

check_status()[source]#

Check the status of the chiller.

Returns:

str: The status information.

check_version()[source]#

Check the version of the chiller.

Returns:

str: The version information.

close_port()[source]#

Close the serial port connection.

get_target_temperature()[source]#

Get the target temperature set point.

Returns:

str: The target temperature.

get_temperature()[source]#

Get the current temperature from the chiller.

Returns:

str: The current temperature reported by the chiller.

read()[source]#

Read data from the chiller.

Returns:

str: The response from the chiller, or None if no data is available.

save_temperature(timestamp, temperature)[source]#

Save the temperature to a CSV file.

Args:

timestamp: The timestamp to be saved. temperature: The temperature to be saved.

set_error_log_path(path, file_name)[source]#

Sets the path for the error log file.

Args:

path (str): The directory where the error log file will be saved.

set_output_file(path, extra_name, base_file_name='julabo_chiller')[source]#

Sets the output file for recording the video.

Args:

path (str): The directory where the output file will be saved.

set_temperature(temperature)[source]#

Set the temperature of the chiller.

Args:

temperature (float): The temperature to set (in Celsius).

set_timer(start_time)[source]#

Set the timer for the chiller.

Args:

start_time (float): The start time of the experiment.

start()[source]#

Turn on the chiller.

stop()[source]#

Turn off the chiller.

write(command)[source]#

Write a command to the chiller.

Args:

command (str): The command to send to the chiller.

class TCS(port, *, maximum_temperature=40, beep=False, trigger_out_channel=255, read_timeout=2, response_timeout=2)[source]#

Bases: object

Interface for communicating with a TCS thermal stimulator.

__init__(port, *, maximum_temperature=40, beep=False, trigger_out_channel=255, read_timeout=2, response_timeout=2)[source]#

Initialize TCS interface with validation.

Args:

port: Serial port device path (e.g. ‘/dev/ttyUSB0’ or ‘COM3’) maximum_temperature: Safety limit for maximum allowed temperature (0-60°C) beep: Whether to enable audible beep during stimulation trigger_out_channel: Output channel of trigger signal (1-255) read_timeout: Serial read timeout in seconds response_timeout: Timeout for command responses in seconds

close()[source]#

Close the connection and clean up resources.

execute_command(command, *args, expected_pattern=None)[source]#

Execute a command and optionally wait for a response.

Parameters:
  • command (TCSCommand) – The command to execute

  • *args – Arguments to format into the command

  • expected_pattern (Pattern | None) – Regex pattern to match against the response

Returns:

If expected_pattern provided, returns (timestamp, match) tuple

Return type:

tuple[int, Match[str]]] | None

Examples

>>> tcs.execute_command(TCSCommand.READ_INFO, expected_pattern=compile(r"Firmware:(.*)"))
get_readings()[source]#

Get current temperature readings from all sensors.

Returns:

Dictionary containing temperatures for neutral and all surfaces, plus a timestamp key.

Return type:

Dict[str, float]

Raises:

RuntimeError – If reading temperatures fails

info()[source]#

Get device information including firmware version and probe details.

Returns:

Device information string

Return type:

str

Raises:

RuntimeError – If the info command fails or times out

init()[source]#

Initialize the TCS connection and verify communication.

Raises:

RuntimeError – If initialization fails

reset()[source]#

Reset the TCS device to its default state.

property stimulus: TCSStimulus#
trigger()[source]#

Execute the configured stimulation.

Raises:

RuntimeError – If stimulation fails to trigger

write(command)[source]#

Write a command to the TCS device.

Parameters:

command (bytes) – The command to send

Returns:

Number of bytes written

Return type:

int

Raises:

RuntimeError – If the write operation fails

class TCSCommand(*values)[source]#

Bases: bytes, Enum

Enumeration of all available TCS commands with their byte representations.

Each command includes formatting capability for parameterized commands.

Examples

>>> TCSCommand.READ_TEMPERATURES
<TCSCommand.READ_TEMPERATURES: b'E'>
>>> TCSCommand.BASELINE_TEMPERATURE.format(300)
b'N300'
format(*args)[source]#

Format the command with the given arguments.

Parameters:

*args (int, float) – Arguments to format into the command string

Returns:

Formatted command string

Return type:

bytes

Raises:

ValueError – If arguments don’t match the command’s format requirements

Examples

>>> TCSCommand.TARGET_TEMPERATURE.format(1, 350)
b'C1350'
READ_INFO = b'H'#
READ_TEMPERATURES = b'E'#
READ_STIMULATION_VALUES = b'P'#
READ_BUTTON_STATUS = b'K'#
READ_BATTERY = b'B'#
READ_ERRORS = b'Q'#
DISPLAY_TEMPERATURES_BETWEEN_STIMULATION = b'Oa'#
DISPLAY_TEMPERATURES_DURING_STIMULATION = b'Ob'#
RESET = b'Oc'#
SET_MAX_TEMPERATURE = b'Om%03d'#
AUTOMATIC_CALIBRATION = b'G'#
DEACTIVATE_DISPLAY = b'F'#
TRIGGER_STIMULATION = b'L'#
HALT_STIMULATION = b'A'#
BASELINE_TEMPERATURE = b'N%03d'#
SURFACE_SELECTION = b'S%05d'#
TARGET_TEMPERATURE = b'C%d%03d'#
STIMULATION_RATE = b'V%d%04d'#
RETURN_SPEED = b'R%d%04d'#
STIMULATION_DURATION = b'D%d%05d'#
TRIGGER_CHANNEL_DURATION = b'T%03d%03d'#
BUZZER = b'Z%03d%03d'#
class TCSStimulus(**data)[source]#

Bases: BaseModel

Configuration for thermal stimulation parameters.

commands()[source]#

Generate the sequence of commands needed to configure this stimulus.

Returns:

Sequence of formatted command strings

Return type:

Tuple[bytes, …]

Examples

>>> stimulus = TCSStimulus(surface=1)
>>> stimulus.commands()
[b'S10000', b'N300', b'C1000', b'V10010', b'D100100', b'R10010']
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

surface: int#
baseline: float#
target: float#
rise_rate: float#
return_speed: float#
duration: int#

Subpackages#