使用.NET簡單實現一個Redis的高性能克隆版(四、五)
譯者注
該原文是Ayende Rahien大佬業餘自己在使用C# 和 .NET構建一個簡單、高性能兼容Redis協議的數據庫的經歷。
首先這個”Redis”是非常簡單的實現,但是他在優化這個簡單”Redis”路程很有趣,也能給我們在從事性能優化工作時帶來一些啟示。
由於接下來的兩篇較短,本文一起把它們一起翻譯
原作者:Ayende Rahien
原鏈接:
//ayende.com/blog/197505-C/high-performance-net-building-a-redis-clone-separation-of-computation-i-o
//ayende.com/blog/197537-A/high-performance-net-building-a-redis-clone-the-wrong-optimization-path
另外Ayende大佬是.NET開源的高性能多範式數據庫RavenDB所在公司的CTO,不排除這些文章是為了以後會在RavenDB上兼容Redis協議做的嘗試。大家也可以多多支持,下方給出了鏈接
RavenDB地址://github.com/ravendb/ravendb
構建Redis克隆版-計算與I/O的分離(四)
在達到125w/s的性能以後,我決定試試把代碼修改成流水線(pipeline)會發生什麼。這個改動很複雜,因為我要追蹤所有的輸入請求,又需要將輸入請求發送到對應的的多個線程進行處理。
在我看來,這些代碼本身就是垃圾。但是只要它能在架構上為我指明正確的方向,那麼就是值得的。您可以再下面閱讀那些代碼,但是它有點複雜,我們儘可能的多讀取客戶端請求,然後將其發送到每個專用線程來運行它。
就性能而言,它比上一個版本的代碼慢(大約20%),但是它有一個好處,那就是能很容易的看出哪裡的花費的資源最多。
看看下面的分析器結果:
您可以看到,我們在 I/O 和字符串處理方面花費了很多時間。GC也花費了很多時間。
我想分階段解決這個問題。第一部分是停止到處使用字符串。之後的下一個階段可能是更改 I/O 模型。
就目前而言,我們的代碼是這樣的:
using System.Collections.Concurrent;
using System.Net.Sockets;
using System.Text;
using System.Threading.Channels;
var listener = new TcpListener(System.Net.IPAddress.Any, 6379);
listener.Start();
ShardedDictionary _state = new(Environment.ProcessorCount / 2);
while (true)
{
var tcp = listener.AcceptTcpClient();
var stream = tcp.GetStream();
var client = new Client(tcp, new StreamReader(stream), new StreamWriter(stream)
{
AutoFlush = true
}, _state);
var _ = client.ReadAsync();
}
class Client
{
public readonly TcpClient Tcp;
public readonly StreamReader Reader;
public readonly StreamWriter Writer;
public readonly ShardedDictionary Dic;
public struct Command
{
public string Key;
public string? Value;
public bool Completed;
}
private List<string> _args = new();
private Task<string?> _nextLine;
private Command[] _commands = Array.Empty<Command>();
private int _commandsLength = 0;
private StringBuilder _buffer = new();
private int _shardFactor;
public Client(TcpClient tcp, StreamReader reader, StreamWriter writer, ShardedDictionary dic)
{
Tcp = tcp;
Reader = reader;
Writer = writer;
Dic = dic;
_shardFactor = dic.Factor;
}
public async Task ReadAsync()
{
try
{
while (true)
{
if (_buffer.Length != 0)
{
await Writer.WriteAsync(_buffer);
_buffer.Length = 0;
}
var lineTask = _nextLine ?? Reader.ReadLineAsync();
if (lineTask.IsCompleted == false)
{
if (_commandsLength != 0)
{
_nextLine = lineTask;
Dic.Enqueue(this, Math.Abs(_commands[0].Key.GetHashCode()) % _shardFactor);
return;
}
}
var line = await lineTask;
_nextLine = null;
if (line == null)
{
using (Tcp) // done reading...
{
return;
}
}
await ReadCommand(line);
AddCommand();
}
}
catch (Exception e)
{
await HandleError(e);
}
}
private async Task ReadCommand(string line)
{
_args.Clear();
if (line[0] != '*')
throw new InvalidDataException("Cannot understand arg batch: " + line);
var argsv = int.Parse(line.Substring(1));
for (int i = 0; i < argsv; i++)
{
line = await Reader.ReadLineAsync() ?? string.Empty;
if (line[0] != '$')
throw new InvalidDataException("Cannot understand arg length: " + line);
var argLen = int.Parse(line.Substring(1));
line = await Reader.ReadLineAsync() ?? string.Empty;
if (line.Length != argLen)
throw new InvalidDataException("Wrong arg length expected " + argLen + " got: " + line);
_args.Add(line);
}
}
private void AddCommand()
{
if (_commandsLength >= _commands.Length)
{
Array.Resize(ref _commands, _commands.Length + 8);
}
ref Command cmd = ref _commands[_commandsLength++];
cmd.Completed = false;
switch (_args[0])
{
case "GET":
cmd.Key = _args[1];
cmd.Value = null;
break;
case "SET":
cmd.Key = _args[1];
cmd.Value = _args[2];
break;
default:
throw new ArgumentOutOfRangeException("Unknown command: " + _args[0]);
}
}
public async Task NextAsync()
{
try
{
WriteToBuffer();
await ReadAsync();
}
catch (Exception e)
{
await HandleError(e);
}
}
private void WriteToBuffer()
{
for (int i = 0; i < _commandsLength; i++)
{
ref Command cmd = ref _commands[i];
if (cmd.Value == null)
{
_buffer.Append("$-1\r\n");
}
else
{
_buffer.Append($"${cmd.Value.Length}\r\n{cmd.Value}\r\n");
}
}
_commandsLength = 0;
}
public async Task HandleError(Exception e)
{
using (Tcp)
{
try
{
string? line;
var errReader = new StringReader(e.ToString());
while ((line = errReader.ReadLine()) != null)
{
await Writer.WriteAsync("-");
await Writer.WriteLineAsync(line);
}
await Writer.FlushAsync();
}
catch (Exception)
{
// nothing we can do
}
}
}
internal void Execute(Dictionary<string, string> localDic, int index)
{
int? next = null;
for (int i = 0; i < _commandsLength; i++)
{
ref var cmd = ref _commands[i];
var cur = Math.Abs(cmd.Key.GetHashCode()) % _shardFactor;
if (cur == index) // match
{
cmd.Completed = true;
if (cmd.Value != null)
{
localDic[cmd.Key] = cmd.Value;
}
else
{
localDic.TryGetValue(cmd.Key, out cmd.Value);
}
}
else if (cmd.Completed == false)
{
next = cur;
}
}
if (next != null)
{
Dic.Enqueue(this, next.Value);
}
else
{
_ = NextAsync();
}
}
}
class ShardedDictionary
{
Dictionary<string, string>[] _dics;
BlockingCollection<Client>[] _workers;
public int Factor => _dics.Length;
public ShardedDictionary(int shardingFactor)
{
_dics = new Dictionary<string, string>[shardingFactor];
_workers = new BlockingCollection<Client>[shardingFactor];
for (int i = 0; i < shardingFactor; i++)
{
var dic = new Dictionary<string, string>();
var worker = new BlockingCollection<Client>();
_dics[i] = dic;
_workers[i] = worker;
var index = i;
// readers
new Thread(() =>
{
ExecWorker(dic, index, worker);
})
{
IsBackground = true,
}.Start();
}
}
private static void ExecWorker(Dictionary<string, string> dic, int index, BlockingCollection<Client> worker)
{
while (true)
{
worker.Take().Execute(dic, index);
}
}
public void Enqueue(Client c, int index)
{
_workers[index].Add(c);
}
}
構建Redis克隆版-踩了一個坑(五)
現在,我已經完成了這些簡單的工作,我決定將Redis實現改為使用System.IO.Pipelines
。這是一個高性能的I/O API,專門針對那些需要高系統性能的服務器設計。
API有一點不同,但是它的使用方式非常合乎邏輯,並且有意義。下面是用於處理來自客戶端命令的主循環:
public async Task HandleConnection()
{
while (true)
{
var result = await _netReader.ReadAsync();
var (consumed, examined) = ParseNetworkData(result);
_netReader.AdvanceTo(consumed, examined);
await _netWriter.FlushAsync();
}
}
我們的想法是,我們從網絡獲得一個緩衝區,我們讀取一切(包括流水線命令) ,然後刷新到客戶端。當我們開始處理實際的命令時,更有趣的事情發生了,因為現在我們使用的不是 StreamReader
而是PipeReader
。所以我們處理的是位元組級別,而不是字符串級別。
下面是大致的代碼,我沒有展示整個代碼,因為我想集中在我遇到的問題:
(SequencePosition Consumed, SequencePosition Examined) ParseNetworkData(ReadResult result)
{
var reader = new SequenceReader<byte>(result.Buffer);
while (true)
{
_cmds.Clear();
if (reader.TryReadTo(out ReadOnlySpan<byte> line, (byte)'\n') == false)
return (reader.Consumed, reader.Position);
if (line.Length == 0 || line[0] != '*' || line[line.Length - 1] != '\r')
ThrowBadBuffer(result.Buffer);
if (Utf8Parser.TryParse(line.Slice(1), out int argc, out int bytesConsumed) == false ||
bytesConsumed + 2 != line.Length) // account for the * and \r
ThrowBadBuffer(result.Buffer);
for (int i = 0; i < argc; i++)
{
// **** redacted - reading cmd to _cmds buffer
}
ExecCommand(_cmds);
}
}
代碼從緩衝區讀取並解析Redis協議,然後執行命令。它在同一個緩衝區(流水線)中支持多個命令,而且性能非常糟糕。
是的,相對於使用字符串的簡單性而言,對於位元組處理想使用正確API要難得多,而且它的速度比字符串還要慢得多。在我的開發機器上,我說的慢得多是指以下幾點:
- 以前的版本大約每秒鐘126,017.72次操作。
- 此版本低於每秒100次操作。
是的,你沒有看錯,每秒少於100次操作,而未優化版本的操作則超過10萬次。
你可以想像,那真是… 令人驚訝。
我實際上寫了兩次實現,使用不同的方法,試圖找出我做錯了什麼。使用PipeReader
肯定沒那麼糟。
我查看了分析器的輸出,試圖弄清楚發生了什麼:
它非常清楚地表明,這個實現非常糟糕,不是嗎?到底怎麼回事?
底層的問題實際上相當簡單,並且與Pipelines API如何實現這麼高的性能有關。替代掉那些高頻的System call,您需要獲得一個緩衝區並處理。處理完緩衝區之後,您可以很方便的看到處理了多少數據,然後可以處理另一個調用。
然而,實際使用的數據和我們期望的數據是有區別的,如下所示:
# 請求redis 設置15位元組的Key - memtier-2818567
# 數據為256位元組 - xxxxxxxxxx ... xxxxxx
*3
$3
SET
$15
memtier-2818567
$256
xxxxxxxxxx ... xxxxxx
# 請求redis 獲取Key - memtier-2818567 對應的數據
*2
$3
GET
$15
memtier-7689405
# 請求redis 獲取Key - memtier-2818567 對應的數據
*2
$3
GET
$15
# !!! 這裡發現有問題,Key應是memtier-2818567 但是只讀取出了memt
memt
您在這裡看到的是一個流水線命令,緩衝區中有335個位元組。我們將在一次讀取中中處理所有這些命令,除了… 看着最後四行。這是什麼?
我們得到了客戶端發送來的部分命令。換句話說,我們需要執行一個Key大小為15位元組的GET操作,但是這裡只接收到了前4個位元組。這是意料之中的事,我們消耗了緩存區所有空間,直到最後四行(從而讓 PipeReader 知道我們已經完成了它們)。
問題是,當我們現在在客戶端發出一個請求時,我們在服務端得到最後四行的部分(我們沒有使用它) ,但是我們還沒有準備好處理它。所以數據丟失了,PipeReader知道它需要從網絡上讀取更多的數據。
但是… 我的代碼有一個小bug。它將報告說它檢查了下面黃色的部分,而沒有檢查綠色的部分。
換句話說,我們告訴PipeReader,我們已經消費了緩衝區的一部分,又檢查了緩衝區的一部分,但緩衝區上還有一些位元組既沒有消費也沒有檢查。這意味着,當我們發出讀取調用,期望從網絡上獲得數據時,我們實際上會再次獲得相同的緩衝區,進行完全相同的處理。
最終,我們在緩衝區中會有更多來自另客戶端的數據,雖然解決方案的正確性不會受到影響,但這會非常的影響性能。
修復非常簡單,我們需要告訴PipeReader我們檢查了整個緩衝區,這樣它就不會忙碌地等待和等待來自網絡的更多數據。以下是錯誤修復方法:
< return (reader.Consumed, reader.Position);
修改為:
> return (reader.Consumed, result.Buffer.End);
有了這一改動,我們可以達到每秒187,104.21次操作!這比以前提高了50%,這真是太棒了。我還沒有對事情進行適當的分析,因為我還想解決另一個問題,我們如何處理來自網絡的數據。在我的下一篇文章中會有更多關於這個問題的內容。
關於上文中提到的BUG – 譯者注
這一個微小的BUG大家可能比較難理解,因為很多人都沒有接觸過PipeReader這麼底層的API。我們來看看上文中while循環的代碼:
public async Task HandleConnection()
{
while (true)
{
var result = await _netReader.ReadAsync();
var (consumed, examined) = ParseNetworkData(result);
// 主要是AdvanceTo方法,這個方法有兩個參數
// consumed: 目前處理了多少數據,比如redis協議是按行處理,也就是\n
// examined:檢查了多少數據,檢查的數據和處理的數據不一定一樣,因為
// 可能由於網絡延時,還沒有接收一個完整的數據包
_netReader.AdvanceTo(consumed, examined);
await _netWriter.FlushAsync();
}
}
另外就是修改點:
< return (reader.Consumed, reader.Position);
修改為:
> return (reader.Consumed, result.Buffer.End);
修改前的代碼是檢查的數據是返回當前的Position
,但是當前的Position是小於我們實際上檢查的長度
按照 if (reader.TryReadTo(out ReadOnlySpan<byte> line, (byte)'\n') == false)
代碼所示,我們其實檢查了流中的所有位置,只是從頭讀到尾巴沒有讀取到\n
,如上面的例子就是讀取到了最後一行,只讀取了ment
,因為網絡請求原因,完整的memtier-7689405\n
還沒有接收到。
此時我們返回Position
是上圖中黃色的部分,但是實際上我們是檢查到了綠色的memt
部分,返回到上層以後,執行_netReader.AdvanceTo(consumed, examined);
。
Pipeline發現還有剩餘的綠色memt
沒有被檢查,就會繼續走var (consumed, examined) = ParseNetworkData(result);
,又重新讀取了memt
,由於沒有\n
又返回了黃色部分的Position
,所以這裡就形成了忙等,再沒有新的數據到來之前,這裡將一直循環;雖然沒有BUG,但是非常影響性能。
而修改以後檢查位置返回result.Buffer.End
,就包括了綠色的memt
部分,這樣的話var result = await _netReader.ReadAsync();
只有當有新的數據到來時才會繼續走下面的代碼,這樣的話充分的利用了Pipelines的優勢,性能會更加好。
公眾號
之前一直有朋友讓開通公眾號,由於一直比較忙沒有弄。
現在終於抽空弄好了,譯者公眾號如下,歡迎大家關注。
系列鏈接
使用.NET簡單實現一個Redis的高性能克隆版(一)
使用.NET簡單實現一個Redis的高性能克隆版(二)
使用.NET簡單實現一個Redis的高性能克隆版(三)