Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…nto dev
  • Loading branch information
terrymanu committed Mar 23, 2018
2 parents 2b3c998 + a949190 commit 8e635c5
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<finalName>${project.artifactId}-${project.parent.version}</finalName>
<appendAssemblyId>false</appendAssemblyId>
<descriptors>
<descriptor>src/main/resources/assemblies/assembly.xml</descriptor>
<descriptor>src/main/resources/assembly/assembly.xml</descriptor>
</descriptors>
</configuration>
<executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,15 @@ public void execute(final JobExecutionContext jobExecutionContext) {
TransactionLogStorage transactionLogStorage = (TransactionLogStorage) jobExecutionContext.getJobDetail().getJobDataMap().get("transactionLogStorage");
List<TransactionLog> transactionLogList = transactionLogStorage.findEligibleTransactionLogs(baseTransactionJobConfiguration.getJobConfig().getTransactionLogFetchDataCount(),
baseTransactionJobConfiguration.getJobConfig().getMaxDeliveryTryTimes(), baseTransactionJobConfiguration.getJobConfig().getMaxDeliveryTryDelayMillis());
if (null != transactionLogList) {
for (TransactionLog each : transactionLogList) {
try (Connection conn = baseTransactionJobConfiguration.getTargetDataSource(each.getDataSource()).getConnection()) {
transactionLogStorage.processData(conn, each, baseTransactionJobConfiguration.getJobConfig().getMaxDeliveryTryTimes());
} catch (final SQLException | TransactionCompensationException ex) {
log.error(String.format("Async delivery times %s error, max try times is %s, exception is %s", each.getAsyncDeliveryTryTimes() + 1,
baseTransactionJobConfiguration.getJobConfig().getMaxDeliveryTryTimes(), ex.getMessage()));
}
if (null == transactionLogList) {
return;
}
for (TransactionLog each : transactionLogList) {
try (Connection conn = baseTransactionJobConfiguration.getTargetDataSource(each.getDataSource()).getConnection()) {
transactionLogStorage.processData(conn, each, baseTransactionJobConfiguration.getJobConfig().getMaxDeliveryTryTimes());
} catch (final SQLException | TransactionCompensationException ex) {
log.error(String.format("Async delivery times %s error, max try times is %s, exception is %s", each.getAsyncDeliveryTryTimes() + 1,
baseTransactionJobConfiguration.getJobConfig().getMaxDeliveryTryTimes(), ex.getMessage()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package io.shardingjdbc.transaction.job;

import io.shardingjdbc.transaction.config.AsyncSoftTransactionJobConfiguration;
import lombok.Getter;
import lombok.Setter;

Expand Down Expand Up @@ -46,7 +47,7 @@ public final class BASETransactionJobConfiguration {
/**
* Asynchronized B.A.S.E transaction job configuration.
*/
private io.shardingjdbc.transaction.config.AsyncSoftTransactionJobConfiguration jobConfig;
private AsyncSoftTransactionJobConfiguration jobConfig;

/**
* Get one data source by name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
@RequiredArgsConstructor
public final class BASETransactionJobFactory {

private final BASETransactionJobConfiguration baseTransactionJobConfiguration;
private final BASETransactionJobConfiguration baseTransactionJobConfig;

/**
* start job.
Expand All @@ -53,16 +53,16 @@ public void start() throws SchedulerException {
}

private JobDetail buildJobDetail() {
JobDetail jobDetail = JobBuilder.newJob(BASETransactionJob.class).withIdentity(baseTransactionJobConfiguration.getJobConfig().getName() + "-Job").build();
jobDetail.getJobDataMap().put("baseTransactionJobConfiguration", baseTransactionJobConfiguration);
JobDetail jobDetail = JobBuilder.newJob(BASETransactionJob.class).withIdentity(baseTransactionJobConfig.getJobConfig().getName() + "-Job").build();
jobDetail.getJobDataMap().put("baseTransactionJobConfiguration", baseTransactionJobConfig);
jobDetail.getJobDataMap().put("transactionLogStorage",
TransactionLogStorageFactory.createTransactionLogStorage(new RdbTransactionLogDataSource(baseTransactionJobConfiguration.getDefaultTransactionLogDataSource())));
TransactionLogStorageFactory.createTransactionLogStorage(new RdbTransactionLogDataSource(baseTransactionJobConfig.getDefaultTransactionLogDataSource())));
return jobDetail;
}

private Trigger buildTrigger() {
return TriggerBuilder.newTrigger()
.withIdentity(baseTransactionJobConfiguration.getJobConfig().getName() + "-Trigger")
.withSchedule(CronScheduleBuilder.cronSchedule(baseTransactionJobConfiguration.getJobConfig().getCron())).build();
.withIdentity(baseTransactionJobConfig.getJobConfig().getName() + "-Trigger")
.withSchedule(CronScheduleBuilder.cronSchedule(baseTransactionJobConfig.getJobConfig().getCron())).build();
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#事务日志的数据源.
#Transaction log data source.
targetDataSource:
ds_trans_0: !!org.apache.commons.dbcp2.BasicDataSource
driverClassName: com.mysql.jdbc.Driver
Expand All @@ -11,7 +11,7 @@ targetDataSource:
username: root
password:

#事务日志的数据源.
#Data source for transaction manager.
transactionLogDataSource:
ds_trans: !!org.apache.commons.dbcp2.BasicDataSource
driverClassName: com.mysql.jdbc.Driver
Expand All @@ -20,17 +20,17 @@ transactionLogDataSource:
password:

jobConfig:
#作业名称
#job name
name: BASETransaction

#触发作业的cron表达式
#cron expression
cron: 0/5 * * * * ?

#每次作业获取的事务日志最大数量
#The maximum number of transaction logs fetch from each job
transactionLogFetchDataCount: 100

#事务送达的最大尝试次数.
#The maximum number of try to deliver a transaction.
maxDeliveryTryTimes: 3

#执行送达事务的延迟毫秒数,早于此间隔时间的入库事务才会被作业执行
#The milliseconds of delay delivery of the transaction.
maxDeliveryTryDelayMillis: 60000

0 comments on commit 8e635c5

Please sign in to comment.