Commit b504a5b

Start working on livestreaming support. Currently using livestreamer, but in the future switch to streamlink. Add testing.

- Add validation before a process is launched & enable garbage collection (see the sketch following these notes).

Eventually:

- Export models as a zip file.

- Implement training a detection model using YOLO.

- Implement location net and clustering using PQKMeans.

- Implement fine-tuning / training of face recognition models.

- Pre-condition checks and failed-to-launch error messages for processes.
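A minimal sketch of what the validation and garbage-collection items could look like, assuming psutil (added to requirements.txt below) and Python 3 for shutil.which. validate_launch is a hypothetical helper, not code from this commit; kill mirrors the kill(self.capture.pid) call in the livestreaming diff:

    import shutil
    import psutil

    def validate_launch(command):
        # Pre-condition check: fail early with a clear error message instead
        # of letting the capture process die silently after launch.
        # (validate_launch is a hypothetical helper, not part of this commit.)
        if shutil.which(command[0]) is None:
            raise ValueError("Cannot launch {}: executable not found".format(command[0]))

    def kill(pid):
        # Garbage-collect the capture process and any children it spawned,
        # using psutil; mirrors the kill(self.capture.pid) call below.
        parent = psutil.Process(pid)
        for child in parent.children(recursive=True):
            child.kill()
        parent.kill()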
akshay bhat committed Feb 5, 2018
1 parent 16b2a3d commit b504a5b
Showing 3 changed files with 21 additions and 7 deletions.
requirements.txt (4 changes: 3 additions & 1 deletion)
@@ -31,4 +31,6 @@ google-cloud-storage
 urllib3
 flask
 redis
-hiredis
+hiredis
+livestreamer
+psutil
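For context, here is one plausible way the two new dependencies fit together: livestreamer pulls the stream, and ffmpeg's segment muxer cuts it into numbered MP4 files matching the '{}{}.mp4'.format(self.segments_dir, ...) naming used below. This is a sketch under those assumptions, not the repository's capture code; start_capture and the ten-second segment length are illustrative:

    import subprocess

    def start_capture(url, segments_dir):
        # livestreamer writes the muxed stream to stdout (-O); ffmpeg's
        # segment muxer then cuts it into numbered MP4 segments without
        # re-encoding. (Hypothetical wiring, not this repository's code.)
        stream = subprocess.Popen(['livestreamer', url, 'best', '-O'],
                                  stdout=subprocess.PIPE)
        segmenter = subprocess.Popen(['ffmpeg', '-i', 'pipe:0', '-c', 'copy',
                                      '-f', 'segment', '-segment_time', '10',
                                      '{}%d.mp4'.format(segments_dir)],
                                     stdin=stream.stdout)
        return stream, segmenter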
server/dvaapp/operations/livestreaming.py (19 changes: 15 additions & 4 deletions)
@@ -3,6 +3,7 @@
 from ..models import Segment
 from ..fs import upload_file_to_remote
 from django.conf import settings
+from ..processing import process_next

 try:
     import psutil
@@ -36,6 +37,8 @@ def __init__(self,dv,event,wait_time=3,max_time=31536000,max_wait=120):
         self.segment_frames_dict = {}
         self.start_index = 0
         self.csv_format = None
+        self.segments_batch_size = event.arguments.get('segments_batch_size',max_time)
+        self.segments_batch = set()
         self.last_segment_time = time.time()

     def detect_csv_segment_format(self):
@@ -95,8 +98,9 @@ def upload(self,final=False):
             else:
                 segment_file_name = '{}{}.mp4'.format(self.segments_dir, self.last_processed_segment_index + 1)
                 segment_index = self.last_processed_segment_index + 1
-                self.process_segment(segment_index, segment_file_name)
-                segments_processed = True
+                if os.path.isfile(segment_file_name):
+                    self.process_segment(segment_index, segment_file_name)
+                    segments_processed = True
         return segments_processed

     def process_segment(self, segment_index, segment_file_name):
@@ -129,8 +133,12 @@ def process_segment(self, segment_index, segment_file_name):
                 upload_file_to_remote(ds.framelist_path(""))
         self.dv.segments = self.last_processed_segment_index + 1
         self.dv.save()
+        self.segments_batch.add(segment_index)
         self.last_segment_time = time.time()
         self.processed_segments.add(segment_file_name)
+        if (self.last_processed_segment_index % self.segments_batch_size == 0):
+            process_next(self.event.pk,map_filters=[{'segment_index__in':list(self.segments_batch)}])
+            self.segments_batch = set()

     def poll(self):
         while (time.time() - self.start_time < self.max_time) and (self.capture.poll() is None):
@@ -145,7 +153,10 @@ def poll(self):
                 break
         logging.info("Killing capture process, no new segment found in last {} seconds".format(self.max_wait))
         kill(self.capture.pid)
-        self.upload(final=True)
+        try:
+            self.upload(final=True)
+        except:
+            pass

     def finalize(self):
-        pass
+        process_next(self.event.pk, map_filters=[{'segment_index__in': list(self.segments_batch)}])
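Distilled from the hunks above, the batching pattern works like this: processed segment indices accumulate in segments_batch, the modulo check periodically dispatches the batch to process_next with a segment_index__in map filter, and finalize flushes whatever remains. A standalone sketch; SegmentBatcher and batch_size are illustrative names, not commit code:

    from dvaapp.processing import process_next  # the module itself uses the relative '..processing' import

    class SegmentBatcher(object):
        # Hypothetical distillation of the dispatch pattern above.
        def __init__(self, event_pk, batch_size):
            self.event_pk = event_pk
            self.batch_size = batch_size
            self.segments_batch = set()

        def add(self, segment_index):
            # Accumulate the segment; dispatch a batch every batch_size segments.
            self.segments_batch.add(segment_index)
            if segment_index % self.batch_size == 0:
                self.flush()

        def flush(self):
            # Launch downstream tasks that map over only the batched segments.
            if self.segments_batch:
                process_next(self.event_pk,
                             map_filters=[{'segment_index__in': list(self.segments_batch)}])
                self.segments_batch = set()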
server/dvaapp/processing.py (5 changes: 3 additions & 2 deletions)
@@ -248,7 +248,7 @@ def launch_tasks(k, dt, inject_filters, map_filters = None, launch_type = ""):
     return tids


-def process_next(task_id,inject_filters=None,custom_next_tasks=None,sync=True,launch_next=True):
+def process_next(task_id,inject_filters=None,custom_next_tasks=None,sync=True,launch_next=True,map_filters=None):
     if custom_next_tasks is None:
         custom_next_tasks = []
     dt = TEvent.objects.get(pk=task_id)
@@ -263,7 +263,8 @@ def process_next(task_id,inject_filters=None,custom_next_tasks=None,sync=True,launch_next=True,map_filters=None):
         else:
             launched += launch_tasks(k,dt,inject_filters,None,'sync')
     for k in next_tasks+custom_next_tasks:
-        map_filters = get_map_filters(k,dt.video)
+        if map_filters is None:
+            map_filters = get_map_filters(k,dt.video)
         launched += launch_tasks(k, dt, inject_filters,map_filters,'map')
     for reduce_task in dt.arguments.get('reduce',[]):
         next_task = TEvent.objects.create(video=dt.video, operation="perform_reduce",
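The net effect of the processing.py change: map_filters is now an optional override. When a caller such as the livestream batcher passes explicit filters, every launched next-task maps over only those segments; when it is omitted, the previous behavior of deriving filters per task via get_map_filters is unchanged. An illustration with made-up IDs:

    # Default: per-task filters come from get_map_filters(k, dt.video), as before.
    process_next(task_id)

    # Livestreaming override: downstream tasks map over only these segments.
    process_next(task_id, map_filters=[{'segment_index__in': [5, 6, 7]}])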
