Flink SQL操作Hudi并同步Hive使用总结

前言

记录总结自己第一次如何使用Flink SQL读写Hudi并同步Hive,以及遇到的问题及解决过程。

关于Flink SQL客户端如何使用可以参考:Flink SQL 客户端查询Hive配置及问题解决

版本

Flink 1.14.3
Hudi 0.12.0/0.12.1

本文采用Flink yarn-session模式,不会的可以参考之前的文章。

Hudi包

下载地址:https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink1.14-bundle/0.12.1/hudi-flink1.14-bundle-0.12.1.jar

如果想同步Hive的话,就不能使用上面下载的包了,必须使用profileflink-bundle-shade-hive自己打包,这里参考官网:https://hudi.apache.org/cn/docs/syncing_metastore,打包命令

## Hive3
mvn clean package -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive3  -Dflink1.14 -Dscala-2.12
## Hive2
mvn clean package -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2  -Dflink1.14 -Dscala-2.12
## Hive1
mvn clean package -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive1  -Dflink1.14 -Dscala-2.12

为了避免不必要的麻烦,最好自己修改一下对应的profile中的Hive版本,比如我们环境的Hive版本为HDP的3.1.0.3.1.0.0-78,我们将hive.version对应的值改为3.1.0.3.1.0.0-78再打包就可以了。

方式1、建在内存中、不同步Hive表

这种建表方式,元数据在内存中,退出SQL客户端后,需要重新建表(表数据文件还在)

建表

CREATE TABLE test_hudi_flink1 (
  id int PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  price int,
  ts int,
  dt VARCHAR(10)
)
PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi/test_hudi_flink1',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hoodie.datasource.write.hive_style_partitioning' = 'true'
);

PRIMARY KEY和hoodie.datasource.write.recordkey.field作用相同,联合主键时,可以单独放在最后 PRIMARY KEY (id1, id2) NOT ENFORCED

CREATE TABLE test_hudi_flink1 (
  id1 int,
  id2 int,
  name VARCHAR(10),
  price int,
  ts int,
  dt VARCHAR(10),
PRIMARY KEY (id1, id2) NOT ENFORCED
)

Insert

insert into test_hudi_flink1 values (1,'hudi',10,100,'2022-10-31'),(2,'hudi',10,100,'2022-10-31');

至于upadte和delete可以参考官网:https://hudi.apache.org/cn/docs/flink-quick-start-guide

查询

select * from test_hudi_flink1;

通过Flink查询出来的结果是没有Hudi的元数据字段的

方式2、建在Hive Catalog中、不同步Hive表

这种建表方式,会在对应的Hive中创建表,好处是,当我们退出SQL客户端后,再重新启动一个新的SQL客户端,我们可以直接使用Hive Catalog中的表,进行读写数据。

建表

CREATE CATALOG hive_catalog WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/usr/hdp/3.1.0.0-78/hive/conf'
);

use catalog hive_catalog;
use hudi;

CREATE TABLE test_hudi_flink2 (
  id int PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  price int,
  ts int,
  dt VARCHAR(10)
)
PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi/test_hudi_flink2',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hoodie.datasource.write.hive_style_partitioning' = 'true'
);

Insert

insert into test_hudi_flink2 values (1,'hudi',10,100,'2022-10-31'),(2,'hudi',10,100,'2022-10-31');

查询

select * from test_hudi_flink2;

但是同样地也无法查询Hudi的元数据字段,而且在Hive表中查询此表是会有异常的,因为表结构是这样的:

show create table test_hudi_flink2;
+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE TABLE `test_hudi_flink2`(                   |
| )                                                  |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'  |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hadoop.mapred.TextInputFormat'       |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat' |
| LOCATION                                           |
|   'hdfs://cluster1/warehouse/tablespace/managed/hive/hudi.db/test_hudi_flink2' |
| TBLPROPERTIES (                                    |
|   'flink.connector'='hudi',                        |
|   'flink.hoodie.datasource.write.hive_style_partitioning'='true',  |
|   'flink.hoodie.datasource.write.recordkey.field'='id',  |
|   'flink.partition.keys.0.name'='dt',              |
|   'flink.path'='/tmp/hudi/test_hudi_flink2',       |
|   'flink.schema.0.data-type'='INT NOT NULL',       |
|   'flink.schema.0.name'='id',                      |
|   'flink.schema.1.data-type'='VARCHAR(10)',        |
|   'flink.schema.1.name'='name',                    |
|   'flink.schema.2.data-type'='INT',                |
|   'flink.schema.2.name'='price',                   |
|   'flink.schema.3.data-type'='INT',                |
|   'flink.schema.3.name'='ts',                      |
|   'flink.schema.4.data-type'='VARCHAR(10)',        |
|   'flink.schema.4.name'='dt',                      |
|   'flink.schema.primary-key.columns'='id',         |
|   'flink.schema.primary-key.name'='PK_3386',       |
|   'transient_lastDdlTime'='1667129407')            |
+----------------------------------------------------+
select * from test_hudi_flink2;
Error: Error while compiling statement: FAILED: SemanticException Line 0:-1 Invalid column reference 'TOK_ALLCOLREF' (state=42000,code=40000)

