使用.NET簡單實現一個Redis的高性能克隆版(四、五)

譯者注

該原文是Ayende Rahien大佬業餘自己在使用C# 和 .NET構建一個簡單、高性能兼容Redis協議的數據庫的經歷。
首先這個”Redis”是非常簡單的實現,但是他在優化這個簡單”Redis”路程很有趣,也能給我們在從事性能優化工作時帶來一些啟示。
由於接下來的兩篇較短,本文一起把它們一起翻譯
原作者:Ayende Rahien
原鏈接:
//ayende.com/blog/197505-C/high-performance-net-building-a-redis-clone-separation-of-computation-i-o

//ayende.com/blog/197537-A/high-performance-net-building-a-redis-clone-the-wrong-optimization-path

另外Ayende大佬是.NET開源的高性能多範式數據庫RavenDB所在公司的CTO,不排除這些文章是為了以後會在RavenDB上兼容Redis協議做的嘗試。大家也可以多多支持,下方給出了鏈接
RavenDB地址://github.com/ravendb/ravendb

構建Redis克隆版-計算與I/O的分離(四)

在達到125w/s的性能以後,我決定試試把代碼修改成流水線(pipeline)會發生什麼。這個改動很複雜,因為我要追蹤所有的輸入請求,又需要將輸入請求發送到對應的的多個線程進行處理。

在我看來,這些代碼本身就是垃圾。但是只要它能在架構上為我指明正確的方向,那麼就是值得的。您可以再下面閱讀那些代碼,但是它有點複雜,我們儘可能的多讀取客戶端請求,然後將其發送到每個專用線程來運行它。

就性能而言,它比上一個版本的代碼慢(大約20%),但是它有一個好處,那就是能很容易的看出哪裡的花費的資源最多。

看看下面的分析器結果:

您可以看到,我們在 I/O 和字符串處理方面花費了很多時間。GC也花費了很多時間。
我想分階段解決這個問題。第一部分是停止到處使用字符串。之後的下一個階段可能是更改 I/O 模型。
就目前而言,我們的代碼是這樣的:

using System.Collections.Concurrent;
using System.Net.Sockets;
using System.Text;
using System.Threading.Channels;

var listener = new TcpListener(System.Net.IPAddress.Any, 6379);
listener.Start();

ShardedDictionary _state = new(Environment.ProcessorCount / 2);

while (true)
{
    var tcp = listener.AcceptTcpClient();
    var stream = tcp.GetStream();
    var client = new Client(tcp, new StreamReader(stream), new StreamWriter(stream)
    {
        AutoFlush = true
    }, _state);
    var _ = client.ReadAsync();
}

class Client
{
    public readonly TcpClient Tcp;
    public readonly StreamReader Reader;
    public readonly StreamWriter Writer;
    public readonly ShardedDictionary Dic;

    public struct Command
    {
        public string Key;
        public string? Value;
        public bool Completed;
    }


    private List<string> _args = new();
    private Task<string?> _nextLine;
    private Command[] _commands = Array.Empty<Command>();
    private int _commandsLength = 0;
    private StringBuilder _buffer = new();
    private int _shardFactor;

    public Client(TcpClient tcp, StreamReader reader, StreamWriter writer, ShardedDictionary dic)
    {
        Tcp = tcp;
        Reader = reader;
        Writer = writer;
        Dic = dic;
        _shardFactor = dic.Factor;
    }

    public async Task ReadAsync()
    {
        try
        {
            while (true)
            {
                if (_buffer.Length != 0)
                {
                    await Writer.WriteAsync(_buffer);
                    _buffer.Length = 0;
                }
                var lineTask = _nextLine ?? Reader.ReadLineAsync();
                if (lineTask.IsCompleted == false)
                {
                    if (_commandsLength != 0)
                    {
                        _nextLine = lineTask;
                        Dic.Enqueue(this, Math.Abs(_commands[0].Key.GetHashCode()) % _shardFactor);
                        return;
                    }
                }
                var line = await lineTask;
                _nextLine = null;
                if (line == null)
                {
                    using (Tcp) // done reading...
                    {
                        return;
                    }
                }

                await ReadCommand(line);

                AddCommand();
            }
        }
        catch (Exception e)
        {
            await HandleError(e);
        }
    }

