All Articles

C#

Sliding Stream

Sliding Stream

Buffer, Offset, Count 형태로 읽고 쓸 수 있으며, 요청한 길이(Count)만큼 데이터가 쌓였을 때에만 읽기가 성공하는 스트림 자료구조

 

구현 이유

- NetCoreServer를 이용해서 OnReceived할 때, 읽는 속도보다 쌓이는 속도가 빨라서, Receive 버퍼에 현재 데이터만 쌓이는 게 아니라 다음 데이터까지 읽게 되는 문제가 생김

- Send할 때 길이를 지정해 주고, Receive 쪽에서는 SlidingStream을 이용해 해당 길이만큼 읽을 수 있을 때에만 읽게 함

 

특징

- ArraySegment로 Write할때는 빠르게 Write하게함

- Read단에서 읽을 수 있는지 판단하여 반환하도록함

- Read가 가능할 때, 읽어서 반환

- Read가 불가능할 때, 스트림에 다시 Write하고 0 반환

- Write는 들어온 데이터 그대로 ArraySegment 형식으로 저장

 

TODO:

- Length가 ArraySegment의 개수가 아니라 전체 바이트 수(count)를 세서 반환해야 함

 

// Reference from:
// https://stackoverflow.com/questions/8221136/fifo-queue-buffer-specialising-in-byte-streams
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

namespace SettingNetwork.Util
{
    /// <summary>
    /// In-memory FIFO byte stream: <see cref="Write"/> appends bytes to the tail and
    /// <see cref="Read"/> consumes bytes from the head.
    /// </summary>
    /// <remarks>
    /// Reads are all-or-nothing. <see cref="Read"/> returns exactly <c>count</c> bytes when at
    /// least that many are buffered; otherwise it returns 0 and leaves the buffered data
    /// untouched so the caller can retry after more data has been written.
    /// All operations that touch the buffered data are serialized by a single lock.
    /// </remarks>
    public class SlidingStream : Stream
    {
        #region Other stream member implementations
        public override bool CanRead => true;
        public override bool CanSeek => false;
        public override bool CanWrite => true;

        /// <summary>Total number of bytes currently buffered and not yet read.</summary>
        public override long Length
        {
            get
            {
                lock (_syncRoot)
                {
                    return _bufferedByteCount;
                }
            }
        }

        /// <summary>Always reads as 0; the stream is not seekable, so setting it throws.</summary>
        public override long Position
        {
            get => 0;
            set => throw new NotImplementedException();
        }

        /// <summary>
        /// Discards every buffered byte.
        /// </summary>
        /// <remarks>
        /// NOTE(review): this is not the usual "push data to the backing store" meaning of
        /// Flush; the discard behavior is kept because existing callers may rely on it.
        /// </remarks>
        public override void Flush()
        {
            lock (_syncRoot)
            {
                _pendingSegments.Clear();
                _bufferedByteCount = 0;
            }
        }

        /// <summary>Not supported; always throws.</summary>
        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotImplementedException();
        }

        /// <summary>Not supported; always throws.</summary>
        public override void SetLength(long value)
        {
            throw new NotImplementedException();
        }

        #endregion Other stream member implementations

        /// <summary>Creates an empty stream.</summary>
        public SlidingStream()
        {
        }

        // Single gate for every access to the segment list and the byte counter.
        // LinkedList<T> is not safe for concurrent mutation, so readers and writers
        // must exclude each other, not just themselves.
        private readonly object _syncRoot = new object();

        // Buffered data in FIFO order. Every segment wraps an array owned by this
        // stream (Write copies its input), so segments can be re-sliced without copying.
        private readonly LinkedList<ArraySegment<byte>> _pendingSegments = new LinkedList<ArraySegment<byte>>();

        // Sum of Count over all pending segments; kept in step with the list under _syncRoot.
        private long _bufferedByteCount;

        /// <summary>
        /// Stored for callers that set or query it; defaults to -1. Read never blocks,
        /// so the value is not consulted by this class.
        /// </summary>
        public override int ReadTimeout { get; set; } = -1;

        /// <summary>
        /// Reads exactly <paramref name="count"/> bytes into <paramref name="buffer"/> starting
        /// at <paramref name="offset"/>, if that many bytes are buffered.
        /// </summary>
        /// <param name="buffer">Destination array.</param>
        /// <param name="offset">Index in <paramref name="buffer"/> at which to start storing bytes.</param>
        /// <param name="count">Number of bytes required.</param>
        /// <returns>
        /// <paramref name="count"/> when the read succeeded; 0 when fewer than
        /// <paramref name="count"/> bytes are buffered (nothing is consumed and
        /// <paramref name="buffer"/> is left unmodified) or when <paramref name="count"/> is 0.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
        /// <exception cref="ArgumentException">The range does not fit inside <paramref name="buffer"/>.</exception>
        public override int Read(byte[] buffer, int offset, int count)
        {
            CheckRange(buffer, offset, count);

            lock (_syncRoot)
            {
                // All-or-nothing: decide up front so a failed read never disturbs the queue.
                if (count == 0 || _bufferedByteCount < count)
                {
                    return 0;
                }

                int copied = 0;
                while (copied < count)
                {
                    ArraySegment<byte> segment = _pendingSegments.First.Value;
                    _pendingSegments.RemoveFirst();

                    int take = Math.Min(segment.Count, count - copied);
                    Buffer.BlockCopy(segment.Array, segment.Offset, buffer, offset + copied, take);
                    copied += take;

                    if (take < segment.Count)
                    {
                        // Partially consumed: put the unread tail back at the head by
                        // re-slicing the same array instead of allocating a new copy.
                        _pendingSegments.AddFirst(
                            new ArraySegment<byte>(segment.Array, segment.Offset + take, segment.Count - take));
                    }
                }

                _bufferedByteCount -= copied;
                return copied;
            }
        }

        /// <summary>
        /// Appends a private copy of <paramref name="count"/> bytes from <paramref name="buffer"/>,
        /// starting at <paramref name="offset"/>, to the tail of the stream.
        /// </summary>
        /// <param name="buffer">Source array; it is copied, so the caller may reuse it afterwards.</param>
        /// <param name="offset">Index in <paramref name="buffer"/> of the first byte to append.</param>
        /// <param name="count">Number of bytes to append.</param>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
        /// <exception cref="ArgumentException">The range does not fit inside <paramref name="buffer"/>.</exception>
        public override void Write(byte[] buffer, int offset, int count)
        {
            CheckRange(buffer, offset, count);

            if (count == 0)
            {
                // Nothing to store; avoid queuing an empty segment.
                return;
            }

            // Copy outside the lock: it only touches the caller's buffer and a fresh array.
            byte[] copy = new byte[count];
            Buffer.BlockCopy(buffer, offset, copy, 0, count);

            lock (_syncRoot)
            {
                _pendingSegments.AddLast(new ArraySegment<byte>(copy));
                _bufferedByteCount += count;
            }
        }

        /// <summary>Validates a (buffer, offset, count) triple the way Stream implementations conventionally do.</summary>
        private static void CheckRange(byte[] buffer, int offset, int count)
        {
            if (buffer == null)
            {
                throw new ArgumentNullException(nameof(buffer));
            }

            if (offset < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(offset));
            }

            if (count < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(count));
            }

            if (buffer.Length - offset < count)
            {
                throw new ArgumentException("offset and count exceed the bounds of buffer.");
            }
        }

    }
}