When almost a year ago I watched interesting course Principles of Reactive Programming, I found that many examples of FRP are about circuits and wires. I tried to do something similar to course examples on arduino, but nothing good came, mostly because low expressiveness of Wiring.
And now I trying to do it with pyboard and my microasync library.
First of all, I wrote simple actor (not a really actor, but something similar) for bicolor LED:
from microasync.async import as_chan, do_all
from microasync.device import get_output_pin
@as_chan(Channel) # now this function returns channel
def get_bicolor_led(chan, left, right): # `channel` is this actor "mailbox"
left_pin = get_output_pin(left) # returns channel for a pin, when we put 1 in that
right_pin = get_output_pin(right) # channel then voltage will be set to 3.3V; when we put 0 - 0V
while True:
msg = yield chan.get() # receive message from a channel
if msg == 'red':
yield do_all(left_pin.put(1), # do commands sequentially
right_pin.put(0))
elif msg == 'green':
yield do_all(left_pin.put(0),
right_pin.put(1))
elif msg == 'yellow':
yield do_all(left_pin.put(1),
right_pin.put(1))
elif msg == 'none':
yield do_all(left_pin.put(0),
right_pin.put(0))
This actor is simple to use, for changing color of LED we just need to send (put in channel)
red
, green
, yellow
or none
:
from microasync.async import coroutine
@coroutine
def test_bicolor_led():
led = get_bicolar_led('X1', 'X2')
yield led.put('red') # switch LED color to red
yield led.put('green') # switch LED color to green
yield led.put('yellow') # switch LED color to yellow (red and green together)
yield led.put('none') # turn off LED
Then I created simple filter
for channel, which can be toggled by button on pyboard:
from microasync.async import as_chan, select
from microasync.device import get_switch
@as_chan(Channel)
def switchable_filter(chan, orig_chan, fn):
# Returns channel from which we can get values from switch and this actor
# "mailbox", works almost like `select` from golang:
select_ch = select(get_switch(), chan)
enabled = False
while True:
result_ch, val = yield select_ch.get()
if result_ch == chan:
if not enabled or fn(val): # apply filter if filter enabled
yield orig_chan.put(val)
else:
enabled = not enabled # toggle filter state
Now I created simple coroutine, which sends red
, green
, yellow
and none
to two LEDs sequentially in loop. And when we click button on pyboard we
toggle filter, which passess all messages except red
to the first LED
and only red
to the second LED:
from microasync.async import coroutine
@coroutine
def main():
first_led = switchable_filter(get_bicolor_led('X1', 'X2'), # creates bicolor led and applies filter
lambda msg: msg != 'red')
second_led = switchable_filter(get_bicolor_led('X3', 'X4'),
lambda msg: msg == 'red')
while True:
for led in (first_led, second_led):
for mode in ('red', 'green', 'yellow', 'none'):
yield led.put(mode) # sends red, green, yellow, none
Full source code of example available on github. And video with result:
* not a really FRP, but almost follows The Reactive Manifesto