Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft changes for sharding cacheops #68

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 177 additions & 0 deletions bench_invalidation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
# coding: utf-8

# Синтетика для алгоритмов инвалидации
# - с транзакцией
# - с локами без объединения ключей на клиенте (redis, twem)
# - с локами и объединением ключей на клиенте (redis, twem)
import functools
from random import randint
from timeit import timeit
from uuid import uuid4
import sys
import os
import redis

os.environ['DJANGO_SETTINGS_MODULE'] = 'tests.settings'

from cacheops.invalidation import redis_lock_acquire, redis_lock_release


def native(redis_client, conjs_keys):
    """Original invalidation algorithm: a single Redis MULTI/EXEC transaction.

    The transaction fetches the union of every cache key registered in the
    invalidator sets (``conjs_keys``) and drops those sets; the fetched
    cache keys are then deleted outside the transaction.
    """

    def _drop_invalidators(pipe):
        # (schemes-version read kept disabled, mirroring the real code)
        # pipe.get(cache_schemes.get_version_key(model))
        # Union of all cache keys the invalidator sets reference.
        pipe.sunion(conjs_keys)
        # The conj sets hold the cache keys we are about to delete, so they
        # become stale and are removed as well.
        # NOTE: other invalidators not matched by the current object may
        # still reference the deleted cache keys; they linger for a while.
        pipe.delete(*conjs_keys)

    # NOTE: cache keys are deleted after the transaction completes, leaving a
    # tiny window where a crash strands cache keys without invalidators.
    # The WATCH-based optimistic alternative proved to be pessimistic.
    results = redis_client.transaction(_drop_invalidators)
    cache_keys = results[0]
    if cache_keys:
        redis_client.delete(*cache_keys)


def twem_proxy_compat(redis_client, conjs_keys):
    """Soft-lock algorithm compatible with twemproxy -- baseline version.

    For every conj key: take a soft lock, read and drop the set in a
    non-transactional pipeline, release the lock, then delete the member
    cache keys.
    """
    for conj_key in conjs_keys:
        lock_key = redis_lock_acquire("lock:" + conj_key,
                                      redis_client=redis_client)
        pipe = redis_client.pipeline(transaction=False)
        pipe.smembers(conj_key)
        pipe.delete(conj_key)
        members, _ = pipe.execute()
        redis_lock_release(lock_key, redis_client=redis_client)
        if members:
            redis_client.delete(*members)

def twem_proxy_compat_opt_1(redis_client, conjs_keys):
    """twemproxy-compatible variant -- all operations in two pipelines.

    BUGFIX: the soft lock was previously released *before*
    ``pipe.execute()``, i.e. before any queued command had actually run on
    the server, which voided the lock entirely.  The lock is now held until
    the first pipeline (read + delete of the conj sets) has executed, and
    released even if execution fails.
    """
    # Here we have to lock all keys of the model at once.
    lock_key = redis_lock_acquire("lock:model_name", redis_client=redis_client)
    pipe = redis_client.pipeline(transaction=False)
    for ck in conjs_keys:
        pipe.smembers(ck)
        pipe.delete(ck)
    try:
        results = pipe.execute()
    finally:
        redis_lock_release(lock_key, redis_client=redis_client)

    # results alternate [members, delete-count, ...]; keep the member sets.
    pipe = redis_client.pipeline(transaction=False)
    for cache_keys in results[::2]:
        if cache_keys:
            pipe.delete(*cache_keys)
    pipe.execute()


def twem_proxy_compat_opt_2(redis_client, conjs_keys):
    """twemproxy-compatible variant -- merge all cache keys on the client
    and delete them with a single call.

    BUGFIX: the delete previously referenced the loop variable, so only the
    cache keys of the *last* conj set were removed (and an empty result list
    raised NameError); the accumulated union was built but never used.  The
    merged set is now deleted once, after the loop -- which is the whole
    point of this variant.
    """
    lock_key = redis_lock_acquire("lock:model_name", redis_client=redis_client)
    pipe = redis_client.pipeline(transaction=False)
    for ck in conjs_keys:
        pipe.smembers(ck)
        pipe.delete(ck)
    results = pipe.execute()
    redis_lock_release(lock_key, redis_client=redis_client)

    # results alternate [members, delete-count, ...]; union the member sets.
    merged = set()
    for cache_keys in results[::2]:
        if cache_keys:
            merged |= cache_keys
    if merged:
        redis_client.delete(*merged)


