Skip to content

Commit

Permalink
Add test to cover SensorReader warm up exception
Browse files Browse the repository at this point in the history
  • Loading branch information
benthorner authored and avaldebe committed Nov 8, 2022
1 parent 3a2bb66 commit 8bc41f9
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/pms/core/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def __call__(self, *, raw: Optional[bool] = None):

try:
obs = self.sensor.decode(buffer)
except (SensorWarmingUp, InconsistentObservation) as e: # pragma: no cover
except (SensorWarmingUp, InconsistentObservation) as e:
logger.debug(e)
time.sleep(5)
except SensorWarning as e: # pragma: no cover
Expand Down
53 changes: 53 additions & 0 deletions tests/core/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,31 @@ def mock_sensor(mock_serial):
return mock_serial


@pytest.fixture
def mock_sensor_warm_up(mock_serial):
def passive_read(n):
if n == 1:
# first return a "0" payload ("warming up")
return (
b"BM\x00\x1c" # expected header
+ b"\0" * 26 # payload (to total 32 bytes)
+ b"\x00\xAB" # checksum
)
else:
# then behave like the original stub again
return (
b"BM\x00\x1c" # expected header
+ b".........................." # payload
+ b"\x05W" # checksum
)

mock_serial.stub(
name="passive_read",
receive_bytes=b"BM\xe2\x00\x00\x01q",
send_fn=passive_read,
)


def test_sensor_reader(mock_sensor, monkeypatch):
sensor_reader = reader.SensorReader(
port=mock_sensor.port,
Expand Down Expand Up @@ -180,6 +205,34 @@ def test_sensor_reader_preheat(mock_sensor, monkeypatch, mock_sleep):
assert mock_sleep.slept_for == 5


def test_sensor_reader_warm_up(
mock_sensor,
monkeypatch,
mock_sleep,
mock_sensor_warm_up,
):
sensor_reader = reader.SensorReader(
port=mock_sensor.port,
samples=0,
sensor="PMSx003", # match with stubs
timeout=0.01, # low to avoid hanging on failure
)

# https://github.com/pyserial/pyserial/issues/625
monkeypatch.setattr(
sensor_reader.serial,
"flush",
lambda: None,
)

with sensor_reader as r:
obs = list(r())

# check we slept for warm up
assert mock_sleep.slept_for == 5
assert len(obs) == 1


def test_sensor_reader_sensor_mismatch(mock_sensor, monkeypatch):
sensor_reader = reader.SensorReader(
port=mock_sensor.port,
Expand Down

0 comments on commit 8bc41f9

Please sign in to comment.