Implement handle_message/3
Learn to process incoming messages through the handle_message callback function.
Callback function handle_message/3
Incoming messages sent by the broker are processed by handle_message/3
. Within this callback, we can look at the given message and its data. We can use it to perform any work.
The handle_message/3
callback is special. All callbacks that we’ve seen before when using GenServer
and GenStage
run within the current process. However, code written in handle_message/3
is executed by a processor. Processors concurrently run the processes, which are started by Broadway
to perform the work in handle_message/3
. They also isolate any potential errors. The pipeline is restarted and quickly brought back to a working state if an exception happens.
There are three arguments given to handle_message/3
:
-
We have the current processor group, which is the
:default
atom for now. -
We have the message itself, in the form of a
%Broadway.Message{}
struct. -
We have the context value, which is optional and available to set in
start_link/2
.
The callback must return a %Broadway.Message{}
struct. We’ll see what this struct contains in just a moment.
Let’s implement handle_message/3
in the following way:
Get hands-on with 1300+ tech skills courses.