Skip to content

Commit

Permalink
Added support to stream to BQ
Browse files Browse the repository at this point in the history
  • Loading branch information
EricDLarson committed Oct 29, 2020
1 parent 6a57926 commit 5587546
Showing 1 changed file with 40 additions and 4 deletions.
44 changes: 40 additions & 4 deletions python/list_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import json
from apiclient.discovery import build
from google.oauth2 import service_account
from google.cloud import bigquery

parser = argparse.ArgumentParser()
parser.add_argument(
Expand Down Expand Up @@ -47,6 +48,16 @@
required=False,
action='store_true',
help='Return only unjoined events')
parser.add_argument(
'--bq_table',
required=False,
type=str,
help='Stream events to a BigQuery table')
parser.add_argument(
'-v', '--verbose',
required=False,
action='store_true',
help='Verbose output')

args = parser.parse_args()

Expand All @@ -61,7 +72,6 @@
if args.events_missing_catalog_items:
filter_string = (filter_string + ' eventsMissingCatalogItems')


SCOPES = ['https://www.googleapis.com/auth/cloud-platform']
SERVICE_ACCOUNT_FILE = args.service_account
credentials = service_account.Credentials.from_service_account_file(
Expand All @@ -71,6 +81,9 @@
# But the above creditals code may be somewhat more "correct"
# os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = args.service_account

if args.bq_table:
bq = bigquery.Client(credentials=credentials)

service = build('recommendationengine',
'v1beta1',
discoveryServiceUrl=
Expand All @@ -82,6 +95,8 @@
next_page = ''
while next_page is not None:

if args.verbose:
print('Getting Recommendations Events')
request = service.projects().locations().catalogs().eventStores()\
.userEvents().list(
parent='projects/'+ args.project +
Expand All @@ -98,13 +113,34 @@
next_page = None

try:
bq_list = []
for event in response['userEvents']:
try: # Add currencyCode since it's not returned currently (b/130748472)
for i in range(len(event['productEventDetail']['productDetails'])):
event['productEventDetail']['productDetails'][i]['currencyCode'] = (
'USD')
except KeyError:
except KeyError as err:
pass
print(json.dumps(event))
except KeyError:

if args.bq_table:
# Remove custom attributes
if 'productEventDetail' in event and 'productDetails' in event['productEventDetail']:
for item in event['productEventDetail']['productDetails']:
del(item['itemAttributes'])

bq_list.append(event)
else:
print(json.dumps(event))

if args.bq_table:
if args.verbose:
print ('Writing to BigQuery')
errors = bq.insert_rows_json(args.bq_table, bq_list)
if errors:
print(json.dumps(errors))
exit()
else:
if args.verbose:
print('Wrote ' + str(len(bq_list)) + ' events to BiqQuery')
except KeyError as err:
pass

0 comments on commit 5587546

Please sign in to comment.