    private async Task ReadCommand(string line)
    {
        _args.Clear();
        if (line[0] != '*')
            throw new InvalidDataException("Cannot understand arg batch: " + line);
        var argsv = int.Parse(line.Substring(1));
        for (int i = 0; i < argsv; i++)
        {
            line = await Reader.ReadLineAsync() ?? string.Empty;
            if (line[0] != '$')
                throw new InvalidDataException("Cannot understand arg length: " + line);
            var argLen = int.Parse(line.Substring(1));
            line = await Reader.ReadLineAsync() ?? string.Empty;
            if (line.Length != argLen)
                throw new InvalidDataException("Wrong arg length expected " + argLen + " got: " + line);

            _args.Add(line);
        }
    }

    private void AddCommand()
    {
        if (_commandsLength >= _commands.Length)
        {
            Array.Resize(ref _commands, _commands.Length + 8);
        }
        ref Command cmd = ref _commands[_commandsLength++];
        cmd.Completed = false;
        switch (_args[0])
        {
            case "GET":
                cmd.Key = _args[1];
                cmd.Value = null;
                break;
            case "SET":
                cmd.Key = _args[1];
                cmd.Value = _args[2];
                break;
            default:
                throw new ArgumentOutOfRangeException("Unknown command: " + _args[0]);
        }
    }

    public async Task NextAsync()
    {
        try
        {
            WriteToBuffer();

            await ReadAsync();
        }
        catch (Exception e)
        {
            await HandleError(e);
        }
    }

    private void WriteToBuffer()
    {
        for (int i = 0; i < _commandsLength; i++)
        {
            ref Command cmd = ref _commands[i];
            if (cmd.Value == null)
            {
                _buffer.Append("$-1\r\n");
            }
            else
            {
                _buffer.Append($"${cmd.Value.Length}\r\n{cmd.Value}\r\n");
            }
        }
        _commandsLength = 0;
    }

    public async Task HandleError(Exception e)
    {
        using (Tcp)
        {
            try
            {
                string? line;
                var errReader = new StringReader(e.ToString());
                while ((line = errReader.ReadLine()) != null)
                {
                    await Writer.WriteAsync("-");
                    await Writer.WriteLineAsync(line);
                }
                await Writer.FlushAsync();
            }
            catch (Exception)
            {
                // nothing we can do
            }
        }
    }

    internal void Execute(Dictionary<string, string> localDic, int index)
    {
        int? next = null;
        for (int i = 0; i < _commandsLength; i++)
        {
            ref var cmd = ref _commands[i];
            var cur = Math.Abs(cmd.Key.GetHashCode()) % _shardFactor;
            if (cur == index) // match
            {
                cmd.Completed = true;
                if (cmd.Value != null)
                {
                    localDic[cmd.Key] = cmd.Value;
                }
                else
                {
                    localDic.TryGetValue(cmd.Key, out cmd.Value);
                }
            }
            else if (cmd.Completed == false)
            {
                next = cur;
            }
        }
        if (next != null)
        {
            Dic.Enqueue(this, next.Value);
        }
        else
        {
            _ = NextAsync();
        }
    }
}

class ShardedDictionary
{
    Dictionary<string, string>[] _dics;
    BlockingCollection<Client>[] _workers;

    public int Factor => _dics.Length;

    public ShardedDictionary(int shardingFactor)
    {
        _dics = new Dictionary<string, string>[shardingFactor];
        _workers = new BlockingCollection<Client>[shardingFactor];

        for (int i = 0; i < shardingFactor; i++)
        {
            var dic = new Dictionary<string, string>();
            var worker = new BlockingCollection<Client>();
            _dics[i] = dic;
            _workers[i] = worker;
            var index = i;
            // readers
            new Thread(() =>
            {
                ExecWorker(dic, index, worker);
            })
            {
                IsBackground = true,
            }.Start();
        }
    }

    private static void ExecWorker(Dictionary<string, string> dic, int index, BlockingCollection<Client> worker)
    {
        while (true)
        {
            worker.Take().Execute(dic, index);
        }
    }

    public void Enqueue(Client c, int index)
    {
        _workers[index].Add(c);
    }

}

構建Redis克隆版-踩了一個坑(五)

現在,我已經完成了這些簡單的工作,我決定將Redis實現改為使用System.IO.Pipelines。這是一個高性能的I/O API,專門針對那些需要高系統性能的服務器設計。

API有一點不同,但是它的使用方式非常合乎邏輯,並且有意義。下面是用於處理來自客戶端命令的主循環:


public async Task HandleConnection()
{
    while (true)
    {
        var result = await _netReader.ReadAsync();
        var (consumed, examined) = ParseNetworkData(result);
        _netReader.AdvanceTo(consumed, examined);
        await _netWriter.FlushAsync();
    }
}

我們的想法是,我們從網絡獲得一個緩衝區,我們讀取一切(包括流水線命令) ,然後刷新到客戶端。當我們開始處理實際的命令時,更有趣的事情發生了,因為現在我們使用的不是 StreamReader而是PipeReader。所以我們處理的是位元組級別,而不是字符串級別。
下面是大致的代碼,我沒有展示整個代碼,因為我想集中在我遇到的問題:

(SequencePosition Consumed, SequencePosition Examined) ParseNetworkData(ReadResult result)
{
    var reader = new SequenceReader<byte>(result.Buffer);
    while (true)
    {
        _cmds.Clear();

        if (reader.TryReadTo(out ReadOnlySpan<byte> line, (byte)'\n') == false)
            return (reader.Consumed, reader.Position);

        if (line.Length == 0 || line[0] != '*' || line[line.Length - 1] != '\r')
            ThrowBadBuffer(result.Buffer);
        if (Utf8Parser.TryParse(line.Slice(1), out int argc, out int bytesConsumed) == false ||
            bytesConsumed + 2 != line.Length) // account for the * and \r
            ThrowBadBuffer(result.Buffer);

        for (int i = 0; i < argc; i++)
        {
            // **** redacted - reading cmd to _cmds buffer
        }

        ExecCommand(_cmds);
    }
}

代碼從緩衝區讀取並解析Redis協議,然後執行命令。它在同一個緩衝區(流水線)中支持多個命令,而且性能非常糟糕。

是的,相對於使用字符串的簡單性而言,對於位元組處理想使用正確API要難得多,而且它的速度比字符串還要慢得多。在我的開發機器上,我說的慢得多是指以下幾點:

  • 以前的版本大約每秒鐘126,017.72次操作。
  • 此版本低於每秒100次操作。

是的,你沒有看錯,每秒少於100次操作,而未優化版本的操作則超過10萬次。
你可以想像,那真是… 令人驚訝。
我實際上寫了兩次實現,使用不同的方法,試圖找出我做錯了什麼。使用PipeReader肯定沒那麼糟。
我查看了分析器的輸出,試圖弄清楚發生了什麼:

它非常清楚地表明,這個實現非常糟糕,不是嗎?到底怎麼回事?

底層的問題實際上相當簡單,並且與Pipelines API如何實現這麼高的性能有關。替代掉那些高頻的System call,您需要獲得一個緩衝區並處理。處理完緩衝區之後,您可以很方便的看到處理了多少數據,然後可以處理另一個調用。

然而,實際使用的數據和我們期望的數據是有區別的,如下所示:

# 請求redis 設置15位元組的Key - memtier-2818567
# 數據為256位元組 - xxxxxxxxxx ... xxxxxx
*3
$3
SET
$15
memtier-2818567
$256
xxxxxxxxxx ... xxxxxx

# 請求redis 獲取Key - memtier-2818567 對應的數據
*2
$3
GET
$15
memtier-7689405

# 請求redis 獲取Key - memtier-2818567 對應的數據
*2
$3
GET
$15  
# !!! 這裡發現有問題,Key應是memtier-2818567 但是只讀取出了memt
memt

您在這裡看到的是一個流水線命令,緩衝區中有335個位元組。我們將在一次讀取中中處理所有這些命令,除了… 看着最後四行。這是什麼?

我們得到了客戶端發送來的部分命令。換句話說,我們需要執行一個Key大小為15位元組的GET操作,但是這裡只接收到了前4個位元組。這是意料之中的事,我們消耗了緩存區所有空間,直到最後四行(從而讓 PipeReader 知道我們已經完成了它們)。

