Newer
Older
PrismSoftware / packages / cpprestsdk.v141.2.10.12.1 / build / native / include / cpprest / astreambuf.h
/***
 * Copyright (C) Microsoft. All rights reserved.
 * Licensed under the MIT license. See LICENSE.txt file in the project root for full license information.
 *
 * =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
 *
 * Asynchronous I/O: stream buffer. This is an extension to the PPL concurrency features and therefore
 * lives in the Concurrency namespace.
 *
 * For the latest on this and related APIs, please see: https://github.com/Microsoft/cpprestsdk
 *
 * =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
 ****/
#pragma once

#include "cpprest/asyncrt_utils.h"
#include "cpprest/details/basic_types.h"
#include "pplx/pplxtasks.h"
#include <atomic>
#include <cstring>
#include <ios>
#include <math.h>
#include <memory>

#if (defined(_MSC_VER) && (_MSC_VER >= 1800)) && !CPPREST_FORCE_PPLX
namespace Concurrency // since namespace pplx = Concurrency
#else
namespace pplx
#endif
{
namespace details
{
template<class F, class T = bool>
pplx::task<T> _do_while(F func)
{
    pplx::task<T> first = func();
    return first.then([=](bool guard) -> pplx::task<T> {
        if (guard)
            return pplx::details::_do_while<F, T>(func);
        else
            return first;
    });
}
} // namespace details
}

namespace Concurrency
{
/// Library for asynchronous streams.
namespace streams
{
/// <summary>
/// Extending the standard char_traits type with one that adds values and types
/// that are unique to "C++ REST SDK" streams.
/// </summary>
/// <typeparam name="_CharType">
/// The data type of the basic element of the stream.
/// </typeparam>
template<typename _CharType>
struct char_traits : std::char_traits<_CharType>
{
    /// <summary>
    /// Some synchronous functions will return this value if the operation
    /// requires an asynchronous call in a given situation.
    /// </summary>
    /// <returns>An <c>int_type</c> value which implies that an asynchronous call is required.</returns>
    static typename std::char_traits<_CharType>::int_type requires_async()
    {
        return std::char_traits<_CharType>::eof() - 1;
    }
};
#if !defined(_WIN32)
template<>
struct char_traits<unsigned char> : private std::char_traits<char>
{
public:
    typedef unsigned char char_type;

    using std::char_traits<char>::eof;
    using std::char_traits<char>::int_type;
    using std::char_traits<char>::off_type;
    using std::char_traits<char>::pos_type;

    static size_t length(const unsigned char* str)
    {
        return std::char_traits<char>::length(reinterpret_cast<const char*>(str));
    }

    static void assign(unsigned char& left, const unsigned char& right) { left = right; }
    static unsigned char* assign(unsigned char* left, size_t n, unsigned char value)
    {
        return reinterpret_cast<unsigned char*>(
            std::char_traits<char>::assign(reinterpret_cast<char*>(left), n, static_cast<char>(value)));
    }

    static unsigned char* copy(unsigned char* left, const unsigned char* right, size_t n)
    {
        return reinterpret_cast<unsigned char*>(
            std::char_traits<char>::copy(reinterpret_cast<char*>(left), reinterpret_cast<const char*>(right), n));
    }

    static unsigned char* move(unsigned char* left, const unsigned char* right, size_t n)
    {
        return reinterpret_cast<unsigned char*>(
            std::char_traits<char>::move(reinterpret_cast<char*>(left), reinterpret_cast<const char*>(right), n));
    }

    static int_type requires_async() { return eof() - 1; }
};
#endif

namespace details
{
/// <summary>
/// Stream buffer base class.
/// </summary>
template<typename _CharType>
class basic_streambuf
{
public:
    typedef _CharType char_type;
    typedef ::concurrency::streams::char_traits<_CharType> traits;

    typedef typename traits::int_type int_type;
    typedef typename traits::pos_type pos_type;
    typedef typename traits::off_type off_type;

    /// <summary>
    /// Virtual constructor for stream buffers.
    /// </summary>
    virtual ~basic_streambuf() {}

    /// <summary>
    /// <c>can_read</c> is used to determine whether a stream buffer will support read operations (get).
    /// </summary>
    virtual bool can_read() const = 0;

    /// <summary>
    /// <c>can_write</c> is used to determine whether a stream buffer will support write operations (put).
    /// </summary>
    virtual bool can_write() const = 0;

    /// <summary>
    /// <c>can_seek<c/> is used to determine whether a stream buffer supports seeking.
    /// </summary>
    virtual bool can_seek() const = 0;

    /// <summary>
    /// <c>has_size<c/> is used to determine whether a stream buffer supports size().
    /// </summary>
    virtual bool has_size() const = 0;

    /// <summary>
    /// <c>is_eof</c> is used to determine whether a read head has reached the end of the buffer.
    /// </summary>
    virtual bool is_eof() const = 0;

    /// <summary>
    /// Gets the stream buffer size, if one has been set.
    /// </summary>
    /// <param name="direction">The direction of buffering (in or out)</param>
    /// <returns>The size of the internal buffer (for the given direction).</returns>
    /// <remarks>An implementation that does not support buffering will always return 0.</remarks>
    virtual size_t buffer_size(std::ios_base::openmode direction = std::ios_base::in) const = 0;

    /// <summary>
    /// Sets the stream buffer implementation to buffer or not buffer.
    /// </summary>
    /// <param name="size">The size to use for internal buffering, 0 if no buffering should be done.</param>
    /// <param name="direction">The direction of buffering (in or out)</param>
    /// <remarks>An implementation that does not support buffering will silently ignore calls to this function and it
    /// will not have any effect on what is returned by subsequent calls to <see cref="::buffer_size method"
    /// />.</remarks>
    virtual void set_buffer_size(size_t size, std::ios_base::openmode direction = std::ios_base::in) = 0;

    /// <summary>
    /// For any input stream, <c>in_avail</c> returns the number of characters that are immediately available
    /// to be consumed without blocking. May be used in conjunction with <cref="::sbumpc method"/> to read data without
    /// incurring the overhead of using tasks.
    /// </summary>
    virtual size_t in_avail() const = 0;

