Netty是Trustin Lee在2004年开发的一款高性能的网络应用程序框架。相比于JDK自带的NIO,Netty做了相当多的增强,且隔离了jdk nio的实现细节,API也比较友好,还支持流量整形等高级特性。在我们常见的一些开源项目中已经普遍的应用到了Netty,比如Dubbo、Elasticsearch、Zookeeper等。

Netty的具体开发

提示:因代码相对较多,这里只展示其主要部分,至于项目中用到的编解码器、工具类,请直接拉到最后下载源码!也欢迎顺手给个Star~

需要的依赖
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency> <dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>4.1.1</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-jmx</artifactId>
<version>4.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.29.Final</version>
</dependency>
Client端代码
package com.example.nettydemo.client;

import com.example.nettydemo.client.codec.*;
import com.example.nettydemo.client.codec.dispatcher.OperationResultFuture;
import com.example.nettydemo.client.codec.dispatcher.RequestPendingCenter;
import com.example.nettydemo.client.codec.dispatcher.ResponseDispatcherHandler;
import com.example.nettydemo.common.RequestMessage;
import com.example.nettydemo.common.string.StringOperation;
import com.example.nettydemo.util.IdUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler; import javax.net.ssl.SSLException;
import java.util.concurrent.ExecutionException; public class Client { public static void main(String[] args) throws InterruptedException, ExecutionException, SSLException { Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class); //客户端连接服务器最大允许时间,默认为30s
bootstrap.option(NioChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000); //10s NioEventLoopGroup group = new NioEventLoopGroup();
try { bootstrap.group(group); RequestPendingCenter requestPendingCenter = new RequestPendingCenter();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.INFO); bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new FrameDecoder());
pipeline.addLast(new FrameEncoder()); pipeline.addLast(new ProtocolEncoder());
pipeline.addLast(new ProtocolDecoder()); pipeline.addLast(new ResponseDispatcherHandler(requestPendingCenter));
pipeline.addLast(new OperationToRequestMessageEncoder()); // pipeline.addLast(loggingHandler); }
}); //连接服务
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888);
//因为future是异步执行,所以需要先连接上后,再进行下一步操作
channelFuture.sync(); long streamId = IdUtil.nextId();
/**
* 发送数据测试,按照定义的规则组装数据
*/
// OrderOperation orderOperation = new OrderOperation(1001, "你好啊,hi");
RequestMessage requestMessage = new RequestMessage(streamId, new StringOperation(1234, "你好啊,hi")); //将future放入center
OperationResultFuture operationResultFuture = new OperationResultFuture();
requestPendingCenter.add(streamId, operationResultFuture); //发送消息
for (int i = 0; i < 10; i++) {
channelFuture.channel().writeAndFlush(requestMessage);
} //阻塞等待结果,结果来了之后会调用ResponseDispatcherHandler去set结果
// OperationResult operationResult = operationResultFuture.get();
// //将结果打印
// System.out.println("返回:"+operationResult); channelFuture.channel().closeFuture().get(); } finally {
group.shutdownGracefully();
} } }
Server端代码
package com.example.nettydemo.server;

import com.example.nettydemo.server.codec.FrameDecoder;
import com.example.nettydemo.server.codec.FrameEncoder;
import com.example.nettydemo.server.codec.ProtocolDecoder;
import com.example.nettydemo.server.codec.ProtocolEncoder;
import com.example.nettydemo.server.handler.MetricsHandler;
import com.example.nettydemo.server.handler.ServerIdleCheckHandler;
import com.example.nettydemo.server.handler.ServerProcessHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor;
import lombok.extern.slf4j.Slf4j; import javax.net.ssl.SSLException;
import java.security.cert.CertificateException;
import java.util.concurrent.ExecutionException; /**
* netty server 入口
*/
@Slf4j
public class Server { public static void main(String... args) throws InterruptedException, ExecutionException, CertificateException, SSLException { ServerBootstrap serverBootstrap = new ServerBootstrap();
//设置channel模式,因为是server所以使用NioServerSocketChannel
serverBootstrap.channel(NioServerSocketChannel.class); //最大的等待连接数量
serverBootstrap.option(NioChannelOption.SO_BACKLOG, 1024);
//设置是否启用 Nagle 算法:用将小的碎片数据连接成更大的报文 来提高发送效率。
//如果需要发送一些较小的报文,则需要禁用该算法
serverBootstrap.childOption(NioChannelOption.TCP_NODELAY, true); //设置netty自带的log,并设置级别
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO)); //thread
//用户指定线程名
NioEventLoopGroup bossGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
NioEventLoopGroup workGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
UnorderedThreadPoolEventExecutor businessGroup = new UnorderedThreadPoolEventExecutor(10, new DefaultThreadFactory("business")); //只能使用一个线程,因GlobalTrafficShapingHandler比较轻量级
NioEventLoopGroup eventLoopGroupForTrafficShaping = new NioEventLoopGroup(0, new DefaultThreadFactory("TS")); try {
//设置react方式
serverBootstrap.group(bossGroup, workGroup); //metrics
MetricsHandler metricsHandler = new MetricsHandler(); //trafficShaping流量整形
//long writeLimit 写入时控制, long readLimit 读取时控制 具体设置看业务修改
GlobalTrafficShapingHandler globalTrafficShapingHandler = new GlobalTrafficShapingHandler(eventLoopGroupForTrafficShaping, 10 * 1024 * 1024, 10 * 1024 * 1024); //log
LoggingHandler debugLogHandler = new LoggingHandler(LogLevel.DEBUG);
LoggingHandler infoLogHandler = new LoggingHandler(LogLevel.INFO); //设置childHandler,按执行顺序放
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("debugLog", debugLogHandler);
pipeline.addLast("tsHandler", globalTrafficShapingHandler);
pipeline.addLast("metricHandler", metricsHandler);
pipeline.addLast("idleHandler", new ServerIdleCheckHandler()); pipeline.addLast("frameDecoder", new FrameDecoder());
pipeline.addLast("frameEncoder", new FrameEncoder());
pipeline.addLast("protocolDecoder", new ProtocolDecoder());
pipeline.addLast("protocolEncoder", new ProtocolEncoder()); pipeline.addLast("infoLog", infoLogHandler);
//对flush增强,减少flush次数牺牲延迟增强吞吐量
pipeline.addLast("flushEnhance", new FlushConsolidationHandler(10, true));
//为业务处理指定单独的线程池
pipeline.addLast(businessGroup, new ServerProcessHandler());//businessGroup,
}
}); //绑定端口并阻塞启动
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync(); channelFuture.channel().closeFuture().sync(); } finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
businessGroup.shutdownGracefully();
eventLoopGroupForTrafficShaping.shutdownGracefully();
} } }

最后

以上介绍了Netty的基本用法,在代码中也做了一部分的关键注释,但可能还会有许多不足,也不可能满足所有人的要求,大家可根据自己的实际需求去改造此项目。附上源码地址netty源码

持续学习,记录点滴。更多文章请访问 文章首发

最新文章

  1. jquery 将disabled的元素置为enabled的三种方法
  2. Hibernate入门注解笔记
  3. Computer Science Theory for the Information Age-2: 高维空间中的正方体和Chernoff Bounds
  4. BZOJ_3039_玉蟾宫_(动态规划+悬线法)
  5. java基于xml配置的通用excel单表数据导入组件(一、实际应用过程)
  6. POJ-3294-Life Forms(后缀数组-不小于 k 个字符串中的最长子串)
  7. Python进阶之路---1.1python简介
  8. 2016 Multi-University Training Contest 2 总结
  9. ASP.NET基础之HttpHandler学习
  10. Educational Codeforces Round 20.C
  11. MyRolan (快速启动小工具)
  12. idea (2018.09) 安装破解mybatis plugin
  13. Ubuntu16.04 + gtx1060 + cuda8.0 + cudnn5.1 + caffe + Theano + Tensorflow
  14. 《Inside C#》笔记(九) 表达式和运算符
  15. LeetCode--458--可怜的小猪
  16. Vue 初始化多个Vue 及之间的相互修改
  17. 浅谈分布式消息技术 Kafka
  18. tar -h 参数
  19. jquery 获取 tagName(JQuery如何得到tagName?)
  20. ubuntu切割mp3文件

热门文章

  1. ypoj 2286 佳佳买菜
  2. scrapy爬虫-scrapy-redis分布式
  3. python下载神通板砖有声版
  4. Java中文件上传路径与路径修改相关问题(tomcat8.0+eclipse)
  5. getBoundingClientRect的实用场景
  6. Eureka停更了?试试Zookpper和Consul
  7. 如何使用Kibana
  8. LeetCode 136、137、260(只出现一次的数,异或性质及应用)
  9. MVC超链接调用控制器内的方法
  10. 建议13:禁用Function构造函数