全部文档
当前文档

暂无内容

如果没有找到您期望的内容,请尝试其他搜索词

文档中心

客户端示例代码

最近更新时间:2021-06-08 15:56:27

说明

go_demo是对应的go语言实现,后续会补充java、C++、python等语言的实现。如果您在使用过程中发现问题或者有好的实现方式,请联系客服,我们会在第一时间处理。
go_demo这个示例展示了如何使用数据订阅从DTS获取数据并且解析数据。 整个的流程包含了:用户进行参数配置、使用原生的kafka consumer从DTS获取增量数据、将获取到数据进行解析,从中获取对应的库名、表名、SQL语句、点位信息等。
使用的时候请替换: config.User = “” config.Passwd = “” // kafka broker url config.BrokerURL = “” // kafka consumer group name config.GroupID = “” // topic to consume, partition is 0 config.Topic = “” config.StartTime = “”
如果您希望订阅响应topic全部消息(即从订阅任务创建开始所有增量数据),config.StartTime则置为空。如果您希望从可选时间范围内某一时刻开始订阅数据,则将config.StartTime置为某一时刻值,时间格式为"2006-01-02 15:04:05"。 配置完成后,直接运行go run go_demo.go,即可订阅数据。您也可以根据自己的需要对订阅结果进行相应处理。

go_demo

package main



import (

	"encoding/json"

	"fmt"

	"github.com/Shopify/sarama"

	cluster "github.com/bsm/sarama-cluster"

	"os"

	"os/signal"

	"time"

)



type KafkaConfig struct {

	User      string

	Passwd    string

	BrokerURL string

	GroupID   string

	Topic     string

	StartTime string

}



func main() {

	var config KafkaConfig

	config.User = ""

	config.Passwd = ""

	config.BrokerURL = ""

	config.GroupID = ""

	config.Topic = ""

	config.StartTime = ""



	clusterConsumer(config)

}



func clusterConsumer(con KafkaConfig) {

	config := cluster.NewConfig()

	config.Version = sarama.V2_3_0_0

	config.Net.SASL.Enable = true

	config.Net.SASL.Mechanism = "PLAIN"

	config.Consumer.Return.Errors = true

	config.Group.Return.Notifications = true

	config.Net.MaxOpenRequests = 100

	config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second

	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	config.Net.SASL.User = con.User

	config.Net.SASL.Password = con.Passwd



	consumer, err := cluster.NewConsumer([]string{con.BrokerURL}, con.GroupID, []string{con.Topic}, config)

	if err != nil {

		return

	}

	defer consumer.Close()



	var startTime int64

	if con.StartTime == "" {

		startTime = 0

	} else {

		formatTime, err := time.ParseInLocation("2006-01-02 15:04:05", con.StartTime, time.Local)

		if err != nil {

			fmt.Println(err)

			return

		}

		startTime = formatTime.Unix()

	}



	// trap SIGINT to trigger a shutdown

	signals := make(chan os.Signal, 1)

	signal.Notify(signals, os.Interrupt)



	// consume errors

	go func() {

		for err := range consumer.Errors() {

			panic(err)

		}

	}()



	// consume notifications

	go func() {

		for ntf := range consumer.Notifications() {

			fmt.Printf("Rebalanced: %+v\n", ntf)

		}

	}()



	// consume messages, watch signals

	var successes int

Loop:

	for {

		select {

		case msg, ok := <-consumer.Messages():

			if ok {

				if msg.Timestamp.Unix() < startTime {

					continue

				}

				table, operation, sql, err := parseData(msg.Value)

				if err != nil {

					fmt.Println("Create consumer failed,", err)

					return

				}

				fmt.Fprintf(os.Stdout, "%s:%s/%d/%d\tdb:%s\ttable:%s\toperation:%s\tsql:%s\n",

					"group_id", "topic",

					msg.Partition, msg.Offset, msg.Key, table, operation, sql)



				consumer.MarkOffset(msg, "")

				successes++

			}



		case <-signals:

			break Loop

		}

	}

	fmt.Fprintf(os.Stdout, "%s consume %d messages \n", con.GroupID, successes)

}



func parseData(data []byte) (string, string, string, error) {

	type SubData struct {

		TableName string `json:"table"`

		Operation string `json:"operation"`

		Sql       string `json:"sql"`

	}



	d := SubData{}

	err := json.Unmarshal(data, &d)

	if err != nil {

		return "", "", "", fmt.Errorf("Data format is wrong.")

	}



	return d.TableName, d.Operation, d.Sql, nil

}

文档导读
纯净模式常规模式

纯净模式

点击可全屏预览文档内容
文档反馈