Przetwarzanie danych w GCP za pomocą Apache Airflow i BigQuery

W dzisiejszym świecie opartym na danych, wydajne przetwarzanie danych ma kluczowe znaczenie dla organizacji poszukujących wglądu i podejmowania świadomych decyzji. Google Cloud Platform (GCP) oferuje potężne narzędzia, takie jak Apache Airflow i BigQuery, usprawniające procesy przetwarzania danych. W tym przewodniku zbadamy, jak wykorzystać te narzędzia do tworzenia solidnych i skalowalnych potoków danych.

Konfiguracja Apache Airflow na Google Cloud Platform

Apache Airflow, platforma open-source, orkiestruje skomplikowane przepływy pracy. Pozwala ona programistom definiować, planować i monitorować przepływy pracy przy użyciu skierowanych grafów acyklicznych (DAG), zapewniając elastyczność i skalowalność zadań przetwarzania danych. Konfiguracja Airflow na GCP jest prosta przy użyciu usług zarządzanych, takich jak Cloud Composer. Aby rozpocząć, proszę wykonać poniższe kroki:

  1. Proszę utworzyć środowisko Google Cloud Composer: Proszę przejść do sekcji Cloud Composer w konsoli GCP i utworzyć nowe środowisko. Proszę wybrać żądane opcje konfiguracji, takie jak liczba węzłów i typ maszyny.
  2. Proszę zainstalować dodatkowe pakiety Python: Airflow obsługuje niestandardowe pakiety Python w celu rozszerzenia jego funkcjonalności. Mogą Państwo zainstalować dodatkowe pakiety używając requirements.txt lub instalując je bezpośrednio z poziomu interfejsu internetowego Airflow.
  3. Konfiguracja połączeń: Airflow wykorzystuje obiekty połączeń do łączenia się z systemami zewnętrznymi, takimi jak BigQuery. Proszę skonfigurować niezbędne połączenia w interfejsie internetowym Airflow, podając dane uwierzytelniające i szczegóły połączenia.

Projektowanie potoków danych za pomocą Apache Airflow

Po skonfigurowaniu Airflow można projektować potoki danych przy użyciu Directed Acyclic Graphs (DAGs). DAG reprezentuje przepływ pracy składający się z zadań, gdzie każde zadanie wykonuje określoną operację przetwarzania danych. Oto jak zaprojektować potoki danych za pomocą Airflow:

  1. Definiowanie DAG: Tworzenie skryptów Python do definiowania DAG w Airflow. Każdy skrypt DAG powinien importować niezbędne moduły i definiować zadania przy użyciu operatorów dostarczanych przez Airflow, takich jak BigQueryOperator do interakcji z BigQuery.
TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)”,
destination_dataset_table=”your_project.your_dataset.output_table2″,
write_disposition=’WRITE_APPEND’,
dag=dag
)

# Definiowanie zależności zadań
start_task >> bq_query_task1 >> bq_query_task2 >> end_task” data-lang=”text/x-python”>

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToGCSOperator
from datetime import datetime

# Define the default arguments for the DAG

default_args = {
     'owner': 'airflow',
     'depends_on_past': False,
     'start_date': datetime(2024, 3, 3),
     'email_on_failure': False,
     'email_on_retry': False,
     'retries': 1
}

# Instantiate the DAG object
dag = DAG(
     'bigquery_data_pipeline',
     default_args=default_args,
     description='A DAG for data pipeline with BigQuery tasks',
     schedule_interval="@daily"
)

# Define tasks
start_task = DummyOperator(task_id='start_task', dag=dag)
end_task = DummyOperator(task_id='end_task', dag=dag)


# Define BigQuery tasks
bq_query_task1 = BigQueryOperator(
     task_id='bq_query_task1',
     sql="SELECT * FROM your_table",
     destination_dataset_table="your_project.your_dataset.output_table1",
     write_disposition='WRITE_TRUNCATE',
     dag=dag
)

 
bq_query_task2 = BigQueryOperator(
     task_id='bq_query_task2',
     sql="SELECT * FROM your_table WHERE date > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)",
     destination_dataset_table="your_project.your_dataset.output_table2",
     write_disposition='WRITE_APPEND',
     dag=dag
)