def twem_proxy_compat_opt_3(redis_client, conjs_keys):
    """twemproxy-compatible variant -- assumes all conj keys live on one
    server and lets Redis compute the SUNION itself.

    The single-server assumption is achievable via twemproxy's ``hash_tag``
    setting.
    """
    lock_key = redis_lock_acquire("lock:model_name", redis_client=redis_client)
    pipe = redis_client.pipeline(transaction=False)
    pipe.sunion(*conjs_keys)
    pipe.delete(*conjs_keys)
    union, _ = pipe.execute()
    redis_lock_release(lock_key, redis_client=redis_client)

    if union:
        redis_client.delete(*union)


def generate_conjs_keys_map(cache_keys=100, conjs_keys=100):
    """Build a synthetic ``{conj_key: [cache_key, ...]}`` fixture.

    :param cache_keys: number of cache keys generated per conj key
    :param conjs_keys: number of conj keys generated
    :returns: dict mapping ``conj:model_name:<uuid>`` keys to lists of ints

    BUGFIX: ``xrange(1, n)`` produced n-1 items, so the requested counts
    were off by one; ``range(n)`` (iteration-only, identical behavior on
    Python 2) honors them exactly.
    NOTE(review): cache keys are drawn via ``randint(1, 100)``, i.e. from
    only 100 distinct values -- large ``cache_keys`` yields many duplicates;
    confirm whether that is intended for the benchmark.
    """
    conjs_keys_map = {
        'conj:model_name:' + uuid4().hex:
            [randint(1, 100) for _ in range(cache_keys)]
        for _ in range(conjs_keys)
    }
    return conjs_keys_map


def populate_conjs_keys(redis_client, conjs_keys_map):
    """Write the generated fixture into Redis and return the conj key list.

    Every cache key is stored as a plain string value and registered in its
    conj set, all queued through one non-transactional pipeline.
    """
    pipe = redis_client.pipeline(transaction=False)
    conjs_keys = list(conjs_keys_map)
    for conj_key in conjs_keys:
        cache_keys = conjs_keys_map[conj_key]
        for cache_key in cache_keys:
            pipe.set(cache_key, 1)
        pipe.sadd(conj_key, *cache_keys)
    pipe.execute()
    return conjs_keys



# Connection settings for the plain Redis backend.
# NOTE(review): hard-coded lab addresses -- adjust before running elsewhere.
REDIS_CONF = {
    'host': '192.168.144.151',
    'port': 6379,
    'db': 0,
    'socket_timeout': 60}

# Connection settings for the twemproxy endpoint (no 'db' entry here,
# unlike REDIS_CONF).
TWEM_CONF = {
    'host': '192.168.144.170',
    'port': 6379,
    'socket_timeout': 60}


def run_bench():
    """Run every invalidation strategy against both backends and print timings."""

    redis_client = redis.StrictRedis(**REDIS_CONF)
    twem_client = redis.StrictRedis(**TWEM_CONF)

    # (number of conj keys, number of cache keys per conj key) per case.
    cases = ((10, 10), (100, 100), (100, 1000), (1000, 100))

    for conjs_keys_num, cache_keys_num in cases:

        print "\nConj keys: {}, cache keys: {}\n".format(conjs_keys_num,
                                                         cache_keys_num)

        conjs_keys_map = generate_conjs_keys_map(conjs_keys_num, cache_keys_num)

        funcs = (native, twem_proxy_compat, twem_proxy_compat_opt_1,
                 twem_proxy_compat_opt_2, twem_proxy_compat_opt_3)
        for f in funcs:
            for name, cli in (('Redis', redis_client), ('TWEM', twem_client)):
                try:
                    print "Backend: {} - {}".format(name, f.__name__)
                    # `native` relies on a server-side MULTI/EXEC transaction,
                    # which this benchmark treats as unsupported behind
                    # twemproxy, so that combination is skipped.
                    if name == 'TWEM' and f.__name__ == 'native':
                        raise RuntimeError('unsupported')
                    # Re-populate fixtures before each run: the strategies
                    # delete the keys they touch.
                    conjs_keys = populate_conjs_keys(cli, conjs_keys_map)
                    stmt = functools.partial(f, cli, conjs_keys)
                    res = timeit(stmt, number=1)
                    print "\t time: {}".format(res)
                except Exception as e:
                    # Report the failure and continue with the next pair.
                    print "\t error: {}".format(e)
                    #raise