問題是,當我們現在在客戶端發出一個請求時,我們在服務端得到最後四行的部分(我們沒有使用它) ,但是我們還沒有準備好處理它。所以數據丟失了,PipeReader知道它需要從網絡上讀取更多的數據。

但是… 我的代碼有一個小bug。它將報告說它檢查了下面黃色的部分,而沒有檢查綠色的部分。

換句話說,我們告訴PipeReader,我們已經消費了緩衝區的一部分,又檢查了緩衝區的一部分,但緩衝區上還有一些位元組既沒有消費也沒有檢查。這意味着,當我們發出讀取調用,期望從網絡上獲得數據時,我們實際上會再次獲得相同的緩衝區,進行完全相同的處理。

最終,我們在緩衝區中會有更多來自另客戶端的數據,雖然解決方案的正確性不會受到影響,但這會非常的影響性能。

修復非常簡單,我們需要告訴PipeReader我們檢查了整個緩衝區,這樣它就不會忙碌地等待和等待來自網絡的更多數據。以下是錯誤修復方法:

<             return (reader.Consumed, reader.Position);
修改為:
>             return (reader.Consumed, result.Buffer.End);

有了這一改動,我們可以達到每秒187,104.21次操作!這比以前提高了50%,這真是太棒了。我還沒有對事情進行適當的分析,因為我還想解決另一個問題,我們如何處理來自網絡的數據。在我的下一篇文章中會有更多關於這個問題的內容。

關於上文中提到的BUG – 譯者注

這一個微小的BUG大家可能比較難理解,因為很多人都沒有接觸過PipeReader這麼底層的API。我們來看看上文中while循環的代碼:

public async Task HandleConnection()
{
    while (true)
    {
        var result = await _netReader.ReadAsync();
        var (consumed, examined) = ParseNetworkData(result);
        // 主要是AdvanceTo方法,這個方法有兩個參數
        // consumed: 目前處理了多少數據,比如redis協議是按行處理,也就是\n
        // examined:檢查了多少數據,檢查的數據和處理的數據不一定一樣,因為
        // 可能由於網絡延時,還沒有接收一個完整的數據包
        _netReader.AdvanceTo(consumed, examined);
        await _netWriter.FlushAsync();
    }
}

另外就是修改點:

<             return (reader.Consumed, reader.Position);
修改為:
>             return (reader.Consumed, result.Buffer.End);

修改前的代碼是檢查的數據是返回當前的Position,但是當前的Position是小於我們實際上檢查的長度
按照 if (reader.TryReadTo(out ReadOnlySpan<byte> line, (byte)'\n') == false)
代碼所示,我們其實檢查了流中的所有位置,只是從頭讀到尾巴沒有讀取到\n,如上面的例子就是讀取到了最後一行,只讀取了ment,因為網絡請求原因,完整的memtier-7689405\n還沒有接收到。

此時我們返回Position是上圖中黃色的部分,但是實際上我們是檢查到了綠色的memt部分,返回到上層以後,執行_netReader.AdvanceTo(consumed, examined);

Pipeline發現還有剩餘的綠色memt沒有被檢查,就會繼續走var (consumed, examined) = ParseNetworkData(result);,又重新讀取了memt,由於沒有\n又返回了黃色部分的Position,所以這裡就形成了忙等,再沒有新的數據到來之前,這裡將一直循環;雖然沒有BUG,但是非常影響性能。

而修改以後檢查位置返回result.Buffer.End,就包括了綠色的memt部分,這樣的話var result = await _netReader.ReadAsync();只有當有新的數據到來時才會繼續走下面的代碼,這樣的話充分的利用了Pipelines的優勢,性能會更加好。

公眾號

之前一直有朋友讓開通公眾號,由於一直比較忙沒有弄。
現在終於抽空弄好了,譯者公眾號如下,歡迎大家關注。

系列鏈接

使用.NET簡單實現一個Redis的高性能克隆版(一)
使用.NET簡單實現一個Redis的高性能克隆版(二)
使用.NET簡單實現一個Redis的高性能克隆版(三)