Netty编解码技术是什么意思呢?所谓的编解码技术,说白了就是java序列化技术。序列化有两个目的:

1、进行网络传输
2、对象持久化

虽然我们可以使用java进行序列化,Netty去传输。但是java序列化的硬伤太多,比如java的序列化无法跨平台、序列化后码流太大、序列化性能非常低等等...

码流太大是什么意思呢?比如说原先的我一篇文档,比如说大小是1M,序列化完了之后可能0.5M,序列化减少二分之一的码,比较大。然后0.5M去网络传输这个不太好。你比如说用其它的一些主流序列化的话可能就0.01M,非常小。性能非常好。

性能太低就是说,我用java序列化的过程可能需要10s,而用其它的高性能序列化可能0.1s。差距就是这么的大。

序列化的目的无非就是网络传输。而目前主流的序列化框架有以下几种:

1、JBoss的Marshalling
2、Google的Protobuf
3、基于Protobuf的Kyro
4、MessagePack框架

其实我们主要是讲Marshalling和Google的Protobuf。这两个是业界非常好用的框架。其中JBoss的Marshalling速度还要比Google的Protobuf要快,原因是因为Marshalling不是跨语言,两端都是java与java之间相互传输的。因此,在这种情况下我们就用它就行了。但如果你想实现跨语言,比如这边是c#,另一边是java。这种跨语言进行通信传输的话,那你就需要用到Google的Protobuf来进行跨语言的传输。性能也非常高。而且它自己有一些大端小端的优化机制。

下面开始Marshalling编码实现。

首先新建一个java工程,导入netty和jboss-marshalling的jar包,导入几张图片到sources文件夹以便测试。

新建一个Req类,并编写相关代码

 package com.it448.serial;

 import java.io.Serializable;

 public class Req implements Serializable{
private static final long serialVersionUID = 1L; private String id ;
private String name ;
private String requestMessage ;
private byte[] attachment; public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getRequestMessage() {
return requestMessage;
}
public void setRequestMessage(String requestMessage) {
this.requestMessage = requestMessage;
}
public byte[] getAttachment() {
return attachment;
}
public void setAttachment(byte[] attachment) {
this.attachment = attachment;
}
}

新建一个Resp类,并编写相关代码

 package com.it448.serial;

 import java.io.Serializable;

 public class Resp implements Serializable{

     private static final long serialVersionUID = 1L;

     private String id;
private String name;
private String responseMessage; public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getResponseMessage() {
return responseMessage;
}
public void setResponseMessage(String responseMessage) {
this.responseMessage = responseMessage;
}
}

新建一个工具类GzipUtils,方便调用

 package com.it448.utils;

 import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream; public class GzipUtils {
public static byte[] gzip(byte[] data) throws Exception{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(bos);
gzip.write(data);
gzip.finish();
gzip.close();
byte[] ret = bos.toByteArray();
bos.close();
return ret;
} public static byte[] ungzip(byte[] data) throws Exception{
ByteArrayInputStream bis = new ByteArrayInputStream(data);
GZIPInputStream gzip = new GZIPInputStream(bis);
byte[] buf = new byte[1024];
int num = -1;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
while((num = gzip.read(buf, 0 , buf.length)) != -1 ){
bos.write(buf, 0, num);
}
gzip.close();
bis.close();
byte[] ret = bos.toByteArray();
bos.flush();
bos.close();
return ret;
} public static void main(String[] args) throws Exception{ // 读取文件
String readPath = System.getProperty("user.dir") + File.separatorChar + "sources" + File.separatorChar + "006.jpg";
File file = new File(readPath);
FileInputStream in = new FileInputStream(file);
byte[] data = new byte[in.available()];
in.read(data);
in.close(); System.out.println("文件原始大小:" + data.length);
// 测试压缩 byte[] ret1 = GzipUtils.gzip(data);
System.out.println("压缩之后大小:" + ret1.length); byte[] ret2 = GzipUtils.ungzip(ret1);
System.out.println("还原之后大小:" + ret2.length); // 写出文件
String writePath = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar + "006.jpg";
FileOutputStream fos = new FileOutputStream(writePath);
fos.write(ret2);
fos.close();
}
}

