您的位置: 首页 - 站长

html5手机网站织梦模板Wordpress query 参数

当前位置: 首页 > news >正文

html5手机网站织梦模板,Wordpress query 参数,网站建设实施计划,十堰建设网站首页业务场景 1、RestClientBuilder初始化#xff08;同时支持单机与集群#xff09; 2、发送ES查询请求公共方法封装#xff08;支持sql、kql、代理访问、集群访问、鉴权支持#xff09; 3、判断ES索引是否存在#xff08;/_cat/indices/\({indexName}#xff09; 4、判断ES…业务场景 1、RestClientBuilder初始化同时支持单机与集群 2、发送ES查询请求公共方法封装支持sql、kql、代理访问、集群访问、鉴权支持 3、判断ES索引是否存在/_cat/indices/\){indexName} 4、判断ES索引别名是否存在 /_cat/aliases/\({indexName} 5、判断ES索引指定字段/属性是否存在这里字段支持多级如logObj.id 6、判断ES索引指定字段/属性的类型字段支持多级 7、阻塞线程直至索引就绪为了应对跨日时索引名短时间可能不存在的问题 8、创建索引别名可用于支持sql查询索引名中有特殊字符不能用作表名可通过创建别名来解决 9、索引别名创建结果解析 判断acknowledged 10、KQL查询ESKibana语法查询ElasticSearch 11、SQL查询ES标准SQL语法查询ElasticSearch 12、Java在本地通过代理访问ES可用于解决网络不能直接的问题 13、Java 客户端访问ES集群同时支持单机与集群 14、Java ES客户端鉴权安全需求软件环境 ElasticSearch 7.17.23 下载地址 ElasticSearch 7.17.23 帮助文档 ElasticSearch 8.17.2 下载地址 ElasticSearch 8.17.2 帮助文档 说明当前例子中用的7理论上8也通用 Kibana查询效果 KQL查询ES SQL查询ES 下面讲java代码实现 Java类方法详解 1、RestClientBuilder初始化 同时支持单机与集群 /*** RestClientBuilder 初始化** param host 同时支持单机与集群* 单机host和port各司其职* 集群时port参数无效host中包含IP和PORT多个实例用逗号分隔** eg* 10.***.6.247* 或 host:10.***.6.247:9200,10.***.6.34:9200,10.***.6.120:9200* param port* return*/private RestClientBuilder buildClient(String host, Integer port){RestClientBuilder restClientBuilder null;if(host.indexOf(,)-1){// 单机 host:10.***.6.247, 只有单机会使用port参数restClientBuilder RestClient.builder(new HttpHost( host, port, http ) );}else{// 集群 host:10.***.6.247:9200,10.***.6.34:9200,10.***.6.120:9200,10.***.6.9:9200,10.***.6.183:9200String[] hostArr host.split(\\,);HttpHost[] httpHosts new HttpHost[hostArr.length];for( int i0; ihostArr.length; i ){String[] addrs hostArr[i].split(\\:);HttpHost httpHost new HttpHost( addrs[0], Integer.valueOf(addrs[1]), http );httpHosts[i] httpHost;}restClientBuilder RestClient.builder( httpHosts );}return restClientBuilder;} 2、发送ES查询请求公共方法 SQL支持KQL支持支持代理访问支持鉴权支持 /*** 发送ES 查询请求** param host* param port* param username* param password* param method* param endpoint ES接口* eg* 1、创建别名 /_aliases* 2、判断索引是否存在 /_cat/indices/myIndexName* 2、判断索引别名是否存在 /_cat/aliases/indexName** param jsonEntity 查询语句* eg* 1、为索引创建别名可用于支持sql查询如果使用sql查询时原索引名中有特殊字符不能用作表名可通过创建别名来解决* String kqlJson {\actions\ : [{ \add\ : { \index\ : \idxName\,\alias\ : \idxAliases\ } }]} ;* String kqlJson ;* 2、String jsonEntity {\query\: \ sqlQuery2 \, * \params\: [cStart,cEnd], * \fetch_size\: 65536 };** 实际业务场景举例* POST _sql?formattxt* {* query: SELECT tags.svc_code, sum(iif(tags.response_code.keyword0000,1,0)) as success_count, count(metric) as total* FROM order_service_***** where create_time between 2025-02-27T11:00:000800 and 2025-02-27T13:59:000800* group by tags.svc_code having count(metric)50* order by 3 desc,* fetch_size: 65536* }** return* throws IOException*/public String request(String host, Integer port,String username, String password,String method, String endpoint, String jsonEntity) throws IOException {RestClientBuilder restClientBuilder buildClient( host, port );if(!StringUtils.isEmpty(username)){final CredentialsProvider credentialsProvider new BasicCredentialsProvider();credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);//线程设置httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());/* ******* 能直连ES的不需要如果本地不能直连ES的则加上IP根据实际调整** */if(dev.equals(profile) host.indexOf(10.***.137)0 ) {httpClientBuilder.setProxy(new HttpHost(10.***.248.54, 8443, http) //设置代理服务);}else if(dev.equals(profile) host.indexOf(10.***.6)0 ){httpClientBuilder.setProxy(new HttpHost( 192.***.66.30, 8443,http) //设置代理服务);}return httpClientBuilder;}});}RestClient restClient restClientBuilder.build();Request request new Request(method, endpoint );request.setJsonEntity( jsonEntity );Response response restClient.performRequest(request);HttpEntity entityresponse.getEntity();restClient.close();entity new BufferedHttpEntity(entity);return EntityUtils.toString(entity);} 3、判断ES索引是否存在 /*** 判断索引名是否存在** param host* param port* param username* param password* param indexName 索引名* return* throws IOException*/public boolean isExistsIndex( String host, Integer port,String username, String password,String indexName ) throws IOException {RestClientBuilder restClientBuilder buildClient( host, port );if(!StringUtils.isEmpty(username)){final CredentialsProvider credentialsProvider new BasicCredentialsProvider();credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);//线程设置httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());return httpClientBuilder;}});}RestClient restClient restClientBuilder.build();Request request new Request(GET, /_cat/indices/indexName );try{Response response restClient.performRequest(request);HttpEntity entityresponse.getEntity();entity new BufferedHttpEntity(entity);if (!StringUtils.hasLength(EntityUtils.toString(entity))) {System.out.println(Index exists.);return true;} else {System.out.println(Index does not exist.);return false;}}catch (Exception e){//如果不存在会报404的错误返回false创建别名return false;}finally {restClient.close();}} 4、判断ES索引别名是否存在 /*** 判断索引别名是否存在** param host* param port* param username* param password* param indexName* return* throws IOException*/public boolean isExistsAliases( String host, Integer port,String username, String password,String indexName ) throws IOException {RestClientBuilder restClientBuilder buildClient( host, port );if(!StringUtils.isEmpty(username)){final CredentialsProvider credentialsProvider new BasicCredentialsProvider();credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);//线程设置httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());return httpClientBuilder;}});}RestClient restClient restClientBuilder.build();Request request new Request(GET, /_cat/aliases/indexName );Response response restClient.performRequest(request);HttpEntity entityresponse.getEntity();restClient.close();entity new BufferedHttpEntity(entity);if (!StringUtils.isEmpty(EntityUtils.toString(entity))) {System.out.println(Index alias exists.);return true;} else {System.out.println(Index alias does not exist.);return false;}} 5、获取ES索引指定字段/属性是否存在 这里字段支持多级如logObj.id/*** 判断索引 某个字段/属性是否存在* 说明 这里字段支持多级如logObj.id** param host* param port* param username* param password* param indexName* param property egid、 logObj.id* return* throws IOException*/public boolean isExistsProperty( String host, Integer port,String username, String password,String indexName, String property ) throws IOException {RestClientBuilder restClientBuilder buildClient( host, port );if(!StringUtils.isEmpty(username)){final CredentialsProvider credentialsProvider new BasicCredentialsProvider();credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);//线程设置httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());return httpClientBuilder;}});}RestClient restClient restClientBuilder.build();Request request new Request(GET, /indexName/_mapping );Response response restClient.performRequest(request);HttpEntity entityresponse.getEntity();restClient.close();entity new BufferedHttpEntity(entity);JSONObject obj JSONObject.parseObject(EntityUtils.toString(entity));JSONObject properties obj.getJSONObject( obj.keySet().iterator().next() ).getJSONObject(mappings ).getJSONObject(properties );String[] arr property.split(\\.);for( int i0; iarr.length; i ){if(iarr.length-1){}else{if(properties.containsKey( arr[i] )){properties properties.getJSONObject(arr[i]).getJSONObject(properties );}else{return false;}}}boolean bool properties.containsKey( arr[arr.length-1] );log.info(property:{} , isExist:{}, property, bool );return bool;} 6、获取ES索引指定字段/属性的类型 同一个索引的同一字段不同时间的数据类型可能不一样从而影响sql语句的写法sql语法不一样所以个别场景要做判断 /*** 判断索引某个字段/属性的类型* 说明 这里字段支持多级如logObj.id** param host* param port* param username* param password* param indexName* param property egid、 logObj.id* return* throws IOException*/public String getIndexPropertyType( String host, Integer port,String username, String password,String indexName, String property ) throws IOException {RestClientBuilder restClientBuilder buildClient( host, port );if(!StringUtils.isEmpty(username)){final CredentialsProvider credentialsProvider new BasicCredentialsProvider();credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);//线程设置httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());return httpClientBuilder;}});}RestClient restClient restClientBuilder.build();Request request new Request(GET, /indexName/_mapping );Response response restClient.performRequest(request);HttpEntity entityresponse.getEntity();restClient.close();entity new BufferedHttpEntity(entity);JSONObject obj JSONObject.parseObject(EntityUtils.toString(entity));JSONObject properties obj.getJSONObject( obj.keySet().iterator().next() ).getJSONObject(mappings ).getJSONObject(properties );String[] arr property.split(\\.);for( int i0; iarr.length; i ){if(iarr.length-1){properties properties.getJSONObject(arr[i]);}else{if(properties.containsKey( arr[i] )){properties properties.getJSONObject(arr[i]).getJSONObject(properties );}else{log.error( 判断字段类型时发现字段不存在property:{}, property );throw new RuntimeException( 判断字段类型时发现字段不存在property:property );}}}String type properties.getString(type );log.info(property:{} , type:{}, property, type );return type;}7、阻塞线程直至索引就绪 为了应对跨日时索引名短时间可能不存在的问题不处理可能导致程序报错 /*** 阻塞线程直至索引就绪为了应对跨日时索引名短时间可能不存在的问题** param host* param port* param username* param password* param indexName* throws IOException* throws InterruptedException*/Overridepublic void waitIndexReady( String host, Integer port,String username, String password,String indexName ) throws IOException, InterruptedException {// 循环一次是10s6次是1分钟60次是10分钟for( int i0; i60 !this.isExistsIndex(host, port,username, password,indexName) ; i ){// 共循环10分钟Thread.sleep( 10*1000 );}} 8、创建索引别名 有时候索引名带特殊字符是sql的关键字所以创建别名可供sql查询用作表名 /*** 创建索引别名** param username* param password* param host* param port* param idxName 索引名* param idxAliases 索引别名* throws ParseException* return*/public void createAliases( String username, String password,String host, int port,String idxName, String idxAliases ) throws ParseException {log.info(创建索引别名 {}:{}, {}, host, port, idxAliases );String method POST;String endpoint /_aliases;// 为索引创建别名用于支持sql查询String kqlJson {\actions\ : [{ \add\ : { \index\ : \idxName\,\alias\ : \idxAliases\ } }]} ;try{String body null;body this.request( host, port,username, password,method, endpoint, kqlJson );boolean acknowledged parseCreateAliasesResult( body );if(acknowledged){log.info(别名创建成功);}else{log.error(别名创建失败);log.error(别名创建失败 kqlJson:{}, kqlJson );throw new RuntimeException(索引别名创建失败);}}catch (Exception ex){log.error(创建索引别名异常message:{},ex.getLocalizedMessage());log.error(创建索引别名异常 kqlJson:{}, kqlJson );ex.printStackTrace();}} 9、索引别名创建结果解析 判断创建时返回值中的acknowledged属性值 /*** 判断body 中是否包含 acknowledged** param body* return*/private boolean parseCreateAliasesResult(String body ){JSONObject json JSONObject.parseObject(body);if(json.containsKey(acknowledged) json.getBoolean(acknowledged)){return true;}return false;} 完整代码实现 完整maven pom.xml ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdperson.brickman/groupIdartifactIdjavaProject/artifactIdversion1.0-SNAPSHOT/version!-- 统一管理jar包版本 --propertiesjava.version17/java.versionproject.build.sourceEncodingUTF-8/project.build.sourceEncodingmaven.compiler.source17/maven.compiler.sourcemaven.compiler.target17/maven.compiler.targetspring-boot.version2.7.18/spring-boot.versionspring-cloud.version2021.0.9/spring-cloud.versionspring-cloud-starter-bootstrap.version3.1.9/spring-cloud-starter-bootstrap.versionelasticsearch-client.version7.17.23/elasticsearch-client.versioncommons-lang3.version3.14.0/commons-lang3.versionfastjson2.version2.0.53/fastjson2.versionlombok.version1.18.28/lombok.versiontestng.version6.14.3/testng.version/propertiesdependenciesdependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-bootstrap/artifactIdversion\){spring-cloud-starter-bootstrap.version}/version/dependencydependencygroupIdorg.elasticsearch.client/groupIdartifactIdelasticsearch-rest-high-level-client/artifactIdversion\({elasticsearch-client.version}/version/dependencydependencygroupIdorg.apache.httpcomponents.client5/groupIdartifactIdhttpclient5/artifactIdversion5.3/version/dependencydependencygroupIdorg.apache.commons/groupIdartifactIdcommons-lang3/artifactIdversion\){commons-lang3.version}/version/dependencydependencygroupIdcom.alibaba.fastjson2/groupIdartifactIdfastjson2/artifactIdversion\({fastjson2.version}/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion\){lombok.version}/versionoptionaltrue/optional/dependency!– 测试相关 默认集成junit5 作者用testng所以排除掉junit5 –dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scopeexclusionsexclusiongroupIdorg.junit.jupiter/groupIdartifactIdjunit-jupiter/artifactId/exclusionexclusiongroupIdorg.mockito/groupIdartifactIdmockito-junit-jupiter/artifactId/exclusion/exclusions/dependencydependencygroupIdorg.testng/groupIdartifactIdtestng/artifactIdversion\({testng.version}/versionscopetest/scope/dependency/dependenciesdependencyManagementdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-dependencies/artifactIdversion\){spring-boot.version}/versiontypepom/typescopeimport/scope/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-configuration-processor/artifactIdoptionaltrue/optional/dependencydependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-dependencies/artifactIdversion\({spring-cloud.version}/versiontypepom/typescopeimport/scope/dependency/dependencies/dependencyManagement /project 完整ES公共组件类 package person.brickman.es;import com.alibaba.fastjson2.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.entity.BufferedHttpEntity; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.impl.nio.reactor.IOReactorConfig; import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils;import java.io.IOException; import java.text.ParseException;/*** Description: ES公共方法类公共组件* 1、RestClientBuilder初始化同时支持单机与集群* 2、发送ES查询请求公共方法封装支持sql、kql、代理访问、集群访问、鉴权* 3、判断ES索引是否存在/_cat/indices/\){indexName}* 4、判断ES索引别名是否存在 /_cat/aliases/\({indexName}* 5、判断ES索引指定字段/属性是否存在这里字段支持多级如logObj.id* 6、判断ES索引指定字段/属性的类型字段支持多级* 7、阻塞线程直至索引就绪为了应对跨日时索引名短时间可能不存在的问题* 8、创建索引别名可用于支持sql查询索引名中有特殊字符不能用作表名可通过创建别名来解决* 9、索引别名创建结果解析 判断acknowledged** Author: brickman* CreateDate: 2025/2/20 23:46* Version: 1.0*/ Slf4j Service public class ESRestClientService {Value(\){spring.profiles.active})private String profile;/*** RestClientBuilder 初始化** param host 同时支持单机与集群* 单机host和port各司其职* 集群时port参数无效host中包含IP和PORT多个实例用逗号分隔** eg* 10.*.6.247* 或 host:10..6.247:9200,10.**.6.34:9200,10.*.6.120:9200* param port* return*/private RestClientBuilder buildClient(String host, Integer port){RestClientBuilder restClientBuilder null;if(host.indexOf(,)-1){// 单机 host:10..6.247, 只有单机会使用port参数restClientBuilder RestClient.builder(new HttpHost( host, port, http ) );}else{// 集群 host:10.**.6.247:9200,10..6.34:9200,10..6.120:9200,10..6.9:9200,10..6.183:9200String[] hostArr host.split(\,);HttpHost[] httpHosts new HttpHost[hostArr.length];for( int i0; ihostArr.length; i ){String[] addrs hostArr[i].split(\:);HttpHost httpHost new HttpHost( addrs[0], Integer.valueOf(addrs[1]), http );httpHosts[i] httpHost;}restClientBuilder RestClient.builder( httpHosts );}return restClientBuilder;}/*** 发送ES 查询请求包含sql、kql、代理访问、鉴权支持** param host 同时支持单机与集群* 单机host和port各司其职* 集群时port参数无效host中包含IP和PORT多个实例用逗号分隔* param port* param username* param password* param method* param endpoint ES接口* eg* 1、创建别名 /_aliases* 2、判断索引是否存在 /_cat/indices/myIndexName* 2、判断索引别名是否存在 /_cat/aliases/indexName** param jsonEntity 查询语句* eg* 1、为索引创建别名可用于支持sql查询如果使用sql查询时原索引名中有特殊字符不能用作表名可通过创建别名来解决* String kqlJson {\actions\ : [{ \add\ : { \index\ : \idxName\,\alias\ : \idxAliases\ } }]} ;* String kqlJson ;* 2、String jsonEntity {\query: \ sqlQuery2 \, * \params: [cStart,cEnd], * \fetch_size: 65536 };** 实际业务场景举例* POST _sql?formattxt* {* query: SELECT tags.svc_code, sum(iif(tags.response_code.keyword0000,1,0)) as success_count, count(metric) as total* FROM orderservice***** where create_time between 2025-02-27T11:00:000800 and 2025-02-27T13:59:000800* group by tags.svc_code having count(metric)50* order by 3 desc,* fetch_size: 65536* }** return* throws IOException/public String request(String host, Integer port,String username, String password,String method, String endpoint, String jsonEntity) throws IOException {RestClientBuilder restClientBuilder buildClient( host, port );if(!StringUtils.isEmpty(username)){final CredentialsProvider credentialsProvider new BasicCredentialsProvider();credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);//线程设置httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());/ ******* 本地能直连ES的不需要如果本地不能直连ES的则加上IP根据实际调整** /if(dev.equals(profile) host.indexOf(10.**.137)0 ) {httpClientBuilder.setProxy(new HttpHost(10..248.54, 8443, http) //设置代理服务);}else if(dev.equals(profile) host.indexOf(10..6)0 ){httpClientBuilder.setProxy(new HttpHost( 192..66.30, 8443,http) //设置代理服务);}return httpClientBuilder;}});}RestClient restClient restClientBuilder.build();Request request new Request(method, endpoint );request.setJsonEntity( jsonEntity );Response response restClient.performRequest(request);HttpEntity entityresponse.getEntity();restClient.close();entity new BufferedHttpEntity(entity);return EntityUtils.toString(entity);}/ 判断索引名是否存在** param host 同时支持单机与集群* 单机host和port各司其职* 集群时port参数无效host中包含IP和PORT多个实例用逗号分隔* param port* param username* param password* param indexName 索引名* return* throws IOException/public boolean isExistsIndex( String host, Integer port,String username, String password,String indexName ) throws IOException {RestClientBuilder restClientBuilder buildClient( host, port );if(!StringUtils.isEmpty(username)){final CredentialsProvider credentialsProvider new BasicCredentialsProvider();credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);//线程设置httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());return httpClientBuilder;}});}RestClient restClient restClientBuilder.build();Request request new Request(GET, /_cat/indices/indexName );try{Response response restClient.performRequest(request);HttpEntity entityresponse.getEntity();entity new BufferedHttpEntity(entity);if (!StringUtils.hasLength(EntityUtils.toString(entity))) {System.out.println(Index exists.);return true;} else {System.out.println(Index does not exist.);return false;}}catch (Exception e){//如果不存在会报404的错误返回false创建别名return false;}finally {restClient.close();}}/** 判断索引别名是否存在** param host* param port* param username* param password* param indexName* return* throws IOException/public boolean isExistsAliases( String host, Integer port,String username, String password,String indexName ) throws IOException {RestClientBuilder restClientBuilder buildClient( host, port );if(!StringUtils.isEmpty(username)){final CredentialsProvider credentialsProvider new BasicCredentialsProvider();credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);//线程设置httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());return httpClientBuilder;}});}RestClient restClient restClientBuilder.build();Request request new Request(GET, /_cat/aliases/indexName );Response response restClient.performRequest(request);HttpEntity entityresponse.getEntity();restClient.close();entity new BufferedHttpEntity(entity);if (!StringUtils.isEmpty(EntityUtils.toString(entity))) {System.out.println(Index alias exists.);return true;} else {System.out.println(Index alias does not exist.);return false;}}/** 判断索引 某个字段/属性是否存在* 说明 这里字段支持多级如logObj.id** param host* param port* param username* param password* param indexName* param property egid、 logObj.id* return* throws IOException/public boolean isExistsProperty( String host, Integer port,String username, String password,String indexName, String property ) throws IOException {RestClientBuilder restClientBuilder buildClient( host, port );if(!StringUtils.isEmpty(username)){final CredentialsProvider credentialsProvider new BasicCredentialsProvider();credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);//线程设置httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());return httpClientBuilder;}});}RestClient restClient restClientBuilder.build();Request request new Request(GET, /indexName/_mapping );Response response restClient.performRequest(request);HttpEntity entityresponse.getEntity();restClient.close();entity new BufferedHttpEntity(entity);JSONObject obj JSONObject.parseObject(EntityUtils.toString(entity));JSONObject properties obj.getJSONObject( obj.keySet().iterator().next() ).getJSONObject(mappings ).getJSONObject(properties );String[] arr property.split(\.);for( int i0; iarr.length; i ){if(iarr.length-1){}else{if(properties.containsKey( arr[i] )){properties properties.getJSONObject(arr[i]).getJSONObject(properties );}else{return false;}}}boolean bool properties.containsKey( arr[arr.length-1] );log.info(property:{} , isExist:{}, property, bool );return bool;}/** 判断索引某个字段/属性的类型* 说明 这里字段支持多级如logObj.id* 同一个索引的同一字段不同时间的数据类型可能不一样从而影响sql语句的写法sql语法不一样所以个别场景要做判断** param host* param port* param username* param password* param indexName* param property egid、 logObj.id* return* throws IOException/public String getIndexPropertyType( String host, Integer port,String username, String password,String indexName, String property ) throws IOException {RestClientBuilder restClientBuilder buildClient( host, port );if(!StringUtils.isEmpty(username)){final CredentialsProvider credentialsProvider new BasicCredentialsProvider();credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);//线程设置httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());return httpClientBuilder;}});}RestClient restClient restClientBuilder.build();Request request new Request(GET, /indexName/_mapping );Response response restClient.performRequest(request);HttpEntity entityresponse.getEntity();restClient.close();entity new BufferedHttpEntity(entity);JSONObject obj JSONObject.parseObject(EntityUtils.toString(entity));JSONObject properties obj.getJSONObject( obj.keySet().iterator().next() ).getJSONObject(mappings ).getJSONObject(properties );String[] arr property.split(\.);for( int i0; iarr.length; i ){if(iarr.length-1){properties properties.getJSONObject(arr[i]);}else{if(properties.containsKey( arr[i] )){properties properties.getJSONObject(arr[i]).getJSONObject(properties );}else{log.error( 判断字段类型时发现字段不存在property:{}, property );throw new RuntimeException( 判断字段类型时发现字段不存在property:property );}}}String type properties.getString(type );log.info(property:{} , type:{}, property, type );return type;}/** 阻塞线程直至索引就绪为了应对跨日时索引名短时间可能不存在的问题** param host* param port* param username* param password* param indexName* throws IOException* throws InterruptedException*/public void waitIndexReady( String host, Integer port,String username, String password,String indexName ) throws IOException, InterruptedException {// 循环一次是10s6次是1分钟60次是10分钟for( int i0; i60 !this.isExistsIndex(host, port,username, password,indexName) ; i ){// 共循环10分钟Thread.sleep( 101000 );}}/** 创建索引别名* 有时候索引名带特殊字符是sql的关键字所以创建别名可供sql查询用作表名** param username* param password* param host* param port* param idxName 索引名* param idxAliases 索引别名* throws ParseException* return/public void createAliases( String username, String password,String host, int port,String idxName, String idxAliases ) throws ParseException {log.info(创建索引别名 {}:{}, {}, host, port, idxAliases );String method POST;String endpoint /_aliases;// 为索引创建别名用于支持sql查询String kqlJson {\actions\ : [{ \add\ : { \index\ : \idxName\,\alias\ : \idxAliases\ } }]} ;try{String body null;body this.request( host, port,username, password,method, endpoint, kqlJson );boolean acknowledged parseCreateAliasesResult( body );if(acknowledged){log.info(别名创建成功);}else{log.error(别名创建失败);log.error(别名创建失败 kqlJson:{}, kqlJson );throw new RuntimeException(索引别名创建失败);}}catch (Exception ex){log.error(创建索引别名异常message:{},ex.getLocalizedMessage());log.error(创建索引别名异常 kqlJson:{}, kqlJson );ex.printStackTrace();}}/** 判断body 中是否包含 acknowledged** param body* return/private boolean parseCreateAliasesResult(String body ){JSONObject json JSONObject.parseObject(body);if(json.containsKey(acknowledged) json.getBoolean(acknowledged)){return true;}return false;} } 单元测试方法详解 作者用的testng 1、执行检索 /** 执行检索这里使用sql查询近一分钟的数据** throws IOException* throws ParseException/Test(groups hlog, enabled true )public void testRequest() throws IOException, ParseException {String method POST;String endpoint /_sql?formatjson;RangeTimeUtils rangeTimeUtils new RangeTimeUtils(); // String time 2025-02-21 17:48:00;String time TimeUtils.calcWholeMinute();rangeTimeUtils.calcAllByTimeAndPeriod( time, 1);// yyyy-MM-dd HH:mm:ss – yyyy-MM-ddTHH:mm 根据实际时间字段格式调整SimpleDateFormat sdf2 new SimpleDateFormat(yyyy-MM-ddTHH:mm );Long cStart rangeTimeUtils.getStartDate().getTime();Long cEnd rangeTimeUtils.getStartDate().getTime();// 原索引名是sql关键字索引需要创建别名SimpleDateFormat sdf_source new SimpleDateFormat(yyyy.MM.dd );SimpleDateFormat sdf_target new SimpleDateFormat(yyyy_MM_dd );String idxAliases interf_dand_comm_undef_sdf_target.format(rangeTimeUtils.getEndDate());// 生产环境String sqlQuery2 SELECT logObj.province,server, logObj.app, logObj.node_ip, SUBSTRING(logObj.uri,0,30) , count(id) as num, avg(logObj.costTime) latency , sum(iif(logObj.code200,1,0)) as success_count FROM idxAliases
// where cTime between cStart:000800 and cEnd:000800 where cTime between cStart and cEnd group by logObj.province,server, logObj.app, logObj.node_ip, SUBSTRING(logObj.uri,0,30) ; // 将index_name、field_name和value替换为相应的索引名称、字段名和值// max fetch_size is 65536 不同的版本单次查询最大数据限制不一样这里测试只查 5 条String jsonEntity {\query: \ sqlQuery2 \, \fetch_size: 5};String ret service.request( host, port,username, password,method, endpoint, jsonEntity );log.debug(ret:{},ret);}2、判断索引是否存在 /**
判断索引是否存在* throws IOException/Test(groups hlog, enabled true )public void testIsExistsIndex() throws IOException {/ interf_dand_hig_c_trans_2024_12_03interf_dand_hig_c_trans_2025_02_20interf_dand_comm_undef_2024_12_03interf_dand_comm_undef_2025_02_20/boolean ret service.isExistsIndex( host, port,username, password,interf_dand_comm_undef_2025_02_21);log.info(ret:{},ret);} 3、判断索引别名是否存在 /** 判断索引别名是否存在* throws IOException/Test(groups hlog, enabled true )public void testIsExistsAliases() throws IOException {/ interf_dand_hig_c_trans_2025_02_21interf_dand_comm_undef_2025_02_21/boolean ret service.isExistsAliases( host, port,username, password,interf_dand_comm_undef_2025_02_21);log.info(ret:{},ret);}4、判断索引字段时否存在 /** 判断索引字段时否存在* throws IOException/Test(groups hlog, enabled true )public void testIsExistsProperty() throws IOException {// logObj.node_ip logObj.statusboolean ret service.isExistsProperty( host, port,username, password,interf_dand_comm_undef_2025_02_21, logObj.status);log.info(ret:{},ret);} 5、获取索引列类型 /** 获取索引列类型** throws IOException/Test( enabled true )public void testGetIndexPropertyType() throws IOException {/ interf_dand_hig_c_trans_2025_02_21interf_dand_comm_undef_2025_02_21/String ret service.getIndexPropertyType( host, port,username, password,interf_dand_comm_undef_2025_02_21,logObj.status );log.info(ret:{},ret);} 6、阻塞线程直至索引就绪 /** 阻塞线程直至索引就绪为了应对跨日时索引名短时间可能不存在的问题** throws IOException/Test( enabled true )public void testWaitIndexReady() throws IOException, ParseException, InterruptedException {RangeTimeUtils rangeTimeUtils new RangeTimeUtils();String time 2025-02-21 20:00:00;rangeTimeUtils.calcAllByTimeAndPeriod( time, 1 );service.waitIndexReady( host, port,username, password,interf_dand_comm_undef_2025_02_21);} 7、创建索引别名 /** 创建索引别名** throws IOException/Test( enabled true )public void testCreateAliases() throws IOException, ParseException {RangeTimeUtils rangeTimeUtils new RangeTimeUtils();String time 2025-02-21 20:00:00;rangeTimeUtils.calcAllByTimeAndPeriod( time, 1 );service.createAliases( username, password,host, port,interf_dand-comm-undef.2025.02.21,interf_dand_comm_undef_2025_02_21);} 8、SQL查询ES /** sql查询** throws IOException/Test( enabled true )public void testSQLRequest() throws IOException, ParseException, InterruptedException {log.info(实时从ES统计接口请求数量、成功率、延迟指标 (所有接口/不区分接口)服务级 {}:{}, host, port );RangeTimeUtils rangeTimeUtils new RangeTimeUtils();String time 2025-02-21 20:00:00;rangeTimeUtils.calcAllByTimeAndPeriod( time, 1 );ListObject retnull;String method POST;String endpoint /_sql?formatjson;// 根据实际时间字段格式调整SimpleDateFormat sdf2 new SimpleDateFormat(yyyy-MM-ddTHH:mm);long cStart rangeTimeUtils.getStartDate().getTime();long cEnd rangeTimeUtils.getEndDate().getTime();// 原索引名是sql关键字索引需要创建别名SimpleDateFormat sdf_source new SimpleDateFormat(yyyy.MM.dd );SimpleDateFormat sdf_target new SimpleDateFormat(yyyy_MM_dd );// interf_dand-comm-undef.2024.01.06String idxName ESConsts.INTERFLOG_INDEX_NAME_PREFIX sdf_source.format(rangeTimeUtils.getStartDate());// interf_dand_comm_undef_2024_01_06String idxAliases ESConsts.INTERFLOG_INDEX_ALIASES_PREFIX sdf_target.format(rangeTimeUtils.getStartDate());//等待索引就绪service.waitIndexReady(host, port,username, password,idxName);// 判断别名是否存在不存在则创建if(!service.isExistsAliases( host, port,username, password,idxAliases)){service.createAliases( username, password, host, port,idxName, idxAliases);}/ logObj.status.keyword对应String 、还是 logObj.status对应int 根据类型来 /String statusFieldName getStatusFieldName(host, port,username, password,idxAliases );String sqlQuery2 SELECT logObj.province,server, logObj.app, node_ip, uri, count(id) as num, avg(logObj.cost) latency , sum(iif(statusFieldName200,1,0)) as success_count FROM idxAliases // 算头不算尾where cTime ? and cTime ? group by logObj.province,server, logObj.app ;// max fetch_size is 65536 已验证一次查询结果不会超过这个数故不用做滚动查询String jsonEntity {\query: \ sqlQuery2 \, \params: [cStart,cEnd], \fetch_size: 10 }; // 65536 – 不同的版本单次查询最大数据限制不一样这里测试只查10条String body null;try{body service.request( host, port,username, password,method, endpoint, jsonEntity );// 组装成实际需要的业务类型集合 // ret ESIntfLoad3Parser.parseESresult4SvcLoad( body, IDC, host ,port, rangeTime);}catch (Exception ex){log.error(实时从ES统计接口请求数量、成功率、延迟指标 (所有接口)服务级 指标异常message:{},ex.getLocalizedMessage());log.error(sqlQuery2:{}, sqlQuery2 );log.error(jsonEntity:{}, jsonEntity);ex.printStackTrace();}log.debug( body:{}, body );} 9、KQL查询ES /** KQL查询** throws IOException/Test( enabled true )public void testKQLRequest() throws IOException, ParseException, InterruptedException {log.info(实时从ES统计接口请求数量、成功率、延迟指标 (所有接口/不区分接口)服务级 {}:{}, host, port );RangeTimeUtils rangeTimeUtils new RangeTimeUtils();String time 2025-02-21 20:00:00;rangeTimeUtils.calcAllByTimeAndPeriod( time, 1 );ListObject retnull;String method GET;String endpoint /interf_dand_comm_undef_2025_02_21/_search;SimpleDateFormat sdf2 new SimpleDateFormat(yyyy-MM-ddTHH:mm);// 这里cStart、cEnd两者相关1分钟在上面做了初始化long cStart rangeTimeUtils.getStartDate().getTime();long cEnd rangeTimeUtils.getEndDate().getTime();// 原索引名是sql关键字索引需要创建别名SimpleDateFormat sdf_source new SimpleDateFormat(yyyy.MM.dd );SimpleDateFormat sdf_target new SimpleDateFormat(yyyy_MM_dd );// interf_dand-comm-undef.2024.01.06String idxName ESConsts.INTERFLOG_INDEX_NAME_PREFIX sdf_source.format(rangeTimeUtils.getStartDate());// interf_dand_comm_undef_2024_01_06String idxAliases ESConsts.INTERFLOG_INDEX_ALIASES_PREFIX sdf_target.format(rangeTimeUtils.getStartDate());//等待索引就绪service.waitIndexReady(host, port,username, password,idxName);// 判断别名是否存在不存在则创建if(!service.isExistsAliases( host, port,username, password,idxAliases)){service.createAliases( username, password, host, port,idxName, idxAliases);}String kqlQuery2 { \bool: { \must: [ { \match_phrase: { \logObj.app:\order-service\ } }, { \range: { \cTime: { \gte: ?, \lt: ? } }} ] } }; // 将index_name、field_name和value替换为相应的索引名称、字段名和值// max fetch_size is 65536 已验证一次查询结果不会超过这个数故不用做滚动查询String jsonEntity {\query: \ kqlQuery2 \, \params: [cStart,cEnd], \size: 0,\ \n \size: 2 }; // 65536 – 不同的版本单次查询最大数据限制不一样, 这里测试只查两条String body null;try{body service.request( host, port,username, password,method, endpoint, jsonEntity );// 组装成实际需要的业务类型集合 // ret ESIntfLoad3Parser.parseESresult4SvcLoad( body, IDC, host ,port, rangeTime);}catch (Exception ex){log.error(实时从ES统计接口请求数量、成功率、延迟指标 (所有接口)服务级 指标异常message:{},ex.getLocalizedMessage());log.error(kqlQuery2:{}, kqlQuery2 );log.error(jsonEntity:{}, jsonEntity);ex.printStackTrace();}log.debug( body:{}, body );} 完整单元测试实现 完整单元测试类 package person.brickman.es;import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; import org.testng.annotations.Test; import person.brickman.MainApplication; import person.brickman.constant.ESConsts; import person.brickman.util.RangeTimeUtils; import person.brickman.util.TimeUtils;import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.List;/** 单元测试类** 1、RestClientBuilder初始化同时支持单机与集群因每个单元测试方法都会调所以没有写独立的单元测试方法* 2、发送ES查询请求公共方法封装支持sql、kql、代理访问、集群访问、鉴权单元测试以SQL查询举例参数直接拼的使用占位符示例见9、10* 3、判断ES索引是否存在/_cat/indices/\({indexName}* 4、判断ES索引别名是否存在 /_cat/aliases/\){indexName}* 5、判断ES索引指定字段/属性是否存在这里字段支持多级如logObj.id* 6、判断ES索引指定字段/属性的类型字段支持多级* 7、阻塞线程直至索引就绪为了应对跨日时索引名短时间可能不存在的问题* 8、创建索引别名可用于支持sql查询索引名中有特殊字符不能用作表名可通过创建别名来解决* 9、SQL查询ES* 10、KQL查询ES** Author: brickman* CreateDate: 2025/2/21 22:14* Version: 1.0/ Slf4j SpringBootTest(classes MainApplication.class) public class ESRestClientServiceImplTest extends AbstractTestNGSpringContextTests {Value(\({elasticsearch.dand.interf.hosts})private String host;Value(\){elasticsearch.dand.interf.port})private int port;Value(\({elasticsearch.dand.interf.username})private String username;Value(\){elasticsearch.dand.interf.password})private String password;Autowiredprivate ESRestClientService service;/** 执行检索这里使用sql查询近一分钟的数据** throws IOException* throws ParseException/Test(groups hlog, enabled true )public void testRequest() throws IOException, ParseException {String method POST;String endpoint /_sql?formatjson;RangeTimeUtils rangeTimeUtils new RangeTimeUtils(); // String time 2025-02-21 17:48:00;String time TimeUtils.calcWholeMinute();rangeTimeUtils.calcAllByTimeAndPeriod( time, 1);// yyyy-MM-dd HH:mm:ss – yyyy-MM-ddTHH:mm 根据实际时间字段格式调整SimpleDateFormat sdf2 new SimpleDateFormat(yyyy-MM-ddTHH:mm );Long cStart rangeTimeUtils.getStartDate().getTime();Long cEnd rangeTimeUtils.getStartDate().getTime();// 原索引名是sql关键字索引需要创建别名SimpleDateFormat sdf_source new SimpleDateFormat(yyyy.MM.dd );SimpleDateFormat sdf_target new SimpleDateFormat(yyyy_MM_dd );String idxAliases interf_dand_comm_undef_sdf_target.format(rangeTimeUtils.getEndDate());// 生产环境String sqlQuery2 SELECT logObj.province,server, logObj.app, logObj.node_ip, SUBSTRING(logObj.uri,0,30) , count(id) as num, avg(logObj.costTime) latency , sum(iif(logObj.code200,1,0)) as success_count FROM idxAliases
// where cTime between cStart:000800 and cEnd:000800 where cTime between cStart and cEnd group by logObj.province,server, logObj.app, logObj.node_ip, SUBSTRING(logObj.uri,0,30) ; // 将index_name、field_name和value替换为相应的索引名称、字段名和值// max fetch_size is 65536 不同的版本单次查询最大数据限制不一样这里测试只查 5 条String jsonEntity {\query: \ sqlQuery2 \, \fetch_size: 5};String ret service.request( host, port,username, password,method, endpoint, jsonEntity );log.debug(ret:{},ret);}/**
判断索引是否存在* throws IOException/Test(groups hlog, enabled true )public void testIsExistsIndex() throws IOException {/ interf_dand_hig_c_trans_2024_12_03interf_dand_hig_c_trans_2025_02_20interf_dand_comm_undef_2024_12_03interf_dand_comm_undef_2025_02_20/boolean ret service.isExistsIndex( host, port,username, password,interf_dand_comm_undef_2025_02_21);log.info(ret:{},ret);}/** 判断索引别名是否存在* throws IOException/Test(groups hlog, enabled true )public void testIsExistsAliases() throws IOException {/ interf_dand_hig_c_trans_2025_02_21interf_dand_comm_undef_2025_02_21/boolean ret service.isExistsAliases( host, port,username, password,interf_dand_comm_undef_2025_02_21);log.info(ret:{},ret);}/** 判断索引字段时否存在* throws IOException/Test(groups hlog, enabled true )public void testIsExistsProperty() throws IOException {// logObj.node_ip logObj.statusboolean ret service.isExistsProperty( host, port,username, password,interf_dand_comm_undef_2025_02_21, logObj.status);log.info(ret:{},ret);}/** 获取索引列类型** throws IOException/Test( enabled true )public void testGetIndexPropertyType() throws IOException {/ interf_dand_hig_c_trans_2025_02_21interf_dand_comm_undef_2025_02_21/String ret service.getIndexPropertyType( host, port,username, password,interf_dand_comm_undef_2025_02_21,logObj.status );log.info(ret:{},ret);}/** 阻塞线程直至索引就绪为了应对跨日时索引名短时间可能不存在的问题** throws IOException/Test( enabled true )public void testWaitIndexReady() throws IOException, ParseException, InterruptedException {RangeTimeUtils rangeTimeUtils new RangeTimeUtils();String time 2025-02-21 20:00:00;rangeTimeUtils.calcAllByTimeAndPeriod( time, 1 );service.waitIndexReady( host, port,username, password,interf_dand_comm_undef_2025_02_21);}/** 创建索引别名** throws IOException/Test( enabled true )public void testCreateAliases() throws IOException, ParseException {RangeTimeUtils rangeTimeUtils new RangeTimeUtils();String time 2025-02-21 20:00:00;rangeTimeUtils.calcAllByTimeAndPeriod( time, 1 );service.createAliases( username, password,host, port,interf_dand-comm-undef.2025.02.21,interf_dand_comm_undef_2025_02_21);}/** sql查询** throws IOException/Test( enabled true )public void testSQLRequest() throws IOException, ParseException, InterruptedException {log.info(实时从ES统计接口请求数量、成功率、延迟指标 (所有接口/不区分接口)服务级 {}:{}, host, port );RangeTimeUtils rangeTimeUtils new RangeTimeUtils();String time 2025-02-21 20:00:00;rangeTimeUtils.calcAllByTimeAndPeriod( time, 1 );ListObject retnull;String method POST;String endpoint /_sql?formatjson;// 根据实际时间字段格式调整SimpleDateFormat sdf2 new SimpleDateFormat(yyyy-MM-ddTHH:mm);long cStart rangeTimeUtils.getStartDate().getTime();long cEnd rangeTimeUtils.getEndDate().getTime();// 原索引名是sql关键字索引需要创建别名SimpleDateFormat sdf_source new SimpleDateFormat(yyyy.MM.dd );SimpleDateFormat sdf_target new SimpleDateFormat(yyyy_MM_dd );// interf_dand-comm-undef.2024.01.06String idxName ESConsts.INTERFLOG_INDEX_NAME_PREFIX sdf_source.format(rangeTimeUtils.getStartDate());// interf_dand_comm_undef_2024_01_06String idxAliases ESConsts.INTERFLOG_INDEX_ALIASES_PREFIX sdf_target.format(rangeTimeUtils.getStartDate());//等待索引就绪service.waitIndexReady(host, port,username, password,idxName);// 判断别名是否存在不存在则创建if(!service.isExistsAliases( host, port,username, password,idxAliases)){service.createAliases( username, password, host, port,idxName, idxAliases);}/ logObj.status.keyword对应String 、还是 logObj.status对应int 根据类型来 /String statusFieldName getStatusFieldName(host, port,username, password,idxAliases );String sqlQuery2 SELECT logObj.province,server, logObj.app, node_ip, uri, count(id) as num, avg(logObj.cost) latency , sum(iif(statusFieldName200,1,0)) as success_count FROM idxAliases // 算头不算尾where cTime ? and cTime ? group by logObj.province,server, logObj.app ;// max fetch_size is 65536 已验证一次查询结果不会超过这个数故不用做滚动查询String jsonEntity {\query: \ sqlQuery2 \, \params: [cStart,cEnd], \fetch_size: 10 }; // 65536 – 不同的版本单次查询最大数据限制不一样这里测试只查10条String body null;try{body service.request( host, port,username, password,method, endpoint, jsonEntity );// 组装成实际需要的业务类型集合 // ret ESIntfLoad3Parser.parseESresult4SvcLoad( body, IDC, host ,port, rangeTime);}catch (Exception ex){log.error(实时从ES统计接口请求数量、成功率、延迟指标 (所有接口)服务级 指标异常message:{},ex.getLocalizedMessage());log.error(sqlQuery2:{}, sqlQuery2 );log.error(jsonEntity:{}, jsonEntity);ex.printStackTrace();}log.debug( body:{}, body );}/** kql查询** throws IOException/Test( enabled true )public void testKQLRequest() throws IOException, ParseException, InterruptedException {log.info(实时从ES统计接口请求数量、成功率、延迟指标 (所有接口/不区分接口)服务级 {}:{}, host, port );RangeTimeUtils rangeTimeUtils new RangeTimeUtils();String time 2025-02-21 20:00:00;rangeTimeUtils.calcAllByTimeAndPeriod( time, 1 );ListObject retnull;String method GET;String endpoint /interf_dand_comm_undef_2025_02_21/_search;SimpleDateFormat sdf2 new SimpleDateFormat(yyyy-MM-ddTHH:mm);// 这里cStart、cEnd两者相关1分钟在上面做了初始化long cStart rangeTimeUtils.getStartDate().getTime();long cEnd rangeTimeUtils.getEndDate().getTime();// 原索引名是sql关键字索引需要创建别名SimpleDateFormat sdf_source new SimpleDateFormat(yyyy.MM.dd );SimpleDateFormat sdf_target new SimpleDateFormat(yyyy_MM_dd );// interf_dand-comm-undef.2024.01.06String idxName ESConsts.INTERFLOG_INDEX_NAME_PREFIX sdf_source.format(rangeTimeUtils.getStartDate());// interf_dand_comm_undef_2024_01_06String idxAliases ESConsts.INTERFLOG_INDEX_ALIASES_PREFIX sdf_target.format(rangeTimeUtils.getStartDate());//等待索引就绪service.waitIndexReady(host, port,username, password,idxName);// 判断别名是否存在不存在则创建if(!service.isExistsAliases( host, port,username, password,idxAliases)){service.createAliases( username, password, host, port,idxName, idxAliases);}String kqlQuery2 { \bool: { \must: [ { \match_phrase: { \logObj.app:\inst-service\ } }, { \range: { \cTime: { \gte: ?, \lt: ? } }} ] } }; // 将index_name、field_name和value替换为相应的索引名称、字段名和值// max fetch_size is 65536 已验证一次查询结果不会超过这个数故不用做滚动查询String jsonEntity {\query: \ kqlQuery2 \, \params: [cStart,cEnd], \size: 0,\ \n \size: 2 }; // 65536 – 不同的版本单次查询最大数据限制不一样, 这里测试只查两条String body null;try{body service.request( host, port,username, password,method, endpoint, jsonEntity );// 组装成实际需要的业务类型集合 // ret ESIntfLoad3Parser.parseESresult4SvcLoad( body, IDC, host ,port, rangeTime);}catch (Exception ex){log.error(实时从ES统计接口请求数量、成功率、延迟指标 (所有接口)服务级 指标异常message:{},ex.getLocalizedMessage());log.error(kqlQuery2:{}, kqlQuery2 );log.error(jsonEntity:{}, jsonEntity);ex.printStackTrace();}log.debug( body:{}, body );}/** 查询索引 logObj.status 字段类型* 如果是 text 则使用 logObj.status.keyword* 如果是 long 则使用 logObj.status** Author: brickman* CreateDate: 2025-02-21 17:48:00* Version: 1.0/private String getStatusFieldName(String host, int port,String username, String password,String idxAliases ) throws IOException {// logObj.status 字段 default: longString statusFieldName logObj.status;// 查询索引 logObj.status 字段类型if(text.equalsIgnoreCase( service.getIndexPropertyType( host, port,username, password,idxAliases, logObj.status ) )){statusFieldName logObj.status.keyword;}return statusFieldName;} } 单元测试效果 作者使用的testng配置中心nacos 常量类 package person.brickman.constant;/** Description: ES 常量类* Author: brickman* CreateDate: 2025/2/21 22:05* Version: 1.0*/ public class ESConsts {/** interf 索引名常量 */public static final String INTERFLOG_INDEX_NAME_PREFIX interf-log-dand-comm-undef.;/** interf 索引别名常量 */public static final String INTERFLOG_INDEX_ALIASES_PREFIX interf_log_dand_commundef;/** csb 索引名常量 */public static final String CSB_INDEX_NAME_PREFIX csb-service.csb.;/** csb 索引别名常量 */public static final String CSB_INDEX_ALIASES_PREFIX csb_servicecsb;/** skywalking 索引名常量 */public static final String SW_INDEX_NAME_PREFIX sw_metrics-all-;/** skywalking 索引别名常量 /public static final String SW_INDEX_ALIASES_PREFIX sw_metricsall; }工具类 RangeTimeUtils 用于通过指定时间如当前时间生成开始时间、截止时间、采集周期、采集时间跨度范围、指标采集时间 package person.brickman.util;import lombok.Data; import org.apache.commons.lang3.time.DateUtils;import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date;/** Description:** s_start String(19) 开始时间 因为是时间段的数据所以增加4个字段* s_end String(19) 截止时间* period smallint 采集周期 多久采一次。默认1单位分钟与范围不同* n_range smallint 采集时间跨度范围 采多长时间的数据。默认1单位分钟 eg1min* time String(19) 指标采集时间 冗余字段yyyy-MM-dd HH:mm:ss* 部分时序数据库以long形式显示timestamp此字段便于查看** Author: brickman* CreateDate: 2025/02/21 9:03 PM* Version: 1.0*/ Data public class RangeTimeUtils {/** 开始时间 yyyy-MM-dd HH:mm:ss */private String cStart;/** 截止时间 yyyy-MM-dd HH:mm:ss */private String cEnd;/** 采集周期 多久采一次。默认1单位分钟与范围不同 /private int period;/** 采集时间跨度范围* deprecated 一般与采集周期一致* /private int nRange;/** 采集时间 yyyy-MM-dd HH:mm:ss* eg 2025-02-21 17:48:00/private String time;/** 通过时间和周期计算所有字段值* 默认偏移1分钟因之前调的默认方法且需要偏移所有遵循开闭原则* param time eg 2025-02-21 17:48:00* param period 单位分钟* return void/public void calcAllByTimeAndPeriod( String time, int period ) throws ParseException {calcAllByTimeAndPeriod( time, period, -1 );}/* param shifting 波动时间、偏移时间* return null/public void calcAllByTimeAndPeriod( String time, int period, int shifting ) throws ParseException {SimpleDateFormat sdf1 new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);Date dt sdf1.parse(time);// dt:date timeSimpleDateFormat sdf2 new SimpleDateFormat(yyyy-MM-dd HH:mm);Date de sdf2.parse(sdf2.format(dt));//de:date endDate ds DateUtils.addMinutes(de, -period ); // ds: date start/* */ds DateUtils.addMinutes(ds, shifting);de DateUtils.addMinutes(de, shifting);this.cStart sdf1.format(ds);this.cEnd sdf1.format(de);this.periodperiod;// 采集时间跨度范围 与 采集周期一致this.nRange this.period;this.timetime;}public Date getStartDate() throws ParseException {SimpleDateFormat sdf1 new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);return sdf1.parse(this.cStart);// dt:date time}public Date getEndDate() throws ParseException {SimpleDateFormat sdf1 new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);return sdf1.parse(this.cEnd);// dt:date time}public Date getGraphQLStartDate() throws ParseException {SimpleDateFormat sdf1 new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);return sdf1.parse(this.cStart);// dt:date time}public Date getGraphQLEndDate() throws ParseException {SimpleDateFormat sdf1 new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);return sdf1.parse(this.cEnd);// dt:date time}public String getGraphQLStart() throws ParseException {SimpleDateFormat sdf1 new SimpleDateFormat(yyyy-MM-dd HHmm);return sdf1.format(getGraphQLStartDate());// dt:date time}public String getGraphQLEnd() throws ParseException {SimpleDateFormat sdf1 new SimpleDateFormat(yyyy-MM-dd HHmm);return sdf1.format(getGraphQLEndDate());// dt:date time} } TimeUtils 取当前时间精确到分取整分 package person.brickman.util;import java.text.SimpleDateFormat; import java.util.Date;/* Description: 时间工具类* 取当前时间精确到分取整分** Author: brickman* CreateDate: 2025/2/21 22:09* Version: 1.0*/ public class TimeUtils {/** 取当前时间精确到分取整分可优化为取数据库时间 /public static String calcWholeMinute(){SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd HH:mm);String time sdf.format(new Date());System.out.println(#### WholeMinuteTime:time:00);return time:00;}/ 根椐入参取整分 **/public static String calcWholeMinute( Date date ){SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd HH:mm);String time sdf.format( date );System.out.println(#### WholeMinuteTime:time:00);return time:00;} } 总结 1、ElasticSearch公共方法封装能降低开发成本、提高开发效率 2、非常通用的方法如查Skywalking的索引数据、查自研的接口调用日志数据等 3、说本文将ES的日常开发代码一网代尽都不为过 附件一ElasticSearch介绍 ElasticSearch介绍 在Elasticsearch简称ES中它是一个基于Apache Lucene构建的开源、分布式、RESTful搜索引擎旨在实时地存储、搜索和分析大量数据。Elasticsearch广泛应用于日志分析、全文搜索、实时分析等场景。下面我将介绍Elasticsearch的一些基本概念和功能

  1. 基本概念 索引Index在Elasticsearch中索引类似于传统关系数据库中的“数据库”。它是存储相关文档数据的地方。 类型Type在Elasticsearch 7.x及以前版本中每个索引可以包含多个类型。但从Elasticsearch 7.x开始一个索引中只能有一个类型默认为_doc这一改动主要是因为Elasticsearch 8.x将完全废弃类型功能。 文档Document文档是Elasticsearch中最小的数据单元可以是JSON格式的数据。每个文档都有一个唯一的ID。 字段Field文档由一个或多个字段组成每个字段都有一个名称和一个值。 映射Mapping映射定义了索引中文档的结构包括字段的类型、是否索引、是否存储等属性。
  2. 核心功能 全文搜索Elasticsearch提供了强大的全文搜索能力支持模糊搜索、范围查询等。 实时性数据输入Elasticsearch后即可被搜索具有很高的实时性。 分布式特性Elasticsearch可以分布式部署在多台服务器上实现数据的分布式存储和查询提高了系统的可扩展性和可靠性。 RESTful API通过RESTful API可以方便地对Elasticsearch进行索引的创建、文档的增删改查等操作。 聚合Aggregations聚合允许你对数据进行复杂的分析如分组统计、计算平均值、求和等。 多租户Multi-tenancy支持多租户模式可以轻松地管理和隔离不同客户或项目的数据。
    结语 Elasticsearch是一个功能强大且灵活的搜索引擎适用于各种需要快速检索大量数据的场景。通过了解其基本概念和核心功能你可以开始构建自己的搜索解决方案。随着不断学习和实践你将能够充分利用Elasticsearch的潜力。