一文带你彻底了解大数据处理引擎Flink内存管理

摘要: Flink是jvm之上的大数据处理引擎。

Flink是jvm之上的大数据处理引擎,jvm存在java对象存储密度低、full gc时消耗性能,gc存在stw的问题,同时omm时会影响稳定性。同时针对频繁序列化和反序列化问题flink使用堆内堆外内存可以直接在一些场景下操作二进制数据,减少序列化反序列化的消耗。同时基于大数据流式处理的特点,flink定制了自己的一套序列化框架。flink也会基于cpu L1 L2 L3高速缓存的机制以及局部性原理,设计使用缓存友好的数据结构。flink内存管理和spark的tungsten的内存管理的出发点很相似。

内存模型

Flink可以使用堆内和堆外内存,内存模型如图所示:

flink使用内存划分为堆内内存和堆外内存。按照用途可以划分为task所用内存,network memory、managed memory、以及framework所用内存,其中task network managed所用内存计入slot内存。framework为taskmanager公用。

堆内内存包含用户代码所用内存、heapstatebackend、框架执行所用内存。

堆外内存是未经jvm虚拟化的内存,直接映射到操作系统的内存地址,堆外内存包含框架执行所用内存,jvm堆外内存、Direct、native等。

Direct memory内存可用于网络传输缓冲。network memory属于direct memory的范畴,flink可以借助于此进行zero copy,从而减少内核态到用户态copy次数,从而进行更高效的io操作。

jvm metaspace存放jvm加载的类的元数据,加载的类越多,需要的空间越大,overhead用于jvm的其他开销,如native memory、code cache、thread stack等。

Managed Memory主要用于RocksDBStateBackend和批处理算子,也属于native memory的范畴,其中rocksdbstatebackend对应rocksdb,rocksdb基于lsm数据结构实现,每个state对应一个列族,占有独立的writebuffer,rocksdb占用native内存大小为 blockCahe + writebufferNum * writeBuffer + index ,同时堆外内存是进程之间共享的,jvm虚拟化大量heap内存耗时较久,使用堆外内存的话可以有效的避免该环节。但堆外内存也有一定的弊端,即监控调试使用相对复杂,对于生命周期较短的segment使用堆内内存开销更低,flink在一些情况下,直接操作二进制数据,避免一些反序列化带来的开销。如果需要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。

内存管理

类似于OS中的page机制,flink模拟了操作系统的机制,通过page来管理内存,flink对应page的数据结构为dataview和MemorySegment,memorysegment是flink内存分配的最小单位,默认32kb,其可以在堆上也可以在堆外,flink通过MemorySegment的数据结构来访问堆内堆外内存,借助于flink序列化机制(序列化机制会在下一小节讲解),memorysegment提供了对二进制数据的读取和写入的方法,flink使用datainputview和dataoutputview进行memorysegment的二进制的读取和写入,flink可以通过HeapMemorySegment 管理堆内内存,通过HybridMemorySegment来管理堆内和堆外内存,MemorySegment管理jvm堆内存时,其定义一个字节数组的引用指向内存端,基于该内部字节数组的引用进行操作的HeapMemorySegment。

public abstract class MemorySegment {
    /**
     * The heap byte array object relative to which we access the memory.
     *  如果为堆内存,则指向访问的内存的引用,否则若内存为非堆内存,则为null
     * <p>Is non-<tt>null</tt> if the memory is on the heap, and is <tt>null</tt>, if the memory is
     * off the heap. If we have this buffer, we must never void this reference, or the memory
     * segment will point to undefined addresses outside the heap and may in out-of-order execution
     * cases cause segmentation faults.
     */
    protected final byte[] heapMemory;
    /**
     * The address to the data, relative to the heap memory byte array. If the heap memory byte
     * array is <tt>null</tt>, this becomes an absolute memory address outside the heap.
     * 字节数组对应的相对地址
     */
    protected long address;  
}

HeapMemorySegment用来分配堆上内存。

