一、IO模型
- 阻塞式IO模型
- 非阻塞式IO模型
- IO复用
- 信号驱动式IO
- 异步IO
- linux IO流程
2. 各个IO模型的比较
NIO的优势
- 事件驱动模型 避免多线程 单线程处理多任务
- 非阻塞,IO读写不再阻塞,而是返回0
- 基于通道的传输,比基于流更有效率
- 更高级的IO函数,零拷贝
- IO多路复用大大提高了JAVA网络应用的可伸缩性和实用性
NIO的缺点
- 编程困难
- 陷阱重重
3.TCP粘包,拆包问题
问题原因:TCP协议发送时造成
.NETty解决方案:
4.Netty的零拷贝
传统拷贝方式
- 数据从磁盘读取到内核的read buffer
- 数据从内核缓冲区拷贝到用户缓冲区
- 数据从用户缓冲区拷贝到内核的socket buffer
- 数据从内核的socket buffer拷贝到网卡接口(硬件)的缓冲区
零拷贝
- 调用transferTo,数据从文件由DMA引擎拷贝到内核read buffer
- 接着DMA从内核read buffer将数据拷贝到网卡接口buffer
Netty中的零拷贝体现在这三个方面:
1.bytebuffer
Netty发送和接收消息主要使用bytebuffer,bytebuffer使用对外内存(DirectMemory)直接进行Socket读写。
2.Composite Buffers
传统的ByteBuffer,如果需要将两个ByteBuffer中的数据组合到一起,我们需要首先创建一个size=size1+size2大小的新的数组,然后将两个数组中的数据拷贝到新的数组中。但是使用Netty提供的组合ByteBuf,就可以避免这样的操作,因为CompositeByteBuf并没有真正将多个Buffer组合起来,而是保存了它们的引用,从而避免了数据的拷贝,实现了零拷贝。
3.对于FileChannel.transferTo的使用
Netty中使用了FileChannel的transferTo方法,该方法依赖于操作系统实现零拷贝。
二、Netty组件
- Channel – 对应NIO中的channel
- EventLoop --对应NIO中的while循环
- ChannelHandler和ChannelPipline --对应NIO客户逻辑实现handleRead和handleWrite
- ByteBuf --对应Nio中的ByteBuffer
- BootStrap和BootServerStrap --对应NIO中的Selecter、ServerSocketChannel等的创建、配置、启动
Reactor线程模型
Rector线程模型有三种形式:
1.单线程模型:
2.多线程模型:
3.mutiple模型
Netty对这三种模式都有支持
三、简单的例子
1.引入pom包
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>
2.服务器端
public class TimeServer {
public void bind(int port) throws InterruptedException {
EventLoopGroup boosGroup=new NioEventLoopGroup();
EventLoopGroup workerGroup=new NioEventLoopGroup();
try{
ServerBootstrap b=new ServerBootstrap();
b.group(boosGroup,workerGroup)
.channel(NIOServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChildChannelHandler());
//绑定端口,同步等待成功
ChannelFuture future = b.bind(port).sync();
//等待服务端监听关闭
future.channel().closeFuture().sync();
}finally {
//优雅退出,释放线程池资源
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new TimeServer().bind(8888);
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new TimeServerHandler());
}
}
}
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf=(ByteBuf)msg;
byte[] req=new byte[buf.readableBytes()];
buf.readBytes(req);
String body=new String(req,"UTF-8");
System.out.println(body);
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String currentTime=simpleDateFormat.format(new Date());
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.write(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
实战弹幕系统
java:
public class WebSocketDanmuServer {
private int port;
public WebSocketDanmuServer(int port) {
this.port = port;
}
public void run(){
EventLoopGroup bossGroup=new NioEventLoopGroup(1);
EventLoopGroup workGroup=new NioEventLoopGroup(8);
try {
ServerBootstrap b=new ServerBootstrap();
b.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WebsocketDanmuServerInitializer())
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true);
System.out.println("弹幕系统启动了 "+port);
ChannelFuture future = b.bind(port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
workGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new WebSocketDanmuServer(8080).run();
}
}
public class WebsocketDanmuServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("http-decodec",new HttpRequestDecoder());
pipeline.addLast("http-aggregator",new HttpObjectAggregator(65536));
pipeline.addLast("http-encodec",new HttpResponseEncoder());
pipeline.addLast("http-chunked",new ChunkedWriteHandler());
/*
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(64*1024));
pipeline.addLast(new ChunkedWriteHandler());
*/
pipeline.addLast("http-request",new HttpRequestHandler("/ws"));
pipeline.addLast("WebSocket-protocol",new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast("WebSocket-request",new TextWebSocketFrameHandler());
}
}
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private final String wsUri;
private static final File INDEX;
static {
URL location=HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
try {
String path = location.toURI() + "WebsocketDanMu.html";
path = !path.contains("file:") ? path : path.substring(5);
INDEX = new File(path);
} catch (URISyntaxException e) {
throw new IllegalStateException("Unable to locate WebsocketChatClient.html", e);
}
}
public HttpRequestHandler(String wsUri) {
this.wsUri = wsUri;
}
private static void send100Continue(ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
ctx.writeAndFlush(response);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
if(wsUri.equalsIgnoreCase(request.getUri())){
ctx.fireChannelRead(request.retain());
}else {
if(HttpHeaders.is100ContinueExpected(request)){
send100Continue(ctx);
}
RandomaccessFile file = new RandomAccessFile(INDEX, "r");//4
HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");
boolean keepAlive = HttpHeaders.isKeepAlive(request);
if (keepAlive) { //5
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
}
ctx.write(response); //6
if (ctx.pipeline().get(SslHandler.class) == null) { //7
ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
} else {
ctx.write(new ChunkedNioFile(file.getChannel()));
}
ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); //8
if (!keepAlive) {
future.addListener(ChannelFutureListener.CLOSE); //9
}
file.close();
}
}
}
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
Channel incoming=ctx.channel();
for (Channel channel:channels){
if(channel!=incoming){
channel.writeAndFlush(new TextWebSocketFrame(msg.text()));
}else {
channel.writeAndFlush(new TextWebSocketFrame("我发送的 "+msg.text()));
}
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2)
Channel incoming = ctx.channel();
// Broadcast a message to multiple Channels
channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 加入"));
channels.add(incoming);
System.out.println("Client:"+incoming.remoteAddress() +"加入");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3)
Channel incoming = ctx.channel();
// Broadcast a message to multiple Channels
channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 离开"));
System.err.println("Client:"+incoming.remoteAddress() +"离开");
// A closed Channel is automatically removed from ChannelGroup,
// so there is no need to do "channels.remove(ctx.channel());"
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)
Channel incoming = ctx.channel();
System.out.println("Client:"+incoming.remoteAddress()+"在线");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)
Channel incoming = ctx.channel();
System.err.println("Client:"+incoming.remoteAddress()+"掉线");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) // (7)
throws Exception {
Channel incoming = ctx.channel();
System.err.println("Client:"+incoming.remoteAddress()+"异常");
// 当出现异常就关闭连接
cause.printStackTrace();
ctx.close();
}
}
html:
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta name="Keywords" content="danmu">
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>弹幕网站</title>
<style type="text/css">
body {
background: url(http://ot0ak6eri.bkt.clouddn.com/01.jpg); no-repeat:top center;
font-size: 12px;
font-family: "微软雅黑";
}
* {
margin: 0;
padding: 0;
}
/* screen start*/
.screen {
width: 300px;
height: 100px;
background: #669900;
}
.dm {
width: 100%;
height: 100%;
position: absolute;
top: 0;
left: 0;
display: none;
}
.dm .d_screen .d_del {
width: 38px;
height: 38px;
background: #600;
display: block;
text-align: center;
line-height: 38px;
text-decoration: none;
font-size: 20px;
color: #fff;
border-radius: 19px;
border: 1px solid #fff;
z-index: 2;
position: absolute;
right: 20px;
top: 20px;
outline: none;
}
.dm .d_screen .d_del:hover {
background: #F00;
}
.dm .d_screen .d_mask {
width: 100%;
height: 100%;
background: #000;
position: absolute;
top: 0;
left: 0;
opacity: 0.6;
filter: alpha(opacity = 60);
z-index: 1;
}
.dm .d_screen .d_show {
position: relative;
z-index: 2;
}
.dm .d_screen .d_show div {
font-size: 26px;
line-height: 36px;
font-weight: 500;
position: absolute;
top: 76px;
left: 10;
color: #fff;
}
/*end screen*/
/*send start*/
.send {
width: 100%;
height: 76px;
position: absolute;
bottom: 0;
left: 0;
border: 1px solid red;
}
.send .s_filter {
width: 100%;
height: 76px;
background: #000;
position: absolute;
bottom: 0;
left: 0;
opacity: 0.6;
filter: alpha(opacity = 60);
}
.send .s_con {
width: 100%;
height: 76px;
position: absolute;
top: 0;
left: 0;
z-index: 2;
text-align: center;
line-height: 76px;
}
.send .s_con .s_text {
width: 800px;
height: 36px;
border: 0;
border-radius: 6px 0 0 6px;
outline: none;
}
.send .s_con .s_submit {
width: 100px;
height: 36px;
border-radius: 0 6px 6px 0;
outline: none;
font-size: 14px;
color: #fff;
background: #65c33d;
font-family: "微软雅黑";
cursor: pointer;
border: 1px solid #5bba32;
}
.send .s_con .s_submit:hover {
background: #3eaf0e;
}
/*end send*/
</style>
</head>
<body>
<a href="#" id="startDm">开启弹幕</a>
<!-- dm start -->
<div class="dm">
<!-- d_screen start -->
<div class="d_screen">
<a href="#" class="d_del">X</a>
<div class="d_mask"></div>
<div class="d_show">
</div>
</div>
<!-- end d_screen -->
<!-- send start -->
<div class="send">
<div class="s_filter"></div>
<div class="s_con">
<input type="text" class="s_text" /> <input type="button"
value="发表评论" class="s_submit" id="btn"/>
</div>
</div>
<!-- end send -->
</div>
<!-- end dm-->
<script type="text/JavaScript"
src="http://ajax.aspnetcdn.com/ajax/jQuery/jquery-1.8.0.js"></script>
<script type="text/javascript" >
String.prototype.endWith=function(str){
if(str==null||str==""||this.length==0||str.length>this.length)
return false;
if(this.substring(this.length-str.length)==str)
return true;
else
return false;
return true;
}
String.prototype.startWith=function(str){
if(str==null||str==""||this.length==0||str.length>this.length)
return false;
if(this.substr(0,str.length)==str)
return true;
else
return false;
return true;
}
</script>
<!--<script type="text/javascript" src="websocket.js"></script>-->
<script type="text/javascript">
$(function() {
$("#startDm,.d_del").click(function() {
$("#startDm,.dm").toggle(1000);
//init_screen();
});
$("#btn").click(function(){
send();
});
$(".s_text").keydown(function() {
var code = window.event.keyCode;
if (code == 13)//回车键按下时,输出到弹幕
{
send();
}
});
});
function launch()
{
var _height = $(window).height();
var _left = $(window).width() - $("#"+index).width();
var time=10000;
if(index%2==0)
time=20000;
_top+=80;
if(_top>_height-100)
_top=80;
$("#"+index).css({
left:_left,
top:_top,
color:getRandomColor()
});
$("#"+index).animate({
left:"-"+_left+"px"},
time,
function(){});
index++;
}
/* //初始化弹幕
function init_screen() {
var _top = 0;
var _height = $(window).height();
$(".d_show").find("div").show().each(function() {
var _left = $(window).width() - $(this).width();
var time=10000;
if($(this).index()%2==0)
time=20000;
_top+=80;
if(_top>_height-100)
_top=80;
$(this).css({
left:_left,
top:_top,
color:getRandomColor()
});
$(this).animate({
left:"-"+_left+"px"},
time,
function(){});
});
} */
//随机获取颜色值
function getRandomColor() {
return '#' + (function(h) {
return new Array(7 - h.length).join("0") + h
})((Math.random() * 0x1000000 << 0).toString(16))
}
</script>
<script type="text/javascript">
var websocket=null;
var _top=80;
var index=0;
var host=window.location.host;
//判断当前浏览器是否支持WebSocket
if('WebSocket' in window){
websocket=new WebSocket("ws://"+host+"/ws");
}
else{
alert("Not Support WebSocket!");
}
//连接发生错误的回调方法
websocket.onerror = function(){
setMessageInnerHTML("error");
};
//连接成功建立的回调方法
websocket.onopen = function(event){
setMessageInnerHTML("open");
}
//接收到消息的回调方法
// 收到服务器发送的消息
websocket.onmessage = function(){
setMessageInnerHTML(event.data);
}
//连接关闭的回调方法
websocket.onclose = function(){
setMessageInnerHTML("close");
}
//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function(){
websocket.close();
}
//将消息显示在网页上
function setMessageInnerHTML(innerHTML){
//修改背景图
var imgurl;
if (innerHTML.startWith("~background,")) {
var cmd = innerHTML;
imgurl = cmd.split(",")[1];
document.body.style.background = "url("+imgurl+")";
}else{
$(".d_show").Append("<div id='"+index+"'>"+ innerHTML + "</div>");
}
launch();
}
//发送消息
function send(){
//var message = document.getElementById('text').value;
var message = $(".s_text").val();
$(".s_text").val("");
websocket.send(message);
}
</script>
</body>
</html>