並發系列64章(並行編程二)第六章

前言

續前一章。

1.並行調用

2.動態並行

3.並行linq

並行調用

指的是調用一批方法,並且這些方法是(大部分)相互獨立的。

static void ProcessArray(double[] array)
{
	Parallel.Invoke(
		() => { ProcessPartialArray(array, 0, array.Length / 2); },
		() => { ProcessPartialArray(array, array.Length / 2, array.Length); }
		);
}

static void ProcessPartialArray(double[] array, int begin,int end)
{
	// 開始計算
}

當然我們不知道到底有多少方法需要並行的時候,可以這樣:

static void DoActionTimes(CancellationToken token, params Action[] action)
{
	Parallel.Invoke(new ParallelOptions { CancellationToken=token} ,action);
}

之所以填寫一個token進來,是希望有一個可以取消的狀態。

動態並行

上面的並行調用中,解決了一個問題,就是在確定要並行的數量的時候,我們通過傳入一個action數組,來實現。

但是呢,有時候我們不確定到底我們多少並行數量。上文中,我們action是一個數組已經確定了數量了,因為數組可以簡單的遍歷,通過下標就可以得到每一個並行委託。

有些卻不能,如鏈表,樹,圖等,複製一些的數據,需要遍歷計算的時候,就是在運行的時候才知道數量級是多少,所以又叫動態並行。

public class Node
{
	public Node left;
	public Node right;
}

static void Travrese(Node current)
{
	// 對current 做一些操作
	if (current.left!=null)
	{
		Task.Factory.StartNew(() => Travrese(current.left),
			CancellationToken.None, TaskCreationOptions.AttachedToParent, TaskScheduler.Default);
	}

	if (current.right != null)
	{
		Task.Factory.StartNew(() => Travrese(current.right),
			CancellationToken.None, TaskCreationOptions.AttachedToParent, TaskScheduler.Default);
	}
}

static void ProcessTree(double[] array)
{
	Node root = new Node();
	var task= Task.Factory.StartNew(() => Travrese(root),
			CancellationToken.None, TaskCreationOptions.AttachedToParent, TaskScheduler.Default);
	task.Wait();
}

關鍵部分,在TaskCreationOptions.AttachedToParent;

//
// 摘要:
//     指定將任務附加到任務層次結構中的某個父級。 默認情況下,子任務(即由外部任務創建的內部任務)將獨立於其父任務執行。 可以使用 System.Threading.Tasks.TaskContinuationOptions.AttachedToParent
//     選項以便將父任務和子任務同步。 請注意,如果使用 System.Threading.Tasks.TaskCreationOptions.DenyChildAttach
//     選項配置父任務,則子任務中的 System.Threading.Tasks.TaskCreationOptions.AttachedToParent 選項不起作用,並且子任務將作為分離的子任務執行。
//     有關詳細資訊,請參閱附加和分離的子任務。

這種情況是父任務和子任務。

在此解釋一下,task.wait() 因為父任務和子任務同步了,所以說這個await是等待了所以任務完成後,這裡的並行是子任務是並行的,當然不能完全這麼說,父父子子,子又是父,大概就是這個意思哈。

如果沒有子任務等待你可以這樣:

var task= Task.Factory.StartNew(() => { },
		CancellationToken.None, TaskCreationOptions. None, TaskScheduler.Default);

task.ContinueWith(t => Trace.WriteLine("Task is dome"),
	CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default
	);

上面的code不是並行的範疇,而是非同步編程的範疇了。

當然,上面的動態並行也可以這樣寫。

if (current.left!=null)
{
   var leftTask=Task.Factory.StartNew(() => Travrese(current.left),
		CancellationToken.None, TaskCreationOptions.AttachedToParent, TaskScheduler.Default);
}

if (current.right != null)
{
   var rightTask=Task.Factory.StartNew(() => Travrese(current.right),
		CancellationToken.None, TaskCreationOptions.AttachedToParent, TaskScheduler.Default);
}
Task.WaitAll(leftTask, rightTask);

當然了,上面肯定會報錯,因為leftTask, rightTask拿不到。由此我們可見通過建立父子任務,這樣在做一些判斷的時候 current.right != null

來決定是否創建task的時候,waitall顯得笨重,因為它必須確定task的數量。

並行linq

這個就是希望我們可以使用linq了,linq不僅僅是為了使我們的程式碼方便,而是一種編程思想,還很多好處,這裡就不列舉了,百度很多。

static IEnumerable<int> MultiplyBy2(IEnumerable<int> values)
{
	return values.AsParallel().Select(item => item * 2);
}

這樣是沒有順序的。

return values.AsParallel().AsOrdered().Select(item => item * 2);

這樣是有順序的,試想一下,如果讓我們去實現並行後IEnumerable 按照原來的順序,是不是非常麻煩。

總之,能使用linq就用linq吧。

下一章

整理了基礎的數據流。