方式3、建在内存中、同步Hive表

这样建表的好处是,可以利用同步到Hive中的表,通过Hive SQL和Spark SQL查询,也可以利用Spark进行insert、update等,但是Flink SQL客户端退出后,不能利用Hive中的表进行写数据,需要再重新建表

MOR表

建表

配置环境变量HIVE_CONF_DIR

export HIVE_CONF_DIR=/usr/hdp/3.1.0.0-78/hive/conf
CREATE TABLE test_hudi_flink3 (
  id int PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  price int,
  ts int,
  dt VARCHAR(10)
)
PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi/test_hudi_flink3',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://indata-192-168-44-128.indata.com:9083',
  'hive_sync.conf.dir'='/usr/hdp/3.1.0.0-78/hive/conf',
  'hive_sync.db' = 'hudi',
  'hive_sync.table' = 'test_hudi_flink3',
  'hive_sync.partition_fields' = 'dt',
  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.HiveStylePartitionValueExtractor'
);

HIVE_CONF_DIR和hive_sync.conf.dir作用是一样的,如果没有配置hive_sync.conf.dir的话就会取HIVE_CONF_DIR,如果都没有配置,同步会有异常,具体看后面的异常解决。

关于同步Hive的参数,官方文档上说hive_sync.metastore.uris是必须的,但是经过验证,不设置也可以,因为hive_sync.conf.dir下面有hive-site.xml读取里面的配置信息获取即可,Spark SQL同步Hive也是读取hive-site.xml的。其他的参数可以自己去了解

同步表

只有在写数据的时候才会触发同步Hive表

insert into test_hudi_flink3 values (1,'hudi',10,100,'2022-10-31'),(2,'hudi',10,100,'2022-10-31');

然后我们可以看到在Hive库中生成了两张表test_hudi_flink3_rotest_hudi_flink3_rt,这和我们使用Spark SQL同步的表是一样的,可以用Hive查询,也可以用Spark查询、写数据

MOR表一开始没有生成parquet文件,在Hive里查询为空(RO、RT都为空),我们可以在SparkSQL里再插入几条数据,就可以查询出数据来了

# ro、rt都支持Spark SQL insert
insert into test_hudi_flink3_ro values (3,'hudi',10,100,'2022-10-31'),(4,'hudi',10,100,'2022-10-31');
insert into test_hudi_flink3_rt values (5,'hudi',10,100,'2022-10-31'),(6,'hudi',10,100,'2022-10-31');

关于Flink SQL和Spark SQL配置一致性问题:

  • hoodie.datasource.write.keygenerator.class 这里设置的为ComplexAvroKeyGenerator,也就是复合主键,原因是Fink SQL 默认值为SimpleKey,但是SparkSQL默认值SqlKeyGenerator,它是ComplexKeyGenerator,也就是默认值为复合主键,但是由于ComplexKeyGenerator在hudi-spark-client中,flink模块没有,所以flink中需要设置hudi-client-common中的ComplexAvroKeyGenerator即可保持一致性(如果keygenerator不一致会导致重复数据)
  • hoodie.datasource.write.hive_style_partitioning flink sql默认值为false,但是SparkSQL为true,所以这里设置为true
  • hive_sync.partition_extractor_class SparkSQL中默认值为MultiPartKeysValueExtractor,对于本例中的字符串类型的分区字段是支持的,但是Flink SQL中的默认值为SlashEncodedDayPartitionValueExtractor,它要求分区字段必须是日期格式的,所以这里设置为HiveStylePartitionValueExtractor即可解决
  • hoodie.allow.operation.metadata.field Flink支持这个配置项,当为true时,Hudi元数据字段中会多一个_hoodie_operation,但是目前Spark还不支持,所以对于这种,对于Flink SQL同步的Hive表,不能再通过Spark SQL写数据,不过后面我会提PR支持
  • 我们发现对于Flink的很多配置项key值或者默认值都和Spark或者Hudi Common中不一致,这一点如果需要Flink和Spark配合使用的话,就需要注意保持一致性

