Version: 0.1.1
Purity: browser-safe
Purpose: Three-layer pipeline (Bridge → Policy → Handler) that classifies raw OpenClaw Gateway EventFrames, derives pure intents, and dispatches them to injected Zustand stores.
Workspace deps: @clawboo/gateway-client, @clawboo/logger, @clawboo/protocol
External deps: none
The pipeline is the architectural invariant “all Gateway events go Bridge → Policy → Handler.” Bridge parsers and Policy deciders are pure and unit-testable; only the Handler holds state (a debounced summary-refresh timer + a closed-runs TTL guard) and reaches out to injected dispatchers.
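
The flow can be illustrated in miniature. The block below is a sketch only, not the package's implementation: `Frame`, `Classified`, `Intent`, `classify`, `derive`, `makeHandler`, and `run` are simplified, hypothetical stand-ins for `EventFrame`, `ClassifiedEvent`, `EventIntent`, `classifyEvent`, `derivePolicy`, `createEventHandler`, and `processEvent`, and the raw event names `presence` and `chat` are assumed for illustration.

```typescript
// Simplified, hypothetical shapes; the real types live in @clawboo/events.
type Frame = { event: string; payload?: unknown };
type Kind = "summary-refresh" | "runtime-chat" | "unknown";
type Classified = { kind: Kind; payload: unknown; raw: Frame };
type Intent =
  | { kind: "scheduleSummaryRefresh"; delayMs: number }
  | { kind: "queueLivePatch"; text: string }
  | { kind: "ignore"; reason: string };

// Bridge: pure classification of a raw frame (event names are assumed).
function classify(frame: Frame): Classified {
  const kind: Kind =
    frame.event === "presence" ? "summary-refresh"
    : frame.event === "chat" ? "runtime-chat"
    : "unknown";
  return { kind, payload: frame.payload, raw: frame };
}

// Policy: pure mapping from a classified event to a list of intents.
function derive(classified: Classified): Intent[] {
  switch (classified.kind) {
    case "summary-refresh":
      return [{ kind: "scheduleSummaryRefresh", delayMs: 750 }];
    case "runtime-chat":
      return typeof classified.payload === "string"
        ? [{ kind: "queueLivePatch", text: classified.payload }]
        : [{ kind: "ignore", reason: "malformed chat payload" }];
    default:
      return [{ kind: "ignore", reason: "unknown event" }];
  }
}

// Handler: the only layer with side effects, reached through an injected dispatcher.
function makeHandler(dispatch: (intent: Intent) => void) {
  return {
    applyIntents(intents: Intent[]): void {
      for (const intent of intents) {
        if (intent.kind !== "ignore") dispatch(intent);
      }
    },
  };
}

// Convenience runner: Bridge → Policy → Handler in one call.
function run(frame: Frame, handler: ReturnType<typeof makeHandler>): void {
  handler.applyIntents(derive(classify(frame)));
}

// Usage: collect dispatched intents in an array instead of a real store.
const dispatched: Intent[] = [];
const sketchHandler = makeHandler((intent) => dispatched.push(intent));
run({ event: "presence" }, sketchHandler);
run({ event: "chat", payload: "hello" }, sketchHandler);
run({ event: "something-else" }, sketchHandler); // classified as unknown, so ignored
```

Because `classify` and `derive` take no dependencies, they can be tested with plain inputs and outputs; only the handler needs a fake dispatcher.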

Public API

Functions

| Export | Signature | Contract |
| --- | --- | --- |
| `processEvent` | `(frame: EventFrame, handler: EventHandlerHandle) => void` | Convenience runner: `classifyEvent` → `derivePolicy` → `handler.applyIntents`. The full pipeline in one call. |
| `classifyEvent` | `(frame: EventFrame) => ClassifiedEvent` | Bridge step 1. Maps a raw frame’s event field to an `EventKind` and extracts `agentId` / `sessionKey` (from `agent:<id>:` session keys or payload fields). Unrecognized events → `kind: 'unknown'`. |
| `parseChatPayload` | `(payload: unknown) => ChatEventPayload \| null` | Validates a chat payload: requires `runId` + `sessionKey` + a `state` of delta/final/aborted/error. Returns `null` if malformed. |
| `parseAgentPayload` | `(payload: unknown) => AgentEventPayload \| null` | Validates an agent payload: requires `runId`. Extracts `seq`, `stream`, `data`, `sessionKey`. Returns `null` if there is no `runId`. |
| `isReasoningStream` | `(stream: string) => boolean` | True when the stream name reads as reasoning (reason/think/analysis/trace) and not assistant/tool/lifecycle. |
| `resolveLifecyclePatch` | `(input: { phase: LifecyclePhase; incomingRunId: string; currentRunId: string \| null; timestamp: number }) => LifecycleTransition` | Pure lifecycle reducer: start → running patch; end/error → terminal patch, except that the event is ignored when `currentRunId` is set and does not match `incomingRunId` (stale-run guard). |
| `mergeRuntimeStream` | `(current: string, incoming: string) => string` | Concatenates incoming streaming text onto the current buffer; no-ops on empty input. |
| `dedupeRunLines` | `(seen: Set<string>, lines: string[]) => { appended: string[]; nextSeen: Set<string> }` | Filters out already-seen lines and returns the newly appended subset plus an updated seen-set (immutable copy). |
| `extractText` | `(message: unknown) => string \| null` | Re-export of @clawboo/protocol. Strips assistant prefix / thinking tags / approval suffix from a message. |
| `extractThinking` | `(message: unknown) => string \| null` | Re-export of @clawboo/protocol. Pulls the thinking trace from block content / tagged streams. |
| `extractToolLines` | `(message: unknown) => string[]` | Re-export of @clawboo/protocol. Formats tool calls/results as `[[tool]]` / `[[tool-result]]` markdown lines. |
| `derivePolicy` | `(event: ClassifiedEvent) => EventIntent[]` | Policy router. Dispatches by `event.kind` to the four deciders below; malformed payloads / unknown kinds → `[{ kind: 'ignore', … }]`. |
| `decideAgentEvent` | `(event: ClassifiedEvent) => EventIntent[]` | Agent-plane decider for summary-refresh (presence/heartbeat) → one debounced `scheduleSummaryRefresh` intent (`delayMs: 750`; `includeHeartbeatRefresh` true for heartbeat). |
| `decideTrustEvent` | `(event: ClassifiedEvent) => EventIntent[]` | Trust-plane decider for approval events → `approvalPending` (`exec.approval.pending` / `.requested`) or `approvalResolved`. Ignores the event when `agentId` is missing. |
| `decideWorkChatEvent` | `(event: ClassifiedEvent, payload: ChatEventPayload) => EventIntent[]` | Work-plane decider for chat frames: delta → `queueLivePatch`; final/aborted/error → `clearPendingLivePatch` + `commitChat` (+ a `requestHistoryRefresh` on a final with no thinking trace). |
| `decideWorkAgentEvent` | `(event: ClassifiedEvent, payload: AgentEventPayload) => EventIntent[]` | Work/agent-plane decider for agent streams: lifecycle phases → `updateAgentStatus`; reasoning/assistant streams → `queueLivePatch`; tool/unknown streams → `ignore`. |
| `createEventHandler` | `(deps: EventHandlerDeps) => EventHandlerHandle` | Builds the stateful Handler. Owns a debounced summary-refresh timer + a 30 s / 500-entry closed-runs guard against stale terminal events; dispatches intents to injected store callbacks. |
| `createPatchQueue` | `(onFlush: (patches: Patch[]) => void) => { enqueue: (patch: Patch) => void; flush: () => void; dispose: () => void }` | RAF-batched per-agent patch merger. Merges updates per `agentId` until the next animation frame; a run-ID change discards the prior streaming state. SSR-guarded (no-op without `requestAnimationFrame`). |
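
To make two of the pure contracts above concrete, here is one possible reading of the stale-run guard and the immutable line de-duplication. This is a sketch under assumptions, not the package source: `resolveLifecycleSketch`, `dedupeRunLinesSketch`, and the trimmed-down `StatusPatch` are hypothetical names, and the status strings `running`, `idle`, and `error` are assumed values.

```typescript
type LifecyclePhase = "start" | "end" | "error";
// Hypothetical, trimmed-down patch; the real AgentStatusPatch has more fields.
type StatusPatch = { status: string; runId: string | null; lastActivityAt: number };
type LifecycleTransition =
  | { kind: "start"; patch: StatusPatch; clearRunTracking: false }
  | { kind: "terminal"; patch: StatusPatch; clearRunTracking: true }
  | { kind: "ignore" };

function resolveLifecycleSketch(input: {
  phase: LifecyclePhase;
  incomingRunId: string;
  currentRunId: string | null;
  timestamp: number;
}): LifecycleTransition {
  const { phase, incomingRunId, currentRunId, timestamp } = input;
  if (phase === "start") {
    return {
      kind: "start",
      patch: { status: "running", runId: incomingRunId, lastActivityAt: timestamp },
      clearRunTracking: false,
    };
  }
  // Stale-run guard: a terminal event for a run other than the tracked one is dropped.
  if (currentRunId !== null && currentRunId !== incomingRunId) {
    return { kind: "ignore" };
  }
  return {
    kind: "terminal",
    // Status strings are assumed values for this sketch.
    patch: { status: phase === "error" ? "error" : "idle", runId: null, lastActivityAt: timestamp },
    clearRunTracking: true,
  };
}

function dedupeRunLinesSketch(seen: Set<string>, lines: string[]) {
  const nextSeen = new Set(seen); // copy, so the caller's set is never mutated
  const appended: string[] = [];
  for (const line of lines) {
    // Lines repeated within the same batch are also collapsed in this sketch.
    if (!nextSeen.has(line)) {
      nextSeen.add(line);
      appended.push(line);
    }
  }
  return { appended, nextSeen };
}

// Usage: a late "end" for run-1 arrives while run-2 is the tracked run.
const stale = resolveLifecycleSketch({
  phase: "end",
  incomingRunId: "run-1",
  currentRunId: "run-2",
  timestamp: 1000,
});
const firstPass = dedupeRunLinesSketch(new Set<string>(), ["a", "b"]);
const secondPass = dedupeRunLinesSketch(firstPass.nextSeen, ["b", "c"]);
```

Returning a fresh set rather than mutating `seen` keeps the function pure, so a caller can retain the earlier set for comparison or rollback.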

Types & interfaces

| Export | Kind | Contract |
| --- | --- | --- |
| `EventKind` | type | `'summary-refresh' \| 'runtime-chat' \| 'runtime-agent' \| 'approval' \| 'unknown'`. |
| `EventPlane` | type | `'work' \| 'agent' \| 'trust'`. Identifies which store family an intent targets. |
| `ChatState` | type | `'delta' \| 'final' \| 'aborted' \| 'error'`. |
| `LifecyclePhase` | type | `'start' \| 'end' \| 'error'`. |
| `ChatEventPayload` | type | Parsed chat frame: `{ runId, sessionKey, state, seq?, stopReason?, message?, errorMessage? }`. |
| `AgentEventPayload` | type | Parsed agent frame: `{ runId, seq?, stream?, data?, sessionKey? }`. |
| `ClassifiedEvent` | interface | Bridge output: `{ kind, agentId?, sessionKey?, payload, timestamp, raw: EventFrame }`. |
| `AgentStatusPatch` | type | Partial agent state mutation: `{ status?, runId?, runStartedAt?, streamText?, thinkingTrace?, lastActivityAt? }`. |
| `LifecycleTransition` | type | Discriminated union: `{ kind: 'start'; patch; clearRunTracking: false } \| { kind: 'terminal'; patch; clearRunTracking: true } \| { kind: 'ignore' }`. |
| `EventIntent` | type | The pipeline’s currency: a discriminated union of `queueLivePatch` / `clearPendingLivePatch` / `commitChat` / `updateAgentStatus` / `scheduleSummaryRefresh` / `requestHistoryRefresh` / `approvalPending` / `approvalResolved` / `ignore`, each tagged with its plane. |
| `EventHandlerDeps` | type | Injected dependencies for `createEventHandler`: state queries (`getConnectionStatus`, `getAgentRunId`), dispatchers (`dispatchIntent`, `queueLivePatch`, `appendOutputLines`, `requestHistoryRefresh`, `loadSummarySnapshot`, `refreshHeartbeatLatest`, …), injectable timers, and an optional `log`. |
| `EventHandlerHandle` | type | `{ applyIntents: (intents: EventIntent[], event: ClassifiedEvent) => void; dispose: () => void }`. The Handler’s return shape. |
| `Patch` | interface | Patch-queue entry: `{ agentId: string; updates: Record<string, unknown> }`. |
The package exposes no classes or runtime constants from its barrel; the Bridge and Policy layers are plain functions, and the Handler / patch-queue are factory functions returning closures.
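
As an illustration of the factory-returning-closures style, the following sketch batches patches per agent and flushes them on a scheduled callback. It is not the package's `createPatchQueue`: `createPatchQueueSketch` is a hypothetical name, the extra `schedule` parameter is an assumption that stands in for `requestAnimationFrame` so the block runs outside a browser, and the run-change rule shown is only one interpretation of "a run-ID change discards the prior streaming state".

```typescript
interface Patch {
  agentId: string;
  updates: Record<string, unknown>;
}

// `schedule` replaces requestAnimationFrame in this sketch (assumption).
function createPatchQueueSketch(
  onFlush: (patches: Patch[]) => void,
  schedule: (cb: () => void) => void,
) {
  // All state lives in the closure; nothing is exposed except the three methods.
  let pending = new Map<string, Record<string, unknown>>();
  let scheduled = false;
  let disposed = false;

  function flush(): void {
    scheduled = false;
    if (disposed || pending.size === 0) return;
    const batch: Patch[] = [];
    pending.forEach((updates, agentId) => batch.push({ agentId, updates }));
    pending = new Map();
    onFlush(batch);
  }

  function enqueue(patch: Patch): void {
    if (disposed) return;
    const prior = pending.get(patch.agentId);
    // Assumed rule: when both patches carry a runId and they differ, drop the older one.
    const runChanged =
      prior !== undefined &&
      "runId" in prior &&
      "runId" in patch.updates &&
      prior["runId"] !== patch.updates["runId"];
    pending.set(
      patch.agentId,
      prior === undefined || runChanged
        ? { ...patch.updates }
        : { ...prior, ...patch.updates },
    );
    if (!scheduled) {
      scheduled = true;
      schedule(flush); // at most one flush is scheduled per batch
    }
  }

  function dispose(): void {
    disposed = true;
    pending.clear();
  }

  return { enqueue, flush, dispose };
}

// Usage with a manual scheduler that records callbacks instead of waiting for a frame.
const frameCallbacks: Array<() => void> = [];
const flushedBatches: Patch[][] = [];
const patchQueue = createPatchQueueSketch(
  (patches) => flushedBatches.push(patches),
  (cb) => frameCallbacks.push(cb),
);
patchQueue.enqueue({ agentId: "a1", updates: { runId: "r1", streamText: "Hel" } });
patchQueue.enqueue({ agentId: "a1", updates: { streamText: "Hello" } });
patchQueue.enqueue({ agentId: "a2", updates: { status: "running" } });
frameCallbacks.forEach((cb) => cb()); // simulate the next animation frame
```

Injecting the scheduler is a testing convenience in this sketch; per the table above, the real queue relies on `requestAnimationFrame` and becomes a no-op where it is unavailable.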

Used by

  • apps/web/src/features/connection/useGatewayEvents.ts: wires createEventHandler + createPatchQueue + processEvent to the live Gateway stream and the Zustand stores.
  • apps/web/src/stores/fleet.ts: imports the AgentStatusPatch type for its patchAgent action.
  • packages/adapters/openclaw/src/mapFrame.ts: reuses the pure Bridge parsers (isReasoningStream, parseAgentPayload, parseChatPayload) to map Gateway frames into the runtime-event stream.

Source

Barrel: packages/events/src/index.ts. Layers: bridge.ts (parsers), policy/{index,work,agent,trust}.ts (deciders), handler.ts (dispatch), patch-queue.ts (RAF batching), types.ts (the shared union types).

See also