using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace HelloWorldApp
{
    /// <summary>
    /// Entry point that wires the six pipeline stages together.
    /// </summary>
    class TestImmutable
    {
        /// <summary>
        /// Runs the whole pipeline: stages 1-3 collect word counts from the
        /// "*.cs" files below "../../..", then stages 4-6 convert, colorize
        /// and print the counts.
        /// </summary>
        /// <returns>A task that completes once all six stages have finished.</returns>
        public static async Task StartPiplelineAsync()
        {
            var filenames = new BlockingCollection<string>();
            var lines = new BlockingCollection<string>();
            var words = new ConcurrentDictionary<string, int>();
            var items = new BlockingCollection<Info>();
            var coloredItems = new BlockingCollection<Info>();

            Task t1 = PiplelineStages.ReadFilenamesAsync(@"../../..", filenames);
            ColoredConsole.WriteLine("start stage 1");
            Task t2 = PiplelineStages.LoadContentAsync(filenames, lines);
            ColoredConsole.WriteLine("start stage 2");
            Task t3 = PiplelineStages.ProcessContentAsync(lines, words);

            // Stage 4 reads the finished dictionary, so stages 1-3 must be done first.
            await Task.WhenAll(t1, t2, t3);
            ColoredConsole.WriteLine("stage 1,2,3 completed");

            Task t4 = PiplelineStages.TransferContentAsync(words, items);
            Task t5 = PiplelineStages.AddColorAsync(items, coloredItems);
            Task t6 = PiplelineStages.ShowContentAsync(coloredItems);

            // Await the tail of the pipeline too; otherwise the returned task
            // completes before the output is written and failures go unobserved.
            await Task.WhenAll(t4, t5, t6);
        }
    }

    /// <summary>
    /// A word, how often it occurred, and the console color assigned to it.
    /// </summary>
    class Info
    {
        public string Word { get; set; }

        public int Count { get; set; }

        public string Color { get; set; }

        /// <summary>Formats the item as "&lt;Count&gt;times:&lt;Word&gt;".</summary>
        public override string ToString() => $"{Count}times:{Word}";
    }

    /// <summary>
    /// The individual pipeline stages. Each producing stage marks its output
    /// collection as complete when it ends so that the consuming stage's
    /// <c>GetConsumingEnumerable()</c> loop can terminate.
    /// </summary>
    static class PiplelineStages
    {
        // Characters that separate words in a source line (used by stage 3).
        private static readonly char[] WordSeparators =
        {
            ' ', ';', '\t', '{', '}', '(', ')', ':', ',', '"'
        };

        /// <summary>
        /// Stage 1: adds the path of every "*.cs" file below
        /// <paramref name="path"/> (recursively) to <paramref name="output"/>.
        /// </summary>
        /// <param name="path">Directory to search.</param>
        /// <param name="output">Receives the file names; completed when the stage ends.</param>
        /// <returns>The task running the stage.</returns>
        public static Task ReadFilenamesAsync(string path, BlockingCollection<string> output)
        {
            return Task.Factory.StartNew(() =>
            {
                try
                {
                    foreach (string filename in Directory.EnumerateFiles(path, "*.cs", SearchOption.AllDirectories))
                    {
                        output.Add(filename);
                        ColoredConsole.WriteLine($"stage 1:added{filename}");
                    }
                }
                finally
                {
                    // Complete even on failure so the next stage does not wait forever.
                    output.CompleteAdding();
                }
            }, TaskCreationOptions.LongRunning);
        }

        /// <summary>
        /// Stage 2: reads every file named in <paramref name="input"/> and adds
        /// its lines to <paramref name="output"/>.
        /// </summary>
        /// <param name="input">File names to read.</param>
        /// <param name="output">Receives the lines; completed when the stage ends.</param>
        /// <returns>A task that completes when all files have been read.</returns>
        public static async Task LoadContentAsync(BlockingCollection<string> input, BlockingCollection<string> output)
        {
            try
            {
                // GetConsumingEnumerable() is required here: the collection is still
                // being filled while it is read, and this enumerator blocks until the
                // next item arrives or adding is completed.
                foreach (var filename in input.GetConsumingEnumerable())
                {
                    using (FileStream stream = File.OpenRead(filename))
                    using (var reader = new StreamReader(stream))
                    {
                        string line;
                        while ((line = await reader.ReadLineAsync()) != null)
                        {
                            output.Add(line);
                            ColoredConsole.WriteLine($"stage 2:added{line}");
                        }
                    }
                }
            }
            finally
            {
                // Must be called, otherwise the reader of 'output' keeps waiting for
                // more items. It has to happen once, after ALL files were processed:
                // completing inside the loop makes the next file's Add throw.
                output.CompleteAdding();
            }
        }

        /// <summary>
        /// Stage 3: splits every line from <paramref name="input"/> into words
        /// and counts the occurrences in <paramref name="output"/>.
        /// </summary>
        /// <param name="input">Lines to process.</param>
        /// <param name="output">Maps each word to its number of occurrences.</param>
        /// <returns>The task running the stage.</returns>
        public static Task ProcessContentAsync(BlockingCollection<string> input, ConcurrentDictionary<string, int> output)
        {
            return Task.Factory.StartNew(() =>
            {
                foreach (var line in input.GetConsumingEnumerable())
                {
                    string[] words = line.Split(WordSeparators);
                    foreach (var word in words.Where(w => !string.IsNullOrEmpty(w)))
                    {
                        output.AddOrUpdate(key: word, addValue: 1, updateValueFactory: (key, count) => count + 1);
                        ColoredConsole.WriteLine($"stage 3:added{word}");
                    }
                }
            }, TaskCreationOptions.LongRunning);
        }

        /// <summary>
        /// Stage 4: turns every word/count pair in <paramref name="input"/> into
        /// an <see cref="Info"/> and adds it to <paramref name="output"/>.
        /// </summary>
        /// <param name="input">Word counts produced by stage 3.</param>
        /// <param name="output">Receives the items; completed when the stage ends.</param>
        /// <returns>The task running the stage.</returns>
        public static Task TransferContentAsync(ConcurrentDictionary<string, int> input, BlockingCollection<Info> output)
        {
            return Task.Factory.StartNew(() =>
            {
                try
                {
                    // Enumerating the pairs avoids a second lookup per key.
                    foreach (var pair in input)
                    {
                        var info = new Info { Word = pair.Key, Count = pair.Value };
                        output.Add(info);
                        ColoredConsole.WriteLine($"stage 4:added{info}");
                    }
                }
                finally
                {
                    output.CompleteAdding();
                }
            }, TaskCreationOptions.LongRunning);
        }

        /// <summary>
        /// Stage 5: assigns a color to every item from <paramref name="input"/>
        /// ("Red" above 40 occurrences, "Yellow" above 20, otherwise "Green")
        /// and forwards it to <paramref name="output"/>.
        /// </summary>
        /// <param name="input">Items without a color.</param>
        /// <param name="output">Receives the colored items; completed when the stage ends.</param>
        /// <returns>The task running the stage.</returns>
        public static Task AddColorAsync(BlockingCollection<Info> input, BlockingCollection<Info> output)
        {
            return Task.Factory.StartNew(() =>
            {
                try
                {
                    foreach (var item in input.GetConsumingEnumerable())
                    {
                        if (item.Count > 40)
                        {
                            item.Color = "Red";
                        }
                        else if (item.Count > 20)
                        {
                            item.Color = "Yellow";
                        }
                        else
                        {
                            item.Color = "Green";
                        }

                        output.Add(item);
                        ColoredConsole.WriteLine($"stage 5:added color {item.Color} to {item}");
                    }
                }
                finally
                {
                    output.CompleteAdding();
                }
            }, TaskCreationOptions.LongRunning);
        }

        /// <summary>
        /// Stage 6: prints every item from <paramref name="input"/> in its color.
        /// </summary>
        /// <param name="input">Colored items to display.</param>
        /// <returns>The task running the stage.</returns>
        public static Task ShowContentAsync(BlockingCollection<Info> input)
        {
            return Task.Factory.StartNew(() =>
            {
                foreach (var item in input.GetConsumingEnumerable())
                {
                    ColoredConsole.WriteLine($"stage 6:{item}", item.Color);
                }
            }, TaskCreationOptions.LongRunning);
        }
    }

    /// <summary>
    /// Console output helper that serializes writes with a lock so that
    /// messages (and color changes) from different stages do not interleave.
    /// </summary>
    static class ColoredConsole
    {
        private static readonly object syncOutput = new object();

        /// <summary>Writes <paramref name="message"/> as one line.</summary>
        public static void WriteLine(string message)
        {
            lock (syncOutput)
            {
                Console.WriteLine(message);
            }
        }

        /// <summary>
        /// Writes <paramref name="message"/> as one line in the given color.
        /// </summary>
        /// <param name="message">Text to write.</param>
        /// <param name="color">Name of a <see cref="ConsoleColor"/> member, e.g. "Red".</param>
        public static void WriteLine(string message, string color)
        {
            // Parse before taking the lock; an invalid name throws as before.
            var foreground = (ConsoleColor)Enum.Parse(typeof(ConsoleColor), color);

            lock (syncOutput)
            {
                Console.ForegroundColor = foreground;
                try
                {
                    Console.WriteLine(message);
                }
                finally
                {
                    Console.ResetColor();
                }
            }
        }
    }
}