Boyang Yue

Software Engineering, Big Data, and the miscellaneous

16 Jan 2022

The Comprehensive Guide to Hive UDF

One of the most essential features of Spark is interaction with Hive, the data warehouse platform built on top of Hadoop. Naturally, Spark SQL supports the integration of Hive UDFs, UDAFs, and UDTFs.

At a glance, delving into Hive UDFs might seem unnecessary in the Spark context, considering the extensive functionalities provided by Spark UDF. Nevertheless, Hive UDF could prove indispensable in particular scenarios, such as building pure SQL environments or optimizing performance. Despite the abundance of Spark tutorials, there is a dearth of practical guides on how to work with Hive UDF, and that’s why this article is being written.

What is UDF

Both Spark and Hive come equipped with an assortment of built-in functions. When the built-in functions fail to meet complex or specific requirements, User Defined Functions (UDFs) empower developers to devise custom functions tailored to address unique use cases.

Hive supports three categories of custom functions:

  • User Defined Functions (UDFs)
    • Operate on single rows and return a singular value.
  • User Defined Aggregate Functions (UDAFs)
    • Process multiple rows and return a single aggregated value.
  • User Defined Tabular Functions (UDTFs)
    • Operate on a single row but return multiple rows.

In a more restricted sense, UDF refers to any function, user-defined or built-in, that takes a row argument or one or more columns from a row and returns a single value. At the time of penning this article (with Spark’s latest stable release being 3.2.0), Spark inherently supports UDFs and UDAFs, but it lacks native support for UDTFs. As an alternative, we can employ a UDF that returns an array of objects, coupled with the explode function to mimic UDTF functionality.

Dependency

Writing UDFs in Spark and Hive is not exactly the same, not to mention writing a Hive UDF that would be called in the Spark SQL. Hive UDFs can be classified into simple and generic, both of which share the same dependency. After initialize a new Java project with Maven as the build system, the first step is to add hive-exec as a dependency.

<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.3.9</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

The dependency scope, it should be noted, ought to be specified as provided. It’s much more manageable to leverage dependencies in the Spark environment instead of constructing an Uber JAR, which may instigate conflicts with pre-existing dependencies. Misconfiguration can commonly culminate in an exception:

org.apache.spark.sql.AnalysisException: 
    No handler for UDF/UDAF/UDTF 'com.***': 
        java.lang.ClassNotFoundException: com.***;
            ...

Furthermore, it is crucial to synchronize the version of hive-exec with the version embedded in Spark. This can be ascertained by examining specific JARs within the system classpath or referencing spark.sql.hive.version on the Environment tab of Spark UI. The Hive version in Spark 3.2.0 is 2.3.9, whereas the latest stable release of Hive is 3.1.2.

A trivial error may occur while packaging after importing hive-exec 2.3.9:

Could not find artifact 
    org.pentaho:pentaho-aggdesigner-algorithm:pom:5.1.5-jhyde
    ...

This transpires due to the unavailability of the 5.1.5-jhyde package, a dependency of hive-exec 2.3.9, in the Maven Central Repository. To rectify this issue, incorporate an alternative repository containing this package into the project’s pom file.

<repositories>
    <repository>
        <id>aliyun</id>
        <url>https://maven.aliyun.com/repository/spring-plugin</url>
    </repository>
</repositories>

(Note: This issue is prevalent with certain newer versions of Maven, possibly attributed to the fact that Maven 3.8.1 disables support for repositories using “http” protocol.)

Simple UDF

Embarking on the creation of a simple Hive UDF is rather straightforward:

  1. Extend the org.apache.hadoop.hive.ql.exec.UDF class.
  2. Implement one or more evaluate methods.

Let us consider a scenario where we have the side length of a square and wish to compute its area:

package org.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

public class GetSquareArea extends UDF {
    public int evaluate(int length) {
        return length * length;
    }

    public double evaluate(double length) {
        return length * length;
    }
}

Being a simple function, the UDF can be subjected to testing through JUnit.

package org.example.hive.udf;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class GetSquareAreaTest {
    @Test
    void eval() {
        GetSquareArea target = new GetSquareArea();
        Assertions.assertAll(() -> Assertions.assertEquals(25, target.evaluate(5)),
                () -> Assertions.assertEquals(121.0, target.evaluate(11.0)));
    }
}

To build the project:

mvn clean package 

Then spark_hive_udf-1.0-SNAPSHOT.jar can be found in the target folder. To add the JAR and test locally:

bin/spark-sql --jars /path_to/spark_hive_udf-1.0-SNAPSHOT.jar

In production environments, it is standard practice to upload the JAR to HDFS.

> add jar hdfs://D2/projects/my-udf.jar;
> create temporary function get_square_area as 'org.example.hive.udf.GetSquareArea';
> select get_square_area(6); -- 36
> select get_square_area(8.0); -- 64.0

Here shows two features of the simple UDF:

  • It processes and returns primitive types.
  • It supports overloading.

In essence, a simple UDF can accept various data types to represent the column types, including Java primitive types & wrapper classes, and Hadoop & Hive writable types.

Hive Data TypesJava PrimitivesJava Wrapper ClassesHadoop I/O
tinyintbyteByteByteWritable
smallintshortShortShortWritable
intintIntegerIntWritable
bigintlongLongLongWritable
string-StringText
booleanbooleanBooleanBooleanWritable
floatfloatFloatFloatWritable
doubledoubleDoubleDoubleWritable
array-ListArrayWritable
map-MapMapWritable

However, for enhanced reliability, it is recommended to employ Hadoop Writable types. On the contrary, it is advisable to abstain from using Java primitive types, as they inadequately handle null types. Null values are prevalent in large datasets. With Java primitive types, developers don’t even get a chance to conduct a check for null arguments before IllegalArgumentException is triggered.

> select get_square_area(null);

org.apache.hadoop.hive.ql.metadata.HiveException: 
    Unable to execute method public int org.example.hive.udf.GetSquareArea.evaluate(int) 
    with arguments {null}:null
    ...

As its system is based on reflection and method overloading, the simple UDF can only accept restricted types. Overloaded methods are differentiated by the number and the types of arguments passed into the method, while the types of arguments cannot differ by only the generic type. There are several restrictions on generics, and one of them is, generic types cannot be instantiated with primitive types. Considering type erasure, complex nested type parameters (e.g. struct) are not supported in a simple UDF either. This is one of the reasons why a generic UDF is needed.

Generic UDF

To develop a generic Hive UDF:

  1. Extend the org.apache.hadoop.hive.ql.udf.generic.GenericUDF class.
  2. Implement three abstract methods.
/**
* Initialize this GenericUDF. This will be called once and only once per
* GenericUDF instance.
*
* @param arguments
*          The ObjectInspector for the arguments
* @throws UDFArgumentException
*           Thrown when arguments have wrong types, wrong length, etc.
* @return The ObjectInspector for the return value
*/
public abstract ObjectInspector initialize(ObjectInspector[] arguments)
    throws UDFArgumentException;

/**
* Evaluate the GenericUDF with the arguments.
*
* @param arguments
*          The arguments as DeferedObject, use DeferedObject.get() to get the
*          actual argument Object. The Objects can be inspected by the
*          ObjectInspectors passed in the initialize call.
* @return The
*/
public abstract Object evaluate(DeferredObject[] arguments)
    throws HiveException;

/**
* Get the String to be displayed in explain.
*/
public abstract String getDisplayString(String[] children);

A notable component is the initialize method. It accepts an array of ObjectInspectors that signifies the arguments of the query and returns an ObjectInspector for the return type. All objects are passed around using the Object type in a generic UDF, circumventing the superfluous overhead of object instantiation and deserialization. When Hive analyses the query, actual types of the arguments passed into the generic UDF are computed and the initialize method would is invoked. Subsequently, arguments are parsed lazily in the evaluate method using the ObjectInspector returned by the initialize method .

For instance, let’s say we aim to ascertain the individual with the highest salary within a department:

