0%

FLux 和 Mono 操作符

创建一个新序列, 它…​​

  • 发出一个 T, 已经存在了: just
    • …​​基于一个 Optional<T>: Mono#justOrEmpty(Optional<T>)
    • …​​基于一个可能为 nullT: Mono#justOrEmpty(T)
  • 发出一个 T, 且还是由 just 方法返回
    • …​​但是 “懒” 创建的: 使用 Mono#fromSupplier 或用 defer 包装 just
  • 发出许多 T, 这些元素可以被明确列举出来: Flux#just(T...)
  • 基于迭代数据结构:
    • 一个数组: Flux#fromArray
    • 一个集合或 iterable: Flux#fromIterable
    • 一个 Integerrange: Flux#range
    • 一个 Stream 提供给每一个订阅: Flux#fromStream(Supplier<Stream>)
  • 基于一个参数值给出的源:
    • 一个 Supplier<T>: Mono#fromSupplier
    • 一个任务: Mono#fromCallable, Mono#fromRunnable
    • 一个 CompletableFuture<T>: Mono#fromFuture
  • 直接完成: empty
  • 立即生成错误: error
    • …​​但是 “懒” 的方式生成 Throwable: error(Supplier<Throwable>)
  • 什么都不做: never
  • 订阅时才决定: defer
  • 依赖一个可回收的资源: using
  • 可编程地生成事件 (可以使用状态) :
    • 同步且逐个的: Flux#generate
    • 异步 (也可同步) 的, 每次尽可能多发出元素: Flux#create (Mono#create 也是异步的, 只不过只能发一个)

对序列进行转化

转化一个序列

  • 11 地转化 (比如字符串转化为它的长度) : map
    • …​​类型转化: cast
    • …​​为了获得每个元素的序号: Flux#index
  • 1n 地转化 (如字符串转化为一串字符) : flatMap + 使用一个工厂方法
  • 1n 地转化可自定义转化方法和/或状态: handle
  • 对每一个元素执行一个异步操作 (如对 url 执行 http 请求) : flatMap + 一个异步的返回类型为 Publisher 的方法
    • …​​忽略一些数据: 在 flatMap lambda 中根据条件返回一个 Mono.empty()
    • …​​保留原来的序列顺序: Flux#flatMapSequential (对每个元素的异步任务会立即执行, 但会将结果按照原序列顺序排序)
    • …​​当 Mono 元素的异步任务会返回多个元素的序列时: Mono#flatMapMany

添加一些数据元素到一个现有的序列

  • 在开头添加: Flux#startWith(T...)
  • 在最后添加: Flux#concatWith(T...)

Flux 转化为集合 (以下操作都是针对 Flux 的)

  • 转化为 List: collectList, collectSortedList
  • 转化为 Map: collectMap, collectMultiMap
  • 转化为自定义集合: collect
  • 计数: count
  • reduce 算法 (将上个元素的 reduce 结果与当前元素值作为输入执行 reduce 方法, 如 sum) reduce
    • …​​将每次 reduce 的结果立即发出: scan
  • 转化为一个 boolean 值:
    • 对所有元素判断都为 true: all
    • 对至少一个元素判断为 true: any
    • 判断序列是否有元素 (不为空) : hasElements
    • 判断序列中是否有匹配的元素: hasElement

合并 publishers…​​

  • 按序连接: Flux#concat.concatWith(other)
    • …​​即使有错误, 也会等所有的 publishers 连接完成: Flux#concatDelayError
    • …​​按订阅顺序连接 (这里的合并仍然可以理解成序列的连接) : Flux#mergeSequential
  • 按元素发出的顺序合并 (无论哪个序列的, 元素先到先合并) : Flux#merge / .mergeWith(other)
    • …​​元素类型会发生变化: Flux#zip / Flux#zipWith
  • 将元素组合:
    • 2Monos 组成 1Tuple2: Mono#zipWith
    • nMonos 的元素都发出来后组成一个 Tuple: Mono#zip
  • 在终止信号出现时 “采取行动” :
    • Mono 终止时转换为一个 Mono<Void>: Mono#and
    • nMono 都终止时返回 Mono<Void>: Mono#when
    • 返回一个存放组合数据的类型, 对于被合并的多个序列:
      • 每个序列都发出一个元素时: Flux#zip
      • 任何一个序列发出元素时: Flux#combineLatest
  • 只取各个序列的第一个元素: Flux#first, Mono#first, mono.or (otherMono).or(thirdMono), `flux.or(otherFlux).or(thirdFlux)
  • 由一个序列触发 (类似于 flatMap, 不过 “喜新厌旧” ) : switchMap
  • 由每个新序列开始时触发 (也是 “喜新厌旧” 风格) : switchOnNext

