pythonpcap原生python讀取

本文程式碼都由python編寫,無需安裝第三方拓展庫,程式碼更新:https://github.com/mengdj/python

tcp/ip協議 4層架構

.pcap文件是一種簡單網路包記錄文件,較它的升級版.pcapng簡單多了

pcap結構圖

可以看到.pcap文件,就由一個pcap文件頭+無數個(pcap包頭+包數據組成),我們只需要一個個解析即可,文件頭用於描述.pcap文件本身(就一個文件頭),包頭則描述包的資訊(抓取時間、長度等等),包的數據就是我們要的4層數據了(鏈路+網路+傳輸+應用),值得注意的是.pcap文件抓取的包是鏈路層抓取的,所以此時的包還沒有經過重組,網路包重組(ip重組、tcp重組),本文暫不說明,後期可關注github,會用python實現的

Pcap文件頭24B各欄位說明: Magic:        4B:0×1A 2B 3C 4D:用來識別文件自己和位元組順序。0xa1b2c3d4用來表示按照原來的順序讀取,0xd4c3b2a1表示下面的位元組都要交換順序讀取。一般,我們使用0xa1b2c3d4 Major:        2B,0×02 00:當前文件主要的版本號 Minor:        2B,0×04 00當前文件次要的版本號 ThisZone:    4B 時區。GMT和本地時間的相差,用秒來表示。如果本地的時區是GMT,那麼這個值就設置為0.這個值一般也設置為0 SigFigs:4B時間戳的精度;全零 SnapLen:    4B最大的存儲長度(該值設置所抓獲的數據包的最大長度,如果所有數據包都要抓獲,將該值設置為65535; 例如:想獲取數據包的前64位元組,可將該值設置為64) LinkType:    4B鏈路類型 常用類型: 0           BSD loopback devices, except for later OpenBSD 1           Ethernet, and Linux loopback devices 6           802.5 Token Ring 7           ARCnet 8           SLIP 9           PPP 10          FDDI 100         LLC/SNAP-encapsulated ATM 101         「raw IP」, with no link 102         BSD/OS SLIP 103         BSD/OS PPP 104         Cisco HDLC 105         802.11 108         later OpenBSD loopback devices (with the AF_value in network byte order) 113         special Linux 「cooked」 capture 114         LocalTalk

 現在我們分別用python來解析(注意解析時,每一層程式碼都只拆分出上層數據,然後交給上層自己處理,)

.pcap文件頭處理 ==> .pcap包處理 ==> 鏈路層==> 網路層==> 傳輸層==> 應用層

1.pcap.py 文件頭處理

解析文件頭以及眾多包,拿到包數據但不細節,解析包的工作我們放到包處理來做,同時考慮到文件通常很大,我們用生成器來處理遍歷操作

