W dzisiejszym świecie opartym na danych, wydajne przetwarzanie danych ma kluczowe znaczenie dla organizacji poszukujących wglądu i podejmowania świadomych decyzji. Google Cloud Platform (GCP) oferuje potężne narzędzia, takie jak Apache Airflow i BigQuery, usprawniające procesy przetwarzania danych. W tym przewodniku zbadamy, jak wykorzystać te narzędzia do tworzenia solidnych i skalowalnych potoków danych.
Konfiguracja Apache Airflow na Google Cloud Platform
Apache Airflow, platforma open-source, orkiestruje skomplikowane przepływy pracy. Pozwala ona programistom definiować, planować i monitorować przepływy pracy przy użyciu skierowanych grafów acyklicznych (DAG), zapewniając elastyczność i skalowalność zadań przetwarzania danych. Konfiguracja Airflow na GCP jest prosta przy użyciu usług zarządzanych, takich jak Cloud Composer. Aby rozpocząć, proszę wykonać poniższe kroki:
- Proszę utworzyć środowisko Google Cloud Composer: Proszę przejść do sekcji Cloud Composer w konsoli GCP i utworzyć nowe środowisko. Proszę wybrać żądane opcje konfiguracji, takie jak liczba węzłów i typ maszyny.
- Proszę zainstalować dodatkowe pakiety Python: Airflow obsługuje niestandardowe pakiety Python w celu rozszerzenia jego funkcjonalności. Mogą Państwo zainstalować dodatkowe pakiety używając requirements.txt lub instalując je bezpośrednio z poziomu interfejsu internetowego Airflow.
- Konfiguracja połączeń: Airflow wykorzystuje obiekty połączeń do łączenia się z systemami zewnętrznymi, takimi jak BigQuery. Proszę skonfigurować niezbędne połączenia w interfejsie internetowym Airflow, podając dane uwierzytelniające i szczegóły połączenia.
Projektowanie potoków danych za pomocą Apache Airflow
Po skonfigurowaniu Airflow można projektować potoki danych przy użyciu Directed Acyclic Graphs (DAGs). DAG reprezentuje przepływ pracy składający się z zadań, gdzie każde zadanie wykonuje określoną operację przetwarzania danych. Oto jak zaprojektować potoki danych za pomocą Airflow:
- Definiowanie DAG: Tworzenie skryptów Python do definiowania DAG w Airflow. Każdy skrypt DAG powinien importować niezbędne moduły i definiować zadania przy użyciu operatorów dostarczanych przez Airflow, takich jak
BigQueryOperator
do interakcji z BigQuery.
destination_dataset_table=”your_project.your_dataset.output_table2″,
write_disposition=’WRITE_APPEND’,
dag=dag
)
# Definiowanie zależności zadań
start_task >> bq_query_task1 >> bq_query_task2 >> end_task” data-lang=”text/x-python”>
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToGCSOperator
from datetime import datetime
# Define the default arguments for the DAG
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 3, 3),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1
}
# Instantiate the DAG object
dag = DAG(
'bigquery_data_pipeline',
default_args=default_args,
description='A DAG for data pipeline with BigQuery tasks',
schedule_interval="@daily"
)
# Define tasks
start_task = DummyOperator(task_id='start_task', dag=dag)
end_task = DummyOperator(task_id='end_task', dag=dag)
# Define BigQuery tasks
bq_query_task1 = BigQueryOperator(
task_id='bq_query_task1',
sql="SELECT * FROM your_table",
destination_dataset_table="your_project.your_dataset.output_table1",
write_disposition='WRITE_TRUNCATE',
dag=dag
)
bq_query_task2 = BigQueryOperator(
task_id='bq_query_task2',
sql="SELECT * FROM your_table WHERE date > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)",
destination_dataset_table="your_project.your_dataset.output_table2",
write_disposition='WRITE_APPEND',
dag=dag
)
# Define task dependencies
start_task >> bq_query_task1 >> bq_query_task2 >> end_task
W tym przykładzie:
- Definiujemy DAG o nazwie
bigquery_data_pipeline
z dziennym interwałem harmonogramu przy użyciuschedule_interval
parametr ustawiony na'@daily'
. - Dwa fikcyjne zadania (
start_task
iend_task
) są zdefiniowane przy użyciuDummyOperator
. Zadania te służą jako symbole zastępcze i nie są powiązane z żadnym faktycznym przetwarzaniem. - Dwa zadania BigQuery (
bq_query_task1
orazbq_query_task2
) są zdefiniowane przy użyciuBigQueryOperator
. Zadania te wykonują zapytania SQL w BigQuery i przechowują wyniki w tabelach docelowych. - Każde
BigQueryOperator
określa zapytanie SQL, które ma zostać wykonane (SQL parametr), docelowy zbiór danych i tabela (destination_dataset_table
parametr) oraz dyspozycja zapisu (write_disposition
parametr). - Zależności zadań są zdefiniowane w taki sposób, że
bq_query_task1
musi zostać uruchomione przedbq_query_task2
i obabq_query_task1
orazbq_query_task2
musi działać pomiędzystart_task
iend_task
.
Definiując DAG w ten sposób, można tworzyć solidne potoki danych w Apache Airflow, które współdziałają z BigQuery w celu przetwarzania i analizy danych. Proszę dostosować zapytania SQL i tabele docelowe w zależności od potrzeb, aby dopasować je do konkretnego przypadku użycia.
- Konfiguracja zależności zadań: Proszę określić zależności zadań w DAG, aby zapewnić właściwą kolejność wykonywania. Airflow umożliwia definiowanie zależności przy użyciu funkcji
set_upstream
iset_downstream
metody.
# Define tasks
task1 = DummyOperator(task_id='task1', dag=dag)
task2 = DummyOperator(task_id='task2', dag=dag)
task3 = DummyOperator(task_id='task3', dag=dag)
task4 = DummyOperator(task_id='task4', dag=dag)
# Set task dependencies
task1.set_downstream(task2)
task1.set_downstream(task3)
task2.set_downstream(task4)
task3.set_downstream(task4)
W tym przykładzie:
- Tworzymy DAG o nazwie
sample_dag
z dziennym interwałem harmonogramu. - Cztery zadania (
task1
,task2
,task3
,task4
) są zdefiniowane przy użyciuDummyOperator
, która reprezentuje zadania zastępcze. - Zależności zadań są konfigurowane za pomocą
set_downstream
. W tym przypadku,task2
oraztask3
są poniżejtask1
oraztask4
znajduje się poniżej obutask2
itask3
.
Ta konfiguracja zapewnia, że task1
zostanie wykonana jako pierwsza, a następnie task2
lub task3
(ponieważ są zrównoleglone), i wreszcie task4
zostanie wykonany po obu task2
i task3
zostały zakończone.
- Ustawianie harmonogramów zadań: Konfigurowanie harmonogramów zadań w DAG, aby kontrolować, kiedy powinny być wykonywane. Airflow obsługuje różne opcje planowania, w tym wyrażenia cron i harmonogramy interwałowe.
# Set task schedules
task1_execution_time = datetime(2024, 3, 3, 10, 0, 0) # Task 1 scheduled to run at 10:00 AM
task2_execution_time = task1_execution_time + timedelta(hours=1) # Task 2 scheduled to run 1 hour after Task 1
task3_execution_time = task1_execution_time + timedelta(hours=2) # Task 3 scheduled to run 2 hours after Task 1
task1.execution_date = task1_execution_time
task2.execution_date = task2_execution_time
task3.execution_date = task3_execution_time
# Define task dependencies
task1.set_downstream(task2)
task2.set_downstream(task3)
W tym przykładzie:
- Tworzymy DAG o nazwie
sample_scheduled_dag
z dziennym interwałem harmonogramu przy użyciuschedule_interval
parametr ustawiony na'@daily'
w konfiguracji zadań Zależności. - Harmonogramy zadań są konfigurowane poprzez określenie parametru
execution_date
dla każdego zadania.task1
ma zostać uruchomione o godzinie 10:00,task2
jest zaplanowane na 1 godzinę potask1
oraztask3
jest zaplanowany do uruchomienia 2 godziny potask1
. - Zależności zadań są ustawione w taki sposób, że
task2
znajduje się poniżejtask1
oraztask3
jest poniżejtask2
.
Konfigurując harmonogramy zadań w DAG, można kontrolować, kiedy każde zadanie powinno zostać wykonane, co pozwala na precyzyjną orkiestrację przepływów pracy przetwarzania danych w Apache Airflow.
Integracja z BigQuery w celu przetwarzania danych
BigQuery, oferowany przez Google Cloud, jest w pełni zarządzanym i bezserwerowym rozwiązaniem hurtowni danych. Oferuje wysokowydajne zapytania SQL i skalowalną pamięć masową do analizy dużych zbiorów danych. Oto jak zintegrować BigQuery z Apache Airflow w celu przetwarzania danych:
- Wykonywanie zapytań SQL: Korzystanie z
BigQueryOperator
można wykonywać zapytania SQL w BigQuery w ramach Apache Airflow DAGs, umożliwiając płynną integrację przepływów pracy przetwarzania danych z Google BigQuery. W razie potrzeby można dostosować zapytania SQL i tabele docelowe do konkretnych wymagań. - Ładowanie i eksportowanie danych: Airflow umożliwia ładowanie danych do BigQuery z zewnętrznych źródeł lub eksportowanie danych z BigQuery do innych miejsc docelowych. Proszę użyć operatorów takich jak
BigQueryToBigQueryOperator
iBigQueryToGCSOperator
dla operacji ładowania i eksportowania danych.
# Define BigQuery tasks for loading data from external source
bq_load_external_data_task = BigQueryToBigQueryOperator(
task_id='bq_load_external_data',
source_project_dataset_table="external_project.external_dataset.external_table",
destination_project_dataset_table="your_project.your_dataset.internal_table",
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
dag=dag
)
# Define BigQuery tasks for exporting data to Google Cloud Storage (GCS)
bq_export_to_gcs_task = BigQueryToGCSOperator(
task_id='bq_export_to_gcs',
source_project_dataset_table="your_project.your_dataset.internal_table",
destination_cloud_storage_uris=['gs://your_bucket/your_file.csv'],
export_format="CSV",
dag=dag
)
# Define task dependencies
start_task >> bq_load_external_data_task >> bq_export_to_gcs_task >> end_task
- Monitorowanie i zarządzanie zadaniami: Airflow zapewnia wbudowane funkcje monitorowania i rejestrowania do zarządzania zadaniami BigQuery. Można monitorować statusy zadań, przeglądać dzienniki i obsługiwać awarie zadań za pomocą interfejsu internetowego Airflow lub narzędzi wiersza poleceń.
Oto jak można skutecznie monitorować i zarządzać zadaniami BigQuery w Airflow:
1. Interfejs sieciowy Airflow
- Strona uruchomień DAG: Interfejs internetowy Airflow udostępnia stronę “DAG Runs”, na której można wyświetlić status każdego uruchomienia DAG. Obejmuje to informacje o tym, czy uruchomienie DAG powiodło się, nie powiodło się lub jest obecnie uruchomione.
- Dzienniki instancji zadań: Można uzyskać dostęp do dzienników dla każdego wystąpienia zadania w ramach uruchomienia DAG. Dzienniki te dostarczają szczegółowych informacji na temat wykonywania zadań, w tym wszelkich napotkanych błędów lub wyjątków.
- Widok wykresu: Widok wykresu w interfejsie użytkownika Airflow zapewnia wizualną reprezentację DAG i jego zależności zadań. Mogą Państwo użyć tego widoku, aby zrozumieć przepływ pracy i zidentyfikować wszelkie wąskie gardła lub problemy.
2. Interfejs wiersza poleceń (CLI)
airflow dags list
: Proszę użyćairflow dags list
aby wyświetlić listę wszystkich dostępnych DAG w Państwa środowisku Airflow. Polecenie to zapewnia podstawowe informacje o każdym DAG, w tym jego status i datę ostatniego wykonania.airflow dags show
: Polecenieairflow dags show
umożliwia wyświetlenie szczegółowych informacji na temat określonego DAG, w tym jego zadań, zależności zadań i interwałów harmonogramu.airflow tasks list
: Proszę użyć poleceniaairflow tasks list
aby wyświetlić listę wszystkich zadań w ramach określonego DAG. To polecenie dostarcza informacji o każdym zadaniu, takich jak jego bieżący stan i data wykonania.airflow task logs
: Dostęp do dzienników zadań można uzyskać za pomocą poleceniaairflow task logs
. Polecenie to umożliwia przeglądanie dzienników dla określonego wystąpienia zadania, pomagając w rozwiązywaniu problemów z błędami lub awariami.
3. Rejestrowanie i alerty
- Rejestrowanie przepływu powietrza: Airflow rejestruje wszystkie wykonania zadań i przebiegi DAG, ułatwiając śledzenie postępów zadań i identyfikację problemów. Można skonfigurować poziomy rejestrowania i programy obsługi, aby kontrolować szczegółowość i miejsce docelowe dzienników.
- Alertowanie: Proszę skonfigurować alerty i powiadomienia, które będą wyzwalane na podstawie określonych zdarzeń, takich jak awarie zadań lub stany uruchomienia DAG. Można użyć narzędzi takich jak Slack, e-mail lub PagerDuty, aby otrzymywać alerty i podejmować odpowiednie działania.
4. Narzędzia do monitorowania
- Monitorowanie Stackdriver: Jeśli korzystają Państwo z Airflow na Google Cloud Platform, mogą Państwo użyć Stackdriver Monitoring do monitorowania kondycji i wydajności środowiska Airflow. Obejmuje to metryki, takie jak użycie procesora, użycie pamięci i czasy wykonywania zadań.
- Prometheus i Grafana: Integracja Airflow z Prometheus i Grafana w celu zaawansowanego monitorowania i wizualizacji wskaźników wydajności. Umożliwia to tworzenie niestandardowych pulpitów nawigacyjnych i uzyskiwanie wglądu w zachowanie zadań Airflow.
Wykorzystując te możliwości monitorowania i zarządzania zapewniane przez Apache Airflow, można skutecznie monitorować statusy zadań, przeglądać dzienniki i obsługiwać awarie zadań, zapewniając niezawodność i wydajność przepływów pracy z danymi, w tym tych obejmujących BigQuery.
Najlepsze praktyki w zakresie usprawniania przetwarzania danych
Aby zapewnić wydajne przetwarzanie danych na Google Cloud Platform, proszę rozważyć następujące najlepsze praktyki:
1. Optymalizacja wydajności zapytań
- Proszę używać wydajnych zapytań SQL: Proszę tworzyć zapytania SQL, które efektywnie wykorzystują możliwości BigQuery. Optymalizacja złączeń, agregacji i warunków filtrowania w celu zminimalizowania skanowania danych i poprawy wydajności zapytań.
- Wykorzystanie partycjonowania i klastrowania: Partycjonowanie tabel w oparciu o często filtrowane kolumny w celu zmniejszenia kosztów zapytań i poprawy ich wydajności. Wykorzystanie klastrowania do organizowania danych w ramach partycji w celu dalszej optymalizacji.
- Wykorzystanie buforowania zapytań: Proszę skorzystać z mechanizmu buforowania BigQuery, aby uniknąć zbędnych obliczeń. Ponowne wykorzystanie buforowanych wyników dla identycznych zapytań w celu skrócenia czasu wykonywania zapytań i obniżenia kosztów.
2. Dynamiczne skalowanie zasobów
- Automatyczne skalowanie: Proszę skonfigurować Airflow i powiązane zasoby do automatycznego skalowania w oparciu o wymagania obciążenia. Proszę skorzystać z usług zarządzanych, takich jak Cloud Composer na GCP, które mogą automatycznie skalować klastry Airflow w oparciu o liczbę aktywnych DAG i zadań.
- Preemptible VMs: W przypadku zadań przetwarzania wsadowego, które mogą tolerować przerwy w działaniu, należy korzystać z wywłaszczalnych maszyn wirtualnych (instancji wywłaszczalnych). Preemptible VMs są opłacalne i mogą znacznie obniżyć koszty zasobów dla niekrytycznych obciążeń.
3. Wdrożenie obsługi błędów
- Ponawianie zadań: Proszę skonfigurować zadania Airflow do automatycznego ponawiania po niepowodzeniu. Proszę stosować wykładnicze strategie backoff w celu stopniowego zwiększania interwałów ponawiania prób i uniknięcia przeciążenia usług niższego rzędu.
- Mechanizmy obsługi błędów: Wdrożenie solidnych mechanizmów obsługi błędów w potokach danych w celu sprawnej obsługi błędów przejściowych, problemów sieciowych i przerw w świadczeniu usług. Wykorzystanie wbudowanych funkcji obsługi błędów Airflow, takich jak on_failure_callback, do wykonywania niestandardowej logiki obsługi błędów.
- Monitorowanie alertów: Proszę skonfigurować alerty monitorowania i powiadomienia, aby proaktywnie wykrywać awarie potoku i reagować na nie. Proszę korzystać z usług monitorowania i alertów GCP, takich jak Cloud Monitoring i Stackdriver Logging, aby monitorować wykonywanie zadań Airflow i wyzwalać alerty w oparciu o predefiniowane warunki.
4. Monitorowanie i dostrajanie wydajności
- Monitorowanie metryk wydajności: Monitorowanie wskaźników wydajności potoku, w tym czasu wykonywania zapytań, przepustowości przetwarzania danych i wykorzystania zasobów. Proszę korzystać z narzędzi monitorujących GCP, aby śledzić wskaźniki wydajności w czasie rzeczywistym i identyfikować wąskie gardła wydajności.
- Dostrajanie konfiguracji: Proszę regularnie sprawdzać i dostosowywać konfiguracje potoków w oparciu o dane z monitorowania wydajności. Optymalizacja alokacji zasobów, dostosowanie ustawień równoległości i dostosowanie parametrów zapytań w celu poprawy ogólnej wydajności.
- Planowanie wydajności: Wykonywanie ćwiczeń planowania wydajności w celu zapewnienia, że zasoby są dostarczane optymalnie, aby sprostać wymaganiom obciążenia. Skalowanie zasobów w górę lub w dół w zależności od potrzeb w oparciu o historyczne wzorce użytkowania i przewidywany wzrost.
Wnioski
Wykorzystując Apache Airflow i BigQuery na Google Cloud Platform, deweloperzy mogą usprawnić procesy przetwarzania danych i budować skalowalne potoki danych do analizy i podejmowania decyzji. Aby zaprojektować wydajne potoki danych, zintegrować się z BigQuery i wdrożyć najlepsze praktyki w celu optymalizacji wydajności i niezawodności, proszę postępować zgodnie z wytycznymi przedstawionymi w tym przewodniku dla programistów. Dzięki odpowiednim narzędziom i praktykom organizacje mogą uwolnić pełny potencjał swoich zasobów danych i odnieść sukces biznesowy w chmurze.