package org.example.hive.udf;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class GetHighestSalary extends GenericUDF {

    ListObjectInspector listIO;
    StructObjectInspector structIO;

    @Override
    public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length != 1) {
            throw new UDFArgumentLengthException("This UDF only takes 1 argument.");
        }
        ObjectInspector oi = args[0];

        if (!(oi instanceof ListObjectInspector)) {
            throw new UDFArgumentException("The argument must be a list.");
        }
        this.listIO = (ListObjectInspector) oi;

        ArrayList<String> structFieldName = new ArrayList<>();
        ArrayList<ObjectInspector> structFieldOI = new ArrayList<>();

        structFieldName.add("name");
        structFieldOI.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        structFieldName.add("salary");
        structFieldOI.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);

        this.structIO = ObjectInspectorFactory.getStandardStructObjectInspector(structFieldName,
                structFieldOI);

        return ObjectInspectorFactory.getStandardListObjectInspector(this.structIO);
    }

    @Override
    public Object evaluate(DeferredObject[] args) throws HiveException {
        List<Object> staff = (List<Object>) args[0].get();

        List<Integer> salary = staff.stream().map(
                s -> (Integer) structIO.getStructFieldData(s, structIO.getStructFieldRef("salary")))
                .collect(Collectors.toList());

        Integer highestSalary = Collections.max(salary);

        return staff.stream()
                .filter(s -> (Integer) structIO.getStructFieldData(s,
                        structIO.getStructFieldRef("salary")) == highestSalary)
                .collect(Collectors.toList());
    }

    @Override
    public String getDisplayString(String[] strings) {
        return "Get Highest Salary";
    }
}

To invoke the UDF and obtain the result:

> create temporary function get_highest_salary as 'org.example.hive.udf.GetHighestSalary';
> select get_highest_salary(
>   array(
>     named_struct("name", "Bob", "salary", 2000),
>     named_struct("name", "Tom", "salary", 8000),
>     named_struct("name", "Jerry", "salary", 6000)
>   )
> );

[{"name":"Tom","salary":8000}]

This exemplifies that a generic UDF can accommodate structs of typed named fields, and it is highly adaptable, supporting all complex nested parameters. Furthermore, the generic UDF provides greater flexibility and better null handling, while it requires slightly more boilerplate code.

To illustrate, here shows the call pattern for a generic UDF:

  1. The UDF is initialized using a default constructor.
  2. The initialize method is invoked, receiving an ObjectInspector for each argument. Developers can throw an exception if the method receives incorrect numbers or types of arguments. If all checks pass, developers probably would like to save the ObjectInspectors in member variables for future use. Subsequently, the return type of the UDF should be determined and a corresponding ObjectInspector should be returned.
  3. The evaluate method is executed for each row in the query with the provided arguments. Values of the deferred objects should be extracted using the stored ObjectInspectors received in the initialize method. After the business logic is performed, the value that aligns with the ObjectInspector returned from the initialize method should be returned. In other words, the return value of the initialize method determines the type of return value of the evaluate method.

Although it is a must to override the getDisplayString method, this method is merely used inside the Hadoop tasks to display debugging information. Besides, the generic UDF provides a callback close method to free up resources.

/**
* Close GenericUDF.
* This is only called in runtime of MapRedTask.
*/
@Override
public void close() throws IOException {
}

While this non-abstract method doesn’t have to be implemented, it can be useful, such as in the case where the UDF makes a JDBC connection.

Once the call order is decided, writing tests for the generic UDF becomes intuitive:

package org.example.hive.udf;

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;

public class GetHighestSalaryTest {

    public Object hireStaff(String name, Integer salary) {
        Object[] staff = new Object[2];
        staff[0] = name;
        staff[1] = salary;
        return staff;
    }

    @Test
    void eval() throws HiveException {
        GetHighestSalary target = new GetHighestSalary();

        ArrayList<String> structFieldName = new ArrayList<>();
        ArrayList<ObjectInspector> structFieldOI = new ArrayList<>();
        structFieldName.add("name");
        structFieldOI.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        structFieldName.add("salary");
        structFieldOI.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
        ObjectInspector structIO = ObjectInspectorFactory
                .getStandardStructObjectInspector(structFieldName, structFieldOI);

        ObjectInspector listOI = ObjectInspectorFactory.getStandardListObjectInspector(structIO);

        ListObjectInspector resultInspector =
                (ListObjectInspector) target.initialize(new ObjectInspector[] {listOI});

        List<Object> staffList = new ArrayList<Object>();
        staffList.add(hireStaff("Brian", 7500));
        staffList.add(hireStaff("Carl", 3670));
        staffList.add(hireStaff("John", 8600));
        staffList.add(hireStaff("Lydia", 5000));

        Object result = target.evaluate(
                new GenericUDF.DeferredObject[] {new GenericUDF.DeferredJavaObject(staffList)});

        Assertions.assertArrayEquals(resultInspector.getList(result).toArray(),
                new ArrayList<Object>() {
                    {
                        add(hireStaff("John", 8600));
                    }
                }.toArray());
    }
}

