Skip to content

Commit

Permalink
Adding Apache Pinot Query Runner (getredash#5798)
Browse files Browse the repository at this point in the history
* Adding Apache Pinot integration

* address comments
  • Loading branch information
xiangfu0 authored Oct 7, 2022
1 parent 71458e5 commit a863c8c
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ Redash supports more than 35 SQL and NoSQL [data sources](https://redash.io/help
- MySQL
- Oracle
- Apache Phoenix
- Apache Pinot
- PostgreSQL
- Presto
- Prometheus
Expand Down
Binary file added client/app/assets/images/db-logos/pinot.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
134 changes: 134 additions & 0 deletions redash/query_runner/pinot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
try:
import pinotdb
enabled = True
except ImportError:
enabled = False

from redash.query_runner import BaseQueryRunner, register
from redash.query_runner import TYPE_DATETIME, TYPE_FLOAT, TYPE_STRING, TYPE_INTEGER, TYPE_BOOLEAN
from redash.utils import json_dumps

import requests
from requests.auth import HTTPBasicAuth
import logging

logger = logging.getLogger(__name__)

# Lookup table from Pinot data type names to the Redash column type constants
# imported from redash.query_runner.
PINOT_TYPES_MAPPING = {
    # Boolean
    "BOOLEAN": TYPE_BOOLEAN,
    # Integral types
    "INT": TYPE_INTEGER,
    "LONG": TYPE_INTEGER,
    # Floating point types
    "FLOAT": TYPE_FLOAT,
    "DOUBLE": TYPE_FLOAT,
    # Types surfaced as strings
    "STRING": TYPE_STRING,
    "BYTES": TYPE_STRING,
    "JSON": TYPE_STRING,
    # Temporal
    "TIMESTAMP": TYPE_DATETIME,
}


class Pinot(BaseQueryRunner):
    """Redash query runner for Apache Pinot.

    SQL queries are sent to a Pinot broker through ``pinotdb``. Table and
    column metadata for the schema browser is read from the Pinot controller
    over HTTP with ``requests``.
    """

    noop_query = "SELECT 1"
    username = None
    password = None

    @classmethod
    def configuration_schema(cls):
        """Return the JSON-schema style description of the data source settings."""
        return {
            "type": "object",
            "properties": {
                "brokerHost": {"type": "string", "default": ""},
                "brokerPort": {"type": "number", "default": 8099},
                "brokerScheme": {"type": "string", "default": "http"},
                "controllerURI": {"type": "string", "default": ""},
                "username": {"type": "string"},
                "password": {"type": "string"},
            },
            "order": ["brokerScheme", "brokerHost", "brokerPort", "controllerURI", "username", "password"],
            "required": ["brokerHost", "controllerURI"],
            "secret": ["password"],
        }

    @classmethod
    def enabled(cls):
        """Return True only when the optional ``pinotdb`` package was importable."""
        return enabled

    def __init__(self, configuration):
        """Store the controller URI and optional credentials from ``configuration``."""
        super(Pinot, self).__init__(configuration)
        self.controller_uri = self.configuration.get("controllerURI")
        # Empty strings are normalised to None so "no credentials" has one form.
        self.username = self.configuration.get("username") or None
        self.password = self.configuration.get("password") or None

    def run_query(self, query, user):
        """Execute ``query`` against the Pinot broker.

        Returns a ``(json_data, error)`` tuple where ``json_data`` is the
        serialized ``{"columns": ..., "rows": ...}`` payload and ``error`` is
        ``None``. Failures raised by ``pinotdb`` propagate to the caller after
        the connection has been closed.
        """
        # Never log the password: debug logs are routinely shipped and retained.
        logger.debug("Running query %s with username: %s", query, self.username)
        connection = pinotdb.connect(
            host=self.configuration["brokerHost"],
            # brokerPort is optional in the schema; fall back to its declared default.
            port=self.configuration.get("brokerPort") or 8099,
            path="/query/sql",
            scheme=(self.configuration.get("brokerScheme") or "http"),
            # NOTE(review): certificate verification is switched off for every
            # broker connection; consider exposing this as a setting.
            verify_ssl=False,
            username=self.username,
            password=self.password,
        )

        cursor = connection.cursor()

        try:
            cursor.execute(query)
            logger.debug("cursor.schema = %s", cursor.schema)
            columns = self.fetch_columns(
                [(i["name"], PINOT_TYPES_MAPPING.get(i["type"], None)) for i in cursor.schema]
            )
            column_names = [column["name"] for column in columns]
            rows = [dict(zip(column_names, row)) for row in cursor]

            data = {"columns": columns, "rows": rows}
            error = None
            json_data = json_dumps(data)
            logger.debug("Pinot execute query [%s]", query)
        finally:
            connection.close()

        return json_data, error

    def get_schema(self, get_stats=False):
        """Return a list of ``{"name": ..., "columns": [...]}`` dicts, one per table.

        Columns are collected from the ``dimensionFieldSpecs``,
        ``metricFieldSpecs`` and ``dateTimeFieldSpecs`` sections of each table
        schema returned by the controller.
        """
        schema = {}
        for schema_name in self.get_schema_names():
            for table_name in self.get_table_names():
                schema_table_name = "{}.{}".format(schema_name, table_name)
                table_schema = self.get_pinot_table_schema(table_name)
                field_specs = (
                    table_schema.get("dimensionFieldSpecs", [])
                    + table_schema.get("metricFieldSpecs", [])
                    + table_schema.get("dateTimeFieldSpecs", [])
                )
                columns = [
                    {
                        "name": column["name"],
                        # Unmapped Pinot types yield None rather than a
                        # KeyError that would abort the whole schema refresh.
                        "type": PINOT_TYPES_MAPPING.get(column["dataType"]),
                    }
                    for column in field_specs
                ]
                schema[schema_table_name] = {"name": schema_table_name, "columns": columns}
        return list(schema.values())

    def get_schema_names(self):
        """Return the schema names used to prefix table names (always ``default``)."""
        return ["default"]

    def get_pinot_table_schema(self, pinot_table_name):
        """Fetch the schema document of ``pinot_table_name`` from the controller."""
        return self.get_metadata_from_controller("/tables/" + pinot_table_name + "/schema")

    def get_table_names(self):
        """Return the ``tables`` entry of the controller's ``/tables`` response."""
        return self.get_metadata_from_controller("/tables")["tables"]

    def get_metadata_from_controller(self, path):
        """GET ``path`` from the controller and return the decoded JSON body.

        Raises:
            pinotdb.exceptions.DatabaseError: if the response body is not valid JSON.
        """
        # Avoid a double slash when the configured URI carries a trailing "/".
        url = self.controller_uri.rstrip("/") + path
        # Only attach basic auth when a username is configured; otherwise the
        # literal credentials "None:None" would be sent.
        auth = HTTPBasicAuth(self.username, self.password) if self.username else None
        r = requests.get(url, headers={"Accept": "application/json"}, auth=auth)
        try:
            result = r.json()
            logger.debug("get_metadata_from_controller from path %s", path)
        except ValueError as e:
            raise pinotdb.exceptions.DatabaseError(
                f"Got invalid json response from {self.controller_uri}:{path}: {r.text}"
            ) from e
        return result

# Pass the Pinot runner class to redash.query_runner.register (imported above).
register(Pinot)
1 change: 1 addition & 0 deletions redash/settings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ def email_server_is_configured():
"redash.query_runner.amazon_elasticsearch",
"redash.query_runner.trino",
"redash.query_runner.presto",
"redash.query_runner.pinot",
"redash.query_runner.databricks",
"redash.query_runner.hive_ds",
"redash.query_runner.impala_ds",
Expand Down
1 change: 1 addition & 0 deletions requirements_all_ds.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ pandas==1.3.4
nzpy>=1.15
nzalchemy
python-arango==6.1.0
pinotdb>=0.4.5

0 comments on commit a863c8c

Please sign in to comment.