>  기사  >  Java  >  java9 Reactive Stream 반응형 프로그래밍 API의 새로운 기능을 사용하는 방법

java9 Reactive Stream 반응형 프로그래밍 API의 새로운 기능을 사용하는 방법

王林
王林앞으로
2023-05-10 13:46:061470검색

1. Java9 반응형 스트림 API

Java 9는 반응형 스트림 프로그래밍을 정의하는 인터페이스 세트를 제공합니다. 이러한 모든 인터페이스는 java.util.concurrent.Flow 클래스에 정적 내부 인터페이스로 정의되어 있습니다. java.util.concurrent.Flow类里面。

java9 Reactive Stream 반응형 프로그래밍 API의 새로운 기능을 사용하는 방법

下面是Java 响应式编程中的一些重要角色和概念,先简单理解一下

发布者(Publisher)是潜在的无限数量的有序数据元素的生产者。 它根据收到的需求(subscription)向当前订阅者发布一定数量的数据元素。

订阅者(Subscriber)从发布者那里订阅并接收数据元素。与发布者建立订阅关系后,发布者向订阅者发送订阅令牌(subscription),订阅者可以根据自己的处理能力请求发布者发布数据元素的数量。

订阅令牌(subscription)表示订阅者与发布者之间建立的订阅关系。 当建立订阅关系后,发布者将其传递给订阅者。 订阅者使用订阅令牌与发布者进行交互,例如请求数据元素的数量或取消订阅。

二、Java响应式编程四大接口

2.1.Subscriber Interface(订阅者订阅接口)

public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    public void onNext(T item);
    public void onError(Throwable throwable);
    public void onComplete();
}

onSubscribe:在发布者接受订阅者的订阅动作之后,发布任何的订阅消息之前被调用。新创建的Subscription订阅令牌对象通过此方法传递给订阅者。

onNext:下一个待处理的数据项的处理函数

onError:在发布者或订阅遇到不可恢复的错误时调用

onComplete:当没有订阅者调用(包括onNext()方法)发生时调用。

2.2.Subscription Interface (订阅令牌接口)

订阅令牌对象通过Subscriber.onSubscribe()方法传递

public static interface Subscription {    public void request(long n);    public void cancel();}

request(long n)是无阻塞背压概念背后的关键方法。订阅者使用它来请求n个以上的消费项目。这样,订阅者控制了它当前能够接收多少个数据。cancel()由订阅者主动来取消其订阅,取消后将不会在接收到任何数据消息。

2.3.Publisher Interface(发布者接口)

@FunctionalInterface
public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
}

调用该方法,建立订阅者Subscriber与发布者Publisher之间的消息订阅关系。

2.4.Processor Interface(处理器接口)

处理者Processor 可以同时充当订阅者和发布者,起到转换发布者——订阅者管道中的元素的作用。用于将发布者T类型的数据元素,接收并转换为类型R的数据并发布。

public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

二、实战案例

现在我们要去实现上面的四个接口来完成响应式编程

Subscription Interface订阅令牌接口通常不需要我们自己编程去实现,我们只需要在知道request()方法和cancle()方法含义即可。

Publisher Interface发布者接口,Java 9 已经默认为我们提供了实现SubmissionPublisher,该实现类除了实现Publisher接口的方法外,提供了一个方法叫做submit()来完成消息数据的发送。

Subscriber Interface订阅者接口,通常需要我们自己去实现。因为在数据订阅接收之后,不同的业务有不同的处理逻辑。

Processor实际上是 Publisher Interface和Subscriber Interface的集合体,有需要数据类型转换及数据处理的需求才去实现这个接口

下面的例子实现的式字符串的数据消息订阅处理

实现订阅者Subscriber Interface

import java.util.concurrent.Flow;
public class MySubscriber implements Flow.Subscriber<String> {
  private Flow.Subscription subscription;  //订阅令牌
  @Override
  public void onSubscribe(Flow.Subscription subscription) {
      System.out.println("订阅关系建立onSubscribe: " + subscription);
      this.subscription = subscription;
      subscription.request(2);
  }
  @Override
  public void onNext(String item) {
      System.out.println("item: " + item);
      // 一个消息处理完成之后,可以继续调用subscription.request(n);向发布者要求数据发送
      //subscription.request(n);
  }
  @Override
  public void onError(Throwable throwable) {
      System.out.println("onError: " + throwable);
  }
  @Override
  public void onComplete() {
      System.out.println("onComplete");
  }
}

SubmissionPublisher消息发布者

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class SubmissionPublisherExample {
  public static void main(String[] args) throws InterruptedException {
      ExecutorService executor = Executors.newFixedThreadPool(1);
      SubmissionPublisher<String> sb = new SubmissionPublisher<>(executor, Flow.defaultBufferSize());
      sb.subscribe(new MySubscriber());   //建立订阅关系,可以有多个订阅者
      sb.submit("数据 1");  //发送消息1
      sb.submit("数据 2"); //发送消息2
      sb.submit("数据 3"); //发送消息3
      executor.shutdown();
  }
}

控制台打印输出结果

订阅关系建立
onSubscribe: java.util.concurrent.SubmissionPublisher$BufferedSubscription@27e81a39
item: 数据 1
item: 数据 2

请注意:即使发布者submit了3条数据,MySubscriber也仅收到了2条数据进行了处理。是因为我们在MySubscriber#onSubscribe()方法中使用了subscription.request(2);