The ObjectInspector is a key concept when working with the Generic UDF and it can be confusing at first. An instance of ObjectInspector should be obtained via a factory pattern, as shown in the source code.

/**
 * ObjectInspector helps us to look into the internal structure of a complex
 * object.
 *
 * A (probably configured) ObjectInspector instance stands for a specific type
 * and a specific way to store the data of that type in the memory.
 *
 * For native java Object, we can directly access the internal structure through
 * member fields and methods. ObjectInspector is a way to delegate that
 * functionality away from the Object, so that we have more control on the
 * behavior of those actions.
 *
 * An efficient implementation of ObjectInspector should rely on factory, so
 * that we can make sure the same ObjectInspector only has one instance. That
 * also makes sure hashCode() and equals() methods of java.lang.Object directly
 * works for ObjectInspector as well.
 */

public interface ObjectInspector extends Cloneable {}

There are multiple versions of ObjectInspectors. For the primitive types, they can be obtained by the static field of the PrimitiveObjectInspectorFactory.

Hive Data TypesJava Wrapper TypesWritable Types
tinyintjavaByteObjectInspectorwritableByteObjectInspector
smallintjavaShortObjectInspectorwritableShortObjectInspector
intjavaIntObjectInspectorwritableIntObjectInspector
bigintjavaLongObjectInspectorwritableLongObjectInspector
stringjavaStringObjectInspectorwritableStringObjectInspector
booleanjavaBooleanObjectInspectorwritableBooleanObjectInspector
floatjavaFloatObjectInspectorwritableFloatObjectInspector
doublejavaDoubleObjectInspectorwritableDoubleObjectInspector

For composite data types such as array, map, and struct, they can be obtained by the static method of ObjectInspectorFactory.

Hive Data TypesObjectInspectorFactory Static Methods
arraygetStandardListObjectInspector
mapgetStandardMapObjectInspector
structgetStandardStructObjectInspector

Do note that the types of return values of the initialize method and the evaluate method should match, as some tools like ObjectInspectorUtils.compare might be strict about this. Cast exceptions would arise if an inappropriate ObjectInspector is used.

Reflection

Writing at this juncture, a pertinent question lingers: Is Hive UDF still a relevant and worthwhile function to delve into, especially in the shadow of the growing dominance of Spark? Candidly, my stance leans towards skepticism. Let’s envision several scenarios where Hive UDFs may prove valuable:

  1. When tethered to a purely SQL environment but still faced with the imperative to craft intricate business logic.
  2. When striving for performance optimization, such as transposing PySpark UDF to Hive UDF to curtail the overhead associated with Py4j.

The first scenario embodies a peculiar contradiction: One finds oneself shackled to SQL, but the constraints and inadequacies of the development environment necessitate the use of Hive UDFs for essential implementations. It’s akin to being cornered into making do with a fledgling development ecosystem. Often, this predicament is born out of the misalignment between techniques and organizational methodologies and best practices - probably and essentially, a quagmire of office politics or bureaucratic entanglements. In these instances, Hive UDF does emerge as a viable stopgap.

The second scenario, at a cursory glance, seems more reasonable, particularly in the context of utilizing PySpark. Nonetheless, it may not always represent the optimal approach and could potentially backfire. It is widely acknowledged that performance isn’t always the ultimate goal; often, maintainability and scalability take precedence. Besides, from my experience, a substantial portion of business logic can be executed exclusively with Spark SQL built-in functions alone, especially with the introduction of transform and aggregate. However, this necessitates that developers are well-versed and proficient in the requisite skills (e.g. a firm handle on functional programming). Moreover, pure SQL does fall short in certain contexts, such as when there is a need to interface with external systems.

At the Spark + AI Summit 2019, hosted by Databricks, Facebook delved into their journey of migrating pipelines from Hive to Spark SQL through a talk titled Supporting Over a Thousand Custom Hive User Defined Functions and shed light on various hurdles encountered during this process, including the incompatibility, bad performance, and issues with serialization/deserialization. What is also worth noticing is that, as it stands, Hive UDF is essentially an enigmatic “black box” to Spark, rendering its internal code impervious to optimizations through Catalyst or Tungsten. From my point of view, the industry trend should be progressively migrating from Hive to Spark, rather than the opposite. With all this being said, familiarizing oneself with Hive UDF can still be beneficial. After all, knowledge is no burden.