ParquetファイルをJavaで生成する
Parquetファイルを生成するサンプルJavaコードを書きました。
以下の記事を参考にしました。
ソースファイル
Javaのソース1つとライブラリ依存性を記載した pom.xml
の2ファイルです。
$ tree . ├── pom.xml └── src └── main └── java └── org └── example └── Main.java 5 directories, 2 files
Main.java
src/main/java/org/example/Main.java
のソースは以下の内容です。
Main
クラスのほか、CustomParquetWriter
とCustomWriteSupport
というクラスを定義しています。1ファイルにおさめたかったので内部クラスにしていますが、別ソースファイルにして普通のクラスにしても大丈夫です。
package org.example; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; public class Main { public static void main(String[] args) throws Exception { // 出力先ファイルパス var outputFilePath = "output.parquet"; var outputFile = new File(outputFilePath); if (outputFile.exists()) { outputFile.delete(); } var path = new Path("file:" + outputFile.getAbsolutePath()); var writer = new CustomParquetWriter(path, getSchema(), CompressionCodecName.SNAPPY ); // データを書き込む for (var record: getData()) { writer.write(record); } writer.close(); } // サンプルデータのスキーマを定義 // サンプルとしていろいろなパターンのデータ型を並べています private static MessageType getSchema() { var rawSchema = "message m { " + "required BOOLEAN col_b1; " + "required INT32 col_i32; " + "required INT32 col_i32_int8 (INT_8); " + "required INT32 col_i32_int16 (INT_16); " + "required INT32 col_i32_int32 (INT_32); " + "required INT32 col_i32_uint8 (UINT_8); " + "required INT32 col_i32_uint16 (UINT_16); " + "required INT32 col_i32_uint32 (UINT_32); " + "required INT64 col_i64; " + "required INT64 col_i64_int64 (INT_64); " + "required INT64 col_i64_uint64 (UINT_64); " + "required INT32 col_i32_dec10 (DECIMAL(1, 0)); " + "required INT32 col_i32_dec31 (DECIMAL(3, 1)); " + "required INT32 col_i32_dec94 (DECIMAL(9, 4)); " + "required INT64 col_i64_dec188 (DECIMAL(18, 8)); " + "required BINARY col_bin_dec42 (DECIMAL(4, 2)); " + "required FIXED_LEN_BYTE_ARRAY(2) col_fixed_dec42 (DECIMAL(4, 2)); " + "required INT32 
col_i32_date (DATE); " + "required INT32 col_i32_timemillis (TIME_MILLIS); " + "required INT64 col_i64_timemicros (TIME_MICROS); " + "required INT64 col_i64_timestampmillis (TIMESTAMP_MILLIS); " + "required INT64 col_i64_timestampmicros (TIMESTAMP_MICROS); " + "required FLOAT col_f32; " + "required DOUBLE col_f64; " + "required BINARY col_bin; " + "required BINARY col_bin_utf8 (UTF8); " + "required FIXED_LEN_BYTE_ARRAY(3) col_fixed; " + "}"; return MessageTypeParser.parseMessageType(rawSchema); } // サンプルデータを作成 private static List<List<String>> getData() { var data = new ArrayList<List<String>>(); { // サンプルデータの1レコード目 var item = new ArrayList<String>(); item.add("true"); item.add("123"); item.add("123"); item.add("123"); item.add("123"); item.add("123"); item.add("123"); item.add("123"); item.add("5000000000"); item.add("5000000000"); item.add("5000000000"); item.add("3"); item.add("123"); // 12.3 item.add("123456789"); // 12345.6789 item.add("123456789012345678"); // 1234567890.12345678 item.add("!!"); // 0x2121 -> decimal 84.81 item.add("!!"); // 0x2121 -> decimal 84.81 item.add("18563"); // 2020-10-28 item.add("43200000"); // 12:00:00 item.add("43200000000"); // 12:00:00 item.add("1603886400000"); // 2020-10-28T12:00:00 item.add("1603886400000000"); // 2020-10-28T12:00:00 item.add("1.2"); item.add("1.2"); item.add("abc"); item.add("abc"); item.add("abc"); data.add(item); } // dataに1レコードしか入れていないが、 // 複数レコード入れてもよい return data; } private static class CustomParquetWriter extends ParquetWriter<List<String>> { CustomParquetWriter(Path file, MessageType schema, CompressionCodecName codecName) throws IOException { super(file, new CustomWriteSupport(schema), codecName, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, false, false); } private static class CustomWriteSupport extends WriteSupport<List<String>> { MessageType schema; RecordConsumer recordConsumer; List<ColumnDescriptor> cols; CustomWriteSupport(MessageType schema) { this.schema = schema; this.cols = schema.getColumns(); 
} @Override public WriteContext init(Configuration config) { return new WriteContext(schema, new HashMap<String, String>()); } @Override public void prepareForWrite(RecordConsumer recordConsumer) { this.recordConsumer = recordConsumer; } @Override public void write(List<String> values) { if (values.size() != cols.size()) { throw new IllegalArgumentException(); } recordConsumer.startMessage(); for (int i = 0; i < cols.size(); ++i) { String val = values.get(i); if (val.length() > 0) { recordConsumer.startField(cols.get(i).getPath()[0], i); switch (cols.get(i).getType()) { case BOOLEAN: recordConsumer.addBoolean(Boolean.parseBoolean(val)); break; case FLOAT: recordConsumer.addFloat(Float.parseFloat(val)); break; case DOUBLE: recordConsumer.addDouble(Double.parseDouble(val)); break; case INT32: recordConsumer.addInteger(Integer.parseInt(val)); break; case INT64: recordConsumer.addLong(Long.parseLong(val)); break; case BINARY: recordConsumer.addBinary(Binary.fromString(val.toString())); break; default: throw new IllegalArgumentException(); } recordConsumer.endField(cols.get(i).getPath()[0], i); } } recordConsumer.endMessage(); } } } }
pom.xml
pom.xml
は以下の内容です。あとで実行しやすいようにFat Jarを作成する設定です。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>example</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <!-- Java 10 ("1.10" is javac's legacy alias; the canonical value is "10"). -->
    <maven.compiler.source>1.10</maven.compiler.source>
    <maven.compiler.target>1.10</maven.compiler.target>
  </properties>

  <dependencies>
    <!-- Parquet writer API (ParquetWriter, WriteSupport, schema parser). -->
    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-hadoop</artifactId>
      <version>1.10.1</version>
    </dependency>
    <!-- NOTE(review): hadoop-core 1.2.1 is a legacy Hadoop-1 artifact, used
         here for org.apache.hadoop.fs.Path etc.; modern setups would use
         hadoop-common/hadoop-client instead - confirm before upgrading. -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>1.2.1</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Builds a runnable fat jar: target/test-jar-with-dependencies.jar
           with org.example.Main as the entry point. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.3.0</version>
        <configuration>
          <finalName>test</finalName>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>org.example.Main</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
実行
次のコマンドでビルドと実行ができます。
$ mvn clean install $ java -jar target/test-jar-with-dependencies.jar
実行するとログが多く流れ、最後にoutput.parquet
というファイルが生成されます。
Parquetファイルの中身を確認
Parquetファイルの中身はParquet Toolsで見ることができます。以下の記事にインストール方法を書きました。
parquet-toolsのcatコマンドで見ると次のように見えます。catコマンドはDECIMAL型や日時のデータ型に正しく対応していないように見えます。
$ parquet-tools cat --json output.parquet | jq . { "col_b1": true, "col_i32": 123, "col_i32_int8": 123, "col_i32_int16": 123, "col_i32_int32": 123, "col_i32_uint8": 123, "col_i32_uint16": 123, "col_i32_uint32": 123, "col_i64": 5000000000, "col_i64_int64": 5000000000, "col_i64_uint64": 5000000000, "col_i32_dec10": 3, "col_i32_dec31": 123, "col_i32_dec94": 123456789, "col_i64_dec188": 123456789012345680, "col_bin_dec42": 84.81, "col_fixed_dec42": 84.81, "col_i32_date": 18563, "col_i32_timemillis": 43200000, "col_i64_timemicros": 43200000000, "col_i64_timestampmillis": 1603886400000, "col_i64_timestampmicros": 1603886400000000, "col_f32": 1.2, "col_f64": 1.2, "col_bin": "YWJj", "col_bin_utf8": "abc", "col_fixed": "YWJj" }
parquet-toolsのschemaコマンドで見ると、Javaソースに書いたスキーマと一致していることが確認できます。
$ parquet-tools schema output.parquet message m { required boolean col_b1; required int32 col_i32; required int32 col_i32_int8 (INT_8); required int32 col_i32_int16 (INT_16); required int32 col_i32_int32 (INT_32); required int32 col_i32_uint8 (UINT_8); required int32 col_i32_uint16 (UINT_16); required int32 col_i32_uint32 (UINT_32); required int64 col_i64; required int64 col_i64_int64 (INT_64); required int64 col_i64_uint64 (UINT_64); required int32 col_i32_dec10 (DECIMAL(1,0)); required int32 col_i32_dec31 (DECIMAL(3,1)); required int32 col_i32_dec94 (DECIMAL(9,4)); required int64 col_i64_dec188 (DECIMAL(18,8)); required binary col_bin_dec42 (DECIMAL(4,2)); required fixed_len_byte_array(2) col_fixed_dec42 (DECIMAL(4,2)); required int32 col_i32_date (DATE); required int32 col_i32_timemillis (TIME_MILLIS); required int64 col_i64_timemicros (TIME_MICROS); required int64 col_i64_timestampmillis (TIMESTAMP_MILLIS); required int64 col_i64_timestampmicros (TIMESTAMP_MICROS); required float col_f32; required double col_f64; required binary col_bin; required binary col_bin_utf8 (UTF8); required fixed_len_byte_array(3) col_fixed; }
parquet-toolsのmetaコマンドで見ると、次のような結果になります。後半の統計情報を見ると、catコマンドと違って、正しく値が入っていることが確認できます。
$ parquet-tools meta output.parquet file: file:/home/xxxx/output.parquet creator: parquet-mr version 1.10.1 (build a89df8f9932b6ef6633d06069e50c9b7970bebd1) file schema: m -------------------------------------------------------------------------------- col_b1: REQUIRED BOOLEAN R:0 D:0 col_i32: REQUIRED INT32 R:0 D:0 col_i32_int8: REQUIRED INT32 O:INT_8 R:0 D:0 col_i32_int16: REQUIRED INT32 O:INT_16 R:0 D:0 col_i32_int32: REQUIRED INT32 O:INT_32 R:0 D:0 col_i32_uint8: REQUIRED INT32 O:UINT_8 R:0 D:0 col_i32_uint16: REQUIRED INT32 O:UINT_16 R:0 D:0 col_i32_uint32: REQUIRED INT32 O:UINT_32 R:0 D:0 col_i64: REQUIRED INT64 R:0 D:0 col_i64_int64: REQUIRED INT64 O:INT_64 R:0 D:0 col_i64_uint64: REQUIRED INT64 O:UINT_64 R:0 D:0 col_i32_dec10: REQUIRED INT32 O:DECIMAL R:0 D:0 col_i32_dec31: REQUIRED INT32 O:DECIMAL R:0 D:0 col_i32_dec94: REQUIRED INT32 O:DECIMAL R:0 D:0 col_i64_dec188: REQUIRED INT64 O:DECIMAL R:0 D:0 col_bin_dec42: REQUIRED BINARY O:DECIMAL R:0 D:0 col_fixed_dec42: REQUIRED FIXED_LEN_BYTE_ARRAY O:DECIMAL R:0 D:0 col_i32_date: REQUIRED INT32 O:DATE R:0 D:0 col_i32_timemillis: REQUIRED INT32 O:TIME_MILLIS R:0 D:0 col_i64_timemicros: REQUIRED INT64 O:TIME_MICROS R:0 D:0 col_i64_timestampmillis: REQUIRED INT64 O:TIMESTAMP_MILLIS R:0 D:0 col_i64_timestampmicros: REQUIRED INT64 O:TIMESTAMP_MICROS R:0 D:0 col_f32: REQUIRED FLOAT R:0 D:0 col_f64: REQUIRED DOUBLE R:0 D:0 col_bin: REQUIRED BINARY R:0 D:0 col_bin_utf8: REQUIRED BINARY O:UTF8 R:0 D:0 col_fixed: REQUIRED FIXED_LEN_BYTE_ARRAY R:0 D:0 row group 1: RC:1 TS:1445 OFFSET:4 -------------------------------------------------------------------------------- col_b1: BOOLEAN SNAPPY DO:0 FPO:4 SZ:36/34/0.94 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: true, max: true, num_nulls: 0] col_i32: INT32 SNAPPY DO:0 FPO:40 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 123, max: 123, num_nulls: 0] col_i32_int8: INT32 SNAPPY DO:0 FPO:91 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 123, max: 123, num_nulls: 0] col_i32_int16: 
INT32 SNAPPY DO:0 FPO:142 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 123, max: 123, num_nulls: 0] col_i32_int32: INT32 SNAPPY DO:0 FPO:193 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 123, max: 123, num_nulls: 0] col_i32_uint8: INT32 SNAPPY DO:0 FPO:244 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 123, max: 123, num_nulls: 0] col_i32_uint16: INT32 SNAPPY DO:0 FPO:295 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 123, max: 123, num_nulls: 0] col_i32_uint32: INT32 SNAPPY DO:0 FPO:346 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 123, max: 123, num_nulls: 0] col_i64: INT64 SNAPPY DO:0 FPO:397 SZ:71/69/0.97 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 5000000000, max: 5000000000, num_nulls: 0] col_i64_int64: INT64 SNAPPY DO:0 FPO:468 SZ:71/69/0.97 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 5000000000, max: 5000000000, num_nulls: 0] col_i64_uint64: INT64 SNAPPY DO:0 FPO:539 SZ:71/69/0.97 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 5000000000, max: 5000000000, num_nulls: 0] col_i32_dec10: INT32 SNAPPY DO:0 FPO:610 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 3, max: 3, num_nulls: 0] col_i32_dec31: INT32 SNAPPY DO:0 FPO:661 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 12.3, max: 12.3, num_nulls: 0] col_i32_dec94: INT32 SNAPPY DO:0 FPO:712 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 12345.6789, max: 12345.6789, num_nulls: 0] col_i64_dec188: INT64 SNAPPY DO:0 FPO:763 SZ:71/69/0.97 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 1234567890.12345678, max: 1234567890.12345678, num_nulls: 0] col_bin_dec42: BINARY SNAPPY DO:0 FPO:834 SZ:45/43/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 84.81, max: 84.81, num_nulls: 0] col_fixed_dec42: FIXED_LEN_BYTE_ARRAY SNAPPY DO:0 FPO:879 SZ:41/39/0.95 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 84.81, max: 84.81, num_nulls: 0] col_i32_date: INT32 SNAPPY DO:0 FPO:920 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 2020-10-28, max: 2020-10-28, num_nulls: 0] col_i32_timemillis: INT32 SNAPPY DO:0 FPO:971 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 
12:00:00.000, max: 12:00:00.000, num_nulls: 0] col_i64_timemicros: INT64 SNAPPY DO:0 FPO:1022 SZ:71/69/0.97 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 12:00:00.000000, max: 12:00:00.000000, num_nulls: 0] col_i64_timestampmillis: INT64 SNAPPY DO:0 FPO:1093 SZ:71/69/0.97 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 2020-10-28T12:00:00.000, max: 2020-10-28T12:00:00.000, num_nulls: 0] col_i64_timestampmicros: INT64 SNAPPY DO:0 FPO:1164 SZ:71/69/0.97 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 2020-10-28T12:00:00.000000, max: 2020-10-28T12:00:00.000000, num_nulls: 0] col_f32: FLOAT SNAPPY DO:0 FPO:1235 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 1.2, max: 1.2, num_nulls: 0] col_f64: DOUBLE SNAPPY DO:0 FPO:1286 SZ:71/69/0.97 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 1.2, max: 1.2, num_nulls: 0] col_bin: BINARY SNAPPY DO:0 FPO:1357 SZ:50/48/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 0x616263, max: 0x616263, num_nulls: 0] col_bin_utf8: BINARY SNAPPY DO:0 FPO:1407 SZ:50/48/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: abc, max: abc, num_nulls: 0] col_fixed: FIXED_LEN_BYTE_ARRAY SNAPPY DO:0 FPO:1457 SZ:46/44/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 0x616263, max: 0x616263, num_nulls: 0]
Parquetのデータ型の解釈はparquet-toolsでもいい加減だなという印象。