#!/usr/bin/env python3
# -*- coding: utf-8 -*-
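
# Helper for uploading ClickHouse datasets to S3. Depending on the options it
# packs a table's data and metadata into a tarball and uploads it, uploads a
# single file, mirrors a directory for the static files disk, or lists and
# removes keys in the target bucket.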
import os
import logging
import argparse
import tarfile
import math

try:
    from boto.s3.connection import S3Connection
    from boto.s3.key import Key
except ImportError:
    raise ImportError("You have to install boto package 'pip install boto'")
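

# Thin wrapper around boto's S3Connection. `mds_api` is the S3 endpoint the
# connection talks to; `mds_url` is the public hostname used to build the
# download URLs returned by the upload methods.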
class S3API(object):
    def __init__(self, access_key, secret_access_key, mds_api, mds_url):
        self.connection = S3Connection(
            host=mds_api,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_access_key,
        )
        self.mds_url = mds_url
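
    # Upload a single local file as an S3 multipart upload in 1 GB chunks and
    # return the resulting https:// URL; the upload is cancelled if any part
    # fails.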
    def upload_file(self, bucket_name, file_path, s3_path):
        logging.info("Start uploading file to bucket %s", bucket_name)

        bucket = self.connection.get_bucket(bucket_name)
        key = bucket.initiate_multipart_upload(s3_path)
        logging.info("Will upload to s3 path %s", s3_path)
        chunksize = 1024 * 1024 * 1024  # 1 GB
        filesize = os.stat(file_path).st_size
        logging.info("File size is %s", filesize)
        # True division plus ceil already accounts for the final partial chunk,
        # so no extra iteration is needed.
        chunkcount = int(math.ceil(filesize / chunksize))

        def call_back(x, y):
            print("Uploaded {}/{} bytes".format(x, y))

        try:
            for i in range(chunkcount):
                logging.info("Uploading chunk %s of %s", i + 1, chunkcount)
                offset = chunksize * i
                bytes_size = min(chunksize, filesize - offset)

                # Binary mode: each part is read as raw bytes from the offset.
                with open(file_path, 'rb') as fp:
                    fp.seek(offset)
                    key.upload_part_from_file(fp=fp, part_num=i + 1,
                                              size=bytes_size, cb=call_back,
                                              num_cb=100)
            key.complete_upload()
        except Exception as ex:
            key.cancel_upload()
            raise ex
        logging.info("Contents were set")
        return "https://{bucket}.{mds_url}/{path}".format(
            bucket=bucket_name, mds_url=self.mds_url, path=s3_path)
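
    # Upload one local file to `s3_file_path` in the given bucket as a single
    # (non-multipart) object, printing progress via the callback.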
    def set_file_contents(self, bucket, local_file_path, s3_file_path):
        key = Key(bucket)
        key.key = s3_file_path
        file_size = os.stat(local_file_path).st_size
        logging.info("Uploading file `%s` to `%s`. Size is %s",
                     local_file_path, s3_file_path, file_size)

        def call_back(x, y):
            print("Uploaded {}/{} bytes".format(x, y))

        key.set_contents_from_filename(local_file_path, cb=call_back)
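
    # Mirror a local directory into `<s3_path>/store/` in the bucket, key by
    # key; the `store/` suffix matches the layout used for ClickHouse disks
    # backed by static files.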
    def upload_data_for_static_files_disk(self, bucket_name, directory_path, s3_path):
        bucket = self.connection.get_bucket(bucket_name)
        if s3_path.endswith("/"):
            s3_path += "store/"
        else:
            s3_path += "/store/"
        print(s3_path)
        for root, dirs, files in os.walk(directory_path):
            for file in files:
                local_file_path = os.path.join(root, file)
                s3_file = local_file_path[len(directory_path) + 1:]
                s3_file_path = os.path.join(s3_path, s3_file)
                self.set_file_contents(bucket, local_file_path, s3_file_path)

        logging.info("Uploading finished")
        return "https://{bucket}.{mds_url}/{path}".format(
            bucket=bucket_name, mds_url=self.mds_url, path=s3_path)

    def list_bucket_keys(self, bucket_name):
        bucket = self.connection.get_bucket(bucket_name)
        for obj in bucket.get_all_keys():
            print(obj.key)
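
    # S3 has no real folders, so "removing a folder" means deleting every key
    # whose name starts with the given prefix.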
    def remove_folder_from_bucket(self, bucket_name, folder_path):
        bucket = self.connection.get_bucket(bucket_name)
        for obj in bucket.get_all_keys():
            if obj.key.startswith(folder_path):
                print('Removing ' + obj.key)
                obj.delete()
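

# Pack a table's data directory and its metadata .sql file into a tar archive
# under `tmp_prefix`, preserving the data/<db>/<table> and
# metadata/<db>/<table>.sql layout of the ClickHouse data directory.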
def make_tar_file_for_table(clickhouse_data_path, db_name, table_name,
                            tmp_prefix):
    relative_data_path = os.path.join('data', db_name, table_name)
    relative_meta_path = os.path.join('metadata', db_name, table_name + '.sql')
    path_to_data = os.path.join(clickhouse_data_path, relative_data_path)
    path_to_metadata = os.path.join(clickhouse_data_path, relative_meta_path)
    temporary_file_name = tmp_prefix + '/{tname}.tar'.format(tname=table_name)
    with tarfile.open(temporary_file_name, "w") as bundle:
        bundle.add(path_to_data, arcname=relative_data_path)
        bundle.add(path_to_metadata, arcname=relative_meta_path)
    return temporary_file_name

USAGE_EXAMPLES = '''
examples:
\t./s3uploader --dataset-name some_ds --access-key-id XXX --secret-access-key YYY --clickhouse-data-path /opt/clickhouse/ --table-name default.some_tbl --bucket-name some-bucket
\t./s3uploader --dataset-name some_ds --access-key-id XXX --secret-access-key YYY --file-path some_ds.tsv.xz --bucket-name some-bucket --s3-path /path/to/
'''
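
# CLI entry point: exactly one of --table-name, --file-path, --directory-path,
# --list-directory or --remove-directory must be given and selects the mode.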
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')

    parser = argparse.ArgumentParser(
        description="Simple tool for uploading datasets to ClickHouse S3",
        usage='%(prog)s [options] {}'.format(USAGE_EXAMPLES))
    parser.add_argument('--s3-api-url', default='s3.amazonaws.com')
    parser.add_argument('--s3-common-url', default='s3.amazonaws.com')
    parser.add_argument('--bucket-name', default='clickhouse-datasets')
    parser.add_argument('--dataset-name', required=True,
                        help='Name of the dataset; used in the uploaded path')
    parser.add_argument('--access-key-id', required=True)
    parser.add_argument('--secret-access-key', required=True)
    parser.add_argument('--clickhouse-data-path',
                        default='/var/lib/clickhouse/',
                        help='Path to the ClickHouse data directory on the filesystem')
    parser.add_argument('--s3-path', help='Path in S3 to upload the file to')
    parser.add_argument('--tmp-prefix', default='/tmp',
                        help='Directory for the temporary tarball')
    data_group = parser.add_mutually_exclusive_group(required=True)
    table_name_argument = data_group.add_argument(
        '--table-name',
        help='Table name including the database, if you are uploading table partitions')
    data_group.add_argument('--file-path',
                            help='Path to a single file to upload')
    data_group.add_argument('--directory-path',
                            help='Path to a directory with files to upload')
    data_group.add_argument('--list-directory',
                            help='List the keys in the bucket')
    data_group.add_argument('--remove-directory',
                            help='Remove all keys under the given S3 prefix')
    args = parser.parse_args()

    if args.table_name is not None and args.clickhouse_data_path is None:
        raise argparse.ArgumentError(
            table_name_argument,
            "You should specify --clickhouse-data-path to upload --table")

    s3_conn = S3API(
        args.access_key_id, args.secret_access_key,
        args.s3_api_url, args.s3_common_url)

    file_path = ''
    directory_path = args.directory_path
    s3_path = args.s3_path

    if args.list_directory:
        s3_conn.list_bucket_keys(args.bucket_name)
    elif args.remove_directory:
        print('Removing s3 path: ' + args.remove_directory)
        s3_conn.remove_folder_from_bucket(args.bucket_name,
                                          args.remove_directory)
    elif args.directory_path is not None:
        url = s3_conn.upload_data_for_static_files_disk(
            args.bucket_name, directory_path, s3_path)
        logging.info("Data uploaded: %s", url)
    else:
        if args.table_name is not None:
            if '.' not in args.table_name:
                db_name = 'default'
                table_name = args.table_name
            else:
                db_name, table_name = args.table_name.split('.')
            file_path = make_tar_file_for_table(
                args.clickhouse_data_path, db_name, table_name, args.tmp_prefix)
        else:
            file_path = args.file_path

        if 'tsv' in file_path:
            s3_path = os.path.join(
                args.dataset_name, 'tsv', os.path.basename(file_path))
        if args.table_name is not None:
            s3_path = os.path.join(
                args.dataset_name, 'partitions', os.path.basename(file_path))
        elif args.s3_path is not None:
            s3_path = os.path.join(
                args.dataset_name, args.s3_path, os.path.basename(file_path))
        else:
            raise Exception("Don't know s3-path to upload")

        url = s3_conn.upload_file(args.bucket_name, file_path, s3_path)
        logging.info("Data uploaded: %s", url)