aion_client/stream

Event stream abstraction with sequence-based resumption.

Types

pub type Event(event) {
  Event(sequence: Int, payload: event)
}

Constructors

  • Event(sequence: Int, payload: event)
pub type EventStream(event) {
  EventStream(read_all: fn() -> List(StreamItem(event)))
}

Constructors

  • EventStream(read_all: fn() -> List(StreamItem(event)))
pub type Frame {
  Frame(sequence: Int, payload: payload.Payload)
  TransientDisconnect
  TerminalFailure(error: error.Error)
  EndOfStream
}

Constructors

pub type StreamItem(event) {
  EventItem(event: Event(event))
  StreamError(error: error.Error)
  StreamEnd
}

Constructors

Stub subscription transport. The cursor passed to open mirrors the wire contract for PerWorkflowSubscription.resume_from_seq (proto3 optional uint64) exactly:

  • None — the subscription request carries no resume field: a fresh live-tail subscription.
  • Some(n)resume_from_seq = n, the FIRST per-workflow sequence number the caller wants (last delivered + 1). Some(1) asks for a full-history replay.

On the wire resume_from_seq = 0 is invalid_input; Option makes the absent case unrepresentable as an integer sentinel, so this client can never emit it.

pub type StubTransport {
  StubTransport(open: fn(option.Option(Int)) -> List(Frame))
}

Constructors

Values

pub fn collect(
  stream: EventStream(event),
) -> List(StreamItem(event))
pub fn subscribe(
  handle: aion_client.WorkflowHandle,
  decoder: decode.Decoder(event),
) -> EventStream(event)

Build a stream for a workflow handle. The concrete WebSocket adapter is an AW-owned transport concern; until that adapter is wired this returns an Unavailable item rather than silently ending.

pub fn subscribe_with_stub(
  transport: StubTransport,
  decoder: decode.Decoder(event),
) -> EventStream(event)

Conformance/test helper that exercises the same cursor protocol as the reference SDK transports: the initial open passes None (no resume field — live tail), every reconnect after a transient disconnect passes Some(last delivered + 1) (the first sequence wanted), re-sent duplicates are filtered, and a sequence gap surfaces as Unavailable instead of silently losing events.

Search Document