RapidMiner Process: Searching and Analyzing a Video Stored in an Amazon S3 Bucket
joeanalytica
New Altair Community Member
Hi everyone.
sgenzer
This post builds on and extends the one posted by Scott Sgenzer (Using AWS Image ML inside a RapidMiner process).
Please note that you first need to complete the setup steps described in the following links:
Link#1: Searching Stored Videos for Faces
Link#2: Analyzing a Video Stored in an Amazon S3 Bucket with Java or Python (SDK)
Link#3: Configuring Amazon Rekognition Video
Important note about video formats and storage:
Amazon Rekognition operations can analyze videos that are stored in Amazon S3 buckets. The video must be encoded using the H.264 codec. The supported file formats are MPEG-4 and MOV.
Note: For testing purposes, upload a video that is no longer than 30 seconds.
Snapshot of the process:
Here is the XML:
<?xml version="1.0" encoding="UTF-8"?><process version="9.4.001">
<context>
<input/>
<output/>
<macros/>
</context>
<operator activated="true" class="process" compatibility="9.4.001" expanded="true" name="Process">
<parameter key="logverbosity" value="init"/>
<parameter key="random_seed" value="2001"/>
<parameter key="send_mail" value="never"/>
<parameter key="notification_email" value=""/>
<parameter key="process_duration_for_mail" value="30"/>
<parameter key="encoding" value="SYSTEM"/>
<process expanded="true">
<operator activated="true" class="open_file" compatibility="9.4.001" expanded="true" height="68" name="Open File" width="90" x="45" y="238">
<parameter key="resource_type" value="file"/>
<parameter key="filename" value="C:\Users\Administrator\Desktop\videos\jg.mp4"/>
</operator>
<operator activated="true" class="retrieve" compatibility="9.4.001" expanded="true" height="68" name="Retrieve" width="90" x="45" y="391">
<parameter key="repository_entry" value="/Connections/awsRekVideo"/>
</operator>
<operator activated="true" class="cloud_connectivity:write_amazons3" compatibility="9.4.000" expanded="true" height="82" name="Write Amazon S3" width="90" x="179" y="238">
<parameter key="connection_source" value="repository"/>
<parameter key="file" value="/my-rapidminer-photo-bucket01/jg.mp4"/>
</operator>
<!-- "Create Document" holds the Python script that "Write Document" saves to disk.
     XML attribute-value normalization turns every literal line break inside an attribute into a
     single space. For that reason each Python line below starts with the newline character
     reference &#10;, and the literal line break before it only leaves a harmless trailing space.
     Double quotes inside the script are written as &quot; so that the attribute stays well-formed. -->
