forked from GoogleCloudPlatform/python-docs-samples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathe2e_test.py
97 lines (83 loc) · 2.55 KB
/
e2e_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# Copyright 2020 Google, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This sample creates a secure two-service application running on Cloud Run.
# This test builds and deploys the two secure services
# to test that they interact properly together.
import os
import subprocess
from urllib import request
import uuid
import pytest
@pytest.fixture()
def services():
    """Build and deploy the service under test, then yield its URL and a token.

    Setup submits ``e2e_test_setup.yaml`` to Cloud Build with a random
    ``_SUFFIX`` substitution, looks up the URL of ``helloworld-<suffix>``
    (presumably deployed by that build config -- it is not visible from this
    file), and fetches an identity token for the active gcloud account.

    Yields:
        A ``(service, token)`` tuple. Both values are the stripped stdout of
        the corresponding gcloud command and are therefore ``bytes``.

    Raises:
        KeyError: if ``GOOGLE_CLOUD_PROJECT`` is not set in the environment.
        subprocess.CalledProcessError: if any gcloud command exits non-zero.
    """
    # Unique suffix to create distinct service names
    suffix = uuid.uuid4().hex
    project = os.environ["GOOGLE_CLOUD_PROJECT"]

    # Build and Deploy Cloud Run Services
    subprocess.run(
        [
            "gcloud",
            "builds",
            "submit",
            f"--project={project}",
            f"--substitutions=_SUFFIX={suffix}",
            "--config=e2e_test_setup.yaml",
            "--quiet",
        ],
        check=True,
    )

    # The build has succeeded at this point, so the delete below must run even
    # when a later setup step fails. pytest does not resume a fixture that
    # raised before reaching ``yield``, hence the explicit try/finally.
    try:
        # Get the URL for the service and the token
        service = subprocess.run(
            [
                "gcloud",
                "run",
                f"--project={project}",
                "--platform=managed",
                "--region=us-central1",
                "services",
                "describe",
                f"helloworld-{suffix}",
                "--format=value(status.url)",
            ],
            stdout=subprocess.PIPE,
            check=True,
        ).stdout.strip()

        token = subprocess.run(
            ["gcloud", "auth", "print-identity-token"],
            stdout=subprocess.PIPE,
            check=True,
        ).stdout.strip()

        yield service, token
    finally:
        # Delete the service created for this test run.
        subprocess.run(
            [
                "gcloud",
                "run",
                "services",
                "delete",
                f"helloworld-{suffix}",
                f"--project={project}",
                "--platform=managed",
                "--region=us-central1",
                "--quiet",
            ],
            check=True,
        )
def test_end_to_end(services):
    """Call the deployed service with a bearer token and check its reply.

    Args:
        services: the ``(service, token)`` pair from the ``services`` fixture,
            both as ``bytes``.

    The request is sent to the service root with an ``Authorization: Bearer``
    header; the test passes when the response status is 200 and the decoded
    body is exactly ``"Hello Test!"``.
    """
    service = services[0].decode()
    token = services[1].decode()

    req = request.Request(
        f"{service}/", headers={"Authorization": f"Bearer {token}"}
    )

    # Use the response as a context manager so the underlying connection is
    # closed even when an assertion fails.
    with request.urlopen(req) as response:
        assert response.status == 200
        body = response.read()

    assert body.decode() == "Hello Test!"