Elasticsearch的api
官方文档:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-overview.html
官方给了两种Java调用Elasticsearch(以下简称"es")方式,分别是:
- Java Low Level REST Client:Java低级客户端。可以认为就是一个通过http请求来与es进行通信的工具,里面封装了对es的连接。就是自己传入json报文,同样也是返回给你json结果报文。
- Java High Level REST Client: Java高级客户端。基于低级的客户端,提供了更多的封装操作,通过面向对象的方式来组装请求报文,同样也支持面向对象式的处理结果报文。
有点类似mybaits、hibernate的区别。一个支持你传入sql来执行,一个更加符合面向对象的特点。
当然了,使用Java高级客户端的更加符合我们的编程习惯。
Java低级客户端
依赖
首先,需要导入java依赖。注:一般来说,maven依赖最好跟所使用的es版本相同
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>6.3.2</version>
</dependency>
获取客户端的连接
如同jdbc对应一个Connection一样,es对应的是一个RestClient,获取该RestClient的最简单的方式如下:
public RestClient getSimpleClient(){
RestClientBuilder restClientBuilder = RestClient.builder(
new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http"));
return restClientBuilder.build();
}
如果需要指定请求头里的数据,比如es需要先进行用户名、密码的验证
public RestClient getClientWithAuthentication(){
RestClientBuilder restClientBuilder = RestClient.builder(
new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http"));
//elasticsearch验证的用户名密码。
String username = "admin";
String password = "123456";
byte[] tokenByte = Base64.encodeBase64((username+":"+password).getBytes());
//将加密的信息转换为string
String tokenStr = new String(tokenByte);
String token = "Basic "+tokenStr;
Header[] headers = new Header[]{
new BasicHeader("Content-type","application/json;charset=utf-8"),
new BasicHeader("Authorization",token)
};
restClientBuilder.setDefaultHeaders(headers);
return restClientBuilder.build();
}
值得一提的是,RestClient是线程安全的,其生命周期是伴随着程序系统的,所以,不再使用时,记得关闭RestClient:
public void close(RestClient restClient) throws IOException {
restClient.close();
}
一个简单查询
低级客户端的api提供的调用方式也是类似之前curl执行rest调用过程一样。先是获取到es客户端的连接,而后指定报文执行调用。
public void doQuery() throws Exception {
//请求报文
String json = "{\"query\":{\"term\":{ \"srvName\":\"sDynSvc\"}}}";
RestClient restClient = getClientWithAuthentication();
//指定请求方式,请求uri:即索引、类型等
Request request = new Request("post", "/esb-srvlog-2019-07-21/doc/_search");
request.setEntity(new NStringEntity(json));
request.setOptions(RequestOptions.DEFAULT);
Response response = restClient.performRequest(request);
//返回的数据保存在HttpEntity里面,以流的形式,json的格式
HttpEntity responseEntity = response.getEntity();
String s = EntityUtils.toString(responseEntity);
System.out.println(s);
close(restClient);
}
请求报文是json格式,就得需要自己进行拼装,比如通过fastjson工具。响应的数据是流的形式,转成字符串也是json的格式,里面得数据就得需要需要自己写解析的方法来进行解析了。
其他的方法:比如创建、删除索引,修改文档等,基本上都是类似上面,大同小异。可参考下官方文档。
Java高级客户端
依赖
maven依赖如下:
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.5.4</version>
</dependency>
获取连接
正如之前所说es的高级客户端是基于低级客户端的,它封装了许多内部操作,它基于低级客户端的RestClientBuilder来获取一个RestHighLevelClient:
public RestHighLevelClient getSimpleClient(){
RestClientBuilder restClientBuilder = RestClient.builder(
new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http"));
return new RestHighLevelClient(restClientBuilder);
}
高级客户端内部会创建低级客户端用于基于提供的builder执行请求。低级客户端维护一个连接池,并启动一些线程,因此当你用完以后同样应该关闭高级客户端,并且在内部它将会关闭低级客户端,以释放这些资源。
public void close(RestHighLevelClient highLevelClient) throws IOException {
highLevelClient.close();
}
查询api
之前看到,低级客户端的入参及出参都得需要自己去拼装参数。与低级客户端不同的是,高级客户端则是通过面向对象的方式来描述入参与出参。
它得两个主要的类:
- SearchRequest:用来执行一次查询请求,指定查询的方法、索引类型;如果不指定索引,默认是所有索引进行查询。
- SearchSourceBuilder:封装我们的请求参数,我们之前的terms、bool、各种匹配等关键字的查询。
基本查询
比如简单的查询bool过滤器,指定各种must、must_not、should方法。 比如,我们对这条sql进行翻译:
SELECT
*
FROM
TABLE
WHERE srvName = 'srvName'
AND @timestamp < '2019-07-21T12:00:00.473Z'
AND @timestamp > '2019-07-21T13:23:00.473Z'
AND (retMsg LIKE '%ok%'
OR retCode = 0)
LIMIT 2, 5
翻译如下:
public void doQuery() throws IOException {
RestHighLevelClient client = getRestHighLevelClient();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
String index = "";
SearchRequest searchRequest = new SearchRequest(index);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must(QueryBuilders.termQuery("srvName","sDynSvc"));
RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("@timestamp");
rangeQuery.gte("2019-07-21T12:00:00.473Z");
rangeQuery.lt("2019-07-21T13:23:00.473Z");
boolQuery.mustNot(rangeQuery);
boolQuery.should(QueryBuilders.regexpQuery("retMsg",".*ok.*"));
boolQuery.should(QueryBuilders.termQuery("esbRetCode","0"));
searchSourceBuilder.query(boolQuery);
searchSourceBuilder.from(2);
searchSourceBuilder.size(5);
searchRequest.source(searchSourceBuilder);
SearchResponse response = client.search(searchRequest,RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println(hits.totalHits);
close(client);
}
聚合
es的api里面并不支持像sql里面的group by 多个字段的这种表达式,但是支持多层聚合的嵌套。
其实sql里面的group by a,b,c... 这种多个字段的聚合,也可以认为是一种多层的嵌套,只不过是以一种二维的形式展示出来的。
比如这种,聚合出来的二维表, 我们同样可以通过Json节点的形式来进行描述:
{
"DMDB":{
"DRS":{
"DMDB.DRS.DrSendType":"",
"DMDB.DRS.ProcessExists":"",
},
"DRS.B":{
"DMDB.DRS.B.DrSendType":"",
"DMDB.DRS.B.ProcessExists":""
},
...
},
"HBase":{
"HMaster":{
"Hadoop.HBase.HMaster.HaStatus":"",
"Hadoop.HBase.HMaster.ProcessExists":""
},
"HRegionServer":{
"Hadoop.HBase.HRegionServer.Blocked":"",
"Hadoop.HBase.HRegionServer.ProcessExists":""
}
...
},
"HDFS":{
"DataNode":{
"Hadoop.HDFS.DataNode.ProcessExists":""
},
"NameNode":{
"Hadoop.HDFS.NameNode.HaStatus":"",
"Hadoop.HDFS.NameNode.ProcessExists":""
}
...
},
"YARN":{
...
}
}
所以,针对这种多聚合,我们直接嵌套桶(buket)的方式:
public void agg(SearchSourceBuilder searchSourceBuilder){
TermsAggregationBuilder srvNameAgg = AggregationBuilders.terms("componentAgg").field("COMPONENT");
TermsAggregationBuilder retCodeAgg = AggregationBuilders.terms("subComponentAgg").field("SUB_COMPONENT");
TermsAggregationBuilder retMsgAgg = AggregationBuilders.terms("indicatorIdAgg").field("indicatorId");
srvNameAgg.subAggregation(retCodeAgg.subAggregation(retMsgAgg));
searchSourceBuilder.aggregation(srvNameAgg);
}
从返回的结果获取到Aggregations 对象:
searchRequest.source(searchSourceBuilder);
SearchResponse response = client.search(searchRequest,RequestOptions.DEFAULT);
Aggregations aggregations = response.getAggregations();
获取到aggregations里面的buket,再通过递归的方式,不断地获取到里面的Aggregations聚合对象。
发表评论