From dc214360e7043f509810f5e6787f834468fb44d3 Mon Sep 17 00:00:00 2001 From: Stefan Obermeier Date: Mon, 26 Jun 2017 16:10:16 +0200 Subject: [PATCH] Show configuration errors --- .../QueryspaceConfigurationFileHandler.scala | 144 ++++++++++-------- 1 file changed, 79 insertions(+), 65 deletions(-) diff --git a/scray-loader/src/main/scala/scray/loader/configparser/QueryspaceConfigurationFileHandler.scala b/scray-loader/src/main/scala/scray/loader/configparser/QueryspaceConfigurationFileHandler.scala index 70aab7c3b..1cf643f45 100644 --- a/scray-loader/src/main/scala/scray/loader/configparser/QueryspaceConfigurationFileHandler.scala +++ b/scray-loader/src/main/scala/scray/loader/configparser/QueryspaceConfigurationFileHandler.scala @@ -14,80 +14,93 @@ // limitations under the License. package scray.loader.configparser -import org.apache.hadoop.fs.Path -import scala.util.Try -import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.conf.Configuration -import org.apache.commons.io.IOUtils +import java.io.File +import java.io.FileInputStream +import java.io.FilenameFilter +import java.io.IOException import java.net.URL + +import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap import scala.util.Failure -import scala.collection.mutable.ArrayBuffer +import scala.util.Success +import scala.util.Try + +import org.apache.commons.io.IOUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.Path + import com.typesafe.scalalogging.slf4j.LazyLogging -import java.io.File -import java.io.FilenameFilter -import java.io.IOException -import java.io.FileInputStream /** * handles reading scray config files from external sources. */ object QueryspaceConfigurationFileHandler extends LazyLogging { - + type UpdateCallback = (String, Option[(ScrayQueryspaceConfiguration, Long)]) => Unit - + val WITH_QS = " with queryspace " val READ_FILE = "Read queryspace configuration file " - + /** * reads in all the queryspace configurations and updates the configurations if necessary */ def performQueryspaceUpdate(newConfig: ScrayConfiguration, - oldqueryspaces: HashMap[String, (Long, ScrayQueryspaceConfiguration)], - callbacks: Seq[UpdateCallback]): Unit = { + oldqueryspaces: HashMap[String, (Long, ScrayQueryspaceConfiguration)], + callbacks: Seq[UpdateCallback]): Unit = { val configs = scanDirectories(newConfig) + + configs.map(_ match { + case Success(scannedFiles) => registerScanedFiles(scannedFiles, oldqueryspaces, callbacks) + case Failure(e) => logger.error(e.getMessage) // Messages from e are mostly good. + }) + } + + private def registerScanedFiles(scannedFiles: ScannedQueryspaceConfigfiles, oldqueryspaces: HashMap[String, (Long, ScrayQueryspaceConfiguration)], callbacks: Seq[UpdateCallback]) = { val knownQS = new ArrayBuffer[String] - configs.filter(_.isSuccess).foreach { scannedFiles => - knownQS += scannedFiles.get.name - val oldQs = oldqueryspaces.get(scannedFiles.get.name) - if(oldQs.isEmpty) { - // new queryspace registered... notify - callbacks.foreach { _(scannedFiles.get.name, Some((scannedFiles.get.queryspaceConfig, scannedFiles.get.version))) } - oldqueryspaces.update(scannedFiles.get.name, (scannedFiles.get.version, scannedFiles.get.queryspaceConfig)) - } else { - // new version? -> notify - if(oldQs.get._1 != scannedFiles.get.version) { - callbacks.foreach { _(scannedFiles.get.name, Some((scannedFiles.get.queryspaceConfig, scannedFiles.get.version))) } - oldqueryspaces.update(scannedFiles.get.name, (scannedFiles.get.version, scannedFiles.get.queryspaceConfig)) - } + + knownQS += scannedFiles.name + val oldQs = oldqueryspaces.get(scannedFiles.name) + if (oldQs.isEmpty) { + // new queryspace registered... notify + callbacks.foreach { _(scannedFiles.name, Some((scannedFiles.queryspaceConfig, scannedFiles.version))) } + oldqueryspaces.update(scannedFiles.name, (scannedFiles.version, scannedFiles.queryspaceConfig)) + } else { + // new version? -> notify + if (oldQs.get._1 != scannedFiles.version) { + callbacks.foreach { _(scannedFiles.name, Some((scannedFiles.queryspaceConfig, scannedFiles.version))) } + oldqueryspaces.update(scannedFiles.name, (scannedFiles.version, scannedFiles.queryspaceConfig)) } } + // the queryspaces which were there, but aren't any more, must be notified - (oldqueryspaces.keySet -- knownQS).foreach { qs => + (oldqueryspaces.keySet -- knownQS).foreach { qs => callbacks.foreach { _(qs, None) } oldqueryspaces.remove(qs) - } + } } - + /** * reads a file into a String directly from HDFS */ def readFileFromHDFS(filePath: Path, fs: FileSystem): String = IOUtils.toString(fs.open(filePath)) - - private def handleHDFS(url: ScrayQueryspaceConfigurationURL, config: ScrayConfiguration): - Seq[Try[ScannedQueryspaceConfigfiles]] = { + + private def handleHDFS(url: ScrayQueryspaceConfigurationURL, config: ScrayConfiguration): Seq[Try[ScannedQueryspaceConfigfiles]] = { try { val path = new Path(url.url) val fs = FileSystem.get(new Configuration()) - if(fs.getFileStatus(path).isDir()) { + if (fs.getFileStatus(path).isDir()) { logger.info("Reading directory for HDFS-URL " + url.url + ", scanning for *" + SCRAY_QUERYSPACE_CONFIG_ENDING + "...") // read complete directory val stati = fs.listStatus(path) - stati.filter(_.getPath().getName().endsWith(SCRAY_QUERYSPACE_CONFIG_ENDING)).map { status => Try { - val parsedFile = ScrayQueryspaceConfigurationParser.parse(readFileFromHDFS(status.getPath, fs), config, true) - logger.info(READ_FILE + status.getPath + WITH_QS + parsedFile.get.name) - ScannedQueryspaceConfigfiles(status.getPath.toString(), parsedFile.get.name, parsedFile.get.version, parsedFile.get) - }}.toSeq + stati.filter(_.getPath().getName().endsWith(SCRAY_QUERYSPACE_CONFIG_ENDING)).map { status => + Try { + val parsedFile = ScrayQueryspaceConfigurationParser.parse(readFileFromHDFS(status.getPath, fs), config, true) + logger.info(READ_FILE + status.getPath + WITH_QS + parsedFile.get.name) + ScannedQueryspaceConfigfiles(status.getPath.toString(), parsedFile.get.name, parsedFile.get.version, parsedFile.get) + } + }.toSeq } else { Seq(Try { val parsedFile = ScrayQueryspaceConfigurationParser.parse(readFileFromHDFS(path, fs), config, true) @@ -97,46 +110,47 @@ object QueryspaceConfigurationFileHandler extends LazyLogging { } } catch { case e: Exception => Seq(Failure(e)) - } + } } - - private def handleURL(url: ScrayQueryspaceConfigurationURL, config: ScrayConfiguration): - Seq[Try[ScannedQueryspaceConfigfiles]] = { + + private def handleURL(url: ScrayQueryspaceConfigurationURL, config: ScrayConfiguration): Seq[Try[ScannedQueryspaceConfigfiles]] = { // we could install an URL-Stream-handler, but this might require command line parameters, so we handle this hard-wired - if(url.url.startsWith(RESOURCE_SCHEMA)) { + if (url.url.startsWith(RESOURCE_SCHEMA)) { Seq(Try { val parsedFile = ScrayQueryspaceConfigurationParser.parseResource(url.url.stripPrefix(RESOURCE_SCHEMA), config, true) logger.warn("###" + parsedFile) - logger.info("Read queryspace configuration from classpath " + url.url + WITH_QS + parsedFile.get.name) - ScannedQueryspaceConfigfiles(url.url, parsedFile.get.name, parsedFile.get.version, parsedFile.get) + logger.info("Read queryspace configuration from classpath " + url.url + WITH_QS + parsedFile.get.name) + ScannedQueryspaceConfigfiles(url.url, parsedFile.get.name, parsedFile.get.version, parsedFile.get) }) } else { Seq(Try { val fileURL = new URL(url.url) val parsedFile = ScrayQueryspaceConfigurationParser.parse(IOUtils.toString(fileURL.openStream()), config, true) - logger.info(READ_FILE + url.url + WITH_QS + parsedFile.get.name) + logger.info(READ_FILE + url.url + WITH_QS + parsedFile.get.name) ScannedQueryspaceConfigfiles(url.url, parsedFile.get.name, parsedFile.get.version, parsedFile.get) }) } } - - private def handleDiskFiles(url: ScrayQueryspaceConfigurationURL, config: ScrayConfiguration): - Seq[Try[ScannedQueryspaceConfigfiles]] = { + + private def handleDiskFiles(url: ScrayQueryspaceConfigurationURL, config: ScrayConfiguration): Seq[Try[ScannedQueryspaceConfigfiles]] = { val file = new File(url.url) - if(file.exists()) { - if(file.isDirectory()) { + if (file.exists()) { + if (file.isDirectory()) { logger.info(s"Reading directory ${url.url}, scanning for *" + SCRAY_QUERYSPACE_CONFIG_ENDING + "...") - val files = file.listFiles(new FilenameFilter() { - override def accept(dir: File, name: String): Boolean = name.endsWith(SCRAY_QUERYSPACE_CONFIG_ENDING) }) - files.map { conffile => Try { - val parsedFile = ScrayQueryspaceConfigurationParser.parse(IOUtils.toString(new FileInputStream(conffile)), config, true) - logger.info(READ_FILE + conffile.toString() + WITH_QS + parsedFile.get.name) - ScannedQueryspaceConfigfiles(conffile.toString(), parsedFile.get.name, parsedFile.get.version, parsedFile.get) - }}.toSeq + val files = file.listFiles(new FilenameFilter() { + override def accept(dir: File, name: String): Boolean = name.endsWith(SCRAY_QUERYSPACE_CONFIG_ENDING) + }) + files.map { conffile => + Try { + val parsedFile = ScrayQueryspaceConfigurationParser.parse(IOUtils.toString(new FileInputStream(conffile)), config, true) + logger.info(READ_FILE + conffile.toString() + WITH_QS + parsedFile.get.name) + ScannedQueryspaceConfigfiles(conffile.toString(), parsedFile.get.name, parsedFile.get.version, parsedFile.get) + } + }.toSeq } else { Seq(Try { val parsedFile = ScrayQueryspaceConfigurationParser.parse(IOUtils.toString(new FileInputStream(file)), config, true) - logger.info(READ_FILE + url.url + WITH_QS + parsedFile.get.name) + logger.info(READ_FILE + url.url + WITH_QS + parsedFile.get.name) ScannedQueryspaceConfigfiles(url.url, parsedFile.get.name, parsedFile.get.version, parsedFile.get) }) } @@ -144,21 +158,21 @@ object QueryspaceConfigurationFileHandler extends LazyLogging { val msg = "Could not read queryspace configuration file " + url.url logger.warn(msg) Seq(Failure(new IOException(msg))) - } + } } - + /** * Scan directories and files for Queryspaces. * We ignore reload for now, which is controlled externally. */ def scanDirectories(config: ScrayConfiguration): Seq[Try[ScannedQueryspaceConfigfiles]] = { config.urls.map { url => - if(url.url.substring(0, HDFS_SCHEMA_LENGTH).equalsIgnoreCase(HDFS_SCHEMA)) { + if (url.url.substring(0, HDFS_SCHEMA_LENGTH).equalsIgnoreCase(HDFS_SCHEMA)) { // if the url starts with hdfs we read from HDFS handleHDFS(url, config) } else { // if the URL contains :// we expect this to be a URL to a file containing scray queryspace information - if(url.url.contains(SCHEMA_SEPARATOR)) { + if (url.url.contains(SCHEMA_SEPARATOR)) { handleURL(url, config) } else { // this is either a regular file or a directory...