Skip to content

Sensors

Generic Sensor

GenericSensor

Source code in software/sailowtech_ctd/sensors/generic.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class GenericSensor:
    class Commands(Enum):
        ...

    def __init__(self, brand: SensorBrand, sensor_type: Sensor, name: str, address: int, min_delay: float, metric_type: Metric):
        """
        Initialisation of a generic sensor
        :param brand: Brand of the sensor
        :param sensor_type: Type of the sensor
        :param name: Name of the sensor
        :param address: Address of the sensor
        :param min_delay: Minimum delay of the sensor required for measuring
        """
        self.brand: SensorBrand = brand
        self.sensor_type: Sensor = sensor_type
        self.metric_type: Metric = metric_type

        self.name: str = name
        self.addr: int = address

        self.min_delay: float = min_delay
        self.last_read: float = 0.

    def init(self, bus: smbus.SMBus):
        ...

    def calibrate(self, bus: smbus.SMBus):
        ...

    def measure_value(self, bus: smbus.SMBus):
        ...

    def write_read_command(self, bus: smbus.SMBus) -> bool:
        ...

    def read_result(self, bus: smbus.SMBus) -> float:
        ...

    def __str__(self):
        return f'{self.__class__.__name__}(' \
               f'name="{self.name}", ' \
               f'address={hex(self.addr)}, ' \
               f'brand={self.brand}, ' \
               f'type={self.sensor_type}, ' \
               f'min_delay={self.min_delay}' \
               f')'

    def __repr__(self):
        return self.__str__()

__init__(brand, sensor_type, name, address, min_delay, metric_type)

Initialisation of a generic sensor

Parameters:

Name Type Description Default
brand SensorBrand

Brand of the sensor

required
sensor_type Sensor

Type of the sensor

required
name str

Name of the sensor

required
address int

Address of the sensor

required
min_delay float

Minimum delay of the sensor required for measuring

required
Source code in software/sailowtech_ctd/sensors/generic.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def __init__(self, brand: SensorBrand, sensor_type: Sensor, name: str, address: int, min_delay: float, metric_type: Metric):
    """
    Initialisation of a generic sensor
    :param brand: Brand of the sensor
    :param sensor_type: Type of the sensor
    :param name: Name of the sensor
    :param address: Address of the sensor
    :param min_delay: Minimum delay of the sensor required for measuring
    """
    self.brand: SensorBrand = brand
    self.sensor_type: Sensor = sensor_type
    self.metric_type: Metric = metric_type

    self.name: str = name
    self.addr: int = address

    self.min_delay: float = min_delay
    self.last_read: float = 0.

SensorBrand

Bases: Enum

The different brands of the sensors. Some brands have their own protocols, which is why we group them together.

Source code in software/sailowtech_ctd/sensors/generic.py
 8
 9
10
11
12
13
14
class SensorBrand(Enum):
    """
    The different brands of the sensors. Some brands have their own protocols, which is why we group them together.
    """
    Atlas = auto()
    BlueRobotics = auto()
    MockBrand = auto()

Mock Sensor

MockSensor

Bases: GenericSensor

Class which provides a mock sensor in order to be able to test the interface

Source code in software/sailowtech_ctd/sensors/mocksensor.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class MockSensor(GenericSensor):
    """
    Class which provides a mock sensor in order to be able to test the interface
    """
    def __init__(self, brand: SensorBrand, sensor_type: Sensor, name: str, address: int, min_val: int, max_val: int, min_delay: float, metric_type: Metric):
        """
        Initialise the mock sensor
        :param brand: Brand of the sensor (mock_sensor)
        :param sensor_type: Type of the sensor (mock_sensor)
        :param name: The given name for this sensor
        :param address: The address assigned to this sensor (is ignored)
        :param min_val: The minimum value that the mock sensor is returning
        :param max_val: The maximum value that the mock sensor is returning
        """
        self.min = min_val
        self.max = max_val
        self.min_delay = min_delay
        super().__init__(brand, sensor_type, name, address, self.min_delay, metric_type)

    def init(self, bus: smbus.SMBus):
        """
        Initialises the mock sensor. Always returns true since no setup is required
        :param bus: The bus to be used - is ignored
        :return: Returns True
        """
        return True

    def write_read_command(self, bus: smbus.SMBus) -> bool:
        """
        Method to write the read command to the sensor - is ignored
        :param bus: The bus to be used - is ignored
        :return: Returns True
        """
        return True

    def read_result(self, bus: smbus.SMBus) -> float:
        """
        Method to read the measurement result
        :param bus: The bus to be used - is ignored
        :return: Returns the measured value
        """
        return self.measure_value(bus)

    def measure_value(self, bus: smbus.SMBus):
        """
        Measure the value of the mock sensor. Returns a random value between the minimum and maximum set during class instantiation
        :param bus: The bus to be used - is ignored
        :return: Returns the randomly generated simulated measurement
        """
        return random.uniform(self.min, self.max)

__init__(brand, sensor_type, name, address, min_val, max_val, min_delay, metric_type)

Initialise the mock sensor

Parameters:

Name Type Description Default
brand SensorBrand

Brand of the sensor (mock_sensor)

required
sensor_type Sensor

Type of the sensor (mock_sensor)

required
name str

The given name for this sensor

required
address int

The address assigned to this sensor (is ignored)

required
min_val int

The minimum value that the mock sensor is returning

required
max_val int

The maximum value that the mock sensor is returning

required
Source code in software/sailowtech_ctd/sensors/mocksensor.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def __init__(self, brand: SensorBrand, sensor_type: Sensor, name: str, address: int, min_val: int, max_val: int, min_delay: float, metric_type: Metric):
    """
    Initialise the mock sensor
    :param brand: Brand of the sensor (mock_sensor)
    :param sensor_type: Type of the sensor (mock_sensor)
    :param name: The given name for this sensor
    :param address: The address assigned to this sensor (is ignored)
    :param min_val: The minimum value that the mock sensor is returning
    :param max_val: The maximum value that the mock sensor is returning
    """
    self.min = min_val
    self.max = max_val
    self.min_delay = min_delay
    super().__init__(brand, sensor_type, name, address, self.min_delay, metric_type)

init(bus)

Initialises the mock sensor. Always returns true since no setup is required

Parameters:

Name Type Description Default
bus SMBus

The bus to be used - is ignored

required

Returns:

Type Description

Returns True

Source code in software/sailowtech_ctd/sensors/mocksensor.py
27
28
29
30
31
32
33
def init(self, bus: smbus.SMBus):
    """
    Initialises the mock sensor. Always returns true since no setup is required
    :param bus: The bus to be used - is ignored
    :return: Returns True
    """
    return True

measure_value(bus)

Measure the value of the mock sensor. Returns a random value between the minimum and maximum set during class instantiation

Parameters:

Name Type Description Default
bus SMBus

The bus to be used - is ignored

required

Returns:

Type Description

Returns the randomly generated simulated measurement

Source code in software/sailowtech_ctd/sensors/mocksensor.py
51
52
53
54
55
56
57
def measure_value(self, bus: smbus.SMBus):
    """
    Measure the value of the mock sensor. Returns a random value between the minimum and maximum set during class instantiation
    :param bus: The bus to be used - is ignored
    :return: Returns the randomly generated simulated measurement
    """
    return random.uniform(self.min, self.max)

read_result(bus)

Method to read the measurement result

Parameters:

Name Type Description Default
bus SMBus

The bus to be used - is ignored

required

Returns:

Type Description
float

Returns the measured value

Source code in software/sailowtech_ctd/sensors/mocksensor.py
43
44
45
46
47
48
49
def read_result(self, bus: smbus.SMBus) -> float:
    """
    Method to read the measurement result
    :param bus: The bus to be used - is ignored
    :return: Returns the measured value
    """
    return self.measure_value(bus)

write_read_command(bus)

Method to write the read command to the sensor - is ignored

Parameters:

Name Type Description Default
bus SMBus

The bus to be used - is ignored

required

Returns:

Type Description
bool

Returns True

Source code in software/sailowtech_ctd/sensors/mocksensor.py
35
36
37
38
39
40
41
def write_read_command(self, bus: smbus.SMBus) -> bool:
    """
    Method to write the read command to the sensor - is ignored
    :param bus: The bus to be used - is ignored
    :return: Returns True
    """
    return True

Atlas Sensor

AtlasSensor

Bases: GenericSensor

Class which can be used for all (supported) Atlas Scientific sensors

