Spark—GraphX实战 ID Mapping

ID Mapping(下)

上一节我们已经讲过什么是ID Mapping 了,顾名思义我们知道ID Mapping 的操作对象是ID,目标或者是动作是Mapping,也就是说我们要做的事情其实就是想把不同平台不同设备上的ID 打通,从而更好的去刻画用户,也就是说我们希望能打通用户各个维度的数据,从而更好的去服务业务服务用户。

上一节我们讲了常见的ID,例如登陆ID、设备ID 还有例如身份证号、手机号码等一些列的ID,都可以去标示一个用户,接下来我们就看一下如何去做ID Mapping,关于理论我们不多说。

这里有一点要说一下ID Mapping 做完之后我们就会做One ID ,也就是说我们为这些Mapping 上的ID 生成一个新的ID,这个ID 就是One ID,也就是说当我们的One ID 生成之后我们就可以使用这个ID 来打通所有的业务系统,关于One ID 可以参考下一篇文章。

ID Mapping 的难点

  1. 各个平台和各个设备ID 无法直接关联,所以要想关联需要找到关联对象,用SQL 举例就是如果你要想把 A 和 C关联起来,你必须找到可以同时和它们可以关联起来的B,而我们的用户ID 非常多,所以要想关联起来你不止要梳理清楚关联关系,而且你还得写大量的关联代码
  2. 时间问题,有些数据可能属于同一个人,但在某个阶段上,这些数据之间没有任何联系,那么这人的数据可能会被打上两个不同的标识,也就是说你需要在某一刻同时获得这些信息,但是这又是非常困难的。

图计算实现 ID Mapping

图计算(Graph Computing)在金融行业、互联网行业、社交领域、内容推荐领域都有着举足轻重的地位,更多应用可以参考,所以如果感兴趣你可以查阅资料,下面我们就看一下如何使用图计算实现我们的ID Mapping,并且这个图计算 体系也非常大,可以说是一个专门的学科,也不是一句两句说的清楚,大致可以分为下面四个方向。

  1. 图数据库:Neo4j、Titan、OrientDB、DEX和InfiniteGraph等基于遍历算法的、实时的图数据库;
  2. 图数据查询:对图数据库中的内容进行查询;
  3. 图数据分析:Google Pregel、Spark GraphX、GraphLab等图计算软件。传统的数据分析方法侧重于事物本身,即实体,例如银行交易、资产注册等等。而图数据不仅关注事物,还关注事物之间的联系。例如,如果在通话记录中发现张三曾打电话给李四,就可以将张三和李四关联起来,这种关联关系提供了与两者相关的有价值的信息,这样的信息是不可能仅从两者单纯的个体数据中获取的。
  4. 图数据可视化:OLTP风格的图数据库或者OLAP风格的图数据分析系统(或称为图计算软件),都可以应用图数据库可视化技术。需要注意的是,图可视化与关系数据可视化之间有很大的差异,关系数据可视化的目标是对数据取得直观的了解,而图数据可视化的目标在于对数据或算法进行调试。

在整个图论里学习里,我们知道有很多经典的应用,例如最短路径规划、PageRank、连通性 等,而我们的ID Mapping使用的就是连通性,下面我们简单介绍一下这个概念

在图论中,连通图基于连通的概念。在一个无向图 G 中,若从顶点i到顶点j有路径相连(当然从j到i也一定有路径),则称i和j是连通的。如果 G 是有向图,那么连接i和j的路径中所有的边都必须同向。如果图中任意两点都是连通的,那么图被称作连通图。如果此图是有向图,则称为强连通图(注意:有向图需要双向都有路径),图的连通性是图的基本性质。

连通分量:无向图 G的一个极大连通子图称为 G的一个连通分量(或连通分支)。连通图只有一个连通分量,即其自身;非连通的无向图有多个连通分量。到这里就可以解决我们的问题了,我们的一个图中有多个连通图,也就是多个连通分量,而这里的多个连通分量就是一个用户信息集合,其中里面的一个个顶点就是用户不同的ID 。

到这里我们的理论阶段就结束了,核心就是连通图,也就是说我们的目标就是利用图计算工具来计算图的连通性,来找到各种id标识之间的关联关系,从而识别出哪些id标识属于同一个人, ID Mapping的最后目标,就是形成一个id映射字典:

ID       guid
idx01 -> gid01
idy01 -> gid01
idz01 -> gid01
idx02 -> gid01

