Apache Pig support

 

Apache Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs.

-- Pig website

It provides a powerful, high-level, scripting-like transformation language which gets compiled into Map/Reduce jobs at runtime by the Pig compiler. To simplify working with arbitrary data, Pig associates a schema (or type information) with each data set, for validation and performance. This, in turn, breaks the data down into discrete types that can be transformed through various operators or custom functions (UDFs). Data can be loaded from and stored to various storages such as the local file-system or HDFS and, with elasticsearch-hadoop, to Elasticsearch as well.

Installation

In order to use elasticsearch-hadoop, its jar needs to be in Pig's classpath. There are various ways of making that happen, though typically the REGISTER command is used:

REGISTER /path/elasticsearch-hadoop.jar;
Note

The command expects a proper URI that can be found either on the local file-system or remotely. Typically it is best to place the jar on a distributed file-system (like HDFS or Amazon S3), since the script might be executed on various machines.
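
For instance, the jar can be registered straight from HDFS (a sketch; the path below is hypothetical):

-- register the connector jar from a distributed file-system
REGISTER hdfs:///libs/elasticsearch-hadoop.jar;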

As an alternative, when using the command-line, one can register additional jars through the -Dpig.additional.jars option (which accepts a URI as well):

$ pig -Dpig.additional.jars=/path/elasticsearch-hadoop.jar:<other.jars> script.pig

or, if the jars are on HDFS:

$ pig \
-Dpig.additional.jars=hdfs://<cluster-name>:<cluster-port>/<path>/elasticsearch-hadoop.jar:<other.jars> script.pig

Configuration

With Pig, one can specify the configuration properties (as an alternative to the Hadoop Configuration object) as constructor parameters when declaring EsStorage:

-- 'radio/artists' is the target elasticsearch-hadoop resource (index/type)
STORE B INTO 'radio/artists' USING org.elasticsearch.hadoop.pig.EsStorage
             ('es.http.timeout = 5m',            -- elasticsearch-hadoop option (http timeout)
              'es.index.auto.create = false');   -- disable automatic index creation

Tip

To avoid having to specify the fully qualified class name (org.elasticsearch.hadoop.pig.EsStorage), consider using a shortcut through the DEFINE command:

DEFINE EsStorage org.elasticsearch.hadoop.pig.EsStorage();
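
The alias can then be used wherever the fully qualified class name would otherwise appear:

-- use the DEFINEd alias instead of the full class name
STORE B INTO 'radio/artists' USING EsStorage();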

Do note that it is possible (and recommended) to specify configuration parameters (such as es.query or es.mapping.names) within the definition to reduce script duplication:

DEFINE EsStorage org.elasticsearch.hadoop.pig.EsStorage('my.cfg.param=value');
Important

Pig definitions are replaced as-is; even though the syntax allows parametrization, Pig will silently ignore any parameters outside the DEFINE declaration.
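
To illustrate the pitfall (a minimal sketch; the parameter values are hypothetical):

-- correct: all parameters are declared inside the DEFINE
DEFINE EsStorage org.elasticsearch.hadoop.pig.EsStorage(
    'es.index.auto.create=false', 'es.mapping.names=date:@timestamp');
-- the alias is expanded as-is; a parameter added at the use site, such as
-- USING EsStorage('es.http.timeout=5m'), would be silently ignored
STORE B INTO 'radio/artists' USING EsStorage();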

Mapping

Out of the box, elasticsearch-hadoop uses the Pig schema to map the data in Elasticsearch, using both the field names and types in the process. There are cases however when the names in Pig cannot be used with Elasticsearch (invalid characters, existing names with a different layout, etc…). For such cases, one can use the es.mapping.names setting, which accepts a comma-separated list of mappings in the following format: Pig field name : Elasticsearch field name

For example:

-- name mapping for two fields:
--   Pig column date mapped in Elasticsearch to @timestamp
--   Pig column uRL mapped in Elasticsearch to url
STORE B INTO '...' USING org.elasticsearch.hadoop.pig.EsStorage(
        'es.mapping.names=date:@timestamp, uRL:url')

Tip

Elasticsearch accepts only lower-case field names and, as such, elasticsearch-hadoop will always convert Pig column names to lower-case. Because Pig is case sensitive, elasticsearch-hadoop handles the reverse field mapping as well. It is recommended to follow the default Pig style, using upper-case names only for commands, and to avoid mixed-case names.
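
For example (a minimal sketch; the file and resource names are hypothetical):

-- the column FirstName is indexed in Elasticsearch as 'firstname';
-- elasticsearch-hadoop maps the field back to FirstName when reading
A = LOAD 'people.dat' USING PigStorage() AS (FirstName:chararray);
STORE A INTO 'demo/people' USING org.elasticsearch.hadoop.pig.EsStorage();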

Writing data to Elasticsearch

Elasticsearch is exposed to Pig as a native Storage, so it can be used to store data:

-- load data from HDFS into Pig using a schema
A = LOAD 'src/test/resources/artists.dat' USING PigStorage()
                    AS (id:long, name, url:chararray, picture:chararray);
-- transform data
B = FOREACH A GENERATE name, TOTUPLE(url, picture) AS links;
-- save the result to Elasticsearch; 'radio/artists' is the resource (index
-- and type) associated with the given storage. Additional configuration
-- parameters can be passed to EsStorage - in this case the defaults are used.
STORE B INTO 'radio/artists' USING org.elasticsearch.hadoop.pig.EsStorage();

Writing existing JSON to Elasticsearch

When the job output data is already in JSON, elasticsearch-hadoop allows direct indexing without applying any transformation; the data is taken as-is and sent directly to Elasticsearch. In such cases, one needs to indicate that the input is JSON by setting the es.input.json parameter. elasticsearch-hadoop then expects to receive a tuple with a single field (representing the JSON document); the library will recognize common textual types such as chararray or bytearray, and otherwise calls toString() to get hold of the JSON content.

Table 4. Pig types to use for JSON representation

Pig type        Comment

bytearray       use this when the JSON data is represented as a byte[] or similar
chararray       use this if the JSON data is represented as a String
anything else   make sure the toString() returns the desired JSON document


Important

Make sure the data is properly encoded, in UTF-8. The field content is considered the final form of the document sent to Elasticsearch.

-- load the (JSON) data as a single field (json)
A = LOAD '/resources/artists.json' USING PigStorage() AS (json:chararray);
-- indicate the input is of type JSON
STORE A INTO 'radio/artists'
    USING org.elasticsearch.hadoop.pig.EsStorage('es.input.json=true'...);

Writing to dynamic/multi-resources

One can index the data to a different resource, depending on the row being read, by using patterns. Reusing the aforementioned media example, one could configure it as follows:

-- the declared field type is used by the resource pattern below;
-- any of the declared fields can be used
A = LOAD 'src/test/resources/media.dat' USING PigStorage()
            AS (name:chararray, type:chararray, year:chararray);
-- resource pattern using field type
STORE A INTO 'my-collection/{type}' USING org.elasticsearch.hadoop.pig.EsStorage();

For each tuple about to be written, elasticsearch-hadoop will extract the type field and use its value to determine the target resource.

The functionality is also available when dealing with raw JSON - in this case, the value will be extracted from the JSON document itself. Assuming the JSON source contains documents with the following structure:

{
    "media_type":"game",
    "title":"Final Fantasy VI",
    "year":"1994"
}

Here, media_type is the field within the JSON document that will be used by the pattern.

the declaration can be as follows:

-- schema declaration for the tuple; since JSON input is used,
-- the schema is simply a holder for the raw data
A = LOAD '/resources/media.json' USING PigStorage() AS (json:chararray);
-- resource pattern relying on a field within the JSON document
-- and not on the schema
STORE A INTO 'my-collection/{media_type}'
    USING org.elasticsearch.hadoop.pig.EsStorage('es.input.json=true');

Reading data from Elasticsearch

As you would expect, loading the data is straightforward:

-- execute the Elasticsearch search query against the 'radio/artists'
-- resource and load the results into Pig
A = LOAD 'radio/artists'
    USING org.elasticsearch.hadoop.pig.EsStorage('es.query=?q=me*');
DUMP A;
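
Besides the URI query shown above, the es.query parameter can also carry the full query DSL; a minimal sketch (the query itself is hypothetical):

-- es.query accepting a raw query DSL string
A = LOAD 'radio/artists'
    USING org.elasticsearch.hadoop.pig.EsStorage(
        'es.query={ "query" : { "term" : { "name" : "mega" } } }');
DUMP A;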

Type conversion

Important

If automatic index creation is used, please review the documentation on mapping and types for more information.

Pig internally uses native Java types for most of its types and elasticsearch-hadoop abides by that convention.

Pig type        Elasticsearch type

null            null
chararray       string
int             int
long            long
double          double
float           float
bytearray       binary
tuple           map
bag             array
map             map

Available in Pig 0.10 or higher

boolean         boolean

Available in Pig 0.11 or higher

datetime        date

Available in Pig 0.12 or higher

biginteger      not supported
bigdecimal      not supported

Note

While elasticsearch-hadoop understands Pig types up to version 0.12, it is backwards compatible with Pig 0.9.
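
As a rough illustration of these conversions, consider the following sketch (the file, resource and field names are hypothetical, and the JSON in the comments only indicates the approximate shape of the resulting document):

-- tuple -> map, bag -> array, per the table above
A = LOAD 'media.dat' USING PigStorage()
        AS (title:chararray,
            links:tuple(url:chararray, picture:chararray),
            tags:bag{t:(tag:chararray)});
STORE A INTO 'demo/media' USING org.elasticsearch.hadoop.pig.EsStorage();
-- each row would be indexed roughly as:
-- { "title": "...", "links": { "url": "...", "picture": "..." }, "tags": [ ... ] }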