Skip to content

Commit a091e49

Browse files
committed
Add notification ops and conversions
1 parent 54ad710 commit a091e49

5 files changed

Lines changed: 223 additions & 20 deletions

File tree

connection.go

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -369,18 +369,24 @@ func (c *Connection) readMessage() (*buffer.InMessage, error) {
369369
}
370370

371371
// Write the supplied message to the kernel.
372-
func (c *Connection) writeMessage(msg []byte) error {
373-
// Avoid the retry loop in os.File.Write.
374-
n, err := syscall.Write(int(c.dev.Fd()), msg)
375-
if err != nil {
376-
return err
372+
func (c *Connection) writeMessage(outMsg *buffer.OutMessage) error {
373+
var err error
374+
var n int
375+
expectedLen := outMsg.Len()
376+
if outMsg.Sglist != nil {
377+
n, err = writev(int(c.dev.Fd()), outMsg.Sglist)
378+
} else {
379+
// Avoid the retry loop in os.File.Write.
380+
n, err = syscall.Write(int(c.dev.Fd()), outMsg.OutHeaderBytes())
377381
}
378-
379-
if n != len(msg) {
380-
return fmt.Errorf("Wrote %d bytes; expected %d", n, len(msg))
382+
if err == nil && n != expectedLen {
383+
err = fmt.Errorf("Wrote %d bytes; expected %d", n, expectedLen)
381384
}
382-
383-
return nil
385+
if err != nil && c.errorLogger != nil {
386+
c.errorLogger.Printf("writeMessage: %v %v", err, outMsg.OutHeaderBytes())
387+
}
388+
outMsg.Sglist = nil
389+
return err
384390
}
385391

386392
// ReadOp consumes the next op from the kernel process, returning the op and a
@@ -510,19 +516,21 @@ func (c *Connection) Reply(ctx context.Context, opErr error) {
510516
noResponse := c.kernelResponse(outMsg, inMsg.Header().Unique, op, opErr)
511517

512518
if !noResponse {
513-
var err error
514-
if outMsg.Sglist != nil {
515-
_, err = writev(int(c.dev.Fd()), outMsg.Sglist)
516-
} else {
517-
err = c.writeMessage(outMsg.OutHeaderBytes())
518-
}
519-
if err != nil && c.errorLogger != nil {
520-
c.errorLogger.Printf("writeMessage: %v %v", err, outMsg.OutHeaderBytes())
521-
}
522-
outMsg.Sglist = nil
519+
c.writeMessage(outMsg)
523520
}
524521
}
525522

523+
// Send a notification to the kernel
524+
// notification must be a pointer to one of fuseops.NotifyXXX structures
525+
// To avoid a deadlock notifications must not be called in the execution path of a related filesytem operation or within any code that could hold a lock that could be needed to execute such an operation. As of kernel 4.18, a "related operation" is a lookup(), symlink(), mknod(), mkdir(), unlink(), rename(), link() or create() request for the parent, and a setattr(), unlink(), rmdir(), rename(), setxattr(), removexattr(), readdir() or readdirplus() request for the inode itself.
526+
func (c *Connection) Notify(notification interface{}) error {
527+
outMsg := c.getOutMessage()
528+
defer c.putOutMessage(outMsg)
529+
c.kernelNotification(outMsg, notification)
530+
outMsg.OutHeader().Len = uint32(outMsg.Len())
531+
return c.writeMessage(outMsg)
532+
}
533+
526534
// Close the connection. Must not be called until operations that were read
527535
// from the connection have been responded to.
528536
func (c *Connection) close() error {

conversions.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -704,6 +704,26 @@ func convertInMessage(
704704
OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid},
705705
}
706706

707+
case fusekernel.OpNotifyReply:
708+
type input fusekernel.NotifyRetrieveIn
709+
in := (*input)(inMsg.Consume(unsafe.Sizeof(input{})))
710+
if in == nil {
711+
return nil, errors.New("Corrupt OpNotifyReply")
712+
}
713+
714+
buf := inMsg.ConsumeBytes(inMsg.Len())
715+
if len(buf) < int(in.Size) {
716+
return nil, errors.New("Corrupt OpNotifyReply")
717+
}
718+
719+
o = &fuseops.NotifyRetrieveReplyOp{
720+
Inode: fuseops.InodeID(inMsg.Header().Nodeid),
721+
Unique: inMsg.Header().Unique,
722+
Offset: in.Offset,
723+
Length: in.Size,
724+
OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid},
725+
}
726+
707727
default:
708728
o = &unknownOp{
709729
OpCode: inMsg.Header().Opcode,
@@ -737,6 +757,9 @@ func (c *Connection) kernelResponse(
737757
case *fuseops.BatchForgetOp:
738758
return true
739759

760+
case *fuseops.NotifyRetrieveReplyOp:
761+
return true
762+
740763
case *interruptOp:
741764
return true
742765
}
@@ -964,13 +987,79 @@ func (c *Connection) kernelResponseForOp(
964987
out := (*fusekernel.PollOut)(m.Grow(int(unsafe.Sizeof(fusekernel.PollOut{}))))
965988
out.Revents = uint32(o.Revents)
966989

990+
case *fuseops.NotifyRetrieveReplyOp:
991+
// Empty response
992+
967993
default:
968994
panic(fmt.Sprintf("Unexpected op: %#v", op))
969995
}
970996

971997
return
972998
}
973999

1000+
// Like kernelResponse, but assumes the user replied with a nil error to the op.
1001+
func (c *Connection) kernelNotification(
1002+
m *buffer.OutMessage,
1003+
op interface{}) {
1004+
1005+
h := m.OutHeader()
1006+
h.Unique = 0
1007+
1008+
// Create the appropriate output message
1009+
switch o := op.(type) {
1010+
case *fuseops.NotifyPollWakeup:
1011+
h.Error = fusekernel.NotifyCodePoll
1012+
out := (*fusekernel.NotifyPollWakeupOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyPollWakeupOut{}))))
1013+
out.Kh = o.Kh
1014+
1015+
case *fuseops.NotifyInvalInode:
1016+
h.Error = fusekernel.NotifyCodeInvalInode
1017+
out := (*fusekernel.NotifyInvalInodeOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyInvalInodeOut{}))))
1018+
out.Ino = uint64(o.Inode)
1019+
out.Off = o.Offset
1020+
out.Len = o.Length
1021+
1022+
case *fuseops.NotifyInvalEntry:
1023+
h.Error = fusekernel.NotifyCodeInvalEntry
1024+
out := (*fusekernel.NotifyInvalEntryOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyInvalEntryOut{}))))
1025+
out.Parent = uint64(o.Parent)
1026+
out.Namelen = uint32(len(o.Name))
1027+
m.AppendString(o.Name)
1028+
m.AppendString("\x00")
1029+
1030+
case *fuseops.NotifyDelete:
1031+
h.Error = fusekernel.NotifyCodeDelete
1032+
out := (*fusekernel.NotifyDeleteOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyDeleteOut{}))))
1033+
out.Parent = uint64(o.Parent)
1034+
out.Child = uint64(o.Child)
1035+
out.Namelen = uint32(len(o.Name))
1036+
m.AppendString(o.Name)
1037+
m.AppendString("\x00")
1038+
1039+
case *fuseops.NotifyStore:
1040+
h.Error = fusekernel.NotifyCodeStore
1041+
out := (*fusekernel.NotifyStoreOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyStoreOut{}))))
1042+
out.Nodeid = uint64(o.Inode)
1043+
out.Offset = o.Offset
1044+
out.Size = o.Length
1045+
m.Append(o.Data...)
1046+
m.ShrinkTo(buffer.OutMessageHeaderSize + int(unsafe.Sizeof(fusekernel.NotifyStoreOut{})) + int(o.Length))
1047+
1048+
case *fuseops.NotifyRetrieve:
1049+
h.Error = fusekernel.NotifyCodeRetrieve
1050+
out := (*fusekernel.NotifyRetrieveOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyRetrieveOut{}))))
1051+
out.Unique = o.Unique
1052+
out.Nodeid = uint64(o.Inode)
1053+
out.Offset = o.Offset
1054+
out.Size = o.Length
1055+
1056+
default:
1057+
panic(fmt.Sprintf("Unexpected notification: %#v", op))
1058+
}
1059+
1060+
return
1061+
}
1062+
9741063
////////////////////////////////////////////////////////////////////////
9751064
// General conversions
9761065
////////////////////////////////////////////////////////////////////////

