运行结果

统计每个学员的总成绩

这个是索引库使用通配符

优先在本地查询

只在本地节点中查询

只在指定id的节点里面进行查询

查询指定分片的数据

参考代码ESTestAggregation.java

package com.dajiangtai.djt_spider.elasticsearch;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit; import org.codehaus.jackson.map.ObjectMapper;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.junit.Before;
import org.junit.Test;
/**
* Aggregation 操作
*
* @author 大讲台
*
*/
public class ESTestAggregation {
private TransportClient client; @Before
public void test0() throws UnknownHostException { // 开启client.transport.sniff功能,探测集群所有节点
Settings settings = Settings.settingsBuilder()
.put("cluster.name", "escluster")
.put("client.transport.sniff", true).build();
// on startup
// 获取TransportClient
client = TransportClient
.builder()
.settings(settings)
.build()
.addTransportAddress(
new InetSocketTransportAddress(InetAddress
.getByName("master"), 9300))
.addTransportAddress(
new InetSocketTransportAddress(InetAddress
.getByName("slave1"), 9300))
.addTransportAddress(
new InetSocketTransportAddress(InetAddress
.getByName("slave2"), 9300));
}
/**
* Aggregation 分组统计相同年龄学员个数
* @throws Exception
*/
@Test
public void test1() throws Exception {
SearchRequestBuilder builder = client.prepareSearch("djt1");
builder.setTypes("user")
.setQuery(QueryBuilders.matchAllQuery())
//按年龄分组聚合统计
.addAggregation(AggregationBuilders.terms("by_age").field("age").size(0))
; SearchResponse searchResponse = builder.get();
//获取分组信息
Terms terms = searchResponse.getAggregations().get("by_age");
List<Bucket> buckets = terms.getBuckets();
for (Bucket bucket : buckets) {
System.out.println(bucket.getKey()+":"+bucket.getDocCount());
}
} /**
* Aggregation 分组统计每个学员的总成绩
* @throws Exception
*/
@Test
public void test2() throws Exception {
SearchRequestBuilder builder = client.prepareSearch("djt2");
builder.setTypes("user")
.setQuery(QueryBuilders.matchAllQuery())
//按姓名分组聚合统计
.addAggregation(AggregationBuilders.terms("by_name")
.field("name")
.subAggregation(AggregationBuilders.sum("sum_score")
.field("score"))
.size(0))
;
SearchResponse searchResponse = builder.get();
//获取分组信息
Terms terms = searchResponse.getAggregations().get("by_name");
List<Bucket> buckets = terms.getBuckets();
for (Bucket bucket : buckets) {
Sum sum = bucket.getAggregations().get("sum_score");
System.out.println(bucket.getKey()+":"+sum.getValue());
}
} /**
* 支持多索引和多类型查询
* @throws Exception
*/
@Test
public void test3() throws Exception {
SearchRequestBuilder builder
= client//.prepareSearch("djt1","djt2")//可以指定多个索引库
.prepareSearch("djt*")//索引库可以使用通配符
.setTypes("user");//支持多个类型,但不支持通配符 SearchResponse searchResponse = builder.get(); SearchHits hits = searchResponse.getHits();
SearchHit[] hits2 = hits.getHits();
for (SearchHit searchHit : hits2) {
System.out.println(searchHit.getSourceAsString());
}
}
/**
* 分片查询方式
* @throws Exception
*/
@Test
public void test4() throws Exception {
SearchRequestBuilder
builder = client.prepareSearch("djt3")
.setTypes("user")
//.setPreference("_local")
//.setPreference("_only_local")
//.setPreference("_primary")
//.setPreference("_replica")
//.setPreference("_primary_first")
//.setPreference("_replica_first")
//.setPreference("_only_node:crKxtA2fRTG1UZdPN8QtaA")
//.setPreference("_prefer_node:nJL_MqcsSle6gY7iujoAlw")
.setPreference("_shards:3")
;
SearchResponse searchResponse = builder.get();
SearchHits hits = searchResponse.getHits();
SearchHit[] hits2 = hits.getHits();
for (SearchHit searchHit : hits2) {
System.out.println(searchHit.getSourceAsString());
}
}
/**
* 极速查询:通过路由插入数据(同一类别数据在一个分片)
* @throws Exception
*/
@Test
public void test5() throws Exception {
Acount acount = new Acount("13602546655","tom1","male",16);
Acount acount2 = new Acount("13602546655","tom2","male",17);
Acount acount3 = new Acount("13602546655","tom3","male",18);
Acount acount4 = new Acount("18903762536","john1","male",28);
Acount acount5 = new Acount("18903762536","john2","male",29);
Acount acount6 = new Acount("18903762536","john3","male",30);
List<Acount> list = new ArrayList<Acount>();
list.add(acount);
list.add(acount2);
list.add(acount3);
list.add(acount4);
list.add(acount5);
list.add(acount6); BulkProcessor bulkProcessor = BulkProcessor.builder(
client,
new BulkProcessor.Listener() { public void beforeBulk(long executionId, BulkRequest request) {
// TODO Auto-generated method stub
System.out.println(request.numberOfActions());
} public void afterBulk(long executionId, BulkRequest request,
Throwable failure) {
// TODO Auto-generated method stub
System.out.println(failure.getMessage());
} public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
// TODO Auto-generated method stub
System.out.println(response.hasFailures());
}
})
.setBulkActions(1000) // 每个批次的最大数量
.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))// 每个批次的最大字节数
.setFlushInterval(TimeValue.timeValueSeconds(5))// 每批提交时间间隔
.setConcurrentRequests(1) //设置多少个并发处理线程
//可以允许用户自定义当一个或者多个bulk请求失败后,该执行如何操作
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();
for (Acount a : list) {
ObjectMapper mapper = new ObjectMapper(); byte[] json = mapper.writeValueAsBytes(a);
bulkProcessor.add(new IndexRequest("djt3", "user")
.routing(a.getPhone().substring(0, 3))
.source(json));
} //阻塞至所有的请求线程处理完毕后,断开连接资源
bulkProcessor.awaitClose(3, TimeUnit.MINUTES);
client.close();
}
/**
* 极速查询:通过路由极速查询,也可以通过分片shards查询演示
*
* @throws Exception
*/
@Test
public void test6() throws Exception {
SearchRequestBuilder builder = client.prepareSearch("djt3")//可以指定多个索引库
.setTypes("user");//支持多个类型,但不支持通配符
builder.setQuery(QueryBuilders.matchAllQuery())
.setRouting("13602546655".substring(0, 3))
//.setRouting("18903762536".substring(0, 3))
;
SearchResponse searchResponse = builder.get(); SearchHits hits = searchResponse.getHits();
SearchHit[] hits2 = hits.getHits();
for (SearchHit searchHit : hits2) {
System.out.println(searchHit.getSourceAsString());
}
}
}

