Streams Postgres query results into Haskell over the binary protocol. Uses attoparsec and some hand-written kernels for array extraction. Results are streamed into vectors or batched into serial or parallel Conduit pipelines for stream composition.
$ cabal install pg_stream.cabal
Connections
Connections to Postgres are established with the connect function, which yields the connection object.
connect :: ConnSettings -> IO PQ.Connection
Connections are specified by a ConnSettings.
creds :: ConnSettings
creds = ConnSettings { _host = "localhost", _dbname = "invest", _user = "dbadmin" }
Connections are pooled per process. Connection pooling is specified by three parameters.
- Stripes: Stripe count. The number of distinct sub-pools to maintain. The smallest acceptable value is 1.
- Keep Alive: Amount of time for which an unused resource is kept open. The smallest acceptable value is 0.5 seconds.
- Affinity: Maximum number of resources to keep open per stripe. The smallest acceptable value is 1.
The default settings are:
defaultPoolSettings :: PoolSettings
defaultPoolSettings = PoolSettings { _stripes = 1, _keepalive = 10, _affinity = 10 }
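The lower bounds listed above can be checked before a pool is built. The following is a minimal sketch only: the PoolSettings record is a local stand-in whose field names mirror defaultPoolSettings, while the field types, the assumption that _keepalive is measured in seconds, and the validatePoolSettings helper are all hypothetical and not part of the library.

```haskell
-- Local stand-in for the library's PoolSettings; the field types are assumed.
data PoolSettings = PoolSettings
  { _stripes   :: Int     -- number of distinct sub-pools
  , _keepalive :: Double  -- time an unused resource is kept open (assumed seconds)
  , _affinity  :: Int     -- maximum resources kept open per stripe
  } deriving (Show, Eq)

-- Hypothetical helper: reject settings below the documented minimums.
validatePoolSettings :: PoolSettings -> Either String PoolSettings
validatePoolSettings s
  | _stripes s < 1     = Left "stripes must be at least 1"
  | _keepalive s < 0.5 = Left "keepalive must be at least 0.5 seconds"
  | _affinity s < 1    = Left "affinity must be at least 1"
  | otherwise          = Right s
```

Returning Either keeps the failure reason visible to the caller instead of failing later when the pool is created.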
Queries
Queries are executed using query for statements that yield result sets, or execute for statements that return only a status code.
query :: (FromRow r, ToSQL a) => PQ.Connection -> Query -> a -> IO [r]
execute :: (ToSQL a) => PQ.Connection -> Query -> a -> IO ()
For example:
run :: IO [Row]
run = do
  conn <- connect creds
  query conn sample args
SQL queries are constructed via a quasiquoter ([sql| ... |]), which generates a Query (a newtype around a ByteString). Values and SQL fragments can be spliced into this template as arguments.
{-# LANGUAGE QuasiQuotes #-}
sample :: Query
sample = [sql|
    SELECT
      deltas.sid AS sid,
      EXTRACT(EPOCH FROM deltas.day) AS day,
      (ohlcs :: float4[])
    FROM deltas
    INNER JOIN security_groupings ON deltas.sid = security_groupings.sid
    INNER JOIN currentprice ON (
          deltas.sid = currentprice.sid
      AND deltas.day = currentprice.day
      AND currentprice.val BETWEEN 0 AND 500
    )
    WHERE security_groupings.name = 'SP900'
      AND deltas.day BETWEEN TO_TIMESTAMP({1}) AND TO_TIMESTAMP({2})
    ORDER BY deltas.sid,
             deltas.day ASC
    {3}
    ;
|]
Arguments
If the types of arguments are constrained by inference then no annotations are necessary. Otherwise annotations are needed to refine the Num/String instances into concrete types so they can be serialized and sent to Postgres.
args :: (Int, Int, SQL)
args = (1335855600, 1336374000, "LIMIT 100000")
The conversion between Haskell and Postgres types is defined by the FromField/ToField typeclasses, with the following mapping:
Postgres | Haskell |
---|---|
int2 | Int16 |
int4 | Int32 |
int8 | Int64 |
float4 | Float |
float8 | Double |
numeric | Scientific |
uuid | UUID |
char | Char |
text | Text |
date | Day |
bytea | ByteString |
bool | Bool |
int4[] | Vector Int32 |
float4[] | Vector Float |
money | Fixed E3 |
null a | Maybe a |
If the result set type is given as Maybe a, then any missing values are manifested as Nothing and all concrete values are wrapped in Just.
This effectively makes errors from null values used in unchecked logic unrepresentable, since any function that consumes a potentially nullable field is forced by the type system to handle both cases.
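To illustrate how a nullable column forces both cases to be handled, here is a small sketch in plain Haskell. The PriceRow shape and both helper functions are hypothetical and exist only for illustration; they stand for rows as they might come back from query with a Maybe Double field.

```haskell
import Data.Int (Int32)
import Data.Maybe (fromMaybe, mapMaybe)

-- Hypothetical row shape: a security id and a nullable price column.
type PriceRow = (Int32, Maybe Double)

-- A NULL price cannot be used directly; here it falls back to a default.
priceOrDefault :: Double -> PriceRow -> Double
priceOrDefault def (_, price) = fromMaybe def price

-- Keep only the rows whose price is present, discarding NULLs.
presentPrices :: [PriceRow] -> [(Int32, Double)]
presentPrices = mapMaybe keep
  where
    keep (sid, price) = fmap (\p -> (sid, p)) price
```

Code that tries to use the price as a bare Double without going through fromMaybe, a pattern match, or a similar combinator does not type check.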
Streaming
stream :: (FromRow r, ToSQL a, MonadBaseControl IO m, MonadIO m) =>
     PQ.Connection  -- ^ Connection
  -> Query          -- ^ Query
  -> a              -- ^ Query arguments
  -> Int            -- ^ Batch size
  -> C.Source m [r] -- ^ Source conduit
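Because the source yields rows in batches (lists), a downstream pipeline usually flattens each batch before processing rows one at a time. The sketch below assumes the conduit package at version 1.3 or later, where sources are written as ConduitT () o m (). The fakeBatches source is a stand-in for a call such as stream conn sample args 1000, and sumRows is a hypothetical consumer.

```haskell
import Data.Conduit (ConduitT, runConduit, (.|))
import qualified Data.Conduit.List as CL

-- Stand-in for `stream conn query args batchSize`: emits rows in batches.
fakeBatches :: Monad m => ConduitT () [Int] m ()
fakeBatches = CL.sourceList [[1, 2, 3], [4, 5], [6]]

-- Flatten each batch into individual rows, then fold the rows into a sum.
sumRows :: Monad m => ConduitT () [Int] m () -> m Int
sumRows src = runConduit (src .| CL.concat .| CL.fold (+) 0)
```

Running sumRows fakeBatches in IO yields 21. Only one batch is held in memory at a time, so the batch size trades memory use against the number of round trips.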
Parallel streams can be composed together, using Software Transactional Memory (STM) threads to synchronize the polling.
import Database.PostgreSQL.Stream.Parallel
parallelStream ::
     PQ.Connection
  -> (PQ.Connection -> Source (ResourceT IO) a) -- Source
  -> Sink a (ResourceT IO) ()                   -- Sink
  -> IO ()
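The general idea of synchronizing several producers with STM can be sketched as follows. This is not the implementation of parallelStream; it is an illustration using the stm package, in which plain lists stand in for database streams and collectParallel is a hypothetical name. Each producer runs in its own thread and writes to a shared queue, and a counter of live producers tells the consumer when to stop.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (forM_)

-- Run one thread per producer and collect everything they emit.
-- Items from different producers may interleave in any order.
collectParallel :: [[a]] -> IO [a]
collectParallel producers = do
  queue  <- newTQueueIO
  active <- newTVarIO (length producers)
  forM_ producers $ \items -> forkIO $ do
    mapM_ (atomically . writeTQueue queue) items
    -- Signal completion only after every item has been queued.
    atomically (modifyTVar' active (subtract 1))
  let loop acc = do
        next <- atomically $ do
          item <- tryReadTQueue queue
          case item of
            Just x  -> return (Just x)
            Nothing -> do
              remaining <- readTVar active
              -- Empty queue with live producers: block until something changes.
              if remaining == 0 then return Nothing else retry
        case next of
          Just x  -> loop (x : acc)
          Nothing -> return (reverse acc)
  loop []
```

The consumer's check for an empty queue and zero live producers happens in a single transaction, so it cannot finish while an item is still pending. A producer that throws before decrementing the counter would leave the consumer blocked; a fuller version would guard against that with exception handling.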
$ cabal sandbox init
$ cabal install --only-dependencies
$ cabal build
To attach to the Elsen compute engine:
$ cabal sandbox add-source path_to_tree
$ cabal haddock
Copyright (c) 2015 Elsen Inc. All rights reserved.