都市在线 | 漳州发展网
ad1
您所在的位置: 都市在线 > 财经

Flink常见维表Join方案,收藏学习开发很有用!

来源:TechWeb    发布时间:2022-01-30 22:55   作者:如思   阅读量:7640   

前言

Flink常见维表Join方案,收藏学习开发很有用!

实时数仓,难免会遇到join维表的业务。现总结几种方案,供各位看官选择:

查找关联 状态编程,预加载数据到状态中,按需取 冷热数据 广播维表 Temporal Table Join Lookup Table Join

其中中间留下两个问题,供大家思考,可留言一起讨论。

查找关联

查找关联就是在主流数据中直接访问外部数据去根据主键或者某种关键条件去关联取值。据IT之家网友J极客投稿,昨日晚间,LinkedIn领英官方微博宣布,迈出了领英在中国市场新战略的第一步,正式发布全新应用“领英职场”。

适合: 维表数据量大,但是主数据不大的业务实时计算。

缺点:数据量大的时候,会给外部数据源库带来很大的压力,因为某条数据都需要关联。

同步

访问数据库是同步调用,导致 subtak 线程会被阻塞,影响吞吐量

defanalyses:Unit=valenv:StreamExecutionEnvironment=FlinkStreamEnv.getvalsource:DataStream=KafkaSourceEnv.getKafkaSourceStream).map(JSON.parseObject(_)).filter(_!=null).flatMap(newFlatMapFunction(JSONObject,String)overridedefflatMap(jSONObject:JSONObject,collector:Collector):Unit=//如果topic就一张表,不用区分,如果多张表,可以通过database与table区分,放到下一步去处理//表的名字valdatabaseName:String=jSONObject.getString("database")//表的名字valtableName:String=jSONObject.getString("table")//数据操作类型INSERTUPDATEDELETEvaloperationType:String=jSONObject.getString("type")//主体数据valtableData:JSONArray=jSONObject.getJSONArray("data")//oldvalold:JSONArray=jSONObject.getJSONArray("old")//canaljson可能存在批处理出现data数据多条for(i首先把维表数据初始化到state中,设置好更新时间,定时去把维表

优点:flink 自己维护状态数据,"荣辱与共",不需要频繁链接外部数据源,达到解耦。

缺点:不适合大的维表和变化大的维表。接下来将持续优化,为用户及企业客户打造全球领先的使用体验。

思考下:直接定义一个Map集合这样的优缺点是什么可以留言说出自己的看法

冷热数据

思想:先去状态去取,如果没有,去外部查询,同时去存到状态里面StateTtlConfig 的过期时间可以设置短点

优点:中庸取值方案,热备常用数据到内存,也避免了数据join相对过多外部数据源。

缺点:也不能一劳永逸解决某些问题,热备数据过多,或者冷数据过大,都会对state 或者 外部数据库造成压力。

比如上面提到的字典表,每一个Task都需要这份数据,那么需要join这份数据的时候就可以使用广播维表。

valdimStream=env.addSource//广播状态valbroadcastStateDesc=newMapStateDescriptor(String,String)("broadcaststate",BasicTypeInfo.STRING_TYPE_INFO,newMapTypeInfolt,gt,(Long.class,Dim.class))//广播流valbroadStream=dimStream.broadcast()//主数据流valmainConsumer=newFlinkKafkaConsumer("topic",newSimpleStringSchema(),kafkaConfig)valmainStream=env.addSource(mainConsumer)//广播状态与维度表关联valconnectedStream=mainStream.connect(broadStream).map(..User(id,name)).key(_.1)connectedStream.process(newKeyedBroadcastProcessFunction(String,User,Map(Long,Dim),String)overridedefprocessElement(value:User,ctx:KeyedBroadcastProcessFunction(String,User,Map(Long,Dim),String)#ReadOnlyContext,out:Collector):Unit=//取到数据就可以愉快的玩耍了valstate=ctx.getBroadcastState(broadcastStateDesc)xxxxxx

「思考:」 如果把维表流也通过实时监控binlog到kafka,当维度数据发生变化时,更新放到状态中,这种方式,是不是更具有时效性呢。

通过canal把变更binlog方式发送到kafka中。

数据流定义成为广播流,广播到数据到主数据流中。

定义一个广播状态存储数据,在主数据进行查找匹配,符合要求则join成功。。

Temporal Table Join

由于维表是一张不断变化的表那如何 JOIN 一张不断变化的表呢如果用传统的 JOIN 语法来表达维表 JOIN,是不完整的因为维表是一直在更新变化的,如果用这个语法那么关联上的是哪个时刻的维表呢我们是不知道的,结果是不确定的所以 Flink SQL 的维表 JOIN 语法引入了Temporal Table 的标准语法,用来声明关联的是维表哪个时刻的快照

普通关联会一直保留关联双侧的数据,数据也就会一直膨胀,直到撑爆内存导致任务失败,Temporal Join则可以定期清理过期数据,在合理的内存配置下即可避免内存溢出。

Event Time Temporal Join

语法

SELECTFROMtable1(AS)(LEFT)JOINtable2FORSYSTEM_TIMEASOFtable1.proctime

使用事件时间属性,可以检索过去某个时间点的键值这允许在一个共同的时间点连接两个表

举例

假设我们有一个订单表,每个订单都有不同货币的价格为了将此表正确地规范化为单一货币,每个订单都需要与下订单时的适当货币兑换率相结合

CREATETABLEorders,currencySTRING,order_timeTIMESTAMP(3),WATERMARKFORorder_timeASorder_time)WITH(/*...*/),CREATETABLEcurrency_rates(currencySTRING,conversion_rateDECIMAL(32,2),update_timeTIMESTAMP(3)METADATAFROM`values.source.timestamp`VIRTUALWATERMARKFORupdate_timeASupdate_time,PRIMARYKEY(currency)NOTENFORCED)WITH('connector'='upsert—kafka',/*...*/),event—timetemporaljoin需要temporaljoin条件的等价条件中包含的主键SELECTorder_id,price,currency,conversion_rate,order_time,FROMordersLEFTJOINcurrency_ratesFORSYSTEMTIMEASOForders.order_timeONorders.currency=currency_rates.currency Processing Time Temporal Join

