Skip to content

Commit

Permalink
poor mans connection pool
Browse files Browse the repository at this point in the history
  • Loading branch information
elventear committed Jun 18, 2015
1 parent 3766f45 commit 750236a
Showing 1 changed file with 64 additions and 53 deletions.
117 changes: 64 additions & 53 deletions src/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import functools
import random
import uuid
import multiprocessing
import queue
import contextlib

import pytz
import postgresql
Expand All @@ -15,7 +18,8 @@
MIME_JSON = 'application/json'

SERVICES = {}
DB_URI = None

DB_POOL = None

APP = Flask(__name__)

Expand Down Expand Up @@ -44,9 +48,10 @@ def log_access(f):

@functools.wraps(f)
def w(*a, **kw):
db = connect_db()
if DB_POOL is None:
return

if db is not None:
with DB_POOL() as db:
e = request.environ

LOCAL_ADDR = socket.gethostbyname(e['HTTP_HOST'].replace(':%s' % e['SERVER_PORT'], ''))
Expand Down Expand Up @@ -82,20 +87,19 @@ def end():
@APP.route('/logs')
@log_access
def logs():
db = connect_db()

ps = db.prepare("""
SELECT *
FROM
(SELECT time, src_ip, src_port, dst_ip, dst_port,
http_method, http_path, user_agent
FROM pyapp_log
ORDER BY time DESC
LIMIT 100) AS a
ORDER BY time ASC
""")

return LogTable(Log(*x) for x in ps).__html__()
with DB_POOL() as db:
ps = db.prepare("""
SELECT *
FROM
(SELECT time, src_ip, src_port, dst_ip, dst_port,
http_method, http_path, user_agent
FROM pyapp_log
ORDER BY time DESC
LIMIT 100) AS a
ORDER BY time ASC
""")

return LogTable(Log(*x) for x in ps).__html__()

@APP.route('/env')
@log_access
Expand All @@ -114,10 +118,25 @@ def get_env_config(key, default_val=None, val_type=lambda x: x):
return default_val
return val_type(os.environ[key])

# http://stackoverflow.com/questions/98687/what-is-the-best-solution-for-database-connection-pooling-in-python
def pool(ctor, limit=None):
local_pool = queue.Queue()
n = multiprocessing.Value('i', 0)
@contextlib.contextmanager
def pooled(ctor=ctor, lpool=local_pool, n=n):
# block iff at limit
try: i = lpool.get(limit and n.value >= limit)
except queue.Empty:
n.value += 1
i = ctor()
yield i
lpool.put(i)
return pooled

def read_db_info():
global DB_URI
global DB_POOL

if DB_URI is None:
if DB_POOL is None:
for k in SERVICES:
if k.startswith('postgresql'):
try:
Expand All @@ -126,49 +145,41 @@ def read_db_info():
continue

if uri.startswith('postgres://'):
DB_URI = uri.replace('postgres://', 'pq://')
db_uri = uri.replace('postgres://', 'pq://')
DB_POOL = pool(lambda: postgresql.open(db_uri), 20)
return

def connect_db():
if DB_URI is not None:
try:
return postgresql.open(DB_URI)
except postgresql.exceptions.ClientCannotConnectError:
print('Unable to connect to', DB_URI)
return

def tables_missing(db):
return db.query("SELECT count(*) FROM information_schema.tables WHERE table_name = 'pyapp_log'",)[0][0] == 0

def init_database():
db = connect_db()

if db is None:
if DB_POOL is None:
print('Database is not found')
return

if tables_missing(db):
print('Creating DB schema')
with db.xact():
db.execute("""CREATE TYPE http_method AS ENUM (
'GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'TRACE', 'CONNECT')""")
db.execute("""
CREATE TABLE pyapp_log (
id SERIAL PRIMARY KEY,
time timestamptz NOT NULL,
src_ip inet NOT NULL ,
src_port integer NOT NULL,
dst_ip inet NOT NULL,
dst_port integer NOT NULL,
http_method http_method NOT NULL,
http_path text NOT NULL,
http_query text NOT NULL,
user_agent text NOT NULL
)
""")
db.execute("""CREATE INDEX pyapp_log_time_idx ON pyapp_log (time DESC)""")
else:
print('DB schema is up to date')
with DB_POOL() as db:
if tables_missing(db):
print('Creating DB schema')
with db.xact():
db.execute("""CREATE TYPE http_method AS ENUM (
'GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'TRACE', 'CONNECT')""")
db.execute("""
CREATE TABLE pyapp_log (
id SERIAL PRIMARY KEY,
time timestamptz NOT NULL,
src_ip inet NOT NULL ,
src_port integer NOT NULL,
dst_ip inet NOT NULL,
dst_port integer NOT NULL,
http_method http_method NOT NULL,
http_path text NOT NULL,
http_query text NOT NULL,
user_agent text NOT NULL
)
""")
db.execute("""CREATE INDEX pyapp_log_time_idx ON pyapp_log (time DESC)""")
else:
print('DB schema is up to date')



Expand Down

0 comments on commit 750236a

Please sign in to comment.