java9 Reactive Stream 반응형 프로그래밍 API의 새로운 기능을 사용하는 방법 🎜🎜 다음은 Java 반응형 프로그래밍의 몇 가지 중요한 역할과 개념입니다. 먼저 간단히 이해해 보겠습니다. 게시자는 잠재적으로 정렬된 데이터 요소를 무제한으로 생성합니다. 수신된 수요(구독)를 기반으로 현재 구독자에게 특정 수의 데이터 요소를 게시합니다. 🎜🎜구독자는 게시자로부터 데이터 요소를 구독하고 수신합니다. 게시자와 구독 관계를 맺은 후 게시자는 구독자에게 구독 토큰(구독)을 보내고, 구독자는 자신의 처리 능력에 따라 게시자가 게시한 데이터 요소 수를 요청할 수 있습니다. 🎜🎜구독 토큰(subscription)은 구독자와 게시자 사이에 설정된 구독 관계를 나타냅니다. 구독 관계가 설정되면 게시자는 이를 구독자에게 전달합니다. 구독자는 구독 토큰을 사용하여 데이터 요소 수 요청 또는 구독 취소 등 게시자와 상호 작용합니다. 🎜🎜2. Java 반응형 프로그래밍의 네 가지 주요 인터페이스🎜

2.1.구독자 인터페이스

rrreee🎜onSubscribe: 게시자가 구독자의 구독 작업을 수락한 후 구독 메시지를 게시하기 전에 호출됩니다. . 새로 생성된 구독 구독 토큰 개체는 이 메서드를 통해 구독자에게 전달됩니다. 🎜🎜onNext: 처리할 다음 데이터 항목의 처리 함수 🎜🎜onError: 게시자 또는 구독에서 복구할 수 없는 오류가 발생하면 호출됩니다🎜🎜 onComplete: 구독자 호출(onNext() 메서드 포함)이 발생하지 않을 때 호출됩니다. 🎜<h4>2.2.구독 인터페이스(구독 토큰 인터페이스)</h4>🎜구독 토큰 객체는 <code>Subscriber.onSubscribe() 메소드🎜rrreee🎜request(long n)를 통해 전달됩니다. 는 Non-Blocking BackPressure 개념의 핵심 방법입니다. 구독자는 n개 이상의 소비 아이템을 요청하는 데 사용됩니다. 이러한 방식으로 구독자는 현재 수신할 수 있는 데이터의 양을 제어합니다. <code>cancel()구독자는 구독을 적극적으로 취소합니다. 취소 후에는 데이터 메시지가 수신되지 않습니다. 🎜

2.3.게시자 인터페이스

rrreee🎜이 메서드를 호출하여 구독자와 게시자 간의 메시지 구독 관계를 설정합니다. 🎜

2.4.프로세서 인터페이스

🎜프로세서는 구독자와 게시자의 역할을 모두 수행할 수 있으며 게시자-구독자 파이프라인에서 요소를 변환하는 역할을 합니다. 게시자로부터 T 유형의 데이터 요소를 수신하여 R 유형의 데이터로 변환하고 게시하는 데 사용됩니다. 🎜rrreee🎜2. 실제 사례🎜🎜이제 리액티브 프로그래밍을 완료하려면 위의 네 가지 인터페이스를 구현해야 합니다.🎜🎜구독 인터페이스 구독 토큰 인터페이스는 일반적으로 우리가 직접 프로그래밍할 필요가 없습니다. request() 메소드와 cancel() 메소드의 의미만 알면 됩니다. 🎜🎜Publisher 인터페이스Publisher 인터페이스인 Java 9에서는 기본적으로 SubmissionPublisher 구현을 제공합니다. 게시자 인터페이스를 구현하는 방법 외에도 이 구현 클래스는 submit()이라는 메서드를 제공합니다. 메시지 데이터를 보냅니다. 🎜🎜구독자 인터페이스 구독자 인터페이스는 일반적으로 직접 구현해야 합니다. 데이터 구독이 수신된 후 기업마다 처리 논리가 다르기 때문입니다. 🎜🎜Processor는 실제로 게시자 인터페이스와 구독자 인터페이스의 모음입니다. 이 인터페이스에는 데이터 유형 변환 및 데이터 처리가 필요합니다. 🎜🎜다음 예에서는 문자열 데이터 메시지 구독 처리를 구현합니다.🎜

구독자 인터페이스 구현

rrreee

SubmissionPublisher 메시지 게시자

rrreee🎜콘솔 인쇄 출력 결과🎜
🎜구독 관계 설정
onSubscribe: java.util.concurrent.SubmissionPublisher$BufferedSubscription@27e81a39
item: 데이터 1
item: 데이터 2🎜
🎜참고: 게시자가 3개의 데이터를 제출해도 MySubscriber에서는 2개의 데이터만 처리되었습니다. MySubscriber#onSubscribe() 메서드에서 subscription.request(2);를 사용했기 때문입니다. 이것이 '배압'의 반응형 프로그래밍 효과입니다. 내가 처리할 수 있는 데이터의 양만큼 메시지 게시자에게 얼마나 많은 데이터를 제공해야 하는지 알려줍니다. 🎜

위 내용은 java9 Reactive Stream 반응형 프로그래밍 API의 새로운 기능을 사용하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제