App EngineのテストケースでTaskQueueのテストを行う
App Engineのテストケースを書いてるときにTaskQueueが正しく登録されたかを確認する必要がある時がある。
その時の確認の方法について
前提
Kayをframeworkとして使っている。
コード
以下のような感じで、stubから取得を行う。
from google.appengine.ext import testbed
from google.appengine.api import apiproxy_stub_map
from kay.ext.testutils.gae_test_base import GAETestBase
class TaskQueueTest(GAETestBase):
def setUp(self):
# TaskQueueのスタブ取得
self.task_stub = apiproxy_stub_map.apiproxy.GetStub(testbed.TASKQUEUE_SERVICE_NAME)
# クリア
#self.task_stub.FlushQueue('default')
def tearDown(self):
def test_taskqueue(self):
tasks = self.task_stub.GetTasks(queue_name='default')
params = dict(map((lambda x: (x[0], urllib.unquote_plus(x[1].decode('utf-8')))), [tuple(p.split('=')) for p in urllib.unquote(base64.b64decode(tasks[2]['body'])).split('&')]))
tasksには複数のTaskQueueのTaskが入る。
1つずつのTaskは以下のようなデータになっている。
[{'name': 'task-123',
'queue_name': 'default',
'url': '/update',
'method': 'GET',
'eta': '2009/02/02 05:37:42',
'eta_delta': '0:00:06.342511 ago',
'body': '',
'headers': [('user-header', 'some-value')
('X-AppEngine-QueueName': 'update-queue'),
('X-AppEngine-TaskName': 'task-123'),
('X-AppEngine-TaskRetryCount': '0'),
('X-AppEngine-TaskETA': '1234567890.123456'),
('X-AppEngine-Development-Payload': '1'),
('Content-Length': 0),
('Content-Type': 'application/octet-stream')]
# google.appengine.api.taskqueue.taskqueue_stub.py より