Kotlin: Subscribe to channel

Subscribe to realtime changes in your database.

Examples

Listen to broadcasts

@Serializable
data class Message(val content: String, val sender: String)

val channel = supabase.channel("channelId") \{
    // optional config
\}

val broadcastFlow = channel.broadcastFlow<Message>(event = "message")

// Collect the flow
broadcastFlow.onEach \{ // it: Message
    println(it)
\}.launchIn(coroutineScope) // launch a new coroutine to collect the flow

channel.subscribe(blockUntilSubscribed = true)

channel.broadcast(event = "message", Message("I joined!", "John"))

Listen to presence updates

@Serializable
data class PresenceState(val username: String)

val channel = supabase.channel("channelId") \{
    //optional config
\}

//if you want more control and want to receive the raw data, use the `presenceChangeFlow` method
val presenceFlow = channel.presenceDataFlow<PresenceState>() 

//Collect the flow
presenceFlow.onEach \{
    for (presence in it) \{
        println(presence.username)
    \}
\}.launchIn(coroutineScope) // launch a new coroutine to collect the flow

channel.subscribe(blockUntilSubscribed = true)
//send your own state
channel.track(PresenceState(username = "John"))

Listen to all database changes

val channel = supabase.channel("channelId") \{
    //optional config
\}
val changeFlow = channel.postgresChangeFlow<PostgresAction>(schema = "public")

//Collect the flow
changeFlow.onEach \{
    when(it) \{
        is PostgresAction.Delete -> println("Deleted: $\{it.oldRecord\}")
        is PostgresAction.Insert -> println("Inserted: $\{it.record\}")
        is PostgresAction.Select -> println("Selected: $\{it.record\}")
        is PostgresAction.Update -> println("Updated: $\{it.oldRecord\} with $\{it.record\}")
    \}
\}.launchIn(coroutineScope) // launch a new coroutine to collect the flow

channel.subscribe()

Listen to a specific table

val channel = supabase.channel("channelId") \{
    //optional config
\}

//This flow will automatically emit the initial data of all messages (which can be restricted by specifying a filter) and then listens for updates.
//The data is cached automatically by the primary key you specify
val users: Flow<List<User>> = channel.postgresListDataFlow(schema = "public", table = "users", primaryKey = User::id)

//If you want more control and want to receive the raw data use the `postgresChangeFlow` method:
val changeFlow = channel.postgresChangeFlow<PostgresAction>(schema = "public") \{
    table = "users"
\}

//Collect the flow
changeFlow.onEach \{
    when(it) \{
        is PostgresAction.Delete -> println("Deleted: $\{it.oldRecord\}")
        is PostgresAction.Insert -> println("Inserted: $\{it.record\}")
        is PostgresAction.Select -> println("Selected: $\{it.record\}")
        is PostgresAction.Update -> println("Updated: $\{it.oldRecord\} with $\{it.record\}")
    \}
\}.launchIn(coroutineScope) // launch a new coroutine to collect the flow

channel.subscribe()

Listen to inserts

val channel = supabase.channel("channelId") \{
    //optional config
\}
val changeFlow = channel.postgresChangeFlow<PostgresAction.Insert>(schema = "public") \{
    table = "users"
\}

//Collect the flow
changeFlow.onEach \{
    println(it.record)
\}.launchIn(coroutineScope) // launch a new coroutine to collect the flow

channel.subscribe()

Listen to updates

val channel = supabase.channel("channelId") \{
    //optional config
\}

val changeFlow = channel.postgresChangeFlow<PostgresAction.Update>(schema = "public") \{
    table = "users"
\}

//Collect the flow
changeFlow.onEach \{
    println(it.record)
    println(it.oldRecord)
\}.launchIn(coroutineScope) // launch a new coroutine to collect the flow

channel.subscribe()

Listen to deletes

val channel = supabase.channel("channelId") \{
    //optional config
\}

val changeFlow = channel.postgresChangeFlow<PostgresAction.Delete>(schema = "public") \{
    table = "users"
\}

//Collect the flow
changeFlow.onEach \{
    println(it.oldRecord)
\}.launchIn(coroutineScope) // launch a new coroutine to collect the flow

channel.subscribe()

Listen to row level changes

val channel = supabase.channel("channelId") \{
   //optional config
\}

//This flow will automatically emit the user's initial data and then emits new values on update. The flow ends on deletion
val user: Flow<User> = channel.postgresSingleDataFlow(schema = "public", table = "users", primaryKey = User::id) \{
    //This is the same filter as used in postgrest, so you could use complex queries, but only one entry is fetched
    eq("id", 1)
\}

//If you want more control, use the `postgresChangeFlow` method which works the same as the other examples
val changeFlow = channel.postgresChangeFlow<PostgresAction.Delete>(schema = "public") \{
    table = "users"
    filter("id", FilterOperator.EQ, 1)
\}

//Collect the flow
changeFlow.onEach \{
    println(it.oldRecord)
\}.launchIn(coroutineScope) // launch a new coroutine to collect the flow

channel.subscribe()