业务数据的变化,我们可以通过 FlinkCDC 采集到,但是 FlinkCDC 是把全部数据统一写入一个 Topic 中, 这些数据包括事实数据,也包含维度数据,这样显然不利于日后的数据处理,所以这个功能是从 Kafka 的业务数据 ODS 层读取数据,经过处理后,将维度数据保存到 HBase,将事实数据写回 Kafka 作为业务数据的 DWD 层

实现动态分流功能

由于 FlinkCDC 是把全部数据统一写入一个 Topic 中, 这样显然不利于日后的数据处理。所以需要把各个表拆开处理。但是由于每个表有不同的特点,有些表是维度表,有些表是事实表。

在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL 等。一般把事实数据写入流中,进行进一步处理,最终形成宽表。


(资料图片仅供参考)

维度数据不放 Redis 的原因:User 用户维度数据量很大,其它维度还行。为什么不放 MySQL: 并发压力大

这样的配置不适合写在配置文件中,因为这样的话,业务端随着需求变化每增加一张表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。

这种可以有两个方案实现

table_process

table_process主健:sourceTable + type
sourceTable根据表名分流
type用来区分新增、变更的数据,不同类型的数据放到不同主题表不
sinkType放Kafka还是其它地方
sinkTable如果是维度表,就是Phoenix表名,如果是 kafka 就是 主题
sinkColumns提供字段,为了自动建表
pkPhoenix 建表必须有主健
extend指定要不要做分区表,等等

Demo

sourceTabletypesinkTypesinkTable
base_trademarkinserthbasedim_xxx(Phoenix 表名)
order_infoinsertkafkadwd_xxx(主题名)
CREATE TABLE `table_process` (`source_table` varchar(200) NOT NULL COMMENT "来源表",`operate_type` varchar(200) NOT NULL COMMENT "操作类型 insert,update,delete",`sink_type` varchar(200) DEFAULT NULL COMMENT "输出类型 hbase kafka",`sink_table` varchar(200) DEFAULT NULL COMMENT "输出表(主题)",`sink_columns` varchar(2000) DEFAULT NULL COMMENT "输出字段",`sink_pk` varchar(200) DEFAULT NULL COMMENT "主键字段",`sink_extend` varchar(200) DEFAULT NULL COMMENT "建表扩展",PRIMARY KEY (`source_table`,`operate_type`)) ENGINE=InnoDB DEFAULT CHARSET=utf8

1.读取状态2.过滤数据3.分流

ODS:

DWD-DIM:

行为数据:DWD(Kafka)

1.过滤脏数据 --> 侧输出流 脏数据率2.新老用户校验 --> 前台校验不准3.分流 --> 侧输出流 页面、启动、曝光、动作、错误4.写入Kafka

业务数据:DWD (Kafka)-DIM(Phoenix)

1.过滤数据-->删除数据2.读取配置表创建广播流3.连接主流和广播流并处理1)广播流数据:

2)主流数据

4.提取Kafka和 HBase 流,分别对应的位置5.HBase流:自定义 Sink6.Kafka流:自定义序列化方式

推荐内容