从零开始编写一个BitTorrent下载器

从零开始编写一个BitTorrent下载器

BT协议

简介

BT协议Bit Torrent(BT)是一种通信协议,又是一种应用程序,广泛用于对等网络通信(P2P)。曾经风靡一时,由于它引起了巨大的流量,对因特网的运营、维护和管理都产生了重要的影响。

BT协议的典型特征就是没有中心服务器。BT协议中,作为参与者的机器被称为peerspeer之间的通信协议又被称为peer wire protocal,即peer连线协议,是一个基于TCP协议的应用层协议。

BT协议在20年里不断发展(从2001年开始),加入加密、私有种子等设计,也扩展了搜寻peer主机的方法。

连接

由于没有中心服务器,参与者需要使用另外的方法取得他人的地址,以建立对等连接,确定自己的机器应当从何处下载需要的文件。传统的BT协议使用中介服务器trackers来告知每个参与者如何进行下载。trackers服务器是基于HTTP的,这类服务器本身不托管文件资源,仅为每个参与者分配peers。

在BT协议网络中传播违法资源的现象十分常见,这导致其中介服务器常常会受到法律制裁,查封事件屡见不鲜。要解决这一问题,就需要将主机搜寻的工作下放到每个参与者的机器,即分布式处理(distributed process)。BT协议未来的核心就是DHT、PEX、磁力链

.torrent文件解析

以debian发布的镜像文件种子为例。

image

一个.torrent文件描述了可下载文件的内容以及需要连接到的tracker中介服务器的信息,其编码格式为Bencode

文件的头部信息可以直接以文本形式查看:

d8:announce41://bttracker.debian.org:6969/announce7:comment35:"Debian CD from cdimage.debian.org"13:creation datei1612616380e9:httpseedsl146://cdimage.debian.org/cdimage/release/edu//srv/cdbuilder.debian.org/dst/deb-cd/weekly-builds/amd64/iso-cd/debian-edu-10.8.0-amd64-netinst.iso146://cdimage.debian.org/cdimage/archive/edu//srv/cdbuilder.debian.org/dst/deb-cd/weekly-builds/amd64/iso-cd/debian-edu-10.8.0-amd64-netinst.isoe4:infod6:lengthi425721856e4:name35:debian-edu-10.8.0-amd64-netinst.iso12:piece lengthi262144e6:pieces32480:[每个部分的hash,以二进制表示]

之后的内容为二进制,无法直接查看。

美化一下这个部分的信息,可以发现清晰的结构特征:

d
	8:announce
		41://bttracker.debian.org:6969/announce
	7:comment
		35:"Debian CD from cdimage.debian.org"
	13:creation date
		i1612616380e
	9:httpseeds
        l
            146://cdimage.debian.org/cdimage/release/edu//srv/cdbuilder.debian.org/dst/deb-cd/weekly-builds/amd64/iso-cd/debian-edu-10.8.0-amd64-netinst.iso
            146://cdimage.debian.org/cdimage/archive/edu//srv/cdbuilder.debian.org/dst/deb-cd/weekly-builds/amd64/iso-cd/debian-edu-10.8.0-amd64-netinst.iso
        e
	4:info
	d
		6:length
			i425721856e
		4:name
			35:debian-edu-10.8.0-amd64-netinst.iso
		12:piece length
			i262144e
		6:pieces
			32480:[每个部分的hash,以二进制表示]
	e
e

其中包含了tracker服务器的URL、创建事件(Unix时间戳)、文件名和文件大小、以及一系列表示每个文件块的SHA-1哈希值的二进制片段(文件块是指文件被等量拆分后形成的几个部分)。每个种子中文件被拆分的大小依据是不同的,但基本处在一个区间内(256KB到1MB)。因为这样的设计,大型文件将会被拆分成众多碎片。在实际下载中,下载执行者会从能够连接的那些peers主机下载文件块,并且根据种子文件校验其哈希值,最后拼接成完整的文件。

这种机制能够确保每个文件块的完整性,抵御设备故障或恶意投毒(torrent poisoning)造成的损害。如果攻击者不能破解SHA-1进行原像攻击(preimage attack),那么下载取得的文件就是安全可靠的。

Bencode编码

