Skip to content

Commit

Permalink
Merge pull request prisma#2555 from prisma/migration-engine/use-bidi-…
Browse files Browse the repository at this point in the history
…json-rpc
  • Loading branch information
tomhoule authored Jan 5, 2022
2 parents abbf52e + 8303587 commit 0adc784
Show file tree
Hide file tree
Showing 19 changed files with 221 additions and 124 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build-wasm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ jobs:
command: build
args: -p migration-connector --release --target=wasm32-unknown-unknown

- name: Build the migration-core crate with default features
- name: Build the migration-core crate
uses: actions-rs/cargo@v1
with:
command: build
args: -p migration-core --release --target=wasm32-unknown-unknown
args: -p migration-core --release --target=wasm32-unknown-unknown --no-default-features

- name: Build the prisma-fmt crate
uses: actions-rs/cargo@v1
Expand Down
2 changes: 1 addition & 1 deletion .test_database_urls/sqlite
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export TEST_DATABASE_URL=sqlite
set -e TEST_SHADOW_DATABASE_URL
unset TEST_SHADOW_DATABASE_URL
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 50 additions & 5 deletions libs/json-rpc-stdio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//!
//! Notifications in the Client are not supported yet.
use jsonrpc_core::{IoHandler, MethodCall, Request};
use jsonrpc_core::{IoHandler, MethodCall, Notification, Request};
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
Expand All @@ -22,20 +22,49 @@ static REQUEST_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
/// A handle to a connected client you can use to send requests.
#[derive(Debug, Clone)]
pub struct Client {
// Channel feeding one-way notifications (no response expected) to the IO task.
notification_sender: mpsc::Sender<Notification>,
// Channel carrying method calls, each paired with a oneshot for routing the response back.
request_sender: mpsc::Sender<(MethodCall, oneshot::Sender<jsonrpc_core::Output>)>,
}

/// Construct a JSON-RPC client. Returns a tuple: the client you can use to send requests, and
/// the adapter you must pass to `run_with_client()` to connect the client to the proper IO.
pub fn new_client() -> (Client, ClientAdapter) {
    // Bounded channels: a slow IO task applies backpressure to callers.
    let (request_sender, request_receiver) = mpsc::channel(30);
    let (notification_sender, notification_receiver) = mpsc::channel(30);

    let client = Client {
        request_sender,
        notification_sender,
    };

    let adapter = ClientAdapter {
        request_receiver,
        notification_receiver,
    };

    (client, adapter)
}

impl Client {
/// Asynchronously send a JSON-RPC notification.
pub async fn notify<Req>(&self, method: String, params: Req) -> jsonrpc_core::Result<()>
where
Req: serde::Serialize,
{
let json_params = serde_json::to_value(params).map_err(|_err| jsonrpc_core::Error::invalid_request())?;
let params = match json_params {
jsonrpc_core::Value::Array(arr) => jsonrpc_core::Params::Array(arr),
jsonrpc_core::Value::Object(obj) => jsonrpc_core::Params::Map(obj),
_ => return Err(jsonrpc_core::Error::invalid_request()),
};
let notification = jsonrpc_core::Notification {
jsonrpc: Some(jsonrpc_core::Version::V2),
method,
params,
};
self.notification_sender.send(notification).await.unwrap();
Ok(())
}

/// Asynchronously send a JSON-RPC request.
pub async fn call<Req, Res>(&self, method: String, params: Req) -> jsonrpc_core::Result<Res>
where
Expand Down Expand Up @@ -70,6 +99,7 @@ impl Client {
/// The other side of the channels. Only used as a handle to be passed into run_with_client().
pub struct ClientAdapter {
// Receives method calls sent by `Client::call`, each paired with the oneshot
// used to deliver the matching response back to the caller.
request_receiver: mpsc::Receiver<(MethodCall, oneshot::Sender<jsonrpc_core::Output>)>,
// Receives fire-and-forget notifications sent by `Client::notify`.
notification_receiver: mpsc::Receiver<Notification>,
}

#[derive(Serialize, Deserialize)]
Expand Down Expand Up @@ -109,6 +139,9 @@ async fn run_with_io(
next_request = client_adapter.request_receiver.recv() => {
handle_next_client_request(next_request, &mut output, &mut in_flight).await?
}
next_notification = client_adapter.notification_receiver.recv() => {
handle_next_client_notification(next_notification, &mut output).await?
}
}
}
}
Expand All @@ -120,8 +153,20 @@ async fn handle_next_client_request<T: AsyncWrite + Unpin>(
) -> io::Result<()> {
let (next_request, channel) = next_request.unwrap();
in_flight.insert(next_request.id.clone(), channel);
let request_json = serde_json::to_string(&next_request)?;
output.write_all(request_json.as_bytes()).await?;
let request_json = serde_json::to_vec(&next_request)?;
output.write_all(&request_json).await?;
output.write_all(b"\n").await?;
output.flush().await?;
Ok(())
}

// Take one notification queued by `Client::notify` and write it to the output
// stream as a single newline-terminated JSON object, flushing immediately.
async fn handle_next_client_notification<T: AsyncWrite + Unpin>(
next_notification: Option<Notification>,
output: &mut tokio::io::BufWriter<T>,
) -> io::Result<()> {
// NOTE(review): unwrap assumes the notification channel never yields None
// (i.e. the Client is never dropped) while this loop runs — confirm.
let next_notification = next_notification.unwrap();
let request_json = serde_json::to_vec(&next_notification)?;
output.write_all(&request_json).await?;
// One message per line: the framing the reader side expects.
output.write_all(b"\n").await?;
output.flush().await?;
Ok(())
Expand Down
1 change: 1 addition & 0 deletions migration-engine/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ migration-connector = { path = "../connectors/migration-connector" }
migration-core = { path = "../core", features = ["sql", "mongodb"] }
user-facing-errors = { path = "../../libs/user-facing-errors" }

async-trait = "0.1.0"
backtrace = "0.3.59"
base64 = "0.13"
json-rpc-stdio = { path = "../../libs/json-rpc-stdio" }
Expand Down
12 changes: 11 additions & 1 deletion migration-engine/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ fn set_panic_hook() {
}));
}

// `ConnectorHost` implementation for the CLI: forwards connector output to the
// tracing log pipeline rather than printing directly.
struct JsonRpcHost;

#[async_trait::async_trait]
impl migration_connector::ConnectorHost for JsonRpcHost {
async fn print(&self, text: &str) -> migration_connector::ConnectorResult<()> {
// The `migrate_action = "log"` field lets log consumers distinguish
// user-facing migration output from ordinary engine logs.
tracing::info!(migrate_action = "log", "{}", text);
Ok(())
}
}

