first commit
This commit is contained in:
207
Ultron.Proxy.Server/ClientConnectionManager.cs
Normal file
207
Ultron.Proxy.Server/ClientConnectionManager.cs
Normal file
@@ -0,0 +1,207 @@
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading.Tasks;
|
||||
using System.Threading.Tasks.Dataflow;
|
||||
using Ultron.Proxy.Models;
|
||||
using Ultron.Proxy.Utils;
|
||||
|
||||
namespace Ultron.Proxy
|
||||
{
|
||||
/// <summary>
|
||||
/// 反向连接处理类
|
||||
/// </summary>
|
||||
public class ClientConnectionManager
|
||||
{
|
||||
/// <summary>
|
||||
/// 当app增加时触发
|
||||
/// </summary>
|
||||
public event EventHandler<AppChangedEventArgs> AppTcpClientMapReverseConnected = delegate { };
|
||||
public event EventHandler<AppChangedEventArgs> AppTcpClientMapConfigConnected = delegate { };
|
||||
//public event EventHandler<AppChangedEventArgs> AppRemoved = delegate { };
|
||||
|
||||
//端口和app的映射关系,需定时清理
|
||||
public Dictionary<int, AppModel> PortAppMap = new Dictionary<int, AppModel>();
|
||||
|
||||
//app和代理客户端socket之间的映射关系
|
||||
public ConcurrentDictionary<ClientIDAppID, BufferBlock<TcpClient>> AppTcpClientMap = new ConcurrentDictionary<ClientIDAppID, BufferBlock<TcpClient>>();
|
||||
|
||||
//已注册的clientID,和appid之间的关系,appid序号=元素下标序号+1
|
||||
public Dictionary<int, List<ClientIDAppID>> RegisteredClient = new Dictionary<int, List<ClientIDAppID>>();
|
||||
|
||||
private ClientConnectionManager()
|
||||
{
|
||||
ServerHost.Logger.Debug("ClientManager initialized");
|
||||
Task.Run(ListenServiceClient);
|
||||
}
|
||||
|
||||
private object _lockObject = new Object();
|
||||
private object _lockObject2 = new Object();
|
||||
private Random _rand = new Random();
|
||||
private async Task ListenServiceClient()
|
||||
{
|
||||
//侦听,并且构造连接池
|
||||
ServerHost.Logger.Debug("Listening client on port " + ServerHost.ClientServicePort + "...");
|
||||
TcpListener listenter = new TcpListener(IPAddress.Any, ServerHost.ClientServicePort);
|
||||
listenter.Start(1000);
|
||||
while (true)
|
||||
{
|
||||
TcpClient incomeClient = await listenter.AcceptTcpClientAsync();
|
||||
ServerHost.Logger.Debug("已建立一个空连接");
|
||||
ProcessReverseRequest(incomeClient);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 处理反向连接请求
|
||||
/// </summary>
|
||||
/// <param name="incomeClient"></param>
|
||||
/// <returns></returns>
|
||||
private async Task ProcessReverseRequest(TcpClient incomeClient)
|
||||
{
|
||||
try
|
||||
{
|
||||
//读取头四个字节
|
||||
byte[] bytes = new byte[4];
|
||||
await incomeClient.GetStream().ReadAsync(bytes);
|
||||
|
||||
var clientIdAppId = GetAppFromBytes(bytes);
|
||||
ServerHost.Logger.Debug("已获取到消息ClientID:" + clientIdAppId.ClientID.ToString()
|
||||
+ "AppID:" + clientIdAppId.AppID.ToString()
|
||||
);
|
||||
//分配
|
||||
lock (_lockObject)
|
||||
{
|
||||
AppTcpClientMap.GetOrAdd(clientIdAppId, new BufferBlock<TcpClient>()).Post(incomeClient);
|
||||
}
|
||||
//var arg = new AppChangedEventArgs();
|
||||
//arg.App = clientIdAppId;
|
||||
//AppTcpClientMapReverseConnected(this, arg);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
ServerHost.Logger.Debug(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
private static ClientConnectionManager Instance = new Lazy<ClientConnectionManager>(() => new ClientConnectionManager()).Value;
|
||||
|
||||
public static ClientConnectionManager GetInstance()
|
||||
{
|
||||
return Instance;
|
||||
}
|
||||
|
||||
public async Task<TcpClient> GetClient(int consumerPort)
|
||||
{
|
||||
//从字典的list中取出tcpclient,并将其移除
|
||||
ClientIDAppID clientappid = PortAppMap[consumerPort].ClientIdAppId;
|
||||
|
||||
TcpClient client = await AppTcpClientMap[clientappid].ReceiveAsync();
|
||||
PortAppMap[consumerPort].ReverseClients.Add(client);
|
||||
// AppTcpClientMap[clientappid].Remove(client);
|
||||
//AppRemoved(this, new AppChangedEventArgs { App = clientappid });
|
||||
return client;
|
||||
}
|
||||
|
||||
//通过客户端的id请求,分配好服务端端口和appid交给客户端
|
||||
//arrange ConfigId from top 4 bytes which received from client.
|
||||
//response:
|
||||
// 2 1 1 1 1 ...N
|
||||
// clientid appid port appid2 port2
|
||||
//request:
|
||||
// 2 2
|
||||
// clientid count
|
||||
// methodType value = 0
|
||||
public byte[] ArrageConfigIds(byte[] appRequestBytes, byte[] consumerPortBytes)
|
||||
{
|
||||
// byte[] arrangedBytes = new byte[256];
|
||||
ClientModel clientModel = new ClientModel();
|
||||
int clientId = (appRequestBytes[0] << 8) + appRequestBytes[1];
|
||||
int appCount = (int)appRequestBytes[2];
|
||||
|
||||
if (clientId == 0)
|
||||
{
|
||||
lock (_lockObject)
|
||||
{
|
||||
byte[] tempClientIdBytes = new byte[2];
|
||||
//分配clientid
|
||||
for (int i = 0; i < 10000; i++)
|
||||
{
|
||||
_rand.NextBytes(tempClientIdBytes);
|
||||
int tempClientId = (tempClientIdBytes[0] << 8) + tempClientIdBytes[1];
|
||||
if (!RegisteredClient.ContainsKey(tempClientId))
|
||||
{
|
||||
|
||||
clientModel.ClientId = tempClientId;
|
||||
clientId = tempClientId;
|
||||
//注册客户端
|
||||
RegisteredClient.Add(tempClientId, new List<ClientIDAppID>());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
clientModel.ClientId = clientId;
|
||||
}
|
||||
lock (_lockObject2)
|
||||
{
|
||||
//循环获取appid,appid是元素下标+1
|
||||
int maxAppCount = RegisteredClient[clientId].Count;
|
||||
//增加请求的客户端
|
||||
//int[] ports = NetworkUtil.FindAvailableTCPPorts(20000, appCount);
|
||||
//foreach (var oneport in ports) Logger.Info(oneport + " ");
|
||||
clientModel.AppList = new List<App>(appCount);
|
||||
for (int i = 0; i < appCount; i++)
|
||||
{
|
||||
int startPort = StringUtil.DoubleBytesToInt(consumerPortBytes[2 * i], consumerPortBytes[2 * i + 1]);
|
||||
int arrangedAppid = maxAppCount + i + 1;
|
||||
if (arrangedAppid > 255) throw new Exception("Stack overflow.");
|
||||
//查找port的起始端口如果未指定,则设置为20000
|
||||
if (startPort == 0) startPort = 20000;
|
||||
int port = NetworkUtil.FindOneAvailableTCPPort(startPort);
|
||||
RegisteredClient[clientId].Add(new ClientIDAppID
|
||||
{
|
||||
ClientID = clientId,
|
||||
AppID = arrangedAppid
|
||||
});
|
||||
clientModel.AppList.Add(new App
|
||||
{
|
||||
AppId = arrangedAppid,
|
||||
Port = port
|
||||
});
|
||||
var appClient = PortAppMap[port] = new AppModel()
|
||||
{
|
||||
ClientIdAppId = new ClientIDAppID()
|
||||
{
|
||||
ClientID = clientId,
|
||||
AppID = arrangedAppid
|
||||
},
|
||||
Tunnels = new List<TcpTunnel>(),
|
||||
ReverseClients = new List<TcpClient>()
|
||||
};
|
||||
ServerHost.Logger.Info(port);
|
||||
//配置时触发
|
||||
AppTcpClientMapConfigConnected(this, new AppChangedEventArgs() { App = appClient.ClientIdAppId });
|
||||
}
|
||||
ServerHost.Logger.Debug(" <=端口已分配。");
|
||||
}
|
||||
return clientModel.ToBytes();
|
||||
}
|
||||
|
||||
private ClientIDAppID GetAppFromBytes(byte[] bytes)
|
||||
{
|
||||
return new ClientIDAppID()
|
||||
{
|
||||
ClientID = (bytes[0] << 8) + bytes[1],
|
||||
AppID = bytes[2]
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user