    /// <summary>
    /// Checks if the stream buffer is open.
    /// </summary>
    /// <remarks>No separation is made between open for reading and open for writing.</remarks>
    virtual bool is_open() const = 0;

    /// <summary>
    /// Closes the stream buffer, preventing further read or write operations.
    /// </summary>
    /// <param name="mode">The I/O mode (in or out) to close for.</param>
    virtual pplx::task<void> close(std::ios_base::openmode mode = (std::ios_base::in | std::ios_base::out)) = 0;

    /// <summary>
    /// Closes the stream buffer with an exception.
    /// </summary>
    /// <param name="mode">The I/O mode (in or out) to close for.</param>
    /// <param name="eptr">Pointer to the exception.</param>
    virtual pplx::task<void> close(std::ios_base::openmode mode, std::exception_ptr eptr) = 0;

    /// <summary>
    /// Writes a single character to the stream.
    /// </summary>
    /// <param name="ch">The character to write</param>
    /// <returns>A <c>task</c> that holds the value of the character. This value is EOF if the write operation
    /// fails.</returns>
    virtual pplx::task<int_type> putc(_CharType ch) = 0;

    /// <summary>
    /// Writes a number of characters to the stream.
    /// </summary>
    /// <param name="ptr">A pointer to the block of data to be written.</param>
    /// <param name="count">The number of characters to write.</param>
    /// <returns>A <c>task</c> that holds the number of characters actually written, either 'count' or 0.</returns>
    virtual pplx::task<size_t> putn(const _CharType* ptr, size_t count) = 0;

    /// <summary>
    /// Writes a number of characters to the stream. Note: callers must make sure the data to be written is valid until
    /// the returned task completes.
    /// </summary>
    /// <param name="ptr">A pointer to the block of data to be written.</param>
    /// <param name="count">The number of characters to write.</param>
    /// <returns>A <c>task</c> that holds the number of characters actually written, either 'count' or 0.</returns>
    virtual pplx::task<size_t> putn_nocopy(const _CharType* ptr, size_t count) = 0;

    /// <summary>
    /// Reads a single character from the stream and advances the read position.
    /// </summary>
    /// <returns>A <c>task</c> that holds the value of the character. This value is EOF if the read fails.</returns>
    virtual pplx::task<int_type> bumpc() = 0;

    /// <summary>
    /// Reads a single character from the stream and advances the read position.
    /// </summary>
    /// <returns>The value of the character. <c>-1</c> if the read fails. <c>-2</c> if an asynchronous read is
    /// required</returns> <remarks>This is a synchronous operation, but is guaranteed to never block.</remarks>
    virtual int_type sbumpc() = 0;

    /// <summary>
    /// Reads a single character from the stream without advancing the read position.
    /// </summary>
    /// <returns>A <c>task</c> that holds the value of the byte. This value is EOF if the read fails.</returns>
    virtual pplx::task<int_type> getc() = 0;

    /// <summary>
    /// Reads a single character from the stream without advancing the read position.
    /// </summary>
    /// <returns>The value of the character. EOF if the read fails. <see cref="::requires_async method" /> if an
    /// asynchronous read is required</returns> <remarks>This is a synchronous operation, but is guaranteed to never
    /// block.</remarks>
    virtual int_type sgetc() = 0;

    /// <summary>
    /// Advances the read position, then returns the next character without advancing again.
    /// </summary>
    /// <returns>A <c>task</c> that holds the value of the character. This value is EOF if the read fails.</returns>
    virtual pplx::task<int_type> nextc() = 0;

    /// <summary>
    /// Retreats the read position, then returns the current character without advancing.
    /// </summary>
    /// <returns>A <c>task</c> that holds the value of the character. This value is EOF if the read fails,
    /// <c>requires_async</c> if an asynchronous read is required</returns>
    virtual pplx::task<int_type> ungetc() = 0;

    /// <summary>
    /// Reads up to a given number of characters from the stream.
    /// </summary>
    /// <param name="ptr">The address of the target memory area.</param>
    /// <param name="count">The maximum number of characters to read.</param>
    /// <returns>A <c>task</c> that holds the number of characters read. This value is O if the end of the stream is
    /// reached.</returns>
    virtual pplx::task<size_t> getn(_Out_writes_(count) _CharType* ptr, _In_ size_t count) = 0;

    /// <summary>
    /// Copies up to a given number of characters from the stream, synchronously.
    /// </summary>
    /// <param name="ptr">The address of the target memory area.</param>
    /// <param name="count">The maximum number of characters to read.</param>
    /// <returns>The number of characters copied. O if the end of the stream is reached or an asynchronous read is
    /// required.</returns> <remarks>This is a synchronous operation, but is guaranteed to never block.</remarks>
    virtual size_t scopy(_Out_writes_(count) _CharType* ptr, _In_ size_t count) = 0;

    /// <summary>
    /// Gets the current read or write position in the stream.
    /// </summary>
    /// <param name="direction">The I/O direction to seek (see remarks)</param>
    /// <returns>The current position. EOF if the operation fails.</returns>
    /// <remarks>Some streams may have separate write and read cursors.
    ///          For such streams, the direction parameter defines whether to move the read or the write
    ///          cursor.</remarks>
    virtual pos_type getpos(std::ios_base::openmode direction) const = 0;

    /// <summary>
    /// Gets the size of the stream, if known. Calls to <c>has_size</c> will determine whether
    /// the result of <c>size</c> can be relied on.
    /// </summary>
    virtual utility::size64_t size() const = 0;

    /// <summary>
    /// Seeks to the given position.
    /// </summary>
    /// <param name="pos">The offset from the beginning of the stream.</param>
    /// <param name="direction">The I/O direction to seek (see remarks).</param>
    /// <returns>The position. EOF if the operation fails.</returns>
    /// <remarks>Some streams may have separate write and read cursors. For such streams, the direction parameter
    /// defines whether to move the read or the write cursor.</remarks>
    virtual pos_type seekpos(pos_type pos, std::ios_base::openmode direction) = 0;

    /// <summary>
    /// Seeks to a position given by a relative offset.
    /// </summary>
    /// <param name="offset">The relative position to seek to</param>
    /// <param name="way">The starting point (beginning, end, current) for the seek.</param>
    /// <param name="mode">The I/O direction to seek (see remarks)</param>
    /// <returns>The position. EOF if the operation fails.</returns>
    /// <remarks>Some streams may have separate write and read cursors.
    ///          For such streams, the mode parameter defines whether to move the read or the write cursor.</remarks>
    virtual pos_type seekoff(off_type offset, std::ios_base::seekdir way, std::ios_base::openmode mode) = 0;

    /// <summary>
    /// For output streams, flush any internally buffered data to the underlying medium.
    /// </summary>
    /// <returns>A <c>task</c> that returns <c>true</c> if the sync succeeds, <c>false</c> if not.</returns>
    virtual pplx::task<void> sync() = 0;

    //
    // Efficient read and write.
    //
    // The following routines are intended to be used for more efficient, copy-free, reading and
    // writing of data from/to the stream. Rather than having the caller provide a buffer into which
    // data is written or from which it is read, the stream buffer provides a pointer directly to the
    // internal data blocks that it is using. Since not all stream buffers use internal data structures
    // to copy data, the functions may not be supported by all. An application that wishes to use this
    // functionality should therefore first try them and check for failure to support. If there is
    // such failure, the application should fall back on the copying interfaces (putn / getn)
    //

    /// <summary>
    /// Allocates a contiguous memory block and returns it.
    /// </summary>
    /// <param name="count">The number of characters to allocate.</param>
    /// <returns>A pointer to a block to write to, null if the stream buffer implementation does not support
    /// alloc/commit.</returns>
    virtual _CharType* alloc(_In_ size_t count) = 0;

    /// <summary>
    /// Submits a block already allocated by the stream buffer.
    /// </summary>
    /// <param name="count">The number of characters to be committed.</param>
    virtual void commit(_In_ size_t count) = 0;

    /// <summary>
    /// Gets a pointer to the next already allocated contiguous block of data.
    /// </summary>
    /// <param name="ptr">A reference to a pointer variable that will hold the address of the block on success.</param>
    /// <param name="count">The number of contiguous characters available at the address in 'ptr'.</param>
    /// <returns><c>true</c> if the operation succeeded, <c>false</c> otherwise.</returns>
    /// <remarks>
    /// A return of false does not necessarily indicate that a subsequent read operation would fail, only that
    /// there is no block to return immediately or that the stream buffer does not support the operation.
    /// The stream buffer may not de-allocate the block until <see cref="::release method" /> is called.
    /// If the end of the stream is reached, the function will return <c>true</c>, a null pointer, and a count of zero;
    /// a subsequent read will not succeed.
    /// </remarks>
    virtual bool acquire(_Out_ _CharType*& ptr, _Out_ size_t& count) = 0;

    /// <summary>
    /// Releases a block of data acquired using <see cref="::acquire method"/>. This frees the stream buffer to
    /// de-allocate the memory, if it so desires. Move the read position ahead by the count.
    /// </summary>
    /// <param name="ptr">A pointer to the block of data to be released.</param>
    /// <param name="count">The number of characters that were read.</param>
    virtual void release(_Out_writes_(count) _CharType* ptr, _In_ size_t count) = 0;

