假设你已经知道什么是ZeroMQ(不知道的话可以看这个:http://zh.wikipedia.org/wiki/%C3%98MQ),以下就给出在Clojure中如何使用ZeroMQ(感谢此文作者:http://patternhatch.com/2013/06/12/messaging-using-clojure-and-zeromq/)。
1.? ? 创建一个Clojure项目,这里我们用leiin。lein new app zmq-test
2.? ? 在project.clj文件中添加
[com.rmoquin.bundle/jeromq "0.2.0"]
cheshire "5.3.1"]]
其中jeromq就是我们需要使用的ZeroMQ类库(纯Java实现),cheshire用于双向处理json。
3.? ? 打开core.clj文件,输入如下代码:
(ns zmq-test.core
? (:import [org.jeromq ZMQ])
? (:require (cheshire [core :as c])))
(def ctx (ZMQ/context 1))
;; REQ/REP [Request-Reply] Pattern
;; In REPL, input
;;? (future-call echo-server)
;;? (echo "hi")
;; to run the demo function
(defn echo-server
? []
? (let [s (.socket ctx ZMQ/REP)]
? ? (.bind s "tcp:// 127.0.0.1:5555")
? ? (loop [msg (.recv s)]
? ? ? (.send s msg)
? ? ? (recur (.recv s)))))
(defn echo
? [msg]
? (let [s (.socket ctx ZMQ/REQ)]
? ? (.connect s "tcp:// 127.0.0.1:5555")
? ? (.send s msg)
? ? (println "Server replied:" (String. (.recv s)))
? ? (.close s)))
;; PUB/SUB [Publish-Subscribe] Pattern
;; In REPL, input
;;? (future-call market-data-publisher)
;;? (get-market-data 100)
;; to run the demo function
(defn market-data-publisher
? []
? (let [s (.socket ctx ZMQ/PUB)
? ? ? ? market-data-event (fn []
? ? ? ? ? ? ? ? ? ? ? ? ? ? {:symbol (rand-nth ["CAT" "UTX"])
? ? ? ? ? ? ? ? ? ? ? ? ? ? :size (rand-int 1000)
? ? ? ? ? ? ? ? ? ? ? ? ? ? :price (format "%.2f" (rand 50.0))})]
? ? (.bind s "tcp:// 127.0.0.1:6666")
? ? (while :true
? ? ? (.send s (c/generate-string (market-data-event))))))
(defn get-market-data
? [num-events]
? (let [s (.socket ctx ZMQ/SUB)]
? ? (.subscribe s "")
? ? (.connect s "tcp://127.0.0.1:6666")
? ? (dotimes [_ num-events]
? ? ? (println (c/parse-string (String. (.recv s)))))
? ? (.close s)))
;; PUSH/PULL [Pipeline] Pattern
;; In REPL, input
;;? (future-call collector)
;;? (future-call worker)
;;? (future-call worker)
;;? (future-call worker)
;;? (dispatcher 100)
;; to run the demo function
(defn dispatcher
? [jobs]
? (let [s (.socket ctx ZMQ/PUSH)]
? ? (.bind s "tcp://127.0.0.1:7777")
? ? (Thread/sleep 1000)
? ? (dotimes [n jobs]
? ? ? (.send s (str n)))
? ? (.close s)))
(defn worker
? []
? (let [rcv (.socket ctx ZMQ/PULL)
? ? ? ? snd (.socket ctx ZMQ/PUSH)
? ? ? ? id (str (gensym "w"))]
? ? (.connect rcv "tcp://127.0.0.1:7777")
? ? (.connect snd "tcp://127.0.0.1:8888")
? ? (while :true
? ? ? (let [job-id (String. (.recv rcv))
? ? ? ? ? ? proc-time (rand-int 100)]
? ? ? ? (Thread/sleep proc-time)
? ? ? ? (.send snd (c/generate-string {:worker-id id
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? :job-id job-id
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? :processing-time proc-time}))))))
(defn collector
? []
? (let [s (.socket ctx ZMQ/PULL)]
? ? (.bind s "tcp://127.0.0.1:8888")
? ? (while :true
? ? ? (->> (.recv s)
? ? ? ? ? (String.)
? ? ? ? ? (c/parse-string)
? ? ? ? ? (println "Job completed:")))))
代码中包括了ZeroMQ的三种模式,可以直接在REPL中进行测试。但是这只是很简单的Hello World程序,如果要将ZeroMQ用于实际生产环境中的话,还有很多环节需要考虑和完善。
相关阅读: