今天来分享下个人搭建IM的心得

GatewayWorker基于Workerman开发的一个项目框架,用于快速开发TCP长连接应用,例如app推送服务端、即时IM服务端、游戏服务端、物联网、智能家居等等

GatewayWorker使用经典的Gateway和Worker进程模型。Gateway进程负责维持客户端连接,并转发客户端的数据给BusinessWorker进程处理,BusinessWorker进程负责处理实际的业务逻辑(默认调用Events.php处理业务),并将结果推送给对应的客户端。Gateway服务和BusinessWorker服务可以分开部署在不同的服务器上,实现分布式集群。

GatewayWorker提供非常方便的API,可以全局广播数据、可以向某个群体广播数据、也可以向某个特定客户端推送数据。配合Workerman的定时器,也可以定时推送数据。

第一步

composer安装:

1
composer require workerman/gateway-worker

(可选,用于服务端主动推送消息)

1
composer require workerman/gatewayclient

(可选,用于连接数据库)

1
composer require workerman/mysql

(可选,用于连接redis)

1
composer require workerman/redis

第二步

删掉start.php中的require_once DIR . ‘/vendor/autoload.php’;

原因是tp有自己的自动加载机制,貌似在集成composer自动加载机制时有BUG

第三步

在start_gateway.php中创建wss服务,因为小程序要用websocket必须是wss:
假设我的证书目录是统一放在tp根目录,start_gateway.php都放在application里的workerman里
记得把原来的tcp链接要改掉!不然bug

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 证书最好是申请的证书
$context = array(
// 更多ssl选项请参考手册 http://php.net/manual/zh/context.ssl.php
'ssl' => array(
// 请使用绝对路径
'local_cert' => __DIR__.'/../../certificate/websocket/wss.pem', // 也可以是crt文件
'local_pk' => __DIR__.'/../../certificate/websocket/wss.key',
'verify_peer' => false,
// 'allow_self_signed' => true, //如果是自签名证书需要开启此选项
)
);
// websocket协议(端口任意,只要没有被其它程序占用就行)
$gateway = new Gateway("websocket://0.0.0.0:443", $context);
// 开启SSL,websocket+SSL 即wss
$gateway->transport = 'ssl';

基本环境已经配置好了,现在我们可以在前端先测试一下

1
2
3
4
5
6
7
8
9
10
11
12
13
// 证书是会检查域名的,请使用域名连接
ws = new WebSocket("wss://test.com:443");
ws.onopen = function() {
console.log("连接成功");
ws.send('tom');
console.log("给服务端发送一个字符串:tom");
};
ws.onmessage = function(e) {
console.log("收到服务端的消息:" + e.data);
};
ws.onerror =function(e){
console.log(e);
}

第四步

接下来开始在events.php中写我们的主要通信逻辑,本项目主要涉及离线消息推送、在线用户状态显示、群聊、私聊等功能,根据自身情况去调整

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/

/**
* 用于检测业务代码死循环或者长时间阻塞等问题
* 如果发现业务卡死,可以将下面declare打开(去掉//注释),并执行php start.php reload
* 然后观察一段时间workerman.log看是否有process_timeout异常
*/
//declare(ticks=1);
use GatewayWorker\Lib\Gateway;
use Workerman\MySQL\Connection;
use Workerman\Redis\Client;

