1 / 11
rx-controls-suite · Combined Demo
Storage Ring × Beamline
One Reactive Pipeline
One Python process · Two control systems · Same operator vocabulary
Igor Khokhriakov  ·  Principal Software Engineer
2 / 11
The Facility Two Control Systems. One Experiment. storage ring → beamline
Tango Controls — C++ cppTango
sr/demo/controller
BeamCurrent→ acquisition gate
InterlockCount→ abort trigger
ScenarioId (rw) — inject fault via write
sr/demo/sector04
OrbitX→ quality flag per frame
VacuumPressure, BeamLossFraction — feed interlock
EPICS Channel Access — softIoc
TOMO:ROT:*
VAL, MOVN, SPEED — rotation stage control
TOMO:DET:*
ACQUIRE, COUNTS, ACQUIRING, EXPOSURE — detector
TOMO:SHUTTER:*
OPEN — beam shutter
TOMO:BEAM:POSX/Y
beam_posx, beam_posy — position diagnostics
Cross-system coupling — one client
Four reactive patterns
🛡 Beam-loss gatefilter + take(1)
Shutter supervisordistinct_until_changed
🔬 Quality flaggingrx.zip (Tango + EPICS)
Vacuum-burst aborttake_until
📦 Backpressureshare() + sample()
One shared health stream
ring_health(scheduler)
→ polls Tango at 1 Hz · shared by all downstream operators
Fault injection (second terminal)
python inject_fault.py beam_loss
python inject_fault.py orbit_drift
python inject_fault.py vacuum_burst
python inject_fault.py nominal
3 / 11
The Showstopper Cross-System rx.zip — One Operator, Two Protocols five reads · in parallel · atomic frame
The scenario
"After each exposure, read detector counts and beam position from EPICS and ring current and orbit deviation from Tango — simultaneously, atomically. Tag the frame with a quality flag computed from the orbit."
Traditional approach
  • Read Tango attributes sequentially (one DeviceProxy call at a time)
  • Read EPICS PVs in a separate loop or thread
  • Correlate timestamps in post-processing
  • Manage failures from two different exception hierarchies
  • Shared buffer between threads + Lock + timestamp matching
In practice: two background threads, a correlation queue, ~50 lines of glue code — before any science logic.
With rx.zip — one operator, two control systems
Python · rx.zip across EPICS + Tango
ops.flat_map(lambda _: rx.zip(

    # EPICS Channel Access (caproto)
    read_pv("TOMO:DET:COUNTS",  ctx),
    read_pv("TOMO:BEAM:POSX",   ctx),
    read_pv("TOMO:BEAM:POSY",   ctx),

    # Tango Controls (PyTango)
    read_attribute(CONTROLLER, "BeamCurrent"),
    read_attribute(SECTOR_04,  "OrbitX"),

)),
ops.map(lambda r: (
    float(r[0]), float(r[1]), float(r[2]),  # EPICS
    float(r[3]), float(r[4]),               # Tango
    abs(float(r[4])) < ORBIT_ALARM,        # quality
))
✓  All five requests fire in parallel.
✓  Frame emitted only when all five complete.
✓  Failure in any one → frame dropped, never half-written.
4 / 11
Pattern 0 Ring Health as a Shared Observable one poll, many subscribers
facility.py — the shared source
Python · ring_health()
Health = namedtuple("Health",
    ["current", "interlocks", "orbit_x"])

def ring_health(scheduler, interval_ms=1000):
    return rx.interval(
        timedelta(milliseconds=interval_ms),
        scheduler=scheduler,
    ).pipe(
        # Read three ring attributes simultaneously
        ops.flat_map(lambda _: rx.zip(
            read_attribute(CONTROLLER, "BeamCurrent"),
            read_attribute(CONTROLLER, "InterlockCount"),
            read_attribute(SECTOR_04,  "OrbitX"),
        )),
        ops.map(lambda t: Health(
            current=float(t[0]),
            interlocks=int(t[1]),
            orbit_x=float(t[2]),
        )),
        # One poll stream → many subscribers
        ops.share(),
    )
Why share()?
  • One TCP connection to the Tango server per interval tick
  • Three subscribers share the same emission: shutter supervisor, abort trigger, per-projection gate
  • No duplicate network traffic — no matter how many operators listen
  • Deterministic timing — all subscribers see the same Health value per tick
Subscribers of health in this demo
Python · three consumers
# 1. Shutter supervisor
health.pipe(map(beam_ok), distinct_until_changed,
            flat_map(write_pv(SHUTTER))).subscribe(...)

# 2. Abort trigger (in take_until)
abort = health.pipe(filter(interlocks > 0), take(1))

