使用Clojure编写Hive UDF

Table of Contents

在Hive中可以用调用外部脚本的方式处理数据,用外部脚本的好处是不受语言限制,通常情况下我们用Python写一些外部脚本来处理一些数据,但是有些情况下写成UDF(User-defined function)可以简化查询语句,也能更好地使用条件判断。

Hive是用JAVA开发的,不过任何运行在JVM上的语言,只要支持和JAVA交互就可以写UDF,选Clojure的原因很简单,因为它是Lisp方言,也能很好地和JAVA代码交互,并且开发效率很高。

Hive的自定义函数分了三种:UDTF、UDAF和UDF,前两种我基本上用不上。

UDF编写很简单,只要保证Hive能调用你暴露的evaluate函数(方法)即可,同时满足:

  1. 继承org.apache.hadoop.hive.ql.exec.UDF;
  2. 调用UDF时,保证传入的参数和返回值为org.apache.hadoop.io.Text类型(参数和返回值也可以是String类型)

本文为Hive写一个upper函数,将字母转成大写(虽然很无聊,但因为代码量很少,容易让人看懂)

1 使用Leiningen创建项目

关于Leiningen是什么、怎么安装,这里不多说了。先创建个项目:

$ lein new upper

然后进入upper项目,修改project.clj,如下:

(defproject upper "0.1.0-SNAPSHOT"
    :description "FIXME: write description"
    :url "http://example.com/FIXME"
    :license {:name "Eclipse Public License"
    :url "http://www.eclipse.org/legal/epl-v10.html"}
    :plugins [[cider/cider-nrepl "0.8.0-SNAPSHOT"]]
    :dependencies [[org.clojure/clojure "1.5.1"]
		   [hive/hive-exec "0.5.0"]
		   [org.apache.hadoop/hadoop-core "0.20.2-dev"]])

主要改动:

1、增加Hadoop和Hive的依赖

2、安装cider/cider-nrepl插件,便于和Cider交互开发

然后自动安装依赖:

$ lein deps

2 编写UDF

编辑src/upper/core.clj,详细请看代码注释:

(ns upper.core
  (:import [org.apache.hadoop.hive.ql.exec UDF])
  (:import [org.apache.hadoop.io Text])
  (:gen-class
   :name upper.core
   ;; 继承org.apache.hadoop.hive.ql.exec.UDF,必须的
   :extends org.apache.hadoop.hive.ql.exec.UDF
   ;; 暴露evaluate方法给Hive,同时声明参数类型和返回值类型
   ;; 列表里是参数类型,参数数目必须和列表数目对应
   ;; 参数和返回值都必须是org.apache.hadoop.io.Text类型
   :methods [[evaluate [org.apache.hadoop.io.Text] org.apache.hadoop.io.Text]]))

(defn upper [string]
  (.toUpperCase string))

;; #^Text是元数据,声明函数返回的是Text类型
(defn #^Text -evaluate
  ;; 声明参数为Text类型
  [this #^Text string]
  ;; 先将参数值转成字符串类型处理后,再包装成Text类型
  (Text. (upper (.toString string))))

这里要注意一个细节,新建Text实例时不能给类型为nil(对应JAVA中的null)的参数,所以必须保证upper函数调用结果是字符串,否则会异常。

3 编译成class文件并打包

首先,需要修改project.clj,增加:aot选项:

:aot [upper.core]

然后,将代码编译成class。编译时一定要注意JDK版本,如果Hive用的JAVA6,就必须用JAVA6编译:

$ lein compile Compiling upper.core

最后,如果编译顺利通过,再将它打包成jar文件:

$ lein uberjar Created /tmp/upper/target/upper-0.1.0-SNAPSHOT.jar Created /tmp/upper/target/upper-0.1.0-SNAPSHOT-standalone.jar

这时会在target目录下生成两个.jar文件,其中以“-standalone.jar”结尾的jar文比较大,应该有17M左右,因为它包含了所有的依赖包括Clojure自身),所以可以直接放Hive上使用。

4 在Hive中调用

为了方便测试,新建一个hive.sql文件,把HiveQL语句写进去:

-- 将生成的jar文件添加到Hive中,它会自动分发给其他节点
add jar target/upper-0.1.0-SNAPSHOT-standalone.jar;
-- 注册成Hive函数
create temporary function my_upper as 'upper.core';

select my_upper(en_name) from user_information;

然后执行:

$ hive -f hive.sql

如果顺利的话,Hive将正常返回调用结果。

5 关于调试

开发UDF难免会遇到代码错误,可能是编译中出现错误,也可能是运行时出错。编译中出现错误一般就是语法一类的问题,很好解决;但如果是在Hive运行时出错,MapReduce任务会被杀死,调试也会变得比较难,所以在这之前建议写好单元测试,保证各个部分的代码稳定后再放到Hive中。如果在Hive运行中死掉,可以到JobTracker的WEB监控页面看调试日志中的JAVA异常信息。

6 关于性能优化

工作中我主要是写UDF处理海量日志,在写某个大量匹配功能的UDF时,一千万条日志花了4小时。但日志里要匹配的字段有大量是重复的,所以我用了memoize函数缓存结果,这时一千万条日志只跑了不到半个小时。如果你处理的数据中如果有比较多重复数据的话,建议使用memoize。

另外,在大规模数据处理下,一些代码细节优化对速度提升很不明显,建议仔细揣摩代码,优先优化流程。像上面说的场景,我在优化了数据处理流程的情况下,最后只花几分钟就可以处理完一千万条日志了。