if __name__ == '__main__':
    run_bench()
8 changes: 8 additions & 0 deletions cacheops/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@
# Support degradation on redis fail
DEGRADE_ON_FAILURE = getattr(settings, 'CACHEOPS_DEGRADE_ON_FAILURE', False)

# Provide atomic operations by setting lock key instead of using redis
# transaction, by default disabled
USE_SOFT_LOCK = getattr(settings, 'CACHEOPS_USE_SOFT_LOCK', False)

# Expire time of lock keys in miliseconds, by default 1000
SOFT_LOCK_TIMEOUT = getattr(settings, 'CACHEOPS_SOFT_LOCK_TIMEOUT', 1000)


def handle_connection_failure(func):
if not DEGRADE_ON_FAILURE:
return func
Expand Down
101 changes: 76 additions & 25 deletions cacheops/invalidation.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
import contextlib
import time
import six
from cacheops.conf import redis_client, handle_connection_failure
from cacheops.conf import redis_client, handle_connection_failure, USE_SOFT_LOCK, SOFT_LOCK_TIMEOUT
from cacheops.utils import get_model_name, non_proxy


Expand All @@ -20,6 +22,9 @@ def conj_cache_key_from_scheme(model, scheme, obj):
return 'conj:%s:' % get_model_name(model) \
+ '&'.join('%s=%s' % (f, getattr(obj, f)) for f in scheme)

def conj_lock_key(model):
    """Soft-lock key guarding conj-set invalidation for the given model."""
    return 'conj:{}:lock'.format(get_model_name(model))


