Workflow Streams - Go SDK

Workflow Streams is a Temporal Go SDK contrib library that gives a Workflow a durable, offset-addressed event channel built on Temporal's basic message primitives: Signals, Updates, and Queries. It batch-publishes events to amortize per-Signal cost, deduplicates batches for exactly-once delivery to the log, supports topic filtering, and carries state across Continue-As-New for long-running streams.

Use Workflow Streams when you want outside observers to follow the progress of a Workflow and its Activities: updating a UI as an AI agent works, surfacing status from a payment or order pipeline, or reporting intermediate results from a data job. It is not suited to ultra-low-latency cases like real-time voice, and it targets modest fan-out: tens of publishers and subscribers per Workflow, not thousands.

The Workflow hosts the event log. Publishers append events — the Workflow itself, Activities, or external processes via Client. Subscribers attach to the Workflow Id, optionally filter by topic (a string label set when publishing; topics are implicit and created on first publish), and consume events by long-polling from an offset they store.
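
As a mental model only, the log behaves like an append-only slice whose indexes are global offsets shared by all topics. The sketch below is a toy, standard-library stand-in written for this page; `Log`, `Item`, `Append`, and `ReadFrom` are hypothetical names and say nothing about how the library stores events internally.

```go
package main

import "fmt"

// Item is one entry in the toy log. Offset is global across all topics.
type Item struct {
	Offset int
	Topic  string
	Data   string
}

// Log is a toy stand-in for the event log that the Workflow hosts.
type Log struct {
	items []Item
}

// Append adds an event under a topic and returns its global offset.
// Topics are implicit: nothing has to be declared before the first append.
func (l *Log) Append(topic, data string) int {
	off := len(l.items)
	l.items = append(l.items, Item{Offset: off, Topic: topic, Data: data})
	return off
}

// ReadFrom returns the items at or after fromOffset whose topic is in topics
// (no topics means every topic), plus the offset to poll from next.
func (l *Log) ReadFrom(fromOffset int, topics ...string) ([]Item, int) {
	if fromOffset < 0 {
		fromOffset = 0
	}
	want := make(map[string]bool, len(topics))
	for _, t := range topics {
		want[t] = true
	}
	var out []Item
	for i := fromOffset; i < len(l.items); i++ {
		if len(want) == 0 || want[l.items[i].Topic] {
			out = append(out, l.items[i])
		}
	}
	return out, len(l.items)
}

func main() {
	var events Log
	events.Append("status", "validating")
	events.Append("delta", "Hel")
	events.Append("status", "charging")

	// A subscriber filtered to "status" skips the delta but still advances
	// past it, because offsets are global rather than per topic.
	items, next := events.ReadFrom(0, "status")
	fmt.Println(len(items), next)
}
```

The point of the model is that a subscriber stores one number, the next global offset, regardless of how many topics it filters on.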

Enable streaming on a Workflow

The library ships as go.temporal.io/sdk/contrib/workflowstreams. Enable streaming by constructing a WorkflowStream once at the top of your Workflow function, before any blocking call. Construction must happen there because the stream's handlers have to be registered before the first publish Signal arrives. If you construct the stream after a blocking call, any publishes that arrive before the Workflow function resumes are missed.

import (
	"go.temporal.io/sdk/contrib/workflowstreams"
	"go.temporal.io/sdk/workflow"
)

type OrderInput struct {
	OrderID     string
	StreamState *workflowstreams.WorkflowStreamState
}

func OrderWorkflow(ctx workflow.Context, input OrderInput) error {
	stream, err := workflowstreams.NewWorkflowStream(ctx, input.StreamState)
	if err != nil {
		return err
	}
	// ... rest of the workflow
	_ = stream
	return nil
}

NewWorkflowStream creates the in-memory event log and registers the publish Signal, subscribe Update, and offset Query handlers on the current Workflow. The priorState argument is nil on a fresh start and a *WorkflowStreamState after a Continue-As-New rollover (see Continue-As-New).

Construct exactly one WorkflowStream per Workflow. The constructor re-registers the handlers unconditionally on every call, so constructing more than one on the same Workflow registers duplicate handlers. The Go SDK doesn't expose an inspection API for existing handlers, so the library can't return an error on a duplicate the way the Python SDK raises an exception.

Publish from a Workflow

Bind a topic name with stream.Topic(name), then call Publish() on the returned *WorkflowTopicHandle to append events. Repeated calls with the same name return the same handle.

type StatusEvent struct {
	State    string
	Progress int
	Detail   string
}

func OrderWorkflow(ctx workflow.Context, input OrderInput) error {
	stream, err := workflowstreams.NewWorkflowStream(ctx, input.StreamState)
	if err != nil {
		return err
	}
	status := stream.Topic("status")

	if err := status.Publish(StatusEvent{State: "validating", Detail: "checking inventory"}); err != nil {
		return err
	}
	if err := workflow.ExecuteActivity(ctx, ValidateOrder, input.OrderID).Get(ctx, nil); err != nil {
		return err
	}

	if err := status.Publish(StatusEvent{State: "charging", Progress: 33, Detail: "authorizing payment"}); err != nil {
		return err
	}
	if err := workflow.ExecuteActivity(ctx, ChargePayment, input.OrderID).Get(ctx, nil); err != nil {
		return err
	}

	if err := status.Publish(StatusEvent{State: "shipping", Progress: 66, Detail: "dispatching to warehouse"}); err != nil {
		return err
	}
	if err := workflow.ExecuteActivity(ctx, DispatchOrder, input.OrderID).Get(ctx, nil); err != nil {
		return err
	}

	return status.Publish(StatusEvent{State: "completed", Progress: 100})
}

Publish() runs the payload converter to encode each value. The codec chain (encryption, compression, and so on) runs once on the Signal or Update envelope that carries the batch, never per item, so encryption and compression are applied exactly once in each direction.

Unlike the Python and TypeScript SDKs, Go topics carry no per-topic type binding. A topic handle is bound only to a name; published values are of type any, and subscribers decode each item from its raw payload (see Subscribe). To customize per-item serialization, pass workflowstreams.WithPayloadConverters(...) to NewWorkflowStream, and use the matching converters on the subscriber side.

Publish from a client

Any process that has a Temporal Client and the target Workflow Id can publish to that Workflow's stream by constructing a Client. This is the general pattern and covers HTTP backends, starters, one-off scripts, other Workflows' Activities, and standalone Activities.

Construct one with:

workflowstreams.NewClient(temporalClient, workflowID, workflowstreams.Options{})

Then use it the same way you would the Workflow-side handle: bind a topic, publish through it, and defer client.Close(ctx) to flush on scope exit.

When events originate in an Activity, publish from the Activity directly rather than returning them for the Workflow to forward. The Workflow hosts the stream but doesn't read its own stream; it processes the Activity's return value and emits its own lifecycle events. Keeping Workflow state independent of streamed output is what lets retried Activity attempts surface to subscribers without polluting the Workflow's durable state — see Delivery semantics.

import (
	"context"
	"time"

	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/contrib/workflowstreams"
)

func PublishStatus(ctx context.Context, temporalClient client.Client, workflowID string) error {
	streamClient := workflowstreams.NewClient(temporalClient, workflowID, workflowstreams.Options{
		BatchInterval: 200 * time.Millisecond,
	})
	defer streamClient.Close(ctx)

	status := streamClient.Topic("status")
	status.Publish(StatusEvent{State: "started"}, false)
	// ...
	// Buffer is flushed automatically on Close.
	return nil
}

Inside an Activity scheduled by a Workflow, workflowstreams.NewClientFromActivity() infers the Temporal Client and the parent Workflow Id from the Activity context, so you don't have to thread them through the Activity's input:

import (
	"context"
	"time"

	"go.temporal.io/sdk/activity"
	"go.temporal.io/sdk/contrib/workflowstreams"
)

type Delta struct {
	Text string
}

func StreamDeltas(ctx context.Context, orderID string) error {
	streamClient, err := workflowstreams.NewClientFromActivity(ctx, workflowstreams.Options{})
	if err != nil {
		return err
	}
	defer streamClient.Close(ctx)

	deltas := streamClient.Topic("delta")
	for delta := range generateDeltas(orderID) {
		deltas.Publish(delta, false)
		activity.RecordHeartbeat(ctx)
	}
	// Buffer is flushed automatically on Close.
	return nil
}

For a standalone Activity (one started directly via the Client rather than from a Workflow), there is no parent Workflow context to infer, so NewClientFromActivity() returns an error. Fall back to the general pattern with activity.GetClient(ctx) and the target Workflow Id threaded through the Activity's input.

Two operations give the application explicit control over when batches ship: the forceFlush argument on a publish for latency, and client.Flush(ctx) for confirmation that prior publications have landed.

Pass true as the forceFlush argument on a publish to wake the background flusher so the current buffer ships without waiting for the next interval. The flusher only runs while the client is open (between construction and Close). The call returns immediately after appending to the buffer and signaling the flusher. It doesn't wait for delivery to the Workflow or to subscribers:

deltas.Publish(delta, true)

Use it for latency-sensitive events: the first delta of a response so the user sees something fast, or punctuated events like RETRY and STATUS_CHANGE. See Tuning for the trade-off against history pressure.

Use client.Flush(ctx) when you need a mid-stream barrier. Successful completion of the flush is proof that the Temporal server has received all prior publications, so subsequent work that depends on those events being durable can proceed. The client stays open for further publishing afterward. Close already flushes on its way out, so the explicit call is only for barriers in the middle:

streamClient := workflowstreams.NewClient(temporalClient, workflowID, workflowstreams.Options{})
defer streamClient.Close(ctx)
deltas := streamClient.Topic("delta")

for _, delta := range firstPhase() {
	deltas.Publish(delta, false)
}

if err := streamClient.Flush(ctx); err != nil {
	return err
}
checkpointID, err := recordPhaseOneComplete(ctx) // only safe once phase-one events are durable
if err != nil {
	return err
}

for _, delta := range secondPhase(checkpointID) {
	deltas.Publish(delta, false)
}

Publish() is non-blocking and applies no backpressure. From an Activity or other client, it appends to the client's in-memory buffer and returns. From inside a Workflow, it appends synchronously to the in-memory log. Subscribers pull from the Workflow's log on their own schedule, so a slow subscriber doesn't slow down publishers. If a publisher emits faster than batches can ship to the server, the buffer grows: the process uses more memory, the stream falls further behind real time, and at the limit Signals can't keep up.

If your application needs to bound this (to cap memory, to keep the stream close to real time, or to apply a policy when the publisher overruns the network), apply that policy upstream of Publish(). The choice (block, drop, error, sample) is application-specific, and Workflow Streams doesn't pick one for you.

Subscribe

Subscribing uses the same client construction as publishing: workflowstreams.NewClient(temporalClient, workflowID, opts) from any process that has a Temporal Client, or NewClientFromActivity() inside an Activity. Subscribing from an Activity is less common in practice, so the general client case is the primary example below.

Once you have a client, range over client.Subscribe(), the counterpart to Publish(). It returns an iter.Seq2 iterator that yields a WorkflowStreamItem and an error on each step. Each item's Data is the raw payload; decode it at the call site with a payload converter.

import (
	"context"
	"fmt"

	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/contrib/workflowstreams"
	"go.temporal.io/sdk/converter"
)

func WatchOrder(ctx context.Context, temporalClient client.Client, orderID string) error {
	stream := workflowstreams.NewClient(temporalClient, orderID, workflowstreams.Options{})

	for item, err := range stream.Subscribe(ctx, workflowstreams.SubscribeOptions{Topics: []string{"status"}}) {
		if err != nil {
			return err
		}
		var evt StatusEvent
		if err := converter.GetDefaultDataConverter().FromPayload(item.Data, &evt); err != nil {
			return err
		}
		fmt.Printf("[%3d%%] %s: %s\n", evt.Progress, evt.State, evt.Detail)
		if evt.State == "completed" {
			break
		}
	}
	return nil
}

SubscribeOptions controls the subscription: Topics filters by name (empty or nil means all topics), FromOffset resumes from a stored global offset (zero means the beginning), and PollCooldown sets the minimum interval between polls. The iterator handles re-polling, pagination when a poll response hits the ~1 MB cap, and Workflow-side log truncation transparently. A single-topic convenience method, streamClient.Topic("status").Subscribe(ctx, fromOffset), is equivalent to passing one name in Topics.

A subscriber doesn't need Close() because the background flusher only runs for publishers.

Heterogeneous topics

Every item arrives as a raw *commonpb.Payload in item.Data, so a single subscription naturally consumes multiple topics whose payload types differ. Pass the topic names in SubscribeOptions.Topics (or leave it empty for every topic on the stream), dispatch on item.Topic, and decode into the matching type:

for item, err := range stream.Subscribe(ctx, workflowstreams.SubscribeOptions{Topics: []string{"status", "progress"}}) {
	if err != nil {
		return err
	}
	switch item.Topic {
	case "status":
		var evt StatusEvent
		if err := converter.GetDefaultDataConverter().FromPayload(item.Data, &evt); err != nil {
			return err
		}
		fmt.Printf("[status] %s: %s\n", evt.State, evt.Detail)
	case "progress":
		var evt ProgressEvent
		if err := converter.GetDefaultDataConverter().FromPayload(item.Data, &evt); err != nil {
			return err
		}
		fmt.Printf("[progress] %s\n", evt.Message)
	}
}

A single iterator over multiple topics also avoids the cancellation race that two concurrent subscribers would create. Because item.Data is the raw payload, it's also the right shape when you want to forward the bytes through to another system without decoding them.

Closing the stream

A subscriber's for ... range doesn't know when the publisher is done. How you close a stream depends on what the application needs. As one example, a common pattern combines two pieces:

  1. An in-band terminator. The Workflow or its Activity publishes a sentinel event the subscriber recognizes and breaks on. In the WatchOrder example above, StatusEvent{State: "completed"} is the minimal form, and the consumer's if evt.State == "completed" { break } is the matching half. Each subscription decides what its own end-of-stream marker is.
  2. A brief overlap before the Workflow returns. A poll Update that is still in flight when the Workflow returns is surfaced to the iterator and consumed silently, and no new polls can complete after that. If the Workflow returns immediately after publishing the terminator, subscribers may miss it.

There are two ways to provide that overlap.

  • Fixed sleep. Sleep between the terminator and the return so any in-flight poll has time to fetch the terminator before the Workflow exits:

    // at the end of the workflow function
    if err := status.Publish(StatusEvent{State: "completed", Progress: 100}); err != nil {
    	return err
    }
    if err := workflow.Sleep(ctx, 30*time.Second); err != nil {
    	return err
    }
    return nil

  • Acknowledgment handshake. The subscriber sends a Signal once it has the terminator; the Workflow waits up to a timeout, returning as soon as the ack arrives:

    func ChatWorkflow(ctx workflow.Context, input ChatInput) (string, error) {
    	stream, err := workflowstreams.NewWorkflowStream(ctx, input.StreamState)
    	if err != nil {
    		return "", err
    	}

    	subscriberDone := false
    	ackCh := workflow.GetSignalChannel(ctx, "subscriber-acknowledged-terminator")
    	workflow.Go(ctx, func(ctx workflow.Context) {
    		ackCh.Receive(ctx, nil)
    		subscriberDone = true
    	})

    	// ... do work and publish events ...

    	// Returns true if the ack arrived, false on timeout; either way, fall through.
    	_, _ = workflow.AwaitWithTimeout(ctx, 30*time.Second, func() bool { return subscriberDone })
    	return result, nil
    }

The full pattern is wired into the Stream LLM output example below.

You can inspect the terminal status. Subscribe() ends cleanly when the Workflow reaches COMPLETED, FAILED, CANCELED, TERMINATED, or TIMED_OUT, but doesn't distinguish among them. If your application needs to know which (to display success or failure to the user, log the outcome, or decide whether to retry), call temporalClient.DescribeWorkflowExecution(ctx, workflowID, "") after the loop returns to inspect the Workflow's status.

Continue-As-New

To follow a Workflow across Continue-As-New, publishers and subscribers must keep using the client returned by NewClient() or NewClientFromActivity(), which re-targets the Workflow Id across runs.

To roll a long-running streaming Workflow over without subscribers seeing a gap, carry both your application state and the stream state across the boundary. Add a *WorkflowStreamState field to your Workflow input, pass it to NewWorkflowStream, and return stream.NewContinueAsNewError(ctx, wfn, buildArgs) to invoke the rollover. The helper drains waiting subscribers, waits for in-flight handlers to finish, then returns a Continue-As-New error built from the args produced by buildArgs(postDrainState):

type WorkflowInput struct {
	ItemsProcessed int
	StreamState    *workflowstreams.WorkflowStreamState
}

func LongRunningWorkflow(ctx workflow.Context, input WorkflowInput) error {
	stream, err := workflowstreams.NewWorkflowStream(ctx, input.StreamState)
	if err != nil {
		return err
	}
	itemsProcessed := input.ItemsProcessed

	for {
		if err := doOneIteration(ctx, stream); err != nil {
			return err
		}
		itemsProcessed++

		if workflow.GetInfo(ctx).GetContinueAsNewSuggested() {
			return stream.NewContinueAsNewError(ctx, LongRunningWorkflow, func(state *workflowstreams.WorkflowStreamState) []any {
				return []any{WorkflowInput{
					ItemsProcessed: itemsProcessed,
					StreamState:    state,
				}}
			})
		}
	}
}

The *WorkflowStreamState field is nil on a fresh start and a populated snapshot after a rollover. The buildArgs callback receives the post-detach *WorkflowStreamState as its only argument, so the snapshot is guaranteed to happen after pollers detach.

To pass other Continue-As-New parameters such as a different task queue, or to use a custom publisher TTL, use the explicit recipe instead. Drain the pollers, wait for handlers to finish, snapshot the state with your chosen TTL, then build the Continue-As-New error yourself (set options such as the task queue on the context first):

stream.DetachPollers()
_ = workflow.Await(ctx, func() bool { return workflow.AllHandlersFinished(ctx) })
state, err := stream.GetState(30 * time.Minute) // custom publisher TTL
if err != nil {
	return err
}
ctx = workflow.WithWorkflowTaskQueue(ctx, "other-tq")
return workflow.NewContinueAsNewError(ctx, LongRunningWorkflow, WorkflowInput{
	ItemsProcessed: itemsProcessed,
	StreamState:    state,
})

The carried WorkflowStreamState includes the entire in-memory log of the previous run, so streams that carry large items can hit Temporal's per-payload size limit at the rollover. Offload the bytes via External Storage so each item is a small reference rather than the full payload, and combine that with stream.Truncate(upToOffset) to keep the carried log itself small.

Deduplication window

See Delivery semantics for more detail on subscriber and publisher behavior, and Tuning for how to adjust these settings to fit your Workflow's requirements.

There are two limits on the deduplication window worth highlighting:

  • Publisher TTL. At each Continue-As-New, deduplication entries whose last-seen time is older than this TTL are dropped. The last-seen time is updated on each successful publish (not on each retry attempt), so a publisher that retries through a long partition without success can still age out. A publisher that returns after a pause longer than the TTL may then produce a duplicate. stream.NewContinueAsNewError(...) snapshots with a 15-minute default; to tune it, use the explicit recipe above and pass your value to GetState(publisherTTL).

  • MaxRetryDuration. A Client retries a failed batch for up to this long (default 10 minutes). If the duration elapses with the batch still pending, the client gives up, the pending batch is dropped, and a FlushTimeoutError is raised.

    workflowstreams.NewClient(temporalClient, workflowID, workflowstreams.Options{
    	MaxRetryDuration: 10 * time.Minute,
    })

    On timeout, the dropped batch is at-most-once: it may or may not have reached the Workflow. One operational caveat: the FlushTimeoutError is raised from inside the background flusher and terminates it, so until you call client.Flush(ctx) or client.Close(ctx), subsequent publishes accumulate in the buffer with no flusher to ship them. To preserve exactly-once delivery, MaxRetryDuration must be less than the Workflow's publisher TTL; otherwise a retry can arrive after its deduplication entry has been dropped.

Best practices

There are a few details to note if you're writing custom message handlers or testing the library's capabilities:

  • Construct exactly one WorkflowStream per Workflow. The constructor re-registers the publish Signal, poll Update, and offset Query handlers on every call, so a second construction registers duplicate handlers. Construct it once at the top of the Workflow function.
  • item.Data is always the raw payload. Decode it with a converter built from the same PayloadConverters used by the publisher. When publishers and subscribers both rely on the defaults, converter.GetDefaultDataConverter() matches on both sides. If you pass WithPayloadConverters on the Workflow side, build a matching converter.NewCompositeDataConverter(...) on the subscriber side.
  • The codec chain runs once on the envelope. Payload codecs (encryption, compression) configured on the Temporal client run on the Signal or Update envelope that carries each batch, never per item, so items are never double-encoded. PayloadConverters handle only per-item serialization.
  • The client publish path isn't goroutine-safe. The client buffer is mutated on the publish path and read from the background flusher. Don't call Publish() on the same Client from multiple goroutines without coordinating; route events to a single owner.
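
For the last point, one way to route events to a single owner is a channel drained by one goroutine, which is then the only caller of Publish(). This is a minimal standard-library sketch; `startOwner` and `fanIn` are hypothetical helpers, and the `publish` callback stands in for a topic handle's Publish().

```go
package main

import (
	"fmt"
	"sync"
)

// startOwner launches the single goroutine that is allowed to call publish.
// The returned channel closes once events has been closed and drained.
func startOwner(events <-chan string, publish func(string)) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for ev := range events {
			publish(ev)
		}
	}()
	return done
}

// fanIn runs several producer goroutines and routes everything they emit
// through one owner, so publish is never called concurrently.
func fanIn(producers, perProducer int, publish func(string)) {
	events := make(chan string, 16)
	done := startOwner(events, publish)

	var wg sync.WaitGroup
	for p := 0; p < producers; p++ {
		wg.Add(1)
		go func(p int) {
			defer wg.Done()
			for i := 0; i < perProducer; i++ {
				events <- fmt.Sprintf("producer-%d event-%d", p, i)
			}
		}(p)
	}
	wg.Wait()     // all producers have finished sending
	close(events) // lets the owner's range loop end
	<-done        // the owner has published everything
}

func main() {
	var published []string // touched only by the owner goroutine
	fanIn(4, 25, func(ev string) { published = append(published, ev) })
	fmt.Println(len(published))
}
```

The bounded channel also gives producers a natural blocking policy: when the owner falls behind, sends wait instead of growing an unbounded queue.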

Example: Stream LLM output

The headline use case fits the publish/subscribe shapes documented above. An Activity calls the model and publishes deltas as they arrive. The Workflow starts the Activity and waits for the consumer to acknowledge end-of-stream. The consumer subscribes, accumulates the deltas, and clears its accumulated state on RETRY before continuing. The shape works for a terminal client, a desktop UI, or a Server-Sent Events (SSE) endpoint forwarding to a browser. Anything that holds the displayed state calls render() to display it.

If your Activity can retry, the consumer side has to account for it. A retried attempt is a fresh publisher, so its output appears in the stream alongside the output from the previous attempt. In the LLM streaming pattern below, that means the failed attempt's partial deltas and the retried attempt's full output both reach a subscribed UI unless the UI resets on a RETRY event. The example wires up that pattern. See Delivery semantics for the precise guarantees.

// activity.go
package main

import (
	"context"
	"strings"
	"time"

	"go.temporal.io/sdk/activity"
	"go.temporal.io/sdk/contrib/workflowstreams"
)

type TextDelta struct {
	Text string
}

type RetryEvent struct {
	Attempt int32
}

func StreamCompletion(ctx context.Context, prompt string) (string, error) {
	attempt := activity.GetInfo(ctx).Attempt
	streamClient, err := workflowstreams.NewClientFromActivity(ctx, workflowstreams.Options{
		BatchInterval: 200 * time.Millisecond,
	})
	if err != nil {
		return "", err
	}
	defer streamClient.Close(ctx)

	deltas := streamClient.Topic("delta")
	retry := streamClient.Topic("retry")
	closeTopic := streamClient.Topic("close")

	// Tell consumers an earlier attempt's deltas are stale.
	if attempt > 1 {
		retry.Publish(RetryEvent{Attempt: attempt}, true)
	}

	var full []string
	first := true
	// generateDeltas wraps the model call and yields tokens as they arrive.
	// Disable provider-side retries; let Temporal own retry policy at the Activity layer.
	for token := range generateDeltas(prompt) {
		// forceFlush only on the first delta so the user sees something
		// immediately; subsequent deltas batch at the 200 ms interval.
		deltas.Publish(TextDelta{Text: token}, first)
		first = false
		full = append(full, token)
	}
	closeTopic.Publish(struct{}{}, false)
	return strings.Join(full, ""), nil
}

// workflow.go
package main

import (
	"time"

	"go.temporal.io/sdk/contrib/workflowstreams"
	"go.temporal.io/sdk/workflow"
)

type ChatInput struct {
	Prompt      string
	StreamState *workflowstreams.WorkflowStreamState
}

func ChatWorkflow(ctx workflow.Context, input ChatInput) (string, error) {
	stream, err := workflowstreams.NewWorkflowStream(ctx, input.StreamState)
	if err != nil {
		return "", err
	}
	_ = stream

	subscriberDone := false
	ackCh := workflow.GetSignalChannel(ctx, "subscriber-acknowledged-terminator")
	workflow.Go(ctx, func(ctx workflow.Context) {
		ackCh.Receive(ctx, nil)
		subscriberDone = true
	})

	ao := workflow.ActivityOptions{StartToCloseTimeout: 5 * time.Minute}
	ctx = workflow.WithActivityOptions(ctx, ao)
	var result string
	if err := workflow.ExecuteActivity(ctx, StreamCompletion, input.Prompt).Get(ctx, &result); err != nil {
		return "", err
	}

	// Wait for the subscriber to ack the terminal `close` event. The timeout
	// is a fallback for when no subscriber is attached; with the ack, the
	// typical case exits as soon as the subscriber confirms.
	_, _ = workflow.AwaitWithTimeout(ctx, 30*time.Second, func() bool { return subscriberDone })
	return result, nil
}

// consumer.go: accumulates the model's output and resets on retry
package main

import (
	"context"
	"strings"

	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/contrib/workflowstreams"
	"go.temporal.io/sdk/converter"
)

func StreamChat(ctx context.Context, temporalClient client.Client, chatID string) (string, error) {
	// Subscribe-only; no Close needed because the flusher only runs for publishers.
	stream := workflowstreams.NewClient(temporalClient, chatID, workflowstreams.Options{})
	dc := converter.GetDefaultDataConverter()
	var output []string

	render := func() {
		// ... display the accumulated output (terminal redraw, UI update, etc.)
	}

	for item, err := range stream.Subscribe(ctx, workflowstreams.SubscribeOptions{Topics: []string{"delta", "retry", "close"}}) {
		if err != nil {
			return "", err
		}
		switch item.Topic {
		case "retry":
			// Earlier attempt's deltas are stale; drop what we've shown.
			output = output[:0]
			render()
		case "delta":
			var delta TextDelta
			if err := dc.FromPayload(item.Data, &delta); err != nil {
				return "", err
			}
			output = append(output, delta.Text)
			render()
		case "close":
			// Acknowledge so the Workflow can return without waiting on the fallback timeout.
			if err := temporalClient.SignalWorkflow(ctx, chatID, "", "subscriber-acknowledged-terminator", nil); err != nil {
				return "", err
			}
			return strings.Join(output, ""), nil
		}
	}

	return strings.Join(output, ""), nil
}

A few choices in this shape are deliberate:

  • The Activity is the publisher because it owns the non-deterministic LLM call. The Workflow processes only the Activity's return value, never reading its own stream — see Publish from a client for why.
  • The Activity publishes a RETRY event when activity.GetInfo(ctx).Attempt > 1. This lets the UI respond appropriately to the failure, typically by clearing accumulated deltas before the next attempt's deltas arrive (see Delivery semantics).
  • Termination uses an ack handshake: the consumer signals the Workflow once it has received the close event, so the Workflow can return as soon as the subscriber confirms. The AwaitWithTimeout timeout is the fallback when no subscriber is attached (see Closing the stream for the simpler fixed-sleep alternative).
  • forceFlush is true only on the first delta and on the RETRY sentinel, where latency matters. Subsequent deltas batch at the 200 ms BatchInterval; per-delta forceFlush would generate one Signal per token (see Tuning for the trade-off).
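
The consumer's reset-on-retry rule can also be checked in isolation, without a running Workflow. The sketch below restates it as a pure function over already-decoded events; `Event` and `Reduce` are hypothetical names introduced here, and the topic strings match the example above.

```go
package main

import (
	"fmt"
	"strings"
)

// Event is an already-decoded stream item: the topic plus any delta text.
type Event struct {
	Topic string
	Text  string
}

// Reduce applies events in order and returns the text a UI should display,
// along with whether the close terminator was seen.
func Reduce(events []Event) (text string, closed bool) {
	var parts []string
	for _, ev := range events {
		switch ev.Topic {
		case "retry":
			parts = parts[:0] // the earlier attempt's deltas are stale
		case "delta":
			parts = append(parts, ev.Text)
		case "close":
			return strings.Join(parts, ""), true
		}
	}
	return strings.Join(parts, ""), false
}

func main() {
	text, closed := Reduce([]Event{
		{Topic: "delta", Text: "Hel"}, // attempt 1 fails partway through
		{Topic: "retry"},              // attempt 2 announces itself
		{Topic: "delta", Text: "Hello"},
		{Topic: "delta", Text: " world"},
		{Topic: "close"},
	})
	fmt.Printf("%q %v\n", text, closed)
}
```

Keeping this logic separate from the subscription loop makes it straightforward to unit test the retry handling with hand-written event sequences.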

See also