Vorlage "Cloud Storage Text f�r BigQuery (Stream) mit Python-UDF"

Die Pipeline "Cloud Storage Text f�r BigQuery" ist eine Streamingpipeline, die in Cloud Storage gespeicherte Textdateien streamt, diese mit einer von Ihnen bereitgestellten benutzerdefinierten Python-Funktion (User-Defined Function, UDF) transformiert und das Ergebnis an BigQuery anh�ngt.

Die Pipeline wird auf unbestimmte Zeit ausgef�hrt und muss manuell �ber eine Cancel-Anweisung und kein Drain beendet werden, aufgrund ihrer Verwendung der Watch Transformation, die eine splittable DoFn ist, die den Draining nicht unterst�tzt.

Pipelineanforderungen

  • Erstellen Sie eine JSON-Datei, die das Schema Ihrer Ausgabetabelle in BigQuery beschreibt.

    Stellen Sie ein JSON-Array der obersten Ebene mit dem Namen fields bereit, dessen Inhalt dem Muster {"name": "COLUMN_NAME", "type": "DATA_TYPE"} folgt. Beispiel:

    {
      "fields": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        }
      ]
    }
    
  • Erstellen Sie eine Python-Datei (.py) mit Ihrer UDF, die die Logik f�r die Transformation der Textzeilen bereitstellt. Ihre Funktion muss einen JSON-String zur�ckgeben.

    Im folgenden Beispiel wird jede Zeile einer CSV-Datei aufgeteilt, ein JSON-Objekt mit den Werten erstellt und ein JSON-String zur�ckgegeben:

    import json
    def process(value):
      data = value.split(',')
      obj = { 'name': data[0], 'age': int(data[1]) }
      return json.dumps(obj)
    

Vorlagenparameter

Parameter Beschreibung
pythonExternalTextTransformGcsPath Der Cloud Storage-URI der Python-Codedatei, in der die benutzerdefinierte Funktion (UDF) definiert wird, die Sie verwenden m�chten. Beispiel: gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName Der Name der benutzerdefinierten Python-Funktion (UDF), die Sie verwenden m�chten.
JSONPath Der Cloud Storage-Speicherort Ihrer BigQuery-Schemadatei im JSON-Format. Beispiel: gs://path/to/my/schema.json.
outputTable Die vollst�ndig qualifizierte BigQuery-Tabelle. Beispiel: my-project:dataset.table.
inputFilePattern Der Cloud Storage-Speicherort des Textes, den Sie verarbeiten m�chten. Beispiel: gs://my-bucket/my-files/text.txt
bigQueryLoadingTemporaryDirectory Das tempor�re Verzeichnis f�r den BigQuery-Ladevorgang. Beispiel: gs://my-bucket/my-files/temp_dir
outputDeadletterTable Tabelle f�r Nachrichten, die die Ausgabetabelle nicht erreicht haben. Beispiel: my-project:dataset.my-unprocessed-table. Wenn sie nicht vorhanden ist, wird sie w�hrend der Pipelineausf�hrung erstellt. Wenn nicht angegeben, wird stattdessen <outputTableSpec>_error_records verwendet.

Benutzerdefinierte Funktion

Diese Vorlage erfordert eine UDF, die die Eingabedateien parst, wie unter Pipelineanforderungen beschrieben. Die Vorlage ruft die UDF f�r jede Textzeile in jeder Eingabedatei auf. Weitere Informationen zum Erstellen von benutzerdefinierten Funktionen finden Sie unter Benutzerdefinierte Funktionen f�r Dataflow-Vorlagen erstellen.

Funktionsspezifikation

UDFs haben die folgende Spezifikation:

  • Eingabe: eine einzelne Textzeile aus einer Eingabedatei
  • Ausgabe: Ein JSON-String, der mit dem Schema der BigQuery-Zieltabelle �bereinstimmt.

Führen Sie die Vorlage aus.

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Cloud Storage Text to BigQuery (Stream) with Python UDF templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Xlang \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\
pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

Ersetzen Sie Folgendes:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden m�chten

    Sie k�nnen die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten �bergeordneten Ordner im Bucket verf�gbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten �bergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • STAGING_LOCATION: der Speicherort f�r das Staging lokaler Dateien (z.�B. gs://your-bucket/staging)
  • PYTHON_FUNCTION: der Name der benutzerdefinierten Python-Funktion (UDF), die Sie verwenden m�chten.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: der Cloud Storage-Pfad zur JSON-Datei, die die Schemadefinition enth�lt
  • PATH_TO_PYTHON_UDF_FILE: Der Cloud Storage-URI der Python-Codedatei, in der die benutzerdefinierte Funktion (UDF) definiert wird, die Sie verwenden m�chten. Beispiel: gs://my-bucket/my-udfs/my_file.py.
  • PATH_TO_TEXT_DATA: der Cloud Storage-Pfad zu Ihrem Text-Dataset
  • BIGQUERY_TABLE: Ihr BigQuery-Tabellenname
  • BIGQUERY_UNPROCESSED_TABLE: der Name Ihrer BigQuery-Tabelle f�r nicht verarbeitete Nachrichten
  • PATH_TO_TEMP_DIR_ON_GCS: der Cloud Storage-Pfad zum tempor�ren Verzeichnis

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuf�hren. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
       "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Xlang",
   }
}

Ersetzen Sie Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausf�hren m�chten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen m�chten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden m�chten

    Sie k�nnen die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten �bergeordneten Ordner im Bucket verf�gbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten �bergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • STAGING_LOCATION: der Speicherort f�r das Staging lokaler Dateien (z.�B. gs://your-bucket/staging)
  • PYTHON_FUNCTION: der Name der benutzerdefinierten Python-Funktion (UDF), die Sie verwenden m�chten.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: der Cloud Storage-Pfad zur JSON-Datei, die die Schemadefinition enth�lt
  • PATH_TO_PYTHON_UDF_FILE: Der Cloud Storage-URI der Python-Codedatei, in der die benutzerdefinierte Funktion (UDF) definiert wird, die Sie verwenden m�chten. Beispiel: gs://my-bucket/my-udfs/my_file.py.
  • PATH_TO_TEXT_DATA: der Cloud Storage-Pfad zu Ihrem Text-Dataset
  • BIGQUERY_TABLE: Ihr BigQuery-Tabellenname
  • BIGQUERY_UNPROCESSED_TABLE: der Name Ihrer BigQuery-Tabelle f�r nicht verarbeitete Nachrichten
  • PATH_TO_TEMP_DIR_ON_GCS: der Cloud Storage-Pfad zum tempor�ren Verzeichnis

N�chste Schritte