从已知的信息可以看出,.torrent文件中的元数据均以“键:值”形式存储,故可以将整个内容理解为一个经过特殊编码的字典,或者一个近似的JSON。

Bencode中,数字采用十进制编码,相比纯二进制编码显得效率较低,但保证了良好的跨平台性(无大小端存储问题)。

Bencode支持四种类型的数据:string、int、Dictionary<string, object>、List<object>。

  • string类型

    string类型的编码格式为[length]:[string],以字符串长度开头,以字符串内容结束。示例:

    "abc" => 3:abc
    
  • int类型

    int类型的编码格式为i[int]e,以i开头,以e结尾。示例:

    123 => i123e
    
  • Dictionary<string, object>类型

    Dictionary<string, object>类型的编码格式为d[Key-Value Pair]e,以d开头,以e结尾。示例:

    Dictionary<{"name":"create chen"},{"age":23}> => d4:name11:create chen3:agei23ee
    
  • List<object>类型

    List<object>类型的编码格式为l[object]e,以l开头,以e结尾。示例:

    List<"abc", 123> => l3:abci123ee
    

Bencode实现

编码

public static string Encode(object obj)
{
    var sb = new StringBuilder();
    if(obj is Dictionary<string,object>)
    {
        var parseObj = obj as Dictionary<string, object>;
        sb.Append("d");
        foreach (var o in parseObj)
        {
            sb.AppendFormat("{0}:{1}{2}", o.Key.Length,o.Key, Encode(o.Value));
        }
        sb.Append("e");
    }
    if ((obj as int?) != null)
    {
        var parseObj = (int) obj;
        sb.AppendFormat("i{0}e", parseObj);
    }
    if (obj is List<object>)
    {
        var parseObj = obj as List<object>;
        sb.Append("l");
        foreach (var o in parseObj)
        {
            sb.Append(Encode(o));
        }
        sb.Append("e");
    }
    if (obj is string)
    {
        var parseObj = obj as string;
        sb.AppendFormat("{0}:{1}", parseObj.Length, parseObj);
    }
    return sb.ToString();
}

解码

public static object Decode(string s)
{
    return DecodeObject(s, ref _index, EncodeState.Value);
}
 
private enum EncodeState
{
    Key,
    Value
}
 
private static int _index;
 
private static object DecodeObject(string str,ref int index, EncodeState state)
{
    var obj = new Dictionary<string, object>();
 
    var c = str[index];
    while (c != 'e')
    {
        if (c == 'd')
        {
            index++;
            return DecodeObject(str, ref index,EncodeState.Key);
        }
        if (c == 'i')
        {
            var value = "";
            index++; c = str[index];
            while (c != 'e')
            {
                value += c.ToString(CultureInfo.InvariantCulture);
                index++;
                c = str[index];
            }
            return Convert.ToInt32(value);
        }
        if (c == 'l')
        {
            index++;
            var value = new List<object>();
            while (str[index]!='e')
            {
                value.Add(DecodeObject(str, ref index, EncodeState.Value));
                index++;
            }
            return value;
        }
        if ('0' < c && c <= '9') 
        {
            string strLength = "";
            while (c != ':')
            {
                strLength += c.ToString(CultureInfo.InvariantCulture);
                c = str[++index];
            }
            var length = Convert.ToInt32(strLength);
            var strContent = "";
            for (int i = 0; i < length; i++)
            {
                strContent += str[index + 1].ToString(CultureInfo.InvariantCulture);
                index++;
            }
            if (state == EncodeState.Value)
            {
                return strContent;
            }
            index++;
            obj.Add(strContent, DecodeObject(str, ref index, EncodeState.Value));
            state = EncodeState.Key;
            index++;
        }
        c = str[index];
    }
    return obj;
}

编写项目

这里使用Go来编写,也是首次使用Go完成网络工具。仅包含主要代码,完整项目见Github

寻找

解析种子(~/torrentfile/torrentfile.go)

import (
	"github.com/jackpal/bencode-go"
)

这里省略了自带库文件的导入。

type bencodeInfo struct {
	Pieces string `bencode:"pieces"`
	PieceLength int `bencode:"piece length"`
	Length int `bencode:"length"`
	Name string `bencode:"name"`
}

