博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark中foreachRDD的正确使用
阅读量:3958 次
发布时间:2019-05-24

本文共 1278 字,大约阅读时间需要 4 分钟。

常出现的使用误区:

误区一:在driver上创建连接对象(比如网络连接或数据库连接)
    如果在driver上创建连接对象,然后在RDD的算子函数内使用连接对象,那么就意味着需要将连接对象序列化后从driver传递到worker上。而连接对象(比如Connection对象)通常来说是不支持序列化的,此时通常会报序列化的异常(serialization errors)。因此连接对象必须在worker上创建,不要在driver上创建。

dstream.foreachRDD { rdd =>  val connection = createNewConnection() // 数据库连接在driver上执行  rdd.foreach { record =>  connection.send(record) // 在worker上执行  }}

误区二:为每一条记录都创建一个连接对象

    通常来说,连接对象的创建和销毁都是很消耗时间的。因此频繁地创建和销毁连接对象,可能会导致降低spark作业的整体性能和吞吐量。

dstream.foreachRDD { rdd =>rdd.foreach { record =>    val connection = createNewConnection() //每插入一条数据,创建一个连接    connection.send(record)    connection.close()    }}
 

比较正确的做法是:对DStream中的RDD,调用foreachPartition,对RDD中每个分区创建一个连接对象,使用一个连接对象将一个分区内的数据都写入数据库中。这样可以大大减少创建的连接对象的数量。

正确做法一:为每个RDD分区创建一个连接对象

dstream.foreachRDD { rdd =>rdd.foreachPartition { partitionOfRecords =>    val connection = createNewConnection()    partitionOfRecords.foreach(record => connection.send(record))    connection.close()    }}

 

正确做法二:为每个RDD分区使用一个连接池中的连接对象

dstream.foreachRDD { rdd =>rdd.foreachPartition { partitionOfRecords =>    // 从数据库连接池中获取连接    val connection = ConnectionPool.getConnection()    partitionOfRecords.foreach(record => connection.send(record))    ConnectionPool.returnConnection(connection) // 用完以后将连接返    回给连接池,进行复用}}
转载自
你可能感兴趣的文章
Python使用heapq实现小顶堆(TopK大)、大顶堆(BtmK小)
查看>>
用python的matplotlib包绘制热度图
查看>>
matplot pip安装
查看>>
序列S的所有可能情况
查看>>
在Linux上用pip安装scipy
查看>>
随机salt二次加密及hash加密漫谈
查看>>
linux 技巧:使用 screen 管理你的远程会话
查看>>
同时装了Python3和Python2,怎么用pip?
查看>>
linux tar 解压缩zip文件报错的解决
查看>>
vim,ctag和Taglist
查看>>
Ubuntu的apt命令详解
查看>>
Ubuntu Server 设置sshd
查看>>
sort,uniq命令的使用。
查看>>
linux下md5加密(使用openssl库C实现)
查看>>
openssl、MD5的linux安装方法
查看>>
DevC++ 工程没有调试信息的解决办法
查看>>
http消息长度的确定
查看>>
手机和电脑如何连接蓝牙
查看>>
HTTP协议参数
查看>>
wireshark检索命令
查看>>