...
/Solution: Build a Sensor Monitoring System
Solution: Build a Sensor Monitoring System
Learn how to implement the coded solution of a sensor data monitoring system using concurrency.
Implement sensor simulation
Define sensor classes (TemperatureSensor
, PressureSensor
, and VibrationSensor
) that simulate sensor data and detect anomalies, producing alerts based on specific thresholds and tracking last alerts and counts.
Press + to interact
import randomimport timeclass Sensor:def __init__(self, sensor_id):self.sensor_id = sensor_idself.last_alert = Noneself.alert_count = 0def generate_data(self):raise NotImplementedError("Subclasses must implement generate_data method")def process_data(self, data):raise NotImplementedError("Subclasses must implement process_data method")class TemperatureSensor(Sensor):def generate_data(self):temperature = random.uniform(20, 30) # Simulate temperature readingsreturn {"sensor_id": self.sensor_id, "temperature": temperature}def process_data(self, data):temperature = data["temperature"]if temperature > 28 and self.last_alert != "high_temperature":self.last_alert = "high_temperature"self.alert_count += 1return f"High temperature detected! Sensor {data['sensor_id']} reading: {temperature:.2f}°C"elif temperature <= 28 and self.last_alert == "high_temperature":self.last_alert = Nonereturn "Temperature back to normal."class PressureSensor(Sensor):def generate_data(self):pressure = random.uniform(100, 150) # Simulate pressure readingsreturn {"sensor_id": self.sensor_id, "pressure": pressure}def process_data(self, data):pressure = data["pressure"]if pressure > 130 and self.last_alert != "high_pressure":self.last_alert = "high_pressure"self.alert_count += 1return f"High pressure detected! Sensor {data['sensor_id']} reading: {pressure:.2f} psi"elif pressure <= 130 and self.last_alert == "high_pressure":self.last_alert = Nonereturn "Pressure back to normal."class VibrationSensor(Sensor):def generate_data(self):vibration = random.uniform(0.5, 2.5) # Simulate vibration readingsreturn {"sensor_id": self.sensor_id, "vibration": vibration}def process_data(self, data):vibration = data["vibration"]if vibration > 2.0 and self.last_alert != "excessive_vibration":self.last_alert = "excessive_vibration"self.alert_count += 1return f"Excessive vibration detected! Sensor {data['sensor_id']} reading: {vibration:.2f}"elif vibration <= 2.0 and self.last_alert == "excessive_vibration":self.last_alert = Nonereturn "Vibration back to normal."
Code explanation
-
Sensor
class:-
Line–8: Initialized a
Sensor
object with a uniquesensor_id
. Also, initialized properties for tracking the last alert type (last_alert
) and the count of alerts (alert_count
). -
Line 10–11: Created an abstract method that must be implemented by subclasses. It generates simulated sensor data specific to the ...
-