The publisher is implemented as a Cyclotron component that serves as the entry point of the application. Thanks to the ReactiveX operators, this component implements the three layers of remote stream multiplexing in fewer than 30 lines of code. It uses only one driver: the TCP server implemented in the previous example. Here is the function implementing this component:
def rmux_server(sources):
    tcp_listen = Observable.just(tcp_server.Listen(
        host='127.0.0.1', port='8080'
    ))

    beat = (
        sources.tcp_server.response
        .flat_map(lambda connection: connection.observable
            .map(lambda i: i.data.decode('utf-8'))
            .let(unframe)
            .map(lambda i: json.loads(i))
            .flat_map(lambda subscription: create_observable[
                subscription['name']]()
            ...
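To make the data flow of this operator chain easier to follow, here is a minimal sketch of the same steps written with plain Python generators instead of ReactiveX operators: decode the bytes, split them into frames, parse each frame as JSON, and look up the requested stream by name. Several things here are assumptions made for illustration and do not come from the component above: frames are assumed to be newline-delimited, `create_stream` is a hypothetical stand-in for `create_observable` that returns lists, and `handle_connection` is a hypothetical name. Unlike `flat_map`, which can interleave items from several streams, this sketch handles subscriptions one after the other.

```python
import json


def unframe(chunks, delimiter="\n"):
    """Reassemble complete frames from arbitrarily split text chunks.

    Assumption: frames are separated by a newline. The framing used by
    the real component is defined elsewhere and may differ.
    """
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # Emit every complete frame currently held in the buffer.
        while delimiter in buffer:
            frame, buffer = buffer.split(delimiter, 1)
            yield frame


# Hypothetical registry mapping a stream name to a factory.
# It plays the role of create_observable, but returns plain lists.
create_stream = {
    "counter": lambda: [1, 2, 3],
    "letters": lambda: ["a", "b"],
}


def handle_connection(raw_chunks):
    """Decode, unframe, parse, and route the data of one connection."""
    # Each chunk is decoded on its own, as in the original chain.
    text_chunks = (chunk.decode("utf-8") for chunk in raw_chunks)
    for frame in unframe(text_chunks):
        subscription = json.loads(frame)  # deserialize the request
        name = subscription["name"]       # select the stream by name
        for item in create_stream[name]():
            yield {"name": name, "item": item}


# Two subscription requests; the first is split across two TCP chunks.
incoming = [b'{"name": "cou', b'nter"}\n{"name": "letters"}\n']
results = list(handle_connection(incoming))
```

The first request arrives split across two chunks, which is why the framing step has to buffer data until a full frame is available before the JSON parser sees it.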