tongsiying

阅读|运动|自律

0%

第33篇:进程、线程、协程

一、IO多路复用

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1.阻塞I/O模型
老李去火车站买票,排队三天买到一张退票。
耗费:在车站吃喝拉撒睡 3天,其他事一件没干。

2.非阻塞I/O模型
老李去火车站买票,隔12小时去火车站问有没有退票,三天后买到一张票。耗费:往返车站6次,路上6小时,其他时间做了好多事。3.I/O复用模型

3.I/O复用模型 
 1.select/poll
  老李去火车站买票,委托黄牛,然后每隔6小时电话黄牛询问,黄牛三天内买到票,然后老李去火车站交钱领票。
耗费:往返车站2次,路上2小时,黄牛手续费100元,打电话17
 2.epoll
  老李去火车站买票,委托黄牛,黄牛买到后即通知老李去领,然后老李去火车站交钱领票。
耗费:往返车站2次,路上2小时,黄牛手续费100元,无需打电话

4.信号驱动I/O模型
老李去火车站买票,给售票员留下电话,有票后,售票员电话通知老李,然后老李去火车站交钱领票。
耗费:往返车站2次,路上2小时,免黄牛费100元,无需打电话

5.异步I/O模型
老李去火车站买票,给售票员留下电话,有票后,售票员电话通知老李并快递送票上门。
耗费:往返车站1次,路上1小时,免黄牛费100元,无需打电话

  如果一个I/O流进来,我们就开启一个进程处理这个I/O流。那么假设现在有一百万个I/O流进来,那我们就需要开启一百万个进程一一对应处理这些I/O流(——这就是传统意义下的多进程并发处理)。思考一下,一百万个进程,你的CPU占有率会多高,这个实现方式及其的不合理。所以人们提出了I/O多路复用这个模型,一个线程,通过记录I/O流的状态来同时管理多个I/O,可以提高服务器的吞吐能力

  I/O multiplexing 也就是我们所说的I/O多路复用,但是这个翻译真的很不生动,所以我更喜欢将它拆开,变成 I/O multi plexing。multi意味着多,而plex意味着丛(丛:聚集,许多事物凑在一起。),那么字面上来看I/O multiplexing 就是将多个I/O凑在一起。就像下面这张图的前半部分一样,中间的那条线就是我们的单个线程,它通过记录传入的每一个I/O流的状态来同时管理多个IO。

IO多路复用的实现

img

我们来分析一下上面这张图

  • 当进程调用select,进程就会被阻塞
  • 此时内核会监视所有select负责的的socket,当socket的数据准备好后,就立即返回。
  • 进程再调用read操作,数据就会从内核拷贝到进程。

其实多路复用的实现有多种方式:select、poll、epoll

selectors模块_client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/7
@Author: Zhang Yafei
"""
from socket import *

sock = socket(AF_INET,SOCK_STREAM)
sock.connect(('127.0.0.1',8090))

while True:
data = input('>>:')
if not data:
continue
sock.send(data.encode('utf-8'))
result = sock.recv(1024)
print(result.decode('utf-8'))

selectors模块_client

selectors_serrer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/7
@Author: Zhang Yafei
"""
import selectors
import socket

sel = selectors.DefaultSelector()


def accept(sock, mask):
conn, addr = sock.accept() # Should be ready
print('accepted', conn, 'from', addr)
conn.setblocking(False)
sel.register(conn, selectors.EVENT_READ, read)


def read(conn, mask):
try:
data = conn.recv(1000) # Should be ready
if data:
print('echoing', repr(data), 'to', conn)
conn.send(data) # Hope it won't block
else:
print('closing', conn)
sel.unregister(conn)
conn.close()
except Exception as e:
print(e)
print('closing', conn)
sel.unregister(conn)
conn.close()


sock = socket.socket()
sock.bind(('localhost', 8090))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

print('server is starting..........')
while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)

selectors_serrer

聊天

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/6
@Author: Zhang Yafei
"""
import socket
sk=socket.socket()
sk.connect(('127.0.0.1',8801))

while True:
inp=input(">>>>")
sk.sendall(bytes(inp,"utf8"))
data=sk.recv(1024)
print(str(data,'utf8'))

client

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/6
@Author: Zhang Yafei
"""
import socket
import select
sk=socket.socket()
sk.bind(("127.0.0.1",8801))
sk.listen(5)
inputs=[sk,]
while True:
r,w,e=select.select(inputs,[],[],5)

