Introduction
Distributed stream processing systems must answer a deceptively simple question: how many times is each input record processed?
The three classical delivery guarantees are at-most-once, at-least-once, and exactly-once.
Of these, exactly-once semantics (EOS) is the most desirable and the most misunderstood.
Critics have rightly pointed out that "exactly-once delivery" is impossible in the general case.
Network partitions, process crashes, and message loss make it impossible to guarantee that a message is delivered across a network exactly one time.
But exactly-once semantics is a different claim.
It does not promise that messages are never retransmitted or that failures never occur.
It promises that the observable effect on system state is indistinguishable from a world where every record was processed exactly once.
The distinction between delivery and semantics is the crux of the entire topic.
Why It Matters
In at-least-once systems, failures cause retries, and retries cause duplicates.
For many workloads (counting ad impressions, computing financial aggregates, and updating account balances) duplicates produce incorrect results.
Engineers working around this typically push deduplication logic into application code, which is error-prone, hard to test, and difficult to maintain.
EOS shifts this burden into the infrastructure layer, letting application logic remain clean.
Core Mechanisms
Achieving exactly-once semantics requires coordinating three things: input consumption, state mutation, and output production.
If any one of these falls out of sync after a failure, the guarantee breaks.
Most production systems use one of two broad strategies, or a combination of both.
Idempotent Writes
If every operation is idempotent, replaying a record has no additional effect.
This is the simplest path to EOS but also the most constrained.
True idempotency requires that the output depends only on the input record and not on accumulated state.
Stateless transformations (filtering, mapping) are naturally idempotent.
Stateful operations (aggregations, windowed joins) are not, because the result of processing a record depends on what records were processed before it.
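The contrast can be made concrete in a few lines of Python. This is a toy illustration, not any particular engine's API: replaying a record through a stateless map is harmless, while replaying it through a running sum double-counts.

```python
def uppercase(record):
    # Stateless transformation: output depends only on the input record.
    return record.upper()

class RunningSum:
    # Stateful aggregation: output depends on every record seen so far.
    def __init__(self):
        self.total = 0

    def process(self, value):
        self.total += value
        return self.total

# Replaying the same record through the map yields the same output.
assert uppercase("ad_click") == uppercase("ad_click")

# Replaying the same record through the aggregation changes the result.
s = RunningSum()
first = s.process(10)    # total = 10
replay = s.process(10)   # replayed record: total = 20, not 10
assert first == 10 and replay == 20
```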
Some systems simulate idempotency for stateful operations by tagging each output with a deterministic sequence number or transaction ID, and having sinks reject duplicates.
This works well when the sink supports conditional writes or upserts but pushes complexity to the boundary.
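A minimal sketch of that boundary-level deduplication, with invented names (the pattern, not a specific sink's API): each write carries a producer ID and a monotonically increasing sequence number, and the sink rejects anything it has already applied.

```python
# Duplicate-rejecting sink keyed by (producer_id, sequence_number).
# Illustrative only; a real sink would do this with conditional writes
# or upserts against the external store.

class DedupSink:
    def __init__(self):
        self.store = {}     # key -> value, standing in for the external table
        self.last_seq = {}  # producer_id -> highest sequence number applied

    def write(self, producer_id, seq, key, value):
        """Apply the write only if this sequence number is new for the producer."""
        if seq <= self.last_seq.get(producer_id, -1):
            return False    # duplicate from a retry: reject
        self.store[key] = value
        self.last_seq[producer_id] = seq
        return True

sink = DedupSink()
assert sink.write("p1", 0, "clicks", 42) is True
assert sink.write("p1", 0, "clicks", 42) is False   # replayed write ignored
assert sink.store["clicks"] == 42
```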
Transactional State Management
The more general approach treats input offset advancement, state updates, and output production as a single atomic operation. If the operation commits, all three take effect.
If it fails, none do and the system retries from the last committed position.
Apache Kafka's exactly-once support (introduced in KIP-98) is the canonical example.
It combines idempotent producers, transactional writes, and consumer offset commits into a single transaction.
Kafka Streams builds on this by atomically committing changelog state, output records, and consumer offsets in one Kafka transaction.
Apache Flink takes a different path: it uses distributed snapshots based on the Chandy-Lamport algorithm to periodically checkpoint operator state, input positions, and (via two-phase commit sinks) output side effects.
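The shared pattern behind both systems can be modeled in memory: stage the offset advance, the state update, and the output together, and make them visible only at an atomic commit point. This is a simplified sketch of the pattern, not Kafka's or Flink's actual protocol.

```python
# In-memory model of the transactional pattern: offset, state, and output
# commit together or not at all. Illustrative only.

class TransactionalProcessor:
    def __init__(self):
        self.committed_offset = 0
        self.committed_sum = 0
        self.committed_output = []

    def process_batch(self, records, fail_before_commit=False):
        # Stage everything; nothing is visible until the commit point.
        offset = self.committed_offset
        total = self.committed_sum
        output = []
        for value in records:
            offset += 1
            total += value
            output.append(total)
        if fail_before_commit:
            return  # crash before commit: all staged work is discarded
        # Atomic commit point: all three take effect together.
        self.committed_offset = offset
        self.committed_sum = total
        self.committed_output.extend(output)

p = TransactionalProcessor()
p.process_batch([1, 2], fail_before_commit=True)   # fails: nothing committed
assert (p.committed_offset, p.committed_sum) == (0, 0)
p.process_batch([1, 2])                            # retry from offset 0
assert (p.committed_offset, p.committed_sum) == (2, 3)
assert p.committed_output == [1, 3]                # no duplicate outputs
```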
Distributed Snapshots and Checkpointing
Flink's approach deserves closer examination because it generalizes beyond Kafka.
The idea is rooted in the Chandy-Lamport snapshot algorithm: inject special barrier markers into the data stream, and when an operator receives barriers from all its input channels, it snapshots its local state.
On recovery, the system restores the most recent consistent snapshot and replays input from the corresponding offsets.
This works cleanly for internal state.
The challenge is external side effects.
If an operator wrote results to an external database before the checkpoint completed, and the system then rolls back, those writes become duplicates.
Flink addresses this with two-phase commit sink connectors: writes are staged (prepared) during processing and only committed when the checkpoint completes.
If the checkpoint fails, staged writes are aborted.
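The sink-side lifecycle can be sketched as follows. The class and method names here are hypothetical; real connectors (Flink's TwoPhaseCommitSinkFunction, for instance) additionally manage transaction handles, timeouts, and recovery of transactions that were prepared before a crash.

```python
# Sketch of a two-phase-commit sink: writes are staged per checkpoint epoch
# and reach the external store only once that checkpoint completes.

class TwoPhaseCommitSink:
    def __init__(self):
        self.external_db = {}      # stands in for the external database
        self.staged = {}           # checkpoint_id -> list of (key, value)

    def write(self, checkpoint_id, key, value):
        # Phase 1 (prepare): buffer the write in the current epoch.
        self.staged.setdefault(checkpoint_id, []).append((key, value))

    def on_checkpoint_complete(self, checkpoint_id):
        # Phase 2 (commit): make the epoch's writes externally visible.
        for key, value in self.staged.pop(checkpoint_id, []):
            self.external_db[key] = value

    def on_failure(self, checkpoint_id):
        # Abort: discard staged writes from the failed epoch.
        self.staged.pop(checkpoint_id, None)

sink = TwoPhaseCommitSink()
sink.write(7, "sum", 54210)
sink.on_failure(7)                 # checkpoint 7 fails: nothing visible
assert "sum" not in sink.external_db
sink.write(8, "sum", 54210)        # replayed in a new epoch
sink.on_checkpoint_complete(8)
assert sink.external_db["sum"] == 54210
```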
Walkthrough
The following walkthrough illustrates how a checkpoint-based EOS system processes a failure and recovers correctly.
Assume a simple pipeline: a source reads from a partitioned log, a stateful operator computes a running sum, and a sink writes to an external database using two-phase commit.
Normal Processing (Checkpoint N)
1. Checkpoint coordinator injects barrier B(N) into the source streams.
2. Source records its current offsets: {partition_0: 1042, partition_1: 897}.
3. Barrier B(N) flows downstream through the DAG.
4. Sum operator receives B(N) on all input channels.
   - Snapshots local state: {running_sum: 53701}.
   - Forwards B(N) downstream.
5. Sink receives B(N).
   - Prepares (pre-commits in the 2PC sense) pending writes to external DB.
   - Acknowledges barrier to coordinator.
6. Coordinator marks checkpoint N as complete.
7. Sink finalizes (commits) pre-committed writes to external DB.
Failure and Recovery
1. After checkpoint N completes, processing continues.
2. Sum operator processes 15 more records, updating running_sum to 54210.
3. Sink prepares 8 results for external DB (staged via 2PC, not yet committed).
4. Sum operator crashes.
Recovery:
5. System restores checkpoint N:
   - Source rewinds to offsets {partition_0: 1042, partition_1: 897}.
   - Sum operator restores state {running_sum: 53701}.
   - Sink aborts the prepared (not yet committed) writes from the failed epoch.
6. Processing resumes from checkpoint N offsets.
7. The 15 records are replayed. Sum operator recomputes running_sum to 54210.
8. Sink re-prepares the 8 results (now in a new transaction epoch).
9. Next checkpoint (N+1) completes, and sink commits the new writes.
The key observation: records between checkpoint N and the failure are processed twice, but the externally visible state is identical to processing them once.
The running sum arrives at the same value (deterministic replay), and the sink's two-phase commit ensures that only committed writes survive.
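The deterministic-replay claim can be checked in a few lines of Python. The individual record values below are invented (any 15 values summing to 509 reproduce the walkthrough's numbers); what matters is that replaying the same batch from checkpoint N lands on the same committed result.

```python
# Replaying the walkthrough: restore checkpoint N, reprocess the in-flight
# records, and the committed result matches once-only processing.

checkpoint_n = {"running_sum": 53701}
records = [34] * 14 + [33]          # 15 records, sum = 509 (53701 + 509 = 54210)

def run_epoch(start_state, batch):
    """Deterministic replay: same start state and inputs give same outputs."""
    state = start_state
    staged = []
    for value in batch:
        state += value
        staged.append(state)
    return state, staged

# First attempt reaches 54210, then crashes; staged 2PC writes are aborted.
state, staged = run_epoch(checkpoint_n["running_sum"], records)
assert state == 54210
staged = None                       # abort: nothing was externally committed

# Recovery replays the same records from checkpoint N.
state, staged = run_epoch(checkpoint_n["running_sum"], records)
committed = staged[-1]              # committed at checkpoint N+1
assert committed == 54210           # identical to processing each record once
```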
Pseudocode: Operator Checkpoint Handling
function onBarrierReceived(barrier, channel):
    blocked_channels.add(channel)
    buffer_input(channel)  // buffer records arriving on this channel
    if blocked_channels == all_input_channels:
        state_snapshot = snapshot(local_state)
        persist_async(state_snapshot, barrier.checkpoint_id)
        for each output_channel:
            emit(barrier)
        blocked_channels.clear()
        release_buffered_input()

function onCheckpointComplete(checkpoint_id):
    // called by coordinator after all operators acknowledge
    // simplified: production systems typically retain the last N checkpoints
    // for safety; here we discard only snapshots strictly older than the
    // previous checkpoint.
    discard_snapshots_older_than(checkpoint_id - 1)

function onRecovery(checkpoint_id):
    local_state = restore(checkpoint_id)
    request_source_rewind(checkpoint_id)
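The alignment logic above can be rendered as runnable Python, with the snapshot and emit helpers reduced to in-memory stand-ins (a single operator, hypothetical names, no coordinator):

```python
# Runnable sketch of barrier alignment: post-barrier records on an aligned
# channel are buffered and excluded from the snapshot.

class AlignedOperator:
    def __init__(self, input_channels):
        self.input_channels = set(input_channels)
        self.blocked = set()
        self.buffers = {c: [] for c in input_channels}
        self.local_state = {"running_sum": 0}
        self.snapshots = {}          # checkpoint_id -> state copy
        self.emitted_barriers = []   # stands in for emit(barrier)

    def on_record(self, channel, value):
        if channel in self.blocked:
            self.buffers[channel].append(value)   # hold back post-barrier records
        else:
            self.local_state["running_sum"] += value

    def on_barrier(self, channel, checkpoint_id):
        self.blocked.add(channel)
        if self.blocked == self.input_channels:
            # Consistent point: every channel has delivered the barrier.
            self.snapshots[checkpoint_id] = dict(self.local_state)
            self.emitted_barriers.append(checkpoint_id)
            self.blocked.clear()
            for c in self.input_channels:         # release buffered records
                for v in self.buffers[c]:
                    self.local_state["running_sum"] += v
                self.buffers[c].clear()

op = AlignedOperator(["ch0", "ch1"])
op.on_record("ch0", 5)
op.on_barrier("ch0", 1)    # ch0 aligned; ch1 still pre-barrier
op.on_record("ch0", 7)     # post-barrier record on ch0 is buffered
op.on_record("ch1", 3)     # pre-barrier record on ch1 still applies
op.on_barrier("ch1", 1)    # alignment complete: snapshot taken
assert op.snapshots[1] == {"running_sum": 8}   # 5 + 3, excludes buffered 7
assert op.local_state["running_sum"] == 15     # buffered 7 applied afterwards
```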
Costs and Limitations
EOS is not free.
The costs show up in several dimensions.
Latency. Checkpoint-based systems add latency proportional to the checkpoint interval.
Two-phase commit sinks hold writes until the checkpoint completes, so end-to-end latency includes at least one checkpoint cycle.
Kafka's transactional approach adds latency from transaction coordination (typically tens of milliseconds).
Throughput. Barriers cause alignment delays when input channels have skewed rates.
Flink introduced unaligned checkpoints (FLIP-76, available from Flink 1.11) to mitigate this by allowing barriers to overtake in-flight records, at the cost of larger checkpoint sizes.
Determinism requirements. Checkpoint-based replay assumes deterministic processing.
If an operator's output depends on wall-clock time, random number generators, or external service calls, replay may produce different results.
In practice, systems handle this by logging non-deterministic inputs or by requiring operators to use processing-time semantics with explicit allowances for non-determinism.
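The logging approach can be sketched as a record/replay wrapper around a source of randomness (invented names; the same idea applies to wall-clock reads or external service responses): the first run logs each draw, and replay reads from the log instead of drawing again.

```python
import random

# Record/replay for a non-deterministic input: draws are logged on the first
# run and reused on replay, making the replayed run deterministic.

class ReplayableRandom:
    def __init__(self, log=None):
        self.log = log          # None on first run; recorded values on replay
        self.recorded = []
        self.pos = 0

    def draw(self):
        if self.log is not None:
            value = self.log[self.pos]   # replay: reuse the logged value
            self.pos += 1
        else:
            value = random.random()      # first run: draw and record
        self.recorded.append(value)
        return value

first = ReplayableRandom()
outputs = [first.draw() for _ in range(3)]

replay = ReplayableRandom(log=first.recorded)
assert [replay.draw() for _ in range(3)] == outputs   # identical on replay
```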
End-to-end scope. EOS within the stream processor does not automatically extend to external systems.
The guarantee holds end-to-end only if the source supports replay (e.g., Kafka's offset-based consumption) and the sink supports idempotent writes or transactional commits.
A stream processor writing to a non-transactional HTTP endpoint cannot provide EOS for that output, regardless of internal guarantees.
The "Effectively Once" Perspective
Jay Kreps, in discussing Kafka's EOS implementation, prefers the framing "effectively once" to emphasize that messages may be physically delivered more than once, but the system's mechanisms ensure that the effect is as if each were delivered once.
This is a useful mental model.
The system does not prevent duplicates at the network layer.
It prevents duplicate effects at the application layer through a combination of deduplication, atomic state management, and transactional output.
Key Points
- Exactly-once semantics guarantee that observable state reflects each record being processed once, even though retries and redeliveries may occur internally.
- The guarantee requires atomically coordinating three concerns: input position tracking, state updates, and output production.
- Distributed snapshot algorithms (Chandy-Lamport and its variants) provide a general mechanism for consistent checkpointing in dataflow systems.
- Two-phase commit sinks extend the guarantee to external systems by staging (preparing) writes and committing them only after a checkpoint succeeds.
- EOS introduces costs in latency (checkpoint intervals, transaction coordination), throughput (barrier alignment), and engineering constraints (determinism requirements).
- End-to-end exactly-once requires cooperation from sources (replayable) and sinks (idempotent or transactional), not just the processing engine.
- The distinction between exactly-once delivery (impossible in general) and exactly-once semantics (achievable through careful design) is fundamental.
References
Chandy, K. M. and Lamport, L. "Distributed Snapshots: Determining Global States of Distributed Systems." ACM Transactions on Computer Systems, 3(1), 1985.
Carbone, P., Ewen, S., Haridi, S., Richter, S., and Tzoumas, K. "State Management in Apache Flink." Proceedings of the VLDB Endowment, 10(12), 2017.
Kafka Improvement Proposal KIP-98: Exactly Once Delivery and Transactional Messaging. Apache Software Foundation, 2017.
Akidau, T., Bradshaw, R., Chambers, C., et al. "The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing." Proceedings of the VLDB Endowment, 8(12), 2015.
Apache Flink FLIP-76: Unaligned Checkpoints. Apache Software Foundation, 2020. https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints