Transformation of sequences
我們消費的序列中的值並不總是我們需要的格式。有時在數據中有太多的雜訊,所以我們要將雜訊去掉。有時,每個值都需要擴展進更豐富的物件中或更多的值。通過組合運算子,Rx允許您控制所消耗的可觀察序列中的值的質量以及數量。
到目前為止,我們已經學習了序列的建立,將值轉移至序列,以及通過過濾,聚合或fold來縮減序列。在本章中,我們將討論序列的轉換。這讓我們得以介紹我們的第三種函數式方法,bind。Rx中的bind函數對序列中的每個元素應用一組轉換以產生新序列。回顧一下:
Ana(morphism) T --> IObservable<T>
Cata(morphism) IObservable<T>
--> T
Bind IObservable<T1>
--> IObservable<T2>
現在我們已經介紹了所有的三個higher order函數,你可能會發現你已經知道他們。Bind和Cata(morphism)由來自Google的MapReduce框架而著名。這裡Google通過他們或許更常見的別名來引用Bind和Cata;Map 及 Reduce。
記住higher order functions的ABCs代表的意思,它是很好的助憶碼。
A na 進入序列,T --> IObservable<T>
B ind 修改序列,IObservable<T1>
--> IObservable<T2>
C ata 離開序列, IObservable<T>
--> T
Select
經典的轉換函式是Select。它允許你提供一個接受TSource的值並返回TResult的值的函式。Select的函式定義良好且簡單,並且建議其最常見的用法是從一種類型轉換到另一種類型,即IObservable <TSource>
到IObservable <TResult>
。
IObservable<TResult> Select<TSource, TResult>( this IObservable<TSource> source, Func<TSource, TResult> selector)
注意,並沒有任何限制說TSource和TResult不能相同。 所以對於我們的第一個範例,我們將用一個整數序列,並對每個值用加3的方式來轉換,以產生另一個整數序列。
var source = Observable.Range(0, 5); source.Select(i=>i+3) .Dump("+3")
輸出:
+3-->3 +3-->4 +3-->5 +3-->6 +3-->7 +3 completed
雖然這可能很有用,但更常見的用途是將值從一種型別轉換為另一種型別。在這個例子中,我們將整數值轉換為字元。
Observable.Range(1, 5) .Select(i =>(char)(i + 64)) .Dump("char");
輸出:
char-->A char-->B char-->C char-->D char-->E char completed
如果我們真的想利用LINQ,我們可以將我們的整數序列轉換為一個匿名型別序列。
Observable.Range(1, 5) .Select(i => new { Number = i, Character = (char)(i + 64) }) .Dump("anon");
輸出:
anon-->{ Number = 1, Character = A } anon-->{ Number = 2, Character = B } anon-->{ Number = 3, Character = C } anon-->{ Number = 4, Character = D } anon-->{ Number = 5, Character = E } anon completed
為了更進一步利用LINQ,我們可以使用query comprehension syntax編寫上述查詢。
var query = from i in Observable.Range(1, 5) select new { Number = i, Character = (char) (i + 64) }; query.Dump("anon");
在Rx中,Select有另一個覆載。第二個覆載為選擇器函數提供兩個參數。附加參數是序列中元素的索引。如果序列中的元素的索引對於selector function很重要,請使用此方法。
Cast and OfType
如果你取得的是一個object序列,例如IObservable <object>
,你可能會發現它不太有用。有一個專門針對IObservable <object>
的方法,它將每個元素轉換為給定的型別,並在邏輯上將其稱為Cast <T>()
。
var objects = new Subject<object>(); objects.Cast<int>().Dump("cast"); objects.OnNext(1); objects.OnNext(2); objects.OnNext(3); objects.OnCompleted();
輸出:
cast-->1 cast-->2 cast-->3 cast completed
然而,如果我們要添加一個不能被轉換成序列的值(型別不符),那麼我們會得到錯誤。
var objects = new Subject<object>(); objects.Cast<int>().Dump("cast"); objects.OnNext(1); objects.OnNext(2); objects.OnNext("3");//Fail
輸出:
cast-->1 cast-->2 cast failed -->Specified cast is not valid.
幸運的是,如果這不是我們想要的,我們可以使用替代的擴充函式OfType <T>()
。
var objects = new Subject<object>(); objects.OfType<int>().Dump("OfType"); objects.OnNext(1); objects.OnNext(2); objects.OnNext("3");//Ignored objects.OnNext(4); objects.OnCompleted();
輸出:
OfType-->1 OfType-->2 OfType-->4 OfType completed
公平地說,雖然這些函式很方便,但我們可以使用我們已知的運算子來建立它們。
//source.Cast<int>(); is equivalent to source.Select(i=>(int)i); //source.OfType<int>(); source.Where(i=>i is int).Select(i=>(int)i);
Timestamp and TimeInterval
由於可觀察序列是非同步的,因此可以很方便地知道接收到元素的時間。Timestamp擴充函式是一種方便的函式,它將序列的元素包裹在輕量Timestamp結構T中。 Timestampe<T>
型別是一個結構,它公開了它所包裝的元素的值,以及使用DateTimeOffset建立的timestamp。
在這個例子中,我們每隔一秒建立一個包含三個值的序列,然後將其轉換為帶Timestamp的序列。 ToString()在Timestampe<T>
上的實做給了我們一個易讀的輸出。
Observable.Interval(TimeSpan.FromSeconds(1)) .Take(3) .Timestamp() .Dump("TimeStamp");
輸出:
TimeStamp-->0@01/01/2012 12:00:01 a.m. +00:00 TimeStamp-->1@01/01/2012 12:00:02 a.m. +00:00 TimeStamp-->2@01/01/2012 12:00:03 a.m. +00:00 TimeStamp completed
我們可以看到,值0、1和2每隔一秒產生。取得絕對timestamp的另一種方法是僅獲取自最後一個元素以來的間隔。TimeInterval擴充函式提供了這個。根據timestamp函式,元素被包裹在輕量結構中。 這個時候的結構是型別TimeInterval <T>
。
Observable.Interval(TimeSpan.FromSeconds(1)) .Take(3) .TimeInterval() .Dump("TimeInterval");
Output:
TimeInterval-->0@00:00:01.0180000 TimeInterval-->1@00:00:01.0010000 TimeInterval-->2@00:00:00.9980000 TimeInterval completed
正如你可以從輸出中看到的,間隔不是正好一秒鐘,但是非常接近。
Materialize and Dematerialize
Timestamp和TimeInterval變換運算子可以證明對記錄和除錯序列有用,Materialize運算子也是如此。Materialize將序列轉換為序列的metadata 代表,將IObservable<T>
轉換為IObservable<Notification<T>>
。通知型別提供序列事件的metadata。
如果我們對一個序列做materialize,我們可以看到回傳的包裝的值。
Observable.Range(1, 3) .Materialize() .Dump("Materialize");
輸出:
Materialize-->OnNext(1) Materialize-->OnNext(2) Materialize-->OnNext(3) Materialize-->OnCompleted() Materialize completed
注意,當來源序列完成時,materialized的序列產生一個“OnCompleted”推送值,然後完成。Notification<T>
是一個具有三個實做的抽像類:
- OnNextNotification
- OnErrorNotification
- OnCompletedNotification
Notification<T>
公開了四個公開屬性,以讓你知道它:Kind、HasValue、Value和Exception。顯然只有OnNextNotification會為HasValue返回true,並且有一個有用的Value實做。還應當明顯的是,OnErrorNotification是唯一的具有Exception值的實做。Kind屬性回傳一個列舉,應該夠你知道有哪些方法適合使用。
public enum NotificationKind { OnNext, OnError, OnCompleted, }
在下一個例子中,我們產生一個有錯誤的序列。注意,序列的最終值是OnErrorNotification。此外,這一個materialized序列沒有錯誤,它成功完成。
var source = new Subject<int>(); source.Materialize() .Dump("Materialize"); source.OnNext(1); source.OnNext(2); source.OnNext(3); source.OnError(new Exception("Fail?"));
輸出:
Materialize-->OnNext(1) Materialize-->OnNext(2) Materialize-->OnNext(3) Materialize-->OnError(System.Exception) Materialize completed
對序列Materializing的實做對於執行序列的分析或記錄非常方便。你可以通過應用Dematerialize擴充函式以解開被materialized的序列。Dematerialize只在IObservable<Notification<TSource>>
上有用。
SelectMany
在上面的轉換運算子中,我們可以看到Select是最有用的。它在其變換輸出中非常的有彈性,甚至可以用於再現一些其他的變換運算子。然而SelectMany函式甚至更強大。在LINQ以及Rx中,bind函式是SelectMany。大多數其他轉換運算子可以使用SelectMany建立。考慮到這一點,可以認為SelectMany可能是LINQ中最被誤解的方法之一。
在我個人理解的Rx,我很艱難的學習SelectMany擴充函式。我的一個同事建議我把它想成"from one, select many",這幫我更好的理解SelectMany。一個更好的定義是,"From one, select zero or more"。如果我們看SelectMany的函式定義,可看到它需要一個來源序列和一個函數作為其參數。
IObservable<TResult> SelectMany<TSource, TResult>( this IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
選擇器參數是一個取單一T值並返回一個序列的函式。請注意,選擇器回傳的序列不必與來源型別相同。最後,SelectMany回傳型別與選擇器回傳型別相同。
如果你希望有效地使用Rx,瞭解這個函式是非常重要的,所以讓我們慢慢地來,同樣重要的是注意它與IEnumerable<T>
的SelectMany運算子的些微差別,我們很快就會看到。
我們的第一個範例將採用單一值'3'的序列。我們提供的選擇器函數將產生另一個帶有數字的序列。該結果序列將是從1到所提供的值即3的範圍。因此,我們取序列[3]並從我們的選擇器函數返回序列[1,2,3]。
Observable.Return(3) .SelectMany(i => Observable.Range(1, i)) .Dump("SelectMany");
Output:
SelectMany-->1 SelectMany-->2 SelectMany-->3 SelectMany completed
如果我們將源碼修改為[1,2,3]的序列,就像這樣...
Observable.Range(1,3) .SelectMany(i => Observable.Range(1, i)) .Dump("SelectMany");
...我們現在得到一個輸出,每個序列([1],[1,2]和[1,2,3])的結果被平展以產生[1,1,2,1,2,3] 。
SelectMany-->1 SelectMany-->1 SelectMany-->2 SelectMany-->1 SelectMany-->2 SelectMany-->3 SelectMany completed
最後一個範例更好地說明了SelectMany如何獲取單個值並將其擴展為多個值。當我們將其應用於值序列時,結果是每個子序列被組合以產生最終序列。 在這兩個範例中,我們返回了一個與來源類型相同的序列。這不是一個限制,所以在下一個範例中,我們回傳一個不同的類型。我們將重用將整數轉換為ASCII的Select範例。為此,選擇器函數僅回傳具有單一值的char序列。
Func<int, char> letter = i => (char)(i + 64); Observable.Return(1) .SelectMany( i => Observable.Return(letter(i))) .Dump("SelectMany");
因此,輸入[1],我們回傳序列[A]。
SelectMany-->A SelectMany completed
擴展來源序列以具有許多值,將給我們帶有許多值的結果。
Func<int, char> letter = i => (char)(i + 64); Observable.Range(1,3) .SelectMany(i => Observable.Return(letter(i))) .Dump("SelectMany");
現在,[1,2,3]的輸入產生[[A],[B],[C]],其被平展為僅存[A,B,C]。
SelectMany-->A SelectMany-->B SelectMany-->C
注意,我們有效地重新建立了Select運算子。
最後一個範例將數字對映至字元。由於只有26個字母,忽略大於26的值是很好的且很容易做到。雖然我們必須為來源的每個元素返回一個序列,但沒有任何規則阻止它是一個空序列。在這種情況下,如果元素值是在範圍1-26之外的數字,我們返回一個空序列。
Func<int, char> letter = i => (char)(i + 64); Observable.Range(1, 30) .SelectMany(i => { if (0 < i && i < 27) { return Observable.Return(letter(i)); } else { return Observable.Empty<char>(); } }) .Dump("SelectMany");
輸出:
A B C ... X Y Z Completed
要清楚,對於來源序列[1..30],值1產生序列[A],值2產生序列[B],依此類推,直到值26產生序列[Z]。當來源產生值27時,選擇器函數返回空序列[]。值28,29和30也產生空序列。一旦來自對選擇器的呼叫的所有序列已經被延展以產生最終結果,我們最終得到序列[A..Z]。
現在我們已經瞭解了我們三個higher order函數中的第三個函數,讓我們花時間來思考我們已經學習的一些函式。首先我們可以考慮Where擴充函式。我們首先在縮減序列的章節中看到這個方法。雖然這種方法確實縮減了一個序列,但它不適合functional fold,因為結果仍然是一個序列。考慮到這一點,我們發現Where實際上是bind的一個fit。作為練習,嘗試使用SelectMany運算符寫自己的擴充函式版本。查看最後一個範例以獲得一些幫助...
使用SelectMany寫的Where的擴充函式範例:
public static IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> predicate) { return source.SelectMany( item => { if (predicate(item)) { return Observable.Return(item); } else { return Observable.Empty<T>(); } }); }
現在我們知道我們可以使用SelectMany來建立Where函式,它應該是一個很自然的過程,我們可以擴充這個以重現其他過濾器,如Skip和Take。
作為另一個練習,嘗試使用SelectMany編寫您自己的Select擴充函式版本。如果你需要一些幫助,參考我們使用SelectMany將int值轉換為char值的範例...
使用SelectMany編寫的Select擴充函式的範例:
public static IObservable<TResult> MySelect<TSource, TResult>( this IObservable<TSource> source, Func<TSource, TResult> selector) { return source.SelectMany( value => Observable.Return(selector(value))); }
IEnumerable vs. IObservable SelectMany
值得注意的是,IEnumerable<T>
SelectMany和IObservable <T>
SelectMany的實做之間的區別。考慮IEnumerable <T>
序列是基於pull和blocking的。這意味著當使用SelectMany處理IEnumerable <T>
時,它會一次向選擇器函數傳遞一個項目,並等待它在從來源請求(pull)下一個值之前處理來自選擇器的所有值 。
考慮一個[1,2,3]的IEnumerable <T>
來源序列。如果我們使用返回[x * 10,(x * 10)+1,(x * 10)+2]序列的SelectMany運算子來處理,我們將得到[[10,11,12],[20 ,21,22],[30,31,32]]。
private IEnumerable<int> GetSubValues(int offset) { yield return offset * 10; yield return (offset * 10) + 1; yield return (offset * 10) + 2; }
然後我們使用以下程式應用GetSubValues函式:
var enumerableSource = new [] {1, 2, 3}; var enumerableResult = enumerableSource.SelectMany(GetSubValues); foreach (var value in enumerableResult) { Console.WriteLine(value); }
所得到的子序列被展開[10,11,12,20,21,22,30,31,32]:
10 11 12 20 21 22 30 31 32
與IObservable <T>
序列的區別在於,對SelectMany的選擇器函數的呼叫不會阻塞,結果序列可以隨時間產生值。這意味著後續的“子”序列可以重疊。 讓我們再次考慮一個[1,2,3]的序列,但是這個時間值是相隔三秒產生的。 選擇器函數也將按照上面的例子產生[x * 10,(x * 10)+1,(x * 10)+2]的序列,然而這些值將相隔4秒。
為了可視化這種非同步數據,我們需要空間和時間的表示。
Visualizing sequences
Let's divert quickly and talk about a technique we will use to help communicate the concepts relating to sequences. Marble diagrams are a way of visualizing sequences. Marble diagrams are great for sharing Rx concepts and describing composition of sequences. When using marble diagrams there are only a few things you need to know
a sequence is represented by a horizontal line time moves to the right (i.e. things on the left happened before things on the right) notifications are represented by symbols: 讓我們快速轉向並談談我們用來幫助傳達與序列相關的概念的技術。Marble 圖是一種可視化序列的方法。Marble 圖是共享Rx概念和描述序列的組成的好方法。當使用Marble 圖時,只有幾個事情你需要知道:
- 序列由水平線表示
- 時間向右移動(即左邊的事情發生在右邊的事情之前)
- 通知由符號表示:
- '0' for OnNext
- 'X' for an OnError
- '|' for OnCompleted
- 許多同步序列可以通過建立序列行來可視化
這是完成的三個值的序列的樣本:
--0--0--0-|
這是一個四個值序列的樣本,然後是一個錯誤:
--0--0--0--0--X
現在回到我們的SelectMany範例,我們可以通過使用值而不是0標記來可視化我們的輸入序列。這是間隔三秒的序列[1,2,3]的Marble 圖表示(注意每個字元表示一秒鐘)。
--1--2--3|
現在我們可以通過引入時間和空間的概念來利用Marble 圖的效用。這裡我們看到由第一個值1產生的序列的可視化,它給出了序列[10,11,12] (請由上往下看)。這些值間隔四秒,但是初始值立即產生。
1---1---1| 0 1 2|
由於值是兩位數字,它們占用兩行,因此值10不會與值1緊接著的值0混淆。我們為選擇器函數產生的每個序列添加一行。
--1--2--3| 1---1---1| 0 1 2| 2---2---2| 0 1 2| 3---3---3| 0 1 2|
現在我們可以可視化來源序列及其子序列,我們應該能夠推導出SelectMany運算子的預期輸出。為了為Marble 圖創建一個結果的行,我們簡單的允許每個子序列的值“落入”新的結果行。
--1--2--3| 1---1---1| 0 1 2| 2---2---2| 0 1 2| 3---3---3| 0 1 2| --1--21-321-32--3| 0 01 012 12 2|
如果我們進行這個練習,現在將其應用於程式中,我們可以驗證我們的Marble 圖。首先我們的函式將產生我們的子序列:
private IObservable<long> GetSubValues(long offset) { //Produce values [x*10, (x*10)+1, (x*10)+2] 4 seconds apart, but starting immediately. return Observable.Timer( TimeSpan.Zero, TimeSpan.FromSeconds(4)) .Select(x => (offset*10) + x) .Take(3); }
這是接收來源序列以產生最終輸出的程式:
// Values [1,2,3] 3 seconds apart. Observable.Interval(TimeSpan.FromSeconds(3)) .Select(i => i + 1) //Values start at 0, so add 1. .Take(3) //We only want 3 values .SelectMany(GetSubValues) //project into child sequences .Dump("SelectMany");
產生的輸出符合我們對Marble 圖的期望。
SelectMany-->10 SelectMany-->20 SelectMany-->11 SelectMany-->30 SelectMany-->21 SelectMany-->12 SelectMany-->31 SelectMany-->22 SelectMany-->32 SelectMany completed
我們之前已經看過在查詢語法中使用Select運算子,因而值得注意的是如何使用SelectMany運算子。Select擴充函式很明顯地映射到query comprehension syntax,而SelectMany不是那麼明顯。正如我們在前面的例子中所看到的,只使用Select的簡單實做如下:
var query = from i in Observable.Range(1, 5) select i;
如果我們想添加一個簡單的where子句,我們可以這樣做:
var query = from i in Observable.Range(1, 5) where i%2==0 select i;
要在查詢中增加SelectMany,我們實際上加了一個額外的from子句。
var query = from i in Observable.Range(1, 5) where i%2==0 from j in GetSubValues(i) select j; //Equivalent to var query = Observable.Range(1, 5) .Where(i=>i%2==0) .SelectMany(GetSubValues);
使用query comprehension syntax的優點是,您可以輕鬆地存取查詢範圍內的其他變數。在這個例子中,我們將來源的值及子值轉換至一匿名型別中。
var query = from i in Observable.Range(1, 5) where i%2==0 from j in GetSubValues(i) select new {i, j}; query.Dump("SelectMany");
輸出:
SelectMany-->{ i = 2, j = 20 } SelectMany-->{ i = 4, j = 40 } SelectMany-->{ i = 2, j = 21 } SelectMany-->{ i = 4, j = 41 } SelectMany-->{ i = 2, j = 22 } SelectMany-->{ i = 4, j = 42 } SelectMany completed
我們準備結束第2章了,這裡的關鍵是讓讀者能夠理解Rx的一個關鍵原則:函數式合成。當我們學會本章後,範例會變得越來越複雜。我們利用LINQ的力量將擴充函式鏈接在一起組成複雜的查詢。
我們沒有一次嘗試學習所有的運算子,我們將它們分組以學習。
- Creation
- Reduction
- Inspection
- Aggregation
- Transformation
對運算子的更深入分析,我們發現大多數運算子實際上是higher order function 概念的特殊化。我們將它們命名為所謂ABC的函數式編程:
- Anamorphism, aka:
- Ana
- Unfold
- Generate
- Bind, aka:
- Map
- SelectMany
- Projection
- Transform
- Catamorphism, aka:
- Cata
- Fold
- Reduce
- Accumulate
- Inject
現在你應該覺得你對如何操作序列有很深的理解。然而,到目前為止我們所學到的東西也都大部分可以應用在IEnumerable序列上。Rx可能比許多人在IEnumerable世界中處理的複雜得多,正如我們在SelectMany函式中看到的。在本書的下一部分中,我們將瞭解Rx天然非同步的特性。憑著我們已經建立的基礎,我們應該能夠解決Rx中更具挑戰性和有趣的功能。