October 2018
420 pages
The subscriber is implemented as a component. It uses two drivers: the tcp_client and stdout. Its implementation is as follows:
def rmux_client(sources):
    response = sources.tcp_client.response.share()
    tcp_connect = Observable.just(tcp_client.Connect(
        host='127.0.0.1', port='8080'
    ))

    create_observable = (
        response
        .flat_map(lambda connection:
            Observable.just({
                'what': 'subscribe',
                'id': 42,
                'name': '1234'})
            .map(lambda i: json.dumps(i))
            .let(frame)
            .map(lambda j: tcp_client.Write(
                id=connection.id,
                data=j.encode()))
        )
    )

    console = (
        response
        .flat_map(lambda connection:
            connection.observable
            .map(lambda i: i.data.decode('utf-8'))
            .let(unframe)
            .map(lambda i: json.loads(i))
            .group_by(lambda i: i['id'])
            .flat_map(lambda ...
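
The listing passes each JSON message through frame before writing it to the socket, and through unframe after decoding the received bytes. Neither operator is defined in this excerpt, so the following is only a minimal sketch of the idea in plain Python, without Rx. It assumes a newline delimiter; the DELIMITER constant and the Unframer class are hypothetical names introduced for illustration and may differ from the actual implementation.

```python
import json

DELIMITER = '\n'  # assumed frame delimiter, not taken from the original


def frame(message):
    """Append the delimiter so the receiver can find the message boundary."""
    return message + DELIMITER


class Unframer:
    """Hypothetical helper: buffers decoded chunks, returns complete frames."""

    def __init__(self):
        self.buffer = ''

    def feed(self, chunk):
        # Everything before the last delimiter is complete; the rest is kept
        # in the buffer until more data arrives.
        self.buffer += chunk
        *complete, self.buffer = self.buffer.split(DELIMITER)
        return complete


# Same subscribe message as in the listing, serialized and framed.
subscribe = frame(json.dumps({'what': 'subscribe', 'id': 42, 'name': '1234'}))
wire_bytes = subscribe.encode()

# Simulate TCP delivering the message in two arbitrary pieces.
unframer = Unframer()
half = len(wire_bytes) // 2
messages = []
for chunk in (wire_bytes[:half], wire_bytes[half:]):
    for line in unframer.feed(chunk.decode('utf-8')):
        messages.append(json.loads(line))
```

Framing is needed because TCP is a byte stream: a single write on one side can arrive split across several reads on the other, or merged with the next message. Buffering until a delimiter is seen lets the JSON decoding step always receive one whole message.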