FlinkSQL Use Cases

PV|UV

Data Preparation

  1. MySQL example
    CREATE TABLE MySQLTable (
     ...
    ) WITH (
     'connector.type' = 'jdbc', -- required: use jdbc
     'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- required: JDBC url
     'connector.table' = 'jdbc_table_name',  -- required: table name
      -- optional: JDBC driver; if not set, it is derived from the url
     'connector.driver' = 'com.mysql.jdbc.Driver',
     'connector.username' = 'name', -- optional: database user
     'connector.password' = 'password', -- optional: database password
       -- optional: column used to partition the input
     'connector.read.partition.column' = 'column_name',
       -- optional: number of partitions
     'connector.read.partition.num' = '50',
       -- optional: smallest value of the first partition
     'connector.read.partition.lower-bound' = '500',
       -- optional: largest value of the last partition
     'connector.read.partition.upper-bound' = '1000',
       -- optional: rows fetched per round trip; default 0, which means the hint is ignored
     'connector.read.fetch-size' = '100',
      -- optional: max rows kept in the lookup cache; when exceeded, the oldest rows are evicted
     'connector.lookup.cache.max-rows' = '5000',
      -- optional: max TTL of lookup cache entries; entries older than this expire.
      -- Note: cache.max-rows and cache.ttl must be set together
     'connector.lookup.cache.ttl' = '10s',
      -- optional: max number of retries for lookup queries
     'connector.lookup.max-retries' = '3',
      -- optional: max buffered rows before a flush; default 5000; exceeding it triggers a flush
     'connector.write.flush.max-rows' = '5000',
      -- optional: flush interval; after this time an async thread flushes the buffer; default 0s
     'connector.write.flush.interval' = '2s',
      -- optional: max number of retries for failed writes
     'connector.write.max-retries' = '3'
    )
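    These connector.* keys are the legacy descriptor-style options. On Flink 1.11+ the same source can be declared with the flat option style used later in this document; a minimal sketch (same placeholder table and columns as above):
    CREATE TABLE MySQLTable (
     ...
    ) WITH (
     'connector' = 'jdbc',
     'url' = 'jdbc:mysql://localhost:3306/flink-test',
     'table-name' = 'jdbc_table_name',
     'driver' = 'com.mysql.jdbc.Driver', -- optional, derived from the url if absent
     'username' = 'name',
     'password' = 'password',
     'lookup.cache.max-rows' = '5000', -- optional lookup cache, as above
     'lookup.cache.ttl' = '10s'
    )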
  2. Create the Kafka topic

    kafka-topics.sh --create --topic view-page --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

  3. Simulate data production
     import com.google.gson.Gson;
     import com.google.gson.GsonBuilder;
     import com.google.gson.JsonElement;
     import com.google.gson.JsonPrimitive;
     import com.google.gson.JsonSerializationContext;
     import com.google.gson.JsonSerializer;
     import lombok.AllArgsConstructor;
     import lombok.Builder;
     import lombok.Data;
     import lombok.NoArgsConstructor;
     import org.apache.kafka.clients.producer.KafkaProducer;
     import org.apache.kafka.clients.producer.ProducerConfig;
     import org.apache.kafka.clients.producer.ProducerRecord;
     import org.apache.kafka.common.serialization.StringSerializer;
     import org.junit.After;
     import org.junit.Before;
     import org.junit.Test;

     import java.lang.reflect.Type;
     import java.time.LocalDateTime;
     import java.time.format.DateTimeFormatter;
     import java.util.Properties;
     import java.util.Random;
     import java.util.concurrent.atomic.AtomicLong;

     public class FlinkSQLCaseTests {
        private KafkaProducer<String, String> producer;
        private String topic;

        // serialize LocalDateTime as 'yyyy-MM-dd HH:mm:ss' so Flink's TO_TIMESTAMP can parse it
        private Gson gson = new GsonBuilder()
                .registerTypeAdapter(LocalDateTime.class, new JsonSerializer<LocalDateTime>() {
                    @Override
                    public JsonElement serialize(LocalDateTime localDateTime, Type type, JsonSerializationContext jsonSerializationContext) {
                        return new JsonPrimitive(localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
                    }
                })
                .create();

        // initialize the producer before each test
        @Before
        public void init() {
            topic = "view-page";
            Properties kafkaProps = new Properties();
            kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            producer = new KafkaProducer<>(kafkaProps);
        }

        // emit ~1000 random page-visit events
        @Test
        public void testPvUvData() {
            Random random = new Random();
            AtomicLong recordRows = new AtomicLong(0);
            while (recordRows.getAndIncrement() <= 1000) {
                producer.send(new ProducerRecord<>(topic, gson.toJson(PageVisitEvent.builder()
                        .id((long) random.nextInt(2000))
                        .eventTime(LocalDateTime.now())
                        .pageName("p_" + random.nextInt(100) % 20)
                        .build())));
            }
        }

        @After
        public void close() {
            producer.close();
        }
     }

     @Data
     @AllArgsConstructor
     @NoArgsConstructor
     @Builder
     class PageVisitEvent {
        private Long id;
        private String pageName;
        private LocalDateTime eventTime;
     }
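    Each event lands on the topic as a JSON object whose keys match the Flink table columns declared below; a sample message (illustrative values):
     {"id":1234,"pageName":"p_7","eventTime":"2021-01-04 12:34:56"}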
  4. Create the MySQL tables
     create table if not exists user_visit_minutes_pv
     (
        page_name varchar(64) not null,
        visit_date varchar(24) not null,
        visit_minutes varchar(24) not null,
        times bigint not null default 0,
        unique key(page_name, visit_date, visit_minutes)
     ) comment 'user_visit_minutes_pv';

     create table if not exists user_visit_minutes_uv
     (
        page_name varchar(64) not null,
        visit_date varchar(24) not null,
        visit_minutes varchar(24) not null,
        times bigint not null default 0,
        unique key(page_name, visit_date, visit_minutes)
     ) comment 'user_visit_minutes_uv';
  5. Create the Flink tables
    CREATE TABLE if not exists view_page_event_pv
    (
       id      INT,
       pageName    VARCHAR,
       eventTime   VARCHAR,
       r_t AS TO_TIMESTAMP(eventTime,'yyyy-MM-dd HH:mm:ss') comment 'et',
       WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND,
       p AS proctime()
    )
       WITH (
           'connector' = 'kafka',
           'topic' = 'view-page',
           'scan.startup.mode' = 'group-offsets',
           'properties.bootstrap.servers' = '127.0.0.1:9092',
           'properties.group.id' = 'pv',
           'format' = 'json',
           'json.fail-on-missing-field' = 'false',
           'json.ignore-parse-errors' = 'true'
       );
    CREATE TABLE if not exists view_page_event_uv
    (
       id      INT,
       pageName    VARCHAR,
       eventTime   VARCHAR,
       r_t AS TO_TIMESTAMP(eventTime,'yyyy-MM-dd HH:mm:ss') comment 'et',
       WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND,
       p AS proctime()
    )
       WITH (
           'connector' = 'kafka',
           'topic' = 'view-page',
           'scan.startup.mode' = 'group-offsets',
           'properties.bootstrap.servers' = '127.0.0.1:9092',
           'properties.group.id' = 'uv',
           'format' = 'json',
           'json.fail-on-missing-field' = 'false',
           'json.ignore-parse-errors' = 'true'
       );
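     A quick sanity check from the sql-client confirms the JSON is parsed as expected (this is an unbounded streaming query; cancel it after a few rows appear):
     select id, pageName, r_t from view_page_event_pv;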
    
        -- drop stale Flink table definitions before re-creating them as JDBC sinks
        drop table user_visit_minutes_pv;
        drop table user_visit_minutes_uv;
    CREATE TABLE if not exists user_visit_minutes_pv
    (
       page_name STRING NOT NULL,
       visit_date STRING NOT NULL,
       visit_minutes STRING NOT NULL,
       times BIGINT,
       PRIMARY KEY (page_name,visit_date,visit_minutes) NOT ENFORCED
    ) WITH (
         'connector' = 'jdbc',
         'url' = 'jdbc:mysql://192.168.49.2:30336/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai',
         'table-name' = 'user_visit_minutes_pv',
         'username' = 'root',
         'password' = 'root'
         );
    
    CREATE TABLE if not exists user_visit_minutes_uv
    (
       page_name STRING NOT NULL,
       visit_date STRING NOT NULL,
       visit_minutes STRING NOT NULL,
       times BIGINT,
       PRIMARY KEY (page_name,visit_date,visit_minutes) NOT ENFORCED
    ) WITH (
         'connector' = 'jdbc',
         'url' = 'jdbc:mysql://192.168.49.2:30336/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai',
         'table-name' = 'user_visit_minutes_uv',
         'username' = 'root',
         'password' = 'root'
         );
    
  6. Compute PV and UV
     -- compute PV (the outer aggregate emits the cumulative daily total, stamped with the latest minute)
     insert into user_visit_minutes_pv
     SELECT
         pageName AS page_name,
         max(statistic_date) AS visit_date,
         max(statistic_time) AS visit_minutes,
         SUM(statistic_value) AS times
     FROM (
         SELECT pageName,
             CAST('PV' AS STRING) AS statistic_type,
             DATE_FORMAT(max(r_t), 'yyyy-MM-dd') AS statistic_date,
             DATE_FORMAT(max(r_t), 'HH:mm:00') AS statistic_time,
             COUNT(id) AS statistic_value
         FROM view_page_event_pv
         GROUP BY pageName, TUMBLE(r_t, INTERVAL '1' MINUTE)
     ) t
     GROUP BY pageName, statistic_date;
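     Deriving the window's date and minute via DATE_FORMAT(max(r_t), ...) works, but the window start is also available directly; an equivalent sketch of the inner query using TUMBLE_START on the same source:
     SELECT pageName,
            DATE_FORMAT(TUMBLE_START(r_t, INTERVAL '1' MINUTE), 'yyyy-MM-dd') AS statistic_date,
            DATE_FORMAT(TUMBLE_START(r_t, INTERVAL '1' MINUTE), 'HH:mm:00') AS statistic_time,
            COUNT(id) AS statistic_value
     FROM view_page_event_pv
     GROUP BY pageName, TUMBLE(r_t, INTERVAL '1' MINUTE);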
     -- compute UV (the inner query deduplicates per user within each window)
     insert into user_visit_minutes_uv
     SELECT
         pageName AS page_name,
         max(statistic_date) AS visit_date,
         max(statistic_time) AS visit_minutes,
         COUNT(distinct user_id) AS times
     FROM (
         SELECT pageName,
             DATE_FORMAT(max(r_t), 'yyyy-MM-dd') AS statistic_date,
             DATE_FORMAT(max(r_t), 'HH:mm:00') AS statistic_time,
             id AS user_id
         FROM view_page_event_uv
         GROUP BY pageName, id, TUMBLE(r_t, INTERVAL '1' MINUTE)
     ) t
     GROUP BY pageName, statistic_date;
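     After both jobs have been running for a few minutes, the results can be inspected directly in MySQL (run outside Flink):
     select page_name, visit_date, visit_minutes, times
     from user_visit_minutes_uv
     order by visit_date, visit_minutes, page_name;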
  7. Dimension table join (lookup join)
     -- mysql
     create table if not exists user (
        id int(11) auto_increment primary key,
        name varchar(64) not null,
        city varchar(256) not null
     ) comment 'user table';
    
    -- flink
    CREATE TABLE if not exists user_table
    (
       id INT NOT NULL,
       name STRING NOT NULL,
       city STRING NOT NULL,
       PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
         'connector' = 'jdbc',
         'url' = 'jdbc:mysql://192.168.49.2:30336/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai',
         'table-name' = 'user',
         'username' = 'root',
         'password' = 'root',
          'scan.fetch-size' = '1000',
          -- optional: max rows kept in the lookup cache; when exceeded, the oldest rows are evicted
          'lookup.cache.max-rows' = '5000',
          -- optional: max TTL of lookup cache entries; entries older than this expire.
          -- Note: lookup.cache.max-rows and lookup.cache.ttl must be set together
          'lookup.cache.ttl' = '10s',
          -- optional: max number of retries for lookup queries
          'lookup.max-retries' = '3'
    );
    
    CREATE TABLE if not exists view_page_event
    (
       id      INT,
       pageName    VARCHAR,
       eventTime   VARCHAR,
       r_t AS TO_TIMESTAMP(eventTime,'yyyy-MM-dd HH:mm:ss') comment 'et',
       WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND,
       p AS proctime()
    )
       WITH (
           'connector' = 'kafka',
           'topic' = 'view-page',
           'scan.startup.mode' = 'group-offsets',
           'properties.bootstrap.servers' = '127.0.0.1:9092',
           'properties.group.id' = 'ev',
           'format' = 'json',
           'json.fail-on-missing-field' = 'false',
           'json.ignore-parse-errors' = 'true'
       );
    
        select a.id, b.name, b.city, a.pageName
        from view_page_event a
        left join user_table FOR SYSTEM_TIME AS OF a.p AS b
          on a.id = b.id;
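
     For the join to produce non-null names and cities, the MySQL user table needs rows whose ids overlap the producer's id range (0-1999); a few illustrative rows:
     insert into user (id, name, city) values
         (1, 'alice', 'Beijing'),
         (2, 'bob', 'Shanghai'),
         (3, 'carol', 'Shenzhen');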

  8. Write to Hive

-- drop stale table definitions first
drop table view_page_event_hive;
drop table user_behavior_hive_tbl;
CREATE TABLE if not exists view_page_event_hive
(
    id      INT,
    pageName    VARCHAR,
    eventTime   VARCHAR,
    r_t AS TO_TIMESTAMP(eventTime,'yyyy-MM-dd HH:mm:ss') comment 'et',
    WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND,
    p AS proctime()
)
    WITH (
        'connector' = 'kafka',
        'topic' = 'view-page',
        'scan.startup.mode' = 'group-offsets',
        'properties.bootstrap.servers' = '127.0.0.1:9092',
        'properties.group.id' = 'hv',
        'format' = 'json',
        'json.fail-on-missing-field' = 'false',
        'json.ignore-parse-errors' = 'true'
    );
    
-- use streaming execution mode
set execution.runtime-mode=streaming;
-- switch to the Hive dialect
SET table.sql-dialect=hive;
-- create a partitioned Hive table
CREATE TABLE user_behavior_hive_tbl (
    id      INT,
    pageName    STRING,
    eventTime   STRING,
    r_t STRING,
    p_t STRING
) PARTITIONED BY (dt STRING, hr STRING, ms STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:$ms:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 min',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);
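
-- With 'sink.partition-commit.trigger'='partition-time', a partition is committed once the
-- watermark passes the timestamp extracted from its path plus 'sink.partition-commit.delay'.
-- Illustrative example (assumed values): a partition written under dt=2021-01-04/hr=12/ms=35
-- is extracted via '$dt $hr:$ms:00' as timestamp '2021-01-04 12:35:00' and committed
-- (registered in the metastore, _SUCCESS file written) once the watermark passes 12:36:00.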

-- Streaming writes require checkpointing to be enabled.
-- A write failed midway due to a character-set problem in the Hive metastore's MySQL database,
-- reporting: Schema Transaction threw exception "Add classes to Catalog "hive", Schema ""
-- Fix: change the metastore database's default character set to latin1.
-- switch back to the default dialect and run the streaming insert into the Hive table
SET table.sql-dialect=default;
INSERT INTO user_behavior_hive_tbl 
SELECT 
    id,
    pageName,
    eventTime,
    date_format(r_t,'yyyy-MM-dd HH:mm:ss') as rt,
    date_format(p,'yyyy-MM-dd HH:mm:ss') as p_t,
    date_format(r_t, 'yyyy-MM-dd') as dt,
    date_format(r_t, 'HH') as hr,
    date_format(r_t, 'mm') as ms
FROM view_page_event_hive
;

-- batch sql: query a partition of the Hive table
SELECT * FROM user_behavior_hive_tbl WHERE dt='2021-01-04';

-- Note: checkpointing must be enabled before the streaming INSERT is submitted,
-- otherwise in-progress files are never finalized and no partition is ever committed.
set execution.checkpointing.interval='300s';