Monday, September 17, 2012

Big Data, Spatial Hive, Sequence Files


Following the last post, where we used Pig to analyze data stored in HDFS, in this post we will be using Hive and spatially enabling it for geo analysis. Hive enables you to write SQL-like statements in a language called HiveQL, which Hive converts into MapReduce jobs that are submitted to Hadoop for execution. Again, if you know SQL, then learning HiveQL is easy and intuitive. Keep in mind that Hive is not intended for OLTP or real-time analysis. Like Pig, Hive is extensible via User Defined Functions (UDFs), so we will be using almost the same functions as in the previous post to find things that are near and/or within some criteria.

There are several ways to store data in HDFS; one of them is the SequenceFile format, a key/value binary format with compression capabilities. For this post, I will be transforming an input into this well-known binary format and storing it on HDFS for later query and analysis.

An object that requires persistence into a SequenceFile has to implement the Writable interface. Since we deal with spatial features, let's declare a Feature class that implements the Writable interface:


import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// A spatial feature with a geometry and a symbol; persisted as the value of a SequenceFile record.
public class Feature implements Writable
{
    public IGeometry geometry;
    public ISymbol symbol = NoopSymbol.NOOP;

    public Feature()
    {
    }

    public void write(final DataOutput dataOutput) throws IOException
    {
        geometry.write(dataOutput);
        symbol.write(dataOutput);
    }

    public void readFields(final DataInput dataInput) throws IOException
    {
        geometry.readFields(dataInput);
        symbol.readFields(dataInput);
    }
}

A Feature has a geometry and a symbol. A geometry is also Writable:


public interface IGeometry extends Writable
{
}

An implementation of the geometry interface is a two-dimensional MapPoint:


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class MapPoint implements IGeometry
{
    public double x;
    public double y;

    public MapPoint()
    {
    }

    public MapPoint(final double x, final double y)
    {
        this.x = x;
        this.y = y;
    }

    // Convenience factory used when reading a point back from a stream.
    public static MapPoint newMapPoint(final DataInput dataInput) throws IOException
    {
        final MapPoint mapPoint = new MapPoint();
        mapPoint.readFields(dataInput);
        return mapPoint;
    }

    public void write(final DataOutput dataOutput) throws IOException
    {
        dataOutput.writeDouble(x);
        dataOutput.writeDouble(y);
    }

    public void readFields(final DataInput dataInput) throws IOException
    {
        x = dataInput.readDouble();
        y = dataInput.readDouble();
    }
}

For now, features will have a no-operation (noop) symbol associated with them:


public class NoopSymbol implements ISymbol
{
    public final static ISymbol NOOP = new NoopSymbol();

    public NoopSymbol()
    {
    }

    public void write(final DataOutput dataOutput) throws IOException
    {
    }

    public void readFields(final DataInput dataInput) throws IOException
    {
    }
}

The input source that I wanted to test in my ETL is a set of cities (cities1000) in TSV format downloaded from GeoNames. The Writable object read from and written to the SequenceFile is a City class that extends Feature and is augmented with attributes.


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class City extends Feature
{
    public int cityId;
    public String name;
    public String country;
    public int population;
    public int elevation;
    public String timeZone;

    @Override
    public void write(final DataOutput dataOutput) throws IOException
    {
        super.write(dataOutput);
        dataOutput.writeInt(cityId);
        dataOutput.writeUTF(name);
        dataOutput.writeUTF(country);
        dataOutput.writeInt(population);
        dataOutput.writeInt(elevation);
        dataOutput.writeUTF(timeZone);
    }

    @Override
    public void readFields(final DataInput dataInput) throws IOException
    {
        // Read the geometry written by super.write() back as a concrete MapPoint,
        // then read the city attributes in the same order they were written.
        geometry = MapPoint.newMapPoint(dataInput);
        symbol = NoopSymbol.NOOP;
        cityId = dataInput.readInt();
        name = dataInput.readUTF();
        country = dataInput.readUTF();
        population = dataInput.readInt();
        elevation = dataInput.readInt();
        timeZone = dataInput.readUTF();
    }
}

Using Hadoop's command line interface, I prepared a working directory to load the cities into HDFS:


$ hadoop fs -mkdir cities

Next, I wrote and executed a Java program that uses the opencsv library to extract, transform, and load the TSV rows as SequenceFile records onto HDFS.
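That loader program is not listed in the post, but a minimal sketch of the idea could look like the following. The class name, file paths, column indices (assuming the standard geonames column layout), and the parseIntOrZero helper are all illustrative, not the author's actual code:


import au.com.bytecode.opencsv.CSVReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;

import java.io.FileReader;

public class CityLoader
{
    public static void main(final String[] args) throws Exception
    {
        final Configuration conf = new Configuration();
        final FileSystem fs = FileSystem.get(conf);
        final Path path = new Path("cities/cities1000.seq");

        final CSVReader reader = new CSVReader(new FileReader("cities1000.txt"), '\t');
        final SequenceFile.Writer writer = SequenceFile.createWriter(
                fs, conf, path, IntWritable.class, City.class);
        try
        {
            final IntWritable key = new IntWritable();
            final City city = new City();
            String[] tokens;
            while ((tokens = reader.readNext()) != null)
            {
                // Standard geonames columns: 0=id, 1=name, 4=lat, 5=lon,
                // 8=country, 14=population, 15=elevation, 17=timezone.
                city.cityId = Integer.parseInt(tokens[0]);
                city.name = tokens[1];
                city.geometry = new MapPoint(
                        Double.parseDouble(tokens[5]),  // x = longitude
                        Double.parseDouble(tokens[4])); // y = latitude
                city.country = tokens[8];
                city.population = parseIntOrZero(tokens[14]);
                city.elevation = parseIntOrZero(tokens[15]);
                city.timeZone = tokens[17];

                key.set(city.cityId);
                writer.append(key, city);
            }
        }
        finally
        {
            writer.close();
            reader.close();
        }
    }

    private static int parseIntOrZero(final String token)
    {
        return token == null || token.isEmpty() ? 0 : Integer.parseInt(token);
    }
}

Each TSV row becomes one key/value record in the SequenceFile, keyed here by the city id.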

I highly recommend that you read Hadoop in Action; it has a nice introduction to installing and running Hive. Remember, Hive operates on SQL-like statements, so to query the loaded City data, we create a table that maps to the City object. From the Hive command line interface, we execute the following command:


hive> create external table cities(
 x double,
 y double,
 cityid int,
 name  string,
 country string,
 population int,
 elevation int,
 timezone string
 ) row format serde 'com.esri.CitySerDe'
 stored as sequencefile location '/user/mraad_admin/cities';

If you know SQL, this should be familiar. But note the last two lines: they instruct Hive to read the data in SequenceFile format from the HDFS location that we previously prepared, and since the data is in a binary format, each row is serialized and deserialized using a helper SerDe class. The CitySerDe class knows how to serialize and deserialize a Writable City instance to and from the underlying streams, and it provides column metadata, such as the column names and types, to Hive.
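The CitySerDe source is not listed in the post; as a rough, simplified sketch (not the author's actual code), the read path of such a SerDe could look like the following, where the SequenceFile value handed in by Hive is assumed to already be a City instance and the write path is left out:


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Writable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class CitySerDe implements SerDe
{
    private ObjectInspector objectInspector;
    private final List<Object> row = new ArrayList<Object>();

    public void initialize(final Configuration conf, final Properties properties) throws SerDeException
    {
        // Expose the columns declared in the CREATE TABLE statement to Hive.
        final List<String> names = Arrays.asList(
                "x", "y", "cityid", "name", "country", "population", "elevation", "timezone");
        final List<ObjectInspector> inspectors = new ArrayList<ObjectInspector>();
        inspectors.add(PrimitiveObjectInspectorFactory.javaDoubleObjectInspector); // x
        inspectors.add(PrimitiveObjectInspectorFactory.javaDoubleObjectInspector); // y
        inspectors.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);    // cityid
        inspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); // name
        inspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); // country
        inspectors.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);    // population
        inspectors.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);    // elevation
        inspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); // timezone
        objectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors);
    }

    public Object deserialize(final Writable writable) throws SerDeException
    {
        // The SequenceFile value is already a City instance; flatten it into a row of column values.
        final City city = (City) writable;
        final MapPoint mapPoint = (MapPoint) city.geometry;
        row.clear();
        row.add(mapPoint.x);
        row.add(mapPoint.y);
        row.add(city.cityId);
        row.add(city.name);
        row.add(city.country);
        row.add(city.population);
        row.add(city.elevation);
        row.add(city.timeZone);
        return row;
    }

    public ObjectInspector getObjectInspector() throws SerDeException
    {
        return objectInspector;
    }

    public Class<? extends Writable> getSerializedClass()
    {
        return City.class;
    }

    public Writable serialize(final Object obj, final ObjectInspector objectInspector) throws SerDeException
    {
        throw new UnsupportedOperationException("This sketch only reads from the table");
    }

    public SerDeStats getSerDeStats()
    {
        return null;
    }
}

The real SerDe, compiled and packaged into a jar, is added to the Hive runtime for usage: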


hive> add jar /Users/mraad_admin/JavaWorkspace/GeomX/GeomX.jar;

Added /Users/mraad_admin/JavaWorkspace/GeomX/GeomX.jar to class path
Added resource: /Users/mraad_admin/JavaWorkspace/GeomX/GeomX.jar

hive> show tables;
OK
cities
Time taken: 3.98 seconds

hive> describe cities;
OK
x double from deserializer
y double from deserializer
cityid int from deserializer
name string from deserializer
country string from deserializer
population int from deserializer
elevation int from deserializer
timezone string from deserializer
Time taken: 0.434 seconds

hive> select * from cities limit 5;
OK
1.49129 42.46372 3039163 Sant Julia de Loria AD 8022 0 Europe/Andorra
1.73361 42.54277 3039604 Pas de la Casa AD 2363 2050 Europe/Andorra
1.53319 42.55623 3039678 Ordino AD 3066 0 Europe/Andorra
1.53414 42.50729 3040051 les Escaldes AD 15853 0 Europe/Andorra
1.51483 42.54499 3040132 la Massana AD 7211 0 Europe/Andorra
Time taken: 0.108 seconds

Like I said, if you know SQL, you can find the top 5 countries with the most cities by issuing the following statement - no need to write MapReduce jobs:


hive> select country,count(country) as c from cities group by country order by c desc limit 5;

Onto spatial. Hive is extensible via User Defined Functions (UDFs). Since I wanted to find all the cities that are near a specific location, I extended Hive with a 'near' function that was packaged in the added jar and defined it as follows:


hive> create temporary function near as 'com.esri.UDFNear';

I can now use the 'near' function to locate cities within 5 miles of a specific location:


hive> select name from cities where near(x,y,-84.20299,39.43534,5);

The UDFNear function extends Hive's UDF class and, in this case, implements the Haversine distance calculation between two geographical locations.


import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.io.BooleanWritable;

public class UDFNear extends UDF
{
    private final static BooleanWritable TRUE = new BooleanWritable(true);
    private final static BooleanWritable FALSE = new BooleanWritable(false);

    public BooleanWritable evaluate(
            DoubleWritable x1, DoubleWritable y1,
            DoubleWritable x2, DoubleWritable y2,
            DoubleWritable distance
    )
    {
        return HaversineMiles.distance(y1.get(), x1.get(), y2.get(), x2.get()) < distance.get() ? TRUE : FALSE;
    }

    public boolean evaluate(
            double x1, double y1,
            double x2, double y2,
            double distance
    )
    {
        return HaversineMiles.distance(y1, x1, y2, x2) < distance;
    }
}
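
The HaversineMiles helper is not shown in the post; a minimal sketch, assuming a mean earth radius in miles and coordinates in decimal degrees, might look like this:


public final class HaversineMiles
{
    private static final double EARTH_RADIUS_MILES = 3958.76;

    private HaversineMiles()
    {
    }

    // Great-circle distance in miles between two lat/lon pairs expressed in degrees.
    public static double distance(
            final double lat1, final double lon1,
            final double lat2, final double lon2)
    {
        final double dLat = Math.toRadians(lat2 - lat1);
        final double dLon = Math.toRadians(lon2 - lon1);
        final double a = Math.sin(dLat / 2.0) * Math.sin(dLat / 2.0)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2.0) * Math.sin(dLon / 2.0);
        final double c = 2.0 * Math.atan2(Math.sqrt(a), Math.sqrt(1.0 - a));
        return EARTH_RADIUS_MILES * c;
    }
}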

Let's assume that the field 'country' was not defined in the table, and I want to find the count of cities per country when I only have the spatial boundaries of the countries. This is a perfect spatial join, and it is where UDFs, the DistributedCache, and spatial libraries like JTS and GeoTools come to the rescue.

I extended the GenericUDF class with a GenericUDFPip class that performs a 'naive' point-in-polygon (pip) based on geometries loaded into the distributed cache.  This enabled me to write a spatial query as follows:


hive> add file borders.shp;
hive> add file borders.shx;
hive> create temporary function pip as 'com.esri.GenericUDFPip';
hive> select t.p,count(t.p) from (select pip(x,y,'./borders.shp') as p from cities) t where t.p != -1 group by t.p;

The first two lines load the country borders shapefile and its spatial index into the distributed cache - the pip function uses these on the first call to build an in-memory spatial index for fast searching. The pip function is defined as a class in the previously added jar file. It expects 3 arguments: the first is a longitude, the second is a latitude, and the third is the shapefile location in the distributed cache. Based on these arguments, it returns the record identifier of the country border that the longitude and latitude fall into, or -1 if there is no intersection. For the query, a nested table is first created from the pip result, which is then grouped and aggregated on the non-negative border identifiers.
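
GenericUDFPip itself is not listed in the post. To give a sense of the mechanics, here is a rough sketch of the shapefile-backed lookup that such a UDF could delegate to, using GeoTools to read the cached shapefile and a JTS STRtree as the in-memory index; the class and method names here are illustrative, not the author's:


import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.GeometryFactory;
import com.vividsolutions.jts.geom.Point;
import com.vividsolutions.jts.index.strtree.STRtree;
import org.geotools.data.FileDataStore;
import org.geotools.data.FileDataStoreFinder;
import org.geotools.data.simple.SimpleFeatureIterator;
import org.opengis.feature.simple.SimpleFeature;

import java.io.File;
import java.io.IOException;
import java.util.List;

public class PointInPolygonIndex
{
    private final STRtree index = new STRtree();
    private final GeometryFactory geometryFactory = new GeometryFactory();

    // Load every border polygon from the shapefile into an in-memory STRtree,
    // keyed by its envelope and tagged with its record index.
    public PointInPolygonIndex(final String shapefilePath) throws IOException
    {
        final FileDataStore dataStore = FileDataStoreFinder.getDataStore(new File(shapefilePath));
        final SimpleFeatureIterator iterator = dataStore.getFeatureSource().getFeatures().features();
        try
        {
            int recordId = 0;
            while (iterator.hasNext())
            {
                final SimpleFeature feature = iterator.next();
                final Geometry geometry = (Geometry) feature.getDefaultGeometry();
                index.insert(geometry.getEnvelopeInternal(), new Entry(recordId++, geometry));
            }
        }
        finally
        {
            iterator.close();
            dataStore.dispose();
        }
        index.build();
    }

    // Return the record id of the polygon containing (x, y), or -1 if none does.
    public int findRecord(final double x, final double y)
    {
        final Point point = geometryFactory.createPoint(new Coordinate(x, y));
        final List candidates = index.query(point.getEnvelopeInternal());
        for (final Object candidate : candidates)
        {
            final Entry entry = (Entry) candidate;
            if (entry.geometry.contains(point))
            {
                return entry.recordId;
            }
        }
        return -1;
    }

    private static final class Entry
    {
        private final int recordId;
        private final Geometry geometry;

        private Entry(final int recordId, final Geometry geometry)
        {
            this.recordId = recordId;
            this.geometry = geometry;
        }
    }
}

Inside the GenericUDF, an instance like this would be built lazily on the first evaluate() call using the './borders.shp' path from the distributed cache and then queried for every row.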
Pretty cool, no? So imagine what else you could do with HiveQL and these libraries... Say, find the top 10 cities with the most surrounding cities within a 25 mile radius (exercise for the reader; hint: use a join and look at the source code for UDFDist :-).

The fun does not stop here. Since this is SQL-like, Hive comes with a JDBC driver. Using my favorite Java framework, Spring, together with the Spring-Hadoop project, the application can act as a Hive JDBC client.

First make sure to start Hive as a service:


$ hive --service hiveserver

Next, define a Spring application context as follows:



   
<bean id="hive-ds" class="org.apache.commons.dbcp.BasicDataSource"
      destroy-method="close"
      p:driverClassName="org.apache.hadoop.hive.jdbc.HiveDriver"
      p:url="jdbc:hive://localhost:10000/default"
      p:connectionInitSqls-ref="initSqls"/>

<util:list id="initSqls">
    <value>add jar /Users/mraad_admin/JavaWorkspace/GeomX/GeomX.jar</value>
    <value>create temporary function near as 'com.esri.UDFNear'</value>
    <value>create temporary function dist as 'com.esri.UDFDist'</value>
</util:list>

<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"
      c:dataSource-ref="hive-ds"/>


A Hive data source bean is defined using the Apache Commons database connection pool library. The data source driver class property is set to the Hive JDBC driver, and a set of SQL statements is executed upon start up. These statements add the jar containing the UDF classes to the distributed cache and declare the 'near' and 'dist' UDFs. Finally, a Spring JDBC template is defined with a reference to the aforementioned data source. This template will be injected into a service bean to enable SQL query execution and row mapping.

To see the result of the query physically on a world map, the Flex API for ArcGIS Server is utilized. Bridging the server side and the client side is the Spring Flex Integration project, which enables a Flex client application to invoke functions on Spring-based remote objects.


@Service("hiveService")
@RemotingDestination
public class HiveService
{
    @Autowired
    public JdbcTemplate jdbcTemplate;

    public FeatureSet query(final String where)
    {
        final List list = jdbcTemplate.query("SELECT X,Y,NAME FROM CITIES WHERE " + where, new RowMapper()
        {
            public Feature mapRow(final ResultSet resultSet, final int i) throws SQLException
            {
                final double x = WebMercator.longitudeToX(resultSet.getDouble(1));
                final double y = WebMercator.latitudeToY(resultSet.getDouble(2));
                final MapPoint mapPoint = new MapPoint(x, y);

                final String name = resultSet.getString(3);
                final ASObject attributes = new ASObject();
                attributes.put("name", name);

                return new Feature(mapPoint, attributes);
            }
        });
        final Feature[] features = new Feature[list.size()];
        list.toArray(features);
        return new FeatureSet(features);
    }
}

I've talked extensively in my previous posts about the beauty of having no impedance mismatch between the server-side and client-side transfer objects, such as the FeatureSet, MapPoint, and Feature instances. This HiveService is injected with the Spring-defined JDBC template and exposes a 'query' function that expects a 'where' clause string. Upon successful execution, each result set row is transformed into a Feature instance that is appended to a list, which is then transferred back to the client as a FeatureSet instance.

Onto the client. This is a simple Flex implementation where the map and a data grid are stacked on top of each other. A user can enter a where clause that is sent to the server 'query' function using the RemoteObject capabilities. Upon successful execution, a FeatureSet is returned; the features are rendered on the map in a GraphicsLayer instance, and the same features are rendered in a DataGrid instance as data rows. A user can click on a row in the data grid to highlight the feature on the map, and vice versa, a user can click on a feature on the map to highlight a row in the data grid.

I know that this is a lot of information. Thanks for reading it through. As usual, you can find all the source code here.
