diff --git a/lib/absinthe/subscription/local.ex b/lib/absinthe/subscription/local.ex index b128bc4c..be606309 100644 --- a/lib/absinthe/subscription/local.ex +++ b/lib/absinthe/subscription/local.ex @@ -3,12 +3,14 @@ defmodule Absinthe.Subscription.Local do This module handles broadcasting documents that are local to this node """ - require Logger + # This module handles running and broadcasting documents that are local to this + # node. + alias Absinthe.Phase + alias Absinthe.Pipeline alias Absinthe.Pipeline.BatchResolver - # This module handles running and broadcasting documents that are local to this - # node. + require Logger @doc """ Publish a mutation to the local node only. @@ -32,11 +34,29 @@ defmodule Absinthe.Subscription.Local do :ok end - alias Absinthe.{Phase, Pipeline} - defp run_docset(pubsub, docs_and_topics, mutation_result) do - for {topic, key_strategy, doc} <- docs_and_topics do - try do + run_docset(pubsub, docs_and_topics, mutation_result, %{}) + end + + defp run_docset(_pubsub, [], _, _memo), do: :ok + + defp run_docset(pubsub, [{topic, _key_strategy, doc} | rest], mutation_result, memo) do + {data, updated_memo} = resolve_doc(doc, mutation_result, memo) + :ok = pubsub.publish_subscription(topic, data) + run_docset(pubsub, rest, mutation_result, updated_memo) + rescue + e -> + BatchResolver.pipeline_error(e, __STACKTRACE__) + end + + defp resolve_doc(doc, mutation_result, memo) do + doc_key = get_doc_key(doc) + + case Map.get(memo, doc_key) do + %{} = memoized_result -> + {memoized_result, memo} + + nil -> pipeline = doc.initial_phases |> Pipeline.replace( @@ -60,21 +80,27 @@ defmodule Absinthe.Subscription.Local do {:ok, %{result: data}, _} = Absinthe.Pipeline.run(doc.source, pipeline) - Logger.debug(""" - Absinthe Subscription Publication - Field Topic: #{inspect(key_strategy)} - Subscription id: #{inspect(topic)} - Data: #{inspect(data)} - """) - - :ok = pubsub.publish_subscription(topic, data) - rescue - e -> - BatchResolver.pipeline_error(e, __STACKTRACE__) - end + updated_memo = Map.put(memo, doc_key, data) + + {data, updated_memo} end end + defp get_doc_key(doc) do + variables = + Enum.flat_map(doc.initial_phases, fn phase -> + case phase do + {Absinthe.Phase.Document.Variables, opts} -> + opts[:variables] + + _ -> + [] + end + end) + + :erlang.term_to_binary({doc.source, variables}) + end + defp get_docs(pubsub, field, mutation_result, topic: topic_fun) when is_function(topic_fun, 1) do do_get_docs(pubsub, field, topic_fun.(mutation_result))