fuseops/ops.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -990,3 +990,102 @@ type PollOp struct {
990990
Revents fusekernel.PollEvents
991991
OpContext OpContext
992992
}
993+
994+
// Notify consumers waiting for poll/epoll that events are incoming
995+
// for the specified kernel handle. The kernel will send a PollOp request
996+
// to get the event mask after receiving this notification
997+
type NotifyPollWakeup struct {
998+
Kh uint64
999+
}
1000+
1001+
// Notify to invalidate cache for an inode.
1002+
//
1003+
// If the filesystem has writeback caching enabled, invalidating an inode
1004+
// will first trigger a writeback of all dirty pages. The call will block
1005+
// until all writeback requests have completed and the inode has been
1006+
// invalidated. It will, however, not wait for completion of pending writeback
1007+
// requests that have been issued before.
1008+
type NotifyInvalInode struct {
1009+
Inode InodeID
1010+
Offset int64
1011+
Length int64
1012+
}
1013+
1014+
// Notify to invalidate parent attributes and the dentry matching parent/name
1015+
//
1016+
// To avoid a deadlock this request must not be sent in the execution path
1017+
// of a related filesytem operation or within any code that could hold a lock
1018+
// that could be needed to execute such an operation. As of kernel 4.18, a
1019+
// "related operation" is a lookup(), symlink(), mknod(), mkdir(), unlink(),
1020+
// rename(), link() or create() request for the parent, and a setattr(),
1021+
// unlink(), rmdir(), rename(), setxattr(), removexattr(), readdir() or
1022+
// readdirplus() request for the inode itself.
1023+
//
1024+
// When called correctly, it will never block.
1025+
type NotifyInvalEntry struct {
1026+
Parent InodeID
1027+
Name string
1028+
}
1029+
1030+
// This request behaves like NotifyInvalEntry with the following additional
1031+
// effect (at least as of Linux kernel 4.8):
1032+
//
1033+
// If the provided child inode matches the inode that is currently associated
1034+
// with the cached dentry, and if there are any inotify watches registered for
1035+
// the dentry, then the watchers are informed that the dentry has been deleted.
1036+
//
1037+
// To avoid a deadlock this request must not be sent while executing a
1038+
// related filesytem operation or while holding a lock that could be needed to
1039+
// execute such an operation.
1040+
type NotifyDelete struct {
1041+
Parent InodeID
1042+
Child InodeID
1043+
Name string
1044+
}
1045+
1046+
// Store data to the kernel buffers
1047+
//
1048+
// Synchronously store data in the kernel buffers belonging to the given inode.
1049+
// The stored data is marked up-to-date (no read will be performed against it,
1050+
// unless it's invalidated or evicted from the cache).
1051+
//
1052+
// If the stored data overflows the current file size, then the size is extended,
1053+
// similarly to a write(2) on the filesystem.
1054+
//
1055+
// If this request returns an error, then the store wasn't fully completed, but
1056+
// it may have been partially completed.
1057+
type NotifyStore struct {
1058+
Inode InodeID
1059+
Offset uint64
1060+
Length uint32
1061+
Data [][]byte
1062+
}
1063+
1064+
// Retrieve data from the kernel buffers belonging to the given inode
1065+
//
1066+
// If successful then the kernel will send a NotifyRetrieveReplyOp as a reply.
1067+
// Only present pages are returned in the retrieve reply. Retrieving stops when it
1068+
// finds a non-present page and only data prior to that is returned.
1069+
//
1070+
// If this request returns an error, then the retrieve will not be completed and
1071+
// no reply will be sent.
1072+
//
1073+
// This request doesn't change the dirty state of pages in the kernel buffer. For
1074+
// dirty pages the write() method will be called regardless of having been retrieved
1075+
// previously.
1076+
type NotifyRetrieve struct {
1077+
Inode InodeID
1078+
Unique uint64
1079+
Offset uint64
1080+
Length uint32
1081+
}
1082+
1083+
// Matches the size of WriteIn
1084+
type NotifyRetrieveReplyOp struct {
1085+
Inode InodeID
1086+
Unique uint64
1087+
Offset uint64
1088+
Length uint32
1089+
1090+
OpContext OpContext
1091+
}

fuseutil/file_system.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ type FileSystem interface {
6565
Fallocate(context.Context, *fuseops.FallocateOp) error
6666
Poll(context.Context, *fuseops.PollOp) error
6767

68+
SetConnection(*fuse.Connection)
69+
6870
// Regard all inodes (including the root inode) as having their lookup counts
6971
// decremented to zero, and clean up any resources associated with the file
7072
// system. No further calls to the file system will be made.
@@ -96,6 +98,8 @@ type fileSystemServer struct {
9698
}
9799

98100
func (s *fileSystemServer) ServeOps(c *fuse.Connection) {
101+
s.fs.SetConnection(c)
102+
99103
// When we are done, we clean up by waiting for all in-flight ops then
100104
// destroying the file system.
101105
defer func() {

fuseutil/not_implemented_file_system.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,5 +210,8 @@ func (fs *NotImplementedFileSystem) Poll(
210210
return fuse.ENOSYS
211211
}
212212

213+
func (fs *NotImplementedFileSystem) SetConnection(*fuse.Connection) {
214+
}
215+
213216
func (fs *NotImplementedFileSystem) Destroy() {
214217
}

0 commit comments

Comments
 (0)