最近更新时间:2021-06-08 15:56:27
go_demo是对应的go语言实现,后续会补充java、C++、python等语言的实现。如果您在使用过程中发现问题或者有好的实现方式,请联系客服,我们会在第一时间处理。
go_demo这个示例展示了如何使用数据订阅从DTS获取数据并且解析数据。 整个的流程包含了:用户进行参数配置、使用原生的kafka consumer从DTS获取增量数据、将获取到数据进行解析,从中获取对应的库名、表名、SQL语句、点位信息等。
使用的时候请替换: config.User = “” config.Passwd = “” // kafka broker url config.BrokerURL = “” // kafka consumer group name config.GroupID = “” // topic to consume, partition is 0 config.Topic = “” config.StartTime = “”
如果您希望订阅响应topic全部消息(即从订阅任务创建开始所有增量数据),config.StartTime则置为空。如果您希望从可选时间范围内某一时刻开始订阅数据,则将config.StartTime置为某一时刻值,时间格式为"2006-01-02 15:04:05"。 配置完成后,直接运行go run go_demo.go,即可订阅数据。您也可以根据自己的需要对订阅结果进行相应处理。
package main
import (
"encoding/json"
"fmt"
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
"os"
"os/signal"
"time"
)
type KafkaConfig struct {
User string
Passwd string
BrokerURL string
GroupID string
Topic string
StartTime string
}
func main() {
var config KafkaConfig
config.User = ""
config.Passwd = ""
config.BrokerURL = ""
config.GroupID = ""
config.Topic = ""
config.StartTime = ""
clusterConsumer(config)
}
func clusterConsumer(con KafkaConfig) {
config := cluster.NewConfig()
config.Version = sarama.V2_3_0_0
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = "PLAIN"
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true
config.Net.MaxOpenRequests = 100
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Net.SASL.User = con.User
config.Net.SASL.Password = con.Passwd
consumer, err := cluster.NewConsumer([]string{con.BrokerURL}, con.GroupID, []string{con.Topic}, config)
if err != nil {
return
}
defer consumer.Close()
var startTime int64
if con.StartTime == "" {
startTime = 0
} else {
formatTime, err := time.ParseInLocation("2006-01-02 15:04:05", con.StartTime, time.Local)
if err != nil {
fmt.Println(err)
return
}
startTime = formatTime.Unix()
}
// trap SIGINT to trigger a shutdown
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
// consume errors
go func() {
for err := range consumer.Errors() {
panic(err)
}
}()
// consume notifications
go func() {
for ntf := range consumer.Notifications() {
fmt.Printf("Rebalanced: %+v\n", ntf)
}
}()
// consume messages, watch signals
var successes int
Loop:
for {
select {
case msg, ok := <-consumer.Messages():
if ok {
if msg.Timestamp.Unix() < startTime {
continue
}
table, operation, sql, err := parseData(msg.Value)
if err != nil {
fmt.Println("Create consumer failed,", err)
return
}
fmt.Fprintf(os.Stdout, "%s:%s/%d/%d\tdb:%s\ttable:%s\toperation:%s\tsql:%s\n",
"group_id", "topic",
msg.Partition, msg.Offset, msg.Key, table, operation, sql)
consumer.MarkOffset(msg, "")
successes++
}
case <-signals:
break Loop
}
}
fmt.Fprintf(os.Stdout, "%s consume %d messages \n", con.GroupID, successes)
}
func parseData(data []byte) (string, string, string, error) {
type SubData struct {
TableName string `json:"table"`
Operation string `json:"operation"`
Sql string `json:"sql"`
}
d := SubData{}
err := json.Unmarshal(data, &d)
if err != nil {
return "", "", "", fmt.Errorf("Data format is wrong.")
}
return d.TableName, d.Operation, d.Sql, nil
}
纯净模式
鼠标选中内容,快速反馈问题