COW表

我们来看一下COW表会同步哪些表

建表

CREATE TABLE test_hudi_flink4 (
  id int PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  price int,
  ts int,
  dt VARCHAR(10)
)
PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi/test_hudi_flink4',
  'table.type' = 'COPY_ON_WRITE',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://indata-192-168-44-128.indata.com:9083',
  'hive_sync.conf.dir'='/usr/hdp/3.1.0.0-78/hive/conf',
  'hive_sync.db' = 'hudi',
  'hive_sync.table' = 'test_hudi_flink4',
  'hive_sync.partition_fields' = 'dt',
  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.HiveStylePartitionValueExtractor'
);

同步表

写数据触发同步Hive表

insert into test_hudi_flink4 values (1,'hudi',10,100,'2022-10-31'),(2,'hudi',10,100,'2022-10-31');

因为COW表只有RT表,所以不会通过_rt来区分,同步的表名和配置的表名一致。这点可以参考我之前总结的文章Hudi查询类型/视图总结

+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE EXTERNAL TABLE `test_hudi_flink4`(          |
|   `_hoodie_commit_time` string COMMENT '',         |
|   `_hoodie_commit_seqno` string COMMENT '',        |
|   `_hoodie_record_key` string COMMENT '',          |
|   `_hoodie_partition_path` string COMMENT '',      |
|   `_hoodie_file_name` string COMMENT '',           |
|   `id` int COMMENT '',                             |
|   `name` string COMMENT '',                        |
|   `price` int COMMENT '',                          |
|   `ts` int COMMENT '')                             |
| PARTITIONED BY (                                   |
|   `dt` string COMMENT '')                          |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  |
| WITH SERDEPROPERTIES (                             |
|   'hoodie.query.as.ro.table'='false',              |
|   'path'='/tmp/hudi/test_hudi_flink4')             |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hudi.hadoop.HoodieParquetInputFormat'  |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION                                           |
|   'hdfs://cluster1/tmp/hudi/test_hudi_flink4'      |
| TBLPROPERTIES (                                    |
|   'last_commit_time_sync'='20221030204426056',     |
|   'spark.sql.sources.provider'='hudi',             |
|   'spark.sql.sources.schema.numPartCols'='1',      |
|   'spark.sql.sources.schema.numParts'='1',         |
|   'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"price","type":"integer","nullable":true,"metadata":{}},{"name":"ts","type":"integer","nullable":true,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}',  |
|   'spark.sql.sources.schema.partCol.0'='dt',       |
|   'transient_lastDdlTime'='1667133869')            |
+----------------------------------------------------+

方式4、建在Hive Catalog中、同步Hive表

这样建表的好处是,我们既可以利用Hive Catalog中的表通过Flink SQL写数据,也可以利用同步的Hive表通过Hive SQL查询、Spark SQL读写

MOR表

建表

配置环境变量HIVE_CONF_DIR

export HIVE_CONF_DIR=/usr/hdp/3.1.0.0-78/hive/conf
CREATE CATALOG hive_catalog WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/usr/hdp/3.1.0.0-78/hive/conf'
);

use catalog hive_catalog;
use hudi;

CREATE TABLE test_hudi_flink5 (
  id int PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  price int,
  ts int,
  dt VARCHAR(10)
)
PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi/test_hudi_flink5',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://indata-192-168-44-128.indata.com:9083',
  'hive_sync.conf.dir'='/usr/hdp/3.1.0.0-78/hive/conf',
  'hive_sync.db' = 'hudi',
  'hive_sync.table' = 'test_hudi_flink5',
  'hive_sync.partition_fields' = 'dt',
  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.HiveStylePartitionValueExtractor'
);

同步表

同样写几条数据触发同步Hive

insert into test_hudi_flink5 values (1,'hudi',10,100,'2022-10-31'),(2,'hudi',10,100,'2022-10-31');

然后我们可以看到在Hive库中生成了三张表test_hudi_flink4test_hudi_flink4_rotest_hudi_flink4_rt,其中test_hudi_flink4是Flink格式的,和上面的方式2中的表结构一样,不能用Hive查询,但是可以在Flink中写数据、查询数据,对于test_hudi_flink4_rotest_hudi_flink4_rt,我们就可以用Hive查询,也可以用Spark查询、写数据。

