-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added a websocket-based audit scanner
This scans manually-created items for those which have missing mandatory or desirable fields
- Loading branch information
Showing
37 changed files
with
802 additions
and
121 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
package actors.datamodel | ||
|
||
import actors.LongRunningJob.Cancel | ||
import actors.datamodel.AuditorManager.{AuditTask, AuditorJob} | ||
import akka.actor.{Actor, ActorLogging, ActorRef} | ||
import akka.pattern._ | ||
import models._ | ||
import services.data.DataUser | ||
import services.search._ | ||
import utils.PageParams | ||
|
||
import java.time.{Duration, Instant} | ||
import scala.concurrent.ExecutionContext | ||
|
||
/**
  * Message protocol used by the [[Auditor]] actor and whoever it reports to.
  */
object Auditor {

  /** Common parent of the audit protocol messages. */
  sealed trait Action

  /**
    * Request to audit one page of search results.
    *
    * @param job   the audit job being run
    * @param query the search query to run; when empty the auditor builds
    *              its initial query from the job's task
    */
  case class RunAudit(
    job: AuditorJob,
    query: Option[SearchQuery] = None
  ) extends Action

  /**
    * The validation errors found for a single item.
    *
    * @param id     the item's ID
    * @param errors the validation errors reported for the item
    */
  case class Check(
    id: String,
    errors: Seq[ValidationError]
  )

  /** Progress notification carrying the number of items checked. */
  case class Checked(checked: Int) extends Action

  /**
    * Sent when the audit has finished.
    *
    * @param checked the number of items checked
    * @param count   the number of items flagged
    * @param secs    the elapsed time in seconds
    */
  case class Completed(
    checked: Int,
    count: Int,
    secs: Long = 0
  ) extends Action

  /**
    * Sent in response to a cancellation request.
    *
    * @param checked the number of items checked
    * @param count   the number of items flagged
    * @param secs    the elapsed time in seconds
    */
  case class Cancelled(
    checked: Int,
    count: Int,
    secs: Long = 0
  ) extends Action

  /**
    * The flagged items from one batch.
    *
    * @param checks the per-item check results
    * @param more   whether the search reported further pages
    */
  case class CheckBatch(
    checks: Seq[Check],
    more: Boolean
  )

  /**
    * Internal message: one page of resolved items, piped back to the
    * auditor itself once the search and item resolution have completed.
    */
  private case class Batch(
    job: AuditorJob,
    items: Seq[Entity],
    query: SearchQuery,
    more: Boolean
  ) extends Action
}
|
||
|
||
/**
  * Actor which pages through search results for manually-created items
  * and validates each one against the field metadata set, reporting
  * flagged items back to the actor that started the audit.
  *
  * @param searchEngine     the search engine used to find items to audit
  * @param resolver         resolves search hits to entities
  * @param fieldMetadataSet the metadata used to validate each item
  * @param batchSize        the search page size
  * @param maxFlagged       the audit stops once this many items have been flagged
  */
