W dzisiejszym świecie opartym na danych, wydajne przetwarzanie danych ma kluczowe znaczenie dla organizacji poszukujących wglądu i podejmowania świadomych decyzji. Google Cloud Platform (GCP) oferuje potężne narzędzia, takie jak Apache Airflow i BigQuery, usprawniające procesy przetwarzania danych. W tym przewodniku zbadamy, jak wykorzystać te narzędzia do tworzenia solidnych i skalowalnych potoków danych.
Konfiguracja Apache Airflow na Google Cloud Platform
Apache Airflow, platforma open-source, orkiestruje skomplikowane przepływy pracy. Pozwala ona programistom definiować, planować i monitorować przepływy pracy przy użyciu skierowanych grafów acyklicznych (DAG), zapewniając elastyczność i skalowalność zadań przetwarzania danych. Konfiguracja Airflow na GCP jest prosta przy użyciu usług zarządzanych, takich jak Cloud Composer. Aby rozpocząć, proszę wykonać poniższe kroki:
- Proszę utworzyć środowisko Google Cloud Composer: Proszę przejść do sekcji Cloud Composer w konsoli GCP i utworzyć nowe środowisko. Proszę wybrać żądane opcje konfiguracji, takie jak liczba węzłów i typ maszyny.
- Proszę zainstalować dodatkowe pakiety Python: Airflow obsługuje niestandardowe pakiety Python w celu rozszerzenia jego funkcjonalności. Mogą Państwo zainstalować dodatkowe pakiety używając requirements.txt lub instalując je bezpośrednio z poziomu interfejsu internetowego Airflow.
- Konfiguracja połączeń: Airflow wykorzystuje obiekty połączeń do łączenia się z systemami zewnętrznymi, takimi jak BigQuery. Proszę skonfigurować niezbędne połączenia w interfejsie internetowym Airflow, podając dane uwierzytelniające i szczegóły połączenia.
Projektowanie potoków danych za pomocą Apache Airflow
Po skonfigurowaniu Airflow można projektować potoki danych przy użyciu Directed Acyclic Graphs (DAGs). DAG reprezentuje przepływ pracy składający się z zadań, gdzie każde zadanie wykonuje określoną operację przetwarzania danych. Oto jak zaprojektować potoki danych za pomocą Airflow:
- Definiowanie DAG: Tworzenie skryptów Python do definiowania DAG w Airflow. Każdy skrypt DAG powinien importować niezbędne moduły i definiować zadania przy użyciu operatorów dostarczanych przez Airflow, takich jak BigQueryOperatordo interakcji z BigQuery.
destination_dataset_table=”your_project.your_dataset.output_table2″,
write_disposition=’WRITE_APPEND’,
dag=dag
)
# Definiowanie zależności zadań
start_task >> bq_query_task1 >> bq_query_task2 >> end_task” data-lang=”text/x-python”>
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToGCSOperator
from datetime import datetime
# Define the default arguments for the DAG
default_args = {
     'owner': 'airflow',
     'depends_on_past': False,
     'start_date': datetime(2024, 3, 3),
     'email_on_failure': False,
     'email_on_retry': False,
     'retries': 1
}
# Instantiate the DAG object
dag = DAG(
     'bigquery_data_pipeline',
     default_args=default_args,
     description='A DAG for data pipeline with BigQuery tasks',
     schedule_interval="@daily"
)
# Define tasks
start_task = DummyOperator(task_id='start_task', dag=dag)
end_task = DummyOperator(task_id='end_task', dag=dag)
# Define BigQuery tasks
bq_query_task1 = BigQueryOperator(
     task_id='bq_query_task1',
     sql="SELECT * FROM your_table",
     destination_dataset_table="your_project.your_dataset.output_table1",
     write_disposition='WRITE_TRUNCATE',
     dag=dag
)
 
bq_query_task2 = BigQueryOperator(
     task_id='bq_query_task2',
     sql="SELECT * FROM your_table WHERE date > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)",
     destination_dataset_table="your_project.your_dataset.output_table2",
     write_disposition='WRITE_APPEND',
     dag=dag
)
# Define task dependencies
start_task >> bq_query_task1 >> bq_query_task2 >> end_taskW tym przykładzie:
- Definiujemy DAG o nazwie bigquery_data_pipelinez dziennym interwałem harmonogramu przy użyciuschedule_intervalparametr ustawiony na'@daily'.
- Dwa fikcyjne zadania (start_taskiend_task) są zdefiniowane przy użyciuDummyOperator. Zadania te służą jako symbole zastępcze i nie są powiązane z żadnym faktycznym przetwarzaniem.
- Dwa zadania BigQuery (bq_query_task1orazbq_query_task2) są zdefiniowane przy użyciuBigQueryOperator. Zadania te wykonują zapytania SQL w BigQuery i przechowują wyniki w tabelach docelowych.
- Każde BigQueryOperatorokreśla zapytanie SQL, które ma zostać wykonane (SQL parametr), docelowy zbiór danych i tabela (destination_dataset_tableparametr) oraz dyspozycja zapisu (write_dispositionparametr).
- Zależności zadań są zdefiniowane w taki sposób, że bq_query_task1musi zostać uruchomione przedbq_query_task2i obabq_query_task1orazbq_query_task2musi działać pomiędzystart_taskiend_task.
Definiując DAG w ten sposób, można tworzyć solidne potoki danych w Apache Airflow, które współdziałają z BigQuery w celu przetwarzania i analizy danych. Proszę dostosować zapytania SQL i tabele docelowe w zależności od potrzeb, aby dopasować je do konkretnego przypadku użycia.
- Konfiguracja zależności zadań: Proszę określić zależności zadań w DAG, aby zapewnić właściwą kolejność wykonywania. Airflow umożliwia definiowanie zależności przy użyciu funkcji set_upstreamiset_downstreammetody.
# Define tasks
task1 = DummyOperator(task_id='task1', dag=dag)
task2 = DummyOperator(task_id='task2', dag=dag)
task3 = DummyOperator(task_id='task3', dag=dag)
task4 = DummyOperator(task_id='task4', dag=dag)
# Set task dependencies
task1.set_downstream(task2)
task1.set_downstream(task3)
task2.set_downstream(task4)
task3.set_downstream(task4)W tym przykładzie:
- Tworzymy DAG o nazwie sample_dagz dziennym interwałem harmonogramu.
- Cztery zadania (task1,task2,task3,task4) są zdefiniowane przy użyciuDummyOperator, która reprezentuje zadania zastępcze.
- Zależności zadań są konfigurowane za pomocą set_downstream. W tym przypadku,task2oraztask3są poniżejtask1oraztask4znajduje się poniżej obutask2itask3.
Ta konfiguracja zapewnia, że task1 zostanie wykonana jako pierwsza, a następnie task2 lub task3 (ponieważ są zrównoleglone), i wreszcie task4 zostanie wykonany po obu task2 i task3 zostały zakończone.
- Ustawianie harmonogramów zadań: Konfigurowanie harmonogramów zadań w DAG, aby kontrolować, kiedy powinny być wykonywane. Airflow obsługuje różne opcje planowania, w tym wyrażenia cron i harmonogramy interwałowe.
# Set task schedules
task1_execution_time = datetime(2024, 3, 3, 10, 0, 0) # Task 1 scheduled to run at 10:00 AM
task2_execution_time = task1_execution_time + timedelta(hours=1) # Task 2 scheduled to run 1 hour after Task 1
task3_execution_time = task1_execution_time + timedelta(hours=2) # Task 3 scheduled to run 2 hours after Task 1
task1.execution_date = task1_execution_time
task2.execution_date = task2_execution_time
task3.execution_date = task3_execution_time
# Define task dependencies
task1.set_downstream(task2)
task2.set_downstream(task3)W tym przykładzie:
- Tworzymy DAG o nazwie sample_scheduled_dagz dziennym interwałem harmonogramu przy użyciuschedule_intervalparametr ustawiony na'@daily'w konfiguracji zadań Zależności.
- Harmonogramy zadań są konfigurowane poprzez określenie parametru execution_datedla każdego zadania.task1ma zostać uruchomione o godzinie 10:00,task2jest zaplanowane na 1 godzinę potask1oraztask3jest zaplanowany do uruchomienia 2 godziny potask1.
- Zależności zadań są ustawione w taki sposób, że task2znajduje się poniżejtask1oraztask3jest poniżejtask2.
Konfigurując harmonogramy zadań w DAG, można kontrolować, kiedy każde zadanie powinno zostać wykonane, co pozwala na precyzyjną orkiestrację przepływów pracy przetwarzania danych w Apache Airflow.
Integracja z BigQuery w celu przetwarzania danych
BigQuery, oferowany przez Google Cloud, jest w pełni zarządzanym i bezserwerowym rozwiązaniem hurtowni danych. Oferuje wysokowydajne zapytania SQL i skalowalną pamięć masową do analizy dużych zbiorów danych. Oto jak zintegrować BigQuery z Apache Airflow w celu przetwarzania danych:
- Wykonywanie zapytań SQL: Korzystanie z BigQueryOperatormożna wykonywać zapytania SQL w BigQuery w ramach Apache Airflow DAGs, umożliwiając płynną integrację przepływów pracy przetwarzania danych z Google BigQuery. W razie potrzeby można dostosować zapytania SQL i tabele docelowe do konkretnych wymagań.
- Ładowanie i eksportowanie danych: Airflow umożliwia ładowanie danych do BigQuery z zewnętrznych źródeł lub eksportowanie danych z BigQuery do innych miejsc docelowych. Proszę użyć operatorów takich jak BigQueryToBigQueryOperatoriBigQueryToGCSOperatordla operacji ładowania i eksportowania danych.
# Define BigQuery tasks for loading data from external source
bq_load_external_data_task = BigQueryToBigQueryOperator(
task_id='bq_load_external_data',
source_project_dataset_table="external_project.external_dataset.external_table",
destination_project_dataset_table="your_project.your_dataset.internal_table",
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
dag=dag
)
# Define BigQuery tasks for exporting data to Google Cloud Storage (GCS)
bq_export_to_gcs_task = BigQueryToGCSOperator(
task_id='bq_export_to_gcs',
source_project_dataset_table="your_project.your_dataset.internal_table",
destination_cloud_storage_uris=['gs://your_bucket/your_file.csv'],
export_format="CSV",
dag=dag
)
# Define task dependencies
start_task >> bq_load_external_data_task >> bq_export_to_gcs_task >> end_task
- Monitorowanie i zarządzanie zadaniami: Airflow zapewnia wbudowane funkcje monitorowania i rejestrowania do zarządzania zadaniami BigQuery. Można monitorować statusy zadań, przeglądać dzienniki i obsługiwać awarie zadań za pomocą interfejsu internetowego Airflow lub narzędzi wiersza poleceń.
Oto jak można skutecznie monitorować i zarządzać zadaniami BigQuery w Airflow:
1. Interfejs sieciowy Airflow
- Strona uruchomień DAG: Interfejs internetowy Airflow udostępnia stronę “DAG Runs”, na której można wyświetlić status każdego uruchomienia DAG. Obejmuje to informacje o tym, czy uruchomienie DAG powiodło się, nie powiodło się lub jest obecnie uruchomione.
- Dzienniki instancji zadań: Można uzyskać dostęp do dzienników dla każdego wystąpienia zadania w ramach uruchomienia DAG. Dzienniki te dostarczają szczegółowych informacji na temat wykonywania zadań, w tym wszelkich napotkanych błędów lub wyjątków.
- Widok wykresu: Widok wykresu w interfejsie użytkownika Airflow zapewnia wizualną reprezentację DAG i jego zależności zadań. Mogą Państwo użyć tego widoku, aby zrozumieć przepływ pracy i zidentyfikować wszelkie wąskie gardła lub problemy.
2. Interfejs wiersza poleceń (CLI)
- airflow dags list: Proszę użyć- airflow dags listaby wyświetlić listę wszystkich dostępnych DAG w Państwa środowisku Airflow. Polecenie to zapewnia podstawowe informacje o każdym DAG, w tym jego status i datę ostatniego wykonania.
- airflow dags show: Polecenie- airflow dags showumożliwia wyświetlenie szczegółowych informacji na temat określonego DAG, w tym jego zadań, zależności zadań i interwałów harmonogramu.
- airflow tasks list: Proszę użyć polecenia- airflow tasks listaby wyświetlić listę wszystkich zadań w ramach określonego DAG. To polecenie dostarcza informacji o każdym zadaniu, takich jak jego bieżący stan i data wykonania.
- airflow task logs: Dostęp do dzienników zadań można uzyskać za pomocą polecenia- airflow task logs. Polecenie to umożliwia przeglądanie dzienników dla określonego wystąpienia zadania, pomagając w rozwiązywaniu problemów z błędami lub awariami.
3. Rejestrowanie i alerty
- Rejestrowanie przepływu powietrza: Airflow rejestruje wszystkie wykonania zadań i przebiegi DAG, ułatwiając śledzenie postępów zadań i identyfikację problemów. Można skonfigurować poziomy rejestrowania i programy obsługi, aby kontrolować szczegółowość i miejsce docelowe dzienników.
- Alertowanie: Proszę skonfigurować alerty i powiadomienia, które będą wyzwalane na podstawie określonych zdarzeń, takich jak awarie zadań lub stany uruchomienia DAG. Można użyć narzędzi takich jak Slack, e-mail lub PagerDuty, aby otrzymywać alerty i podejmować odpowiednie działania.
4. Narzędzia do monitorowania
- Monitorowanie Stackdriver: Jeśli korzystają Państwo z Airflow na Google Cloud Platform, mogą Państwo użyć Stackdriver Monitoring do monitorowania kondycji i wydajności środowiska Airflow. Obejmuje to metryki, takie jak użycie procesora, użycie pamięci i czasy wykonywania zadań.
- Prometheus i Grafana: Integracja Airflow z Prometheus i Grafana w celu zaawansowanego monitorowania i wizualizacji wskaźników wydajności. Umożliwia to tworzenie niestandardowych pulpitów nawigacyjnych i uzyskiwanie wglądu w zachowanie zadań Airflow.
Wykorzystując te możliwości monitorowania i zarządzania zapewniane przez Apache Airflow, można skutecznie monitorować statusy zadań, przeglądać dzienniki i obsługiwać awarie zadań, zapewniając niezawodność i wydajność przepływów pracy z danymi, w tym tych obejmujących BigQuery.
Najlepsze praktyki w zakresie usprawniania przetwarzania danych
Aby zapewnić wydajne przetwarzanie danych na Google Cloud Platform, proszę rozważyć następujące najlepsze praktyki:
1. Optymalizacja wydajności zapytań
- Proszę używać wydajnych zapytań SQL: Proszę tworzyć zapytania SQL, które efektywnie wykorzystują możliwości BigQuery. Optymalizacja złączeń, agregacji i warunków filtrowania w celu zminimalizowania skanowania danych i poprawy wydajności zapytań.
- Wykorzystanie partycjonowania i klastrowania: Partycjonowanie tabel w oparciu o często filtrowane kolumny w celu zmniejszenia kosztów zapytań i poprawy ich wydajności. Wykorzystanie klastrowania do organizowania danych w ramach partycji w celu dalszej optymalizacji.
- Wykorzystanie buforowania zapytań: Proszę skorzystać z mechanizmu buforowania BigQuery, aby uniknąć zbędnych obliczeń. Ponowne wykorzystanie buforowanych wyników dla identycznych zapytań w celu skrócenia czasu wykonywania zapytań i obniżenia kosztów.
2. Dynamiczne skalowanie zasobów
- Automatyczne skalowanie: Proszę skonfigurować Airflow i powiązane zasoby do automatycznego skalowania w oparciu o wymagania obciążenia. Proszę skorzystać z usług zarządzanych, takich jak Cloud Composer na GCP, które mogą automatycznie skalować klastry Airflow w oparciu o liczbę aktywnych DAG i zadań.
- Preemptible VMs: W przypadku zadań przetwarzania wsadowego, które mogą tolerować przerwy w działaniu, należy korzystać z wywłaszczalnych maszyn wirtualnych (instancji wywłaszczalnych). Preemptible VMs są opłacalne i mogą znacznie obniżyć koszty zasobów dla niekrytycznych obciążeń.
3. Wdrożenie obsługi błędów
- Ponawianie zadań: Proszę skonfigurować zadania Airflow do automatycznego ponawiania po niepowodzeniu. Proszę stosować wykładnicze strategie backoff w celu stopniowego zwiększania interwałów ponawiania prób i uniknięcia przeciążenia usług niższego rzędu.
- Mechanizmy obsługi błędów: Wdrożenie solidnych mechanizmów obsługi błędów w potokach danych w celu sprawnej obsługi błędów przejściowych, problemów sieciowych i przerw w świadczeniu usług. Wykorzystanie wbudowanych funkcji obsługi błędów Airflow, takich jak on_failure_callback, do wykonywania niestandardowej logiki obsługi błędów.
- Monitorowanie alertów: Proszę skonfigurować alerty monitorowania i powiadomienia, aby proaktywnie wykrywać awarie potoku i reagować na nie. Proszę korzystać z usług monitorowania i alertów GCP, takich jak Cloud Monitoring i Stackdriver Logging, aby monitorować wykonywanie zadań Airflow i wyzwalać alerty w oparciu o predefiniowane warunki.
4. Monitorowanie i dostrajanie wydajności
- Monitorowanie metryk wydajności: Monitorowanie wskaźników wydajności potoku, w tym czasu wykonywania zapytań, przepustowości przetwarzania danych i wykorzystania zasobów. Proszę korzystać z narzędzi monitorujących GCP, aby śledzić wskaźniki wydajności w czasie rzeczywistym i identyfikować wąskie gardła wydajności.
- Dostrajanie konfiguracji: Proszę regularnie sprawdzać i dostosowywać konfiguracje potoków w oparciu o dane z monitorowania wydajności. Optymalizacja alokacji zasobów, dostosowanie ustawień równoległości i dostosowanie parametrów zapytań w celu poprawy ogólnej wydajności.
- Planowanie wydajności: Wykonywanie ćwiczeń planowania wydajności w celu zapewnienia, że zasoby są dostarczane optymalnie, aby sprostać wymaganiom obciążenia. Skalowanie zasobów w górę lub w dół w zależności od potrzeb w oparciu o historyczne wzorce użytkowania i przewidywany wzrost.
Wnioski
Wykorzystując Apache Airflow i BigQuery na Google Cloud Platform, deweloperzy mogą usprawnić procesy przetwarzania danych i budować skalowalne potoki danych do analizy i podejmowania decyzji. Aby zaprojektować wydajne potoki danych, zintegrować się z BigQuery i wdrożyć najlepsze praktyki w celu optymalizacji wydajności i niezawodności, proszę postępować zgodnie z wytycznymi przedstawionymi w tym przewodniku dla programistów. Dzięki odpowiednim narzędziom i praktykom organizacje mogą uwolnić pełny potencjał swoich zasobów danych i odnieść sukces biznesowy w chmurze.
