全部文档
当前文档

暂无内容

如果没有找到您期望的内容,请尝试其他搜索词

文档中心

Flink写入数据Kafka

最近更新时间:2025-08-18 17:22:48

Flink介绍

Apache Flink 是一个可以处理流数据的实时处理框架,用于在无界和有界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。

Apache Flink 擅长处理无界和有界数据集。Flink runtime 能够通过对时间和状态的精确控制处理无界数据流,也能够使用为固定大小数据集设计的算法和数据结构对有界数据集进行处理,并达到出色的性能。

应用程序可能会使用来自各种数据源(如消息队列或分布式日志,如 Apache Kafka 或 Kinesis)的实时数据。Flink 提供了 Apache Kafka 连接器,用于从 Kafka topic 中读取或者向其中写入数据,可提供一次精确的处理语义。

操作步骤

步骤一、获取托管Kafka接入地址

  1. 登录金山云KMR控制台

  2. 点击已经创建好的集群进入集群详情列表.

  3. 在集群详情页面找到获取集群接入地址,接入地址是生产消费需要用到的 bootstrap-server。

步骤二、创建Topic

  1. 开启公网访问,进入Kafka eagle webui仪表盘界面

  2. 在kafka eagle webui 管理页面,单击新建,创建一个名为kafka_test 的 Topic。接下来将以该 Topic 为例介绍如何消费.

步骤三、添加 Maven 依赖

pom.xml 配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>Test-Kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.7.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

步骤四、消费 Kafka 中的消息

通过VPC方式消费

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

public class KafkaConsumerDemo {
        public static void main(String args[]) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                Properties properties = new Properties();
                //公网接入域名地址,即公网路由地址,在集群详情页的接入方式模块获取。
                properties.setProperty("bootstrap.servers", "IP:PORT");
                //消费者组id。
                properties.setProperty("group.id", "testConsumerGroup");
                DataStream<String> stream = env
                                .addSource(new FlinkKafkaConsumer<>("topicName", new SimpleStringSchema(), properties));
                stream.print();
                env.execute();
        }
}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
通过公网域名消费
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

public class KafkaConsumerDemo {
        public static void main(String args[]) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                Properties properties = new Properties();
                //公网接入域名地址,即公网路由地址,在集群详情页的接入方式模块获取。
                properties.setProperty("bootstrap.servers", "IP:PORT");
                //消费者组id。
                properties.setProperty("group.id", "testConsumerGroup");
                properties.setProperty("security.protocol", "SASL_PLAINTEXT");
                properties.setProperty("sasl.mechanism", "PLAIN");
                //用户名和密码,注:用户名是需要拼接,并非控制台的用户名:instanceId#username。
                properties.setProperty("sasl.jaas.config", 
                                                             "org.apache.kafka.common.security.plain.PlainLoginModule required\nusername=\"yourinstanceId#yourusername\"\npassword=\"yourpassword\";");
                properties.setProperty("sasl.kerberos.service.name","kafka");
                DataStream<String> stream = env
                                .addSource(new FlinkKafkaConsumer<>("topicName", new SimpleStringSchema(), properties));
                stream.print();
                env.execute();
        }
}

文档导读
纯净模式常规模式

纯净模式

点击可全屏预览文档内容
文档反馈