
Importing and de-duplicating lines from a huge number of text files with C# and Redis

  •  1
  •  dsp_099  ·  Tech Community  ·  9 years ago

    This is a bit of a mess, and it's been a while since I've used C#, so bear with me:

    I'm running a JRuby script to iterate over 900 files (5MB-1500MB each) to figure out how many duplicates still exist among these (already uniq'd) files. I had no luck with awk.

    My latest idea was to insert them into a local MongoDB instance, something like:

    db.collection('hashes').update({ _id: hash }, { $inc: { count: 1 } }, { upsert: true })

    db.collection('hashes').find({ count: { $gt: 1 } })

    That works, except that as of this writing it has been running for over 24 hours and I'm up to 72,532,927 Mongo entries and counting.

    .each_line is bottlenecking IO hardcore:

    [screenshot of IO activity while the script runs]

    So what I'm thinking now is to compile a C# program that spins up one thread PER FILE and inserts each line (an md5 hash) into a Redis list.

    From there, I could have another compiled C# program simply pop the values, dropping any whose count is 1. (A rough sketch of this idea follows.)
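
    A minimal sketch of that pipeline, assuming the StackExchange.Redis client and a Redis server on localhost; the key name "line-counts" is illustrative, not from the original post. Instead of a raw list plus a separate popping pass, it counts each hash with HINCRBY, so duplicates fall out of a single scan:

    using System;
    using System.IO;
    using System.Linq;
    using System.Threading.Tasks;
    using StackExchange.Redis;

    class RedisLineCounter
    {
        static void Main(string[] args)
        {
            var redis = ConnectionMultiplexer.Connect("localhost");
            var db = redis.GetDatabase();

            // One task per file; every line is already an md5 hash.
            Task.WaitAll(args.Select(path => Task.Run(() => {
                foreach (var line in File.ReadLines(path))
                    db.HashIncrement("line-counts", line); // HINCRBY line-counts <md5> 1
            })).ToArray());

            // Any field whose count exceeds 1 is a duplicate.
            foreach (var entry in db.HashScan("line-counts"))
                if ((long)entry.Value > 1)
                    Console.WriteLine("{0} seen {1} times", entry.Name, entry.Value);
        }
    }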

    1. Would reading the files with a compiled file reader and multiple threads significantly improve performance?

    2. Is Redis even necessary here? With plenty of RAM on AWS, couldn't the threads atomically populate some kind of in-memory list and work from there? (An in-memory sketch follows these questions.)
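
    On question 2: if the combined set of hashes fits in memory, a plain ConcurrentDictionary can stand in for Redis, since AddOrUpdate is atomic per key. A sketch under that fits-in-RAM assumption:

    using System;
    using System.Collections.Concurrent;
    using System.IO;
    using System.Linq;
    using System.Threading.Tasks;

    class InMemoryLineCounter
    {
        static void Main(string[] args)
        {
            var counts = new ConcurrentDictionary<string, int>();

            // Parallel.ForEach partitions the files across worker threads.
            Parallel.ForEach(args, path => {
                foreach (var line in File.ReadLines(path))
                    counts.AddOrUpdate(line, 1, (_, c) => c + 1); // atomic per key
            });

            foreach (var kv in counts.Where(kv => kv.Value > 1))
                Console.WriteLine("{0} seen {1} times", kv.Key, kv.Value);
        }
    }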

    Thanks in advance.

    2 Answers  |  up to 5 years ago
        1
  •  1
  •   Artavazd Balayan  ·  9 years ago

    Updated. New solution. Old solution. The main idea is to compute a cheap pseudo-hash for every line (the sum of all characters in the string) and store it in a Dictionary<ulong, List<LinePosition>> _hash2LinePositions. The same hash can occur more than once within one stream, which is handled by the list in the dictionary's value. When two hashes are equal, we read the actual strings back from the stream and compare them. LinePosition stores a line's position in the stream plus its length. I don't have files as big as yours, but my tests show it works. Here is the complete code:

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    public class Solution
    {
        struct LinePosition
        {
            public long Start;
            public long Length;
    
            public LinePosition(long start, long count)
            {
                Start = start;
                Length = count;
            }
            public override string ToString()
            {
                return string.Format("Start: {0}, Length: {1}", Start, Length);
            }
        }
        class TextFileHasher : IDisposable
        {
            readonly Dictionary<ulong, List<LinePosition>> _hash2LinePositions;
            readonly Stream _stream;
            bool _isDisposed;
    
            public HashSet<ulong> Hashes { get; private set; }
            public string Name { get; private set; }
            public TextFileHasher(string name, Stream stream)
            {
                Name = name;
                _stream = stream;
                _hash2LinePositions = new Dictionary<ulong, List<LinePosition>>();
                Hashes = new HashSet<ulong>();
            }
            public override string ToString()
            {
                return Name;
            }
            public void CalculateFileHash()
            {
                int readByte = -1;
                ulong dummyLineHash = 0;
                // Line start position in file
                long startPosition = 0;
                while ((readByte = _stream.ReadByte()) != -1) {
                    // Read until new line
                    if (readByte == '\r' || readByte == '\n') {
                        // If there was data
                        if (dummyLineHash != 0) {
                            // Add line hash and line position to the dict
                            AddToDictAndHash(dummyLineHash, startPosition, _stream.Position - 1 - startPosition);
                            // Reset line hash
                            dummyLineHash = 0;
                        }
                    }
                    else {
                        // Was it new line ?
                        if (dummyLineHash == 0)
                            startPosition = _stream.Position - 1;
                        // Calculate dummy hash
                        dummyLineHash += (uint)readByte;
                    }
                }
                if (dummyLineHash != 0) {
                    // Add line hash and line position to the dict
                    AddToDictAndHash(dummyLineHash, startPosition, _stream.Position - startPosition);
                    // Reset line hash
                    dummyLineHash = 0;
                }
            }
            public List<LinePosition> GetLinePositions(ulong hash)
            {
                return _hash2LinePositions[hash];
            }
            public List<string> GetDuplicates()
            {
                List<string> duplicates = new List<string>();
    
                foreach (var key in _hash2LinePositions.Keys) {
                    List<LinePosition> linesPos = _hash2LinePositions[key];
                    if (linesPos.Count > 1) {
                        duplicates.AddRange(FindExactDuplicates(linesPos));
                    }
                }
                return duplicates;
            }
            public void Dispose()
            {
                if (_isDisposed)
                    return;
    
                _stream.Dispose();
                _isDisposed = true;
            }
            private void AddToDictAndHash(ulong hash, long start, long count)
            {
                List<LinePosition> linesPosition;
                if (!_hash2LinePositions.TryGetValue(hash, out linesPosition)) {
                    linesPosition = new List<LinePosition>() { new LinePosition(start, count) };
                    _hash2LinePositions.Add(hash, linesPosition);
                }
                else {
                    linesPosition.Add(new LinePosition(start, count));
                }
                Hashes.Add(hash);
            }
            public byte[] GetLineAsByteArray(LinePosition prevPos)
            {
                long len = prevPos.Length;
                byte[] lineBytes = new byte[len];
                _stream.Seek(prevPos.Start, SeekOrigin.Begin);
                _stream.Read(lineBytes, 0, (int)len);
                return lineBytes;
            }
        private List<string> FindExactDuplicates(List<LinePosition> linesPos)
        {
            List<string> duplicates = new List<string>();
            linesPos.Sort((x, y) => x.Length.CompareTo(y.Length));

            // Compare every pair of equal-length lines. Comparing each line only
            // against the first line of its length group would miss duplicates
            // such as "abc", "abd", "abd".
            for (int i = 0; i < linesPos.Count - 1; i++) {
                if (linesPos[i + 1].Length != linesPos[i].Length)
                    continue; // no other line of this length follows
                var firstLineArray = GetLineAsByteArray(linesPos[i]);
                for (int j = i + 1; j < linesPos.Count && linesPos[j].Length == linesPos[i].Length; j++) {
                    var otherLineArray = GetLineAsByteArray(linesPos[j]);
                    if (firstLineArray.SequenceEqual(otherLineArray)) {
                        var line = System.Text.Encoding.Default.GetString(firstLineArray);
                        duplicates.Add(line);
                    }
                }
            }
            return duplicates;
        }
        }
        public static void Main(String[] args)
        {
            List<TextFileHasher> textFileHashers = new List<TextFileHasher>();
            string text1 = "abc\r\ncba\r\nabc";
            TextFileHasher tfh1 = new TextFileHasher("Text1", new MemoryStream(System.Text.Encoding.Default.GetBytes(text1)));
            tfh1.CalculateFileHash();
            textFileHashers.Add(tfh1);
    
            string text2 = "def\r\ncba\r\nwet";
            TextFileHasher tfh2 = new TextFileHasher("Text2", new MemoryStream(System.Text.Encoding.Default.GetBytes(text2)));
            tfh2.CalculateFileHash();
            textFileHashers.Add(tfh2);
    
            string text3 = "def\r\nbla\r\nwat";
            TextFileHasher tfh3 = new TextFileHasher("Text3", new MemoryStream(System.Text.Encoding.Default.GetBytes(text3)));
            tfh3.CalculateFileHash();
            textFileHashers.Add(tfh3);
    
            List<string> totalDuplicates = new List<string>();
    
            Dictionary<ulong, Dictionary<TextFileHasher, List<LinePosition>>> totalHashes = new Dictionary<ulong, Dictionary<TextFileHasher, List<LinePosition>>>();
            textFileHashers.ForEach(tfh => {
                foreach(var dummyHash in tfh.Hashes) {
                    Dictionary<TextFileHasher, List<LinePosition>> tfh2LinePositions = null;
                    if (!totalHashes.TryGetValue(dummyHash, out tfh2LinePositions))
                        totalHashes[dummyHash] = new Dictionary<TextFileHasher, List<LinePosition>>() { { tfh, tfh.GetLinePositions(dummyHash) } };
                    else {
                        List<LinePosition> linePositions = null;
                        if (!tfh2LinePositions.TryGetValue(tfh, out linePositions))
                            tfh2LinePositions[tfh] = tfh.GetLinePositions(dummyHash);
                        else
                            linePositions.AddRange(tfh.GetLinePositions(dummyHash));
                    }
                }
            });
    
            HashSet<TextFileHasher> alreadyGotDuplicates = new HashSet<TextFileHasher>();
    
            foreach(var hash in totalHashes.Keys) {
                var tfh2LinePositions = totalHashes[hash];
                var tfh = tfh2LinePositions.Keys.FirstOrDefault();
                // Get duplicates in the TextFileHasher itself
                if (tfh != null && !alreadyGotDuplicates.Contains(tfh)) {
                    totalDuplicates.AddRange(tfh.GetDuplicates());
                    alreadyGotDuplicates.Add(tfh);
                }
                if (tfh2LinePositions.Count <= 1) {
                    continue;
                }
                // Algo to get duplicates in more than 1 TextFileHashers
                var tfhs = tfh2LinePositions.Keys.ToArray();
                for (int i = 0; i < tfhs.Length; i++) {
                    var tfh1Positions = tfhs[i].GetLinePositions(hash);
                    for (int j = i + 1; j < tfhs.Length; j++) {
                        var tfh2Positions = tfhs[j].GetLinePositions(hash);
    
                        for (int k = 0; k < tfh1Positions.Count; k++) {
                            var tfh1Pos = tfh1Positions[k];
                            var tfh1ByteArray = tfhs[i].GetLineAsByteArray(tfh1Pos);
                            for (int m = 0; m < tfh2Positions.Count; m++) {
                                var tfh2Pos = tfh2Positions[m];
                                if (tfh1Pos.Length != tfh2Pos.Length)
                                    continue;
                                var tfh2ByteArray = tfhs[j].GetLineAsByteArray(tfh2Pos);
    
                                if (tfh1ByteArray.SequenceEqual(tfh2ByteArray)) {
                                    var line = System.Text.Encoding.Default.GetString(tfh1ByteArray);
                                    totalDuplicates.Add(line);
                                }
                            }
                        }
                    }
                }
            }
    
            Console.WriteLine();
            if (totalDuplicates.Count > 0) {
                Console.WriteLine("Total number of duplicates: {0}", totalDuplicates.Count);
                Console.WriteLine("#######################");
                totalDuplicates.ForEach(x => Console.WriteLine("{0}", x));
                Console.WriteLine("#######################");
            }
            // Free resources
            foreach (var tfh in textFileHashers)
                tfh.Dispose();
        }
    }
    
        2
  •  -1
  •   user2125311  ·  3 years ago

    If you have a lot of RAM... you're overthinking it...

    var fileLines = File.ReadAllLines(@"c:\File.csv").Distinct();
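
    If everything really does fit in RAM, the same idea extends to many files; a sketch with the directory path and file pattern as placeholders, this time keeping the lines that occur more than once rather than deduplicating:

    using System;
    using System.IO;
    using System.Linq;

    class AllInRamDuplicates
    {
        static void Main()
        {
            // Placeholders: point these at the real 900 files.
            var duplicates = Directory.EnumerateFiles(@"c:\data", "*.txt")
                .SelectMany(File.ReadLines)
                .GroupBy(line => line)
                .Where(g => g.Count() > 1)
                .Select(g => g.Key);

            foreach (var line in duplicates)
                Console.WriteLine(line);
        }
    }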