做视频付费网站科技幻想画

当前位置: 首页 > news >正文

做视频付费网站,科技幻想画,天津校园文化设计公司,矢量图免费素材网实时从TDengine数据库采集数据到Kafka Topic 一、认识TDengine二、TDengine Kafka Connector三、什么是 Kafka Connect#xff1f;四、前置条件五、安装 TDengine Connector 插件六、启动 Kafka七、验证 kafka Connect 是否启动成功八、TDengine Source Connector 的使用九、添… 实时从TDengine数据库采集数据到Kafka Topic 一、认识TDengine二、TDengine Kafka Connector三、什么是 Kafka Connect四、前置条件五、安装 TDengine Connector 插件六、启动 Kafka七、验证 kafka Connect 是否启动成功八、TDengine Source Connector 的使用九、添加 Source Connector 配置文件十、准备测试数据十一、创建 Source Connector 实例十二、查看 topic 数据十三、unload 插件十四、性能调优十五、配置参考通用配置TDengine Source Connector 特有的配置 十六、更多内容 一、认识TDengine TDengine是一款高性能、高稳定性的开源时间序列数据库。它是由中国的PingCAP团队开发并开源的旨在为大规模数据存储和实时分析提供解决方案。TDengine具有以下特点 高性能TDengine使用了多种优化技术如数据压缩、索引优化和并行计算以实现高性能的数据写入和查询。它能够处理大规模的数据并且在毫秒级的响应时间内提供查询结果。高稳定性TDengine具有良好的容错和恢复机制能够保证数据的持久性和可靠性。它支持数据的多副本备份和自动故障转移以及数据一致性和完整性的检查。时间序列支持TDengine专注于时间序列数据的存储和分析能够高效地处理时间序列数据的写入、查询和聚合操作。它支持多种数据类型和数据模型如数字、文本、地理位置和时间等。开源TDengine是一个开源项目遵循Apache 2.0许可证。用户可以自由地使用、修改和分发该软件同时也可以参与到开发和改进过程中。总之TDengine是一款专注于时间序列数据存储和分析的高性能、高稳定性的开源数据库适用于大规模数据存储和实时分析的场景。 二、TDengine Kafka Connector TDengine Kafka Connector 包含两个插件: TDengine Source Connector 和 TDengine Sink Connector。用户只需提供简单的配置文件就可以将 Kafka 中指定 topic 的数据批量或实时同步到 TDengine 或将 TDengine 中指定数据库的数据批量或实时同步到 Kafka。 三、什么是 Kafka Connect Kafka Connect 是 Apache Kafka 的一个组件用于使其它系统比如数据库、云服务、文件系统等能方便地连接到 Kafka。数据既可以通过 Kafka Connect 从其它系统流向 Kafka, 也可以通过 Kafka Connect 从 Kafka 流向其它系统。从其它系统读数据的插件称为 Source Connector, 写数据到其它系统的插件称为 Sink Connector。Source Connector 和 Sink Connector 都不会直接连接 Kafka BrokerSource Connector 把数据转交给 Kafka Connect。Sink Connector 从 Kafka Connect 接收数据。 TDengine Source Connector 用于把数据实时地从 TDengine 读出来发送给 Kafka Connect。TDengine Sink Connector 用于 从 Kafka Connect 接收数据并写入 TDengine。 四、前置条件 运行本教程中示例的前提条件。 Linux 操作系统已安装 Java 8 和 Maven已安装 Git、curl、vi已安装并启动 TDengine。 五、安装 TDengine Connector 插件 编译插件 git clone –branch 3.0 https://github.com/taosdata/kafka-connect-tdengine.git cd kafka-connect-tdengine mvn clean package -Dmaven.test.skiptrue unzip -d \(KAFKA_HOME/components/ target/components/packages/taosdata-kafka-connect-tdengine-*.zip以上脚本先 clone 项目源码然后用 Maven 编译打包。打包完成后在 target/components/packages/ 目录生成了插件的 zip 包。把这个 zip 包解压到安装插件的路径即可。上面的示例中使用了内置的插件安装路径 \)KAFKA_HOME/components/。 配置插件 将 kafka-connect-tdengine 插件加入 \(KAFKA_HOME/config/connect-distributed.properties 配置文件 plugin.path 中 plugin.path/usr/share/java,/opt/kafka/components六、启动 Kafka zookeeper-server-start.sh -daemon \)KAFKA_HOME/config/zookeeper.propertieskafka-server-start.sh -daemon \(KAFKA_HOME/config/server.propertiesconnect-distributed.sh -daemon \)KAFKA_HOME/config/connect-distributed.properties七、验证 kafka Connect 是否启动成功 输入命令 curl http://localhost:8083/connectors如果各组件都启动成功会得到如下输出 []八、TDengine Source Connector 的使用 TDengine Source Connector 的作用是将 TDengine 某个数据库某一时刻之后的数据全部推送到 Kafka。TDengine Source Connector 的实现原理是先分批拉取历史数据再用定时查询的策略同步增量数据。同时会监控表的变化可以自动同步新增的表。如果重启 Kafka Connect, 会从上次中断的位置继续同步。 TDengine Source Connector 会将 TDengine 数据表中的数据转换成 InfluxDB Line 协议格式 或 OpenTSDB JSON 协议格式然后写入 Kafka。 下面的示例程序同步数据库 test 中的数据到主题 tdengine-test-meters。 九、添加 Source Connector 配置文件 vi source-demo.json输入以下内容 source-demo.json{name:TDengineSourceConnector,config:{connector.class: com.taosdata.kafka.connect.source.TDengineSourceConnector,tasks.max: 1,subscription.group.id: source-demo,connection.url: jdbc:TAOS://127.0.0.1:6030,connection.user: root,connection.password: taosdata,connection.database: test,connection.attempts: 3,connection.backoff.ms: 5000,topic.prefix: tdengine,topic.delimiter: -,poll.interval.ms: 1000,fetch.max.rows: 100,topic.per.stable: true,topic.ignore.db: false,out.format: line,data.precision: ms,key.converter: org.apache.kafka.connect.storage.StringConverter,value.converter: org.apache.kafka.connect.storage.StringConverter} }十、准备测试数据 准备生成测试数据的 SQL 文件。 prepare-source-data.sqlDROP DATABASE IF EXISTS test; CREATE DATABASE test; USE test; CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT);INSERT INTO d1001 USING meters TAGS(California.SanFrancisco, 2) VALUES(2018-10-03 14:38:05.000,10.30000,219,0.31000) \d1001 USING meters TAGS(California.SanFrancisco, 2) VALUES(2018-10-03 14:38:15.000,12.60000,218,0.33000) \d1001 USING meters TAGS(California.SanFrancisco, 2) VALUES(2018-10-03 14:38:16.800,12.30000,221,0.31000) \d1002 USING meters TAGS(California.SanFrancisco, 3) VALUES(2018-10-03 14:38:16.650,10.30000,218,0.25000) \d1003 USING meters TAGS(California.LosAngeles, 2) VALUES(2018-10-03 14:38:05.500,11.80000,221,0.28000) \d1003 USING meters TAGS(California.LosAngeles, 2) VALUES(2018-10-03 14:38:16.600,13.40000,223,0.29000) \d1004 USING meters TAGS(California.LosAngeles, 3) VALUES(2018-10-03 14:38:05.000,10.80000,223,0.29000) \d1004 USING meters TAGS(California.LosAngeles, 3) VALUES(2018-10-03 14:38:06.500,11.50000,221,0.35000); 使用 TDengine CLI, 执行 SQL 文件。 taos -f prepare-source-data.sql十一、创建 Source Connector 实例 curl -X POST -d source-demo.json http://localhost:8083/connectors -H Content-Type: application/json十二、查看 topic 数据 使用 kafka-console-consumer 命令行工具监控主题 tdengine-test-meters 中的数据。一开始会输出所有历史数据 往 TDengine 插入两条新的数据之后kafka-console-consumer 也立即输出了新增的两条数据。 输出数据 InfluxDB line protocol 的格式。 kafka-console-consumer.sh –bootstrap-server localhost:9092 –from-beginning –topic tdengine-test-meters输出 …… meters,locationCalifornia.SanFrancisco,groupid2i32 current10.3f32,voltage219i32,phase0.31f32 1538548685000000000 meters,locationCalifornia.SanFrancisco,groupid2i32 current12.6f32,voltage218i32,phase0.33f32 1538548695000000000 ……此时会显示所有历史数据。切换到 TDengine CLI 插入两条新的数据 USE test; INSERT INTO d1001 VALUES (now, 13.3, 229, 0.38); INSERT INTO d1002 VALUES (now, 16.3, 233, 0.22);再切换回 kafka-console-consumer 此时命令行窗口已经打印出刚插入的 2 条数据。 十三、unload 插件 测试完毕之后用 unload 命令停止已加载的 connector。 查看当前活跃的 connector curl http://localhost:8083/connectors如果按照前述操作此时应有两个活跃的 connector。使用下面的命令 unload curl -X DELETE http://localhost:8083/connectors/TDengineSinkConnector curl -X DELETE http://localhost:8083/connectors/TDengineSourceConnector十四、性能调优 如果在从 TDengine 同步数据到 Kafka 的过程中发现性能不达预期可以尝试使用如下参数提升 Kafka 的写入吞吐量。 打开 KAFKA_HOME/config/producer.properties 配置文件。参数说明及配置建议如下 参数参数说明设置建议producer.type此参数用于设置消息的发送方式默认值为 sync 表示同步发送async 表示异步发送。采用异步发送能够提升消息发送的吞吐量。asyncrequest.required.acks参数用于配置生产者发送消息后需要等待的确认数量。当设置为1时表示只要领导者副本成功写入消息就会给生产者发送确认而无需等待集群中的其他副本写入成功。这种设置可以在一定程度上保证消息的可靠性同时也能保证一定的吞吐量。因为不需要等待所有副本都写入成功所以可以减少生产者的等待时间提高发送消息的效率。1max.request.size该参数决定了生产者在一次请求中可以发送的最大数据量。其默认值为 1048576也就是 1M。如果设置得太小可能会导致频繁的网络请求降低吞吐量。如果设置得太大可能会导致内存占用过高或者在网络状况不佳时增加请求失败的概率。建议设置为 100M。104857600batch.size此参数用于设定 batch 的大小默认值为 16384即 16KB。在消息发送过程中发送到 Kafka 缓冲区中的消息会被划分成一个个的 batch。故而减小 batch 大小有助于降低消息延迟而增大 batch 大小则有利于提升吞吐量可根据实际的数据量大小进行合理配置。可根据实际情况进行调整建议设置为 512K。524288buffer.memory此参数用于设置生产者缓冲待发送消息的内存总量。较大的缓冲区可以允许生产者积累更多的消息后批量发送提高吞吐量但也会增加延迟和内存使用。可根据机器资源来配置建议配置为 1G。1073741824 十五、配置参考 通用配置 以下配置项对 TDengine Sink Connector 和 TDengine Source Connector 均适用。 name: connector 名称。connector.class: connector 的完整类名 如: com.taosdata.kafka.connect.sink.TDengineSinkConnector。tasks.max: 最大任务数, 默认 1。topics: 需要同步的 topic 列表 多个用逗号分隔, 如 topic1,topic2。connection.url: TDengine JDBC 连接字符串 如 jdbc:TAOS://127.0.0.1:6030。connection.user TDengine 用户名 默认 root。connection.password TDengine 用户密码 默认 taosdata。connection.attempts 最大尝试连接次数。默认 3。connection.backoff.ms 创建连接失败重试时间隔时间单位为 ms。 默认 5000。 TDengine Source Connector 特有的配置 connection.database: 源数据库名称无缺省值。topic.prefix 数据导入 kafka 时使用的 topic 名称的前缀。默认为空字符串 “”。timestamp.initial: 数据同步起始时间。格式为’yyyy-MM-dd HH:mm:ss’若未指定则从指定 DB 中最早的一条记录开始。poll.interval.ms: 检查是否有新建或删除的表的时间间隔单位为 ms。默认为 1000。fetch.max.rows : 检索数据库时最大检索条数。 默认为 100。query.interval.ms: 从 TDengine 一次读取数据的时间跨度需要根据表中的数据特征合理配置避免一次查询的数据量过大或过小在具体的环境中建议通过测试设置一个较优值默认值为 0即获取到当前最新时间的所有数据。out.format : 结果集输出格式。line 表示输出格式为 InfluxDB Line 协议格式json 表示输出格式是 json。默认为 line。data.precision: 使用 InfluxDB 行协议格式时时间戳的精度。可选值为 ms 表示毫秒us 表示微秒ns 表示纳秒。 topic.per.stable: 如果设置为 true表示一个超级表对应一个 Kafka topictopic的命名规则 topic.prefixtopic.delimiterconnection.databasetopic.delimiter stable.name如果设置为 false则指定的 DB 中的所有数据进入一个 Kafka topictopic 的命名规则为 topic.prefixtopic.delimiterconnection.databasetopic.ignore.db: topic 命名规则是否包含 database 名称true 表示规则为 topic.prefixtopic.delimiterstable.namefalse 表示规则为 topic.prefixtopic.delimiterconnection.databasetopic.delimiterstable.name默认 false。此配置项在 topic.per.stable 设置为 false 时不生效。topic.delimiter: topic 名称分割符默认为 -。read.method: 从 TDengine 读取数据方式query 或是 subscription。默认为 subscription。subscription.group.id: 指定 TDengine 数据订阅的组 id当 read.method 为 subscription 时此项为必填项。subscription.from: 指定 TDengine 数据订阅起始位置latest 或是 earliest。默认为 latest。 十六、更多内容 更多内容请参阅官网 https://docs.taosdata.com/third-party/collection/kafka/