aion_client/stream
Event stream abstraction with sequence-based resumption.
Types
pub type Event(event) {
Event(sequence: Int, payload: event)
}
Constructors
- Event(sequence: Int, payload: event)
pub type EventStream(event) {
EventStream(read_all: fn() -> List(StreamItem(event)))
}
Constructors
- EventStream(read_all: fn() -> List(StreamItem(event)))
pub type Frame {
Frame(sequence: Int, payload: payload.Payload)
TransientDisconnect
TerminalFailure(error: error.Error)
EndOfStream
}
Constructors
- Frame(sequence: Int, payload: payload.Payload)
- TransientDisconnect
- TerminalFailure(error: error.Error)
- EndOfStream
pub type StreamItem(event) {
EventItem(event: Event(event))
StreamError(error: error.Error)
StreamEnd
}
Constructors
- EventItem(event: Event(event))
- StreamError(error: error.Error)
- StreamEnd
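A consumer of a collected stream usually pattern-matches on each StreamItem in order. The sketch below is illustrative only: it redeclares simplified stand-ins for Event and StreamItem so that it compiles on its own, reduces error.Error to a String, and introduces a hypothetical helper named payloads that is not part of this module.

```gleam
import gleam/list

// Simplified stand-ins for the documented types (assumption: the real
// StreamError carries error.Error, reduced here to a String).
pub type Event(event) {
  Event(sequence: Int, payload: event)
}

pub type StreamItem(event) {
  EventItem(event: Event(event))
  StreamError(error: String)
  StreamEnd
}

/// Hypothetical helper: gather payloads in order, stopping at the first
/// StreamError (returned as Error) or at StreamEnd.
pub fn payloads(items: List(StreamItem(event))) -> Result(List(event), String) {
  take(items, [])
}

fn take(
  items: List(StreamItem(event)),
  acc: List(event),
) -> Result(List(event), String) {
  case items {
    // Running out of items is treated like an explicit end marker.
    [] -> Ok(list.reverse(acc))
    [StreamEnd, ..] -> Ok(list.reverse(acc))
    [StreamError(error), ..] -> Error(error)
    [EventItem(Event(_, payload)), ..rest] -> take(rest, [payload, ..acc])
  }
}
```

Whether a real consumer should discard events received before a StreamError, as this sketch does, is a design choice the module does not prescribe.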
Stub subscription transport. The cursor passed to open mirrors the wire
contract for PerWorkflowSubscription.resume_from_seq (proto3
optional uint64) exactly:
- None: the subscription request carries no resume field, which asks for a fresh live-tail subscription.
- Some(n): resume_from_seq = n, the FIRST per-workflow sequence number the caller wants (last delivered + 1). Some(1) asks for a full-history replay.
On the wire, resume_from_seq = 0 is invalid_input. Option represents the
absent case as None rather than as an integer sentinel, so this client can
never emit resume_from_seq = 0.
pub type StubTransport {
StubTransport(open: fn(option.Option(Int)) -> List(Frame))
}
Constructors
- StubTransport(open: fn(option.Option(Int)) -> List(Frame))
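The cursor contract described for StubTransport can be sketched as a small pure function. The name resume_cursor is hypothetical and not part of this module. The None branch is an assumption: the documentation does not say which cursor is sent when a reconnect happens before any event has been delivered, so the sketch keeps the live-tail cursor in that case.

```gleam
import gleam/option.{type Option, None, Some}

/// Hypothetical helper: derive the cursor for the next call to `open`
/// from the last sequence number that was delivered to the caller.
pub fn resume_cursor(last_delivered: Option(Int)) -> Option(Int) {
  case last_delivered {
    // Assumption: nothing delivered yet, so keep asking for a live tail.
    None -> None
    // The cursor is the FIRST sequence wanted: last delivered + 1.
    Some(sequence) -> Some(sequence + 1)
  }
}
```

For example, resume_cursor(Some(41)) evaluates to Some(42), and resume_cursor(None) stays None, so the request carries no resume field.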
Values
pub fn collect(
stream: EventStream(event),
) -> List(StreamItem(event))
pub fn subscribe(
handle: aion_client.WorkflowHandle,
decoder: decode.Decoder(event),
) -> EventStream(event)
Build a stream for a workflow handle. The concrete WebSocket adapter is an AW-owned transport concern; until that adapter is wired, the returned stream yields an Unavailable item rather than silently ending.
pub fn subscribe_with_stub(
transport: StubTransport,
decoder: decode.Decoder(event),
) -> EventStream(event)
Conformance/test helper that exercises the same cursor protocol as the
reference SDK transports:
- The initial open passes None (no resume field, live tail).
- Every reconnect after a transient disconnect passes Some(last delivered + 1), the first sequence wanted.
- Re-sent duplicates are filtered.
- A sequence gap surfaces as Unavailable instead of silently losing events.
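The duplicate filtering and gap detection described for subscribe_with_stub amount to comparing each incoming sequence number with the next one expected. The sketch below shows that comparison only. The Verdict type and the classify function are hypothetical names, not part of this module, and accepting the first frame unconditionally when nothing has been delivered yet is an assumption about live-tail behaviour.

```gleam
import gleam/int
import gleam/option.{type Option, None, Some}
import gleam/order

/// Hypothetical verdict for one incoming frame.
pub type Verdict {
  Deliver
  Duplicate
  Gap
}

/// Hypothetical helper: compare an incoming sequence number with the last
/// one delivered to the caller.
pub fn classify(last_delivered: Option(Int), incoming: Int) -> Verdict {
  case last_delivered {
    // Assumption: the first frame of a live tail sets the baseline.
    None -> Deliver
    Some(last) ->
      case int.compare(incoming, last + 1) {
        // Already seen: a re-sent frame after a reconnect.
        order.Lt -> Duplicate
        // Exactly the next sequence wanted.
        order.Eq -> Deliver
        // Something in between is missing; report it instead of skipping.
        order.Gt -> Gap
      }
  }
}
```

With last delivered at 4, classify(Some(4), 5) is Deliver, classify(Some(4), 4) is Duplicate, and classify(Some(4), 7) is Gap. In the documented helper, the gap case is the one that surfaces as Unavailable.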