Reactive Programming With RxPY

Motivation

Let's build an API server which can send data to clients as soon as it arrives.

You might say that's trivial, but it is not:

Data may arrive, e.g.,

  • not en bloc but in parts, e.g. from
    • files being tailed ("follow" mode) or
    • stream subscriptions, e.g. via websockets or
    • a server implemented as shown here, i.e. itself streaming
  • from jobs of other clients (but still interesting for us)
  • from different systems, with vastly varying response times

In all these cases we want to update the client partially, as soon as new data is present on the server, w/o having to wait for the "full" result (if that even exists).

Important

Also, we do not want to constantly poll the server but passively wait for new data, with the server able to push to all interested parties as soon as it is there.

The operationally simplest solution is not websockets but sending the responses in chunks (chunked transfer encoding).
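
A minimal sketch of the chunking idea, assuming plain Flask (the /stream route and the sleep-based "data source" are made up for illustration): returning a generator makes Flask send each yielded string as its own chunk, so the client sees data as soon as it is produced.

import time
from flask import Flask

app = Flask(__name__)


@app.route('/stream')
def stream():
    def produce():
        # every yielded string goes out to the client as its own chunk
        for i in range(3):
            time.sleep(1)  # stand-in for "new data arriving" on the server
            yield 'part %s\r\n' % i

    # a generator body is streamed via chunked transfer encoding
    return app.response_class(produce(), mimetype='text/plain')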

Solution Stack

  • Flask as (not so) Microframework
  • gevent as async framework
  • ReactiveX
    • for far more high-level coordination of jobs and coroutines, compared to using gevent's low-level machinery
    • in order to declaratively parametrize the server's async behaviour via high-level coordination functions (see the sketch below)
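
For illustration only - not yet the server - a hedged sketch of that kind of declarative coordination: two made-up sources are merged and batched by time in a handful of lines, something that would need considerably more bookkeeping with raw greenlets.

import time

import rx
from rx import operators as ops

# two hypothetical event sources, ticking at different speeds
fast = rx.interval(0.1).pipe(ops.map(lambda i: 'fast %s' % i))
slow = rx.interval(0.3).pipe(ops.map(lambda i: 'slow %s' % i))

rx.merge(fast, slow).pipe(
    ops.buffer_with_time(0.5),  # collect everything that arrived within 500 ms
    ops.take(4),                # stop after four batches (demo only)
).subscribe(lambda batch: print('batch:', batch))

time.sleep(2.5)  # keep the process alive while the timers fire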

Packages

$ for p in rx gevent flask; do pip show $p || pip install $p; done
Name: Rx
Version: 3.2.0
Summary: Reactive Extensions (Rx) for Python
Home-page: http://reactivex.io
Author: Dag Brattli
Author-email: dag@brattli.net
License: MIT License
Location: /home/gk/miniconda3/envs/blog_py3.9/lib/python3.9/site-packages
Requires: 
Required-by: 
Name: gevent
Version: 21.12.0
Summary: Coroutine-based network library
Home-page: http://www.gevent.org/
Author: Denis Bilenko
Author-email: denis.bilenko@gmail.com
License: MIT
Location: /home/gk/miniconda3/envs/blog_py3.9/lib/python3.9/site-packages
Requires: greenlet, setuptools, zope.event, zope.interface
Required-by: 
Name: Flask
Version: 2.0.3
Summary: A simple framework for building complex web applications.
Home-page: https://palletsprojects.com/p/flask
Author: Armin Ronacher
Author-email: armin.ronacher@active-4.com
License: BSD-3-Clause
Location: /home/gk/miniconda3/envs/blog_py3.9/lib/python3.9/site-packages
Requires: click, itsdangerous, Jinja2, Werkzeug
Required-by:

Tools

We want to carefully inspect what's going on on the server. For this, any invocation of print will also show

  • milliseconds since process start
  • thread name (shortened)
  • colors, according to a given who integer (so we can see who initiated an event)

A little test, one without and one with the gevent monkey patch:

$ python tests/rx/tools.py
    5  M [S] msg 1
   56  M [2] async msg
   56  M [3] async msg
   56  M [S] async msg
  106  M [4] msg 3
$ python tests/rx/tools.py patched
  115  M [S] msg 1
  166  1 [2] async msg
  166  2 [3] async msg
  166  3 [S] async msg
  216  M [4] msg 3
  • M is MainThread; numbers like 2 in the second column are greenlet numbers. In the unpatched run everything shows up as M, i.e. gevent coordinates all greenlets on the main thread of the process; with the patch, the patched threading module reports one dummy thread per greenlet.
  • The [S] symbol is printed for who=0 (the default - here the server)
  • Colors depend on the who argument
  • You can see that the (pretty complex) monkey patching of all(!) blocking calls consumes well over 100 milliseconds of startup time.
Console Printer Source

The tests are done in the __main__ section:

$ cat tests/rx/tools.py
import time
import sys
from threading import current_thread
import gevent


now = lambda: int(time.time() * 1000)
t0 = now()
dt = lambda: now() - t0
thread = lambda: current_thread().name.replace('Dummy-', '').replace('MainThread', 'M')
gray = '\x1b[0;38;5;245m%s\x1b[0m'


def print(*a, p=print, who=0, **kw):
    # shadows the builtin print; the original is kept as the default arg `p`
    pre = '%s %2s ' % (gray % ('%5s' % dt()), thread())  # ms since start + thread
    msg = ' '.join([str(i) for i in a])
    msg = '\x1b[3%sm[%s] %s\x1b[0m' % (who + 1, who or 'S', msg)  # color by `who`
    p(pre + msg, **kw)
    return a[0]


#  --------------------   quick test follows -------------------------------------------
if __name__ == '__main__':
    from gevent import monkey

    sleep, asyn = gevent.sleep, gevent.spawn
    if sys.argv[-1] == 'patched':
        monkey.patch_all()
        assert time.sleep == gevent.sleep  # since patched
    print('msg 1')

    def f(who):
        sleep(0.05)
        print('async msg', who=who)

    asyn(f, 2)
    asyn(f, 3)
    asyn(f, 0)

    sleep(0.1)
    print('msg 3', who=4)

Server

Intro: Async HTTP processing with greenlets

Any webserver based on gevent will allocate one greenlet per incoming request.

Therefore we can block within the request handler w/o affecting other clients:

import requests


def handle_get(job):
    # runs within a request specific greenlet
    # blocks for this client's request but serves others
    res = requests.get("<external api>").text
    return res

The job processing, incl. sending the full response, was so far a direct consequence of the client request event.

We now have to solve:

  1. How to send data in parts (chunks) to the API clients (browsers, others)
  2. How to allow any event handler on the server to send a data chunk
    • at any time
    • to any client interested in it

Here is how this is done (framework API wise): https://bottlepy.org/docs/dev/async.html. Please read it - it is pretty concise; the pattern is sketched below.
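
A minimal sketch of that pattern, transferred to Flask plus gevent (the /subscribe route and the chunk contents are illustrative assumptions): the handler returns a gevent queue as the response body; any other greenlet holding a reference to that queue can push chunks at any time, and putting StopIteration ends the response.

import gevent
import gevent.queue
from flask import Flask

app = Flask(__name__)


@app.route('/subscribe')
def subscribe():
    q = gevent.queue.Queue()

    def producer():
        # any event handler with a reference to q may push chunks, at any time
        for i in range(3):
            gevent.sleep(1)
            q.put('chunk %s\r\n' % i)
        q.put(StopIteration)  # terminates the iteration, i.e. closes the response

    gevent.spawn(producer)
    # iterating the queue yields chunks as they are put - until StopIteration
    return app.response_class(q, mimetype='text/plain')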

Server Code

We want to build a server whose event coordination we can reparametrize declaratively.

Currently we have two event sources:

  1. From the clients (the requests they send)
  2. From the side effects we trigger, based on those requests, e.g. external websocket responses, file reading chunks, ...

We want to completely decouple those event streams and therefore create two Subjects: one for jobs, one for response parts. A Subject is both an observer (anyone can push events into it via on_next) and an observable (pipelines can subscribe to it), so producers and consumers never reference each other directly. A minimal illustration follows; the full server source is below.
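
A hedged, minimal illustration of that decoupling (the pipeline and handlers here are made up and not the server code itself):

import rx
from rx import operators as ops
from rx.subject import Subject

jobs = Subject()     # producers push job events in here
results = Subject()  # job runners push result parts in here

# the pipelines decide how events get coordinated - producers never know
jobs.pipe(ops.map(lambda j: {'res': 'ran %s' % j})).subscribe(results.on_next)
results.subscribe(lambda r: print('result part:', r))

# anyone, anywhere, at any time can now emit a job:
jobs.on_next('job1')
jobs.on_next('job2')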

Async Server Source
$ cat tests/rx/server.py
#!/usr/bin/env python
import json
import sys
import time
from functools import partial

import gevent
import gevent.queue
import rx as Rx
from flask import Flask, stream_with_context
from gevent import monkey
from gevent.pywsgi import WSGIServer
from rx import operators as rx
from rx.scheduler.eventloop import GEventScheduler as GS
from tools import gevent, now, print
from werkzeug.debug import DebuggedApplication
from werkzeug.serving import run_with_reloader

sys.path.append(__file__.rsplit('/', 1)[0])


monkey.patch_all()
app = Flask(__name__)
GS = GS(gevent)
jobs = Rx.subject.Subject()
results = Rx.subject.Subject()
flush_response = lambda q: q.put(StopIteration)  # closing the socket
wait = lambda dt: time.sleep(dt / 1000.0)
asyn = lambda: rx.observe_on(GS)  # put pipeline onto a new greenlet


class J:
    """Job Running Pipeline Functions"""

    def _run_job(job):
        """syncronous job running"""
        # how is the job (transport, behaviour, ...):
        _, job = job['meta'].pop, job['job']
        parts, dt, q, ts = _('parts'), _('dt'), _('req'), _('ts')
        print('running job', job)
        for c in range(parts):
            wait(dt)
            print('got result part', c, job)
            res = {
                'res': '%s result %s' % (job, c),
                'nr': c,
                'dt': now() - ts,
                'req': q,
            }
            results.on_next(res)
        # Closing the req socket after all parts are there. Normally this happens
        # elsewhere - often we do not know when 'all parts' have arrived, or the
        # stream never ends. But it can be done from anywhere - e.g. at a cancel
        # from the client or when his socket closes:
        wait(dt)
        flush_response(q)

    run_job = rx.map(_run_job)


class R:
    """Results Handling Functions"""

    def interested_clients(res):
        """Work via a central registry of clients who want job results"""
        # here we just added "them" directly into the result:
        return [res.pop('req')]

    def _send_response(res):
        clients = R.interested_clients(res)
        ser = json.dumps(res) + '\r\n'
        [c.put(ser) for c in clients]
        return res

    send_response = rx.map(_send_response)


def new_job(job, meta):
    """Production decoupled from repsonse sending"""
    jobs.on_next({'job': job, 'meta': meta})


@app.route('/<job>/<int:parts>/<int:dt>')
def index(job, parts, dt):
    """
    The request handling greenlet.

    The client can parametrize how many data parts the job result should have - and
    when they 'arrive'.

    Creates a queue which, when seeing an item, will cause a chunk response
    """
    # eventhandlers can produce here and we'll send to the client:
    q = gevent.queue.Queue()
    meta = {'parts': parts, 'dt': dt, 'req': q, 'ts': now()}
    new_job(job, meta)
    return app.response_class(stream_with_context(q), mimetype='application/json')


# ------------------------------------------------------------------------------- server
def reconfigure_server_pipelines(pipelines, subs=[0, 0]):
    print('starting processing pipelines for jobs and incoming data')
    if subs[0]:
        print('stopping old pipeline')
        [s.dispose() for s in subs]

    for i in [0, 1]:
        s = [jobs, results][i].pipe(asyn(), *pipelines[i])
        subs[i] = s.subscribe()
    return True


def run_server():
    print('')
    print('starting server at 50000')
    http_server = WSGIServer(('', 50000), DebuggedApplication(app))
    http_server.serve_forever()

Processing Pipeline(s)

Here is the code

$ cat tests/rx/test_async_server.py
import json
import os
import sys
import time
from functools import partial

from server import J, R, Rx, asyn, reconfigure_server_pipelines, run_server, rx, wait
from tools import gevent, print

sys.path.append(__file__.rsplit('/', 1)[0])  # noqa
gevent.spawn(run_server)

done_clients = []


def test_server_one():
    """Keeping order but processing jobs (and responses) one after the other
    Whichever clients' requests is first on the server will see all chunks first:
    """
    p = [[J.run_job], [R.send_response]]
    reconfigure_server_pipelines(p) and send_requests()


def test_server_two():
    """Here we run the jobs within parallel greenlets"""
    p = [
        [rx.flat_map(lambda job: Rx.just(job).pipe(asyn(), J.run_job)),],
        [R.send_response],
    ]
    reconfigure_server_pipelines(p) and send_requests()


# ------------------------------------------------------------------------------- Tools
# here we spawn 3 greenlets, sending 3 requests in parallel, simulating 3 clients;
# they iterate over the chunks from the server and print them
def send_requests():
    # yes we can reconfigure the pipeline while running;
    done_clients.clear()
    print('')
    for client in 1, 2, 3:
        s = partial(send_job_req_to_api_server, client=client)
        gevent.spawn(s)
    while not len(done_clients) == 3:
        wait(100)
    print('all_done', who=client)


def send_job_req_to_api_server(client):
    from requests import get

    print('Sending job', who=client)
    j, url = json.loads, 'http://127.0.0.1:50000/job%s/3/%s'
    chunks = get(url % (client, 100 * client), stream=True)
    [print('got chunk', j(r), who=client) for r in chunks.iter_lines()]
    print('done client', who=client)
    done_clients.append(client)

First we run the non-parallel version of the job processor:

$ pytest -xs tests/rx/test_async_server.py -k one 2>/dev/null
============================= test session starts ==============================
platform linux -- Python 3.9.10, pytest-6.2.5, py-1.11.0, pluggy-1.0.0
Using --randomly-seed=860789797
rootdir: /home/gk/repos
plugins: forked-1.4.0, randomly-3.11.0, cov-2.12.1, xdist-2.5.0, sugar-0.9.4
collected 2 items / 1 deselected / 1 selected

tests/rx/test_async_server.py    80  M [S] starting processing pipelines for jobs and incoming data
   80  M [S] 
   81  1 [S] 
   81  1 [S] starting server at 50000
  113  2 [1] Sending job
  114  3 [2] Sending job
  115  4 [3] Sending job
  120  5 [S] running job job3
  420  5 [S] got result part 0 job3
  421  4 [3] got chunk {'res': 'job3 result 0', 'nr': 0, 'dt': 301}
  721  5 [S] got result part 1 job3
  722  4 [3] got chunk {'res': 'job3 result 1', 'nr': 1, 'dt': 602}
 1021  5 [S] got result part 2 job3
 1022  4 [3] got chunk {'res': 'job3 result 2', 'nr': 2, 'dt': 902}
 1322  6 [S] running job job1
 1323  4 [3] done client
 1423  6 [S] got result part 0 job1
 1424  2 [1] got chunk {'res': 'job1 result 0', 'nr': 0, 'dt': 1304}
 1524  6 [S] got result part 1 job1
 1524  2 [1] got chunk {'res': 'job1 result 1', 'nr': 1, 'dt': 1405}
 1625  6 [S] got result part 2 job1
 1625  2 [1] got chunk {'res': 'job1 result 2', 'nr': 2, 'dt': 1506}
 1726  7 [S] running job job2
 1726  2 [1] done client
 1926  7 [S] got result part 0 job2
 1927  3 [2] got chunk {'res': 'job2 result 0', 'nr': 0, 'dt': 1807}
 2127  7 [S] got result part 1 job2
 2127  3 [2] got chunk {'res': 'job2 result 1', 'nr': 1, 'dt': 2008}
 2328  7 [S] got result part 2 job2
 2328  3 [2] got chunk {'res': 'job2 result 2', 'nr': 2, 'dt': 2209}
 2530  3 [2] done client
 2591  M [3] all_done
.

======================= 1 passed, 1 deselected in 2.72s ========================

And here processing in parallel greenlets, i.e. giving up order for throughput:

$ pytest -xs tests/rx/test_async_server.py -k two 2>/dev/null
============================= test session starts ==============================
platform linux -- Python 3.9.10, pytest-6.2.5, py-1.11.0, pluggy-1.0.0
Using --randomly-seed=3525127434
rootdir: /home/gk/repos
plugins: forked-1.4.0, randomly-3.11.0, cov-2.12.1, xdist-2.5.0, sugar-0.9.4
collected 2 items / 1 deselected / 1 selected

tests/rx/test_async_server.py    78  M [S] starting processing pipelines for jobs and incoming data
   79  M [S] 
   79  1 [S] 
   79  1 [S] starting server at 50000
  111  2 [1] Sending job
  112  3 [2] Sending job
  113  4 [3] Sending job
  117  6 [S] running job job1
  118  8 [S] running job job2
  118 10 [S] running job job3
  218  6 [S] got result part 0 job1
  219  2 [1] got chunk {'res': 'job1 result 0', 'nr': 0, 'dt': 102}
  318  6 [S] got result part 1 job1
  318  8 [S] got result part 0 job2
  319  2 [1] got chunk {'res': 'job1 result 1', 'nr': 1, 'dt': 202}
  319  3 [2] got chunk {'res': 'job2 result 0', 'nr': 0, 'dt': 202}
  418 10 [S] got result part 0 job3
  419  6 [S] got result part 2 job1
  419  2 [1] got chunk {'res': 'job1 result 2', 'nr': 2, 'dt': 303}
  419  4 [3] got chunk {'res': 'job3 result 0', 'nr': 0, 'dt': 301}
  519  8 [S] got result part 1 job2
  519  3 [2] got chunk {'res': 'job2 result 1', 'nr': 1, 'dt': 403}
  519  2 [1] done client
  719 10 [S] got result part 1 job3
  719  8 [S] got result part 2 job2
  719  3 [2] got chunk {'res': 'job2 result 2', 'nr': 2, 'dt': 603}
  720  4 [3] got chunk {'res': 'job3 result 1', 'nr': 1, 'dt': 602}
  921  3 [2] done client
 1019 10 [S] got result part 2 job3
 1020  4 [3] got chunk {'res': 'job3 result 2', 'nr': 2, 'dt': 902}
 1321  4 [3] done client
 1386  M [3] all_done
.

======================= 1 passed, 1 deselected in 1.51s ========================
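
The only difference between the two runs is how the job pipeline is composed. Here is a standalone sketch of that difference, using dummy sleep-based jobs instead of the real server:

import gevent
import rx
from rx import operators as ops
from rx.scheduler.eventloop import GEventScheduler

GS = GEventScheduler(gevent)


def run(job):
    gevent.sleep(0.1)  # stand-in for real work
    print('done', job)
    return job


# variant "one": the whole pipeline on a single greenlet - jobs run strictly in order
rx.of('a', 'b', 'c').pipe(ops.observe_on(GS), ops.map(run)).subscribe()

# variant "two": one greenlet per job via flat_map - order is given up for throughput
rx.of('a', 'b', 'c').pipe(
    ops.flat_map(lambda j: rx.just(j).pipe(ops.observe_on(GS), ops.map(run)))
).subscribe()

gevent.sleep(1)  # let both pipelines finish before the demo exits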

Todo

  • Invent a mapping registry of job result (parts) to interested clients.
  • Who is interested in what should be up to the client - the server should only care about security. See below.
  • Once you have that, batch jobs, e.g. via pipe(rx.group_by(job_data_source), rx.buffer_with_time(1), run_job_list), so as not to overload external APIs with many small requests (use their list APIs); see the sketch after this list.
  • Send all jobs you have at e.g. dashboard open in ONE request - and stream all results via that socket.
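
A hedged sketch of that batching idea via group_by plus buffer_with_time (data_source_of, run_job_list and the job dicts are made-up names, not part of the server above):

import time

import rx
from rx import operators as ops

# hypothetical helper: which external API a given job has to hit
data_source_of = lambda job: job['source']


def run_job_list(batch):
    # one call to the external "list" API instead of many small ones
    print('batch request to', batch[0]['source'], 'for ids', [j['id'] for j in batch])


jobs = rx.of(
    {'id': 1, 'source': 'api_a'},
    {'id': 2, 'source': 'api_b'},
    {'id': 3, 'source': 'api_a'},
)

jobs.pipe(
    ops.group_by(data_source_of),                  # one sub-stream per external API
    ops.flat_map(lambda grp: grp.pipe(
        ops.buffer_with_time(1.0),                 # collect what arrived within 1 s
        ops.filter(lambda batch: len(batch) > 0),  # drop empty windows
    )),
).subscribe(run_job_list)

time.sleep(2)  # give the time-based buffers a chance to fire before exiting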

Suggestion: Expressing Interest

How can we enable clients to express something like: "Dear API server, I want to get all data updates from any job containing joe@doe.com for the next 10 seconds (no matter the nesting level of that email address within the data schemas)."?

To let clients express - and update - to the server which data they are interested in, you might look into an expression library like this one.

The most common conditions will be e.g. ids of certain data objects, like cpeids, userids, (...).

The problem here is that those are typically not at the same places within the data.

One solution might be to define, when implementing a new use case, also something like a 'possibly interesting values' extractor function, and then push those values into a dedicated stream, mapping them to job ids. When a client's interest matches, you can pull the full data from short-lived memory caches (see the sketch at the end of this section).

Redis streams would be a good choice when such values arrive on many server processes.

Then also store the process (or local Redis server) whose cache holds the full data.
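
A hedged sketch of such an interest registry (the extractor, the registry dict and the 10 second TTL are illustrative assumptions, not existing server code):

import time

# client_id -> (interesting value, expiry timestamp)
interests = {}


def register_interest(client_id, value, ttl=10):
    # e.g. called when a client asks for "everything containing joe@doe.com"
    interests[client_id] = (value, time.time() + ttl)


def extract_interesting_values(data, found=None):
    # walk a result recursively, collecting leaves regardless of nesting level
    found = [] if found is None else found
    if isinstance(data, dict):
        [extract_interesting_values(v, found) for v in data.values()]
    elif isinstance(data, (list, tuple)):
        [extract_interesting_values(v, found) for v in data]
    else:
        found.append(data)
    return found


def interested_clients(result_part):
    values, now = set(extract_interesting_values(result_part)), time.time()
    return [
        cid
        for cid, (value, expiry) in interests.items()
        if expiry > now and value in values
    ]


register_interest('client_1', 'joe@doe.com')
print(interested_clients({'job': 'sync', 'user': {'email': 'joe@doe.com'}}))
# -> ['client_1']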
