Skip to content

Commit

Permalink
[Feature][external catalog/lakesoul] support lakesoul catalog (apache…
Browse files Browse the repository at this point in the history
…#32164)

Issue Number: close apache#32163
  • Loading branch information
Ceng23333 authored May 27, 2024
1 parent 074b87a commit 9b5a464
Show file tree
Hide file tree
Showing 41 changed files with 2,474 additions and 11 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,5 @@ lru_cache_test

# other
compile_commands.json
.github
docker/runtime/be/resource/apache-doris/
1 change: 1 addition & 0 deletions be/src/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ pch_reuse(Common)

# Generate env_config.h according to env_config.h.in
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/env_config.h.in ${GENSRC_DIR}/common/env_config.h)
target_include_directories(Common PUBLIC ${GENSRC_DIR}/common/)
86 changes: 86 additions & 0 deletions be/src/vec/exec/format/table/lakesoul_jni_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "lakesoul_jni_reader.h"

#include <map>
#include <ostream>

#include "common/logging.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "vec/core/types.h"

namespace doris {
class RuntimeProfile;
class RuntimeState;

namespace vectorized {
class Block;
} // namespace vectorized
} // namespace doris

namespace doris::vectorized {
// Builds the parameter map handed to the Java-side LakeSoul scanner and
// creates the JniConnector that will drive it. Nothing is opened here;
// init_reader() performs the connector init/open.
//
// lakesoul_params and file_slot_descs are kept by reference (see the member
// declarations), so they must outlive this reader.
LakeSoulJniReader::LakeSoulJniReader(const TLakeSoulFileDesc& lakesoul_params,
                                     const std::vector<SlotDescriptor*>& file_slot_descs,
                                     RuntimeState* state, RuntimeProfile* profile)
        : _lakesoul_params(lakesoul_params),
          _file_slot_descs(file_slot_descs),
          _state(state),
          _profile(profile) {
    // Names of the columns to read, in slot order.
    std::vector<std::string> column_names;
    for (auto* slot : _file_slot_descs) {
        column_names.emplace_back(slot->col_name());
    }

    // List-valued parameters are flattened into ';'-separated strings.
    std::map<String, String> scanner_params;
    scanner_params.emplace("query_id", print_id(_state->query_id()));
    scanner_params.emplace("file_paths", join(_lakesoul_params.file_paths, ";"));
    scanner_params.emplace("primary_keys", join(_lakesoul_params.primary_keys, ";"));
    scanner_params.emplace("partition_descs", join(_lakesoul_params.partition_descs, ";"));
    scanner_params.emplace("required_fields", join(column_names, ";"));
    scanner_params.emplace("options", _lakesoul_params.options);
    scanner_params.emplace("table_schema", _lakesoul_params.table_schema);

    _jni_connector = std::make_unique<JniConnector>(
            "org/apache/doris/lakesoul/LakeSoulJniScanner", scanner_params, column_names);
}

// Fetches the next block from the JNI connector. Once the connector reports
// end-of-file, the connector is closed right away and the close status is
// what the caller receives.
Status LakeSoulJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
    RETURN_IF_ERROR(_jni_connector->get_next_block(block, read_rows, eof));
    if (!*eof) {
        return Status::OK();
    }
    // Last block delivered: release the scanner now.
    return _jni_connector->close();
}

// Reports the column name -> type mapping of every file slot descriptor.
// missing_cols is never written by this reader. Always returns OK.
Status LakeSoulJniReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                                      std::unordered_set<std::string>* missing_cols) {
    for (auto* slot : _file_slot_descs) {
        // emplace keeps the first entry if a column name repeats.
        name_to_type->emplace(slot->col_name(), slot->type());
    }
    return Status::OK();
}

// Remembers the caller's column value ranges, initializes the JNI connector
// with them and then opens the connector. The first failing step's status is
// returned.
Status LakeSoulJniReader::init_reader(
        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
    _colname_to_value_range = colname_to_value_range;
    RETURN_IF_ERROR(_jni_connector->init(_colname_to_value_range));
    RETURN_IF_ERROR(_jni_connector->open(_state, _profile));
    return Status::OK();
}
} // namespace doris::vectorized
70 changes: 70 additions & 0 deletions be/src/vec/exec/format/table/lakesoul_jni_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstddef>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "common/status.h"
#include "exec/olap_common.h"
#include "vec/exec/format/generic_reader.h"
#include "vec/exec/jni_connector.h"

namespace doris {
class RuntimeProfile;
class RuntimeState;
class SlotDescriptor;

namespace vectorized {
class Block;
} // namespace vectorized
struct TypeDescriptor;
} // namespace doris

namespace doris::vectorized {
// GenericReader implementation that reads LakeSoul data through a
// JniConnector (the connector itself is created in the constructor).
//
// Usage order: construct, call init_reader(), then call get_next_block()
// until *eof is set.
//
// Lifetime: lakesoul_params and file_slot_descs are stored as references, and
// state / profile / colname_to_value_range as raw non-owning pointers. All of
// them must outlive this reader.
class LakeSoulJniReader : public ::doris::vectorized::GenericReader {
    ENABLE_FACTORY_CREATOR(LakeSoulJniReader);

public:
    // Captures the scan description and creates the JNI connector.
    // Does not initialize or open the connector; see init_reader().
    LakeSoulJniReader(const TLakeSoulFileDesc& lakesoul_params,
                      const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
                      RuntimeProfile* profile);

    ~LakeSoulJniReader() override = default;

    // Reads the next block from the connector; closes the connector once
    // *eof is reported.
    Status get_next_block(::doris::vectorized::Block* block, size_t* read_rows, bool* eof) override;

    // Fills name_to_type from the file slot descriptors; missing_cols is
    // left untouched.
    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                       std::unordered_set<std::string>* missing_cols) override;

    // Initializes and opens the JNI connector with the given column value
    // ranges. Must be called before get_next_block().
    Status init_reader(
            std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);

private:
    // Borrowed from the caller; not copied.
    const TLakeSoulFileDesc& _lakesoul_params;
    const std::vector<SlotDescriptor*>& _file_slot_descs;
    // Non-owning. Default-initialized so no pointer member is ever
    // indeterminate; _colname_to_value_range in particular is only assigned
    // in init_reader(), not in the constructor.
    RuntimeState* _state = nullptr;
    RuntimeProfile* _profile = nullptr;
    std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
    std::unique_ptr<::doris::vectorized::JniConnector> _jni_connector;
};
} // namespace doris::vectorized
8 changes: 8 additions & 0 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exec/format/table/hudi_jni_reader.h"
#include "vec/exec/format/table/iceberg_reader.h"
#include "vec/exec/format/table/lakesoul_jni_reader.h"
#include "vec/exec/format/table/max_compute_jni_reader.h"
#include "vec/exec/format/table/paimon_jni_reader.h"
#include "vec/exec/format/table/paimon_reader.h"
Expand Down Expand Up @@ -806,6 +807,13 @@ Status VFileScanner::_get_next_reader() {
_file_slot_descs, _state, _profile);
init_status =
((HudiJniReader*)_cur_reader.get())->init_reader(_colname_to_value_range);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "lakesoul") {
_cur_reader =
LakeSoulJniReader::create_unique(range.table_format_params.lakesoul_params,
_file_slot_descs, _state, _profile);
init_status = ((LakeSoulJniReader*)_cur_reader.get())
->init_reader(_colname_to_value_range);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "trino_connector") {
_cur_reader = TrinoConnectorJniReader::create_unique(_file_slot_descs, _state,
Expand Down
2 changes: 2 additions & 0 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then
modules+=("be-java-extensions/trino-connector-scanner")
modules+=("be-java-extensions/max-compute-scanner")
modules+=("be-java-extensions/avro-scanner")
modules+=("be-java-extensions/lakesoul-scanner")
modules+=("be-java-extensions/preload-extensions")

# If the BE_EXTENSION_IGNORE variable is not empty, remove the modules that need to be ignored from FE_MODULES
Expand Down Expand Up @@ -834,6 +835,7 @@ EOF
extensions_modules+=("trino-connector-scanner")
extensions_modules+=("max-compute-scanner")
extensions_modules+=("avro-scanner")
extensions_modules+=("lakesoul-scanner")
extensions_modules+=("preload-extensions")

if [[ -n "${BE_EXTENSION_IGNORE}" ]]; then
Expand Down
64 changes: 64 additions & 0 deletions docker/thirdparties/docker-compose/lakesoul/lakesoul.yaml.tpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

version: '3'

services:
  # PostgreSQL container for the LakeSoul docker-compose environment
  # (presumably the LakeSoul metadata database - confirm against the
  # regression-test setup).
  lakesoul-meta-db:
    image: postgres:14.5
    container_name: lakesoul-test-pg
    hostname: lakesoul-docker-compose-env-lakesoul-meta-db-1
    ports:
      # Publishes the container's 5432 on host port 5432.
      - "5432:5432"
    restart: always
    environment:
      # Credentials hard-coded for this test environment only.
      POSTGRES_PASSWORD: lakesoul_test
      POSTGRES_USER: lakesoul_test
      POSTGRES_DB: lakesoul_test
    command:
      - "postgres"
      - "-c"
      - "max_connections=4096"
      - "-c"
      - "default_transaction_isolation=serializable"
    volumes:
      # Files under /docker-entrypoint-initdb.d are run by the postgres image
      # when it initializes an empty data directory.
      - ./meta_init.sql:/docker-entrypoint-initdb.d/meta_init.sql
      # Mounted only; nothing in this file runs it.
      - ./meta_cleanup.sql:/meta_cleanup.sql

# minio:
# image: bitnami/minio:latest
# ports:
# - "9000:9000"
# - "9001:9001"
# environment:
# MINIO_DEFAULT_BUCKETS: lakesoul-test-bucket:public
# MINIO_ROOT_USER: minioadmin1
# MINIO_ROOT_PASSWORD: minioadmin1
# healthcheck:
# test: [ "CMD", "curl", "-f", "http://localhost:9000/minio/health/live" ]
# interval: 3s
# timeout: 5s
# retries: 3
# hostname: minio
# profiles: ["s3"]


# Default network for the services above: bridge driver with the default
# IPAM driver.
networks:
  default:
    driver: bridge
    ipam:
      driver: default
11 changes: 11 additions & 0 deletions docker/thirdparties/docker-compose/lakesoul/meta_cleanup.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- SPDX-FileCopyrightText: 2023 LakeSoul Contributors
--
-- SPDX-License-Identifier: Apache-2.0

-- Reset the namespace table so that it contains only the 'default' namespace
-- with empty properties and an empty comment.
DELETE FROM namespace;
INSERT INTO namespace (namespace, properties, comment)
VALUES ('default', '{}', '');

-- Remove every row from the remaining metadata tables.
DELETE FROM data_commit_info;
DELETE FROM table_info;
DELETE FROM table_path_id;
DELETE FROM table_name_id;
DELETE FROM partition_info;
Loading

0 comments on commit 9b5a464

Please sign in to comment.