这里的 guid 就是连通图的一个标示, 我们可以先不考虑,我们只需要知道我们的这里的意思是 同一个guid 也就是同一连通图里的ID 是同一个用户的,这里就是idx01 idy01 idz01 idx02

或者是这样的一个字典

guid    ids
gid01   idx01,idy01,idz01,idx02
具体流程
  1. 将当日数据中的所有用户标识字段,及标志字段之间的关联,生成点集合 、边集合
  2. 将上一日的ids->guid的映射关系,也生成点集合、边集合
  3. 将上面两类点集合、边集合合并到一起生成一个图
  4. 再对上述的图执行“最大连通子图”算法,得到一个连通子图结果
  5. 最后将结果保存,也就是那些ID 属于一个用户
开发实现

这里我们使用Spark 的GraphX 组件实现,关于这个组件的使用可以参考Spark—GraphX编程指南 ,下面是我们的测试数据,每一个JSON 就是一个用户的数据,下面我们就需要将用户的数据打通。

{"name":"zs","uid":"u_001","account":"","email":"","phoneNbr":"15857635755","birthday":"","isRegistered":"","isLogin":"","addr":"","gender":"","phone":{"imei":"imei_zs_001","mac":"mac_zs_001","imsi":"imsi_zs_001","androidId":"androidId_zs_001","deviceId":"deviceId_zs_001","uuid":"uuid_zs_001"}}
{"name":"zs","uid":"u_001","account":"","email":"","phoneNbr":"15857635755","birthday":"","isRegistered":"","isLogin":"","addr":"","gender":"","phone":{"imei":"imei_zs_001","mac":"mac_zs_001","androidId":"androidId_zs_001","deviceId":"deviceId_zs_001"}}
{"name":"zs","uid":"u_001","phone":{"imei":"imei_zs_002","mac":"mac_zs_002","imsi":"imsi_zs_002","androidId":"androidId_zs_002","deviceId":"deviceId_zs_001","uuid":"uuid_zs_002"}}
{"name":"zs","uid":"u_001","phone":{"mac":"mac_zs_002","imsi":"imsi_zs_002","deviceId":"deviceId_zs_001","uuid":"uuid_zs_001"}}
{"name":"zs","uid":"u_001","phone":{"imei":"imei_zs_002","mac":"mac_zs_002","imsi":"imsi_zs_002","androidId":"androidId_zs_001","deviceId":"deviceId_zs_001","uuid":"uuid_zs_001"}}
{"name":"ls","uid":"u_002","phone":{"imei":"imei_ls_001","mac":"mac_ls_001","imsi":"imsi_ls_001","androidId":"androidId_ls_001","deviceId":"deviceId_ls_001","uuid":"uuid_ls_001"}}
{"name":"ls","uid":"u_002","phone":{"mac":"mac_ls_001","imsi":"imsi_ls_002","androidId":"androidId_ls_002","deviceId":"deviceId_ls_001","uuid":"uuid_ls_001"}}
{"name":"ls","uid":"u_002","phone":{"mac":"mac_ls_001","imsi":"imsi_ls_002","androidId":"androidId_ls_002","deviceId":"deviceId_ls_001","uuid":"uuid_ls_001"}}
{"name":"ls","uid":"u_002","phone":{"imei":"imei_ls_002","mac":"mac_ls_001","imsi":"imsi_ls_002","androidId":"androidId_ls_002","deviceId":"deviceId_ls_002","uuid":"uuid_ls_002"}}

下面就是代码实现

