Computer scienceProgramming languagesKotlinAdditional instrumentsAdditional librariesCommunications and Networks

WebSocket

9 minutes read

Let's imagine three applications:

  • a service for selling plane tickets. We need up-to-date information on the number of available tickets and their prices.

  • a stock exchange monitor. In this case, the information is updated non-stop.

  • a chat application in which you want to see the message window updated in real-time.

To exchange data in such applications, we can use WebSocket. In this topic, you will learn what it is and how to take full advantage of its functionalities.

WebSocket

WebSocket is a full-duplex communication protocol. Full-duplex is a connection in which the client and server can send and receive requests independently of each other (as opposed to half-duplex, where the request-response is directed to one of the parties).

As usual, there are two parties: a server waiting for connection and a client that can establish connection. Then, they can send frames to each other. A frame is a message that can contain text or binary data.

WebSocket is built on top of HTTP. Every session is initiated by the client sending a normal HTTP request. On the server side, the connection gets upgraded to WebSocket connection. For us, this means that we can transfer data via a usual HTTP request before the session starts.

WebSocket server

For example, let's create a messaging application. A client sends their name (via HTTP) when connecting. During the session, every text frame from any client is just forwarded to all connected clients by the server. Let's use Ktor for the server side. We need to add it to our build.gradle file:

dependencies {
    implementation("io.ktor:ktor-server-core:2.2.3")
    implementation("io.ktor:ktor-server-netty:2.2.3")
    implementation("io.ktor:ktor-server-websockets:2.2.3")
    // please look up the last version at ktor.io -^^^^^
}

Then, let's create an embedded server and enable WebSocket:

fun main() {
    embeddedServer(Netty, host = "localhost", port = 8080) {
        install(WebSockets)
        routing {

        }
    }.start(true)
}

In Ktor, the WebSocket handler is a suspend function that runs a separate co-routine for each client. We need to store all connected clients to be able to reach them from any client's scope.

val clients = ConcurrentHashMap<WebSocketSession, String>()

suspend fun broadcast(clients: Map<WebSocketSession, *>, text: String) {
    clients.forEach { (client, _) ->
        client.send(Frame.Text(text))
    }
}

Then, inside routing, we'll handle clients connecting to the /chat endpoint.

webSocket("/chat") {
    // oblige everyone to have a name
    val name = call.parameters["name"]?.takeIf(String::isNotBlank)
        ?: return@webSocket close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "no nickname"))

    try {
        clients[this] = name
        broadcast(clients, "$name joined!")

        // for each incoming text frame, send it to everyone
        for (frame in incoming) {
            if (frame is Frame.Text) {
                val text = frame.readText()
                broadcast(clients, "$name: $text")
            }
        }
    } finally {
        // on disconnet, forget client and say bye
        clients.remove(this)
        broadcast(clients, "$name left.")
    }
}

Proof-of-concept on the server side is done! Now let's implement a command-line client.

WebSocket client

On the client side, we're going to use the popular and lightweight OkHttp library:

dependencies {
    implementation("com.squareup.okhttp3:okhttp:4.10.0")
}

Now we need to instantiate a client and create an HTTP request which differs from "normal" HTTP by using ws scheme instead of http, or wss instead of https. Oh, and we need to ask for the username before joining:

fun main() {
    print("What's your name? ")
    val name = readLine() ?: return

    val client = OkHttpClient()
    val request =
        Request.Builder()
            .url("ws://localhost:8080/chat?name=" +
                URLEncoder.encode(name, StandardCharsets.UTF_8)
            )
            .build()

In order to connect, we call the client.newWebSocket(Request, WebSocketlistener) function. WebSocketListener is an abstract class that has several callback functions we can override.

Draft implementation of a messaging client may look like this: just print all incoming text messages, and forward lines received from the terminal to the server.

val ws = client.newWebSocket(
    request,
    object : WebSocketListener() {
        override fun onMessage(webSocket: WebSocket, text: String) {
            println(text)
        }
    }
)

while (true) {
    ws.send(readLine() ?: break)
}

Here's what it looks like from one client's perspective:

What's your name? Jack
Jack joined!
Jill joined!
hello
Jack: hello
Jill: hi bro!
Jill: sorry, gotta go
Jill left.

Okay, this works, but it's not enough! Let's examine the WebSocketListener class.

abstract class WebSocketListener {
  /**
   * Invoked once a web socket has been accepted by the remote peer and may begin transmitting
   * messages.
   */
  open fun onOpen(webSocket: WebSocket, response: Response) {
  }

  /** Invoked when a text (type `0x1`) message has been received. */
  open fun onMessage(webSocket: WebSocket, text: String) {
  }

  /** Invoked when a binary (type `0x2`) message has been received. */
  open fun onMessage(webSocket: WebSocket, bytes: ByteString) {
    // (ByteString is just an immutable ByteArray)
  }

  /**
   * Invoked when the remote peer has indicated that no more incoming messages will be transmitted.
   */
  open fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
  }

  /**
   * Invoked when both peers have indicated that no more messages will be transmitted and the
   * connection has been successfully released. No further calls to this listener will be made.
   */
  open fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
  }

  /**
   * Invoked when a web socket has been closed due to an error reading from or writing to the
   * network. Both outgoing and incoming messages may have been lost. No further calls to this
   * listener will be made.
   */
  open fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
  }
}

What can we tell based on it?

  • We must never send anything to WebSocket before onOpen is called since it's not connected yet.

  • The messaging session ends after onClosed or onFailure, so that's when we should shut the application down.

  • If onClosing is received, we should do the same: close the session and let it shut down. Otherwise, we can get a dangling connection, which is useless but still uses some memory and a TCP socket.

Also, the user should be able to exit the session, for example, by sending EOF (Ctrl+D) to the terminal. We will know it happened when readLine() return null.

To break the connection, there's the close(code: Int, reason: String?) function. code=1000 means normal disconnection, all defined codes are listed in RFC 6455, Section 7.4.1. The reason is optional and can contain an arbitrary short text message.

Bringing all this together, we'll get this:

client.newWebSocket(
    request,
    object : WebSocketListener() {
        override fun onOpen(webSocket: WebSocket, response: Response) {
            // start accepting input in a separate thread (I/O is blocking!)
            thread(isDaemon = true) {
                while (true)
                    webSocket.send(readLine() ?: break)

                // disconnect when EOF received
                webSocket.close(1000, null)
                client.close()
            }
        }
        override fun onMessage(webSocket: WebSocket, text: String) {
            println(text)
        }
        override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
            // You're leaving me? Oh OK, I'm leaving you too!
            webSocket.close(1000, null)
        }
        override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
            println("Connection closed. $code $reason")
            client.close()
        }
        override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
            t.printStackTrace()
            client.close()
        }
    }
)

OkHttp client doesn't have the .close() function on its own. To shut down OkHttp threads (and, in our case, exit the process because there are no alive non-daemon threads left):

private fun OkHttpClient.close() {
    dispatcher.executorService.shutdown()
    connectionPool.evictAll()
}

Conclusion

Here is what you've learned in this topic:

  • WebSocket provides a permanent two-way connection, which allows soft real-time data updates;

  • The abstract class WebSocketListener provides a set of necessary methods for using the connection;

  • It's important to close the connection when not in use, this prevents memory leaks;

  • When developing the server-side of the application, remember that the load on the server increases with each open connection;

  • You can track the events of the created connection, such as onOpen, onFailure, onClosed, onMessage and exchange information inform about the state of the connection, send and receive messages.

19 learners liked this piece of theory. 2 didn't like it. What about you?
Report a typo