Source code for poulet_py.hardware.stimulator.julabo
try:
import csv
import os
import time
import serial
from dotenv import find_dotenv, load_dotenv
except ImportError as e:
msg = """
Missing 'julabo' module. Install options:
- Dedicated: pip install poulet_py[julabo]
- Module: pip install poulet_py[hardware]
- Full: pip install poulet_py[all]
"""
raise ImportError(msg) from e
[docs]
class JulaboChiller:
"""Class to interact with a Julabo water chiller via serial port."""
[docs]
def __init__(self, port=None, baudrate=9600, timeout=1):
"""
Initialize the JulaboChiller with the given serial port configuration.
Args:
port (str): The serial port (e.g., 'COM12').
baudrate (int, optional): The baud rate for the serial communication. Default is 9600.
timeout (int or float, optional): The read timeout value. Default is 1 second.
"""
if port is None:
# check in env cariable
dotenv_path = find_dotenv(usecwd=True)
load_dotenv(dotenv_path)
self.port = os.getenv("CHILLER_PORT")
if self.port is None:
raise ValueError("No serial port specified in .env file or argument.")
else:
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self.start_time = None
self.ser = self._configure_serial_port()
def _configure_serial_port(self):
"""Configure the serial port with the given settings."""
return serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=self.timeout,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
)
[docs]
def set_timer(self, start_time):
"""Set the timer for the chiller.
Args:
start_time (float): The start time of the experiment.
"""
self.start_time = start_time
[docs]
def set_error_log_path(self, path, file_name):
"""
Sets the path for the error log file.
Args:
path (str): The directory where the error log file will be saved.
"""
self.error_log_file = os.path.join(path, file_name)
[docs]
def set_output_file(self, path, extra_name, base_file_name="julabo_chiller"):
"""
Sets the output file for recording the video.
Args:
path (str): The directory where the output file will be saved.
"""
self.output_file = os.path.join(path, f"{base_file_name}-{extra_name}.csv")
if not os.path.isfile(self.output_file):
with open(self.output_file, mode="w", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["timestamp", "temperature"])
[docs]
def save_temperature(self, timestamp, temperature):
"""
Save the temperature to a CSV file.
Args:
timestamp: The timestamp to be saved.
temperature: The temperature to be saved.
"""
try:
with open(self.output_file, mode="a", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow([timestamp, temperature])
except Exception as e:
print(f"Error saving temperature: {e}")
[docs]
def read(self):
"""Read data from the chiller.
Returns:
str: The response from the chiller, or None if no data is available.
"""
try:
time.sleep(0.1) # Give the device some time to respond
if self.ser.in_waiting > 0:
data = self.ser.readline().decode("ascii").strip()
if self.start_time:
self.latest_timestamp = time.time() - self.start_time
return data
else:
return None
except Exception as e:
print(f"Error reading from serial port: {e}")
return None
[docs]
def write(self, command):
"""Write a command to the chiller.
Args:
command (str): The command to send to the chiller.
"""
try:
self.ser.write(command.encode("ascii") + b"\r\n")
time.sleep(0.1) # Give the device some time to process the command
except Exception as e:
print(f"Error writing to serial port: {e}")
[docs]
def close_port(self):
"""Close the serial port connection."""
self.ser.close()
[docs]
def set_temperature(self, temperature):
"""Set the temperature of the chiller.
Args:
temperature (float): The temperature to set (in Celsius).
"""
command = f"OUT_SP_00 {temperature:.1f}"
self.write(command)
[docs]
def get_temperature(self):
"""Get the current temperature from the chiller.
Returns:
str: The current temperature reported by the chiller.
"""
self.write("IN_PV_00")
self.latest_temperature = self.read()
return self.latest_temperature
[docs]
def start(self):
"""Turn on the chiller."""
self.write("OUT_MODE_05 1")
[docs]
def stop(self):
"""Turn off the chiller."""
self.write("OUT_MODE_05 0")
[docs]
def check_version(self):
"""Check the version of the chiller.
Returns:
str: The version information.
"""
self.write("VERSION")
return self.read()
[docs]
def check_status(self):
"""Check the status of the chiller.
Returns:
str: The status information.
"""
self.write("STATUS")
return self.read()
[docs]
def check_started(self):
"""Check whether the chiller has started.
Returns:
str: The started status.
"""
self.write("IN_MODE_05")
return self.read()
[docs]
def get_target_temperature(self):
"""Get the target temperature set point.
Returns:
str: The target temperature.
"""
self.write("IN_SP_00")
return self.read()