Skip to content

Commit

Permalink
readmes and gcp pubsub receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
pisuke committed May 5, 2023
1 parent c279fdc commit 89f20e6
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 0 deletions.
41 changes: 41 additions & 0 deletions bos-testing/gcp-pubsub-receive/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# gcp-pubsub-receive

This command line program allows to see the message stream of a specific devices or class of devices with a common abbreviation.

## Installation

`python3 -m pip install -r requirements`

## Usage

```
$ gcloud auth login
$ gcloud auth application-default login
$ ./gcp-pubsub-receive.py -h
_ _
__ _ ___ _ __ _ __ _ _| |__ ___ _ _| |__
/ _` |/ __| '_ \ _____| '_ \| | | | '_ \/ __| | | | '_ \
| (_| | (__| |_) |_____| |_) | |_| | |_) \__ \ |_| | |_) |
\__, |\___| .__/ | .__/ \__,_|_.__/|___/\__,_|_.__/
|___/ |_| |_|
_
_ __ ___ ___ ___(_)_ _____
| '__/ _ \/ __/ _ \ \ \ / / _ \
| | | __/ (_| __/ |\ V / __/
|_| \___|\___\___|_| \_/ \___|
usage: gcp-pubsub-receive.py [-h] [-v] [-p PROJECT] [-s SUB] [-d DEVICE] [-t TIMEOUT]
options:
-h, --help show this help message and exit
-v, --verbose increase the verbosity level
-p PROJECT, --project PROJECT
GCP project id (required)
-s SUB, --sub SUB GCP PubSub subscription (required)
-d DEVICE, --device DEVICE
device name or abbreviation (required, for all devices use "all")
-t TIMEOUT, --timeout TIMEOUT
time interval in seconds for which to receive messages (optional, default=60 seconds)
```
106 changes: 106 additions & 0 deletions bos-testing/gcp-pubsub-receive/gcp-pubsub-receive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#!/usr/bin/env python3
"""
gcp-pubsub-receive.py
Read data stream for specific devices.
[Usage]
python3 gcp-pubsub-receive.py -p PROJECT_ID -s SUBSCRIPTION_ID -d TARGET_DEVICE -t TIMEOUT
"""

__author__ = "Francesco Anselmo"
__copyright__ = "Copyright 2023"
__credits__ = ["Francesco Anselmo"]
__license__ = "MIT"
__version__ = "0.1"
__maintainer__ = "Francesco Anselmo"
__email__ = "[email protected]"
__status__ = "Dev"

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
import json
import base64
import argparse
from pyfiglet import *

TARGET_DEVICE_ID = ""

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
global TARGET_DEVICE_ID
#print(message.attributes)
#print(dir(message.attributes))
#print(base64.b64decode(message))
#attributes = json.loads(base64.b64decode(message))['attributes']
device_id = message.attributes['deviceId']
#print(device_id)
if TARGET_DEVICE_ID!="all" and TARGET_DEVICE_ID in device_id:
print(f"Received {message}.")
#body = base64.b64decode(message.data)
body = message.data
print(body)
elif TARGET_DEVICE_ID=="all":
print(f"Received {message}.")
body = message.data
print(body)
message.ack()


def show_title():
"""Show the program title
"""
f1 = Figlet(font='standard')
print(f1.renderText('gcp-pubsub'))
print(f1.renderText('receive'))

def main():
global TARGET_DEVICE_ID

show_title()

parser = argparse.ArgumentParser()
group = parser.add_mutually_exclusive_group()
group.add_argument("-v", "--verbose", action="store_true", default=False, help="increase the verbosity level")
parser.add_argument("-p","--project", default="", help="GCP project id (required)")
parser.add_argument("-s","--sub", default="", help="GCP PubSub subscription (required)")
parser.add_argument("-d", "--device", default="", help="device name or abbreviation (required, for all devices use \"all\")")
parser.add_argument("-t","--timeout", default="60", help="time interval in seconds for which to receive messages (optional, default=60 seconds)")

args = parser.parse_args()

if args.verbose:
print("program arguments:")
print(args)


if args.project!="" and args.sub!="" and args.device!="":
PROJECT_ID = args.project
SUBSCRIPTION_ID = args.sub
TARGET_DEVICE_ID = args.device

# Number of seconds the subscriber should listen for messages
TIMEOUT = 60.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result(timeout=TIMEOUT)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.

else:
print("Please run ""%s -h"" to see the program options" % sys.argv[0])

if __name__ == "__main__":
main()
1 change: 1 addition & 0 deletions bos-testing/gcp-pubsub-receive/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
google-cloud-pubsub

0 comments on commit 89f20e6

Please sign in to comment.