The Parallel Programming Of .NET Framework 4.0(3) - Deep Into Task Library

Task Library除了支援Planed/Un plan Exit時的例外處理,及Local Queue、Working Stealing機制外,還有一項很有趣的機制,那就是Continue With機制,這個機制允許設計師在一個執行緒結束後,緊接著安排另一個執行緒來執行指定的delegate,以較簡單、白話的說,就是執行緒的流程控管機制。

  

The Parallel Programming Of .NET Framework 4.0(3) - Deep Into Task Library
 
 
/黃忠成
 
 
ContinueWith
 
   Task Library除了支援Planed/Un plan Exit時的例外處理,及Local QueueWorking Stealing機制外,還有一項很有趣的機制,那就是Continue With機制,這個機制允許設計師在一個執行緒結束後,緊接著安排另一個執行緒來執行指定的delegate,以較簡單、白話的說,就是執行緒的流程控管機制。
 
16
 
Continue With機制,允許設計師於執行緒結束的時候,依據結束的狀態,分別指定特定的delegate,這些delegate會在特定狀態時被排入排程,以另一個執行緒來執行。
就圖16來說,我們分別排了三個delegate,在TaskFaulted(有例外發生,而未被正常補捉)Complete(執行緒正常結束)Cancel(執行緒被取消)時執行,請注意!只有在狀態符合時,指定的delegate才會被執行。下面是一個使用ContinueWith的小例子。
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace ConsoleApplication12
{
    class Program
    {
        static void Main(string[] args)
        {
            Task t = new Task(() =>
                {
                    for (int i = 0; i < 5; i++)
                        Thread.Sleep(500);
                });
            t.ContinueWith((task) =>
                {
                    Console.WriteLine("Finish");
                });
            t.Start();
            Console.ReadLine();
        }
    }
}
 
呼叫ContinueWith而未帶任何參數時,預設為None模式,也就是不管Task發生例外或是取消、還是正常結束,都會執行指定的delegate,設計師如使用此法呼叫ContinueWith,通常必須自己判斷Task的結束狀態(Task.Status),這也是為何ContinueWith接受的delegate必須傳入Task物件的原因。
如果需要於不同狀態來執行不同的delegate,可於呼叫ContinueWith時傳入TaskContinuationOptions參數。
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace ConsoleApplication12
{
    class Program
    {
        static void Main(string[] args)
        {
            Task t = new Task(() =>
                {
                    for (int i = 0; i < 5; i++)
                        Thread.Sleep(500);
                });
 
            t.ContinueWith((task) =>
                {
                    Console.WriteLine("Finish");
                },TaskContinuationOptions.OnlyOnRanToCompletion);
 
            t.ContinueWith((task) =>
            {
                Console.WriteLine("Canceled");
            }, TaskContinuationOptions.OnlyOnCanceled);
 
            t.ContinueWith((task) =>
            {
                Console.WriteLine("Faulted");
            }, TaskContinuationOptions.OnlyOnFaulted);
 
            t.Start();
            Console.ReadLine();
        }
    }
}
 
因為ContinueWith是於特定狀態時,將delegate排入執行緒排程來執行,所以其會回傳一個Task物件,如果需要的話,也可以使用串接的方式來串接Task
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace ConsoleApplication12
{
    class Program
    {
        static void Main(string[] args)
        {
            Task t = new Task(() =>
                {
                    for (int i = 0; i < 5; i++)
                        Thread.Sleep(500);
                });
 
            t.ContinueWith((task) =>
                {
                    Console.WriteLine("Finish");
                },TaskContinuationOptions.OnlyOnRanToCompletion).ContinueWith((task)=>
                    {
                        Console.WriteLine("chain continue with.");
                    },TaskContinuationOptions.OnlyOnRanToCompletion);
 
            t.ContinueWith((task) =>
            {
                Console.WriteLine("Canceled");
            }, TaskContinuationOptions.OnlyOnCanceled);
 
            t.ContinueWith((task) =>
            {
                Console.WriteLine("Faulted");
            }, TaskContinuationOptions.OnlyOnFaulted);
 
            t.Start();
            Console.ReadLine();
        }
    }
}
 
執行結果如下:
 
Finish
chain continue with.
 
 
下表為TaskContinuationOptions可能的值,分成狀態及執行ContinueWith指定之delegateTaskCreationOptions兩類,這兩類值可以以OR方式合併使用的。
 
 
說明
Task CreationOptions
AttachedToParent
ContinueWith所指定的delegate,會附加到外層的Task上,請注意!不是附加到呼叫ContinueWithTask,而是外層。
ExecuteSynchronously
ContinueWith所指定的delegate,會執行於該Task相同的執行緒中,也就是說:
t.ContinueWith( (task)=>
{
   //此處將執行於t這個Task所產生的
   //執行緒中
}
, TaskContinuationOptions.ExecuteSynchronously
);
PS:此值不可與LongRunning合用。
LongRunning
ContinueWith所指定的delegate,不會排入Pool排程,而是直接建立一個Thread執行。
PreferFairness
ContinueWith所指定的delegate,將被排入Global Queue
Task Exit Status
None
預設值,不論執行緒結束時狀態,一律執行指定的delegate,並且未指定上述的四種Task Creation Options,也就是說此delegate將以一般型態被排入Local Queue執行。
NotOnCanceled
如果執行緒不是以Canceled結束,執行指定的delegate,情況包含:正常結束、Faulted結束。
NotOnFaulted
如果執行緒不是以Faulted結束,執行指定的delegate,情況包含:正常結束、Cancel結束。
NotOnRanToCompletion
如果執行緒不是正常結束,執行指定的delegate,情況包含:FaultedCancel結束。
OnlyOnCanceled
當執行緒以Cancel狀態結束時,執行指定的delegate
OnlyOnFaulted
當執行緒以Faulted狀態結束時,執行指定的delegate
OnlyOnRanToCompletion
當執行緒以正常狀態結束時,執行指定的delegate
 
OnlyXXX為參數的模式很容易理解,就是在特定狀態下執行delegateNotXXX的模式就是非在指定狀態下執行delegate,見下面的例子。
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace ConsoleApplication12
{
    class Program
    {
        static void Main(string[] args)
        {
            Task t = new Task(() =>
                {
                    for (int i = 0; i < 5; i++)
                        Thread.Sleep(500);
                });
 
            t.ContinueWith((task) =>
                {
                    Console.WriteLine("Finish");
                }, TaskContinuationOptions.OnlyOnRanToCompletion);
 
            t.ContinueWith((task) =>
            {
                Console.WriteLine("Error");
            }, TaskContinuationOptions.NotOnRanToCompletion);
 
 
            t.Start();
            Console.ReadLine();
        }
    }
}
 
這個例子於正常結束時印出Finish,非正常結束如CancelFaulted時印出Error。你可以將NotXXXOR方式連結起來,例如NotOnFaulted|NotOnCanceled就會變成OnlyRanToCompletionNotOnFaulted|NotOnRanToCompletion就會變成OnlyOnCanceled,但是請注意一點,你不能將OnlyXXXOR方式連起來,OnlyOnCanceled|OnlyOnFaulted不等於NotOnRanToCompletion
必要時,你也可以透過以OR串接Task Creation Options來指定ContinueWith所產生的Task排程模式,下例會將ContinueWith所指定的Task排入Global Queue
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace ConsoleApplication12
{
    class Program
    {
        static void Main(string[] args)
        {
            Task t = new Task(() =>
                {
                    for (int i = 0; i < 5; i++)
                        Thread.Sleep(500);
                    throw new Exception("TEST");
                });
 
            t.ContinueWith((task) =>
                {                   
                    Console.WriteLine("Finish");                   
                }, TaskContinuationOptions.OnlyOnRanToCompletion);
 
            t.ContinueWith((task) =>
            {
                Console.WriteLine("Error");
            },TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.PreferFairness );
 
 
            t.Start();
            Console.ReadLine();
        }
    }
}
 
因為ContinueWith內部也是使用Task,所以你也能指定CancellationToke給它,處理ContinueWith所指定之delegateCancel狀態。
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace ConsoleApplication12
{
    class Program
    {
        static void Main(string[] args)
        {
            Task t = new Task(() =>
                {
                    for (int i = 0; i < 5; i++)
                        Thread.Sleep(500);
                });
            CancellationTokenSource cts = new CancellationTokenSource();
            t.ContinueWith((task) =>
                {
                    Thread.Sleep(3000);
                    Console.WriteLine("Finish");
                    cts.Token.ThrowIfCancellationRequested();
                },cts.Token).ContinueWith((task)=>
                    {
                        Console.WriteLine("Cancel on Finish");
                    },TaskContinuationOptions.OnlyOnCanceled); ;
            t.Start();
            Thread.Sleep(3000);
            cts.Cancel();
            Console.ReadLine();
        }
    }
}
 
如果是使用有回傳值的Task,那麼就必須指定delegate的參數。
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace ConsoleApplication12
{
    class Program
    {
        static void Main(string[] args)
        {
            Task<int> t = new Task<int>(() =>
                {
                    int v = 0;
                    for (int i = 0; i < 5; i++)
                    {
                        Thread.Sleep(500);
                        v += i;
                    }
                    return v;
                });
            t.ContinueWith((Task<int> task) =>
                {
                    Thread.Sleep(3000);
                    Console.WriteLine(task.Result);
                    Console.WriteLine("Finish");
                });
            t.Start();
            Console.ReadLine();
        }
    }
}
 
ContinueWith內部是Task,所以我們自然也能指定其產生有傳回值的Task<TResult>物件。
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace ConsoleApplication12
{
    class Program
    {
        static void Main(string[] args)
        {
            Task<int> t = new Task<int>(() =>
                {
                    int v = 0;
                    for (int i = 0; i < 5; i++)
                    {
                        Thread.Sleep(500);
                        v += i;
                    }
                    return v;
                });
            Task<int> resultTask = t.ContinueWith<int>((Task<int> task) =>
                {
                    Thread.Sleep(3000);
                    Console.WriteLine(task.Result);
                    Console.WriteLine("Finish");
                    return task.Result + 1000;
                });
            t.Start();
            Console.WriteLine(resultTask.Result);
            Console.ReadLine();
        }
    }
}
 
  
 
ExecuteSynchronously
 
    TaskContinuationOptions有個很有趣的值:ExecuteSynchronously,當你於ContinueWith時使用此值時,該ContinueWith所產生的Task將會執行在其所附加的Task所使用的執行緒中,例如下例:
 
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using System.Threading;
using System.Threading.Tasks;
 
namespace WindowsFormsApplication3
{
    public partial class Form1 : Form
    {
        public Form1()
        {
            InitializeComponent();
        }
 
        private void button1_Click(object sender, EventArgs e)
        {
            Task<int> t = new Task<int>(() =>
                {
                    int total = 0;
                    for (int i = 0; i < 10; i++)
                        total += i;
                    MessageBox.Show(Thread.CurrentThread.ManagedThreadId.ToString());
                    return total;
                });
            t.ContinueWith((Task<int> task) =>
                {
                    MessageBox.Show(Thread.CurrentThread.ManagedThreadId.ToString());
                },TaskContinuationOptions.OnlyOnRanToCompletion |
                 TaskContinuationOptions.ExecuteSynchronously);
            t.Start();
        }
    }
}
 
執行此例時,你會發現兩個Task所列出的ManagedThreadId是相同的數字,拿掉ExecuteSynchronously後重新執行,這兩個TaskManagedThreadId將可能完全不同,為何用【可能】這個字眼呢?因為依據Task Library排程的情況不同,運氣好的話這兩個Task會在同一個執行緒(因為Thread Pool重用已結束但未回收執行緒的行為),但運氣不好的話,就不會在同一個執行緒中,當然!寫程式需要的是精確度,不是比誰運氣好,所以當你需要ContinueWith所產生的Task執行在其附加的Task所在執行緒時,請加上ExecuteSynchronously
那何時會用到這個機制呢?最常用到的是Thread Static變數,見下例:
 
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using System.Threading;
using System.Threading.Tasks;
 
namespace WindowsFormsApplication3
{
    public partial class Form1 : Form
    {
        public Form1()
        {
            InitializeComponent();
        }
 
        private void button1_Click(object sender, EventArgs e)
        {
            Task<int> t = new Task<int>(() =>
                {
                    int total = 0;
                    for (int i = 0; i < 10; i++)
                        total += i;
                    ThreadLocalVariable.Data = total;
                    return total;
                });
            t.ContinueWith((Task<int> task) =>
                {
                    MessageBox.Show(ThreadLocalVariable.Data.ToString());
                }, TaskContinuationOptions.OnlyOnRanToCompletion |
                  TaskContinuationOptions.ExecuteSynchronously );
            t.Start();
        }
    }
 
    public class ThreadLocalVariable
    {       
        [ThreadStatic]
        public static int Data = 0;
    }
}
 
此例中將ThreadLocalVariable.Data設為Thread Static形態,要求CLR將此變數視為是每個執行緒都有一份,也就是說當有10個執行緒設定此變數值,那麼每個執行緒都會擁有一份自己的值,A執行緒對ThreadLocalVariable.Data 的變更,不會反應在B執行緒上。此例於t這個Task中設定了Data的值,接著於ContinueWithTask中列出其值,假如ContinueWithTask未執行在t這個Task所執行的執行緒中,Data的值將會維持在零。讀者們可觀察ExecuteSynchronously拿掉前後的差異,就能了解此值存在的意義,
 
 
 
Why ContinueWith??
 
    乍看之下,ContinueWith似乎很好用,但如果細想一下,你會開始懷疑這個機制存在的必要性,因為就算不使用ContinueWith,我們似乎也能得到同樣的結果。
 
static void Main(string[] args)
{
            Task<int> t = new Task<int>(() =>
            {
                int total = 0;
                for (int i = 0; i < 10; i++)
                    total += i;
                Thread.Sleep(5000);
                Console.WriteLine("complete");
                return total;
            });
 
            t.Start();
            Console.WriteLine(t.Result);
            Console.ReadLine();
}
 
 
這個例子的執行結果與下面這個使用ContinueWith的結果幾近相同,差別只在於【complete】列出的順序而已。
 
static void Main(string[] args)
 {
            Task<int> t = new Task<int>(() =>
            {
                int total = 0;
                for (int i = 0; i < 10; i++)
                    total += i;
                Thread.Sleep(5000);
                return total;
            });
            t.ContinueWith((Task<int> task) =>
            {
                Console.WriteLine("complete");
            }, TaskContinuationOptions.OnlyOnRanToCompletion);
            t.Start();
            Console.WriteLine(t.Result);
            Console.ReadLine();
}
 
這兩個例子披露了一個事實,ContinueWith存在的意義似乎不大,其所能達到的效果,我們直接在Task中也能夠以try...catch來完成,也能用cts.Token. IsCancellationRequested來處理取消,那為何要設計ContinueWith這種機制呢?答案是動態及靜態的不同,在未使用ContinueWith的例子中,我們是將結束時處理的程式碼寫在Taskdelegate中,此為靜態,這在執行時期是不可變的,但是當使用ContinueWith後,結束時處理的程式碼是【附加】到Task上的,其在執行時期是可變的,以一個模擬的例子來說,假設我們有一個Class Library,其將訂單的處理機制寫在Task中,由外部來呼叫執行,如下所示:
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Data.SqlClient;
 
namespace ClassLibrary1
{
    public class Class1
    {
        public static Task UpdateOrders(SqlConnection conn)
        {
            Task t = new Task( ()=>
            {
                //updating order..
                Thread.Sleep(10000);
            });
            return t;
        }
    }
}
 
呼叫者以下列程式碼執行:
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Data.SqlClient;
 
namespace ConsoleApplication12
{
    class Program
    {
 
        static void Main(string[] args)
        {
            SqlConnection conn = new SqlConnection();
            Task t = ClassLibrary1.Class1.UpdateOrders(conn);           
            t.Start();
            t.Wait();
            Console.ReadLine();
        }
    }
}
 
OK,看來很正常沒有任何問題,倘若今天需要在更新訂單後記錄此次更新於另一資料表,按原本的寫法,我們可以在t.Wait後寫下記錄的程式碼,但這麼做就失去了非同步寫法的優勢:
 
Task t = ClassLibrary1.Class1.UpdateOrders(conn);           
 t.Start();
 t.Wait();
//log order
 
要維持非同步的優勢,我們有兩種選擇,一是修改UpdateOrders的原始碼,加入記錄的程式碼,二是將UpdateOrders包在另一個Task中,如下所示:
 
Task t_wrapper = new Task( ()=>
{
 Task t = ClassLibrary1.Class1.UpdateOrders(conn);           
   t.Start();
   t.Wait();
 //log order
});
 
修改UpdateOrders原始碼是比較直接的寫法,但重點是你得有該Class Library的原始碼,Wrapper的寫法也不錯,但缺點是無法保證這兩個Task是跑在同一個執行緒中,此時ContinueWith的優勢就出現了。
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Data.SqlClient;
 
namespace ConsoleApplication12
{
    class Program
    {
 
        static void Main(string[] args)
        {
            SqlConnection conn = new SqlConnection();
            Task t = ClassLibrary1.Class1.UpdateOrders(conn);
            t.ContinueWith((task) =>
                {
                    //updating order log.
                    Thread.Sleep(3000);
                    Console.WriteLine("order loged.");
                }, TaskContinuationOptions.OnlyOnRanToCompletion |
                  TaskContinuationOptions.ExecuteSynchronously);
            t.Start();
            t.Wait();
            Console.ReadLine();
        }
    }
}
 
重點就是,當使用ContinueWith時,你附加的動作是完全動態的,這比起原先靜態的寫法靈活許多。