[Spark] Reading Hive data for ALS collaborative-filtering recommendations: an example

This post only gives the code for a simple example.

package recommend

import org.apache.log4j.{ Level, Logger }
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating
import org.apache.hadoop.conf.Configuration
import com.huawei.hadoop.security.LoginUtil
import org.apache.spark.sql.hive.HiveContext

object ALSTest {
  def main(args: Array[String]): Unit = {

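    // Kerberos login for a secured cluster (LoginUtil comes from the Huawei
    // FusionInsight client libraries; the principal and keytab paths below
    // are site-specific)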
    val principal = "abcd"
    val userkeytabpath = "/srv/BigData/kafka/data1/HadoopCli/keytab/user.keytab"
    val krb5path = "/srv/BigData/kafka/data1/HadoopCli/keytab/krb5.conf"
    val hadoopConf: Configuration = new Configuration()
    LoginUtil.login(principal, userkeytabpath, krb5path, hadoopConf)
    
    //0 Build the Spark context
    val conf = new SparkConf().setAppName("ALS")
    val sc = new SparkContext(conf)
    Logger.getRootLogger.setLevel(Level.WARN)

    //1 Read the sample data (the commented-out block reads the same user,item,rating triples from a text file instead)
    //  val data = sc.textFile("hdfs://master:9000/test/als_test.txt")
    //  val ratings = data.map(_.split(',') match {
    //    case Array(user, item, rate) =>
    //      Rating(user.toInt, item.toInt, rate.toDouble)
    //  })
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._
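    // The Hive table is assumed to hold one (user, item, rating) triple per row,
    // stored as strings, e.g. (illustrative DDL, not from the original post):
    //   CREATE TABLE tab_als (uid STRING, gid STRING, pref STRING);
    // which is why the fields are converted with toInt/toDouble below.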
    val dataFrame = sqlContext.sql("select uid,gid,pref from tab_als")
    println("###dataFrame==>" + dataFrame)
    val ratings = dataFrame.rdd.map(f => Rating(f.getString(0).toInt, f.getString(1).toInt, f.getString(2).toDouble))

    //2 Build the model
    //rank is the dimension of the latent feature vectors. If it is too small the model
    //  underfits and the error is large; if it is too large the model generalizes poorly.
    //  You need to strike a balance; in practice values between 10 and 100 work well.
    val rank = 10 //List(8,10,12)
    val numIterations = 20
    val model = ALS.train(ratings, rank, numIterations, 0.01)

    /**
     * lambda works like rank: a large value guards against overfitting, while a very small
     * value (effectively 0) gives no regularization at all. How to pick it? Try values
     * roughly 3x apart, e.g. 0.0001, 0.0003, 0.001, 0.003, 0.01, 0.03, 0.1, 0.3, 1, 3, 10,
     * see roughly which one works best, then search a finer grid around that value (say,
     * from 0.003 to 0.3 in small steps: 0.003, 0.005, 0.007, 0.009, 0.011, ...).
     * Of course, if your machine is fast enough and you have the time, you can also sweep
     * a dense grid from 0 upwards and try the parameter combinations one by one.
     * (A runnable sketch of this grid search is given after the code.)
     */

    //3 Make predictions
    val usersProducts = ratings.map {
      case Rating(user, product, rate) =>
        (user, product)
    }
    println("#####usersProducts collect ####")
    usersProducts.collect().foreach(println)
    val predictions =
      model.predict(usersProducts).map {
        case Rating(user, product, rate) =>
          ((user, product), rate)
      }

    println("#####predictions collect ####")
    predictions.collect().foreach(println)
    val ratesAndPreds = ratings.map {
      case Rating(user, product, rate) =>
        ((user, product), rate)
    }.join(predictions)
    println("#####ratesAndPreds collect ####")
    ratesAndPreds.collect().foreach(println)

    val MSE = ratesAndPreds.map {
      case ((user, product), (r1, r2)) =>
        val err = (r1 - r2)
        err * err
    }.mean()
    println("Mean Squared Error = " + MSE)

    //4 Save / load the model
    //  model.save(sc, path)
    //  val sameModel = MatrixFactorizationModel.load(sc,path)
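
    //5 Produce the actual recommendations (an illustrative addition: recommendProducts
    //  is the standard MatrixFactorizationModel API; user id 1 and top-5 are arbitrary)
    //  model.recommendProducts(1, 5).foreach(println)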
  }

}
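
To make the tuning advice in the comments concrete, here is a minimal grid-search sketch. It is not part of the original example: the 80/20 train/validation split, the candidate grids, and the computeRmse helper are all illustrative. It assumes a ratings RDD built exactly as above.

package recommend

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.{ ALS, MatrixFactorizationModel, Rating }

object ALSGridSearch {

  // RMSE of a trained model on a held-out rating set
  def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating]): Double = {
    val predictions = model
      .predict(data.map(r => (r.user, r.product)))
      .map(r => ((r.user, r.product), r.rating))
    val ratesAndPreds = data
      .map(r => ((r.user, r.product), r.rating))
      .join(predictions)
    math.sqrt(ratesAndPreds.map { case (_, (r1, r2)) => (r1 - r2) * (r1 - r2) }.mean())
  }

  // Coarse search over rank and lambda; returns the best (rank, lambda, rmse)
  def search(ratings: RDD[Rating]): (Int, Double, Double) = {
    val Array(training, validation) = ratings.randomSplit(Array(0.8, 0.2), seed = 42L)
    training.cache()
    validation.cache()

    val results = for {
      rank <- Seq(8, 10, 12)
      lambda <- Seq(0.001, 0.003, 0.01, 0.03, 0.1) // roughly 3x apart, as suggested above
    } yield {
      val model = ALS.train(training, rank, 20, lambda)
      (rank, lambda, computeRmse(model, validation))
    }
    results.minBy(_._3)
  }
}

Once search has picked the best (rank, lambda) pair, retrain on the full ratings set with those values before saving the model.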