case class Auditor(searchEngine: SearchEngine, resolver: SearchItemResolver, fieldMetadataSet: FieldMetadataSet, batchSize: Int, maxFlagged: Int)(
  implicit userOpt: Option[UserProfile], ec: ExecutionContext) extends Actor with ActorLogging {
  import Auditor._
  private implicit val apiUser: DataUser = DataUser(userOpt.map(_.id))

  // Idle state: the first RunAudit fixes the reply target and the start
  // time, then is re-sent to self so it is handled in the running state.
  override def receive: Receive = {
    case e: RunAudit =>
      context.become(running(sender(), Instant.now(), 0, 0))
      self ! e
  }

  /**
    * Running state.
    *
    * @param msgTo   the actor to which progress and results are sent
    * @param start   the time the audit started
    * @param checked the number of items checked in previous batches
    * @param flagged the number of items flagged in previous batches
    */
  def running(msgTo: ActorRef, start: Instant, checked: Int, flagged: Int): Receive = {
    case RunAudit(job, queryOpt) =>
      // Search for entities to audit
      val query: SearchQuery = queryOpt.getOrElse(initSearch(job.task))

      searchEngine.search(query).flatMap { res =>
        resolver.resolve[Entity](res.page.items).map { list =>
          Batch(job, list.flatten, query, res.page.hasMore)
        }
      }.pipeTo(self)

    case Batch(job, items, query, more) =>
      log.debug(s"Found ${items.size} items for audit, total so far: $checked")
      val errors: Seq[Check] = items.flatMap { item =>
        val errs = fieldMetadataSet.validate(item)
        val pErrs = errs.collect { case e@MissingMandatoryField(_) => e}
        // In mandatory-only mode an item is flagged only if it has at
        // least one missing mandatory field.
        if (job.task.mandatoryOnly && pErrs.isEmpty) None
        else if (errs.nonEmpty) Some(Check(item.id, errs))
        else None
      }

      // NOTE(review): `checked` is the total before this batch, so this
      // only fires while that total is a multiple of batchSize - confirm
      // that is the intended progress cadence.
      if (checked % batchSize == 0) {
        msgTo ! Checked(checked)
      }

      msgTo ! CheckBatch(errors, more)

      if (more && (flagged + errors.size) < maxFlagged) {
        val next = query.copy(paging = query.paging.next)
        context.become(running(msgTo, start, checked + items.size, flagged + errors.size))
        self ! RunAudit(job, Some(next))
      } else {
        msgTo ! Completed(checked + items.size, flagged + errors.size, time(start))
      }

    // The search future is piped to self, so a failed search or item
    // resolution arrives as a Status.Failure. Rethrow it so the failure
    // reaches the supervisor instead of being dropped as unhandled.
    case akka.actor.Status.Failure(e) =>
      log.error(e, s"Audit search failed, checked so far: $checked, flagged: $flagged")
      throw e

    case Cancel =>
      log.debug(s"Cancelling audit job, checked so far: $checked, flagged: $flagged")
      // Report the elapsed time, as the Completed message does
      sender() ! Cancelled(checked, flagged, time(start))
  }

  // Whole seconds elapsed since the given instant
  private def time(from: Instant): Long = Duration.between(from, Instant.now()).toMillis / 1000

  // Build the first-page query: items of the task's entity type, optionally
  // restricted by ID prefix, sorted by ID and limited to those with a
  // manual creation process.
  private def initSearch(task: AuditTask): SearchQuery = {
    val paging: PageParams = PageParams(limit = batchSize)
    val params: SearchParams = SearchParams(entities = Seq(task.entityType), query = task.idPrefix.map(p => s"$p*"), sort = Some(SearchSort.Id))
    val cpFacet = AppliedFacet(SearchConstants.CREATION_PROCESS, Seq(Description.CreationProcess.Manual.toString))
    val facets = List(FieldFacetClass(key = SearchConstants.CREATION_PROCESS, name = "Creation Process", param = "creation"))
    SearchQuery(params = params, paging = paging, appliedFacets = Seq(cpFacet), user = userOpt, facetClasses = facets)
  }
}
116 changes: 116 additions & 0 deletions
116
modules/admin/app/actors/datamodel/AuditorManager.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
package actors.datamodel | ||
|
||
import actors.LongRunningJob.Cancel | ||
import actors.datamodel.Auditor.{Cancelled, CheckBatch, Checked, Completed, RunAudit} | ||
import actors.datamodel.AuditorManager.{AuditorJob, ItemResult} | ||
import akka.actor.SupervisorStrategy.Stop | ||
import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, SupervisorStrategy, Terminated} | ||
import models._ | ||
import play.api.Configuration | ||
import play.api.i18n.Messages | ||
import play.api.libs.json.{Format, Json} | ||
import services.search.{SearchEngine, SearchItemResolver} | ||
import utils.WebsocketConstants | ||
|
||
import scala.concurrent.ExecutionContext | ||
import scala.concurrent.duration.DurationInt | ||
|
||
/**
  * Data types shared between the [[AuditorManager]] actor, the auditor
  * it supervises and its subscribers.
  */
object AuditorManager {

  /**
    * Describes what should be audited.
    *
    * @param entityType    the type of item to audit
    * @param idPrefix      an optional item ID prefix
    * @param mandatoryOnly whether only items with missing mandatory
    *                      fields should be flagged
    */
  case class AuditTask(
    entityType: EntityType.Value,
    idPrefix: Option[String] = None,
    mandatoryOnly: Boolean
  )

  object AuditTask {
    implicit val _format: Format[AuditTask] = Json.format[AuditTask]
  }

  /**
    * An audit task paired with a job ID.
    *
    * @param jobId the job's ID
    * @param task  the audit task
    */
  case class AuditorJob(
    jobId: String,
    task: AuditTask
  )

  /**
    * The result for a single flagged item, as serialised to subscribers.
    *
    * @param id        the item's ID
    * @param mandatory the IDs of missing mandatory fields
    * @param desirable the IDs of missing desirable fields
    */
  case class ItemResult(
    id: String,
    mandatory: Seq[String],
    desirable: Seq[String]
  )

  object ItemResult {
    implicit val _format: Format[ItemResult] = Json.format[ItemResult]
  }
}
|
||
/**
  * Actor which manages a single audit job: it starts an [[Auditor]] child
  * when the first subscriber arrives and relays the auditor's results and
  * progress to all subscribers as string messages.
  *
  * @param job                the audit job to run
  * @param searchEngine       passed through to the auditor
  * @param searchItemResolver passed through to the auditor
  * @param fieldMetadataSet   passed through to the auditor
  */
case class AuditorManager(job: AuditorJob, searchEngine: SearchEngine, searchItemResolver: SearchItemResolver, fieldMetadataSet: FieldMetadataSet)(
  implicit userOpt: Option[UserProfile], messages: Messages, ec: ExecutionContext, config: Configuration) extends Actor with ActorLogging {

  // Any failure in a child is forwarded to this actor as a message
  // and the child is stopped.
  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case e =>
      self ! e
      Stop
  }

  // Ready state: we've received a job but won't actually start
  // until there is a channel to talk through
  override def receive: Receive = {
    case chan: ActorRef =>
      log.debug("Received initial subscriber, starting...")
      val maxResults = config.get[Int]("ehri.admin.auditor.maxResults")
      val batchSize = config.get[Int]("ehri.admin.auditor.batchSize")
      val runner = context.actorOf(Props(Auditor(searchEngine, searchItemResolver, fieldMetadataSet, batchSize, maxResults)))
      // Watch the first subscriber like any later one, so that it is
      // removed from the subscriber set when it terminates.
      context.watch(chan)
      context.become(running(runner, Set(chan)))
      runner ! RunAudit(job, None)
  }

  /**
    * Running state.
    *
    * @param runner the audit runner actor
    * @param subs   a set of subscribers to message w/ updates
    */
  def running(runner: ActorRef, subs: Set[ActorRef]): Receive = {

    // Add a new message subscriber
    case chan: ActorRef =>
      log.debug(s"Added new message subscriber, ${subs.size}")
      context.watch(chan)
      context.become(running(runner, subs + chan))

    // NOTE(review): the runner is never passed to context.watch in this
    // class, so this case presumably never fires - confirm.
    case Terminated(actor) if actor == runner =>
      log.debug(s"Actor terminated: $actor")
      context.system.scheduler.scheduleOnce(5.seconds, self,
        "Audit runner unexpectedly shut down")

    // Remove terminated subscribers
    case Terminated(chan) =>
      log.debug(s"Removing subscriber: $chan")
      context.unwatch(chan)
      context.become(running(runner, subs - chan))

    // A batch of items has been checked: send the flagged
    // items to subscribers as a JSON array
    case CheckBatch(checks, _) =>
      val res: Seq[ItemResult] = checks.filter(_.errors.nonEmpty).map { check =>
        val mandatory = check.errors.collect { case e: MissingMandatoryField => e }
        val desirable = check.errors.collect { case e: MissingDesirableField => e }
        ItemResult(check.id, mandatory.map(_.id), desirable.map(_.id))
      }
      msg(Json.stringify(Json.toJson(res)), subs)

    // Received confirmation that the runner has been cancelled,
    // so we notify subscribers and shut down
    case Cancelled(checked, _, secs) =>
      msg(Messages("dataModel.audit.cancelled", WebsocketConstants.DONE_MESSAGE, checked, secs), subs)
      context.stop(self)

    // The runner has completed, so we notify subscribers
    // and shut down too
    case Completed(checked, flagged, secs) =>
      msg(Messages("dataModel.audit.completed", WebsocketConstants.DONE_MESSAGE, checked, flagged, secs), subs)
      context.stop(self)

    // Progress update from the runner
    case Checked(checked) =>
      msg(Messages("dataModel.audit.checked", WebsocketConstants.INFO_MESSAGE, checked), subs)

    // Cancel the audit... here we forward the request to the runner
    // and shut down when it replies with Cancelled
    case Cancel =>
      log.info("Cancelling audit...")
      runner ! Cancel

    case m =>
      log.error(s"Unexpected message: $m")
  }

  // Send a string message to every subscriber
  private def msg(s: String, subs: Set[ActorRef]): Unit = {
    // log.info(s + s" (subscribers: ${subs.size})")
    subs.foreach(_ ! s)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.