COW表

但是对于COW表来说因为同步的表名没有_rt也就是和Hive Catalog表名一样,这样就有问题,所以我们需要区分出Hive Catalog表和同步的表名,一种方式是修改hive_sync.table,另一种方式是Hive Catalog表和同步表保存在不同的Hive Database中,比如下面的示例

CREATE CATALOG hive_catalog WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/usr/hdp/3.1.0.0-78/hive/conf'
);

use catalog hive_catalog;
use flink_hudi;

CREATE TABLE test_hudi_flink6 (
  id int PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  price int,
  ts int,
  dt VARCHAR(10)
)
PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi/test_hudi_flink6',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://indata-192-168-44-128.indata.com:9083',
  'hive_sync.conf.dir'='/usr/hdp/3.1.0.0-78/hive/conf',
  'hive_sync.db' = 'hudi',
  'hive_sync.table' = 'test_hudi_flink6',
  'hive_sync.partition_fields' = 'dt',
  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.HiveStylePartitionValueExtractor'
);

这样Catalog表保存在flink_hudi库中,同步的表保存在hudi库中

insert into test_hudi_flink6 values (1,'hudi',10,100,'2022-10-31'),(2,'hudi',10,100,'2022-10-31');

异常解决

记录异常信息及解决方法,由于没有及时整理,顺序可能有点乱

不同步Hive

最开始使用的在maven里下载的包,在Hive里查询发现没有同步表,后来在官网https://hudi.apache.org/cn/docs/syncing_metastore,发现要使用profileflink-bundle-shade-hive自己打包

mvn clean package -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive3  -Dflink1.14 -Dscala-2.12

但是用自己的打的包依旧不成功,在Flink SQL客户端没有异常,就很费解,后来发现在Flink yarn-session对应的web界面的Job Manager菜单里能看到具体的日志信息,比如写Hudi的Starting Javalin,这样就好办了,根据具体的异常信息对应解决即可。

异常1


2022-10-29 16:02:41,694 WARN  hive.metastore  [] - set_ugi() not successful, Likely cause: new client talking to old server. Continuing without it.
org.apache.thrift.transport.TTransportException: null
	at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]

解决方法:配置环境变量HIVE_CONF_DIR或者配置参数hive_sync.conf.dir,这个问题困扰了我一整天,因为关于这个配置网上没有资料,我是在源码中找到的答案:

  public static org.apache.hadoop.conf.Configuration getHiveConf(Configuration conf) {
    String explicitDir = conf.getString(FlinkOptions.HIVE_SYNC_CONF_DIR, System.getenv("HIVE_CONF_DIR"));
    org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
    if (explicitDir != null) {
      hadoopConf.addResource(new Path(explicitDir, "hive-site.xml"));
    }
    return hadoopConf;
  }

// StreamWriteOperatorCoordinator
this.hiveConf = new SerializableConfiguration(HadoopConfigurations.getHiveConf(conf));  

异常2

java.lang.NoSuchMethodError: org.apache.parquet.schema.Types$PrimitiveBuilder.as(Lorg/apache/parquet/schema/LogicalTypeAnnotation;)Lorg/apache/parquet/schema/Types$Builder;
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:177) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.parquet.avro.AvroSchemaConverter.convertUnion(AvroSchemaConverter.java:242) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:199) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:152) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:260) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:146) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:137) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.hudi.common.table.TableSchemaResolver.readSchemaFromLogFile(TableSchemaResolver.java:485) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.hudi.common.table.TableSchemaResolver.readSchemaFromLogFile(TableSchemaResolver.java:468) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.hudi.common.table.TableSchemaResolver.fetchSchemaFromFiles(TableSchemaResolver.java:604) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.hudi.common.table.TableSchemaResolver.getTableParquetSchemaFromDataFile(TableSchemaResolver.java:251) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaFromDataFile(TableSchemaResolver.java:117) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.hudi.common.table.TableSchemaResolver.hasOperationField(TableSchemaResolver.java:537) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.hudi.util.Lazy.get(Lazy.java:53) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.hudi.common.table.TableSchemaResolver.getTableSchemaFromLatestCommitMetadata(TableSchemaResolver.java:208) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaInternal(TableSchemaResolver.java:176) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchema(TableSchemaResolver.java:138) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.hudi.common.table.TableSchemaResolver.getTableParquetSchema(TableSchemaResolver.java:156) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.hudi.sync.common.HoodieSyncClient.getStorageSchema(HoodieSyncClient.java:103) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:206) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.hudi.hive.HiveSyncTool.doSync(HiveSyncTool.java:157) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:141) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.hudi.sink.StreamWriteOperatorCoordinator.doSyncHive(StreamWriteOperatorCoordinator.java:335) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.hudi.sink.StreamWriteOperatorCoordinator.syncHive(StreamWriteOperatorCoordinator.java:326) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.hudi.sink.StreamWriteOperatorCoordinator.handleEndInputEvent(StreamWriteOperatorCoordinator.java:426) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$handleEventFromOperator$3(StreamWriteOperatorCoordinator.java:278) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_181]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_181]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]

