From c27d4c1d8b30fddb5419b425de3f2b6cad916d5c Mon Sep 17 00:00:00 2001 From: Stefan Obermeier Date: Wed, 30 Aug 2017 10:55:45 +0200 Subject: [PATCH] Allow conf loading from different sources --- .../scray/cassandra/sync/CassandraImpl.scala | 2 +- .../cassandra/sync/CassandraJobInfo.scala | 2 +- .../cassandra/sync/StartTimeDetector.scala | 2 +- .../cassandra/sync/TransactionTests.scala | 2 +- scray-querying/nohup.out | 3 - .../sync/conf/SyncConfigurationLoader.scala | 87 +++++++++++++++---- .../src/test/resources/scray-sync.yaml | 3 + 7 files changed, 75 insertions(+), 26 deletions(-) delete mode 100644 scray-querying/nohup.out create mode 100644 scray-querying/src/test/resources/scray-sync.yaml diff --git a/scray-cassandra/src/main/scala/scray/cassandra/sync/CassandraImpl.scala b/scray-cassandra/src/main/scala/scray/cassandra/sync/CassandraImpl.scala index 587feea2d..e448c86c4 100644 --- a/scray-cassandra/src/main/scala/scray/cassandra/sync/CassandraImpl.scala +++ b/scray-cassandra/src/main/scala/scray/cassandra/sync/CassandraImpl.scala @@ -85,7 +85,7 @@ class OnlineBatchSyncCassandra(dbSession: DbSession[Statement, Insert, ResultSet // Create or use a given DB session. @transient val session = dbSession - val config: SyncConfiguration = (new SyncConfigurationLoader).loadConfig + val config: SyncConfiguration = SyncConfigurationLoader.loadConfig val syncTable = SyncTable(config.dbSystem, config.tableName) val jobLockTable = JobLockTable(config.dbSystem, "JobLockTable") diff --git a/scray-cassandra/src/main/scala/scray/cassandra/sync/CassandraJobInfo.scala b/scray-cassandra/src/main/scala/scray/cassandra/sync/CassandraJobInfo.scala index d960da994..8a335f0ef 100644 --- a/scray-cassandra/src/main/scala/scray/cassandra/sync/CassandraJobInfo.scala +++ b/scray-cassandra/src/main/scala/scray/cassandra/sync/CassandraJobInfo.scala @@ -41,7 +41,7 @@ class CassandraJobInfo( def getLock(dbSession: DbSession[Statement, Insert, ResultSet]): LockApi[Statement, Insert, ResultSet] = { this.lock = this.lock.orElse { - val configSync: SyncConfiguration = (new SyncConfigurationLoader).loadConfig + val configSync: SyncConfiguration = SyncConfigurationLoader.loadConfig val table = JobLockTable(configSync.dbSystem, "JobSync") dbSession.execute(statementGenerator.createKeyspaceCreationStatement(table, configSync.replicationSetting).get). diff --git a/scray-cassandra/src/main/scala/scray/cassandra/sync/StartTimeDetector.scala b/scray-cassandra/src/main/scala/scray/cassandra/sync/StartTimeDetector.scala index 0d2bc8527..7f55093fa 100644 --- a/scray-cassandra/src/main/scala/scray/cassandra/sync/StartTimeDetector.scala +++ b/scray-cassandra/src/main/scala/scray/cassandra/sync/StartTimeDetector.scala @@ -39,7 +39,7 @@ import scray.querying.sync.conf.SyncConfiguration class StartTimeDetector(job: JobInfo[Statement, Insert, ResultSet], dbSession: DbSession[Statement, Insert, ResultSet]) extends LazyLogging { - val configSync: SyncConfiguration = (new SyncConfigurationLoader).loadConfig + val configSync: SyncConfiguration = SyncConfigurationLoader.loadConfig val startConsensusTable = new Table(configSync.dbSystem, "startconsensus", new StartConsensusRow) var valueAlreadySet = false diff --git a/scray-cassandra/src/test/scala/scray/cassandra/sync/TransactionTests.scala b/scray-cassandra/src/test/scala/scray/cassandra/sync/TransactionTests.scala index 5a73bbcbe..eb96fe2cb 100644 --- a/scray-cassandra/src/test/scala/scray/cassandra/sync/TransactionTests.scala +++ b/scray-cassandra/src/test/scala/scray/cassandra/sync/TransactionTests.scala @@ -28,7 +28,7 @@ class TransactionTests extends WordSpec { "lock job" in { val jobInfo = CassandraJobInfo(getNextJobName) val table = new OnlineBatchSyncCassandra(dbconnection) - table.initJob(jobInfo, new SumTestColumns()) + table.initJob(jobInfo, new SumTestColumns) jobInfo.getLock(dbconnection) assert(true) diff --git a/scray-querying/nohup.out b/scray-querying/nohup.out deleted file mode 100644 index 005ca2818..000000000 --- a/scray-querying/nohup.out +++ /dev/null @@ -1,3 +0,0 @@ -Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=384m; support was removed in 8.0 -NOT SANDBOXED -Vector smash protection is enabled. diff --git a/scray-querying/src/main/scala/scray/querying/sync/conf/SyncConfigurationLoader.scala b/scray-querying/src/main/scala/scray/querying/sync/conf/SyncConfigurationLoader.scala index 6bc4f1ac5..b1d9e5b16 100644 --- a/scray-querying/src/main/scala/scray/querying/sync/conf/SyncConfigurationLoader.scala +++ b/scray-querying/src/main/scala/scray/querying/sync/conf/SyncConfigurationLoader.scala @@ -26,34 +26,83 @@ import org.yaml.snakeyaml.introspector.PropertyUtils; import com.typesafe.scalalogging.slf4j.LazyLogging import com.twitter.chill.config.Config - /** * Load scray sync configuration */ -class SyncConfigurationLoader(path: String = "scray-sync.yaml") extends LazyLogging { - - def loadConfig: SyncConfiguration = { +object SyncConfigurationLoader extends LazyLogging { - val url: URL = (new File(path)).toURI().toURL + val DEFAULT_CONFIGURATION = "scray-sync.yaml"; - val yaml = new Yaml; - val yamlData = yaml.load(url.openStream).asInstanceOf[Map[String, String]] - - val conf = new SyncConfiguration + private def getConfigURL: Option[URL] = { + var configUrl = System.getProperty("cassandra.config"); - - if(yamlData.get("dbSystem") != null) { - conf.dbSystem = yamlData.get("dbSystem") + if (configUrl == null) { + logger.debug(s"Option -Dcassandra.config not foud. Try to use ${DEFAULT_CONFIGURATION}") + configUrl = DEFAULT_CONFIGURATION; } - - if(yamlData.get("tableName") != null) { - conf.tableName = yamlData.get("tableName") + + var url: Option[URL] = None + try { + url = Some(new URL(configUrl)) + url.get.openStream().close(); + } catch { + case e: Exception => { + val loader = SyncConfigurationLoader.getClass.getClassLoader + val resourceURL = loader.getResource(configUrl) + + if (resourceURL == null) { + logger.warn(s"No valid configuration file found in ${configUrl}. Use default values") + url = None + } else { + url = Some(resourceURL) + } + } } + + url + } + + def loadConfig: SyncConfiguration = this.synchronized { + var conf: Option[SyncConfiguration] = None + + getConfigURL.map(url => { + + if (conf.isDefined) { + logger.debug(s"Configuration already loaded. Use existing configuration: ${conf.get}") + } else { + + conf = Some(new SyncConfiguration) + conf.map(confe => { + try { + val yaml = new Yaml; + val yamlData = yaml.load(url.openStream).asInstanceOf[Map[String, String]] + + if (yamlData.get("dbSystem") != null) { + confe.dbSystem = yamlData.get("dbSystem") + } + + if (yamlData.get("tableName") != null) { + confe.tableName = yamlData.get("tableName") + } + + if (yamlData.get("replication") != null) { + confe.replicationSetting = yamlData.get("replication") + } + } catch { + case e: YAMLException => { + logger.error(s"Invalid sync configuration yaml: ${url}, ${e.getMessage}") + conf = None + } + } + }) + } + }) - if(yamlData.get("replication") != null) { - conf.replicationSetting = yamlData.get("replication") + // Use default configuration if no configuration file was provided + if(conf.isDefined) { + conf.get + } else { + new SyncConfiguration } - - conf } } \ No newline at end of file diff --git a/scray-querying/src/test/resources/scray-sync.yaml b/scray-querying/src/test/resources/scray-sync.yaml new file mode 100644 index 000000000..7d8f812b0 --- /dev/null +++ b/scray-querying/src/test/resources/scray-sync.yaml @@ -0,0 +1,3 @@ +dbSystem: scray +tableName : synctable +replication: "{'class': 'NetworkTopologyStrategy', 'DC1': '5', 'DC2': '3', 'DC3': '0'}" \ No newline at end of file