Luồng khí: Lời khuyên ít được biết đến, Thủ thuật và Thực tiễn Tốt nhất

Có một số điều nhất định với tất cả các công cụ bạn sử dụng mà bạn đã giành được ngay cả khi sử dụng nó trong một thời gian dài. Và một khi bạn biết điều đó, bạn giống như một người khác. Airflow như các công cụ khác không khác, có một số đá quý ẩn có thể làm cho cuộc sống của bạn dễ dàng và làm cho sự phát triển DAG trở nên thú vị.

Bạn có thể đã biết một số trong số họ và nếu bạn biết tất cả - bạn cũng là một PRO rồi.

(1) DAG với Trình quản lý bối cảnh

Bạn có thấy khó chịu với chính mình khi bạn quên thêm dag = dag vào nhiệm vụ của mình và lỗi Airflow bị lỗi không? Có, thật dễ dàng để quên thêm nó cho mỗi nhiệm vụ. Nó cũng là dự phòng để thêm tham số tương tự như trong ví dụ sau (tệp example_dag.py):

Ví dụ (tệp example_dag.py) ở trên chỉ có 2 tác vụ, nhưng nếu bạn có 10 hoặc nhiều hơn thì sự dư thừa sẽ trở nên rõ ràng hơn. Để tránh điều này, bạn có thể sử dụng Airflow DAG làm trình quản lý ngữ cảnh để tự động gán toán tử mới cho DAG đó như trong ví dụ trên (example_dag_with_context.py) bằng cách sử dụng với câu lệnh.

(2) Sử dụng Danh sách để đặt phụ thuộc Nhiệm vụ

Khi bạn muốn tạo DAG tương tự như DAG được hiển thị trong hình bên dưới, bạn sẽ phải lặp lại tên tác vụ khi đặt phụ thuộc tác vụ.

Như được hiển thị trong đoạn mã trên, sử dụng cách thông thường để đặt phụ thuộc tác vụ sẽ có nghĩa là task_two và end được lặp lại 3 lần. Điều này có thể được thay thế bằng cách sử dụng danh sách python để đạt được kết quả tương tự theo cách thanh lịch hơn.

(3) Sử dụng đối số mặc định để tránh lặp lại đối số

Luồng khí cho phép vượt qua một từ điển các tham số sẽ có sẵn cho tất cả các nhiệm vụ trong DAG đó.

Ví dụ: tại DataReply, chúng tôi sử dụng BigQuery cho tất cả các DAG liên quan đến DataWareshouse của chúng tôi và thay vì truyền các tham số như nhãn, bigquery_conn_id cho mỗi tác vụ, chúng tôi chỉ cần chuyển từ điển indefault_args như được hiển thị trong DAG bên dưới.

Điều này cũng hữu ích khi bạn muốn thông báo về các lỗi tác vụ riêng lẻ thay vì chỉ các lỗi DAG mà tôi đã đề cập trong bài đăng trên blog cuối cùng của mình về Tích hợp cảnh báo Slack trong Airflow.

(4) Đối số của params

Thông số kỹ thuật của Nhật Bản là một từ điển các tham số mức DAG có thể truy cập được trong các mẫu. Các thông số này có thể được ghi đè ở cấp độ nhiệm vụ.

Đây là một đối số cực kỳ hữu ích và cá nhân tôi đã sử dụng nó rất nhiều vì nó có thể được truy cập trong trường templated với jinja templating bằng params.param_name. Một ví dụ sử dụng như sau:

Nó giúp bạn dễ dàng viết DAG tham số hóa thay vì các giá trị mã hóa cứng. Cũng như trong các ví dụ ở trên, từ điển params có thể được xác định tại 3 vị trí: (1) Trong đối tượng DAG (2) Trong từ điển default_args (3) Mỗi ​​tác vụ.

(5) Lưu trữ dữ liệu nhạy cảm trong các kết nối

Hầu hết người dùng đều biết điều này nhưng tôi vẫn thấy mật khẩu được lưu trữ trong văn bản đơn giản bên trong DAG. Vì lợi ích - don lồng làm điều đó. Bạn nên viết DAG của bạn theo cách mà bạn đủ tự tin để lưu trữ các DAG của bạn trong một kho lưu trữ công cộng.

Theo mặc định, Airflow sẽ lưu mật khẩu cho kết nối bằng văn bản thuần trong cơ sở dữ liệu siêu dữ liệu. Gói tiền điện tử rất được khuyến khích trong quá trình cài đặt Airflow và có thể được thực hiện đơn giản bằng cách cài đặt pip apache-airflow [crypto].

Sau đó bạn có thể dễ dàng truy cập nó như sau:

từ airflow.hooks.base_hook nhập BaseHook
slack_token = BaseHook.get_connection ('slack'). mật khẩu

(6) Hạn chế số lượng biến Airflow trong DAG của bạn

Biến không khí được lưu trữ trong Cơ sở dữ liệu siêu dữ liệu, do đó, bất kỳ lệnh gọi đến biến nào cũng có nghĩa là kết nối với Metadata DB. Các tệp DAG của bạn được phân tích cú pháp mỗi X giây. Sử dụng một số lượng lớn biến trong DAG của bạn (và tệ hơn là default_args) có thể có nghĩa là bạn có thể sẽ bão hòa số lượng kết nối được phép vào cơ sở dữ liệu của bạn.

Để tránh tình trạng này, bạn chỉ có thể sử dụng một biến Airflow duy nhất với giá trị JSON. Vì một biến Airflow có thể chứa giá trị JSON, bạn có thể lưu trữ tất cả cấu hình DAG của mình bên trong một biến duy nhất như trong hình bên dưới:

Như được hiển thị trong ảnh chụp màn hình này, bạn có thể lưu trữ các giá trị trong các biến Airflow riêng biệt hoặc dưới một biến Airflow duy nhất dưới dạng trường JSON

Sau đó, bạn có thể truy cập chúng như hiển thị bên dưới theo cách được đề xuất:

(7) Từ điển bối cảnh của người Viking

Người dùng thường quên nội dung của từ điển ngữ cảnh khi sử dụng PythonOperator với chức năng có thể gọi được.

Ngữ cảnh chứa các tham chiếu đến các đối tượng liên quan đến thể hiện tác vụ và được ghi lại dưới phần macro của API vì chúng cũng có sẵn cho trường templated.

{
      'dag': task.dag,
      'DS': DS,
      'next_ds': next_ds,
      'next_ds_nodash': next_ds_nodash,
      'trước_ds': trước_ds,
      'trước_ds_nodash': trước_ds_nodash,
      'DS_nodash': ds_nodash,
      'ts': ts,
      'ts_nodash': ts_nodash,
      'ts_nodash_with_tz': ts_nodash_with_tz,
      'ngày hôm qua': ngày hôm qua
      'ngày hôm qua_ds_nodash': ngày hôm qua_ds_nodash,
      'orrow_ds ':orrow_ds,
      'orrow_ds_nodash ': ngày mai_ds_nodash,
      'END_DATE': DS,
      'end_date': DS,
      'dag_run': dag_run,
      'run_id': run_id,
      'exec_date': self.execut_date,
      'trước_execut_date': trước_execut_date,
      'next_execut_date': next_execut_date,
      'mới nhất_date': DS,
      'macro': macro,
      'params': params,
      'bảng': bảng,
      'nhiệm vụ': nhiệm vụ,
      'task_instance': tự,
      'ti': tự,
      'task_instance_key_str': ti_key_str,
      'conf': cấu hình,
      'test_mode': self.test_mode,
      'var': {
          'value': Var biếnAccessor (),
          'json': Var biếnJsonAccessor ()
      },
      'đầu vào': task.inlets,
      'cửa hàng': task.outlets,
}

(8) Tạo nhiệm vụ luồng không khí động

Tôi đã trả lời nhiều câu hỏi trên StackOverflow về cách tạo các tác vụ động. Câu trả lời rất đơn giản, bạn chỉ cần tạo task_id duy nhất cho tất cả các tác vụ của mình. Dưới đây là 2 ví dụ về cách đạt được điều đó:

(9) Chạy luồng khí nâng cấpbb thay vì luồng khí luồng initdb

Cảm ơn Ash Berlin về lời khuyên này trong bài nói chuyện của ông trong Cuộc gặp gỡ Apache Airflow London đầu tiên.

luồng khí initdb sẽ tạo ra tất cả các kết nối mặc định, biểu đồ, vv mà chúng tôi có thể không sử dụng và không muốn dùng trong cơ sở dữ liệu sản xuất của mình. thay vào đó, airflowb sẽ chỉ áp dụng bất kỳ chuyển đổi bị thiếu nào vào bảng cơ sở dữ liệu. (bao gồm cả việc tạo các bảng bị thiếu, v.v.) Nó cũng an toàn để chạy mỗi lần, nó theo dõi những lần di chuyển nào đã được áp dụng (sử dụng mô-đun Alembic).

Hãy cho tôi biết trong phần bình luận bên dưới nếu bạn biết điều gì đó đáng để thêm vào bài viết trên blog này. Chúc mừng Airflow