28
28
import org .apache .paimon .format .FileFormatFactory ;
29
29
import org .apache .paimon .format .FormatWriter ;
30
30
import org .apache .paimon .format .FormatWriterFactory ;
31
+ import org .apache .paimon .format .HadoopCompressionType ;
31
32
import org .apache .paimon .format .SupportsDirectWrite ;
32
33
import org .apache .paimon .format .csv .CsvFileFormatFactory ;
33
34
import org .apache .paimon .format .parquet .ParquetFileFormatFactory ;
34
35
import org .apache .paimon .fs .FileIO ;
35
36
import org .apache .paimon .fs .Path ;
36
37
import org .apache .paimon .fs .PositionOutputStream ;
37
38
import org .apache .paimon .fs .ResolvingFileIO ;
39
+ import org .apache .paimon .io .DataFilePathFactory ;
38
40
import org .apache .paimon .options .CatalogOptions ;
39
41
import org .apache .paimon .options .ConfigOption ;
40
42
import org .apache .paimon .options .Options ;
@@ -589,11 +591,13 @@ void testFormatTableRead(boolean partitioned) throws Exception {
589
591
String dbName = "test_db" ;
590
592
catalog .createDatabase (dbName , true );
591
593
int partitionValue = 10 ;
594
+ HadoopCompressionType compressionType = HadoopCompressionType .GZIP ;
592
595
Schema .Builder schemaBuilder = Schema .newBuilder ();
593
596
schemaBuilder .column ("f1" , DataTypes .INT ());
594
597
schemaBuilder .column ("dt" , DataTypes .INT ());
595
598
schemaBuilder .option ("type" , "format-table" );
596
599
schemaBuilder .option ("target-file-size" , "1 kb" );
600
+ schemaBuilder .option ("file.compression" , compressionType .value ());
597
601
String [] formats = {
598
602
"csv" , "parquet" ,
599
603
};
@@ -620,19 +624,46 @@ void testFormatTableRead(boolean partitioned) throws Exception {
620
624
Map <String , String > partitionSpec = null ;
621
625
if (partitioned ) {
622
626
Path partitionPath =
623
- new Path (
624
- String .format (
625
- "%s/%s/%s" ,
626
- table .location (), "dt=" + partitionValue , "data" ));
627
+ new Path (String .format ("%s/%s" , table .location (), "dt=" + partitionValue ));
628
+ DataFilePathFactory dataFilePathFactory =
629
+ new DataFilePathFactory (
630
+ partitionPath ,
631
+ format ,
632
+ "data" ,
633
+ "change" ,
634
+ true ,
635
+ compressionType .value (),
636
+ null );
627
637
Path diffPartitionPath =
628
- new Path (String .format ("%s/%s/%s" , table .location (), "dt=" + 11 , "data" ));
629
- write (factory , partitionPath , datas );
630
- write (factory , diffPartitionPath , dataWithDiffPartition );
638
+ new Path (String .format ("%s/%s" , table .location (), "dt=" + 11 ));
639
+ DataFilePathFactory diffPartitionPathFactory =
640
+ new DataFilePathFactory (
641
+ diffPartitionPath ,
642
+ format ,
643
+ "data" ,
644
+ "change" ,
645
+ true ,
646
+ compressionType .value (),
647
+ null );
648
+ write (factory , dataFilePathFactory .newPath (), compressionType .value (), datas );
649
+ write (
650
+ factory ,
651
+ diffPartitionPathFactory .newPath (),
652
+ compressionType .value (),
653
+ dataWithDiffPartition );
631
654
partitionSpec = new HashMap <>();
632
655
partitionSpec .put ("dt" , "" + partitionValue );
633
656
} else {
634
- Path filePath = new Path (table .location (), "data" );
635
- write (factory , filePath , datas );
657
+ DataFilePathFactory dataFilePathFactory =
658
+ new DataFilePathFactory (
659
+ new Path (table .location ()),
660
+ format ,
661
+ "data" ,
662
+ "change" ,
663
+ true ,
664
+ compressionType .value (),
665
+ null );
666
+ write (factory , dataFilePathFactory .newPath (), compressionType .value (), datas );
636
667
}
637
668
List <InternalRow > readData = read (table , null , partitionSpec );
638
669
@@ -652,15 +683,16 @@ protected FileFormatFactory buildFileFormatFactory(String format) {
652
683
}
653
684
}
654
685
655
- protected void write (FormatWriterFactory factory , Path file , InternalRow ... rows )
686
+ protected void write (
687
+ FormatWriterFactory factory , Path file , String compression , InternalRow ... rows )
656
688
throws IOException {
657
689
FormatWriter writer ;
658
690
PositionOutputStream out = null ;
659
691
if (factory instanceof SupportsDirectWrite ) {
660
- writer = ((SupportsDirectWrite ) factory ).create (fileIO , file , "gzip" );
692
+ writer = ((SupportsDirectWrite ) factory ).create (fileIO , file , compression );
661
693
} else {
662
694
out = fileIO .newOutputStream (file , true );
663
- writer = factory .create (out , "gzip" );
695
+ writer = factory .create (out , compression );
664
696
}
665
697
for (InternalRow row : rows ) {
666
698
writer .addElement (row );
0 commit comments