Skip to content

Commit

Permalink
开发计划 和 bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
qq254963746 committed Jan 6, 2016
1 parent 7ef1609 commit df98b76
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 9 deletions.
4 changes: 2 additions & 2 deletions lts-core/src/main/java/com/lts/kv/DBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ public void put(K key, V value) {
}

public void remove(K key) {
// 先移除缓存
dataCache.remove(key);

// 1. 先写Log
StoreTxLogPosition storeTxLogPosition = storeTxLogEngine.append(Operation.REMOVE, key);
Expand All @@ -115,8 +117,6 @@ public void remove(K key) {
// 3. 移除Data
dataBlockEngine.remove(storeTxLogPosition, indexItem);
}
// 4. 移除缓存
dataCache.remove(key);
}

public DBIterator<Entry<K, V>> iterator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ protected void clientStart() throws RemotingException {
.option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(//
defaultEventExecutorGroup, //
nettyCodecFactory.getEncoder(), //
nettyCodecFactory.getDecoder(), //
ch.pipeline().addLast(
defaultEventExecutorGroup,
nettyCodecFactory.getEncoder(),
nettyCodecFactory.getDecoder(),
new IdleStateHandler(remotingClientConfig.getReaderIdleTimeSeconds(), remotingClientConfig.getWriterIdleTimeSeconds(), remotingClientConfig.getClientChannelMaxIdleTimeSeconds()),//
new NettyConnectManageHandler(), //
new NettyConnectManageHandler(),
new NettyClientHandler());
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ public class JobPriorityBlockingQueue {

private int capacity;

private transient int size;
private transient JobPo[] queue;
private volatile int size;
private JobPo[] queue;
private ConcurrentHashSet<String/*jobId*/> JOB_ID_SET = new ConcurrentHashSet<String>();

public JobPriorityBlockingQueue(int capacity) {
Expand Down
4 changes: 4 additions & 0 deletions lts-startup/lts-startup-jobtracker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
</dependency>
<dependency>
<groupId>org.mongodb.morphia</groupId>
<artifactId>morphia</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
32 changes: 32 additions & 0 deletions 开发计划.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
####1.任务暂停和恢复功能(紧急,耗时短)
大概思路就是,新创建一个暂停任务表,当任务在页面上被用户点击暂停之后,就会被移动到这个暂停表里面,恢复的时候,从这个暂停表里面恢复到可执行表里面。当然这里面需要考虑任务正在执行等情况。
####2.任务实时触发功能(紧急,耗时短)
这个功能暂时定为只有Cron任务才会有这个功能,对于实时和定时任务,只需要修改下任务执行时间即可(如果定时任务也需要这个功能,内部也只是将任务执行时间和优先级设置下就行),对于Cron任务,就需要内部新添加一个任务去执行。
####3.LTS KV存储优化:
开始LTS自己的KV存储的目的主要是为了解决对第三方KV存储引擎的依赖,可以针对LTS的使用场景做各种优化和定制化。优化部分主要包含:

* 当每个DataBlock的存储entry数目低于一定阀值(譬如50%)的时候,需要将DataBlock进行“垃圾回收”,删除并整理一些逻辑上已经删除的数据块。
* 索引Index,目前只提供了一种内存方式的实现,还需要提供一种B+树的实现来解决大数据量下内存的局限性问题。目前之后Index snapshot来解决每次启动时缩短重放事务日志的问题。
* 对于事务日志TxLog,目前没有删除策略,需要增加一个策略:当TxLog日志文件到达一定数量之后,并且当前所存活的kv个数为0的时候,可以将这些TxLog移除到bak目录下,待bak目录个数到达一定个数或者一定时间之后再删除。
* 可以思考下数据压缩的问题

####4.任务监控和报警(紧急,耗时长):
监控方面主要着手几个方面:

* 对于各个节点的监控和报警
* 某一个group的节点全挂了之后,需要报警
* 对于某台机器的资源,譬如内存,cpu 等不足时报警
* 对于某台TaskTracker消费任务失败率达到80%(一个阀值)之后报警,并自动将他隔离。

* 对于任务执行指标的监控和报警
* 可以设置对某种任务消费异常的报警
* 对于任务队列中的任务消费延迟,堆积个数太多的时候进行告警
* 可以自定义收到特定的BizLog进行报警
* 报警形式可以是短信,邮件等

####5.LTS 的注册中心
可以参考diamond 和 RokectMQ的nameserver等。无非就是一个pub/sub
####6.zookeeper客户端的封装
主要是为了去除对zkClient和curator的依赖
####7.LTS nio框架的实现
主要是为了去除对netty和mina的依赖(群主正在开发中)

0 comments on commit df98b76

Please sign in to comment.