• java
  • go
  • 数据库
  • linux
  • 中间件
  • 书
  • 源码
  • 夕拾

  • java
  • go
  • 数据库
  • linux
  • 中间件
  • 书
  • 源码
  • 夕拾

<scalable io in java> 翻译

序

Doug lea文章《scalable io in java》 翻译,原文地址http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

不知道是老外与国人的书写习惯差异还是什么,翻译到后期,感觉很别扭,不像是文章,有点像是个ppt.尤其每篇上边的题目??,只相当于这一页的关键字.

目录

  • 序
  • 目录
  • 概述
  • 网络服务
  • 传统的服务设计
  • 扩展性的目标(什么是可扩展性)
  • 分治处理机制
  • 事件驱动设计
  • 反应模式
  • 基础的响应式设计
    • 单线程版本
    • 多线程设计实现
    • 基础连接的扩展
  • api走读

概述

文章主要包含内容如下:

  • 可扩展的网络服务

  • 事件驱动处理

  • reactor

    • 基础版本
    • 多线程版本
    • 其他版本
  • 使用 java nio的api

网络服务

在一般的网络服务或分布式服务中,大多具有如下的基础结构:

  1. 接收请求
  2. 解码请求
  3. 处理请求
  4. 对响应数据编码
  5. 发送编码

在实际的使用过程中,每一步的开销都是不同的,如:xml解析,文件传输,web页面加载,计算服务

传统的服务设计

微信图片_20200910143656.png

如图所示,每个处理程序都会被分配一个线程

传统的ServerSocket循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class Server implements Runnable {
public void run() {
try {
ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted())
new Thread(new Handler(ss.accept())).start();
// or, single-threaded, or a thread pool
} catch (IOException ex) {
/* ... */ }
}

static class Handler implements Runnable {
final Socket socket;

Handler(Socket s) {
socket = s;
}

public void run() {
try {
byte[] input = new byte[MAX_INPUT];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException ex) {
/* ... */ }
}

private byte[] process(byte[] cmd) {
/* ... */ }
}
}

扩展性的目标(什么是可扩展性)

  • 负载增加时优雅的降级
  • 随着硬件资源的增加,能够不断提高性能
  • 满足可用性和性能指标
    • 短延迟
    • 满足高峰需求
    • 可调节服务质量
      分治处理通常是实现扩展性的最好方法.

分治处理机制

  • 将处理分解成更小的任务,每个任务以非阻塞的的方式执行相同的操作.
  • 当任务处于启用状态时,才执行(io事件充当触发器)

java nio 对这种机制有支持的基础

  • 非阻塞的读写
  • 通过感知到的io事件分发给相关联的任务执行

结合事件驱动设计,可以有更多的变化.

事件驱动设计

事件驱动设计与其他方案更有效率

  • 更少的资源,通常不需要为每个client创建线程
  • 更少的开销,减少的线程会使降低上下文的切换,使用更少的锁.
  • 调度会慢,必须手动将action绑定到事件上.

事件驱动设计也导致编码实现更加复杂困难.

  • 分解为简单的非阻塞操作
    • 类似gui的事件驱动
    • 也不能完全消除所有的阻塞:gc, page faults(内存缺页中断)等.
  • 必须跟踪服务的相关逻辑状态

事件驱动在gui的中的实现.png
图为gui中的事件驱动模型设计,其事件驱动的基本思路类似.

反应模式

  • reactor模式通过派遣相应的处理器来响应IO事件(类似AWT线程)
  • 处理器使用非阻塞处理(类似AWT ActionListener)
  • 将处理器绑定到事件(类似Awt中 addActionListener)

    参见Schmidt etal《pattern-oriented software architecture》第二卷(POSA2),还有 Richard Steven 的《networking books》,Matt Welsh的 《SEDA framework》等

基础的响应式设计

基础的响应式设计.png

单线程版本

Channels以非阻塞模式读写文件,套接字
Bufferes被Channels直接进行读写操作的数组对象
Selectors判断Channels发生IO事件的选择器
SelectionKeys负责IO事件状态与绑定

  1. setup
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    class Reactor implements Runnable { 
    final Selector selector;
    final ServerSocketChannel serverSocket;
    Reactor(int port) throws IOException {
    selector = Selector.open();
    serverSocket = ServerSocketChannel.open();
    serverSocket.socket().bind(new InetSocketAddress(port));
    serverSocket.configureBlocking(false);
    SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
    sk.attach(new Acceptor());=
    }

    public void run() {
    try {
    while (!Thread.interrupted()) {
    selector.select();
    Set selected = selector.selectedKeys();
    Iterator it = selected.iterator();
    while (it.hasNext())
    dispatch((SelectionKey)(it.next());
    selected.clear();
    }
    } catch (IOException ex) { /* ... */ }
    }

    void dispatch(SelectionKey k) {
    Runnable r = (Runnable)(k.attachment());
    if (r != null)
    r.run();
    }

    class Acceptor implements Runnable {
    public void run() {
    try {
    SocketChannel c = serverSocket.accept();
    if (c != null)
    new Handler(selector, c);
    }
    catch(IOException ex) { /* ... */ }
    }
    }
    }

    final class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(MAXIN);
    ByteBuffer output = ByteBuffer.allocate(MAXOUT);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    Handler(Selector sel, SocketChannel c) throws IOException {
    socket = c;
    c.configureBlocking(false);
    // Optionally try first read now
    sk = socket.register(sel, 0);
    sk.attach(this); //将Handler绑定到SelectionKey上
    sk.interestOps(SelectionKey.OP_READ);
    sel.wakeup();
    }
    boolean inputIsComplete() { /* ... */ }
    boolean outputIsComplete() { /* ... */ }
    void process() { /* ... */ }

    // 请求处理
    public void run() {
    try {
    if (state == READING) read();
    else if (state == SENDING) send();
    } catch (IOException ex) { /* ... */ }
    }

    void read() throws IOException {
    socket.read(input);
    if (inputIsComplete()) {
    process();
    state = SENDING;
    // Normally also do first write now
    sk.interestOps(SelectionKey.OP_WRITE);
    }
    }
    void send() throws IOException {
    socket.write(output);
    if (outputIsComplete()) sk.cancel();
    }
    }


    class Handler {
    public void run() {
    socket.read(input);
    if (inputIsComplete()) {
    process();
    sk.attach(new Sender());
    sk.interest(SelectionKey.OP_WRITE);
    sk.selector().wakeup();
    }
    }
    class Sender implements Runnable {
    public void run(){ // ...
    socket.write(output);
    if (outputIsComplete()) sk.cancel();
    }
    }
    }