type bencodeTorrent struct {
	Announce string `bencode:"announce"`
	Info bencodeInfo `bencode:"info"`
}
// Open函数用于解析种子
func Open(path string) (TorrentFile, error) {
	file, err := os.Open(path)
	if err != nil {
		return TorrentFile{}, err
	}
	defer file.Close()

	bto := bencodeTorrent{}
	err = bencode.Unmarshal(file, &bto)
	if err != nil {
		return TorrentFile{}, err
	}
	return bto.toTorrentFile()
}

处理时,将pieces对应的值(原先为哈希值的字符串)变成哈希值切片(每个长度为20 bytes),以便后续调用每个独立的哈希值。另外,计算info对应的整个字典(含有名称、大小、文件块哈希值)的SHA-1哈希值,存储在infohash,在与trackers服务器和peers主机交互时表示所需的文件。

type TorrentFile struct {
	Announce string
	InfoHash [20]byte
	PieceHashes [][20]byte
	PieceLength int
	Length int
	Name string
}
func (bto bencodeTorrent) toTorrentFile() (TorrentFile, error) {
	// ...
}

从trackers服务器获取peers主机地址(~/torrentfile/tracker.go)

处理完种子后,就可以向trackers服务器发起请求:作为一台peer主机,需要获取同一网络中的其它peers主机的列表。只需要对announce对应URL发起GET请求(需要设置几个请求参数)。

// buildTrackerURL函数用于构成请求peers列表的序列
func (t * TorrentFile) buildTrackerURL(peerID [20]byte, port uint16) (string, error) {
	base, err:= url.Parse(t.Announce)
	if err != nil {
		return "", err
	}
	params := url.Values{
		"info_hash": []string{string(t.InfoHash[:])},
		"peer_id": []string{string(peerID[:])},
		"port": []string{strconv.Itoa(int(port))},
		"uploaded": []string{"0"},
		"downloaded": []string{"0"},
		"compact": []string{"1"},
		"left": []string{strconv.Itoa(t.Length)},
	}
	base.RawQuery = params.Encode()
	return base.String(), nil
}

其中重要的参数有:

  • info_hash:用以标识需要下载的文件,其值就是之前由info对应值计算出的infohash。trackers服务器基于这个值返回能够为下载提供资源的peers主机。
  • peer_id:20字节长的数据,用于向peers主机和trackers服务器标识自己的身份。具体实现仅仅是产生随机的20个字节。真实的BitTorrent客户端ID形如-TR2940-k8hj0wgej6ch,标出了客户端软件及其版本(TR2940表示Transmission Client 2.94)。

处理trackers服务器的响应(~/peers/peers.go)

服务器响应也是采用Bencode编码的:

d
  8:interval
    i900e
  5:peers
    252:[很长的二进制块]
e

interval表示本地应当在多长的时间间隔后再次向tracker服务器请求以刷新peers主机列表,900的单位是秒。peers包含了每个peer主机的IP地址,以二进制表示,由若干个6字节元组成,前4个字节表示主机IP,后2个字节表示端口号(大端存储的16位无符号整型,uint16)。大端存储,即big-endian,是网络中所采用的存储方式(相对于小端存储),故被称为network order。运算时可以直接将一组字节从左至右拼接以形成所要表达的整数,如0x1A0xE1能拼接成0x1AE1,即十进制的6881。

type Peer struct {
	IP net.IP
	Port uint16
}
// Unmarshal函数从缓冲区解析IP及其端口
func Unmarshal(peerBin []byte)([]Peer, error) {
	const peerSize = 6
	numPeers := len(peerBin) / peerSize
	if len(peerBin) % peerSize != 0 {
		err := fmt.Errorf("received malformed peers")
		return nil, err
	}
	peers := make([]Peer, numPeers)
	for i := 0; i < numPeers ; i++ {
		offset := i * peerSize
		peers[i].IP = net.IP(peerBin[offset : offset+4])
		peers[i].Port = binary.BigEndian.Uint16(peersBin[offset+4 : offset+6])
	}
	return peers, nil
}

下载

在取得peers主机的地址后,就可以进行下载了。对每台peer主机的连接,有如下的几个步骤:

  1. 与目标peer建立TCP连接;
  2. 完成BitTorrent握手;
  3. 交换信息(告知对方本地需要的资源)。

