最近更新时间:2024-01-16 16:40:55
Storm 是一个实时的、分布式的、可靠的流式数据处理系统。它的工作就是委派各种组件分别独立的处理一些简单任务。在 Storm 集群中处理输入流的是 Spout 组件,而 Spout 又把读取的数据传递给叫Bolt的组件。Bolt组件会对收到的数据元组进行处理,也有可能传递给下一个Bolt。我们可以把 Storm集群想象成一个由Bolt 组件组成的链条集合,数据在这些链条上传输,而Bolt作为链条上的节点来对数据进行处理。
Storm 保证每个消息都会得到处理,而且处理速度非常快,在一个小集群中,每秒可以处理数以百万计的消息。Storm 的处理速度非常惊人:经测试,每个节点每秒可以处理 100 万个数据元组。其主要应用领域有实时分析、在线机器学习、持续计算、分布式 RPC(远过程调用协议,一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。)、ETL(数据抽取、转换和加载)等。
Storm 和 Hadoop 集群表面看上去很类似,但是 Hadoop 上面运行的是 MapReduce Jobs,而在Storm上运行的是拓扑 Topology,这两者之间是非常不一样的,关键区别是:MapReduce 最终会结束,而一个 Topology永远会运行(除非你手动 kill 掉),换句话说,Storm 是面向实时数据分析,而 Hadoop 面向的是离线数据分析。
1.创建maven工程,导入依赖。
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
2.创建数据流入源(spout)。
/**
* 读取外部的文件,将一行一行的数据发送给下游的bolt
* 类似于hadoop MapReduce中的inputformat
*/
public class ReadFileSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private BufferedReader bufferedReader;
/**
* 初始化方法,类似于这个类的构造器,只被运行一次
* 一般用来打开数据连接,打开网络连接。
*
* @param conf 传入的是storm集群的配置文件和用户自定义配置文件,一般不用。
* @param context 上下文对象,一般不用
* @param collector 数据输出的收集器,spout类将数据发送给collector,由collector发送给storm框架。
*/
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
try {
bufferedReader = new BufferedReader(new FileReader(new File("//data//test//wordcount.txt")));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
this.collector = collector;
}
/**
* 下一个tuple,tuple是数据传送的基本单位。
* 后台有个while循环一直调用该方法,每调用一次,就发送一个tuple出去
*/
public void nextTuple() {
String line = null;
try {
line = bufferedReader.readLine();
if (line!=null){
collector.emit(Arrays.asList(line));
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 声明发出的数据是什么
*
* @param declarer
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("juzi"));
}
}
3.splitBolt进行单词切割。
/**
* 输入:一行数据
* 计算:对一行数据进行切割
* 输出:单词及单词出现的次数
*/
public class SplitBolt extends BaseRichBolt{
private OutputCollector collector;
/**
* 初始化方法,只被运行一次。
* @param stormConf 配置文件
* @param context 上下文对象,一般不用
* @param collector 数据收集器
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
/**
* 执行业务逻辑的方法
* @param input 获取上游的数据
*/
@Override
public void execute(Tuple input) {
// 获取上游的句子
String juzi = input.getStringByField("juzi");
// 对句子进行切割
String[] words = juzi.split(" ");
// 发送数据
for (String word : words) {
// 需要发送单词及单词出现的次数,共两个字段
collector.emit(Arrays.asList(word,"1"));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","num"));
}
}
4.单词计数
/**
* 输入:单词及单词出现的次数
* 输出:打印在控制台
* 负责统计每个单词出现的次数
* 类似于hadoop MapReduce中的reduce函数
*/
public class WordCountBolt extends BaseRichBolt {
private Map<String, Integer> wordCountMap = new HashMap<String, Integer>();
/**
* 初始化方法
*
* @param stormConf 集群及用户自定义的配置文件
* @param context 上下文对象
* @param collector 数据收集器
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
// 由于wordcount是最后一个bolt,所有不需要自定义OutputCollector collector,并赋值。
}
@Override
public void execute(Tuple input) {
//获取单词出现的信息(单词、次数)
String word = input.getStringByField("word");
String num = input.getStringByField("num");
// 定义map记录单词出现的次数
// 开始计数
if (wordCountMap.containsKey(word)) {
Integer integer = wordCountMap.get(word);
wordCountMap.put(word, integer + Integer.parseInt(num));
} else {
wordCountMap.put(word, Integer.parseInt(num));
}
// 打印整个map
System.out.println(wordCountMap);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// 不发送数据,所以不用实现。
}
}
5.WordCountTopology类(main方法)
/**
* wordcount的驱动类,用来提交任务的。
*/
public class WordCountTopology {
public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
// 通过TopologyBuilder来封装任务信息
TopologyBuilder topologyBuilder = new TopologyBuilder();
// 设置spout,获取数据
topologyBuilder.setSpout("readfilespout",new ReadFileSpout(),2);
// 设置splitbolt,对句子进行切割
topologyBuilder.setBolt("splitbolt",new SplitBolt(),4).shuffleGrouping("readfilespout");
// 设置wordcountbolt,对单词进行统计
topologyBuilder.setBolt("wordcountBolt",new WordCountBolt(),2).shuffleGrouping("splitbolt");
// 准备一个配置文件
Config config = new Config();
// storm中任务提交有两种方式,一种方式是本地模式,另一种是集群模式。
// LocalCluster localCluster = new LocalCluster();
// localCluster.submitTopology("wordcount",config,topologyBuilder.createTopology());
//在storm集群中,worker是用来分配的资源。如果一个程序没有指定worker数,那么就会使用默认值。
config.setNumWorkers(2);
//提交到集群
StormSubmitter.submitTopology("wordcount1",config,topologyBuilder.createTopology());
}
}
6.指定jdk版本插件
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
7.用 mvn clean install 命令编译,然后把target目录下生成的 storm jar包,拷到master-1机器上,执行集群管理-编译代码并提交任务。
bin/storm jar storm-samples-jar-with-dependencies.jar com.test.storm.MainTopology clusterStorm
8.在Amabari页面中StormUI里面,点进 clusterStorm 可以查看该拓扑运行的情况。
纯净模式