FastThreadLocal快在哪

ThreadLocal原理

ThreadLocal本质上就是在Thread对象中维护了一个Map对象,Map的key为ThreadLocal自身,value为要存的值。将数据保存在Thread中而不是保存在ThreadLocal中,这一设计使得当线程对象销毁时,在该线程的所有变量都会随之销毁。以ThreadLocal对象为key,使得一个Thread对象可以同时被多个ThreadLocal对象使用,每个ThreadLocal对应一个值,保存在Thread对象的Map中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 该Map在Thread对象中,由ThreadLocal对象维护
ThreadLocal.ThreadLocalMap threadLocals = null;

public void set(T value) {
Thread t = Thread.currentThread();
// 获取该线程的Map
ThreadLocalMap map = getMap(t);
// 将当前ThreadLocal和值存入Map
if (map != null)
map.set(this, value);
else
createMap(t, value);
}

public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}

FastThreadLocal优化点

由于ThreadLocal使用的是hash map数据结构,其查找需要通过hash值计算索引,再通过索引查找值,其速度比数组结构要慢。所以Netty提供了一种数组结构的ThreadLocal,也就是FastThreadLocal。其实现原理是在Thread对象中维护了一个InternalThreadLocalMap对象,该InternalThreadLocalMap对象内部维护了一个数组,数组中保存要存的值,不保存对应的FastThreadLocal对象,而是每个FastThreadLocal自己维护该值在数组中对应的索引。一个FastThreadLocal所保存值对应的索引在其创建的时候已经确定,其所保存的所有Thread都应用该索引位置。

1
2
3
4
5
6
// 创建FastThreadLocal,确定其所对应的数组索引。
// 该值是静态原子类型的,所有ThreadLocal对象共享,所以不会出现索引冲突值被覆盖等问题。
static final AtomicInteger nextIndex = new AtomicInteger();
public FastThreadLocal() {
index = InternalThreadLocalMap.nextVariableIndex();
}

Netty编解码器

ByteToMessageCodec

字节转换成消息对象的编解码器。实现了ChannelInboundHandlerAdapter和ChannelOutboundHandler,说明无论是对外还是对内都会经过该Handler。ByteToMessageCodec内部维护encoder和decoder两个对象,负责编码和解码。对于编解码器只需要处理read和write两种请求,当发生read时会将请求传递给decoder对象去解码,当发生write时会将请求传递给encoder对象编码。

1
2
3
4
5
6
7
8
9
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
decoder.channelRead(ctx, msg);
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
encoder.write(ctx, msg, promise);
}

再来看看encoder和decoder这两个对象,encoder是一个MessageToByteEncoder消息对象转换成字节的编码器,decoder是一个ByteToMessageDecoder字节转换成消息对象的解码器。对外的请求会经过Encoder编码器编码成字节数据,而读取远端的数据则会经过Decoder解码成消息对象再传递给上层。所以上层在设置读取消息的Handler时必须设置在编解码器之后。

1
2
3
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {}

public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {}

Pipeline之Handler链

ChannelPipeline采用责任链模式设计。在责任链模式中,由每个对象对其下个对象的引用而连接起来的一条链,请求在这条链上传递,知道链上某个对象处理该请求并中止传递。发送请求者并不知道链上的哪个对象处理该请求,从而使得系统可以在不影响发送端的情况下动态的重新分配责任。
Netty-pipeline责任链

DefaultChannelPipeline

Handler链的默认实现类,维护了链中的头结点和尾结点,DefaultChannelPipeline对象在构建时,就会创建头结点和尾结点对象。之后添加的Handler都会添加到两者中间,addLast会添加到尾结点的前面,addFirst会添加到头结点的后面。在添加时可以给Handler指定名称,如果名称有重复会抛出异常。
Handler分为两种,一种是实现ChannelInboundHandler接口的,一种是实现ChannelOutboundHandler接口的。ChannelInboundHandler接口主要处理从外到内的请求,将远端操作通知本地(比如注册,激活,读等),从Head结点开始传递,直到tail结点。而ChannelOutboundHandler接口主要处理从内到外的操作,将本地操作通知远端(比如连接,绑定,写)等,从tail结点开始传递,到Head结点后转由unsafe类进行真正处理。

1
2
tail = new TailContext(this); // tail实现了ChannelInboundHandler接口
head = new HeadContext(this); // head实现了ChannelOutboundHandler和ChannelInboundHandler接口

上文讲到的ServerBootstrapAcceptor类,就是一个ChannelInboundHandlerAdapter类型的Handler。所以当连接完成对上层发出通知时,通知从Head结点开始传递,到达ServerBootstrapAcceptor中处理并中止传递。

AbstractChannelHandlerContext

AbstractChannelHandlerContext是对ChannelHandler的一个包装类,Pipeline中真正的对象实际上是AbstractChannelHandlerContext。在AbstractChannelHandlerContext中维护了next、prev属性,所以这是一个双向链表结构。

1
2
3
volatile AbstractChannelHandlerContext next; // 下个结点
volatile AbstractChannelHandlerContext prev; // 上个结点
private final ChannelHandler handler; // 真正Handler对象

接下来讲到的编解码器也是根据Pipeline来实现的。

ByteBuf

ByteBuf

为了简化ByteBuffer的操作,Netty自定义了ByteBuf,相对于ByteBuffer的读写索引共用,Netty将其分开,定义了读索引和写索引,避免了每次读写都需要flip方法来转换。并且定义了扩容方法,解决了ByteBuffer容量无法修改的问题。

基本属性

  • readerIndex : 读索引
  • writerIndex : 写索引
  • markReaderIndex : 标记读索引位置
  • markWriterIndex : 标记写索引位置
  • readableBytes : 可读字节大小(writerIndex - readerIndex)
  • writableBytes : 可写字节大小(capacity - writerIndex)
  • resetReaderIndex : 还原到标记读索引位置
  • resetWriterIndex : 还原到标记写索引位置

池化:将用过的对象保存,重复使用。

堆外内存:使用JVM堆栈外的内存,该内存JVM无法干预,JVM的GC只能回收ByteBuf对象本身,而不能回收其指向的堆外内存,所以需要手动释放。

Netty的ByteBuf实现分为池化和非池化,其中又分为堆内存和堆外内存实现。由于堆外内存需要手动释放,所以需要自己维护引用数来标记是否释放。

Netty线程模型

服务端线程模型

Netty的服务端线程模型类似与Reactor多线程模型。采用的是Acceptor线程和IO线程分离的模式。将连接请求处理和读写处理分开,避免了因读写阻塞而导致无法处理客户端连接请求,导致客户端连接超时。
Netty服务端线程模型

  • 有一个专门的线程池(Acceptor)用来处理客户端的连接请求。
  • 有一个专门的线程池(IO)用来处理读写操作。
  • 一个线程处理N条连接,一个连接对应一个线程。

从Acceptor线程池中随机选择一个线程处理客户端连接请求,连接建立后将创建的SocketChannel注册到IO线程池中的某个线程上,由该线程负责SocketChannel的读写和编解码工作。

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Acceptor线程池
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// IO线程池
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1000)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoServerHandler());
}
});

// Start the server.
ChannelFuture f = b.bind(PORT).sync();

// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

Netty服务端Bind-Accept流程

NIO深入

Buffer

NIO中的Buffer用于和NIOChannel进行交互,数据从Channel中读到Buffer中,数据从Buffer中写入到Channel中。Buffer本质是是一块可读写的内存区域,这块内存区域被包装成Buffer对象,并提供了一系列方法访问该内存。Buffer不是线程安全的,多线程时需要同步。

基础属性

  • 容量capacity : 缓冲区能够容纳的数据的最大数量,在创建的时候设定,不可再更改。
  • 上界limit : 缓冲区现存数据的计数,用来读写转换记录写的位置。
  • 位置position : 下一个要被读写的元素的索引。每次读写都会更新位置。
  • 标记mark : 备忘位置,用于重置位置。
    0 <= mark <= position <= limit <= capacity

重要方法

  • allocate : 创建一个Buffer对象,参数为capacity。
  • wrap :使用包装方式创建Buffer对象。
  • duplicate : 创建一个与原始Buffer相似的Buffer,两者共享数据,但是每个Buffer都有各自的position和limit。
  • slice : 创建一个切片Buffer,共享数据。
  • flip :转换读写状态,每次转换都将position的值赋给limit,将position设为0。从写转读,是从0开始读到limit(也就是已写数据)。从读转写,会将limit的值赋给position,也就是抵消了flip的操作,还原到上上次,从上次写的位置开始写。
  • remaining :limit-position,未读数据大小。
  • clear : 清空Buffer,position设为0,limit设为capacity的值。数据并未清除。可以重新开始写数据。
  • compact : 将未读的数据拷贝到Buffer头部,将position设为未读数据后面,这样未读数据会保留,重写数据从其后面开始写。
  • mark : 标记。
  • reset : 恢复到mark的position。
  • equals : 两个Buffer相等的条件,1.类型相同,2.Buffer中剩余的数据个数相同,3.Buffer中剩余的数据相同。

实现类型,Buffer的实现类型分为两种。

  • direct类型:采用直接内存实现,直接内存不受JVM控制,是由系统直接分配的,JVM无法GC回收,必须手动回收该部分内存。
  • heap类型:采用JVM堆栈实现。

大端、小端
每个基本数据类型都是以连续字节序列的形式存储在内存中。如果字节的最高字节在左边,位于低位地址中,则就是大端字节顺序。如果字节的最低字节在左边,位于低位地址中,则就是小端字节顺序。

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×