新建一个Marshalling工厂类MarshallingCodeCFactory.java

 package com.it448.serial;

 import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider; import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration; /**
* Marshalling工厂
* @author(xyh)
* @since 2019-06-12
*/
public final class MarshallingCodeCFactory { /**
* 创建Jboss Marshalling解码器MarshallingDecoder
* @return MarshallingDecoder
*/
public static MarshallingDecoder buildMarshallingDecoder() {
// 首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
// 创建了MarshallingConfiguration对象,配置了版本号为5
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
// 根据marshallerFactory和configuration创建provider
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
// 构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
return decoder;
} /**
* 创建Jboss Marshalling编码器MarshallingEncoder
* @return MarshallingEncoder
*/
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
// 构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
}

新建一个服务端的Handler类ServerHandler.java

 package com.it448.serial;

 import java.io.File;
import java.io.FileOutputStream; import com.it448.utils.GzipUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; public class ServerHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
} @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Req req = (Req)msg;
System.out.println("Server : " + req.getId() + ", " + req.getName() + ", " + req.getRequestMessage());
byte[] attachment = GzipUtils.ungzip(req.getAttachment()); String path = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar + "001.jpg";
FileOutputStream fos = new FileOutputStream(path);
fos.write(attachment);
fos.close(); Resp resp = new Resp();
resp.setId(req.getId());
resp.setName("resp" + req.getId());
resp.setResponseMessage("响应内容" + req.getId());
ctx.writeAndFlush(resp);
} @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}

新建一个服务端类Server.java

 package com.it448.serial;

 import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler; public class Server {
public static void main(String[] args) throws Exception{
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
// 设置日志
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ServerHandler());
}
}); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}

新建一个客户端Handler类ClientHandler.java

 package com.it448.serial;

 import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil; public class ClientHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
} @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
Resp resp = (Resp)msg;
System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());
} finally {
ReferenceCountUtil.release(msg);
}
} @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}

新建一个客户端类Client.java

 package com.it448.serial;

 import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import java.io.File;
import java.io.FileInputStream; import com.it448.utils.GzipUtils; public class Client {
public static void main(String[] args) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ClientHandler());
}
}); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); for(int i = 0; i < 1000; i++ ){
Req req = new Req();
req.setId("" + i);
req.setName("pro" + i);
req.setRequestMessage("数据信息" + i);
String path = System.getProperty("user.dir") + File.separatorChar + "sources" + File.separatorChar + "001.jpg";
File file = new File(path);
FileInputStream in = new FileInputStream(file);
byte[] data = new byte[in.available()];
in.read(data);
in.close();
req.setAttachment(GzipUtils.gzip(data));
cf.channel().writeAndFlush(req);
} cf.channel().closeFuture().sync();
group.shutdownGracefully();
}
}

代码测试

首先启动服务端,也就是运行Server类的main方法。

然后启用客户端,也就是运行Client类的main方法。

测试结果

从图中可以看到,receive文件夹多了一张001.jpg的图片。说明图片已经传输过来了。

好了,这部分内容就讲到这里,送上今天的福利:三套Netty系列教程【价值600】,加wxhaox就可以领取。当然了,对应netty有任何疑问也都可以咨询!!

end -- 1560313059

  -- 学而不思则罔,思而不学则殆

最新文章

  1. Over:窗口函数(滑动聚合)
  2. javascript中的窗口和框架
  3. 关于C/S模式开发的学习笔记
  4. 【转】25个必须记住的SSH命令
  5. SharePoint 2013 新建项目字段自动加载上次保存值
  6. sql语句原则
  7. [LeetCode]题解(python):104 Maximum Depth of Binary Tree
  8. 2-sat(and,or,xor)poj3678
  9. Hanganalyze 使用
  10. 一次java程序的重构
  11. [置顶] hdu4747 Mex 线段树
  12. JSP page指令
  13. android开发过程中踩过的坑
  14. python3 树莓派 + usb摄像头 做颜色识别 二维码识别
  15. yagmail 实现发邮件
  16. Python yield使用浅析
  17. 菜鸟nginx源代码剖析数据结构篇(八) 缓冲区链表ngx_chain_t
  18. SVN上拖下来的项目,缺少build path怎么办?
  19. 没有什么,开发ASP.NET时随便写写,想到什么写什么
  20. Disconf 学习系列之Disconf是什么?

热门文章

  1. UEditor中多图上传的bug
  2. ios之UISlider
  3. PHP必知必会
  4. APIO2019&amp;&amp;THUSC2019游记
  5. WINDOWS下使用Mysql 中碰到的问题记录
  6. golang 实现冒泡排序
  7. RabbitMQ 初体验
  8. 使用 HTML5 Geolocation 构建基于地理位置的 Web 应用学习网站分享
  9. git 的右键快捷菜单恢复
  10. 黑匣子_NOI导刊2010提高 (对顶堆)