データ基盤をDataformで構築して気づいたこと

Dataform導入の経緯

ドワンゴ教育事業でデータサイエンティストとして一応なんとか働いている堀です。

私たちのグループではBigQuery上で教育データのデータウェアハウス基盤を構築しています。

当初はDataformもリリースされていなかったこともあり、Scheduled Queryでデータウェアハウス基盤を構築していました。 しかしながらデータウェアハウス基盤が複雑化し実行SQLが増えるにつれ、SQLの依存関係をもとに各SQLの実行時間を定める必要がでてきて、Scheduled Queryでの管理が厳しくなってきました......

そういった背景から、現在Scheduled QueryからDataformへの移行を行なっています。 本記事ではDataform導入にあたり、どのようなしんどいところがあったのかや、その解決策について紹介します。 Dataformに興味のある方や、導入しようと考えてる方の参考になれば幸いです。

Dataformとは

Dataformは、Google Cloudが提供するデータ変換ワークフローを構築するためのサービスです。

SQLを拡張したSQLXという構文を使用し、複数のSQLXファイルの依存関係を自動的に解決することで、複雑なETLパイプラインを構築できます。 加えて、BigQuery・GitHubとの連携や、パイプラインの可視化・定期実行機能があり、複雑で大規模なデータウェアハウス基盤の構築に向いています。

つまづいたところとその解決策

自動変換SQLは可読性悪いので、エラー時のハンドリングしんどい

Dataformでは、SQLXという既存のSQLを拡張したクエリを用いてパイプラインを構築します。

内部では、SQLXを自動でSQLに変換し、そのSQLをBigQuery上で実行しています。

具体的には

config {
    name: "sample1",
    schema: "sample",
    type: "incremental"
}
SELECT *
FROM ${ref('user_log')} AS ul
INNER JOIN ${ref('user_master')} AS um
ON ul.user_id=um.user_id

といった上記のSQLX1は下記の可読性の低いSQLに変換されます。

