算法篇之BitMap原理与改造,利与弊的取舍

  • 2019 年 11 月 14 日
  • 筆記

内存不足,一直都是一件令人头疼的事情,在有限的资源下,时间与空间的取舍是我们平时开发中思考最多的问题。无论是操作数据库还是Redis缓存,都没有直接使用内存缓存速度快,尤其是对大批量数据的处理。为能把内存使用率降低,并且提升业务的处理速度,我想了很多办法,但效果都不显著,如果数据量再增加,估计也只能加机器。今天分享下,我为适应业务需求对BIMAP算法的改造思路。

BITMAP实现原理

一个byte类型有8bit,可以存储8位数,如0,1,2,3,4,5,6,7;byte的0位为1,则说明0这个数存在这个byte中,byte的1位为1,则说明1这个数存在这个byte中。这与数组的原理是一样的,只是把元素当成数组的下标,然后用0|1表示这个元素是否存在,0为false,1为true。

可是一个byte只能存在0~7的元素,那8以上的数怎么存储。这就是需要一个byte数组,byte[0]存储0~7,byte[1]存储8~15,以此类推。

比如存储25这个数,先拿25右移3位(实际是除以8,但使用移位运算速度更快)拿到一个下标,这个下标就表示这个数存在哪个区间(byte数组的下标),再用这个数与8(一个byte=8bit)取模,可以拿到这个元素用byte的哪一个bit来存储。

举例:还是以25这个数字为例,将25写入BitMap中。

假设byte数组的大小为10(不够扩容,跟List一样道理)。

计算出25在byte数组的下标:

int index = 25>>3 

等价于 25/8=3(数组下标从0开始,刚好不用减1);

计算出bit在byte中的下标:

int bitIndex = 25%8 = 1(byte元素下标也是从0开始);

所以得出

 int e = byteArray[index];   e |= 0x01<<bitIndex;   byteArray[index]=e;

那么怎么判断一个元素是否存在呢?

byte dest = (0x01<<bitIndex);  return e & dest == dest;

移除元素25:

int e = byteArray[index];  e ^= 0x01<<bitIndex;  byteArray[index]=e;

注意:可根据需求实现是否线程安全的。

BITMAP存在的缺点

看起来BITMAP确实很优秀,存储1~80000的数字,如果以int数组来存储,需要80000*4(int=4byte)=160000byte的内存空间,而使用BitMap只需要80000/4/8=2500byte的内存空间,减少了64倍的内存空间。而且,它天然的拥有Set集合的去重功能,比如一个数组存在10个1,最终只会存储一个1。

然而,这些优点都只是理论上的,最优的情况下。因为存储的是连续的数字,如果数字不是连续的呢,比如我就存储两个元素,一个是1,一个是10000,那它依然占据了1000个元素的存储空间,是不是利变成了弊。再假如,我存储的数字是10000~40000,那是不是10000之前的空间就是多余的,浪费掉了?

所以BitMap的使用尤其要根据业务场景选择,如果要存储的整数数据没有连续性,或者元素个数非常少的情况的,就不要考虑这种方案。可以用在存储数据库记录的自增唯一标识,这种都会是连续性的,而且也会比较集中,不会出现1~100就到3000~4000的这种间隔很大的情况。

而第二个缺点,假设存储数字是从40000开始的这种情况,可以将存储的元素减去起始值再存储。但除非是排序好的数字,否则你还得先提前知道你要存储的这些数据中的最小值。怎么解决,假设需要存储的数据是1000~4000,那起始值就设置为1000,比如存储元素1003,先再1003-1000得到3,再存储3。判断一个元素是否存在或者移除一个元素也是先将目标元素减去起始值再判断。

改造后的BITMAP

接下来介绍,我为适应业务而改造的BITMAP。就是为了解决无法预估最小值的情况。

先说下原理。改造后的BitMap可以在new的时候指定一个中间值,如果不指定,那就在写第一个元素的时候,将第一个元素设置为中间值。中间值与起始值的目的等同。为的是实现以中心分散的集中存储方式,更高效的节省BitMap的内存使用。借用负无穷到0,0到正无穷的思路。定义两个byte数组(不是一个了),小于中间值的存在一个数组,大于等于中间值的存储在另一个数组。

这就是改造后的BitMap。在存储元素的时候,实际是先使用中间值减去元素值得出目标值,再存储这个目标值。看个例子,比如中间值设置为100,现在要存储的元素是134,存储过程如下。

计算出134在byte数组的下标:

int index = (100[中间值] - 134)>> 3 

等价于 -34/8=4(数组下标从0开始,刚好不用减1);

说明:index为负数=>存储在第一个byte数组;

index为正数=>存在在第二个byte数组;

计算出bit在byte中的下标:

int bitIndex = |-34|%8 = 2(byte元素下标也是从0开始);

所以得出

int e = leftByteArray[index];  e |= 0x01<<bitIndex;  leftByteArray[index]=e;

献上完整的工具类代码

/**   * @author wujiuye   * @version 1.0 on 2019/9/25 {描述:}   */  public class IntegerBitMap implements BitMap<Integer> {        private int mid;      private byte[] left;      private byte[] right;      private int leftSize = 8;      private int rightSzie = 8;      private int multiplier = 2;      private volatile int realSize;        public IntegerBitMap() {        }        public IntegerBitMap(int mid) {          this.mid = mid;          left = new byte[leftSize];          right = new byte[rightSzie];      }        public IntegerBitMap(int mid, int multiplier) {          this(mid);          this.multiplier = multiplier;      }        /**       * 推荐使用这种方式,使用首个插入的元素作为中间值,这样分布更均匀       *       * @param obj       */      private synchronized void midByFist(Integer obj) {          if (realSize == 0) {              if (mid == 0) {                  this.mid = obj;              }          }      }        /**       * 数组下标,正数是left的下标,负数则是right的下标       *       * @param obj       * @return       */      private int index(Integer obj) {          if (obj == null) {              throw new NullPointerException("obj is null");          }          int intObj = mid - obj;          return intObj >> 3;      }        private synchronized void ensureSize(int index) {          int size = Math.abs(index);          int newSize;          if (index >= 0 && size >= leftSize) {              newSize = leftSize * multiplier > size ? leftSize * multiplier : (size + 1);              byte[] newLeft = new byte[newSize];              System.arraycopy(left, 0, newLeft, 0, Math.min(leftSize, newLeft.length));              left = newLeft;              leftSize = newLeft.length;          }          if (index < 0 && size >= rightSzie) {              newSize = rightSzie * multiplier > size ? rightSzie * multiplier : (size + 1);              byte[] newRight = new byte[newSize];              System.arraycopy(right, 0, newRight, 0, Math.min(rightSzie, newRight.length));              right = newRight;              rightSzie = newRight.length;          }      }        @Override      public synchronized boolean set(Integer obj) {          midByFist(obj);          int byteIndex = index(obj);          ensureSize(byteIndex);          byte map = byteIndex >= 0 ? left[byteIndex] : right[-byteIndex];          int inByteIndex = Math.abs(mid - obj) % 8;          // 0x0000 0000          byte target = (byte) (0x01 << inByteIndex);          boolean isExist = (map & target) == target;          map |= target;          if (byteIndex >= 0) {              left[byteIndex] = map;          } else {              right[-byteIndex] = map;          }          if (!isExist) {              realSize++;          }          return isExist;      }        /**       * 判断元素是否存在(时间复杂度比HashMap更优)       *       * @param obj       * @return       */      @Override      public synchronized boolean contains(Integer obj) {          midByFist(obj);          int byteIndex = index(obj);          ensureSize(byteIndex);          byte map = byteIndex >= 0 ? left[byteIndex] : right[-byteIndex];          int inByteIndex = Math.abs(mid - obj) % 8;          // 0x0000 0000          byte target = (byte) (0x01 << inByteIndex);          return (map & target) == target;      }        @Override      public synchronized boolean remove(Integer obj) {          midByFist(obj);          int byteIndex = index(obj);          ensureSize(byteIndex);          byte map = byteIndex >= 0 ? left[byteIndex] : right[-byteIndex];          int inByteIndex = Math.abs(mid - obj) % 8;          // 0x0000 0000          byte target = (byte) (0x01 << inByteIndex);          boolean isExist = (map & target) == target;          if (isExist) {              map ^= target;              if (byteIndex >= 0) {                  left[byteIndex] = map;              } else {                  right[-byteIndex] = map;              }              realSize--;          }          return isExist;      }        @Override      public synchronized void reset() {          leftSize = 8;          rightSzie = 8;          left = new byte[leftSize];          right = new byte[rightSzie];          mid = 0;          realSize = 0;      }        /**       * 将BitMap转为数组       *       * @return       */      @Override      public Integer[] toArray() {          Integer[] array = new Integer[realSize];          int index = 0;          index = writeToArray(right, array, index);          writeToArray(left, array, index);          return array;      }        private int writeToArray(byte[] source, Integer[] array, int posIndex) {          for (int i = 0; i < source.length; i++) {              for (int j = 0; j < 8; j++) {                  byte r = source[i];                  byte target = (byte) (0x01 << j);                  if ((r & target) == target) {                      array[posIndex++] = (i << 3) + j;                  }              }          }          return posIndex;      }