1 / 9
EPICS Collaboration Meeting
Reactive Programming
for EPICS Controls
A reference implementation — and a programming model
Igor Khokhriakov  ·  Principal Software Engineer
2 / 9
Real Application 1 Correlated Multi-PV Reads Beyond sequential caget()
The scenario
"Every 500 ms, read BEAM:CURRENT and BEAM:POSITION from two different PVs — and only process the pair if both arrived in the same polling window. Discard if either fails."
With today's tools…
  • caget() from pyepics — sequential; two calls means a timing gap
  • await aioca.caget(pv1) + await aioca.caget(pv2) — still two separate awaits, still torn
  • asyncio.gather(caget(pv1), caget(pv2)) — parallel, but pairing and error handling are manual
  • No built-in "atomic multi-PV read" primitive in CA — every facility rolls its own
Most facilities end up implementing variations of this pattern — each slightly different, each with its own edge cases around failure handling and timing.
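For comparison, the manual asyncio.gather variant usually looks something like the sketch below. It is only an illustration: fake_caget is a hypothetical stand-in for an async CA read (not a real client call), and the rule "return None if either read failed" is one of several ways facilities implement the discard step.

```python
import asyncio


async def fake_caget(pv: str, fail: bool = False) -> float:
    """Hypothetical stand-in for an async CA read; not a real client call."""
    await asyncio.sleep(0.01)
    if fail:
        raise TimeoutError(f"{pv} timed out")
    return 1.0 if pv == "BEAM:CURRENT" else 2.0


async def read_pair(fail_position: bool = False):
    """Issue both reads concurrently; return the pair, or None if either failed."""
    results = await asyncio.gather(
        fake_caget("BEAM:CURRENT"),
        fake_caget("BEAM:POSITION", fail=fail_position),
        return_exceptions=True,  # collect failures instead of raising the first
    )
    if any(isinstance(r, BaseException) for r in results):
        return None  # manual "discard this window" rule
    return tuple(results)


async def poll(n_windows: int, period: float = 0.5):
    """Poll n_windows times; the second window is forced to fail for the demo."""
    pairs = []
    for i in range(n_windows):
        pair = await read_pair(fail_position=(i == 1))
        if pair is not None:
            pairs.append(pair)
        await asyncio.sleep(period)  # note: the period drifts by the read time
    return pairs
```

Even in this small version the pairing rule, the failure rule and the timing loop are three separate pieces of hand-written logic.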
With Rx — zip()  ·  just demoed as pv_correlate
Python · rx.zip()
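import reactivex as rx
from datetime import timedelta
from reactivex import operators as ops

rx.interval(
    timedelta(milliseconds=500), scheduler
).pipe(
    ops.flat_map(
        lambda _: rx.zip(
            # Both reads fire in parallel
            read_pv("BEAM:CURRENT",   ctx),
            read_pv("BEAM:POSITION",  ctx),
        ).pipe(
            # A failed read drops this window only (assuming read_pv
            # signals failure via on_error); without catch, the error
            # would terminate the whole polling stream
            ops.catch(lambda err, src: rx.empty()),
            # Combiner: only runs when BOTH reads have emitted
            ops.map(lambda pair: process(*pair))
        )
    )
).subscribe(
    on_next=log,
    on_error=lambda e: log(f"ERROR: {e}")
)
✓  If either read fails, that window's pair is dropped and polling continues; a pair is never half-processed.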
✓  Zero asyncio.Event. Zero shared state.
3 / 9
Real Application 2 Real-Time Stream Processing Beyond monitor callbacks
The scenario
"Monitor BEAM:INTENSITY at 20 Hz, compute a 5-sample sliding average, and only write the smoothed value back if it deviates more than 10% from the previous write."
With today's tools…
  • pyepics ca.Monitor callback — you bring the deque(maxlen=5) and the write-guard state
  • Manual accumulation: a counter, a threshold check, a prev_write float — same scaffolding every time
  • IOC-side CALC record chain — only for simple cases; becomes unmaintainable quickly
  • p4p Context.monitor() async — cleaner API, same manual window logic
Most facilities end up reimplementing the same scaffolding — sliding windows, rate control, conditional writes — in slightly different ways each time.
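The scaffolding in question tends to look like the following sketch. SmoothedWriter and its write argument are hypothetical names used only for illustration; the class would be driven from whatever monitor callback the client library provides.

```python
from collections import deque
from statistics import mean


class SmoothedWriter:
    """Callback-style scaffolding: sliding window, threshold check, write guard."""

    def __init__(self, write, window: int = 5, tolerance: float = 0.10):
        self._write = write                  # hypothetical write function
        self._window = deque(maxlen=window)  # sliding-window state
        self._tolerance = tolerance
        self._prev_write = None              # write-guard state

    def on_update(self, value: float) -> None:
        """Call this from the monitor callback for every new sample."""
        self._window.append(value)
        if len(self._window) < self._window.maxlen:
            return                           # window not full yet
        avg = mean(self._window)
        prev = self._prev_write
        if prev is not None and abs(avg - prev) <= self._tolerance * abs(prev):
            return                           # within tolerance of last write: skip
        self._prev_write = avg
        self._write(avg)
```

Three pieces of mutable state (window, tolerance check, previous write) live in one object, and each new application tends to rewrite them.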
With Rx — just demoed as pv_sliding_average
Python · buffer_with_count + distinct_until_changed
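import statistics
import reactivex as rx
from datetime import timedelta
from reactivex import operators as ops

rx.interval(
    timedelta(milliseconds=50), scheduler   # 20 Hz
).pipe(
    ops.flat_map(
        lambda _: read_pv("BEAM:INTENSITY", ctx)),

    # sliding window: no deque, no index arithmetic
    ops.buffer_with_count(5, 1),
    ops.map(lambda w: statistics.mean(w)),

    # skip write if within 10% of last written value
    # (the default comparer only suppresses exact repeats)
    ops.distinct_until_changed(
        comparer=lambda prev, cur:
            abs(cur - prev) <= 0.10 * abs(prev)),

    ops.flat_map(
        lambda v: write_pv("BEAM:SETPOINT", v, ctx))
).subscribe(
    on_next=log,
    on_error=lambda e: log(f"ERROR: {e}")
)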
✓  No deque. No counter. No write-guard variable.
✓  Runs anywhere Python 3.12+ runs — scripts, notebooks, services.
4 / 9
Framing Control Systems Are Already Streams We just don't model them that way
What your facility actually produces
Source: Process variable (PV) updates
Transport: Channel Access (CA) monitor subscriptions  ·  DBE_VALUE / DBE_ALARM
Processing: Alarm records  ·  soft channel records  ·  archiver data
Output: Setpoint writes  ·  soft interlocks  ·  beam-mode changes
How most application code treats it today
poll → store → check → write
// loops, threads, shared state,
// callbacks scattered everywhere
Reactive programming treats it as streams from the start
source → operator → operator → subscriber
# composable, declarative, back-pressure-aware
# error handling is part of the stream contract (on_error)
Reactive programming does not introduce a new concept.
It gives your system's existing stream nature a proper programming model.
5 / 9
Concepts Reactive Streams — 2 Minutes Not a library. A specification.
The Contract — reactive-streams.org
Publisher<T> Produces items — one method: subscribe()
Subscriber<T> onSubscribe / onNext / onError / onComplete  ·  on_next / on_error / on_completed in RxPY
Subscription Back-pressure: request(n) and cancel()
Processor<T,R> Both Publisher and Subscriber
4 interfaces. Everything else is implementation.
RxEpics observables are built on reactivex (RxPY v4), which brings the same Observable/Observer model to Python. RxPY has no request(n); flow control is expressed with operators.
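As a rough illustration of the four-interface contract, here is a sketch using typing.Protocol. The snake_case method names are a hypothetical Python rendering of the JVM specification, and CountPublisher and Collector are toy classes invented for this example; none of this is RxPY or RxEpics API. The toy also simplifies the spec: completion is only signalled on the next request.

```python
from typing import Protocol, TypeVar

T_co = TypeVar("T_co", covariant=True)
T_contra = TypeVar("T_contra", contravariant=True)
R_co = TypeVar("R_co", covariant=True)


class Subscription(Protocol):
    def request(self, n: int) -> None: ...  # back-pressure: ask for n more items
    def cancel(self) -> None: ...           # stop the flow


class Subscriber(Protocol[T_contra]):
    def on_subscribe(self, subscription: Subscription) -> None: ...
    def on_next(self, item: T_contra) -> None: ...
    def on_error(self, error: Exception) -> None: ...
    def on_complete(self) -> None: ...


class Publisher(Protocol[T_co]):
    def subscribe(self, subscriber: Subscriber[T_co]) -> None: ...


class Processor(Subscriber[T_contra], Publisher[R_co], Protocol[T_contra, R_co]):
    """Both ends at once: consumes T, publishes R."""


class _CountSubscription:
    def __init__(self, limit, subscriber):
        self._next, self._limit = 0, limit
        self._subscriber, self._done = subscriber, False

    def request(self, n: int) -> None:
        for _ in range(n):                  # emit at most n items per request
            if self._done:
                return
            if self._next >= self._limit:
                self._done = True
                self._subscriber.on_complete()
                return
            value, self._next = self._next, self._next + 1
            self._subscriber.on_next(value)

    def cancel(self) -> None:
        self._done = True


class CountPublisher:
    """Toy publisher: emits 0, 1, 2, ... below `limit`, only on request."""

    def __init__(self, limit: int) -> None:
        self.limit = limit

    def subscribe(self, subscriber) -> None:
        subscriber.on_subscribe(_CountSubscription(self.limit, subscriber))


class Collector:
    """Toy subscriber that asks for two items up front."""

    def __init__(self):
        self.items, self.completed, self.subscription = [], False, None

    def on_subscribe(self, subscription):
        self.subscription = subscription
        subscription.request(2)

    def on_next(self, item):
        self.items.append(item)

    def on_error(self, error):
        raise error

    def on_complete(self):
        self.completed = True
```

The consumer controls the pace: nothing beyond the requested count is delivered until request(n) is called again.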
Multiplatform — Learn Once, Use Everywhere
Platform        Library
Python          RxPY (reactivex)
Java / JVM      RxJava3, Project Reactor
Kotlin          Kotlin Flow
JavaScript      RxJS
.NET            System.Reactive
Swift / iOS     RxSwift
Python scripts, Java servers, JS dashboards — all talking to the same devices. One paradigm across all of them.
Mental Model — It's a Data Structure
source → operator → operator → subscriber
⚠ Common misconception A reactive stream is not just a dynamic callback chain.
It is a lazy, composable data structure — like list, dict or generator, but for asynchronous sequences.
Nothing runs until someone subscribes. Operators build a description of computation, not a running pipeline.
Key operators used in demos:
zip · merge · buffer_with_count · scan · throttle_with_timeout · distinct_until_changed
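The "description, not a running pipeline" idea can be shown with a toy class. Stream below is a hypothetical, deliberately tiny stand-in written for this illustration; it is not how reactivex is implemented, only a sketch of the same laziness.

```python
class Stream:
    """Toy lazy stream: stores a subscribe function, runs nothing by itself."""

    def __init__(self, subscribe_fn):
        self._subscribe_fn = subscribe_fn

    @staticmethod
    def from_iterable(items):
        def subscribe(on_next):
            for item in items:
                on_next(item)
        return Stream(subscribe)

    def map(self, fn):
        # Returns a new description wrapping this one; still nothing runs.
        return Stream(lambda on_next: self._subscribe_fn(lambda x: on_next(fn(x))))

    def filter(self, pred):
        def subscribe(on_next):
            self._subscribe_fn(lambda x: on_next(x) if pred(x) else None)
        return Stream(subscribe)

    def subscribe(self, on_next):
        self._subscribe_fn(on_next)  # only here does data start to flow


calls = []


def traced_double(x):
    calls.append(x)  # record every invocation so laziness is observable
    return 2 * x


pipeline = Stream.from_iterable([1, 2, 3, 4]).map(traced_double).filter(lambda v: v > 4)
assert calls == []               # the pipeline is built, but nothing has run

out = []
pipeline.subscribe(out.append)   # now traced_double runs for each item
```

Because the pipeline is a value, it can be stored, passed around and subscribed to more than once; each subscription runs the description again.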
Back-Pressure — Flow Control Matters in Control Systems
What happens when producers are faster than consumers?
  • Detector emitting events faster than analysis code
  • PV bursting DBE_VALUE updates during a scan
  • Multiple PVs feeding a single processing pipeline
buffer_with_count() accumulate, process in batches
throttle_with_timeout() emit the latest value once updates pause for the timeout (debounce)
sample() emit latest at fixed intervals
ops.filter(gate) explicitly drop excess items
The application explicitly defines how overload is handled — not implicitly through silent callback queue overflow.
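To make the overload policies concrete, here is a plain-Python analogue operating on recorded events rather than live streams. The function names and the (timestamp_ms, value) event format are assumptions made for this sketch; the Rx operators apply the same ideas to live data with timers.

```python
from itertools import islice


def batches(items, size):
    """Accumulate and process in batches (the buffer_with_count idea)."""
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk


def latest_per_interval(events, period_ms):
    """Keep only the last value seen in each fixed interval (the sample idea).

    `events` is an iterable of (timestamp_ms, value) pairs sorted by time.
    """
    latest = {}
    for t_ms, value in events:
        latest[t_ms // period_ms] = value  # later values overwrite earlier ones
    return [latest[k] for k in sorted(latest)]


def gate(items, keep_every):
    """Explicitly drop excess items: keep one in every `keep_every`."""
    return [x for i, x in enumerate(items) if i % keep_every == 0]
```

Each function is a visible, testable decision about what to lose when input outpaces processing.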
6 / 9
Library RxEpics — Data Flow reference implementation

EPICS Channel Access PV

caproto Context — single-shot reads & writes, CA monitor subscriptions (DBE_VALUE / DBE_ALARM / DBE_PROPERTY)

Reactive Observable

read_pv(pv, ctx) · write_pv(pv, val, ctx) · monitor_pv(pv, ctx) — all rx.Observable, lazy, composable

Rx Operators

map · zip · merge · buffer_with_count · filter · scan · throttle_with_timeout — compose, transform, correlate, reduce streams

Application Logic

Pure Python functions — calibration, averaging, threshold checks. No I/O, no shared state.

Writes / Downstream

Results written back to setpoint PVs, forwarded to pipelines, logged to archiver or HDF5

Real-World Example
Beamline Feedback Loop
Detector intensity → sliding average → drift detection → magnet correction
Python · feedback pipeline
monitor_pv("DETECTOR:INTENSITY", ctx).pipe(
    # noise reduction — no deque
    ops.buffer_with_count(5, 1),
    ops.map(lambda w: statistics.mean(w)),

    # only act on out-of-range readings
    ops.filter(out_of_range),

    # write correction back
    ops.flat_map(
        lambda v: write_pv(
            "MAGNET:SETPOINT", v, ctx))
).subscribe(on_next=apply, on_error=log)
Same pattern, from single beamline feedback to facility-wide telemetry — without changing the model.
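The application-logic pieces of such a pipeline can stay as plain functions. The sketch below shows one possible out_of_range predicate and a proportional correction; the band limits, target and gain are illustrative values, and correction is a hypothetical helper that does not appear in the slide code.

```python
def out_of_range(avg: float, low: float = 0.95, high: float = 1.05) -> bool:
    """True when the smoothed intensity leaves the [low, high] band."""
    return not (low <= avg <= high)


def correction(avg: float, target: float = 1.0, gain: float = 0.5) -> float:
    """Proportional step toward the target; positive means 'increase'."""
    return gain * (target - avg)
```

Since both take a single positional value, they could be passed directly to ops.filter and ops.map, and unit-tested without an IOC.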
Library-agnostic · rx.Observable · asyncio-native · zero build step (uv)
7 / 9
Demo What We'll Run 10 runnable examples · uv · no build step
Basic
poll_pv Continuous read — no loop
monitor_pv Push updates — no polling at all
Coordination
multi_pv_snapshot Parallel reads, concurrent by default
pv_correlate ★ zip: all-or-nothing pair per polling window
Stream Processing
alarm_monitor ★ merge — isolated per-PV failure
pv_sliding_average ★ buffer_with_count(N,1) — rolling mean
pv_throttle Rate control with throttle_with_timeout
pv_running_stats Live streaming stats with scan
Composition
calibration_pipeline Read → transform → write pipeline
pv_pipeline ★ Showstopper — fluent 6-step chain
Start here
shell
# 1. Start the EPICS softIoc
docker compose up -d
# caproto softIoc with test.db
# TEST:CALC  TEST:DOUBLE  TEST:STRING

# 2. First example
uv run python examples/poll_pv.py \
    TEST:CALC 500

# 3. The showstopper
uv run python examples/pv_pipeline.py
All examples run from any Python IDE with a single click — or directly in the terminal with uv run.
No venv setup. No pip install.
Examples marked ★ directly address the real-world patterns shown after the demo.
github.com/scientific-software-hub/rx-controls-suite
AGPL-3.0  ·  Python 3.12+  ·  reactivex + caproto
8 / 9
Live Demo
examples/README.md  ·  uv  ·  docker compose
9 / 9
Thank you!
Questions?
Reference implementation
github.com/scientific-software-hub/rx-controls-suite
AGPL-3.0  ·  Python 3.12+  ·  reactivex + caproto
The very same approach works for Tango Controls
Reactive Programming for Tango Controls →
Same spec · same operators · Java / jbang / RxJava3