BeamCurrent → acquisition gate
InterlockCount → abort trigger
ScenarioId (rw) — inject fault via write
OrbitX → quality flag per frame
VacuumPressure, BeamLossFraction — feed interlock
VAL, MOVN, SPEED — rotation stage control
ACQUIRE, COUNTS, ACQUIRING, EXPOSURE — detector
OPEN — beam shutter
beam_posx, beam_posy — position diagnostics
filter + take(1)
distinct_until_changed
rx.zip (Tango + EPICS)
take_until
share() + sample()
ring_health(scheduler)
python inject_fault.py beam_loss
python inject_fault.py orbit_drift
python inject_fault.py vacuum_burst
python inject_fault.py nominal
ops.flat_map(lambda _: rx.zip(
# EPICS Channel Access (caproto)
read_pv("TOMO:DET:COUNTS", ctx),
read_pv("TOMO:BEAM:POSX", ctx),
read_pv("TOMO:BEAM:POSY", ctx),
# Tango Controls (PyTango)
read_attribute(CONTROLLER, "BeamCurrent"),
read_attribute(SECTOR_04, "OrbitX"),
)),
ops.map(lambda r: (
float(r[0]), float(r[1]), float(r[2]), # EPICS
float(r[3]), float(r[4]), # Tango
abs(float(r[4])) < ORBIT_ALARM, # quality
))
# One ring-health sample: beam current, interlock count and OrbitX reading.
Health = namedtuple(
    typename="Health",
    field_names=("current", "interlocks", "orbit_x"),
)
def ring_health(scheduler, interval_ms=1000):
    """Poll ring health on a fixed timer and publish it as one shared stream.

    On every timer tick the stream reads ``BeamCurrent`` and
    ``InterlockCount`` from ``CONTROLLER`` and ``OrbitX`` from ``SECTOR_04``,
    combines the three reads with ``rx.zip`` and converts the result into a
    ``Health`` record.

    Args:
        scheduler: Scheduler passed to ``rx.interval`` to drive the timer.
        interval_ms: Poll period in milliseconds.

    Returns:
        An observable of ``Health`` records, wrapped in ``ops.share()``.
    """
    def poll_once(_tick):
        # The tick value is ignored; it only triggers a fresh set of reads.
        return rx.zip(
            read_attribute(CONTROLLER, "BeamCurrent"),
            read_attribute(CONTROLLER, "InterlockCount"),
            read_attribute(SECTOR_04, "OrbitX"),
        )

    def to_health(readings):
        # Coerce the raw readings, in zip order, into typed fields.
        current = float(readings[0])
        interlocks = int(readings[1])
        orbit_x = float(readings[2])
        return Health(current, interlocks, orbit_x)

    period = timedelta(milliseconds=interval_ms)
    ticks = rx.interval(period, scheduler=scheduler)
    return ticks.pipe(
        ops.flat_map(poll_once),
        ops.map(to_health),
        # One poll stream feeds many subscribers.
        ops.share(),
    )
# 1. Shutter supervisor
health.pipe(map(beam_ok), distinct_until_changed,
flat_map(write_pv(SHUTTER))).subscribe(...)
# 2. Abort trigger (in take_until)
abort = health.pipe(filter(interlocks > 0), take(1))
# 3. Per-projection gate
health.pipe(filter(is_healthy), take(1), ignore_elements())
# Before each projection: wait for ring health
# Gate observable: emits nothing and completes on the first reading that
# passes is_healthy (>= 50 mA, no interlocks), so it carries timing only.
first_healthy = health.pipe(ops.filter(is_healthy), ops.take(1))
wait_healthy = first_healthy.pipe(ops.ignore_elements())
# Each projection is gated
return rx.concat(wait_healthy, acquire(...))
health.pipe(
ops.map(lambda h:
h.current >= MIN_BEAM_CURRENT),
ops.distinct_until_changed(), # only on transitions
ops.flat_map(lambda ok: write_pv(
"TOMO:SHUTTER:OPEN", 1 if ok else 0, ctx)),
).subscribe(on_next=print, scheduler=scheduler)
# Terminal A: scan running
python guarded_scan.py --ascii
# Terminal B: inject beam loss
python inject_fault.py beam_loss
# → ring current → 25 mA
# → shutter supervisor writes TOMO:SHUTTER:OPEN = 0
# → next projection's wait_healthy blocks
# → "[proj N] waiting for healthy beam..."
# Terminal B: restore
python inject_fault.py nominal
# → ring current → ~100 mA
# → shutter opens (distinct_until_changed fires)
# → wait_healthy unblocks → projection starts
# After detector completes — cross-system zip:
ops.flat_map(lambda _: rx.zip(
read_pv("TOMO:DET:COUNTS", ctx), # EPICS
read_pv("TOMO:BEAM:POSX", ctx), # EPICS
read_pv("TOMO:BEAM:POSY", ctx), # EPICS
read_attribute(CONTROLLER, # Tango
"BeamCurrent"),
read_attribute(SECTOR_04, # Tango
"OrbitX"),
)),
ops.map(lambda r: (
...,
abs(float(r[4])) < 55.0, # quality_ok
))
dtype=np.dtype([
("timestamp", "f8"),
("proj_index", "i4"),
("angle", "f4"),
("counts", "f8"), # EPICS
("beam_posx", "f4"), # EPICS
("beam_posy", "f4"), # EPICS
("ring_current", "f4"), # Tango ← ring
("orbit_x", "f4"), # Tango ← ring
("quality_ok", "?"), # True/False
])
python inject_fault.py orbit_drift
# orbit_x → drifts past 55 µm
# → frames tagged quality_ok = False
# → live display shows ~ instead of ✓
# Abort fires once: first emission with interlocks > 0
def _flag_abort(h):
    """Mark the scan as aborted, then report the interlock count."""
    scan_aborted.set()
    print(f"⚠ VACUUM BURST: "
          f"interlocks={h.interlocks}")


