服务端

package com.mypractice.netty.server;

import java.net.InetSocketAddress;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel; public class EchoServer {
private final int port; public EchoServer(int port) {
this.port = port;
} public static void main(String[] args) throws Exception { int port = Integer.parseInt("8888");
new EchoServer(port).start();
} public void start() throws Exception {
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(boss,worker)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new CostomServerChannelInitializer())
//设置高低水位
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 1)
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 4); //绑定监听
ChannelFuture f = b.bind().sync();
System.out.println("正在监听...");
f.channel().closeFuture().sync();
} finally {
//关闭EventLoopGroup,释放资源
boss.shutdownGracefully().sync();
worker.shutdownGracefully().sync();
}
}
}
package com.mypractice.netty.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel; public class CostomServerChannelInitializer extends ChannelInitializer<SocketChannel> { protected void initChannel(SocketChannel ch) throws Exception {
// EchoServerHandler被标注为@Shareable,所以我们可以总是使用同样的实例
ch.pipeline().addLast(new EchoServerHandler());
} }
package com.mypractice.netty.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.util.CharsetUtil; public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Server 受到client连接:"+ctx.toString());
System.out.println(ctx.channel().config().getWriteBufferHighWaterMark());
System.out.println(ctx.channel().config().getWriteBufferLowWaterMark());
int size = ctx.channel().unsafe().outboundBuffer().size();
System.out.println("size1:" + size);
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
in.writeBytes("hello iam client".getBytes());
System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
ctx.write(in);
} @Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);//将未决消息冲刷到远程节点,并且关闭该Channel
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
System.out.println("channge:" +ctx.channel().isWritable());
int size = ctx.channel().unsafe().outboundBuffer().size();
System.out.println("size:" + size);
super.channelWritabilityChanged(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

客户端

package com.mypractice.netty.client;

import java.net.InetSocketAddress;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; public class EchoClient {
private final String host;
private final int port; public EchoClient(String host, int port) {
this.host = host;
this.port = port;
} public void start() throws Exception {
EventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(worker)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new CostomChannelInitializer()); ChannelFuture f = b.connect().sync(); // 连接到远程节点,阻塞等待直到连接完成
f.channel().closeFuture().sync(); // 阻塞,直到Channel关闭
System.out.println("client close");
} finally {
worker.shutdownGracefully().sync(); // 关闭线程池并且释放所有的资源
}
} public static void main(String[] args) throws Exception {
// if (args.length != 2) {
// System.err.println(
// "Usage: " + EchoClient.class.getSimpleName() +
// " <host> <port>");
// return;
// } String host = "127.0.0.1";// args[0];
int port = 8888;// Integer.parseInt(args[1]);
new EchoClient(host, port).start();
}
}
package com.mypractice.netty.client;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel; public class CostomChannelInitializer extends ChannelInitializer<SocketChannel> { @Override
protected void initChannel(SocketChannel sc) throws Exception {
ChannelPipeline pipeline = sc.pipeline();
pipeline.addLast(new EchoClientHandler());
} }
package com.mypractice.netty.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil; //@Sharable //⇽--- 标记该类的实例可以被多个Channel共享
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
/**
* 收到链接时出发
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("rock!");
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", // ⇽--- 当被通知Channel是活跃的时候,发送一条消息
CharsetUtil.UTF_8));
}
/**
* 收到数据时触发
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// TODO Auto-generated method stub
ByteBuf in = (ByteBuf) msg;
System.out.println("Client ##received: " + in.toString(CharsetUtil.UTF_8));
super.channelRead(ctx, msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, // ⇽--- 在发生异常时,记录错误并关闭Channel
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

最新文章

  1. jQuery-1.9.1源码分析系列(四) 缓存系统
  2. Java Integer(-128~127)值的==和equals比较产生的思考
  3. Windows Azure Virtual Machine (32) 如何在Windows操作系统配置SFTP
  4. Good Bye 2013 A
  5. CSS3—三角形
  6. Generate GUID using vbscript
  7. table 自动换行
  8. n个数的最大公约、最小公倍数
  9. Java之GUI编程(一)
  10. python(序列递归)【输出原子级别元素。。。】
  11. nth-child 和nth-type的区别
  12. Python OptionParser 使用详解(转载)
  13. sieve的objective-c实现
  14. Spring Boot 整合 docker
  15. 亚马逊(Review、Feedback)差评怎么处理?
  16. 【开发】iOS入门 - XCode快捷键
  17. redis 中的key值过期后,触发通知事件
  18. 网络操作系统 第六章 Window Server 2008 活动目录
  19. Linux 常用命令 | mkdir/rmdir/touch 的使用
  20. Arduino 开关控制小灯持续亮之具体思路

热门文章

  1. iOS开发系列-LLVM、Clang
  2. centos lamp笔记
  3. Redhat镜像-RHEL-官方镜像下载大全
  4. Navicat Premium下载、安装、破解
  5. mysql双主热备
  6. redis和ehcache的区别,存储方式(各属于内存还是外存)
  7. day04 - 02 linux简单的操作命令
  8. js--判断当前环境是否为iphonex环境
  9. oxyplot 禁止拖动,缩放
  10. (1)mysql数据库操作