for obj in r:
if obj==sk:
conn,add=obj.accept()
print(conn)
inputs.append(conn)
else:
data_byte=obj.recv(1024)
print(str(data_byte,'utf8'))
inp=input('回答%s号客户>>>'%inputs.index(obj))
obj.sendall(bytes(inp,'utf8'))

# print('>>',r)

server

非阻塞IO

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/6
@Author: Zhang Yafei
"""
import time
import socket
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

while True:
sk.connect(('127.0.0.1',6667))
print("hello")
sk.sendall(bytes("hello","utf8"))
time.sleep(2)
break

client

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/6
@Author: Zhang Yafei
"""
import time
from socket import *
sk = socket(AF_INET,SOCK_STREAM)
sk.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
sk.bind(('127.0.0.1',6667))
sk.listen(5)
sk.setblocking(False)

print ('waiting client connection .......')
while True:
try:
connection,address = sk.accept() # 进程主动轮询
print("+++",address)
client_messge = connection.recv(1024)
print(str(client_messge,'utf8'))
connection.close()
except Exception as e:
print(e)
time.sleep(4)

server

IO多路复用

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/6
@Author: Zhang Yafei
"""
import socket

sk=socket.socket()

sk.connect(("127.0.0.1",9904))

while 1:
inp=input(">>").strip()
sk.send(inp.encode("utf8"))
data=sk.recv(1024)
print(data.decode("utf8"))

client

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/6
@Author: Zhang Yafei
"""
import socket
import select
sk=socket.socket()
sk.bind(("127.0.0.1",9904))
sk.listen(5)
inp = [sk,]
while True:
r,w,e=select.select(inp,[],[],5) #水平触发方式 r表示有人给我发送数据 w表示有人和我创建连接成功
for i in r:
conn,add=i.accept()
#print(conn)
inp.append(conn)
print("hello")
print('>>>>>>')

server

二、select_FTP

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/7
@Author: Zhang Yafei
"""
from socket import *
import os,sys

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

class selectFtpClient(object):
def __init__(self):
self.args = sys.argv
if len(self.args) > 1:
self.port = (self.args[1],int(self.args[2]))
else:
self.port = ('127.0.0.1',8080)
self.create_socket()
self.command_fanout()

def create_socket(self):
try:
self.sk = socket(AF_INET,SOCK_STREAM)
self.sk.connect(self.port)
print('连接FTP服务器成功!')
except Exception as e:
print('error:',e)

def command_fanout(self):
while True:
cmd = input('>>:').strip()
if cmd == 'exit':
break
cmd,file = cmd.split()
if hasattr(self,cmd):
func = getattr(self,cmd)
func(cmd,file)
else:
print('调用错误!')

def put(self,cmd,file):
if os.path.isfile(file):
fileName = os.path.basename(file)
fileSize = os.path.getsize(file)
fileiInfo = '%s|%s|%s'%(cmd,fileName,fileSize)
self.sk.send(bytes(fileiInfo,encoding='utf-8'))
recvStatus = self.sk.recv(1024)
print('recvStatus:',recvStatus)
has_send = 0

if recvStatus.decode('utf-8') == 'OK':
with open(file,'rb') as f:
while has_send < fileSize:
content = f.read(1024)
recv_size = len(content)
self.sk.send(content)
has_send += recv_size
s = str(int(has_send/fileSize*100))+'%'
print('正在上传文件:'+fileName+'已经上传:'+s)
print('%s文件上传完毕:'%fileName)
else:
print('文件不存在!')

def get(self):
pass


if __name__ == '__main__':
selectFtpClient()

client

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/7
@Author: Zhang Yafei
"""
import os
import time

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

from socket import *
import selectors


class selectFtpServer(object):

def __init__(self):
self.dic = {}
self.hasReceived = 0
self.sel = selectors.DefaultSelector()
self.create_socket()
self.handle()

def create_socket(self):
sk = socket(AF_INET,SOCK_STREAM)
sk.bind(('127.0.0.1',8080))
sk.listen(100)
sk.setblocking(False)
self.sel.register(sk,selectors.EVENT_READ,self.accept)

print('服务端已开启,正在等待用户连接......')

def handle(self):
while True:
events = self.sel.select() #[sock,conn1,conn2]
for key,mask in events:
callback = key.data
callback(key.fileobj,mask)

def accept(self,sock,mask):
conn,addr = sock.accept()
print(addr)
print(type(addr))
print('from %s %s connected'%addr)
conn.setblocking(False)
self.sel.register(conn,selectors.EVENT_READ,self.read)