    /// <summary>
    /// Retrieves the stream buffer exception_ptr if it has been set.
    /// </summary>
    /// <returns>Pointer to the exception, if it has been set; otherwise, <c>nullptr</c> will be returned</returns>
    virtual std::exception_ptr exception() const = 0;
};

template<typename _CharType>
class streambuf_state_manager : public basic_streambuf<_CharType>,
                                public std::enable_shared_from_this<streambuf_state_manager<_CharType>>
{
public:
    typedef typename details::basic_streambuf<_CharType>::traits traits;
    typedef typename details::basic_streambuf<_CharType>::int_type int_type;
    typedef typename details::basic_streambuf<_CharType>::pos_type pos_type;
    typedef typename details::basic_streambuf<_CharType>::off_type off_type;

    /// <summary>
    /// <c>can_read</c> is used to determine whether a stream buffer will support read operations (get).
    /// </summary>
    virtual bool can_read() const { return m_stream_can_read; }

    /// <summary>
    /// <c>can_write</c> is used to determine whether a stream buffer will support write operations (put).
    /// </summary>
    virtual bool can_write() const { return m_stream_can_write; }

    /// <summary>
    /// Checks if the stream buffer is open.
    /// </summary>
    /// <remarks>No separation is made between open for reading and open for writing.</remarks>
    virtual bool is_open() const { return can_read() || can_write(); }

    /// <summary>
    /// Closes the stream buffer, preventing further read or write operations.
    /// </summary>
    /// <param name="mode">The I/O mode (in or out) to close for.</param>
    virtual pplx::task<void> close(std::ios_base::openmode mode = std::ios_base::in | std::ios_base::out)
    {
        pplx::task<void> closeOp = pplx::task_from_result();

        if (mode & std::ios_base::in && can_read())
        {
            closeOp = _close_read();
        }

        // After the flush_internal task completed, "this" object may have been destroyed,
        // accessing the members is invalid, use shared_from_this to avoid access violation exception.
        auto this_ptr = std::static_pointer_cast<streambuf_state_manager>(this->shared_from_this());

        if (mode & std::ios_base::out && can_write())
        {
            if (closeOp.is_done())
                closeOp = closeOp && _close_write().then([this_ptr] {}); // passing down exceptions from closeOp
            else
                closeOp = closeOp.then([this_ptr] { return this_ptr->_close_write().then([this_ptr] {}); });
        }

        return closeOp;
    }

    /// <summary>
    /// Closes the stream buffer with an exception.
    /// </summary>
    /// <param name="mode">The I/O mode (in or out) to close for.</param>
    /// <param name="eptr">Pointer to the exception.</param>
    virtual pplx::task<void> close(std::ios_base::openmode mode, std::exception_ptr eptr)
    {
        if (m_currentException == nullptr) m_currentException = eptr;
        return close(mode);
    }

    /// <summary>
    /// <c>is_eof</c> is used to determine whether a read head has reached the end of the buffer.
    /// </summary>
    virtual bool is_eof() const { return m_stream_read_eof; }

    /// <summary>
    /// Writes a single character to the stream.
    /// </summary>
    /// <param name="ch">The character to write</param>
    /// <returns>The value of the character. EOF if the write operation fails</returns>
    virtual pplx::task<int_type> putc(_CharType ch)
    {
        if (!can_write()) return create_exception_checked_value_task<int_type>(traits::eof());

        return create_exception_checked_task<int_type>(_putc(ch), [](int_type) {
            return false; // no EOF for write
        });
    }

    /// <summary>
    /// Writes a number of characters to the stream.
    /// </summary>
    /// <param name="ptr">A pointer to the block of data to be written.</param>
    /// <param name="count">The number of characters to write.</param>
    /// <returns>The number of characters actually written, either 'count' or 0.</returns>
    CASABLANCA_DEPRECATED("This API in some cases performs a copy. It is deprecated and will be removed in a future "
                          "release. Use putn_nocopy instead.")
    virtual pplx::task<size_t> putn(const _CharType* ptr, size_t count)
    {
        if (!can_write()) return create_exception_checked_value_task<size_t>(0);
        if (count == 0) return pplx::task_from_result<size_t>(0);

        return create_exception_checked_task<size_t>(_putn(ptr, count, true), [](size_t) {
            return false; // no EOF for write
        });
    }

    /// <summary>
    /// Writes a number of characters to the stream. Note: callers must make sure the data to be written is valid until
    /// the returned task completes.
    /// </summary>
    /// <param name="ptr">A pointer to the block of data to be written.</param>
    /// <param name="count">The number of characters to write.</param>
    /// <returns>A <c>task</c> that holds the number of characters actually written, either 'count' or 0.</returns>
    virtual pplx::task<size_t> putn_nocopy(const _CharType* ptr, size_t count)
    {
        if (!can_write()) return create_exception_checked_value_task<size_t>(0);
        if (count == 0) return pplx::task_from_result<size_t>(0);

        return create_exception_checked_task<size_t>(_putn(ptr, count), [](size_t) {
            return false; // no EOF for write
        });
    }

    /// <summary>
    /// Reads a single character from the stream and advances the read position.
    /// </summary>
    /// <returns>The value of the character. EOF if the read fails.</returns>
    virtual pplx::task<int_type> bumpc()
    {
        if (!can_read())
            return create_exception_checked_value_task<int_type>(streambuf_state_manager<_CharType>::traits::eof());

        return create_exception_checked_task<int_type>(
            _bumpc(), [](int_type val) { return val == streambuf_state_manager<_CharType>::traits::eof(); });
    }

    /// <summary>
    /// Reads a single character from the stream and advances the read position.
    /// </summary>
    /// <returns>The value of the character. <c>-1</c> if the read fails. <c>-2</c> if an asynchronous read is
    /// required</returns> <remarks>This is a synchronous operation, but is guaranteed to never block.</remarks>
    virtual int_type sbumpc()
    {
        if (!(m_currentException == nullptr)) std::rethrow_exception(m_currentException);
        if (!can_read()) return traits::eof();
        return check_sync_read_eof(_sbumpc());
    }

    /// <summary>
    /// Reads a single character from the stream without advancing the read position.
    /// </summary>
    /// <returns>The value of the byte. EOF if the read fails.</returns>
    virtual pplx::task<int_type> getc()
    {
        if (!can_read()) return create_exception_checked_value_task<int_type>(traits::eof());

        return create_exception_checked_task<int_type>(
            _getc(), [](int_type val) { return val == streambuf_state_manager<_CharType>::traits::eof(); });
    }

    /// <summary>
    /// Reads a single character from the stream without advancing the read position.
    /// </summary>
    /// <returns>The value of the character. EOF if the read fails. <see cref="::requires_async method" /> if an
    /// asynchronous read is required</returns> <remarks>This is a synchronous operation, but is guaranteed to never
    /// block.</remarks>
    virtual int_type sgetc()
    {
        if (!(m_currentException == nullptr)) std::rethrow_exception(m_currentException);
        if (!can_read()) return traits::eof();
        return check_sync_read_eof(_sgetc());
    }

    /// <summary>
    /// Advances the read position, then returns the next character without advancing again.
    /// </summary>
    /// <returns>The value of the character. EOF if the read fails.</returns>
    virtual pplx::task<int_type> nextc()
    {
        if (!can_read()) return create_exception_checked_value_task<int_type>(traits::eof());

        return create_exception_checked_task<int_type>(
            _nextc(), [](int_type val) { return val == streambuf_state_manager<_CharType>::traits::eof(); });
    }

    /// <summary>
    /// Retreats the read position, then returns the current character without advancing.
    /// </summary>
    /// <returns>The value of the character. EOF if the read fails. <see cref="::requires_async method" /> if an
    /// asynchronous read is required</returns>
    virtual pplx::task<int_type> ungetc()
    {
        if (!can_read()) return create_exception_checked_value_task<int_type>(traits::eof());

        return create_exception_checked_task<int_type>(_ungetc(), [](int_type) { return false; });
    }

    /// <summary>
    /// Reads up to a given number of characters from the stream.
    /// </summary>
    /// <param name="ptr">The address of the target memory area.</param>
    /// <param name="count">The maximum number of characters to read.</param>
    /// <returns>The number of characters read. O if the end of the stream is reached.</returns>
    virtual pplx::task<size_t> getn(_Out_writes_(count) _CharType* ptr, _In_ size_t count)
    {
        if (!can_read()) return create_exception_checked_value_task<size_t>(0);
        if (count == 0) return pplx::task_from_result<size_t>(0);

        return create_exception_checked_task<size_t>(_getn(ptr, count), [](size_t val) { return val == 0; });
    }

    /// <summary>
    /// Copies up to a given number of characters from the stream, synchronously.
    /// </summary>
    /// <param name="ptr">The address of the target memory area.</param>
    /// <param name="count">The maximum number of characters to read.</param>
    /// <returns>The number of characters copied. O if the end of the stream is reached or an asynchronous read is
    /// required.</returns> <remarks>This is a synchronous operation, but is guaranteed to never block.</remarks>
    virtual size_t scopy(_Out_writes_(count) _CharType* ptr, _In_ size_t count)
    {
        if (!(m_currentException == nullptr)) std::rethrow_exception(m_currentException);
        if (!can_read()) return 0;

        return _scopy(ptr, count);
    }

    /// <summary>
    /// For output streams, flush any internally buffered data to the underlying medium.
    /// </summary>
    /// <returns><c>true</c> if the flush succeeds, <c>false</c> if not</returns>
    virtual pplx::task<void> sync()
    {
        if (!can_write())
        {
            if (m_currentException == nullptr)
                return pplx::task_from_result();
            else
                return pplx::task_from_exception<void>(m_currentException);
        }
        return create_exception_checked_task<bool>(_sync(), [](bool) { return false; }).then([](bool) {});
    }

    /// <summary>
    /// Retrieves the stream buffer exception_ptr if it has been set.
    /// </summary>
    /// <returns>Pointer to the exception, if it has been set; otherwise, <c>nullptr</c> will be returned.</returns>
    virtual std::exception_ptr exception() const { return m_currentException; }

    /// <summary>
    /// Allocates a contiguous memory block and returns it.
    /// </summary>
    /// <param name="count">The number of characters to allocate.</param>
    /// <returns>A pointer to a block to write to, null if the stream buffer implementation does not support
    /// alloc/commit.</returns> <remarks>This is intended as an advanced API to be used only when it is important to
    /// avoid extra copies.</remarks>
    _CharType* alloc(size_t count)
    {
        if (m_alloced)
            throw std::logic_error(
                "The buffer is already allocated, this maybe caused by overlap of stream read or write");

        _CharType* alloc_result = _alloc(count);

        if (alloc_result) m_alloced = true;

        return alloc_result;
    }

    /// <summary>
    /// Submits a block already allocated by the stream buffer.
    /// </summary>
    /// <param name="count">The number of characters to be committed.</param>
    /// <remarks>This is intended as an advanced API to be used only when it is important to avoid extra
    /// copies.</remarks>
    void commit(size_t count)
    {
        if (!m_alloced) throw std::logic_error("The buffer needs to allocate first");

        _commit(count);
        m_alloced = false;
    }

public:
    virtual bool can_seek() const = 0;
    virtual bool has_size() const = 0;
    virtual utility::size64_t size() const { return 0; }
    virtual size_t buffer_size(std::ios_base::openmode direction = std::ios_base::in) const = 0;
    virtual void set_buffer_size(size_t size, std::ios_base::openmode direction = std::ios_base::in) = 0;
    virtual size_t in_avail() const = 0;
    virtual pos_type getpos(std::ios_base::openmode direction) const = 0;
    virtual pos_type seekpos(pos_type pos, std::ios_base::openmode direction) = 0;
    virtual pos_type seekoff(off_type offset, std::ios_base::seekdir way, std::ios_base::openmode mode) = 0;
    virtual bool acquire(_Out_writes_(count) _CharType*& ptr, _In_ size_t& count) = 0;
    virtual void release(_Out_writes_(count) _CharType* ptr, _In_ size_t count) = 0;

protected:
    virtual pplx::task<int_type> _putc(_CharType ch) = 0;

    // This API is only needed for file streams and until we remove the deprecated stream buffer putn overload.
    virtual pplx::task<size_t> _putn(const _CharType* ptr, size_t count, bool)
    {
        // Default to no copy, only the file streams API overloads and performs a copy.
        return _putn(ptr, count);
    }
    virtual pplx::task<size_t> _putn(const _CharType* ptr, size_t count) = 0;

    virtual pplx::task<int_type> _bumpc() = 0;
    virtual int_type _sbumpc() = 0;
    virtual pplx::task<int_type> _getc() = 0;
    virtual int_type _sgetc() = 0;
    virtual pplx::task<int_type> _nextc() = 0;
    virtual pplx::task<int_type> _ungetc() = 0;
    virtual pplx::task<size_t> _getn(_Out_writes_(count) _CharType* ptr, _In_ size_t count) = 0;
    virtual size_t _scopy(_Out_writes_(count) _CharType* ptr, _In_ size_t count) = 0;
    virtual pplx::task<bool> _sync() = 0;
    virtual _CharType* _alloc(size_t count) = 0;
    virtual void _commit(size_t count) = 0;

    /// <summary>
    /// The real read head close operation, implementation should override it if there is any resource to be released.
    /// </summary>
    virtual pplx::task<void> _close_read()
    {
        m_stream_can_read = false;
        return pplx::task_from_result();
    }

    /// <summary>
    /// The real write head close operation, implementation should override it if there is any resource to be released.
    /// </summary>
    virtual pplx::task<void> _close_write()
    {
        m_stream_can_write = false;
        return pplx::task_from_result();
    }

protected:
    streambuf_state_manager(std::ios_base::openmode mode)
    {
        m_stream_can_read = (mode & std::ios_base::in) != 0;
        m_stream_can_write = (mode & std::ios_base::out) != 0;
        m_stream_read_eof = false;
        m_alloced = false;
    }

    std::exception_ptr m_currentException;
    // The in/out mode for the buffer
    std::atomic<bool> m_stream_can_read;
    std::atomic<bool> m_stream_can_write;
    std::atomic<bool> m_stream_read_eof;
    std::atomic<bool> m_alloced;

private:
    template<typename _CharType1>
    pplx::task<_CharType1> create_exception_checked_value_task(const _CharType1& val) const
    {
        if (this->exception() == nullptr)
            return pplx::task_from_result<_CharType1>(static_cast<_CharType1>(val));
        else
            return pplx::task_from_exception<_CharType1>(this->exception());
    }

    // Set exception and eof states for async read
    template<typename _CharType1>
    pplx::task<_CharType1> create_exception_checked_task(pplx::task<_CharType1> result,
                                                         std::function<bool(_CharType1)> eof_test,
                                                         std::ios_base::openmode mode = std::ios_base::in |
                                                                                        std::ios_base::out)
    {
        auto thisPointer = this->shared_from_this();

        auto func1 = [=](pplx::task<_CharType1> t1) -> pplx::task<_CharType1> {
            try
            {
                thisPointer->m_stream_read_eof = eof_test(t1.get());
            }
            catch (...)
            {
                thisPointer->close(mode, std::current_exception()).get();
                return pplx::task_from_exception<_CharType1>(thisPointer->exception(), pplx::task_options());
            }
            if (thisPointer->m_stream_read_eof && !(thisPointer->exception() == nullptr))
                return pplx::task_from_exception<_CharType1>(thisPointer->exception(), pplx::task_options());
            return t1;
        };

        if (result.is_done())
        {
            // If the data is already available, we should avoid scheduling a continuation, so we do it inline.
            return func1(result);
        }
        else
        {
            return result.then(func1);
        }
    }

    // Set eof states for sync read
    int_type check_sync_read_eof(int_type ch)
    {
        m_stream_read_eof = ch == traits::eof();
        return ch;
    }
};

} // namespace details

// Forward declarations
template<typename _CharType>
class basic_istream;
template<typename _CharType>
class basic_ostream;

/// <summary>
/// Reference-counted stream buffer.
/// </summary>
/// <typeparam name="_CharType">
/// The data type of the basic element of the <c>streambuf.</c>
/// </typeparam>
/// <typeparam name="_CharType2">
/// The data type of the basic element of the <c>streambuf.</c>
/// </typeparam>
template<typename _CharType>
class streambuf : public details::basic_streambuf<_CharType>
{
public:
    typedef typename details::basic_streambuf<_CharType>::traits traits;
    typedef typename details::basic_streambuf<_CharType>::int_type int_type;
    typedef typename details::basic_streambuf<_CharType>::pos_type pos_type;
    typedef typename details::basic_streambuf<_CharType>::off_type off_type;
    typedef typename details::basic_streambuf<_CharType>::char_type char_type;

    template<typename _CharType2>
    friend class streambuf;

    /// <summary>
    /// Constructor.
    /// </summary>
    /// <param name="ptr">A pointer to the concrete stream buffer implementation.</param>
    streambuf(_In_ const std::shared_ptr<details::basic_streambuf<_CharType>>& ptr) : m_buffer(ptr) {}

    /// <summary>
    /// Default constructor.
    /// </summary>
    streambuf() {}

    /// <summary>
    /// Converter Constructor.
    /// </summary>
    /// <typeparam name="AlterCharType">
    /// The data type of the basic element of the source <c>streambuf</c>.
    /// </typeparam>
    /// <param name="other">The source buffer to be converted.</param>
    template<typename AlterCharType>
    streambuf(const streambuf<AlterCharType>& other)
        : m_buffer(std::static_pointer_cast<details::basic_streambuf<_CharType>>(
              std::static_pointer_cast<void>(other.m_buffer)))
    {
        static_assert(std::is_same<pos_type, typename details::basic_streambuf<AlterCharType>::pos_type>::value &&
                          std::is_same<off_type, typename details::basic_streambuf<AlterCharType>::off_type>::value &&
                          std::is_integral<_CharType>::value && std::is_integral<AlterCharType>::value &&
                          std::is_integral<int_type>::value &&
                          std::is_integral<typename details::basic_streambuf<AlterCharType>::int_type>::value &&
                          sizeof(_CharType) == sizeof(AlterCharType) &&
                          sizeof(int_type) == sizeof(typename details::basic_streambuf<AlterCharType>::int_type),
                      "incompatible stream character types");
    }

