Python实现文件同步服务器的方法

Python如何实现文件同步服务器呢?下面的内容将通过具体的实例来演示Python文件同步服务器的实现方法及相关技巧:

本文实例讲述了python实现的文件同步服务器。分享给大家供大家参考。具体实现方法如下:

服务端使用asyncore, 收到文件后保存到本地。

客户端使用pyinotify监视目录的变化,把变动的文件发送到服务端。

重点:

1. 使用structs打包发送文件的信息,服务端收到后,根据文件信息来接收客户端传送过来的文件。

2. 客户端使用多线程,pyinotify监视到文件变化,放到队列中,由另外一个线程发送。

上代码:

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# receive file from client and store them into file use asyncore.#
#/usr/bin/python
#coding: utf-8
import asyncore
import socket
from socket import errno
import logging
import time
import sys
import struct
import os
import fcntl
import threading
from rrd_graph import MakeGraph
try:
  import rrdtool
except (ImportError, ImportWarnning):
  print "Hope this information can help you:"
  print "Can not find pyinotify module in sys path, just run [apt-get install python-rrdtool] in ubuntu."
  sys.exit(1)
class RequestHandler(asyncore.dispatcher):
  """Per-connection handler: receives one file pushed by the sync client.

  The client first sends a fixed-size header packed as "!LL128s128sL"
  (path length, name length, 128-byte path, 128-byte name, file size),
  then streams the file body.  The file is written under ../perfdata.
  """
  def __init__(self, sock, map=None, chunk_size=1024):
    # Logger name includes the local socket address so concurrent
    # connections can be told apart in the debug output.
    self.logger = logging.getLogger('%s-%s' % (self.__class__.__name__, str(sock.getsockname())))
    self.chunk_size = chunk_size
    asyncore.dispatcher.__init__(self,sock,map)
    # Outgoing buffer; handle_write drains it (unused on the receive path).
    self.data_to_write = list()
  def readable(self):
    """Always willing to read -- incoming file data drives this handler."""
    #self.logger.debug("readable() called.")
    return True
  def writable(self):
    """Writable while not yet connected or while output is buffered."""
    response = (not self.connected) or len(self.data_to_write)
    #self.logger.debug('writable() -> %s data length -> %s' % (response, len(self.data_to_write)))
    return response
  def handle_write(self):
    """Send at most one chunk from the newest buffered item; requeue the rest."""
    data = self.data_to_write.pop()
    #self.logger.debug("handle_write()->%s size: %s",data.rstrip('rn'),len(data))
    sent = self.send(data[:self.chunk_size])
    if sent < len(data):
      # Partial send: keep the unsent tail for the next writable event.
      remaining = data[sent:]
      self.data_to_write.append(remaining)
  def handle_read(self):
    """Read the header packet, then receive the file body and store it.

    NOTE(review): this assumes the whole 268-byte header arrives in one
    recv() call -- a short read would misparse the header; verify under
    real network conditions.
    """
    self.writen_size = 0
    # Destination root, relative to the server's working directory.
    nagios_perfdata = '../perfdata'
    head_packet_format = "!LL128s128sL"
    head_packet_size = struct.calcsize(head_packet_format)
    data = self.recv(head_packet_size)
    if not data:
      # Peer closed the connection before sending a header.
      return
    filepath_len, filename_len, filepath,filename, filesize = struct.unpack(head_packet_format,data)
    # The 128-byte fields are padded; the length fields recover the
    # original strings.
    filepath = os.path.join(nagios_perfdata, filepath[:filepath_len])
    filename = filename[:filename_len]
    self.logger.debug("update file: %s" % filepath + '/' + filename)
    try:
      if not os.path.exists(filepath):
        os.makedirs(filepath)
    except OSError:
      # Directory may have been created by a concurrent handler.
      pass
    self.fd = open(os.path.join(filepath,filename), 'w')
    #self.fd = open(filename,'w')
    if filesize > self.chunk_size:
      # Python 2 integer division: number of full chunks.
      times = filesize / self.chunk_size
      first_part_size = times * self.chunk_size
      # NOTE(review): when filesize is an exact multiple of chunk_size,
      # second_part_size is 0 and the trailing loop does recv(0) -- it
      # terminates (len('') == 0) but looks accidental; confirm.
      second_part_size = filesize % self.chunk_size
      # Receive all full-sized chunks first.
      while 1:
        try:
          data = self.recv(self.chunk_size)
          #self.logger.debug("handle_read()->%s size.",len(data))
        except socket.error,e:
          if e.args[0] == errno.EWOULDBLOCK:
            print "EWOULDBLOCK"
            time.sleep(1)
          else:
            #self.logger.debug("Error happend while receive data: %s" % e)
            break
        else:
          self.fd.write(data)
          self.fd.flush()
          self.writen_size += len(data)
          if self.writen_size == first_part_size:
            break
      #receive the packet at last
      while 1:
        try:
          data = self.recv(second_part_size)
          #self.logger.debug("handle_read()->%s size.",len(data))
        except socket.error,e:
          if e.args[0] == errno.EWOULDBLOCK:
            print "EWOULDBLOCK"
            time.sleep(1)
          else:
            #self.logger.debug("Error happend while receive data: %s" % e)
            break
        else:
          self.fd.write(data)
          self.fd.flush()
          self.writen_size += len(data)
          if len(data) == second_part_size:
            break
    elif filesize <= self.chunk_size:
      # Small file: everything fits in a single recv of exactly filesize.
      while 1:
        try:
          data = self.recv(filesize)
          #self.logger.debug("handle_read()->%s size.",len(data))
        except socket.error,e:
          if e.args[0] == errno.EWOULDBLOCK:
            print "EWOULDBLOCK"
            time.sleep(1)
          else:
            #self.logger.debug("Error happend while receive data: %s" % e)
            break
        else:
          self.fd.write(data)
          self.fd.flush()
          self.writen_size += len(data)
          if len(data) == filesize:
            break
    self.logger.debug("File size: %s" % self.writen_size)
