From 123da0849d72156f0d3ecf4ffbaea510d8ee373d Mon Sep 17 00:00:00 2001 From: Jan Tennert Date: Mon, 9 Dec 2024 18:20:42 +0100 Subject: [PATCH] Add pull token approach to realtime (#807) * Add pull token approach to realtime * Update docs --- .../github/jan/supabase/realtime/Realtime.kt | 19 +++++++++++++++++++ .../jan/supabase/realtime/RealtimeChannel.kt | 4 ++-- .../supabase/realtime/RealtimeChannelImpl.kt | 13 +++++++++---- .../jan/supabase/realtime/RealtimeImpl.kt | 9 ++++++--- 4 files changed, 36 insertions(+), 9 deletions(-) diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/Realtime.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/Realtime.kt index 6ea5aa95..774d6bf6 100644 --- a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/Realtime.kt +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/Realtime.kt @@ -4,7 +4,9 @@ import io.github.jan.supabase.SupabaseClient import io.github.jan.supabase.SupabaseClientBuilder import io.github.jan.supabase.SupabaseSerializer import io.github.jan.supabase.annotations.SupabaseInternal +import io.github.jan.supabase.auth.resolveAccessToken import io.github.jan.supabase.logging.SupabaseLogger +import io.github.jan.supabase.logging.w import io.github.jan.supabase.plugins.CustomSerializationConfig import io.github.jan.supabase.plugins.CustomSerializationPlugin import io.github.jan.supabase.plugins.MainConfig @@ -92,6 +94,15 @@ sealed interface Realtime : MainPlugin, CustomSerializationPlug @SupabaseInternal suspend fun send(message: RealtimeMessage) + /** + * Sets the JWT access token used for channel subscription authorization and Realtime RLS. + * + * If [token] is null, the token will be resolved using the [Realtime.Config.accessToken] provider. + * + * @param token The JWT access token + */ + suspend fun setAuth(token: String? = null) + /** * @property websocketConfig Custom configuration for the Ktor Websocket Client. This only applies if [Realtime.Config.websocketFactory] is null. * @property secure Whether to use wss or ws. Defaults to [SupabaseClient.useHTTPS] when null @@ -114,6 +125,14 @@ sealed interface Realtime : MainPlugin, CustomSerializationPlug var disconnectOnNoSubscriptions: Boolean = true, ): MainConfig(), CustomSerializationConfig { + /** + * A custom access token provider. If this is set, the [SupabaseClient] will not be used to resolve the access token. + */ + var accessToken: suspend SupabaseClient.() -> String? = { resolveAccessToken(realtime, keyAsFallback = false) } + set(value) { + logger.w { "You are setting a custom access token provider. This can lead to unexpected behavior." } + field = value + } override var serializer: SupabaseSerializer? = null } diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannel.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannel.kt index 2c461805..60365b51 100644 --- a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannel.kt +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannel.kt @@ -46,9 +46,9 @@ sealed interface RealtimeChannel { suspend fun subscribe(blockUntilSubscribed: Boolean = false) /** - * Updates the JWT token for this client + * Updates the JWT token for this channel */ - suspend fun updateAuth(jwt: String) + suspend fun updateAuth(jwt: String?) /** * Unsubscribes from the channel diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelImpl.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelImpl.kt index 27bdca12..a03d05ce 100644 --- a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelImpl.kt +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelImpl.kt @@ -1,7 +1,6 @@ package io.github.jan.supabase.realtime import io.github.jan.supabase.annotations.SupabaseInternal -import io.github.jan.supabase.auth.resolveAccessToken import io.github.jan.supabase.collections.AtomicMutableList import io.github.jan.supabase.logging.d import io.github.jan.supabase.logging.e @@ -41,7 +40,9 @@ internal class RealtimeChannelImpl( private val _status = MutableStateFlow(RealtimeChannel.Status.UNSUBSCRIBED) override val status = _status.asStateFlow() override val realtime: Realtime = realtimeImpl - + private val accessToken = suspend { + realtimeImpl.config.accessToken(supabaseClient) ?: realtimeImpl.accessToken + } override val supabaseClient = realtimeImpl.supabaseClient private val broadcastUrl = realtimeImpl.broadcastUrl() @@ -59,7 +60,7 @@ internal class RealtimeChannelImpl( } _status.value = RealtimeChannel.Status.SUBSCRIBING Realtime.logger.d { "Subscribing to channel $topic" } - val currentJwt = supabaseClient.resolveAccessToken(realtimeImpl, keyAsFallback = false) + val currentJwt = accessToken() val postgrestChanges = clientChanges.toList() val joinConfig = RealtimeJoinPayload(RealtimeJoinConfig(broadcastJoinConfig, presenceJoinConfig, postgrestChanges, isPrivate)) val joinConfigObject = buildJsonObject { @@ -93,7 +94,7 @@ internal class RealtimeChannelImpl( realtimeImpl.send(RealtimeMessage(topic, RealtimeChannel.CHANNEL_EVENT_LEAVE, buildJsonObject {}, null)) } - override suspend fun updateAuth(jwt: String) { + override suspend fun updateAuth(jwt: String?) { Realtime.logger.d { "Updating auth token for channel $topic" } realtimeImpl.send(RealtimeMessage(topic, RealtimeChannel.CHANNEL_EVENT_ACCESS_TOKEN, buildJsonObject { put("access_token", jwt) @@ -102,12 +103,16 @@ internal class RealtimeChannelImpl( override suspend fun broadcast(event: String, message: JsonObject) { if(status.value != RealtimeChannel.Status.SUBSCRIBED) { + val token = accessToken() val response = httpClient.postJson( url = broadcastUrl, body = BroadcastApiBody(listOf(BroadcastApiMessage(subTopic, event, message, isPrivate))) ) { headers { append("apikey", realtimeImpl.supabaseClient.supabaseKey) + token?.let { + set("Authorization", "Bearer $it") + } } } @Suppress("MagicNumber") diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeImpl.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeImpl.kt index b05f4256..8b578b8f 100644 --- a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeImpl.kt +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeImpl.kt @@ -43,6 +43,7 @@ import kotlinx.serialization.json.buildJsonObject override val subscriptions: Map = _subscriptions private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob()) private val mutex = Mutex() + internal var accessToken by atomic(null) var heartbeatJob: Job? = null var messageJob: Job? = null var ref by atomic(0) @@ -92,7 +93,7 @@ import kotlinx.serialization.json.buildJsonObject supabaseClient.pluginManager.getPluginOrNull(Auth)?.sessionStatus?.collect { if(status.value == Realtime.Status.CONNECTED) { when(it) { - is SessionStatus.Authenticated -> updateJwt(it.session.accessToken) + is SessionStatus.Authenticated -> setAuth(it.session.accessToken) is SessionStatus.NotAuthenticated -> { if(config.disconnectOnSessionLoss) { Realtime.logger.w { "No auth session found, disconnecting from realtime websocket"} @@ -166,9 +167,11 @@ import kotlinx.serialization.json.buildJsonObject } } - private fun updateJwt(jwt: String) { + override suspend fun setAuth(token: String?) { + val newToken = token ?: config.accessToken(supabaseClient) + this.accessToken = newToken scope.launch { - subscriptions.values.filter { it.status.value == RealtimeChannel.Status.SUBSCRIBED }.forEach { it.updateAuth(jwt) } + subscriptions.values.filter { it.status.value == RealtimeChannel.Status.SUBSCRIBED }.forEach { it.updateAuth(accessToken) } } }