PySparkでDeltaテーブルに対してMERGEを実行すると、「multipleSourceRowMatchingTargetRowInMergeException」が発生することがあります。重複したキーやテーブルパスの整合性、データ型の不一致など、意外な落とし穴が潜んでいるため、原因と対策を正しく把握して対処することが大切です。
Deltaテーブルへのロードで発生する重複キー問題とは
Deltaテーブルを使っていると、MERGE文によるデータ更新や挿入が非常に便利です。特に、既存データと新規データを比較しながら、差分だけを更新したり、なければ新規挿入したりといった処理が簡単に書けるため、多くのデータパイプラインで利用されます。しかし、以下のようなケースでエラーが出ることがあります。
Py4JJavaError: ... multipleSourceRowMatchingTargetRowInMergeException ...
このエラーの背景には、MERGEで指定した結合キーに対して「1対1」でマッチさせるはずが、実際には「1対多」や「多対1」といった状態になっており、矛盾が発生していることが挙げられます。とくにキー列に重複がある場合、同じキーを持つ行が複数存在してしまうため、MERGEが正しく動作せずにエラーが起こります。
重複キーのチェックがなぜ必要か
多くの場合、Deltaテーブル上でMERGEを行う際は、ある特定の列を「ユニークな識別子(主キー)」として扱う想定でコードを書きます。たとえば uprn
という列をキーに使っているとします。ところが、実データに誤りがあったり、外部システムの連携ミスなどにより、同じ uprn
を持つ複数レコードが混在することがあるのです。この状態で下記のようなコードを実行すると問題が生じます。
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
dfNewData = spark.read.format("parquet").load("/path/to/new_data")
deltaTarget = DeltaTable.forPath(spark, "/path/to/delta_table")
deltaTarget.alias("t").merge(
dfNewData.alias("s"),
"t.uprn = s.uprn"
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
もし uprn
がユニークであれば問題はありません。しかし、ターゲットテーブル(t
)もしくはソース(s
)のいずれか、あるいは両方に重複したキーが含まれていると、Sparkは「どの行とどの行を対応付ければよいのか?」を判断できず、multipleSourceRowMatchingTargetRowInMergeException
が発生します。
重複キーの具体的な確認方法
ソース側のDataFrameに重複がないかを確認する簡単な方法として、次のようなクエリを実行できます。
from pyspark.sql.functions import count, col
# 重複のあるキーをグルーピング
dfNewData.groupBy("uprn") \
.agg(count("*").alias("cnt")) \
.filter(col("cnt") > 1) \
.show()
これによって、uprn
列でグルーピングした際にレコード数が2以上のキーがあれば、それが重複していることを示します。ターゲット側(Deltaテーブル)についても同様に、DeltaテーブルをDataFrameとして読み込み、同じチェックを行うとよいでしょう。
dfTarget = spark.read.format("delta").load("/path/to/delta_table")
dfTarget.groupBy("uprn") \
.agg(count("*").alias("cnt")) \
.filter(col("cnt") > 1) \
.show()
このチェックにより、重複キーが存在するかどうかを素早く把握できます。
CETASとDeltaTableパスの整合性に注意
MERGEに限らず、Deltaテーブルを扱う際に最初にやりがちなミスとして「テーブルのパスが一致していない」問題があります。DatabricksやAzure Synapseなどの環境では、SQLセルで以下のようにテーブルを作成するケースがあります。
CREATE TABLE FactEntities
USING DELTA
LOCATION 'abfss://mycontainer@myaccount.dfs.core.windows.net/delta/fact_entities'
AS
SELECT
...
一方、PySparkでDataFrame操作を行う際には、次のように DeltaTable.forPath
でパスを指定することになります。
DeltaTable.forPath(spark, "abfss://mycontainer@myaccount.dfs.core.windows.net/delta/fact_entities")
もしもSQL文で指定したLOCATIONと、PySpark側で指定したパスが少しでも異なっていると、実際には同じテーブルを参照していない可能性があります。そのため、正しいテーブルを操作しているつもりでも、実は異なる場所にあるDeltaテーブルを参照しているケースがあるのです。これが原因で「テーブルが見つからない」「想定外のデータが残っている」といったトラブルにつながります。
最初にSQL文で設定したパスを確認し、それと同じ文字列をPySpark側の DeltaTable.forPath
に指定するように注意しましょう。もし書き方を変えたい場合は、同じストレージアカウント、コンテナ、フォルダ構造を厳密に一致させてください。大文字小文字の差異やフォルダ区切りが正しく設定されていないといった細かな点も、クラウドストレージ環境やDatabricksの設定によっては影響する場合があります。
テーブルパスを誤指定した場合の症状
- MERGE対象のテーブルが存在しないエラーが出る
- すでにデータがあるはずなのに空のテーブルが返される
- エラー自体は起きないが、最新データが更新されない
- 同じストレージを使っているはずなのに、操作結果が予期せず分離される
データ型や列名の不一致によるトラブル
Deltaテーブルはスキーマ(列名やデータ型)のバージョニングをサポートしていますが、依然としてソースとターゲットで大きく異なるスキーマをもつ状態でMERGEを実行すると問題が起こることがあります。たとえば、以下のようなケースです。
- キーとして使用する列名が大文字小文字で異なる(
uprn
とUPRN
など) - ソースは整数型、ターゲットは文字列型、といった型の不一致
- ターゲットには存在しない列をソースに含めている
特に、キー列のデータ型や大文字小文字が一致していないと、Spark SQLが期待する動作を行えず、結合やMERGEが失敗する場合があります。PySparkは大文字小文字を区別しないモード(case-insensitive)を設定しているときでも、実際の列名解決が期待通りにならないケースがありますので、両側で必ず同名・同型にそろえておくことが望ましいです。
スキーマを比較する実践的な方法
スキーマの違いを確認する際は、PySpark DataFrameの printSchema()
を使うとよいでしょう。たとえば次のようにソースとターゲットのスキーマをそれぞれ確認し、差分をチェックします。
dfNewData.printSchema()
dfTarget.printSchema()
どの列がどのデータ型になっているのか、また列名に違いがないかをしっかりと把握しておくことが重要です。もし差分があった場合は、ソースの前処理で型を変換したり、ターゲットの列名を修正したりといった調整が必要になります。
Notebook実行順序と権限設定も見落としやすい
Azure SynapseやDatabricks Notebook上でSQLセルとPySparkセルを混在させる場合、実行順序にも気を配りましょう。先にSQLでテーブルを作成してからPySparkのセルを実行しないと、テーブルが見つからずにエラーになることがあります。また、Notebookを並行実行している場合、片方でテーブルをロックしていてもう片方が処理を待機・タイムアウトするケースも考えられます。
さらに、ストレージやテーブル作成権限にも注意が必要です。たとえば、AzureのData Lake Storageに対する権限(RBACロール)設定が不十分で書き込みに失敗すると、MERGE以前の問題として書き込みエラーが発生する可能性があります。権限が不足している場合は管理者に依頼して正しいロールやアクセス制御を付与してもらうか、自分自身がアプリケーションIDなどを使って認証する仕組みを整える必要があります。
権限に関連するエラー例
java.io.IOException: 403 The account does not have permission to write…
AnalysisException: Insufficient privileges to operate on schema…
com.databricks.backend.daemon.dbutils.DBUtilsException: ... Permission denied ...
これらはあくまで一例ですが、同じようなエラーが出た場合、認証情報やアクセス制御の確認を行ってください。
具体的な対処方法まとめ
ここまで紹介した課題を踏まえ、MERGE実行時に発生しがちなエラーを解消するための対策をまとめます。
対策項目 | 内容 |
---|---|
キー列の重複チェック | ソースとターゲットの両方で重複を検知する。グルーピングやdistinct等で事前に問題を回避。 |
CETASとDeltaテーブルパスの整合性 | 同一のLOCATIONパスをSQLセルとPySparkコードで指定しているかを再確認。 |
データ型や列名の不一致対策 | キー列の型・大文字小文字を含め、ソースとターゲットでスキーマをそろえておく。 |
Notebookの実行順序とロック | テーブル作成のタイミングやNotebookの並行実行に注意。ロックがかかった場合のエラーを回避する。 |
権限設定の確認 | ストレージへの読み書きやテーブル操作の権限があるかを確認。RBACやACLなどの仕組みを再点検。 |
エラー処理やログの充実化 | 大規模データ処理ではtry-exceptで例外をキャッチし、ログをしっかり出力。問題発生時のトラブルシュートを容易にする。 |
重複がある場合のソースDataFrame前処理例
もしソースDataFrameに重複があると判明したら、マージの前に一意な状態にするか、重複をまとめて集約するなどの前処理が必要です。簡単な例として、重複しているキーに対して最新の行だけ残す方法を示します。たとえば、タイムスタンプ列があると仮定して、以下のようにウィンドウ関数を使えば最新行だけ抽出可能です。
from pyspark.sql.window import Window
import pyspark.sql.functions as F
windowSpec = Window.partitionBy("uprn").orderBy(F.desc("last_update_timestamp"))
dfDeduplicated = dfNewData \
.withColumn("rn", F.row_number().over(windowSpec)) \
.filter("rn = 1") \
.drop("rn")
このようにすれば、同じ uprn
を持つ複数行があっても、last_update_timestamp
が最新の行だけ残るため、MERGE時に重複エラーを回避できます。ビジネスロジック上、どのレコードを残すかは要件によるので、適宜グルーピングや集約関数を組み合わせて最適な方法を選びましょう。
既存Deltaテーブルに重複がある場合
ターゲット側のDeltaテーブルに重複があった場合は、Deltaテーブルを一旦読み込んで重複排除したうえで書き戻す必要があります。以下はイメージ例です。
dfTarget = spark.read.format("delta").load("/path/to/delta_table")
# 重複排除(latest_date列を仮定)
windowSpecTarget = Window.partitionBy("uprn").orderBy(F.desc("latest_date"))
dfTargetDedup = dfTarget \
.withColumn("rn", F.row_number().over(windowSpecTarget)) \
.filter("rn = 1") \
.drop("rn")
# テーブルを一度置き換え
dfTargetDedup.write.format("delta").mode("overwrite").save("/path/to/delta_table")
注意点として、mode("overwrite")
を使うとテーブル全体が置き換えられるため、慎重にバックアップを取ったり、時間分割パーティションなどの考慮も必要です。大規模データの場合、処理コストも相応に発生するので、業務要件やシステム構成に合わせた最適化が求められます。
トラブルシュートをスムーズに進めるためのヒント
最後に、DeltaテーブルのMERGEエラー解消をスムーズに行うために、いくつかのヒントを紹介します。
1. ロギングとモニタリングの強化
大規模なバッチ処理やストリーミング処理を行う場合、問題が起こってから対処するのではなく、普段から処理の状態をロギングしておくのが有効です。PySparkのジョブログや、Databricksのジョブトラッキング機能、Azure監視機能などを組み合わせ、何が原因でエラーが発生したのか素早く突き止められる体制を整えましょう。
2. テストデータを使った検証
本番データでいきなりMERGEを行うのではなく、まずは少量のテストデータを用いてMERGE処理が期待通り動くかどうかを検証しましょう。重複キーを意図的に含むテストケースを作っておくと、不具合の再現性確認に役立ちます。
3. スキーマの変化を意識した設計
Deltaテーブルはスキーマエボリューションをサポートしていますが、それでも列の追加・変更が頻繁に起こる場合は注意が必要です。事前にスキーマ管理のルールやバージョン管理のポリシーを決めておくと、将来的な拡張や運用が楽になります。
4. 一貫性のあるキー設計
大規模なデータウェアハウスの構築やマスターデータ管理が絡んでくる場合、キーの一意性を保つにはデータモデリング時点での慎重な設計が必要となります。実際に運用に入ってからキーの重複が見つかると、修正に大きな工数がかかってしまうこともあります。たとえば、データのソースシステムからしっかりと一貫したIDが供給されるようになっているか、データインテグレーション時点で正規化や重複排除ができているか、日頃からチェックしておくとよいでしょう。
まとめ
PySparkのMERGEを使ってDeltaテーブルにデータをマージするときに発生する「multipleSourceRowMatchingTargetRowInMergeException」は、主にキーの重複が原因となります。また、テーブルパスの不整合や列のスキーマ相違、Notebookの実行順序や権限不足など、さまざまな要因が絡むこともあるため、総合的な観点でトラブルシュートを行いましょう。
大切なのは、問題が起きた際に「なぜそのエラーが発生したのか」をデータの重複や設定の違いなどさまざまな観点で検証することです。キー列の重複チェックやテーブルパスの見直し、スキーマの再点検などをしっかりと行い、正しい方法でMERGEが実行できるようにしておくと、安定したデータパイプラインの実装が可能になります。
コメント