CURRENT from /HERA/Magnets/QF1
and XPOS from /HERA/Diagnostics/BPM01 —
only process the pair if both arrive in the same polling window.
Discard if either fails."
Flowable.interval(500, MILLISECONDS)
    .flatMapMaybe(tick -> Single.zip(
            // Both reads fire in parallel
            TineClient.read("/HERA/Magnets/QF1", "CURRENT")
                .subscribeOn(Schedulers.io()),
            TineClient.read("/HERA/Diagnostics/BPM01", "XPOS")
                .subscribeOn(Schedulers.io()),
            // Only runs when BOTH complete
            (current, pos) -> process(current[0], pos[0]))
        // Discard the pair if either read fails, instead of
        // terminating the whole polling stream
        .toMaybe()
        .onErrorComplete());
TDataType returns arrays — take [0] for scalars.
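For comparison, the both-or-nothing pairing can be sketched without RxJava using the JDK's CompletableFuture. This only illustrates the semantics and is not the TineClient API: PairedRead, readPair, and the two Supplier arguments are hypothetical stand-ins for the TINE reads, which are assumed here to return arrays.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class PairedRead {
    private PairedRead() {}

    /** Runs both reads in parallel; yields empty if either one fails. */
    static Optional<double[]> readPair(Supplier<double[]> readCurrent,
                                       Supplier<double[]> readXpos) {
        CompletableFuture<double[]> current = CompletableFuture.supplyAsync(readCurrent);
        CompletableFuture<double[]> xpos = CompletableFuture.supplyAsync(readXpos);
        return current
            // The combiner runs only when BOTH futures complete normally;
            // [0] extracts the scalar from each array
            .thenCombine(xpos, (c, x) -> Optional.of(new double[] {c[0], x[0]}))
            // Any failure discards the pair instead of propagating
            .exceptionally(err -> Optional.empty())
            .join();
    }

    public static void main(String[] args) {
        Optional<double[]> ok = readPair(
            () -> new double[] {12.5},
            () -> new double[] {0.3});
        System.out.println("both succeeded, pair present: " + ok.isPresent());

        Optional<double[]> bad = readPair(
            () -> { throw new IllegalStateException("simulated link failure"); },
            () -> new double[] {0.3});
        System.out.println("one failed, pair present: " + bad.isPresent());
    }
}
```

Unlike the reactive version, this sketch blocks on join() and handles a single polling tick; scheduling and repetition would have to be added by hand.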
INTENSITY from /HERA/Beam/Monitor,
compute a 5-sample sliding average, and write the smoothed value
back only if it deviates more than 10% from the last write."
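Spelled out imperatively, the requirement above needs a window buffer and a record of the last written value. A minimal sketch of that scaffolding, assuming a hypothetical SmoothedWriteGate class that is not part of TINE or RxJava:

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class SmoothedWriteGate {
    private final int windowSize;
    private final double tolerance;          // e.g. 0.1 for 10%
    private final Deque<Double> window = new ArrayDeque<>();
    private Double lastWritten;              // null until the first write

    SmoothedWriteGate(int windowSize, double tolerance) {
        this.windowSize = windowSize;
        this.tolerance = tolerance;
    }

    /** Feeds one sample; returns the value to write, or null to skip. */
    Double offer(double sample) {
        window.addLast(sample);
        if (window.size() > windowSize) {
            window.removeFirst();            // slide the window by one
        }
        if (window.size() < windowSize) {
            return null;                     // not enough samples yet
        }
        double sum = 0.0;
        for (double s : window) {
            sum += s;
        }
        double mean = sum / windowSize;
        // Compare against the last WRITTEN value, not the previous sample
        boolean withinTolerance = lastWritten != null
            && Math.abs(mean - lastWritten) <= tolerance * Math.abs(lastWritten);
        if (withinTolerance) {
            return null;                     // too close to the last write
        }
        lastWritten = mean;
        return mean;
    }

    public static void main(String[] args) {
        SmoothedWriteGate gate = new SmoothedWriteGate(5, 0.1);
        double[] samples = {10, 10, 10, 10, 10, 10, 20};
        for (double s : samples) {
            System.out.println(s + " -> " + gate.offer(s));
        }
    }
}
```

In this sketch the first full window always triggers a write, because there is no earlier write to compare against; whether that is desirable depends on the application.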
prevWrite float: same scaffolding, every project
TineClient.monitor("/HERA/Beam/Monitor", "INTENSITY")
    // sliding window: no deque, no index arithmetic
    .buffer(5, 1)
    .map(window -> mean(window))
    // carry the last written value forward; replace it only when the
    // new average deviates from it by more than 10%
    .scan((last, curr) ->
        Math.abs(curr - last) > 0.1 * Math.abs(last) ? curr : last)
    // drop repeats of the held value, so only real changes are written
    .distinctUntilChanged()
    .flatMapSingle(
        v -> TineClient.write(
            "/HERA/Beam/Corrector", "SETPOINT", v));
Flowable.merge(
        TineClient.monitor("/HERA/RF/Cavity01", "STATUS"),
        TineClient.monitor("/HERA/RF/Cavity02", "STATUS"),
        TineClient.monitor("/HERA/RF/Cavity03", "STATUS"))
    .filter(s -> s[0] != STATUS_OK)
    .subscribe(
        alarm -> notifyOperator(alarm),
        error -> log.error("monitor failed", error));
Address: /Context/Server/Device + property name (e.g. SENSOR).
Single-shot reads via TLink.executeAndClose();
push via TLink.attach(CM_POLL).
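To make the addressing scheme concrete, here is a small sketch that splits a /Context/Server/Device string into its parts and pairs it with a property name. TineAddress and parse are hypothetical helpers for illustration, not TINE classes, and the sketch assumes that none of the three components contains a slash.

```java
final class TineAddress {
    final String context;
    final String server;
    final String device;
    final String property;

    private TineAddress(String context, String server, String device, String property) {
        this.context = context;
        this.server = server;
        this.device = device;
        this.property = property;
    }

    /** Parses "/Context/Server/Device" plus a property name such as "SENSOR". */
    static TineAddress parse(String deviceName, String property) {
        String[] parts = deviceName.split("/");
        // A leading slash yields an empty first element: ["", context, server, device]
        if (parts.length != 4 || !parts[0].isEmpty()) {
            throw new IllegalArgumentException(
                "expected /Context/Server/Device but got: " + deviceName);
        }
        for (int i = 1; i < parts.length; i++) {
            if (parts[i].isEmpty()) {
                throw new IllegalArgumentException("empty component in: " + deviceName);
            }
        }
        if (property == null || property.isEmpty()) {
            throw new IllegalArgumentException("property name is required");
        }
        return new TineAddress(parts[1], parts[2], parts[3], property);
    }

    public static void main(String[] args) {
        TineAddress a = TineAddress.parse("/HERA/Magnets/QF1", "CURRENT");
        System.out.println(a.context + " / " + a.server + " / " + a.device
            + " : " + a.property);
    }
}
```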
RxTineRead<T> · RxTineWrite<T>: single-shot.
RxTineMonitor<T>: push, CM_POLL-backed.
All are spec-compliant Publisher<T> implementations.
map · zip · merge · buffer · filter · scan · distinctUntilChanged:
compose, transform, correlate, and reduce streams.
Pure Java functions — calibration, averaging, threshold checks. No I/O, no shared state.
Results are written back to TINE properties, forwarded to pipelines, or logged to telemetry systems.
RxTineWrite emits the written value,
so the same pipeline composition applies.
TineClient
    .monitor("/HERA/Beam/Monitor", "INTENSITY")
    .buffer(5, 1)
    .map(w -> mean(w))
    .filter(v -> outOfRange(v))
    .flatMapSingle(
        v -> TineClient.write(
            "/HERA/Beam/Corrector", "SETPOINT", v))
    .subscribe(v -> log("corrected: " + v));
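The mean and outOfRange helpers used in the pipeline above are not defined in this text. As a sketch of what such pure stages might look like (calibration, averaging, threshold check, with no I/O and no shared state), assuming a hypothetical PureStages class and signatures chosen only for illustration:

```java
import java.util.List;

final class PureStages {
    private PureStages() {}

    /** Linear calibration: physical = gain * raw + offset. */
    static double calibrate(double raw, double gain, double offset) {
        return gain * raw + offset;
    }

    /** Arithmetic mean of a non-empty window of samples. */
    static double mean(List<Double> window) {
        if (window.isEmpty()) {
            throw new IllegalArgumentException("empty window");
        }
        double sum = 0.0;
        for (double v : window) {
            sum += v;
        }
        return sum / window.size();
    }

    /** True when v lies outside the closed interval [low, high]. */
    static boolean outOfRange(double v, double low, double high) {
        return v < low || v > high;
    }

    public static void main(String[] args) {
        List<Double> window = List.of(1.0, 2.0, 3.0, 4.0, 5.0);
        double avg = mean(window);
        System.out.println("mean = " + avg);
        System.out.println("calibrated = " + calibrate(avg, 2.0, 0.5));
        System.out.println("out of range = " + outOfRange(avg, 0.0, 2.5));
    }
}
```

Here outOfRange takes explicit bounds, so inside a pipeline it would be wrapped in a lambda such as v -> outOfRange(v, low, high). Because each function depends only on its arguments, it can be unit-tested without a TINE server.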