1.创建maven工程,导入依赖。
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
2.创建数据流入源(spout)。
/**
* 读取外部的文件,将一行一行的数据发送给下游的bolt
* 类似于hadoop MapReduce中的inputformat
*/
public class ReadFileSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private BufferedReader bufferedReader;
/**
* 初始化方法,类似于这个类的构造器,只被运行一次
* 一般用来打开数据连接,打开网络连接。
*
* @param conf 传入的是storm集群的配置文件和用户自定义配置文件,一般不用。
* @param context 上下文对象,一般不用
* @param collector 数据输出的收集器,spout类将数据发送给collector,由collector发送给storm框架。
*/
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
try {
bufferedReader = new BufferedReader(new FileReader(new File("//data//test//wordcount.txt")));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
this.collector = collector;
}
/**
* 下一个tuple,tuple是数据传送的基本单位。
* 后台有个while循环一直调用该方法,每调用一次,就发送一个tuple出去
*/
public void nextTuple() {
String line = null;
try {
line = bufferedReader.readLine();
if (line!=null){
collector.emit(Arrays.asList(line));
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 声明发出的数据是什么
*
* @param declarer
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("juzi"));
}
}
3.splitBolt进行单词切割。
/**
* 输入:一行数据
* 计算:对一行数据进行切割
* 输出:单词及单词出现的次数
*/
public class SplitBolt extends BaseRichBolt{
private OutputCollector collector;
/**
* 初始化方法,只被运行一次。
* @param stormConf 配置文件
* @param context 上下文对象,一般不用
* @param collector 数据收集器
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
/**
* 执行业务逻辑的方法
* @param input 获取上游的数据
*/
@Override
public void execute(Tuple input) {
// 获取上游的句子
String juzi = input.getStringByField("juzi");
// 对句子进行切割
String[] words = juzi.split(" ");
// 发送数据
for (String word : words) {
// 需要发送单词及单词出现的次数,共两个字段
collector.emit(Arrays.asList(word,"1"));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","num"));
}
}
4.单词计数
/**
* 输入:单词及单词出现的次数
* 输出:打印在控制台
* 负责统计每个单词出现的次数
* 类似于hadoop MapReduce中的reduce函数
*/
public class WordCountBolt extends BaseRichBolt {
private Map<String, Integer> wordCountMap = new HashMap<String, Integer>();
/**
* 初始化方法
*
* @param stormConf 集群及用户自定义的配置文件
* @param context 上下文对象
* @param collector 数据收集器
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
// 由于wordcount是最后一个bolt,所有不需要自定义OutputCollector collector,并赋值。
}
@Override
public void execute(Tuple input) {
//获取单词出现的信息(单词、次数)
String word = input.getStringByField("word");
String num = input.getStringByField("num");
// 定义map记录单词出现的次数
// 开始计数
if (wordCountMap.containsKey(word)) {
Integer integer = wordCountMap.get(word);
wordCountMap.put(word, integer + Integer.parseInt(num));
} else {
wordCountMap.put(word, Integer.parseInt(num));
}
// 打印整个map
System.out.println(wordCountMap);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// 不发送数据,所以不用实现。
}
}
5.WordCountTopology类(main方法)
/**
* wordcount的驱动类,用来提交任务的。
*/
public class WordCountTopology {
public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
// 通过TopologyBuilder来封装任务信息
TopologyBuilder topologyBuilder = new TopologyBuilder();
// 设置spout,获取数据
topologyBuilder.setSpout("readfilespout",new ReadFileSpout(),2);
// 设置splitbolt,对句子进行切割
topologyBuilder.setBolt("splitbolt",new SplitBolt(),4).shuffleGrouping("readfilespout");
// 设置wordcountbolt,对单词进行统计
topologyBuilder.setBolt("wordcountBolt",new WordCountBolt(),2).shuffleGrouping("splitbolt");
// 准备一个配置文件
Config config = new Config();
// storm中任务提交有两种方式,一种方式是本地模式,另一种是集群模式。
// LocalCluster localCluster = new LocalCluster();
// localCluster.submitTopology("wordcount",config,topologyBuilder.createTopology());
//在storm集群中,worker是用来分配的资源。如果一个程序没有指定worker数,那么就会使用默认值。
config.setNumWorkers(2);
//提交到集群
StormSubmitter.submitTopology("wordcount1",config,topologyBuilder.createTopology());
}
}
6.指定jdk版本插件
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
7.用 mvn clean install 命令编译,然后把target目录下生成的 storm jar包,拷到master-1机器上,执行集群管理-编译代码并提交任务。
bin/storm jar storm-samples-jar-with-dependencies.jar com.test.storm.MainTopology clusterStorm
8.在Amabari页面中StormUI里面,点进 clusterStorm 可以查看该拓扑运行的情况。
文档内容是否对您有帮助?
评价建议不能为空
非常感谢您的反馈,我们会继续努力做到更好!