Jump to content

Data Platform/Data Lake/Edits/MediaWiki history dumps/Python spark examples

From Wikitech
The printable version is no longer supported and may have rendering errors. Please update your browser bookmarks and please use the default browser print function instead.

This page provides examples of using PySpark (Python Spark) to process the MediaWiki history dumps.

Loading the data (unescape strings and split arrays)

import re
from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType, IntegerType, ArrayType
from pyspark.sql.functions import count, lit, desc

# Update this value to your path: the location of the MediaWiki history dump
# files, which is passed as-is to the spark.read ... .csv(...) call below.
mediawiki_history_path = "/PATH/TO/MEDIAWIKI/HISTORY/FILES"

# Explicit schema for the raw, tab-separated dump files.
# The CSV reader below does not read a header row, so Spark matches file
# columns to these fields by POSITION: the field order here must follow the
# column order of the dump files exactly.
# Note: string unescaping and array conversion is done later:
#   - "*_escaped" fields hold text whose newline / carriage-return / tab
#     characters are escaped; they are decoded with the `unescape` UDF.
#   - "*_string" fields hold comma-separated lists; they are split into
#     arrays with the `to_array` UDF.
# Timestamps are kept as plain strings.
# NOTE(review): Spark's file-based readers generally treat every column as
# nullable, so the nullable = False flags are probably informational only —
# confirm before relying on them.
mediawiki_history_schema = StructType([

    # Event fields
    StructField("wiki_db", StringType(), nullable = False),
    StructField("event_entity", StringType(), nullable = False),
    StructField("event_type", StringType(), nullable = False),
    StructField("event_timestamp", StringType(), nullable = True),
    StructField("event_comment_escaped", StringType(), nullable = True),

    # Event-user fields ("*_historical" / current variants)
    StructField("event_user_id", LongType(), nullable = True),
    StructField("event_user_text_historical_escaped", StringType(), nullable = True),
    StructField("event_user_text_escaped", StringType(), nullable = True),
    StructField("event_user_blocks_historical_string", StringType(), nullable = True),
    StructField("event_user_blocks_string", StringType(), nullable = True),
    StructField("event_user_groups_historical_string", StringType(), nullable = True),
    StructField("event_user_groups_string", StringType(), nullable = True),
    StructField("event_user_is_bot_by_historical_string", StringType(), nullable = True),
    StructField("event_user_is_bot_by_string", StringType(), nullable = True),
    StructField("event_user_is_created_by_self", BooleanType(), nullable = True),
    StructField("event_user_is_created_by_system", BooleanType(), nullable = True),
    StructField("event_user_is_created_by_peer", BooleanType(), nullable = True),
    StructField("event_user_is_anonymous", BooleanType(), nullable = True),
    StructField("event_user_registration_timestamp", StringType(), nullable = True),
    StructField("event_user_creation_timestamp", StringType(), nullable = True),
    StructField("event_user_first_edit_timestamp", StringType(), nullable = True),
    StructField("event_user_revision_count", LongType(), nullable = True),
    StructField("event_user_seconds_since_previous_revision", LongType(), nullable = True),

    # Page fields
    StructField("page_id", LongType(), nullable = True),
    StructField("page_title_historical_escaped", StringType(), nullable = True),
    StructField("page_title_escaped", StringType(), nullable = True),
    StructField("page_namespace_historical", IntegerType(), nullable = True),
    StructField("page_namespace_is_content_historical", BooleanType(), nullable = True),
    StructField("page_namespace", IntegerType(), nullable = True),
    StructField("page_namespace_is_content", BooleanType(), nullable = True),
    StructField("page_is_redirect", BooleanType(), nullable = True),
    StructField("page_is_deleted", BooleanType(), nullable = True),
    StructField("page_creation_timestamp", StringType(), nullable = True),
    StructField("page_first_edit_timestamp", StringType(), nullable = True),
    StructField("page_revision_count", LongType(), nullable = True),
    StructField("page_seconds_since_previous_revision", LongType(), nullable = True),

    # User fields
    StructField("user_id", LongType(), nullable = True),
    StructField("user_text_historical_escaped",  StringType(), nullable = True),
    StructField("user_text_escaped", StringType(), nullable = True),
    StructField("user_blocks_historical_string", StringType(), nullable = True),
    StructField("user_blocks_string", StringType(), nullable = True),
    StructField("user_groups_historical_string", StringType(), nullable = True),
    StructField("user_groups_string", StringType(), nullable = True),
    StructField("user_is_bot_by_historical_string", StringType(), nullable = True),
    StructField("user_is_bot_by_string", StringType(), nullable = True),
    StructField("user_is_created_by_self", BooleanType(), nullable = True),
    StructField("user_is_created_by_system", BooleanType(), nullable = True),
    StructField("user_is_created_by_peer", BooleanType(), nullable = True),
    StructField("user_is_anonymous", BooleanType(), nullable = True),
    StructField("user_registration_timestamp", StringType(), nullable = True),
    StructField("user_creation_timestamp", StringType(), nullable = True),
    StructField("user_first_edit_timestamp", StringType(), nullable = True),

    # Revision fields
    StructField("revision_id", LongType(), nullable = True),
    StructField("revision_parent_id", LongType(), nullable = True),
    StructField("revision_minor_edit", BooleanType(), nullable = True),
    StructField("revision_deleted_parts_string", StringType(), nullable = True),
    StructField("revision_deleted_parts_are_suppressed", BooleanType(), nullable = True),
    StructField("revision_text_bytes", LongType(), nullable = True),
    StructField("revision_text_bytes_diff", LongType(), nullable = True),
    StructField("revision_text_sha1", StringType(), nullable = True),
    StructField("revision_content_model", StringType(), nullable = True),
    StructField("revision_content_format", StringType(), nullable = True),
    StructField("revision_is_deleted_by_page_deletion", BooleanType(), nullable = True),
    StructField("revision_deleted_by_page_deletion_timestamp", StringType(), nullable = True),
    StructField("revision_is_identity_reverted", BooleanType(), nullable = True),
    StructField("revision_first_identity_reverting_revision_id", LongType(), nullable = True),
    StructField("revision_seconds_to_identity_revert", LongType(), nullable = True),
    StructField("revision_is_identity_revert", BooleanType(), nullable = True),
    StructField("revision_is_from_before_page_creation", BooleanType(), nullable = True),
    StructField("revision_tags_string", StringType(), nullable = True)
])

