A look at Flink's MemCheckpointStreamFactory
Published: 2019-06-18


This article takes a look at Flink's MemCheckpointStreamFactory.

CheckpointStreamFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStreamFactory.java

```java
/**
 * A factory for checkpoint output streams, which are used to persist data for checkpoints.
 *
 * <p>Stream factories can be created from the {@link CheckpointStorage} through
 * {@link CheckpointStorage#resolveCheckpointStorageLocation(long, CheckpointStorageLocationReference)}.
 */
public interface CheckpointStreamFactory {

    CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException;

    abstract class CheckpointStateOutputStream extends FSDataOutputStream {

        @Nullable
        public abstract StreamStateHandle closeAndGetHandle() throws IOException;

        @Override
        public abstract void close() throws IOException;
    }
}
```

  • CheckpointStreamFactory is the factory for checkpoint output streams, the streams used to persist checkpoint data. It defines the createCheckpointStateOutputStream method, which returns a CheckpointStateOutputStream. CheckpointStateOutputStream extends FSDataOutputStream and declares two abstract methods, closeAndGetHandle and close.
  • CheckpointStreamFactory has two implementations whose names end in "Factory":
    • MemCheckpointStreamFactory, which has two subclasses: NonPersistentMetadataCheckpointStorageLocation and PersistentMetadataCheckpointStorageLocation.
    • FsCheckpointStreamFactory, which has one subclass: FsCheckpointStorageLocation.
  • The CheckpointStorageLocation interface extends CheckpointStreamFactory. It has three implementations: NonPersistentMetadataCheckpointStorageLocation, PersistentMetadataCheckpointStorageLocation and FsCheckpointStorageLocation.
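To make the contract above concrete, here is a minimal JDK-only sketch. The names SimpleStreamFactory, SimpleStateOutputStream, ByteArrayStreamFactory and SnapshotWriter are hypothetical stand-ins, not Flink classes. The "handle" is simplified to a raw byte[].

The sketch shows the calling pattern the two abstract methods suggest:
- Write the data.
- Call closeAndGetHandle to obtain the result, which is null when nothing was written.
- Call close in a finally block, so a failed write still releases the stream.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified stand-ins for CheckpointStreamFactory / CheckpointStateOutputStream.
// They only model the contract; this is not Flink's API.
interface SimpleStreamFactory {
    SimpleStateOutputStream createStateOutputStream() throws IOException;
}

abstract class SimpleStateOutputStream extends OutputStream {
    /** Closes the stream and returns the written bytes as a "handle", or null if nothing was written. */
    abstract byte[] closeAndGetHandle() throws IOException;
}

class ByteArrayStreamFactory implements SimpleStreamFactory {
    @Override
    public SimpleStateOutputStream createStateOutputStream() {
        return new SimpleStateOutputStream() {
            private final ByteArrayOutputStream buf = new ByteArrayOutputStream();
            private boolean closed;

            @Override
            public void write(int b) throws IOException {
                if (closed) {
                    throw new IOException("stream already closed");
                }
                buf.write(b);
            }

            @Override
            byte[] closeAndGetHandle() {
                closed = true;
                return buf.size() == 0 ? null : buf.toByteArray();
            }

            @Override
            public void close() {
                // Idempotent: after closeAndGetHandle this only keeps the stream closed.
                closed = true;
            }
        };
    }
}

class SnapshotWriter {
    /** Caller pattern: write, then closeAndGetHandle; close() in finally covers the failure path. */
    static byte[] snapshot(SimpleStreamFactory factory, String state) throws IOException {
        SimpleStateOutputStream out = factory.createStateOutputStream();
        try {
            out.write(state.getBytes(StandardCharsets.UTF_8));
            return out.closeAndGetHandle();
        } finally {
            out.close();
        }
    }
}

class SnapshotDemo {
    public static void main(String[] args) throws IOException {
        byte[] handle = SnapshotWriter.snapshot(new ByteArrayStreamFactory(), "hello");
        System.out.println(handle.length); // prints 5
    }
}
```

Returning null for an empty stream mirrors the @Nullable annotation on closeAndGetHandle: a caller should be prepared for "no state written" rather than receive an empty handle.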

FSDataOutputStream

flink-core-1.7.0-sources.jar!/org/apache/flink/core/fs/FSDataOutputStream.java

```java
@Public
public abstract class FSDataOutputStream extends OutputStream {

    public abstract long getPos() throws IOException;

    public abstract void flush() throws IOException;

    public abstract void sync() throws IOException;

    public abstract void close() throws IOException;
}
```
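  • FSDataOutputStream extends Java's OutputStream. It adds the abstract methods getPos and sync, and redeclares flush and close (which OutputStream already provides) as abstract.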
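The difference between flush and sync can be illustrated with a small local-file stream. PositionedFileOutputStream is a hypothetical class, not one of Flink's FSDataOutputStream implementations. It counts written bytes to implement getPos, and implements sync with the JDK's FileDescriptor.sync().

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical local-file stream that adds getPos() and sync() on top of java.io.OutputStream,
// mirroring the extra contract FSDataOutputStream declares. Not a Flink class.
class PositionedFileOutputStream extends OutputStream {
    private final FileOutputStream out;
    private long pos; // number of bytes written so far

    PositionedFileOutputStream(File file) throws IOException {
        this.out = new FileOutputStream(file);
    }

    /** Current position, i.e. the offset at which the next byte will be written. */
    public long getPos() {
        return pos;
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        pos++;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
        pos += len;
    }

    /** Pushes buffered data onward; effectively a no-op for an unbuffered FileOutputStream. */
    @Override
    public void flush() throws IOException {
        out.flush();
    }

    /** Stronger than flush(): asks the OS to force the data to the storage device. */
    public void sync() throws IOException {
        out.flush();
        out.getFD().sync();
    }

    @Override
    public void close() throws IOException {
        out.close();
    }
}

class PositionDemo {
    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("pos-demo", ".bin");
        f.deleteOnExit();
        try (PositionedFileOutputStream out = new PositionedFileOutputStream(f)) {
            out.write(new byte[] {1, 2, 3});
            out.sync();
            System.out.println(out.getPos()); // prints 3
        }
    }
}
```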

CheckpointStorageLocation

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorageLocation.java

```java
/**
 * A storage location for one particular checkpoint, offering data persistent, metadata persistence,
 * and lifecycle/cleanup methods.
 *
 * <p>CheckpointStorageLocations are typically created and initialized via
 * {@link CheckpointStorage#initializeLocationForCheckpoint(long)} or
 * {@link CheckpointStorage#initializeLocationForSavepoint(long, String)}.
 */
public interface CheckpointStorageLocation extends CheckpointStreamFactory {

    CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException;

    void disposeOnFailure() throws IOException;

    CheckpointStorageLocationReference getLocationReference();
}
```

  • CheckpointStorageLocation extends the CheckpointStreamFactory interface. It is usually created and initialized by CheckpointStorage, and it provides data persistence, metadata persistence, and lifecycle/cleanup methods:
    • createMetadataOutputStream creates a CheckpointMetadataOutputStream.
    • disposeOnFailure disposes of the checkpoint location when the checkpoint fails.
    • getLocationReference returns a CheckpointStorageLocationReference.
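The lifecycle role of disposeOnFailure can be sketched as follows. The sketch assumes a simplified, hypothetical Location interface that hands out plain OutputStreams instead of CheckpointStateOutputStream and CheckpointMetadataOutputStream. CheckpointRunner and RecordingLocation are illustrative only. Returning false instead of propagating the error is a simplification, not how Flink reports a failed checkpoint.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical, simplified model of the CheckpointStorageLocation contract (not Flink's API).
interface Location {
    OutputStream createStateStream() throws IOException;     // data persistence (the inherited factory role)
    OutputStream createMetadataStream() throws IOException;  // metadata persistence
    void disposeOnFailure() throws IOException;              // cleanup when the checkpoint fails
}

interface StateWriter {
    void write(OutputStream out) throws IOException;
}

class CheckpointRunner {
    /** Writes state, then metadata; if either step fails, the location is cleaned up. */
    static boolean run(Location location, StateWriter state, byte[] metadata) throws IOException {
        try {
            try (OutputStream out = location.createStateStream()) {
                state.write(out);
            }
            try (OutputStream meta = location.createMetadataStream()) {
                meta.write(metadata);
            }
            return true;
        } catch (IOException e) {
            location.disposeOnFailure();
            return false;
        }
    }
}

// In-memory location that records whether it was disposed.
class RecordingLocation implements Location {
    final ByteArrayOutputStream state = new ByteArrayOutputStream();
    final ByteArrayOutputStream metadata = new ByteArrayOutputStream();
    boolean disposed;

    public OutputStream createStateStream() { return state; }
    public OutputStream createMetadataStream() { return metadata; }

    public void disposeOnFailure() {
        disposed = true;
        state.reset();
        metadata.reset();
    }
}

class CheckpointRunnerDemo {
    public static void main(String[] args) throws IOException {
        RecordingLocation loc = new RecordingLocation();
        boolean ok = CheckpointRunner.run(loc, out -> out.write(42), new byte[] {1});
        System.out.println("completed=" + ok + ", disposed=" + loc.disposed); // prints completed=true, disposed=false
    }
}
```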

MemCheckpointStreamFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java

```java
/**
 * {@link CheckpointStreamFactory} that produces streams that write to in-memory byte arrays.
 */
public class MemCheckpointStreamFactory implements CheckpointStreamFactory {

    /** The maximal size that the snapshotted memory state may have */
    private final int maxStateSize;

    /**
     * Creates a new in-memory stream factory that accepts states whose serialized forms are
     * up to the given number of bytes.
     *
     * @param maxStateSize The maximal size of the serialized state
     */
    public MemCheckpointStreamFactory(int maxStateSize) {
        this.maxStateSize = maxStateSize;
    }

    @Override
    public CheckpointStateOutputStream createCheckpointStateOutputStream(
            CheckpointedStateScope scope) throws IOException
    {
        return new MemoryCheckpointOutputStream(maxStateSize);
    }

    @Override
    public String toString() {
        return "In-Memory Stream Factory";
    }

    static void checkSize(int size, int maxSize) throws IOException {
        if (size > maxSize) {
            throw new IOException(
                    "Size of the state is larger than the maximum permitted memory-backed state. Size="
                            + size + " , maxSize=" + maxSize
                            + " . Consider using a different state backend, like the File System State backend.");
        }
    }

    /**
     * A {@code CheckpointStateOutputStream} that writes into a byte array.
     */
    public static class MemoryCheckpointOutputStream extends CheckpointStateOutputStream {

        private final ByteArrayOutputStreamWithPos os = new ByteArrayOutputStreamWithPos();

        private final int maxSize;

        private AtomicBoolean closed;

        boolean isEmpty = true;

        public MemoryCheckpointOutputStream(int maxSize) {
            this.maxSize = maxSize;
            this.closed = new AtomicBoolean(false);
        }

        @Override
        public void write(int b) throws IOException {
            os.write(b);
            isEmpty = false;
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            os.write(b, off, len);
            isEmpty = false;
        }

        @Override
        public void flush() throws IOException {
            os.flush();
        }

        @Override
        public void sync() throws IOException { }

        // --------------------------------------------------------------------

        @Override
        public void close() {
            if (closed.compareAndSet(false, true)) {
                closeInternal();
            }
        }

        @Nullable
        @Override
        public StreamStateHandle closeAndGetHandle() throws IOException {
            if (isEmpty) {
                return null;
            }
            return new ByteStreamStateHandle(String.valueOf(UUID.randomUUID()), closeAndGetBytes());
        }

        @Override
        public long getPos() throws IOException {
            return os.getPosition();
        }

        public boolean isClosed() {
            return closed.get();
        }

        /**
         * Closes the stream and returns the byte array containing the stream's data.
         * @return The byte array containing the stream's data.
         * @throws IOException Thrown if the size of the data exceeds the maximal
         */
        public byte[] closeAndGetBytes() throws IOException {
            if (closed.compareAndSet(false, true)) {
                checkSize(os.size(), maxSize);
                byte[] bytes = os.toByteArray();
                closeInternal();
                return bytes;
            } else {
                throw new IOException("stream has already been closed");
            }
        }

        private void closeInternal() {
            os.reset();
        }
    }
}
```
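  • MemCheckpointStreamFactory implements the CheckpointStreamFactory interface. Its createCheckpointStateOutputStream method returns a MemoryCheckpointOutputStream.
  • MemoryCheckpointOutputStream extends CheckpointStateOutputStream and buffers data in a ByteArrayOutputStreamWithPos.
    • closeAndGetHandle returns null if nothing was written.
    • Otherwise it calls closeAndGetBytes, which checks whether the buffered size exceeds maxSize and throws an IOException if it does.
  • MemCheckpointStreamFactory has two subclasses, NonPersistentMetadataCheckpointStorageLocation and PersistentMetadataCheckpointStorageLocation. Both also implement the CheckpointStorageLocation interface.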
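MemoryCheckpointOutputStream only depends on a byte buffer, so its behavior can be reproduced with the JDK alone. The sketch below uses a hypothetical class, BoundedMemoryStream, with ByteArrayOutputStream in place of Flink's ByteArrayOutputStreamWithPos.

The sketch preserves one consequence of the code above: the size limit is checked only in closeAndGetBytes. Oversized state is therefore fully buffered in memory before it is rejected.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified, JDK-only model of MemoryCheckpointOutputStream (hypothetical; not the Flink class).
class BoundedMemoryStream extends OutputStream {
    private final ByteArrayOutputStream os = new ByteArrayOutputStream();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final int maxSize;
    private boolean isEmpty = true;

    BoundedMemoryStream(int maxSize) {
        this.maxSize = maxSize;
    }

    @Override
    public void write(int b) {
        os.write(b); // no size check here: writes are only buffered
        isEmpty = false;
    }

    @Override
    public void write(byte[] b, int off, int len) {
        os.write(b, off, len);
        isEmpty = false;
    }

    /** Like closeAndGetHandle: null for an empty stream, otherwise the size-checked bytes. */
    byte[] closeAndGetHandle() throws IOException {
        if (isEmpty) {
            return null;
        }
        return closeAndGetBytes();
    }

    byte[] closeAndGetBytes() throws IOException {
        if (!closed.compareAndSet(false, true)) {
            throw new IOException("stream has already been closed");
        }
        int size = os.size();
        if (size > maxSize) { // the limit is enforced only at this point
            throw new IOException("Size of the state is larger than the maximum permitted "
                    + "memory-backed state. Size=" + size + " , maxSize=" + maxSize);
        }
        byte[] bytes = os.toByteArray();
        os.reset();
        return bytes;
    }

    @Override
    public void close() {
        if (closed.compareAndSet(false, true)) {
            os.reset(); // drop the buffered data
        }
    }
}

class BoundedDemo {
    public static void main(String[] args) throws IOException {
        BoundedMemoryStream small = new BoundedMemoryStream(1024);
        small.write(new byte[] {1, 2, 3});
        System.out.println(small.closeAndGetHandle().length); // prints 3

        BoundedMemoryStream big = new BoundedMemoryStream(2);
        big.write(new byte[] {1, 2, 3});
        try {
            big.closeAndGetHandle();
        } catch (IOException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```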

NonPersistentMetadataCheckpointStorageLocation

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java

```java
/**
 * A checkpoint storage location for the {@link MemoryStateBackend} in case no durable persistence
 * for metadata has been configured.
 */
public class NonPersistentMetadataCheckpointStorageLocation
        extends MemCheckpointStreamFactory
        implements CheckpointStorageLocation {

    /** The external pointer returned for checkpoints that are not externally addressable. */
    public static final String EXTERNAL_POINTER = "<checkpoint-not-externally-addressable>";

    public NonPersistentMetadataCheckpointStorageLocation(int maxStateSize) {
        super(maxStateSize);
    }

    @Override
    public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException {
        return new MetadataOutputStream();
    }

    @Override
    public void disposeOnFailure() {}

    @Override
    public CheckpointStorageLocationReference getLocationReference() {
        return CheckpointStorageLocationReference.getDefault();
    }

    // ------------------------------------------------------------------------
    //  CompletedCheckpointStorageLocation
    // ------------------------------------------------------------------------

    /**
     * A {@link CompletedCheckpointStorageLocation} that is not persistent and only holds the
     * metadata in an internal byte array.
     */
    private static class NonPersistentCompletedCheckpointStorageLocation implements CompletedCheckpointStorageLocation {

        private static final long serialVersionUID = 1L;

        private final ByteStreamStateHandle metaDataHandle;

        NonPersistentCompletedCheckpointStorageLocation(ByteStreamStateHandle metaDataHandle) {
            this.metaDataHandle = metaDataHandle;
        }

        @Override
        public String getExternalPointer() {
            return EXTERNAL_POINTER;
        }

        @Override
        public StreamStateHandle getMetadataHandle() {
            return metaDataHandle;
        }

        @Override
        public void disposeStorageLocation() {}
    }

    // ------------------------------------------------------------------------
    //  CheckpointMetadataOutputStream
    // ------------------------------------------------------------------------

    private static class MetadataOutputStream extends CheckpointMetadataOutputStream {

        private final ByteArrayOutputStreamWithPos os = new ByteArrayOutputStreamWithPos();

        private boolean closed;

        @Override
        public void write(int b) throws IOException {
            os.write(b);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            os.write(b, off, len);
        }

        @Override
        public void flush() throws IOException {
            os.flush();
        }

        @Override
        public long getPos() throws IOException {
            return os.getPosition();
        }

        @Override
        public void sync() throws IOException { }

        @Override
        public CompletedCheckpointStorageLocation closeAndFinalizeCheckpoint() throws IOException {
            synchronized (this) {
                if (!closed) {
                    closed = true;

                    byte[] bytes = os.toByteArray();
                    ByteStreamStateHandle handle = new ByteStreamStateHandle(UUID.randomUUID().toString(), bytes);
                    return new NonPersistentCompletedCheckpointStorageLocation(handle);
                } else {
                    throw new IOException("Already closed");
                }
            }
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                os.reset();
            }
        }
    }
}
```
  • When no checkpointsDirectory is configured, MemoryBackendCheckpointStorage creates a NonPersistentMetadataCheckpointStorageLocation. Its createMetadataOutputStream method creates a MetadataOutputStream.
  • MetadataOutputStream extends CheckpointMetadataOutputStream and also buffers data in a ByteArrayOutputStreamWithPos. Its closeAndFinalizeCheckpoint method returns a NonPersistentCompletedCheckpointStorageLocation.
  • NonPersistentCompletedCheckpointStorageLocation implements the CompletedCheckpointStorageLocation interface. Its getMetadataHandle method returns a ByteStreamStateHandle, so the metadata is held only in memory.
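The essential property of the non-persistent path is that finalizing the metadata just wraps the buffered bytes in an object on the heap. Below is a minimal model using two hypothetical names, InMemoryMetadataStream and InMemoryCompletedLocation:
- closeAndFinalize may succeed only once.
- close discards the buffer if the checkpoint was never finalized.

One deliberate difference from the Flink code: this sketch also synchronizes close().

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.UUID;

// Hypothetical model of the completed, non-persistent location: the metadata exists only here.
final class InMemoryCompletedLocation {
    final String handleName; // random name, like the UUID-named ByteStreamStateHandle
    final byte[] metadata;   // the only copy of the metadata, living on the heap

    InMemoryCompletedLocation(String handleName, byte[] metadata) {
        this.handleName = handleName;
        this.metadata = metadata;
    }
}

// Hypothetical model of MetadataOutputStream (not the Flink class).
class InMemoryMetadataStream extends OutputStream {
    private final ByteArrayOutputStream os = new ByteArrayOutputStream();
    private boolean closed;

    @Override
    public void write(int b) {
        os.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) {
        os.write(b, off, len);
    }

    /** Freezes the buffered metadata into a completed location; may be called only once. */
    synchronized InMemoryCompletedLocation closeAndFinalize() throws IOException {
        if (closed) {
            throw new IOException("Already closed");
        }
        closed = true;
        return new InMemoryCompletedLocation(UUID.randomUUID().toString(), os.toByteArray());
    }

    /** Discards the metadata if the checkpoint was never finalized. */
    @Override
    public synchronized void close() {
        if (!closed) {
            closed = true;
            os.reset();
        }
    }
}

class InMemoryMetadataDemo {
    public static void main(String[] args) throws IOException {
        InMemoryMetadataStream out = new InMemoryMetadataStream();
        out.write(new byte[] {7, 8, 9});
        InMemoryCompletedLocation done = out.closeAndFinalize();
        System.out.println(done.metadata.length); // prints 3
    }
}
```

Nothing in this model touches a file system. That matches the point above: if the process holding the object goes away, so does the checkpoint metadata.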

PersistentMetadataCheckpointStorageLocation

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java

```java
/**
 * A checkpoint storage location for the {@link MemoryStateBackend} when it durably
 * persists the metadata in a file system.
 */
public class PersistentMetadataCheckpointStorageLocation
        extends MemCheckpointStreamFactory
        implements CheckpointStorageLocation {

    private final FileSystem fileSystem;

    private final Path checkpointDirectory;

    private final Path metadataFilePath;

    /**
     * Creates a checkpoint storage persists metadata to a file system and stores state
     * in line in state handles with the metadata.
     *
     * @param fileSystem The file system to which the metadata will be written.
     * @param checkpointDir The directory where the checkpoint metadata will be written.
     */
    public PersistentMetadataCheckpointStorageLocation(
            FileSystem fileSystem,
            Path checkpointDir,
            int maxStateSize) {

        super(maxStateSize);

        this.fileSystem = checkNotNull(fileSystem);
        this.checkpointDirectory = checkNotNull(checkpointDir);
        this.metadataFilePath = new Path(checkpointDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME);
    }

    // ------------------------------------------------------------------------

    @Override
    public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException {
        return new FsCheckpointMetadataOutputStream(fileSystem, metadataFilePath, checkpointDirectory);
    }

    @Override
    public void disposeOnFailure() throws IOException {
        // on a failure, no chunk in the checkpoint directory needs to be saved, so
        // we can drop it as a whole
        fileSystem.delete(checkpointDirectory, true);
    }

    @Override
    public CheckpointStorageLocationReference getLocationReference() {
        return CheckpointStorageLocationReference.getDefault();
    }
}
```
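  • When a checkpointsDirectory is configured, MemoryBackendCheckpointStorage creates a PersistentMetadataCheckpointStorageLocation.
    • Its createMetadataOutputStream method creates an FsCheckpointMetadataOutputStream.
    • Its disposeOnFailure method recursively deletes the checkpoint directory.
  • The FsCheckpointMetadataOutputStream constructor takes three parameters: fileSystem, metadataFilePath and exclusiveCheckpointDir.
    • fileSystem is used to create an FSDataOutputStream at metadataFilePath.
    • exclusiveCheckpointDir is used when an FsCompletedCheckpointStorageLocation is returned.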
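PersistentMetadataCheckpointStorageLocation has two file-system duties: writing the metadata file, and deleting the whole checkpoint directory on failure. Both can be sketched with java.nio on the local file system.

Assumptions in this sketch:
- LocalMetadataLocation is a hypothetical class.
- The file name "_metadata" stands in for AbstractFsCheckpointStorage.METADATA_FILE_NAME.
- Creating the directory inside createMetadataOutputStream is a simplification made here.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical local-filesystem sketch (java.nio instead of Flink's FileSystem abstraction).
class LocalMetadataLocation {
    // Assumed metadata file name, for illustration only.
    static final String METADATA_FILE_NAME = "_metadata";

    private final Path checkpointDirectory;
    private final Path metadataFilePath;

    LocalMetadataLocation(Path checkpointDir) {
        this.checkpointDirectory = checkpointDir;
        this.metadataFilePath = checkpointDir.resolve(METADATA_FILE_NAME);
    }

    Path metadataFile() {
        return metadataFilePath;
    }

    /** Opens a stream to the metadata file, creating the checkpoint directory if needed. */
    OutputStream createMetadataOutputStream() throws IOException {
        Files.createDirectories(checkpointDirectory);
        return Files.newOutputStream(metadataFilePath);
    }

    /** Recursive delete, playing the role of fileSystem.delete(checkpointDirectory, true). */
    void disposeOnFailure() throws IOException {
        if (!Files.exists(checkpointDirectory)) {
            return;
        }
        List<Path> all;
        try (Stream<Path> paths = Files.walk(checkpointDirectory)) {
            // Reverse order puts children before their parent directories.
            all = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : all) {
            Files.delete(p);
        }
    }
}

class LocalMetadataDemo {
    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("ckpt").resolve("chk-1");
        LocalMetadataLocation loc = new LocalMetadataLocation(dir);
        try (OutputStream out = loc.createMetadataOutputStream()) {
            out.write(new byte[] {1, 2, 3});
        }
        System.out.println(Files.size(loc.metadataFile())); // prints 3
        loc.disposeOnFailure();
        System.out.println(Files.exists(dir)); // prints false
    }
}
```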

Summary

  • MemoryBackendCheckpointStorage creates a NonPersistentMetadataCheckpointStorageLocation when no checkpointsDirectory is configured, and a PersistentMetadataCheckpointStorageLocation when one is.
  • NonPersistentMetadataCheckpointStorageLocation and PersistentMetadataCheckpointStorageLocation both extend the MemCheckpointStreamFactory class and implement the CheckpointStorageLocation interface.
    • Their createMetadataOutputStream methods return a MetadataOutputStream and an FsCheckpointMetadataOutputStream respectively.
    • In both cases the state itself goes through the in-memory streams; only the handling of the metadata differs.
  • MemCheckpointStreamFactory implements the CheckpointStreamFactory interface, and its createCheckpointStateOutputStream method returns a MemoryCheckpointOutputStream.
  • CheckpointStorageLocation extends CheckpointStreamFactory. It is usually created and initialized by CheckpointStorage, and it provides data persistence, metadata persistence, and lifecycle/cleanup methods.
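The selection rule in the first summary bullet can be sketched as below. All class names are hypothetical, and the "chk-<id>" directory naming is an assumption for illustration.

The point of the sketch is that both location types share the same in-memory state factory (modeled by a common superclass holding maxStateSize). They differ only in where the metadata goes.

```java
import java.nio.file.Path;

// Hypothetical stand-in for MemCheckpointStreamFactory: both locations inherit the state limit.
abstract class MemStreamFactorySketch {
    final int maxStateSize;

    MemStreamFactorySketch(int maxStateSize) {
        this.maxStateSize = maxStateSize;
    }
}

// Metadata kept in memory only.
final class NonPersistentLocationSketch extends MemStreamFactorySketch {
    NonPersistentLocationSketch(int maxStateSize) {
        super(maxStateSize);
    }
}

// Metadata written under a checkpoint directory.
final class PersistentLocationSketch extends MemStreamFactorySketch {
    final Path checkpointDir;

    PersistentLocationSketch(Path checkpointDir, int maxStateSize) {
        super(maxStateSize);
        this.checkpointDir = checkpointDir;
    }
}

class MemoryStorageSketch {
    private final Path checkpointsDirectory; // null when no checkpoint directory is configured
    private final int maxStateSize;

    MemoryStorageSketch(Path checkpointsDirectory, int maxStateSize) {
        this.checkpointsDirectory = checkpointsDirectory;
        this.maxStateSize = maxStateSize;
    }

    MemStreamFactorySketch initializeLocation(long checkpointId) {
        if (checkpointsDirectory == null) {
            return new NonPersistentLocationSketch(maxStateSize);
        }
        // "chk-<id>" naming is an assumption made for this sketch.
        return new PersistentLocationSketch(checkpointsDirectory.resolve("chk-" + checkpointId), maxStateSize);
    }
}

class MemoryStorageDemo {
    public static void main(String[] args) {
        MemStreamFactorySketch loc = new MemoryStorageSketch(null, 5 * 1024 * 1024).initializeLocation(1);
        System.out.println(loc.getClass().getSimpleName()); // prints NonPersistentLocationSketch
    }
}
```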


Reposted from: http://xasdx.baihongyu.com/