class SyncServer(asyncore.dispatcher):
  """Listening dispatcher: accepts sync clients and spawns a
  RequestHandler for each incoming connection."""
  def __init__(self,host,port):
    asyncore.dispatcher.__init__(self)
    self.debug = True
    self.logger = logging.getLogger(self.__class__.__name__)
    # TCP listening socket with SO_REUSEADDR so restarts do not fail
    # on a lingering TIME_WAIT socket.
    self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind((host, port))
    self.listen(2000)
  def handle_accept(self):
    """Accept one pending connection and hand it to a RequestHandler."""
    accepted = self.accept()
    if accepted is None:
      # accept() may return nothing on a spurious readiness event.
      return
    conn, remote_addr = accepted
    #self.logger.debug("Incoming connection from %s" % repr(remote_addr))
    # The handler registers itself with the global asyncore map, so no
    # reference needs to be kept here.
    RequestHandler(sock=conn)
class RunServer(threading.Thread):
  """Thread that owns the listening SyncServer and runs the asyncore loop."""
  def __init__(self):
    super(RunServer, self).__init__()
    # Non-daemon: the process stays alive while the server loop runs.
    self.daemon = False
  def run(self):
    # Bind on all interfaces, port 9999; the dispatcher registers itself
    # with the default asyncore socket map.
    SyncServer('', 9999)
    asyncore.loop(use_poll=True)
def StartServer():
  """Configure DEBUG-level logging and launch the sync server thread."""
  logging.basicConfig(level=logging.DEBUG, format='%(name)s: %(message)s')
  server_thread = RunServer()
  server_thread.start()
  # A MakeGraph worker could be started alongside the server here
  # (left disabled, as in the original).
if __name__ == '__main__':
  StartServer()

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# monitor path with inotify(python module), and send them to remote server.#
# use sendfile(2) instead of send function in socket, if we have python-sendfile installed.#
import socket
import time
import os
import sys
import struct
import threading
import Queue
try:
   import pyinotify
except (ImportError, ImportWarnning):
   print "Hope this information can help you:"
   print "Can not find pyinotify module in sys path, just run [apt-get install python-pyinotify] in ubuntu."
   sys.exit(1)
try:
   from sendfile import sendfile
except (ImportError,ImportWarnning):
   pass
# Only files with these suffixes are synchronised.
filetype_filter = [".rrd", ".xml"]
def check_filetype(pathname):
   """Return True when *pathname* should be synchronised.

   Accepts paths ending in one of ``filetype_filter``'s suffixes.
   Paths whose last dot-separated component is purely numeric
   (e.g. rotated temp files like "perfdata.12345") are rejected.

   BUGFIX: the original fell off the end and implicitly returned None
   for ordinary non-matching names; it also used a bare ``except:``.
   Both paths now return an explicit False (backward compatible --
   callers only test truthiness).
   """
   if pathname.endswith(tuple(filetype_filter)):
      return True
   tail = pathname.rsplit('.', 1)[-1]
   try:
      int(tail)
   except ValueError:
      pass
   else:
      # All-digit suffix: a temporary/rotated file, never synced.
      return False
   return False