最新文章

  1. javascript中矩形的碰撞检测---- 计算碰撞部分的面积
  2. first Automation
  3. [HDOJ5439]Aggregated Counting(乱搞)
  4. Node.js模块
  5. Python基础4:数据类型:数字 字符串 日期
  6. cocos2d中的可见性检测
  7. ps 使用说明
  8. web api post注意事项
  9. Linux的启动过程
  10. UDP程序设计
  11. React组件二
  12. python 3.5 购物小程序
  13. windows下wchar_t* 转char*
  14. 第一百零二节,JavaScript函数
  15. 修复python的ModuleNotFoundError
  16. fcode-页面九宫格自动锁屏jquery插件
  17. python subprocess模块使用总结
  18. docker删除镜像和删除容器
  19. Kattis之旅——Rational Arithmetic
  20. JMeter学习(三)元件的作用域与执行顺序(转载)

热门文章

  1. SWIFT Function
  2. bug生命周期和bug状态处理
  3. UI基础:视图控制器.屏幕旋转.MVC 分类: iOS学习-UI 2015-07-02 22:21 62人阅读 评论(0) 收藏
  4. MyEclipse10 添加反编译JadClipse插件
  5. I.MX6 Linux Serial Baud Rate hacking
  6. php curl批处理
  7. js模板引擎---jtemplates使用
  8. 强大的Java Json工具类
  9. window.location.href跳转问题2
  10. test20180922 世界第一的猛汉王