꾸준한 기록

[Combine] Publisher, Subscriber, Operator 본문

Swift

[Combine] Publisher, Subscriber, Operator

Doo혀니 2022. 3. 28. 13:58

이 포스팅에서는 Combine의 특징과 값(데이터)를 전달하는 Publisher, 값(데이터)를 전달받는 Subscriber, 그리고 Publisherd와 Subscriber의 중간에서 값을 조작 및 변환하는 Opeator를 알아볼 것이다.

 

Combine 의 특징

1️⃣ Generic을 사용하여 개발자가 보일러 플레이트 코드를 줄일 수 있도록 해준다.

Combine을 활용하여 비동기 동작을 제네릭하게 만들어두면, 우리는 이것을 그대로 가져다가 재활용해서 쓰면 된다.

 

2️⃣ type-safe 하다. 

Combine은 런타임이 아닌 컴파일 타임에 에러를 체크하여, 개발자가 실행 전에 에러를 해결할 수 있도록 해준다.

 

3️⃣ 개발자가 Composition(병합)을 사용하여 프로그래밍하도록 설계되었다.

애플은 핵심 개념은 간단하고 이해하기 쉽게 만듬으로써 개발자가 쉽게 학습할 수 있도록 하되, 개념들을 조립하여 큰 기능을 만들 수 있도록 Combine을 만들었다.

 

4️⃣ request-driven 이다. 

개발자가 Combine을 사용하며 앱의 메모리 사용과 성능을 제어할 수 있다.

 


📣 Combine에는 리액티브 프로그래밍에 관련된 개념이 많이 포함되어 있다. 대표적인 리액티브 프로그래밍 라이브러리인 Rx에서 '데이터를 발행(emit)한다'고 표현하는 것을 Combine에서는 'send'로 표현한다. 따라서 이 포스팅에서는 Observable이 Observer로 값을 발행하는 것을 값을 '전달'한다 라고 표현할 것이다. 


 

Publisher

  • 데이터를 전달하는 주체이다. (Rx에서 Observable에 해당한다.)
  • Publisher를 사용할 때는 Publisher가 전달할 값의 타입과 전달 과정에서 발생할 수 있는 error의 타입을 명시해야 한다.
  • struct(value 타입)이다. 
  • Publisher는 Subscriber와 같이 사용된다.
  • Publisher는 Publisher 프로토콜을 따른다.

 

 

Publisher 프로토콜

 

Publisher 프로토콜은 2개의 associatedtype을 갖는다. 첫번째 assicatedtype인 Ouput은 Publisher가 전달할 타입을 나타낸다. 두번째 assoicatedtype인 Failure은 Publisher가 발생시킬 수 있는 Error타입을 나타낸다. 만일 Publisher가 수행중에 에러를 발생할 가능성이 없다면, Failure의 타입으로 Never를 지정하면 된다.

 

Publisher는 subscribe(subscribe:)함수를 갖는데, 이 함수의 바디에서 Subscriber와 Publisher가 연결되는 방식을 정의한다.  또한 이 함수의 제약조건(contsraints)으로 인해, Publisher는 자신의 Ouput 타입과 일치하는 Input 타입을 가지며, 자신과 Failure 타입이 동일한 Subscriber 하고만 연결될 수 있다.

 

built-in Publisher

1. Just: Subscriber에게 정해진 값을 발행한 후 종료한다. 

2. Future: 1개의 value를 downstream으로 전달한 후 종료한다. 혹은 실패 알림을 전달한다.

3. Deffered: Subscriber가 subscribe를 할 때 생성되는 Publisher 

4. Empty: 어떤 값도 downstream으로 전달하지 않는 Publisher

5. Fail: 에러를 전달하는 Publisher

6. Record: downstream으로 전달한 값들을 모두 저장해두었다가 새로운 subscriber가 연결될 때마다 저장한 값을 재전달한다.

7. NotificationCenter.default.publisher(for:) : NotificationCenter를 publisher 형태로 얻어올 수 있다.

... 그 밖에도 매우 많다.

 

Apple Developer Documentation

 

developer.apple.com

 

Subscriber

  • Publisher로부터 값을 전달받아서 처리하는 주체이다. (Rx의 Observer에 해당한다.)
  • class(레퍼런스 타입)이다. 
    (참고💡 Publisher로부터 값을 받았을 때, 특정한 처리 로직을 수행하거나 자신의 상태를 변경하므로 Subscriber를 레퍼런스 타입을 만들었다고 한다.)
  • Subscriber 프로토콜을 따른다.

Subscrbier 프로토콜

Subscriber 프로토콜은 Publisher와 마찬가지로 2가지 associatedtype을 갖는다. 첫번째 associatedtype인 Input은 Publisher로부터 전달받는 값의 타입을 의미한다. 두번째 associatedtype인 Failure는 Subscrber가 동작하는 과정에서 발생할 수 있는 에러타입을 나타낸다. Publisher와 마찬가지로 에러를 발생시키지 않는 경우에 Failrure 타입으로 Never가 지정될 수 있다.  

 

Subscriber는 3가지 함수를 갖는다. 

 

대표적인 Subscriber로 Assign과 Sink가 존재한다.

 

Assign

대표적인 Subscriber, Assign

Assign은 initializer의 파라미터로, 객체와 객체의 key path를 받는다. 그리고 Producer로부터 값을 받았을 때 이 값을 intializer를 통해서 받아둔 객체의 프로퍼티에 할당한다.

Assign을 직접 사용해도 되지만, Assign을 Publisher의 extension function형태로 구현한 assign(to:on)을 주로 사용한다.

assign(to:on:)은 호출결과로 Cancellation 프로토콜 구현체를 반환하며 이를 이용하여 Publisher와 Subscriber 간 연결을 끊을 수 있다.

let namePublisher = Just<String>("roadtos7")

let canceller = namePublisher.assign(to: \.someProperty, on: someObject)

// ...

canceller.cancel()

 

Sink

Publisher가 값을 전달해줄 때마다 sink의 closure가 호출된다. 따라서 sink의 클로저에 Publisher가 전달한 값을 가지고 수행할 작업을 작성하면 된다.

assign과 마찬가지로 sink도 호출결과로 Cancellation 구현체를 반환하며, 이를 이용하여 Publisher와의 연결을 끊을 수 있다.

let namePublisher = Just<String>("roadtos7")

let canceller = namePublisher.sink { name in 
	// name을 가지고 하는 작업 서술
}

canceller.cancel()

 

 

💡 Cancellable

Cancellable 프로토콜은 Subscriber와 Publisher의 연결을 끊을 수 있는 cancel() 메서드를 제공한다.

따라서 개발자는 더이상 Subscriber가 필요하지 않거나 Subscriber에서 사용하는 resource를 릴리즈 해주어야 하는 경우, Cancellation의 cancel() 을 호출하면 된다.

Cancellable 프로토콜

일일히 cancellation의 cancel을 호출하기 번거로운 경우, AnyCancellable을 활용하면 된다. 

AnyCancellable은 자신이 deinit될 때 자동으로 cancel을 호출한다.

AnyCancellable 프로토콜

 

Publisher와 Subscriber 연결 로직

이제 Publisher와 Subscriber를 함께 사용할 때의 로직을 정리해보면 다음과 같다.

 

Publisher와 Subscriber간 동작 흐름

1. Subscriber을 인자로 하여 Publisher의 subscribe(_ subscriber:)를 호출한다. Subscriber와 Publisher가 연결된다.

2. Publisher는 Subscriber의 send(subscription:) 메서드를 호출한다. Subscrbier는 send(subscription:) 메서드에서 파라미터로 전달받는 subscription 인스턴스를 사용하여 Publisher에게 특정한 횟수만큼 혹은 무한히 값을 요청한다.

3. Publisher는 Subscriber가 요청한 개수만큼 값을 전달한다. (상황에 따라서 Subscriber가 요구한 수량보다 적게 전달할 수 있다. )

4. Publisher는 값을 모두 전달한 후, 이 사실을 알리기 위해 Completion 알림을 Subscriber에게 전달한다. 혹은 값 전달 과정에서 에러가 발생한다면 Error 알림을 전달한다.

 

 

