elastisSearch-aggregations
2024-08-27 14:03:45
运行结果
统计每个学员的总成绩
这个是索引库使用通配符
优先在本地查询
只在本地节点中查询
只在指定id的节点里面进行查询
查询指定分片的数据
参考代码ESTestAggregation.java
package com.dajiangtai.djt_spider.elasticsearch; import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit; import org.codehaus.jackson.map.ObjectMapper;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.junit.Before;
import org.junit.Test;
/**
* Aggregation 操作
*
* @author 大讲台
*
*/
public class ESTestAggregation {
private TransportClient client; @Before
public void test0() throws UnknownHostException { // 开启client.transport.sniff功能,探测集群所有节点
Settings settings = Settings.settingsBuilder()
.put("cluster.name", "escluster")
.put("client.transport.sniff", true).build();
// on startup
// 获取TransportClient
client = TransportClient
.builder()
.settings(settings)
.build()
.addTransportAddress(
new InetSocketTransportAddress(InetAddress
.getByName("master"), 9300))
.addTransportAddress(
new InetSocketTransportAddress(InetAddress
.getByName("slave1"), 9300))
.addTransportAddress(
new InetSocketTransportAddress(InetAddress
.getByName("slave2"), 9300));
}
/**
* Aggregation 分组统计相同年龄学员个数
* @throws Exception
*/
@Test
public void test1() throws Exception {
SearchRequestBuilder builder = client.prepareSearch("djt1");
builder.setTypes("user")
.setQuery(QueryBuilders.matchAllQuery())
//按年龄分组聚合统计
.addAggregation(AggregationBuilders.terms("by_age").field("age").size(0))
; SearchResponse searchResponse = builder.get();
//获取分组信息
Terms terms = searchResponse.getAggregations().get("by_age");
List<Bucket> buckets = terms.getBuckets();
for (Bucket bucket : buckets) {
System.out.println(bucket.getKey()+":"+bucket.getDocCount());
}
} /**
* Aggregation 分组统计每个学员的总成绩
* @throws Exception
*/
@Test
public void test2() throws Exception {
SearchRequestBuilder builder = client.prepareSearch("djt2");
builder.setTypes("user")
.setQuery(QueryBuilders.matchAllQuery())
//按姓名分组聚合统计
.addAggregation(AggregationBuilders.terms("by_name")
.field("name")
.subAggregation(AggregationBuilders.sum("sum_score")
.field("score"))
.size(0))
;
SearchResponse searchResponse = builder.get();
//获取分组信息
Terms terms = searchResponse.getAggregations().get("by_name");
List<Bucket> buckets = terms.getBuckets();
for (Bucket bucket : buckets) {
Sum sum = bucket.getAggregations().get("sum_score");
System.out.println(bucket.getKey()+":"+sum.getValue());
}
} /**
* 支持多索引和多类型查询
* @throws Exception
*/
@Test
public void test3() throws Exception {
SearchRequestBuilder builder
= client//.prepareSearch("djt1","djt2")//可以指定多个索引库
.prepareSearch("djt*")//索引库可以使用通配符
.setTypes("user");//支持多个类型,但不支持通配符 SearchResponse searchResponse = builder.get(); SearchHits hits = searchResponse.getHits();
SearchHit[] hits2 = hits.getHits();
for (SearchHit searchHit : hits2) {
System.out.println(searchHit.getSourceAsString());
}
}
/**
* 分片查询方式
* @throws Exception
*/
@Test
public void test4() throws Exception {
SearchRequestBuilder
builder = client.prepareSearch("djt3")
.setTypes("user")
//.setPreference("_local")
//.setPreference("_only_local")
//.setPreference("_primary")
//.setPreference("_replica")
//.setPreference("_primary_first")
//.setPreference("_replica_first")
//.setPreference("_only_node:crKxtA2fRTG1UZdPN8QtaA")
//.setPreference("_prefer_node:nJL_MqcsSle6gY7iujoAlw")
.setPreference("_shards:3")
;
SearchResponse searchResponse = builder.get();
SearchHits hits = searchResponse.getHits();
SearchHit[] hits2 = hits.getHits();
for (SearchHit searchHit : hits2) {
System.out.println(searchHit.getSourceAsString());
}
}
/**
* 极速查询:通过路由插入数据(同一类别数据在一个分片)
* @throws Exception
*/
@Test
public void test5() throws Exception {
Acount acount = new Acount("13602546655","tom1","male",16);
Acount acount2 = new Acount("13602546655","tom2","male",17);
Acount acount3 = new Acount("13602546655","tom3","male",18);
Acount acount4 = new Acount("18903762536","john1","male",28);
Acount acount5 = new Acount("18903762536","john2","male",29);
Acount acount6 = new Acount("18903762536","john3","male",30);
List<Acount> list = new ArrayList<Acount>();
list.add(acount);
list.add(acount2);
list.add(acount3);
list.add(acount4);
list.add(acount5);
list.add(acount6); BulkProcessor bulkProcessor = BulkProcessor.builder(
client,
new BulkProcessor.Listener() { public void beforeBulk(long executionId, BulkRequest request) {
// TODO Auto-generated method stub
System.out.println(request.numberOfActions());
} public void afterBulk(long executionId, BulkRequest request,
Throwable failure) {
// TODO Auto-generated method stub
System.out.println(failure.getMessage());
} public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
// TODO Auto-generated method stub
System.out.println(response.hasFailures());
}
})
.setBulkActions(1000) // 每个批次的最大数量
.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))// 每个批次的最大字节数
.setFlushInterval(TimeValue.timeValueSeconds(5))// 每批提交时间间隔
.setConcurrentRequests(1) //设置多少个并发处理线程
//可以允许用户自定义当一个或者多个bulk请求失败后,该执行如何操作
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();
for (Acount a : list) {
ObjectMapper mapper = new ObjectMapper(); byte[] json = mapper.writeValueAsBytes(a);
bulkProcessor.add(new IndexRequest("djt3", "user")
.routing(a.getPhone().substring(0, 3))
.source(json));
} //阻塞至所有的请求线程处理完毕后,断开连接资源
bulkProcessor.awaitClose(3, TimeUnit.MINUTES);
client.close();
}
/**
* 极速查询:通过路由极速查询,也可以通过分片shards查询演示
*
* @throws Exception
*/
@Test
public void test6() throws Exception {
SearchRequestBuilder builder = client.prepareSearch("djt3")//可以指定多个索引库
.setTypes("user");//支持多个类型,但不支持通配符
builder.setQuery(QueryBuilders.matchAllQuery())
.setRouting("13602546655".substring(0, 3))
//.setRouting("18903762536".substring(0, 3))
;
SearchResponse searchResponse = builder.get(); SearchHits hits = searchResponse.getHits();
SearchHit[] hits2 = hits.getHits();
for (SearchHit searchHit : hits2) {
System.out.println(searchHit.getSourceAsString());
}
}
}
最新文章
- javascript中矩形的碰撞检测---- 计算碰撞部分的面积
- first Automation
- [HDOJ5439]Aggregated Counting(乱搞)
- Node.js模块
- Python基础4:数据类型:数字 字符串 日期
- cocos2d中的可见性检测
- ps 使用说明
- web api post注意事项
- Linux的启动过程
- UDP程序设计
- React组件二
- python 3.5 购物小程序
- windows下wchar_t* 转char*
- 第一百零二节,JavaScript函数
- 修复python的ModuleNotFoundError
- fcode-页面九宫格自动锁屏jquery插件
- python subprocess模块使用总结
- docker删除镜像和删除容器
- Kattis之旅——Rational Arithmetic
- JMeter学习(三)元件的作用域与执行顺序(转载)
热门文章
- SWIFT Function
- bug生命周期和bug状态处理
- UI基础:视图控制器.屏幕旋转.MVC 分类: iOS学习-UI 2015-07-02 22:21 62人阅读 评论(0) 收藏
- MyEclipse10 添加反编译JadClipse插件
- I.MX6 Linux Serial Baud Rate hacking
- php curl批处理
- js模板引擎---jtemplates使用
- 强大的Java Json工具类
- window.location.href跳转问题2
- test20180922 世界第一的猛汉王