def main(args: Array[String]): Unit = {
  val spark: SparkSession = SparkSession.builder()
    .appName("id-mapping")
    .master("local[1]")
    .getOrCreate()

  import spark.implicits._

	// 读取数据
  val rawData = spark.read.textFile("data/graphx/idmapping/idmapping.txt")
  val data: RDD[Array[String]] = rawData.rdd.map(line => {
    //将每行数据解析成json对象
    val jsonObj = JSON.parseObject(line)

    // 从json对象中取user对象
    //      val userObj = jsonObj.getJSONObject("user")
    val uid = jsonObj.getString("uid")

    // 从user对象中取phone对象,也就是我们用户的各种标示 也就是ID
    val phoneObj = jsonObj.getJSONObject("phone")
    val imei = phoneObj.getOrDefault("imei", "").toString
    val mac = phoneObj.getOrDefault("mac", "").toString
    val imsi = phoneObj.getOrDefault("imsi", "").toString
    val androidId = phoneObj.getOrDefault("androidId", "").toString
    val deviceId = phoneObj.getOrDefault("deviceId", "").toString
    val uuid = phoneObj.getOrDefault("uuid", "").toString
    Array(uid, imei, mac, imsi, androidId, deviceId, uuid).filter(StringUtils.isNotBlank(_))
  }
  )

  // 构建顶点集
  val vertices: RDD[(Long, String)] = data.flatMap(arr => {
  	// 这个Graph的数据格式要求,其实hashCode 存在重复的问题,后面我们会在One ID 中解决这个问题
    for (id <- arr) yield (id.hashCode.toLong, id)
  }).distinct
  // 构建边集 这里使用了双重for 循环,是为了将一个用户的不同ID 连接起来
  val edges: RDD[Edge[String]] = data.flatMap(arr => {
    for (i <- 0 to arr.length - 2; j <- i + 1 to arr.length - 1) yield Edge(arr(i).hashCode.toLong, arr(j).hashCode.toLong, "")
  })
  .map(edge => (edge, 1)).reduceByKey(_ + _)
  .map(x => x._1)

    
  //用 点集合 和 边集合 构造一张图  使用Graph算法
  val graph = Graph(vertices, edges)


  //并调用最大连同子图算法VertexRDD[VertexId] ==>rdd 里面装的元组(Long值,组中最小值)
  val res: VertexRDD[VertexId] = graph.connectedComponents().vertices
  val firstIds = res.toDF("id_hashcode", "guid_hashcode")
  firstIds.show(10000,false)
}

我们看一下运行的结果

+-----------+-----------+
|id_hashcode|guid_hashcode|
+-----------+-----------+
|110929768  |-1782473361|
|2140645736 |2140645736 |
|-1884715312|-1884715312|
|-1381665247|-1381665247|
|1985563774 |1985563774 |
|-1483907198|-1483907198|
|-1908595409|-1908595409|
|-1419274017|-1782473361|
|-1115460503|-1782473361|
|-1419274018|-1419274018|
|-1018465903|-1908595409|
|-1483907197|-1483907197|
|-1782473362|-1782473362|
|1985563773 |-1782473361|
|-1782473361|-1782473361|
|-714652389 |-714652389 |
|2140645735 |-1782473361|
|-714652388 |-1908595409|
|110929767  |-1908595409|
|-1908595408|-1908595408|
|-1381665248|-1908595409|
|-1753513447|-1908595409|
|-1018465904|-1018465904|
|-1884715311|-1884715311|
+-----------+-----------+

我们看到这个结果的可读性不高,这是因为这个关系是ID 的 hashcode 的,所以我们需要将根据hashcode 将节点的ID 找出来。

//并调用最大连同子图算法VertexRDD[VertexId] ==>rdd 里面装的元组(Long值,组中最小值)
val res: VertexRDD[VertexId] = graph.connectedComponents().vertices
val firstIds = res.toDF("id_hashcode", "guid_hashcode")
val verticesDF=vertices.toDF("id_hashcode", "id")
firstIds.createOrReplaceTempView("ids")
verticesDF.createOrReplaceTempView("vertices")
// sql 实现
spark.sql(
  """
    |select
    |   b.id,c.id as guid
    |from
    |   ids a
    |inner join
    |   vertices b
    |on
    |   a.id_hashcode=b.id_hashcode
    |inner join
    |   vertices c
    |on
    |   a.guid_hashcode=c.id_hashcode
    |""".stripMargin)
  .show()

这里我们使用Spark SQL 的语法,当然你也可以使用DataFram api 或者 RDD api,下面我们看一下结果

+----------------+-----------+
|              id|       guid|
+----------------+-----------+
| deviceId_ls_001|imei_ls_001|
|androidId_ls_001|imei_ls_001|
|           u_002|imei_ls_001|
|     uuid_ls_001|imei_ls_001|
|     imei_ls_001|imei_ls_001|
|      mac_ls_001|imei_ls_001|
| deviceId_ls_002|imei_ls_001|
|androidId_ls_002|imei_ls_001|
|     uuid_ls_002|imei_ls_001|
|     imsi_ls_001|imei_ls_001|
|     imsi_ls_002|imei_ls_001|
|     imei_ls_002|imei_ls_001|
|     imsi_zs_002|uuid_zs_001|
|androidId_zs_001|uuid_zs_001|
|     imsi_zs_001|uuid_zs_001|
|      mac_zs_002|uuid_zs_001|
|     imei_zs_001|uuid_zs_001|
|     uuid_zs_002|uuid_zs_001|
|     imei_zs_002|uuid_zs_001|
|           u_001|uuid_zs_001|
+----------------+-----------+

