Skip to content

Commit

Permalink
fixup! Support Snowflake (spotify#5500)
Browse files Browse the repository at this point in the history
  • Loading branch information
turb committed Nov 13, 2024
1 parent 0859b24 commit 8b0da46
Showing 1 changed file with 23 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.spotify.scio.snowflake

import scala.util.chaining._
import com.spotify.scio.ScioContext
import com.spotify.scio.coders.{Coder, CoderMaterializer}
import com.spotify.scio.io.{EmptyTap, EmptyTapOf, ScioIO, Tap, TapT}
Expand All @@ -27,32 +28,29 @@ import org.apache.beam.sdk.io.{snowflake => beam}

object SnowflakeIO {

private[snowflake] def dataSourceConfiguration(connectionOptions: SnowflakeConnectionOptions) = {

val datasourceInitial = beam.SnowflakeIO.DataSourceConfiguration
private[snowflake] def dataSourceConfiguration(connectionOptions: SnowflakeConnectionOptions) =
beam.SnowflakeIO.DataSourceConfiguration
.create()

val datasourceWithAuthent = connectionOptions.authenticationOptions match {
case SnowflakeUsernamePasswordAuthenticationOptions(username, password) =>
datasourceInitial.withUsernamePasswordAuth(username, password)
case SnowflakeKeyPairAuthenticationOptions(username, privateKeyPath, None) =>
datasourceInitial.withKeyPairPathAuth(username, privateKeyPath)
case SnowflakeKeyPairAuthenticationOptions(username, privateKeyPath, Some(passphrase)) =>
datasourceInitial.withKeyPairPathAuth(username, privateKeyPath, passphrase)
case SnowflakeOAuthTokenAuthenticationOptions(token) =>
datasourceInitial.withOAuth(token)
}

val datasourceBeforeSchema = datasourceWithAuthent
.withServerName(connectionOptions.serverName)
.withDatabase(connectionOptions.database)
.withRole(connectionOptions.role)
.withWarehouse(connectionOptions.warehouse)

connectionOptions.schema
.map(schema => datasourceBeforeSchema.withSchema(schema))
.getOrElse(datasourceBeforeSchema)
}
.pipe(ds =>
connectionOptions.authenticationOptions match {
case SnowflakeUsernamePasswordAuthenticationOptions(username, password) =>
ds.withUsernamePasswordAuth(username, password)
case SnowflakeKeyPairAuthenticationOptions(username, privateKeyPath, None) =>
ds.withKeyPairPathAuth(username, privateKeyPath)
case SnowflakeKeyPairAuthenticationOptions(username, privateKeyPath, Some(passphrase)) =>
ds.withKeyPairPathAuth(username, privateKeyPath, passphrase)
case SnowflakeOAuthTokenAuthenticationOptions(token) =>
ds.withOAuth(token)
}
)
.pipe(ds =>
ds
.withServerName(connectionOptions.serverName)
.withDatabase(connectionOptions.database)
.withRole(connectionOptions.role)
.withWarehouse(connectionOptions.warehouse)
)
.pipe(ds => connectionOptions.schema.map(schema => ds.withSchema(schema)).getOrElse(ds))

private[snowflake] def buildCsvMapper[T](rowDecoder: RowDecoder[T]): CsvMapper[T] =
new CsvMapper[T] {
Expand Down

0 comments on commit 8b0da46

Please sign in to comment.