Source code in software/sailowtech_ctd/sensors/atlas.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
class AtlasSensor(GenericSensor):
    """
    Class which can be used for all (supported) Atlas Scientific sensors
    """
    def __init__(self, sensor: Sensor, name: str, address: int, min_delay: float):
        """
        Initialise Atlas Scientific sensor
        :param sensor: Type of the sensor
        :param name: Name of the sensor
        :param address: I2C-address of the sensor
        :param min_delay: Minimum delay between write / read required for proper functioning
        """
        match sensor:
            case Sensor.ATLAS_EZO_DO: metric_type = Metric.DISSOLVED_OXYGEN
            case Sensor.ATLAS_EZO_TEMP: metric_type = Metric.TEMPERATURE
            case Sensor.ATLAS_EZO_CONDUCTIVITY: metric_type = Metric.CONDUCTIVITY
            case _: metric_type = None; logger.critical("Implementation missing for Sensor!")
        super().__init__(SensorBrand.Atlas, sensor, name, address, min_delay, metric_type)
        self.device = atlas_i2c.AtlasI2C()
        self.device.set_i2c_address(address)

    def init(self, bus: smbus.SMBus) -> None:
        """
        Initialises the sensor. Currently, does not do anything
        :param bus: SMBus object
        :return: Returns nothing
        """
        pass

    def calibrate(self, bus: smbus.SMBus):
        """
        Calibrate the sensor. Currently, does not do anything
        :param bus: SMBus object
        :return: Returns nothing
        """
        pass

    def write_read_command(self, bus: smbus.SMBus = None) -> bool:
        """
        Write a read command to the device.
        :param bus: Bus device. Not required
        :return: Returns true after sending read command
        """
        self.device.write("R")
        return True

    def read_result(self, bus: smbus.SMBus = None) -> float:
        """
        Read the measured value from device.
        :param bus: Bus device. Not required
        :return: Returns true after sending read command
        """
        result = self.device.read("R")
        if result.status_code == 1:
            return float(result.data.decode("UTF-8"))
        else:
            logger.warning(f"Invalid response received from sensor {self.name}")
            return 0

    def measure_value(self, bus: smbus.SMBus) -> float:
        response = self.device.query(commands.READ)
        if response.status_code == 1:
            return float(response.data.decode("UTF-8"))
        else:
            logger.warning(f"Invalid response received from sensor {self.name}")
            return 0

__init__(sensor, name, address, min_delay)

Initialise Atlas Scientific sensor

Parameters:

Name Type Description Default
sensor Sensor

Type of the sensor

required
name str

Name of the sensor

required
address int

I2C-address of the sensor

required
min_delay float

Minimum delay between write / read required for proper functioning

required
Source code in software/sailowtech_ctd/sensors/atlas.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def __init__(self, sensor: Sensor, name: str, address: int, min_delay: float):
    """
    Initialise Atlas Scientific sensor
    :param sensor: Type of the sensor
    :param name: Name of the sensor
    :param address: I2C-address of the sensor
    :param min_delay: Minimum delay between write / read required for proper functioning
    """
    match sensor:
        case Sensor.ATLAS_EZO_DO: metric_type = Metric.DISSOLVED_OXYGEN
        case Sensor.ATLAS_EZO_TEMP: metric_type = Metric.TEMPERATURE
        case Sensor.ATLAS_EZO_CONDUCTIVITY: metric_type = Metric.CONDUCTIVITY
        case _: metric_type = None; logger.critical("Implementation missing for Sensor!")
    super().__init__(SensorBrand.Atlas, sensor, name, address, min_delay, metric_type)
    self.device = atlas_i2c.AtlasI2C()
    self.device.set_i2c_address(address)

calibrate(bus)

Calibrate the sensor. Currently, does not do anything

Parameters:

Name Type Description Default
bus SMBus

SMBus object

required

Returns:

Type Description

Returns nothing

Source code in software/sailowtech_ctd/sensors/atlas.py
51
52
53
54
55
56
57
def calibrate(self, bus: smbus.SMBus):
    """
    Calibrate the sensor. Currently, does not do anything
    :param bus: SMBus object
    :return: Returns nothing
    """
    pass

init(bus)

Initialises the sensor. Currently, does not do anything

Parameters:

Name Type Description Default
bus SMBus

SMBus object

required

Returns:

Type Description
None

Returns nothing

