[C#]Using RX

我之前在開發Consumer API時,對於Message的接收是使用callback function+event,code review時,

同事建議為什麼不用RX來簡化code,並更簡單處理非同步error handle和multiple threads的concurrency問題。

沒想到MSDN有一系列的RX文章,看來我真的太Lag了,下面我來紀錄一下自己使用Rx非同步開發心得。

RX(Reactive programming)是一個基於觀察者的非同步程式設計模型,

我覺得它讓我更方便使用非同步來處理資料和error handle,

同時省下我開發去預防multiple threads的concurrency問題,

撰寫風格上就是Observables + LINQ+Schedulers,

它讓我更關注business logic並使用message set來操作資料(彈性更高),

而不用擔心底層safe thread、thread deadlock和IO blocking…等問題,

而我第一版的code大多在關心非同步threads問題(blocking or deadlock),

但使用Rx改寫後真的簡化不少,也不用擔心concurrency 問題,

下面簡單來驗證一下Rx的核心,非同步和觀察者模式。

先安裝RX assembly如下圖

沒有使用scheduler(Rx預設為單一執行緒)

 static void SequenceBehavior()
        {
            
            Console.WriteLine($"Starting on threadId:{ Thread.CurrentThread.ManagedThreadId}");
            IObservable<int> sourceObservable = Observable.Range(1, 10);
            IObserver<int> observer = Observer.Create<int>(
                x => Console.WriteLine($"OnNext: {x} on threadId:{Thread.CurrentThread.ManagedThreadId}"),
                ex => Console.WriteLine($"OnError: {ex.Message}"),
                () => Console.WriteLine($"OnCompleted on threadId:{Thread.CurrentThread.ManagedThreadId}"));
            IDisposable subscription = sourceObservable.Subscribe(observer);
            Console.WriteLine($"Subscribed on threadId:{Thread.CurrentThread.ManagedThreadId}");
            Console.WriteLine("Press ENTER to unsubscribe...");
            Console.ReadLine();
            subscription.Dispose();
        }

我們可以看到,當我沒有使用scheduler就沒有非同步執行,

所有的操作都在同一條執行緒且依序處理(Scheduler算是為了非同步而生)。

 

現在來看看怎麼達成非同步(因為我絕對不想封鎖主執行緒)

static void SchedulerBehavior()
        {
            Console.WriteLine($"Starting on threadId:{ Thread.CurrentThread.ManagedThreadId}");
            IObservable<int> sourceObservable = Observable.Range(1, 10);
            IObserver<int> observer = Observer.Create<int>(
                x => Console.WriteLine($"OnNext: {x} on threadId:{Thread.CurrentThread.ManagedThreadId}"),
                ex => Console.WriteLine($"OnError: {ex.Message}"),
                () => Console.WriteLine($"OnCompleted on threadId:{Thread.CurrentThread.ManagedThreadId}"));
            IDisposable subscription = sourceObservable
                .SubscribeOn(NewThreadScheduler.Default)
                .Subscribe(observer);
            Console.WriteLine($"Subscribed on threadId:{Thread.CurrentThread.ManagedThreadId}");
            Console.WriteLine("Press ENTER to unsubscribe...");
            Console.ReadLine();
            subscription.Dispose();
        }

可以看到我只加了SubscribeOn(NewThreadScheduler.Default)一行,

我的AP就變成非同步設計了,Rx真的好威。

 

參考

Implementing the Observer Pattern using Rx.

Using Rx

http://reactivex.io/

Reactive Extensions (Rx) – Part 1 – Replacing C# Events

When Will You Use Rx

PART 4 - Concurrency

Rx - can/should I replace .NET events with Observables?

IScheduler.Schedule vs IScheduler.ScheduleAsync?

.Net中的反应式编程(Reactive Programming)