-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathcassandra_rw.py
47 lines (35 loc) · 1.34 KB
/
cassandra_rw.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import uuid
from cassandra.cqlengine import connection
from datetime import datetime
from cassandra.cqlengine.management import sync_table
import csv
from TxInfo import TxInfoModel
import pandas as pd
class CassandraReadWriteDb:
    """Load rows into, and read rows out of, Cassandra via cqlengine models.

    The model class used for writes is the one most recently passed to
    ``sync_class_table`` (stored on ``self.typeOfClass``).
    """

    def __init__(self, ip_addrs, keyspace):
        """Set up the cqlengine connection.

        Args:
            ip_addrs: hosts forwarded to ``connection.setup``.
            keyspace: keyspace name forwarded to ``connection.setup``.
        """
        connection.setup(ip_addrs, keyspace, protocol_version=3)
        # Defined up front so the attribute always exists, even when a write
        # is attempted before sync_class_table() has been called.
        self.typeOfClass = None

    def sync_class_table(self, typeOfClass):
        """Remember ``typeOfClass`` as the write model and sync its table.

        Args:
            typeOfClass: model class handed to ``sync_table``; later writes
                call ``typeOfClass.create(...)``.
        """
        self.typeOfClass = typeOfClass
        sync_table(typeOfClass)

    def _synced_model(self):
        """Return the model set by ``sync_class_table`` or fail clearly."""
        # getattr keeps this safe for instances built without __init__.
        model = getattr(self, "typeOfClass", None)
        if model is None:
            # AttributeError matches what the unguarded attribute access
            # raised before, so existing handlers keep working.
            raise AttributeError(
                "no model has been synced; call sync_class_table() first"
            )
        return model

    def write_file_table(self, credit_logs):
        """Insert every row of a comma-delimited CSV file through the model.

        The header line supplies the keyword names passed to ``create``;
        all values are passed as the strings read from the file.

        Args:
            credit_logs: path of the CSV file to read.

        Raises:
            AttributeError: if no model has been synced yet.
        """
        model = self._synced_model()
        # newline='' lets the csv module handle line endings itself, as the
        # csv documentation recommends (embedded newlines in quoted fields).
        with open(credit_logs, newline='') as csv_file:
            for row in csv.DictReader(csv_file, delimiter=','):
                model.create(**row)

    def get_pandas_from_cassandra(self, model=None):
        """Read all rows of a model into a pandas DataFrame.

        Args:
            model: object whose ``objects()`` yields rows exposing
                ``keys()`` and ``values()``. Defaults to ``TxInfoModel``.

        Returns:
            A DataFrame with one line per row, a default 0..n-1 index, and
            columns named after the keys of the last row read. An empty
            DataFrame is returned when there are no rows.
        """
        model = TxInfoModel if model is None else model
        records = []
        columns = None
        for q in model.objects():
            records.append(list(q.values()))
            columns = list(q.keys())
        if columns is None:
            # Nothing was read: there are no keys to name the columns with.
            return pd.DataFrame()
        # Build the frame once instead of growing it row by row;
        # DataFrame.append no longer exists in pandas 2.x.
        return pd.DataFrame.from_records(records, columns=columns)

    def write_json_table(self, data):
        """Print ``data`` and insert it as a single row through the model.

        Args:
            data: mapping (or iterable of key/value pairs) accepted by
                ``dict()``; its items become the keyword arguments of
                ``create``.

        Raises:
            AttributeError: if no model has been synced yet.
        """
        print (data)
        self._synced_model().create(**dict(data))
if __name__ == '__main__':
    # Demo run: connect, sync the TxInfoModel table, then dump its rows.
    db = CassandraReadWriteDb(keyspace="emp", ip_addrs=['172.17.0.2'])
    db.sync_class_table(TxInfoModel)
    # Optional CSV load step, left disabled:
    # db.write_file_table('credit.csv')
    frame = db.get_pandas_from_cassandra()
    print(frame)