poller.go (6712B)
1 // Package filenotify is adapted from https://github.com/moby/moby/tree/master/pkg/filenotify, Apache-2.0 License.
2 // Hopefully this can be replaced with an external package sometime in the future, see https://github.com/fsnotify/fsnotify/issues/9
3 package filenotify
4
5 import (
6 "errors"
7 "fmt"
8 "os"
9 "path/filepath"
10 "sync"
11 "time"
12
13 "github.com/fsnotify/fsnotify"
14 )
15
16 var (
17 // errPollerClosed is returned when the poller is closed
18 errPollerClosed = errors.New("poller is closed")
19 // errNoSuchWatch is returned when trying to remove a watch that doesn't exist
20 errNoSuchWatch = errors.New("watch does not exist")
21 )
22
23 // filePoller is used to poll files for changes, especially in cases where fsnotify
24 // can't be run (e.g. when inotify handles are exhausted)
25 // filePoller satisfies the FileWatcher interface
26 type filePoller struct {
27 // the duration between polls.
28 interval time.Duration
29 // watches is the list of files currently being polled, close the associated channel to stop the watch
30 watches map[string]struct{}
31 // Will be closed when done.
32 done chan struct{}
33 // events is the channel to listen to for watch events
34 events chan fsnotify.Event
35 // errors is the channel to listen to for watch errors
36 errors chan error
37 // mu locks the poller for modification
38 mu sync.Mutex
39 // closed is used to specify when the poller has already closed
40 closed bool
41 }
42
43 // Add adds a filename to the list of watches
44 // once added the file is polled for changes in a separate goroutine
45 func (w *filePoller) Add(name string) error {
46 w.mu.Lock()
47 defer w.mu.Unlock()
48
49 if w.closed {
50 return errPollerClosed
51 }
52
53 item, err := newItemToWatch(name)
54 if err != nil {
55 return err
56 }
57 if item.left.FileInfo == nil {
58 return os.ErrNotExist
59 }
60
61 if w.watches == nil {
62 w.watches = make(map[string]struct{})
63 }
64 if _, exists := w.watches[name]; exists {
65 return fmt.Errorf("watch exists")
66 }
67 w.watches[name] = struct{}{}
68
69 go w.watch(item)
70 return nil
71 }
72
73 // Remove stops and removes watch with the specified name
74 func (w *filePoller) Remove(name string) error {
75 w.mu.Lock()
76 defer w.mu.Unlock()
77 return w.remove(name)
78 }
79
80 func (w *filePoller) remove(name string) error {
81 if w.closed {
82 return errPollerClosed
83 }
84
85 _, exists := w.watches[name]
86 if !exists {
87 return errNoSuchWatch
88 }
89 delete(w.watches, name)
90 return nil
91 }
92
93 // Events returns the event channel
94 // This is used for notifications on events about watched files
95 func (w *filePoller) Events() <-chan fsnotify.Event {
96 return w.events
97 }
98
99 // Errors returns the errors channel
100 // This is used for notifications about errors on watched files
101 func (w *filePoller) Errors() <-chan error {
102 return w.errors
103 }
104
105 // Close closes the poller
106 // All watches are stopped, removed, and the poller cannot be added to
107 func (w *filePoller) Close() error {
108 w.mu.Lock()
109 defer w.mu.Unlock()
110
111 if w.closed {
112 return nil
113 }
114 w.closed = true
115 close(w.done)
116 for name := range w.watches {
117 w.remove(name)
118 }
119
120 return nil
121 }
122
123 // sendEvent publishes the specified event to the events channel
124 func (w *filePoller) sendEvent(e fsnotify.Event) error {
125 select {
126 case w.events <- e:
127 case <-w.done:
128 return fmt.Errorf("closed")
129 }
130 return nil
131 }
132
133 // sendErr publishes the specified error to the errors channel
134 func (w *filePoller) sendErr(e error) error {
135 select {
136 case w.errors <- e:
137 case <-w.done:
138 return fmt.Errorf("closed")
139 }
140 return nil
141 }
142
143 // watch watches item for changes until done is closed.
144 func (w *filePoller) watch(item *itemToWatch) {
145 ticker := time.NewTicker(w.interval)
146 defer ticker.Stop()
147
148 for {
149 select {
150 case <-ticker.C:
151 case <-w.done:
152 return
153 }
154
155 evs, err := item.checkForChanges()
156 if err != nil {
157 if err := w.sendErr(err); err != nil {
158 return
159 }
160 }
161
162 item.left, item.right = item.right, item.left
163
164 for _, ev := range evs {
165 if err := w.sendEvent(ev); err != nil {
166 return
167 }
168 }
169
170 }
171 }
172
173 // recording records the state of a file or a dir.
174 type recording struct {
175 os.FileInfo
176
177 // Set if FileInfo is a dir.
178 entries map[string]os.FileInfo
179 }
180
181 func (r *recording) clear() {
182 r.FileInfo = nil
183 if r.entries != nil {
184 for k := range r.entries {
185 delete(r.entries, k)
186 }
187 }
188 }
189
190 func (r *recording) record(filename string) error {
191 r.clear()
192
193 fi, err := os.Stat(filename)
194 if err != nil && !os.IsNotExist(err) {
195 return err
196 }
197
198 if fi == nil {
199 return nil
200 }
201
202 r.FileInfo = fi
203
204 // If fi is a dir, we watch the files inside that directory (not recursively).
205 // This matches the behaviour of fsnotity.
206 if fi.IsDir() {
207 f, err := os.Open(filename)
208 if err != nil {
209 if os.IsNotExist(err) {
210 return nil
211 }
212 return err
213 }
214 defer f.Close()
215
216 fis, err := f.Readdir(-1)
217 if err != nil {
218 if os.IsNotExist(err) {
219 return nil
220 }
221 return err
222 }
223
224 for _, fi := range fis {
225 r.entries[fi.Name()] = fi
226 }
227 }
228
229 return nil
230 }
231
232 // itemToWatch may be a file or a dir.
233 type itemToWatch struct {
234 // Full path to the filename.
235 filename string
236
237 // Snapshots of the stat state of this file or dir.
238 left *recording
239 right *recording
240 }
241
242 func newItemToWatch(filename string) (*itemToWatch, error) {
243 r := &recording{
244 entries: make(map[string]os.FileInfo),
245 }
246 err := r.record(filename)
247 if err != nil {
248 return nil, err
249 }
250
251 return &itemToWatch{filename: filename, left: r}, nil
252
253 }
254
255 func (item *itemToWatch) checkForChanges() ([]fsnotify.Event, error) {
256 if item.right == nil {
257 item.right = &recording{
258 entries: make(map[string]os.FileInfo),
259 }
260 }
261
262 err := item.right.record(item.filename)
263 if err != nil && !os.IsNotExist(err) {
264 return nil, err
265 }
266
267 dirOp := checkChange(item.left.FileInfo, item.right.FileInfo)
268
269 if dirOp != 0 {
270 evs := []fsnotify.Event{fsnotify.Event{Op: dirOp, Name: item.filename}}
271 return evs, nil
272 }
273
274 if item.left.FileInfo == nil || !item.left.IsDir() {
275 // Done.
276 return nil, nil
277 }
278
279 leftIsIn := false
280 left, right := item.left.entries, item.right.entries
281 if len(right) > len(left) {
282 left, right = right, left
283 leftIsIn = true
284 }
285
286 var evs []fsnotify.Event
287
288 for name, fi1 := range left {
289 fi2 := right[name]
290 fil, fir := fi1, fi2
291 if leftIsIn {
292 fil, fir = fir, fil
293 }
294 op := checkChange(fil, fir)
295 if op != 0 {
296 evs = append(evs, fsnotify.Event{Op: op, Name: filepath.Join(item.filename, name)})
297 }
298
299 }
300
301 return evs, nil
302
303 }
304
305 func checkChange(fi1, fi2 os.FileInfo) fsnotify.Op {
306 if fi1 == nil && fi2 != nil {
307 return fsnotify.Create
308 }
309 if fi1 != nil && fi2 == nil {
310 return fsnotify.Remove
311 }
312 if fi1 == nil && fi2 == nil {
313 return 0
314 }
315 if fi1.IsDir() || fi2.IsDir() {
316 return 0
317 }
318 if fi1.Mode() != fi2.Mode() {
319 return fsnotify.Chmod
320 }
321 if fi1.ModTime() != fi2.ModTime() || fi1.Size() != fi2.Size() {
322 return fsnotify.Write
323 }
324
325 return 0
326 }