今天来分享下个人搭建IM的心得

GatewayWorker基于Workerman开发的一个项目框架,用于快速开发TCP长连接应用,例如app推送服务端、即时IM服务端、游戏服务端、物联网、智能家居等等

GatewayWorker使用经典的Gateway和Worker进程模型。Gateway进程负责维持客户端连接,并转发客户端的数据给BusinessWorker进程处理,BusinessWorker进程负责处理实际的业务逻辑(默认调用Events.php处理业务),并将结果推送给对应的客户端。Gateway服务和BusinessWorker服务可以分开部署在不同的服务器上,实现分布式集群。

GatewayWorker提供非常方便的API,可以全局广播数据、可以向某个群体广播数据、也可以向某个特定客户端推送数据。配合Workerman的定时器,也可以定时推送数据。

第一步

composer安装:

1
composer require workerman/gateway-worker

(可选,用于服务端主动推送消息)

1
composer require workerman/gatewayclient

(可选,用于连接数据库)

1
composer require workerman/mysql

(可选,用于连接redis)

1
composer require workerman/redis

第二步

删掉start.php中的require_once DIR . ‘/vendor/autoload.php’;

原因是tp有自己的自动加载机制,貌似在集成composer自动加载机制时有BUG

第三步

在start_gateway.php中创建wss服务,因为小程序要用websocket必须是wss:
假设我的证书目录是统一放在tp根目录,start_gateway.php都放在application里的workerman里
记得把原来的tcp链接要改掉!不然bug

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 证书最好是申请的证书
$context = array(
// 更多ssl选项请参考手册 http://php.net/manual/zh/context.ssl.php
'ssl' => array(
// 请使用绝对路径
'local_cert' => __DIR__.'/../../certificate/websocket/wss.pem', // 也可以是crt文件
'local_pk' => __DIR__.'/../../certificate/websocket/wss.key',
'verify_peer' => false,
// 'allow_self_signed' => true, //如果是自签名证书需要开启此选项
)
);
// websocket协议(端口任意,只要没有被其它程序占用就行)
$gateway = new Gateway("websocket://0.0.0.0:443", $context);
// 开启SSL,websocket+SSL 即wss
$gateway->transport = 'ssl';

基本环境已经配置好了,现在我们可以在前端先测试一下

1
2
3
4
5
6
7
8
9
10
11
12
13
// 证书是会检查域名的,请使用域名连接
ws = new WebSocket("wss://test.com:443");
ws.onopen = function() {
console.log("连接成功");
ws.send('tom');
console.log("给服务端发送一个字符串:tom");
};
ws.onmessage = function(e) {
console.log("收到服务端的消息:" + e.data);
};
ws.onerror =function(e){
console.log(e);
}

第四步

接下来开始在events.php中写我们的主要通信逻辑,本项目主要涉及离线消息推送、在线用户状态显示、群聊、私聊等功能,根据自身情况去调整

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/

/**
* 用于检测业务代码死循环或者长时间阻塞等问题
* 如果发现业务卡死,可以将下面declare打开(去掉//注释),并执行php start.php reload
* 然后观察一段时间workerman.log看是否有process_timeout异常
*/
//declare(ticks=1);
use GatewayWorker\Lib\Gateway;
use Workerman\MySQL\Connection;
use Workerman\Redis\Client;

