全部文档
当前文档

暂无内容

如果没有找到您期望的内容,请尝试其他搜索词

文档中心

Storm简介

最近更新时间:2024-01-16 16:40:55

Storm 是一个实时的、分布式的、可靠的流式数据处理系统。它的工作就是委派各种组件分别独立的处理一些简单任务。在 Storm 集群中处理输入流的是 Spout 组件,而 Spout 又把读取的数据传递给叫Bolt的组件。Bolt组件会对收到的数据元组进行处理,也有可能传递给下一个Bolt。我们可以把 Storm集群想象成一个由Bolt 组件组成的链条集合,数据在这些链条上传输,而Bolt作为链条上的节点来对数据进行处理。

Storm 保证每个消息都会得到处理,而且处理速度非常快,在一个小集群中,每秒可以处理数以百万计的消息。Storm 的处理速度非常惊人:经测试,每个节点每秒可以处理 100 万个数据元组。其主要应用领域有实时分析、在线机器学习、持续计算、分布式 RPC(远过程调用协议,一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。)、ETL(数据抽取、转换和加载)等。

Storm 和 Hadoop 集群表面看上去很类似,但是 Hadoop 上面运行的是 MapReduce Jobs,而在Storm上运行的是拓扑 Topology,这两者之间是非常不一样的,关键区别是:MapReduce 最终会结束,而一个 Topology永远会运行(除非你手动 kill 掉),换句话说,Storm 是面向实时数据分析,而 Hadoop 面向的是离线数据分析。

Storm实践

WordCount程序

1.创建maven工程,导入依赖。

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.0</version>
</dependency>

2.创建数据流入源(spout)。

/**
 * 读取外部的文件,将一行一行的数据发送给下游的bolt
 * 类似于hadoop MapReduce中的inputformat
 */
public class ReadFileSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private BufferedReader bufferedReader;
 
    /**
     * 初始化方法,类似于这个类的构造器,只被运行一次
     * 一般用来打开数据连接,打开网络连接。
     *
     * @param conf      传入的是storm集群的配置文件和用户自定义配置文件,一般不用。
     * @param context   上下文对象,一般不用
     * @param collector 数据输出的收集器,spout类将数据发送给collector,由collector发送给storm框架。
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            bufferedReader = new BufferedReader(new FileReader(new File("//data//test//wordcount.txt")));
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
        this.collector = collector;
    }
    /**
     * 下一个tuple,tuple是数据传送的基本单位。
     * 后台有个while循环一直调用该方法,每调用一次,就发送一个tuple出去
     */
    public void nextTuple() {
        String line = null;
        try {
            line = bufferedReader.readLine();
            if (line!=null){
                collector.emit(Arrays.asList(line));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 声明发出的数据是什么
     *
     * @param declarer
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("juzi"));
    }
}

3.splitBolt进行单词切割。

/**
 * 输入:一行数据
 * 计算:对一行数据进行切割
 * 输出:单词及单词出现的次数
 */
public class SplitBolt extends BaseRichBolt{
    private  OutputCollector collector;
    /**
     * 初始化方法,只被运行一次。
     * @param stormConf 配置文件
     * @param context 上下文对象,一般不用
     * @param collector 数据收集器
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
    }
    /**
     * 执行业务逻辑的方法
     * @param input 获取上游的数据
     */
    @Override
    public void execute(Tuple input) {
        // 获取上游的句子
        String juzi = input.getStringByField("juzi");
        // 对句子进行切割
        String[] words = juzi.split(" ");
        // 发送数据
        for (String word : words) {
            // 需要发送单词及单词出现的次数,共两个字段
            collector.emit(Arrays.asList(word,"1"));
        }
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word","num"));
    }
}

4.单词计数

/**
 * 输入:单词及单词出现的次数
 * 输出:打印在控制台
 * 负责统计每个单词出现的次数
 * 类似于hadoop MapReduce中的reduce函数
 */
public class WordCountBolt extends BaseRichBolt {
    private Map<String, Integer> wordCountMap = new HashMap<String, Integer>();
 
    /**
     * 初始化方法
     *
     * @param stormConf 集群及用户自定义的配置文件
     * @param context   上下文对象
     * @param collector 数据收集器
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // 由于wordcount是最后一个bolt,所有不需要自定义OutputCollector collector,并赋值。
    }
    @Override
    public void execute(Tuple input) {
        //获取单词出现的信息(单词、次数)
        String word = input.getStringByField("word");
        String num = input.getStringByField("num");
        // 定义map记录单词出现的次数
        // 开始计数
        if (wordCountMap.containsKey(word)) {
            Integer integer = wordCountMap.get(word);
            wordCountMap.put(word, integer + Integer.parseInt(num));
        } else {
            wordCountMap.put(word, Integer.parseInt(num));
        }
        // 打印整个map
        System.out.println(wordCountMap);
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // 不发送数据,所以不用实现。
    }
}

5.WordCountTopology类(main方法)

/**
 * wordcount的驱动类,用来提交任务的。
 */
public class WordCountTopology {
    public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        // 通过TopologyBuilder来封装任务信息
        TopologyBuilder topologyBuilder = new TopologyBuilder();
//        设置spout,获取数据
        topologyBuilder.setSpout("readfilespout",new ReadFileSpout(),2);
//        设置splitbolt,对句子进行切割
        topologyBuilder.setBolt("splitbolt",new SplitBolt(),4).shuffleGrouping("readfilespout");
//        设置wordcountbolt,对单词进行统计
        topologyBuilder.setBolt("wordcountBolt",new WordCountBolt(),2).shuffleGrouping("splitbolt");
//        准备一个配置文件
        Config config = new Config();
//        storm中任务提交有两种方式,一种方式是本地模式,另一种是集群模式。
//        LocalCluster localCluster = new LocalCluster();
//        localCluster.submitTopology("wordcount",config,topologyBuilder.createTopology());
        //在storm集群中,worker是用来分配的资源。如果一个程序没有指定worker数,那么就会使用默认值。
        config.setNumWorkers(2);
        //提交到集群
        StormSubmitter.submitTopology("wordcount1",config,topologyBuilder.createTopology());
    }
}

6.指定jdk版本插件

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.6.3</version>
    <configuration>
        <source>1.8</source>
        <target>1.8</target>
    </configuration>
</plugin>

7.用 mvn clean install 命令编译,然后把target目录下生成的 storm jar包,拷到master-1机器上,执行集群管理-编译代码并提交任务。

bin/storm jar storm-samples-jar-with-dependencies.jar com.test.storm.MainTopology clusterStorm

8.在Amabari页面中StormUI里面,点进 clusterStorm 可以查看该拓扑运行的情况。

文档导读
纯净模式常规模式

纯净模式

点击可全屏预览文档内容
文档反馈