# 3. Per-projection gate
health.pipe(filter(is_healthy), take(1), ignore_elements())
✓  One ring poll subscription per second.
✓  All three consumers see the same emission.
✓  No external message bus, no shared dict.
5 / 11
Pattern 1 Beam-Loss Recovery — Gate + Shutter Supervisor filter · take(1) · distinct_until_changed
Per-projection gate
Python · wait_healthy
# Before each projection: wait for ring health
wait_healthy = health.pipe(
    ops.filter(is_healthy),    # ≥ 50 mA, no interlocks
    ops.take(1),               # complete on first match
    ops.ignore_elements(),     # timing only
)

# Each projection is gated
return rx.concat(wait_healthy, acquire(...))
Shutter supervisor — runs throughout
Python · 3 operators
health.pipe(
    ops.map(lambda h:
        h.current >= MIN_BEAM_CURRENT),
    ops.distinct_until_changed(), # only on transitions
    ops.flat_map(lambda ok: write_pv(
        "TOMO:SHUTTER:OPEN", 1 if ok else 0, ctx)),
).subscribe(on_next=print, scheduler=scheduler)
Demo sequence
shell
# Terminal A: scan running
python guarded_scan.py --ascii

# Terminal B: inject beam loss
python inject_fault.py beam_loss
# → ring current → 25 mA
# → shutter supervisor writes TOMO:SHUTTER:OPEN = 0
# → next projection's wait_healthy blocks
# → "[proj N] waiting for healthy beam..."

# Terminal B: restore
python inject_fault.py nominal
# → ring current → ~100 mA
# → shutter opens (distinct_until_changed fires)
# → wait_healthy unblocks → projection starts
  • distinct_until_changed — shutter only toggles on state transitions, not on every tick
  • wait_healthy — scan never starts a projection into bad beam
  • Zero explicit state machine. Three operators handle pause, resume, and shutter coordination
6 / 11
Pattern 2 Orbit-Drift Quality Flagging cross-system zip · quality computed at acquisition time
"Tag every tomography frame with the storage-ring orbit deviation at the exact moment the detector exposure completed — no timestamp correlation, no post-processing join."
How it works
Python · quality from Tango orbit
# After detector completes — cross-system zip:
ops.flat_map(lambda _: rx.zip(
    read_pv("TOMO:DET:COUNTS",  ctx),   # EPICS
    read_pv("TOMO:BEAM:POSX",   ctx),   # EPICS
    read_pv("TOMO:BEAM:POSY",   ctx),   # EPICS
    read_attribute(CONTROLLER,  # Tango
        "BeamCurrent"),
    read_attribute(SECTOR_04,   # Tango
        "OrbitX"),
)),
ops.map(lambda r: (
    ...,
    abs(float(r[4])) < 55.0,  # quality_ok
))
HDF5 dtype — quality per frame
Python · HDF5 dataset
dtype=np.dtype([
    ("timestamp",    "f8"),
    ("proj_index",   "i4"),
    ("angle",        "f4"),
    ("counts",       "f8"),     # EPICS
    ("beam_posx",    "f4"),     # EPICS
    ("beam_posy",    "f4"),     # EPICS
    ("ring_current", "f4"),     # Tango ← ring
    ("orbit_x",      "f4"),     # Tango ← ring
    ("quality_ok",   "?"),      # True/False
])
shell · inject orbit drift
python inject_fault.py orbit_drift
# orbit_x → drifts past 55 µm
# → frames tagged quality_ok = False
# → live display shows ~ instead of ✓
✓  Quality is computed co-temporaneously with acquisition.
✓  No background thread, no ring-state buffer, no join key.
✓  One rx.zip per projection — atomically correct.
7 / 11
Pattern 3 Vacuum-Burst Abort — take_until one operator terminates a 360-projection scan
"If any interlock fires during the scan, terminate immediately — close the shutter, write ABORTED status, stop all acquisition."
take_until wraps the entire scan
Python · abort_trigger + take_until
# Abort fires once: first emission with interlocks > 0
abort_trigger = health.pipe(
    ops.filter(lambda h: h.interlocks > 0),
    ops.take(1),
    ops.do_action(on_next=lambda h: (
        scan_aborted.set(),
        print(f"⚠ VACUUM BURST: "
              f"interlocks={h.interlocks}")
    )),
)

# Wrap the entire 360-projection scan
scan = rx.concat(
    setup, *projections, teardown_done
).pipe(
    ops.take_until(abort_trigger)   # ← one line
)

# Emergency teardown (after scan_done.wait())
if scan_aborted.is_set():
    rx.zip(
        write_pv("TOMO:SCAN:STATUS", SCAN_ABORTED, ctx),
        write_pv("TOMO:SHUTTER:OPEN", 0, ctx),
    ).subscribe(...)
