• [技术干货] 如何构建本机的flink安全模式的开发环境,读取kafka数据
    本地idea如何构建本机的开发环境,flink读取sasl_plaintext的kafka?然后本机idea启动,直接消费kafka数据?在flink提交任务前加了if (LoginUtil.isSecurityModel()) { try { LOG.info("Securitymode start."); //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE); } catch (IOException e) { LOG.error("Security prepare failure."); LOG.error("The IOException occured : {}.", e); return; } LOG.info("Security prepare success.");}这部分认证,但是这个好像并没有提交到本地的flinkclient中。
  • [最佳实践] 创建Flink OpenSource作业从Postgres CDC源表读取数据写入到DWS
    场景描述CDC是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库的增量变动记录,同步到一个或多个数据目的中。CDC在数据同步过程中,还可以对数据进行一定的处理,例如分组(GROUP BY)、多表的关联(JOIN)等。本示例通过创建Postgres CDC源表来监控Postgres的数据变化,并将变化的数据信息插入到DWS数据库中。前提条件已创建RDS Postgres实例,具体步骤可参考:RDS PostgreSQL快速入门。本示例创建的RDS Postgres数据库版本选择为:11。说明:创建的RDS Postgres数据库版本不能低于11。已创建DWS实例,具体创建DWS集群的操作可以参考创建DWS集群。本示例创建的DWS集群版本为:8.1.1.205。整体作业开发流程整体作业开发流程参考图1。图1 作业开发流程步骤1:创建队列:创建DLI作业运行的队列。步骤2:创建RDS Postgres数据库:创建RDS Postgres的数据库和表。步骤3:创建DWS数据库和表:创建用于接收数据的DWS数据库和表。步骤4:创建增强型跨源连接:DLI上创建连接RDS和DWS的跨源连接,打通网络。步骤5:运行作业:DLI上创建和运行Flink OpenSource作业。步骤6:发送数据和查询结果:RDS Postgres的表上插入数据,在DWS上查看运行结果。步骤1:创建队列登录DLI管理控制台,在左侧导航栏单击“资源管理 > 队列管理”,可进入队列管理页面。在队列管理界面,单击界面右上角的“购买队列”。在“购买队列”界面,填写具体的队列配置参数,具体参数填写参考如下。计费模式:选择“包年/包月”或“按需计费”。本示例选择“按需计费”。区域和项目:保持默认值即可。名称:填写具体的队列名称。说明:新建的队列名称,名称只能包含数字、英文字母和下划线,但不能是纯数字,且不能以下划线开头。长度限制:1~128个字符。队列名称不区分大小写,系统会自动转换为小写。类型:队列类型选择“通用队列”。“按需计费”时需要勾选“专属资源模式”。AZ策略、CPU架构、规格:保持默认即可。企业项目:当前选择为“default”。高级选项:选择“自定义”。网段:配置队列网段。例如,当前配置为10.0.0.0/16。注意:队列的网段不能和DMS Kafka、RDS MySQL实例的子网网段有重合,否则后续创建跨源连接会失败。其他参数根据需要选择和配置。图2 创建队列参数配置完成后,单击“立即购买”,确认配置信息无误后,单击“提交”完成队列创建。步骤2:创建RDS Postgres数据库登录RDS管理控制台,在“实例管理”界面,选择已创建的RDS Postgres实例,选择操作列的“更多 > 登录”,进入数据管理服务实例登录界面。输入实例登录的用户名和密码。单击“登录”,即可进入RDS Postgres数据库并进行管理。在数据库实例界面,单击“新建数据库”,数据库名定义为:testrdsdb,字符集保持默认即可。在testrdsdb数据库下,单击“新建Schema”,Schema名称输入为:test。在test的Schema所在行,单击“操作”列的“打开Schema”。单击“SQL查询”,输入以下创建表语句,创建RDS Postgres表。create table test.cdc_order( order_id VARCHAR, order_channel VARCHAR, order_time VARCHAR, pay_amount FLOAT8, real_pay FLOAT8, pay_time VARCHAR, user_id VARCHAR, user_name VARCHAR, area_id VARCHAR, primary key(order_id));在Postgre中执行下列SQL语句。ALTER TABLE test.cdc_order REPLICA IDENTITY FULL;步骤3:创建DWS数据库和表参考使用gsql命令行客户端连接DWS集群连接已创建的DWS集群。执行以下命令连接DWS集群的默认数据库“gaussdb”:gsql -d gaussdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -rgaussdb:DWS集群默认数据库。DWS集群连接地址:请参见获取集群连接地址进行获取。如果通过公网地址连接,请指定为集群“公网访问地址”或“公网访问域名”,如果通过内网地址连接,请指定为集群“内网访问地址”或“内网访问域名”。如果通过弹性负载均衡连接,请指定为“弹性负载均衡地址”。dbadmin:创建集群时设置的默认管理员用户名。-W:默认管理员用户的密码。在命令行窗口输入以下命令创建数据库“testdwsdb”。CREATE DATABASE testdwsdb;执行以下命令,退出gaussdb数据库,连接新创建的数据库“testdwsdb”。\qgsql -d testdwsdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -r执行以下命令创建表。create schema test;set current_schema= test;drop table if exists dws_order;CREATE TABLE dws_order( order_id VARCHAR, order_channel VARCHAR, order_time VARCHAR, pay_amount FLOAT8, real_pay FLOAT8, pay_time VARCHAR, user_id VARCHAR, user_name VARCHAR, area_id VARCHAR);步骤4:创建增强型跨源连接创建DLI连接RDS的增强型跨源连接在RDS管理控制台,选择“实例管理”,单击对应的RDS实例名称,进入到RDS的基本信息页面。在“基本信息”的“连接信息”中获取该实例的“内网地址”、“数据库端口”、“虚拟私有云”和“子网”信息,方便后续操作步骤使用。单击“连接信息”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选为:1,策略选为:允许,协议选择:TCP,端口值不填,类型:IPV4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。连接名称:设置具体的增强型跨源名称。本示例输入为:dli_rds。弹性资源池:选择步骤1:创建队列中已经创建的队列。虚拟私有云:选择RDS的虚拟私有云。子网:选择RDS的子网。其他参数可以根据需要选择配置。参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。单击“队列管理”,选择操作的队列,本示例为步骤1:创建队列中创建的队列,在操作列,单击“更多 > 测试地址连通性”。在“测试连通性”界面,根据2中获取的RDS连接信息,地址栏输入“RDS内网地址:RDS数据库端口”,单击“测试”测试DLI到RDS网络是否可达。创建DLI连接DWS的增强型跨源连接在DWS管理控制台,选择“集群管理”,单击已创建的DWS集群名称,进入到DWS的基本信息页面。在“基本信息”的“数据库属性”中获取该实例的“内网IP”、“端口”,“基本信息”页面的“网络”中获取“虚拟私有云”和“子网”信息,方便后续操作步骤使用。单击“连接信息”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选为:1,策略选为:允许,协议选择:TCP,端口值不填,类型:IPV4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。说明:本示例默认RDS和DWS实例分别在两个VPC和子网下,所以要分别创建增强型跨源连接打通网络。如果RDS和DWS实例属于同一VPC和子网下,则创建增强型跨源一次即可,4和5不需要再执行。在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。连接名称:设置具体的增强型跨源名称。本示例输入为:dli_dws。弹性资源池:选择步骤1:创建队列中已经创建的队列。虚拟私有云:选择DWS的虚拟私有云。子网:选择DWS的子网。其他参数可以根据需要选择配置。参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。单击“队列管理”,选择操作的队列,本示例为步骤1:创建队列中创建的队列,在操作列,单击“更多 > 测试地址连通性”。在“测试连通性”界面,根据2中获取的DWS连接信息,地址栏输入“DWS内网IP:DWS端口”,单击“测试”测试DLI到DWS网络是否可达。步骤5:运行作业在DLI管理控制台,单击“作业管理 > Flink作业”,在Flink作业管理界面,单击“创建作业”。在创建队列界面,类型选择“Flink OpenSource SQL”,名称填写为:FlinkCDCPostgreDWS。单击“确定”,跳转到Flink作业编辑界面。在Flink OpenSource SQL作业编辑界面,配置如下参数。所属队列:选择步骤1:创建队列中创建的队列。Flink版本:选择1.12。保存作业日志:勾选。OBS桶:选择保存作业日志的OBS桶,根据提示进行OBS桶权限授权。开启Checkpoint:勾选。Flink作业编辑框中输入具体的作业SQL,本示例作业参考如下。SQL中加粗的参数需要根据实际情况修改。说明:本示例使用的Flink版本为1.12,故Flink OpenSource SQL语法也是1.12。本示例数据源是Kafka,写入结果数据到Elasticsearch,故请参考Flink OpenSource SQL 1.12创建Postgres CDC源表和Flink OpenSource SQL 1.12创建DWS结果表。create table PostgreCdcSource( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id STRING, primary key (order_id) not enforced) with ( 'connector' = 'postgres-cdc', 'hostname' = '192.168.15.153',--IP替换为RDS Postgres的实例IP 'port' = '5432',--端口替换为RDS Postgres的实例端口 'username' = 'xxxxx',--RDS Postgres实例的数据库用户名 'password' = 'xxxxx',-RDS Postgres实例的数据库用户密码 'database-name' = 'testrdsdb',--RDS Postgres实例的数据库名 'schema-name' = 'test',--RDS Postgres数据库下的schema 'table-name' = 'cdc_order'--RDS Postgres数据库下的表名);create table dwsSink( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id STRING, primary key(order_id) not enforced) with ( 'connector' = 'gaussdb', 'driver' = 'com.huawei.gauss200.jdbc.Driver', 'url' = 'jdbc:gaussdb://192.168.168.16:8000/testdwsdb', ---192.168.168.16:8000替换为DWS的内网IP和端口,testdwsdb为创建的DWS数据库名 'table-name' = 'test\".\"dws_order', ---test为创建的DWS表的schema,dws_order为对应的DWS表名 'username' = 'xxxxx',--替换为DWS实例的用户名 'password' = 'xxxxx',--替换为DWS实例的用户密码 'write.mode' = 'insert');insert into dwsSink select * from PostgreCdcSource where pay_amount > 100;单击“语义校验”确保SQL语义校验成功。单击“保存”,保存作业。单击“启动”,启动作业,确认作业参数信息,单击“立即启动”开始执行作业。等待作业运行状态变为“运行中”。步骤6:发送数据和查询结果登录RDS管理控制台,在“实例管理”界面,选择已创建的RDS Postgres实例,选择操作列的“更多 > 登录”,进入数据管理服务实例登录界面。输入实例登录的用户名和密码。单击“登录”,即可进入RDS Postgres数据库并进行管理。在已创建的数据库的操作列,单击“SQL查询”,输入以下创建表语句,插入测试数据。insert into test.cdc_order values('202103241000000001','webShop','2021-03-24 10:00:00','50.00','100.00','2021-03-24 10:02:03','0001','Alice','330106'),('202103251606060001','appShop','2021-03-24 12:06:06','200.00','180.00','2021-03-24 16:10:06','0002','Jason','330106'),('202103261000000001','webShop','2021-03-24 14:03:00','300.00','100.00','2021-03-24 10:02:03','0003','Lily','330106'),('202103271606060001','appShop','2021-03-24 16:36:06','99.00','150.00','2021-03-24 16:10:06','0001','Henry','330106');参考使用gsql命令行客户端连接DWS集群连接已创建的DWS集群。执行以下命令连接DWS集群的默认数据库“testdwsdb”:gsql -d testdwsdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -r执行以下语句,查询DWS的表数据。select * from test.dws_order;查询结果参考如下:order_channel order_channel order_time pay_amount real_pay pay_time user_id user_name area_id202103251606060001 appShop 2021-03-24 12:06:06 200.0 180.0 2021-03-24 16:10:06 0002 Jason 330106202103261000000001 webShop 2021-03-24 14:03:00 300.0 100.0 2021-03-24 10:02:03 0003 Lily 330106
  • [最佳实践] 创建Flink OpenSource作业从MySQL CDC源表读取数据写入到DWS
    场景描述CDC是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库的增量变动记录,同步到一个或多个数据目的中。CDC在数据同步过程中,还可以对数据进行一定的处理,例如分组(GROUP BY)、多表的关联(JOIN)等。本示例通过创建MySQL CDC源表来监控MySQL的数据变化,并将变化的数据信息插入到DWS数据库中。前提条件已创建RDS MySQL实例,具体步骤可参考:RDS MySQL快速入门。本示例创建的RDS MySQL数据库版本选择为:8.0。已创建DWS实例,具体创建DWS集群的操作可以参考创建DWS集群。本示例创建的DWS集群版本为:8.1.1.205。整体作业开发流程整体作业开发流程参考图1。图1 作业开发流程步骤1:创建队列:创建DLI作业运行的队列。步骤2:创建RDS MySQL数据库和表:创建RDS MySQL的数据库和表。步骤3:创建DWS数据库和表:创建用于接收数据的DWS数据库和表。步骤4:创建增强型跨源连接:DLI上创建连接RDS和DWS的跨源连接,打通网络。步骤5:运行作业:DLI上创建和运行Flink OpenSource作业。步骤6:发送数据和查询结果:RDS MySQL的表上插入数据,在DWS上查看运行结果。步骤1:创建队列登录DLI管理控制台,在左侧导航栏单击“资源管理 > 队列管理”,可进入队列管理页面。在队列管理界面,单击界面右上角的“购买队列”。在“购买队列”界面,填写具体的队列配置参数,具体参数填写参考如下。计费模式:选择“包年/包月”或“按需计费”。本示例选择“按需计费”。区域和项目:保持默认值即可。名称:填写具体的队列名称。说明:新建的队列名称,名称只能包含数字、英文字母和下划线,但不能是纯数字,且不能以下划线开头。长度限制:1~128个字符。队列名称不区分大小写,系统会自动转换为小写。类型:队列类型选择“通用队列”。“按需计费”时需要勾选“专属资源模式”。AZ策略、CPU架构、规格:保持默认即可。企业项目:当前选择为“default”。高级选项:选择“自定义”。网段:配置队列网段。例如,当前配置为10.0.0.0/16。注意:队列的网段不能和DMS Kafka、RDS MySQL实例的子网网段有重合,否则后续创建跨源连接会失败。其他参数根据需要选择和配置。图2 创建队列参数配置完成后,单击“立即购买”,确认配置信息无误后,单击“提交”完成队列创建。步骤2:创建RDS MySQL数据库和表登录RDS管理控制台,在“实例管理”界面,选择已创建的RDS MySQL实例,选择操作列的“更多 > 登录”,进入数据管理服务实例登录界面。输入实例登录的用户名和密码。单击“登录”,即可进入RDS MySQL数据库并进行管理。在数据库实例界面,单击“新建数据库”,数据库名定义为:testrdsdb,字符集保持默认即可。在已创建的数据库的操作列,单击“SQL查询”,输入以下创建表语句,创建RDS MySQL表。CREATE TABLE mysqlcdc ( `order_id` VARCHAR(64) NOT NULL, `order_channel` VARCHAR(32) NOT NULL, `order_time` VARCHAR(32), `pay_amount` DOUBLE, `real_pay` DOUBLE, `pay_time` VARCHAR(32), `user_id` VARCHAR(32), `user_name` VARCHAR(32), `area_id` VARCHAR(32)) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4;步骤3:创建DWS数据库和表参考使用gsql命令行客户端连接DWS集群连接已创建的DWS集群。执行以下命令连接DWS集群的默认数据库“gaussdb”:gsql -d gaussdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -rgaussdb:DWS集群默认数据库。DWS集群连接地址:请参见获取集群连接地址进行获取。如果通过公网地址连接,请指定为集群“公网访问地址”或“公网访问域名”,如果通过内网地址连接,请指定为集群“内网访问地址”或“内网访问域名”。如果通过弹性负载均衡连接,请指定为“弹性负载均衡地址”。dbadmin:创建集群时设置的默认管理员用户名。-W:默认管理员用户的密码。在命令行窗口输入以下命令创建数据库“testdwsdb”。CREATE DATABASE testdwsdb;执行以下命令,退出gaussdb数据库,连接新创建的数据库“testdwsdb”。\qgsql -d testdwsdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -r执行以下命令创建表。create schema test;set current_schema= test;drop table if exists dwsresult;CREATE TABLE dwsresult( car_id VARCHAR, car_owner VARCHAR, car_age INTEGER , average_speed FLOAT8, total_miles FLOAT8);步骤4:创建增强型跨源连接创建DLI连接RDS的增强型跨源连接在RDS管理控制台,选择“实例管理”,单击对应的RDS实例名称,进入到RDS的基本信息页面。在“基本信息”的“连接信息”中获取该实例的“内网地址”、“数据库端口”、“虚拟私有云”和“子网”信息,方便后续操作步骤使用。单击“连接信息”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选为:1,策略选为:允许,协议选择:TCP,端口值不填,类型:IPV4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。连接名称:设置具体的增强型跨源名称。本示例输入为:dli_rds。弹性资源池:选择步骤1:创建队列中已经创建的队列。虚拟私有云:选择RDS的虚拟私有云。子网:选择RDS的子网。其他参数可以根据需要选择配置。参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。单击“队列管理”,选择操作的队列,本示例为步骤1:创建队列中创建的队列,在操作列,单击“更多 > 测试地址连通性”。在“测试连通性”界面,根据2中获取的RDS连接信息,地址栏输入“RDS内网地址:RDS数据库端口”,单击“测试”测试DLI到RDS网络是否可达。创建DLI连接DWS的增强型跨源连接在DWS管理控制台,选择“集群管理”,单击已创建的DWS集群名称,进入到DWS的基本信息页面。在“基本信息”的“数据库属性”中获取该实例的“内网IP”、“端口”,“基本信息”页面的“网络”中获取“虚拟私有云”和“子网”信息,方便后续操作步骤使用。单击“连接信息”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选为:1,策略选为:允许,协议选择:TCP,端口值不填,类型:IPV4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。说明:本示例默认RDS和DWS实例分别在两个VPC和子网下,所以要分别创建增强型跨源连接打通网络。如果RDS和DWS实例属于同一VPC和子网下,则创建增强型跨源一次即可,4和5不需要再执行。在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。连接名称:设置具体的增强型跨源名称。本示例输入为:dli_dws。弹性资源池:选择步骤1:创建队列中已经创建的队列。虚拟私有云:选择DWS的虚拟私有云。子网:选择DWS的子网。其他参数可以根据需要选择配置。参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。单击“队列管理”,选择操作的队列,本示例为步骤1:创建队列中创建的队列,在操作列,单击“更多 > 测试地址连通性”。在“测试连通性”界面,根据2中获取的DWS连接信息,地址栏输入“DWS内网IP:DWS端口”,单击“测试”测试DLI到DWS网络是否可达。步骤5:运行作业在DLI管理控制台,单击“作业管理 > Flink作业”,在Flink作业管理界面,单击“创建作业”。在创建队列界面,类型选择“Flink OpenSource SQL”,名称填写为:FlinkCDCMySQLDWS。单击“确定”,跳转到Flink作业编辑界面。在Flink OpenSource SQL作业编辑界面,配置如下参数。所属队列:选择步骤1:创建队列中创建的队列。Flink版本:选择1.12。保存作业日志:勾选。OBS桶:选择保存作业日志的OBS桶,根据提示进行OBS桶权限授权。开启Checkpoint:勾选。Flink作业编辑框中输入具体的作业SQL,本示例作业参考如下。SQL中加粗的参数需要根据实际情况修改。说明:本示例使用的Flink版本为1.12,故Flink OpenSource SQL语法也是1.12。本示例数据源是Kafka,写入结果数据到Elasticsearch,故请参考Flink OpenSource SQL 1.12创建MySQL CDC源表和Flink OpenSource SQL 1.12创建DWS结果表。create table mysqlCdcSource( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id STRING) with ( 'connector' = 'mysql-cdc', 'hostname' = '192.168.12.148',--IP替换为RDS MySQL的实例IP 'port' = '3306',--端口替换为RDS MySQL的实例端口 'username' = 'xxx',--RDS MySQL实例的数据库用户名 'password' = 'xxx',--RDS MySQL实例的数据库用户密码 'database-name' = 'testrdsdb',--RDS MySQL实例的数据库名 'table-name' = 'mysqlcdc'--RDS MySQL实例的数据库下的表名);create table dwsSink( order_channel string, pay_amount double, real_pay double, primary key(order_channel) not enforced) with ( 'connector' = 'gaussdb', 'driver' = 'com.huawei.gauss200.jdbc.Driver', 'url' = 'jdbc:gaussdb://192.168.168.16:8000/testdwsdb', ---192.168.168.16:8000替换为DWS的内网IP和端口,testdwsdb为创建的DWS数据库名 'table-name' = 'test\".\"dwsresult', ---test为创建的DWS表的schema,dwsresult为对应的DWS表名 'username' = 'xxx',--替换为DWS实例的用户名 'password' = 'xxx',--替换为DWS实例的用户密码 'write.mode' = 'insert');insert into dwsSink select order_channel, sum(pay_amount),sum(real_pay) from mysqlCdcSource group by order_channel;单击“语义校验”确保SQL语义校验成功。单击“保存”,保存作业。单击“启动”,启动作业,确认作业参数信息,单击“立即启动”开始执行作业。等待作业运行状态变为“运行中”。步骤6:发送数据和查询结果登录RDS管理控制台,在“实例管理”界面,选择已创建的RDS MySQL实例,选择操作列的“更多 > 登录”,进入数据管理服务实例登录界面。输入实例登录的用户名和密码。单击“登录”,即可进入RDS MySQL数据库并进行管理。在已创建的数据库的操作列,单击“SQL查询”,输入以下创建表语句,插入测试数据。insert into mysqlcdc values('202103241000000001','webShop','2021-03-24 10:00:00','100.00','100.00','2021-03-24 10:02:03','0001','Alice','330106'),('202103241206060001','appShop','2021-03-24 12:06:06','200.00','180.00','2021-03-24 16:10:06','0002','Jason','330106'),('202103241403000001','webShop','2021-03-24 14:03:00','300.00','100.00','2021-03-24 10:02:03','0003','Lily','330106'),('202103241636060001','appShop','2021-03-24 16:36:06','200.00','150.00','2021-03-24 16:10:06','0001','Henry','330106');参考使用gsql命令行客户端连接DWS集群连接已创建的DWS集群。执行以下命令连接DWS集群的默认数据库“testdwsdb”:gsql -d testdwsdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -r执行以下命令,查询DWS的表数据。select * from test.dwsresult;查询结果参考如下:order_channel pay_amount real_pay appShop 400.0 330.0webShop 400.0 200.0
  • [最佳实践] 创建Flink OpenSource作业从Kafka读取数据写入到Elasticsearch
    场景描述本示例场景对汽车驾驶的实时数据信息进行分析,将满足特定条件的数据结果进行汇总。数据的输入源为Kafka,结果输出到DWS中。例如,如下样例输入数据:{"car_id":"3027", "car_owner":"lilei", "car_age":"7", "average_speed":"76", "total_miles":"15000"}{"car_id":"3028", "car_owner":"hanmeimei", "car_age":"6", "average_speed":"92", "total_miles":"17000"}{"car_id":"3029", "car_owner":"zhangsan", "car_age":"10", "average_speed":"81", "total_miles":"230000"}预期输出:average_speed <= 90 和 total_miles <= 200000的车辆,即:{"car_id":"3027", "car_owner":"lilei", "car_age":"7", "average_speed":"76", "total_miles":"15000"}前提条件已创建DMS Kafka实例,具体步骤可参考:DMS Kafka入门指引。注意:创建DMS Kafka实例时,不能开启Kafka SASL_SSL。已创建Elasticsearch类型的CSS集群。具体创建CSS集群的操作可以参考创建CSS集群。本示例创建的CSS集群版本为:7.6.2,集群为非安全集群。整体作业开发流程整体作业开发流程参考图1。图1 作业开发流程步骤1:创建队列:创建DLI作业运行的队列。步骤2:创建Kafka的Topic:创建Kafka生产消费数据的Topic。步骤3:创建Elasticsearch搜索索引:创建Elasticsearch搜索索引用于接收结果数据。步骤4:创建增强型跨源连接:DLI上创建连接Kafka和CSS的跨源连接,打通网络。步骤5:运行作业:DLI上创建和运行Flink OpenSource作业。步骤6:发送数据和查询结果:Kafka上发送流数据,在CSS上查看运行结果。步骤1:创建队列登录DLI管理控制台,在左侧导航栏单击“资源管理 > 队列管理”,可进入队列管理页面。在队列管理界面,单击界面右上角的“购买队列”。在“购买队列”界面,填写具体的队列配置参数,具体参数填写参考如下。计费模式:选择“包年/包月”或“按需计费”。本示例选择“按需计费”。区域和项目:保持默认值即可。名称:填写具体的队列名称。说明:新建的队列名称,名称只能包含数字、英文字母和下划线,但不能是纯数字,且不能以下划线开头。长度限制:1~128个字符。队列名称不区分大小写,系统会自动转换为小写。类型:队列类型选择“通用队列”。“按需计费”时需要勾选“专属资源模式”。AZ策略、CPU架构、规格:保持默认即可。企业项目:当前选择为“default”。高级选项:选择“自定义”。网段:配置队列网段。例如,当前配置为10.0.0.0/16。注意:队列的网段不能和DMS Kafka、RDS MySQL实例的子网网段有重合,否则后续创建跨源连接会失败。其他参数根据需要选择和配置。图2 创建队列参数配置完成后,单击“立即购买”,确认配置信息无误后,单击“提交”完成队列创建。步骤2:创建Kafka的Topic在Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka名称,进入到Kafka的基本信息页面。单击“Topic管理 > 创建Topic”,创建一个Topic。Topic配置参数如下:Topic名称。本示例输入为:testkafkatopic。分区数:1。副本数:1其他参数保持默认即可。步骤3:创建Elasticsearch搜索索引登录CSS管理控制台,选择“集群管理 > Elasticsearch”。在集群管理界面,在已创建的CSS集群的“操作”列,单击“Kibana”访问集群。在Kibana的左侧导航中选择“Dev Tools”,进入到Console界面。在Console界面,执行如下命令创建索引“shoporders”。PUT /shoporders{ "settings": { "number_of_shards": 1 }, "mappings": { "properties": { "order_id": { "type": "text" }, "order_channel": { "type": "text" }, "order_time": { "type": "text" }, "pay_amount": { "type": "double" }, "real_pay": { "type": "double" }, "pay_time": { "type": "text" }, "user_id": { "type": "text" }, "user_name": { "type": "text" }, "area_id": { "type": "text" } } }}步骤4:创建增强型跨源连接创建DLI连接Kafka的增强型跨源连接在Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka名称,进入到Kafka的基本信息页面。在“连接信息”中获取该Kafka的“内网连接地址”,在“基本信息”的“网络”中获取获取该实例的“虚拟私有云”和“子网”信息,方便后续操作步骤使用。单击“网络”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选为:1,策略选为:允许,协议选择:TCP,端口值不填,类型:IPV4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。连接名称:设置具体的增强型跨源名称。本示例输入为:dli_kafka。弹性资源池:选择步骤1:创建队列中已经创建的队列。虚拟私有云:选择Kafka的虚拟私有云。子网:选择Kafka的子网。其他参数可以根据需要选择配置。参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。单击“队列管理”,选择操作的队列,本示例为步骤1:创建队列中添加的队列,在操作列,单击“更多 > 测试地址连通性”。在“测试连通性”界面,根据中获取的Kafka连接信息,地址栏输入“Kafka内网地址:Kafka数据库端口”,单击“测试”测试DLI到Kafka网络是否可达。创建DLI连接CSS的增强型跨源连接在CSS管理控制台,选择“集群管理”,单击已创建的CSS集群名称,进入到CSS的基本信息页面。在“基本信息”中获取CSS的“内网访问地址”、“虚拟私有云”和“子网”信息,方便后续操作步骤使用。单击“连接信息”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选为:1,策略选为:允许,协议选择:TCP,端口值不填,类型:IPV4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。说明:本示例默认Kafka和CSS实例分别在两个VPC和子网下,所以要分别创建增强型跨源连接打通网络。如果Kafka和CSS实例属于同一VPC和子网下,则创建增强型跨源一次即可,4和5不需要再执行。在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。连接名称:设置具体的增强型跨源名称。本示例输入为:dli_css。弹性资源池:选择步骤1:创建队列中已经创建的队列。虚拟私有云:选择CSS的虚拟私有云。子网:选择CSS的子网。其他参数可以根据需要选择配置。参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。单击“队列管理”,选择操作的队列,本示例为步骤1:创建队列中添加的队列,在操作列,单击“更多 > 测试地址连通性”。在“测试连通性”界面,根据2获取的CSS连接信息,地址栏输入“CSS内网地址:CSS内网端口”,单击“测试”测试DLI到CSS网络是否可达。步骤5:运行作业在DLI管理控制台,单击“作业管理 > Flink作业”,在Flink作业管理界面,单击“创建作业”。在创建队列界面,类型选择“Flink OpenSource SQL”,名称填写为:FlinkKafkaES。单击“确定”,跳转到Flink作业编辑界面。在Flink OpenSource SQL作业编辑界面,配置如下参数。所属队列:选择步骤1:创建队列中创建的队列。Flink版本:选择1.12。保存作业日志:勾选。OBS桶:选择保存作业日志的OBS桶,根据提示进行OBS桶权限授权。开启Checkpoint:勾选。Flink作业编辑框中输入具体的作业SQL,本示例作业参考如下。SQL中加粗的参数需要根据实际情况修改。说明:本示例使用的Flink版本为1.12,故Flink OpenSource SQL语法也是1.12。本示例数据源是Kafka,写入结果数据到Elasticsearch,故请参考Flink OpenSource SQL 1.12创建Kafka源表和Flink OpenSource SQL 1.12创建Elasticsearch结果表。CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string) with ( "connector" = "kafka", "properties.bootstrap.servers" = "10.128.0.120:9092,10.128.0.89:9092,10.128.0.83:9092",--替换为kafka的内网连接地址和端口 "properties.group.id" = "click", "topic" = "testkafkatopic", --创建的Kafka Topic "format" = "json", "scan.startup.mode" = "latest-offset");CREATE TABLE elasticsearchSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = '192.168.168.125:9200', --替换为CSS集群的内网地址和端口 'index' = 'shoporders' --创建的Elasticsearch搜索引擎);--将Kafka数据写入到Elasticsearch索引中insert into elasticsearchSinkselect *from kafkaSource;单击“语义校验”确保SQL语义校验成功。单击“保存”,保存作业。单击“启动”,启动作业,确认作业参数信息,单击“立即启动”开始执行作业。等待作业运行状态变为“运行中”。步骤6:发送数据和查询结果使用Kafka客户端向步骤2:创建Kafka的Topic中的Topic发送数据,模拟实时数据流。Kafka生产和发送数据的方法请参考:DMS - 连接实例生产消费信息。发送样例数据如下:{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0002", "user_name":"Jason", "area_id":"330106"}发送成功后,在CSS集群的Kibana中执行下述语句并查看相应结果:GET shoporders/_search查询结果返回如下:{ "took" : 0, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 2, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "shoporders", "_type" : "_doc", "_id" : "6fswzIAByVjqg3_qAyM1", "_score" : 1.0, "_source" : { "order_id" : "202103241000000001", "order_channel" : "webShop", "order_time" : "2021-03-24 10:00:00", "pay_amount" : 100.0, "real_pay" : 100.0, "pay_time" : "2021-03-24 10:02:03", "user_id" : "0001", "user_name" : "Alice", "area_id" : "330106" } }, { "_index" : "shoporders", "_type" : "_doc", "_id" : "6vs1zIAByVjqg3_qyyPp", "_score" : 1.0, "_source" : { "order_id" : "202103241606060001", "order_channel" : "appShop", "order_time" : "2021-03-24 16:06:06", "pay_amount" : 200.0, "real_pay" : 180.0, "pay_time" : "2021-03-24 16:10:06", "user_id" : "0002", "user_name" : "Jason", "area_id" : "330106" } } ] }}
  • [最佳实践] 创建Flink OpenSource作业从Kafka读取数据写入到DWS
    场景描述该场景为对汽车驾驶的实时数据信息进行分析,将满足特定条件的数据结果进行汇总。数据的输入源为Kafka,结果输出到DWS中。例如,如下样例输入数据:{"car_id":"3027", "car_owner":"lilei", "car_age":"7", "average_speed":"76", "total_miles":"15000"}{"car_id":"3028", "car_owner":"hanmeimei", "car_age":"6", "average_speed":"92", "total_miles":"17000"}{"car_id":"3029", "car_owner":"zhangsan", "car_age":"10", "average_speed":"81", "total_miles":"230000"}预期输出:average_speed <= 90 和 total_miles <= 200000的车辆,即:{"car_id":"3027", "car_owner":"lilei", "car_age":"7", "average_speed":"76", "total_miles":"15000"}前提条件已创建DMS Kafka实例,具体步骤可参考:创建Kafka实例。注意:创建DMS Kafka实例时,不能开启Kafka SASL_SSL。已创建DWS实例,具体创建DWS集群的操作可以参考创建DWS集群。本示例创建的DWS集群版本为:8.1.1.205。整体作业开发流程整体作业开发流程参考图1。图1 作业开发流程步骤1:创建队列:创建DLI作业运行的队列。步骤2:创建Kafka的Topic:创建Kafka生产消费数据的Topic。步骤3:创建DWS数据库和表:创建DWS数据库和表信息。步骤4:创建增强型跨源连接:DLI上创建连接Kafka和DWS的跨源连接,打通网络。步骤5:运行作业:DLI上创建和运行Flink OpenSource作业。步骤6:发送数据和查询结果:Kafka上发送流数据,在RDS上查看运行结果。步骤1:创建队列登录DLI管理控制台,在左侧导航栏单击“资源管理 > 队列管理”,可进入队列管理页面。在队列管理界面,单击界面右上角的“购买队列”。在“购买队列”界面,填写具体的队列配置参数,具体参数填写参考如下。计费模式:选择“包年/包月”或“按需计费”。本示例选择“按需计费”。区域和项目:保持默认值即可。名称:填写具体的队列名称。说明:新建的队列名称,名称只能包含数字、英文字母和下划线,但不能是纯数字,且不能以下划线开头。长度限制:1~128个字符。队列名称不区分大小写,系统会自动转换为小写。类型:队列类型选择“通用队列”。“按需计费”时需要勾选“专属资源模式”。AZ策略、CPU架构、规格:保持默认即可。企业项目:当前选择为“default”。高级选项:选择“自定义”。网段:配置队列网段。例如,当前配置为10.0.0.0/16。注意:队列的网段不能和DMS Kafka、RDS MySQL实例的子网网段有重合,否则后续创建跨源连接会失败。其他参数根据需要选择和配置。图2 创建队列参数配置完成后,单击“立即购买”,确认配置信息无误后,单击“提交”完成队列创建。步骤2:创建Kafka的Topic在Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka名称,进入到Kafka的基本信息页面。单击“Topic管理 > 创建Topic”,创建一个Topic。Topic配置参数如下:Topic名称。本示例输入为:testkafkatopic。分区数:1。副本数:1其他参数保持默认即可。步骤3:创建DWS数据库和表参考使用gsql命令行客户端连接DWS集群连接已创建的DWS集群。执行以下命令连接DWS集群的默认数据库“gaussdb”:gsql -d gaussdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -rgaussdb:DWS集群默认数据库。DWS集群连接地址:请参见获取集群连接地址进行获取。如果通过公网地址连接,请指定为集群“公网访问地址”或“公网访问域名”,如果通过内网地址连接,请指定为集群“内网访问地址”或“内网访问域名”。如果通过弹性负载均衡连接,请指定为“弹性负载均衡地址”。dbadmin:创建集群时设置的默认管理员用户名。-W:默认管理员用户的密码。在命令行窗口输入以下命令创建数据库“testdwsdb”。CREATE DATABASE testdwsdb;执行以下命令,退出gaussdb数据库,连接新创建的数据库“testdwsdb”。\qgsql -d testdwsdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -r执行以下命令创建表。create schema test;set current_schema= test;drop table if exists qualified_cars;CREATE TABLE qualified_cars( car_id VARCHAR, car_owner VARCHAR, car_age INTEGER , average_speed FLOAT8, total_miles FLOAT8);步骤4:创建增强型跨源连接创建DLI连接Kafka的增强型跨源连接在Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka名称,进入到Kafka的基本信息页面。在“连接信息”中获取该Kafka的“内网连接地址”,在“基本信息”的“网络”中获取获取该实例的“虚拟私有云”和“子网”信息,方便后续操作步骤使用。单击“网络”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选为:1,策略选为:允许,协议选择:TCP,端口值不填,类型:IPV4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。连接名称:设置具体的增强型跨源名称。本示例输入为:dli_kafka。弹性资源池:选择步骤1:创建队列中已经创建的队列名。虚拟私有云:选择Kafka的虚拟私有云。子网:选择Kafka的子网。其他参数可以根据需要选择配置。参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。单击“队列管理”,选择操作的队列,本示例为步骤1:创建队列中创建的队列,在操作列,单击“更多 > 测试地址连通性”。在“测试连通性”界面,根据2中获取的Kafka连接信息,地址栏输入“Kafka内网地址:Kafka数据库端口”,单击“测试”测试DLI到Kafka网络是否可达。创建DLI连接DWS的增强型跨源连接在DWS管理控制台,选择“集群管理”,单击已创建的DWS集群名称,进入到DWS的基本信息页面。在“基本信息”的“数据库属性”中获取该实例的“内网IP”、“端口”,“基本信息”页面的“网络”中获取“虚拟私有云”和“子网”信息,方便后续操作步骤使用。单击“连接信息”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选为:1,策略选为:允许,协议选择:TCP,端口值不填,类型:IPV4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。说明:本示例默认Kafka和DWS实例分别在两个VPC和子网下,所以要分别创建增强型跨源连接打通网络。如果Kafka和DWS实例属于同一VPC和子网下,则创建增强型跨源一次即可,4和5不需要再执行。在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。连接名称:设置具体的增强型跨源名称。本示例输入为:dli_dws。弹性资源池:选择步骤1:创建队列中已经创建的队列名。虚拟私有云:选择DWS的虚拟私有云。子网:选择DWS的子网。其他参数可以根据需要选择配置。参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。单击“队列管理”,选择操作的队列,本示例为步骤1:创建队列中创建的队列,在操作列,单击“更多 > 测试地址连通性”。在“测试连通性”界面,根据2中获取的DWS连接信息,地址栏输入“DWS内网IP:DWS端口”,单击“测试”测试DLI到DWS网络是否可达。步骤5:运行作业在DLI管理控制台,单击“作业管理 > Flink作业”,在Flink作业管理界面,单击“创建作业”。在创建队列界面,类型选择“Flink OpenSource SQL”,名称填写为:FlinkKafkaDWS。单击“确定”,跳转到Flink作业编辑界面。在Flink OpenSource SQL作业编辑界面,配置如下参数。所属队列:选择步骤1:创建队列中创建的队列。Flink版本:选择1.12。保存作业日志:勾选。OBS桶:选择保存作业日志的OBS桶,根据提示进行OBS桶权限授权。开启Checkpoint:勾选。Flink作业编辑框中输入具体的作业SQL,本示例作业参考如下。SQL中加粗的参数需要根据实际情况修改。说明:本示例使用的Flink版本为1.12,故Flink OpenSource SQL语法也是1.12。本示例数据源是Kafka,写入结果数据到DWS,故请参考Flink OpenSource SQL 1.12创建Kafka源表和Flink OpenSource SQL 1.12创建DWS结果表(RDS连接)。create table car_infos( car_id STRING, car_owner STRING, car_age INT, average_speed DOUBLE, total_miles DOUBLE) with ( "connector" = "kafka", "properties.bootstrap.servers" = "10.128.0.120:9092,10.128.0.89:9092,10.128.0.83:9092",--替换为kafka的内网连接地址和端口 "properties.group.id" = "click", "topic" = "testkafkatopic",--创建的Kafka的Topic "format" = "json", "scan.startup.mode" = "latest-offset");create table qualified_cars ( car_id STRING, car_owner STRING, car_age INT, average_speed DOUBLE, total_miles DOUBLE)WITH ( 'connector' = 'gaussdb', 'driver' = 'com.huawei.gauss200.jdbc.Driver', 'url' = 'jdbc:gaussdb://192.168.168.16:8000/testdwsdb', ---192.168.168.16:8000替换为DWS的内网IP和端口,testdwsdb为创建的DWS数据库名 'table-name' = 'test\".\"qualified_cars', ---test为创建的DWS表的schema,qualified_cars为对应的DWS表名 'username' = 'xxxx',--替换为DWS实例的用户名 'password' = 'xxxx',--替换为DWS实例的用户密码 'write.mode' = 'insert');/** 将合格车辆信息输出 **/INSERT INTO qualified_carsSELECT *FROM car_infoswhere average_speed <= 90 and total_miles <= 200000;单击“语义校验”确保SQL语义校验成功。单击“保存”,保存作业。单击“启动”,启动作业,确认作业参数信息,单击“立即启动”开始执行作业。等待作业运行状态变为“运行中”。步骤6:发送数据和查询结果使用Kafka客户端向步骤2:创建Kafka的Topic中的Topic发送数据,模拟实时数据流。Kafka生产和发送数据的方法请参考:DMS - 连接实例生产消费信息。发送样例数据如下:{"car_id":"3027", "car_owner":"lilei", "car_age":"7", "average_speed":"76", "total_miles":"15000"}{"car_id":"3028", "car_owner":"hanmeimei", "car_age":"6", "average_speed":"92", "total_miles":"17000"}{"car_id":"3029", "car_owner":"zhangsan", "car_age":"10", "average_speed":"81", "total_miles":"230000"}参考使用gsql命令行客户端连接DWS集群连接已创建的DWS集群。执行以下命令连接DWS集群的默认数据库“testdwsdb”:gsql -d testdwsdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -r查询DWS的表数据。select * from test.qualified_cars;查询结果参考如下:car_id car_owner car_age average_speed total_miles3027 lilei 7 76.0 15000.0
  • [最佳实践] 创建Flink OpenSource作业从Kafka读取数据写入到RDS
    场景描述该场景为根据商品的实时点击量,获取每小时内点击量最高的3个商品及其相关信息,数据的输入源为Kafka,结果输出到RDS中。例如,如下样例输入数据:{"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:01:00", "product_id":"0002", "product_name":"name1"}{"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 08:02:00", "product_id":"0002", "product_name":"name1"}{"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 08:06:00", "product_id":"0004", "product_name":"name2"}{"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:10:00", "product_id":"0003", "product_name":"name3"}{"user_id":"0003", "user_name":"Cindy", "event_time":"2021-03-24 08:15:00", "product_id":"0005", "product_name":"name4"}{"user_id":"0003", "user_name":"Cindy", "event_time":"2021-03-24 08:16:00", "product_id":"0005", "product_name":"name4"}{"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:56:00", "product_id":"0004", "product_name":"name2"}{"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 09:05:00", "product_id":"0005", "product_name":"name4"} {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 09:10:00", "product_id":"0006", "product_name":"name5"}{"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 09:13:00", "product_id":"0006", "product_name":"name5"}预期输出:2021-03-24 08:00:00 - 2021-03-24 08:59:59,0002,name1,22021-03-24 08:00:00 - 2021-03-24 08:59:59,0004,name2,22021-03-24 08:00:00 - 2021-03-24 08:59:59,0005,name4,22021-03-24 09:00:00 - 2021-03-24 09:59:59,0006,name5,22021-03-24 09:00:00 - 2021-03-24 09:59:59,0005,name4,1前提条件已创建DMS Kafka实例,具体步骤可参考:创建Kafka实例。注意:创建DMS Kafka实例时,不能开启Kafka SASL_SSL。已创建RDS MySQL实例,具体步骤可参考:RDS MySQL快速入门。本示例创建的RDS MySQL数据库版本选择为:8.0。整体作业开发流程整体作业开发流程参考图1。图1 作业开发流程步骤1:创建队列:创建DLI作业运行的队列。步骤2:创建Kafka的Topic:创建Kafka生产消费数据的Topic。步骤3:创建RDS数据库和表:创建RDS MySQL数据库和表信息。步骤4:创建增强型跨源连接:DLI上创建连接Kafka和RDS的跨源连接,打通网络。步骤5:运行作业:DLI上创建和运行Flink OpenSource作业。步骤6:发送数据和查询结果:Kafka上发送流数据,在RDS上查看运行结果。步骤1:创建队列登录DLI管理控制台,在左侧导航栏单击“资源管理 > 队列管理”,可进入队列管理页面。在队列管理界面,单击界面右上角的“购买队列”。在“购买队列”界面,填写具体的队列配置参数,具体参数填写参考如下。计费模式:选择“包年/包月”或“按需计费”。本示例选择“按需计费”。区域和项目:保持默认值即可。名称:填写具体的队列名称。说明:新建的队列名称,名称只能包含数字、英文字母和下划线,但不能是纯数字,且不能以下划线开头。长度限制:1~128个字符。队列名称不区分大小写,系统会自动转换为小写。类型:队列类型选择“通用队列”。“按需计费”时需要勾选“专属资源模式”。AZ策略、CPU架构、规格:保持默认即可。企业项目:当前选择为“default”。高级选项:选择“自定义”。网段:配置队列网段。例如,当前配置为10.0.0.0/16。注意:队列的网段不能和DMS Kafka、RDS MySQL实例的子网网段有重合,否则后续创建跨源连接会失败。其他参数根据需要选择和配置。图2 创建队列参数配置完成后,单击“立即购买”,确认配置信息无误后,单击“提交”完成队列创建。步骤2:创建Kafka的Topic登录Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka实例名称,进入到Kafka实例的基本信息页面。单击“Topic管理 > 创建Topic”,创建一个Topic。Topic配置参数如下:Topic名称。本示例输入为:testkafkatopic。分区数:1。副本数:1其他参数保持默认即可。步骤3:创建RDS数据库和表登录RDS管理控制台,在“实例管理”界面,选择已创建的RDS MySQL实例,选择操作列的“更多 > 登录”,进入数据管理服务实例登录界面。输入实例登录的用户名和密码。单击“登录”,即可进入RDS MySQL数据库并进行管理。在数据库实例界面,单击“新建数据库”,数据库名定义为:testrdsdb,字符集保持默认即可。在已创建的数据库的操作列,单击“SQL查询”,输入以下创建表语句,创建RDS MySQL表。CREATE TABLE clicktop ( `range_time` VARCHAR(64) NOT NULL, `product_id` VARCHAR(32) NOT NULL, `product_name` VARCHAR(32), `event_count` VARCHAR(32), PRIMARY KEY (`range_time`,`product_id`)) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4;步骤4:创建增强型跨源连接创建DLI连接Kafka的增强型跨源连接在Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka名称,进入到Kafka的基本信息页面。在“连接信息”中获取该Kafka的“内网连接地址”,在“基本信息”的“网络”中获取该实例的“虚拟私有云”和“子网”信息,方便后续操作步骤使用。单击“网络”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选为:1,策略选为:允许,协议选择:TCP,端口值不填,类型:IPV4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。连接名称:设置具体的增强型跨源名称。本示例输入为:dli_kafka。弹性资源池:选择步骤1:创建队列中已经创建的队列名称。虚拟私有云:选择Kafka的虚拟私有云。子网:选择Kafka的子网。其他参数可以根据需要选择配置。参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为“已激活”后可以进行后续步骤。单击“队列管理”,选择操作的队列,本示例为步骤1:创建队列中创建的队列,在操作列,单击“更多 > 测试地址连通性”。在“测试连通性”界面,根据2中获取的Kafka连接信息,地址栏输入“Kafka内网地址:Kafka数据库端口”,单击“测试”测试DLI到Kafka网络是否可达。创建DLI连接RDS的增强型跨源连接在RDS管理控制台,选择“实例管理”,单击对应的RDS实例名称,进入到RDS的基本信息页面。在“基本信息”的“连接信息”中获取该实例的“内网地址”、“数据库端口”、“虚拟私有云”和“子网”信息,方便后续操作步骤使用。单击“连接信息”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选为:1,策略选为:允许,协议选择:TCP,端口值不填,类型:IPV4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。说明:本示例默认Kafka和RDS实例分别在两个VPC和子网下,所以要分别创建增强型跨源连接打通网络。如果Kafka和RDS实例属于同一VPC和子网下,则创建增强型跨源一次即可,4和5不需要再执行。在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。连接名称:设置具体的增强型跨源名称。本示例输入为:dli_rds。弹性资源池:选择步骤1:创建队列中已经创建的队列名称。虚拟私有云:选择RDS的虚拟私有云。子网:选择RDS的子网。其他参数可以根据需要选择配置。参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。单击“队列管理”,选择操作的队列,本示例为步骤1:创建队列中创建的队列,在操作列,单击“更多 > 测试地址连通性”。在“测试连通性”界面,根据2中获取的RDS连接信息,地址栏输入“RDS内网地址:RDS数据库端口”,单击“测试”测试DLI到RDS网络是否可达。步骤5:运行作业在DLI管理控制台,单击“作业管理 > Flink作业”,在Flink作业管理界面,单击“创建作业”。在创建作业界面,作业类型选择“Flink OpenSource SQL”,名称填写为:FlinkKafkaRds。单击“确定”,跳转到Flink作业编辑界面。在Flink OpenSource SQL作业编辑界面,配置如下参数。所属队列:选择步骤1:创建队列中创建的队列。Flink版本:选择1.12。保存作业日志:勾选。OBS桶:选择保存作业日志的OBS桶,根据提示进行OBS桶权限授权。开启Checkpoint:勾选。Flink作业编辑框中输入具体的作业SQL,本示例作业参考如下。SQL中加粗的参数需要根据实际情况修改。说明:本示例使用的Flink版本为1.12,故Flink OpenSource SQL语法也是1.12。本示例数据源是Kafka,写入结果数据到RDS,故请参考Flink OpenSource SQL 1.12创建Kafka源表和Flink OpenSource SQL 1.12创建JDBC结果表(RDS连接)。create table click_product( user_id string, --点击用户的id user_name string, --用户名称 event_time string, --点击时间 product_id string, --商品id product_name string --商品名称) with ( "connector" = "kafka", "properties.bootstrap.servers" = "10.128.0.120:9092,10.128.0.89:9092,10.128.0.83:9092",--替换为kafka的内网连接地址和端口 "properties.group.id" = "click", "topic" = "testkafkatopic",--创建的Kafka Topic名称 "format" = "json", "scan.startup.mode" = "latest-offset");--结果表create table top_product ( range_time string, --计算的时间范围 product_id string, --商品id product_name string, --商品名称 event_count bigint, --点击次数 primary key (range_time, product_id) not enforced) with ( "connector" = "jdbc", "url" = "jdbc:mysql://192.168.12.148:3306/testrdsdb",--testrdsdb为创建的RDS的数据库名,IP和端口替换为RDS MySQL的实例IP和端口 "table-name" = "clicktop", "username" = "xxxxx", --替换为RDS MySQL的实例的用户名 "password" = "xxxxx", --替换为RDS MySQL的实例的用户密码 "sink.buffer-flush.max-rows" = "1000", "sink.buffer-flush.interval" = "1s");create view current_event_viewas select product_id, product_name, count(1) as click_count, concat(substring(event_time, 1, 13), ":00:00") as min_event_time, concat(substring(event_time, 1, 13), ":59:59") as max_event_time from click_product group by substring (event_time, 1, 13), product_id, product_name;insert into top_product select concat(min_event_time, " - ", max_event_time) as range_time, product_id, product_name, click_count from ( select *, row_number() over (partition by min_event_time order by click_count desc) as row_num from current_event_view ) where row_num <= 3单击“语义校验”确保SQL语义校验成功。单击“保存”,保存作业。单击“启动”,启动作业,确认作业参数信息,单击“立即启动”开始执行作业。等待作业运行状态变为“运行中”。步骤6:发送数据和查询结果使用Kafka客户端向步骤2:创建Kafka的Topic中的Topic发送数据,模拟实时数据流。Kafka生产和发送数据的方法请参考:DMS - 连接实例生产消费信息。发送样例数据如下:{"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:01:00", "product_id":"0002", "product_name":"name1"}{"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 08:02:00", "product_id":"0002", "product_name":"name1"}{"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 08:06:00", "product_id":"0004", "product_name":"name2"}{"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:10:00", "product_id":"0003", "product_name":"name3"}{"user_id":"0003", "user_name":"Cindy", "event_time":"2021-03-24 08:15:00", "product_id":"0005", "product_name":"name4"}{"user_id":"0003", "user_name":"Cindy", "event_time":"2021-03-24 08:16:00", "product_id":"0005", "product_name":"name4"}{"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:56:00", "product_id":"0004", "product_name":"name2"}{"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 09:05:00", "product_id":"0005", "product_name":"name4"} {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 09:10:00", "product_id":"0006", "product_name":"name5"}{"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 09:13:00", "product_id":"0006", "product_name":"name5"}登录RDS控制台,单击RDS数据库实例,单击创建的数据库名,如“testrdsdb”,在创建的表“clicktop”所在行的“操作”列,单击“SQL查询”,输入以下查询语句。select * from `clicktop`;在“SQL查询”界面,单击“执行SQL”,查看RDS表数据已写入成功。
  • [问题求助] fusioninsight opensource flink sql 作业
    fusioninsight opensource flink 1.12 sql 作业中,怎么把kafka的数据接进来写入postgres中,尝试好多,一直sql校验失败。查资料没有示例
  • [生态对接] 【FI8.1.2】【flink】yarn-session看不到submit new job
    【功能模块】flink 1.12.2 session模式【操作步骤&问题现象】1、客户端认证之后执行yarn-session.sh -jm 1024 -tm 4096 -d2、提示submit成功,application_id写入/tmp/.yarn-properties-xxx成功【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [其他问题] 【MRS】【Flink】1.12版本服务能直接放1.10环境跑吗?
    【功能模块】fusioninsight8.1.2的flinkdemo工程丢到8.0.2的环境跑【操作步骤&问题现象】1、打包8.1.2 接收kafka消息样例代码,flink版本是github开源版本1.12.2(华为的版本1.12.2-hw-ei-312005有的jar包没有,就没用)2、在8.0.2客户端(flink版本1.10)上运行flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka FlinkKafkaJavaExample.jar --topic xxx-bootstrap.server 10.xxx.xxx.xxx:210053、报错【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [其他问题] Flink 1.12 yarn-per-job不报错但是也没提交任务到yarn
    环境:FusionInsight8.1.2,flink 1.12windows本地运行okMRS客户端执行 flink run -t yarn-per-job --detached xxx.jar 不报错也不提示提交情况
  • [认证交流] 微认证 - 大数据板块 -《使用DLI Flink SQL进行电商实时业务数据分析开发》 - 学习分享
    什么是微认证?          华为云微认证是基于线上学习与在线实践,快速获得场景化技能提升的认证。微认证清单 - 大数据使用DLI Flink SQL进行电商实时业务数据分析开发     课程简介:电商通常有web,小程序等多种接入方式,为掌握其实时变化,需统计各平台的实时访问量、订单数等,从而针对性地调整营销策略。     课程结构:电商实时业务应用场景介绍8认识电商常用的实时业务特点及应用电商实业业务对应大数据技术组件的原理47了解实现电商网站数据实时计算的相关大数据技术特性及原理华为云实时流计算Flink及解决方案7掌握华为云实时流计算Flink及解决方案及相应应用华为云实战案例15掌握华为云实时流计算Flink验流程及开发思路     1、电商实时业务应用场景介绍          电商从2009年发展至今,当前线上购物无疑是最火热的购物方式,而电商平台则又可以以多种方式接入,例如通过web方式访问、通过app的方式访问、通过微信小程序的方式访问等等。电商平台则需要每天统计各平台的实时访问数据量、订单数、访问人数等等          指标,从而能在显示大屏上实时展示相关数据,方便及时了解数据变化,有针对性地调整营销策略。这些高效快捷地统计指标是如何获得的呢?这是我们这次课程及实验所需要理解学习的          当前有很多电商的大数据平台会将每个商品的订单信息实时写入Kafka中,这些信息包括订单ID、订单生成的渠道(即web方式、app方式等)、订单时间、订单金额、折扣后实际支付金额、支付时间、用户ID、用户姓名、订单地区ID等信息。          针对业务场景,我们在大数据分析业务需要做的,就是根据当前可以获取到的业务数据,实时统计每种渠道的相关指标,输出存储到数据库中,并进行大屏展示。     2、电商实时业务对应大数据技术组件的原理        (1)流计算                概述         流式计算就像汽车过收费站,每一个车在通过闸口时都要收费。流式计算中每个实时产生的数据都要被实时的处理。        流计算秉承一个基本理念,即数据的价值随着时间的流逝而降低,如用户点击流。因此,当事件出现时就应该立即进行处理,而不是缓存起来进行离线处理。为了及时处理流数据,就需要一个低延迟、可扩展、高可靠的处理引擎。                应用场景                主要框架       Kafka        Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeepert办调的分布式日志系统。       主要应用场景是:日志收集系统和消息系统。        分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。有两种主要的消息传递模式:点对点传递模式、发布-订阅模式。大部分的消息系统选用发布-订阅模式。       Kafka就是一种发布-订阅模式。 Spark                Spark简介          2009年诞生于美国加州大学伯克利分校AMP实验室。          Apache Spark是一种基于内存的快速、通用、可扩展的大数据计算引擎。          Spark 是一站式解决方案,集批处理(Spark Core )、实时流处理(Spark Streaming )、交互式查询(Spark SQL )、图计算(GraphX )与机器学习(MLLib )于一体。                  Spark应用场景           批处理可用于ETL (抽取、转换、加载)。          机器学习可用于自动判断淘宝的买家评论是好评还是差评。          交互式分析可用于查询Hive数据仓库。          流处理可用于页面点击流分析,推荐系统,舆情分析等实时业务。                 Spark架构                Spark特点                       SparkStreaming          Spark Streaming的基本原理是将实时输入数据流以时间片(秒级)为单位进行拆分,然后经Spark引擎以类似批处理的方式处理每个时间片数据。        Flink华为云数据可视化DLI          产品概述数据湖探索(Data Lake Insight,简称DLI)是完全兼容Apache Spark和Apache Flink生态,实现批流一体的Serverless大数据计算分析服务。DLI支持多模引擎,企业仅需使用SQL或程序就可轻松完成异构数据源的批处理、流处理、内存计算、机器学习等,挖掘和探索数据价值。          特点          应用场景:电商行业数据可视化          概述            广义:指一切能够把抽象、枯燥或难以理解的内容,包括看似毫无意义的数据、信息、知识等以一种容易理解的视觉方式展示出来的技术。            狭义:利用计算机图形学和图像处理技术,将数据转换为图形或图像在屏幕上显示出来,并进行各种交互处理的理论、方法和技术。          发展          工具        华为云数据可视化DLV          概述         数据可视化(Data Lake Visualization,简称DLV)是一站式数据可视化开发平台,适配云上云下多种数据源,提供丰富多样的2D、3D可视化组件,采用拖搜式自由布局。          特点          应用场景:某企业安全态势感知     3、华为云实时流计算Flink及解决方案        基于实时流计算的可视化解决方案        解决方案应用场景之智慧城市          智慧城市是通过对大量实时数据的监控、采集和处理,为复杂问题做出快速响应。智慧城市涉及范围很广,智慧城市建设主要包括政务、交通、企业、民生等方面。         解决方案应用场景之实时推荐          根据用户行为数据(包含历史数据和实时数据),通过构建的推荐模型对用户行为秒级调整并生成对应的推荐列表,分钟级更新候选集。          实时推荐主要包括广告推荐、商品推荐、视频推荐、游戏推荐等。     动手实验:         流程介绍 实验单独学习链接:华为云原生大数据serverless服务DLI_在线课程_华为云开发者学堂_云计算培训-华为云 (huaweicloud.com)
  • [交流吐槽] Flink SQL 知其所以然:SQL 数据类型大全!
    SQL 数据类型在介绍完一些基本概念之后,我们来认识一下,Flink SQL 中的数据类型。Flink SQL 内置了很多常见的数据类型,并且也为用户提供了自定义数据类型的能力。总共包含 3 部分:原子数据类型。复合数据类型。用户自定义数据类型。一、原子数据类型1、字符串类型:CHAR、CHAR(n):定长字符串,就和 Java 中的 Char 一样,n 代表字符的定长,取值范围 [1, 2,147,483,647]。如果不指定 n,则默认为 1。VARCHAR、VARCHAR(n)、STRING:可变长字符串,就和 Java 中的 String 一样,n 代表字符的最大长度,取值范围 [1, 2,147,483,647]。如果不指定 n,则默认为 1。STRING 等同于 VARCHAR(2147483647)。2、二进制字符串类型:BINARY、BINARY(n):定长二进制字符串,n 代表定长,取值范围 [1, 2,147,483,647]。如果不指定 n,则默认为 1。VARBINARY、VARBINARY(n)、BYTES:可变长二进制字符串,n 代表字符的最大长度,取值范围 [1, 2,147,483,647]。如果不指定 n,则默认为 1。BYTES 等同于 VARBINARY(2147483647)。3、 精确数值类型:DECIMAL、DECIMAL(p)、DECIMAL(p, s)、DEC、DEC(p)、DEC(p, s)、NUMERIC、NUMERIC(p)、NUMERIC(p, s):固定长度和精度的数值类型,就和 Java 中的 BigDecima一样,p 代表数值位数(长度),取值范围 [1, 38];s 代表小数点后的位数(精度),取值范围 [0, p]。如果不指定,p 默认为 10,s 默认为 0。TINYINT:-128 到 127 的 1 字节大小的有符号整数,就和 Java 中的 byte 一样。SMALLINT:-32,768 to 32,767 的 2 字节大小的有符号整数,就和 Java 中的 short 一样。INT、INTEGER:-2,147,483,648 to 2,147,483,647 的 4 字节大小的有符号整数,就和 Java 中的 int 一样。BIGINT:-9,223,372,036,854,775,808 to 9,223,372,036,854,775,807 的 8 字节大小的有符号整数,就和 Java 中的 long 一样。4、有损精度数值类型:FLOAT:4 字节大小的单精度浮点数值,就和 Java 中的 float 一样。DOUBLE、DOUBLE PRECISION:8 字节大小的双精度浮点数值,就和 Java 中的 double 一样。关于 FLOAT 和 DOUBLE 的区别可见 https://www.runoob.com/w3cnote/float-and-double-different.html。5、布尔类型:BOOLEAN。6、NULL 类型:NULL。7、Raw 类型:RAW('class', 'snapshot') 。只会在数据发生网络传输时进行序列化,反序列化操作,可以保留其原始数据。以 Java 举例,class 参数代表具体对应的 Java 类型,snapshot 代表类型在发生网络传输时的序列化器。8、日期、时间类型:DATE:由 年-月-日 组成的 不带时区含义 的日期类型,取值范围 [0000-01-01, 9999-12-31]TIME、TIME(p):由 小时:分钟:秒[.小数秒] 组成的 不带时区含义 的的时间的数据类型,精度高达纳秒,取值范围 [00:00:00.000000000到23:59:59.9999999]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 0。TIMESTAMP、TIMESTAMP(p)、TIMESTAMP WITHOUT TIME ZONE、TIMESTAMP(p) WITHOUT TIME ZONE:由 年-月-日 小时:分钟:秒[.小数秒] 组成的 不带时区含义 的时间类型,取值范围 [0000-01-01 00:00:00.000000000, 9999-12-31 23:59:59.999999999]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 6。TIMESTAMP WITH TIME ZONE、TIMESTAMP(p) WITH TIME ZONE:由 年-月-日 小时:分钟:秒[.小数秒] 时区 组成的 带时区含义 的时间类型,取值范围 [0000-01-01 00:00:00.000000000 +14:59, 9999-12-31 23:59:59.999999999 -14:59]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 6。TIMESTAMP_LTZ、TIMESTAMP_LTZ(p):由 年-月-日 小时:分钟:秒[.小数秒] 时区 组成的 带时区含义 的时间类型,取值范围 [0000-01-01 00:00:00.000000000 +14:59, 9999-12-31 23:59:59.999999999 -14:59]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 6。TIMESTAMP_LTZ 与 TIMESTAMP WITH TIME ZONE 的区别在于:TIMESTAMP WITH TIME ZONE 的时区信息是携带在数据中的,举例:其输入数据应该是 2022-01-01 00:00:00.000000000 +08:00;TIMESTAMP_LTZ 的时区信息不是携带在数据中的,而是由 Flink SQL 任务的全局配置决定的,我们可以由 table.local-time-zone 参数来设置时区。INTERVAL YEAR TO MONTH、 INTERVAL DAY TO SECOND:interval 的涉及到的种类比较多。INTERVAL 主要是用于给 TIMESTAMP、TIMESTAMP_LTZ 添加偏移量的。举例,比如给 TIMESTAMP 加、减几天、几个月、几年。二、复合数据类型数组类型:ARRAY、t ARRAY。数组最大长度为 2,147,483,647。t 代表数组内的数据类型。举例 ARRAY、ARRAY,其等同于 INT ARRAY、STRING ARRAY。Map 类型:MAP。Map 类型就和 Java 中的 Map 类型一样,key 是没有重复的。举例 Map、Map。集合类型:MULTISET、t MULTISET。就和 Java 中的 List 类型,一样,运行重复的数据。举例 MULTISET,其等同于 INT MULTISET。对象类型:ROW、ROW、ROW(n0 t0, n1 t1, ...>、ROW(n0 t0 'd0', n1 t1 'd1', ...)。就和 Java 中的自定义对象一样。举例:ROW(myField INT, myOtherField BOOLEAN),其等同于 ROW。三、用户自定义数据类型用户自定义类型就是运行用户使用 Java 等语言自定义一个数据类型出来。但是目前数据类型不支持使用 CREATE TABLE 的 DDL 进行定义,只支持作为函数的输入输出参数。
  • [技术干货] Flink on Yarn三部曲之三:提交Flink任务[转载]
    欢迎访问我的GitHub这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos本文是《Flink on Yarn三部曲》系列的终篇,先简单回顾前面的内容:《Flink on Yarn三部曲之一:准备工作》:准备好机器、脚本、安装包;《Flink on Yarn三部曲之二:部署和设置》:完成CDH和Flink部署,并在管理页面做好相关的设置;现在Flink、Yarn、HDFS都就绪了,接下来实践提交Flink任务到Yarn执行;两种Flink on YARN模式实践之前,对Flink on YARN先简单了解一下,如下图所示,Flink on Yarn在使用的时候分为两种模式,Job Mode和Session Mode:Session Mode:在YARN中提前初始化一个Flink集群,以后所有Flink任务都提交到这个集群,如下图:Job Mode:每次提交Flink任务都会创建一个专用的Flink集群,任务完成后资源释放,如下图:接下来分别实战这两种模式;准备实战用的数据(CDH服务器)接下来提交的Flink任务是经典的WordCount,先在HDFS中准备一份文本文件,后面提交的Flink任务都会读取这个文件,统计里面每个单词的数字,准备文本的步骤如下:SSH登录CDH服务器;切换到hdfs账号:su - hdfs下载实战用的txt文件:创建hdfs文件夹:hdfs dfs -mkdir /input将文本文件上传到/input目录:hdfs dfs -put ./GoneWiththeWind.txt /input准备工作完成,可以提交任务试试了。Session Mode实战SSH登录CDH服务器;切换到hdfs账号:su - hdfs进入目录:/opt/flink-1.7.2/执行如下命令创建Flink集群,-n参数表示TaskManager的数量,-jm表示JobManager的内存大小,-tm表示每个TaskManager的内存大小:创建成功后,控制台输出如下图,注意红框中的提示,表明可以通过38301端口访问Flink:浏览器访问CDH服务器的38301端口,可见Flink服务已经启动:浏览器访问CDH服务器的8088端口,可见YARN的Application(即Flink集群)创建成功,如下图,红框中是任务ID,稍后结束Application的时候会用到此ID:再开启一个终端,SSH登录CDH服务器,切换到hdfs账号,进入目录:/opt/flink-1.7.2执行以下命令,就会提交一个Flink任务(安装包自带的WordCount例子),并指明将结果输出到HDFS的wordcount-result.txt文件中:执行完毕后,控制台输出如下:flink的WordCount任务结果保存在hdfs,我们将结果取出来看看:hdfs dfs -get /wordcount-result.txtvi打开wordcount-result.txt文件,如下图,可见任务执行成功,指定文本中的每个单词数量都统计出来了:浏览器访问Flink页面(CDH服务器的38301端口),也能看到任务的详细情况:销毁这个Flink集群的方法是在控制台执行命令:yarn application -kill application_1580173588985_0002Session Mode的实战就完成了,接下来我们来尝试Job Mode;Job Mode执行以下命令,创建一个Flink集群,该集群只用于执行参数中指定的任务(wordCount.jar),结果输出到hdfs的wordcount-result-1.txt文件:控制台输出如下,表明任务执行完成:如果您的内存和CPU核数充裕,可以立即执行以下命令再创建一个Flink集群,该集群只用于执行参数中指定的任务(wordCount.jar),结果输出到hdfs的wordcount-result-2.txt文件:在YARN管理页面可见任务已经结束:执行命令hdfs dfs -ls /查看结果文件,已经成功生成:执行命令hdfs dfs -get /wordcount-result-1.txt下载结果文件到本地,检查数据正常;至此,Flink on Yarn的部署、设置、提交都实践完成,《Flink on Yarn三部曲》系列也结束了,如果您也在学习Flink,希望本文能够给您一些参考,也建议您根据自身情况和需求,修改ansible脚本,搭建更适合自己的环境;欢迎关注华为云博客:程序员欣宸学习路上,你不孤单,欣宸原创一路相伴…
  • [大数据] Flink基本原理与案例分享
  • [技术干货] flink-shaded-netty-4.0.27.Final-1.0移植指南
     1 介绍简要介绍大数据组件flink-shaded-netty module。2 环境要求硬件要求仅在线下的物理服务器时涉及,需要写作此章节。没有内容时请删除。硬件要求如表2-1所示。表2-1 硬件要求项目说明服务器Taishan服务器CPU鲲鹏920处理器 或 鲲鹏916处理器磁盘分区对磁盘分区无要求网络可访问外网 软件要求表2-2 软件要求项目说明CentOS7.6OS Kernel4.14.0GCC4.8.5Openjdk1.8.0_252Maven3.5.4Protobuf2.5.0netty-all4.0.27.Final 3 配置编译环境提供编译安装该软件所需的环境的安装方法,例如编译器、第三方库、JDK、OS的yum源、防火墙、用户帐号(如要安装前需要单独创建用户或用户组)、环境变量等。多个任务时原则上建议采用Section进行区分,任务操作步骤较多时可以灵活处理为子章节。任务之间的依赖关系需要在各自的任务中明确。写作注意点:l   执行的操作和命令必须用正文,不能用带灰色底纹的回显。l   命令执行后的回显,必须用文本(带灰色底纹的回显样式),不要用截图。示例:3.1 安装基础库                                步骤 1      安装gcc等相关软件。确保有外网环境后,执行yum -y install gcc.aarch64 gcc-c++.aarch64 gcc-gfortran.aarch64 libgcc.aarch64XXXX                                 步骤 2      解决-fsigned-char问题(修改gcc)a、寻找gcc所在路径(一般位于“/usr/bin/gcc”)。command -v gcc b、更改gcc的名字(例如改成gcc-impl)。mv /usr/bin/gcc /usr/bin/gcc-impl c、新建gcc文件。vi /usr/bin/gcc d、填入如下内容保存。#! /bin/sh/usr/bin/gcc-impl -fsigned-char "$@" e、给脚本添加执行权限。chmod +x /usr/bin/gcc f、确认命令是否可用。gcc --version                                 步骤 3      解决-fsigned-char问题(修改g++)a、寻找gcc所在路径(一般位于“/usr/bin/g++”)。command -v g++ b、更改g++的名字(例如改成g++-impl)。mv /usr/bin/gcc /usr/bin/g++-impl c、新建g++文件。vi /usr/bin/g++ d、填入如下内容保存。#! /bin/sh/usr/bin/g++-impl -fsigned-char "$@" e、给脚本添加执行权限。chmod +x /usr/bin/g++ f、确认命令是否可用。g++ --version                                 步骤 4      安装依赖 yum安装依赖的相关软件。yum install -y wget vim openssl-devel zlib-devel automake libtool make  libstdc++-static glibc-static git snappy snappy-devel fuse fuse-devel ----结束  3.2 安装OpenJDK                                 步骤 1      下载并解压安装到指定目录(此处以指定“/opt/tools/installed”目录为例)wget https://github.com/AdoptOpenJDK/openjdk8-binaries/releases/download/jdk8u252-b09/OpenJDK8U-jdk_aarch64_linux_hotspot_8u252b09.tar.gztar -zxf OpenJDK8U-jdk_aarch64_linux_hotspot_8u252b09.tar.gzmkdir -p /opt/tools/installed/mv jdk8u252-b09 /opt/tools/installed/                                 步骤 2      配置Java环境变量vim /etc/profile在文件末尾添加如下代码。export JAVA_HOME=/opt/tools/installed/jdk8u252-b09export PATH=$JAVA_HOME/bin:$PATH                                 步骤 3      使修改的环境变量生效source /etc/profile                                 步骤 4      查看配置是否生效 3.3 安装Maven                                步骤 1      下载并安装到指定目录(此处以指定“/opt/tools/installed”目录为例)wget https://archive.apache.org/dist/maven/maven-3/3.5.4/binaries/apache-maven-3.5.4-bin.tar.gztar -zxf apache-maven-3.5.4-bin.tar.gzmv apache-maven-3.5.4 /opt/tools/installed/                                步骤 2      修改Maven环境变量vim /etc/profile在“/etc/profile”文件末尾增加下面代码。export MAVEN_HOME=/opt/tools/installed/apache-maven-3.5.4export PATH=$MAVEN_HOME/bin:$PATH                                步骤 3      使修改的环境变量生效source /etc/profile                                步骤 4      查看配置是否生效。mvn -v                                 步骤 5      查看配置是否生效。修改Maven配置文件中的:本地仓路径、远程仓等。配置文件路径:“/opt/tools/installed/apache-maven-3.5.4/conf/settings.xml”远程仓库配置(修改成自己搭建的Maven仓库,如果没有,可以按照下面示例配置),在<mirrors>标签内添加以下内容: <mirror>  <id>huaweimaven</id>  <name>huawei maven</name>  <url>https://mirrors.huaweicloud.com/repository/maven/</url>  <mirrorOf>central</mirrorOf></mirror> 若编译环境需要代理才能访问外网,需要在settings.xml配置文件中添加代理配置,具体内容如下:<proxies>  <proxy>    <id>optional</id>    <active>true</active>    <protocol>http</protocol>    <username>用户名</username>    <password>密码</password>    <host>代理服务器网址</host>    <port>代理服务器端口</port>    <nonProxyHosts>local.net|some.host.com</nonProxyHosts>  </proxy></proxies> 3.4 安装Protobuf                                步骤 1      查看配置是否生效。yum install -y protobuf protobuf-devel                                步骤 2      通过执行以下命令,指定安装的Protoc可执行文件mvn install:install-file -DgroupId=com.google.protobuf -DartifactId=protoc -Dversion=2.5.0 -Dclassifier=linux-aarch_64 -Dpackaging=exe -Dfile=/usr/bin/protoc  4 编译4.1 编译netty-all-4.0.27.Final                                步骤 1      按xxx指导编译netty-all-4.0.27.Final4.2 编译flink-shaded-netty                                步骤 1      下载flink-shaded-1.0安装包。wget https://github.com/apache/flink-shaded/archive/1.0.tar.gz                                步骤 2      解压安装包。tar -zxf 1.0.tar.gz                                步骤 3      进入解压后目录cd flink-shaded-1.0                                步骤 4      执行编译编译打成jar包,flink-shaded-netty-4.0.27.Final-1.0.jar放置于“flink-shaded-netty-4/target”目录。mvn clean install -pl flink-shaded-netty-4                                 步骤 5      使用鲲鹏分析扫描工具扫描编译生成的jar包,确保没有包含有x86的so和jar包。----结束A 修订记录写作说明:仅在第一次发布时,明确第一次正式发布。后续的刷新记录,不需要写作是第几次发布,只需要提供发布日期和修订说明即可。发布日期修订记录2021-01-05第一次正式发布 
总条数:99 到第
上滑加载中