| Tango | EPICS | |
|---|---|---|
| Java | RxTango/java ✓ RxJava3 · jbang · TCK-verified |
future |
| Python | RxTango/python ✓ RxPY v4 · uv · unit-tested |
RxEpics/python ✓ RxPY v4 · caproto |
zip · merge · buffer_with_count · scan · sample · flat_mapcurrent and beam_position
from two different devices — and only process the pair
if both arrived in the same polling window. Discard if either fails."
correlate.py
rx.interval(timedelta(ms=500), scheduler=scheduler).pipe(
ops.flat_map(lambda _: rx.zip(
# Both reads fire in parallel
read_attribute(device1, "current"),
read_attribute(device2, "beam_position"),
))
).subscribe(
on_next=lambda pair: process(*pair),
on_error=lambda e: log(e),
scheduler=scheduler,
)
double_scalar at 20 Hz, compute a 5-sample sliding
average, and only write the smoothed value back if it deviates more
than 10% from the previous write."
deque, a counter, a threshold check, a write guardsliding_average.py + pipeline.py
rx.interval(timedelta(ms=50), scheduler=scheduler).pipe(
ops.flat_map(
lambda _: read_attribute(device, "double_scalar")),
# Sliding window — no deque, no index arithmetic
ops.buffer_with_count(count=5, skip=1),
ops.map(lambda w: sum(w) / len(w)),
# Skip write if within 10% of last written value
ops.distinct_until_changed(
comparer=lambda p, c: abs(c - p) / (p or 1e-9) < 0.1
),
ops.flat_map(
lambda v: write_attribute(device, "double_scalar_w", v)),
).subscribe(on_next=print, scheduler=scheduler)
| Platform | Library |
|---|---|
| Java / JVM | RxJava3, Project Reactor |
| Kotlin | Kotlin Flow |
| JavaScript | RxJS |
| Python | RxPY (reactivex) ← here |
| .NET | System.Reactive |
| Swift / iOS | RxSwift |
list,
dict or
set,
but for asynchronous sequences.
zip · merge · buffer_with_count · scan · sample · distinct_until_changed
buffer_with_count()
accumulate, process in batches
sample()
keep freshest value per window
throttle_with_timeout()
rate-limit by time
ops.catch()
isolate per-device errors
Attribute reads & writes, command calls, event subscriptions.
Blocking calls bridged to asyncio via run_in_executor.
read_attribute · write_attribute ·
execute_command · monitor_attribute
— all rx.Observable via rx.create
map · zip · merge · buffer_with_count · filter · scan · sample
— compose, transform, correlate, reduce streams
Pure functions — calibration, averaging, threshold checks. No I/O, no shared state.
Results written back to devices, forwarded to pipelines, logged to telemetry systems
rx.interval(timedelta(ms=50), scheduler=sched).pipe(
ops.flat_map(
lambda _: read_attribute(detector, "intensity")),
# noise reduction — no circular buffer
ops.buffer_with_count(count=5, skip=1),
ops.map(lambda w: sum(w) / len(w)),
# only act on out-of-range readings
ops.filter(lambda v: out_of_range(v)),
# issue correction
ops.flat_map(
lambda v: write_attribute(magnet, "setpoint", v)),
).subscribe(on_next=log, scheduler=sched)
# 1. Bring up the Tango stack
docker compose up -d
# MariaDB → DatabaseDS → TangoTest
# 2. Install
cd RxTango/python
uv pip install -e .
# 3. First example
python examples/read_attribute.py
# 4. The showstopper
python examples/pipeline.py \
tango://localhost:10000/sys/tg_test/1
pytest -v — mocked DeviceProxy,
TestScheduler marble tests.