/**
* 主逻辑
* 主要是处理 onConnect onMessage onClose 三个方法
* onConnect 和 onClose 如果不需要可以不用实现并删除
*/
class Events
{
/**
* 新建静态成员,用来保存数据库实例
*/
public static $db = null;
public static $redis = null;
/**
* 进程启动后初始化数据库连接
*/
public static function onWorkerStart()
{
self::$db = new Connection('IP', '端口号', '账户', '密码', '数据库');
self::$redis = new Client('redis://127.0.0.1:6379');
}
/**
* 当客户端连接时触发
* 如果业务不需此回调可以删除onConnect
*
* @param int $client_id 连接id
*/
public static function onConnect($client_id)
{

}

/**
* 当客户端发来消息时触发
* @param int $client_id 连接id
* @param mixed $message 具体消息
*/
public static function onMessage($client_id, $message)
{
//fromUid,toUid,isGroup,type为必须参数
$msg=json_decode($message,true);//来源消息
Gateway::bindUid($client_id, $msg['fromUid']);//将client_id与uid绑定
$guid=self::$db->select('friendguid')->from('oh_ai_groupfriend')->where("uid={$msg['fromUid']}")->query();
foreach ($guid as $k=>$v){
Gateway::joinGroup($client_id,$v['friendguid']);//将client_id与群uid绑定
}
//常见事件(只处理,不给客户端发送)
switch ($msg['type']){
//离线消息(可选)
case 'pull':
self::$redis->lrange($msg['fromUid'],0,-1,function ($list) use ($msg) {
if(!empty($list)){
GateWay::sendToUid($msg['fromUid'], json_encode(['message'=>$list,'type'=>'pull']));
self::$redis->del($msg['fromUid']);//清除redis缓存
}
});
return false;
//上线
case 'online':
Gateway::sendToAll(json_encode(['list'=>array_keys(Gateway::getAllUidList()),'type'=>'online']));
return false;
//心跳
case 'heart':
GateWay::sendToUid($msg['fromUid'], json_encode(['type'=>'heart','group'=>Gateway::getClientSessionsByGroup(10000),'client'=>$client_id]));
return false;
//创建群聊
case 'create_group':
Gateway::joinGroup($client_id,$msg['toUid']);
return false;
//加入群聊
case 'join_group':
$client_id_array=Gateway::getClientIdByUid($msg['fromUid']);
Gateway::joinGroup($client_id_array[0],$msg['toUid']);
return false;
//解散群聊
case 'dismiss_group':
GateWay::ungroup($msg['toUid']);
return false;
//退出群聊
case 'leave_group':
Gateway::leaveGroup($client_id, $msg['toUid']);
return false;
//踢出群聊
case 'kick_group':
$client_id_array=Gateway::getClientIdByUid($msg['fromUid']);
Gateway::leaveGroup($client_id_array[0], $msg['toUid']);
return false;
}
//自己和自己发消息,不执行任何操作
if($msg['fromUid']==$msg['toUid']){
return false;
}
//处理要发送的数据
//私聊
if($msg['isGroup']==0){
self::sendMsg($msg['toUid'],$msg);
}
//群聊
else{
self::sendMsgGroup($msg['fromUid'],$msg['toUid'],$msg);
}
return false;
}

/**
* 当用户断开连接时触发
* @param int $client_id 连接id
*/
public static function onClose($client_id)
{
//向所有客户端通知下线
$list=array_keys(Gateway::getAllUidList());
GateWay::sendToAll(json_encode(['list'=>$list,'type'=>'online']));
}
/**
* 发送私聊消息
* @param int $uid 目标uid
* @param array $message 消息内容
*/
public static function sendMsg($uid,$message){
//判断是否为好友
if($message['type']!=='read' && $message['type']!=='friend_request' && $message['type']!=='friend_response' && $message['type']!=='group_request' && $message['type']!=='group_response'){
$db=self::$db->select('frienduid')->from('oh_ai_friend')->where("uid={$message['toUid']} and frienduid={$message['fromUid']}")->single();
if(empty($db)){
GateWay::sendToUid($message['fromUid'], json_encode(['id'=>$message['id'],'type'=>'stranger']));
return false;
}
}
//如果对方不在线
if(!Gateway::isUidOnline($uid)){
if($message['type']!=='heart' && $message['type']!=='online'){
self::$redis->set('ai_msg_time',date('Y-m-d',time()));
self::$redis->rpush($uid,json_encode($message));
self::$redis->get('ai_msg_time', function ($date) use ($uid) {
if($date!==date('Y-m-d',time())){
self::$redis->expire($uid,43200);
}
});
}
}
//如果对方在线
else{
GateWay::sendToUid($uid, json_encode($message));
}
return false;
}
/**
* 发送群聊消息
* @param int $uid 发送者uid
* @param int $guid 目标群uid
* @param array $message 消息内容
*/
public static function sendMsgGroup($uid, $guid, $message)
{
//判断用户是否在此群
$db=self::$db->select('friendguid')->from('oh_ai_groupfriend')->where("uid=$uid and friendguid=$guid")->single();
if(empty($db)){
GateWay::sendToUid($message['fromUid'], json_encode(['id'=>$message['id'],'type'=>'stranger']));
return false;
}
$group_info=self::$db->select('is_at,ai_id')->from('oh_ai_group')->where("guid=$guid")->row();
$client_id_array=Gateway::getClientIdByUid($uid);
Gateway::sendToGroup($guid,json_encode($message),$client_id_array[0]);//一定要排除自己
return false;
}

最后是TP框架主动推送消息的一种方法:

需要安装GatewayClient

总体思路图如下

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<?php


namespace app\workerman\controller;

use app\BaseController;
use GatewayClient\Gateway;
use think\facade\Db;

class RestfulApi extends BaseController
{
public function __construct()
{
Gateway::$registerAddress = '127.0.0.1:1238';
}
public function sendMsg(){
$message=input('post.message');
// 向任意uid发送数据
Gateway::sendToUid($message['toUid'], json_encode($message));
return success('发送成功',$message);
}
public function sendGroupMsg(){
$message=input('post.message');
$client_id_array=Gateway::getClientIdByUid($message['fromUid']);
Gateway::sendToGroup($message['toUid'],json_encode($message),$client_id_array[0]);//一定要排除自己
return success('发送成功',$message);
}
}