Demo sequence
shell
python inject_fault.py vacuum_burst
# C++ simulator: VacuumPressure → high
# → sector interlocks fire
# → InterlockCount on controller > 0
# → abort_trigger emits once
# → take_until completes the scan stream
# → scan_aborted.set()
# → emergency teardown fires:
#   TOMO:SCAN:STATUS = ABORTED (3)
#   TOMO:SHUTTER:OPEN = 0
  • take_until — terminates an arbitrarily complex rx.concat chain from outside
  • No try/except in the scan logic — the abort is a first-class observable
  • Composable — add more abort conditions with a second filter into abort_trigger
  • The abort happens at the Rx level — the scan never sees it as an exception
8 / 11
Pattern 4 Backpressure — share() + sample() HDF5 keeps all · display drops under load
"The HDF5 writer must keep every frame. The live display only needs to refresh at human speed. When the scan is fast, the display should drop frames silently — without slowing the acquisition."
share() splits one stream into two branches
Python · share + sample
# One execution of the scan
source = guarded_scan(...).pipe(ops.share())

# Branch 1: HDF5 writer — receives every frame
source.subscribe(
    on_next=write_frame,       # must not block
    scheduler=scheduler,
)

# Branch 2: live display — throttled at 250 ms
source.pipe(
    ops.sample(timedelta(milliseconds=250),
               scheduler=scheduler),
).subscribe(
    on_next=display_frame,     # allowed to be slow
    scheduler=scheduler,
)
  • share() — turns the cold Observable hot: one subscription to the source, N observers
  • sample(250 ms) — keeps the most recent frame per window; drops the rest
  • The slow consumer never backpressures the fast producer — they are independent
  • The application chooses the overload policy explicitly — not silently swallowed by a queue
Python · summary stats at the end
print(f"Frames acquired: {hdf5_written}")
print(f"Frames displayed: {display_shown}")
# Actual drop rate = 1 - displayed / acquired
# e.g.: acquired 360, displayed 12 → 97% dropped
# HDF5 has all 360 — zero data loss
✓  HDF5: zero frame loss at any scan speed.
✓  Display: automatic drop, no queue overflow.
✓  One operator per branch — no manual accounting.
9 / 11
The Hero guarded_scan.py — All Four Patterns ~60 lines of declarative pipeline
Setup — one shared source drives everything
Python · guarded_scan.py (condensed)
health = ring_health(scheduler)      # shared Tango poll

# Pattern 1: shutter supervisor (3 operators)
supervisor = health.pipe(
    ops.map(lambda h: h.current >= 50),
    ops.distinct_until_changed(),
    ops.flat_map(lambda ok:
        write_pv(SHUTTER, 1 if ok else 0, ctx)),
).subscribe(...)

# Pattern 3: abort trigger (1 filter + take)
abort = health.pipe(
    ops.filter(lambda h: h.interlocks > 0),
    ops.take(1),
)

# Pattern 1: per-projection health gate
wait = health.pipe(
    ops.filter(is_healthy), ops.take(1),
    ops.ignore_elements(),
)

# Scan: each projection is gated + cross-system zip
scan = rx.concat(
    setup,
    *[rx.concat(wait, acquire(angle, i, health))
      for i, angle in enumerate(angles)],
    teardown,
).pipe(ops.take_until(abort))        # Pattern 3

# Pattern 4: share + sample
source = scan.pipe(ops.share())
source.subscribe(on_next=write_frame)
source.pipe(ops.sample(250ms)).subscribe(
    on_next=display_frame)
Cross-system zip inside each projection
Python · Pattern 2 — quality per frame
# After detector exposure completes:
ops.flat_map(lambda _: rx.zip(
    read_pv("TOMO:DET:COUNTS",  ctx),   # EPICS
    read_pv("TOMO:BEAM:POSX",   ctx),   # EPICS
    read_pv("TOMO:BEAM:POSY",   ctx),   # EPICS
    read_attribute(CTRL, "BeamCurrent"),# Tango
    read_attribute(SEC4, "OrbitX"),     # Tango
)),
ops.map(lambda r: (
    *r[:5],
    abs(r[4]) < ORBIT_ALARM,  # quality
))
  • One file, 60 lines of pipeline logic
  • Five operators handle all four fault scenarios
  • No threads, no locks, no flags, no state machine
  • Adding a new fault: one more filter + hook into take_until
✓  guarded_scan.py + facility.py = ~200 lines total
✓  Equivalent threading code: 800+ lines
✓  Each new requirement adds operators, not complexity
10 / 11
Live Demo
demo/synchrotron-beamline/  ·  docker compose  ·  inject_fault.py
11 / 11
Thank you!
Questions?
rx-controls-suite
github.com/scientific-software-hub/rx-controls-suite
The same operators across every control system
RxTango/java
Java · jbang
· RxTango/python
Python · uv
· RxEpics/python
Python · caproto
· Combined
Tango + EPICS
zip · merge · share · distinct_until_changed · take_until · sample