重复一个序列: repeat

  • …​​但是以一定的间隔重复: Flux.interval(duration).flatMap(tick -> myExistingPublisher)

一个空序列, 但是…​​

  • 需要一个缺省值来代替: defaultIfEmpty
  • 需要一个缺省的序列来代替: switchIfEmpty

一个序列, 但是对序列的元素值不感兴趣: ignoreElements

  • …​​并且希望用 Mono 来表示序列已经结束: then
  • …​​并且需要在序列结束后等待另一个任务完成: thenEmpty
  • …​​并且需要在序列结束之后返回一个 Mono: Mono#then(mono)
  • …​​并且需要在序列结束之后返回一个值: Mono#thenReturn(T)
  • …​​并且需要在序列结束之后返回一个 Flux: thenMany

一个 Mono 但需要延迟完成…​​

  • …​​当有 1 个或 N 个其他 publishers 都发出 (或结束) 时才完成: Mono#delayUntilOther
    • …​​使用一个函数式来定义如何获取 “其他 publisher“ : Mono#delayUntil(Function)

基于一个递归的生成序列的规则扩展每一个元素, 然后合并为一个序列发出

  • …​​广度优先: expand(Function)
  • …​​深度优先: expandDeep(Function)

“窥视” (只读) 序列

在不对序列造成改变的情况下

  • 得到通知或执行一些操作:
    • 发出元素: doOnNext
    • 序列完成: Flux#doOnComplete, Mono#doOnSuccess
    • 因错误终止: doOnError
    • 取消: doOnCancel
    • 订阅时: doOnSubscribe
    • 请求时: doOnRequest
    • 完成或错误终止: doOnTerminate (Mono 的方法可能包含有结果)
      • 但是在终止信号向下游传递 之后 : doAfterTerminate
    • 所有类型的信号 (Signal) : Flux#doOnEach
    • 所有结束的情况 (完成 complete、错误 error、取消 cancel) : doFinally
  • 记录日志: log

知道所有的事件

  • 每一个事件都体现为一个 single 对象:
    • 执行 callback: doOnEach
    • 每个元素转化为 single 对象: materialize
      • …​​在转化回元素: dematerialize
  • 转化为一行日志: log

过滤序列

过滤一个序列

  • 基于给定的判断条件: filter
    • …​​异步地进行判断: filterWhen
  • 仅限于指定类型的对象: ofType
  • 忽略所有元素: ignoreElements
  • 去重:
    • 对于整个序列: Flux#distinct
    • 去掉连续重复的元素: Flux#distinctUntilChanged

只需要一部分序列

  • 只要 N 个元素:
    • 从序列的第一个元素开始算: Flux#take(long)
      • …​​取一段时间内发出的元素: Flux#take(Duration)
      • …​​只取第一个元素放到 Mono 中返回: Flux#next()
      • …​​使用 request(N) 而不是取消: Flux#limitRequest(long)
    • 从序列的最后一个元素倒数: Flux#takeLast
    • 直到满足某个条件 (包含) : Flux#takeUntil (基于判断条件) , Flux#takeUntilOther (基于对 publisher 的比较)
    • 直到满足某个条件 (不包含) : Flux#takeWhile
  • 最多只取 1 个元素:
    • 给定序号: Flux#elementAt
    • 最后一个: .takeLast(1)
      • …​​如果为序列空则发出错误信号: Flux#last()
      • …​​如果序列为空则返回默认值: Flux#last(T)
  • 跳过一些元素:
    • 从序列的第一个元素开始跳过: Flux#skip(long)
      • …​​跳过一段时间内发出的元素: Flux#skip(Duration)
    • 跳过最后的 n 个元素: Flux#skipLast
    • 直到满足某个条件 (包含) : Flux#skipUntil (基于判断条件) , Flux#skipUntilOther (基于对 publisher 的比较)
    • 直到满足某个条件 (不包含) : Flux#skipWhile
  • 采样:
    • 给定采样周期: Flux#sample(Duration)
      • 取采样周期里的第一个元素而不是最后一个: sampleFirst
    • 基于另一个 publisher: Flux#sample(Publisher)
    • 基于 publisher “超时” : Flux#sampleTimeout (每一个元素会触发一个 publisher, 如果这个 publisher 不被下一个元素触发的 publisher 覆盖就发出这个元素)

只需要一个元素 (如果多于一个就返回错误) …​​

  • 如果序列为空, 发出错误信号: Flux#single()
  • 如果序列为空, 发出一个缺省值: Flux#single(T)
  • 如果序列为空就返回一个空序列: Flux#singleOrEmpty

