package ml import java.util import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.linalg.distributed.RowMatrix import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SQLContext, Row} import org.apache.spark.{SparkContext, SparkConf} import java.util.Arrays import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute} import org.apache.spark.ml.feature.VectorSlicer import org.apache.spark.sql.types.{DataTypes, StructField, StructType} /* VectorSlicer是一个转换器,输入一个特征向量输出一个特征向量, 它是原特征的一个子集。这在从向量列中抽取特征非常有用。 VectorSlicer接收一个拥有特定索引的特征列, 它的输出是一个新的特征列,它的值通过输入的索引来选择。 有两种类型的索引: 1、整数索引表示进入向量的索引,调用setIndices() 2、字符串索引表示进入向量的特征列的名称,调用setNames()。 这种情况需要向量列拥有一个AttributeGroup,这是因为实现是通过属性的名字来匹配的。 * */ object FeatureSelectors { def main(args: Array[String]) { val conf = new SparkConf().setAppName("test").setMaster("local") val sc = new SparkContext(conf) val sql = new SQLContext(sc); val data = Arrays.asList( // Row(Vectors.dense(-2.0, 2, 0.0)), Row(Vectors.sparse(3, Seq((0, -2.0), (1, 2.3)))), Row(Vectors.dense(-2.0, 2, 0.0)) ) val defaultAttr: NumericAttribute = NumericAttribute.defaultAttr val attrs = Array("f1", "f2", "f3").map(defaultAttr.withName) val attrGroup = new AttributeGroup("userFeatures", attrs.asInstanceOf[Array[Attribute]]) //从三列中选择两列参与模型训练 val dataset = sql.createDataFrame(data, StructType(Array(attrGroup.toStructField()))) dataset.printSchema() val slicer = new VectorSlicer().setInputCol("userFeatures").setOutputCol("features") //setIndices(Array(1)) 第二列 setNames(Array("f3")) 第三列 // slicer.setIndices(Array(1)).setNames(Array("f3")) slicer.setIndices(Array(1)).setNames(Array("f3")) // or slicer.setIndices(Array(1, 2)), or slicer.setNames(Array("f2", "f3")) val output: DataFrame = slicer.transform(dataset) output.printSchema() output.show(false) output.select("features").show() // val out: RDD[Row] = output.rdd.map(row => Row(row.get(0),row.get(1))) val out: DataFrame = output.select("features") val rdd: RDD[Row] = out.toDF().map{ row => val r: Vector = row.getAs[Vector](0) Row(r.apply(0),r.apply(1)) // println("---"+r.apply(0)+"---"+r.apply(1)) } val fields = new util.ArrayList[StructField]; fields.add(DataTypes.createStructField("id", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("feature", DataTypes.DoubleType, true)); val structType = DataTypes.createStructType(fields); sql.createDataFrame(rdd,structType).show() } }