Writing a GenStage producer for AWS SQS
February 1, 2017
When I first saw GenStage I thought it would be a great fit for processing messages from an AWS SQS queue. This is something I've done a bit of in other languages, but Elixir, and GenStage in particular, gives a nice abstraction for this type of problem. For the purpose of this post I'm going to ignore the actual processing of the messages from the SQS queue (the consumer GenStage) and only focus on writing a GenStage producer that gets data from an SQS queue.
Before we start we will want to add some dependencies to our project:
defp deps do
  [
    {:ex_aws, "~> 1.1.0"},
    {:poison, ">= 1.2.0"},
    {:hackney, "~> 1.6"},
    {:sweet_xml, "~> 0.6"},
    {:gen_stage, "~> 0.11.0"}
  ]
end

- ex_aws: we'll use this to interact with AWS, and specifically the SQS queue
- hackney: used by ExAws to make HTTP requests to AWS
- poison: needed by ExAws for JSON decoding/encoding
- sweet_xml: SQS API responses are in XML 😦, but with this installed ExAws will handle parsing responses into Maps
- gen_stage: the thing we are here for 😃
You will also need to configure ex_aws. Look at the Getting Started section of the ex_aws documentation for how to do that. SQS requests default to the us-east-1 region, but this and other settings can be changed in the config.
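Here is a rough sketch of a config/config.exs for ex_aws. It reads credentials from the standard AWS environment variables and falls back to an EC2 instance role. The keys follow the ex_aws README as I understand it, so double-check them against the docs for the version you install. Region settings are left out here on purpose.

```elixir
# config/config.exs (sketch; verify keys against your ex_aws version)
use Mix.Config

# Try the environment variables first, then the EC2 instance role.
config :ex_aws,
  access_key_id: [{:system, "AWS_ACCESS_KEY_ID"}, :instance_role],
  secret_access_key: [{:system, "AWS_SECRET_ACCESS_KEY"}, :instance_role]
```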
So now let's get started writing our GenStage producer.
defmodule SQSProducer do
  use GenStage

  def start_link(sqs_queue_name, opts \\ []) do
    GenStage.start_link(__MODULE__, sqs_queue_name, opts)
  end

  # The callbacks from the rest of this post go here.
end
This is a pretty bare-bones start_link definition. We pass the module and the name of the SQS queue we will be getting messages from. We also accept options for the GenStage process (such as :name), which are passed straight through to GenStage.start_link/3.
Next we need our init function, which sets up the state of our process as a map containing the SQS queue name.
def init(sqs_queue_name) do
  {:producer, %{queue: sqs_queue_name}}
end
Now we can start on our GenStage producer implementation. The simplest and most naive approach would be to get messages directly from SQS every time we receive demand.
def handle_demand(incoming_demand, state) do
  aws_resp = ExAws.SQS.receive_message(
    state.queue,
    # SQS returns at most 10 messages per request.
    max_number_of_messages: min(incoming_demand, 10)
  )
  |> ExAws.request

  messages = case aws_resp do
    {:ok, resp} -> resp.body.messages
    {:error, _reason} -> []
  end

  {:noreply, messages, state}
end
For production use we would not just ignore errors like we are doing here, and we would also likely move fetching messages into its own function. But what else is wrong here?
- What happens if our SQS queue is empty?
- What happens if the SQS call returns an error?
With the default GenStage consumer, events are asked for on startup and then again after handling received events. If we return no events from our handle_demand/2 function, the consumer never asks for more. That means handle_demand/2 is never called again, and we never call SQS again to satisfy our demand. This only works if our SQS queue always has more messages than we ask for and AWS is always available.
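To make the stall concrete, here is a deliberately simplified, hypothetical simulation of the naive producer. Each "round" is one handle_demand/2 call that makes exactly one SQS call. The consumer only sends new demand after it receives events. The batch sizes are made-up SQS responses.

```elixir
defmodule NaiveDemandSim do
  # `batches` lists how many messages SQS (hypothetically) returns per call.
  # Returns how many SQS calls happen before the pipeline goes quiet.
  def sqs_calls(batches), do: count_calls(batches, 0)

  # Ran out of simulated responses.
  defp count_calls([], calls), do: calls

  # Empty response: the consumer receives nothing and never asks again.
  defp count_calls([0 | _rest], calls), do: calls + 1

  # Messages arrived: the consumer processes them and sends more demand.
  defp count_calls([_n | rest], calls), do: count_calls(rest, calls + 1)
end
```

NaiveDemandSim.sqs_calls([10, 10, 0, 10]) returns 3. The fourth batch is never requested, even though SQS would have had messages by then.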
So let's try to improve this by separating getting messages from SQS from the handle_demand/2 function. But first we need to add our outstanding demand to the process state.
def init(sqs_queue_name) do
  {:producer, %{queue: sqs_queue_name, demand: 0}}
end
Now we need to update the state when we receive demand.
def handle_demand(incoming_demand, state) do
  new_demand = state.demand + incoming_demand

  {:noreply, [], %{state| demand: new_demand}}
end
But we still need to get messages from SQS, so let's send ourselves a message to do that.
def handle_demand(incoming_demand, state) do
  new_demand = state.demand + incoming_demand

  Process.send(self(), :get_messages, [])

  {:noreply, [], %{state| demand: new_demand}}
end
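Sending ourselves :get_messages means the callback returns right away, and the fetch runs later as a separate handle_info/2 call. The stdlib-only sketch below shows that ordering with a plain GenServer. There is no GenStage or SQS here; DeferredFetchDemo and its messages are hypothetical stand-ins.

```elixir
defmodule DeferredFetchDemo do
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, [])

  # The state is a log of what happened, newest first.
  def init(log), do: {:ok, log}

  # Stand-in for handle_demand/2: record the demand and schedule the work.
  def handle_call({:demand, n}, _from, log) do
    Process.send(self(), :get_messages, [])
    {:reply, :ok, [{:demand, n} | log]}
  end

  # Returns the log oldest first.
  def handle_call(:log, _from, log), do: {:reply, Enum.reverse(log), log}

  # Stand-in for the SQS fetch, run after the demand callback has returned.
  def handle_info(:get_messages, log), do: {:noreply, [:fetched | log]}
end
```

Calling GenServer.call(pid, {:demand, 5}) and then GenServer.call(pid, :log) returns [{:demand, 5}, :fetched]. The :get_messages message is already in the mailbox ahead of the second call.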
Then we need to handle the message.
def handle_info(:get_messages, state) do
  aws_resp = ExAws.SQS.receive_message(
    state.queue,
    max_number_of_messages: min(state.demand, 10)
  )
  |> ExAws.request

  messages = case aws_resp do
    {:ok, resp} -> resp.body.messages
    {:error, _reason} -> []
  end

  new_demand = state.demand - Enum.count(messages)

  {:noreply, messages, %{state| demand: new_demand}}
end
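The min(state.demand, 10) is there because SQS's ReceiveMessage accepts a MaxNumberOfMessages of at most 10. The demand bookkeeping can be pulled out into two small pure functions. DemandMath is a hypothetical module name used only for illustration.

```elixir
defmodule DemandMath do
  # SQS ReceiveMessage accepts MaxNumberOfMessages from 1 to 10.
  @sqs_max_batch 10

  # How many messages to request, given positive outstanding demand.
  def batch_size(demand) when demand > 0, do: min(demand, @sqs_max_batch)

  # Outstanding demand after emitting `received` messages downstream.
  def remaining_demand(demand, received), do: demand - received
end
```

For example, with 25 outstanding, batch_size(25) is 10. If SQS returns 7 messages, remaining_demand(25, 7) leaves 18, so the next request again asks for 10.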
This is better, but it still doesn't fix our issues when SQS has no messages or returns an error. Let's add some code to keep getting messages until our demand is 0.
def handle_info(:get_messages, state) do
  aws_resp = ExAws.SQS.receive_message(
    state.queue,
    max_number_of_messages: min(state.demand, 10)
  )
  |> ExAws.request

  messages = case aws_resp do
    {:ok, resp} -> resp.body.messages
    {:error, _reason} -> []
  end

  new_demand = state.demand - Enum.count(messages)

  cond do
    new_demand == 0 -> :ok
    true -> Process.send(self(), :get_messages, [])
  end

  {:noreply, messages, %{state| demand: new_demand}}
end
That's good, but if SQS returns 0 messages we will keep making requests back to back until we have received enough messages to bring our demand down to zero. So let's add some additional logic for that case.
def handle_info(:get_messages, state) do
  aws_resp = ExAws.SQS.receive_message(
    state.queue,
    max_number_of_messages: min(state.demand, 10)
  )
  |> ExAws.request

  messages = case aws_resp do
    {:ok, resp} -> resp.body.messages
    {:error, _reason} -> []
  end

  num_messages_received = Enum.count(messages)
  new_demand = state.demand - num_messages_received

  cond do
    new_demand == 0 -> :ok
    num_messages_received == 0 ->
      Process.send_after(self(), :get_messages, 200)
    true ->
      Process.send(self(), :get_messages, [])
  end

  {:noreply, messages, %{state| demand: new_demand}}
end
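The cond above is the heart of the polling loop, and it can be tested on its own once it is a pure function. This is a hypothetical refactor (FetchPlanner is not part of the original code). It returns a decision and keeps the side effect in a separate function.

```elixir
defmodule FetchPlanner do
  @empty_poll_delay_ms 200

  # Decide what to do after a poll: :stop, :fetch_now, or {:fetch_after, ms}.
  def next_action(new_demand, _received) when new_demand <= 0, do: :stop
  def next_action(_new_demand, 0), do: {:fetch_after, @empty_poll_delay_ms}
  def next_action(_new_demand, _received), do: :fetch_now

  # Turn the decision into the same self-messages the producer uses.
  def schedule(:stop), do: :ok
  def schedule(:fetch_now), do: Process.send(self(), :get_messages, [])
  def schedule({:fetch_after, ms}), do: Process.send_after(self(), :get_messages, ms)
end
```

Inside handle_info/2 this would become FetchPlanner.schedule(FetchPlanner.next_action(new_demand, num_messages_received)).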
We could improve this more by adding some back-off if we repeatedly get 0 messages back from SQS, but this will do for now.
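One possible back-off is sketched below: double the delay after each consecutive empty poll, up to a cap. The 200 ms base and 5 s cap are illustrative numbers, not tuned values. The producer would also need to track a hypothetical consecutive-empty-polls counter in its state, resetting it to 0 whenever messages arrive.

```elixir
defmodule EmptyPollBackoff do
  import Bitwise

  @base_ms 200
  @max_ms 5_000

  # `empty_polls` is the number of empty SQS responses in a row (>= 1).
  def delay_ms(empty_polls) when empty_polls >= 1 do
    # Cap the exponent so the shift stays small after many empty polls.
    exponent = min(empty_polls - 1, 16)
    min(@base_ms * (1 <<< exponent), @max_ms)
  end
end
```

This gives 200, 400, 800, 1600, 3200 ms, then stays at 5000 ms until a poll returns messages.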
Now we have a new problem: when we have multiple consumers, each batch of demand sends ourselves another :get_messages message, which again floods SQS. Let's fix that by only sending the message when our outstanding demand is 0, since otherwise a :get_messages loop is already running.
def handle_demand(incoming_demand, %{demand: 0} = state) do
  new_demand = state.demand + incoming_demand

  Process.send(self(), :get_messages, [])

  {:noreply, [], %{state| demand: new_demand}}
end

def handle_demand(incoming_demand, state) do
  new_demand = state.demand + incoming_demand

  {:noreply, [], %{state| demand: new_demand}}
end
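The two clauses can be modeled as a fold over incoming demand to check that only one fetch loop starts. FetchLoopGuard is a hypothetical helper, and the model ignores fetches completing in between demands.

```elixir
defmodule FetchLoopGuard do
  # Returns {outstanding_demand, fetch_loops_started}. A new loop starts
  # only when demand arrives while outstanding demand is 0.
  def apply_demands(demands) do
    Enum.reduce(demands, {0, 0}, fn incoming, {outstanding, loops} ->
      loops = if outstanding == 0, do: loops + 1, else: loops
      {outstanding + incoming, loops}
    end)
  end
end
```

Three consumers each asking for 10 before any fetch completes gives apply_demands([10, 10, 10]) == {30, 1}. Without the guard clause, the same sequence would have started three loops.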
And that's pretty much it for a basic implementation. There are several things we would want to improve for production:
- Refactor processing into smaller testable functions
- Handle errors from AWS
- Implement locking of messages across producers/nodes/instances
Putting it all together, the full producer looks like this:
defmodule SQSProducer do
  use GenStage

  def start_link(queue_name, opts \\ []) do
    GenStage.start_link(__MODULE__, queue_name, opts)
  end

  def init(queue_name) do
    state = %{
      demand: 0,
      queue: queue_name
    }

    {:producer, state}
  end

  def handle_demand(incoming_demand, %{demand: 0} = state) do
    new_demand = state.demand + incoming_demand
    Process.send(self(), :get_messages, [])
    {:noreply, [], %{state| demand: new_demand}}
  end

  def handle_demand(incoming_demand, state) do
    new_demand = state.demand + incoming_demand
    {:noreply, [], %{state| demand: new_demand}}
  end

  def handle_info(:get_messages, state) do
    aws_resp = ExAws.SQS.receive_message(
      state.queue,
      max_number_of_messages: min(state.demand, 10)
    )
    |> ExAws.request

    messages = case aws_resp do
      {:ok, resp} ->
        resp.body.messages

      {:error, _reason} ->
        # You probably want to handle errors differently than this.
        []
    end

    num_messages_received = Enum.count(messages)
    new_demand = max(state.demand - num_messages_received, 0)

    cond do
      new_demand == 0 -> :ok
      num_messages_received == 0 ->
        Process.send_after(self(), :get_messages, 200)
      true ->
        Process.send(self(), :get_messages, [])
    end

    {:noreply, messages, %{state| demand: new_demand}}
  end
end
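As a starting point for the "Handle errors from AWS" item, the response handling can be pulled into a function that logs failures instead of dropping them silently. This is a hypothetical helper. It assumes a successful ExAws response has the shape {:ok, %{body: %{messages: [...]}}}, which is how the producer code reads it, so check that against your ExAws version.

```elixir
defmodule SQSResponse do
  require Logger

  # Successful poll: hand back the parsed messages.
  def messages({:ok, %{body: %{messages: messages}}}), do: messages

  # Failed poll: log it and return no messages, so the producer treats it
  # as an empty poll and its delayed-retry branch runs.
  def messages({:error, reason}) do
    Logger.error("SQS receive_message failed: #{inspect(reason)}")
    []
  end
end
```

In handle_info/2, the whole case expression would then reduce to messages = SQSResponse.messages(aws_resp).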
By request, I created an example repo at https://github.com/TattdCodeMonkey/genstage_sqs_example with this code, as well as an OTP app and a bare-bones consumer I used to test the code in this post.