self.dic[conn] = {}

def read(self,conn,mask):
try:
if not self.dic[conn]:
data = conn.recv(1024)
cmd,fileName,fileSize = str(data,encoding='utf-8').split('|')

self.dic = {conn:{'cmd':cmd,'fileName':fileName,'fileSize':int(fileSize)}}
if cmd == 'put':
conn.send(bytes('OK',encoding='utf-8'))

if cmd == 'get':
file = os.path.join(BASE_DIR,'download',fileName)

if os.path.exists(file):
file_size = os.path.getsize(file)
send_info = '%s|%s'%('YES',file_size)
conn.send(bytes(send_info,encoding='utf-8'))
else:
send_info = '%s|%s'%('NO',0)
conn.send(bytes(send_info,encoding='utf-8'))
else:
if self.dic[conn].get('cmd',None):
cmd = self.dic[conn].get('cmd')
if hasattr(self,cmd):
func = getattr(self,cmd)
func(conn)
else:
print('error cmd!')
conn.close()
else:
print('error cmd!')
conn.close()
except Exception as e:
print('error:',e)
self.sel.unregister(conn)
conn.close()

def put(self,conn):
fileName = self.dic[conn]['fileName']
fileSize = self.dic[conn]['fileSize']
path = os.path.join(BASE_DIR,'upload',fileName)
f = open(path,'ab')
while self.hasReceived < fileSize:
try:
recv_data = conn.recv(1024)
except:
break
f.write(recv_data)
self.hasReceived += len(recv_data)

if fileSize == self.hasReceived:
self.hasReceived = 0
if conn in self.dic.keys():
self.dic[conn] = {}
print('%s上传完毕!'%fileName)

def get(self,conn):
pass


if __name__ == '__main__':
selectFtpServer()

server

三、GIL

  • GIL(全局解释器锁)(Global Interpreter Lock)是计算机程序设计语言解释器用于同步线程的工具,使得任何时刻一个进程仅有一个线程占用cpu
  • 这就造成了多线程不能同时执行,并且增加了切换的开销,串行的效率可能更高

四、进程

  • 进程定义:最小的资源单位
  • 进程就是一个程序在一个数据集上的一次动态执行过程。(抽象)
  • 进程一般由程序、数据集、进程控制块三部分组成。
  • 我们编写的程序用来描述进程要完成哪些功能以及如何完成;
  • 数据集则是程序在执行过程中所需要使用的资源;
  • 进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系
  • 统感知进程存在的唯一标志

1. 进程的调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/6
@Author: Zhang Yafei
"""
"""
#并行
from multiprocessing import Process
import time


def f(name):
time.sleep(1)
print('hello', name,time.ctime())


if __name__ == '__main__':
p_list=[]

for i in range(3):
p = Process(target=f, args=('alvin',))
p_list.append(p)
p.start()

for p in p_list:
p.join()

print('ending....')
"""
"""
from multiprocessing import Process
import time

class MyProcess(Process):
def __init__(self):
super(MyProcess, self).__init__()
#self.name = name

def run(self):
time.sleep(1)
print('hello', self.name,time.ctime())


if __name__ == '__main__':
p_list=[]

for i in range(3):
p = MyProcess()
p.daemon = True
p.start()
p_list.append(p)

# for p in p_list:
# p.join()

print('ending......')
"""
from multiprocessing import Process
import os
import time


def info(title):
print("title:", title)
print('parent process:', os.getppid())
print('process id:', os.getpid())


def f(name):
info('function f')
print('hello', name)


if __name__ == '__main__':
info('main process line')
time.sleep(1)
print("------------------")
p = Process(target=info, args=('fei',))
p.start()
p.join()

2. process类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/6
@Author: Zhang Yafei
"""
import time
from multiprocessing import Process


class MyProcess(Process):
def __init__(self,num):
super(MyProcess, self).__init__()
self.num = num

def run(self):
time.sleep(1)
print(self.pid)
print(self.is_alive())
print(self.num)


if __name__ == '__main__':
p_list=[]
for i in range(10):
p = MyProcess(i)
p_list.append(p)

for p in p_list:
p.start()

print('main process end')

3. 进程通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/6
@Author: Zhang Yafei
"""
"""方式一:进程队列 不能实现数据共享"""
"""
import multiprocessing
import time


def foo(q):
time.sleep(1)
print('son process:',id(q))
q.put(123)
q.put('fei')


