Skip to content

Commit

Permalink
massive improvmenets.
Browse files Browse the repository at this point in the history
  • Loading branch information
Bennyelg authored and Bennyelg committed Sep 15, 2017
1 parent 87b4a79 commit 29f36f6
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 25 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ Simple presto-db connector using nim. (Still under heavy development.)
## Usage:

```nim
import presto
let con = connect("host", 8889, "hive", "default", "benny")
import db_presto
let con = open("host", 8889, "hive", "default", "benny")
defer: con.close()
var cur = con.cursor()
cur.execute("SELECT NOW()")
Expand All @@ -37,4 +37,5 @@ I'll be happy to get any help, just work & pull request.


## TODO:
* A lot of tests.
* Table coursor.
* Tests.
61 changes: 40 additions & 21 deletions presto.nim → db_presto.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ proc range(size: int): seq[int] =

type NoConnectionError = object of Exception
type CursorError = object of Exception
type QueryExecutionError = object of Exception
type NoTransactionError = object of Exception
type NotValidProtocolError = object of Exception

type
ResultSet = ref object
Expand All @@ -32,18 +34,28 @@ type
schema: string
source: string
sessionProps: string
poolInterval: string
poolInterval: int
username: string
tableCursor: bool
host: string
protocol: string
port: string
resultSet: ResultSet

type
Connection = ref object of RootObj
host: string
port: int
timeOutInSeconds: int
cur: Cursor

proc ping(this: Connection): bool =
var client = newHttpClient(timeout=this.timeOutInSeconds * 100)
var url = this.cur.protocol & "://" & this.host & ":" & this.port.intToStr
if client.get(url).status != "200 OK":
return false
return true

proc close*(this: Connection) =
# There is no Actual close so just cleaning the connection information.
this.cur = Cursor()
Expand All @@ -52,11 +64,14 @@ proc close*(this: Connection) =
discard

proc commit*(this: Connection) =
discard
raise newException(NoTransactionError, "Presto does not have transcations.")

proc cursor*(this: Connection): Cursor =
return this.cur

proc rollback*(this: Connection) =
raise newException(NoTransactionError, "Presto does not have transcations.")

proc getColumns*(this: Cursor): seq[string] =
return this.resultSet.columns

Expand All @@ -65,7 +80,7 @@ proc processResponse(this: Cursor, response: Response) =
raise newException(NoConnectionError, "Status code returned bad. %s" % response.status)
let data = parseJson(response.body)
if data.hasKey("error"):
raise newException(CursorError, data["error"].getStr)
raise newException(QueryExecutionError, data["error"]["message"].getStr)
var state = data["stats"]["state"].str
if not data.hasKey("nextUri"):
this.resultSet.nextUri = ""
Expand Down Expand Up @@ -96,41 +111,36 @@ proc processResponse(this: Cursor, response: Response) =
this.resultSet.state = state
return

proc rollback*(this: Connection) =
raise newException(NoTransactionError, "Presto does not have transcations")

proc execute*(this: Cursor, query: string) =
this.resultSet = ResultSet(nextUri: "", state: "STARTED", columns: @[])
var additional: seq[string] = @[]
if this.sessionProps.len != 0:
var additionalProperties = this.sessionProps.split(",")
var k = range(additionalProperties.len)
var evens = filter(zip(k, additionalProperties), proc (x: (int, string)): bool = x[0] mod 2 == 0)
var odds = filter(zip(k, additionalProperties), proc (x: (int, string)): bool = x[0] mod 2 != 0)
for ind in countup(0, evens.len - 1):
additional.add(format("$1=$2", evens[ind][1], odds[ind][1]))
additional.add(format("$1=$2", evens[ind][1], odds[ind][1]))
var client = newHttpClient()
var protocol = "http://"
var url = protocol & this.host & format(":$1", this.port) & "/v1/statement/"
var url = this.protocol & "://" & this.host & format(":$1", this.port) & "/v1/statement/"
client.headers = newHttpHeaders()
client.headers.add("X-Presto-Catalog", this.catalog)
client.headers.add("X-Presto-Schema", this.schema)
client.headers.add("X-Presto-Source", this.source)
client.headers.add("X-Presto-User", this.username)
if additional.len > 0:
client.headers.add("X-Presto-Session", additional.join(","))
this.resultSet.nextUri = url
this.resultSet.client = client
this.resultSet = ResultSet(nextUri: url, state: "STARTED", columns: @[], client: client)
var response = client.request(this.resultSet.nextUri, httpMethod = HttpPost, body = query)
this.processResponse(response)

proc fetchOne*(this: Cursor): seq[string] =
if this.resultSet.state == "FINISHED":
raise newException(CursorError, "No More Rows.")

if this.resultSet.data.len > 0:
return this.resultSet.data.pop()
elif this.resultSet.data.len == 0:
os.sleep(500) # Sleeps 500 miliseconds before each request.
os.sleep(this.poolInterval)
var response = this.resultSet.client.request(this.resultSet.nextUri, httpMethod = HttpGet)
this.processResponse(response)

Expand All @@ -150,34 +160,43 @@ proc fetchAll*(this: Cursor): seq[seq[string]] =
break
if this.resultSet.data.len > 0:
queryData.add(this.resultSet.data)
# Sleeps 500 miliseconds before each request.
os.sleep(100)
os.sleep(this.poolInterval)
var response = this.resultSet.client.request(this.resultSet.nextUri, httpMethod = HttpGet)
this.processResponse(response)
return queryData

proc connect*(host: string, port: int, catalog: string, schema: string,
username: string, source = "NimPresto", sessionProps = ""): Connection =
proc open*(host: string, port: int, protocol: string = "http", catalog: string, schema: string,
username: string, source = "NimPresto", poolInterval = 1,
sessionProps = "", tableCursor = false): Connection =
if protocol notin ["http", "https"]:
raise newException(NotValidProtocolError, "The protocol you specified is not valid protocol: %s" % protocol)
let cursor = Cursor(catalog: catalog,
schema: schema,
source: source,
sessionProps: sessionProps,
poolInterval: "1",
poolInterval: poolInterval * 1000,
tableCursor: tableCursor,
username: username,
host: host,
protocol: protocol,
port: port.intToStr)
return Connection(
let connection = Connection(
host: host,
port: port,
timeOutInSeconds: 10,
cur: cursor
)
if not connection.ping():
raise newException(NoConnectionError, "Failed to Established Connection.")
return connection


when isMainModule:
let con = connect("HOST", 8889, "hive", "dwh", "benny")
let con = open(host="host", port=8889, catalog="hive", schema="dwh", username="benny")
defer: con.close()
var cur = con.cursor()
cur.execute("SQL")
cur.execute("SELECT * FROM abc")
echo(cur.fetchOne())
#echo(cur.getColumns())
# for s in cur.fetchMany(10):
# echo(s)
Expand Down
2 changes: 1 addition & 1 deletion presto.nimble → db_presto.nimble
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Package

version = "0.5.0"
version = "0.7.0"
author = "Bennyelg"
description = "prestodb simple connector"
license = "MIT"
Expand Down

0 comments on commit 29f36f6

Please sign in to comment.