BEGIN
CREATE SCHEMA IF NOT EXISTS `dwh-sample.sample` OPTIONS(location="US");
EXCEPTION WHEN ERROR THEN
IF NOT CONTAINS_SUBSTR(@@error.message, "already exists: dataset") AND
NOT CONTAINS_SUBSTR(@@error.message, "too many dataset metadata update operations") AND
NOT CONTAINS_SUBSTR(@@error.message, "User does not have bigquery.datasets.create permission")
THEN
RAISE USING MESSAGE = @@error.message;
END IF;
END;
BEGIN
DECLARE dataform_table_type DEFAULT (
SELECT ANY_VALUE(table_type)
FROM `dwh-sample.sample.INFORMATION_SCHEMA.TABLES`
WHERE table_name = 'sample1'
);
IF dataform_table_type IS NOT NULL THEN
IF dataform_table_type = 'VIEW' THEN DROP VIEW IF EXISTS `dwh-sample.sample.sample1`;
ELSEIF dataform_table_type = 'MATERIALIZED VIEW' THEN DROP MATERIALIZED VIEW IF EXISTS `dwh-sample.sample.sample1`;
END IF;
END IF;
IF dataform_table_type IS NOT NULL THEN
BEGIN
DECLARE dataform_columns ARRAY<STRING>;
DECLARE dataform_columns_list STRING;
SET dataform_columns = (
SELECT
ARRAY_AGG(DISTINCT "`" || column_name || "`")
FROM `dwh-sample.sample.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name = 'sample1'
);
SET dataform_columns_list = (
SELECT
STRING_AGG(column)
FROM UNNEST(dataform_columns) AS column);
EXECUTE IMMEDIATE
"""
CREATE OR REPLACE PROCEDURE `dwh-sample.sample.df_XXXXXXXXXXX`() OPTIONS(strict_mode=false)
BEGIN
INSERT INTO `dwh-sample.sample.sample1`
(""" || dataform_columns_list || """)
SELECT """ || dataform_columns_list || """
FROM (
SELECT *
FROM `dwh-sample.sample.user_log` AS ul
INNER JOIN `dwh-sample.sample.user_master` AS um
ON ul.user_id=um.user_id
);
END;
""";
BEGIN
CALL `dwh-sample.sample.df_XXXXXXXXXXX`();
EXCEPTION WHEN ERROR THEN
DROP PROCEDURE IF EXISTS `dwh-sample.sample.df_XXXXXXXXXXX`;
RAISE;
END;
DROP PROCEDURE IF EXISTS `dwh-sample.sample.df_XXXXXXXXXXX`;
END;
ELSE
BEGIN
CREATE TABLE IF NOT EXISTS `dwh-sample.sample.sample1`
OPTIONS()
AS (
SELECT *
FROM `dwh-sample.sample.user_log` AS ul
INNER JOIN `dwh-sample.sample.user_master` AS um
ON ul.user_id=um.user_id
);
END;
END IF;
END;

中身を読んでみると、incrementalの処理の追加やSQLX記法(\ref参照)の解消などの処理が追加されているようです。

正直な意見を言うと、この変換後のSQLを読むのはちょっとしんどいです...... そして、実際にエラーが発生する場合は、変換後のSQLでエラーが起こり、エラーの行数などもそれに基づいて表示されるため、状況によっては変換後のSQLを読まないといけないこともあります。

そのため、まずは普通のSQLで書いて動くのを確認したあと、それをDataform用にSQLXに書き直すといったことをしたりもしています。

SQLXとSQLの変換に関しては、Google Cloud Japanのこの記事が参考になりましたので、興味のある方は是非ご覧ください。

レコードの追加更新

既存のテーブルにレコードを追加や更新する際、SQLではINSERTMERGEを使うのではないでしょうか。

INSERT INTO `user_master` (user_id, updated_at)
SELECT user_id, updated_at
FROM `new_data`
MERGE INTO `user_master` AS target
USING (
    SELECT user_id, updated_at
    FROM `new_data`
)AS source
ON target.user_id = source.user_id
WHEN MATCHED THEN
    UPDATE SET target.updated_at = source.updated_at
WHEN NOT MATCHED THEN
    INSERT (user_id, updated_at)
    VALUES (source.user_id, source.updated_at)

このようなケースでは、Dataformではそれぞれincrementalincremental+uniqueKeyを使うことで、同じようなことができます。

config {
  type: "incremental",
  name: "user_master",
  uniqueKey: ["user_id"]
}
SELECT user_id, updated_at
FROM `new_data`

具体的には、type: "incremental"で既存のテーブルを更新する形で実行、uniqueKey: ["user_id"]を加えることで、どのカラムをキーにして更新するかを指定しています。

個人的には、この書き方の方がSQLよりもスッキリしていて好みです。

パーティションフィルタがある場合のレコード更新の注意点

ただ、挿入先(例でいうuser_master)にパーティションフィルタ必須の場合は注意が必要です。

というのは、その際にuniqueKeyの検証を行うところの処理で、パーティションフィルタがないとエラーが発生してしまうからです。 そういった場合は

config {
  type: "incremental",
  name: "user_master",
  uniqueKey: ["user_id"],
  bigquery: {
    partitionBy: "DATE(updated_at)",
    updatePartitionFilter: "updated_at >= timestamp_sub(current_timestamp(), interval XXX day)"
  }
}
SELECT user_id, updated_at
FROM `new_data`

のように、updatePartitionFilterで参照する日付を指定する対応が必要です。

assertionの話

Dataformではassertion機能があり、assertionを設定するとクエリが実行されるたびに不適切なデータがないかチェックされます。

例えば、user_idがNULLでないことをチェックする場合は

config {
  type: "table",
  name: "user_master",
  assertions: {
    nonNull: ['user_id']
  }
}
SELECT user_id, updated_at
FROM `new_data`

のように書くことで、user_idがNULLでないことをお手軽にチェックできます。

しかし、このnonNullのような組み込みアサーションを使用する際には注意が必要です。

内部的には、Dataformがこのアサーション(例: user_idがNULLでないこと)を検証するために、対象テーブルの全レコードに対してuser_id IS NULLのような条件をチェックするクエリを自動生成し実行します。

もし挿入先のテーブルでパーティションフィルタが必須(require_partition_filter = trueなど)と設定されている場合、この自動生成されたクエリがパーティションフィルタを指定せずに全パーティションにアクセスしようと試みます。

そのため、「パーティションフィルタがありません」といったエラーを引き起こし、アサーションが失敗してしまいます。

しかし、ここも挿入先がパーティションフィルタ必須の場合は注意が必要です。 というのは、assertion周りの処理がパーティションフィルタを考慮していないため、エラーが発生してしまいます。

この問題を回避策として、手動アサーション (custom assertion) があります。 手動アサーションでは、アサーションのロジックを自分でSQLクエリとして記述するため、パーティションフィルタを含めた必要な条件をクエリに含めることができます。

具体的には先ほどのSQLを

config {
  type: "assertion",
  name: "usermaster_user_id_not_null",
}
-- この結果が空でない場合、エラーとみなされます
SELECT *
FROM ${ref("user_master")} AS um
WHERE
  user_id IS NULL
  AND
  um.updated_at < TIMESTAMP("2025-02-05")

といったアサーション用のSQL(手動アサーションとよぶそうです)と実際に実行されるクエリ

config {
    type: "incremental",
    name: "user_master",
    dependencies: ["usermaster_user_id_not_null"]
}
SELECT user_id, updated_at
FROM `new_data`

の2つを分けて書きます。 アサーション用のSQLは、問題のあるデータ(例えば、user_idがNULLのレコード)を選択するクエリとして記述し、このクエリが1行でも結果を返した場合にアサーションが失敗(エラー)となります。

UDFの取り回しが通常と異なる

BigQueryにはUDF(User Defined Function)という、自分で定義した関数をSQL内で使える機能があります。 UDFはBigQuery内に永続的に定義することができるほか、temporary functionとしてそのクエリ実行時のみの一時的な定義も可能です。 自分はよく、ちょっと長いSQLを書いているとtemporary functionに度々使う処理や定数などを書くことがあります。 例えば

-- 日付の閾値を定義
CREATE TEMPORARY FUNCTION timestamp_thresh() RETURNS TIMESTAMP AS (TIMESTAMP('2025-01-01'));

SELECT * FROM `user_log`
WHERE created_at >= timestamp_thresh();

といった感じです。

しかしSQLXでは、typetableincrementalの場合、1ファイルに1ステートメントしか書けないという決まりがあるため、そのままではtemporary functionを記述するとエラーになってしまいます。

SQLXにはpre_operationsという、本実行が行われる前に別のステートメントを実行する機能があるので、それと組み合わせることでtemporary functionを実現できます。

config {
  type: "incremental",
  name: "user_log",
}

pre_operations {
  CREATE TEMPORARY FUNCTION timestamp_thresh() RETURNS TIMESTAMP AS (TIMESTAMP('2025-01-01'));
}
SELECT * FROM `user_log`
WHERE created_at >= timestamp_thresh()

エラーが起きるようなSQLXがリポジトリにあると実行をキックできない

Dataformなどで開発する場合、開発環境(Development workspace)を立てて、そこで複数人が作業する、といったケースはよくあるかと思います。 自分たちもDataformではそのような形で開発をしています。

一方で、Dataformは全体をパイプラインとして管理しているため、単体でエラーが起きるようなSQLXファイルがディレクトリ内にあると、そことは関係のない他のSQLXファイルの実行もキックできなくなります。

もちろん、拡張子を変更するなり、関係ない場所にSQLXファイルを移動させることで暫定的に対応はできます。 しかし、複数人で分担してSQLXを書いている状況では、誰かが途中書きのSQLXを置くだけで全体が機能を停止してしまうのはなかなかに面倒です。

そのため、作業者が1人1つ作業用のDataformを持ち、それに対し1つのリポジトリをリンクさせることでこの問題を解決しました。

graph TD
    A[Aさん用Dataform]
    C[Cさん用Dataform]
    B[Bさん用Dataform]
    GitHub[GitHubリポジトリ]
    GitHub -->|pull| A
    GitHub -->|pull| B
    GitHub -->|pull| C

まとめ

Dataformを導入してみて、SQLと異なる部分も多々あり、色々と試行錯誤する部分が多かったです。 この試行錯誤が、今後Dataform導入を検討している方に、少しでも役に立てばと思っています。

最後に、株式会社ドワンゴの教育事業では、一緒に働けるメンバーを募集しております。 「未来の当たり前の教育をつくる」というビジョンを実現してきた組織であり、まだ誰もやっていない取り組みも果敢に挑戦できる環境です。

気になった方はカジュアル面談も行っておりますので、お気軽にご連絡ください。 開発チームの取り組み、教育事業の今後については、他記事や採用資料をご覧ください。

採用情報はこちらから

www.nnn.ed.nico

speakerdeck.com


  1. refはDataform内のテーブルを参照するSQLX独自記法