PythonでCloud Tasksを使用して分散処理
ゴール
サーバーを立ち上げて、curl -X POST http://localhost:8080/set_tasks
を実行すると
「Cloud Tasksのコンソールで分散したデータを格納」することができる
※グレーになっている部分はターミナルで実行
- 前提
- 使用するライブラリ
- 手順
1. キューの作成
2. 既存のファイルを編集
3. ライブラリをインストール
4. 確認 - Tasksからデータを受け取る
- 参考にした資料
前提
「Hello World」が表示できる環境からスタート
使用するライブラリ
- google-cloud-tasks
手順
① キューの作成
Google Cloud Platformのコンソール上で、Cloud Tasksから「push キューの作成
」を行う
キュー名:test
リージョン:asia-northeast1(東京)
② 既存のファイルを編集
requirements.txtに下記を追加
google-cloud-tasks
app.pyを下記に変更
import json
import os
from flask import Flask, request
from google.cloud import tasks_v2
app = Flask(__name__)
values = [1,2,3,4,5,6,7,8,9]
# tasksにセット
@app.route('/set_tasks', methods=['POST'])
def set_tasks():
executions_data = []
# 3つずつ分割する
for i in range(0, len(values), 3):
value = {"data":values[i: i + 3]}
executions_data.append(value)
for data in executions_data:
client = tasks_v2.CloudTasksClient()
project = 'プロジェクト名'
location = 'asia-northeast1'
queue = 'コンソールで作成したキュー名:test'
parent = client.queue_path(project, location, queue)
task = {
"http_request": {
"http_method": tasks_v2.HttpMethod.POST,
# TODO Runにしたら'RunのURL' + '今後の処理(例:/update)'
"url": 'https://localhost:0000',
}
}
task["http_request"]["headers"] = {"Content-type": "application/json"}
task["http_request"]["body"] = json.dumps(data).encode()
client.create_task(request={"parent": parent, "task": task})
return "ok"
if __name__ == "__main__":
app.run(debug=True, host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))
③ ライブラリをインストール
pip install -r requirements.txt
④ 確認
– ターミナルでpython3 app.py を実行すると、Running on http://....8080
と表示される
– 別ターミナルでcurl -X POST http://localhost:8080/set_tasks
を実行
→ Cloud Tasksのコンソールで、データ3個のタスクが3つあればOK!
(今回では3つのタスク、ペイロードに{"data": [1, 2, 3]}
が入っている)
Tasksからデータを受け取る
@app.route("/update", methods=['POST'])
def update():
tasks_data = request.json["data"]
先ほど記載した「'RunのURL' + '今後の処理(例:/update)'」
で次の処理が上記
routeの/update
から処理が可能
また、request.json[“data"]でペイロードにあった{"data": [1, 2, 3]}
らを使える!
ディスカッション
コメント一覧
まだ、コメントがありません