Membangun Hi-Load TCP/IP Server

April 22, 2006 at 9:14 am | Posted in Code Samples | 3 Comments

by: Jaimy Azle

Fitur TCP/IP dalam delphi dibagi dalam dua aspek, Client dan Server, masing-masing mensupport mode koneksi non-blocked, blocked, dan thread-blocking.

  • Modus koneksi non-blocked sangat berguna untuk membuat aplikasi client yang bersifat multitasking, GUI based karena dengan menggunakan modus koneksi non-blocked, aplikasi bisa melakukan hal-hal lain selama proses menerima output yang dikirimkan oleh server. Contoh atas non-blocking connection yang paling jelas terlihat adalah pada aplikasi mail client seperti outlook, ataupun outlook-express dimana user dapat tetap melakukan interaksi dengan aplikasi sementara secara background aplikasi melakukan retrieve email dari server.
  • Modus koneksi blocked biasanya digunakan pada aplikasi console based, baik dos-box ataupun console linux, dimana user harus menunggu selama proses transaksi data antara client dengan server. Kendati dengan keterbatasan tersebut, modus Blocked menawarkan implementasi konektifitas secara mudah dan paling terstruktur menurut algoritma pemrograman konvensional.
  • Modus Thread-Blocking dalam konsepnya sebenarnya sama halnya dengan modus blocked biasa, hanya saja setiap koneksi dihandle melalui thread. Dengan demikian server mampu menangani lebih dari satu koneksi. Modus ini umumnya digunakan di sisi server karena cukup mudah diimplementasikan daripada non-blocked.

Arsitektur Spesifik sebuah High-Load Server

Pada umumnya Threaded-Blocking, cukup untuk digunakan dalam keperluan membangun sebuah server yang mampu menangani multiple user. Namun berbicara tentang sebuah sistem yang mampu menangani banyak user, maka topiknya akan menjadi berbeda. Implementasi Threaded-Blocking akan membuat server mengalokasikan satu thread setiap satu koneksi, ini berarti jika terdapat 100 user maka server akan membuat 100 thread. Hal ini tentunya sangat tidak efektif, terutama bila dikaitkan dengan implementasi proses pada sebuah hi-load sistem umumnya pendek-pendek untuk memungkinkannya mensupport banyak user dengan cara

|– Client Side –|—— Server Side——-|
connect-request-proses-output-disconnect
.

Dengan model transaksi demikian, proses membuat thread untuk masing-masing koneksi kemudian men-destroy-nya ketika koneksi user terputus akan menjadi kurang efisien, dan memakan cukup banyak resource server. akan lebih baik jika membuat sejumlah thread sebagai worker thread secara persisten dan setiap proses akan handle oleh thread-thread tersebut.

Umumnya developer tidak mau ambil pusing untuk melakukan redesign secara low-level seperti ini, melainkan memanfaatkan implementasi server yang memang dirancang untuk kebutuhan sebagai hi-load server, Application Server umumnya menggunakan Apache ataupun IIS sebagai backbone server karena memang HTTPd umumnya dirancang sebagai hi-load server agar mampu mendukung banyak user. Namun HTTPd menggunakan basis text sebagai media komunikasinya, hal ini memungkinkan penurunan performa karena keperluan untuk rutin konversi dan parsing yang cukup banyak.

Windows NT (2000, XP, dst) secara default telah menyediakan Input/Output Completion Port yang dapat digunakan untuk mewujudkan implementasi hi-load server. Dua fungsi WINAPI utama yang digunakan untuk memanfaatkan IOCP adalah GetQueuedCompletionStatus, dan PostQueuedCompletionStatus.

Namun demikian, implementasinya akan menyebabkan sisi client-side harus dimodifikasi agar mampu berkomunikasi dengan server yang memanfaatkan IOCP secara benar. Hal ini dikarenakan IOCP di sisi Server akan memecah sebuah paket besar ke dalam packet-packet yang lebih kecil pada saat dikirimkan ke client.

