Skip to content

Commit

Permalink
refactor: simplify get_tables_history() (databendlabs#16572)
Browse files Browse the repository at this point in the history
Make this method return `TableNIV` instead of `TableInfo`. TableInfo
includes information that `SchemaApi` can not provide.
  • Loading branch information
drmingdrmer authored Oct 9, 2024
1 parent 0c550c5 commit 0c8d546
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 201 deletions.
1 change: 0 additions & 1 deletion src/meta/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ pub use util::deserialize_struct;
pub use util::deserialize_u64;
pub use util::fetch_id;
pub use util::get_u64_value;
pub use util::list_keys;
pub use util::list_u64_value;
pub use util::send_txn;
pub use util::serialize_struct;
Expand Down
7 changes: 2 additions & 5 deletions src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent
use databend_common_meta_app::schema::index_id_ident::IndexId;
use databend_common_meta_app::schema::index_id_ident::IndexIdIdent;
use databend_common_meta_app::schema::least_visible_time_ident::LeastVisibleTimeIdent;
use databend_common_meta_app::schema::table_niv::TableNIV;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CatalogMeta;
use databend_common_meta_app::schema::CatalogNameIdent;
Expand Down Expand Up @@ -209,11 +210,7 @@ pub trait SchemaApi: Send + Sync {
table_id_history: &TableIdHistoryIdent,
) -> Result<Vec<(TableId, SeqV<TableMeta>)>, KVAppError>;

async fn get_tables_history(
&self,
req: ListTableReq,
db_name: &str,
) -> Result<Vec<Arc<TableInfo>>, KVAppError>;
async fn get_tables_history(&self, req: ListTableReq) -> Result<Vec<TableNIV>, KVAppError>;

/// List all tables in the database.
///
Expand Down
70 changes: 14 additions & 56 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ use crate::get_u64_value;
use crate::kv_app_error::KVAppError;
use crate::kv_pb_api::KVPbApi;
use crate::kv_pb_crud_api::KVPbCrudApi;
use crate::list_keys;
use crate::list_u64_value;
use crate::meta_txn_error::MetaTxnError;
use crate::name_id_value_api::NameIdValueApi;
Expand Down Expand Up @@ -1562,77 +1561,36 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

#[logcall::logcall]
#[fastrace::trace]
async fn get_tables_history(
&self,
req: ListTableReq,
db_name: &str,
) -> Result<Vec<Arc<TableInfo>>, KVAppError> {
async fn get_tables_history(&self, req: ListTableReq) -> Result<Vec<TableNIV>, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

// List tables by tenant, db_id, table_name.
// List tables by tenant, db_id.
let table_id_history_ident = TableIdHistoryIdent {
database_id: req.database_id.db_id,
table_name: "dummy".to_string(),
};

let dir_name = DirName::new(table_id_history_ident);

let table_id_list_keys = list_keys(self, &dir_name).await?;
let ident_histories = self.list_pb_vec(&dir_name).await?;

let mut tbs_info_list = vec![];
let mut res = vec![];
let now = Utc::now();
let keys: Vec<String> = table_id_list_keys
.iter()
.map(|table_id_list_key| {
TableIdHistoryIdent {
database_id: req.database_id.db_id,
table_name: table_id_list_key.table_name.clone(),
}
.to_string_key()
})
.collect();

let mut table_id_list_keys_iter = table_id_list_keys.into_iter();
for c in keys.chunks(DEFAULT_MGET_SIZE) {
let tb_id_list_seq_vec: Vec<(u64, Option<TableIdList>)> =
mget_pb_values(self, c).await?;
for (tb_id_list_seq, tb_id_list_opt) in tb_id_list_seq_vec {
let table_id_list_key = table_id_list_keys_iter.next().unwrap();
let tb_id_list = if tb_id_list_seq == 0 {
continue;
} else {
match tb_id_list_opt {
Some(list) => list,
None => {
continue;
}
}
};
for (ident, history) in ident_histories {
debug!(name :% =(&ident); "get_tables_history");

debug!(
name :% =(&table_id_list_key);
"get_tables_history"
);
let id_metas = get_table_meta_history(self, &now, history.data).await?;

let metas = get_table_meta_history(self, &now, tb_id_list).await?;
let tb_info_list: Vec<Arc<TableInfo>> = metas
.into_iter()
.map(|(table_id, seqv)| {
Arc::new(TableInfo {
ident: TableIdent::new(table_id.table_id, seqv.seq()),
desc: format!("'{}'.'{}'", db_name, table_id_list_key.table_name,),
name: table_id_list_key.table_name.to_string(),
meta: seqv.data,
db_type: DatabaseType::NormalDB,
catalog_info: Default::default(),
})
})
.collect();
tbs_info_list.extend(tb_info_list);
}
let table_nivs = id_metas.into_iter().map(|(table_id, seq_meta)| {
let name = DBIdTableName::new(ident.database_id, ident.table_name.clone());
TableNIV::new(name, table_id, seq_meta)
});

res.extend(table_nivs);
}

return Ok(tbs_info_list);
return Ok(res);
}

#[logcall::logcall]
Expand Down
Loading

0 comments on commit 0c8d546

Please sign in to comment.