get rid of hard coded configurations
datamut committed Sep 23, 2016
1 parent 2d27b20 commit be80d41
Showing 12 changed files with 54 additions and 45 deletions.
article-crawler/crawler/kafka_consume_spider.py (26 changes: 7 additions & 19 deletions)

@@ -6,10 +6,6 @@
 import kafka
 import scrapy
 
-KAFKA_TOPIC_ID = 'KAFKA_TOPIC_ID'
-KAFKA_GROUP_ID = 'KAFKA_GROUP_ID'
-KAFKA_BOOTSTRAP_SERVERS = 'KAFKA_BOOTSTRAP_SERVERS'
-
 
 class KafkaConsumeSpider(scrapy.Spider):
     """This abstract spider integrates a Kafka consumer with scrapy.Spider.
@@ -33,22 +29,13 @@ def set_kafka(self, settings):
         Raise ValueError when topic, group, or bootstrap_servers is not
         specified.
         """
-        topic = settings.get(KAFKA_TOPIC_ID, None)
-        if not topic:
-            raise ValueError('{} setting is required'.format(KAFKA_TOPIC_ID))
-
-        group = settings.get(KAFKA_GROUP_ID, None)
-        if not group:
-            raise ValueError('{} setting is required'.format(KAFKA_GROUP_ID))
-
-        servers = settings.get(KAFKA_BOOTSTRAP_SERVERS, None)
-        if not servers:
-            raise ValueError(
-                '{} setting is required'.format(KAFKA_BOOTSTRAP_SERVERS))
-        bootstrap_servers = servers.split(',')
+        topic = settings.get('KAFKA_TOPIC_ID')
+        group = settings.get('KAFKA_GROUP_ID')
+        servers = settings.get('KAFKA_BOOTSTRAP_SERVERS')
+        kafka_servers = servers.split(',')
 
         self.consumer = kafka.KafkaConsumer(topic, group_id=group,
-                                            bootstrap_servers=bootstrap_servers)
+                                            bootstrap_servers=kafka_servers)
         self.crawler.signals.connect(self.spider_idle,
                                      scrapy.signals.spider_idle)
         self.crawler.signals.connect(self.item_scraped,
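
Note: the docstring above still promises a ValueError when a setting is missing, but this hunk removes the explicit checks (validation moved to asserts in settings.py), so a missing key would now surface as an AttributeError on servers.split(','). A minimal sketch, not part of the commit, of a guard that would preserve the documented contract:

    def _validate_kafka_settings(settings):
        """Hypothetical guard preserving the documented ValueError behaviour."""
        required = ('KAFKA_TOPIC_ID', 'KAFKA_GROUP_ID', 'KAFKA_BOOTSTRAP_SERVERS')
        missing = [key for key in required if not settings.get(key)]
        if missing:
            raise ValueError('{} setting(s) are required'.format(', '.join(missing)))
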
@@ -64,7 +51,8 @@ def process_request(self):
         for record in msg_list:
             url = record.value
             if url:
-                url = url.decode()[1:-1]  # TODO: remove " from kafka?
+                url = url.decode()[
+                    1:-1]  # TODO: remove " from kafka-conf?
                 requests = self.make_requests_from_url(url)
                 if requests:
                     self.crawler.engine.crawl(requests, spider=self)
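
Aside on the TODO above: the stripped quotes suggest the producer JSON-encodes plain strings (an assumption; the producer is not part of this diff). If so, json.loads would strip them more robustly than slicing off the first and last character:

    import json

    raw = record.value.decode()  # e.g. '"http://example.com/article"'
    try:
        url = json.loads(raw)    # yields http://example.com/article
    except ValueError:
        url = raw                # not JSON-quoted; keep the raw string
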
article-crawler/crawler/pipelines.py (1 change: 1 addition & 0 deletions)

@@ -6,6 +6,7 @@
 # See: http://doc.scrapy.org/en/latest/topics/item-pipeline.html
 
 import json
+from scrapy.exceptions import DropItem
 
 
 class ContentWriterPipeline(object):
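
The only change here is the new DropItem import, which implies the collapsed pipeline body rejects some items. Purely as a hypothetical illustration (the field name is assumed), a typical process_item using it looks like:

    def process_item(self, item, spider):
        if not item.get('content'):
            raise DropItem('missing content in {}'.format(item))
        return item
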
article-crawler/crawler/settings.py (20 changes: 16 additions & 4 deletions)

@@ -9,6 +9,9 @@
 # http://scrapy.readthedocs.org/en/latest/topics/downloader-middleware.html
 # http://scrapy.readthedocs.org/en/latest/topics/spider-middleware.html
 
+import os
+
+
 BOT_NAME = 'crawler'
 
 SPIDER_MODULES = ['crawler.spiders']
@@ -90,8 +93,17 @@
 #HTTPCACHE_IGNORE_HTTP_CODES = []
 #HTTPCACHE_STORAGE = 'scrapy.extensions.httpcache.FilesystemCacheStorage'
 
-KAFKA_TOPIC_ID = 'whatsnews_topic_index'
-KAFKA_GROUP_ID = 'whatsnews_group_index'
-KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
+KAFKA_TOPIC_ID = os.environ.get('KAFKA_TOPIC_ID', None)
+assert KAFKA_TOPIC_ID is not None, \
+    'Environment variable KAFKA_TOPIC_ID not found'
+
+KAFKA_GROUP_ID = os.environ.get('KAFKA_GROUP_ID', None)
+assert KAFKA_GROUP_ID is not None, \
+    'Environment variable KAFKA_GROUP_ID not found'
+
+KAFKA_BOOTSTRAP_SERVERS = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', None)
+assert KAFKA_BOOTSTRAP_SERVERS is not None, \
+    'Environment variable KAFKA_BOOTSTRAP_SERVERS not found'
 
-ARTICLE_OUT_FILE = '/data/project/whatsnews/article_out/crawl_articles.txt'
+ARTICLE_OUT_FILE = os.environ.get('OUT_FILE', None)
+assert ARTICLE_OUT_FILE is not None, 'Environment variable OUT_FILE not found'
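
The os.environ.get(...) plus assert pattern now appears nine times across the two settings.py files and mongodb_indexer's __main__ block. A hypothetical helper, not in the commit, would collapse each pair to a single line:

    import os

    def require_env(name):
        """Return the named environment variable, failing fast if unset."""
        value = os.environ.get(name)
        assert value is not None, 'Environment variable {} not found'.format(name)
        return value

    KAFKA_TOPIC_ID = require_env('KAFKA_TOPIC_ID')
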
index-builder/indexer/message_consumer.py (15 changes: 0 additions & 15 deletions)

This file was deleted.

index-builder/indexer/mongodb_indexer.py (31 changes: 25 additions & 6 deletions)

@@ -4,19 +4,21 @@
"""

import json
from kafka import KafkaConsumer
import os
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError

from indexer.message_consumer import ArticleConsumer


def run_indexer(hosts):
def run_indexer(mongodb_hosts, kafka_topic, kafka_group, kafka_servers):
"""MongoDB index builder.
Consume/parse messages from Kafka and save articles to mongodb.
"""

consumer = ArticleConsumer.new_kafka_consumer()
with MongoClient(host=hosts) as mongo:
consumer = KafkaConsumer(kafka_topic, group_id=kafka_group,
bootstrap_servers=kafka_servers.split(','))

with MongoClient(host=mongodb_hosts) as mongo:
for message in consumer:
# TODO: how to deal with codec? or ensure utf-8 is correct
content_str = message.value.decode('utf-8').strip()
@@ -31,4 +33,21 @@ def run_indexer(hosts):
 
 
 if __name__ == '__main__':
-    run_indexer('mongodb://localhost:27017')
+    _mongodb_hosts = os.environ.get('MONGODB_HOSTS')
+    assert _mongodb_hosts is not None, \
+        'Environment variable MONGODB_HOSTS not found'
+
+    _kafka_topic = os.environ.get('KAFKA_TOPIC')
+    assert _kafka_topic is not None, \
+        'Environment variable KAFKA_TOPIC not found'
+
+    _kafka_group = os.environ.get('KAFKA_GROUP')
+    assert _kafka_group is not None, \
+        'Environment variable KAFKA_GROUP not found'
+
+    _kafka_servers = os.environ.get('KAFKA_SERVERS')
+    assert _kafka_servers is not None, \
+        'Environment variable KAFKA_SERVERS not found'
+
+    run_indexer(mongodb_hosts=_mongodb_hosts, kafka_topic=_kafka_topic,
+                kafka_group=_kafka_group, kafka_servers=_kafka_servers)
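
(The original call passed kafka_server=_kafka_servers, which does not match the kafka_servers parameter in the new signature and would raise a TypeError; it is corrected above.) The collapsed middle of run_indexer is where the DuplicateKeyError import earns its keep. A hedged sketch of what that loop body plausibly does; the database and collection names are assumptions, since the diff hides them:

    for message in consumer:
        content_str = message.value.decode('utf-8').strip()
        try:
            article = json.loads(content_str)
            mongo.whatsnews.articles.insert_one(article)  # db/collection assumed
        except DuplicateKeyError:
            pass  # already indexed; safe to skip on Kafka redelivery
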
index-crawler/crawler/settings.py (6 changes: 5 additions & 1 deletion)

@@ -9,6 +9,9 @@
 # http://scrapy.readthedocs.org/en/latest/topics/downloader-middleware.html
 # http://scrapy.readthedocs.org/en/latest/topics/spider-middleware.html
 
+import os
+
+
 BOT_NAME = 'crawler'
 
 SPIDER_MODULES = ['crawler.spiders']
@@ -90,4 +93,5 @@
 #HTTPCACHE_IGNORE_HTTP_CODES = []
 #HTTPCACHE_STORAGE = 'scrapy.extensions.httpcache.FilesystemCacheStorage'
 
-INDEX_OUT_FILE = '/data/project/whatsnews/index_out/crawl_urls.txt'
+INDEX_OUT_FILE = os.environ.get('OUT_FILE', None)
+assert INDEX_OUT_FILE is not None, 'Environment variable OUT_FILE not found'
4 files renamed without changes.
