並發系列64章(並行編程)第五章
前言
並行編程,先來看下概念。並行編程用於分解計算密集型的任務片段,並將它們分配給多個執行緒。
劃重點,這個是計算密集型的東西,而不是IO密集型。也就是說切割成的片段用於計算使用cpu計算,而不是記憶體。
如果一個操作時記憶體密集型那麼並行處理是會起反的效果的,因為io意味著等待。原本等待一段的,現在每一段都要等待。
數據的並行處理
class Program
{
static void Main(string[] args)
{
List<int> intlist = new List<int>();
intlist.Add(1);
intlist.Add(2);
intlist.Add(3);
intlist.Add(4);
RotateMatrices(intlist);
Console.ReadKey();
}
private static void RotateMatrices(List<int> intlist)
{
Parallel.ForEach(intlist, arg =>
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString());
});
}
}
結果是:
上述得出一個結論,不是說並行的每一個都是在不同執行緒中,而是說並行的任務可能在同一執行緒。
注意:因為可能在不同執行緒中,對於公共變數注意鎖。
並行聚合
上述並行中,現在有一個需求就是,讓他們並行加上某個數,並得出他們的結果。
class Program
{
static void Main(string[] args)
{
List<int> intlist = new List<int>();
intlist.Add(1);
intlist.Add(2);
intlist.Add(3);
intlist.Add(4);
var reuslt=RotateMatrices(intlist);
Console.WriteLine(reuslt);
Console.ReadKey();
}
private static int RotateMatrices(List<int> intlist)
{
object mutex = new object();
int result = 0;
Parallel.ForEach(source: intlist, localInit: () => 1, body: (item, state, localvalue) =>
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString());
return item + localvalue;
},
localFinally: localresult =>
{
lock (mutex)
{
result += localresult;
}
}
);
return result;
}
}
localInit: () => 1 設置了為1,然後並行執行了body部分,最後得出結果相加。
從上面可以得出,並行編程其實是阻塞的。如果要達到更好的效果,需要結合非同步編程。
上面還有個state 沒有用上:
(item, state, localvalue)
這個state 可以 state.stop(); 停止
state.break() 跳出循環。
上面這樣計算是有問題的:
class Program
{
static void Main(string[] args)
{
List<int> intlist = new List<int>();
for (int i = 0; i < 1000; i++)
{
intlist.Add(1);
intlist.Add(2);
intlist.Add(3);
intlist.Add(4);
}
var reuslt=RotateMatrices(intlist);
Console.WriteLine("查看result:"+reuslt);
Console.ReadKey();
}
private static int RotateMatrices(List<int> intlist)
{
object mutex = new object();
int result = 0;
Parallel.ForEach(source: intlist, localInit: () => 1, body: (item, state, localvalue) =>
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString());
return item + localvalue;
},
localFinally: localvalue =>
{
lock (mutex)
{
result += localvalue;
}
}
);
return result;
}
}
第一次的結果:
第二次的結果:
兩次結果不一致。
驚喜不驚喜開心不開心?
剛開始我也很迷茫,後來看了一下群里的大佬點播了一下。
請跑一下下面的,一切都會很清晰的。
private static int RotateMatrices(List<int> intlist)
{
object mutex = new object();
int fornumber = 0;
int tasknumber = 0;
int result = 0;
Parallel.ForEach(source: intlist, localInit: () => 1, body: (item, state, localvalue) =>
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString());
Console.WriteLine("查看localvalue:"+ localvalue);
Console.WriteLine("item:"+item);
Interlocked.Add(ref fornumber, 1);
return item+localvalue;
},
localFinally: localvalue =>
{
Console.WriteLine("查看Tasklocalvalue:" + localvalue);
lock (mutex)
{
result += localvalue-1;
}
Interlocked.Add(ref tasknumber, 1);
}
);
Console.WriteLine("fornumber:"+fornumber);
Console.WriteLine("tasknumber:" + tasknumber);
return result;
}
重點部分我畫了紅字:
簡化版:
intlist.AsParallel().Sum();
intlist.AsParallel().Aggregate(seed: 0, func: (sum, item) => sum + item);
下一章
我整理了一些:
1.並行調用
2.動態並行
3.並行Linq