由于项目中有用到HttpClient异步发送大量http请求,所以做已记录

思路:使用HttpClient连接池,多线程

public class HttpAsyncClient {
private static int socketTimeout = 500;// 设置等待数据超时时间0.5秒钟 根据业务调整 private static int connectTimeout = 2000;// 连接超时 private static int poolSize = 100;// 连接池最大连接数 private static int maxPerRoute = 100;// 每个主机的并发最多只有1500 private static int connectionRequestTimeout = 3000; //从连接池中后去连接的timeout时间 // http代理相关参数
private String host = "";
private int port = 0;
private String username = "";
private String password = ""; // 异步httpclient
private CloseableHttpAsyncClient asyncHttpClient; // 异步加代理的httpclient
private CloseableHttpAsyncClient proxyAsyncHttpClient; public HttpAsyncClient() {
try {
this.asyncHttpClient = createAsyncClient(false);
this.proxyAsyncHttpClient = createAsyncClient(true);
} catch (Exception e) {
e.printStackTrace();
} } public CloseableHttpAsyncClient createAsyncClient(boolean proxy)
throws KeyManagementException, UnrecoverableKeyException,
NoSuchAlgorithmException, KeyStoreException,
MalformedChallengeException, IOReactorException { RequestConfig requestConfig = RequestConfig.custom()
.setConnectionRequestTimeout(connectionRequestTimeout)
.setConnectTimeout(connectTimeout)
.setSocketTimeout(socketTimeout).build(); SSLContext sslcontext = SSLContexts.createDefault(); UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(
username, password); CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, credentials); // 设置协议http和https对应的处理socket链接工厂的对象
Registry<SchemeIOSessionStrategy> sessionStrategyRegistry = RegistryBuilder
.<SchemeIOSessionStrategy> create()
.register("http", NoopIOSessionStrategy.INSTANCE)
.register("https", new SSLIOSessionStrategy(sslcontext))
.build(); // 配置io线程
IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setSoKeepAlive(false).setTcpNoDelay(true)
.setIoThreadCount(Runtime.getRuntime().availableProcessors())
.build();
// 设置连接池大小
ConnectingIOReactor ioReactor;
ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
PoolingNHttpClientConnectionManager conMgr = new PoolingNHttpClientConnectionManager(
ioReactor, null, sessionStrategyRegistry, null); if (poolSize > 0) {
conMgr.setMaxTotal(poolSize);
} if (maxPerRoute > 0) {
conMgr.setDefaultMaxPerRoute(maxPerRoute);
} else {
conMgr.setDefaultMaxPerRoute(10);
} ConnectionConfig connectionConfig = ConnectionConfig.custom()
.setMalformedInputAction(CodingErrorAction.IGNORE)
.setUnmappableInputAction(CodingErrorAction.IGNORE)
.setCharset(Consts.UTF_8).build(); Lookup<AuthSchemeProvider> authSchemeRegistry;
authSchemeRegistry = RegistryBuilder
.<AuthSchemeProvider> create()
.register(AuthSchemes.BASIC, new BasicSchemeFactory())
.register(AuthSchemes.DIGEST, new DigestSchemeFactory())
.register(AuthSchemes.NTLM, new NTLMSchemeFactory())
.register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory())
.register(AuthSchemes.KERBEROS, new KerberosSchemeFactory())
.build();
conMgr.setDefaultConnectionConfig(connectionConfig); if (proxy) {
return HttpAsyncClients.custom().setConnectionManager(conMgr)
.setDefaultCredentialsProvider(credentialsProvider)
.setDefaultAuthSchemeRegistry(authSchemeRegistry)
.setProxy(new HttpHost(host, port))
.setDefaultCookieStore(new BasicCookieStore())
.setDefaultRequestConfig(requestConfig).build();
} else {
return HttpAsyncClients.custom().setConnectionManager(conMgr)
.setDefaultCredentialsProvider(credentialsProvider)
.setDefaultAuthSchemeRegistry(authSchemeRegistry)
.setDefaultCookieStore(new BasicCookieStore()).build();
} } public CloseableHttpAsyncClient getAsyncHttpClient() {
return asyncHttpClient;
} public CloseableHttpAsyncClient getProxyAsyncHttpClient() {
return proxyAsyncHttpClient;
}
}
public class HttpClientFactory {
private static HttpAsyncClient httpAsyncClient = new HttpAsyncClient(); private HttpClientFactory() {
} private static HttpClientFactory httpClientFactory = new HttpClientFactory(); public static HttpClientFactory getInstance() { return httpClientFactory; } public HttpAsyncClient getHttpAsyncClientPool() {
return httpAsyncClient;
} }
public void sendThredPost(List<FaceBookUserQuitEntity> list,String title,String subTitle,String imgUrl){
if(list == null || list.size() == 0){
new BusinessException("亚洲查询用户数据为空");
}
int number = list.size();
int num = number / 10;
PostThread[] threads = new PostThread[1];
if(num > 0){
threads = new PostThread[10];
for(int i = 0; i <= 9; i++) {
List<FaceBookUserQuitEntity> threadList = list.subList(i * num, (i + 1) * num > number ? number : (i + 1) * num);
if (threadList == null || threadList.size() == 0) {
new BusinessException("亚洲切分用户数据为空");
}
threads[i] = new PostThread(HttpClientFactory.getInstance().getHttpAsyncClientPool().getAsyncHttpClient(),
threadList, title, subTitle, imgUrl);
}
for (int k = 0; k< threads.length; k++) {
threads[k].start();
logger.info("亚洲线程: {} 启动",k);
}
for (int j = 0; j < threads.length; j++) {
try {
threads[j].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}else{
threads[0] = new PostThread(HttpClientFactory.getInstance().getHttpAsyncClientPool().getAsyncHttpClient(),
list,title,subTitle, imgUrl);
threads[0].start();
try {
threads[0].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
 public PostThread(CloseableHttpAsyncClient httpClient, List<FaceBookUserQuitEntity> list, String title, String subTitle,String imgUrl){
this.httpClient = httpClient;
this.list = list;
this. title= title;
this. subTitle= subTitle;
this. imgUrl= imgUrl;
}
@Override
public void run() {
try {
int size = list.size();
for (int k = 0; k < size; k += 100) {
List<FaceBookUserQuitEntity> subList = new ArrayList<FaceBookUserQuitEntity>();
if (k + 100 < size) {
subList = list.subList(k, k + 100);
} else {
subList = list.subList(k, size);
}
if(subList.size() > 0){
httpClient.start();
final long startTime = System.currentTimeMillis();
final CountDownLatch latch = new CountDownLatch(subList.size());
for (FaceBookUserQuitEntity faceBookEntity : subList) {
String senderId = faceBookEntity.getSenderId();
String player_id = faceBookEntity.getPlayer_id();
logger.info("开始发送消息:playerid=" + player_id);
String bodyStr = getPostbody(senderId, player_id, title, subTitle,
imgUrl, "Play Game", "");
if (!bodyStr.isEmpty()) {
final HttpPost httpPost = new HttpPost(URL);
StringEntity stringEntity = new StringEntity(bodyStr, "utf-8");
stringEntity.setContentEncoding("UTF-8");
stringEntity.setContentType("application/json");
httpPost.setEntity(stringEntity);
httpClient.execute(httpPost, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse result) {
latch.countDown();
int statusCode = result.getStatusLine().getStatusCode();
if(200 == statusCode){
logger.info("请求发消息成功="+bodyStr);
try {
logger.info(EntityUtils.toString(result.getEntity(), "UTF-8"));
} catch (IOException e) {
e.printStackTrace();
}
}else{
logger.info("请求返回状态="+statusCode);
logger.info("请求发消息失败="+bodyStr);
try {
logger.info(EntityUtils.toString(result.getEntity(), "UTF-8"));
} catch (IOException e) {
e.printStackTrace();
}
}
} @Override
public void failed(Exception ex) {
latch.countDown();
logger.info("请求发消息失败e="+ex);
} @Override
public void cancelled() {
latch.countDown();
}
});
}
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long leftTime = 10000 - (System.currentTimeMillis() - startTime);
if (leftTime > 0) {
try {
Thread.sleep(leftTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
} catch (UnsupportedCharsetException e) {
e.printStackTrace();
}
}

以上工具代码可直接使用,发送逻辑代码需适当修改。

												

最新文章

  1. Jquery 操作IFrame
  2. KIP-32 Add timestamps to Kafka message
  3. redis安装配置与测试
  4. 09_控制线程_线程睡眠sleep
  5. 让BOOTSTRAP默认SLIDER支持触屏设备
  6. CENTOS6上禁用IPV6和DHCP
  7. 浏览器兼容性判定写法格式(ie)
  8. Python封装的访问MySQL数据库的类及DEMO
  9. transition与visibility与display
  10. Debian/Ubuntu 已安装gcc/g++ 4.8.1
  11. 【响应式编程的思维艺术】 (3)flatMap背后的代数理论Monad
  12. Django-ORM多表操作(进阶)
  13. java求最大值以及定义方法调用
  14. e780. 设置JList中的已选项
  15. socat 广播以及多播
  16. 2018NOIP爆0记第一弹
  17. hadoop的自定义分组实现 (Partition机制)
  18. Eclipse中 将java Gradle项目转换为web项目
  19. 能显示git分支的终端提示配置
  20. 【ACM】最小公倍数

热门文章

  1. 关于几类STL容器swap的复杂度问题
  2. 2019 SDN上机第2次作业
  3. Salesforce 开发整理(七)配置审批流
  4. React Hooks 深入系列
  5. 初探Java设计模式5:一文了解Spring涉及到的9种设计模式
  6. 2019年上-C语言程序设计-第1次blog作业
  7. SQL ------------- 最大与最小函数
  8. mgcp的alg功能实现
  9. Hbase Filter之PrefixFilter
  10. 硬件笔记之制作MacOS Mojave U盘USB启动安装盘方法