Source code in software/sailowtech_ctd/sensors/atlas.py
43
44
45
46
47
48
49
def init(self, bus: smbus.SMBus) -> None:
    """
    Initialises the sensor. Currently, does not do anything
    :param bus: SMBus object
    :return: Returns nothing
    """
    pass

read_result(bus=None)

Read the measured value from device.

Parameters:

Name Type Description Default
bus SMBus

Bus device. Not required

None

Returns:

Type Description
float

Returns true after sending read command

Source code in software/sailowtech_ctd/sensors/atlas.py
68
69
70
71
72
73
74
75
76
77
78
79
def read_result(self, bus: smbus.SMBus = None) -> float:
    """
    Read the measured value from device.
    :param bus: Bus device. Not required
    :return: Returns true after sending read command
    """
    result = self.device.read("R")
    if result.status_code == 1:
        return float(result.data.decode("UTF-8"))
    else:
        logger.warning(f"Invalid response received from sensor {self.name}")
        return 0

write_read_command(bus=None)

Write a read command to the device.

Parameters:

Name Type Description Default
bus SMBus

Bus device. Not required

None

Returns:

Type Description
bool

Returns true after sending read command

Source code in software/sailowtech_ctd/sensors/atlas.py
59
60
61
62
63
64
65
66
def write_read_command(self, bus: smbus.SMBus = None) -> bool:
    """
    Write a read command to the device.
    :param bus: Bus device. Not required
    :return: Returns true after sending read command
    """
    self.device.write("R")
    return True

handle_raspi_glitch(response)

Change MSB to 0 for all received characters except the first and get a list of characters NOTE: having to change the MSB to 0 is a glitch in the raspberry pi, and you shouldn't have to do this!

Source code in software/sailowtech_ctd/sensors/atlas.py
12
13
14
15
16
17
18
19
def handle_raspi_glitch(response):  # Maybe useless when using smbus ?
    """
    Change MSB to 0 for all received characters except the first
    and get a list of characters
    NOTE: having to change the MSB to 0 is a glitch in the raspberry pi,
    and you shouldn't have to do this!
    """
    return list(map(lambda x: chr(x & ~0x80), list(response)))

Bluerobotics Sensor

BlueRoboticsSensor

Bases: GenericSensor

Base for a Bluerobotics sensor

Source code in software/sailowtech_ctd/sensors/bluerobotics.py
 8
 9
10
11
12
13
class BlueRoboticsSensor(GenericSensor):
    """
    Base for a Bluerobotics sensor
    """
    def __init__(self, sensor: Sensor, name: str, address: int, min_delay: float):
        super().__init__(SensorBrand.BlueRobotics, sensor, name, address, min_delay, Metric.DEPTH)

DepthSensor

Bases: BlueRoboticsSensor

Class for a depth sensor by Bluerobotics

