1 / 9
rx-controls-suite · Python Edition
Reactive Programming
for Tango Controls
One programming model · Two languages · Same operator vocabulary
Igor Khokhriakov  ·  Principal Software Engineer
2 / 9
Motivation Same Model Across Languages & Platforms Completing the matrix
Tango EPICS
Java RxTango/java ✓
RxJava3 · jbang · TCK-verified
future
Python RxTango/python ✓
RxPY v4 · uv · unit-tested
RxEpics/python ✓
RxPY v4 · caproto
All three use the same operator vocabulary
zip · merge · buffer_with_count · scan · sample · flat_map
The only difference is the control-system call underneath
3 / 9
Real Application 1 Correlated Multi-Attribute Reads Beyond bulk read
The scenario
"Every 500 ms, read current and beam_position from two different devices — and only process the pair if both arrived in the same polling window. Discard if either fails."
With today's tools…
  • read_attributes() — reduces round-trips, but only works on the same device
  • Two sequential reads across devices — a timing gap opens between them
  • Sardana environment monitor — if your facility is already on that stack
  • Custom thread pair with Lock + asyncio.Event + careful timeout logic
Most facilities end up implementing variations of this pattern — each slightly different, each with its own edge cases around failure handling and timing.
With Rx — rx.zip()  ·  demoed as correlate.py
Python · rx.zip()
rx.interval(timedelta(ms=500), scheduler=scheduler).pipe(

    ops.flat_map(lambda _: rx.zip(

        # Both reads fire in parallel
        read_attribute(device1, "current"),
        read_attribute(device2, "beam_position"),

    ))

).subscribe(
    on_next=lambda pair: process(*pair),
    on_error=lambda e: log(e),
    scheduler=scheduler,
)
✓  If either read fails, the pair is silently dropped — never half-processed.
✓  Zero Lock. Zero shared state.
4 / 9
Real Application 2 Real-Time Stream Processing Beyond alarm daemons
The scenario
"Poll double_scalar at 20 Hz, compute a 5-sample sliding average, and only write the smoothed value back if it deviates more than 10% from the previous write."
With today's tools…
  • PANIC — excellent for threshold alerting, but this is not an alarm
  • Custom Python script: a deque, a counter, a threshold check, a write guard
  • A PANIC formula expression — if the formula engine happens to support it
  • A Sardana macro — if you're already in that stack
Most facilities end up reimplementing the same scaffolding — sliding windows, rate control, conditional writes — in slightly different ways each time.
With Rx — demoed as sliding_average.py + pipeline.py
Python · buffer_with_count + distinct_until_changed
rx.interval(timedelta(ms=50), scheduler=scheduler).pipe(
    ops.flat_map(
        lambda _: read_attribute(device, "double_scalar")),

    # Sliding window — no deque, no index arithmetic
    ops.buffer_with_count(count=5, skip=1),
    ops.map(lambda w: sum(w) / len(w)),

    # Skip write if within 10% of last written value
    ops.distinct_until_changed(
        comparer=lambda p, c: abs(c - p) / (p or 1e-9) < 0.1
    ),

    ops.flat_map(
        lambda v: write_attribute(device, "double_scalar_w", v)),
).subscribe(on_next=print, scheduler=scheduler)
✓  No daemon. No formula DSL. No deque.
✓  Runs anywhere Python runs.
5 / 9
Concepts Reactive Streams — 2 Minutes Not a library. A specification.
The Contract — reactive-streams.org
Publisher<T> Produces items — one method: subscribe()
Subscriber<T> onNext / onError / onComplete
Subscription Back-pressure: request(n) and cancel()
Processor<T,R> Both Publisher and Subscriber
RxPY v4 follows the same observable contract as RxJava3.
Unit tests verify reactive-spec compliance via mocked DeviceProxy.
Multiplatform — Learn Once, Use Everywhere
PlatformLibrary
Java / JVMRxJava3, Project Reactor
KotlinKotlin Flow
JavaScriptRxJS
PythonRxPY (reactivex) ← here
.NETSystem.Reactive
Swift / iOSRxSwift
Python scripts, Java servers, JS dashboards — all talking to the same devices. One paradigm across all of them.
Mental Model — It's a Data Structure
source
operator
operator
subscriber
⚠ Common misconception A reactive stream is not just a dynamic event bus.
It is a lazy, composable data structure — like list, dict or set, but for asynchronous sequences.
Nothing runs until someone subscribes.
Key operators used in demos:
zip · merge · buffer_with_count · scan · sample · distinct_until_changed
Back-Pressure — Flow Control in Control Systems
What happens when producers are faster than consumers?
  • Detector emitting events faster than analysis code
  • Multiple devices feeding a single processing pipeline
