PythonでCloud Tasksを使用して分散処理

2023年1月5日

ゴール

サーバーを立ち上げて、curl -X POST http://localhost:8080/set_tasksを実行すると
「Cloud Tasksのコンソールで分散したデータを格納」することができる
グレーになっている部分はターミナルで実行

目次
  1. 前提
  2. 使用するライブラリ
  3. 手順
    1. キューの作成
    2. 既存のファイルを編集
    3. ライブラリをインストール
    4. 確認
  4. Tasksからデータを受け取る
  5. 参考にした資料

前提

「Hello World」が表示できる環境からスタート

使用するライブラリ

  • google-cloud-tasks

手順

① キューの作成
Google Cloud Platformのコンソール上で、Cloud Tasksから「push キューの作成を行う
キュー名:test
リージョン:asia-northeast1(東京)

② 既存のファイルを編集
requirements.txtに下記を追加

google-cloud-tasks

app.pyを下記に変更

import json
import os
from flask import Flask, request
from google.cloud import tasks_v2

app = Flask(__name__)

values = [1,2,3,4,5,6,7,8,9]

# tasksにセット
@app.route('/set_tasks', methods=['POST'])
def set_tasks():
    
    executions_data = []
    # 3つずつ分割する
    for i in range(0, len(values), 3):
        value = {"data":values[i: i + 3]}
        executions_data.append(value)
        
    for data in executions_data:
        client = tasks_v2.CloudTasksClient()
        project = 'プロジェクト名'
        location = 'asia-northeast1'
        queue = 'コンソールで作成したキュー名:test'
        parent = client.queue_path(project, location, queue)
        task = {
            "http_request": {  
                "http_method": tasks_v2.HttpMethod.POST,
                # TODO Runにしたら'RunのURL' + '今後の処理(例:/update)'
                "url": 'https://localhost:0000',  
            }
        }
        task["http_request"]["headers"] = {"Content-type": "application/json"}
        task["http_request"]["body"] = json.dumps(data).encode()
        client.create_task(request={"parent": parent, "task": task})
    return "ok"

if __name__ == "__main__":
    app.run(debug=True, host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))

③ ライブラリをインストール
pip install -r requirements.txt

④ 確認
– ターミナルでpython3 app.py を実行すると、Running on http://....8080と表示される
– 別ターミナルでcurl -X POST http://localhost:8080/set_tasksを実行

→ Cloud Tasksのコンソールで、データ3個のタスクが3つあればOK!
(今回では3つのタスク、ペイロードに{"data": [1, 2, 3]}が入っている)

Tasksからデータを受け取る

@app.route("/update", methods=['POST'])
def update():
    tasks_data = request.json["data"]

先ほど記載した「'RunのURL' + '今後の処理(例:/update)'」で次の処理が上記
routeの/updateから処理が可能
また、request.json[“data"]でペイロードにあった{"data": [1, 2, 3]}らを使える

参考にした資料

gcp,tasks

Posted by shun