
Using CSV parsing as an example: can string, char[], and stream sources be wrapped behind one high-performance parsing abstraction?

Author: 八苦-瞿昙
  • 2024-02-28, United States
  • Word count: 7,755

    Reading time: about 25 minutes

This is a long post, so let's lead with the results, which are also the goal.

The core goal is to explore a unified abstraction over different data types for a specific scenario while keeping performance high: a single code implementation that stays fast regardless of the source type.


Concretely, the targets are:

  • parse a 10k-row CSV string according to the RFC 4180 CSV standard;

  • the string-typed CSV path should be faster than going through a StringReader;

  • and it should not lag far behind the widely used CsvHelper.

The test code is as follows:


```csharp
[MemoryDiagnoser]
public class CsvTest
{
    private const string testdata = """
            a,b
            1,2
            3sss,3333
            1,2
            3sss,3333
            1,2
            """; // ... repeated to 10k rows

    private CsvConfiguration config = new CsvConfiguration(CultureInfo.InvariantCulture)
    {
        Mode = CsvMode.RFC4180,
    };

    [Benchmark]
    public void CsvHelper_Read()
    {
        using var sr = new StringReader(testdata);
        using var csv = new CsvHelper.CsvReader(sr, config);
        var records = new List<string[]>();
        csv.Read();
        csv.ReadHeader();
        while (csv.Read())
        {
            var record = new string[csv.ColumnCount];
            for (var i = 0; i < record.Length; i++)
            {
                record[i] = csv.GetField(i);
            }
            records.Add(record);
        }
        //var d = records.ToArray();
    }

    [Benchmark]
    public void RuQu_Read_Csv_StringReader()
    {
        using var sr = new StringReader(testdata);
        using var reader = new RuQu.Csv.CsvReader(sr, fristIsHeader: true);
        var d = reader.ToArray();
    }

    [Benchmark]
    public void RuQu_Read_Csv_String()
    {
        using var reader = new RuQu.Csv.CsvReader(testdata, fristIsHeader: true);
        var d = reader.ToArray();
    }
}
```


Benchmark results:

```
BenchmarkDotNet v0.13.12, Windows 11 (10.0.22631.3155/23H2/2023Update/SunValley3)
13th Gen Intel Core i9-13900KF, 1 CPU, 32 logical and 24 physical cores
.NET SDK 8.0.200
  [Host]     : .NET 8.0.2 (8.0.224.6711), X64 RyuJIT AVX2
  DefaultJob : .NET 8.0.2 (8.0.224.6711), X64 RyuJIT AVX2
```




So how do we reach those numbers? Let's start from my initial line of thinking.

Data type diversity

As we all know, text data such as CSV can arrive in all kinds of data types and storage forms, for example:


```
csv
 |--- string    "a,b\r\n1,2\r\n3,4"
 |--- char[]
 |--- byte[]
 |--- MemoryStream
 |--- NetworkStream
 |--- ....
```


So: can we wrap these types behind one abstraction, implement CSV parsing once, and still achieve high performance?

Classifying the data types

By their characteristics, these types fall into two groups:

  • fixed-length arrays that need no encoding conversion:

    • string

    • char[]

  • sources of unknown length that require encoding conversion:

    • byte[]

    • MemoryStream

    • NetworkStream

An abstraction built for the latter, more complex case will necessarily also cover the former.

The foundation of high performance

Next, thinking about what a CSV parser actually does: character comparison and searching must be the first concern.

Today the obvious first choice for that is ReadOnlySpan<T>.

For our parsing it brings two major advantages:

  1. Less data copying. A ReadOnlySpan<T> instance typically references an array or a slice of one, but unlike an array it can just as well point at stack memory, native memory, or managed heap memory. Through pointer-like operations, this tiny struct lets us share access to array data, or slices of it, at almost no cost.

  2. Built-in SIMD optimizations. Many span operations are SIMD-accelerated. SIMD (Single Instruction, Multiple Data) means one instruction operating on multiple data elements; it is an extension of the CPU's base instruction set, mainly providing fine-grained parallelism over small pieces of data. Image processing is the classic example: common pixel formats such as RGB565, RGBA8888, or YUV422 store each component of a pixel in 8 bits or fewer, so a conventional 32- or 64-bit register would only ever use its low 8 bits per operation, which seems wasteful. Splitting a 64-bit register into eight 8-bit lanes completes eight operations at once, an eightfold gain in throughput.
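To make these two advantages concrete, here is a minimal standalone sketch (my own illustration, not code from the library discussed in this post) showing that slicing a string as a span costs no allocation, and that span searches such as IndexOf take the BCL's vectorized fast path:

```csharp
using System;

class SpanDemo
{
    static void Main()
    {
        string csvLine = "a,b,c,longfield";

        // AsSpan creates a view over the string's existing memory: no copy is
        // made, and the span itself is a small stack-only struct.
        ReadOnlySpan<char> span = csvLine.AsSpan();

        // IndexOf on ReadOnlySpan<char> is SIMD-accelerated inside the BCL,
        // so scanning for the separator compares many chars per instruction.
        int comma = span.IndexOf(',');

        // Slicing is just offset + length arithmetic over the same memory.
        ReadOnlySpan<char> firstField = span[..comma];

        // A string is only materialized when we explicitly ask for one.
        Console.WriteLine(firstField.ToString());
    }
}
```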

Interface abstraction

Next, let's attempt the abstraction:


```csharp
public interface IReaderBuffer<T> : IDisposable where T : struct
{
    public int ConsumedCount { get; }
    public int Index { get; }
    public ReadOnlySpan<T> Readed { get; }
    public bool IsEOF { get; }

    /// Mark data as consumed, so its space can be released
    public void Consume(int count);

    /// Different scenarios can peek different amounts of data; calling this
    /// method is enough to pull not-yet-read data into the buffer while peeking
    public bool Peek(int count, out ReadOnlySpan<T> data);

    public bool Peek(out T data);

    public bool PeekByOffset(int offset, out T data);

    /// Read the next chunk of data
    public bool ReadNextBuffer(int count);
}

/// Marker interface for fixed-length sources, so we can optimize for them
public interface IFixedReaderBuffer<T> : IReaderBuffer<T> where T : struct
{
}
```
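To show what coding against this abstraction looks like, here is a hypothetical consumer (CountFieldsOnLine is my own sketch, not part of RuQu). Because it only touches IReaderBuffer<char> members, the same code runs unchanged against a string-backed or a TextReader-backed buffer:

```csharp
// Counts the number of separator-delimited fields on the current line,
// using only the IReaderBuffer<char> surface defined above.
static int CountFieldsOnLine(IReaderBuffer<char> buffer, char separator)
{
    int fields = 1;
    while (buffer.Peek(out var c))   // peek never advances the position
    {
        if (c == '\r' || c == '\n') break;
        if (c == separator) fields++;
        buffer.Consume(1);           // consume advances and frees space
    }
    return fields;
}
```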

The string buffer implementation

Very simple — it is essentially a direct pass-through to string's own members:


```csharp
public class StringReaderBuffer : IFixedReaderBuffer<char>
{
    internal string _buffer;
    internal int _offset;
    internal int _consumedCount;

    public StringReaderBuffer(string content)
    {
        _buffer = content;
    }

    public ReadOnlySpan<char> Readed
    {
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        get => _buffer.AsSpan(_offset);
    }

    public bool IsEOF
    {
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        get => _offset == _buffer.Length;
    }

    public int ConsumedCount
    {
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        get => _consumedCount;
    }

    public int Index
    {
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        get => _offset;
    }

    public void Consume(int count)
    {
        _offset += count;
        _consumedCount += count;
    }

    public void Dispose()
    {
    }

    public bool Peek(int count, out ReadOnlySpan<char> data)
    {
        if (_offset + count > _buffer.Length)
        {
            data = default;
            return false;
        }
        data = _buffer.AsSpan(_offset, count);
        return true;
    }

    public bool Peek(out char data)
    {
        if (_offset >= _buffer.Length)
        {
            data = default;
            return false;
        }
        data = _buffer[_offset];
        return true;
    }

    public bool PeekByOffset(int offset, out char data)
    {
        var o = _offset + offset;
        if (o >= _buffer.Length)
        {
            data = default;
            return false;
        }
        data = _buffer[o];
        return true;
    }

    public bool ReadNextBuffer(int count) => false;
}
```
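A quick usage sketch (mine, for illustration) of how this buffer behaves: peeking never advances the position, consuming does, and Readed is always a zero-copy view of what remains:

```csharp
var buffer = new StringReaderBuffer("a,b");
buffer.Peek(out var first);         // first == 'a'; nothing consumed
buffer.PeekByOffset(1, out var c);  // c == ','; still nothing consumed
buffer.Consume(2);                  // consume "a,"
var rest = buffer.Readed;           // zero-copy span over the remaining "b"
```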

The TextReader buffer implementation

Wrapping a TextReader here is mainly to keep the complexity of character encoding out of scope.

The implementation borrows from System.Text.Json's internal ReadBufferState.

It is not necessarily optimal (better approaches are welcome):


```csharp
public class TextReaderBuffer : IReaderBuffer<char>
{
    internal char[] _buffer;
    internal int _offset;
    internal int _count;
    internal int _maxCount;
    internal int _consumedCount;
    private TextReader _reader;
    private bool _isFinalBlock;
    private bool _isReaded;

    public ReadOnlySpan<char> Readed
    {
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        get
        {
            if (!_isReaded)
            {
                ReadNextBuffer(1);
                _isReaded = true;
            }
            return _buffer.AsSpan(_offset, _count - _offset);
        }
    }

    public bool IsEOF
    {
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        get => _isFinalBlock && _offset == _count;
    }

    public int ConsumedCount
    {
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        get => _consumedCount;
    }

    public int Index
    {
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        get => _offset;
    }

    public TextReaderBuffer(TextReader reader, int initialBufferSize)
    {
        if (initialBufferSize <= 0)
        {
            initialBufferSize = 256;
        }
        _buffer = ArrayPool<char>.Shared.Rent(initialBufferSize);
        _consumedCount = _count = _offset = 0;
        _reader = reader;
    }

    public void Consume(int count)
    {
        _offset += count;
        _consumedCount += count;
    }

    /// Grow or compact the buffer so more data can be read at once,
    /// reducing the array shuffling caused by shifting data around
    public void AdvanceBuffer(int count)
    {
        var remaining = _buffer.Length - _count + _offset;
        if (remaining <= (_buffer.Length / 2) && _buffer.Length != int.MaxValue)
        {
            // We have less than half the buffer available, double the buffer size.
            char[] oldBuffer = _buffer;
            int oldMaxCount = _maxCount;
            var newSize = (_buffer.Length < (int.MaxValue / 2)) ? _buffer.Length * 2 : int.MaxValue;
            while (newSize < count)
            {
                newSize = (newSize < (int.MaxValue / 2)) ? newSize * 2 : int.MaxValue;
            }
            char[] newBuffer = ArrayPool<char>.Shared.Rent(newSize);

            // Copy the unprocessed data to the new buffer while shifting out the
            // processed chars. (Array.Copy counts elements; Buffer.BlockCopy counts
            // bytes, which would copy only half of a char[].)
            Array.Copy(oldBuffer, _offset, newBuffer, 0, _count - _offset);
            _buffer = newBuffer;

            // Clear and return the old buffer
            new Span<char>(oldBuffer, 0, oldMaxCount).Clear();
            ArrayPool<char>.Shared.Return(oldBuffer);
            _maxCount = _count;
            _count -= _offset;
            _offset = 0;
        }
        else if (_offset != 0)
        {
            _count -= _offset;
            // Shift the processed chars to the beginning of the buffer to make more room.
            Array.Copy(_buffer, _offset, _buffer, 0, _count);
            _offset = 0;
        }
    }

    public void Dispose()
    {
        if (_buffer != null)
        {
            new Span<char>(_buffer, 0, _maxCount).Clear();
            char[] toReturn = _buffer;
            ArrayPool<char>.Shared.Return(toReturn);
            _buffer = null!;
        }
    }

    public bool Peek(int count, out ReadOnlySpan<char> data)
    {
        if (!_isReaded)
        {
            ReadNextBuffer(count);
            _isReaded = true;
        }
        if (!_isFinalBlock && count + _offset > _count)
        {
            ReadNextBuffer(count);
        }
        if (_offset + count > _count)
        {
            data = default;
            return false;
        }
        data = _buffer.AsSpan(_offset, count);
        return true;
    }

    public bool Peek(out char data)
    {
        if (!_isReaded)
        {
            ReadNextBuffer(1);
            _isReaded = true;
        }
        if (!_isFinalBlock && 1 + _offset > _count)
        {
            ReadNextBuffer(1);
        }
        if (_offset >= _count)
        {
            data = default;
            return false;
        }
        data = _buffer[_offset];
        return true;
    }

    public bool PeekByOffset(int offset, out char data)
    {
        var needed = offset + 1;
        if (!_isReaded)
        {
            ReadNextBuffer(needed);
            _isReaded = true;
        }
        if (!_isFinalBlock && _offset + needed > _count)
        {
            ReadNextBuffer(needed);
        }
        // Recompute after ReadNextBuffer, which may have compacted the buffer.
        var o = _offset + offset;
        if (o >= _count)
        {
            data = default;
            return false;
        }
        data = _buffer[o];
        return true;
    }

    public bool ReadNextBuffer(int count)
    {
        if (!_isFinalBlock)
        {
            AdvanceBuffer(count);
            do
            {
                int readCount = _reader.Read(_buffer.AsSpan(_count));
                if (readCount == 0)
                {
                    _isFinalBlock = true;
                    break;
                }
                _count += readCount;
            } while (_count < _buffer.Length);

            if (_count > _maxCount)
            {
                _maxCount = _count;
            }
            return true;
        }
        return false;
    }
}
```

The RFC 4180 CSV parsing implementation

PS: not necessarily fully correct — it has never been tested exhaustively, so treat it as reference only, haha.

As you can see, having to abstract over sources of unknown length gives the code a fair amount of complexity:


```csharp
public class CsvReader : TextDataReader<string[]>
{
    public CsvReader(string content, char separater = ',', bool fristIsHeader = false) : base(content)
    {
        Separater = separater;
        HasHeader = fristIsHeader;
    }

    public CsvReader(TextReader reader, int bufferSize = 256, char separater = ',', bool fristIsHeader = false) : base(reader, bufferSize)
    {
        Separater = separater;
        HasHeader = fristIsHeader;
    }

    public char Separater { get; private set; } = ',';

    public bool HasHeader { get; private set; }

    public string[] Header { get; private set; }

    public int FieldCount { get; private set; }

    public override bool MoveNext()
    {
        string[] row;
        if (HasHeader && Header == null)
        {
            if (!ProcessFirstRow(out row))
            {
                throw new ParseException("Missing header");
            }
            Header = row;
        }

        var r = FieldCount == 0 ? ProcessFirstRow(out row) : ProcessRow(out row);
        Current = row;
        return r;
    }

    private bool ProcessFirstRow(out string[]? row)
    {
        var r = new List<string>();
        var hasValue = false;
        while (ProcessField(out var f))
        {
            r.Add(f);
            hasValue = true;
        }
        reader.IngoreCRLF();
        row = r.ToArray();
        FieldCount = row.Length;
        return hasValue;
    }

    private bool TakeString(out string s)
    {
        if (reader.IsEOF)
        {
            throw new ParseException($"Expect some string end with '\"' at {reader.Index} but got eof");
        }

        int pos = 0;
        int len;
        ReadOnlySpan<char> remaining;
        do
        {
            remaining = reader.Readed;
            len = remaining.Length;
            var charBufferSpan = remaining[pos..];
            // Scan for the closing quote of the field
            var i = charBufferSpan.IndexOf('"');
            if (i >= 0)
            {
                if (reader.PeekByOffset(pos + i + 1, out var n) && n == '"')
                {
                    // An escaped quote ("") — skip both chars and keep scanning
                    pos += i + 2;
                    continue;
                }
                s = remaining[..(pos + i)].ToString();
                reader.Consume(pos + i + 1);
                return true;
            }
            else
            {
                pos += charBufferSpan.Length;
            }
        } while (reader.ReadNextBuffer(len));
        s = reader.Readed.ToString();
        return true;
    }

    private bool ProcessField(out string? f)
    {
        if (!reader.Peek(out var c) || reader.IngoreCRLF())
        {
            f = null;
            return false;
        }
        if (c == Separater)
        {
            f = string.Empty;
            reader.Consume(1);
            return true;
        }
        else if (c is '"')
        {
            // Read a quoted field that may contain escaped data
            reader.Consume(1);
            return TakeString(out f);
        }
        else
        {
            // Read a plain field without any escaping
            var i = reader.IndexOfAny(Separater, '\r', '\n');
            if (i == 0)
            {
                f = string.Empty;
            }
            else if (i > 0)
            {
                f = reader.Readed[..i].ToString();
                reader.Consume(i);
            }
            else
            {
                f = reader.Readed.ToString();
                reader.Consume(f.Length);
            }
            if (reader.Peek(out var cc) && cc == Separater)
            {
                reader.Consume(1);
            }
            return true;
        }
    }

    private bool ProcessRow(out string[]? row)
    {
        row = new string[FieldCount];
        for (int i = 0; i < FieldCount; i++)
        {
            if (!ProcessField(out var f))
            {
                reader.IngoreCRLF();
                return false;
            }
            row[i] = f;
        }
        reader.IngoreCRLF();
        return true;
    }
}
```
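Based on the constructors and properties shown above, and the ToArray() call in the benchmark (which implies the reader is enumerable), usage would look roughly like this sketch:

```csharp
// A sketch assuming CsvReader enumerates string[] rows, as the benchmark suggests.
// Note: "fristIsHeader" is the parameter name exactly as spelled in the source.
using var reader = new RuQu.Csv.CsvReader("a,b\r\n1,2\r\n3,4", fristIsHeader: true);
foreach (var row in reader)
{
    // Each row is a string[] with FieldCount entries
    Console.WriteLine(string.Join(" | ", row));
}
// After enumeration starts, reader.Header holds the first row: ["a", "b"]
```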


As for its performance — those are the numbers at the very top.

The goal was met, so the hair lost over this was not wasted.

For the full code, see https://github.com/fs7744/ruqu

