• [问题求助] 作业开发-restClient模块如何操作MRS的es索引
    现在想在数据开发模块,定时用RestClient组件,操作MRS的es删除固定索引。MRS集群是安全模式。应该如何配置呢?
  • [问题求助] 【Phoenix产品】【客户端认证功能】同一个Java 进程内先创建了es的客户端之后,Phoenix客户端获取连接异常
    【功能模块】【操作步骤&问题现象】1、同一个Java 进程内 先调用一下es服务(开启了kerberos认证)2、再调用一下Phoenix服务【截图信息】【日志信息】(可选,上传日志内容或者附件)org.apache.zookeeper.KeeperException$SystemErrorException: KeeperErrorCode = SystemError for /hbase    at org.apache.phoenix.util.ServerUtil.parseServerException(ServerUtil.java:138)    at org.apache.phoenix.query.ConnectionQueryServicesImpl.ensureTableCreated(ConnectionQueryServicesImpl.java:1204)    at org.apache.phoenix.query.ConnectionQueryServicesImpl.createTable(ConnectionQueryServicesImpl.java:1501)    at org.apache.phoenix.query.DelegateConnectionQueryServices.createTable(DelegateConnectionQueryServices.java:119)    at org.apache.phoenix.schema.MetaDataClient.createTableInternal(MetaDataClient.java:2721)    at org.apache.phoenix.schema.MetaDataClient.createTable(MetaDataClient.java:1114)    at org.apache.phoenix.compile.CreateTableCompiler$1.execute(CreateTableCompiler.java:192)    at org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:409)    at org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:392)    at org.apache.phoenix.call.CallRunner.run(CallRunner.java:53)    at org.apache.phoenix.jdbc.PhoenixStatement.executeMutation(PhoenixStatement.java:391)    at org.apache.phoenix.jdbc.PhoenixStatement.executeMutation(PhoenixStatement.java:379)    at org.apache.phoenix.jdbc.PhoenixStatement.executeUpdate(PhoenixStatement.java:1811)    at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:2573)    at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:2536)    at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:76)    at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:2536)    at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:264)    at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.createConnection(PhoenixEmbeddedDriver.java:150)    at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:230)    at java.sql.DriverManager.getConnection(DriverManager.java:664)    at java.sql.DriverManager.getConnection(DriverManager.java:208)    at com.dtwave.ai.engine.freyr.strategy.phoenix.plugin.PhoenixExecutorImpl.lambda$getConnection$0(PhoenixExecutorImpl.java:123)    ... 82 common frames omittedCaused by: org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=2, exceptions:2022-03-29T03:03:45.889Z, RpcRetryingCaller{globalStartTime=1648523025126, pause=50, maxAttempts=2}, org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=2, exceptions:2022-03-29T03:03:45.197Z, RpcRetryingCaller{globalStartTime=1648523025126, pause=50, maxAttempts=2}, java.io.IOException: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase2022-03-29T03:03:45.888Z, RpcRetryingCaller{globalStartTime=1648523025126, pause=50, maxAttempts=2}, java.io.IOException: org.apache.zookeeper.KeeperException$SystemErrorException: KeeperErrorCode = SystemError for /hbase2022-03-29T03:03:46.041Z, RpcRetryingCaller{globalStartTime=1648523025126, pause=50, maxAttempts=2}, org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=2, exceptions:2022-03-29T03:03:45.989Z, RpcRetryingCaller{globalStartTime=1648523025939, pause=50, maxAttempts=2}, java.io.IOException: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase2022-03-29T03:03:46.040Z, RpcRetryingCaller{globalStartTime=1648523025939, pause=50, maxAttempts=2}, java.io.IOException: org.apache.zookeeper.KeeperException$SystemErrorException: KeeperErrorCode = SystemError for /hbase    at org.apache.hadoop.hbase.client.RpcRetryingCallerImpl.callWithRetries(RpcRetryingCallerImpl.java:152)    at org.apache.hadoop.hbase.client.HBaseAdmin.executeCallable(HBaseAdmin.java:3272)    at org.apache.hadoop.hbase.client.HBaseAdmin.executeCallable(HBaseAdmin.java:3264)    at org.apache.hadoop.hbase.client.HBaseAdmin.tableExists(HBaseAdmin.java:472)    at org.apache.phoenix.query.ConnectionQueryServicesImpl.ensureTableCreated(ConnectionQueryServicesImpl.java:1105)    ... 103 common frames omittedCaused by: org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=2, exceptions:2022-03-29T03:03:45.989Z, RpcRetryingCaller{globalStartTime=1648523025939, pause=50, maxAttempts=2}, java.io.IOException: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase2022-03-29T03:03:46.040Z, RpcRetryingCaller{globalStartTime=1648523025939, pause=50, maxAttempts=2}, java.io.IOException: org.apache.zookeeper.KeeperException$SystemErrorException: KeeperErrorCode = SystemError for /hbase    at org.apache.hadoop.hbase.client.RpcRetryingCallerImpl.callWithRetries(RpcRetryingCallerImpl.java:152)    at org.apache.hadoop.hbase.client.HTable.get(HTable.java:384)    at org.apache.hadoop.hbase.client.HTable.get(HTable.java:358)    at org.apache.hadoop.hbase.MetaTableAccessor.getTableState(MetaTableAccessor.java:1124)    at org.apache.hadoop.hbase.MetaTableAccessor.tableExists(MetaTableAccessor.java:446)    at org.apache.hadoop.hbase.client.HBaseAdmin$6.rpcCall(HBaseAdmin.java:475)    at org.apache.hadoop.hbase.client.HBaseAdmin$6.rpcCall(HBaseAdmin.java:472)    at org.apache.hadoop.hbase.client.RpcRetryingCallable.call(RpcRetryingCallable.java:58)    at org.apache.hadoop.hbase.client.RpcRetryingCallerImpl.callWithRetries(RpcRetryingCallerImpl.java:109)    ... 107 common frames omittedCaused by: java.io.IOException: org.apache.zookeeper.KeeperException$SystemErrorException: KeeperErrorCode = SystemError for /hbase    at org.apache.hadoop.hbase.client.ConnectionImplementation.get(ConnectionImplementation.java:2147)    at org.apache.hadoop.hbase.client.ConnectionImplementation.locateMeta(ConnectionImplementation.java:820)    at org.apache.hadoop.hbase.client.ConnectionImplementation.locateRegion(ConnectionImplementation.java:787)    at org.apache.hadoop.hbase.client.HRegionLocator.getRegionLocation(HRegionLocator.java:64)    at org.apache.hadoop.hbase.client.RegionLocator.getRegionLocation(RegionLocator.java:58)    at org.apache.hadoop.hbase.client.RegionLocator.getRegionLocation(RegionLocator.java:47)    at org.apache.hadoop.hbase.client.RegionServerCallable.prepare(RegionServerCallable.java:223)    at org.apache.hadoop.hbase.client.RpcRetryingCallerImpl.callWithRetries(RpcRetryingCallerImpl.java:107)    ... 115 common frames omittedCaused by: org.apache.zookeeper.KeeperException$SystemErrorException: KeeperErrorCode = SystemError for /hbase    at org.apache.zookeeper.KeeperException.create(KeeperException.java:97)    at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)    at org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient$ZKTask$1.exec(ReadOnlyZKClient.java:209)    at org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient.run(ReadOnlyZKClient.java:356)    ... 1 common frames omitted
  • [行业资讯] Elasticsearch 8.3.0 发布
    Elasticsearch 是一个基于 Lucene 库的搜索引擎,目前发布了最新的 8.3.0 版本,该版本启用了身份认证中的不推荐在没有相应绑定密码的情况下在 LDAP 或 Active Directory (AD) 领域中配置绑定 DN 。此外增强了在 random_sampleraggregation 中提高最小和最大性能,更多详情可查看发布说明:https://www.elastic.co/guide/en/elasticsearch/reference/8.3/release-notes-8.3.0.html转载于CSDN微信公众号
  • [最佳实践] 创建Flink OpenSource作业从Kafka读取数据写入到Elasticsearch
    场景描述本示例场景对汽车驾驶的实时数据信息进行分析,将满足特定条件的数据结果进行汇总。数据的输入源为Kafka,结果输出到DWS中。例如,如下样例输入数据:{"car_id":"3027", "car_owner":"lilei", "car_age":"7", "average_speed":"76", "total_miles":"15000"}{"car_id":"3028", "car_owner":"hanmeimei", "car_age":"6", "average_speed":"92", "total_miles":"17000"}{"car_id":"3029", "car_owner":"zhangsan", "car_age":"10", "average_speed":"81", "total_miles":"230000"}预期输出:average_speed <= 90 和 total_miles <= 200000的车辆,即:{"car_id":"3027", "car_owner":"lilei", "car_age":"7", "average_speed":"76", "total_miles":"15000"}前提条件已创建DMS Kafka实例,具体步骤可参考:DMS Kafka入门指引。注意:创建DMS Kafka实例时,不能开启Kafka SASL_SSL。已创建Elasticsearch类型的CSS集群。具体创建CSS集群的操作可以参考创建CSS集群。本示例创建的CSS集群版本为:7.6.2,集群为非安全集群。整体作业开发流程整体作业开发流程参考图1。图1 作业开发流程步骤1:创建队列:创建DLI作业运行的队列。步骤2:创建Kafka的Topic:创建Kafka生产消费数据的Topic。步骤3:创建Elasticsearch搜索索引:创建Elasticsearch搜索索引用于接收结果数据。步骤4:创建增强型跨源连接:DLI上创建连接Kafka和CSS的跨源连接,打通网络。步骤5:运行作业:DLI上创建和运行Flink OpenSource作业。步骤6:发送数据和查询结果:Kafka上发送流数据,在CSS上查看运行结果。步骤1:创建队列登录DLI管理控制台,在左侧导航栏单击“资源管理 > 队列管理”,可进入队列管理页面。在队列管理界面,单击界面右上角的“购买队列”。在“购买队列”界面,填写具体的队列配置参数,具体参数填写参考如下。计费模式:选择“包年/包月”或“按需计费”。本示例选择“按需计费”。区域和项目:保持默认值即可。名称:填写具体的队列名称。说明:新建的队列名称,名称只能包含数字、英文字母和下划线,但不能是纯数字,且不能以下划线开头。长度限制:1~128个字符。队列名称不区分大小写,系统会自动转换为小写。类型:队列类型选择“通用队列”。“按需计费”时需要勾选“专属资源模式”。AZ策略、CPU架构、规格:保持默认即可。企业项目:当前选择为“default”。高级选项:选择“自定义”。网段:配置队列网段。例如,当前配置为10.0.0.0/16。注意:队列的网段不能和DMS Kafka、RDS MySQL实例的子网网段有重合,否则后续创建跨源连接会失败。其他参数根据需要选择和配置。图2 创建队列参数配置完成后,单击“立即购买”,确认配置信息无误后,单击“提交”完成队列创建。步骤2:创建Kafka的Topic在Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka名称,进入到Kafka的基本信息页面。单击“Topic管理 > 创建Topic”,创建一个Topic。Topic配置参数如下:Topic名称。本示例输入为:testkafkatopic。分区数:1。副本数:1其他参数保持默认即可。步骤3:创建Elasticsearch搜索索引登录CSS管理控制台,选择“集群管理 > Elasticsearch”。在集群管理界面,在已创建的CSS集群的“操作”列,单击“Kibana”访问集群。在Kibana的左侧导航中选择“Dev Tools”,进入到Console界面。在Console界面,执行如下命令创建索引“shoporders”。PUT /shoporders{ "settings": { "number_of_shards": 1 }, "mappings": { "properties": { "order_id": { "type": "text" }, "order_channel": { "type": "text" }, "order_time": { "type": "text" }, "pay_amount": { "type": "double" }, "real_pay": { "type": "double" }, "pay_time": { "type": "text" }, "user_id": { "type": "text" }, "user_name": { "type": "text" }, "area_id": { "type": "text" } } }}步骤4:创建增强型跨源连接创建DLI连接Kafka的增强型跨源连接在Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka名称,进入到Kafka的基本信息页面。在“连接信息”中获取该Kafka的“内网连接地址”,在“基本信息”的“网络”中获取获取该实例的“虚拟私有云”和“子网”信息,方便后续操作步骤使用。单击“网络”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选为:1,策略选为:允许,协议选择:TCP,端口值不填,类型:IPV4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。连接名称:设置具体的增强型跨源名称。本示例输入为:dli_kafka。弹性资源池:选择步骤1:创建队列中已经创建的队列。虚拟私有云:选择Kafka的虚拟私有云。子网:选择Kafka的子网。其他参数可以根据需要选择配置。参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。单击“队列管理”,选择操作的队列,本示例为步骤1:创建队列中添加的队列,在操作列,单击“更多 > 测试地址连通性”。在“测试连通性”界面,根据中获取的Kafka连接信息,地址栏输入“Kafka内网地址:Kafka数据库端口”,单击“测试”测试DLI到Kafka网络是否可达。创建DLI连接CSS的增强型跨源连接在CSS管理控制台,选择“集群管理”,单击已创建的CSS集群名称,进入到CSS的基本信息页面。在“基本信息”中获取CSS的“内网访问地址”、“虚拟私有云”和“子网”信息,方便后续操作步骤使用。单击“连接信息”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选为:1,策略选为:允许,协议选择:TCP,端口值不填,类型:IPV4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。说明:本示例默认Kafka和CSS实例分别在两个VPC和子网下,所以要分别创建增强型跨源连接打通网络。如果Kafka和CSS实例属于同一VPC和子网下,则创建增强型跨源一次即可,4和5不需要再执行。在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。连接名称:设置具体的增强型跨源名称。本示例输入为:dli_css。弹性资源池:选择步骤1:创建队列中已经创建的队列。虚拟私有云:选择CSS的虚拟私有云。子网:选择CSS的子网。其他参数可以根据需要选择配置。参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。单击“队列管理”,选择操作的队列,本示例为步骤1:创建队列中添加的队列,在操作列,单击“更多 > 测试地址连通性”。在“测试连通性”界面,根据2获取的CSS连接信息,地址栏输入“CSS内网地址:CSS内网端口”,单击“测试”测试DLI到CSS网络是否可达。步骤5:运行作业在DLI管理控制台,单击“作业管理 > Flink作业”,在Flink作业管理界面,单击“创建作业”。在创建队列界面,类型选择“Flink OpenSource SQL”,名称填写为:FlinkKafkaES。单击“确定”,跳转到Flink作业编辑界面。在Flink OpenSource SQL作业编辑界面,配置如下参数。所属队列:选择步骤1:创建队列中创建的队列。Flink版本:选择1.12。保存作业日志:勾选。OBS桶:选择保存作业日志的OBS桶,根据提示进行OBS桶权限授权。开启Checkpoint:勾选。Flink作业编辑框中输入具体的作业SQL,本示例作业参考如下。SQL中加粗的参数需要根据实际情况修改。说明:本示例使用的Flink版本为1.12,故Flink OpenSource SQL语法也是1.12。本示例数据源是Kafka,写入结果数据到Elasticsearch,故请参考Flink OpenSource SQL 1.12创建Kafka源表和Flink OpenSource SQL 1.12创建Elasticsearch结果表。CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string) with ( "connector" = "kafka", "properties.bootstrap.servers" = "10.128.0.120:9092,10.128.0.89:9092,10.128.0.83:9092",--替换为kafka的内网连接地址和端口 "properties.group.id" = "click", "topic" = "testkafkatopic", --创建的Kafka Topic "format" = "json", "scan.startup.mode" = "latest-offset");CREATE TABLE elasticsearchSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = '192.168.168.125:9200', --替换为CSS集群的内网地址和端口 'index' = 'shoporders' --创建的Elasticsearch搜索引擎);--将Kafka数据写入到Elasticsearch索引中insert into elasticsearchSinkselect *from kafkaSource;单击“语义校验”确保SQL语义校验成功。单击“保存”,保存作业。单击“启动”,启动作业,确认作业参数信息,单击“立即启动”开始执行作业。等待作业运行状态变为“运行中”。步骤6:发送数据和查询结果使用Kafka客户端向步骤2:创建Kafka的Topic中的Topic发送数据,模拟实时数据流。Kafka生产和发送数据的方法请参考:DMS - 连接实例生产消费信息。发送样例数据如下:{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0002", "user_name":"Jason", "area_id":"330106"}发送成功后,在CSS集群的Kibana中执行下述语句并查看相应结果:GET shoporders/_search查询结果返回如下:{ "took" : 0, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 2, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "shoporders", "_type" : "_doc", "_id" : "6fswzIAByVjqg3_qAyM1", "_score" : 1.0, "_source" : { "order_id" : "202103241000000001", "order_channel" : "webShop", "order_time" : "2021-03-24 10:00:00", "pay_amount" : 100.0, "real_pay" : 100.0, "pay_time" : "2021-03-24 10:02:03", "user_id" : "0001", "user_name" : "Alice", "area_id" : "330106" } }, { "_index" : "shoporders", "_type" : "_doc", "_id" : "6vs1zIAByVjqg3_qyyPp", "_score" : 1.0, "_source" : { "order_id" : "202103241606060001", "order_channel" : "appShop", "order_time" : "2021-03-24 16:06:06", "pay_amount" : 200.0, "real_pay" : 180.0, "pay_time" : "2021-03-24 16:10:06", "user_id" : "0002", "user_name" : "Jason", "area_id" : "330106" } } ] }}
  • [最佳实践] 迁移Elasticsearch数据至DLI
    本文为您介绍如何通过CDM数据同步功能,迁移Elasticsearch类型的CSS集群数据至DLI。其他自建的Elasticsearch等服务数据,均可以通过CDM与DLI进行双向同步。前提条件已创建DLI的SQL队列。创建DLI队列的操作可以参考创建DLI队列。注意:创建DLI队列时队列类型需要选择为“SQL队列”。已创建Elasticsearch类型的CSS集群。具体创建CSS集群的操作可以参考创建CSS集群。本示例创建的CSS集群版本为:7.6.2,集群为非安全集群。已创建CDM迁移集群。创建CDM集群的操作可以参考创建CDM集群。说明:如果目标数据源为云下的数据库,则需要通过公网或者专线打通网络。通过公网互通时,需确保CDM集群已绑定EIP、CDM云上安全组出方向放通云下数据源所在的主机、数据源所在的主机可以访问公网且防火墙规则已开放连接端口。数据源为云上的CSS服务时,网络互通需满足如下条件:i. CDM集群与云上服务处于不同区域的情况下,需要通过公网或者专线打通网络。通过公网互通时,需确保CDM集群已绑定EIP,数据源所在的主机可以访问公网且防火墙规则已开放连接端口。ii. CDM集群与云上服务同区域情况下,同虚拟私有云、同子网、同安全组的不同实例默认网络互通;如果同虚拟私有云但是子网或安全组不同,还需配置路由规则及安全组规则。配置路由规则请参见如何配置路由规则章节,配置安全组规则请参见如何配置安全组规则章节。iii. 此外,您还必须确保该云服务的实例与CDM集群所属的企业项目必须相同,如果不同,需要修改工作空间的企业项目。本示例CDM集群的虚拟私有云、子网以及安全组和创建的CSS集群保持一致。步骤一:数据准备CSS集群上创建索引并导入数据。登录CSS管理控制台,选择“集群管理 > Elasticsearch”。在集群管理界面,在已创建的CSS集群的“操作”列,单击“Kibana”访问集群。在Kibana的左侧导航中选择“Dev Tools”,进入到Console界面。在Console界面,执行如下命令创建索引“my_test”。PUT /my_test{ "settings": { "number_of_shards": 1 }, "mappings": { "properties": { "productName": { "type": "text", "analyzer": "ik_smart" }, "size": { "type": "keyword" } } } }在Console界面,执行如下命令,将数据导入到“my_test”索引中。POST /my_test/_doc/_bulk{"index":{}}{"productName":"2017秋装新款文艺衬衫女装","size":"L"}{"index":{}}{"productName":"2017秋装新款文艺衬衫女装","size":"M"}{"index":{}}{"productName":"2017秋装新款文艺衬衫女装","size":"S"}{"index":{}}{"productName":"2018春装新款牛仔裤女装","size":"M"}{"index":{}}{"productName":"2018春装新款牛仔裤女装","size":"S"}{"index":{}}{"productName":"2017春装新款休闲裤女装","size":"L"}{"index":{}}{"productName":"2017春装新款休闲裤女装","size":"S"}当返回结果信息中“errors”字段的值为“false”时,表示导入数据成功。在DLI上创建数据库和表。登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列。在编辑器中输入以下语句创建数据库,例如当前创建迁移后的DLI数据库testdb。详细的DLI创建数据库的语法可以参考创建DLI数据库。create database testdb;创建数据库下的表。详细的DLI建表语法可以参考创建DLI表。create table tablecss(size string, productname string);步骤二:数据迁移配置CDM数据源连接。配置源端CSS的数据源连接。登录CDM控制台,选择“集群管理”,选择已创建的CDM集群,在操作列选择“作业管理”。在作业管理界面,选择“连接管理”,单击“新建连接”,连接器类型选择“云搜索服务”,单击“下一步”。图1 创建CSS数据源配置源端CSS的数据源连接,具体参数配置如下。详细参数配置可以参考CDM上配置CSS连接。表1 CSS数据源配置参数值名称自定义CSS数据源名称。例如当前配置为“source_css”。Elasticsearch服务器列表单击输入框旁边的“选择”按钮,选择当前CSS集群即可自动关联出来Elasticsearch服务器列表。安全模式认证如果所需连接的CSS集群在创建时开启了“安全模式”,该参数需设置为“是”,否则设置为“否”。本示例选择为“否”。图2 CDM配置CSS数据源单击“保存”完成CSS数据源配置。配置目的端DLI的数据源连接。登录CDM控制台,选择“集群管理”,选择已创建的CDM集群,在操作列选择“作业管理”。在作业管理界面,选择“连接管理”,单击“新建连接”,连接器类型选择“数据湖探索(DLI)”,单击“下一步”。图3 创建DLI数据源连接配置目的端DLI数据源连接连接参数。具体参数配置可以参考在CDM上配置DLI连接。图4 配置DLI数据源连接参数配置完成后,单击“保存”完成DLI数据源配置。创建CDM迁移作业。登录CDM控制台,选择“集群管理”,选择已创建的CDM集群,在操作列选择“作业管理”。在“作业管理”界面,选择“表/文件迁移”,单击“新建作业”。在新建作业界面,配置当前作业配置信息,具体参数参考如下:图5 新建CDM作业作业配置作业名称:自定义数据迁移的作业名称。例如,当前定义为:css_to_dli。源端作业配置,具体参考如下:表2 源端作业配置参数名参数值源连接名称选择1.a中已创建的数据源名称。索引选择CSS集群中创建的Elasticsearch索引名。当前示例为CSS集群上创建索引并导入数据中创建的索引“my_test”。索引名称只能全部小写,不能有大写。类型Elasticsearch的类型,类似关系数据库中的表名称。类型名称只能全部小写,不能有大写。当前示例为:“_doc”。更多其他参数说明可以参考:CDM配置CSS源端参数。目的端作业配置,具体参考如下:表3 目的端作业配置参数名参数值目的连接名称选择1.b已创建的DLI数据源连接。资源队列选择已创建的DLI SQL类型的队列。数据库名称选择DLI下已创建的数据库。当前示例为在DLI上创建数据库和表中创建的数据库名,即为“testdb”。表名选择DLI下已创建的表名。当前示例为在DLI上创建数据库和表中创建的表名,即为“tablecss”。导入前清空数据选择导入前是否清空目的表的数据。当前示例选择为“否”。如果设置为是,任务启动前会清除目标表中数据。详细的参数配置可以参考:CDM配置DLI目的端参数。单击“下一步”,进入到字段映射界面,CDM会自动匹配源和目的字段。如果字段映射顺序不匹配,可通过拖拽字段调整。如果选择在目的端自动创建类型,这里还需要配置每个类型的字段类型、字段名称。CDM支持迁移过程中转换字段内容,详细请参见字段转换。图6 字段映射单击“下一步”配置任务参数,一般情况下全部保持默认即可。该步骤用户可以配置如下可选功能:作业失败重试:如果作业执行失败,可选择是否自动重试,这里保持默认值“不重试”。作业分组:选择作业所属的分组,默认分组为“DEFAULT”。在CDM“作业管理”界面,支持作业分组显示、按组批量启动作业、按分组导出作业等操作。是否定时执行:如果需要配置作业定时自动执行,请参见配置定时任务。这里保持默认值“否”。抽取并发数:设置同时执行的抽取任务数。这里保持默认值“1”。是否写入脏数据:如果需要将作业执行过程中处理失败的数据、或者被清洗过滤掉的数据写入OBS中,以便后面查看,可通过该参数配置,写入脏数据前需要先配置好OBS连接。这里保持默认值“否”即可,不记录脏数据。单击“保存并运行”,回到作业管理界面,在作业管理界面可查看作业执行进度和结果。图7 迁移作业进度和结果查询步骤三:结果查询CDM迁移作业运行完成后,再登录到DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择已1中已创建的数据库,执行DLI表查询语句,查询CSS的数据是否已成功迁移到DLI的“tablecss”表中。select * from tablecss;图8 迁移后查询DLI的表数据
  • [技术干货] 如何解决ES搜索引擎与DB对象模型表的数据不一致的情况
    搜索引擎的使用与DB查询不是使用同一个数据库,它们的事务也是分别开启的,所以会存在异常情况时候:搜索引擎事务与DB事务不一致。解决搜索引擎事务不一致问题的参考方案如下:假设模型对象表为表A,表A需要同步数据到搜索引擎ES中的表B,以便加速搜索。由于在同一脚本中同时操作表A与表B,在异常情况下会存在数据不一致的情况(因为是不同数据库,事务分别开启)。为了确保A表的修改,可以同步到B表中,即当一次操作需要修改A表的某条记录,并且同步修改B表的对应记录时,可以通过如下步骤实现。A表中新增一个字段flag,A表中的每条记录改动时,都将A表记录中的该标记字段flag设为false。然后通过发送事件的方式,异步修改B表的记录,事件需要在A表操作完成并且提交事务成功后发送。否则会导致异步事件监测不到改动,无法正确修改B表的记录。待B表成功修改后,同步更新A表中的该字段flag的值为true。由于同步到B表的过程也可能失败,则需要开启定时任务,搜索A表中flag字段为false(即为未同步到B表的记录)。将定时任务搜索得到的A表中与B表不同步的记录进行再次同步。以上过程,除了定时任务相似的处理没有列出以外,假设最大的方框表示表A与表B数据一致时,无论查询哪一张表,对外都无感知的。对于大方框内,A表与B表之间的保证数据一致性的方式,则由内部流程图所示。由于定时任务查询得到表A中flag字段为false的记录(即为未同步到表B的记录时),再次进行数据一致性操作的处理流程相似,所以流程图中未列出定时任务处理全量查询不一致的记录的流程。
  • [技术干货] 智慧园区ES搜索引擎加速的使用场景
    ElasticSearch作为专业的搜索引擎,具有搜索速度快的好处。但是业务中是否需要使用,则需要视具体情况而定。在AppCube平台的相关参数与限制章节中已经介绍了平台对接ES的一些限制:如果业务需要进行条件查询,条件查询的入参携带参数大于1024个时,不应该考虑使用ES进行加速,参考AppCube平台的相关参数与限制章节中的第1条限制。如果业务需要支持分页查询的起始条数大于10000的情况,则不应该考虑使用ES进行加速,参考AppCube平台的相关参数与限制章节中的第3条限制。如果对当前多表关联的搜索性能要求比较高,可以考虑使用ES进行加速查询。因为使用SQL进行多表关联查询,会受到表记录数量大小,查询SQL写法,SQL条件的复杂程度的影响。以基线人员BO为例,关联表数量较多,大于6表关联,并且表记录较多,有超过3张表的表记录数量达到数十万甚至数百万级别的。未使用ES的情况:如果进行5~6表关联查询,就算使用索引查询,也需要800ms以上。由于count查询性能较低,也会导致一条SQL需要1.5s以上的耗时。单个完整业务的接口处理时长大于2s甚至到达5s以上。这是由于一个业务场景中,这样的SQL不止一两条,会导致整个业务场景下前端调用接口时候高延时。这时候,则可以使用ES进行加速。使用ES后,相同单线程无并发的情况下,整个查询过程仅需要200ms~500ms,对比原来的几秒已经有了极大的提升,这时候推荐使用ES进行加速。
  • [技术干货] ES6 教程
    简介ES6, 全称 ECMAScript 6.0 ,是 JavaScript 的下一个版本标准,2015.06 发版。ES6 主要是为了解决 ES5 的先天不足,比如 JavaScript 里并没有类的概念,但是目前浏览器的 JavaScript 是 ES5 版本,大多数高版本的浏览器也支持 ES6,不过只实现了 ES6 的部分特性和功能。ECMAScript 的背景JavaScript 是大家所了解的语言名称,但是这个语言名称是商标( Oracle 公司注册的商标)。因此,JavaScript 的正式名称是 ECMAScript 。1996年11月,JavaScript 的创造者网景公司将 JS 提交给国际化标准组织 ECMA(European computer manufactures association,欧洲计算机制造联合会),希望这种语言能够成为国际标准,随后 ECMA 发布了规定浏览器脚本语言的标准,即 ECMAScript。这也有利于这门语言的开放和中立。ECMAScript 的历史ES6 是 ECMAScript 标准十余年来变动最大的一个版本,为其添加了许多新的语法特性。1997 年 ECMAScript 1.0 诞生。1998 年 6 月 ECMAScript 2.0 诞生,包含一些小的更改,用于同步独立的 ISO 国际标准。1999 年 12 月 ECMAScript 3.0诞生,它是一个巨大的成功,在业界得到了广泛的支持,它奠定了 JS 的基本语法,被其后版本完全继承。直到今天,我们一开始学习 JS ,其实就是在学 3.0 版的语法。2000 年的 ECMAScript 4.0 是当下 ES6 的前身,但由于这个版本太过激烈,对 ES 3 做了彻底升级,所以暂时被"和谐"了。2009 年 12 月,ECMAScript 5.0 版正式发布。ECMA 专家组预计 ECMAScript 的第五个版本会在 2013 年中期到 2018 年作为主流的开发标准。2011年6月,ES 5.1 版发布,并且成为 ISO 国际标准。2013 年,ES6 草案冻结,不再添加新的功能,新的功能将被放到 ES7 中;2015年6月, ES6 正式通过,成为国际标准。ES6 的目标与愿景成为更好编写的开发语言有以下目标。适应更复杂的应用;实现代码库之间的共享;不断迭代维护新版本。目前各大浏览器基本上都支持 ES6 的新特性,其中 Chrome 和 Firefox 浏览器对 ES6 新特性最友好,IE7~11 基本不支持 ES6。以下是各大浏览器支持情况及开始时间:实例var a = 2; { let a = 3; document.write(a); // 3 } document.write('<br>'); document.write(a); // 2尝试一下 »浏览器支持的详细的内容可以参考:http://kangax.github.io/compat-table/es6/Node.js 是运行在服务端的 JavaScript,它对 ES6 的支持度更高。如果你还不了解 Node.js 可以阅读我们的 Node.js 教程。Node.js 安装可以参考 Node.js 安装配置。在 Node.js 环境中运行 ES6$ node> let sitename="runoob"undefined> console.log(sitename)runoobundefined>使用下面的命令,可以查看 Node 已经实现的 ES6 特性。node --v8-options | grep harmonywebpackwebpack 是一个现代 JavaScript 应用程序的静态模块打包器 (module bundler) 。当 webpack 处理应用程序时,它会递归地构建一个依赖关系图 (dependency graph) ,其中包含应用程序需要的每个模块,然后将所有这些模块打包成一个或多个 bundle 。webpack 主要有四个核心概念:入口 (entry)输出 (output)loader插件 (plugins)入口 (entry)入口会指示 webpack 应该使用哪个模块,来作为构建其内部依赖图的开始。进入入口起点后,webpack 会找出有哪些模块和库是入口起点(直接和间接)依赖的。在 webpack 中入口有多种方式来定义,如下面例子:单个入口(简写)语法:const config = { entry: "./src/main.js"}对象语法:const config = { app: "./src/main.js", vendors: "./src/vendors.js"}输出 (output):output 属性会告诉 webpack 在哪里输出它创建的 bundles ,以及如何命名这些文件,默认值为 ./dist:const config = { entry: "./src/main.js", output: { filename: "bundle.js", path: path.resolve(__dirname, 'dist') }}loaderloader 让 webpack 可以去处理那些非 JavaScript 文件( webpack 自身只理解 JavaScript )。loader 可以将所有类型的文件转换为 webpack 能够有效处理的模块,例如,开发的时候使用 ES6 ,通过 loader 将 ES6 的语法转为 ES5 ,如下配置:const config = { entry: "./src/main.js", output: { filename: "bundle.js", path: path.resolve(__dirname, 'dist') }, module: { rules: [ { test: /\.js$/, exclude: /node_modules/, loader: "babel-loader", options: [ presets: ["env"] ] } ] } }插件 (plugins)loader 被用于转换某些类型的模块,而插件则可以做更多的事情。包括打包优化、压缩、定义环境变量等等。插件的功能强大,是 webpack 扩展非常重要的利器,可以用来处理各种各样的任务。使用一个插件也非常容易,只需要 require() ,然后添加到 plugins 数组中。// 通过 npm 安装 const HtmlWebpackPlugin = require('html-webpack-plugin'); // 用于访问内置插件 const webpack = require('webpack'); const config = { module: { rules: [ { test: /\.js$/, exclude: /node_modules/, loader: "babel-loader" } ] }, plugins: [ new HtmlWebpackPlugin({template: './src/index.html'}) ] };利用 webpack 搭建应用webpack.config.jsconst path = require('path'); module.exports = { mode: "development", // "production" | "development" // 选择 development 为开发模式, production 为生产模式 entry: "./src/main.js", output: { filename: "bundle.js", path: path.resolve(__dirname, 'dist') }, module: { rules: [ { test: /\.js$/, exclude: /node_modules/, loader: "babel-loader", options: [ presets: ["env"] ] } ] }, plugins: [ ... ] }上述例子构建了一个最简单的配置,webpack 会从入口 main.js 文件进行构建,通过 loader 进行js转换,输出一个为 bundle.js 的文件,至此一整个过程就构建完成。gulpgulp 是一个基于流的自动化构建工具,具有易于使用、构建快速、插件高质和易于学习的特点,常用于轻量级的工程中。如何使用?全局安装 gulp:$ npm install --global gulp在项目中引入依赖:$ npm install --save-dev gulp在项目根目录下创建名为 gulpfile.js 的文件:const gulp = require('gulp');// default 表示一个任务名,为默认执行任务gulp.task('default', function() { // 放置默认的任务代码})运行 gulp:$ gulp利用 gulp 搭建应用const gulp = require('gulp'); const uglify = require("gulp-uglify"); gulp.task('default', function() { gulp.src('./src/main.js') .pipe(uglify()) .pipe(gulp.dest('./dist')); })
  • [问题求助] 【appcube】【ES云搜索服务】ES经常性异常,索引频繁坏掉,记录数变为0
    【功能模块】ES云搜索服务【操作步骤&问题现象】索引记录数频繁变为0,每次出现问题的时候把索引删掉再重新同步就可以了【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [问题求助] 【华为自研项目】【审计日志】一目前审计日志存在es中,可能会被写满,没有做清理机制
    在做安全排查时,咨询到园区同事,发现审计日志没有做清理机制,es可能会被写满bwx1097451
  • [二次开发] 【MRS3.1.2产品】【elasticsearch中HwRestClient方法功能】能否通过封装一个Properties对象
    【功能模块】Rest Client客户端样例中HwRestClient 默认从代码运行路径的conf目录下读取配置文件:esParams.properties、krb5.conf 和 user.keytab 能否通过在客户端连接集群中能否通过封装一个Properties对象来实现连接。【操作步骤&问题现象】二次开发elasticsearch组件在客户端连接集群 开放出来一个构造方法,支持传入一个Properties对象,进行构造client对象,并且把那些time超时类型的设置,给定一些默认值,我需要修改就在Proper中传递,不需要就直接使用你们的默认值,用简单的方式能够快速的构造出es的client对象【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [大数据] ES关闭索引流程源码解析 -- 7.5.1
  • [技术干货] MRS中的Elasticsearch是否支持存算分离
    MRS中的Elasticsearch是否支持存算分离
  • [生态对接] Kibana-oss 6.7.1对接 FusionInsight Elasticsearch6.7.1
    【功能模块】FusionInsight Elasticsearch 6.7.1Kibana 6.7.1linux系统为7.x x86【操作步骤&问题现象】1、按照https://bbs.huaweicloud.com/forum/thread-66788-1-1.html这篇文章的步骤,kibana连接es, 日志报错如下:{"type":"log","@timestamp":"2022-02-23T02:16:25Z","tags":["warning","elasticsearch","admin"],"pid":8726,"message":"Unable to revive connection: http://x.x.x.x:24100/"}2、打开浏览器,访问kibana,http://x.x.x.x:5601,报错如下:Kibana server is not ready yet【截图信息】有谁知道这个问题怎么解决吗?【日志信息】(可选,上传日志内容或者附件)
  • [技术干货] 还在用ES查日志吗,快看看石墨文档 Clickhouse 日志架构玩法
    1 背景石墨文档全部应用部署在Kubernetes上,每时每刻都会有大量的日志输出,我们之前主要使用SLS和ES作为日志存储。但是我们在使用这些组件的时候,发现了一些问题。成本问题:SLS个人觉得是一个非常优秀的产品,速度快,交互方便,但是SLS索引成本比较贵我们想减少SLS索引成本的时候,发现云厂商并不支持分析单个索引的成本,导致我们无法知道是哪些索引构建的不够合理ES使用的存储非常多,并且耗费大量的内存通用问题:如果业务是混合云架构,或者业务形态有SAAS和私有化两种方式,那么SLS并不能通用日志和链路,需要用两套云产品,不是很方便精确度问题:SLS存储的精度只能到秒,但我们实际日志精度到毫秒,如果日志里面有traceid,SLS中无法通过根据traceid信息,将日志根据毫秒时间做排序,不利于排查错误我们经过一番调研后,发现使用Clickhouse能够很好的解决以上问题,并且Clickhouse省存储空间,非常省钱,所以我们选择了Clickhouse方案存储日志。但当我们深入研究后,Clickhouse作为日志存储有许多落地的细节,但业界并没有很好阐述相关Clickhouse采集日志的整套流程,以及没有一款优秀的Clickhouse日志查询工具帮助分析日志,为此我们写了一套Clickhouse日志系统贡献给开源社区,并将Clickhouse的日志采集架构的经验做了总结。先上个Clickhouse日志查询界面,让大家感受下石墨最懂前端的后端程序员。2 架构原理图我们将日志系统分为四个部分:日志采集、日志传输、日志存储、日志管理。日志采集:LogCollector采用Daemonset方式部署,将宿主机日志目录挂载到LogCollector的容器内,LogCollector通过挂载的目录能够采集到应用日志、系统日志、K8S审计日志等日志传输:通过不同Logstore映射到Kafka中不同的Topic,将不同数据结构的日志做了分离日志存储:使用Clickhouse中的两种引擎数据表和物化视图日志管理:开源的Mogo系统,能够查询日志,设置日志索引,设置LogCollector配置,设置Clickhouse表,设置报警等以下我们按照这四大部分,阐述其中的架构原理3 日志采集3.1 采集方式Kubernetes容器内日志收集的方式通常有以下三种方案DaemonSet方式采集:在每个 node 节点上部署LogCollector,并将宿主机的目录挂载为容器的日志目录,LogCollector读取日志内容,采集到日志中心。网络方式采集:通过应用的日志 SDK,直接将日志内容采集到日志中心 。SideCar方式采集:在每个 pod 内部署LogCollector,LogCollector只读取这个 pod 内的日志内容,采集到日志中心。以下是三种采集方式的优缺点:DaemonSet方式    网络方式    SideCar方式采集日志类型    标准输出+文件    应用日志部署运维    一般,维护DaemonSet    低,维护配置文件日志分类存储    可通过容器/路径等映射    业务独立配置支持集群规模    取决于配置数    无限制适用场景    日志分类明确、功能较单一    性能要求极高的场景资源消耗    中    低我们主要采用DaemonSet方式和网络方式采集日志。DaemonSet方式用于ingress、应用日志的采集,网络方式用于大数据日志的采集。以下我们主要介绍下DeamonSet方式的采集方式。 ​3.2 日志输出从上面的介绍中可以看到,我们的DaemonSet会有两种方式采集日志类型,一种是标准输出,一种是文件。 引用元乙的描述:虽然使用 Stdout 打印日志是 Docker 官方推荐的方式,但大家需要注意:这个推荐是基于容器只作为简单应用的场景,实际的业务场景中我们还是建议大家尽可能使用文件的方式,主要的原因有以下几点:Stdout 性能问题,从应用输出 stdout 到服务端,中间会经过好几个流程(例如普遍使用的JSON LogDriver):应用 stdout -> DockerEngine -> LogDriver -> 序列化成 JSON -> 保存到文件 -> Agent 采集文件 -> 解析 JSON -> 上传服务端。整个流程相比文件的额外开销要多很多,在压测时,每秒 10 万行日志输出就会额外占用 DockerEngine 1 个 CPU 核;Stdout 不支持分类,即所有的输出都混在一个流中,无法像文件一样分类输出,通常一个应用中有 AccessLog、ErrorLog、InterfaceLog(调用外部接口的日志)、TraceLog 等,而这些日志的格式、用途不一,如果混在同一个流中将很难采集和分析;Stdout 只支持容器的主程序输出,如果是 daemon/fork 方式运行的程序将无法使用 stdout;文件的 Dump 方式支持各种策略,例如同步/异步写入、缓存大小、文件轮转策略、压缩策略、清除策略等,相对更加灵活。从这个描述中,我们可以看出在docker中输出文件在采集到日志中心是一个更好的实践。所有日志采集工具都支持采集文件日志方式,但是我们在配置日志采集规则的时候,发现开源的一些日志采集工具,例如fluentbit、filebeat在DaemonSet部署下采集文件日志是不支持追加例如pod、namespace、container_name、container_id等label信息,并且也无法通过这些label做些定制化的日志采集。agent类型    采集方式    daemonset部署    sidecar部署ilogtail    文件日志    能够追加label信息,能够根据label过滤采集    能够追加label信息,能够根据label过滤采集fluentbit    文件日志    无法追加label信息,无法根据label过滤采集    能够追加abel信息,能够根据label过滤采集filebeat    文件日志    无法追加label信息,无法根据label过滤采集    能够追加label信息,能够根据label过滤采集ilogtail    标准输出    能够追加label信息,能够根据label过滤采集    能够追加label信息,能够根据label过滤采集fluentbit    标准输出    能够追加label信息,能够根据label过滤采集    能够追加abel信息,能够根据label过滤采集filebeat    标准输出    能够追加label信息,能够根据label过滤采集    能够追加label信息,能够根据label过滤采集基于无法追加label信息的原因,我们暂时放弃了DeamonSet部署下文件日志采集方式,采用的是基于DeamonSet部署下标准输出的采集方式。 ​3.3 日志目录以下列举了日志目录的基本情况目录    描述    类型/var/log/containers    存放的是软链接,软链到/var/log/pods里的标准输出日志    ​标准输出/var/log/pods    存放标准输出日志    ​标准输出/var/log/kubernetes/    master存放Kubernetes 审计输出日志    标准输出/var/lib/docker/overlay2    存放应用日志文件信息    文件日志/var/run    获取docker.sock,用于docker通信    文件日志/var/lib/docker/containers    用于存储容器信息    两种都需要因为我们采集日志是使用的标准输出模式,所以根据上表我们的LogCollector只需要挂载/var/log,/var/lib/docker/containers两个目录。3.3.1 标准输出日志目录应用的标准输出日志存储在/var/log/containers目录下,​文件名是按照K8S日志规范生成的。这里以nginx-ingress的日志作为一个示例。我们通过ls /var/log/containers/ | grep nginx-ingress指令,可以看到nginx-ingress的文件名。  nginx-ingress-controller-mt2wx_kube-system_nginx-ingress-controller-be3741043eca1621ec4415fd87546b1beb29480ac74ab1cdd9f52003cf4abf0a.log ​我们参照K8S日志的规范:/var/log/containers/%{DATA:pod_name}_%{DATA:namespace}_%{GREEDYDATA:container_name}-%{DATA:container_id}.log。可以将nginx-ingress日志解析为:pod_name:nginx-ingress-controller-mt2wnamespace:kube-systemcontainer_name:nginx-ingress-controllercontainer_id:be3741043eca1621ec4415fd87546b1beb29480ac74ab1cdd9f52003cf4abf0a通过以上的日志解析信息,我们的LogCollector 就可以很方便的追加pod、namespace、container_name、container_id的信息。 ​3.3.2 容器信息目录应用的容器信息存储在/var/lib/docker/containers目录下,目录下的每一个文件夹为容器ID,我们可以通过cat config.v2.json获取应用的docker基本信息。3.4 LogCollector采集日志3.4.1 配置我们LogCollector采用的是fluent-bit,该工具是cncf旗下的,能够更好的与云原生相结合。通过Mogo系统可以选择Kubernetes集群,很方便的设置fluent-bit configmap的配置规则。3.4.2 数据结构​fluent-bit的默认采集数据结构@timestamp字段:string or float,用于记录采集日志的时间log字段:string,用于记录日志的完整内容Clickhouse如果使用@timestamp的时候,因为里面有@特殊字符,会处理的有问题。所以我们在处理fluent-bit的采集数据结构,会做一些映射关系,并且规定双下划线为Mogo系统日志索引,避免和业务日志的索引冲突。_time_字段:string or float,用于记录采集日志的时间_log_字段:string,用于记录日志的完整内容例如你的日志记录的是{"id":1},那么实际fluent-bit采集的日志会是{"_time_":"2022-01-15...","_log_":"{\"id\":1}" 该日志结构会直接写入到kafka中,Mogo系统会根据这两个字段_time_、_log_设置clickhouse中的数据表。3.4.3 采集如果我们要采集ingress日志,我们需要在input配置里,设置ingress的日志目录,fluent-bit会把ingress日志采集到内存里。然后我们在filter配置里,将log改写为_log_ 然后我们在ouput配置里,将追加的日志采集时间设置为_time_,设置好日志写入的kafka borkers和kafka topics,那么fluent-bit里内存的日志就会写入到kafka中日志写入到Kafka中_log_需要为json,如果你的应用写入的日志不是json,那么你就需要根据fluent-bit的parser文档,调整你的日志写入的数据结构:https://docs.fluentbit.io/manual/pipeline/filters/parser4 日志传输Kafka主要用于日志传输。上文说到我们使用fluent-bit采集日志的默认数据结构,在下图kafka工具中我们可以看到日志采集的内容。  在日志采集过程中,会由于不用业务日志字段不一致,解析方式是不一样的。所以我们在日志传输阶段,需要将不同数据结构的日志,创建不同的Clickhouse表,映射到Kafka不同的Topic。这里以ingress为例,那么我们在Clickhouse中需要创建一个ingress_stdout_stream的Kafka引擎表,然后映射到Kafka的ingress-stdout Topic里。5 日志存储我们会使用三种表,用于存储一种业务类型的日志。Kafka引擎表:将数据从Kafka采集到Clickhouse的ingress_stdout_stream数据表中create table logger.ingress_stdout_stream( _source_ String, _pod_name_ String, _namespace_ String, _node_name_ String, _container_name_ String, _cluster_ String, _log_agent_ String, _node_ip_ String, _time_ Float64, _log_ String)engine = Kafka SETTINGS kafka_broker_list = 'kafka:9092', kafka_topic_list = 'ingress-stdout', kafka_group_name = 'logger_ingress_stdout', kafka_format = 'JSONEachRow', kafka_num_consumers = 1;物化视图:将数据从ingress_stdout_stream数据表读取出来,_log_根据Mogo配置的索引,提取字段在写入到ingress_stdout结果表里CREATE MATERIALIZED VIEW logger.ingress_stdout_view TO logger.ingress_stdout ASSELECT    toDateTime(toInt64(_time_)) AS _time_second_,fromUnixTimestamp64Nano(toInt64(_time_*1000000000),'Asia/Shanghai') AS _time_nanosecond_, _pod_name_, _namespace_, _node_name_, _container_name_, _cluster_, _log_agent_, _node_ip_, _source_, _log_ AS _raw_log_,JSONExtractInt(_log_, 'status') AS status,JSONExtractString(_log_, 'url') AS url FROM logger.ingress_stdout_stream where 1=1;结果表:存储最终的数据create table logger.ingress_stdout( _time_second_ DateTime, _time_nanosecond_ DateTime64(9, 'Asia/Shanghai'), _source_ String, _cluster_ String, _log_agent_ String, _namespace_ String, _node_name_ String, _node_ip_ String, _container_name_ String, _pod_name_ String, _raw_log_ String, status Nullable(Int64), url Nullable(String),)engine = MergeTree PARTITION BY toYYYYMMDD(_time_second_)ORDER BY _time_second_TTL toDateTime(_time_second_) + INTERVAL 7 DAYSETTINGS index_granularity = 8192;6 总结流程日志会通过fluent-bit的规则采集到kafka,在这里我们会将日志采集到两个字段里_time_字段用于存储fluent-bit采集的时间_log_字段用于存放原始日志通过mogo,在clickhouse里设置了三个表app_stdout_stream: 将数据从Kafka采集到Clickhouse的Kafka引擎表app_stdout_view: 视图表用于存放mogo设置的索引规则app_stdout:根据app_stdout_view索引解析规则,消费app_stdout_stream里的数据,存放于app_stdout结果表中最后mogo的UI界面,根据app_stdout的数据,查询日志信息7 Mogo界面展示查询日志界面 设置日志采集配置界面以上文档描述是针对石墨Kubernetes的日志采集,想了解物理机采集日志方案的,可以在下文中找到《Mogo使用文档》的链接,运行docker-compose体验Mogo 全部流程,查询Clickhouse日志。限于篇幅有限,Mogo的日志报警功能,下次在讲解。 ​
总条数:142 到第
上滑加载中