1 '''
2 Created on Oct 21, 2011
3
4 @author: bolme
5 '''
6
7 import time
8 from collections import defaultdict
9 import cProfile
10 import traceback
11 import shelve
12
13
16 return "<MissingData>"
17
20 self.default = default
21
23 tmp = str(self.default)
24 tmp = " ".join(tmp.split())
25 if len(tmp) > 40:
26 tmp = tmp[:37] + "..."
27 return "<DefaultData:%s>"%(tmp,)
28
29 EMPTY_DATA = EmptyData()
30
31
32
33
35 '''
    This provides an interface and support functions for a video processing
    task.  Typically a subclass will override the constructor, which will
    be used as a task factory and will create the task and specify the
    arguments.
40 '''
41
42
44 '''
45 @param frame_id: the frame_id associated with this task.
46 @param args: specification of the data that is required to execute the task.
47 '''
48 self.frame_id = frame_id
49 self.args = args
50
51 self.task_id = None
52 self.label = self.__class__.__name__
53 if not hasattr(self,'subgraph'):
54 self.subgraph = None
55 if not hasattr(self,'color'):
56 self.color = None
57
58 self._arg_map = {}
59 self._added_args = 0
60 self._default_args = 0
61 for i in range(len(args)):
62 each = args[i]
63 dtype = each[0]
64 fid = each[1]
65 key = (dtype,fid)
66
67
68 if len(each) == 2:
69 self._arg_map[key] = EMPTY_DATA
70 elif len(each) == 3:
71 self._arg_map[key] = DefaultData(each[2])
72 self._default_args += 1
73 else:
74 raise ValueError("Argument should have 2 or 3 values: %s"%each)
75
76 self.collected_args = [False for each in self.args]
77 self.processed_args = [each for each in self.args]
78 self.distributable = False
79 self.is_ready = False
80
81
83 '''
84 Check to see if the data item is needed for this task. If it is then keep a reference.
85 '''
86
87 key = (data_item.getType(),data_item.getFrameId())
88
89
90 if self._arg_map.has_key(key):
91 curr_val = self._arg_map[key]
92
93
94 if curr_val == EMPTY_DATA:
95 self._arg_map[key] = data_item.getData()
96 self._added_args += 1
97 return True
98
99
100 elif isinstance(curr_val,DefaultData):
101 self._arg_map[key] = data_item.getData()
102 self._added_args += 1
103 self._default_args -= 1
104 assert self._default_args >= 0
105 return True
106 return False
107
108
110 '''
111 Returns True if this task is ready to run.
112 '''
113 return self._added_args == len(self._arg_map)
114
115
117 '''
118 Returns True if this task could run with the default arguments.
119 '''
120 return self._added_args + self._default_args == len(self._arg_map)
121
122
124
125 args = []
126 for i in range(len(self.args)):
127 each = self.args[i]
128 key = (each[0],each[1])
129 if isinstance(self._arg_map[key],DefaultData):
130 args.append(self._arg_map[key].default)
131 else:
132 args.append(self._arg_map[key])
133
134 return self.execute(*args)
135
136
138 '''
139 @returns: the frame_id associated with this task.
140 '''
141 return self.frame_id
142
144 '''
145 @returns: the list of required data.
146 '''
147 return self.args
148
def execute(self, *args, **kwargs):
    '''
    Abstract hook that performs the actual work of a task; every subclass
    is expected to override it.

    One positional argument is supplied for each entry in the task's list
    of required arguments.  An implementation should return a list of the
    new data items it produced, or an empty list when it produced none.

    @raise NotImplementedError: always, in this base implementation.
    '''
    message = "Abstract Method"
    raise NotImplementedError(message)
