User:Joal/Clickstream historical


This page describes my experiments with the Clickstream dataset.

First, launch a Spark 2 Scala shell from any of the stat100[46] machines (with sensible resource limits so the cluster is shared nicely):

spark2-shell --master yarn --conf spark.dynamicAllocation.maxExecutors=64 --driver-memory 4G --executor-memory 8G --executor-cores 2 --jars /srv/deployment/analytics/refinery/artifacts/refinery-job.jar

Then do some work:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.lit

val wiki = Seq(
  "enwiki"
)

val dates = Seq(
  "2017-11",
  "2017-12",
  "2018-01",
  "2018-02",
  "2018-03",
  "2018-04",
  "2018-05",
  "2018-06",
  "2018-07",
  "2018-08",
  "2018-09",
  "2018-10",
  "2018-11",
  "2018-12",
  "2019-01",
  "2019-02",
  "2019-03",
  "2019-04"
)

// All (wiki, month) pairs, as a parallel collection so the per-month DataFrames below are built concurrently
val wiki_dates = wiki.flatMap(w => dates.map(d => (w, d))).par

// Schema of the raw clickstream TSV files
val rawSchema = StructType(
  StructField("page_from", StringType, false) ::
  StructField("page_to", StringType, false) ::
  StructField("link_type", StringType, false) ::
  StructField("count", LongType, false) :: Nil
)

// date and wiki_db hold string literals ("2018-01", "enwiki"), so StringType, not LongType
val augSchema = rawSchema.
  add(StructField("date", StringType, false)).
  add(StructField("wiki_db", StringType, false))

// Read one monthly clickstream dump and tag its rows with their month and wiki
def readDf(wiki: String, dt: String): DataFrame = {
  spark.
    read.
    option("delimiter", "\t").
    schema(rawSchema).
    csv(s"/wmf/data/archive/clickstream/${dt}/clickstream-${wiki}-${dt}.tsv.gz").
    withColumn("date", lit(dt)).
    withColumn("wiki_db", lit(wiki))
}
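
As a quick sanity check (a sketch, not part of the original session), one can load a single month for a single wiki and peek at the result before unioning everything:

// Hypothetical spot-check: one wiki, one month
val sample = readDf("enwiki", "2018-01")
sample.printSchema()
sample.show(5, false)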

// Empty DataFrame with the augmented schema, used as the zero value of the fold below
val emptyDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], augSchema)
// Union all per-month DataFrames; repartition because each gzipped file is read as a single partition
val clickstreams = wiki_dates.
  map(w_d => readDf(w_d._1, w_d._2)).
  fold(emptyDf)((d1,  d2) => d1.union(d2)).
  repartition(64).
  cache()

// Takes a long time as it has to read the full dataset (gzipped files are not splittable, so decompression is not parallelized)
clickstreams.groupBy("wiki_db", "date").count().sort("wiki_db", "date").show(100, false)
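
Every fresh action re-reads the gzipped TSVs until the cache is warm. One workaround (a sketch; the output path below is a hypothetical example) is to write the union out as Parquet once, then read the Parquet copy back in later sessions:

// Sketch: persist the unioned dataset as splittable Parquet for cheap re-reads.
// The path is a hypothetical example, not an existing dataset.
clickstreams.write.mode("overwrite").parquet("/user/joal/clickstream_monthly")
val clickstreamsParquet = spark.read.parquet("/user/joal/clickstream_monthly")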

clickstreams.
  where("page_from IN ('Global_warming', 'Climate_change') AND page_to IN ('Global_warming', 'Climate_change')").
  sort("date", "page_from", "page_to").
  show(1000, false)

clickstreams.
  where("page_from IN ('Smog', 'Air_pollution') AND page_to IN ('Smog', 'Air_pollution')").
  sort("date", "page_from", "page_to").
  show(1000, false)
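
In the same spirit as the queries above, a sketch for tracking how total incoming clicks to a single article evolve month by month (the article name is just an example):

import org.apache.spark.sql.functions.sum

// Total clicks into one article per month, across all link types
clickstreams.
  where("page_to = 'Global_warming'").
  groupBy("date").
  agg(sum("count").as("total_clicks_in")).
  sort("date").
  show(100, false)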