原因是jar包冲突,根据异常信息可知hudi包的org.apache.parquet.schema.Types这个类可能和flink环境下面的其他jar包冲突,经排查,发现hive-exec*.jar里也有一样的类名,将该jar包删除,验证问题解决。(在之前的文章中有写到因为缺某些类,才会将hive-exec*.jar放到flink下面,现在验证不缺这个类了,如果还有的话,可以找其他没有冲突的包替代)

异常3

Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to start the operator coordinators
        at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:90) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
        at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:585) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
        at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:965) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
        at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:882) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
        at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:389) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
        at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:624) ~[flink-rpc-akka_0f8ea990-3e27-4639-9ea1-d92b6879facc.jar:1.14.3]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_0f8ea990-3e27-4639-9ea1-d92b6879facc.jar:1.14.3]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:623) ~[flink-rpc-akka_0f8ea990-3e27-4639-9ea1-d92b6879facc.jar:1.14.3]
        ... 20 more
Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/hive/conf/HiveConf
        at org.apache.hudi.sink.StreamWriteOperatorCoordinator.initHiveSync(StreamWriteOperatorCoordinator.java:315) ~[hudi-flink1.14-bundle-0.12.1.jar:0.12.1]
        at org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:191) ~[hudi-flink1.14-bundle-0.12.1.jar:0.12.1]
        at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
        at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:85) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
        at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:585) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
        at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:965) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
        at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:882) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
        at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:389) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
        at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:624) ~[flink-rpc-akka_0f8ea990-3e27-4639-9ea1-d92b6879facc.jar:1.14.3]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_0f8ea990-3e27-4639-9ea1-d92b6879facc.jar:1.14.3]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:623) ~[flink-rpc-akka_0f8ea990-3e27-4639-9ea1-d92b6879facc.jar:1.14.3]
        ... 20 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.conf.HiveConf
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_181]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_181]
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) ~[?:1.8.0_181]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_181]
        at org.apache.hudi.sink.StreamWriteOperatorCoordinator.initHiveSync(StreamWriteOperatorCoordinator.java:315) ~[hudi-flink1.14-bundle-0.12.1.jar:0.12.1]
        at org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:191) ~[hudi-flink1.14-bundle-0.12.1.jar:0.12.1]
        at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
        at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:85) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
        at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:585) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
        at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:965) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
        at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:882) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
        at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:389) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
        at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:624) ~[flink-rpc-akka_0f8ea990-3e27-4639-9ea1-d92b6879facc.jar:1.14.3]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_0f8ea990-3e27-4639-9ea1-d92b6879facc.jar:1.14.3]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:623) ~[flink-rpc-akka_0f8ea990-3e27-4639-9ea1-d92b6879facc.jar:1.14.3]

这个异常就是使用在maven下载的包同步hive产生的异常,但是无法在Flink yarn-session对应的web界面看日志,因为yarn-session对应的任务会跑挂掉,我们可以通过下面的命令查看日志信息

yarn logs -applicationId application_1666247158647_0121

异常4


Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
	at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:488) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1612) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriterDelegate(StreamTask.java:1596) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:376) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:359) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:332) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:324) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:314) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.<init>(OneInputStreamTask.java:75) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_181]
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_181]
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_181]
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_181]
	at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1582) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:740) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
Caused by: java.io.IOException: unexpected exception type

这个原因是因为yarn-session所用的hudi包和sql-client所用的hudi包版本不一致,改为一致即可

其他异常

比如缺相关依赖包异常,去环境上Hive路径下拷贝对应的jar包到Flink路径下即可

总结

本文记录了自己使用Flink SQL读写Hudi表并同步Hive的一些配置,并且做了Flink SQL和Spark SQL的一致性配置。其实关于Flink SQL读写Hudi还有一个HoodieHiveCatalog也可以使用,有时间等我研究明白了,再分享给大家。

相关阅读

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注