您的当前位置:首页正文

Storm学习笔记

来源:图艺博知识网

Storm简介

Storm是什么

  • Storm是Twitter开源的一个分布式的实时计算系统,用于数据的实时分析,持续计算,分布式RPC等等
  • Storm是一个免费开源、分布式、高容错的实时计算系统。Storm令持续不断的流计算变得容易,弥补了Hadoop批处理所不能满足的实时要求。Storm经常用于在实时分析、在线机器学习、持续计算、分布式远程调用和ETL等领域。Storm的部署管理非常简单,而且,在同类的流式计算工具,Storm的性能也是非常出众的

JStorm

  • 阿里的JStorm:

实时计算

  • 扩展了解

  • 实时计算解决的问题

    • 实时推荐系统,其中Hadoop只是做离线的数据分析(海量数据分析),无法做到实时分析计算
    • 车流量实时计算,比如用Storm计算每一个路段的拥挤度等相关路况信息
    • 股票系统
  • 实现实时计算系统

    • 低延迟
    • 高性能
    • 分布式
    • 可扩展
    • 容错
    • 可靠性
    • 快速,系统的设计保证了消息能得到快速的处理,使用ZeroMQ作为其底层消息队列(0.9.0版本以前)
    • 本地模式:Storm有一个本地模式,可以在处理过程中完全模拟Storm集群,可快速进行开发和单元测试

Storm体系结构

Storm与Hadoop的对比

结构 Hadoop Storm
主节点 JobTracker Nimbus
从节点 TaskTracker Supervisor
应用程序 Job Topology
工作进程名称 Child Worker
计算模型 Map / Reduce Spout / Bolt