<operator activated="true" class="text:create_document" compatibility="8.2.000" expanded="true" height="68" name="Create Document" width="90" x="380" y="238">
<parameter key="text" value="#Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
&#10;#PDX-License-Identifier: MIT-0 (For details, see https://github.com/awsdocs/amazon-rekognition-developer-guide/blob/master/LICENSE-SAMPLECODE.)
&#10;
&#10;import boto3
&#10;import json
&#10;import sys
&#10;import time
&#10;
&#10;
&#10;class VideoDetect:
&#10;    '''Run Amazon Rekognition Video jobs on one S3 video and print the results.
&#10;
&#10;    Job-completion notifications arrive through a temporary SNS topic that is
&#10;    subscribed to a temporary SQS queue (see CreateTopicandQueue).
&#10;    '''
&#10;    jobId = ''
&#10;    rek = boto3.client('rekognition')
&#10;    sqs = boto3.client('sqs')
&#10;    sns = boto3.client('sns')
&#10;
&#10;    roleArn = ''
&#10;    bucket = ''
&#10;    video = ''
&#10;    startJobId = ''
&#10;
&#10;    sqsQueueUrl = ''
&#10;    snsTopicArn = ''
&#10;    processType = ''
&#10;
&#10;    def __init__(self, role, bucket, video):
&#10;        self.roleArn = role
&#10;        self.bucket = bucket
&#10;        self.video = video
&#10;
&#10;    def GetSQSMessageSuccess(self):
&#10;        '''Poll the SQS queue until the notification for startJobId arrives.
&#10;
&#10;        Returns True if that job finished with status SUCCEEDED, otherwise False.
&#10;        Every message read from the queue is deleted exactly once.
&#10;        '''
&#10;        jobFound = False
&#10;        succeeded = False
&#10;
&#10;        dotLine = 0
&#10;        while not jobFound:
&#10;            sqsResponse = self.sqs.receive_message(QueueUrl=self.sqsQueueUrl,
&#10;                                                   MessageAttributeNames=['ALL'],
&#10;                                                   MaxNumberOfMessages=10)
&#10;
&#10;            if sqsResponse:
&#10;                if 'Messages' not in sqsResponse:
&#10;                    # Nothing yet: print a progress dot (40 per row), then poll again after 5 s.
&#10;                    if dotLine >= 40:
&#10;                        print()
&#10;                        dotLine = 0
&#10;                    else:
&#10;                        print('.', end='')
&#10;                        dotLine = dotLine + 1
&#10;                    sys.stdout.flush()
&#10;                    time.sleep(5)
&#10;                    continue
&#10;
&#10;                for message in sqsResponse['Messages']:
&#10;                    # The body is the SNS envelope; the Rekognition status is in its Message field.
&#10;                    notification = json.loads(message['Body'])
&#10;                    rekMessage = json.loads(notification['Message'])
&#10;                    print(rekMessage['JobId'])
&#10;                    print(rekMessage['Status'])
&#10;                    if rekMessage['JobId'] == self.startJobId:
&#10;                        print('Matching Job Found:' + rekMessage['JobId'])
&#10;                        jobFound = True
&#10;                        if rekMessage['Status'] == 'SUCCEEDED':
&#10;                            succeeded = True
&#10;                    else:
&#10;                        # Unknown message. Consider sending it to a dead letter queue instead.
&#10;                        print(&quot;Job didn't match:&quot; +
&#10;                              str(rekMessage['JobId']) + ' : ' + self.startJobId)
&#10;                    # Delete every message exactly once so that SQS does not redeliver it.
&#10;                    self.sqs.delete_message(QueueUrl=self.sqsQueueUrl,
&#10;                                            ReceiptHandle=message['ReceiptHandle'])
&#10;
&#10;        return succeeded
&#10;
&#10;    def StartLabelDetection(self):
&#10;        '''Start an asynchronous label-detection job on the video (not used by main).'''
&#10;        response = self.rek.start_label_detection(
&#10;            Video={'S3Object': {'Bucket': self.bucket, 'Name': self.video}},
&#10;            NotificationChannel={'RoleArn': self.roleArn, 'SNSTopicArn': self.snsTopicArn})
&#10;
&#10;        self.startJobId = response['JobId']
&#10;        print('Start Job Id: ' + self.startJobId)
&#10;
&#10;    def GetLabelDetectionResults(self):
&#10;        '''Print every page of label-detection results for startJobId (not used by main).'''
&#10;        maxResults = 10
&#10;        paginationToken = ''
&#10;        finished = False
&#10;
&#10;        while not finished:
&#10;            response = self.rek.get_label_detection(JobId=self.startJobId,
&#10;                                                    MaxResults=maxResults,
&#10;                                                    NextToken=paginationToken,
&#10;                                                    SortBy='TIMESTAMP')
&#10;
&#10;            print('Codec: ' + response['VideoMetadata']['Codec'])
&#10;            print('Duration: ' + str(response['VideoMetadata']['DurationMillis']))
&#10;            print('Format: ' + response['VideoMetadata']['Format'])
&#10;            print('Frame rate: ' + str(response['VideoMetadata']['FrameRate']))
&#10;            print()
&#10;
&#10;            for labelDetection in response['Labels']:
&#10;                label = labelDetection['Label']
&#10;
&#10;                print('Timestamp: ' + str(labelDetection['Timestamp']))
&#10;                print(' Label: ' + label['Name'])
&#10;                print(' Confidence: ' + str(label['Confidence']))
&#10;                print(' Instances:')
&#10;                for instance in label['Instances']:
&#10;                    print(' Confidence: ' + str(instance['Confidence']))
&#10;                    print(' Bounding box')
&#10;                    print(' Top: ' + str(instance['BoundingBox']['Top']))
&#10;                    print(' Left: ' + str(instance['BoundingBox']['Left']))
&#10;                    print(' Width: ' + str(instance['BoundingBox']['Width']))
&#10;                    print(' Height: ' + str(instance['BoundingBox']['Height']))
&#10;                    print()
&#10;                print()
&#10;                print(' Parents:')
&#10;                for parent in label['Parents']:
&#10;                    print(' ' + parent['Name'])
&#10;                print()
&#10;
&#10;            if 'NextToken' in response:
&#10;                paginationToken = response['NextToken']
&#10;            else:
&#10;                finished = True
&#10;
&#10;    # ============== Face Search ===============
&#10;    def StartFaceSearchCollection(self, collection):
&#10;        '''Start an asynchronous face-search job against the given face collection.'''
&#10;        response = self.rek.start_face_search(
&#10;            Video={'S3Object': {'Bucket': self.bucket, 'Name': self.video}},
&#10;            CollectionId=collection,
&#10;            NotificationChannel={'RoleArn': self.roleArn, 'SNSTopicArn': self.snsTopicArn})
&#10;
&#10;        self.startJobId = response['JobId']
&#10;        print('Start Job Id: ' + self.startJobId)
&#10;
&#10;    def GetFaceSearchCollectionResults(self):
&#10;        '''Print every page of face-search results for startJobId.'''
&#10;        maxResults = 10
&#10;        paginationToken = ''
&#10;        finished = False
&#10;
&#10;        while not finished:
&#10;            response = self.rek.get_face_search(JobId=self.startJobId,
&#10;                                                MaxResults=maxResults,
&#10;                                                NextToken=paginationToken)
&#10;
&#10;            print(response['VideoMetadata']['Codec'])
&#10;            print(str(response['VideoMetadata']['DurationMillis']))
&#10;            print(response['VideoMetadata']['Format'])
&#10;            print(response['VideoMetadata']['FrameRate'])
&#10;
&#10;            for personMatch in response['Persons']:
&#10;                print('Person Index: ' + str(personMatch['Person']['Index']))
&#10;                print('Timestamp: ' + str(personMatch['Timestamp']))
&#10;
&#10;                if 'FaceMatches' in personMatch:
&#10;                    for faceMatch in personMatch['FaceMatches']:
&#10;                        print('Face ID: ' + faceMatch['Face']['FaceId'])
&#10;                        print('Similarity: ' + str(faceMatch['Similarity']))
&#10;                print()
&#10;            if 'NextToken' in response:
&#10;                paginationToken = response['NextToken']
&#10;            else:
&#10;                finished = True
&#10;            print()
&#10;
&#10;    def CreateTopicandQueue(self):
&#10;        '''Create a uniquely named SNS topic and SQS queue and connect them.
&#10;
&#10;        The queue is subscribed to the topic, and the queue policy allows only
&#10;        this topic to send messages to it.
&#10;        '''
&#10;        millis = str(int(round(time.time() * 1000)))
&#10;
&#10;        # Create SNS topic
&#10;        snsTopicName = 'AmazonRekognitionExample' + millis
&#10;
&#10;        topicResponse = self.sns.create_topic(Name=snsTopicName)
&#10;        self.snsTopicArn = topicResponse['TopicArn']
&#10;
&#10;        # Create SQS queue
&#10;        sqsQueueName = 'AmazonRekognitionQueue' + millis
&#10;        self.sqs.create_queue(QueueName=sqsQueueName)
&#10;        self.sqsQueueUrl = self.sqs.get_queue_url(QueueName=sqsQueueName)['QueueUrl']
&#10;
&#10;        attribs = self.sqs.get_queue_attributes(QueueUrl=self.sqsQueueUrl,
&#10;                                                AttributeNames=['QueueArn'])['Attributes']
&#10;
&#10;        sqsQueueArn = attribs['QueueArn']
&#10;
&#10;        # Subscribe SQS queue to SNS topic
&#10;        self.sns.subscribe(
&#10;            TopicArn=self.snsTopicArn,
&#10;            Protocol='sqs',
&#10;            Endpoint=sqsQueueArn)
&#10;
&#10;        # Authorize SNS to write to the SQS queue. json.dumps builds the policy
&#10;        # document without hand-escaped quotes and doubled braces.
&#10;        policy = json.dumps({
&#10;            'Version': '2012-10-17',
&#10;            'Statement': [{
&#10;                'Sid': 'MyPolicy',
&#10;                'Effect': 'Allow',
&#10;                'Principal': {'AWS': '*'},
&#10;                'Action': 'SQS:SendMessage',
&#10;                'Resource': sqsQueueArn,
&#10;                'Condition': {'ArnEquals': {'aws:SourceArn': self.snsTopicArn}},
&#10;            }],
&#10;        })
&#10;
&#10;        self.sqs.set_queue_attributes(
&#10;            QueueUrl=self.sqsQueueUrl,
&#10;            Attributes={'Policy': policy})
&#10;
&#10;    def DeleteTopicandQueue(self):
&#10;        '''Delete the temporary SQS queue and SNS topic.'''
&#10;        self.sqs.delete_queue(QueueUrl=self.sqsQueueUrl)
&#10;        self.sns.delete_topic(TopicArn=self.snsTopicArn)
&#10;
&#10;
&#10;def main():
&#10;    # Replace youriam (including the quote marks around it) with your AWS account ID.
&#10;    roleArn = 'arn:aws:iam::&quot;youriam&quot;:role/rapidminerRek'
&#10;    bucket = 'my-rapidminer-photo-bucket01'
&#10;    video = 'jg.mp4'
&#10;    collection = 'rapidminer-awsrek-video'
&#10;
&#10;    analyzer = VideoDetect(roleArn, bucket, video)
&#10;    analyzer.CreateTopicandQueue()
&#10;    try:
&#10;        analyzer.StartFaceSearchCollection(collection)
&#10;        if analyzer.GetSQSMessageSuccess():
&#10;            analyzer.GetFaceSearchCollectionResults()
&#10;    finally:
&#10;        # Remove the temporary topic and queue even if the job or an AWS call fails.
&#10;        analyzer.DeleteTopicandQueue()
&#10;
&#10;
&#10;if __name__ == '__main__':
&#10;    main()
&#10;"/>
<parameter key="add label" value="false"/>
<parameter key="label_type" value="nominal"/>
</operator>
<operator activated="true" class="text:write_document" compatibility="8.2.000" expanded="true" height="82" name="Write Document" width="90" x="514" y="238">
<parameter key="file" value="C:\Python\Python37-32\awsRek-SearchingStoredVideosforFaces-Copy.py"/>
<parameter key="overwrite" value="true"/>
<parameter key="encoding" value="SYSTEM"/>
</operator>
<!-- Runs an external batch file that this process does not create; it must already exist at the path below.
     The canvas annotation next to this operator says the batch file calls python.exe on a script and
     redirects its output to a text file.
     NOTE(review): for the process to work end to end, that script should be the file written by
     "Write Document" (C:\Python\Python37-32\awsRek-SearchingStoredVideosforFaces-Copy.py), and the output
     should go to the file that "Read Document" reads (C:\awsRek-SearchingStoredVideosforFaces.txt).
     Confirm that the batch file matches these paths.
     No port connection links this operator to "Write Document" or "Read Document", so the process
     relies on RapidMiner's operator execution order to run it between them. -->
