(*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *)

unit Thrift.Transport.STOMP;

interface

uses
  Classes,Windows, SysUtils,
  Thrift,
  Thrift.Transport,
  Thrift.Protocol,
  Thrift.Stream,
  StompClient,
  StompTypes;

type
  TStompTransportImpl = class( TStreamTransportImpl)
  strict private
    FData     : TStringStream;
    FServer   : string;
    FOutQueue : string;
    FStompCli : IStompClient;
  protected
    function GetIsOpen: Boolean; override;
    function Peek: Boolean; override;
  public
    constructor Create( const aServerAndPort, aOutQueue : string);
    destructor Destroy;  override;

    procedure Open();  override;
    procedure Close();  override;
    procedure Flush;  override;
  end;


  TStompServerTransportImpl = class( TServerTransportImpl)
  strict private
    FServer  : string;
    FInQueue : string;
    FClient  : IStompClient;
  protected
    procedure Listen; override;
    procedure Close; override;
    function Accept( const fnAccepting: TProc): ITransport; override;
  public
    constructor Create( const aServerAndPort, aInQueue : string);
    destructor Destroy;  override;
  end;


const
  QUEUE_PREFIX    = '/queue/';
  TOPIC_PREFIX    = '/topic/';
  EXCHANGE_PREFIX = '/exchange/';


implementation



constructor TStompTransportImpl.Create( const aServerAndPort, aOutQueue : string);
var adapter : IThriftStream;
begin
  FData     := TStringStream.Create;
  FServer   := aServerAndPort;
  FOutQueue := aOutQueue;

  adapter := TThriftStreamAdapterDelphi.Create( FData, FALSE);
  inherited Create( nil, adapter);  // output only
end;


destructor TStompTransportImpl.Destroy;
begin
  inherited Destroy;
  FreeAndNil( FData);
  FStompCli := nil;
end;


function TStompTransportImpl.GetIsOpen: Boolean;
begin
  result := (FStompCli <> nil);
end;


function TStompTransportImpl.Peek: Boolean;
begin
  result := FALSE;  // output only
end;


procedure TStompTransportImpl.Open;
begin
  if FStompCli <> nil
  then raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen, 'already open')
  else FStompCli := StompUtils.NewStomp( FServer);
end;


procedure TStompTransportImpl.Close;
begin
  FStompCli := nil;
  FData.Clear;
end;


procedure TStompTransportImpl.Flush;
begin
  if FStompCli = nil
  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'not open');

  FStompCli.Send( FOutQueue, FData.DataString);
  FData.Clear;
end;


//--- TStompServerTransportImpl --------------------------------------------


constructor TStompServerTransportImpl.Create( const aServerAndPort, aInQueue : string);
begin
  inherited Create;
  FServer  := aServerAndPort;
  FInQueue := aInQueue;
end;


destructor TStompServerTransportImpl.Destroy;
begin
  try
    Close;
  finally
    inherited Destroy;
  end;
end;


procedure TStompServerTransportImpl.Listen;
begin
  FClient := StompUtils.NewStomp(FServer);
  FClient.Subscribe( FInQueue);
end;


procedure TStompServerTransportImpl.Close;
begin
  if FClient <> nil then begin
    FClient.Unsubscribe( FInQueue);
    FClient := nil;
  end;
end;


function TStompServerTransportImpl.Accept( const fnAccepting: TProc): ITransport;
var frame   : IStompFrame;
    adapter : IThriftStream;
    stream  : TStringStream;
begin
  if FClient = nil
  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                         'Not connected.');

  if Assigned(fnAccepting)
  then fnAccepting();

  try
    frame := FClient.Receive(MAXINT);
    if frame = nil then Exit(nil);

    stream  := TStringStream.Create( frame.GetBody);
    adapter := TThriftStreamAdapterDelphi.Create( stream, TRUE);
    result  := TStreamTransportImpl.Create( adapter, nil);

  except
    on E: Exception
    do raise TTransportException.Create( E.ToString );
  end;
end;


end.