异步HttpClient大量请求
2024-09-06 15:24:16
由于项目中有用到HttpClient异步发送大量http请求,所以做已记录
思路:使用HttpClient连接池,多线程
public class HttpAsyncClient {
private static int socketTimeout = 500;// 设置等待数据超时时间0.5秒钟 根据业务调整 private static int connectTimeout = 2000;// 连接超时 private static int poolSize = 100;// 连接池最大连接数 private static int maxPerRoute = 100;// 每个主机的并发最多只有1500 private static int connectionRequestTimeout = 3000; //从连接池中后去连接的timeout时间 // http代理相关参数
private String host = "";
private int port = 0;
private String username = "";
private String password = ""; // 异步httpclient
private CloseableHttpAsyncClient asyncHttpClient; // 异步加代理的httpclient
private CloseableHttpAsyncClient proxyAsyncHttpClient; public HttpAsyncClient() {
try {
this.asyncHttpClient = createAsyncClient(false);
this.proxyAsyncHttpClient = createAsyncClient(true);
} catch (Exception e) {
e.printStackTrace();
} } public CloseableHttpAsyncClient createAsyncClient(boolean proxy)
throws KeyManagementException, UnrecoverableKeyException,
NoSuchAlgorithmException, KeyStoreException,
MalformedChallengeException, IOReactorException { RequestConfig requestConfig = RequestConfig.custom()
.setConnectionRequestTimeout(connectionRequestTimeout)
.setConnectTimeout(connectTimeout)
.setSocketTimeout(socketTimeout).build(); SSLContext sslcontext = SSLContexts.createDefault(); UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(
username, password); CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, credentials); // 设置协议http和https对应的处理socket链接工厂的对象
Registry<SchemeIOSessionStrategy> sessionStrategyRegistry = RegistryBuilder
.<SchemeIOSessionStrategy> create()
.register("http", NoopIOSessionStrategy.INSTANCE)
.register("https", new SSLIOSessionStrategy(sslcontext))
.build(); // 配置io线程
IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setSoKeepAlive(false).setTcpNoDelay(true)
.setIoThreadCount(Runtime.getRuntime().availableProcessors())
.build();
// 设置连接池大小
ConnectingIOReactor ioReactor;
ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
PoolingNHttpClientConnectionManager conMgr = new PoolingNHttpClientConnectionManager(
ioReactor, null, sessionStrategyRegistry, null); if (poolSize > 0) {
conMgr.setMaxTotal(poolSize);
} if (maxPerRoute > 0) {
conMgr.setDefaultMaxPerRoute(maxPerRoute);
} else {
conMgr.setDefaultMaxPerRoute(10);
} ConnectionConfig connectionConfig = ConnectionConfig.custom()
.setMalformedInputAction(CodingErrorAction.IGNORE)
.setUnmappableInputAction(CodingErrorAction.IGNORE)
.setCharset(Consts.UTF_8).build(); Lookup<AuthSchemeProvider> authSchemeRegistry;
authSchemeRegistry = RegistryBuilder
.<AuthSchemeProvider> create()
.register(AuthSchemes.BASIC, new BasicSchemeFactory())
.register(AuthSchemes.DIGEST, new DigestSchemeFactory())
.register(AuthSchemes.NTLM, new NTLMSchemeFactory())
.register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory())
.register(AuthSchemes.KERBEROS, new KerberosSchemeFactory())
.build();
conMgr.setDefaultConnectionConfig(connectionConfig); if (proxy) {
return HttpAsyncClients.custom().setConnectionManager(conMgr)
.setDefaultCredentialsProvider(credentialsProvider)
.setDefaultAuthSchemeRegistry(authSchemeRegistry)
.setProxy(new HttpHost(host, port))
.setDefaultCookieStore(new BasicCookieStore())
.setDefaultRequestConfig(requestConfig).build();
} else {
return HttpAsyncClients.custom().setConnectionManager(conMgr)
.setDefaultCredentialsProvider(credentialsProvider)
.setDefaultAuthSchemeRegistry(authSchemeRegistry)
.setDefaultCookieStore(new BasicCookieStore()).build();
} } public CloseableHttpAsyncClient getAsyncHttpClient() {
return asyncHttpClient;
} public CloseableHttpAsyncClient getProxyAsyncHttpClient() {
return proxyAsyncHttpClient;
}
}
public class HttpClientFactory {
private static HttpAsyncClient httpAsyncClient = new HttpAsyncClient(); private HttpClientFactory() {
} private static HttpClientFactory httpClientFactory = new HttpClientFactory(); public static HttpClientFactory getInstance() { return httpClientFactory; } public HttpAsyncClient getHttpAsyncClientPool() {
return httpAsyncClient;
} }
public void sendThredPost(List<FaceBookUserQuitEntity> list,String title,String subTitle,String imgUrl){
if(list == null || list.size() == 0){
new BusinessException("亚洲查询用户数据为空");
}
int number = list.size();
int num = number / 10;
PostThread[] threads = new PostThread[1];
if(num > 0){
threads = new PostThread[10];
for(int i = 0; i <= 9; i++) {
List<FaceBookUserQuitEntity> threadList = list.subList(i * num, (i + 1) * num > number ? number : (i + 1) * num);
if (threadList == null || threadList.size() == 0) {
new BusinessException("亚洲切分用户数据为空");
}
threads[i] = new PostThread(HttpClientFactory.getInstance().getHttpAsyncClientPool().getAsyncHttpClient(),
threadList, title, subTitle, imgUrl);
}
for (int k = 0; k< threads.length; k++) {
threads[k].start();
logger.info("亚洲线程: {} 启动",k);
}
for (int j = 0; j < threads.length; j++) {
try {
threads[j].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}else{
threads[0] = new PostThread(HttpClientFactory.getInstance().getHttpAsyncClientPool().getAsyncHttpClient(),
list,title,subTitle, imgUrl);
threads[0].start();
try {
threads[0].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public PostThread(CloseableHttpAsyncClient httpClient, List<FaceBookUserQuitEntity> list, String title, String subTitle,String imgUrl){
this.httpClient = httpClient;
this.list = list;
this. title= title;
this. subTitle= subTitle;
this. imgUrl= imgUrl;
}
@Override
public void run() {
try {
int size = list.size();
for (int k = 0; k < size; k += 100) {
List<FaceBookUserQuitEntity> subList = new ArrayList<FaceBookUserQuitEntity>();
if (k + 100 < size) {
subList = list.subList(k, k + 100);
} else {
subList = list.subList(k, size);
}
if(subList.size() > 0){
httpClient.start();
final long startTime = System.currentTimeMillis();
final CountDownLatch latch = new CountDownLatch(subList.size());
for (FaceBookUserQuitEntity faceBookEntity : subList) {
String senderId = faceBookEntity.getSenderId();
String player_id = faceBookEntity.getPlayer_id();
logger.info("开始发送消息:playerid=" + player_id);
String bodyStr = getPostbody(senderId, player_id, title, subTitle,
imgUrl, "Play Game", "");
if (!bodyStr.isEmpty()) {
final HttpPost httpPost = new HttpPost(URL);
StringEntity stringEntity = new StringEntity(bodyStr, "utf-8");
stringEntity.setContentEncoding("UTF-8");
stringEntity.setContentType("application/json");
httpPost.setEntity(stringEntity);
httpClient.execute(httpPost, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse result) {
latch.countDown();
int statusCode = result.getStatusLine().getStatusCode();
if(200 == statusCode){
logger.info("请求发消息成功="+bodyStr);
try {
logger.info(EntityUtils.toString(result.getEntity(), "UTF-8"));
} catch (IOException e) {
e.printStackTrace();
}
}else{
logger.info("请求返回状态="+statusCode);
logger.info("请求发消息失败="+bodyStr);
try {
logger.info(EntityUtils.toString(result.getEntity(), "UTF-8"));
} catch (IOException e) {
e.printStackTrace();
}
}
} @Override
public void failed(Exception ex) {
latch.countDown();
logger.info("请求发消息失败e="+ex);
} @Override
public void cancelled() {
latch.countDown();
}
});
}
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long leftTime = 10000 - (System.currentTimeMillis() - startTime);
if (leftTime > 0) {
try {
Thread.sleep(leftTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
} catch (UnsupportedCharsetException e) {
e.printStackTrace();
}
}
以上工具代码可直接使用,发送逻辑代码需适当修改。
最新文章
- Jquery 操作IFrame
- KIP-32 Add timestamps to Kafka message
- redis安装配置与测试
- 09_控制线程_线程睡眠sleep
- 让BOOTSTRAP默认SLIDER支持触屏设备
- CENTOS6上禁用IPV6和DHCP
- 浏览器兼容性判定写法格式(ie)
- Python封装的访问MySQL数据库的类及DEMO
- transition与visibility与display
- Debian/Ubuntu 已安装gcc/g++ 4.8.1
- 【响应式编程的思维艺术】 (3)flatMap背后的代数理论Monad
- Django-ORM多表操作(进阶)
- java求最大值以及定义方法调用
- e780. 设置JList中的已选项
- socat 广播以及多播
- 2018NOIP爆0记第一弹
- hadoop的自定义分组实现 (Partition机制)
- Eclipse中 将java Gradle项目转换为web项目
- 能显示git分支的终端提示配置
- 【ACM】最小公倍数