# Define task dependencies
start_task >> bq_query_task1 >> bq_query_task2 >> end_task

W tym przykładzie:

  • Definiujemy DAG o nazwie bigquery_data_pipeline z dziennym interwałem harmonogramu przy użyciu schedule_interval parametr ustawiony na '@daily'.
  • Dwa fikcyjne zadania (start_task i end_task) są zdefiniowane przy użyciu DummyOperator. Zadania te służą jako symbole zastępcze i nie są powiązane z żadnym faktycznym przetwarzaniem.
  • Dwa zadania BigQuery (bq_query_task1 oraz bq_query_task2) są zdefiniowane przy użyciu BigQueryOperator. Zadania te wykonują zapytania SQL w BigQuery i przechowują wyniki w tabelach docelowych.
  • Każde BigQueryOperator określa zapytanie SQL, które ma zostać wykonane (SQL parametr), docelowy zbiór danych i tabela (destination_dataset_table parametr) oraz dyspozycja zapisu (write_disposition parametr).
  • Zależności zadań są zdefiniowane w taki sposób, że bq_query_task1 musi zostać uruchomione przed bq_query_task2i oba bq_query_task1 oraz bq_query_task2 musi działać pomiędzy start_task i end_task.

Definiując DAG w ten sposób, można tworzyć solidne potoki danych w Apache Airflow, które współdziałają z BigQuery w celu przetwarzania i analizy danych. Proszę dostosować zapytania SQL i tabele docelowe w zależności od potrzeb, aby dopasować je do konkretnego przypadku użycia.

  1. Konfiguracja zależności zadań: Proszę określić zależności zadań w DAG, aby zapewnić właściwą kolejność wykonywania. Airflow umożliwia definiowanie zależności przy użyciu funkcji set_upstream i set_downstream metody.
# Define tasks
task1 = DummyOperator(task_id='task1', dag=dag)
task2 = DummyOperator(task_id='task2', dag=dag)
task3 = DummyOperator(task_id='task3', dag=dag)
task4 = DummyOperator(task_id='task4', dag=dag)


# Set task dependencies
task1.set_downstream(task2)
task1.set_downstream(task3)
task2.set_downstream(task4)
task3.set_downstream(task4)

W tym przykładzie:

  • Tworzymy DAG o nazwie sample_dag z dziennym interwałem harmonogramu.
  • Cztery zadania (task1, task2, task3, task4) są zdefiniowane przy użyciu DummyOperator, która reprezentuje zadania zastępcze.
  • Zależności zadań są konfigurowane za pomocą set_downstream . W tym przypadku, task2 oraz task3 są poniżej task1oraz task4 znajduje się poniżej obu task2 i task3.

Ta konfiguracja zapewnia, że task1 zostanie wykonana jako pierwsza, a następnie task2 lub task3 (ponieważ są zrównoleglone), i wreszcie task4 zostanie wykonany po obu task2 i task3 zostały zakończone.

  1. Ustawianie harmonogramów zadań: Konfigurowanie harmonogramów zadań w DAG, aby kontrolować, kiedy powinny być wykonywane. Airflow obsługuje różne opcje planowania, w tym wyrażenia cron i harmonogramy interwałowe.
# Set task schedules
task1_execution_time = datetime(2024, 3, 3, 10, 0, 0) # Task 1 scheduled to run at 10:00 AM
task2_execution_time = task1_execution_time + timedelta(hours=1) # Task 2 scheduled to run 1 hour after Task 1
task3_execution_time = task1_execution_time + timedelta(hours=2) # Task 3 scheduled to run 2 hours after Task 1

task1.execution_date = task1_execution_time
task2.execution_date = task2_execution_time
task3.execution_date = task3_execution_time