    /// <summary>
    /// Constructs an input stream head for this stream buffer.
    /// </summary>
    /// <returns><c>basic_istream</c>.</returns>
    concurrency::streams::basic_istream<_CharType> create_istream() const
    {
        if (!can_read()) throw std::runtime_error("stream buffer not set up for input of data");
        return concurrency::streams::basic_istream<_CharType>(*this);
    }

    /// <summary>
    /// Constructs an output stream for this stream buffer.
    /// </summary>
    /// <returns>basic_ostream</returns>
    concurrency::streams::basic_ostream<_CharType> create_ostream() const
    {
        if (!can_write()) throw std::runtime_error("stream buffer not set up for output of data");
        return concurrency::streams::basic_ostream<_CharType>(*this);
    }

    /// <summary>
    /// Checks if the stream buffer has been initialized or not.
    /// </summary>
    operator bool() const { return (bool)m_buffer; }

    /// <summary>
    /// Destructor
    /// </summary>
    virtual ~streambuf() {}

    const std::shared_ptr<details::basic_streambuf<_CharType>>& get_base() const
    {
        if (!m_buffer)
        {
            throw std::invalid_argument("Invalid streambuf object");
        }

        return m_buffer;
    }

    /// <summary>
    /// <c>can_read</c> is used to determine whether a stream buffer will support read operations (get).
    /// </summary>
    virtual bool can_read() const { return get_base()->can_read(); }

    /// <summary>
    /// <c>can_write</c> is used to determine whether a stream buffer will support write operations (put).
    /// </summary>
    virtual bool can_write() const { return get_base()->can_write(); }

    /// <summary>
    /// <c>can_seek</c> is used to determine whether a stream buffer supports seeking.
    /// </summary>
    /// <returns>True if seeking is supported, false otherwise.</returns>
    virtual bool can_seek() const { return get_base()->can_seek(); }

    /// <summary>
    /// <c>has_size</c> is used to determine whether a stream buffer supports size().
    /// </summary>
    /// <returns>True if the <c>size</c> API is supported, false otherwise.</returns>
    virtual bool has_size() const { return get_base()->has_size(); }

    /// <summary>
    /// Gets the total number of characters in the stream buffer, if known. Calls to <c>has_size</c> will determine
    /// whether the result of <c>size</c> can be relied on.
    /// </summary>
    /// <returns>The total number of characters in the stream buffer.</returns>
    virtual utility::size64_t size() const { return get_base()->size(); }

    /// <summary>
    /// Gets the stream buffer size, if one has been set.
    /// </summary>
    /// <param name="direction">The direction of buffering (in or out)</param>
    /// <returns>The size of the internal buffer (for the given direction).</returns>
    /// <remarks>An implementation that does not support buffering will always return 0.</remarks>
    virtual size_t buffer_size(std::ios_base::openmode direction = std::ios_base::in) const
    {
        return get_base()->buffer_size(direction);
    }

    /// <summary>
    /// Sets the stream buffer implementation to buffer or not buffer.
    /// </summary>
    /// <param name="size">The size to use for internal buffering, 0 if no buffering should be done.</param>
    /// <param name="direction">The direction of buffering (in or out)</param>
    /// <remarks>An implementation that does not support buffering will silently ignore calls to this function and it
    /// will not have any effect on what is returned by subsequent calls to <see cref="::buffer_size method"
    /// />.</remarks>
    virtual void set_buffer_size(size_t size, std::ios_base::openmode direction = std::ios_base::in)
    {
        get_base()->set_buffer_size(size, direction);
    }

    /// <summary>
    /// For any input stream, <c>in_avail</c> returns the number of characters that are immediately available
    /// to be consumed without blocking. May be used in conjunction with <cref="::sbumpc method"/> to read data without
    /// incurring the overhead of using tasks.
    /// </summary>
    /// <returns>Number of characters that are ready to read.</returns>
    virtual size_t in_avail() const { return get_base()->in_avail(); }

    /// <summary>
    /// Checks if the stream buffer is open.
    /// </summary>
    /// <remarks>No separation is made between open for reading and open for writing.</remarks>
    /// <returns>True if the stream buffer is open for reading or writing, false otherwise.</returns>
    virtual bool is_open() const { return get_base()->is_open(); }

    /// <summary>
    /// <c>is_eof</c> is used to determine whether a read head has reached the end of the buffer.
    /// </summary>
    /// <returns>True if at the end of the buffer, false otherwise.</returns>
    virtual bool is_eof() const { return get_base()->is_eof(); }

    /// <summary>
    /// Closes the stream buffer, preventing further read or write operations.
    /// </summary>
    /// <param name="mode">The I/O mode (in or out) to close for.</param>
    virtual pplx::task<void> close(std::ios_base::openmode mode = (std::ios_base::in | std::ios_base::out))
    {
        // We preserve the check here to workaround a Dev10 compiler crash
        auto buffer = get_base();
        return buffer ? buffer->close(mode) : pplx::task_from_result();
    }

    /// <summary>
    /// Closes the stream buffer with an exception.
    /// </summary>
    /// <param name="mode">The I/O mode (in or out) to close for.</param>
    /// <param name="eptr">Pointer to the exception.</param>
    virtual pplx::task<void> close(std::ios_base::openmode mode, std::exception_ptr eptr)
    {
        // We preserve the check here to workaround a Dev10 compiler crash
        auto buffer = get_base();
        return buffer ? buffer->close(mode, eptr) : pplx::task_from_result();
    }

    /// <summary>
    /// Writes a single character to the stream.
    /// </summary>
    /// <param name="ch">The character to write</param>
    /// <returns>The value of the character. EOF if the write operation fails</returns>
    virtual pplx::task<int_type> putc(_CharType ch) { return get_base()->putc(ch); }

    /// <summary>
    /// Allocates a contiguous memory block and returns it.
    /// </summary>
    /// <param name="count">The number of characters to allocate.</param>
    /// <returns>A pointer to a block to write to, null if the stream buffer implementation does not support
    /// alloc/commit.</returns>
    virtual _CharType* alloc(size_t count) { return get_base()->alloc(count); }

    /// <summary>
    /// Submits a block already allocated by the stream buffer.
    /// </summary>
    /// <param name="count">The number of characters to be committed.</param>
    virtual void commit(size_t count) { get_base()->commit(count); }

    /// <summary>
    /// Gets a pointer to the next already allocated contiguous block of data.
    /// </summary>
    /// <param name="ptr">A reference to a pointer variable that will hold the address of the block on success.</param>
    /// <param name="count">The number of contiguous characters available at the address in 'ptr'.</param>
    /// <returns><c>true</c> if the operation succeeded, <c>false</c> otherwise.</returns>
    /// <remarks>
    /// A return of false does not necessarily indicate that a subsequent read operation would fail, only that
    /// there is no block to return immediately or that the stream buffer does not support the operation.
    /// The stream buffer may not de-allocate the block until <see cref="::release method" /> is called.
    /// If the end of the stream is reached, the function will return <c>true</c>, a null pointer, and a count of zero;
    /// a subsequent read will not succeed.
    /// </remarks>
    virtual bool acquire(_Out_ _CharType*& ptr, _Out_ size_t& count)
    {
        ptr = nullptr;
        count = 0;
        return get_base()->acquire(ptr, count);
    }

    /// <summary>
    /// Releases a block of data acquired using <see cref="::acquire method"/>. This frees the stream buffer to
    /// de-allocate the memory, if it so desires. Move the read position ahead by the count.
    /// </summary>
    /// <param name="ptr">A pointer to the block of data to be released.</param>
    /// <param name="count">The number of characters that were read.</param>
    virtual void release(_Out_writes_(count) _CharType* ptr, _In_ size_t count) { get_base()->release(ptr, count); }

    /// <summary>
    /// Writes a number of characters to the stream.
    /// </summary>
    /// <param name="ptr">A pointer to the block of data to be written.</param>
    /// <param name="count">The number of characters to write.</param>
    /// <returns>The number of characters actually written, either 'count' or 0.</returns>
    CASABLANCA_DEPRECATED("This API in some cases performs a copy. It is deprecated and will be removed in a future "
                          "release. Use putn_nocopy instead.")
    virtual pplx::task<size_t> putn(const _CharType* ptr, size_t count) { return get_base()->putn(ptr, count); }

