Skip to content

Commit

Permalink
Merge pull request #3895 from laboro/fix/CRM-2803
Browse files Browse the repository at this point in the history
CRM-2803: Deadlock during magento sync
  • Loading branch information
x86demon committed Mar 6, 2015
2 parents 3e0db89 + c56cd2c commit 401d7d0
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 12 deletions.
42 changes: 33 additions & 9 deletions src/Oro/Bundle/ImportExportBundle/Job/JobExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ class JobExecutor
*/
protected $batchJobRepository;

/**
* @param ConnectorRegistry $jobRegistry
* @param BatchJobRepository $batchJobRepository
* @param ContextRegistry $contextRegistry
* @param ManagerRegistry $managerRegistry
*/
public function __construct(
ConnectorRegistry $jobRegistry,
BatchJobRepository $batchJobRepository,
Expand All @@ -70,7 +76,7 @@ public function __construct(
* @param array $configuration
* @return JobResult
*/
public function executeJob($jobType, $jobName, array $configuration = array())
public function executeJob($jobType, $jobName, array $configuration = [])
{
$this->initialize();

Expand Down Expand Up @@ -116,7 +122,11 @@ protected function doJob(JobInstance $jobInstance, JobExecution $jobExecution)
$jobResult = new JobResult();
$jobResult->setSuccessful(false);

$this->entityManager->beginTransaction();
$isTransactionRunning = $this->isTransactionRunning();
if (!$isTransactionRunning) {
$this->entityManager->beginTransaction();
}

try {
$job = $this->batchJobRegistry->getJob($jobInstance);
if (!$job) {
Expand All @@ -127,11 +137,15 @@ protected function doJob(JobInstance $jobInstance, JobExecution $jobExecution)

$failureExceptions = $this->collectFailureExceptions($jobExecution);

if ($jobExecution->getStatus()->getValue() == BatchStatus::COMPLETED && !$failureExceptions) {
$this->entityManager->commit();
if ($jobExecution->getStatus()->getValue() === BatchStatus::COMPLETED && !$failureExceptions) {
if (!$isTransactionRunning) {
$this->entityManager->commit();
}
$jobResult->setSuccessful(true);
} else {
$this->entityManager->rollback();
if (!$isTransactionRunning) {
$this->entityManager->rollback();
}
foreach ($failureExceptions as $failureException) {
$jobResult->addFailureException($failureException);
}
Expand All @@ -141,7 +155,9 @@ protected function doJob(JobInstance $jobInstance, JobExecution $jobExecution)
$this->batchJobRepository->getJobManager()->flush();
$this->batchJobRepository->getJobManager()->clear();
} catch (\Exception $exception) {
$this->entityManager->rollback();
if (!$isTransactionRunning) {
$this->entityManager->rollback();
}
$jobExecution->addFailureException($exception);
$jobResult->addFailureException($exception->getMessage());

Expand All @@ -151,6 +167,14 @@ protected function doJob(JobInstance $jobInstance, JobExecution $jobExecution)
return $jobResult;
}

/**
* @return bool
*/
protected function isTransactionRunning()
{
return $this->entityManager->getConnection()->getTransactionNestingLevel() !== 0;
}

/**
* Try to save batch entities only in case when it's possible
*
Expand Down Expand Up @@ -194,7 +218,7 @@ public function getJobFailureExceptions($jobCode)
protected function getJobExecutionByJobInstanceCode($jobCode)
{
/** @var JobInstance $jobInstance */
$jobInstance = $this->getJobInstanceRepository()->findOneBy(array('code' => $jobCode));
$jobInstance = $this->getJobInstanceRepository()->findOneBy(['code' => $jobCode]);
if (!$jobInstance) {
throw new LogicException(sprintf('No job instance found with code %s', $jobCode));
}
Expand Down Expand Up @@ -222,7 +246,7 @@ protected function getJobInstanceRepository()
*/
protected function collectFailureExceptions(JobExecution $jobExecution)
{
$failureExceptions = array();
$failureExceptions = [];
foreach ($jobExecution->getAllFailureExceptions() as $exceptionData) {
if (!empty($exceptionData['message'])) {
$failureExceptions[] = $exceptionData['message'];
Expand All @@ -238,7 +262,7 @@ protected function collectFailureExceptions(JobExecution $jobExecution)
*/
protected function collectErrors(JobExecution $jobExecution)
{
$errors = array();
$errors = [];
foreach ($jobExecution->getStepExecutions() as $stepExecution) {
$errors = array_merge(
$errors,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ class JobExecutorTest extends \PHPUnit_Framework_TestCase
*/
protected $entityManager;

/**
* @var \PHPUnit_Framework_MockObject_MockObject
*/
protected $connection;

/**
* @var \PHPUnit_Framework_MockObject_MockObject
*/
Expand Down Expand Up @@ -51,6 +56,12 @@ protected function setUp()
$this->entityManager = $this->getMockBuilder('Doctrine\ORM\EntityManager')
->disableOriginalConstructor()
->getMock();
$this->connection = $this->getMockBuilder('Doctrine\DBAL\Connection')
->disableOriginalConstructor()
->getMock();
$this->entityManager->expects($this->any())
->method('getConnection')
->will($this->returnValue($this->connection));
$this->managerRegistry = $this->getMockBuilder('Symfony\Bridge\Doctrine\ManagerRegistry')
->disableOriginalConstructor()
->getMock();
Expand Down Expand Up @@ -92,6 +103,9 @@ function ($instance) {

public function testExecuteJobUnknownJob()
{
$this->connection->expects($this->once())
->method('getTransactionNestingLevel')
->will($this->returnValue(0));
$this->entityManager->expects($this->once())
->method('beginTransaction');
$this->entityManager->expects($this->once())
Expand All @@ -111,7 +125,9 @@ public function testExecuteJobUnknownJob()
public function testExecuteJobSuccess()
{
$configuration = array('test' => true);

$this->connection->expects($this->once())
->method('getTransactionNestingLevel')
->will($this->returnValue(0));
$this->entityManager->expects($this->once())
->method('beginTransaction');
$this->entityManager->expects($this->never())
Expand Down Expand Up @@ -169,10 +185,75 @@ function (JobExecution $jobExecution) use ($configuration, $stepExecution) {
$this->assertEquals($context, $result->getContext());
}

public function testExecuteJobStopped()
public function testExecuteJobSuccessWithTransactionStarted()
{
$configuration = array('test' => true);
$this->connection->expects($this->once())
->method('getTransactionNestingLevel')
->will($this->returnValue(1));
$this->entityManager->expects($this->never())
->method('beginTransaction');
$this->entityManager->expects($this->never())
->method('rollback');
$this->entityManager->expects($this->never())
->method('commit');

$this->batchJobManager->expects($this->once())->method('persist')
->with($this->isInstanceOf('Akeneo\Bundle\BatchBundle\Entity\JobInstance'));
$this->batchJobManager->expects($this->once())->method('flush')
->with();

$context = $this->getMockBuilder('Oro\Bundle\ImportExportBundle\Context\ContextInterface')
->getMockForAbstractClass();
$stepExecution = $this->getMockBuilder('Akeneo\Bundle\BatchBundle\Entity\StepExecution')
->disableOriginalConstructor()
->getMock();
$stepExecution->expects($this->any())
->method('getFailureExceptions')
->will($this->returnValue(array()));
$this->contextRegistry->expects($this->once())
->method('getByStepExecution')
->with($stepExecution)
->will($this->returnValue($context));

$job = $this->getMockBuilder('Akeneo\Bundle\BatchBundle\Job\JobInterface')
->getMock();
$job->expects($this->once())
->method('execute')
->with($this->isInstanceOf('Akeneo\Bundle\BatchBundle\Entity\JobExecution'))
->will(
$this->returnCallback(
function (JobExecution $jobExecution) use ($configuration, $stepExecution) {
\PHPUnit_Framework_Assert::assertEquals(
'import.test',
$jobExecution->getJobInstance()->getLabel()
);
\PHPUnit_Framework_Assert::assertEquals(
$configuration,
$jobExecution->getJobInstance()->getRawConfiguration()
);
$jobExecution->setStatus(new BatchStatus(BatchStatus::COMPLETED));
$jobExecution->addStepExecution($stepExecution);
}
)
);

$this->batchJobRegistry->expects($this->once())
->method('getJob')
->with($this->isInstanceOf('Akeneo\Bundle\BatchBundle\Entity\JobInstance'))
->will($this->returnValue($job));
$result = $this->executor->executeJob('import', 'test', $configuration);
$this->assertInstanceOf('Oro\Bundle\ImportExportBundle\Job\JobResult', $result);
$this->assertTrue($result->isSuccessful());
$this->assertEquals($context, $result->getContext());
}

public function testExecuteJobStopped()
{
$configuration = array('test' => true);
$this->connection->expects($this->once())
->method('getTransactionNestingLevel')
->will($this->returnValue(0));
$this->entityManager->expects($this->once())
->method('beginTransaction');
$this->entityManager->expects($this->once())
Expand Down Expand Up @@ -210,7 +291,9 @@ function (JobExecution $jobExecution) use ($configuration) {
public function testExecuteJobFailure()
{
$configuration = array('test' => true);

$this->connection->expects($this->once())
->method('getTransactionNestingLevel')
->will($this->returnValue(0));
$this->entityManager->expects($this->once())
->method('beginTransaction');
$this->entityManager->expects($this->once())
Expand Down

0 comments on commit 401d7d0

Please sign in to comment.