1 / 9
Tango Users Meeting
Reactive Programming
for Tango Controls
A reference implementation — and a programming model
Igor Khokhriakov  ·  Principal Software Engineer
2 / 9
Real Application 1: Correlated Multi-Attribute Reads
Beyond bulk read
The scenario
"Every 500 ms, read current and beam_position from two different devices — and only process the pair if both arrived in the same polling window. Discard if either fails."
With today's tools…
  • read_attributes() — reduces round-trips, but only works on the same device
  • Two sequential reads across devices — a timing gap opens between them
  • Sardana environment monitor — if your facility is already on that stack
  • Custom thread pair with CountDownLatch + careful timeout logic
Most facilities end up implementing variations of this pattern — each slightly different, each with its own edge cases around failure handling and timing.
With Rx — zip()  ·  just demoed as correlate
Java · Single.zip()
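import static java.util.concurrent.TimeUnit.MILLISECONDS;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;

Flowable.interval(500, MILLISECONDS)
  .flatMapMaybe(tick -> Single.zip(

      // Both reads fire in parallel
      read(device1, "current")
          .subscribeOn(Schedulers.io()),
      read(device2, "beam_position")
          .subscribeOn(Schedulers.io()),

      // Combiner: only runs when BOTH complete
      (current, pos) -> process(current, pos)

  )
  // The pair must arrive within the polling window
  .timeout(500, MILLISECONDS)
  // A failed or late pair is dropped; the stream keeps running
  .onErrorComplete())
  .blockingSubscribe(
      result -> log(result),
      err    -> log("ERROR: " + err)
  );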
✓  If either read fails, the pair is silently dropped — never half-processed.
✓  Zero CountDownLatch. Zero shared state.
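The pair-or-nothing rule can also be written out with the JDK alone, which makes the semantics explicit. The following is a minimal sketch, not RxJTango code: the class name PairOrNothing, the method correlate, and the Supplier arguments standing in for Tango reads are all hypothetical.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/** JDK-only sketch: process a pair only if both reads succeed within the window. */
final class PairOrNothing {

    /**
     * Starts both reads in parallel (hypothetical suppliers stand in for device reads).
     * Returns the combined result, or Optional.empty() if either read fails or is late.
     */
    static <A, B, R> Optional<R> correlate(Supplier<A> readA,
                                           Supplier<B> readB,
                                           BiFunction<A, B, R> combiner,
                                           long windowMillis) {
        CompletableFuture<A> a = CompletableFuture.supplyAsync(readA);
        CompletableFuture<B> b = CompletableFuture.supplyAsync(readB);
        return a.thenCombine(b, combiner)                 // runs only when BOTH complete
                .thenApply(Optional::of)
                .completeOnTimeout(Optional.empty(), windowMillis, TimeUnit.MILLISECONDS)
                .exceptionally(err -> Optional.empty())   // a failed read drops the pair
                .join();
    }
}
```

A caller would invoke correlate once per polling tick and skip processing whenever the result is empty.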
3 / 9
Real Application 2: Real-Time Stream Processing
Beyond alarm daemons
The scenario
"Poll double_scalar at 20 Hz, compute a 5-sample sliding average, and only write the smoothed value back if it deviates more than 10% from the previous write."
With today's tools…
  • PANIC — excellent for threshold alerting, but this is not an alarm
  • Custom Python script: a deque, a counter, a threshold check, a write guard
  • A PANIC formula expression — if the formula engine happens to support it
  • A Sardana macro — if you're already in that stack
Most facilities end up reimplementing the same scaffolding — sliding windows, rate control, conditional writes — in slightly different ways each time.
With Rx — just demoed as sliding-avg + calibrate
Java · buffer + distinctUntilChanged
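import static java.util.concurrent.TimeUnit.MILLISECONDS;
import io.reactivex.rxjava3.core.Flowable;

Flowable.interval(50, MILLISECONDS)     // 20 Hz
  .flatMapSingle(tick -> read(device, "double_scalar"))

  // Sliding window: no deque, no index arithmetic
  .buffer(5, 1)
  .map(window -> mean(window))

  // Skip the write when the change is below 10%
  // (written without division, so zero or negative values are safe)
  .distinctUntilChanged(
      (prev, curr) ->
          Math.abs(curr - prev) < 0.1 * Math.abs(prev)
  )

  .flatMapSingle(write(device, "double_scalar_w"))
  .blockingSubscribe();   // nothing runs until subscribed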
✓  No daemon. No formula DSL. No deque.
✓  Runs anywhere the JVM runs — Java, Kotlin, Groovy, Scala.
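The scenario asks for a comparison against the previous write. A minimal JDK-only sketch of such a dead-band gate follows, keeping the comparison baseline explicit instead of relying on an operator's internal bookkeeping. The names DeadBand and relativeToLastAccepted are hypothetical and not part of RxJTango or RxJava.

```java
import java.util.function.DoublePredicate;

/** Sketch of a dead-band gate that compares against the last ACCEPTED value. */
final class DeadBand {

    /**
     * Returns a stateful predicate: a value passes only if it deviates from the
     * last value that passed by more than the given fraction. The first value
     * always passes. Not thread-safe; create one instance per sequential stream.
     */
    static DoublePredicate relativeToLastAccepted(double fraction) {
        double[] lastAccepted = {Double.NaN};   // NaN marks "nothing accepted yet"
        return value -> {
            double ref = lastAccepted[0];
            boolean pass = Double.isNaN(ref)
                    || Math.abs(value - ref) > fraction * Math.abs(ref);
            if (pass) {
                lastAccepted[0] = value;        // baseline moves only on acceptance
            }
            return pass;
        };
    }
}
```

With a 10% band and the slow drift 100, 105, 109, 111, only 100 and 111 pass: each step is small, but 111 is more than 10% away from the last accepted value. In a pipeline, one such gate per subscription could presumably be plugged into a filter step.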
4 / 9
Framing: Control Systems Are Already Streams
We just don't model them that way
What your facility actually produces
Source      Device attribute updates
Transport   Tango event streams  ·  CHANGE / PERIODIC / ARCHIVE
Processing  Alarm streams  ·  telemetry streams
Output      Feedback signals  ·  correction commands
How most application code treats it today
poll → store → check → write
// loops, threads, shared state,
// try/catch scattered everywhere
Reactive programming treats it as streams from the start
source → operator → operator → subscriber
// composable, declarative, back-pressure-aware
// error handling is part of the stream contract (onError)
Reactive programming does not introduce a new concept.
It gives your system's existing stream nature a proper programming model.
5 / 9
Concepts: Reactive Streams in 2 Minutes
Not a library. A specification.
The Contract — reactive-streams.org
Publisher<T>      Produces items; one method: subscribe()
Subscriber<T>     onSubscribe / onNext / onError / onComplete
Subscription      Back-pressure: request(n) and cancel()
Processor<T,R>    Both Subscriber<T> and Publisher<R>
4 interfaces. Everything else is implementation.
All RxJTango publishers are TCK-verified against this spec.
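The JDK ships the same four interfaces as nested types of java.util.concurrent.Flow, so the contract can be tried without any third-party library. The sketch below is illustrative only: CollectingSubscriber and its demo method are hypothetical names, and the JDK's SubmissionPublisher stands in for a Tango-backed publisher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/** Subscriber that pulls one item at a time and collects everything it receives. */
final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
    private final List<T> items = new ArrayList<>();
    private final CompletableFuture<List<T>> done = new CompletableFuture<>();
    private Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1);                    // back-pressure: ask for the first item only
    }

    @Override public void onNext(T item) {
        items.add(item);
        subscription.request(1);         // ready for exactly one more
    }

    @Override public void onError(Throwable t) { done.completeExceptionally(t); }

    @Override public void onComplete() { done.complete(items); }

    CompletableFuture<List<T>> result() { return done; }

    /** Publishes the given values and waits until the subscriber has seen them all. */
    static List<Double> demo(List<Double> values) {
        CollectingSubscriber<Double> subscriber = new CollectingSubscriber<>();
        try (SubmissionPublisher<Double> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            values.forEach(publisher::submit);
        }                                // close() signals onComplete after pending items
        return subscriber.result().join();
    }
}
```

The subscriber never receives more than it asked for; raising the argument of request(n) trades latency for throughput without touching the producer.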
Multiplatform — Learn Once, Use Everywhere
Platform      Library
Java / JVM    RxJava 3, Project Reactor
Kotlin        Kotlin Flow
JavaScript    RxJS
Python        RxPY
.NET          System.Reactive
Swift / iOS   RxSwift
Python scripts, Java servers, JS dashboards — all talking to the same devices. One paradigm across all of them.
Mental Model — It's a Data Structure
source → operator → operator → subscriber
⚠ Common misconception: a reactive stream is not just a dynamic event bus.
It is a lazy, composable data structure — like List, Vector or Set, but for asynchronous sequences.
Nothing runs until someone subscribes. Operators build a description of computation, not a running pipeline.
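The same laziness can be observed in a synchronous setting with java.util.stream, used here purely as an analogy: intermediate operations only describe the work, and the terminal operation plays the role of subscribe. The class name LazyPipeline is hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Analogy only: a java.util.stream pipeline is also a description until it is consumed. */
final class LazyPipeline {

    /** Returns {map calls before the terminal operation, map calls after it, result size}. */
    static int[] demo() {
        AtomicInteger calls = new AtomicInteger();
        Stream<Double> described = Stream.of(1.0, 2.0, 3.0)
                .map(v -> { calls.incrementAndGet(); return v * 2; });
        int before = calls.get();                       // the mapper has not run yet
        List<Double> out = described.collect(Collectors.toList());
        return new int[] {before, calls.get(), out.size()};
    }
}
```

The mapper is not invoked while the pipeline is being assembled; it runs once per element only when the result is collected.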
Key operators used in demos:
zip · merge · buffer · scan · throttleLast · distinctUntilChanged
Back-Pressure — Flow Control Matters in Control Systems
What happens when producers are faster than consumers?
  • Detector emitting events faster than analysis code
  • Device bursting change events during a scan
  • Multiple devices feeding a single processing pipeline
buffer()              accumulate, process in batches
throttleLast()        keep freshest value per window
sample()              emit latest at fixed intervals
onBackpressureDrop()  explicitly drop excess items
The application explicitly defines how overload is handled — not implicitly through silent buffer overflows.
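One way to picture an explicit overload policy is a single-slot mailbox that keeps only the freshest value and reports every overwrite. This is a minimal sketch of the "keep freshest" idea, independent of any Rx library; the class name LatestValue and its methods are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Single-slot mailbox: a fast producer overwrites, a slow consumer takes the freshest value. */
final class LatestValue<T> {
    private final AtomicReference<T> slot = new AtomicReference<>();

    /** Stores the value; returns true if an unread value was overwritten, i.e. dropped. */
    boolean offer(T value) {
        return slot.getAndSet(value) != null;
    }

    /** Takes the freshest value, or returns null if nothing arrived since the last poll. */
    T poll() {
        return slot.getAndSet(null);
    }
}
```

Because offer reports each drop, the application can count or log discarded samples instead of losing them unnoticed.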
6 / 9
Library: RxJTango Data Flow
reference implementation

Tango Attribute / Event / Command

Device proxy — attribute reads & writes, command calls, CHANGE / PERIODIC / ARCHIVE event subscriptions

Reactive Publisher

RxTangoAttribute · RxTangoAttributeWrite · RxTangoCommand · RxTangoAttributeChangePublisher — all spec-compliant Publisher<T>

Rx Operators

map · zip · merge · buffer · filter · scan · throttleLast — compose, transform, correlate, reduce streams

Application Logic

Pure functions — calibration, averaging, threshold checks. No I/O, no shared state.

Commands / Writes / Downstream

Results written back to devices, forwarded to pipelines, logged to telemetry systems

Real-World Example
Beamline Feedback Loop
Detector intensity → sliding average → drift detection → magnet correction
Java · feedback pipeline
detectorStream
  // noise reduction — no circular buffer
  .buffer(5, 1)
  .map(window -> mean(window))

  // only act on out-of-range readings
  .filter(v -> outOfRange(v))

  // issue correction command
  .flatMapSingle(
      write(magnet, "setpoint"));
Same pattern, from single beamline feedback to facility-wide telemetry — without changing the model.
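The application-logic layer of this loop consists of pure functions, which can be written and tested without a device or a stream. The sketch below assumes hypothetical names (FeedbackLogic, slidingWindows, corrections) and a fixed acceptance range; mean and outOfRange are guesses at what the helpers in the pipeline might look like. It produces full windows only, whereas RxJava's buffer(count, skip) may additionally flush partial windows when the source completes.

```java
import java.util.ArrayList;
import java.util.List;

/** Pure-function sketch of the feedback loop's logic: window, average, range check. */
final class FeedbackLogic {

    /** All full windows of the given size, advancing one sample at a time. */
    static List<List<Double>> slidingWindows(List<Double> samples, int size) {
        List<List<Double>> windows = new ArrayList<>();
        for (int start = 0; start + size <= samples.size(); start++) {
            windows.add(List.copyOf(samples.subList(start, start + size)));
        }
        return windows;
    }

    /** Arithmetic mean of one window; an empty window yields NaN. */
    static double mean(List<Double> window) {
        return window.stream().mapToDouble(Double::doubleValue).average().orElse(Double.NaN);
    }

    /** True when the value lies outside the closed interval [low, high]. */
    static boolean outOfRange(double value, double low, double high) {
        return value < low || value > high;
    }

    /** Smoothed values that would trigger a correction. */
    static List<Double> corrections(List<Double> samples, int size, double low, double high) {
        List<Double> out = new ArrayList<>();
        for (List<Double> window : slidingWindows(samples, size)) {
            double smoothed = mean(window);
            if (outOfRange(smoothed, low, high)) {
                out.add(smoothed);
            }
        }
        return out;
    }
}
```

For the samples 1, 1, 1, 1, 1, 11 with a window of 5 and an accepted range of [0, 2], the first window averages 1.0 and is ignored, while the second averages 3.0 and would trigger a correction.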
Library-agnostic · org.reactivestreams interfaces · TCK-verified · zero build step (jbang)
7 / 9
Demo: What We'll Run
11 runnable examples · jbang · no build step
Basic
  read-attribute   Single-shot Publisher, simplest case
  poll             Continuous read, no loop
Coordination
  snapshot         Parallel reads, concurrent by default
  correlate ★      zip: guaranteed atomic pair
Stream Processing
  alarm ★          merge: isolated per-device failure
  sliding-avg ★    buffer(N,1): rolling mean, no deque
  throttle         Rate control with throttleLast
  running-stats    Live streaming stats with scan
Composition
  calibrate        Read → transform → write pipeline
  pipeline ★       Showstopper: fluent 6-step chain
Start here
shell
# 1. Bring up the Tango stack
docker compose up -d
# MariaDB → DatabaseDS → TangoTest

# 2. First example
jbang read-attribute@. \
  tango://localhost:10000/sys/tg_test/1 \
  double_scalar

# 3. The showstopper
jbang pipeline@. \
  tango://localhost:10000/sys/tg_test/1
All examples run from IntelliJ IDEA with a single click on the Run gutter button.
No Maven. No Gradle. No project setup.
★ = examples that directly address the real-world patterns shown after the demo.
github.com/Ingvord/RxJTango
Apache-2.0  ·  Java 11+  ·  jbang
8 / 9
Live Demo
examples/README.md  ·  jbang  ·  docker compose
9 / 9
Thank you!
Questions?
Reference implementation
github.com/Ingvord/RxJTango
The very same approach works for EPICS
github.com/Ingvord/RxEpics
Same spec · same operators · different control system underneath