public final class HeapMemorySegment extends MemorySegment {
    /**
     * An extra reference to the heap memory, so we can let byte array checks fail by the built-in
     * checks automatically without extra checks.
     * 字节数组的引用指向该内存段
     */
    private byte[] memory;
    public void free() {
        super.free();
        this.memory = null;
    }
 
    public final void get(DataOutput out, int offset, int length) throws IOException {
        out.write(this.memory, offset, length);
    }
}

HybridMemorySegment即支持onheap和offheap内存,flink通过jvm的unsafe操作,如果对象o不为null,为onheap的场景,并且后面的地址或者位置是相对位置,那么会直接对当前对象(比如数组)的相对位置进行操作。如果对象o为null,操作的内存块不是JVM堆内存,为off-heap的场景,并且后面的地址是某个内存块的绝对地址,那么这些方法的调用也相当于对该内存块进行操作。

public final class HybridMemorySegment extends MemorySegment {
  @Override
    public ByteBuffer wrap(int offset, int length) {
        if (address <= addressLimit) {
            if (heapMemory != null) {
                return ByteBuffer.wrap(heapMemory, offset, length);
            }
            else {
                try {
                    ByteBuffer wrapper = offHeapBuffer.duplicate();
                    wrapper.limit(offset + length);
                    wrapper.position(offset);
                    return wrapper;
                }
                catch (IllegalArgumentException e) {
                    throw new IndexOutOfBoundsException();
                }
            }
        }
        else {
            throw new IllegalStateException("segment has been freed");
        }
    }
}

flink通过MemorySegmentFactory来创建memorySegment,memorySegment是flink内存分配的最小单位。对于跨memorysegment的数据方位,flink抽象出一个访问视图,数据读取datainputView,数据写入dataoutputview。

/**
 * This interface defines a view over some memory that can be used to sequentially read the contents of the memory.
 * The view is typically backed by one or more {@link org.apache.flink.core.memory.MemorySegment}.
 */
@Public
public interface DataInputView extends DataInput {
private MemorySegment[] memorySegments; // view持有的MemorySegment的引用, 该组memorysegment可以视为一个内存页,
flink可以顺序读取memorysegmet中的数据
/**
     * Reads up to {@code len} bytes of memory and stores it into {@code b} starting at offset {@code off}.
     * It returns the number of read bytes or -1 if there is no more data left.
     * @param b byte array to store the data to
     * @param off offset into byte array
     * @param len byte length to read
     * @return the number of actually read bytes of -1 if there is no more data left
     */
    int read(byte[] b, int off, int len) throws IOException;
}

dataoutputview是数据写入的视图,outputview持有多个memorysegment的引用,flink可以顺序的写入segment。

/**
 * This interface defines a view over some memory that can be used to sequentially write contents to the memory.
 * The view is typically backed by one or more {@link org.apache.flink.core.memory.MemorySegment}.
 */
@Public
public interface DataOutputView extends DataOutput {
private final List<MemorySegment> memory; // memorysegment的引用
/**
     * Copies {@code numBytes} bytes from the source to this view.
     * @param source The source to copy the bytes from.
     * @param numBytes The number of bytes to copy.
    void write(DataInputView source, int numBytes) throws IOException;
}

上一小节中讲到的managedmemory内存部分,flink使用memorymanager来管理该内存,managedmemory只使用堆外内存,主要用于批处理中的sorting、hashing、以及caching(社区消息,未来流处理也会使用到该部分),在流计算中作为rocksdbstatebackend的部分内存。memeorymanager通过memorypool来管理memorysegment。

/**
 * The memory manager governs the memory that Flink uses for sorting, hashing, caching or off-heap state backends
 * (e.g. RocksDB). Memory is represented either in {@link MemorySegment}s of equal size or in reserved chunks of certain
 * size. Operators allocate the memory either by requesting a number of memory segments or by reserving chunks.
 * Any allocated memory has to be released to be reused later.
 * <p>The memory segments are represented as off-heap unsafe memory regions (both via {@link HybridMemorySegment}).
 * Releasing a memory segment will make it re-claimable by the garbage collector, but does not necessarily immediately
 * releases the underlying memory.
 */
public class MemoryManager {
 /**
     * Allocates a set of memory segments from this memory manager.
     * <p>The total allocated memory will not exceed its size limit, announced in the constructor.
     * @param owner The owner to associate with the memory segment, for the fallback release.
     * @param target The list into which to put the allocated memory pages.
     * @param numberOfPages The number of pages to allocate.
     * @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount
     *                                   of memory pages any more.
     */
    public void allocatePages(
            Object owner,
            Collection<MemorySegment> target,
            int numberOfPages) throws MemoryAllocationException {
}

private static void freeSegment(MemorySegment segment, @Nullable Collection<MemorySegment> segments) {
        segment.free();
        if (segments != null) {
            segments.remove(segment);
        }
    }
/**
     * Frees this memory segment.
     * <p>After this operation has been called, no further operations are possible on the memory
     * segment and will fail. The actual memory (heap or off-heap) will only be released after this
     * memory segment object has become garbage collected.
     */
    public void free() {
        // this ensures we can place no more data and trigger
        // the checks for the freed segment
        address = addressLimit + 1;
    }
}

对于上一小节中提到的NetWorkMemory的内存,flink使用networkbuffer做了一层buffer封装。buffer的底层也是memorysegment,flink通过bufferpool来管理buffer,每个taskmanager都有一个netwokbufferpool,该tm上的各个task共享该networkbufferpool,同时task对应的localbufferpool所需的内存需要从networkbufferpool申请而来,它们都是flink申请的堆外内存。

上游算子向resultpartition写入数据时,申请buffer资源,使用bufferbuilder将数据写入memorysegment,下游算子从resultsubpartition消费数据时,利用bufferconsumer从memorysegment中读取数据,bufferbuilder与bufferconsumer一一对应。同时这一流程也和flink的反压机制相关。如图

/**
 * A buffer pool used to manage a number of {@link Buffer} instances from the
 * {@link NetworkBufferPool}.
 * <p>Buffer requests are mediated to the network buffer pool to ensure dead-lock
 * free operation of the network stack by limiting the number of buffers per
 * local buffer pool. It also implements the default mechanism for buffer
 * recycling, which ensures that every buffer is ultimately returned to the
 * network buffer pool.
 * <p>The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}. It
 * will then lazily return the required number of buffers to the {@link NetworkBufferPool} to
 * match its new size.
 */
class LocalBufferPool implements BufferPool {
@Nullable
    private MemorySegment requestMemorySegment(int targetChannel) throws IOException {
        MemorySegment segment = null;
        synchronized (availableMemorySegments) {
            returnExcessMemorySegments();

            if (availableMemorySegments.isEmpty()) {
                segment = requestMemorySegmentFromGlobal();
            }
            // segment may have been released by buffer pool owner
            if (segment == null) {
                segment = availableMemorySegments.poll();
            }
            if (segment == null) {
                availabilityHelper.resetUnavailable();
            }
            if (segment != null && targetChannel != UNKNOWN_CHANNEL) {
                if (subpartitionBuffersCount[targetChannel]++ == maxBuffersPerChannel) {
                    unavailableSubpartitionsCount++;
                    availabilityHelper.resetUnavailable();
                }
            }
        }
        return segment;
    }
    }
    /**
 * A result partition for data produced by a single task.
 *
 * <p>This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially,
 * a result partition is a collection of {@link Buffer} instances. The buffers are organized in one
 * or more {@link ResultSubpartition} instances, which further partition the data depending on the
 * number of consuming tasks and the data {@link DistributionPattern}.
 * <p>Tasks, which consume a result partition have to request one of its subpartitions. The request
 * happens either remotely (see {@link RemoteInputChannel}) or locally (see {@link LocalInputChannel})
  The life-cycle of each result partition has three (possibly overlapping) phases:
    Produce  Consume  Release  Buffer management State management
 */
public abstract class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
      @Override
    public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
        checkInProduceState();
        return bufferPool.requestBufferBuilderBlocking(targetChannel);
    }
    }
}

自定义序列化框架

flink对自身支持的基本数据类型,实现了定制的序列化机制,flink数据集对象相对固定,可以只保存一份schema信息,从而节省存储空间,数据序列化就是java对象和二进制数据之间的数据转换,flink使用TypeInformation的createSerializer接口负责创建每种类型的序列化器,进行数据的序列化反序列化,类型信息在构建streamtransformation时通过typeextractor根据方法签名类信息等提取类型信息并存储在streamconfig中。

/**
     * Creates a serializer for the type. The serializer may use the ExecutionConfig
     * for parameterization.
     * 创建出对应类型的序列化器
     * @param config The config used to parameterize the serializer.
     * @return A serializer for this type.
     */
    @PublicEvolving
    public abstract TypeSerializer<T> createSerializer(ExecutionConfig config);
/**
 * A utility for reflection analysis on classes, to determine the return type of implementations of transformation
 * functions.
 */
@Public
public class TypeExtractor {
/**
 * Creates a {@link TypeInformation} from the given parameters.
     * If the given {@code instance} implements {@link ResultTypeQueryable}, its information
     * is used to determine the type information. Otherwise, the type information is derived
     * based on the given class information.
     * @param instance            instance to determine type information for
     * @param baseClass            base class of {@code instance}
     * @param clazz                class of {@code instance}
     * @param returnParamPos    index of the return type in the type arguments of {@code clazz}
     * @param <OUT>                output type
     * @return type information
     */
    @SuppressWarnings("unchecked")
    @PublicEvolving
    public static <OUT> TypeInformation<OUT> createTypeInfo(Object instance, Class<?> baseClass, Class<?> clazz,
 int returnParamPos) {
        if (instance instanceof ResultTypeQueryable) {
            return ((ResultTypeQueryable<OUT>) instance).getProducedType();
        } else {
            return createTypeInfo(baseClass, clazz, returnParamPos, null, null);
        }
    }
}

对于嵌套的数据类型,flink从最内层的字段开始序列化,内层序列化的结果将组成外层序列化结果,反序列时,从内存中顺序读取二进制数据,根据偏移量反序列化为java对象。flink自带序列化机制存储密度很高,序列化对应的类型值即可。

flink中的table模块在memorysegment的基础上使用了BinaryRow的数据结构,可以更好地减少反序列化开销,需要反序列化是可以只序列化相应的字段,而无需序列化整个对象。

同时你也可以注册子类型和自定义序列化器,对于flink无法序列化的类型,会交给kryo进行处理,如果kryo也无法处理,将强制使用avro来序列化,kryo序列化性能相对flink自带序列化机制较低,开发时可以使用env.getConfig().disableGenericTypes()来禁用kryo,尽量使用flink框架自带的序列化器对应的数据类型。

缓存友好的数据结构

cpu中L1、L2、L3的缓存读取速度比从内存中读取数据快很多,高速缓存的访问速度是主存的访问速度的很多倍。另外一个重要的程序特性是局部性原理,程序常常使用它们最近使用的数据和指令,其中两种局部性类型,时间局部性指最近访问的内容很可能短期内被再次访问,空间局部性是指地址相互临近的项目很可能短时间内被再次访问。

结合这两个特性设计缓存友好的数据结构可以有效的提升缓存命中率和本地化特性,该特性主要用于排序操作中,常规情况下一个指针指向一个<key,v>对象,排序时需要根据指针pointer获取到实际数据,然后再进行比较,这个环节涉及到内存的随机访问,缓存本地化会很低,使用序列化的定长key + pointer,这样key就会连续存储到内存中,避免的内存的随机访问,还可以提升cpu缓存命中率。对两条记录进行排序时首先比较key,如果大小不同直接返回结果,只需交换指针即可,不用交换实际数据,如果相同,则比较指针实际指向的数据。

后记

flink社区已走向流批一体的发展,后继将更多的关注与流批一体的引擎实现及结合存储层面的实现。flink服务请使用华为云 EI DLI-FLINK serverless服务。

参考

 [1]: //ci.apache.org/projects/flink/flink-docs-stable/

 [2]: //github.com/apache/flink

 [3]: //ververica.cn/

 

点击关注,第一时间了解华为云新鲜技术~