Skip to content

Commit

Permalink
Merge pull request #1 from Frameio/cschiewek/re-apply-changes
Browse files Browse the repository at this point in the history
Update main from private fork
  • Loading branch information
cschiewek authored Feb 29, 2024
2 parents 5a97423 + b9da99b commit 61ead74
Showing 1 changed file with 45 additions and 19 deletions.
64 changes: 45 additions & 19 deletions lib/absinthe/subscription/local.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ defmodule Absinthe.Subscription.Local do
This module handles broadcasting documents that are local to this node
"""

require Logger
# This module handles running and broadcasting documents that are local to this
# node.

alias Absinthe.Phase
alias Absinthe.Pipeline
alias Absinthe.Pipeline.BatchResolver

# This module handles running and broadcasting documents that are local to this
# node.
require Logger

@doc """
Publish a mutation to the local node only.
Expand All @@ -32,11 +34,29 @@ defmodule Absinthe.Subscription.Local do
:ok
end

alias Absinthe.{Phase, Pipeline}

defp run_docset(pubsub, docs_and_topics, mutation_result) do
for {topic, key_strategy, doc} <- docs_and_topics do
try do
run_docset(pubsub, docs_and_topics, mutation_result, %{})
end

defp run_docset(_pubsub, [], _, _memo), do: :ok

defp run_docset(pubsub, [{topic, _key_strategy, doc} | rest], mutation_result, memo) do
{data, updated_memo} = resolve_doc(doc, mutation_result, memo)
:ok = pubsub.publish_subscription(topic, data)
run_docset(pubsub, rest, mutation_result, updated_memo)
rescue
e ->
BatchResolver.pipeline_error(e, __STACKTRACE__)
end

defp resolve_doc(doc, mutation_result, memo) do
doc_key = get_doc_key(doc)

case Map.get(memo, doc_key) do
%{} = memoized_result ->
{memoized_result, memo}

nil ->
pipeline =
doc.initial_phases
|> Pipeline.replace(
Expand All @@ -60,21 +80,27 @@ defmodule Absinthe.Subscription.Local do

{:ok, %{result: data}, _} = Absinthe.Pipeline.run(doc.source, pipeline)

Logger.debug("""
Absinthe Subscription Publication
Field Topic: #{inspect(key_strategy)}
Subscription id: #{inspect(topic)}
Data: #{inspect(data)}
""")

:ok = pubsub.publish_subscription(topic, data)
rescue
e ->
BatchResolver.pipeline_error(e, __STACKTRACE__)
end
updated_memo = Map.put(memo, doc_key, data)

{data, updated_memo}
end
end

defp get_doc_key(doc) do
variables =
Enum.flat_map(doc.initial_phases, fn phase ->
case phase do
{Absinthe.Phase.Document.Variables, opts} ->
opts[:variables]

_ ->
[]
end
end)

:erlang.term_to_binary({doc.source, variables})
end

defp get_docs(pubsub, field, mutation_result, topic: topic_fun)
when is_function(topic_fun, 1) do
do_get_docs(pubsub, field, topic_fun.(mutation_result))
Expand Down

0 comments on commit 61ead74

Please sign in to comment.