Skip to content

Commit

Permalink
Seperate decoding for nsq msgs at different forwarders(Quick fix)
Browse files Browse the repository at this point in the history
  • Loading branch information
supriyopaul committed Mar 19, 2018
1 parent a66eea2 commit 56a1581
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 5 deletions.
1 change: 0 additions & 1 deletion logagg/forwarder.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ def _send_msgs_to_target(self, target, msgs):

def _write_messages(self, msgs):
fn = self._send_msgs_to_target
msgs = [json.loads(m.body) for m in msgs]

jobs = []
for t in self.targets:
Expand Down
8 changes: 4 additions & 4 deletions logagg/forwarders.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,10 @@ def _ensure_connection(self):
def _parse_msg_for_mongodb(self, msgs):
msgs_list = []
#TODO: We need to do this by using iteration object.
msgs = [json.loads(m.body.decode(encoding='utf-8',errors='strict')) for m in msgs]
for msg in msgs:
msg_body = json.loads(msg.body.decode(encoding='utf-8',
errors='strict'))
msg_body['_id'] = msg_body.pop('id')
msgs_list.append(msg_body)
msg['_id'] = msg.pop('id')
msgs_list.append(msg)
return msgs_list

def _insert_1by1(self, records):
Expand Down Expand Up @@ -141,6 +140,7 @@ def _tag_and_field_maker(self, event):

def _parse_msg_for_influxdb(self, msgs):
series = []
msgs = [json.loads(m.body.decode(encoding='utf-8',errors='strict')) for m in msgs]

for msg in msgs:
if msg.get('error') == True:
Expand Down

0 comments on commit 56a1581

Please sign in to comment.