reactor-core

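Concepts

Declarative programming: similar to SQL. You state the desired result in statements and leave the underlying execution to the language and runtime.
Imperative programming: the traditional style, in which the code spells out each step to perform.
Functional programming: data-processing steps are written as functions that can be held in variables and passed as arguments or returned as results.
Reactive programming: adds asynchrony on top of these.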
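To make the four styles concrete, here is a minimal sketch that squares the even numbers of a list in each style. It uses only the JDK (not reactor-core), and the class and method names are made up for illustration. `CompletableFuture` stands in for the "add asynchrony" step; it is not a full reactive stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical illustration class; not part of reactor-core.
final class ParadigmSketch {

    // Imperative: spell out every step (loop, test, accumulate).
    static List<Integer> imperative(List<Integer> input) {
        List<Integer> out = new ArrayList<>();
        for (int v : input) {
            if (v % 2 == 0) {
                out.add(v * v);
            }
        }
        return out;
    }

    // Declarative: describe the result and let the stream decide how to run it.
    static List<Integer> declarative(List<Integer> input) {
        return input.stream()
                .filter(v -> v % 2 == 0)
                .map(v -> v * v)
                .collect(Collectors.toList());
    }

    // Functional: the processing step is itself a value passed in as an argument.
    static List<Integer> functional(List<Integer> input, Function<Integer, Integer> step) {
        return input.stream().map(step).collect(Collectors.toList());
    }

    // Asynchronous: the same pipeline runs on another thread and the result
    // is delivered later through the returned future.
    static CompletableFuture<List<Integer>> async(List<Integer> input) {
        return CompletableFuture.supplyAsync(() -> declarative(input));
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(1, 2, 3, 4);
        System.out.println(imperative(data));
        System.out.println(declarative(data));
        System.out.println(functional(data, v -> v + 1));
        // join() blocks only so that the demo does not exit before the callback runs.
        async(data).thenAccept(System.out::println).join();
    }
}
```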

Reactor reference documentation

flux

subscribe

public final Disposable subscribe(
        @Nullable Consumer<? super T> consumer,
        @Nullable Consumer<? super Throwable> errorConsumer,
        @Nullable Runnable completeConsumer) {
    return subscribe(consumer, errorConsumer, completeConsumer, (Context) null);
}

public final Disposable subscribe(
        @Nullable Consumer<? super T> consumer,
        @Nullable Consumer<? super Throwable> errorConsumer,
        @Nullable Runnable completeConsumer,
        @Nullable Context initialContext) {
    return subscribeWith(new LambdaSubscriber<>(consumer, errorConsumer,
            completeConsumer,
            null,
            initialContext));
}

public final void subscribe(Subscriber<? super T> actual) {
    // Applies the hooks registered with Hooks.onLastOperator and returns
    // CorePublisher ready to be subscribed on.
    // Note: onLastAssembly lets a hook function be registered in advance; it is
    // then invoked uniformly on every subscription, much like an interceptor.
    CorePublisher publisher = Operators.onLastAssembly(this);
    CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);

    try {
        if (publisher instanceof OptimizableOperator) {
            OptimizableOperator operator = (OptimizableOperator) publisher;
            while (true) {
                subscriber = operator.subscribeOrReturn(subscriber);
                if (subscriber == null) {
                    // null means "I will subscribe myself", returning...
                    return;
                }
                OptimizableOperator newSource = operator.nextOptimizableSource();
                if (newSource == null) {
                    publisher = operator.source();
                    break;
                }
                operator = newSource;
            }
        }

        publisher.subscribe(subscriber);
    }
    catch (Throwable e) {
        Operators.reportThrowInSubscribe(subscriber, e);
        return;
    }
}
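The `while (true)` loop above walks up the operator chain iteratively: each operator wraps the downstream subscriber and hands the loop its upstream, so only the outermost source is subscribed with a direct call. The sketch below imitates that shape with JDK types only. `Source`, `Operator`, `range` and `map` are hypothetical stand-ins, the subscriber is reduced to a `Consumer<Integer>`, and none of this is Reactor's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical illustration class; not part of reactor-core.
final class SubscribeLoopSketch {

    /** Stand-in for a publisher. */
    interface Source {
        void subscribe(Consumer<Integer> subscriber);
    }

    /** Stand-in for OptimizableOperator: wraps the subscriber instead of subscribing. */
    interface Operator extends Source {
        Consumer<Integer> subscribeOrReturn(Consumer<Integer> downstream);

        Source source();

        // Returns the upstream only if it is an operator too, else null.
        default Operator nextOperator() {
            Source up = source();
            return (up instanceof Operator) ? (Operator) up : null;
        }

        @Override
        default void subscribe(Consumer<Integer> subscriber) {
            SubscribeLoopSketch.subscribe(this, subscriber);
        }
    }

    /** Root source that emits 1..n synchronously. */
    static Source range(int n) {
        return subscriber -> {
            for (int i = 1; i <= n; i++) {
                subscriber.accept(i);
            }
        };
    }

    /** Operator that transforms each value before passing it downstream. */
    static Operator map(Source upstream, UnaryOperator<Integer> fn) {
        return new Operator() {
            @Override
            public Consumer<Integer> subscribeOrReturn(Consumer<Integer> downstream) {
                return v -> downstream.accept(fn.apply(v));
            }

            @Override
            public Source source() {
                return upstream;
            }
        };
    }

    /** Same loop shape as the excerpt: wrap, step to the upstream, repeat. */
    static void subscribe(Source publisher, Consumer<Integer> subscriber) {
        if (publisher instanceof Operator) {
            Operator operator = (Operator) publisher;
            while (true) {
                subscriber = operator.subscribeOrReturn(subscriber);
                if (subscriber == null) {
                    return; // the operator subscribed by itself
                }
                Operator next = operator.nextOperator();
                if (next == null) {
                    publisher = operator.source();
                    break;
                }
                operator = next;
            }
        }
        publisher.subscribe(subscriber);
    }

    static List<Integer> collect(Source source) {
        List<Integer> out = new ArrayList<>();
        subscribe(source, out::add);
        return out;
    }

    public static void main(String[] args) {
        Source chain = map(map(range(3), x -> x + 1), x -> x * 10);
        System.out.println(collect(chain)); // prints [20, 30, 40]
    }
}
```

Walking the chain in a loop keeps the call stack shallow however many operators are stacked, which is presumably the motivation for this structure.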

core-subscriber

public interface CoreSubscriber<T> extends Subscriber<T> {

    /**
     * Request a {@link Context} from dependent components which can include downstream
     * operators during subscribing or a terminal {@link org.reactivestreams.Subscriber}.
     *
     * @return a resolved context or {@link Context#empty()}
     */
    // Note: this is the Reactor context.
    default Context currentContext(){
        return Context.empty();
    }

    /**
     * Implementors should initialize any state used by {@link #onNext(Object)} before
     * calling {@link Subscription#request(long)}. Should further {@code onNext} related
     * state modification occur, thread-safety will be required.
     * <p>
     * Note that an invalid request {@code <= 0} will not produce an onError and
     * will simply be ignored or reported through a debug-enabled
     * {@link reactor.util.Logger}.
     *
     * {@inheritDoc}
     */
    // Note: a request <= 0 does not raise an error.
    @Override
    void onSubscribe(Subscription s);
}
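The two points in the Javadoc above (initialize `onNext` state before calling `request`, and silently ignore a request `<= 0`) can be tried out with the JDK's `java.util.concurrent.Flow` interfaces, which mirror Reactive Streams. This is only a sketch under assumptions: `ContextSubscriber`, `CollectingSubscriber` and `publish` are hypothetical names, and a plain `Map` stands in for Reactor's `Context`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Flow;

// Hypothetical illustration class; not part of reactor-core.
final class CoreSubscriberSketch {

    /** Analogue of CoreSubscriber: a subscriber plus a context lookup with an empty default. */
    interface ContextSubscriber<T> extends Flow.Subscriber<T> {
        default Map<String, Object> currentContext() {
            return Map.of();
        }
    }

    /** Collects every item it receives. */
    static final class CollectingSubscriber implements ContextSubscriber<Integer> {
        List<Integer> received;
        boolean completed;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            // State first: the publisher below calls onNext from inside request(),
            // so the list must exist before demand is signalled.
            received = new ArrayList<>();
            s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
        }

        @Override
        public void onError(Throwable t) {
            throw new IllegalStateException(t);
        }

        @Override
        public void onComplete() {
            completed = true;
        }
    }

    /** Tiny synchronous publisher that ignores non-positive requests instead of signalling onError. */
    static void publish(List<Integer> items, Flow.Subscriber<Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int index;
            private boolean done;

            @Override
            public void request(long n) {
                if (n <= 0 || done) {
                    return; // invalid or late request: simply ignored
                }
                while (n-- > 0 && !done && index < items.size()) {
                    subscriber.onNext(items.get(index++));
                }
                if (!done && index == items.size()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    public static void main(String[] args) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        publish(List.of(1, 2, 3), subscriber);
        System.out.println(subscriber.received + " completed=" + subscriber.completed);
    }
}
```

If `received` were created after `s.request(...)`, the synchronous `onNext` calls would hit a null list, which is the ordering problem the Javadoc warns about.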

© 2023 haoxp