    /// <summary>
    /// Writes a number of characters to the stream. Note: callers must make sure the data to be written is valid until
    /// the returned task completes.
    /// </summary>
    /// <param name="ptr">A pointer to the block of data to be written.</param>
    /// <param name="count">The number of characters to write.</param>
    /// <returns>The number of characters actually written, either 'count' or 0.</returns>
    virtual pplx::task<size_t> putn_nocopy(const _CharType* ptr, size_t count)
    {
        return get_base()->putn_nocopy(ptr, count);
    }

    /// <summary>
    /// Reads a single character from the stream and advances the read position.
    /// </summary>
    /// <returns>The value of the character. EOF if the read fails.</returns>
    virtual pplx::task<int_type> bumpc() { return get_base()->bumpc(); }

    /// <summary>
    /// Reads a single character from the stream and advances the read position.
    /// </summary>
    /// <returns>The value of the character. <c>-1</c> if the read fails. <c>-2</c> if an asynchronous read is
    /// required</returns> <remarks>This is a synchronous operation, but is guaranteed to never block.</remarks>
    virtual typename details::basic_streambuf<_CharType>::int_type sbumpc() { return get_base()->sbumpc(); }

    /// <summary>
    /// Reads a single character from the stream without advancing the read position.
    /// </summary>
    /// <returns>The value of the byte. EOF if the read fails.</returns>
    virtual pplx::task<int_type> getc() { return get_base()->getc(); }

    /// <summary>
    /// Reads a single character from the stream without advancing the read position.
    /// </summary>
    /// <returns>The value of the character. EOF if the read fails. <see cref="::requires_async method" /> if an
    /// asynchronous read is required</returns> <remarks>This is a synchronous operation, but is guaranteed to never
    /// block.</remarks>
    virtual typename details::basic_streambuf<_CharType>::int_type sgetc() { return get_base()->sgetc(); }

    /// <summary>
    /// Advances the read position, then returns the next character without advancing again.
    /// </summary>
    /// <returns>The value of the character. EOF if the read fails.</returns>
    pplx::task<int_type> nextc() { return get_base()->nextc(); }

    /// <summary>
    /// Retreats the read position, then returns the current character without advancing.
    /// </summary>
    /// <returns>The value of the character. EOF if the read fails. <see cref="::requires_async method" /> if an
    /// asynchronous read is required</returns>
    pplx::task<int_type> ungetc() { return get_base()->ungetc(); }

    /// <summary>
    /// Reads up to a given number of characters from the stream.
    /// </summary>
    /// <param name="ptr">The address of the target memory area.</param>
    /// <param name="count">The maximum number of characters to read.</param>
    /// <returns>The number of characters read. O if the end of the stream is reached.</returns>
    virtual pplx::task<size_t> getn(_Out_writes_(count) _CharType* ptr, _In_ size_t count)
    {
        return get_base()->getn(ptr, count);
    }

    /// <summary>
    /// Copies up to a given number of characters from the stream, synchronously.
    /// </summary>
    /// <param name="ptr">The address of the target memory area.</param>
    /// <param name="count">The maximum number of characters to read.</param>
    /// <returns>The number of characters copied. O if the end of the stream is reached or an asynchronous read is
    /// required.</returns> <remarks>This is a synchronous operation, but is guaranteed to never block.</remarks>
    virtual size_t scopy(_Out_writes_(count) _CharType* ptr, _In_ size_t count)
    {
        return get_base()->scopy(ptr, count);
    }

    /// <summary>
    /// Gets the current read or write position in the stream.
    /// </summary>
    /// <param name="direction">The I/O direction to seek (see remarks)</param>
    /// <returns>The current position. EOF if the operation fails.</returns>
    /// <remarks>Some streams may have separate write and read cursors.
    ///          For such streams, the direction parameter defines whether to move the read or the write
    ///          cursor.</remarks>
    virtual typename details::basic_streambuf<_CharType>::pos_type getpos(std::ios_base::openmode direction) const
    {
        return get_base()->getpos(direction);
    }

    /// <summary>
    /// Seeks to the given position.
    /// </summary>
    /// <param name="pos">The offset from the beginning of the stream.</param>
    /// <param name="direction">The I/O direction to seek (see remarks).</param>
    /// <returns>The position. EOF if the operation fails.</returns>
    /// <remarks>Some streams may have separate write and read cursors. For such streams, the direction parameter
    /// defines whether to move the read or the write cursor.</remarks>
    virtual typename details::basic_streambuf<_CharType>::pos_type seekpos(
        typename details::basic_streambuf<_CharType>::pos_type pos, std::ios_base::openmode direction)
    {
        return get_base()->seekpos(pos, direction);
    }

    /// <summary>
    /// Seeks to a position given by a relative offset.
    /// </summary>
    /// <param name="offset">The relative position to seek to</param>
    /// <param name="way">The starting point (beginning, end, current) for the seek.</param>
    /// <param name="mode">The I/O direction to seek (see remarks)</param>
    /// <returns>The position. EOF if the operation fails.</returns>
    /// <remarks>Some streams may have separate write and read cursors.
    ///          For such streams, the mode parameter defines whether to move the read or the write cursor.</remarks>
    virtual typename details::basic_streambuf<_CharType>::pos_type seekoff(
        typename details::basic_streambuf<_CharType>::off_type offset,
        std::ios_base::seekdir way,
        std::ios_base::openmode mode)
    {
        return get_base()->seekoff(offset, way, mode);
    }

    /// <summary>
    /// For output streams, flush any internally buffered data to the underlying medium.
    /// </summary>
    /// <returns><c>true</c> if the flush succeeds, <c>false</c> if not</returns>
    virtual pplx::task<void> sync() { return get_base()->sync(); }

    /// <summary>
    /// Retrieves the stream buffer exception_ptr if it has been set.
    /// </summary>
    /// <returns>Pointer to the exception, if it has been set; otherwise, <c>nullptr</c> will be returned</returns>
    virtual std::exception_ptr exception() const { return get_base()->exception(); }

private:
    std::shared_ptr<details::basic_streambuf<_CharType>> m_buffer;
};

} // namespace streams
} // namespace Concurrency