<operator activated="true" class="productivity:execute_program" compatibility="9.4.001" expanded="true" height="103" name="Execute Program" width="90" x="715" y="238">
<parameter key="command" value="cmd /c C:\Python\Python37-32\awsRek-SearchingStoredVideosforFaces.bat"/>
<parameter key="log_stdout" value="true"/>
<parameter key="log_stderr" value="true"/>
<parameter key="working_directory" value="C:\Python\Python37-32"/>
<list key="env_variables"/>
</operator>
<operator activated="true" class="text:read_document" compatibility="8.2.000" expanded="true" height="68" name="Read Document" width="90" x="916" y="238">
<parameter key="file" value="C:\awsRek-SearchingStoredVideosforFaces.txt"/>
<parameter key="extract_text_only" value="true"/>
<parameter key="use_file_extension_as_type" value="true"/>
<parameter key="content_type" value="txt"/>
<parameter key="encoding" value="SYSTEM"/>
</operator>
<connect from_op="Open File" from_port="file" to_op="Write Amazon S3" to_port="file"/>
<connect from_op="Retrieve" from_port="output" to_op="Write Amazon S3" to_port="connection"/>
<connect from_op="Create Document" from_port="output" to_op="Write Document" to_port="document"/>
<connect from_op="Read Document" from_port="output" to_port="result 1"/>
<portSpacing port="source_input 1" spacing="0"/>
<portSpacing port="sink_result 1" spacing="0"/>
<portSpacing port="sink_result 2" spacing="0"/>
<description align="center" color="yellow" colored="false" height="96" resized="true" width="490" x="176" y="67">Searching and Analyzing a Video Stored in an Amazon S3 Bucket<br></description>
<description align="center" color="transparent" colored="true" height="63" resized="true" width="119" x="35" y="309">open video from stored location</description>
<description align="center" color="transparent" colored="true" height="50" resized="true" width="126" x="163" y="320">write to S3 bucket</description>
<description align="center" color="transparent" colored="true" height="50" resized="true" width="128" x="24" y="464">AWS S3 Connection</description>
<description align="center" color="yellow" colored="false" height="68" resized="true" width="129" x="360" y="316">Python Code:<br>Searching Stored Videos for Faces</description>
<description align="center" color="yellow" colored="false" height="59" resized="true" width="112" x="505" y="324">save script as awsRek-SearchingStoredVideosforFaces-Copy.py</description>
<description align="left" color="yellow" colored="false" height="107" resized="true" width="164" x="708" y="349">Batch script:<br><br>python.exe awsRek-SearchingStoredVideosforFaces-Copy.py &gt; C:\awsRek-SearchingStoredVideosforFaces.txt</description>
<description align="center" color="yellow" colored="false" height="58" resized="true" width="138" x="899" y="317">read C:\awsRek-SearchingStoredVideosforFaces.txt written by the batch script</description>
</process>
</operator>
</process>
Cheers
This post builds on and extends the one posted by Scott Sgenzer (Using AWS Image ML inside a RapidMiner process).
Please note that you first need to complete the setup steps described in the following links:
Link#1: Searching Stored Videos for Faces
Link#2: Analyzing a Video Stored in an Amazon S3 Bucket with Java or Python (SDK)
Link#3: Configuring Amazon Rekognition Video
Important note about video formats and storage:
Amazon Rekognition operations can analyze videos that are stored in Amazon S3 buckets. The video must be encoded using the H.264 codec. The supported file formats are MPEG-4 and MOV.
Note: For testing purposes, upload a video that is no longer than 30 seconds.
Snapshot of the process:
Here is the XML:
<?xml version="1.0" encoding="UTF-8"?><process version="9.4.001">
<context>
<input/>
<output/>
<macros/>
</context>
<operator activated="true" class="process" compatibility="9.4.001" expanded="true" name="Process">
<parameter key="logverbosity" value="init"/>
<parameter key="random_seed" value="2001"/>
<parameter key="send_mail" value="never"/>
<parameter key="notification_email" value=""/>
<parameter key="process_duration_for_mail" value="30"/>
<parameter key="encoding" value="SYSTEM"/>
<process expanded="true">
<operator activated="true" class="open_file" compatibility="9.4.001" expanded="true" height="68" name="Open File" width="90" x="45" y="238">
<parameter key="resource_type" value="file"/>
<parameter key="filename" value="C:\Users\Administrator\Desktop\videos\jg.mp4"/>
</operator>
<operator activated="true" class="retrieve" compatibility="9.4.001" expanded="true" height="68" name="Retrieve" width="90" x="45" y="391">
<parameter key="repository_entry" value="/Connections/awsRekVideo"/>
</operator>
<operator activated="true" class="cloud_connectivity:write_amazons3" compatibility="9.4.000" expanded="true" height="82" name="Write Amazon S3" width="90" x="179" y="238">
<parameter key="connection_source" value="repository"/>
<parameter key="file" value="/my-rapidminer-photo-bucket01/jg.mp4"/>
</operator>
<!-- "Create Document" holds the Python script that "Write Document" saves to disk.
     XML attribute-value normalization turns every literal line break inside an attribute into a
     single space. For that reason each Python line below starts with the newline character
     reference &#10;, and the literal line break before it only leaves a harmless trailing space.
     Double quotes inside the script are written as &quot; so that the attribute stays well-formed. -->
