本文共 2690 字,大约阅读时间需要 8 分钟。
官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/sql/queries.html
需求产生的原因:在flinksql中,每来一条记录,就会触发一次写数据库,记录来的太频繁,导致写入的数据库的记录数过多
解决方法:可以通过开启minibatch来实现调整批量计算大小一般写在每个项目: 代号=init 的SQL里面```sql-- N秒计算一次,或是N条计算一次,调大可以提升处理效率set `table.exec.mini-batch.enabled`=`true`; set `table.exec.mini-batch.allow-latency`=`10s`;set `table.exec.mini-batch.size`=`100000`;
需求产生的原因:在flinksql下,通过group by出来的数据都是CDC类型的(有delete、insert),但需要记录每次计算的结果,所以不能delete掉上一次计算group by出来的数据
set `table.dynamic-table-options.enabled`= `true`; --支持动态定义表选项select .... from table_name/*+OPTIONS('maxwell-json-ext.ignore-delete'='true' --忽略delete操作,'maxwell-json-ext.update-to-insert'='true' --将update转成insert数据 )*/--忽略delete操作
详见资料
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/datagen.html第一步、自定义表函数继承TableFunction
import org.apache.flink.table.annotation.DataTypeHint;import org.apache.flink.table.annotation.FunctionHint;import org.apache.flink.table.api.*;import org.apache.flink.table.functions.TableFunction;import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.*;@FunctionHint(output = @DataTypeHint("ROW"))public static class SplitFunction extends TableFunction { public void eval(String str) { for (String s : str.split(" ")) { // use collect(...) to emit a row collect(Row.of(s, s.length())); } }}
第二步、注册表函数,略
第三步、在flinksql中调用SELECT myField, word, length FROM MyTable LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE
其他详情请参考官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/functions/udfs.html
hive里的用法 | flink里的用法 |
---|---|
null as user_id | cast (null as bigint) as user_id |
设置kafka的消费起始值:'scan.startup.mode'='timestamp','scan.startup.timestamp-millis'='1617551100000' --从该kafka这一时间戳(毫秒值)记录的开始消费
详情请见:https://blog.csdn.net/zc19921215/article/details/109246591
详情请见:https://cloud.tencent.com/developer/article/1452854
问题排查方式:通过双写到hbase,对比hbase和mysql的记录,确定最终程序的计算结果没有问题。有问题的是写入mysql的过程中出现了问题,打印写mysql的日志,发现key值同设定的主键不一致,发现是在flink的平台上定义的mysql表主键定义有问题。