Cascading is a data processing API and processing query planner used for defining, sharing, and executing data-processing workflows on a single computing node or distributed computing cluster.
— Cascading website
Cascading abstracts the Map/Reduce API, focusing on data processing in terms of tuples flowing through pipes between taps, from input (called a SourceTap) to output (named a SinkTap). As the data flows, various operations are applied to the tuples; the whole system is transformed into Map/Reduce operations at runtime.
With elasticsearch-hadoop, Elasticsearch can be plugged into Cascading flows as a SourceTap or a SinkTap through EsTap.
Just like other libraries, elasticsearch-hadoop needs to be available in the jar classpath (either by being manually deployed in the cluster or shipped along with the Hadoop job).
Cascading is configured through a Map<Object, Object>, typically a Properties object, which indicates the various Cascading settings and also the application jar:

```java
Properties props = new Properties();
AppProps.setApplicationJarClass(props, Main.class);
FlowConnector flow = new HadoopFlowConnector(props);
```
elasticsearch-hadoop options can be specified in the same way, these being picked up automatically by all `EsTap`s down the flow:

```java
Properties props = new Properties();
props.setProperty("es.index.auto.create", "false");
...
FlowConnector flow = new HadoopFlowConnector(props);
```
This approach can be used for both local and remote/Hadoop flows - simply use the appropriate FlowConnector.
Depending on the platform used, Cascading can internally use either Writable or JDK types for its tuples. Elasticsearch handles both transparently (see the Map/Reduce conversion section), though we recommend using the same types (if possible) in both cases to avoid the overhead of maintaining two different versions.
If automatic index creation is used, please review this section for more information.
To write data to Elasticsearch, simply hook an EsTap into the Cascading flow:
```java
Tap in = new Lfs(new TextDelimited(new Fields("id", "name", "url", "picture")), "/resources/artists.dat");
Tap out = new EsTap("radio/artists", new Fields("name", "url", "picture"));
new HadoopFlowConnector().connect(in, out, new Pipe("write-to-Es")).complete();
```
When the job output data is already in JSON, elasticsearch-hadoop allows direct indexing without applying any transformation; the data is taken as is and sent directly to Elasticsearch. In such cases, one needs to indicate the JSON input by setting the es.input.json parameter. elasticsearch-hadoop then expects to receive a tuple with a single field (representing the JSON document); the library will recognize Text or BytesWritable types, and otherwise simply calls toString to get hold of the JSON content.
Make sure the data is properly encoded, in
UTF-8. The job output is considered the final form of the document sent to Elasticsearch.
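To illustrate the encoding requirement, here is a minimal, standalone sketch (plain JDK, no Cascading or elasticsearch-hadoop dependencies; the document content is made up) showing how to encode a JSON document explicitly as UTF-8 instead of relying on the platform default charset:

```java
import java.nio.charset.StandardCharsets;

public class EncodingCheck {
    public static void main(String[] args) {
        // A JSON document containing a non-ASCII character
        String json = "{\"name\":\"Björk\",\"url\":\"http://example.com\"}";

        // Encode explicitly as UTF-8 rather than using the platform default
        byte[] utf8 = json.getBytes(StandardCharsets.UTF_8);

        // 'ö' occupies two bytes in UTF-8, so the byte count exceeds the char count by one
        System.out.println(utf8.length - json.length());

        // Round-trip: decoding the bytes as UTF-8 restores the original string
        String decoded = new String(utf8, StandardCharsets.UTF_8);
        System.out.println(decoded.equals(json));
    }
}
```

Passing the raw UTF-8 bytes (or a string that round-trips cleanly, as above) ensures the document arrives at Elasticsearch exactly as produced by the job.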
```java
Properties props = new Properties();
...
props.setProperty("es.input.json", "true");
Tap in = new Lfs(new TextLine(new Fields("line")), "/resources/artists.json");
Tap out = new EsTap("json-cascading-local/artists");
FlowConnector flow = new HadoopFlowConnector(props);
flow.connect(in, out, new Pipe("import-json")).complete();
```
Just the same, add an EsTap on the other end of a pipe to read from (instead of writing to) Elasticsearch.
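For instance, a read-side flow might look like the following sketch. It reuses the radio/artists resource from the earlier example; the query string (q=me*) and the StdOut sink are illustrative, and this snippet assumes a running Elasticsearch instance plus the Cascading and elasticsearch-hadoop jars on the classpath:

```java
// EsTap now acts as the source; a query can be appended to the resource
Tap in = new EsTap("radio/artists/_search?q=me*");
// Print the resulting tuples to the console (illustrative sink)
Tap out = new StdOut(new TextLine());
new LocalFlowConnector().connect(in, out, new Pipe("read-from-Es")).complete();
```

The same pattern works with a HadoopFlowConnector for cluster execution.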