RapidMiner Process: Searching and Analyzing a Video Stored in an Amazon S3 Bucket

joeanalytica New Altair Community Member
edited November 5 in Community Q&A
Hi everyone.
This post builds on and extends the one by Scott Sgenzer (Using AWS Image ML inside a RapidMiner process).

Before running the process, complete the steps described in the following links:
Link#1: Searching Stored Videos for Faces
Link#2: Analyzing a Video Stored in an Amazon S3 Bucket with Java or Python (SDK)  
Link#3: Configuring Amazon Rekognition Video
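
Link#1 assumes that a face collection already exists and holds the faces you want to find in the video. Here is a minimal sketch of that prerequisite, assuming boto3 is installed and configured with credentials, and that a reference photo is already in the bucket. The function name `prepare_collection`, the image key and the external ID are hypothetical; the collection and bucket names are the ones used in the script further down.

```python
def prepare_collection(rek, collection_id, bucket, image_key, external_id):
    """Create the collection if it is missing and index one reference face.

    `rek` is a boto3 Rekognition client, e.g. boto3.client('rekognition').
    Returns the list of FaceId values assigned by Rekognition.
    """
    # Only the first page of collection IDs is checked, which is enough
    # for a small test account (assumption).
    existing = rek.list_collections().get('CollectionIds', [])
    if collection_id not in existing:
        rek.create_collection(CollectionId=collection_id)

    response = rek.index_faces(
        CollectionId=collection_id,
        Image={'S3Object': {'Bucket': bucket, 'Name': image_key}},
        ExternalImageId=external_id,
        MaxFaces=1,
    )
    return [record['Face']['FaceId'] for record in response['FaceRecords']]


# Example call (not executed here because it needs AWS credentials):
#   import boto3
#   prepare_collection(boto3.client('rekognition'),
#                      'rapidminer-awsrek-video',       # collection used in the script
#                      'my-rapidminer-photo-bucket01',  # bucket used in the script
#                      'jg.jpg',                        # hypothetical reference photo
#                      'jg')                            # hypothetical person label
```

Passing the client in as an argument keeps the function easy to try with a stub before pointing it at a real account.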

Important Note about Video Formats and Storage
Amazon Rekognition operations can analyze videos that are stored in Amazon S3 buckets. The video must be encoded using the H.264 codec. The supported file formats are MPEG-4 and MOV. 

Note: For test purposes, upload a video that's no longer than 30 seconds.
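
A quick sanity check before uploading can save a failed job. This is only a sketch: it looks at the file extension, and the mapping from the formats named above to `.mp4` and `.mov` is my assumption. It does not inspect the codec, so a file that is not H.264 will still pass.

```python
from pathlib import Path

# Assumed on-disk extensions for the MPEG-4 and MOV containers.
SUPPORTED_EXTENSIONS = {'.mp4', '.mov'}


def looks_supported(path):
    """Return True if the file extension matches a supported container format.

    The codec is not checked, only the extension (case-insensitive).
    """
    return Path(path).suffix.lower() in SUPPORTED_EXTENSIONS
```

For example, `looks_supported('jg.mp4')` is true, while a `.avi` file is rejected.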

Snapshot of the process: [image not included]

Here is the process XML:

<?xml version="1.0" encoding="UTF-8"?><process version="9.4.001">
  <context>
    <input/>
    <output/>
    <macros/>
  </context>
  <operator activated="true" class="process" compatibility="9.4.001" expanded="true" name="Process">
    <parameter key="logverbosity" value="init"/>
    <parameter key="random_seed" value="2001"/>
    <parameter key="send_mail" value="never"/>
    <parameter key="notification_email" value=""/>
    <parameter key="process_duration_for_mail" value="30"/>
    <parameter key="encoding" value="SYSTEM"/>
    <process expanded="true">
      <operator activated="true" class="open_file" compatibility="9.4.001" expanded="true" height="68" name="Open File" width="90" x="45" y="238">
        <parameter key="resource_type" value="file"/>
        <parameter key="filename" value="C:\Users\Administrator\Desktop\videos\jg.mp4"/>
      </operator>
      <operator activated="true" class="retrieve" compatibility="9.4.001" expanded="true" height="68" name="Retrieve" width="90" x="45" y="391">
        <parameter key="repository_entry" value="/Connections/awsRekVideo"/>
      </operator>
      <operator activated="true" class="cloud_connectivity:write_amazons3" compatibility="9.4.000" expanded="true" height="82" name="Write Amazon S3" width="90" x="179" y="238">
        <parameter key="connection_source" value="repository"/>
        <parameter key="file" value="/my-rapidminer-photo-bucket01/jg.mp4"/>
      </operator>
      <operator activated="true" class="text:create_document" compatibility="8.2.000" expanded="true" height="68" name="Create Document" width="90" x="380" y="238">
        <parameter key="text" value="#Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.&#10;#PDX-License-Identifier: MIT-0 (For details, see https://github.com/awsdocs/amazon-rekognition-developer-guide/blob/master/LICENSE-SAMPLECODE.)&#10;&#10;import boto3&#10;import json&#10;import sys&#10;import time&#10;&#10;&#10;&#10;class VideoDetect:&#10;    jobId = ''&#10;    rek = boto3.client('rekognition')&#10;    sqs = boto3.client('sqs')&#10;    sns = boto3.client('sns')&#10;    &#10;    roleArn = ''&#10;    bucket = ''&#10;    video = ''&#10;    startJobId = ''&#10;&#10;    sqsQueueUrl = ''&#10;    snsTopicArn = ''&#10;    processType = ''&#10;&#10;    def __init__(self, role, bucket, video):    &#10;        self.roleArn = role&#10;        self.bucket = bucket&#10;        self.video = video&#10;&#10;    def GetSQSMessageSuccess(self):&#10;&#10;        jobFound = False&#10;        succeeded = False&#10;    &#10;        dotLine=0&#10;        while jobFound == False:&#10;            sqsResponse = self.sqs.receive_message(QueueUrl=self.sqsQueueUrl, MessageAttributeNames=['ALL'],&#10;                                          MaxNumberOfMessages=10)&#10;&#10;            if sqsResponse:&#10;                &#10;                if 'Messages' not in sqsResponse:&#10;                    if dotLine&lt;40:&#10;                        print('.', end='')&#10;                        dotLine=dotLine+1&#10;                    else:&#10;                        print()&#10;                        dotLine=0    &#10;                    sys.stdout.flush()&#10;                    time.sleep(5)&#10;                    continue&#10;&#10;                for message in sqsResponse['Messages']:&#10;                    notification = json.loads(message['Body'])&#10;                    rekMessage = json.loads(notification['Message'])&#10;                    print(rekMessage['JobId'])&#10;                    print(rekMessage['Status'])&#10;                    if rekMessage['JobId'] == self.startJobId:&#10;                        print('Matching Job Found:' + rekMessage['JobId'])&#10;                        jobFound = True&#10;                        if (rekMessage['Status']=='SUCCEEDED'):&#10;                            succeeded=True&#10;&#10;                        self.sqs.delete_message(QueueUrl=self.sqsQueueUrl,&#10;                                       ReceiptHandle=message['ReceiptHandle'])&#10;                    else:&#10;                        print(&quot;Job didn't match:&quot; +&#10;                              str(rekMessage['JobId']) + ' : ' + self.startJobId)&#10;                    # Delete the unknown message. Consider sending to dead letter queue&#10;                    self.sqs.delete_message(QueueUrl=self.sqsQueueUrl,&#10;                                   ReceiptHandle=message['ReceiptHandle'])&#10;&#10;&#10;        return succeeded&#10;&#10;    def StartLabelDetection(self):&#10;        response=self.rek.start_label_detection(Video={'S3Object': {'Bucket': self.bucket, 'Name': self.video}},&#10;            NotificationChannel={'RoleArn': self.roleArn, 'SNSTopicArn': self.snsTopicArn})&#10;&#10;        self.startJobId=response['JobId']&#10;        print('Start Job Id: ' + self.startJobId)&#10;&#10;&#10;    def GetLabelDetectionResults(self):&#10;        maxResults = 10&#10;        paginationToken = ''&#10;        finished = False&#10;&#10;        while finished == False:&#10;            response = self.rek.get_label_detection(JobId=self.startJobId,&#10;                                            MaxResults=maxResults,&#10;                                            NextToken=paginationToken,&#10;                                            SortBy='TIMESTAMP')&#10;&#10;            print('Codec: ' + response['VideoMetadata']['Codec'])&#10;            print('Duration: ' + str(response['VideoMetadata']['DurationMillis']))&#10;            print('Format: ' + response['VideoMetadata']['Format'])&#10;            print('Frame rate: ' + str(response['VideoMetadata']['FrameRate']))&#10;            print()&#10;&#10;            for labelDetection in response['Labels']:&#10;                label=labelDetection['Label']&#10;&#10;                print(&quot;Timestamp: &quot; + str(labelDetection['Timestamp']))&#10;                print(&quot;   Label: &quot; + label['Name'])&#10;                print(&quot;   Confidence: &quot; +  str(label['Confidence']))&#10;                print(&quot;   Instances:&quot;)&#10;                for instance in label['Instances']:&#10;                    print (&quot;      Confidence: &quot; + str(instance['Confidence']))&#10;                    print (&quot;      Bounding box&quot;)&#10;                    print (&quot;        Top: &quot; + str(instance['BoundingBox']['Top']))&#10;                    print (&quot;        Left: &quot; + str(instance['BoundingBox']['Left']))&#10;                    print (&quot;        Width: &quot; +  str(instance['BoundingBox']['Width']))&#10;                    print (&quot;        Height: &quot; +  str(instance['BoundingBox']['Height']))&#10;                    print()&#10;                print()&#10;                print (&quot;   Parents:&quot;)&#10;                for parent in label['Parents']:&#10;                    print (&quot;      &quot; + parent['Name'])&#10;                print ()&#10;&#10;            if 'NextToken' in response:&#10;                paginationToken = response['NextToken']&#10;            else:&#10;                finished = True&#10;&#10;&#10;       # ============== Face Search ===============&#10;    def StartFaceSearchCollection(self,collection):&#10;        response = self.rek.start_face_search(Video={'S3Object':{'Bucket':self.bucket,'Name':self.video}},&#10;            CollectionId=collection,&#10;            NotificationChannel={'RoleArn':self.roleArn, 'SNSTopicArn':self.snsTopicArn})&#10;        &#10;        self.startJobId=response['JobId']&#10;        &#10;        print('Start Job Id: ' + self.startJobId)&#10;&#10;&#10;    def GetFaceSearchCollectionResults(self):&#10;        maxResults = 10&#10;        paginationToken = ''&#10;&#10;        finished = False&#10;&#10;        while finished == False:&#10;            response = self.rek.get_face_search(JobId=self.startJobId,&#10;                                        MaxResults=maxResults,&#10;                                        NextToken=paginationToken)&#10;&#10;            print(response['VideoMetadata']['Codec'])&#10;            print(str(response['VideoMetadata']['DurationMillis']))&#10;            print(response['VideoMetadata']['Format'])&#10;            print(response['VideoMetadata']['FrameRate'])&#10;&#10;            for personMatch in response['Persons']:&#10;                print('Person Index: ' + str(personMatch['Person']['Index']))&#10;                print('Timestamp: ' + str(personMatch['Timestamp']))&#10;&#10;                if ('FaceMatches' in personMatch):&#10;                    for faceMatch in personMatch['FaceMatches']:&#10;                        print('Face ID: ' + faceMatch['Face']['FaceId'])&#10;                        print('Similarity: ' + str(faceMatch['Similarity']))&#10;                print()&#10;            if 'NextToken' in response:&#10;                paginationToken = response['NextToken']&#10;            else:&#10;                finished = True&#10;            print()&#10;&#10;            &#10;    &#10;    def CreateTopicandQueue(self):&#10;      &#10;        millis = str(int(round(time.time() * 1000)))&#10;&#10;        #Create SNS topic&#10;        &#10;        snsTopicName=&quot;AmazonRekognitionExample&quot; + millis&#10;&#10;        topicResponse=self.sns.create_topic(Name=snsTopicName)&#10;        self.snsTopicArn = topicResponse['TopicArn']&#10;&#10;        #create SQS queue&#10;        sqsQueueName=&quot;AmazonRekognitionQueue&quot; + millis&#10;        self.sqs.create_queue(QueueName=sqsQueueName)&#10;        self.sqsQueueUrl = self.sqs.get_queue_url(QueueName=sqsQueueName)['QueueUrl']&#10; &#10;        attribs = self.sqs.get_queue_attributes(QueueUrl=self.sqsQueueUrl,&#10;                                                    AttributeNames=['QueueArn'])['Attributes']&#10;                                        &#10;        sqsQueueArn = attribs['QueueArn']&#10;&#10;        # Subscribe SQS queue to SNS topic&#10;        self.sns.subscribe(&#10;            TopicArn=self.snsTopicArn,&#10;            Protocol='sqs',&#10;            Endpoint=sqsQueueArn)&#10;&#10;        #Authorize SNS to write SQS queue &#10;        policy = &quot;&quot;&quot;{{&#10;  &quot;Version&quot;:&quot;2012-10-17&quot;,&#10;  &quot;Statement&quot;:[&#10;    {{&#10;      &quot;Sid&quot;:&quot;MyPolicy&quot;,&#10;      &quot;Effect&quot;:&quot;Allow&quot;,&#10;      &quot;Principal&quot; : {{&quot;AWS&quot; : &quot;*&quot;}},&#10;      &quot;Action&quot;:&quot;SQS:SendMessage&quot;,&#10;      &quot;Resource&quot;: &quot;{}&quot;,&#10;      &quot;Condition&quot;:{{&#10;        &quot;ArnEquals&quot;:{{&#10;          &quot;aws:SourceArn&quot;: &quot;{}&quot;&#10;        }}&#10;      }}&#10;    }}&#10;  ]&#10;}}&quot;&quot;&quot;.format(sqsQueueArn, self.snsTopicArn)&#10; &#10;        response = self.sqs.set_queue_attributes(&#10;            QueueUrl = self.sqsQueueUrl,&#10;            Attributes = {&#10;                'Policy' : policy&#10;            })&#10;&#10;    def DeleteTopicandQueue(self):&#10;        self.sqs.delete_queue(QueueUrl=self.sqsQueueUrl)&#10;        self.sns.delete_topic(TopicArn=self.snsTopicArn)&#10;&#10;&#10;def main():&#10;    roleArn = 'arn:aws:iam::&quot;youriam&quot;:role/rapidminerRek'   &#10;    bucket = 'my-rapidminer-photo-bucket01'&#10;    video = 'jg.mp4'&#10;&#10;    analyzer=VideoDetect(roleArn, bucket,video)&#10;    analyzer.CreateTopicandQueue()&#10;&#10;    collection='rapidminer-awsrek-video'&#10;    analyzer.StartFaceSearchCollection(collection)&#10;    &#10;    if analyzer.GetSQSMessageSuccess()==True:&#10;        analyzer.GetFaceSearchCollectionResults()&#10;    &#10;    analyzer.DeleteTopicandQueue()&#10;&#10;&#10;if __name__ == &quot;__main__&quot;:&#10;    main()"/>
        <parameter key="add label" value="false"/>
        <parameter key="label_type" value="nominal"/>
      </operator>
      <operator activated="true" class="text:write_document" compatibility="8.2.000" expanded="true" height="82" name="Write Document" width="90" x="514" y="238">
        <parameter key="file" value="C:\Python\Python37-32\awsRek-SearchingStoredVideosforFaces-Copy.py"/>
        <parameter key="overwrite" value="true"/>
        <parameter key="encoding" value="SYSTEM"/>
      </operator>
      <operator activated="true" class="productivity:execute_program" compatibility="9.4.001" expanded="true" height="103" name="Execute Program" width="90" x="715" y="238">
        <parameter key="command" value="cmd /c C:\Python\Python37-32\awsRek-SearchingStoredVideosforFaces.bat"/>
        <parameter key="log_stdout" value="true"/>
        <parameter key="log_stderr" value="true"/>
        <parameter key="working_directory" value="C:\Python\Python37-32"/>
        <list key="env_variables"/>
      </operator>
      <operator activated="true" class="text:read_document" compatibility="8.2.000" expanded="true" height="68" name="Read Document" width="90" x="916" y="238">
        <parameter key="file" value="C:\awsRek-SearchingStoredVideosforFaces.txt"/>
        <parameter key="extract_text_only" value="true"/>
        <parameter key="use_file_extension_as_type" value="true"/>
        <parameter key="content_type" value="txt"/>
        <parameter key="encoding" value="SYSTEM"/>
      </operator>
      <connect from_op="Open File" from_port="file" to_op="Write Amazon S3" to_port="file"/>
      <connect from_op="Retrieve" from_port="output" to_op="Write Amazon S3" to_port="connection"/>
      <connect from_op="Create Document" from_port="output" to_op="Write Document" to_port="document"/>
      <connect from_op="Read Document" from_port="output" to_port="result 1"/>
      <portSpacing port="source_input 1" spacing="0"/>
      <portSpacing port="sink_result 1" spacing="0"/>
      <portSpacing port="sink_result 2" spacing="0"/>
      <description align="center" color="yellow" colored="false" height="96" resized="true" width="490" x="176" y="67">Searching and Analyzing a Video Stored in an Amazon S3 Bucket&lt;br&gt;</description>
      <description align="center" color="transparent" colored="true" height="63" resized="true" width="119" x="35" y="309">open video from stored location</description>
      <description align="center" color="transparent" colored="true" height="50" resized="true" width="126" x="163" y="320">write to S3 bucket</description>
      <description align="center" color="transparent" colored="true" height="50" resized="true" width="128" x="24" y="464">AWS S3 Connection</description>
      <description align="center" color="yellow" colored="false" height="68" resized="true" width="129" x="360" y="316">Python Code:&lt;br&gt;Searching Stored Videos for Faces</description>
      <description align="center" color="yellow" colored="false" height="59" resized="true" width="112" x="505" y="324">&amp;quot;awsVideoRek-pythonCode&amp;quot;.py</description>
      <description align="left" color="yellow" colored="false" height="107" resized="true" width="164" x="708" y="349">Batch script:&lt;br&gt;&lt;br&gt;python.exe &amp;quot;awsVideoRek-pythonCode.py&amp;quot; &amp;gt; &amp;quot;c:\text.txt&amp;quot;</description>
      <description align="center" color="yellow" colored="false" height="58" resized="true" width="138" x="899" y="317">read &amp;quot;text.txt&amp;quot; from previous operator</description>
    </process>
  </operator>
</process>
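
The Read Document operator returns the script's printed output as one block of text. If you want structured records instead, something like the following could be a starting point. It is a sketch that relies only on the line prefixes printed by GetFaceSearchCollectionResults in the script above ("Person Index: ", "Timestamp: ", "Face ID: ", "Similarity: "); the function name and the record layout are my own choices, not part of the AWS sample.

```python
def parse_face_search_output(text):
    """Turn the script's printed output into a list of per-person records."""
    records = []
    current = None
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if line.startswith('Person Index: '):
            # Each "Person Index" line starts a new record.
            current = {
                'person_index': int(line.split(': ', 1)[1]),
                'timestamp_ms': None,
                'matches': [],
            }
            records.append(current)
        elif current is None:
            # Job status and video metadata lines printed before the first person.
            continue
        elif line.startswith('Timestamp: '):
            current['timestamp_ms'] = int(line.split(': ', 1)[1])
        elif line.startswith('Face ID: '):
            current['matches'].append(
                {'face_id': line.split(': ', 1)[1], 'similarity': None})
        elif line.startswith('Similarity: ') and current['matches']:
            # A similarity line belongs to the most recent Face ID line.
            current['matches'][-1]['similarity'] = float(line.split(': ', 1)[1])
    return records
```

Persons without a match in the collection come back with an empty `matches` list, so filtering on that field gives only the frames where a known face was found.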


Cheers