Permasalahannya, Component TCP/IP bawaan Delphi tidak mensupport IOCP, karenanya harus di treat sedemikian rupa agar ke dalam modus blocked dan secara manual dihandle oleh worker thread sehingga dapat digambarkan sebagai berikut:

Contoh berikut adalah bagaimana memanfaatkan TTcpServer (Sockets.pas) untuk mengimplementasikan sebuah IOCP server. Ide atas implementasi ini saya dapatkan dari codeproject: Developing a Truly Scalable Winsock Server using IO Completion Ports

unit IOCPServerSocket;
interface

uses
  SysUtils, Classes, SyncObjs, Windows, Sockets, WinSock;

type
  EIOCPError = Exception;
  TCustomIOCPServer = class;

TIOCPClientSocketThread = class(TThread)
  private
    FIOCPServer: TCustomIOCPServer;
    FClientSocket: TCustomIpClient;
  protected
    procedure Execute; override;
  public
    constructor Create(AIOCPServer: TCustomIOCPServer);
    destructor Destroy; override;
  end;

TIOCPServerSocketThread = class(TThread)
  private
    FIOCPServer: TCustomIOCPServer;
    FSocket: TCustomTcpServer;
  protected
		procedure Execute; override;
  public
    constructor Create(AIOCPServer: TCustomIOCPServer);
    destructor Destroy; override;
  end;

TIOCPThreadStatusType =
    (iocpActual, iocpCurrent, iocpClient);
  TIOCPThreadStatusRec = packed record
    CurThread: integer;
    ActThread: integer;
    ClientThread: integer;
  end;

TErrorEvent = procedure(AE: Exception) of object;
  TUpdateEvent = procedure(IOCPThreadStatus: TIOCPThreadStatusRec)
                    of object;
  TProcessEvent = procedure(ASock: TCustomIpClient) of object;

TCustomIOCPServer = class(TComponent)
  private
    FServerSocketThread: TIOCPServerSocketThread;
    FErrorException: EIOCPError;
    FIOCPHandle: THandle;
    FLocalHost: TSocketHost;
    FLocalPort: TSocketPort;
    FThreadPool: TList;
    FThreadStatus: TIOCPThreadStatusRec;
    FMaxThreadsInPool: integer;
    FMinThreadsInPool: integer;
    FTerminateEvent: TSimpleEvent;
		FSvcTerminateEvent: TSimpleEvent;
    FCriticalSection: TCriticalSection;
    FIdleTimeOut: integer;
    FOnErrorEvent: TErrorEvent;
    FOnUpdateEvent: TUpdateEvent;
    FOnProcessEvent: TProcessEvent;
    function GetActive: Boolean;
    procedure SetActive(const Value: Boolean);
    procedure SetLocalHost(const Value: TSocketHost);
    procedure SetLocalPort(const Value: TSocketPort);

procedure IncThreadStatus(AType: TIOCPThreadStatusType);
    procedure DecThreadStatus(AType: TIOCPThreadStatusType);
    procedure SetMaxThreadsInPool(const Value: integer);
    procedure SetMinThreadsInPool(const Value: integer);
    procedure SetIdleTimeOut(const Value: integer);

procedure SyncErros;
  protected
    procedure EnsureClosed;
  public
    constructor Create(AOwner: TComponent); override;
    destructor Destroy; override;

procedure WaitForTerminate;

procedure Open; virtual;
    procedure Close; virtual;
    procedure SyncUpdate;

property Active: Boolean
      read GetActive write SetActive;
    property IdleTimeOut: integer
      read FIdleTimeOut write SetIdleTimeOut;
    property LocalHost: TSocketHost
      read FLocalHost write SetLocalHost;
    property LocalPort: TSocketPort
      read FLocalPort write SetLocalPort;
    property MaxThreadsInPool: integer
      read FMaxThreadsInPool write SetMaxThreadsInPool;
		property MinThreadsInPool: integer
      read FMinThreadsInPool write SetMinThreadsInPool;

property OnUpdate: TUpdateEvent
      read FOnUpdateEvent write FOnUpdateEvent;
    property OnProcess: TProcessEvent
      read FOnProcessEvent write FOnProcessEvent;
    property OnError: TErrorEvent
      read FOnErrorEvent write FOnErrorEvent;
  end;

TIOCPServer = class(TCustomIOCPServer)
  published
    property Active;
    property IdleTimeOut;
    property LocalHost;
    property LocalPort;
    property MaxThreadsInPool;
    property MinThreadsInPool;

property OnUpdate;
    property OnProcess;
    property OnError;
  end;

implementation

const
	TERMINATE_THREAD = $7fffffff;

resourcestring
  ERRIOCPCreation =
   'Could not initialize IOCP connection';

{ TIOCPClientSocketThread }

constructor TIOCPClientSocketThread.Create(AIOCPServer: TCustomIOCPServer);
begin
  inherited Create(False);
  FreeOnTerminate := True;
  FIOCPServer := AIOCPServer;
end;

destructor TIOCPClientSocketThread.Destroy;
begin
  inherited Destroy;
end;

procedure TIOCPClientSocketThread.Execute;
var
  Transfered: dword;
  AClientSocket: TSocket;
  OverlappedPtr: POverlapped;
  Thread: TThread;
  i: integer;
begin
  with FIOCPServer do
  begin
    while not Terminated do
    begin
      if not GetQueuedCompletionStatus(FIOCPHandle, Transfered,
        Cardinal(AClientSocket), OverlappedPtr, FIdleTimeOut + 500) then
      begin
        if GetLastError = WAIT_TIMEOUT then
        begin
          FCriticalSection.Enter;
          try
            if (FThreadStatus.CurThread > FMinThreadsInPool) then
              Terminate;
          finally
            FCriticalSection.Leave;
          end;
        end else
        begin
          if (AClientSocket = TERMINATE_THREAD) then
          begin
  	    Terminate;
  	    Break;
  	  end;
          IncThreadStatus(iocpActual);
          DecThreadStatus(iocpClient);
  	  Synchronize(SyncUpdate);
          FCriticalSection.Enter;
          try
  	    if (FThreadStatus.CurThread < FMaxThreadsInPool) then
            begin
  	      if (FThreadStatus.ActThread = FThreadStatus.CurThread) then
              begin
  	        Thread := TIOCPClientSocketThread.Create(FIOCPServer);
                FThreadPool.Add(Thread);
                IncThreadStatus(iocpCurrent);
  		Synchronize(SyncUpdate);
  	      end;
            end;
  	  finally
            FCriticalSection.Leave;
  	  end;
          FClientSocket := TCustomIpClient.Create(nil);
          with FClientSocket do
          try
            try
              FServerSocketThread.FSocket.Accept(FClientSocket);
              if Assigned(FOnProcessEvent) then
                FOnProcessEvent(FClientSocket);
            except
              on E:Exception do
                begin
                  FCriticalSection.Enter;
                  try
  	            FErrorException := e;
  		  finally
  		    FCriticalSection.Leave;
  		  end;
                  Synchronize(SyncErros);
                end;
            end;
          finally
            DecThreadStatus(iocpActual);
			      Close;
            Synchronize(SyncUpdate);
            FreeAndNil(FClientSocket);
          end;
        end;
      end;
    end;
    FCriticalSection.Enter;
    try
      i := FThreadPool.IndexOf(Self);
      if i >= 0 then
        FThreadPool.Delete(i);
      DecThreadStatus(iocpCurrent);
      Synchronize(SyncUpdate);
      if FThreadStatus.CurThread = 0 then
        FTerminateEvent.SetEvent;
    finally
      FCriticalSection.Leave;
    end;
  end;
end;

{ TIOCPServerSocketThread }

constructor TIOCPServerSocketThread.Create(AIOCPServer: TCustomIOCPServer);
begin
  inherited Create(False);
  FreeOnTerminate := True;
  FIOCPServer := AIOCPServer;
  FSocket := TCustomTcpServer.Create(nil);
  FSocket.BlockMode := bmBlocking;
end;

destructor TIOCPServerSocketThread.Destroy;
begin
  FSocket.Free;
  inherited Destroy;
end;

procedure TIOCPServerSocketThread.Execute;
var
  ASocket: TSocket;
  AClientSocket: TCustomIpClient;
  i: integer;
begin
  with FSocket do
  begin
    LocalHost := FIOCPServer.LocalHost;
    LocalPort := FIOCPServer.LocalPort;
    Open;
    repeat
      if WaitForData(5000) then
      begin
        AClientSocket := TCustomIpClient.Create(nil);
        try
          Accept(AClientSocket);
          ASocket := AClientSocket.Handle;
        finally
          AClientSocket.Free;
        end;
        if ASocket <> INVALID_SOCKET then
        begin
          FIOCPServer.IncThreadStatus(iocpCurrent);
          PostQueuedCompletionStatus(FIOCPServer.FIOCPHandle, 0,
            Cardinal(ASocket), nil);
        end;
      end;
    until Terminated;
    Close;
    FIOCPServer.FCriticalSection.Enter;
    try
      for i := (FIOCPServer.FThreadPool.Count-1) downto 0 do
        PostQueuedCompletionStatus(FIOCPServer.FIOCPHandle, 0,
          Cardinal(TERMINATE_THREAD), nil);
    finally
      FIOCPServer.FCriticalSection.Leave;
    end;
    WaitForSingleObject(FIOCPServer.FTerminateEvent.Handle, INFINITE);
  end;
end;

{ TCustomIOCPServer }

procedure TCustomIOCPServer.Close;
begin
  try
    FServerSocketThread.FSocket.Close;
    FServerSocketThread.Terminate;
    FServerSocketThread.WaitFor;
  finally
    CloseHandle(FIOCPHandle);
    FSvcTerminateEvent.SetEvent;
  end;
end;

constructor TCustomIOCPServer.Create(AOwner: TComponent);
begin
  inherited Create(AOwner);
  FIOCPHandle := 0;
  FMinThreadsInPool := 4;
  FMaxThreadsInPool := 10;
  FCriticalSection  := TCriticalSection.Create;
  FTerminateEvent   := TSimpleEvent.Create;
  FSvcTerminateEvent:= TSimpleEvent.Create;
  FThreadPool       := TList.Create;
end;

procedure TCustomIOCPServer.DecThreadStatus(AType: TIOCPThreadStatusType);
begin
  case AType of
    iocpActual: InterlockedDecrement(FThreadStatus.ActThread);
    iocpCurrent: InterlockedDecrement(FThreadStatus.CurThread);
    iocpClient: InterlockedDecrement(FThreadStatus.ClientThread);
  end;
end;

destructor TCustomIOCPServer.Destroy;
begin
  FThreadPool.Free;
  FSvcTerminateEvent.Free;
  FTerminateEvent.Free;
  FCriticalSection.Free;
  inherited Destroy;
end;

procedure TCustomIOCPServer.EnsureClosed;
begin
  if Active then
    Close;
end;

function TCustomIOCPServer.GetActive: Boolean;
begin
  Result := (FIOCPHandle <> 0);
end;

procedure TCustomIOCPServer.IncThreadStatus(AType: TIOCPThreadStatusType);
begin
  case AType of
    iocpActual: InterlockedIncrement(FThreadStatus.ActThread);
    iocpCurrent: InterlockedIncrement(FThreadStatus.CurThread);
    iocpClient: InterlockedIncrement(FThreadStatus.ClientThread);
  end;
end;