/**
* 主逻辑
* 主要是处理 onConnect onMessage onClose 三个方法
* onConnect 和 onClose 如果不需要可以不用实现并删除
*/
class Events
{
/**
* 新建静态成员,用来保存数据库实例
*/
public static $db = null;
public static $redis = null;
/**
* 进程启动后初始化数据库连接
*/
public static function onWorkerStart()
{
self::$db = new Connection('IP', '端口号', '账户', '密码', '数据库');
self::$redis = new Client('redis://127.0.0.1:6379');
}
/**
* 当客户端连接时触发
* 如果业务不需此回调可以删除onConnect
*
* @param int $client_id 连接id
*/
public static function onConnect($client_id)
{

}

/**
* 当客户端发来消息时触发
* @param int $client_id 连接id
* @param mixed $message 具体消息
*/
public static function onMessage($client_id, $message)
{
//fromUid,toUid,isGroup,type,content为必须参数
$msg=json_decode($message,true);//来源消息
Gateway::bindUid($client_id, $msg['fromUid']);//将client_id与uid绑定
//心跳
if($msg['type']=='heart'){
GateWay::sendToUid($msg['fromUid'], json_encode(['type'=>'heart']));
return false;
}
//处理要发送的数据
//私聊
if($msg['isGroup']==0){
self::sendMsg($msg['toUid'],$msg);
}
//群聊
else{
self::sendMsgGroup($msg['fromUid'],$msg['toUid'],$msg);
}
return false;
}

/**
* 当用户断开连接时触发
* @param int $client_id 连接id
*/
public static function onClose($client_id)
{
//向所有客户端通知下线
$list=array_keys(Gateway::getAllUidList());
GateWay::sendToAll(json_encode(['list'=>$list,'type'=>'online']));
}
/**
* 发送私聊消息
* @param int $uid 目标uid
* @param array $message 消息内容
*/
public static function sendMsg($uid,$message){
GateWay::sendToUid($uid, json_encode($message));
return false;
}
/**
* 发送群聊消息
* @param int $uid 发送者uid
* @param int $guid 目标群uid
* @param array $message 消息内容
*/
public static function sendMsgGroup($uid, $guid, $message)
{
$mem_string=self::$db->select('member')->from('oh_ai_group')->where("guid=$guid")->single();
$member=explode(',',$mem_string);
foreach ($member as $v) {
if($v!=$uid){//排除自己
self::sendMsg($v,$message);
}
}
return false;
}
}

最后是TP框架主动推送消息的一种方法:

需要安装GatewayClient

总体思路图如下

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<?php


namespace app\workerman\controller;

use app\BaseController;
use GatewayClient\Gateway;
use think\facade\Db;

class RestfulApi extends BaseController
{
public function __construct()
{
Gateway::$registerAddress = '127.0.0.1:1238';
}
public function sendMsg(){
$message=input('post.message');
// 向任意uid发送数据
Gateway::sendToUid($message['toUid'], json_encode($message));
$this->success('发送成功',$message);
}
}

其他高级功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//拉取所有人线状态
if($msg['type']=='online'){
$list=array_keys(Gateway::getAllUidList());
foreach ($list as $v){
GateWay::sendToUid($v, json_encode(['list'=>$list,'type'=>'online']));
}
return false;
}
//拉取离线消息
if($msg['type']=='pull'){
self::$redis->lrange($msg['fromUid'],0,-1,function ($list) use ($msg) {
if(!empty($list)){
GateWay::sendToUid($msg['fromUid'], json_encode(['message'=>$list,'type'=>'pull']));
self::$redis->del($msg['fromUid']);//清除redis缓存
}
});
return false;
}
//自己和自己发消息,不执行任何操作
if($msg['fromUid']==$msg['toUid']){
return false;
}
//判断是否为好友(排除AI)
if($message['isGroup']==0 && $message['type']!=='read' && $message['type']!=='friend_request' && $message['type']!=='friend_response' && $message['type']!=='group_request' && $message['type']!=='group_response' && $uid!==0){
$db=self::$db->select('frienduid')->from('oh_ai_friend')->where("uid={$message['toUid']} and frienduid={$message['fromUid']}")->single();
if(empty($db)){
GateWay::sendToUid($message['fromUid'], json_encode(['id'=>$message['id'],'type'=>'stranger']));
return false;
}
}
//如果对方不在线
if(!Gateway::isUidOnline($uid)){
if($message['type']!=='heart' && $message['type']!=='online'){
self::$redis->set('ai_msg_time',date('Y-m-d',time()));
self::$redis->rpush($uid,json_encode($message));
self::$redis->get('ai_msg_time', function ($date) use ($uid) {
if($date!==date('Y-m-d',time())){
self::$redis->expire($uid,43200);
}
});
}
}
//判断用户是否在此群(如果不设置,被踢出的群员扔会发送消息)
$db=self::$db->select('friendguid')->from('oh_ai_groupfriend')->where("uid=$uid and friendguid=$guid")->single();
if(empty($db)){
GateWay::sendToUid($message['fromUid'], json_encode(['id'=>$message['id'],'type'=>'stranger']));
return false;
}