DevOps

Reactive Programming with JDK 9 Flow API

IT오이시이 2017. 6. 20. 23:01
728x90

Reactive Programming with JDK 9 Flow API


https://community.oracle.com/docs/DOC-1006738


반응성 프로그램은 무엇입니까?

 

반응성이 있음 프로그래밍이 발생할 때 어플리케이션의 데이터 아이템에 대응 데이터 항목의 비동기 스트림 처리에 관한 것이다. 데이터의 스트림은 본질적으로 시간이 지남에 따라 발생하는 데이터 아이템들의 시퀀스이다. 메모리 내 데이터의 반복 처리에 비해, 데이터가 스트림으로서 처리되기 때문에 이 모델은 메모리를 더 효율적이다.

 

반응성 프로그래밍 모델에서 게시자 및 구독자가있다. 게시자 가입자 비동기 등록되는 데이터의 스트림을 발행한다.

 

이 모델은 또한, 프로세서에 의해 상기 스트림에 작동 고차 함수를 도입하기위한 메커니즘을 제공한다. 프로세서는 게시자 또는 구독자 변경을위한 필요없이 데이터 스트림을 변환. 프로세서 (또는 프로세서들의 체인)은 다른 하나 개의 데이터 스트림을 변환 게시자와 가입자 사이에 앉는다. 게시자와 가입자는 데이터 스트림에 일어나는 변화에 독립적이다.

 

push.png

 

 

왜 반응성 프로그래밍?

  • 간단한 코드를 더 읽기 만들기.
  • 보일러 플레이트 코드보다 비즈니스 로직에 초점을 둔  추상화를 합니다.
  •  낮은 수준의 스레딩, 동기화 및 동시성 문제를 보다 추상화를 합니다.
  • 스트림 처리는 메모리를 효율적으로  사용하는 의미
  • 이 모델은 문제의 거의 모든 종류를 해결하기 위해 거의 모든 곳에서 적용 할 수 있습니다.

 

 

JDK 9 흐름 API

 

사실상의 표준 반응 스트림 사양에 JDK (9) 대응의 흐름 API를. 반응성 스트림 사양 반응성 프로그램을 표준화하는 사업 중 하나입니다. 몇몇  구현은 이미 반응성 스트림 사양을 지원합니다.

 

플로우 API (및 반응성 스트림 API)는, 어떤 방법으로, 반복자에서 아이디어와 옵저버 패턴의 조합입니다. 반복자는 응용 프로그램이 소스에서 항목을 끌어 풀 모델이다. 관찰자는 광원으로부터의 항목들이 애플리케이션에 푸시 푸시 모델이다. 흐름의 API를 사용하여 응용 프로그램이 처음 N 항목에 대한 요청하고 게시자는 구독자에게 가장 N 항목에 푸시합니다. 그래서 풀의 자사의 혼합 및 프로그래밍 모델에 밀어 넣습니다.

 

풀 push.png

 

 

The Flow API Interfaces (At a glance)

 

  1. @FunctionalInterface   
  2. public static interface Flow.Publisher<T> {  
  3.     public void    subscribe(Flow.Subscriber<? super T> subscriber);  
  4. }   
  5.   
  6. public static interface Flow.Subscriber<T> {  
  7.     public void    onSubscribe(Flow.Subscription subscription);  
  8.     public void    onNext(T item) ;  
  9.     public void    onError(Throwable throwable) ;  
  10.     public void    onComplete() ;  
  11. }   
  12.   
  13. public static interface Flow.Subscription {  
  14.     public void    request(long n);  
  15.     public void    cancel() ;  
  16. }   
  17.   
  18. public static interface Flow.Processor<T,R>  extends Flow.Subscriber<T>, Flow.Publisher<R> {  
  19. }  

 

 

가입자

 

가입자는 콜백 게시자에 등록합니다. 요청하지 않는 데이터 항목은 가입자에 가압되지 않고, 여러 항목을 요청할 수있다. 주어진 구독에 대한 가입자 메소드 호출은 엄격하게 정렬됩니다. 응용 프로그램은 가입자에서 사용할 수있는 다음과 같은 콜백에 반응 할 수 있습니다.

 

 

콜백

기술

onSubscribe

메소드 지정된 구독에 대한 다른 가입자의 메소드를 호출하기 전에 호출됩니다.

OnNext

메소드는 서브 스크립 션의 다음 항목에 호출됩니다.

OnError

다른 가입자 메소드이 구독에 의해 호출되지 후 게시자 또는 구독에 의해 발생 복구 할 수없는 오류에 대해 호출 방법.


게시자는 항목이 가입자에게 발급되는 것을 허용하지 않는 오류가 발생하면 해당 구독자의 OnError받은 다음 더 이상 메시지를 수신하지 않습니다.

onComplete

이 추가적인 가입자 메소드 호출은 다른 가입자 방법은 정액제에 의해 호출되지 않은 후 이미 에러에 의해 종료되지 않은 정액제, 발생할 없다는 것을 알고있을 때 메소드 호출.


그것은 더 이상 메시지가 발급되지 않습니다 것을 알 때, 가입자는 onComplete를받습니다.

 

 


Sample Subscriber

 

  1. import java.util.concurrent.Flow.*;  
  2. ...  
  3.   
  4. public class MySubscriber<T> implements Subscriber<T> {  
  5.   private Subscription subscription;  
  6.   
  7.   @Override  
  8.   public void onSubscribe(Subscription subscription) {  
  9.     this.subscription = subscription;  
  10.     subscription.request(1); //a value of  Long.MAX_VALUE may be considered as effectively unbounded  
  11.   }  
  12.   
  13.   @Override  
  14.   public void onNext(T item) {  
  15.     System.out.println("Got : " + item);  
  16.     subscription.request(1); //a value of  Long.MAX_VALUE may be considered as effectively unbounded  
  17.   }  
  18.   
  19.   @Override  
  20.   public void onError(Throwable t) {  
  21.     t.printStackTrace();  
  22.   }  
  23.   
  24.   @Override  
  25.   public void onComplete() {  
  26.     System.out.println("Done");  
  27.   }  
  28. }

 

 

게시자

 

게시자는 등록 된 가입자에게 데이터 항목의 스트림을 게시합니다. 그것은 일반적으로 집행자를 사용하여 비동기 가입자 항목을 게시합니다. 게시자는 각 가입에 대한 가입자의 메소드 호출은 엄격하게 정렬되어 있는지 확인합니다.

 

Example publishing a stream of data items to Subscribers using JDK's SubmissionPublisher

 

  1. import java.util.concurrent.SubmissionPublisher;  
  2. ...  
  3.     //Create Publisher  
  4.     SubmissionPublisher<String> publisher = new SubmissionPublisher<>();  
  5.   
  6.     //Register Subscriber  
  7.     MySubscriber<String> subscriber = new MySubscriber<>();  
  8.     publisher.subscribe(subscriber);  
  9.   
  10.     //Publish items  
  11.     System.out.println("Publishing Items...");  
  12.     String[] items = {"1""x""2""x""3""x"};  
  13.     Arrays.asList(items).stream().forEach(i -> publisher.submit(i));  
  14.     publisher.close();  

 

 

서브 스크립

 

Flow.Publisher 및 Flow.Subscriber 링크. 가입자가 요청하는 경우에만 항목을 수신하고 구독을 통해 언제든지 취소 할 수 있습니다.

 

 

방법

기술

request

이 구독에 대한 현재 이루어지지 않은 요구에 n 개의 항목의 지정된 수를 추가합니다.

cancel

(결국) 메시지 수신 중지에 구독자가 발생한다.

 

 

프로세서

 

가입자와 출판사 모두 역할을하는 구성 요소. 프로세서는 게시자와 가입자 사이에 위치하며 다른 하나 개의 스트림을 변환한다. 하나 이상의 프로세서를 서로 연결하고, 체인의 최종 처리 결과가있을 수있다, 가입자에 의해 처리된다. 하나가 필요 어떤 프로세서 작성하는 개별 개까지 남아 있도록 JDK는 콘크리트 프로세서를 제공하지 않습니다.

 

Sample Processor to transform String to Integer


  1. import java.util.concurrent.Flow.*;  
  2. import java.util.concurrent.SubmissionPublisher;  
  3. ...  
  4.   
  5. public class MyTransformProcessor<T,R> extends SubmissionPublisher<R> implements Processor<T, R> {  
  6.   
  7.   private Function function;  
  8.   private Subscription subscription;  
  9.   
  10.   public MyTransformProcessor(Function<? super T, ? extends R> function) {  
  11.     super();  
  12.     this.function = function;  
  13.   }  
  14.   
  15.   @Override  
  16.   public void onSubscribe(Subscription subscription) {  
  17.     this.subscription = subscription;  
  18.     subscription.request(1);  
  19.   }  
  20.   
  21.   @Override  
  22.   public void onNext(T item) {  
  23.     submit((R) function.apply(item));  
  24.     subscription.request(1);  
  25.   }  
  26.   
  27.   @Override  
  28.   public void onError(Throwable t) {  
  29.     t.printStackTrace();  
  30.   }  
  31.   
  32.   @Override  
  33.   public void onComplete() {  
  34.     close();  
  35.   }  
  36. }
  37.  

 

 

Sample code to transform data stream using processor

 

  1. import java.util.concurrent.SubmissionPublisher;  
  2. ...  
  3.   
  4.     //Create Publisher  
  5.     SubmissionPublisher<String> publisher = new SubmissionPublisher<>();  
  6.   
  7.     //Create Processor and Subscriber  
  8.     MyFilterProcessor<String, String> filterProcessor = new MyFilterProcessor<>(s -> s.equals("x"));  
  9.   
  10.     MyTransformProcessor<String, Integer> transformProcessor = new MyTransformProcessor<>(s -> Integer.parseInt(s));  
  11.   
  12.     MySubscriber<Integer> subscriber = new MySubscriber<>();  
  13.   
  14.     //Chain Processor and Subscriber  
  15.     publisher.subscribe(filterProcessor);  
  16.     filterProcessor.subscribe(transformProcessor);  
  17.     transformProcessor.subscribe(subscriber);  
  18.   
  19.     System.out.println("Publishing Items...");  
  20.     String[] items = {"1""x""2""x""3""x"};  
  21.     Arrays.asList(items).stream().forEach(i -> publisher.submit(i));  
  22.     publisher.close();  
  23.  

 

 

배압

배압은 게시자는 데이터 항목이 가입자에 의해 소비되는 속도보다 훨씬 빠른 속도로 생성 될 때 만들어집니다. 미처리 항목 버퍼링되는 버퍼의 크기가 제한 될 수있다. 흐름 API는 신호 또는 같은 배압을 처리 할 수있는 API를 제공하지 않지만, 하나는 다시 압력에 대처하기 위해 스스로 구현할 수있는 다양한 전략이있을 수 있습니다. 다시 압력 방법 RxJava 거래를 참조하십시오.

 

 

개요

JDK 9 반응성 프로그래밍 API를 추가하는 것은 좋은 시작이다. 다른 많은 제품들도 기능에 액세스 반응성 Progamming API를 제공하기 시작했다. 흐름 API는 프로그래머가 반응 프로그램을 작성하기 시작할 수 있지만, 에코 시스템은 여전히 ​​진화하고있다.

 

예를 들어, 반응성 프로그램은 여전히 ​​반응성 프로그래밍에 대한 어쩌면 때문에 모든 DB를 지원 API, 기존 API를 사용하여 DB에 접근 끝낼 수 있습니다. - 반응성 프로그램에 따라 달라질 수 있습니다 API를 즉, 아직 반응 프로그래밍 모델을 지원하지 않을 수 있습니다.

 

728x90
반응형