Kotlin: Subscribe to channel

Subscribe to realtime changes in your database, as well as broadcast messages and presence updates sent over a channel.

Examples

Connect to Realtime

// Open the Realtime connection; the channel examples below call this before joining a channel.
supabase.realtime.connect()

Listen to broadcasts

// Payload type for the "message" broadcast event used in this example.
@Serializable
data class Message(val content: String, val sender: String)

val channel = supabase.realtime.createChannel("channelId") {
    // optional config
}

// Flow of "message" broadcast events, typed as Message.
val broadcastFlow = channel.broadcastFlow<Message>(event = "message")

// Run in a new coroutine (or use Flow.onEach().launchIn(scope)):
// collect suspends, so the statements after it would otherwise not be reached.
broadcastFlow.collect { message ->
    println(message)
}

supabase.realtime.connect()
// Join with blockUntilJoined = true before sending on the channel.
channel.join(blockUntilJoined = true)

channel.broadcast(event = "message", Message("I joined!", "John"))

Listen to presence updates

// State this client shares with the channel via track(), and the type joins/leaves are decoded as.
@Serializable
data class PresenceState(val username: String)

// Local set of present users, updated by the collector below.
val connectedUsers = mutableSetOf<PresenceState>()
val channel = supabase.realtime.createChannel("channelId") {
    // optional config
}

val presenceChangeFlow = channel.presenceChangeFlow()

// Run in a new coroutine (or use Flow.onEach().launchIn(scope)):
presenceChangeFlow.collect { presenceAction ->
    // Each emission carries the joins and leaves since the previous one.
    connectedUsers += presenceAction.decodeJoinsAs<PresenceState>()
    connectedUsers -= presenceAction.decodeLeavesAs<PresenceState>()
}

supabase.realtime.connect()
channel.join(blockUntilJoined = true)
// Send own state
channel.track(PresenceState(username = "John"))

Listen to all database changes

val channel = supabase.realtime.createChannel("channelId") {
    // optional config
}
// Using the PostgresAction base type subscribes to every kind of change in the "public" schema.
val changeFlow = channel.postgresChangeFlow<PostgresAction>(schema = "public")

// Run in a new coroutine (or use Flow.onEach().launchIn(scope)):
changeFlow.collect { action ->
    // No else branch: every PostgresAction subtype is handled explicitly.
    when (action) {
        is PostgresAction.Delete -> println("Deleted: ${action.oldRecord}")
        is PostgresAction.Insert -> println("Inserted: ${action.record}")
        is PostgresAction.Select -> println("Selected: ${action.record}")
        is PostgresAction.Update -> println("Updated: ${action.oldRecord} with ${action.record}")
    }
}

supabase.realtime.connect()
channel.join()

Listen to a specific table

val channel = supabase.realtime.createChannel("channelId") {
    // optional config
}
// Same as listening to all changes, but restricted to the "users" table.
val changeFlow = channel.postgresChangeFlow<PostgresAction>(schema = "public") {
    table = "users"
}

// Run in a new coroutine (or use Flow.onEach().launchIn(scope)):
changeFlow.collect { action ->
    // No else branch: every PostgresAction subtype is handled explicitly.
    when (action) {
        is PostgresAction.Delete -> println("Deleted: ${action.oldRecord}")
        is PostgresAction.Insert -> println("Inserted: ${action.record}")
        is PostgresAction.Select -> println("Selected: ${action.record}")
        is PostgresAction.Update -> println("Updated: ${action.oldRecord} with ${action.record}")
    }
}

supabase.realtime.connect()
channel.join()

Listen to inserts

val channel = supabase.realtime.createChannel("channelId") {
    // optional config
}
// The type argument narrows the flow to insert events on the "users" table.
val changeFlow = channel.postgresChangeFlow<PostgresAction.Insert>(schema = "public") {
    table = "users"
}

// Run in a new coroutine (or use Flow.onEach().launchIn(scope)):
changeFlow.collect { insert ->
    println(insert.record)
}

supabase.realtime.connect()
channel.join()

Listen to updates

val channel = supabase.realtime.createChannel("channelId") {
    // optional config
}
// The type argument narrows the flow to update events on the "users" table.
val changeFlow = channel.postgresChangeFlow<PostgresAction.Update>(schema = "public") {
    table = "users"
}

// Run in a new coroutine (or use Flow.onEach().launchIn(scope)):
changeFlow.collect { update ->
    // An update exposes both the new and the previous record.
    println(update.record)
    println(update.oldRecord)
}

supabase.realtime.connect()
channel.join()

Listen to deletes

val channel = supabase.realtime.createChannel("channelId") {
    // optional config
}
// The type argument narrows the flow to delete events on the "users" table.
val changeFlow = channel.postgresChangeFlow<PostgresAction.Delete>(schema = "public") {
    table = "users"
}

// Run in a new coroutine (or use Flow.onEach().launchIn(scope)):
changeFlow.collect { delete ->
    println(delete.oldRecord)
}

supabase.realtime.connect()
channel.join()

Listen to row level changes

val channel = supabase.realtime.createChannel("channelId") {
    // optional config
}
// Delete events on the "users" table, limited to rows matching the filter (id equal to 1).
val changeFlow = channel.postgresChangeFlow<PostgresAction.Delete>(schema = "public") {
    table = "users"
    filter = "id=eq.1"
}

// Run in a new coroutine (or use Flow.onEach().launchIn(scope)):
changeFlow.collect { delete ->
    println(delete.oldRecord)
}

supabase.realtime.connect()
channel.join()