クライアントでは、Databricks で実行されるプロセスのコストを削減する必要がありました。 Databricks が担当した機能の 1 つは、さまざまな SFTP からファイルを収集し、それらを解凍して Data Lake に配置することでした。
データ ワークフローの自動化は、現代のデータ エンジニアリングにおいて重要な要素です。この記事では、GitLab CI/CD と Terraform を使用して、Go アプリケーションが SFTP サーバーに接続し、ファイルを収集して Amazon S3 に保存し、最終的に Databricks でジョブをトリガーできるようにする AWS Lambda 関数を作成する方法を説明します。このエンドツーエンドのプロセスは、効率的なデータ統合と自動化に依存するシステムにとって不可欠です。
まず、SFTP サーバーに接続してファイルを収集する Go アプリケーションを作成します。 SFTP 接続を確立するには github.com/pkg/sftp などのパッケージを使用し、AWS S3 サービスと対話するには github.com/aws/aws-sdk-go などのパッケージを使用します。
package main import ( "fmt" "log" "os" "path/filepath" "github.com/pkg/sftp" "golang.org/x/crypto/ssh" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3/s3manager" ) func main() { // Configuração do cliente SFTP user := "seu_usuario_sftp" pass := "sua_senha_sftp" host := "endereco_sftp:22" config := &ssh.ClientConfig{ User: user, Auth: []ssh.AuthMethod{ ssh.Password(pass), }, HostKeyCallback: ssh.InsecureIgnoreHostKey(), } // Conectar ao servidor SFTP conn, err := ssh.Dial("tcp", host, config) if err != nil { log.Fatal(err) } client, err := sftp.NewClient(conn) if err != nil { log.Fatal(err) } defer client.Close() // Baixar arquivos do SFTP remoteFilePath := "/path/to/remote/file" localDir := "/path/to/local/dir" localFilePath := filepath.Join(localDir, filepath.Base(remoteFilePath)) dstFile, err := os.Create(localFilePath) if err != nil { log.Fatal(err) } defer dstFile.Close() srcFile, err := client.Open(remoteFilePath) if err != nil { log.Fatal(err) } defer srcFile.Close() if _, err := srcFile.WriteTo(dstFile); err != nil { log.Fatal(err) } fmt.Println("Arquivo baixado com sucesso:", localFilePath) // Configuração do cliente S3 sess := session.Must(session.NewSession(&aws.Config{ Region: aws.String("us-west-2"), })) uploader := s3manager.NewUploader(sess) // Carregar arquivo para o S3 file, err := os.Open(localFilePath) if err != nil { log.Fatal(err) } defer file.Close() _, err = uploader.Upload(&s3manager.UploadInput{ Bucket: aws.String("seu-bucket-s3"), Key: aws.String(filepath.Base(localFilePath)), Body: file, }) if err != nil { log.Fatal("Falha ao carregar arquivo para o S3:", err) } fmt.Println("Arquivo carregado com sucesso no S3") }
Terraform は、Lambda 関数と必要なリソースを AWS にプロビジョニングするために使用されます。 Lambda 関数、IAM ポリシー、S3 バケットの作成に必要な設定を含む main.tf ファイルを作成します。
provider "aws" { region = "us-east-1" } resource "aws_iam_role" "lambda_execution_role" { name = "lambda_execution_role" assume_role_policy = jsonencode({ Version = "2012-10-17", Statement = [ { Action = "sts:AssumeRole", Effect = "Allow", Principal = { Service = "lambda.amazonaws.com" }, }, ] }) } resource "aws_iam_policy" "lambda_policy" { name = "lambda_policy" description = "A policy that allows a lambda function to access S3 and SFTP resources" policy = jsonencode({ Version = "2012-10-17", Statement = [ { Action = [ "s3:ListBucket", "s3:GetObject", "s3:PutObject", ], Effect = "Allow", Resource = [ "arn:aws:s3:::seu-bucket-s3", "arn:aws:s3:::seu-bucket-s3/*", ], }, ] }) } resource "aws_iam_role_policy_attachment" "lambda_policy_attachment" { role = aws_iam_role.lambda_execution_role.name policy_arn = aws_iam_policy.lambda_policy.arn } resource "aws_lambda_function" "sftp_lambda" { function_name = "sftp_lambda_function" s3_bucket = "seu-bucket-s3-com-codigo-lambda" s3_key = "sftp-lambda.zip" handler = "main" runtime = "go1.x" role = aws_iam_role.lambda_execution_role.arn environment { variables = { SFTP_HOST = "endereco_sftp", SFTP_USER = "seu_usuario_sftp", SFTP_PASSWORD = "sua_senha_sftp", S3_BUCKET = "seu-bucket-s3", } } } resource "aws_s3_bucket" "s3_bucket" { bucket = "seu-bucket-s3" acl = "private" }
GitLab で、.gitlab-ci.yml ファイルに CI/CD パイプラインを定義します。このパイプラインには、Go アプリケーションをテストするステップ、Terraform を実行してインフラストラクチャをプロビジョニングするステップ、および必要に応じてクリーンアップするステップが含まれている必要があります。
stages: - test - build - deploy variables: S3_BUCKET: "seu-bucket-s3" AWS_DEFAULT_REGION: "us-east-1" TF_VERSION: "1.0.0" before_script: - 'which ssh-agent || ( apt-get update -y && apt-get install openssh-client -y )' - eval $(ssh-agent -s) - echo "$PRIVATE_KEY" | tr -d '\r' | ssh-add - - mkdir -p ~/.ssh - chmod 700 ~/.ssh - ssh-keyscan -H 'endereco_sftp' >> ~/.ssh/known_hosts test: stage: test image: golang:1.18 script: - go test -v ./... build: stage: build image: golang:1.18 script: - go build -o myapp - zip -r sftp-lambda.zip myapp artifacts: paths: - sftp-lambda.zip only: - master deploy: stage: deploy image: hashicorp/terraform:$TF_VERSION script: - terraform init - terraform apply -auto-approve only: - master environment: name: production
S3 にファイルをアップロードした後、Lambda 関数は Databricks でジョブをトリガーする必要があります。これは、Databricks API を使用して既存のジョブを起動することで実行できます。
package main import ( "bytes" "encoding/json" "fmt" "net/http" ) // Estrutura para a requisição de iniciar um job no Databricks type DatabricksJobRequest struct { JobID int `json:"job_id"` } // Função para acionar um job no Databricks func triggerDatabricksJob(databricksInstance string, token string, jobID int) error { url := fmt.Sprintf("https://%s/api/2.0/jobs/run-now", databricksInstance) requestBody, _ := json.Marshal(DatabricksJobRequest{JobID: jobID}) req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody)) if err != nil { return err } req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token)) client := &http.Client{} resp, err := client.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("Failed to trigger Databricks job, status code: %d", resp.StatusCode) } return nil } func main() { // ... (código existente para conectar ao SFTP e carregar no S3) // Substitua pelos seus valores reais databricksInstance := "your-databricks-instance" databricksToken := "your-databricks-token" databricksJobID := 123 // ID do job que você deseja acionar // Acionar o job no Databricks após o upload para o S3 err := triggerDatabricksJob(databricksInstance, databricksToken, databricksJobID) if err != nil { log.Fatal("Erro ao acionar o job do Databricks:", err) } fmt.Println("Job do Databricks acionado com sucesso") }
パイプラインを実行するためにコードを GitLab リポジトリにプッシュします。すべての手順が正常に完了し、Lambda 関数が動作し、S3 および Databricks と正しく対話していることを確認してください。
完全なコードと .gitlab-ci.yml ファイルを設定したら、次の手順に従ってパイプラインを実行できます。
package main import ( "fmt" "log" "os" "path/filepath" "github.com/pkg/sftp" "golang.org/x/crypto/ssh" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3/s3manager" ) func main() { // Configuração do cliente SFTP user := "seu_usuario_sftp" pass := "sua_senha_sftp" host := "endereco_sftp:22" config := &ssh.ClientConfig{ User: user, Auth: []ssh.AuthMethod{ ssh.Password(pass), }, HostKeyCallback: ssh.InsecureIgnoreHostKey(), } // Conectar ao servidor SFTP conn, err := ssh.Dial("tcp", host, config) if err != nil { log.Fatal(err) } client, err := sftp.NewClient(conn) if err != nil { log.Fatal(err) } defer client.Close() // Baixar arquivos do SFTP remoteFilePath := "/path/to/remote/file" localDir := "/path/to/local/dir" localFilePath := filepath.Join(localDir, filepath.Base(remoteFilePath)) dstFile, err := os.Create(localFilePath) if err != nil { log.Fatal(err) } defer dstFile.Close() srcFile, err := client.Open(remoteFilePath) if err != nil { log.Fatal(err) } defer srcFile.Close() if _, err := srcFile.WriteTo(dstFile); err != nil { log.Fatal(err) } fmt.Println("Arquivo baixado com sucesso:", localFilePath) // Configuração do cliente S3 sess := session.Must(session.NewSession(&aws.Config{ Region: aws.String("us-west-2"), })) uploader := s3manager.NewUploader(sess) // Carregar arquivo para o S3 file, err := os.Open(localFilePath) if err != nil { log.Fatal(err) } defer file.Close() _, err = uploader.Upload(&s3manager.UploadInput{ Bucket: aws.String("seu-bucket-s3"), Key: aws.String(filepath.Base(localFilePath)), Body: file, }) if err != nil { log.Fatal("Falha ao carregar arquivo para o S3:", err) } fmt.Println("Arquivo carregado com sucesso no S3") }
provider "aws" { region = "us-east-1" } resource "aws_iam_role" "lambda_execution_role" { name = "lambda_execution_role" assume_role_policy = jsonencode({ Version = "2012-10-17", Statement = [ { Action = "sts:AssumeRole", Effect = "Allow", Principal = { Service = "lambda.amazonaws.com" }, }, ] }) } resource "aws_iam_policy" "lambda_policy" { name = "lambda_policy" description = "A policy that allows a lambda function to access S3 and SFTP resources" policy = jsonencode({ Version = "2012-10-17", Statement = [ { Action = [ "s3:ListBucket", "s3:GetObject", "s3:PutObject", ], Effect = "Allow", Resource = [ "arn:aws:s3:::seu-bucket-s3", "arn:aws:s3:::seu-bucket-s3/*", ], }, ] }) } resource "aws_iam_role_policy_attachment" "lambda_policy_attachment" { role = aws_iam_role.lambda_execution_role.name policy_arn = aws_iam_policy.lambda_policy.arn } resource "aws_lambda_function" "sftp_lambda" { function_name = "sftp_lambda_function" s3_bucket = "seu-bucket-s3-com-codigo-lambda" s3_key = "sftp-lambda.zip" handler = "main" runtime = "go1.x" role = aws_iam_role.lambda_execution_role.arn environment { variables = { SFTP_HOST = "endereco_sftp", SFTP_USER = "seu_usuario_sftp", SFTP_PASSWORD = "sua_senha_sftp", S3_BUCKET = "seu-bucket-s3", } } } resource "aws_s3_bucket" "s3_bucket" { bucket = "seu-bucket-s3" acl = "private" }
アクセス トークンや秘密キーなどの機密情報を保存するには、GitLab CI/CD で環境変数を構成する必要があることに注意してください。これは、[設定] > [設定] で行うことができます。 「CI/CD」> GitLab プロジェクトの「変数」。
また、Databricks トークンにジョブをトリガーするために必要な権限があること、および指定された ID でジョブが存在することを確認してください。
GitLab CI/CD、Terraform、AWS Lambda などのツールを使用すると、データ エンジニアリング タスクの自動化を大幅に簡素化できます。この記事で説明する手順に従うことで、Go の効率性とシンプルさを備えた SFTP、S3、Databricks 間のデータ収集と統合を自動化する堅牢なシステムを作成できます。このアプローチにより、次のような問題に対処する準備が整います。大規模なデータ統合の課題
私の連絡先:
LinkedIn - エアトン リラ ジュニア
iMasters - エアトン・リラ・ジュニア
stages: - test - build - deploy variables: S3_BUCKET: "seu-bucket-s3" AWS_DEFAULT_REGION: "us-east-1" TF_VERSION: "1.0.0" before_script: - 'which ssh-agent || ( apt-get update -y && apt-get install openssh-client -y )' - eval $(ssh-agent -s) - echo "$PRIVATE_KEY" | tr -d '\r' | ssh-add - - mkdir -p ~/.ssh - chmod 700 ~/.ssh - ssh-keyscan -H 'endereco_sftp' >> ~/.ssh/known_hosts test: stage: test image: golang:1.18 script: - go test -v ./... build: stage: build image: golang:1.18 script: - go build -o myapp - zip -r sftp-lambda.zip myapp artifacts: paths: - sftp-lambda.zip only: - master deploy: stage: deploy image: hashicorp/terraform:$TF_VERSION script: - terraform init - terraform apply -auto-approve only: - master environment: name: production
以上がGitLab CI/CD および SFTP 統合のための Terraform を使用した Lambda の実装、Go の S Databricksの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。