
Events from the internal buffer are not sent after a consumer reconnects #311

Open
@stefanc18

Description

Hi,

We've encountered unexpected behavior with GenStage's buffering. According to the Buffering docs, events are placed in the internal buffer whenever the producer tries to send an event but there are no consumers, and when consumers become available again they receive the buffered events.

However, this does not seem to happen. When a consumer goes down, events are queued in the buffer as expected, but when the consumer comes back up, the buffered events are never sent. They stay in the buffer indefinitely.

Here is an example that reproduces the issue. We have a producer and a consumer on two different nodes.

producer.ex:

```elixir
defmodule Producer do
  use GenStage

  def start_link(_args) do
    GenStage.start_link(__MODULE__, [])
  end

  @impl GenStage
  def init(_args) do
    # Events not yet dispatched, plus demand received but not yet satisfied.
    {:producer, %{events: [], pending_demand: 0}}
  end

  @impl GenStage
  def handle_cast({:add_event, event}, state) do
    new_events = state.events ++ [event]

    # Emit as many queued events as there is outstanding demand for.
    {events_to_dispatch, remaining_events} = Enum.split(new_events, state.pending_demand)

    events_to_dispatch_count = length(events_to_dispatch)
    new_state = %{state | events: remaining_events, pending_demand: state.pending_demand - events_to_dispatch_count}

    {:noreply, events_to_dispatch, new_state}
  end

  @impl GenStage
  def handle_demand(demand, state) do
    {events_to_dispatch, remaining_events} = Enum.split(state.events, demand)

    # Remember any demand that could not be met with queued events yet.
    events_to_dispatch_count = length(events_to_dispatch)
    new_state = %{state | events: remaining_events, pending_demand: state.pending_demand + demand - events_to_dispatch_count}

    {:noreply, events_to_dispatch, new_state}
  end
end
```

consumer.ex:

```elixir
defmodule Consumer do
  use GenStage

  def start_link(consumer_index) do
    GenStage.start_link(__MODULE__, consumer_index)
  end

  @impl GenStage
  def init(_args) do
    # Ask for one event at a time from the globally registered producer.
    {:consumer, :ok, subscribe_to: [{{:global, :producer}, max_demand: 1, min_demand: 0}]}
  end

  @impl GenStage
  def handle_events(events, _from, state) do
    IO.inspect("Handling events: #{inspect(events)}")

    {:noreply, [], state}
  end
end
```

These files were placed in a Mix project. Steps to reproduce:

  1. Open two terminals.
  2. Run `iex --sname node_1 -S mix run --no-halt` in the first terminal and `iex --sname node_2 -S mix run --no-halt` in the second terminal.
  3. Start the producer in the first terminal: `GenStage.start_link(Producer, [], name: {:global, :producer})`
  4. In the second terminal, connect the nodes and start the consumer: `Node.connect(:"node_1@myuser")` and `GenStage.start_link(Consumer, :ok)`
  5. Send an event from the first terminal to verify that everything works: `GenServer.whereis({:global, :producer}) |> GenServer.cast({:add_event, "event1"})`. The consumer's log should appear in the second terminal.
  6. Stop the second node (Ctrl+C in the second terminal). The consumer is now stopped and only the producer is running.
  7. Send another event from the first terminal while there is no consumer.
  8. Run `GenServer.whereis({:global, :producer}) |> :sys.get_state()` in the first terminal and notice that the event just sent is now in the producer's internal buffer (for example: `buffer: {{["event2"], []}, 1, 10000}`).
  9. Start the consumer again by repeating step 4. Notice that even though the consumer is back up, it does not receive the event sent in step 7. Running `GenServer.whereis({:global, :producer}) |> :sys.get_state()` in the first terminal again shows that the event is still in the internal buffer.

My expectation, at least from my reading of the docs, is that as soon as the consumer reconnects to the producer, it should receive the event from the buffer. It does not. The consumer effectively becomes stale and never demands any more events after this.

I looked into the dispatcher code a bit and noticed that if I change this line of code

from this:

```elixir
{:ok, 0, {demands, current + pending, max, shuffle_demand}}
```

to this:

```elixir
{:ok, 0, {demands, pending, max, shuffle_demand}}
```

the problem goes away. I'm not sure whether this is the correct fix or what other implications it has, but with this change `pending` stays at 0, so when the consumer reconnects and asks for an event again, the event from the buffer is sent to and received by the consumer.
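To make that reasoning concrete, here is a deliberately simplified, hypothetical model of the dispatcher bookkeeping. The module `DemandModel`, its functions, and the `:fold`/`:drop` modes are made up for illustration and are not GenStage's code. The model assumes that a new consumer's ask is first offset against `pending`, and that only the remainder is used to take events from the producer's buffer. Under those assumptions, replaying the reproduction steps leaves `"event2"` stuck in the buffer with the original line (`:fold`, i.e. `current + pending`) and drains it with the proposed change (`:drop`, i.e. `pending`).

```elixir
defmodule DemandModel do
  # Hypothetical, simplified model of a demand dispatcher's bookkeeping.
  # NOT GenStage's implementation; it only mirrors the `pending` arithmetic
  # discussed in this issue. State is {demands, pending}, where `demands`
  # maps a subscription ref to that consumer's unmet demand.

  def new, do: {%{}, 0}

  # A consumer asks for `counter` events. Demand already covered by
  # `pending` is absorbed; only the remainder is returned, i.e. (by
  # assumption) the amount the producer would take from its buffer.
  def ask({demands, pending}, ref, counter) do
    demands = Map.update(demands, ref, counter, &(&1 + counter))
    already_sent = min(pending, counter)
    {counter - already_sent, {demands, pending - already_sent}}
  end

  # A consumer goes away. With :fold its unmet demand is added to
  # `pending` (like `current + pending`); with :drop it is discarded
  # (like the proposed `pending`).
  def cancel({demands, pending}, ref, mode \\ :fold) do
    {current, demands} = Map.pop(demands, ref, 0)
    pending = if mode == :fold, do: current + pending, else: pending
    {0, {demands, pending}}
  end

  # Hand one event to the first consumer with unmet demand, or report
  # that the event would have to go to the buffer.
  def dispatch({demands, pending}) do
    case Enum.find(demands, fn {_ref, d} -> d > 0 end) do
      {ref, d} -> {{:sent, ref}, {Map.put(demands, ref, d - 1), pending}}
      nil -> {:buffered, {demands, pending}}
    end
  end

  # Replays the reproduction steps. Returns the demand produced by the
  # reconnected consumer's ask and what is left in the buffer afterwards.
  def reconnect_scenario(mode \\ :fold) do
    s = new()
    {1, s} = ask(s, :consumer_a, 1)           # subscribe with max_demand: 1
    {{:sent, :consumer_a}, s} = dispatch(s)   # step 5: "event1" delivered
    {1, s} = ask(s, :consumer_a, 1)           # consumer asks for the next event
    {0, s} = cancel(s, :consumer_a, mode)     # step 6: node_2 stops
    {:buffered, s} = dispatch(s)              # step 7: "event2" has no consumer
    buffer = ["event2"]
    {forwarded, _s} = ask(s, :consumer_b, 1)  # step 9: consumer reconnects
    {_taken, buffer} = Enum.split(buffer, forwarded)
    {forwarded, buffer}
  end
end

DemandModel.reconnect_scenario(:fold)
# => {0, ["event2"]}
DemandModel.reconnect_scenario(:drop)
# => {1, []}
```

In this model the reconnected consumer's demand is swallowed by `pending`, so nothing is taken from the buffer and the consumer waits forever. The model says nothing about whether discarding `current` is safe in other situations, which is the open question in the paragraph above.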

I'm opening this issue because it looks like a bug in the dispatcher's implementation, but maybe I'm missing something. Looking forward to your thoughts.
