全部文档
当前文档

暂无内容

如果没有找到您期望的内容,请尝试其他搜索词

文档中心

Spark streaming 写入数据Kafka

最近更新时间:2025-08-18 17:25:23

Spark Streaming介绍

Spark Streaming 是 Spark Core 的一个扩展,用于高吞吐且容错地处理持续性的数据,目前支持的外部输入有 Kafka、Flume、HDFS/S3、Kinesis、Twitter 和 TCP socket。

Spark Streaming 将连续数据抽象成 DStream(Discretized Stream),而 DStream 由一系列连续的 RDD(弹性分布式数据集)组成,每个 RDD 是一定时间间隔内产生的数据。使用函数对 DStream 进行处理其实即为对这些 RDD 进行处理。

使用 Spark Streaming 作为 Kafka 的数据输入时,可支持 Kafka 稳定版本与实验版本:

Kafka Version

spark-streaming-kafka-0.8

spark-streaming-kafka-0.10

Broker Version

0.8.2.1 or higher

0.10.0 or higher

Api Maturity

Deprecated

Stable

Language Support

Scala、Java、Python

Scala、Java

Receiver DStream

Yes

No

Direct DStream

Yes

Yes

SSL / TLS Support

No

Yes

Offset Commit Api

No

Yes

Dynamic Topic Subscription

No

Yes

操作步骤

步骤一、获取托管Kafka接入地址

  1. 登录金山云KMR控制台

  2. 点击已经创建好的集群进入集群详情列表。

  3. 在集群详情页面找到获取集群接入地址,接入地址是生产消费需要用到的 bootstrap-server。

步骤二、创建Topic

  1. 开启公网访问,进入Kafka eagle webui仪表盘界面。

  2. 在kafka eagle webui 管理页面,单击新建,创建一个名为_test 的 Topic。接下来将以该 Topic 为例介绍如何消费。

步骤三、准备云服务器环境

Centos7.6 系统

package

version

sbt

0.13.16

hadoop

2.7.3

spark

2.1.0

protobuf

2.5.0

ssh

CentOS 默认安装

Java

1.8

步骤四、对接Kafka

一、向Kafka中生产消息

这里使用 2.8.1 版本的 Kafka 依赖。

  1. 在 build.sbt 添加依赖:

name := "Producer Example"
version := "1.0"
scalaVersion := "2.12.1"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.8.1"
  1. 配置 producer_example.scala:

import java.util.Properties
import org.apache.kafka.clients.producer._
object ProducerExample extends App {
 val  props = new Properties()
 props.put("bootstrap.servers", "172.16.16.12:9092") //集群信息中的内网 IP 与端口
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)
 val TOPIC="test"  //指定要生产的 Topic
 for(i<- 1 to 50){
        val record = new ProducerRecord(TOPIC, "key", s"hello $i") //生产 key 是"key",value 是 hello i 的消息
        producer.send(record)
 }
 val record = new ProducerRecord(TOPIC, "key", "the end "+new java.util.Date)
 producer.send(record)
 producer.close() //最后要断开
}

更多有关 ProducerRecord 的用法请参见 ProducerRecord 文档。

二、从Kakfa中消费消息

DirectStream

  1. 在 build.sbt 添加依赖:

name := "Consumer Example"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"
  1. 配置 DirectStream_example.scala:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import collection.JavaConversions._
import Array._
object Kafka {
    def main(args: Array[String]) {
        val kafkaParams = Map[String, Object](
            "bootstrap.servers" -> "172.16.16.12:9092",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "spark_stream_test1",
            "auto.offset.reset" -> "earliest",
            "enable.auto.commit" -> "false"
        )

        val sparkConf = new SparkConf()
        sparkConf.setMaster("local")
        sparkConf.setAppName("Kafka")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        val topics = Array("spark_test")

        val offsets : Map[TopicPartition, Long] = Map()

        for (i <- 0 until 3){
            val tp = new TopicPartition("spark_test", i)
            offsets.updated(tp , 0L)
        }
        val stream = KafkaUtils.createDirectStream[String, String](
            ssc,
            PreferConsistent,
            Subscribe[String, String](topics, kafkaParams)
        )
        println("directStream")
        stream.foreachRDD{ rdd=>
            //输出获得的消息
            rdd.foreach{iter =>
                val i = iter.value
                println(s"${i}")
            }
            //获得offset
            val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            rdd.foreachPartition { iter =>
                val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
                println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
            }
        }

        // Start the computation
        ssc.start()
        ssc.awaitTermination()
    }
}

RDD

  1. 配置build.sbt(配置同上)。

  2. 配置RDD_example

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import collection.JavaConversions._
import Array._
object Kafka {
    def main(args: Array[String]) {
        val kafkaParams = Map[String, Object](
            "bootstrap.servers" -> "172.16.16.12:9092",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "spark_stream",
            "auto.offset.reset" -> "earliest",
            "enable.auto.commit" -> (false: java.lang.Boolean)
        )
        val sc = new SparkContext("local", "Kafka", new SparkConf())
        val java_kafkaParams : java.util.Map[String, Object] = kafkaParams
        //按顺序向 parition 拉取相应 offset 范围的消息,如果拉取不到则阻塞直到超过等待时间或者新生产消息达到拉取的数量
        val offsetRanges = Array[OffsetRange](
            OffsetRange("spark_test", 0, 0, 5),
            OffsetRange("spark_test", 1, 0, 5),
            OffsetRange("spark_test", 2, 0, 5)
        )
        val range = KafkaUtils.createRDD[String, String](
            sc,
            java_kafkaParams,
            offsetRanges,
            PreferConsistent
        )
        range.foreach(rdd=>println(rdd.value))
        sc.stop()
    }
}

更多 kafkaParams 用法参见 kafkaParams 文档。

配置环境

安装 sbt

  1. 在 sbt 官网 上下载 sbt 包。

  2. 解压后在 sbt 的目录下创建一个 sbt_run.sh 脚本并增加可执行权限,脚本内容如下:

#!/bin/bash
SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"
java $SBT_OPTS -jar `dirname $0`/bin/sbt-launch.jar "$@"
chmod u+x ./sbt_run.sh
  1. 执行以下命令。

./sbt-run.sh sbt-version

若能看到 sbt 版本说明可以正常运行。

安装 protobuf

  1. 下载 protobuf 相应版本。

  2. 解压后进入目录。

./configure

make && make install

需要预先安装 gcc-g++,执行中可能需要 root 权限。

  1. 重新登录,在命令行中输入下述内容。

protoc --version

  1. 若能看到 protobuf 版本说明可以正常运行

安装 Hadoop 、Spark

在 kmr 控制台创建选择即可

说明:本示例同样适用 hadoop 用户进行操作。

  1. 解压进入目录。

  2. 修改配置文件

cp ./conf/spark-env.sh.template ./conf/spark-env.sh

vim ./conf/spark-env.sh

在第一行添加下述内容:

export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/Hadoop classpath)

根据 hadoop 安装情况修改路径。

  1. 运行示例

bin/run-example SparkPi

若成功安装可以看到程序输出 π 的近似值。

文档导读
纯净模式常规模式

纯净模式

点击可全屏预览文档内容
文档反馈