博客列表 >c#并发集合

c#并发集合

GODWEI的博客
GODWEI的博客原创
2018年06月17日 17:18:07885浏览
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.IO;
namespace HelloWorldApp
{
    class TestImmutable
    {
        public static async Task StartPiplelineAsync()
        {
            var filenames = new BlockingCollection<string>();
            var lines = new BlockingCollection<string>();
            var words = new ConcurrentDictionary<string, int>();
            var items = new BlockingCollection<Info>();
            var coloredItems =new BlockingCollection<Info>();
            Task t1 = PiplelineStages.ReadFilenamesAsync(@"../../..",filenames);
            ColoredConsole.WriteLine("start stage 1");
            Task t2 = PiplelineStages.LoadContentAsync(filenames,lines);
            ColoredConsole.WriteLine("start stage 2");
            Task t3 = PiplelineStages.ProcessContentAsync(lines,words);
            await Task.WhenAll(t1,t2,t3);
            ColoredConsole.WriteLine("stage 1,2,3 completed");
            
            Task t4 = PiplelineStages.TransferContentAsync(words, items);
            Task t5 = PiplelineStages.AddColorAsync(items,coloredItems);
            Task t6 = PiplelineStages.ShowContentAsync(coloredItems);
        }
    }
    class Info
    {
        public string Word { get; set; }
        public int Count { get; set; }
        public string Color { get; set; }
        public override string ToString() => $"{}times:{Word}";
    }
    static class PiplelineStages
    {
        public static Task ReadFilenamesAsync(string path,BlockingCollection<string> output)
        {
            return Task.Factory.StartNew(()=>
            {
                foreach(string filename in Directory.EnumerateFiles(path,"*.cs",SearchOption.AllDirectories))
                {
                    output.Add(filename);
                    ColoredConsole.WriteLine($"stage 1:added{filename}");
                }
                output.CompleteAdding();
            },TaskCreationOptions.LongRunning);
        }
        public static async Task LoadContentAsync(BlockingCollection<string> input,BlockingCollection<string> output)
        {
            //GetConsumingEnumerable()是必要的,读取的同时也在填充,该方法获得阻塞集合的枚举器
            foreach (var filename in input.GetConsumingEnumerable())
            {
                using(FileStream stream=File.OpenRead(filename))
                {
                    var reader = new StreamReader(stream);
                    string line = null;
                    while((line=await reader.ReadLineAsync())!=null)
                    {
                        output.Add(line);
                        ColoredConsole.WriteLine($"stage 2:added{line}");
                    }
                    //必须使用此方法,否则读取器会等待更多的项被添加
                    output.CompleteAdding();
                }
            }
        }
        public static Task ProcessContentAsync(BlockingCollection<string> input,ConcurrentDictionary<string,int> output)
        {
            return Task.Factory.StartNew(()=> {
                foreach(var line in input.GetConsumingEnumerable())
                {
                    string[] words = line.Split(' ', ';', '\t','{','}','(',')',':',',','"');
                    foreach(var word in words.Where(w=>!string.IsNullOrEmpty(w)))
                    {
                        output.AddOrUpdate(key:word,addValue:1,updateValueFactory:(s,i)=>++i);
                        ColoredConsole.WriteLine($"stage 3:added{word}");
                    }
                }
            },TaskCreationOptions.LongRunning);
        }
        public static Task TransferContentAsync(ConcurrentDictionary<string, int> input,BlockingCollection<Info> output)
        {
            return Task.Factory.StartNew(()=>
            {
                foreach (var word in input.Keys)
                {
                    int value;
                    if (input.TryGetValue(word,out value))
                    {
                        var info = new Info { Word=word,Count=value};
                        output.Add(info);
                        ColoredConsole.WriteLine($"stage 4:added{info}");
                    }
                }
                output.CompleteAdding();
            },TaskCreationOptions.LongRunning
            );
        }
        public static Task AddColorAsync(BlockingCollection<Info> input, BlockingCollection<Info> output)
        {
            return Task.Factory.StartNew(()=>
            {
                foreach (var item in input.GetConsumingEnumerable())
                {
                    if(item.Count>40)
                    {
                        item.Color = "Red";
                    }
                    else if(item.Count>20)
                    {
                        item.Color = "Yellow";
                    }
                    else
                    {
                        item.Color = "Green";
                    }
                    output.Add(item);
                    ColoredConsole.WriteLine($"stage 5:added color {item.Color} to {item}");
                }
                output.CompleteAdding();
            },TaskCreationOptions.LongRunning);
        }
        public static Task ShowContentAsync(BlockingCollection<Info> input)
        {
            return Task.Factory.StartNew(()=>
            {
                foreach(var item in input.GetConsumingEnumerable())
                {
                    ColoredConsole.WriteLine($"stage 6:{item}",item.Color);
                }
            },TaskCreationOptions.LongRunning);
        }
    }
    static class ColoredConsole
    {
        private static object syncOutput = new object();
        public static void WriteLine(string message)
        {
            lock(syncOutput)
            {
                Console.WriteLine(message);
            }
        }
        public static void WriteLine(string message,string color)
        {
            lock (syncOutput)
            {
                Console.ForegroundColor = (ConsoleColor)Enum.Parse(typeof(ConsoleColor),color);
                Console.WriteLine(message);
                Console.ResetColor();
            }
        }
    }
}

声明:本文内容转载自脚本之家,由网友自发贡献,版权归原作者所有,如您发现涉嫌抄袭侵权,请联系admin@php.cn 核实处理。
全部评论
文明上网理性发言,请遵守新闻评论服务协议