当前位置:首页 > spark分享文档
(3)IP:10.171.29.191 hostname:master
2、安装scala
(1)下载scala
wget http://downloads.typesafe.com/scala/2.10.5/scala-2.10.5.tgz
(2)解压文件
tar -zxvf scala-2.10.5.tgz
(3)配置环境变量 #vi/etc/profile
#SCALA VARIABLES START
export SCALA_HOME=/home/jediael/setupfile/scala-2.10.5 export PATH=$PATH:$SCALA_HOME/bin #SCALA VARIABLES END
$ source /etc/profile $ scala -version
Scala code runner version 2.10.5 -- Copyright 2002-2013, LAMP/EPFL
(4)验证scala $ scala
Welcome to Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_51). Type in expressions to have them evaluated. Type :help for more information.
scala> 9*9 res0: Int = 81
3、安装spark (1)下载spark wget
http://mirror.bit.edu.cn/apache/spark/spark-1.3.1/spark-1.3.1-bin-hadoop2.6.tgz
(2)解压spark tar -zxvf http://mirror.bit.edu.cn/apache/spark/spark-1.3.1/spark-1.3.1-bin-hadoop2.6.tgz
(3)配置环境变量 #vi/etc/profile
#SPARK VARIABLES START
3
export SPARK_HOME=/mnt/jediael/spark-1.3.1-bin-hadoop2.6 export PATH=$PATH:$SPARK_HOME/bin #SPARK VARIABLES END
$ source /etc/profile
(4)配置spark $ pwd
/mnt/jediael/spark-1.3.1-bin-hadoop2.6/conf
$ mv spark-env.sh.template spark-env.sh $vi spark-env.sh
export SCALA_HOME=/home/jediael/setupfile/scala-2.10.5 export JAVA_HOME=/usr/java/jdk1.7.0_51 export SPARK_MASTER_IP=10.171.29.191 export SPARK_WORKER_MEMORY=512m export master=spark://10.171.29.191:7070
$vi slaves localhost
(5)启动spark pwd
/mnt/jediael/spark-1.3.1-bin-hadoop2.6/sbin $ ./start-all.sh
注意,hadoop也有start-all.sh脚本,因此必须进入具体目录执行脚本 $ jps
30302 Worker 30859 Jps 30172 Master
4、验证安装情况 (1)运行自带示例
$ bin/run-example org.apache.spark.examples.SparkPi
(2)查看集群环境 http://master:8080/
(3)进入spark-shell $spark-shell
(4)查看jobs等信息 http://master:4040/jobs/
4
二 系统技术实现介绍 2.1 数据产生及存储
1 用户操作行为数据
用户从进入聊天室时到退出时,将会记录用户在这个聊天室的起始与结束时间,以及用户所待的时间长度、当前聊天室的话题和其自身的属性。
2 用户属性数据
用户在注册时所填写的属性及用户使用过程中的帐号等级、财富等级等属性。
3 数据存储
数据存储采用hbase 分布式的、面向列的开源数据库存储。
2.2 数据处理关键流程
2.2.1 数据读取
数据读取的用户操作行为数据源在hbase, 连接hbase如下:
valsqconf = HBaseConfiguration.create()
sqconf.set(\, \)
sqconf.set(\, \)
其中10.254.33.105为数据服务器主机ip,2181为端口。
读取hbase的表中的数据如下:
sqconf.set(TableInputFormat.INPUT_TABLE, \)
varusersRDD = sc.newAPIHadoopRDD(sqconf, classOf[TableInputFormat],
5
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
sqconf 为hbase连接配置,t_log_user_ac 为待读取的表名,上面这段代码的含义为将t_log_user_ac 表的数据读取到RDD。
注意,此时并没有真正去读数据, 只是后续在操作这个RDD时,才会去真正读取hbase表数据。
数据读取的用户属性数据源在mysql,读取mysql方式如下:
valjdbcDF= sq.read.format(\).options( Map(\->m_url,
\->m_tabName)).load()
其中,m_url为mysql连接字符串。jdbcDF 为DataFrame。
2.2.2 数据转换
hbase转换到hive
在处理数据时, 将会带来大量的io操作,此时若数据的存储方式影响了io,将会极大的降低数据处理效率,相较于hbase数据库,hive在数据的读写上具有极大效率优势。
所以在处理数据时,将hbase的数据先转换到hive中。
由于存储在hbase中的数据一般都是非结构化的数据,所以在将hbase的数据导入到hive时,必须先进行结构化。
根据hbase的数据结构,先定义结构化数据类型:
// 结构化对象
caseclassuserAc(user_id:String,peer_id:String,ac_type:String,start_time:String,end_time:String,duration:String)
在根据hbase的结果列,结构化数据:
// 从hbase数据行,结构化为userAc
defcreateObj(p:Result,fieldsConf:Array[FieldDec]):userAc = {
returnuserAc( Bytes.toString(p.getValue(Bytes.toBytes(fieldsConf.apply(0).m
6
共分享92篇相关文档