data stream processing
aziz_hadi
New Altair Community Member
please any one can help me , i have data base table i want to read it as batches than in every batch i do some processing such as averaging SD
Tagged:
0
Answers
-
So you mean differently to the Loop Batches operator where you would read in the entire dataset then loop over 1,000 records at a time?
Completely untested, but how about this?<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<process version="5.3.013">
<context>
<input/>
<output/>
<macros/>
</context>
<operator activated="true" class="process" compatibility="5.3.013" expanded="true" name="Process">
<process expanded="true">
<operator activated="true" class="read_database" compatibility="5.3.013" expanded="true" height="60" name="Read Database (2)" width="90" x="112" y="255">
<parameter key="connection" value="MySQLConnection"/>
<parameter key="query" value="Select CEIL(COUNT(*)/%{mBatchSize}) as 'MyTableRecords' FROM MyTable"/>
<enumeration key="parameters"/>
</operator>
<operator activated="true" class="extract_macro" compatibility="5.3.013" expanded="true" height="60" name="Extract Macro" width="90" x="246" y="255">
<parameter key="macro" value="mMyTableRecords"/>
<parameter key="macro_type" value="data_value"/>
<parameter key="attribute_name" value="MyTableRecords"/>
<parameter key="example_index" value="1"/>
<list key="additional_macros"/>
</operator>
<operator activated="true" class="loop" compatibility="5.3.013" expanded="true" height="76" name="Loop" width="90" x="246" y="120">
<parameter key="iterations" value="%{mMyTableRecords}"/>
<process expanded="true">
<operator activated="true" class="read_database" compatibility="5.3.013" expanded="true" height="60" name="Read Database (3)" width="90" x="45" y="120">
<parameter key="connection" value="MySQLConnection"/>
<parameter key="query" value="Select CEIL(COUNT(*)/1000) as 'MyTableRecords' FROM MyTable LIMIT (%{mBatchStart}+1),(%{mMyTableRecords}*%{mBatchSize})"/>
<enumeration key="parameters"/>
</operator>
<operator activated="true" class="subprocess" compatibility="5.3.013" expanded="true" height="76" name="SomeProcessing" width="90" x="246" y="75">
<process expanded="true">
<portSpacing port="source_in 1" spacing="0"/>
<portSpacing port="source_in 2" spacing="0"/>
<portSpacing port="sink_out 1" spacing="0"/>
<portSpacing port="sink_out 2" spacing="0"/>
</process>
</operator>
<operator activated="true" class="generate_macro" compatibility="5.3.013" expanded="true" height="76" name="Generate Macro" width="90" x="45" y="300">
<list key="function_descriptions">
<parameter key="mBatchStart" value="%{mBatchSize} * %{mMyTableRecords}"/>
</list>
</operator>
<connect from_op="Read Database (3)" from_port="output" to_op="SomeProcessing" to_port="in 1"/>
<connect from_op="SomeProcessing" from_port="out 1" to_op="Generate Macro" to_port="through 1"/>
<connect from_op="Generate Macro" from_port="through 1" to_port="output 1"/>
<portSpacing port="source_input 1" spacing="0"/>
<portSpacing port="source_input 2" spacing="0"/>
<portSpacing port="sink_output 1" spacing="0"/>
<portSpacing port="sink_output 2" spacing="0"/>
</process>
</operator>
<operator activated="true" class="set_macros" compatibility="5.3.013" expanded="true" height="60" name="Set Macros" width="90" x="45" y="165">
<list key="macros">
<parameter key="mBatchStart" value="0"/>
<parameter key="mBatchSize" value="1000"/>
</list>
</operator>
<connect from_op="Read Database (2)" from_port="output" to_op="Extract Macro" to_port="example set"/>
<connect from_op="Extract Macro" from_port="example set" to_op="Loop" to_port="input 1"/>
<connect from_op="Loop" from_port="output 1" to_port="result 1"/>
<portSpacing port="source_input 1" spacing="0"/>
<portSpacing port="sink_result 1" spacing="0"/>
<portSpacing port="sink_result 2" spacing="0"/>
</process>
</operator>
</process>0