forked from dragonflydb/dragonfly
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdefrag_mem_test.py
executable file
·176 lines (154 loc) · 6.89 KB
/
defrag_mem_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#!/usr/bin/env python3
import argparse
import asyncio
import sys
from typing import Iterator

import aioredis
import async_timeout
'''
To install: pip install -r requirements.txt
Run
dragonfly --mem_defrag_threshold=0.01 --commit_use_threshold=1.2 --mem_utilization_threshold=0.8
defrag_mem_test.py -k 800000 -v 645
This program would try to re-create the issue with memory defragmentation.
See issue number 448 for more details.
To run this:
You can just execute this from the command line without any arguments.
Or you can run with --help to see the options.
The defaults are:
number of keys: 800,000
value size: 64 bytes
key name pattern: key-for-testing
host: localhost
port: default redis port
Please note that this would create 4 * number of keys entries
You can see the memory usage/defrag state with the monitoring task that
prints the current state
NOTE:
If this seems to get stuck please kill it with ctrl+c
This can happen in case we don't have "defrag_realloc_total > 0"
'''
class TaskCancel:
    """Cooperative cancellation flag shared by the delete tasks and the monitor."""

    def __init__(self):
        # True while cooperating tasks should keep running.
        self.run = True

    def stop(self):
        """Signal every cooperating task to wind down."""
        self.run = False

    def dont_stop(self):
        """Return True while work should continue."""
        return self.run
async def run_cmd(connection, cmd, sub_val):
    """Run a two-part server command (e.g. "info stats") and return its reply."""
    return await connection.execute_command(cmd, sub_val)
async def handle_defrag_stats(connection, prev):
    """Poll "info stats" and report defragmentation progress.

    Returns a (done, invocations) pair:
      done        -- True once defrag_realloc_total has become positive
      invocations -- the latest defrag_task_invocation_total when it changed
                     but defrag did not yet reallocate, otherwise None
    """
    stats = await run_cmd(connection, "info", "stats")
    if stats is None:
        return False, None
    invocations = stats['defrag_task_invocation_total']
    # Only print when the counter moved, to avoid spamming identical lines.
    if invocations != prev:
        print("--------------------------------------------------------------")
        print(f"defrag_task_invocation_total: {invocations:,}")
        print(f"defrag_realloc_total: {stats['defrag_realloc_total']:,}")
        print(f"defrag_attempt_total: {stats['defrag_attempt_total']:,}")
        print("--------------------------------------------------------------")
        if stats["defrag_realloc_total"] > 0:
            return True, None
        return False, invocations
    return False, None
async def memory_stats(connection):
    """Print the server's committed/used memory figures and their ratio."""
    print("--------------------------------------------------------------")
    mem = await run_cmd(connection, "info", "memory")
    # NOTE: 'comitted_memory' is the key as the server spells it — do not "fix" it.
    committed = mem['comitted_memory']
    used = mem['used_memory']
    print(f"memory commited: {committed:,}")
    print(f"memory used: {used:,}")
    print(f"memory usage ratio: {committed/used:.2f}")
    print("--------------------------------------------------------------")
async def stats_check(connection, condition):
    """Monitor task: poll defrag stats until the defrag task reallocates memory.

    Runs while `condition` allows it, then gives the server a few extra
    chances before returning. Returns True on a clean run, False on error.
    """
    try:
        last_invocations = 0
        polls = 0
        while condition.dont_stop():
            await asyncio.sleep(0.3)
            done, invocations = await handle_defrag_stats(connection, last_invocations)
            if done:
                print("defrag task successfully found memory locations to reallocate")
                condition.stop()
                continue
            if invocations is not None:
                last_invocations = invocations
            polls += 1
            # Show a memory snapshot every third poll.
            if polls % 3 == 0:
                await memory_stats(connection)
        # Grace period: the defrag task may still kick in shortly after the
        # delete tasks finished.
        for _ in range(5):
            done, _unused = await handle_defrag_stats(connection, -1)
            if done:
                print("defrag task successfully found memory locations to reallocate")
                return True
            await asyncio.sleep(2)
        return True
    except Exception as e:
        print(f"failed to run monitor task: {e}")
        return False
async def delete_keys(connection, keys):
    """Issue one DEL covering all `keys`; return the number actually removed."""
    return await connection.delete(*keys)