Source code in software/sailowtech_ctd/sensors/bluerobotics.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
class DepthSensor(BlueRoboticsSensor):
    """
    Class for a depth sensor by Bluerobotics
    """
    _MS5837_ADDR = 0x76
    _MS5837_RESET = 0x1E
    _MS5837_ADC_READ = 0x00
    _MS5837_PROM_READ = 0xA0
    _MS5837_CONVERT_D1_256 = 0x40
    _MS5837_CONVERT_D2_256 = 0x50

    DEFAULT_ADDRESS = 98

    # Models
    MODEL_02BA = 0
    MODEL_30BA = 1

    # Oversampling options
    OSR_256 = 0
    OSR_512 = 1
    OSR_1024 = 2
    OSR_2048 = 3
    OSR_4096 = 4
    OSR_8192 = 5

    # kg/m^3 convenience
    DENSITY_FRESHWATER = 997
    DENSITY_SALTWATER = 1029

    # Conversion factors (from native unit, mbar)
    UNITS_Pa = 100.0
    UNITS_hPa = 1.0
    UNITS_kPa = 0.1
    UNITS_mbar = 1.0
    UNITS_bar = 0.001
    UNITS_atm = 0.000986923
    UNITS_Torr = 0.750062
    UNITS_psi = 0.014503773773022

    # Valid units
    UNITS_Centigrade = 1
    UNITS_Fahrenheit = 2
    UNITS_Kelvin = 3

    def __init__(self, name: str, address: int = DEFAULT_ADDRESS, min_delay: float = 1):
        """
        Initialise the class of the depth sensor
        :param name: Name of the sensor
        :param address: I2C-Address of the sensor
        :param min_delay: Required minimum delay
        """
        super().__init__(Sensor.BLUEROBOTICS_BAR30_DEPTH, name, address, min_delay)
        self._model = self.MODEL_30BA
        self._C = []

        self._fluidDensity = self.DENSITY_FRESHWATER
        self._pressure = 0
        self._temperature = 0
        self._D1 = 0
        self._D2 = 0

    def init(self, bus: smbus.SMBus) -> bool:
        """
        Initialise the device of the depth sensor
        :param bus: SMBus over which the device can be reached
        :return: True if successful
        """
        bus.write_byte(i2c_addr=self._MS5837_ADDR, value=self._MS5837_RESET)

        # Wait for reset to complete
        sleep(0.01)

        self._C = []

        # Read calibration values and CRC
        for i in range(7):
            c = bus.read_word_data(self._MS5837_ADDR, self._MS5837_PROM_READ + 2 * i)
            c = ((c & 0xFF) << 8) | (c >> 8)  # SMBus is little-endian for word transfers, we need to swap MSB and LSB
            self._C.append(c)

        crc = (self._C[0] & 0xF000) >> 12
        if crc != self._crc4(self._C):
            print("PROM read error, CRC failed!")
            return False
        return True

    def write_read_command(self, bus: smbus.SMBus) -> bool:
        return True

    def read_result(self, bus: smbus.SMBus) -> float:
        return self.measure_value(bus)

    def measure_value(self, bus: smbus.SMBus) -> float:
        """
        Measures the different values
        :param bus: SMBus over which the device can be reached
        :return: Returns the different values
        """
        self.read(bus)
        # TODO here, we are returning all the different values that we get from this sensor. However, we are only expecting one. We need to find a good solution on how to differentiate these readings across the objects. Maybe save both depth and pressure, since both can be useful and directly saving the depth can be good for easier quick-assessment
        return self.depth()

    ###############################################################
    # FULLY COPIED FROM BLUEROBOTICS's CODE, just some parameters adapted and "self."  added
    def read(self, bus: smbus.SMBus, oversampling=OSR_8192) -> bool:
        """
        Take a reading of the sensor
        :param bus: SMBus over which the device can be reached
        :param oversampling: Oversampling attribute
        :return: True if successful
        """
        if oversampling < self.OSR_256 or oversampling > self.OSR_8192:
            print("Invalid oversampling option!")
            return False

        # Request D1 conversion (pressure)
        bus.write_byte(self._MS5837_ADDR, self._MS5837_CONVERT_D1_256 + 2 * oversampling)

        # Maximum conversion time increases linearly with oversampling
        # max time (seconds) ~= 2.2e-6(x) where x = OSR = (2^8, 2^9, ..., 2^13)
        # We use 2.5e-6 for some overhead
        sleep(2.5e-6 * 2 ** (8 + oversampling))

        d = bus.read_i2c_block_data(self._MS5837_ADDR, self._MS5837_ADC_READ, 3)
        self._D1 = d[0] << 16 | d[1] << 8 | d[2]

        # Request D2 conversion (temperature)
        bus.write_byte(self._MS5837_ADDR, self._MS5837_CONVERT_D2_256 + 2 * oversampling)

        # As above
        sleep(2.5e-6 * 2 ** (8 + oversampling))

        d = bus.read_i2c_block_data(self._MS5837_ADDR, self._MS5837_ADC_READ, 3)
        self._D2 = d[0] << 16 | d[1] << 8 | d[2]

        # Calculate compensated pressure and temperature
        # using raw ADC values and internal calibration
        self._calculate()

        return True

    def set_fluid_density(self, density):
        """
        Set the fluid density of the water
        :param density: Density to be set
        :return: None
        """
        self._fluidDensity = density

    def pressure(self, conversion=UNITS_mbar) -> float:
        """
        Get the pressure in the requested unit. Defaults to mBar.
        :param conversion: The unit conversion which should be used to return the value
        :return: Returns the pressure measured
        """
        return self._pressure * conversion


    def temperature(self, conversion=UNITS_Centigrade) -> float:
        """
        Temperature in requested units. Default in degrees Celsius
        :param conversion: The conversion to be applied
        :return: Returns the temperature in requested unit
        """
        deg_c = self._temperature / 100.0
        if conversion == self.UNITS_Fahrenheit:
            return (9.0 / 5.0) * deg_c + 32
        elif conversion == self.UNITS_Kelvin:
            return deg_c + 273
        return deg_c


    def depth(self) -> float:
        """
        Depth relative to MSL pressure in given fluid density
        :return: Returns depth
        """
        return (self.pressure(self.UNITS_Pa) - 101300) / (self._fluidDensity * 9.80665)


    def altitude(self) -> float:
        """
        Altitude relative to MSL pressure
        :return: Altitude relative to MSL pressure
        """
        return (1 - pow((self.pressure() / 1013.25), .190284)) * 145366.45 * .3048

        # Cribbed from datasheet

    def _calculate(self) -> None:
        OFFi = 0
        SENSi = 0
        Ti = 0

        dT = self._D2 - self._C[5] * 256
        if self._model == self.MODEL_02BA:
            SENS = self._C[1] * 65536 + (self._C[3] * dT) / 128
            OFF = self._C[2] * 131072 + (self._C[4] * dT) / 64
            self._pressure = (self._D1 * SENS / (2097152) - OFF) / (32768)
        else:
            SENS = self._C[1] * 32768 + (self._C[3] * dT) / 256
            OFF = self._C[2] * 65536 + (self._C[4] * dT) / 128
            self._pressure = (self._D1 * SENS / (2097152) - OFF) / (8192)

        self._temperature = 2000 + dT * self._C[6] / 8388608

        # Second order compensation
        if self._model == self.MODEL_02BA:
            if (self._temperature / 100) < 20:  # Low temp
                Ti = (11 * dT * dT) / (34359738368)
                OFFi = (31 * (self._temperature - 2000) * (self._temperature - 2000)) / 8
                SENSi = (63 * (self._temperature - 2000) * (self._temperature - 2000)) / 32

        else:
            if (self._temperature / 100) < 20:  # Low temp
                Ti = (3 * dT * dT) / (8589934592)
                OFFi = (3 * (self._temperature - 2000) * (self._temperature - 2000)) / 2
                SENSi = (5 * (self._temperature - 2000) * (self._temperature - 2000)) / 8
                if (self._temperature / 100) < -15:  # Very low temp
                    OFFi = OFFi + 7 * (self._temperature + 1500) * (self._temperature + 1500)
                    SENSi = SENSi + 4 * (self._temperature + 1500) * (self._temperature + 1500)
            elif (self._temperature / 100) >= 20:  # High temp
                Ti = 2 * (dT * dT) / (137438953472)
                OFFi = (1 * (self._temperature - 2000) * (self._temperature - 2000)) / 16
                SENSi = 0

        OFF2 = OFF - OFFi
        SENS2 = SENS - SENSi

        if self._model == self.MODEL_02BA:
            self._temperature = (self._temperature - Ti)
            self._pressure = (((self._D1 * SENS2) / 2097152 - OFF2) / 32768) / 100.0
        else:
            self._temperature = (self._temperature - Ti)
            self._pressure = (((self._D1 * SENS2) / 2097152 - OFF2) / 8192) / 10.0

            # Cribbed from datasheet

    def _crc4(self, n_prom):
        """
        Checksum function
        :param n_prom: ?
        :return: Checksum
        """
        n_rem = 0

        n_prom[0] = ((n_prom[0]) & 0x0FFF)
        n_prom.append(0)

        for i in range(16):
            if i % 2 == 1:
                n_rem ^= ((n_prom[i >> 1]) & 0x00FF)
            else:
                n_rem ^= (n_prom[i >> 1] >> 8)

            for n_bit in range(8, 0, -1):
                if n_rem & 0x8000:
                    n_rem = (n_rem << 1) ^ 0x3000
                else:
                    n_rem = (n_rem << 1)

        n_rem = ((n_rem >> 12) & 0x000F)

        self.n_prom = n_prom
        self.n_rem = n_rem

        return n_rem ^ 0x00