buffer_with_count() accumulate, process in batches
sample() keep freshest value per window
throttle_with_timeout() rate-limit by time
ops.catch() isolate per-device errors
The application explicitly chooses the overload strategy — no silent buffer overflow.
6 / 9
Library rxtango — Data Flow reference implementation

Tango Device (PyTango DeviceProxy)

Attribute reads & writes, command calls, event subscriptions. Blocking calls bridged to asyncio via run_in_executor.

Reactive Observable

read_attribute · write_attribute · execute_command · monitor_attribute — all rx.Observable via rx.create

RxPY Operators

map · zip · merge · buffer_with_count · filter · scan · sample — compose, transform, correlate, reduce streams

Application Logic

Pure functions — calibration, averaging, threshold checks. No I/O, no shared state.

Commands / Writes / Downstream

Results written back to devices, forwarded to pipelines, logged to telemetry systems

Real-World Example
Beamline Feedback Loop
Detector intensity → sliding average → drift detection → magnet correction
Python · feedback pipeline
rx.interval(timedelta(ms=50), scheduler=sched).pipe(
    ops.flat_map(
        lambda _: read_attribute(detector, "intensity")),

    # noise reduction — no circular buffer
    ops.buffer_with_count(count=5, skip=1),
    ops.map(lambda w: sum(w) / len(w)),

    # only act on out-of-range readings
    ops.filter(lambda v: out_of_range(v)),

    # issue correction
    ops.flat_map(
        lambda v: write_attribute(magnet, "setpoint", v)),
).subscribe(on_next=log, scheduler=sched)
Same pattern as the Java version — different language, identical model.
asyncio-native · run_in_executor bridges PyTango · unit-tested without a live device
7 / 9
Demo What We'll Run Python examples · uv · no build step
Basic
read_attribute.py Single-shot Observable — simplest case
poll_attribute.py Continuous read — no loop
Coordination
multi_device_snapshot.py Parallel reads, concurrent by default
correlate.py ★ zip — guaranteed atomic pair
Stream Processing
alarm_monitor.py ★ merge + filter — isolated per-device failure
sliding_average.py ★ buffer_with_count(N,1) — rolling mean, no deque
throttle.py Rate control with sample
running_stats.py Live streaming stats with scan
Composition
calibration_pipeline.py Read → transform → write pipeline
pipeline.py ★ Showstopper — fluent TangoClient 6-step chain
Start here
shell
# 1. Bring up the Tango stack
docker compose up -d
# MariaDB → DatabaseDS → TangoTest

# 2. Install
cd RxTango/python
uv pip install -e .

# 3. First example
python examples/read_attribute.py

# 4. The showstopper
python examples/pipeline.py \
  tango://localhost:10000/sys/tg_test/1
Tests run without a live device:
pytest -v — mocked DeviceProxy, TestScheduler marble tests.
★ directly address the real-world patterns shown after the demo.
github.com/scientific-software-hub/rx-controls-suite
AGPL-3.0  ·  Python 3.10+  ·  uv
8 / 9
Live Demo
examples/README.md  ·  uv  ·  docker compose
9 / 9
Thank you!
Questions?
rx-controls-suite
github.com/scientific-software-hub/rx-controls-suite
The same model in every corner of the suite
RxTango/java
Java · jbang
· RxTango/python
Python · uv
· RxEpics/python
Python · caproto
zip · merge · buffer · scan · sample — same operators on every platform