Skip to content

Commit

Permalink
prover job queue db
Browse files Browse the repository at this point in the history
  • Loading branch information
dvush committed Dec 3, 2020
1 parent ded619a commit 32f80eb
Show file tree
Hide file tree
Showing 40 changed files with 2,047 additions and 1,906 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion contracts/contracts/ZkSync.sol
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,8 @@ contract ZkSync is UpgradeableMaster, Storage, Config, Events, ReentrancyGuard {
);
++currentTotalBlocksProofed;

_proof.commitments[_commitmentIdxs[i]] = uint256(_committedBlocks[i].commitment);
uint256 mask = (~uint256(0)) >> 3;
require(_proof.commitments[_commitmentIdxs[i]] & mask == uint256(_committedBlocks[i].commitment) & mask, "pbl3"); // incorrect block commitment in proof
}

bool success =
Expand Down
3 changes: 1 addition & 2 deletions core/bin/data_restore/src/storage_interactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{convert::TryFrom, str::FromStr};
// External deps
use web3::types::H256;
// Workspace deps
use zksync_crypto::proof::EncodedProofPlonk;
use zksync_storage::{
data_restore::records::{NewBlockEvent, StoredBlockEvent, StoredRollupOpsBlock},
StorageProcessor,
Expand Down Expand Up @@ -83,7 +82,7 @@ pub async fn update_tree_state(

let verify_op = Operation {
action: Action::Verify {
proof: Box::new(EncodedProofPlonk::default()),
proof: Box::new(Default::default()),
},
block: block.clone(),
id: None,
Expand Down
6 changes: 1 addition & 5 deletions core/bin/prover/src/bin/dummy_prover.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
use anyhow::Error;
use std::sync::mpsc;
use std::time::Duration;
use zksync_crypto::proof::EncodedProofPlonk;
use zksync_prover::cli_utils::main_for_prover_impl;
use zksync_prover::{ApiClient, ProverConfig, ProverImpl};
use zksync_prover_utils::aggregated_proofs::{
AggregatedProof, NewProofType, OldProofType, SingleProof,
};
use zksync_prover_utils::api::{JobRequestData, JobResultData};
use zksync_utils::get_env;

Expand Down Expand Up @@ -51,5 +47,5 @@ impl ProverImpl for DummyProver {

#[tokio::main]
async fn main() {
main_for_prover_impl::<DummyProver>();
main_for_prover_impl::<DummyProver>().await;
}
2 changes: 1 addition & 1 deletion core/bin/prover/src/bin/plonk_step_by_step_prover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ use zksync_prover::plonk_step_by_step_prover::PlonkStepByStepProver;

#[tokio::main]
async fn main() {
main_for_prover_impl::<PlonkStepByStepProver>();
main_for_prover_impl::<PlonkStepByStepProver>().await;
}
10 changes: 6 additions & 4 deletions core/bin/prover/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@ use reqwest::Url;
// Workspace deps
use crate::client;
use zksync_circuit::circuit::ZkSyncCircuit;
use zksync_crypto::proof::EncodedProofPlonk;
use zksync_circuit::serialization::ProverData;
use zksync_crypto::Engine;
use zksync_prover_utils::api::{
ProverId, ProverInputRequest, ProverInputResponse, ProverOutputRequest, ProverStopped,
WorkingOn,
};
use zksync_prover_utils::prover_data::ProverData;

#[derive(Debug, Clone)]
pub struct ApiClient {
Expand Down Expand Up @@ -61,10 +60,13 @@ impl crate::ApiClient for ApiClient {
Ok(response.json().await?)
}

async fn working_on(&self, job_id: i32) -> Result<(), anyhow::Error> {
async fn working_on(&self, job_id: i32, prover_name: &str) -> Result<(), anyhow::Error> {
self.http_client
.post(self.working_on_url.clone())
.json(&WorkingOn { job_id })
.json(&WorkingOn {
job_id,
prover_name: prover_name.to_string(),
})
.send()
.await?;
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion core/bin/prover/src/exit_proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::time::Instant;
use zksync_circuit::exit_circuit::create_exit_circuit_with_public_input;
use zksync_crypto::circuit::account::CircuitAccount;
use zksync_crypto::circuit::CircuitAccountTree;
use zksync_crypto::proof::{EncodedAggregatedProof, EncodedProofPlonk};
use zksync_crypto::proof::EncodedAggregatedProof;
use zksync_prover_utils::aggregated_proofs::{gen_aggregate_proof, SingleProofData};
use zksync_prover_utils::{gen_verified_proof_for_exit_circuit, PlonkVerificationKey};
use zksync_types::{AccountId, AccountMap, Address, TokenId};
Expand Down
13 changes: 8 additions & 5 deletions core/bin/prover/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ use zksync_crypto::rand::{
// Workspace deps
use tokio::stream::StreamExt;
use zksync_config::ProverOptions;
use zksync_crypto::proof::EncodedAggregatedProof;
use zksync_crypto::{proof::EncodedProofPlonk, Engine};
use zksync_prover_utils::aggregated_proofs::{AggregatedProof, SingleProof};
use zksync_crypto::proof::{AggregatedProof, EncodedAggregatedProof, SingleProof};
use zksync_crypto::Engine;
use zksync_prover_utils::api::{
JobRequestData, JobResultData, ProverId, ProverInputRequest, ProverInputRequestAuxData,
ProverInputResponse, ProverOutputRequest,
Expand Down Expand Up @@ -95,7 +94,7 @@ pub trait ProverImpl {
#[async_trait::async_trait]
pub trait ApiClient: Debug {
async fn get_job(&self, req: ProverInputRequest) -> Result<ProverInputResponse, anyhow::Error>;
async fn working_on(&self, job_id: i32) -> Result<(), anyhow::Error>;
async fn working_on(&self, job_id: i32, prover_name: &str) -> Result<(), anyhow::Error>;
async fn publish(&self, data: ProverOutputRequest) -> Result<(), anyhow::Error>;
async fn prover_stopped(&self, prover_id: ProverId) -> Result<(), anyhow::Error>;
}
Expand Down Expand Up @@ -166,6 +165,8 @@ async fn prover_work_cycle<PROVER, CLIENT>(
let ProverInputResponse {
job_id,
data: job_data,
first_block,
last_block,
} = prover_input_response;
let job_data = if let Some(job_data) = job_data {
job_data
Expand All @@ -187,7 +188,7 @@ async fn prover_work_cycle<PROVER, CLIENT>(

tokio::time::delay_for(timeout_value).await;
client
.working_on(job_id)
.working_on(job_id, &prover_name)
.await
.map_err(|e| log::warn!("Failed to send hearbeat"))
.unwrap_or_default();
Expand All @@ -210,6 +211,8 @@ async fn prover_work_cycle<PROVER, CLIENT>(
client
.publish(ProverOutputRequest {
job_id,
first_block,
last_block,
data: proof,
})
.await
Expand Down
5 changes: 2 additions & 3 deletions core/bin/prover/src/plonk_step_by_step_prover.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use crate::{ApiClient, ProverConfig, ProverImpl};
use std::sync::Mutex;
use std::time::Duration;
use zksync_crypto::proof::{AggregatedProof, SingleProof};
use zksync_crypto::Engine;
use zksync_prover_utils::aggregated_proofs::{
gen_aggregate_proof, prepare_proof_data, AggregatedProof, SingleProof,
};
use zksync_prover_utils::aggregated_proofs::{gen_aggregate_proof, prepare_proof_data};
use zksync_prover_utils::api::{JobRequestData, JobResultData};
use zksync_prover_utils::{PlonkVerificationKey, SetupForStepByStepProver};
use zksync_types::BlockNumber;
Expand Down
74 changes: 57 additions & 17 deletions core/bin/zksync_core/src/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,36 +305,76 @@ async fn create_aggregated_operations(storage: &mut StorageProcessor<'_>) -> any
}

let aggregated_op_create = AggregatedOperation::CreateProofBlocks(block_numbers);
// TODO: should execute after proof is produced
let aggregated_op_publish =
AggregatedOperation::PublishProofBlocksOnchain(BlocksProofOperation {
blocks,
proof: Default::default(),
block_idxs_in_proof,
});

storage
.chain()
.operations_schema()
.store_aggregated_action(aggregated_op_create)
.await?;
storage
.chain()
.operations_schema()
.store_aggregated_action(aggregated_op_publish)
.await?;

log::info!(
"Created aggregated create proof / publish proof op: {} - {}",
"Created aggregated create proof op: {} - {}",
last_aggregate_create_proof_block + 1,
last_committed_block
);
}

// TODO: should execute after proof is produced
// if last_aggregate_create_proof_block > last_aggregate_publish_proof_block {
//
// }
if last_aggregate_create_proof_block > last_aggregate_publish_proof_block {
let create_proof_blocks =
if let Some(AggregatedOperation::CreateProofBlocks(create_proof_blocks)) = storage
.chain()
.operations_schema()
.get_aggregated_op_that_affects_block(
AggregatedActionType::CreateProofBlocks,
last_aggregate_create_proof_block + 1,
)
.await?
{
create_proof_blocks
} else {
panic!("Create proof blocks action should exist");
};

let first_block = *create_proof_blocks.first().expect("should have 1 block");
let last_block = *create_proof_blocks.last().expect("should have 1 block");
let proof = storage
.prover_schema()
.load_aggregated_proof(first_block, last_block)
.await?;

if let Some(proof) = proof {
let proof = proof.serialize_aggregated_proof();
let mut blocks = Vec::new();
let mut block_idxs_in_proof = Vec::new();
for (idx, block_number) in create_proof_blocks.into_iter().enumerate() {
let block = storage
.chain()
.block_schema()
.get_block(block_number)
.await?
.expect("Failed to get last committed block from db");
blocks.push(block);
block_idxs_in_proof.push(idx);
}

let aggregated_op_publish =
AggregatedOperation::PublishProofBlocksOnchain(BlocksProofOperation {
blocks,
proof,
block_idxs_in_proof,
});
storage
.chain()
.operations_schema()
.store_aggregated_action(aggregated_op_publish)
.await?;
log::info!(
"Created aggregated publish proof op: {} - {}",
first_block,
last_block
);
}
}

if last_aggregate_publish_proof_block > last_aggregate_executed_block {
let mut blocks = Vec::new();
Expand Down
Loading

0 comments on commit 32f80eb

Please sign in to comment.