class sync_file(threading.Thread):
   """Worker thread: takes inotify events off a queue and pushes the
   changed file to the remote sync server.

   Wire format: a "!LL128s128sL" header (path length, name length,
   128-byte path, 128-byte name, file size) followed by the raw file
   bytes, matching what the server's RequestHandler expects.
   """
   def __init__(self, addr, events_queue):
     super(sync_file,self).__init__()
     self.daemon = False
     # Shared queue fed by the pyinotify EventHandler.
     self.queue = events_queue
     # (host, port) of the sync server.
     self.addr = addr
     self.chunk_size = 1024
   def run(self):
     while 1:
       # Blocks until the watcher enqueues a filesystem event.
       event = self.queue.get()
       if check_filetype(event.pathname):
         print time.asctime(),event.maskname, event.pathname
         # Only the last directory component is transmitted; the server
         # recreates it under its own perfdata root.
         filepath = event.path.split('/')[-1:][0]
         filename = event.name
         filesize = os.stat(os.path.join(event.path, filename)).st_size
         # One fresh TCP connection per file.
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         filepath_len = len(filepath)
         filename_len = len(filename)
         sock.connect(self.addr)
         offset = 0
         data = struct.pack("!LL128s128sL",filepath_len, filename_len, filepath,filename,filesize)
         fd = open(event.pathname,'rb')
         sock.sendall(data)
         # NOTE(review): this checks for the module name "sendfile" in
         # sys.modules; the optional import above is
         # "from sendfile import sendfile" -- verify the key actually
         # appears when the package is installed.
         if "sendfile" in sys.modules:
           # print "use sendfile(2)"
           # Zero-copy path: kernel copies file pages straight to the
           # socket until EOF (sent == 0).
           while 1:
             sent = sendfile(sock.fileno(), fd.fileno(), offset, self.chunk_size)
             if sent == 0:
               break
             offset += sent
         else:
           # print "use original send function"
           # Fallback: plain read/send loop in chunk_size pieces.
           while 1:
             data = fd.read(self.chunk_size)
             if not data: break
             sock.send(data)
         sock.close()
         fd.close()
class EventHandler(pyinotify.ProcessEvent):
   """Forward interesting inotify events onto the shared sync queue."""
   def __init__(self, events_queue):
      super(EventHandler, self).__init__()
      self.events_queue = events_queue
   def my_init(self):
      # pyinotify initialisation hook; nothing extra to set up.
      pass
   def _enqueue(self, event):
      # Hand the event to the sync_file worker pool.
      self.events_queue.put(event)
   def process_IN_CLOSE_WRITE(self, event):
      # A file opened for writing was closed -- contents are final.
      self._enqueue(event)
   def process_IN_MOVED_TO(self, event):
      # A file was moved into the watched tree.
      self._enqueue(event)
def start_notify(path, mask, sync_server):
   """Start the sender thread pool, then watch *path* for inotify events.

   Blocks forever in the notifier loop; events reach the workers through
   a shared queue.
   """
   events_queue = Queue.Queue()
   # Pool of 500 sender threads, all draining the same queue.
   workers = [sync_file(sync_server, events_queue) for _ in range(500)]
   for worker in workers:
      worker.start()
   watch_manager = pyinotify.WatchManager()
   notifier = pyinotify.Notifier(watch_manager, EventHandler(events_queue))
   # rec=True: watch the whole subtree under *path*.
   watch_manager.add_watch(path, mask, rec=True)
   notifier.loop()
def do_notify():
   """Watch the pnp4nagios perfdata tree and sync changes to localhost:9999."""
   watch_path = '/var/lib/pnp4nagios/perfdata'
   # Fire on completed writes and on files moved into the tree.
   event_mask = pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO
   server_addr = ('127.0.0.1', 9999)
   start_notify(watch_path, event_mask, server_addr)
if __name__ == '__main__':
   do_notify()