procedure TCustomIOCPServer.Open;
var
  AClientThread: TIOCPClientSocketThread;
  ASystemInfo: TSystemInfo;
  i: integer;
begin
  if not (csLoading in ComponentState) and
     not (csDesigning in ComponentState) then
  begin
    EnsureClosed;
    GetSystemInfo(ASystemInfo);
    FIOCPHandle := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0,
      ASystemInfo.dwNumberOfProcessors);
    if (FIOCPHandle = 0) then
	  	raise EIOCPError.Create(ERRIOCPCreation);
    try
      for i := 0 to FMinThreadsInPool-1 do
      begin
        AClientThread := TIOCPClientSocketThread.Create(Self);
        FThreadPool.Add(AClientThread);
        IncThreadStatus(iocpCurrent);
      end;
      FServerSocketThread := TIOCPServerSocketThread.Create(Self);
    except
      EnsureClosed;
      raise;
    end;
  end;
end;

procedure TCustomIOCPServer.SetActive(const Value: Boolean);
begin
  if (Value <> Active) and
     (not (csLoading in ComponentState) and
      not (csDesigning in ComponentState)) then
  begin
    if Value then Open else Close;
  end;
end;

procedure TCustomIOCPServer.SetIdleTimeOut(const Value: integer);
begin
  EnsureClosed;
  FIdleTimeOut := Value;
end;

procedure TCustomIOCPServer.SetLocalHost(const Value: TSocketHost);
begin
  FLocalHost := Value;
end;

procedure TCustomIOCPServer.SetLocalPort(const Value: TSocketPort);
begin
  FLocalPort := Value;
end;

procedure TCustomIOCPServer.SetMaxThreadsInPool(const Value: integer);
begin
  EnsureClosed;
  FMaxThreadsInPool := Value;
end;

procedure TCustomIOCPServer.SetMinThreadsInPool(const Value: integer);
begin
  EnsureClosed;
  FMinThreadsInPool := Value;
end;

procedure TCustomIOCPServer.SyncErros;
begin
  if Assigned(FOnErrorEvent) then
  	FOnErrorEvent(FErrorException);
end;

procedure TCustomIOCPServer.SyncUpdate;
begin
  if Assigned(FOnUpdateEvent) then
    FOnUpdateEvent(FThreadStatus);
end;

procedure TCustomIOCPServer.WaitForTerminate;
begin
  FSvcTerminateEvent.WaitFor(INFINITE)
end;

end.

3 Comments »

RSS feed for comments on this post. TrackBack URI

  1. Innovative Product Introduced at World

    Innovative Product Introduced at World Tea ExpoDrinks Media Wire (Communiqués de presse),France-Jun 7, 2007Tea Temp is designed to make brewing

  2. Sellwood pet shop gears up

    Dogs are welcome inside Sellwood Dog Supply; an indoor “watering hole” and a prominent front door sign make that clear.

  3. Keren, sekalian saya ada pertanyaan yg mengganjal dari dulu, kebetulan saya sendiri sedang membangun aplikasi model YM, di mana untuk komunikasi di dalam chat room, setiap pesan (baik berupa teks, gambar, suara atau video) yang dikirim seorang user ke room pastinya akan di broadcast ke semua user oleh server socket, pertanyaannya adalah : mekanisme apa yang baik untuk di pakai, saya mengenal mekanisme broadcast dan multicast di mana option socket ini hanya berlaku untuk udp sokcet , yang tentunya reability nya tidak bisa di andalkan walaupun untuk masalah kebetuhan realtime, pemilihan UDP socket lebih tepat. tapi dengan TCP socket, apakah server harus membroadcast pesan secara manual (iterasi loop) satu per satu ke tiap user, di mana akan ada user yang akan menerima data di urutan terakhir. mohon pencerahannya. oya untuk info tambahan, saya membangun component socket sendiri menggunakan header winsock API ver 2.2


Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

Create a free website or blog at WordPress.com.
Entries and comments feeds.

%d bloggers like this: