我有一個用 python 寫的 azure 持久函數,帶有一個協調器和兩個活動函數
orchestrator 呼叫第一個活動函數,並作為回報接收一個列表變數(名稱列表和此列表可以在每次執行函數時都是動態的)
下一步是為每個列表項呼叫第二個活動函數(順序處理 - 由於第二個活動函數呼叫的 api 限制)
#dynamically gets generated by the first activity function payload=[1,2,3,4] tasks = [context.call_activity("secondfunction",ps) for ps in payload] output = yield context.task_all(tasks)
我在扇出方法中使用的不是串行的,但我似乎無法找到我想要做的事情的替代方法。
此外,在 host.json 檔案中,我嘗試強制在給定時間只能執行一個活動函數,以避免並行處理
"extensions": { "durableTask": { "maxConcurrentActivityFunctions": 1, "maxConcurrentOrchestratorFunctions": 1 } }
還值得注意的是,我無法將整個列表傳遞給活動函數,就好像我執行活動函數將花費超過 5-10 分鐘,這是 azure 函數的超時限制,因此嘗試迭代列表編排功能
但結果不是連續的
非常感謝您的回饋
您可以嘗試使用以下兩種方法來實現您的要求:-
方法 1:-
#我的function_app.py#:-
import azure.functions as func import azure.durable_functions as df myapp = df.dfapp(http_auth_level=func.authlevel.anonymous) # http starter @myapp.route(route="orchestrators/{functionname}") @myapp.durable_client_input(client_name="client") async def http_start(req: func.httprequest, client): function_name = req.route_params.get('functionname') instance_id = await client.start_new(function_name, none) # pass the functionname here response = client.create_check_status_response(req, instance_id) return response # orchestrator @myapp.orchestration_trigger(context_name="context") def hello_orchestrator(context): cities = ["seattle", "tokyo", "london"] tasks = [] for city in cities: tasks.append(context.call_activity("hello", city)) # wait for all tasks to complete results = yield context.task_all(tasks) return results # activity @myapp.activity_trigger(input_name="city") def hello(city: str): print(f"processing {city}...") # your activity function logic goes here result = f"hello {city}!" return result
輸出:-
函數 url:-
#http://localhost:7071/api/orchestrators/hello_orchestrator
方法 2:-
#function_app.py:-
import azure.functions as func import azure.durable_functions as df myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS) # HTTP Starter @myApp.route(route="orchestrators/{functionName}") @myApp.durable_client_input(client_name="client") async def http_start(req: func.HttpRequest, client): function_name = req.route_params.get('functionName') instance_id = await client.start_new(function_name, None) # Pass the functionName here response = client.create_check_status_response(req, instance_id) return response # Orchestrator @myApp.orchestration_trigger(context_name="context") def hello_orchestrator(context): # Call the first activity to get a list of names names_list = yield context.call_activity("get_names") # Process each name sequentially using the second activity results = [] for name in names_list: result = yield context.call_activity("process_name", name) results.append(result) return results # First Activity @myApp.activity_trigger def get_names(): # Your logic to retrieve a dynamic list of names goes here # For demonstration purposes, returning a hardcoded list return ["John", "Alice", "Bob"] # Second Activity @myApp.activity_trigger(input_name="name") def process_name(name: str): print(f"Processing {name}...") # Your logic to process each name goes here result = f"Hello {name}!" return result
以上是Azure持久性功能:處理列表的詳細內容。更多資訊請關注PHP中文網其他相關文章!