(def ctx (ZMQ/context 1))
(def msg-list (atom ()))? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ; 消息列表
(def stop-signal (atom false))? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ; 停止发送服务标识
(defn msg-publisher
? []
? (let [s (.socket ctx ZMQ/PUB)]
? ? (.bind s “tcp://x.x.x.x:xxxx”)
? ? (while (false? @stop-signal)? ? ? ? ? ? ? ? ? ? ? ? ? ? ; 遇到停止信号则退出发送循环
? ? ? (loop [msgs @msg-list]? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ; 对消息列表进行循环发送处理
? ? ? ? (if (empty? msgs)
? ? ? ? ? (do
? ? ? ? ? ? (reset! msg-list ())? ? ? ? ? ? ? ? ? ? ? ? ? ? ; 全部发送后清空消息列表
? ? ? ? ? ? (.send s (c/generate-string "0"))? ? ? ? ? ? ? ; 发送结束标识
? ? ? ? ? ? (Thread/sleep 1000)? ? ? ? ? ? ? ? ? ? ? ? ? ? ; 延时1秒后再重新读取,以免发送空数据太频繁
? ? ? ? ? ? )
? ? ? ? ? (do
? ? ? ? ? ? (.send s (c/generate-string (first msgs)))? ? ? ; 发送消息
? ? ? ? ? ? (recur (rest msgs)))? ? ? ? ? ? ? ? ? ? ? ? ? ? ; 发送下一条消息
? ? ? ? ? )))
(.close s)))
通过(future-call msg-publisher)将msg-publisher常驻线程后,msg-publisher会自动读取msg-list列表,将新增加的内容推送给客户端。下面附上测试代码:
(deftest test-msg-publisher
? (do
? ? (let [f (future-call msg-publisher)
? ? ? ? ? s (.socket ctx ZMQ/SUB)]
? ? ? (reset! stop-signal false)
? ? ? f
? ? ? (.subscribe s ZMQ/SUBSCRIPTION_ALL)
? ? ? (.connect s “tcp://x.x.x.x:xxxx”)
? ? ? (reset! msg-list (range 10000))? ? ? ? ? ? ? ? ? ? ? ; 产生消息10000条,但是只接收1000条,这是因为连接延时的问题,
? ? ? (loop [exec-times 1000? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ; 导致不可能将全部消息收全
? ? ? ? ? ? msg-count 0]
? ? ? ? (if (= 0 exec-times)
? ? ? ? ? (is (= 1000 msg-count))
? ? ? ? ? (do
? ? ? ? ? ? (let [msg (c/parse-string (.recvStr s))]
? ? ? ? ? ? ? ;(println msg)
? ? ? ? ? ? ? (if (not (= "0" msg))? ? ? ? ? ? ? ? ? ? ? ? ; 如果为0则表示不是我们希望要的数据
? ? ? ? ? ? ? ? (recur (dec exec-times) (inc msg-count))
? ? ? ? ? ? ? ? (recur (dec exec-times) msg-count)))))
? ? ? ? )
? ? ? (.close s)
? ? ? (reset! stop-signal true)
? ? ? (future-cancel f)
? ? ? (is (future-cancelled? f)))))
运行lein test,如果输出如下就表示运行正常。

相关阅读: