Rxjava 정리

Observable

  • 1.0에서는 Observable 1개 지원
  • 2.x에서는 Observable, Maybe, Flowable 3개 지원

Obserbable

3가지 알림형태

  • onNext : Data 발행
  • onComplete : 모든 이벤트 완료, 완전한 종료
  • onError : 에러, 완전한 종료

생성 - Factory 함수 사용

  • create() : 개발자가 ObservableEmitter를 이용해서 onNext와 oncomplete를 처리해줘야 함
  • just() : 인자 10개까지 가능, 1회성, subscribe시 전체 Stream 송신
  • fromArray() : Object[] Array 형태
  • fromIterable() : Iterator 형태
  • fromCallable() : 동시성 API Callable 인터페이스, Callable Interface 실체화 객체, 비동기 실행후 결과 반환 처리
  • fromFuture() : Executor 인터페이스에 Callable 객체를 통해 Future 객체를 반환 받는다. future.get() 처리 시 Callable 객체 실행 후 결과값 나올때까지 blocking 처리 된다. Executor는 SingleTreadExcutor, FixedThreadPool, CachedThreadPoo을 지원한다.
  • fromPublisher() : Flow API 대상이나 잘 사용 안하는 듯(?) create()와 사용법 동일

기타 - Factory 함수

  • Interval()
  • ragen()
  • timer()
  • defer()

Subscribe

  • 인자 없는 경우 : onError 이벤트만 처리
  • 인자 1개 : onNext 처리
  • 인자 2개 : onNext, onError 처리
  • 인자 3개 : onNext, onError, onComplete 처리

subscribe()는 Disposable 인터페이스 객체를 Return 함
Subscription의 결과 객체임

subscribe가 되어야지 모든 데이터를 Stream 시작하는 경우 Cold Observable
subscribe가 된 시점의 최근 데이타 또는 이후 데이터 부터 Stream 시작하는 경우 Hot Observable

Disposable

  • dispose() : 구독해지 -> 일반적으로는 개발자가 명시 하지 않는 경우 많음
  • isDispose() : 구독 해지 하였는지 확인

Consumer

subscribe()의 onNext 객체로 사용 가능
람다 표현식으로 쉽게 변경 가능

Single 클래스

Observable의 특수 형태, 오직 1개의 데이터만 발행
데이터 하나가 발행 onNext 과 동시에 onComplete 됨 => onSuccess() 로 통합

Single.just()

just로 입력받은 값을 subscribe와 함께 모두 Stream 처리함

just에 2개의 발행을 요청하면 즉시 오류 처리됨

Maybe 클래스

Single 클래스 처럼 최대 데이터를 하나만 가질 수 있음

그러나 0개를 가질 수도 있음

Hot Observable

Cold는 구독자가 있을 때 Stream을 일괄로 보냄

Hot의 경우 구독자 존재 여부와 관계없이 데이터를 발행 함

  • Cold Observable : 파일, 데이터베이스 쿼리, URL Post
  • Hot Observable : 마우스이벤트, 키보드 이벤트, 시스템 이벤트, 센서정보, 주식

Subject 클래스

Cold Observable을 Hot Observable로 변환 시킴

Observable 특성과 Observer 속성을 동시에 갖고 있음

  • AsyncSubject : onComplete 이벤트와 함께 마지막 데이터를 송신함
  • BehaviorSubject : 가장 최근 값을 수신 받거나, Default 값을 받는다
  • PublishSubject : 오직 해당 시간에 발행된 데이터 만을 받는다
  • ReplaySubject : cold Observable처럼 작동함, Hostory 데이터를 Stream으로 받을 수 있음
  • CompletableSubject
  • ConnectableObservable : subscribe() 호출 시점에 데이터를 발행하지 않고 connect가 발생하는 시점에 subscribe를 한 모든 Observer에게 데이터를 발행함. Cold 처리는 하지 않음
  • MaybeSubject
  • SingleSubject
  • UnicastSubject
Data Publisher Data Receiver
Observable
Single
Maybe
Subject
Completable
Subscriber
Observer
Consumer

 

리엑티브 연산자

생성연산자

  • create
  • just
  • fromArray
  • interval : 주어진 시간간격으로 Long 객체를 0부터 1씩 증가하면서 발행
    • Computation Sceduler
    • Main Thread가 종료 되지 않아야 한다.
  • range : 주어진 n 부터 m개의 Integer 발행
  • timer : 일정 시간 후 한번만 실행 하는 함수
  • intervalRange : interval로 작동하지만 Range m개 만큼만 작동
  • defer : 지연 함수로 첫번째 구독자가 cold observe가 끝나면 두번째 구독자가 두번째 colde observe를 수신 받는다
  • repeat : Stream을 N회 반복하는 cold observe이다. 1회 끝나면 바로 다음 subscribe 처리 하게 된다.

변환 연산자

  • map : 1대 1 변환 함수
  • flatMap : 1대 1 or N 변환 함수, 결과는 Observable로 리턴
  • concatMap : 입력과 수신의 속도차이로 수신이 역전현상이 일어날 수 있으나 이를 정정해 준다. 순서가 보장됨
  • switchMap : 순서를 보장하지만 수신 역전이 일어난다면 뒤늦게 역전되어 들어오는 Observable을 삭제 처리
  • groupBy : 어떤 기준으로 동일한 getKey를 갖는 GroupObservable을 수신
  • scan : reduce와 같으나 중간 결과물을 산출 해줌
  • buffer
  • window

필터 연산자

  • filter : 결과값이 true인 값만 통과 시킴, Predicate 형
  • first : 첫번째 값 필터
  • last : 마지막 값 필터
  • take : 최초 N개 값만
  • takeLast : 마지막 N개 값만
  • skip : 최초 N개 값은 Skip
  • skipLast : 마지막 N개 건너뜀
  • distinct
  • debounce
  • reduce : 발행된 모든 데이터를 바탕으로 하나의 결과 값을 얻어 냄

합성 연산자

observable 합성 및 조합

  • zip : Observable source 2개를 결합
  • combineLastest
  • merge
  • concat

 

오류처리 연산자

  • onErrorReturn
  • onErrorResumeNext
  • retry
  • retryUntil

 

유틸리티 연산자

  • subscribeOn
  • observeOn

조건 연산자

Observable 흐름 제어

  • amb : Observable중 먼저 나오는 것을 선택
  • takeUntil : other Observable나오기 전까지 현재 Observable 선택
  • skipUntil : other observable나오기 전까지 현재 Observable 포기
  • all : Observable이 특정 조건을 만족하면 true, 입력된 모든 stream이 true여야한다.

수학과 집합형 연산자

  • count
  • max
  • sum
  • min
  • average

back Pressure 연산자

 

728x90
반응형

'잡동사니' 카테고리의 다른 글

apt-cyg cygwin에 install  (0) 2021.05.11
프로그래머스 vs code 체험  (0) 2020.09.26
Java VM Option (계속)  (0) 2020.09.15