# Emits at most once: the first health reading with any interlock raised.
abort_trigger = health.pipe(
    ops.filter(lambda h: h.interlocks > 0),
    ops.take(1),
    ops.do_action(on_next=_flag_abort),
)
# Wrap the entire 360-projection scan
scan = rx.concat(
setup, *projections, teardown_done
).pipe(
ops.take_until(abort_trigger) # ← one line
)
# Emergency teardown (after scan_done.wait())
if scan_aborted.is_set():
rx.zip(
write_pv("TOMO:SCAN:STATUS", SCAN_ABORTED, ctx),
write_pv("TOMO:SHUTTER:OPEN", 0, ctx),
).subscribe(...)
python inject_fault.py vacuum_burst
# C++ simulator: VacuumPressure → high
# → sector interlocks fire
# → InterlockCount on controller > 0
# → abort_trigger emits once
# → take_until completes the scan stream
# → scan_aborted.set()
# → emergency teardown fires:
# TOMO:SCAN:STATUS = ABORTED (3)
# TOMO:SHUTTER:OPEN = 0
# One execution of the scan
source = guarded_scan(...).pipe(ops.share())
# Branch 1: HDF5 writer — receives every frame
source.subscribe(
on_next=write_frame, # must not block
scheduler=scheduler,
)
# Branch 2: live display — throttled at 250 ms
source.pipe(
ops.sample(timedelta(milliseconds=250),
scheduler=scheduler),
).subscribe(
on_next=display_frame, # allowed to be slow
scheduler=scheduler,
)
print(f"Frames acquired: {hdf5_written}")
print(f"Frames displayed: {display_shown}")
# Actual drop rate = 1 - displayed / acquired
# e.g.: acquired 360, displayed 12 → 97% dropped
# HDF5 has all 360 — zero data loss
health = ring_health(scheduler) # shared Tango poll
# Pattern 1: shutter supervisor (3 operators)
supervisor = health.pipe(
ops.map(lambda h: h.current >= 50),
ops.distinct_until_changed(),
ops.flat_map(lambda ok:
write_pv(SHUTTER, 1 if ok else 0, ctx)),
).subscribe(...)
# Pattern 3: abort trigger (1 filter + take)
abort = health.pipe(
ops.filter(lambda h: h.interlocks > 0),
ops.take(1),
)
# Pattern 1: per-projection health gate
wait = health.pipe(
ops.filter(is_healthy), ops.take(1),
ops.ignore_elements(),
)
# Scan: each projection is gated + cross-system zip
scan = rx.concat(
setup,
*[rx.concat(wait, acquire(angle, i, health))
for i, angle in enumerate(angles)],
teardown,
).pipe(ops.take_until(abort)) # Pattern 3
# Pattern 4: share + sample
source = scan.pipe(ops.share())
source.subscribe(on_next=write_frame)
source.pipe(ops.sample(250ms)).subscribe(
on_next=display_frame)
# After detector exposure completes:
ops.flat_map(lambda _: rx.zip(
read_pv("TOMO:DET:COUNTS", ctx), # EPICS
read_pv("TOMO:BEAM:POSX", ctx), # EPICS
read_pv("TOMO:BEAM:POSY", ctx), # EPICS
read_attribute(CTRL, "BeamCurrent"),# Tango
read_attribute(SEC4, "OrbitX"), # Tango
)),
ops.map(lambda r: (
*r[:5],
abs(r[4]) < ORBIT_ALARM, # quality
))