async fn start_engine(datamodel_location: &str) {
use std::io::Read as _;

Expand All @@ -82,7 +92,7 @@ async fn start_engine(datamodel_location: &str) {
let mut datamodel = String::new();
file.read_to_string(&mut datamodel).unwrap();

match rpc_api(&datamodel).await {
match rpc_api(&datamodel, Box::new(JsonRpcHost)).await {
// Block the thread and handle IO in async until EOF.
Ok(api) => json_rpc_stdio::run(&api).await.unwrap(),
Err(err) => {
Expand Down
22 changes: 22 additions & 0 deletions migration-engine/connectors/migration-connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,32 @@ impl Migration {
}
}

/// An abstract host for a migration connector. It exposes IO that is not directly performed by the
/// connectors.
#[async_trait::async_trait]
pub trait ConnectorHost: Sync + Send + 'static {
/// Print to the console. Implementations decide where the text actually
/// goes (stdout, logs, a no-op, ...).
async fn print(&self, text: &str) -> ConnectorResult<()>;
}

/// A no-op ConnectorHost.
#[derive(Debug, Clone)]
pub struct EmptyHost;

#[async_trait::async_trait]
impl ConnectorHost for EmptyHost {
/// Discards the text; nothing is printed.
async fn print(&self, _text: &str) -> ConnectorResult<()> {
Ok(())
}
}

/// The top-level trait for connectors. This is the abstraction the migration engine core relies on to
/// interface with different database backends.
#[async_trait::async_trait]
pub trait MigrationConnector: Send + Sync + 'static {
/// Accept a new ConnectorHost.
fn set_host(&mut self, host: Box<dyn ConnectorHost>);

/// If possible on the target connector, acquire an advisory lock, so multiple instances of migrate do not run concurrently.
async fn acquire_lock(&self) -> ConnectorResult<()>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ impl MigrationConnector for MongoDbMigrationConnector {
Ok(())
}

// No-op: the MongoDB connector currently ignores the provided host.
fn set_host(&mut self, _host: Box<dyn migration_connector::ConnectorHost>) {}

async fn validate_migrations(
&self,
_migrations: &[migration_connector::migrations_directory::MigrationDirectory],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub struct SqlMigrationConnector {
flavour: Box<dyn SqlFlavour + Send + Sync + 'static>,
shadow_database_connection_string: Option<String>,
preview_features: BitFlags<PreviewFeature>,
host: Box<dyn ConnectorHost>,
}

impl SqlMigrationConnector {
Expand All @@ -58,6 +59,7 @@ impl SqlMigrationConnector {
flavour,
shadow_database_connection_string,
preview_features,
host: Box::new(EmptyHost),
})
}

Expand Down Expand Up @@ -218,6 +220,10 @@ impl SqlMigrationConnector {

#[async_trait::async_trait]
impl MigrationConnector for SqlMigrationConnector {
// Replace the connector's host (initialized to `EmptyHost` in the constructor)
// with the caller-provided one.
fn set_host(&mut self, host: Box<dyn migration_connector::ConnectorHost>) {
self.host = host;
}

// Identify the connector by the SQL family of the underlying connection.
fn connector_type(&self) -> &'static str {
self.connection_info.sql_family().as_str()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ impl DatabaseMigrationStepApplier for SqlMigrationConnector {

#[tracing::instrument(skip(self, script))]
async fn apply_script(&self, migration_name: &str, script: &str) -> ConnectorResult<()> {
tracing::info!(migrate_action = "log", "Applying migration `{}`", migration_name);
self.host
.print(&format!("Applying migration `{}`", migration_name))
.await?;
self.flavour.scan_migration_script(script);
let conn = self.conn().await?;
self.flavour.apply_migration_script(migration_name, script, conn).await
Expand Down
1 change: 1 addition & 0 deletions migration-engine/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ tracing-futures = "0.2"
url = "2.1.1"

[features]
default = ["sql"]
mongodb = ["mongodb-migration-connector"]
sql = ["sql-migration-connector"]
7 changes: 7 additions & 0 deletions migration-engine/core/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ pub trait GenericApi: Send + Sync + 'static {
/// The command behind `prisma db push`.
async fn schema_push(&self, input: &SchemaPushInput) -> CoreResult<SchemaPushOutput>;

/// Set the `ConnectorHost` to use.
fn set_host(&mut self, host: Box<dyn migration_connector::ConnectorHost>);

/// Access to the migration connector.
fn connector(&self) -> &dyn MigrationConnector;
}
Expand Down Expand Up @@ -184,4 +187,8 @@ impl<C: MigrationConnector> GenericApi for C {
.instrument(tracing::info_span!("SchemaPush"))
.await
}

// Delegate to the underlying `MigrationConnector` implementation.
fn set_host(&mut self, host: Box<dyn migration_connector::ConnectorHost>) {
MigrationConnector::set_host(self, host)
}
}
102 changes: 96 additions & 6 deletions migration-engine/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,112 @@

//! The top-level library crate for the migration engine.
pub mod commands;

mod api;
mod core_error;
mod rpc;

pub mod commands;
pub use self::{api::GenericApi, core_error::*, rpc::rpc_api};

pub use core_error::*;
pub use migration_connector;

#[cfg(not(target_arch = "wasm32"))]
mod native;

#[cfg(not(target_arch = "wasm32"))]
pub use native::*;
use datamodel::{
common::{
preview_features::PreviewFeature,
provider_names::{MSSQL_SOURCE_NAME, MYSQL_SOURCE_NAME, POSTGRES_SOURCE_NAME, SQLITE_SOURCE_NAME},
},
Datasource,
};
use enumflags2::BitFlags;
use std::env;
use user_facing_errors::{common::InvalidConnectionString, KnownError};

use datamodel::{Configuration, Datamodel};

/// Parse a Prisma schema string into its configuration and data model,
/// converting datamodel parse failures into `CoreError`s.
fn parse_schema(schema: &str) -> CoreResult<(Configuration, Datamodel)> {
datamodel::parse_schema(schema).map_err(CoreError::new_schema_parser_error)
}

#[cfg(feature = "mongodb")]
use datamodel::common::provider_names::MONGODB_SOURCE_NAME;
#[cfg(feature = "mongodb")]
use mongodb_migration_connector::MongoDbMigrationConnector;
#[cfg(feature = "sql")]
use sql_migration_connector::SqlMigrationConnector;

/// Top-level constructor for the migration engine API.
///
/// Parses the schema's datasource configuration and instantiates the matching
/// connector. Providers are gated by cargo features (`sql`, `mongodb`); an
/// unrecognized provider yields an error.
pub fn migration_api(datamodel: &str) -> CoreResult<Box<dyn api::GenericApi>> {
let (source, url, preview_features, shadow_database_url) = parse_configuration(datamodel)?;

match source.active_provider.as_str() {
#[cfg(feature = "sql")]
POSTGRES_SOURCE_NAME => {
// Postgres only: rewrite the connection string so that
// `statement_cache_size` is always 0.
// NOTE(review): presumably this disables the prepared-statement cache
// for migrations — confirm rationale against the quaint docs.
let mut u = url::Url::parse(&url).map_err(|err| {
let details = user_facing_errors::quaint::invalid_connection_string_description(&format!(
"Error parsing connection string: {}",
err
));

CoreError::from(KnownError::new(InvalidConnectionString { details }))
})?;

let params: Vec<(String, String)> = u.query_pairs().map(|(k, v)| (k.to_string(), v.to_string())).collect();

u.query_pairs_mut().clear();

// Re-append every query param, overriding statement_cache_size in place.
for (k, v) in params.into_iter() {
if k == "statement_cache_size" {
u.query_pairs_mut().append_pair("statement_cache_size", "0");
} else {
u.query_pairs_mut().append_pair(&k, &v);
}
}

// If the param was absent from the original URL, add it.
if !u.query_pairs().any(|(k, _)| k == "statement_cache_size") {
u.query_pairs_mut().append_pair("statement_cache_size", "0");
}

let connector = SqlMigrationConnector::new(u.to_string(), preview_features, shadow_database_url)?;

Ok(Box::new(connector))
}
#[cfg(feature = "sql")]
MYSQL_SOURCE_NAME | SQLITE_SOURCE_NAME | MSSQL_SOURCE_NAME => {
// Other SQL providers take the connection string as-is.
let connector = SqlMigrationConnector::new(url, preview_features, shadow_database_url)?;

Ok(Box::new(connector))
}
#[cfg(feature = "mongodb")]
MONGODB_SOURCE_NAME => Ok(Box::new(MongoDbMigrationConnector::new(url, preview_features))),
provider => Err(CoreError::from_msg(format!(
"`{}` is not a supported connector.",
provider
))),
}
}

/// Parse the schema's configuration block and extract everything needed to
/// construct a connector: the first datasource, its resolved URL, the enabled
/// preview features, and the optional shadow-database URL.
///
/// Returns a `CoreError` if the schema fails to parse, if no datasource is
/// declared, or if either URL cannot be resolved.
fn parse_configuration(datamodel: &str) -> CoreResult<(Datasource, String, BitFlags<PreviewFeature>, Option<String>)> {
    let config = datamodel::parse_configuration(datamodel)
        .map(|validated_config| validated_config.subject)
        .map_err(|err| CoreError::new_schema_parser_error(err.to_pretty_string("schema.prisma", datamodel)))?;

    // Read preview features before moving `datasources` out of `config`.
    let preview_features = config.preview_features();

    // Fix: take the first datasource up front. The original indexed
    // `config.datasources[0]` before reaching this check, so a schema with no
    // datasource panicked instead of returning the intended error.
    let source = config
        .datasources
        .into_iter()
        .next()
        .ok_or_else(|| CoreError::from_msg("There is no datasource in the schema.".into()))?;

    // URLs may reference environment variables; resolve them here.
    let url = source
        .load_url(|key| env::var(key).ok())
        .map_err(|err| CoreError::new_schema_parser_error(err.to_pretty_string("schema.prisma", datamodel)))?;

    let shadow_database_url = source
        .load_shadow_database_url()
        .map_err(|err| CoreError::new_schema_parser_error(err.to_pretty_string("schema.prisma", datamodel)))?;

    Ok((source, url, preview_features, shadow_database_url))
}
Loading

0 comments on commit 0adc784

Please sign in to comment.