客户端:根据 长度+数据 方式发送

package com.server;

import java.net.Socket;
import java.nio.ByteBuffer; public class Client { public static void main(String[] args) throws Exception {
Socket socket = new Socket("127.0.0.1", 10101); String message = "hello"; byte[] bytes = message.getBytes(); ByteBuffer buffer = ByteBuffer.allocate(4 + bytes.length);
buffer.putInt(bytes.length);//netty是write,ByteBuffer是nio的,所以用put。
buffer.put(bytes); byte[] array = buffer.array(); for(int i=0; i<5; i++){
socket.getOutputStream().write(array);
} socket.close();
} }

服务端:根据 长度+数据 接收解码

package com.server;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder; public class Server { public static void main(String[] args) {
//服务类
ServerBootstrap bootstrap = new ServerBootstrap();
//boss线程监听端口,worker线程负责数据读写
ExecutorService boss = Executors.newCachedThreadPool();
ExecutorService worker = Executors.newCachedThreadPool();
//设置niosocket工厂
bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
//设置管道的工厂
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", new MyDecoder());
pipeline.addLast("handler1", new HelloHandler());
return pipeline;
}
});
bootstrap.bind(new InetSocketAddress(10101));
System.out.println("start!!!");
}
}
package com.server;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder; public class MyDecoder extends FrameDecoder {
@Override //FrameDecoder的decode方法
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
//buffer是netty的ChannelBuffer
if(buffer.readableBytes() > 4){ //必须大于基本的最短长度4个字节
if(buffer.readableBytes() > 2048){ //防止socket攻击,缓存很大,所以限制数据的长度不能大于2048,
buffer.skipBytes(buffer.readableBytes());
}
//标记
buffer.markReaderIndex();
//长度
int length = buffer.readInt();
//buffer里面剩余的数据小于长度
if(buffer.readableBytes() < length){
//前面做了标记,这里可以还原
buffer.resetReaderIndex();
//缓存当前剩余的buffer数据,等待剩下数据包到来
return null;
}
//大于长度,开始读数据
byte[] bytes = new byte[length];
buffer.readBytes(bytes);
//往下传递对象给HelloHandler,这次的buffer处理完了,后面在来buffer的时候FrameDecoder会帮我们循环读取,
return new String(bytes);
}
//缓存当前剩余的buffer数据,等待剩下数据包到来(FrameDecoder帮我们实现的),
return null;
}
}
package com.server;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler; public class HelloHandler extends SimpleChannelHandler { private int count = 1;//单线程的没有并发问题 @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
System.out.println(e.getMessage() + " " +count);
count++;
}
}
2、看下粘包和分包是怎么样一个情况:

粘包是因为服务端和客户端没有约定好数据结构,发送100个hello,粘包就是100个hello全在一起服务端分不开。分包就是服务端把100个hello随机的分开。

定义一个稳定的结构:最简单结构:length + hello
根据长度+数据会有攻击:
,把长度定义的很大Intger.max,这种数据包,会导致一直在缓存,把缓存撑的很大。通常被称为socket攻击,字节流式攻击。
所以要 包头+长度+数据,前面加包头做标识。大于2048就清除,然后读取到包头就表示数据的开始。 心中会有连个疑惑(FrameDecoder源码分析):
1、为什么FrameDecoder return的对象就是往下传递的对象 (还是调用了sendUpstream)
2、buffer里面数据未被读取完怎么办? (cumulation缓存)
3、为什么return null就可以缓存buffer (cumulation缓存)
3、FrameDecoder里面的cumulation其实就是一个缓存的buffer对象 public abstract class FrameDecoder extends SimpleChannelUpstreamHandler //事件进入管道以后进入第一个handler的messageReceived方法,
protected ChannelBuffer cumulation;//缓存的buffer对象
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Object m = e.getMessage();
if (!(m instanceof ChannelBuffer)) {//判断是不是一个ChannelBuffer,不是就不管他了,
ctx.sendUpstream(e);//传给下一个handler
return;
}
ChannelBuffer input = (ChannelBuffer) m;//是ChannelBuffer,转成ChannelBuffer对象
if (!input.readable()) {//没有数据读,也不管了。return
return;
}
if (cumulation == null) {//如果renturn null还有数据,就缓存在buffer里面
try {
// the cumulation buffer is not created yet so just pass the input to callDecode(...) method
callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
} finally {
updateCumulation(ctx, input);
}
} else {
input = appendToCumulation(input);
try {
callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
} finally {
updateCumulation(ctx, input);
}
}
}

最新文章

  1. 4. web前端开发分享-css,js工具篇
  2. sass/scss 和 less的区别
  3. 《More Effective C#》读书笔记
  4. JSON生成c#类代码小工具
  5. javascript基础语法——变量和标识符
  6. ios开发FMDB导入SQLCipher加密数据库
  7. Hark的数据结构与算法练习之图书馆排序
  8. TI CC2541 BLE协议栈蓝牙MAC 地址
  9. 微信支付开发(4) 动态链接Native支付
  10. Windows7隐藏字体
  11. javascript获取div高度
  12. android自定义控件,其三个父类构造方法有什么区别
  13. iptables中DNAT的配置方法
  14. ubuntu连接android设备(附最简单方法)
  15. github的markdown页内锚点以及本地资源链接
  16. python列表的基础操作
  17. [数学]MIT牛人解说数学体系
  18. 判断网络远端服务器是否断开连接(心跳连接:socket.sendUrgentData)
  19. 根据ip获取地点
  20. python基础学习8----文件基本操作

热门文章

  1. js自定义双击函数
  2. Linux命令之乐--md5sum
  3. 自定义控件_VIewPager显示多个Item
  4. Zabbix-3.0.0 安装Graphtree
  5. java解析xml字符串(用dom4j)
  6. sort命令与cat区别
  7. 170413、怎么精确理解leader布置的任务(技术外的话)
  8. 粘性会话 session affinity sticky session requests from the same client to be passed to the same server in a group of servers
  9. python celery rabbitmq--- pypi image from ustc
  10. echarts容器动态设置高度