跳到主要内容
版本: 1.0

FAQ

部署 FAQ

Q1:无法识别 hdfs 访问地址别名,提交任务报错:

Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://nameservice/flink/lib
...
Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservice
...

提供 3 种方式解决这个问题:

HADOOP_HOME=/opt/hadoop
  • 编辑启动脚本 auto.sh,增加以下代码:
export HADOOP_HOME=/opt/hadoop

Q2:planner 找不到或冲突,报错:

java.lang.NoClassDefFoundError: org/apache/flink/table/planner/...

org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
  1. flink-table-planner 和 flink-table-planner-loader 无法同时存在于 classpath,同时加载时会报错

  2. /flink/opt 目录下的 flink-table-planner.jar 拷贝至 /dinky/plugins/flink

  3. 删除 /dinky/plugins/flink 目录下 flink-table-planner-loader.jar

Q3:servlet 或 gson 找不到或冲突,报错:

java.lang.NoSuchMethodError:javax.servlet....

java.lang.NoSuchMethodError:com.google.gson....
  1. 添加 flink-shade-hadoop-uber-3 包时,删除该包内部的 javax.servlet、com.google.gson 等冲突内容

  2. 添加其他依赖时,同样排除 servlet、gson 等依赖项

Q4:添加 cdc、kafka 等 connector 依赖,报错找不到类:

java.lang.ClassNotFoundException: org.apache.kafka.connect....
  1. 检查 /flink/lib 目录和 /dinky/plugins 目录下有相应 jar 包

  2. 检查是否有 flink-sql-connector-kafka 和 flink-connector-kafka 这种胖瘦包放在一起,一般只放胖包不放瘦包

  3. 检查其他客户端依赖,如 kafka-client 包

Q5:连接 Hive 异常,报错:

Caused by: java.lang.ClassNotFoundException: org.apache.http.client.HttpClient

/dinky/plugins 下添加 httpclient-4.5.3.jar、httpcore-4.4.6.jar 依赖

Q6:与 CDH 集成并使用 HiveCatalog,报错:

Cause by: java.lang.IllegalArgumentException: Unrecognized Hadoop major version number: 3.1.1.7.2.8.0-224
at org.apache.hadoop.hive.shims.ShimLoader.getMajorVersion(Shimloader.java:177) ~[flink-sql-connector-hive-2.2.0...]
  1. 要从新编译对应 Flink 版本的源码,下载 Flink 对应版本源码,并切换到对应 hive 版本 flink-sql-connector 目录下

  2. 修改 pom,添加如下信息

<!-- 添加 cloudera 仓库 -->
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
<name>Cloudera Repositories</name>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>

<!-- 以 flink-sql-connector-hive-2.2.0 为例,修改如下 -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.1.1-cdh6.3.2</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.pentaho</groupId>
<artifactId>pentaho-aggdesigner-algorithm</artifactId>
</exclusion>
</exclusions>
</dependency>
  1. 编译成功后,将对应的 jar 包拿出来即可,分别放到 flink/libdinky/plugins 下,重启 Flink 和 Dinky

开发 FAQ

Q1:提交作业后报错 ClassCastException 类加载问题:

java.lang.ClassCastException: class org.dinky.model.Table cannot be cast to class org.dinky.model.
Table (org.dinky.model.Table is in unnamed module of loader 'app'; org.dinky.model.
Table is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @66565121)

Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.kafka.clients.consumer.OffsetResetStrategy to field
org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.offsetResetStrategy of type
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy in instance of
org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer

Caused by: java.lang.ArrayStoreException
at java.lang.System.arraycopy(Native Method) ~[?:1.8.0_202]
  1. 如果是普通作业或 stanlone 模式,在作业开发页面右下角自定义配置内加入此参数 classloader.resolve-order: parent-first

  2. 如果是 application 作业在集群配置中,加入自定义参数 classloader.resolve-order: parent-first 即可

Q2:提交 cdc 作业后,类型转换报错:

java.lang.classCastException: java.lang.Integer cannot be cast to java.lang.Booleanat 
org.apache.flink.table.data.GenericRowData.getBoolean(GenericRowData.java:134)at
org.apache.doris.flink.deserialization,converter.DorisRowConverter ,lambda$createExternalConverter$81fa9aea33 (DorisRowConverter.java:239)
at org.apache.doris.flink.deserialization.converter.DorisRowConverter.

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
...
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
...
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Boolean

  1. 在 mysql 中 tinyint 会被 cdc 识别为 boolean 类型,造成转换错误

  2. 添加以下参数

'jdbc.properties.tinyInt1isBit' = 'false',
'jdbc.properties.transformedBitIsBoolean' = 'false',

Q3:修改 sql 后无法从 checkpoint 启动,报错:

Cannot map checkpoint/savepoint state for operator xxx to the new program, because the operator is not available in the new program.
If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.

添加参数 execution.savepoint.ignore-unclaimed-state:true,跳过无法还原的算子

本地调试FAQ

Q1: 为什么不支持除了 Java8 和 Java11 以外的其他版本呢?

A1: 因为 Flink 目前仅支持 Java8 和 Java11。


Q2: 为什么 Maven Profile 切换了不生效呢?? 提交任务时还是报各种依赖问题,Profile 像是不生效呢?????

A2-1: 因为你没刷新 Maven Profile,导致不生效

A2-2: 因为虽然你刷新了 Maven Profile, 没重启 Dinky 服务(不要问为什么需要重启,这是一个开发人员的基本认知),导致依赖没包含在当前已启动的服务中.

A2-3: Profile 切的不对,注意灰色的 Profile 选项.请仔细仔细仔细仔细的看看.

A2-4: 查看你的 IDEA 的版本,不要太旧,尽量保持在 2022.x 以上(别问为什么,上边已经说了)

A2-5: Profile 切换加载,基于依赖的 <scope></scope>标签属性声明 ,如果不懂,自行百度/谷歌/CSDN/StackOverFlow/ChatGPT


Q3: 我在 IDEA 中启动 Dinky 后, 前端页面访问不了, 报错找不到页面??????

A3-1: 可以在执行 Install 阶段勾选 web Profile,不然 dinky-admin/src/main/resources/ 下没有静态资源文件目录 static.

A3-2: 可以单独启动前端,参考 本地调试-启动前端 部分


Q4: 为什么在 IDEA 中启动 Dinky 后,Profile 也加载了,我用到了一个 connector 仍然报错找不到类?????

A4-1: Dinky 只加载了 Dinky 在开发中过程中用到的相关 Flink 依赖以及 Flink 的基本环境依赖.如报此类错误,请检查你的 pom.xml 文件,是否包含了 connector 所依赖的 jar 包

A4-2: 如上述问题未解决,请检查你的 dinky-flink 模块下的与你Flink 版本一致的 pom.xml 文件,是否包含了 connector 所依赖的 jar 包

SQL开发FAQ

Q1:ADD JAR语法如何支持s3路径? 由于dinky代码检查阶段无相应flink-conf.yaml配置,导致无法获取正确ak,sk,所以在sql层面做了语法增强,支持在sql内进行s3配置,如下:

SET 's3.access-key'='xxx';
SET 's3.secret-key'='xxx';
SET 's3.endpoint'='xxx';

ADD JAR 's3://xxxx/udf.jar';

create temporary function ip2int as 'com.sopei.udf.Ip2Int';

select ip2int('192.168.1.1')as ip;