/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define FUSE_USE_VERSION 26

// NOTE(review): the header names on every #include line in this file were
// lost (the <...> part was stripped). The names below are reconstructed from
// the symbols the file uses and from the surviving comments - confirm them
// against the build before merging.

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#ifdef linux
/* For pread()/pwrite() */
#define _XOPEN_SOURCE 500
#endif

#include <fuse.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <dirent.h>
#include <errno.h>
#include <sys/time.h>

#ifdef HAVE_SETXATTR
#include <sys/xattr.h>
#endif

#include <math.h> // for ceil
#include <getopt.h>
#include <assert.h>
#include <syslog.h>
#include <strings.h>

#include <limits.h>      // INT_MAX
#include <stdlib.h>      // calloc, exit, strtol
#include <sys/stat.h>    // struct stat, umask
#include <sys/statvfs.h> // struct statvfs

#include <hdfs.h>

// Constants
//
static const int default_id = 99; // nobody - not configurable since soon uids in dfs, yeah!
static const size_t rd_buf_size = 32768;
static const int nowrites = 0;
static const int do_trash = 0;

//
// Structure to store fuse_dfs specific data
// this will be created and passed to fuse at startup
// and fuse will pass it back to us via the context function
// on every operation.
// typedef struct dfs_context_struct { int debug; char *nn_hostname; int nn_port; hdfsFS fs; // todo: // total hack city - use this to strip off the dfs url from the filenames // that the dfs API is now providing in 0.14.5 // Will do a better job of fixing this once I am back from vacation // char dfs_uri[1024]; int dfs_uri_len; } dfs_context; // // Start of read-only functions // static int dfs_getattr(const char *path, struct stat *st) { // retrieve dfs specific data dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; // check params and the context var assert(dfs); assert(path); assert(st); // if not connected, try to connect and fail out if we can't. if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) { syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__); return -EIO; } syslog(LOG_DEBUG,"in dfs_getattr(%s)",path); // call the dfs API to get the actual information hdfsFileInfo *info = hdfsGetPathInfo(dfs->fs,path); if (NULL == info) { return -ENOENT; } // initialize the stat structure memset(st, 0, sizeof(struct stat)); // setup hard link info - for a file it is 1 else num entries in a dir + 2 (for . and ..) if (info[0].mKind == kObjectKindDirectory) { int numEntries = 0; hdfsFileInfo *info = hdfsListDirectory(dfs->fs,path,&numEntries); if (info) { hdfsFreeFileInfo(info,numEntries); } st->st_nlink = numEntries + 2; } else { // not a directory st->st_nlink = 1; } // set stat metadata st->st_size = (info[0].mKind == kObjectKindDirectory) ? 4096 : info[0].mSize; st->st_blksize = 512; st->st_blocks = ceil(st->st_size/st->st_blksize); st->st_mode = (info[0].mKind == kObjectKindDirectory) ? 
(S_IFDIR | 0777) : (S_IFREG | 0666); st->st_uid = default_id; st->st_gid = default_id; st->st_atime = info[0].mCreationTime; st->st_mtime = info[0].mCreationTime; st->st_ctime = info[0].mCreationTime; // free the info pointer hdfsFreeFileInfo(info,1); return 0; } static int dfs_readdir(const char *path, void *buf, fuse_fill_dir_t filler, off_t offset, struct fuse_file_info *fi) { (void) offset; (void) fi; // retrieve dfs specific data dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; // check params and the context var assert(dfs); assert(path); assert(buf); // if not connected, try to connect and fail out if we can't. if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) { syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__); return -EIO; } int path_len = strlen(path); // call dfs to read the dir int numEntries = 0; hdfsFileInfo *info = hdfsListDirectory(dfs->fs,path,&numEntries); // NULL means either the directory doesn't exist or maybe IO error. if (NULL == info) { return -ENOENT; } int i ; for (i = 0; i < numEntries; i++) { // check the info[i] struct if (NULL == info[i].mName) { syslog(LOG_ERR,"ERROR: for <%s> info[%d].mName==NULL %s:%d", path, i, __FILE__,__LINE__); continue; } struct stat st; memset(&st, 0, sizeof(struct stat)); // set to 0 to indicate not supported for directory because we cannot (efficiently) get this info for every subdirectory st.st_nlink = (info[i].mKind == kObjectKindDirectory) ? 0 : 1; // setup stat size and acl meta data st.st_size = info[i].mSize; st.st_blksize = 512; st.st_blocks = ceil(st.st_size/st.st_blksize); st.st_mode = (info[i].mKind == kObjectKindDirectory) ? 
(S_IFDIR | 0777) : (S_IFREG | 0666); st.st_uid = default_id; st.st_gid = default_id; st.st_atime = info[i].mCreationTime; st.st_mtime = info[i].mCreationTime; st.st_ctime = info[i].mCreationTime; // hack city: todo fix the below to something nicer and more maintainable but // with good performance // strip off the path but be careful if the path is solely '/' // NOTE - this API started returning filenames as full dfs uris const char *const str = info[i].mName + dfs->dfs_uri_len + path_len + ((path_len == 1 && *path == '/') ? 0 : 1); // pack this entry into the fuse buffer int res = 0; if ((res = filler(buf,str,&st,0)) != 0) { syslog(LOG_ERR, "ERROR: readdir filling the buffer %d %s:%d\n",res, __FILE__, __LINE__); } } // insert '.' and '..' const char *const dots [] = { ".",".."}; for (i = 0 ; i < 2 ; i++) { struct stat st; memset(&st, 0, sizeof(struct stat)); // set to 0 to indicate not supported for directory because we cannot (efficiently) get this info for every subdirectory st.st_nlink = 0; // setup stat size and acl meta data st.st_size = 512; st.st_blksize = 512; st.st_blocks = 1; st.st_mode = (S_IFDIR | 0777); st.st_uid = default_id; st.st_gid = default_id; // todo fix below times st.st_atime = 0; st.st_mtime = 0; st.st_ctime = 0; const char *const str = dots[i]; // flatten the info using fuse's function into a buffer int res = 0; if ((res = filler(buf,str,&st,0)) != 0) { syslog(LOG_ERR, "ERROR: readdir filling the buffer %d %s:%d", res, __FILE__, __LINE__); } } // free the info pointers hdfsFreeFileInfo(info,numEntries); return 0; } static int dfs_read(const char *path, char *buf, size_t size, off_t offset, struct fuse_file_info *fi) { // retrieve dfs specific data dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; syslog(LOG_DEBUG,"in dfs_read"); // check params and the context var assert(dfs); assert(path); assert(buf); // if not connected, try to connect and fail out if we can't. 
if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) { syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__); return -EIO; } // open the file with our read buffer size which should be large. hdfsFile fh = hdfsOpenFile(dfs->fs, path, O_RDONLY, rd_buf_size, 0, 0); // NULL means either file doesn't exist or maybe IO error. if (NULL == fh) { return -ENOENT; } // do the actual read const tSize num_read = hdfsPread(dfs->fs, fh, offset, buf, size); // handle errors if (num_read < 0) { syslog(LOG_ERR, "Read error - pread failed for %s with return code %d %s:%d", path, num_read, __FILE__, __LINE__); hdfsDisconnect(dfs->fs); dfs->fs = NULL; return -EIO; } hdfsCloseFile(dfs->fs, fh); return num_read; } static int dfs_statfs(const char *path, struct statvfs *st) { // retrieve dfs specific data dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; // check params and the context var assert(path); assert(st); assert(dfs); // init the stat structure memset(st,0,sizeof(struct statvfs)); // if not connected, try to connect and fail out if we can't. 
if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) { syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__); return -EIO; } const long cap = hdfsGetCapacity(dfs->fs); const long used = hdfsGetUsed(dfs->fs); const long bsize = hdfsGetDefaultBlockSize(dfs->fs); // fill in the statvfs structure /* FOR REFERENCE: struct statvfs { unsigned long f_bsize; // file system block size unsigned long f_frsize; // fragment size fsblkcnt_t f_blocks; // size of fs in f_frsize units fsblkcnt_t f_bfree; // # free blocks fsblkcnt_t f_bavail; // # free blocks for non-root fsfilcnt_t f_files; // # inodes fsfilcnt_t f_ffree; // # free inodes fsfilcnt_t f_favail; // # free inodes for non-root unsigned long f_fsid; // file system id unsigned long f_flag; / mount flags unsigned long f_namemax; // maximum filename length }; */ st->f_bsize = bsize; st->f_frsize = st->f_bsize; st->f_blocks = cap/st->f_bsize; st->f_bfree = (cap-used)/st->f_bsize; st->f_bavail = st->f_bfree; st->f_files = 1000; st->f_ffree = 500; st->f_favail = 500; st->f_fsid = 1023; st->f_flag = ST_RDONLY | ST_NOSUID; st->f_namemax = 1023; return 0; } static int dfs_access(const char *path, int mask) { // no permissions on dfs, always a success return 0; } // // The remainder are write functionality and therefore not implemented right now // static const char * const badpaths[] = { /* "/", "/user", "/user/", "/user/facebook", "/user/facebook/", "/user/facebook/warehouse", "/Trash/", */ NULL }; static int dfs_mkdir(const char *path, mode_t mode) { // retrieve dfs specific data dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; // check params and the context var assert(path); assert(dfs); // if not connected, try to connect and fail out if we can't. 
if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) { syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__); return -EIO; } assert('/' == *path); int i ; for(i = 0; badpaths[i]; i++) { if(strcmp(path, badpaths[i]) == 0) { syslog(LOG_ERR,"ERROR: hdfs trying to create the directory: %s", path); return -EACCES; } } if(nowrites || hdfsCreateDirectory(dfs->fs, path)) { syslog(LOG_ERR,"ERROR: hdfs trying to create directory %s",path); return -EIO; } return 0; } static int dfs_rename(const char *from, const char *to) { // retrieve dfs specific data dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; // check params and the context var assert(from); assert(to); assert(dfs); // if not connected, try to connect and fail out if we can't. if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) { syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__); return -EIO; } assert('/' == *from); assert('/' == *to); int i ; for(i = 0; badpaths[i] != NULL; i++) { if(strcmp(from, badpaths[i]) == 0) { syslog(LOG_ERR,"ERROR: hdfs trying to rename directories %s to %s",from,to); return -EACCES; } if(strcmp(to, badpaths[i]) == 0) { syslog(LOG_ERR,"ERROR: hdfs trying to rename directories %s to %s",from,to); return -EACCES; } } if(nowrites || hdfsRename(dfs->fs, from, to)) { syslog(LOG_ERR,"ERROR: hdfs trying to rename %s to %s",from, to); return -EIO; } return 0; } static int dfs_rmdir(const char *path) { // retrieve dfs specific data dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; // check params and the context var assert(path); assert(dfs); // if not connected, try to connect and fail out if we can't. 
if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) { syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__); return -EIO; } assert('/' == *path); int i ; for(i = 0; badpaths[i]; i++) { if(strcmp(path, badpaths[i]) == 0) { syslog(LOG_ERR,"ERROR: hdfs trying to delete the directory: %s ",path); return -EACCES; } } int numEntries = 0; hdfsFileInfo *info = hdfsListDirectory(dfs->fs,path,&numEntries); // free the info pointers hdfsFreeFileInfo(info,numEntries); if(numEntries) { return -ENOTEMPTY; } // since these commands go through the programmatic hadoop API, there is no // trash feature. So, force it here. // But make sure the person isn't deleting from Trash itself :) // NOTE: /Trash is in badpaths so they cannot delete all of trash if(do_trash && strncmp(path, "/Trash", strlen("/Trash")) != 0) { char target[4096]; char dir[4096]; int status; { // find the directory and full targets in Trash sprintf(target, "/Trash/Current%s",path); // strip off the actual file or directory name from the fullpath char *name = rindex(path, '/'); assert(name); *name = 0; // use that path to ensure the directory exists in the Trash dir // prepend Trash to the directory sprintf(dir,"/Trash/Current%s/",path); // repair the path not used again but in case the caller expects it. *name = '/'; } // if the directory doesn't already exist in the Trash // then we go through with the rename if( hdfsExists(dfs->fs, target) != 0) { // 0 means it exists. weird // make the directory to put it in in the Trash if((status = dfs_mkdir(dir,0)) != 0) { return status; } // do the rename return dfs_rename(path,target); } // if the directory exists in the Trash, then we don't bother doing the rename // and just delete the existing one by falling though. 
} if(nowrites || hdfsDelete(dfs->fs, path)) { syslog(LOG_ERR,"ERROR: hdfs trying to delete the directory %s",path); return -EIO; } return 0; } static int dfs_unlink(const char *path) { // retrieve dfs specific data dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; // check params and the context var assert(path); assert(dfs); // if not connected, try to connect and fail out if we can't. if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) { syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__); return -EIO; } assert('/' == *path); int i ; for(i = 0; badpaths[i]; i++) { if(strcmp(path, badpaths[i]) == 0) { syslog(LOG_ERR,"ERROR: hdfs trying to delete the directory: %s ",path); return -EACCES; } } // since these commands go through the programmatic hadoop API, there is no // trash feature. So, force it here. // But make sure the person isn't deleting from Trash itself :) // NOTE: /Trash is in badpaths so they cannot delete all of trash if(do_trash && strncmp(path, "/Trash", strlen("/Trash")) != 0) { char target[4096]; char dir[4096]; int status; { // find the directory and full targets in Trash sprintf(target, "/Trash/Current%s",path); // strip off the actual file or directory name from the fullpath char *name = rindex(path, '/'); assert(name); *name = 0; // use that path to ensure the directory exists in the Trash dir // prepend Trash to the directory sprintf(dir,"/Trash/Current%s/",path); // repair the path not used again but in case the caller expects it. *name = '/'; } // if this is a file and it's already got a copy in the Trash, to be conservative, we // don't do the delete. 
if(hdfsExists(dfs->fs, target) == 0) { syslog(LOG_ERR,"ERROR: hdfs trying to delete a file that was already deleted so cannot back it to Trash: ",target); return -EIO; } // make the directory to put it in in the Trash if((status = dfs_mkdir(dir,0)) != 0) { return status; } // do the rename return dfs_rename(path,target); } if(nowrites || hdfsDelete(dfs->fs, path)) { syslog(LOG_ERR,"ERROR: hdfs trying to delete the file %s",path); return -EIO; } return 0; } static int dfs_chmod(const char *path, mode_t mode) { return -ENOTSUP; } static int dfs_chown(const char *path, uid_t uid, gid_t gid) { return -ENOTSUP; } static int dfs_truncate(const char *path, off_t size) { return -ENOTSUP; } static int dfs_open(const char *path, struct fuse_file_info *fi) { return 0; #if 0 dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; // check params and the context var assert(path); assert('/' == *path); assert(dfs); syslog(LOG_DEBUG,"in dfs_open"); int ret = 0; // if not connected, try to connect and fail out if we can't. if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) { syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__); return -EIO; } if(fi->flags & O_CREAT) { // retrieve dfs specific data fi->fh = (uint64_t)hdfsOpenFile(dfs->fs, path, O_CREAT, 0, 3, 0); if(0 == fi->fh) { syslog(LOG_ERR, "ERROR: could not open file %s dfs %s:%d\n", path,__FILE__, __LINE__); ret = -EIO; } syslog(LOG_DEBUG,"opened the file it is: %d",fi->fh); } else if(fi->flags & O_RDONLY) { if(hdfsExists(dfs->fs,path) != 0) { // file doesn't exist, how can I open it? 
I'm not a magician ret = -ENOENT; } } else { // some other form of write syslog(LOG_ERR, "ERROR: trying to open non create file not ro %s %s:%d\n", path,__FILE__, __LINE__); ret = -EIO; } return ret; #endif } #if 0 static int dfs_write(const char *path, const char *buf, size_t size, off_t offset, struct fuse_file_info *fi) { // retrieve dfs specific data dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; syslog(LOG_DEBUG,"in dfs_write"); // check params and the context var assert(path); assert(dfs); assert('/' == *path); // if not connected, try to connect and fail out if we can't. if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) { syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__); return -EIO; } hdfsFile file_handle = (hdfsFile)fi->fh; if(NULL == file_handle) { syslog(LOG_ERR, "ERROR: fuse problem - no file_handle for %s %s:%d\n",path, __FILE__, __LINE__); return -EIO; } tOffset cur_offset = hdfsTell(dfs->fs, file_handle); if(cur_offset != offset) { syslog(LOG_ERR, "ERROR: user trying to random access write to a file %d!=%d for %s %s:%d\n",cur_offset, offset,path, __FILE__, __LINE__); return -EIO; } tSize length = hdfsWrite(dfs->fs, file_handle, buf, size); if(length != size) { syslog(LOG_ERR, "ERROR: fuse problem - could not write all the bytes for %s %d!=%d%s:%d\n",path,length,size, __FILE__, __LINE__); return -EIO; } return 0; } int dfs_release (const char *path, struct fuse_file_info *fi) { // retrieve dfs specific data dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; syslog(LOG_DEBUG,"in dfs_release"); // check params and the context var assert(path); assert(dfs); assert('/' == *path); // if not connected, try to connect and fail out if we can't. 
if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) { syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__); return -EIO; } hdfsFile file_handle = (hdfsFile)fi->fh; if(NULL == file_handle) { // syslog(LOG_ERR, "ERROR: fuse problem - no file_handle for %s %s:%d\n",path, __FILE__, __LINE__); // return -EIO; return 0; } syslog(LOG_DEBUG,"HELLO: file_handle = %d",fi->fh); if(hdfsCloseFile(dfs->fs, file_handle) != 0) { syslog(LOG_ERR, "ERROR: dfs problem - could not close file_handle for %s %s:%d\n",path, __FILE__, __LINE__); return -EIO; } fi->fh = (uint64_t)0; return 0; } static int dfs_mknod(const char *path, mode_t mode, dev_t rdev) { syslog(LOG_DEBUG,"in dfs_mknod"); return 0; } static int dfs_create(const char *path, mode_t mode, struct fuse_file_info *fi) { syslog(LOG_DEBUG,"in dfs_create!!!"); fi->flags |= mode; dfs_open(path, fi); return dfs_release(path,fi); } int dfs_flush(const char *path, struct fuse_file_info *fi) { return 0; } //void dfs_setattr(fuse_req_t req, fuse_ino_t ino,struct stat *attr, int to_set, struct fuse_file_info *fi) //{ //} #endif static struct fuse_operations dfs_oper = { .getattr = dfs_getattr, .access = dfs_access, .readdir = dfs_readdir, .open = dfs_open, .read = dfs_read, .statfs = dfs_statfs, .mkdir = dfs_mkdir, .rmdir = dfs_rmdir, .rename = dfs_rename, .unlink = dfs_unlink, // .release = dfs_release, // .create = dfs_create, // .write = dfs_write, // .flush = dfs_flush, // .setattr = dfs_setattr, // .mknod = dfs_mknod, // .chmod = dfs_chmod, // .chown = dfs_chown, // .truncate = dfs_truncate, }; static void print_usage(char *pname) { fprintf(stdout,"USAGE: %s [--persistent_connection] [--debug] [--help] --server --port [fuse options]\n",pname); fprintf(stdout,"NOTE: a useful fuse option is -o allow_others\n"); } int main(int argc, char *argv[]) { umask(0); openlog(argv[0], LOG_PERROR | LOG_PID, LOG_USER); // // Create a private struct of data we will pass to fuse here and 
which // will then be accessible on every call. // dfs_context *dfs = (dfs_context*)malloc(sizeof (dfs_context)); if(NULL == dfs) { syslog(LOG_ERR, "FATAL: could not malloc fuse dfs context struct - out of memory %s:%d", __FILE__, __LINE__); exit(1); } // initialize the context dfs->debug = 0; dfs->nn_hostname = NULL; dfs->nn_port = -1; dfs->fs = NULL; dfs->debug = 1; assert(argc > 1); const char *dfs_uri = argv[1]; assert(strlen(dfs_uri) > strlen("dfs://") + 1 + strlen(":") + 1); const char *hostname = argv[1] + strlen("dfs://"); char *port = index(hostname,':'); *port = 0; port = port + 1; dfs->nn_hostname = strdup(hostname); dfs->nn_port = atoi(port); // these options are mandatory if (NULL == dfs->nn_hostname || -1 == dfs->nn_port) { print_usage(argv[0]); exit(1); } bzero(dfs->dfs_uri,0); sprintf(dfs->dfs_uri,"dfs://%s:%d/",dfs->nn_hostname,dfs->nn_port); dfs->dfs_uri_len = strlen(dfs->dfs_uri); // use ERR level to ensure it makes it into the log. syslog(LOG_ERR, "mounting %s", dfs->dfs_uri); // pretty hacky - jump past my args so can hand fuse what it expects argv += 1; argc -= 1; return fuse_main(argc, argv, &dfs_oper, dfs); }