Jump to content

Data Platform/Data Lake/Edits/MediaWiki history dumps/Scala spark examples

From Wikitech
The printable version is no longer supported and may have rendering errors. Please update your browser bookmarks and please use the default browser print function instead.

This page provides examples of using Spark with Scala to process the MediaWiki history dumps.

Loading the data (unescape strings and split arrays)

// Location of the MediaWiki history dump files.
// Update this value to your own path before running the example.
val mediawiki_history_path = "/PATH/TO/MEDIAWIKI/HISTORY/FILES"

import org.apache.spark.sql.types._

// Explicit schema of the MediaWiki history TSV files.
// The files are read without a header, so Spark assigns these fields to the file's columns
// by position: the order of the fields below must match the column order of the files.
// Naming convention used in this schema:
//   - `*_escaped` fields hold text whose newlines, carriage returns and tabs are
//     backslash-escaped; they are unescaped later with the `unescape` UDF.
//   - `*_string` fields hold comma-separated lists; they are converted to arrays later
//     with the `to_array` UDF.
// Timestamps are read as plain strings (StringType).
// NOTE(review): Spark's file-based readers generally treat all columns as nullable on read,
// so `nullable = false` below is likely informational only — confirm for your Spark version.
val mediawiki_history_schema = StructType(Seq(

  // Event fields
  StructField("wiki_db", StringType, nullable = false),
  StructField("event_entity", StringType, nullable = false),
  StructField("event_type", StringType, nullable = false),
  StructField("event_timestamp", StringType, nullable = true),
  StructField("event_comment_escaped", StringType, nullable = true),

  // Fields about the user associated with the event (event_user_*)
  StructField("event_user_id", LongType, nullable = true),
  StructField("event_user_text_historical_escaped", StringType, nullable = true),
  StructField("event_user_text_escaped", StringType, nullable = true),
  StructField("event_user_blocks_historical_string", StringType, nullable = true),
  StructField("event_user_blocks_string", StringType, nullable = true),
  StructField("event_user_groups_historical_string", StringType, nullable = true),
  StructField("event_user_groups_string", StringType, nullable = true),
  StructField("event_user_is_bot_by_historical_string", StringType, nullable = true),
  StructField("event_user_is_bot_by_string", StringType, nullable = true),
  StructField("event_user_is_created_by_self", BooleanType, nullable = true),
  StructField("event_user_is_created_by_system", BooleanType, nullable = true),
  StructField("event_user_is_created_by_peer", BooleanType, nullable = true),
  StructField("event_user_is_anonymous", BooleanType, nullable = true),
  StructField("event_user_registration_timestamp", StringType, nullable = true),
  StructField("event_user_creation_timestamp", StringType, nullable = true),
  StructField("event_user_first_edit_timestamp", StringType, nullable = true),
  StructField("event_user_revision_count", LongType, nullable = true),
  StructField("event_user_seconds_since_previous_revision", LongType, nullable = true),

  // Page fields (page_*)
  StructField("page_id", LongType, nullable = true),
  StructField("page_title_historical_escaped", StringType, nullable = true),
  StructField("page_title_escaped", StringType, nullable = true),
  StructField("page_namespace_historical", IntegerType, nullable = true),
  StructField("page_namespace_is_content_historical", BooleanType, nullable = true),
  StructField("page_namespace", IntegerType, nullable = true),
  StructField("page_namespace_is_content", BooleanType, nullable = true),
  StructField("page_is_redirect", BooleanType, nullable = true),
  StructField("page_is_deleted", BooleanType, nullable = true),
  StructField("page_creation_timestamp", StringType, nullable = true),
  StructField("page_first_edit_timestamp", StringType, nullable = true),
  StructField("page_revision_count", LongType, nullable = true),
  StructField("page_seconds_since_previous_revision", LongType, nullable = true),

  // User fields (user_*); presumably describe the user an event acts on — TODO confirm
  StructField("user_id", LongType, nullable = true),
  StructField("user_text_historical_escaped",  StringType, nullable = true),
  StructField("user_text_escaped", StringType, nullable = true),
  StructField("user_blocks_historical_string", StringType, nullable = true),
  StructField("user_blocks_string", StringType, nullable = true),
  StructField("user_groups_historical_string", StringType, nullable = true),
  StructField("user_groups_string", StringType, nullable = true),
  StructField("user_is_bot_by_historical_string", StringType, nullable = true),
  StructField("user_is_bot_by_string", StringType, nullable = true),
  StructField("user_is_created_by_self", BooleanType, nullable = true),
  StructField("user_is_created_by_system", BooleanType, nullable = true),
  StructField("user_is_created_by_peer", BooleanType, nullable = true),
  StructField("user_is_anonymous", BooleanType, nullable = true),
  StructField("user_registration_timestamp", StringType, nullable = true),
  StructField("user_creation_timestamp", StringType, nullable = true),
  StructField("user_first_edit_timestamp", StringType, nullable = true),

  // Revision fields (revision_*)
  StructField("revision_id", LongType, nullable = true),
  StructField("revision_parent_id", LongType, nullable = true),
  StructField("revision_minor_edit", BooleanType, nullable = true),
  StructField("revision_deleted_parts_string", StringType, nullable = true),
  StructField("revision_deleted_parts_are_suppressed", BooleanType, nullable = true),
  StructField("revision_text_bytes", LongType, nullable = true),
  StructField("revision_text_bytes_diff", LongType, nullable = true),
  StructField("revision_text_sha1", StringType, nullable = true),
  StructField("revision_content_model", StringType, nullable = true),
  StructField("revision_content_format", StringType, nullable = true),
  StructField("revision_is_deleted_by_page_deletion", BooleanType, nullable = true),
  StructField("revision_deleted_by_page_deletion_timestamp", StringType, nullable = true),
  StructField("revision_is_identity_reverted", BooleanType, nullable = true),
  StructField("revision_first_identity_reverting_revision_id", LongType, nullable = true),
  StructField("revision_seconds_to_identity_revert", LongType, nullable = true),
  StructField("revision_is_identity_revert", BooleanType, nullable = true),
  StructField("revision_is_from_before_page_creation", BooleanType, nullable = true),
  StructField("revision_tags_string", StringType, nullable = true)
))