if __name__ == '__main__':
# q = queue.Queue() #线程队列
q = multiprocessing.Queue()
p = multiprocessing.Process(target=foo,args=(q,))

p.start()
print('main process:',id(q))
print(q.get())
print(q.get())
"""
"""方式二:管道 不能实现数据共享"""
"""
from multiprocessing import Process, Pipe


def f(conn):
conn.send([12, {"name":"yuan"},、 'hello']) #没走网络通道,所以不用转化成字节
response=conn.recv()
print("response",response)
conn.close()
print("q_ID2:",id(conn))


if __name__ == '__main__':

parent_conn, child_conn = Pipe()
print("q_ID1:",id(child_conn))
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
parent_conn.send("儿子你好!")
p.join()
"""
"""Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据。"""
from multiprocessing import Process, Manager


def f(d, l,n):
d[n] = '1' #{0:'1'}
d['2'] = 2 #{0:'1','2':2}
l.append(n) #[0,1,2,3,4,0,1,2,3,4,5,6,7,8,9]
#print(l)

print("son process:",id(d),id(l))


if __name__ == '__main__':

with Manager() as manager:
d = manager.dict()
l = manager.list(range(5)) #[0,1,2,3,4]

# print("main process:",id(d),id(l))

p_list = []

for i in range(10):
p = Process(target=f, args=(d,l,i))
p.start()
p_list.append(p)

for res in p_list:
res.join()

print(d)
print(l)

4. 进程同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/6
@Author: Zhang Yafei
"""
import time
from multiprocessing import Process, Lock


def f(l, i):
l.acquire()
print('hello world %s' % i)
l.release()


if __name__ == '__main__':
lock = Lock()

for num in range(10):
Process(target=f, args=(lock, num)).start()

5. 多进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/6
@Author: Zhang Yafei
"""
from multiprocessing import Process,Pool
import time,os


def Foo(i):
time.sleep(1)
print(i)
print('son process:',os.getpid())
return 'hello %s'%i

#回调函数由主进程调用
def Bar(arg):
print(arg)
print('callback process:',os.getpid())


if __name__ == '__main__':
pool = Pool(5)
print('main process:',os.getpid())
for i in range(10):
# pool.apply(func=Foo,args=(i,)) #同步接口
#回调函数:某个动作或函数执行结束之后再去执行的函数
pool.apply_async(func=Foo, args=(i,),callback=Bar)

# pool.map(Foo, [i for i in range(100)])

pool.close()
pool.join()
print('end')

五、线程

1
2
线程:最小的执行单位
进程和线程的区别
  • (1)、一个程序至少有一个进程,一个进程至少有一个线程.(进程可以理解成线程的容器)

  • (2)、进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率。

  • (3)、线程在执行过程中与进程还是有区别的。每个独立的线程有一个程序运行的入口、顺序执行序列和

    程序的出口。但是线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。

  • (4)、进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调

    度的一个独立单位. 线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程

    自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈)但是

    它可与同属一个进程的其他的线程共享进程所拥有的全部资源.

    一个线程可以创建和撤销另一个线程;同一个进程中的多个线程之间可以并发执行.

1
同步锁(互斥锁):对某一段程序设置同步锁,在这段程序内禁止cpu切换,只能串行。

1. 线程继承

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/5
@Author: Zhang Yafei
"""
import threading
import time


class MyThread(threading.Thread):
def __init__(self, num):
threading.Thread.__init__(self)
self.num = num

def run(self): # 定义每个线程要运行的函数

print("running on number:%s" % self.num)

time.sleep(3)


if __name__ == '__main__':
t1 = MyThread(1)
t2 = MyThread(2)
t1.start()
t2.start()

print("ending......")

线程继承

2. 计算密集型任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/6
@Author: Zhang Yafei
"""
import time
from threading import Thread


def add():
sum = 0
i = 1
while i<=1000000:
sum += i
i += 1
print('sum:',sum)


def mul():
sum2 = 1
i = 1
while i<=100000:
sum2 = sum2 * i
i += 1
print('sum2:',sum2)


start = time.time()
t1 = Thread(target=add)
t2 = Thread(target=mul)

l = []
l.append(t1)
l.append(t2)

for t in l:
t.start()

for t in l:
t.join()
# add()
# mul() #串行比多线程还快

print('cost time %s'%(time.time()-start))

计算密集型任务

3. 同步锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/6
@Author: Zhang Yafei
"""
import threading
import time
from threading import Thread

num = 100


