百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

Flink SQL Client综合实战(flink原理深入与编程实战下载)

csdh11 2025-05-02 17:17 7 浏览

在《Flink SQL Client初探》一文中,我们体验了Flink SQL Client的基本功能,今天来通过实战更深入学习和体验Flink SQL;

实战内容

本次实战主要是通过Flink SQL Client消费kafka的实时消息,再用各种SQL操作对数据进行查询统计,内容汇总如下:

  1. DDL创建Kafka表
  2. 窗口统计;
  3. 数据写入ElasticSearch
  4. 联表操作

版本信息

  • Flink:1.10.0
  • Flink所在操作系统:CentOS Linux release 7.7.1908
  • JDK:1.8.0_211
  • Kafka:2.4.0(scala:2.12)
  • Mysql:5.7.29
  • 数据源准备

    • 本次实战用的数据,来源是阿里云天池公开数据集,其中有一份淘宝用户行为数据集,获取方式请参考《准备数据集用于flink学习
    • 获取到数据集文件后转成kafka消息发出,这样我们使用Flink SQL时就按照实时消费kafka消息的方式来操作,具体的操作方式请参考《将CSV的数据发送到kafka
    • 上述操作完成后,一百零四万条淘宝用户行为数据就会通过kafka消息顺序发出,咱们的实战就有不间断实时数据可用 了,消息内容如下:
    {"user_id":1004080,"item_id":2258662,"category_id":79451,"behavior":"pv","ts":"2017-11-24T23:47:47Z"}
    {"user_id":100814,"item_id":5071478,"category_id":1107469,"behavior":"pv","ts":"2017-11-24T23:47:47Z"}
    {"user_id":114321,"item_id":4306269,"category_id":4756105,"behavior":"pv","ts":"2017-11-24T23:47:48Z"}
    • 上述消息中每个字段的含义如下表:

    jar准备

    实战过程中要用到下面这五个jar文件:

    1. flink-jdbc_2.11-1.10.0.jar
    2. flink-json-1.10.0.jar
    3. flink-sql-connector-elasticsearch6_2.11-1.10.0.jar
    4. flink-sql-connector-kafka_2.11-1.10.0.jar
    5. mysql-connector-java-5.1.48.jar
    • 我已将这些文件打包上传到GitHub,下载地址:https://raw.githubusercontent.com/zq2599/blog_demos/master/files/sql_lib.zip
    • 请在flink安装目录下新建文件夹sql_lib,然后将这五个jar文件放进去;

    Elasticsearch准备

    如果您装了docker和docker-compose,那么下面的命令可以快速部署elasticsearch和head工具:

    wget https://raw.githubusercontent.com/zq2599/blog_demos/master/elasticsearch_docker_compose/docker-compose.yml && \
    docker-compose up -d

    准备完毕,开始操作吧;

    DDL创建Kafka表

    • 进入flink目录,启动flink:bin/start-cluster.sh
    • 启动Flink SQL Client:bin/sql-client.sh embedded -l sql_lib
    • 启动成功显示如下:
    • 执行以下命令即可创建kafka表,请按照自己的信息调整参数:
    CREATE TABLE user_behavior (
        user_id BIGINT,
        item_id BIGINT,
        category_id BIGINT,
        behavior STRING,
        ts TIMESTAMP(3),
        proctime as PROCTIME(),   -- 处理时间列
        WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- 在ts上定义watermark,ts成为事件时间列
    ) WITH (
        'connector.type' = 'kafka',  -- kafka connector
        'connector.version' = 'universal',  -- universal 支持 0.11 以上的版本
        'connector.topic' = 'user_behavior',  -- kafka topic
        'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
        'connector.properties.zookeeper.connect' = '192.168.50.43:2181',  -- zk 地址
        'connector.properties.bootstrap.servers' = '192.168.50.43:9092',  -- broker 地址
        'format.type' = 'json'  -- 数据源格式为 json
    );
    • 执行SELECT * FROM user_behavior;看看原始数据,如果消息正常应该和下图类似:

    窗口统计

    • 下面的SQL是以每十分钟为窗口,统计每个窗口内的总浏览数,TUMBLE_START返回的数据格式是timestamp,这里再调用DATE_FORMAT函数将其格式化成了字符串:
    SELECT DATE_FORMAT(TUMBLE_START(ts, INTERVAL '10' MINUTE), 'yyyy-MM-dd hh:mm:ss'), 
    DATE_FORMAT(TUMBLE_END(ts, INTERVAL '10' MINUTE), 'yyyy-MM-dd hh:mm:ss'), 
    COUNT(*)
    FROM user_behavior
    WHERE behavior = 'pv'
    GROUP BY TUMBLE(ts, INTERVAL '10' MINUTE);
    • 得到数据如下所示:

    数据写入ElasticSearch

    • 确保elasticsearch已部署好;
    • 执行以下语句即可创建es表,请按照您自己的es信息调整下面的参数:
    CREATE TABLE pv_per_minute ( 
        start_time STRING,
        end_time STRING,
        pv_cnt BIGINT
    ) WITH (
        'connector.type' = 'elasticsearch', -- 类型
        'connector.version' = '6',  -- elasticsearch版本
        'connector.hosts' = 'http://192.168.133.173:9200',  -- elasticsearch地址
        'connector.index' = 'pv_per_minute',  -- 索引名,相当于数据库表名
        'connector.document-type' = 'user_behavior', -- type,相当于数据库库名
        'connector.bulk-flush.max-actions' = '1',  -- 每条数据都刷新
        'format.type' = 'json',  -- 输出数据格式json
        'update-mode' = 'append'
    );
    • 执行以下语句,就会将每分钟的pv总数写入es的pv_per_minute索引:
    INSERT INTO pv_per_minute
    SELECT DATE_FORMAT(TUMBLE_START(ts, INTERVAL '1' MINUTE), 'yyyy-MM-dd hh:mm:ss') AS start_time, 
    DATE_FORMAT(TUMBLE_END(ts, INTERVAL '1' MINUTE), 'yyyy-MM-dd hh:mm:ss') AS end_time, 
    COUNT(*) AS pv_cnt
    FROM user_behavior
    WHERE behavior = 'pv'
    GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE);
    • 用es-head查看,发现数据已成功写入:

    联表操作

    • 当前user_behavior表的category_id表示商品类目,例如11120表示计算机书籍,61626表示牛仔裤,本次实战的数据集中,这样的类目共有五千多种;
    • 如果我们将这五千多种类目分成6个大类,例如11120属于教育类61626属于服装类,那么应该有个大类和类目的关系表;
    • 这个大类和类目的关系表在MySQL创建,表名叫category_info,建表语句如下:
    CREATE TABLE `category_info`(
       `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
       `parent_id` bigint ,
       `category_id` bigint ,
       PRIMARY KEY ( `id` )
    ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
    • category_info所有数据来自对原始数据中category_id字段的提取,并且随机将它们划分为6个大类,该表的数据请在我的GitHub下载:https://raw.githubusercontent.com/zq2599/blog_demos/master/files/category_info.sql
    • 请在MySQL上建表category_info,并将上述数据全部写进去;
    • 在Flink SQL Client执行以下语句创建这个维表,mysql信息请按您自己配置调整:
    CREATE TABLE category_info (
    	parent_id BIGINT, -- 商品大类
        category_id BIGINT  -- 商品详细类目
    ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://192.168.50.43:3306/flinkdemo',
        'connector.table' = 'category_info',
        'connector.driver' = 'com.mysql.jdbc.Driver',
        'connector.username' = 'root',
        'connector.password' = '123456',
        'connector.lookup.cache.max-rows' = '5000',
        'connector.lookup.cache.ttl' = '10min'
    );
    • 尝试联表查询:
    SELECT U.user_id, U.item_id, U.behavior, C.parent_id, C.category_id
    FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C
    ON U.category_id = C.category_id;
    • 如下图,联表查询成功,每条记录都能对应大类:
    • 再试试联表统计,每个大类的总浏览量:
    SELECT C.parent_id, COUNT(*) AS pv_count
    FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C
    ON U.category_id = C.category_id
    WHERE behavior = 'pv'
    GROUP BY C.parent_id;
    • 如下图,数据是动态更新的:
    • 执行以下语句,可以在统计时将大类ID转成中文名:
    SELECT CASE C.parent_id
        WHEN 1 THEN '服饰鞋包'
        WHEN 2 THEN '家装家饰'
        WHEN 3 THEN '家电'
        WHEN 4 THEN '美妆'
        WHEN 5 THEN '母婴'
        WHEN 6 THEN '3C数码'
        ELSE '其他'
      END AS category_name,
    COUNT(*) AS pv_count
    FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C
    ON U.category_id = C.category_id
    WHERE behavior = 'pv'
    GROUP BY C.parent_id;

    效果如下图:

    至此,我们借助Flink SQL Client体验了Flink SQL丰富的功能,如果您也在学习Flink SQL,希望本文能给您一些参考;

    欢迎关注我的公众号:程序员欣宸

    相关推荐

    Github霸榜的SpringBoot全套学习教程,从入门到实战,内容超详细

    前言...

    SpringBoot+LayUI后台管理系统开发脚手架

    源码获取方式:关注,转发之后私信回复【源码】即可免费获取到!项目简介本项目本着避免重复造轮子的原则,建立一套快速开发JavaWEB项目(springboot-mini),能满足大部分后台管理系统基础开...

    Spring Boot+Vue全栈开发实战,中文版高清PDF资源

    SpringBoot+Vue全栈开发实战,中文高清PDF资源,需要的可以私我:)SpringBoot致力于简化开发配置并为企业级开发提供一系列非业务性功能,而Vue则采用数据驱动视图的方式将程序...

    2021年超详细的java学习路线总结—纯干货分享

    本文整理了java开发的学习路线和相关的学习资源,非常适合零基础入门java的同学,希望大家在学习的时候,能够节省时间。纯干货,良心推荐!第一阶段:Java基础...

    探秘Spring Cache:让Java应用飞起来的秘密武器

    探秘SpringCache:让Java应用飞起来的秘密武器在当今快节奏的软件开发环境中,性能优化显得尤为重要。SpringCache作为Spring框架的一部分,为我们提供了强大的缓存管理能力,让...

    3,从零开始搭建SSHM开发框架(集成Spring MVC)

    目录本专题博客已共享在(这个可能会更新的稍微一些)https://code.csdn.net/yangwei19680827/maven_sshm_blog...

    Spring Boot中如何使用缓存?超简单

    SpringBoot中的缓存可以减少从数据库重复获取数据或执行昂贵计算的需要,从而显著提高应用程序的性能。SpringBoot提供了与各种缓存提供程序的集成,您可以在应用程序中轻松配置和使用缓...

    我敢保证,全网没有再比这更详细的Java知识点总结了,送你啊

    接下来你看到的将是全网最详细的Java知识点总结,全文分为三大部分:Java基础、Java框架、Java+云数据小编将为大家仔细讲解每大部分里面的详细知识点,别眨眼,从小白到大佬、零基础到精通,你绝...

    1,从零开始搭建SSHM开发框架(环境准备)

    目录本专题博客已共享在https://code.csdn.net/yangwei19680827/maven_sshm_blog1,从零开始搭建SSHM开发框架(环境准备)...

    做一个适合二次开发的低代码平台,把程序员从curd中解脱出来-1

    干程序员也有好长时间了,大多数时间都是在做curd。现在想做一个通用的curd平台直接将我们解放出来;把核心放在业务处理中。用过代码生成器,在数据表设计好之后使用它就可以生成需要的controller...

    设计一个高性能Java Web框架(java做网站的框架)

    设计一个高性能JavaWeb框架在当今互联网高速发展的时代,构建高性能的JavaWeb框架对于提升用户体验至关重要。本文将从多个角度探讨如何设计这样一个框架,让我们一起进入这段充满挑战和乐趣的旅程...

    【推荐】强&牛!一款开源免费的功能强大的代码生成器系统!

    今天,给大家推荐一个代码生成器系统项目,这个项目目前收获了5.3KStar,个人觉得不错,值得拿出来和大家分享下。这是我目前见过最好的代码生成器系统项目。功能完整,代码结构清晰。...

    Java面试题及答案总结(2025版持续更新)

    大家好,我是Java面试分享最近很多小伙伴在忙着找工作,给大家整理了一份非常全面的Java面试场景题及答案。...

    Java开发网站架构演变过程-从单体应用到微服务架构详解

    Java开发网站架构演变过程,到目前为止,大致分为5个阶段,分别为单体架构、集群架构、分布式架构、SOA架构和微服务架构。下面玄武老师来给大家详细介绍下这5种架构模式的发展背景、各自优缺点以及涉及到的...

    本地缓存GuavaCache(一)(guava本地缓存原理)

    在并发量、吞吐量越来越大的情况下往往是离不开缓存的,使用缓存能减轻数据库的压力,临时存储数据。根据不同的场景选择不同的缓存,分布式缓存有Redis,Memcached、Tair、EVCache、Aer...