Operator 

  • Operator는 Publisher와 Subscriber 사이에 위치하여 값을 변형, 삭제하는 등의 역할을 한다.
  • Publisher 프로토콜을 따르므로 엄밀히 말하자면 Operator도 Publisher이다.
  • value type이다. 
  • Publisher로부터 값을 전달받은 후, 이 값을 처리하여 Subscriber에게 전달한다.
  • 이때 Operator에게 값을 전달해주는 Publisher를 upstream이라고 부르며, Operator가 값을 전달하는 Subscriber를 downstream이라고 부른다. 

 

Operator 종류 

Operator의 종류로 크게 4가지가 있다. 

1. Map과 같은 List Operartor

2. Error handling에 사용되는 Operator

3. 로직을 실행하는 Thread 혹은 Queue를 전환하는 Operator

4. Scheduling 과 실행시간 관련 Operator

 

Map

Map의 구현 코드

collection의 Map과 동일한 역할을 한다.

자신에게 들어오는 input을 원하는 타입의 output으로 변환한다.

직접 Map 인스턴스를 생성해서 사용해도 되지만, 애플에서는 많은 operator들을 Publisher의 extension function으로 구현해두었다. 따라서 우리는 Publisher의 메서드를 호출하는 형태로 operator를 간편하게 사용할 수 있다.

Map Operator가 Publisher의 extension function으로 구현되어 있으므로, Publisher에서 바로 호출하여 사용할 수 있다.

 

map을 사용한 예시 코드는 다음과 같다.

let cancellable = NofiicationCenter.default.publisher(for: .graduated, object: merlin)
	.map { note in 
    	return note.userInfo?["NewGrade"] as? Int ?? 0
    }
    .assign(to:\.grade, on:merlin)

 


💡 Operation 사용 Tip

이외에도 많은 operation들이 존재한다. 이렇게 많은 operation 에서 어떤 것을 사용해야 할지 고민하는 것은 꽤나 골치 아픈 일이다. 

따라서 Apple에서는 Combine이 연산자를 작은 기능단위로 제공하고 있으니, 이것을 조합해서 원하는 기능을 만들려고 시도하는 것이 좋으며, 실제 Apple에서도 Composition(병합)을 강조하고 있다.

 

또한, Combine의 연산자는 Swift의 collection api와 동일한 기능과 이름을 가진 경우가 많다. 따라서 Swif의 array에서 사용하던 특정한 연산자와 동일한 기능을 하는 것을 Combine에서 찾는다면, 우선 Publihser에서 해당 array 연산자의 이름을 타이핑해보자. 대부분의 경우 동일한 기능을 하는 operator가 동일한 이름의 Combine operator로 존재할 것이다. 

 

좀더 자세히 코드와 함께 살펴보겠다.

let cancellable = NotificationCenter.default.publisher(for: .graduated, object: merlin)
	.map { note in 
    	return note.userInfo?["NewGrade"] as? Int ?? 0
    }
    .assign(to:\.grade, on:merlin)

위 코드는 앞서 map의 사용 예시코드이다.

map은 NotificationCenter에서 전해주는 Notification에서 "NewGrade 값을 꺼내서 Int로 캐스팅하고, 많일 이 캐스팅 결과가 nil일 경우에는 default 값으로 0을 반환한다.

만일 Notification에 값이 존재하지 않을 경우에는 default 값을 반환하지 않고 그저 skip할 수 있는 방법은 없을까? skip하여 실제로 Notification의 "NewGrade" 에 매칭되는 값만을 모을 수는 없을까?

array에서는 compactMap을 이용하여 구현할 수 있다. Publisher도 compactMap을 가지고 있다.

compactMap은 map의 결과로 nil이 반환되는 것을 필터링해서 없애며, nil이 아닌 값들만 연산 결과로 반환한다. 

 

따라서 compatMap을 이용하여 동일한 기능을 구현한 코드는 다음과 같다.

let cancellable = NotificationCenter.default.publisher(for: .graduated, object: merlin)
	.compactMap { note in 
		return note.userInfo?["NewGrade"] as? Int 
	}
	.assign(to:\.grade, on:merlin)

이렇듯 원하는 기능이 있다면 우선 collection api에서 찾아보고, 이것이 Publisher에서도 동일하게 존재하는지 찾아가보자. 


tryMap