def sub():
global num
# num -= 1 #没有IO操作,计算型任务,多线程不起作用,cpu会按照时间轮询切换,而这个操作时间太短,打不倒轮询的时间
lock.acquire() #开启同步锁
temp = num
time.sleep(0.01) #IO阻塞,cpu会去执行其他线程,1秒时间足够长,所有线程拿到的temp都是100
num = temp-1
lock.release() #释放同步锁

l = []
lock = threading.Lock()
for i in range(100):
t = Thread(target=sub)
t.start()
l.append(t)

for t in l:
t.join()

print(num)

4. 递归锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/6
@Author: Zhang Yafei
"""
import threading
import time


class MyThread(threading.Thread):

def actionA(self):
r_lock.acquire()#acquire一次,计数加一,只要计数大于零,其他线程将不能获得这把锁,这就造成
print(self.name,'getA',time.ctime())
time.sleep(2)

r_lock.acquire()
print(self.name,'getB',time.ctime())
time.sleep(1)

r_lock.release()
r_lock.release()

def actionB(self):
r_lock.acquire()
print(self.name,'getB',time.ctime())
time.sleep(2)

r_lock.acquire()
print(self.name,'getA',time.ctime())
time.sleep(1)

r_lock.release()
r_lock.release()

def run(self):
self.actionA()
time.sleep(0.5)
self.actionB()


if __name__ == '__main__':
# A = threading.Lock()
# B = threading.Lock()

r_lock = threading.RLock()

L = []

for i in range(5):
t = MyThread()
t.start()
L.append(t)

for t in L:
t.join()

print('ending.....')

5. 同步对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/6
@Author: Zhang Yafei
"""
import threading,time


class Boss(threading.Thread):
def run(self):
print("BOSS:今晚大家都要加班到22:00。")
print(event.isSet()) #FALSE
event.set()
time.sleep(5)
print("BOSS:<22:00>可以下班了。")
print(event.isSet())
event.set() #event设定之后,员工的wait解除,可以向下执行


class Worker(threading.Thread):
def run(self):
event.wait() #一旦event被设定,等同于pass
print("Worker:哎……命苦啊!")
time.sleep(1)
event.clear() #清空event.set,目的是为了等待老板的一句话
event.wait()
print("Worker:OhYeah!")


if __name__=="__main__":
event=threading.Event()
threads=[]
for i in range(5):
threads.append(Worker())
threads.append(Boss())
for t in threads:
t.start()
for t in threads:
t.join()

print('ending.....')

6. 信号量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/6
@Author: Zhang Yafei
"""
import threading,time


class myThread(threading.Thread):
def run(self):
if semaphore.acquire():
print(self.name)
time.sleep(3)
semaphore.release()


if __name__=="__main__":
semaphore=threading.Semaphore(5) #允许同时运行多少个线程
thrs=[]
for i in range(100):
thrs.append(myThread())
for t in thrs:
t.start()

7. 队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
"""
@Datetime: 2018/9/6
@Author: Zhang Yafei
"""
"""列表是不安全的数据结构"""
"""
#先进先出 FIFO 队列是为了线程同步而使用的,多个线程共享变量
import threading,time

li=[1,2,3,4,5]

def pri():
while li:
a=li[-1]
print(a)
time.sleep(1)
try:
li.remove(a)
except Exception as e:
print('----',a,e)

t1=threading.Thread(target=pri,args=())
t1.start()
t2=threading.Thread(target=pri,args=())
t2.start()
"""


"""
import queue #线程队列

q = queue.Queue(3) #设置最大队列长度

q.put(12)
q.put('hello')
q.put({'name':'fei'})

q.put(34,block=False) #若添加不进去,报错,不阻塞
while True:
data = q.get(block=False) #若取不到值,不阻塞,报错
print(q.qsize())
print(data)
print('-----------')
if q.qsize() == 0:
break
"""


"""
#后进先出
import queue

q = queue.LifoQueue()

q.put(12)
q.put('hello')
q.put({'name':'fei'})

while True:
data = q.get()
print(data)
print('--------')
"""


#高优先级优先
import queue

q = queue.PriorityQueue(3)

q.put([3,12])
q.put([2,'hello'])
q.put([4,{'name':'fei'}])

print(q.qsize())
print(q.empty())
print(q.full())

while True:
data = q.get()
print(data[1])
print('--------')

8. 多线程一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/5
@Author: Zhang Yafei
"""
import threading #线程
import time

def H(num):
print('hello {}'.format(num))
time.sleep(3)
print(time.time())


if __name__ == '__main__':
t1 = threading.Thread(target=H, args=(10,)) #创建一个线程对象t1 子线程
t1.start()