# Read the raw dump files as tab-separated text using the explicit schema above.
# Note: it's important to set .option("quote", "") to prevent Spark from
# automatically treating double quotes as text-quoting characters. The dump's
# text fields are escaped (decoded later by the UDFs), not quoted, and an
# empty quote string turns quoting off in Spark's CSV reader.
# NOTE(review): assumes a SparkSession named `spark` is already in scope
# (e.g. the pyspark shell or a notebook); this example does not create one.
mediawiki_history_raw = (
    spark.read
    .option("delimiter", "\t")
    .option("quote", "")
    .schema(mediawiki_history_schema)
    .csv(mediawiki_history_path)
)

# Unescaping and array-splitting UDFs
def unescape(str):
    """Reverse the dump's escaping of control characters in a text field.

    The literal two-character sequences ``\\n``, ``\\r`` and ``\\t`` are turned
    back into real newline, carriage-return and tab characters, in that order.
    A ``None`` input (null column value) yields ``None``.
    """
    # The parameter name shadows the builtin ``str``; it is kept so existing
    # keyword callers keep working.
    if str is None:
        return None
    text = str
    for escaped, raw in (("\\n", "\n"), ("\\r", "\r"), ("\\t", "\t")):
        text = text.replace(escaped, raw)
    return text
# Element separator: a comma NOT preceded by a backslash (a negative
# lookbehind), so escaped "\," sequences inside a value are not split on.
# Compiled once at module level instead of on every call.
_UNESCAPED_COMMA = re.compile(r"(?<!\\),")


def toArray(str):
    """Convert a comma-separated dump field into a list of strings.

    The field is first decoded with ``unescape``. It is then split on commas
    that are not preceded by a backslash. Each element is stripped of
    surrounding whitespace, and escaped commas (``\\,``) are turned back into
    plain commas.

    A ``None`` input (null column value) yields an empty list, so the
    registered ``to_array`` SQL function never returns null.
    """
    # The parameter name shadows the builtin ``str``; it is kept so existing
    # keyword callers keep working.
    if str is None:
        return []
    return [
        element.strip().replace("\\,", ",")
        for element in _UNESCAPED_COMMA.split(unescape(str))
    ]

# Expose the Python helpers as SQL functions so they can be called by name
# inside the selectExpr() expressions below. to_array returns a non-null
# array whose elements are non-null strings.
spark.udf.register("unescape", f=unescape, returnType=StringType())
spark.udf.register(
    "to_array",
    f=toArray,
    returnType=ArrayType(StringType(), containsNull=False),
)

# Update dataframe using unescaping and array-splitting UDFs.
# Builds the analysis-ready DataFrame from the raw one:
#   - each "*_escaped" column is decoded with unescape() and renamed without
#     the "_escaped" suffix;
#   - each "*_string" list column is split with to_array() and renamed without
#     the "_string" suffix;
#   - every other column is passed through unchanged.
# Column order follows the schema definition.
mediawiki_history = mediawiki_history_raw.selectExpr(
  # Event fields
  "wiki_db",
  "event_entity",
  "event_type",
  "event_timestamp",
  "unescape(event_comment_escaped) AS event_comment",
  # Event-user fields
  "event_user_id",
  "unescape(event_user_text_historical_escaped) AS event_user_text_historical",
  "unescape(event_user_text_escaped) AS event_user_text",
  "to_array(event_user_blocks_historical_string) AS event_user_blocks_historical",
  "to_array(event_user_blocks_string) AS event_user_blocks",
  "to_array(event_user_groups_historical_string) AS event_user_groups_historical",
  "to_array(event_user_groups_string) AS event_user_groups",
  "to_array(event_user_is_bot_by_historical_string) AS event_user_is_bot_by_historical",
  "to_array(event_user_is_bot_by_string) AS event_user_is_bot_by",
  "event_user_is_created_by_self",
  "event_user_is_created_by_system",
  "event_user_is_created_by_peer",
  "event_user_is_anonymous",
  "event_user_registration_timestamp",
  "event_user_creation_timestamp",
  "event_user_first_edit_timestamp",
  "event_user_revision_count",
  "event_user_seconds_since_previous_revision",
  # Page fields
  "page_id",
  "unescape(page_title_historical_escaped) AS page_title_historical",
  "unescape(page_title_escaped) AS page_title",
  "page_namespace_historical",
  "page_namespace_is_content_historical",
  "page_namespace",
  "page_namespace_is_content",
  "page_is_redirect",
  "page_is_deleted",
  "page_creation_timestamp",
  "page_first_edit_timestamp",
  "page_revision_count",
  "page_seconds_since_previous_revision",
  # User fields
  "user_id",
  "unescape(user_text_historical_escaped) AS user_text_historical",
  "unescape(user_text_escaped) AS user_text",
  "to_array(user_blocks_historical_string) AS user_blocks_historical",
  "to_array(user_blocks_string) AS user_blocks",
  "to_array(user_groups_historical_string) AS user_groups_historical",
  "to_array(user_groups_string) AS user_groups",
  "to_array(user_is_bot_by_historical_string) AS user_is_bot_by_historical",
  "to_array(user_is_bot_by_string) AS user_is_bot_by",
  "user_is_created_by_self",
  "user_is_created_by_system",
  "user_is_created_by_peer",
  "user_is_anonymous",
  "user_registration_timestamp",
  "user_creation_timestamp",
  "user_first_edit_timestamp",
  # Revision fields
  "revision_id",
  "revision_parent_id",
  "revision_minor_edit",
  "to_array(revision_deleted_parts_string) AS revision_deleted_parts",
  "revision_deleted_parts_are_suppressed",
  "revision_text_bytes",
  "revision_text_bytes_diff",
  "revision_text_sha1",
  "revision_content_model",
  "revision_content_format",
  "revision_is_deleted_by_page_deletion",
  "revision_deleted_by_page_deletion_timestamp",
  "revision_is_identity_reverted",
  "revision_first_identity_reverting_revision_id",
  "revision_seconds_to_identity_revert",
  "revision_is_identity_revert",
  "revision_is_from_before_page_creation",
  "to_array(revision_tags_string) AS revision_tags"
)

# Have fun with the data :)
# Example: the top 10 wikis by number of revisions created in December 2019.
# SUBSTR positions are 1-based in Spark SQL, so SUBSTR(event_timestamp, 1, 7)
# keeps the first 7 characters. This assumes event_timestamp starts with
# "YYYY-MM", as the sample output below suggests.
(
    mediawiki_history
    .where("event_entity = 'revision' and event_type = 'create'")
    .selectExpr("wiki_db", "SUBSTR(event_timestamp, 1, 7) as month")
    .where("month = '2019-12'")
    .groupBy("wiki_db", "month")
    .agg(count(lit(1)).alias("revision_count"))
    .sort(desc("revision_count"))
    .show(10, False)  # print 10 rows without truncating column values
)
  
# +--------------+-------+--------------+                                         
# |wiki_db       |month  |revision_count|
# +--------------+-------+--------------+
# |wikidatawiki  |2019-12|21511762      |
# |commonswiki   |2019-12|6046688       |
# |enwiki        |2019-12|4756250       |
# |arwiki        |2019-12|1599840       |
# |frwiki        |2019-12|903838        |
# |dewiki        |2019-12|795638        |
# |eswiki        |2019-12|710516        |
# |viwiki        |2019-12|679525        |
# |ruwiki        |2019-12|652051        |
# |itwiki        |2019-12|563592        |
# +--------------+-------+--------------+