html制作企业宣传网站app开发大概费用多少钱合适
- 作者: 多梦笔记
- 时间: 2026年02月16日 22:20
当前位置: 首页 > news >正文
html制作企业宣传网站,app开发大概费用多少钱合适,怎样进行站点优化,购物网站底部设计一、概述 1.1介绍 Reactor 是一个用于JVM的完全非阻塞的响应式编程框架#xff0c;Webflux 底层使用的也是该框架#xff0c;其通过流的方式实现了异步相应#xff0c;具备高效的需求管理#xff08;即对 “背压#xff08;backpressure#xff09;”的控制#xff09…一、概述 1.1介绍 Reactor 是一个用于JVM的完全非阻塞的响应式编程框架Webflux 底层使用的也是该框架其通过流的方式实现了异步相应具备高效的需求管理即对 “背压backpressure”的控制能力。它与 Java 8 函数式 API 直接集成比如 CompletableFuture Stream 以及 Duration。它提供了异步序列 API Flux用于[N]个元素和 Mono用于 [0|1]个元素并完全遵循和实现了“响应式扩展规范”Reactive Extensions Specification。 Reactor 的 reactor-ipc 组件还支持非阻塞的进程间通信inter-process communication, IPC。 Reactor IPC 为 HTTP包括 Websockets、TCP 和 UDP 提供了支持背压的网络引擎从而适合 应用于微服务架构。并且完整支持响应式编解码reactive encoding and decoding。 1.2特性 Reactive Streams是JVM面向流的库的标准和规范 1、处理可能无限数量的元素 2、有序 3、在组件之间异步传递元素 4、强制性非阻塞背压模式 1.3相关概念 Publisher发布者产生数据流 Subscriber 订阅者 消费数据流 Subscription 订阅关系 订阅关系是发布者和订阅者之间的关键接口。订阅者通过订阅来表示对发布者产生的数据的兴趣。订阅者可以请求一定数量的元素也可以取消订阅。 Processor处理器 处理器是同时实现了发布者和订阅者接口的组件。它可以接收来自一个发布者的数据进行处理并将结果发布给下一个订阅者。处理器在Reactor中充当中间环节代表一个处理阶段允许你在数据流中进行转换、过滤和其他操作。 命令式编程全自定义 响应式编程/声明式编程 声明流、说清楚要干什么、最终结果是要怎样 响应式编程 1、底层基于数据缓冲队列 消息驱动模型 异步回调机制 2、编码流式编程 链式调用 声明式API 3、效果优雅全异步 消息实时处理 高吞吐量 占用少量资源 线程池、DataBuffer 1.4演示demo Publisher 数据发布者 Publisher 是数据的发布者它是一个函数式接口其中只有一个方法该方法可以配置其对应的消费者源码如下 FunctionalInterfacepublic static interface PublisherT {public void subscribe(Subscriber? super T subscriber);}Subscriber 数据消费者 Subscriber 负责消费数据其内部定义了 4 个方法源码如下 public static interface SubscriberT {public void onSubscribe(Subscription subscription);public void onNext(T item);public void onError(Throwable throwable);public void onComplete();}onSubscribe()当与消费者绑定成功时调用该方法 onNext()当接收到一条发布数据时调用该方法 onError()当发布者或消费者发生异常时调用该方法 onComplete()当发布者关闭且所有数据已经被全部消费后调用该方法 Subscription 订阅关系 Subscription 定义了发布者和消费者的订阅关系可以理解为两者之间的通道定义源码如下 public static interface Subscription {public void request(long n);public void cancel();}request()该方法用于向通道中请求 n 个数据进行处理 cancel()该方法用于取消发布者和消费者的绑定关系 Processor 中间处理器 Processor 是数据发布者与消费者中间的处理器可以对发布者发布的数据进行预处理后再发送给消费者进行消费。 实际上其既是发布者又是消费者即在两者中间进行了一次数据的处理转发其源码如下 public static interface ProcessorT,R extends SubscriberT, PublisherR {}消息发布订阅示例代码 package com.yanyu.reactor.flow;import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher;public class FlowDemo1 {//定义流中间操作处理器 只用写订阅者的接口static class MyProcessor extends SubmissionPublisherString implements Flow.ProcessorString,String {private Flow.Subscription subscription; //保存绑定关系Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println(processor订阅绑定完成);this.subscription subscription;subscription.request(1); //找上游要一个数据}Override //数据到达触发这个回调public void onNext(String item) {System.out.println(processor拿到数据item);//再加工item 哈哈;submit(item);//把我加工后的数据发出去subscription.request(1); //再要新数据}Overridepublic void onError(Throwable throwable) {// 产生异常后直接取消订阅this.subscription.cancel();}Overridepublic void onComplete() {// 发布者所有数据全部被接收且发布者已经关闭System.out.println(数据接收完毕);}}public static void main(String[] args) throws InterruptedException {//1、定义一个发布者 发布数据SubmissionPublisherString publisher new SubmissionPublisher();//2、定一个中间操作 给每个元素加个 哈哈 前缀MyProcessor myProcessor1 new MyProcessor();MyProcessor myProcessor2 new MyProcessor();MyProcessor myProcessor3 new MyProcessor();//3、定义一个订阅者 订阅者感兴趣发布者的数据Flow.SubscriberString subscriber new Flow.SubscriberString() {private Flow.Subscription subscription;Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println(Thread.currentThread()订阅开始了subscription);this.subscription subscription;// 向数据发布者请求数据( //从上游请求一个数据)this.subscription.request(1);}Overridepublic void onNext(String item) {System.out.println(接收到消息 item);// 接收数据后可以继续接收或取消订阅if(item.equals(7)){subscription.cancel(); //取消订阅}else {subscription.request(1);}}Overridepublic void onError(Throwable throwable) {// 产生异常后直接取消订阅this.subscription.cancel();}Overridepublic void onComplete() {// 发布者所有数据全部被接收且发布者已经关闭System.out.println(数据接收完毕);}};// 将订阅者注册到发布者//4、绑定发布者和订阅者publisher.subscribe(myProcessor1); //此时处理器相当于订阅者myProcessor1.subscribe(myProcessor2); //此时处理器相当于发布者myProcessor2.subscribe(myProcessor3);myProcessor3.subscribe(subscriber); //链表关系绑定出责任链。//绑定操作就是发布者记住了所有订阅者都有谁有数据后给所有订阅者把数据推送过去。// 发布消息for (int i 0; i 10; i) {// 发送数据if(i 5){ // publisher.closeExceptionally(new RuntimeException(5555));}else {publisher.submit(String.valueOf(i));}//publisher发布的所有数据在它的buffer区//中断 // publisher.closeExceptionally();}// 关闭发布者publisher.close();Thread.sleep(20000);// 维持程序保持开启while (true) {}} } Backpressure 回压 Backpressure 回压是指消费能力低于生产能力时Subscriber 会将 Publisher 发布的数据缓存在 Subscription 中其长度默认为256源码如下 static final int DEFAULT_BUFFER_SIZE 256;public static int defaultBufferSize() {return DEFAULT_BUFFER_SIZE;}当 Subscription 存满时生产者将根据消费者的消费能力动态的调整数据发布的速度以实现消费者对生产者的反向控制。 二、Reactor – Mono/Flux API 2.1概述 Reactor 这样的响应式库的目标就是要弥补上述“经典”的 JVM 异步方式所带来的不足 此外还会关注一下几个方面 可编排性Composability 以及 可读性Readability使用丰富的 操作符 来处理形如 流 的数据在 订阅subscribe 之前什么都不会发生背压backpressure 具体来说即 消费者能够反向告知生产者生产内容的速度的能力高层次 同时也是有高价值的的抽象从而达到 并发无关 的效果 Mono和Flux Mono: 0|1 数据流 Flux: N数据流 响应式流元素内容 信号完成/异常 Mono是一种特殊类型的Publisher。Mono对象表示单个或空值。这意味着它最多只能是onnext() 请求发出一个值然后以oncomplete()信号终止。如果失败它只会发出oneror()信号。 Flux是标准的Publisher代表 0 到 N 异步序列值。这意味着它可以发出 0 对于多个值onnnext()请求可能是无限值然后以完成或错误信号终止。 2.2创建 Mono/Flux 常见创建方法 just()使用已知内容创建 fromIterable()通过可迭代对象创建 fromStream()从集合流中创建 range()通过范围迭代创建 interval()按照从 0 递增的方式自动创建 delayElements()数据流延时发送方法 //测试Fluxpublic static void flux() throws IOException { // Mono: 0|1个元素的流 // Flux: N个元素的流 N1//发布者发布数据流源头// 1. 创建 Flux/Mono// 1.1 使用已知内容创建 FluxFlux.just(1, 2, 3, 4, hello, world).subscribe(e - System.out.println(e1 e));// 1.2 通过可迭代对象创建 FluxFlux.fromIterable(Arrays.asList(1, 2, 3, 4, 5)).subscribe(System.out::println);// 1.3 从集合流中创建 FluxFlux.fromStream(Stream.of(1,2,3,4)).subscribe(System.out::println);// 1.4 通过范围迭代创建 FluxFlux.range(0,10).subscribe(System.out::println);// 2. 创建时常用的方法// 2.1 interval() 方法可以用来生成从 0 开始递增的 Long 对象的数据序列Flux.interval(Duration.ofMillis(100))// 限制执行10次.take(10).subscribe(System.out::println);// 2.2 delayElements() 方法延时发送Flux.fromIterable(Arrays.asList(1, 2, 3, 4, 5)).delayElements(Duration.ofMillis(1000L)).subscribe(System.out::println);//3.企业级用法//3.1、多元素的流FluxInteger just Flux.just(1, 2, 3, 4, 5); ////流不消费就没用 消费订阅just.subscribe(e - System.out.println(e1 e));//一个数据流可以有很多消费者just.subscribe(e - System.out.println(e2 e));//对于每个消费者来说流都是一样的 广播模式System.out.println();FluxLong flux Flux.interval(Duration.ofSeconds(1));//每秒产生一个从0开始的递增数字flux.subscribe(System.out::println);System.in.read();} 数据中间操作 map()对序列中的每个元素应用一个函数并返回一个新的序列 flatMap()对序列中的每个元素应用一个异步函数并将多个 Publisher 合并成一个序列(将流中的每一个元素看作一个新的流进行处理并按实际生产顺序进行合并)。 flatMapSequential()按订阅顺序进行合并 filter() :过滤序列中的元素只保留满足条件的元素。 handle()对序列中的每个元素进行处理可以选择性地发出零个或多个元素。 mapNotNull()对序列中的每个元素应用一个函数并过滤掉结果为 null 的元素。 switchIfEmpty()如果序列为空则切换到另一个 Publisher defaultIfEmpty()如果序列为空则发出一个默认值。 public static void flux1() throws IOException { // Mono: 0|1个元素的流 // Flux: N个元素的流 N1//发布者发布数据流源头// 作用对序列中的每个元素应用一个函数并返回一个新的序列。 // 使用场景转化数据元素。Flux.just(1, 2, 3).map(i - i * 2).subscribe(System.out::println);// 输出: 2, 4, 6 // 作用对序列中的每个元素应用一个异步函数并将多个 Publisher 合并成一个序列。 // 使用场景异步处理和合并结果。Flux.just(1, 2, 3).flatMap(i - Flux.just(i * 2)).subscribe(System.out::println);Flux.just(zhang san, li si).flatMap(v - {String[] s v.split( );return Flux.fromArray(s); //把数据包装成多元素流}).log().subscribe();//两个人的名字按照空格拆分打印出所有的姓与名 // 作用过滤序列中的元素只保留满足条件的元素。 // 使用场景筛选数据。Flux.just(1, 2, 3, 4).filter(i - i % 2 0).subscribe(System.out::println); // 输出: 2, 4 // 作用对序列中的每个元素进行处理可以选择性地发出零个或多个元素。 // 使用场景复杂的元素转换或过滤。Flux.just(1, 2, 3, 4).handle((i, sink) - {if (i % 2 0) {sink.next(i * 2);}}).subscribe(System.out::println); // 输出: 4, 8 // 作用对序列中的每个元素应用一个函数并过滤掉结果为 null 的元素。 // 使用场景转换数据并去除 null 值。Flux.just(1, 2, 3, 4).mapNotNull(i - i % 2 0 ? i * 2 : null).subscribe(System.out::println); // 输出: 4, 8 // 作用如果序列为空则切换到另一个 Publisher。 // 使用场景提供备用数据源。Flux.empty().switchIfEmpty(Flux.just(1, 2, 3)).subscribe(System.out::println); // 输出: 1, 2, 3 // 作用如果序列为空则发出一个默认值。 // 使用场景提供默认值。 // Flux.empty() // .defaultIfEmpty(1) // .subscribe(System.out::println); // 输出: 1 // Flux.range(1, 100) // .map(x - { // return x / 2; // }) // .subscribe(System.out::println);Flux.just(5, 10).flatMap(x -Flux.interval(Duration.ofMillis(x * 10))).subscribe(System.out::println);Flux.just(5, 10).flatMapSequential(x -Flux.interval(Duration.ofMillis(x * 10)).take(x)).subscribe(System.out::println);Flux.just(5, 10).flatMapSequential(x -Flux.interval(Duration.ofMillis(x * 10)).take(10)).subscribe(System.out::println);} merge()按照所有流的实际产生顺序进行合并 mergeSequential()按照流合并的次序进行合并先消费第一个在消费第二个 zip():zip() 方法用于将多个流Publisher合并成一个流。 zipWith()把流中的元素与另一个流中对应元素进行合并多余元素将被抛弃 第二个参数可以指定合并的规则 public static void flux2() throws IOException { // Flux.merge(Flux.interval(Duration.ofMillis(10)).take(5), // Flux.interval(Duration.ofMillis(10)).take(3)) // .subscribe(System.out::println);Flux.mergeSequential(Flux.interval(Duration.ofMillis(10)).take(5),Flux.interval(Duration.ofMillis(10)).take(3)).subscribe(System.out::println);FluxInteger numbers Flux.just(1, 2, 3);FluxString letters Flux.just(A, B, C);Flux.zip(numbers, letters, (number, letter) - number letter).subscribe(System.out::println);Flux.just(1, 2).zipWith(Flux.just(3, 4, 5 ), (s1, s2) - s1 , s2).subscribe(System.out::println);System.in.read();}concat() concat() 方法用于将两个或多个流按顺序连接在一起。它会等待前一个流完成后再订阅下一个流。 FluxInteger flux1 Flux.just(1, 2, 3); FluxInteger flux2 Flux.just(4, 5, 6);Flux.concat(flux1, flux2).subscribe(System.out::println);concatWith() concatWith() 方法是 concat() 的实例方法用于将当前流与另一个流按顺序连接在一起。 FluxInteger flux1 Flux.just(1, 2, 3); FluxInteger flux2 Flux.just(4, 5, 6);flux1.concatWith(flux2).subscribe(System.out::println);concatMap() concatMap() 方法将流中的每个元素映射为一个新的流并按顺序连接这些流。 Flux.just(1, 2, 3).concatMap(i - Flux.just(i * 10, i * 20)).subscribe(System.out::println);buffer()将流中的元素收集为集合 可以传入两个参数分别为 maxSize 和 skip。其中maxSize 代表数据切割后每个集合的最大长度skip 代表每一次切割后切割器切割起点跳跃的元素个数 bufferTimeout()按照时间间隔切割对流中的数据进行收集 可以传入两个参数分别为 maxSize 和 maxTime。其中maxSize 代表数据切割后每个集合的最大长度maxTime 代表切割的最大时间间隔 bufferWhile()当 Predicate 为 true 时才收集当前元素 bufferUntil()直到 Predicate 为 true 时才收集一次所有元素 当第二个参数为 true 时满足条件的元素将作为集合的首个元素若为 false则满足条件的元素为最后一个元素默认 Flux.range(1, 10).buffer(3, 3).subscribe(System.out::println);Flux.interval(Duration.ofMillis(100L)).bufferTimeout(9, Duration.ofMillis(1000L)).subscribe(System.out::println);Flux.range(1, 10).bufferWhile(i - i % 2 0).subscribe(System.out::println);Flux.range(1, 10).bufferUntil(i - i % 2 0, false).subscribe(System.out::println);System.in.read(); take()提取指定数量的元素或按时间间隔提取元素 takeLast()提取最后 n 个元素 takeWhile()当 Predicate 返回 true 时才进行提取 takeUntil()提取元素直到 Predicate 返回 true skip()跳过指定条数或跳过指定时间间隔 last()取流中的最后一个元素 next()取流中的第一个元素 Flux.range(1, 1000).take(10).subscribe(System.out::println);Flux.interval(Duration.ofMillis(11)).take(Duration.ofMillis(100)).subscribe(System.out::println);Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);Flux.range(1, 1000).takeWhile(i - i 20).subscribe(System.out::println);Flux.range(1, 1000).takeUntil(i - i 100).subscribe(System.out::println);Flux.just(1, 2, 3, 4, 5, 6, 7).skip(2).subscribe(System.out::println);Flux.interval(Duration.ofMillis(100)).skip(Duration.ofMillis(300)).subscribe(System.out::println);Flux.just(1, 2, 3, 4, 5).last().subscribe(System.out::println);Flux.just(1, 2, 3, 4, 5).next().subscribe(System.out::println););System.in.read(); reduce()对流中数据进行规约聚合 reduceWith()对流中数据进行规约聚合第一个参数可以通过 Supplier 设置初始值 groupBy() 通过一个策略 key 将一个 Flux分割为多个组。 Flux.range(1, 100).reduce(Integer::sum).subscribe(System.out::println);Flux.range(1, 100).reduceWith(() - 100, Integer::sum).subscribe(System.out::println);Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9).groupBy(i - i % 2 0 ? even : odd).concatMap(i - i.defaultIfEmpty(-1).map(String::valueOf).startWith(i.key())).subscribe(System.out::println); flatMapMany() flatMapMany() 方法用于将单值流Mono转换为多值流Flux。它将 Mono 中的元素映射为一个 Publisher并将这些 Publisher 的元素合并成一个 Flux。 MonoString mono Mono.just(Hello);mono.flatMapMany(s - Flux.just(s.split())).subscribe(System.out::println);transform() transform() 方法用于将流Flux 或 Mono转换为另一种流。它接受一个转换函数该函数接收一个流并返回一个新的流。这种方法通常用于将通用的转换逻辑封装在一个函数中以便在多个地方重用。 FunctionFluxString, FluxString transformToUpperCase flux - flux.map(String::toUpperCase);FluxString flux Flux.just(a, b, c);flux.transform(transformToUpperCase).subscribe(System.out::println);Test//把流变形成新数据void transform() {AtomicInteger atomic new AtomicInteger(0);FluxString flux Flux.just(a, b, c).transform(values - {// atomicif (atomic.incrementAndGet() 1) {//如果是第一次调用老流中的所有元素转成大写return values.map(String::toUpperCase);} else {//如果不是第一次调用原封不动返回return values;}});//transform 无defer不会共享外部变量的值。 无状态转换; 原理无论多少个订阅者transform只执行一次//transform 有defer会共享外部变量的值。 有状态转换; 原理无论多少个订阅者每个订阅者transform都只执行一次flux.subscribe(v - System.out.println(订阅者1v v));flux.subscribe(v - System.out.println(订阅者2v v));}2.3事件感知 信号类型 在响应式编程中信号是数据流的核心概念主要包括正常信号和异常信号。SignalType 枚举类定义了各种信号类型 SUBSCRIBE被订阅。REQUEST请求了 N 个元素。CANCEL流被取消。ON_SUBSCRIBE在订阅的时候。ON_NEXT在元素到达的时候。ON_ERROR在流发生错误的时候。ON_COMPLETE在流正常完成的时候。AFTER_TERMINATE中断以后。CURRENT_CONTEXT当前上下文。ON_CONTEXT感知上下文。 doOnXxx API 触发时机 * 1、doOnNext每个数据流的数据到达的时候触发
- 2、doOnEach每个元素流的数据和信号到达的时候触发
- 3、doOnRequest 消费者请求流元素的时候
- 4、doOnError流发生错误
- 5、doOnSubscribe: 流被订阅的时候
- 6、doOnTerminate 发送取消/异常信号中断了流
- 7、doOnCancle 流被取消
- 8、doOnDiscard流中元素被忽略的时候 //MonoInteger 只有一个Integer//FluxInteger 有很多Integerpublic void fluxDoOn(String[] args) throws IOException, InterruptedException { // MonoInteger just Mono.just(1); // // just.subscribe(System.out::println);//空流: 链式API中下面的操作符操作的是上面的流。// 事件感知API当流发生什么事的时候触发一个回调,系统调用提前定义好的钩子函数Hook【钩子函数】doOnXxxFluxInteger flux Flux.range(1, 7).delayElements(Duration.ofSeconds(1)).doOnComplete(() - {System.out.println(流正常结束…);}).doOnCancel(() - {System.out.println(流已被取消…);}).doOnError(throwable - {System.out.println(流出错… throwable);}).doOnNext(integer - {System.out.println(doOnNext… integer);}); //有一个信号此时代表完成信号flux.subscribe(new BaseSubscriberInteger() {Overrideprotected void hookOnSubscribe(Subscription subscription) {System.out.println(订阅者和发布者绑定好了 subscription);request(1); //背压}Overrideprotected void hookOnNext(Integer value) {System.out.println(元素到达 value);if (value 5) {request(1);if (value 3) {int i 10 / 0;}} else {cancel();//取消订阅}; //继续要元素}Overrideprotected void hookOnComplete() {System.out.println(数据流结束);}Overrideprotected void hookOnError(Throwable throwable) {System.out.println(数据流异常);}Overrideprotected void hookOnCancel() {System.out.println(数据流被取消);}Overrideprotected void hookFinally(SignalType type) {System.out.println(结束信号 type);// 正常、异常 // try { // //业务 // }catch (Exception e){ // // }finally { // //结束 // }}});Thread.sleep(2000);// FluxInteger range Flux.range(1, 7);System.in.read();} public void doOnXxxx(String[] args) {// 关键doOnNext表示流中某个元素到达以后触发我一个回调// doOnXxx要感知某个流的事件写在这个流的后面新流的前面Flux.just(1, 2, 3, 4, 5, 6, 7, 0, 5, 6).doOnNext(integer - System.out.println(元素到达 integer)) //元素到达得到时候触发.doOnEach(integerSignal - { //each封装的详细System.out.println(doOnEach.. integerSignal);})//1,2,3,4,5,6,7,0.map(integer - 10 / integer) //10,5,3,.doOnError(throwable - {System.out.println(数据库已经保存了异常 throwable.getMessage());}).map(integer - 100 / integer).doOnNext(integer - System.out.println(元素到哈 integer)).subscribe(System.out::println);}2.4Exception 异常处理 doOnError()异常监听监听到异常的处理逻辑 onErrorReturn()产生异常时返回消息给订阅者 Flux.just(1, 2, 3)// concatWith() 可以连接一个新的流.concatWith(Flux.error(new Exception(手动模拟异常…))).doOnError(Throwable::printStackTrace).onErrorReturn(产生异常返回 500…).subscribe(System.out::println);subscribe()可以通过传入参数指定异常处理 参数1定义正常消费逻辑 参数2定义异常处理逻辑 参数3定义消费完成的逻辑 Flux.just(1, 2, 3)// concatWith() 可以连接一个新的流.concatWith(Flux.error(new Exception(手动模拟异常…))).subscribe(System.out::println,System.err::println,() - System.out.println(完成…));onErrorResume()产生异常后重新产生新的流 Flux.just(1, 2, 3)// concatWith() 可以连接一个新的流.concatWith(Flux.error(new Exception(手动模拟异常…))).onErrorResume(throwable - {System.out.println(throwable.getMessage());;return Flux.just(1, 1, 1);}).subscribe(System.out::println);retry()产生异常后进行重试参数为重试次数 Flux.just(1).delayElements(Duration.ofSeconds(3)).log().timeout(Duration.ofSeconds(2)).retry(2) // 把流从头到尾重新请求一次.onErrorReturn(2).map(i- ihaha).subscribe(v- System.out.println(v v));System.in.read(); 2.5流日志 Flux.range(1, 7) // .log() //日志 onNext(1~7).filter(i - i 3) //挑出3的元素 // .log() //onNext(4~7).map(i - haha- i).log() // onNext(haha-4 ~ 7).subscribe(System.out::println); 2.6背压和请求重塑 1、buffer缓冲 FluxListInteger flux Flux.range(1, 10) //原始流10个.buffer(3).log();//缓冲区缓冲3个元素: 消费一次最多可以拿到三个元素 凑满数批量发给消费者 // // //一次发一个一个一个发 // 10元素buffer(3)消费者请求4次数据消费完成 2、limit限流 Flux.range(1, 1000).log()//限流触发看上游是怎么限流获取数据的.limitRate(100) //一次预取30个元素 第一次 request(100)以后request(75).subscribe(); 2.7以编程方式创建序列-Sink 在响应式编程中Sink 是一种用于手动推送数据到流中的工具。Reactor 提供了 Sinks 类来创建不同类型的 Sink例如单值的 Sink.One、多值的 Sink.Many 和空值的 Sink.Empty 等。下面我们将介绍如何使用 Sink 来以编程方式创建序列。 Sinks.many(); //发送Flux数据。 Sinks.one(); //发送Mono数据 Sinks 接受器数据管道所有数据顺着这个管道往下走的 S inks.many().unicast(); //单播 这个管道只能绑定单个订阅者消费者 Sinks.many().multicast();//多播 这个管道能绑定多个订阅者 Sinks.many().replay();//重放 这个管道能重放元素。 是否给后来的订阅者把之前的元素依然发给它 Testvoid sinks() throws InterruptedException, IOException {// Flux.create(fluxSink - { // fluxSink.next(111) // })// Sinks.many(); //发送Flux数据。 // Sinks.one(); //发送Mono数据// Sinks 接受器数据管道所有数据顺着这个管道往下走的//Sinks.many().unicast(); //单播 这个管道只能绑定单个订阅者消费者//Sinks.many().multicast();//多播 这个管道能绑定多个订阅者//Sinks.many().replay();//重放 这个管道能重放元素。 是否给后来的订阅者把之前的元素依然发给它// 从头消费还是从订阅的那一刻消费// Sinks.ManyObject many Sinks.many() // .multicast() //多播 // .onBackpressureBuffer(); //背压队列//默认订阅者从订阅的那一刻开始接元素//发布者数据重放 底层利用队列进行缓存之前数据 // Sinks.ManyObject many Sinks.many().replay().limit(3); // // new Thread(()-{ // for (int i 0; i 10; i) { // many.tryEmitNext(a-i); // try { // Thread.sleep(1000); // } catch (InterruptedException e) { // throw new RuntimeException(e); // } // } // }).start(); // // // // //订阅 // many.asFlux().subscribe(v- System.out.println(v1 v)); // // new Thread(()-{ // try { // Thread.sleep(5000); // } catch (InterruptedException e) { // throw new RuntimeException(e); // } // many.asFlux().subscribe(v- System.out.println(v2 v)); // }).start();FluxInteger cache Flux.range(1, 10).delayElements(Duration.ofSeconds(1)) //不调缓存默认就是缓存所有.cache(1); //缓存两个元素 默认全部缓存cache.subscribe();//缓存元素;// 最定义订阅者new Thread(()-{try {Thread.sleep(5000);} catch (InterruptedException e) {throw new RuntimeException(e);}cache.subscribe(v- System.out.println(v v));}).start();System.in.read();}2.8自定义元素处理 handle() 方法是 Reactor 中一个非常强大的操作符它允许你同时处理和转换流中的元素。它的功能类似于 map() 和 filter() 的结合体能够在处理元素的同时决定是否将其传递给下游。 import reactor.core.publisher.Flux;public class HandleExample {public static void main(String[] args) {Flux.range(1, 10).handle((value, sink) - {System.out.println(拿到的值 value);if (value % 2 0) {sink.next(张三 value); // 仅向下游发送偶数}// 可以在这里添加更多的逻辑例如错误处理或完成信号}).log() // 日志.subscribe(System.out::println,error - System.err.println(Error: error),() - System.out.println(Completed));} }2.9线程调度 在响应式编程中调度器Schedulers用于控制操作符执行的线程和调度策略。Reactor 提供了一些内置的调度器帮助你管理并发和异步操作。 常见的调度器类型 Schedulers.immediate()在当前线程中执行。Schedulers.single()在一个单独的线程中执行。Schedulers.elastic()在弹性线程池中执行适用于 I/O 密集型操作。Schedulers.parallel()在固定大小的线程池中执行适用于 CPU 密集型操作。Schedulers.boundedElastic()在弹性线程池中执行适用于需要大量线程的 I/O 密集型操作。Schedulers.fromExecutor(Executor executor)使用自定义的 Executor。Schedulers.newSingle()创建一个新的单线程调度器。Schedulers.newParallel()创建一个新的并行调度器。 使用调度器 你可以使用 subscribeOn() 和 publishOn() 方法来指定操作符在不同的调度器上执行。 subscribeOn(Scheduler scheduler)指定订阅操作的调度器。publishOn(Scheduler scheduler)指定接下来操作的调度器。 // 百万数据8个线程每个线程处理100进行分批处理一直处理结束Flux.range(1,10000).buffer(100).parallel(8).runOn(Schedulers.newParallel(yy)).log().flatMap(list-Flux.fromIterable(list)).collectSortedList(Integer::compareTo).subscribe(v- System.out.println(v v));System.in.read(); 2.10Context-API //Context-API https://projectreactor.io/docs/core/release/reference/#contextTest //ThreadLocal在响应式编程中无法使用。//响应式中数据流期间共享数据Context API: Context读写 ContextView只读void threadlocal(){//支持Context的中间操作Flux.just(1,2,3).transformDeferredContextual((flux,context)-{System.out.println(flux flux);System.out.println(context context);return flux.map(i-icontext.get(prefix));})//上游能拿到下游的最近一次数据.contextWrite(Context.of(prefix,哈哈))//ThreadLocal共享了数据上游的所有人能看到; Context由下游传播给上游.subscribe(v- System.out.println(v v));//以前 命令式编程// controller – service – dao//响应式编程 dao(10数据源) – service(10) – controller(10); 从下游反向传播}
- 上一篇: html展示网站源代码免费建站平台哪个好
- 下一篇: html制作网站大众网站平安建设之星
相关文章
-
html展示网站源代码免费建站平台哪个好
html展示网站源代码免费建站平台哪个好
- 站长
- 2026年02月16日
-
html原神网页制作教程wordpress图片地址优化
html原神网页制作教程wordpress图片地址优化
- 站长
- 2026年02月16日
-
html语言大型网站开发免费行情软件在线观看
html语言大型网站开发免费行情软件在线观看
- 站长
- 2026年02月16日
-
html制作网站大众网站平安建设之星
html制作网站大众网站平安建设之星
- 站长
- 2026年02月16日
-
html做的好看的网站WordPress按评论时间排序
html做的好看的网站WordPress按评论时间排序
- 站长
- 2026年02月16日
-
HTML做网站的书籍天津网站建设制作价格
HTML做网站的书籍天津网站建设制作价格
- 站长
- 2026年02月16日






