FlinkSQL Use Cases
PV | UV
Data preparation
- MySQL connector example (this uses the legacy `connector.type` option style; the Flink DDL later in this section uses the newer `'connector' = 'jdbc'` style)
```sql
CREATE TABLE MySQLTable (
    ...
) WITH (
    'connector.type' = 'jdbc',                                    -- required: use the jdbc connector
    'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',   -- required: JDBC url
    'connector.table' = 'jdbc_table_name',                        -- required: table name
    -- optional: JDBC driver; if not set, it is derived from the url automatically
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'name',                                -- optional: database username
    'connector.password' = 'password',                            -- optional: database password
    -- optional: column used to partition the input
    'connector.read.partition.column' = 'column_name',
    -- optional: number of partitions
    'connector.read.partition.num' = '50',
    -- optional: smallest value of the first partition
    'connector.read.partition.lower-bound' = '500',
    -- optional: largest value of the last partition
    'connector.read.partition.upper-bound' = '1000',
    -- optional: number of rows fetched per round trip; the default 0 means this option is ignored
    'connector.read.fetch-size' = '100',
    -- optional: max number of rows in the lookup cache; when exceeded, the oldest rows are evicted
    'connector.lookup.cache.max-rows' = '5000',
    -- optional: max TTL of the lookup cache; rows older than this expire.
    -- Note: cache.max-rows and cache.ttl must be configured together
    'connector.lookup.cache.ttl' = '10s',
    -- optional: max number of retries for lookup queries
    'connector.lookup.max-retries' = '3',
    -- optional: max number of buffered rows before a flush (default 5000); exceeding it triggers a flush
    'connector.write.flush.max-rows' = '5000',
    -- optional: flush interval; after this time an asynchronous thread flushes the data (default 0s)
    'connector.write.flush.interval' = '2s',
    -- optional: max number of retries when writing fails
    'connector.write.max-retries' = '3'
)
```
- Create the Kafka topic
```shell
kafka-topics.sh --create --topic view-page --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
```
- Produce mock data (each record is serialized as JSON with the fields `id`, `pageName`, and `eventTime` formatted as `yyyy-MM-dd HH:mm:ss`)
```java
import com.google.gson.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.lang.reflect.Type;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;

public class FlinkSQLCaseTests {

    private KafkaProducer<String, String> producer;
    private String topic;
    // serialize LocalDateTime as "yyyy-MM-dd HH:mm:ss" so Flink's TO_TIMESTAMP can parse it
    private Gson gson = new GsonBuilder()
            .registerTypeAdapter(LocalDateTime.class, new JsonSerializer<LocalDateTime>() {
                @Override
                public JsonElement serialize(LocalDateTime localDateTime, Type type, JsonSerializationContext jsonSerializationContext) {
                    return new JsonPrimitive(localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
                }
            })
            .create();

    @Before
    public void init() {
        topic = "view-page";
        Properties kafkaProps = new Properties();
        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<String, String>(kafkaProps);
    }

    @Test
    public void testPvUvData() {
        AtomicLong recordRows = new AtomicLong(0);
        while (recordRows.getAndIncrement() <= 1000) {
            producer.send(new ProducerRecord<>(topic, gson.toJson(PageVisitEvent.builder()
                    .id((long) new Random().nextInt(2000))
                    .eventTime(LocalDateTime.now())
                    .pageName("p_" + new Random().nextInt(100) % 20)
                    .build())));
        }
    }

    @After
    public void close() {
        producer.close();
    }
}

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
class PageVisitEvent {
    private Long id;
    private String pageName;
    private LocalDateTime eventTime;
}
```
- Create the MySQL tables
```sql
create table if not exists user_visit_minutes_pv (
    page_name     varchar(64) not null,
    visit_date    varchar(24) not null,
    visit_minutes varchar(24) not null,
    times         bigint      not null default 0,
    unique key (page_name, visit_date, visit_minutes)
) comment 'user_visit_minutes_pv';

create table if not exists user_visit_minutes_uv (
    page_name     varchar(64) not null,
    visit_date    varchar(24) not null,
    visit_minutes varchar(24) not null,
    times         bigint      not null default 0,
    unique key (page_name, visit_date, visit_minutes)
) comment 'user_visit_minutes_uv';
```
- Create the Flink tables
```sql
CREATE TABLE if not exists view_page_event_pv (
    id INT,
    pageName VARCHAR,
    eventTime VARCHAR,
    r_t AS TO_TIMESTAMP(eventTime, 'yyyy-MM-dd HH:mm:ss') comment 'et',
    WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND,
    p AS proctime()
) WITH (
    'connector' = 'kafka',
    'topic' = 'view-page',
    'scan.startup.mode' = 'group-offsets',
    'properties.bootstrap.servers' = '127.0.0.1:9092',
    'properties.group.id' = 'pv',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);

CREATE TABLE if not exists view_page_event_uv (
    id INT,
    pageName VARCHAR,
    eventTime VARCHAR,
    r_t AS TO_TIMESTAMP(eventTime, 'yyyy-MM-dd HH:mm:ss') comment 'et',
    WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND,
    p AS proctime()
) WITH (
    'connector' = 'kafka',
    'topic' = 'view-page',
    'scan.startup.mode' = 'group-offsets',
    'properties.bootstrap.servers' = '127.0.0.1:9092',
    'properties.group.id' = 'uv',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);

drop table user_visit_minutes_pv;
drop table user_visit_minutes_uv;

CREATE TABLE if not exists user_visit_minutes_pv (
    page_name STRING NOT NULL,
    visit_date STRING NOT NULL,
    visit_minutes STRING NOT NULL,
    times BIGINT,
    PRIMARY KEY (page_name, visit_date, visit_minutes) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.49.2:30336/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai',
    'table-name' = 'user_visit_minutes_pv',
    'username' = 'root',
    'password' = 'root'
);

CREATE TABLE if not exists user_visit_minutes_uv (
    page_name STRING NOT NULL,
    visit_date STRING NOT NULL,
    visit_minutes STRING NOT NULL,
    times BIGINT,
    PRIMARY KEY (page_name, visit_date, visit_minutes) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.49.2:30336/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai',
    'table-name' = 'user_visit_minutes_uv',
    'username' = 'root',
    'password' = 'root'
);
```
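Before starting the insert jobs, it is worth checking that the Kafka source tables actually parse the JSON events. A minimal spot check in the Flink SQL CLI (a sketch, assuming the test producer above is running) could be:

```sql
-- should show a continuously growing result with parsed event times in r_t
SELECT id, pageName, eventTime, r_t FROM view_page_event_pv;
```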
- Compute PV and UV
```sql
-- compute PV
insert into user_visit_minutes_pv
SELECT
    pageName page_name,
    max(statistic_date) AS visit_date,
    max(statistic_time) AS visit_minutes,
    SUM(statistic_value) AS times
FROM (
    SELECT
        pageName,
        CAST('PV' AS STRING) AS statistic_type,
        DATE_FORMAT(max(r_t), 'yyyy-MM-dd') AS statistic_date,
        DATE_FORMAT(max(r_t), 'HH:mm:00') AS statistic_time,
        COUNT(id) AS statistic_value
    FROM view_page_event_pv
    GROUP BY pageName, TUMBLE(r_t, INTERVAL '1' MINUTE)
)
GROUP BY pageName, statistic_date;

-- compute UV
insert into user_visit_minutes_uv
SELECT
    pageName page_name,
    max(statistic_date) AS visit_date,
    max(statistic_time) AS visit_minutes,
    COUNT(distinct user_id) AS times
FROM (
    SELECT
        pageName,
        DATE_FORMAT(max(r_t), 'yyyy-MM-dd') AS statistic_date,
        DATE_FORMAT(max(r_t), 'HH:mm:00') AS statistic_time,
        id AS user_id
    FROM view_page_event_uv
    GROUP BY pageName, id, TUMBLE(r_t, INTERVAL '1' MINUTE)
)
GROUP BY pageName, statistic_date;
```
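Once both insert jobs are running, the results can be verified on the MySQL side; for example (a simple spot check, assuming at least one one-minute window has closed):

```sql
-- most recent per-minute counts written by the two jobs
SELECT * FROM user_visit_minutes_pv ORDER BY visit_date DESC, visit_minutes DESC LIMIT 20;
SELECT * FROM user_visit_minutes_uv ORDER BY visit_date DESC, visit_minutes DESC LIMIT 20;
```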
- Dimension table (lookup) join
```sql
-- MySQL
create table if not exists user (
    id   int(11) auto_increment primary key,
    name varchar(64)  not null,
    city varchar(256) not null
) comment 'user table';

-- Flink
CREATE TABLE if not exists user_table (
    id INT NOT NULL,
    name STRING NOT NULL,
    city STRING NOT NULL,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.49.2:30336/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai',
    'table-name' = 'user',
    'username' = 'root',
    'password' = 'root',
    'scan.fetch-size' = '1000',
    -- optional: max number of rows in the lookup cache; when exceeded, the oldest rows are evicted
    'lookup.cache.max-rows' = '5000',
    -- optional: max TTL of the lookup cache; rows older than this expire.
    -- Note: cache.max-rows and cache.ttl must be configured together
    'lookup.cache.ttl' = '10s',
    -- optional: max number of retries for lookup queries
    'lookup.max-retries' = '3'
);

CREATE TABLE if not exists view_page_event (
    id INT,
    pageName VARCHAR,
    eventTime VARCHAR,
    r_t AS TO_TIMESTAMP(eventTime, 'yyyy-MM-dd HH:mm:ss') comment 'et',
    WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND,
    p AS proctime()
) WITH (
    'connector' = 'kafka',
    'topic' = 'view-page',
    'scan.startup.mode' = 'group-offsets',
    'properties.bootstrap.servers' = '127.0.0.1:9092',
    'properties.group.id' = 'ev',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);

-- lookup join on the processing-time attribute
select a.id, b.name, b.city, a.pageName
from view_page_event a
left join user_table FOR SYSTEM_TIME AS OF a.p AS b
on a.id = b.id;
```
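For the lookup join to return non-NULL name/city values, the MySQL `user` table needs rows whose ids overlap with the generated event ids (0–1999). A purely hypothetical seed for testing:

```sql
-- hypothetical sample rows; any ids in the 0-1999 range used by the producer will do
insert into user (id, name, city) values
    (1, 'alice', 'Beijing'),
    (2, 'bob', 'Shanghai'),
    (3, 'carol', 'Shenzhen');
```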
8. Writing to Hive
```sql
drop table view_page_event_hive;
drop table user_behavior_hive_tbl;
CREATE TABLE if not exists view_page_event_hive
(
id INT,
pageName VARCHAR,
eventTime VARCHAR,
r_t AS TO_TIMESTAMP(eventTime,'yyyy-MM-dd HH:mm:ss') comment 'et',
WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND,
p AS proctime()
)
WITH (
'connector' = 'kafka',
'topic' = 'view-page',
'scan.startup.mode' = 'group-offsets',
'properties.bootstrap.servers' = '127.0.0.1:9092',
'properties.group.id' = 'hv',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
-- use the streaming execution mode
set execution.runtime-mode=streaming;
-- switch to the Hive dialect
SET table.sql-dialect=hive;
-- create a partitioned Hive table
CREATE TABLE user_behavior_hive_tbl (
id INT,
pageName STRING,
eventTime STRING,
r_t STRING,
p_t STRING
) PARTITIONED BY (dt STRING, hr STRING, ms STRING) STORED AS parquet TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $hr:$ms:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 min',
'sink.partition-commit.policy.kind'='metastore,success-file'
);
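-- worked example of the partition commit above: a row with r_t = '2021-01-04 10:23:45'
-- lands in partition dt='2021-01-04', hr='10', ms='23'; the timestamp-pattern extracts
-- '2021-01-04 10:23:00' as the partition time, and with the '1 min' delay the partition is
-- committed (metastore + _SUCCESS file) once the watermark passes '2021-01-04 10:24:00'.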
-- streaming writes require checkpointing to be enabled
-- writes initially failed because of a character-set problem in the Hive metastore's MySQL database,
-- with the error: Schema Transaction threw exception "Add classes to Catalog "hive", Schema ""
-- fixed by changing the metastore database's default character set to latin1
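-- a sketch of that fix, run against the metastore MySQL instance (not in the Flink SQL CLI),
-- assuming the metastore database is named `hive`:
--   alter database hive character set latin1;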
-- run the streaming insert into the Hive table
SET table.sql-dialect=default;
-- streaming sql: write the data into the Hive table
INSERT INTO user_behavior_hive_tbl
SELECT
id,
pageName,
eventTime,
date_format(r_t,'yyyy-MM-dd HH:mm:ss') as rt,
date_format(p,'yyyy-MM-dd HH:mm:ss') as p_t,
date_format(r_t, 'yyyy-MM-dd') as dt,
date_format(r_t, 'HH') as hr,
date_format(r_t, 'mm') as ms
FROM view_page_event_hive
;
-- batch sql: query the partition data of the Hive table
SELECT * FROM user_behavior_hive_tbl WHERE dt='2021-01-04';
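-- optional sanity check: list the partitions committed so far
-- (assuming the Flink version in use supports SHOW PARTITIONS under the Hive dialect)
SET table.sql-dialect=hive;
SHOW PARTITIONS user_behavior_hive_tbl;
SET table.sql-dialect=default;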
-- note: checkpointing must be enabled before the streaming INSERT is submitted,
-- otherwise the Hive partitions are never committed
set execution.checkpointing.interval='300s';
```