错误处理

  • 需要创建一个错误序列: error…​​
    • …​​替换一个完成的 Flux: .concat(Flux.error(e))
    • …​​替换一个完成的 Mono: .then(Mono.error(e))
    • …​​如果元素超时未发出: timeout
    • …​​ “懒” 创建: error(Supplier<Throwable>)
  • 需要类似 try/catch 的表达方式:
    • 抛出异常: error
    • 捕获异常:
      • 然后返回缺省值: onErrorReturn
      • 然后返回一个 FluxMono: onErrorResume
      • 包装异常后再抛出: .onErrorMap(t -> new RuntimeException(t))
    • finally 代码块: doFinally
    • Java 7 之后的 try-with-resources 写法: using 工厂方法
  • 需要从错误中恢复…​​
    • 返回一个缺省的:
      • 的值: onErrorReturn
      • Publisher: Flux#onErrorResumeMono#onErrorResume
    • 重试: retry
      • …​​由一个用于伴随 Flux 触发: retryWhen
  • 需要处理回压错误 (向上游发出 “MAX” 的 request, 如果下游的 request 比较少, 则应用策略) …​​
    • 抛出 IllegalStateException: Flux#onBackpressureError
    • 丢弃策略: Flux#onBackpressureDrop
      • …​​但是不丢弃最后一个元素: Flux#onBackpressureLatest
    • 缓存策略 (有限或无限) : Flux#onBackpressureBuffer
      • …​​当有限的缓存空间用满则应用给定策略: Flux#onBackpressureBuffer 带有策略 BufferOverflowStrategy

基于时间的操作

  • 需要将元素转换为带有时间信息的 Tuple2<Long, T>…​​
    • 从订阅时开始: elapsed
    • 记录时间戳: timestamp
  • 如果元素间延迟过长则中止序列: timeout
  • 以固定的周期发出元素: Flux#interval
  • 在一个给定的延迟后发出 0: static Mono.delay.
  • 需要引入延迟:
    • 对每一个元素: Mono#delayElement, Flux#delayElements
    • 延迟订阅: delaySubscription

拆分 Flux

需要将一个 Flux<T> 拆分为一个 Flux<Flux<T>>

  • 以个数为界: window(int)
    • …​​会出现重叠或丢弃的情况: window(int, int)
  • 以时间为界: window(Duration)
    • …​​会出现重叠或丢弃的情况: window(Duration, Duration)
  • 以个数或时间为界: windowTimeout(int, Duration)
  • 基于对元素的判断条件: windowUntil
    • …​​触发判断条件的元素会分到下一波 (cutBefore 变量) : .windowUntil(predicate, true)
    • …​​满足条件的元素在一波, 直到不满足条件的元素发出开始下一波: windowWhile (不满足条件的元素会被丢弃)
  • 通过另一个 Publisher 的每一个 onNext 信号来拆分序列: window(Publisher), windowWhen

需要将一个 Flux<T> 的元素拆分到集合…​​

  • 拆分为一个一个的 List:
    • 以个数为界: buffer(int)
      • …​​会出现重叠或丢弃的情况: buffer(int, int)
    • 以时间为界: buffer(Duration)
      • …​​会出现重叠或丢弃的情况: buffer(Duration, Duration)
    • 以个数或时间为界: bufferTimeout(int, Duration)
    • 基于对元素的判断条件: bufferUntil(Predicate)
      • …​​触发判断条件的元素会分到下一个 buffer: .bufferUntil(predicate, true)
      • …​​满足条件的元素在一个 buffer, 直到不满足条件的元素发出开始下一 buffer: bufferWhile(Predicate)
    • 通过另一个 Publisher 的每一个 onNext 信号来拆分序列: buffer(Publisher), bufferWhen
  • 拆分到指定类型的 “collection”: buffer(int, Supplier<C>)

需要将 Flux<T> 中具有共同特征的元素分组到子 Flux

  • groupBy(Function<T, K>) TIP: 注意返回值是 Flux<GroupedFlux<K, T>>, 每一个 GroupedFlux 具有相同的 keyK, 可以通过 key() 方法获取。

回到同步的世界

  • 有一个 Flux<T>, 需要:
    • 在拿到第一个元素前阻塞: Flux#blockFirst
      • …​​并给出超时时限: Flux#blockFirst(Duration)
    • 在拿到最后一个元素前阻塞 (如果序列为空则返回 null) : Flux#blockLast
      • …​​并给出超时时限: Flux#blockLast(Duration)
    • 同步地转换为 Iterable<T>: Flux#toIterable
    • 同步地转换为 Java 8 Stream<T>: Flux#toStream
  • 有一个 Mono<T>, 需要:
    • 在拿到元素前阻塞: Mono#block
      • …​​并给出超时时限: Mono#block(Duration)
    • 转换为 CompletableFuture<T>: Mono#toFuture

参考

附录 A: 需要什么操作符