#!/usr/bin/env python3  # -*- coding: utf-8 -*-  __author__ = "[email protected]"  from pcap.proc.packet import Packet  from pcap.proc.util import BytesBuffer  from pcap.proc.util import BytesOrder      class PcapHead(object):      """pcap文件頭 24B"""      _magic_number = None      _version_major = None      _version_minor = None      _thiszone = None      _sigfigs = None      _snaplen = None      _link_type = None        def __init__(self, data):          assert len(data) == 24          self._magic_number = data[:4]          if PcapHead.signature(self._magic_number) is False:              raise Exception("不支援的文件格式")          self._version_major = BytesOrder.bytes2int(data[4:6])          self._version_minor = BytesOrder.bytes2int(data[6:8])          self._thiszone = BytesOrder.bytes2int(data[8:12])          self._sigfigs = BytesOrder.bytes2int(data[12:16])          self._snaplen = BytesOrder.bytes2int(data[16:20])          self._link_type = BytesOrder.bytes2int(data[20:24])        def __str__(self):          return "order:%s magor:%d minor:%d zone:%d sig:%d snap_len:%d type:%d" % (              BytesOrder.order, self._version_major, self._version_minor, self._thiszone, self._sigfigs, self._snaplen,              self._link_type)        @staticmethod      def signature(data):          """驗證簽名同時確定排序,雖然還無法讀取到大小端但不影響"""          sig = BytesOrder.bytes2int(data)          if sig == 0xa1b2c3d4:              BytesOrder.order = "big"              return True          elif sig == 0xd4c3b2a1:              BytesOrder.order = "little"              return True          return False      class Pcap(object):      """.pcap解析類"""      __head = None      __ret = 0        def parse(self, file, buffSize=2048):          """          解析pcap文件,返回值為一個生成器 yield          :param file:緩衝文件大小          :param buffSize:          :return:返回一個生成器(用於處理大包)          """          assert file != ""          _buff = BytesBuffer()          _packet = None          ret = 0          with open(file, "rb") as o:              ctx = None              while 1:                  # 優先處理緩衝區數據(如果快取數據超過了指定大小)                  bsize = len(_buff)                  if bsize > 0:                      if bsize >= buffSize:                          ctx = _buff.getvalue()                      else:                          _buff.write(o.read(buffSize))                          ctx = _buff.getvalue()                      _buff.clear()                  else:                      ctx = o.read(buffSize)                  size = len(ctx)                  if size > 0:                      if self.__head is None:                          # 文件頭佔24位元組                          if size >= 24:                              self.__head = PcapHead(ctx[:24])                              size -= 24                              ctx = ctx[24:]                          else:                              _buff.write(ctx)                      # 分析包頭(包頭佔16位元組)                      if size > 16:                          if _packet is None:                              _packet = Packet()                              ctx, size = _packet.parse(ctx)                              if _packet.finish():                                  yield _packet                                  ret += 1                                  _packet = None                              if size > 0:                                  _buff.write(ctx)                          else:                              ctx, size = _packet.parse(ctx)                              if _packet.finish():                                  yield _packet                                  ret += 1                                  _packet = None                              if size > 0:                                  _buff.write(ctx)                      else:                          _buff.write(ctx)                  else:                      break              del ctx          del _buff          self.__ret = ret        def __len__(self):          return self.__ret        @property      def head(self):          """獲取包頭,務必保證有調用parse後才能獲得包頭"""          return self.__head

2.packet.py 數據包處理

處理詳細包數據,並解析一層數據(交給鏈路層處理,獲得鏈路層MAC實例)

#!/usr/bin/env python3  # -*- coding: utf-8 -*-  __author__ = "[email protected]"  from pcap.proc.mac import MAC  from pcap.proc.util import BytesBuffer, BytesOrder, ProcData      class PacketHead(object):      """包頭 16B"""      _ts_sec = 0      _ts_usec = 0      _incl_len = 0      _orig_len = 0        def __init__(self, data):          self._ts_sec = BytesOrder.bytes2int(data[:4])          self._ts_usec = BytesOrder.bytes2int(data[4:8])          self._incl_len = BytesOrder.bytes2int(data[8:12])          self._orig_len = BytesOrder.bytes2int(data[12:16])        @property      def sec(self):          return self._ts_sec        @property      def usec(self):          return self._ts_usec        @property      def incl(self):          return self._incl_len        @property      def orig(self):          return self._orig_len        def __str__(self):          return "PACKET sec:%d usec:%d incl len:%d orig len:%d" % (              self._ts_sec, self._ts_usec, self._incl_len, self._incl_len)      class Packet(ProcData):      """數據包(未拆包)"""      _head = None      _buff = None      name = "Packet"        def __init__(self):          super(ProcData, self).__init__()          self._buff = BytesBuffer()        def parse(self, data):          """          解析包數據          :param data: 位元組數據          :return:    data,size          """          size = len(data)          assert size > 0          if self._head is None:              self._head = PacketHead(data)              size -= 16              data = data[16:]          if size > 0:              _bs = len(self._buff)              if _bs + size < self._head.incl:                  self._buff.write(data)                  size = 0                  data = None              else:                  offset = self._head.incl - _bs                  self._buff.write(data[:offset])                  data = data[offset:]                  size -= offset                  assert len(data) == size          return data, size        def __del__(self):          self._buff.close()        @property      def head(self):          return self._head        @property      def data(self):          return MAC(self._buff.getvalue(),None)        def finish(self):          return len(self._buff) == self._head.incl