这个结果就和我们上面说的字典的形式一样了,下面我们看一下另外一种字典的形式

val mapping= spark.sql(
  """
    |select
    |   b.id,c.id as guid
    |from
    |   ids a
    |inner join
    |   vertices b
    |on
    |   a.id_hashcode=b.id_hashcode
    |inner join
    |   vertices c
    |on
    |   a.guid_hashcode=c.id_hashcode
    |""".stripMargin)
mapping.createOrReplaceTempView("mapping")

spark.sql(
  """
    |select
    |   guid,collect_list(id) as ids
    |from
    |   mapping
    |group by
    |   guid
    |""".stripMargin)
  .show(false)
+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|guid       |ids                                                                                                                                                                    |
+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|imei_ls_001|[deviceId_ls_001, androidId_ls_001, u_002, uuid_ls_001, imei_ls_001, mac_ls_001, deviceId_ls_002, androidId_ls_002, uuid_ls_002, imsi_ls_001, imsi_ls_002, imei_ls_002]|
|uuid_zs_001|[imsi_zs_002, androidId_zs_001, imsi_zs_001, mac_zs_002, imei_zs_001, uuid_zs_002, imei_zs_002, u_001, uuid_zs_001, mac_zs_001, deviceId_zs_001, androidId_zs_002]     |
+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+

存在的问题
  1. 假如一个人偶尔用他朋友的手机登录过一次,则会出现他的uid跟他朋友的各设备id之间产生关联,也就会出现我们会把这两个用户判断成一个人,解决方案就是去掉弱关联,保留强关联,我们在构造edge 的时候可以根据阈值进行过滤

    val edges: RDD[Edge[String]] = data.flatMap(arr => {
      for (i <- 0 to arr.length - 2; j <- i + 1 to arr.length - 1) yield Edge(arr(i).hashCode.toLong, arr(j).hashCode.toLong, "")
    })
      .map(edge => (edge, 1)).reduceByKey(_ + _)
      .filter((edge,cnt)=>cnt>2)
      .map(x => x._1)
    

总结

  1. 这里我们是使用Spark 的GraphX 进行计算的,当然我们也可以使用其他图数据库来实现
  2. ID Mapping 的核心是连通图所以我们需要有一定的图论知识基础
  3. ID Mapping 是很多业务场景的基础,例如One ID 、用户画像

相关推荐

  1. IMAP4揭秘:实现高效、灵活的电子邮件管理

    2024-03-25 10:02:04       22 阅读
  2. 网易灵犀办公企业邮箱的IMAP和POP3服务器地址

    2024-03-25 10:02:04       9 阅读
  3. <span style='color:red;'>实战</span>

    实战

    2024-03-25 10:02:04      8 阅读
  4. MySQL实战

    2024-03-25 10:02:04       33 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-03-25 10:02:04       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-25 10:02:04       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-25 10:02:04       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-25 10:02:04       18 阅读

热门阅读

  1. 想注册滴滴司机驾龄不够怎么办?

    2024-03-25 10:02:04       16 阅读
  2. 10种常用排序算法简介

    2024-03-25 10:02:04       17 阅读
  3. 想注册滴滴司机驾龄不够怎么办?

    2024-03-25 10:02:04       17 阅读
  4. 【蓝桥杯3.23小白赛】(详解)

    2024-03-25 10:02:04       19 阅读
  5. 机器学习的步骤与方法

    2024-03-25 10:02:04       15 阅读
  6. 【ML】机器学习任务攻略 4

    2024-03-25 10:02:04       18 阅读
  7. Qt如何重写closeEvent

    2024-03-25 10:02:04       16 阅读
  8. 【Node.js】markdown 转 html

    2024-03-25 10:02:04       15 阅读
  9. 想注册滴滴司机驾龄不够怎么办?

    2024-03-25 10:02:04       15 阅读
  10. oracle 关闭归档

    2024-03-25 10:02:04       15 阅读