BEAM:CURRENT and BEAM:POSITION
from two different PVs — and only process the pair
if both arrived in the same polling window. Discard if either fails."
await aioca.caget(pv1) + await aioca.caget(pv2): still two separate awaits, still torn
asyncio.gather(caget(pv1), caget(pv2)): parallel, but pairing and error handling are manual
pv_correlate
rx.interval(
    timedelta(milliseconds=500), scheduler
).pipe(
    ops.flat_map(
        lambda _: rx.zip(
            # Both reads fire in parallel
            read_pv("BEAM:CURRENT", ctx),
            read_pv("BEAM:POSITION", ctx),
        ).pipe(
            # Combiner: only runs when BOTH complete
            ops.map(lambda pair: process(*pair))
        )
    )
).subscribe(
    on_next=log,
    on_error=lambda e: log(f"ERROR: {e}")
)
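For contrast, here is a minimal sketch of the manual pairing and error handling that the `asyncio.gather` variant needs. It assumes a hypothetical `fake_caget` coroutine standing in for a real Channel Access read, and a hypothetical `read_pair` helper; neither is part of any library.

```python
import asyncio

# Hypothetical stand-in for a Channel Access read; a real client call
# would go here instead.
async def fake_caget(pv: str) -> float:
    values = {"BEAM:CURRENT": 101.5, "BEAM:POSITION": 0.25}
    await asyncio.sleep(0.01)  # simulate network latency
    if pv not in values:
        raise ConnectionError(f"{pv} did not respond")
    return values[pv]

async def read_pair(pv1: str, pv2: str, timeout: float = 0.5):
    """Return (v1, v2) if both reads finish inside the window, else None."""
    try:
        v1, v2 = await asyncio.wait_for(
            asyncio.gather(fake_caget(pv1), fake_caget(pv2)),
            timeout=timeout,
        )
    except (asyncio.TimeoutError, ConnectionError):
        return None  # discard the whole pair if either read fails
    return v1, v2

good = asyncio.run(read_pair("BEAM:CURRENT", "BEAM:POSITION"))
bad = asyncio.run(read_pair("BEAM:CURRENT", "BEAM:MISSING"))
print(good, bad)
```

The timeout, the exception list, and the "return None" convention all have to be chosen and repeated by hand for every pair of PVs, which is the scaffolding the pipeline version is meant to absorb.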
BEAM:INTENSITY at 20 Hz, compute a 5-sample sliding
average, and only write the smoothed value back if it deviates more
than 10% from the previous write."
deque(maxlen=5) and the write-guard state prev_write float: same scaffolding every time
Context.monitor() async: cleaner API, same manual window logic
pv_sliding_average
rx.interval(
    timedelta(milliseconds=50), scheduler  # 20 Hz
).pipe(
    ops.flat_map(
        lambda _: read_pv("BEAM:INTENSITY", ctx)),
    # sliding window: no deque, no index arithmetic
    ops.buffer_with_count(5, 1),
    ops.map(lambda w: statistics.mean(w)),
    # skip write if within 10% of last written value;
    # the comparer returns True when the two values count as "unchanged"
    ops.distinct_until_changed(
        comparer=lambda prev, new: abs(new - prev) <= 0.10 * abs(prev)),
    ops.flat_map(
        lambda v: write_pv("BEAM:SETPOINT", v, ctx))
).subscribe(
    on_next=log,
    on_error=lambda e: log(f"ERROR: {e}")
)
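For reference, the hand-written scaffolding that this pipeline replaces can be sketched in plain Python. The function name `smoothed_writes` and the sample readings are illustrative assumptions; the sketch only shows the window and write-guard logic, with no PV I/O.

```python
from collections import deque
from statistics import mean

def smoothed_writes(samples, window=5, tolerance=0.10):
    """Yield sliding-window means that deviate from the previously
    yielded value by more than `tolerance` (relative)."""
    buf = deque(maxlen=window)   # the sliding window
    prev_write = None            # the write-guard state
    for sample in samples:
        buf.append(sample)
        if len(buf) < window:
            continue  # wait until the first full window
        avg = mean(buf)
        if prev_write is None or abs(avg - prev_write) > tolerance * abs(prev_write):
            prev_write = avg
            yield avg

# Illustrative readings: a small drift, then a larger step.
readings = [10.0] * 5 + [10.5] * 5 + [16.0] * 5
writes = list(smoothed_writes(readings))
print(writes)
```

The small drift to 10.5 stays inside the 10% band and produces no writes; only the later step triggers new ones. A reference like this can be handy for checking a reactive pipeline against known inputs.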
| Platform | Library |
|---|---|
| Python | RxPY (reactivex) |
| Java / JVM | RxJava3, Project Reactor |
| Kotlin | Kotlin Flow |
| JavaScript | RxJS |
| .NET | System.Reactive |
| Swift / iOS | RxSwift |
list, dict or generator, but for asynchronous sequences.
zip · merge · buffer_with_count · scan · throttle_with_timeout · distinct_until_changed
| Operator | Effect |
|---|---|
| buffer_with_count() | accumulate, process in batches |
| throttle_with_timeout() | keep freshest value per window |
| sample() | emit latest at fixed intervals |
| ops.filter(gate) | explicitly drop excess items |
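To make the batching idea concrete, here is a plain-Python illustration of count-based windows over a finite list. The helper `count_windows` is hypothetical and only mimics the steady-state grouping of `buffer_with_count(count, skip)`; it returns full windows only, whereas a finite Rx source may also emit trailing partial buffers when it completes.

```python
def count_windows(items, count, skip=None):
    """Group `items` into windows of `count`, starting a new window
    every `skip` items (skip defaults to count, i.e. no overlap)."""
    skip = count if skip is None else skip
    return [
        items[i:i + count]
        for i in range(0, len(items) - count + 1, skip)
    ]

data = [1, 2, 3, 4, 5, 6]
batches = count_windows(data, 3)      # non-overlapping batches
sliding = count_windows(data, 3, 1)   # sliding window, step 1
print(batches)
print(sliding)
```

With `skip` equal to `count` the items are processed in batches; with `skip=1` every new item produces a fresh overlapping window, which is the form used for the sliding average.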
caproto Context — single-shot reads & writes, CA monitor subscriptions (DBE_VALUE / DBE_ALARM / DBE_PROPERTY)
read_pv(pv, ctx) · write_pv(pv, val, ctx) · monitor_pv(pv, ctx): all rx.Observable, lazy, composable
map · zip · merge · buffer_with_count · filter · scan · throttle_with_timeout: compose, transform, correlate, reduce streams
Pure Python functions — calibration, averaging, threshold checks. No I/O, no shared state.
Results written back to setpoint PVs, forwarded to pipelines, logged to archiver or HDF5
monitor_pv("DETECTOR:INTENSITY", ctx).pipe(
    # noise reduction, no deque
    ops.buffer_with_count(5, 1),
    ops.map(lambda w: statistics.mean(w)),
    # only act on out-of-range readings
    ops.filter(out_of_range),
    # write correction back
    ops.flat_map(
        lambda v: write_pv(
            "MAGNET:SETPOINT", v, ctx))
).subscribe(on_next=apply, on_error=log)
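The `out_of_range` predicate is not shown in the pipeline. One possible shape, as a pure function with no I/O and no shared state, is sketched below; the band limits `LOW` and `HIGH` are hypothetical values chosen purely for illustration.

```python
# Hypothetical acceptance band, in arbitrary units.
LOW, HIGH = 0.8, 1.2

def out_of_range(value: float, low: float = LOW, high: float = HIGH) -> bool:
    """Return True when `value` lies outside the closed band [low, high]."""
    return not (low <= value <= high)

print(out_of_range(1.0), out_of_range(1.5))
```

Because the function takes the value as its only required argument, it can be passed directly to a filter step and unit-tested without an IOC running.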
# 1. Start the EPICS softIoc
docker compose up -d
# caproto softIoc with test.db
# TEST:CALC TEST:DOUBLE TEST:STRING
# 2. First example
uv run python examples/poll_pv.py \
TEST:CALC 500
# 3. The showstopper
uv run python examples/pv_pipeline.py