Flink JAR 开发:自定义函数

编写自定义加密函数:继承 ScalarFunction 类并实现 eval 方法,参数的个数、类型以及返回值可根据业务需要自定义。

import org.apache.flink.table.functions.ScalarFunction;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.Key;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Base64;

public class AESUtil extends ScalarFunction {
    // Algorithm name handed to SecureRandom.getInstance when deriving the key.
    // NOTE(review): despite the constant's name this is a PRNG algorithm, not a
    // cipher transformation; the name is kept because other methods reference it.
    private static final String DEFAULT_CIPHER_ALGORITHM = "SHA1PRNG";
    // Algorithm name used for KeyGenerator, SecretKeySpec and Cipher.
    private static final String KEY_ALGORITHM = "AES";
    // Seed material from which the AES key is derived.
    // NOTE(review): hard-coded secret; consider loading it from configuration.
    private static final String key = "AD42F6697B035B75";

//必须有这个方法,在这个方法里实现业务逻辑
    /**
     * Evaluation method of this scalar function; delegates to {@link #encrypt(String)}.
     *
     * @param str plaintext value to encrypt
     * @return whatever {@link #encrypt(String)} returns for {@code str}
     */
    public String eval(String str) {
        final String encrypted = encrypt(str);
        return encrypted;
    }
    /**
     * Encrypts raw bytes with the given key.
     *
     * @param key       encryption key; when {@code null} nothing is encrypted
     * @param messBytes plaintext bytes
     * @return the cipher output, or {@code null} if {@code key} is {@code null}
     * @throws Exception if the cipher cannot be obtained, initialised or applied
     */
    private static byte[] encrypt(Key key, byte[] messBytes) throws Exception {
        if (key == null) {
            return null;
        }
        final Cipher aes = Cipher.getInstance(KEY_ALGORITHM);
        aes.init(Cipher.ENCRYPT_MODE, key);
        final byte[] encrypted = aes.doFinal(messBytes);
        return encrypted;
    }

    /**
     * Decrypts raw cipher bytes with the given key.
     *
     * @param key         decryption key; when {@code null} nothing is decrypted
     * @param cipherBytes encrypted bytes
     * @return the recovered plaintext bytes, or {@code null} if {@code key} is {@code null}
     * @throws Exception if the cipher cannot be obtained, initialised or applied
     */
    private static byte[] decrypt(Key key, byte[] cipherBytes) throws Exception {
        if (key == null) {
            return null;
        }
        final Cipher aes = Cipher.getInstance(KEY_ALGORITHM);
        aes.init(Cipher.DECRYPT_MODE, key);
        final byte[] decrypted = aes.doFinal(cipherBytes);
        return decrypted;
    }


    /**
     * Creates the AES key generator, seeded from the configured key string.
     *
     * <p>Both {@code encrypt(String)} and {@code decrypt(String)} call this method and
     * therefore rely on the seeded PRNG yielding the same key material on every call.
     * NOTE(review): that determinism depends on the JCA provider's SHA1PRNG
     * implementation - confirm on the target runtime.
     *
     * @return a key generator initialised for 128-bit keys with the seeded PRNG
     * @throws IllegalStateException if the AES key generator or the PRNG algorithm is
     *         unavailable; previously this was swallowed and a {@code null} or
     *         uninitialised generator was returned
     */
    private static KeyGenerator getKeyGenerator() {
        try {
            KeyGenerator keygen = KeyGenerator.getInstance(KEY_ALGORITHM);
            SecureRandom secureRandom = SecureRandom.getInstance(DEFAULT_CIPHER_ALGORITHM);
            // Explicit charset so the seed bytes do not depend on the platform default.
            secureRandom.setSeed(key.getBytes(StandardCharsets.UTF_8));
            keygen.init(128, secureRandom);
            return keygen;
        } catch (NoSuchAlgorithmException e) {
            // Fail loudly: continuing with an unseeded generator would produce a
            // different key and therefore ciphertext that can never be decrypted.
            throw new IllegalStateException(
                    "Required algorithm unavailable: " + KEY_ALGORITHM + " / " + DEFAULT_CIPHER_ALGORITHM, e);
        }
    }

