Writing to HDFS in gz format with Flink
Published: 2019-05-24


The official Flink documentation offers an API for reading gz files, but no API for writing data to HDFS in gz format. If the volume of data we store is large, uncompressed output eats up a serious amount of storage space, so what can we do?

Here I will share my own approach. Note: the HDFS file generation here depends on Flink's checkpointing; if checkpointing is not configured in the program, this will not work.
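The reason is that StreamingFileSink only commits bulk-format part files when a checkpoint completes; without checkpointing, the data stays in "in-progress" files forever. A minimal sketch of the required setup (the 60-second interval is only an assumed value for illustration; the full configuration used in the test job follows further below):

// Minimal checkpoint setup needed for StreamingFileSink to finalize its part files.
// The 60s interval is an assumption; the actual job below uses its own values.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);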

package com.push.redis;

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;

import java.io.IOException;
import java.util.zip.GZIPOutputStream;

public class GzipBulkStringWriterFactory<T> implements BulkWriter.Factory<T> {

    @Override
    public BulkWriter<T> create(FSDataOutputStream fsDataOutputStream) throws IOException {
        // syncFlush = true so that flush() pushes pending compressed data to the underlying stream
        final GZIPOutputStream gzipOutputStream = new GZIPOutputStream(fsDataOutputStream, true);
        return new GzipStringBulkWriter<>(gzipOutputStream);
    }
}
package com.push.redis;

import org.apache.flink.api.common.serialization.BulkWriter;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class GzipStringBulkWriter<T> implements BulkWriter<T> {

    private final GZIPOutputStream gzipOutputStream;
    //private final ObjectOutputStream objectOutputStream;

    public GzipStringBulkWriter(GZIPOutputStream gzipOutputStream) {
        //this.objectOutputStream = objectOutputStream;
        this.gzipOutputStream = gzipOutputStream;
    }

    @Override
    public void addElement(T t) throws IOException {
        //objectOutputStream.writeUTF(String.valueOf(t));
        // write the element as plain UTF-8 text only
        gzipOutputStream.write(String.valueOf(t).getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void flush() throws IOException {
        //objectOutputStream.flush();
        gzipOutputStream.flush();
    }

    @Override
    public void finish() throws IOException {
        //objectOutputStream.close();
        gzipOutputStream.close();
    }
}

Note that the ObjectOutputStream approach (the commented-out code above) must not be used here: if elements are written that way, the output appears garbled when the HDFS files are read back with zcat.
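For the curious, here is a small standalone sketch (not part of the Flink job; the file names are made up) that reproduces the difference: ObjectOutputStream prepends a serialization stream header and a 2-byte length prefix to every writeUTF() call, so zcat shows binary junk around the text, while writing raw UTF-8 bytes keeps the file an ordinary gzipped text file.

import java.io.FileOutputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class GzipEncodingDemo {
    public static void main(String[] args) throws Exception {
        // plain UTF-8 bytes: "zcat plain.gz" prints clean text
        try (GZIPOutputStream gz = new GZIPOutputStream(new FileOutputStream("plain.gz"))) {
            gz.write("hello world\n".getBytes(StandardCharsets.UTF_8));
        }
        // ObjectOutputStream: "zcat object.gz" prints the text surrounded by binary header/length bytes
        try (ObjectOutputStream oos = new ObjectOutputStream(
                new GZIPOutputStream(new FileOutputStream("object.gz")))) {
            oos.writeUTF("hello world\n");
        }
    }
}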

package com.push.redis;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

public class testFlinkGZToHdfs {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        CheckpointConfig config = env.getCheckpointConfig();
        env.enableCheckpointing(10 * 6 * 1000);                          // checkpoint every 60s
        config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);     // exactly-once processing
        config.setCheckpointTimeout(10 * 6 * 1000);                      // a checkpoint must complete within 60s or it is aborted
        config.setMinPauseBetweenCheckpoints(10 * 6 * 1000);             // minimum pause between two checkpoints
        config.setMaxConcurrentCheckpoints(1);                           // at most one checkpoint in flight at a time
        config.setFailOnCheckpointingErrors(false);                      // do not fail the task when a checkpoint fails
        config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // keep checkpoints when the job is cancelled
//        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.of(3, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));
        env.setStateBackend(new MemoryStateBackend());

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "centos:9092");
        properties.setProperty("group.id", "aa");

        FlinkKafkaConsumer011<String> kafkaSource0 =
                new FlinkKafkaConsumer011<>("hhhh", new SimpleStringSchema(), properties);
        DataStreamSource<String> kafkaSource = env.addSource(kafkaSource0);
        kafkaSource0.setStartFromLatest();
        SingleOutputStreamOperator<String> streamOperator = kafkaSource.map(t -> parse(t));

        // gzip bulk-format sink
        final StreamingFileSink<String> streamingGzipFileSink = StreamingFileSink
                .forBulkFormat(new Path("hdfs://192.168.139.188:9000/testGZ06"), new GzipBulkStringWriterFactory<String>())
                // decide which bucket (directory) each element falls into
                .withBucketAssigner(new BucketAssigner<String, String>() {
                    private static final long serialVersionUID = 1L;
                    private transient DateTimeFormatter dateTimeFormatter;

                    @Override
                    public String getBucketId(String element, Context context) {
                        if (dateTimeFormatter == null) {
                            dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHH").withZone(ZoneId.systemDefault());
                        }
                        // yyyyMMddHH is the directory the data lands in; adjust as needed,
                        // e.g. derive it from a timestamp carried inside the element
                        return "ymd=" + dateTimeFormatter.format(Instant.ofEpochMilli(System.currentTimeMillis()));
                    }

                    @Override
                    public SimpleVersionedSerializer<String> getSerializer() {
                        return SimpleVersionedStringSerializer.INSTANCE;
                    }
                })
                .build();

        streamOperator.addSink(streamingGzipFileSink);
        env.execute("testHdfsGZ");
    }

    public static String parse(String a) {
        return a + '\n';
    }
}
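To double-check that the finished part files really are plain gzipped text, a small read-back sketch like the following can be used. The local file name "part-0-0" is hypothetical; first copy a finished part file out of HDFS, for example with hadoop fs -get.

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;

public class ReadGzPartFile {
    public static void main(String[] args) throws Exception {
        // decompress the part file and print its records line by line
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new FileInputStream("part-0-0")), StandardCharsets.UTF_8))) {
            reader.lines().forEach(System.out::println);
        }
    }
}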

My test pipeline is: Flink consumes data from Kafka and writes it to HDFS in compressed form. Note, however, that records consumed from Kafka carry no newline by default, so a newline character has to be appended to every record before it is stored; that is what the parse() method above does. An alternative is sketched below.
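As an alternative (my own suggestion, not part of the original setup), the newline could be appended inside the writer itself rather than in the map() function, so every element is terminated regardless of how the sink is reused. This would replace addElement() in GzipStringBulkWriter:

    @Override
    public void addElement(T t) throws IOException {
        gzipOutputStream.write(String.valueOf(t).getBytes(StandardCharsets.UTF_8));
        gzipOutputStream.write('\n');   // terminate every record with a newline inside the writer
    }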

In my tests, this form of compression achieves roughly an 11:1 ratio compared with the same data stored as plain txt files, which saves a great deal of space. The generation of partition files also depends strictly on Flink's checkpoint interval: with a bulk format like this, a new part file is rolled at every checkpoint.