使用状态模式(GoF)进行优化,不需要再进行状态的判断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Handler { // ...
public void run() { // initial state is reader
socket.read(input);
if (inputIsComplete()) {
process();
sk.attach(new Sender());
sk.interest(SelectionKey.OP_WRITE);
sk.selector().wakeup();
}
}
class Sender implements Runnable {
public void run(){ // ...
socket.write(output);
if (outputIsComplete()) sk.cancel();
}
}
}

多线程设计实现

多处理器下,增加线程数可提高伸缩性.

  1. 增加worker线程
    reactor需要迅速触发处理流程,process()方法会使reactor变慢,将非io操作(process()方法)放到worker thread中

  2. 增加reactor线程
    reactor多线程能够饱和式处理io,将负载分发到其他reactor也可以进行负载均衡,根据cpu和io使用率进行调整

工作者线程

  • 从reactor thread中移除非io处理可以提高reactor thread速度(类似POSA2中Proactor设计)
  • 比将非阻塞计算处理重新设计为事件驱动模式更简单
  • 很难与io并发处理,最好在第一时间读取缓冲区所有数据.
  • 使用线程池方便调优与控制(通常需要比客户端连接数量少很多的线程数)

模型图.png

handler的线程池版实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Handler implements Runnable {
// uses util.concurrent thread pool
static PooledExecutor pool = new PooledExecutor(...);
static final int PROCESSING = 3;
// ...
synchronized void read() { // ...
socket.read(input);
if (inputIsComplete()) {
state = PROCESSING;
pool.execute(new Processer());
}
}
synchronized void processAndHandOff() {
process();
state = SENDING; // or rebind attachment
sk.interest(SelectionKey.OP_WRITE);
}
class Processer implements Runnable {
public void run() { processAndHandOff(); }
}
}

协调work thread中的任务问题

  • 任务之间的交互,每个任务的启动,执行,传递通常很快,因此难以控制
  • 每个handler中分发器的回调设置状态,返回值等(中介者模式的变体)
  • 不同线程的缓冲区问题
  • 需要返回值时,线程需要通过join,wait/notify等方法进行协调

使用池化处理
将work-thread进行池化,使用execute(Runnable r)作为主要方法进行处理,可以通过线程池的参数进行控制,进行调优.如:

  1. 阻塞队列的选择
  2. 空闲线程的存活时间
  3. 拒绝策略
  4. 最大/最小线程数
  5. “Warm” versus on-demand threads(实在没懂这句话 = =b…,按照线程池的7个参数中 ThreadFactory,可以生产 daemon线程,应该是指这个吧!)

reactor线程的池化处理

  • 可以根据io/cpu使用率调整,达到合适的状态
  • 每个reactor静态或动态构造,在自己的线程中,包含自己的selector,dispatch loop(指进行分发的循环体)
  • 在主接收器(acceptor)中分发给其他reactor.
1
2
3
4
5
6
7
8
9
10
11
Selector[] selectors; // also create threads
int next = 0;
class Acceptor { // ...
public synchronized void run() { ...
Socket connection = serverSocket.accept();
if (connection != null)
new Handler(selectors[next], connection);
if (++next == selectors.length) next = 0;
}
}

reacotr多线程模式.png

与java-nio的其他特性结合

  • 一个reactor拥有多个selector,将不同的handler绑定不同的io时间时,需要小心同步问题.
  • 网络文件传输.
    • 内存映射文件,通过缓冲区访问文件
    • 直接缓冲区,合适的情况下使用零拷贝,但是会又初始化和释放的开销.适合长期存活的应用.

基础连接的扩展

(翻译不出人话了已经…..没看懂…..直译出来的….)

  • 代替单个服务请求
    1. client建立连接
    2. client发送请求
    3. 客户端端口连接
  • 如:数据库和事务的监控,多人在线游戏,聊天室等.
  • 能基于网络服务模式进行扩展: 处理长期连接的客户端,跟踪会话状态,跨主机分发服务(分布式).

api走读

原文中没有注释,只包含api

  • Buffer
  • ByteBuffer
    (CharBuffer, LongBuffer, etc not shown.)
  • Channel
  • SelectableChannel
  • SocketChannel
  • ServerSocketChannel
  • FileChannel
  • Selector
  • SelectionKey
tcp协议详解
  1. 1. 序
  2. 2. 目录
  3. 3. 概述
  4. 4. 网络服务
  5. 5. 传统的服务设计
  6. 6. 扩展性的目标(什么是可扩展性)
  7. 7. 分治处理机制
  8. 8. 事件驱动设计
  9. 9. 反应模式
  10. 10. 基础的响应式设计
    1. 10.1. 单线程版本
    2. 10.2. 多线程设计实现
    3. 10.3. 基础连接的扩展
  11. 11. api走读
© 2023 haoxp
Hexo theme