Skip to content

Commit

Permalink
Optionally use genstage for consuming publish_mutation broadcast events
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanjos committed Jun 12, 2024
1 parent 253b1f3 commit 4400f0e
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 3 deletions.
12 changes: 12 additions & 0 deletions lib/absinthe/subscription/local_consumer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
defmodule Absinthe.Subscription.LocalConsumer do
@moduledoc """
Processes the publish_mutation request on the
local node.
"""

def start_link({pubsub, mutation_result, subscribed_fields}) do
Task.start_link(fn ->
Absinthe.Subscription.Local.publish_mutation(pubsub, mutation_result, subscribed_fields)
end)
end
end
20 changes: 20 additions & 0 deletions lib/absinthe/subscription/local_consumer_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule Absinthe.Subscription.LocalConsumerSupervisor do
@moduledoc """
Supervisor for consuming publish_mutation requests
"""

use ConsumerSupervisor

alias Absinthe.Subscription.LocalProducer
alias Absinthe.Subscription.LocalConsumer

def start_link(args) do
ConsumerSupervisor.start_link(__MODULE__, args)
end

def init([min_demand, max_demand]) do
children = [%{id: LocalConsumer, start: {LocalConsumer, :start_link, []}, restart: :transient}]
opts = [strategy: :one_for_one, subscribe_to: [{LocalProducer, min_demand: min_demand, max_demand: max_demand}]]
ConsumerSupervisor.init(children, opts)
end
end
40 changes: 40 additions & 0 deletions lib/absinthe/subscription/local_producer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
defmodule Absinthe.Subscription.LocalProducer do
@moduledoc """
GenStage producer that listens for `publish_mutation` broadcasts
in order to process the subscriptions on this node.
"""
use GenStage

def start_link(args) do
GenStage.start_link(__MODULE__, args)
end

def topic(shard), do: "__absinthe__:proxy:#{shard}"

def init([pubsub, shards, buffer_size]) do
Enum.each(shards, fn shard ->
:ok = pubsub.subscribe(topic(shard))
end)

# default buffer_size
{:producer, %{pubsub: pubsub, node: pubsub.node_name()}, buffer_size: buffer_size}
end

@doc """
Callback for the consumer to ask for more
subscriptions to process. Since we will be sending
them immediately when we get a message from pubsub,
this just sends an empty list
"""
def handle_demand(_demand, state) do
{:noreply, [], state}
end

def handle_info(payload, state) do
if payload.node == state.node do
{:noreply, [], state}
else
{:noreply, [{state.pubsub, payload.mutation_result, payload.subscribed_fields}], state}
end
end
end
23 changes: 23 additions & 0 deletions lib/absinthe/subscription/stage_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule Absinthe.Subscription.StageSupervisor do
@moduledoc false

use Supervisor

def start_link([pubsub, registry, pool_size]) do
Supervisor.start_link(__MODULE__, {pubsub, registry, pool_size})
end

def init({pubsub, _registry, pool_size}) do
min_demand = 1
max_demand = pool_size
shards = Enum.to_list(0..(pool_size - 1))
buffer_size = 10_000

children = [
{Absinthe.Subscription.LocalProducer, [pubsub, shards, buffer_size]},
{Absinthe.Subscription.LocalConsumerSupervisor, [min_demand, max_demand]}
]

Supervisor.init(children, strategy: :one_for_one)
end
end
13 changes: 10 additions & 3 deletions lib/absinthe/subscription/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,21 @@ defmodule Absinthe.Subscription.Supervisor do

pool_size = Keyword.get(opts, :pool_size, System.schedulers_online() * 2)
compress_registry? = Keyword.get(opts, :compress_registry?, true)
use_stage? = Keyword.get(opts, :use_stage?, true)

Supervisor.start_link(__MODULE__, {pubsub, pool_size, compress_registry?})
Supervisor.start_link(__MODULE__, {pubsub, pool_size, compress_registry?, use_stage?})
end

def init({pubsub, pool_size, compress_registry?}) do
def init({pubsub, pool_size, compress_registry?, use_stage?}) do
registry_name = Absinthe.Subscription.registry_name(pubsub)
meta = [pool_size: pool_size]

supervisor = if use_stage? do
Absinthe.Subscription.StageSupervisor
else
Absinthe.Subscription.ProxySupervisor
end

children = [
{Registry,
[
Expand All @@ -40,7 +47,7 @@ defmodule Absinthe.Subscription.Supervisor do
meta: meta,
compressed: compress_registry?
]},
{Absinthe.Subscription.ProxySupervisor, [pubsub, registry_name, pool_size]}
{supervisor, [pubsub, registry_name, pool_size]}
]

Supervisor.init(children, strategy: :one_for_one)
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ defmodule Absinthe.Mixfile do

defp deps do
[
{:gen_stage, "~> 1.2"},
{:nimble_parsec, "~> 1.2.2 or ~> 1.3"},
{:telemetry, "~> 1.0 or ~> 0.4"},
{:dataloader, "~> 1.0.0 or ~> 2.0", optional: true},
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.30.9", "d691453495c47434c0f2052b08dd91cc32bc4e1a218f86884563448ee2502dd2", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d7aaaf21e95dc5cddabf89063327e96867d00013963eadf2c6ad135506a8bc10"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"},
"makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"},
Expand Down

0 comments on commit 4400f0e

Please sign in to comment.