From 19ff700d689e9f202822f224ca580d6796e6dc4a Mon Sep 17 00:00:00 2001 From: Brendan Quinn Date: Fri, 26 Apr 2024 22:01:27 +0000 Subject: [PATCH] Refactor batch deletes --- app/lib/meadow/batches.ex | 188 ++++++++++-------- app/lib/meadow/data/indexer.ex | 21 +- app/lib/meadow/data/schemas/batch.ex | 6 +- app/lib/meadow/repo.ex | 41 ++++ .../20240425205546_add_fields_to_batches.exs | 10 + 5 files changed, 174 insertions(+), 92 deletions(-) create mode 100644 app/priv/repo/migrations/20240425205546_add_fields_to_batches.exs diff --git a/app/lib/meadow/batches.ex b/app/lib/meadow/batches.ex index 2df096326..d79ea3475 100644 --- a/app/lib/meadow/batches.ex +++ b/app/lib/meadow/batches.ex @@ -7,6 +7,7 @@ defmodule Meadow.Batches do alias Meadow.Data.{Indexer, Works} alias Meadow.Data.Schemas.{Batch, Work} alias Meadow.Repo + alias Meadow.Search.Client, as: SearchClient alias Meadow.Search.Config, as: SearchConfig alias Meadow.Search.HTTP @@ -14,6 +15,28 @@ defmodule Meadow.Batches do @controlled_fields ~w(contributor creator genre language location style_period subject technique)a + def reprocess do + with :ok <- Indexer.synchronize_index() do + list_batches() + |> Enum.each(&delete_batch/1) + + query = ~s'{"query":{"match_all":{}}}' + type = "delete" + user = "bmq449" + + attrs = %{ + query: query, + type: type, + user: user + } + + {:ok, batch} = create_batch(attrs) + process_batch(batch) + updated_batch = get_batch!(batch.id) + Logger.info("Total works deleted: #{updated_batch.actual_deletes}") + end + end + @doc """ Creates a batch. @@ -117,81 +140,65 @@ defmodule Meadow.Batches do |> Repo.get!(id) end - def process_batch(%Batch{type: "update"} = batch) do - perform_batch(batch) + def delete_batch!(id) do + batch = get_batch!(id) + delete_batch(batch) end - def process_batch(%Batch{type: "delete"} = batch) do - perform_batch(batch) + def delete_batch(%Batch{} = batch) do + Repo.delete(batch) end - def process_batch(%Batch{type: mode}) do - {:error, "mode: #{mode} not implemented"} + def delete_batch(id) do + batch = get_batch!(id) + delete_batch(batch) end - defp perform_batch(batch) do - case set_active(batch) do - {:ok, batch} -> - log_batch_info(batch) - - try do - case do_update(batch) do - {:ok, _any} -> - Logger.info("Batch #{batch.id} complete") - {:ok, set_complete!(batch)} - - {:error, any} -> - Logger.info("Batch #{batch.id} transaction error: #{inspect(any)}") - - {:error, - set_error!(batch, "An error occured during the transaction: #{inspect(any)}")} - end - rescue - e -> - Logger.error("Rescued error for batch #{batch.id}: #{Exception.message(e)}") - Logger.error(Exception.format(:error, e, __STACKTRACE__)) - {:error, set_error!(batch, Exception.message(e))} - end - - {:error, %Ecto.Changeset{}} -> - Logger.info("Couldn't start batch #{batch.id}. 
Active batch already exists.") - {:ok, batch} + def process_batch(%Batch{} = batch) do + with batch <- set_active!(batch), + :ok <- do_update(batch) do + {:ok, set_complete!(batch)} + else + {:error, error} -> + {:error, set_error!(batch, "An error occurred during the transaction: #{inspect(error)}")} end end defp do_update(%Batch{type: "update"} = batch) do - Meadow.Repo.transaction( + Meadow.Repo.transact( fn -> - process_updates( - batch.query, - decode_value(batch.delete), - decode_value(batch.add), - decode_value(batch.replace), - batch.id - ) - - Indexer.synchronize_index() + with {:ok, result} <- + process_updates( + batch.query, + decode_value(batch.delete), + decode_value(batch.add), + decode_value(batch.replace), + batch.id + ) do + Logger.info(result) + end end, timeout: :infinity ) end defp do_update(%Batch{type: "delete"} = batch) do - Meadow.Repo.transaction( + Meadow.Repo.transact( fn -> - process_deletes( - batch.query, - batch.id - ) - - Indexer.synchronize_index() + with {:ok, result} <- process_deletes(batch.query, batch.id), + :ok <- Indexer.synchronize_index() do + Logger.info(result) + else + error -> + Logger.error(error) + end end, timeout: :infinity ) end - defp set_active(batch) do - update_batch(batch, %{ + defp set_active!(batch) do + update_batch!(batch, %{ started: DateTime.utc_now(), status: "in_progress", active: true @@ -252,7 +259,6 @@ defmodule Meadow.Batches do end # Apply sets of controlled field deletes/adds to a batch of works - defp apply_controlled_field_changes([], _delete, _add), do: [] defp apply_controlled_field_changes(work_ids, delete, nil), @@ -296,7 +302,6 @@ defmodule Meadow.Batches do end # Make sure all controlled field lists are lists, and that each value has a role - defp prepare_controlled_field_list(nil), do: [] defp prepare_controlled_field_list(data) when is_list(data), @@ -379,7 +384,6 @@ defmodule Meadow.Batches do defp update_top_level_field(work_ids, field, value) do Logger.debug("Batch updating #{field}") - update_args = Keyword.new([{field, value}, {:updated_at, DateTime.utc_now()}]) from( @@ -433,7 +437,6 @@ defmodule Meadow.Batches do end # Scroll over the search results and apply changes to each page of work IDs. - defp process_updates( %{"hits" => %{"hits" => []}}, _delete, @@ -445,10 +448,16 @@ defmodule Meadow.Batches do total <- from(wb in "works_batches", where: wb.batch_id == ^batch_uuid) |> Repo.aggregate(:count) do - update_batch(batch_id, %{works_updated: total}) + case update_batch(batch_id, %{works_updated: total}) do + {:ok, _} -> + {:ok, batch_id} + + {:error, changeset} -> + {:error, changeset} + end end - {:ok, :noop} + {:ok, batch_id} end defp process_updates( @@ -480,36 +489,48 @@ defmodule Meadow.Batches do end defp process_updates(query, delete, add, replace, batch_id) do + {:ok, start_count} = SearchClient.indexed_doc_count(SearchConfig.alias_for(Work, 2)) + Logger.info("Reprocessing, starting index count: #{start_count}") + query = query |> Jason.decode!() |> Map.put("_source", "") |> Jason.encode!() - Logger.debug("Starting Elasticsearch scroll for batch update") - Logger.debug("query #{inspect(query)}") - HTTP.post!([SearchConfig.alias_for(Work, 2), "_search?scroll=10m"], query) |> Map.get(:body) |> process_updates(delete, add, replace, batch_id) + + {:ok, batch_id} end # Iterate over the Elasticsearch scroll and apply changes to each page of work IDs. 
defp process_deletes( - %{"hits" => %{"hits" => [], "total" => %{"value" => total}}}, + %{"hits" => %{"hits" => [], "total" => %{"relation" => _, "value" => total}}}, batch_id ) do - update_batch(batch_id, %{works_updated: total}) - {:ok, :noop} + case update_batch(batch_id, %{works_updated: total, actual_deletes: total}) do + {:ok, updated_batch} -> + {:ok, updated_batch} + + {:error, changeset} -> + {:error, changeset} + end end - defp process_deletes(%{"_scroll_id" => scroll_id, "hits" => hits}, batch_id) do - current_hits = Map.get(hits, "hits") + defp process_deletes( + %{"_scroll_id" => scroll_id, "hits" => %{"hits" => hits, "total" => %{"value" => _}}}, + batch_id + ) do + num_deleted = + hits + |> Enum.map(&Map.get(&1, "_id")) + |> delete_works() - current_hits - |> Enum.map(&Map.get(&1, "_id")) - |> delete_works() + batch = get_batch!(batch_id) + update_batch(batch, %{actual_deletes: batch.actual_deletes + num_deleted}) HTTP.post!("/_search/scroll", %{scroll: "1m", scroll_id: scroll_id}) |> Map.get(:body) @@ -517,22 +538,29 @@ defmodule Meadow.Batches do end defp process_deletes(query, batch_id) do + {:ok, start_count} = SearchClient.indexed_doc_count(SearchConfig.alias_for(Work, 2)) + Logger.info("Reprocessing, starting index count: #{start_count}") + query = Jason.decode!(query) |> Map.put("_source", "") |> Jason.encode!() - Logger.debug("Starting Elasticsearch scroll for batch delete") - Logger.debug("query #{inspect(query)}") + response = + HTTP.post!([SearchConfig.alias_for(Work, 2), "_search?scroll=10m"], query) + |> Map.get(:body) - HTTP.post!([SearchConfig.alias_for(Work, 2), "_search?scroll=10m"], query) - |> Map.get(:body) - |> process_deletes(batch_id) + total = get_in(response, ["hits", "total", "value"]) + update_batch(batch_id, %{expected_deletes: total}) + process_deletes(response, batch_id) end defp delete_works(work_ids) do - from(w in Work, where: w.id in ^work_ids) - |> Repo.delete_all() + {num_deleted, _} = + from(w in Work, where: w.id in ^work_ids) + |> Repo.delete_all() + + num_deleted end defp load_works(work_ids) do @@ -545,12 +573,4 @@ defmodule Meadow.Batches do defp decode_value(json_string) do Jason.decode!(json_string, keys: :atoms) end - - defp log_batch_info(batch) do - Logger.info("Processing batch #{batch.type} for batch_id: #{batch.id}") - Logger.debug("query: #{batch.query}") - Logger.debug("delete: #{inspect(batch.delete)}") - Logger.debug("add: #{inspect(batch.add)}") - Logger.debug("replace: #{inspect(batch.replace)}") - end end diff --git a/app/lib/meadow/data/indexer.ex b/app/lib/meadow/data/indexer.ex index 28f897cb2..752f2f75a 100644 --- a/app/lib/meadow/data/indexer.ex +++ b/app/lib/meadow/data/indexer.ex @@ -18,8 +18,12 @@ defmodule Meadow.Data.Indexer do import Ecto.Query def reindex_all do - SearchConfig.index_versions() - |> Enum.each(&reindex_all/1) + with versions <- SearchConfig.index_versions() do + case Enum.all?(versions, &reindex_all/1) do + true -> :ok + _error -> {:error, "Reindex all failed"} + end + end end def reindex_all(version) do @@ -29,7 +33,7 @@ defmodule Meadow.Data.Indexer do def reindex_all(version, schemas) when is_list(schemas) do schemas |> Enum.uniq() - |> Enum.each(&reindex_all(version, &1)) + |> Enum.map(&reindex_all(version, &1)) end def reindex_all(version, schema) do @@ -55,12 +59,16 @@ defmodule Meadow.Data.Indexer do def synchronize_index do SearchConfig.index_versions() - |> Enum.each(&synchronize_index/1) + |> Enum.flat_map(&synchronize_index/1) + |> Enum.reduce(:ok, fn + result, acc when result == 
:ok -> acc + _result, _acc -> :error + end) end def synchronize_index(version) do [FileSet, Work, Collection] - |> Enum.each(&synchronize_schema(&1, version)) + |> Enum.map(&synchronize_schema(&1, version)) end def synchronize_schema(schema, version) do @@ -91,14 +99,13 @@ defmodule Meadow.Data.Indexer do end def synchronize_schema(stream, version, index) do - Repo.transaction( + Repo.transact( fn -> stream |> Stream.map(&SearchDocument.encode(&1, version)) |> Bulk.upload(index) SearchIndex.refresh(index) - :ok end, timeout: :infinity ) diff --git a/app/lib/meadow/data/schemas/batch.ex b/app/lib/meadow/data/schemas/batch.ex index 05f734619..fa35ecdb6 100644 --- a/app/lib/meadow/data/schemas/batch.ex +++ b/app/lib/meadow/data/schemas/batch.ex @@ -25,6 +25,8 @@ defmodule Meadow.Data.Schemas.Batch do field :replace, :string field :error, :string field :active, :boolean, default: false + field :expected_deletes, :integer + field :actual_deletes, :integer, default: 0 many_to_many( :works, @@ -51,7 +53,9 @@ defmodule Meadow.Data.Schemas.Batch do :delete, :replace, :error, - :active + :active, + :expected_deletes, + :actual_deletes ]) |> validate_required([:query, :type, :user]) |> unique_constraint(:active) diff --git a/app/lib/meadow/repo.ex b/app/lib/meadow/repo.ex index fd61f79cf..d94a2d020 100644 --- a/app/lib/meadow/repo.ex +++ b/app/lib/meadow/repo.ex @@ -9,6 +9,47 @@ defmodule Meadow.Repo do require Logging require WaitForIt + @doc """ + Credit to Saša Jurić https://github.com/sasa1977/mix_phx_alt/blob/8ef7c36e5ac1a13a8152d0991757811cfd479568/lib/core/repo.ex#L6 + Runs the given function inside a transaction. + + This function is a wrapper around `Ecto.Repo.transaction`, with the following differences: + + - It accepts only a lambda of arity 0 or 1 (i.e. it doesn't work with multi). + - If the lambda returns `:ok | {:ok, result}` the transaction is committed. + - If the lambda returns `:error | {:error, reason}` the transaction is rolled back. + - If the lambda returns any other kind of result, an exception is raised, and the transaction is rolled back. + - The result of `transact` is the value returned by the lambda. + + This function accepts the same options as `Ecto.Repo.transaction/2`. + """ + @spec transact((-> result) | (module -> result), Keyword.t()) :: result + when result: :ok | {:ok, any} | :error | {:error, any} + def transact(fun, opts \\ []) do + transaction_result = + transaction( + fn repo -> + lambda_result = + case Function.info(fun, :arity) do + {:arity, 0} -> fun.() + {:arity, 1} -> fun.(repo) + end + + case lambda_result do + :ok -> {__MODULE__, :transact, :ok} + :error -> rollback({__MODULE__, :transact, :error}) + {:ok, result} -> result + {:error, reason} -> rollback(reason) + end + end, + opts + ) + + with {outcome, {__MODULE__, :transact, outcome}} + when outcome in [:ok, :error] <- transaction_result, + do: outcome + end + def init(_, opts), do: {:ok, opts} def listen(event_name) do diff --git a/app/priv/repo/migrations/20240425205546_add_fields_to_batches.exs b/app/priv/repo/migrations/20240425205546_add_fields_to_batches.exs new file mode 100644 index 000000000..83af90aa9 --- /dev/null +++ b/app/priv/repo/migrations/20240425205546_add_fields_to_batches.exs @@ -0,0 +1,10 @@ +defmodule Meadow.Repo.Migrations.AddFieldsToBatches do + use Ecto.Migration + + def change do + alter table(:batches) do + add :expected_deletes, :integer + add :actual_deletes, :integer + end + end +end
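
Usage note: a minimal sketch of the commit/rollback contract of the new
Meadow.Repo.transact/2 helper added in app/lib/meadow/repo.ex above. The
Meadow.TransactExample module and both of its function names are hypothetical;
only Repo.transact/2 and Meadow.Batches.create_batch/1 come from this patch,
and create_batch/1 is assumed to return {:ok, batch} | {:error, changeset}, as
its use in reprocess/0 suggests.

    defmodule Meadow.TransactExample do
      @moduledoc "Illustrative sketch; not part of this patch."

      alias Meadow.Repo

      # The lambda ends in {:ok, batch}, so transact/2 commits and returns {:ok, batch}.
      # If create_batch/1 returns {:error, changeset}, the `with` falls through with that
      # tuple, transact/2 rolls the transaction back and returns the tuple unchanged.
      def create_delete_batch(query, user) do
        Repo.transact(fn ->
          with {:ok, batch} <-
                 Meadow.Batches.create_batch(%{query: query, type: "delete", user: user}) do
            {:ok, batch}
          end
        end)
      end

      # Returning :error or {:error, reason} always rolls back; transact/2 returns
      # {:error, :nothing_to_do} here and nothing is committed.
      def doomed do
        Repo.transact(fn -> {:error, :nothing_to_do} end)
      end
    end

Any other return value from the lambda raises and rolls the transaction back, so
an unexpected result is never silently committed.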