157
159 print "VideoTask {%s:%d}"%(self.__class__.__name__,self.getFrameId())
160 for key in self._arg_map.keys():
161 dtype,frame_id = key
162
163 if self._arg_map[key] is EMPTY_DATA or isinstance(self._arg_map[key],DefaultData):
164 print " Argument <%s,%d> -> %s"%(dtype,frame_id,str(self._arg_map[key]))
165
166 else:
167 tmp = str(self._arg_map[key])
168 tmp = " ".join(tmp.split())
169 if len(tmp) > 40:
170 tmp = tmp[:37] + "..."
171 print " Argument <%s,%d> -> %s"%(dtype,frame_id,tmp)
172
173
175 '''
176 This class keeps track of data items and when they are used.
177 '''
179 self._data_type = data_tuple[0]
180 self._frame_id = data_tuple[1]
181 self._data = data_tuple[2]
182 self._touched = 0
183
185 ''' Get the item type. '''
186 return self._data_type
187
189 ''' Get the frame id. '''
190 return self._frame_id
191
193 ''' Get the actual data. '''
194 return self._data
195
197 ''' Get the key. '''
198 return (self._data_type,self._frame_id)
199
201 ''' Count the number of times this data was touched. '''
202 self._touched += 1
203
205 ''' Return the number of times the data was touched. '''
206 return self._touched
207
209 return "_VideoDataItem((%s,%s,%s)"%(self._data_type,self._frame_id,self._data)
210
211
212
214 '''
215 Each task_queue item should have three items (task_id,frame_id,command/task).
216 the command "quit" is used to stop the process.
217
218 The vtmProcessor will return (task_id, frame_id, results). If there is an exception
219 then the result will be replaced by the exception and a stack trace will be printed.
220 '''
221
222 while True:
223 item = task_queue.get()
224 try:
225 task_id,frame_id,task = item
226
227 result = task.run()
228
229 results_queue.put((task_id,frame_id,result))
230
231 except Exception, error:
232 traceback.print_exc()
233 results_queue.put((task_id,frame_id,error))
234
235
236
237
238
239
241 '''
    The framework provided by this class allows complex video processing
    systems to be constructed from simple tasks.  Often video processing
    loops can be complicated because data needs to persist across many frames
    and many operations or tasks need to be completed to solve a video analysis
    problem.  This class allows for many small and simple tasks to be managed
    in a way that can produce a complex and powerful system. #
248
    Tasks request only the data they need, which keeps the complexity of tasks
    as simple as possible.  This also reduces the coupling between tasks and
    eliminates complex video processing loops.  The video task manager handles
    much of the complexity of the video processing system, like data buffering,
    and ensures that each task gets its required data. #
254
    This class manages tasks that are run on video frames.  The video task
    manager maintains a list of data objects and task objects.  Each task is
    a listener for data objects.  When the data objects required to execute
    a task are available, the task's execute method will be called and the
    required data items will be passed as arguments. #
260
261 New frames are added using the addFrame method. When a frame is added
262 it creates a data item that includes a frame_id, a data type of "FRAME",
263 and a pv.Image that contains the frame data. Tasks can register to
264 receive that frame data or any data products of other tasks and when
265 that data becomes available the task will be executed.
266 '''
267
268 - def __init__(self,debug_level=0, buffer_size=10, show = False):
269 '''
270 Create a task manager.
271
272 @param debug_level: 0=quiet, 1=errors, 2=warnings, 3=info, 4=verbose
273 @type debug_level: int
274 @param buffer_size: the size of the frame and data buffer.
275 @type buffer_size: int
276 '''
277 self.debug_level = debug_level
278
279
280 self.frame_id = 0
281 self.task_list = []
282 self.task_factories = []
283 self.buffer_size = buffer_size
284
285 self.frame_list = []
286 self.show = show
287
288
289 self.flow = defaultdict(set)
290 self.task_set = set()
291 self.data_set = set((('FRAME',None),('LAST_FRAME',None),))
292 self.task_data = defaultdict(dict)
293 self.task_id = 0
294
295 self.lastFrameCreated = 0
296
297 self.recording_shelf = None
298 self.playback_shelf = None
299 self.recording_filter = None
300 self.task_filter = None
301 self.playback_filter = None
302
303 if self.debug_level >= 3:
304 print "TaskManager[INFO]: Initialized"
305
306
308 '''
        This function adds a task factory function to the video task manager.
        The function is called once for every frame processed by the
        VideoTaskManager.  This function should take one argument which
        is the frame_id of that frame.  The task factory should return an
        instance of the VideoTask class that will perform processing on this
        frame.  There are three options for implementing a task factory. #
         - A class object for a VideoTask which has a constructor that takes
           a frame_id as an argument.  When called, the constructor for that
           class will create a task.
318 - A function that takes a frame id argument. The function can
319 create and return a task.
320 - Any other object that implements the __call__ method which
321 returns a task instance.
322
        Any additional arguments or keyword arguments passed to this
        function will be passed after the frame_id argument
        to the task factory. #

        @param task_factory: a function or callable object that returns a task.
328 @type task_factory: callable
329 @param profile: Keyword argument. If true, profile data will be
330 generated for each call to this task.
331 @type profile: True | False
332 '''
333 self.task_id += 1
334 profile = False
335 if kwargs.has_key('profile'):
336 profile = kwargs['profile']
337 del kwargs['profile']
338 self.task_factories.append((task_factory,args,kwargs,profile,self.task_id))
339
340
342 '''
343 Adds a new frame to the task manager and then start processing.
344
345 @param frame: the next frame of video.
346 @type frame: pv.Image
347 '''
348
349 start = time.time()
350
351 frame_data = _VideoDataItem(("FRAME",self.frame_id,frame))
352 self._createTasksForFrame(self.frame_id)
353 self.addDataItem(frame_data)
354 last_data = _VideoDataItem(("LAST_FRAME",self.frame_id-1,False))
355 self.addDataItem(last_data)
356 self.frame_list.append(frame_data)
357
358
359 if self.playback_shelf != None and self.playback_shelf.has_key(str(self.frame_id)):
360 data_items = self.playback_shelf[str(self.frame_id)]
361 for each in data_items:
362 if self.playback_filter==None or each.getType() in self.playback_filter:
363 self.addDataItem(each)
364 self.data_set.add((each.getKey()[0],None))
365 self.flow[('Playback',each.getType())].add(0)
366
367
368 self._runTasks()
369
370 if self.recording_shelf != None:
371 self.recording_shelf.sync()
372
373
374
375
376 stop = time.time()
377
378
379 self.frame_id += 1
380
381 self.showFrames(ilog=ilog)
382
383 if self.debug_level >= 3:
384 print "TaskManager[INFO]: Frame Processing Time=%0.3fms"%(1000*(stop-start),)
385
395
396
397
398
400 '''
401 Process any new data items and associate them with tasks.
402 '''
403 if self.recording_shelf != None:
404 frame_id = str(self.frame_id)
405 if not self.recording_shelf.has_key(frame_id):
406 self.recording_shelf[frame_id] = []
407 if self.recording_filter == None or data_item.getType() in self.recording_filter:
408 self.recording_shelf[frame_id].append(data_item)
409
410 for task in self.task_list:
411 was_added = task.addData(data_item)
412 if was_added:
413
414 self.flow[(data_item.getKey()[0],task.task_id)].add(data_item.getKey()[1]-task.getFrameId())
415
416
418 '''
419 This calls the task factories to create tasks for the current frame.
420 '''
421 while self.lastFrameCreated < frame_id + self.buffer_size:
422 start = time.time()
423 count = 0
424 for factory,args,kwargs,profile,task_id in self.task_factories:
425 task = factory(self.lastFrameCreated,*args,**kwargs)
426 task.task_id=task_id
427 self.task_data[task.task_id]['class_name'] = task.__class__.__name__
428
429 task.profile=profile
430 count += 1
431
432 if self.task_filter == None or task.__class__.__name__ in self.task_filter:
433 self.task_list += [task]
434 stop = time.time() - start
435 if self.debug_level >= 3:
436 print "TaskManager[INFO]: Created %d new tasks for frame %s. Total Tasks=%d. Time=%0.2fms"%(count,self.lastFrameCreated,len(self.task_list),stop*1000)
437 self.lastFrameCreated += 1
438
440 '''
441 Run any tasks that have all data available.
442 '''
443 if self.debug_level >= 3: print "TaskManager[INFO]: Running Tasks..."
444 while True:
445 start_count = len(self.task_list)
446 remaining_tasks = []
447 for task in self.task_list:
448 if self._evaluateTask(task,flush=flush):
449 remaining_tasks.append(task)
450 self.task_list = remaining_tasks
451 if start_count == len(self.task_list):
452 break
453
454
456 '''
457 Run all tasks that can be run and then finish up. The LAST_FRAME data
458 item will be set to true for the last frame inserted.
459 '''
460 last_data = _VideoDataItem(("LAST_FRAME",self.frame_id-1,True))
461 self.addDataItem(last_data)
462
463 self._runTasks(flush=True)
464
466 '''
467 Attempts to run a task. This is intended to be run within a filter operation.
468
469 @returns: false if task should be deleted and true otherwise.
470 '''
471 self.task_set.add(task.task_id)
472
473
474 should_run = False
475
476 if task.ready():
477 should_run = True
478 elif (flush or self.frame_id - task.getFrameId() > self.buffer_size) and task.couldRun():
479 should_run = True
480 elif (flush or self.frame_id - task.getFrameId() > self.buffer_size) and not task.couldRun():
481 if self.debug_level >= 2:
482 print "TaskManager[WARNING]: Task %s for frame %d was not executed."%(task,task.getFrameId())
483 task.printInfo()
484
485
486 return False
487
488
489 if not should_run:
490 return True
491
492
493 start = time.time()
494
495
496 if task.profile:
497 prof = cProfile.Profile()
498 prof.enable()
499
500
501 result = task.run()
502
503
504 if task.profile:
505 prof.disable()
506 print
507 print "Profiled task:",task.__class__.__name__
508 prof.print_stats('time')
509 print
510
511
512 try:
513 len(result)
514 except:
515 raise Exception("Task did not return a valid list of data.\n Task: %s\n Data:%s"%(task,result))
516
517
518 for each in result:
519 self.flow[(task.task_id,each[0])].add(0)
520 self.data_set.add((each[0],task.subgraph))
521
522
523 for i in range(len(task.collected_args)):
524 if task.collected_args[i]:
525 each = task.processed_args[i]
526 self.flow[(each.getKey()[0],task.task_id)].add(each.getKey()[1]-task.getFrameId())
527 self.data_set.add((each.getKey()[0],task.subgraph))
528
529
530
531 for data_item in result:
532 if len(data_item) != 3:
533 raise Exception("Task returned a data item that does not have 3 elements.\n Task: %s\n Data: %s"%(task,data_item))
534 data_item = _VideoDataItem(data_item)
535 self.addDataItem(data_item)
536 stop = time.time() - start
537 if self.debug_level >= 3:
538 print "TaskManager[INFO]: Evaluate task %s for frame %d. Time=%0.2fms"%(task,task.getFrameId(),stop*1000)
539
540
541 if not self.task_data[task.task_id].has_key('time_sum'):
542 self.task_data[task.task_id]['time_sum'] = 0.0
543 self.task_data[task.task_id]['call_count'] = 0
544 self.task_data[task.task_id]['time_sum'] += stop
545 self.task_data[task.task_id]['call_count'] += 1
546 self.task_data[task.task_id]['color'] = task.color
547 self.task_data[task.task_id]['subgraph'] = task.subgraph
548
549
550 return False
551
552
553 - def _remainingTasksForFrame(self,frame_id):
554 '''
555 @returns: the number of tasks that need to be run for this frame.
556 '''
557 count = 0
558 for task in self.task_list:
559 if task.getFrameId() == frame_id:
560 count += 1
561 return count
562
563
565 '''
566 Show any frames with no remaining tasks.
567 '''
568 while len(self.frame_list) > 0:
569 frame_data = self.frame_list[0]
570 frame_id = frame_data.getFrameId()
571 frame = frame_data.getData()
572 task_count = self._remainingTasksForFrame(frame_id)
573
574 if task_count == 0:
575 if self.show:
576 frame.show(delay=1)
577 if ilog != None:
578 ilog(frame,ext='jpg')
579 del self.frame_list[0]
580 else:
581 break
582
584 '''
585 Set up an output file for recording.
586 '''
587 assert self.playback_shelf == None
588 self.recording_shelf = shelve.open(filename, flag='n', protocol=2, writeback=True)
589
591 '''
592 Set up an input file for playback.
593 '''
594 assert self.recording_shelf == None
595 self.playback_shelf = shelve.open(filename, flag='r', protocol=2, writeback=False)
596
598 '''
599 Only recorded data_types in the list.
600 '''
601 self.recording_filter = set(data_types)
602
604 '''
605 Only generate tasks in the list.
606 '''
607 self.task_filter = set(task_types)
608
610 '''
611 Only playback data_types in the list.
612 '''
613 self.playback_filter = set(data_types)
614
616 '''
617 This uses runtime analysis to create a dataflow graph for this VTM.
618 '''
619 import pydot
620 import pyvision as pv
621 import PIL.Image
622 from cStringIO import StringIO
623
def formatNum(n):
    '''
    Render a frame offset as text.

    Zero is written as a bare '0'; any other value carries an explicit
    sign, for example -1 or +1.
    '''
    return '0' if n == 0 else "%+d" % n
632
def record_strings(my_list):
    '''
    Placeholder: ignores my_list and always returns the two-character
    string made of an opening and a closing brace.
    '''
    return '{}'
635
636
637 graph = pydot.Dot(graph_type='digraph',nodesep=.3,ranksep=.5)
638 graph.add_node(pydot.Node("Data Input",shape='invhouse',style='filled',fillcolor='#ffCC99'))
639 graph.add_node(pydot.Node("Video Input",shape='invhouse',style='filled',fillcolor='#ffCC99'))
640 graph.add_edge(pydot.Edge("Video Input","FRAME"))
641 graph.add_edge(pydot.Edge("Video Input","LAST_FRAME"))
642
643 if self.playback_shelf != None:
644 graph.add_node(pydot.Node("Playback",shape='invhouse',style='filled',fillcolor='#ffCC99'))
645
646 subgraphs = {None:graph}
647
648
649 for each in self.task_set:
650 if self.task_data[each].has_key('call_count'):
651 class_name = self.task_data[each]['class_name']
652 call_count = self.task_data[each]['call_count']
653 mean_time = self.task_data[each]['time_sum']/call_count
654 node_label = "{" + " | ".join([class_name,
655 "Time=%0.2fms"%(mean_time*1000.0,),
656 "Calls=%d"%(call_count,),
657 ]) + "}"
658 color = '#99CC99'
659 print each, self.task_data[each]
660 if self.task_data[each]['color'] is not None:
661 color = self.task_data[each]['color']
662 subgraph = self.task_data[each]['subgraph']
663 subgraph_name = subgraph
664 if subgraph_name != None:
665 subgraph_name = "_".join(subgraph.split())
666 if not subgraphs.has_key(subgraph):
667 print "adding subgraph",subgraph
668 subgraphs[subgraph_name] = pydot.Cluster(subgraph_name,label=subgraph,shape='box',style='filled',fillcolor='#DDDDDD',nodesep=1.0)
669 subgraphs[None].add_subgraph(subgraphs[subgraph_name])
670 print "adding node",each,subgraph
671 subgraphs[subgraph_name].add_node(pydot.Node(each,label=node_label,shape='record',style='filled',fillcolor=color))
672 else:
673
674 call_count = 0
675 mean_time = -1
676 class_name = self.task_data[each]['class_name']
677 node_label = "{" + " | ".join([class_name,
678 "Time=%0.2fms"%(mean_time*1000.0,),
679 "Calls=%d"%(call_count,),
680 ]) + "}"
681 graph.add_node(pydot.Node(each,label=node_label,shape='record',style='filled',fillcolor='#CC3333'))
682
683
684 for each,subgraph in self.data_set:
685 subgraph_name = subgraph
686 if subgraph_name != None:
687 subgraph_name = "_".join(subgraph.split())
688 subgraphs[subgraph_name].add_node(pydot.Node(each,shape='box',style='rounded, filled',fillcolor='#9999ff'))
689
690
691 for each,offsets in self.flow.iteritems():
692 offsets = list(offsets)
693 if len(offsets) == 1 and list(offsets)[0] == 0:
694 graph.add_edge(pydot.Edge(each[0],each[1]))
695 else:
696 offsets = formatOffsets(offsets)
697 graph.add_edge(pydot.Edge(each[0],each[1],label=offsets,label_scheme=2,labeldistance=2,labelfloat=False))
698
699
700 if as_image:
701 data = graph.create_png()
702 f = StringIO(data)
703 im = pv.Image(PIL.Image.open(f))
704 return im
705 return graph
706
714
716 offsets.sort()
717 group = []
718 groups = [group]
719 for each in offsets:
720 if len(group) == 0 or each == group[-1]+1:
721 group.append(each)
722 else:
723 group = [each]
724 groups.append(group)
725
726
727 return groups
728
733
if __name__ == '__main__':
    # Manual smoke test for the offset grouping and formatting helpers:
    # print the sorted offsets, the groups built from them, and the
    # comma-separated formatted groups.
    offsets = sorted([-3,-2,-1,0,1,3,4,5,6,7,8,10,15,20,21,22,23,-21,-22,56,57])

    print(offsets)
    groups = groupOffsets(offsets)
    print(groups)
    print(",".join(formatGroup(group) for group in groups))
742