C# 初识System.IO.Pipelines

写在前面

在进一步了解Socket粘包分包的过程中,了解到了.NET 中的 System.IO.Pipelines,可以更优雅高效的解决这个问题;先跟随官方的示例做个初步的认识。

System.IO.Pipelines 是一个库,旨在使在 .NET 中执行高性能 I/O 更加容易。 该库的目标为适用于所有 .NET 实现的 .NET Standard。
System.IO.Pipelines 具有高性能的流数据分析功能,可以减少代码复杂性。

老规矩通过NuGet安装该类库

代码实现

using System.Buffers;
using System.IO.Pipelines;
using System.Text;

class Program
{
    static async Task Main()
    {
        using var stream = File.OpenRead("lorem-ipsum.txt");

        var reader = PipeReader.Create(stream);
        var writer = PipeWriter.Create(
            Console.OpenStandardOutput(),
            new StreamPipeWriterOptions(leaveOpen: true));

        WriteUserCancellationPrompt();

        var processMessagesTask = ProcessMessagesAsync(reader, writer);
        var userCanceled = false;
        var cancelProcessingTask = Task.Run(() =>
        {
            while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
            {
                WriteUserCancellationPrompt();
            }

            userCanceled = true;

            // No exceptions thrown
            reader.CancelPendingRead();
            writer.CancelPendingFlush();
        });

        await Task.WhenAny(cancelProcessingTask, processMessagesTask);

        Console.WriteLine(
            $"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
    }

    static void WriteUserCancellationPrompt() =>
        Console.WriteLine("Press 'C' to cancel processing...\n");

    static async Task ProcessMessagesAsync(
        PipeReader reader,
        PipeWriter writer)
    {
        try
        {
            while (true)
            {
                ReadResult readResult = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = readResult.Buffer;

                try
                {
                    if (readResult.IsCanceled)
                    {
                        break;
                    }

                    if (TryParseLines(ref buffer, out string message))
                    {
                        FlushResult flushResult =
                            await WriteMessagesAsync(writer, message);

                        if (flushResult.IsCanceled || flushResult.IsCompleted)
                        {
                            break;
                        }
                    }

                    if (readResult.IsCompleted)
                    {
                        if (!buffer.IsEmpty)
                        {
                            throw new InvalidDataException("Incomplete message.");
                        }
                        break;
                    }
                }
                finally
                {
                    reader.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex);
        }
        finally
        {
            await reader.CompleteAsync();
            await writer.CompleteAsync();
        }
    }

    static bool TryParseLines(
        ref ReadOnlySequence<byte> buffer,
        out string message)
    {
        SequencePosition? position;
        StringBuilder outputMessage = new();

        while (true)
        {
            position = buffer.PositionOf((byte)'\n');

            if (!position.HasValue)
                break;

            outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
                        .AppendLine();

            buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        };

        message = outputMessage.ToString();
        return message.Length != 0;
    }

    static ValueTask<FlushResult> WriteMessagesAsync(
        PipeWriter writer,
        string message) =>
        writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}

调用示例

总结

例子中用到的文本文件是一个以\n 换行符作为结尾的多行文本,微软官方示例没有提供,这个是自己建的测试文件,如果没有检测到\n会抛出异常。

 从运行的结果可以看到,从传入的流中识别以\n结尾,作为数据块的区分,利用这个特性定义数据报文的尾部,实现分包。

相关推荐

  1. C#语言

    2023-12-26 20:20:05       58 阅读
  2. <span style='color:red;'>初</span><span style='color:red;'>识</span><span style='color:red;'>C</span>++

    C++

    2023-12-26 20:20:05      44 阅读
  3. C语言

    2023-12-26 20:20:05       66 阅读
  4. C++(3)

    2023-12-26 20:20:05       55 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2023-12-26 20:20:05       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2023-12-26 20:20:05       101 阅读
  3. 在Django里面运行非项目文件

    2023-12-26 20:20:05       82 阅读
  4. Python语言-面向对象

    2023-12-26 20:20:05       91 阅读

热门阅读

  1. 防御100g的ddos攻击需要花多少钱

    2023-12-26 20:20:05       55 阅读
  2. 掌握这些百度SEO优化技巧

    2023-12-26 20:20:05       49 阅读
  3. Qt判断linux是否存在网卡

    2023-12-26 20:20:05       56 阅读
  4. Ts声明ElementUI控件

    2023-12-26 20:20:05       60 阅读
  5. PTA删除单链表偶数节点(C语言)

    2023-12-26 20:20:05       60 阅读
  6. GBASE南大通用-管理用户定义函数(UDF)

    2023-12-26 20:20:05       56 阅读
  7. 【力扣100】199.二叉树的右视图

    2023-12-26 20:20:05       62 阅读
  8. C++精进之路(4)复合类型

    2023-12-26 20:20:05       52 阅读
  9. AI论文范文:AIGC中的图像转视频技术研究

    2023-12-26 20:20:05       46 阅读