-
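1. Introduction

Elasticsearch is a search server built on Lucene. It provides a distributed, multi-tenant full-text search engine behind a RESTful web interface. Elasticsearch is written in Java, released as open source under the Apache license, and is a popular enterprise search engine. It is designed for cloud environments and aims to be real-time, stable, reliable, fast, and easy to install and use.

Official site: https://www.elastic.co/cn/products/elasticsearch
Category: search engine

2. Base environment

Category | Item | Version | Source (method)
Huawei Cloud VM | KC1 (920) | -- | --
OS | CentOS | 7.5 |
OS | Kernel | 4.14 |
Packages | elasticsearch | 6.3.0 | https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.0.tar.gz
Packages | jdk | 1.8.0 | yum install

3. Installing dependencies

Install the JDK:

    yum install -y java-1.8.0-openjdk

Check the Java version:

    java -version

4. Installing the component

Download Elasticsearch from the official site and extract it:

    wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.0.tar.gz
    tar -xzf elasticsearch-6.3.0.tar.gz

5. System configuration

By default Elasticsearch refuses to run as root, so add a non-root user:

    groupadd es
    useradd -g es es

Change the ownership of the ES installation directory:

    chown -R es:es elasticsearch-6.3.0

Edit /etc/security/limits.conf and add or modify the following lines:

    * hard nofile 65536
    * soft nofile 65536

Edit /etc/sysctl.conf and add or modify the following line (if the file does not exist, install initscripts):

    vm.max_map_count=655360

Apply vm.max_map_count to the running system:

    sysctl -w vm.max_map_count=655360

Disable xpack.ml in the ES configuration:

    echo 'xpack.ml.enabled: false' >> elasticsearch-6.3.0/config/elasticsearch.yml

Switch to the es account:

    su - es

Start the ES service:

    sh elasticsearch-6.3.0/bin/elasticsearch

Note: add the -d option to start in the background. su - es switches to the es user's home directory, so use the full path to the extracted directory if it lives elsewhere.

Check the ES port:

    netstat -antlp | grep 9200

6. Test

Test item: whether the elasticsearch service starts normally.
Test result: it starts normally.

7. References

Official documentation: https://www.elastic.co/cn

8. FAQ

None.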
-
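1. Introduction

Elasticsearch is a search server built on Lucene. It provides a distributed, multi-tenant full-text search engine.

Official site: https://artifacts.elastic.co

2. Base environment

Category | Item | Version | Source (method)
Huawei Cloud VM | KC1 (920) | -- | --
OS | CentOS | 7.7 |
OS | Kernel | 4.14 |
Packages | ElasticSearch | 5.6.3 | https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.3.tar.gz

3. Installing dependencies

    yum install -y java-1.8.0-openjdk

4. Installing the component

Download and extract the package:

    cd /vdb/lang
    wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.3.tar.gz
    tar xf elasticsearch-5.6.3.tar.gz

5. System configuration

Set the maximum number of memory map areas per process to 262145 (Elasticsearch requires at least 262144). Edit /etc/sysctl.conf (if the file does not exist, install initscripts):

    vim /etc/sysctl.conf

Add the following line:

    vm.max_map_count=262145

Apply the setting:

    sysctl -p

Raise the open file handle limit to 65536. On this system the default limit is 65535 (check it with ulimit -n), and Elasticsearch requires at least 65536. Edit the ElasticSearch-nofile.conf file:

    vi /etc/security/limits.d/ElasticSearch-nofile.conf

Add the following lines:

    * soft nofile 65536
    * hard nofile 65536

Reboot for the limit to take effect:

    reboot

Configure the Elasticsearch startup file. Set network.host to the local address and http.port to 9301:

    cd /vdb/lang/elasticsearch-5.6.3
    vim config/elasticsearch.yml

Create an account and grant it ownership (Elasticsearch cannot be run directly as root):

    groupadd elasticsearch
    useradd -g elasticsearch elasticsearch
    chown -R elasticsearch:elasticsearch ../elasticsearch-5.6.3

Switch to the elasticsearch account and start Elasticsearch:

    su elasticsearch
    ./bin/elasticsearch

6. Test

    curl localhost:9301

7. References

None.

8. FAQ

None.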
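The two thresholds quoted above (vm.max_map_count of at least 262144 and a nofile limit of at least 65536) can be checked before starting the node. The following is only a minimal sketch: the class and method names are hypothetical, and it assumes a Linux host where the live sysctl value is readable from /proc/sys/vm/max_map_count.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class EsPreflight {
    // Minimums quoted in the post above.
    static final long MIN_MAP_COUNT = 262144L;
    static final long MIN_NOFILE = 65536L;

    static boolean mapCountOk(long value) {
        return value >= MIN_MAP_COUNT;
    }

    static boolean nofileOk(long value) {
        return value >= MIN_NOFILE;
    }

    public static void main(String[] args) throws IOException {
        // Linux exposes the current vm.max_map_count at this path.
        String raw = new String(Files.readAllBytes(Paths.get("/proc/sys/vm/max_map_count"))).trim();
        long mapCount = Long.parseLong(raw);
        System.out.println("vm.max_map_count=" + mapCount + " ok=" + mapCountOk(mapCount));

        // The nofile limit is per process; pass a numeric `ulimit -n` value as the first argument.
        if (args.length > 0) {
            long nofile = Long.parseLong(args[0]);
            System.out.println("nofile=" + nofile + " ok=" + nofileOk(nofile));
        }
    }
}
```

Run it as the account that will start Elasticsearch, for example `java EsPreflight $(ulimit -n)`, since limits differ per user.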
-
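1 Introduction to ElasticSearch

ElasticSearch is a distributed, highly scalable, near-real-time search and data analytics engine. It makes large volumes of data easy to search, analyze, and explore, and its horizontal scalability makes that data more valuable in production.

ElasticSearch works in the following steps. The user submits data to ElasticSearch. An analyzer splits the text into terms, and the terms are stored together with their weights. When a user searches, the results are scored and ranked by weight and then returned to the user.

2 Environment

2.1 Environment information

Category | Item | Version | Source
OS | CentOS | 7.5 Aarch64 | https://www.centos.org/download/
Server configuration | 16 vCPUs, 16 GB RAM, 50 GB disk | |
Software | ElasticSearch | 5.6.3 | https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.3.tar.gz

3 Porting the software

3.1 Preparing the environment

OS installation type: CentOS-7.5-aarch64-1804.

Note: install the operating system with the minimal installation option [screenshot]. The remaining steps follow the usual OS installation procedure.

3.1.1 Downloading and uploading the software

1. Upload the CentOS 7.5 ISO image to the server.
2. Upload the elasticsearch-5.6.3.tar archive to a directory on the server, such as /opt.

Download URL: https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.3.tar.gz

3.2 Installing ElasticSearch

1. Change the kernel parameter in advance and apply it.
a. Raise the maximum number of memory map areas per process, vm.max_map_count (ElasticSearch requires at least 262144), by editing /etc/sysctl.conf:
    vi /etc/sysctl.conf
b. Apply the change:
    sysctl -p
c. After saving the parameter, restart the cloud server for it to take effect. (An ECS server must not be restarted with reboot. Ask the administrator to restart it from the cloud platform, otherwise the change may not take effect.)

2. Install ElasticSearch with the script.
a. Upload the script (elasticsearch_install.sh) to any directory on the target server, such as /opt.
b. Make the script executable:
    chmod u+x /opt/elasticsearch_install.sh
c. Run the script:
    sh /opt/elasticsearch_install.sh

3.3 Verification

1. Start the service. Switch to the elasticsearch user and start elasticsearch from its bin directory:

    su - elasticsearch
    /opt/install/elasticsearch-5.6.3/bin/elasticsearch

2. Access the service from a client.

4 References

5 FAQ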
-
See the title.
-
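1. Introduction

Elasticsearch is a search server built on Lucene. It provides a distributed, multi-tenant full-text search engine behind a RESTful web interface. Elasticsearch is written in Java, released as open source under the Apache license, and is a popular enterprise search engine. It is designed for cloud environments and aims to be real-time, stable, reliable, fast, and easy to install and use.

Official site: https://www.elastic.co/cn/products/elasticsearch
Category: search engine

2. Base environment

Category | Item | Version | Source (method)
Huawei Cloud VM | RC3 (916) | -- | --
OS | CentOS | 7.5 |
OS | Kernel | 4.14 |
Packages | GCC | 4.8.5 |
Packages | elasticsearch | 6.2.2 |
Packages | Jdk | 1.8.0 |
Packages | ncurses | 5.9.14 |
Packages | zlib | 1.2.7 |
Packages | perl | 5.16.3 |
Packages | cmake | 2.8.12.2 |
Packages | openssl | 1.0.2k |
Packages | libaio | 0.3.109 |

3. Installing dependencies

Clear the yum cache:

    yum clean all

Download the repository metadata to the local cache:

    yum makecache

Update the installed packages:

    yum -y update

Install the dependency packages shipped with the operating system:

    yum install gcc gcc-c++ cmake ncurses-devel bison libaio-devel openssl-devel zlib-devel autoconf perl perl-devel

4. Installing the component

The steps below use the following placeholders:

Installation directory: ${INSTALL_DIR}
Extracted ES directory: ${PATH_TO_ES}
Extracted JDK directory: ${PATH_TO_JDK}

Download the JDK package from the official site and extract it:

    wget https://download.oracle.com/otn/java/jdk/8u211-b12/478a62b7d4e34b78b671c754eaaf38ab/jdk-8u211-linux-arm64-vfp-hflt.tar.gz
    tar -xzvf jdk-8u211-linux-arm64-vfp-hflt.tar.gz

Configure the Java environment variables by appending the following lines to /etc/profile:

    export JAVA_HOME=${PATH_TO_JDK}
    export JRE_HOME=${JAVA_HOME}/jre
    export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:$CLASSPATH
    export JAVA_PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin
    export PATH=$PATH:${JAVA_PATH}

Apply the environment variables:

    source /etc/profile

Verify the JDK installation. If the Java version is printed, the installation succeeded:

    java -version

Download the elasticsearch package from the official site and extract it:

    wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.2.2.tar.gz
    tar -xzvf elasticsearch-6.2.2.tar.gz

5. System configuration

By default Elasticsearch refuses to run as root, so add a non-root user:

    useradd es

Change the ownership of the ES installation directory:

    chown -R es:es ${PATH_TO_ES}

Edit /etc/security/limits.conf and add or modify the following lines:

    * hard nofile 65536
    * soft nofile 65536

Edit /etc/sysctl.conf and add or modify the following line:

    vm.max_map_count=655360

Switch to the es account:

    su - es

Start the ES service:

    sh ${PATH_TO_ES}/bin/elasticsearch

Check the ES port:

    netstat -antlp | grep 9200

Access the ES service:

    curl localhost:9200

6. Test

Test item: whether the elasticsearch service starts normally.
Test result: (none recorded)

7. References

Official documentation: https://www.elastic.co/cn

8. FAQ

None.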
-
This post covers two ways of connecting: non-secure mode and secure mode.

Non-secure mode

Non-secure mode covers ES 6.2.3 clusters and ES 6.5.4 clusters that do not have secure mode enabled. There are two ways to connect: TransportClient and RestHighLevelClient.

We strongly recommend TransportClient for 6.2.3 and RestHighLevelClient for 6.5.4. In 6.2.3 the RestHighLevelClient API is incomplete and lacks features. In 6.5.4 TransportClient is being phased out and does not support some newer features such as secure mode, and the ES 7.0.0 documentation already advises against using it.

(1) Creating a client with TransportClient

Create the TransportClient directly from the address and port:

    // Sniff the cluster so that other nodes, including newly joined ones, are added automatically
    Settings settings = Settings.builder()
            .put("client.transport.sniff", true)
            .build();
    TransportClient client = new PreBuiltTransportClient(settings)
            .addTransportAddress(new TransportAddress(InetAddress.getByName("host1"), 9300));
    client.close();

(2) Creating a client with RestHighLevelClient

Create the RestHighLevelClient with its default builder:

    RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http")));
    client.close();

Secure mode

Secure mode is a feature available in 6.5.4. Once it is enabled, a username and password are required to connect to the cluster, and operations are limited to what the user is permitted to do.

(1) Creating a client with RestHighLevelClient

See the other forum post: "ES安全模式使用Java API连接集群示例" (example of connecting to a secure-mode ES cluster with the Java API).

(2) Creating a client with TransportClient

First, on the client machine, generate the keystore and truststore files from the command line, using the certificate downloaded from the cluster management page (CloudSearchService.cer). These are the key steps; keytool accepts many more options that you can set as needed.

    keytool -genkeypair -alias certificatekey -keyalg RSA -keystore transport-keystore.jks
    keytool -list -v -keystore transport-keystore.jks
    keytool -import -alias certificatekey -file CloudSearchService.cer -keystore truststore.jks
    keytool -list -v -keystore truststore.jks

Then copy the generated keystore and truststore files into the project folder.

Finally, reference those files in the client settings, create the TransportClient with PreBuiltTransportClient, and put the credentials into the client's thread context.

Note: this method is not recommended. First, importing and exporting certificates is cumbersome. Second, the code needs care in several places: the file path must be planned in advance, and you must check that the jar containing the key plugin class OpenDistroSecurityPlugin and all its related jars are on the classpath.

SecurityTransportClientDemo example class

    import com.amazon.opendistroforelasticsearch.security.OpenDistroSecurityPlugin;
    import java.net.InetAddress;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;
    import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.TransportAddress;
    import org.elasticsearch.transport.client.PreBuiltTransportClient;

    public class SecurityTransportClientDemo {
        public static void main(String[] args) {
            String ip = "100.95.184.188";
            String userPw = "username:password";
            String path = "jar path"; // directory that holds the keystore and truststore files
            Settings settings = Settings.builder()
                    .put("opendistro_security.ssl.transport.enforce_hostname_verification", false)
                    .put("opendistro_security.ssl.transport.keystore_filepath", path + "/transport-keystore.jks")
                    .put("opendistro_security.ssl.transport.keystore_password", "tscpass")
                    .put("opendistro_security.ssl.transport.truststore_filepath", path + "/truststore.jks")
                    .put("client.transport.ignore_cluster_name", "true")
                    .put("client.transport.sniff", false)
                    .build();
            // try-with-resources closes the client even if a request fails
            try (TransportClient client = new PreBuiltTransportClient(settings, OpenDistroSecurityPlugin.class)
                    .addTransportAddress(new TransportAddress(InetAddress.getByName(ip), 9300))) {
                // Send the credentials as an HTTP Basic style header on every request
                String base64UserPw = Base64.getEncoder()
                        .encodeToString(userPw.getBytes(StandardCharsets.UTF_8));
                client.threadPool().getThreadContext().putHeader("Authorization", "Basic " + base64UserPw);
                // Specific business operations
                NodesInfoResponse nodes = client.admin().cluster().prepareNodesInfo().get();
                System.out.println(nodes.getClusterName());
                nodes.getNodes().forEach(p ->
                        System.out.println("host:" + p.getHostname() + ", version:" + p.getVersion()));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

pom.xml file

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.huawei.cloud.ei.css</groupId>
        <artifactId>security-transport-clients-demo</artifactId>
        <version>1.0.0</version>
        <dependencies>
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>transport</artifactId>
                <version>6.5.4</version>
            </dependency>
            <dependency>
                <groupId>com.amazon.opendistroforelasticsearch</groupId>
                <artifactId>opendistro_security</artifactId>
                <version>0.7.0.1</version>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
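The Authorization header that the secure TransportClient demo puts into the thread context is an ordinary HTTP Basic value: the string user:password, Base64-encoded, prefixed with "Basic ". The sketch below isolates that step using only the JDK; the class and method names are hypothetical and not part of any Elasticsearch API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class BasicAuthHeader {
    // Builds the value for an "Authorization" header from a username and password.
    static String value(String user, String password) {
        String userPw = user + ":" + password;
        String encoded = Base64.getEncoder()
                .encodeToString(userPw.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    public static void main(String[] args) {
        System.out.println(value("username", "password"));
    }
}
```

Base64 is an encoding, not encryption, so the header is only as confidential as the TLS channel that carries it.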
-
1. Problem scenario

Elasticsearch 6.5.4 provides a secure mode. Once it is enabled, a username and password are required to connect to the cluster and operate on it. For connections through the Java API, the TransportClient provided by older versions cannot connect with a username and password, so development must use the APIs that ship with 6.5.4.

2. Solution

Use RestHighLevelClient as the client for connecting to the cluster and performing operations. HttpHost describes the HTTP endpoint, and a custom SecuredHttpClientConfigCallback class wraps the CredentialsProvider and SSLIOSessionStrategy that configure the connection.

SecuredHttpClientConfigCallback: wraps all custom connection parameters.
CredentialsProvider: a class from Apache HttpClient, on which the Elasticsearch REST client is built. It holds the username and password.
SSLIOSessionStrategy: configures the SSL parameters, including how the host name is verified and how certificates are handled. The settings are carried by an SSLContext.

There are two ways to build the SSLIOSessionStrategy:

(1) Ignore all certificates and skip certificate verification
① Build a TrustManager from an X509TrustManager whose check methods are empty, so that every certificate is accepted.
② Build the SSLContext from the TrustManager of step ①.

(2) Load the downloaded certificate (CloudSearchService.cer)
① Upload the certificate to the client and use keytool on the command line to convert it into a keystore that Java can read:
    keytool -import -alias <alias of your choice> -keystore <path and name of the output keystore> -file <path of the uploaded certificate>
The default keytool password is changeit.
② Write a custom TrustManager class that implements X509TrustManager, reads the keystore produced in step ①, adds it to the trusted certificates, and implements the 3 methods of the X509TrustManager interface.
③ Build the SSLContext from the TrustManager of step ②.

3. Code example

The program takes 3 arguments: the connection address, the cluster login username, and the password. The sample request is:

    GET /_search
    {"query": {"match_all": {}}}

4. Code

ESSecuredClient class (solution (1))

    import org.apache.http.HttpHost;
    import org.apache.http.auth.AuthScope;
    import org.apache.http.auth.UsernamePasswordCredentials;
    import org.apache.http.client.CredentialsProvider;
    import org.apache.http.impl.client.BasicCredentialsProvider;
    import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.SearchHit;
    import org.elasticsearch.search.SearchHits;
    import org.elasticsearch.search.builder.SearchSourceBuilder;

    import javax.net.ssl.*;
    import java.io.IOException;
    import java.security.KeyManagementException;
    import java.security.NoSuchAlgorithmException;
    import java.security.SecureRandom;
    import java.security.cert.CertificateException;
    import java.security.cert.X509Certificate;

    public class ESSecuredClient {

        public static void main(String[] args) throws Exception {
            String clusterAddress = args[0];
            String userName = args[1];
            String password = args[2];
            RestHighLevelClient client = initESClient(clusterAddress, userName, password);

            // Sample request: GET /_search {"query": {"match_all": {}}}
            SearchRequest searchRequest = new SearchRequest();
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.query(QueryBuilders.matchAllQuery());
            searchRequest.source(sourceBuilder);

            // Specific operations based on demand
            try {
                SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
                SearchHits hits = searchResponse.getHits();
                for (SearchHit hit : hits) {
                    System.out.println(hit.getSourceAsString());
                }
                System.out.println("connected");
                Thread.sleep(2000L);
            } catch (InterruptedException | IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    client.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        private static RestHighLevelClient initESClient(String clusterAddress, String userName, String password) {
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                    new UsernamePasswordCredentials(userName, password));
            SSLContext sc = null;
            try {
                sc = SSLContext.getInstance("SSL");
                sc.init(null, trustAllCerts, new SecureRandom());
            } catch (KeyManagementException | NoSuchAlgorithmException e) {
                e.printStackTrace();
            }
            SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NullHostNameVerifier());
            SecuredHttpClientConfigCallback httpClientConfigCallback =
                    new SecuredHttpClientConfigCallback(sessionStrategy, credentialsProvider);
            RestClientBuilder builder = RestClient.builder(new HttpHost(clusterAddress, 9200, "https"))
                    .setHttpClientConfigCallback(httpClientConfigCallback);
            return new RestHighLevelClient(builder);
        }

        // Accepts every certificate. This disables server authentication, so use it for testing only.
        static TrustManager[] trustAllCerts = new TrustManager[] {
            new X509TrustManager() {
                @Override
                public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
                }

                @Override
                public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
                }

                @Override
                public X509Certificate[] getAcceptedIssuers() {
                    return new X509Certificate[0];
                }
            }
        };

        // Skips host name verification.
        public static class NullHostNameVerifier implements HostnameVerifier {
            @Override
            public boolean verify(String arg0, SSLSession arg1) {
                return true;
            }
        }
    }

ESSecuredClient class (solution (2))

The main method and NullHostNameVerifier are the same as in solution (1). The imports additionally need java.io.File, java.io.FileInputStream, java.io.InputStream, and java.security.KeyStore. Only initESClient changes, and the MyX509TrustManager class is added:

        private static RestHighLevelClient initESClient(String clusterAddress, String userName, String password) {
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                    new UsernamePasswordCredentials(userName, password));
            SSLContext sc = null;
            try {
                TrustManager[] tm = {new MyX509TrustManager()};
                sc = SSLContext.getInstance("SSL", "SunJSSE");
                sc.init(null, tm, new SecureRandom());
            } catch (Exception e) {
                e.printStackTrace();
            }
            SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sc, new NullHostNameVerifier());
            SecuredHttpClientConfigCallback httpClientConfigCallback =
                    new SecuredHttpClientConfigCallback(sessionStrategy, credentialsProvider);
            RestClientBuilder builder = RestClient.builder(new HttpHost(clusterAddress, 9200, "https"))
                    .setHttpClientConfigCallback(httpClientConfigCallback);
            return new RestHighLevelClient(builder);
        }

        // A TrustManager that trusts the certificates in your keystore
        public static class MyX509TrustManager implements X509TrustManager {
            X509TrustManager sunJSSEX509TrustManager;

            MyX509TrustManager() throws Exception {
                // Enter the path of the keystore produced by keytool, not the original certificate file.
                File file = new File("certification file path");
                if (!file.isFile()) {
                    throw new Exception("Wrong Certification Path");
                }
                System.out.println("Loading KeyStore " + file + "...");
                KeyStore ks = KeyStore.getInstance("JKS");
                try (InputStream in = new FileInputStream(file)) {
                    // Enter the keytool password; the default password is changeit
                    ks.load(in, "changeit".toCharArray());
                }
                TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509", "SunJSSE");
                tmf.init(ks);
                TrustManager[] tms = tmf.getTrustManagers();
                // Look for an X509TrustManager among the returned trust managers
                // and use it as the delegate.
                for (int i = 0; i < tms.length; i++) {
                    if (tms[i] instanceof X509TrustManager) {
                        sunJSSEX509TrustManager = (X509TrustManager) tms[i];
                        return;
                    }
                }
                throw new Exception("Couldn't initialize");
            }

            // Delegate to the loaded trust manager. A CertificateException must propagate,
            // otherwise untrusted certificates would be accepted.
            @Override
            public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
                sunJSSEX509TrustManager.checkClientTrusted(chain, authType);
            }

            @Override
            public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
                sunJSSEX509TrustManager.checkServerTrusted(chain, authType);
            }

            @Override
            public X509Certificate[] getAcceptedIssuers() {
                return sunJSSEX509TrustManager.getAcceptedIssuers();
            }
        }

SecuredHttpClientConfigCallback class

    import org.apache.http.client.CredentialsProvider;
    import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
    import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.common.Nullable;

    import java.util.Objects;

    class SecuredHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback {
        @Nullable
        private final CredentialsProvider credentialsProvider;

        /**
         * The {@link SSLIOSessionStrategy} for all requests to enable SSL / TLS encryption.
         */
        private final SSLIOSessionStrategy sslStrategy;

        /**
         * Create a new {@link SecuredHttpClientConfigCallback}.
         *
         * @param sslStrategy         The SSL strategy, if SSL / TLS have been supplied
         * @param credentialsProvider The credential provider, if a username/password have been supplied
         * @throws NullPointerException if {@code sslStrategy} is {@code null}
         */
        SecuredHttpClientConfigCallback(final SSLIOSessionStrategy sslStrategy,
                                        @Nullable final CredentialsProvider credentialsProvider) {
            this.sslStrategy = Objects.requireNonNull(sslStrategy);
            this.credentialsProvider = credentialsProvider;
        }

        /**
         * Get the {@link CredentialsProvider} that will be added to the HTTP client.
         *
         * @return Can be {@code null}.
         */
        @Nullable
        CredentialsProvider getCredentialsProvider() {
            return credentialsProvider;
        }

        /**
         * Get the {@link SSLIOSessionStrategy} that will be added to the HTTP client.
         *
         * @return Never {@code null}.
         */
        SSLIOSessionStrategy getSSLStrategy() {
            return sslStrategy;
        }

        /**
         * Sets the {@linkplain HttpAsyncClientBuilder#setDefaultCredentialsProvider(CredentialsProvider) credential provider}.
         *
         * @param httpClientBuilder The client to configure.
         * @return Always {@code httpClientBuilder}.
         */
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder httpClientBuilder) {
            // enable SSL / TLS
            httpClientBuilder.setSSLStrategy(sslStrategy);
            // enable user authentication
            if (credentialsProvider != null) {
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            }
            return httpClientBuilder;
        }
    }

pom.xml file

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>1</groupId>
        <artifactId>ESClient</artifactId>
        <version>1.0-SNAPSHOT</version>
        <name>ESClient</name>
        <!-- FIXME change it to the project's website -->
        <url>http://www.example.com</url>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>
        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.11</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>transport</artifactId>
                <version>6.5.4</version>
            </dependency>
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>6.5.4</version>
            </dependency>
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-high-level-client</artifactId>
                <version>6.5.4</version>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-api</artifactId>
                <version>2.7</version>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>2.7</version>
            </dependency>
        </dependencies>
        <build>
            <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
                <plugins>
                    <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
                    <plugin>
                        <artifactId>maven-clean-plugin</artifactId>
                        <version>3.1.0</version>
                    </plugin>
                    <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
                    <plugin>
                        <artifactId>maven-resources-plugin</artifactId>
                        <version>3.0.2</version>
                    </plugin>
                    <plugin>
                        <artifactId>maven-compiler-plugin</artifactId>
                        <version>3.8.0</version>
                    </plugin>
                    <plugin>
                        <artifactId>maven-surefire-plugin</artifactId>
                        <version>2.22.1</version>
                    </plugin>
                    <plugin>
                        <artifactId>maven-jar-plugin</artifactId>
                        <version>3.0.2</version>
                    </plugin>
                    <plugin>
                        <artifactId>maven-install-plugin</artifactId>
                        <version>2.5.2</version>
                    </plugin>
                    <plugin>
                        <artifactId>maven-deploy-plugin</artifactId>
                        <version>2.8.2</version>
                    </plugin>
                    <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
                    <plugin>
                        <artifactId>maven-site-plugin</artifactId>
                        <version>3.7.1</version>
                    </plugin>
                    <plugin>
                        <artifactId>maven-project-info-reports-plugin</artifactId>
                        <version>3.0.0</version>
                    </plugin>
                </plugins>
            </pluginManagement>
        </build>
    </project>
-
When will Elasticsearch 6.7 be available? Posted by Elasticsearch on 2019-05-31 15:30:36.
-
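Software/Service | Description
Huawei Cloud RDS | MySQL v5.7.23
Huawei Cloud Search Service (CSS) | ElasticSearch v6.2.3
canal.adapter-1.1.3.tar.gz, canal.deployer-1.1.3.tar.gz | canal, the open-source ETL software on GitHub
Huawei Cloud ECS | Hosts canalDeployer and canalAdapter. Windows or Linux both work; this document uses Windows.
Postman | Calls canal's RESTful interface from the ECS to trigger the export of existing historical data manually. On Linux, use curl instead.

Note: the RDS instance, the CSS cluster, and the ECS must be on the same network, that is, in the same VPC, subnet, and security group.

1. Configuring and starting the canal deployer service

1. Get the database access address from the RDS console. [screenshot]
2. Download and extract the latest canal deployer release.
3. Edit conf/example/instance.properties and set the MySQL address, account, and password. [screenshot]
4. Start the deployer process:

    .\bin\startup

2. Configuring and starting the canal adapter-es service

1. Get the ES cluster access address from the Huawei Cloud CSS console. [screenshot]
2. Download and extract the latest canal adapter release.
3. Edit conf/application.yml and set the canalDeployer address, the MySQL address, account, and password, and the ES address and cluster name. [screenshot]
Note: use port 9300, not 9200. mytest is the name of the MySQL database.
4. Configure the conf/es/*.yml files to define how MySQL columns map to ES fields. [screenshot]

The columns of the user table in the mytest database are as follows. [screenshot]

The mapping of the mytest_user index defined in the Huawei Cloud CSS cluster is:

    "mappings": {
      "_doc": {
        "properties": {
          "name":    { "type": "text" },
          "role_id": { "type": "long" },
          "c_time":  { "type": "date" }
        }
      }
    }

For more configuration samples, see the Sync-ES wiki.

5. Start the canal-adapter launcher:

    bin/startup

Note: this process automatically synchronizes incremental data to the Huawei Cloud CSS (ES) cluster.

3. Exporting existing historical data manually

1. Look up the HTTP port in the canal adapter configuration file application.yml. The default is 8081.
2. Install Postman and use it to call the RESTful interface that exports the historical data to ES. [screenshot]

PS: when the data volume is very large, export in batches with a where condition. The params argument of the request, together with the etlCondition setting in \conf\es\*.yml, defines the range to scan:

    http://127.0.0.1:8081/etl/es/mytest_user.yml -X POST -d "params=2018-10-21 00:00:00"

3. Open Kibana from the Huawei Cloud CSS console and confirm that the data can be queried.

4. Verifying that canal exports incremental data automatically

1. With a MySQL client (mysql shell, heidisql, and so on), insert, update, or delete rows in the mytest.user table. The changes are synchronized automatically to the mytest_user index in ES.
2. Open Kibana from the Huawei Cloud CSS console and confirm that the data can be queried.

For other questions, consult the canal wiki on GitHub.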
-
1. Bulk API

A bulk request uses a REST endpoint that ends in _bulk, and the batch operations are written in a JSON file. The official documentation gives this format:

    action_and_meta_data\n
    optional_source\n
    action_and_meta_data\n
    optional_source\n
    ....
    action_and_meta_data\n
    optional_source\n

In other words, each operation consists of up to 2 lines, each terminated by a newline. The first line states the action and its metadata. The second line is the optional document source.

For example, to insert 2 documents and delete 1 document in one request, create bulkdata.json with the following content:

    { "create" : { "_index" : "blog", "_type" : "article", "_id" : "3" }}
    { "title":"title1","posttime":"2016-07-02","content":"内容一" }
    { "create" : { "_index" : "blog", "_type" : "article", "_id" : "4" }}
    { "title":"title2","posttime":"2016-07-03","content":"内容2" }
    { "delete":{"_index" : "blog", "_type" : "article", "_id" : "1" }}

Run it:

    $ curl -XPOST "http://localhost:9200/_bulk?pretty" --data-binary @bulkdata.json

Sample response from the original post, showing the entry for a single create action:

    {
      "took" : 11,
      "errors" : false,
      "items" : [ {
        "create" : {
          "_index" : "blog",
          "_type" : "article",
          "_id" : "13",
          "_version" : 1,
          "_shards" : { "total" : 1, "successful" : 1, "failed" : 0 },
          "status" : 201
        }
      } ]
    }

Note: the last line must also end with a newline. Otherwise the request is not recognized and fails like this:

    $ curl -XPOST "http://localhost:9200/_bulk?pretty" --data-binary @bulkdata.json
    {
      "error" : {
        "root_cause" : [ {
          "type" : "action_request_validation_exception",
          "reason" : "Validation Failed: 1: no requests added;"
        } ],
        "type" : "action_request_validation_exception",
        "reason" : "Validation Failed: 1: no requests added;"
      },
      "status" : 400
    }

2. Bulk export

The following example exports the documents of an index to a file as JSON, one document per line. The cluster name is "bropen", the index is "blog", and the type is "article". Create files/bulk.txt under the project root; the index content is written to it.

    import java.io.BufferedWriter;
    import java.io.File;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.SearchHits;

    public class ElasticSearchBulkOut {
        public static void main(String[] args) {
            try {
                Settings settings = Settings.settingsBuilder()
                        .put("cluster.name", "bropen").build(); // cluster.name is set in elasticsearch.yml
                Client client = TransportClient.builder().settings(settings).build()
                        .addTransportAddress(new InetSocketTransportAddress(
                                InetAddress.getByName("127.0.0.1"), 9300));
                // Without an explicit size, a search returns only the first page of hits (10 by default)
                SearchResponse response = client.prepareSearch("blog")
                        .setTypes("article").setQuery(QueryBuilders.matchAllQuery())
                        .execute().actionGet();
                SearchHits resultHits = response.getHits();
                File article = new File("files/bulk.txt");
                FileWriter fw = new FileWriter(article);
                BufferedWriter bfw = new BufferedWriter(fw);
                if (resultHits.getHits().length == 0) {
                    System.out.println("Found 0 documents!");
                } else {
                    // Write one JSON document per line
                    for (int i = 0; i < resultHits.getHits().length; i++) {
                        String jsonStr = resultHits.getHits()[i].getSourceAsString();
                        System.out.println(jsonStr);
                        bfw.write(jsonStr);
                        bfw.write("\n");
                    }
                }
                bfw.close();
                fw.close();
                client.close();
            } catch (UnknownHostException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

3. Bulk import

Read the bulk.txt file exported above line by line and import it with a bulk request. Call client.prepareBulk() to create a BulkRequestBuilder, then call its add method to add documents. The builder keeps every action that was added to it, so start a new builder after each execute; otherwise the same documents are sent again.

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.FileReader;
    import java.io.IOException;
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import org.elasticsearch.action.bulk.BulkRequestBuilder;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;

    public class ElasticSearchBulkIn {
        public static void main(String[] args) {
            try {
                Settings settings = Settings.settingsBuilder()
                        .put("cluster.name", "bropen").build(); // cluster.name is set in elasticsearch.yml
                Client client = TransportClient.builder().settings(settings).build()
                        .addTransportAddress(new InetSocketTransportAddress(
                                InetAddress.getByName("127.0.0.1"), 9300));
                File article = new File("files/bulk.txt");
                FileReader fr = new FileReader(article);
                BufferedReader bfr = new BufferedReader(fr);
                String line = null;
                BulkRequestBuilder bulkRequest = client.prepareBulk();
                int count = 0;
                while ((line = bfr.readLine()) != null) {
                    bulkRequest.add(client.prepareIndex("test", "article").setSource(line));
                    count++;
                    // Send a batch every 10 documents, then start a fresh builder
                    if (count % 10 == 0) {
                        bulkRequest.execute().actionGet();
                        bulkRequest = client.prepareBulk();
                    }
                }
                // Send the remaining documents; an empty bulk request would be rejected
                if (bulkRequest.numberOfActions() > 0) {
                    bulkRequest.execute().actionGet();
                }
                bfr.close();
                fr.close();
                client.close();
            } catch (UnknownHostException e) {
                e.printStackTrace();
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

Reference: Elasticsearch Reference [2.3] » Document APIs » Bulk API
Original post: https://blog.csdn.net/napoay/article/details/51907709
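The newline rule of the _bulk format is easy to get wrong when the body is assembled by hand. As an illustration only, the following JDK-only sketch builds such a body and guarantees that every line, including the last one, ends with \n. BulkBody is a hypothetical helper, not an Elasticsearch class, and it does not validate the JSON itself.

```java
import java.util.ArrayList;
import java.util.List;

public class BulkBody {
    private final List<String> lines = new ArrayList<>();

    // Adds an action that carries a source document (for example create or index).
    public BulkBody add(String actionJson, String sourceJson) {
        lines.add(singleLine(actionJson));
        lines.add(singleLine(sourceJson));
        return this;
    }

    // Adds an action without a source line (for example delete).
    public BulkBody add(String actionJson) {
        lines.add(singleLine(actionJson));
        return this;
    }

    // Each JSON object must sit on exactly one line in the bulk format.
    private static String singleLine(String json) {
        if (json.indexOf('\n') >= 0 || json.indexOf('\r') >= 0) {
            throw new IllegalArgumentException("bulk lines must not contain line breaks");
        }
        return json;
    }

    // Joins the lines and terminates every one of them, including the last, with \n.
    public String build() {
        StringBuilder sb = new StringBuilder();
        for (String line : lines) {
            sb.append(line).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String body = new BulkBody()
                .add("{\"create\":{\"_index\":\"blog\",\"_type\":\"article\",\"_id\":\"3\"}}",
                     "{\"title\":\"title1\",\"posttime\":\"2016-07-02\"}")
                .add("{\"delete\":{\"_index\":\"blog\",\"_type\":\"article\",\"_id\":\"1\"}}")
                .build();
        System.out.print(body);
    }
}
```

The resulting string can be written to a file and posted with curl --data-binary, which sends the file unchanged and so preserves the newlines.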
-
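Summary: Using Huawei Cloud as the setting, this post describes how to migrate data into Elasticsearch (ES) with Logstash, as guidance for later work.

I. Background

This section introduces the common tools for importing data into ES and the typical ES usage scenario on Huawei Cloud.

1) Common import methods: There are many ways to import data into ES. Common tools and methods include Elasticsearch-dump, Snapshot, Beats, and Logstash. Elasticsearch-dump is mainly used to export and import data between ES clusters, Snapshot is mainly used for backup and restore between ES clusters, and Beats and Logstash are standalone ETL tools with a wider range of uses.

2) ES usage scenario on Huawei Cloud: The ES service on Huawei Cloud runs in its own VPC and cannot be accessed directly from the public network. The typical setup is therefore to create an ECS in the same VPC as the ES cluster and bind an elastic IP address (EIP) to it for deploying the business application. Clients access the business application through the EIP, and the application in turn calls the ES cluster.

II. Usage scenarios

[Structure of the conf file]
The key to importing data into an ES cluster with Logstash is the conf file. A Logstash configuration file generally has three parts: input, filter, and output. input configures the data source, filter performs ETL preprocessing on the data, and output configures the destination.

[Running Logstash]
Once the conf file is ready (assume it is named logstash-simple.conf), start the import from the logstash directory with the following command:

./bin/logstash -f logstash-simple.conf

[Specific scenarios]
The following requirements show how to configure the conf file.

1) How do I specify the index and type names for the import?
If no index or type is specified, Logstash uses logstash-%{+YYYY.MM.dd} and logs by default. To specify them, add the following to output, so that the index name starts with myindex and the type is mytype:

output {
  elasticsearch {
    hosts => "192.168.0.65:9200"
    index => "myindex-%{+YYYY.MM.dd}"
    document_type => "mytype"
  }
}

2) How do I remove the new fields generated during import?
During import, Logstash adds two fields by default: @version and @timestamp. If the application does not need these two fields, add the following to filter:

filter {
  mutate {
    remove_field => ["@version", "@timestamp"]
  }
}

3) How do I make the index name follow the local time zone?
During import, the suffix of the index name defaults to the UTC time of the @timestamp field. This design unifies queries from different time zones: Kibana first converts a query to UTC and then sends it to ES, so users in different time zones who query the same past period get the same results. The drawback is that querying one day of local time requires scanning three indices: Day-1, Day, and Day+1.

To keep unified queries while improving query efficiency, add the following to filter. It assumes the source data has a time field named timestamp in the format dd/MMM/yyyy:HH:mm:ss +0800 (other formats work similarly). Because the pattern treats +0800 as literal text and the time zone is set to UTC, the local time is parsed as if it were UTC. The timestamp value is thereby shifted forward by 8 hours and assigned to @timestamp, so the suffix of the index name reflects UTC+8 time. Statistics and analysis that use the original timestamp field can still be queried in a unified way.

filter {
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss +0800"]
    locale => "en"
    timezone => "UTC"
  }
}

4) How do I import data from the public network into ES on Huawei Cloud and control the import rate?
As described above, importing public-network data into ES on Huawei Cloud requires an ECS with an EIP as a relay. Depending on where Logstash is deployed, there are two cases: Logstash on the public network, or Logstash on the ECS.

a) Logstash deployed on the public network: First create an ECS in the same VPC as the ES cluster and open port 9200 to public access in its security group. Then bind an EIP to the ECS and limit the bandwidth of the EIP. Next, set up port forwarding on the ECS with the command below and enter the root password to complete it (9200 is the port exposed through the EIP, 192.168.0.66:9200 is the ES cluster endpoint, and 192.168.0.200 is the private IP address of the ECS itself):

ssh -g -L 9200:192.168.0.66:9200 -N -f root@192.168.0.200

Finally, configure the output part of the Logstash conf file as follows and run Logstash to start the import:

output {
  elasticsearch {
    hosts => "EIP:9200"
  }
}

b) Logstash deployed on the ECS: First make sure the ES cluster and the ECS are in the same VPC and that port 9200 is open to public access in the security group. Then bind an EIP to the ECS and limit the bandwidth of the EIP. Finally, configure the conf file in the usual way and run Logstash to start the import (port forwarding on the ECS is no longer needed).

III. Further thoughts

The sections above describe how to import data into an ES cluster with Logstash. Logstash can also be used to transform indices inside an ES cluster, for example to merge indices or filter out certain fields. When data is transferred between two ES clusters, specifying document_id prevents the same data from being imported twice.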
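The effect described in scenario 3, where the same event lands in different daily indices depending on the time zone used for the suffix, can be illustrated with a short Java sketch. This is only an illustration of the date arithmetic, not part of Logstash: the class IndexNames, the method dailyIndex, and the sample event time are assumptions made for the example.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Computes a daily index name such as myindex-2018.06.01 for a given time zone. */
final class IndexNames {

    private static final DateTimeFormatter SUFFIX = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    static String dailyIndex(String prefix, Instant eventTime, ZoneId zone) {
        // The calendar date of the same instant depends on the zone it is viewed in.
        return prefix + "-" + SUFFIX.format(eventTime.atZone(zone));
    }

    public static void main(String[] args) {
        // 18:30 UTC on June 1 is 02:30 on June 2 in UTC+8.
        Instant event = Instant.parse("2018-06-01T18:30:00Z");
        System.out.println(dailyIndex("myindex", event, ZoneOffset.UTC));        // myindex-2018.06.01
        System.out.println(dailyIndex("myindex", event, ZoneOffset.ofHours(8))); // myindex-2018.06.02
    }
}
```

With UTC suffixes, an event that a user in UTC+8 considers part of June 2 is stored in the June 1 index, which is why a query for one local day has to touch neighbouring indices.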
-
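Cause: When indices are created on the ES cluster of the CSS service, no analyzer is specified for fields by default, so the standard analyzer is used. For Chinese text this means unigram tokenization: every term that is created is a single character. A wildcard query matches its value against individual terms. When the value contains several Chinese characters, the index holds no multi-character term for it to match, so no documents are found. For example, a query for "中" finds "中华人民共和国", but a query for "中华" returns no results. Compare the two queries:

Query with no results:
"wildcard": {"name": {"value": "*中华*", "boost": 1}}

Query with results:
"wildcard": {"name": {"value": "中", "boost": 1}}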
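The behaviour can be reproduced outside ES with a small simulation. The sketch below is an assumption-laden model, not ES code: it splits the field value into one term per character (which is how the standard analyzer treats Chinese characters) and then tests the wildcard pattern against each term separately. The class UnigramWildcardDemo and its methods are hypothetical names, and the Chinese strings are written as Unicode escapes so the file compiles under any source encoding.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Simulates a wildcard query against a field indexed as single-character terms. */
final class UnigramWildcardDemo {

    /** One term per code point, mimicking unigram tokenization of Chinese text. */
    static List<String> unigramTerms(String text) {
        List<String> terms = new ArrayList<>();
        text.codePoints().forEach(cp -> terms.add(new String(Character.toChars(cp))));
        return terms;
    }

    /** Translates a wildcard pattern: * matches any sequence, ? matches one character. */
    static Pattern toRegex(String wildcard) {
        StringBuilder sb = new StringBuilder();
        for (char c : wildcard.toCharArray()) {
            if (c == '*') {
                sb.append(".*");
            } else if (c == '?') {
                sb.append('.');
            } else {
                sb.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(sb.toString(), Pattern.DOTALL);
    }

    /** True if the pattern matches at least one whole term of the field value. */
    static boolean matchesAnyTerm(String fieldValue, String wildcard) {
        Pattern pattern = toRegex(wildcard);
        for (String term : unigramTerms(fieldValue)) {
            if (pattern.matcher(term).matches()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // The seven-character name used in the example above.
        String name = "\u4e2d\u534e\u4eba\u6c11\u5171\u548c\u56fd";
        System.out.println(matchesAnyTerm(name, "\u4e2d"));          // single character: true
        System.out.println(matchesAnyTerm(name, "*\u4e2d\u534e*"));  // two characters: false
    }
}
```

The two-character pattern fails because it needs a term at least two characters long, and the simulated index contains only one-character terms.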
-
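The most common cause is that the VPC mask and the subnet mask have the same number of bits. For example, when both are 24 bits, adding the route fails. In this case, change the number of bits in the subnet mask, for example to 16. After that, an ECS node in the local VPC can access the ES cluster in the peer VPC.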
-
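Migrating data from Elasticsearch 2.x to 5.x

Preparation before migration

1. String type
A major change between 2.x and 5.x concerns the basic string field type. In 5.x, the text and keyword types replace the 2.x string type. text is a string that is analyzed (the former default string), and keyword is a string that is not analyzed (the former not_analyzed). keyword data can only be matched exactly, which suits data that does not need analysis and works well for filtering and aggregation.

A mapping fragment in 2.x:

"content" : {
  "properties" : {
    "status" : { "type" : "string" },
    "tgroup" : { "type" : "string", "index" : "not_analyzed" }
  }
}

In 5.x it can be changed to:

"content" : {
  "properties" : {
    "status" : { "type" : "text" },
    "tgroup" : { "type" : "keyword" }
  }
}

The status field becomes the analyzed type text, and the tgroup field becomes the non-analyzed type keyword.

2. Logstash configuration for mapping the types under an index
When importing data from Elasticsearch 2.x into 5.x, an index may contain several types. If the types must map one to one in 5.x, specify the type mapping with the following Logstash configuration:

input {
  elasticsearch {
    hosts => "<2.x ES address>"
    index => "xx"
    docinfo => true
  }
}
filter {
}
output {
  elasticsearch {
    hosts => "<5.x ES address>"
    index => "xx"
    document_type => "%{[@metadata][_type]}"
  }
}

The document_type setting maps each type under the 2.x index to the same type in 5.x. Setting docinfo to true exposes the metadata of the original documents, which document_type reads through [@metadata][_type].

Common query errors after migration
In 2.x the following query works:

GET _search
{
  "query": {
    "filtered": {
      "query": {
        "match": { "text": "quick brown fox" }
      },
      "filter": {
        "term": { "status": "published" }
      }
    }
  }
}

Running the same query in 5.x fails with "no [query] registered for [filtered]":

"root_cause": [
  {
    "type": "parsing_exception",
    "reason": "no [query] registered for [filtered]",
    ...
  }
],

The filtered query has been removed in 5.x. To filter, use constant_score or a filter clause inside a bool query. Neither computes a relevance score for the filtered part, which makes the query more efficient. Rewrite the query as follows:

GET _search
{
  "query": {
    "bool": {
      "must": {
        "match": { "text": "quick brown fox" }
      },
      "filter": {
        "term": { "status": "published" }
      }
    }
  }
}

For details on the filtered query, see the Elastic documentation: https://www.elastic.co/guide/en/elasticsearch/reference/5.0/query-dsl-filtered-query.html

Improvements in Elasticsearch 5.x
Several document APIs were added that make common operations easier, for example:
_update_by_query: updates the documents matched by a query.
_delete_by_query: deletes the documents matched by a query.
_reindex: migrates an index, as follows:

POST /_reindex
{
  "source": { "index": "cars" },
  "dest": { "index": "cars_new" }
}

For other performance-related improvements, see: http://cwiki.apachecn.org/pages/viewpage.action?pageId=4260605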
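The string-to-text/keyword rule from step 1 can be expressed as a small conversion function. This is a minimal sketch under stated assumptions, not a migration tool: it handles only a flat map of field settings and only the two cases discussed above (analyzed by default, or "index": "not_analyzed"). The class StringFieldMigration and the method convert are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Converts the settings of one 2.x field to their 5.x equivalent for string fields. */
final class StringFieldMigration {

    static Map<String, String> convert(Map<String, String> field2x) {
        Map<String, String> field5x = new LinkedHashMap<>(field2x);
        if (!"string".equals(field2x.get("type"))) {
            return field5x; // non-string fields are left unchanged
        }
        String index = field5x.remove("index");
        if (index == null || "analyzed".equals(index)) {
            field5x.put("type", "text");    // analyzed string
        } else if ("not_analyzed".equals(index)) {
            field5x.put("type", "keyword"); // exact-match string
        } else {
            // Other index settings are outside the scope of this sketch.
            throw new IllegalArgumentException("unsupported index setting: " + index);
        }
        return field5x;
    }

    public static void main(String[] args) {
        Map<String, String> tgroup = new LinkedHashMap<>();
        tgroup.put("type", "string");
        tgroup.put("index", "not_analyzed");
        System.out.println(convert(tgroup));
    }
}
```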
-
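Installation guide and usage overview

1. Download the installation package: https://github.com/lmenezes/cerebro/releases/download/v0.7.3/cerebro-0.7.3.tgz

2. Copy the package to an ECS node that is in the same VPC as the ES cluster to be monitored and can reach it over the network.

3. Extract the package and start the service as described in README.md.

4. Start the service with a specified IP address and port.

Method 1:
bin/cerebro -Dhttp.port=1234 -Dhttp.address=127.0.0.1

Method 2: edit the configuration file conf/application.conf to specify the IP address and port of the ES cluster to be monitored:

# A list of known hosts
hosts = [
  {
    host = "http://10.51.179.102:9200"
    name = "Es-test"
  },
  # Example of host with authentication
  #{
  #  host = "http://some-authenticated-host:9200"
  #  name = "Secured Cluster"
  #  auth = {
  #    username = "username"
  #    password = "secret-password"
  #  }
  #}
]

Then start the service:
bin/cerebro -Dconfig.file=conf/application.conf

5. After starting with method 1, open http://127.0.0.1:1234 in a browser and enter the IP address and port of the ES cluster to be monitored in the window that appears. After logging in:
overview shows how indices are distributed across the cluster (method 2 opens this page directly).
Nodes shows the resource usage of each node.
Rest sends requests to the cluster and calls the ES APIs.
More provides further operations.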