Generating a Parquet File in Java

I wrote some sample Java code that generates a Parquet file.

I wrote it with reference to an existing article.

Source files

There are just two files: a single Java source file and a pom.xml describing the library dependencies.

$ tree
.
├── pom.xml
└── src
    └── main
        └── java
            └── org
                └── example
                    └── Main.java

5 directories, 2 files

Main.java

The contents of src/main/java/org/example/Main.java are as follows.

In addition to the Main class, it defines a class named CustomParquetWriter (which itself contains a CustomWriteSupport inner class). I made them inner classes because I wanted to fit everything into one file, but it is also fine to move them into separate source files as ordinary classes.

package org.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class Main {

  public static void main(String[] args) throws Exception {
    // output file path
    var outputFilePath = "output.parquet";

    var outputFile = new File(outputFilePath);
    if (outputFile.exists()) {
      outputFile.delete();
    }
    var path = new Path("file:" + outputFile.getAbsolutePath());
    var writer = new CustomParquetWriter(path, getSchema(), CompressionCodecName.SNAPPY);

    // write the data
    for (var record: getData()) {
      writer.write(record);
    }
    writer.close();
  }

  // Define the schema of the sample data
  // Various patterns of data types are included as examples
  private static MessageType getSchema() {
    var rawSchema =
      "message m { " +
        "required BOOLEAN col_b1; " +

        "required INT32 col_i32; " +
        "required INT32 col_i32_int8 (INT_8); " +
        "required INT32 col_i32_int16 (INT_16); " +
        "required INT32 col_i32_int32 (INT_32); " +
        "required INT32 col_i32_uint8 (UINT_8); " +
        "required INT32 col_i32_uint16 (UINT_16); " +
        "required INT32 col_i32_uint32 (UINT_32); " +
        "required INT64 col_i64; " +
        "required INT64 col_i64_int64 (INT_64); " +
        "required INT64 col_i64_uint64 (UINT_64); " +

        "required INT32 col_i32_dec10 (DECIMAL(1, 0)); " +
        "required INT32 col_i32_dec31 (DECIMAL(3, 1)); " +
        "required INT32 col_i32_dec94 (DECIMAL(9, 4)); " +
        "required INT64 col_i64_dec188 (DECIMAL(18, 8)); " +
        "required BINARY col_bin_dec42 (DECIMAL(4, 2)); " +
        "required FIXED_LEN_BYTE_ARRAY(2) col_fixed_dec42 (DECIMAL(4, 2)); " +

        "required INT32 col_i32_date (DATE); " +
        "required INT32 col_i32_timemillis (TIME_MILLIS); " +
        "required INT64 col_i64_timemicros (TIME_MICROS); " +
        "required INT64 col_i64_timestampmillis (TIMESTAMP_MILLIS); " +
        "required INT64 col_i64_timestampmicros (TIMESTAMP_MICROS); " +

        "required FLOAT col_f32; " +
        "required DOUBLE col_f64; " +

        "required BINARY col_bin; " +
        "required BINARY col_bin_utf8 (UTF8); " +
        "required FIXED_LEN_BYTE_ARRAY(3) col_fixed; " +
      "}";
    return MessageTypeParser.parseMessageType(rawSchema);
  }

  // Create the sample data (the sketch after this class shows one way to derive the raw values below)
  private static List<List<String>> getData() {
    var data = new ArrayList<List<String>>();
    {
      // first record of the sample data
      var item = new ArrayList<String>();

      item.add("true");

      item.add("123");
      item.add("123");
      item.add("123");
      item.add("123");
      item.add("123");
      item.add("123");
      item.add("123");
      item.add("5000000000");
      item.add("5000000000");
      item.add("5000000000");

      item.add("3");
      item.add("123"); // 12.3
      item.add("123456789"); // 12345.6789
      item.add("123456789012345678"); // 1234567890.12345678
      item.add("!!"); // 0x2121 -> decimal 84.81
      item.add("!!"); // 0x2121 -> decimal 84.81

      item.add("18563"); // 2020-10-28
      item.add("43200000"); // 12:00:00
      item.add("43200000000"); // 12:00:00
      item.add("1603886400000"); // 2020-10-28T12:00:00
      item.add("1603886400000000"); // 2020-10-28T12:00:00

      item.add("1.2");
      item.add("1.2");

      item.add("abc");
      item.add("abc");
      item.add("abc");

      data.add(item);
    }

    // Only one record is added to data here,
    // but adding multiple records also works.

    return data;
  }

  private static class CustomParquetWriter extends ParquetWriter<List<String>> {

    CustomParquetWriter(Path file, MessageType schema, CompressionCodecName codecName) throws IOException {
      super(file, new CustomWriteSupport(schema), codecName,
        DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, false, false);
    }

    private static class CustomWriteSupport extends WriteSupport<List<String>> {
      MessageType schema;
      RecordConsumer recordConsumer;
      List<ColumnDescriptor> cols;

      CustomWriteSupport(MessageType schema) {
        this.schema = schema;
        this.cols = schema.getColumns();
      }

      @Override
      public WriteContext init(Configuration config) {
        return new WriteContext(schema, new HashMap<String, String>());
      }

      @Override
      public void prepareForWrite(RecordConsumer recordConsumer) {
        this.recordConsumer = recordConsumer;
      }

      @Override
      public void write(List<String> values) {
        if (values.size() != cols.size()) {
          throw new IllegalArgumentException();
        }

        recordConsumer.startMessage();
        for (int i = 0; i < cols.size(); ++i) {
          String val = values.get(i);
          if (val.length() > 0) {
            recordConsumer.startField(cols.get(i).getPath()[0], i);
            switch (cols.get(i).getType()) {
              case BOOLEAN:
                recordConsumer.addBoolean(Boolean.parseBoolean(val));
                break;
              case FLOAT:
                recordConsumer.addFloat(Float.parseFloat(val));
                break;
              case DOUBLE:
                recordConsumer.addDouble(Double.parseDouble(val));
                break;
              case INT32:
                recordConsumer.addInteger(Integer.parseInt(val));
                break;
              case INT64:
                recordConsumer.addLong(Long.parseLong(val));
                break;
              case BINARY:
                recordConsumer.addBinary(Binary.fromString(val));
                break;
              default:
                throw new IllegalArgumentException();
            }
            recordConsumer.endField(cols.get(i).getPath()[0], i);
          }
        }
        recordConsumer.endMessage();
      }
    }

  }

}
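
The raw values hard-coded in getData() (epoch days for DATE, milliseconds since midnight for TIME_MILLIS, the unscaled big-endian bytes for the BINARY DECIMAL, and so on) were written by hand, as the comments explain. As a minimal sketch, assuming only the JDK's java.time and BigDecimal classes, they could be derived like this (the class name RawValues is just for illustration and is not part of the sample above):

package org.example;

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;

public class RawValues {
  public static void main(String[] args) {
    // DATE: days since 1970-01-01
    System.out.println(LocalDate.of(2020, 10, 28).toEpochDay()); // 18563
    // TIME_MILLIS: milliseconds since midnight
    System.out.println(LocalTime.of(12, 0).toSecondOfDay() * 1000L); // 43200000
    // TIMESTAMP_MILLIS: milliseconds since the epoch (UTC)
    System.out.println(LocalDateTime.of(2020, 10, 28, 12, 0)
        .toInstant(ZoneOffset.UTC).toEpochMilli()); // 1603886400000
    // DECIMAL(4,2) stored as BINARY: big-endian unscaled value (8481 -> 0x21 0x21 -> "!!")
    byte[] unscaled = new BigDecimal("84.81").unscaledValue().toByteArray();
    System.out.println(new String(unscaled, StandardCharsets.US_ASCII)); // !!
  }
}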

pom.xml

The contents of pom.xml are as follows. It is configured to build a fat JAR so that the program is easy to run later.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>example</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.10</maven.compiler.source>
        <maven.compiler.target>1.10</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.2.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <finalName>test</finalName>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>org.example.Main</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

Running it

You can build and run it with the following commands.

$ mvn clean install
$ java -jar target/test-jar-with-dependencies.jar

When you run it, a lot of log output goes by, and at the end a file named output.parquet is generated.
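
Before reaching for parquet-tools, a quick sanity check is also possible from Java. The following is a minimal sketch, assuming the ParquetFileReader and HadoopInputFile classes that ship with the parquet-hadoop dependency already used above (the class name CheckOutput is just for illustration):

package org.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

import java.io.File;

public class CheckOutput {
  public static void main(String[] args) throws Exception {
    var path = new Path("file:" + new File("output.parquet").getAbsolutePath());
    try (var reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
      // print the schema stored in the file footer and the number of records
      System.out.println(reader.getFooter().getFileMetaData().getSchema());
      System.out.println("records: " + reader.getRecordCount());
    }
  }
}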

Checking the contents of the Parquet file

The contents of a Parquet file can be inspected with Parquet Tools; I have written about how to install it in a separate article.

With parquet-tools' cat command the file looks like the following. The cat command does not appear to handle DECIMAL or the date/time types correctly; a small snippet for decoding those raw values by hand follows the output.

$ parquet-tools cat --json output.parquet | jq .
{
  "col_b1": true,
  "col_i32": 123,
  "col_i32_int8": 123,
  "col_i32_int16": 123,
  "col_i32_int32": 123,
  "col_i32_uint8": 123,
  "col_i32_uint16": 123,
  "col_i32_uint32": 123,
  "col_i64": 5000000000,
  "col_i64_int64": 5000000000,
  "col_i64_uint64": 5000000000,
  "col_i32_dec10": 3,
  "col_i32_dec31": 123,
  "col_i32_dec94": 123456789,
  "col_i64_dec188": 123456789012345680,
  "col_bin_dec42": 84.81,
  "col_fixed_dec42": 84.81,
  "col_i32_date": 18563,
  "col_i32_timemillis": 43200000,
  "col_i64_timemicros": 43200000000,
  "col_i64_timestampmillis": 1603886400000,
  "col_i64_timestampmicros": 1603886400000000,
  "col_f32": 1.2,
  "col_f64": 1.2,
  "col_bin": "YWJj",
  "col_bin_utf8": "abc",
  "col_fixed": "YWJj"
}
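
For reference, the raw values that cat prints for the DECIMAL and date/time columns can be decoded by hand with the JDK. A minimal sketch (the class name Decode is just for illustration; the comments show the expected output):

import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDate;

public class Decode {
  public static void main(String[] args) {
    System.out.println(new BigDecimal(BigInteger.valueOf(123), 1));       // 12.3 (col_i32_dec31, DECIMAL(3,1))
    System.out.println(new BigDecimal(BigInteger.valueOf(123456789), 4)); // 12345.6789 (col_i32_dec94, DECIMAL(9,4))
    System.out.println(LocalDate.ofEpochDay(18563));                      // 2020-10-28 (col_i32_date, DATE)
    System.out.println(Instant.ofEpochMilli(1603886400000L));             // 2020-10-28T12:00:00Z (col_i64_timestampmillis)
  }
}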

Looking at the file with parquet-tools' schema command, you can confirm that it matches the schema written in the Java source.

$ parquet-tools schema output.parquet
message m {
  required boolean col_b1;
  required int32 col_i32;
  required int32 col_i32_int8 (INT_8);
  required int32 col_i32_int16 (INT_16);
  required int32 col_i32_int32 (INT_32);
  required int32 col_i32_uint8 (UINT_8);
  required int32 col_i32_uint16 (UINT_16);
  required int32 col_i32_uint32 (UINT_32);
  required int64 col_i64;
  required int64 col_i64_int64 (INT_64);
  required int64 col_i64_uint64 (UINT_64);
  required int32 col_i32_dec10 (DECIMAL(1,0));
  required int32 col_i32_dec31 (DECIMAL(3,1));
  required int32 col_i32_dec94 (DECIMAL(9,4));
  required int64 col_i64_dec188 (DECIMAL(18,8));
  required binary col_bin_dec42 (DECIMAL(4,2));
  required fixed_len_byte_array(2) col_fixed_dec42 (DECIMAL(4,2));
  required int32 col_i32_date (DATE);
  required int32 col_i32_timemillis (TIME_MILLIS);
  required int64 col_i64_timemicros (TIME_MICROS);
  required int64 col_i64_timestampmillis (TIMESTAMP_MILLIS);
  required int64 col_i64_timestampmicros (TIMESTAMP_MICROS);
  required float col_f32;
  required double col_f64;
  required binary col_bin;
  required binary col_bin_utf8 (UTF8);
  required fixed_len_byte_array(3) col_fixed;
}

With parquet-tools' meta command, the result is as follows. The statistics in the second half show that, unlike the cat command, the values are interpreted correctly.

$ parquet-tools meta output.parquet
file:                    file:/home/xxxx/output.parquet 
creator:                 parquet-mr version 1.10.1 (build a89df8f9932b6ef6633d06069e50c9b7970bebd1) 

file schema:             m 
--------------------------------------------------------------------------------
col_b1:                  REQUIRED BOOLEAN R:0 D:0
col_i32:                 REQUIRED INT32 R:0 D:0
col_i32_int8:            REQUIRED INT32 O:INT_8 R:0 D:0
col_i32_int16:           REQUIRED INT32 O:INT_16 R:0 D:0
col_i32_int32:           REQUIRED INT32 O:INT_32 R:0 D:0
col_i32_uint8:           REQUIRED INT32 O:UINT_8 R:0 D:0
col_i32_uint16:          REQUIRED INT32 O:UINT_16 R:0 D:0
col_i32_uint32:          REQUIRED INT32 O:UINT_32 R:0 D:0
col_i64:                 REQUIRED INT64 R:0 D:0
col_i64_int64:           REQUIRED INT64 O:INT_64 R:0 D:0
col_i64_uint64:          REQUIRED INT64 O:UINT_64 R:0 D:0
col_i32_dec10:           REQUIRED INT32 O:DECIMAL R:0 D:0
col_i32_dec31:           REQUIRED INT32 O:DECIMAL R:0 D:0
col_i32_dec94:           REQUIRED INT32 O:DECIMAL R:0 D:0
col_i64_dec188:          REQUIRED INT64 O:DECIMAL R:0 D:0
col_bin_dec42:           REQUIRED BINARY O:DECIMAL R:0 D:0
col_fixed_dec42:         REQUIRED FIXED_LEN_BYTE_ARRAY O:DECIMAL R:0 D:0
col_i32_date:            REQUIRED INT32 O:DATE R:0 D:0
col_i32_timemillis:      REQUIRED INT32 O:TIME_MILLIS R:0 D:0
col_i64_timemicros:      REQUIRED INT64 O:TIME_MICROS R:0 D:0
col_i64_timestampmillis: REQUIRED INT64 O:TIMESTAMP_MILLIS R:0 D:0
col_i64_timestampmicros: REQUIRED INT64 O:TIMESTAMP_MICROS R:0 D:0
col_f32:                 REQUIRED FLOAT R:0 D:0
col_f64:                 REQUIRED DOUBLE R:0 D:0
col_bin:                 REQUIRED BINARY R:0 D:0
col_bin_utf8:            REQUIRED BINARY O:UTF8 R:0 D:0
col_fixed:               REQUIRED FIXED_LEN_BYTE_ARRAY R:0 D:0

row group 1:             RC:1 TS:1445 OFFSET:4 
--------------------------------------------------------------------------------
col_b1:                   BOOLEAN SNAPPY DO:0 FPO:4 SZ:36/34/0.94 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: true, max: true, num_nulls: 0]
col_i32:                  INT32 SNAPPY DO:0 FPO:40 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 123, max: 123, num_nulls: 0]
col_i32_int8:             INT32 SNAPPY DO:0 FPO:91 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 123, max: 123, num_nulls: 0]
col_i32_int16:            INT32 SNAPPY DO:0 FPO:142 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 123, max: 123, num_nulls: 0]
col_i32_int32:            INT32 SNAPPY DO:0 FPO:193 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 123, max: 123, num_nulls: 0]
col_i32_uint8:            INT32 SNAPPY DO:0 FPO:244 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 123, max: 123, num_nulls: 0]
col_i32_uint16:           INT32 SNAPPY DO:0 FPO:295 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 123, max: 123, num_nulls: 0]
col_i32_uint32:           INT32 SNAPPY DO:0 FPO:346 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 123, max: 123, num_nulls: 0]
col_i64:                  INT64 SNAPPY DO:0 FPO:397 SZ:71/69/0.97 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 5000000000, max: 5000000000, num_nulls: 0]
col_i64_int64:            INT64 SNAPPY DO:0 FPO:468 SZ:71/69/0.97 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 5000000000, max: 5000000000, num_nulls: 0]
col_i64_uint64:           INT64 SNAPPY DO:0 FPO:539 SZ:71/69/0.97 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 5000000000, max: 5000000000, num_nulls: 0]
col_i32_dec10:            INT32 SNAPPY DO:0 FPO:610 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 3, max: 3, num_nulls: 0]
col_i32_dec31:            INT32 SNAPPY DO:0 FPO:661 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 12.3, max: 12.3, num_nulls: 0]
col_i32_dec94:            INT32 SNAPPY DO:0 FPO:712 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 12345.6789, max: 12345.6789, num_nulls: 0]
col_i64_dec188:           INT64 SNAPPY DO:0 FPO:763 SZ:71/69/0.97 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 1234567890.12345678, max: 1234567890.12345678, num_nulls: 0]
col_bin_dec42:            BINARY SNAPPY DO:0 FPO:834 SZ:45/43/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 84.81, max: 84.81, num_nulls: 0]
col_fixed_dec42:          FIXED_LEN_BYTE_ARRAY SNAPPY DO:0 FPO:879 SZ:41/39/0.95 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 84.81, max: 84.81, num_nulls: 0]
col_i32_date:             INT32 SNAPPY DO:0 FPO:920 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 2020-10-28, max: 2020-10-28, num_nulls: 0]
col_i32_timemillis:       INT32 SNAPPY DO:0 FPO:971 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 12:00:00.000, max: 12:00:00.000, num_nulls: 0]
col_i64_timemicros:       INT64 SNAPPY DO:0 FPO:1022 SZ:71/69/0.97 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 12:00:00.000000, max: 12:00:00.000000, num_nulls: 0]
col_i64_timestampmillis:  INT64 SNAPPY DO:0 FPO:1093 SZ:71/69/0.97 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 2020-10-28T12:00:00.000, max: 2020-10-28T12:00:00.000, num_nulls: 0]
col_i64_timestampmicros:  INT64 SNAPPY DO:0 FPO:1164 SZ:71/69/0.97 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 2020-10-28T12:00:00.000000, max: 2020-10-28T12:00:00.000000, num_nulls: 0]
col_f32:                  FLOAT SNAPPY DO:0 FPO:1235 SZ:51/49/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 1.2, max: 1.2, num_nulls: 0]
col_f64:                  DOUBLE SNAPPY DO:0 FPO:1286 SZ:71/69/0.97 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 1.2, max: 1.2, num_nulls: 0]
col_bin:                  BINARY SNAPPY DO:0 FPO:1357 SZ:50/48/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 0x616263, max: 0x616263, num_nulls: 0]
col_bin_utf8:             BINARY SNAPPY DO:0 FPO:1407 SZ:50/48/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: abc, max: abc, num_nulls: 0]
col_fixed:                FIXED_LEN_BYTE_ARRAY SNAPPY DO:0 FPO:1457 SZ:46/44/0.96 VC:1 ENC:PLAIN,BIT_PACKED ST:[min: 0x616263, max: 0x616263, num_nulls: 0]

My impression is that even parquet-tools is rather careless about interpreting Parquet's data types.