Skip to content

Commit

Permalink
Refactor batch deletes
Browse files Browse the repository at this point in the history
  • Loading branch information
bmquinn committed Apr 27, 2024
1 parent 12c79f9 commit 19ff700
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 92 deletions.
188 changes: 104 additions & 84 deletions app/lib/meadow/batches.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,36 @@ defmodule Meadow.Batches do
alias Meadow.Data.{Indexer, Works}
alias Meadow.Data.Schemas.{Batch, Work}
alias Meadow.Repo
alias Meadow.Search.Client, as: SearchClient
alias Meadow.Search.Config, as: SearchConfig
alias Meadow.Search.HTTP

require Logger

@controlled_fields ~w(contributor creator genre language location style_period subject technique)a

def reprocess do
with :ok <- Indexer.synchronize_index() do
list_batches()
|> Enum.each(&delete_batch/1)

query = ~s'{"query":{"match_all":{}}}'
type = "delete"
user = "bmq449"

attrs = %{
query: query,
type: type,
user: user
}

{:ok, batch} = create_batch(attrs)
process_batch(batch)
updated_batch = get_batch!(batch.id)
Logger.info("Total works deleted: #{updated_batch.actual_deletes}")
end
end

@doc """
Creates a batch.
Expand Down Expand Up @@ -117,81 +140,65 @@ defmodule Meadow.Batches do
|> Repo.get!(id)
end

def process_batch(%Batch{type: "update"} = batch) do
perform_batch(batch)
def delete_batch!(id) do
batch = get_batch!(id)
delete_batch(batch)
end

def process_batch(%Batch{type: "delete"} = batch) do
perform_batch(batch)
def delete_batch(%Batch{} = batch) do
Repo.delete(batch)
end

def process_batch(%Batch{type: mode}) do
{:error, "mode: #{mode} not implemented"}
def delete_batch(id) do
batch = get_batch!(id)
delete_batch(batch)
end

defp perform_batch(batch) do
case set_active(batch) do
{:ok, batch} ->
log_batch_info(batch)

try do
case do_update(batch) do
{:ok, _any} ->
Logger.info("Batch #{batch.id} complete")
{:ok, set_complete!(batch)}

{:error, any} ->
Logger.info("Batch #{batch.id} transaction error: #{inspect(any)}")

{:error,
set_error!(batch, "An error occured during the transaction: #{inspect(any)}")}
end
rescue
e ->
Logger.error("Rescued error for batch #{batch.id}: #{Exception.message(e)}")
Logger.error(Exception.format(:error, e, __STACKTRACE__))
{:error, set_error!(batch, Exception.message(e))}
end

{:error, %Ecto.Changeset{}} ->
Logger.info("Couldn't start batch #{batch.id}. Active batch already exists.")
{:ok, batch}
def process_batch(%Batch{} = batch) do
with batch <- set_active!(batch),
:ok <- do_update(batch) do
{:ok, set_complete!(batch)}
else
{:error, error} ->
{:error, set_error!(batch, "An error occurred during the transaction: #{inspect(error)}")}
end
end

defp do_update(%Batch{type: "update"} = batch) do
Meadow.Repo.transaction(
Meadow.Repo.transact(
fn ->
process_updates(
batch.query,
decode_value(batch.delete),
decode_value(batch.add),
decode_value(batch.replace),
batch.id
)

Indexer.synchronize_index()
with {:ok, result} <-
process_updates(
batch.query,
decode_value(batch.delete),
decode_value(batch.add),
decode_value(batch.replace),
batch.id
) do
Logger.info(result)
end
end,
timeout: :infinity
)
end

defp do_update(%Batch{type: "delete"} = batch) do
Meadow.Repo.transaction(
Meadow.Repo.transact(
fn ->
process_deletes(
batch.query,
batch.id
)

Indexer.synchronize_index()
with {:ok, result} <- process_deletes(batch.query, batch.id),
:ok <- Indexer.synchronize_index() do
Logger.info(result)
else
error ->
Logger.error(error)
end
end,
timeout: :infinity
)
end

defp set_active(batch) do
update_batch(batch, %{
defp set_active!(batch) do
update_batch!(batch, %{
started: DateTime.utc_now(),
status: "in_progress",
active: true
Expand Down Expand Up @@ -252,7 +259,6 @@ defmodule Meadow.Batches do
end

# Apply sets of controlled field deletes/adds to a batch of works

defp apply_controlled_field_changes([], _delete, _add), do: []

defp apply_controlled_field_changes(work_ids, delete, nil),
Expand Down Expand Up @@ -296,7 +302,6 @@ defmodule Meadow.Batches do
end

# Make sure all controlled field lists are lists, and that each value has a role

defp prepare_controlled_field_list(nil), do: []

defp prepare_controlled_field_list(data) when is_list(data),
Expand Down Expand Up @@ -379,7 +384,6 @@ defmodule Meadow.Batches do

defp update_top_level_field(work_ids, field, value) do
Logger.debug("Batch updating #{field}")

update_args = Keyword.new([{field, value}, {:updated_at, DateTime.utc_now()}])

from(
Expand Down Expand Up @@ -433,7 +437,6 @@ defmodule Meadow.Batches do
end

# Scroll over the search results and apply changes to each page of work IDs.

defp process_updates(
%{"hits" => %{"hits" => []}},
_delete,
Expand All @@ -445,10 +448,16 @@ defmodule Meadow.Batches do
total <-
from(wb in "works_batches", where: wb.batch_id == ^batch_uuid)
|> Repo.aggregate(:count) do
update_batch(batch_id, %{works_updated: total})
case update_batch(batch_id, %{works_updated: total}) do
{:ok, _} ->
{:ok, batch_id}

{:error, changeset} ->
{:error, changeset}
end
end

{:ok, :noop}
{:ok, batch_id}
end

defp process_updates(
Expand Down Expand Up @@ -480,59 +489,78 @@ defmodule Meadow.Batches do
end

defp process_updates(query, delete, add, replace, batch_id) do
{:ok, start_count} = SearchClient.indexed_doc_count(SearchConfig.alias_for(Work, 2))
Logger.info("Reprocessing, starting index count: #{start_count}")

query =
query
|> Jason.decode!()
|> Map.put("_source", "")
|> Jason.encode!()

Logger.debug("Starting Elasticsearch scroll for batch update")
Logger.debug("query #{inspect(query)}")

HTTP.post!([SearchConfig.alias_for(Work, 2), "_search?scroll=10m"], query)
|> Map.get(:body)
|> process_updates(delete, add, replace, batch_id)

{:ok, batch_id}
end

# Iterate over the Elasticsearch scroll and apply changes to each page of work IDs.

defp process_deletes(
%{"hits" => %{"hits" => [], "total" => %{"value" => total}}},
%{"hits" => %{"hits" => [], "total" => %{"relation" => _, "value" => total}}},
batch_id
) do
update_batch(batch_id, %{works_updated: total})
{:ok, :noop}
case update_batch(batch_id, %{works_updated: total, actual_deletes: total}) do
{:ok, updated_batch} ->
{:ok, updated_batch}

{:error, changeset} ->
{:error, changeset}
end
end

defp process_deletes(%{"_scroll_id" => scroll_id, "hits" => hits}, batch_id) do
current_hits = Map.get(hits, "hits")
defp process_deletes(
%{"_scroll_id" => scroll_id, "hits" => %{"hits" => hits, "total" => %{"value" => _}}},
batch_id
) do
num_deleted =
hits
|> Enum.map(&Map.get(&1, "_id"))
|> delete_works()

current_hits
|> Enum.map(&Map.get(&1, "_id"))
|> delete_works()
batch = get_batch!(batch_id)
update_batch(batch, %{actual_deletes: batch.actual_deletes + num_deleted})

HTTP.post!("/_search/scroll", %{scroll: "1m", scroll_id: scroll_id})
|> Map.get(:body)
|> process_deletes(batch_id)
end

defp process_deletes(query, batch_id) do
{:ok, start_count} = SearchClient.indexed_doc_count(SearchConfig.alias_for(Work, 2))
Logger.info("Reprocessing, starting index count: #{start_count}")

query =
Jason.decode!(query)
|> Map.put("_source", "")
|> Jason.encode!()

Logger.debug("Starting Elasticsearch scroll for batch delete")
Logger.debug("query #{inspect(query)}")
response =
HTTP.post!([SearchConfig.alias_for(Work, 2), "_search?scroll=10m"], query)
|> Map.get(:body)

HTTP.post!([SearchConfig.alias_for(Work, 2), "_search?scroll=10m"], query)
|> Map.get(:body)
|> process_deletes(batch_id)
total = get_in(response, ["hits", "total", "value"])
update_batch(batch_id, %{expected_deletes: total})
process_deletes(response, batch_id)
end

defp delete_works(work_ids) do
from(w in Work, where: w.id in ^work_ids)
|> Repo.delete_all()
{num_deleted, _} =
from(w in Work, where: w.id in ^work_ids)
|> Repo.delete_all()

num_deleted
end

defp load_works(work_ids) do
Expand All @@ -545,12 +573,4 @@ defmodule Meadow.Batches do
defp decode_value(json_string) do
Jason.decode!(json_string, keys: :atoms)
end

defp log_batch_info(batch) do
Logger.info("Processing batch #{batch.type} for batch_id: #{batch.id}")
Logger.debug("query: #{batch.query}")
Logger.debug("delete: #{inspect(batch.delete)}")
Logger.debug("add: #{inspect(batch.add)}")
Logger.debug("replace: #{inspect(batch.replace)}")
end
end
21 changes: 14 additions & 7 deletions app/lib/meadow/data/indexer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ defmodule Meadow.Data.Indexer do
import Ecto.Query

def reindex_all do
SearchConfig.index_versions()
|> Enum.each(&reindex_all/1)
with versions <- SearchConfig.index_versions() do
case Enum.all?(versions, &reindex_all/1) do
true -> :ok
_error -> {:error, "Reindex all failed"}
end
end
end

def reindex_all(version) do
Expand All @@ -29,7 +33,7 @@ defmodule Meadow.Data.Indexer do
def reindex_all(version, schemas) when is_list(schemas) do
schemas
|> Enum.uniq()
|> Enum.each(&reindex_all(version, &1))
|> Enum.map(&reindex_all(version, &1))
end

def reindex_all(version, schema) do
Expand All @@ -55,12 +59,16 @@ defmodule Meadow.Data.Indexer do

def synchronize_index do
SearchConfig.index_versions()
|> Enum.each(&synchronize_index/1)
|> Enum.flat_map(&synchronize_index/1)
|> Enum.reduce(:ok, fn
result, acc when result == :ok -> acc
_result, _acc -> :error
end)
end

def synchronize_index(version) do
[FileSet, Work, Collection]
|> Enum.each(&synchronize_schema(&1, version))
|> Enum.map(&synchronize_schema(&1, version))
end

def synchronize_schema(schema, version) do
Expand Down Expand Up @@ -91,14 +99,13 @@ defmodule Meadow.Data.Indexer do
end

def synchronize_schema(stream, version, index) do
Repo.transaction(
Repo.transact(
fn ->
stream
|> Stream.map(&SearchDocument.encode(&1, version))
|> Bulk.upload(index)

SearchIndex.refresh(index)
:ok
end,
timeout: :infinity
)
Expand Down
6 changes: 5 additions & 1 deletion app/lib/meadow/data/schemas/batch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ defmodule Meadow.Data.Schemas.Batch do
field :replace, :string
field :error, :string
field :active, :boolean, default: false
field :expected_deletes, :integer
field :actual_deletes, :integer, default: 0

many_to_many(
:works,
Expand All @@ -51,7 +53,9 @@ defmodule Meadow.Data.Schemas.Batch do
:delete,
:replace,
:error,
:active
:active,
:expected_deletes,
:actual_deletes
])
|> validate_required([:query, :type, :user])
|> unique_constraint(:active)
Expand Down
Loading

0 comments on commit 19ff700

Please sign in to comment.