-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'master' of https://github.com/amurto/ms332_datapirates1
- Loading branch information
Showing
6 changed files
with
267 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
**/README.md | ||
**/Dockerfile | ||
**/docker-compose.yml | ||
**/.env |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
# This is a basic workflow to help you get started with Actions | ||
|
||
name: Deploy loadbalancer | ||
|
||
# Controls when the action will run. Triggers the workflow on push or pull request | ||
# events but only for the master branch | ||
on: | ||
push: | ||
branches: [ master ] | ||
|
||
|
||
# A workflow run is made up of one or more jobs that can run sequentially or in parallel | ||
jobs: | ||
# This workflow contains a single job called "build" | ||
build: | ||
# The type of runner that the job will run on | ||
runs-on: ubuntu-latest | ||
|
||
# Steps represent a sequence of tasks that will be executed as part of the job | ||
steps: | ||
# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it | ||
- uses: actions/checkout@v2 | ||
|
||
- name: Run a one-line script | ||
run: echo Hello, world! | ||
|
||
|
||
- name: Deploy package to digitalocean | ||
uses: appleboy/ssh-action@master | ||
with: | ||
host: ${{ secrets.BALANCER_HOST }} | ||
username: ${{ secrets.BALANCER_USERNAME }} | ||
password: ${{ secrets.BALANCER_PASSWORD }} | ||
port: ${{ secrets.BALANCER_PORT }} | ||
script: | | ||
cd /app/gearstalk_load_balancer | ||
git pull | ||
docker-compose up -d --build |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
/env |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
FROM python:3.7-slim | ||
|
||
WORKDIR /app | ||
RUN apt-get update && apt-get -y dist-upgrade | ||
RUN apt install -y netcat | ||
COPY requirements.txt /app | ||
RUN pip install -r /app/requirements.txt | ||
|
||
COPY mediator.py /app | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
version: '3' | ||
services: | ||
main: | ||
container_name: main | ||
networks: | ||
- loadbalancer | ||
env_file: | ||
- .env | ||
build : . | ||
restart: always | ||
command: > | ||
/bin/sh -c " | ||
echo Waiting for rabbitmq service start...; | ||
while ! nc -z rabbit 5672; | ||
do | ||
sleep 1; | ||
done; | ||
echo Connected!; | ||
python mediator.py | ||
" | ||
rabbit: | ||
restart: always | ||
networks: | ||
- loadbalancer | ||
image: "bitnami/rabbitmq:3.7.17" | ||
container_name: rabbit | ||
ports: | ||
- "15672:15672" | ||
- "5672:5672" | ||
env_file: | ||
- .env | ||
|
||
networks: | ||
loadbalancer: {} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
import pika | ||
import json | ||
import os | ||
import collections | ||
import ast | ||
import requests | ||
import pymongo | ||
from bson import ObjectId | ||
from datetime import datetime,timedelta | ||
from dotenv import load_dotenv | ||
|
||
load_dotenv() | ||
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST") | ||
RABBITMQ_USERNAME = os.getenv("RABBITMQ_USERNAME") | ||
RABBITMQ_PASSWORD = os.getenv("RABBITMQ_PASSWORD") | ||
CONNECTION_STRING = os.getenv("MONGODB_STRING_CLOUD") | ||
MAIN_SERVER = os.getenv("MAIN_SERVER") | ||
|
||
|
||
client = pymongo.MongoClient(CONNECTION_STRING) | ||
db = client.get_database('gearstalk') | ||
|
||
|
||
features_pack = {} | ||
frame_rate = 1 | ||
|
||
|
||
|
||
'''------------------------------------------------ | ||
Saving into db | ||
--------------------------------------------------''' | ||
|
||
def save_frame(video_id,frame_output,timestamp,frame_sec): | ||
if video_id == None: | ||
status = False | ||
message = "No video_Id in param." | ||
return status,message | ||
else: | ||
features = db.features.find_one({ "video_id": video_id,"timestamp": timestamp}) | ||
# print(features) | ||
if features == None: | ||
frame_details = [{ | ||
"frame_sec" : frame_sec, | ||
"persons" : json.dumps(frame_output) | ||
}] | ||
db.features.insert_one({ | ||
"video_id": video_id, | ||
"timestamp": timestamp, | ||
"metadata" : frame_details | ||
}) | ||
status = True | ||
message = "Frame output successfully added to the db!!" | ||
else: | ||
frame_details = { | ||
"frame_sec" : frame_sec, | ||
"persons" : json.dumps(frame_output) | ||
} | ||
newvalues = { "$push": {"metadata" : frame_details }} | ||
result = db.features.find_one_and_update({ "_id": ObjectId(features['_id'])}, newvalues ) | ||
status = True | ||
message = "Frame output successfully added to the db!!" | ||
|
||
return status,message | ||
|
||
|
||
|
||
|
||
'''------------------------------------------------ | ||
Finding unique persons from the video | ||
--------------------------------------------------''' | ||
#todo | ||
#find the unique person in video (traverse sequentially) | ||
##find the similarity between 2 frames based on labels | ||
##check the similarity betwen them using box sizes | ||
#Save into db twice at 2 different locations (Whole video_output and unique_person) | ||
def UniquePersonSearch(video_id, object_data,timestamp): | ||
|
||
#Saving all the frames into the db | ||
# db.features.insert_many(object_data) | ||
|
||
#converting to 3d-Array | ||
array3d=[] | ||
array3d = [collections.Counter([ str(feature['labels']+feature['colors']) for feature in data['persons'] if feature is not []]) for data in object_data] | ||
# print(array3d) | ||
unique_person = [] | ||
|
||
#Finding the Unique ones | ||
for i in range(len(array3d)-1): | ||
person = array3d[i]-array3d[i+1] | ||
# print(person) | ||
if person: | ||
for k in person.keys() : | ||
unpack = ast.literal_eval(k) | ||
for _ in range(person[k]): | ||
new_timestamp = timestamp + timedelta(seconds=i*frame_rate) | ||
unique_person.append({'video_id': video_id,"frame_sec":i, "timestamp": new_timestamp, "date": str(new_timestamp.date()), "time": new_timestamp.strftime("%X") , 'labels': unpack[:(len(unpack)//2)], 'colors': unpack[(len(unpack)//2):]}) | ||
new_timestamp = timestamp + timedelta(seconds=(i+1)*frame_rate) | ||
for k in array3d[len(array3d)-1].keys(): | ||
unpack = ast.literal_eval(k) | ||
for _ in range(array3d[len(array3d)-1][k]): | ||
unique_person.append({'video_id': video_id,"frame_sec":i+1, "timestamp": new_timestamp, "date": str(new_timestamp.date()), "time": new_timestamp.strftime("%X") , 'labels': unpack[:(len(unpack)//2)], 'colors': unpack[(len(unpack)//2):]}) | ||
|
||
#Send to the main Server(gearstalk_baxkend1) | ||
print(unique_person) | ||
r = requests.post(MAIN_SERVER+"/process/FindUnique", data=json.dumps({"video_id": video_id, "unique_person":unique_person}) ) | ||
|
||
return "Your video is processed" | ||
|
||
|
||
|
||
#supporting functions | ||
def FindUnique(data): | ||
data = json.loads(data) | ||
video_id = data['video_id'] | ||
frame_sec = data['frame_sec'] | ||
timestamp = json.loads(data['timestamp']) | ||
total_frames = int(data['total_frames']) | ||
frame_details = json.loads(data['frame_details']) | ||
message = "Video Processing not over!!" | ||
new_timestamp = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S") | ||
|
||
print(frame_sec) | ||
save_frame(video_id,frame_details,timestamp,frame_sec) | ||
|
||
if video_id in features_pack.keys(): | ||
features_pack[video_id][int(frame_sec//frame_rate)] = {"frame_sec":frame_sec,"persons":frame_details} | ||
else: | ||
arr = [None]*total_frames | ||
features_pack[video_id] = arr | ||
features_pack[video_id][int(frame_sec//frame_rate)] = {"frame_sec":frame_sec,"persons":frame_details} | ||
|
||
if None not in features_pack[video_id]: | ||
video_output = features_pack.pop(video_id) | ||
print(video_output) | ||
# db.features.insert_one({ | ||
# "video_id": video_id, | ||
# "timestamp": timestamp, | ||
# "metadata" : video_output | ||
# }) | ||
message = UniquePersonSearch(video_id,video_output,new_timestamp) | ||
|
||
|
||
|
||
'''----------------------------------------- | ||
Consuming packets from rabbitmq | ||
and adding it into a subprocess | ||
------------------------------------------''' | ||
|
||
def rabbitmq_consumer(): | ||
credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD) | ||
|
||
connection = pika.BlockingConnection( | ||
pika.ConnectionParameters(RABBITMQ_HOST, 5672, '/', credentials)) #load_balancer url/ip in (host) | ||
channel = connection.channel() | ||
|
||
channel.queue_declare(queue='frame_output') | ||
|
||
def callback(ch, method, properties, body): | ||
print(" [x] Received ") | ||
FindUnique(body) | ||
|
||
|
||
channel.basic_consume( | ||
queue='frame_output', on_message_callback=callback, auto_ack=True) | ||
|
||
print(' [*] Waiting for messages. To exit press CTRL+C') | ||
channel.start_consuming() | ||
|
||
|
||
|
||
if __name__ == '__main__': | ||
# app.run(host="0.0.0.0", debug=True, use_reloader=True, threaded=True) | ||
try: | ||
rabbitmq_consumer() | ||
except KeyboardInterrupt: | ||
quit = True |