<operator activated="true" class="text:create_document" compatibility="8.2.000" expanded="true" height="68" name="Create Document" width="90" x="380" y="238">
<parameter key="text" value="#Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
&#10;#PDX-License-Identifier: MIT-0 (For details, see https://github.com/awsdocs/amazon-rekognition-developer-guide/blob/master/LICENSE-SAMPLECODE.)
&#10;
&#10;import boto3
&#10;import json
&#10;import sys
&#10;import time
&#10;
&#10;
&#10;class VideoDetect:
&#10;    '''Run Amazon Rekognition Video jobs on one S3 video and print the results.
&#10;
&#10;    Job-completion notifications arrive through a temporary SNS topic that is
&#10;    subscribed to a temporary SQS queue (see CreateTopicandQueue).
&#10;    '''
&#10;    jobId = ''
&#10;    rek = boto3.client('rekognition')
&#10;    sqs = boto3.client('sqs')
&#10;    sns = boto3.client('sns')
&#10;
&#10;    roleArn = ''
&#10;    bucket = ''
&#10;    video = ''
&#10;    startJobId = ''
&#10;
&#10;    sqsQueueUrl = ''
&#10;    snsTopicArn = ''
&#10;    processType = ''
&#10;
&#10;    def __init__(self, role, bucket, video):
&#10;        self.roleArn = role
&#10;        self.bucket = bucket
&#10;        self.video = video
&#10;
&#10;    def GetSQSMessageSuccess(self):
&#10;        '''Poll the SQS queue until the notification for startJobId arrives.
&#10;
&#10;        Returns True if that job finished with status SUCCEEDED, otherwise False.
&#10;        Every message read from the queue is deleted exactly once.
&#10;        '''
&#10;        jobFound = False
&#10;        succeeded = False
&#10;
&#10;        dotLine = 0
&#10;        while not jobFound:
&#10;            sqsResponse = self.sqs.receive_message(QueueUrl=self.sqsQueueUrl,
&#10;                                                   MessageAttributeNames=['ALL'],
&#10;                                                   MaxNumberOfMessages=10)
&#10;
&#10;            if sqsResponse:
&#10;                if 'Messages' not in sqsResponse:
&#10;                    # Nothing yet: print a progress dot (40 per row), then poll again after 5 s.
&#10;                    if dotLine >= 40:
&#10;                        print()
&#10;                        dotLine = 0
&#10;                    else:
&#10;                        print('.', end='')
&#10;                        dotLine = dotLine + 1
&#10;                    sys.stdout.flush()
&#10;                    time.sleep(5)
&#10;                    continue
&#10;
&#10;                for message in sqsResponse['Messages']:
&#10;                    # The body is the SNS envelope; the Rekognition status is in its Message field.
&#10;                    notification = json.loads(message['Body'])
&#10;                    rekMessage = json.loads(notification['Message'])
&#10;                    print(rekMessage['JobId'])
&#10;                    print(rekMessage['Status'])
&#10;                    if rekMessage['JobId'] == self.startJobId:
&#10;                        print('Matching Job Found:' + rekMessage['JobId'])
&#10;                        jobFound = True
&#10;                        if rekMessage['Status'] == 'SUCCEEDED':
&#10;                            succeeded = True
&#10;                    else:
&#10;                        # Unknown message. Consider sending it to a dead letter queue instead.
&#10;                        print(&quot;Job didn't match:&quot; +
&#10;                              str(rekMessage['JobId']) + ' : ' + self.startJobId)
&#10;                    # Delete every message exactly once so that SQS does not redeliver it.
&#10;                    self.sqs.delete_message(QueueUrl=self.sqsQueueUrl,
&#10;                                            ReceiptHandle=message['ReceiptHandle'])
&#10;
&#10;        return succeeded
&#10;
&#10;    def StartLabelDetection(self):
&#10;        '''Start an asynchronous label-detection job on the video (not used by main).'''
&#10;        response = self.rek.start_label_detection(
&#10;            Video={'S3Object': {'Bucket': self.bucket, 'Name': self.video}},
&#10;            NotificationChannel={'RoleArn': self.roleArn, 'SNSTopicArn': self.snsTopicArn})
&#10;
&#10;        self.startJobId = response['JobId']
&#10;        print('Start Job Id: ' + self.startJobId)
&#10;
&#10;    def GetLabelDetectionResults(self):
&#10;        '''Print every page of label-detection results for startJobId (not used by main).'''
&#10;        maxResults = 10
&#10;        paginationToken = ''
&#10;        finished = False
&#10;
&#10;        while not finished:
&#10;            response = self.rek.get_label_detection(JobId=self.startJobId,
&#10;                                                    MaxResults=maxResults,
&#10;                                                    NextToken=paginationToken,
&#10;                                                    SortBy='TIMESTAMP')
&#10;
&#10;            print('Codec: ' + response['VideoMetadata']['Codec'])
&#10;            print('Duration: ' + str(response['VideoMetadata']['DurationMillis']))
&#10;            print('Format: ' + response['VideoMetadata']['Format'])
&#10;            print('Frame rate: ' + str(response['VideoMetadata']['FrameRate']))
&#10;            print()
&#10;
&#10;            for labelDetection in response['Labels']:
&#10;                label = labelDetection['Label']
&#10;
&#10;                print('Timestamp: ' + str(labelDetection['Timestamp']))
&#10;                print(' Label: ' + label['Name'])
&#10;                print(' Confidence: ' + str(label['Confidence']))
&#10;                print(' Instances:')
&#10;                for instance in label['Instances']:
&#10;                    print(' Confidence: ' + str(instance['Confidence']))
&#10;                    print(' Bounding box')
&#10;                    print(' Top: ' + str(instance['BoundingBox']['Top']))
&#10;                    print(' Left: ' + str(instance['BoundingBox']['Left']))
&#10;                    print(' Width: ' + str(instance['BoundingBox']['Width']))
&#10;                    print(' Height: ' + str(instance['BoundingBox']['Height']))
&#10;                    print()
&#10;                print()
&#10;                print(' Parents:')
&#10;                for parent in label['Parents']:
&#10;                    print(' ' + parent['Name'])
&#10;                print()
&#10;
&#10;            if 'NextToken' in response:
&#10;                paginationToken = response['NextToken']
&#10;            else:
&#10;                finished = True
&#10;
&#10;    # ============== Face Search ===============
&#10;    def StartFaceSearchCollection(self, collection):
&#10;        '''Start an asynchronous face-search job against the given face collection.'''
&#10;        response = self.rek.start_face_search(
&#10;            Video={'S3Object': {'Bucket': self.bucket, 'Name': self.video}},
&#10;            CollectionId=collection,
&#10;            NotificationChannel={'RoleArn': self.roleArn, 'SNSTopicArn': self.snsTopicArn})
&#10;
&#10;        self.startJobId = response['JobId']
&#10;        print('Start Job Id: ' + self.startJobId)
&#10;
&#10;    def GetFaceSearchCollectionResults(self):
&#10;        '''Print every page of face-search results for startJobId.'''
&#10;        maxResults = 10
&#10;        paginationToken = ''
&#10;        finished = False
&#10;
&#10;        while not finished:
&#10;            response = self.rek.get_face_search(JobId=self.startJobId,
&#10;                                                MaxResults=maxResults,
&#10;                                                NextToken=paginationToken)
&#10;
&#10;            print(response['VideoMetadata']['Codec'])
&#10;            print(str(response['VideoMetadata']['DurationMillis']))
&#10;            print(response['VideoMetadata']['Format'])
&#10;            print(response['VideoMetadata']['FrameRate'])
&#10;
&#10;            for personMatch in response['Persons']:
&#10;                print('Person Index: ' + str(personMatch['Person']['Index']))
&#10;                print('Timestamp: ' + str(personMatch['Timestamp']))
&#10;
&#10;                if 'FaceMatches' in personMatch:
&#10;                    for faceMatch in personMatch['FaceMatches']:
&#10;                        print('Face ID: ' + faceMatch['Face']['FaceId'])
&#10;                        print('Similarity: ' + str(faceMatch['Similarity']))
&#10;                print()
&#10;            if 'NextToken' in response:
&#10;                paginationToken = response['NextToken']
&#10;            else:
&#10;                finished = True
&#10;            print()
&#10;
&#10;    def CreateTopicandQueue(self):
&#10;        '''Create a uniquely named SNS topic and SQS queue and connect them.
&#10;
&#10;        The queue is subscribed to the topic, and the queue policy allows only
&#10;        this topic to send messages to it.
&#10;        '''
&#10;        millis = str(int(round(time.time() * 1000)))
&#10;
&#10;        # Create SNS topic
&#10;        snsTopicName = 'AmazonRekognitionExample' + millis
&#10;
&#10;        topicResponse = self.sns.create_topic(Name=snsTopicName)
&#10;        self.snsTopicArn = topicResponse['TopicArn']
&#10;
&#10;        # Create SQS queue
&#10;        sqsQueueName = 'AmazonRekognitionQueue' + millis
&#10;        self.sqs.create_queue(QueueName=sqsQueueName)
&#10;        self.sqsQueueUrl = self.sqs.get_queue_url(QueueName=sqsQueueName)['QueueUrl']
&#10;
&#10;        attribs = self.sqs.get_queue_attributes(QueueUrl=self.sqsQueueUrl,
&#10;                                                AttributeNames=['QueueArn'])['Attributes']
&#10;
&#10;        sqsQueueArn = attribs['QueueArn']
&#10;
&#10;        # Subscribe SQS queue to SNS topic
&#10;        self.sns.subscribe(
&#10;            TopicArn=self.snsTopicArn,
&#10;            Protocol='sqs',
&#10;            Endpoint=sqsQueueArn)
&#10;
&#10;        # Authorize SNS to write to the SQS queue. json.dumps builds the policy
&#10;        # document without hand-escaped quotes and doubled braces.
&#10;        policy = json.dumps({
&#10;            'Version': '2012-10-17',
&#10;            'Statement': [{
&#10;                'Sid': 'MyPolicy',
&#10;                'Effect': 'Allow',
&#10;                'Principal': {'AWS': '*'},
&#10;                'Action': 'SQS:SendMessage',
&#10;                'Resource': sqsQueueArn,
&#10;                'Condition': {'ArnEquals': {'aws:SourceArn': self.snsTopicArn}},
&#10;            }],
&#10;        })
&#10;
&#10;        self.sqs.set_queue_attributes(
&#10;            QueueUrl=self.sqsQueueUrl,
&#10;            Attributes={'Policy': policy})
&#10;
&#10;    def DeleteTopicandQueue(self):
&#10;        '''Delete the temporary SQS queue and SNS topic.'''
&#10;        self.sqs.delete_queue(QueueUrl=self.sqsQueueUrl)
&#10;        self.sns.delete_topic(TopicArn=self.snsTopicArn)
&#10;
&#10;
&#10;def main():
&#10;    # Replace youriam (including the quote marks around it) with your AWS account ID.
&#10;    roleArn = 'arn:aws:iam::&quot;youriam&quot;:role/rapidminerRek'
&#10;    bucket = 'my-rapidminer-photo-bucket01'
&#10;    video = 'jg.mp4'
&#10;    collection = 'rapidminer-awsrek-video'
&#10;
&#10;    analyzer = VideoDetect(roleArn, bucket, video)
&#10;    analyzer.CreateTopicandQueue()
&#10;    try:
&#10;        analyzer.StartFaceSearchCollection(collection)
&#10;        if analyzer.GetSQSMessageSuccess():
&#10;            analyzer.GetFaceSearchCollectionResults()
&#10;    finally:
&#10;        # Remove the temporary topic and queue even if the job or an AWS call fails.
&#10;        analyzer.DeleteTopicandQueue()
&#10;
&#10;
&#10;if __name__ == '__main__':
&#10;    main()
&#10;"/>
<parameter key="add label" value="false"/>
<parameter key="label_type" value="nominal"/>
</operator>
<operator activated="true" class="text:write_document" compatibility="8.2.000" expanded="true" height="82" name="Write Document" width="90" x="514" y="238">
<parameter key="file" value="C:\Python\Python37-32\awsRek-SearchingStoredVideosforFaces-Copy.py"/>
<parameter key="overwrite" value="true"/>
<parameter key="encoding" value="SYSTEM"/>
</operator>
<!-- Runs an external batch file that this process does not create; it must already exist at the path below.
     The canvas annotation next to this operator says the batch file calls python.exe on a script and
     redirects its output to a text file.
     NOTE(review): for the process to work end to end, that script should be the file written by
     "Write Document" (C:\Python\Python37-32\awsRek-SearchingStoredVideosforFaces-Copy.py), and the output
     should go to the file that "Read Document" reads (C:\awsRek-SearchingStoredVideosforFaces.txt).
     Confirm that the batch file matches these paths.
     No port connection links this operator to "Write Document" or "Read Document", so the process
     relies on RapidMiner's operator execution order to run it between them. -->
