# ##### BEGIN GPL LICENSE BLOCK #####
#
# Copyright (C) 2021 Patrick Baus
# This file is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This file is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this file. If not, see <http://www.gnu.org/licenses/>.
#
# ##### END GPL LICENSE BLOCK #####
"""
This is an asyncIO driver for the HP 3478A DMM to abstract away the GPIB interface.
"""
from __future__ import annotations
import asyncio
import re # Used to test for numerical return values
from dataclasses import dataclass
from decimal import Decimal
from math import log
from types import TracebackType
from typing import TYPE_CHECKING, AsyncGenerator, Type
from hp3478a_async.enums import DisplayType, FrontRearSwitchPosition, FunctionType, Range, TriggerType
from hp3478a_async.errors import DeviceError
from hp3478a_async.flags import ErrorFlags, SerialPollFlags, SrqMask, StatusFlags
try:
from typing import Self # type: ignore # Python 3.11
except ImportError:
from typing_extensions import Self
if TYPE_CHECKING:
from async_gpib import AsyncGpib
from prologix_gpib_async import AsyncPrologixGpibController
[docs]
@dataclass
class DmmStatus:
"""The device status of them DMM"""
function: FunctionType
range: Range
ndigits: int
status: StatusFlags
srq_flags: SerialPollFlags
error_flags: ErrorFlags
dac_value: int
[docs]
@dataclass
class NtcParameters:
"""
The Steinhart-Hart coefficient of an NTC thermistor. The formula to calculate the temperature from the resistance is
as follows:
1/T=a+b*Log(Rt/R25)+c*Log(Rt/R25)**2+d*Log(Rt/R25)**3
See `Wikipedia: Steinhart–Hart equation <https://en.wikipedia.org/wiki/Steinhart%E2%80%93Hart_equation>`_ for more
details.
"""
a: float # pylint: disable=invalid-name # this is standard naming convention
b: float # pylint: disable=invalid-name # this is standard naming convention
c: float # pylint: disable=invalid-name # this is standard naming convention
d: float # pylint: disable=invalid-name # this is standard naming convention
rt25: float
def __post_init__(self):
assert all([self.rt25 > 0, self.a > 0, self.b > 0, self.c > 0, self.d > 0])
# Used to test for numerical return values of the read() command
numerical_test_pattern = re.compile(rb"^[+-]\d+\.\d+E[+-]\d")
[docs]
class HP_3478A: # noqa pylint: disable=too-many-public-methods,invalid-name
"""
The driver for the HP 3478A 5.5 digit multimeter. It supports both linux-gpib and the Prologix
GPIB adapters.
"""
@property
def connection(self) -> AsyncGpib | AsyncPrologixGpibController:
"""
The GPIB connection.
"""
return self.__conn
[docs]
def __init__(self, connection: AsyncGpib | AsyncPrologixGpibController) -> None:
"""
Create an HP 3478A with the GPIB connection given.
Parameters
----------
connection: AsyncGpib or AsyncPrologixGpibController
The GPIB connection
"""
self.__conn = connection
self.__special_function: FunctionType | None = None
# Default constants taken from Amphenol DC95 (Material Type 10kY)
# https://www.amphenol-sensors.com/hubfs/Documents/AAS-913-318C-Temperature-resistance-curves-071816-web.pdf
self.__ntc_parameters: NtcParameters = NtcParameters(
rt25=10 * 10**3,
a=3.3540153 * 10**-3,
b=2.7867185 * 10**-4,
c=4.0006637 * 10**-6,
d=1.5575628 * 10**-7,
)
def __str__(self) -> str:
return f"HEWLETT-PACKARD 3478A at {str(self.connection)}"
async def __aenter__(self) -> Self:
await self.connect()
return self
async def __aexit__(
self, exc_type: Type[BaseException] | None, exc: BaseException | None, traceback: TracebackType | None
) -> None:
await self.disconnect()
[docs]
@staticmethod
async def get_id() -> tuple[str, str, str, str]:
"""
The HP 3478A does not support an ID request, so this function returns the constant
`"HEWLETT-PACKARD", "3478A", "0", "0"` to emulate the `*IDN?` SCPI command. The method is not async, but for
compatibility reasons with other drivers, it is declared async.
Returns
-------
tuple of str
A SCPI compliant id string
"""
return "HEWLETT-PACKARD", "3478A", "0", "0"
[docs]
async def connect(self) -> None:
"""
Connect the GPIB connection and configure the GPIB device for the DMM. This function must be called from the
loop and takes care of connecting the GPIB adapter.
"""
await self.__conn.connect()
if hasattr(self.__conn, "set_eot"):
# Used by the Prologix adapters
await self.__conn.set_eot(False)
await asyncio.gather(
# Default display mode
self.set_display(DisplayType.NORMAL),
# Default SRQ Mask
self.set_srq_mask(SrqMask.NONE),
)
[docs]
async def disconnect(self) -> None:
"""
Disconnect the GPIB device and release any lock on the front panel of the device if held.
"""
try:
await self.local()
# Wait 0.5 seconds for the DMM to finish reading and accepting the local() command
# The slowest reading rate is 1.9 readings/s
await asyncio.sleep(0.5)
except ConnectionError:
pass
finally:
await self.__conn.disconnect()
[docs]
def set_ntc_parameters(
self, a: float, b: float, c: float, d: float, rt25: float
): # pylint: disable=invalid-name,too-many-arguments # this is standard naming convention
"""
Set the parameters used when in mode :attr:`FunctionType.NTC <hp3478a_async.enums.FunctionType.NTC>` or
:attr:`FunctionType.NTCF <hp3478a_async.enums.FunctionType.NTCF>`. The parameters can be found in the datasheet
of the thermistor. The formula for converting resistance values to temperature is:
1/T=a+b*Log(Rt/R25)+c*Log(Rt/R25)**2+d*Log(Rt/R25)**3
See `Wikipedia: Steinhart–Hart equation <https://en.wikipedia.org/wiki/Steinhart%E2%80%93Hart_equation>`_ for
more details.
Parameters
----------
a: float
The parameters of the NTC thermistor used
b: float
The parameters of the NTC thermistor used
c: float
The parameters of the NTC thermistor used
d: float
The parameters of the NTC thermistor used
rt25: float
The resistance of the NTC at 25 °C
"""
self.__ntc_parameters = NtcParameters(a, b, c, d, rt25)
@staticmethod
def __convert_thermistor_to_temperature(value: Decimal, ntc_parameters: NtcParameters) -> Decimal:
"""
Convert a resistance to temperature using the formula 1/T=a+b*Log(Rt/R25)+c*Log(Rt/R25)**2+d*Log(Rt/R25)**3.
Parameters
----------
value: Decimal or float
The resistance of the NTC
ntc_parameters: NtcParameters
Returns
-------
Decimal or float
The temperature in K
"""
# Note: float precision is good enough for thermistors, so we convert the value to float and finally back to
# Decimal
return Decimal(
1
/ (
ntc_parameters.a
+ ntc_parameters.b * log(float(value) / ntc_parameters.rt25)
+ ntc_parameters.c * log(float(value) / ntc_parameters.rt25) ** 2
+ ntc_parameters.d * log(float(value) / ntc_parameters.rt25) ** 3
)
)
def __post_process(self, value: Decimal) -> Decimal:
"""
Post-process the DMM value, if a special function was selected using :func:`set_function`. Returns the
unmodified value if no special function was selected.
Parameters
----------
value: Decimal or float
The value to post-process
Returns
-------
Decimal
the post-processed value. The value is unmodified if no special function was selected.
"""
if self.__special_function is not None:
try:
return self.__convert_thermistor_to_temperature(value, self.__ntc_parameters)
except ValueError:
raise ValueError(f"Cannot convert resistance to temperature. Measurement was: {value}.") from None
return value
[docs]
async def read(self, length: int | None = None) -> Decimal | bytes:
"""
Read a single value from the device. If `length' is given, read `length` bytes, else read until a line break
``b"\\n"``.
Parameters
----------
length: int, optional
The number of bytes to read. Omit to read a line.
Returns
-------
Decimal or bytes
Either a Decimal value or a number of bytes as defined by `length`.
Raises
------
OverflowError
If the instrument input is overloaded, i.e. returns `+9.99999E+9`.
"""
if length is None:
result = (await self.__conn.read())[:-2] # strip the EOT characters (\r\n)
else:
result = await self.__conn.read(length=length)
match = numerical_test_pattern.match(result)
if match is not None:
if match[0] == b"+9.99999E+9":
raise OverflowError("DMM input overloaded")
return self.__post_process(Decimal(match[0].decode("ascii")))
return result # else return the bytes
[docs]
async def read_all(self, length: int | None = None) -> AsyncGenerator[Decimal | bytes, None]:
"""
Read all values from the device. If `length' is given, read `length` bytes, else read until a line break
``b"\\n"``, then yield the result.
Parameters
----------
length: int, optional
The number of bytes to read. Omit to read a line.
Returns
-------
Iterator[Decimal or bytes]
Either a Decimal value or a number of bytes as defined by `length`.
Raises
------
OverflowError
If the instrument input is overloaded, i.e. returns `+9.99999E+9`.
DeviceError
If the device is not ready for read.
asyncio.TimeoutError
If the GPIB controller does not respond in time.
"""
await self.set_srq_mask(SrqMask.DATA_READY) # Enable a GPIB interrupt when the conversion is done
while "loop not cancelled":
try:
status_byte = SerialPollFlags(await self.connection.wait((1 << 11) | (1 << 14)))
if SerialPollFlags.SRQ_ON_DATA_READY in status_byte:
result = await self.read(length)
yield result
else:
raise DeviceError(f"Device did not signal ready for read. Status was: {status_byte}")
except asyncio.TimeoutError:
raise asyncio.TimeoutError("The GPIB controller did not respond in time.") from None
async def __query(self, command: bytes, length: int | None = None) -> bytes:
await self.write(command)
return await self.__conn.read(length=length)
[docs]
async def set_display(self, value: DisplayType, text: str = "") -> None:
"""
Sets a custom display text or display measurands. See page 12 of the manual for details.
Parameters
----------
value: DisplayType
The type of text to display on the front panel tft. If set to
:attr:`DisplayType.NORMAL <hp3478a_async.enums.DisplayType.NORMAL>`, no text will be set.
text: str
The text to display if `value` is not set to
:attr:`DisplayType.NORMAL <hp3478a_async.enums.DisplayType.NORMAL>`. There is no need to terminate the
string with ``"\\r"`` or ``"\\n"``.
"""
value = DisplayType(value)
if value == DisplayType.NORMAL:
# Do not allow text in normal display mode
await self.write(f"D{value.value:d}".encode("ascii"))
else:
# The text must be terminated by a control character like \r or \n
await self.write(f"D{value.value:d}{text.rstrip()}\n".encode("ascii"))
[docs]
async def set_trigger(self, value: TriggerType) -> None:
"""
Set the DMM trigger. See page 53 of the manual for details.
Parameters
----------
value: TriggerType
The trigger type used when taking measurements.
"""
value = TriggerType(value)
await self.write(f"T{value.value:d}".encode("ascii"))
[docs]
async def write(self, msg: bytes) -> None:
"""
Write data or commands to the instrument. Do not terminate the command with a new line or carriage return
(``b"\\r"`` or ``b"\\n"``). This bytestring will be written as is, be careful.
Parameters
----------
msg: bytes
The string to be sent to the device.
"""
await self.__conn.write(msg)
[docs]
async def set_srq_mask(self, value: SrqMask) -> None:
"""
Set the service interrupt mask. This will determine, when the GPIB SRQ is triggered by the instrument. The
:attr:`SrqMask.DATA_READY <hp3478a_async.flags.SrqMask.DATA_READY>` flag is useful, when reading with long
conversion times. See :doc:`examples` for an example and page 46 of the manual for details.
Parameters
----------
value: SrqMask
The service request register setting.
"""
value = SrqMask(value)
await self.write(f"M{value.value:02o}".encode("ascii"))
[docs]
async def get_front_rear_switch_position(self) -> FrontRearSwitchPosition:
"""
Check whether the front or rear panel binding posts are active.
Returns
----------
FrontRearSwitchPosition
The position of the front/rear switch
"""
return FrontRearSwitchPosition(int(await self.__query(b"S")))
[docs]
async def device_clear(self) -> None:
"""
Send the Selected Device Clear (SDC) event. This will trigger the self-test routine and reset the device to
its power on state.
"""
await self.__conn.clear()
[docs]
async def clear(self) -> None:
"""
Clear serial poll register
"""
await self.write(b"K")
[docs]
async def reset(self) -> None:
"""
Place the device in DCV, autorange, autozero, single trigger, 4.5 digits mode and erase any output stored in
the buffers.
"""
await self.write(b"H0")
[docs]
async def local(self) -> None:
"""
Disable the front panel and allow only GPIB commands.
"""
await self.__conn.ibloc()
[docs]
async def set_function(self, value: FunctionType) -> None:
"""
Put the device in a certain measurement mode of either DVC, ACV, Ohms, 4-W Ohms, DCI, ACI or the extended ohms
mode. See page 55 of the manual for details on the extended ohms mode.
Parameters
----------
value: FunctionType
The function type to be measured.
"""
value = FunctionType(value)
if value in (FunctionType.NTC, FunctionType.NTCF):
self.__special_function = value
# Convert to OHM/OHMF
value = FunctionType(((value.value - 8) % 2) + 3)
else:
self.__special_function = None
await self.write(f"F{value.value:d}".encode("ascii"))
[docs]
async def set_autozero(self, enable: bool) -> None:
"""
Change the auto-zero mode of the DMM. If enabled, the DMM will auto-zero between readings.
Parameters
----------
enable: bool
`True` to enable auto-zeroing.
"""
enable = bool(enable)
await self.write(f"Z{enable:d}".encode("ascii"))
[docs]
async def set_number_of_digits(self, value: int) -> None:
"""
Set the number of digits returned by the DMM. This has an influence on the integration time.
See page 15 of the manual for details.
Parameters
----------
value: {4, 5, 6}
A value between 4 and 6.
"""
value = int(value)
assert 4 <= value <= 6
await self.write(f"N{(value-1):d}".encode("ascii"))
[docs]
async def get_error_register(self) -> ErrorFlags:
"""
Get the contents of the error register, which is the result of the power on self-test. See page 62 of the manual
for details.
Returns
----------
ErrorFlags
The error register
"""
result = int(await self.__query(b"E"), base=8) # Convert the octal result to int
return ErrorFlags(result)
[docs]
async def set_range(self, value: Range) -> None:
"""
Sets the measurement range. The range, that can be selected depend on the measurement mode.
Parameters
----------
value: Range
The measurement range.
"""
value = Range(value)
await self.write(f"R{value.value}".encode("ascii"))
@staticmethod
def __calculate_range(function: FunctionType, range_value: int) -> Range:
"""
The range Enum is basically the exponent of the range. Unfortunately the returned bits depend on the function,
so we need to add or subtract according to the DMM function.
Parameters
----------
function: FunctionType
The function that is currently in use
range_value: int
The exponent
Returns
-------
Range
The adjusted range
"""
range_value_correction = {
FunctionType.DCV: -3,
FunctionType.ACV: -2,
FunctionType.OHM: 1,
FunctionType.OHMF: 1,
FunctionType.OHM_EXT: 1,
FunctionType.DCI: -2,
FunctionType.ACI: -2,
}
return Range(range_value + range_value_correction[function])
[docs]
async def get_cal_ram(self) -> bytes:
"""
An undocumented function. Read the internal calibration memory from the NVRAM. It can be used to backup the
calibration memory in case the internal battery fails. See :doc:`examples` for an example on how to read the
memory and convert it to meaningful data.
Returns
----------
bytes
The contents of the calibration ram.
"""
result = bytearray()
for addr in range(256):
result.append(ord(await self.__query(command=bytes([ord("W"), addr]), length=1)))
return bytes(result)
[docs]
async def set_cal_ram(self, data: bytes) -> None:
"""
Write to the internal NVRAM. Warning: This can brick the device until a valid calibration
configuration is written to the NVRAM. This function only works, if the front panel CAL switch is enabled.
Parameters
----------
data: bytes
The data to be written to the calibration memory.
"""
for addr, data_block in enumerate(data):
await self.write(bytes([ord("X"), addr, data_block]))
[docs]
async def get_status(self) -> DmmStatus:
"""
Read the binary status register of the device. See page 61 of the manual for details.
Returns
-------
DmmStatus
The current status of the DMM
"""
# The "B" command is special. It does not contain a line terminator, the
# device will output exactly 5 bytes and no more. So we need to read exactly
# 5 bytes.
result = await self.__query(command=b"B", length=5)
function = FunctionType((result[0] >> 5) & 0b111)
if self.__special_function is not None and function is FunctionType(
((self.__special_function.value - 8) % 2) + 3
):
# If a special function is enabled in the driver, and the instrument is set to
# the correct function, we will return the special function instead
function = self.__special_function
else:
# If the correct function is not set on the device, we will disable the special function
# in the driver
self.__special_function = None
return DmmStatus(
function=function,
range=self.__calculate_range(function, (result[0] >> 2) & 0b111),
ndigits=6 - (result[0] & 0b11),
status=StatusFlags(result[1]),
srq_flags=SerialPollFlags(result[2]),
error_flags=ErrorFlags(result[3]),
dac_value=result[4],
)
[docs]
async def serial_poll(self) -> SerialPollFlags:
"""
Serial poll the device/GPIB controller. Use this in combination with the SRQ mask to determine, if the
instrument triggered the SRQ and requests service.
Returns
-------
SerialPollFlags
The status register of the device
"""
return SerialPollFlags(await self.__conn.serial_poll())