__init__(name, address=DEFAULT_ADDRESS, min_delay=1)

Initialise the class of the depth sensor

Parameters:

Name Type Description Default
name str

Name of the sensor

required
address int

I2C-Address of the sensor

DEFAULT_ADDRESS
min_delay float

Required minimum delay

1
Source code in software/sailowtech_ctd/sensors/bluerobotics.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def __init__(self, name: str, address: int = DEFAULT_ADDRESS, min_delay: float = 1):
    """
    Initialise the class of the depth sensor
    :param name: Name of the sensor
    :param address: I2C-Address of the sensor
    :param min_delay: Required minimum delay
    """
    super().__init__(Sensor.BLUEROBOTICS_BAR30_DEPTH, name, address, min_delay)
    self._model = self.MODEL_30BA
    self._C = []

    self._fluidDensity = self.DENSITY_FRESHWATER
    self._pressure = 0
    self._temperature = 0
    self._D1 = 0
    self._D2 = 0

_crc4(n_prom)

Checksum function

Parameters:

Name Type Description Default
n_prom

?

required

Returns:

Type Description

Checksum

Source code in software/sailowtech_ctd/sensors/bluerobotics.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def _crc4(self, n_prom):
    """
    Checksum function
    :param n_prom: ?
    :return: Checksum
    """
    n_rem = 0

    n_prom[0] = ((n_prom[0]) & 0x0FFF)
    n_prom.append(0)

    for i in range(16):
        if i % 2 == 1:
            n_rem ^= ((n_prom[i >> 1]) & 0x00FF)
        else:
            n_rem ^= (n_prom[i >> 1] >> 8)

        for n_bit in range(8, 0, -1):
            if n_rem & 0x8000:
                n_rem = (n_rem << 1) ^ 0x3000
            else:
                n_rem = (n_rem << 1)

    n_rem = ((n_rem >> 12) & 0x000F)

    self.n_prom = n_prom
    self.n_rem = n_rem

    return n_rem ^ 0x00