t2 = threading.Thread(target=H, args=(9,)) #创建一个线程对象t2 子线程
t2.start()

print('ending.......') #主线程
print(time.time())

9. 多线程二

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/5
@Author: Zhang Yafei
"""
import threading #线程
import time


def music():
print('begin to listen music {}'.format(time.ctime()))
time.sleep(3)
print('stop to listen music {}'.format(time.ctime()))


def game():
print('begin to play game {}'.format(time.ctime()))
time.sleep(5)
print('stop to play game {}'.format(time.ctime()))


if __name__ == '__main__':
music()
game()
print('ending.....')
# t1 = threading.Thread(target=music) #创建一个线程对象t1 子线程
# t2 = threading.Thread(target=game) #创建一个线程对象t2 子线程
#
# t1.start()
# t2.start()
#
# # t1.join() #等待子线程执行完 t1不执行完,谁也不准往下走
# t2.join()
#
# print('ending.......') #主线程
# print(time.ctime())

10. 多线程三

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/5
@Author: Zhang Yafei
"""
#http://www.cnblogs.com/yuanchenqi/articles/6248025.html
import threading
from time import ctime,sleep
import time


def ListenMusic(name):

print ("Begin listening to %s. %s" %(name,ctime()))
sleep(3) #sleep等同于IO操作
print("end listening %s"%ctime())


def RecordBlog(title):

print ("Begin recording the %s! %s" %(title,ctime()))
sleep(5)
print('end recording %s'%ctime())


threads = []


t1 = threading.Thread(target=ListenMusic,args=('水手',))
t2 = threading.Thread(target=RecordBlog,args=('python线程',))

threads.append(t1)
threads.append(t2)

11. 生产者消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/6
@Author: Zhang Yafei
"""
import time,random
import queue,threading

q = queue.Queue()


def Producer(name):
count = 0
while count <10:
print("making........")
time.sleep(5)
q.put(count)
print('Producer %s has produced %s baozi..' %(name, count))
count +=1
# q.task_done() #给队列发信号,我做好了一个包子
q.join()
print("ok......")


def Consumer(name):
count = 0
while count <10:
time.sleep(random.randrange(4))
print('waiting......')
# q.join() #等待
# if not q.empty():
data = q.get()
print('eating....')
time.sleep(4)
q.task_done() #告诉厨师我吃完了一个包子
print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
# else:
# print("-----no baozi anymore----")
count +=1


p1 = threading.Thread(target=Producer, args=('厨师A',))
c1 = threading.Thread(target=Consumer, args=('B君',))
c2 = threading.Thread(target=Consumer, args=('C君',))
c3 = threading.Thread(target=Consumer, args=('D君',))
p1.start()
c1.start()
c2.start()
c3.start()

六、协程

线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。

协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。

协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;

greenlet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/6
@Author: Zhang Yafei
"""
from greenlet import greenlet


def test1():
print(12)
gr2.switch()
print(34)
gr2.switch()


def test2():
print(56)
gr1.switch()
print(78)


gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

greenlet

gevent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/6
@Author: Zhang Yafei
"""
import gevent
import requests,time


start=time.time()

def f(url):
print('GET: %s' % url)
resp =requests.get(url)
data = resp.text
print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([

gevent.spawn(f, 'https://www.python.org/'),
gevent.spawn(f, 'https://www.yahoo.com/'),
gevent.spawn(f, 'https://www.baidu.com/'),
gevent.spawn(f, 'https://www.sina.com.cn/'),

])
#
# f('https://www.python.org/')
#
# f('https://www.yahoo.com/')
#
# f('https://baidu.com/')
#
# f('https://www.sina.com.cn/')

print("cost time:",time.time()-start)

gevent

yield

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# -*- coding: utf-8 -*-

"""
@Datetime: 2018/9/6
@Author: Zhang Yafei
"""
import time
import queue


def consumer(name):
print("--->ready to eat baozi...")
while True:
new_baozi = yield
print("[%s] is eating baozi %s" % (name,new_baozi))
#time.sleep(1)


def producer():

r = con.__next__()
r = con2.__next__()
n = 0
while 1:
time.sleep(1)
print("\033[32;1m[producer]\033[0m is making baozi %s and %s" %(n,n+1) )
con.send(n)
con2.send(n+1)

n +=2


if __name__ == '__main__':
con = consumer("c1")
con2 = consumer("c2")
p = producer()

yield
赞赏一下吧~