在上一篇博客(netty入门实现简单的echo程序)中,我们知道了如何使用netty发送一个简单的消息,但是这远远是不够的。在这篇博客中,我们来使用netty发送一个java bean对象的消息,但是要发送对象类型的消息,必然要将java对象进行序列化,在java中序列化框架有很多中,此处我们使用protostuff来进行序列化,不了解protostuff的可以先看一下这篇博客(protostuff序列化)了解一下简单的用法。

需求:

客户端在连接上服务器端时,向服务器发送100个Person对象。

服务器端接收到消息后在控制台上打印即可。

消息发送的协议:4个字节的长度(长度是后面对象的长度,不包含自身的4个字节),后面是实际的要发送的数据

实现思路:

一、服务器端:

1、服务器端先用LengthFieldBasedFrameDecoder进行解码,获取到一个完整的ByteBuf消息

2、然后编写ProtostuffDecoder解码器将上一步解码出来的消息,转换成一个Person对象

3、编写ProtoStuffServerHandler类用于将上一步解码出来的Person对象输出出来

二、客户端

1、编写ProtostuffClientHandler类用于向服务器端发送Person对象

2、编写ProtostuffEncoder来进行将Person对象转换成字节数组

3、借助netty的LengthFieldPrepender来向上一步的字节数组前增加4个字节的消息长度

半包的处理主要是借助netty提交的LengthFieldBasedFrameDecoder来进行处理

注意:

new LengthFieldPrepender(4) ==>  会在发送的数据前增加4个字节表示消息的长度

new LengthFieldBasedFrameDecoder(10240, 0, 4, 0, 4)  ==> 10240表示如果此次读取的字节长度比这个大说明可能是别人伪造socket攻击,将会抛出异常,第一个4表示读取四个字节表示此次 消息的长度,后面一个4表示丢弃四个字节,然后读取业务数据。

实现步骤:

一、引入maven依赖

<strong><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.huan.netty</groupId>
<artifactId>netty-study</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>netty-study</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.7</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.1.7</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-access</artifactId>
<version>1.1.7</version>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-api</artifactId>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-bom</artifactId>
<version>1.4.4</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>
</strong>

 二、编写实体类Person(客户端将会发送这个对象,服务器端接收这个对象)

<strong>@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Person {
private int id;
private String name;
}</strong>

 三、编写ProtostuffDecoder用将ByteBuf中的数据转换成Person对象

<strong>public class ProtostuffDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Schema<Person> schema = RuntimeSchema.getSchema(Person.class);
Person person = schema.newMessage();
byte[] array = new byte[in.readableBytes()];
in.readBytes(array);
ProtobufIOUtil.mergeFrom(array, person, schema);
out.add(person);
}
}</strong>

 四、编写ProtoStuffServerHandler用于将接收到的数据输出到控制台

<strong>@Slf4j
public class ProtoStuffServerHandler extends ChannelInboundHandlerAdapter {
private int counter = 0; /**
* 接收到数据的时候调用
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Person person = (Person) msg;
log.info("当前是第[{}]次获取到客户端发送过来的person对象[{}].", ++counter, person);
} /** 当发生了异常时,次方法调用 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("error:", cause);
ctx.close();
}
}
</strong>

 五、编写netty的服务端,用于启动netty的服务

<strong>@Slf4j
public class NettyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker)//
.channel(NioServerSocketChannel.class)// 对应的是ServerSocketChannel类
.option(ChannelOption.SO_BACKLOG, 128)//
.handler(new LoggingHandler(LogLevel.TRACE))//
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(10240, 0, 4, 0, 4));
ch.pipeline().addLast(new ProtostuffDecoder());
ch.pipeline().addLast(new ProtoStuffServerHandler());
}
});
ChannelFuture future = bootstrap.bind(9090).sync();
log.info("server start in port:[{}]", 9090);
future.channel().closeFuture().sync();
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}</strong>

 此处需要注意解码器的顺序:

    必须要先是LengthFieldBasedFrameDecoder,然后是ProtostuffDecoder,在是ProtoStuffServerHandler

六、编写客户端的handler处理器,用于向服务器端发送Person对象

<strong>@Slf4j
public class ProtostuffClientHandler extends ChannelInboundHandlerAdapter {
/**
* 客户端和服务器端TCP链路建立成功后,此方法被调用
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Person person;
for (int i = 0; i < 100; i++) {
person = new Person();
person.setId(i);
person.setName("张三" + i);
ctx.writeAndFlush(person);
}
} /**
* 发生异常时调用
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("client error:", cause);
ctx.close();
}
}</strong>

 七、编写ProtostuffEncoder编码器,用于将Person对象编码成字节数组

<strong>public class ProtostuffEncoder extends MessageToByteEncoder<Person> {

	@Override
protected void encode(ChannelHandlerContext ctx, Person msg, ByteBuf out) throws Exception {
LinkedBuffer buffer = LinkedBuffer.allocate(1024);
Schema<Person> schema = RuntimeSchema.getSchema(Person.class);
byte[] array = ProtobufIOUtil.toByteArray(msg, schema, buffer);
out.writeBytes(array);
} }</strong>

 八、编写客户端

<strong>@Slf4j
public class NettyClient { public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)//
.channel(NioSocketChannel.class)//
.option(ChannelOption.TCP_NODELAY, true)//
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast(new ProtostuffEncoder());
ch.pipeline().addLast(new ProtostuffClientHandler());
}
});
ChannelFuture future = bootstrap.connect("127.0.0.1", 9090).sync();
log.info("client connect server.");
future.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
</strong>

 注意:此处也需要注意initChannel方法中的编码器加入的顺序

          ProtostuffEncoder->将Person对象转换成字节数组

          LengthFieldPrepender->在上一步的字节数组前加入4个字节的长度

九、启动服务器端和客户端进行测试

最新文章

  1. iOS 8 以后 MKMapView 代理不执行问题
  2. Sprint回顾_团队听诊器
  3. Android --RatingBar的使用
  4. NET 框架基本原理透析⑵
  5. EventBus学习
  6. JSP页面动态联动
  7. Hive内表和外表的区别
  8. Python之路第二天,基础(2)-基本数据类型
  9. USB class总结
  10. linux 定时执行任务
  11. jqmobile
  12. 全网最全ASP.NET MVC 教程汇总
  13. oracle当需要commit
  14. 基于Dubbo的http自动测试工具分享
  15. 解决Error:All flavors must now belong to a named flavor dimension. Learn more at...
  16. 【c语言】分配内存与释放内存
  17. 把button中文字的省略号放到后面
  18. javaweb中的乱码问题
  19. 行为事件(ActionChains)源码详解
  20. 【BZOJ2281】【Sdoi2011】黑白棋 解题报告

热门文章

  1. Linux上使用设置printf显示的颜色
  2. 使用Dockerfile Maven插件
  3. HDU1548 Building Roads
  4. PC端利用Xshell连接Android上的Termux
  5. 解决wampserver无法启动问题
  6. js中针对dom的crud
  7. 虚拟机启动jenkins,访问提示:该Jenkins实例似乎已离线
  8. cgroup之cpu关键参数
  9. [转载]php连接postgreSQL数据库及其操作(php5,postgreSQL9)
  10. 华为云计算IE面试笔记-Fusionsphere架构及组件介绍(服务器虚拟化解决方案)