首頁  >  文章  >  後端開發  >  Azure持久性功能:處理列表

Azure持久性功能:處理列表

王林
王林轉載
2024-02-22 12:25:10885瀏覽

Azure持久性功能:處理列表

問題內容

我有一個用 python 寫的 azure 持久函數,帶有一個協調器和兩個活動函數

orchestrator 呼叫第一個活動函數,並作為回報接收一個列表變數(名稱列表和此列表可以在每次執行函數時都是動態的)

下一步是為每個列表項呼叫第二個活動函數(順序處理 - 由於第二個活動函數呼叫的 api 限制)

#dynamically gets generated by the first activity function
payload=[1,2,3,4]            

tasks = [context.call_activity("secondfunction",ps) for ps in payload]
output = yield context.task_all(tasks)

我在扇出方法中使用的不是串行的,但我似乎無法找到我想要做的事情的替代方法。

此外,在 host.json 檔案中,我嘗試強制在給定時間只能執行一個活動函數,以避免並行處理

"extensions": {
    "durableTask": {
      "maxConcurrentActivityFunctions": 1,
      "maxConcurrentOrchestratorFunctions": 1
    }
  }

還值得注意的是,我無法將整個列表傳遞給活動函數,就好像我執行活動函數將花費超過 5-10 分鐘,這是 azure 函數的超時限制,因此嘗試迭代列表編排功能

但結果不是連續的

非常感謝您的回饋


正確答案


您可以嘗試使用以下兩種方法來實現您的要求:-

方法 1:-

#我的function_app.py#:-

import azure.functions as func
import azure.durable_functions as df

myapp = df.dfapp(http_auth_level=func.authlevel.anonymous)

# http starter
@myapp.route(route="orchestrators/{functionname}")
@myapp.durable_client_input(client_name="client")
async def http_start(req: func.httprequest, client):
    function_name = req.route_params.get('functionname')
    instance_id = await client.start_new(function_name, none)  # pass the functionname here
    response = client.create_check_status_response(req, instance_id)
    return response

# orchestrator
@myapp.orchestration_trigger(context_name="context")
def hello_orchestrator(context):
    cities = ["seattle", "tokyo", "london"]

    tasks = []
    for city in cities:
        tasks.append(context.call_activity("hello", city))

    # wait for all tasks to complete
    results = yield context.task_all(tasks)

    return results

# activity
@myapp.activity_trigger(input_name="city")
def hello(city: str):
    print(f"processing {city}...")
    # your activity function logic goes here
    result = f"hello {city}!"

    return result

輸出:-

函數 url:-

#
http://localhost:7071/api/orchestrators/hello_orchestrator

方法 2:-

#function_app.py:-

import azure.functions as func
import azure.durable_functions as df

myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)

# HTTP Starter
@myApp.route(route="orchestrators/{functionName}")
@myApp.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest, client):
    function_name = req.route_params.get('functionName')
    instance_id = await client.start_new(function_name, None)  # Pass the functionName here
    response = client.create_check_status_response(req, instance_id)
    return response

# Orchestrator
@myApp.orchestration_trigger(context_name="context")
def hello_orchestrator(context):
    # Call the first activity to get a list of names
    names_list = yield context.call_activity("get_names")

    # Process each name sequentially using the second activity
    results = []
    for name in names_list:
        result = yield context.call_activity("process_name", name)
        results.append(result)

    return results

# First Activity
@myApp.activity_trigger
def get_names():
    # Your logic to retrieve a dynamic list of names goes here
    # For demonstration purposes, returning a hardcoded list
    return ["John", "Alice", "Bob"]

# Second Activity
@myApp.activity_trigger(input_name="name")
def process_name(name: str):
    print(f"Processing {name}...")
    # Your logic to process each name goes here
    result = f"Hello {name}!"

    return result

以上是Azure持久性功能:處理列表的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:stackoverflow.com。如有侵權,請聯絡admin@php.cn刪除