TCP连接(~/client/client.go)

设定一个超时检测机制,防止消耗过多网络资源。

conn, err := net.DialTimeout("tcp", peer.String(), 3*time.Second)
if err != nil {
    return nil, err
}

握手(~/handshake/handshake.go)

通过达成握手,以确定某peer主机具有期望的功能:

  • 能够使用BT协议通信;
  • 能够理解本机发出的信息,并作出响应;
  • 持有本机需要的文件资源,或者持有文件资源在网络中位置的索引。

BitTorrent握手行为需要传输的信息由5个部分构成:

  1. 协议标识(表明这是BitTorrent协议)的长度,即19,十六进制表示为0x13
  2. 协议标识,被称为pstr,即BitTorrent protocol
  3. 8个保留字节,默认全为0,如果客户端支持BT协议的某些扩展,则需要将其中一些设置为1;
  4. infohash,基于种子中info对应的全部信息计算得出的哈希值,用于标明本机需要的文件;
  5. PEER ID,用于标明本机身份。

这些信息组合起来,就是达成握手需要的序列:

\x13BitTorrent protocol\x00\x00\x00\x00\x00\x00\x00\x00\x86\xd4\xc8\x00\x24\xa4\x69\xbe\x4c\x50\xbc\x5a\x10\x2c\xf7\x17\x80\x31\x00\x74-TR2940-k8hj0wgej6ch

本机发出这些信息后,peers主机应当以相同形式响应,且返回的infohash应当与本机持有的一致。

使用一个结构体表示握手包,并添加一些序列化、读取函数。

// 握手包结构体
type Handshake struct {
	Pstr string
	InfoHash [20]byte
	PeerID [20]byte
}
//Serialize函数用于序列化握手信息
func (h *Handshake) Serialize() []byte {
	buf := make([]byte, len(h.Pstr)+49)
	buf[0] = byte(len(h.Pstr))
	curr := 1
	curr += copy(buf[curr:], h.Pstr)
	curr += copy(buf[curr:], make([]byte, 8))  //即8个保留字节
	curr += copy(buf[curr:], h.InfoHash[:])
	curr += copy(buf[curr:], h.PeerID[:])
	return buf
}

func Read(r io.Reader) (* Handshake, error) {
	// ...
}

信息

完成握手后就将开始正式的收发信息。如果远端的peers主机未能做好收发的准备,本机仍旧无法发送信息,此时本机会被远端认定为阻塞的(choked)。在peers主机完成准备后,会向本机发送解除阻塞(unchoke)信息。代码设计中,默认需要杰出阻塞才能进行下载。

解析(~/message/message.go)

信息包含三个部分:长度、ID、payload。

长度为32位整型,是大端存储形式的4个字节。ID用以表示信息类型,这在代码中进行了详细定义。

type messageID uint8

const (
	// MsgChoke表示阻塞
	MsgChoke messageID = 0
	// MsgUnchoke表示解除阻塞
	MsgUnchoke messageID = 1
	// MsgInterested表示信息相关
	MsgInterested messageID = 2
	// MsgNotInterested表示信息不相关
	MsgNotInterested messageID = 3
	// MsgHave表示提醒接收者,发送者拥有资源
	MsgHave messageID = 4
	// MsgBitfield表示发送者拥有资源的哪些部分
	MsgBitfield messageID = 5
	// MsgRequest表示向接收方请求数据
	MsgRequest messageID = 6
	// MsgPiece表示发送数据以完成请求
	MsgPiece messageID = 7
	// MsgCancel表示取消一个请求
	MsgCancel messageID = 8
)

//Message结构体储存ID和包含信息的payload
type Message struct {
	ID messageID
	Payload []byte
}
// Serialize函数用于执行序列化
// 信息依次为前缀、信息的ID、payload
// 需要将`nil`解释为`keep-alive`
func (m *Message) Serialize() []byte {
	if m == nil {
		return make([]byte, 4)
	}
	length := uint32(len(m.Payload) + 1)
	buf := make([]byte, 4+length)
	binary.BigEndian.PutUint32(buf[0:4], length)
	buf[4] = byte(m.ID)
	copy(buf[5:], m.Payload)
	return buf
}