// Read the dump files as tab-separated text, using the explicit schema (no header row).
// Note: it's important to set .option("quote", "") — it disables quote handling, which
// prevents Spark from automatically treating double-quotes as quote characters in text fields.
val mediawiki_history_raw = spark.read.
  option("delimiter", "\t").
  option("quote", "").
  schema(mediawiki_history_schema).
  csv(mediawiki_history_path)

// Unescaping and array-splitting UDFs

// Reverses the backslash-escaping of newlines, carriage returns and tabs in text fields.
// A null input yields null, so SQL NULLs pass through unchanged.
val unescape = (str: String) =>
  Option(str).map(_.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t")).orNull

// Matches a comma that is NOT preceded by a backslash. `(?<!\\)` is a negative lookbehind
// (not a lookahead), so escaped commas (`\,`) are never used as split points.
val commaSplitter = "(?<!\\\\),".r

// Converts a comma-separated list field into an array of strings:
// null becomes an empty array; otherwise the whole string is unescaped, split on
// unescaped commas, and each element is trimmed with its escaped commas restored.
val toArray = (str: String) =>
  Option(str).fold(Array.empty[String]) { s =>
    commaSplitter.split(unescape(s)).map(_.trim.replace("\\,", ","))
  }

// Expose the Scala functions to Spark SQL under the names used in the
// selectExpr expressions: `unescape` for text fields, `to_array` for list fields.
{
  val registry = spark.udf
  registry.register("unescape", unescape)
  registry.register("to_array", toArray)
}

// Build the final dataframe from the raw one, keeping the raw column order:
//   - `<name>_escaped` columns are unescaped and exposed as `<name>`
//     (e.g. event_comment_escaped -> unescape(event_comment_escaped) AS event_comment),
//   - `<name>_string` columns are split into arrays and exposed as `<name>`
//     (e.g. revision_tags_string -> to_array(revision_tags_string) AS revision_tags),
//   - every other column is selected unchanged.
// Deriving the expressions from the raw columns keeps them in sync with the schema.
val mediawiki_history = mediawiki_history_raw.selectExpr(
  mediawiki_history_raw.columns.toSeq.map {
    case escaped if escaped.endsWith("_escaped") =>
      s"unescape($escaped) AS ${escaped.stripSuffix("_escaped")}"
    case listed if listed.endsWith("_string") =>
      s"to_array($listed) AS ${listed.stripSuffix("_string")}"
    case plain =>
      plain
  }: _*
)

// Have fun with the data :)
// Top 10 projects by number of revisions created during `target_month` (format YYYY-MM).
// Spark SQL's SUBSTR uses 1-based positions: SUBSTR(event_timestamp, 1, 7) keeps the
// first 7 characters of the timestamp, i.e. its year-month prefix.
import org.apache.spark.sql.functions.{count, desc, lit}

val target_month = "2019-12"

mediawiki_history.
  where("event_entity = 'revision' and event_type = 'create'").
  selectExpr("wiki_db", "SUBSTR(event_timestamp, 1, 7) AS month").
  where(s"month = '$target_month'").
  groupBy("wiki_db", "month").
  agg(count(lit(1L)).as("revision_count")).
  sort(desc("revision_count")).
  show(numRows = 10, truncate = false)

/*
+--------------+-------+--------------+                                         
|wiki_db       |month  |revision_count|
+--------------+-------+--------------+
|wikidatawiki  |2019-12|21511762      |
|commonswiki   |2019-12|6046688       |
|enwiki        |2019-12|4756250       |
|arwiki        |2019-12|1599840       |
|frwiki        |2019-12|903838        |
|dewiki        |2019-12|795638        |
|eswiki        |2019-12|710516        |
|viwiki        |2019-12|679525        |
|ruwiki        |2019-12|652051        |
|itwiki        |2019-12|563592        |
+--------------+-------+--------------+
only showing top 10 rows
*/