python监视线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#!/usr/bin/python
import threading
import time
class Monitor(threading.Thread):
  """Thread-pool supervisor: every 0.5s it counts dead threads in each
  registered pool and restarts that many replacements.

  Each keyword argument maps a pool name to a tuple whose first element
  is the pool (a list of thread objects) and whose remaining elements
  are the constructor arguments for replacement threads.
  """
  def __init__(self, *args,**kwargs):
    super(Monitor,self).__init__()
    self.daemon = False
    self.args = args
    self.kwargs = kwargs
    # List of one-entry dicts: [{pool_name: pool_list}, ...].
    self.pool_list = []
  def run(self):
    print self.args
    print self.kwargs
    # Register every pool under its keyword name.
    for name,value in self.kwargs.items():
      obj = value[0]
      temp = {}
      temp[name] = obj
      self.pool_list.append(temp)
    while 1:
      print self.pool_list
      for name,value in self.kwargs.items():
        obj = value[0]
        # Everything after the pool itself is passed to the thread class
        # constructor when respawning.
        parameters = value[1:]
        died_threads = self.cal_died_thread(self.pool_list,name)
        print "died_threads", died_threads
        if died_threads >0:
          for i in range(died_threads):
            print "start %s thread..." % name
            # Respawn using the class of an existing pool member.
            # NOTE(review): if every thread in the pool has died and been
            # removed, obj[0] raises IndexError -- confirm intended.
            t = obj[0].__class__(*parameters)
            t.start()
            self.add_to_pool_list(t,name)
        else:
          # NOTE(review): this breaks out of the *for* over pools as soon
          # as one pool has no dead threads, skipping the remaining pools
          # until the next 0.5s tick -- looks unintentional; verify.
          break
      time.sleep(0.5)
  def cal_died_thread(self,pool_list,name):
    """Count (and unregister) dead threads in the pool called *name*."""
    i = 0
    # NOTE(review): ``lists`` is only bound when *name* is found; an
    # unknown name raises NameError below -- verify callers.
    for item in self.pool_list:
      for k,v in item.items():
        if name == k:
          lists = v
    for t in lists:
      # NOTE(review): isAlive() is Python 2 spelling; removed in
      # Python 3.9 (use is_alive()).
      if not t.isAlive():
        self.remove_from_pool_list(t)
        i +=1
    return i
  def add_to_pool_list(self,obj,name):
    """Append thread *obj* to the registered pool called *name*."""
    for item in self.pool_list:
      for k,v in item.items():
        if name == k:
          v.append(obj)
  def remove_from_pool_list(self, obj):
    """Remove *obj* from whichever registered pool contains it."""
    for item in self.pool_list:
      for k,v in item.items():
        try:
          v.remove(obj)
        except:
          # Not in this pool; keep searching the others.
          pass
        else:
          return

使用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Usage fragment (pasted from inside a function -- the inconsistent
# indentation below is from the original article, kept verbatim).
# Builds two 5-thread pools sharing one queue, then hands both pools to
# the Monitor supervisor so dead workers get respawned.
rrds_queue = Queue.Queue()
  make_rrds_pool = []
  for i in range(5):
    make_rrds_pool.append(MakeRrds(rrds_queue))
  for i in make_rrds_pool:
    i.start()
  make_graph_pool = []
  for i in range(5):
    make_graph_pool.append(MakeGraph(rrds_queue))
  for i in make_graph_pool:
    i.start()
  # Each value is (pool, constructor-args...) as Monitor.run expects.
  monitor = Monitor(make_rrds_pool=(make_rrds_pool, rrds_queue),  
           make_graph_pool=(make_graph_pool, rrds_queue))
  monitor.start()

解析:

1. 接受字典参数,value为一个元组,第一个元素是线程池,后面的都是参数。

2. 每0.5秒监视线程池中的线程数量,如果线程死掉了,记录死掉线程的数目,再启动同样数量的线程。

3. 如果没有线程死去,则什么也不做。

从外部调用Django模块

1
2
3
4
5
6
7
8
# Bootstrap a Django project environment from a standalone script so its
# models and settings are importable outside manage.py.
import os
import sys
# Make the project package (/data/cloud_manage) importable first.
sys.path.insert(0,'/data/cloud_manage')
from django.core.management import setup_environ
import settings
# NOTE(review): setup_environ was deprecated in Django 1.4 and removed in
# 1.6; modern scripts set DJANGO_SETTINGS_MODULE and call django.setup().
setup_environ(settings)
from common.monitor import Monitor
from django.db import connection, transaction

前提就是,要新建一个django的project,这里我们新建了一个cloud_manage.

这样不仅可以调用django自身的模块,还能调用project本身的东西。

Python实现文件同步服务器的方法就介绍到这里,欢迎大家参考。

Sidebar