3.mac.py 鏈路層

鏈路層其實很簡單,鏈路層由 14位元組(存儲目標mac,來源mac,上層協議類型)包頭+數據構成 其實我們可以發現底層協議都會有一個欄位,然後後面直接上層協議數據

#!/usr/bin/env python3  # -*- coding: utf-8 -*-  __author__ = "[email protected]"  from pcap.proc.arp import ARP  from pcap.proc.ip import IP  from pcap.proc.ipv6 import IPV6  from pcap.proc.util import ProcData      class MAC(ProcData):      """mac協議 14B+"""      _dst = None      _src = None      _type = None      _data = None        def __init__(self, data, upper):          super(MAC, self).__init__(upper)          size = len(data)          assert size > 18          self._dst = data[:6]          self._src = data[6:12]          self._type = data[12:14]          # fcs校驗欄位 self._fcs = data[size - 4:]          self._data = data[14:]        def __str__(self):          return "MAC dst=>%s src=>%s type:%s" % (self.dst_desc, self.src_desc, self.type_desc)        @property      def dst_desc(self):          return [hex(s).replace("0x", "").upper() for s in self._dst]        @property      def src_desc(self):          return [hex(s).replace("0x", "").upper() for s in self._src]        @property      def type_desc(self):          return [hex(i) for i in self._type]        @property      def dst(self):          return self._dst        @property      def src(self):          return self._src        @property      def type(self):          return self._type        @property      def data(self):          ret = None          if self._type[0] == 0x08:              if self._type[1] == 0x00:                  # ipv4 0x0800                  ret = IP(self._data, self)              elif self._type[1] == 0x06:                  # arp 0x0806                  ret = ARP(self._data, self)          elif self._type[0] == 0x86:              if self._type[1] == 0xdd:                  # ipv6 0x86dd                  ret = IPV6(self._data, self)          return ret

4.ip.py 網路層(ip協議)

#!/usr/bin/env python3  # -*- coding: utf-8 -*-  __author__ = "[email protected]"  from pcap.proc.tcp import TCP  from pcap.proc.udp import UDP  from pcap.proc.util import BytesOrder, ProcData      class Services(object):      """IP服務類型"""      PRIORITY = 0      DELAY = 0      THROUGHPUT = 0      RELIABILITY = 0      COST = 0      RESERVED = 0        def __init__(self, ser):          pass      class Flag(object):      """IP分片標誌(python偏移真坑)"""      DF = 0      MF = 0        def __init__(self, flag):          """              如果DF=0,那麼標識不允許分段;DF=1則是表示這個數據包允許分段。MF=0表示分完段              之後這個數據段是整個包的最後那段,MF=1則是不是最後段的標誌          """          self.DF = ((~(~(1 << 6))) & flag) >> 6          self.MF = ((~(~(1 << 5))) & flag) >> 5        def __str__(self):          return "(DF:%d MF:%d)" % (self.DF, self.MF)      class IP(ProcData):      """ip協議(ipv4) 20B"""      _header_version_len = 0      _service_set = 0      # 標示IP頭部有多少個4位元組,IP頭部最長是60位元組      _total_len = 0      _id = 0      _flag_offset = 0      _time_to_live = 0      _protocol = 0      _check_sum = 0      _src = 0      _dst = 0      _data = None      _flag = None        def __init__(self, data, upper):          super(IP, self).__init__(upper)          # 版本和長度各佔4位,一共1個位元組          self._header_version_len = data[0]          self._service_set = data[1]          self._total_len = data[2:4]          self._id = data[4:6]          self._flag_offset = data[6:8]          self._time_to_live = data[8]          self._protocol = data[9]          self._check_sum = data[10:12]          self._src = data[12:16]          self._dst = data[16:20]          self._data = data[self.head_len_byte:]        def __str__(self):          return (                  "IPv%d src:%s dst:%s len(header):%d service:%s len(total):%d id:%d flag:%s "                  "time to live:%d protocol:%d check sum:%s payload:%d" %                  (                      self.version, self.src, self.dst, self.head_len_byte, bin(self._service_set), self.total_len,                      self.id,                      self.flag, self.time_to_live, self._protocol,                      self._check_sum, len(self._data))          )        @property      def version(self):          return self._header_version_len >> 4        @property      def head_len(self):          return (0xff >> 4) & self._header_version_len        @property      def flag(self):          if self._flag is None:              self._flag = Flag(self._flag_offset[0])          return self._flag        @property      def total_len(self):          return BytesOrder.bytes2int(self._total_len, "big")        @property      def time_to_live(self):          return self._time_to_live        @property      def id(self):          """IP序號"""          return BytesOrder.bytes2int(self._id, "big")        @property      def src(self):          return [i for i in self._src]        @property      def dst(self):          return [i for i in self._dst]        @property      def head_len_byte(self):          """頭部位元組數"""          return self.head_len << 2        @property      def data(self):          """獲取傳輸層協議"""          ret = None          # 46~1500 檢測是否有填充數據(既數據部分不滿足46位元組會填充,傳遞時候要過濾掉這部分數據)          # tcp自身有分包機制,不用處理分包,其他協議需要處理分包          data = self._data[:self.total_len - 20]          if self._protocol == 0x06:              ret = TCP(data, self)          elif self._protocol == 0x11:              ret = UDP(data, self)          return ret