<operator activated="true" class="productivity:execute_program" compatibility="9.4.001" expanded="true" height="103" name="Execute Program" width="90" x="715" y="238">
<parameter key="command" value="cmd /c C:\Python\Python37-32\awsRek-SearchingStoredVideosforFaces.bat"/>
<parameter key="log_stdout" value="true"/>
<parameter key="log_stderr" value="true"/>
<parameter key="working_directory" value="C:\Python\Python37-32"/>
<list key="env_variables"/>
</operator>
<operator activated="true" class="text:read_document" compatibility="8.2.000" expanded="true" height="68" name="Read Document" width="90" x="916" y="238">
<parameter key="file" value="C:\awsRek-SearchingStoredVideosforFaces.txt"/>
<parameter key="extract_text_only" value="true"/>
<parameter key="use_file_extension_as_type" value="true"/>
<parameter key="content_type" value="txt"/>
<parameter key="encoding" value="SYSTEM"/>
</operator>
<connect from_op="Open File" from_port="file" to_op="Write Amazon S3" to_port="file"/>
<connect from_op="Retrieve" from_port="output" to_op="Write Amazon S3" to_port="connection"/>
<connect from_op="Create Document" from_port="output" to_op="Write Document" to_port="document"/>
<connect from_op="Read Document" from_port="output" to_port="result 1"/>
<portSpacing port="source_input 1" spacing="0"/>
<portSpacing port="sink_result 1" spacing="0"/>
<portSpacing port="sink_result 2" spacing="0"/>
<description align="center" color="yellow" colored="false" height="96" resized="true" width="490" x="176" y="67">Searching and Analyzing a Video Stored in an Amazon S3 Bucket<br></description>
<description align="center" color="transparent" colored="true" height="63" resized="true" width="119" x="35" y="309">open video from stored location</description>
<description align="center" color="transparent" colored="true" height="50" resized="true" width="126" x="163" y="320">write to S3 bucket</description>
<description align="center" color="transparent" colored="true" height="50" resized="true" width="128" x="24" y="464">AWS S3 Connection</description>
<description align="center" color="yellow" colored="false" height="68" resized="true" width="129" x="360" y="316">Python Code:<br>Searching Stored Videos for Faces</description>
<description align="center" color="yellow" colored="false" height="59" resized="true" width="112" x="505" y="324">save script as awsRek-SearchingStoredVideosforFaces-Copy.py</description>
<description align="left" color="yellow" colored="false" height="107" resized="true" width="164" x="708" y="349">Batch script:<br><br>python.exe awsRek-SearchingStoredVideosforFaces-Copy.py &gt; C:\awsRek-SearchingStoredVideosforFaces.txt</description>
<description align="center" color="yellow" colored="false" height="58" resized="true" width="138" x="899" y="317">read C:\awsRek-SearchingStoredVideosforFaces.txt written by the batch script</description>
</process>
</operator>
</process>
Cheers
2
Best Answer
-
Thank you, @joeanalytica!!
Answers
-
Thank you, @joeanalytica!!