BigQueryへのデータ読み込み_3.Cloud Data FusionでCSVを整形する

前の記事では、Cloud Functionsを使い、Cloud StorageにCSVファイルが追加されたらBigQueryに自動で読み込むことをしてみた。この際、CSVは予め手動で整形した上でCloud Storageに置いていた。

この記事では、CSVをBigQueryに読み込み可能な状態に整形する過程を自動化する方法を探る。

概要

今回、Cloud Data Fusionを用いてCSV整形を実施してみた。

Cloud Data Fusionは、データパイプラインをGUIで作成できるサービスで、ETLのTransform部分も様々な機能が用意されているため、CSVの整形もGUIで行えてしまう。

今回は、以下の2段階で進めた手順をまとめた。

A. 定型的にCSVを整形し、BigQueryに読み込む手順をCloud Data Fusionで作成する

B. 任意のCSVファイル名を渡したらA.が実行されるようにする

事前準備

クイックスタート|Cloud Data Fusion ドキュメント を参照して、APIを有効にし、インスタンスを作成しておく。

下記のように、インスタンス名の前に緑のチェックマークが入ったら作成完了。

操作方法や概観をつかむため、事前に以下のチュートリアルをやった。

手順

A. 定型的にCSVを整形し、BigQueryに読み込む手順をCloud Data Fusionで作成する

スタートとゴール

前の記事同様、気象庁のデータを使用する。

気象庁のHPからダウンロードした段階では、以下のような形式になっている。このCSVをCloud Storageの特定のフォルダに置いた状態からスタートする。

ダウンロードした時刻:2021/11/21 16:31:04

,東京,東京,東京,東京,東京,東京
年月日,平均気温(℃),最高気温(℃),最低気温(℃),最大風速(m/s),最大風速(m/s),天気概況(昼:06時~18時)
,,,,,風向,
2021/1/1,4.4,10.5,-1.3,3.1,北北東,快晴
2021/1/2,4.8,10.8,0.1,4.7,北北東,快晴

これを下記の形式にData Fusion内で整形し、BigQueryに格納することを目指す。

date,ave_temp,max_temp,min_temp,max_wind_speed,wind_direction,weather
2021-01-01,4.4,10.5,-1.3,3.1,北北東,快晴
2021-01-02,4.8,10.8,0.1,4.7,北北東,快晴

① 読み込み先のBigQueryの準備をしておく

BigQueryに読み込み先のデータセットとテーブルを準備しておく。

スキーマはこれからCloud Data Fusionで整形する予定の内容に合わせておく。

② Cloud Data Fusion UIでCSVを読み込む

「事前準備」でインスタンスを作成してあるので、アクションの「インスタンスを表示」をクリックすると、UIに画面が遷移する。

Wrangleをクリックする。

Upload元としてCloud Storageを選択し、用意しておいたCSVを読み込むと、以下のような感じで表示される。行によってカンマの数が異なるデータのため、うまくカンマ区切りで読み込まれず、全て1列に入る形になっている。

③ データを整形する

ゴールの内容に合うようにデータを整形していく。

  • 不要なヘッダーの削除
    今回は冒頭の5行を消してしまい、列名は後から付け直す方針で進める。 Filter > value matches regex で正規表現により日付から始まる列のみ残すように(冒頭の5行を削除するように)した。
  • カンマ区切りで区切る Parse > CSV を選択し、区切り文字はCommaを選択する。

ここまでの処理でこのような形式になっている。

  • 不要な列を削除する
    カラム区切りで分割した際、分割前の状態も列として残っていている。不要なため Delete columnより削除する。
  • 列名をつける
    各列打ち替えていく。
  • 型変換
    • 日付:読み込んだ時点では1行全体で1つの文字列として判断されているため、現状日付も文字列となっている。Parse > Simple date より、今回2021/1/1形式だったのでCustom formatを選択し、yyyy/m/dで実行した。

すると、不要な時間までついてきてしまった。

公式のドキュメントでは見つけられなかったのだけど、stack overflowを参照して、Custom transform > date.toLocalDate()をかけたら、Date型になってくれた。

  • 数値:Change data type > Floatを選択して変換する。

Wranglerで行った各変換は画面右で表示されており、不要な処理を削除することが可能。

④パイプライン作成

Create a Pipeline をクリックし、Batch pipelineを選択すると

データ元のGCSと設定したWranglerがパイプラインでつながった状態で表示される。

⑤ データの書き出し先を設定する(BigQuery)

Sink > BigQueryを選択(Sinkはデータの出力先)してBigQueryのノードを追加し、Wranglerと接続する。

ポインタをBigQuryノードの上に置くとPropertiesが表示されるので、クリックする。BigQuery側のデータセットやテーブル等の指定を行う。

⑥ デプロイと実行

Runをクリックすると、処理が進んでいく(けっこう時間がかかった 20分程度)。Statusの部分の表示で処理の段階がわかり、無事成功すると最終的にSucceededになる。

B. 任意のCSVファイル名を渡したらA.が実行されるようにする

A. では、元となるCSVファイル名をベタ打ちで指定していた。これだとファイル名が変わるたびに毎回作り直す必要があるので、任意のファイル名を渡して処理を実行できるようにしたい。

A. の処理をベースにし、一部書き換える。

① データの読み込み先の変更

Cloud Storageからの読み込んでいる最初のノードのPropertiesをクリックし、読み込み先のパスの指定部分末尾のCSVファイル名が入る部分を${csv_file_name}に変更する。

※ 下図のFormat部分をcsvとしているが、ここは今回textにする必要があった(csvだと1行目しか読み込まれなかった。おそらく、今回のCSVファイルがCSVと判断される形式でなかったのが要因)

② デプロイ

変数を入れてRunしてみると、無事成功した。

未解決な部分

今回文字列として読み込まれた小数点の数値列について、文字列からFloatに変換すると元々存在しなかった不要な桁が出てしまったが、これをうまく処理することができなかった。

(Wranglerの処理のプレビュー画面 不要な桁が残る処理となっている)

round関数があるので、wranglerでround(ave_temp, 1)などと試してみたが、エラーになり、roundの桁数を指定する方法が見つけられなかった。

この記事の中で、

We could write the recipe for the transformation directly with Directives using JEXL syntax(https://commons.apache.org/proper/commons-jexl/reference/syntax.html)

とあったので、該当ページを参照したが、今回行いたい操作のヒントは得ることができなかった。

まとめ

今回、Data Fusionを利用してCSVファイルの整形を行ってみた。GUIで様々な処理が行えるのはとっつきやすくはあったが、少し込み入った処理をしようとするとやりづらかったり、結局学習コストがかかるので、コードを書いて管理する方が楽かもしれないと感じた。

次は、ワークフローを管理できるCloud Composerを触ってみたい。

タイトルとURLをコピーしました