1 / 6
DESY · TINE Middleware
Reactive Programming
for TINE Controls
A reference implementation — and a programming model
Igor Khokhriakov  ·  Principal Software Engineer
2 / 6
Pattern 1  ·  Correlated Multi-Property Reads  ·  Beyond sequential TLink.executeAndClose()
The scenario
"Every 500 ms, read CURRENT from /HERA/Magnets/QF1 and XPOS from /HERA/Diagnostics/BPM01 — only process the pair if both arrive in the same polling window. Discard if either fails."
With today's tools…
  • Two sequential TLink.executeAndClose() calls — a timing gap opens between them
  • Thread pool + CountDownLatch — parallel, but pairing and error handling are manual
  • No built-in "atomic multi-property read" primitive in TINE Java — every project rolls its own
Most facilities end up implementing variations of this pattern — each slightly different, each with its own edge cases around failure handling and timing.
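For comparison, a hand-rolled variant of this pattern might look roughly like the sketch below. It uses only JDK classes; the two Supplier arguments stand in for blocking TINE reads, and PairedRead, Pair and readBoth are hypothetical names that are not part of the TINE Java API.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hand-rolled both-or-nothing pairing, JDK only (illustrative, not TINE API). */
final class PairedRead {

    /** A correlated pair of readings; field names are illustrative. */
    static final class Pair {
        final double current;
        final double xpos;

        Pair(double current, double xpos) {
            this.current = current;
            this.xpos = xpos;
        }
    }

    /**
     * Starts both reads in parallel and returns the pair only if both
     * succeed within windowMillis; otherwise returns Optional.empty().
     * The suppliers are placeholders for blocking TINE reads.
     */
    static Optional<Pair> readBoth(Supplier<Double> readCurrent,
                                   Supplier<Double> readXpos,
                                   long windowMillis) {
        CompletableFuture<Double> current = CompletableFuture.supplyAsync(readCurrent);
        CompletableFuture<Double> xpos = CompletableFuture.supplyAsync(readXpos);
        try {
            Pair pair = current.thenCombine(xpos, Pair::new)
                               .get(windowMillis, TimeUnit.MILLISECONDS);
            return Optional.of(pair);
        } catch (Exception e) {
            // Failure or timeout on either side: discard the whole pair.
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            current.cancel(true);
            xpos.cancel(true);
            return Optional.empty();
        }
    }
}
```

Even in this compact form, the timeout, the cancellation and the failure path are all written by hand, and a periodic scheduler would still have to be added around readBoth.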
With Rx — Single.zip()
Java · Single.zip()
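Flowable.interval(500, MILLISECONDS)
  .flatMapMaybe(tick -> Single.zip(

      // Both reads fire in parallel
      TineClient.read("/HERA/Magnets/QF1", "CURRENT")
          .subscribeOn(Schedulers.io()),
      TineClient.read("/HERA/Diagnostics/BPM01", "XPOS")
          .subscribeOn(Schedulers.io()),

      // Only runs when BOTH complete
      (current, pos) -> process(current[0], pos[0])

  )
  // A pair that misses the polling window counts as a failure
  .timeout(500, MILLISECONDS)
  // Failed or late pair: drop it and keep polling
  // (without this, one error would terminate the whole stream)
  .onErrorComplete());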
✓  If either read fails, the pair is silently dropped — never half-processed.
✓  Zero CountDownLatch. Zero shared state.
✓  TDataType returns arrays — take [0] for scalars.
3 / 6
Pattern 2  ·  Real-Time Monitor Stream Processing  ·  Beyond CM_POLL callbacks
The scenario
"Monitor INTENSITY from /HERA/Beam/Monitor, compute a 5-sample sliding average, and write the smoothed value back only if it deviates more than 10% from the last write."
With today's tools…
  • TLink.attach(CM_POLL, ...) callback — you manage the ring buffer and write-guard state yourself
  • Custom ArrayDeque + a counter + a prevWrite float — same scaffolding, every project
  • Cancellation and resource cleanup are entirely manual
Most facilities end up reimplementing the same scaffolding — sliding windows, rate control, conditional writes — in slightly different ways each time.
With Rx — RxTineMonitor + buffer + distinctUntilChanged
Java · buffer(5,1) + distinctUntilChanged
TineClient.monitor("/HERA/Beam/Monitor", "INTENSITY")

  // sliding window — no deque, no index arithmetic
  .buffer(5, 1)
  .map(window -> mean(window))

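  // skip write if within 10% of last written value:
  // scan carries the last written value forward and replaces it
  // only on a >10% deviation; repeats are then dropped
  .scan((last, curr) ->
      Math.abs(curr - last) > 0.1 * Math.abs(last) ? curr : last)
  .distinctUntilChanged()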

  .flatMapSingle(
      v -> TineClient.write(
          "/HERA/Beam/Corrector", "SETPOINT", v));
✓  No deque. No counter. No write-guard variable.
✓  Dispose the subscription to cancel — no manual detach().
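The pipeline above calls mean(window) without defining it, and the 10% rule is easier to check in isolation as a pure function. A minimal sketch of both follows, assuming the window arrives as a List<Double>; WindowMath and deviates are hypothetical names, not part of RxTine.

```java
import java.util.List;

/** Pure helpers for the smoothing pipeline (illustrative names, not RxTine API). */
final class WindowMath {

    /** Arithmetic mean of a non-empty window of samples. */
    static double mean(List<Double> window) {
        if (window.isEmpty()) {
            throw new IllegalArgumentException("empty window");
        }
        double sum = 0.0;
        for (double sample : window) {
            sum += sample;
        }
        return sum / window.size();
    }

    /**
     * True when candidate differs from lastWritten by more than the given
     * fraction of |lastWritten|. Comparing against the product avoids a
     * division by zero and behaves sensibly for negative values.
     */
    static boolean deviates(double lastWritten, double candidate, double fraction) {
        return Math.abs(candidate - lastWritten) > fraction * Math.abs(lastWritten);
    }
}
```

Because both functions are free of I/O and state, they can be unit-tested without a TINE server and passed to operators as method references.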
4 / 6
Pattern 3  ·  Alarm Fan-In  ·  Beyond per-device polling loops
The scenario
"Watch three RF cavities. When any one reports a fault, notify the operator immediately. The other two must continue monitoring unaffected."
With today's tools…
  • One TLink.attach(CM_POLL, ...) per device — three separate callback registrations
  • Manual fan-in: shared queue, lock, or a third-party aggregator thread
  • One device failing breaks the others if state is shared
Each additional device to watch is another manual registration, another edge case, another chance for a threading bug.
With Rx — Flowable.merge()
Java · Flowable.merge()
Flowable.merge(
    TineClient.monitor(
        "/HERA/RF/Cavity01", "STATUS"),
    TineClient.monitor(
        "/HERA/RF/Cavity02", "STATUS"),
    TineClient.monitor(
        "/HERA/RF/Cavity03", "STATUS")
)
.filter(s -> s[0] != STATUS_OK)
.subscribe(
    alarm  -> notifyOperator(alarm),
    error  -> log.error("monitor failed", error)
);
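✓  A fault status from any one cavity reaches the operator at once; the other two streams keep flowing.
✓  A stream error is different: it terminates merge() for all three. If links can drop, guard each source with onErrorResumeNext() or use mergeDelayError().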
✓  Add a fourth cavity: one line.
✓  No shared state. No locks.
5 / 6
Library RxTine — Data Flow reference implementation

TINE Property

Address: /Context/Server/Device + property name (e.g. SENSOR). Single-shot reads via TLink.executeAndClose(); push via TLink.attach(CM_POLL).

Reactive Publisher

RxTineRead<T> · RxTineWrite<T> — single-shot  ·  RxTineMonitor<T> — push, CM_POLL-backed.
All spec-compliant Publisher<T>.

Rx Operators

map · zip · merge · buffer · filter · scan · distinctUntilChanged — compose, transform, correlate, reduce streams

Application Logic

Pure Java functions — calibration, averaging, threshold checks. No I/O, no shared state.

Writes / Downstream

Results written back to TINE properties, forwarded to pipelines, logged to telemetry systems

Key difference from Tango
TINE has no commands
Write to a property instead — RxTineWrite emits the written value, so the same pipeline composition applies.
Java · feedback pipeline
TineClient
  .monitor("/HERA/Beam/Monitor", "INTENSITY")
  .buffer(5, 1)
  .map(w -> mean(w))
  .filter(v -> outOfRange(v))
  .flatMapSingle(
      v -> TineClient.write(
          "/HERA/Beam/Corrector",
          "SETPOINT", v))
  .subscribe(v -> log("corrected: " + v));
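The feedback pipeline above relies on outOfRange(v), which the slide does not define. One possible shape for such application logic is sketched below as pure Java; BeamLogic, its tolerance band and the linear calibrate helper are assumptions made for illustration, not part of RxTine.

```java
/** Side-effect-free helpers of the kind the pipeline delegates to (illustrative names). */
final class BeamLogic {
    private final double low;
    private final double high;

    /** The band limits are supplied by the caller; no facility defaults are assumed. */
    BeamLogic(double low, double high) {
        if (low > high) {
            throw new IllegalArgumentException("low must not exceed high");
        }
        this.low = low;
        this.high = high;
    }

    /** Linear calibration from a raw reading to engineering units. */
    static double calibrate(double raw, double gain, double offset) {
        return raw * gain + offset;
    }

    /** True when v lies outside [low, high]; NaN also counts as out of range. */
    boolean outOfRange(double v) {
        return !(v >= low && v <= high);
    }
}
```

With an instance such as new BeamLogic(0.9, 1.1), the filter step could then read .filter(logic::outOfRange), keeping the threshold values out of the pipeline itself.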
org.reactivestreams · TCK-verified · zero build step (jbang) · Java 11+
6 / 6
Thank you!
Questions?
Reference implementation
github.com/scientific-software-hub/rx-controls-suite
AGPL-3.0  ·  Java 11+  ·  jbang  ·  RxJava3  ·  TINE Java API
The very same approach works for Tango Controls
Reactive Programming for Tango Controls →
Same spec · same operators · Java / jbang / RxJava3