Handling message errors using Broadway and AWS SQS
For the past few months, I've been working on an Elixir project for a fintech. This project had one particularity: its main source of data would come from SQS queues; almost no data would be created by the application itself. Since SQS had to be used, Broadway seemed like the right tool for the job.
In this post, I am going to talk a little about how I set up a data ingestion pipeline using Broadway to consume data from an SQS queue while dealing with corrupt messages and other constraints. I'll use simpler schemas and data to illustrate, but the approach should carry over to more complex cases, such as the one I went through.
The project needed to turn the data that came from the SQS queue into its Ecto schemas and insert those into the database, which meant doing some data transformation on the message contents.
Base concerns
I had two main concerns while thinking about how to properly set up my data ingestion pipeline:
- Dealing with corrupt messages
- Dealing with messages' relationships
Dealing with the possibility of having corrupt messages in the pipeline meant I had to do two things:
- Parse the message's JSON data into an Elixir data structure
- Check whether this data structure conforms to a pre-defined schema or struct. I used Ecto's embedded schemas for this.
A constraint of this project was that it had many relationships between its schemas, and it was never supposed to insert data with missing relationship counterparts: if a message references a user that doesn't yet exist in the database, it shouldn't be inserted/ingested yet; it should go back to the queue. For example:
- Message A is received and its content contains an order that belongs to a user X
- The pipeline checks if user X is already in the application's database
- If user X doesn't yet exist then message A should go back to the queue (and this should be repeated until user X exists or the message gets removed from the queue)
- If user X exists then message A gets consumed/inserted and then gets acknowledged (removed from the queue)
Base application setup
We'll start by creating a Phoenix application and adding Broadway and BroadwaySQS to our dependencies.
mix phx.new post --no-html --no-assets --no-dashboard --no-live --no-mailer
# post/config/config.exs
config :post,
  generators: [binary_id: true],
  ecto_repos: [Post.Repo]

config :post, Post.Repo,
  migration_primary_key: [type: :binary_id],
  migration_foreign_key: [type: :binary_id]
# mix.exs
defp deps do
  [
    {:broadway, "~> 1.0"},
    {:broadway_sqs, "~> 0.7.0"},
    {:hackney, "~> 1.9"}
  ]
end
With that set up, it is now time to get a Postgres database up and running, create our migrations, and then create our schemas.
mix ecto.gen.migration add_users && mix ecto.gen.migration add_orders
For this post, let's assume the messages our application will consume can be of two kinds:
// For the user:
{
  "external_id": "some_string",
  "name": "some_string"
}

// For an order:
{
  "description": "some_string",
  "cart_list": [
    {"price": "some_decimal"},
    {"price": "some_decimal"}
  ],
  "external_id": "some_string",
  "user_id": "some_external_id(string)"
}
Similarly, we'll assume our schemas look like the following:
user {
  external_id: string,
  name: string
}
order {
  description: string,
  price: decimal,
  external_id: string,
  user_id: uuid
}
Users migration:
defmodule Post.Repo.Migrations.AddUsers do
  use Ecto.Migration

  def change do
    create table(:users) do
      add :name, :string, null: false
      add :external_id, :string, null: false

      timestamps()
    end

    # `unique: true` is not an option of `add/3`;
    # uniqueness is enforced with an index
    create unique_index(:users, [:external_id])
  end
end
Orders migration:
defmodule Post.Repo.Migrations.AddOrders do
  use Ecto.Migration

  def change do
    create table(:orders) do
      add :description, :string, null: false
      add :price, :decimal, null: false
      add :external_id, :string, null: false
      add :user_id, references(:users)

      timestamps()
    end

    create unique_index(:orders, [:external_id])
  end
end
With the migrations configured, let's run mix ecto.setup and create the schemas:
defmodule Post.User do
  use Ecto.Schema
  import Ecto.Changeset

  @primary_key {:id, :binary_id, autogenerate: true}
  @fields [:name, :external_id]

  schema "users" do
    field :name, :string
    field :external_id, :string

    # the migration defines non-null timestamp columns,
    # so the schema needs them for inserts to succeed
    timestamps()
  end

  def changeset(params) do
    %__MODULE__{}
    |> cast(params, @fields)
    |> unique_constraint([:external_id])
  end
end
defmodule Post.Order do
  use Ecto.Schema
  import Ecto.Changeset

  alias Post.User

  @primary_key {:id, :binary_id, autogenerate: true}
  @foreign_key_type :binary_id
  @fields [:description, :price, :external_id, :user_id]

  schema "orders" do
    field :description, :string
    field :price, :decimal
    field :external_id, :string

    belongs_to :user, User

    timestamps()
  end

  def changeset(params) do
    %__MODULE__{}
    |> cast(params, @fields)
    |> unique_constraint([:external_id])
  end
end
With the schemas set up, it is time to move on.
Dealing with corrupt messages
The first step to get Broadway and SQS up and running is to install the dependencies and configure the necessary keys. I won't go into much detail on Broadway's installation since it is well covered by its documentation. Let's create our "Pipeline" (which is what I'll call our SQS message-consuming functions/steps):
defmodule Post.SQS.Broadway do
  use Broadway

  def start_link(_opts) do
    producer_module = Application.fetch_env!(:post, :sqs_producer)

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: producer_module
      ],
      processors: [
        default: [concurrency: 50]
      ],
      batchers: [
        default: [concurrency: 5, batch_size: 10, batch_timeout: 1000]
      ]
    )
  end

  def handle_message(_processor_name, message, _context) do
    message
  end

  def handle_batch(_batcher, messages, _batch_info, _config) do
    messages
  end

  def handle_failed(messages, _context) do
    messages
  end
end
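Note the producer module is read from config rather than hard-coded, which makes it easy to swap in Broadway.DummyProducer for tests. A minimal sketch of the missing pieces (the queue URL below is a placeholder, and the supervision snippet is just an excerpt):
# post/config/dev.exs (sketch; the queue URL is a placeholder)
config :post, :sqs_producer,
  {BroadwaySQS.Producer,
   queue_url: "https://sqs.us-east-1.amazonaws.com/000000000000/my-queue"}

# post/lib/post/application.ex (excerpt)
children = [
  Post.Repo,
  Post.SQS.Broadway
]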
This pipeline is a no-op as of now: it simply passes messages through the callbacks. Let's now tackle our first problem: validating data integrity.
We'll start by defining a module with utility functions for handling message errors and decoding the message's JSON data (I decided to use Jason, which was already included in the project):
defmodule Post.SQS.Handlers do
  alias Broadway.Message

  def handle_error(message, :invalid_json),
    do: Message.failed(message, Jason.encode!(%{type: :invalid_json}))

  def wrapped_decode(json) do
    case Jason.decode(json) do
      {:ok, data} -> {:ok, data}
      {:error, _reason} -> {:error, :invalid_json}
    end
  end

  def verify(%{data: data} = message) do
    case wrapped_decode(data) do
      {:ok, map} -> Message.update_data(message, fn _ -> map end)
      {:error, reason} -> handle_error(message, reason)
    end
  end
end
The code above does two things:
- Wraps Jason.decode in a function that discards the decoding failure reason, so we get consistent errors
- Tries to decode a message's data. If it succeeds, the message's data gets updated; if not, the message gets flagged as failed with the failure type invalid_json
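As a quick sanity check in iex (note that Jason.decode returns string-keyed maps, which will matter later):
iex> Post.SQS.Handlers.wrapped_decode(~s({"name": "Ada"}))
{:ok, %{"name" => "Ada"}}
iex> Post.SQS.Handlers.wrapped_decode("not json")
{:error, :invalid_json}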
The handle_error function is pretty useful: with it we're able to attach error reasons to our messages and then decide how to handle those errors in the handle_failed callback.
Now let's hook it into the pipeline and change the handle_failed callback:
# Post.SQS.Broadway
def handle_message(_processor_name, message, _context) do
  Handlers.verify(message)
end

def handle_failed(messages, _context) do
  statuses = Enum.map(messages, fn %{status: status} -> status end)
  IO.inspect(%{failed: "failed", statuses: statuses})

  # BroadwaySQS takes the :on_failure ack option (:ack or :noop)
  Enum.map(messages, fn m -> Broadway.Message.configure_ack(m, on_failure: :ack) end)
end
Since messages that can't be decoded can't be used, we simply log them and acknowledge them, so we don't end up processing them more than once.
The fact that Jason can decode a message isn't yet a guarantee that the data is shaped the way the application expects: there may be type mismatches between the application schema and the JSON data, for example. To handle this, we'll define some Ecto embedded schemas.
defmodule Post.SQS.Fields do
  @moduledoc """
  A module containing utility functions for transforming
  changesets into result tuples
  """

  defmacro __using__(_) do
    quote do
      use Ecto.Schema
      import Ecto.Changeset
      import Post.SQS.Fields
    end
  end

  def parse(%Ecto.Changeset{valid?: true, changes: changes}, relationship_keys) do
    mounted_relationships_data =
      Enum.reduce(relationship_keys, changes, fn rlk, acc ->
        map_relationships_from_struct(rlk, acc)
      end)

    {:ok, mounted_relationships_data}
  end

  def parse(%Ecto.Changeset{valid?: false} = changeset, _relationship_keys),
    do: {:error, to_errors(changeset)}

  def parse(%Ecto.Changeset{valid?: true, changes: changes}),
    do: {:ok, changes}

  def parse(%Ecto.Changeset{valid?: false} = changeset),
    do: {:error, to_errors(changeset)}

  defp to_errors(changeset) do
    Ecto.Changeset.traverse_errors(changeset, fn {message, opts} ->
      Regex.replace(~r"%{(\w+)}", message, fn _, key ->
        opts |> Keyword.get(String.to_existing_atom(key), key) |> to_string()
      end)
    end)
  end

  defp extract(data) when is_list(data), do: Enum.map(data, &Map.get(&1, :changes))
  defp extract(data) when is_map(data), do: Map.get(data, :changes)

  def map_relationships_from_struct(key, item) do
    relationship_items =
      item
      |> Map.get(key)
      |> extract()

    Map.put(item, key, relationship_items)
  end
end
defmodule Post.SQS.User do
  # Ecto.Changeset is already imported by `use Post.SQS.Fields`
  use Post.SQS.Fields

  @fields [:external_id, :name]

  embedded_schema do
    field :external_id, :string
    field :name, :string
  end

  def changeset(params) do
    %__MODULE__{}
    |> cast(params, @fields)
    |> validate_required(@fields)
  end

  def to_result(user_changeset), do: parse(user_changeset)
end
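Here's what to_result yields in iex. Note the input map has string keys (straight from Jason) while the result has atom keys, since it comes from the changeset's changes:
iex> %{"external_id" => "x1", "name" => "Ada"}
...> |> Post.SQS.User.changeset()
...> |> Post.SQS.User.to_result()
{:ok, %{external_id: "x1", name: "Ada"}}

iex> %{} |> Post.SQS.User.changeset() |> Post.SQS.User.to_result()
{:error, %{external_id: ["can't be blank"], name: ["can't be blank"]}}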
defmodule Post.SQS.OrderItem do
  use Post.SQS.Fields

  @fields [:price]

  embedded_schema do
    field :price, :decimal
  end

  # cast_embed/3 calls this with the embedded struct and the params
  def changeset(_order_item, params) do
    %__MODULE__{}
    |> cast(params, @fields)
    |> validate_required(@fields)
  end
end
defmodule Post.SQS.Order do
  use Post.SQS.Fields

  alias Post.SQS.OrderItem

  @fields [:description, :external_id, :user_id]

  embedded_schema do
    field :external_id, :string
    field :description, :string
    field :user_id, :string

    embeds_many :cart_list, OrderItem
  end

  def changeset(params) do
    %__MODULE__{}
    |> cast(params, @fields)
    |> validate_required(@fields)
    |> cast_embed(:cart_list, required: true)
  end

  def to_result(order_changeset), do: parse(order_changeset, [:cart_list])
end
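For orders, parse/2 also flattens the embedded cart_list changesets into plain maps:
iex> %{
...>   "description" => "Books",
...>   "external_id" => "o1",
...>   "user_id" => "x123",
...>   "cart_list" => [%{"price" => "10.5"}]
...> }
...> |> Post.SQS.Order.changeset()
...> |> Post.SQS.Order.to_result()
{:ok,
 %{
   cart_list: [%{price: #Decimal<10.5>}],
   description: "Books",
   external_id: "o1",
   user_id: "x123"
 }}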
In order to use the changesets for data checking, we'll modify our Handlers.verify function:
# Post.SQS.Handlers
def handle_error(message, :unrecognized_data_structure),
  do: Message.failed(message, Jason.encode!(%{type: :unrecognized_data_structure}))

def handle_error(message, :changeset_error),
  do: Message.failed(message, Jason.encode!(%{type: :changeset_error}))

# Note the string keys: these maps come straight from Jason.decode
def verify_changeset(%{"name" => _} = data) do
  changeset = Post.SQS.User.changeset(data)

  case Post.SQS.User.to_result(changeset) do
    {:ok, data} -> {:ok, data}
    {:error, _reason} -> {:error, :changeset_error}
  end
end

def verify_changeset(%{"description" => _} = data) do
  changeset = Post.SQS.Order.changeset(data)

  case Post.SQS.Order.to_result(changeset) do
    {:ok, data} -> {:ok, data}
    {:error, _reason} -> {:error, :changeset_error}
  end
end

def verify_changeset(_data), do: {:error, :unrecognized_data_structure}

def verify(%{data: data} = message) do
  with {:ok, map} <- wrapped_decode(data),
       {:ok, verified_data} <- verify_changeset(map) do
    Message.update_data(message, fn _ -> verified_data end)
  else
    {:error, reason} -> handle_error(message, reason)
  end
end
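With that in place, verify_changeset accepts the string-keyed decoded maps and returns atom-keyed, validated data:
iex> Post.SQS.Handlers.verify_changeset(%{"name" => "Ada", "external_id" => "x1"})
{:ok, %{external_id: "x1", name: "Ada"}}

iex> Post.SQS.Handlers.verify_changeset(%{"foo" => "bar"})
{:error, :unrecognized_data_structure}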
Our application is now ready to check whether messages are valid and conform to our specs. If we then feed SQS a corrupt message while our application is running, we'll see two things happen:
- Our application logging the failed message
- The failed message being acknowledged (and therefore not being consumed again)
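You can also exercise this path without a live queue: if the test environment sets :sqs_producer to Broadway.DummyProducer, Broadway's test helpers can push messages through the pipeline. A rough sketch, assuming that setup:
# test/post/sqs/broadway_test.exs (sketch; assumes :sqs_producer is
# set to Broadway.DummyProducer in config/test.exs)
defmodule Post.SQS.BroadwayTest do
  use ExUnit.Case

  test "corrupt messages are flagged as failed" do
    # pushes a test message through the running pipeline
    ref = Broadway.test_message(Post.SQS.Broadway, "not json")

    assert_receive {:ack, ^ref, [] = _successful, [failed]}
    assert {:failed, reason} = failed.status
    assert %{"type" => "invalid_json"} = Jason.decode!(reason)
  end
end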
Messages and relationships
As previously stated, every Order in our application depends on the existence of a user with the order's user_id
. Let's say we receive a message that contains an order with the user_id of x123
, but there is no user with an external_id
for this value in our database. How can we handle this situation?
There are two main ways we can tackle this issue:
- Inserting the order and later creating its relationships
- Sending the order's message back to the queue, hoping a message for the user with external_id: "x123" gets into our queue before the order message "expires"
To be quite honest, I'd always pick the first option, but I couldn't: for several reasons, the application I was working on wasn't allowed to hold "incomplete" data. Since I ended up going with the second option, I'll talk about how I did it.
To start, we'll create a new module, which will be responsible for checking our order messages' relationships:
defmodule Post.SQS.Checker do
  import Ecto.Query

  alias Post.{Repo, User, SQS.Handlers}

  def get_user_by_external_id(%{user_id: user_external_id} = data) do
    query = from(u in User, where: u.external_id == ^user_external_id)

    case Repo.one(query) do
      nil -> {:error, :missing_user}
      _user -> {:ok, data}
    end
  end

  # By now the data has atom keys, since it came out of a changeset
  def check_dependencies(%{user_id: _, description: _} = data),
    do: get_user_by_external_id(data)

  def check_dependencies(%{name: _, external_id: _} = data),
    do: {:ok, data}

  # There is no point in checking messages
  # that have failed the previous steps
  def check(%{status: {:failed, _reason}} = message), do: message

  def check(%{data: data, status: :ok} = message) do
    case check_dependencies(data) do
      {:ok, _data} -> message
      {:error, error} -> Handlers.handle_error(message, error)
    end
  end
end
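As an aside, since this is a single-field lookup, the query above could also be written with Ecto's Repo.get_by/2, which reads a bit more directly:
case Repo.get_by(User, external_id: user_external_id) do
  nil -> {:error, :missing_user}
  _user -> {:ok, data}
end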
We'll also need to make sure our Handlers.handle_error deals with the missing_user error type:
# Post.SQS.Handlers
def handle_error(message, :missing_user),
  do: Message.failed(message, Jason.encode!(%{type: :missing_user}))
It is then time to hook our check function into our pipeline using the handle_message callback:
def handle_message(_processor_name, message, _context) do
  message
  |> Handlers.verify()
  |> Checker.check()
end
Since we implemented all the error handling through the Handlers.handle_error function, we're now able to decide how to deal with failed messages in handle_failed by checking each message's error reason (a JSON string containing the error type):
# Post.SQS.Broadway
# A failed message's status is {:failed, reason}, where reason is
# the JSON string we attached with Message.failed/2
def extract_error_type(%{status: {:failed, reason}} = message),
  do: %{message: message, error: Jason.decode!(reason)}

def configure_ack_based_on_error(%{error: %{"type" => "changeset_error"}, message: m}),
  do: Broadway.Message.configure_ack(m, on_failure: :ack)

def configure_ack_based_on_error(%{error: %{"type" => "invalid_json"}, message: m}),
  do: Broadway.Message.configure_ack(m, on_failure: :ack)

def configure_ack_based_on_error(%{error: %{"type" => "unrecognized_data_structure"}, message: m}),
  do: Broadway.Message.configure_ack(m, on_failure: :ack)

# When a user is missing we do not acknowledge the message,
# so it goes back to the queue :)
def configure_ack_based_on_error(%{error: %{"type" => "missing_user"}, message: m}),
  do: Broadway.Message.configure_ack(m, on_failure: :noop)

def handle_failed(messages, _context) do
  messages
  |> Enum.map(&extract_error_type/1)
  |> Enum.map(&configure_ack_based_on_error/1)
end
There are a few situations that will flag a message as corrupt:
- A message that can't be decoded (invalid_json)
- A message that doesn't conform to our embedded schemas (changeset_error)
- A message that doesn't pattern match our functions (unrecognized_data_structure)
Since corrupt messages can't be consumed, we simply acknowledge them. It would also be wise to log these messages, which can be done with some of AWS's services or something like Sentry or AppSignal.
When messages that depend on data that doesn't yet exist (missing_user, in this case) get through our pipeline, we simply send them back to the queue.
We would then be able to insert the data into our database, but I won't cover that since it is pretty straightforward.