從零開始編寫一個BitTorrent下載器
從零開始編寫一個BitTorrent下載器
BT協議
簡介
BT協議Bit Torrent(BT)是一種通訊協議,又是一種應用程式,廣泛用於對等網路通訊(P2P)。曾經風靡一時,由於它引起了巨大的流量,對網際網路的運營、維護和管理都產生了重要的影響。
BT協議的典型特徵就是沒有中心伺服器。BT協議中,作為參與者的機器被稱為peers。peer之間的通訊協議又被稱為peer wire protocal,即peer連線協議,是一個基於TCP協議的應用層協議。
BT協議在20年里不斷發展(從2001年開始),加入加密、私有種子等設計,也擴展了搜尋peer主機的方法。
連接
由於沒有中心伺服器,參與者需要使用另外的方法取得他人的地址,以建立對等連接,確定自己的機器應當從何處下載需要的文件。傳統的BT協議使用中介伺服器trackers來告知每個參與者如何進行下載。trackers伺服器是基於HTTP的,這類伺服器本身不託管文件資源,僅為每個參與者分配peers。
在BT協議網路中傳播違法資源的現象十分常見,這導致其中介伺服器常常會受到法律制裁,查封事件屢見不鮮。要解決這一問題,就需要將主機搜尋的工作下放到每個參與者的機器,即分散式處理(distributed process)。BT協議未來的核心就是DHT、PEX、磁力鏈。
.torrent文件解析
以debian發布的鏡像文件種子為例。
一個.torrent文件描述了可下載文件的內容以及需要連接到的tracker中介伺服器的資訊,其編碼格式為Bencode。
文件的頭部資訊可以直接以文本形式查看:
d8:announce41://bttracker.debian.org:6969/announce7:comment35:"Debian CD from cdimage.debian.org"13:creation datei1612616380e9:httpseedsl146://cdimage.debian.org/cdimage/release/edu//srv/cdbuilder.debian.org/dst/deb-cd/weekly-builds/amd64/iso-cd/debian-edu-10.8.0-amd64-netinst.iso146://cdimage.debian.org/cdimage/archive/edu//srv/cdbuilder.debian.org/dst/deb-cd/weekly-builds/amd64/iso-cd/debian-edu-10.8.0-amd64-netinst.isoe4:infod6:lengthi425721856e4:name35:debian-edu-10.8.0-amd64-netinst.iso12:piece lengthi262144e6:pieces32480:[每個部分的hash,以二進位表示]
之後的內容為二進位,無法直接查看。
美化一下這個部分的資訊,可以發現清晰的結構特徵:
d
8:announce
41://bttracker.debian.org:6969/announce
7:comment
35:"Debian CD from cdimage.debian.org"
13:creation date
i1612616380e
9:httpseeds
l
146://cdimage.debian.org/cdimage/release/edu//srv/cdbuilder.debian.org/dst/deb-cd/weekly-builds/amd64/iso-cd/debian-edu-10.8.0-amd64-netinst.iso
146://cdimage.debian.org/cdimage/archive/edu//srv/cdbuilder.debian.org/dst/deb-cd/weekly-builds/amd64/iso-cd/debian-edu-10.8.0-amd64-netinst.iso
e
4:info
d
6:length
i425721856e
4:name
35:debian-edu-10.8.0-amd64-netinst.iso
12:piece length
i262144e
6:pieces
32480:[每個部分的hash,以二進位表示]
e
e
其中包含了tracker伺服器的URL、創建事件(Unix時間戳)、文件名和文件大小、以及一系列表示每個文件塊的SHA-1哈希值的二進位片段(文件塊是指文件被等量拆分後形成的幾個部分)。每個種子中文件被拆分的大小依據是不同的,但基本處在一個區間內(256KB到1MB)。因為這樣的設計,大型文件將會被拆分成眾多碎片。在實際下載中,下載執行者會從能夠連接的那些peers主機下載文件塊,並且根據種子文件校驗其哈希值,最後拼接成完整的文件。
這種機制能夠確保每個文件塊的完整性,抵禦設備故障或惡意投毒(torrent poisoning)造成的損害。如果攻擊者不能破解SHA-1進行原像攻擊(preimage attack),那麼下載取得的文件就是安全可靠的。
Bencode編碼
從已知的資訊可以看出,.torrent文件中的元數據均以「鍵:值」形式存儲,故可以將整個內容理解為一個經過特殊編碼的字典,或者一個近似的JSON。
Bencode中,數字採用十進位編碼,相比純二進位編碼顯得效率較低,但保證了良好的跨平台性(無大小端存儲問題)。
Bencode支援四種類型的數據:string、int、Dictionary<string, object>、List<object>。
-
string類型
string類型的編碼格式為[length]:[string],以字元串長度開頭,以字元串內容結束。示例:
"abc" => 3:abc
-
int類型
int類型的編碼格式為i[int]e,以i開頭,以e結尾。示例:
123 => i123e
-
Dictionary<string, object>類型
Dictionary<string, object>類型的編碼格式為d[Key-Value Pair]e,以d開頭,以e結尾。示例:
Dictionary<{"name":"create chen"},{"age":23}> => d4:name11:create chen3:agei23ee
-
List<object>類型
List<object>類型的編碼格式為l[object]e,以l開頭,以e結尾。示例:
List<"abc", 123> => l3:abci123ee
Bencode實現
編碼
public static string Encode(object obj)
{
var sb = new StringBuilder();
if(obj is Dictionary<string,object>)
{
var parseObj = obj as Dictionary<string, object>;
sb.Append("d");
foreach (var o in parseObj)
{
sb.AppendFormat("{0}:{1}{2}", o.Key.Length,o.Key, Encode(o.Value));
}
sb.Append("e");
}
if ((obj as int?) != null)
{
var parseObj = (int) obj;
sb.AppendFormat("i{0}e", parseObj);
}
if (obj is List<object>)
{
var parseObj = obj as List<object>;
sb.Append("l");
foreach (var o in parseObj)
{
sb.Append(Encode(o));
}
sb.Append("e");
}
if (obj is string)
{
var parseObj = obj as string;
sb.AppendFormat("{0}:{1}", parseObj.Length, parseObj);
}
return sb.ToString();
}
解碼
public static object Decode(string s)
{
return DecodeObject(s, ref _index, EncodeState.Value);
}
private enum EncodeState
{
Key,
Value
}
private static int _index;
private static object DecodeObject(string str,ref int index, EncodeState state)
{
var obj = new Dictionary<string, object>();
var c = str[index];
while (c != 'e')
{
if (c == 'd')
{
index++;
return DecodeObject(str, ref index,EncodeState.Key);
}
if (c == 'i')
{
var value = "";
index++; c = str[index];
while (c != 'e')
{
value += c.ToString(CultureInfo.InvariantCulture);
index++;
c = str[index];
}
return Convert.ToInt32(value);
}
if (c == 'l')
{
index++;
var value = new List<object>();
while (str[index]!='e')
{
value.Add(DecodeObject(str, ref index, EncodeState.Value));
index++;
}
return value;
}
if ('0' < c && c <= '9')
{
string strLength = "";
while (c != ':')
{
strLength += c.ToString(CultureInfo.InvariantCulture);
c = str[++index];
}
var length = Convert.ToInt32(strLength);
var strContent = "";
for (int i = 0; i < length; i++)
{
strContent += str[index + 1].ToString(CultureInfo.InvariantCulture);
index++;
}
if (state == EncodeState.Value)
{
return strContent;
}
index++;
obj.Add(strContent, DecodeObject(str, ref index, EncodeState.Value));
state = EncodeState.Key;
index++;
}
c = str[index];
}
return obj;
}
編寫項目
這裡使用Go來編寫,也是首次使用Go完成網路工具。僅包含主要程式碼,完整項目見Github。
尋找
解析種子(~/torrentfile/torrentfile.go)
import (
"github.com/jackpal/bencode-go"
)
這裡省略了自帶庫文件的導入。
type bencodeInfo struct {
Pieces string `bencode:"pieces"`
PieceLength int `bencode:"piece length"`
Length int `bencode:"length"`
Name string `bencode:"name"`
}
type bencodeTorrent struct {
Announce string `bencode:"announce"`
Info bencodeInfo `bencode:"info"`
}
// Open函數用於解析種子
func Open(path string) (TorrentFile, error) {
file, err := os.Open(path)
if err != nil {
return TorrentFile{}, err
}
defer file.Close()
bto := bencodeTorrent{}
err = bencode.Unmarshal(file, &bto)
if err != nil {
return TorrentFile{}, err
}
return bto.toTorrentFile()
}
處理時,將pieces對應的值(原先為哈希值的字元串)變成哈希值切片(每個長度為20 bytes),以便後續調用每個獨立的哈希值。另外,計算info對應的整個字典(含有名稱、大小、文件塊哈希值)的SHA-1哈希值,存儲在infohash,在與trackers伺服器和peers主機交互時表示所需的文件。
type TorrentFile struct {
Announce string
InfoHash [20]byte
PieceHashes [][20]byte
PieceLength int
Length int
Name string
}
func (bto bencodeTorrent) toTorrentFile() (TorrentFile, error) {
// ...
}
從trackers伺服器獲取peers主機地址(~/torrentfile/tracker.go)
處理完種子後,就可以向trackers伺服器發起請求:作為一台peer主機,需要獲取同一網路中的其它peers主機的列表。只需要對announce對應URL發起GET請求(需要設置幾個請求參數)。
// buildTrackerURL函數用於構成請求peers列表的序列
func (t * TorrentFile) buildTrackerURL(peerID [20]byte, port uint16) (string, error) {
base, err:= url.Parse(t.Announce)
if err != nil {
return "", err
}
params := url.Values{
"info_hash": []string{string(t.InfoHash[:])},
"peer_id": []string{string(peerID[:])},
"port": []string{strconv.Itoa(int(port))},
"uploaded": []string{"0"},
"downloaded": []string{"0"},
"compact": []string{"1"},
"left": []string{strconv.Itoa(t.Length)},
}
base.RawQuery = params.Encode()
return base.String(), nil
}
其中重要的參數有:
- info_hash:用以標識需要下載的文件,其值就是之前由info對應值計算出的infohash。trackers伺服器基於這個值返回能夠為下載提供資源的peers主機。
- peer_id:20位元組長的數據,用於向peers主機和trackers伺服器標識自己的身份。具體實現僅僅是產生隨機的20個位元組。真實的BitTorrent客戶端ID形如
-TR2940-k8hj0wgej6ch
,標出了客戶端軟體及其版本(TR2940
表示Transmission Client 2.94)。
處理trackers伺服器的響應(~/peers/peers.go)
伺服器響應也是採用Bencode編碼的:
d
8:interval
i900e
5:peers
252:[很長的二進位塊]
e
interval
表示本地應當在多長的時間間隔後再次向tracker伺服器請求以刷新peers主機列表,900的單位是秒。peers
包含了每個peer主機的IP地址,以二進位表示,由若干個6位元組元組成,前4個位元組表示主機IP,後2個位元組表示埠號(大端存儲的16位無符號整型,uint16)。大端存儲,即big-endian,是網路中所採用的存儲方式(相對於小端存儲),故被稱為network order。運算時可以直接將一組位元組從左至右拼接以形成所要表達的整數,如0x1A
和0xE1
能拼接成0x1AE1
,即十進位的6881。
type Peer struct {
IP net.IP
Port uint16
}
// Unmarshal函數從緩衝區解析IP及其埠
func Unmarshal(peerBin []byte)([]Peer, error) {
const peerSize = 6
numPeers := len(peerBin) / peerSize
if len(peerBin) % peerSize != 0 {
err := fmt.Errorf("received malformed peers")
return nil, err
}
peers := make([]Peer, numPeers)
for i := 0; i < numPeers ; i++ {
offset := i * peerSize
peers[i].IP = net.IP(peerBin[offset : offset+4])
peers[i].Port = binary.BigEndian.Uint16(peersBin[offset+4 : offset+6])
}
return peers, nil
}
下載
在取得peers主機的地址後,就可以進行下載了。對每台peer主機的連接,有如下的幾個步驟:
- 與目標peer建立TCP連接;
- 完成BitTorrent握手;
- 交換資訊(告知對方本地需要的資源)。
TCP連接(~/client/client.go)
設定一個超時檢測機制,防止消耗過多網路資源。
conn, err := net.DialTimeout("tcp", peer.String(), 3*time.Second)
if err != nil {
return nil, err
}
握手(~/handshake/handshake.go)
通過達成握手,以確定某peer主機具有期望的功能:
- 能夠使用BT協議通訊;
- 能夠理解本機發出的資訊,並作出響應;
- 持有本機需要的文件資源,或者持有文件資源在網路中位置的索引。
BitTorrent握手行為需要傳輸的資訊由5個部分構成:
- 協議標識(表明這是BitTorrent協議)的長度,即19,十六進位表示為
0x13
; - 協議標識,被稱為pstr,即
BitTorrent protocol
; - 8個保留位元組,默認全為0,如果客戶端支援BT協議的某些擴展,則需要將其中一些設置為1;
- infohash,基於種子中info對應的全部資訊計算得出的哈希值,用於標明本機需要的文件;
- PEER ID,用於標明本機身份。
這些資訊組合起來,就是達成握手需要的序列:
\x13BitTorrent protocol\x00\x00\x00\x00\x00\x00\x00\x00\x86\xd4\xc8\x00\x24\xa4\x69\xbe\x4c\x50\xbc\x5a\x10\x2c\xf7\x17\x80\x31\x00\x74-TR2940-k8hj0wgej6ch
本機發出這些資訊後,peers主機應當以相同形式響應,且返回的infohash應當與本機持有的一致。
使用一個結構體表示握手包,並添加一些序列化、讀取函數。
// 握手包結構體
type Handshake struct {
Pstr string
InfoHash [20]byte
PeerID [20]byte
}
//Serialize函數用於序列化握手資訊
func (h *Handshake) Serialize() []byte {
buf := make([]byte, len(h.Pstr)+49)
buf[0] = byte(len(h.Pstr))
curr := 1
curr += copy(buf[curr:], h.Pstr)
curr += copy(buf[curr:], make([]byte, 8)) //即8個保留位元組
curr += copy(buf[curr:], h.InfoHash[:])
curr += copy(buf[curr:], h.PeerID[:])
return buf
}
func Read(r io.Reader) (* Handshake, error) {
// ...
}
資訊
完成握手後就將開始正式的收發資訊。如果遠端的peers主機未能做好收發的準備,本機仍舊無法發送資訊,此時本機會被遠端認定為阻塞的(choked)。在peers主機完成準備後,會向本機發送解除阻塞(unchoke)資訊。程式碼設計中,默認需要傑出阻塞才能進行下載。
解析(~/message/message.go)
資訊包含三個部分:長度、ID、payload。
長度為32位整型,是大端存儲形式的4個位元組。ID用以表示資訊類型,這在程式碼中進行了詳細定義。
type messageID uint8
const (
// MsgChoke表示阻塞
MsgChoke messageID = 0
// MsgUnchoke表示解除阻塞
MsgUnchoke messageID = 1
// MsgInterested表示資訊相關
MsgInterested messageID = 2
// MsgNotInterested表示資訊不相關
MsgNotInterested messageID = 3
// MsgHave表示提醒接收者,發送者擁有資源
MsgHave messageID = 4
// MsgBitfield表示發送者擁有資源的哪些部分
MsgBitfield messageID = 5
// MsgRequest表示向接收方請求數據
MsgRequest messageID = 6
// MsgPiece表示發送數據以完成請求
MsgPiece messageID = 7
// MsgCancel表示取消一個請求
MsgCancel messageID = 8
)
//Message結構體儲存ID和包含資訊的payload
type Message struct {
ID messageID
Payload []byte
}
// Serialize函數用於執行序列化
// 資訊依次為前綴、資訊的ID、payload
// 需要將`nil`解釋為`keep-alive`
func (m *Message) Serialize() []byte {
if m == nil {
return make([]byte, 4)
}
length := uint32(len(m.Payload) + 1)
buf := make([]byte, 4+length)
binary.BigEndian.PutUint32(buf[0:4], length)
buf[4] = byte(m.ID)
copy(buf[5:], m.Payload)
return buf
}
為讀取資訊,也需要依照資訊格式編寫函數。先讀取4個位元組並作為一個uint32
以表示長度length,然後依據這個數字讀取相應位數的數據,這部分中的第一個位元組表示ID,剩下的表示payload。
// Read函數用於解析資訊
func Read(r io.Reader) (*Message, error) {
lengthBuf := make([]byte, 4)
_, err := io.ReadFull(r, lengthBuf)
if err != nil {
return nil, err
}
length := binary.BigEndian.Uint32(lengthBuf)
// keep-alive
if length == 0 {
return nil, nil
}
messageBuf := make([]byte, length)
_, err = io.ReadFull(r, messageBuf)
if err != nil {
return nil, err
}
m := Message{
ID: messageID(messageBuf[0]),
Payload: messageBuf[1:],
}
return &m, nil
}
位域(~/bitfield/bitfield.go)
peers主機使用位域來高效地編碼自身能夠提供的資源分塊。位域類似基於位元組的數組,被標為1的位即代表擁有這個資源分塊。因為使用單個的位即能完成標註,位域有極高的壓縮能力,這意味著在一個布爾(bool
)空間內完成了8次布爾類型的操作。
當然這樣的思路需要一定的代價:可以定址的最小記憶體單位是位元組,處理單個的位就需要額外的函數設計。
// Bitfield用以表示一台peer主機擁有的資源分塊
type Bitfield []byte
// HasPiece用以表明一個位域(bitfield)是否有特定的索引集
func (bf Bitfield) HasPiece(index int) bool {
byteIndex := index / 8
offset := index % 8
if byteIndex < 0 || byteIndex >= len(bf) {
return false
}
return bf[byteIndex] >> uint(7-offset)&1 != 0
}
// SetPiece用以在位域設置單個位
func (bf Bitfield) SetPiece(index int) {
byteIndex := index / 8
offset := index % 8
// 撇除不合規的索引
if byteIndex < 0 || byteIndex >= len(bf) {
return
}
bf[byteIndex] |= 1 << uint(7-offset)
}
組裝
至此完成了所有下載種子文件的工具:
- 從trackers伺服器獲得了peers主機列表;
- 與peers主機達成TCP連接;
- 與peers主機進行握手;
- 與peers主機收發資訊。
現在面臨的問題是如何解決下載必然造成的高並發(concurrency),並且需要統一管理每個連接的peer主機的狀態(state)。
高並發(~/p2p/p2p.go)
在Effective Go中對並發的描述中有這樣一句話:
Do not communicate by sharing memory; instead, share memory by communicating.
官網給出了解釋。
這裡將Go中重要的Channel類型作為簡潔且執行緒安全的隊列。Channel可以被認為是管道,通過並發核心單元就可以發送或者接收數據進行通訊(communication)。
建立兩個Channel來同步並發工作:一個用於在peers主機間分派工作(要下載的資源分塊),另一個用於已下載的分塊。
workQueue := make(chan *pieceWork, len(t.PieceHashes))
results := make(chan *pieceResult)
for index, hash := range t.PieceHashes {
length := t.calculatePieceSize(index)
workQueue <- &pieceWork{index, hash, length}
}
// 執行下載
for _, peer := range t.Peers {
go t.startDownloadWorker(peer, workQueue, results)
}
// 收集分塊
buf := make([]byte, t.Length)
donePieces := 0
for donePieces < len(t.PieceHashes) {
res := <- results
begin, end := t.calculateBoundsForPiece(res.index)
copy(buf[begin:end], res.buf)
donePieces ++
percent := float64(donePieces) / float64(len(t.PieceHashes)) * 100
numWorkers := runtime.NumGoroutine() - 1
log.Printf("(%0.2f%%) downloaded piece #%d from %d peers\n", percent, res.index, numWorkers)
}
close(workQueue)
為取得的每個peer主機都生成一個goroutine(輕量級執行緒)。每個執行緒連接peer主機並握手,然後從workQueue
中抽取任務,嘗試進行下載,並把下載得到的分塊傳至名為results
的channel。
可以用流程圖表示這個過程:
func (t *Torrent) startDownloadWorker(peer peers.Peer, workQueue chan *pieceWork, results chan *pieceResult) {
c, err := client.New(peer, t.PeerID, t.InfoHash)
if err != nil {
log.Printf("could not handshake eith %s\ndisconnecting\n", peer.IP)
return
}
defer c.Conn.Close()
log.Printf("completed handshake with %s\n", peer.IP)
c.SendUnchoke()
c.SendInterested()
for pw := range workQueue {
if !c.Bitfield.HasPiece(pw.index) {
workQueue <- pw // 將分塊重新放入隊列
continue
}
// 下載分塊
buf, err := attemptDownloadPiece(c, pw)
if err != nil {
log.Println("exiting", err)
workQueue <- pw // 將分塊重新放入隊列
return
}
err = checkIntegrity(pw, buf)
if err != nil {
log.Printf("piece #%d failed integrity check\n", pw.index)
workQueue <- pw // 將分塊重新放入隊列
continue
}
c.SendHave(pw.index)
results <- &pieceResult{pw.index, buf}
}
}
狀態(~/p2p/p2p.go)
除了上述的流程式控制制,還需要對每台peer主機跟蹤狀態。這裡使用一個結構體記錄跟蹤資訊,並且需要實時更改跟蹤記錄,如:從該主機下載的量、向該主機發起請求的量、是否被該主機認定為阻塞。如果足夠專業,可以把這種監控擴展為一種有限狀態機,但限於項目體量,一個結構體就足以完成任務。
type pieceProgress struct {
index int
client *client.Client
buf []byte
downloaded int
requested int
backlog int
}
func (state *pieceProgress) readMessage() error {
msg, err := state.client.Read()
if err != nil {
return err
}
if msg == nil { // keep-alive
return nil
}
switch msg.ID {
case message.MsgUnchoke:
state.client.Choked = false
case message.MsgChoke:
state.client.Choked = true
case message.MsgHave:
index, err := message.ParseHave(msg)
if err != nil {
return err
}
state.client.Bitfield.SetPiece(index)
case message.MsgPiece:
n, err := message.ParsePiece(state.index, state.buf, msg)
if err != nil {
return err
}
state.downloaded += n
state.backlog --
}
return nil
}
一般而言,BT客戶端的任務隊列就是5個,所以這裡也設定為5個。任務隊列的大小在不同網路環境中表現不同,常常提升至10個左右時表現會更好,因此較新的BT的客戶端都會彈性調整隊列大小。
// MaxBlockSize表示單個請求最多可以獲取的位元組數
const MaxBlockSize = 16384
// MaxBacklog表示客戶端在管道中能夠保有的最多未完成請求數
const MaxBacklog = 5
func attemptDownloadPiece(c *client.Client, pw *pieceWork) ([]byte, error) {
state := pieceProgress{
index: pw.index,
client: c,
buf: make([]byte, pw.length),
}
// 設定超時檢測以幫助刪去不能正常運行的peers主機
c.Conn.SetDeadline(time.Now().Add(30 * time.Second))
defer c.Conn.SetDeadline(time.Time{})
for state.downloaded < pw.length {
if !state.client.Choked {
for state.backlog < MaxBacklog && state.requested < pw.length {
blockSize := MaxBlockSize
if pw.length - state.requested < blockSize {
blockSize = pw.length - state.requested
}
err := c.SendRequest(pw.index, state.requested, blockSize)
if err != nil {
return nil, err
}
state.backlog ++
state.requested += blockSize
}
}
err := state.readMessage()
if err != nil {
return nil, err
}
}
return state.buf, nil
}
主函數
最後來到了主函數。
func main() {
inPath := os.Args[1]
outPath := os.Args[2]
tf, err := torrentfile.Open(inPath)
if err != nil {
log.Fatal(err)
}
err = tf.DownloadToFile(outPath)
if err != nil {
log.Fatal(err)
}
}
項目運行截圖
將項目打包為單程式,這裡僅展示Windows平台:
現在使用下圖中的文件進行下載測試:
該程式只有一種命令格式:
> Tiny-BT-Client [種子文件名] [最終產物文件名]
下載成功: