Failure Recovery in Flink

As mentioned previously, stream processing applications in Flink are supposed to be long-lived, so there must be an efficient way to recover from failures without repeating a lot of work. For this purpose, Flink periodically checkpoints the operators’ state together with the position in the consumed stream at which that state was produced. In case of a failure, an application can be restarted from the latest checkpoint and continue processing from there.
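To make the idea concrete, here is a minimal single-process sketch in Python. It is not Flink code: `CheckpointStore`, `run`, `checkpoint_every`, and `fail_at` are hypothetical names chosen for illustration, and the "operator" simply keeps a running sum. It shows why a checkpoint stores the state together with the stream position: on restart, processing resumes at the saved offset with the saved state, so only the records after the last checkpoint are repeated.

```python
import copy


class CheckpointStore:
    """Hypothetical durable store that keeps only the latest checkpoint."""

    def __init__(self):
        # (operator state, next stream offset to read)
        self.latest = ({"sum": 0}, 0)

    def save(self, state, offset):
        # Copy the state so later updates do not alter the checkpoint.
        self.latest = (copy.deepcopy(state), offset)


def run(records, store, checkpoint_every=3, fail_at=None):
    """Sum the records, starting from the latest checkpoint in the store."""
    state, offset = store.latest
    state = copy.deepcopy(state)
    for i in range(offset, len(records)):
        if i == fail_at:
            raise RuntimeError(f"simulated failure at offset {i}")
        state["sum"] += records[i]
        if (i + 1) % checkpoint_every == 0:
            store.save(state, i + 1)
    return state


records = [1, 2, 3, 4, 5, 6, 7, 8]
store = CheckpointStore()

try:
    run(records, store, fail_at=5)   # crashes after processing offsets 0..4
except RuntimeError:
    pass

# The restart resumes from the checkpoint taken at offset 3, not from offset 0.
recovered = run(records, store)
print(recovered)
```

In this sketch, the work done on offsets 3 and 4 before the crash is lost and repeated after the restart, which is the cost the checkpoint interval trades against checkpointing overhead.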

All this is achieved via Asynchronous Barrier Snapshotting (ABS), an algorithm similar to the Chandy-Lamport algorithm for distributed snapshots.

Asynchronous Barrier Snapshotting (ABS)

The ABS algorithm operates slightly differently for acyclic and cyclic graphs, so we will examine the first case here, which is a bit simpler.

Working

The algorithm works in the following way:

  • The Job Manager periodically injects control records into the stream, referred to as stage barriers. These records divide the stream into stages. At the end of a stage, the set of operator states reflects the whole execution history up to the associated barrier, so it can be used for a snapshot.

  • When a source task receives a barrier, it takes a snapshot of its current state and then broadcasts the barrier to all its outputs.

  • When a non-source task receives a barrier from one of its inputs, it blocks that input until it has received a barrier from all the inputs. It then takes a snapshot of its current state and broadcasts the barrier to its outputs. Finally, it unblocks its inputs. This blocking guarantees that the checkpoint contains the state after processing all the elements before the barrier and no elements after the barrier.

Note: The snapshot taken while the inputs are blocked is only logical; the actual, physical snapshot happens asynchronously in the background. One way to achieve this is through copy-on-write techniques. This shortens the blocking phase so that the task can start processing data again as quickly as possible.

  • Once the background copy process is completed, each task acknowledges the checkpoint back to the Job Manager. The checkpoint is considered complete once the Job Manager has received an acknowledgement from all the tasks, and from then on it can be used for recovery if a failure happens later. At this point, the Job Manager notifies all the tasks that the checkpoint is complete so that they can perform any cleanup or bookkeeping logic required.
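The barrier alignment step for a non-source task can be sketched as a small Python simulation. This is an illustrative model under simplifying assumptions, not Flink's implementation: `SummingTask`, `BARRIER`, and the method names are hypothetical, the task's state is a running sum, and the asynchronous physical snapshot and the acknowledgement to the Job Manager are left out.

```python
from collections import deque

BARRIER = "BARRIER"  # hypothetical marker standing in for a stage barrier


class SummingTask:
    """Hypothetical non-source task with several inputs and a running sum."""

    def __init__(self, num_inputs):
        self.queues = [deque() for _ in range(num_inputs)]
        self.blocked = [False] * num_inputs
        self.state = 0
        self.snapshots = []  # one logical snapshot per completed alignment
        self.output = []     # records and barriers emitted downstream

    def receive(self, channel, item):
        self.queues[channel].append(item)
        self._drain()

    def _drain(self):
        progress = True
        while progress:
            progress = False
            for ch, queue in enumerate(self.queues):
                # A blocked input keeps buffering but is not processed.
                if self.blocked[ch] or not queue:
                    continue
                item = queue.popleft()
                progress = True
                if item == BARRIER:
                    self.blocked[ch] = True
                    if all(self.blocked):
                        self._snapshot_and_forward()
                else:
                    self.state += item
                    self.output.append(self.state)

    def _snapshot_and_forward(self):
        self.snapshots.append(self.state)          # snapshot the current state
        self.output.append(BARRIER)                # broadcast the barrier
        self.blocked = [False] * len(self.queues)  # unblock all inputs


task = SummingTask(num_inputs=2)
task.receive(0, 1)         # processed: state becomes 1
task.receive(0, BARRIER)   # input 0 is now blocked
task.receive(0, 10)        # buffered: belongs to the next stage
task.receive(1, 2)         # processed: state becomes 3
task.receive(1, BARRIER)   # barriers aligned: snapshot, forward, unblock
print(task.snapshots, task.state, task.output)
```

The record `10` arrives on input 0 after that input's barrier, so it is held back until the barrier from input 1 arrives. The snapshot therefore contains only the effect of the records before the barrier, and the buffered record is processed once the inputs are unblocked.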
