current and beam_position
from two different devices — and only process the pair
if both arrived in the same polling window. Discard if either fails."
correlate
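Flowable.interval(500, MILLISECONDS)
    .flatMapMaybe(tick -> Single.zip(
            // Both reads fire in parallel
            read(device1, "current")
                .subscribeOn(Schedulers.io()),
            read(device2, "beam_position")
                .subscribeOn(Schedulers.io()),
            // Combiner: only runs when BOTH complete
            (current, pos) -> process(current, pos)
        )
        // Same polling window: both reads must finish within 500 ms
        .timeout(500, MILLISECONDS)
        // Discard the pair if either read fails or times out
        .doOnError(err -> log("ERROR: " + err))
        .onErrorComplete())
    .blockingSubscribe(
        result -> log(result),
        err -> log("ERROR: " + err)
    );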
double_scalar at 20 Hz, compute a 5-sample sliding
average, and only write the smoothed value back if it deviates more
than 10% from the previous write."
deque, a counter, a threshold check, a write guard
sliding-avg + calibrate
Flowable.interval(50, MILLISECONDS) // 20 Hz
    .flatMapSingle(tick -> read(device, "double_scalar"))
    // Sliding window: no deque, no index arithmetic
    .buffer(5, 1)
    .map(window -> mean(window))
    // Skip write if within 10% of last written value
    .distinctUntilChanged(
        (prev, curr) ->
            // |prev| keeps the check valid for negative values and avoids dividing by zero
            Math.abs(curr - prev) < 0.1 * Math.abs(prev)
    )
    .flatMapSingle(write(device, "double_scalar_w"));
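The pipeline above calls a `mean` helper that the slide does not define. A minimal plain-Java sketch of what the `buffer(5, 1)` plus `map(mean)` stages compute on a finite list of samples might look like this. The class name `SlidingAverage` and the method `slidingMeans` are hypothetical, and the sketch keeps full windows only; it does not model how the operator behaves when a stream completes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, not part of RxJava or RxTango.
final class SlidingAverage {

    /** Arithmetic mean of one window; assumes the window is non-empty. */
    static double mean(List<Double> window) {
        double sum = 0.0;
        for (double v : window) {
            sum += v;
        }
        return sum / window.size();
    }

    /** Means of every full window of `size` samples, advancing one sample at a time. */
    static List<Double> slidingMeans(List<Double> samples, int size) {
        List<Double> means = new ArrayList<>();
        for (int start = 0; start + size <= samples.size(); start++) {
            means.add(mean(samples.subList(start, start + size)));
        }
        return means;
    }

    public static void main(String[] args) {
        List<Double> samples = List.of(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0);
        System.out.println(slidingMeans(samples, 5)); // prints [3.0, 4.0, 5.0]
    }
}
```

The explicit loop and index arithmetic here are exactly the bookkeeping that the operator chain hides.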
| Platform | Library |
|---|---|
| Java / JVM | RxJava3, Project Reactor |
| Kotlin | Kotlin Flow |
| JavaScript | RxJS |
| Python | RxPY |
| .NET | System.Reactive |
| Swift / iOS | RxSwift |
List, Vector or Set, but for asynchronous sequences.
zip · merge · buffer · scan · throttleLast · distinctUntilChanged
| Operator | Behavior |
|---|---|
| buffer() | accumulate, process in batches |
| throttleLast() | keep freshest value per window |
| sample() | emit latest at fixed intervals |
| onBackpressureDrop() | explicitly drop excess items |
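To make "keep freshest value per window" concrete, here is a small plain-Java model of that selection rule. It is only an illustration under stated assumptions: the class `WindowModel` and method `freshestPerWindow` are hypothetical, timestamps are assumed to be sorted in ascending order, and windows are modeled as fixed buckets starting at time zero rather than as the timer-driven windows a real reactive operator uses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "keep the freshest value per window"; not RxJava's implementation.
final class WindowModel {

    /**
     * Returns the last value seen in each non-empty window of `windowMs` milliseconds.
     * Assumes timesMs is sorted ascending and has the same length as values.
     */
    static List<Double> freshestPerWindow(long[] timesMs, double[] values, long windowMs) {
        List<Double> kept = new ArrayList<>();
        for (int i = 0; i < timesMs.length; i++) {
            boolean lastInWindow = i == timesMs.length - 1
                    || timesMs[i + 1] / windowMs != timesMs[i] / windowMs;
            if (lastInWindow) {
                kept.add(values[i]);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        long[] times = {0, 10, 40, 60, 90, 120};
        double[] values = {1.0, 2.0, 3.0, 4.0, 5.0, 6.0};
        // Windows of 50 ms: [0,50) keeps 3.0, [50,100) keeps 5.0, [100,150) keeps 6.0
        System.out.println(freshestPerWindow(times, values, 50)); // prints [3.0, 5.0, 6.0]
    }
}
```

Everything that is not the last sample of its window is dropped, which is why this strategy suits readings where only the most recent value matters.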
Device proxy — attribute reads & writes, command calls, CHANGE / PERIODIC / ARCHIVE event subscriptions
RxTangoAttribute · RxTangoAttributeWrite · RxTangoCommand · RxTangoAttributeChangePublisher: all spec-compliant Publisher<T>
map · zip · merge · buffer · filter · scan · throttleLast: compose, transform, correlate, reduce streams
Pure functions — calibration, averaging, threshold checks. No I/O, no shared state.
Results written back to devices, forwarded to pipelines, logged to telemetry systems
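The pure-function layer can be sketched in plain Java, since it needs no I/O and no reactive library. The names `PureStages`, `calibrate`, `outOfRange`, and `deviates`, along with their parameters (`gain`, `offset`, `low`, `high`, `fraction`), are assumptions chosen for illustration; the slides do not specify these signatures.

```java
// Hypothetical pure helpers: no I/O, no shared state, so each is trivially testable.
final class PureStages {

    /** Linear calibration of a raw reading; gain and offset are illustrative parameters. */
    static double calibrate(double raw, double gain, double offset) {
        return gain * raw + offset;
    }

    /** True when the value lies outside the closed interval [low, high]. */
    static boolean outOfRange(double value, double low, double high) {
        return value < low || value > high;
    }

    /** True when curr differs from prev by more than `fraction` of |prev|. */
    static boolean deviates(double prev, double curr, double fraction) {
        return Math.abs(curr - prev) > fraction * Math.abs(prev);
    }

    public static void main(String[] args) {
        System.out.println(calibrate(2.0, 1.5, 0.5));      // prints 3.5
        System.out.println(outOfRange(5.0, 0.0, 4.0));     // prints true
        System.out.println(deviates(100.0, 105.0, 0.1));   // prints false
    }
}
```

Because these functions take all inputs as arguments, they can be passed to operators such as `map` and `filter` and unit-tested without a running device.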
detectorStream
    // noise reduction, no circular buffer
    .buffer(5, 1)
    .map(window -> mean(window))
    // only act on out-of-range readings
    .filter(v -> outOfRange(v))
    // issue correction command
    .flatMapSingle(
        write(magnet, "setpoint"));
# 1. Bring up the Tango stack
docker compose up -d
# MariaDB → DatabaseDS → TangoTest
# 2. First example
jbang read-attribute@. \
tango://localhost:10000/sys/tg_test/1 \
double_scalar
# 3. The showstopper
jbang pipeline@. \
tango://localhost:10000/sys/tg_test/1