User Details
- User Since
- Feb 11 2015, 6:02 PM (505 w, 6 d)
- Availability
- Available
- IRC Nick
- joal
- LDAP User
- Unknown
- MediaWiki User
- JAllemandou (WMF)
Thu, Oct 17
While talking with @Ottomata, I realized that we can relatively easily monitor late-arriving events in Gobblin when writing _IMPORTED flags: if there already is a flag in the folder to be flagged, report late events instead of overwriting the flag.
As Gobblin has Prometheus metrics integration, it'll be possible to report those using Prometheus :)
This feels like a nice and easy approach.
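To make the idea concrete, here is a minimal sketch (not the actual Gobblin publisher code, which is Java; fs and metrics are placeholder helpers):

def write_imported_flag(fs, hour_dir, metrics):
    # Sketch only: if the hour folder is already flagged, the files just written
    # are late-arriving events - report them instead of silently re-flagging.
    flag_path = hour_dir + "/_IMPORTED"
    if fs.exists(flag_path):
        metrics.counter("late_arriving_flagged_hours").inc()
    else:
        fs.touch(flag_path)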
After a talk with @Antoine_Quhen and @Ottomata, we've decided to simplify the Gobblin algorithm: we'll use what would have been the low-volume algorithm for all topics instead of just the low-volume ones.
This allows us to simplify the code and to cover cases such as volume-changing topics and manually partitioned topics, which would otherwise have been corner cases.
The downside of the chosen solution is that it puts us more at risk of late events (I'm not explaining it here, it's too complicated - please ask if you wish me to explain :).
We had planned to monitor and alert on late events anyhow, so we think it's worth going in this direction.
News on this front: after talking with @Antoine_Quhen and @Ottomata, the preferred solution is to have this flag passed by our Airflow SparkSQL operator instead of being hard-coded in the Spark config.
The difference is for users: if we put it in the global spark-conf, users can end up with corrupted (partial) data in the result folder they're writing to, and we prefer to avoid this.
It's less of a problem for production jobs, as failed Spark jobs will be automatically retried or will raise an error.
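For illustration, the per-job approach amounts to setting the committer algorithm on the job's own Spark conf, which is what the operator would pass along (a minimal sketch; the app name is hypothetical and the actual operator plumbing is not shown):

from pyspark.sql import SparkSession

# Sketch: the committer algorithm is set on this job's conf only, so ad-hoc
# user jobs keep the default, safer behaviour.
spark = (
    SparkSession.builder
    .appName("backfill_webrequest_hour")  # hypothetical job name
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    .getOrCreate()
)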
Tue, Oct 15
@JAllemandou - Sorry I just saw this ping. We can definitely look into this. Shall we create a new ticket?
Mon, Oct 14
Thanks a lot for fixing the data-deletion checksum @xcollazo !
Fri, Oct 11
About the RPC storm: I have found some documentation about new settings we should use for our HDFS namenode: https://community.cloudera.com/t5/Community-Articles/Scaling-the-HDFS-NameNode-part-3-RPC-scalability-features/ta-p/246719
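For reference, the features described there include a dedicated service RPC port and the FairCallQueue; here is a sketch of the corresponding properties (port numbers are placeholders, and values would need to be validated for our cluster):

# core-site.xml, assuming the NameNode client RPC port is 8020
ipc.8020.callqueue.impl=org.apache.hadoop.ipc.FairCallQueue
ipc.8020.scheduler.impl=org.apache.hadoop.ipc.DecayRpcScheduler
ipc.8020.backoff.enable=true
# hdfs-site.xml - dedicated service RPC port so DataNode/ZKFC calls don't compete with client calls
dfs.namenode.servicerpc-address=<namenode-host>:8021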
This morning I started backfilling hours with missing data and monitored the jobs.
Since the jobs were running in parallel, they experienced the "temporary folder deletion" issue.
I had 2 jobs running in parallel, and the one that finished later had failed tasks. However, both result folders have 257 files.
This means our solution of setting spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 for the jobs works.
Thank you for the great analysis and explanation @Hghani.
In the discussion above I made a mistake: I stated that the jobs fail, while they don't. They generate corrupted data, meaning only a portion of the entire dataset (no data mixed between jobs).
In this wikitech doc the problem is explained, and it says that spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 should be used if the job fails during backfill, but actually we should have set the parameter by default on every job.
Thu, Oct 10
No such pattern on raw data:
hdfs dfs -du -s -h /wmf/data/raw/webrequest/webrequest_text/year=2024/month=10/day=09/*
The problem is visible on HDFS:
hdfs dfs -du -s -h /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/*
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
47.6 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=0
48.4 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=1
61.7 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=10
68.7 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=11
72.6 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=12
75.7 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=13
75.0 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=14
74.3 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=15
72.1 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=16
34.6 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=17 <------
73.2 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=18
73.2 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=19
48.5 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=2
68.8 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=20
58.9 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=21
50.4 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=22
11.2 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=23 <------
48.6 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=3
48.1 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=4
48.9 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=5
29.0 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=6 <------
56.6 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=7
57.9 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=8
59.7 G  /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=9
My assumption is that the Spark job has issues writing some files (we expect the job to write 256 data files plus one _SUCCESS file):
for h in $(seq 0 23); do hdfs dfs -count /wmf/data/wmf/webrequest/webrequest_source=text/year=2024/month=10/day=9/hour=${h}; done
I think this task can be closed, as the investigation is done. We're now in the "problem solving" state :)
Should I set this value to true so that we delete all unused segments, or should we reduce the durationToRetain value to something lower than 90D?
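For context, assuming the setting in question is the Druid coordinator's segment-kill configuration (my assumption, to be confirmed), the two knobs would look roughly like this in the coordinator runtime properties:

# Sketch only - hypothetical values
druid.coordinator.kill.on=true
druid.coordinator.kill.durationToRetain=P90D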
Tue, Oct 8
Ah! I had forgotten :) But this stream is not ingested into Hadoop.
Anyhow, if you agree with the approach, I can implement it.
Our folder-flagging algorithm works at the topic-partition level for high-volume topics, and at the topic level for low-volume topics. It doesn't really make sense to mix them.
My suggested solution would be to discard low-volume topic-partitions from high-volume topics. I think this would cover the issue.
I'm also taking this as an opportunity to refactor tests for this algorithm.
I've decided to implement solution number 2, updating the Gobblin folder-flagging algorithm. This keeps the solution internal to Gobblin, instead of delegating to Airflow the responsibility of deciding whether a folder is done or not.
Wed, Oct 2
Tue, Oct 1
We can definitely test the setting on the test cluster.
I think there is little benefit in having the same parameter for the druid-public cluster: there is no reindexing there, so there are no unused segments to delete (we delete entire datasources in that cluster).
Mon, Sep 30
Thu, Sep 26
Exploration on specific datasets:
- mediawiki_page_content_change_v1 --> No timestamp-late events, only Gobblin/Kafka hiccups.
- resource_purge --> Uses meta.dt, and there are late timestamps. We don't refine this table (see here), but not because of late events. I wonder if changing the behavior of EventGate to overwrite meta.dt would change anything for other downstream consumers.
- mediawiki_revision-create --> Regular small number of late events (fewer than 10 per hour), with one spike at ~5k per hour for 4 hours on 2024-08-30 at 8am UTC. I think this is due to the fact that meta.dt is set to the revision timestamp, and some revision changes can occur for past revisions.
- mediawiki_recentchange --> Regular number of late events (usually fewer than 10, with some bumps). I don't really know why this happens, probably for the same reason as revision-create.
Tue, Sep 24
After explaining my findings to the team yesterday, here are the next steps: I'm going to have a look at late events in a few streams to try to understand why we have them, and at the same time we are going to change the event-publishing data contract: event-publishing libraries should set the meta.dt field and use it as the Kafka timestamp field, overriding the field value if it is already set by the producer.
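As a rough sketch of what this contract means on the producing side (illustrative Python using a generic Kafka client, not the actual event-platform libraries; produce_event is a hypothetical helper):

import json
from datetime import datetime, timezone

from kafka import KafkaProducer  # illustrative client only

def produce_event(producer: KafkaProducer, topic: str, event: dict) -> None:
    # The library sets meta.dt itself (overriding any value set by the caller)
    # and uses the very same instant as the Kafka message timestamp.
    now = datetime.now(timezone.utc)
    event.setdefault("meta", {})["dt"] = now.isoformat()
    producer.send(
        topic,
        value=json.dumps(event).encode("utf-8"),
        timestamp_ms=int(now.timestamp() * 1000),
    )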
Sep 20 2024
Second round of analysis: https://phabricator.wikimedia.org/P69381
Sep 18 2024
Ping @lbowmaker , we should prioritize this if we can :)
Sep 17 2024
Great finding @xcollazo :)
Sep 16 2024
Patch sent (see above). One wiki was already added to the list (btmwiki), and another was not yet present in the databases (cywikivoyage).
I ran an analysis over the /wmf/data/raw/event folder (event-platform events not yet refined) on Friday 13th of September.
The code and walk-through results can be found in this phab paste.
Sep 13 2024
The reason we originally started using different users/groups was to silo permissions.
I think it's still good to have different write and read abilities: only the search user can write to these datasets, but we want all private-data users to be able to read them.
We had set the name with date information on purpose, to make it easier to identify tasks in YARN (there is a bug: hourly tasks are named with daily data, and this should be corrected).
I understand it would be better for lineage to name them without the moving date part. But looking at the DataHub Spark doc, I found that we can set the flow name explicitly using spark.datahub.flow_name.
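For illustration, something along these lines could keep the dated YARN application name while giving DataHub a stable flow name (a sketch; the names are hypothetical and the exact property usage should be checked against the DataHub Spark lineage doc):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("webrequest_load_2024-10-09_17")               # dated name, easy to spot in YARN
    .config("spark.datahub.flow_name", "webrequest_load")   # stable name, keeps lineage grouped
    .getOrCreate()
)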
Sep 12 2024
Jun 20 2024
Jun 17 2024
Here we are! We have a new subdivision_code map key in the geocoded_data field of our wmf.webrequest table :)
Jun 12 2024
Jun 11 2024
Jun 7 2024
Hi @KCVelaga_WMF, I have quickly looked at the patch you sent and I have some questions:
- Does the table exist in all wikis? If not, there will be failures when trying to sqoop it. If you have a precise list of wikis for which you wish the job to run, we'll need a configuration file similar to https://github.com/wikimedia/analytics-refinery/blob/master/static_data/mediawiki/grouped_wikis/grouped_wikis.csv so that the job processes only the wikis you're interested in.
- You stated in the review that you wish the job to run hourly. We currently don't have sqoop jobs running hourly; our minimal frequency is daily, and that happens only for either a single wiki (wikilambda) or small tables (discussiontools_subscription). Explanation: when downloading tables, our sqoop-management script generates one job per wiki and table it needs to import, so potentially many jobs. Would a daily import be enough for you?
- Finally, for the import to happen, you'll need another patch in addition to the one you wrote here, to configure a timer running the sqoop-management script. Those timers are set up by Puppet here: https://github.com/wikimedia/operations-puppet/blob/ee29ef012aa771b1e49863c7318a6daa7110cfcc/modules/profile/manifests/analytics/refinery/job/sqoop_mediawiki.pp.
The Spark job finished, and we have data (from Superset):
SELECT revision_id, revision_timestamp FROM wmf.mediawiki_wikitext_history WHERE snapshot = '2024-04' AND wiki_db = 'wikidatawiki' LIMIT 10
Hi Folks, I've been late in delivering this but it's landing as I write.
The Spark job transforming wikidata-xml-history for snapshot 2024-04 is currently running. I expect it to finish either today or tomorrow.
The Spark job is scheduled by a separate Airflow DAG and only computes wikidata-xml-history, while the other DAG keeps running without Wikidata so that data for the other dumps is available sooner.
This month has seen errors in the dumps release process, so it's bad timing for tests, but hopefully things will settle and we'll have a proper run next month.
Jun 6 2024
Jun 5 2024
Jun 3 2024
Task description updated with latest stack trace.
May 30 2024
May 28 2024
May 27 2024
May 22 2024
Hi @Audiodude, thank you for reporting your issue.
May 21 2024
Super interesting that Cassandra can now do a delete via range! This could help us apply a deletion policy if we ever decide to use one!
May 15 2024
Quick note on what the traffic this task refers to is.
Port 50010 on an-worker nodes is the Hadoop HDFS DataNode data-transfer port.
This is the port serving the HDFS data handled by that worker. Most of the traffic on this port is expected to come either from the same or other an-worker nodes, or from an-presto nodes (both are compute nodes reading HDFS data).
The amount of data read varies a lot depending on the jobs. One can see that the amount of data read from HDFS is very spiky: https://grafana.wikimedia.org/d/000000585/hadoop?orgId=1&var-hadoop_cluster=analytics-hadoop&from=now-7d&to=now&viewPanel=111
May 13 2024
Hi @XiaoXiao-WMF ,
the impact of this issue is that the Hive table wmf.mediawiki_wikitext_history currently does not contain the wikidatawiki project's data.
I sent a Slack message a while back asking if anyone was using this data, but made the mistake of only calling out Fabian, from the Research team.
I estimate this issue will take between one and two weeks to solve once someone starts working on it.
May 7 2024
I'm sorry, what Luke described before can't be done; I messed up when looking at the Wikidata dumps generation: I looked at a subset of files that were generated on the 8th, but forgot the rest of the files /facepalm/.
This means we need to build a parallel pipeline for the Wikidata dump import, as planned originally.
Sorry for the false hope :S
May 6 2024
No objection :) I'd have gone for option 1 as it seems the easiest to maintain, but I agree, it means installing some stuff on the Blazegraph machines.
I would suggest using the hdfs-rsync tool to do this - it requires some setting up with Puppet, but it is helpful, as it copies only new content from folders (see https://github.com/wikimedia/operations-puppet/blob/1c4d67ff19372832484f7551dc49836be5806024/modules/hdfs_tools/manifests/hdfs_rsync_job.pp and https://github.com/wikimedia/operations-puppet/blob/1c4d67ff19372832484f7551dc49836be5806024/modules/dumps/manifests/web/fetches/stats.pp).
Apr 28 2024
Apr 26 2024
Reportupdater jobs have all been either deprecated or migrated to Airflow!
The report-updater jobs have been stopped, and the data synchronization has been updated to read from the Hadoop folders (updated by Airflow jobs) instead of the report-updater folders.
We can call report-updater deprecated for real, even if we still need to do some code cleanup.
Also, if anything goes wrong with the new system, we still have the data generated by reportupdater stored on HDFS, so we can revert to the old system.
This is a great step toward not using any scheduler other than Airflow - lots of kudos to @amastilovic for migrating the jobs, and to @BTullis for finalizing the operations to deprecate the tool.
Apr 24 2024
Apr 23 2024
I'm not sure if the spark-cassandra-connector can read a Java truststore on HDFS! I'd go for an automated deployment of the truststore on every cluster host. For the moment it'll be enough, as our prod jobs are launched from the cluster (Skein). It would probably also be good to have the truststore deployed on stat machines, to allow for manual runs. This should be enough for now, until we move launchers away from Skein to use k8s - we'll revisit at that time (ping @BTullis :)
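For illustration, pointing the connector at a locally deployed truststore would look roughly like this (a sketch; the path is hypothetical, and the spark.cassandra.connection.ssl.* properties should be double-checked against the connector version we run):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.cassandra.connection.ssl.enabled", "true")
    # Local path present on every cluster/stat host, not an HDFS path:
    .config("spark.cassandra.connection.ssl.trustStore.path", "/etc/ssl/localcerts/cassandra-truststore.jks")
    .config("spark.cassandra.connection.ssl.trustStore.password", "<truststore-password>")
    .getOrCreate()
)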
Apr 18 2024
Global execution time has been divided by 3 (10 minutes for 170 jobs). We are using a new launchers queue to launch small jobs and have scaled the Airflow parallelization to 10 tasks. We can replicate this model for other jobs :)