Migrating MongoDB with DataX


Source code changes

  • In the current version, mongodbreader does not handle columns of binary type when migrating MongoDB. The relevant source file is src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java

    if (tempCol == null) {
        // Do not simply `continue` here: skipping the column would misalign the record's columns at the destination.
        record.addColumn(new StringColumn(null));
    } else if (tempCol instanceof Double) {
        //TODO deal with Double.isNaN()
        record.addColumn(new DoubleColumn((Double) tempCol));
    } else if (tempCol instanceof Boolean) {
        record.addColumn(new BoolColumn((Boolean) tempCol));
    } else if (tempCol instanceof Date) {
        record.addColumn(new DateColumn((Date) tempCol));
    } else if (tempCol instanceof Integer) {
        record.addColumn(new LongColumn((Integer) tempCol));
    } else if (tempCol instanceof Long) {
        record.addColumn(new LongColumn((Long) tempCol));
    } else {
        if (KeyConstant.isArrayType(column.getString(KeyConstant.COLUMN_TYPE))) {
            String splitter = column.getString(KeyConstant.COLUMN_SPLITTER);
            if (Strings.isNullOrEmpty(splitter)) {
                throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
                    MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
            } else {
                ArrayList array = (ArrayList) tempCol;
                String tempArrayStr = Joiner.on(splitter).join(array);
                record.addColumn(new StringColumn(tempArrayStr));
            }
        } else {
            // Every other type, including binary data, falls through to toString().
            record.addColumn(new StringColumn(tempCol.toString()));
        }
    }
    
  • Change it to:

    // Needs these imports at the top of MongoDBReader.java:
    //   import org.bson.types.Binary;
    //   import com.alibaba.datax.common.element.BytesColumn;
    if (tempCol == null) {
        // Do not simply `continue` here: skipping the column would misalign the record's columns at the destination.
        record.addColumn(new StringColumn(null));
    } else if (tempCol instanceof Double) {
        //TODO deal with Double.isNaN()
        record.addColumn(new DoubleColumn((Double) tempCol));
    } else if (tempCol instanceof Boolean) {
        record.addColumn(new BoolColumn((Boolean) tempCol));
    } else if (tempCol instanceof Date) {
        record.addColumn(new DateColumn((Date) tempCol));
    } else if (tempCol instanceof Integer) {
        record.addColumn(new LongColumn((Integer) tempCol));
    } else if (tempCol instanceof Long) {
        record.addColumn(new LongColumn((Long) tempCol));
    } else if (tempCol instanceof Binary) {
        // Handle MongoDB's Binary type
        Binary binaryData = (Binary) tempCol;
        byte[] binaryBytes = binaryData.getData();
        // Add the byte array to the record as a DataX bytes column
        record.addColumn(new BytesColumn(binaryBytes));
    } else {
        if (KeyConstant.isArrayType(column.getString(KeyConstant.COLUMN_TYPE))) {
            String splitter = column.getString(KeyConstant.COLUMN_SPLITTER);
            if (Strings.isNullOrEmpty(splitter)) {
                throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
                    MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
            } else {
                ArrayList array = (ArrayList) tempCol;
                String tempArrayStr = Joiner.on(splitter).join(array);
                record.addColumn(new StringColumn(tempArrayStr));
            }
        } else {
            record.addColumn(new StringColumn(tempCol.toString()));
        }
    }
    
    • After modifying the source, rebuild the package. Because only mongodbreader was changed, when packaging you may consider taking the root…
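
To make the effect of the extra branch concrete, here is a minimal, dependency-free sketch of the same instanceof dispatch. It is an illustration only: `FakeBinary` is a hypothetical stand-in for `org.bson.types.Binary`, and the returned strings are labels for the DataX column classes rather than the real classes, so the example runs on a plain JDK. The `handleBinary` flag is likewise made up, to compare the behaviour before and after the change.

```java
import java.util.Arrays;
import java.util.Date;

// Hypothetical stand-in for org.bson.types.Binary (not the real driver class).
final class FakeBinary {
    private final byte[] data;

    FakeBinary(byte[] data) {
        this.data = data.clone();
    }

    byte[] getData() {
        return data.clone();
    }
}

final class BinaryDispatchSketch {
    // Returns a label naming the column type each branch of the reader would create.
    // handleBinary = false mimics the original code, true mimics the patched code.
    static String columnKind(Object tempCol, boolean handleBinary) {
        if (tempCol == null) {
            return "StringColumn(null)";
        } else if (tempCol instanceof Double) {
            return "DoubleColumn";
        } else if (tempCol instanceof Boolean) {
            return "BoolColumn";
        } else if (tempCol instanceof Date) {
            return "DateColumn";
        } else if (tempCol instanceof Integer || tempCol instanceof Long) {
            return "LongColumn";
        } else if (handleBinary && tempCol instanceof FakeBinary) {
            return "BytesColumn";
        } else {
            // Fallback: the value would be converted with toString().
            return "StringColumn";
        }
    }

    public static void main(String[] args) {
        byte[] original = {0x00, (byte) 0xFF, 0x10};
        Object tempCol = new FakeBinary(original);

        System.out.println(columnKind(tempCol, false)); // original code path
        System.out.println(columnKind(tempCol, true));  // patched code path

        // With the new branch the raw bytes are carried over unchanged.
        byte[] copied = ((FakeBinary) tempCol).getData();
        System.out.println(Arrays.equals(original, copied));
    }
}
```

The new branch has to sit before the final `else`, otherwise binary values keep reaching the `toString()` fallback.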

Migration script

  • Write the job script: 1.json

    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mongodbreader",
                        "parameter": {
                            "address": ["ip1:27017"],
                            "collectionName": "data",
                            "column": [
                                {
                                    "name": "_id",
                                    "type": "long"
                                },
                                {
                                    "name": "fileContent",
                                    "type": "bytes"
                                }
                            ],
                            "dbName": "monitor",
                            "userName": "root",
                            "userPassword": "123456",
                            "query": {
                                "_id": {
                                    "$lt": 21
                                }
                            }
                        }
                    },
                    "writer": {
                        "name": "mongodbwriter",
                        "parameter": {
                            "address": ["ip2:27017"],
                            "collectionName": "data",
                            "column": [
                                {
                                    "name": "_id",
                                    "type": "long"
                                },
                                {
                                    "name": "fileContent",
                                    "type": "bytes"
                                }
                            ],
                            "writeMode": {
                                "isReplace": "true",
                                "replaceKey": "_id"
                            },
                            "dbName": "test",
                            "userName": "root",
                            "userPassword": "123456"
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": "2"
                }
            }
        }
    }
    
    • The query node in the reader is the query filter; in the demo above it selects the records whose _id is less than 21.
  • Run the command:
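
As a small illustration of what that filter selects, the sketch below applies the same strict less-than comparison to an in-memory list of ids. It does not talk to MongoDB; `idsLessThan` is a hypothetical helper written only for this example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

final class QueryFilterSketch {
    // Mimics {"_id": {"$lt": bound}}: keeps ids strictly below the bound.
    static List<Long> idsLessThan(List<Long> ids, long bound) {
        return ids.stream()
                .filter(id -> id < bound)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Sample ids 19..23, made up for the example.
        List<Long> ids = LongStream.rangeClosed(19, 23).boxed().collect(Collectors.toList());
        System.out.println(idsLessThan(ids, 21)); // prints [19, 20]
    }
}
```

Note that `$lt` is strict, so a record with _id equal to 21 is not migrated.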

    python datax.py G:\Code\1.json
    
    • datax.py is located in the target directory produced by packaging; relative path: target\datax\datax\bin
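
If the job needs to be started from Java rather than from a shell, the command above could be assembled roughly as follows. This is a sketch under assumptions: `python` is taken to be on the PATH, the DataX home is the `target/datax/datax` directory mentioned above relative to the build root, and the job file location is a placeholder. The sketch only builds and prints the argument list; the actual launch is left commented out.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

final class DataxCommandSketch {
    // Builds the argument list for "python <dataxHome>/bin/datax.py <jobFile>".
    static List<String> buildCommand(Path dataxHome, Path jobFile) {
        Path script = dataxHome.resolve("bin").resolve("datax.py");
        return Arrays.asList("python", script.toString(), jobFile.toString());
    }

    public static void main(String[] args) {
        Path dataxHome = Paths.get("target", "datax", "datax"); // relative to the build root
        Path jobFile = Paths.get("1.json");                     // placeholder job file location

        List<String> command = buildCommand(dataxHome, jobFile);
        System.out.println(String.join(" ", command));

        // To actually run the job (requires Python and a built DataX):
        // new ProcessBuilder(command).inheritIO().start().waitFor();
    }
}
```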
