Skip to content

Commit

Permalink
自动去重(根据URL),暂不支持参数 close #I193U2
Browse files Browse the repository at this point in the history
  • Loading branch information
javamxd committed Apr 11, 2020
1 parent 54cdef9 commit 2c78a18
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 6 deletions.
3 changes: 2 additions & 1 deletion db/spiderflow.sql
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ CREATE TABLE `sp_task` (
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8mb4;

/* v0.4.0 新增*/
/* v0.4.0 新增 */
DROP TABLE IF EXISTS `sp_function`;
CREATE TABLE `sp_function` (
`id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
Expand All @@ -65,6 +65,7 @@ CREATE TABLE `sp_function` (
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;

/* v0.5.0 新增 */
DROP TABLE IF EXISTS `sp_flow_notice`;
CREATE TABLE `sp_flow_notice` (
`id` varchar(32) NOT NULL,
Expand Down
5 changes: 5 additions & 0 deletions spider-flow-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@
<groupId>org.spiderflow</groupId>
<artifactId>spider-flow-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.2-jre</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.spiderflow.core.executor.shape;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnel;
import com.google.common.hash.Funnels;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
Expand All @@ -9,21 +12,21 @@
import org.spiderflow.Grammerable;
import org.spiderflow.context.CookieContext;
import org.spiderflow.context.SpiderContext;
import org.spiderflow.core.executor.function.MD5FunctionExecutor;
import org.spiderflow.core.io.HttpRequest;
import org.spiderflow.core.io.HttpResponse;
import org.spiderflow.core.utils.ExpressionUtils;
import org.spiderflow.executor.ShapeExecutor;
import org.spiderflow.io.SpiderResponse;
import org.spiderflow.listener.SpiderListener;
import org.spiderflow.model.Grammer;
import org.spiderflow.model.SpiderNode;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.*;
import java.nio.charset.Charset;
import java.util.*;

/**
Expand All @@ -32,7 +35,7 @@
*
*/
@Component
public class RequestExecutor implements ShapeExecutor,Grammerable{
public class RequestExecutor implements ShapeExecutor,Grammerable, SpiderListener {

public static final String SLEEP = "sleep";

Expand Down Expand Up @@ -84,9 +87,19 @@ public class RequestExecutor implements ShapeExecutor,Grammerable{

public static final String COOKIE_AUTO_SET = "cookie-auto-set";

public static final String REPEAT_ENABLE = "repeat-enable";

public static final String BLOOM_FILTER_KEY = "_bloomfilter";

@Value("${spider.workspace}")
private String workspcace;

@Value("${spider.bloomfilter.capacity:5000000}")
private Integer capacity;

@Value("${spider.bloomfilter.error-rate:0.00001}")
private Double errorRate;

private static final Logger logger = LoggerFactory.getLogger(RequestExecutor.class);

@Override
Expand Down Expand Up @@ -127,6 +140,7 @@ public void execute(SpiderNode node, SpiderContext context, Map<String,Object> v
logger.error("设置延迟时间失败", t);
}
}
BloomFilter<String> bloomFilter = null;
//重试次数
int retryCount = NumberUtils.toInt(node.getStringJsonValue(RETRY_COUNT), 0) + 1;
//重试间隔时间,单位毫秒
Expand All @@ -142,6 +156,15 @@ public void execute(SpiderNode node, SpiderContext context, Map<String,Object> v
logger.error("设置请求url出错,异常信息", e);
ExceptionUtils.wrapAndThrow(e);
}
if("1".equalsIgnoreCase(node.getStringJsonValue(REPEAT_ENABLE,"0"))){
bloomFilter = createBloomFilter(context);
synchronized (bloomFilter){
if(bloomFilter.mightContain(MD5FunctionExecutor.string(url))){
logger.info("过滤重复URL:{}",url);
return;
}
}
}
context.pause(node.getNodeId(),"common",URL,url);
logger.info("设置请求url:{}", url);
request.url(url);
Expand Down Expand Up @@ -236,6 +259,11 @@ public void execute(SpiderNode node, SpiderContext context, Map<String,Object> v
HttpResponse response = request.execute();
successed = response.getStatusCode() == 200;
if(successed){
if(bloomFilter != null){
synchronized (bloomFilter){
bloomFilter.put(MD5FunctionExecutor.string(url));
}
}
String charset = node.getStringJsonValue(RESPONSE_CHARSET);
if(StringUtils.isNotBlank(charset)){
response.setCharset(charset);
Expand Down Expand Up @@ -407,4 +435,47 @@ public List<Grammer> grammers() {
grammers.add(grammer);
return grammers;
}

/**
 * Listener hook that receives the spider context; this class performs no work in it.
 *
 * @param context the spider context handed to the listener (unused here)
 */
@Override
public void beforeStart(SpiderContext context) {
// Intentionally empty: the Bloom filter is created on demand by createBloomFilter(...),
// so there is nothing to set up in this hook.
}

/**
 * Returns the URL de-duplication Bloom filter cached in the given context, creating it on first use.
 *
 * <p>On first use the filter is restored from {@code <workspace>/<flowId>/url.bf} when that file
 * exists; otherwise a new, empty filter is built from the configured capacity and error rate.
 * If the persisted file cannot be read (I/O failure or corrupt content), the error is logged and
 * a new, empty filter is used instead, so that callers never receive {@code null}. The resulting
 * filter is stored in the context under {@link #BLOOM_FILTER_KEY}.
 *
 * @param context the spider context that caches the filter and supplies the flow id
 * @return the context's Bloom filter, never {@code null}
 */
private BloomFilter<String> createBloomFilter(SpiderContext context){
    BloomFilter<String> filter = context.get(BLOOM_FILTER_KEY);
    if(filter != null){
        return filter;
    }
    Funnel<CharSequence> funnel = Funnels.stringFunnel(Charset.forName("UTF-8"));
    String fileName = context.getFlowId() + File.separator + "url.bf";
    File file = new File(workspcace,fileName);
    if(file.exists()){
        try(FileInputStream fis = new FileInputStream(file)){
            filter = BloomFilter.readFrom(fis,funnel);
        } catch (IOException e) {
            // Leave filter as null; the fallback below builds a fresh one.
            logger.error("读取布隆过滤器出错",e);
        }
    }
    if(filter == null){
        // No persisted filter, or it could not be loaded: start with an empty one rather than
        // returning null, which the caller would dereference in a synchronized block.
        filter = BloomFilter.create(funnel,capacity,errorRate);
    }
    context.put(BLOOM_FILTER_KEY,filter);
    return filter;
}

/**
 * Listener hook that saves the context's Bloom filter, if one was created, to
 * {@code <workspace>/<flowId>/url.bf}.
 *
 * <p>Missing parent directories are created first. An {@link IOException} while writing is
 * logged and not rethrown.
 *
 * @param context the spider context that may hold a filter under {@link #BLOOM_FILTER_KEY}
 */
@Override
public void afterEnd(SpiderContext context) {
    BloomFilter<String> bloomFilter = context.get(BLOOM_FILTER_KEY);
    if(bloomFilter == null){
        // De-duplication was never used with this context; nothing to persist.
        return;
    }
    String relativePath = context.getFlowId() + File.separator + "url.bf";
    File target = new File(workspcace,relativePath);
    File directory = target.getParentFile();
    if(!directory.exists()){
        directory.mkdirs();
    }
    try(FileOutputStream out = new FileOutputStream(target)){
        bloomFilter.writeTo(out);
        out.flush();
    }catch(IOException e){
        logger.error("保存布隆过滤器出错",e);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
<input type="checkbox" title="跟随重定向" value="follow-redirect" lay-skin="primary" {{d.data.object['follow-redirect'] == '0' ? '' : 'checked'}}/>
<input type="checkbox" title="TLS证书验证" value="tls-validate" lay-skin="primary" {{d.data.object['tls-validate'] == '0' ? '' : 'checked'}}/>
<input type="checkbox" title="自动管理Cookie" value="cookie-auto-set" lay-skin="primary" {{d.data.object['cookie-auto-set'] == '0' ? '' : 'checked'}}/>
<input type="checkbox" title="自动去重" value="repeat-enable" lay-skin="primary" {{d.data.object['repeat-enable'] == '1' ? 'checked' : ''}}/>
</div>
</div>
</div>
Expand Down

0 comments on commit 2c78a18

Please sign in to comment.