/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.file.data;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueSerializer;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFilePathFactory;
import org.apache.flink.table.store.file.stats.BinaryTableStats;
import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.writer.BaseFileWriter;
import org.apache.flink.table.store.file.writer.FileWriter;
import org.apache.flink.table.store.file.writer.Metric;
import org.apache.flink.table.store.file.writer.MetricFileWriter;
import org.apache.flink.table.store.file.writer.RollingFileWriter;
import org.apache.flink.table.store.format.FieldStats;
import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.store.format.FileStatsExtractor;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.CloseableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataFileWriter {
    private static final Logger LOG = LoggerFactory.getLogger(DataFileWriter.class);
    private final long schemaId;
    private final RowType keyType;
    private final RowType valueType;
    private final BulkWriter.Factory<RowData> writerFactory;
    private final FileStatsExtractor fileStatsExtractor;
    private final FieldStatsArraySerializer keyStatsConverter;
    private final FieldStatsArraySerializer valueStatsConverter;
    private final DataFilePathFactory pathFactory;
    private final long suggestedFileSize;

    private DataFileWriter(long schemaId, RowType keyType, RowType valueType, BulkWriter.Factory<RowData> writerFactory, @Nullable FileStatsExtractor fileStatsExtractor, DataFilePathFactory pathFactory, long suggestedFileSize) {
        this.schemaId = schemaId;
        this.keyType = keyType;
        this.valueType = valueType;
        this.writerFactory = writerFactory;
        this.fileStatsExtractor = fileStatsExtractor;
        this.keyStatsConverter = new FieldStatsArraySerializer(keyType);
        this.valueStatsConverter = new FieldStatsArraySerializer(valueType);
        this.pathFactory = pathFactory;
        this.suggestedFileSize = suggestedFileSize;
    }

    public RowType keyType() {
        return this.keyType;
    }

    public RowType valueType() {
        return this.valueType;
    }

    @VisibleForTesting
    public long suggestedFileSize() {
        return this.suggestedFileSize;
    }

    @VisibleForTesting
    public DataFilePathFactory pathFactory() {
        return this.pathFactory;
    }

    public Path writeLevel0Changelog(CloseableIterator<KeyValue> iterator) throws Exception {
        FileWriter.Factory<KeyValue, Metric> writerFactory = this.createFileWriterFactory();
        Path changelogPath = this.pathFactory.newChangelogPath();
        this.doWrite(writerFactory.create(changelogPath), iterator);
        return changelogPath;
    }

    public Optional<DataFileMeta> writeLevel0(CloseableIterator<KeyValue> iterator) throws Exception {
        List<DataFileMeta> files = this.write(iterator, 0);
        if (files.size() > 1) {
            throw new RuntimeException("Produce illegal multiple Level 0 files: " + files);
        }
        return files.size() == 0 ? Optional.empty() : Optional.of(files.get(0));
    }

    public List<DataFileMeta> write(CloseableIterator<KeyValue> iterator, int level) throws Exception {
        long suggestedFileSize = level == 0 ? Long.MAX_VALUE : this.suggestedFileSize;
        return this.doWrite(this.createRollingKvWriter(level, suggestedFileSize), iterator);
    }

    private <R> R doWrite(FileWriter<KeyValue, R> fileWriter, CloseableIterator<KeyValue> iterator) throws Exception {
        try (FileWriter<KeyValue, R> writer = fileWriter;){
            writer.write(iterator);
        }
        catch (Throwable e) {
            LOG.warn("Exception occurs when writing data files. Cleaning up.", e);
            fileWriter.abort();
            throw e;
        }
        finally {
            iterator.close();
        }
        return fileWriter.result();
    }

    public void delete(DataFileMeta file) {
        this.delete(file.fileName());
    }

    public void delete(String file) {
        FileUtils.deleteOrWarn(this.pathFactory.toPath(file));
    }

    private Supplier<KvFileWriter> createWriterFactory(int level) {
        return () -> {
            try {
                return new KvFileWriter(this.createFileWriterFactory(), this.pathFactory.newPath(), level);
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }

    private FileWriter.Factory<KeyValue, Metric> createFileWriterFactory() {
        KeyValueSerializer kvSerializer = new KeyValueSerializer(this.keyType, this.valueType);
        return MetricFileWriter.createFactory(this.writerFactory, kvSerializer::toRow, KeyValue.schema(this.keyType, this.valueType), this.fileStatsExtractor);
    }

    private RollingKvWriter createRollingKvWriter(int level, long targetFileSize) {
        return new RollingKvWriter(this.createWriterFactory(level), targetFileSize);
    }

    public static class Factory {
        private final long schemaId;
        private final RowType keyType;
        private final RowType valueType;
        private final FileFormat fileFormat;
        private final FileStorePathFactory pathFactory;
        private final long suggestedFileSize;

        public Factory(long schemaId, RowType keyType, RowType valueType, FileFormat fileFormat, FileStorePathFactory pathFactory, long suggestedFileSize) {
            this.schemaId = schemaId;
            this.keyType = keyType;
            this.valueType = valueType;
            this.fileFormat = fileFormat;
            this.pathFactory = pathFactory;
            this.suggestedFileSize = suggestedFileSize;
        }

        public DataFileWriter create(BinaryRowData partition, int bucket) {
            RowType recordType = KeyValue.schema(this.keyType, this.valueType);
            return new DataFileWriter(this.schemaId, this.keyType, this.valueType, this.fileFormat.createWriterFactory(recordType), this.fileFormat.createStatsExtractor(recordType).orElse(null), this.pathFactory.createDataFilePathFactory(partition, bucket), this.suggestedFileSize);
        }
    }

    private static class RollingKvWriter
    extends RollingFileWriter<KeyValue, DataFileMeta> {
        public RollingKvWriter(Supplier<KvFileWriter> writerFactory, long targetFileSize) {
            super(writerFactory, targetFileSize);
        }
    }

    private class KvFileWriter
    extends BaseFileWriter<KeyValue, DataFileMeta> {
        private final int level;
        private final RowDataSerializer keySerializer;
        private BinaryRowData minKey;
        private RowData maxKey;
        private long minSeqNumber;
        private long maxSeqNumber;

        public KvFileWriter(FileWriter.Factory<KeyValue, Metric> writerFactory, Path path, int level) throws IOException {
            super(writerFactory, path);
            this.minKey = null;
            this.maxKey = null;
            this.minSeqNumber = Long.MAX_VALUE;
            this.maxSeqNumber = Long.MIN_VALUE;
            this.level = level;
            this.keySerializer = new RowDataSerializer(DataFileWriter.this.keyType);
        }

        @Override
        public void write(KeyValue kv) throws IOException {
            super.write(kv);
            this.updateMinKey(kv);
            this.updateMaxKey(kv);
            this.updateMinSeqNumber(kv);
            this.updateMaxSeqNumber(kv);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Write key value " + kv.toString(DataFileWriter.this.keyType, DataFileWriter.this.valueType));
            }
        }

        private void updateMinKey(KeyValue kv) {
            if (this.minKey == null) {
                this.minKey = this.keySerializer.toBinaryRow(kv.key()).copy();
            }
        }

        private void updateMaxKey(KeyValue kv) {
            this.maxKey = kv.key();
        }

        private void updateMinSeqNumber(KeyValue kv) {
            this.minSeqNumber = Math.min(this.minSeqNumber, kv.sequenceNumber());
        }

        private void updateMaxSeqNumber(KeyValue kv) {
            this.maxSeqNumber = Math.max(this.maxSeqNumber, kv.sequenceNumber());
        }

        @Override
        protected DataFileMeta createResult(Path path, Metric metric) throws IOException {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Closing data file " + path);
            }
            FieldStats[] rowStats = metric.fieldStats();
            int numKeyFields = DataFileWriter.this.keyType.getFieldCount();
            FieldStats[] keyFieldStats = Arrays.copyOfRange(rowStats, 0, numKeyFields);
            BinaryTableStats keyStats = DataFileWriter.this.keyStatsConverter.toBinary(keyFieldStats);
            FieldStats[] valFieldStats = Arrays.copyOfRange(rowStats, numKeyFields + 2, rowStats.length);
            BinaryTableStats valueStats = DataFileWriter.this.valueStatsConverter.toBinary(valFieldStats);
            return new DataFileMeta(path.getName(), FileUtils.getFileSize(path), this.recordCount(), this.minKey, this.keySerializer.toBinaryRow(this.maxKey).copy(), keyStats, valueStats, this.minSeqNumber, this.maxSeqNumber, DataFileWriter.this.schemaId, this.level);
        }
    }
}