map과 기능이 동일하다. 단, map과 달리 발생한 처리 과정에서 발생한 error를 failure로 변환하여 downstream으로 전달한다.

let namePublisher = NotificationCenter.default.publisher(for: .nameDownload)
	.map { notification in 
		return notification.userInfo?["data"] as! Data
	}
	.tryMap { data in 
		let decoder = JSONDecoder()
		try decoder.decode(Name.self, from: data)
	}

이 예시 코드에서 tryMap은 Data 인스턴스를 Name 인스턴스로 decoding한다.

그런데 decoding 작업은 error 를 발생시킬 수 있으므로 tryMap을 사용한다.

 

decode

Data 인스턴스를 특정한 클래스의 인스턴스로 decoding하는 연산자이다.

tryMap예시의 동작을 decode 연산자를 사용하면 더 간단하게 작성할 수 있다.

// tryMap예시 코드와 같은 동작을 한다.
let namePublisher = NotificationCenter.default.publisher(for: .nameDownload)
	.map { notification in 
		return notification.userInfo?["data"] as! Data
	}
	.decode(Name.self, JSONDecoder())

 

Error Handing 연산자

publisher가 데이터를 처리하고 전달하는 과정에서 에러가 발생할 수 있다.
이런 경우 combine의 에러 핸들링 메서드를 사용하여 에러를 처리해야 한다.

 

Error Handing 연산자 종류

1. assertNoFailure

2. retry 

3. catch

4. mapError

5. setFailrueType

assertNoFailure

upstream publisher가 fail할 경우 fatal error를 일으킨다. 그렇지 않을 경우에는 downtream으로 input을 전달한다.

// tryMap예시 코드와 같은 동작을 한다.
let namePublisher = NotificationCenter.default.publisher(for: .nameDownload)
	.map { notification in 
		return notification.userInfo?["data"] as! Data
	}
	.decode(Name.self, JSONDecoder())
	.assertNoFailure()

 

retry

지정한 횟수만큼 upstream publisher에 다시 연결한다. 만일 3번 retry 하도록 지정했을 때,upstream publisher에서 4번 error가 발생한 경우, 3번째 error 까지는 downstream에 전달하지 않다가, 4번째 error 부터는 downstream 에 전달한다. 

struct WebSiteData: Codable {
    var rawHTML: String
}

let myURL = URL(string: "https://www.example.com")

cancellable = URLSession.shared.dataTaskPublisher(for: myURL!)
    .retry(3)
    .map({ (page) -> WebSiteData in
        return WebSiteData(rawHTML: String(decoding: page.data, as: UTF8.self))
    })
    .catch { error in
        return Just(WebSiteData(rawHTML: "<HTML>Unable to load page - timed out.</HTML>"))
}
.sink(receiveCompletion: { print ("completion: \($0)") },
      receiveValue: { print ("value: \($0)") }
 )

// Prints: The HTML content from the remote URL upon a successful connection,
//         or returns "<HTML>Unable to load page - timed out.</HTML>" if the number of retries exceeds the specified value.

 

catch

upstream 에서 error가 발생할 경우, upstream publisher 대신 catch operator에서 제공하는 대체 publisher를 사용한다.

이때 error가 발생한 upstream publisher와의 연결은 끊어지고, 대체 publisher와 연결된다.

 

// tryMap예시 코드와 같은 동작을 한다.
let namePublisher = NotificationCenter.default.publisher(for: .nameDownload)
	.map { notification in 
		return notification.userInfo?["data"] as! Data
	}
	.decode(Name.self, JSONDecoder())
	.catch {
		return Just(Name.placeHolder)
	}

위 예시 코드에서 catch 실행로직은 다음과 같다.

catch 의 upstream에서 error가 발생할 경우, catch의 closure가 실행된다. 실행 결과 새로운 publisher가 반환되며, 이것이 catch의 upstream을 대체하게 된다. 

하지만 catch를 사용할 경우, error가 발생한 upstream publisher와의 연결이 끊어지기 때문에 NotificationCenter에서 전달하는 알림을 더이상 받을 수 없다. 

기존의 upstream publisher와의 연결을 유지하면서, error가 발생했을 때에만 다른 publisher를 사용하고 싶을 때 flatMap operator를 사용한다.

💡 flatMap

flatMap은 upstream publisher로부터 값을 받아서 이 값을 publisher로 변환한다. 그리고 flatMap이 변환한 결과로 나온 publisher에 subscriber를 연결시킨다.

 

catch 만 사용했을 때 발생했던, 에러가 발생했을 때 upstream publisher와의 연결이 끊어지는 문제를 flatMap과 catch를 함께 사용하여 다음과 같이 해결할 수 있다.

// tryMap예시 코드와 같은 동작을 한다.
let namePublisher = NotificationCenter.default.publisher(for: .nameDownload)
	.map { notification in 
		return notification.userInfo?["data"] as! Data
	}
	.flatMap { data in 
		return Just(data)
        	.decode(Name.self, JSONDecoder())
        	.catch {
            		return Just(Name.placeholder)
            }
	}

map 실행결과로 반환되는 data가 flatMap operator로 전달된다. flatMap operator에서는 map으로부터 받은 data를 가지고 Just publisher를 만든다. Just publisher에서 전달하는 데이터는 decode operator를 통해 Name 인스턴스로 변환된다. 만일 이 decoding 과정에서 에러가 발생할 경우 catch operator가 호출되어 Just(Name.placeholder) publisher로 대체된다. 이때 catch operator의 upstream publisher와의 연결이 끊어지는데, 여기서는 upstream publisher가 Just(data)에 해당하며 namePublisher와의 연결은 그대로 유지된다. 따라서 error 발생 이후에 namePublisher에서 새로운 값을 전달했을 때, subscriber 이 값을 받을 수 있다.

 

mapError

upstream failure를 새로운 error로 변환한다.

upstream이 발생시키는 error 타입과 downstream에서 처리할 수 있는 error 타입이 일치하지 않을 때, 두 error 타입을 일치시키기 위해서 사용한다.

struct DivisionByError: Error {}
struct MyGenericError: Error {var wrappedError: Error }

func myDivide(_ dividend: Dobule, _ divisor: Double) throw -> Double {
	guard divisor != 0 else { throw DivisionByError }
	return dividend / divisor
}

let divisors: [Double] = [5, 4, 3, 2, 1, 0]
divisors.publisher 
	.tryMap { try myDivide(1, $0) }
	.mapError { MyGenericError(wrapped: $0) }
	.sink {
		receiveCompletion: { print ("completion: \($0)") ,
		receiveValue: { print ("value: \($0)", terminator: " ") }
	}

 

setFailureType

upstream publisher의 failure type을 변경한다.

erorr가 발생하지 않는 publisher의 error type을 지정해주어야 할 때 사용한다.

let pub1 = [0, 1, 2, 3, 4, 5].publisher
let pub2 = CurrentValueSubject<Int, Error>(0)
let cancellable = pub1
	.setFailrueType(to: Error.self)
	.combineLatest(pub2)
	.sink (
		receiveCompletion: { print ("completed: \($0)") },
		receiveValue: { print ("value: \($0)")}
	)

 

combineLatest는 같은 input type과 error type을 갖는 pubilisher 간에 사용할 수 있다.

여기서 pub1의 error type이 Never이기 때문에 pub2와 combineLatest를 사용하려면 error type을 바꾸어 주어야 하기에,  setFailureType을 사용했다.

 

💡CurrentValueSubject

single value를 갖는 subject.
이 single value가 바뀔 때마다 바뀐 값을 downstream으로 전달한다.

 

Scheduled Operator

Scheduled Operator는 언제(when), 어디(where)서부터 이벤트 전달되어야 하는지를 지정한다.

RunLoop와 DispatchQueue를 함께 사용할 수 있다.

 

Scheduled Operator 종류

1. delay: 지정한 시간만큼 기다렸다가 값을 전달한다.

2. debounce: 마지막 input이 들어온 후, 일정한 시간이 지난 후에 input을 전달한다.

3. throttle: 지정한 시간 간격마다 값을 전달한다. delay와 차이점은 정해진 시간 간격 내에서 가장 최근 input을 전달한다.

4. receive(on:): downstream이 특정한 thread 혹은 queue에서 값/이벤트를 수신한다.

5. subscribe(on:): subscribe, cancel, request를 수행할 scheduler를 지정한다.

 

예시 코드

// tryMap예시 코드와 같은 동작을 한다.
let namePublisher = NotificationCenter.default.publisher(for: .nameDownload)
	.map { notification in 
		return notification.userInfo?["data"] as! Data
	}
	.flatMap { data in 
		return Just(data)
        	.decode(Name.self, JSONDecoder())
        	.catch {
            		return Just(Name.placeholder)
            }
	}
	.publisher(for: \.name)
	.receive(on: RunLoop.main)

receive(on:)을 통해 값이 메인 스레드를 통해서 전달된다.

 

 

병합 Publisher (Combining Publishers)

Publisher를 병합해주는 Publisher로서 대표적으로 2가지를 소개하고싶다.

1. Zip

2. CombinLatest

 

여러개의 비동기 동작을 실행하고, 결과를 합쳐서 가져올 때 사용하면 유용하다

Zip

input 여러개를 묶어서 하나의 tuple로 만든다.

모든 upstream으로부터 값이 들어온 후에 동작한다.

Publisher A만 값을 전달했을 때

그림처럼 Publisher A와 Publisher B를 zip으로 묶여있다고 할 때, 만일 Publisher A가 Zip에게 A 값을 전달했고 Publisher B는 아직 값을 전달하지 않았다면, Zip은 Publisher B로 부터 값을 받을 때까지 대기한다.

 

Publisher A, Publisher B가 모두 값을 전달했을 때

만약 Publisher B도 값을 전달한다면, 이때 Zip은 두 Publisher에게 받은 값 A, B를 묶어서 tuple로 만들고, 이 tuple을 Subscriber에게 전달한다.

 

이때 주의해야할 점은 tuple로 묶여서 Zip이 처리한 값 A, B는 Zip내부에 남아있지 않다는 것이다. 따라서 만일 Publisher A가 새로운 값 A'을 전달하면, 마찬가지로 Zip은 Publisher B가 새로운 값을 전달할 때까지 대기하게 된다. Publisher B가 값을 전달한 후에 비로소 Zip이 동작한다.

 

 

 

위의 예시는 oragning, decomposing, arranging이라는 3가지 비동기 작업을 동시에 실행한 후, 이 작업의 결과가 모두 true 일 경우에 continueButton의 isEnabled 값을 true로 바꾸는 코드이다.

이렇듯 여러 비동기 작업을 동시에 실행시킨 후, 3작업의 결과를 결합해서 가져와야 하는 경우 주로 Zip을 사용한다.

 

CombineLastest

Zip과 비슷하게 input 여러개를 묶어서 하나의 값으로 변환해 전달한다.

하지만 Zip은 모든 Publisher가 값을 전달해야 작업을 수행할 수 있었던 반면에, CombinLatest는 새로운 값을 전달하는 Publisher가 하나라도 존재하면 동작한다.

이를 위해 CombineLatest는 각 Publisher가 가장 마지막으로 전달한 최신 값을 저장해둔다. 

 

위의 예시는 사용자가 3개의 약관 동의 Swift를 모두 on했을 때, Play 버튼이 활성화되고 동의하지 않는 것이 1개라도 존재한다면, Play버튼이 비활성화되는 코드이다.

위와 같이 어떤 하나의 값이 변경될 때마다 재수행되어야 하는 로직이라면 CombineLatest를 사용하는 것이 좋다.

 

 


참고 자료

Introducing Combine - WWDC 2019 

Swift Combine, 시작하기

Combine in Practice - WWDC 2019

Receiving and Handling Events with Combine

 

Combine in Practice - WWDC19 - Videos - Apple Developer

Expand your knowledge of Combine, Apple's new unified, declarative framework for processing values over time. Learn about how to...

developer.apple.com

 

Swift Combine, 시작하기

Combine 기본 개념 | https://developer.apple.com/documentation/Combine Apple Document 의 Combine 을 보면 다음과 같이 설명된다. ( 번역이 미숙하지만... ) Combine - 이벤트 처리 연산자들을 통해 비동기 이벤트들을 핸

brunch.co.kr

 

Introducing Combine - WWDC19 - Videos - Apple Developer

Combine is a unified declarative framework for processing values over time. Learn how it can simplify asynchronous code like networking,...

developer.apple.com