序
Doug lea文章《scalable io in java》 翻译,原文地址http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
不知道是老外与国人的书写习惯差异还是什么,翻译到后期,感觉很别扭,不像是文章,有点像是个ppt.尤其每篇上边的题目??,只相当于这一页的关键字.
目录
概述
文章主要包含内容如下:
可扩展的网络服务
事件驱动处理
reactor
- 基础版本
- 多线程版本
- 其他版本
使用 java nio的api
网络服务
在一般的网络服务或分布式服务中,大多具有如下的基础结构:
- 接收请求
- 解码请求
- 处理请求
- 对响应数据编码
- 发送编码
在实际的使用过程中,每一步的开销都是不同的,如:xml解析,文件传输,web页面加载,计算服务
传统的服务设计
如图所示,每个处理程序都会被分配一个线程
传统的ServerSocket循环
1 | class Server implements Runnable { |
扩展性的目标(什么是可扩展性)
- 负载增加时优雅的降级
- 随着硬件资源的增加,能够不断提高性能
- 满足可用性和性能指标
- 短延迟
- 满足高峰需求
- 可调节服务质量
分治处理通常是实现扩展性的最好方法.
分治处理机制
- 将处理分解成更小的任务,每个任务以非阻塞的的方式执行相同的操作.
- 当任务处于启用状态时,才执行(io事件充当触发器)
java nio 对这种机制有支持的基础
- 非阻塞的读写
- 通过感知到的io事件分发给相关联的任务执行
结合事件驱动设计,可以有更多的变化.
事件驱动设计
事件驱动设计与其他方案更有效率
- 更少的资源,通常不需要为每个client创建线程
- 更少的开销,减少的线程会使降低上下文的切换,使用更少的锁.
- 调度会慢,必须手动将action绑定到事件上.
事件驱动设计也导致编码实现更加复杂困难.
- 分解为简单的非阻塞操作
- 类似gui的事件驱动
- 也不能完全消除所有的阻塞:gc, page faults(内存缺页中断)等.
- 必须跟踪服务的相关逻辑状态
图为gui中的事件驱动模型设计,其事件驱动的基本思路类似.
反应模式
- reactor模式通过派遣相应的处理器来响应IO事件(类似AWT线程)
- 处理器使用非阻塞处理(类似AWT ActionListener)
- 将处理器绑定到事件(类似Awt中 addActionListener)
参见Schmidt etal《pattern-oriented software architecture》第二卷(POSA2),还有 Richard Steven 的《networking books》,Matt Welsh的 《SEDA framework》等
基础的响应式设计
单线程版本
Channels以非阻塞模式读写文件,套接字
Bufferes被Channels直接进行读写操作的数组对象
Selectors判断Channels发生IO事件的选择器
SelectionKeys负责IO事件状态与绑定
- setup
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
sk.attach(new Acceptor());=
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
dispatch((SelectionKey)(it.next());
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable)(k.attachment());
if (r != null)
r.run();
}
class Acceptor implements Runnable {
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null)
new Handler(selector, c);
}
catch(IOException ex) { /* ... */ }
}
}
}
final class Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector sel, SocketChannel c) throws IOException {
socket = c;
c.configureBlocking(false);
// Optionally try first read now
sk = socket.register(sel, 0);
sk.attach(this); //将Handler绑定到SelectionKey上
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
boolean inputIsComplete() { /* ... */ }
boolean outputIsComplete() { /* ... */ }
void process() { /* ... */ }
// 请求处理
public void run() {
try {
if (state == READING) read();
else if (state == SENDING) send();
} catch (IOException ex) { /* ... */ }
}
void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// Normally also do first write now
sk.interestOps(SelectionKey.OP_WRITE);
}
}
void send() throws IOException {
socket.write(output);
if (outputIsComplete()) sk.cancel();
}
}
class Handler {
public void run() {
socket.read(input);
if (inputIsComplete()) {
process();
sk.attach(new Sender());
sk.interest(SelectionKey.OP_WRITE);
sk.selector().wakeup();
}
}
class Sender implements Runnable {
public void run(){ // ...
socket.write(output);
if (outputIsComplete()) sk.cancel();
}
}
}
使用状态模式(GoF)进行优化,不需要再进行状态的判断。
1 | class Handler { // ... |
多线程设计实现
多处理器下,增加线程数可提高伸缩性.
增加worker线程
reactor需要迅速触发处理流程,process()
方法会使reactor变慢,将非io操作(process()方法
)放到worker thread
中增加reactor线程
reactor多线程能够饱和式处理io,将负载分发到其他reactor也可以进行负载均衡,根据cpu和io使用率进行调整
工作者线程
- 从reactor thread中移除非io处理可以提高reactor thread速度(类似POSA2中Proactor设计)
- 比将非阻塞计算处理重新设计为事件驱动模式更简单
- 很难与io并发处理,最好在第一时间读取缓冲区所有数据.
- 使用线程池方便调优与控制(通常需要比客户端连接数量少很多的线程数)
handler的线程池版实现
1 | class Handler implements Runnable { |
协调work thread中的任务问题
- 任务之间的交互,每个任务的启动,执行,传递通常很快,因此难以控制
- 每个handler中分发器的回调设置状态,返回值等(中介者模式的变体)
- 不同线程的缓冲区问题
- 需要返回值时,线程需要通过join,wait/notify等方法进行协调
使用池化处理
将work-thread进行池化,使用execute(Runnable r)
作为主要方法进行处理,可以通过线程池的参数进行控制,进行调优.如:
- 阻塞队列的选择
- 空闲线程的存活时间
- 拒绝策略
- 最大/最小线程数
- “Warm” versus on-demand threads(实在没懂这句话 = =b…,按照线程池的7个参数中 ThreadFactory,可以生产 daemon线程,应该是指这个吧!)
reactor线程的池化处理
- 可以根据io/cpu使用率调整,达到合适的状态
- 每个reactor静态或动态构造,在自己的线程中,包含自己的selector,dispatch loop(指进行分发的循环体)
- 在主接收器(acceptor)中分发给其他reactor.
1 | Selector[] selectors; // also create threads |
与java-nio的其他特性结合
- 一个reactor拥有多个selector,将不同的handler绑定不同的io时间时,需要小心同步问题.
- 网络文件传输.
- 内存映射文件,通过缓冲区访问文件
- 直接缓冲区,合适的情况下使用零拷贝,但是会又初始化和释放的开销.适合长期存活的应用.
基础连接的扩展
(翻译不出人话了已经…..没看懂…..直译出来的….)
- 代替单个服务请求
- client建立连接
- client发送请求
- 客户端端口连接
- 如:数据库和事务的监控,多人在线游戏,聊天室等.
- 能基于网络服务模式进行扩展: 处理长期连接的客户端,跟踪会话状态,跨主机分发服务(分布式).
api走读
原文中没有注释,只包含api
- Buffer
- ByteBuffer
(CharBuffer, LongBuffer, etc not shown.) - Channel
- SelectableChannel
- SocketChannel
- ServerSocketChannel
- FileChannel
- Selector
- SelectionKey