外链网站推荐网站的整合
- 作者: 五速梦信息网
- 时间: 2026年04月20日 08:20
当前位置: 首页 > news >正文
外链网站推荐,网站的整合,wordpress启动插件出错,国内做航模比较好的网站Flink系列之#xff1a;Elasticsearch SQL 连接器 一、Elasticsearch SQL 连接器二、创建 Elasticsearch表三、连接器参数四、Key 处理五、动态索引六、数据类型映射 一、Elasticsearch SQL 连接器 Sink: BatchSink: Streaming Append Upsert ModeElasticsearch 连接器… Flink系列之Elasticsearch SQL 连接器 一、Elasticsearch SQL 连接器二、创建 Elasticsearch表三、连接器参数四、Key 处理五、动态索引六、数据类型映射 一、Elasticsearch SQL 连接器 Sink: BatchSink: Streaming Append Upsert ModeElasticsearch 连接器允许将数据写入到 Elasticsearch 引擎的索引中。本文档描述运行 SQL 查询时如何设置 Elasticsearch 连接器。连接器可以工作在 upsert 模式使用 DDL 中定义的主键与外部系统交换 UPDATE/DELETE 消息。如果 DDL 中没有定义主键那么连接器只能工作在 append 模式只能与外部系统交换 INSERT 消息。 二、创建 Elasticsearch表 以下示例展示了如何创建 Elasticsearch sink 表 CREATE TABLE myUserTable (user_id STRING,user_name STRING,uv BIGINT,pv BIGINT,PRIMARY KEY (user_id) NOT ENFORCED ) WITH (connector elasticsearch-7,hosts http://localhost:9200,index users );三、连接器参数 参数是否必选默认值数据类型描述connector必选(none)String指定要使用的连接器有效值为elasticsearch-6连接到 Elasticsearch 6.x 的集群。elasticsearch-7连接到 Elasticsearch 7.x 及更高版本的集群。hosts必选(none)String要连接到的一台或多台 Elasticsearch 主机例如 ‘http://host_name:9092;http://host_name:9093’。index必选(none)StringElasticsearch 中每条记录的索引。可以是一个静态索引例如 ‘myIndex’或一个动态索引例如 index-{logtsdocument-type6.x 版本中必选(none)StringElasticsearch 文档类型。在 elasticsearch-7 中不再需要。document-id.key-delimiter可选-String复合键的分隔符默认为“例如指定为” 将导致文档 I D 为 K E Y 1 将导致文档 ID 为KEY1 将导致文档ID为KEY1KEY2$KEY3。username可选(none)String用于连接 Elasticsearch 实例的用户名。请注意Elasticsearch 没有预绑定安全特性但你可以通过如下指南启用它来保护 Elasticsearch 集群。password可选(none)String用于连接 Elasticsearch 实例的密码。如果配置了username则此选项也必须配置为非空字符串。failure-handler可选failString对 Elasticsearch 请求失败情况下的失败处理策略。有效策略为fail如果请求失败并因此导致作业失败则抛出异常。ignore忽略失败并放弃请求。retry-rejected重新添加由于队列容量饱和而失败的请求。自定义类名称使用 ActionRequestFailureHandler 的子类进行失败处理。sink.delivery-guarantee可选AT_LEAST_ONCEString承诺时可选择交付保证。有效值为EXACTLY_ONCE在故障转移情况下记录也仅传递一次。AT_LEAST_ONCE确保记录被传递但可能会发生同一条记录被传递多次的情况。NONE尽力提供记录。sink.flush-on-checkpoint可选trueBoolean在进行 checkpoint 时是否保证刷出缓冲区中的数据。如果关闭这一选项在进行checkpoint时 sink 将不再为所有进行 中的请求等待 Elasticsearch 的执行完成确认。因此在这种情况下 sink 将不对至少一次的请求的一致性提供任何保证。sink.bulk-flush.max-actions可选1000Integer每个批量请求的最大缓冲操作数。 可以设置为’0’来禁用它。sink.bulk-flush.max-size可选2mbMemorySize每个批量请求的缓冲操作在内存中的最大值。单位必须为 MB。 可以设置为’0’来禁用它。sink.bulk-flush.interval可选1sDurationflush 缓冲操作的间隔。 可以设置为’0’来禁用它。注意sink.bulk-flush.max-size’和’sink.bulk-flush.max-actions’都设置为’0’的这种 flush 间隔设置允许对缓冲操作进行完全异步处理。sink.bulk-flush.backoff.strategy可选DISABLEDString指定在由于临时请求错误导致任何 flush 操作失败时如何执行重试。有效策略为DISABLED不执行重试即第一次请求错误后失败。CONSTANT等待重试之间的回退延迟。EXPONENTIAL先等待回退延迟然后在重试之间指数递增。sink.bulk-flush.backoff.max-retries可选(none)Integer最大回退重试次数。sink.bulk-flush.backoff.delay可选(none)Duration每次退避尝试之间的延迟。对于 CONSTANT 退避策略该值是每次重试之间的延迟。对于 EXPONENTIAL 退避策略该值是初始的延迟。connection.path-prefix可选(none)String添加到每个 REST 通信中的前缀字符串例如‘/v1’。connection.request-timeout可选(none)Duration从连接管理器请求连接的超时时间。超时时间必须大于或者等于 0如果设置为 0 则是无限超时。connection.timeout可选(none)Duration建立请求的超时时间 。超时时间必须大于或者等于 0 如果设置为 0 则是无限超时。socket.timeout可选(none)Duration等待数据的 socket 的超时时间 (SO_TIMEOUT)。超时时间必须大于或者等于 0如果设置为 0 则是无限超时。format可选jsonStringElasticsearch 连接器支持指定格式。该格式必须生成一个有效的 json 文档。 默认使用内置的 ‘json’ 格式。 四、Key 处理 Elasticsearch sink 可以根据是否定义了一个主键来确定是在 upsert 模式还是 append 模式下工作。 如果定义了主键Elasticsearch sink 将以 upsert 模式工作该模式可以消费包含 UPDATE/DELETE 消息的查询。 如果未定义主键Elasticsearch sink 将以 append 模式工作该模式只能消费包含 INSERT 消息的查询。在 Elasticsearch 连接器中主键用于计算 Elasticsearch 的文档 id文档 id 为最多 512 字节且不包含空格的字符串。 Elasticsearch 连接器通过使用 document-id.key-delimiter 指定的键分隔符按照 DDL 中定义的顺序连接所有主键字段为每一行记录生成一个文档 ID 字符串。 某些类型不允许作为主键字段因为它们没有对应的字符串表示形式例如BYTESROWARRAYMAP 等。 如果未指定主键Elasticsearch 将自动生成文档 id。 五、动态索引 Elasticsearch sink 同时支持静态索引和动态索引。如果你想使用静态索引则 index 选项值应为纯字符串例如 ‘myusers’所有记录都将被写入到 “myusers” 索引中。如果你想使用动态索引你可以使用 {field_name} 来引用记录中的字段值来动态生成目标索引。 你也可以使用 ‘{field_name|date_format_string}’ 将 TIMESTAMP/DATE/TIME 类型的字段值转换为 date_format_string 指定的格式。 date_format_string 与 Java 的 DateTimeFormatter 兼容。 例如如果选项值设置为 ‘myusers-{log_ts|yyyy-MM-dd}’则 log_ts 字段值为 2020-03-27 12:25:55 的记录将被写入到 “myusers-2020-03-27” 索引中。你也可以使用 ‘{now()|date_format_string}’ 将当前的系统时间转换为 date_format_string 指定的格式。now() 对应的时间类型是 TIMESTAMP_WITH_LTZ 。 在将系统时间格式化为字符串时会使用 session 中通过 table.local-time-zone 中配置的时区。 使用 NOW(), now(), CURRENT_TIMESTAMP, current_timestamp 均可以。注意: 使用当前系统时间生成的动态索引时 对于 changelog 的流无法保证同一主键对应的记录能产生相同的索引名, 因此使用基于系统时间的动态索引只能支持 append only 的流。 六、数据类型映射 Elasticsearch 将文档存储在 JSON 字符串中。因此数据类型映射介于 Flink 数据类型和 JSON 数据类型之间。 Flink 为 Elasticsearch 连接器使用内置的 ‘json’ 格式。 下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。 Flink SQL类型JSON类型CHAR/VARCHAR/STRINGstringBOOLEANbooleanBINARY/VARBINARYstring with encoding: base64DECIMALnumberTINYINTnumberSMALLINTnumberINTnumberBIGINTnumberFLOATnumberDOUBLEnumberDATEstring with format: dateTIMEstring with format: timeTIMESTAMPstring with format: date-timeTIMESTAMP_WITH_LOCAL_TIME_ZONEstring with format: date-time (with UTC time zone)INTERVALnumberARRAYarrayMAP / MULTISETobjectROWobject
- 上一篇: 外链生成器seo短视频加密路线
- 下一篇: 外链网站有哪些公众号运营收费价格表
相关文章
-
外链生成器seo短视频加密路线
外链生成器seo短视频加密路线
- 技术栈
- 2026年04月20日
-
外汇网站源码 asp凡科建站的应用场景
外汇网站源码 asp凡科建站的应用场景
- 技术栈
- 2026年04月20日
-
外汇申报在哪个网站上做海外新闻发布
外汇申报在哪个网站上做海外新闻发布
- 技术栈
- 2026年04月20日
-
外链网站有哪些公众号运营收费价格表
外链网站有哪些公众号运营收费价格表
- 技术栈
- 2026年04月20日
-
外卖网站制作wordpress 公司插件
外卖网站制作wordpress 公司插件
- 技术栈
- 2026年04月20日
-
外卖优惠券网站怎么做1网站免费建站
外卖优惠券网站怎么做1网站免费建站
- 技术栈
- 2026年04月20日