为读取信息,也需要依照信息格式编写函数。先读取4个字节并作为一个uint32以表示长度length,然后依据这个数字读取相应位数的数据,这部分中的第一个字节表示ID,剩下的表示payload

// Read函数用于解析信息
func Read(r io.Reader) (*Message, error) {
	lengthBuf := make([]byte, 4)
	_, err := io.ReadFull(r, lengthBuf)
	if err != nil {
		return nil, err
	}
	length := binary.BigEndian.Uint32(lengthBuf)

	// keep-alive
	if length == 0 {
		return nil, nil
	}

	messageBuf := make([]byte, length)
	_, err = io.ReadFull(r, messageBuf)
	if err != nil {
		return nil, err
	}

	m := Message{
		ID: messageID(messageBuf[0]),
		Payload: messageBuf[1:],
	}
	return &m, nil
}
位域(~/bitfield/bitfield.go)

peers主机使用位域来高效地编码自身能够提供的资源分块。位域类似基于字节的数组,被标为1的位即代表拥有这个资源分块。因为使用单个的位即能完成标注,位域有极高的压缩能力,这意味着在一个布尔(bool)空间内完成了8次布尔类型的操作。

当然这样的思路需要一定的代价:可以寻址的最小内存单位是字节,处理单个的位就需要额外的函数设计。

// Bitfield用以表示一台peer主机拥有的资源分块
type Bitfield []byte
// HasPiece用以表明一个位域(bitfield)是否有特定的索引集
func (bf Bitfield) HasPiece(index int) bool {
	byteIndex := index / 8
	offset := index % 8
	if byteIndex < 0 || byteIndex >= len(bf) {
		return false
	}
	return bf[byteIndex] >> uint(7-offset)&1 != 0
}

// SetPiece用以在位域设置单个位
func (bf Bitfield) SetPiece(index int) {
	byteIndex := index / 8
	offset := index % 8

	// 撇除不合规的索引
	if byteIndex < 0 || byteIndex >= len(bf) {
		return
	}
	bf[byteIndex] |= 1 << uint(7-offset)
}

组装

至此完成了所有下载种子文件的工具:

  • 从trackers服务器获得了peers主机列表;
  • 与peers主机达成TCP连接;
  • 与peers主机进行握手;
  • 与peers主机收发信息。

现在面临的问题是如何解决下载必然造成的高并发(concurrency),并且需要统一管理每个连接的peer主机的状态(state)

高并发(~/p2p/p2p.go)

Effective Go中对并发的描述中有这样一句话:

Do not communicate by sharing memory; instead, share memory by communicating.

官网给出了解释。

这里将Go中重要的Channel类型作为简洁且线程安全的队列。Channel可以被认为是管道,通过并发核心单元就可以发送或者接收数据进行通讯(communication)。

建立两个Channel来同步并发工作:一个用于在peers主机间分派工作(要下载的资源分块),另一个用于已下载的分块。

workQueue := make(chan *pieceWork, len(t.PieceHashes))
	results := make(chan *pieceResult)
	for index, hash := range t.PieceHashes {
		length := t.calculatePieceSize(index)
		workQueue <- &pieceWork{index, hash, length}
	}

	// 执行下载
	for _, peer := range t.Peers {
		go t.startDownloadWorker(peer, workQueue, results)
	}

	// 收集分块
	buf := make([]byte, t.Length)
	donePieces := 0
	for donePieces < len(t.PieceHashes) {
		res := <- results
		begin, end := t.calculateBoundsForPiece(res.index)
		copy(buf[begin:end], res.buf)
		donePieces ++

		percent := float64(donePieces) / float64(len(t.PieceHashes)) * 100
		numWorkers := runtime.NumGoroutine() - 1
		log.Printf("(%0.2f%%) downloaded piece #%d from %d peers\n", percent, res.index, numWorkers)
	}
	close(workQueue)

为取得的每个peer主机都生成一个goroutine(轻量级线程)。每个线程连接peer主机并握手,然后从workQueue中抽取任务,尝试进行下载,并把下载得到的分块传至名为resultschannel

可以用流程图表示这个过程:

image