# Define task dependencies
task1.set_downstream(task2)
task2.set_downstream(task3)

W tym przykładzie:

  • Tworzymy DAG o nazwie sample_scheduled_dag z dziennym interwałem harmonogramu przy użyciu schedule_interval parametr ustawiony na '@daily' w konfiguracji zadań Zależności.
  • Harmonogramy zadań są konfigurowane poprzez określenie parametru execution_date dla każdego zadania. task1 ma zostać uruchomione o godzinie 10:00, task2 jest zaplanowane na 1 godzinę po task1oraz task3 jest zaplanowany do uruchomienia 2 godziny po task1.
  • Zależności zadań są ustawione w taki sposób, że task2 znajduje się poniżej task1oraz task3 jest poniżej task2.

Konfigurując harmonogramy zadań w DAG, można kontrolować, kiedy każde zadanie powinno zostać wykonane, co pozwala na precyzyjną orkiestrację przepływów pracy przetwarzania danych w Apache Airflow.

Integracja z BigQuery w celu przetwarzania danych

BigQuery, oferowany przez Google Cloud, jest w pełni zarządzanym i bezserwerowym rozwiązaniem hurtowni danych. Oferuje wysokowydajne zapytania SQL i skalowalną pamięć masową do analizy dużych zbiorów danych. Oto jak zintegrować BigQuery z Apache Airflow w celu przetwarzania danych:

  1. Wykonywanie zapytań SQL: Korzystanie z BigQueryOperatormożna wykonywać zapytania SQL w BigQuery w ramach Apache Airflow DAGs, umożliwiając płynną integrację przepływów pracy przetwarzania danych z Google BigQuery. W razie potrzeby można dostosować zapytania SQL i tabele docelowe do konkretnych wymagań.
  2. Ładowanie i eksportowanie danych: Airflow umożliwia ładowanie danych do BigQuery z zewnętrznych źródeł lub eksportowanie danych z BigQuery do innych miejsc docelowych. Proszę użyć operatorów takich jak BigQueryToBigQueryOperator i BigQueryToGCSOperator dla operacji ładowania i eksportowania danych.
> bq_load_external_data_task > > bq_export_to_gcs_task > > end_task” data-lang=”text/x-python”>

# Define BigQuery tasks for loading data from external source
bq_load_external_data_task = BigQueryToBigQueryOperator(
task_id='bq_load_external_data',
source_project_dataset_table="external_project.external_dataset.external_table",
destination_project_dataset_table="your_project.your_dataset.internal_table",
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
dag=dag
)

# Define BigQuery tasks for exporting data to Google Cloud Storage (GCS)
bq_export_to_gcs_task = BigQueryToGCSOperator(
task_id='bq_export_to_gcs',
source_project_dataset_table="your_project.your_dataset.internal_table",
destination_cloud_storage_uris=['gs://your_bucket/your_file.csv'],
export_format="CSV",
dag=dag
)

# Define task dependencies
start_task >> bq_load_external_data_task >> bq_export_to_gcs_task >> end_task

    1. Monitorowanie i zarządzanie zadaniami: Airflow zapewnia wbudowane funkcje monitorowania i rejestrowania do zarządzania zadaniami BigQuery. Można monitorować statusy zadań, przeglądać dzienniki i obsługiwać awarie zadań za pomocą interfejsu internetowego Airflow lub narzędzi wiersza poleceń.

Oto jak można skutecznie monitorować i zarządzać zadaniami BigQuery w Airflow:

1. Interfejs sieciowy Airflow

  • Strona uruchomień DAG: Interfejs internetowy Airflow udostępnia stronę “DAG Runs”, na której można wyświetlić status każdego uruchomienia DAG. Obejmuje to informacje o tym, czy uruchomienie DAG powiodło się, nie powiodło się lub jest obecnie uruchomione.
  • Dzienniki instancji zadań: Można uzyskać dostęp do dzienników dla każdego wystąpienia zadania w ramach uruchomienia DAG. Dzienniki te dostarczają szczegółowych informacji na temat wykonywania zadań, w tym wszelkich napotkanych błędów lub wyjątków.
  • Widok wykresu: Widok wykresu w interfejsie użytkownika Airflow zapewnia wizualną reprezentację DAG i jego zależności zadań. Mogą Państwo użyć tego widoku, aby zrozumieć przepływ pracy i zidentyfikować wszelkie wąskie gardła lub problemy.

2. Interfejs wiersza poleceń (CLI)

  • airflow dags list: Proszę użyć airflow dags list aby wyświetlić listę wszystkich dostępnych DAG w Państwa środowisku Airflow. Polecenie to zapewnia podstawowe informacje o każdym DAG, w tym jego status i datę ostatniego wykonania.
  • airflow dags show: Polecenie airflow dags show umożliwia wyświetlenie szczegółowych informacji na temat określonego DAG, w tym jego zadań, zależności zadań i interwałów harmonogramu.
  • airflow tasks list: Proszę użyć polecenia airflow tasks list aby wyświetlić listę wszystkich zadań w ramach określonego DAG. To polecenie dostarcza informacji o każdym zadaniu, takich jak jego bieżący stan i data wykonania.
  • airflow task logs: Dostęp do dzienników zadań można uzyskać za pomocą polecenia airflow task logs . Polecenie to umożliwia przeglądanie dzienników dla określonego wystąpienia zadania, pomagając w rozwiązywaniu problemów z błędami lub awariami.

3. Rejestrowanie i alerty

  • Rejestrowanie przepływu powietrza: Airflow rejestruje wszystkie wykonania zadań i przebiegi DAG, ułatwiając śledzenie postępów zadań i identyfikację problemów. Można skonfigurować poziomy rejestrowania i programy obsługi, aby kontrolować szczegółowość i miejsce docelowe dzienników.
  • Alertowanie: Proszę skonfigurować alerty i powiadomienia, które będą wyzwalane na podstawie określonych zdarzeń, takich jak awarie zadań lub stany uruchomienia DAG. Można użyć narzędzi takich jak Slack, e-mail lub PagerDuty, aby otrzymywać alerty i podejmować odpowiednie działania.

4. Narzędzia do monitorowania

  • Monitorowanie Stackdriver: Jeśli korzystają Państwo z Airflow na Google Cloud Platform, mogą Państwo użyć Stackdriver Monitoring do monitorowania kondycji i wydajności środowiska Airflow. Obejmuje to metryki, takie jak użycie procesora, użycie pamięci i czasy wykonywania zadań.
  • Prometheus i Grafana: Integracja Airflow z Prometheus i Grafana w celu zaawansowanego monitorowania i wizualizacji wskaźników wydajności. Umożliwia to tworzenie niestandardowych pulpitów nawigacyjnych i uzyskiwanie wglądu w zachowanie zadań Airflow.

Wykorzystując te możliwości monitorowania i zarządzania zapewniane przez Apache Airflow, można skutecznie monitorować statusy zadań, przeglądać dzienniki i obsługiwać awarie zadań, zapewniając niezawodność i wydajność przepływów pracy z danymi, w tym tych obejmujących BigQuery.

Najlepsze praktyki w zakresie usprawniania przetwarzania danych

Aby zapewnić wydajne przetwarzanie danych na Google Cloud Platform, proszę rozważyć następujące najlepsze praktyki:

1. Optymalizacja wydajności zapytań

  • Proszę używać wydajnych zapytań SQL: Proszę tworzyć zapytania SQL, które efektywnie wykorzystują możliwości BigQuery. Optymalizacja złączeń, agregacji i warunków filtrowania w celu zminimalizowania skanowania danych i poprawy wydajności zapytań.
  • Wykorzystanie partycjonowania i klastrowania: Partycjonowanie tabel w oparciu o często filtrowane kolumny w celu zmniejszenia kosztów zapytań i poprawy ich wydajności. Wykorzystanie klastrowania do organizowania danych w ramach partycji w celu dalszej optymalizacji.
  • Wykorzystanie buforowania zapytań: Proszę skorzystać z mechanizmu buforowania BigQuery, aby uniknąć zbędnych obliczeń. Ponowne wykorzystanie buforowanych wyników dla identycznych zapytań w celu skrócenia czasu wykonywania zapytań i obniżenia kosztów.

