poller.go (6712B)
1 // Package filenotify is adapted from https://github.com/moby/moby/tree/master/pkg/filenotify, Apache-2.0 License. 2 // Hopefully this can be replaced with an external package sometime in the future, see https://github.com/fsnotify/fsnotify/issues/9 3 package filenotify 4 5 import ( 6 "errors" 7 "fmt" 8 "os" 9 "path/filepath" 10 "sync" 11 "time" 12 13 "github.com/fsnotify/fsnotify" 14 ) 15 16 var ( 17 // errPollerClosed is returned when the poller is closed 18 errPollerClosed = errors.New("poller is closed") 19 // errNoSuchWatch is returned when trying to remove a watch that doesn't exist 20 errNoSuchWatch = errors.New("watch does not exist") 21 ) 22 23 // filePoller is used to poll files for changes, especially in cases where fsnotify 24 // can't be run (e.g. when inotify handles are exhausted) 25 // filePoller satisfies the FileWatcher interface 26 type filePoller struct { 27 // the duration between polls. 28 interval time.Duration 29 // watches is the list of files currently being polled, close the associated channel to stop the watch 30 watches map[string]struct{} 31 // Will be closed when done. 32 done chan struct{} 33 // events is the channel to listen to for watch events 34 events chan fsnotify.Event 35 // errors is the channel to listen to for watch errors 36 errors chan error 37 // mu locks the poller for modification 38 mu sync.Mutex 39 // closed is used to specify when the poller has already closed 40 closed bool 41 } 42 43 // Add adds a filename to the list of watches 44 // once added the file is polled for changes in a separate goroutine 45 func (w *filePoller) Add(name string) error { 46 w.mu.Lock() 47 defer w.mu.Unlock() 48 49 if w.closed { 50 return errPollerClosed 51 } 52 53 item, err := newItemToWatch(name) 54 if err != nil { 55 return err 56 } 57 if item.left.FileInfo == nil { 58 return os.ErrNotExist 59 } 60 61 if w.watches == nil { 62 w.watches = make(map[string]struct{}) 63 } 64 if _, exists := w.watches[name]; exists { 65 return fmt.Errorf("watch exists") 66 } 67 w.watches[name] = struct{}{} 68 69 go w.watch(item) 70 return nil 71 } 72 73 // Remove stops and removes watch with the specified name 74 func (w *filePoller) Remove(name string) error { 75 w.mu.Lock() 76 defer w.mu.Unlock() 77 return w.remove(name) 78 } 79 80 func (w *filePoller) remove(name string) error { 81 if w.closed { 82 return errPollerClosed 83 } 84 85 _, exists := w.watches[name] 86 if !exists { 87 return errNoSuchWatch 88 } 89 delete(w.watches, name) 90 return nil 91 } 92 93 // Events returns the event channel 94 // This is used for notifications on events about watched files 95 func (w *filePoller) Events() <-chan fsnotify.Event { 96 return w.events 97 } 98 99 // Errors returns the errors channel 100 // This is used for notifications about errors on watched files 101 func (w *filePoller) Errors() <-chan error { 102 return w.errors 103 } 104 105 // Close closes the poller 106 // All watches are stopped, removed, and the poller cannot be added to 107 func (w *filePoller) Close() error { 108 w.mu.Lock() 109 defer w.mu.Unlock() 110 111 if w.closed { 112 return nil 113 } 114 w.closed = true 115 close(w.done) 116 for name := range w.watches { 117 w.remove(name) 118 } 119 120 return nil 121 } 122 123 // sendEvent publishes the specified event to the events channel 124 func (w *filePoller) sendEvent(e fsnotify.Event) error { 125 select { 126 case w.events <- e: 127 case <-w.done: 128 return fmt.Errorf("closed") 129 } 130 return nil 131 } 132 133 // sendErr publishes the specified error to the errors channel 134 func (w *filePoller) sendErr(e error) error { 135 select { 136 case w.errors <- e: 137 case <-w.done: 138 return fmt.Errorf("closed") 139 } 140 return nil 141 } 142 143 // watch watches item for changes until done is closed. 144 func (w *filePoller) watch(item *itemToWatch) { 145 ticker := time.NewTicker(w.interval) 146 defer ticker.Stop() 147 148 for { 149 select { 150 case <-ticker.C: 151 case <-w.done: 152 return 153 } 154 155 evs, err := item.checkForChanges() 156 if err != nil { 157 if err := w.sendErr(err); err != nil { 158 return 159 } 160 } 161 162 item.left, item.right = item.right, item.left 163 164 for _, ev := range evs { 165 if err := w.sendEvent(ev); err != nil { 166 return 167 } 168 } 169 170 } 171 } 172 173 // recording records the state of a file or a dir. 174 type recording struct { 175 os.FileInfo 176 177 // Set if FileInfo is a dir. 178 entries map[string]os.FileInfo 179 } 180 181 func (r *recording) clear() { 182 r.FileInfo = nil 183 if r.entries != nil { 184 for k := range r.entries { 185 delete(r.entries, k) 186 } 187 } 188 } 189 190 func (r *recording) record(filename string) error { 191 r.clear() 192 193 fi, err := os.Stat(filename) 194 if err != nil && !os.IsNotExist(err) { 195 return err 196 } 197 198 if fi == nil { 199 return nil 200 } 201 202 r.FileInfo = fi 203 204 // If fi is a dir, we watch the files inside that directory (not recursively). 205 // This matches the behaviour of fsnotity. 206 if fi.IsDir() { 207 f, err := os.Open(filename) 208 if err != nil { 209 if os.IsNotExist(err) { 210 return nil 211 } 212 return err 213 } 214 defer f.Close() 215 216 fis, err := f.Readdir(-1) 217 if err != nil { 218 if os.IsNotExist(err) { 219 return nil 220 } 221 return err 222 } 223 224 for _, fi := range fis { 225 r.entries[fi.Name()] = fi 226 } 227 } 228 229 return nil 230 } 231 232 // itemToWatch may be a file or a dir. 233 type itemToWatch struct { 234 // Full path to the filename. 235 filename string 236 237 // Snapshots of the stat state of this file or dir. 238 left *recording 239 right *recording 240 } 241 242 func newItemToWatch(filename string) (*itemToWatch, error) { 243 r := &recording{ 244 entries: make(map[string]os.FileInfo), 245 } 246 err := r.record(filename) 247 if err != nil { 248 return nil, err 249 } 250 251 return &itemToWatch{filename: filename, left: r}, nil 252 253 } 254 255 func (item *itemToWatch) checkForChanges() ([]fsnotify.Event, error) { 256 if item.right == nil { 257 item.right = &recording{ 258 entries: make(map[string]os.FileInfo), 259 } 260 } 261 262 err := item.right.record(item.filename) 263 if err != nil && !os.IsNotExist(err) { 264 return nil, err 265 } 266 267 dirOp := checkChange(item.left.FileInfo, item.right.FileInfo) 268 269 if dirOp != 0 { 270 evs := []fsnotify.Event{fsnotify.Event{Op: dirOp, Name: item.filename}} 271 return evs, nil 272 } 273 274 if item.left.FileInfo == nil || !item.left.IsDir() { 275 // Done. 276 return nil, nil 277 } 278 279 leftIsIn := false 280 left, right := item.left.entries, item.right.entries 281 if len(right) > len(left) { 282 left, right = right, left 283 leftIsIn = true 284 } 285 286 var evs []fsnotify.Event 287 288 for name, fi1 := range left { 289 fi2 := right[name] 290 fil, fir := fi1, fi2 291 if leftIsIn { 292 fil, fir = fir, fil 293 } 294 op := checkChange(fil, fir) 295 if op != 0 { 296 evs = append(evs, fsnotify.Event{Op: op, Name: filepath.Join(item.filename, name)}) 297 } 298 299 } 300 301 return evs, nil 302 303 } 304 305 func checkChange(fi1, fi2 os.FileInfo) fsnotify.Op { 306 if fi1 == nil && fi2 != nil { 307 return fsnotify.Create 308 } 309 if fi1 != nil && fi2 == nil { 310 return fsnotify.Remove 311 } 312 if fi1 == nil && fi2 == nil { 313 return 0 314 } 315 if fi1.IsDir() || fi2.IsDir() { 316 return 0 317 } 318 if fi1.Mode() != fi2.Mode() { 319 return fsnotify.Chmod 320 } 321 if fi1.ModTime() != fi2.ModTime() || fi1.Size() != fi2.Size() { 322 return fsnotify.Write 323 } 324 325 return 0 326 }