Having debated several ways of building a soft-realtime app, I've concluded that websockets are indeed the path of least resistance.
Python provides the admirably ergonomic "websockets" module. It looks extremely simple on the surface, but getting it working was an insane nightmare. Here is the server:
#!/usr/bin/env python3
import asyncio
import logging

import websockets


async def consumer_handler(websocket):
    # Placeholder consumer: it only logs and sleeps, and never reads from the socket.
    while True:
        logging.info("consumer looping")
        await asyncio.sleep(5.0)


async def producer_handler(websocket):
    # Send "foo" to the client once a second.
    while True:
        logging.info("producer looping")
        await websocket.send("foo")
        await asyncio.sleep(1.0)


# Older releases of websockets call the handler with (websocket, path).
async def handler(websocket, path):
    # Run both loops concurrently; when either one finishes, cancel the other.
    consumer_task = asyncio.ensure_future(consumer_handler(websocket))
    producer_task = asyncio.ensure_future(producer_handler(websocket))
    done, pending = await asyncio.wait(
        [consumer_task, producer_task],
        return_when=asyncio.FIRST_COMPLETED,
    )
    for task in pending:
        task.cancel()


logging.basicConfig(level=logging.INFO)
print("Starting server")
start_server = websockets.serve(handler, 'localhost', 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
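The part of the handler that is easiest to get wrong is the asyncio.wait call, so here is a minimal sketch of that pattern on its own, runnable without a websocket. The queue-backed consumer, the run_handler function and the None "close" marker are stand-ins invented for illustration; they are not part of the server above. The sketch also awaits the cancelled task before returning, which is an addition and not something the handler above does.

```python
import asyncio


async def consumer(inbox: asyncio.Queue, received: list) -> None:
    # Stand-in for reading from the socket: stop when the peer "closes" (None).
    while True:
        message = await inbox.get()
        if message is None:
            return
        received.append(message)


async def producer(sent: list) -> None:
    # Stand-in for the periodic websocket.send("foo") loop; it never returns.
    while True:
        sent.append("foo")
        await asyncio.sleep(0.01)


async def run_handler(messages: list) -> dict:
    inbox: asyncio.Queue = asyncio.Queue()
    for message in messages:
        inbox.put_nowait(message)
    inbox.put_nowait(None)  # simulate the connection closing

    received: list = []
    sent: list = []
    consumer_task = asyncio.ensure_future(consumer(inbox, received))
    producer_task = asyncio.ensure_future(producer(sent))

    # Return as soon as either loop finishes, then cancel whatever is left.
    done, pending = await asyncio.wait(
        [consumer_task, producer_task],
        return_when=asyncio.FIRST_COMPLETED,
    )
    for task in pending:
        task.cancel()
    # Give the cancelled tasks a chance to finish unwinding.
    await asyncio.gather(*pending, return_exceptions=True)

    return {
        "received": received,
        "sent": sent,
        "consumer_done": consumer_task in done,
        "producer_cancelled": producer_task.cancelled(),
    }


result = asyncio.run(run_handler(["a", "b"]))
```

The consumer finishes first because its input runs out, so the producer is the task left in pending and is the one that gets cancelled.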
Now we will try doing it in Clojure, using aleph and manifold.
A behaviourally-appropriate implementation would probably have to begin two
futures, each performing a d/loop, as above. Alternatively it may be possible
to fold one of them into the main thread.
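In Python terms, folding one loop into the main thread would roughly correspond to running the consumer loop directly in the handler and starting only the producer as a background task. This is a sketch of that shape under asyncio, not of the Clojure version; the queue, the outbox list and the handle and main functions are hypothetical stand-ins for the socket and its caller.

```python
import asyncio


async def produce_forever(outbox: list) -> None:
    # Background loop: stand-in for sending "foo" on a socket at intervals.
    while True:
        outbox.append("foo")
        await asyncio.sleep(0.01)


async def handle(inbox: asyncio.Queue, outbox: list) -> list:
    # Only the producer gets its own task.
    producer_task = asyncio.create_task(produce_forever(outbox))
    consumed = []
    try:
        # The consuming loop runs inline, in the handler itself.
        while True:
            item = await inbox.get()
            if item is None:  # None marks the end of the input
                break
            consumed.append(item)
    finally:
        # When the inline loop ends, stop the background loop as well.
        producer_task.cancel()
        await asyncio.gather(producer_task, return_exceptions=True)
    return consumed


async def main() -> tuple:
    inbox: asyncio.Queue = asyncio.Queue()
    outbox: list = []
    handler_task = asyncio.create_task(handle(inbox, outbox))
    for item in ("a", "b"):
        await inbox.put(item)
        await asyncio.sleep(0.02)  # let the handler and producer run
    await inbox.put(None)
    consumed = await handler_task
    return consumed, outbox


consumed, outbox = asyncio.run(main())
```

The trade-off is that the handler's lifetime is now tied to the consumer loop alone: if the producer dies early, nothing notices until the consumer finishes.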
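(require '[aleph.http :as http]
         '[manifold.deferred :as d]
         '[manifold.stream :as stream])

;; Stand-in for slow work: blocks the calling thread for one second.
(defn long-function []
  (Thread/sleep 1000))

;; For each message taken from the socket, run long-function on another
;; thread via d/future, then reply with "foo" and loop.
(defn simple-async-ws-handler [req]
  (let [s (-> req http/websocket-connection deref)]
    (d/loop []
      (d/chain (stream/take! s)
               (fn [_]
                 (d/future (long-function)))
               (fn [_] (stream/put! s "foo"))
               (fn [_] (d/recur))))))

(http/start-server simple-async-ws-handler {:port 8765})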
I was able to extend this to the following example that is "full duplex", like the Python example:
;; Take messages until the stream is closed (take! then yields nil).
(defn consume-stream [s]
  (d/loop []
    (-> (stream/take! s)
        (d/chain (fn [item]
                   (println "consumed")
                   (if item
                     (d/recur)
                     (println "end of stream"))))
        (d/catch Exception #(println "did not work: " %)))))

;; Put "foo" after each call to long-function until put! reports failure.
(defn produce-to-stream [s]
  (d/loop []
    (-> (long-function)
        (d/chain (fn [_] (stream/put! s "foo"))
                 (fn [result]
                   (if result
                     (d/recur)
                     (println "stream end"))))
        (d/catch Exception #(println "did not work: " %)))))

;; d/zip combines both loops into one deferred, realised when both finish.
(defn duplex-handler [req]
  (let [s (-> req http/websocket-connection deref)]
    (d/zip
      (consume-stream s)
      (produce-to-stream s))))
This is actually extremely neat, because it doesn't involve any explicit use of futures, which now begin to feel rather "manual". I'm not totally sure that it is doing what I want, but it seems a reasonable bet.
The exception handling is only there to stop exceptions being shunted off
to nowhere when they happen asynchronously. Actually, manifold is good enough
to send them to clojure.tools.logging, which means that they end up in my
*nrepl-server* buffer.
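The same concern applies to the Python version earlier: an exception raised inside a task started with asyncio.ensure_future goes unreported unless something retrieves it, and at best asyncio complains when the task is garbage-collected. One way to surface failures, sketched here on the assumption that logging them is enough, is a done-callback. The names report_failure, failing_producer and errors are made up for this example.

```python
import asyncio
import logging

errors: list = []


def report_failure(task: asyncio.Task) -> None:
    # Done-callback: log the task's exception instead of letting it vanish.
    if task.cancelled():
        return
    exc = task.exception()  # retrieving it also marks it as handled
    if exc is not None:
        logging.error("task failed: %r", exc)
        errors.append(exc)


async def failing_producer() -> None:
    # Stand-in for a producer loop whose send() blows up.
    await asyncio.sleep(0)
    raise RuntimeError("send failed")


async def main() -> None:
    task = asyncio.ensure_future(failing_producer())
    task.add_done_callback(report_failure)
    # asyncio.wait does not re-raise; without the callback the error stays silent.
    await asyncio.wait([task])


asyncio.run(main())
```

Attaching the callback to both consumer_task and producer_task in the handler would give roughly the visibility that manifold's logging provides on the Clojure side.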
The JavaScript client code for this is simple:
document.addEventListener("DOMContentLoaded", e => {
  var exampleSocket = new WebSocket("ws://localhost:8765/");
  console.log(exampleSocket);
  exampleSocket.onopen = e => {
    exampleSocket.send("Here's some text that the server is urgently awaiting!");
  };
  exampleSocket.onmessage = e => {
    console.log("Received data: %o", e.data);
  };
});
You can also connect to it from elisp, using the websocket.el library:
(require 'websocket)

(defun wstest-ws ()
  "Open a websocket to the local server and log incoming frames with `message'."
  (websocket-open
   "ws://127.0.0.1:8765"
   :on-message (lambda (_websocket frame)
                 (message "Message call happened")
                 ;; Pass the text as an argument, not as the format string.
                 (message "%S" (websocket-frame-text frame)))
   :on-open (lambda (websocket)
              (message "Websocket opened")
              (websocket-send-text websocket "Hello, world")))
  'created-socket)