2. Dynamiczne skalowanie zasobów

  • Automatyczne skalowanie: Proszę skonfigurować Airflow i powiązane zasoby do automatycznego skalowania w oparciu o wymagania obciążenia. Proszę skorzystać z usług zarządzanych, takich jak Cloud Composer na GCP, które mogą automatycznie skalować klastry Airflow w oparciu o liczbę aktywnych DAG i zadań.
  • Preemptible VMs: W przypadku zadań przetwarzania wsadowego, które mogą tolerować przerwy w działaniu, należy korzystać z wywłaszczalnych maszyn wirtualnych (instancji wywłaszczalnych). Preemptible VMs są opłacalne i mogą znacznie obniżyć koszty zasobów dla niekrytycznych obciążeń.

3. Wdrożenie obsługi błędów

  • Ponawianie zadań: Proszę skonfigurować zadania Airflow do automatycznego ponawiania po niepowodzeniu. Proszę stosować wykładnicze strategie backoff w celu stopniowego zwiększania interwałów ponawiania prób i uniknięcia przeciążenia usług niższego rzędu.
  • Mechanizmy obsługi błędów: Wdrożenie solidnych mechanizmów obsługi błędów w potokach danych w celu sprawnej obsługi błędów przejściowych, problemów sieciowych i przerw w świadczeniu usług. Wykorzystanie wbudowanych funkcji obsługi błędów Airflow, takich jak on_failure_callback, do wykonywania niestandardowej logiki obsługi błędów.
  • Monitorowanie alertów: Proszę skonfigurować alerty monitorowania i powiadomienia, aby proaktywnie wykrywać awarie potoku i reagować na nie. Proszę korzystać z usług monitorowania i alertów GCP, takich jak Cloud Monitoring i Stackdriver Logging, aby monitorować wykonywanie zadań Airflow i wyzwalać alerty w oparciu o predefiniowane warunki.

4. Monitorowanie i dostrajanie wydajności

  • Monitorowanie metryk wydajności: Monitorowanie wskaźników wydajności potoku, w tym czasu wykonywania zapytań, przepustowości przetwarzania danych i wykorzystania zasobów. Proszę korzystać z narzędzi monitorujących GCP, aby śledzić wskaźniki wydajności w czasie rzeczywistym i identyfikować wąskie gardła wydajności.
  • Dostrajanie konfiguracji: Proszę regularnie sprawdzać i dostosowywać konfiguracje potoków w oparciu o dane z monitorowania wydajności. Optymalizacja alokacji zasobów, dostosowanie ustawień równoległości i dostosowanie parametrów zapytań w celu poprawy ogólnej wydajności.
  • Planowanie wydajności: Wykonywanie ćwiczeń planowania wydajności w celu zapewnienia, że zasoby są dostarczane optymalnie, aby sprostać wymaganiom obciążenia. Skalowanie zasobów w górę lub w dół w zależności od potrzeb w oparciu o historyczne wzorce użytkowania i przewidywany wzrost.

Wnioski

Wykorzystując Apache Airflow i BigQuery na Google Cloud Platform, deweloperzy mogą usprawnić procesy przetwarzania danych i budować skalowalne potoki danych do analizy i podejmowania decyzji. Aby zaprojektować wydajne potoki danych, zintegrować się z BigQuery i wdrożyć najlepsze praktyki w celu optymalizacji wydajności i niezawodności, proszę postępować zgodnie z wytycznymi przedstawionymi w tym przewodniku dla programistów. Dzięki odpowiednim narzędziom i praktykom organizacje mogą uwolnić pełny potencjał swoich zasobów danych i odnieść sukces biznesowy w chmurze.