func (t *Torrent) startDownloadWorker(peer peers.Peer, workQueue chan *pieceWork, results chan *pieceResult) {
	c, err := client.New(peer, t.PeerID, t.InfoHash)
	if err != nil {
		log.Printf("could not handshake eith %s\ndisconnecting\n", peer.IP)
		return
	}
	defer c.Conn.Close()
	log.Printf("completed handshake with %s\n", peer.IP)

	c.SendUnchoke()
	c.SendInterested()

	for pw := range workQueue {
		if !c.Bitfield.HasPiece(pw.index) {
			workQueue <- pw  // 将分块重新放入队列
			continue
		}

		// 下载分块
		buf, err := attemptDownloadPiece(c, pw)
		if err != nil {
			log.Println("exiting", err)
			workQueue <- pw  // 将分块重新放入队列
			return
		}

		err = checkIntegrity(pw, buf)
		if err != nil {
			log.Printf("piece #%d failed integrity check\n", pw.index)
			workQueue <- pw  // 将分块重新放入队列
			continue
		}

		c.SendHave(pw.index)
		results <- &pieceResult{pw.index, buf}
	}
}

状态(~/p2p/p2p.go)

除了上述的流程控制,还需要对每台peer主机跟踪状态。这里使用一个结构体记录跟踪信息,并且需要实时更改跟踪记录,如:从该主机下载的量、向该主机发起请求的量、是否被该主机认定为阻塞。如果足够专业,可以把这种监控扩展为一种有限状态机,但限于项目体量,一个结构体就足以完成任务。

type pieceProgress struct {
	index int
	client *client.Client
	buf []byte
	downloaded int
	requested int
	backlog int
}
func (state *pieceProgress) readMessage() error {
	msg, err := state.client.Read()
	if err != nil {
		return err
	}

	if msg == nil {  // keep-alive
		return nil
	}

	switch msg.ID {
	case message.MsgUnchoke:
		state.client.Choked = false
	case message.MsgChoke:
		state.client.Choked = true
	case message.MsgHave:
		index, err := message.ParseHave(msg)
		if err != nil {
			return err
		}
		state.client.Bitfield.SetPiece(index)
	case message.MsgPiece:
		n, err := message.ParsePiece(state.index, state.buf, msg)
		if err != nil {
			return err
		}
		state.downloaded += n
		state.backlog --
	}
	return nil
}

一般而言,BT客户端的任务队列就是5个,所以这里也设定为5个。任务队列的大小在不同网络环境中表现不同,常常提升至10个左右时表现会更好,因此较新的BT的客户端都会弹性调整队列大小。

// MaxBlockSize表示单个请求最多可以获取的字节数
const MaxBlockSize = 16384

// MaxBacklog表示客户端在管道中能够保有的最多未完成请求数
const MaxBacklog = 5
func attemptDownloadPiece(c *client.Client, pw *pieceWork) ([]byte, error) {
	state := pieceProgress{
		index: pw.index,
		client: c,
		buf: make([]byte, pw.length),
	}

	// 设定超时检测以帮助删去不能正常运行的peers主机
	c.Conn.SetDeadline(time.Now().Add(30 * time.Second))
	defer c.Conn.SetDeadline(time.Time{})

	for state.downloaded < pw.length {
		if !state.client.Choked {
			for state.backlog < MaxBacklog && state.requested < pw.length {
				blockSize := MaxBlockSize
				if pw.length - state.requested < blockSize {
					blockSize = pw.length - state.requested
				}

				err := c.SendRequest(pw.index, state.requested, blockSize)
				if err != nil {
					return nil, err
				}
				state.backlog ++
				state.requested += blockSize
			}
		}
		err := state.readMessage()
		if err != nil {
			return nil, err
		}
	}
	return state.buf, nil
}

主函数

最后来到了主函数。

func main() {
	inPath := os.Args[1]
	outPath := os.Args[2]

	tf, err := torrentfile.Open(inPath)
	if err != nil {
		log.Fatal(err)
	}
	err = tf.DownloadToFile(outPath)
	if err != nil {
		log.Fatal(err)
	}
}

项目运行截图

将项目打包为单程序,这里仅展示Windows平台:

image

现在使用下图中的文件进行下载测试:

image

该程序只有一种命令格式:

> Tiny-BT-Client [种子文件名] [最终产物文件名]

image

image

下载成功:

image

image

Tags: