Apache Spark và Amazon S3 - Gotchas và các thực tiễn tốt nhất

S3 là một kho lưu trữ đối tượng và không phải là một hệ thống tệp, do đó các vấn đề phát sinh từ tính nhất quán cuối cùng, các đổi tên không nguyên tử phải được xử lý trong mã ứng dụng. Máy chủ thư mục trong một hệ thống tập tin đã được thay thế bằng thuật toán băm của tên tệp. Điều này là xấu khi liệt kê những thứ, hoạt động thư mục, xóa và đổi tên (sao chép và xóa vì về mặt kỹ thuật không có đổi tên trong các cửa hàng đối tượng)

Bắt đầu sử dụng S3A (lược đồ URI: s3a: //) - Hadoop 2.7+. S3a là Máy khách S3 được đề xuất cho Hadoop 2.7 và sau đó S3a hoạt động hiệu quả hơn và hỗ trợ các tệp lớn hơn (tối đa 5TB) và có hỗ trợ tải lên nhiều phần. Tất cả các đối tượng có thể truy cập từ s3n: // URL cũng có thể được truy cập từ s3a chỉ bằng cách thay thế lược đồ URL. Các báo cáo lỗi nhất đối với S3N sẽ bị đóng dưới dạng WONTFIX

Làm cho Spark 2.0.1 hoạt động với S3a Đối với Spark 2.0.1, hãy sử dụng hadoop-aws-2.7.3.jar, aws-java-sdk-1.7.4.jar, joda-time-2.9.3.jar trong đường dẫn lớp của bạn; đừng quên cập nhật spark-default.conf bằng các khóa AWS và S3A FileSystemClass

Spark.hadoop.fs.s3a.access.key XXXXXXX
spark.hadoop.fs.s3a.secret.key XXXXXXX
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem

Chắc chắn sử dụng Dataframes khi sắp xếp lại truy vấn và đẩy xuống vị ngữ có sẵn trong hộp và do đó ít dữ liệu được tìm nạp cuối cùng sẽ tăng tốc truy vấn của bạn

Nếu bạn đang đọc cùng một dữ liệu nhiều lần, hãy thử sử dụng .cache hoặc s3distcp để chuyển các tệp sang cụm EMR cục bộ của bạn để hưởng lợi từ hiệu suất đọc tệp tốt hơn của hệ thống tệp thực. Tùy chọn groupBy của s3distcp là một tùy chọn tuyệt vời để giải quyết vấn đề tệp nhỏ bằng cách hợp nhất một số lượng lớn các tệp nhỏ.

Điều này mang đến cho tôi vấn đề đọc một số lượng lớn các tệp nhỏ. Nếu việc hợp nhất các tệp bằng công cụ không phải là một tùy chọn, hãy thử đoạn mã sau hoạt động hiệu quả xung quanh nút cổ chai thư mục S3 chậm

nhập com.amazonaws.service.s3._, model._
    nhập com.amazonaws.auth.BasicAWSCredentials

    yêu cầu val = ListObjectsRequest () mới
    request.setBucketName (xô)
    request.setPrefix (tiền tố)
    request.setMaxKeys (pageLpm)
    def s3 = new AmazonS3Client (new BasicAWSCredentials (khóa, bí mật))

    val objs = s3.listObjects (request) // Lưu ý rằng phương thức này trả về dữ liệu bị cắt nếu dài hơn "pageLpm" ở trên. Bạn có thể cần phải đối phó với điều đó.
    sc.metize (objs.getObjectSummaries.map (_. getKey) .toList)
        .flatMap {key => Source.fromInputStream (s3.getObject (xô, khóa) .getObjectContent: InputStream) .getLines}

Đảm bảo rằng tùy chọn spark.sql.parquet.filterPushdown là đúng và spark.sql.parquet.mergeSchema là sai (để tránh lược đồ hợp nhất trong khi viết mà thực sự làm chậm giai đoạn viết của bạn). Rất may Spark 2.0 có mặc định đúng

Bạn có tự hỏi tại sao ngay khi công việc sắp hoàn thành, không có gì được ghi vào nhật ký và mọi hoạt động của tia lửa dường như đã dừng lại nhưng kết quả vẫn chưa có trong thư mục đầu ra của S3, điều gì đang xảy ra? Vâng, mỗi khi người thi hành viết kết quả của công việc, mỗi người trong số họ viết vào một thư mục tạm thời bên ngoài thư mục chính nơi các tệp phải được ghi và một khi tất cả các trình thực thi được thực hiện thì việc đổi tên được thực hiện để có được độc quyền nguyên tử. Điều này hoàn toàn ổn trong một hệ thống tập tin tiêu chuẩn như hdfs, nơi đổi tên là tức thời nhưng trên kho lưu trữ đối tượng như S3, điều này không có lợi vì việc đổi tên trên S3 được thực hiện với tốc độ 6MB / s.

Nếu có thể, hãy viết đầu ra của các công việc sang các hdfs EMR (để tận dụng các đổi tên gần như tức thời và tệp IO tốt hơn của các hdfs cục bộ) và thêm một bước dstcp để di chuyển các tệp sang S3, để tự cứu mình khỏi mọi rắc rối khi xử lý các bộ phận của một cửa hàng đối tượng đang cố gắng trở thành một hệ thống tập tin. Ngoài ra, viết vào hdfs cục bộ sẽ cho phép bạn kích hoạt đầu cơ để kiểm soát các tác vụ chạy trốn mà không rơi vào bẫy bế tắc liên quan đến DirectOutputCommiter.

Nếu bạn phải sử dụng S3 làm thư mục đầu ra, hãy đảm bảo rằng các cấu hình Spark sau được đặt

spark.hadoop.mapreduce.fileoutputcommitter.alerskym.version 2
tia lửa

Lưu ý: DirectParquetOutputCommitter bị xóa khỏi Spark 2.0 do khả năng mất dữ liệu. Thật không may cho đến khi chúng tôi đã cải thiện tính nhất quán từ S3a, chúng tôi phải làm việc với các cách giải quyết. Mọi thứ đang được cải thiện với Hadoop 2.8

Tránh các tên khóa theo thứ tự từ điển. Người ta có thể sử dụng tiền tố băm / ngẫu nhiên hoặc đảo ngược thời gian ngày để đi lại. Thủ thuật là đặt tên cho các khóa của bạn theo thứ bậc, đặt những thứ phổ biến nhất bạn lọc ở bên trái của khóa. Và không bao giờ có dấu gạch dưới trong tên xô do vấn đề DNS.

Kích hoạt song song fs.s3a.fast.up tải các phần của một tệp lên Amazon S3

Vâng, đó là đống rác của các vấn đề trong sản xuất mà tôi đã giải quyết gần đây để khiến Spark hoạt động với S3. Hãy theo dõi để biết thêm về điều này khi tôi đào sâu hơn trong bài tiếp theo