def generate_keys(pattern: str, count: int, batch_size: int) -> Iterator[list]:
    """Yield batches of key names of the form f"{pattern}{j}".

    Only every third index inside each window is emitted, so deleting these
    keys leaves holes in the populated key space — exactly what is needed to
    provoke memory fragmentation (see issue 448).

    pattern    -- prefix of every generated key name
    count      -- exclusive upper bound on window start indices
    batch_size -- width of the index window covered by each yielded batch

    Note: the original annotated the return as ``list``; this is a generator,
    so the annotation is corrected to ``Iterator[list]``.
    """
    for start in range(1, count, batch_size):
        yield [f"{pattern}{j}" for j in range(start, start + batch_size, 3)]
async def mem_cleanup(connection, pattern, num, cond, keys_count):
    """Worker task `num`: delete key batches matching `pattern` until done or
    cancelled via `cond`. Returns the number of keys deleted."""
    deleted = 0
    for batch in generate_keys(pattern=pattern, count=keys_count, batch_size=950):
        if not cond.dont_stop():
            print(f"task number {num} that deleted keys {pattern} finished")
            return deleted
        deleted += await delete_keys(connection, batch)
        # Yield between batches so the server and the monitor get air time.
        await asyncio.sleep(0.2)
    print(f"task number {num} that deleted keys {pattern} finished")
    return deleted
async def run_tasks(pool, key_name, value_size, keys_count):
    """Populate four key families, then concurrently delete a third of each
    while a monitor task watches for defrag activity.

    pool       -- aioredis connection pool to draw the client from
    key_name   -- base name; families are f"{key_name}-0" .. f"{key_name}-3"
    value_size -- value size passed to DEBUG POPULATE
    keys_count -- number of entries per family

    Returns True when population, deletion and monitoring all succeeded.
    """
    key_families = [f"{key_name}-{i}" for i in range(4)]
    stop_cond = TaskCancel()
    try:
        connection = aioredis.Redis(connection_pool=pool)
        for key in key_families:
            print(f"creating key {key} with size {value_size} of count {keys_count}")
            # DEBUG POPULATE creates keys_count entries named "{key}:<n>".
            await connection.execute_command("DEBUG", "POPULATE", keys_count, key, value_size)
            await asyncio.sleep(2)
        tasks = []
        for num, key in enumerate(key_families):
            pattern = f"{key}:"
            print(f"deleting keys from {pattern}")
            tasks.append(mem_cleanup(connection=connection, pattern=pattern, num=num,
                                     cond=stop_cond, keys_count=int(keys_count)))
        monitor_task = asyncio.create_task(stats_check(connection, stop_cond))
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # BUG FIX: with return_exceptions=True the result list may mix counts
        # and exceptions; the original sum(total) then raised TypeError and the
        # real task error was masked by the generic except below.
        failures = [r for r in results if isinstance(r, BaseException)]
        deleted = sum(r for r in results if isinstance(r, int))
        if failures:
            print(f"{len(failures)} delete task(s) failed, first error: {failures[0]}")
        print(f"successfully deleted {deleted} keys")
        stop_cond.stop()
        await monitor_task
        print("finish executing")
        return not failures
    except Exception as e:
        print(f"got error {e} while running delete keys")
        return False
def connect_and_run(key_name, value_size, keys_count, host="localhost", port=6379):
    """Create a connection pool and drive the whole test to completion.

    Returns True when run_tasks succeeded, False otherwise.
    """
    async_pool = aioredis.ConnectionPool(host=host, port=port,
                                         db=0, decode_responses=True, max_connections=16)
    loop = asyncio.new_event_loop()
    try:
        # Make the loop current for libraries that call get_event_loop().
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(
            run_tasks(pool=async_pool, key_name=key_name,
                      value_size=value_size, keys_count=keys_count))
    finally:
        # BUG FIX: the original created a fresh loop and never closed it.
        loop.close()
if __name__ == "__main__":
    # Command-line entry point: parse the options described in the module
    # doc-string and run the defragmentation test against the target server.
    parser = argparse.ArgumentParser(description='active memory testing',
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-k', '--keys', type=int, default=800000, help='total number of keys')
    parser.add_argument('-v', '--value_size', type=int, default=645, help='size of the values')
    parser.add_argument('-n', '--key_name', type=str, default="key-for-testing", help='the base key name')
    parser.add_argument('-s', '--server', type=str, default="localhost", help='server host name')
    parser.add_argument('-p', '--port', type=int, default=6379, help='server port number')
    args = parser.parse_args()
    keys_num = args.keys
    key_name = args.key_name
    value_size = args.value_size
    host = args.server
    port = args.port
    print(f"running key deletion on {host}:{port} for keys {key_name} value size of {value_size} and number of keys {keys_num}")
    result = connect_and_run(key_name=key_name, value_size=value_size, keys_count=keys_num, host=host, port=port)
    # Fixed `result == True` to an idiomatic truthiness test.
    if result:
        print("finished successfully")
    else:
        print("failed")