架构

  • 架构结构图

    img
  • 体系结构

    Storm主要分为两种组件Nimbus和Supervisor。这两种组件都是快速失败的,没有状态。任务状态和心跳信息等都保存在Zookeeper上的,提交的代码资源都在本地机器的硬盘上。

    • Nimbus负责在集群里面发送代码,分配工作给机器,并且监控状态。全局只有一个。

    • Supervisor会监听分配给它那台机器的工作,根据需要启动/关闭工作进程Worker。每一个要运行Storm的机器上都要部署一个,并且,按照机器的配置设定上面分配的槽位数。

    • Zookeeper是Storm重点依赖的外部资源。Nimbus和Supervisor甚至实际运行的Worker都是把心跳保存在Zookeeper上的。Nimbus也是根据Zookeerper上的心跳和任务运行状况,进行调度和任务分配的。

    • Storm提交运行的程序称为Topology。

    • Topology处理的最小的消息单位是一个Tuple,也就是一个任意对象的数组。

    • Topology由Spout和Bolt构成。Spout是发出Tuple的结点。Bolt可以随意订阅某个Spout或者Bolt发出的Tuple。Spout和Bolt都统称为component。

      • 逻辑图(水管是Spout,闪电是Bolt

        img
      • 提交流程图

        img

Storm环境搭建

环境准备

  • 关闭防火墙,修改/etc/hosts配置(3台机器的IP可以互相通信)

  • 配置JDK环境

  • 搭建Zookeeper集群(保证3台机器的Zookeeper都可用)

  • 安装python(最好是2.6.6版本以上)

    • 目前CentOS自带了python

      [root@yann-centos-187 local]# python
      Python 2.6.6 (r266:84292, Feb 22 2013, 00:00:18) 
      [GCC 4.4.7 20120313 (Red Hat 4.4.7-3)] on linux2
      Type "help", "copyright", "credits" or "license" for more information.
      

集群搭建

  • 下载并解压Storm发布版本

    [root@yann-centos-187 software]# tar -zxf apache-storm-0.9.2-incubating.tar.gz -C /usr/local/
    

    配置Storm环境变量

    #set java environment
    JAVA_HOME=/usr/local/jdk1.7.0_25
    JRE_HOME=/usr/local/jdk1.7.0_25/jre
    ZOOKEEPER_HOME=/usr/local/zookeeper-3.4.9
    STORM_HOME=/usr/local/apache-storm-0.9.2-incubating
    CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
    PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$ZOOKEEPER_HOME/bin:$STORM_HOME/bin
    export JAVA_HOME JRE_HOME CLASS_PATH PATH ZOOKEEPER_HOME STORM_HOME
    
  • 修改storm.yaml配置文件

    [root@yann-centos-187 apache-storm-0.9.2-incubating]# cd conf/
    [root@yann-centos-187 conf]# ls
    storm_env.ini  storm.yaml
    [root@yann-centos-187 conf]# vim storm.yaml
    

    修改下面一段

    ########### These MUST be filled in for a storm configuration
     storm.zookeeper.servers:
         - "192.168.1.187"
         - "192.168.1.188"
         - "192.168.1.189"
     
     # 主节点
     nimbus.host: "192.168.1.187"
     # 数据文件夹
     storm.local.dir: "/usr/local/apache-storm-0.9.2-incubating/data"
     ui.port: 18080
     # 配置supervisor的工作进程(slot槽)
     supervisor.slots.ports:
            - 6700
            - 6701
            - 6702
            - 6703
    

    把187机器上的Storm拷贝给其他两台机器

    scp -r apache-storm-0.9.2-incubating/ root@192.168.1.188:/usr/local/
    scp -r apache-storm-0.9.2-incubating/ root@192.168.1.189:/usr/local/
    

    修改其他两台机器的Storm环境变量

    刷新3台机器的环境变量

    [root@yann-centos-187 local]# source /etc/profile
    
  • 启动Strom各个后台进程

    • 后台启动主节点

      [root@yann-centos-187 local]# storm nimbus &
      [1] 5327
      
    • 后台启动从节点

      [root@yann-centos-188 local]# storm supervisor &
      [1] 6007
      
    • 若此时没启动成功,则需要修改/etc/hosts文件

      192.168.1.187 yann-centos-187
      192.168.1.188 yann-centos-188
      192.168.1.189 yann-centos-189
      
    • 使用jps查看进程是否启动(QuorumPeerMain为Zookeeper的进程)

      • 主节点

        [root@yann-centos-187 conf]# jps
        4369 QuorumPeerMain
        5539 nimbus
        5583 Jps
        

        查看日志

        [root@yann-centos-187 logs]# tail -f -n 10  nimbus.log 
        2017-07-10 06:31:49 o.a.z.ClientCnxn [INFO] EventThread shut down
        2017-07-10 06:31:49 o.a.z.ZooKeeper [INFO] Session: 0x5d2859c6280003 closed
        2017-07-10 06:31:49 o.a.c.f.i.CuratorFrameworkImpl [INFO] Starting
        2017-07-10 06:31:49 o.a.z.ZooKeeper [INFO] Initiating client connection, connectString=192.168.1.187:2181,192.168.1.188:2181,192.168.1.189:2181/storm sessionTimeout=20000 watcher=org.apache.curator.ConnectionState@4c9fec
        2017-07-10 06:31:49 o.a.z.ClientCnxn [INFO] Opening socket connection to server yann-centos-187/192.168.1.187:2181. Will not attempt to authenticate using SASL (unknown error)
        2017-07-10 06:31:49 o.a.z.ClientCnxn [INFO] Socket connection established to yann-centos-187/192.168.1.187:2181, initiating session
        2017-07-10 06:31:49 o.a.z.ClientCnxn [INFO] Session establishment complete on server yann-centos-187/192.168.1.187:2181, sessionid = 0x5d2859c6280004, negotiated timeout = 20000
        2017-07-10 06:31:49 o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED
        2017-07-10 06:31:49 o.a.c.f.s.ConnectionStateManager [WARN] There are no ConnectionStateListeners registered.
        2017-07-10 06:31:49 b.s.d.nimbus [INFO] Starting Nimbus server...
        
      • 从节点

        [root@yann-centos-188 local]# jps
        6224 Jps
        6181 supervisor
        2851 QuorumPeerMain
        

        查看日志

        [root@yann-centos-188 logs]# tail -f -n 10 supervisor.log 
        2017-07-10 06:33:16 o.a.z.ClientCnxn [INFO] EventThread shut down
        2017-07-10 06:33:16 o.a.z.ZooKeeper [INFO] Session: 0x15d285afbe70001 closed
        2017-07-10 06:33:16 o.a.c.f.i.CuratorFrameworkImpl [INFO] Starting
        2017-07-10 06:33:16 o.a.z.ZooKeeper [INFO] Initiating client connection, connectString=192.168.1.187:2181,192.168.1.188:2181,192.168.1.189:2181/storm sessionTimeout=20000 watcher=org.apache.curator.ConnectionState@72345f3c
        2017-07-10 06:33:16 o.a.z.ClientCnxn [INFO] Opening socket connection to server yann-centos-188/192.168.1.188:2181. Will not attempt to authenticate using SASL (unknown error)
        2017-07-10 06:33:16 o.a.z.ClientCnxn [INFO] Socket connection established to yann-centos-188/192.168.1.188:2181, initiating session
        2017-07-10 06:33:16 o.a.z.ClientCnxn [INFO] Session establishment complete on server yann-centos-188/192.168.1.188:2181, sessionid = 0x15d285afbe70002, negotiated timeout = 20000
        2017-07-10 06:33:16 o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED
        2017-07-10 06:33:16 o.a.c.f.s.ConnectionStateManager [WARN] There are no ConnectionStateListeners registered.
        2017-07-10 06:33:16 b.s.d.supervisor [INFO] Starting supervisor with id dc312fc6-efb2-4170-be2b-c94a446fa2a4 at host yann-centos-188
        
      • 在主节点的机器上启动UI

        [root@yann-centos-187 local]# storm ui &
        [1] 2940
        
        [root@yann-centos-187 logs]# tail -f -n 10 ui.log 
        2017-07-10 06:41:26 o.m.log [INFO] Logging to Logger[org.mortbay.log] via org.mortbay.log.Slf4jLog
        2017-07-10 06:41:26 o.m.log [INFO] jetty-6.1.26
        2017-07-10 06:41:26 o.m.log [INFO] Started SocketConnector@0.0.0.0:18080
        

Storm Hello World 示例

建立maven工程

  • 引入依赖

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.2-incubating</version>
    </dependency>
    
  • 按如下流程开发java代码(Spout是数据来源,这个图整体就是一个Topology图)

    img
  • 编写数据源类Spout,有如下两种方式

    • 继承BaseRichSpout类
    • 实现IRichSpout接口
    • 几个需要重写或实现的方法
      • open
      • nextTuple
      • declareOutputFields
  • 数据处理类Bolt,有两种方式

    • 继承BaseBasicBolt类
    • 实现IRichBolt接口
    • 几个需要重写或实现的方法
      • Execute
      • declareOutputFields
  • 最后编写主函数Topology去提交一个任务

    • Storm提供两种模式使用Topology
      • 本地模式:无需Storm集群,直接在java中运行,一般用于测试和开发阶段,执行main函数即可
      • 集群模式:需要Storm集群,把实现的java程序打成jar,然后使用Storm命令把Topology提交到集群中

本地模式

  • 直接运行main即可

集群模式

  • 首先需要把可执行代码打成jar

    • 注意jar的编译版本和linux的jdk版本一致
      • 这里需要把3台机器的jdk都改为1.8
    • 为何制定了maven的编译版本为1.7,打出来的包还是1.8点?
  • 把jar拷贝到linux系统中,执行

    [root@yann-centos-187 local]# storm jar archi-storm-0.0.1-SNAPSHOT.jar cn.ares.cocoon.storm.simple.topology.PWTopology1
    
    819  [main] INFO  backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar...
    840  [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar archi-storm-0.0.1-SNAPSHOT.jar to assigned location: /usr/local/apache-storm-0.9.2-incubating/data/nimbus/inbox/stormjar-8d26f835-df7e-45ae-a834-cb8b04cc4f94.jar
    862  [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /usr/local/apache-storm-0.9.2-incubating/data/nimbus/inbox/stormjar-8d26f835-df7e-45ae-a834-cb8b04cc4f94.jar
    862  [main] INFO  backtype.storm.StormSubmitter - Submitting topology top1 in distributed mode with conf {"topology.workers":2,"topology.debug":true}
    1410 [main] INFO  backtype.storm.StormSubmitter - Finished submitting topology: top1
    
  • 然后查看数据

    • top1在运行
    • 5个task
    • 2个worker
    [root@yann-centos-187 local]# storm list
    
    1910 [main] INFO  backtype.storm.thrift - Connecting to Nimbus at 192.168.1.187:6627
    Topology_name        Status     Num_tasks  Num_workers  Uptime_secs
    -------------------------------------------------------------------
    top1                 ACTIVE     5          2            19        
    
  • 在ui上也可以看到这些信息

    3377CC6C-4465-444A-999E-9566A6EB45B0
  • 此时其它两个节点可以看到出现了worker进程

    [root@yann-centos-188 local]# jps
    2700 supervisor
    2645 QuorumPeerMain
    2901 worker
    2911 Jps
    
  • 查看188机器的日志,在不断的处理任务

    [root@yann-centos-188 logs]# tail -f -n 100 worker-6700.log 
    2017-07-13 07:35:31 b.s.d.task [INFO] Emitting: spout default [python]
    2017-07-13 07:35:32 b.s.d.task [INFO] Emitting: spout default [python]
    2017-07-13 07:35:32 b.s.d.task [INFO] Emitting: spout default [groovy]
    2017-07-13 07:35:33 b.s.d.task [INFO] Emitting: spout default [groovy]
    2017-07-13 07:35:33 b.s.d.task [INFO] Emitting: spout default [php]
    2017-07-13 07:35:34 b.s.d.task [INFO] Emitting: spout default [php]
    2017-07-13 07:35:34 b.s.d.task [INFO] Emitting: spout default [php]
    2017-07-13 07:35:35 b.s.d.task [INFO] Emitting: spout default [java]
    2017-07-13 07:35:35 b.s.d.task [INFO] Emitting: spout default [php]
    2017-07-13 07:35:36 b.s.d.task [INFO] Emitting: spout default [groovy]
    2017-07-13 07:35:36 b.s.d.task [INFO] Emitting: spout default [groovy]
    2017-07-13 07:35:37 b.s.d.task [INFO] Emitting: spout default [groovy]
    2017-07-13 07:35:37 b.s.d.task [INFO] Emitting: spout default [python]
    2017-07-13 07:35:38 b.s.d.task [INFO] Emitting: spout default [java]
    2017-07-13 07:35:38 b.s.d.task [INFO] Emitting: spout default [php]
    2017-07-13 07:35:39 b.s.d.task [INFO] Emitting: spout default [ruby]
    2017-07-13 07:35:39 b.s.d.task [INFO] Emitting: spout default [php]
    2017-07-13 07:35:40 b.s.d.task [INFO] Emitting: spout default [ruby]
    2017-07-13 07:35:40 b.s.d.task [INFO] Emitting: spout default [java]
    2017-07-13 07:35:41 b.s.d.task [INFO] Emitting: spout default [ruby]
    2017-07-13 07:35:41 b.s.d.task [INFO] Emitting: spout default [groovy]
    2017-07-13 07:35:42 b.s.d.task [INFO] Emitting: spout default [php]
    2017-07-13 07:35:42 b.s.d.task [INFO] Emitting: spout default [python]
    2017-07-13 07:35:43 b.s.d.task [INFO] Emitting: spout default [python]
    2017-07-13 07:35:43 b.s.d.task [INFO] Emitting: spout default [groovy]
    2017-07-13 07:35:44 b.s.d.task [INFO] Emitting: spout default [java]
    2017-07-13 07:35:44 b.s.d.task [INFO] Emitting: spout default [java]
    
  • 查看189的日志,正在不断的处理数据,并把数据写入文件中,注意189需要建立对应的目录,否则会报错

    [root@yann-centos-189 logs]# tail -f -n 100 worker-6700.log 
    2017-07-13 07:38:28 b.s.d.executor [INFO] Processing received message source: spout:4, stream: default, id: {}, [java]
    2017-07-13 07:38:28 c.a.c.s.b.PrintBolt [INFO] print:java
    2017-07-13 07:38:28 b.s.d.task [INFO] Emitting: print-bolt default [java]
    2017-07-13 07:38:28 b.s.d.executor [INFO] Processing received message source: print-bolt:3, stream: default, id: {}, [java]
    2017-07-13 07:38:28 b.s.d.executor [INFO] Processing received message source: spout:4, stream: default, id: {}, [groovy]
    2017-07-13 07:38:28 c.a.c.s.b.PrintBolt [INFO] print:groovy
    2017-07-13 07:38:28 b.s.d.task [INFO] Emitting: print-bolt default [groovy]
    
  • 查看189上产生的文件

    [root@yann-centos-189 temp]# cat cn.ares.cocoon.storm.bolt.WriteBolt\@2913356b 
    java/njava/njava/ngroovy/nphp/npython/npython/nphp/nphp/njava/npython/njava/nphp/nphp/ngroovy/npython/njava/nruby/npython/nphp/nphp/ngroovy/nphp/njava/njava/njava/ngroovy/nphp/nphp/njava/npython/nruby/njava/npython/nphp/npython/nruby/nphp/ngroovy/njava/ngroovy/njava/ngroovy/njava/ngroovy/njava/ngroovy/npython/nphp/npython/nphp/nphp/ngroovy/nruby/nphp/ngroovy/ngroovy/npython/ngroovy/nphp/nruby/npython/nruby/ngroovy/njava/nruby/nphp/nphp/nruby/nphp/nphp/ngroovy/njava/ngroovy/npython/nphp/nphp/nruby/ngroovy/ngroovy/nphp/nphp/ngroovy/nphp/npython/nruby/nruby/npython/njava/nphp/njava/nruby/njava/nruby/npython/npython/nphp/npython/nruby/ngroovy/nruby/njava/npython/ngroovy/nruby/ngroovy/nruby/nphp/njava/ngroovy/nphp/ngroovy/njava/ngroovy/ngroovy/nruby/ngroovy/nphp/nphp/npython/ngroovy/nruby/npython/npython/npython/ngroovy/nphp/njava/nphp/npython/ngroovy/njava/njava/npython/njava/njava/nruby/ngroovy/nruby/nphp/npython/njava/nphp/nphp/nruby/nphp/nruby/nphp/ngroovy/ngroovy/ngroovy/nruby/npython/ngroovy/npython/ngroovy/ngroovy/ngroovy/ngroovy/nphp/npython/ngroovy/njava/npython/nruby/njava/npython/npython/nruby/nruby/nruby/npython/nruby/npython/npython/njava/nruby/npython/ngroovy/nruby/njava/nphp/npython/nruby/npython/njava/ngroovy/npython/nphp/nphp/nphp/nphp/npython/nphp/nphp/nruby/nruby/npython/njava/njava/npython/npython/npython/ngroovy/nphp/npython/npython/njava/nruby/ngroovy/nphp/njava/njava/npython/npython/njava/ngroovy/nruby/ngroovy/nruby/npython/ngroovy/ngroovy/npython/njava/ngroovy/ngroovy/njava/nphp/n[root@yann-centos-189 temp]# 
    
  • 此时输入storm kill可以停止进程

  • 此时jar会存在如下目录

    [root@yann-centos-187 apache-storm-0.9.2-incubating]# cd data/
    [root@yann-centos-187 data]# ls
    nimbus
    [root@yann-centos-187 data]# cd nimbus/
    [root@yann-centos-187 nimbus]# ls
    inbox  stormdist
    [root@yann-centos-187 nimbus]# cd inbox/
    [root@yann-centos-187 inbox]# ls
    stormjar-04dc559f-9543-4c3d-bf3c-e696c878bf49.jar
    
  • 也可以暂停,此时进程并没有结束,并且worker还存在,再点击Activate则会继续执行

    B3910A48-FC85-4308-A52B-79838447C689

Storm API详解

Storm的组件

Topology

拓扑

  • 拓扑是一个有向图的计算。
  • 创建拓扑步骤如下
    • 构建TopologyBuilder对象
    • 设置Spout数据源对象
    • 设置Bolt数据处理对象
    • 构建Config对象
    • 提交拓扑
  • 使用拓扑的具体流程是使用storm命令把一个jar提交给nimbus节点,然后nimbus会把任务分配给具体的子节点supervisor去工作

Stream grouping

流分组、数据的分发方式

// 分组模式
builder.setBolt("print-bolt", new PrintBolt()).shuffleGrouping("spout");
builder.setBolt("write-bolt", new WriteBolt()).shuffleGrouping("print-bolt");

Spout

喷口、消息源

Bolt

螺栓、处理器

Worker

工作进程

cfg.setNumWorkers(2); // 2个JVM

结构图

Executor

执行器、Task的线程

默认情况下,一个执行器执行一个任务,但是如果指定了任务的数目,则任务会平均分配到执行器中。但是实际使用中,即使设定了多个执行器,也有可能有的执行器被重复使用,有的执行器没有被使用。

Task

具体的执行任务

  • 每个Task有自己独立的执行线程

Configuration

配置

// 配置
Config cfg = new Config();
cfg.setNumWorkers(2); // 2个JVM
cfg.setDebug(true);

Strom的数据结构

tuple

tuple元组

  • tuple是storm的主要,并且是storm中使用的最基本单元、数据模型和元组

tuple描述

  • tuple就是一个值列表,tuple中的值可以是任何类型的,动态类型的tuple的fields可以不用声明,默认情况下,storm中的tuple支持私有类型、字符串、字节数组等作为它的字段值,如果使用其他类型,就需要序列化该类型。
  • tuple的字段默认类型有:integer、float、double、long、short、string、byte、binary
  • tuple可以理解成键值对,例如、创建一个bolt要发送2个字段(命名为double和triple),其中键就是定义declareOutputFields方法中的fields对象,值就是在emit方法中发送的values对象

API使用实例

设置多个执行器和工作进程

public static void main(String[] args) throws Exception {
    // 配置
    Config cfg = new Config();
    cfg.setNumWorkers(2); // 2个JVM
    cfg.setDebug(true);

    // 建立拓扑结构
    TopologyBuilder builder = new TopologyBuilder();
    // 设置两个执行器和2个任务
    builder.setSpout("spout", new PWSpout(), 2);// 默认是.setNumTasks(2)
    // 产生2个执行器和4个任务
    builder.setBolt("print-bolt", new PrintBolt(), 2).shuffleGrouping("spout").setNumTasks(4);
    // 设置6个执行器和6个任务
    builder.setBolt("write-bolt", new WriteBolt(), 6).shuffleGrouping("print-bolt");

//        localModel(cfg, builder);
    clusterModel(cfg, builder);
}
  • 此时我们看本地执行效果,可以看到write-bolt产生了6个文件

    • 可以知道有6个实例去处理写入,有6个线程并发处理结果

    • 其中builder.setSpout("spout", new PWSpout(), 2)效果如下,有2个并行的任务,上面6个线程一样的原理

    • builder.setBolt("x-bolt", new PrintBolt(), 2).shuffleGrouping("spout").setNumTasks(4)效果如下

Storm流分组

  1. Shuffle Grouping :随机分组,尽量均匀分布到下游Bolt中。

    将流分组定义为混排。这种混排分组意味着来自Spout的输入将混排,或随机分发给此Bolt中的任务。shuffle grouping对各个task的tuple分配的比较均匀。

  2. Fields Grouping :按字段分组,按数据中field值进行分组;相同field值的Tuple被发送到相同的Task(但是不同的field不一定就在不同的task中)。

    这种grouping机制保证相同field值的tuple会去同一个task,这对于WordCount来说非常关键,如果同一个单词不去同一个task,那么统计出来的单词次数就不对了。

    “if the stream is grouped by the “user-id” field, tuples with the same “user-id” will always go to the same task”.

  3. All grouping :广播。

    广播发送, 对于每一个tuple将会复制到每一个bolt中处理。

  4. Global grouping :全局分组,Tuple被分配到一个Bolt中的一个Task,实现事务性的Topology。

    Stream中的所有的tuple都会发送给同一个bolt任务处理,所有的tuple将会发送给拥有最小task_id的bolt任务处理。

  5. None grouping :不分组。

    不关注并行处理负载均衡策略时使用该方式,目前等同于shuffle grouping,另外storm将会把bolt任务和他的上游提供数据的任务安排在同一个线程下

  6. Direct grouping :直接分组(指定分组)。

    由tuple的发射单元直接决定tuple将发射给那个bolt,一般情况下是由接收tuple的bolt决定接收哪个bolt发射的Tuple。这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的taskid(OutputCollector.emit方法也会返回taskid)。

  7. 本地分组:如果目标Bolt在同一个工作进程存在一个或多个任务,元祖数据会随机分配给执行任务,否则该分组方式与随机分组方式一样。

Strom WordCount

统计单词的小程序

数据源Sentence Spout获得数据(一个句子,包含多个单词)以后,发送给SplitBolt进行切分,然后由CountBolt进行统计结果,最后由ReportBolt记录结果。

59958F12-82E8-4642-9C02-B4998F4EC372

Storm的数据可靠性

Spout是Storm数据流的入口,在设计拓扑时,一件很重要的事情就是需要考虑消息的可靠性。

  1. 利用Storm可靠性机制(源生的ackfail),可以很容易的提供至少一次的处理(at least once processing):也就是在一个tuple超时或者fail的时候,Storm会调用Spout的fail函数,在这里,我们可以实现一个重发tuple的机制,当然,这种重发一般都建立在消息队列中间件的重发功能上的;

    1. 如果在第一个bolt失败的时候,可以重试;
    2. 如果在第二个bolt失败的时候,重试就会出现事务问题,如果数据入库,则可以和数据库的id进行比对,或者尽量不要拆分tuple;
    // 注意,此处的index放在第二个参数里,而不是放在new Values里,否则不会进行ack和fail
    collector.emit(new Values(sentence[index]), index);
    

    Storm要求如果要track一个Tuple,必须要指定其messageId,也就是回调回ack和fail方法的参数。如果我们不指定,Storm是不会去track该tuple的,即不保证消息丢失!

  2. 可以使用IBatchSpout批量发送,如果有失败的,则一批都会滚;

  3. 可以使用Trident框架,Trident是利用了幂等性进行对比;

acker

Storm有一组叫做acker的特殊任务,它们负责跟踪DAG(有向无环图)中的每个消息

  1. Spout在初始化时会产生一个tasksId;

  2. Spout中创建新的Tuple,其id是一个64位的随机数;

  3. Spout将新建的Tuple发送出去(给出了messageId来开启Tuple的追踪), 同时会发送一个消息到某个acker,要求acker进行追踪。该消息包含两部分:

  4. Spout的taskId:用户acker在整个tuple树被完全处理后找到原始的Spout进行回调ack或fail

  5. 一个64位的ack val值: 标志该tuple是否被完全处理。初始值为0。

  6. 一个Bolt在处理完Tuple后,如果发射了一个新的anchor tuple,Storm会维护anchor tuple的列表;

  7. 该Bolt调用OutputCollector.ack()时,Storm会做如下操作:

    1. 将anchor tuple列表中每个已经ack过的和新创建的Tuple的id做异或(XOR)。假定Spout发出的TupleID是tuple-id-0,该Bolt新生成的TupleID为tuple-id-1,那么,tuple-id-0XORtuple-id-0*XOR *tuple-id-1
    2. Storm根据该原始TupleID进行一致性hash算法,找到最开始Spout发送的那个acker,然后把上面异或后得出的ack val值发送给acker
  8. acker收到新的ack val值后,与保存的原始的Tuple的id进行异或,如果为0,表示该Tuple已被完全处理,则根据其taskId找到原始的Spout,回调其ack()方法。

  9. fail的机制类似,在发现fail后直接回调Spout的fail方法。

Storm就是通过这个acker的机制来保证数据不丢失。

参考文章

Storm DRPC 详解

RPC

参考文章

DRPC

分布式RPC,Distributed RPC。

引入DRPC主要是利用storm的实时计算能力来并行化CPU密集性的计算任务。

DRPC Server

工作过程

Distributed RPC是由一个”DPRC Server”协调的(storm自带了一个实现)。DRPC服务器工作过程如下:

  1. 接收一个RPC请求。
  2. 发送请求到storm topology
  3. 从storm topology接收结果。
  4. 把结果发回给等待的客户端。

工作流程

img

DRPC配置与示例

Storm提供了一个称作LinearDRPCTopologyBuilder的topology builder,它把实现DRPC的几乎所有步骤都简化了

官方示例

实现DRPC

  1. 修改storm配置文件(理论上只需要修改主节点的配置)

    [root@yann-centos-187 conf]# vim storm.yaml
    
    ## Locations of the drpc servers
     drpc.servers:
         - "192.168.1.187"
    
    • 需要注意的是:必须修改所有Nimbus和supervisor上的配置文件,设置drpc server地址。否则在运行过程中可能无法返回结果
  2. 启动storm的各个节点

    [root@yann-centos-187 conf]# storm nimbus &
    [1] 3507
    [root@yann-centos-187 conf]# storm ui &
    [2] 3567
    [root@yann-centos-187 conf]# storm drpc &
    [3] 3600
    [root@yann-centos-187 conf]# jps
    3600 drpc
    3507 nimbus
    3636 Jps
    3483 QuorumPeerMain
    3567 core
    
  3. 编写测试代码BasicDRPCTopology,打成jar,传入服务器,执行这个jar

    [root@yann-centos-187 local]# storm jar archi-storm-0.0.1-SNAPSHOT.jar cn.ares.cocoon.storm.drpc.topology.BasicDRPCTopology drpc-top
    
    • 其中drpc-top,是入参,指定topology的名字
  4. 在UI中查看,已经出现了这个topology

    9E7800CD-1072-4C8D-B7DE-9C9D058FFAEA
  5. 执行客户端代码DemoDRPCClient,结果如下

    /Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/bin/java -Dfile.encoding=UTF-8 -classpath...
    hello-哈哈被处理啦
    world-哈哈被处理啦
    test-哈哈被处理啦
    ha-哈哈被处理啦
    client-哈哈被处理啦
    
    Process finished with exit code 0
    

官方示例demo

  • 疑问
    • 为什么按id分组:根据测试,每次drpc.executeGetTweeters只执行一个,只有一个id。虽然开了4个并发,但是只处理了3个任务。如果开启4个执行器去处理,那么处理的线程是这4个中的,有可能一样,也有可能不同,如果只开启一个线程,那么只有一个线程处理了。

DRPC实际用途

  1. 用一个queue,实时的往queue里塞数据,然后这边实时的检查,queue啥时有数据了,就从queue中take一个数据,然后放到storm中的topology中去执行;
  2. 使用kafka做数据源,用kafka生产数据,然后通过kafka直接提交到topology中,然后就可以汇总一个结果了;

Storm Trident

Trident Function

09007B43-5758-4978-BD3F-9A43D89F1A1D

官方文档

参考文章

Learning Storm - 第5章 - Trident functions

介绍

代码示例

  1. 创建代码TridentFunctionDemo

    • 在Trident中使用BaseFunction代替以前的bolt
    • 拓扑使用TridentTopology
  2. 本地执行,输出结果:

    >>>>>>>>>>>>>>> a - b - c - d is 1 - 4 - 7 - 10
    >>>>>>>>>>>>>>> sum is 5
    >>>>>>>>>>>>>>> a - b - c - d is 1 - 1 - 3 - 11
    >>>>>>>>>>>>>>> sum is 2
    >>>>>>>>>>>>>>> a - b - c - d is 2 - 2 - 7 - 1
    >>>>>>>>>>>>>>> sum is 4
    >>>>>>>>>>>>>>> a - b - c - d is 2 - 5 - 7 - 2
    >>>>>>>>>>>>>>> sum is 7
    

Trident Filters

052E55B1-BBE3-40AD-B8C7-D48FD82C5782

参考文章

Learning Storm - 第5章 - Trident filters

代码示例

  1. 创建处理类TridentFiltersDemo

    • 与function不同的是,fliter在each时不需要返回值
  2. 执行代码,输出结果如下:

    >>>>>>>>>>>>>>>> a - b - c - d is 1 - 1 - 3 - 11
    >>>>>>>>>>>>>>>> a - b - c - d is 2 - 2 - 7 - 1
    

Trident projection

参考文章

Learning Storm - 第5章 - Trident projection

代码练习

==待补充==

Trident repartitioning operations

参考文章

Learning Storm - 第5章 - Trident repartitioning operations

Shuffle

使用随机轮询算法将tuple在目标分区之间均分

Broadcast

广播,每个元组都复制给所有目标分区

PartitionBy

paritionBy函数接收一组字段并根据这组字段做分区,具体是把这些字段做hash并对分区数取模,从而确定每个tuple落在哪个分区,能够保证同样字段的tuple落在同一分区

Global

所有tuple都去向同一个partition

batchGlobal

同一批的所有tuple落在相同分区,不同批的tuple可能去向不同的分区。这个可以保证同一批的事务一致?

partition

通过使用自定义的分区方法确定tuple落在哪个分区,实现backtype.storm.grouping.CustomStreamGrouping接口

Transactional

Batch与Spout

参考文章

Learning Storm - 第5章 - A transactional topology

功能实现

  • 实现ITridentSpout接口:最通用的API,可以支持transaction or opaque transactional语义
  • 实现IBatchSpout接口:一个non-transactional spout
  • 实现IPartitionedTridentSpout接口:一个transactional spout,幂等性的
  • 实现IOpaquePartitionedTridentSpout接口:一个opaque transactional spout

Storm与Kafka

Storm与Redis

Top