Decorators with arguments?
Christopher de Vidal
cbdevidal.jk1 at gmail.com
Mon May 25 18:18:50 EDT 2020
Peter Otten, Cameron Simpson, thank you for your detailed replies :-) I
confess, I didn't quite understand all you were saying. (Still only an
intermediate-level programmer.) But Cameron what you said questioning my
use of decorators and maybe a class instead got me thinking. I realized
what I needed was a function within a function. Couldn't have gotten there
without your help. Working code:
#!/usr/bin/env python3
from google.cloud import firestore
import firebase_admin
from firebase_admin import credentials
import json
import mqtt
from time import sleep
def bridge(col_name):
def on_snapshot(col_snapshot, changes, read_time):
data = dict()
for doc in col_snapshot:
serial = doc.id
data[serial] = json.dumps(doc.to_dict()['value'])
for change in changes:
serial = change.document.id
mqtt_topic = col_name + '/outgoing/' + serial
if change.type.name in ['ADDED', 'MODIFIED']:
contents = data[serial]
mqtt.publish(mqtt_topic, contents)
elif change.type.name == 'REMOVED':
mqtt.publish(mqtt_topic, '')
@mqtt.incoming
def mqtt_subscribe(serial, value):
# TODO Passing a None entry to delete from MQTT doesn't trigger
this
# callback, so it doesn't delete from Firestore. Need this ugly
# workaround 'clear_mqtt'.
if value == 'clear_mqtt':
value = None
mqtt.publish(col_name + '/incoming/' + serial, None)
mqtt.publish(col_name + '/outgoing/' + serial, None)
db.collection(col_name).document(serial).set({'value': value})
col_watch = db.collection(col_name).on_snapshot(on_snapshot)
mqtt.subscribe(col_name + '/incoming/#', mqtt_subscribe)
return col_watch
cred = credentials.Certificate("certs/firebase.json")
firebase_admin.initialize_app(cred)
db = firestore.Client()
mqtt.connect()
adapters = list()
for collection in ['door_status', 'cpu_temp']:
adapters.append(bridge(collection))
while True:
sleep(1)
for adapter in adapters:
adapter.unsubscribe()
Christopher de Vidal
Would you consider yourself a good person? Have you ever taken the 'Good
Person' test? It's a fascinating five minute quiz. Google it.
On Fri, May 15, 2020 at 9:55 AM Peter Otten <__peter__ at web.de> wrote:
> Christopher de Vidal wrote:
>
> > Help please? Creating an MQTT-to-Firestore bridge and I know a decorator
> > would help but I'm stumped how to create one. I've used decorators before
> > but not with arguments.
> >
> > The Firestore collection.on_snapshot() method invokes a callback and
> sends
> > it three parameters (collection_snapshot, changes, and read_time). I need
> > the callback to also know the name of the collection so that I can
> publish
> > to the equivalent MQTT topic name. I had thought to add a fourth
> parameter
> > and I believe a decorator is the right approach but am stumped how to add
> > that fourth parameter. How would I do this with the code below?
> >
> > #!/usr/bin/env python3
> > from google.cloud import firestore
> > import firebase_admin
> > from firebase_admin import credentials
> > import json
> > import mqtt
> >
> >
>
> firebase_admin.initialize_app(credentials.Certificate("certs/firebase.json"))
> > db = firestore.Client()
> > mqtt.connect()
> >
> >
> > def load_json(contents):
> > try:
> > return json.loads(contents)
> > except (json.decoder.JSONDecodeError, TypeError):
> > return contents
> >
> >
> > def on_snapshot(col_name, col_snapshot, changes, read_time):
> > data = dict()
> > for doc in col_snapshot:
> > serial = doc.id
> > contents = load_json(doc.to_dict()['value'])
> > data[serial] = contents
> > for change in changes:
> > serial = change.document.id
> > mqtt_topic = col_name + '/' + serial
> > contents = data[serial]
> > if change.type.name in ['ADDED', 'MODIFIED']:
> > mqtt.publish(mqtt_topic, contents)
> > elif change.type.name == 'REMOVED':
> > mqtt.publish(mqtt_topic, None)
> >
> >
> > # Start repeated code section
> > # TODO Better to use decorators but I was stumped on how to pass
> arguments
> > def door_status_on_snapshot(col_snapshot, changes, read_time):
> > on_snapshot('door_status', col_snapshot, changes, read_time)
> >
> >
> > door_status_col_ref = db.collection('door_status')
> > door_status_col_watch =
> > door_status_col_ref.on_snapshot(door_status_on_snapshot)
> >
> > # Repetition...
> > def cpu_temp_on_snapshot(col_snapshot, changes, read_time):
> > on_snapshot('cpu_temp', col_snapshot, changes, read_time)
> >
> >
> > cpu_temp_col_ref = db.collection('cpu_temp')
> > cpu_temp_col_watch = cpu_temp_col_ref.on_snapshot(cpu_temp_on_snapshot)
> > # End repeated code section
> >
> > # Start repeated code section
> > door_status_col_watch.unsubscribe()
> > cpu_temp_col_watch.unsubscribe()
> > # Repetition...
> > # End repeated code section
> >
> > Christopher de Vidal
>
> You might also consider a contextmanager:
>
> https://docs.python.org/3/library/contextlib.html
>
> # untested
>
> @contextmanager
> def subscribe(name, col_snapshot, changes, read_time):
> def status_on_snapshot(col_snapshot, changes, read_time):
> on_snapshot(name, col_snapshot, changes, read_time)
>
> status_col_ref = db.collection(name)
> status_col_watch = status_col_ref.on_snapshot(door_status_on_snapshot)
> try:
> yield status_col_ref
> finally:
> status_col_watch.unsubscribe()
>
>
> with subscribe("door_status", ...) as door_status_col_ref:
> with subscribe("cpu_temp", ...) as cpu_temp_col_ref:
> ...
>
>
> If there are many uniform ones the nested with statements can be
> generalized:
>
> NAMES = "door_status", "cpu_temp", ...
> with ExitStack() as stack:
> col_refs = [
> stack.enter_context(subscribe(name)) for name in NAMES
> ]
>
> And if you like Camoron's suggestion or the subscribe() generator above
> just
> gets too unwieldy: a custom class can act as a contextmanager, too.
>
> https://docs.python.org/3/reference/compound_stmts.html#with
>
> --
> https://mail.python.org/mailman/listinfo/python-list
>
More information about the Python-list
mailing list