Skip to content

Commit

Permalink
replace core.reader fixture factory w/function
Browse files Browse the repository at this point in the history
  • Loading branch information
avaldebe committed Dec 8, 2022
1 parent c129192 commit 2337c99
Showing 1 changed file with 49 additions and 94 deletions.
143 changes: 49 additions & 94 deletions tests/core/reader/test_SensorReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from pms import SensorWarmingUp, SensorWarning
from pms.core.reader import SensorReader, UnableToRead
from pms.core.sensor import Sensor


@pytest.fixture
Expand Down Expand Up @@ -110,64 +111,47 @@ def passive_read(n):
)


@pytest.fixture
def sensor_reader_factory(monkeypatch, mock_sensor):
def factory(
*,
@pytest.fixture()
def reader(monkeypatch, mock_sensor) -> SensorReader:
reader = SensorReader(
"PMSx003", # match with stubs
mock_sensor.port,
samples=0, # exit immediately
interval=None,
sensor="PMSx003", # match with stubs
max_retries=None,
):
sensor_reader = SensorReader(
port=mock_sensor.port,
samples=samples,
interval=interval,
sensor=sensor,
timeout=0.01, # low to avoid hanging on failure
max_retries=max_retries,
)

# https://github.com/pyserial/pyserial/issues/625
monkeypatch.setattr(
sensor_reader.serial,
"flush",
lambda: None,
)

sensor_reader.pre_heat = 0 # disable any preheat
timeout=0.01, # low to avoid hanging on failure
)

return sensor_reader
# https://github.com/pyserial/pyserial/issues/625
monkeypatch.setattr(reader.serial, "flush", lambda: None)

return factory
reader.pre_heat = 0 # disable any preheat

return reader

def test_sensor_reader(mock_sensor, sensor_reader_factory):
sensor_reader = sensor_reader_factory()

with sensor_reader:
obs = tuple(sensor_reader())
def test_reader(reader: SensorReader, mock_serial):
with reader:
obs = tuple(reader())

# check warm up happened
assert mock_sensor.stubs["wake"].called
assert mock_sensor.stubs["passive_mode"].called
assert mock_serial.stubs["wake"].called
assert mock_serial.stubs["passive_mode"].called

# check data was read
assert len(obs) == 1
assert obs[0].pm10 == 11822

# check sleep happened
assert mock_sensor.stubs["sleep"].called
assert mock_serial.stubs["sleep"].called


def test_sensor_reader_sleep(sensor_reader_factory, mock_sleep):
sensor_reader = sensor_reader_factory(
samples=2, # try to read twice
interval=5, # sleep between samples
)
def test_reader_sleep(reader: SensorReader, mock_sleep):
reader.samples = 2 # try to read twice
reader.interval = 5 # sleep between samples

with sensor_reader:
obs = tuple(sensor_reader())
with reader:
obs = tuple(reader())

# check we read twice
assert len(obs) == 2
Expand All @@ -176,105 +160,76 @@ def test_sensor_reader_sleep(sensor_reader_factory, mock_sleep):
assert 0 < mock_sleep.slept_for < 5


def test_sensor_reader_closed(mock_sensor, sensor_reader_factory):
sensor_reader = sensor_reader_factory()
obs = tuple(sensor_reader())
def test_reader_closed(reader: SensorReader):
obs = tuple(reader())
assert len(obs) == 0


def test_sensor_reader_preheat(sensor_reader_factory, mock_sleep):
sensor_reader = sensor_reader_factory()

# override pre heat duration
sensor_reader.pre_heat = 5
def test_reader_preheat(reader: SensorReader, mock_sleep):
reader.pre_heat = 5 # override pre heat duration

with sensor_reader:
with reader:
pass

# check we slept between reads
assert mock_sleep.slept_for == 5


def test_sensor_reader_warm_up(
mock_sensor,
sensor_reader_factory,
mock_sleep,
mock_sensor_warm_up,
):
sensor_reader = sensor_reader_factory()

with sensor_reader:
obs = tuple(sensor_reader())
def test_reader_warm_up(reader: SensorReader, mock_sleep, mock_sensor_warm_up):
with reader:
obs = tuple(reader())

# check we slept for warm up
assert mock_sleep.slept_for == 5
assert len(obs) == 1


def test_sensor_reader_warm_up_exhaust_retries(
mock_sensor,
sensor_reader_factory,
mock_sensor_warm_up,
):
sensor_reader = sensor_reader_factory(max_retries=0)
def test_reader_warm_up_exhaust_retries(reader: SensorReader, mock_sensor_warm_up):
reader.max_retries = 0

with sensor_reader:
with reader:
with pytest.raises(SensorWarmingUp):
next(sensor_reader())

next(reader())

def test_sensor_reader_temp_failure(
mock_sensor,
sensor_reader_factory,
mock_sensor_temp_failure,
):
sensor_reader = sensor_reader_factory()

with sensor_reader:
obs = tuple(sensor_reader())
def test_reader_temp_failure(reader: SensorReader, mock_serial, mock_sensor_temp_failure):
with reader:
obs = tuple(reader())

# check one sample still acquired
assert len(obs) == 1

# check two samples were attempted
assert mock_sensor.stubs["passive_read"].calls == 2
assert mock_serial.stubs["passive_read"].calls == 2


def test_sensor_reader_temp_failure_exhaust_retries(
mock_sensor,
sensor_reader_factory,
mock_sensor_temp_failure,
):
sensor_reader = sensor_reader_factory(max_retries=0)
def test_reader_temp_failure_exhaust_retries(reader: SensorReader, mock_sensor_temp_failure):
reader.max_retries = 0

with sensor_reader:
with reader:
with pytest.raises(SensorWarning):
next(sensor_reader())

next(reader())

def test_sensor_reader_sensor_mismatch(mock_sensor, sensor_reader_factory):
sensor_reader = sensor_reader_factory()

mock_sensor.stub(
def test_reader_sensor_mismatch(reader: SensorReader, mock_serial):
mock_serial.stub(
name="passive_mode", # used for validation
receive_bytes=b"BM\xe1\x00\x00\x01p",
send_bytes=b"123", # nonsense
)

with pytest.raises(UnableToRead) as e:
with sensor_reader as r:
with reader as r:
pass

assert "failed validation" in str(e.value)


def test_sensor_reader_sensor_no_response(sensor_reader_factory):
sensor_reader = sensor_reader_factory(
sensor="PMS3003", # arbitrary sensor
)
def test_reader_sensor_no_response(reader: SensorReader):
reader.sensor = Sensor["PMS3003"] # wrong sensor

with pytest.raises(UnableToRead) as e:
with sensor_reader:
with reader:
pass

assert "did not respond" in str(e.value)

0 comments on commit 2337c99

Please sign in to comment.