    /**
     * Encrypts a string with the derived AES key and returns the result Base64-encoded.
     *
     * @param message plaintext to encrypt; may be {@code null}
     * @return Base64 text of the encrypted UTF-8 bytes, or {@code null} when
     *         {@code message} is {@code null}
     * @throws IllegalStateException if key derivation or encryption fails; such a failure
     *         does not depend on the input, so it is reported instead of being turned
     *         into a silent {@code null}
     */
    public static String encrypt(String message) {
        // A null input yields null, as before, but without relying on a swallowed NPE.
        if (message == null) {
            return null;
        }
        try {
            KeyGenerator keygen = getKeyGenerator();
            SecretKey secretKey = new SecretKeySpec(keygen.generateKey().getEncoded(), KEY_ALGORITHM);
            byte[] cipherBytes = encrypt(secretKey, message.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(cipherBytes);
        } catch (Exception e) {
            throw new IllegalStateException("AES encryption failed", e);
        }
    }

    /**
     * Decodes Base64 text, decrypts it with the derived AES key and returns the
     * plaintext as a UTF-8 string.
     *
     * @param ciphertext Base64-encoded encrypted text
     * @return the decrypted string, or {@code null} if anything goes wrong
     *         (including a {@code null} or malformed input)
     */
    public static String decrypt(String ciphertext) {
        String plaintext = null;
        try {
            final byte[] rawKey = getKeyGenerator().generateKey().getEncoded();
            final SecretKey aesKey = new SecretKeySpec(rawKey, KEY_ALGORITHM);
            final byte[] cipherBytes = Base64.getDecoder().decode(ciphertext);
            plaintext = new String(decrypt(aesKey, cipherBytes), StandardCharsets.UTF_8);
        } catch (Exception ignored) {
            // Best effort: any failure is reported to the caller as null.
        }
        return plaintext;
    }
  

FlinkCDC mysql到mysql 业务代码


import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.example.util.AESUtil;

/**
 * Flink CDC job that reads rows from one MySQL table, encrypts the {@code test}
 * column with the {@code AESUtil} scalar function and writes the rows to a second
 * MySQL table through the JDBC connector.
 */
public class FlinkMysqlToMysql {

    /**
     * Builds the table environment, declares source and sink, and submits the INSERT job.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Fixed-delay restart strategy configured with (3, 5000).
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
        env.enableCheckpointing(5000);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Create the Table environment.
        // NOTE(review): useBlinkPlanner() is deprecated in newer Flink releases -
        // confirm against the Flink version this project builds with.
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Declare the source table; its connector must be mysql-cdc.
        tEnv.executeSql("create table sourceTable(id bigint,test VARCHAR, PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
                "'connector' = 'mysql-cdc'," +
                "'hostname' = 'localhost',\n" +
                " 'port' = '3306',\n" +
                " 'database-name' = 'testdb',\n" +
                " 'table-name' = 'flinktest',\n" +
                " 'username' = 'root',\n" +
                " 'password' = 'admin'\n" +
                ")");

        // Register the encryption function so it can be called from SQL.
        tEnv.createTemporarySystemFunction("encrypt", new AESUtil());

        // Apply the function in SQL. The result is exposed under its own view name
        // instead of shadowing the CDC table "sourceTable", and the encrypted column
        // gets an explicit alias rather than a planner-generated name.
        Table encrypted = tEnv.sqlQuery("SELECT id, encrypt(test) AS test FROM sourceTable");
        tEnv.createTemporaryView("encryptedSource", encrypted);

        // Declare the sink table; its connector is jdbc.
        tEnv.executeSql("create table targetTable(id bigint,test VARCHAR ,PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
                "'connector' = 'jdbc'," +
                "'url' = 'jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',\n" +
                " 'table-name' = 'flinktest2',\n" +
                " 'username' = 'root',\n" +
                " 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                " 'password' = 'admin'\n" +
                ")");

        // Run the CDC pipeline: copy the encrypted rows into the sink.
        String query = "INSERT INTO targetTable SELECT id, test FROM encryptedSource";
        tEnv.executeSql(query).print();
    }
}

运行结果,加密成功

相关推荐

  1. hive定义函数

    2024-02-03 06:46:01       8 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-02-03 06:46:01       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-02-03 06:46:01       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-02-03 06:46:01       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-02-03 06:46:01       20 阅读

热门阅读

  1. PHP 配置Redis拓展及应用

    2024-02-03 06:46:01       37 阅读
  2. 学习方法分享

    2024-02-03 06:46:01       35 阅读
  3. Python数据库编程:SQLite、MySQL与MongoDB

    2024-02-03 06:46:01       32 阅读
  4. NetApp FAS2750 和 FAS2820 简化分布式企业的存储

    2024-02-03 06:46:01       33 阅读