处理时间时态表连接使用处理时间属性将行与外部版本表中键的最新版本相关联。

根据定义,使用processing—time属性,连接将始终返回给定键的最新值可以将查找表看作是一个简单的HashMap,它存储来自构建端的所有记录这种连接的强大之处在于,当在Flink中无法将表具体化为动态表时,它允许Flink直接针对外部系统工作

使用FOR SYSTEM_TIME AS OF table1.proctime表示当左边表的记录与右边的维表join时,只匹配当前处理时间维表所对应的的快照数据。

Lookup Table Join

Lookup Join 通常用于通过连接外部表补充信息,要求一个表具有处理时间属性,另一个表使 Lookup Source Connector。

JDBC 连接器可以用在时态表关联中作为一个可 lookup 的 source 用到的语法是 Temporal Joins 的语法

)|WITH(|'connector'='jdbc',|'url'='xxxx',|'driver'='$DRIVER_CLASS_NAME',|'table—name'='$tableName',|'lookup.cache.max—rows'='100',|'lookup.cache.ttl'='30s'|)|""".stripMargins"""|CREATETABLEcar(|`id`bigint,|`user_id`bigint,|`proctime`asPROCTIME()|)|WITH(|'connector'='kafka',|'topic'='$topic',|'scan.startup.mode'='latest—offset',|'properties.bootstrap.servers'='$KAFKA_SERVICE',|'properties.group.id'='indicator',|'format'='canal—json'|)|""".stripMarginSELECTmc.user_iduser_id,count(1)AS`value`FROMcarmcinnerjoinusersFORSYSTEM_TIMEASOFmc.proctimeasuonmc.user_id=s.idgroupbymc.user_id 总结

总体来讲,关联维表有四个基础的方式:

查找外部数据源关联

预加载维表关联(内存,状态)

冷热数据储备(算是1和2的结合使用)

维表变更日志关联(广播也好,其他方式的流关联也好)

「同时考虑:」 吞吐量,时效性,外部数据源的负载,内存资源,解耦性等等方面。

四种join方式不存在绝对的一劳永逸,更多的是针对业务场景在各指标上的权衡取舍,因此看官需要结合场景来选择适合的。

郑重声明:此文内容为本网站转载企业宣传资讯,目的在于传播更多信息,与本站立场无关。仅供读者参考,并请自行核实相关内容。