5.1.tcp.py 傳輸層(tcp協議)

tcp協議是一個很複雜的協議,如果你了解透了會對以後設計應用層協議大有幫助的,篇幅有限在這不廢話,如穩定性的udp實現,其實就是tcp的另外一個實現

#!/usr/bin/env python3  # -*- coding: utf-8 -*-  __author__ = "[email protected]"  from pcap.proc.util import BytesOrder, ProcData      class Flag(object):      """     CWR:擁塞窗口減少標誌被發送主機設置,用來表明它接收到了設置ECE標誌的TCP包。擁塞窗口是被TCP維護           的一個內部變數,用來管理髮送窗口大小。      ECE:ECN響應標誌被用來在TCP3次握手時表明一個TCP端是具備ECN功能的,並且表明接收到的TCP包的IP           頭部的ECN被設置為11。更多資訊請參考RFC793。      URG:緊急標誌。緊急標誌為"1"表明該位有效。      ACK:確認標誌。表明確認編號欄有效。大多數情況下該標誌位是置位的。TCP報頭內的確認編號欄內包含的           確認編號(w+1)為下一個預期的序列編號,同時提示遠端系統已經成功接收所有數據。      PSH:推標誌。該標誌置位時,接收端不將該數據進行隊列處理,而是儘可能快地將數據轉由應用處理。在處理           Telnet或rlogin等交互模式的連接時,該標誌總是置位的。      RST:複位標誌。用於複位相應的TCP連接。      SYN:同步標誌。表明同步序列編號欄有效。該標誌僅在三次握手建立TCP連接時有效。它提示TCP連接的服務端           檢查序列編號,該序列編號為TCP連接初始端(一般是客戶端)的初始序列編號。在這裡,可以把TCP序列           編號看作是一個範圍從0到4,294,967,295的32位計數器。通過TCP連接交換的數據中每一個位元組都經           過序列編號。在TCP報頭中的序列編號欄包括了TCP分段中第一個位元組的序列編號。      FIN:結束標誌。      """      CWR = 0      ECE = 0      URG = 0      ACK = 0      PSH = 0      RST = 0      SYN = 0      FIN = 0        def __init__(self, flag):          # 取反補位(一次1位元組的後6位)          self.CWR = ((~(~(1 << 7))) & flag) >> 7          self.ECE = ((~(~(1 << 6))) & flag) >> 6          self.URG = ((~(~(1 << 5))) & flag) >> 5          self.ACK = ((~(~(1 << 4))) & flag) >> 4          self.PSH = ((~(~(1 << 3))) & flag) >> 3          self.RST = ((~(~(1 << 2))) & flag) >> 2          self.SYN = ((~(~(1 << 1))) & flag) >> 1          self.FIN = ((~(~1)) & flag)        def __str__(self):          return "(CWR:%d ECE:%d URG:%d ACK:%d PSH:%d RST:%d SYN:%d FIN:%d)" % (              self.CWR, self.ECE, self.URG, self.ACK, self.PSH, self.RST, self.SYN, self.FIN)      class TCP(ProcData):      """UDP協議 20B+,暫未處理分段數據 """      _src = 0      _dst = 0      # 發送、確認編號      _seq_no = 0      _ack_no = 0      _header_len_reserved = 0      _reserved_flag = 0      _wnd_size = 0      _check_sum = 0      # 緊急指針(偏移量)      _urqt_p = 0      _option = []      _flag = None      _data = []        def __init__(self, data, upper):          super(TCP, self).__init__(upper)          self._src = data[:2]          self._dst = data[2:4]          self._seq_no = data[4:8]          self._ack_no = data[8:12]          # 4+4          self._header_len_reserved = data[12]          # 2+6          self._reserved_flag = data[13]          self._wnd_size = data[14:16]          self._check_sum = data[16:18]          self._urqt_p = data[18:20]          # 其他可選欄位          if self.header_len > 20:              self._option = data[20:self.header_len]          self._data = data[self.header_len:]        def __str__(self):          return "TCP src(port):%d dst(port):%d seq:%d ack:%d len(header):%d "                  "flag:%s win:%d check_sum:%s urqt_p:%d option:%d payload:%d" % (                     self.src, self.dst, self.seq, self.ack, self.header_len, self.flag, self.wnd_size,                     self.check_sum, self.urqt_p,                     len(self._option),                     len(self._data))        def __len__(self):          return len(self._data)        @property      def src(self):          return BytesOrder.bytes2int(self._src, "big")        @property      def option(self):          """分析tcp的可選項欄位(分析了常用欄位)"""          size = len(self._option)          ret = []          if size > 0:              option = self._option              while size > 0:                  if option[0] == 0x00:                      ret.append({"END": option[0]})                      break                  if option[0] == 0x01:                      ret.append({"NOP": option[0]})                      size -= 1                      option = option[1:]                  elif option[0] == 0x02:                      # MSS                      ret.append({"MSS": {"length": option[1], "value": BytesOrder.bytes2int(option[2:4], "big")}})                      size -= 4                      option = option[4:]                  elif option[0] == 0x03:                      # 窗口擴大因子                      ret.append({"WSALE": {"length": option[1], "shift_count": option[2]}})                      size -= 3                      option = option[3:]                  elif option[0] == 0x04:                      # SACK                      ret.append({"SACK": {"length": option[1]}})                      size -= 2                      option = option[2:]                  elif option[0] == 0x08:                      # 時間戳                      ret.append({"TIMESTAMP": {"length": option[1], "value": BytesOrder.bytes2int(option[2:6], "big"),                                                "repl_value": BytesOrder.bytes2int(option[6:10], "big")}})                      size -= 10                      option = option[10:]                  else:                      break          else:              ret = None          return ret        @property      def flag(self):          """獲取標誌對象"""          if self._flag is None:              self._flag = Flag(self._reserved_flag)          return self._flag        @property      def flag_desc(self):          return bin(self._reserved_flag)        @property      def dst(self):          return BytesOrder.bytes2int(self._dst, "big")        @property      def seq(self):          """獲取序列號"""          return BytesOrder.bytes2int(self._seq_no, "big")        @property      def ack(self):          """獲取確認號"""          return BytesOrder.bytes2int(self._ack_no, "big")        @property      def header_len(self):          """獲取頭部長度"""          return (self._header_len_reserved >> 4) << 2        @property      def wnd_size(self):          """獲取滑動窗口大小"""          return BytesOrder.bytes2int(self._wnd_size, "big")        @property      def check_sum(self):          """獲取校驗"""          return self._check_sum        @property      def urqt_p(self):          """獲取緊急指針"""          return BytesOrder.bytes2int(self._urqt_p, "big")        @property      def data(self):          """獲取原始包(可能包含分段數據,此數據未進行重組)"""          return self._data

5.1.udp.py 傳輸層(udp協議)

udp協議頭(首部)佔用8位元組,記錄埠號,頭長度以及校驗和(非必須)

#!/usr/bin/env python3  # -*- coding: utf-8 -*-  __author__ = "[email protected]"  from pcap.proc.util import BytesOrder, ProcData      class UDP(ProcData):      """UDP 8B"""      _src = 0      _dst = 0      # UDP頭部和UDP數據的總長度位元組      _header_len = 0      _check_sum = 0      _data = None        def __init__(self, data, upper):          super(UDP, self).__init__(upper)          self._src = data[:2]          self._dst = data[2:4]          self._header_len = data[4:6]          self._check_sum = data[6:8]          self._data = data[8:]        def __str__(self):          return "UDP src port:%d dst:%d header_len:%d check_sum:%s" % (              self.src, self.dst, self.header_len, self.check_sum)        @property      def src(self):          return BytesOrder.bytes2int(self._src, "big")        @property      def dst(self):          return BytesOrder.bytes2int(self._dst, "big")        @property      def header_len(self):          return BytesOrder.bytes2int(self._header_len, "big")        @property      def check_sum(self):          return self._check_sum        @property      def data(self):          return self._data

6.基礎類(封裝了位元組大小端轉換、位元組緩衝區操作)

#!/usr/bin/env python3  # -*- coding: utf-8 -*-  __author__ = "[email protected]"  from io import BytesIO      class ProcData(object):      __upper = 0        def __init__(self, upper=None):          self.__upper = upper        @property      def data(self):          """返回上層數據,未處理分片"""          pass        @property      def upper(self):          return self.__upper      class AppProcData(object):      """此介面由應用層來實現"""        def __init__(self):          pass        def find(self, data):          """校驗數據並完成初始化,成功返回self,鏈式調用"""          pass      class BytesOrder(object):      """大小端排序工具類"""      order = "big"        @staticmethod      def bytes2int(data, ord=""):          if ord == "":              ord = BytesOrder.order          return int.from_bytes(data, ord)      class BytesBuffer(BytesIO):      """封裝BytesIO,增加重置"""      # 寫入長度快取      __length = 0      # 統計寫入次數      __count = 0        def __len__(self):          """獲取長度,使用切片而不複製數據,同時增加計算快取"""          if self.__length == 0:              self.__length = len(self.getbuffer())          return self.__length        def clear(self):          """清理快取區然後重置索引,seek必須調用"""          self.truncate(0)          self.seek(0)          self.__length = 0          self.__count = 0        def write(self, *args, **kwargs):          self.__length = 0          self.__count += 1          return super(BytesBuffer, self).write(*args, **kwargs)        def writelines(self, *args, **kwargs):          self.__length = 0          self.__count += 1          return super(BytesBuffer, self).writelines(*args, **kwargs)        def count(self):          return self.__count

值得注意的是,由於抓取的鏈路層的數據,尚未進行重組MTU,MSS,因此抓到是可能是分段數據而不是完整的數據,分段操作,對於tcp(mss)由自己完成,其他則右IP協議完成,所以你發一個tcp包大小為1537位元組,最終可能拆分成2個包,每個包都會帶上tcp協議頭,tcp的mss通常為1460位元組;而ip分段則只會第一個包帶上首部,分包重組需要詳細了解協議知識,關於tcp和ip分包重組,請關注本博