关于netty的多个handler链式模式
2024-08-24 01:34:57
1. 老规矩, 引入我们喜闻乐见的maven依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
2. 服务端
2.1: 服务端引导类:
public class EchoServer { private int port; private EchoServer(int port) {
this.port = port;
} private void start() throws Exception {
System.out.println("Echo Server Start");
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoOutboundHandler1());
ch.pipeline().addLast(new EchoOutboundHandler2()); ch.pipeline().addLast(new EchoInboundHandler1());
ch.pipeline().addLast(new EchoInboundHandler2());
}
});
ChannelFuture f = b.bind().sync();
System.out.println("Server Start Listen At: " + port);
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
} public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new EchoServer(port).start();
}
}
2.2 EchoOutboundHandler1
public class EchoInboundHandler1 extends ChannelInboundHandlerAdapter { @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 读取msg中的数据
ByteBuf result = (ByteBuf) msg;
byte[] bytes = new byte[result.readableBytes()];
result.readBytes(bytes);
String resultStr = new String(bytes);
System.out.println("Server Received: " + resultStr + ": Inbound 1 Is OK"); // 处理完msg中的数据后往msg中重新存放新的数据继续传递
result.writeBytes(resultStr.getBytes());
ctx.fireChannelRead(result);
} }
2.3 EchoInboundHandler2
public class EchoInboundHandler2 extends ChannelInboundHandlerAdapter { @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 读取msg中的数据
ByteBuf result = (ByteBuf) msg;
byte[] bytes = new byte[result.readableBytes()];
result.readBytes(bytes);
String resultStr = new String(bytes);
System.out.println("Server Received: " + resultStr + ": Inbound 2 Is OK"); // 处理完msg中的数据后往msg中重新存放新的数据继续传递
result.writeBytes(resultStr.getBytes());
ctx.write(result);
} }
2.4 EchoOutboundHandler2
public class EchoOutboundHandler2 extends ChannelOutboundHandlerAdapter { @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// 读取msg中的数据
ByteBuf result = (ByteBuf) msg;
byte[] bytes = new byte[result.readableBytes()];
result.readBytes(bytes);
String resultStr = new String(bytes);
System.out.println("Server Received: " + resultStr + ": Outbound 2 Is OK"); // 处理完msg中的数据后往msg中重新存放新的数据继续传递
result.writeBytes(resultStr.getBytes());
ctx.write(result);
}
}
2.5 EchoOutboundHandler1
public class EchoOutboundHandler1 extends ChannelOutboundHandlerAdapter { @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// 读取msg中的数据
ByteBuf result = (ByteBuf) msg;
byte[] bytes = new byte[result.readableBytes()];
result.readBytes(bytes);
String resultStr = new String(bytes);
System.out.println("Server Received: " + resultStr + ": Outbound 1 Is OK"); // 处理完msg中的数据后往msg中重新存放新的数据继续传递
result.writeBytes(resultStr.getBytes());
ctx.write(result);
ctx.flush();
}
}
3. 客户端
public class EchoClient { private String host;
private int port; private EchoClient(String host, int port) {
this.host = host;
this.port = port;
} private void start() throws Exception {
System.out.println("Echo Client Start");
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture f = b.connect().sync();
System.out.println("Server Client Listen IP: [" + host + ":" + port + "]");
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
} public static void main(String[] args) throws Exception {
String host = "127.0.0.1";
int port = 8080;
int len = 2;
if (args.length == len) {
host = args[0];
port = Integer.parseInt(args[1]);
}
new EchoClient(host, port).start();
} }
public class EchoClientHandler extends ChannelInboundHandlerAdapter { @Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty Rocks!", CharsetUtil.UTF_8));
} @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 读取msg中的数据
ByteBuf result = (ByteBuf) msg;
byte[] bytes = new byte[result.readableBytes()];
result.readBytes(bytes);
String resultStr = new String(bytes);
System.out.println("Echo Client Received Is OK: " + resultStr);
ctx.close();
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
4. 结果:
服务端:
客户端:
5. 注意事项:
5.1. ChannelInboundHandler之间的传递, 通过调用 ctx.fireChannelRead(msg) 实现; 调用ctx.write(msg) 将传递到ChannelOutboundHandler
5.2. ctx.write()方法执行后, 需要调用flush()方法才能令它立即执行
5.3. ChannelOutboundHandler 在注册的时候需要放在最后一个ChannelInboundHandler之前, 否则将无法传递到ChannelOutboundHandler
最新文章
- AEAI WM V1.5.0 升级说明,开源工作管理系统
- [.net 面向对象程序设计深入](5)MVC 6 —— 构建跨平台.NET开发环境(Windows/Mac OS X/Linux)
- 解决问题E: 无法获得锁 /var/lib/dpkg/lock - open (11: 资源暂时不可用) E: 无法锁定管理目录,
- 页面跳转Transfer与Redirect的区别你知道吗?
- [原创]cocos2d-x研习录-第二阶 概念类之节点类(CCNode)
- Spring-framework下载
- 【读书笔记】读《JavaScript模式》 - 函数复用模式之现代继承模式
- jQuery extend() &; jQuery.fn.extend(),插件编写
- 错误描述: 抱歉,该商品的交易金额与原先的不一致,请重新创建交易付款。 错误代码: TRADE_TOTALFEE_NOT_MATCH
- Xcode:Foundation框架找不到,或者是自动提示出现问题
- unity shader在小米2s上的问题
- 深度解析EM菌
- 浅谈HAL
- [转帖]ExtJs与服务器的交互(一)
- selendroid项目实战3 selendroid driver初始化失败问题
- [Javascript] Promise
- php 迭代器与和生成器
- 安卓activity捕获返回button关闭应用的方法
- [三]java8 函数式编程Stream 概念深入理解 Stream 运行原理 Stream设计思路
- Eureka开启登录认证