from distutils.version import LooseVersion

import os
import pytest
import logging

from dtest import Tester
from thrift_test import get_thrift_client
from tools.assertions import assert_all
from thrift_bindings.thrift010.Cassandra import (CfDef, Column, ColumnDef,
                                                 ColumnOrSuperColumn, ColumnParent,
                                                 ColumnPath, ColumnSlice,
                                                 ConsistencyLevel, CounterColumn,
                                                 Deletion, IndexExpression,
                                                 IndexOperator, IndexType,
                                                 InvalidRequestException, KeyRange,
                                                 KeySlice, KsDef, MultiSliceRequest,
                                                 Mutation, NotFoundException,
                                                 SlicePredicate, SliceRange,
                                                 SuperColumn)
from upgrade_tests.upgrade_manifest import indev_2_2_x, indev_3_0_x, indev_3_11_x, indev_4_0_x, indev_4_1_x, \
    CASSANDRA_4_0

since = pytest.mark.since

logger = logging.getLogger(__name__)

# Use static supercolumn data to reduce total test time and avoid driver issues connecting to C* 1.2.
# The data contained in the SSTables is (name, {'attr': {'name': name}}) for each name in NAMES.
SCHEMA_PATH = os.path.join("./", "upgrade_tests", "supercolumn-data", "cassandra-2.0", "schema-2.0.cql")
TABLES_PATH = os.path.join("./", "upgrade_tests", "supercolumn-data", "cassandra-2.0", "supcols", "cols")
NAMES = [name.encode() for name in ["Alice", "Bob", "Claire", "Dave", "Ed", "Frank", "Grace"]]


@pytest.mark.upgrade_test
@since('2.2', max_version='3.99')
class TestSCUpgrade(Tester):
    """
    Tests upgrades from a 2.0 cluster with predefined super columns to all later versions. Verifies the data with
    both CQL and Thrift.
    """

    @pytest.fixture(autouse=True)
    def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
        fixture_dtest_setup.allow_log_errors = True
        fixture_dtest_setup.ignore_log_patterns = (
            # This one occurs if we do a non-rolling upgrade: the node
            # it's trying to send the migration to hasn't started yet,
            # and when it does, the request is replayed and everything is fine.
            r'Can\'t send migration request: node.*is down',
        )
        if fixture_dtest_setup.dtest_config.cassandra_version_from_build < '2.2':
            _known_teardown_race_error = (
                'ScheduledThreadPoolExecutor$ScheduledFutureTask@[0-9a-f]+ '
                'rejected from org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor'
            )
            # Don't alter ignore_log_patterns on the class, just on the object for this test.
            fixture_dtest_setup.ignore_log_patterns += [_known_teardown_race_error]
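
    # Start a cluster (one node by default) on the requested pre-upgrade Cassandra version.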
    def prepare(self, num_nodes=1, cassandra_version=indev_2_2_x.version):
        cluster = self.cluster

        # Forcing cluster version on purpose
        cluster.set_install_dir(version=cassandra_version)
        self.install_nodetool_legacy_parsing()
        self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
        if "memtable_allocation_type" in cluster._config_options:
            del cluster._config_options['memtable_allocation_type']
        cluster.populate(num_nodes).start()
        return cluster
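
    # Read every row back over Thrift and check that the stored value matches the row key.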
    def verify_with_thrift(self):
        # No more thrift in 4.0
        if self.cluster.version() >= '4':
            return

        node = self.cluster.nodelist()[0]
        host, port = node.network_interfaces['thrift']
        client = get_thrift_client(host, port)
        client.transport.open()
        client.set_keyspace('supcols')
        p = SlicePredicate(slice_range=SliceRange(''.encode(), ''.encode(), False, 1000))
        for name in NAMES:
            super_col_value = client.get_slice(name, ColumnParent("cols"), p, ConsistencyLevel.ONE)
            logger.debug("get_slice(%s) returned %s" % (name, super_col_value))
            assert name == super_col_value[0].column.value
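
    # Read the same rows back over CQL and compare against the expected flattened layout.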
    def verify_with_cql(self, session):
        session.execute("USE supcols")
        expected = [[name.encode(), 'attr'.encode(), 'name', name.encode()]
                    for name in ['Grace', 'Claire', 'Dave', 'Frank', 'Ed', 'Bob', 'Alice']]
        assert_all(session, "SELECT * FROM cols", expected)
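
    # Core scenario: bulk-load the pre-generated 2.0 supercolumn SSTables, then walk the given
    # upgrade path, re-verifying the data over CQL (and over Thrift while it is still available).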
    def _upgrade_super_columns_through_versions_test(self, upgrade_path):
        cluster = self.prepare()
        self.install_nodetool_legacy_parsing()
        node1 = cluster.nodelist()[0]
        node1.run_cqlsh(cmds="""CREATE KEYSPACE supcols WITH replication = {
                                  'class': 'SimpleStrategy',
                                  'replication_factor': '1'
                                };
                                USE supcols;
                                CREATE TABLE cols (
                                  key blob,
                                  column1 blob,
                                  column2 text,
                                  value blob,
                                  PRIMARY KEY ((key), column1, column2)
                                ) WITH COMPACT STORAGE AND
                                  bloom_filter_fp_chance=0.010000 AND
                                  caching='KEYS_ONLY' AND
                                  comment='' AND
                                  dclocal_read_repair_chance=0.000000 AND
                                  gc_grace_seconds=864000 AND
                                  index_interval=128 AND
                                  read_repair_chance=0.100000 AND
                                  replicate_on_write='true' AND
                                  populate_io_cache_on_flush='false' AND
                                  default_time_to_live=0 AND
                                  speculative_retry='99.0PERCENTILE' AND
                                  memtable_flush_period_in_ms=0 AND
                                  compaction={'class': 'SizeTieredCompactionStrategy'} AND
                                  compression={'sstable_compression': 'SnappyCompressor'};
                                """)
        node1.bulkload(options=[TABLES_PATH])
        node1.nodetool("upgradesstables -a")

        session = self.patient_exclusive_cql_connection(node1)
        self.verify_with_cql(session)

        node1.nodetool("enablethrift")
        self.verify_with_thrift()

        for version in upgrade_path:
            self.upgrade_to_version(version.version)

            session = self.patient_exclusive_cql_connection(node1)
            self.verify_with_cql(session)

            if self.cluster.version() < CASSANDRA_4_0:
                node1.nodetool("enablethrift")
                self.verify_with_thrift()

        cluster.remove(node=node1)
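
    # Full path: upgrade through every line from 2.2 up to 4.1, verifying after each hop.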
    def test_upgrade_super_columns_through_all_versions(self):
        self._upgrade_super_columns_through_versions_test(upgrade_path=[indev_2_2_x, indev_3_0_x,
                                                                        indev_3_11_x, indev_4_0_x, indev_4_1_x])
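
    # Abbreviated path: upgrade to 3.0 and then jump directly to 4.0.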
    def test_upgrade_super_columns_through_limited_versions(self):
        self._upgrade_super_columns_through_versions_test(upgrade_path=[indev_3_0_x, indev_4_0_x])
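
    # Drain and stop every node, switch the install directory to the target version, then restart
    # and run 'upgradesstables -a' on each node.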
    def upgrade_to_version(self, tag, nodes=None):
        logger.debug('Upgrading to ' + tag)
        if nodes is None:
            nodes = self.cluster.nodelist()

        for node in nodes:
            logger.debug('Shutting down node: ' + node.name)
            node.drain()
            node.watch_log_for("DRAINED")
            node.stop(wait_other_notice=False)

        # Update Cassandra Directory
        for node in nodes:
            node.set_install_dir(version=tag)
            self.install_nodetool_legacy_parsing()
            logger.debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir()))
        self.cluster.set_install_dir(version=tag)
        self.install_nodetool_legacy_parsing()
        self.fixture_dtest_setup.reinitialize_cluster_for_different_version()

        for node in nodes:
            if tag < "2.1":
                if "memtable_allocation_type" in node.config_options:
                    del node.config_options["memtable_allocation_type"]

        # Restart nodes on new version
        for node in nodes:
            logger.debug('Starting %s on new version (%s)' % (node.name, tag))
            # Setup log4j / logback again (necessary moving from 2.0 -> 2.1):
            node.set_log_level("INFO")
            node.start(wait_for_binary_proto=True)
            node.nodetool('upgradesstables -a')