altitude()

Altitude relative to MSL pressure

Returns:

Type Description
float

Altitude relative to MSL pressure

Source code in software/sailowtech_ctd/sensors/bluerobotics.py
196
197
198
199
200
201
def altitude(self) -> float:
    """
    Altitude relative to MSL pressure
    :return: Altitude relative to MSL pressure
    """
    return (1 - pow((self.pressure() / 1013.25), .190284)) * 145366.45 * .3048

depth()

Depth relative to MSL pressure in given fluid density

Returns:

Type Description
float

Returns depth

Source code in software/sailowtech_ctd/sensors/bluerobotics.py
188
189
190
191
192
193
def depth(self) -> float:
    """
    Depth relative to MSL pressure in given fluid density
    :return: Returns depth
    """
    return (self.pressure(self.UNITS_Pa) - 101300) / (self._fluidDensity * 9.80665)

init(bus)

Initialise the device of the depth sensor

Parameters:

Name Type Description Default
bus SMBus

SMBus over which the device can be reached

required

Returns:

Type Description
bool

True if successful

Source code in software/sailowtech_ctd/sensors/bluerobotics.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def init(self, bus: smbus.SMBus) -> bool:
    """
    Initialise the device of the depth sensor
    :param bus: SMBus over which the device can be reached
    :return: True if successful
    """
    bus.write_byte(i2c_addr=self._MS5837_ADDR, value=self._MS5837_RESET)

    # Wait for reset to complete
    sleep(0.01)

    self._C = []

    # Read calibration values and CRC
    for i in range(7):
        c = bus.read_word_data(self._MS5837_ADDR, self._MS5837_PROM_READ + 2 * i)
        c = ((c & 0xFF) << 8) | (c >> 8)  # SMBus is little-endian for word transfers, we need to swap MSB and LSB
        self._C.append(c)

    crc = (self._C[0] & 0xF000) >> 12
    if crc != self._crc4(self._C):
        print("PROM read error, CRC failed!")
        return False
    return True

measure_value(bus)

Measures the different values

Parameters:

Name Type Description Default
bus SMBus

SMBus over which the device can be reached

required

Returns:

Type Description
float

Returns the different values

Source code in software/sailowtech_ctd/sensors/bluerobotics.py
108
109
110
111
112
113
114
115
116
def measure_value(self, bus: smbus.SMBus) -> float:
    """
    Measures the different values
    :param bus: SMBus over which the device can be reached
    :return: Returns the different values
    """
    self.read(bus)
    # TODO here, we are returning all the different values that we get from this sensor. However, we are only expecting one. We need to find a good solution on how to differentiate these readings across the objects. Maybe save both depth and pressure, since both can be useful and directly saving the depth can be good for easier quick-assessment
    return self.depth()

pressure(conversion=UNITS_mbar)

Get the pressure in the requested unit. Defaults to mBar.

Parameters:

