# Reactive Programming With RxPY

## Motivation
Let's build an API server which can send data to clients as soon as it arrives.

You might say that's trivial - but it is not: data may arrive, e.g.

- not en bloc but in parts, like from
    - "follow tailed" files, or
    - stream subscriptions, e.g. via websockets, or
    - a server implemented as shown here, i.e. itself streaming
- from jobs of other clients (but still interesting for us)
- from different systems, with vastly varying response times

In all these cases we want to update the client partially, as soon as new data is present on the server, w/o having to wait for the "full" result (if that even exists).
**Important**: We also do not want to constantly poll the server, but passively wait for new data, with the server being able to push to all interested parties once it is there.

The operationally simplest solution is not via websockets but via sending the responses as chunks.
## Solution Stack

- Flask as (not so) microframework
- gevent as async framework
- ReactiveX
    - for a far more high level coordination of jobs and coroutines, compared to using gevent's low level machinery
    - in order to declaratively parametrize the server's async behaviour via high level coordination functions
## Packages

```shell
$ for p in rx gevent flask; do pip show $p || pip install $p; done
Name: Rx
Version: 3.2.0
Summary: Reactive Extensions (Rx) for Python
Home-page: http://reactivex.io
Author: Dag Brattli
Author-email: dag@brattli.net
License: MIT License
Location: /home/gk/miniconda3/envs/blog_py3.9/lib/python3.9/site-packages
Requires:
Required-by:
Name: gevent
Version: 21.12.0
Summary: Coroutine-based network library
Home-page: http://www.gevent.org/
Author: Denis Bilenko
Author-email: denis.bilenko@gmail.com
License: MIT
Location: /home/gk/miniconda3/envs/blog_py3.9/lib/python3.9/site-packages
Requires: greenlet, setuptools, zope.event, zope.interface
Required-by:
Name: Flask
Version: 2.0.3
Summary: A simple framework for building complex web applications.
Home-page: https://palletsprojects.com/p/flask
Author: Armin Ronacher
Author-email: armin.ronacher@active-4.com
License: BSD-3-Clause
Location: /home/gk/miniconda3/envs/blog_py3.9/lib/python3.9/site-packages
Requires: click, itsdangerous, Jinja2, Werkzeug
Required-by:
```
## Tools

We want to carefully inspect what's going on on the server. For this, any invocation of print will also show:

- milliseconds since process start
- thread name (shortened)
- colors, according to a given `who` integer (so we can see who initiated an event)

A little test, one w/o and one with the gevent monkey patch:
```shell
$ python tests/rx/tools.py
    5 M [S] msg 1
   56 M [2] async msg
   56 M [3] async msg
   56 M [S] async msg
  106 M [4] msg 3

$ python tests/rx/tools.py patched
  115 M [S] msg 1
  166 1 [2] async msg
  166 2 [3] async msg
  166 3 [S] async msg
  216 M [4] msg 3
```
- `M` is the MainThread; numbers like `2` in the second column are greenlet numbers. You can see that w/o the monkey patch, gevent actually coordinates everything on the main thread of the process.
- The `[S]` symbol is printed for `who=0` (the default - here: the server).
- Colors depend on the `who` argument.
- You can see that the (pretty complex) monkey patching of all(!) blocking calls consumes well over 100 milliseconds of startup time.
Console Printer Source - the tests are done in the main section:

`$ cat tests/rx/tools.py`

```python
import time
import sys
from threading import current_thread

import gevent

now = lambda: int(time.time() * 1000)
t0 = now()
dt = lambda: now() - t0
thread = lambda: current_thread().name.replace('Dummy-', '').replace('MainThread', 'M')
gray = '\x1b[0;38;5;245m%s\x1b[0m'


def print(*a, p=print, who=0, **kw):
    pre = '%s %2s ' % (gray % ('%5s' % dt()), thread())
    msg = ' '.join([str(i) for i in a])
    msg = '\x1b[3%sm[%s] %s\x1b[0m' % (who + 1, who or 'S', msg)
    p(pre + msg, **kw)
    return a[0]


# -------------------- quick test follows -------------------------------------------
if __name__ == '__main__':
    from gevent import monkey

    sleep, asyn = gevent.sleep, gevent.spawn
    if sys.argv[-1] == 'patched':
        monkey.patch_all()
        assert time.sleep == gevent.sleep  # since patched
    print('msg 1')

    def f(who):
        sleep(0.05)
        print('async msg', who=who)

    asyn(f, 2)
    asyn(f, 3)
    asyn(f, 0)
    sleep(0.1)
    print('msg 3', who=4)
```
## Server

### Intro: Async HTTP processing with greenlets
Any webserver based on gevent will allocate one greenlet per incoming request.
Therefore we can block within the request handler w/o affecting other clients:

```python
def handle_get(job):
    # runs within a request specific greenlet:
    # blocks for this client's request, but serves others
    res = requests.get("<external api>").text
    return res
```
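That property can be demonstrated in isolation - a small sketch with `gevent.sleep` standing in for a blocking backend call (durations arbitrary): two "handlers" block for different times, yet neither delays the other:

```python
import time

import gevent


def handler(client, block_s):
    # gevent.sleep yields to the hub - just like any monkey patched
    # blocking call inside a real request handler would
    gevent.sleep(block_s)
    return 'client %s' % client


t0 = time.time()
# the slower 'request' is spawned first, yet does not delay the faster one:
g1 = gevent.spawn(handler, 1, 0.2)
g2 = gevent.spawn(handler, 2, 0.1)
gevent.joinall([g1, g2])
elapsed = time.time() - t0

print(g1.value, g2.value)  # -> client 1 client 2
print(elapsed < 0.3)       # True: they blocked concurrently, not for 0.2 + 0.1
```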
So far, the job processing, incl. sending the full response, was a direct consequence of the client request event.

We now have to solve:

- how to send data in parts (chunks) to the API clients (browsers, others)
- how to allow any event handler on the server to send a data chunk at
    - any time
    - to any client interested in it

Here is how this is done, framework-API-wise: https://bottlepy.org/docs/dev/async.html. Please read it - it is pretty concise.
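The chunk mechanism itself, reduced to a minimal Flask sketch (route and payloads invented): everything yielded by a generator response body leaves the server as its own chunk:

```python
from flask import Flask, Response

app = Flask(__name__)


@app.route('/stream')
def stream():
    def gen():
        for i in range(3):
            # each yield is sent to the client as one chunk, immediately
            yield 'part %s\r\n' % i

    return Response(gen(), mimetype='text/plain')


# exercise it without a real socket:
body = app.test_client().get('/stream').get_data(as_text=True)
print(body.splitlines())  # -> ['part 0', 'part 1', 'part 2']
```

In the real server below, the generator is not a `for` loop but a queue that any event handler can feed at any time.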
### Server Code

We want to build a server which we can declaratively reparametrize in how it coordinates the events.

Currently we have two event sources:

- the clients (the requests they send)
- the side effects we trigger based on those requests, e.g. external websocket responses, file reading chunks, ...

We want to completely decouple those event streams and therefore create two subjects: one for jobs, one for response (parts):
Async Server Source

`$ cat tests/rx/server.py`

```python
#!/usr/bin/env python
import json
import sys
import time
from functools import partial

import gevent
import gevent.queue
import rx as Rx
from flask import Flask, stream_with_context
from gevent import monkey
from gevent.pywsgi import WSGIServer
from rx import operators as rx
from rx.scheduler.eventloop import GEventScheduler as GS
from tools import gevent, now, print
from werkzeug.debug import DebuggedApplication
from werkzeug.serving import run_with_reloader

sys.path.append(__file__.rsplit('/', 1)[0])

monkey.patch_all()

app = Flask(__name__)
GS = GS(gevent)

jobs = Rx.subject.Subject()
results = Rx.subject.Subject()

flush_response = lambda q: q.put(StopIteration)  # closing the socket
wait = lambda dt: time.sleep(dt / 1000.0)
asyn = lambda: rx.observe_on(GS)  # put pipeline onto a new greenlet


class J:
    """Job Running Pipeline Functions"""

    def _run_job(job):
        """synchronous job running"""
        # how is the job (transport, behaviour, ...):
        _, job = job['meta'].pop, job['job']
        parts, dt, q, ts = _('parts'), _('dt'), _('req'), _('ts')
        print('running job', job)
        for c in range(parts):
            wait(dt)
            print('got result part', c, job)
            res = {
                'res': '%s result %s' % (job, c),
                'nr': c,
                'dt': now() - ts,
                'req': q,
            }
            results.on_next(res)
        # Closing the req socket after all parts are there. Normally this happens
        # elsewhere - we may not know when 'all there' is, or it never happens.
        # But it can be done from anywhere, e.g. at a cancel from the client
        # or when his socket closes:
        wait(dt)
        flush_response(q)

    run_job = rx.map(_run_job)


class R:
    """Results Handling Functions"""

    def interested_clients(res):
        """Work via a central registry of clients who want job results"""
        # here we just added "them" directly into the result:
        return [res.pop('req')]

    def _send_response(res):
        clients = R.interested_clients(res)
        ser = json.dumps(res) + '\r\n'
        [c.put(ser) for c in clients]
        return res

    send_response = rx.map(_send_response)


def new_job(job, meta):
    """Production decoupled from response sending"""
    jobs.on_next({'job': job, 'meta': meta})


@app.route('/<job>/<int:parts>/<int:dt>')
def index(job, parts, dt):
    """
    The request handling greenlet.

    The client can parametrize how many data parts the job result should have -
    and when they 'arrive'.

    Creates a queue which, when seeing an item, will cause a chunk response.
    """
    # eventhandlers can produce here and we'll send to the client:
    q = gevent.queue.Queue()
    meta = {'parts': parts, 'dt': dt, 'req': q, 'ts': now()}
    new_job(job, meta)
    return app.response_class(stream_with_context(q), mimetype='application/json')


# ------------------------------------------------------------------------------- server
def reconfigure_server_pipelines(pipelines, subs=[0, 0]):
    print('starting processing pipelines for jobs and incomming data')
    if subs[0]:
        print('stopping old pipeline')
        [s.dispose() for s in subs]
    for i in [0, 1]:
        s = [jobs, results][i].pipe(asyn(), *pipelines[i])
        subs[i] = s.subscribe()
    return True


def run_server():
    print('')
    print('starting server at 50000')
    http_server = WSGIServer(('', 50000), DebuggedApplication(app))
    http_server.serve_forever()
```
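The one non-obvious trick above is `flush_response`: a `gevent.queue.Queue` is iterable, and gevent treats a `StopIteration` item specially - it ends the iteration, and with it the chunked response body. In isolation:

```python
import gevent.queue

q = gevent.queue.Queue()
q.put('chunk 1\r\n')
q.put('chunk 2\r\n')
q.put(StopIteration)  # what flush_response(q) does: 'close the socket'

# iterating the queue is exactly what the streamed Flask response does;
# instead of blocking forever, iteration ends at the StopIteration item:
chunks = [c for c in q]
print(chunks)  # -> ['chunk 1\r\n', 'chunk 2\r\n']
```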
### Processing Pipeline(s)

Here is the code:

`$ cat tests/rx/test_async_server.py`

```python
import json
import os
import sys
import time
from functools import partial

from server import J, R, Rx, asyn, reconfigure_server_pipelines, run_server, rx, wait
from tools import gevent, print

sys.path.append(__file__.rsplit('/', 1)[0])  # noqa

gevent.spawn(run_server)
done_clients = []


def test_server_one():
    """Keeping order, but processing jobs (and responses) one after the other.

    Whichever client's request is first on the server will see all chunks first:
    """
    p = [[J.run_job], [R.send_response]]
    reconfigure_server_pipelines(p) and send_requests()


def test_server_two():
    """Here we run the jobs within parallel greenlets"""
    p = [
        [rx.flat_map(lambda job: Rx.just(job).pipe(asyn(), J.run_job))],
        [R.send_response],
    ]
    reconfigure_server_pipelines(p) and send_requests()


# ------------------------------------------------------------------------------- Tools
# spawn 3 greenlets, sending 3 requests in parallel, simulating 3 clients;
# they iterate over the chunks from the server and print them
def send_requests():
    # yes, we can reconfigure the pipeline while running:
    done_clients.clear()
    print('')
    for client in 1, 2, 3:
        s = partial(send_job_req_to_api_server, client=client)
        gevent.spawn(s)
    while not len(done_clients) == 3:
        wait(100)
    print('all_done', who=client)


def send_job_req_to_api_server(client):
    from requests import get

    print('Sending job', who=client)
    j, url = json.loads, 'http://127.0.0.1:50000/job%s/3/%s'
    chunks = get(url % (client, 100 * client), stream=True)
    [print('got chunk', j(r), who=client) for r in chunks.iter_lines()]
    print('done client', who=client)
    done_clients.append(client)
```
First we run the non-parallel version of the job processor:

```shell
$ pytest -xs tests/rx/test_async_server.py -k one 2>/dev/null
============================= test session starts ==============================
platform linux -- Python 3.9.10, pytest-6.2.5, py-1.11.0, pluggy-1.0.0
Using --randomly-seed=860789797
rootdir: /home/gk/repos
plugins: forked-1.4.0, randomly-3.11.0, cov-2.12.1, xdist-2.5.0, sugar-0.9.4
collected 2 items / 1 deselected / 1 selected

tests/rx/test_async_server.py    80 M [S] starting processing pipelines for jobs and incomming data
   80 M [S]
   81 1 [S]
   81 1 [S] starting server at 50000
  113 2 [1] Sending job
  114 3 [2] Sending job
  115 4 [3] Sending job
  120 5 [S] running job job3
  420 5 [S] got result part 0 job3
  421 4 [3] got chunk {'res': 'job3 result 0', 'nr': 0, 'dt': 301}
  721 5 [S] got result part 1 job3
  722 4 [3] got chunk {'res': 'job3 result 1', 'nr': 1, 'dt': 602}
 1021 5 [S] got result part 2 job3
 1022 4 [3] got chunk {'res': 'job3 result 2', 'nr': 2, 'dt': 902}
 1322 6 [S] running job job1
 1323 4 [3] done client
 1423 6 [S] got result part 0 job1
 1424 2 [1] got chunk {'res': 'job1 result 0', 'nr': 0, 'dt': 1304}
 1524 6 [S] got result part 1 job1
 1524 2 [1] got chunk {'res': 'job1 result 1', 'nr': 1, 'dt': 1405}
 1625 6 [S] got result part 2 job1
 1625 2 [1] got chunk {'res': 'job1 result 2', 'nr': 2, 'dt': 1506}
 1726 7 [S] running job job2
 1726 2 [1] done client
 1926 7 [S] got result part 0 job2
 1927 3 [2] got chunk {'res': 'job2 result 0', 'nr': 0, 'dt': 1807}
 2127 7 [S] got result part 1 job2
 2127 3 [2] got chunk {'res': 'job2 result 1', 'nr': 1, 'dt': 2008}
 2328 7 [S] got result part 2 job2
 2328 3 [2] got chunk {'res': 'job2 result 2', 'nr': 2, 'dt': 2209}
 2530 3 [2] done client
 2591 M [3] all_done
.
======================= 1 passed, 1 deselected in 2.72s ========================
```
And here processing in parallel greenlets, i.e. giving up order for throughput:

```shell
$ pytest -xs tests/rx/test_async_server.py -k two 2>/dev/null
============================= test session starts ==============================
platform linux -- Python 3.9.10, pytest-6.2.5, py-1.11.0, pluggy-1.0.0
Using --randomly-seed=3525127434
rootdir: /home/gk/repos
plugins: forked-1.4.0, randomly-3.11.0, cov-2.12.1, xdist-2.5.0, sugar-0.9.4
collected 2 items / 1 deselected / 1 selected

tests/rx/test_async_server.py    78 M [S] starting processing pipelines for jobs and incomming data
   79 M [S]
   79 1 [S]
   79 1 [S] starting server at 50000
  111 2 [1] Sending job
  112 3 [2] Sending job
  113 4 [3] Sending job
  117 6 [S] running job job1
  118 8 [S] running job job2
  118 10 [S] running job job3
  218 6 [S] got result part 0 job1
  219 2 [1] got chunk {'res': 'job1 result 0', 'nr': 0, 'dt': 102}
  318 6 [S] got result part 1 job1
  318 8 [S] got result part 0 job2
  319 2 [1] got chunk {'res': 'job1 result 1', 'nr': 1, 'dt': 202}
  319 3 [2] got chunk {'res': 'job2 result 0', 'nr': 0, 'dt': 202}
  418 10 [S] got result part 0 job3
  419 6 [S] got result part 2 job1
  419 2 [1] got chunk {'res': 'job1 result 2', 'nr': 2, 'dt': 303}
  419 4 [3] got chunk {'res': 'job3 result 0', 'nr': 0, 'dt': 301}
  519 8 [S] got result part 1 job2
  519 3 [2] got chunk {'res': 'job2 result 1', 'nr': 1, 'dt': 403}
  519 2 [1] done client
  719 10 [S] got result part 1 job3
  719 8 [S] got result part 2 job2
  719 3 [2] got chunk {'res': 'job2 result 2', 'nr': 2, 'dt': 603}
  720 4 [3] got chunk {'res': 'job3 result 1', 'nr': 1, 'dt': 602}
  921 3 [2] done client
 1019 10 [S] got result part 2 job3
 1020 4 [3] got chunk {'res': 'job3 result 2', 'nr': 2, 'dt': 902}
 1321 4 [3] done client
 1386 M [3] all_done
.
======================= 1 passed, 1 deselected in 1.51s ========================
```
## Todo

- Invent a mapping registry of job result (parts) to interested clients.
    - Who is interested in what should be up to the client - the server should only care about security. See below.
- Once you have that, do e.g. `pipe(rx.group_by(job_data_source), rx.buffer_with_time(1), rx.run_job_list)`, in order to not overload external APIs with many small requests (use their list APIs).
- Send all jobs you have at e.g. dashboard open in ONE request - and stream all results via that socket.
## Suggestion: Expressing Interest

How can we enable clients to express sth. like: "Dear API server, I want to get all data updates from any job containing joe@doe.com, for the next 10 seconds" - no matter the nesting level of that email address within the data schemas?

In order to enable clients to express (and update) to the server in which data they are interested, you might look into an expression library.

The most common conditions will be e.g. ids of certain data objects, like cpeids, userids, (...). The problem here: those are typically not always at the "same places" within the data.

One solution might be to define, when implementing a new use case, also sth. like a "possibly interesting value extractor" function, and then push those possibly interesting values into a dedicated stream, mapping them to job ids. When client interest matches, you can pull the full data from short-lived memory caches.

Redis streams would be a good choice when such values arrive on many server processes. Then also store the process (or local redis server) where the full data sits in the cache.
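A sketch of such an extractor (all names and the static interest registry are invented; a real system would match against the registry per incoming result part):

```python
def interesting_values(data, found=None):
    """collect all leaf values of a job result, no matter the nesting level"""
    found = [] if found is None else found
    if isinstance(data, dict):
        for v in data.values():
            interesting_values(v, found)
    elif isinstance(data, (list, tuple)):
        for v in data:
            interesting_values(v, found)
    else:
        found.append(data)
    return found


# interest registry: value -> clients who asked for it (here: static)
interests = {'joe@doe.com': ['client_1']}


def matching_clients(job_data):
    vals = set(interesting_values(job_data))
    return [c for val, clients in interests.items() if val in vals for c in clients]


res = matching_clients({'user': {'email': 'joe@doe.com'}, 'nr': 1})
print(res)  # -> ['client_1']
```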