Pulse Meter
So I recently setup a microcontroller (using ESPHome, this repository in particular) to measure the pulses my electricity meter outputs. It's been working ok, but I fear that the wifi connectivity in my basement is terrible and that the microcontroller is missing pulses1 due to this.
I located another SBC2 intending to use it as a small hotspot, and place it close to the microcontroller to improve things, but realized that I couldn't power it reliably with my cheap PoE to USB-adapter for that board in particular, that it was tricky to power things in that part of the basement using anything over than PoE (having a power outlet near the electricity meter, how absurd?!)…so I took a Raspberry Pi 4 instead (as the PoE to USB-C adapters I have are reliable), and spent an hour or so to write some Python.
Below is the script I came up with. I was fearing/hoping that this project would have me learn to use system calls like select, or kqueue/epoll, but it turned out that gpiozero is an excellent library for tasks such as this!
The first working version was much shorter, but I decided to stop juggling global variables for state variables everywhere and just tidy it up a bit. I'm just using the distribution provided versions of gpiozero & paho-mqtt due to lazyness.
# apt install python3-paho-mqtt python3-gpiozero
# https://forum.micropython.org/viewtopic.php?t=7224
# https://github.com/sjmelia/pi-electricity-monitor/blob/master/monitor.py
# https://www.vishay.com/resistors/pulse-energy-calculator/
# https://learn.openenergymonitor.org/electricity-monitoring/pulse-counting/introduction-to-pulse-counting
from paho.mqtt import client as mqtt_client
from gpiozero import LED, DigitalInputDevice
from signal import pause
import random, time, os, logging, json, systemd.daemon
MQTT_TOPIC = "test/test"
MQTT_BROKER = "mqtt.local"
MQTT_PORT = 1883
MQTT_CLIENT_ID = f'python-mqtt-{random.randint(0, 1000)}'
MQTT_USER = "test"
MQTT_PASSWORD = "test"
# Mandatory, which PIN is the photodiode connected to?
PD_PIN = 23
# This was used when writing this script, set to true and connect a
# LED to the correct pin
BLINK_LED = os.environ.get("BLINK_LED", False)
LED_PIN = 24 # Optional
class PulseMeter(object):
def __init__(self, mqtt_topic=MQTT_TOPIC, pulse_rate=1000):
self.last_pulse = False
self.pulse_start_time = False
self.delta = 0
self.current_power = 0.0
self.pulse_rate = pulse_rate
self.mqtt_topic = mqtt_topic
self.mqtt_client = self.connect_mqtt()
self.mqtt_client.loop_start()
def connect_mqtt(self):
def on_connect(client, userdata, flags, rc):
logging.debug(f"broker: {MQTT_BROKER}, port: {MQTT_PORT}, client id: {MQTT_CLIENT_ID}")
if rc == 0:
logging.info("Connected to MQTT Broker!")
else:
logging.warn("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(MQTT_CLIENT_ID)
client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
client.on_connect = on_connect
client.connect(MQTT_BROKER, MQTT_PORT)
return client
def pulse_start(self):
# This isn't even used, but this looks cooler than just having
# a pass call here. :)
self.pulse_start_time = time.time()
def pulse_end(self):
# No last time? Probably in startup, ignore and wait for the
# next pulse...
if not self.last_pulse:
self.last_pulse = time.time()
return
current_time = time.time()
self.delta = current_time - self.last_pulse
self.current_power = 3600 / ( self.delta * self.pulse_rate )
self.last_pulse = current_time
logging.debug(f"power: {self.current_power:.3f} kWh delta: {self.delta}")
self.publish_current_power()
def publish_current_power(self, precision=3):
# JSON due to reasons
output = dict([("power", round(self.current_power, precision))])
logging.debug(f"Trying to publish: {output}")
result = self.mqtt_client.publish(self.mqtt_topic, json.dumps(output))
status = result[0]
if status != 0:
logging.warn(f"Failed to send message to topic {self.mqtt_topic}")
def run():
logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO"))
pm = PulseMeter()
led = LED(LED_PIN)
pd = DigitalInputDevice(PD_PIN)
pd.when_activated = pm.pulse_start
pd.when_deactivated = pm.pulse_end
if BLINK_LED:
led.blink(on_time=0.01, off_time=1.25)
systemd.daemon.notify('READY=1')
pause()
if __name__ == "__main__":
run()
systemd unit file
Here's a systemd unit file, place it in /etc/systemd/system/pulse-meter.service
or such. I stole this from torfsen/python-systemd-tutorial on GitHub.
[Unit]
Description=Pulse metering service
[Service]
# CHANGE ME
ExecStart=/usr/bin/python3 /path/to/pulse-meter.py
# Disable Python's buffering of STDOUT and STDERR, so that output from the
# service shows up immediately in systemd's logs
Environment=PYTHONUNBUFFERED=1
Restart=on-failure
Type=notify
# /dev/gpio seems to be owned by root/root
# User=python_demo_service
[Install]
WantedBy=default.target
Enable it:
# systemctl daemon-reload
# systemctl enable --now pulse-meter.service
# journalctl -u pulse-meter.service
...
Nov 13 17:59:21 ubuntu systemd[1]: Starting Pulse metering service...
Nov 13 17:59:22 ubuntu python3[11487]: INFO:root:Connected to MQTT Broker!
Nov 13 17:59:22 ubuntu systemd[1]: Started Pulse metering service.
Now I just have to figure out how reliable this is. That shouldn't be so hard, right? (-:
Yes, I've stared at it and counted each pulse from the electricity meter and counted the 'echoes' from the microcontroller setup. They didn't match up, and I've been too lazy to read the actual source code for the protobuf based protocol that is used for reporting to Home Assistant.
An ASUS Tinkerboard, infamous for being hard to power with micro-USB, which ironically also is its primary power source. It's generally recommeded to power it through its VCC pin instead.