class ConjSchemes(object):
"""
Expand All @@ -40,13 +45,19 @@ def get_version_key(self, model_or_name):
model_or_name = get_model_name(model_or_name)
return 'schemes:%s:version' % model_or_name

def get_schemes_lock_key(self, model_or_name):
if not isinstance(model_or_name, six.string_types):
model_or_name = get_model_name(model_or_name)
return "schemes:%s:lock" % model_or_name

def load_schemes(self, model):
model_name = get_model_name(model)

txn = redis_client.pipeline()
txn.get(self.get_version_key(model))
txn.smembers(self.get_lookup_key(model_name))
version, members = txn.execute()
lock_key = self.get_schemes_lock_key(model_name)
with atomic_redis_pipeline(lock_key) as txn:
txn.get(self.get_version_key(model))
txn.smembers(self.get_lookup_key(model_name))
version, members = txn.execute()

self.local[model_name] = set(map(deserialize_scheme, members))
self.local[model_name].add(()) # Всегда добавляем пустую схему
Expand Down Expand Up @@ -84,13 +95,13 @@ def ensure_known(self, model, new_schemes):
schemes = self.load_schemes(model)
if new_schemes - schemes:
# Write new schemes to redis
txn = redis_client.pipeline()
txn.incr(self.get_version_key(model_name)) # Увеличиваем версию схем

lookup_key = self.get_lookup_key(model_name)
for scheme in new_schemes - schemes:
txn.sadd(lookup_key, serialize_scheme(scheme))
txn.execute()
lock_key = self.get_schemes_lock_key(model_name)
with atomic_redis_pipeline(lock_key) as txn:
txn.incr(self.get_version_key(model_name))
lookup_key = self.get_lookup_key(model_name)
for scheme in new_schemes - schemes:
txn.sadd(lookup_key, serialize_scheme(scheme))
txn.execute()

# Updating local version
self.local[model_name].update(new_schemes)
Expand All @@ -114,6 +125,36 @@ def clear_all(self):
cache_schemes = ConjSchemes()


class LockWaitTimeout(Exception):
    """Raised when a soft lock cannot be acquired within the allowed time."""


@contextlib.contextmanager
def atomic_redis_pipeline(lock_key, timeout_ms=SOFT_LOCK_TIMEOUT,
                          redis_client=redis_client):
    """ Context manager which provides an atomic Redis pipeline.

    There are two different atomic strategies:
    - Redis native transaction (the default)
    - setting a soft-lock key if CACHEOPS_USE_SOFT_LOCK = True

    :param lock_key: key used as the soft lock
    :param timeout_ms: lock key expiry in milliseconds (also bounds the wait)
    :raises LockWaitTimeout: if the lock cannot be taken within 2 * timeout_ms

    BUGFIXES:
    - the wait deadline used ``int(elapsed_seconds) * 1000``, truncating to
      whole seconds before scaling, so a 1000 ms timeout only fired after
      >= 3 s of spinning; the elapsed time is now scaled without truncation;
    - the ``finally`` clause deleted ``lock_key`` even when acquisition had
      timed out, clobbering the lock of whichever client actually held it;
      the key is now only deleted when *we* acquired it.
    """
    acquired = False
    try:
        if USE_SOFT_LOCK:
            start_time = time.time()
            # Spin-lock style wait; may be CPU expensive under contention.
            # px=timeout_ms makes the lock self-expire if we crash.
            while not redis_client.set(lock_key, "1", nx=True, px=timeout_ms):
                if (time.time() - start_time) * 1000 > 2 * timeout_ms:
                    # Prevent an infinite loop.
                    raise LockWaitTimeout(lock_key)
            acquired = True
        # With the soft lock held a server-side transaction is unnecessary.
        yield redis_client.pipeline(transaction=not USE_SOFT_LOCK)
    finally:
        if acquired:
            redis_client.delete(lock_key)

@handle_connection_failure
def invalidate_obj(obj):
"""
Expand All @@ -132,21 +173,31 @@ def invalidate_obj(obj):

# Reading scheme version, cache_keys and deleting invalidators in
# a single transaction.
def _invalidate_conjs(pipe):
# get schemes version to check later that it's not obsolete
try:
lock_key = conj_lock_key(model)
with atomic_redis_pipeline(lock_key) as pipe:
# get schemes version to check later that it's not obsolete
pipe.get(cache_schemes.get_version_key(model))
# Get a union of all cache keys registered in invalidators
pipe.sunion(conjs_keys)
# `conjs_keys` are keys of sets containing `cache_keys` we are going to delete,
# so we'll remove them too.
# NOTE: There could be some other invalidators not matched with current object,
# which reference cache keys we delete, they will be hanging out for a while.
pipe.delete(*conjs_keys)

# NOTE: we delete fetched cache_keys later which makes a tiny possibility that something
# will fail in the middle leaving those cache keys hanging without invalidators.
# The alternative WATCH-based optimistic locking proved to be pessimistic.
version, cache_keys, _ = pipe.execute()
except LockWaitTimeout:
# Looks no good, but we still can clear cache_keys. Conjs_keys
# will be cleared later, hope so.
pipe = redis_client.pipeline(transaction=False)
pipe.get(cache_schemes.get_version_key(model))
# Get a union of all cache keys registered in invalidators
pipe.sunion(conjs_keys)
# `conjs_keys` are keys of sets containing `cache_keys` we are going to delete,
# so we'll remove them too.
# NOTE: There could be some other invalidators not matched with current object,
# which reference cache keys we delete, they will be hanging out for a while.
pipe.delete(*conjs_keys)

# NOTE: we delete fetched cache_keys later which makes a tiny possibility that something
# will fail in the middle leaving those cache keys hanging without invalidators.
# The alternative WATCH-based optimistic locking proved to be pessimistic.
version, cache_keys, _ = redis_client.transaction(_invalidate_conjs)
version, cache_keys = pipe.execute()

if cache_keys:
redis_client.delete(*cache_keys)

Expand Down
Loading