一,C#并行处理
在C#中,并行处理指的是同时执行多个任务或操作,以利用多核或多处理器的优势,从而提高应用程序的性能。C#提供了多种工具和框架来帮助开发者实现并行处理。以下是一些在C#中实现并行处理的方法:
- Parallel 类:
System.Threading.Tasks.Parallel 类提供了一系列静态方法,如 For, ForEach, Invoke 等,用于简化并行循环和操作的执行。
csharp代码
using System; |
|
using System.Threading.Tasks; |
|
class Program |
|
{ |
|
static void Main() |
|
{ |
|
// 使用 Parallel.For 并行执行循环 |
|
Parallel.For(0, 10, i => |
|
{ |
|
Console.WriteLine($"Processing {i} on thread {Thread.CurrentThread.ManagedThreadId}"); |
|
}); |
|
} |
|
} |
2.Task Parallel Library (TPL):
TPL 是基于任务的并行编程模型,它提供了 Task 和 Task<T> 类来表示异步操作。Task.Run 方法用于在线程池线程上异步执行代码。
csharp代码
using System; |
|
using System.Threading.Tasks; |
|
class Program |
|
{ |
|
static async Task Main() |
|
{ |
|
// 使用 Task.Run 在线程池上异步执行方法 |
|
Task task1 = Task.Run(() => DoWork("Task 1")); |
|
Task task2 = Task.Run(() => DoWork("Task 2")); |
|
// 等待所有任务完成 |
|
await Task.WhenAll(task1, task2); |
|
Console.WriteLine("All tasks completed."); |
|
} |
|
static void DoWork(string taskName) |
|
{ |
|
Console.WriteLine($"{taskName} is running on thread {Thread.CurrentThread.ManagedThreadId}"); |
|
// 模拟工作 |
|
Thread.Sleep(1000); |
|
} |
|
} |
3.async 和 await 关键字:
这两个关键字用于编写异步方法,它们允许你以同步的方式编写异步代码,从而避免回调地狱和保持代码的清晰性。
csharp代码
using System; |
|
using System.Threading.Tasks; |
|
class Program |
|
{ |
|
static async Task Main() |
|
{ |
|
Console.WriteLine("Starting asynchronous operations..."); |
|
await Task.WhenAll(PerformAsyncOperation1(), PerformAsyncOperation2()); |
|
Console.WriteLine("All operations completed."); |
|
} |
|
static async Task PerformAsyncOperation1() |
|
{ |
|
await Task.Run(() => |
|
{ |
|
Console.WriteLine("Async operation 1 is running on thread {0}", Thread.CurrentThread.ManagedThreadId); |
|
// 模拟异步工作 |
|
Thread.Sleep(1000); |
|
}); |
|
} |
|
static async Task PerformAsyncOperation2() |
|
{ |
|
await Task.Run(() => |
|
{ |
|
Console.WriteLine("Async operation 2 is running on thread {0}", Thread.CurrentThread.ManagedThreadId); |
|
// 模拟异步工作 |
|
Thread.Sleep(1000); |
|
}); |
|
} |
|
} |
4.数据并行处理:
对于数据集合的处理,可以使用 Parallel.ForEach 或 Parallel.Invoke 来并行处理每个元素或操作。
csharp代码
using System; |
|
using System.Collections.Generic; |
|
using System.Threading.Tasks; |
|
class Program |
|
{ |
|
static void Main() |
|
{ |
|
List<int> numbers = new List<int> { 1, 2, 3, 4, 5 }; |
|
// 使用 Parallel.ForEach 并行处理集合中的每个元素 |
|
Parallel.ForEach(numbers, number => |
|
{ |
|
Console.WriteLine($"Processing {number} on thread {Thread.CurrentThread.ManagedThreadId}"); |
|
}); |
|
} |
|
} |
5.取消操作:
使用 CancellationToken 和 CancellationTokenSource 可以提供取消正在进行的并行或异步操作的能力。
csharp代码
using System; |
|
using System.Threading; |
|
using System.Threading.Tasks; |
|
class Program |
|
{ |
|
static void Main() |
|
{ |
|
CancellationTokenSource cts = new CancellationTokenSource(); |
|
CancellationToken token = cts.Token; |
|
Task task = Task.Run(() => DoWork(token), token); |
|
// 在一段时间后取消任务 |
|
Thread.Sleep(2000); |
|
cts.Cancel(); |
|
} |
二,异步流
在C#中,异步流(Asynchronous Streams)通常指的是使用异步编程模型(如async和await关键字)来处理数据流,这样可以避免阻塞调用线程,提高应用程序的响应性和性能。异步流在处理大量数据、网络请求或IO密集型操作时特别有用。
在C#中,有几种方式可以处理异步流:
- 异步方法(Async Methods):
使用async和await关键字来编写异步方法,这些方法可以返回Task或Task<T>,并且可以在其中执行异步操作,如读取文件、访问网络等。
csharp代码
public async Task<Stream> GetStreamAsync() |
|
{ |
|
// 模拟一个异步操作,例如从网络下载数据 |
|
await Task.Delay(1000); |
|
return new MemoryStream(Encoding.UTF8.GetBytes("Some data")); |
|
} |
2.异步流处理(Asynchronous Stream Processing):
使用StreamReader、StreamWriter、NetworkStream等类型的异步方法,可以逐块读取或写入数据,而不需要一次性加载整个流。
csharp代码
using (var stream = await GetStreamAsync()) |
|
using (var reader = new StreamReader(stream)) |
|
{ |
|
string line; |
|
while ((line = await reader.ReadLineAsync()) != null) |
|
{ |
|
// 处理每一行数据 |
|
Console.WriteLine(line); |
|
} |
|
} |
3.异步枚举器(Asynchronous Enumerators):
C# 8.0引入了异步枚举器,允许你创建可以异步迭代的集合。这通常用于实现异步数据流。
csharp代码
public async IAsyncEnumerable<int> GetAsyncEnumerable() |
|
{ |
|
for (int i = 0; i < 10; i++) |
|
{ |
|
await Task.Delay(100); // 模拟耗时操作 |
|
yield return i; |
|
} |
|
} |
|
// 使用方式 |
|
foreach await (var item in GetAsyncEnumerable()) |
|
{ |
|
Console.WriteLine(item); |
|
} |
4.管道和流(Pipes and Streams):
在.NET Core和.NET 5+中,System.IO.Pipes和System.Threading.Channels命名空间提供了额外的工具来创建和使用管道和通道,这些工具特别适合在进程间或线程间传递异步数据流。
csharp代码
var channel = Channel.CreateUnbounded<int>(); |
|
var reader = channel.Reader; |
|
var writer = channel.Writer; |
|
// 在一个任务中写入数据 |
|
Task.Run(async () => |
|
{ |
|
for (int i = 0; i < 10; i++) |
|
{ |
|
await writer.WriteAsync(i); |
|
await Task.Delay(100); |
|
} |
|
writer.Complete(); |
|
}); |
|
// 在另一个任务中读取数据 |
|
Task.Run(async () => |
|
{ |
|
while (await reader.WaitToReadAsync()) |
|
{ |
|
while (reader.TryRead(out var value)) |
|
{ |
|
Console.WriteLine(value); |
|
} |
|
} |
|
}); |
在处理异步流时,重要的是要确保流的生命周期得到妥善管理,避免资源泄漏。使用using语句可以确保Stream、StreamReader、StreamWriter等资源在使用后被正确释放。此外,当使用异步编程时,务必注意异常处理,因为异步操作可能会抛出未在调用线程上捕获的异常。