Skip to content

Commit

Permalink
Show configuration errors
Browse files Browse the repository at this point in the history
  • Loading branch information
obermeier committed Jun 26, 2017
1 parent a174b00 commit dc21436
Showing 1 changed file with 79 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,80 +14,93 @@
// limitations under the License.
package scray.loader.configparser

import org.apache.hadoop.fs.Path
import scala.util.Try
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import org.apache.commons.io.IOUtils
import java.io.File
import java.io.FileInputStream
import java.io.FilenameFilter
import java.io.IOException
import java.net.URL

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.util.Failure
import scala.collection.mutable.ArrayBuffer
import scala.util.Success
import scala.util.Try

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

import com.typesafe.scalalogging.slf4j.LazyLogging
import java.io.File
import java.io.FilenameFilter
import java.io.IOException
import java.io.FileInputStream

/**
* handles reading scray config files from external sources.
*/
object QueryspaceConfigurationFileHandler extends LazyLogging {

type UpdateCallback = (String, Option[(ScrayQueryspaceConfiguration, Long)]) => Unit

val WITH_QS = " with queryspace "
val READ_FILE = "Read queryspace configuration file "

/**
* reads in all the queryspace configurations and updates the configurations if necessary
*/
def performQueryspaceUpdate(newConfig: ScrayConfiguration,
oldqueryspaces: HashMap[String, (Long, ScrayQueryspaceConfiguration)],
callbacks: Seq[UpdateCallback]): Unit = {
oldqueryspaces: HashMap[String, (Long, ScrayQueryspaceConfiguration)],
callbacks: Seq[UpdateCallback]): Unit = {
val configs = scanDirectories(newConfig)

configs.map(_ match {
case Success(scannedFiles) => registerScanedFiles(scannedFiles, oldqueryspaces, callbacks)
case Failure(e) => logger.error(e.getMessage) // Messages from e are mostly good.
})
}

private def registerScanedFiles(scannedFiles: ScannedQueryspaceConfigfiles, oldqueryspaces: HashMap[String, (Long, ScrayQueryspaceConfiguration)], callbacks: Seq[UpdateCallback]) = {
val knownQS = new ArrayBuffer[String]
configs.filter(_.isSuccess).foreach { scannedFiles =>
knownQS += scannedFiles.get.name
val oldQs = oldqueryspaces.get(scannedFiles.get.name)
if(oldQs.isEmpty) {
// new queryspace registered... notify
callbacks.foreach { _(scannedFiles.get.name, Some((scannedFiles.get.queryspaceConfig, scannedFiles.get.version))) }
oldqueryspaces.update(scannedFiles.get.name, (scannedFiles.get.version, scannedFiles.get.queryspaceConfig))
} else {
// new version? -> notify
if(oldQs.get._1 != scannedFiles.get.version) {
callbacks.foreach { _(scannedFiles.get.name, Some((scannedFiles.get.queryspaceConfig, scannedFiles.get.version))) }
oldqueryspaces.update(scannedFiles.get.name, (scannedFiles.get.version, scannedFiles.get.queryspaceConfig))
}

knownQS += scannedFiles.name
val oldQs = oldqueryspaces.get(scannedFiles.name)
if (oldQs.isEmpty) {
// new queryspace registered... notify
callbacks.foreach { _(scannedFiles.name, Some((scannedFiles.queryspaceConfig, scannedFiles.version))) }
oldqueryspaces.update(scannedFiles.name, (scannedFiles.version, scannedFiles.queryspaceConfig))
} else {
// new version? -> notify
if (oldQs.get._1 != scannedFiles.version) {
callbacks.foreach { _(scannedFiles.name, Some((scannedFiles.queryspaceConfig, scannedFiles.version))) }
oldqueryspaces.update(scannedFiles.name, (scannedFiles.version, scannedFiles.queryspaceConfig))
}
}

// the queryspaces which were there, but aren't any more, must be notified
(oldqueryspaces.keySet -- knownQS).foreach { qs =>
(oldqueryspaces.keySet -- knownQS).foreach { qs =>
callbacks.foreach { _(qs, None) }
oldqueryspaces.remove(qs)
}
}
}

/**
* reads a file into a String directly from HDFS
*/
def readFileFromHDFS(filePath: Path, fs: FileSystem): String = IOUtils.toString(fs.open(filePath))

private def handleHDFS(url: ScrayQueryspaceConfigurationURL, config: ScrayConfiguration):
Seq[Try[ScannedQueryspaceConfigfiles]] = {

private def handleHDFS(url: ScrayQueryspaceConfigurationURL, config: ScrayConfiguration): Seq[Try[ScannedQueryspaceConfigfiles]] = {
try {
val path = new Path(url.url)
val fs = FileSystem.get(new Configuration())
if(fs.getFileStatus(path).isDir()) {
if (fs.getFileStatus(path).isDir()) {
logger.info("Reading directory for HDFS-URL " + url.url + ", scanning for *" + SCRAY_QUERYSPACE_CONFIG_ENDING + "...")
// read complete directory
val stati = fs.listStatus(path)
stati.filter(_.getPath().getName().endsWith(SCRAY_QUERYSPACE_CONFIG_ENDING)).map { status => Try {
val parsedFile = ScrayQueryspaceConfigurationParser.parse(readFileFromHDFS(status.getPath, fs), config, true)
logger.info(READ_FILE + status.getPath + WITH_QS + parsedFile.get.name)
ScannedQueryspaceConfigfiles(status.getPath.toString(), parsedFile.get.name, parsedFile.get.version, parsedFile.get)
}}.toSeq
stati.filter(_.getPath().getName().endsWith(SCRAY_QUERYSPACE_CONFIG_ENDING)).map { status =>
Try {
val parsedFile = ScrayQueryspaceConfigurationParser.parse(readFileFromHDFS(status.getPath, fs), config, true)
logger.info(READ_FILE + status.getPath + WITH_QS + parsedFile.get.name)
ScannedQueryspaceConfigfiles(status.getPath.toString(), parsedFile.get.name, parsedFile.get.version, parsedFile.get)
}
}.toSeq
} else {
Seq(Try {
val parsedFile = ScrayQueryspaceConfigurationParser.parse(readFileFromHDFS(path, fs), config, true)
Expand All @@ -97,68 +110,69 @@ object QueryspaceConfigurationFileHandler extends LazyLogging {
}
} catch {
case e: Exception => Seq(Failure(e))
}
}
}

private def handleURL(url: ScrayQueryspaceConfigurationURL, config: ScrayConfiguration):
Seq[Try[ScannedQueryspaceConfigfiles]] = {

private def handleURL(url: ScrayQueryspaceConfigurationURL, config: ScrayConfiguration): Seq[Try[ScannedQueryspaceConfigfiles]] = {
// we could install an URL-Stream-handler, but this might require command line parameters, so we handle this hard-wired
if(url.url.startsWith(RESOURCE_SCHEMA)) {
if (url.url.startsWith(RESOURCE_SCHEMA)) {
Seq(Try {
val parsedFile = ScrayQueryspaceConfigurationParser.parseResource(url.url.stripPrefix(RESOURCE_SCHEMA), config, true)
logger.warn("###" + parsedFile)
logger.info("Read queryspace configuration from classpath " + url.url + WITH_QS + parsedFile.get.name)
ScannedQueryspaceConfigfiles(url.url, parsedFile.get.name, parsedFile.get.version, parsedFile.get)
logger.info("Read queryspace configuration from classpath " + url.url + WITH_QS + parsedFile.get.name)
ScannedQueryspaceConfigfiles(url.url, parsedFile.get.name, parsedFile.get.version, parsedFile.get)
})
} else {
Seq(Try {
val fileURL = new URL(url.url)
val parsedFile = ScrayQueryspaceConfigurationParser.parse(IOUtils.toString(fileURL.openStream()), config, true)
logger.info(READ_FILE + url.url + WITH_QS + parsedFile.get.name)
logger.info(READ_FILE + url.url + WITH_QS + parsedFile.get.name)
ScannedQueryspaceConfigfiles(url.url, parsedFile.get.name, parsedFile.get.version, parsedFile.get)
})
}
}

private def handleDiskFiles(url: ScrayQueryspaceConfigurationURL, config: ScrayConfiguration):
Seq[Try[ScannedQueryspaceConfigfiles]] = {

private def handleDiskFiles(url: ScrayQueryspaceConfigurationURL, config: ScrayConfiguration): Seq[Try[ScannedQueryspaceConfigfiles]] = {
val file = new File(url.url)
if(file.exists()) {
if(file.isDirectory()) {
if (file.exists()) {
if (file.isDirectory()) {
logger.info(s"Reading directory ${url.url}, scanning for *" + SCRAY_QUERYSPACE_CONFIG_ENDING + "...")
val files = file.listFiles(new FilenameFilter() {
override def accept(dir: File, name: String): Boolean = name.endsWith(SCRAY_QUERYSPACE_CONFIG_ENDING) })
files.map { conffile => Try {
val parsedFile = ScrayQueryspaceConfigurationParser.parse(IOUtils.toString(new FileInputStream(conffile)), config, true)
logger.info(READ_FILE + conffile.toString() + WITH_QS + parsedFile.get.name)
ScannedQueryspaceConfigfiles(conffile.toString(), parsedFile.get.name, parsedFile.get.version, parsedFile.get)
}}.toSeq
val files = file.listFiles(new FilenameFilter() {
override def accept(dir: File, name: String): Boolean = name.endsWith(SCRAY_QUERYSPACE_CONFIG_ENDING)
})
files.map { conffile =>
Try {
val parsedFile = ScrayQueryspaceConfigurationParser.parse(IOUtils.toString(new FileInputStream(conffile)), config, true)
logger.info(READ_FILE + conffile.toString() + WITH_QS + parsedFile.get.name)
ScannedQueryspaceConfigfiles(conffile.toString(), parsedFile.get.name, parsedFile.get.version, parsedFile.get)
}
}.toSeq
} else {
Seq(Try {
val parsedFile = ScrayQueryspaceConfigurationParser.parse(IOUtils.toString(new FileInputStream(file)), config, true)
logger.info(READ_FILE + url.url + WITH_QS + parsedFile.get.name)
logger.info(READ_FILE + url.url + WITH_QS + parsedFile.get.name)
ScannedQueryspaceConfigfiles(url.url, parsedFile.get.name, parsedFile.get.version, parsedFile.get)
})
}
} else {
val msg = "Could not read queryspace configuration file " + url.url
logger.warn(msg)
Seq(Failure(new IOException(msg)))
}
}
}

/**
* Scan directories and files for Queryspaces.
* We ignore reload for now, which is controlled externally.
*/
def scanDirectories(config: ScrayConfiguration): Seq[Try[ScannedQueryspaceConfigfiles]] = {
config.urls.map { url =>
if(url.url.substring(0, HDFS_SCHEMA_LENGTH).equalsIgnoreCase(HDFS_SCHEMA)) {
if (url.url.substring(0, HDFS_SCHEMA_LENGTH).equalsIgnoreCase(HDFS_SCHEMA)) {
// if the url starts with hdfs we read from HDFS
handleHDFS(url, config)
} else {
// if the URL contains :// we expect this to be a URL to a file containing scray queryspace information
if(url.url.contains(SCHEMA_SEPARATOR)) {
if (url.url.contains(SCHEMA_SEPARATOR)) {
handleURL(url, config)
} else {
// this is either a regular file or a directory...
Expand Down

0 comments on commit dc21436

Please sign in to comment.