zeromq官网用来怎么玩

zeromq中文教程_百度文库
两大类热门资源免费畅读
续费一年阅读会员,立省24元!
zeromq中文教程
上传于||文档简介
&&跨​平​台​轻​量​级​应​用​内​核
阅读已结束,如果下载本文需要使用0下载券
想免费下载更多文档?
下载文档到电脑,查找使用更方便
还剩53页未读,继续阅读
你可能喜欢本文介绍了如何使用ZeroMQ来进行内部服务的RPC调用。HTTP是面向公众服务的标准选择,但在由许多小部分组成的系统内调用内部RPC时,最好使用ZeroMQ代替HTTP。
总结一下,ZeroMQ较HTTP提供的好处如下:
多个并发的RPC调用可以使用同一个TCP连接,而HTTP需要按顺序调用(保持连接的情况下)。这是最大的一个好处。
没有手动连接管理。HTTP也是可以做到的,说到底是库的问题,但据我所知只有个别的库可以这样实现HTTP。
支持多个进程响应请求。不需要HTTP负载平衡器。
没有手动重试处理。停止服务器,调用一个RPC请求,然后启动服务器,然后得到一个响应-消息是队列化的而不是直接传送。
让我们体验一下吧! 一个简单的Clojure教程
Clojure的程序可能无法一眼便了解意图,这里是一个简单的注释。
;; Quick Clojure tutorial, with Java equivalents
Java method call>迭代1:请求和回复,无并发
第一个例子是你永远不会在生产中使用,因为它没有并发。我们对一些短小独立的改善迭代,让他们达到我们预期的效果,这些代码段演示了一些ZeroMQ的重要概念。
我们将创建两个socket。一个REQ(请求)socket。ZeroMQ的socket更具类型有不同的API。如果你打算将两个信息在一组从socket发送,你会得到一个错误。你需要先发送,在等待到答复后你才能再次发送。
另一个socket是一个REP(回复)socket。它有一个类似但相反的API。如果你对REP做的第一件事是发送数据,你会得到一个错误,你必须以等待消息开始,只有当你发送一个消息才可以做其他的。然后你必须等待另一个消息。
这个API使它容易做RPC调用,正如ZeroMQ将为我们处理所有的细节。我们需要做的是等待响应和答复答复回复,发出请求并等待响应。
显然,例子中的一个简单的ZeroMQ的REQ或REP的处理并不会有并发。你一次只能发送一个请求或者处理一个相应。后面我们会看到并发的例子。
;; 非常简单的REQ/REP初始化,只是用于基本的演示
;; 一次一个请求,没有并发。
(let [sock (.socket (ZMQ/context 1) ZMQ/REP)]
(.d sock &tcp://127.0.0.1:1337&)
(while true
; 阻塞住,直到收到消息
(let [req (.recv sock 0)]
; req是一个byte[]类型。你可以对其做一些处理!
; 将req回显给。
(.send sock (.getBytes (str (String. req) & - echoed!&)) ZMQ/NOBLOCK)))))))
(let [sock (.socket (ZMQ/context 1) ZMQ/REQ)]
(.connect sock &tcp://127.0.0.1:1337&)
(dotimes [n 5]
; 处理一次请求
(.send sock (.getBytes (str &Hello, & n)) 0)
; 阻塞住,直到收到响应
(let [res (.recv sock 0)]
; res是byte[]类型,包含REP包的回复内容。
(println (String. res))))))))
从这个小例子中我们可以了解很多东西。
看见没,没有用到代理!再重复一次,ZeroMQ不是传统的消息队列,它是一个库。(但是包含了一些类似消息队列的原语,往下看!)
假定消息是,与 JVM 默认的编码一致。但是对 ZeroMQ 来说都是&byte 类型。
bind/connect的顺序并没有关系。呵呵,如果不是这样的话,我们就不得不确保在连接REQ之前必须绑定好REP,实际上我们没有这样做(可以看到,线程代理的顺序并没有指定)。对于ZeroMQ 的套接字,真实的连接对我们来说是隐藏的。我们发送一条消息时,并没有立即连接,消息被加入REQ套接字上的本地队列,由REQ去决定何时连接。试一下,在bind之前加一行代码(Thread/sleep1000),可以看到代码仍然是工作的。
单个ZeroMQ套接字无法在不同的线程之间并发的使用。实际上,限制跟线程没有关系,而是跟状态有关。上面解释了,我们不可能在REQ上并排发送两次,那样会报错。所以你没法在中使用同一个套接字。不过后面我们会提到,如何来共享一直变化的状态。
迭代2:增加并发应答
到现在为止,我们一直保持请求代码不变,不过我们将替代每次只能进行一个应答的单个线程。
ZeroMQ想到了进程内消息传递,而且这就是我们用来获得并发应答的方法。我们将创建多个线程,并且每个线程对应一个REP套接字,然后使用具有新套接字的新线程把这些线程连接为一个大对象。这时就没有跨线程的状态了,我们在线程间要传递的唯一“状态”就是ZeroMQ消息。
为了获得并发应答,我们创建一个类型为DEALER的套接字。我们把这个套接字与&inproc://“协议捆绑在一起,然后把真正的REP套接字和这个DEALER套接字连接在一起。当DEALER套接字接收到消息的时候,它就把这个消息传递给一个REP套接字,然后看看那些套接字正忙。当一个REP套接字应答的时候,它将给只把消息按原样向前传递DEALER同样的应答。这种考虑到对许多REP套接字前转消息的一个顶级套接字。
在这片的所有例子里,我们将在一个单独的TCP连接上使用多个REQ套接字来实现并发请求。不过,如果你需要支持到我们服务器的多个TCP连接的话,我们当然需要这样的连接,那么我们需要在DEALER前面有一个ROUTER。例如我们可能有多个系统正连接这一个服务,并要求每个服务/进程对应各自的TCP连接。或者我们可能需要连接到服务器来维护RPC调用。
ROUTER将给每个连接到它的套接字一个内部ID号,然后立刻前转附带有这个号码的元数据的消息,包含内部套接字ID。当它获得返回的消息的时候,它立刻前转消息给已经连接的并出现在元数据里的套接字ID对应的REQ套接字。
(defn my-response-handler
&Takes a req (bytes), returns the response (also bytes).&
(.getBytes (str (String. req) & - echoed!&)))
(let [ctx (ZMQ/context 1)
worker-url &inproc://responders&
router-socket (.socket ctx ZMQ/ROUTER)
dealer-socket (.socket ctx ZMQ/DEALER)]
(.bind router-socket &tcp://127.0.0.1:1337&)
(.bind dealer-socket worker-url)
;现在我们可以并发地响应10个请求了
(dotimes [n 10]
(let [sock (.socket ctx ZMQ/REP)]
; 我们应答DEALER
(.connect sock worker-url)
(while true
; 同以前相同的API- 接收消息,然后应答。
(let [req (.recv sock 0)]
(.send sock (my-response-handler req) ZMQ/NOBLOCK))))))))
; 前转来自ROUTER的消息给DEALER或者相反。
(fn [] (.run (ZMQQueue. ctx router-socket dealer-socket)))))) 你可以用前面例子里的REP线程替代这段代码,而且它也可以正常运行。不同之处在于现在我们已经有十个响应请求的线程了。要注意的几个事项:
ZMQQueue是一个内置的便利函数,它可以把所有的消息传递给其他套接字或者相反。在我们的部署里,这意味着来自ROUTER的所有的消息都将发送给DEALER,并且所有来自DEALER的消息将发送给ROUTER,而且不需要我们书写这段代码完成这样的事情。
在ZMQQueue上调用.run将阻塞当前线程,这就是我们让它拥有一个独立线程的原因。
&inproc://&协议的绑定/连接事件的顺序。对inproc来说,顺序如何都是真的;正如我们前面看到的&tcp://&首先要进行连接,然后才绑定。而对inproc来说,你不需要冒关闭或者没有起来等这些风险,而且你控制着inproc的绑定和连接的流,因此与其说顺序产生了重要问题不如说它更不方便。这也意味着连接inproc非常容易-它不需要检查是否连接已经完成,做重试等,所以inproc比TCP有效多了,不仅仅因为它不需要TCP握手,和校验等。
迭代三:增加并发请求
现在我没拥有了应答多个的多线程。是时候用更聪明的且让我们可获得并发的东西替代单个REQ套接字了。
你可能已经猜到,我们只要创建无数REQ套接字就可以了。虽然这个主意很糟糕,因为每个REQ套接字都需要拥有它们自己的TCP连接。我们可以拥有一个可用的REQ套接字池,这样我们每次做请求的时候就不需要花费大力气创建新的TCP连接了。然而ZeroMQ有一个更好的方案,这种情况下它可以处理完们遇到的所有令人讨厌的问题。
我们不能脱离这样一个事实:REQ用一个阻塞的API。我们假设你处在阻塞请求是可行的这种环境里。我们所使用的例子一直是Java HTTPservlet响应者,它需要通过ZeroMQ调用内部服务。我不能确定ZeroMQ是否具有异步API。 好,现在看看真实的代码。
(defn connect
[server-url]
(let [ctx (ZMQ/context 1)
worker-url (str &inproc://& (.util.UUID/randomUUID))
queue-thread (Thread.
(let [client-sock (.socket ctx ZMQ/DEALER)
worker-sock (.socket ctx ZMQ/ROUTER)]
(.connect client-sock server-url)
(.bind worker-sock worker-url)
(.run (ZMQQueue. ctx
client-sock worker-sock)))))]
(.start queue-thread)
:worker-url worker-url
:queue-thread queue-thread}))
(defn disconnect
tests etc. Pass the map returned by `connect` above.&
[connection]
(.interrupt (get connection :queue-thread))
(.term (get connection :ctx)))
(defn with-req-sock
&Takes the connection and a higher order function that is passed a new REQ
socket. When this function returns, the REQ socket is destroyed.&
[connection handler]
(let [socket (.socket (get connection :ctx) ZMQ/REQ)]
(.connect socket (get connection :worker-url))
(handler socket)
(finally (.close socket)))))
(def connection (connect &tcp://127.0.0.1:1337&))
(dotimes [n 5]
(with-req-sock connection
(fn [sock]
(.send sock (.getBytes (str &Hello, & n)) 0)
(let [res (.recv sock 0)]
(println (String. res))))))))) (connectserver-url)创建了一些新连接。我们为我们的进程创建了一个单独的连接,让后在我们需要进行请求的时候把请求传给它。我们对每个请求创建一个新的REQ套接字,然后使用&inproc://&发送消息给ROUTER。ROUTER然后前转消息给DEALER(像以前一样通过ZMQQueue)。DEALER通过TCP连接真正的服务器。 要注意的几个事项:
如果你运行这段代码,那么在你的终端将交叉的插入输出。这是因为请求和应答同时发生。
另外,线程间除了inproc的url外没有其他状态共享,我们真正要做的一切是在它们之间发送ZeroMQ消息。
正如上面已经提到的,我们可以直接对服务器发送REQ,不过这意味着我们需要对每个请求执行完整的TCP连接。在这样的部署里,DEALER是一个单独的TCP连接,其他的所有都是inproc。我们可以结束在同一个TCP连接上运行的并发请求。DEALER不真正关心消息类型,因此它可以发送5个请求,得到2个应答,然后发送2个或者更多请求等等。ROUTER套接字根据内部的套接字ID来关注发送消息给正确的地方。
正如代码所呈现那样,现在我们创建了多个处在独立线程里的REQ套接字,并执行了请求。通常你对每个进入的请求使用(with-req-sock)创建新的REQ。
实现真正的 RPC
迄今为止,你已经学习到的所有东西就是如何使用ZeroMQ来来回回的发送消息。我们已经看到我们能在一个TCP连接上并发地发送任意数量的请求和应答。我们启动一个客户端,然后发送请求,接着我们启动服务器,然后即使在我们没有代理的情况下,一切运行正常。我们不需要管理连接而且也不需要手动地重复多次的处理了。
然而怎样才能真正的实现RPC呢?迄今为止,我们已经发送了字节并显示了这些字节。我们需要有一种调用多个过程的方法,而且不需要过程调用还可以发送数据。这里我们应该做什么才能发送这些有用的东西呢?
让我们效仿一下HTTP!我喜欢这种方法、路径和内容体的语义,因此让我们保留这些。类似于HTTP,我们总是应答,而且有时候是含有错误的应答-不存在“null&应答。由于ZeroMQ系统字节,所以我们使用SMILE做为数据格式,它是类JSON的格式,使用这种格式它能知道如何把映射、列表、集合和字符串编码为后来可以解码的东西 。这就是所谓的“”格式,因此它可以传送原始字节,这是就不存在任何字符串编码的问题了。
下面是我如何执行一个请求来列出我系统中所有的用户的:
(defn do-rpc
([sock method path] (do-rpc sock method path nil))
([sock method path body]
(let [msg (cheshire.core/generate-smile
{:method method :path path :body body})]
(.send sock msg 0)
(let [res (.recv sock 0)]
(cheshire.core/parse-smile res true)))))
(with-req-sock connection
(fn [sock]
(println (do-rpc sock &GET& &/users&)))) 这个以“json”地图形式(包含:键、方法、路径和数据体)发送了一个请求。如果我们现在这样做了,我们就只会得到一个奇怪的返回,因为我们的服务器还只是对请求作简单的默认回应。所以让我们动动手来实施一个实际的RPC请求处理程序吧! 响应请求
由于我们非常类似于HTTP,所以只简单的几步就实现基本的请求了。
我们将把上面的响应处理器实现为读取以SMILE编码的请求而且用以SMILE编码的响应应答的东西。
(defn respond
(if (and (= (get req :method) &GET&) (= (get req :path) &/users&))
{:status 200 :body [{:id 1 :name &August&}]}))
;; 这个函数常常用来回显的。现在我们解析并生成SMILE。
(defn my-response-handler
(let [req (cheshire.core/parse-smile raw-req true)]
(if-let [response (respond req)]
(cheshire.core/generate-smile response)
(cheshire.core/generate-smile {:status 404})))
(catch Exception e
(cheshire.core/generate-smile {:status 500}))))
如果响应返回非空,那么就显示它是一个有效地响应并且使用SMILE编码这个响应。如果响应返回的是空,那么返回状态404。就像HTTP那样,因为为什么不那样呢!如果其中任何一个抛出例外,那么我们将响应状态500。再说明一下,就如同HTTP那样。
虽然这个比回显服务器稍稍让人感兴趣一点,但是如果测试的话,我们就不想进行RPC。我们想应用某种库来建立各自不同且尽可能少的重复请求处理器。
深呼吸。请坐下来。你感觉到舒畅吗?这是多么精彩啊。
使用已有的HTTP路由库
我们的请求非常类似于HTTP,有方法和路径,这就是我们为了通过方法和路径来路由传统的HTTP而确实需要的东西。
因此,为了实现这个目的,我们只使用已有的HTTP 路由库!
我们将使用处理Ring请求的Compojure(译者注:Clojure的MVC框架)。Ring是Clojure中最常用的HTTP栈。Ring有非常简单的API。你在Ring上编写一个函数。这个函数把一个参数,请求,应答当作不可更改的map。这个函数应当返回一个表示应答的map。就这些。
我们需要做的唯一的事情就是转换基于map的ZeroMQ为Compojure可以理解的东西。在Ring上,它被称为:request-method而不仅仅是:method,例如:
(defroutes app
(GET &/users& []
{:status 200 :body [{:id 1 :name &August&}]})
(GET &/users/:user-id& [user-id]
(if (= 1 user-id)
{:status 200 :body {:id 1 :name &August&}})))
(defn respond
(app {:request-method (keyword (.toLowerCase (get req :method)))
:uri (get req :path)
:body (get req :body)})) 语言无法表达这是多么的精彩。仅仅只需几行代码,我们就从根本不是HTTP的东西里调用了HTTP栈。 我们已经实现了!
哇,到现在为止你已经实现了RPC了!或者也许你只是大概浏览一下就直接到达这一部分。无论哪种方式,我都为你而骄傲。
现在我们拥有了一个仅在一个TCP连接上进行多个RPC调用的可靠的方法,并且不需要手工管理连接,而且在服务器关闭的情况下可以对请求排队。我们用一种完善的方式实现了响应各种过程的服务器。这就是我们真正需要的,因此让我们庆祝一下,然后在不需要处理哪些诸如连接管理等令人讨厌的细节的情况下执行远程过程调用。
还有,我认为这篇文章显示了一个数据结构和一百个函数的强大。只是通过创建一个看起来和Ring栈创建的相同的数据结构,我们就能重用已经对我们来说可用的整套HTTP工具。
祝你幸运!
事实上我自己没有这样做过,所以我不会给你展示它的代码。我会把实现细节作为练习留给读者们。毕竟,概念很清楚。既然我们已经看到组装REQ/REP 和DEALER/ROUTER是很容易的,所有需要处理的只是介于REQ客户端和REP服务器之间的一个节点和路由,以及处理对多服务器的请求。或者,只需要把我们已经有的DEALER 绑定到服务器上,然后开一堆REP socket直接连接到它上面。但要注意,那会导致为每一个REP socket打开一个TCP连接。
我只是使用ZeroMQ来作为位于之后的内部服务,所以我并不需要添加任何形式的认证。 我对于认证的建议将是:模仿HTTP。我将会完全允许对服务本身的访问,但是在路由上设置一个前提条件,来检查是否在请求中提供了某种类型的认证令牌。如果请求没有通过认证,就响应一个403状态码或者相关的信息。
在 ZeroMQ 3中提供加密传输的工作也正在进行中,所以如果你在通过一个你不信任的网络传播消息,你不必手动加密你的消息来满足MITM性。
Clojure代码的一些说明
我有这样一个疯狂的想法:我应当让那些一般不熟悉Clojure的人们尽可能的读懂代码。因此我做了一些非习惯性东西。我可能获得这个想法的所有就是疏远那些真正读懂这篇文章的人-Clojure人员。喔,就这样。
我们在做许多Java混合的操作。在我代码里,我喜欢使用doto宏,而不是如下:
let [ctx (ZMQ/context 1)
router-socket (.socket ctx ZMQ/ROUTER)
dealer-socket (.socket ctx ZMQ/DEALER)]
(.bind router-socket &tcp://127.0.0.1:1337&)
(.bind dealer-socket &inproc://responders&)
) 我们应当书写成下面:
(let [ctx (ZMQ/context 1)
router-socket (doto (.socket ctx ZMQ/ROUTER) (.bind &tcp://127.0.0.1:1337&))
dealer-socket (doto (.socket ctx ZMQ/DEALER) (.bind &inproc://responders&))]
) 这意味着我们在这段代码里有很少的名字,而且还创建了ROUTER/DEALER对和队列,如下:
(let [ctx (ZMQ/context 1)
queue (ZMQQueue.
(doto (.socket ctx ZMQ/ROUTER) (.bind &tcp://127.0.0.1:1337&))
(doto (.socket ctx ZMQ/DEALER) (.bind &inproc://responders&)))]
(.start (Thread. (fn [] (.run queue))))) 或者有更少的名字,且内联化整个代码:
(let [ctx (ZMQ/context 1)]
(ZMQQueue.
(doto (.socket ctx ZMQ/ROUTER) (.bind &tcp://127.0.0.1:1337&))
(doto (.socket ctx ZMQ/DEALER) (.bind &inproc://responders&)))
(.run)))))) doto 把Java对象当作第一个参数,其他紧跟着的所有参数都被当作列表(在Clojure里,列表就是函数调用),接着把实例插入做为列表的第二个,也就是函数的第一个参数。因此虽然这段代码看起来像我们对一个字符串调用了bind ,实际上我们调用的是传递给第一个参数的实例-对ctx调用套接字所生成的套接字实例。这便于创建某对象的实例,调用实例的许多可以产生其他结果的方法,然后在不需要为这个实例创建命名变量的情况下返回实例。
我还选择使用(get my-map:a-key)而不是更符合语言习惯的(:a-key my-map) 。这使不熟悉Clojure的人迷惑:关键字还是可以调用的多态函数,如果第一个参数是map,那么这将执行map查找。用Apache Spark进行大数据处理——第三部分:Spark流
来源:InfoQ&
作者:荣耀翻译
  在&用Apache Spark进行处理&系列的前两篇文章中,我们看到了Apache Spark框架是什么(第一部分)还有如何使用Spark SQL库访问数据的SQL接口(第二部分)。
  这些方案是基于批处理模式下静态信息处理的,比如作为一个按小时或天运行的任务。但若是在数据驱动的业务决策场景下,当需要飞快地分析实时数据流以执行分析并创建决策支持时,又该如何呢?
  使用流式数据处理,一旦数据到达计算就会被实时完成,而非作为批处理任务。实时数据处理与分析正在变为大多数组织的大数据战略中至关重要的一个组件。 在本文中,我们将会学习到如何使用Apache Spark中一个被称为 Spark流 的库进行实时数据分析。
  我们将会看到一个网络服务器日志分析用例,该用例会向我们展示Spark流是如何帮助我们对持续产生的数据流进行分析的。
  流数据分析
  流数据基本上是一组连续的数据记录,它们通常产生于诸如传感器、服务器流量与在线搜索等数据源。常见的流数据的例子有网站上的用户行为、监控数据、服务器日志与其他事件数据。
  流数据处理应用会有助于现场面板、实时在线推荐与即时诈骗检测。
  如果我们正在构建一个实时收集、处理与分析流数据的应用,我们需要按照与批处理数据应用不同的设计视角进行考虑。
  下面列出了三种不同的流数据处理框架:
  Apache Samza
  Spark流
  在本文中我们将专注于Spark流。
  Spark流
  Spark流是核心Spark API的扩展。Spark流使得基于实时数据流构建容错性处理变得更加简单。下面的图1展示了Spark流是如何融入到整个Apache Spark生态系统中。
  (点击放大图像)
  图1.具有Spark流库的Spark生态系统
  Spark流工作的方式是将数据流按照预先定义的间隔(N秒)划分为批(称微批次)然后将每批数据视为一个 弹性分布式数据集 (Resilient Distributed Datasets,RDDs)。随后我们就可以使用诸如map、reduce、reduceByKey、join和window这样的操作来处理这些RDDs。这些RDD操作的结果会以批的形式返回。通常我们会将这些结果保存到数据存储中以供未来分析并生成报表与面板,或是发送基于事件的预警。
  为Spark流决定时间间隔是很重要的,这需要基于你的用例与数据处理要求。如果值N太低,那么在分析阶段微批次就没有足够的数据以给出有意义的结果。
  与Spark流相比,其他流处理框架是基于每个事件而非一个微批次来处理数据流的。用微批次的方法,我们可以在同一应用下使用Spark流API来应用其他Spark库(比如核心、机器学习等)。
  流数据可以来源于许多不同的数据源。下面列出一些这样的数据源:
  Twitter
  ZeroMQ
  Amazon's Kinesis
  TCP sockets
  使用诸如Apache Spark这种大数据处理框架的另外一个优势就是我们可以在同一系统中组合批处理与流处理。我们也可以在数据流上应用Spark的机器学习与图处理算法。在本系列的后续文章当中,我们将会讨论被称为 MLlib 和 GraphX 的机器学习与图处理库。
  Spark流结构如下图2所示。
  (点击放大图像)
  图2.Spark流如何工作
  Spark流用例
  Spark流正在变为实现实时数据处理与分析方案的首选平台,这些实时数据往往来源于物联网(Internet of Things,IoT)和传感器。它被用于各种用例与商业应用。
  下面是一些最有趣的 Spark流用例 :
  Uber ,车驾共享服务背后的公司,在他们的持续流式ETL管道中使用了Spark流以每天从其移动用户处收集TB级的事件数据来进行实时遥测分析。
  Pinterest ,可视化书签工具背后的公司,使用Spark流、MemSQL与Apache Kafka技术以实时地深入了解他们全球的用户是怎样使用Pins的。
  Netflix 使用Kafka与Spark流来构建一个实时在线电影推荐与数据监控解决 方案 ,该方案每天要处理来自于不同数据源的数十亿条事件。
  Spark流其他现实世界的样例还包括:
  供应链分析
  实时安全情报操作以寻找威胁
  广告竞价平台
  实时视频分析,以帮助观看者实现个性化与互动体验
  让我们看一下Spark流的架构与API方法。若要编写Spark流程序,我们需要知晓两个组件:DStream与流上下文。
  DStream
  Dstream (离散流,Discretized Stream,的缩写)是Spark流中最基本的抽象,它描述了一个持续的数据流。DStream既可以从诸如Kafka、Flume与Kinesis这样的数据源中创建,也可以对其他DStream实施操作。在内部,一个DStream被描述为一个RDD对象的序列。
  与RDDs上的转换与动作操作类似,DStream支持以下 操作 :
  flatMap
  filter
  reduce
  countByValue
  reduceByKey
  updateStateByKey
  流上下文
  与Spark中的 Spark上下文(SparkContext) 相似, 流上下文(StreamingContext)是所有流功能的主入口。
  流上下文拥有内置方法可以将流数据接收到Spark流程序中。
  使用该上下文,我们可以创建一个描述基于TCP数据源的流数据的DStream,可以用主机名与端口号指定TCP数据源。比如,如果我们使用像netcat这样的工具来测试Spark流程序的话,我们将会从运行netcat的机器(比如localhost)的9999端口上接收到数据流。
  当代码被执行,在启动时,Spark流仅是设置将要执行的计算,此时还没有进行实时处理。在所有的转换都被设置完毕后,为了启动处理,我们最终会调用start()方法来启动计算,还有awaitTermination()方法来等待计算终结。
  Spark流API
  Spark流附带了若干个用于处理数据流的API方法。有类似于RDD的操作,比如map、flatMap、filter、count、reduce、groupByKey、reduceByKey、sortByKey和join。它也提供了其他基于window与stateful操作的处理流数据的API。包括window、countByWindow、reduceByWindow、countByValueAndWindow、reduceByKeyAndWindow和updateStateByKey。Spark流库当前支持Scala、Java和Python编程语言。这里是每个语言对应的Spark流API链接:
  Spark流Scala API
  Java API
  Python API
  Spark编程的步骤
  在我们讨论样例应用之前,先来看看Spark流编程中与众不同的步骤:
  Spark流上下文被用于处理实时数据流。因此,第一步就是用两个参数初始化流上下文对象,Spark上下文和切片间隔时间。切片间隔设置了流中我们处理输入数据的更新窗口。一旦上下文被初始化,就无法再向已经存在的上下文中定义或添加新的计算。并且,在同一时间只有一个流上下文对象可以被激活。
  当Spark流上下文被定义后,我们通过创建输入DStreams来指定输入数据源。在我们的样例应用中,输入数据源是一个使用了Apache Kafka分布式数据库和消息系统的日志消息生成器。日志生成器程序创建随机日志消息以模拟网络服务器的运行时环境,作为各种网络应用服务用户而产生的流量,日志消息被持续不断地生成。
  使用map和reduce这样的Spark流变换API为DStreams定义计算。
  当流计算逻辑被定义好后,我们可以使用先前创建的流上下文对象中的start方法来开始接收并处理数据。
  最终,我们使用流上下文对象的awaitTermination方法等待流数据处理完毕并停止它。
  样例应用
  在本文中我们讨论的样例应用是一个服务器日志处理与分析程序。它可以被用于对服务器日志进行实时监控并执行基于这些日志的数据分析。这些日志消息被认为是 时序数据 ,也就是由在一个指定时间间隔内所捕捉到的连续度量的数据点组成的序列。
  时序数据的例子包括传感器数据、天气信息和点击流数据。时序分析就是处理时序数据以提取有助于制定业务决策的信息。该数据也可以被用于基于历史数据的预测分析。
  使用这样的方案,我们不需要每小时或每天的批处理任务来处理服务器日志。Spark流接收持续产生的数据,对其进行处理并计算日志统计,以此来挖掘数据。
  为了遵循服务器日志分析的标准样例,我们将会使用在Data Bricks Spark流 参考应用 中所讨论的Apache日志分析器作为我们样例应用的参考。该应用已经具备将在我们的应用中被重用的日志消息解析代码。这个参考应用是一个用来学习Spark通用框架以及Spark流的优秀资源。
  点击他们的 网站 ,以查看更多关于Databricks Spark参考应用的细节。
  样例应用的用例是一个网络服务器日志分析与统计的生成器。在样例应用中,我们分析网络服务器日志以计算如下统计信息,这些信息有助于进一步的数据分析和报表及面板的创建:
  不同HTTP响应代码的响应计数
  响应内容大小
  导致最高网络流量的访问客户端的IP地址
  最热门的终端URL以识别那些比其他服务被访问的更多服务
  与本系列的前两篇文章不同,在本文中我们将使用Java而非Scala来创建Spark程序。我们按照独立应用的方式运行程序,而不是在控制台窗口中运行代码。在测试与产品环境中部署Spark程序也如此。Shell控制台接口(使用Scala、Python或R语言)仅仅是用于开发者本地测试而已。
  在样例程序中我们将使用如下的技术来演示如何使用Spark流库处理实时数据流。
  Zookeeper
  Zookeeper 是一个为分布式应用提供可靠分布式协调的集中化的服务。Kafka,我们在样例应用中使用的消息系统,依赖于Zoopkeeper在整个集群中的详细设置。
  Apache Kafka 是一个实时的、容错的、可扩展的消息系统,它用于实时地移动数据。对于诸如捕捉网站上用户活动、日志、股票行情数据以及仪表数据这些用例来说,它是一个很好的选择。
  Kafka的工作方式类似于分布式数据库,它是基于被分区和复制的低延迟提交日志的。当我们将一个消息发送给Kafka,在集群中它会被复制给不同的服务器,与此同时它也会被提交到磁盘。
  Apache Kafka包含客户端API以及一个称为Kafka连接的数据转换器框架。
  Kafka客户端:Kafka包括Java客户端(针对消息生产者与消费者)。在我们的样例应用中我们将会使用Java生产者客户端API。
  Kafka连接:Kafka也包含了 Kafka连接 ,即一个介于Apache Kafka与外部数据系统之间的流数据框架,它可以支持组织内的数据管道。它包含了导入与导出连接器以将数据集移入或移出Kafka。Kafka连接程序可以作为独立进程或分布式服务运行,它支持REST接口的方式,即使用REST API提交连接器到Kafka连接集群。
  Spark流
  我们将会使用Spark流Java API来接收数据流,计算日志统计信息并且运行查询以回答诸如&最多网络请求来自于哪个IP地址&这样的问题。下面的表1展示了样例应用中所使用的技术与工具以及他们的版本。
https://zookeeper.apache.org/doc/r3.4.6/
http://kafka.apache.org/downloads.html
https://spark.apache.org/releases/spark-release-1-4-1.html
/technetwork/java/javase/downloads/jdk7-downloads-1880260.html
http://archive.apache.org/dist/maven/maven-3/3.3.3/
  表1.Spark流样例应用技术及工具
  在图3中演示了Spark流样例应用中不同架构组件。
  (点击放大图像)
  图3.Spark流样例应用架构
  Spark流应用运行时
  为了在本地设置Java项目,可以从Github上下载 Databricks参考应用代码 。一旦获取了参考应用代码,就需要两个额外的Java类来运行我们的样例应用。
  日志生成器(SparkStreamingKafkaLogGenerator.java)
  日志分析器(SparkStreamingKafkaLogAnalyzer.java)
  在文章网站上提供了这些文件的zip压缩包( spark-streaming-kafka-sample-app.zip)。如果你想在你本地机器上运行样例应用,使用链接下载zip文件,抽出Java类并将他们添加到之前步骤中创建的Java项目中。
  样例应用可以被执行在不同的操作系统上。我在Windows和Linux(CentOS VM)环境下都运行了应用。
  让我们看一下应用架构中的每个组件还有执行Spark流程序的步骤。
  Zookeeper命令:
  在样例程序中我使用的Zookeeper版本是3.4.6。为了启动服务器,需要设置两个环境变量,JAVA_HOME与ZOOKEEPER_HOME来指定JDK和Zookeeper各自的安装目录。然后导航到Zookeeper的home目录并运行如下命令来启动Zookeeper服务器。
bin\zkServer.cmd
如果你使用的是Linux环境,命令就是:
bin/zkServer.sh start
  Kafka服务器命令:
  在程序中使用的Kafka版本是2.10-0.9.00,基于Scala2.10版本。在Kafka中所使用的Scala版本是非常重要的,因为若是没有使用恰当的版本的话,当执行Spark流程序时就会遇到运行时错误。这里是启动Kafka服务器实例的步骤:
  打开一个新的命令行窗口
  设置JAVA_HONE与KAFKA_HOME环境变量
  导航到Kafka的home目录
  运行如下命令
bin\windows\kafka-server-start.bat config\server.properties
对于Linux环境,命令如下:
bin/kafka-server-start.sh config/server.properties
  日志生成器命令:
  在我们的样例应用中下一步就是运行消息日志生成器。
  日志生成器以不同的HTTP响应码(诸如200、401和404)及不同的终端URL创建测试日志消息。
  在我们运行日志生成器之前,我们需要创建一个主题(Topic),我们可以将消息写到里面去。
  与之前的步骤类似,打开一个新的命令行窗口,设置JAVA_HOME和KAFKA_HOME环境变量,并且导航到Kafka的home目录。然后首先运行以下命令来查看在Kafka服务器中已经存在的可用主题。
bin\windows\kafka-run-class.bat kafka.admin.TopicCommand --zookeeper localhost:2181 --list
在Linux上:
bin/kafka-run-class.sh kafka.admin.TopicCommand --zookeeper localhost:2181 --list
我们将会用以下命令创建一个叫做&spark-streaming-sample-topic&的新主题:
bin\windows\kafka-run-class.bat kafka.admin.TopicCommand --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --create --topic spark-streaming-sample-topic
在Linux上:
bin/kafka-run-class.sh kafka.admin.TopicCommand --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --create --topic spark-streaming-sample-topic
  你可以再次运行list主题命令以查看是否新主题已经被正确创建。当主题已经被创建好后,我们就可以运行日志生成器程序了。通过调用称为SparkStreamingKafkaLogGenerator的Java类来完成此步骤。日志生成器类接收以下四个参数来指定配置参数:
  组标识:spark-streaming-sample-group
  主题:spark-streaming-sample-topic
  迭代次数:50
  间隔:1000
  打开一个新的命令行窗口来运行日志生成器。我们将要为JDK、Maven和Kafka目录分别设置三个环境变量(JAVA_HOME、MAVEN_HOME和KAFKA_HOME)。然后导航到样例项目根目录(比如c:\dev\projects\spark-streaming-kafka-sample-app)并运行以下命令。
mvn exec:java -Dexec.mainClass=com.sparkstreaming.kafka.example.SparkStreamingKafkaLogGenerator -Dexec.args="spark-streaming-sample-groupid spark-streaming-sample-topic 50 1000"
  一旦日志生成器程序运行起来,就应该在控制台上通过debug消息看到被创建的测试日志消息。这只是个样例代码,所以日志消息被随机地生成以模拟从诸如网络服务器这种事件源生成的持续不断的数据流。下面的图4展示了日志消息生产者还有正在生成的日志消息截屏。
  (点击放大图像)
  图4.Spark流日志生成器程序输出
  Spark流命令:
  这是使用了Spark流API的日志消息消费者。我们使用叫做SparkStreamingKafkaLogAnalyzer的Java类来从Kafka服务器上接收并处理数据流以创建日志统计信息。
  Spark流处理服务器日志消息并生成累计日志统计信息,比如网络请求大小(最小、最大与平均)、响应代码计数、IP地址与热点终端。
  我们用&local[*]&创建Spark上下文,它会在本地系统中检测内核的数量并使用它们运行程序。
  为了运行Spark流Java类,将会在classpath中用到以下JAR文件:
  kafka_2.10-0.9.0.0.jar
  kafka-clients-0.9.0.0.jar
  metrics-core-2.2.0.jar
  spark-streaming-kafka_2.10-1.4.0.jar
  zkclient-0.3.jar
  将上述JAR文件添加到classpath后我用Eclipse IDE运行了程序。日志分析Spark流程序的输出如图5。
  (点击放大图像)
 图5.Spark流日志分析程序输出
  Spark流应用的可视化
  当Spark流程序运行的时候,我们可以检查Spark控制台来查看Spark任务的细节。
  打开一个新的网络浏览器窗口并导航到URL http://localhost:4040 以访问Spark控制台。
  先看看一些展示Spark流程序统计信息的图表。
  第一个可视化就是任务的DAG(无回路有向图,Direct Acyclic Grapg),它展示了我们所运行的程序中不同操作的依赖图,操作有map、window和foreachRDD等。下面的图6展示了我们样例程序中Spark流任务的可视化截屏。
  (点击放大图像)
  图6.Spark流任务的可视化图形
  我们将要看的下一个图形就是包含了输入比率的流统计图,它显示了每秒的事件数量,以及处理所花费的毫秒数。图7展示了Spark流程序执行期间的这些统计信息,左面是流数据还没有产生时的情况,而右边是数据流被发送到Kafka并且被Spark流消费者处理的情况。
  图7.为样例程序展示流统计信息的Spark可视化
  Spark流库 ,Apache Spark生态系统中的一部分,用于实时流数据的数据处理。在本文中,我们学习了如何使用Spark流API来处理由服务器日志生成的数据并基于实时数据流执行分析。
  下一步是什么
  机器学习、预测分析和数据科学在近期都在获得越来越多的关注,他们都是不同用例下的问题解决方案。 Spark MLlib ,Spark机器学习库,提供了若干内置方法以使用诸如协同过滤、聚簇与归类这样的不同机器学习算法。
  在下一篇文章中,我们将会探索Spark MLlib并观察几个用例来演示如何利用Spark的数据科学计算能力,它可以使机器学习算法的使用变得更加简单。
  在本系的后续文章中,我们将看看像 BlinkDB 与 Tachyon 这样的即将到来的框架。
相关新闻 & & &
& (03月31日)
& (03月28日)
& (04月05日)
& (03月30日)
   同意评论声明
   发表
尊重网上道德,遵守中华人民共和国的各项有关法律法规
承担一切因您的行为而直接或间接导致的民事或刑事法律责任
本站管理人员有权保留或删除其管辖留言中的任意内容
本站有权在网站内转载或引用您的评论
参与本评论即表明您已经阅读并接受上述条款}

我要回帖

更多关于 zeromq 安装 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信