Search Consoleの過去データもBigQueryに蓄積する_4.Cloud Functionsで実行する

前段の記事:Search Consoleの過去データもBigQueryに蓄積する_1.全体像

最後のこの記事では、以下の点について記載していく。

Search Console API に取得上限数が設けられているため、任意の期間を指定して実行するようにしたい。Cloud Functions で毎日期間をずらして実行するにはどうしたらよいか。

Search Console API の取得上限数

Search Console API には取得上限数が設けられている。

API 使用量の割り当てに加え、検索分析のメソッドでは、検索タイプ(ウェブ、画像など)ごとに 1 日あたり最大 50,000 行がデータ別に表示されます(クリック順)。

パフォーマンス データを取得する | Search Console API

query メソッド の rowLimit プロパティで取得上限数を指定できる。デフォルトが1,000で、最大が25,000(レスポンスの最初の行のインデックスを指定する startRow プロパティと併用すると50,000行まで取得できるようだったが、今回は対応しなかった)

上記の制限があるため、今回作成したコード(https://github.com/yrarchi/search_console_to_bigquery)では、以下のパラメータを任意で指定できるようにした。

例えば、2024-02-01分から公式のエクスポート機能でBigQueryにエクスポートしており、その前の2週間分を1日あたり7日分を取得したい場合は以下のような指定になる。

{
    "target_start_date": "2024-01-18", // 開始日を指定
    "target_end_date": "2024-01-31",  // 終了日を指定
    "days": 7,  // 1日あたりに取得する日数
    "site_url": "your_site_url",
    "dataset_id": "searchconsole_hoge",
    "site_table": "past_searchdata_site_impression",  // 任意のテーブル
    "url_table": "past_searchdata_url_impression"  // 任意のテーブル
}

Cloud Functions で実行する

上記のように Search Console API の取得上限数への対応を行ったため、毎日1回コードの実行を行うようにしたい。手動で実行するのはつらいので、Cloud Functions で実行するよう設定を行った。

手元で実行する場合とコードの大半は同じだが、異なる部分を以下で記載する。

  • クライアントの作成
    今回、Cloud Functions の実行を行うサービスアカウントに BigQuery の必要な権限を付与したため、google.auth の default を利用した。
from google.auth import default
from google.cloud import bigquery

credentials, project_id = default()
client = bigquery.Client(credentials=credentials, project=project_id)

ローカルでの実行の際は、サービスアカウントのクレデンシャル情報をファイルとして置いて、それを読む形式にしていた。

from google.oauth2 import service_account
from google.cloud import bigquery

credentials = service_account.Credentials.from_service_account_file(
		"service-account.json",
		scopes=[
		    "https://www.googleapis.com/auth/bigquery.insertdata",
		    "https://www.googleapis.com/auth/bigquery"
		]
)
client = bigquery.Client(
    credentials=credentials, project=credentials.project_id
)
  • ライブラリの追加
    Cloud Functions の実行に必要なため、以下のライブラリを requirements.txt に追加している。
functions-framework==3.*

毎日対象期間をずらしながらCloud Functions の実行を行う必要があるが、変数を毎日+1していく直接的な方法でなく、BigQuery に挿入済みのデータの日付から判断していく方式を取った。

コードとしては以下のあたりが該当する。

def get_max_date(self, table_id):
    query = f"""
        SELECT MAX(data_date) AS max_date
        FROM `{self.dataset_id}.{table_id}`
    """
    try:
        query_job = self.bigquery_client.query(query)
        result = list(query_job.result())
        if result:
            return result[0]["max_date"]
        else:
            return None
    except Exception as error:
        print(f"An error occurred while getting the max date from {table_id}: {error}")
        raise
# (中略)

def get_next_day_after_max_date(client, table_id, default_start_date):
    max_date = client.get_max_date(table_id)
    if max_date:
        next_day = max_date + timedelta(days=1)
        return next_day
    else:
        return datetime.strptime(default_start_date, "%Y-%m-%d").date()

以上の対応により、任意の期間を指定した上で、Cloud Functions を毎日期間をずらして実行することができるようになった。

コードの全体は以下に置いている。
https://github.com/yrarchi/search_console_to_bigquery

Search Consoleの過去データもBigQueryに蓄積する_3.APIからの取得結果の整形

以下の点について引き続き記載していく。

Search Console の API で取得したデータと、公式エクスポート機能で入っているデータの形式は異なる。前者のデータをどのように整形したら後者に寄せられるか。

※ ここでいう公式エクスポート機能は以下のページの内容を指す
About bulk data export of Search Console data to BigQuery – Search Console Help

公式エクスポート機能で BigQuery に入るデータの形式については前記事で確認した。
Search Consoleの過去データもBigQueryに蓄積する_3.APIからの取得結果の整形

この記事では、Search Console APIから取得したデータを公式エクスポート機能で BigQuery に入るデータに寄せた集計をする方法を検討する。

Search Console APIから取得できるデータ

Search Console の検索パフォーマンスのデータの取得は query メソッドを利用する。

参照:Search Analytics: query | Search Console API

searchdata_site_impression テーブルに近づけて集計する

例えば2024-02-20のwebのデータを取得する場合、以下のような形式でリクエストを送る。

query_request = {
	"startDate": "2024-02-20",
	"endDate": "2024-02-20",
	"dimensions": ["query", "country", "device", "date"],
	"type": "web",
	"aggregationType": "byProperty",
}

search_console_client.searchanalytics().query(
    siteUrl="your_site_url",
    body=query_request
).execute()

このリクエストによる集計と公式エクスポート機能で BigQuery に入るデータを比較した図

このリクエストでは匿名化したクエリの結果は集計されない(= searchdata_site_impression テーブルの is_anonymous カラムがTrueのデータは集計されない)。

ただ、is_anonymous カラムがTrueの場合、query カラムはnullになる。そのため、分析上大きな問題にならない場合が多いと判断し、集計から除くこととした。

※ 匿名化したクエリ:

匿名化されたクエリは、2~3 か月間に数十人未満のユーザーが発行したクエリです。プライバシー保護のため、実際の検索語句は検索のパフォーマンス データに表示されません

A deep dive into Search Console performance data filtering and limits | Google Search Central Blog

リクエストに対するレスポンスは以下のような形式で返ってくるので、searchdata_site_impression テーブルのカラム名に合わせて整形する。

[
    {
        'type': 'WEB', 
        'data': {
            'rows': [
                {
                    'keys': [
	                    'bigquery 日付 比較', 
	                    'jpn', 
	                    'DESKTOP', 
	                    '2024-01-29'
	                  ], 
                    'clicks': 1, 
                    'impressions': 3, 
                    'ctr': 0.3, 
                    'position': 9
                }, …(略)
            ], 
        'responseAggregationType': 'byProperty'
        }
    }, { …(略)
]

※ コードは以下
https://github.com/yrarchi/search_console_to_bigquery

searchdata_url_impression テーブルに近づけて集計する

例えば2024-02-20のwebのデータを取得する場合、以下のような形式でリクエストを送る。(searchdata_site_impression の場合とほぼ同じだが、dimensions と aggregationType が変化している)

query_request = {
	"startDate": "2024-02-20",
	"endDate": "2024-02-20",
	"dimensions": ["query", "page", "country", "device", "date"],
	"type": "web",
	"aggregationType": "byPage",
}

search_console_client.searchanalytics().query(
    siteUrl="your_site_url",
    body=query_request
).execute()

このリクエストによる集計と公式エクスポート機能で BigQuery に入るデータを比較した図

このリクエストでは searchdata_site_impression テーブル同様、匿名化したクエリの結果は集計されない(= is_anonymous カラムがTrueのデータは集計されない)が、同様に問題としないこととした。is_anonymized_discover カラムについても類似した要因でTrueのデータ(url や country などがnullとなる)も集計結果に入れていない。

また、今回対象としたサイト(このサイト)は検索での見え方のタイプに該当するようなデータがなかったため、それらの is_~ カラム(検索での見え方のタイプを示すのに使用されるブール値フィールド)ごとの集計も行わなかった。

リクエストに対するレスポンスは以下のような形式で返ってくるので、searchdata_url_impression テーブルのカラム名に合わせて整形する。

[
    {
        'type': 'WEB', 
        'data': {
            'rows': [
                {
                    'keys': [
	                    'bigquery timestamp', 
	                    '<https://yrarchi.net/managing_learning_with_notion/>', 
	                    'jpn', 
	                    'DESKTOP', 
	                    '2024-01-29'
                    ], 
                    'clicks': 0, 
                    'impressions': 4, 
                    'ctr': 0.25, 
                    'position': 44
                }, { …(略)
            ], 
            'responseAggregationType': 'byPage'
        }
    }, {…(略)
]

※ コードは以下
https://github.com/yrarchi/search_console_to_bigquery

以上で、一部カラムは集計対象から除いたものの公式エクスポート機能に寄せた集計を Search Console APIで行えるようになった。

最後に、これを複数日に渡って実行できるよう、Cloud Functions に組み込む。

Search Consoleの過去データもBigQueryに蓄積する_4.Cloud Functionsで実行する

Search Consoleの過去データもBigQueryに蓄積する_2.公式エクスポート機能の構成

前段の記事:Search Consoleの過去データもBigQueryに蓄積する_1.全体像

前記事の以下の点について記載していく。

Search Console の API で取得したデータと、公式エクスポート機能で入っているデータの形式は異なる。前者のデータをどのように整形したら後者に寄せられるか。

※ ここでいう公式エクスポート機能は以下のページの内容を指す
About bulk data export of Search Console data to BigQuery – Search Console Help

まず、公式エクスポート機能で入るデータの形式を確認する。

テーブルの種類

Table guidelines and reference – Search Console Help に記載の通り、以下の3つのテーブルが作成される。

  • ExportLog テーブル:エクスポートのログ
  • searchdata_site_impression テーブル:プロパティごとに集計されたデータ
  • searchdata_url_impression テーブル:URL別に集計されたデータ

テーブルの集計単位(プロパティ・URL)の違いについて

テーブル searchdata_site_impression と searchdata_url_impression の集計単位(プロパティ・URL)は以下のような違いがある。

例えば https://example.com/a とhttps://example.com/b が1回ずつ検索結果に表示された場合
・プロパティごと:1回のインプレッションとしてカウント
・URLごと:2回のインプレッションとしてカウント(URLごとに1回とカウント)

参考:Performance report (Search) – Search Console Help

※ プロパティは Search Console の用語で、対象とするウェブサイトとして指定したドメインまたはURLプレフィックスのこと。

例えば、https://example.com/ というサイトの場合、 (ドメインプロパティを使わなければ)以下は別々のプロパティと認識される。
・https://example.com/
・http://example.com/
・https://www.example.com/

参考:Add a website property to Search Console – Search Console Help

公式エクスポート機能で生成されるテーブルがわかったところで、各々のテーブルの中身を確認していく。

searchdata_site_impression テーブルの詳細

公式エクスポート機能でBigQueryに生成される searchdata_site_impression テーブルと、Search Console のコンソールを比較すると以下の図のようになる。多くのカラムは Search Console の コンソールにある絞り込み機能と一致するため理解しやすい。

searchdata_site_impression テーブルで見慣れず調べたカラムについて記載する。
引用部分は Table guidelines and reference – Search Console Help より

  • is_anonymous

まれなクエリ(匿名化されたクエリ)は、このブール値でマークされます。このフィールドが true の場合、クエリを行ったユーザーのプライバシーを保護するため、クエリフィールドは null になります。

検索数の少ないクエリについては、クエリが非表示となる(Search Console のコンソールでも非表示となる)。裏を返すと、is_anonymous is false の場合は、query カラムに何らかの文字列が入ることになる。

  • sum_top_position

そのテーブルの行の各インプレッションに対する、検索結果におけるサイトの最上位の掲載順位の合計(0 は結果における最上位の掲載順位です)。平均掲載順位(1 ベース)求めるには、SUM(sum_top_position)/SUM(impressions) + 1 を計算します。

例えば以下の図のような順位(0位、1位、8位 ※ 最上位を0位と数える)で3回表示された場合、上で示した図(impressions = 3, sum_top_position = 9, 掲載順位 = 4)の結果となる。

searchdata_url_impression テーブルの詳細

searchdata_site_impression テーブルに比べ、こちらの方が見慣れないカラムが多く、データの意味するところが捉えづらい。公式エクスポート機能で BigQuery に生成される searchdata_url_impression テーブルと、Search Console の コンソール を比較すると以下の図のようになる。

searchdata_url_impression テーブルで見慣れず調べたカラムについて記載する。
引用部分は Table guidelines and reference – Search Console Help より

  • is_anonymized_discover

データ行が Discover の匿名化しきい値を下回っているかどうかを示します。しきい値を下回っている場合、ユーザーのプライバシー保護のため、他の一部のフィールド(url や country など)は表示されなくなります。

匿名化クエリ(is_anonymous カラム)のDiscover版といえるか? 詳細の記載は見つけられなかったが、件数が少ない場合はurl等のカラムがnullになる模様。

  • その他の is_~カラム

検索での見え方のタイプを示すのに使用されるブール値フィールドがいくつかあります(is_amp_top_stories、is_job_listing、is_job_details など)。特定のリッチリザルトで問題の行が表示される場合、フィールドは true になります。

この「検索での見え方」は以下に挙げられている、例えば商品スニペットやレシピギャラリーなど、特別な検索結果の表示の仕方を指している。
参照:Performance report (Search) – Search Console Help
Search Console のコンソールにもそれ用のタブがある。

  • sum_position

クエリの検索結果における URL の最上位の掲載順位を示す 0 ベースの数値(0 は結果における最上位の掲載順位です)。平均掲載順位(1 ベース)を求めるには、SUM(sum_position)/SUM(impressions) + 1 を計算します。

例えば以下の図のような順位(44位、9位、0位、… ※ 最上位を0位と数える)で 20回表示されたとしたら、上で示した図(impressions = 20, 掲載順位 = 13.4)の結果となる。

ここまで、公式エクスポート機能で BigQuery に入るデータを Search Console のコンソール上のデータの集計結果と比較しながら把握した。

次に、Search Console のAPIから取得可能なデータを確認し、公式エクスポート機能に寄せた集計をするにはどうしたら良いか考えていく。
Search Consoleの過去データもBigQueryに蓄積する_3.APIからの取得結果の整形

Search Consoleの過去データもBigQueryに蓄積する_1.全体像

目的

以下のドキュメントで示されている Search Console データのエクスポート機能(以下「公式エクスポート機能」)では、設定以後の期間のデータが毎日エクスポートされるが、過去期間のデータはエクスポートされない。

BigQuery への Search Console データの一括エクスポートについて

設定前の期間も BigQuery にデータを入れて分析できる状態を作りたいと思い、今回その仕組みを作ってみた。

※ 設定前の期間のデータは API で取得するため、無限に遡れるわけではなく Search Console にある過去 16 か月間まで

作った仕組みの概要

  1. 以下の操作を行うPythonコードを作成
    ・Search Console から API を通じてデータを取得
    ・公式エクスポート機能で BigQuery にエクスポートされている形式になるべく合わせて、取得したデータを整形
    ・整形したデータを BigQuery に挿入
  2. 1.のコードを Cloud Functions で実行するよう設定
  3. 2.のCloud Functions を Cloud Scheculer で実行
    Search Console の API に取得上限数が設けられているため、検索数が多いサイトだと一気に実行できない場合がある。そのため、1日に任意の日数分取得する設定をした上で、毎日期間をずらして実行する形式にしている。

コード

コードは以下に置いている。
https://github.com/yrarchi/search_console_to_bigquery

後続の記事で、作成時に検討が必要だった下記の点について詳細を記載する。

BigQueryにエクスポートしたGA4のデータと比較する

BigQueryにエクスポートしたGA4のデータの概要を把握する で、ある程度作りを把握したので、この記事ではBigQueryでクエリを書いて集計した結果とGA4のコンソールで表示した結果を見比べながら理解を深めていく。

イベントについて

GA4ではユーザーの行動(ページビュー、セッション開始、クリック等)は「イベント」として計測されている。
そこでまずはイベントの値として何が入っているかを見比べてみる。

[BigQuery側]
イベントはevent_name というカラムとして格納されている。

SELECT distinct(event_name)
FROM analytics_<property_id>.events_YYYYMMDD

イベントのイメージとしては、例えばユーザーがサイトにアクセスしたら session_start イベントと page_view イベントが発生し、さらに初回の訪問ならfirst_visit イベントも発生する。

[コンソール側]
管理 > プロパティ設定 > データの表示 > イベント より、設定しているイベントを確認できる。

BigQuery側とコンソール側でイベントが一致していることが確認できた。

なお、GA4のイベントには4種類あり、上記の7つのイベントは記載の区分に当てはまる。

  • 自動収集イベント:自動で計測される 今回だと first_visit, session_start
  • 拡張計測機能イベント:GA4管理画面上で有効にすると自動で計測される 今回だと page_view, scroll, click
  • 推奨イベント:ユーザーが設定するが、名前とパラメータがGoogleで定義されている
  • カスタムイベント:ユーザーが自由にイベント名を定義する 今回だと internal_link_click, top_page_view

[GA4] About events – Analytics Help

イベントの概観を確認できたので、以下でイベントを指定して値を集計してみる。

first_visit イベント

チャネルごとの新規ユーザー数
first_visitは初回の訪問時に発生するイベントなので、ある日に初めて訪問したユーザー数をチャネル別で集計してみる。

[BigQuery側]

SELECT
  traffic_source.name AS medium,
  COUNT(DISTINCT user_pseudo_id) AS new_users
FROM
  analytics_<property_id>.events_YYYYMMDD
WHERE
  event_name = "first_visit"
GROUP BY
  medium

※ ユーザー数について
COUNT(DISTINCT user_pseudo_id) でユーザー数を集計している。
以下確認より、user_pseudo_id はアクセスのあったユーザーのブラウザに割り当てられる一時的なID(同じユーザーでもブラウザが異なれば異なるIDとなる)と理解している。今回、user_pseudo_id はユーザーエクスプローラで「有効なユーザー ID」として表示される値と一致していた。

コンソール上で以下の説明があり、このサイトでは「ユーザー ID」の設定がなくウェブサイトのため、今回は実質「クライアント ID」であると解釈した。

有効なユーザー ID とは、ユーザーのブラウザ インスタンスまたはアプリのインストールに関連付けられている一意の ID です。Google アナリティクスではこの ID を使って、同一ユーザーのセッションによる訪問が識別されます。

有効なユーザー ID には、利用可能であればユーザー ID が使用されます。利用可能でない場合は、デバイス ID が使用されます。デバイス ID は、モバイルアプリのアプリ インスタンス ID またはウェブサイトのクライアント ID です。

[コンソール側]
ライフサイクル > 集客 > ユーザー獲得 より、ディメンションを「最初のユーザーのデフォルトチャネルグループ」に設定して確認

BigQuery側とコンソール側で、ユーザー数が一致していることが確認できた。

page_viewイベント

ページごとの表示回数とユーザー数
page_viewはサイト内のページにアクセスすると発生するイベントなので、ある日のページごとの表示回数とユーザー数を集計してみる。

[BigQuery側]

SELECT
  event_params.value.string_value AS page_path,
  COUNT(*) AS pageviews,
  COUNT(DISTINCT user_pseudo_id) AS users,
FROM
  analytics_<property_id>.events_YYYYMMDD
  ,UNNEST(event_params) AS event_params
WHERE
  event_name = 'page_view'
  AND event_params.key = 'page_location'
GROUP BY
  page_path

※ テーブルのつくりの確認
クエリだけだと理解しづらいので、使用しているカラムをプレビュー画面に記載した

※ ユーザー数については上記と同様

[コンソール側]
ライフサイクル > エンゲージメント > ページとスクリーン より確認

BigQuery側とコンソール側で数が一致していることが確認できた。

clickイベント

ある日に、どのページで何回クリックが発生したかをカウントしてみる。

[BigQuery側]

SELECT
  event_name,
  event_params.value.string_value as page_location,
  COUNT(*) AS click_count
FROM
  analytics_<property_id>.events_YYYYMMDD
  ,UNNEST(event_params) AS event_params
WHERE
  event_name = 'click'
  and event_params.key = "page_location"
GROUP BY
  event_name,
  page_location

[コンソール側]
探索 からレポートを作成し確認する。
ディメンション(行)としてページロケーション、指標(値)としてイベント数を指定して、フィルタでイベント名をclickにする。

BigQuery側とコンソール側で数が一致していることが確認できた。

まとめ

この記事では、BigQueryにエクスポートしたデータに対する集計結果とGA4のコンソールでの集計をイベントを切り口として比較した。今回の範囲ではBigQueryの集計とコンソールの集計が一致したが、試している中では一部微妙にずれる集計も存在したため、次にその要因と対応方法を確認したい。

不定期番組の放送を通知する仕組みをChatGPTと作る

これは GMOペパボディレクター Advent Calendar 2023 15日の記事です。

HNK 番組表API を利用して、登録したキーワードに合致する番組をチェックし、Slackに通知する仕組みを作ってみた。

Slack通知の例

この記事では 1. 作成物の概要 と 2. 今回ChatGPTの力を借りて作成を行ったのでその感想 を記載する。

作成のきっかけ

普段、テレビは録画しておいた番組をみる機会が多く、週に1回程度番組表をざっと眺めて録画の予約をしている。この方法だと、関心のある番組なのに録り忘れることがしばしばあった(特に数ヶ月に1回など不定期放送の番組は見逃しがちだった)。

条件に合致する番組があったら通知してくれるアプリとかありそうだな、と思って探したが、放送直前の通知が多く、数日手前の段階での通知機能を持つアプリは見つけることができなかった。

この調査の中で、HNK 番組表API の存在を知り、今回の作成を行った。※1

仕組みの概要

  • HNK 番組表API を通じて1週間分の番組情報を取得する
  • あらかじめ設定しておいた番組名かキーワードに合致する番組をピックアップする
  • ピックアップした番組の情報をSlackに通知する

コードはこちら:https://github.com/yrarchi/nhk_programs_checker

ChatGPTとの共同作業

今回の作成はChatGPTに助けてもらいながら行ってみた。条件を与えて最初のベースになるコードを書いてもらい、条件を付加してコードを修正してもらってみたり、リファクタリングをしてもらってみたりした。

意図通り示してもらえる部分も、意図を汲んでもらえない場合もあったが、自分で一から作るのに比べ3分の1程度の時間で完成したように思う。

以下に質問の具体例をいくつか示す。

  • 最初のベースになるコードを書いてもらう
(前略 意図した仕組みと作り方を伝え、懸念点等を聞いていた) 
NHK APIキーの取得はできたので、大きく以下のブロックに分けて進めたいと考えています。 
1. NHK APIキーをたたいて、番組名一覧を取得し、登録した番組名と一致する番組名があるかをチェックするPythonスクリプトを作成する 
2. Slackに通知する仕組みを追加する
まずは1.について、コードを示してください。なお、1つのスクリプトにまとめてしまわず複数のファイルに適切に分割してくださるとありがたいです。 

→ requestsライブラリを使用してHTTPリクエストを送信し、返された番組情報を処理する20行程度のコードを書いてくれた。それを修正していけば良いので、動き出しのはずみになった

  • 条件を付加してコードを修正してもらう
現在のコードは以下です。 
このコードだと、同じ番組が再放送等で複数存在する場合に、それぞれtitleやcontentを出力してしまいます。 同じ番組の場合はそれらの出力は1回になるよう、コードを修正してください。

→ 意図を汲んでコードを修正してくれたものの、意図通りには処理されないものになっていなかったため、その要因と思われる点を伝え、再修正してもらった

以下のような、Pythonの辞書があります。 
dict = { 
'title_a': {'subtitle': 'subtitle', 'content': 'content', 'start_time': '2023-11-28T04:10:00+09:00'}, 
'title_b': {'subtitle': 'subtitle', 'content': 'content', 'start_time': '2023-11-29T04:10:00+09:00'} 
} 
これを以下のjsonに当てはめて、dictの長さ分だけ生成するコードを書いてください。
 なお、jsonは外部ファイルから読み込み、また当てはめた結果をjsonとして出力するようなコードとしてください。

→ これはおそらくこちらの伝え方がうまくなく、一応コードを生成してくれたものの再度

示していただいたコードはjinja templateを使ったら簡潔に書けるのではないでしょうか 

と方法をこちらから指定して再度書き直してもらった

  • リファクタリングをしてもらう
以下のコードについて、可読性を上げられることはないか・変数名としてより適切なものはあるか・セキュリティの観点から修正すべき点がないか指摘してください。 

→具体的なコードの箇所を示しての指示を期待したが、一般的な注意点の提示だった

今後の発展

以下のような改善を今後してみたい。

  • 今回はコードの自動実行の設定までいかなかったので、週に1回定期実行されるようにしたい
  • 今回は指定したキーワードに合致した番組を通知する形だが、「この番組に興味を持つならこれも好きかも」と推薦を行えるようにしてみたい

※1 今回のAPIはNHKの番組のみが対象だが、不定期放送で見逃しがちな番組の多くがNHKだったため、用途上大きな問題がなかった

BigQueryにエクスポートしたGA4のデータの概要を把握する

GA4のデータを異なるGoogleアカウントからBigQueryにエクスポートする」の操作により、BigQueryにGA4のデータがエクスポートされるようになった。

この記事では、エクスポートされたデータの作りやGAのコンソール上での見え方との比較を行って、概要の把握を進める。

BigQueryでのテーブル構成の概要

BigQueryへのエクスポートには、数分前のデータが都度入ってくるストリーミングエクスポートと、1日1回エクスポートされる2種類があり、今回は後者のみとしている。

この場合、以下の形式でデータが作成される。
・データセット:analytics_<property_id>
・テーブル:日毎のテーブル events_YYYYMMDD

テーブルの中身をpreviewで確認すると、event単位でネストされていることがわかる。例えば下記画像の例だと、session_startとpage_viewのイベントそれぞれに対して、page_title等のパラメータとその値が格納されている。

ユーザーエクスプローラーの結果と見比べて概要を把握する

GA4のコンソール上での表示と、BigQueryのデータを見比べて同じであることを確認してみる。user_pseudo_id を指定して比較する。

  • BigQuery側
SELECT
  event_date,
  event_timestamp,
  DATETIME(TIMESTAMP_MICROS(event_timestamp), "Asia/Tokyo") as event_datetime,
  event_name
FROM 
  analytics_<property_id>.events_20231010
WHERE 
  user_pseudo_id = <user_pseudo_id>
  • GAコンソール側
    [探索] → [テンプレート ギャラリー] → [ユーザー エクスプローラ] より、user_pseudo_idを指定して表示する。

「イベント4件」となっていて、user_engagementはリストに表示されていないものの、右上の「上位のイベント」として表示はされているので、BigQuery側と項目としては同じであることが確認できる。また、日時も合致していることが確認できる。

複数日にまたがってクエリを書くには

events_YYYYMMDD という名称で日毎にテーブルが作成されるため、複数日にわたって集計するクエリを書きたい場合は、複数のテーブルを結合する必要がある。

BigQueryのワイルドカード テーブルを使用すると、テーブル名をワイルドカードで指定してクエリを書くことができる。
 Query multiple tables using a wildcard table | BigQuery | Google Cloud

例えば、2023/10/01〜2023/10/20の20日分を集計したい場合は以下のように書ける。

SELECT 
	…
FROM
  analytics_<property_id>.events_*`
WHERE
  _TABLE_SUFFIX BETWEEN "20231001" AND "20231020"

大まかだが、以上でBigQueryへエクスポートされたデータの作りを確認したので、次に具体的にクエリを書きながら理解を深めていく。
→ BigQueryにエクスポートしたGA4のデータをコンソールと比較する

GA4のデータを異なるGoogleアカウントからBigQueryにエクスポートする

UAからGA4になったことで、無料のGoogle AnalyticsでもBigQueryにエクスポートできるようになった※ ので、試しにやってみた。今回、Google CloudとGoogle Analytics のGoogleアカウントが異なることで何点かつまづいた箇所があったので、それを踏まえて行った手順を記載する。

※ BigQueryサンドボックスを利用すると、その制限内ならBigQueryの利用も無料となる 参照:[GA4] BigQuery Export- Analytics Help

具体的な手順

基本的には下記のヘルプの手順に沿って進めていく。

[GA4] Set up BigQuery Export – Analytics Help

  1. BigQuery APIを有効にする
    [API とサービス]>[ライブラリ] より有効にする
BigQuery API
  1. プロジェクトを作成する
  2. GA4プロパティをBigQueryにリンクさせる
    GAの管理画面 [管理] > [サービス間のリンク設定] > [BigQuery のリンク] から「リンク」をクリックする
GA4のBigQueryのリンク設定画面

ヘルプの記載だと、「[BigQuery プロジェクトを選択] をクリックして、アクセス可能なプロジェクトのリストを表示します」とあるが、選択できるプロジェクトが表示されなかった。

これは、GAとGoogle Cloudで異なるGoogleアカウントを利用していることが要因だったようで、Google CloudでGAのGoogleアカウントをIAMから編集者として追加したら、プロジェクトが表示されるようになった。追加は[IAMと管理] > [IAM] より、「アクセス権を付与」をクリックして行う。

Google Cloud のIAMでGA4のGoogleアカウントを追加

GAの管理画面に戻り、BigQueryのリンクを再び行ってみる。

上記アカウントの追加によりプロジェクトは表示されるようになったが、権限が足りないと表示される。

GA4の管理画面でエラーが表示される

選択した Google Cloud プロジェクトへのアクセス権がないため、リンク処理を完了できません。
必要な権限: serviceusage.services.enable、resourcemanager.projects.setIamPolicy。

挙げられている2つの権限 serviceusage.services.enable も serviceusage.services.getも、今回付与した編集権限の中に含まれていた。おかしいな…と思ったら、ヘルプの中に以下の記載があり、エラーメッセージには記載のない resourcemanager.projects.setIamPolicy が編集権限には含まれていなかった。

BigQuery リンクの作成に最低限必要な権限は次のとおりです。
・resourcemanager.projects.get プロジェクトを取得する
・resourcemanager.projects.getIamPolicy 権限のリストを取得する
・resourcemanager.projects.setIamPolicy ユーザーがこのプロジェクトでリンクを作成する権限を持っているかどうかを確認する
・serviceusage.services.enable BigQuery API を有効にする serviceusage.services.get BigQuery API が有効かどうかを確認する

そこで、必要な権限のみ付与したカスタムロールを作成した。

GA4のGoogleアカウントの権限を編集権限からこのカスタムロールに変更したらエラーが出なくなり、進めるようになった。

三度GAの管理画面に戻り、BigQueryのリンクを行う。エクスポートは毎日を選択し、データ数が少ないので除外イベントは設定しなかった。

何かエラーが起きた。

時間をおいて再度実行しても解決せず、同じエラーが出た。
ヘルプに以下の記載もあり、

アナリティクスへのログイン時には、BigQuery プロジェクトの所有者権限(アクセス要件の詳細については、以下の権限を参照)と、リンクするデータ ストリームを含むアナリティクス プロパティの編集者ロールの両方があるメールアドレスを使用します

「アクセス要件の詳細については、以下の権限を参照」に記載の権限は付与しているのだが、結局所有者権限が必要なのだろうか…となり、権限を付与してみたらエラーが起きず、通るようになった。

これで、一応BigQueryにGA4のデータがエクスポートされるようになったので、次にエクスポートされたデータの概要を確認する。
BigQueryにエクスポートしたGA4のデータの概要を把握する

Healthデータのスプレッドシートへの送り込み

最近の活動量計等は歩数や体重などのデータをスマホに送信できることも多い。それが可能な活動量計と体脂肪計を利用しているが、アプリで時折変化を眺めて満足してしまうことが多かった。予め用意されたフォーマットで見るだけでなく自分で分析したいし、データをバックアップしておきたいので、スプレッドシートにデータを定期的にアップロードするようにした。

できること

毎日定時にスプレッドシートに下記のような1行分のレコード(「昨日」の各種値)が追加される

仕組み

スマホ上でデータはHealthアプリに集約する(ここは各デバイスの純正アプリにすでにHealthへの同期機能があったため、それを利用)。HealthアプリからShortcutsアプリでデータを取り出し、GASを介してスプレッドシートに書き出す。

具体的な手順

  1. Shortcutsアプリで必要なデータを取り出してPOSTリクエストを送るショートカットを作成する
    • 色々はまるポイントがあったので、以下画像内で記載する
  1. GASでPOSTリクエストを受けてシートに書き込むスクリプトを作る
function doPost(e) {  // POSTリクエストを送信されたら実行される
  var data = getData(e);
  appendData(data);
}

function getData(e) {
  var params = JSON.parse(e.postData.getDataAsString());   // POSTされたデータを取得
  var values = JSON.parse(params.results);  // Shortcutsアプリで作成したショートカットで、results: {values: {Weight: …} の形式でデータを入れている
  return [
    new Date(), 
    values.Weight, 
    values.BodyFatPercentage, 
    values.RestingEnergy,
    values.ActiveEnergy,
    values.Steps,
    values.BodyTemperature,
  ];
}

function appendData(data) {
  var sheetName = "HealthCareApp";
  var spreadsheet = SpreadsheetApp.getActiveSpreadsheet();
  var sheet = spreadsheet.getSheetByName(sheetName);
  sheet.appendRow(data);
}
  1. 1.で作成したショートカットを毎日定時に実行するよう設定する
    Shortcutsアプリの中でAutomationから定時実行を設定できる

感想

  • ショートカットの作成をスマホ上で行うのがつらい。
    • 簡単なショートカットだと問題ないのだけど、今回のように長くなるとぽちぽち押しての操作がつらくなる。Shortcutsアプリ自体はMacOSにもあるのでPCでもショートカットの編集は行えるが、HealthアプリがMacOSにはないのでHealthアプリの操作をする部分はPCでは編集できなかった。
    • 編集後の差分を確認できないので、コードとして管理したい気持ちになった(Shortcutsアプリの意義と逆方向の感想になってしまうが…)

グラフを動画化する

これは GMOペパボディレクター Advent Calendar 2022 17日の記事です。


上記のような感じで、時系列かつ複数の要素が同時に起こるデータを動画化してみたので、その作成過程を記載する。
(自分の手元では確認したのだけど、動画ちゃんと動いているかな…)

例えば、どのような検索キーワードが合わせて検索されることが増えているか、社内でどの部署同士の結びつきが強くなっているか、などの可視化に使えそうなイメージをしている。

ここではBigQueryの公開データセット theLook eCommerce を例として使用させていただいた。theLook eCommerce は架空の衣料品サイトにおける商品や注文等のデータが格納されているので、ある商品分野(「シャツ」「ジーンズ」など)の商品の併せ買いのされやすさの変化を可視化してみる。

データの前処理

使用したのは theLook eCommerce の productsテーブルとorder_items テーブルの2つ。

年月単位で、同時注文したカテゴリの組み合わせ別に注文数を集計する。

WITH duplicate_order_combinations AS(
  SELECT
    order_items.order_id,
    DATE(TIMESTAMP_TRUNC(order_items.created_at, MONTH)) AS created_at,
    CASE 
      WHEN products_destination.category IS NULL 
        THEN products.category
      WHEN products.category < products_destination.category 
        THEN CONCAT(products.category, "-", products_destination.category) 
      ELSE CONCAT(products_destination.category, "-", products.category) 
    END AS categories,
  FROM 
    bigquery-public-data.thelook_ecommerce.order_items
  INNER JOIN 
    bigquery-public-data.thelook_ecommerce.products
    ON order_items.product_id = products.id
  LEFT JOIN 
    bigquery-public-data.thelook_ecommerce.order_items AS order_destination
    ON order_items.order_id = order_destination.order_id
    AND order_items.product_id != order_destination.product_id
  LEFT JOIN 
    bigquery-public-data.thelook_ecommerce.products AS products_destination
    ON order_destination.product_id = products_destination.id
),

order_combinations AS(
  SELECT
    order_id,
    created_at,
    categories,
  FROM
    duplicate_order_combinations
  GROUP BY
      order_id,
      created_at,
      categories
)

SELECT
  created_at,
  SPLIT(categories, "-")[offset(0)] AS category, 
  CASE WHEN categories LIKE "%-%" 
    THEN SPLIT(categories, "-")[offset(1)] ELSE NULL END AS other_category, 
  COUNT(*) AS _count
FROM 
  order_combinations
GROUP BY
  created_at,
  categories

以下のような感じで集計される。以下のレコードだとAccessoriesとOuterwear & Coatsの同時購入が2020-05に20件あったということになる。

created_at, category, other_category, _count
2022-05-01, Accessories, Outerwear & Coats, 20

この結果にdataという名称をつけておく。

また、カテゴリの一覧も取得してcategoriesという名称をつけておく。

SELECT distinct(category)
FROM bigquery-public-data.thelook_ecommerce.products

グラフ化

グラフを扱えるNetworkX と可視化を行う Matplotlib を使用してグラフ化している。

以下の環境で実行を確認した。

Python 3.8.16
networkX 2.8.8
pandas 1.3.5
matplotlib 3.2.2

大まかな流れとして

1. ノード(今回だと購入した商品のカテゴリ)とエッジ(今回だと同時に購入した商品のカテゴリをつなぐ)を作成する

2. ノードとエッジを描画する(多い組み合わせほど太いエッジとなるようにし、同じカテゴリで複数購入された場合は同じノードを環状につなぐようにした)

import networkx as nx
import pandas as pd
from matplotlib import pyplot as plt

nlist = [
  ['Pants', 'Shorts', 'Skirts', 'Pants & Capris', 'Jeans'],
  ['Tops & Tees', 'Sweaters', 'Fashion Hoodies & Sweatshirts', 'Blazers & Jackets', 'Suits & Sport Coats', 'Outerwear & Coats'],
  ['Jumpsuits & Rompers', 'Dresses', 'Suits', 'Clothing Sets', 'Active', 'Swim', 'Maternity', 'Sleep & Lounge'],
  ['Socks', 'Socks & Hosiery', 'Leggings', 'Intimates', 'Underwear', 'Plus', 'Accessories']
]  # 使用しているshell_layoutの引数 nlistで同じリストに属するカテゴリが同じ円状に配置される


def make_graph(data, term, categories, origin_column, destination_column):
  data_internal = data[data[destination_column].isnull()]
  data_external = data[~data[destination_column].isnull()]
  G = nx.Graph()

  # nodeの作成
  for category in categories:
    counts = data_internal.query(f'{origin_column} == @category')['_count'].values
    if len(counts) > 0:
      G.add_nodes_from([(category, {'count': counts[0]})])
    else:
      G.add_nodes_from([(category, {'count': 1})])  # 単品での購入が行われていない場合

  # edgeの作成
  for origin, destination in zip(data_external[origin_column], data_external[destination_column]):
    weight = data_external.query(f'{origin_column} == @origin and {destination_column} == @destination')['_count'].values
    if len(weight) > 0:
      G.add_edge(origin, destination, weight=weight[0])
    else:
      G.add_edge(origin, destination, weight=0)
  
  return G


def plot_graph(G, term, nlist):
  # graphの描画
  plt.figure(figsize=(15, 15))
  pos = nx.shell_layout(G, nlist=nlist)

  node_size = [d['count']*8 for (n, d) in G.nodes(data=True)]
  nx.draw_networkx_nodes(G, pos, node_color='w', edgecolors='b', alpha=0.6, node_size=node_size)
  nx.draw_networkx_labels(G, pos)
  edge_width = [d['weight']*0.08 for (u, v, d) in G.edges(data=True)]
  edge_color = edge_width / max(edge_width)
  nx.draw_networkx_edges(G, pos, alpha=0.6, edge_color=edge_color, width=edge_width, edge_cmap=plt.cm.cool)

  plt.axis('off')
  title = f'{pd.to_datetime(term).year}-{pd.to_datetime(term).month}'
  plt.title(title, loc='left', fontsize=24)
  plt.savefig(f'{title}.png')


for month in data['created_at'].unique():
  data_month = data.query('created_at==@month')
  G = make_graph(data_month, month, categories['category'], 'category', 'other_category')
  plot_graph(G, month, nlist)

以下のような感じで月毎に画像が生成される。

動画化

動画への変換は OpenCV を利用した。上記で月毎に画像を生成した際、年月をタイトルとして保存してあるので、その順で重ねて動画にしている。

import cv2


def movie_export(prefix):
  fourcc = cv2.VideoWriter_fourcc('m', 'p', '4', 'v')
  video  = cv2.VideoWriter(f'{prefix}_movie.mp4', fourcc, 6.0, (1080, 1080))

  for month in sorted(data['created_at'].unique()):
    title = f'{pd.to_datetime(month).year}-{pd.to_datetime(month).month}'
    img = cv2.imread(f'/content/{title}.png')
    video.write(img)
  video.release()


movie_export('category')

次のような動画が生成される。

類似度を用いてレシートOCRの精度を上げる

以前作成したレシート画像から品目や価格を読み取りCSV化する仕組みについて、品目の判定精度が低く困っていた。そこで、読み取った品目と過去履歴の品目との類似度を求める仕組みを組み込むことで、品目判定の精度が上がったのでやったことを記録する。

困りごと

元々の仕組みでは、下記のようにOCRによる品目の読み取り結果とそれを人が見て修正した結果をペアで履歴としてたまるようにしておき、全く同じ読み取り結果を得た時にのみ人が修正した結果に自動で変換するようにしていた。
# 読み取り結果,人が修正した結果
キャベッ,キャベツ
TV1.0テイシボ,牛乳
Pロコリー,ブロッコリー  # 次回以降に「Pロコリー」という読み取り結果が得られた時に「ブロッコリー」に自動変換する
普段買うものはある程度決まっているため、履歴がたまってくれば人手による修正は減っていくだろうと思っていた。実際は読み取り結果が予想よりバラエティに富んでいて(読み取り精度が思ったより低くて)、人手による修正はなかなか減らなかった。
例えば、以下はレシートに「ブルガリアYG脂肪0プ」と印字されていた場合の読み取り結果の履歴一覧である(印字は「ブルガリアヨーグルト脂肪0プレーン」の略称と思われる)。間違い探しのように毎回少しずつ異なる結果が得られて、なかなか自動修正されなかった。
# 読み取り結果,人が修正した結果
ブルガリ~YG脂肪0プ,ヨーグルト
プルガリアYG脂肪0Qプ,ヨーグルト
フルガリアYG脂肪0プ,ヨーグルト
ブルガリアYG脂肪0プ,ヨーグルト
ブルガリアYG脂肪0プ。,ヨーグルト
ブルガリア了YG脂肪0Uプ,ヨーグルト
フルガリア了YG脂肪0プ,ヨーグルト
ブルガリア了YG脂肪0プ,ヨーグルト
ワルガリアYG脂肪0プ,ヨーグルト
上記の例だと漢字の「脂肪」は毎回正しく読み取れている。ただ、漢字でも正しく認識されていないことも多い。 例えば「超熟(6)」と印字されていた場合の読み取り結果の履歴一覧は以下のようになっている。熱と熟など人間から見てほぼ同じに見えるものから、弟と熟などそこまで似ていないように見えるものまである。
# 読み取り結果,人が修正した結果
記熟(6),食パン
超衣(6),食パン
超熟(6),食パン
超熱(6),食パン
超誰(6),食パン
超弟(6),食パン
超認(6),食パン
履歴を眺めているとなんとなく画数の多い漢字の方が正答率が高く、カタカナやひらがなは正答率が低い傾向に見えた(なお、ここでは示していないが半角カタカナの印字の場合、元の文字列を類推することが不可能なレベルのもっと壊滅的な結果が得られる)。
今回の対応範囲は、上で示したヨーグルトと食パンのような人間が見れば同一品目と類推できるようなレベルの読み取りであれば、自動で変換して欲しいという点においた。

やったこと

読み取り結果に完全一致する品目が履歴になくても、類似度が高い品目が履歴にあれば自動でその品目名に修正するようにした。
具体的には、履歴の品目の文字列群と今回読み取った文字列のレーベンシュタイン距離を計算し、距離が最短の品目について閾値を下回っていればその品目(の人が修正した結果)に変換するようにした。距離の閾値については、いくつか試した上でバランスを見て定めた(後述)。
※ 類似度として使用したレーベンシュタイン距離については別記事に整理した
イメージを持ちやすくするためまずはレーベンシュタイン距離を求めた結果を示す。
※ 以下、レーベンシュタイン距離は文字列の長さで正規化しているため、最大で1となる
「ブルガリアYG脂肪0プ」という文字列に対してだと、以下のようなレーベンシュタイン距離となった。ブルガリアヨーグルト関連の文字列に対して距離が小さい(=類似度が高い)一方、他の品目に対しては距離がぐっと伸びている(=類似度が低い)ことがわかる。
# 読み取り結果, 人が修正した結果, レーベンシュタイン距離
ブルガリア了YG脂肪0プ, ヨーグルト, 0.08
プルガリアYG脂肪0Qプ, ヨーグルト, 0.17
ニニガリアツアグ, 厚揚げ, 0.73
アルカリ乾電池単1形, 電池, 0.82
リがクリループ\\", ガム, 0.91
今回の目的に対し使えそうな感触を得られたので、以下のデータを用いて自動変換する距離の閾値を求めていった。
- 履歴にある品目数: 321件(人による修正結果単位で見ると141件)
- テストデータの品目数: 56件
いくつか具体例を示す。
例1:読み取り結果が「シーチキン[|Newマイルド」だった場合、履歴の品目のうち最小距離となるのは「シーチキンNeeマイルド」だった。これらは人が同一品目と判断する例なので、一致すると判断し自動変換して欲しい。両者の距離は0.21だったため、閾値を0.21以上に設定していれば正しく変換できることになる。
例2:読み取り結果が「ミツカンやさしいお酢3」だった場合、履歴の品目のうち最小距離となるのは「生しいたけ」だった(両者の文字列に「しい」が共通しているため最小となったと思われる)。これらは人が別品目と判断する例なので、読み取り結果に一致する品目が履歴にないと判断して欲しい。両者の距離は0.82だったため、閾値を0.82以上に設定した場合は誤変換してしまうことになる。
このように、閾値をいくつに設定するかによって正しく変換できなかったり、誤って別の品目に変換してしまう可能性がある。
閾値を0.3から0.8の範囲で変えてみて、変換結果の割合を確認したのが下図となる。
完全一致した場合のみ変換する既存の方法に比べ、レーベンシュタイン距離を用いると変換成功する確率が上がることがわかる。また、閾値を高く設定する(= 類似度が比較的低くても変換する)と変換に成功する確率が上がるが、同時に誤変換する確率も上がる。
今回は、誤変換と変換成功のバランスをみつつ、誤変換に気づかずそのまま記録するよりは変換できずに手動で修正する方が良いと考え(偽陽性を防ぐイメージ)、閾値を0.5に設定することとした。

今後

いったん上記の方法で類似度を出す方法を組み込んだので、しばらく利用して感触を確かめてみる。以下のあたりが次の課題だと現時点では考えている。
  • 漢字とひらがなの表記揺れへの対応(例えば「たまねぎ」と「玉ねぎ」は距離が0とならないため、より距離が短い品目があるとそちらに誤変換されてしまう)
  • 半角カタカナへの対応(上記で記載したが、半角カタカナの読み取り精度は著しく低いので、同じ品目でも読み取り結果のばらつきが大きすぎ、今回の類似度による変換はほぼ効かない)

レーベンシュタイン距離の求め方を整理する

これは、類似度を用いてレシートOCRの精度を上げるで文字列間の類似度を判定するために使用した、レーベンシュタイン距離について自分の理解のためにまとめた記事となる。

レーベンシュタイン距離とは

レーベンシュタイン距離は、文字列間の類似度を距離として示すもの(距離が近いほど類似していることを示す)。挿入・削除・置換の各操作を合計で何回行えばもう一方の文字列に変換できるかで距離を測る。
挿入・削除・置換はそれぞれ以下の操作を指す。
・挿入:1文字を挿入する e.g. たこ → たいこ
・削除:1文字を削除する e.g. たいこ → たこ
・置換:1文字を置換する e.g. たこ → たて
これらの操作を最小の回数だけ行い、もう一方の単語に変換する。
例えば「とまと」と「たまご」なら、以下のように2回置換を行うのが一番手数が少なく変換できるのでレーベンシュタイン距離は2となる(※)
「とまと」
→ とをたに変換して「たまと」
→ とをごに変換して「たまご」
※ 用途により挿入・削除・置換それぞれの重み付けを変えることもできるが、等しい重みとしている(以下も同様)。

レーベンシュタイン距離を求めるには

以下のような2次元配列を用意して、各セルに各単語のクロスする部分文字列間のレーベンシュタイン距離を入れていく。
左上は□(空文字列)と□(空文字列)のレーベンシュタイン距離で、同じ文字列なのでレーベンシュタイン距離は0と確定する。そこから右下に向かって順次求めていくと、最後に元々求めたかった文字列間のレーベンシュタイン距離が求まる。
具体的な流れを見ていくと
① 空文字との距離である0行目・0列目はそれぞれ削除と挿入により至るセルなので、それぞれ一つ上/左のセルの距離+1が入る
② ①以外のセルは、置換によってもたどり着けるセルのため、削除と挿入に加え置換も考える。置換については斜め上のセルの距離から+1あるいは+0となる。挿入・削除・置換のうち、一番短い距離がそのセルの値となる(なるべく短くなるような距離が求めたいため)。
③ 最終的に右下までたどり着くと、2単語間のレーベンシュタイン距離が求まったことになる。
※ 一般に文字列が長いほど距離が長くなるため、レーベンシュタイン距離同士を比較する際に文字列の長さで割って標準化する場合もある

Pythonでレーベンシュタイン距離を求める処理を書く

上記の流れを素直にそのままPythonで書いた。
def levenshtein(word_A, word_B):
    # 挿入・削除・置換のコストを定義しておく
    INSERT_COST = 1
    DELETE_COST = 1
    SUBSTITUTE_COST = 1

    # 2次元配列を用意しておく
    distances = []
    len_A = len(word_A)
    len_B = len(word_B)
    dp = [[0] * (len_B + 1) for _ in range(len_A + 1)]

    # 上記①の工程
    for i in range(len_A + 1):
        dp[i][0] = i * INSERT_COST
    for i in range(len_B + 1):
        dp[0][i] = i * DELETE_COST

    # 上記②の工程
    for i_A in range(1, len_A + 1):
        for i_B in range(1, len_B + 1):
            insertion = dp[i_A - 1][i_B] + INSERT_COST
            deletion = dp[i_A][i_B - 1] + DELETE_COST
            substitution = (
                dp[i_A - 1][i_B - 1]
                if word_A[i_A - 1] == word_B[i_B - 1]
                else dp[i_A - 1][i_B - 1] + SUBSTITUTE_COST
            )
            dp[i_A][i_B] = min(insertion, deletion, substitution)
    
    # 上記③の工程
    distance = dp[len_A][len_B] / max(len_A, len_B)  # 標準化している
    return distance

ワンライナーの演習用環境をDockerで作成する

1日1問、半年以内に習得 シェル・ワンライナー160本ノック という、シェルのワンライナー(その場かぎりの1行プログラム)の問題が160問載っている本を解いている。

単純にコマンドの学習だけでなくLinuxやその周辺知識も学びがあるので、自分にとっては結構難しい問題もあるのだけど、コツコツと進めている。

この記事では、こちらも学習ついでにLinuxの環境をDockerで作成し、それを使って演習問題を解いているので、その内容を記載する。

環境

・PC環境:
 macOS 10.15

・Dockerで作成する環境(書籍内で動作確認したと記載されている環境):
 Ubuntu20.04 LTS
 UTF-8を用いる日本語環境(ja_JP.UTF-8)

結論

必要なパッケージや設定等をDockerfileに記載し、それを元に作成したコンテナ内でbashを起動している。

Dockerfile

FROM ubuntu:20.04
RUN apt update && \\
    apt install -y language-pack-ja && \\
    update-locale LANG=ja_JP.UTF-8 && \\
    echo "export LANG=ja_JP.UTF-8" >> ~/.bashrc
RUN apt install -y man-db && yes | unminimize
RUN apt-get update && \\
    apt install -y vim
# 以下、必要なコマンドが出てくるたびに追加していく
RUN apt install -y gawk
RUN apt install -y imagemagick
RUN apt install -y parallel
RUN apt install rename
RUN apt install num-utils
RUN apt install -y pandoc

手順

$ docker build -t shell_oneliner .
$ docker run -it --rm -v (ホストマシン上のディレクトリのパス):(コンテナ内のマウントされるディレクトリのパス) shell_oneliner /bin/bash

詳細

Dockerfileとコマンドの内容について記載する。

日本語入力が行えるようにする

随所で日本語の文字列を扱う問題が出てくるが、初めに作成した環境では日本語の文字列を入力することができなかった。

コンテナ環境内でロケール設定を確認すると以下のようになっていた。

$ locale
LANG=
LANGUAGE=
LC_CTYPE="POSIX"
LC_NUMERIC="POSIX"
LC_TIME="POSIX"
LC_COLLATE="POSIX"
LC_MONETARY="POSIX"
LC_MESSAGES="POSIX"
LC_PAPER="POSIX"
LC_NAME="POSIX"
LC_ADDRESS="POSIX"
LC_TELEPHONE="POSIX"
LC_MEASUREMENT="POSIX"
LC_IDENTIFICATION="POSIX"
LC_ALL=

本の中で、日本語環境の設定として以下のコマンドが示されている(練習1.2.gの補足)。

$ sudo apt install language-pack-ja
$ sudo update-locale LANG=ja_JP.UTF-8

当初、これをDockerfileに組み込んだのだけどlocaleの結果が上記と同様のままで日本語入力ができるようにならなかった。以下のように起動時に読み込む.bashrcへの追記を追加したらlocaleも変わり日本語入力可能になった。

RUN apt update && \\
    apt install -y language-pack-ja && \\
    update-locale LANG=ja_JP.UTF-8 && \\
    echo "export LANG=ja_JP.UTF-8" >> ~/.bashrc
$ locale
LANG=ja_JP.UTF-8
LANGUAGE=
LC_CTYPE="ja_JP.UTF-8"
LC_NUMERIC="ja_JP.UTF-8"
LC_TIME="ja_JP.UTF-8"
LC_COLLATE="ja_JP.UTF-8"
LC_MONETARY="ja_JP.UTF-8"
LC_MESSAGES="ja_JP.UTF-8"
LC_PAPER="ja_JP.UTF-8"
LC_NAME="ja_JP.UTF-8"
LC_ADDRESS="ja_JP.UTF-8"
LC_TELEPHONE="ja_JP.UTF-8"
LC_MEASUREMENT="ja_JP.UTF-8"
LC_IDENTIFICATION="ja_JP.UTF-8"
LC_ALL=

manコマンドが使えるようにする

初めに作成した環境ではmanコマンドを使用することができなかった(下記のとおり、最低限の構成のためmanコマンドは含まれていない)。

$ man bash
This system has been minimized by removing packages and content that are
not required on a system that users do not log into.

To restore this content, including manpages, you can run the 'unminimize'
command. You will still need to ensure the 'man-db' package is installed.

上記コメントに従い、man-dbのインストールとunminimizeの実行を行う以下の1行をDockerfileに追加した。

RUN apt install -y man-db && yes | unminimize

unminimizeコマンドの実行中、y/nの選択をする必要がある。yesコマンドを挟むことで、unminimizeコマンドの実行時にy(yes)を継続的に渡してくれる。

Vimを使えるようにする

問題はワンライナーがほとんどなのだけど、時々シェルスクリプトを作成する問題もあったため、Vimを使えるようにした。

はじめRUN apt install -y vimをDockerfileに追加したのだけど、エラーになった。

Failed to fetch <http://security.ubuntu.com/ubuntu/pool/main/s/sqlite3/libsqlite3-0_3.31.1-4ubuntu0.2_amd64.deb>  404  Not Found [IP: 91.189.91.38 80]
#12 17.78 E: Unable to fetch some archives, maybe run apt-get update or try with --fix-missing?
------
executor failed running [/bin/sh -c apt install -y vim]: exit code: 100

コメントに素直に従ってrun apt-get updateを追加したら成功するようになった。(aptコマンドを使用している場合でもapt-getのupdateが必要なのか…と思ったけど、この部分は少し調べてもわからなかった)

RUN apt-get update && \\
    apt install -y vim

コマンド実行

  • Dockerイメージの作成
$ docker build -t shell_oneliner .

Dockerfileと同じディレクトリで実行する。-tをつけてイメージにshell_onelinerというイメージ名をつけている。

  • コンテナの作成・起動
$ docker run -it --rm -v (ホストマシン上のディレクトリのパス):(コンテナ内のマウントされるディレクトリのパス) shell_oneliner /bin/bash

問題に使用するファイルがGitHubのリポジトリに用意されているため、そのリポジトリをクローンして手元に持ってきて、コンテナ作成時にマウントしてコンテナ内で当該ファイルを使用できるようにした。

GitHub Actionsでlinterとformatterを実行する


GitHub Actionsでlinterとformatterが自動で実行されるようにするため、学習を兼ねて、GitHubリポジトリの環境を以下手順で改修していく。
1. パッケージ管理をPipenvからPoetryに切り替える
2. linterとformatterを導入する
3. GitHub Actionsでlinterとformatterを実行する(この記事) 前提:
・Mac OS 10.15
・Python 3.7
・対象とするリポジトリは yrarchi / household_accounts
3. GitHub Actionsでlinterとformatterを走らせる
pushされたことをトリガーにして、BlackとFlake8によるチェックを自動で実行するワークフローを作成する。

① GitHub Actions の基本的な書き方を確認する

.github/workflows/以下にYAMLファイルを作成することで、ワークフローを作成することができる。GitHub Docs: GitHub Actions / Learn GitHub Actions / Understanding GitHub Actionsを参照すると、YAMLファイルの基本的な型は以下のようになる。
name: learn-github-actions  # ワークフロー名  
on: [push]  # イベント: ワークフローを実行するトリガー
jobs:
  check-bats-version:  # ジョブ: 同じランナーで実行されるステップのセット
    runs-on: ubuntu-latest  # ランナー: ワークフローを実行するサーバ (Linux, Mac, Windows)
    steps:  # ステップ: 同じランナーで実行されるタスク
      - uses: actions/checkout@v3  # uses: アクション(繰り返されるタスク)の実行
      - uses: actions/setup-node@v3
        with:
          node-version: '14'
      - run: npm install -g bats  # run: コマンドの実行

② YAMLファイルの作成

① で確認した書き方に沿って、今回の目的に合わせて書き換えてみる。
name: CI
on:
  push:
jobs:
  lint_and_format:
    runs-on: macos-10.15
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v3
        with:
          python-version: '3.7.8'
      - name: Install Poetry
        run: |
          curl -sSL https://install.python-poetry.org | python -
          echo "$HOME/.local/bin" >> $GITHUB_PATH
      - name: Install Dependencies
        run: poetry install --no-interaction
      - name: Lint with flake8
        run: poetry run flake8 .
      - name: Format with black
        run: poetry run black --check .
以下、部分ごとに分割して設定内容を記載する。
イベント: ワークフローを実行するトリガー
 on:
   push:
GitHub Docs: GitHub Actions / Using workflows / Events that trigger workflows の通りに設定した。
For example, you can run a workflow when the push event occurs.
on:
  push
・ランナー: ワークフローを実行するサーバ
  runs-on: macos-10.15
GitHub Docs: GitHub Actions / Using jobs / Choosing the runner for a job に使用可能なランナーの種類として、Windows / Ubuntu / MacOS が記載されている。今回GitHub Actionsを設定するリポジトリは、Mac環境上で動くデスクトップアプリのためMacを選択した。 ・ステップ: 同じランナーで実行されるタスク
   steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v3
        with:
          python-version: '3.7.8'
↑ リポジトリをcheckoutし、Python環境を作成している。
actions/checkout@v3
actions/setup-python@v3
      - name: Install Poetry
        run: |
          curl -sSL https://install.python-poetry.org | python -
          echo "$HOME/.local/bin" >> $GITHUB_PATH
      - name: Install Dependencies
        run: poetry install --no-interaction
↑ Poetryをインストールした上で、必要なライブラリのインストールを行っている。 echo "$HOME/.local/bin" >> $GITHUB_PATH は、Poetryのインストールされた /Users/runner/.local/bin$GITHUB_PATH に設定している。poetry install を行うステップと分けるのは、GitHub Docs: GitHub Actions / Using workflows / Workflow commands に記載の以下の理由のため必要と理解した。
Prepends a directory to the system PATH variable and automatically makes it available to all subsequent actions in the current job; the currently running action cannot access the updated path variable.
      - name: Lint with flake8
        run: poetry run flake8 .
      - name: Format with black
        run: poetry run black --check .
↑ Flake8 と Black を実行している。

③ pushしてワークフローが実行されるか試してみる

git push するとワークフローが実行された。ActionsタブでGitHub Actions の実行結果を確認することができる。
各ステップの名称をクリックすると、その詳細を確認することができる。例えば Format with black をクリックすると、poetry run black –check . を実行した結果を確認できる。
Run poetry run black --check .
Skipping .ipynb files as Jupyter dependencies are not installed.
You can fix this by running ``pip install black[jupyter]``
All done! ✨ 🍰 ✨
13 files would be left unchanged.
以上で、当初の目的であったGitHub Actionsでlinterとformatterを実行する設定をすることができた。

linterとformatterを導入する


GitHub Actionsでlinterとformatterが自動で実行されるようにするため、学習を兼ねて、GitHubリポジトリの環境を以下手順で改修していく。
1. パッケージ管理をPipenvからPoetryに切り替える
2. linterとformatterを導入する(この記事)
3. GitHub Actionsでlinterとformatterを実行する 前提:
・Mac OS 10.15
・Python 3.7
・対象とするリポジトリは yrarchi / household_accounts
2. linterとformatterを導入する
linterとしてFlake8、formatterとしてBlackをインストールしていく。

① Blackのインストール

Black: Installation を参照しつつ、PipenvからPoetryに切り替える でPoetryを使うように切り替えたため、Poetryでインストールする。
$poetry add --dev black
Using version ^22.3.0 for black

Updating dependencies
Resolving dependencies... (14.8s)

Writing lock file

Package operations: 7 installs, 0 updates, 0 removals

  • Installing click (8.1.3)
  • Installing mypy-extensions (0.4.3)
  • Installing pathspec (0.9.0)
  • Installing platformdirs (2.5.2)
  • Installing tomli (2.0.1)
  • Installing typed-ast (1.5.3)
  • Installing black (22.3.0)
インストール後、pyproject.tomlに追加されているのを確認した。
[tool.poetry.dev-dependencies]
…
black = "^22.3.0"

② Blackの動きを確認する

①でインストールしたので、どのように実行されるか試してみる。
Black: The basics / Usage / Writeback and reporting
By default Black reformats the files given and/or found in place. Sometimes you need Black to just tell you what it would do without actually rewriting the Python files.
There’s two variations to this mode that are independently enabled by their respective flags. Both variations can be enabled at once.
Blackは通常フォーマットしてしまうけど、チェックのみかける方法として2つの方法が記載されていたので、これを試してみる。
・1つ目の方法: –check
Passing --check will make Black exit with:
・ code 0 if nothing would change;
・ code 1 if some files would be reformatted; or
・ code 123 if there was an internal error
実際に実行してみる。
$poetry run black household_accounts/calc.py --check
would reformat household_accounts/calc.py
Oh no! 💥 💔 💥
1 file would be reformatted.

$echo $?
1
Blackのルールに引っかかる箇所があったため、実行するとフォーマットされる旨が出ており、終了ステータスは1となっている。
・2つ目の方法:–diff
Passing --diff will make Black print out diffs that indicate what changes Black would’ve made. They are printed to stdout so capturing them is simple.
実際に実行してみる。
$poetry run black household_accounts/calc.py --diff
--- household_accounts/calc.py  2021-05-09 08:22:51.153728 +0000
+++ household_accounts/calc.py  2022-05-04 02:00:42.554375 +0000
@@ -1,27 +1,34 @@
 import re
 import config

-def calc_price_tax_in(price_list, discount_list, reduced_tax_rate_flg_list, tax_excluded_flg):
+
+def calc_price_tax_in(
+    price_list, discount_list, reduced_tax_rate_flg_list, tax_excluded_flg
+):
(中略)

All done! ✨ 🍰 ✨
1 file would be reformatted.
実際に実行した場合に修正される箇所を確認することができる。

③ Flake8のインストール

Flake8: Quickstart / Installation を参照しつつ、PipenvからPoetryに切り替える でPoetryを使うように切り替えたため、Poetryでインストールする。
$poetry add --dev flake8
Using version ^4.0.1 for flake8

Updating dependencies
Resolving dependencies... (0.5s)

Writing lock file

Package operations: 4 installs, 0 updates, 0 removals

  • Installing mccabe (0.6.1)
  • Installing pycodestyle (2.8.0)
  • Installing pyflakes (2.4.0)
  • Installing flake8 (4.0.1)

④ Flake8の動きを確認する

実際にFlake8を動かしてみる。
$poetry run flake8 household_accounts/calc.py
household_accounts/calc.py:12:80: E501 line too long (87 > 79 characters)
household_accounts/calc.py:25:80: E501 line too long (80 > 79 characters)
household_accounts/calc.py:32:80: E501 line too long (86 > 79 characters)
1行が長すぎると出た。Black: Code style / Line lengh を参照して、Flake8と併用する場合のおすすめ設定に倣い、setup.cfgを以下のように設定した。
[flake8] 
max-line-length = 88 
extend-ignore = E203
また、$poetry run flake8 . と特定のファイルでなく全体に対してFlake8を走らせたところ、時間がかかり.venvの中にも指摘が大量に出た。そのため、Flake8: Options and their Descriptions / –exclude を参照してFlake8の対象外とするファイルの設定を追加した。
[flake8] 
max-line-length = 88 
extend-ignore = E203 
exclude = .venv/  # これを追加
以上で、Flake8とBlackが使えるようになった。次にこれらを自動で実行するようにGitHub Actionsを設定していく。→ GitHub Actionsでlinterとformatterを実行する

PipenvからPoetryに切り替える


GitHub Actionsでlinterとformatterが自動で実行されるようにするため、学習を兼ねてGitHubリポジトリの環境を以下手順で改修していく。
1. パッケージ管理をPipenvからPoetryに切り替える(この記事)
2. linterとformatterを導入する
3. GitHub Actionsでlinterとformatterを実行する 前提:
・Mac OS 10.15
・Python 3.7
・対象とするリポジトリは yrarchi / household_accounts
1. パッケージ管理をPipenvからPoetryに切り替える
Pythonのパッケージ管理ツールとしてPipenvを使用していたが、Pipfile.lockの生成に時間がかかることが多かったためPoetryに切り替えることにする。
PipenvからPoetryへ切り替える際に行った手順を記載していく。

① Poetryのインストール

当初、以下を参照してインストールを実行した。
Poetry: Introduction / Installation
$ curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/get-poetry.py | python -
結果、インストール自体はできたのだが、This installer is deprecated. と表示されたため、アンインストールして、同じくPoetryのドキュメント(master版) 記載の別の方法でインストールをし直した。
Poetry: Introduction / Installation
$ curl -sSL https://install.python-poetry.org | python3 -
bash_profileを確認すると、パスに自動で追加されていた。
export PATH="$HOME/.poetry/bin:$PATH"
以下が実行できたのでpoetryコマンドを使える状態になったようだ。
$ poetry --version
Poetry version 1.1.13

② Poetryの設定

仮想環境をプロジェクトのルートディレクトリに作るようにしたいので、以下を参照して設定しておく。
Poetry: Configuration / Available settings / virtualenvs.in-project
Create the virtualenv inside the project’s root directory. Defaults to None.
If set to true, the virtualenv will be created and expected in a folder named .venv within the root directory of the project.
To change or otherwise add a new configuration setting, you can pass a value after the setting’s name:
poetry config virtualenvs.path /path/to/cache/directory/virtualenvs
以下のコマンドを実行する。
$ poetry config virtualenvs.in-project true
設定できたことを以下で確認した。
$ poetry config virtualenvs.in-project
true

③ ライブラリのインストール

すでにある環境からPoetryに乗り換える方法が記載されていたので、その方法を参照して進めていく。
Poetry: Basic usage / Initialising a pre-existing project
Instead of creating a new project, Poetry can be used to ‘initialise’ a pre-populated directory. To interactively create a pyproject.toml file in directory pre-existing-project:
cd pre-existing-project 
poetry init
poetry init を実行すると、パッケージ名やライセンス等、順に聞かれていくのでそれに答えていくとpyproject.tomlが作られた。
$poetry init

This command will guide you through creating your pyproject.toml config.

Package name [household_accounts]:
Version [0.1.0]:
(後略)
$poetry install
Creating virtualenv household-accounts in /Users/username/src/household_accounts/.venv
Updating dependencies
Resolving dependencies... (0.1s)

Writing lock file

Installing the current project: household_accounts (0.1.0)
次に、以下を参照して使用しているライブラリ類をインストールした。 Poetry: Basic usage / Specifying dependencies
instead of modifying the pyproject.toml file by hand, you can use the add command.
$ poetry add pendulum
$poetry add pyocr==0.8
$poetry add --dev jupyterlab==3.2.9  # 開発用のパッケージは--devをつける

④ Pipenvのアンインストール

Pipenvはhomebrewでインストールしていたので、以下でアンインストールを行った。
$brew uninstall pipenv
Uninstalling /usr/local/Cellar/pipenv/2022.4.21... (2,654 files, 39.6MB)
Pipenvのアンインストール後、Pipenvでのみ使用していた依存パッケージが残っていたため、それらもアンインストールした。 ※ 依存関係の確認は以下のコマンドで行った
$brew deps --tree pipenv  # pipenvの依存パッケージ
pipenv
├── python@3.10
│   ├── gdbm
│   ├── mpdecimal
│   ├── openssl@1.1
│   │   └── ca-certificates
│   ├── readline
│   ├── sqlite
│   │   └── readline
│   └── xz
└── six

$brew uses --installed python@3.10  # python@3.10に依存しているパッケージ
pipenv
以上でPipenvからPoetryに切り替えが終了したので、次に元々やりたかったlinerとformatterの導入を行っていく。→ linterとformatterを導入する

BigQuery: TIMESTAMPに対し日付を指定する際の間違い事例集

BigQueryでTIMESTAMPのカラムに対し日付を指定する際に、自分が今まで誤って行った方法をまとめてみる。

課題設定

startTime というTIMESTAMP型のカラムがあるテーブルを対象として、
startTime
----------
2016-01-01 01:00:00 UTC
2013-04-21 11:33:01 UTC
2019-09-27 10:30:20 UTC
…
startTimeが2016年7月1日のレコードのみを取り出したいと想定する。
SELECT startTime, … 
FROM dataset.table
WHERE [  ?  ]  // 2016-07-01のデータのみ取り出したい場合に、どう書くのが適切なのか
(話を単純にするため、UTCからの変換は考慮に入れる必要がないと仮定する) ものすごく単純な課題にもかかわらず、色々踏み抜いてきたので以下一つずつ書いていく。

失敗例1:

日付である 2016-07-01 を指定するぞ、という意識だけで以下のクエリをまず書いた。
WHERE startTime = date(2016, 7, 1)
※ BigQueryのDATE型での特定の日時の書き方は以下を参照:
BigQueryリファレンス: Date functions 対象のstartTimeがTIMESTAMP型だという認識がそもそも持てていない時の失敗。( No matching signature for operator = for argument types: TIMESTAMP, DATE. と怒られる)

失敗例2:

比較対象の型に合わせないといけないのだった、DATEからTIMESTAMPに変えよう…となって、次に以下のようなクエリを書いた。
WHERE startTime = timestamp('2016-07-01')
※ BigQueryでのTIMESTAMP型での特定の日時の書き方は以下を参照:
BigQueryリファレンス: Timestamp functions これは実行できてしまうのだけど、意図した形では動かない。 timestamp('2016-07-01')2016-07-01 00:00:00 UTCのことなので、00時ちょうどのデータしか合致しない。結果が1行も返ってこないか、想定よりとても少ない件数が返ってくることになる(count などで気づければよいのだけど、処理途中の部分だったりすると気づけないことがある)。

失敗例3:

では時刻まで含めた形で指定すれば良いのでは、となって以下のクエリを書いた。
WHERE startTime BETWEEN timestamp('2016-07-01 00:00:00') AND timestamp('2016-07-01 23:59:59')
2016/07/01 のレコードの大半がこれで取得できるけど、正確ではない(終了側の指定の仕方が適切ではない)。 TIMESTAMP の説明として以下の記載がある。
BigQueryリファレンス: Data-types
A TIMESTAMP object represents an absolute point in time, independent of any time zone or convention such as Daylight Savings Time with microsecond precision.
TIMESTAMPはマイクロ秒の精度であることがわかる。範囲としては 0001-01-01 00:00:00 to 9999-12-31 23:59:59.999999 UTCとある。 そのため、記載したクエリだと2016-07-01の23時59分59.5秒のデータなどが範囲に含まれないことになってしまう。

失敗例4:

ではTIMESTAMPの方をDATE型に変換してしまうか、となって以下のクエリを書いた。
WHERE date(startTime) = date(2016, 07, 01)
DATE(timestamp_expression)という形式でTIMESTAMPをDATEに変換できる BigQueryリファレンス: Date funcitons これでも2016/07/01のレコードを取得できる。ただ、これがパーティション分割テーブルだったりすると、全てスキャンされることになりパフォーマンスが下がる。 ※ パーティション分割テーブルのスキャン範囲についてはこのあたりに記載がある
BigQueryリファレンス: パーティション分割テーブルに対するクエリ
クエリでスキャンされるパーティションを制限するには、フィルタで定数式を使用します。クエリフィルタで動的式を使用すると、BigQuery はすべてのパーティションをスキャンする必要があります。

成功例:

数々の失敗を経て以下の形式にたどり着いた。
WHERE 
  startTime >= timestamp('2016-07-01 00:00:00') 
  AND startTime < timestamp('2016-07-02 00:00:00')
これだと、意図した範囲を指定できているし、定数式にもなっている。
(この条件を満たす他の書き方もあるのかもしれないけど…) 1つのネタでよくこれだけ失敗できたなという感があるけど、色々回り道したことで理解が深まったのでよかったと思うことにする。

BigQueryへのデータ読み込み_3.Cloud Data FusionでCSVを整形する

前の記事では、Cloud Functionsを使い、Cloud StorageにCSVファイルが追加されたらBigQueryに自動で読み込むことをしてみた。この際、CSVは予め手動で整形した上でCloud Storageに置いていた。
この記事では、CSVをBigQueryに読み込み可能な状態に整形する過程を自動化する方法を探る。

概要

今回、Cloud Data Fusionを用いてCSV整形を実施してみた。
Cloud Data Fusionは、データパイプラインをGUIで作成できるサービスで、ETLのTransform部分も様々な機能が用意されているため、CSVの整形もGUIで行えてしまう。
今回は、以下の2段階で進めた手順をまとめた。
A. 定型的にCSVを整形し、BigQueryに読み込む手順をCloud Data Fusionで作成する
B. 任意のCSVファイル名を渡したらA.が実行されるようにする

事前準備

クイックスタート|Cloud Data Fusion ドキュメント を参照して、APIを有効にし、インスタンスを作成しておく。
下記のように、インスタンス名の前に緑のチェックマークが入ったら作成完了。
操作方法や概観をつかむため、事前に以下のチュートリアルをやった。
ターゲティングキャンペーンパイプライン | Cloud Data Fusion ドキュメント 顧客一覧から、住所が特定の条件に合致する顧客情報のみを抜き出すという内容。今回の目的のうち、CSVの整形を行う部分で参考になった。
再利用可能なパイプラインの作成 | Cloud Data Fusion ドキュメント 引数で情報を渡すよう設定し、再利用可能なパイプラインを構築するという内容。今回の目的のうち、CSVのファイル名を与えたら実行されるよう設定しておく部分で参考になった。

手順

A. 定型的にCSVを整形し、BigQueryに読み込む手順をCloud Data Fusionで作成する

スタートとゴール

前の記事同様、気象庁のデータを使用する。
気象庁のHPからダウンロードした段階では、以下のような形式になっている。このCSVをCloud Storageの特定のフォルダに置いた状態からスタートする。
ダウンロードした時刻:2021/11/21 16:31:04

,東京,東京,東京,東京,東京,東京
年月日,平均気温(℃),最高気温(℃),最低気温(℃),最大風速(m/s),最大風速(m/s),天気概況(昼:06時~18時)
,,,,,風向,
2021/1/1,4.4,10.5,-1.3,3.1,北北東,快晴
2021/1/2,4.8,10.8,0.1,4.7,北北東,快晴
これを下記の形式にData Fusion内で整形し、BigQueryに格納することを目指す。
date,ave_temp,max_temp,min_temp,max_wind_speed,wind_direction,weather
2021-01-01,4.4,10.5,-1.3,3.1,北北東,快晴
2021-01-02,4.8,10.8,0.1,4.7,北北東,快晴

読み込み先のBigQueryの準備をしておく

BigQueryに読み込み先のデータセットとテーブルを準備しておく。 スキーマはこれからCloud Data Fusionで整形する予定の内容に合わせておく。

② Cloud Data Fusion UIでCSVを読み込む

「事前準備」でインスタンスを作成してあるので、アクションの「インスタンスを表示」をクリックすると、UIに画面が遷移する。
Wrangleをクリックする。
Upload元としてCloud Storageを選択し、用意しておいたCSVを読み込むと、以下のような感じで表示される。行によってカンマの数が異なるデータのため、うまくカンマ区切りで読み込まれず、全て1列に入る形になっている。

③ データを整形する

ゴールの内容に合うようにデータを整形していく。
  • 不要なヘッダーの削除
    今回は冒頭の5行を消してしまい、列名は後から付け直す方針で進める。 Filter > value matches regex で正規表現により日付から始まる列のみ残すように(冒頭の5行を削除するように)した。
  • カンマ区切りで区切る Parse > CSV を選択し、区切り文字はCommaを選択する。
ここまでの処理でこのような形式になっている。
  • 不要な列を削除する
    カラム区切りで分割した際、分割前の状態も列として残っていている。不要なため Delete columnより削除する。
  • 列名をつける
    各列打ち替えていく。
  • 型変換
    • 日付:読み込んだ時点では1行全体で1つの文字列として判断されているため、現状日付も文字列となっている。Parse > Simple date より、今回2021/1/1形式だったのでCustom formatを選択し、yyyy/m/dで実行した。
すると、不要な時間までついてきてしまった。
公式のドキュメントでは見つけられなかったのだけど、stack overflowを参照して、Custom transform > date.toLocalDate()をかけたら、Date型になってくれた。
  • 数値:Change data type > Floatを選択して変換する。
Wranglerで行った各変換は画面右で表示されており、不要な処理を削除することが可能。

④パイプライン作成

Create a Pipeline をクリックし、Batch pipelineを選択すると
データ元のGCSと設定したWranglerがパイプラインでつながった状態で表示される。

⑤ データの書き出し先を設定する(BigQuery)

Sink > BigQueryを選択(Sinkはデータの出力先)してBigQueryのノードを追加し、Wranglerと接続する。
ポインタをBigQuryノードの上に置くとPropertiesが表示されるので、クリックする。BigQuery側のデータセットやテーブル等の指定を行う。

⑥ デプロイと実行

Runをクリックすると、処理が進んでいく(けっこう時間がかかった 20分程度)。Statusの部分の表示で処理の段階がわかり、無事成功すると最終的にSucceededになる。

B. 任意のCSVファイル名を渡したらA.が実行されるようにする

A. では、元となるCSVファイル名をベタ打ちで指定していた。これだとファイル名が変わるたびに毎回作り直す必要があるので、任意のファイル名を渡して処理を実行できるようにしたい。
A. の処理をベースにし、一部書き換える。

① データの読み込み先の変更

Cloud Storageからの読み込んでいる最初のノードのPropertiesをクリックし、読み込み先のパスの指定部分末尾のCSVファイル名が入る部分を${csv_file_name}に変更する。
※ 下図のFormat部分をcsvとしているが、ここは今回textにする必要があった(csvだと1行目しか読み込まれなかった。おそらく、今回のCSVファイルがCSVと判断される形式でなかったのが要因)

② デプロイ

変数を入れてRunしてみると、無事成功した。

未解決な部分

今回文字列として読み込まれた小数点の数値列について、文字列からFloatに変換すると元々存在しなかった不要な桁が出てしまったが、これをうまく処理することができなかった。
(Wranglerの処理のプレビュー画面 不要な桁が残る処理となっている)
round関数があるので、wranglerでround(ave_temp, 1)などと試してみたが、エラーになり、roundの桁数を指定する方法が見つけられなかった。
この記事の中で、
We could write the recipe for the transformation directly with Directives using JEXL syntax(https://commons.apache.org/proper/commons-jexl/reference/syntax.html)
とあったので、該当ページを参照したが、今回行いたい操作のヒントは得ることができなかった。

まとめ

今回、Data Fusionを利用してCSVファイルの整形を行ってみた。GUIで様々な処理が行えるのはとっつきやすくはあったが、少し込み入った処理をしようとするとやりづらかったり、結局学習コストがかかるので、コードを書いて管理する方が楽かもしれないと感じた。
次は、ワークフローを管理できるCloud Composerを触ってみたい。

ふりかえり方法のふりかえり2021

これは GMOペパボ ディレクター Advent Calendar 2021 の13日の記事です。 今年2021年は、個人のふりかえりを週次で行ってみた。
もともと日記を(日次で)書いてはいたけど、出来事を数行おざなりに書く程度だった。1日の終わりに書く日記にそれ以上の気力が出ず、日記の改善はあきらめて週次でふりかえるように変更した。
2021年のふりかえりとして、この週次のふりかえりについてふりかえってみる。

どういう形式でふりかえったか

以下の項目を立ててふりかえるようにしていた。
今週のトピック
急に「今週をふりかえるぞ」と思っても、そもそもどのような週だったかが意外と記憶から抜け落ちている。まずは何が起きた/した週だったのか書き出す。
KPT
上記の書き出しでだいたいどんな週だったか掴めたら、Keep / Problem / Try という観点からその週をふりかえる。
課題
上記2項目は短文での簡単なふりかえりなので、もっと突っ込んでふりかえりたい出来事については別で書く。
こんな感じで書いていた(思ったことをそのまま書いていくので、後から読み返すと理論がつながっていない)

うまくいったところ

日々の感情の動きを少し俯瞰できた

ふりかえろうとすると、心の中で思っているだけなら不要だった言語化が必然的に必要になる。
漠とした気持ちをどうにか文章に落とそうとする過程で整理されていき、感情がラベリングされる。これがその時点での俯瞰につながり、毎週の記録が積み重なっていくことで過去との差分が見えてきて、それが長期的な視点での俯瞰につながったように思う。
これは「いったい何週間同じことで悩んでいるのか、悩み方に進歩がない」ということを(薄々気づいてはいるのだが)明確に突きつけてくるので、その点でも良かったと思う。

継続して取り組むことができた

こういった取り組みは徐々にやらなくなってしまうことが多い(自分は)。今回、途中飛んだ週は若干あったものの、継続できたのは以下の点がうまく働いたように思う。
・取り組む時間と場所を決めた
毎週土曜日の午前中にやるようにしていた。休日の方が外的要因による気持ちの波が小さく、午前中だと睡眠により脳が整理されていて取り組みやすかった。また(コロナが落ち着いている時期は)カフェで行うようにしていたので、場所も固定され「この時間にここに行ったらやる」という習慣になりやすかったと思う。
・予定を設定した
やることをNotionで管理しているので、毎週このふりかえりのtodoを入れるようにした。必ずNotionは見るので、todoが未完了で残る気持ち悪さが着手を後押しした。

うまくいかなかったところ

本当にしんどい時は書けなかった

本来、何かつらいこと(≒ 学びにつながる可能性が高いこと)があった時こそふりかえるべきなのだろうけど、それができない週もあった。でもその週の記録が残っていないと「あの時期苦しかったな」という漠然とした認識になってしまうし、自分に都合の良いように記憶を改竄してしまう恐れもある。
同じつらかったこと/失敗したことでも、ふりかえりを書ける時と書けない時が存在した。 書けるのは、その出来事の直後に「しまった、ああすれば良かった」と悔いたときだった。要は次回の改善策まで思い至っている時で、失敗を思い出すのは嫌でも、とにかく書いてしまえば脳内で抱えておく必要がなくなる。そのため、吐き出すように勢いよく改善策まで書き下せてしまえる。
一方、書けないのは、つらいという感情に支配され、しかもどうすればよいかわからない時だった。つらさの渦中にあるからとにかくそれを想起したくない、文章にすると自分でそれを認めることになるから苦しくて書けない。
→ 改善策:ふりかえりのタイミングを一部フレキシブルにする
毎週土曜に固定してふりかえっていたけど、まだその出来事を消化できておらず向き合えないタイミングで土曜日となることもある。ならば、それをタイトルとして1行書いておいて、消化できたタイミングで詳細を書きに戻れば良いのではと考えた。逆パターンとして、土曜にふりかえるからいいやと放置したことで、いざ書こうとした時に記憶が薄れている場合もあった。土曜にしか書かないのでなく、段階的に書き足していく方式を取ってみたい。

ふりかえりで行動が改善されたのかが不明

毎週ふりかえってはいたものの、では実施しなかった時に比べて本当に自分の行動が改善されたのかと問われると、明確に言えないことに気づいた。ただの自己満足なら、毎週20分程度時間をかけている意味は薄い。
→ 改善策:先週のふりかえりの記録を読み返す時間を作る
KPTのTryを出したり、課題に対する対応策を書いて、それで終わりになっていることが多かった。実際にそのTryや対応策を実施できたのか、実施したならその対応は適切だったのか、実施できなかったならどうすれば実施できるようになるのか。そのあたりをふりかえる機会を設ければ、毎週のふりかえりが連続したものとなっていき、何を改善できたかもう少し実感を持てそうだと思った。 以上でうまくいったところ/いかなかったところが洗い出せたので、それをもとに来年2022年も週次のふりかえりを続けてみようと思う。

BigQueryへのデータ読み込み_2.Cloud Storageにデータ追加されたら自動で読み込む

前の記事では、手動でBigQueryにデータを読み込む方法を試した。この記事では、データが新しく生成されたら自動で読み込みを行いたいケースを想定し、「Cloud StorageにCSVファイルが追加されたら、自動でBigQueryにデータを読み込む」ことを目標に進める。

概要

今回はCloud Functionsを使った。Cloud FunctionsはGCP上で関数を実行できるサービスで、関数を実行するトリガーの一つにCloud Storageでのイベントの発生がある。そのため、今回の「Cloud StorageにCSVファイルが追加されたら」もトリガーとして設定できる。
今回のイメージ図
今回、やりたいことを分解すると以下の2つが必要になる。
A. Cloud Storageにオブジェクトが追加されたことをトリガーとしてCloud Fuctionsの関数を実行する
B. Cloud Fuctionsの関数内で、BigQueryのデータにアクセスして操作を行う
今回、まずはA, Bそれぞれ単体で試して作り方を確認後、AとBを合わせて元々作りたかった関数を作成する手順で進めた。不慣れでA, B単体でもエラーが出たり色々とまどう部分があったので、それも含めて記載していく。
A. Cloud Storageにオブジェクトが追加されたことをトリガーとしてCloud Fuctionsの関数を実行する
B. Cloud Fuctionsの関数において、BigQueryのデータにアクセスして操作を行う
C. Cloud Storageにオブジェクトが追加されたらBigQueryにデータを読み込む

手順

A. Cloud Storageにオブジェクトが追加されたことをトリガーとしてCloud Fuctionsの関数を実行する

Cloud Storage のチュートリアル | Cloud Functions ドキュメントに、Cloud Storageでは下記4点のイベントに対応していると記載がある。
・ ファイナライズ
・ 削除
・ アーカイブ
・ メタデータの更新
このうち、ファイナライズについて
オブジェクト ファイナライズ イベントは、Cloud Storage オブジェクトの「書き込み」が正常にファイナライズされた時点でトリガーされます。つまり、新しいオブジェクトの作成または既存のオブジェクトの上書きによって、このイベントがトリガーされます。
とあるので、今回はファイナライズをトリガーとすればよいことがわかった。
同ドキュメント内でファイナライズのサンプル関数が掲載されているので、それを以下で実行してみた(ただし、ドキュメントはコマンドラインを用いた操作手順だったが、今回はCloud Consoleから操作した)。
① 関数を作成する(トリガーの指定)
1) 関数の作成をクリック
2) 関数名、リージョン、トリガーの内容等を入力
トリガーのEvent typeは先ほど記載したファイナライズ、バケットはCloud Fucntionsのオブジェクトのアップロード先バケットを指定する。
② 関数を作成する(関数の内容の記載)
チュートリアルに記載の内容のままmain.pyに記載する。
③ 関数が動作することを確認する
先ほど指定したCloud Storageのバケットに適当にテキストファイルをアップロードし、これをトリガーとして関数が実行されることを確認する。Cloud Functionsのログに、print関数の結果が吐き出されていることを確認した。

B. Cloud Fuctionsの関数において、BigQueryのデータにアクセスして操作を行う

クイックスタート: クライアント ライブラリの使用 | BigQuery ドキュメントにBigQueryのテーブルにクエリを投げてデータを取得する関数の事例が掲載されている。今回最終的に行いたいのはSelect クエリではなくデータのインサートだが、まずはこの事例を試してみる。
① クエリを実行する関数を作成する
A.と同様の手順で、記載の関数を少し変形してCloud Functionsにデプロイした。(※ このB.では、トリガーはHTTPリクエストとなっている)
main.py

from google.cloud import bigquery


def query_stackoverflow(request):
    client = bigquery.Client()
    query_job = client.query(
        """
        SELECT
          CONCAT(
            'https://stackoverflow.com/questions/',
            CAST(id as STRING)) as url,
          view_count
        FROM `bigquery-public-data.stackoverflow.posts_questions`
        WHERE tags like '%google-bigquery%'
        ORDER BY view_count DESC
        LIMIT 10"""
    )

    results = query_job.result()  # Waits for job to complete.

    for row in results:
        print("{} : {} views".format(row.url, row.view_count))
    return "finish!"
requirements.txt
google-cloud-bigquery>=1.28.0
② 関数をテストする
200が返ってきてprint関数が出力されていることが確認できる。
つまずいた点
上記手順で実行できるまでにつまずいた点を記載する。
1) requirements.txtの不足
requirements.txtを記載していなかったところ、以下のようなエラーが出た。
"/workspace/main.py", line 1, in  from google.cloud import bigquery ImportError: 
cannot import name 'bigquery' from 'google.cloud' (unknown location)
今回ローカルに開発環境を作らずCloud Consoleで行っていたので、予めインストールされるようなイメージを勝手に持ってしまっていたけど、requirements.txtへの記載が必要だった。
requirements.txtに google-cloud-bigquery>=1.28.0 を記載することで解決した。
2) 引数の不足
上記手順で def query_stackoverflow(request): としているところを当初 def query_stackoverflow(): として引数を入れていなかったらエラーとなった。ログには以下の記載があった。
TypeError: query_stackoverflow() takes 0 positional arguments but 1 was given
関数の内容上、引数は必要なかったため記載していなかったが、Cloud Functionsでトリガーにより実行する場合は暗黙的に引数が渡されるため関数で引数が必要なようだった(参考)(ドキュメントでの明確な言及は見つけられなかったが、トリガーにより実行する以上、引数が必要なのは当然ということなのかもしれない)。
関数の引数を追加することで解決した。
3) returnの不足
特に何も返さない関数にしていたところ、以下のエラーが返ってきた。
TypeError( TypeError: The view function did not return a valid response. 
The function either returned None or ended without a return statement.
関数の内容上、特に必要なかったが関数にreturnを追加することで解決した。

C. Cloud Storageにオブジェクトが追加されたらBigQueryにデータを読み込む

A, Bを試す中で今回やりたいことに必要な概要がつかめたので、最終的にやりたかった内容の関数を作る。
B.ではSelectクエリの実行だったけど、この部分をCSVデータの読み込みに置き換える必要がある。この部分はCloud Storage からの CSV データの読み込み | BigQuery ドキュメントを参照した。
① 関数をデプロイする
関数のデプロイ手順自体はA.と同様。
任意の名称のCSVファイルが追加されたら、BigQueryにデータの追加を行うようにした。
main.py

from google.cloud import bigquery


def append_data_into_bigquery(table_id, uri):
    client = bigquery.Client()

    job_config = bigquery.LoadJobConfig(
      autodetect=True,  # スキーマの自動検出
      write_disposition=bigquery.WriteDisposition.WRITE_APPEND,  # データの追加
      skip_leading_rows=1  # 冒頭1行は今回ヘッダ行なので読み飛ばし
    )

    load_job = client.load_table_from_uri(
      uri,
      table_id,
      job_config=job_config
    )
    load_job.result()

    table = client.get_table(table_id)
    print("Loaded {} rows to table {}".format(table.num_rows, table_id))


def append_weather_data_into_bigquery(event, context):
    if event['name'].endswith('weather_tokyo.csv') == True:
        project_id = 'learn-bigquery-327203'
        bq_dataset = 'level2_from_gcs'
        bq_table = 'weather_tokyo'
        table_id = project_id + '.' + bq_dataset + '.' + bq_table  # project.dataset.table_name
        uri = 'gs://' + event['bucket'] + '/' + event['name']  # gs://bucket_name/object_name_or_glob
        append_data_into_bigquery(table_id, uri)
requirements.txt
google-cloud-bigquery>=1.28.0
② 関数が実行されることを確認する
Cloud StorageにCSVファイルをアップロードしたことをトリガーとして、BigQueryにデータがインサートされるかを確認する。 Cloud Functionsのログでは関数の実行が完了した旨が出ている。
BigQueryでも、データが追加されていることが確認できた。

次:

今回、Cloud StorageにCSVデータが追加されたら自動でBigQueryにデータを読み込むことをやってみた。しかしながら、CSVは予め手動でBigQueryに受け入れられる形式に変換していた。この部分も自動化できそうなので、次回はその辺りもGCP上で行う方法を調べてみたい。