/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #define FUSE_USE_VERSION 26 #ifdef HAVE_CONFIG_H #include #endif #ifdef linux /* For pread()/pwrite() */ #define _XOPEN_SOURCE 500 #endif #include #include #include #include #include #include #include #include #ifdef HAVE_SETXATTR #include #endif #include // for ceil #include #include #include #include // Constants // static const int default_id = 99; // nobody - not configurable since soon uids in dfs, yeah! static const size_t rd_buf_size = 32768; // // Structure to store fuse_dfs specific data // this will be created and passed to fuse at startup // and fuse will pass it back to us via the context function // on every operation. // typedef struct dfs_context_struct { int persistent_connection; // stay connected to the NN? int debug; char *nn_hostname; int nn_port; hdfsFS fs; } dfs_context; // // Start of read-only functions // static int dfs_getattr(const char *path, struct stat *st) { // retrieve dfs specific data dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; // check params and the context var assert(dfs); assert(dfs->persistent_connection || NULL == dfs->fs); assert(path); assert(st); // if not connected, try to connect and fail out if we can't. if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) { syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__); return -EIO; } // call the dfs API to get the actual information hdfsFileInfo *info = hdfsGetPathInfo(dfs->fs,path); // NULL means either file doesn't exist or maybe IO error. // Taking the conservatve tract here and disconnecting but a better way is to a: request a change // to the API and b: maybe use open to confirm file doesn't exist. // todo - fix this if (NULL == info) { hdfsDisconnect (dfs->fs); dfs->fs = NULL; return -ENOENT; } // initialize the stat structure memset(st, 0, sizeof(struct stat)); // setup hard link info - for a file it is 1 else num entries in a dir + 2 (for . and ..) if (info[0].mKind == kObjectKindDirectory) { int numEntries = 0; hdfsFileInfo *info = hdfsListDirectory(dfs->fs,path,&numEntries); if (info) { hdfsFreeFileInfo(info,numEntries); } st->st_nlink = numEntries + 2; } else { // not a directory st->st_nlink = 1; } // set stat metadata st->st_size = (info[0].mKind == kObjectKindDirectory) ? 4096 : info[0].mSize; st->st_blksize = 512; st->st_blocks = ceil(st->st_size/st->st_blksize); st->st_mode = (info[0].mKind == kObjectKindDirectory) ? (S_IFDIR | 0555) : (S_IFREG | 0444); st->st_uid = default_id; st->st_gid = default_id; st->st_atime = info[0].mCreationTime; st->st_mtime = info[0].mCreationTime; st->st_ctime = info[0].mCreationTime; if (dfs->debug) { syslog(LOG_DEBUG, "size=%d,blksize=%d,blocks=%d\n", (int)st->st_size, (int)st->st_blksize, (int)st->st_blocks, __FILE__, __LINE__); syslog(LOG_DEBUG, "filename=%s,%d\n",info[0].mName, (int)st->st_atime); } // free the info pointer hdfsFreeFileInfo(info,1); // if using for this function call only, disconnect it and reset if ( ! dfs->persistent_connection) { hdfsDisconnect(dfs->fs); dfs->fs = NULL; } return 0; } static int dfs_readdir(const char *path, void *buf, fuse_fill_dir_t filler, off_t offset, struct fuse_file_info *fi) { (void) offset; (void) fi; // retrieve dfs specific data dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; // check params and the context var assert(dfs); assert(dfs->persistent_connection || NULL == dfs->fs); assert(path); assert(buf); // if not connected, try to connect and fail out if we can't. if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) { syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__); return -EIO; } // call dfs to read the dir int numEntries = 0; hdfsFileInfo *info = hdfsListDirectory(dfs->fs,path,&numEntries); // NULL means either the directory doesn't exist or maybe IO error. if (NULL == info) { // even though this could just mean file doesn't exist, disconnect to be conservative. hdfsDisconnect(dfs->fs); dfs->fs = NULL; return 0; } int i ; for (i = 0; i < numEntries; i++) { // check the info[i] struct if (NULL == info[i].mName) { syslog(LOG_ERR,"ERROR: for <%s> info[%d].mName==NULL %s:%d", path, i, __FILE__,__LINE__); continue; } struct stat st; memset(&st, 0, sizeof(struct stat)); // set to 0 to indicate not supported for directory because we cannot (efficiently) get this info for every subdirectory st.st_nlink = (info[i].mKind == kObjectKindDirectory) ? 0 : 1; // setup stat size and acl meta data st.st_size = info[i].mSize; st.st_blksize = 512; st.st_blocks = ceil(st.st_size/st.st_blksize); st.st_mode = (info[i].mKind == kObjectKindDirectory) ? (S_IFDIR | 0555) : (S_IFREG | 0444); st.st_uid = default_id; st.st_gid = default_id; st.st_atime = info[i].mCreationTime; st.st_mtime = info[i].mCreationTime; st.st_ctime = info[i].mCreationTime; // strip off the path but be careful if the path is solely '/' const char *const str = info[i].mName + strlen(path) + ((strlen(path) == 1 && *path == '/') ? 0 : 1); // pack this entry into the fuse buffer int res = 0; if ((res = filler(buf,str,&st,0)) != 0) { syslog(LOG_ERR, "ERROR: readdir filling the buffer %d %s:%d\n",res, __FILE__, __LINE__); } if (dfs->debug) { syslog(LOG_DEBUG, "filename=%s,%x\n", str, st.st_mode); } } // insert '.' and '..' const char *const dots [] = { ".",".."}; for (i = 0 ; i < 2 ; i++) { struct stat st; memset(&st, 0, sizeof(struct stat)); // set to 0 to indicate not supported for directory because we cannot (efficiently) get this info for every subdirectory st.st_nlink = 0; // setup stat size and acl meta data st.st_size = 512; st.st_blksize = 512; st.st_blocks = 1; st.st_mode = (S_IFDIR | 0555); st.st_uid = default_id; st.st_gid = default_id; // todo fix below times st.st_atime = 0; st.st_mtime = 0; st.st_ctime = 0; const char *const str = dots[i]; // flatten the info using fuse's function into a buffer int res = 0; if ((res = filler(buf,str,&st,0)) != 0) { syslog(LOG_ERR, "ERROR: readdir filling the buffer %d %s:%d", res, __FILE__, __LINE__); } } // free the info pointers hdfsFreeFileInfo(info,numEntries); // if using for this function call only, disconnect it and reset. if ( ! dfs->persistent_connection) { hdfsDisconnect(dfs->fs); dfs->fs = NULL; } return 0; } static int dfs_open(const char *path, struct fuse_file_info *fi) { // assume the best that the file exists and just return success. // fuse still does the right thing if I try and cat a file that doesn't // exist by echoiing file does not exist. return 0; } static int dfs_read(const char *path, char *buf, size_t size, off_t offset, struct fuse_file_info *fi) { // retrieve dfs specific data dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; // check params and the context var assert(dfs); assert(dfs->persistent_connection || NULL == dfs->fs); assert(path); assert(buf); // if not connected, try to connect and fail out if we can't. if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) { syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__); return -EIO; } // open the file with our read buffer size which should be large. hdfsFile fh = hdfsOpenFile(dfs->fs, path, O_RDONLY, rd_buf_size, 0, 0); // NULL means either file doesn't exist or maybe IO error. if (NULL == fh) { hdfsDisconnect(dfs->fs); dfs->fs = NULL; return -ENOENT; } if (dfs->debug) { syslog(LOG_DEBUG, "Read requested %d bytes at offset %d\n", (int)size, (int)offset); } // do the actual read const tSize num_read = hdfsPread(dfs->fs, fh, offset, buf, size); // handle errors if (num_read < 0) { syslog(LOG_ERR, "Read error - pread failed for %s with return code %d %s:%d", path, num_read, __FILE__, __LINE__); hdfsDisconnect(dfs->fs); dfs->fs = NULL; return -EIO; } hdfsCloseFile(dfs->fs, fh); // if using for this function call only, disconnect it and reset. if ( ! dfs->persistent_connection) { hdfsDisconnect(dfs->fs); dfs->fs = NULL; } return num_read; } static int dfs_statfs(const char *path, struct statvfs *st) { // retrieve dfs specific data dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data; // check params and the context var assert(path); assert(st); assert(dfs); assert(dfs->persistent_connection || NULL == dfs->fs); // init the stat structure memset(st,0,sizeof(struct statvfs)); // if not connected, try to connect and fail out if we can't. if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) { syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__); return -EIO; } const long cap = hdfsGetCapacity(dfs->fs); const long used = hdfsGetUsed(dfs->fs); const long bsize = hdfsGetDefaultBlockSize(dfs->fs); // fill in the statvfs structure /* FOR REFERENCE: struct statvfs { unsigned long f_bsize; // file system block size unsigned long f_frsize; // fragment size fsblkcnt_t f_blocks; // size of fs in f_frsize units fsblkcnt_t f_bfree; // # free blocks fsblkcnt_t f_bavail; // # free blocks for non-root fsfilcnt_t f_files; // # inodes fsfilcnt_t f_ffree; // # free inodes fsfilcnt_t f_favail; // # free inodes for non-root unsigned long f_fsid; // file system id unsigned long f_flag; / mount flags unsigned long f_namemax; // maximum filename length }; */ st->f_bsize = bsize; st->f_frsize = st->f_bsize; st->f_blocks = cap/st->f_bsize; st->f_bfree = (cap-used)/st->f_bsize; st->f_bavail = st->f_bfree; st->f_files = 1000; st->f_ffree = 500; st->f_favail = 500; st->f_fsid = 1023; st->f_flag = ST_RDONLY | ST_NOSUID; st->f_namemax = 1023; // if using for only this function call, disconnect it and reset. if ( ! dfs->persistent_connection) { hdfsDisconnect(dfs->fs); dfs->fs = NULL; } return 0; } static int dfs_access(const char *path, int mask) { // no permissions on dfs, always a success return 0; } /* // // The remainder are write functionality and therefore not implemented right now // static int dfs_mkdir(const char *path, mode_t mode) { return -ENOTSUP; } static int dfs_rmdir(const char *path) { return -ENOTSUP; } static int dfs_rename(const char *from, const char *to) { return -ENOTSUP; } static int dfs_chmod(const char *path, mode_t mode) { return -ENOTSUP; } static int dfs_chown(const char *path, uid_t uid, gid_t gid) { return -ENOTSUP; } static int dfs_truncate(const char *path, off_t size) { return -ENOTSUP; } static int dfs_write(const char *path, const char *buf, size_t size, off_t offset, struct fuse_file_info *fi) { return -ENOTSUP; } */ static struct fuse_operations dfs_oper = { .getattr = dfs_getattr, .access = dfs_access, .readdir = dfs_readdir, .open = dfs_open, .read = dfs_read, .statfs = dfs_statfs, // .mkdir = dfs_mkdir, // .rmdir = dfs_rmdir, // .rename = dfs_rename, // .chmod = dfs_chmod, // .chown = dfs_chown, // .truncate = dfs_truncate, // .write = dfs_write, }; static void print_usage(char *pname) { fprintf(stdout,"USAGE: %s [--persistent_connection] [--debug] [--help] --server --port [fuse options]\n",pname); fprintf(stdout,"NOTE: a useful fuse option is -o allow_others\n"); } int main(int argc, char *argv[]) { umask(0); openlog(argv[0], LOG_PERROR | LOG_PID, LOG_USER); // // Create a private struct of data we will pass to fuse here and which // will then be accessible on every call. // dfs_context *dfs = (dfs_context*)malloc(sizeof (dfs_context)); if(NULL == dfs) { syslog(LOG_ERR, "FATAL: could not malloc fuse dfs context struct - out of memory %s:%d", __FILE__, __LINE__); exit(1); } // initialize the context dfs->persistent_connection = 1; dfs->debug = 0; dfs->nn_hostname = NULL; dfs->nn_port = -1; dfs->fs = NULL; // NOTE on options: // User must specify dfs related options before any fuse related options. // This simplifies the sharing of options. (admittedly not great). int next_option; const char* const short_options = "+rtzq:l:"; const struct option long_options[] = { { "persistent_connection", no_argument, NULL, 'r' }, { "debug", no_argument, NULL, 't' }, { "help", no_argument, NULL, 'z' }, { "server", required_argument, NULL, 'q' }, { "port", required_argument, NULL, 'l' }, { NULL, 0, NULL, 'o' }, }; // TODO: fix option processing. I looked at the new fuse options processing helper // function, but they looked a bit hacky. But, I guess there's no choice :( // hacky var to count how many of argc fuse_dfs is responsible for int dfs_args_done = 0; int myopts_count = 0; while ( ! dfs_args_done && 0 < (next_option = getopt_long(argc, argv, short_options, long_options, NULL))) { switch (next_option) { case 'r': myopts_count++; dfs->persistent_connection = 1; break; case 't': myopts_count++; dfs->debug = 1; break; case 'z': print_usage(argv[0]); exit(0); case 'q': myopts_count += 2; dfs->nn_hostname = optarg; break; case 'l': myopts_count += 2; dfs->nn_port = atoi(optarg); break; default: // do this so that the second we see a non-dfs option, we drop out and assume the // rest are fuse options. This is better than the alternative of having dfs and fuse // options intermixed in which case the hacky jump past dfs options trick won't work. dfs_args_done = 1; break; } } // these options are mandatory if (NULL == dfs->nn_hostname || -1 == dfs->nn_port) { print_usage(argv[0]); exit(1); } // use ERR level to ensure it makes it into the log. syslog(LOG_ERR, "mounting %s:%d", dfs->nn_hostname, dfs->nn_port); // pretty hacky - jump past my args so can hand fuse what it expects argv += myopts_count; argc -= myopts_count; return fuse_main(argc, argv, &dfs_oper, dfs); closelog(); }