先談golang
package main
import (
"fmt"
"time"
)
//ex one
func worker(id int, c chan int) {
for {
fmt.Printf("is %d,come %c\n", id, <-c)
}
}
//ex two return is a chan
func createworker(id int) chan int {
c := make(chan int)
go func() {
for {
fmt.Printf("is %d,come %c\n", id, <-c)
}
}()
return c
}
func chanDemo() {
var channels [10]chan int
//ex one
for i := 0; i < 10; i++ {
channels[i] = make(chan int)
go worker(i, channels[i])
channels[i] <- 'a' + i
channels[i] <- 'A' + i
}
//extwo
for i := 0; i < 10; i++ {
channels[i] = createworker(i)
}
for i := 0; i < 10; i++ {
channels[i] <- 'a' + i
}
for i := 0; i < 10; i++ {
channels[i] <- 'B' + i
}
for i := 0; i < 10; i++ {
channels[i] <- 'A' + i
}
//like swoole co::sleep()
time.Sleep(time.Microsecond)
}
func workerTwo(id int, c chan string) {
//use your channal stop output
for n := range c {
fmt.Printf("is %d,come %s\n", id, n)
}
}
//like swoole chan(3)
func bufferChannel() {
c := make(chan string, 3)
go workerTwo(0, c)
// input your channel
c <- "蘋果皮"
c <- "豆腐皮"
c <- "香蕉皮"
close(c)
// like swoole co::sleep()
time.Sleep(time.Microsecond)
}
func main() {
chanDemo()
bufferChannel()
}
再看swoole
/**
* 創(chuàng)建一個主協(xié)程
*/
const NUM = 1; //總執(zhí)行次數(shù)
$c = new chan(600); //定義管道
// 記錄開始時間
$StartTime = time();
const MaxCoroutine = 10; //默認3000
Swoole\Coroutine::set(array(
'max_coroutine' => MaxCoroutine,
));
go(function () use ($c) {
/**
* 連接數(shù)據(jù)庫
*/
$swoole_mysql = new Swoole\Coroutine\MySQL();
$res = $swoole_mysql->connect([
'host' => '127.0.0.1',
'port' => 3306,
'user' => 'root',
'password' => 'root',
'database' => 'demo',
'fetch_mode' => true, //fetch 要開啟這個
]);
if ($res == false) {
// log($res);
echo "MYSQL-CONNECT-ERROR" . PHP_EOL;
return;
}
/**
* 調(diào)用pop防止 堵塞 子協(xié)程
*/
go(function () use ($c, $swoole_mysql) {
popChan($c, $swoole_mysql);
});
/**
* 調(diào)用push 子協(xié)程
*/
go(function () use ($c) {
pushChan($c);
});
});
/**
* 調(diào)用pop
*
* @param [type] $c
* @param [type] $swoole_mysql
* @return void
*/
function popChan(&$c, &$swoole_mysql)
{
$n = 0;
$stm = $swoole_mysql->prepare('insert into `cate` (`id`,`name`) value(?,?)');
if (false == $stm) {
// log("預(yù)處理錯誤的原因:" . $swoole_mysql->erron . ":" . $swoole_mysql->error);
consolelog("mysql 添加失敗" . $swoole_mysql->error);
die();
}
while (true) {
consolelog("wite-begin-insert:" . date('Y-m-d H:i:s') . ":" . $n);
//consolelog("insert-data:".json_encode($c->pop()));
$n++;
consolelog("begin-insert:" . date('Y-m-d H:i:s') . ":" . $n);
$ret = $stm->execute([null, json_encode($c->pop())]);
if (false == $ret) {
// log("添加錯誤的原因:" . $swoole_mysql->erron . ":" . $swoole_mysql->error);
//consolelog('insert-error'. $swoole_mysql->erron() . ":" . $swoole_mysql->error());
$n--; //回退
$c->push($c->pop());
}
if (NUM == $n) {
consolelog('End at : ' . date('Y-m-d H:i:s') . PHP_EOL
. ' 耗時: ' . (time() - $GLOBALS['StartTime'])
. 'S 速率: ' . round($n / (time() - $GLOBALS['StartTime']), 2) . '/S');
return;
}
}
}
/**
* push
*/
function pushChan(&$c)
{
for ($i = 0; $i < NUM; $i++) {
/**
* 以防萬一
*/
while ($c->isFull() || (Swoole\Coroutine::stats())['coroutine_num'] > MaxCoroutine - 5) {
consolelog("sleep:" . $i);
co::sleep(0.3);
}
$data = [
'www.baidu.com',
'www.qq.com',
'www.csdn.com',
];
foreach ($data as $v) {
$ips[] = co::gethostbyname($v, AF_INET, 0.5);
consolelog("begin-select:" . $i);
}
$c->push($ips);
}
}
/**
* 定義打印輸出
*
* @param [type] $msg
* @return void
*/
function consolelog($msg)
{
$msg = $msg . PHP_EOL;
echo $msg;
}
/**
* 定義日志文件
*
* @param [type] $msg
* @return void
*/
因為協(xié)程是并行執(zhí)行的他的速度非沉镒澹快躬它,如果不用time我們看不到他的執(zhí)行結(jié)果,所以現(xiàn)在修改一下代碼,也可以實現(xiàn)
package main
import (
"fmt"
)
//create worker
func createWorker(id int) workers {
w := workers{
in: make(chan int),
done: make(chan bool),
}
go doWorker(id, w.in, w.done)
return w
}
// do worker
func doWorker(id int, c chan int, done chan bool) {
for n := range c {
fmt.Printf("is %d,come %c\n", id, n)
done <- true //out
}
}
type workers struct { //type a struct
in chan int //in channel
done chan bool //out channel
}
func chanDemo() {
var workers [10]workers
//extwo
for i := 0; i < 10; i++ {
workers[i] = createWorker(i)
}
//all words goto channel
for i, worker := range workers {
worker.in <- 'a' + i
}
//make word out
for _, woker := range workers {
<-woker.done
}
//all words goto channel
for i, worker := range workers {
worker.in <- 'A' + i
}
//make word out
for _, woker := range workers {
<-woker.done
}
}
func main() {
chanDemo()
}
golang里面內(nèi)置了一個WaitGroup方法 還可以這樣來
package main
import (
"fmt"
"sync"
)
//create worker
func createWorker(id int, wait *sync.WaitGroup) workers {
w := workers{
in: make(chan int),
wait: wait,
}
go doWorker(id, w.in, wait)
return w
}
// do worker
func doWorker(id int, c chan int, wait *sync.WaitGroup) {
for n := range c {
fmt.Printf("is %d,come %c\n", id, n)
wait.Done()
}
}
type workers struct {
in chan int
wait *sync.WaitGroup //usr chanDemo wait so use *
}
func chanDemo() {
var workers [10]workers
var wait sync.WaitGroup
for i := 0; i < 10; i++ {
workers[i] = createWorker(i, &wait)
}
wait.Add(20) //we have 20 work todo
for i, worker := range workers {
worker.in <- 'a' + i
//or we can do this
//wait.Add(1)
}
for i, worker := range workers {
worker.in <- 'A' + i
}
wait.Wait() //wait 20 work to end
}
func main() {
chanDemo()
}
總結(jié):swoole的chan 類似于 隊列汇跨,先進先出,可以設(shè)置一個緩存區(qū),就是你的管道的長度, 在swoole里面有isFull方法進行判斷,如果滿了,可以將當前協(xié)程掛起伍俘,(其他判斷方法也可以)叽掘,如果發(fā)生I/O阻塞,協(xié)程本身就是并發(fā),大家可以想想會發(fā)生什么,你的CPU,內(nèi)存缀旁,所以代碼一定要嚴謹记劈。
go里面的channel是并行執(zhí)行,因為速度非常快,我們第一種通過time的方式可以實現(xiàn)輸出,第二種先并行執(zhí)行前10個并巍,done出來,再并行執(zhí)行后10個目木,done出來,第三種,通過內(nèi)置的waitgroup方法,并行執(zhí)行20個。
-----個人理解