博客
关于我
Flink资料合集
阅读量:666 次
发布时间:2019-03-15

本文共 2690 字,大约阅读时间需要 8 分钟。

官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/sql/queries.html

需求一:每1分钟汇总一次当天汇总数据

需求产生的原因:在flinksql中,每来一条记录,就会触发一次写数据库,记录来的太频繁,导致写入的数据库的记录数过多

解决方法:可以通过开启minibatch来实现

调整批量计算大小一般写在每个项目: 代号=init 的SQL里面```sql-- N秒计算一次,或是N条计算一次,调大可以提升处理效率set `table.exec.mini-batch.enabled`=`true`; set `table.exec.mini-batch.allow-latency`=`10s`;set `table.exec.mini-batch.size`=`100000`;
需求二:需要将每次汇总的数据持久化到存储中

需求产生的原因:在flinksql下,通过group by出来的数据都是CDC类型的(有delete、insert),但需要记录每次计算的结果,所以不能delete掉上一次计算group by出来的数据

set `table.dynamic-table-options.enabled`= `true`;  --支持动态定义表选项select .... from table_name/*+OPTIONS('maxwell-json-ext.ignore-delete'='true' --忽略delete操作,'maxwell-json-ext.update-to-insert'='true' --将update转成insert数据 )*/--忽略delete操作
问题一:mysql中的tinyint类型映射到flink上报错

在这里插入图片描述

解决方式:
编辑表的数据库连接,添加参数 tinyInt1isBit=false
例如:url=jdbc:mysql://br_live_prop_3323_mydb.lzdb.com:3323/live_prop?useCursorFetch=true&defaultFetchSize=5000&tinyInt1isBit=false
参考文档:https://www.jianshu.com/p/6885cad1cb14

需求三:自动生成测试数据:flink 1.11 中的随机数据生成器- DataGen connector

详见资料

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/datagen.html

需求四:分割字符串,将分割后的结果达成多列:自定义表函数

第一步、自定义表函数继承TableFunction

import org.apache.flink.table.annotation.DataTypeHint;import org.apache.flink.table.annotation.FunctionHint;import org.apache.flink.table.api.*;import org.apache.flink.table.functions.TableFunction;import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.*;@FunctionHint(output = @DataTypeHint("ROW
"))public static class SplitFunction extends TableFunction
{ public void eval(String str) { for (String s : str.split(" ")) { // use collect(...) to emit a row collect(Row.of(s, s.length())); } }}

第二步、注册表函数,略

第三步、在flinksql中调用

SELECT myField, word, length FROM MyTable LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE

其他详情请参考官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/functions/udfs.html

需求五:用NULL填充某个字段
hive里的用法 flink里的用法
null as user_id cast (null as bigint) as user_id
需求六:由于平台问题,项目无法正常启动,只能强行kill掉application,要怎么快速恢复数据
设置kafka的消费起始值:'scan.startup.mode'='timestamp','scan.startup.timestamp-millis'='1617551100000'   --从该kafka这一时间戳(毫秒值)记录的开始消费
问题七:Flink的背压问题

详情请见:https://blog.csdn.net/zc19921215/article/details/109246591

问题八:Flink SQL状态越来越多

详情请见:https://cloud.tencent.com/developer/article/1452854

问题九:Flink最终结果追加到mysql出现记录丢失的情况

问题排查方式:通过双写到hbase,对比hbase和mysql的记录,确定最终程序的计算结果没有问题。有问题的是写入mysql的过程中出现了问题,打印写mysql的日志,发现key值同设定的主键不一致,发现是在flink的平台上定义的mysql表主键定义有问题。

在这里插入图片描述
注意:在修改mysql主键的时候,除了要修改数据库里mysql的主键,同时要修改flink上定义的mysql表的主键,否则程序不会报错,但写入mysql会有数据丢失的问题!!!

你可能感兴趣的文章
Nginx配置限流,技能拉满!
查看>>
Nginx配置静态代理/静态资源映射时root与alias的区别,带前缀映射用alias
查看>>
Nginx面试三连问:Nginx如何工作?负载均衡策略有哪些?如何限流?
查看>>
nginx:/usr/src/fastdfs-nginx-module/src/common.c:21:25:致命错误:fdfs_define.h:没有那个文件或目录 #include
查看>>
Nginx:NginxConfig可视化配置工具安装
查看>>
ngModelController
查看>>
ngrok | 内网穿透,支持 HTTPS、国内访问、静态域名
查看>>
ngrok内网穿透可以实现资源共享吗?快解析更加简洁
查看>>
ngrok内网穿透可以实现资源共享吗?快解析更加简洁
查看>>
NHibernate学习[1]
查看>>
NHibernate异常:No persister for的解决办法
查看>>
nid修改oracle11gR2数据库名
查看>>
NIFI1.21.0/NIFI1.22.0/NIFI1.24.0/NIFI1.26.0_2024-06-11最新版本安装_采用HTTP方式_搭建集群_实际操作---大数据之Nifi工作笔记0050
查看>>
NIFI1.21.0_java.net.SocketException:_Too many open files 打开的文件太多_实际操作---大数据之Nifi工作笔记0051
查看>>
NIFI1.21.0_Mysql到Mysql增量CDC同步中_日期类型_以及null数据同步处理补充---大数据之Nifi工作笔记0057
查看>>
NIFI1.21.0_Mysql到Mysql增量CDC同步中_补充_更新时如果目标表中不存在记录就改为插入数据_Postgresql_Hbase也适用---大数据之Nifi工作笔记0059
查看>>
NIFI1.21.0_NIFI和hadoop蹦了_200G集群磁盘又满了_Jps看不到进程了_Unable to write in /tmp. Aborting----大数据之Nifi工作笔记0052
查看>>
NIFI1.21.0_Postgresql和Mysql同时指定库_指定多表_全量同步到Mysql数据库以及Hbase数据库中---大数据之Nifi工作笔记0060
查看>>
NIFI1.21.0最新版本安装_连接phoenix_单机版_Https登录_什么都没改换了最新版本的NIFI可以连接了_气人_实现插入数据到Hbase_实际操作---大数据之Nifi工作笔记0050
查看>>
NIFI1.21.0最新版本安装_配置使用HTTP登录_默认是用HTTPS登录的_Https登录需要输入用户名密码_HTTP不需要---大数据之Nifi工作笔记0051
查看>>