Name Type Description Default
conversion

The unit conversion which should be used to return the value

UNITS_mbar

Returns:

Type Description
float

Returns the pressure measured

Source code in software/sailowtech_ctd/sensors/bluerobotics.py
165
166
167
168
169
170
171
def pressure(self, conversion=UNITS_mbar) -> float:
    """
    Get the pressure in the requested unit. Defaults to mBar.
    :param conversion: The unit conversion which should be used to return the value
    :return: Returns the pressure measured
    """
    return self._pressure * conversion

read(bus, oversampling=OSR_8192)

Take a reading of the sensor

Parameters:

Name Type Description Default
bus SMBus

SMBus over which the device can be reached

required
oversampling

Oversampling attribute

OSR_8192

Returns:

Type Description
bool

True if successful

Source code in software/sailowtech_ctd/sensors/bluerobotics.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def read(self, bus: smbus.SMBus, oversampling=OSR_8192) -> bool:
    """
    Take a reading of the sensor
    :param bus: SMBus over which the device can be reached
    :param oversampling: Oversampling attribute
    :return: True if successful
    """
    if oversampling < self.OSR_256 or oversampling > self.OSR_8192:
        print("Invalid oversampling option!")
        return False

    # Request D1 conversion (pressure)
    bus.write_byte(self._MS5837_ADDR, self._MS5837_CONVERT_D1_256 + 2 * oversampling)

    # Maximum conversion time increases linearly with oversampling
    # max time (seconds) ~= 2.2e-6(x) where x = OSR = (2^8, 2^9, ..., 2^13)
    # We use 2.5e-6 for some overhead
    sleep(2.5e-6 * 2 ** (8 + oversampling))

    d = bus.read_i2c_block_data(self._MS5837_ADDR, self._MS5837_ADC_READ, 3)
    self._D1 = d[0] << 16 | d[1] << 8 | d[2]

    # Request D2 conversion (temperature)
    bus.write_byte(self._MS5837_ADDR, self._MS5837_CONVERT_D2_256 + 2 * oversampling)

    # As above
    sleep(2.5e-6 * 2 ** (8 + oversampling))

    d = bus.read_i2c_block_data(self._MS5837_ADDR, self._MS5837_ADC_READ, 3)
    self._D2 = d[0] << 16 | d[1] << 8 | d[2]

    # Calculate compensated pressure and temperature
    # using raw ADC values and internal calibration
    self._calculate()

    return True

set_fluid_density(density)

Set the fluid density of the water

Parameters:

Name Type Description Default
density

Density to be set

required

Returns:

Type Description

None

Source code in software/sailowtech_ctd/sensors/bluerobotics.py
157
158
159
160
161
162
163
def set_fluid_density(self, density):
    """
    Set the fluid density of the water
    :param density: Density to be set
    :return: None
    """
    self._fluidDensity = density

temperature(conversion=UNITS_Centigrade)

Temperature in requested units. Default in degrees Celsius

Parameters:

Name Type Description Default
conversion

The conversion to be applied

UNITS_Centigrade

Returns:

Type Description
float

Returns the temperature in requested unit

Source code in software/sailowtech_ctd/sensors/bluerobotics.py
174
175
176
177
178
179
180
181
182
183
184
185
def temperature(self, conversion=UNITS_Centigrade) -> float:
    """
    Temperature in requested units. Default in degrees Celsius
    :param conversion: The conversion to be applied
    :return: Returns the temperature in requested unit
    """
    deg_c = self._temperature / 100.0
    if conversion == self.UNITS_Fahrenheit:
        return (9.0 / 5.0) * deg_c + 32
    elif conversion == self.UNITS_Kelvin:
        return deg_c + 273
    return deg_c

Types

Sensor

Bases: StrEnum

Types of all the available sensors. Can then be used in other parts of the software, for example in the config file parsing.

Source code in software/sailowtech_ctd/sensors/types.py
 4
 5
 6
 7
 8
 9
10
11
12
class Sensor(StrEnum):
    """
    Types of all the available sensors. Can then be used in other parts of the software, for example in the config file parsing.
    """
    BLUEROBOTICS_BAR30_DEPTH = auto()
    ATLAS_EZO_CONDUCTIVITY = auto()
    ATLAS_EZO_DO = auto()
    ATLAS_EZO_TEMP = auto()
    MOCK_SENSOR = auto()